stream.js (16157B)
1 var expect = require('chai').expect; 2 var util = require('./util'); 3 4 var stream = require('../lib/protocol/stream'); 5 var Stream = stream.Stream; 6 7 function createStream() { 8 var stream = new Stream(util.log, null); 9 stream.upstream._window = Infinity; 10 return stream; 11 } 12 13 // Execute a list of commands and assertions 14 var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise']; 15 function execute_sequence(stream, sequence, done) { 16 if (!done) { 17 done = sequence; 18 sequence = stream; 19 stream = createStream(); 20 } 21 22 var outgoing_frames = []; 23 24 var emit = stream.emit, events = []; 25 stream.emit = function(name) { 26 if (recorded_events.indexOf(name) !== -1) { 27 events.push({ name: name, data: Array.prototype.slice.call(arguments, 1) }); 28 } 29 return emit.apply(this, arguments); 30 }; 31 32 var commands = [], checks = []; 33 sequence.forEach(function(step) { 34 if ('method' in step || 'incoming' in step || 'outgoing' in step || 'wait' in step || 'set_state' in step) { 35 commands.push(step); 36 } 37 38 if ('outgoing' in step || 'event' in step || 'active' in step) { 39 checks.push(step); 40 } 41 }); 42 43 var activeCount = 0; 44 function count_change(change) { 45 activeCount += change; 46 } 47 48 function execute(callback) { 49 var command = commands.shift(); 50 if (command) { 51 if ('method' in command) { 52 var value = stream[command.method.name].apply(stream, command.method.arguments); 53 if (command.method.ret) { 54 command.method.ret(value); 55 } 56 execute(callback); 57 } else if ('incoming' in command) { 58 command.incoming.count_change = count_change; 59 stream.upstream.write(command.incoming); 60 execute(callback); 61 } else if ('outgoing' in command) { 62 outgoing_frames.push(stream.upstream.read()); 63 execute(callback); 64 } else if ('set_state' in command) { 65 stream.state = command.set_state; 66 execute(callback); 67 } else if ('wait' in command) { 68 setTimeout(execute.bind(null, callback), command.wait); 69 } else { 70 throw new Error('Invalid command', command); 71 } 72 } else { 73 setTimeout(callback, 5); 74 } 75 } 76 77 function check() { 78 checks.forEach(function(check) { 79 if ('outgoing' in check) { 80 var frame = outgoing_frames.shift(); 81 for (var key in check.outgoing) { 82 expect(frame).to.have.property(key).that.deep.equals(check.outgoing[key]); 83 } 84 count_change(frame.count_change); 85 } else if ('event' in check) { 86 var event = events.shift(); 87 expect(event.name).to.be.equal(check.event.name); 88 check.event.data.forEach(function(data, index) { 89 expect(event.data[index]).to.deep.equal(data); 90 }); 91 } else if ('active' in check) { 92 expect(activeCount).to.be.equal(check.active); 93 } else { 94 throw new Error('Invalid check', check); 95 } 96 }); 97 done(); 98 } 99 100 setImmediate(execute.bind(null, check)); 101 } 102 103 var example_frames = [ 104 { type: 'PRIORITY', flags: {}, priority: 1 }, 105 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, 106 { type: 'RST_STREAM', flags: {}, error: 'CANCEL' }, 107 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 108 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 109 { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log, null) } 110 ]; 111 112 var invalid_incoming_frames = { 113 IDLE: [ 114 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 115 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, 116 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, 117 { type: 'RST_STREAM', flags: {}, error: 'CANCEL' } 118 ], 119 RESERVED_LOCAL: [ 120 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 121 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 122 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, 123 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } 124 ], 125 RESERVED_REMOTE: [ 126 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 127 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, 128 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } 129 ], 130 OPEN: [ 131 ], 132 HALF_CLOSED_LOCAL: [ 133 ], 134 HALF_CLOSED_REMOTE: [ 135 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 136 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 137 { type: 'PUSH_PROMISE', flags: {}, headers: {} } 138 ] 139 }; 140 141 var invalid_outgoing_frames = { 142 IDLE: [ 143 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 144 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, 145 { type: 'PUSH_PROMISE', flags: {}, headers: {} } 146 ], 147 RESERVED_LOCAL: [ 148 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 149 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, 150 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } 151 ], 152 RESERVED_REMOTE: [ 153 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 154 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 155 { type: 'PUSH_PROMISE', flags: {}, headers: {} }, 156 { type: 'WINDOW_UPDATE', flags: {}, settings: {} } 157 ], 158 OPEN: [ 159 ], 160 HALF_CLOSED_LOCAL: [ 161 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 162 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 163 { type: 'PUSH_PROMISE', flags: {}, headers: {} } 164 ], 165 HALF_CLOSED_REMOTE: [ 166 ], 167 CLOSED: [ 168 { type: 'WINDOW_UPDATE', flags: {}, settings: {} }, 169 { type: 'HEADERS', flags: {}, headers: {}, priority: undefined }, 170 { type: 'DATA', flags: {}, data: Buffer.alloc(5) }, 171 { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log, null) } 172 ] 173 }; 174 175 describe('stream.js', function() { 176 describe('Stream class', function() { 177 describe('._transition(sending, frame) method', function() { 178 it('should emit error, and answer RST_STREAM for invalid incoming frames', function() { 179 Object.keys(invalid_incoming_frames).forEach(function(state) { 180 invalid_incoming_frames[state].forEach(function(invalid_frame) { 181 var stream = createStream(); 182 var connectionErrorHappened = false; 183 stream.state = state; 184 stream.once('connectionError', function() { connectionErrorHappened = true; }); 185 stream._transition(false, invalid_frame); 186 expect(connectionErrorHappened); 187 }); 188 }); 189 190 // CLOSED state as a result of incoming END_STREAM (or RST_STREAM) 191 var stream = createStream(); 192 stream.headers({}); 193 stream.end(); 194 stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop }); 195 example_frames.slice(2).forEach(function(invalid_frame) { 196 invalid_frame.count_change = util.noop; 197 expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.'); 198 }); 199 200 // CLOSED state as a result of outgoing END_STREAM 201 stream = createStream(); 202 stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop }); 203 stream.headers({}); 204 stream.end(); 205 example_frames.slice(3).forEach(function(invalid_frame) { 206 invalid_frame.count_change = util.noop; 207 expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.'); 208 }); 209 }); 210 it('should throw exception for invalid outgoing frames', function() { 211 Object.keys(invalid_outgoing_frames).forEach(function(state) { 212 invalid_outgoing_frames[state].forEach(function(invalid_frame) { 213 var stream = createStream(); 214 stream.state = state; 215 expect(stream._transition.bind(stream, true, invalid_frame)).to.throw(Error); 216 }); 217 }); 218 }); 219 it('should close the stream when there\'s an incoming or outgoing RST_STREAM', function() { 220 [ 221 'RESERVED_LOCAL', 222 'RESERVED_REMOTE', 223 'OPEN', 224 'HALF_CLOSED_LOCAL', 225 'HALF_CLOSED_REMOTE' 226 ].forEach(function(state) { 227 [true, false].forEach(function(sending) { 228 var stream = createStream(); 229 stream.state = state; 230 stream._transition(sending, { type: 'RST_STREAM', flags: {} }); 231 expect(stream.state).to.be.equal('CLOSED'); 232 }); 233 }); 234 }); 235 it('should ignore any incoming frame after sending reset', function() { 236 var stream = createStream(); 237 stream.reset(); 238 example_frames.forEach(stream._transition.bind(stream, false)); 239 }); 240 it('should ignore certain incoming frames after closing the stream with END_STREAM', function() { 241 var stream = createStream(); 242 stream.upstream.write({ type: 'HEADERS', flags: { END_STREAM: true }, headers:{} }); 243 stream.headers({}); 244 stream.end(); 245 example_frames.slice(0,3).forEach(function(frame) { 246 frame.count_change = util.noop; 247 stream._transition(false, frame); 248 }); 249 }); 250 }); 251 }); 252 describe('test scenario', function() { 253 describe('sending request', function() { 254 it('should trigger the appropriate state transitions and outgoing frames', function(done) { 255 execute_sequence([ 256 { method : { name: 'headers', arguments: [{ ':path': '/' }] } }, 257 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } }, 258 { event : { name: 'state', data: ['OPEN'] } }, 259 260 { wait : 5 }, 261 { method : { name: 'end', arguments: [] } }, 262 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, 263 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(0) } }, 264 265 { wait : 10 }, 266 { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, 267 { incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: Buffer.alloc(5) } }, 268 { event : { name: 'headers', data: [{ ':status': 200 }] } }, 269 { event : { name: 'state', data: ['CLOSED'] } }, 270 271 { active : 0 } 272 ], done); 273 }); 274 }); 275 describe('answering request', function() { 276 it('should trigger the appropriate state transitions and outgoing frames', function(done) { 277 var payload = Buffer.alloc(5); 278 execute_sequence([ 279 { incoming: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } }, 280 { event : { name: 'state', data: ['OPEN'] } }, 281 { event : { name: 'headers', data: [{ ':path': '/' }] } }, 282 283 { wait : 5 }, 284 { incoming: { type: 'DATA', flags: { }, data: Buffer.alloc(5) } }, 285 { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(10) } }, 286 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, 287 288 { wait : 5 }, 289 { method : { name: 'headers', arguments: [{ ':status': 200 }] } }, 290 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, 291 292 { wait : 5 }, 293 { method : { name: 'end', arguments: [payload] } }, 294 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, 295 { event : { name: 'state', data: ['CLOSED'] } }, 296 297 { active : 0 } 298 ], done); 299 }); 300 }); 301 describe('sending push stream', function() { 302 it('should trigger the appropriate state transitions and outgoing frames', function(done) { 303 var payload = Buffer.alloc(5); 304 var pushStream; 305 306 execute_sequence([ 307 // receiving request 308 { incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } }, 309 { event : { name: 'state', data: ['OPEN'] } }, 310 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, 311 { event : { name: 'headers', data: [{ ':path': '/' }] } }, 312 313 // sending response headers 314 { wait : 5 }, 315 { method : { name: 'headers', arguments: [{ ':status': '200' }] } }, 316 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } }, 317 318 // sending push promise 319 { method : { name: 'promise', arguments: [{ ':path': '/' }], ret: function(str) { pushStream = str; } } }, 320 { outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' } } }, 321 322 // sending response data 323 { method : { name: 'end', arguments: [payload] } }, 324 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, 325 { event : { name: 'state', data: ['CLOSED'] } }, 326 327 { active : 0 } 328 ], function() { 329 // initial state of the promised stream 330 expect(pushStream.state).to.equal('RESERVED_LOCAL'); 331 332 execute_sequence(pushStream, [ 333 // push headers 334 { wait : 5 }, 335 { method : { name: 'headers', arguments: [{ ':status': '200' }] } }, 336 { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } }, 337 { event : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } }, 338 339 // push data 340 { method : { name: 'end', arguments: [payload] } }, 341 { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, 342 { event : { name: 'state', data: ['CLOSED'] } }, 343 344 { active : 1 } 345 ], done); 346 }); 347 }); 348 }); 349 describe('receiving push stream', function() { 350 it('should trigger the appropriate state transitions and outgoing frames', function(done) { 351 var payload = Buffer.alloc(5); 352 var original_stream = createStream(); 353 var promised_stream = createStream(); 354 355 done = util.callNTimes(2, done); 356 357 execute_sequence(original_stream, [ 358 // sending request headers 359 { method : { name: 'headers', arguments: [{ ':path': '/' }] } }, 360 { method : { name: 'end', arguments: [] } }, 361 { outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } }, 362 { event : { name: 'state', data: ['OPEN'] } }, 363 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, 364 365 // receiving response headers 366 { wait : 10 }, 367 { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } }, 368 { event : { name: 'headers', data: [{ ':status': 200 }] } }, 369 370 // receiving push promise 371 { incoming: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/2.html' }, promised_stream: promised_stream } }, 372 { event : { name: 'promise', data: [promised_stream, { ':path': '/2.html' }] } }, 373 374 // receiving response data 375 { incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: payload } }, 376 { event : { name: 'state', data: ['CLOSED'] } }, 377 378 { active : 0 } 379 ], done); 380 381 execute_sequence(promised_stream, [ 382 // initial state of the promised stream 383 { event : { name: 'state', data: ['RESERVED_REMOTE'] } }, 384 385 // push headers 386 { wait : 10 }, 387 { incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } }, 388 { event : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } }, 389 { event : { name: 'headers', data: [{ ':status': 200 }] } }, 390 391 // push data 392 { incoming: { type: 'DATA', flags: { END_STREAM: true }, data: payload } }, 393 { event : { name: 'state', data: ['CLOSED'] } }, 394 395 { active : 0 } 396 ], done); 397 }); 398 }); 399 }); 400 401 describe('bunyan formatter', function() { 402 describe('`s`', function() { 403 var format = stream.serializers.s; 404 it('should assign a unique ID to each frame', function() { 405 var stream1 = createStream(); 406 var stream2 = createStream(); 407 expect(format(stream1)).to.be.equal(format(stream1)); 408 expect(format(stream2)).to.be.equal(format(stream2)); 409 expect(format(stream1)).to.not.be.equal(format(stream2)); 410 }); 411 }); 412 }); 413 });