RTCDataChannel-worker-shim.js (7270B)
1 // This can be used to transfer an RTCDataChannel to a worker, and expose an 2 // interface that acts as a passthrough to the channel on the worker. There are 3 // some caveats though: 4 // * certain kinds of error won't propagate back to the callsite, and will 5 // manifest as an unhandled error (eg; webidl errors on attribute setters) 6 // * the event handler/GC interactions won't translate exactly (because the 7 // worker code registers all handler types up front, instead of waiting for 8 // registrations to happen on the wrapper) 9 // * RTCDataChannel.label must be unique on the worker 10 class WorkerBackedDataChannel extends EventTarget { 11 #worker; 12 #dcAttrs; 13 #eventHandlers; 14 #errorPromise; 15 #label; 16 17 // If you want to make multiple of these with the same worker, create first 18 // with no args, and the others with first.worker 19 constructor(worker = null) { 20 super(); 21 this.#worker = worker || WorkerBackedDataChannel.makeWorker(); 22 23 // Cache of the RTTCDataChannel's state attributes, filled on init. Some are 24 // updated by state updates later. 25 this.#dcAttrs = null; 26 27 // For tracking the onxxxx-style event callbacks 28 // TODO: Maybe there's a simpler way to do this? 29 this.#eventHandlers = new Map(); 30 31 this.#listenForEventMessages(); 32 33 // Ejection seat that we put in our promises, for cases where we've misused 34 // the worker (or its code), or encountered some sort of unhandled error 35 this.#errorPromise = new Promise((_, reject) => { 36 // the Worker 'error' and 'messageerror' events 37 const onErrorEvent = (e) => { 38 switch (e.type) { 39 case 'error': 40 case 'messageerror': 41 reject(new Error(`Worker sent ${e.type} event: ${e.message}`)); 42 break; 43 } 44 }; 45 this.#worker.addEventListener('error', onErrorEvent); 46 this.#worker.addEventListener('messageerror', onErrorEvent); 47 48 // Unhandled exceptions thrown by *our* worker code; not Worker error 49 // events (those are handled above), and not errors thrown by 50 // RTCDataChannel (those are handled in #sendRequestToWorker) 51 this.#worker.addEventListener('message', ({data}) => { 52 const {type, label, result} = data; 53 if (type == 'workerError' && 54 (label === undefined || label == this.#label)) { 55 reject(new Error( 56 `Worker code sent error message: ${result}`)); 57 } 58 }); 59 }); 60 } 61 62 async init(channel) { 63 this.#label = channel.label; 64 65 // DO NOT GO ASYNC BEFORE THIS! Doing so will render the channel 66 // untransferable. 67 const initPromise = this.#sendRequestToWorker('init', channel, [channel]); 68 this.#dcAttrs = await Promise.race([initPromise, this.#errorPromise]); 69 return this.#dcAttrs; 70 } 71 72 static makeWorker() { 73 return new Worker('/webrtc/RTCDataChannel-worker.js'); 74 } 75 76 // Make it easy to put more channels on this worker 77 get worker() { return this.#worker; } 78 79 // Read-only attributes 80 get label() { return this.#dcAttrs.label; } 81 get ordered() { return this.#dcAttrs.ordered; } 82 get maxPacketLifeTime() { return this.#dcAttrs.maxPacketLifeTime; } 83 get maxRetransmits() { return this.#dcAttrs.maxRetransmits; } 84 get protocol() { return this.#dcAttrs.protocol; } 85 get negotiated() { return this.#dcAttrs.negotiated; } 86 get id() { return this.#dcAttrs.id; } 87 get readyState() { return this.#dcAttrs.readyState; } 88 get bufferedAmount() { return this.#dcAttrs.bufferedAmount; } 89 90 // Writable attributes 91 set bufferedAmountLowThreshold(val) { 92 this.#dcAttrs.bufferedAmountLowThreshold = val; 93 this.#sendRequestToWorker('setBufferedAmountLowThreshold', val); 94 } 95 get bufferedAmountLowThreshold() { 96 return this.#dcAttrs.bufferedAmountLowThreshold; 97 } 98 99 set binaryType(val) { 100 this.#dcAttrs.binaryType = val; 101 this.#sendRequestToWorker('setBinaryType', val); 102 } 103 get binaryType() { 104 return this.#dcAttrs.binaryType; 105 } 106 107 // Note: these do not try to match the way the handler is registered on the 108 // other end (eg; dc.onopen = handler is performed on the worker as an 109 // addEventListener call, not as workerDc.onopen = func). This means that 110 // this wrapper is not suitable for testing GC logic based on event handlers. 111 set onopen(fn) { this.#setEventHandler('open', fn); } 112 set onbufferedamountlow(fn) { this.#setEventHandler('bufferedamountlow', fn); } 113 set onerror(fn) { this.#setEventHandler('error', fn); } 114 set onclosing(fn) { this.#setEventHandler('closing', fn); } 115 set onclose(fn) { this.#setEventHandler('close', fn); } 116 set onmessage(fn) { this.#setEventHandler('message', fn); } 117 118 async send(data) { 119 return this.#sendRequestToWorker('send', data); 120 } 121 122 async close() { 123 return this.#sendRequestToWorker('close'); 124 } 125 126 // Used to refresh readyState, bufferedAmount, and id 127 async updateState() { 128 const resp = await Promise.race([this.#sendRequestToWorker('queryState'), this.#errorPromise]); 129 this.#dcAttrs.readyState = resp.readyState; 130 this.#dcAttrs.bufferedAmount = resp.bufferedAmount; 131 this.#dcAttrs.id = resp.id; 132 return resp; 133 } 134 135 #setEventHandler(type, handler) { 136 // Listener might not exist, removeEventListener doesn't care 137 this.removeEventListener(type, this.#eventHandlers.get(type)); 138 this.#eventHandlers.delete(type); 139 if (handler) { 140 this.addEventListener(type, handler); 141 this.#eventHandlers.set(type, handler); 142 } 143 } 144 145 #listenForEventMessages() { 146 this.#worker.addEventListener('message', ({data}) => { 147 const { type, label, result } = data; 148 149 const eventTypes = 150 ['open', 'bufferedamountlow', 'error', 'closing', 'close', 'message']; 151 152 if (label == this.#label && eventTypes.includes(type)) { 153 let e; 154 if (type == 'message') { 155 const {data, origin} = result; 156 e = new MessageEvent(type, {data, origin}); 157 } else { 158 e = new Event(type); 159 } 160 this.dispatchEvent(e); 161 } 162 }); 163 } 164 165 166 #sendRequestToWorker(type, arg, transferOrOptions) { 167 if (!this.#label) { 168 throw new Error('RTCDataChannel worker shim not initialized!'); 169 } 170 171 return new Promise((resolve, reject) => { 172 // We currently assume that if multiple requests of the same type are 173 // sent, they get responses in the same order. That probably won't 174 // change, but if it does we'll need a transaction id. 175 const msg = { type, label: this.#label, arg }; 176 const responseType = `${type}Response` 177 178 const onResponse = ({data}) => { 179 const {type, label, result} = data; 180 if (type == responseType && label == this.#label) { 181 this.#worker.removeEventListener('message', onResponse); 182 if (result?.error) { 183 // Error thrown by RTCDataChannel, other error cases are handled by 184 // the code in this.#errorPromise 185 // TODO: Maybe re-synthesize the specific error thrown by the 186 // RTCDataChannel? 187 reject(new Error(`RTCDataChannel error: ${result.error.message}`)); 188 } else { 189 resolve(result); 190 } 191 } 192 }; 193 194 this.#worker.addEventListener('message', onResponse); 195 this.#worker.postMessage(msg, transferOrOptions); 196 }); 197 } 198 }