tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

DataChannel.cpp (60053B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include <algorithm>
      8 #include <stdio.h>
      9 
     10 #ifdef XP_WIN
     11 #  include <winsock.h>  // for htonl, htons, ntohl, ntohs
     12 #endif
     13 
     14 #include "nsIInputStream.h"
     15 #include "nsIPrefBranch.h"
     16 #include "nsIPrefService.h"
     17 #include "mozilla/Sprintf.h"
     18 #include "nsProxyRelease.h"
     19 #include "nsThread.h"
     20 #include "nsThreadUtils.h"
     21 #include "nsNetUtil.h"
     22 #include "mozilla/Components.h"
     23 #include "mozilla/StaticMutex.h"
     24 #include "mozilla/UniquePtrExtensions.h"
     25 #include "mozilla/dom/RTCDataChannel.h"
     26 #include "mozilla/dom/RTCDataChannelBinding.h"
     27 #ifdef MOZ_PEERCONNECTION
     28 #  include "transport/runnable_utils.h"
     29 #  include "jsapi/MediaTransportHandler.h"
     30 #  include "mediapacket.h"
     31 #endif
     32 
     33 #include "DataChannel.h"
     34 #include "DataChannelDcSctp.h"
     35 #include "DataChannelUsrsctp.h"
     36 #include "DataChannelLog.h"
     37 #include "DataChannelProtocol.h"
     38 
     39 namespace mozilla {
     40 
     41 LazyLogModule gDataChannelLog("DataChannel");
     42 
     43 OutgoingMsg::OutgoingMsg(nsACString&& aData,
     44                         const DataChannelMessageMetadata& aMetadata)
     45    : mData(std::move(aData)), mMetadata(aMetadata) {}
     46 
     47 void OutgoingMsg::Advance(size_t offset) {
     48  mPos += offset;
     49  if (mPos > mData.Length()) {
     50    mPos = mData.Length();
     51  }
     52 }
     53 
     54 DataChannelConnection::~DataChannelConnection() {
     55  DC_INFO(("%p: Deleting DataChannelConnection", this));
     56  // This may die on the MainThread, or on the STS thread, or on an
     57  // sctp thread if we were in a callback when the DOM side shut things down.
     58  MOZ_ASSERT(mState == DataChannelConnectionState::Closed);
     59  MOZ_ASSERT(mPending.empty());
     60 
     61  if (!mSTS->IsOnCurrentThread()) {
     62    // We may be on MainThread *or* on an sctp thread (being called from
     63    // receive_cb() or SendSctpPacket())
     64    if (mInternalIOThread) {
     65      // Avoid spinning the event thread from here (which if we're mainthread
     66      // is in the event loop already)
     67      nsCOMPtr<nsIRunnable> r = WrapRunnable(
     68          nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown);
     69      mSTS->Dispatch(r.forget(), NS_DISPATCH_FALLIBLE);
     70    }
     71  } else {
     72    // on STS, safe to call shutdown
     73    if (mInternalIOThread) {
     74      mInternalIOThread->Shutdown();
     75    }
     76  }
     77 }
     78 
     79 void DataChannelConnection::Destroy() {
     80  MOZ_ASSERT(NS_IsMainThread());
     81  DC_INFO(("%p: Destroying DataChannelConnection", this));
     82  CloseAll();
     83 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
     84  MOZ_DIAGNOSTIC_ASSERT(mSTS);
     85 #endif
     86  mListener = nullptr;
     87  mSTS->Dispatch(
     88      NS_NewCancelableRunnableFunction(
     89          __func__,
     90          [this, self = RefPtr<DataChannelConnection>(this)]() {
     91            mPacketReceivedListener.DisconnectIfExists();
     92            mStateChangeListener.DisconnectIfExists();
     93 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
     94            mShutdown = true;
     95            DC_INFO(("Shutting down connection %p, id %p", this, (void*)mId));
     96 #endif
     97          }),
     98      NS_DISPATCH_FALLIBLE);
     99 }
    100 
    101 Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create(
    102    DataChannelConnection::DataConnectionListener* aListener,
    103    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler,
    104    const uint16_t aLocalPort, const uint16_t aNumStreams) {
    105  MOZ_ASSERT(NS_IsMainThread());
    106 
    107  RefPtr<DataChannelConnection> connection;
    108  if (Preferences::GetBool("media.peerconnection.sctp.use_dcsctp", false)) {
    109    connection = new DataChannelConnectionDcSctp(aListener, aTarget,
    110                                                 aHandler);  // Walks into a bar
    111  } else {
    112    connection = new DataChannelConnectionUsrsctp(
    113        aListener, aTarget, aHandler);  // Walks into a bar
    114  }
    115  return connection->Init(aLocalPort, aNumStreams) ? Some(connection)
    116                                                   : Nothing();
    117 }
    118 
    119 DataChannelConnection::DataChannelConnection(
    120    DataChannelConnection::DataConnectionListener* aListener,
    121    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler)
    122    : NeckoTargetHolder(aTarget),
    123      mListener(aListener),
    124      mTransportHandler(aHandler) {
    125  MOZ_ASSERT(NS_IsMainThread());
    126  DC_VERBOSE(
    127      ("%p: DataChannelConnection c'tor, listener=%p", this, mListener.get()));
    128 
    129  // XXX FIX! make this a global we get once
    130  // Find the STS thread
    131  nsresult rv;
    132  mSTS = mozilla::components::SocketTransport::Service(&rv);
    133  MOZ_ASSERT(NS_SUCCEEDED(rv));
    134 
    135 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
    136  mShutdown = false;
    137 #endif
    138 }
    139 
    140 // Only called on MainThread, mMaxMessageSize is read on other threads
    141 void DataChannelConnection::SetMaxMessageSize(uint64_t aMaxMessageSize) {
    142  MOZ_ASSERT(NS_IsMainThread());
    143 
    144  mMaxMessageSize = aMaxMessageSize;
    145 
    146  nsresult rv;
    147  nsCOMPtr<nsIPrefService> prefs;
    148  prefs = mozilla::components::Preferences::Service(&rv);
    149  if (!NS_WARN_IF(NS_FAILED(rv))) {
    150    nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
    151 
    152    if (branch) {
    153      int32_t temp;
    154      if (!NS_FAILED(branch->GetIntPref(
    155              "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
    156        if (temp > 0 && (uint64_t)temp < mMaxMessageSize) {
    157          mMaxMessageSize = (uint64_t)temp;
    158        }
    159      }
    160    }
    161  }
    162 
    163  // Fix remote MMS. This code exists, so future implementations of
    164  // RTCSctpTransport.maxMessageSize can simply provide that value from
    165  // GetMaxMessageSize.
    166 
    167  // TODO: Bug 1382779, once resolved, can be increased to
    168  // min(Uint8ArrayMaxSize, UINT32_MAX)
    169  // TODO: Bug 1381146, once resolved, can be increased to whatever we support
    170  // then (hopefully
    171  //       SIZE_MAX)
    172  if (mMaxMessageSize == 0 ||
    173      mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
    174    mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
    175  }
    176 
    177  DC_DEBUG(("%p: Maximum message size (outgoing data): %" PRIu64
    178            " (enforced=%s)",
    179            this, mMaxMessageSize,
    180            aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
    181 
    182  for (auto& channel : mChannels.GetAll()) {
    183    channel->SetMaxMessageSize(GetMaxMessageSize());
    184  }
    185 }
    186 
    187 double DataChannelConnection::GetMaxMessageSize() {
    188  MOZ_ASSERT(NS_IsMainThread());
    189  if (mMaxMessageSize) {
    190    return static_cast<double>(mMaxMessageSize);
    191  }
    192 
    193  return std::numeric_limits<double>::infinity();
    194 }
    195 
    196 RefPtr<DataChannelConnection::StatsPromise> DataChannelConnection::GetStats(
    197    const DOMHighResTimeStamp aTimestamp) const {
    198  MOZ_ASSERT(NS_IsMainThread());
    199  nsTArray<RefPtr<DataChannelStatsPromise>> statsPromises;
    200  for (const RefPtr<DataChannel>& chan : mChannels.GetAll()) {
    201    if (chan) {
    202      RefPtr<DataChannelStatsPromise> statsPromise(chan->GetStats(aTimestamp));
    203      if (statsPromise) {
    204        statsPromises.AppendElement(std::move(statsPromise));
    205      }
    206    }
    207  }
    208 
    209  return DataChannelStatsPromise::All(GetMainThreadSerialEventTarget(),
    210                                      statsPromises);
    211 }
    212 
    213 RefPtr<DataChannelStatsPromise> DataChannel::GetStats(
    214    const DOMHighResTimeStamp aTimestamp) {
    215  MOZ_ASSERT(NS_IsMainThread());
    216 
    217  return InvokeAsync(mDomEventTarget, __func__,
    218                     [this, self = RefPtr<DataChannel>(this), aTimestamp] {
    219                       if (!GetDomDataChannel()) {
    220                         // Empty stats object, I guess... too late to
    221                         // return a nullptr and rejecting will trash stats
    222                         // promises for all other datachannels.
    223                         return DataChannelStatsPromise::CreateAndResolve(
    224                             dom::RTCDataChannelStats(), __func__);
    225                       }
    226                       return DataChannelStatsPromise::CreateAndResolve(
    227                           GetDomDataChannel()->GetStats(aTimestamp), __func__);
    228                     });
    229 }
    230 
    231 bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
    232                                               const bool aClient,
    233                                               const uint16_t aLocalPort,
    234                                               const uint16_t aRemotePort) {
    235  MOZ_ASSERT(NS_IsMainThread());
    236 
    237  static const auto paramString =
    238      [](const std::string& tId, const Maybe<bool>& client,
    239         const uint16_t localPort, const uint16_t remotePort) -> std::string {
    240    std::ostringstream stream;
    241    stream << "Transport ID: '" << tId << "', Role: '"
    242           << (client ? (client.value() ? "client" : "server") : "")
    243           << "', Local Port: '" << localPort << "', Remote Port: '"
    244           << remotePort << "'";
    245    return stream.str();
    246  };
    247 
    248  const auto params =
    249      paramString(aTransportId, Some(aClient), aLocalPort, aRemotePort);
    250  DC_INFO(
    251      ("%p: ConnectToTransport connecting DTLS transport with parameters: %s",
    252       this, params.c_str()));
    253 
    254  DC_INFO(("%p: New transport parameters: %s", this, params.c_str()));
    255  if (NS_WARN_IF(aTransportId.empty())) {
    256    return false;
    257  }
    258 
    259  if (!mAllocateEven.isSome()) {
    260    // Do this stuff once.
    261    mLocalPort = aLocalPort;
    262    mRemotePort = aRemotePort;
    263    mAllocateEven = Some(aClient);
    264    nsTArray<RefPtr<DataChannel>> hasStreamId;
    265    // Could be faster. Probably doesn't matter.
    266    while (auto channel = mChannels.Get(INVALID_STREAM)) {
    267      mChannels.Remove(channel);
    268      auto id = FindFreeStream();
    269      if (id != INVALID_STREAM) {
    270        channel->SetStream(id);
    271        mChannels.Insert(channel);
    272        DC_DEBUG(("%p: Inserting auto-selected id %u for channel %p", this,
    273                  static_cast<unsigned>(id), channel.get()));
    274        mStreamIds.InsertElementSorted(id);
    275        hasStreamId.AppendElement(std::move(channel));
    276      } else {
    277        DC_WARN(("%p: Could not find id for channel %p, calling AnnounceClosed",
    278                 this, channel.get()));
    279        // Spec language is very similar to AnnounceClosed, the differences
    280        // being a lack of a closed check at the top, a different error event,
    281        // and no removal of the channel from the [[DataChannels]] slot.
    282        // We don't support firing errors right now, and we probabaly want the
    283        // closed check anyway, and we don't really have something equivalent
    284        // to the [[DataChannels]] slot, so just use AnnounceClosed for now.
    285        channel->AnnounceClosed();
    286      }
    287    }
    288 
    289    mSTS->Dispatch(NS_NewCancelableRunnableFunction(
    290                       __func__,
    291                       [this, self = RefPtr<DataChannelConnection>(this),
    292                        hasStreamId = std::move(hasStreamId)]() {
    293                         SetState(DataChannelConnectionState::Connecting);
    294                         for (auto& channel : hasStreamId) {
    295                           OpenFinish(std::move(channel));
    296                         }
    297                       }),
    298                   NS_DISPATCH_FALLIBLE);
    299  }
    300 
    301  // We do not check whether this is a new transport id here, that happens on
    302  // STS.
    303  RUN_ON_THREAD(mSTS,
    304                WrapRunnable(RefPtr<DataChannelConnection>(this),
    305                             &DataChannelConnection::SetSignals, aTransportId),
    306                NS_DISPATCH_NORMAL);
    307  return true;
    308 }
    309 
    310 void DataChannelConnection::SetSignals(const std::string& aTransportId) {
    311  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    312  if (mTransportId == aTransportId) {
    313    // Nothing to do!
    314    return;
    315  }
    316 
    317  mTransportId = aTransportId;
    318 
    319  if (!mConnectedToTransportHandler) {
    320    mPacketReceivedListener =
    321        mTransportHandler->GetSctpPacketReceived().Connect(
    322            mSTS, this, &DataChannelConnection::OnPacketReceived);
    323    mStateChangeListener = mTransportHandler->GetStateChange().Connect(
    324        mSTS, this, &DataChannelConnection::TransportStateChange);
    325    mConnectedToTransportHandler = true;
    326  }
    327  // SignalStateChange() doesn't call you with the initial state
    328  if (mTransportHandler->GetState(mTransportId, false) ==
    329      TransportLayer::TS_OPEN) {
    330    DC_DEBUG(("%p: Setting transport signals, dtls already open", this));
    331    OnTransportReady();
    332  } else {
    333    DC_DEBUG(("%p: Setting transport signals, dtls not open yet", this));
    334  }
    335 }
    336 
    337 void DataChannelConnection::TransportStateChange(
    338    const std::string& aTransportId, TransportLayer::State aState) {
    339  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    340  if (aTransportId == mTransportId) {
    341    if (aState == TransportLayer::TS_OPEN) {
    342      DC_DEBUG(("%p: Transport is open!", this));
    343      OnTransportReady();
    344    } else if (aState == TransportLayer::TS_CLOSED ||
    345               aState == TransportLayer::TS_NONE ||
    346               aState == TransportLayer::TS_ERROR) {
    347      DC_DEBUG(("%p: Transport is closed!", this));
    348      CloseAll_s();
    349    }
    350  }
    351 }
    352 
    353 // Process any pending Opens
    354 void DataChannelConnection::ProcessQueuedOpens() {
    355  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    356  std::set<RefPtr<DataChannel>> temp(std::move(mPending));
    357  // Technically in an unspecified state, although no reasonable impl will leave
    358  // anything in here.
    359  mPending.clear();
    360  for (auto channel : temp) {
    361    DC_DEBUG(("%p: Processing queued open for %p (%u)", this, channel.get(),
    362              channel->mStream));
    363    OpenFinish(channel);  // may end up back in mPending
    364  }
    365 }
    366 
    367 void DataChannelConnection::OnPacketReceived(const std::string& aTransportId,
    368                                             const MediaPacket& packet) {
    369  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    370  if (packet.type() == MediaPacket::SCTP && mTransportId == aTransportId) {
    371    OnSctpPacketReceived(packet);
    372  }
    373 }
    374 
    375 void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
    376  mSTS->Dispatch(NS_NewCancelableRunnableFunction(
    377                     "DataChannelConnection::SendPacket",
    378                     [this, self = RefPtr<DataChannelConnection>(this),
    379                      packet = std::move(packet)]() mutable {
    380                       // DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes",
    381                       // this, len));
    382                       if (!mTransportId.empty() && mTransportHandler) {
    383                         mTransportHandler->SendPacket(mTransportId,
    384                                                       std::move(*packet));
    385                       }
    386                     }),
    387                 NS_DISPATCH_FALLIBLE);
    388 }
    389 
    390 already_AddRefed<DataChannel> DataChannelConnection::FindChannelByStream(
    391    uint16_t stream) {
    392  return mChannels.Get(stream).forget();
    393 }
    394 
    395 uint16_t DataChannelConnection::FindFreeStream() const {
    396  MOZ_ASSERT(NS_IsMainThread());
    397 
    398  MOZ_ASSERT(mAllocateEven.isSome());
    399  if (!mAllocateEven.isSome()) {
    400    return INVALID_STREAM;
    401  }
    402 
    403  uint16_t i = (*mAllocateEven ? 0 : 1);
    404 
    405  // Find the lowest odd/even id that is not present in mStreamIds
    406  for (auto id : mStreamIds) {
    407    if (i >= MAX_NUM_STREAMS) {
    408      return INVALID_STREAM;
    409    }
    410 
    411    if (id == i) {
    412      // i is in use, try the next one
    413      i += 2;
    414    } else if (id > i) {
    415      // i is definitely not in use
    416      break;
    417    }
    418  }
    419 
    420  return i;
    421 }
    422 
    423 // Returns a POSIX error code.
    424 int DataChannelConnection::SendControlMessage(DataChannel& aChannel,
    425                                              const uint8_t* data,
    426                                              uint32_t len) {
    427  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    428  // Create message instance and send
    429  // Note: Main-thread IO, but doesn't block
    430 #if (UINT32_MAX > SIZE_MAX)
    431  if (len > SIZE_MAX) {
    432    return EMSGSIZE;
    433  }
    434 #endif
    435 
    436  DataChannelMessageMetadata metadata(aChannel.mStream,
    437                                      DATA_CHANNEL_PPID_CONTROL, false);
    438  nsCString buffer(reinterpret_cast<const char*>(data), len);
    439  OutgoingMsg msg(std::move(buffer), metadata);
    440 
    441  return SendMessage(aChannel, std::move(msg));
    442 }
    443 
    444 // Returns a POSIX error code.
    445 int DataChannelConnection::SendOpenAckMessage(DataChannel& aChannel) {
    446  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    447  DC_INFO(("%p: Sending DataChannel open ack, channel %p", this, &aChannel));
    448  struct rtcweb_datachannel_ack ack = {};
    449  ack.msg_type = DATA_CHANNEL_ACK;
    450 
    451  return SendControlMessage(aChannel, (const uint8_t*)&ack, sizeof(ack));
    452 }
    453 
    454 // Returns a POSIX error code.
    455 int DataChannelConnection::SendOpenRequestMessage(DataChannel& aChannel) {
    456  DC_INFO(
    457      ("%p: Sending DataChannel open request, channel %p", this, &aChannel));
    458  const nsACString& label = aChannel.mLabel;
    459  const nsACString& protocol = aChannel.mProtocol;
    460  const bool unordered = !aChannel.mOrdered;
    461  const DataChannelReliabilityPolicy prPolicy = aChannel.mPrPolicy;
    462  const uint32_t prValue = aChannel.mPrValue;
    463 
    464  const size_t label_len = label.Length();     // not including nul
    465  const size_t proto_len = protocol.Length();  // not including nul
    466  // careful - request struct include one char for the label
    467  const size_t req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
    468                          label_len + proto_len;
    469  UniqueFreePtr<struct rtcweb_datachannel_open_request> req(
    470      (struct rtcweb_datachannel_open_request*)moz_xmalloc(req_size));
    471 
    472  memset(req.get(), 0, req_size);
    473  req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
    474  switch (prPolicy) {
    475    case DataChannelReliabilityPolicy::Reliable:
    476      req->channel_type = DATA_CHANNEL_RELIABLE;
    477      break;
    478    case DataChannelReliabilityPolicy::LimitedLifetime:
    479      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
    480      break;
    481    case DataChannelReliabilityPolicy::LimitedRetransmissions:
    482      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
    483      break;
    484    default:
    485      return EINVAL;
    486  }
    487  if (unordered) {
    488    // Per the current types, all differ by 0x80 between ordered and unordered
    489    req->channel_type |=
    490        0x80;  // NOTE: be careful if new types are added in the future
    491  }
    492 
    493  req->reliability_param = htonl(prValue);
    494  req->priority = htons(0); /* XXX: add support */
    495  req->label_length = htons(label_len);
    496  req->protocol_length = htons(proto_len);
    497  memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
    498  memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
    499 
    500  // TODO: req_size is an int... that looks hairy
    501  return SendControlMessage(aChannel, (const uint8_t*)req.get(), req_size);
    502 }
    503 
    504 // Caller must ensure that length <= SIZE_MAX
    505 void DataChannelConnection::HandleOpenRequestMessage(
    506    const struct rtcweb_datachannel_open_request* req, uint32_t length,
    507    uint16_t stream) {
    508  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    509  DataChannelReliabilityPolicy prPolicy;
    510 
    511  const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
    512                                ntohs(req->protocol_length);
    513  if (((size_t)length) != requiredLength) {
    514    if (((size_t)length) < requiredLength) {
    515      DC_ERROR(
    516          ("%p: insufficient length: %u, should be %zu. Unable to continue.",
    517           this, length, requiredLength));
    518      return;
    519    }
    520    DC_WARN(("%p: Inconsistent length: %u, should be %zu", this, length,
    521             requiredLength));
    522  }
    523 
    524  DC_DEBUG(("%p: length %u, sizeof(*req) = %zu", this, length, sizeof(*req)));
    525 
    526  switch (req->channel_type) {
    527    case DATA_CHANNEL_RELIABLE:
    528    case DATA_CHANNEL_RELIABLE_UNORDERED:
    529      prPolicy = DataChannelReliabilityPolicy::Reliable;
    530      break;
    531    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
    532    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
    533      prPolicy = DataChannelReliabilityPolicy::LimitedRetransmissions;
    534      break;
    535    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
    536    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
    537      prPolicy = DataChannelReliabilityPolicy::LimitedLifetime;
    538      break;
    539    default:
    540      DC_ERROR(("%p: Unknown channel type %d", this, req->channel_type));
    541      /* XXX error handling */
    542      return;
    543  }
    544 
    545  if (stream >= mNegotiatedIdLimit) {
    546    DC_ERROR(
    547        ("%p: stream %u out of bounds (%u)", this, stream, mNegotiatedIdLimit));
    548    return;
    549  }
    550 
    551  const uint32_t prValue = ntohl(req->reliability_param);
    552  const bool ordered = !(req->channel_type & 0x80);
    553  const nsCString label(&req->label[0], ntohs(req->label_length));
    554  const nsCString protocol(&req->label[ntohs(req->label_length)],
    555                           ntohs(req->protocol_length));
    556 
    557  // Always dispatch this to mainthread; this is a brand new datachannel, which
    558  // has not had any opportunity to be transferred to a worker.
    559  Dispatch(
    560      NS_NewCancelableRunnableFunction(
    561          "DataChannelConnection::HandleOpenRequestMessage",
    562          [this, self = RefPtr<DataChannelConnection>(this), stream, prPolicy,
    563           prValue, ordered, label, protocol]() {
    564            RefPtr<DataChannel> channel = FindChannelByStream(stream);
    565            if (channel) {
    566              if (!channel->mNegotiated) {
    567                DC_ERROR((
    568                    "HandleOpenRequestMessage: channel for pre-existing stream "
    569                    "%u that was not externally negotiated. JS is lying to us, "
    570                    "or there's an id collision.",
    571                    stream));
    572                /* XXX: some error handling */
    573              } else {
    574                DC_DEBUG(("Open for externally negotiated channel %u", stream));
    575                // XXX should also check protocol, maybe label
    576                if (prPolicy != channel->mPrPolicy ||
    577                    prValue != channel->mPrValue ||
    578                    ordered != channel->mOrdered) {
    579                  DC_WARN(
    580                      ("external negotiation mismatch with OpenRequest:"
    581                       "channel %u, policy %s/%s, value %u/%u, ordered %d/%d",
    582                       stream, ToString(prPolicy), ToString(channel->mPrPolicy),
    583                       prValue, channel->mPrValue, static_cast<int>(ordered),
    584                       static_cast<int>(channel->mOrdered)));
    585                }
    586              }
    587              return;
    588            }
    589            channel = new DataChannel(this, stream, label, protocol, prPolicy,
    590                                      prValue, ordered, false);
    591            mChannels.Insert(channel);
    592            mStreamIds.InsertElementSorted(stream);
    593 
    594            DC_INFO(("%p: sending ON_CHANNEL_CREATED for %p/%s/%s: %u", this,
    595                     channel.get(), channel->mLabel.get(),
    596                     channel->mProtocol.get(), stream));
    597 
    598            // Awkward. If we convert over to using Maybe for this in
    599            // DataChannel, we won't need to have this extra conversion, since
    600            // Nullable converts easily to Maybe.
    601            dom::Nullable<uint16_t> maxLifeTime;
    602            dom::Nullable<uint16_t> maxRetransmits;
    603            if (prPolicy == DataChannelReliabilityPolicy::LimitedLifetime) {
    604              maxLifeTime.SetValue(std::min(
    605                  std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
    606            } else if (prPolicy ==
    607                       DataChannelReliabilityPolicy::LimitedRetransmissions) {
    608              maxRetransmits.SetValue(std::min(
    609                  std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
    610            }
    611 
    612            if (mListener) {
    613              // important to give it an already_AddRefed pointer!
    614              // TODO(bug 1974443): Have nsDOMDataChannel create the DataChannel
    615              // object, or have DataChannel take an nsDOMDataChannel, to avoid
    616              // passing this param list more than once?
    617              mListener->NotifyDataChannel(do_AddRef(channel), label, ordered,
    618                                           maxLifeTime, maxRetransmits,
    619                                           protocol, false);
    620              // Spec says to queue this in the queued task for ondatachannel
    621              channel->AnnounceOpen();
    622            }
    623 
    624            mSTS->Dispatch(
    625                NS_NewCancelableRunnableFunction(
    626                    "DataChannelConnection::HandleOpenRequestMessage",
    627                    [this, self = RefPtr<DataChannelConnection>(this),
    628                     channel = std::move(channel)]() {
    629                      // Note that any message can be buffered;
    630                      // SendOpenAckMessage may error later than this check.
    631                      const auto error = SendOpenAckMessage(*channel);
    632                      if (error) {
    633                        DC_ERROR(
    634                            ("%p: SendOpenAckMessage failed, channel %p, error "
    635                             "= %d",
    636                             this, channel.get(), error));
    637                        FinishClose_s(channel);
    638                        return;
    639                      }
    640                      channel->mWaitingForAck = false;
    641                      channel->mSendStreamNeedsReset = true;
    642                      channel->mRecvStreamNeedsReset = true;
    643                      OnStreamOpen(channel->mStream);
    644                    }),
    645                NS_DISPATCH_FALLIBLE);
    646          }),
    647      NS_DISPATCH_FALLIBLE);
    648 }
    649 
    650 // Caller must ensure that length <= SIZE_MAX
    651 void DataChannelConnection::HandleOpenAckMessage(
    652    const struct rtcweb_datachannel_ack* ack, uint32_t length,
    653    uint16_t stream) {
    654  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    655 
    656  RefPtr<DataChannel> channel = FindChannelByStream(stream);
    657  if (NS_WARN_IF(!channel)) {
    658    return;
    659  }
    660 
    661  DC_INFO(("%p: OpenAck received for channel %p, stream %u, waiting=%d", this,
    662           channel.get(), stream, channel->mWaitingForAck ? 1 : 0));
    663 
    664  channel->mWaitingForAck = false;
    665 
    666  // Either externally negotiated or we sent Open
    667  channel->AnnounceOpen();
    668  OnStreamOpen(stream);
    669 }
    670 
    671 // Caller must ensure that length <= SIZE_MAX
    672 void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length,
    673                                                 uint16_t stream) {
    674  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    675  /* XXX: Send an error message? */
    676  DC_ERROR(("%p: unknown DataChannel message received: %u, len %u on stream %d",
    677            this, ppid, length, stream));
    678  // XXX Log to JS error console if possible
    679 }
    680 
    681 void DataChannelConnection::HandleDataMessage(IncomingMsg&& aMsg) {
    682  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    683 
    684  RefPtr<DataChannel> channel = FindChannelByStream(aMsg.GetStreamId());
    685  if (!channel) {
    686    MOZ_ASSERT(false,
    687               "Wait until OnStreamOpen is called before calling "
    688               "HandleDataMessage!");
    689    return;
    690  }
    691 
    692  const size_t data_length = aMsg.GetData().Length();
    693  bool isBinary = false;
    694 
    695  switch (aMsg.GetPpid()) {
    696    case DATA_CHANNEL_PPID_DOMSTRING:
    697    case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
    698      DC_DEBUG(
    699          ("%p: DataChannel: Received string message of length %zu on "
    700           "channel %u",
    701           this, data_length, channel->mStream));
    702      // WebSockets checks IsUTF8() here; we can try to deliver it
    703      break;
    704 
    705    case DATA_CHANNEL_PPID_DOMSTRING_EMPTY:
    706      DC_DEBUG(
    707          ("%p: DataChannel: Received empty string message of length %zu on "
    708           "channel %u",
    709           this, data_length, channel->mStream));
    710      // Just in case.
    711      aMsg.GetData().Truncate(0);
    712      break;
    713 
    714    case DATA_CHANNEL_PPID_BINARY:
    715    case DATA_CHANNEL_PPID_BINARY_PARTIAL:
    716      DC_DEBUG(
    717          ("%p: DataChannel: Received binary message of length %zu on "
    718           "channel id %u",
    719           this, data_length, channel->mStream));
    720      isBinary = true;
    721      break;
    722 
    723    case DATA_CHANNEL_PPID_BINARY_EMPTY:
    724      DC_DEBUG(
    725          ("%p: DataChannel: Received empty binary message of length %zu on "
    726           "channel id %u",
    727           this, data_length, channel->mStream));
    728      // Just in case.
    729      aMsg.GetData().Truncate(0);
    730      isBinary = true;
    731      break;
    732 
    733    default:
    734      NS_ERROR("Unknown data PPID");
    735      DC_ERROR(("%p: Unknown data PPID %" PRIu32, this, aMsg.GetPpid()));
    736      return;
    737  }
    738 
    739  channel->OnMessageReceived(std::move(aMsg.GetData()), isBinary);
    740 }
    741 
    742 void DataChannelConnection::HandleDCEPMessage(IncomingMsg&& aMsg) {
    743  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    744  const struct rtcweb_datachannel_open_request* req;
    745  const struct rtcweb_datachannel_ack* ack;
    746 
    747  req = reinterpret_cast<const struct rtcweb_datachannel_open_request*>(
    748      aMsg.GetData().BeginReading());
    749 
    750  size_t data_length = aMsg.GetLength();
    751 
    752  DC_INFO(("%p: Handling DCEP message of length %zu", this, data_length));
    753 
    754  // Ensure minimum message size (ack is the smallest DCEP message)
    755  if (data_length < sizeof(*ack)) {
    756    DC_WARN(("%p: Ignored invalid DCEP message (too short)", this));
    757    return;
    758  }
    759 
    760  switch (req->msg_type) {
    761    case DATA_CHANNEL_OPEN_REQUEST:
    762      // structure includes a possibly-unused char label[1] (in a packed
    763      // structure)
    764      if (NS_WARN_IF(data_length < sizeof(*req) - 1)) {
    765        return;
    766      }
    767 
    768      HandleOpenRequestMessage(req, data_length, aMsg.GetStreamId());
    769      break;
    770    case DATA_CHANNEL_ACK:
    771      // >= sizeof(*ack) checked above
    772 
    773      ack = reinterpret_cast<const struct rtcweb_datachannel_ack*>(
    774          aMsg.GetData().BeginReading());
    775      HandleOpenAckMessage(ack, data_length, aMsg.GetStreamId());
    776      break;
    777    default:
    778      HandleUnknownMessage(aMsg.GetPpid(), data_length, aMsg.GetStreamId());
    779      break;
    780  }
    781 }
    782 
    783 bool DataChannelConnection::ReassembleMessageChunk(IncomingMsg& aReassembled,
    784                                                   const void* buffer,
    785                                                   size_t length, uint32_t ppid,
    786                                                   uint16_t stream) {
    787  // Note: Until we support SIZE_MAX sized messages, we need this check
    788 #if (SIZE_MAX > UINT32_MAX)
    789  if (length > UINT32_MAX) {
    790    DC_ERROR(("%p: Cannot handle message of size %zu (max=%u)", this, length,
    791              UINT32_MAX));
    792    return false;
    793  }
    794 #endif
    795 
    796  // Ensure it doesn't blow up our buffer
    797  // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the
    798  //       new buffer is capable of holding.
    799  if (length + aReassembled.GetLength() >
    800      WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
    801    DC_ERROR(
    802        ("%p: Buffered message would become too large to handle, closing "
    803         "connection",
    804         this));
    805    return false;
    806  }
    807 
    808  if (aReassembled.GetPpid() != ppid) {
    809    NS_WARNING("DataChannel message aborted by fragment type change!");
    810    return false;
    811  }
    812 
    813  aReassembled.Append((uint8_t*)buffer, length);
    814 
    815  return true;
    816 }
    817 
    818 void DataChannelConnection::OnStreamsReset(std::vector<uint16_t>&& aStreams) {
    819  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    820  for (auto stream : aStreams) {
    821    DC_INFO(("%p: Received reset request for stream %u", this, stream));
    822    RefPtr<DataChannel> channel = FindChannelByStream(stream);
    823    if (channel) {
    824      channel->mRecvStreamNeedsReset = false;
    825      if (channel->mSendStreamNeedsReset) {
    826        // We do not send our own reset yet, we give the RTCDataChannel a chance
    827        // to finish sending messages first.
    828        DC_INFO(("%p: Need to send a reset, closing gracefully", this));
    829        channel->GracefulClose();
    830      } else {
    831        DC_INFO(
    832            ("%p: We've already reset our stream, closing immediately", this));
    833        FinishClose_s(channel);
    834      }
    835    }
    836  }
    837 }
    838 
    839 void DataChannelConnection::OnStreamsResetComplete(
    840    std::vector<uint16_t>&& aStreams) {
    841  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    842  for (auto stream : aStreams) {
    843    DC_INFO(("%p: Received reset response for stream %u", this, stream));
    844    RefPtr<DataChannel> channel = FindChannelByStream(stream);
    845    if (channel) {
    846      channel->mSendStreamNeedsReset = false;
    847      if (!channel->mRecvStreamNeedsReset) {
    848        // The other end has already performed its reset
    849        DC_INFO(
    850            ("%p: Remote stream has already been reset, closing immediately",
    851             this));
    852        FinishClose_s(channel);
    853      }
    854    }
    855  }
    856 }
    857 
    858 already_AddRefed<DataChannel> DataChannelConnection::Open(
    859    const nsACString& label, const nsACString& protocol,
    860    DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue,
    861    bool aExternalNegotiated, uint16_t aStream) {
    862  MOZ_ASSERT(NS_IsMainThread());
    863  if (!aExternalNegotiated) {
    864    if (mAllocateEven.isSome()) {
    865      aStream = FindFreeStream();
    866      if (aStream == INVALID_STREAM) {
    867        return nullptr;
    868      }
    869    } else {
    870      // We do not yet know whether we are client or server, and an id has not
    871      // been chosen for us. We will need to choose later.
    872      aStream = INVALID_STREAM;
    873    }
    874  }
    875 
    876  DC_INFO(
    877      ("%p: DC Open: label %s/%s, type %s, inorder %d, prValue %u, "
    878       "external: %s, stream %u",
    879       this, PromiseFlatCString(label).get(),
    880       PromiseFlatCString(protocol).get(), ToString(prPolicy), inOrder, prValue,
    881       aExternalNegotiated ? "true" : "false", aStream));
    882 
    883  if ((prPolicy == DataChannelReliabilityPolicy::Reliable) && (prValue != 0)) {
    884    return nullptr;
    885  }
    886 
    887  if (aStream != INVALID_STREAM) {
    888    if (mStreamIds.ContainsSorted(aStream)) {
    889      DC_ERROR(("%p: external negotiation of already-open channel %u", this,
    890                aStream));
    891      // This is the only place where duplicate id checking is performed. The
    892      // JSImpl code assumes that any error is due to id-related problems. This
    893      // probably needs some cleanup.
    894      return nullptr;
    895    }
    896 
    897    DC_DEBUG(("%p: Inserting externally-negotiated id %u", this,
    898              static_cast<unsigned>(aStream)));
    899    mStreamIds.InsertElementSorted(aStream);
    900  }
    901 
    902  RefPtr<DataChannel> channel(new DataChannel(this, aStream, label, protocol,
    903                                              prPolicy, prValue, inOrder,
    904                                              aExternalNegotiated));
    905  mChannels.Insert(channel);
    906 
    907  if (aStream != INVALID_STREAM) {
    908    mSTS->Dispatch(NS_NewCancelableRunnableFunction(
    909                       "DataChannel::OpenFinish",
    910                       [this, self = RefPtr<DataChannelConnection>(this),
    911                        channel]() mutable { OpenFinish(channel); }),
    912                   NS_DISPATCH_FALLIBLE);
    913  }
    914 
    915  return channel.forget();
    916 }
    917 
    918 // Separate routine so we can also call it to finish up from pending opens
    919 void DataChannelConnection::OpenFinish(RefPtr<DataChannel> aChannel) {
    920  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    921  const uint16_t stream = aChannel->mStream;
    922 
    923  // Cases we care about:
    924  // Pre-negotiated:
    925  //    Not Open:
    926  //      Doesn't fit:
    927  //         -> change initial ask or renegotiate after open
    928  //      -> queue open
    929  //    Open:
    930  //      Doesn't fit:
    931  //         -> RaiseStreamLimitTo && queue
    932  //      Does fit:
    933  //         -> open
    934  // Not negotiated:
    935  //    Not Open:
    936  //      -> queue open
    937  //    Open:
    938  //      -> Try to get a stream
    939  //      Doesn't fit:
    940  //         -> RaiseStreamLimitTo && queue
    941  //      Does fit:
    942  //         -> open
    943  // So the Open cases are basically the same
    944  // Not Open cases are simply queue for non-negotiated, and
    945  // either change the initial ask or possibly renegotiate after open.
    946  DataChannelConnectionState state = GetState();
    947  if (state != DataChannelConnectionState::Open ||
    948      stream >= mNegotiatedIdLimit) {
    949    if (state == DataChannelConnectionState::Open) {
    950      MOZ_ASSERT(stream != INVALID_STREAM);
    951      // RaiseStreamLimitTo() limits to MAX_NUM_STREAMS -- allocate extra
    952      // streams to avoid asking for more every time we want a higher limit.
    953      uint16_t num_desired = std::min(16 * (stream / 16 + 1), MAX_NUM_STREAMS);
    954      DC_DEBUG(("%p: Attempting to raise stream limit %u -> %u", this,
    955                mNegotiatedIdLimit, num_desired));
    956      if (!RaiseStreamLimitTo(num_desired)) {
    957        NS_ERROR("Failed to request more streams");
    958        FinishClose_s(aChannel);
    959        return;
    960      }
    961    }
    962    DC_INFO(("%p: Queuing channel %p (%u) to finish open", this, aChannel.get(),
    963             stream));
    964    mPending.insert(std::move(aChannel));
    965    return;
    966  }
    967 
    968  MOZ_ASSERT(state == DataChannelConnectionState::Open);
    969  MOZ_ASSERT(stream != INVALID_STREAM);
    970  MOZ_ASSERT(stream < mNegotiatedIdLimit);
    971 
    972  if (!aChannel->mNegotiated) {
    973    if (!aChannel->mOrdered) {
    974      // Don't send unordered until this gets cleared.
    975      aChannel->mWaitingForAck = true;
    976    }
    977 
    978    const int error = SendOpenRequestMessage(*aChannel);
    979    if (error) {
    980      DC_ERROR(("%p: SendOpenRequest failed, error = %d", this, error));
    981      FinishClose_s(aChannel);
    982      return;
    983    }
    984  }
    985 
    986  // Even if we're in the negotiated case, and will never send an open request,
    987  // we're supposed to send a stream reset when we tear down.
    988  aChannel->mSendStreamNeedsReset = true;
    989  aChannel->mRecvStreamNeedsReset = true;
    990 
    991  if (aChannel->mNegotiated) {
    992    // Either externally negotiated or we sent Open
    993    aChannel->AnnounceOpen();
    994    OnStreamOpen(stream);
    995  }
    996 }
    997 
    998 nsISerialEventTarget* DataChannelConnection::GetIOThread() {
    999  // Spawn a thread to send the data
   1000  if (!mInternalIOThread) {
   1001    // TODO(bug 1998966): Lazy shutdown once done? Maybe have this live in
   1002    // DataChannel (so we have an IO thread for each channel that sends blobs)?
   1003    NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
   1004  }
   1005 
   1006  return mInternalIOThread.get();
   1007 }
   1008 
   1009 void DataChannelConnection::SetState(DataChannelConnectionState aState) {
   1010  MOZ_ASSERT(mSTS->IsOnCurrentThread());
   1011 
   1012  DC_DEBUG(
   1013      ("%p: DataChannelConnection labeled %s switching connection state %s -> "
   1014       "%s",
   1015       this, mTransportId.c_str(), ToString(mState), ToString(aState)));
   1016 
   1017  if (mState == aState) {
   1018    return;
   1019  }
   1020 
   1021  mState = aState;
   1022 
   1023  if (mState == DataChannelConnectionState::Open) {
   1024    Dispatch(NS_NewCancelableRunnableFunction(
   1025                 __func__,
   1026                 [this, self = RefPtr<DataChannelConnection>(this)]() {
   1027                   if (mListener) {
   1028                     mListener->NotifySctpConnected();
   1029                   }
   1030                 }),
   1031             NS_DISPATCH_FALLIBLE);
   1032  } else if (mState == DataChannelConnectionState::Closed) {
   1033    Dispatch(NS_NewCancelableRunnableFunction(
   1034                 __func__,
   1035                 [this, self = RefPtr<DataChannelConnection>(this)]() {
   1036                   if (mListener) {
   1037                     mListener->NotifySctpClosed();
   1038                   }
   1039                 }),
   1040             NS_DISPATCH_FALLIBLE);
   1041  }
   1042 }
   1043 
   1044 void DataChannelConnection::SendDataMessage(DataChannel& aChannel,
   1045                                            nsACString&& aMsg, bool aIsBinary) {
   1046  // Could be main, could be a worker
   1047 
   1048  nsCString temp(std::move(aMsg));
   1049 
   1050  mSTS->Dispatch(
   1051      NS_NewCancelableRunnableFunction(
   1052          __func__,
   1053          [this, self = RefPtr<DataChannelConnection>(this),
   1054           channel = RefPtr(&aChannel), msg = std::move(temp),
   1055           aIsBinary]() mutable {
   1056            Maybe<uint16_t> maxRetransmissions;
   1057            Maybe<uint16_t> maxLifetimeMs;
   1058 
   1059            switch (channel->mPrPolicy) {
   1060              case DataChannelReliabilityPolicy::Reliable:
   1061                break;
   1062              case DataChannelReliabilityPolicy::LimitedRetransmissions:
   1063                maxRetransmissions = Some(channel->mPrValue);
   1064                break;
   1065              case DataChannelReliabilityPolicy::LimitedLifetime:
   1066                maxLifetimeMs = Some(channel->mPrValue);
   1067                break;
   1068            }
   1069 
   1070            uint32_t ppid;
   1071            if (aIsBinary) {
   1072              if (msg.Length()) {
   1073                ppid = DATA_CHANNEL_PPID_BINARY;
   1074              } else {
   1075                ppid = DATA_CHANNEL_PPID_BINARY_EMPTY;
   1076                msg.Append('\0');
   1077              }
   1078            } else {
   1079              if (msg.Length()) {
   1080                ppid = DATA_CHANNEL_PPID_DOMSTRING;
   1081              } else {
   1082                ppid = DATA_CHANNEL_PPID_DOMSTRING_EMPTY;
   1083                msg.Append('\0');
   1084              }
   1085            }
   1086 
   1087            DataChannelMessageMetadata metadata(
   1088                channel->mStream, ppid,
   1089                !channel->mOrdered && !channel->mWaitingForAck,
   1090                maxRetransmissions, maxLifetimeMs);
   1091            // Create message instance and send
   1092            OutgoingMsg outgoing(std::move(msg), metadata);
   1093 
   1094            SendMessage(*channel, std::move(outgoing));
   1095          }),
   1096      NS_DISPATCH_FALLIBLE);
   1097 }
   1098 
   1099 void DataChannelConnection::EndOfStream(const RefPtr<DataChannel>& aChannel) {
   1100  mSTS->Dispatch(
   1101      NS_NewCancelableRunnableFunction(
   1102          __func__,
   1103          [this, self = RefPtr<DataChannelConnection>(this), channel = aChannel,
   1104           stream = aChannel->mStream]() {
   1105            if (channel->mSendStreamNeedsReset) {
   1106              if (channel->mEndOfStreamCalled) {
   1107                return;
   1108              }
   1109              channel->mEndOfStreamCalled = true;
   1110              DC_INFO((
   1111                  "%p: Need to send a reset for channel %p, closing gracefully",
   1112                  this, channel.get()));
   1113              nsTArray<uint16_t> temp({stream});
   1114              bool success = ResetStreams(temp);
   1115              if (success) {
   1116                return;
   1117              }
   1118              // We presume that OnStreamResetComplete will not be called in
   1119              // this case, nor will we receive a stream reset from the other
   1120              // end.
   1121              DC_INFO(
   1122                  ("%p: Failed to send a reset for channel %p, closing "
   1123                   "immediately",
   1124                   this, channel.get()));
   1125              channel->mRecvStreamNeedsReset = false;
   1126            }
   1127 
   1128            if (!channel->mRecvStreamNeedsReset) {
   1129              // Stream is reset in both directions (or never existed in the
   1130              // first place), we're ready to finish tearing down.
   1131              DC_INFO(
   1132                  ("%p: Stream does not need reset in either direction for "
   1133                   "channel %p",
   1134                   this, channel.get()));
   1135              FinishClose_s(channel);
   1136            }
   1137          }),
   1138      NS_DISPATCH_FALLIBLE);
   1139 }
   1140 
   1141 void DataChannel::EndOfStream() {
   1142  // This can happen before mDomEventTarget is actually ready.
   1143  if (mConnection) {
   1144    mConnection->EndOfStream(this);
   1145  }
   1146 }
   1147 
   1148 void DataChannelConnection::FinishClose_s(const RefPtr<DataChannel>& aChannel) {
   1149  MOZ_ASSERT(mSTS->IsOnCurrentThread());
   1150 
   1151  // We're removing this from all containers, make sure the passed pointer
   1152  // stays valid.
   1153  // It is possible for this to be called twice if both JS and the transport
   1154  // side cause closure at the same time, but this is idempotent so no big deal
   1155  aChannel->mBufferedData.Clear();
   1156  mChannels.Remove(aChannel);
   1157  mPending.erase(aChannel);
   1158 
   1159  // Close the channel's data transport by following the associated
   1160  // procedure.
   1161  aChannel->AnnounceClosed();
   1162 }
   1163 
   1164 void DataChannelConnection::CloseAll_s() {
   1165  // Make sure no more channels will be opened
   1166  SetState(DataChannelConnectionState::Closed);
   1167 
   1168  nsTArray<uint16_t> streamsToReset;
   1169  // Close current channels
   1170  // If there are runnables, they hold a strong ref and keep the channel
   1171  // and/or connection alive (even if in a CLOSED state)
   1172  for (auto& channel : mChannels.GetAll()) {
   1173    DC_INFO(("%p: closing channel %p, stream %u", this, channel.get(),
   1174             channel->mStream));
   1175    if (channel->mSendStreamNeedsReset) {
   1176      DC_INFO(("%p: channel %p needs to send reset", this, channel.get()));
   1177      channel->mSendStreamNeedsReset = false;
   1178      streamsToReset.AppendElement(channel->mStream);
   1179    }
   1180    // We do not wait for the reset to finish in this case; we won't be around
   1181    // to see the response.
   1182    FinishClose_s(channel);
   1183  }
   1184 
   1185  // Clean up any pending opens for channels
   1186  std::set<RefPtr<DataChannel>> temp(std::move(mPending));
   1187  // Technically in an unspecified state, although no reasonable impl will leave
   1188  // anything in here.
   1189  mPending.clear();
   1190  for (const auto& channel : temp) {
   1191    DC_INFO(("%p: closing pending channel %p, stream %u", this, channel.get(),
   1192             channel->mStream));
   1193    FinishClose_s(channel);  // also releases the ref on each iteration
   1194  }
   1195 
   1196  // It's more efficient to let the Resets queue in shutdown and then
   1197  // ResetStreams() here.
   1198  if (!streamsToReset.IsEmpty()) {
   1199    ResetStreams(streamsToReset);
   1200  }
   1201 }
   1202 
   1203 void DataChannelConnection::CloseAll() {
   1204  MOZ_ASSERT(NS_IsMainThread());
   1205  DC_INFO(("%p: Closing all channels", this));
   1206 
   1207  mSTS->Dispatch(NS_NewCancelableRunnableFunction(
   1208                     "DataChannelConnection::CloseAll",
   1209                     [this, self = RefPtr<DataChannelConnection>(this)]() {
   1210                       CloseAll_s();
   1211                     }),
   1212                 NS_DISPATCH_FALLIBLE);
   1213 }
   1214 
   1215 void DataChannelConnection::MarkStreamAvailable(uint16_t aStream) {
   1216  MOZ_ASSERT(NS_IsMainThread());
   1217  mStreamIds.RemoveElementSorted(aStream);
   1218 }
   1219 
   1220 bool DataChannelConnection::Channels::IdComparator::Equals(
   1221    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
   1222  return aChannel->mStream == aId;
   1223 }
   1224 
   1225 bool DataChannelConnection::Channels::IdComparator::LessThan(
   1226    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
   1227  return aChannel->mStream < aId;
   1228 }
   1229 
   1230 bool DataChannelConnection::Channels::IdComparator::Equals(
   1231    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
   1232  return Equals(a1, a2->mStream);
   1233 }
   1234 
   1235 bool DataChannelConnection::Channels::IdComparator::LessThan(
   1236    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
   1237  return LessThan(a1, a2->mStream);
   1238 }
   1239 
   1240 void DataChannelConnection::Channels::Insert(
   1241    const RefPtr<DataChannel>& aChannel) {
   1242  DC_DEBUG(("%p: Inserting channel %u : %p", this, aChannel->mStream,
   1243            aChannel.get()));
   1244  MutexAutoLock lock(mMutex);
   1245  if (aChannel->mStream != INVALID_STREAM) {
   1246    MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator()));
   1247  }
   1248 
   1249  MOZ_ASSERT(!mChannels.Contains(aChannel));
   1250 
   1251  mChannels.InsertElementSorted(aChannel, IdComparator());
   1252 }
   1253 
   1254 bool DataChannelConnection::Channels::Remove(
   1255    const RefPtr<DataChannel>& aChannel) {
   1256  DC_DEBUG(("%p: Removing channel %u : %p", this, aChannel->mStream,
   1257            aChannel.get()));
   1258  MutexAutoLock lock(mMutex);
   1259  if (aChannel->mStream == INVALID_STREAM) {
   1260    return mChannels.RemoveElement(aChannel);
   1261  }
   1262 
   1263  auto index = mChannels.BinaryIndexOf(aChannel->mStream, IdComparator());
   1264  if (index != ChannelArray::NoIndex) {
   1265    if (mChannels[index].get() == aChannel.get()) {
   1266      mChannels.RemoveElementAt(index);
   1267      return true;
   1268    }
   1269  }
   1270  return false;
   1271 }
   1272 
   1273 RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const {
   1274  MutexAutoLock lock(mMutex);
   1275  auto index = mChannels.BinaryIndexOf(aId, IdComparator());
   1276  if (index == ChannelArray::NoIndex) {
   1277    return nullptr;
   1278  }
   1279  return mChannels[index];
   1280 }
   1281 
   1282 RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel(
   1283    uint16_t aCurrentId) const {
   1284  MutexAutoLock lock(mMutex);
   1285  if (mChannels.IsEmpty()) {
   1286    return nullptr;
   1287  }
   1288 
   1289  auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator());
   1290  if (index == mChannels.Length()) {
   1291    index = 0;
   1292  }
   1293  return mChannels[index];
   1294 }
   1295 
   1296 DataChannel::DataChannel(DataChannelConnection* connection, uint16_t stream,
   1297                         const nsACString& label, const nsACString& protocol,
   1298                         DataChannelReliabilityPolicy policy, uint32_t value,
   1299                         bool ordered, bool negotiated)
   1300    : mLabel(label),
   1301      mProtocol(protocol),
   1302      mPrPolicy(policy),
   1303      mPrValue(value),
   1304      mNegotiated(negotiated),
   1305      mOrdered(ordered),
   1306      mStream(stream),
   1307      mConnection(connection),
   1308      mDomEventTarget(new StopGapEventTarget) {
   1309  DC_INFO(
   1310      ("%p: Necko DataChannel created, label '%s'. Waiting for RTCDataChannel "
   1311       "to be created.",
   1312       this, mLabel.get()));
   1313  NS_ASSERTION(mConnection, "NULL connection");
   1314 }
   1315 
   1316 DataChannel::~DataChannel() {
   1317  DC_INFO(("%p: DataChannel is being destroyed.", this));
   1318 }
   1319 
   1320 void DataChannel::SetMainthreadDomDataChannel(dom::RTCDataChannel* aChannel) {
   1321  MOZ_ASSERT(NS_IsMainThread());
   1322  DC_INFO(
   1323      ("%p: Mainthread RTCDataChannel created(%p). Waiting for confirmation of "
   1324       "event target.",
   1325       this, aChannel));
   1326  mMainthreadDomDataChannel = aChannel;
   1327  SetMaxMessageSize(mConnection->GetMaxMessageSize());
   1328  if (GetStream()) {
   1329    mMainthreadDomDataChannel->SetId(*GetStream());
   1330  }
   1331 }
   1332 
   1333 void DataChannel::OnWorkerTransferStarted() {
   1334  MOZ_ASSERT(NS_IsMainThread());
   1335  DC_INFO((
   1336      "%p: RTCDataChannel is being transferred. Disabling synchronous updates. "
   1337      "Mainthread will not be our event target, waiting to learn worker "
   1338      "thread.",
   1339      this));
   1340  mHasWorkerDomDataChannel = true;
   1341 }
   1342 
   1343 void DataChannel::OnWorkerTransferComplete(dom::RTCDataChannel* aChannel) {
   1344  MOZ_ASSERT(!NS_IsMainThread());
   1345  DC_INFO(
   1346      ("%p: Worker RTCDataChannel created(%p). Worker thread is our event "
   1347       "target.",
   1348       this, aChannel));
   1349  mWorkerDomDataChannel = aChannel;
   1350  mDomEventTarget->SetRealEventTarget(GetCurrentSerialEventTarget());
   1351 }
   1352 
   1353 void DataChannel::OnWorkerTransferDisabled() {
   1354  MOZ_ASSERT(NS_IsMainThread());
   1355  DC_INFO(
   1356      ("%p: Mainthread RTCDataChannel is no longer eligible for transfer. "
   1357       "Mainthread is our event target.",
   1358       this));
   1359  mDomEventTarget->SetRealEventTarget(GetCurrentSerialEventTarget());
   1360 }
   1361 
   1362 void DataChannel::UnsetMainthreadDomDataChannel() {
   1363  MOZ_ASSERT(NS_IsMainThread());
   1364  DC_INFO(("%p: Mainthread RTCDataChannel is being destroyed(%p).", this,
   1365           mMainthreadDomDataChannel));
   1366  mMainthreadDomDataChannel = nullptr;
   1367  if (mHasWorkerDomDataChannel) {
   1368    DC_INFO(
   1369        ("Mainthread RTCDataChannel is being destroyed. Dispatching task to "
   1370         "inform corresponding worker RTCDataChannel."));
   1371    mDomEventTarget->Dispatch(
   1372        NS_NewCancelableRunnableFunction(
   1373            "DataChannel::UnsetMainthreadDomDataChannel",
   1374            [this, self = RefPtr<DataChannel>(this)] {
   1375              if (mWorkerDomDataChannel) {
   1376                mWorkerDomDataChannel->UnsetWorkerNeedsUs();
   1377              }
   1378            }),
   1379        NS_DISPATCH_FALLIBLE);
   1380  } else {
   1381    DC_INFO(("%p: No worker RTCDataChannel. Closing.", this));
   1382    EndOfStream();
   1383  }
   1384 }
   1385 
   1386 void DataChannel::UnsetWorkerDomDataChannel() {
   1387  MOZ_ASSERT(!NS_IsMainThread());
   1388  MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
   1389  DC_INFO(("%p: Worker RTCDataChannel is being destroyed(%p). Closing.", this,
   1390           mWorkerDomDataChannel));
   1391  mWorkerDomDataChannel = nullptr;
   1392  EndOfStream();
   1393 }
   1394 
   1395 void DataChannel::DecrementBufferedAmount(size_t aSize) {
   1396  mDomEventTarget->Dispatch(
   1397      NS_NewCancelableRunnableFunction(
   1398          "DataChannel::DecrementBufferedAmount",
   1399          [this, self = RefPtr<DataChannel>(this), aSize] {
   1400            if (GetDomDataChannel()) {
   1401              GetDomDataChannel()->DecrementBufferedAmount(aSize);
   1402            }
   1403          }),
   1404      NS_DISPATCH_FALLIBLE);
   1405 }
   1406 
   1407 void DataChannel::AnnounceOpen() {
   1408  // When an underlying data transport is to be announced (the other peer
   1409  // created a channel with negotiated unset or set to false), the user agent of
   1410  // the peer that did not initiate the creation process MUST queue a task to
   1411  // run the following steps:
   1412  DC_INFO(
   1413      ("%p: DataChannel is open. Queueing AnnounceOpen call to RTCDataChannel.",
   1414       this));
   1415 
   1416  mDomEventTarget->Dispatch(
   1417      NS_NewCancelableRunnableFunction(
   1418          "DataChannel::AnnounceOpen",
   1419          [this, self = RefPtr<DataChannel>(this)] {
   1420            if (GetDomDataChannel()) {
   1421              DC_INFO(("Calling AnnounceOpen on RTCDataChannel."));
   1422              GetDomDataChannel()->AnnounceOpen();
   1423            }
   1424 
   1425            // Right now, we're already on mainthread, but this might be a
   1426            // worker someday.
   1427            if (mConnection) {
   1428              GetMainThreadSerialEventTarget()->Dispatch(
   1429                  NS_NewCancelableRunnableFunction(
   1430                      "DataChannel::AnnounceOpen",
   1431                      [this, self = RefPtr<DataChannel>(this),
   1432                       connection = mConnection]() {
   1433                        // Stats stuff
   1434                        // TODO: Can we simplify this?
   1435                        if (!mEverOpened && connection->mListener) {
   1436                          mEverOpened = true;
   1437                          connection->mListener->NotifyDataChannelOpen(this);
   1438                        }
   1439                      }),
   1440                  NS_DISPATCH_FALLIBLE);
   1441            }
   1442          }),
   1443      NS_DISPATCH_FALLIBLE);
   1444 }
   1445 
   1446 void DataChannel::AnnounceClosed() {
   1447  // When an RTCDataChannel object's underlying data transport has been closed,
   1448  // the user agent MUST queue a task to run the following steps:
   1449  DC_INFO(
   1450      ("%p: DataChannel is closed. Queueing AnnounceClosed call to "
   1451       "RTCDataChannel.",
   1452       this));
   1453 
   1454  GetMainThreadSerialEventTarget()->Dispatch(
   1455      NS_NewCancelableRunnableFunction(
   1456          "DataChannel::AnnounceClosed",
   1457          [this, self = RefPtr<DataChannel>(this), connection = mConnection]() {
   1458            if (mAnnouncedClosed) {
   1459              return;
   1460            }
   1461            mAnnouncedClosed = true;
   1462            // We have to unset this first, and then fire DOM events, so the
   1463            // event handler won't hit an error if it tries to reuse this id.
   1464            if (mStream != INVALID_STREAM) {
   1465              DC_INFO(("%p: Marking stream id %u available", this, mStream));
   1466              connection->MarkStreamAvailable(mStream);
   1467            }
   1468 
   1469            // Stats stuff
   1470            if (mEverOpened && connection->mListener) {
   1471              connection->mListener->NotifyDataChannelClosed(this);
   1472            }
   1473 
   1474            DC_INFO(("%p: Dispatching AnnounceClosed to DOM thread", this));
   1475            mDomEventTarget->Dispatch(
   1476                NS_NewCancelableRunnableFunction(
   1477                    "DataChannel::AnnounceClosed",
   1478                    [this, self = RefPtr<DataChannel>(this)] {
   1479                      DC_INFO(("%p: Attempting to call AnnounceClosed.", this));
   1480                      if (GetDomDataChannel()) {
   1481                        DC_INFO(
   1482                            ("%p: Calling AnnounceClosed on RTCDataChannel.",
   1483                             this));
   1484                        GetDomDataChannel()->AnnounceClosed();
   1485                      }
   1486                    }),
   1487                NS_DISPATCH_FALLIBLE);
   1488          }),
   1489      NS_DISPATCH_FALLIBLE);
   1490 }
   1491 
   1492 void DataChannel::GracefulClose() {
   1493  DC_INFO(
   1494      ("%p: DataChannel transport is closing. Queueing GracefulClose call to "
   1495       "RTCDataChannel.",
   1496       this));
   1497 
   1498  mDomEventTarget->Dispatch(
   1499      NS_NewCancelableRunnableFunction(
   1500          "DataChannel::GracefulClose",
   1501          [this, self = RefPtr<DataChannel>(this)] {
   1502            if (GetDomDataChannel()) {
   1503              DC_INFO(("Calling GracefulClose on RTCDataChannel."));
   1504              GetDomDataChannel()->GracefulClose();
   1505            }
   1506          }),
   1507      NS_DISPATCH_FALLIBLE);
   1508 }
   1509 
   1510 void DataChannel::SendMsg(nsCString&& aMsg) {
   1511  SendBuffer(std::move(aMsg), false);
   1512 }
   1513 
   1514 void DataChannel::SendBinaryMsg(nsCString&& aMsg) {
   1515  SendBuffer(std::move(aMsg), true);
   1516 }
   1517 
   1518 void DataChannel::SendBuffer(nsCString&& aMsg, bool aBinary) {
   1519  MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
   1520  if (mMessagesSentPromise) {
   1521    mMessagesSentPromise = mMessagesSentPromise->Then(
   1522        mDomEventTarget, __func__,
   1523        [this, self = RefPtr<DataChannel>(this), msg = std::move(aMsg),
   1524         aBinary](
   1525            const GenericNonExclusivePromise::ResolveOrRejectValue&) mutable {
   1526          if (mConnection) {
   1527            mConnection->SendDataMessage(*this, std::move(msg), aBinary);
   1528            return GenericNonExclusivePromise::CreateAndResolve(true, __func__);
   1529          }
   1530          return GenericNonExclusivePromise::CreateAndResolve(false, __func__);
   1531        });
   1532 
   1533    UnsetMessagesSentPromiseWhenSettled();
   1534    return;
   1535  }
   1536  mConnection->SendDataMessage(*this, std::move(aMsg), aBinary);
   1537 }
   1538 
   1539 void DataChannel::SendBinaryBlob(nsIInputStream* aBlob) {
   1540  MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
   1541  if (!mMessagesSentPromise) {
   1542    mMessagesSentPromise =
   1543        GenericNonExclusivePromise::CreateAndResolve(true, __func__);
   1544  }
   1545 
   1546  mMessagesSentPromise = mMessagesSentPromise->Then(
   1547      mConnection->GetIOThread(), __func__,
   1548      [this, self = RefPtr<DataChannel>(this), blob = RefPtr(aBlob)](
   1549          const GenericNonExclusivePromise::ResolveOrRejectValue&) {
   1550        nsCString data;
   1551        if (NS_SUCCEEDED(NS_ReadInputStreamToString(blob, data, -1))) {
   1552          if (mConnection) {
   1553            // This dispatches to STS, which is when we're supposed to resolve
   1554            mConnection->SendDataMessage(*this, std::move(data), true);
   1555          }
   1556          blob->Close();
   1557          return GenericNonExclusivePromise::CreateAndResolve(true, __func__);
   1558        }
   1559        return GenericNonExclusivePromise::CreateAndResolve(false, __func__);
   1560      });
   1561 
   1562  UnsetMessagesSentPromiseWhenSettled();
   1563 }
   1564 
   1565 void DataChannel::UnsetMessagesSentPromiseWhenSettled() {
   1566  MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
   1567  // This is why we are using a non-exclusive promise; we want to null this out
   1568  // when we're done, but only if nothing else has chained off of it.
   1569  mMessagesSentPromise->Then(
   1570      mDomEventTarget, __func__,
   1571      [this, self = RefPtr(this), promise = mMessagesSentPromise]() {
   1572        if (promise == mMessagesSentPromise) {
   1573          mMessagesSentPromise = nullptr;
   1574        }
   1575      });
   1576 }
   1577 
   1578 void DataChannel::SetStream(uint16_t aId) {
   1579  MOZ_ASSERT(NS_IsMainThread());
   1580  mStream = aId;
   1581 
   1582  // This is an inconvenient wrinkle in the spec; if the stream id is discovered
   1583  // on main (for any reason), we update mainthread-homed RTCDataChannel
   1584  // synchronously, but must dispatch for workers. It is possible this will
   1585  // change, but probably not.
   1586  if (mHasWorkerDomDataChannel) {
   1587    DC_INFO(
   1588        ("DataChannel has been allocated a stream ID. Queueing task to inform "
   1589         "worker RTCDataChannel."));
   1590    mDomEventTarget->Dispatch(
   1591        NS_NewCancelableRunnableFunction(
   1592            __func__,
   1593            [this, self = RefPtr<DataChannel>(this), aId] {
   1594              if (mWorkerDomDataChannel) {
   1595                mWorkerDomDataChannel->SetId(aId);
   1596              }
   1597            }),
   1598        NS_DISPATCH_FALLIBLE);
   1599  } else {
   1600    DC_INFO(
   1601        ("%p: DataChannel has been allocated a stream ID. Synchronously "
   1602         "informing mainthread RTCDataChannel.",
   1603         this));
   1604    mMainthreadDomDataChannel->SetId(aId);
   1605  }
   1606 }
   1607 
   1608 void DataChannel::SetMaxMessageSize(double aMaxMessageSize) {
   1609  MOZ_ASSERT(NS_IsMainThread());
   1610 
   1611  if (mHasWorkerDomDataChannel) {
   1612    DC_INFO(
   1613        ("DataChannel has updated its maximum message size. Queueing task to "
   1614         "inform worker RTCDataChannel."));
   1615    mDomEventTarget->Dispatch(
   1616        NS_NewCancelableRunnableFunction(
   1617            __func__,
   1618            [this, self = RefPtr<DataChannel>(this), aMaxMessageSize] {
   1619              if (mWorkerDomDataChannel) {
   1620                mWorkerDomDataChannel->SetMaxMessageSize(aMaxMessageSize);
   1621              }
   1622            }),
   1623        NS_DISPATCH_FALLIBLE);
   1624  } else {
   1625    DC_INFO(
   1626        ("%p: DataChannel has updated its maximum message size. Synchronously "
   1627         "informing mainthread RTCDataChannel.",
   1628         this));
   1629    if (mMainthreadDomDataChannel) {
   1630      mMainthreadDomDataChannel->SetMaxMessageSize(aMaxMessageSize);
   1631    }
   1632  }
   1633 }
   1634 
   1635 void DataChannel::OnMessageReceived(nsCString&& aMsg, bool aIsBinary) {
   1636  // Receiving any data implies that the other end has received an OPEN
   1637  // request from us.
   1638  mWaitingForAck = false;
   1639 
   1640  DC_DEBUG(
   1641      ("%p: received message (%s)", this, aIsBinary ? "binary" : "string"));
   1642 
   1643  mDomEventTarget->Dispatch(NS_NewCancelableRunnableFunction(
   1644                                "DataChannel::OnMessageReceived",
   1645                                [this, self = RefPtr<DataChannel>(this),
   1646                                 msg = std::move(aMsg), aIsBinary]() {
   1647                                  if (GetDomDataChannel()) {
   1648                                    GetDomDataChannel()->DoOnMessageAvailable(
   1649                                        msg, aIsBinary);
   1650                                  }
   1651                                }),
   1652                            NS_DISPATCH_FALLIBLE);
   1653 }
   1654 
   1655 }  // namespace mozilla