tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

dcsctp_transport.cc (27295B)


      1 /*
      2 *  Copyright 2021 The WebRTC project authors. All Rights Reserved.
      3 *
      4 *  Use of this source code is governed by a BSD-style license
      5 *  that can be found in the LICENSE file in the root of the source
      6 *  tree. An additional intellectual property rights grant can be found
      7 *  in the file PATENTS.  All contributing project authors may
      8 *  be found in the AUTHORS file in the root of the source tree.
      9 */
     10 
     11 #include "media/sctp/dcsctp_transport.h"
     12 
     13 #include <atomic>
     14 #include <cstddef>
     15 #include <cstdint>
     16 #include <functional>
     17 #include <limits>
     18 #include <memory>
     19 #include <optional>
     20 #include <string>
     21 #include <utility>
     22 #include <vector>
     23 
     24 #include "absl/strings/string_view.h"
     25 #include "api/array_view.h"
     26 #include "api/data_channel_interface.h"
     27 #include "api/dtls_transport_interface.h"
     28 #include "api/environment/environment.h"
     29 #include "api/field_trials_view.h"
     30 #include "api/priority.h"
     31 #include "api/rtc_error.h"
     32 #include "api/sctp_transport_interface.h"
     33 #include "api/sequence_checker.h"
     34 #include "api/task_queue/task_queue_base.h"
     35 #include "api/transport/data_channel_transport_interface.h"
     36 #include "media/sctp/sctp_transport_internal.h"
     37 #include "net/dcsctp/public/dcsctp_message.h"
     38 #include "net/dcsctp/public/dcsctp_options.h"
     39 #include "net/dcsctp/public/dcsctp_socket.h"
     40 #include "net/dcsctp/public/dcsctp_socket_factory.h"
     41 #include "net/dcsctp/public/packet_observer.h"
     42 #include "net/dcsctp/public/text_pcap_packet_observer.h"
     43 #include "net/dcsctp/public/timeout.h"
     44 #include "net/dcsctp/public/types.h"
     45 #include "p2p/base/packet_transport_internal.h"
     46 #include "p2p/dtls/dtls_transport_internal.h"
     47 #include "rtc_base/async_packet_socket.h"
     48 #include "rtc_base/checks.h"
     49 #include "rtc_base/copy_on_write_buffer.h"
     50 #include "rtc_base/logging.h"
     51 #include "rtc_base/network/received_packet.h"
     52 #include "rtc_base/socket.h"
     53 #include "rtc_base/strings/string_builder.h"
     54 #include "rtc_base/thread.h"
     55 #include "rtc_base/trace_event.h"
     56 #include "system_wrappers/include/clock.h"
     57 
     58 namespace webrtc {
     59 
     60 namespace {
     61 using ::dcsctp::SendPacketStatus;
     62 
     63 // When there is packet loss for a long time, the SCTP retry timers will use
     64 // exponential backoff, which can grow to very long durations and when the
     65 // connection recovers, it may take a long time to reach the new backoff
     66 // duration. By limiting it to a reasonable limit, the time to recover reduces.
     67 constexpr dcsctp::DurationMs kMaxTimerBackoffDuration =
     68    dcsctp::DurationMs(3000);
     69 
     70 enum class WebrtcPPID : dcsctp::PPID::UnderlyingType {
     71  // https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1
     72  kDCEP = 50,
     73  // https://www.rfc-editor.org/rfc/rfc8831.html#section-8
     74  kString = 51,
     75  kBinaryPartial = 52,  // Deprecated
     76  kBinary = 53,
     77  kStringPartial = 54,  // Deprecated
     78  kStringEmpty = 56,
     79  kBinaryEmpty = 57,
     80 };
     81 
     82 WebrtcPPID ToPPID(DataMessageType message_type, size_t size) {
     83  switch (message_type) {
     84    case DataMessageType::kControl:
     85      return WebrtcPPID::kDCEP;
     86    case DataMessageType::kText:
     87      return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty;
     88    case DataMessageType::kBinary:
     89      return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty;
     90  }
     91 }
     92 
     93 std::optional<DataMessageType> ToDataMessageType(dcsctp::PPID ppid) {
     94  switch (static_cast<WebrtcPPID>(ppid.value())) {
     95    case WebrtcPPID::kDCEP:
     96      return DataMessageType::kControl;
     97    case WebrtcPPID::kString:
     98    case WebrtcPPID::kStringPartial:
     99    case WebrtcPPID::kStringEmpty:
    100      return DataMessageType::kText;
    101    case WebrtcPPID::kBinary:
    102    case WebrtcPPID::kBinaryPartial:
    103    case WebrtcPPID::kBinaryEmpty:
    104      return DataMessageType::kBinary;
    105  }
    106  return std::nullopt;
    107 }
    108 
    109 std::optional<SctpErrorCauseCode> ToErrorCauseCode(dcsctp::ErrorKind error) {
    110  switch (error) {
    111    case dcsctp::ErrorKind::kParseFailed:
    112      return SctpErrorCauseCode::kUnrecognizedParameters;
    113    case dcsctp::ErrorKind::kPeerReported:
    114      return SctpErrorCauseCode::kUserInitiatedAbort;
    115    case dcsctp::ErrorKind::kWrongSequence:
    116    case dcsctp::ErrorKind::kProtocolViolation:
    117      return SctpErrorCauseCode::kProtocolViolation;
    118    case dcsctp::ErrorKind::kResourceExhaustion:
    119      return SctpErrorCauseCode::kOutOfResource;
    120    case dcsctp::ErrorKind::kTooManyRetries:
    121    case dcsctp::ErrorKind::kUnsupportedOperation:
    122    case dcsctp::ErrorKind::kNoError:
    123    case dcsctp::ErrorKind::kNotConnected:
    124      // No SCTP error cause code matches those
    125      break;
    126  }
    127  return std::nullopt;
    128 }
    129 
    130 bool IsEmptyPPID(dcsctp::PPID ppid) {
    131  WebrtcPPID webrtc_ppid = static_cast<WebrtcPPID>(ppid.value());
    132  return webrtc_ppid == WebrtcPPID::kStringEmpty ||
    133         webrtc_ppid == WebrtcPPID::kBinaryEmpty;
    134 }
    135 }  // namespace
    136 
    137 DcSctpTransport::DcSctpTransport(const Environment& env,
    138                                 Thread* network_thread,
    139                                 DtlsTransportInternal* transport)
    140    : DcSctpTransport(env,
    141                      network_thread,
    142                      transport,
    143                      std::make_unique<dcsctp::DcSctpSocketFactory>()) {}
    144 
    145 DcSctpTransport::DcSctpTransport(
    146    const Environment& env,
    147    Thread* network_thread,
    148    DtlsTransportInternal* transport,
    149    std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory)
    150    : network_thread_(network_thread),
    151      transport_(transport),
    152      env_(env),
    153      random_(env_.clock().TimeInMicroseconds()),
    154      socket_factory_(std::move(socket_factory)),
    155      task_queue_timeout_factory_(
    156          *network_thread,
    157          [this]() { return TimeMillis(); },
    158          [this](dcsctp::TimeoutID timeout_id) {
    159            socket_->HandleTimeout(timeout_id);
    160          }) {
    161  RTC_DCHECK_RUN_ON(network_thread_);
    162  static std::atomic<int> instance_count = 0;
    163  StringBuilder sb;
    164  sb << debug_name_ << instance_count++;
    165  debug_name_ = sb.Release();
    166  ConnectTransportSignals();
    167 }
    168 
    169 DcSctpTransport::~DcSctpTransport() {
    170  if (socket_) {
    171    socket_->Close();
    172  }
    173 }
    174 
    175 void DcSctpTransport::SetOnConnectedCallback(std::function<void()> callback) {
    176  RTC_DCHECK_RUN_ON(network_thread_);
    177  on_connected_callback_ = std::move(callback);
    178 }
    179 
    180 void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) {
    181  RTC_DCHECK_RUN_ON(network_thread_);
    182  data_channel_sink_ = sink;
    183  if (data_channel_sink_ && ready_to_send_data_) {
    184    data_channel_sink_->OnReadyToSend();
    185  }
    186 }
    187 
    188 void DcSctpTransport::SetDtlsTransport(DtlsTransportInternal* transport) {
    189  RTC_DCHECK_RUN_ON(network_thread_);
    190  DisconnectTransportSignals();
    191  transport_ = transport;
    192  ConnectTransportSignals();
    193  MaybeConnectSocket();
    194 }
    195 
    196 bool DcSctpTransport::Start(const SctpOptions& options) {
    197  RTC_DCHECK_RUN_ON(network_thread_);
    198  RTC_DCHECK(options.max_message_size > 0);
    199  RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << options.local_port
    200                    << ", remote=" << options.remote_port
    201                    << ", max_message_size=" << options.max_message_size << ")";
    202 
    203  if (!socket_) {
    204    dcsctp::DcSctpOptions dcsctp_options =
    205        CreateDcSctpOptions(options, env_.field_trials());
    206    std::unique_ptr<dcsctp::PacketObserver> packet_observer;
    207    if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) {
    208      packet_observer =
    209          std::make_unique<dcsctp::TextPcapPacketObserver>(debug_name_);
    210    }
    211 
    212    socket_ = socket_factory_->Create(
    213        debug_name_, *this, std::move(packet_observer), dcsctp_options);
    214  } else {
    215    if (options.local_port != socket_->options().local_port ||
    216        options.remote_port != socket_->options().remote_port) {
    217      RTC_LOG(LS_ERROR)
    218          << debug_name_ << "->Start(local=" << options.local_port
    219          << ", remote=" << options.remote_port
    220          << "): Can't change ports on already started transport.";
    221      return false;
    222    }
    223    socket_->SetMaxMessageSize(options.max_message_size);
    224  }
    225 
    226  MaybeConnectSocket();
    227 
    228  for (const auto& [sid, stream_state] : stream_states_) {
    229    socket_->SetStreamPriority(sid, stream_state.priority);
    230  }
    231 
    232  return true;
    233 }
    234 
    235 bool DcSctpTransport::OpenStream(int sid, PriorityValue priority) {
    236  RTC_DCHECK_RUN_ON(network_thread_);
    237  RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ", "
    238                    << priority.value() << ").";
    239 
    240  StreamState stream_state;
    241  stream_state.priority = dcsctp::StreamPriority(priority.value());
    242  stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
    243                                  stream_state);
    244  if (socket_) {
    245    socket_->SetStreamPriority(dcsctp::StreamID(sid),
    246                               dcsctp::StreamPriority(priority.value()));
    247  }
    248 
    249  return true;
    250 }
    251 
    252 bool DcSctpTransport::ResetStream(int sid) {
    253  RTC_DCHECK_RUN_ON(network_thread_);
    254  RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
    255  if (!socket_) {
    256    RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
    257                      << "): Transport is not started.";
    258    return false;
    259  }
    260 
    261  dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
    262 
    263  auto it = stream_states_.find(streams[0]);
    264  if (it == stream_states_.end()) {
    265    RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
    266                      << "): Stream is not open.";
    267    return false;
    268  }
    269 
    270  StreamState& stream_state = it->second;
    271  if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
    272      stream_state.outgoing_reset_done) {
    273    // The closing procedure was already initiated by the remote, don't do
    274    // anything.
    275    return false;
    276  }
    277  stream_state.closure_initiated = true;
    278  socket_->ResetStreams(streams);
    279  return true;
    280 }
    281 
    282 RTCError DcSctpTransport::SendData(int sid,
    283                                   const SendDataParams& params,
    284                                   const CopyOnWriteBuffer& payload) {
    285  RTC_DCHECK_RUN_ON(network_thread_);
    286  RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
    287                       << ", type=" << static_cast<int>(params.type)
    288                       << ", length=" << payload.size() << ").";
    289 
    290  if (!socket_) {
    291    RTC_LOG(LS_ERROR) << debug_name_
    292                      << "->SendData(...): Transport is not started.";
    293    return RTCError(RTCErrorType::INVALID_STATE);
    294  }
    295 
    296  // It is possible for a message to be sent from the signaling thread at the
    297  // same time a data-channel is closing, but before the signaling thread is
    298  // aware of it. So we need to keep track of currently active data channels and
    299  // skip sending messages for the ones that are not open or closing.
    300  // The sending errors are not impacting the data channel API contract as
    301  // it is allowed to discard queued messages when the channel is closing.
    302  auto stream_state =
    303      stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
    304  if (stream_state == stream_states_.end()) {
    305    RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
    306                        << sid;
    307    return RTCError(RTCErrorType::INVALID_STATE);
    308  }
    309 
    310  if (stream_state->second.closure_initiated ||
    311      stream_state->second.incoming_reset_done ||
    312      stream_state->second.outgoing_reset_done) {
    313    RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
    314                        << sid;
    315    return RTCError(RTCErrorType::INVALID_STATE);
    316  }
    317 
    318  auto max_message_size = socket_->options().max_message_size;
    319  if (max_message_size > 0 && payload.size() > max_message_size) {
    320    RTC_LOG(LS_WARNING) << debug_name_
    321                        << "->SendData(...): "
    322                           "Trying to send packet bigger "
    323                           "than the max message size: "
    324                        << payload.size() << " vs max of " << max_message_size;
    325    return RTCError(RTCErrorType::INVALID_RANGE);
    326  }
    327 
    328  std::vector<uint8_t> message_payload(payload.cdata(),
    329                                       payload.cdata() + payload.size());
    330  if (message_payload.empty()) {
    331    // https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
    332    // SCTP does not support the sending of empty user messages. Therefore, if
    333    // an empty message has to be sent, the appropriate PPID (WebRTC String
    334    // Empty or WebRTC Binary Empty) is used, and the SCTP user message of one
    335    // zero byte is sent.
    336    message_payload.push_back('\0');
    337  }
    338 
    339  dcsctp::DcSctpMessage message(
    340      dcsctp::StreamID(static_cast<uint16_t>(sid)),
    341      dcsctp::PPID(static_cast<uint16_t>(ToPPID(params.type, payload.size()))),
    342      std::move(message_payload));
    343 
    344  dcsctp::SendOptions send_options;
    345  send_options.unordered = dcsctp::IsUnordered(!params.ordered);
    346  if (params.max_rtx_ms.has_value()) {
    347    RTC_DCHECK(*params.max_rtx_ms >= 0 &&
    348               *params.max_rtx_ms <= std::numeric_limits<uint16_t>::max());
    349    send_options.lifetime = dcsctp::DurationMs(*params.max_rtx_ms);
    350  }
    351  if (params.max_rtx_count.has_value()) {
    352    RTC_DCHECK(*params.max_rtx_count >= 0 &&
    353               *params.max_rtx_count <= std::numeric_limits<uint16_t>::max());
    354    send_options.max_retransmissions = *params.max_rtx_count;
    355  }
    356 
    357  dcsctp::SendStatus error = socket_->Send(std::move(message), send_options);
    358  switch (error) {
    359    case dcsctp::SendStatus::kSuccess:
    360      return RTCError::OK();
    361    case dcsctp::SendStatus::kErrorResourceExhaustion:
    362      ready_to_send_data_ = false;
    363      return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
    364    default:
    365      absl::string_view error_message = dcsctp::ToString(error);
    366      RTC_LOG(LS_ERROR) << debug_name_
    367                        << "->SendData(...): send() failed with error "
    368                        << error_message << ".";
    369      return RTCError(RTCErrorType::NETWORK_ERROR, error_message);
    370  }
    371 }
    372 
    373 bool DcSctpTransport::ReadyToSendData() {
    374  RTC_DCHECK_RUN_ON(network_thread_);
    375  return ready_to_send_data_;
    376 }
    377 
    378 int DcSctpTransport::max_message_size() const {
    379  if (!socket_) {
    380    RTC_LOG(LS_ERROR) << debug_name_
    381                      << "->max_message_size(...): Transport is not started.";
    382    return 0;
    383  }
    384  return socket_->options().max_message_size;
    385 }
    386 
    387 std::optional<int> DcSctpTransport::max_outbound_streams() const {
    388  if (!socket_)
    389    return std::nullopt;
    390  return socket_->options().announced_maximum_outgoing_streams;
    391 }
    392 
    393 std::optional<int> DcSctpTransport::max_inbound_streams() const {
    394  if (!socket_)
    395    return std::nullopt;
    396  return socket_->options().announced_maximum_incoming_streams;
    397 }
    398 
    399 size_t DcSctpTransport::buffered_amount(int sid) const {
    400  if (!socket_)
    401    return 0;
    402  return socket_->buffered_amount(dcsctp::StreamID(sid));
    403 }
    404 
    405 size_t DcSctpTransport::buffered_amount_low_threshold(int sid) const {
    406  if (!socket_)
    407    return 0;
    408  return socket_->buffered_amount_low_threshold(dcsctp::StreamID(sid));
    409 }
    410 
    411 void DcSctpTransport::SetBufferedAmountLowThreshold(int sid, size_t bytes) {
    412  if (!socket_)
    413    return;
    414  socket_->SetBufferedAmountLowThreshold(dcsctp::StreamID(sid), bytes);
    415 }
    416 
    417 void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) {
    418  debug_name_ = debug_name;
    419 }
    420 
    421 SendPacketStatus DcSctpTransport::SendPacketWithStatus(
    422    ArrayView<const uint8_t> data) {
    423  RTC_DCHECK_RUN_ON(network_thread_);
    424  RTC_DCHECK(socket_);
    425 
    426  if (data.size() > (socket_->options().mtu)) {
    427    RTC_LOG(LS_ERROR) << debug_name_
    428                      << "->SendPacket(...): "
    429                         "SCTP seems to have made a packet that is bigger "
    430                         "than its official MTU: "
    431                      << data.size() << " vs max of " << socket_->options().mtu;
    432    return SendPacketStatus::kError;
    433  }
    434  TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket");
    435 
    436  if (!transport_ || !transport_->writable())
    437    return SendPacketStatus::kError;
    438 
    439  RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size()
    440                       << ")";
    441 
    442  auto result =
    443      transport_->SendPacket(reinterpret_cast<const char*>(data.data()),
    444                             data.size(), AsyncSocketPacketOptions(), 0);
    445 
    446  if (result < 0) {
    447    RTC_LOG(LS_WARNING) << debug_name_ << "->SendPacket(length=" << data.size()
    448                        << ") failed with error: " << transport_->GetError()
    449                        << ".";
    450 
    451    if (IsBlockingError(transport_->GetError())) {
    452      return SendPacketStatus::kTemporaryFailure;
    453    }
    454    return SendPacketStatus::kError;
    455  }
    456  return SendPacketStatus::kSuccess;
    457 }
    458 
    459 std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout(
    460    TaskQueueBase::DelayPrecision precision) {
    461  return task_queue_timeout_factory_.CreateTimeout(precision);
    462 }
    463 
    464 dcsctp::TimeMs DcSctpTransport::TimeMillis() {
    465  return dcsctp::TimeMs(env_.clock().TimeInMilliseconds());
    466 }
    467 
    468 uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
    469  return random_.Rand(low, high);
    470 }
    471 
    472 void DcSctpTransport::OnTotalBufferedAmountLow() {
    473  RTC_DCHECK_RUN_ON(network_thread_);
    474  if (!ready_to_send_data_) {
    475    ready_to_send_data_ = true;
    476    if (data_channel_sink_) {
    477      data_channel_sink_->OnReadyToSend();
    478    }
    479  }
    480 }
    481 
    482 void DcSctpTransport::OnBufferedAmountLow(dcsctp::StreamID stream_id) {
    483  RTC_DCHECK_RUN_ON(network_thread_);
    484  if (data_channel_sink_) {
    485    data_channel_sink_->OnBufferedAmountLow(*stream_id);
    486  }
    487 }
    488 
    489 void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
    490  RTC_DCHECK_RUN_ON(network_thread_);
    491  RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid="
    492                       << message.stream_id().value()
    493                       << ", ppid=" << message.ppid().value()
    494                       << ", length=" << message.payload().size() << ").";
    495  auto type = ToDataMessageType(message.ppid());
    496  if (!type.has_value()) {
    497    RTC_LOG(LS_VERBOSE) << debug_name_
    498                        << "->OnMessageReceived(): Received an unknown PPID "
    499                        << message.ppid().value()
    500                        << " on an SCTP packet. Dropping.";
    501    return;
    502  }
    503  receive_buffer_.Clear();
    504  if (!IsEmptyPPID(message.ppid()))
    505    receive_buffer_.AppendData(message.payload().data(),
    506                               message.payload().size());
    507 
    508  if (data_channel_sink_) {
    509    data_channel_sink_->OnDataReceived(message.stream_id().value(), *type,
    510                                       receive_buffer_);
    511  }
    512 }
    513 
    514 void DcSctpTransport::OnError(dcsctp::ErrorKind error,
    515                              absl::string_view message) {
    516  if (error == dcsctp::ErrorKind::kResourceExhaustion) {
    517    // Indicates that a message failed to be enqueued, because the send buffer
    518    // is full, which is a very common (and wanted) state for high throughput
    519    // sending/benchmarks.
    520    RTC_LOG(LS_VERBOSE) << debug_name_
    521                        << "->OnError(error=" << dcsctp::ToString(error)
    522                        << ", message=" << message << ").";
    523  } else {
    524    RTC_LOG(LS_ERROR) << debug_name_
    525                      << "->OnError(error=" << dcsctp::ToString(error)
    526                      << ", message=" << message << ").";
    527  }
    528 }
    529 
    530 void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
    531                                absl::string_view message) {
    532  RTC_DCHECK_RUN_ON(network_thread_);
    533  RTC_LOG(LS_ERROR) << debug_name_
    534                    << "->OnAborted(error=" << dcsctp::ToString(error)
    535                    << ", message=" << message << ").";
    536  ready_to_send_data_ = false;
    537  RTCError rtc_error(RTCErrorType::OPERATION_ERROR_WITH_DATA,
    538                     std::string(message));
    539  rtc_error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
    540  auto code = ToErrorCauseCode(error);
    541  if (code.has_value()) {
    542    rtc_error.set_sctp_cause_code(static_cast<uint16_t>(*code));
    543  }
    544  if (data_channel_sink_) {
    545    data_channel_sink_->OnTransportClosed(rtc_error);
    546  }
    547 }
    548 
    549 void DcSctpTransport::OnConnected() {
    550  RTC_DCHECK_RUN_ON(network_thread_);
    551  RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected().";
    552  ready_to_send_data_ = true;
    553  if (data_channel_sink_) {
    554    data_channel_sink_->OnReadyToSend();
    555  }
    556  if (on_connected_callback_) {
    557    on_connected_callback_();
    558  }
    559 }
    560 
    561 void DcSctpTransport::OnClosed() {
    562  RTC_DCHECK_RUN_ON(network_thread_);
    563  RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed().";
    564  ready_to_send_data_ = false;
    565 }
    566 
    567 void DcSctpTransport::OnConnectionRestarted() {
    568  RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
    569 }
    570 
    571 void DcSctpTransport::OnStreamsResetFailed(
    572    ArrayView<const dcsctp::StreamID> outgoing_streams,
    573    absl::string_view reason) {
    574  // TODO(orphis): Need a test to check for correct behavior
    575  for (auto& stream_id : outgoing_streams) {
    576    RTC_LOG(LS_WARNING)
    577        << debug_name_
    578        << "->OnStreamsResetFailed(...): Outgoing stream reset failed"
    579        << ", sid=" << stream_id.value() << ", reason: " << reason << ".";
    580  }
    581 }
    582 
    583 void DcSctpTransport::OnStreamsResetPerformed(
    584    ArrayView<const dcsctp::StreamID> outgoing_streams) {
    585  RTC_DCHECK_RUN_ON(network_thread_);
    586  for (auto& stream_id : outgoing_streams) {
    587    RTC_LOG(LS_INFO) << debug_name_
    588                     << "->OnStreamsResetPerformed(...): Outgoing stream reset"
    589                     << ", sid=" << stream_id.value();
    590 
    591    auto it = stream_states_.find(stream_id);
    592    if (it == stream_states_.end()) {
    593      // Ignoring an outgoing stream reset for a closed stream
    594      return;
    595    }
    596 
    597    StreamState& stream_state = it->second;
    598    stream_state.outgoing_reset_done = true;
    599 
    600    if (stream_state.incoming_reset_done) {
    601      //  When the close was not initiated locally, we can signal the end of the
    602      //  data channel close procedure when the remote ACKs the reset.
    603      if (data_channel_sink_) {
    604        data_channel_sink_->OnChannelClosed(stream_id.value());
    605      }
    606      stream_states_.erase(stream_id);
    607    }
    608  }
    609 }
    610 
    611 void DcSctpTransport::OnIncomingStreamsReset(
    612    ArrayView<const dcsctp::StreamID> incoming_streams) {
    613  RTC_DCHECK_RUN_ON(network_thread_);
    614  for (auto& stream_id : incoming_streams) {
    615    RTC_LOG(LS_INFO) << debug_name_
    616                     << "->OnIncomingStreamsReset(...): Incoming stream reset"
    617                     << ", sid=" << stream_id.value();
    618 
    619    auto it = stream_states_.find(stream_id);
    620    if (it == stream_states_.end())
    621      return;
    622 
    623    StreamState& stream_state = it->second;
    624    stream_state.incoming_reset_done = true;
    625 
    626    if (!stream_state.closure_initiated) {
    627      // When receiving an incoming stream reset event for a non local close
    628      // procedure, the transport needs to reset the stream in the other
    629      // direction too.
    630      dcsctp::StreamID streams[1] = {stream_id};
    631      socket_->ResetStreams(streams);
    632      if (data_channel_sink_) {
    633        data_channel_sink_->OnChannelClosing(stream_id.value());
    634      }
    635    }
    636 
    637    if (stream_state.outgoing_reset_done) {
    638      // The close procedure that was initiated locally is complete when we
    639      // receive and incoming reset event.
    640      if (data_channel_sink_) {
    641        data_channel_sink_->OnChannelClosed(stream_id.value());
    642      }
    643      stream_states_.erase(stream_id);
    644    }
    645  }
    646 }
    647 
    648 void DcSctpTransport::ConnectTransportSignals() {
    649  RTC_DCHECK_RUN_ON(network_thread_);
    650  if (!transport_) {
    651    return;
    652  }
    653  transport_->SubscribeWritableState(
    654      this, [this](PacketTransportInternal* transport) {
    655        OnTransportWritableState(transport);
    656      });
    657 
    658  transport_->RegisterReceivedPacketCallback(
    659      this,
    660      [&](PacketTransportInternal* transport, const ReceivedIpPacket& packet) {
    661        OnTransportReadPacket(transport, packet);
    662      });
    663  transport_->SetOnCloseCallback([this]() {
    664    RTC_DCHECK_RUN_ON(network_thread_);
    665    RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
    666    if (data_channel_sink_) {
    667      data_channel_sink_->OnTransportClosed({});
    668    }
    669  });
    670  transport_->SubscribeDtlsTransportState(
    671      this, [this](DtlsTransportInternal* transport, DtlsTransportState state) {
    672        OnDtlsTransportState(transport, state);
    673      });
    674 }
    675 
    676 void DcSctpTransport::DisconnectTransportSignals() {
    677  RTC_DCHECK_RUN_ON(network_thread_);
    678  if (!transport_) {
    679    return;
    680  }
    681  transport_->UnsubscribeWritableState(this);
    682  transport_->DeregisterReceivedPacketCallback(this);
    683  transport_->SetOnCloseCallback(nullptr);
    684  transport_->UnsubscribeDtlsTransportState(this);
    685 }
    686 
    687 void DcSctpTransport::OnTransportWritableState(
    688    PacketTransportInternal* transport) {
    689  RTC_DCHECK_RUN_ON(network_thread_);
    690  RTC_DCHECK_EQ(transport_, transport);
    691  RTC_DLOG(LS_VERBOSE) << debug_name_
    692                       << "->OnTransportWritableState(), writable="
    693                       << transport->writable() << " socket: "
    694                       << (socket_ ? std::to_string(
    695                                         static_cast<int>(socket_->state()))
    696                                   : "UNSET");
    697  MaybeConnectSocket();
    698 }
    699 
    700 void DcSctpTransport::OnDtlsTransportState(DtlsTransportInternal* transport,
    701                                           DtlsTransportState state) {
    702  if (state == DtlsTransportState::kNew && socket_) {
    703    // IF DTLS restart (DtlsTransportState::kNew)
    704    // THEN
    705    //   reset the socket so that we send an SCTP init
    706    //   before any outgoing messages. This is needed
    707    //   after DTLS fingerprint changed since peer will discard
    708    //   messages with crypto derived from old fingerprint.
    709    //   The socket will be restarted (with changed parameters)
    710    //   later.
    711    RTC_DLOG(LS_INFO) << debug_name_ << " DTLS restart";
    712    socket_.reset();
    713  }
    714 }
    715 
    716 void DcSctpTransport::OnTransportReadPacket(
    717    PacketTransportInternal* /* transport */,
    718    const ReceivedIpPacket& packet) {
    719  RTC_DCHECK_RUN_ON(network_thread_);
    720  if (packet.decryption_info() != ReceivedIpPacket::kDtlsDecrypted) {
    721    // We are only interested in SCTP packets.
    722    return;
    723  }
    724 
    725  RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportReadPacket(), length="
    726                       << packet.payload().size();
    727  if (socket_) {
    728    socket_->ReceivePacket(packet.payload());
    729  }
    730 }
    731 
    732 void DcSctpTransport::MaybeConnectSocket() {
    733  RTC_DLOG(LS_VERBOSE)
    734      << debug_name_ << "->MaybeConnectSocket(), writable="
    735      << (transport_ ? std::to_string(transport_->writable()) : "UNSET")
    736      << " socket: "
    737      << (socket_ ? std::to_string(static_cast<int>(socket_->state()))
    738                  : "UNSET");
    739  if (transport_ && transport_->writable() && socket_ &&
    740      socket_->state() == dcsctp::SocketState::kClosed) {
    741    socket_->Connect();
    742  }
    743 }
    744 
    745 dcsctp::DcSctpOptions DcSctpTransport::CreateDcSctpOptions(
    746    const SctpOptions& options,
    747    const FieldTrialsView& field_trials) {
    748  dcsctp::DcSctpOptions dcsctp_options;
    749  dcsctp_options.local_port = options.local_port;
    750  dcsctp_options.remote_port = options.remote_port;
    751  dcsctp_options.max_message_size = options.max_message_size;
    752  dcsctp_options.max_timer_backoff_duration = kMaxTimerBackoffDuration;
    753  // Don't close the connection automatically on too many retransmissions.
    754  dcsctp_options.max_retransmissions = std::nullopt;
    755  dcsctp_options.max_init_retransmits = std::nullopt;
    756  dcsctp_options.per_stream_send_queue_limit =
    757      DataChannelInterface::MaxSendQueueSize();
    758  // This is just set to avoid denial-of-service. Practically unlimited.
    759  dcsctp_options.max_send_buffer_size = std::numeric_limits<size_t>::max();
    760  dcsctp_options.enable_message_interleaving =
    761      field_trials.IsEnabled("WebRTC-DataChannelMessageInterleaving");
    762 
    763  return dcsctp_options;
    764 }
    765 
    766 }  // namespace webrtc