endpoint.js (9633B)
var assert = require('assert');

var Serializer = require('./framer').Serializer;
var Deserializer = require('./framer').Deserializer;
var Compressor = require('./compressor').Compressor;
var Decompressor = require('./compressor').Decompressor;
var Connection = require('./connection').Connection;
var Duplex = require('stream').Duplex;
var Transform = require('stream').Transform;

exports.Endpoint = Endpoint;

// The Endpoint class
// ==================

// Public API
// ----------

// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
//
//   - `log`: bunyan logger of the parent
//   - `role`: 'CLIENT' or 'SERVER'
//   - `settings`: initial HTTP/2 settings
//   - `filters`: a map of functions that filter the traffic between components (for debugging or
//     intentional failure injection).
//
//     Filter functions get three arguments:
//     1. `frame`: the current frame
//     2. `forward(frame)`: function that can be used to forward a frame to the next component
//     3. `done()`: callback to signal the end of the filter process
//
//     Valid filter names and their position in the stack:
//     - `beforeSerialization`: after compression, before serialization
//     - `beforeCompression`: after multiplexing, before compression
//     - `afterDeserialization`: after deserialization, before decompression
//     - `afterDecompression`: after decompression, before multiplexing
//
// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
//
// * **Event: 'peerError'**: 'peerError' event forwarded from the underlying Connection
//
// * **Event: 'error' (type)**: signals an error
//
// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
//
// * **close([error])**: close the connection with an error code

// Constructor
// -----------

// The process of initialization:
function Endpoint(log, role, settings, filters) {
  Duplex.call(this);

  // * Initializing logging infrastructure
  this._log = log.child({ component: 'endpoint', e: this });

  // * First part of the handshake process: sending and receiving the client connection header
  //   prelude. A client writes the prelude, a server expects to read it.
  assert((role === 'CLIENT') || role === 'SERVER');
  if (role === 'CLIENT') {
    this._writePrelude();
  } else {
    this._readPrelude();
  }

  // * Initialization of component. This includes the second part of the handshake process:
  //   sending the first SETTINGS frame. This is done by the connection class right after
  //   initialization.
  this._initializeDataFlow(role, settings, filters || {});

  // * Initialization of management code.
  this._initializeManagement();

  // * Initializing error handling.
  this._initializeErrorHandling();
}
Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });

// Handshake
// ---------

// The byte sequence a client sends first and a server expects to receive first.
var CLIENT_PRELUDE = Buffer.from('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');

// Writing the client header is simple and synchronous: the prelude is pushed to the readable
// side of the endpoint so that it is the first thing a consumer reads.
Endpoint.prototype._writePrelude = function _writePrelude() {
  this._log.debug('Sending the client connection header prelude.');
  this.push(CLIENT_PRELUDE);
};

// The asynchronous process of reading the client header:
Endpoint.prototype._readPrelude = function _readPrelude() {
  // * progress in the header is tracked using a `cursor`, which survives between chunks because
  //   the prelude may arrive split across several writes
  var cursor = 0;

  // * `_write` is temporarily replaced (as an own property shadowing the prototype method) by
  //   the comparator function
  this._write = function _temporalWrite(chunk, encoding, done) {
    // * which compares the stored header with the current `chunk` byte by byte and emits the
    //   'error' event if there's a byte that doesn't match. `offset` is the position in the
    //   prelude that corresponds to the first byte of this chunk.
    var offset = cursor;
    while (cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                        'Client connection header prelude does not match.');
        this._error('handshake', 'PROTOCOL_ERROR');
        // `done` is intentionally not called: the connection is being closed and no further
        // input should be processed.
        return;
      }
      cursor += 1;
    }

    if (cursor === CLIENT_PRELUDE.length) {
      // * if the whole header is over, and there were no error then restore the original `_write`
      //   (deleting the own property exposes the prototype method again) and call it with the
      //   remaining part of the current chunk
      this._log.debug('Successfully received the client connection header prelude.');
      delete this._write;
      this._write(chunk.slice(cursor - offset), encoding, done);
    } else {
      // * otherwise the chunk was consumed entirely but the header is not complete yet: signal
      //   that the chunk is processed so that the stream delivers the next one. Without this
      //   call a prelude split across several chunks would stall the writable side forever.
      done();
    }
  };
};

// Data flow
// ---------

//     +---------------------------------------------+
//     |                                             |
//     |   +-------------------------------------+   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   | | stream1 | | stream2 | |   ...   | |   |
//     |   | +---------+ +---------+ +---------+ |   |
//     |   |             connection              |   |
//     |   +-------------------------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    compressor    |   decompressor   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |        pipe |                 | pipe        |
//     |             v                 |             |
//     |   +------------------+------------------+   |
//     |   |    serializer    |   deserializer   |   |
//     |   +------------------+------------------+   |
//     |             |                 ^             |
//     |     _read() |                 | _write()    |
//     |             v                 |             |
//     |      +------------+     +-----------+       |
//     |      |output queue|     |input queue|       |
//     +------+------------+-----+-----------+-------+
//                   |                 ^
//            read() |                 | write()
//                   v                 |

// Wraps a filter function into an object mode Transform stream. The filter gets the frame, a
// `forward` function (the transform's bound `push`) and the `done` callback of the transform.
function createTransformStream(filter) {
  var transform = new Transform({ objectMode: true });
  var push = transform.push.bind(transform);
  transform._transform = function(frame, encoding, done) {
    filter(frame, push, done);
  };
  return transform;
}

// Pipes `stream1` into `stream2`, inserting a transform stream built from `filter` between the
// two when a filter is given.
function pipeAndFilter(stream1, stream2, filter) {
  if (filter) {
    stream1.pipe(createTransformStream(filter)).pipe(stream2);
  } else {
    stream1.pipe(stream2);
  }
}

// Creates the components shown in the diagram above and wires them together.
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  // * The first stream ID and the (de)compressor roles depend on the role of the endpoint
  var firstStreamId, compressorRole, decompressorRole;
  if (role === 'CLIENT') {
    firstStreamId = 1;
    compressorRole = 'REQUEST';
    decompressorRole = 'RESPONSE';
  } else {
    firstStreamId = 2;
    compressorRole = 'RESPONSE';
    decompressorRole = 'REQUEST';
  }

  // * Instantiating the components
  this._serializer = new Serializer(this._log);
  this._deserializer = new Deserializer(this._log);
  this._compressor = new Compressor(this._log, compressorRole);
  this._decompressor = new Decompressor(this._log, decompressorRole);
  this._connection = new Connection(this._log, firstStreamId, settings);

  // * Outgoing direction: connection -> compressor -> serializer
  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);

  // * Incoming direction: deserializer -> decompressor -> connection
  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);

  // * Header table size changes reported by the connection are applied to the (de)compressor
  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                      this._decompressor.setTableSizeLimit.bind(this._decompressor));
  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                      this._compressor.setTableSizeLimit.bind(this._compressor));
};

// Sentinel that marks "`push` has not been called in this `_read` run". It is truthy, so it
// does not stop the loop below, and it can be told apart from any return value of `push`.
var noread = {};

// Moves serialized chunks from the serializer to the readable side of the endpoint until either
// `push` returns a falsy value or the serializer has nothing more to read. If nothing was
// pushed at all, reading is retried when the serializer emits its next 'readable' event.
Endpoint.prototype._read = function _read() {
  // NOTE(review): the internal `sync` flag of the readable state is forced on for the duration
  // of the pushes, presumably so that they are handled as synchronous ones - confirm against
  // the stream implementation in use.
  this._readableState.sync = true;
  var moreNeeded = noread, chunk;
  while (moreNeeded && (chunk = this._serializer.read())) {
    moreNeeded = this.push(chunk);
  }
  if (moreNeeded === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }
  this._readableState.sync = false;
};

// Incoming bytes are handed over to the deserializer. `done` is called by the deserializer's
// `write` once it has dealt with the chunk.
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  this._deserializer.write(chunk, encoding, done);
};

// Management
// --------------

// Forwards the 'stream' event of the connection as the endpoint's own 'stream' event.
Endpoint.prototype._initializeManagement = function _initializeManagement() {
  this._connection.on('stream', this.emit.bind(this, 'stream'));
};

// Initiates a new stream by delegating to the underlying connection and returns its result.
Endpoint.prototype.createStream = function createStream() {
  return this._connection.createStream();
};

// Error handling
// --------------

// Routes the 'error' event of every component to `_error`, tagged with the component's name,
// and forwards the 'peerError' event of the connection unchanged.
Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
  this._serializer.on('error', this._error.bind(this, 'serializer'));
  this._deserializer.on('error', this._error.bind(this, 'deserializer'));
  this._compressor.on('error', this._error.bind(this, 'compressor'));
  this._decompressor.on('error', this._error.bind(this, 'decompressor'));
  this._connection.on('error', this._error.bind(this, 'connection'));

  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
};

// Handles a fatal error: logs it together with the name of the originating `component`, closes
// the connection with `error`, then emits 'error' on the endpoint. The event is emitted with
// `setImmediate`, i.e. not in the same tick as the call.
Endpoint.prototype._error = function _error(component, error) {
  this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
  this.close(error);
  setImmediate(this.emit.bind(this, 'error', error));
};

// Closes the underlying connection, passing on the optional `error`.
Endpoint.prototype.close = function close(error) {
  this._connection.close(error);
};

// Bunyan serializers
// ------------------

exports.serializers = {};

// Endpoints are logged as a numeric ID. The ID is assigned (from a module level counter) the
// first time an endpoint is serialized and stored on the endpoint as `id`.
var nextId = 0;
exports.serializers.e = function(endpoint) {
  if (!('id' in endpoint)) {
    endpoint.id = nextId;
    nextId += 1;
  }
  return endpoint.id;
};