commit d235b12a77a8170992b621d981623e17c5c1fa12
parent f0daba13ae48041d050c6c3c7781ed56970dc5e4
Author: Byron Campen <docfaraday@gmail.com>
Date: Wed, 12 Nov 2025 23:27:37 +0000
Bug 1577830: Ensure that sends for blobs and non-blobs are serialized. r=ng
Differential Revision: https://phabricator.services.mozilla.com/D271684
Diffstat:
4 files changed, 92 insertions(+), 170 deletions(-)
diff --git a/dom/media/webrtc/jsapi/RTCDataChannel.cpp b/dom/media/webrtc/jsapi/RTCDataChannel.cpp
@@ -385,13 +385,10 @@ void RTCDataChannel::Send(const nsAString& aData, ErrorResult& aRv) {
}
size_t length = msgString.Length();
- if (!mDataChannel->SendMsg(std::move(msgString))) {
- ++mMessagesSent;
- mBytesSent += length;
- IncrementBufferedAmount(length);
- } else {
- aRv.ThrowOperationError("Failed to queue message");
- }
+ mDataChannel->SendMsg(std::move(msgString));
+ ++mMessagesSent;
+ mBytesSent += length;
+ IncrementBufferedAmount(length);
}
void RTCDataChannel::Send(Blob& aData, ErrorResult& aRv) {
@@ -423,13 +420,10 @@ void RTCDataChannel::Send(Blob& aData, ErrorResult& aRv) {
return;
}
- if (!mDataChannel->SendBinaryBlob(msgStream)) {
- ++mMessagesSent;
- mBytesSent += msgLength;
- IncrementBufferedAmount(msgLength);
- } else {
- aRv.ThrowOperationError("Failed to queue message");
- }
+ mDataChannel->SendBinaryBlob(msgStream);
+ ++mMessagesSent;
+ mBytesSent += msgLength;
+ IncrementBufferedAmount(msgLength);
}
void RTCDataChannel::Send(const ArrayBuffer& aData, ErrorResult& aRv) {
@@ -451,13 +445,10 @@ void RTCDataChannel::Send(const ArrayBuffer& aData, ErrorResult& aRv) {
}
size_t length = msgString.Length();
- if (!mDataChannel->SendBinaryMsg(std::move(msgString))) {
- ++mMessagesSent;
- mBytesSent += length;
- IncrementBufferedAmount(length);
- } else {
- aRv.ThrowOperationError("Failed to queue message");
- }
+ mDataChannel->SendBinaryMsg(std::move(msgString));
+ ++mMessagesSent;
+ mBytesSent += length;
+ IncrementBufferedAmount(length);
}
void RTCDataChannel::Send(const ArrayBufferView& aData, ErrorResult& aRv) {
@@ -479,13 +470,10 @@ void RTCDataChannel::Send(const ArrayBufferView& aData, ErrorResult& aRv) {
}
size_t length = msgString.Length();
- if (!mDataChannel->SendBinaryMsg(std::move(msgString))) {
- ++mMessagesSent;
- mBytesSent += length;
- IncrementBufferedAmount(length);
- } else {
- aRv.ThrowOperationError("Failed to queue message");
- }
+ mDataChannel->SendBinaryMsg(std::move(msgString));
+ ++mMessagesSent;
+ mBytesSent += length;
+ IncrementBufferedAmount(length);
}
void RTCDataChannel::GracefulClose() {
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -986,89 +986,16 @@ void DataChannelConnection::OpenFinish(RefPtr<DataChannel> aChannel) {
OnStreamOpen(stream);
}
-class ReadBlobRunnable : public Runnable {
- public:
- ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
- nsIInputStream* aBlob)
- : Runnable("ReadBlobRunnable"),
- mConnection(aConnection),
- mStream(aStream),
- mBlob(aBlob) {}
-
- NS_IMETHOD Run() override {
- // ReadBlob() is responsible to releasing the reference
- DataChannelConnection* self = mConnection;
- self->ReadBlob(mConnection.forget(), mStream, mBlob);
- return NS_OK;
- }
-
- private:
- // Make sure the Connection doesn't die while there are jobs outstanding.
- // Let it die (if released by PeerConnectionImpl while we're running)
- // when we send our runnable back to MainThread. Then ~DataChannelConnection
- // can send the IOThread to MainThread to die in a runnable, avoiding
- // unsafe event loop recursion. Evil.
- RefPtr<DataChannelConnection> mConnection;
- uint16_t mStream;
- // Use RefCount for preventing the object is deleted when SendBlob returns.
- RefPtr<nsIInputStream> mBlob;
-};
-
-// Returns a POSIX error code.
-int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) {
- RefPtr<DataChannel> channel = mChannels.Get(stream);
- if (NS_WARN_IF(!channel)) {
- return EINVAL; // TODO: Find a better error code
- }
-
+nsISerialEventTarget* DataChannelConnection::GetIOThread() {
// Spawn a thread to send the data
if (!mInternalIOThread) {
- nsresult rv =
- NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
- if (NS_FAILED(rv)) {
- return EINVAL; // TODO: Find a better error code
- }
+ // TODO(bug 1998966): Lazy shutdown once done? Maybe have this live in
+ // DataChannel (so we have an IO thread for each channel that sends blobs)?
+ NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
}
- mInternalIOThread->Dispatch(
- do_AddRef(new ReadBlobRunnable(this, stream, aBlob)),
- NS_DISPATCH_FALLIBLE);
- return 0;
-}
-
-class DataChannelBlobSendRunnable : public Runnable {
- public:
- DataChannelBlobSendRunnable(
- already_AddRefed<DataChannelConnection>& aConnection, uint16_t aStream)
- : Runnable("DataChannelBlobSendRunnable"),
- mConnection(aConnection),
- mStream(aStream) {}
-
- ~DataChannelBlobSendRunnable() override {
- if (!NS_IsMainThread() && mConnection) {
- MOZ_ASSERT(false);
- // explicitly leak the connection if destroyed off mainthread
- mConnection.forget().leak();
- }
- }
-
- NS_IMETHOD Run() override {
- MOZ_ASSERT(NS_IsMainThread());
-
- mConnection->SendBinaryMessage(mStream, std::move(mData));
- mConnection = nullptr;
- return NS_OK;
- }
-
- // explicitly public so we can avoid allocating twice and copying
- nsCString mData;
-
- private:
- // Note: we can be destroyed off the target thread, so be careful not to let
- // this get Released()ed on the temp thread!
- RefPtr<DataChannelConnection> mConnection;
- uint16_t mStream;
-};
+ return mInternalIOThread.get();
+}
void DataChannelConnection::SetState(DataChannelConnectionState aState) {
MOZ_ASSERT(mSTS->IsOnCurrentThread());
@@ -1105,36 +1032,8 @@ void DataChannelConnection::SetState(DataChannelConnectionState aState) {
}
}
-void DataChannelConnection::ReadBlob(
- already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
- nsIInputStream* aBlob) {
- MOZ_ASSERT(!mSTS->IsOnCurrentThread());
- MOZ_ASSERT(!NS_IsMainThread());
- // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
- // it off mainthread; if PeerConnectionImpl has released then we want
- // ~DataChannelConnection() to run on MainThread
-
- // Must not let Dispatching it cause the DataChannelConnection to get
- // released on the wrong thread. Using
- // WrapRunnable(RefPtr<DataChannelConnection>(aThis),... will occasionally
- // cause aThis to get released on this thread. Also, an explicit Runnable
- // lets us avoid copying the blob data an extra time.
- RefPtr<DataChannelBlobSendRunnable> runnable =
- new DataChannelBlobSendRunnable(aThis, aStream);
- // avoid copying the blob data by passing the mData from the runnable
- if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) {
- // Bug 966602: Doesn't return an error to the caller via onerror.
- // We must release DataChannelConnection on MainThread to avoid issues (bug
- // 876167) aThis is now owned by the runnable; release it there
- NS_ReleaseOnMainThread("DataChannelBlobSendRunnable", runnable.forget());
- return;
- }
- aBlob->Close();
- Dispatch(runnable.forget(), NS_DISPATCH_FALLIBLE);
-}
-
-int DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
- bool aIsBinary) {
+void DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
+ bool aIsBinary) {
// Could be main, could be a worker
nsCString temp(std::move(aMsg));
@@ -1191,8 +1090,6 @@ int DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
SendMessage(*channel, std::move(outgoing));
}),
NS_DISPATCH_FALLIBLE);
-
- return 0;
}
void DataChannelConnection::EndOfStream(const RefPtr<DataChannel>& aChannel) {
@@ -1599,19 +1496,72 @@ void DataChannel::GracefulClose() {
NS_DISPATCH_FALLIBLE);
}
-int DataChannel::SendMsg(nsACString&& aMsg) {
+void DataChannel::SendMsg(nsCString&& aMsg) {
+ SendBuffer(std::move(aMsg), false);
+}
+
+void DataChannel::SendBinaryMsg(nsCString&& aMsg) {
+ SendBuffer(std::move(aMsg), true);
+}
+
+void DataChannel::SendBuffer(nsCString&& aMsg, bool aBinary) {
MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
- return mConnection->SendMessage(mStream, std::move(aMsg));
+ if (mMessagesSentPromise) {
+ mMessagesSentPromise = mMessagesSentPromise->Then(
+ mDomEventTarget, __func__,
+ [this, self = RefPtr<DataChannel>(this), msg = std::move(aMsg),
+ aBinary](
+ const GenericNonExclusivePromise::ResolveOrRejectValue&) mutable {
+ if (mConnection) {
+ mConnection->SendDataMessage(mStream, std::move(msg), aBinary);
+ return GenericNonExclusivePromise::CreateAndResolve(true, __func__);
+ }
+ return GenericNonExclusivePromise::CreateAndResolve(false, __func__);
+ });
+
+ UnsetMessagesSentPromiseWhenSettled();
+ return;
+ }
+ mConnection->SendDataMessage(mStream, std::move(aMsg), aBinary);
}
-int DataChannel::SendBinaryMsg(nsACString&& aMsg) {
+void DataChannel::SendBinaryBlob(nsIInputStream* aBlob) {
MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
- return mConnection->SendBinaryMessage(mStream, std::move(aMsg));
+ if (!mMessagesSentPromise) {
+ mMessagesSentPromise =
+ GenericNonExclusivePromise::CreateAndResolve(true, __func__);
+ }
+
+ mMessagesSentPromise = mMessagesSentPromise->Then(
+ mConnection->GetIOThread(), __func__,
+ [this, self = RefPtr<DataChannel>(this), blob = RefPtr(aBlob)](
+ const GenericNonExclusivePromise::ResolveOrRejectValue&) {
+ nsCString data;
+ if (NS_SUCCEEDED(NS_ReadInputStreamToString(blob, data, -1))) {
+ if (mConnection) {
+ // This dispatches to STS, which is when we're supposed to resolve
+ mConnection->SendDataMessage(mStream, std::move(data), true);
+ }
+ blob->Close();
+ return GenericNonExclusivePromise::CreateAndResolve(true, __func__);
+ }
+ return GenericNonExclusivePromise::CreateAndResolve(false, __func__);
+ });
+
+ UnsetMessagesSentPromiseWhenSettled();
}
-int DataChannel::SendBinaryBlob(nsIInputStream* aBlob) {
+void DataChannel::UnsetMessagesSentPromiseWhenSettled() {
MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
- return mConnection->SendBlob(mStream, aBlob);
+ // This is why we are using a non-exclusive promise; we want to null this out
+ // when we're done, but only if nothing else has chained off of it.
+ mMessagesSentPromise->Then(
+ mDomEventTarget, __func__,
+ [this, self = RefPtr(this), promise = mMessagesSentPromise]() {
+ if (promise == mMessagesSentPromise) {
+ mMessagesSentPromise = nullptr;
+ }
+ });
}
void DataChannel::SetStream(uint16_t aId) {
diff --git a/netwerk/sctp/datachannel/DataChannel.h b/netwerk/sctp/datachannel/DataChannel.h
@@ -232,21 +232,7 @@ class DataChannelConnection : public net::NeckoTargetHolder {
void CloseAll_s();
void MarkStreamAvailable(uint16_t aStream);
- // Returns a POSIX error code.
- int SendMessage(uint16_t stream, nsACString&& aMsg) {
- return SendDataMessage(stream, std::move(aMsg), false);
- }
-
- // Returns a POSIX error code.
- int SendBinaryMessage(uint16_t stream, nsACString&& aMsg) {
- return SendDataMessage(stream, std::move(aMsg), true);
- }
-
- // Returns a POSIX error code.
- int SendBlob(uint16_t stream, nsIInputStream* aBlob);
-
- void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
- nsIInputStream* aBlob);
+ nsISerialEventTarget* GetIOThread();
bool InShutdown() const {
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
@@ -293,7 +279,7 @@ class DataChannelConnection : public net::NeckoTargetHolder {
nsISerialEventTarget* aTarget,
MediaTransportHandler* aHandler);
- int SendDataMessage(uint16_t aStream, nsACString&& aMsg, bool aIsBinary);
+ void SendDataMessage(uint16_t aStream, nsACString&& aMsg, bool aIsBinary);
DataChannelConnectionState GetState() const {
MOZ_ASSERT(mSTS->IsOnCurrentThread());
@@ -421,13 +407,13 @@ class DataChannel {
ErrorResult& aRv);
// Send a string
- int SendMsg(nsACString&& aMsg);
+ void SendMsg(nsCString&& aMsg);
// Send a binary message (TypedArray)
- int SendBinaryMsg(nsACString&& aMsg);
+ void SendBinaryMsg(nsCString&& aMsg);
// Send a binary blob
- int SendBinaryBlob(nsIInputStream* aBlob);
+ void SendBinaryBlob(nsIInputStream* aBlob);
void DecrementBufferedAmount(size_t aSize);
void AnnounceOpen();
@@ -464,6 +450,8 @@ class DataChannel {
private:
nsresult AddDataToBinaryMsg(const char* data, uint32_t size);
+ void SendBuffer(nsCString&& aMsg, bool aBinary);
+ void UnsetMessagesSentPromiseWhenSettled();
const nsCString mLabel;
const nsCString mProtocol;
@@ -472,15 +460,13 @@ class DataChannel {
const bool mNegotiated;
const bool mOrdered;
- // Mainthread only. Once we have transferrable datachannels, this could be
- // worker only instead; wherever the RTCDataChannel lives. Once this can be
- // on a worker thread, we'll need a ref to that thread for state updates and
- // such. This will be nulled out when the RTCDataChannel tears down.
+ // DOM Thread only; wherever the RTCDataChannel lives.
dom::RTCDataChannel* mMainthreadDomDataChannel = nullptr;
bool mHasWorkerDomDataChannel = false;
bool mEverOpened = false;
bool mAnnouncedClosed = false;
uint16_t mStream;
+ RefPtr<GenericNonExclusivePromise> mMessagesSentPromise;
RefPtr<DataChannelConnection> mConnection;
// STS only
diff --git a/testing/web-platform/meta/webrtc/RTCDataChannel-send-blob-order.html.ini b/testing/web-platform/meta/webrtc/RTCDataChannel-send-blob-order.html.ini
@@ -1,2 +0,0 @@
-[RTCDataChannel-send-blob-order.html]
- disabled: https://bugzilla.mozilla.org/show_bug.cgi?id=1577830