PushServiceWebSocket.sys.mjs (38241B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 3 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs"; 6 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs"; 7 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs"; 8 9 const lazy = {}; 10 11 ChromeUtils.defineESModuleGetters(lazy, { 12 ObjectUtils: "resource://gre/modules/ObjectUtils.sys.mjs", 13 pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs", 14 }); 15 16 const kPUSHWSDB_DB_NAME = "pushapi"; 17 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes 18 const kPUSHWSDB_STORE_NAME = "pushapi"; 19 20 // WebSocket close code sent by the server to indicate that the client should 21 // not automatically reconnect. 22 const kBACKOFF_WS_STATUS_CODE = 4774; 23 24 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes 25 // included in request payloads. 26 const kACK_STATUS_TO_CODE = { 27 [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100, 28 [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101, 29 [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102, 30 }; 31 32 const kUNREGISTER_REASON_TO_CODE = { 33 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200, 34 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201, 35 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202, 36 }; 37 38 const kDELIVERY_REASON_TO_CODE = { 39 [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301, 40 [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302, 41 [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303, 42 }; 43 44 const kERROR_CODE_TO_GLEAN_LABEL = { 45 [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: "decryption_error", 46 [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: "not_delivered", 47 [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: "uncaught_exception", 48 [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: "unhandled_rejection", 49 [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: "internal_error", 50 }; 51 52 const prefs = Services.prefs.getBranch("dom.push."); 53 54 ChromeUtils.defineLazyGetter(lazy, "console", () => { 55 return console.createInstance({ 56 maxLogLevelPref: "dom.push.loglevel", 57 prefix: "PushServiceWebSocket", 58 }); 59 }); 60 61 /** 62 * A proxy between the PushService and the WebSocket. The listener is used so 63 * that the PushService can silence messages from the WebSocket by setting 64 * PushWebSocketListener._pushService to null. This is required because 65 * a WebSocket can continue to send messages or errors after it has been 66 * closed but the PushService may not be interested in these. It's easier to 67 * stop listening than to have checks at specific points. 68 */ 69 var PushWebSocketListener = function (pushService) { 70 this._pushService = pushService; 71 }; 72 73 PushWebSocketListener.prototype = { 74 onStart(context) { 75 if (!this._pushService) { 76 return; 77 } 78 this._pushService._wsOnStart(context); 79 }, 80 81 onStop(context, statusCode) { 82 if (!this._pushService) { 83 return; 84 } 85 this._pushService._wsOnStop(context, statusCode); 86 }, 87 88 onAcknowledge() { 89 // EMPTY 90 }, 91 92 onBinaryMessageAvailable() { 93 // EMPTY 94 }, 95 96 onMessageAvailable(context, message) { 97 if (!this._pushService) { 98 return; 99 } 100 this._pushService._wsOnMessageAvailable(context, message); 101 }, 102 103 onServerClose(context, aStatusCode, aReason) { 104 if (!this._pushService) { 105 return; 106 } 107 this._pushService._wsOnServerClose(context, aStatusCode, aReason); 108 }, 109 }; 110 111 // websocket states 112 // websocket is off 113 const STATE_SHUT_DOWN = 0; 114 // Websocket has been opened on client side, waiting for successful open. 115 // (_wsOnStart) 116 const STATE_WAITING_FOR_WS_START = 1; 117 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply). 118 const STATE_WAITING_FOR_HELLO = 2; 119 // Websocket operational, handshake completed, begin protocol messaging. 120 const STATE_READY = 3; 121 122 export var PushServiceWebSocket = { 123 QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]), 124 name: "PushServiceWebSocket", 125 126 _mainPushService: null, 127 _serverURI: null, 128 _currentlyRegistering: new Set(), 129 130 newPushDB() { 131 return new PushDB( 132 kPUSHWSDB_DB_NAME, 133 kPUSHWSDB_DB_VERSION, 134 kPUSHWSDB_STORE_NAME, 135 "channelID", 136 PushRecordWebSocket 137 ); 138 }, 139 140 disconnect() { 141 this._shutdownWS(); 142 }, 143 144 observe(aSubject, aTopic, aData) { 145 if (aTopic == "nsPref:changed" && aData == "userAgentID") { 146 this._onUAIDChanged(); 147 } else if (aTopic == "timer-callback") { 148 this._onTimerFired(aSubject); 149 } 150 }, 151 152 /** 153 * Handles a UAID change. Unlike reconnects, we cancel all pending requests 154 * after disconnecting. Existing subscriptions stored in IndexedDB will be 155 * dropped on reconnect. 156 */ 157 _onUAIDChanged() { 158 lazy.console.debug("onUAIDChanged()"); 159 160 this._shutdownWS(); 161 this._startBackoffTimer(); 162 }, 163 164 /** Handles a ping, backoff, or request timeout timer event. */ 165 _onTimerFired(timer) { 166 lazy.console.debug("onTimerFired()"); 167 168 if (timer == this._pingTimer) { 169 this._sendPing(); 170 return; 171 } 172 173 if (timer == this._backoffTimer) { 174 lazy.console.debug("onTimerFired: Reconnecting after backoff"); 175 this._beginWSSetup(); 176 return; 177 } 178 179 if (timer == this._requestTimeoutTimer) { 180 this._timeOutRequests(); 181 } 182 }, 183 184 /** 185 * Sends a ping to the server. Bypasses the request queue, but starts the 186 * request timeout timer. If the socket is already closed, or the server 187 * does not respond within the timeout, the client will reconnect. 188 */ 189 _sendPing() { 190 lazy.console.debug("sendPing()"); 191 192 this._startRequestTimeoutTimer(); 193 try { 194 this._wsSendMessage({}); 195 this._lastPingTime = Date.now(); 196 } catch (e) { 197 lazy.console.debug("sendPing: Error sending ping", e); 198 this._reconnect(); 199 } 200 }, 201 202 /** Times out any pending requests. */ 203 _timeOutRequests() { 204 lazy.console.debug("timeOutRequests()"); 205 206 if (!this._hasPendingRequests()) { 207 // Cancel the repeating timer and exit early if we aren't waiting for 208 // pongs or requests. 209 this._requestTimeoutTimer.cancel(); 210 return; 211 } 212 213 let now = Date.now(); 214 215 // Set to true if at least one request timed out, or we're still waiting 216 // for a pong after the request timeout. 217 let requestTimedOut = false; 218 219 if ( 220 this._lastPingTime > 0 && 221 now - this._lastPingTime > this._requestTimeout 222 ) { 223 lazy.console.debug("timeOutRequests: Did not receive pong in time"); 224 requestTimedOut = true; 225 } else { 226 for (let [key, request] of this._pendingRequests) { 227 let duration = now - request.ctime; 228 // If any of the registration requests time out, all the ones after it 229 // also made to fail, since we are going to be disconnecting the 230 // socket. 231 requestTimedOut |= duration > this._requestTimeout; 232 if (requestTimedOut) { 233 request.reject(new Error("Request timed out: " + key)); 234 this._pendingRequests.delete(key); 235 } 236 } 237 } 238 239 // The most likely reason for a pong or registration request timing out is 240 // that the socket has disconnected. Best to reconnect. 241 if (requestTimedOut) { 242 this._reconnect(); 243 } 244 }, 245 246 get _UAID() { 247 return prefs.getStringPref("userAgentID"); 248 }, 249 250 set _UAID(newID) { 251 if (typeof newID !== "string") { 252 lazy.console.warn( 253 "Got invalid, non-string UAID", 254 newID, 255 "Not updating userAgentID" 256 ); 257 return; 258 } 259 lazy.console.debug("New _UAID", newID); 260 prefs.setStringPref("userAgentID", newID); 261 }, 262 263 _ws: null, 264 _pendingRequests: new Map(), 265 _currentState: STATE_SHUT_DOWN, 266 _requestTimeout: 0, 267 _requestTimeoutTimer: null, 268 _retryFailCount: 0, 269 270 /** 271 * According to the WS spec, servers should immediately close the underlying 272 * TCP connection after they close a WebSocket. This causes wsOnStop to be 273 * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the 274 * WebSocket up, it should try to reconnect. But if the server closes the 275 * WebSocket because it wants the client to back off, then the client 276 * shouldn't re-establish the connection. If the server sends the backoff 277 * close code, this field will be set to true in wsOnServerClose. It is 278 * checked in wsOnStop. 279 */ 280 _skipReconnect: false, 281 282 /** Indicates whether the server supports Web Push-style message delivery. */ 283 _dataEnabled: false, 284 285 /** 286 * The last time the client sent a ping to the server. If non-zero, keeps the 287 * request timeout timer active. Reset to zero when the server responds with 288 * a pong or pending messages. 289 */ 290 _lastPingTime: 0, 291 292 /** 293 * A one-shot timer used to ping the server, to avoid timing out idle 294 * connections. Reset to the ping interval on each incoming message. 295 */ 296 _pingTimer: null, 297 298 /** A one-shot timer fired after the reconnect backoff period. */ 299 _backoffTimer: null, 300 301 /** 302 * Sends a message to the Push Server through an open websocket. 303 * typeof(msg) shall be an object 304 */ 305 _wsSendMessage(msg) { 306 if (!this._ws) { 307 lazy.console.warn( 308 "wsSendMessage: No WebSocket initialized.", 309 "Cannot send a message" 310 ); 311 return; 312 } 313 msg = JSON.stringify(msg); 314 lazy.console.debug("wsSendMessage: Sending message", msg); 315 this._ws.sendMsg(msg); 316 }, 317 318 init(options, mainPushService, serverURI) { 319 lazy.console.debug("init()"); 320 321 this._mainPushService = mainPushService; 322 this._serverURI = serverURI; 323 // Filled in at connect() time 324 this._broadcastListeners = null; 325 326 // Override the default WebSocket factory function. The returned object 327 // must be null or satisfy the nsIWebSocketChannel interface. Used by 328 // the tests to provide a mock WebSocket implementation. 329 if (options.makeWebSocket) { 330 this._makeWebSocket = options.makeWebSocket; 331 } 332 333 this._requestTimeout = prefs.getIntPref("requestTimeout"); 334 335 return Promise.resolve(); 336 }, 337 338 _reconnect() { 339 lazy.console.debug("reconnect()"); 340 this._shutdownWS(false); 341 this._startBackoffTimer(); 342 }, 343 344 _shutdownWS(shouldCancelPending = true) { 345 lazy.console.debug("shutdownWS()"); 346 347 if (this._currentState == STATE_READY) { 348 prefs.removeObserver("userAgentID", this); 349 } 350 351 this._currentState = STATE_SHUT_DOWN; 352 this._skipReconnect = false; 353 354 if (this._wsListener) { 355 this._wsListener._pushService = null; 356 } 357 try { 358 this._ws.close(0, null); 359 } catch (e) {} 360 this._ws = null; 361 362 this._lastPingTime = 0; 363 364 if (this._pingTimer) { 365 this._pingTimer.cancel(); 366 } 367 368 if (shouldCancelPending) { 369 this._cancelPendingRequests(); 370 } 371 372 if (this._notifyRequestQueue) { 373 this._notifyRequestQueue(); 374 this._notifyRequestQueue = null; 375 } 376 }, 377 378 uninit() { 379 // All pending requests (ideally none) are dropped at this point. We 380 // shouldn't have any applications performing registration/unregistration 381 // or receiving notifications. 382 this._shutdownWS(); 383 384 if (this._backoffTimer) { 385 this._backoffTimer.cancel(); 386 } 387 if (this._requestTimeoutTimer) { 388 this._requestTimeoutTimer.cancel(); 389 } 390 391 this._mainPushService = null; 392 393 this._dataEnabled = false; 394 }, 395 396 /** 397 * How retries work: If the WS is closed due to a socket error, 398 * _startBackoffTimer() is called. The retry timer is started and when 399 * it times out, beginWSSetup() is called again. 400 * 401 * If we are in the middle of a timeout (i.e. waiting), but 402 * a register/unregister is called, we don't want to wait around anymore. 403 * _sendRequest will automatically call beginWSSetup(), which will cancel the 404 * timer. In addition since the state will have changed, even if a pending 405 * timer event comes in (because the timer fired the event before it was 406 * cancelled), so the connection won't be reset. 407 */ 408 _startBackoffTimer() { 409 lazy.console.debug("startBackoffTimer()"); 410 411 // Calculate new timeout, but cap it to pingInterval. 412 let retryTimeout = 413 prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount); 414 retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval")); 415 416 this._retryFailCount++; 417 418 lazy.console.debug( 419 "startBackoffTimer: Retry in", 420 retryTimeout, 421 "Try number", 422 this._retryFailCount 423 ); 424 425 if (!this._backoffTimer) { 426 this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance( 427 Ci.nsITimer 428 ); 429 } 430 this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT); 431 }, 432 433 /** Indicates whether we're waiting for pongs or requests. */ 434 _hasPendingRequests() { 435 return this._lastPingTime > 0 || this._pendingRequests.size > 0; 436 }, 437 438 /** 439 * Starts the request timeout timer unless we're already waiting for a pong 440 * or register request. 441 */ 442 _startRequestTimeoutTimer() { 443 if (this._hasPendingRequests()) { 444 return; 445 } 446 if (!this._requestTimeoutTimer) { 447 this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance( 448 Ci.nsITimer 449 ); 450 } 451 this._requestTimeoutTimer.init( 452 this, 453 this._requestTimeout, 454 Ci.nsITimer.TYPE_REPEATING_SLACK 455 ); 456 }, 457 458 /** Starts or resets the ping timer. */ 459 _startPingTimer() { 460 if (!this._pingTimer) { 461 this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); 462 } 463 this._pingTimer.init( 464 this, 465 prefs.getIntPref("pingInterval"), 466 Ci.nsITimer.TYPE_ONE_SHOT 467 ); 468 }, 469 470 _makeWebSocket(uri) { 471 if (!prefs.getBoolPref("connection.enabled")) { 472 lazy.console.warn( 473 "makeWebSocket: connection.enabled is not set to true.", 474 "Aborting." 475 ); 476 return null; 477 } 478 if (Services.io.offline) { 479 lazy.console.warn("makeWebSocket: Network is offline."); 480 return null; 481 } 482 let contractId = 483 uri.scheme == "ws" 484 ? "@mozilla.org/network/protocol;1?name=ws" 485 : "@mozilla.org/network/protocol;1?name=wss"; 486 let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel); 487 488 socket.initLoadInfo( 489 null, // aLoadingNode 490 Services.scriptSecurityManager.getSystemPrincipal(), 491 null, // aTriggeringPrincipal 492 Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL, 493 Ci.nsIContentPolicy.TYPE_WEBSOCKET 494 ); 495 // Allow deprecated HTTP request from SystemPrincipal 496 socket.loadInfo.allowDeprecatedSystemRequests = true; 497 498 return socket; 499 }, 500 501 _beginWSSetup() { 502 lazy.console.debug("beginWSSetup()"); 503 if (this._currentState != STATE_SHUT_DOWN) { 504 lazy.console.error( 505 "_beginWSSetup: Not in shutdown state! Current state", 506 this._currentState 507 ); 508 return; 509 } 510 511 // Stop any pending reconnects scheduled for the near future. 512 if (this._backoffTimer) { 513 this._backoffTimer.cancel(); 514 } 515 516 let uri = this._serverURI; 517 if (!uri) { 518 return; 519 } 520 let socket = this._makeWebSocket(uri); 521 if (!socket) { 522 return; 523 } 524 this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel); 525 526 lazy.console.debug("beginWSSetup: Connecting to", uri.spec); 527 this._wsListener = new PushWebSocketListener(this); 528 this._ws.protocol = "push-notification"; 529 530 try { 531 // Grab a wakelock before we open the socket to ensure we don't go to 532 // sleep before connection the is opened. 533 this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null); 534 this._currentState = STATE_WAITING_FOR_WS_START; 535 } catch (e) { 536 lazy.console.error( 537 "beginWSSetup: Error opening websocket.", 538 "asyncOpen failed", 539 e 540 ); 541 this._reconnect(); 542 } 543 }, 544 545 connect(broadcastListeners) { 546 lazy.console.debug("connect()", broadcastListeners); 547 this._broadcastListeners = broadcastListeners; 548 this._beginWSSetup(); 549 }, 550 551 isConnected() { 552 return !!this._ws; 553 }, 554 555 /** 556 * Protocol handler invoked by server message. 557 */ 558 _handleHelloReply(reply) { 559 lazy.console.debug("handleHelloReply()"); 560 if (this._currentState != STATE_WAITING_FOR_HELLO) { 561 lazy.console.error( 562 "handleHelloReply: Unexpected state", 563 this._currentState, 564 "(expected STATE_WAITING_FOR_HELLO)" 565 ); 566 this._shutdownWS(); 567 return; 568 } 569 570 if (typeof reply.uaid !== "string") { 571 lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid); 572 this._shutdownWS(); 573 return; 574 } 575 576 if (reply.uaid === "") { 577 lazy.console.error("handleHelloReply: Received empty UAID"); 578 this._shutdownWS(); 579 return; 580 } 581 582 // To avoid sticking extra large values sent by an evil server into prefs. 583 if (reply.uaid.length > 128) { 584 lazy.console.error( 585 "handleHelloReply: UAID received from server was too long", 586 reply.uaid 587 ); 588 this._shutdownWS(); 589 return; 590 } 591 592 let sendRequests = () => { 593 if (this._notifyRequestQueue) { 594 this._notifyRequestQueue(); 595 this._notifyRequestQueue = null; 596 } 597 this._sendPendingRequests(); 598 }; 599 600 function finishHandshake() { 601 this._UAID = reply.uaid; 602 this._currentState = STATE_READY; 603 prefs.addObserver("userAgentID", this); 604 605 // Handle broadcasts received in response to the "hello" message. 606 if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) { 607 // The reply isn't technically a broadcast message, but it has 608 // the shape of a broadcast message (it has a broadcasts field). 609 const context = { phase: lazy.pushBroadcastService.PHASES.HELLO }; 610 this._mainPushService.receivedBroadcastMessage(reply, context); 611 } 612 613 this._dataEnabled = !!reply.use_webpush; 614 if (this._dataEnabled) { 615 this._mainPushService 616 .getAllUnexpired() 617 .then(records => 618 Promise.all( 619 records.map(record => 620 this._mainPushService.ensureCrypto(record).catch(error => { 621 lazy.console.error( 622 "finishHandshake: Error updating record", 623 record.keyID, 624 error 625 ); 626 }) 627 ) 628 ) 629 ) 630 .then(sendRequests); 631 } else { 632 sendRequests(); 633 } 634 } 635 636 // By this point we've got a UAID from the server that we are ready to 637 // accept. 638 // 639 // We unconditionally drop all existing registrations and notify service 640 // workers if we receive a new UAID. This ensures we expunge all stale 641 // registrations if the `userAgentID` pref is reset. 642 if (this._UAID != reply.uaid) { 643 lazy.console.debug("handleHelloReply: Received new UAID"); 644 645 this._mainPushService 646 .dropUnexpiredRegistrations() 647 .then(finishHandshake.bind(this)); 648 649 return; 650 } 651 652 // otherwise we are good to go 653 finishHandshake.bind(this)(); 654 }, 655 656 /** 657 * Protocol handler invoked by server message. 658 */ 659 _handleRegisterReply(reply) { 660 lazy.console.debug("handleRegisterReply()"); 661 662 let tmp = this._takeRequestForReply(reply); 663 if (!tmp) { 664 return; 665 } 666 667 if (reply.status == 200) { 668 try { 669 Services.io.newURI(reply.pushEndpoint); 670 } catch (e) { 671 tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint)); 672 return; 673 } 674 675 let record = new PushRecordWebSocket({ 676 channelID: reply.channelID, 677 pushEndpoint: reply.pushEndpoint, 678 scope: tmp.record.scope, 679 originAttributes: tmp.record.originAttributes, 680 version: null, 681 systemRecord: tmp.record.systemRecord, 682 appServerKey: tmp.record.appServerKey, 683 ctime: Date.now(), 684 }); 685 tmp.resolve(record); 686 } else { 687 lazy.console.error( 688 "handleRegisterReply: Unexpected server response", 689 reply 690 ); 691 tmp.reject( 692 new Error("Wrong status code for register reply: " + reply.status) 693 ); 694 } 695 }, 696 697 _handleUnregisterReply(reply) { 698 lazy.console.debug("handleUnregisterReply()"); 699 700 let request = this._takeRequestForReply(reply); 701 if (!request) { 702 return; 703 } 704 705 let success = reply.status === 200; 706 request.resolve(success); 707 }, 708 709 _handleDataUpdate(update) { 710 let promise; 711 if (typeof update.channelID != "string") { 712 lazy.console.warn( 713 "handleDataUpdate: Discarding update without channel ID", 714 update 715 ); 716 return; 717 } 718 function updateRecord(record) { 719 // Ignore messages that we've already processed. This can happen if the 720 // connection drops between notifying the service worker and acking the 721 // the message. In that case, the server will re-send the message on 722 // reconnect. 723 if (record.hasRecentMessageID(update.version)) { 724 lazy.console.warn( 725 "handleDataUpdate: Ignoring duplicate message", 726 update.version 727 ); 728 Glean.webPush.detectedDuplicatedMessageIds.add(); 729 return null; 730 } 731 record.noteRecentMessageID(update.version); 732 return record; 733 } 734 if (typeof update.data != "string") { 735 promise = this._mainPushService.receivedPushMessage( 736 update.channelID, 737 update.version, 738 null, 739 null, 740 updateRecord 741 ); 742 } else { 743 let message = ChromeUtils.base64URLDecode(update.data, { 744 // The Push server may append padding. 745 padding: "ignore", 746 }); 747 promise = this._mainPushService.receivedPushMessage( 748 update.channelID, 749 update.version, 750 update.headers, 751 message, 752 updateRecord 753 ); 754 } 755 promise 756 .then( 757 status => { 758 this._sendAck(update.channelID, update.version, status); 759 }, 760 err => { 761 lazy.console.error( 762 "handleDataUpdate: Error delivering message", 763 update, 764 err 765 ); 766 this._sendAck( 767 update.channelID, 768 update.version, 769 Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR 770 ); 771 } 772 ) 773 .catch(err => { 774 lazy.console.error( 775 "handleDataUpdate: Error acknowledging message", 776 update, 777 err 778 ); 779 }); 780 }, 781 782 /** 783 * Protocol handler invoked by server message. 784 */ 785 _handleNotificationReply(reply) { 786 lazy.console.debug("handleNotificationReply()"); 787 if (this._dataEnabled) { 788 this._handleDataUpdate(reply); 789 return; 790 } 791 792 if (typeof reply.updates !== "object") { 793 lazy.console.warn( 794 "handleNotificationReply: Missing updates", 795 reply.updates 796 ); 797 return; 798 } 799 800 lazy.console.debug("handleNotificationReply: Got updates", reply.updates); 801 for (let i = 0; i < reply.updates.length; i++) { 802 let update = reply.updates[i]; 803 lazy.console.debug("handleNotificationReply: Handling update", update); 804 if (typeof update.channelID !== "string") { 805 lazy.console.debug( 806 "handleNotificationReply: Invalid update at index", 807 i, 808 update 809 ); 810 continue; 811 } 812 813 if (update.version === undefined) { 814 lazy.console.debug("handleNotificationReply: Missing version", update); 815 continue; 816 } 817 818 let version = update.version; 819 820 if (typeof version === "string") { 821 version = parseInt(version, 10); 822 } 823 824 if (typeof version === "number" && version >= 0) { 825 // FIXME(nsm): this relies on app update notification being infallible! 826 // eventually fix this 827 this._receivedUpdate(update.channelID, version); 828 } 829 } 830 }, 831 832 _handleBroadcastReply(reply) { 833 let phase = lazy.pushBroadcastService.PHASES.BROADCAST; 834 // Check if this reply is the result of registration. 835 for (const id of Object.keys(reply.broadcasts)) { 836 const wasRegistering = this._currentlyRegistering.delete(id); 837 if (wasRegistering) { 838 // If we get multiple broadcasts and only one is "registering", 839 // then we consider the phase to be REGISTER for all of them. 840 // It is acceptable since registrations do not happen so often, 841 // and are all very likely to occur soon after browser startup. 842 phase = lazy.pushBroadcastService.PHASES.REGISTER; 843 } 844 } 845 const context = { phase }; 846 this._mainPushService.receivedBroadcastMessage(reply, context); 847 }, 848 849 reportDeliveryError(messageID, reason) { 850 lazy.console.debug("reportDeliveryError()"); 851 let code = kDELIVERY_REASON_TO_CODE[reason]; 852 if (!code) { 853 throw new Error("Invalid delivery error reason"); 854 } 855 Glean.webPush.errorCode[kERROR_CODE_TO_GLEAN_LABEL[reason]].add(); 856 let data = { messageType: "nack", version: messageID, code }; 857 this._queueRequest(data); 858 }, 859 860 _sendAck(channelID, version, status) { 861 lazy.console.debug("sendAck()"); 862 let code = kACK_STATUS_TO_CODE[status]; 863 if (!code) { 864 throw new Error("Invalid ack status"); 865 } 866 if (code > 100) { 867 Glean.webPush.errorCode[kERROR_CODE_TO_GLEAN_LABEL[status]].add(); 868 } 869 let data = { messageType: "ack", updates: [{ channelID, version, code }] }; 870 this._queueRequest(data); 871 }, 872 873 _generateID() { 874 // generateUUID() gives a UUID surrounded by {...}, slice them off. 875 return Services.uuid.generateUUID().toString().slice(1, -1); 876 }, 877 878 register(record) { 879 lazy.console.debug("register() ", record); 880 881 let data = { channelID: this._generateID(), messageType: "register" }; 882 883 if (record.appServerKey) { 884 data.key = ChromeUtils.base64URLEncode(record.appServerKey, { 885 // The Push server requires padding. 886 pad: true, 887 }); 888 } 889 890 return this._sendRequestForReply(record, data).then(requestRecord => { 891 if (!this._dataEnabled) { 892 return requestRecord; 893 } 894 return PushCrypto.generateKeys().then(([publicKey, privateKey]) => { 895 requestRecord.p256dhPublicKey = publicKey; 896 requestRecord.p256dhPrivateKey = privateKey; 897 requestRecord.authenticationSecret = 898 PushCrypto.generateAuthenticationSecret(); 899 return requestRecord; 900 }); 901 }); 902 }, 903 904 unregister(record, reason) { 905 lazy.console.debug("unregister() ", record, reason); 906 907 return Promise.resolve().then(_ => { 908 let code = kUNREGISTER_REASON_TO_CODE[reason]; 909 if (!code) { 910 throw new Error("Invalid unregister reason"); 911 } 912 let data = { 913 channelID: record.channelID, 914 messageType: "unregister", 915 code, 916 }; 917 918 return this._sendRequestForReply(record, data); 919 }); 920 }, 921 922 _queueStart: Promise.resolve(), 923 _notifyRequestQueue: null, 924 _queue: null, 925 _enqueue(op) { 926 lazy.console.debug("enqueue()"); 927 if (!this._queue) { 928 this._queue = this._queueStart; 929 } 930 this._queue = this._queue.then(op).catch(_ => {}); 931 }, 932 933 /** Sends a request to the server. */ 934 _send(data) { 935 if (this._currentState != STATE_READY) { 936 lazy.console.warn( 937 "send: Unexpected state; ignoring message", 938 this._currentState 939 ); 940 return; 941 } 942 if (!this._requestHasReply(data)) { 943 this._wsSendMessage(data); 944 return; 945 } 946 // If we're expecting a reply, check that we haven't cancelled the request. 947 let key = this._makePendingRequestKey(data); 948 if (!this._pendingRequests.has(key)) { 949 lazy.console.log("send: Request cancelled; ignoring message", key); 950 return; 951 } 952 this._wsSendMessage(data); 953 }, 954 955 /** Indicates whether a request has a corresponding reply from the server. */ 956 _requestHasReply(data) { 957 return data.messageType == "register" || data.messageType == "unregister"; 958 }, 959 960 /** 961 * Sends all pending requests that expect replies. Called after the connection 962 * is established and the handshake is complete. 963 */ 964 _sendPendingRequests() { 965 this._enqueue(_ => { 966 for (let request of this._pendingRequests.values()) { 967 this._send(request.data); 968 } 969 }); 970 }, 971 972 /** Queues an outgoing request, establishing a connection if necessary. */ 973 _queueRequest(data) { 974 lazy.console.debug("queueRequest()", data); 975 976 if (this._currentState == STATE_READY) { 977 // If we're ready, no need to queue; just send the request. 978 this._send(data); 979 return; 980 } 981 982 // Otherwise, we're still setting up. If we don't have a request queue, 983 // make one now. 984 if (!this._notifyRequestQueue) { 985 let promise = new Promise(resolve => { 986 this._notifyRequestQueue = resolve; 987 }); 988 this._enqueue(_ => promise); 989 } 990 991 let isRequest = this._requestHasReply(data); 992 if (!isRequest) { 993 // Don't queue requests, since they're stored in `_pendingRequests`, and 994 // `_sendPendingRequests` will send them after reconnecting. Without this 995 // check, we'd send requests twice. 996 this._enqueue(_ => this._send(data)); 997 } 998 999 if (!this._ws) { 1000 // This will end up calling notifyRequestQueue(). 1001 this._beginWSSetup(); 1002 // If beginWSSetup does not succeed to make ws, notifyRequestQueue will 1003 // not be call. 1004 if (!this._ws && this._notifyRequestQueue) { 1005 this._notifyRequestQueue(); 1006 this._notifyRequestQueue = null; 1007 } 1008 } 1009 }, 1010 1011 _receivedUpdate(aChannelID, aLatestVersion) { 1012 lazy.console.debug( 1013 "receivedUpdate: Updating", 1014 aChannelID, 1015 "->", 1016 aLatestVersion 1017 ); 1018 1019 this._mainPushService 1020 .receivedPushMessage(aChannelID, "", null, null, record => { 1021 if (record.version === null || record.version < aLatestVersion) { 1022 lazy.console.debug( 1023 "receivedUpdate: Version changed for", 1024 aChannelID, 1025 aLatestVersion 1026 ); 1027 record.version = aLatestVersion; 1028 return record; 1029 } 1030 lazy.console.debug( 1031 "receivedUpdate: No significant version change for", 1032 aChannelID, 1033 aLatestVersion 1034 ); 1035 return null; 1036 }) 1037 .then(status => { 1038 this._sendAck(aChannelID, aLatestVersion, status); 1039 }) 1040 .catch(err => { 1041 lazy.console.error( 1042 "receivedUpdate: Error acknowledging message", 1043 aChannelID, 1044 aLatestVersion, 1045 err 1046 ); 1047 }); 1048 }, 1049 1050 // begin Push protocol handshake 1051 _wsOnStart() { 1052 lazy.console.debug("wsOnStart()"); 1053 1054 if (this._currentState != STATE_WAITING_FOR_WS_START) { 1055 lazy.console.error( 1056 "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current", 1057 "state", 1058 this._currentState, 1059 "Skipping" 1060 ); 1061 return; 1062 } 1063 1064 this._mainPushService 1065 .getAllUnexpired() 1066 .then( 1067 records => this._sendHello(records), 1068 err => { 1069 lazy.console.warn( 1070 "Error fetching existing records before handshake; assuming none", 1071 err 1072 ); 1073 this._sendHello([]); 1074 } 1075 ) 1076 .catch(err => { 1077 // If we failed to send the handshake, back off and reconnect. 1078 lazy.console.warn("Failed to send handshake; reconnecting", err); 1079 this._reconnect(); 1080 }); 1081 }, 1082 1083 /** 1084 * Sends a `hello` handshake to the server. 1085 * 1086 * @param {Array<PushRecordWebSocket>} An array of records for existing 1087 * subscriptions, used to determine whether to rotate our UAID. 1088 */ 1089 _sendHello(records) { 1090 let data = { 1091 messageType: "hello", 1092 broadcasts: this._broadcastListeners, 1093 use_webpush: true, 1094 }; 1095 1096 if (records.length && this._UAID) { 1097 // Only send our UAID if we have existing push subscriptions, to 1098 // avoid tying a persistent identifier to the connection (bug 1099 // 1617136). The push server will issue our client a new UAID in 1100 // the `hello` response, which we'll store until either the next 1101 // time we reconnect, or the user subscribes to push. Once we have a 1102 // push subscription, we'll stop rotating the UAID when we connect, 1103 // so that we can receive push messages for them. 1104 data.uaid = this._UAID; 1105 } 1106 1107 this._wsSendMessage(data); 1108 this._currentState = STATE_WAITING_FOR_HELLO; 1109 }, 1110 1111 /** 1112 * This statusCode is not the websocket protocol status code, but the TCP 1113 * connection close status code. 1114 * 1115 * If we do not explicitly call ws.close() then statusCode is always 1116 * NS_BASE_STREAM_CLOSED, even on a successful close. 1117 */ 1118 _wsOnStop(context, statusCode) { 1119 lazy.console.debug("wsOnStop()"); 1120 1121 if (statusCode != Cr.NS_OK && !this._skipReconnect) { 1122 lazy.console.debug( 1123 "wsOnStop: Reconnecting after socket error", 1124 statusCode 1125 ); 1126 this._reconnect(); 1127 return; 1128 } 1129 1130 this._shutdownWS(); 1131 }, 1132 1133 _wsOnMessageAvailable(context, message) { 1134 lazy.console.debug("wsOnMessageAvailable()", message); 1135 1136 // Clearing the last ping time indicates we're no longer waiting for a pong. 1137 this._lastPingTime = 0; 1138 1139 let reply; 1140 try { 1141 reply = JSON.parse(message); 1142 } catch (e) { 1143 lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e); 1144 return; 1145 } 1146 1147 // If we receive a message, we know the connection succeeded. Reset the 1148 // connection attempt and ping interval counters. 1149 this._retryFailCount = 0; 1150 1151 let doNotHandle = false; 1152 if ( 1153 message === "{}" || 1154 reply.messageType === undefined || 1155 reply.messageType === "ping" || 1156 typeof reply.messageType != "string" 1157 ) { 1158 lazy.console.debug("wsOnMessageAvailable: Pong received"); 1159 doNotHandle = true; 1160 } 1161 1162 // Reset the ping timer. Note: This path is executed at every step of the 1163 // handshake, so this timer does not need to be set explicitly at startup. 1164 this._startPingTimer(); 1165 1166 // If it is a ping, do not handle the message. 1167 if (doNotHandle) { 1168 if (!this._hasPendingRequests()) { 1169 this._requestTimeoutTimer.cancel(); 1170 } 1171 return; 1172 } 1173 1174 // An allowlist of protocol handlers. Add to these if new messages are added 1175 // in the protocol. 1176 let handlers = [ 1177 "Hello", 1178 "Register", 1179 "Unregister", 1180 "Notification", 1181 "Broadcast", 1182 ]; 1183 1184 // Build up the handler name to call from messageType. 1185 // e.g. messageType == "register" -> _handleRegisterReply. 1186 let handlerName = 1187 reply.messageType[0].toUpperCase() + 1188 reply.messageType.slice(1).toLowerCase(); 1189 1190 if (!handlers.includes(handlerName)) { 1191 lazy.console.warn( 1192 "wsOnMessageAvailable: No allowlisted handler", 1193 handlerName, 1194 "for message", 1195 reply.messageType 1196 ); 1197 return; 1198 } 1199 1200 let handler = "_handle" + handlerName + "Reply"; 1201 1202 if (typeof this[handler] !== "function") { 1203 lazy.console.warn( 1204 "wsOnMessageAvailable: Handler", 1205 handler, 1206 "allowlisted but not implemented" 1207 ); 1208 return; 1209 } 1210 1211 this[handler](reply); 1212 }, 1213 1214 /** 1215 * The websocket should never be closed. Since we don't call ws.close(), 1216 * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that 1217 * function), which calls reconnect and re-establishes the WebSocket 1218 * connection. 1219 * 1220 * If the server requested that we back off, we won't reconnect until the 1221 * next network state change event, or until we need to send a new register 1222 * request. 1223 */ 1224 _wsOnServerClose(context, aStatusCode, aReason) { 1225 lazy.console.debug("wsOnServerClose()", aStatusCode, aReason); 1226 1227 if (aStatusCode == kBACKOFF_WS_STATUS_CODE) { 1228 lazy.console.debug("wsOnServerClose: Skipping automatic reconnect"); 1229 this._skipReconnect = true; 1230 } 1231 }, 1232 1233 /** 1234 * Rejects all pending register requests with errors. 1235 */ 1236 _cancelPendingRequests() { 1237 for (let request of this._pendingRequests.values()) { 1238 request.reject(new Error("Request aborted")); 1239 } 1240 this._pendingRequests.clear(); 1241 }, 1242 1243 /** Creates a case-insensitive map key for a request that expects a reply. */ 1244 _makePendingRequestKey(data) { 1245 return (data.messageType + "|" + data.channelID).toLowerCase(); 1246 }, 1247 1248 /** Sends a request and waits for a reply from the server. */ 1249 _sendRequestForReply(record, data) { 1250 return Promise.resolve().then(_ => { 1251 // start the timer since we now have at least one request 1252 this._startRequestTimeoutTimer(); 1253 1254 let key = this._makePendingRequestKey(data); 1255 if (!this._pendingRequests.has(key)) { 1256 let request = { 1257 data, 1258 record, 1259 ctime: Date.now(), 1260 }; 1261 request.promise = new Promise((resolve, reject) => { 1262 request.resolve = resolve; 1263 request.reject = reject; 1264 }); 1265 this._pendingRequests.set(key, request); 1266 this._queueRequest(data); 1267 } 1268 1269 return this._pendingRequests.get(key).promise; 1270 }); 1271 }, 1272 1273 /** Removes and returns a pending request for a server reply. */ 1274 _takeRequestForReply(reply) { 1275 if (typeof reply.channelID !== "string") { 1276 return null; 1277 } 1278 let key = this._makePendingRequestKey(reply); 1279 let request = this._pendingRequests.get(key); 1280 if (!request) { 1281 return null; 1282 } 1283 this._pendingRequests.delete(key); 1284 if (!this._hasPendingRequests()) { 1285 this._requestTimeoutTimer.cancel(); 1286 } 1287 return request; 1288 }, 1289 1290 sendSubscribeBroadcast(serviceId, version) { 1291 this._currentlyRegistering.add(serviceId); 1292 let data = { 1293 messageType: "broadcast_subscribe", 1294 broadcasts: { 1295 [serviceId]: version, 1296 }, 1297 }; 1298 1299 this._queueRequest(data); 1300 }, 1301 }; 1302 1303 function PushRecordWebSocket(record) { 1304 PushRecord.call(this, record); 1305 this.channelID = record.channelID; 1306 this.version = record.version; 1307 } 1308 1309 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, { 1310 keyID: { 1311 get() { 1312 return this.channelID; 1313 }, 1314 }, 1315 }); 1316 1317 PushRecordWebSocket.prototype.toSubscription = function () { 1318 let subscription = PushRecord.prototype.toSubscription.call(this); 1319 subscription.version = this.version; 1320 return subscription; 1321 };