ReadStream.cpp (17887B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/dom/cache/ReadStream.h" 8 9 #include "mozilla/SnappyUncompressInputStream.h" 10 #include "mozilla/dom/cache/CacheStreamControlChild.h" 11 #include "mozilla/dom/cache/CacheStreamControlParent.h" 12 #include "mozilla/dom/cache/CacheTypes.h" 13 #include "mozilla/ipc/IPCStreamUtils.h" 14 #include "nsIAsyncInputStream.h" 15 #include "nsIThread.h" 16 #include "nsStringStream.h" 17 #include "nsTArray.h" 18 19 namespace mozilla::dom::cache { 20 21 // ---------------------------------------------------------------------------- 22 23 // The inner stream class. This is where all of the real work is done. As 24 // an invariant Inner::Close() must be called before ~Inner(). This is 25 // guaranteed by our outer ReadStream class. 26 class ReadStream::Inner final : public ReadStream::Controllable { 27 public: 28 Inner(StreamControl* aControl, const nsID& aId, nsIInputStream* aStream); 29 30 void Serialize(Maybe<CacheReadStream>* aReadStreamOut, ErrorResult& aRv); 31 32 void Serialize(CacheReadStream* aReadStreamOut, ErrorResult& aRv); 33 34 // ReadStream::Controllable methods 35 virtual void CloseStream() override; 36 37 virtual void CloseStreamWithoutReporting() override; 38 39 virtual bool HasEverBeenRead() const override; 40 41 // Simulate nsIInputStream methods, but we don't actually inherit from it 42 nsresult Close(); 43 44 nsresult Available(uint64_t* aNumAvailableOut); 45 46 nsresult StreamStatus(); 47 48 nsresult Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut); 49 50 nsresult ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, 51 uint32_t aCount, uint32_t* aNumReadOut); 52 53 nsresult IsNonBlocking(bool* aNonBlockingOut); 54 55 NS_DECL_OWNINGTHREAD; 56 57 ~Inner(); 58 59 private: 60 class NoteClosedRunnable; 61 class ForgetRunnable; 62 63 void NoteClosed(); 64 65 void Forget(); 66 67 void NoteClosedOnOwningThread(); 68 69 void ForgetOnOwningThread(); 70 71 nsIInputStream* EnsureStream(); 72 73 void AsyncOpenStreamOnOwningThread(); 74 75 void MaybeAbortAsyncOpenStream(); 76 77 void OpenStreamFailed(); 78 79 inline SafeRefPtr<Inner> SafeRefPtrFromThis() { 80 return Controllable::SafeRefPtrFromThis().downcast<Inner>(); 81 } 82 83 // Weak ref to the stream control actor. The actor will always call either 84 // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The 85 // weak ref is cleared in the resulting NoteClosedOnOwningThread() or 86 // ForgetOnOwningThread() method call. 87 StreamControl* mControl; 88 89 const nsID mId; 90 nsCOMPtr<nsISerialEventTarget> mOwningEventTarget; 91 92 enum State { Open, Closed, NumStates }; 93 Atomic<State> mState; 94 Atomic<bool> mHasEverBeenRead; 95 bool mAsyncOpenStarted; 96 97 // The wrapped stream objects may not be threadsafe. We need to be able 98 // to close a stream on our owning thread while an IO thread is simultaneously 99 // reading the same stream. Therefore, protect all access to these stream 100 // objects with a mutex. 101 Mutex mMutex MOZ_UNANNOTATED; 102 CondVar mCondVar; 103 nsCOMPtr<nsIInputStream> mStream; 104 nsCOMPtr<nsIInputStream> mSnappyStream; 105 }; 106 107 // ---------------------------------------------------------------------------- 108 109 // Runnable to notify actors that the ReadStream has closed. This must 110 // be done on the thread associated with the PBackground actor. Must be 111 // cancelable to execute on Worker threads (which can occur when the 112 // ReadStream is constructed on a child process Worker thread). 113 class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable { 114 public: 115 explicit NoteClosedRunnable(SafeRefPtr<ReadStream::Inner> aStream) 116 : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable"), 117 mStream(std::move(aStream)) {} 118 119 NS_IMETHOD Run() override { 120 mStream->NoteClosedOnOwningThread(); 121 return NS_OK; 122 } 123 124 // Note, we must proceed with the Run() method since our actor will not 125 // clean itself up until we note that the stream is closed. 126 nsresult Cancel() override { 127 Run(); 128 return NS_OK; 129 } 130 131 private: 132 ~NoteClosedRunnable() = default; 133 134 const SafeRefPtr<ReadStream::Inner> mStream; 135 }; 136 137 // ---------------------------------------------------------------------------- 138 139 // Runnable to clear actors without reporting that the ReadStream has 140 // closed. Since this can trigger actor destruction, we need to do 141 // it on the thread associated with the PBackground actor. Must be 142 // cancelable to execute on Worker threads (which can occur when the 143 // ReadStream is constructed on a child process Worker thread). 144 class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable { 145 public: 146 explicit ForgetRunnable(SafeRefPtr<ReadStream::Inner> aStream) 147 : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable"), 148 mStream(std::move(aStream)) {} 149 150 NS_IMETHOD Run() override { 151 mStream->ForgetOnOwningThread(); 152 return NS_OK; 153 } 154 155 // Note, we must proceed with the Run() method so that we properly 156 // call RemoveListener on the actor. 157 nsresult Cancel() override { 158 Run(); 159 return NS_OK; 160 } 161 162 private: 163 ~ForgetRunnable() = default; 164 165 const SafeRefPtr<ReadStream::Inner> mStream; 166 }; 167 168 // ---------------------------------------------------------------------------- 169 170 ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId, 171 nsIInputStream* aStream) 172 : mControl(aControl), 173 mId(aId), 174 mOwningEventTarget(GetCurrentSerialEventTarget()), 175 mState(Open), 176 mHasEverBeenRead(false), 177 mAsyncOpenStarted(false), 178 mMutex("dom::cache::ReadStream"), 179 mCondVar(mMutex, "dom::cache::ReadStream"), 180 mStream(aStream), 181 mSnappyStream(aStream ? new SnappyUncompressInputStream(aStream) 182 : nullptr) { 183 MOZ_DIAGNOSTIC_ASSERT(mControl); 184 mControl->AddReadStream(SafeRefPtrFromThis()); 185 } 186 187 void ReadStream::Inner::Serialize(Maybe<CacheReadStream>* aReadStreamOut, 188 ErrorResult& aRv) { 189 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 190 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut); 191 aReadStreamOut->emplace(CacheReadStream()); 192 Serialize(&aReadStreamOut->ref(), aRv); 193 } 194 195 void ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut, 196 ErrorResult& aRv) { 197 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 198 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut); 199 200 if (mState != Open) { 201 aRv.ThrowTypeError( 202 "Response body is a cache file stream that has already been closed."); 203 return; 204 } 205 206 MOZ_DIAGNOSTIC_ASSERT(mControl); 207 208 aReadStreamOut->id() = mId; 209 mControl->SerializeControl(aReadStreamOut); 210 211 { 212 MutexAutoLock lock(mMutex); 213 mControl->SerializeStream(aReadStreamOut, mStream); 214 } 215 216 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().isNothing() || 217 aReadStreamOut->stream().ref().stream().type() != 218 mozilla::ipc::InputStreamParams::T__None); 219 220 // We're passing ownership across the IPC barrier with the control, so 221 // do not signal that the stream is closed here. 222 Forget(); 223 } 224 225 void ReadStream::Inner::CloseStream() { 226 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 227 MOZ_ALWAYS_SUCCEEDS(Close()); 228 } 229 230 void ReadStream::Inner::CloseStreamWithoutReporting() { 231 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 232 Forget(); 233 } 234 235 bool ReadStream::Inner::HasEverBeenRead() const { 236 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 237 return mHasEverBeenRead; 238 } 239 240 nsresult ReadStream::Inner::Close() { 241 // stream ops can happen on any thread 242 nsresult rv = NS_OK; 243 { 244 MutexAutoLock lock(mMutex); 245 if (mSnappyStream) { 246 rv = mSnappyStream->Close(); 247 } 248 } 249 NoteClosed(); 250 return rv; 251 } 252 253 nsresult ReadStream::Inner::Available(uint64_t* aNumAvailableOut) { 254 // stream ops can happen on any thread 255 nsresult rv = NS_OK; 256 { 257 MutexAutoLock lock(mMutex); 258 rv = EnsureStream()->Available(aNumAvailableOut); 259 } 260 261 if (NS_FAILED(rv)) { 262 Close(); 263 } 264 265 return rv; 266 } 267 268 nsresult ReadStream::Inner::StreamStatus() { 269 // stream ops can happen on any thread 270 nsresult rv = NS_OK; 271 { 272 MutexAutoLock lock(mMutex); 273 rv = EnsureStream()->StreamStatus(); 274 } 275 276 if (NS_FAILED(rv)) { 277 Close(); 278 } 279 280 return rv; 281 } 282 283 nsresult ReadStream::Inner::Read(char* aBuf, uint32_t aCount, 284 uint32_t* aNumReadOut) { 285 // stream ops can happen on any thread 286 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut); 287 288 nsresult rv = NS_OK; 289 { 290 MutexAutoLock lock(mMutex); 291 rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut); 292 } 293 294 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) || 295 *aNumReadOut == 0) { 296 Close(); 297 } 298 299 mHasEverBeenRead = true; 300 301 return rv; 302 } 303 304 nsresult ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, 305 void* aClosure, uint32_t aCount, 306 uint32_t* aNumReadOut) { 307 // stream ops can happen on any thread 308 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut); 309 310 if (aCount) { 311 mHasEverBeenRead = true; 312 } 313 314 nsresult rv = NS_OK; 315 { 316 MutexAutoLock lock(mMutex); 317 rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); 318 } 319 320 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK && 321 rv != NS_ERROR_NOT_IMPLEMENTED) || 322 *aNumReadOut == 0) { 323 Close(); 324 } 325 326 // Verify bytes were actually read before marking as being ever read. For 327 // example, code can test if the stream supports ReadSegments() by calling 328 // this method with a dummy callback which doesn't read anything. We don't 329 // want to trigger on that. 330 if (*aNumReadOut) { 331 mHasEverBeenRead = true; 332 } 333 334 return rv; 335 } 336 337 nsresult ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) { 338 // stream ops can happen on any thread 339 MutexAutoLock lock(mMutex); 340 if (mSnappyStream) { 341 return mSnappyStream->IsNonBlocking(aNonBlockingOut); 342 } 343 *aNonBlockingOut = false; 344 return NS_OK; 345 } 346 347 ReadStream::Inner::~Inner() { 348 // Any thread 349 MOZ_DIAGNOSTIC_ASSERT(mState == Closed); 350 MOZ_DIAGNOSTIC_ASSERT(!mControl); 351 } 352 353 void ReadStream::Inner::NoteClosed() { 354 // Any thread 355 if (mState == Closed) { 356 return; 357 } 358 359 if (mOwningEventTarget->IsOnCurrentThread()) { 360 NoteClosedOnOwningThread(); 361 return; 362 } 363 364 nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(SafeRefPtrFromThis()); 365 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(), 366 nsIThread::DISPATCH_NORMAL)); 367 } 368 369 void ReadStream::Inner::Forget() { 370 // Any thread 371 if (mState == Closed) { 372 return; 373 } 374 375 if (mOwningEventTarget->IsOnCurrentThread()) { 376 ForgetOnOwningThread(); 377 return; 378 } 379 380 nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(SafeRefPtrFromThis()); 381 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(), 382 nsIThread::DISPATCH_NORMAL)); 383 } 384 385 void ReadStream::Inner::NoteClosedOnOwningThread() { 386 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 387 388 // Mark closed and do nothing if we were already closed 389 if (!mState.compareExchange(Open, Closed)) { 390 return; 391 } 392 393 MaybeAbortAsyncOpenStream(); 394 395 MOZ_DIAGNOSTIC_ASSERT(mControl); 396 mControl->NoteClosed(SafeRefPtrFromThis(), mId); 397 mControl = nullptr; 398 } 399 400 void ReadStream::Inner::ForgetOnOwningThread() { 401 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 402 403 // Mark closed and do nothing if we were already closed 404 if (!mState.compareExchange(Open, Closed)) { 405 return; 406 } 407 408 MaybeAbortAsyncOpenStream(); 409 410 MOZ_DIAGNOSTIC_ASSERT(mControl); 411 mControl->ForgetReadStream(SafeRefPtrFromThis()); 412 mControl = nullptr; 413 } 414 415 nsIInputStream* ReadStream::Inner::EnsureStream() { 416 mMutex.AssertCurrentThreadOwns(); 417 418 // We need to block the current thread while we open the stream. We 419 // cannot do this safely from the main owning thread since it would 420 // trigger deadlock. This should be ok, though, since a blocking 421 // stream like this should never be read on the owning thread anyway. 422 if (mOwningEventTarget->IsOnCurrentThread()) { 423 MOZ_CRASH("Blocking read on the js/ipc owning thread!"); 424 } 425 426 if (mSnappyStream) { 427 return mSnappyStream; 428 } 429 430 nsCOMPtr<nsIRunnable> r = NewCancelableRunnableMethod( 431 "ReadStream::Inner::AsyncOpenStreamOnOwningThread", this, 432 &ReadStream::Inner::AsyncOpenStreamOnOwningThread); 433 nsresult rv = 434 mOwningEventTarget->Dispatch(r.forget(), nsIThread::DISPATCH_NORMAL); 435 if (NS_WARN_IF(NS_FAILED(rv))) { 436 OpenStreamFailed(); 437 return mSnappyStream; 438 } 439 440 mCondVar.Wait(); 441 MOZ_DIAGNOSTIC_ASSERT(mSnappyStream); 442 443 return mSnappyStream; 444 } 445 446 void ReadStream::Inner::AsyncOpenStreamOnOwningThread() { 447 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); 448 449 if (mSnappyStream) { 450 // Different threads might request opening the stream at the same time. If 451 // the earlier request succeeded, then use the result. 452 mCondVar.NotifyAll(); 453 return; 454 } 455 456 if (!mControl || mState == Closed) { 457 MutexAutoLock lock(mMutex); 458 OpenStreamFailed(); 459 mCondVar.NotifyAll(); 460 return; 461 } 462 463 if (mAsyncOpenStarted) { 464 return; 465 } 466 mAsyncOpenStarted = true; 467 468 RefPtr<ReadStream::Inner> self = this; 469 mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) { 470 MutexAutoLock lock(self->mMutex); 471 self->mAsyncOpenStarted = false; 472 if (!self->mStream) { 473 if (!aStream) { 474 self->OpenStreamFailed(); 475 } else { 476 self->mStream = std::move(aStream); 477 self->mSnappyStream = new SnappyUncompressInputStream(self->mStream); 478 } 479 } 480 self->mCondVar.NotifyAll(); 481 }); 482 } 483 484 void ReadStream::Inner::MaybeAbortAsyncOpenStream() { 485 if (!mAsyncOpenStarted) { 486 return; 487 } 488 489 MutexAutoLock lock(mMutex); 490 OpenStreamFailed(); 491 mCondVar.NotifyAll(); 492 } 493 494 void ReadStream::Inner::OpenStreamFailed() { 495 MOZ_DIAGNOSTIC_ASSERT(!mStream); 496 MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream); 497 mMutex.AssertCurrentThreadOwns(); 498 (void)NS_NewCStringInputStream(getter_AddRefs(mStream), ""_ns); 499 mSnappyStream = mStream; 500 mStream->Close(); 501 NoteClosed(); 502 } 503 504 // ---------------------------------------------------------------------------- 505 506 NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream); 507 508 // static 509 already_AddRefed<ReadStream> ReadStream::Create( 510 const Maybe<CacheReadStream>& aMaybeReadStream) { 511 if (aMaybeReadStream.isNothing()) { 512 return nullptr; 513 } 514 515 return Create(aMaybeReadStream.ref()); 516 } 517 518 // static 519 already_AddRefed<ReadStream> ReadStream::Create( 520 const CacheReadStream& aReadStream) { 521 // The parameter may or may not be for a Cache created stream. The way we 522 // tell is by looking at the stream control actor. If the actor exists, 523 // then we know the Cache created it. 524 if (!aReadStream.control()) { 525 return nullptr; 526 } 527 528 MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().isNothing() || 529 aReadStream.stream().ref().stream().type() != 530 mozilla::ipc::InputStreamParams::T__None); 531 532 // Control is guaranteed to survive this method as ActorDestroy() cannot 533 // run on this thread until we complete. 534 StreamControl* control; 535 if (aReadStream.control().IsChild()) { 536 auto actor = 537 static_cast<CacheStreamControlChild*>(aReadStream.control().AsChild()); 538 control = actor; 539 } else { 540 auto actor = static_cast<CacheStreamControlParent*>( 541 aReadStream.control().AsParent()); 542 control = actor; 543 } 544 MOZ_DIAGNOSTIC_ASSERT(control); 545 546 nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream()); 547 548 // Currently we expect all cache read streams to be blocking file streams. 549 #if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED) 550 if (stream) { 551 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream); 552 MOZ_DIAGNOSTIC_ASSERT(!asyncStream); 553 } 554 #endif 555 556 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>( 557 std::move(control), aReadStream.id(), stream)); 558 } 559 560 // static 561 already_AddRefed<ReadStream> ReadStream::Create( 562 PCacheStreamControlParent* aControl, const nsID& aId, 563 nsIInputStream* aStream) { 564 MOZ_DIAGNOSTIC_ASSERT(aControl); 565 566 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>( 567 static_cast<CacheStreamControlParent*>(aControl), aId, aStream)); 568 } 569 570 void ReadStream::Serialize(Maybe<CacheReadStream>* aReadStreamOut, 571 ErrorResult& aRv) { 572 mInner->Serialize(aReadStreamOut, aRv); 573 } 574 575 void ReadStream::Serialize(CacheReadStream* aReadStreamOut, ErrorResult& aRv) { 576 mInner->Serialize(aReadStreamOut, aRv); 577 } 578 579 ReadStream::ReadStream(SafeRefPtr<ReadStream::Inner> aInner) 580 : mInner(std::move(aInner)) { 581 MOZ_DIAGNOSTIC_ASSERT(mInner); 582 } 583 584 ReadStream::~ReadStream() { 585 // Explicitly close the inner stream so that it does not have to 586 // deal with implicitly closing at destruction time. 587 mInner->Close(); 588 } 589 590 NS_IMETHODIMP 591 ReadStream::Close() { return mInner->Close(); } 592 593 NS_IMETHODIMP 594 ReadStream::Available(uint64_t* aNumAvailableOut) { 595 return mInner->Available(aNumAvailableOut); 596 } 597 598 NS_IMETHODIMP 599 ReadStream::StreamStatus() { return mInner->StreamStatus(); } 600 601 NS_IMETHODIMP 602 ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) { 603 return mInner->Read(aBuf, aCount, aNumReadOut); 604 } 605 606 NS_IMETHODIMP 607 ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, 608 uint32_t aCount, uint32_t* aNumReadOut) { 609 return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); 610 } 611 612 NS_IMETHODIMP 613 ReadStream::IsNonBlocking(bool* aNonBlockingOut) { 614 return mInner->IsNonBlocking(aNonBlockingOut); 615 } 616 617 } // namespace mozilla::dom::cache