UnderlyingSinkCallbackHelpers.cpp (11763B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/dom/UnderlyingSinkCallbackHelpers.h" 8 9 #include "StreamUtils.h" 10 #include "mozilla/dom/BufferSourceBinding.h" 11 #include "mozilla/dom/BufferSourceBindingFwd.h" 12 #include "mozilla/dom/UnionTypes.h" 13 #include "mozilla/dom/WebTransportError.h" 14 #include "nsHttp.h" 15 16 using namespace mozilla::dom; 17 18 NS_IMPL_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase) 19 NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkAlgorithmsBase) 20 NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkAlgorithmsBase) 21 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase) 22 NS_INTERFACE_MAP_ENTRY(nsISupports) 23 NS_INTERFACE_MAP_END 24 25 NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS( 26 UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase, 27 (mGlobal, mStartCallback, mWriteCallback, mCloseCallback, mAbortCallback), 28 (mUnderlyingSink)) 29 NS_IMPL_ADDREF_INHERITED(UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase) 30 NS_IMPL_RELEASE_INHERITED(UnderlyingSinkAlgorithms, 31 UnderlyingSinkAlgorithmsBase) 32 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithms) 33 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSinkAlgorithmsBase) 34 35 // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink 36 void UnderlyingSinkAlgorithms::StartCallback( 37 JSContext* aCx, WritableStreamDefaultController& aController, 38 JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) { 39 if (!mStartCallback) { 40 // Step 2: Let startAlgorithm be an algorithm that returns undefined. 41 aRetVal.setUndefined(); 42 return; 43 } 44 45 // Step 6: If underlyingSinkDict["start"] exists, then set startAlgorithm to 46 // an algorithm which returns the result of invoking 47 // underlyingSinkDict["start"] with argument list « controller » and callback 48 // this value underlyingSink. 49 JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink); 50 return mStartCallback->Call(thisObj, aController, aRetVal, aRv, 51 "UnderlyingSink.start", 52 CallbackFunction::eRethrowExceptions); 53 } 54 55 // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink 56 already_AddRefed<Promise> UnderlyingSinkAlgorithms::WriteCallback( 57 JSContext* aCx, JS::Handle<JS::Value> aChunk, 58 WritableStreamDefaultController& aController, ErrorResult& aRv) { 59 if (!mWriteCallback) { 60 // Step 3: Let writeAlgorithm be an algorithm that returns a promise 61 // resolved with undefined. 62 return Promise::CreateResolvedWithUndefined(mGlobal, aRv); 63 } 64 65 // Step 7: If underlyingSinkDict["write"] exists, then set writeAlgorithm to 66 // an algorithm which takes an argument chunk and returns the result of 67 // invoking underlyingSinkDict["write"] with argument list « chunk, controller 68 // » and callback this value underlyingSink. 69 JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink); 70 RefPtr<Promise> promise = mWriteCallback->Call( 71 thisObj, aChunk, aController, aRv, "UnderlyingSink.write", 72 CallbackFunction::eRethrowExceptions); 73 return promise.forget(); 74 } 75 76 // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink 77 already_AddRefed<Promise> UnderlyingSinkAlgorithms::CloseCallback( 78 JSContext* aCx, ErrorResult& aRv) { 79 if (!mCloseCallback) { 80 // Step 4: Let closeAlgorithm be an algorithm that returns a promise 81 // resolved with undefined. 82 return Promise::CreateResolvedWithUndefined(mGlobal, aRv); 83 } 84 85 // Step 8: If underlyingSinkDict["close"] exists, then set closeAlgorithm to 86 // an algorithm which returns the result of invoking 87 // underlyingSinkDict["close"] with argument list «» and callback this value 88 // underlyingSink. 89 JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink); 90 RefPtr<Promise> promise = 91 mCloseCallback->Call(thisObj, aRv, "UnderlyingSink.close", 92 CallbackFunction::eRethrowExceptions); 93 return promise.forget(); 94 } 95 96 // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink 97 already_AddRefed<Promise> UnderlyingSinkAlgorithms::AbortCallback( 98 JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason, 99 ErrorResult& aRv) { 100 if (!mAbortCallback) { 101 // Step 5: Let abortAlgorithm be an algorithm that returns a promise 102 // resolved with undefined. 103 return Promise::CreateResolvedWithUndefined(mGlobal, aRv); 104 } 105 106 // Step 9: Let abortAlgorithm be an algorithm that returns a promise resolved 107 // with undefined. 108 JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink); 109 RefPtr<Promise> promise = 110 mAbortCallback->Call(thisObj, aReason, aRv, "UnderlyingSink.abort", 111 CallbackFunction::eRethrowExceptions); 112 113 return promise.forget(); 114 } 115 116 // https://streams.spec.whatwg.org/#writable-set-up 117 // This one is not covered by the above section as the spec expects any spec 118 // implementation to explicitly return a promise for this callback. It's still 119 // useful for Gecko as error handling is very frequently done with 120 // ErrorResult instead of a rejected promise. See also 121 // https://github.com/whatwg/streams/issues/1253. 122 already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::WriteCallback( 123 JSContext* aCx, JS::Handle<JS::Value> aChunk, 124 WritableStreamDefaultController& aController, ErrorResult& aRv) { 125 nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx); 126 return PromisifyAlgorithm( 127 global, 128 [&](ErrorResult& aRv) { 129 return WriteCallbackImpl(aCx, aChunk, aController, aRv); 130 }, 131 aRv); 132 } 133 134 // https://streams.spec.whatwg.org/#writable-set-up 135 // Step 2.1: Let closeAlgorithmWrapper be an algorithm that runs these steps: 136 already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::CloseCallback( 137 JSContext* aCx, ErrorResult& aRv) { 138 nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx); 139 return PromisifyAlgorithm( 140 global, [&](ErrorResult& aRv) { return CloseCallbackImpl(aCx, aRv); }, 141 aRv); 142 } 143 144 // https://streams.spec.whatwg.org/#writable-set-up 145 // Step 3.1: Let abortAlgorithmWrapper be an algorithm that runs these steps: 146 already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::AbortCallback( 147 JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason, 148 ErrorResult& aRv) { 149 nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx); 150 return PromisifyAlgorithm( 151 global, 152 [&](ErrorResult& aRv) { return AbortCallbackImpl(aCx, aReason, aRv); }, 153 aRv); 154 } 155 156 NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput, 157 UnderlyingSinkAlgorithmsBase, 158 nsIOutputStreamCallback) 159 NS_IMPL_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput, 160 UnderlyingSinkAlgorithmsBase, mParent, 161 mOutput, mPromise) 162 163 NS_IMETHODIMP 164 WritableStreamToOutput::OnOutputStreamReady(nsIAsyncOutputStream* aStream) { 165 if (!mData) { 166 return NS_OK; 167 } 168 MOZ_ASSERT(mPromise); 169 uint32_t written = 0; 170 nsresult rv = mOutput->Write( 171 reinterpret_cast<const char*>(mData->Elements() + mWritten), 172 mData->Length() - mWritten, &written); 173 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) { 174 mPromise->MaybeRejectWithAbortError("Error writing to stream"_ns); 175 ClearData(); 176 // XXX should we add mErrored and fail future calls immediately? 177 // I presume new calls to Write() will fail though, too 178 return rv; 179 } 180 if (NS_SUCCEEDED(rv)) { 181 mWritten += written; 182 MOZ_ASSERT(mWritten <= mData->Length()); 183 if (mWritten >= mData->Length()) { 184 mPromise->MaybeResolveWithUndefined(); 185 ClearData(); 186 return NS_OK; 187 } 188 // more to write 189 } 190 // wrote partial or nothing 191 // Wait for space 192 nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget(); 193 rv = mOutput->AsyncWait(this, 0, 0, target); 194 if (NS_FAILED(rv)) { 195 mPromise->MaybeRejectWithUnknownError("error waiting to write data"); 196 ClearData(); 197 // XXX should we add mErrored and fail future calls immediately? 198 // New calls to Write() will fail, note 199 // See step 5.2 of 200 // https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write. 201 return rv; 202 } 203 return NS_OK; 204 } 205 206 already_AddRefed<Promise> WritableStreamToOutput::WriteCallbackImpl( 207 JSContext* aCx, JS::Handle<JS::Value> aChunk, 208 WritableStreamDefaultController& aController, ErrorResult& aRv) { 209 BufferSource data; 210 if (!data.Init(aCx, aChunk)) { 211 aRv.MightThrowJSException(); 212 aRv.StealExceptionFromJSContext(aCx); 213 return nullptr; 214 } 215 // buffer/bufferView 216 MOZ_ASSERT(data.IsArrayBuffer() || data.IsArrayBufferView()); 217 218 RefPtr<Promise> promise = Promise::Create(mParent, aRv); 219 if (NS_WARN_IF(aRv.Failed())) { 220 return nullptr; 221 } 222 223 // Try to write first, and only enqueue data if we were already blocked 224 // or the write didn't write it all. This avoids allocations and copies 225 // in common cases. 226 MOZ_ASSERT(!mPromise); 227 MOZ_ASSERT(mWritten == 0); 228 uint32_t written = 0; 229 ProcessTypedArraysFixed(data, [&](const Span<uint8_t>& aData) { 230 Span<uint8_t> dataSpan = aData; 231 nsresult rv = mOutput->Write(mozilla::AsChars(dataSpan).Elements(), 232 dataSpan.Length(), &written); 233 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) { 234 promise->MaybeRejectWithAbortError("error writing data"); 235 return; 236 } 237 if (NS_SUCCEEDED(rv)) { 238 if (written == dataSpan.Length()) { 239 promise->MaybeResolveWithUndefined(); 240 return; 241 } 242 dataSpan = dataSpan.From(written); 243 } 244 245 auto buffer = Buffer<uint8_t>::CopyFrom(dataSpan); 246 if (buffer.isNothing()) { 247 promise->MaybeReject(NS_ERROR_OUT_OF_MEMORY); 248 return; 249 } 250 mData = std::move(buffer); 251 }); 252 253 if (promise->State() != Promise::PromiseState::Pending) { 254 return promise.forget(); 255 } 256 257 mPromise = promise; 258 259 nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget(); 260 nsresult rv = mOutput->AsyncWait(this, 0, 0, target); 261 if (NS_FAILED(rv)) { 262 ClearData(); 263 promise->MaybeRejectWithUnknownError("error waiting to write data"); 264 } 265 return promise.forget(); 266 } 267 268 already_AddRefed<Promise> WritableStreamToOutput::AbortCallbackImpl( 269 JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason, 270 ErrorResult& aRv) { 271 // https://streams.spec.whatwg.org/#writablestream-set-up 272 // Step 3. Let abortAlgorithmWrapper be an algorithm that runs these steps: 273 274 if (aReason.WasPassed() && aReason.Value().isObject()) { 275 JS::Rooted<JSObject*> obj(aCx, &aReason.Value().toObject()); 276 RefPtr<WebTransportError> error; 277 UnwrapObject<prototypes::id::WebTransportError, WebTransportError>( 278 obj, error, nullptr); 279 if (error) { 280 mOutput->CloseWithStatus(net::GetNSResultFromWebTransportError( 281 error->GetStreamErrorCode().Value())); 282 return nullptr; 283 } 284 } 285 286 // XXX The close or rather a dedicated abort should be async. For now we have 287 // to always fall back to the Step 3.3 below. 288 // XXX how do we know this stream is used by webtransport? 289 mOutput->CloseWithStatus(NS_ERROR_WEBTRANSPORT_CODE_BASE); 290 291 // Step 3.3. Return a promise resolved with undefined. 292 // Wrapper handles this 293 return nullptr; 294 } 295 296 void WritableStreamToOutput::ReleaseObjects() { mOutput->Close(); }