nsInputStreamPump.cpp (27026B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=4 sts=2 sw=2 et cin: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "nsIOService.h" 8 #include "nsInputStreamPump.h" 9 #include "nsIStreamTransportService.h" 10 #include "nsIThreadRetargetableStreamListener.h" 11 #include "nsThreadUtils.h" 12 #include "nsCOMPtr.h" 13 #include "mozilla/Logging.h" 14 #include "mozilla/NonBlockingAsyncInputStream.h" 15 #include "mozilla/ProfilerLabels.h" 16 #include "mozilla/SlicedInputStream.h" 17 #include "mozilla/StaticPrefs_network.h" 18 #include "nsIStreamListener.h" 19 #include "nsILoadGroup.h" 20 #include "nsNetCID.h" 21 #include "nsNetUtil.h" 22 #include "nsStreamUtils.h" 23 #include <algorithm> 24 25 // 26 // MOZ_LOG=nsStreamPump:5 27 // 28 static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump"); 29 #undef LOG 30 #define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args) 31 32 //----------------------------------------------------------------------------- 33 // nsInputStreamPump methods 34 //----------------------------------------------------------------------------- 35 36 nsInputStreamPump::nsInputStreamPump() : mOffMainThread(!NS_IsMainThread()) {} 37 38 nsresult nsInputStreamPump::Create(nsInputStreamPump** result, 39 nsIInputStream* stream, uint32_t segsize, 40 uint32_t segcount, bool closeWhenDone, 41 nsISerialEventTarget* mainThreadTarget) { 42 nsresult rv = NS_ERROR_OUT_OF_MEMORY; 43 RefPtr<nsInputStreamPump> pump = new nsInputStreamPump(); 44 if (pump) { 45 rv = pump->Init(stream, segsize, segcount, closeWhenDone, mainThreadTarget); 46 if (NS_SUCCEEDED(rv)) { 47 pump.forget(result); 48 } 49 } 50 return rv; 51 } 52 53 struct PeekData { 54 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure) 55 : mFunc(fun), mClosure(closure) {} 56 57 nsInputStreamPump::PeekSegmentFun mFunc; 58 void* mClosure; 59 }; 60 61 static nsresult CallPeekFunc(nsIInputStream* aInStream, void* aClosure, 62 const char* aFromSegment, uint32_t aToOffset, 63 uint32_t aCount, uint32_t* aWriteCount) { 64 NS_ASSERTION(aToOffset == 0, "Called more than once?"); 65 NS_ASSERTION(aCount > 0, "Called without data?"); 66 67 PeekData* data = static_cast<PeekData*>(aClosure); 68 data->mFunc(data->mClosure, reinterpret_cast<const uint8_t*>(aFromSegment), 69 aCount); 70 return NS_BINDING_ABORTED; 71 } 72 73 nsresult nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) { 74 RecursiveMutexAutoLock lock(mMutex); 75 76 if (!mAsyncStream) { 77 MOZ_DIAGNOSTIC_ASSERT(false, "PeekStream called without stream"); 78 return NS_ERROR_NOT_AVAILABLE; 79 } 80 81 nsresult rv = CreateBufferedStreamIfNeeded(); 82 NS_ENSURE_SUCCESS(rv, rv); 83 84 // See if the pipe is closed by checking the return of Available. 85 uint64_t dummy64; 86 rv = mAsyncStream->Available(&dummy64); 87 if (NS_FAILED(rv)) return rv; 88 uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX); 89 90 PeekData data(callback, closure); 91 return mAsyncStream->ReadSegments( 92 CallPeekFunc, &data, mozilla::net::nsIOService::gDefaultSegmentSize, 93 &dummy); 94 } 95 96 nsresult nsInputStreamPump::EnsureWaiting() { 97 mMutex.AssertCurrentThreadIn(); 98 99 // no need to worry about multiple threads... an input stream pump lives 100 // on only one thread at a time. 101 MOZ_ASSERT(mAsyncStream); 102 if (!mWaitingForInputStreamReady && !mProcessingCallbacks) { 103 // Ensure OnStateStop is called on the main thread only when this pump is 104 // created on main thread. 105 if (mState == STATE_STOP && !mOffMainThread) { 106 nsCOMPtr<nsISerialEventTarget> mainThread = 107 mLabeledMainThreadTarget 108 ? mLabeledMainThreadTarget 109 : do_AddRef(mozilla::GetMainThreadSerialEventTarget()); 110 if (mTargetThread != mainThread) { 111 mTargetThread = mainThread; 112 } 113 } 114 MOZ_ASSERT(mTargetThread); 115 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread); 116 if (NS_FAILED(rv)) { 117 NS_ERROR("AsyncWait failed"); 118 return rv; 119 } 120 // Any retargeting during STATE_START or START_TRANSFER is complete 121 // after the call to AsyncWait; next callback will be on mTargetThread. 122 mRetargeting = false; 123 mWaitingForInputStreamReady = true; 124 } 125 return NS_OK; 126 } 127 128 //----------------------------------------------------------------------------- 129 // nsInputStreamPump::nsISupports 130 //----------------------------------------------------------------------------- 131 132 // although this class can only be accessed from one thread at a time, we do 133 // allow its ownership to move from thread to thread, assuming the consumer 134 // understands the limitations of this. 135 NS_IMPL_ADDREF(nsInputStreamPump) 136 NS_IMPL_RELEASE(nsInputStreamPump) 137 NS_INTERFACE_MAP_BEGIN(nsInputStreamPump) 138 NS_INTERFACE_MAP_ENTRY(nsIRequest) 139 NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableRequest) 140 NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback) 141 NS_INTERFACE_MAP_ENTRY(nsIInputStreamPump) 142 NS_INTERFACE_MAP_ENTRY_CONCRETE(nsInputStreamPump) 143 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStreamPump) 144 NS_INTERFACE_MAP_END 145 146 //----------------------------------------------------------------------------- 147 // nsInputStreamPump::nsIRequest 148 //----------------------------------------------------------------------------- 149 150 NS_IMETHODIMP 151 nsInputStreamPump::GetName(nsACString& result) { 152 RecursiveMutexAutoLock lock(mMutex); 153 154 result.Truncate(); 155 return NS_OK; 156 } 157 158 NS_IMETHODIMP 159 nsInputStreamPump::IsPending(bool* result) { 160 RecursiveMutexAutoLock lock(mMutex); 161 162 *result = (mState != STATE_IDLE && mState != STATE_DEAD); 163 return NS_OK; 164 } 165 166 NS_IMETHODIMP 167 nsInputStreamPump::GetStatus(nsresult* status) { 168 RecursiveMutexAutoLock lock(mMutex); 169 170 *status = mStatus; 171 return NS_OK; 172 } 173 174 NS_IMETHODIMP nsInputStreamPump::SetCanceledReason(const nsACString& aReason) { 175 return SetCanceledReasonImpl(aReason); 176 } 177 178 NS_IMETHODIMP nsInputStreamPump::GetCanceledReason(nsACString& aReason) { 179 return GetCanceledReasonImpl(aReason); 180 } 181 182 NS_IMETHODIMP nsInputStreamPump::CancelWithReason(nsresult aStatus, 183 const nsACString& aReason) { 184 return CancelWithReasonImpl(aStatus, aReason); 185 } 186 187 NS_IMETHODIMP 188 nsInputStreamPump::Cancel(nsresult status) { 189 RecursiveMutexAutoLock lock(mMutex); 190 191 AssertOnThread(); 192 193 LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n", this, 194 static_cast<uint32_t>(status))); 195 196 if (NS_FAILED(mStatus)) { 197 LOG((" already canceled\n")); 198 return NS_OK; 199 } 200 201 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code"); 202 mStatus = status; 203 204 // close input stream 205 if (mAsyncStream) { 206 // If mSuspendCount != 0, EnsureWaiting will be called by Resume(). 207 // Note that while suspended, OnInputStreamReady will 208 // not do anything, and also note that calling asyncWait 209 // on a closed stream works and will dispatch an event immediately. 210 211 nsCOMPtr<nsIEventTarget> currentTarget = NS_GetCurrentThread(); 212 if (mTargetThread && currentTarget != mTargetThread) { 213 nsresult rv = mTargetThread->Dispatch(NS_NewRunnableFunction( 214 "nsInputStreamPump::Cancel", [self = RefPtr{this}, status] { 215 RecursiveMutexAutoLock lock(self->mMutex); 216 if (!self->mAsyncStream) { 217 return; 218 } 219 self->mAsyncStream->CloseWithStatus(status); 220 if (self->mSuspendCount == 0) { 221 self->EnsureWaiting(); 222 } 223 })); 224 NS_ENSURE_SUCCESS(rv, rv); 225 } else { 226 mAsyncStream->CloseWithStatus(status); 227 if (mSuspendCount == 0) { 228 EnsureWaiting(); 229 } 230 } 231 } 232 return NS_OK; 233 } 234 235 NS_IMETHODIMP 236 nsInputStreamPump::Suspend() { 237 RecursiveMutexAutoLock lock(mMutex); 238 239 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this)); 240 NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD, 241 NS_ERROR_UNEXPECTED); 242 ++mSuspendCount; 243 return NS_OK; 244 } 245 246 NS_IMETHODIMP 247 nsInputStreamPump::Resume() { 248 RecursiveMutexAutoLock lock(mMutex); 249 250 LOG(("nsInputStreamPump::Resume [this=%p]\n", this)); 251 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED); 252 NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD, 253 NS_ERROR_UNEXPECTED); 254 255 // There is a brief in-between state when we null out mAsyncStream in 256 // OnStateStop() before calling OnStopRequest, and only afterwards set 257 // STATE_DEAD, which we need to handle gracefully. 258 if (--mSuspendCount == 0 && mAsyncStream) { 259 EnsureWaiting(); 260 } 261 return NS_OK; 262 } 263 264 NS_IMETHODIMP 265 nsInputStreamPump::GetLoadFlags(nsLoadFlags* aLoadFlags) { 266 RecursiveMutexAutoLock lock(mMutex); 267 268 *aLoadFlags = mLoadFlags; 269 return NS_OK; 270 } 271 272 NS_IMETHODIMP 273 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) { 274 RecursiveMutexAutoLock lock(mMutex); 275 276 mLoadFlags = aLoadFlags; 277 return NS_OK; 278 } 279 280 NS_IMETHODIMP 281 nsInputStreamPump::GetTRRMode(nsIRequest::TRRMode* aTRRMode) { 282 return GetTRRModeImpl(aTRRMode); 283 } 284 285 NS_IMETHODIMP 286 nsInputStreamPump::SetTRRMode(nsIRequest::TRRMode aTRRMode) { 287 return SetTRRModeImpl(aTRRMode); 288 } 289 290 NS_IMETHODIMP 291 nsInputStreamPump::GetLoadGroup(nsILoadGroup** aLoadGroup) { 292 RecursiveMutexAutoLock lock(mMutex); 293 294 *aLoadGroup = do_AddRef(mLoadGroup).take(); 295 return NS_OK; 296 } 297 298 NS_IMETHODIMP 299 nsInputStreamPump::SetLoadGroup(nsILoadGroup* aLoadGroup) { 300 RecursiveMutexAutoLock lock(mMutex); 301 302 mLoadGroup = aLoadGroup; 303 return NS_OK; 304 } 305 306 //----------------------------------------------------------------------------- 307 // nsInputStreamPump::nsIInputStreamPump implementation 308 //----------------------------------------------------------------------------- 309 310 NS_IMETHODIMP 311 nsInputStreamPump::Init(nsIInputStream* stream, uint32_t segsize, 312 uint32_t segcount, bool closeWhenDone, 313 nsISerialEventTarget* mainThreadTarget) { 314 // probably we can't be multithread-accessed yet 315 RecursiveMutexAutoLock lock(mMutex); 316 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); 317 318 mStream = stream; 319 mSegSize = segsize; 320 mSegCount = segcount; 321 mCloseWhenDone = closeWhenDone; 322 mLabeledMainThreadTarget = mainThreadTarget; 323 if (mOffMainThread && mLabeledMainThreadTarget) { 324 MOZ_ASSERT( 325 false, 326 "Init stream pump off main thread with a main thread event target."); 327 return NS_ERROR_FAILURE; 328 } 329 330 return NS_OK; 331 } 332 333 NS_IMETHODIMP 334 nsInputStreamPump::Reset() { 335 RecursiveMutexAutoLock lock(mMutex); 336 LOG(("nsInputStreamPump::Reset [this=%p]\n", this)); 337 mListener = nullptr; 338 339 if (mAsyncStream && NS_SUCCEEDED(mAsyncStream->StreamStatus())) { 340 mAsyncStream->Close(); 341 mAsyncStream->AsyncWait(nullptr, 0, 0, nullptr); 342 } 343 344 // release the reference, input stream must be closed by the transaction 345 mStream = nullptr; 346 347 return NS_OK; 348 } 349 350 NS_IMETHODIMP 351 nsInputStreamPump::AsyncRead(nsIStreamListener* listener) { 352 RecursiveMutexAutoLock lock(mMutex); 353 354 // This ensures only one thread can interact with a pump at a time 355 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS); 356 NS_ENSURE_ARG_POINTER(listener); 357 MOZ_ASSERT(NS_IsMainThread() || mOffMainThread, 358 "nsInputStreamPump should be read from the " 359 "main thread only."); 360 361 nsresult rv = NS_MakeAsyncNonBlockingInputStream( 362 mStream.forget(), getter_AddRefs(mAsyncStream), mCloseWhenDone, mSegSize, 363 mSegCount); 364 if (NS_WARN_IF(NS_FAILED(rv))) { 365 return rv; 366 } 367 368 MOZ_ASSERT(mAsyncStream); 369 370 // mStreamOffset now holds the number of bytes currently read. 371 mStreamOffset = 0; 372 373 // grab event queue (we must do this here by contract, since all notifications 374 // must go to the thread which called AsyncRead) 375 if (NS_IsMainThread() && mLabeledMainThreadTarget) { 376 mTargetThread = mLabeledMainThreadTarget; 377 } else { 378 mTargetThread = mozilla::GetCurrentSerialEventTarget(); 379 } 380 NS_ENSURE_STATE(mTargetThread); 381 382 rv = EnsureWaiting(); 383 if (NS_FAILED(rv)) return rv; 384 385 if (mLoadGroup) mLoadGroup->AddRequest(this, nullptr); 386 387 mState = STATE_START; 388 mListener = listener; 389 return NS_OK; 390 } 391 392 //----------------------------------------------------------------------------- 393 // nsInputStreamPump::nsIInputStreamCallback implementation 394 //----------------------------------------------------------------------------- 395 396 NS_IMETHODIMP 397 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream* stream) { 398 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this)); 399 400 AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK); 401 402 // this function has been called from a PLEvent, so we can safely call 403 // any listener or progress sink methods directly from here. 404 405 for (;;) { 406 // There should only be one iteration of this loop happening at a time. 407 // To prevent AsyncWait() (called during callbacks or on other threads) 408 // from creating a parallel OnInputStreamReady(), we use: 409 // -- a mutex; and 410 // -- a boolean mProcessingCallbacks to detect parallel loops 411 // when exiting the mutex for callbacks. 412 RecursiveMutexAutoLock lock(mMutex); 413 414 // Prevent parallel execution during callbacks, while out of mutex. 415 if (mProcessingCallbacks) { 416 MOZ_ASSERT(!mProcessingCallbacks); 417 break; 418 } 419 mProcessingCallbacks = true; 420 if (mSuspendCount || mState == STATE_IDLE || mState == STATE_DEAD) { 421 mWaitingForInputStreamReady = false; 422 mProcessingCallbacks = false; 423 break; 424 } 425 426 uint32_t nextState; 427 switch (mState) { 428 case STATE_START: 429 nextState = OnStateStart(); 430 break; 431 case STATE_TRANSFER: 432 nextState = OnStateTransfer(); 433 break; 434 case STATE_STOP: 435 mRetargeting = false; 436 nextState = OnStateStop(); 437 break; 438 default: 439 nextState = 0; 440 MOZ_ASSERT_UNREACHABLE("Unknown enum value."); 441 return NS_ERROR_UNEXPECTED; 442 } 443 444 bool stillTransferring = 445 (mState == STATE_TRANSFER && nextState == STATE_TRANSFER); 446 if (stillTransferring) { 447 NS_ASSERTION(NS_SUCCEEDED(mStatus), 448 "Should not have failed status for ongoing transfer"); 449 } else { 450 NS_ASSERTION(mState != nextState, 451 "Only OnStateTransfer can be called more than once."); 452 } 453 if (mRetargeting) { 454 NS_ASSERTION(mState != STATE_STOP, 455 "Retargeting should not happen during OnStateStop."); 456 } 457 458 // Set mRetargeting so EnsureWaiting will be called. It ensures that 459 // OnStateStop is called on the main thread. 460 if (nextState == STATE_STOP && !NS_IsMainThread() && !mOffMainThread) { 461 mRetargeting = true; 462 } 463 464 // Unset mProcessingCallbacks here (while we have lock) so our own call to 465 // EnsureWaiting isn't blocked by it. 466 mProcessingCallbacks = false; 467 468 // We must break the loop if suspended during one of the previous 469 // operation. 470 if (mSuspendCount) { 471 mState = nextState; 472 mWaitingForInputStreamReady = false; 473 break; 474 } 475 476 // Wait asynchronously if there is still data to transfer, or we're 477 // switching event delivery to another thread. 478 if (stillTransferring || mRetargeting) { 479 mState = nextState; 480 mWaitingForInputStreamReady = false; 481 nsresult rv = EnsureWaiting(); 482 if (NS_SUCCEEDED(rv)) break; 483 484 // Failure to start asynchronous wait: stop transfer. 485 // Do not set mStatus if it was previously set to report a failure. 486 if (NS_SUCCEEDED(mStatus)) { 487 mStatus = rv; 488 } 489 nextState = STATE_STOP; 490 } 491 492 mState = nextState; 493 } 494 return NS_OK; 495 } 496 497 uint32_t nsInputStreamPump::OnStateStart() MOZ_REQUIRES(mMutex) { 498 mMutex.AssertCurrentThreadIn(); 499 500 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK); 501 502 LOG((" OnStateStart [this=%p]\n", this)); 503 504 nsresult rv; 505 506 // need to check the reason why the stream is ready. this is required 507 // so our listener can check our status from OnStartRequest. 508 // XXX async streams should have a GetStatus method! 509 if (NS_SUCCEEDED(mStatus)) { 510 uint64_t avail; 511 rv = mAsyncStream->Available(&avail); 512 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) mStatus = rv; 513 } 514 515 { 516 nsCOMPtr<nsIStreamListener> listener = mListener; 517 if (!listener) { 518 return STATE_DEAD; 519 } 520 // We're on the writing thread 521 AssertOnThread(); 522 523 // Note: Must exit mutex for call to OnStartRequest to avoid 524 // deadlocks when calls to RetargetDeliveryTo for multiple 525 // nsInputStreamPumps are needed (e.g. nsHttpChannel). 526 RecursiveMutexAutoUnlock unlock(mMutex); 527 rv = listener->OnStartRequest(this); 528 } 529 530 // an error returned from OnStartRequest should cause us to abort; however, 531 // we must not stomp on mStatus if already canceled. 532 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) mStatus = rv; 533 534 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP; 535 } 536 537 uint32_t nsInputStreamPump::OnStateTransfer() MOZ_REQUIRES(mMutex) { 538 mMutex.AssertCurrentThreadIn(); 539 540 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK); 541 542 LOG((" OnStateTransfer [this=%p]\n", this)); 543 544 // if canceled, go directly to STATE_STOP... 545 if (NS_FAILED(mStatus)) return STATE_STOP; 546 547 nsresult rv = CreateBufferedStreamIfNeeded(); 548 if (NS_WARN_IF(NS_FAILED(rv))) { 549 return STATE_STOP; 550 } 551 552 uint64_t avail; 553 rv = mAsyncStream->Available(&avail); 554 LOG((" Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n", 555 mAsyncStream.get(), static_cast<uint32_t>(rv), avail)); 556 557 if (rv == NS_BASE_STREAM_CLOSED) { 558 rv = NS_OK; 559 avail = 0; 560 } else if (NS_SUCCEEDED(rv) && avail) { 561 // we used to limit avail to 16K - we were afraid some ODA handlers 562 // might assume they wouldn't get more than 16K at once 563 // we're removing that limit since it speeds up local file access. 564 // Now there's an implicit 64K limit of 4 16K segments 565 // NOTE: ok, so the story is as follows. OnDataAvailable impls 566 // are by contract supposed to consume exactly |avail| bytes. 567 // however, many do not... mailnews... stream converters... 568 // cough, cough. the input stream pump is fairly tolerant 569 // in this regard; however, if an ODA does not consume any 570 // data from the stream, then we could potentially end up in 571 // an infinite loop. we do our best here to try to catch 572 // such an error. (see bug 189672) 573 574 // in most cases this QI will succeed (mAsyncStream is almost always 575 // a nsPipeInputStream, which implements nsITellableStream::Tell). 576 int64_t offsetBefore; 577 nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mAsyncStream); 578 if (tellable && NS_FAILED(tellable->Tell(&offsetBefore))) { 579 MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream"); 580 offsetBefore = 0; 581 } 582 583 uint32_t odaAvail = avail > UINT32_MAX ? UINT32_MAX : uint32_t(avail); 584 585 LOG((" calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64 586 "(%u)]\n", 587 mStreamOffset, avail, odaAvail)); 588 589 { 590 // We may be called on non-MainThread even if mOffMainThread is 591 // false, due to RetargetDeliveryTo(), so don't use AssertOnThread() 592 if (mTargetThread) { 593 MOZ_ASSERT(mTargetThread->IsOnCurrentThread()); 594 } else { 595 MOZ_ASSERT(NS_IsMainThread()); 596 } 597 598 nsCOMPtr<nsIStreamListener> listener = mListener; 599 if (!listener) { 600 return STATE_DEAD; 601 } 602 // Note: Must exit mutex for call to OnStartRequest to avoid 603 // deadlocks when calls to RetargetDeliveryTo for multiple 604 // nsInputStreamPumps are needed (e.g. nsHttpChannel). 605 RecursiveMutexAutoUnlock unlock(mMutex); 606 // We're on the writing thread for mListener and mAsyncStream. 607 // mStreamOffset is only touched in OnStateTransfer, and AsyncRead 608 // shouldn't be called during OnDataAvailable() 609 610 MOZ_PUSH_IGNORE_THREAD_SAFETY 611 rv = listener->OnDataAvailable(this, mAsyncStream, mStreamOffset, 612 odaAvail); 613 MOZ_POP_THREAD_SAFETY 614 } 615 616 // don't enter this code if ODA failed or called Cancel 617 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) { 618 // test to see if this ODA failed to consume data 619 if (tellable) { 620 // NOTE: if Tell fails, which can happen if the stream is 621 // now closed, then we assume that everything was read. 622 int64_t offsetAfter; 623 if (NS_FAILED(tellable->Tell(&offsetAfter))) { 624 offsetAfter = offsetBefore + odaAvail; 625 } 626 if (offsetAfter > offsetBefore) { 627 mStreamOffset += (offsetAfter - offsetBefore); 628 } else if (mSuspendCount == 0) { 629 // 630 // possible infinite loop if we continue pumping data! 631 // 632 // NOTE: although not allowed by nsIStreamListener, we 633 // will allow the ODA impl to Suspend the pump. IMAP 634 // does this :-( 635 // 636 NS_ERROR("OnDataAvailable implementation consumed no data"); 637 mStatus = NS_ERROR_UNEXPECTED; 638 } 639 } else { 640 mStreamOffset += odaAvail; // assume ODA behaved well 641 } 642 } 643 } 644 645 // an error returned from Available or OnDataAvailable should cause us to 646 // abort; however, we must not stop on mStatus if already canceled. 647 648 if (NS_SUCCEEDED(mStatus)) { 649 if (NS_FAILED(rv)) { 650 mStatus = rv; 651 } else if (avail) { 652 // if stream is now closed, advance to STATE_STOP right away. 653 // Available may return 0 bytes available at the moment; that 654 // would not mean that we are done. 655 // XXX async streams should have a GetStatus method! 656 rv = mAsyncStream->Available(&avail); 657 if (NS_SUCCEEDED(rv)) return STATE_TRANSFER; 658 if (rv != NS_BASE_STREAM_CLOSED) mStatus = rv; 659 } 660 } 661 return STATE_STOP; 662 } 663 664 nsresult nsInputStreamPump::CallOnStateStop() { 665 RecursiveMutexAutoLock lock(mMutex); 666 667 MOZ_ASSERT(NS_IsMainThread(), 668 "CallOnStateStop should only be called on the main thread."); 669 670 mState = OnStateStop(); 671 return NS_OK; 672 } 673 674 uint32_t nsInputStreamPump::OnStateStop() MOZ_REQUIRES(mMutex) { 675 mMutex.AssertCurrentThreadIn(); 676 677 if (!NS_IsMainThread() && !mOffMainThread) { 678 // This method can be called on a different thread if nsInputStreamPump 679 // is used off the main-thread. 680 if (NS_SUCCEEDED(mStatus) && mListener && 681 mozilla::StaticPrefs::network_send_OnDataFinished_nsInputStreamPump()) { 682 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener = 683 do_QueryInterface(mListener); 684 if (retargetableListener) { 685 retargetableListener->OnDataFinished(mStatus); 686 } 687 } 688 nsresult rv = mLabeledMainThreadTarget->Dispatch( 689 mozilla::NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this, 690 &nsInputStreamPump::CallOnStateStop)); 691 NS_ENSURE_SUCCESS(rv, STATE_DEAD); 692 return STATE_DEAD; 693 } 694 695 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK); 696 697 LOG((" OnStateStop [this=%p status=%" PRIx32 "]\n", this, 698 static_cast<uint32_t>(mStatus))); 699 700 // if an error occurred, we must be sure to pass the error onto the async 701 // stream. in some cases, this is redundant, but since close is idempotent, 702 // this is OK. otherwise, be sure to honor the "close-when-done" option. 703 704 if (!mAsyncStream) { 705 MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?"); 706 return STATE_DEAD; 707 } 708 709 if (NS_FAILED(mStatus)) { 710 mAsyncStream->CloseWithStatus(mStatus); 711 } else if (mCloseWhenDone) { 712 mAsyncStream->Close(); 713 } 714 715 mAsyncStream = nullptr; 716 mIsPending = false; 717 { 718 // We're on the writing thread. 719 // We believe that mStatus can't be changed on us here. 720 AssertOnThread(); 721 722 nsCOMPtr<nsIStreamListener> listener = mListener; 723 nsresult status = mStatus; 724 // Note: Must exit mutex for call to OnStartRequest to avoid 725 // deadlocks when calls to RetargetDeliveryTo for multiple 726 // nsInputStreamPumps are needed (e.g. nsHttpChannel). 727 RecursiveMutexAutoUnlock unlock(mMutex); 728 729 listener->OnStopRequest(this, status); 730 } 731 mTargetThread = nullptr; 732 mListener = nullptr; 733 734 if (mLoadGroup) mLoadGroup->RemoveRequest(this, nullptr, mStatus); 735 736 return STATE_DEAD; 737 } 738 739 nsresult nsInputStreamPump::CreateBufferedStreamIfNeeded() { 740 if (mAsyncStreamIsBuffered) { 741 return NS_OK; 742 } 743 744 // ReadSegments is not available for any nsIAsyncInputStream. In order to use 745 // it, we wrap a nsIBufferedInputStream around it, if needed. 746 747 if (NS_InputStreamIsBuffered(mAsyncStream)) { 748 mAsyncStreamIsBuffered = true; 749 return NS_OK; 750 } 751 752 nsCOMPtr<nsIInputStream> stream; 753 nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(stream), 754 mAsyncStream.forget(), 4096); 755 NS_ENSURE_SUCCESS(rv, rv); 756 757 // A buffered inputStream must implement nsIAsyncInputStream. 758 mAsyncStream = do_QueryInterface(stream); 759 MOZ_DIAGNOSTIC_ASSERT(mAsyncStream); 760 mAsyncStreamIsBuffered = true; 761 762 return NS_OK; 763 } 764 765 //----------------------------------------------------------------------------- 766 // nsIThreadRetargetableRequest 767 //----------------------------------------------------------------------------- 768 769 NS_IMETHODIMP 770 nsInputStreamPump::RetargetDeliveryTo(nsISerialEventTarget* aNewTarget) { 771 RecursiveMutexAutoLock lock(mMutex); 772 773 NS_ENSURE_ARG(aNewTarget); 774 NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER, 775 NS_ERROR_UNEXPECTED); 776 777 // If canceled, do not retarget. Return with canceled status. 778 if (NS_FAILED(mStatus)) { 779 return mStatus; 780 } 781 782 if (aNewTarget == mTargetThread) { 783 NS_WARNING("Retargeting delivery to same thread"); 784 return NS_OK; 785 } 786 787 if (mOffMainThread) { 788 // Don't support retargeting if this pump is already used off the main 789 // thread. 790 return NS_ERROR_FAILURE; 791 } 792 793 // Ensure that |mListener| and any subsequent listeners can be retargeted 794 // to another thread. 795 nsresult rv = NS_OK; 796 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener = 797 do_QueryInterface(mListener, &rv); 798 if (NS_SUCCEEDED(rv) && retargetableListener) { 799 rv = retargetableListener->CheckListenerChain(); 800 if (NS_SUCCEEDED(rv)) { 801 mTargetThread = aNewTarget; 802 mRetargeting = true; 803 } 804 } 805 LOG( 806 ("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] " 807 "%s listener [%p] rv[%" PRIx32 "]", 808 this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"), 809 (nsIStreamListener*)mListener, static_cast<uint32_t>(rv))); 810 return rv; 811 } 812 813 NS_IMETHODIMP 814 nsInputStreamPump::GetDeliveryTarget(nsISerialEventTarget** aNewTarget) { 815 RecursiveMutexAutoLock lock(mMutex); 816 817 nsCOMPtr<nsISerialEventTarget> target = mTargetThread; 818 target.forget(aNewTarget); 819 return NS_OK; 820 }