commit e2d169db608ad0eeaa53e596590452b56f51f8e1
parent 03725a4aff91077c58e896e8c6ed81cdd7a2119f
Author: Byron Campen <docfaraday@gmail.com>
Date: Wed, 29 Oct 2025 12:35:15 +0000
Bug 1988096: Track whether stream ids are in use on a per-direction basis. r=ng
Also, make sure that we don't fire close events until streams have been reset
in both directions.
Differential Revision: https://phabricator.services.mozilla.com/D269063
Diffstat:
5 files changed, 111 insertions(+), 106 deletions(-)
diff --git a/dom/media/webrtc/jsapi/RTCDataChannel.cpp b/dom/media/webrtc/jsapi/RTCDataChannel.cpp
@@ -529,7 +529,7 @@ void RTCDataChannel::GracefulClose() {
// closed.
if (!mBufferedAmount && mReadyState != RTCDataChannelState::Closed &&
mDataChannel) {
- mDataChannel->FinishClose();
+ mDataChannel->EndOfStream();
}
}));
}
@@ -643,7 +643,7 @@ void RTCDataChannel::DecrementBufferedAmount(size_t aSize) {
if (mReadyState == RTCDataChannelState::Closing) {
if (mDataChannel) {
// We're done sending
- mDataChannel->FinishClose();
+ mDataChannel->EndOfStream();
}
}
}
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -618,6 +618,8 @@ void DataChannelConnection::HandleOpenRequestMessage(
return;
}
channel->mWaitingForAck = false;
+ channel->mSendStreamNeedsReset = true;
+ channel->mRecvStreamNeedsReset = true;
OnStreamOpen(channel->mStream);
}));
}));
@@ -787,70 +789,44 @@ bool DataChannelConnection::ReassembleMessageChunk(IncomingMsg& aReassembled,
return true;
}
-void DataChannelConnection::ClearResets() {
- MOZ_ASSERT(mSTS->IsOnCurrentThread());
- // Clear all pending resets
- if (!mStreamsResetting.IsEmpty()) {
- DC_DEBUG(("%p: Clearing resets for %zu streams", this,
- mStreamsResetting.Length()));
- }
- mStreamsResetting.Clear();
-}
-
-void DataChannelConnection::MarkStreamForReset(DataChannel& aChannel) {
+void DataChannelConnection::OnStreamsReset(std::vector<uint16_t>&& aStreams) {
MOZ_ASSERT(mSTS->IsOnCurrentThread());
-
- DC_DEBUG(("%p: Resetting outgoing stream %u", this, aChannel.mStream));
- // Rarely has more than a couple items and only for a short time
- for (size_t i = 0; i < mStreamsResetting.Length(); ++i) {
- if (mStreamsResetting[i] == aChannel.mStream) {
- return;
+ for (auto stream : aStreams) {
+ DC_INFO(("%p: Received reset request for stream %u", this, stream));
+ RefPtr<DataChannel> channel = FindChannelByStream(stream);
+ if (channel) {
+ channel->mRecvStreamNeedsReset = false;
+ if (channel->mSendStreamNeedsReset) {
+ // We do not send our own reset yet, we give the RTCDataChannel a chance
+ // to finish sending messages first.
+ DC_INFO(("%p: Need to send a reset, closing gracefully", this));
+ channel->GracefulClose();
+ } else {
+ DC_INFO(
+ ("%p: We've already reset our stream, closing immediately", this));
+ FinishClose_s(channel);
+ }
}
}
- mStreamsResetting.AppendElement(aChannel.mStream);
}
-void DataChannelConnection::OnStreamsReset(std::vector<uint16_t>&& aStreams) {
+void DataChannelConnection::OnStreamsResetComplete(
+ std::vector<uint16_t>&& aStreams) {
MOZ_ASSERT(mSTS->IsOnCurrentThread());
for (auto stream : aStreams) {
+ DC_INFO(("%p: Received reset response for stream %u", this, stream));
RefPtr<DataChannel> channel = FindChannelByStream(stream);
if (channel) {
- // The other side closed the channel
- // We could be in three states:
- // 1. Normal state (input and output streams (OPEN)
- // Notify application, send a RESET in response on our
- // outbound channel. Go to CLOSED
- // 2. We sent our own reset (CLOSING); either they crossed on the
- // wire, or this is a response to our Reset.
- // Go to CLOSED
- // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
- // I believe this is impossible, as we don't have an input stream
- // yet.
-
- DC_DEBUG(("%p: stream %u closed", this, stream));
-
- DC_DEBUG(("%p: Disconnected DataChannel %p", this, (void*)channel));
- channel->GracefulClose();
- } else {
- DC_WARN(("%p: Can't find incoming stream %u", this, stream));
+ channel->mSendStreamNeedsReset = false;
+ if (!channel->mRecvStreamNeedsReset) {
+ // The other end has already performed its reset
+ DC_INFO(
+ ("%p: Remote stream has already been reset, closing immediately",
+ this));
+ FinishClose_s(channel);
+ }
}
}
-
- Dispatch(
- NS_NewRunnableFunction("DataChannelConnection::HandleStreamResetEvent",
- [this, self = RefPtr<DataChannelConnection>(this),
- streamsReset = std::move(aStreams)]() {
- for (auto stream : streamsReset) {
- mStreamIds.RemoveElementSorted(stream);
- }
- }));
-
- // Process pending resets in bulk
- if (!mStreamsResetting.IsEmpty()) {
- DC_DEBUG(
- ("%p: Sending %zu pending resets", this, mStreamsResetting.Length()));
- ResetStreams(mStreamsResetting);
- }
}
already_AddRefed<DataChannel> DataChannelConnection::Open(
@@ -981,6 +957,11 @@ void DataChannelConnection::OpenFinish(RefPtr<DataChannel> aChannel) {
}
}
+ // Even if we're in the negotiated case, and will never send an open request,
+ // we're supposed to send a stream reset when we tear down.
+ aChannel->mSendStreamNeedsReset = true;
+ aChannel->mRecvStreamNeedsReset = true;
+
// Either externally negotiated or we sent Open
// FIX? Move into DOMDataChannel? I don't think we can send it yet here
aChannel->AnnounceOpen();
@@ -1188,17 +1169,28 @@ int DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
return 0;
}
-void DataChannelConnection::FinishClose(DataChannel* aChannel) {
+void DataChannelConnection::EndOfStream(DataChannel* aChannel) {
mSTS->Dispatch(NS_NewRunnableFunction(
__func__,
[this, self = RefPtr<DataChannelConnection>(this),
- channel = RefPtr<DataChannel>(aChannel)]() { FinishClose_s(channel); }));
+ channel = RefPtr<DataChannel>(aChannel), stream = aChannel->mStream]() {
+ if (channel->mSendStreamNeedsReset) {
+ DC_INFO(("%p: Need to send a reset, closing gracefully", this));
+ nsTArray<uint16_t> temp({stream});
+ ResetStreams(temp);
+ } else if (!channel->mRecvStreamNeedsReset) {
+ // Stream is reset in both directions (or never existed in the first
+ // place), we're ready to finish tearing down.
+ DC_INFO(("%p: Stream does not need reset in either direction", this));
+ FinishClose_s(channel);
+ }
+ }));
}
-void DataChannel::FinishClose() {
+void DataChannel::EndOfStream() {
// This can happen before mDomEventTarget is actually ready.
if (mConnection) {
- mConnection->FinishClose(this);
+ mConnection->EndOfStream(this);
}
}
@@ -1214,21 +1206,6 @@ void DataChannelConnection::FinishClose_s(DataChannel* aChannel) {
mChannels.Remove(aChannel);
mPending.erase(aChannel);
- // Follow the closing procedure defined for the channel's underlying
- // data transport :
-
- // In the case of an SCTP-based transport, follow [RFC8831], section
- // 6.7.
- if (channel->mStream != INVALID_STREAM) {
- MarkStreamForReset(*aChannel);
- if (GetState() != DataChannelConnectionState::Closed) {
- // Individual channel is being closed, send reset now.
- // If the whole connection is closed, rely on the caller to send the
- // resets once it is done closing all of the channels.
- ResetStreams(mStreamsResetting);
- }
- }
-
// Close the channel's data transport by following the associated
// procedure.
aChannel->AnnounceClosed();
@@ -1238,10 +1215,17 @@ void DataChannelConnection::CloseAll_s() {
// Make sure no more channels will be opened
SetState(DataChannelConnectionState::Closed);
+ nsTArray<uint16_t> streamsToReset;
// Close current channels
// If there are runnables, they hold a strong ref and keep the channel
// and/or connection alive (even if in a CLOSED state)
for (auto& channel : mChannels.GetAll()) {
+ if (channel->mSendStreamNeedsReset) {
+ channel->mSendStreamNeedsReset = false;
+ streamsToReset.AppendElement(channel->mStream);
+ }
+ // We do not wait for the reset to finish in this case; we won't be around
+ // to see the response.
FinishClose_s(channel);
}
@@ -1255,10 +1239,11 @@ void DataChannelConnection::CloseAll_s() {
channel->mStream));
FinishClose_s(channel); // also releases the ref on each iteration
}
+
// It's more efficient to let the Resets queue in shutdown and then
// ResetStreams() here.
- if (!mStreamsResetting.IsEmpty()) {
- ResetStreams(mStreamsResetting);
+ if (!streamsToReset.IsEmpty()) {
+ ResetStreams(streamsToReset);
}
}
@@ -1271,6 +1256,11 @@ void DataChannelConnection::CloseAll() {
[this, self = RefPtr<DataChannelConnection>(this)]() { CloseAll_s(); }));
}
+void DataChannelConnection::MarkStreamAvailable(uint16_t aStream) {
+ MOZ_ASSERT(NS_IsMainThread());
+ mStreamIds.RemoveElementSorted(aStream);
+}
+
bool DataChannelConnection::Channels::IdComparator::Equals(
const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
return aChannel->mStream == aId;
@@ -1424,7 +1414,7 @@ void DataChannel::UnsetMainthreadDomDataChannel() {
}));
} else {
DC_INFO(("%p: No worker RTCDataChannel. Closing.", this));
- FinishClose();
+ EndOfStream();
}
}
@@ -1434,7 +1424,7 @@ void DataChannel::UnsetWorkerDomDataChannel() {
DC_INFO(("%p: Worker RTCDataChannel is being destroyed(%p). Closing.", this,
mWorkerDomDataChannel));
mWorkerDomDataChannel = nullptr;
- FinishClose();
+ EndOfStream();
}
void DataChannel::DecrementBufferedAmount(size_t aSize) {
@@ -1489,25 +1479,29 @@ void DataChannel::AnnounceClosed() {
"RTCDataChannel.",
this));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- "DataChannel::AnnounceClosed", [this, self = RefPtr<DataChannel>(this)] {
- if (GetDomDataChannel()) {
- DC_INFO(("Calling AnnounceClosed on RTCDataChannel."));
- GetDomDataChannel()->AnnounceClosed();
+ GetMainThreadSerialEventTarget()->Dispatch(NS_NewRunnableFunction(
+ "DataChannel::AnnounceClosed",
+ [this, self = RefPtr<DataChannel>(this), connection = mConnection]() {
+ // We have to unset this first, and then fire DOM events, so the event
+ // handler won't hit an error if it tries to reuse this id.
+ if (mStream != INVALID_STREAM) {
+ connection->MarkStreamAvailable(mStream);
}
- if (mConnection) {
- GetMainThreadSerialEventTarget()->Dispatch(NS_NewRunnableFunction(
- "DataChannel::AnnounceClosed",
- [this, self = RefPtr<DataChannel>(this),
- connection = mConnection]() {
- // Stats stuff
- // TODO: Can we simplify this?
- if (mEverOpened && connection->mListener) {
- connection->mListener->NotifyDataChannelClosed(this);
- }
- }));
+ // Stats stuff
+ if (mEverOpened && connection->mListener) {
+ connection->mListener->NotifyDataChannelClosed(this);
}
+
+ mDomEventTarget->Dispatch(
+ NS_NewRunnableFunction("DataChannel::AnnounceClosed", [this, self] {
+ DC_INFO(("%p: Attempting to call AnnounceClosed.", this));
+ if (GetDomDataChannel()) {
+ DC_INFO(
+ ("%p: Calling AnnounceClosed on RTCDataChannel.", this));
+ GetDomDataChannel()->AnnounceClosed();
+ }
+ }));
}));
}
diff --git a/netwerk/sctp/datachannel/DataChannel.h b/netwerk/sctp/datachannel/DataChannel.h
@@ -209,6 +209,7 @@ class DataChannelConnection : public net::NeckoTargetHolder {
void HandleDCEPMessage(IncomingMsg&& aMsg);
void ProcessQueuedOpens();
void OnStreamsReset(std::vector<uint16_t>&& aStreams);
+ void OnStreamsResetComplete(std::vector<uint16_t>&& aStreams);
typedef DataChannelStatsPromise::AllPromiseType StatsPromise;
RefPtr<StatsPromise> GetStats(const DOMHighResTimeStamp aTimestamp) const;
@@ -225,10 +226,11 @@ class DataChannelConnection : public net::NeckoTargetHolder {
DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue,
bool aExternalNegotiated, uint16_t aStream);
- void FinishClose(DataChannel* aChannel);
+ void EndOfStream(DataChannel* aChannel);
void FinishClose_s(DataChannel* aChannel);
void CloseAll();
void CloseAll_s();
+ void MarkStreamAvailable(uint16_t aStream);
// Returns a POSIX error code.
int SendMessage(uint16_t stream, nsACString&& aMsg) {
@@ -313,8 +315,6 @@ class DataChannelConnection : public net::NeckoTargetHolder {
void OpenFinish(RefPtr<DataChannel> aChannel);
- void ClearResets();
- void MarkStreamForReset(DataChannel& aChannel);
void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream);
void HandleOpenRequestMessage(
const struct rtcweb_datachannel_open_request* req, uint32_t length,
@@ -343,8 +343,6 @@ class DataChannelConnection : public net::NeckoTargetHolder {
RefPtr<MediaTransportHandler> mTransportHandler;
MediaEventListener mPacketReceivedListener;
MediaEventListener mStateChangeListener;
- // Streams pending reset.
- AutoTArray<uint16_t, 4> mStreamsResetting;
DataChannelConnectionState mState = DataChannelConnectionState::Closed;
/***********************************************************/
@@ -453,7 +451,8 @@ class DataChannel {
RefPtr<DataChannelStatsPromise> GetStats(
const DOMHighResTimeStamp aTimestamp);
- void FinishClose();
+ // Called when there will be no more data sent
+ void EndOfStream();
dom::RTCDataChannel* GetDomDataChannel() const {
MOZ_ASSERT(mDomEventTarget->IsOnCurrentThread());
@@ -487,6 +486,8 @@ class DataChannel {
// The channel has been opened, but the peer has not yet acked - ensures that
// the messages are sent ordered until this is cleared.
bool mWaitingForAck = false;
+ bool mSendStreamNeedsReset = false;
+ bool mRecvStreamNeedsReset = false;
nsTArray<OutgoingMsg> mBufferedData;
std::map<uint16_t, IncomingMsg> mRecvBuffers;
diff --git a/netwerk/sctp/datachannel/DataChannelDcSctp.cpp b/netwerk/sctp/datachannel/DataChannelDcSctp.cpp
@@ -341,15 +341,23 @@ void DataChannelConnectionDcSctp::OnStreamsResetFailed(
DC_ERROR(("%s: %p", __func__, this));
// It probably does not make much sense to retry this here. If dcsctp doesn't
// want to retry, we probably don't either.
- (void)aOutgoingStreams;
(void)aReason;
+ std::vector<uint16_t> streamsReset;
+ for (auto id : aOutgoingStreams) {
+ streamsReset.push_back(id.value());
+ }
+ OnStreamsResetComplete(std::move(streamsReset));
}
void DataChannelConnectionDcSctp::OnStreamsResetPerformed(
webrtc::ArrayView<const StreamID> aOutgoingStreams) {
MOZ_ASSERT(mSTS->IsOnCurrentThread());
DC_DEBUG(("%s: %p", __func__, this));
- (void)aOutgoingStreams;
+ std::vector<uint16_t> streamsReset;
+ for (auto id : aOutgoingStreams) {
+ streamsReset.push_back(id.value());
+ }
+ OnStreamsResetComplete(std::move(streamsReset));
}
void DataChannelConnectionDcSctp::OnIncomingStreamsReset(
diff --git a/netwerk/sctp/datachannel/DataChannelUsrsctp.cpp b/netwerk/sctp/datachannel/DataChannelUsrsctp.cpp
@@ -1256,13 +1256,15 @@ void DataChannelConnectionUsrsctp::HandleStreamResetEvent(
(strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) /
sizeof(uint16_t);
for (size_t i = 0; i < n; ++i) {
- if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
- streamsReset.push_back(strrst->strreset_stream_list[i]);
- }
+ streamsReset.push_back(strrst->strreset_stream_list[i]);
}
}
- OnStreamsReset(std::move(streamsReset));
+ if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+ OnStreamsReset(std::move(streamsReset));
+ } else if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
+ OnStreamsResetComplete(std::move(streamsReset));
+ }
}
void DataChannelConnectionUsrsctp::HandleStreamChangeEvent(