tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

DataChannelDcSctp.cpp (16922B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "DataChannelDcSctp.h"
      8 #include "mozilla/Components.h"
      9 #include "mozilla/RandomNum.h"
     10 #include "DataChannelLog.h"
     11 #include "transport/runnable_utils.h"
     12 
     13 namespace mozilla {
     14 
     15 DataChannelConnectionDcSctp::DataChannelConnectionDcSctp(
     16    DataConnectionListener* aListener, nsISerialEventTarget* aTarget,
     17    MediaTransportHandler* aHandler)
     18    : DataChannelConnection(aListener, aTarget, aHandler) {
     19  // dcsctp does not expose anything related to negotiation of maximum stream
     20  // id.
     21  mNegotiatedIdLimit = MAX_NUM_STREAMS;
     22 }
     23 
     24 void DataChannelConnectionDcSctp::Destroy() {
     25  MOZ_ASSERT(NS_IsMainThread());
     26  DC_DEBUG(("%s: %p", __func__, this));
     27  DataChannelConnection::Destroy();
     28  mSTS->Dispatch(NS_NewRunnableFunction(
     29      "DataChannelConnectionDcSctp::Destroy",
     30      [this, self = RefPtr<DataChannelConnectionDcSctp>(this)]() {
     31        if (mDcSctp) {
     32          mDcSctp->Close();
     33          // Do we do this now?
     34          mDcSctp = nullptr;
     35        }
     36      }));
     37 }
     38 
     39 bool DataChannelConnectionDcSctp::RaiseStreamLimitTo(uint16_t aNewLimit) {
     40  MOZ_ASSERT(mSTS->IsOnCurrentThread());
     41  DC_DEBUG(("%s: %p", __func__, this));
     42  // dcsctp does not expose anything related to negotiation of maximum stream
     43  // id. It probably just negotiates 65534. Just smile and nod.
     44  return true;
     45 }
     46 
     47 void DataChannelConnectionDcSctp::OnTransportReady() {
     48  MOZ_ASSERT(mSTS->IsOnCurrentThread());
     49  DC_DEBUG(("%s: %p", __func__, this));
     50  if (!mDcSctp) {
     51    auto factory = std::make_unique<dcsctp::DcSctpSocketFactory>();
     52    DcSctpOptions options;
     53    options.local_port = mLocalPort;
     54    options.remote_port = mRemotePort;
     55    options.max_message_size = 8 * 1024 * 1024;
     56    options.max_timer_backoff_duration = DurationMs(3000);
     57    // Don't close the connection automatically on too many retransmissions.
     58    options.max_retransmissions = std::nullopt;
     59    options.max_init_retransmits = std::nullopt;
     60    options.per_stream_send_queue_limit = 1024 * 1024 * 64;
     61    // This is just set to avoid denial-of-service. Practically unlimited.
     62    options.max_send_buffer_size =
     63        std::numeric_limits<decltype(options.max_send_buffer_size)>::max();
     64    options.max_receiver_window_buffer_size = 16 * 1024 * 1024;
     65    options.enable_message_interleaving = true;
     66    // The default value of 200 leads to extremely poor congestion recovery
     67    // when packet loss has occurred.
     68    options.delayed_ack_max_timeout = DurationMs(50);
     69 
     70    mDcSctp =
     71        factory->Create("DataChannelConnectionDcSctp", *this, nullptr, options);
     72    mDcSctp->Connect();
     73  }
     74 }
     75 
     76 bool DataChannelConnectionDcSctp::Init(const uint16_t aLocalPort,
     77                                       const uint16_t aNumStreams) {
     78  return true;
     79 }
     80 
     81 int DataChannelConnectionDcSctp::SendMessage(DataChannel& aChannel,
     82                                             OutgoingMsg&& aMsg) {
     83  MOZ_ASSERT(mSTS->IsOnCurrentThread());
     84  DC_DEBUG(("%s: %p (size %u)", __func__, this,
     85            static_cast<unsigned>(aMsg.GetRemainingData().size())));
     86  if (!mDcSctp) {
     87    return EBADF;  // Debatable?
     88  }
     89 
     90  // I do not see any way to get nsCString to pass ownership of its buffer to
     91  // anything besides another nsString. Bummer.
     92  auto remaining = aMsg.GetRemainingData();
     93  std::vector<uint8_t> data;
     94  data.assign(remaining.begin(), remaining.end());
     95  DcSctpMessage msg(StreamID(aMsg.GetMetadata().mStreamId),
     96                    PPID(aMsg.GetMetadata().mPpid), std::move(data));
     97  SendOptions options;
     98  options.unordered = IsUnordered(aMsg.GetMetadata().mUnordered);
     99  aMsg.GetMetadata().mMaxLifetimeMs.apply([&options](uint16_t lifetime) {
    100    options.lifetime = DurationMs(lifetime);
    101  });
    102  aMsg.GetMetadata().mMaxRetransmissions.apply(
    103      [&options](uint16_t rtx) { options.max_retransmissions = rtx; });
    104 
    105  if (aMsg.GetMetadata().mPpid == DATA_CHANNEL_PPID_CONTROL) {
    106    // Make sure we get a callback when this DCEP message is sent, and remember
    107    // the stream id and the size. This allows us to work around the dcsctp bug
    108    // that counts DCEP as part of bufferedAmount.
    109    uint64_t id = mNextLifecycleId++;
    110    options.lifecycle_id = LifecycleId(id);
    111    mBufferedDCEPBytes[id] =
    112        std::make_pair(aMsg.GetMetadata().mStreamId, remaining.size());
    113  }
    114 
    115  auto result = mDcSctp->Send(std::move(msg), options);
    116 
    117  if (aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_DOMSTRING_EMPTY &&
    118      aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_BINARY_EMPTY) {
    119    mBufferedAmounts[aMsg.GetMetadata().mStreamId] += remaining.size();
    120  }
    121 
    122  switch (result) {
    123    case SendStatus::kSuccess:
    124      break;
    125    case SendStatus::kErrorMessageEmpty:
    126      DC_ERROR(("%s: %p send failed (kErrorMessageEmpty)", __func__, this));
    127      return EINVAL;
    128    case SendStatus::kErrorMessageTooLarge:
    129      DC_ERROR(("%s: %p send failed (kErrorMessageTooLarge)", __func__, this));
    130      return EMSGSIZE;
    131    case SendStatus::kErrorResourceExhaustion:
    132      DC_ERROR(
    133          ("%s: %p send failed (kErrorResourceExhaustion)", __func__, this));
    134      return ENOBUFS;  // Debatable?
    135    case SendStatus::kErrorShuttingDown:
    136      DC_ERROR(("%s: %p send failed (kErrorShuttingDown)", __func__, this));
    137      return EPIPE;  // Debatable?
    138  }
    139 
    140  return 0;
    141 }
    142 
    143 void DataChannelConnectionDcSctp::OnSctpPacketReceived(
    144    const MediaPacket& aPacket) {
    145  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    146  DC_DEBUG(
    147      ("%s: %p size=%u", __func__, this, static_cast<unsigned>(aPacket.len())));
    148  if (!mDcSctp) {
    149    return;
    150  }
    151  webrtc::ArrayView<const uint8_t> data(aPacket.data(), aPacket.len());
    152  mDcSctp->ReceivePacket(data);
    153 }
    154 
    155 bool DataChannelConnectionDcSctp::ResetStreams(nsTArray<uint16_t>& aStreams) {
    156  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    157  DC_DEBUG(("%s: %p", __func__, this));
    158  if (!mDcSctp) {
    159    return false;
    160  }
    161  std::vector<StreamID> converted;
    162  for (auto id : aStreams) {
    163    DC_DEBUG(("%s: %p Resetting %u", __func__, this, id));
    164    converted.push_back(StreamID(id));
    165  }
    166  auto result =
    167      mDcSctp->ResetStreams(webrtc::ArrayView<const StreamID>(converted));
    168  aStreams.Clear();
    169  return result == ResetStreamsStatus::kPerformed;
    170 }
    171 
    172 void DataChannelConnectionDcSctp::OnStreamOpen(uint16_t aStream) {
    173  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    174  DC_DEBUG(("%s: %p", __func__, this));
    175  for (auto it = mPreChannelData.begin(); it != mPreChannelData.end();) {
    176    if (it->GetStreamId() == aStream) {
    177      HandleDataMessage(std::move(*it));
    178      it = mPreChannelData.erase(it);
    179    } else {
    180      ++it;
    181    }
    182  }
    183 }
    184 
    185 SendPacketStatus DataChannelConnectionDcSctp::SendPacketWithStatus(
    186    webrtc::ArrayView<const uint8_t> aData) {
    187  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    188  DC_DEBUG(("%s: %p", __func__, this));
    189  std::unique_ptr<MediaPacket> packet(new MediaPacket);
    190  packet->SetType(MediaPacket::SCTP);
    191  packet->Copy(aData.data(), aData.size());
    192 
    193  DataChannelConnection::SendPacket(std::move(packet));
    194  return SendPacketStatus::kSuccess;
    195 }
    196 
    197 class DcSctpTimeout : public Timeout {
    198 public:
    199  explicit DcSctpTimeout(DataChannelConnectionDcSctp* aConnection)
    200      : mConnection(aConnection) {}
    201 
    202  // Called to start time timeout, with the duration in milliseconds as
    203  // `duration` and with the timeout identifier as `timeout_id`, which - if
    204  // the timeout expires - shall be provided to `DcSctpSocket::HandleTimeout`.
    205  //
    206  // `Start` and `Stop` will always be called in pairs. In other words will
    207  // ´Start` never be called twice, without a call to `Stop` in between.
    208  void Start(DurationMs duration, TimeoutID timeout_id) override {
    209    mId = timeout_id.value();
    210    DC_DEBUG(("%s: %u %ums", __func__, mId,
    211              static_cast<unsigned>(duration.value())));
    212    auto result = NS_NewTimerWithCallback(
    213        [connection = mConnection, timeout_id](nsITimer* timer) {
    214          DC_DEBUG(("%s: %u fired", __func__,
    215                    static_cast<unsigned>(timeout_id.value())));
    216          connection->HandleTimeout(timeout_id);
    217        },
    218        duration.value(), nsITimer::TYPE_ONE_SHOT, "DcSctpTimeout::Start"_ns);
    219    if (result.isOk()) {
    220      mTimer = result.unwrap();
    221    }
    222  }
    223 
    224  // Called to stop the running timeout.
    225  //
    226  // `Start` and `Stop` will always be called in pairs. In other words will
    227  // ´Start` never be called twice, without a call to `Stop` in between.
    228  //
    229  // `Stop` will always be called prior to releasing this object.
    230  void Stop() override {
    231    DC_DEBUG(("%s: %u", __func__, mId));
    232    if (mTimer) {
    233      mTimer->Cancel();
    234      mTimer = nullptr;
    235    }
    236  }
    237 
    238 private:
    239  RefPtr<DataChannelConnectionDcSctp> mConnection;
    240  nsCOMPtr<nsITimer> mTimer;
    241  unsigned mId = 0;
    242 };
    243 
    244 std::unique_ptr<Timeout> DataChannelConnectionDcSctp::CreateTimeout(
    245    webrtc::TaskQueueBase::DelayPrecision aPrecision) {
    246  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    247  DC_DEBUG(("%s: %p", __func__, this));
    248  // There is no such thing as a low precision TYPE_ONE_SHOT
    249  (void)aPrecision;
    250  return std::make_unique<DcSctpTimeout>(this);
    251 }
    252 
    253 void DataChannelConnectionDcSctp::HandleTimeout(TimeoutID aId) {
    254  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    255  DC_DEBUG(("%s: %p", __func__, this));
    256  if (mDcSctp) {
    257    mDcSctp->HandleTimeout(aId);
    258  }
    259 }
    260 
    261 uint32_t DataChannelConnectionDcSctp::GetRandomInt(uint32_t aLow,
    262                                                   uint32_t aHigh) {
    263  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    264  DC_DEBUG(("%s: %p", __func__, this));
    265  return aLow + RandomUint64OrDie() % (aHigh - aLow);
    266 }
    267 
    268 void DataChannelConnectionDcSctp::OnMessageReceived(DcSctpMessage aMessage) {
    269  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    270  DC_DEBUG(("%s: %p", __func__, this));
    271  RefPtr<DataChannel> channel =
    272      FindChannelByStream(aMessage.stream_id().value());
    273 
    274  IncomingMsg msg(aMessage.ppid().value(), aMessage.stream_id().value());
    275  // Sadly, nsCString and std::vector have no way to relinquish their buffers
    276  // to one another.
    277  msg.Append(aMessage.payload().data(), aMessage.payload().size());
    278  if (msg.GetPpid() == DATA_CHANNEL_PPID_CONTROL) {
    279    HandleDCEPMessage(std::move(msg));
    280  } else if (channel && !HasPreChannelData(msg.GetStreamId())) {
    281    HandleDataMessage(std::move(msg));
    282  } else {
    283    mPreChannelData.push_back(std::move(msg));
    284  }
    285 }
    286 
    287 void DataChannelConnectionDcSctp::OnError(ErrorKind aError,
    288                                          absl::string_view aMessage) {
    289  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    290  DC_ERROR(("%s: %p %d %s", __func__, this, static_cast<int>(aError),
    291            std::string(aMessage).c_str()));
    292 }
    293 
    294 void DataChannelConnectionDcSctp::OnAborted(ErrorKind aError,
    295                                            absl::string_view aMessage) {
    296  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    297  DC_ERROR(("%s: %p %d %s", __func__, this, static_cast<int>(aError),
    298            std::string(aMessage).c_str()));
    299  CloseAll_s();
    300 }
    301 
    302 void DataChannelConnectionDcSctp::OnConnected() {
    303  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    304  DC_DEBUG(("%s: %p", __func__, this));
    305  DataChannelConnectionState state = GetState();
    306  // TODO: Some duplicate code here, refactor
    307  if (state == DataChannelConnectionState::Connecting) {
    308    SetState(DataChannelConnectionState::Open);
    309 
    310    OnConnected();
    311    DC_DEBUG(("%s: %p DTLS connect() succeeded!  Entering connected mode",
    312              __func__, this));
    313 
    314    // Open any streams pending...
    315    // TODO: Do we really need to dispatch here? We're already on STS...
    316    RUN_ON_THREAD(mSTS,
    317                  WrapRunnable(RefPtr<DataChannelConnection>(this),
    318                               &DataChannelConnection::ProcessQueuedOpens),
    319                  NS_DISPATCH_NORMAL);
    320 
    321  } else if (state == DataChannelConnectionState::Open) {
    322    DC_DEBUG(("%s: %p DataConnection Already OPEN", __func__, this));
    323  } else {
    324    DC_ERROR(("%s: %p Unexpected state: %s", __func__, this, ToString(state)));
    325  }
    326 }
    327 
    328 void DataChannelConnectionDcSctp::OnClosed() {
    329  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    330  DC_DEBUG(("%s: %p", __func__, this));
    331  CloseAll_s();
    332 }
    333 
    334 void DataChannelConnectionDcSctp::OnConnectionRestarted() {
    335  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    336  DC_DEBUG(("%s: %p", __func__, this));
    337 }
    338 
    339 void DataChannelConnectionDcSctp::OnStreamsResetFailed(
    340    webrtc::ArrayView<const StreamID> aOutgoingStreams,
    341    absl::string_view aReason) {
    342  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    343  DC_ERROR(("%s: %p", __func__, this));
    344  // It probably does not make much sense to retry this here. If dcsctp doesn't
    345  // want to retry, we probably don't either.
    346  (void)aReason;
    347  std::vector<uint16_t> streamsReset;
    348  for (auto id : aOutgoingStreams) {
    349    streamsReset.push_back(id.value());
    350  }
    351  OnStreamsResetComplete(std::move(streamsReset));
    352 }
    353 
    354 void DataChannelConnectionDcSctp::OnStreamsResetPerformed(
    355    webrtc::ArrayView<const StreamID> aOutgoingStreams) {
    356  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    357  DC_DEBUG(("%s: %p", __func__, this));
    358  std::vector<uint16_t> streamsReset;
    359  for (auto id : aOutgoingStreams) {
    360    streamsReset.push_back(id.value());
    361  }
    362  OnStreamsResetComplete(std::move(streamsReset));
    363 }
    364 
    365 void DataChannelConnectionDcSctp::OnIncomingStreamsReset(
    366    webrtc::ArrayView<const StreamID> aIncomingStreams) {
    367  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    368  DC_DEBUG(("%s: %p", __func__, this));
    369  std::vector<uint16_t> streamsReset;
    370  for (auto id : aIncomingStreams) {
    371    streamsReset.push_back(id.value());
    372  }
    373  OnStreamsReset(std::move(streamsReset));
    374 }
    375 
    376 // We (ab)use this callback to detect when _any_ data has been sent on the
    377 // stream id, to drive updates to mainthread.
    378 void DataChannelConnectionDcSctp::OnBufferedAmountLow(StreamID aStreamId) {
    379  MOZ_ASSERT(mSTS->IsOnCurrentThread());
    380  UpdateBufferedAmount(aStreamId);
    381 }
    382 
    383 void DataChannelConnectionDcSctp::OnLifecycleMessageFullySent(
    384    LifecycleId aLifecycleId) {
    385  DC_DEBUG(("%s: %p aLifecycleId=%u", __func__, this,
    386            static_cast<unsigned>(aLifecycleId.value())));
    387  OnDCEPMessageDone(aLifecycleId);
    388 }
    389 
    390 void DataChannelConnectionDcSctp::OnLifecycleMessageExpired(
    391    LifecycleId aLifecycleId, bool aMaybeDelivered) {
    392  DC_DEBUG(("%s: %p aLifecycleId=%u aMaybeDelivered=%d", __func__, this,
    393            static_cast<unsigned>(aLifecycleId.value()),
    394            static_cast<int>(aMaybeDelivered)));
    395  if (!aMaybeDelivered) {
    396    OnDCEPMessageDone(aLifecycleId);
    397  }
    398 }
    399 
    400 void DataChannelConnectionDcSctp::UpdateBufferedAmount(StreamID aStreamId) {
    401  DC_DEBUG(("%s: %p id=%u", __func__, this,
    402            static_cast<unsigned>(aStreamId.value())));
    403  mSTS->Dispatch(NS_NewRunnableFunction(
    404      "DataChannelConnectionDcSctp::UpdateBufferedAmount",
    405      [this, self = RefPtr<DataChannelConnectionDcSctp>(this), aStreamId]() {
    406        auto channel = mChannels.Get(aStreamId.value());
    407        if (!channel || !mDcSctp) {
    408          return;
    409        }
    410 
    411        size_t oldAmount = mBufferedAmounts[aStreamId.value()];
    412        size_t newAmount = mDcSctp->buffered_amount(aStreamId);
    413        int decreaseWithoutDCEP =
    414            oldAmount - newAmount - mDCEPBytesSent[aStreamId.value()];
    415 
    416        if (decreaseWithoutDCEP > 0) {
    417          channel->DecrementBufferedAmount(decreaseWithoutDCEP);
    418        }
    419 
    420        DC_DEBUG(("%s: %p id=%u amount %u -> %u (difference without DCEP %d)",
    421                  __func__, this, static_cast<unsigned>(aStreamId.value()),
    422                  static_cast<unsigned>(oldAmount),
    423                  static_cast<unsigned>(newAmount), decreaseWithoutDCEP));
    424        mDCEPBytesSent.erase(aStreamId.value());
    425        mBufferedAmounts[aStreamId.value()] = newAmount;
    426        mDcSctp->SetBufferedAmountLowThreshold(aStreamId,
    427                                               newAmount ? newAmount - 1 : 0);
    428      }));
    429 }
    430 
    431 void DataChannelConnectionDcSctp::OnDCEPMessageDone(LifecycleId aLifecycleId) {
    432  DC_DEBUG(("%s: %p", __func__, this));
    433  // Find the stream id and the size of this DCEP packet.
    434  auto it = mBufferedDCEPBytes.find(aLifecycleId.value());
    435  if (it == mBufferedDCEPBytes.end()) {
    436    MOZ_ASSERT(false);
    437    return;
    438  }
    439 
    440  auto& [stream, size] = it->second;
    441 
    442  // Find the running total of DCEP bytes sent for this stream, and add the
    443  // number of DCEP bytes we just learned about.
    444  mDCEPBytesSent[stream] += size;
    445  DC_DEBUG(("%s: %p id=%u amount=%u", __func__, this,
    446            static_cast<unsigned>(stream), static_cast<unsigned>(size)));
    447 
    448  // This is mainly to reset the buffered amount low threshold.
    449  UpdateBufferedAmount(StreamID(stream));
    450 
    451  mBufferedDCEPBytes.erase(it);
    452 }
    453 
    454 bool DataChannelConnectionDcSctp::HasPreChannelData(uint16_t aStream) const {
    455  for (const auto& msg : mPreChannelData) {
    456    if (msg.GetStreamId() == aStream) {
    457      return true;
    458    }
    459  }
    460  return false;
    461 }
    462 
    463 }  // namespace mozilla