tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

PushServiceWebSocket.sys.mjs (38241B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      3 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs";
      6 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs";
      7 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs";
      8 
      9 const lazy = {};
     10 
     11 ChromeUtils.defineESModuleGetters(lazy, {
     12  ObjectUtils: "resource://gre/modules/ObjectUtils.sys.mjs",
     13  pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs",
     14 });
     15 
     16 const kPUSHWSDB_DB_NAME = "pushapi";
     17 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
     18 const kPUSHWSDB_STORE_NAME = "pushapi";
     19 
     20 // WebSocket close code sent by the server to indicate that the client should
     21 // not automatically reconnect.
     22 const kBACKOFF_WS_STATUS_CODE = 4774;
     23 
     24 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
     25 // included in request payloads.
     26 const kACK_STATUS_TO_CODE = {
     27  [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
     28  [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
     29  [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
     30 };
     31 
     32 const kUNREGISTER_REASON_TO_CODE = {
     33  [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
     34  [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
     35  [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
     36 };
     37 
     38 const kDELIVERY_REASON_TO_CODE = {
     39  [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
     40  [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
     41  [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
     42 };
     43 
     44 const kERROR_CODE_TO_GLEAN_LABEL = {
     45  [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: "decryption_error",
     46  [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: "not_delivered",
     47  [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: "uncaught_exception",
     48  [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: "unhandled_rejection",
     49  [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: "internal_error",
     50 };
     51 
     52 const prefs = Services.prefs.getBranch("dom.push.");
     53 
     54 ChromeUtils.defineLazyGetter(lazy, "console", () => {
     55  return console.createInstance({
     56    maxLogLevelPref: "dom.push.loglevel",
     57    prefix: "PushServiceWebSocket",
     58  });
     59 });
     60 
     61 /**
     62 * A proxy between the PushService and the WebSocket. The listener is used so
     63 * that the PushService can silence messages from the WebSocket by setting
     64 * PushWebSocketListener._pushService to null. This is required because
     65 * a WebSocket can continue to send messages or errors after it has been
     66 * closed but the PushService may not be interested in these. It's easier to
     67 * stop listening than to have checks at specific points.
     68 */
     69 var PushWebSocketListener = function (pushService) {
     70  this._pushService = pushService;
     71 };
     72 
     73 PushWebSocketListener.prototype = {
     74  onStart(context) {
     75    if (!this._pushService) {
     76      return;
     77    }
     78    this._pushService._wsOnStart(context);
     79  },
     80 
     81  onStop(context, statusCode) {
     82    if (!this._pushService) {
     83      return;
     84    }
     85    this._pushService._wsOnStop(context, statusCode);
     86  },
     87 
     88  onAcknowledge() {
     89    // EMPTY
     90  },
     91 
     92  onBinaryMessageAvailable() {
     93    // EMPTY
     94  },
     95 
     96  onMessageAvailable(context, message) {
     97    if (!this._pushService) {
     98      return;
     99    }
    100    this._pushService._wsOnMessageAvailable(context, message);
    101  },
    102 
    103  onServerClose(context, aStatusCode, aReason) {
    104    if (!this._pushService) {
    105      return;
    106    }
    107    this._pushService._wsOnServerClose(context, aStatusCode, aReason);
    108  },
    109 };
    110 
    111 // websocket states
    112 // websocket is off
    113 const STATE_SHUT_DOWN = 0;
    114 // Websocket has been opened on client side, waiting for successful open.
    115 // (_wsOnStart)
    116 const STATE_WAITING_FOR_WS_START = 1;
    117 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
    118 const STATE_WAITING_FOR_HELLO = 2;
    119 // Websocket operational, handshake completed, begin protocol messaging.
    120 const STATE_READY = 3;
    121 
    122 export var PushServiceWebSocket = {
    123  QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]),
    124  name: "PushServiceWebSocket",
    125 
    126  _mainPushService: null,
    127  _serverURI: null,
    128  _currentlyRegistering: new Set(),
    129 
    130  newPushDB() {
    131    return new PushDB(
    132      kPUSHWSDB_DB_NAME,
    133      kPUSHWSDB_DB_VERSION,
    134      kPUSHWSDB_STORE_NAME,
    135      "channelID",
    136      PushRecordWebSocket
    137    );
    138  },
    139 
    140  disconnect() {
    141    this._shutdownWS();
    142  },
    143 
    144  observe(aSubject, aTopic, aData) {
    145    if (aTopic == "nsPref:changed" && aData == "userAgentID") {
    146      this._onUAIDChanged();
    147    } else if (aTopic == "timer-callback") {
    148      this._onTimerFired(aSubject);
    149    }
    150  },
    151 
    152  /**
    153   * Handles a UAID change. Unlike reconnects, we cancel all pending requests
    154   * after disconnecting. Existing subscriptions stored in IndexedDB will be
    155   * dropped on reconnect.
    156   */
    157  _onUAIDChanged() {
    158    lazy.console.debug("onUAIDChanged()");
    159 
    160    this._shutdownWS();
    161    this._startBackoffTimer();
    162  },
    163 
    164  /** Handles a ping, backoff, or request timeout timer event. */
    165  _onTimerFired(timer) {
    166    lazy.console.debug("onTimerFired()");
    167 
    168    if (timer == this._pingTimer) {
    169      this._sendPing();
    170      return;
    171    }
    172 
    173    if (timer == this._backoffTimer) {
    174      lazy.console.debug("onTimerFired: Reconnecting after backoff");
    175      this._beginWSSetup();
    176      return;
    177    }
    178 
    179    if (timer == this._requestTimeoutTimer) {
    180      this._timeOutRequests();
    181    }
    182  },
    183 
    184  /**
    185   * Sends a ping to the server. Bypasses the request queue, but starts the
    186   * request timeout timer. If the socket is already closed, or the server
    187   * does not respond within the timeout, the client will reconnect.
    188   */
    189  _sendPing() {
    190    lazy.console.debug("sendPing()");
    191 
    192    this._startRequestTimeoutTimer();
    193    try {
    194      this._wsSendMessage({});
    195      this._lastPingTime = Date.now();
    196    } catch (e) {
    197      lazy.console.debug("sendPing: Error sending ping", e);
    198      this._reconnect();
    199    }
    200  },
    201 
    202  /** Times out any pending requests. */
    203  _timeOutRequests() {
    204    lazy.console.debug("timeOutRequests()");
    205 
    206    if (!this._hasPendingRequests()) {
    207      // Cancel the repeating timer and exit early if we aren't waiting for
    208      // pongs or requests.
    209      this._requestTimeoutTimer.cancel();
    210      return;
    211    }
    212 
    213    let now = Date.now();
    214 
    215    // Set to true if at least one request timed out, or we're still waiting
    216    // for a pong after the request timeout.
    217    let requestTimedOut = false;
    218 
    219    if (
    220      this._lastPingTime > 0 &&
    221      now - this._lastPingTime > this._requestTimeout
    222    ) {
    223      lazy.console.debug("timeOutRequests: Did not receive pong in time");
    224      requestTimedOut = true;
    225    } else {
    226      for (let [key, request] of this._pendingRequests) {
    227        let duration = now - request.ctime;
    228        // If any of the registration requests time out, all the ones after it
    229        // also made to fail, since we are going to be disconnecting the
    230        // socket.
    231        requestTimedOut |= duration > this._requestTimeout;
    232        if (requestTimedOut) {
    233          request.reject(new Error("Request timed out: " + key));
    234          this._pendingRequests.delete(key);
    235        }
    236      }
    237    }
    238 
    239    // The most likely reason for a pong or registration request timing out is
    240    // that the socket has disconnected. Best to reconnect.
    241    if (requestTimedOut) {
    242      this._reconnect();
    243    }
    244  },
    245 
    246  get _UAID() {
    247    return prefs.getStringPref("userAgentID");
    248  },
    249 
    250  set _UAID(newID) {
    251    if (typeof newID !== "string") {
    252      lazy.console.warn(
    253        "Got invalid, non-string UAID",
    254        newID,
    255        "Not updating userAgentID"
    256      );
    257      return;
    258    }
    259    lazy.console.debug("New _UAID", newID);
    260    prefs.setStringPref("userAgentID", newID);
    261  },
    262 
    263  _ws: null,
    264  _pendingRequests: new Map(),
    265  _currentState: STATE_SHUT_DOWN,
    266  _requestTimeout: 0,
    267  _requestTimeoutTimer: null,
    268  _retryFailCount: 0,
    269 
    270  /**
    271   * According to the WS spec, servers should immediately close the underlying
    272   * TCP connection after they close a WebSocket. This causes wsOnStop to be
    273   * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
    274   * WebSocket up, it should try to reconnect. But if the server closes the
    275   * WebSocket because it wants the client to back off, then the client
    276   * shouldn't re-establish the connection. If the server sends the backoff
    277   * close code, this field will be set to true in wsOnServerClose. It is
    278   * checked in wsOnStop.
    279   */
    280  _skipReconnect: false,
    281 
    282  /** Indicates whether the server supports Web Push-style message delivery. */
    283  _dataEnabled: false,
    284 
    285  /**
    286   * The last time the client sent a ping to the server. If non-zero, keeps the
    287   * request timeout timer active. Reset to zero when the server responds with
    288   * a pong or pending messages.
    289   */
    290  _lastPingTime: 0,
    291 
    292  /**
    293   * A one-shot timer used to ping the server, to avoid timing out idle
    294   * connections. Reset to the ping interval on each incoming message.
    295   */
    296  _pingTimer: null,
    297 
    298  /** A one-shot timer fired after the reconnect backoff period. */
    299  _backoffTimer: null,
    300 
    301  /**
    302   * Sends a message to the Push Server through an open websocket.
    303   * typeof(msg) shall be an object
    304   */
    305  _wsSendMessage(msg) {
    306    if (!this._ws) {
    307      lazy.console.warn(
    308        "wsSendMessage: No WebSocket initialized.",
    309        "Cannot send a message"
    310      );
    311      return;
    312    }
    313    msg = JSON.stringify(msg);
    314    lazy.console.debug("wsSendMessage: Sending message", msg);
    315    this._ws.sendMsg(msg);
    316  },
    317 
    318  init(options, mainPushService, serverURI) {
    319    lazy.console.debug("init()");
    320 
    321    this._mainPushService = mainPushService;
    322    this._serverURI = serverURI;
    323    // Filled in at connect() time
    324    this._broadcastListeners = null;
    325 
    326    // Override the default WebSocket factory function. The returned object
    327    // must be null or satisfy the nsIWebSocketChannel interface. Used by
    328    // the tests to provide a mock WebSocket implementation.
    329    if (options.makeWebSocket) {
    330      this._makeWebSocket = options.makeWebSocket;
    331    }
    332 
    333    this._requestTimeout = prefs.getIntPref("requestTimeout");
    334 
    335    return Promise.resolve();
    336  },
    337 
    338  _reconnect() {
    339    lazy.console.debug("reconnect()");
    340    this._shutdownWS(false);
    341    this._startBackoffTimer();
    342  },
    343 
    344  _shutdownWS(shouldCancelPending = true) {
    345    lazy.console.debug("shutdownWS()");
    346 
    347    if (this._currentState == STATE_READY) {
    348      prefs.removeObserver("userAgentID", this);
    349    }
    350 
    351    this._currentState = STATE_SHUT_DOWN;
    352    this._skipReconnect = false;
    353 
    354    if (this._wsListener) {
    355      this._wsListener._pushService = null;
    356    }
    357    try {
    358      this._ws.close(0, null);
    359    } catch (e) {}
    360    this._ws = null;
    361 
    362    this._lastPingTime = 0;
    363 
    364    if (this._pingTimer) {
    365      this._pingTimer.cancel();
    366    }
    367 
    368    if (shouldCancelPending) {
    369      this._cancelPendingRequests();
    370    }
    371 
    372    if (this._notifyRequestQueue) {
    373      this._notifyRequestQueue();
    374      this._notifyRequestQueue = null;
    375    }
    376  },
    377 
    378  uninit() {
    379    // All pending requests (ideally none) are dropped at this point. We
    380    // shouldn't have any applications performing registration/unregistration
    381    // or receiving notifications.
    382    this._shutdownWS();
    383 
    384    if (this._backoffTimer) {
    385      this._backoffTimer.cancel();
    386    }
    387    if (this._requestTimeoutTimer) {
    388      this._requestTimeoutTimer.cancel();
    389    }
    390 
    391    this._mainPushService = null;
    392 
    393    this._dataEnabled = false;
    394  },
    395 
    396  /**
    397   * How retries work: If the WS is closed due to a socket error,
    398   * _startBackoffTimer() is called. The retry timer is started and when
    399   * it times out, beginWSSetup() is called again.
    400   *
    401   * If we are in the middle of a timeout (i.e. waiting), but
    402   * a register/unregister is called, we don't want to wait around anymore.
    403   * _sendRequest will automatically call beginWSSetup(), which will cancel the
    404   * timer. In addition since the state will have changed, even if a pending
    405   * timer event comes in (because the timer fired the event before it was
    406   * cancelled), so the connection won't be reset.
    407   */
    408  _startBackoffTimer() {
    409    lazy.console.debug("startBackoffTimer()");
    410 
    411    // Calculate new timeout, but cap it to pingInterval.
    412    let retryTimeout =
    413      prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
    414    retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
    415 
    416    this._retryFailCount++;
    417 
    418    lazy.console.debug(
    419      "startBackoffTimer: Retry in",
    420      retryTimeout,
    421      "Try number",
    422      this._retryFailCount
    423    );
    424 
    425    if (!this._backoffTimer) {
    426      this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
    427        Ci.nsITimer
    428      );
    429    }
    430    this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
    431  },
    432 
    433  /** Indicates whether we're waiting for pongs or requests. */
    434  _hasPendingRequests() {
    435    return this._lastPingTime > 0 || this._pendingRequests.size > 0;
    436  },
    437 
    438  /**
    439   * Starts the request timeout timer unless we're already waiting for a pong
    440   * or register request.
    441   */
    442  _startRequestTimeoutTimer() {
    443    if (this._hasPendingRequests()) {
    444      return;
    445    }
    446    if (!this._requestTimeoutTimer) {
    447      this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
    448        Ci.nsITimer
    449      );
    450    }
    451    this._requestTimeoutTimer.init(
    452      this,
    453      this._requestTimeout,
    454      Ci.nsITimer.TYPE_REPEATING_SLACK
    455    );
    456  },
    457 
    458  /** Starts or resets the ping timer. */
    459  _startPingTimer() {
    460    if (!this._pingTimer) {
    461      this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
    462    }
    463    this._pingTimer.init(
    464      this,
    465      prefs.getIntPref("pingInterval"),
    466      Ci.nsITimer.TYPE_ONE_SHOT
    467    );
    468  },
    469 
    470  _makeWebSocket(uri) {
    471    if (!prefs.getBoolPref("connection.enabled")) {
    472      lazy.console.warn(
    473        "makeWebSocket: connection.enabled is not set to true.",
    474        "Aborting."
    475      );
    476      return null;
    477    }
    478    if (Services.io.offline) {
    479      lazy.console.warn("makeWebSocket: Network is offline.");
    480      return null;
    481    }
    482    let contractId =
    483      uri.scheme == "ws"
    484        ? "@mozilla.org/network/protocol;1?name=ws"
    485        : "@mozilla.org/network/protocol;1?name=wss";
    486    let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
    487 
    488    socket.initLoadInfo(
    489      null, // aLoadingNode
    490      Services.scriptSecurityManager.getSystemPrincipal(),
    491      null, // aTriggeringPrincipal
    492      Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
    493      Ci.nsIContentPolicy.TYPE_WEBSOCKET
    494    );
    495    // Allow deprecated HTTP request from SystemPrincipal
    496    socket.loadInfo.allowDeprecatedSystemRequests = true;
    497 
    498    return socket;
    499  },
    500 
    501  _beginWSSetup() {
    502    lazy.console.debug("beginWSSetup()");
    503    if (this._currentState != STATE_SHUT_DOWN) {
    504      lazy.console.error(
    505        "_beginWSSetup: Not in shutdown state! Current state",
    506        this._currentState
    507      );
    508      return;
    509    }
    510 
    511    // Stop any pending reconnects scheduled for the near future.
    512    if (this._backoffTimer) {
    513      this._backoffTimer.cancel();
    514    }
    515 
    516    let uri = this._serverURI;
    517    if (!uri) {
    518      return;
    519    }
    520    let socket = this._makeWebSocket(uri);
    521    if (!socket) {
    522      return;
    523    }
    524    this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
    525 
    526    lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
    527    this._wsListener = new PushWebSocketListener(this);
    528    this._ws.protocol = "push-notification";
    529 
    530    try {
    531      // Grab a wakelock before we open the socket to ensure we don't go to
    532      // sleep before connection the is opened.
    533      this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
    534      this._currentState = STATE_WAITING_FOR_WS_START;
    535    } catch (e) {
    536      lazy.console.error(
    537        "beginWSSetup: Error opening websocket.",
    538        "asyncOpen failed",
    539        e
    540      );
    541      this._reconnect();
    542    }
    543  },
    544 
    545  connect(broadcastListeners) {
    546    lazy.console.debug("connect()", broadcastListeners);
    547    this._broadcastListeners = broadcastListeners;
    548    this._beginWSSetup();
    549  },
    550 
    551  isConnected() {
    552    return !!this._ws;
    553  },
    554 
    555  /**
    556   * Protocol handler invoked by server message.
    557   */
    558  _handleHelloReply(reply) {
    559    lazy.console.debug("handleHelloReply()");
    560    if (this._currentState != STATE_WAITING_FOR_HELLO) {
    561      lazy.console.error(
    562        "handleHelloReply: Unexpected state",
    563        this._currentState,
    564        "(expected STATE_WAITING_FOR_HELLO)"
    565      );
    566      this._shutdownWS();
    567      return;
    568    }
    569 
    570    if (typeof reply.uaid !== "string") {
    571      lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
    572      this._shutdownWS();
    573      return;
    574    }
    575 
    576    if (reply.uaid === "") {
    577      lazy.console.error("handleHelloReply: Received empty UAID");
    578      this._shutdownWS();
    579      return;
    580    }
    581 
    582    // To avoid sticking extra large values sent by an evil server into prefs.
    583    if (reply.uaid.length > 128) {
    584      lazy.console.error(
    585        "handleHelloReply: UAID received from server was too long",
    586        reply.uaid
    587      );
    588      this._shutdownWS();
    589      return;
    590    }
    591 
    592    let sendRequests = () => {
    593      if (this._notifyRequestQueue) {
    594        this._notifyRequestQueue();
    595        this._notifyRequestQueue = null;
    596      }
    597      this._sendPendingRequests();
    598    };
    599 
    600    function finishHandshake() {
    601      this._UAID = reply.uaid;
    602      this._currentState = STATE_READY;
    603      prefs.addObserver("userAgentID", this);
    604 
    605      // Handle broadcasts received in response to the "hello" message.
    606      if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
    607        // The reply isn't technically a broadcast message, but it has
    608        // the shape of a broadcast message (it has a broadcasts field).
    609        const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
    610        this._mainPushService.receivedBroadcastMessage(reply, context);
    611      }
    612 
    613      this._dataEnabled = !!reply.use_webpush;
    614      if (this._dataEnabled) {
    615        this._mainPushService
    616          .getAllUnexpired()
    617          .then(records =>
    618            Promise.all(
    619              records.map(record =>
    620                this._mainPushService.ensureCrypto(record).catch(error => {
    621                  lazy.console.error(
    622                    "finishHandshake: Error updating record",
    623                    record.keyID,
    624                    error
    625                  );
    626                })
    627              )
    628            )
    629          )
    630          .then(sendRequests);
    631      } else {
    632        sendRequests();
    633      }
    634    }
    635 
    636    // By this point we've got a UAID from the server that we are ready to
    637    // accept.
    638    //
    639    // We unconditionally drop all existing registrations and notify service
    640    // workers if we receive a new UAID. This ensures we expunge all stale
    641    // registrations if the `userAgentID` pref is reset.
    642    if (this._UAID != reply.uaid) {
    643      lazy.console.debug("handleHelloReply: Received new UAID");
    644 
    645      this._mainPushService
    646        .dropUnexpiredRegistrations()
    647        .then(finishHandshake.bind(this));
    648 
    649      return;
    650    }
    651 
    652    // otherwise we are good to go
    653    finishHandshake.bind(this)();
    654  },
    655 
    656  /**
    657   * Protocol handler invoked by server message.
    658   */
    659  _handleRegisterReply(reply) {
    660    lazy.console.debug("handleRegisterReply()");
    661 
    662    let tmp = this._takeRequestForReply(reply);
    663    if (!tmp) {
    664      return;
    665    }
    666 
    667    if (reply.status == 200) {
    668      try {
    669        Services.io.newURI(reply.pushEndpoint);
    670      } catch (e) {
    671        tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
    672        return;
    673      }
    674 
    675      let record = new PushRecordWebSocket({
    676        channelID: reply.channelID,
    677        pushEndpoint: reply.pushEndpoint,
    678        scope: tmp.record.scope,
    679        originAttributes: tmp.record.originAttributes,
    680        version: null,
    681        systemRecord: tmp.record.systemRecord,
    682        appServerKey: tmp.record.appServerKey,
    683        ctime: Date.now(),
    684      });
    685      tmp.resolve(record);
    686    } else {
    687      lazy.console.error(
    688        "handleRegisterReply: Unexpected server response",
    689        reply
    690      );
    691      tmp.reject(
    692        new Error("Wrong status code for register reply: " + reply.status)
    693      );
    694    }
    695  },
    696 
    697  _handleUnregisterReply(reply) {
    698    lazy.console.debug("handleUnregisterReply()");
    699 
    700    let request = this._takeRequestForReply(reply);
    701    if (!request) {
    702      return;
    703    }
    704 
    705    let success = reply.status === 200;
    706    request.resolve(success);
    707  },
    708 
    709  _handleDataUpdate(update) {
    710    let promise;
    711    if (typeof update.channelID != "string") {
    712      lazy.console.warn(
    713        "handleDataUpdate: Discarding update without channel ID",
    714        update
    715      );
    716      return;
    717    }
    718    function updateRecord(record) {
    719      // Ignore messages that we've already processed. This can happen if the
    720      // connection drops between notifying the service worker and acking the
    721      // the message. In that case, the server will re-send the message on
    722      // reconnect.
    723      if (record.hasRecentMessageID(update.version)) {
    724        lazy.console.warn(
    725          "handleDataUpdate: Ignoring duplicate message",
    726          update.version
    727        );
    728        Glean.webPush.detectedDuplicatedMessageIds.add();
    729        return null;
    730      }
    731      record.noteRecentMessageID(update.version);
    732      return record;
    733    }
    734    if (typeof update.data != "string") {
    735      promise = this._mainPushService.receivedPushMessage(
    736        update.channelID,
    737        update.version,
    738        null,
    739        null,
    740        updateRecord
    741      );
    742    } else {
    743      let message = ChromeUtils.base64URLDecode(update.data, {
    744        // The Push server may append padding.
    745        padding: "ignore",
    746      });
    747      promise = this._mainPushService.receivedPushMessage(
    748        update.channelID,
    749        update.version,
    750        update.headers,
    751        message,
    752        updateRecord
    753      );
    754    }
    755    promise
    756      .then(
    757        status => {
    758          this._sendAck(update.channelID, update.version, status);
    759        },
    760        err => {
    761          lazy.console.error(
    762            "handleDataUpdate: Error delivering message",
    763            update,
    764            err
    765          );
    766          this._sendAck(
    767            update.channelID,
    768            update.version,
    769            Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
    770          );
    771        }
    772      )
    773      .catch(err => {
    774        lazy.console.error(
    775          "handleDataUpdate: Error acknowledging message",
    776          update,
    777          err
    778        );
    779      });
    780  },
    781 
    782  /**
    783   * Protocol handler invoked by server message.
    784   */
    785  _handleNotificationReply(reply) {
    786    lazy.console.debug("handleNotificationReply()");
    787    if (this._dataEnabled) {
    788      this._handleDataUpdate(reply);
    789      return;
    790    }
    791 
    792    if (typeof reply.updates !== "object") {
    793      lazy.console.warn(
    794        "handleNotificationReply: Missing updates",
    795        reply.updates
    796      );
    797      return;
    798    }
    799 
    800    lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
    801    for (let i = 0; i < reply.updates.length; i++) {
    802      let update = reply.updates[i];
    803      lazy.console.debug("handleNotificationReply: Handling update", update);
    804      if (typeof update.channelID !== "string") {
    805        lazy.console.debug(
    806          "handleNotificationReply: Invalid update at index",
    807          i,
    808          update
    809        );
    810        continue;
    811      }
    812 
    813      if (update.version === undefined) {
    814        lazy.console.debug("handleNotificationReply: Missing version", update);
    815        continue;
    816      }
    817 
    818      let version = update.version;
    819 
    820      if (typeof version === "string") {
    821        version = parseInt(version, 10);
    822      }
    823 
    824      if (typeof version === "number" && version >= 0) {
    825        // FIXME(nsm): this relies on app update notification being infallible!
    826        // eventually fix this
    827        this._receivedUpdate(update.channelID, version);
    828      }
    829    }
    830  },
    831 
    832  _handleBroadcastReply(reply) {
    833    let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
    834    // Check if this reply is the result of registration.
    835    for (const id of Object.keys(reply.broadcasts)) {
    836      const wasRegistering = this._currentlyRegistering.delete(id);
    837      if (wasRegistering) {
    838        // If we get multiple broadcasts and only one is "registering",
    839        // then we consider the phase to be REGISTER for all of them.
    840        // It is acceptable since registrations do not happen so often,
    841        // and are all very likely to occur soon after browser startup.
    842        phase = lazy.pushBroadcastService.PHASES.REGISTER;
    843      }
    844    }
    845    const context = { phase };
    846    this._mainPushService.receivedBroadcastMessage(reply, context);
    847  },
    848 
    849  reportDeliveryError(messageID, reason) {
    850    lazy.console.debug("reportDeliveryError()");
    851    let code = kDELIVERY_REASON_TO_CODE[reason];
    852    if (!code) {
    853      throw new Error("Invalid delivery error reason");
    854    }
    855    Glean.webPush.errorCode[kERROR_CODE_TO_GLEAN_LABEL[reason]].add();
    856    let data = { messageType: "nack", version: messageID, code };
    857    this._queueRequest(data);
    858  },
    859 
    860  _sendAck(channelID, version, status) {
    861    lazy.console.debug("sendAck()");
    862    let code = kACK_STATUS_TO_CODE[status];
    863    if (!code) {
    864      throw new Error("Invalid ack status");
    865    }
    866    if (code > 100) {
    867      Glean.webPush.errorCode[kERROR_CODE_TO_GLEAN_LABEL[status]].add();
    868    }
    869    let data = { messageType: "ack", updates: [{ channelID, version, code }] };
    870    this._queueRequest(data);
    871  },
    872 
    873  _generateID() {
    874    // generateUUID() gives a UUID surrounded by {...}, slice them off.
    875    return Services.uuid.generateUUID().toString().slice(1, -1);
    876  },
    877 
    878  register(record) {
    879    lazy.console.debug("register() ", record);
    880 
    881    let data = { channelID: this._generateID(), messageType: "register" };
    882 
    883    if (record.appServerKey) {
    884      data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
    885        // The Push server requires padding.
    886        pad: true,
    887      });
    888    }
    889 
    890    return this._sendRequestForReply(record, data).then(requestRecord => {
    891      if (!this._dataEnabled) {
    892        return requestRecord;
    893      }
    894      return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
    895        requestRecord.p256dhPublicKey = publicKey;
    896        requestRecord.p256dhPrivateKey = privateKey;
    897        requestRecord.authenticationSecret =
    898          PushCrypto.generateAuthenticationSecret();
    899        return requestRecord;
    900      });
    901    });
    902  },
    903 
    904  unregister(record, reason) {
    905    lazy.console.debug("unregister() ", record, reason);
    906 
    907    return Promise.resolve().then(_ => {
    908      let code = kUNREGISTER_REASON_TO_CODE[reason];
    909      if (!code) {
    910        throw new Error("Invalid unregister reason");
    911      }
    912      let data = {
    913        channelID: record.channelID,
    914        messageType: "unregister",
    915        code,
    916      };
    917 
    918      return this._sendRequestForReply(record, data);
    919    });
    920  },
    921 
    922  _queueStart: Promise.resolve(),
    923  _notifyRequestQueue: null,
    924  _queue: null,
    925  _enqueue(op) {
    926    lazy.console.debug("enqueue()");
    927    if (!this._queue) {
    928      this._queue = this._queueStart;
    929    }
    930    this._queue = this._queue.then(op).catch(_ => {});
    931  },
    932 
    933  /** Sends a request to the server. */
    934  _send(data) {
    935    if (this._currentState != STATE_READY) {
    936      lazy.console.warn(
    937        "send: Unexpected state; ignoring message",
    938        this._currentState
    939      );
    940      return;
    941    }
    942    if (!this._requestHasReply(data)) {
    943      this._wsSendMessage(data);
    944      return;
    945    }
    946    // If we're expecting a reply, check that we haven't cancelled the request.
    947    let key = this._makePendingRequestKey(data);
    948    if (!this._pendingRequests.has(key)) {
    949      lazy.console.log("send: Request cancelled; ignoring message", key);
    950      return;
    951    }
    952    this._wsSendMessage(data);
    953  },
    954 
    955  /** Indicates whether a request has a corresponding reply from the server. */
    956  _requestHasReply(data) {
    957    return data.messageType == "register" || data.messageType == "unregister";
    958  },
    959 
    960  /**
    961   * Sends all pending requests that expect replies. Called after the connection
    962   * is established and the handshake is complete.
    963   */
    964  _sendPendingRequests() {
    965    this._enqueue(_ => {
    966      for (let request of this._pendingRequests.values()) {
    967        this._send(request.data);
    968      }
    969    });
    970  },
    971 
    972  /** Queues an outgoing request, establishing a connection if necessary. */
    973  _queueRequest(data) {
    974    lazy.console.debug("queueRequest()", data);
    975 
    976    if (this._currentState == STATE_READY) {
    977      // If we're ready, no need to queue; just send the request.
    978      this._send(data);
    979      return;
    980    }
    981 
    982    // Otherwise, we're still setting up. If we don't have a request queue,
    983    // make one now.
    984    if (!this._notifyRequestQueue) {
    985      let promise = new Promise(resolve => {
    986        this._notifyRequestQueue = resolve;
    987      });
    988      this._enqueue(_ => promise);
    989    }
    990 
    991    let isRequest = this._requestHasReply(data);
    992    if (!isRequest) {
    993      // Don't queue requests, since they're stored in `_pendingRequests`, and
    994      // `_sendPendingRequests` will send them after reconnecting. Without this
    995      // check, we'd send requests twice.
    996      this._enqueue(_ => this._send(data));
    997    }
    998 
    999    if (!this._ws) {
   1000      // This will end up calling notifyRequestQueue().
   1001      this._beginWSSetup();
   1002      // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
   1003      // not be call.
   1004      if (!this._ws && this._notifyRequestQueue) {
   1005        this._notifyRequestQueue();
   1006        this._notifyRequestQueue = null;
   1007      }
   1008    }
   1009  },
   1010 
   1011  _receivedUpdate(aChannelID, aLatestVersion) {
   1012    lazy.console.debug(
   1013      "receivedUpdate: Updating",
   1014      aChannelID,
   1015      "->",
   1016      aLatestVersion
   1017    );
   1018 
   1019    this._mainPushService
   1020      .receivedPushMessage(aChannelID, "", null, null, record => {
   1021        if (record.version === null || record.version < aLatestVersion) {
   1022          lazy.console.debug(
   1023            "receivedUpdate: Version changed for",
   1024            aChannelID,
   1025            aLatestVersion
   1026          );
   1027          record.version = aLatestVersion;
   1028          return record;
   1029        }
   1030        lazy.console.debug(
   1031          "receivedUpdate: No significant version change for",
   1032          aChannelID,
   1033          aLatestVersion
   1034        );
   1035        return null;
   1036      })
   1037      .then(status => {
   1038        this._sendAck(aChannelID, aLatestVersion, status);
   1039      })
   1040      .catch(err => {
   1041        lazy.console.error(
   1042          "receivedUpdate: Error acknowledging message",
   1043          aChannelID,
   1044          aLatestVersion,
   1045          err
   1046        );
   1047      });
   1048  },
   1049 
   1050  // begin Push protocol handshake
   1051  _wsOnStart() {
   1052    lazy.console.debug("wsOnStart()");
   1053 
   1054    if (this._currentState != STATE_WAITING_FOR_WS_START) {
   1055      lazy.console.error(
   1056        "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
   1057        "state",
   1058        this._currentState,
   1059        "Skipping"
   1060      );
   1061      return;
   1062    }
   1063 
   1064    this._mainPushService
   1065      .getAllUnexpired()
   1066      .then(
   1067        records => this._sendHello(records),
   1068        err => {
   1069          lazy.console.warn(
   1070            "Error fetching existing records before handshake; assuming none",
   1071            err
   1072          );
   1073          this._sendHello([]);
   1074        }
   1075      )
   1076      .catch(err => {
   1077        // If we failed to send the handshake, back off and reconnect.
   1078        lazy.console.warn("Failed to send handshake; reconnecting", err);
   1079        this._reconnect();
   1080      });
   1081  },
   1082 
   1083  /**
   1084   * Sends a `hello` handshake to the server.
   1085   *
   1086   * @param {Array<PushRecordWebSocket>} An array of records for existing
   1087   *        subscriptions, used to determine whether to rotate our UAID.
   1088   */
   1089  _sendHello(records) {
   1090    let data = {
   1091      messageType: "hello",
   1092      broadcasts: this._broadcastListeners,
   1093      use_webpush: true,
   1094    };
   1095 
   1096    if (records.length && this._UAID) {
   1097      // Only send our UAID if we have existing push subscriptions, to
   1098      // avoid tying a persistent identifier to the connection (bug
   1099      // 1617136). The push server will issue our client a new UAID in
   1100      // the `hello` response, which we'll store until either the next
   1101      // time we reconnect, or the user subscribes to push. Once we have a
   1102      // push subscription, we'll stop rotating the UAID when we connect,
   1103      // so that we can receive push messages for them.
   1104      data.uaid = this._UAID;
   1105    }
   1106 
   1107    this._wsSendMessage(data);
   1108    this._currentState = STATE_WAITING_FOR_HELLO;
   1109  },
   1110 
   1111  /**
   1112   * This statusCode is not the websocket protocol status code, but the TCP
   1113   * connection close status code.
   1114   *
   1115   * If we do not explicitly call ws.close() then statusCode is always
   1116   * NS_BASE_STREAM_CLOSED, even on a successful close.
   1117   */
   1118  _wsOnStop(context, statusCode) {
   1119    lazy.console.debug("wsOnStop()");
   1120 
   1121    if (statusCode != Cr.NS_OK && !this._skipReconnect) {
   1122      lazy.console.debug(
   1123        "wsOnStop: Reconnecting after socket error",
   1124        statusCode
   1125      );
   1126      this._reconnect();
   1127      return;
   1128    }
   1129 
   1130    this._shutdownWS();
   1131  },
   1132 
   1133  _wsOnMessageAvailable(context, message) {
   1134    lazy.console.debug("wsOnMessageAvailable()", message);
   1135 
   1136    // Clearing the last ping time indicates we're no longer waiting for a pong.
   1137    this._lastPingTime = 0;
   1138 
   1139    let reply;
   1140    try {
   1141      reply = JSON.parse(message);
   1142    } catch (e) {
   1143      lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
   1144      return;
   1145    }
   1146 
   1147    // If we receive a message, we know the connection succeeded. Reset the
   1148    // connection attempt and ping interval counters.
   1149    this._retryFailCount = 0;
   1150 
   1151    let doNotHandle = false;
   1152    if (
   1153      message === "{}" ||
   1154      reply.messageType === undefined ||
   1155      reply.messageType === "ping" ||
   1156      typeof reply.messageType != "string"
   1157    ) {
   1158      lazy.console.debug("wsOnMessageAvailable: Pong received");
   1159      doNotHandle = true;
   1160    }
   1161 
   1162    // Reset the ping timer.  Note: This path is executed at every step of the
   1163    // handshake, so this timer does not need to be set explicitly at startup.
   1164    this._startPingTimer();
   1165 
   1166    // If it is a ping, do not handle the message.
   1167    if (doNotHandle) {
   1168      if (!this._hasPendingRequests()) {
   1169        this._requestTimeoutTimer.cancel();
   1170      }
   1171      return;
   1172    }
   1173 
   1174    // An allowlist of protocol handlers. Add to these if new messages are added
   1175    // in the protocol.
   1176    let handlers = [
   1177      "Hello",
   1178      "Register",
   1179      "Unregister",
   1180      "Notification",
   1181      "Broadcast",
   1182    ];
   1183 
   1184    // Build up the handler name to call from messageType.
   1185    // e.g. messageType == "register" -> _handleRegisterReply.
   1186    let handlerName =
   1187      reply.messageType[0].toUpperCase() +
   1188      reply.messageType.slice(1).toLowerCase();
   1189 
   1190    if (!handlers.includes(handlerName)) {
   1191      lazy.console.warn(
   1192        "wsOnMessageAvailable: No allowlisted handler",
   1193        handlerName,
   1194        "for message",
   1195        reply.messageType
   1196      );
   1197      return;
   1198    }
   1199 
   1200    let handler = "_handle" + handlerName + "Reply";
   1201 
   1202    if (typeof this[handler] !== "function") {
   1203      lazy.console.warn(
   1204        "wsOnMessageAvailable: Handler",
   1205        handler,
   1206        "allowlisted but not implemented"
   1207      );
   1208      return;
   1209    }
   1210 
   1211    this[handler](reply);
   1212  },
   1213 
   1214  /**
   1215   * The websocket should never be closed. Since we don't call ws.close(),
   1216   * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
   1217   * function), which calls reconnect and re-establishes the WebSocket
   1218   * connection.
   1219   *
   1220   * If the server requested that we back off, we won't reconnect until the
   1221   * next network state change event, or until we need to send a new register
   1222   * request.
   1223   */
   1224  _wsOnServerClose(context, aStatusCode, aReason) {
   1225    lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
   1226 
   1227    if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
   1228      lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
   1229      this._skipReconnect = true;
   1230    }
   1231  },
   1232 
   1233  /**
   1234   * Rejects all pending register requests with errors.
   1235   */
   1236  _cancelPendingRequests() {
   1237    for (let request of this._pendingRequests.values()) {
   1238      request.reject(new Error("Request aborted"));
   1239    }
   1240    this._pendingRequests.clear();
   1241  },
   1242 
   1243  /** Creates a case-insensitive map key for a request that expects a reply. */
   1244  _makePendingRequestKey(data) {
   1245    return (data.messageType + "|" + data.channelID).toLowerCase();
   1246  },
   1247 
   1248  /** Sends a request and waits for a reply from the server. */
   1249  _sendRequestForReply(record, data) {
   1250    return Promise.resolve().then(_ => {
   1251      // start the timer since we now have at least one request
   1252      this._startRequestTimeoutTimer();
   1253 
   1254      let key = this._makePendingRequestKey(data);
   1255      if (!this._pendingRequests.has(key)) {
   1256        let request = {
   1257          data,
   1258          record,
   1259          ctime: Date.now(),
   1260        };
   1261        request.promise = new Promise((resolve, reject) => {
   1262          request.resolve = resolve;
   1263          request.reject = reject;
   1264        });
   1265        this._pendingRequests.set(key, request);
   1266        this._queueRequest(data);
   1267      }
   1268 
   1269      return this._pendingRequests.get(key).promise;
   1270    });
   1271  },
   1272 
   1273  /** Removes and returns a pending request for a server reply. */
   1274  _takeRequestForReply(reply) {
   1275    if (typeof reply.channelID !== "string") {
   1276      return null;
   1277    }
   1278    let key = this._makePendingRequestKey(reply);
   1279    let request = this._pendingRequests.get(key);
   1280    if (!request) {
   1281      return null;
   1282    }
   1283    this._pendingRequests.delete(key);
   1284    if (!this._hasPendingRequests()) {
   1285      this._requestTimeoutTimer.cancel();
   1286    }
   1287    return request;
   1288  },
   1289 
   1290  sendSubscribeBroadcast(serviceId, version) {
   1291    this._currentlyRegistering.add(serviceId);
   1292    let data = {
   1293      messageType: "broadcast_subscribe",
   1294      broadcasts: {
   1295        [serviceId]: version,
   1296      },
   1297    };
   1298 
   1299    this._queueRequest(data);
   1300  },
   1301 };
   1302 
   1303 function PushRecordWebSocket(record) {
   1304  PushRecord.call(this, record);
   1305  this.channelID = record.channelID;
   1306  this.version = record.version;
   1307 }
   1308 
   1309 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
   1310  keyID: {
   1311    get() {
   1312      return this.channelID;
   1313    },
   1314  },
   1315 });
   1316 
   1317 PushRecordWebSocket.prototype.toSubscription = function () {
   1318  let subscription = PushRecord.prototype.toSubscription.call(this);
   1319  subscription.version = this.version;
   1320  return subscription;
   1321 };