common_readableStreams.js (10182B)
const SAME_COMPARTMENT = "same-compartment";
const IFRAME_COMPARTMENT = "iframe-compartment";
const BIG_BUFFER_SIZE = 1000000;
const ITER_MAX = 10;

/**
 * Builds a Uint8Array of `size` bytes filled with 42 in which every 1000th
 * byte (offsets 0, 1000, 2000, ...) is overwritten with a running counter
 * taken modulo 255 (1, 2, 3, ...).
 *
 * @param {number} size - Length of the buffer in bytes.
 * @returns {Uint8Array} The patterned buffer.
 */
function makeBuffer(size) {
  const buffer = new Uint8Array(size);
  buffer.fill(42);

  // The marker loop is bounded by `size` (not a fixed 1000000) so that buffers
  // smaller than one million bytes never write past their end.
  let value = 0;
  for (let i = 0; i < size; i += 1000) {
    buffer[i] = ++value % 255;
  }

  return buffer;
}

/**
 * Runs the function named `data.func` either in the current global or in the
 * global of the `#iframe` element, passing it `data.args` and the current
 * global (`self`) so the callee can report results back to this global.
 *
 * @param {string} compartment - SAME_COMPARTMENT or IFRAME_COMPARTMENT.
 * @param {{func: string, args: *}} data - Name of the global function to call
 *   and the single argument to hand to it.
 * @returns {*} Whatever the invoked function returns, or undefined (after
 *   reporting a failure through `ok`) when `compartment` is not recognized.
 */
function apply_compartment(compartment, data) {
  if (compartment === SAME_COMPARTMENT) {
    return self[data.func](data.args, self);
  }

  if (compartment === IFRAME_COMPARTMENT) {
    const iframe = document.querySelector("#iframe").contentWindow;
    return iframe[data.func](data.args, self);
  }

  ok(false, "Invalid compartment value");
  return undefined;
}

/**
 * Fetches "/" and hands the Response to test_nativeStream_continue in the
 * requested compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_nativeStream(compartment) {
  info("test_nativeStream");

  let r = await fetch("/");

  return apply_compartment(compartment, {
    func: "test_nativeStream_continue",
    args: r,
  });
}

/**
 * Clones the given Response twice, checks the types of the clones and their
 * bodies against the constructors of `that`, then consumes the original as a
 * blob, reads one chunk from the first clone and consumes the second clone
 * as a blob.
 *
 * @param {Response} r - Response to exercise.
 * @param {object} that - Global providing `ok` and the reference constructors.
 */
async function test_nativeStream_continue(r, that) {
  that.ok(r.body instanceof that.ReadableStream, "We have a ReadableStream");

  let a = r.clone();
  that.ok(a instanceof that.Response, "We have a cloned Response");
  that.ok(a.body instanceof that.ReadableStream, "We have a ReadableStream");

  let b = a.clone();
  that.ok(b instanceof that.Response, "We have a cloned Response");
  that.ok(b.body instanceof that.ReadableStream, "We have a ReadableStream");

  let blob = await r.blob();

  that.ok(blob instanceof that.Blob, "We have a blob");
  let d = await a.body.getReader().read();

  that.ok(!d.done, "We have read something!");
  blob = await b.blob();

  that.ok(blob instanceof that.Blob, "We have a blob");
}

/**
 * Fetches an object URL created for an empty-string Blob and hands the
 * Response to test_timeout_continue in the requested compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_timeout(compartment) {
  info("test_timeout");

  let blob = new Blob([""]);
  let r = await fetch(URL.createObjectURL(blob));

  return apply_compartment(compartment, {
    func: "test_timeout_continue",
    args: r,
  });
}

/**
 * Reads once from the Response body through a reader, waits for a zero-delay
 * timeout, and then expects `r.blob()` to reject.
 *
 * @param {Response} r - Response to exercise.
 * @param {object} that - Global providing `ok`.
 */
async function test_timeout_continue(r, that) {
  await r.body.getReader().read();

  await new Promise(resolve => setTimeout(resolve, 0));

  try {
    await r.blob();
    that.ok(false, "We cannot have a blob here!");
  } catch (exc) {
    that.ok(true, "We cannot have a blob here!");
  }
}

/**
 * Wraps a script-created ReadableStream (one BIG_BUFFER_SIZE chunk, then
 * closed) in a Response and hands both the Response and the source buffer to
 * test_nonNativeStream_continue in the requested compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_nonNativeStream(compartment) {
  info("test_nonNativeStream");

  let buffer = makeBuffer(BIG_BUFFER_SIZE);
  info("Buffer size: " + buffer.byteLength);

  let r = new Response(
    new ReadableStream({
      start: controller => {
        controller.enqueue(buffer);
        controller.close();
      },
    })
  );

  return apply_compartment(compartment, {
    func: "test_nonNativeStream_continue",
    args: { r, buffer },
  });
}

/**
 * Same sequence of checks as test_nativeStream_continue, applied to
 * `data.r`, plus a final check that the blob produced from the second clone
 * has the same size as `data.buffer`.
 *
 * @param {{r: Response, buffer: Uint8Array}} data - Response and the buffer
 *   that was enqueued into its stream.
 * @param {object} that - Global providing `ok`, `is` and the reference
 *   constructors.
 */
async function test_nonNativeStream_continue(data, that) {
  that.ok(
    data.r.body instanceof that.ReadableStream,
    "We have a ReadableStream"
  );

  let a = data.r.clone();
  that.ok(a instanceof that.Response, "We have a cloned Response");
  that.ok(a.body instanceof that.ReadableStream, "We have a ReadableStream");

  let b = a.clone();
  that.ok(b instanceof that.Response, "We have a cloned Response");
  that.ok(b.body instanceof that.ReadableStream, "We have a ReadableStream");

  let blob = await data.r.blob();

  that.ok(blob instanceof that.Blob, "We have a blob");
  let d = await a.body.getReader().read();

  that.ok(!d.done, "We have read something!");
  blob = await b.blob();

  that.ok(blob instanceof that.Blob, "We have a blob");
  that.is(blob.size, data.buffer.byteLength, "Blob size matches");
}

/**
 * Wraps a ReadableStream that enqueues a string (not a Uint8Array) in a
 * Response and hands it to test_noUint8Array_continue in the requested
 * compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_noUint8Array(compartment) {
  info("test_noUint8Array");

  let r = new Response(
    new ReadableStream({
      start: controller => {
        controller.enqueue("hello world!");
        controller.close();
      },
    })
  );

  return apply_compartment(compartment, {
    func: "test_noUint8Array_continue",
    args: r,
  });
}

/**
 * Checks that the Response body is a ReadableStream and expects `r.blob()`
 * to reject.
 *
 * @param {Response} r - Response to exercise.
 * @param {object} that - Global providing `ok` and `ReadableStream`.
 */
async function test_noUint8Array_continue(r, that) {
  that.ok(r.body instanceof that.ReadableStream, "We have a ReadableStream");

  try {
    await r.blob();
    that.ok(false, "We cannot have a blob here!");
  } catch (ex) {
    that.ok(true, "We cannot have a blob here!");
  }
}

/**
 * Wraps a ReadableStream that enqueues one big chunk but is never closed in
 * a Response and hands it to test_pendingStream_continue in the requested
 * compartment. The stream controller is stored on `self.ccc`.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_pendingStream(compartment) {
  info("test_pendingStream");

  let r = new Response(
    new ReadableStream({
      start: controller => {
        controller.enqueue(makeBuffer(BIG_BUFFER_SIZE));
        // Let's keep this controller open.
        self.ccc = controller;
      },
    })
  );

  return apply_compartment(compartment, {
    func: "test_pendingStream_continue",
    args: r,
  });
}

/**
 * Reads one chunk from the still-open stream, and then calls `that.close()`
 * when the reporting global has a `close` member.
 *
 * @param {Response} r - Response whose body stream is still open.
 * @param {object} that - Global providing `ok` (and optionally `close`).
 */
async function test_pendingStream_continue(r, that) {
  let d = await r.body.getReader().read();

  that.ok(!d.done, "We have read something!");

  if ("close" in that) {
    that.close();
  }
}

/**
 * Stores a string-bodied Response in the "nativeStream" cache and hands the
 * cache, its URL key and the original body to
 * test_nativeStream_cache_continue in the requested compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_nativeStream_cache(compartment) {
  info("test_nativeStream_cache");

  let origBody = "123456789abcdef";
  let url = "/nativeStream";

  let cache = await caches.open("nativeStream");

  info("Storing a body as a string");
  await cache.put(url, new Response(origBody));

  return apply_compartment(compartment, {
    func: "test_nativeStream_cache_continue",
    args: { caches, cache, url, origBody },
  });
}

/**
 * Reads the stored entry back from the cache as text, compares it with the
 * original body and deletes the "nativeStream" cache.
 *
 * @param {{caches: CacheStorage, cache: Cache, url: string, origBody: string}} data
 *   Cache handles, the entry key and the expected body.
 * @param {object} that - Global providing `info` and `is`.
 */
async function test_nativeStream_cache_continue(data, that) {
  that.info("Retrieving the stored value");
  let cacheResponse = await data.cache.match(data.url);

  that.info("Converting the response to text");
  let cacheBody = await cacheResponse.text();

  that.is(data.origBody, cacheBody, "Bodies match");

  await data.caches.delete("nativeStream");
}

/**
 * Opens the "nonNativeStream" cache, builds a Response around a
 * script-created ReadableStream carrying one BIG_BUFFER_SIZE chunk, and
 * hands everything to test_nonNativeStream_cache_continue in the requested
 * compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_nonNativeStream_cache(compartment) {
  info("test_nonNativeStream_cache");

  let url = "/nonNativeStream";

  let cache = await caches.open("nonNativeStream");
  let buffer = makeBuffer(BIG_BUFFER_SIZE);
  info("Buffer size: " + buffer.byteLength);

  info("Storing a body as a string");
  let r = new Response(
    new ReadableStream({
      start: controller => {
        controller.enqueue(buffer);
        controller.close();
      },
    })
  );

  // `url` must be forwarded: the continuation uses `data.url` as the key for
  // both cache.put() and cache.match().
  return apply_compartment(compartment, {
    func: "test_nonNativeStream_cache_continue",
    args: { caches, cache, url, buffer, r },
  });
}

/**
 * Stores `data.r` in the cache under `data.url`, reads it back as an
 * ArrayBuffer and checks its length and the marker byte at every 1000th
 * offset, then deletes the "nonNativeStream" cache.
 *
 * @param {{caches: CacheStorage, cache: Cache, url: string, buffer: Uint8Array, r: Response}} data
 *   Cache handles, the entry key, the source buffer and the Response to store.
 * @param {object} that - Global providing `info`, `ok`, `is` and
 *   `ArrayBuffer`.
 */
async function test_nonNativeStream_cache_continue(data, that) {
  await data.cache.put(data.url, data.r);

  that.info("Retrieving the stored value");
  let cacheResponse = await data.cache.match(data.url);

  that.info("Converting the response to text");
  let cacheBody = await cacheResponse.arrayBuffer();

  const expectedLength = data.buffer.byteLength;

  that.ok(cacheBody instanceof that.ArrayBuffer, "Body is an array buffer");
  that.is(cacheBody.byteLength, expectedLength, "Body length is correct");

  // One view for the whole loop instead of a fresh one per iteration.
  const bytes = new Uint8Array(cacheBody);
  let value = 0;
  for (let i = 0; i < expectedLength; i += 1000) {
    that.is(bytes[i], ++value % 255, "byte in position " + i + " is correct");
  }

  await data.caches.delete("nonNativeStream");
}

/**
 * Wraps a ReadableStream whose pull() logs "pull called" to the console in a
 * Response and hands it to test_codeExecution_continue in the requested
 * compartment. The stream controller is stored on `self.controller`.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_codeExecution(compartment) {
  info("test_codeExecution");

  let r = new Response(
    new ReadableStream({
      start(c) {
        // Explicit global property rather than an undeclared assignment.
        self.controller = c;
      },
      pull() {
        console.log("pull called");
      },
    })
  );

  return apply_compartment(compartment, {
    func: "test_codeExecution_continue",
    args: r,
  });
}

// This is intended to just be a drop-in replacement for an old observer
// notification.
/**
 * Registers `listener` with the nsIConsoleAPIStorage service obtained through
 * SpecialPowers. The wrapper that forwards events to `listener.observe` is
 * stored on `listener.__handler` so it can be unregistered later.
 *
 * @param {{observe: function(*, *): void}} listener - Object whose `observe`
 *   method receives each log event's (message, id).
 */
function addConsoleStorageListener(listener) {
  const ConsoleAPIStorage = SpecialPowers.Cc[
    "@mozilla.org/consoleAPI-storage;1"
  ].getService(SpecialPowers.Ci.nsIConsoleAPIStorage);
  listener.__handler = (message, id) => {
    listener.observe(message, id);
  };
  ConsoleAPIStorage.addLogEventListener(
    listener.__handler,
    SpecialPowers.wrap(document).nodePrincipal
  );
}

/**
 * Unregisters the handler previously stored on `listener.__handler` by
 * addConsoleStorageListener.
 *
 * @param {{__handler: function}} listener - Listener to unregister.
 */
function removeConsoleStorageListener(listener) {
  const ConsoleAPIStorage = SpecialPowers.Cc[
    "@mozilla.org/consoleAPI-storage;1"
  ].getService(SpecialPowers.Ci.nsIConsoleAPIStorage);
  ConsoleAPIStorage.removeLogEventListener(listener.__handler);
}

/**
 * Starts a read on the Response body and waits until a console message whose
 * first argument is "pull called" is observed.
 *
 * @param {Response} r - Response whose body stream is read from.
 * @param {object} that - Global providing `ok`.
 */
async function test_codeExecution_continue(r, that) {
  const pullLogged = new Promise(resolve => {
    const listener = {
      observe(aSubject) {
        that.ok(true, "Something has been received");

        const obj = aSubject.wrappedJSObject;
        if (obj.arguments[0] === "pull called") {
          that.ok(true, "Message received!");
          removeConsoleStorageListener(listener);
          resolve();
        }
      },
    };

    // Register before the read below so the message cannot be missed.
    addConsoleStorageListener(listener);
  });

  // Deliberately not awaited: this function only waits for the console
  // message, not for the read to produce data.
  r.body.getReader().read();
  await pullLogged;
}

/**
 * Builds a Response around a ReadableStream whose pull() reads its state
 * (`foo`, `iter`, `controller`) from the global `self`, enqueueing one byte
 * of value `self.foo` per call and closing after ITER_MAX calls, then hands
 * it to test_global_continue in the requested compartment.
 *
 * @param {string} compartment - Compartment selector for apply_compartment.
 * @returns {Promise<*>} Resolves with the continuation's result.
 */
async function test_global(compartment) {
  info("test_global: " + compartment);

  self.foo = 42;
  self.iter = ITER_MAX;

  let r = new Response(
    new ReadableStream({
      start(c) {
        self.controller = c;
      },
      pull() {
        if (!("iter" in self) || self.iter < 0 || self.iter > ITER_MAX) {
          throw new Error("Something bad is happening here!");
        }

        let buffer = new Uint8Array(1);
        buffer.fill(self.foo);
        self.controller.enqueue(buffer);

        if (--self.iter == 0) {
          // Go through `self` like the enqueue above instead of relying on a
          // bare global identifier.
          self.controller.close();
        }
      },
    })
  );

  return apply_compartment(compartment, {
    func: "test_global_continue",
    args: r,
  });
}

/**
 * Consumes the Response as an ArrayBuffer and checks its prototype, its
 * length (ITER_MAX) and that each of its bytes equals 42, the value
 * test_global stores in `self.foo`.
 *
 * @param {Response} r - Response produced by test_global.
 * @param {object} that - Global providing `is` and `ArrayBuffer`.
 */
async function test_global_continue(r, that) {
  const a = await r.arrayBuffer();

  that.is(
    Object.getPrototypeOf(a),
    that.ArrayBuffer.prototype,
    "Body is an array buffer"
  );
  that.is(a.byteLength, ITER_MAX, "Body length is correct");

  // One view for the whole loop instead of a fresh one per iteration.
  const bytes = new Uint8Array(a);
  for (let i = 0; i < ITER_MAX; ++i) {
    that.is(bytes[i], 42, "Byte " + i + " is correct");
  }
}

/**
 * Runs the named test inside a new "worker_readableStreams.js" worker and
 * relays the worker's messages: "test" goes to `ok`, "info" goes to `info`,
 * "done" resolves and "error" rejects with the message string.
 *
 * @param {string} func - Name of the test function, posted to the worker.
 * @returns {Promise<void>} Resolves on a "done" message. Rejects with the
 *   message string on an "error" message, or with an Error when the worker
 *   itself fires an error event.
 */
function workify(func) {
  info("Workifying " + func);

  return new Promise((resolve, reject) => {
    let worker = new Worker("worker_readableStreams.js");

    worker.onmessage = function (e) {
      if (e.data.type == "done") {
        resolve();
        return;
      }

      if (e.data.type == "error") {
        reject(e.data.message);
        return;
      }

      if (e.data.type == "test") {
        ok(e.data.test, e.data.message);
        return;
      }

      if (e.data.type == "info") {
        info(e.data.message);
      }
    };

    // Without this, a worker that fails to load or throws an uncaught
    // exception would leave the returned promise pending forever.
    worker.onerror = function (e) {
      reject(new Error(`Worker error: ${e.message}`));
    };

    worker.postMessage(func);
  });
}