WebTransportStreams.cpp (8298B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/dom/WebTransportStreams.h" 8 9 #include "mozilla/Result.h" 10 #include "mozilla/dom/Promise-inl.h" 11 #include "mozilla/dom/WebTransport.h" 12 #include "mozilla/dom/WebTransportBidirectionalStream.h" 13 #include "mozilla/dom/WebTransportLog.h" 14 #include "mozilla/dom/WebTransportReceiveStream.h" 15 #include "mozilla/dom/WebTransportSendStream.h" 16 17 using namespace mozilla::ipc; 18 19 namespace mozilla::dom { 20 NS_IMPL_CYCLE_COLLECTION_INHERITED(WebTransportIncomingStreamsAlgorithms, 21 UnderlyingSourceAlgorithmsWrapper, 22 mTransport, mCallback) 23 NS_IMPL_ADDREF_INHERITED(WebTransportIncomingStreamsAlgorithms, 24 UnderlyingSourceAlgorithmsWrapper) 25 NS_IMPL_RELEASE_INHERITED(WebTransportIncomingStreamsAlgorithms, 26 UnderlyingSourceAlgorithmsWrapper) 27 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WebTransportIncomingStreamsAlgorithms) 28 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsWrapper) 29 30 WebTransportIncomingStreamsAlgorithms::WebTransportIncomingStreamsAlgorithms( 31 StreamType aUnidirectional, WebTransport* aTransport) 32 : mUnidirectional(aUnidirectional), mTransport(aTransport) {} 33 34 WebTransportIncomingStreamsAlgorithms:: 35 ~WebTransportIncomingStreamsAlgorithms() = default; 36 37 already_AddRefed<Promise> 38 WebTransportIncomingStreamsAlgorithms::PullCallbackImpl( 39 JSContext* aCx, ReadableStreamControllerBase& aController, 40 ErrorResult& aRv) { 41 // https://w3c.github.io/webtransport/#pullbidirectionalstream and 42 // https://w3c.github.io/webtransport/#pullunidirectionalstream 43 44 // Step 1: If transport.[[State]] is "connecting", then return the result 45 // of performing the following steps upon fulfillment of 46 // transport.[[Ready]]: 47 // We don't explicitly check mState here, since we'll reject 48 // mIncomingStreamPromise if we go to FAILED or CLOSED 49 // 50 // Step 2: Let session be transport.[[Session]]. 51 // Step 3: Let p be a new promise. 52 RefPtr<Promise> promise = 53 Promise::CreateInfallible(mTransport->GetParentObject()); 54 RefPtr<WebTransportIncomingStreamsAlgorithms> self(this); 55 // The real work of PullCallback() 56 // Step 5: Wait until there is an available incoming unidirectional stream. 57 auto length = (mUnidirectional == StreamType::Unidirectional) 58 ? mTransport->mUnidirectionalStreams.Length() 59 : mTransport->mBidirectionalStreams.Length(); 60 if (length == 0) { 61 // We need to wait. 62 // Per 63 // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling 64 // we can't be called again until the promise is resolved 65 MOZ_ASSERT(!mCallback); 66 mCallback = promise; 67 68 LOG(("Incoming%sDirectionalStreams Pull waiting for a stream", 69 mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi")); 70 Result<RefPtr<Promise>, nsresult> returnResult = 71 promise->ThenWithCycleCollectedArgs( 72 [](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv, 73 RefPtr<WebTransportIncomingStreamsAlgorithms> self, 74 RefPtr<Promise> aPromise) -> already_AddRefed<Promise> { 75 self->BuildStream(aCx, aRv); 76 return nullptr; 77 }, 78 self, promise); 79 if (returnResult.isErr()) { 80 // XXX Reject? 81 aRv.Throw(returnResult.unwrapErr()); 82 return nullptr; 83 } 84 // Step 4: Return p and run the remaining steps in parallel. 85 return returnResult.unwrap().forget(); 86 } 87 self->BuildStream(aCx, aRv); 88 // Step 4: Return p and run the remaining steps in parallel. 89 return promise.forget(); 90 } 91 92 // Note: fallible 93 void WebTransportIncomingStreamsAlgorithms::BuildStream(JSContext* aCx, 94 ErrorResult& aRv) { 95 // https://w3c.github.io/webtransport/#pullbidirectionalstream and 96 // https://w3c.github.io/webtransport/#pullunidirectionalstream 97 LOG(("Incoming%sDirectionalStreams Pull building a stream", 98 mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi")); 99 if (mUnidirectional == StreamType::Unidirectional) { 100 // Step 6: Let internalStream be the result of receiving an incoming 101 // unidirectional stream. 102 MOZ_ASSERT(mTransport->mUnidirectionalStreams.Length() > 0); 103 std::tuple<uint64_t, RefPtr<mozilla::ipc::DataPipeReceiver>> tuple = 104 mTransport->mUnidirectionalStreams[0]; 105 mTransport->mUnidirectionalStreams.RemoveElementAt(0); 106 107 // Step 7.1: Let stream be the result of creating a 108 // WebTransportReceiveStream with internalStream and transport 109 RefPtr<WebTransportReceiveStream> readableStream = 110 WebTransportReceiveStream::Create(mTransport, mTransport->mGlobal, 111 std::get<0>(tuple), 112 std::get<1>(tuple), aRv); 113 if (MOZ_UNLIKELY(!readableStream)) { 114 aRv.ThrowUnknownError("Internal error"); 115 return; 116 } 117 // Step 7.2 Enqueue stream to transport.[[IncomingUnidirectionalStreams]]. 118 JS::Rooted<JS::Value> jsStream(aCx); 119 if (MOZ_UNLIKELY(!ToJSValue(aCx, readableStream, &jsStream))) { 120 aRv.ThrowUnknownError("Internal error"); 121 return; 122 } 123 // EnqueueNative is CAN_RUN_SCRIPT 124 RefPtr<ReadableStream> incomingStream = 125 mTransport->mIncomingUnidirectionalStreams; 126 incomingStream->EnqueueNative(aCx, jsStream, aRv); 127 if (MOZ_UNLIKELY(aRv.Failed())) { 128 aRv.ThrowUnknownError("Internal error"); 129 return; 130 } 131 } else { 132 // Step 6: Let internalStream be the result of receiving a bidirectional 133 // stream 134 MOZ_ASSERT(mTransport->mBidirectionalStreams.Length() > 0); 135 std::tuple<uint64_t, UniquePtr<BidirectionalPair>> tuple = 136 std::move(mTransport->mBidirectionalStreams.ElementAt(0)); 137 mTransport->mBidirectionalStreams.RemoveElementAt(0); 138 RefPtr<DataPipeReceiver> input = std::get<1>(tuple)->first.forget(); 139 RefPtr<DataPipeSender> output = std::get<1>(tuple)->second.forget(); 140 141 RefPtr<WebTransportBidirectionalStream> stream = 142 WebTransportBidirectionalStream::Create(mTransport, mTransport->mGlobal, 143 std::get<0>(tuple), input, 144 output, Nothing(), aRv); 145 146 // Step 7.2 Enqueue stream to transport.[[IncomingBidirectionalStreams]]. 147 JS::Rooted<JS::Value> jsStream(aCx); 148 if (MOZ_UNLIKELY(!ToJSValue(aCx, stream, &jsStream))) { 149 return; 150 } 151 LOG(("Enqueuing bidirectional stream\n")); 152 // EnqueueNative is CAN_RUN_SCRIPT 153 RefPtr<ReadableStream> incomingStream = 154 mTransport->mIncomingBidirectionalStreams; 155 incomingStream->EnqueueNative(aCx, jsStream, aRv); 156 if (MOZ_UNLIKELY(aRv.Failed())) { 157 return; 158 } 159 } 160 // Step 7.3: Resolve p with undefined. 161 } 162 163 void WebTransportIncomingStreamsAlgorithms::NotifyIncomingStream() { 164 if (mUnidirectional == StreamType::Unidirectional) { 165 LOG(("NotifyIncomingStream: %zu Unidirectional ", 166 mTransport->mUnidirectionalStreams.Length())); 167 #ifdef DEBUG 168 auto number = mTransport->mUnidirectionalStreams.Length(); 169 MOZ_ASSERT(number > 0); 170 #endif 171 RefPtr<Promise> promise = mCallback.forget(); 172 if (promise) { 173 promise->MaybeResolveWithUndefined(); 174 } 175 } else { 176 LOG(("NotifyIncomingStream: %zu Bidirectional ", 177 mTransport->mBidirectionalStreams.Length())); 178 #ifdef DEBUG 179 auto number = mTransport->mBidirectionalStreams.Length(); 180 MOZ_ASSERT(number > 0); 181 #endif 182 RefPtr<Promise> promise = mCallback.forget(); 183 if (promise) { 184 promise->MaybeResolveWithUndefined(); 185 } 186 } 187 } 188 189 void WebTransportIncomingStreamsAlgorithms::NotifyRejectAll() { 190 // cancel all pulls 191 LOG(("Cancel all WebTransport Pulls")); 192 // Ensure we clear the callback before resolving/rejecting it 193 if (RefPtr<Promise> promise = mCallback.forget()) { 194 promise->MaybeReject(NS_ERROR_FAILURE); 195 } 196 } 197 198 } // namespace mozilla::dom