flow.js (9314B)
// Mocha/Chai test suite for the Flow class exported by ../lib/protocol/flow.
var expect = require('chai').expect;
var util = require('./util');

var Flow = require('../lib/protocol/flow').Flow;

// Upper bound asserted on the length of every DATA payload seen by a receiver.
var MAX_PAYLOAD_SIZE = 16384;

// Message thrown by EventEmitter when 'error' is emitted without a listener.
// The wording differs between Node.js releases, so every known variant is
// accepted instead of pinning a single version's text.
var UNHANDLED_ERROR_MESSAGE = /Uncaught, unspecified "error" event\.|Unhandled ("error" event|error)\./;

/**
 * Creates a Flow instance to test against.
 *
 * The flow control id is taken from `util.random(10, 100)` and the flow's
 * `_log` is replaced with a child of the shared test logger.
 *
 * @param {Object} [log] - bindings passed to `util.log.child`; defaults to `{}`.
 * @returns {Flow} the newly created flow.
 */
function createFlow(log) {
  var flowControlId = util.random(10, 100);
  var flow = new Flow(flowControlId);
  flow._log = util.log.child(log || {});
  return flow;
}

describe('flow.js', function() {
  describe('Flow class', function() {
    var flow;
    beforeEach(function() {
      flow = createFlow();
    });

    describe('._receive(frame, callback) method', function() {
      it('is called when there\'s a frame in the input buffer to be consumed', function(done) {
        var frame = { type: 'PRIORITY', flags: {}, priority: 1 };
        flow._receive = function _receive(receivedFrame, callback) {
          expect(receivedFrame).to.equal(frame);
          callback();
        };
        flow.write(frame, done);
      });
      it('has to be overridden by the child class, otherwise it throws', function() {
        expect(flow._receive.bind(flow)).to.throw(Error);
      });
    });

    describe('._send() method', function() {
      it('is called when the output buffer should be filled with more frames and the flow ' +
         'control queue is empty', function() {
        var notFlowControlledFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
        flow._send = function _send() {
          this.push(notFlowControlledFrame);
        };
        expect(flow.read()).to.equal(notFlowControlledFrame);

        flow._window = 0;
        flow._queue.push({ type: 'DATA', flags: {}, data: { length: 1 } });

        // Skip over the frames that are not flow controlled. The null guard
        // makes an exhausted stream fail on the assertion below instead of
        // raising a TypeError inside the loop condition.
        var frame = flow.read();
        while (frame !== null && frame.type === notFlowControlledFrame.type) {
          frame = flow.read();
        }
        expect(frame).to.not.equal(null);
        expect(frame.type).to.equal('BLOCKED');
        expect(flow.read()).to.equal(null);
      });
      it('has to be overridden by the child class, otherwise it throws', function() {
        expect(flow._send.bind(flow)).to.throw(Error);
      });
    });

    describe('._increaseWindow(size) method', function() {
      it('should increase `this._window` by `size`', function() {
        flow._send = util.noop;
        flow._window = 0;

        var increase1 = util.random(0, 100);
        var increase2 = util.random(0, 100);
        flow._increaseWindow(increase1);
        flow._increaseWindow(increase2);
        expect(flow._window).to.equal(increase1 + increase2);

        flow._increaseWindow(Infinity);
        expect(flow._window).to.equal(Infinity);
      });
      it('should emit error when increasing with a finite `size` when `_window` is infinite', function() {
        flow._send = util.noop;
        flow._increaseWindow(Infinity);
        var increase = util.random(1, 100);

        // No 'error' listener is attached, so the emitted error is thrown.
        expect(flow._increaseWindow.bind(flow, increase)).to.throw(UNHANDLED_ERROR_MESSAGE);
      });
      it('should emit error when `_window` grows over the window limit', function() {
        var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
        flow._send = util.noop;
        flow._window = 0;

        flow._increaseWindow(WINDOW_SIZE_LIMIT);
        // No 'error' listener is attached, so the emitted error is thrown.
        expect(flow._increaseWindow.bind(flow, 1)).to.throw(UNHANDLED_ERROR_MESSAGE);
      });
    });

    describe('.read() method', function() {
      describe('when the flow control queue is not empty', function() {
        it('should return the first item in the queue if the window is enough', function() {
          var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
          var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
          flow._send = util.noop;
          flow._window = 10;
          flow._queue = [priorityFrame, dataFrame];

          expect(flow.read()).to.equal(priorityFrame);
          expect(flow.read()).to.equal(dataFrame);
        });
        it('should also split DATA frames when needed', function() {
          var buffer = Buffer.alloc(10);
          var dataFrame = { type: 'DATA', flags: {}, stream: util.random(0, 100), data: buffer };
          flow._send = util.noop;
          flow._window = 5;
          flow._queue = [dataFrame];

          // The window only allows the first 5 bytes out; the remainder is
          // expected to stay on the queued frame.
          var expectedFragment = { flags: {}, type: 'DATA', stream: dataFrame.stream, data: buffer.slice(0, 5) };
          expect(flow.read()).to.deep.equal(expectedFragment);
          expect(dataFrame.data).to.deep.equal(buffer.slice(5));
        });
      });
    });

    describe('.push(frame) method', function() {
      it('should push `frame` into the output queue or the flow control queue', function() {
        var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
        var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
        flow._window = 10;

        flow.push(dataFrame);     // output queue
        flow.push(dataFrame);     // flow control queue, because of depleted window
        flow.push(priorityFrame); // flow control queue, because it's not empty

        expect(flow.read()).to.be.equal(dataFrame);
        expect(flow._queue[0]).to.be.equal(dataFrame);
        expect(flow._queue[1]).to.be.equal(priorityFrame);
      });
    });

    describe('.write() method', function() {
      it('call with a DATA frame should trigger sending WINDOW_UPDATE if remote flow control is not ' +
         'disabled', function(done) {
        flow._window = 100;
        flow._send = util.noop;
        flow._receive = function(frame, callback) {
          callback();
        };

        var buffer = Buffer.alloc(util.random(10, 100));
        flow.write({ type: 'DATA', flags: {}, data: buffer });
        flow.once('readable', function() {
          expect(flow.read()).to.be.deep.equal({
            type: 'WINDOW_UPDATE',
            flags: {},
            stream: flow._flowControlId,
            window_size: buffer.length
          });
          done();
        });
      });
    });
  });

  describe('test scenario', function() {
    var flow1, flow2;
    beforeEach(function() {
      flow1 = createFlow({ flow: 1 });
      flow2 = createFlow({ flow: 2 });
      flow1._flowControlId = flow2._flowControlId;
      flow1._send = flow2._send = util.noop;
      flow1._receive = flow2._receive = function(frame, callback) { callback(); };
    });

    /**
     * Installs a `_send` implementation on `sender` that pushes DATA frames
     * carrying buffers of random size, recording each buffer in `input`.
     * Once `input` holds the randomly chosen number of buffers, it pushes an
     * empty DATA frame flagged END_STREAM followed by `null`.
     *
     * @param {Flow} sender - flow whose `_send` is replaced.
     * @param {Buffer[]} input - array that collects every buffer sent.
     */
    function installRandomDataSender(sender, input) {
      var frameNumber = util.random(5, 8);
      sender._send = function _send() {
        if (input.length >= frameNumber) {
          this.push({ type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(0) });
          this.push(null);
        } else {
          var buffer = Buffer.allocUnsafe(util.random(1000, 100000));
          input.push(buffer);
          this.push({ type: 'DATA', flags: {}, data: buffer });
        }
      };
    }

    /**
     * Records a received frame: DATA payloads are size-checked against
     * MAX_PAYLOAD_SIZE and appended to `output`; a frame flagged END_STREAM
     * makes `receiver` emit 'end_stream'.
     *
     * @param {Flow} receiver - flow that received the frame.
     * @param {Object} frame - the received frame.
     * @param {Buffer[]} output - array that collects every DATA payload.
     */
    function recordFrame(receiver, frame, output) {
      if (frame.type === 'DATA') {
        expect(frame.data.length).to.be.lte(MAX_PAYLOAD_SIZE);
        output.push(frame.data);
      }
      if (frame.flags.END_STREAM) {
        receiver.emit('end_stream');
      }
    }

    /**
     * When `receiver` emits 'end_stream', asserts that the concatenated sent
     * buffers deep-equal the concatenated received payloads, then calls `done`.
     *
     * @param {Flow} receiver - flow that emits 'end_stream'.
     * @param {Buffer[]} input - buffers recorded by the sender.
     * @param {Buffer[]} output - payloads recorded by the receiver.
     * @param {Function} done - Mocha completion callback.
     */
    function verifyOnEndStream(receiver, input, output, done) {
      receiver.on('end_stream', function() {
        var sent = util.concat(input);
        var received = util.concat(output);

        expect(sent).to.deep.equal(received);

        done();
      });
    }

    describe('sending a large data stream', function() {
      it('should work as expected', function(done) {
        // Sender side
        var input = [];
        installRandomDataSender(flow1, input);

        // Receiver side
        var output = [];
        flow2._receive = function _receive(frame, callback) {
          recordFrame(this, frame, output);
          callback();
        };

        // Checking results
        verifyOnEndStream(flow2, input, output, done);

        // Start piping
        flow1.pipe(flow2).pipe(flow1);
      });
    });

    describe('when running out of window', function() {
      it('should send a BLOCKED frame', function(done) {
        // Sender side
        var input = [];
        installRandomDataSender(flow1, input);

        // Receiver side
        // Do not send WINDOW_UPDATEs except when the other side sends BLOCKED
        var output = [];
        flow2._restoreWindow = util.noop;
        flow2._receive = function _receive(frame, callback) {
          recordFrame(this, frame, output);
          if (frame.type === 'BLOCKED') {
            // Reply after a short delay with a WINDOW_UPDATE sized by
            // `this._received`, then reset that counter.
            setTimeout(function() {
              this._push({
                type: 'WINDOW_UPDATE',
                flags: {},
                stream: this._flowControlId,
                window_size: this._received
              });
              this._received = 0;
            }.bind(this), 20);
          }
          callback();
        };

        // Checking results
        verifyOnEndStream(flow2, input, output, done);

        // Start piping
        flow1.pipe(flow2).pipe(flow1);
      });
    });
  });
});