DataChannel.h (19395B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_ 8 #define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_ 9 10 #include <map> 11 #include <memory> 12 #include <string> 13 #include <vector> 14 #include <errno.h> 15 #include "nsISupports.h" 16 #include "nsCOMPtr.h" 17 #include "mozilla/MozPromise.h" 18 #include "mozilla/StopGapEventTarget.h" 19 #include "mozilla/WeakPtr.h" 20 #include "mozilla/dom/RTCStatsReportBinding.h" 21 #include "nsString.h" 22 #include "nsThreadUtils.h" 23 #include "nsTArray.h" 24 #include "nsDeque.h" 25 #include "mozilla/dom/Blob.h" 26 #include "mozilla/Mutex.h" 27 #include "DataChannelProtocol.h" 28 #include "mozilla/net/NeckoTargetHolder.h" 29 #include "MediaEventSource.h" 30 31 #include "transport/transportlayer.h" // For TransportLayer::State 32 33 namespace mozilla { 34 35 class DataChannelConnection; 36 class DataChannel; 37 class MediaPacket; 38 class MediaTransportHandler; 39 namespace dom { 40 class RTCDataChannel; 41 struct RTCStatsCollection; 42 }; // namespace dom 43 44 enum class DataChannelConnectionState { Connecting, Open, Closed }; 45 enum class DataChannelReliabilityPolicy { 46 Reliable, 47 LimitedRetransmissions, 48 LimitedLifetime 49 }; 50 51 class DataChannelMessageMetadata { 52 public: 53 DataChannelMessageMetadata(uint16_t aStreamId, uint32_t aPpid, 54 bool aUnordered, 55 Maybe<uint16_t> aMaxRetransmissions = Nothing(), 56 Maybe<uint16_t> aMaxLifetimeMs = Nothing()) 57 : mStreamId(aStreamId), 58 mPpid(aPpid), 59 mUnordered(aUnordered), 60 mMaxRetransmissions(aMaxRetransmissions), 61 mMaxLifetimeMs(aMaxLifetimeMs) {} 62 63 DataChannelMessageMetadata(const DataChannelMessageMetadata& aOrig) = default; 64 DataChannelMessageMetadata(DataChannelMessageMetadata&& aOrig) = default; 65 DataChannelMessageMetadata& operator=( 66 const DataChannelMessageMetadata& aOrig) = default; 67 DataChannelMessageMetadata& operator=(DataChannelMessageMetadata&& aOrig) = 68 default; 69 70 uint16_t mStreamId; 71 uint32_t mPpid; 72 bool mUnordered; 73 Maybe<uint16_t> mMaxRetransmissions; 74 Maybe<uint16_t> mMaxLifetimeMs; 75 }; 76 77 class OutgoingMsg { 78 public: 79 OutgoingMsg(nsACString&& data, const DataChannelMessageMetadata& aMetadata); 80 OutgoingMsg(OutgoingMsg&& aOrig) = default; 81 OutgoingMsg& operator=(OutgoingMsg&& aOrig) = default; 82 OutgoingMsg(const OutgoingMsg&) = delete; 83 OutgoingMsg& operator=(const OutgoingMsg&) = delete; 84 85 void Advance(size_t offset); 86 const DataChannelMessageMetadata& GetMetadata() const { return mMetadata; }; 87 size_t GetLength() const { return mData.Length(); }; 88 Span<const uint8_t> GetRemainingData() const { 89 auto span = Span<const uint8_t>(mData); 90 return span.From(mPos); 91 } 92 93 protected: 94 nsCString mData; 95 DataChannelMessageMetadata mMetadata; 96 size_t mPos = 0; 97 }; 98 99 class IncomingMsg { 100 public: 101 explicit IncomingMsg(uint32_t aPpid, uint16_t aStreamId) 102 : mPpid(aPpid), mStreamId(aStreamId) {} 103 IncomingMsg(IncomingMsg&& aOrig) = default; 104 IncomingMsg& operator=(IncomingMsg&& aOrig) = default; 105 IncomingMsg(const IncomingMsg&) = delete; 106 IncomingMsg& operator=(const IncomingMsg&) = delete; 107 108 void Append(const uint8_t* aData, size_t aLen) { 109 mData.Append((const char*)aData, aLen); 110 } 111 112 const nsCString& GetData() const { return mData; } 113 nsCString& GetData() { return mData; } 114 size_t GetLength() const { return mData.Length(); }; 115 uint16_t GetStreamId() const { return mStreamId; } 116 uint32_t GetPpid() const { return mPpid; } 117 118 protected: 119 // TODO(bug 1949918): We've historically passed this around as a c-string, but 120 // that's not really appropriate for binary messages. 121 nsCString mData; 122 uint32_t mPpid; 123 uint16_t mStreamId; 124 }; 125 126 // Would be nice if this were DataChannel::StatsPromise, but no big deal. 127 typedef MozPromise<dom::RTCDataChannelStats, nsresult, true> 128 DataChannelStatsPromise; 129 130 // One per PeerConnection 131 class DataChannelConnection : public net::NeckoTargetHolder { 132 friend class DataChannel; 133 friend class DataChannelConnectRunnable; 134 friend class DataChannelConnectionUsrsctp; 135 136 protected: 137 virtual ~DataChannelConnection(); 138 139 public: 140 enum class PendingType { 141 None, // No outgoing messages are pending. 142 Dcep, // Outgoing DCEP messages are pending. 143 Data, // Outgoing data channel messages are pending. 144 }; 145 146 class DataConnectionListener : public SupportsWeakPtr { 147 public: 148 virtual ~DataConnectionListener() = default; 149 150 // Called when a new DataChannel has been opened by the other side. 151 virtual void NotifyDataChannel( 152 already_AddRefed<DataChannel> aChannel, const nsACString& aLabel, 153 bool aOrdered, mozilla::dom::Nullable<uint16_t> aMaxLifeTime, 154 mozilla::dom::Nullable<uint16_t> aMaxRetransmits, 155 const nsACString& aProtocol, bool aNegotiated) = 0; 156 157 // Called when a DataChannel transitions to state open 158 virtual void NotifyDataChannelOpen(DataChannel* aChannel) = 0; 159 160 // Called when a DataChannel (that was open at some point in the past) 161 // transitions to state closed 162 virtual void NotifyDataChannelClosed(DataChannel* aChannel) = 0; 163 164 // Called when SCTP connects 165 virtual void NotifySctpConnected() = 0; 166 167 // Called when SCTP closes 168 virtual void NotifySctpClosed() = 0; 169 }; 170 171 // Create a new DataChannel Connection 172 // Must be called on Main thread 173 static Maybe<RefPtr<DataChannelConnection>> Create( 174 DataConnectionListener* aListener, nsISerialEventTarget* aTarget, 175 MediaTransportHandler* aHandler, const uint16_t aLocalPort, 176 const uint16_t aNumStreams); 177 178 DataChannelConnection(const DataChannelConnection&) = delete; 179 DataChannelConnection(DataChannelConnection&&) = delete; 180 DataChannelConnection& operator=(const DataChannelConnection&) = delete; 181 DataChannelConnection& operator=(DataChannelConnection&&) = delete; 182 183 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection) 184 185 // Called immediately after construction 186 virtual bool Init(const uint16_t aLocalPort, const uint16_t aNumStreams) = 0; 187 // Called when our transport is ready to send and recv 188 virtual void OnTransportReady() = 0; 189 // This is called after an ACK comes in, to prompt subclasses to deliver 190 // anything they've buffered while awaiting the ACK. 191 virtual void OnStreamOpen(uint16_t stream) = 0; 192 // Called when the base class wants to raise the stream limit 193 virtual bool RaiseStreamLimitTo(uint16_t aNewLimit) = 0; 194 // Called when the base class wants to send a message; it is expected that 195 // this will eventually result in a call/s to SendSctpPacket once the SCTP 196 // packet is ready to be sent to the transport. 197 virtual int SendMessage(DataChannel& aChannel, OutgoingMsg&& aMsg) = 0; 198 // Called when the base class receives a packet from the transport 199 virtual void OnSctpPacketReceived(const MediaPacket& packet) = 0; 200 // Called when the base class is closing streams 201 virtual bool ResetStreams(nsTArray<uint16_t>& aStreams) = 0; 202 // Called when the SCTP connection is being shut down 203 virtual void Destroy(); 204 205 // Call only when the remote SDP has a=max-message-size 206 void SetMaxMessageSize(uint64_t aMaxMessageSize); 207 double GetMaxMessageSize(); 208 void HandleDataMessage(IncomingMsg&& aMsg); 209 void HandleDCEPMessage(IncomingMsg&& aMsg); 210 void ProcessQueuedOpens(); 211 void OnStreamsReset(std::vector<uint16_t>&& aStreams); 212 void OnStreamsResetComplete(std::vector<uint16_t>&& aStreams); 213 214 typedef DataChannelStatsPromise::AllPromiseType StatsPromise; 215 RefPtr<StatsPromise> GetStats(const DOMHighResTimeStamp aTimestamp) const; 216 217 bool ConnectToTransport(const std::string& aTransportId, const bool aClient, 218 const uint16_t aLocalPort, 219 const uint16_t aRemotePort); 220 void TransportStateChange(const std::string& aTransportId, 221 TransportLayer::State aState); 222 void SetSignals(const std::string& aTransportId); 223 224 [[nodiscard]] already_AddRefed<DataChannel> Open( 225 const nsACString& label, const nsACString& protocol, 226 DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue, 227 bool aExternalNegotiated, uint16_t aStream); 228 229 void EndOfStream(const RefPtr<DataChannel>& aChannel); 230 void FinishClose_s(const RefPtr<DataChannel>& aChannel); 231 void CloseAll(); 232 void CloseAll_s(); 233 void MarkStreamAvailable(uint16_t aStream); 234 235 nsISerialEventTarget* GetIOThread(); 236 237 bool InShutdown() const { 238 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED 239 return mShutdown; 240 #else 241 return false; 242 #endif 243 } 244 245 protected: 246 class Channels { 247 public: 248 using ChannelArray = AutoTArray<RefPtr<DataChannel>, 16>; 249 250 Channels() : mMutex("DataChannelConnection::Channels::mMutex") {} 251 Channels(const Channels&) = delete; 252 Channels(Channels&&) = delete; 253 Channels& operator=(const Channels&) = delete; 254 Channels& operator=(Channels&&) = delete; 255 256 void Insert(const RefPtr<DataChannel>& aChannel); 257 bool Remove(const RefPtr<DataChannel>& aChannel); 258 RefPtr<DataChannel> Get(uint16_t aId) const; 259 ChannelArray GetAll() const { 260 MutexAutoLock lock(mMutex); 261 return mChannels.Clone(); 262 } 263 RefPtr<DataChannel> GetNextChannel(uint16_t aCurrentId) const; 264 265 private: 266 struct IdComparator { 267 bool Equals(const RefPtr<DataChannel>& aChannel, uint16_t aId) const; 268 bool LessThan(const RefPtr<DataChannel>& aChannel, uint16_t aId) const; 269 bool Equals(const RefPtr<DataChannel>& a1, 270 const RefPtr<DataChannel>& a2) const; 271 bool LessThan(const RefPtr<DataChannel>& a1, 272 const RefPtr<DataChannel>& a2) const; 273 }; 274 mutable Mutex mMutex; 275 ChannelArray mChannels MOZ_GUARDED_BY(mMutex); 276 }; 277 278 DataChannelConnection(DataConnectionListener* aListener, 279 nsISerialEventTarget* aTarget, 280 MediaTransportHandler* aHandler); 281 282 void SendDataMessage(DataChannel& aChannel, nsACString&& aMsg, 283 bool aIsBinary); 284 285 DataChannelConnectionState GetState() const { 286 MOZ_ASSERT(mSTS->IsOnCurrentThread()); 287 return mState; 288 } 289 290 void SetState(DataChannelConnectionState aState); 291 292 static void DTLSConnectThread(void* data); 293 void SendPacket(std::unique_ptr<MediaPacket>&& packet); 294 void OnPacketReceived(const std::string& aTransportId, 295 const MediaPacket& packet); 296 already_AddRefed<DataChannel> FindChannelByStream(uint16_t stream); 297 uint16_t FindFreeStream() const; 298 int SendControlMessage(DataChannel& aChannel, const uint8_t* data, 299 uint32_t len); 300 int SendOpenAckMessage(DataChannel& aChannel); 301 int SendOpenRequestMessage(DataChannel& aChannel); 302 303 void OpenFinish(RefPtr<DataChannel> aChannel); 304 305 void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream); 306 void HandleOpenRequestMessage( 307 const struct rtcweb_datachannel_open_request* req, uint32_t length, 308 uint16_t stream); 309 void HandleOpenAckMessage(const struct rtcweb_datachannel_ack* ack, 310 uint32_t length, uint16_t stream); 311 bool ReassembleMessageChunk(IncomingMsg& aReassembled, const void* buffer, 312 size_t length, uint32_t ppid, uint16_t stream); 313 314 /******************** Mainthread only **********************/ 315 // Avoid cycles with PeerConnectionImpl 316 // Use from main thread only as WeakPtr is not threadsafe 317 WeakPtr<DataConnectionListener> mListener; 318 uint64_t mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE_DEFAULT; 319 nsTArray<uint16_t> mStreamIds; 320 Maybe<bool> mAllocateEven; 321 nsCOMPtr<nsIThread> mInternalIOThread = nullptr; 322 /***********************************************************/ 323 324 /*********************** STS only **************************/ 325 std::set<RefPtr<DataChannel>> mPending; 326 uint16_t mNegotiatedIdLimit = 0; 327 PendingType mPendingType = PendingType::None; 328 std::string mTransportId; 329 bool mConnectedToTransportHandler = false; 330 RefPtr<MediaTransportHandler> mTransportHandler; 331 MediaEventListener mPacketReceivedListener; 332 MediaEventListener mStateChangeListener; 333 DataChannelConnectionState mState = DataChannelConnectionState::Closed; 334 /***********************************************************/ 335 336 // NOTE: while this container will auto-expand, increases in the number of 337 // channels available from the stack must be negotiated! 338 // Accessed from both main and sts, API is threadsafe 339 Channels mChannels; 340 341 // Set once on main in Init, invariant thereafter 342 uintptr_t mId = 0; 343 344 // Set once on main in ConnectToTransport, and read only (STS) thereafter. 345 // Nothing should be using these before that first ConnectToTransport call. 346 uint16_t mLocalPort = 0; 347 uint16_t mRemotePort = 0; 348 349 nsCOMPtr<nsISerialEventTarget> mSTS; 350 351 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED 352 bool mShutdown = false; 353 #endif 354 }; 355 356 class DataChannel { 357 friend class DataChannelConnection; 358 friend class DataChannelConnectionUsrsctp; 359 360 public: 361 DataChannel(DataChannelConnection* connection, uint16_t stream, 362 const nsACString& label, const nsACString& protocol, 363 DataChannelReliabilityPolicy policy, uint32_t value, bool ordered, 364 bool negotiated); 365 DataChannel(const DataChannel&) = delete; 366 DataChannel(DataChannel&&) = delete; 367 DataChannel& operator=(const DataChannel&) = delete; 368 DataChannel& operator=(DataChannel&&) = delete; 369 370 private: 371 ~DataChannel(); 372 373 public: 374 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel) 375 376 // The transfer dance here is somewhat complex because we can be dispatching 377 // events while the transfer is in progress, and even before we know whether a 378 // transfer might occur. 379 // 380 // Called when the mainthread RTCDataChannel is created 381 void SetMainthreadDomDataChannel(dom::RTCDataChannel* aChannel); 382 383 // Called when the mainthread RTCDataChannel has started the transfer steps. 384 // This is important, because it means that certain updates (particularly 385 // things like stream id and max message size) that are ordinarily set 386 // synchronously should now be dispatched. 387 void OnWorkerTransferStarted(); 388 389 // Called when the worker thread RTCDataChannel is created. This is where we 390 // learn about the worker thread, and we can start dispatching events to it. 391 void OnWorkerTransferComplete(dom::RTCDataChannel* aChannel); 392 393 // Called when the window of opportunity to start a transfer has closed. This 394 // is where we know that main is our event target, and we can start 395 // dispatching events to it. 396 void OnWorkerTransferDisabled(); 397 398 // Unsets our (weak) ref to the mainthread RTCDataChannel. If we *also* have a 399 // worker-thread RTCDataChannel, we must notify it about this, because it 400 // means that there's one less reason for it to keep a self-ref. 401 void UnsetMainthreadDomDataChannel(); 402 403 // Unsets our (weak) ref to a worker thread RTCDataChannel, if one exists. 404 void UnsetWorkerDomDataChannel(); 405 406 // Helper for send methods that converts POSIX error codes to an ErrorResult. 407 static void SendErrnoToErrorResult(int error, size_t aMessageSize, 408 ErrorResult& aRv); 409 410 // Send a string 411 void SendMsg(nsCString&& aMsg); 412 413 // Send a binary message (TypedArray) 414 void SendBinaryMsg(nsCString&& aMsg); 415 416 // Send a binary blob 417 void SendBinaryBlob(nsIInputStream* aBlob); 418 419 void DecrementBufferedAmount(size_t aSize); 420 void AnnounceOpen(); 421 void AnnounceClosed(); 422 void GracefulClose(); 423 424 Maybe<uint16_t> GetStream() const { 425 MOZ_ASSERT(NS_IsMainThread()); 426 if (mStream == INVALID_STREAM) { 427 return Nothing(); 428 } 429 return Some(mStream); 430 } 431 432 void SetStream(uint16_t aId); 433 void SetMaxMessageSize(double aMaxMessageSize); 434 double GetMaxMessageSize() const { return mConnection->GetMaxMessageSize(); } 435 436 void OnMessageReceived(nsCString&& aMsg, bool aIsBinary); 437 438 RefPtr<DataChannelStatsPromise> GetStats( 439 const DOMHighResTimeStamp aTimestamp); 440 441 // Called when there will be no more data sent 442 void EndOfStream(); 443 444 dom::RTCDataChannel* GetDomDataChannel() const { 445 MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread()); 446 if (NS_IsMainThread()) { 447 return mMainthreadDomDataChannel; 448 } 449 return mWorkerDomDataChannel; 450 } 451 452 private: 453 nsresult AddDataToBinaryMsg(const char* data, uint32_t size); 454 void SendBuffer(nsCString&& aMsg, bool aBinary); 455 void UnsetMessagesSentPromiseWhenSettled(); 456 457 const nsCString mLabel; 458 const nsCString mProtocol; 459 const DataChannelReliabilityPolicy mPrPolicy; 460 const uint32_t mPrValue; 461 const bool mNegotiated; 462 const bool mOrdered; 463 464 // DOM Thread only; wherever the RTCDataChannel lives. 465 dom::RTCDataChannel* mMainthreadDomDataChannel = nullptr; 466 bool mHasWorkerDomDataChannel = false; 467 bool mEverOpened = false; 468 bool mAnnouncedClosed = false; 469 uint16_t mStream; 470 RefPtr<GenericNonExclusivePromise> mMessagesSentPromise; 471 RefPtr<DataChannelConnection> mConnection; 472 473 // STS only 474 // The channel has been opened, but the peer has not yet acked - ensures that 475 // the messages are sent ordered until this is cleared. 476 bool mWaitingForAck = false; 477 bool mSendStreamNeedsReset = false; 478 bool mRecvStreamNeedsReset = false; 479 bool mEndOfStreamCalled = false; 480 nsTArray<OutgoingMsg> mBufferedData; 481 std::map<uint16_t, IncomingMsg> mRecvBuffers; 482 483 // At first, this is not hooked to any real event target, and just buffers 484 // events. Later on, when we know what event target we should use, we hook it 485 // in here, this dispatches the buffered events, and then acts as a 486 // passthrough. 487 const RefPtr<StopGapEventTarget> mDomEventTarget; 488 489 // Worker thread only. We keep this separately because the spec requires it to 490 // have a strong ref (from the worker global scope) as long as the *original* 491 // mainthread RTCDataChannel is still alive. When the mainthread 492 // RTCDataChannel goes away, we will notice, and then let the worker 493 // RTCDataChannel know about it. 494 dom::RTCDataChannel* mWorkerDomDataChannel = nullptr; 495 }; 496 497 static constexpr const char* ToString(DataChannelConnectionState state) { 498 switch (state) { 499 case DataChannelConnectionState::Connecting: 500 return "CONNECTING"; 501 case DataChannelConnectionState::Open: 502 return "OPEN"; 503 case DataChannelConnectionState::Closed: 504 return "CLOSED"; 505 } 506 return ""; 507 }; 508 509 static constexpr const char* ToString(DataChannelConnection::PendingType type) { 510 switch (type) { 511 case DataChannelConnection::PendingType::None: 512 return "NONE"; 513 case DataChannelConnection::PendingType::Dcep: 514 return "DCEP"; 515 case DataChannelConnection::PendingType::Data: 516 return "DATA"; 517 } 518 return ""; 519 }; 520 521 static constexpr const char* ToString(DataChannelReliabilityPolicy type) { 522 switch (type) { 523 case DataChannelReliabilityPolicy::Reliable: 524 return "RELIABLE"; 525 case DataChannelReliabilityPolicy::LimitedRetransmissions: 526 return "LIMITED_RETRANSMISSIONS"; 527 case DataChannelReliabilityPolicy::LimitedLifetime: 528 return "LIMITED_LIFETIME"; 529 } 530 return ""; 531 }; 532 533 } // namespace mozilla 534 535 #endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_