dcsctp_transport.cc (27295B)
1 /* 2 * Copyright 2021 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "media/sctp/dcsctp_transport.h" 12 13 #include <atomic> 14 #include <cstddef> 15 #include <cstdint> 16 #include <functional> 17 #include <limits> 18 #include <memory> 19 #include <optional> 20 #include <string> 21 #include <utility> 22 #include <vector> 23 24 #include "absl/strings/string_view.h" 25 #include "api/array_view.h" 26 #include "api/data_channel_interface.h" 27 #include "api/dtls_transport_interface.h" 28 #include "api/environment/environment.h" 29 #include "api/field_trials_view.h" 30 #include "api/priority.h" 31 #include "api/rtc_error.h" 32 #include "api/sctp_transport_interface.h" 33 #include "api/sequence_checker.h" 34 #include "api/task_queue/task_queue_base.h" 35 #include "api/transport/data_channel_transport_interface.h" 36 #include "media/sctp/sctp_transport_internal.h" 37 #include "net/dcsctp/public/dcsctp_message.h" 38 #include "net/dcsctp/public/dcsctp_options.h" 39 #include "net/dcsctp/public/dcsctp_socket.h" 40 #include "net/dcsctp/public/dcsctp_socket_factory.h" 41 #include "net/dcsctp/public/packet_observer.h" 42 #include "net/dcsctp/public/text_pcap_packet_observer.h" 43 #include "net/dcsctp/public/timeout.h" 44 #include "net/dcsctp/public/types.h" 45 #include "p2p/base/packet_transport_internal.h" 46 #include "p2p/dtls/dtls_transport_internal.h" 47 #include "rtc_base/async_packet_socket.h" 48 #include "rtc_base/checks.h" 49 #include "rtc_base/copy_on_write_buffer.h" 50 #include "rtc_base/logging.h" 51 #include "rtc_base/network/received_packet.h" 52 #include "rtc_base/socket.h" 53 #include "rtc_base/strings/string_builder.h" 54 #include "rtc_base/thread.h" 55 #include "rtc_base/trace_event.h" 56 #include "system_wrappers/include/clock.h" 57 58 namespace webrtc { 59 60 namespace { 61 using ::dcsctp::SendPacketStatus; 62 63 // When there is packet loss for a long time, the SCTP retry timers will use 64 // exponential backoff, which can grow to very long durations and when the 65 // connection recovers, it may take a long time to reach the new backoff 66 // duration. By limiting it to a reasonable limit, the time to recover reduces. 67 constexpr dcsctp::DurationMs kMaxTimerBackoffDuration = 68 dcsctp::DurationMs(3000); 69 70 enum class WebrtcPPID : dcsctp::PPID::UnderlyingType { 71 // https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1 72 kDCEP = 50, 73 // https://www.rfc-editor.org/rfc/rfc8831.html#section-8 74 kString = 51, 75 kBinaryPartial = 52, // Deprecated 76 kBinary = 53, 77 kStringPartial = 54, // Deprecated 78 kStringEmpty = 56, 79 kBinaryEmpty = 57, 80 }; 81 82 WebrtcPPID ToPPID(DataMessageType message_type, size_t size) { 83 switch (message_type) { 84 case DataMessageType::kControl: 85 return WebrtcPPID::kDCEP; 86 case DataMessageType::kText: 87 return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty; 88 case DataMessageType::kBinary: 89 return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty; 90 } 91 } 92 93 std::optional<DataMessageType> ToDataMessageType(dcsctp::PPID ppid) { 94 switch (static_cast<WebrtcPPID>(ppid.value())) { 95 case WebrtcPPID::kDCEP: 96 return DataMessageType::kControl; 97 case WebrtcPPID::kString: 98 case WebrtcPPID::kStringPartial: 99 case WebrtcPPID::kStringEmpty: 100 return DataMessageType::kText; 101 case WebrtcPPID::kBinary: 102 case WebrtcPPID::kBinaryPartial: 103 case WebrtcPPID::kBinaryEmpty: 104 return DataMessageType::kBinary; 105 } 106 return std::nullopt; 107 } 108 109 std::optional<SctpErrorCauseCode> ToErrorCauseCode(dcsctp::ErrorKind error) { 110 switch (error) { 111 case dcsctp::ErrorKind::kParseFailed: 112 return SctpErrorCauseCode::kUnrecognizedParameters; 113 case dcsctp::ErrorKind::kPeerReported: 114 return SctpErrorCauseCode::kUserInitiatedAbort; 115 case dcsctp::ErrorKind::kWrongSequence: 116 case dcsctp::ErrorKind::kProtocolViolation: 117 return SctpErrorCauseCode::kProtocolViolation; 118 case dcsctp::ErrorKind::kResourceExhaustion: 119 return SctpErrorCauseCode::kOutOfResource; 120 case dcsctp::ErrorKind::kTooManyRetries: 121 case dcsctp::ErrorKind::kUnsupportedOperation: 122 case dcsctp::ErrorKind::kNoError: 123 case dcsctp::ErrorKind::kNotConnected: 124 // No SCTP error cause code matches those 125 break; 126 } 127 return std::nullopt; 128 } 129 130 bool IsEmptyPPID(dcsctp::PPID ppid) { 131 WebrtcPPID webrtc_ppid = static_cast<WebrtcPPID>(ppid.value()); 132 return webrtc_ppid == WebrtcPPID::kStringEmpty || 133 webrtc_ppid == WebrtcPPID::kBinaryEmpty; 134 } 135 } // namespace 136 137 DcSctpTransport::DcSctpTransport(const Environment& env, 138 Thread* network_thread, 139 DtlsTransportInternal* transport) 140 : DcSctpTransport(env, 141 network_thread, 142 transport, 143 std::make_unique<dcsctp::DcSctpSocketFactory>()) {} 144 145 DcSctpTransport::DcSctpTransport( 146 const Environment& env, 147 Thread* network_thread, 148 DtlsTransportInternal* transport, 149 std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory) 150 : network_thread_(network_thread), 151 transport_(transport), 152 env_(env), 153 random_(env_.clock().TimeInMicroseconds()), 154 socket_factory_(std::move(socket_factory)), 155 task_queue_timeout_factory_( 156 *network_thread, 157 [this]() { return TimeMillis(); }, 158 [this](dcsctp::TimeoutID timeout_id) { 159 socket_->HandleTimeout(timeout_id); 160 }) { 161 RTC_DCHECK_RUN_ON(network_thread_); 162 static std::atomic<int> instance_count = 0; 163 StringBuilder sb; 164 sb << debug_name_ << instance_count++; 165 debug_name_ = sb.Release(); 166 ConnectTransportSignals(); 167 } 168 169 DcSctpTransport::~DcSctpTransport() { 170 if (socket_) { 171 socket_->Close(); 172 } 173 } 174 175 void DcSctpTransport::SetOnConnectedCallback(std::function<void()> callback) { 176 RTC_DCHECK_RUN_ON(network_thread_); 177 on_connected_callback_ = std::move(callback); 178 } 179 180 void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) { 181 RTC_DCHECK_RUN_ON(network_thread_); 182 data_channel_sink_ = sink; 183 if (data_channel_sink_ && ready_to_send_data_) { 184 data_channel_sink_->OnReadyToSend(); 185 } 186 } 187 188 void DcSctpTransport::SetDtlsTransport(DtlsTransportInternal* transport) { 189 RTC_DCHECK_RUN_ON(network_thread_); 190 DisconnectTransportSignals(); 191 transport_ = transport; 192 ConnectTransportSignals(); 193 MaybeConnectSocket(); 194 } 195 196 bool DcSctpTransport::Start(const SctpOptions& options) { 197 RTC_DCHECK_RUN_ON(network_thread_); 198 RTC_DCHECK(options.max_message_size > 0); 199 RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << options.local_port 200 << ", remote=" << options.remote_port 201 << ", max_message_size=" << options.max_message_size << ")"; 202 203 if (!socket_) { 204 dcsctp::DcSctpOptions dcsctp_options = 205 CreateDcSctpOptions(options, env_.field_trials()); 206 std::unique_ptr<dcsctp::PacketObserver> packet_observer; 207 if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) { 208 packet_observer = 209 std::make_unique<dcsctp::TextPcapPacketObserver>(debug_name_); 210 } 211 212 socket_ = socket_factory_->Create( 213 debug_name_, *this, std::move(packet_observer), dcsctp_options); 214 } else { 215 if (options.local_port != socket_->options().local_port || 216 options.remote_port != socket_->options().remote_port) { 217 RTC_LOG(LS_ERROR) 218 << debug_name_ << "->Start(local=" << options.local_port 219 << ", remote=" << options.remote_port 220 << "): Can't change ports on already started transport."; 221 return false; 222 } 223 socket_->SetMaxMessageSize(options.max_message_size); 224 } 225 226 MaybeConnectSocket(); 227 228 for (const auto& [sid, stream_state] : stream_states_) { 229 socket_->SetStreamPriority(sid, stream_state.priority); 230 } 231 232 return true; 233 } 234 235 bool DcSctpTransport::OpenStream(int sid, PriorityValue priority) { 236 RTC_DCHECK_RUN_ON(network_thread_); 237 RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ", " 238 << priority.value() << ")."; 239 240 StreamState stream_state; 241 stream_state.priority = dcsctp::StreamPriority(priority.value()); 242 stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)), 243 stream_state); 244 if (socket_) { 245 socket_->SetStreamPriority(dcsctp::StreamID(sid), 246 dcsctp::StreamPriority(priority.value())); 247 } 248 249 return true; 250 } 251 252 bool DcSctpTransport::ResetStream(int sid) { 253 RTC_DCHECK_RUN_ON(network_thread_); 254 RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; 255 if (!socket_) { 256 RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid 257 << "): Transport is not started."; 258 return false; 259 } 260 261 dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))}; 262 263 auto it = stream_states_.find(streams[0]); 264 if (it == stream_states_.end()) { 265 RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid 266 << "): Stream is not open."; 267 return false; 268 } 269 270 StreamState& stream_state = it->second; 271 if (stream_state.closure_initiated || stream_state.incoming_reset_done || 272 stream_state.outgoing_reset_done) { 273 // The closing procedure was already initiated by the remote, don't do 274 // anything. 275 return false; 276 } 277 stream_state.closure_initiated = true; 278 socket_->ResetStreams(streams); 279 return true; 280 } 281 282 RTCError DcSctpTransport::SendData(int sid, 283 const SendDataParams& params, 284 const CopyOnWriteBuffer& payload) { 285 RTC_DCHECK_RUN_ON(network_thread_); 286 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid 287 << ", type=" << static_cast<int>(params.type) 288 << ", length=" << payload.size() << ")."; 289 290 if (!socket_) { 291 RTC_LOG(LS_ERROR) << debug_name_ 292 << "->SendData(...): Transport is not started."; 293 return RTCError(RTCErrorType::INVALID_STATE); 294 } 295 296 // It is possible for a message to be sent from the signaling thread at the 297 // same time a data-channel is closing, but before the signaling thread is 298 // aware of it. So we need to keep track of currently active data channels and 299 // skip sending messages for the ones that are not open or closing. 300 // The sending errors are not impacting the data channel API contract as 301 // it is allowed to discard queued messages when the channel is closing. 302 auto stream_state = 303 stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid))); 304 if (stream_state == stream_states_.end()) { 305 RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: " 306 << sid; 307 return RTCError(RTCErrorType::INVALID_STATE); 308 } 309 310 if (stream_state->second.closure_initiated || 311 stream_state->second.incoming_reset_done || 312 stream_state->second.outgoing_reset_done) { 313 RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: " 314 << sid; 315 return RTCError(RTCErrorType::INVALID_STATE); 316 } 317 318 auto max_message_size = socket_->options().max_message_size; 319 if (max_message_size > 0 && payload.size() > max_message_size) { 320 RTC_LOG(LS_WARNING) << debug_name_ 321 << "->SendData(...): " 322 "Trying to send packet bigger " 323 "than the max message size: " 324 << payload.size() << " vs max of " << max_message_size; 325 return RTCError(RTCErrorType::INVALID_RANGE); 326 } 327 328 std::vector<uint8_t> message_payload(payload.cdata(), 329 payload.cdata() + payload.size()); 330 if (message_payload.empty()) { 331 // https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6 332 // SCTP does not support the sending of empty user messages. Therefore, if 333 // an empty message has to be sent, the appropriate PPID (WebRTC String 334 // Empty or WebRTC Binary Empty) is used, and the SCTP user message of one 335 // zero byte is sent. 336 message_payload.push_back('\0'); 337 } 338 339 dcsctp::DcSctpMessage message( 340 dcsctp::StreamID(static_cast<uint16_t>(sid)), 341 dcsctp::PPID(static_cast<uint16_t>(ToPPID(params.type, payload.size()))), 342 std::move(message_payload)); 343 344 dcsctp::SendOptions send_options; 345 send_options.unordered = dcsctp::IsUnordered(!params.ordered); 346 if (params.max_rtx_ms.has_value()) { 347 RTC_DCHECK(*params.max_rtx_ms >= 0 && 348 *params.max_rtx_ms <= std::numeric_limits<uint16_t>::max()); 349 send_options.lifetime = dcsctp::DurationMs(*params.max_rtx_ms); 350 } 351 if (params.max_rtx_count.has_value()) { 352 RTC_DCHECK(*params.max_rtx_count >= 0 && 353 *params.max_rtx_count <= std::numeric_limits<uint16_t>::max()); 354 send_options.max_retransmissions = *params.max_rtx_count; 355 } 356 357 dcsctp::SendStatus error = socket_->Send(std::move(message), send_options); 358 switch (error) { 359 case dcsctp::SendStatus::kSuccess: 360 return RTCError::OK(); 361 case dcsctp::SendStatus::kErrorResourceExhaustion: 362 ready_to_send_data_ = false; 363 return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); 364 default: 365 absl::string_view error_message = dcsctp::ToString(error); 366 RTC_LOG(LS_ERROR) << debug_name_ 367 << "->SendData(...): send() failed with error " 368 << error_message << "."; 369 return RTCError(RTCErrorType::NETWORK_ERROR, error_message); 370 } 371 } 372 373 bool DcSctpTransport::ReadyToSendData() { 374 RTC_DCHECK_RUN_ON(network_thread_); 375 return ready_to_send_data_; 376 } 377 378 int DcSctpTransport::max_message_size() const { 379 if (!socket_) { 380 RTC_LOG(LS_ERROR) << debug_name_ 381 << "->max_message_size(...): Transport is not started."; 382 return 0; 383 } 384 return socket_->options().max_message_size; 385 } 386 387 std::optional<int> DcSctpTransport::max_outbound_streams() const { 388 if (!socket_) 389 return std::nullopt; 390 return socket_->options().announced_maximum_outgoing_streams; 391 } 392 393 std::optional<int> DcSctpTransport::max_inbound_streams() const { 394 if (!socket_) 395 return std::nullopt; 396 return socket_->options().announced_maximum_incoming_streams; 397 } 398 399 size_t DcSctpTransport::buffered_amount(int sid) const { 400 if (!socket_) 401 return 0; 402 return socket_->buffered_amount(dcsctp::StreamID(sid)); 403 } 404 405 size_t DcSctpTransport::buffered_amount_low_threshold(int sid) const { 406 if (!socket_) 407 return 0; 408 return socket_->buffered_amount_low_threshold(dcsctp::StreamID(sid)); 409 } 410 411 void DcSctpTransport::SetBufferedAmountLowThreshold(int sid, size_t bytes) { 412 if (!socket_) 413 return; 414 socket_->SetBufferedAmountLowThreshold(dcsctp::StreamID(sid), bytes); 415 } 416 417 void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) { 418 debug_name_ = debug_name; 419 } 420 421 SendPacketStatus DcSctpTransport::SendPacketWithStatus( 422 ArrayView<const uint8_t> data) { 423 RTC_DCHECK_RUN_ON(network_thread_); 424 RTC_DCHECK(socket_); 425 426 if (data.size() > (socket_->options().mtu)) { 427 RTC_LOG(LS_ERROR) << debug_name_ 428 << "->SendPacket(...): " 429 "SCTP seems to have made a packet that is bigger " 430 "than its official MTU: " 431 << data.size() << " vs max of " << socket_->options().mtu; 432 return SendPacketStatus::kError; 433 } 434 TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket"); 435 436 if (!transport_ || !transport_->writable()) 437 return SendPacketStatus::kError; 438 439 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size() 440 << ")"; 441 442 auto result = 443 transport_->SendPacket(reinterpret_cast<const char*>(data.data()), 444 data.size(), AsyncSocketPacketOptions(), 0); 445 446 if (result < 0) { 447 RTC_LOG(LS_WARNING) << debug_name_ << "->SendPacket(length=" << data.size() 448 << ") failed with error: " << transport_->GetError() 449 << "."; 450 451 if (IsBlockingError(transport_->GetError())) { 452 return SendPacketStatus::kTemporaryFailure; 453 } 454 return SendPacketStatus::kError; 455 } 456 return SendPacketStatus::kSuccess; 457 } 458 459 std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout( 460 TaskQueueBase::DelayPrecision precision) { 461 return task_queue_timeout_factory_.CreateTimeout(precision); 462 } 463 464 dcsctp::TimeMs DcSctpTransport::TimeMillis() { 465 return dcsctp::TimeMs(env_.clock().TimeInMilliseconds()); 466 } 467 468 uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) { 469 return random_.Rand(low, high); 470 } 471 472 void DcSctpTransport::OnTotalBufferedAmountLow() { 473 RTC_DCHECK_RUN_ON(network_thread_); 474 if (!ready_to_send_data_) { 475 ready_to_send_data_ = true; 476 if (data_channel_sink_) { 477 data_channel_sink_->OnReadyToSend(); 478 } 479 } 480 } 481 482 void DcSctpTransport::OnBufferedAmountLow(dcsctp::StreamID stream_id) { 483 RTC_DCHECK_RUN_ON(network_thread_); 484 if (data_channel_sink_) { 485 data_channel_sink_->OnBufferedAmountLow(*stream_id); 486 } 487 } 488 489 void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) { 490 RTC_DCHECK_RUN_ON(network_thread_); 491 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid=" 492 << message.stream_id().value() 493 << ", ppid=" << message.ppid().value() 494 << ", length=" << message.payload().size() << ")."; 495 auto type = ToDataMessageType(message.ppid()); 496 if (!type.has_value()) { 497 RTC_LOG(LS_VERBOSE) << debug_name_ 498 << "->OnMessageReceived(): Received an unknown PPID " 499 << message.ppid().value() 500 << " on an SCTP packet. Dropping."; 501 return; 502 } 503 receive_buffer_.Clear(); 504 if (!IsEmptyPPID(message.ppid())) 505 receive_buffer_.AppendData(message.payload().data(), 506 message.payload().size()); 507 508 if (data_channel_sink_) { 509 data_channel_sink_->OnDataReceived(message.stream_id().value(), *type, 510 receive_buffer_); 511 } 512 } 513 514 void DcSctpTransport::OnError(dcsctp::ErrorKind error, 515 absl::string_view message) { 516 if (error == dcsctp::ErrorKind::kResourceExhaustion) { 517 // Indicates that a message failed to be enqueued, because the send buffer 518 // is full, which is a very common (and wanted) state for high throughput 519 // sending/benchmarks. 520 RTC_LOG(LS_VERBOSE) << debug_name_ 521 << "->OnError(error=" << dcsctp::ToString(error) 522 << ", message=" << message << ")."; 523 } else { 524 RTC_LOG(LS_ERROR) << debug_name_ 525 << "->OnError(error=" << dcsctp::ToString(error) 526 << ", message=" << message << ")."; 527 } 528 } 529 530 void DcSctpTransport::OnAborted(dcsctp::ErrorKind error, 531 absl::string_view message) { 532 RTC_DCHECK_RUN_ON(network_thread_); 533 RTC_LOG(LS_ERROR) << debug_name_ 534 << "->OnAborted(error=" << dcsctp::ToString(error) 535 << ", message=" << message << ")."; 536 ready_to_send_data_ = false; 537 RTCError rtc_error(RTCErrorType::OPERATION_ERROR_WITH_DATA, 538 std::string(message)); 539 rtc_error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); 540 auto code = ToErrorCauseCode(error); 541 if (code.has_value()) { 542 rtc_error.set_sctp_cause_code(static_cast<uint16_t>(*code)); 543 } 544 if (data_channel_sink_) { 545 data_channel_sink_->OnTransportClosed(rtc_error); 546 } 547 } 548 549 void DcSctpTransport::OnConnected() { 550 RTC_DCHECK_RUN_ON(network_thread_); 551 RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected()."; 552 ready_to_send_data_ = true; 553 if (data_channel_sink_) { 554 data_channel_sink_->OnReadyToSend(); 555 } 556 if (on_connected_callback_) { 557 on_connected_callback_(); 558 } 559 } 560 561 void DcSctpTransport::OnClosed() { 562 RTC_DCHECK_RUN_ON(network_thread_); 563 RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed()."; 564 ready_to_send_data_ = false; 565 } 566 567 void DcSctpTransport::OnConnectionRestarted() { 568 RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted()."; 569 } 570 571 void DcSctpTransport::OnStreamsResetFailed( 572 ArrayView<const dcsctp::StreamID> outgoing_streams, 573 absl::string_view reason) { 574 // TODO(orphis): Need a test to check for correct behavior 575 for (auto& stream_id : outgoing_streams) { 576 RTC_LOG(LS_WARNING) 577 << debug_name_ 578 << "->OnStreamsResetFailed(...): Outgoing stream reset failed" 579 << ", sid=" << stream_id.value() << ", reason: " << reason << "."; 580 } 581 } 582 583 void DcSctpTransport::OnStreamsResetPerformed( 584 ArrayView<const dcsctp::StreamID> outgoing_streams) { 585 RTC_DCHECK_RUN_ON(network_thread_); 586 for (auto& stream_id : outgoing_streams) { 587 RTC_LOG(LS_INFO) << debug_name_ 588 << "->OnStreamsResetPerformed(...): Outgoing stream reset" 589 << ", sid=" << stream_id.value(); 590 591 auto it = stream_states_.find(stream_id); 592 if (it == stream_states_.end()) { 593 // Ignoring an outgoing stream reset for a closed stream 594 return; 595 } 596 597 StreamState& stream_state = it->second; 598 stream_state.outgoing_reset_done = true; 599 600 if (stream_state.incoming_reset_done) { 601 // When the close was not initiated locally, we can signal the end of the 602 // data channel close procedure when the remote ACKs the reset. 603 if (data_channel_sink_) { 604 data_channel_sink_->OnChannelClosed(stream_id.value()); 605 } 606 stream_states_.erase(stream_id); 607 } 608 } 609 } 610 611 void DcSctpTransport::OnIncomingStreamsReset( 612 ArrayView<const dcsctp::StreamID> incoming_streams) { 613 RTC_DCHECK_RUN_ON(network_thread_); 614 for (auto& stream_id : incoming_streams) { 615 RTC_LOG(LS_INFO) << debug_name_ 616 << "->OnIncomingStreamsReset(...): Incoming stream reset" 617 << ", sid=" << stream_id.value(); 618 619 auto it = stream_states_.find(stream_id); 620 if (it == stream_states_.end()) 621 return; 622 623 StreamState& stream_state = it->second; 624 stream_state.incoming_reset_done = true; 625 626 if (!stream_state.closure_initiated) { 627 // When receiving an incoming stream reset event for a non local close 628 // procedure, the transport needs to reset the stream in the other 629 // direction too. 630 dcsctp::StreamID streams[1] = {stream_id}; 631 socket_->ResetStreams(streams); 632 if (data_channel_sink_) { 633 data_channel_sink_->OnChannelClosing(stream_id.value()); 634 } 635 } 636 637 if (stream_state.outgoing_reset_done) { 638 // The close procedure that was initiated locally is complete when we 639 // receive and incoming reset event. 640 if (data_channel_sink_) { 641 data_channel_sink_->OnChannelClosed(stream_id.value()); 642 } 643 stream_states_.erase(stream_id); 644 } 645 } 646 } 647 648 void DcSctpTransport::ConnectTransportSignals() { 649 RTC_DCHECK_RUN_ON(network_thread_); 650 if (!transport_) { 651 return; 652 } 653 transport_->SubscribeWritableState( 654 this, [this](PacketTransportInternal* transport) { 655 OnTransportWritableState(transport); 656 }); 657 658 transport_->RegisterReceivedPacketCallback( 659 this, 660 [&](PacketTransportInternal* transport, const ReceivedIpPacket& packet) { 661 OnTransportReadPacket(transport, packet); 662 }); 663 transport_->SetOnCloseCallback([this]() { 664 RTC_DCHECK_RUN_ON(network_thread_); 665 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed()."; 666 if (data_channel_sink_) { 667 data_channel_sink_->OnTransportClosed({}); 668 } 669 }); 670 transport_->SubscribeDtlsTransportState( 671 this, [this](DtlsTransportInternal* transport, DtlsTransportState state) { 672 OnDtlsTransportState(transport, state); 673 }); 674 } 675 676 void DcSctpTransport::DisconnectTransportSignals() { 677 RTC_DCHECK_RUN_ON(network_thread_); 678 if (!transport_) { 679 return; 680 } 681 transport_->UnsubscribeWritableState(this); 682 transport_->DeregisterReceivedPacketCallback(this); 683 transport_->SetOnCloseCallback(nullptr); 684 transport_->UnsubscribeDtlsTransportState(this); 685 } 686 687 void DcSctpTransport::OnTransportWritableState( 688 PacketTransportInternal* transport) { 689 RTC_DCHECK_RUN_ON(network_thread_); 690 RTC_DCHECK_EQ(transport_, transport); 691 RTC_DLOG(LS_VERBOSE) << debug_name_ 692 << "->OnTransportWritableState(), writable=" 693 << transport->writable() << " socket: " 694 << (socket_ ? std::to_string( 695 static_cast<int>(socket_->state())) 696 : "UNSET"); 697 MaybeConnectSocket(); 698 } 699 700 void DcSctpTransport::OnDtlsTransportState(DtlsTransportInternal* transport, 701 DtlsTransportState state) { 702 if (state == DtlsTransportState::kNew && socket_) { 703 // IF DTLS restart (DtlsTransportState::kNew) 704 // THEN 705 // reset the socket so that we send an SCTP init 706 // before any outgoing messages. This is needed 707 // after DTLS fingerprint changed since peer will discard 708 // messages with crypto derived from old fingerprint. 709 // The socket will be restarted (with changed parameters) 710 // later. 711 RTC_DLOG(LS_INFO) << debug_name_ << " DTLS restart"; 712 socket_.reset(); 713 } 714 } 715 716 void DcSctpTransport::OnTransportReadPacket( 717 PacketTransportInternal* /* transport */, 718 const ReceivedIpPacket& packet) { 719 RTC_DCHECK_RUN_ON(network_thread_); 720 if (packet.decryption_info() != ReceivedIpPacket::kDtlsDecrypted) { 721 // We are only interested in SCTP packets. 722 return; 723 } 724 725 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportReadPacket(), length=" 726 << packet.payload().size(); 727 if (socket_) { 728 socket_->ReceivePacket(packet.payload()); 729 } 730 } 731 732 void DcSctpTransport::MaybeConnectSocket() { 733 RTC_DLOG(LS_VERBOSE) 734 << debug_name_ << "->MaybeConnectSocket(), writable=" 735 << (transport_ ? std::to_string(transport_->writable()) : "UNSET") 736 << " socket: " 737 << (socket_ ? std::to_string(static_cast<int>(socket_->state())) 738 : "UNSET"); 739 if (transport_ && transport_->writable() && socket_ && 740 socket_->state() == dcsctp::SocketState::kClosed) { 741 socket_->Connect(); 742 } 743 } 744 745 dcsctp::DcSctpOptions DcSctpTransport::CreateDcSctpOptions( 746 const SctpOptions& options, 747 const FieldTrialsView& field_trials) { 748 dcsctp::DcSctpOptions dcsctp_options; 749 dcsctp_options.local_port = options.local_port; 750 dcsctp_options.remote_port = options.remote_port; 751 dcsctp_options.max_message_size = options.max_message_size; 752 dcsctp_options.max_timer_backoff_duration = kMaxTimerBackoffDuration; 753 // Don't close the connection automatically on too many retransmissions. 754 dcsctp_options.max_retransmissions = std::nullopt; 755 dcsctp_options.max_init_retransmits = std::nullopt; 756 dcsctp_options.per_stream_send_queue_limit = 757 DataChannelInterface::MaxSendQueueSize(); 758 // This is just set to avoid denial-of-service. Practically unlimited. 759 dcsctp_options.max_send_buffer_size = std::numeric_limits<size_t>::max(); 760 dcsctp_options.enable_message_interleaving = 761 field_trials.IsEnabled("WebRTC-DataChannelMessageInterleaving"); 762 763 return dcsctp_options; 764 } 765 766 } // namespace webrtc