dispatcher.js (9640B)
1 // Define a universal message passing API. It works cross-origin and across 2 // browsing context groups. 3 const dispatcher_path = "/common/dispatcher/dispatcher.py"; 4 5 // Finds the nearest ancestor window that has a non srcdoc location. This should 6 // give us a usable location for constructing further URLs. 7 function findLocationFromAncestors(w) { 8 if (w.location.href == 'about:srcdoc') { 9 return findLocationFromAncestors(w.parent); 10 } 11 return w.location; 12 } 13 14 // Handles differences between workers vs frames (src vs srcdoc). 15 function findLocation() { 16 if (location.href == 'about:srcdoc') { 17 return findLocationFromAncestors(window.parent); 18 } 19 if (location.protocol == 'blob:' || location.protocol == 'data:') { 20 // Allows working around blob and data URLs. 21 if (self.document && self.document.baseURI) { 22 return self.document.baseURI; 23 } 24 } 25 return location; 26 } 27 28 const dispatcherLocation = findLocation(); 29 const dispatcher_url = new URL(dispatcher_path, dispatcherLocation).href; 30 31 // Return a promise, limiting the number of concurrent accesses to a shared 32 // resources to |max_concurrent_access|. 33 const concurrencyLimiter = (max_concurrency) => { 34 let pending = 0; 35 let waiting = []; 36 return async (task) => { 37 pending++; 38 if (pending > max_concurrency) 39 await new Promise(resolve => waiting.push(resolve)); 40 let result = await task(); 41 pending--; 42 waiting.shift()?.(); 43 return result; 44 }; 45 } 46 47 // Wait for a random amount of time in the range [10ms,100ms]. 48 const randomDelay = () => { 49 return new Promise(resolve => setTimeout(resolve, 10 + 90*Math.random())); 50 } 51 52 // Sending too many requests in parallel causes congestion. Limiting it improves 53 // throughput. 54 // 55 // Note: The following table has been determined on the test: 56 // ../cache-storage.tentative.https.html 57 // using Chrome with a 64 core CPU / 64GB ram, in release mode: 58 // ┌───────────┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬────┐ 59 // │concurrency│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 10│ 15│ 20│ 30│ 50│ 100│ 60 // ├───────────┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┤ 61 // │time (s) │ 54│ 38│ 31│ 29│ 26│ 24│ 22│ 22│ 22│ 22│ 34│ 36 │ 62 // └───────────┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴────┘ 63 const limiter = concurrencyLimiter(6); 64 65 // While requests to different remote contexts can go in parallel, we need to 66 // ensure that requests to each remote context are done in order. This maps a 67 // uuid to a queue of requests to send. A queue is processed until it is empty 68 // and then is deleted from the map. 69 const sendQueues = new Map(); 70 71 // Sends a single item (with rate-limiting) and calls the associated resolver 72 // when it is successfully sent. 73 const sendItem = async function (uuid, resolver, message) { 74 await limiter(async () => { 75 // Requests might be dropped. Retry until getting a confirmation it has been 76 // processed. 77 while(1) { 78 try { 79 let response = await fetch(dispatcher_url + `?uuid=${uuid}`, { 80 method: 'POST', 81 body: message 82 }) 83 if (await response.text() == "done") { 84 resolver(); 85 return; 86 } 87 } catch (fetch_error) {} 88 await randomDelay(); 89 }; 90 }); 91 } 92 93 // While the queue is non-empty, send the next item. This is async and new items 94 // may be added to the queue while others are being sent. 95 const processQueue = async function (uuid, queue) { 96 while (queue.length) { 97 const [resolver, message] = queue.shift(); 98 await sendItem(uuid, resolver, message); 99 } 100 // The queue is empty, delete it. 101 sendQueues.delete(uuid); 102 } 103 104 const send = async function (uuid, message) { 105 const itemSentPromise = new Promise((resolve) => { 106 const item = [resolve, message]; 107 if (sendQueues.has(uuid)) { 108 // There is already a queue for `uuid`, just add to it and it will be processed. 109 sendQueues.get(uuid).push(item); 110 } else { 111 // There is no queue for `uuid`, create it and start processing. 112 const queue = [item]; 113 sendQueues.set(uuid, queue); 114 processQueue(uuid, queue); 115 } 116 }); 117 // Wait until the item has been successfully sent. 118 await itemSentPromise; 119 } 120 121 const receive = async function (uuid) { 122 while(1) { 123 let data = "not ready"; 124 try { 125 data = await limiter(async () => { 126 let response = await fetch(dispatcher_url + `?uuid=${uuid}`); 127 return await response.text(); 128 }); 129 } catch (fetch_error) {} 130 131 if (data == "not ready") { 132 await randomDelay(); 133 continue; 134 } 135 136 return data; 137 } 138 } 139 140 // Returns an URL. When called, the server sends toward the `uuid` queue the 141 // request headers. Useful for determining if something was requested with 142 // Cookies. 143 const showRequestHeaders = function(origin, uuid) { 144 return origin + dispatcher_path + `?uuid=${uuid}&show-headers`; 145 } 146 147 // Same as above, except for the response is cacheable. 148 const cacheableShowRequestHeaders = function(origin, uuid) { 149 return origin + dispatcher_path + `?uuid=${uuid}&cacheable&show-headers`; 150 } 151 152 // This script requires 153 // - `/common/utils.js` for `token()`. 154 155 // Returns the URL of a document that can be used as a `RemoteContext`. 156 // 157 // `uuid` should be a UUID uniquely identifying the given remote context. 158 // `options` has the following shape: 159 // 160 // { 161 // host: (optional) Sets the returned URL's `host` property. Useful for 162 // cross-origin executors. 163 // protocol: (optional) Sets the returned URL's `protocol` property. 164 // } 165 function remoteExecutorUrl(uuid, options) { 166 const url = new URL("/common/dispatcher/remote-executor.html", dispatcherLocation); 167 url.searchParams.set("uuid", uuid); 168 169 if (options?.host) { 170 url.host = options.host; 171 } 172 173 if (options?.protocol) { 174 url.protocol = options.protocol; 175 } 176 177 return url; 178 } 179 180 // Represents a remote executor. For more detailed explanation see `README.md`. 181 class RemoteContext { 182 // `uuid` is a UUID string that identifies the remote context and should 183 // match with the `uuid` parameter of the URL of the remote context. 184 constructor(uuid) { 185 this.context_id = uuid; 186 } 187 188 // Evaluates the script `expr` on the executor. 189 // - If `expr` is evaluated to a Promise that is resolved with a value: 190 // `execute_script()` returns a Promise resolved with the value. 191 // - If `expr` is evaluated to a non-Promise value: 192 // `execute_script()` returns a Promise resolved with the value. 193 // - If `expr` throws an error or is evaluated to a Promise that is rejected: 194 // `execute_script()` returns a rejected Promise with the error's 195 // `message`. 196 // Note that currently the type of error (e.g. DOMException) is not 197 // preserved, except for `TypeError`. 198 // The values should be able to be serialized by JSON.stringify(). 199 async execute_script(fn, args) { 200 const receiver = token(); 201 await this.send({receiver: receiver, fn: fn.toString(), args: args}); 202 const response = JSON.parse(await receive(receiver)); 203 if (response.status === 'success') { 204 return response.value; 205 } 206 207 // exception 208 if (response.name === 'TypeError') { 209 throw new TypeError(response.value); 210 } 211 throw new Error(response.value); 212 } 213 214 async send(msg) { 215 return await send(this.context_id, JSON.stringify(msg)); 216 } 217 }; 218 219 class Executor { 220 constructor(uuid) { 221 this.uuid = uuid; 222 223 // If `suspend_callback` is not `null`, the executor should be suspended 224 // when there are no ongoing tasks. 225 this.suspend_callback = null; 226 227 this.execute(); 228 } 229 230 // Wait until there are no ongoing tasks nor fetch requests for polling 231 // tasks, and then suspend the executor and call `callback()`. 232 // Navigation from the executor page should be triggered inside `callback()`, 233 // to avoid conflict with in-flight fetch requests. 234 suspend(callback) { 235 this.suspend_callback = callback; 236 } 237 238 resume() { 239 } 240 241 async execute() { 242 while(true) { 243 if (this.suspend_callback !== null) { 244 this.suspend_callback(); 245 this.suspend_callback = null; 246 // Wait for `resume()` to be called. 247 await new Promise(resolve => this.resume = resolve); 248 249 // Workaround for https://crbug.com/1244230. 250 // Without this workaround, the executor is resumed and the fetch 251 // request to poll the next task is initiated synchronously from 252 // pageshow event after the page restored from BFCache, and the fetch 253 // request promise is never resolved (and thus the test results in 254 // timeout) due to https://crbug.com/1244230. The root cause is not yet 255 // known, but setTimeout() with 0ms causes the resume triggered on 256 // another task and seems to resolve the issue. 257 await new Promise(resolve => setTimeout(resolve, 0)); 258 259 continue; 260 } 261 262 const task = JSON.parse(await receive(this.uuid)); 263 264 let response; 265 try { 266 const value = await eval(task.fn).apply(null, task.args); 267 response = JSON.stringify({ 268 status: 'success', 269 value: value 270 }); 271 } catch(e) { 272 response = JSON.stringify({ 273 status: 'exception', 274 name: e.name, 275 value: e.message 276 }); 277 } 278 await send(task.receiver, response); 279 } 280 } 281 }