commit e0e338611430f18cae466a786d5a15f660f5df6c
parent f6fd8610b5e399c38d47188a330d008907e3920e
Author: Byron Campen <docfaraday@gmail.com>
Date: Wed, 29 Oct 2025 12:35:16 +0000
Bug 1988096: Use cancelable runnables, and fallible dispatch. r=ng
This helps ensure that these runnables (and all of their lambda captures)
aren't leaked during shutdown.
Differential Revision: https://phabricator.services.mozilla.com/D269067
Diffstat:
1 file changed, 340 insertions(+), 281 deletions(-)
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -67,7 +67,7 @@ DataChannelConnection::~DataChannelConnection() {
// is in the event loop already)
nsCOMPtr<nsIRunnable> r = WrapRunnable(
nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown);
- mSTS->Dispatch(r.forget());
+ mSTS->Dispatch(r.forget(), NS_DISPATCH_FALLIBLE);
}
} else {
// on STS, safe to call shutdown
@@ -85,15 +85,18 @@ void DataChannelConnection::Destroy() {
MOZ_DIAGNOSTIC_ASSERT(mSTS);
#endif
mListener = nullptr;
- mSTS->Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannelConnection>(this)]() {
- mPacketReceivedListener.DisconnectIfExists();
- mStateChangeListener.DisconnectIfExists();
+ mSTS->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this)]() {
+ mPacketReceivedListener.DisconnectIfExists();
+ mStateChangeListener.DisconnectIfExists();
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
- mShutdown = true;
- DC_DEBUG(("Shutting down connection %p, id %p", this, (void*)mId));
+ mShutdown = true;
+ DC_INFO(("Shutting down connection %p, id %p", this, (void*)mId));
#endif
- }));
+ }),
+ NS_DISPATCH_FALLIBLE);
}
Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create(
@@ -282,14 +285,16 @@ bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
}
}
- mSTS->Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannelConnection>(this),
- hasStreamId = std::move(hasStreamId)]() {
- SetState(DataChannelConnectionState::Connecting);
- for (auto channel : hasStreamId) {
- OpenFinish(channel);
- }
- }));
+ mSTS->Dispatch(NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this),
+ hasStreamId = std::move(hasStreamId)]() {
+ SetState(DataChannelConnectionState::Connecting);
+ for (auto channel : hasStreamId) {
+ OpenFinish(channel);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
// We do not check whether this is a new transport id here, that happens on
@@ -367,15 +372,18 @@ void DataChannelConnection::OnPacketReceived(const std::string& aTransportId,
}
void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
- mSTS->Dispatch(NS_NewRunnableFunction(
- "DataChannelConnection::SendPacket",
- [this, self = RefPtr<DataChannelConnection>(this),
- packet = std::move(packet)]() mutable {
- // DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes", this, len));
- if (!mTransportId.empty() && mTransportHandler) {
- mTransportHandler->SendPacket(mTransportId, std::move(*packet));
- }
- }));
+ mSTS->Dispatch(NS_NewCancelableRunnableFunction(
+ "DataChannelConnection::SendPacket",
+ [this, self = RefPtr<DataChannelConnection>(this),
+ packet = std::move(packet)]() mutable {
+ // DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes",
+ // this, len));
+ if (!mTransportId.empty() && mTransportHandler) {
+ mTransportHandler->SendPacket(mTransportId,
+ std::move(*packet));
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
already_AddRefed<DataChannel> DataChannelConnection::FindChannelByStream(
@@ -544,85 +552,94 @@ void DataChannelConnection::HandleOpenRequestMessage(
// Always dispatch this to mainthread; this is a brand new datachannel, which
// has not had any opportunity to be transferred to a worker.
- Dispatch(NS_NewRunnableFunction(
- "DataChannelConnection::HandleOpenRequestMessage",
- [this, self = RefPtr<DataChannelConnection>(this), stream, prPolicy,
- prValue, ordered, label, protocol]() {
- RefPtr<DataChannel> channel = FindChannelByStream(stream);
- if (channel) {
- if (!channel->mNegotiated) {
- DC_ERROR(
- ("HandleOpenRequestMessage: channel for pre-existing stream "
- "%u that was not externally negotiated. JS is lying to us, or "
- "there's an id collision.",
- stream));
- /* XXX: some error handling */
- } else {
- DC_DEBUG(("Open for externally negotiated channel %u", stream));
- // XXX should also check protocol, maybe label
- if (prPolicy != channel->mPrPolicy ||
- prValue != channel->mPrValue || ordered != channel->mOrdered) {
- DC_WARN(
- ("external negotiation mismatch with OpenRequest:"
- "channel %u, policy %s/%s, value %u/%u, ordered %d/%d",
- stream, ToString(prPolicy), ToString(channel->mPrPolicy),
- prValue, channel->mPrValue, static_cast<int>(ordered),
- static_cast<int>(channel->mOrdered)));
+ Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannelConnection::HandleOpenRequestMessage",
+ [this, self = RefPtr<DataChannelConnection>(this), stream, prPolicy,
+ prValue, ordered, label, protocol]() {
+ RefPtr<DataChannel> channel = FindChannelByStream(stream);
+ if (channel) {
+ if (!channel->mNegotiated) {
+ DC_ERROR((
+ "HandleOpenRequestMessage: channel for pre-existing stream "
+ "%u that was not externally negotiated. JS is lying to us, "
+ "or there's an id collision.",
+ stream));
+ /* XXX: some error handling */
+ } else {
+ DC_DEBUG(("Open for externally negotiated channel %u", stream));
+ // XXX should also check protocol, maybe label
+ if (prPolicy != channel->mPrPolicy ||
+ prValue != channel->mPrValue ||
+ ordered != channel->mOrdered) {
+ DC_WARN(
+ ("external negotiation mismatch with OpenRequest:"
+ "channel %u, policy %s/%s, value %u/%u, ordered %d/%d",
+ stream, ToString(prPolicy), ToString(channel->mPrPolicy),
+ prValue, channel->mPrValue, static_cast<int>(ordered),
+ static_cast<int>(channel->mOrdered)));
+ }
+ }
+ return;
+ }
+ channel = new DataChannel(this, stream, label, protocol, prPolicy,
+ prValue, ordered, false);
+ mChannels.Insert(channel);
+ mStreamIds.InsertElementSorted(stream);
+
+ DC_DEBUG(("%p: sending ON_CHANNEL_CREATED for %s/%s: %u", this,
+ channel->mLabel.get(), channel->mProtocol.get(), stream));
+
+ // Awkward. If we convert over to using Maybe for this in
+ // DataChannel, we won't need to have this extra conversion, since
+ // Nullable converts easily to Maybe.
+ dom::Nullable<uint16_t> maxLifeTime;
+ dom::Nullable<uint16_t> maxRetransmits;
+ if (prPolicy == DataChannelReliabilityPolicy::LimitedLifetime) {
+ maxLifeTime.SetValue(std::min(
+ std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
+ } else if (prPolicy ==
+ DataChannelReliabilityPolicy::LimitedRetransmissions) {
+ maxRetransmits.SetValue(std::min(
+ std::numeric_limits<uint16_t>::max(), (uint16_t)prValue));
}
- }
- return;
- }
- channel = new DataChannel(this, stream, label, protocol, prPolicy,
- prValue, ordered, false);
- mChannels.Insert(channel);
- mStreamIds.InsertElementSorted(stream);
-
- DC_DEBUG(("%p: sending ON_CHANNEL_CREATED for %s/%s: %u", this,
- channel->mLabel.get(), channel->mProtocol.get(), stream));
-
- // Awkward. If we convert over to using Maybe for this in DataChannel,
- // we won't need to have this extra conversion, since Nullable converts
- // easily to Maybe.
- dom::Nullable<uint16_t> maxLifeTime;
- dom::Nullable<uint16_t> maxRetransmits;
- if (prPolicy == DataChannelReliabilityPolicy::LimitedLifetime) {
- maxLifeTime.SetValue(std::min(std::numeric_limits<uint16_t>::max(),
- (uint16_t)prValue));
- } else if (prPolicy ==
- DataChannelReliabilityPolicy::LimitedRetransmissions) {
- maxRetransmits.SetValue(std::min(std::numeric_limits<uint16_t>::max(),
- (uint16_t)prValue));
- }
- if (mListener) {
- // important to give it an already_AddRefed pointer!
- // TODO(bug 1974443): Have nsDOMDataChannel create the DataChannel
- // object, or have DataChannel take an nsDOMDataChannel, to avoid
- // passing this param list more than once?
- mListener->NotifyDataChannel(do_AddRef(channel), label, ordered,
- maxLifeTime, maxRetransmits, protocol,
- false);
- // Spec says to queue this in the queued task for ondatachannel
- channel->AnnounceOpen();
- }
+ if (mListener) {
+ // important to give it an already_AddRefed pointer!
+ // TODO(bug 1974443): Have nsDOMDataChannel create the DataChannel
+ // object, or have DataChannel take an nsDOMDataChannel, to avoid
+ // passing this param list more than once?
+ mListener->NotifyDataChannel(do_AddRef(channel), label, ordered,
+ maxLifeTime, maxRetransmits,
+ protocol, false);
+ // Spec says to queue this in the queued task for ondatachannel
+ channel->AnnounceOpen();
+ }
- mSTS->Dispatch(NS_NewRunnableFunction(
- "DataChannelConnection::HandleOpenRequestMessage",
- [this, self = RefPtr<DataChannelConnection>(this), channel]() {
- // Note that any message can be buffered; SendOpenAckMessage may
- // error later than this check.
- const auto error = SendOpenAckMessage(*channel);
- if (error) {
- DC_ERROR(("SendOpenAckMessage failed, error = %d", error));
- FinishClose_s(channel);
- return;
- }
- channel->mWaitingForAck = false;
- channel->mSendStreamNeedsReset = true;
- channel->mRecvStreamNeedsReset = true;
- OnStreamOpen(channel->mStream);
- }));
- }));
+ mSTS->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannelConnection::HandleOpenRequestMessage",
+ [this, self = RefPtr<DataChannelConnection>(this),
+ channel = std::move(channel)]() {
+ // Note that any message can be buffered;
+ // SendOpenAckMessage may error later than this check.
+ const auto error = SendOpenAckMessage(*channel);
+ if (error) {
+ DC_ERROR(
+ ("%p: SendOpenAckMessage failed, channel %p, error "
+ "= %d",
+ this, channel.get(), error));
+ FinishClose_s(channel);
+ return;
+ }
+ channel->mWaitingForAck = false;
+ channel->mSendStreamNeedsReset = true;
+ channel->mRecvStreamNeedsReset = true;
+ OnStreamOpen(channel->mStream);
+ }),
+ NS_DISPATCH_FALLIBLE);
+ }),
+ NS_DISPATCH_FALLIBLE);
}
// Caller must ensure that length <= SIZE_MAX
@@ -879,11 +896,11 @@ already_AddRefed<DataChannel> DataChannelConnection::Open(
mChannels.Insert(channel);
if (aStream != INVALID_STREAM) {
- mSTS->Dispatch(NS_NewRunnableFunction(
- "DataChannel::OpenFinish",
- [this, self = RefPtr<DataChannelConnection>(this), channel]() mutable {
- OpenFinish(channel);
- }));
+ mSTS->Dispatch(NS_NewCancelableRunnableFunction(
+ "DataChannel::OpenFinish",
+ [this, self = RefPtr<DataChannelConnection>(this),
+ channel]() mutable { OpenFinish(channel); }),
+ NS_DISPATCH_FALLIBLE);
}
return channel.forget();
@@ -1013,7 +1030,8 @@ int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) {
}
mInternalIOThread->Dispatch(
- do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
+ do_AddRef(new ReadBlobRunnable(this, stream, aBlob)),
+ NS_DISPATCH_FALLIBLE);
return 0;
}
@@ -1066,19 +1084,23 @@ void DataChannelConnection::SetState(DataChannelConnectionState aState) {
mState = aState;
if (mState == DataChannelConnectionState::Open) {
- Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannelConnection>(this)]() {
- if (mListener) {
- mListener->NotifySctpConnected();
- }
- }));
+ Dispatch(NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this)]() {
+ if (mListener) {
+ mListener->NotifySctpConnected();
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
} else if (mState == DataChannelConnectionState::Closed) {
- Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannelConnection>(this)]() {
- if (mListener) {
- mListener->NotifySctpClosed();
- }
- }));
+ Dispatch(NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this)]() {
+ if (mListener) {
+ mListener->NotifySctpClosed();
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
}
@@ -1107,7 +1129,7 @@ void DataChannelConnection::ReadBlob(
return;
}
aBlob->Close();
- Dispatch(runnable.forget());
+ Dispatch(runnable.forget(), NS_DISPATCH_FALLIBLE);
}
int DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
@@ -1116,91 +1138,98 @@ int DataChannelConnection::SendDataMessage(uint16_t aStream, nsACString&& aMsg,
nsCString temp(std::move(aMsg));
- mSTS->Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannelConnection>(this), aStream,
- msg = std::move(temp), aIsBinary]() mutable {
- RefPtr<DataChannel> channel = FindChannelByStream(aStream);
- if (!channel) {
- // Must have closed due to a transport error?
- return;
- }
+ mSTS->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this), aStream,
+ msg = std::move(temp), aIsBinary]() mutable {
+ RefPtr<DataChannel> channel = FindChannelByStream(aStream);
+ if (!channel) {
+ // Must have closed due to a transport error?
+ return;
+ }
- Maybe<uint16_t> maxRetransmissions;
- Maybe<uint16_t> maxLifetimeMs;
-
- switch (channel->mPrPolicy) {
- case DataChannelReliabilityPolicy::Reliable:
- break;
- case DataChannelReliabilityPolicy::LimitedRetransmissions:
- maxRetransmissions = Some(channel->mPrValue);
- break;
- case DataChannelReliabilityPolicy::LimitedLifetime:
- maxLifetimeMs = Some(channel->mPrValue);
- break;
- }
+ Maybe<uint16_t> maxRetransmissions;
+ Maybe<uint16_t> maxLifetimeMs;
+
+ switch (channel->mPrPolicy) {
+ case DataChannelReliabilityPolicy::Reliable:
+ break;
+ case DataChannelReliabilityPolicy::LimitedRetransmissions:
+ maxRetransmissions = Some(channel->mPrValue);
+ break;
+ case DataChannelReliabilityPolicy::LimitedLifetime:
+ maxLifetimeMs = Some(channel->mPrValue);
+ break;
+ }
- uint32_t ppid;
- if (aIsBinary) {
- if (msg.Length()) {
- ppid = DATA_CHANNEL_PPID_BINARY;
- } else {
- ppid = DATA_CHANNEL_PPID_BINARY_EMPTY;
- msg.Append('\0');
- }
- } else {
- if (msg.Length()) {
- ppid = DATA_CHANNEL_PPID_DOMSTRING;
- } else {
- ppid = DATA_CHANNEL_PPID_DOMSTRING_EMPTY;
- msg.Append('\0');
- }
- }
+ uint32_t ppid;
+ if (aIsBinary) {
+ if (msg.Length()) {
+ ppid = DATA_CHANNEL_PPID_BINARY;
+ } else {
+ ppid = DATA_CHANNEL_PPID_BINARY_EMPTY;
+ msg.Append('\0');
+ }
+ } else {
+ if (msg.Length()) {
+ ppid = DATA_CHANNEL_PPID_DOMSTRING;
+ } else {
+ ppid = DATA_CHANNEL_PPID_DOMSTRING_EMPTY;
+ msg.Append('\0');
+ }
+ }
- DataChannelMessageMetadata metadata(
- channel->mStream, ppid,
- !channel->mOrdered && !channel->mWaitingForAck, maxRetransmissions,
- maxLifetimeMs);
- // Create message instance and send
- OutgoingMsg outgoing(std::move(msg), metadata);
+ DataChannelMessageMetadata metadata(
+ channel->mStream, ppid,
+ !channel->mOrdered && !channel->mWaitingForAck,
+ maxRetransmissions, maxLifetimeMs);
+ // Create message instance and send
+ OutgoingMsg outgoing(std::move(msg), metadata);
- SendMessage(*channel, std::move(outgoing));
- }));
+ SendMessage(*channel, std::move(outgoing));
+ }),
+ NS_DISPATCH_FALLIBLE);
return 0;
}
void DataChannelConnection::EndOfStream(DataChannel* aChannel) {
- mSTS->Dispatch(NS_NewRunnableFunction(
- __func__,
- [this, self = RefPtr<DataChannelConnection>(this),
- channel = RefPtr<DataChannel>(aChannel), stream = aChannel->mStream]() {
- if (channel->mSendStreamNeedsReset) {
- DC_INFO(("%p: Need to send a reset, closing gracefully", this));
- nsTArray<uint16_t> temp({stream});
- bool success = ResetStreams(temp);
- if (success) {
- return;
- }
- // We presume that OnStreamResetComplete will not be called in
- // this case, nor will we receive a stream reset from the other
- // end.
- DC_INFO(
- ("%p: Failed to send a reset for channel %p, closing "
- "immediately",
- this, channel.get()));
- channel->mRecvStreamNeedsReset = false;
- }
+ mSTS->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannelConnection>(this), channel = aChannel,
+ stream = aChannel->mStream]() {
+ if (channel->mSendStreamNeedsReset) {
+ DC_INFO((
+ "%p: Need to send a reset for channel %p, closing gracefully",
+ this, channel));
+ nsTArray<uint16_t> temp({stream});
+ bool success = ResetStreams(temp);
+ if (success) {
+ return;
+ }
+ // We presume that OnStreamResetComplete will not be called in
+ // this case, nor will we receive a stream reset from the other
+ // end.
+ DC_INFO(
+ ("%p: Failed to send a reset for channel %p, closing "
+ "immediately",
+ this, channel));
+ channel->mRecvStreamNeedsReset = false;
+ }
- if (!channel->mRecvStreamNeedsReset) {
- // Stream is reset in both directions (or never existed in the
- // first place), we're ready to finish tearing down.
- DC_INFO(
- ("%p: Stream does not need reset in either direction for "
- "channel %p",
- this, channel.get()));
- FinishClose_s(channel);
- }
- }));
+ if (!channel->mRecvStreamNeedsReset) {
+ // Stream is reset in both directions (or never existed in the
+ // first place), we're ready to finish tearing down.
+ DC_INFO(
+ ("%p: Stream does not need reset in either direction for "
+ "channel %p",
+ this, channel));
+ FinishClose_s(channel);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
void DataChannel::EndOfStream() {
@@ -1267,9 +1296,12 @@ void DataChannelConnection::CloseAll() {
MOZ_ASSERT(NS_IsMainThread());
DC_DEBUG(("%p: Closing all channels", this));
- mSTS->Dispatch(NS_NewRunnableFunction(
- "DataChannelConnection::CloseAll",
- [this, self = RefPtr<DataChannelConnection>(this)]() { CloseAll_s(); }));
+ mSTS->Dispatch(NS_NewCancelableRunnableFunction(
+ "DataChannelConnection::CloseAll",
+ [this, self = RefPtr<DataChannelConnection>(this)]() {
+ CloseAll_s();
+ }),
+ NS_DISPATCH_FALLIBLE);
}
void DataChannelConnection::MarkStreamAvailable(uint16_t aStream) {
@@ -1422,12 +1454,14 @@ void DataChannel::UnsetMainthreadDomDataChannel() {
("Mainthread RTCDataChannel is being destroyed. Dispatching task to "
"inform corresponding worker RTCDataChannel."));
mDomEventTarget->Dispatch(
- NS_NewRunnableFunction("DataChannel::UnsetMainthreadDomDataChannel",
- [this, self = RefPtr<DataChannel>(this)] {
- if (mWorkerDomDataChannel) {
- mWorkerDomDataChannel->UnsetWorkerNeedsUs();
- }
- }));
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::UnsetMainthreadDomDataChannel",
+ [this, self = RefPtr<DataChannel>(this)] {
+ if (mWorkerDomDataChannel) {
+ mWorkerDomDataChannel->UnsetWorkerNeedsUs();
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
} else {
DC_INFO(("%p: No worker RTCDataChannel. Closing.", this));
EndOfStream();
@@ -1444,13 +1478,15 @@ void DataChannel::UnsetWorkerDomDataChannel() {
}
void DataChannel::DecrementBufferedAmount(size_t aSize) {
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- "DataChannel::DecrementBufferedAmount",
- [this, self = RefPtr<DataChannel>(this), aSize] {
- if (GetDomDataChannel()) {
- GetDomDataChannel()->DecrementBufferedAmount(aSize);
- }
- }));
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::DecrementBufferedAmount",
+ [this, self = RefPtr<DataChannel>(this), aSize] {
+ if (GetDomDataChannel()) {
+ GetDomDataChannel()->DecrementBufferedAmount(aSize);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
void DataChannel::AnnounceOpen() {
@@ -1462,29 +1498,34 @@ void DataChannel::AnnounceOpen() {
("%p: DataChannel is open. Queueing AnnounceOpen call to RTCDataChannel.",
this));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- "DataChannel::AnnounceOpen", [this, self = RefPtr<DataChannel>(this)] {
- if (GetDomDataChannel()) {
- DC_INFO(("Calling AnnounceOpen on RTCDataChannel."));
- GetDomDataChannel()->AnnounceOpen();
- }
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::AnnounceOpen",
+ [this, self = RefPtr<DataChannel>(this)] {
+ if (GetDomDataChannel()) {
+ DC_INFO(("Calling AnnounceOpen on RTCDataChannel."));
+ GetDomDataChannel()->AnnounceOpen();
+ }
- // Right now, we're already on mainthread, but this might be a worker
- // someday.
- if (mConnection) {
- GetMainThreadSerialEventTarget()->Dispatch(NS_NewRunnableFunction(
- "DataChannel::AnnounceOpen",
- [this, self = RefPtr<DataChannel>(this),
- connection = mConnection]() {
- // Stats stuff
- // TODO: Can we simplify this?
- if (!mEverOpened && connection->mListener) {
- mEverOpened = true;
- connection->mListener->NotifyDataChannelOpen(this);
- }
- }));
- }
- }));
+ // Right now, we're already on mainthread, but this might be a
+ // worker someday.
+ if (mConnection) {
+ GetMainThreadSerialEventTarget()->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::AnnounceOpen",
+ [this, self = RefPtr<DataChannel>(this),
+ connection = mConnection]() {
+ // Stats stuff
+ // TODO: Can we simplify this?
+ if (!mEverOpened && connection->mListener) {
+ mEverOpened = true;
+ connection->mListener->NotifyDataChannelOpen(this);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
void DataChannel::AnnounceClosed() {
@@ -1495,30 +1536,36 @@ void DataChannel::AnnounceClosed() {
"RTCDataChannel.",
this));
- GetMainThreadSerialEventTarget()->Dispatch(NS_NewRunnableFunction(
- "DataChannel::AnnounceClosed",
- [this, self = RefPtr<DataChannel>(this), connection = mConnection]() {
- // We have to unset this first, and then fire DOM events, so the event
- // handler won't hit an error if it tries to reuse this id.
- if (mStream != INVALID_STREAM) {
- connection->MarkStreamAvailable(mStream);
- }
+ GetMainThreadSerialEventTarget()->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::AnnounceClosed",
+ [this, self = RefPtr<DataChannel>(this), connection = mConnection]() {
+ // We have to unset this first, and then fire DOM events, so the
+ // event handler won't hit an error if it tries to reuse this id.
+ if (mStream != INVALID_STREAM) {
+ connection->MarkStreamAvailable(mStream);
+ }
- // Stats stuff
- if (mEverOpened && connection->mListener) {
- connection->mListener->NotifyDataChannelClosed(this);
- }
+ // Stats stuff
+ if (mEverOpened && connection->mListener) {
+ connection->mListener->NotifyDataChannelClosed(this);
+ }
- mDomEventTarget->Dispatch(
- NS_NewRunnableFunction("DataChannel::AnnounceClosed", [this, self] {
- DC_INFO(("%p: Attempting to call AnnounceClosed.", this));
- if (GetDomDataChannel()) {
- DC_INFO(
- ("%p: Calling AnnounceClosed on RTCDataChannel.", this));
- GetDomDataChannel()->AnnounceClosed();
- }
- }));
- }));
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::AnnounceClosed",
+ [this, self = RefPtr<DataChannel>(this)] {
+ DC_INFO(("%p: Attempting to call AnnounceClosed.", this));
+ if (GetDomDataChannel()) {
+ DC_INFO(
+ ("%p: Calling AnnounceClosed on RTCDataChannel.",
+ this));
+ GetDomDataChannel()->AnnounceClosed();
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
+ }),
+ NS_DISPATCH_FALLIBLE);
}
void DataChannel::GracefulClose() {
@@ -1527,13 +1574,16 @@ void DataChannel::GracefulClose() {
"RTCDataChannel.",
this));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- "DataChannel::GracefulClose", [this, self = RefPtr<DataChannel>(this)] {
- if (GetDomDataChannel()) {
- DC_INFO(("Calling GracefulClose on RTCDataChannel."));
- GetDomDataChannel()->GracefulClose();
- }
- }));
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ "DataChannel::GracefulClose",
+ [this, self = RefPtr<DataChannel>(this)] {
+ if (GetDomDataChannel()) {
+ DC_INFO(("Calling GracefulClose on RTCDataChannel."));
+ GetDomDataChannel()->GracefulClose();
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
int DataChannel::SendMsg(nsACString&& aMsg) {
@@ -1563,12 +1613,15 @@ void DataChannel::SetStream(uint16_t aId) {
DC_INFO(
("DataChannel has been allocated a stream ID. Queueing task to inform "
"worker RTCDataChannel."));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannel>(this), aId] {
- if (mWorkerDomDataChannel) {
- mWorkerDomDataChannel->SetId(aId);
- }
- }));
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannel>(this), aId] {
+ if (mWorkerDomDataChannel) {
+ mWorkerDomDataChannel->SetId(aId);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
} else {
DC_INFO(
("%p: DataChannel has been allocated a stream ID. Synchronously "
@@ -1585,12 +1638,15 @@ void DataChannel::SetMaxMessageSize(double aMaxMessageSize) {
DC_INFO(
("DataChannel has updated its maximum message size. Queueing task to "
"inform worker RTCDataChannel."));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- __func__, [this, self = RefPtr<DataChannel>(this), aMaxMessageSize] {
- if (mWorkerDomDataChannel) {
- mWorkerDomDataChannel->SetMaxMessageSize(aMaxMessageSize);
- }
- }));
+ mDomEventTarget->Dispatch(
+ NS_NewCancelableRunnableFunction(
+ __func__,
+ [this, self = RefPtr<DataChannel>(this), aMaxMessageSize] {
+ if (mWorkerDomDataChannel) {
+ mWorkerDomDataChannel->SetMaxMessageSize(aMaxMessageSize);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
} else {
DC_INFO(
("%p: DataChannel has updated its maximum message size. Synchronously "
@@ -1608,13 +1664,16 @@ void DataChannel::OnMessageReceived(nsCString&& aMsg, bool aIsBinary) {
DC_DEBUG(
("%p: received message (%s)", this, aIsBinary ? "binary" : "string"));
- mDomEventTarget->Dispatch(NS_NewRunnableFunction(
- "DataChannel::OnMessageReceived", [this, self = RefPtr<DataChannel>(this),
- msg = std::move(aMsg), aIsBinary]() {
- if (GetDomDataChannel()) {
- GetDomDataChannel()->DoOnMessageAvailable(msg, aIsBinary);
- }
- }));
+ mDomEventTarget->Dispatch(NS_NewCancelableRunnableFunction(
+ "DataChannel::OnMessageReceived",
+ [this, self = RefPtr<DataChannel>(this),
+ msg = std::move(aMsg), aIsBinary]() {
+ if (GetDomDataChannel()) {
+ GetDomDataChannel()->DoOnMessageAvailable(
+ msg, aIsBinary);
+ }
+ }),
+ NS_DISPATCH_FALLIBLE);
}
} // namespace mozilla