FetchStreamReader.cpp (15395B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "FetchStreamReader.h" 8 9 #include "InternalResponse.h" 10 #include "jsapi.h" 11 #include "mozilla/ConsoleReportCollector.h" 12 #include "mozilla/ErrorResult.h" 13 #include "mozilla/HoldDropJSObjects.h" 14 #include "mozilla/StaticAnalysisFunctions.h" 15 #include "mozilla/dom/AutoEntryScript.h" 16 #include "mozilla/dom/Promise.h" 17 #include "mozilla/dom/PromiseBinding.h" 18 #include "mozilla/dom/ReadableStream.h" 19 #include "mozilla/dom/ReadableStreamDefaultController.h" 20 #include "mozilla/dom/ReadableStreamDefaultReader.h" 21 #include "mozilla/dom/WorkerPrivate.h" 22 #include "mozilla/dom/WorkerRef.h" 23 #include "nsContentUtils.h" 24 #include "nsDebug.h" 25 #include "nsIAsyncInputStream.h" 26 #include "nsIPipe.h" 27 #include "nsIScriptError.h" 28 #include "nsPIDOMWindow.h" 29 30 namespace mozilla::dom { 31 32 NS_IMPL_ISUPPORTS(OutputStreamHolder, nsIOutputStreamCallback) 33 34 OutputStreamHolder::OutputStreamHolder(FetchStreamReader* aReader, 35 nsIAsyncOutputStream* aOutput) 36 : mReader(aReader), mOutput(aOutput) {} 37 38 nsresult OutputStreamHolder::Init(JSContext* aCx) { 39 if (NS_IsMainThread()) { 40 return NS_OK; 41 } 42 43 // We're in a worker 44 WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx); 45 MOZ_ASSERT(workerPrivate); 46 47 workerPrivate->AssertIsOnWorkerThread(); 48 49 // Note, this will create a ref-cycle between the holder and the stream. 50 // The cycle is broken when the stream is closed or the worker begins 51 // shutting down. 52 mWorkerRef = 53 StrongWorkerRef::Create(workerPrivate, "OutputStreamHolder", 54 [self = RefPtr{this}]() { self->Shutdown(); }); 55 if (NS_WARN_IF(!mWorkerRef)) { 56 return NS_ERROR_FAILURE; 57 } 58 return NS_OK; 59 } 60 61 OutputStreamHolder::~OutputStreamHolder() = default; 62 63 void OutputStreamHolder::Shutdown() { 64 if (mOutput) { 65 mOutput->Close(); 66 } 67 // If we have an AsyncWait running, we'll get a callback and clear 68 // the mAsyncWaitWorkerRef 69 mWorkerRef = nullptr; 70 } 71 72 nsresult OutputStreamHolder::AsyncWait(uint32_t aFlags, 73 uint32_t aRequestedCount, 74 nsIEventTarget* aEventTarget) { 75 mAsyncWaitWorkerRef = mWorkerRef; 76 // Grab the strong reference for the reader but only when we are waiting for 77 // the output stream, because it means we still have things to write. 78 // (WAIT_CLOSURE_ONLY happens when waiting for ReadableStream to respond, at 79 // which point the pull callback should get an indirect strong reference via 80 // the controller argument.) 81 mAsyncWaitReader = 82 aFlags == nsIAsyncOutputStream::WAIT_CLOSURE_ONLY ? nullptr : mReader; 83 nsresult rv = mOutput->AsyncWait(this, aFlags, aRequestedCount, aEventTarget); 84 if (NS_WARN_IF(NS_FAILED(rv))) { 85 mAsyncWaitWorkerRef = nullptr; 86 mAsyncWaitReader = nullptr; 87 } 88 return rv; 89 } 90 91 NS_IMETHODIMP OutputStreamHolder::OnOutputStreamReady( 92 nsIAsyncOutputStream* aStream) { 93 // We may get called back after ::Shutdown() 94 if (!mReader) { 95 mAsyncWaitWorkerRef = nullptr; 96 MOZ_ASSERT(!mAsyncWaitReader); 97 return NS_OK; 98 } 99 100 // mAsyncWaitReader may be reset during OnOutputStreamReady, make sure to let 101 // it live during the call 102 RefPtr<FetchStreamReader> reader = mReader.get(); 103 if (!reader->OnOutputStreamReady()) { 104 mAsyncWaitWorkerRef = nullptr; 105 mAsyncWaitReader = nullptr; 106 return NS_OK; 107 } 108 return NS_OK; 109 } 110 111 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader) 112 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader) 113 114 NS_IMPL_CYCLE_COLLECTION_WEAK_PTR(FetchStreamReader, mGlobal, mReader) 115 116 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader) 117 NS_INTERFACE_MAP_ENTRY(nsISupports) 118 NS_INTERFACE_MAP_END 119 120 /* static */ 121 nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal, 122 FetchStreamReader** aStreamReader, 123 nsIInputStream** aInputStream) { 124 MOZ_ASSERT(aCx); 125 MOZ_ASSERT(aGlobal); 126 MOZ_ASSERT(aStreamReader); 127 MOZ_ASSERT(aInputStream); 128 129 RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal); 130 131 nsCOMPtr<nsIAsyncInputStream> pipeIn; 132 nsCOMPtr<nsIAsyncOutputStream> pipeOut; 133 134 NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true, 0, 135 0); 136 137 streamReader->mOutput = new OutputStreamHolder(streamReader, pipeOut); 138 139 pipeIn.forget(aInputStream); 140 streamReader.forget(aStreamReader); 141 return NS_OK; 142 } 143 144 FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal) 145 : mGlobal(aGlobal), mOwningEventTarget(mGlobal->SerialEventTarget()) { 146 MOZ_ASSERT(aGlobal); 147 } 148 149 FetchStreamReader::~FetchStreamReader() { 150 CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED); 151 } 152 153 // If a context is provided, an attempt will be made to cancel the reader. The 154 // only situation where we don't expect to have a context is when closure is 155 // being triggered from the destructor or the WorkerRef is notifying. If 156 // we're at the destructor, it's far too late to cancel anything. And if the 157 // WorkerRef is being notified, the global is going away, so there's also 158 // no need to do further JS work. 159 void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) { 160 NS_ASSERT_OWNINGTHREAD(FetchStreamReader); 161 162 if (mStreamClosed) { 163 // Already closed. 164 return; 165 } 166 167 RefPtr<FetchStreamReader> kungFuDeathGrip = this; 168 if (aCx && mReader) { 169 ErrorResult rv; 170 if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) { 171 rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>(); 172 } else { 173 rv = aStatus; 174 } 175 JS::Rooted<JS::Value> errorValue(aCx); 176 if (ToJSValue(aCx, std::move(rv), &errorValue)) { 177 IgnoredErrorResult ignoredError; 178 // It's currently safe to cancel an already closed reader because, per the 179 // comments in ReadableStream::cancel() conveying the spec, step 2 of 180 // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is 181 // "closed", return a new promise resolved with undefined. 182 RefPtr<Promise> cancelResultPromise = 183 MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError); 184 NS_WARNING_ASSERTION(!ignoredError.Failed(), 185 "Failed to cancel stream during close and release"); 186 if (cancelResultPromise) { 187 bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled(); 188 NS_WARNING_ASSERTION(setHandled, 189 "Failed to mark cancel promise as handled."); 190 (void)setHandled; 191 } 192 } 193 194 // We don't want to propagate exceptions during the cleanup. 195 JS_ClearPendingException(aCx); 196 } 197 198 mStreamClosed = true; 199 200 mGlobal = nullptr; 201 202 if (mOutput) { 203 mOutput->CloseWithStatus(aStatus); 204 mOutput->Shutdown(); 205 mOutput = nullptr; 206 } 207 208 mReader = nullptr; 209 mBuffer.Clear(); 210 } 211 212 // https://fetch.spec.whatwg.org/#body-incrementally-read 213 void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream, 214 ErrorResult& aRv) { 215 MOZ_DIAGNOSTIC_ASSERT(!mReader); 216 MOZ_DIAGNOSTIC_ASSERT(aStream); 217 MOZ_ASSERT(!aStream->MaybeGetInputStreamIfUnread(), 218 "FetchStreamReader is for JS streams but we got a stream based on " 219 "nsIInputStream here. Extract nsIInputStream and read it instead " 220 "to reduce overhead."); 221 222 aRv = mOutput->Init(aCx); 223 if (aRv.Failed()) { 224 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); 225 return; 226 } 227 228 // Step 2: Let reader be the result of getting a reader for body’s stream. 229 RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv); 230 if (aRv.Failed()) { 231 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); 232 return; 233 } 234 235 mReader = reader; 236 237 aRv = mOutput->AsyncWait(0, 0, mOwningEventTarget); 238 if (NS_WARN_IF(aRv.Failed())) { 239 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); 240 } 241 } 242 243 struct FetchReadRequest : public ReadRequest { 244 public: 245 NS_DECL_ISUPPORTS_INHERITED 246 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest) 247 248 explicit FetchReadRequest(FetchStreamReader* aReader) 249 : mFetchStreamReader(aReader) {} 250 251 MOZ_CAN_RUN_SCRIPT_BOUNDARY 252 void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk, 253 ErrorResult& aRv) override { 254 mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv); 255 } 256 257 MOZ_CAN_RUN_SCRIPT_BOUNDARY 258 void CloseSteps(JSContext* aCx, ErrorResult& aRv) override { 259 mFetchStreamReader->CloseSteps(aCx, aRv); 260 } 261 262 MOZ_CAN_RUN_SCRIPT_BOUNDARY 263 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError, 264 ErrorResult& aRv) override { 265 mFetchStreamReader->ErrorSteps(aCx, aError, aRv); 266 } 267 268 protected: 269 virtual ~FetchReadRequest() = default; 270 271 MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader; 272 }; 273 274 NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest, 275 mFetchStreamReader) 276 NS_IMPL_ADDREF_INHERITED(FetchReadRequest, ReadRequest) 277 NS_IMPL_RELEASE_INHERITED(FetchReadRequest, ReadRequest) 278 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest) 279 NS_INTERFACE_MAP_END_INHERITING(ReadRequest) 280 281 // nsIOutputStreamCallback interface 282 MOZ_CAN_RUN_SCRIPT_BOUNDARY 283 bool FetchStreamReader::OnOutputStreamReady() { 284 NS_ASSERT_OWNINGTHREAD(FetchStreamReader); 285 if (mStreamClosed) { 286 return false; 287 } 288 289 AutoEntryScript aes(mGlobal, "ReadableStreamReader.read"); 290 return Process(aes.cx()); 291 } 292 293 bool FetchStreamReader::Process(JSContext* aCx) { 294 NS_ASSERT_OWNINGTHREAD(FetchStreamReader); 295 MOZ_ASSERT(mReader); 296 297 if (!mBuffer.IsEmpty()) { 298 nsresult rv = WriteBuffer(); 299 if (NS_WARN_IF(NS_FAILED(rv))) { 300 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR); 301 return false; 302 } 303 return true; 304 } 305 306 // Check if the output stream has already been closed. This lets us propagate 307 // errors eagerly, and detect output stream closures even when we have no data 308 // to write. 309 if (NS_WARN_IF(NS_FAILED(mOutput->StreamStatus()))) { 310 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR); 311 return false; 312 } 313 314 // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we 315 // notice if the reader closes. 316 nsresult rv = mOutput->AsyncWait(nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, 317 mOwningEventTarget); 318 if (NS_WARN_IF(NS_FAILED(rv))) { 319 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); 320 return false; 321 } 322 323 // If we already have an outstanding read request, don't start another one 324 // concurrently. 325 if (!mHasOutstandingReadRequest) { 326 // https://fetch.spec.whatwg.org/#incrementally-read-loop 327 // The below very loosely tries to implement the incrementally-read-loop 328 // from the fetch spec. 329 // Step 2: Read a chunk from reader given readRequest. 330 RefPtr<ReadRequest> readRequest = new FetchReadRequest(this); 331 RefPtr<ReadableStreamDefaultReader> reader = mReader; 332 mHasOutstandingReadRequest = true; 333 334 IgnoredErrorResult err; 335 reader->ReadChunk(aCx, *readRequest, err); 336 if (NS_WARN_IF(err.Failed())) { 337 // Let's close the stream. 338 mHasOutstandingReadRequest = false; 339 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); 340 // Don't return false, as we've already called `AsyncWait`. 341 } 342 } 343 return true; 344 } 345 346 void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk, 347 ErrorResult& aRv) { 348 // This roughly implements the chunk steps from 349 // https://fetch.spec.whatwg.org/#incrementally-read-loop. 350 351 mHasOutstandingReadRequest = false; 352 353 // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to 354 // this step: run processBodyError given a TypeError. 355 RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx); 356 if (!aChunk.isObject() || !chunk.Init(&aChunk.toObject())) { 357 CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR); 358 return; 359 } 360 361 MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty()); 362 363 // Let's take a copy of the data. 364 // FIXME: We could sometimes avoid this copy by trying to write `chunk` 365 // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't 366 // enough space in the pipe's buffer. 367 if (!chunk.AppendDataTo(mBuffer)) { 368 CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY); 369 return; 370 } 371 372 mBufferOffset = 0; 373 mBufferRemaining = mBuffer.Length(); 374 375 nsresult rv = WriteBuffer(); 376 if (NS_WARN_IF(NS_FAILED(rv))) { 377 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR); 378 } 379 } 380 381 void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) { 382 mHasOutstandingReadRequest = false; 383 CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED); 384 } 385 386 void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError, 387 ErrorResult& aRv) { 388 mHasOutstandingReadRequest = false; 389 ReportErrorToConsole(aCx, aError); 390 CloseAndRelease(aCx, NS_ERROR_FAILURE); 391 } 392 393 nsresult FetchStreamReader::WriteBuffer() { 394 MOZ_ASSERT(mBuffer.Length() == (mBufferOffset + mBufferRemaining)); 395 396 char* data = reinterpret_cast<char*>(mBuffer.Elements()); 397 398 while (mBufferRemaining > 0) { 399 uint32_t written = 0; 400 nsresult rv = 401 mOutput->Write(data + mBufferOffset, mBufferRemaining, &written); 402 403 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 404 break; 405 } 406 407 if (NS_WARN_IF(NS_FAILED(rv))) { 408 return rv; 409 } 410 411 MOZ_ASSERT(written <= mBufferRemaining); 412 mBufferRemaining -= written; 413 mBufferOffset += written; 414 415 if (mBufferRemaining == 0) { 416 mBuffer.Clear(); 417 break; 418 } 419 } 420 421 nsresult rv = mOutput->AsyncWait(0, 0, mOwningEventTarget); 422 if (NS_WARN_IF(NS_FAILED(rv))) { 423 return rv; 424 } 425 426 return NS_OK; 427 } 428 429 void FetchStreamReader::ReportErrorToConsole(JSContext* aCx, 430 JS::Handle<JS::Value> aValue) { 431 nsCString sourceSpec; 432 uint32_t line = 0; 433 uint32_t column = 0; 434 nsString valueString; 435 436 nsContentUtils::ExtractErrorValues(aCx, aValue, sourceSpec, &line, &column, 437 valueString); 438 439 nsTArray<nsString> params; 440 params.AppendElement(valueString); 441 442 RefPtr<ConsoleReportCollector> reporter = new ConsoleReportCollector(); 443 reporter->AddConsoleReport(nsIScriptError::errorFlag, 444 "ReadableStreamReader.read"_ns, 445 nsContentUtils::eDOM_PROPERTIES, sourceSpec, line, 446 column, "ReadableStreamReadingFailed"_ns, params); 447 448 uint64_t innerWindowId = 0; 449 450 if (NS_IsMainThread()) { 451 nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal); 452 if (window) { 453 innerWindowId = window->WindowID(); 454 } 455 reporter->FlushReportsToConsole(innerWindowId); 456 return; 457 } 458 459 WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx); 460 if (workerPrivate) { 461 innerWindowId = workerPrivate->WindowID(); 462 } 463 464 RefPtr<Runnable> r = NS_NewRunnableFunction( 465 "FetchStreamReader::ReportErrorToConsole", [reporter, innerWindowId]() { 466 reporter->FlushReportsToConsole(innerWindowId); 467 }); 468 469 workerPrivate->DispatchToMainThread(r.forget()); 470 } 471 472 } // namespace mozilla::dom