worker-utils.js (3742B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at <http://mozilla.org/MPL/2.0/>. */ 4 5 "use strict"; 6 7 class WorkerDispatcher { 8 #msgId = 1; 9 #worker = null; 10 // Map of message ids -> promise resolution functions, for dispatching worker responses 11 #pendingCalls = new Map(); 12 #url = ""; 13 14 constructor(url) { 15 this.#url = url; 16 } 17 18 start() { 19 // When running in debugger jest test, we don't have access to ChromeWorker 20 if (typeof ChromeWorker == "function") { 21 this.#worker = new ChromeWorker(this.#url); 22 } else { 23 this.#worker = new Worker(this.#url); 24 } 25 this.#worker.onerror = err => { 26 console.error(`Error in worker ${this.#url}`, err.message); 27 }; 28 this.#worker.addEventListener("message", this.#onMessage); 29 } 30 31 stop() { 32 if (!this.#worker) { 33 return; 34 } 35 36 this.#worker.removeEventListener("message", this.#onMessage); 37 this.#worker.terminate(); 38 this.#worker = null; 39 this.#pendingCalls.clear(); 40 } 41 42 task(method, { queue = false } = {}) { 43 const calls = []; 44 const push = args => { 45 return new Promise((resolve, reject) => { 46 if (queue && calls.length === 0) { 47 Promise.resolve().then(flush); 48 } 49 50 calls.push({ args, resolve, reject }); 51 52 if (!queue) { 53 flush(); 54 } 55 }); 56 }; 57 58 const flush = () => { 59 const items = calls.slice(); 60 calls.length = 0; 61 62 if (!this.#worker) { 63 this.start(); 64 } 65 66 const id = this.#msgId++; 67 this.#worker.postMessage({ 68 id, 69 method, 70 calls: items.map(item => item.args), 71 }); 72 73 this.#pendingCalls.set(id, items); 74 }; 75 76 return (...args) => push(args); 77 } 78 79 invoke(method, ...args) { 80 return this.task(method)(...args); 81 } 82 83 #onMessage = ({ data: result }) => { 84 const items = this.#pendingCalls.get(result.id); 85 this.#pendingCalls.delete(result.id); 86 if (!items) { 87 return; 88 } 89 90 if (!this.#worker) { 91 return; 92 } 93 94 result.results.forEach((resultData, i) => { 95 const { resolve, reject } = items[i]; 96 97 if (resultData.error) { 98 const err = new Error(resultData.message); 99 err.metadata = resultData.metadata; 100 reject(err); 101 } else { 102 resolve(resultData.response); 103 } 104 }); 105 }; 106 } 107 108 function workerHandler(publicInterface) { 109 return function (msg) { 110 const { id, method, calls } = msg.data; 111 112 Promise.all( 113 calls.map(args => { 114 try { 115 const response = publicInterface[method].apply(undefined, args); 116 if (response instanceof Promise) { 117 return response.then( 118 val => ({ response: val }), 119 err => asErrorMessage(err) 120 ); 121 } 122 return { response }; 123 } catch (error) { 124 return asErrorMessage(error); 125 } 126 }) 127 ).then(results => { 128 globalThis.postMessage({ id, results }); 129 }); 130 }; 131 } 132 133 function asErrorMessage(error) { 134 if (typeof error === "object" && error && "message" in error) { 135 // Error can't be sent via postMessage, so be sure to convert to 136 // string. 137 return { 138 error: true, 139 message: 140 error.message + 141 (error.stack ? "\nStack in the worker:" + error.stack : ""), 142 metadata: error.metadata, 143 }; 144 } 145 146 return { 147 error: true, 148 message: error == null ? error : error.toString(), 149 metadata: undefined, 150 }; 151 } 152 153 // Might be loaded within a worker thread where `module` isn't available. 154 if (typeof module !== "undefined") { 155 module.exports = { 156 WorkerDispatcher, 157 workerHandler, 158 }; 159 }