DataChannelDcSctp.cpp (16922B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "DataChannelDcSctp.h" 8 #include "mozilla/Components.h" 9 #include "mozilla/RandomNum.h" 10 #include "DataChannelLog.h" 11 #include "transport/runnable_utils.h" 12 13 namespace mozilla { 14 15 DataChannelConnectionDcSctp::DataChannelConnectionDcSctp( 16 DataConnectionListener* aListener, nsISerialEventTarget* aTarget, 17 MediaTransportHandler* aHandler) 18 : DataChannelConnection(aListener, aTarget, aHandler) { 19 // dcsctp does not expose anything related to negotiation of maximum stream 20 // id. 21 mNegotiatedIdLimit = MAX_NUM_STREAMS; 22 } 23 24 void DataChannelConnectionDcSctp::Destroy() { 25 MOZ_ASSERT(NS_IsMainThread()); 26 DC_DEBUG(("%s: %p", __func__, this)); 27 DataChannelConnection::Destroy(); 28 mSTS->Dispatch(NS_NewRunnableFunction( 29 "DataChannelConnectionDcSctp::Destroy", 30 [this, self = RefPtr<DataChannelConnectionDcSctp>(this)]() { 31 if (mDcSctp) { 32 mDcSctp->Close(); 33 // Do we do this now? 34 mDcSctp = nullptr; 35 } 36 })); 37 } 38 39 bool DataChannelConnectionDcSctp::RaiseStreamLimitTo(uint16_t aNewLimit) { 40 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 41 DC_DEBUG(("%s: %p", __func__, this)); 42 // dcsctp does not expose anything related to negotiation of maximum stream 43 // id. It probably just negotiates 65534. Just smile and nod. 44 return true; 45 } 46 47 void DataChannelConnectionDcSctp::OnTransportReady() { 48 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 49 DC_DEBUG(("%s: %p", __func__, this)); 50 if (!mDcSctp) { 51 auto factory = std::make_unique<dcsctp::DcSctpSocketFactory>(); 52 DcSctpOptions options; 53 options.local_port = mLocalPort; 54 options.remote_port = mRemotePort; 55 options.max_message_size = 8 * 1024 * 1024; 56 options.max_timer_backoff_duration = DurationMs(3000); 57 // Don't close the connection automatically on too many retransmissions. 58 options.max_retransmissions = std::nullopt; 59 options.max_init_retransmits = std::nullopt; 60 options.per_stream_send_queue_limit = 1024 * 1024 * 64; 61 // This is just set to avoid denial-of-service. Practically unlimited. 62 options.max_send_buffer_size = 63 std::numeric_limits<decltype(options.max_send_buffer_size)>::max(); 64 options.max_receiver_window_buffer_size = 16 * 1024 * 1024; 65 options.enable_message_interleaving = true; 66 // The default value of 200 leads to extremely poor congestion recovery 67 // when packet loss has occurred. 68 options.delayed_ack_max_timeout = DurationMs(50); 69 70 mDcSctp = 71 factory->Create("DataChannelConnectionDcSctp", *this, nullptr, options); 72 mDcSctp->Connect(); 73 } 74 } 75 76 bool DataChannelConnectionDcSctp::Init(const uint16_t aLocalPort, 77 const uint16_t aNumStreams) { 78 return true; 79 } 80 81 int DataChannelConnectionDcSctp::SendMessage(DataChannel& aChannel, 82 OutgoingMsg&& aMsg) { 83 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 84 DC_DEBUG(("%s: %p (size %u)", __func__, this, 85 static_cast<unsigned>(aMsg.GetRemainingData().size()))); 86 if (!mDcSctp) { 87 return EBADF; // Debatable? 88 } 89 90 // I do not see any way to get nsCString to pass ownership of its buffer to 91 // anything besides another nsString. Bummer. 92 auto remaining = aMsg.GetRemainingData(); 93 std::vector<uint8_t> data; 94 data.assign(remaining.begin(), remaining.end()); 95 DcSctpMessage msg(StreamID(aMsg.GetMetadata().mStreamId), 96 PPID(aMsg.GetMetadata().mPpid), std::move(data)); 97 SendOptions options; 98 options.unordered = IsUnordered(aMsg.GetMetadata().mUnordered); 99 aMsg.GetMetadata().mMaxLifetimeMs.apply([&options](uint16_t lifetime) { 100 options.lifetime = DurationMs(lifetime); 101 }); 102 aMsg.GetMetadata().mMaxRetransmissions.apply( 103 [&options](uint16_t rtx) { options.max_retransmissions = rtx; }); 104 105 if (aMsg.GetMetadata().mPpid == DATA_CHANNEL_PPID_CONTROL) { 106 // Make sure we get a callback when this DCEP message is sent, and remember 107 // the stream id and the size. This allows us to work around the dcsctp bug 108 // that counts DCEP as part of bufferedAmount. 109 uint64_t id = mNextLifecycleId++; 110 options.lifecycle_id = LifecycleId(id); 111 mBufferedDCEPBytes[id] = 112 std::make_pair(aMsg.GetMetadata().mStreamId, remaining.size()); 113 } 114 115 auto result = mDcSctp->Send(std::move(msg), options); 116 117 if (aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_DOMSTRING_EMPTY && 118 aMsg.GetMetadata().mPpid != DATA_CHANNEL_PPID_BINARY_EMPTY) { 119 mBufferedAmounts[aMsg.GetMetadata().mStreamId] += remaining.size(); 120 } 121 122 switch (result) { 123 case SendStatus::kSuccess: 124 break; 125 case SendStatus::kErrorMessageEmpty: 126 DC_ERROR(("%s: %p send failed (kErrorMessageEmpty)", __func__, this)); 127 return EINVAL; 128 case SendStatus::kErrorMessageTooLarge: 129 DC_ERROR(("%s: %p send failed (kErrorMessageTooLarge)", __func__, this)); 130 return EMSGSIZE; 131 case SendStatus::kErrorResourceExhaustion: 132 DC_ERROR( 133 ("%s: %p send failed (kErrorResourceExhaustion)", __func__, this)); 134 return ENOBUFS; // Debatable? 135 case SendStatus::kErrorShuttingDown: 136 DC_ERROR(("%s: %p send failed (kErrorShuttingDown)", __func__, this)); 137 return EPIPE; // Debatable? 138 } 139 140 return 0; 141 } 142 143 void DataChannelConnectionDcSctp::OnSctpPacketReceived( 144 const MediaPacket& aPacket) { 145 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 146 DC_DEBUG( 147 ("%s: %p size=%u", __func__, this, static_cast<unsigned>(aPacket.len()))); 148 if (!mDcSctp) { 149 return; 150 } 151 webrtc::ArrayView<const uint8_t> data(aPacket.data(), aPacket.len()); 152 mDcSctp->ReceivePacket(data); 153 } 154 155 bool DataChannelConnectionDcSctp::ResetStreams(nsTArray<uint16_t>& aStreams) { 156 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 157 DC_DEBUG(("%s: %p", __func__, this)); 158 if (!mDcSctp) { 159 return false; 160 } 161 std::vector<StreamID> converted; 162 for (auto id : aStreams) { 163 DC_DEBUG(("%s: %p Resetting %u", __func__, this, id)); 164 converted.push_back(StreamID(id)); 165 } 166 auto result = 167 mDcSctp->ResetStreams(webrtc::ArrayView<const StreamID>(converted)); 168 aStreams.Clear(); 169 return result == ResetStreamsStatus::kPerformed; 170 } 171 172 void DataChannelConnectionDcSctp::OnStreamOpen(uint16_t aStream) { 173 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 174 DC_DEBUG(("%s: %p", __func__, this)); 175 for (auto it = mPreChannelData.begin(); it != mPreChannelData.end();) { 176 if (it->GetStreamId() == aStream) { 177 HandleDataMessage(std::move(*it)); 178 it = mPreChannelData.erase(it); 179 } else { 180 ++it; 181 } 182 } 183 } 184 185 SendPacketStatus DataChannelConnectionDcSctp::SendPacketWithStatus( 186 webrtc::ArrayView<const uint8_t> aData) { 187 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 188 DC_DEBUG(("%s: %p", __func__, this)); 189 std::unique_ptr<MediaPacket> packet(new MediaPacket); 190 packet->SetType(MediaPacket::SCTP); 191 packet->Copy(aData.data(), aData.size()); 192 193 DataChannelConnection::SendPacket(std::move(packet)); 194 return SendPacketStatus::kSuccess; 195 } 196 197 class DcSctpTimeout : public Timeout { 198 public: 199 explicit DcSctpTimeout(DataChannelConnectionDcSctp* aConnection) 200 : mConnection(aConnection) {} 201 202 // Called to start time timeout, with the duration in milliseconds as 203 // `duration` and with the timeout identifier as `timeout_id`, which - if 204 // the timeout expires - shall be provided to `DcSctpSocket::HandleTimeout`. 205 // 206 // `Start` and `Stop` will always be called in pairs. In other words will 207 // ´Start` never be called twice, without a call to `Stop` in between. 208 void Start(DurationMs duration, TimeoutID timeout_id) override { 209 mId = timeout_id.value(); 210 DC_DEBUG(("%s: %u %ums", __func__, mId, 211 static_cast<unsigned>(duration.value()))); 212 auto result = NS_NewTimerWithCallback( 213 [connection = mConnection, timeout_id](nsITimer* timer) { 214 DC_DEBUG(("%s: %u fired", __func__, 215 static_cast<unsigned>(timeout_id.value()))); 216 connection->HandleTimeout(timeout_id); 217 }, 218 duration.value(), nsITimer::TYPE_ONE_SHOT, "DcSctpTimeout::Start"_ns); 219 if (result.isOk()) { 220 mTimer = result.unwrap(); 221 } 222 } 223 224 // Called to stop the running timeout. 225 // 226 // `Start` and `Stop` will always be called in pairs. In other words will 227 // ´Start` never be called twice, without a call to `Stop` in between. 228 // 229 // `Stop` will always be called prior to releasing this object. 230 void Stop() override { 231 DC_DEBUG(("%s: %u", __func__, mId)); 232 if (mTimer) { 233 mTimer->Cancel(); 234 mTimer = nullptr; 235 } 236 } 237 238 private: 239 RefPtr<DataChannelConnectionDcSctp> mConnection; 240 nsCOMPtr<nsITimer> mTimer; 241 unsigned mId = 0; 242 }; 243 244 std::unique_ptr<Timeout> DataChannelConnectionDcSctp::CreateTimeout( 245 webrtc::TaskQueueBase::DelayPrecision aPrecision) { 246 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 247 DC_DEBUG(("%s: %p", __func__, this)); 248 // There is no such thing as a low precision TYPE_ONE_SHOT 249 (void)aPrecision; 250 return std::make_unique<DcSctpTimeout>(this); 251 } 252 253 void DataChannelConnectionDcSctp::HandleTimeout(TimeoutID aId) { 254 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 255 DC_DEBUG(("%s: %p", __func__, this)); 256 if (mDcSctp) { 257 mDcSctp->HandleTimeout(aId); 258 } 259 } 260 261 uint32_t DataChannelConnectionDcSctp::GetRandomInt(uint32_t aLow, 262 uint32_t aHigh) { 263 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 264 DC_DEBUG(("%s: %p", __func__, this)); 265 return aLow + RandomUint64OrDie() % (aHigh - aLow); 266 } 267 268 void DataChannelConnectionDcSctp::OnMessageReceived(DcSctpMessage aMessage) { 269 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 270 DC_DEBUG(("%s: %p", __func__, this)); 271 RefPtr<DataChannel> channel = 272 FindChannelByStream(aMessage.stream_id().value()); 273 274 IncomingMsg msg(aMessage.ppid().value(), aMessage.stream_id().value()); 275 // Sadly, nsCString and std::vector have no way to relinquish their buffers 276 // to one another. 277 msg.Append(aMessage.payload().data(), aMessage.payload().size()); 278 if (msg.GetPpid() == DATA_CHANNEL_PPID_CONTROL) { 279 HandleDCEPMessage(std::move(msg)); 280 } else if (channel && !HasPreChannelData(msg.GetStreamId())) { 281 HandleDataMessage(std::move(msg)); 282 } else { 283 mPreChannelData.push_back(std::move(msg)); 284 } 285 } 286 287 void DataChannelConnectionDcSctp::OnError(ErrorKind aError, 288 absl::string_view aMessage) { 289 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 290 DC_ERROR(("%s: %p %d %s", __func__, this, static_cast<int>(aError), 291 std::string(aMessage).c_str())); 292 } 293 294 void DataChannelConnectionDcSctp::OnAborted(ErrorKind aError, 295 absl::string_view aMessage) { 296 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 297 DC_ERROR(("%s: %p %d %s", __func__, this, static_cast<int>(aError), 298 std::string(aMessage).c_str())); 299 CloseAll_s(); 300 } 301 302 void DataChannelConnectionDcSctp::OnConnected() { 303 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 304 DC_DEBUG(("%s: %p", __func__, this)); 305 DataChannelConnectionState state = GetState(); 306 // TODO: Some duplicate code here, refactor 307 if (state == DataChannelConnectionState::Connecting) { 308 SetState(DataChannelConnectionState::Open); 309 310 OnConnected(); 311 DC_DEBUG(("%s: %p DTLS connect() succeeded! Entering connected mode", 312 __func__, this)); 313 314 // Open any streams pending... 315 // TODO: Do we really need to dispatch here? We're already on STS... 316 RUN_ON_THREAD(mSTS, 317 WrapRunnable(RefPtr<DataChannelConnection>(this), 318 &DataChannelConnection::ProcessQueuedOpens), 319 NS_DISPATCH_NORMAL); 320 321 } else if (state == DataChannelConnectionState::Open) { 322 DC_DEBUG(("%s: %p DataConnection Already OPEN", __func__, this)); 323 } else { 324 DC_ERROR(("%s: %p Unexpected state: %s", __func__, this, ToString(state))); 325 } 326 } 327 328 void DataChannelConnectionDcSctp::OnClosed() { 329 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 330 DC_DEBUG(("%s: %p", __func__, this)); 331 CloseAll_s(); 332 } 333 334 void DataChannelConnectionDcSctp::OnConnectionRestarted() { 335 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 336 DC_DEBUG(("%s: %p", __func__, this)); 337 } 338 339 void DataChannelConnectionDcSctp::OnStreamsResetFailed( 340 webrtc::ArrayView<const StreamID> aOutgoingStreams, 341 absl::string_view aReason) { 342 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 343 DC_ERROR(("%s: %p", __func__, this)); 344 // It probably does not make much sense to retry this here. If dcsctp doesn't 345 // want to retry, we probably don't either. 346 (void)aReason; 347 std::vector<uint16_t> streamsReset; 348 for (auto id : aOutgoingStreams) { 349 streamsReset.push_back(id.value()); 350 } 351 OnStreamsResetComplete(std::move(streamsReset)); 352 } 353 354 void DataChannelConnectionDcSctp::OnStreamsResetPerformed( 355 webrtc::ArrayView<const StreamID> aOutgoingStreams) { 356 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 357 DC_DEBUG(("%s: %p", __func__, this)); 358 std::vector<uint16_t> streamsReset; 359 for (auto id : aOutgoingStreams) { 360 streamsReset.push_back(id.value()); 361 } 362 OnStreamsResetComplete(std::move(streamsReset)); 363 } 364 365 void DataChannelConnectionDcSctp::OnIncomingStreamsReset( 366 webrtc::ArrayView<const StreamID> aIncomingStreams) { 367 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 368 DC_DEBUG(("%s: %p", __func__, this)); 369 std::vector<uint16_t> streamsReset; 370 for (auto id : aIncomingStreams) { 371 streamsReset.push_back(id.value()); 372 } 373 OnStreamsReset(std::move(streamsReset)); 374 } 375 376 // We (ab)use this callback to detect when _any_ data has been sent on the 377 // stream id, to drive updates to mainthread. 378 void DataChannelConnectionDcSctp::OnBufferedAmountLow(StreamID aStreamId) { 379 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 380 UpdateBufferedAmount(aStreamId); 381 } 382 383 void DataChannelConnectionDcSctp::OnLifecycleMessageFullySent( 384 LifecycleId aLifecycleId) { 385 DC_DEBUG(("%s: %p aLifecycleId=%u", __func__, this, 386 static_cast<unsigned>(aLifecycleId.value()))); 387 OnDCEPMessageDone(aLifecycleId); 388 } 389 390 void DataChannelConnectionDcSctp::OnLifecycleMessageExpired( 391 LifecycleId aLifecycleId, bool aMaybeDelivered) { 392 DC_DEBUG(("%s: %p aLifecycleId=%u aMaybeDelivered=%d", __func__, this, 393 static_cast<unsigned>(aLifecycleId.value()), 394 static_cast<int>(aMaybeDelivered))); 395 if (!aMaybeDelivered) { 396 OnDCEPMessageDone(aLifecycleId); 397 } 398 } 399 400 void DataChannelConnectionDcSctp::UpdateBufferedAmount(StreamID aStreamId) { 401 DC_DEBUG(("%s: %p id=%u", __func__, this, 402 static_cast<unsigned>(aStreamId.value()))); 403 mSTS->Dispatch(NS_NewRunnableFunction( 404 "DataChannelConnectionDcSctp::UpdateBufferedAmount", 405 [this, self = RefPtr<DataChannelConnectionDcSctp>(this), aStreamId]() { 406 auto channel = mChannels.Get(aStreamId.value()); 407 if (!channel || !mDcSctp) { 408 return; 409 } 410 411 size_t oldAmount = mBufferedAmounts[aStreamId.value()]; 412 size_t newAmount = mDcSctp->buffered_amount(aStreamId); 413 int decreaseWithoutDCEP = 414 oldAmount - newAmount - mDCEPBytesSent[aStreamId.value()]; 415 416 if (decreaseWithoutDCEP > 0) { 417 channel->DecrementBufferedAmount(decreaseWithoutDCEP); 418 } 419 420 DC_DEBUG(("%s: %p id=%u amount %u -> %u (difference without DCEP %d)", 421 __func__, this, static_cast<unsigned>(aStreamId.value()), 422 static_cast<unsigned>(oldAmount), 423 static_cast<unsigned>(newAmount), decreaseWithoutDCEP)); 424 mDCEPBytesSent.erase(aStreamId.value()); 425 mBufferedAmounts[aStreamId.value()] = newAmount; 426 mDcSctp->SetBufferedAmountLowThreshold(aStreamId, 427 newAmount ? newAmount - 1 : 0); 428 })); 429 } 430 431 void DataChannelConnectionDcSctp::OnDCEPMessageDone(LifecycleId aLifecycleId) { 432 DC_DEBUG(("%s: %p", __func__, this)); 433 // Find the stream id and the size of this DCEP packet. 434 auto it = mBufferedDCEPBytes.find(aLifecycleId.value()); 435 if (it == mBufferedDCEPBytes.end()) { 436 MOZ_ASSERT(false); 437 return; 438 } 439 440 auto& [stream, size] = it->second; 441 442 // Find the running total of DCEP bytes sent for this stream, and add the 443 // number of DCEP bytes we just learned about. 444 mDCEPBytesSent[stream] += size; 445 DC_DEBUG(("%s: %p id=%u amount=%u", __func__, this, 446 static_cast<unsigned>(stream), static_cast<unsigned>(size))); 447 448 // This is mainly to reset the buffered amount low threshold. 449 UpdateBufferedAmount(StreamID(stream)); 450 451 mBufferedDCEPBytes.erase(it); 452 } 453 454 bool DataChannelConnectionDcSctp::HasPreChannelData(uint16_t aStream) const { 455 for (const auto& msg : mPreChannelData) { 456 if (msg.GetStreamId() == aStream) { 457 return true; 458 } 459 } 460 return false; 461 } 462 463 } // namespace mozilla