DataChannel.cpp (60053B)
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

#include <algorithm>
#include <stdio.h>

#ifdef XP_WIN
#  include <winsock.h>  // for htonl, htons, ntohl, ntohs
#endif

#include "nsIInputStream.h"
#include "nsIPrefBranch.h"
#include "nsIPrefService.h"
#include "mozilla/Sprintf.h"
#include "nsProxyRelease.h"
#include "nsThread.h"
#include "nsThreadUtils.h"
#include "nsNetUtil.h"
#include "mozilla/Components.h"
#include "mozilla/StaticMutex.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/RTCDataChannel.h"
#include "mozilla/dom/RTCDataChannelBinding.h"
#ifdef MOZ_PEERCONNECTION
#  include "transport/runnable_utils.h"
#  include "jsapi/MediaTransportHandler.h"
#  include "mediapacket.h"
#endif

#include "DataChannel.h"
#include "DataChannelDcSctp.h"
#include "DataChannelUsrsctp.h"
#include "DataChannelLog.h"
#include "DataChannelProtocol.h"

namespace mozilla {

LazyLogModule gDataChannelLog("DataChannel");

// Takes ownership of the payload bytes; metadata (stream id, ppid, etc.) is
// copied.
OutgoingMsg::OutgoingMsg(nsACString&& aData,
                         const DataChannelMessageMetadata& aMetadata)
    : mData(std::move(aData)), mMetadata(aMetadata) {}

// Moves the read cursor forward by |offset| bytes, clamping at the end of the
// buffered data so mPos can never point past mData.
void OutgoingMsg::Advance(size_t offset) {
  mPos += offset;
  if (mPos > mData.Length()) {
    mPos = mData.Length();
  }
}

DataChannelConnection::~DataChannelConnection() {
  DC_INFO(("%p: Deleting DataChannelConnection", this));
  // This may die on the MainThread, or on the STS thread, or on an
  // sctp thread if we were in a callback when the DOM side shut things down.
  MOZ_ASSERT(mState == DataChannelConnectionState::Closed);
  MOZ_ASSERT(mPending.empty());

  if (!mSTS->IsOnCurrentThread()) {
    // We may be on MainThread *or* on an sctp thread (being called from
    // receive_cb() or SendSctpPacket())
    if (mInternalIOThread) {
      // Avoid spinning the event thread from here (which if we're mainthread
      // is in the event loop already)
      nsCOMPtr<nsIRunnable> r = WrapRunnable(
          nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown);
      mSTS->Dispatch(r.forget(), NS_DISPATCH_FALLIBLE);
    }
  } else {
    // on STS, safe to call shutdown
    if (mInternalIOThread) {
      mInternalIOThread->Shutdown();
    }
  }
}

// Main-thread teardown entry point: closes every channel, drops the DOM-side
// listener, and hops to STS to disconnect the transport-handler listeners.
void DataChannelConnection::Destroy() {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO(("%p: Destroying DataChannelConnection", this));
  CloseAll();
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
  MOZ_DIAGNOSTIC_ASSERT(mSTS);
#endif
  mListener = nullptr;
  // The media-event listeners were connected on STS (see SetSignals), so they
  // must be disconnected there as well.
  mSTS->Dispatch(
      NS_NewCancelableRunnableFunction(
          __func__,
          [this, self = RefPtr<DataChannelConnection>(this)]() {
            mPacketReceivedListener.DisconnectIfExists();
            mStateChangeListener.DisconnectIfExists();
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
            mShutdown = true;
            DC_INFO(("Shutting down connection %p, id %p", this, (void*)mId));
#endif
          }),
      NS_DISPATCH_FALLIBLE);
}

// Factory: picks the dcsctp or usrsctp backend based on a pref, then runs
// Init(). Returns Nothing() if Init() fails.
Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create(
    DataChannelConnection::DataConnectionListener* aListener,
    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler,
    const uint16_t aLocalPort, const uint16_t aNumStreams) {
  MOZ_ASSERT(NS_IsMainThread());

  RefPtr<DataChannelConnection> connection;
  if (Preferences::GetBool("media.peerconnection.sctp.use_dcsctp", false)) {
    connection = new DataChannelConnectionDcSctp(aListener, aTarget,
                                                 aHandler);  // Walks into a bar
  } else {
    connection = new DataChannelConnectionUsrsctp(
        aListener, aTarget, aHandler);  // Walks into a bar
  }
  return connection->Init(aLocalPort, aNumStreams) ? Some(connection)
                                                   : Nothing();
}

DataChannelConnection::DataChannelConnection(
    DataChannelConnection::DataConnectionListener* aListener,
    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler)
    : NeckoTargetHolder(aTarget),
      mListener(aListener),
      mTransportHandler(aHandler) {
  MOZ_ASSERT(NS_IsMainThread());
  DC_VERBOSE(
      ("%p: DataChannelConnection c'tor, listener=%p", this, mListener.get()));

  // XXX FIX! make this a global we get once
  // Find the STS thread
  nsresult rv;
  mSTS = mozilla::components::SocketTransport::Service(&rv);
  MOZ_ASSERT(NS_SUCCEEDED(rv));

#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
  mShutdown = false;
#endif
}

// Only called on MainThread, mMaxMessageSize is read on other threads
// Applies the remote-advertised max message size, optionally overridden by a
// pref, then clamps to WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE and pushes
// the result down to every existing channel.
void DataChannelConnection::SetMaxMessageSize(uint64_t aMaxMessageSize) {
  MOZ_ASSERT(NS_IsMainThread());

  mMaxMessageSize = aMaxMessageSize;

  nsresult rv;
  nsCOMPtr<nsIPrefService> prefs;
  prefs = mozilla::components::Preferences::Service(&rv);
  if (!NS_WARN_IF(NS_FAILED(rv))) {
    nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);

    if (branch) {
      int32_t temp;
      if (!NS_FAILED(branch->GetIntPref(
              "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
        // The pref can only lower the limit, never raise it.
        if (temp > 0 && (uint64_t)temp < mMaxMessageSize) {
          mMaxMessageSize = (uint64_t)temp;
        }
      }
    }
  }

  // Fix remote MMS. This code exists, so future implementations of
  // RTCSctpTransport.maxMessageSize can simply provide that value from
  // GetMaxMessageSize.

  // TODO: Bug 1382779, once resolved, can be increased to
  // min(Uint8ArrayMaxSize, UINT32_MAX)
  // TODO: Bug 1381146, once resolved, can be increased to whatever we support
  // then (hopefully
  // SIZE_MAX)
  if (mMaxMessageSize == 0 ||
      mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
    mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
  }

  DC_DEBUG(("%p: Maximum message size (outgoing data): %" PRIu64
            " (enforced=%s)",
            this, mMaxMessageSize, aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));

  for (auto& channel : mChannels.GetAll()) {
    channel->SetMaxMessageSize(GetMaxMessageSize());
  }
}

// Returns the effective max message size as a double; 0 (unset/unknown) maps
// to +infinity, matching the RTCSctpTransport.maxMessageSize convention.
double DataChannelConnection::GetMaxMessageSize() {
  MOZ_ASSERT(NS_IsMainThread());
  if (mMaxMessageSize) {
    return static_cast<double>(mMaxMessageSize);
  }

  return std::numeric_limits<double>::infinity();
}

// Collects per-channel stats promises and resolves once all of them settle.
RefPtr<DataChannelConnection::StatsPromise> DataChannelConnection::GetStats(
    const DOMHighResTimeStamp aTimestamp) const {
  MOZ_ASSERT(NS_IsMainThread());
  nsTArray<RefPtr<DataChannelStatsPromise>> statsPromises;
  for (const RefPtr<DataChannel>& chan : mChannels.GetAll()) {
    if (chan) {
      RefPtr<DataChannelStatsPromise> statsPromise(chan->GetStats(aTimestamp));
      if (statsPromise) {
        statsPromises.AppendElement(std::move(statsPromise));
      }
    }
  }

  return DataChannelStatsPromise::All(GetMainThreadSerialEventTarget(),
                                      statsPromises);
}

// Gathers stats on the channel's DOM event target (which may be a worker).
// Resolves with empty stats if the DOM object is already gone, because a
// rejection would poison the All() aggregate above.
RefPtr<DataChannelStatsPromise> DataChannel::GetStats(
    const DOMHighResTimeStamp aTimestamp) {
  MOZ_ASSERT(NS_IsMainThread());

  return InvokeAsync(mDomEventTarget, __func__,
                     [this, self = RefPtr<DataChannel>(this), aTimestamp] {
                       if (!GetDomDataChannel()) {
                         // Empty stats object, I guess... too late to
                         // return a nullptr and rejecting will trash stats
                         // promises for all other datachannels.
                         return DataChannelStatsPromise::CreateAndResolve(
                             dom::RTCDataChannelStats(), __func__);
                       }
                       return DataChannelStatsPromise::CreateAndResolve(
                           GetDomDataChannel()->GetStats(aTimestamp), __func__);
                     });
}

// Attaches this connection to a DTLS transport. On the first call it also
// fixes the local/remote SCTP ports and the client/server role (which decides
// even vs. odd stream-id allocation), and assigns real ids to channels that
// were opened before the role was known.
bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
                                              const bool aClient,
                                              const uint16_t aLocalPort,
                                              const uint16_t aRemotePort) {
  MOZ_ASSERT(NS_IsMainThread());

  static const auto paramString =
      [](const std::string& tId, const Maybe<bool>& client,
         const uint16_t localPort, const uint16_t remotePort) -> std::string {
    std::ostringstream stream;
    stream << "Transport ID: '" << tId << "', Role: '"
           << (client ? (client.value() ? "client" : "server") : "")
           << "', Local Port: '" << localPort << "', Remote Port: '"
           << remotePort << "'";
    return stream.str();
  };

  const auto params =
      paramString(aTransportId, Some(aClient), aLocalPort, aRemotePort);
  DC_INFO(
      ("%p: ConnectToTransport connecting DTLS transport with parameters: %s",
       this, params.c_str()));

  DC_INFO(("%p: New transport parameters: %s", this, params.c_str()));
  if (NS_WARN_IF(aTransportId.empty())) {
    return false;
  }

  if (!mAllocateEven.isSome()) {
    // Do this stuff once.
    mLocalPort = aLocalPort;
    mRemotePort = aRemotePort;
    mAllocateEven = Some(aClient);
    nsTArray<RefPtr<DataChannel>> hasStreamId;
    // Could be faster. Probably doesn't matter.
    // Channels opened before the role was known are parked under
    // INVALID_STREAM; give each one a real id now (or close it if none fit).
    while (auto channel = mChannels.Get(INVALID_STREAM)) {
      mChannels.Remove(channel);
      auto id = FindFreeStream();
      if (id != INVALID_STREAM) {
        channel->SetStream(id);
        mChannels.Insert(channel);
        DC_DEBUG(("%p: Inserting auto-selected id %u for channel %p", this,
                  static_cast<unsigned>(id), channel.get()));
        mStreamIds.InsertElementSorted(id);
        hasStreamId.AppendElement(std::move(channel));
      } else {
        DC_WARN(("%p: Could not find id for channel %p, calling AnnounceClosed",
                 this, channel.get()));
        // Spec language is very similar to AnnounceClosed, the differences
        // being a lack of a closed check at the top, a different error event,
        // and no removal of the channel from the [[DataChannels]] slot.
        // We don't support firing errors right now, and we probabaly want the
        // closed check anyway, and we don't really have something equivalent
        // to the [[DataChannels]] slot, so just use AnnounceClosed for now.
        channel->AnnounceClosed();
      }
    }

    // State changes and OpenFinish must happen on STS.
    mSTS->Dispatch(NS_NewCancelableRunnableFunction(
                       __func__,
                       [this, self = RefPtr<DataChannelConnection>(this),
                        hasStreamId = std::move(hasStreamId)]() {
                         SetState(DataChannelConnectionState::Connecting);
                         for (auto& channel : hasStreamId) {
                           OpenFinish(std::move(channel));
                         }
                       }),
                   NS_DISPATCH_FALLIBLE);
  }

  // We do not check whether this is a new transport id here, that happens on
  // STS.
  RUN_ON_THREAD(mSTS,
                WrapRunnable(RefPtr<DataChannelConnection>(this),
                             &DataChannelConnection::SetSignals, aTransportId),
                NS_DISPATCH_NORMAL);
  return true;
}

// STS-only: records the transport id and (once) hooks up packet-received and
// state-change listeners. If DTLS is already open, kicks OnTransportReady()
// immediately since no state-change signal will arrive for the current state.
void DataChannelConnection::SetSignals(const std::string& aTransportId) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  if (mTransportId == aTransportId) {
    // Nothing to do!
    return;
  }

  mTransportId = aTransportId;

  if (!mConnectedToTransportHandler) {
    mPacketReceivedListener =
        mTransportHandler->GetSctpPacketReceived().Connect(
            mSTS, this, &DataChannelConnection::OnPacketReceived);
    mStateChangeListener = mTransportHandler->GetStateChange().Connect(
        mSTS, this, &DataChannelConnection::TransportStateChange);
    mConnectedToTransportHandler = true;
  }
  // SignalStateChange() doesn't call you with the initial state
  if (mTransportHandler->GetState(mTransportId, false) ==
      TransportLayer::TS_OPEN) {
    DC_DEBUG(("%p: Setting transport signals, dtls already open", this));
    OnTransportReady();
  } else {
    DC_DEBUG(("%p: Setting transport signals, dtls not open yet", this));
  }
}

// STS-only: reacts to DTLS transport state transitions for our transport id.
void DataChannelConnection::TransportStateChange(
    const std::string& aTransportId, TransportLayer::State aState) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  if (aTransportId == mTransportId) {
    if (aState == TransportLayer::TS_OPEN) {
      DC_DEBUG(("%p: Transport is open!", this));
      OnTransportReady();
    } else if (aState == TransportLayer::TS_CLOSED ||
               aState == TransportLayer::TS_NONE ||
               aState == TransportLayer::TS_ERROR) {
      DC_DEBUG(("%p: Transport is closed!", this));
      CloseAll_s();
    }
  }
}

// Process any pending Opens
void DataChannelConnection::ProcessQueuedOpens() {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  // Swap the pending set out first: OpenFinish may re-queue a channel into
  // mPending, and we must not iterate a set that is being mutated.
  std::set<RefPtr<DataChannel>> temp(std::move(mPending));
  // Technically in an unspecified state, although no reasonable impl will leave
  // anything in here.
  mPending.clear();
  for (auto channel : temp) {
    DC_DEBUG(("%p: Processing queued open for %p (%u)", this, channel.get(),
              channel->mStream));
    OpenFinish(channel);  // may end up back in mPending
  }
}

// STS-only: filters incoming packets down to SCTP packets for our transport.
void DataChannelConnection::OnPacketReceived(const std::string& aTransportId,
                                             const MediaPacket& packet) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  if (packet.type() == MediaPacket::SCTP && mTransportId == aTransportId) {
    OnSctpPacketReceived(packet);
  }
}

// Queues an outgoing SCTP packet onto STS, where it is handed to the
// transport handler (if we are still attached to a transport).
void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
  mSTS->Dispatch(NS_NewCancelableRunnableFunction(
                     "DataChannelConnection::SendPacket",
                     [this, self = RefPtr<DataChannelConnection>(this),
                      packet = std::move(packet)]() mutable {
                       // DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes",
                       //           this, len));
                       if (!mTransportId.empty() && mTransportHandler) {
                         mTransportHandler->SendPacket(mTransportId,
                                                       std::move(*packet));
                       }
                     }),
                 NS_DISPATCH_FALLIBLE);
}

// Looks up a channel by SCTP stream id; returns null if none is registered.
already_AddRefed<DataChannel> DataChannelConnection::FindChannelByStream(
    uint16_t stream) {
  return mChannels.Get(stream).forget();
}

// Returns the lowest unused stream id of our parity (even for client, odd for
// server), or INVALID_STREAM if the role is unknown or ids are exhausted.
uint16_t DataChannelConnection::FindFreeStream() const {
  MOZ_ASSERT(NS_IsMainThread());

  MOZ_ASSERT(mAllocateEven.isSome());
  if (!mAllocateEven.isSome()) {
    return INVALID_STREAM;
  }

  uint16_t i = (*mAllocateEven ? 0 : 1);

  // Find the lowest odd/even id that is not present in mStreamIds
  // (mStreamIds is sorted; ids of the other parity are simply skipped).
  for (auto id : mStreamIds) {
    if (i >= MAX_NUM_STREAMS) {
      return INVALID_STREAM;
    }

    if (id == i) {
      // i is in use, try the next one
      i += 2;
    } else if (id > i) {
      // i is definitely not in use
      break;
    }
  }

  return i;
}

// Returns a POSIX error code.
424 int DataChannelConnection::SendControlMessage(DataChannel& aChannel, 425 const uint8_t* data, 426 uint32_t len) { 427 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 428 // Create message instance and send 429 // Note: Main-thread IO, but doesn't block 430 #if (UINT32_MAX > SIZE_MAX) 431 if (len > SIZE_MAX) { 432 return EMSGSIZE; 433 } 434 #endif 435 436 DataChannelMessageMetadata metadata(aChannel.mStream, 437 DATA_CHANNEL_PPID_CONTROL, false); 438 nsCString buffer(reinterpret_cast<const char*>(data), len); 439 OutgoingMsg msg(std::move(buffer), metadata); 440 441 return SendMessage(aChannel, std::move(msg)); 442 } 443 444 // Returns a POSIX error code. 445 int DataChannelConnection::SendOpenAckMessage(DataChannel& aChannel) { 446 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 447 DC_INFO(("%p: Sending DataChannel open ack, channel %p", this, &aChannel)); 448 struct rtcweb_datachannel_ack ack = {}; 449 ack.msg_type = DATA_CHANNEL_ACK; 450 451 return SendControlMessage(aChannel, (const uint8_t*)&ack, sizeof(ack)); 452 } 453 454 // Returns a POSIX error code. 
// Builds and sends a DCEP DATA_CHANNEL_OPEN request carrying the channel's
// label, protocol, ordering and reliability parameters. Returns a POSIX error
// code (0 on success).
int DataChannelConnection::SendOpenRequestMessage(DataChannel& aChannel) {
  DC_INFO(
      ("%p: Sending DataChannel open request, channel %p", this, &aChannel));
  const nsACString& label = aChannel.mLabel;
  const nsACString& protocol = aChannel.mProtocol;
  const bool unordered = !aChannel.mOrdered;
  const DataChannelReliabilityPolicy prPolicy = aChannel.mPrPolicy;
  const uint32_t prValue = aChannel.mPrValue;

  const size_t label_len = label.Length();     // not including nul
  const size_t proto_len = protocol.Length();  // not including nul
  // careful - request struct include one char for the label
  const size_t req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
                          label_len + proto_len;
  UniqueFreePtr<struct rtcweb_datachannel_open_request> req(
      (struct rtcweb_datachannel_open_request*)moz_xmalloc(req_size));

  memset(req.get(), 0, req_size);
  req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
  switch (prPolicy) {
    case DataChannelReliabilityPolicy::Reliable:
      req->channel_type = DATA_CHANNEL_RELIABLE;
      break;
    case DataChannelReliabilityPolicy::LimitedLifetime:
      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
      break;
    case DataChannelReliabilityPolicy::LimitedRetransmissions:
      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
      break;
    default:
      return EINVAL;
  }
  if (unordered) {
    // Per the current types, all differ by 0x80 between ordered and unordered
    req->channel_type |=
        0x80;  // NOTE: be careful if new types are added in the future
  }

  // Multi-byte wire fields are big-endian per DCEP.
  req->reliability_param = htonl(prValue);
  req->priority = htons(0); /* XXX: add support */
  req->label_length = htons(label_len);
  req->protocol_length = htons(proto_len);
  // Label and protocol are packed back-to-back after the fixed header, with
  // no nul terminators.
  memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
  memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);

  // TODO: req_size is an int... that looks hairy
  return SendControlMessage(aChannel, (const uint8_t*)req.get(), req_size);
}

// Caller must ensure that length <= SIZE_MAX
// Handles an incoming DCEP DATA_CHANNEL_OPEN request on |stream|: validates
// the length and channel type, then (on main thread) either matches it
// against an externally-negotiated channel or creates a new channel,
// announces it to the listener, and queues the ACK back on STS.
void DataChannelConnection::HandleOpenRequestMessage(
    const struct rtcweb_datachannel_open_request* req, uint32_t length,
    uint16_t stream) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  DataChannelReliabilityPolicy prPolicy;

  const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
                                ntohs(req->protocol_length);
  if (((size_t)length) != requiredLength) {
    if (((size_t)length) < requiredLength) {
      DC_ERROR(
          ("%p: insufficient length: %u, should be %zu. Unable to continue.",
           this, length, requiredLength));
      return;
    }
    // Longer than needed is tolerated (trailing bytes ignored), but noted.
    DC_WARN(("%p: Inconsistent length: %u, should be %zu", this, length,
             requiredLength));
  }

  DC_DEBUG(("%p: length %u, sizeof(*req) = %zu", this, length, sizeof(*req)));

  switch (req->channel_type) {
    case DATA_CHANNEL_RELIABLE:
    case DATA_CHANNEL_RELIABLE_UNORDERED:
      prPolicy = DataChannelReliabilityPolicy::Reliable;
      break;
    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
      prPolicy = DataChannelReliabilityPolicy::LimitedRetransmissions;
      break;
    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
      prPolicy = DataChannelReliabilityPolicy::LimitedLifetime;
      break;
    default:
      DC_ERROR(("%p: Unknown channel type %d", this, req->channel_type));
      /* XXX error handling */
      return;
  }

  if (stream >= mNegotiatedIdLimit) {
    DC_ERROR(
        ("%p: stream %u out of bounds (%u)", this, stream, mNegotiatedIdLimit));
    return;
  }

  const uint32_t prValue = ntohl(req->reliability_param);
  // The 0x80 bit distinguishes unordered variants of every channel type.
  const bool ordered = !(req->channel_type & 0x80);
  const nsCString label(&req->label[0], ntohs(req->label_length));
  const nsCString protocol(&req->label[ntohs(req->label_length)],
                           ntohs(req->protocol_length));

  // Always dispatch this to mainthread; this is a brand new datachannel, which
  // has not had any opportunity to be transferred to a worker.
  Dispatch(
      NS_NewCancelableRunnableFunction(
          "DataChannelConnection::HandleOpenRequestMessage",
          [this, self = RefPtr<DataChannelConnection>(this), stream, prPolicy,
           prValue, ordered, label, protocol]() {
            RefPtr<DataChannel> channel = FindChannelByStream(stream);
            if (channel) {
              if (!channel->mNegotiated) {
                DC_ERROR((
                    "HandleOpenRequestMessage: channel for pre-existing stream "
                    "%u that was not externally negotiated. JS is lying to us, "
                    "or there's an id collision.",
                    stream));
                /* XXX: some error handling */
              } else {
                DC_DEBUG(("Open for externally negotiated channel %u", stream));
                // XXX should also check protocol, maybe label
                if (prPolicy != channel->mPrPolicy ||
                    prValue != channel->mPrValue ||
                    ordered != channel->mOrdered) {
                  DC_WARN(
                      ("external negotiation mismatch with OpenRequest:"
                       "channel %u, policy %s/%s, value %u/%u, ordered %d/%d",
                       stream, ToString(prPolicy), ToString(channel->mPrPolicy),
                       prValue, channel->mPrValue, static_cast<int>(ordered),
                       static_cast<int>(channel->mOrdered)));
                }
              }
              return;
            }
            channel = new DataChannel(this, stream, label, protocol, prPolicy,
                                      prValue, ordered, false);
            mChannels.Insert(channel);
            mStreamIds.InsertElementSorted(stream);

            DC_INFO(("%p: sending ON_CHANNEL_CREATED for %p/%s/%s: %u", this,
                     channel.get(), channel->mLabel.get(),
                     channel->mProtocol.get(), stream));

            // Awkward. If we convert over to using Maybe for this in
            // DataChannel, we won't need to have this extra conversion, since
            // Nullable converts easily to Maybe.
            dom::Nullable<uint16_t> maxLifeTime;
            dom::Nullable<uint16_t> maxRetransmits;
            if (prPolicy == DataChannelReliabilityPolicy::LimitedLifetime) {
              maxLifeTime.SetValue(std::min(
                  std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
            } else if (prPolicy ==
                       DataChannelReliabilityPolicy::LimitedRetransmissions) {
              maxRetransmits.SetValue(std::min(
                  std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
            }

            if (mListener) {
              // important to give it an already_AddRefed pointer!
              // TODO(bug 1974443): Have nsDOMDataChannel create the DataChannel
              // object, or have DataChannel take an nsDOMDataChannel, to avoid
              // passing this param list more than once?
              mListener->NotifyDataChannel(do_AddRef(channel), label, ordered,
                                           maxLifeTime, maxRetransmits,
                                           protocol, false);
              // Spec says to queue this in the queued task for ondatachannel
              channel->AnnounceOpen();
            }

            // The ACK must be sent from STS, so hop back there.
            mSTS->Dispatch(
                NS_NewCancelableRunnableFunction(
                    "DataChannelConnection::HandleOpenRequestMessage",
                    [this, self = RefPtr<DataChannelConnection>(this),
                     channel = std::move(channel)]() {
                      // Note that any message can be buffered;
                      // SendOpenAckMessage may error later than this check.
                      const auto error = SendOpenAckMessage(*channel);
                      if (error) {
                        DC_ERROR(
                            ("%p: SendOpenAckMessage failed, channel %p, error "
                             "= %d",
                             this, channel.get(), error));
                        FinishClose_s(channel);
                        return;
                      }
                      channel->mWaitingForAck = false;
                      channel->mSendStreamNeedsReset = true;
                      channel->mRecvStreamNeedsReset = true;
                      OnStreamOpen(channel->mStream);
                    }),
                NS_DISPATCH_FALLIBLE);
          }),
      NS_DISPATCH_FALLIBLE);
}

// Caller must ensure that length <= SIZE_MAX
// Handles an incoming DCEP DATA_CHANNEL_ACK: clears the waiting-for-ack flag
// and announces the channel open.
void DataChannelConnection::HandleOpenAckMessage(
    const struct rtcweb_datachannel_ack* ack, uint32_t length,
    uint16_t stream) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());

  RefPtr<DataChannel> channel = FindChannelByStream(stream);
  if (NS_WARN_IF(!channel)) {
    return;
  }

  DC_INFO(("%p: OpenAck received for channel %p, stream %u, waiting=%d", this,
           channel.get(), stream, channel->mWaitingForAck ? 1 : 0));

  channel->mWaitingForAck = false;

  // Either externally negotiated or we sent Open
  channel->AnnounceOpen();
  OnStreamOpen(stream);
}

// Caller must ensure that length <= SIZE_MAX
// Logs (and otherwise ignores) a DCEP message with an unrecognized type/PPID.
void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length,
                                                 uint16_t stream) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  /* XXX: Send an error message? */
  DC_ERROR(("%p: unknown DataChannel message received: %u, len %u on stream %d",
            this, ppid, length, stream));
  // XXX Log to JS error console if possible
}

// Dispatches a reassembled user data message to its channel, classifying it
// as string or binary from the PPID. The channel must already be open
// (OnStreamOpen must have run), otherwise this asserts.
void DataChannelConnection::HandleDataMessage(IncomingMsg&& aMsg) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());

  RefPtr<DataChannel> channel = FindChannelByStream(aMsg.GetStreamId());
  if (!channel) {
    MOZ_ASSERT(false,
               "Wait until OnStreamOpen is called before calling "
               "HandleDataMessage!");
    return;
  }

  const size_t data_length = aMsg.GetData().Length();
  bool isBinary = false;

  switch (aMsg.GetPpid()) {
    case DATA_CHANNEL_PPID_DOMSTRING:
    case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
      DC_DEBUG(
          ("%p: DataChannel: Received string message of length %zu on "
           "channel %u",
           this, data_length, channel->mStream));
      // WebSockets checks IsUTF8() here; we can try to deliver it
      break;

    case DATA_CHANNEL_PPID_DOMSTRING_EMPTY:
      DC_DEBUG(
          ("%p: DataChannel: Received empty string message of length %zu on "
           "channel %u",
           this, data_length, channel->mStream));
      // Just in case.
      aMsg.GetData().Truncate(0);
      break;

    case DATA_CHANNEL_PPID_BINARY:
    case DATA_CHANNEL_PPID_BINARY_PARTIAL:
      DC_DEBUG(
          ("%p: DataChannel: Received binary message of length %zu on "
           "channel id %u",
           this, data_length, channel->mStream));
      isBinary = true;
      break;

    case DATA_CHANNEL_PPID_BINARY_EMPTY:
      DC_DEBUG(
          ("%p: DataChannel: Received empty binary message of length %zu on "
           "channel id %u",
           this, data_length, channel->mStream));
      // Just in case.
      aMsg.GetData().Truncate(0);
      isBinary = true;
      break;

    default:
      NS_ERROR("Unknown data PPID");
      DC_ERROR(("%p: Unknown data PPID %" PRIu32, this, aMsg.GetPpid()));
      return;
  }

  channel->OnMessageReceived(std::move(aMsg.GetData()), isBinary);
}

// Parses a reassembled DCEP control message and routes it to the matching
// handler (open request, ack, or unknown).
void DataChannelConnection::HandleDCEPMessage(IncomingMsg&& aMsg) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  const struct rtcweb_datachannel_open_request* req;
  const struct rtcweb_datachannel_ack* ack;

  req = reinterpret_cast<const struct rtcweb_datachannel_open_request*>(
      aMsg.GetData().BeginReading());

  size_t data_length = aMsg.GetLength();

  DC_INFO(("%p: Handling DCEP message of length %zu", this, data_length));

  // Ensure minimum message size (ack is the smallest DCEP message)
  if (data_length < sizeof(*ack)) {
    DC_WARN(("%p: Ignored invalid DCEP message (too short)", this));
    return;
  }

  switch (req->msg_type) {
    case DATA_CHANNEL_OPEN_REQUEST:
      // structure includes a possibly-unused char label[1] (in a packed
      // structure)
      if (NS_WARN_IF(data_length < sizeof(*req) - 1)) {
        return;
      }

      HandleOpenRequestMessage(req, data_length, aMsg.GetStreamId());
      break;
    case DATA_CHANNEL_ACK:
      // >= sizeof(*ack) checked above

      ack = reinterpret_cast<const struct rtcweb_datachannel_ack*>(
          aMsg.GetData().BeginReading());
      HandleOpenAckMessage(ack, data_length, aMsg.GetStreamId());
      break;
    default:
      HandleUnknownMessage(aMsg.GetPpid(), data_length, aMsg.GetStreamId());
      break;
  }
}

// Appends one fragment to a message being reassembled. Returns false (caller
// should close the connection) if the fragment would overflow size limits or
// its PPID does not match the fragments received so far.
bool DataChannelConnection::ReassembleMessageChunk(IncomingMsg& aReassembled,
                                                   const void* buffer,
                                                   size_t length, uint32_t ppid,
                                                   uint16_t stream) {
  // Note: Until we support SIZE_MAX sized messages, we need this check
#if (SIZE_MAX > UINT32_MAX)
  if (length > UINT32_MAX) {
    DC_ERROR(("%p: Cannot handle message of size %zu (max=%u)", this, length,
              UINT32_MAX));
    return false;
  }
#endif

  // Ensure it doesn't blow up our buffer
  // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the
  // new buffer is capable of holding.
  if (length + aReassembled.GetLength() >
      WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
    DC_ERROR(
        ("%p: Buffered message would become too large to handle, closing "
         "connection",
         this));
    return false;
  }

  if (aReassembled.GetPpid() != ppid) {
    NS_WARNING("DataChannel message aborted by fragment type change!");
    return false;
  }

  aReassembled.Append((uint8_t*)buffer, length);

  return true;
}

// STS-only: the remote side has reset these incoming streams. If our outgoing
// stream still needs a reset we close gracefully (letting queued sends
// drain), otherwise the channel can be finished immediately.
void DataChannelConnection::OnStreamsReset(std::vector<uint16_t>&& aStreams) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  for (auto stream : aStreams) {
    DC_INFO(("%p: Received reset request for stream %u", this, stream));
    RefPtr<DataChannel> channel = FindChannelByStream(stream);
    if (channel) {
      channel->mRecvStreamNeedsReset = false;
      if (channel->mSendStreamNeedsReset) {
        // We do not send our own reset yet, we give the RTCDataChannel a chance
        // to finish sending messages first.
        DC_INFO(("%p: Need to send a reset, closing gracefully", this));
        channel->GracefulClose();
      } else {
        DC_INFO(
            ("%p: We've already reset our stream, closing immediately", this));
        FinishClose_s(channel);
      }
    }
  }
}

// STS-only: our outgoing stream resets completed. If the remote side has also
// reset its stream, the channel teardown can be finished now.
void DataChannelConnection::OnStreamsResetComplete(
    std::vector<uint16_t>&& aStreams) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  for (auto stream : aStreams) {
    DC_INFO(("%p: Received reset response for stream %u", this, stream));
    RefPtr<DataChannel> channel = FindChannelByStream(stream);
    if (channel) {
      channel->mSendStreamNeedsReset = false;
      if (!channel->mRecvStreamNeedsReset) {
        // The other end has already performed its reset
        DC_INFO(
            ("%p: Remote stream has already been reset, closing immediately",
             this));
        FinishClose_s(channel);
      }
    }
  }
}

// Main-thread entry point for opening a channel (negotiated or in-band).
// Picks a stream id when possible (or defers with INVALID_STREAM until the
// DTLS role is known), registers the channel, and queues OpenFinish on STS.
// Returns null on reliability-parameter misuse or id exhaustion/collision.
already_AddRefed<DataChannel> DataChannelConnection::Open(
    const nsACString& label, const nsACString& protocol,
    DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue,
    bool aExternalNegotiated, uint16_t aStream) {
  MOZ_ASSERT(NS_IsMainThread());
  if (!aExternalNegotiated) {
    if (mAllocateEven.isSome()) {
      aStream = FindFreeStream();
      if (aStream == INVALID_STREAM) {
        return nullptr;
      }
    } else {
      // We do not yet know whether we are client or server, and an id has not
      // been chosen for us. We will need to choose later.
      aStream = INVALID_STREAM;
    }
  }

  DC_INFO(
      ("%p: DC Open: label %s/%s, type %s, inorder %d, prValue %u, "
       "external: %s, stream %u",
       this, PromiseFlatCString(label).get(),
       PromiseFlatCString(protocol).get(), ToString(prPolicy), inOrder, prValue,
       aExternalNegotiated ? "true" : "false", aStream));

  // A reliable channel must not carry a partial-reliability value.
  if ((prPolicy == DataChannelReliabilityPolicy::Reliable) && (prValue != 0)) {
    return nullptr;
  }

  if (aStream != INVALID_STREAM) {
    if (mStreamIds.ContainsSorted(aStream)) {
      DC_ERROR(("%p: external negotiation of already-open channel %u", this,
                aStream));
      // This is the only place where duplicate id checking is performed. The
      // JSImpl code assumes that any error is due to id-related problems. This
      // probably needs some cleanup.
      return nullptr;
    }

    DC_DEBUG(("%p: Inserting externally-negotiated id %u", this,
              static_cast<unsigned>(aStream)));
    mStreamIds.InsertElementSorted(aStream);
  }

  RefPtr<DataChannel> channel(new DataChannel(this, aStream, label, protocol,
                                              prPolicy, prValue, inOrder,
                                              aExternalNegotiated));
  mChannels.Insert(channel);

  if (aStream != INVALID_STREAM) {
    mSTS->Dispatch(NS_NewCancelableRunnableFunction(
                       "DataChannel::OpenFinish",
                       [this, self = RefPtr<DataChannelConnection>(this),
                        channel]() mutable { OpenFinish(channel); }),
                   NS_DISPATCH_FALLIBLE);
  }

  return channel.forget();
}

// Separate routine so we can also call it to finish up from pending opens
void DataChannelConnection::OpenFinish(RefPtr<DataChannel> aChannel) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());
  const uint16_t stream = aChannel->mStream;

  // Cases we care about:
  // Pre-negotiated:
  //   Not Open:
  //     Doesn't fit:
  //       -> change initial ask or renegotiate after open
  //     -> queue open
  //   Open:
  //     Doesn't fit:
  //       -> RaiseStreamLimitTo && queue
  //     Does fit:
  //       -> open
  // Not negotiated:
  //   Not Open:
  //     -> queue open
  //   Open:
  //     -> Try to get a stream
  //     Doesn't fit:
  //       -> RaiseStreamLimitTo && queue
  //     Does fit:
  //       -> open
  // So the Open cases are basically the same
  // Not Open cases are simply queue for non-negotiated, and
  // either change the initial ask or possibly renegotiate after open.
  DataChannelConnectionState state = GetState();
  if (state != DataChannelConnectionState::Open ||
      stream >= mNegotiatedIdLimit) {
    if (state == DataChannelConnectionState::Open) {
      MOZ_ASSERT(stream != INVALID_STREAM);
      // RaiseStreamLimitTo() limits to MAX_NUM_STREAMS -- allocate extra
      // streams to avoid asking for more every time we want a higher limit.
      uint16_t num_desired = std::min(16 * (stream / 16 + 1), MAX_NUM_STREAMS);
      DC_DEBUG(("%p: Attempting to raise stream limit %u -> %u", this,
                mNegotiatedIdLimit, num_desired));
      if (!RaiseStreamLimitTo(num_desired)) {
        NS_ERROR("Failed to request more streams");
        FinishClose_s(aChannel);
        return;
      }
    }
    DC_INFO(("%p: Queuing channel %p (%u) to finish open", this, aChannel.get(),
             stream));
    // ProcessQueuedOpens() will retry once the connection opens / the stream
    // limit is raised.
    mPending.insert(std::move(aChannel));
    return;
  }

  MOZ_ASSERT(state == DataChannelConnectionState::Open);
  MOZ_ASSERT(stream != INVALID_STREAM);
  MOZ_ASSERT(stream < mNegotiatedIdLimit);

  if (!aChannel->mNegotiated) {
    if (!aChannel->mOrdered) {
      // Don't send unordered until this gets cleared.
      aChannel->mWaitingForAck = true;
    }

    const int error = SendOpenRequestMessage(*aChannel);
    if (error) {
      DC_ERROR(("%p: SendOpenRequest failed, error = %d", this, error));
      FinishClose_s(aChannel);
      return;
    }
  }

  // Even if we're in the negotiated case, and will never send an open request,
  // we're supposed to send a stream reset when we tear down.
  aChannel->mSendStreamNeedsReset = true;
  aChannel->mRecvStreamNeedsReset = true;

  if (aChannel->mNegotiated) {
    // Either externally negotiated or we sent Open
    aChannel->AnnounceOpen();
    OnStreamOpen(stream);
  }
}

// Lazily creates the dedicated IO thread used for sending data.
nsISerialEventTarget* DataChannelConnection::GetIOThread() {
  // Spawn a thread to send the data
  if (!mInternalIOThread) {
    // TODO(bug 1998966): Lazy shutdown once done? Maybe have this live in
    // DataChannel (so we have an IO thread for each channel that sends blobs)?
    NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
  }

  return mInternalIOThread.get();
}

// STS-only: transitions the connection state and notifies the main-thread
// listener when we reach Open or Closed. No-op if the state is unchanged.
void DataChannelConnection::SetState(DataChannelConnectionState aState) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());

  DC_DEBUG(
      ("%p: DataChannelConnection labeled %s switching connection state %s -> "
       "%s",
       this, mTransportId.c_str(), ToString(mState), ToString(aState)));

  if (mState == aState) {
    return;
  }

  mState = aState;

  if (mState == DataChannelConnectionState::Open) {
    Dispatch(NS_NewCancelableRunnableFunction(
                 __func__,
                 [this, self = RefPtr<DataChannelConnection>(this)]() {
                   if (mListener) {
                     mListener->NotifySctpConnected();
                   }
                 }),
             NS_DISPATCH_FALLIBLE);
  } else if (mState == DataChannelConnectionState::Closed) {
    Dispatch(NS_NewCancelableRunnableFunction(
                 __func__,
                 [this, self = RefPtr<DataChannelConnection>(this)]() {
                   if (mListener) {
                     mListener->NotifySctpClosed();
                   }
                 }),
             NS_DISPATCH_FALLIBLE);
  }
}

void DataChannelConnection::SendDataMessage(DataChannel& aChannel,
                                            nsACString&& aMsg, bool aIsBinary) {
  // Could be main, could be a worker

  nsCString temp(std::move(aMsg));

  mSTS->Dispatch(
      NS_NewCancelableRunnableFunction(
          __func__,
          [this, self = RefPtr<DataChannelConnection>(this),
           channel = RefPtr(&aChannel), msg = std::move(temp),
           aIsBinary]() mutable {
            // Map the channel's reliability policy onto per-message
            // partial-reliability parameters.
            Maybe<uint16_t> maxRetransmissions;
            Maybe<uint16_t> maxLifetimeMs;

            switch (channel->mPrPolicy) {
              case DataChannelReliabilityPolicy::Reliable:
                break;
              case DataChannelReliabilityPolicy::LimitedRetransmissions:
                maxRetransmissions = Some(channel->mPrValue);
                break;
              case DataChannelReliabilityPolicy::LimitedLifetime:
                maxLifetimeMs = Some(channel->mPrValue);
                break;
            }

            // Pick the PPID. Empty messages get a dedicated *_EMPTY PPID plus
            // a single NUL placeholder byte in the payload.
            uint32_t ppid;
            if (aIsBinary) {
              if (msg.Length()) {
                ppid = DATA_CHANNEL_PPID_BINARY;
              } else {
                ppid = DATA_CHANNEL_PPID_BINARY_EMPTY;
                msg.Append('\0');
              }
            } else {
              if (msg.Length()) {
                ppid = DATA_CHANNEL_PPID_DOMSTRING;
              } else {
                ppid = DATA_CHANNEL_PPID_DOMSTRING_EMPTY;
                msg.Append('\0');
              }
            }

            // Unordered delivery is only requested once we are no longer
            // waiting for the peer to have seen our OPEN request.
            DataChannelMessageMetadata metadata(
                channel->mStream, ppid,
                !channel->mOrdered && !channel->mWaitingForAck,
                maxRetransmissions, maxLifetimeMs);
            // Create message instance and send
            OutgoingMsg outgoing(std::move(msg), metadata);

            SendMessage(*channel, std::move(outgoing));
          }),
      NS_DISPATCH_FALLIBLE);
}

// Begins graceful teardown of aChannel's stream on the STS thread: sends an
// outgoing stream reset if one is still owed, and finishes the close
// immediately once no resets remain outstanding (or the reset send failed).
void DataChannelConnection::EndOfStream(const RefPtr<DataChannel>& aChannel) {
  mSTS->Dispatch(
      NS_NewCancelableRunnableFunction(
          __func__,
          [this, self = RefPtr<DataChannelConnection>(this), channel = aChannel,
           stream = aChannel->mStream]() {
            if (channel->mSendStreamNeedsReset) {
              // Only initiate the outgoing reset once.
              if (channel->mEndOfStreamCalled) {
                return;
              }
              channel->mEndOfStreamCalled = true;
              DC_INFO((
                  "%p: Need to send a reset for channel %p, closing gracefully",
                  this, channel.get()));
              nsTArray<uint16_t> temp({stream});
              bool success = ResetStreams(temp);
              if (success) {
                // Teardown continues when the reset completes.
                return;
              }
              // We presume that OnStreamResetComplete will not be called in
              // this case, nor will we receive a stream reset from the other
              // end.
              DC_INFO(
                  ("%p: Failed to send a reset for channel %p, closing "
                   "immediately",
                   this, channel.get()));
              channel->mRecvStreamNeedsReset = false;
            }

            if (!channel->mRecvStreamNeedsReset) {
              // Stream is reset in both directions (or never existed in the
              // first place), we're ready to finish tearing down.
              DC_INFO(
                  ("%p: Stream does not need reset in either direction for "
                   "channel %p",
                   this, channel.get()));
              FinishClose_s(channel);
            }
          }),
      NS_DISPATCH_FALLIBLE);
}

// DOM-side close request; forwarded to the connection when one exists.
void DataChannel::EndOfStream() {
  // This can happen before mDomEventTarget is actually ready.
  if (mConnection) {
    mConnection->EndOfStream(this);
  }
}

// Final STS-side teardown: drops buffered data, removes the channel from all
// containers, and announces closure to the DOM side. Idempotent.
void DataChannelConnection::FinishClose_s(const RefPtr<DataChannel>& aChannel) {
  MOZ_ASSERT(mSTS->IsOnCurrentThread());

  // We're removing this from all containers, make sure the passed pointer
  // stays valid.
  // It is possible for this to be called twice if both JS and the transport
  // side cause closure at the same time, but this is idempotent so no big deal
  aChannel->mBufferedData.Clear();
  mChannels.Remove(aChannel);
  mPending.erase(aChannel);

  // Close the channel's data transport by following the associated
  // procedure.
  aChannel->AnnounceClosed();
}

// STS-side teardown of every channel (open or pending). Streams that still
// owe an outgoing reset are batched and reset at the end; we do not wait for
// the responses.
void DataChannelConnection::CloseAll_s() {
  // Make sure no more channels will be opened
  SetState(DataChannelConnectionState::Closed);

  nsTArray<uint16_t> streamsToReset;
  // Close current channels
  // If there are runnables, they hold a strong ref and keep the channel
  // and/or connection alive (even if in a CLOSED state)
  for (auto& channel : mChannels.GetAll()) {
    DC_INFO(("%p: closing channel %p, stream %u", this, channel.get(),
             channel->mStream));
    if (channel->mSendStreamNeedsReset) {
      DC_INFO(("%p: channel %p needs to send reset", this, channel.get()));
      channel->mSendStreamNeedsReset = false;
      streamsToReset.AppendElement(channel->mStream);
    }
    // We do not wait for the reset to finish in this case; we won't be around
    // to see the response.
    FinishClose_s(channel);
  }

  // Clean up any pending opens for channels
  std::set<RefPtr<DataChannel>> temp(std::move(mPending));
  // Technically in an unspecified state, although no reasonable impl will leave
  // anything in here.
  mPending.clear();
  for (const auto& channel : temp) {
    DC_INFO(("%p: closing pending channel %p, stream %u", this, channel.get(),
             channel->mStream));
    FinishClose_s(channel);  // also releases the ref on each iteration
  }

  // It's more efficient to let the Resets queue in shutdown and then
  // ResetStreams() here.
  if (!streamsToReset.IsEmpty()) {
    ResetStreams(streamsToReset);
  }
}

// Main-thread entry point; the real work happens in CloseAll_s on STS.
void DataChannelConnection::CloseAll() {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO(("%p: Closing all channels", this));

  mSTS->Dispatch(NS_NewCancelableRunnableFunction(
                     "DataChannelConnection::CloseAll",
                     [this, self = RefPtr<DataChannelConnection>(this)]() {
                       CloseAll_s();
                     }),
                 NS_DISPATCH_FALLIBLE);
}

// Returns aStream's id to the pool of usable ids (main thread only).
void DataChannelConnection::MarkStreamAvailable(uint16_t aStream) {
  MOZ_ASSERT(NS_IsMainThread());
  mStreamIds.RemoveElementSorted(aStream);
}

// Comparators used to keep Channels::mChannels sorted by stream id. The
// RefPtr/RefPtr overloads delegate to the RefPtr/id ones.
bool DataChannelConnection::Channels::IdComparator::Equals(
    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
  return aChannel->mStream == aId;
}

bool DataChannelConnection::Channels::IdComparator::LessThan(
    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
  return aChannel->mStream < aId;
}

bool DataChannelConnection::Channels::IdComparator::Equals(
    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
  return Equals(a1, a2->mStream);
}

bool DataChannelConnection::Channels::IdComparator::LessThan(
    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
  return LessThan(a1, a2->mStream);
}

// Mutex-guarded insert; the array stays sorted by stream id, and a channel
// with a valid id must not already be present.
void DataChannelConnection::Channels::Insert(
    const RefPtr<DataChannel>& aChannel) {
  DC_DEBUG(("%p: Inserting channel %u : %p", this, aChannel->mStream,
            aChannel.get()));
  MutexAutoLock lock(mMutex);
  if (aChannel->mStream != INVALID_STREAM) {
    MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator()));
  }

  MOZ_ASSERT(!mChannels.Contains(aChannel));

  mChannels.InsertElementSorted(aChannel, IdComparator());
}

// Mutex-guarded remove; returns true if the exact channel object was found
// and removed.
bool DataChannelConnection::Channels::Remove(
    const RefPtr<DataChannel>& aChannel) {
  DC_DEBUG(("%p: Removing channel %u : %p", this,
aChannel->mStream,
            aChannel.get()));
  MutexAutoLock lock(mMutex);
  if (aChannel->mStream == INVALID_STREAM) {
    // No id assigned yet; fall back to a linear search by pointer.
    return mChannels.RemoveElement(aChannel);
  }

  auto index = mChannels.BinaryIndexOf(aChannel->mStream, IdComparator());
  if (index != ChannelArray::NoIndex) {
    // Only remove if the entry at this id is the very same object.
    if (mChannels[index].get() == aChannel.get()) {
      mChannels.RemoveElementAt(index);
      return true;
    }
  }
  return false;
}

// Mutex-guarded lookup by stream id; nullptr when absent.
RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const {
  MutexAutoLock lock(mMutex);
  auto index = mChannels.BinaryIndexOf(aId, IdComparator());
  if (index == ChannelArray::NoIndex) {
    return nullptr;
  }
  return mChannels[index];
}

// Returns the channel with the smallest id strictly greater than aCurrentId,
// wrapping around to the first channel; nullptr when there are no channels.
RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel(
    uint16_t aCurrentId) const {
  MutexAutoLock lock(mMutex);
  if (mChannels.IsEmpty()) {
    return nullptr;
  }

  auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator());
  if (index == mChannels.Length()) {
    index = 0;
  }
  return mChannels[index];
}

DataChannel::DataChannel(DataChannelConnection* connection, uint16_t stream,
                         const nsACString& label, const nsACString& protocol,
                         DataChannelReliabilityPolicy policy, uint32_t value,
                         bool ordered, bool negotiated)
    : mLabel(label),
      mProtocol(protocol),
      mPrPolicy(policy),
      mPrValue(value),
      mNegotiated(negotiated),
      mOrdered(ordered),
      mStream(stream),
      mConnection(connection),
      // Queues dispatches until the real DOM event target is known.
      mDomEventTarget(new StopGapEventTarget) {
  DC_INFO(
      ("%p: Necko DataChannel created, label '%s'. Waiting for RTCDataChannel "
       "to be created.",
       this, mLabel.get()));
  NS_ASSERTION(mConnection, "NULL connection");
}

DataChannel::~DataChannel() {
  DC_INFO(("%p: DataChannel is being destroyed.", this));
}

// Links the main-thread RTCDataChannel to this DataChannel and pushes the
// current max message size (and the stream id, if already known) to it.
void DataChannel::SetMainthreadDomDataChannel(dom::RTCDataChannel* aChannel) {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO(
      ("%p: Mainthread RTCDataChannel created(%p). Waiting for confirmation of "
       "event target.",
       this, aChannel));
  mMainthreadDomDataChannel = aChannel;
  SetMaxMessageSize(mConnection->GetMaxMessageSize());
  if (GetStream()) {
    mMainthreadDomDataChannel->SetId(*GetStream());
  }
}

// The RTCDataChannel is being transferred to a worker; from here on updates
// must be dispatched rather than applied synchronously on main.
void DataChannel::OnWorkerTransferStarted() {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO((
      "%p: RTCDataChannel is being transferred. Disabling synchronous updates. "
      "Mainthread will not be our event target, waiting to learn worker "
      "thread.",
      this));
  mHasWorkerDomDataChannel = true;
}

// Called on the worker thread once the transfer lands; the calling thread
// becomes our DOM event target.
void DataChannel::OnWorkerTransferComplete(dom::RTCDataChannel* aChannel) {
  MOZ_ASSERT(!NS_IsMainThread());
  DC_INFO(
      ("%p: Worker RTCDataChannel created(%p). Worker thread is our event "
       "target.",
       this, aChannel));
  mWorkerDomDataChannel = aChannel;
  mDomEventTarget->SetRealEventTarget(GetCurrentSerialEventTarget());
}

// Transfer can no longer happen; main thread becomes our DOM event target.
void DataChannel::OnWorkerTransferDisabled() {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO(
      ("%p: Mainthread RTCDataChannel is no longer eligible for transfer. "
       "Mainthread is our event target.",
       this));
  mDomEventTarget->SetRealEventTarget(GetCurrentSerialEventTarget());
}

// Main-thread RTCDataChannel is going away; either notify the worker-side
// object or, when none exists, begin closing the underlying stream.
void DataChannel::UnsetMainthreadDomDataChannel() {
  MOZ_ASSERT(NS_IsMainThread());
  DC_INFO(("%p: Mainthread RTCDataChannel is being destroyed(%p).", this,
           mMainthreadDomDataChannel));
  mMainthreadDomDataChannel = nullptr;
  if (mHasWorkerDomDataChannel) {
    DC_INFO(
        ("Mainthread RTCDataChannel is being destroyed. Dispatching task to "
         "inform corresponding worker RTCDataChannel."));
    mDomEventTarget->Dispatch(
        NS_NewCancelableRunnableFunction(
            "DataChannel::UnsetMainthreadDomDataChannel",
            [this, self = RefPtr<DataChannel>(this)] {
              if (mWorkerDomDataChannel) {
                mWorkerDomDataChannel->UnsetWorkerNeedsUs();
              }
            }),
        NS_DISPATCH_FALLIBLE);
  } else {
    DC_INFO(("%p: No worker RTCDataChannel. Closing.", this));
    EndOfStream();
  }
}

// Worker-side RTCDataChannel is going away; begin closing the stream.
void DataChannel::UnsetWorkerDomDataChannel() {
  MOZ_ASSERT(!NS_IsMainThread());
  MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
  DC_INFO(("%p: Worker RTCDataChannel is being destroyed(%p). Closing.", this,
           mWorkerDomDataChannel));
  mWorkerDomDataChannel = nullptr;
  EndOfStream();
}

// Forwards bufferedAmount decrements to the DOM object on its event target.
void DataChannel::DecrementBufferedAmount(size_t aSize) {
  mDomEventTarget->Dispatch(
      NS_NewCancelableRunnableFunction(
          "DataChannel::DecrementBufferedAmount",
          [this, self = RefPtr<DataChannel>(this), aSize] {
            if (GetDomDataChannel()) {
              GetDomDataChannel()->DecrementBufferedAmount(aSize);
            }
          }),
      NS_DISPATCH_FALLIBLE);
}

void DataChannel::AnnounceOpen() {
  // When an underlying data transport is to be announced (the other peer
  // created a channel with negotiated unset or set to false), the user agent of
  // the peer that did not initiate the creation process MUST queue a task to
  // run the following steps:
  DC_INFO(
      ("%p: DataChannel is open. Queueing AnnounceOpen call to RTCDataChannel.",
       this));

  mDomEventTarget->Dispatch(
      NS_NewCancelableRunnableFunction(
          "DataChannel::AnnounceOpen",
          [this, self = RefPtr<DataChannel>(this)] {
            if (GetDomDataChannel()) {
              DC_INFO(("Calling AnnounceOpen on RTCDataChannel."));
              GetDomDataChannel()->AnnounceOpen();
            }

            // Right now, we're already on mainthread, but this might be a
            // worker someday.
            if (mConnection) {
              GetMainThreadSerialEventTarget()->Dispatch(
                  NS_NewCancelableRunnableFunction(
                      "DataChannel::AnnounceOpen",
                      [this, self = RefPtr<DataChannel>(this),
                       connection = mConnection]() {
                        // Stats stuff
                        // TODO: Can we simplify this?
1435 if (!mEverOpened && connection->mListener) { 1436 mEverOpened = true; 1437 connection->mListener->NotifyDataChannelOpen(this); 1438 } 1439 }), 1440 NS_DISPATCH_FALLIBLE); 1441 } 1442 }), 1443 NS_DISPATCH_FALLIBLE); 1444 } 1445 1446 void DataChannel::AnnounceClosed() { 1447 // When an RTCDataChannel object's underlying data transport has been closed, 1448 // the user agent MUST queue a task to run the following steps: 1449 DC_INFO( 1450 ("%p: DataChannel is closed. Queueing AnnounceClosed call to " 1451 "RTCDataChannel.", 1452 this)); 1453 1454 GetMainThreadSerialEventTarget()->Dispatch( 1455 NS_NewCancelableRunnableFunction( 1456 "DataChannel::AnnounceClosed", 1457 [this, self = RefPtr<DataChannel>(this), connection = mConnection]() { 1458 if (mAnnouncedClosed) { 1459 return; 1460 } 1461 mAnnouncedClosed = true; 1462 // We have to unset this first, and then fire DOM events, so the 1463 // event handler won't hit an error if it tries to reuse this id. 1464 if (mStream != INVALID_STREAM) { 1465 DC_INFO(("%p: Marking stream id %u available", this, mStream)); 1466 connection->MarkStreamAvailable(mStream); 1467 } 1468 1469 // Stats stuff 1470 if (mEverOpened && connection->mListener) { 1471 connection->mListener->NotifyDataChannelClosed(this); 1472 } 1473 1474 DC_INFO(("%p: Dispatching AnnounceClosed to DOM thread", this)); 1475 mDomEventTarget->Dispatch( 1476 NS_NewCancelableRunnableFunction( 1477 "DataChannel::AnnounceClosed", 1478 [this, self = RefPtr<DataChannel>(this)] { 1479 DC_INFO(("%p: Attempting to call AnnounceClosed.", this)); 1480 if (GetDomDataChannel()) { 1481 DC_INFO( 1482 ("%p: Calling AnnounceClosed on RTCDataChannel.", 1483 this)); 1484 GetDomDataChannel()->AnnounceClosed(); 1485 } 1486 }), 1487 NS_DISPATCH_FALLIBLE); 1488 }), 1489 NS_DISPATCH_FALLIBLE); 1490 } 1491 1492 void DataChannel::GracefulClose() { 1493 DC_INFO( 1494 ("%p: DataChannel transport is closing. 
Queueing GracefulClose call to " 1495 "RTCDataChannel.", 1496 this)); 1497 1498 mDomEventTarget->Dispatch( 1499 NS_NewCancelableRunnableFunction( 1500 "DataChannel::GracefulClose", 1501 [this, self = RefPtr<DataChannel>(this)] { 1502 if (GetDomDataChannel()) { 1503 DC_INFO(("Calling GracefulClose on RTCDataChannel.")); 1504 GetDomDataChannel()->GracefulClose(); 1505 } 1506 }), 1507 NS_DISPATCH_FALLIBLE); 1508 } 1509 1510 void DataChannel::SendMsg(nsCString&& aMsg) { 1511 SendBuffer(std::move(aMsg), false); 1512 } 1513 1514 void DataChannel::SendBinaryMsg(nsCString&& aMsg) { 1515 SendBuffer(std::move(aMsg), true); 1516 } 1517 1518 void DataChannel::SendBuffer(nsCString&& aMsg, bool aBinary) { 1519 MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread()); 1520 if (mMessagesSentPromise) { 1521 mMessagesSentPromise = mMessagesSentPromise->Then( 1522 mDomEventTarget, __func__, 1523 [this, self = RefPtr<DataChannel>(this), msg = std::move(aMsg), 1524 aBinary]( 1525 const GenericNonExclusivePromise::ResolveOrRejectValue&) mutable { 1526 if (mConnection) { 1527 mConnection->SendDataMessage(*this, std::move(msg), aBinary); 1528 return GenericNonExclusivePromise::CreateAndResolve(true, __func__); 1529 } 1530 return GenericNonExclusivePromise::CreateAndResolve(false, __func__); 1531 }); 1532 1533 UnsetMessagesSentPromiseWhenSettled(); 1534 return; 1535 } 1536 mConnection->SendDataMessage(*this, std::move(aMsg), aBinary); 1537 } 1538 1539 void DataChannel::SendBinaryBlob(nsIInputStream* aBlob) { 1540 MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread()); 1541 if (!mMessagesSentPromise) { 1542 mMessagesSentPromise = 1543 GenericNonExclusivePromise::CreateAndResolve(true, __func__); 1544 } 1545 1546 mMessagesSentPromise = mMessagesSentPromise->Then( 1547 mConnection->GetIOThread(), __func__, 1548 [this, self = RefPtr<DataChannel>(this), blob = RefPtr(aBlob)]( 1549 const GenericNonExclusivePromise::ResolveOrRejectValue&) { 1550 nsCString data; 1551 if 
(NS_SUCCEEDED(NS_ReadInputStreamToString(blob, data, -1))) { 1552 if (mConnection) { 1553 // This dispatches to STS, which is when we're supposed to resolve 1554 mConnection->SendDataMessage(*this, std::move(data), true); 1555 } 1556 blob->Close(); 1557 return GenericNonExclusivePromise::CreateAndResolve(true, __func__); 1558 } 1559 return GenericNonExclusivePromise::CreateAndResolve(false, __func__); 1560 }); 1561 1562 UnsetMessagesSentPromiseWhenSettled(); 1563 } 1564 1565 void DataChannel::UnsetMessagesSentPromiseWhenSettled() { 1566 MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread()); 1567 // This is why we are using a non-exclusive promise; we want to null this out 1568 // when we're done, but only if nothing else has chained off of it. 1569 mMessagesSentPromise->Then( 1570 mDomEventTarget, __func__, 1571 [this, self = RefPtr(this), promise = mMessagesSentPromise]() { 1572 if (promise == mMessagesSentPromise) { 1573 mMessagesSentPromise = nullptr; 1574 } 1575 }); 1576 } 1577 1578 void DataChannel::SetStream(uint16_t aId) { 1579 MOZ_ASSERT(NS_IsMainThread()); 1580 mStream = aId; 1581 1582 // This is an inconvenient wrinkle in the spec; if the stream id is discovered 1583 // on main (for any reason), we update mainthread-homed RTCDataChannel 1584 // synchronously, but must dispatch for workers. It is possible this will 1585 // change, but probably not. 1586 if (mHasWorkerDomDataChannel) { 1587 DC_INFO( 1588 ("DataChannel has been allocated a stream ID. Queueing task to inform " 1589 "worker RTCDataChannel.")); 1590 mDomEventTarget->Dispatch( 1591 NS_NewCancelableRunnableFunction( 1592 __func__, 1593 [this, self = RefPtr<DataChannel>(this), aId] { 1594 if (mWorkerDomDataChannel) { 1595 mWorkerDomDataChannel->SetId(aId); 1596 } 1597 }), 1598 NS_DISPATCH_FALLIBLE); 1599 } else { 1600 DC_INFO( 1601 ("%p: DataChannel has been allocated a stream ID. 
Synchronously " 1602 "informing mainthread RTCDataChannel.", 1603 this)); 1604 mMainthreadDomDataChannel->SetId(aId); 1605 } 1606 } 1607 1608 void DataChannel::SetMaxMessageSize(double aMaxMessageSize) { 1609 MOZ_ASSERT(NS_IsMainThread()); 1610 1611 if (mHasWorkerDomDataChannel) { 1612 DC_INFO( 1613 ("DataChannel has updated its maximum message size. Queueing task to " 1614 "inform worker RTCDataChannel.")); 1615 mDomEventTarget->Dispatch( 1616 NS_NewCancelableRunnableFunction( 1617 __func__, 1618 [this, self = RefPtr<DataChannel>(this), aMaxMessageSize] { 1619 if (mWorkerDomDataChannel) { 1620 mWorkerDomDataChannel->SetMaxMessageSize(aMaxMessageSize); 1621 } 1622 }), 1623 NS_DISPATCH_FALLIBLE); 1624 } else { 1625 DC_INFO( 1626 ("%p: DataChannel has updated its maximum message size. Synchronously " 1627 "informing mainthread RTCDataChannel.", 1628 this)); 1629 if (mMainthreadDomDataChannel) { 1630 mMainthreadDomDataChannel->SetMaxMessageSize(aMaxMessageSize); 1631 } 1632 } 1633 } 1634 1635 void DataChannel::OnMessageReceived(nsCString&& aMsg, bool aIsBinary) { 1636 // Receiving any data implies that the other end has received an OPEN 1637 // request from us. 1638 mWaitingForAck = false; 1639 1640 DC_DEBUG( 1641 ("%p: received message (%s)", this, aIsBinary ? "binary" : "string")); 1642 1643 mDomEventTarget->Dispatch(NS_NewCancelableRunnableFunction( 1644 "DataChannel::OnMessageReceived", 1645 [this, self = RefPtr<DataChannel>(this), 1646 msg = std::move(aMsg), aIsBinary]() { 1647 if (GetDomDataChannel()) { 1648 GetDomDataChannel()->DoOnMessageAvailable( 1649 msg, aIsBinary); 1650 } 1651 }), 1652 NS_DISPATCH_FALLIBLE); 1653 } 1654 1655 } // namespace mozilla