rs-utils.js (5140B)
'use strict';
(function () {
  // Object the helpers are published on. `self` is used whenever it exists so
  // existing environments behave exactly as before; `globalThis` is the
  // fallback for environments that do not define `self`.
  const globalScope = typeof self !== 'undefined' ? self : globalThis;

  /**
   * Fake setInterval-like functionality for environments that don't have it.
   *
   * Repeatedly waits `delayMs` via step_timeout() and then invokes `callback`
   * until cancel() is called.
   * NOTE(review): step_timeout is not defined in this file; presumably it is
   * supplied by the surrounding test harness -- confirm.
   */
  class IntervalHandle {
    /**
     * @param {Function} callback - invoked with no arguments on every tick.
     * @param {number} delayMs - delay handed to step_timeout() for each tick.
     */
    constructor(callback, delayMs) {
      this.callback = callback;
      this.delayMs = delayMs;
      this.cancelled = false;
      // Kick the loop off from a microtask so the constructor returns first.
      Promise.resolve().then(() => this.check());
    }

    /** Tick loop: wait, stop if cancelled meanwhile, otherwise run the callback. */
    async check() {
      while (true) {
        await new Promise(resolve => step_timeout(resolve, this.delayMs));
        if (this.cancelled) {
          return;
        }
        this.callback();
      }
    }

    /** Stops the loop; takes effect the next time the pending wait finishes. */
    cancel() {
      this.cancelled = true;
    }
  }

  // Use the platform timers when both exist, otherwise the fake ones above.
  let localSetInterval;
  let localClearInterval;
  if (typeof globalThis.setInterval !== "undefined" &&
      typeof globalThis.clearInterval !== "undefined") {
    localSetInterval = globalThis.setInterval;
    localClearInterval = globalThis.clearInterval;
  } else {
    localSetInterval = function setInterval(callback, delayMs) {
      return new IntervalHandle(callback, delayMs);
    };
    localClearInterval = function clearInterval(handle) {
      // Like the native clearInterval(), ignore anything that is not a handle
      // we created (e.g. the empty handle left behind once a source ended)
      // instead of throwing.
      if (handle instanceof IntervalHandle) {
        handle.cancel();
      }
    };
  }

  /**
   * Push source that emits random 128-character string chunks on a 2 ms
   * interval through `ondata(chunk)` and signals completion through `onend()`.
   * Both callbacks are expected to be assigned on the instance by the user
   * before readStart() is called.
   */
  class RandomPushSource {
    /**
     * @param {number} [toPush] - number of chunks to emit before ending. When
     *     it is not greater than 0 (including when omitted) the source never
     *     ends on its own.
     */
    constructor(toPush) {
      this.pushed = 0;
      this.toPush = toPush;
      this.started = false;
      this.paused = false;
      this.closed = false;

      this._intervalHandle = null;
    }

    /** Starts (or resumes) emitting chunks. No-op once the source has ended. */
    readStart() {
      if (this.closed) {
        return;
      }

      const writeChunk = () => {
        if (this.paused) {
          return;
        }

        this.pushed++;

        if (this.toPush > 0 && this.pushed > this.toPush) {
          // Quota exhausted: stop the timer and report the end exactly once.
          if (this._intervalHandle) {
            localClearInterval(this._intervalHandle);
            this._intervalHandle = null;
          }
          this.closed = true;
          this.onend();
        } else {
          this.ondata(randomChunk(128));
        }
      };

      // A single branch covers both "first start" and "resume after pause",
      // so at most one timer is ever created per call.
      if (!this.started || this.paused) {
        this._intervalHandle = localSetInterval(writeChunk, 2);
        this.started = true;
        this.paused = false;
      }
    }

    /**
     * Pauses emission until the next readStart().
     * @throws {Error} if the source was never started.
     */
    readStop() {
      if (this.paused) {
        return;
      }

      if (!this.started) {
        throw new Error('Can\'t pause reading an unstarted source.');
      }

      this.paused = true;
      localClearInterval(this._intervalHandle);
      this._intervalHandle = null;
    }
  }

  /**
   * Builds a string of `size` random characters.
   * @param {number} size - number of characters to generate.
   * @returns {string}
   */
  function randomChunk(size) {
    // Basic printable ASCII runs from 32 (space) to 126 (~): 95 code points.
    const firstPrintable = 32;
    const printableCount = 95;
    let chunk = '';

    for (let i = 0; i < size; ++i) {
      // Math.floor keeps every code point equally likely and inside the range.
      chunk += String.fromCharCode(
          Math.floor(Math.random() * printableCount) + firstPrintable);
    }

    return chunk;
  }

  /**
   * Reads a stream to completion and collects every chunk.
   * @param {ReadableStream} readable - stream to drain; only used to obtain a
   *     reader when `reader` is not supplied.
   * @param {object} [reader] - reader to use instead of readable.getReader().
   * @returns {Promise<Array>} fulfils with the chunks in arrival order and
   *     rejects if any read() rejects.
   */
  function readableStreamToArray(readable, reader) {
    if (reader === undefined) {
      reader = readable.getReader();
    }

    const chunks = [];

    return pump();

    // Reads one chunk, then chains itself until the reader reports done.
    function pump() {
      return reader.read().then(result => {
        if (result.done) {
          return chunks;
        }

        chunks.push(result.value);
        return pump();
      });
    }
  }

  /**
   * Callback-style pull source that yields the integers 1..limit in order.
   * Callbacks run synchronously unless `options.async` is truthy, in which
   * case each one is deferred through step_timeout(f, 0).
   */
  class SequentialPullSource {
    /**
     * @param {number} limit - last value to yield.
     * @param {{async?: boolean}} [options]
     */
    constructor(limit, options) {
      const async = options && options.async;

      this.current = 0;
      this.limit = limit;
      this.opened = false;
      this.closed = false;

      this._exec = async ? f => step_timeout(f, 0) : f => f();
    }

    /** Marks the source opened, then calls cb(). */
    open(cb) {
      this._exec(() => {
        this.opened = true;
        cb();
      });
    }

    /**
     * Produces the next value: cb(null, false, value) while values remain,
     * cb(null, true, null) once the limit has been passed.
     */
    read(cb) {
      this._exec(() => {
        if (++this.current <= this.limit) {
          cb(null, false, this.current);
        } else {
          cb(null, true, null);
        }
      });
    }

    /** Marks the source closed, then calls cb(). */
    close(cb) {
      this._exec(() => {
        this.closed = true;
        cb();
      });
    }
  }

  /**
   * Wraps a SequentialPullSource in a ReadableStream. The source is exposed
   * as `stream.source`.
   * @param {number} limit - forwarded to SequentialPullSource.
   * @param {{async?: boolean}} [options] - forwarded to SequentialPullSource.
   * @returns {ReadableStream}
   */
  function sequentialReadableStream(limit, options) {
    const sequentialSource = new SequentialPullSource(limit, options);

    const stream = new ReadableStream({
      start() {
        return new Promise((resolve, reject) => {
          sequentialSource.open(err => {
            if (err) {
              reject(err);
            } else {
              resolve();
            }
          });
        });
      },

      pull(c) {
        return new Promise((resolve, reject) => {
          sequentialSource.read((err, done, chunk) => {
            if (err) {
              reject(err);
            } else if (done) {
              sequentialSource.close(err2 => {
                if (err2) {
                  // Do not close the controller on failure: rejecting alone
                  // lets the stream surface the error to its readers.
                  reject(err2);
                  return;
                }
                c.close();
                resolve();
              });
            } else {
              c.enqueue(chunk);
              resolve();
            }
          });
        });
      }
    });

    stream.source = sequentialSource;

    return stream;
  }

  /**
   * Clones `view` while transferring its underlying buffer, which leaves the
   * original view detached.
   * @param {ArrayBufferView} view
   * @returns {ArrayBufferView} the clone that now owns the buffer.
   */
  function transferArrayBufferView(view) {
    return structuredClone(view, { transfer: [view.buffer] });
  }

  globalScope.RandomPushSource = RandomPushSource;
  globalScope.readableStreamToArray = readableStreamToArray;
  globalScope.sequentialReadableStream = sequentialReadableStream;
  globalScope.transferArrayBufferView = transferArrayBufferView;

}());