// nsBufferedStreams.cpp (33234B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "nsBufferedStreams.h" 7 #include "nsStreamUtils.h" 8 #include "nsNetCID.h" 9 #include "nsIClassInfoImpl.h" 10 #include "nsIEventTarget.h" 11 #include "nsThreadUtils.h" 12 #include "mozilla/DebugOnly.h" 13 #include "mozilla/ipc/InputStreamUtils.h" 14 #include <algorithm> 15 16 #ifdef DEBUG_brendan 17 # define METERING 18 #endif 19 20 #ifdef METERING 21 # include <stdio.h> 22 # define METER(x) x 23 # define MAX_BIG_SEEKS 20 24 25 static struct { 26 uint32_t mSeeksWithinBuffer; 27 uint32_t mSeeksOutsideBuffer; 28 uint32_t mBufferReadUponSeek; 29 uint32_t mBufferUnreadUponSeek; 30 uint32_t mBytesReadFromBuffer; 31 uint32_t mBigSeekIndex; 32 struct { 33 int64_t mOldOffset; 34 int64_t mNewOffset; 35 } mBigSeek[MAX_BIG_SEEKS]; 36 } bufstats; 37 #else 38 # define METER(x) /* nothing */ 39 #endif 40 41 using namespace mozilla::ipc; 42 using namespace mozilla; 43 44 //////////////////////////////////////////////////////////////////////////////// 45 // nsBufferedStream 46 47 nsBufferedStream::~nsBufferedStream() { Close(); } 48 49 NS_IMPL_ADDREF(nsBufferedStream) 50 NS_IMPL_RELEASE(nsBufferedStream) 51 52 NS_INTERFACE_MAP_BEGIN(nsBufferedStream) 53 NS_INTERFACE_MAP_ENTRY(nsISupports) 54 NS_INTERFACE_MAP_ENTRY(nsITellableStream) 55 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, mSeekable) 56 NS_INTERFACE_MAP_END 57 58 nsresult nsBufferedStream::Init(nsISupports* aStream, uint32_t bufferSize) { 59 NS_ASSERTION(aStream, "need to supply a stream"); 60 NS_ASSERTION(mStream == nullptr, "already inited"); 61 mStream = aStream; // we keep a reference until nsBufferedStream::Close 62 mBufferSize = bufferSize; 63 mBufferStartOffset = 0; 64 mCursor = 0; 65 
nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream); 66 mSeekable = seekable; 67 RecursiveMutexAutoLock lock(mBufferMutex); 68 mBuffer = new (mozilla::fallible) char[bufferSize]; 69 if (mBuffer == nullptr) { 70 return NS_ERROR_OUT_OF_MEMORY; 71 } 72 return NS_OK; 73 } 74 75 void nsBufferedStream::Close() { 76 // Drop the reference from nsBufferedStream::Init() 77 mStream = nullptr; 78 RecursiveMutexAutoLock lock(mBufferMutex); 79 if (mBuffer) { 80 delete[] mBuffer; 81 mBuffer = nullptr; 82 mBufferSize = 0; 83 mBufferStartOffset = 0; 84 mCursor = 0; 85 mFillPoint = 0; 86 } 87 #ifdef METERING 88 { 89 static FILE* tfp; 90 if (!tfp) { 91 tfp = fopen("/tmp/bufstats", "w"); 92 if (tfp) { 93 setvbuf(tfp, nullptr, _IOLBF, 0); 94 } 95 } 96 if (tfp) { 97 fprintf(tfp, "seeks within buffer: %u\n", bufstats.mSeeksWithinBuffer); 98 fprintf(tfp, "seeks outside buffer: %u\n", 99 bufstats.mSeeksOutsideBuffer); 100 fprintf(tfp, "buffer read on seek: %u\n", 101 bufstats.mBufferReadUponSeek); 102 fprintf(tfp, "buffer unread on seek: %u\n", 103 bufstats.mBufferUnreadUponSeek); 104 fprintf(tfp, "bytes read from buffer: %u\n", 105 bufstats.mBytesReadFromBuffer); 106 for (uint32_t i = 0; i < bufstats.mBigSeekIndex; i++) { 107 fprintf(tfp, "bigseek[%u] = {old: %u, new: %u}\n", i, 108 bufstats.mBigSeek[i].mOldOffset, 109 bufstats.mBigSeek[i].mNewOffset); 110 } 111 } 112 } 113 #endif 114 } 115 116 NS_IMETHODIMP 117 nsBufferedStream::Seek(int32_t whence, int64_t offset) { 118 if (mStream == nullptr) { 119 return NS_BASE_STREAM_CLOSED; 120 } 121 122 // If the underlying stream isn't a random access store, then fail early. 123 // We could possibly succeed for the case where the seek position denotes 124 // something that happens to be read into the buffer, but that would make 125 // the failure data-dependent. 
126 nsresult rv; 127 nsCOMPtr<nsISeekableStream> ras = do_QueryInterface(mStream, &rv); 128 if (NS_FAILED(rv)) { 129 NS_WARNING("mStream doesn't QI to nsISeekableStream"); 130 return rv; 131 } 132 133 int64_t absPos = 0; 134 switch (whence) { 135 case nsISeekableStream::NS_SEEK_SET: 136 absPos = offset; 137 break; 138 case nsISeekableStream::NS_SEEK_CUR: 139 absPos = mBufferStartOffset; 140 absPos += mCursor; 141 absPos += offset; 142 break; 143 case nsISeekableStream::NS_SEEK_END: 144 absPos = -1; 145 break; 146 default: 147 MOZ_ASSERT_UNREACHABLE("bogus seek whence parameter"); 148 return NS_ERROR_UNEXPECTED; 149 } 150 151 // Let mCursor point into the existing buffer if the new position is 152 // between the current cursor and the mFillPoint "fencepost" -- the 153 // client may never get around to a Read or Write after this Seek. 154 // Read and Write worry about flushing and filling in that event. 155 // But if we're at EOF, make sure to pass the seek through to the 156 // underlying stream, because it may have auto-closed itself and 157 // needs to reopen. 158 uint32_t offsetInBuffer = uint32_t(absPos - mBufferStartOffset); 159 if (offsetInBuffer <= mFillPoint && !mEOF) { 160 METER(bufstats.mSeeksWithinBuffer++); 161 mCursor = offsetInBuffer; 162 return NS_OK; 163 } 164 165 METER(bufstats.mSeeksOutsideBuffer++); 166 METER(bufstats.mBufferReadUponSeek += mCursor); 167 METER(bufstats.mBufferUnreadUponSeek += mFillPoint - mCursor); 168 rv = Flush(); 169 if (NS_FAILED(rv)) { 170 #ifdef DEBUG 171 NS_WARNING( 172 "(debug) Flush returned error within nsBufferedStream::Seek, so we " 173 "exit early."); 174 #endif 175 return rv; 176 } 177 178 rv = ras->Seek(whence, offset); 179 if (NS_FAILED(rv)) { 180 #ifdef DEBUG 181 NS_WARNING( 182 "(debug) Error: ras->Seek() returned error within " 183 "nsBufferedStream::Seek, so we exit early."); 184 #endif 185 return rv; 186 } 187 188 mEOF = false; 189 190 // Recompute whether the offset we're seeking to is in our buffer. 
191 // Note that we need to recompute because Flush() might have 192 // changed mBufferStartOffset. 193 offsetInBuffer = uint32_t(absPos - mBufferStartOffset); 194 if (offsetInBuffer <= mFillPoint) { 195 // It's safe to just set mCursor to offsetInBuffer. In particular, we 196 // want to avoid calling Fill() here since we already have the data that 197 // was seeked to and calling Fill() might auto-close our underlying 198 // stream in some cases. 199 mCursor = offsetInBuffer; 200 return NS_OK; 201 } 202 203 METER(if (bufstats.mBigSeekIndex < MAX_BIG_SEEKS) 204 bufstats.mBigSeek[bufstats.mBigSeekIndex] 205 .mOldOffset = mBufferStartOffset + int64_t(mCursor)); 206 const int64_t minus1 = -1; 207 if (absPos == minus1) { 208 // then we had the SEEK_END case, above 209 int64_t tellPos; 210 rv = ras->Tell(&tellPos); 211 mBufferStartOffset = tellPos; 212 if (NS_FAILED(rv)) { 213 return rv; 214 } 215 } else { 216 mBufferStartOffset = absPos; 217 } 218 METER(if (bufstats.mBigSeekIndex < MAX_BIG_SEEKS) 219 bufstats.mBigSeek[bufstats.mBigSeekIndex++] 220 .mNewOffset = mBufferStartOffset); 221 222 mFillPoint = mCursor = 0; 223 224 // If we seeked back to the start, then don't fill the buffer 225 // right now in case this is a lazily-opened file stream. 226 // We'll fill on the first read, like we did initially. 
227 if (whence == nsISeekableStream::NS_SEEK_SET && offset == 0) { 228 return NS_OK; 229 } 230 return Fill(); 231 } 232 233 NS_IMETHODIMP 234 nsBufferedStream::Tell(int64_t* result) { 235 if (mStream == nullptr) { 236 return NS_BASE_STREAM_CLOSED; 237 } 238 239 int64_t result64 = mBufferStartOffset; 240 result64 += mCursor; 241 *result = result64; 242 return NS_OK; 243 } 244 245 NS_IMETHODIMP 246 nsBufferedStream::SetEOF() { 247 if (mStream == nullptr) { 248 return NS_BASE_STREAM_CLOSED; 249 } 250 251 nsresult rv; 252 nsCOMPtr<nsISeekableStream> ras = do_QueryInterface(mStream, &rv); 253 if (NS_FAILED(rv)) { 254 return rv; 255 } 256 257 rv = ras->SetEOF(); 258 if (NS_SUCCEEDED(rv)) { 259 mEOF = true; 260 } 261 262 return rv; 263 } 264 265 nsresult nsBufferedStream::GetData(nsISupports** aResult) { 266 nsCOMPtr<nsISupports> stream(mStream); 267 stream.forget(aResult); 268 return NS_OK; 269 } 270 271 //////////////////////////////////////////////////////////////////////////////// 272 // nsBufferedInputStream 273 274 NS_IMPL_ADDREF_INHERITED(nsBufferedInputStream, nsBufferedStream) 275 NS_IMPL_RELEASE_INHERITED(nsBufferedInputStream, nsBufferedStream) 276 277 NS_IMPL_CLASSINFO(nsBufferedInputStream, nullptr, nsIClassInfo::THREADSAFE, 278 NS_BUFFEREDINPUTSTREAM_CID) 279 280 NS_INTERFACE_MAP_BEGIN(nsBufferedInputStream) 281 // Unfortunately there isn't a macro that combines ambiguous and conditional, 282 // and as far as I can tell, no other class would need such a macro. 
283 if (mIsAsyncInputStream && aIID.Equals(NS_GET_IID(nsIInputStream))) { 284 foundInterface = 285 static_cast<nsIInputStream*>(static_cast<nsIAsyncInputStream*>(this)); 286 } else if (!mIsAsyncInputStream && aIID.Equals(NS_GET_IID(nsIInputStream))) { 287 foundInterface = static_cast<nsIInputStream*>( 288 static_cast<nsIBufferedInputStream*>(this)); 289 } else 290 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIBufferedInputStream) 291 NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream) 292 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess) 293 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream, 294 mIsIPCSerializable) 295 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, mIsAsyncInputStream) 296 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, 297 mIsAsyncInputStream) 298 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, 299 mIsCloneableInputStream) 300 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength, mIsInputStreamLength) 301 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength, 302 mIsAsyncInputStreamLength) 303 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLengthCallback, 304 mIsAsyncInputStreamLength) 305 NS_IMPL_QUERY_CLASSINFO(nsBufferedInputStream) 306 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream) 307 308 NS_IMPL_CI_INTERFACE_GETTER(nsBufferedInputStream, nsIInputStream, 309 nsIBufferedInputStream, nsISeekableStream, 310 nsITellableStream, nsIStreamBufferAccess) 311 312 nsresult nsBufferedInputStream::Create(REFNSIID aIID, void** aResult) { 313 RefPtr<nsBufferedInputStream> stream = new nsBufferedInputStream(); 314 return stream->QueryInterface(aIID, aResult); 315 } 316 317 NS_IMETHODIMP 318 nsBufferedInputStream::Init(nsIInputStream* stream, uint32_t bufferSize) { 319 nsresult rv = nsBufferedStream::Init(stream, bufferSize); 320 NS_ENSURE_SUCCESS(rv, rv); 321 322 { 323 nsCOMPtr<nsIIPCSerializableInputStream> stream = do_QueryInterface(mStream); 324 mIsIPCSerializable = !!stream; 325 } 326 327 { 328 
nsCOMPtr<nsIAsyncInputStream> stream = do_QueryInterface(mStream); 329 mIsAsyncInputStream = !!stream; 330 } 331 332 { 333 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream); 334 mIsCloneableInputStream = !!stream; 335 } 336 337 { 338 nsCOMPtr<nsIInputStreamLength> stream = do_QueryInterface(mStream); 339 mIsInputStreamLength = !!stream; 340 } 341 342 { 343 nsCOMPtr<nsIAsyncInputStreamLength> stream = do_QueryInterface(mStream); 344 mIsAsyncInputStreamLength = !!stream; 345 } 346 347 return NS_OK; 348 } 349 350 already_AddRefed<nsIInputStream> nsBufferedInputStream::GetInputStream() { 351 // A non-null mStream implies Init() has been called. 352 MOZ_ASSERT(mStream); 353 354 nsIInputStream* out = nullptr; 355 DebugOnly<nsresult> rv = QueryInterface(NS_GET_IID(nsIInputStream), 356 reinterpret_cast<void**>(&out)); 357 MOZ_ASSERT(NS_SUCCEEDED(rv)); 358 MOZ_ASSERT(out); 359 360 return already_AddRefed<nsIInputStream>(out); 361 } 362 363 NS_IMETHODIMP 364 nsBufferedInputStream::Close() { 365 nsresult rv = NS_OK; 366 if (mStream) { 367 rv = Source()->Close(); 368 if (NS_FAILED(rv)) { 369 NS_WARNING( 370 "(debug) Error: Source()->Close() returned error in " 371 "bsBuffedInputStream::Close()."); 372 } 373 } 374 375 nsBufferedStream::Close(); 376 return rv; 377 } 378 379 NS_IMETHODIMP 380 nsBufferedInputStream::Available(uint64_t* result) { 381 *result = 0; 382 383 if (!mStream) { 384 return NS_OK; 385 } 386 387 uint64_t avail = mFillPoint - mCursor; 388 389 uint64_t tmp; 390 nsresult rv = Source()->Available(&tmp); 391 if (NS_SUCCEEDED(rv)) { 392 avail += tmp; 393 } 394 395 if (avail) { 396 *result = avail; 397 return NS_OK; 398 } 399 400 return rv; 401 } 402 403 NS_IMETHODIMP 404 nsBufferedInputStream::StreamStatus() { 405 if (!mStream) { 406 return NS_OK; 407 } 408 409 if (mFillPoint - mCursor) { 410 return NS_OK; 411 } 412 413 return Source()->StreamStatus(); 414 } 415 416 NS_IMETHODIMP 417 nsBufferedInputStream::Read(char* buf, uint32_t count, 
uint32_t* result) { 418 if (mBufferDisabled) { 419 if (!mStream) { 420 *result = 0; 421 return NS_OK; 422 } 423 nsresult rv = Source()->Read(buf, count, result); 424 if (NS_SUCCEEDED(rv)) { 425 mBufferStartOffset += *result; // so nsBufferedStream::Tell works 426 if (*result == 0) { 427 mEOF = true; 428 } 429 } 430 return rv; 431 } 432 433 return ReadSegments(NS_CopySegmentToBuffer, buf, count, result); 434 } 435 436 NS_IMETHODIMP 437 nsBufferedInputStream::ReadSegments(nsWriteSegmentFun writer, void* closure, 438 uint32_t count, uint32_t* result) { 439 *result = 0; 440 441 if (!mStream) { 442 return NS_OK; 443 } 444 445 nsresult rv = NS_OK; 446 RecursiveMutexAutoLock lock(mBufferMutex); 447 while (count > 0) { 448 uint32_t amt = std::min(count, mFillPoint - mCursor); 449 if (amt > 0) { 450 uint32_t read = 0; 451 rv = writer(static_cast<nsIBufferedInputStream*>(this), closure, 452 mBuffer + mCursor, *result, amt, &read); 453 if (NS_FAILED(rv)) { 454 // errors returned from the writer end here! 455 rv = NS_OK; 456 break; 457 } 458 *result += read; 459 count -= read; 460 mCursor += read; 461 } else { 462 rv = Fill(); 463 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 464 break; 465 } 466 if (NS_FAILED(rv)) { 467 return rv; 468 } 469 if (mFillPoint == mCursor) { 470 break; 471 } 472 } 473 } 474 return (*result > 0) ? 
NS_OK : rv; 475 } 476 477 NS_IMETHODIMP 478 nsBufferedInputStream::IsNonBlocking(bool* aNonBlocking) { 479 if (mStream) { 480 return Source()->IsNonBlocking(aNonBlocking); 481 } 482 return NS_ERROR_NOT_INITIALIZED; 483 } 484 485 NS_IMETHODIMP 486 nsBufferedInputStream::Fill() { 487 if (mBufferDisabled) { 488 return NS_OK; 489 } 490 NS_ENSURE_TRUE(mStream, NS_ERROR_NOT_INITIALIZED); 491 492 RecursiveMutexAutoLock lock(mBufferMutex); 493 494 nsresult rv; 495 int32_t rem = int32_t(mFillPoint - mCursor); 496 if (rem > 0) { 497 // slide the remainder down to the start of the buffer 498 // |<------------->|<--rem-->|<--->| 499 // b c f s 500 memcpy(mBuffer, mBuffer + mCursor, rem); 501 } 502 mBufferStartOffset += mCursor; 503 mFillPoint = rem; 504 mCursor = 0; 505 506 uint32_t amt; 507 rv = Source()->Read(mBuffer + mFillPoint, mBufferSize - mFillPoint, &amt); 508 if (NS_FAILED(rv)) { 509 return rv; 510 } 511 512 if (amt == 0) { 513 mEOF = true; 514 } 515 516 mFillPoint += amt; 517 return NS_OK; 518 } 519 520 NS_IMETHODIMP_(char*) 521 nsBufferedInputStream::GetBuffer(uint32_t aLength, uint32_t aAlignMask) { 522 NS_ASSERTION(mGetBufferCount == 0, "nested GetBuffer!"); 523 if (mGetBufferCount != 0) { 524 return nullptr; 525 } 526 527 if (mBufferDisabled) { 528 return nullptr; 529 } 530 531 RecursiveMutexAutoLock lock(mBufferMutex); 532 char* buf = mBuffer + mCursor; 533 uint32_t rem = mFillPoint - mCursor; 534 if (rem == 0) { 535 if (NS_FAILED(Fill())) { 536 return nullptr; 537 } 538 buf = mBuffer + mCursor; 539 rem = mFillPoint - mCursor; 540 } 541 542 uint32_t mod = (NS_PTR_TO_INT32(buf) & aAlignMask); 543 if (mod) { 544 uint32_t pad = aAlignMask + 1 - mod; 545 if (pad > rem) { 546 return nullptr; 547 } 548 549 memset(buf, 0, pad); 550 mCursor += pad; 551 buf += pad; 552 rem -= pad; 553 } 554 555 if (aLength > rem) { 556 return nullptr; 557 } 558 mGetBufferCount++; 559 return buf; 560 } 561 562 NS_IMETHODIMP_(void) 563 nsBufferedInputStream::PutBuffer(char* aBuffer, 
uint32_t aLength) { 564 NS_ASSERTION(mGetBufferCount == 1, "stray PutBuffer!"); 565 if (--mGetBufferCount != 0) { 566 return; 567 } 568 569 NS_ASSERTION(mCursor + aLength <= mFillPoint, "PutBuffer botch"); 570 mCursor += aLength; 571 } 572 573 NS_IMETHODIMP 574 nsBufferedInputStream::DisableBuffering() { 575 NS_ASSERTION(!mBufferDisabled, "redundant call to DisableBuffering!"); 576 NS_ASSERTION(mGetBufferCount == 0, 577 "DisableBuffer call between GetBuffer and PutBuffer!"); 578 if (mGetBufferCount != 0) { 579 return NS_ERROR_UNEXPECTED; 580 } 581 582 // Empty the buffer so nsBufferedStream::Tell works. 583 mBufferStartOffset += mCursor; 584 mFillPoint = mCursor = 0; 585 mBufferDisabled = true; 586 return NS_OK; 587 } 588 589 NS_IMETHODIMP 590 nsBufferedInputStream::EnableBuffering() { 591 NS_ASSERTION(mBufferDisabled, "gratuitous call to EnableBuffering!"); 592 mBufferDisabled = false; 593 return NS_OK; 594 } 595 596 NS_IMETHODIMP 597 nsBufferedInputStream::GetUnbufferedStream(nsISupports** aStream) { 598 // Empty the buffer so subsequent i/o trumps any buffered data. 
599 mBufferStartOffset += mCursor; 600 mFillPoint = mCursor = 0; 601 602 nsCOMPtr<nsISupports> stream = mStream; 603 stream.forget(aStream); 604 return NS_OK; 605 } 606 607 void nsBufferedInputStream::SerializedComplexity(uint32_t aMaxSize, 608 uint32_t* aSizeUsed, 609 uint32_t* aPipes, 610 uint32_t* aTransferables) { 611 if (mStream) { 612 nsCOMPtr<nsIInputStream> stream = do_QueryInterface(mStream); 613 MOZ_ASSERT(stream); 614 615 InputStreamHelper::SerializedComplexity(stream, aMaxSize, aSizeUsed, aPipes, 616 aTransferables); 617 } 618 } 619 620 void nsBufferedInputStream::Serialize(InputStreamParams& aParams, 621 uint32_t aMaxSize, uint32_t* aSizeUsed) { 622 MOZ_ASSERT(aSizeUsed); 623 *aSizeUsed = 0; 624 625 BufferedInputStreamParams params; 626 627 if (mStream) { 628 nsCOMPtr<nsIInputStream> stream = do_QueryInterface(mStream); 629 MOZ_ASSERT(stream); 630 631 InputStreamParams wrappedParams; 632 InputStreamHelper::SerializeInputStream(stream, wrappedParams, aMaxSize, 633 aSizeUsed); 634 635 params.optionalStream().emplace(wrappedParams); 636 } 637 638 params.bufferSize() = mBufferSize; 639 640 aParams = params; 641 } 642 643 bool nsBufferedInputStream::Deserialize(const InputStreamParams& aParams) { 644 if (aParams.type() != InputStreamParams::TBufferedInputStreamParams) { 645 NS_ERROR("Received unknown parameters from the other process!"); 646 return false; 647 } 648 649 const BufferedInputStreamParams& params = 650 aParams.get_BufferedInputStreamParams(); 651 const Maybe<InputStreamParams>& wrappedParams = params.optionalStream(); 652 653 nsCOMPtr<nsIInputStream> stream; 654 if (wrappedParams.isSome()) { 655 stream = InputStreamHelper::DeserializeInputStream(wrappedParams.ref()); 656 if (!stream) { 657 NS_WARNING("Failed to deserialize wrapped stream!"); 658 return false; 659 } 660 } 661 662 nsresult rv = Init(stream, params.bufferSize()); 663 NS_ENSURE_SUCCESS(rv, false); 664 665 return true; 666 } 667 668 NS_IMETHODIMP 669 
nsBufferedInputStream::CloseWithStatus(nsresult aStatus) { return Close(); } 670 671 NS_IMETHODIMP 672 nsBufferedInputStream::AsyncWait(nsIInputStreamCallback* aCallback, 673 uint32_t aFlags, uint32_t aRequestedCount, 674 nsIEventTarget* aEventTarget) { 675 nsCOMPtr<nsIAsyncInputStream> stream = do_QueryInterface(mStream); 676 if (!stream) { 677 // Stream is probably closed. Callback, if not nullptr, can be executed 678 // immediately 679 if (!aCallback) { 680 return NS_OK; 681 } 682 683 if (aEventTarget) { 684 nsCOMPtr<nsIInputStreamCallback> callable = NS_NewInputStreamReadyEvent( 685 "nsBufferedInputStream::OnInputStreamReady", aCallback, aEventTarget); 686 return callable->OnInputStreamReady(this); 687 } 688 689 aCallback->OnInputStreamReady(this); 690 return NS_OK; 691 } 692 693 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr; 694 { 695 MutexAutoLock lock(mMutex); 696 697 if (NS_WARN_IF(mAsyncWaitCallback && aCallback && 698 mAsyncWaitCallback != aCallback)) { 699 return NS_ERROR_FAILURE; 700 } 701 702 mAsyncWaitCallback = aCallback; 703 } 704 705 return stream->AsyncWait(callback, aFlags, aRequestedCount, aEventTarget); 706 } 707 708 NS_IMETHODIMP 709 nsBufferedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { 710 nsCOMPtr<nsIInputStreamCallback> callback; 711 { 712 MutexAutoLock lock(mMutex); 713 714 // We have been canceled in the meanwhile. 
715 if (!mAsyncWaitCallback) { 716 return NS_OK; 717 } 718 719 callback.swap(mAsyncWaitCallback); 720 } 721 722 MOZ_ASSERT(callback); 723 return callback->OnInputStreamReady(this); 724 } 725 726 NS_IMETHODIMP 727 nsBufferedInputStream::GetData(nsIInputStream** aResult) { 728 nsCOMPtr<nsISupports> stream; 729 nsBufferedStream::GetData(getter_AddRefs(stream)); 730 nsCOMPtr<nsIInputStream> inputStream = do_QueryInterface(stream); 731 inputStream.forget(aResult); 732 return NS_OK; 733 } 734 735 // nsICloneableInputStream interface 736 737 NS_IMETHODIMP 738 nsBufferedInputStream::GetCloneable(bool* aCloneable) { 739 *aCloneable = false; 740 741 RecursiveMutexAutoLock lock(mBufferMutex); 742 743 // If we don't have the buffer, the inputStream has been already closed. 744 // If mBufferStartOffset is not 0, the stream has been seeked or read. 745 // In both case the cloning is not supported. 746 if (!mBuffer || mBufferStartOffset) { 747 return NS_OK; 748 } 749 750 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream); 751 752 // GetCloneable is infallible. 
753 NS_ENSURE_TRUE(stream, NS_OK); 754 755 return stream->GetCloneable(aCloneable); 756 } 757 758 NS_IMETHODIMP 759 nsBufferedInputStream::Clone(nsIInputStream** aResult) { 760 RecursiveMutexAutoLock lock(mBufferMutex); 761 762 if (!mBuffer || mBufferStartOffset) { 763 return NS_ERROR_FAILURE; 764 } 765 766 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream); 767 NS_ENSURE_TRUE(stream, NS_ERROR_FAILURE); 768 769 nsCOMPtr<nsIInputStream> clonedStream; 770 nsresult rv = stream->Clone(getter_AddRefs(clonedStream)); 771 NS_ENSURE_SUCCESS(rv, rv); 772 773 nsCOMPtr<nsIBufferedInputStream> bis = new nsBufferedInputStream(); 774 rv = bis->Init(clonedStream, mBufferSize); 775 NS_ENSURE_SUCCESS(rv, rv); 776 777 *aResult = 778 static_cast<nsBufferedInputStream*>(bis.get())->GetInputStream().take(); 779 780 return NS_OK; 781 } 782 783 // nsIInputStreamLength 784 785 NS_IMETHODIMP 786 nsBufferedInputStream::Length(int64_t* aLength) { 787 nsCOMPtr<nsIInputStreamLength> stream = do_QueryInterface(mStream); 788 NS_ENSURE_TRUE(stream, NS_ERROR_FAILURE); 789 790 return stream->Length(aLength); 791 } 792 793 // nsIAsyncInputStreamLength 794 795 NS_IMETHODIMP 796 nsBufferedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback, 797 nsIEventTarget* aEventTarget) { 798 nsCOMPtr<nsIAsyncInputStreamLength> stream = do_QueryInterface(mStream); 799 if (!stream) { 800 // Stream is probably closed. 
Callback, if not nullptr, can be executed 801 // immediately 802 if (aCallback) { 803 const RefPtr<nsBufferedInputStream> self = this; 804 const nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback; 805 nsCOMPtr<nsIRunnable> runnable = NS_NewRunnableFunction( 806 "nsBufferedInputStream::OnInputStreamLengthReady", 807 [self, callback] { callback->OnInputStreamLengthReady(self, -1); }); 808 809 if (aEventTarget) { 810 aEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL); 811 } else { 812 runnable->Run(); 813 } 814 } 815 return NS_OK; 816 } 817 818 nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback ? this : nullptr; 819 { 820 MutexAutoLock lock(mMutex); 821 mAsyncInputStreamLengthCallback = aCallback; 822 } 823 824 MOZ_ASSERT(stream); 825 return stream->AsyncLengthWait(callback, aEventTarget); 826 } 827 828 // nsIInputStreamLengthCallback 829 830 NS_IMETHODIMP 831 nsBufferedInputStream::OnInputStreamLengthReady( 832 nsIAsyncInputStreamLength* aStream, int64_t aLength) { 833 nsCOMPtr<nsIInputStreamLengthCallback> callback; 834 { 835 MutexAutoLock lock(mMutex); 836 // We have been canceled in the meanwhile. 837 if (!mAsyncInputStreamLengthCallback) { 838 return NS_OK; 839 } 840 841 callback.swap(mAsyncInputStreamLengthCallback); 842 } 843 844 MOZ_ASSERT(callback); 845 return callback->OnInputStreamLengthReady(this, aLength); 846 } 847 848 //////////////////////////////////////////////////////////////////////////////// 849 // nsBufferedOutputStream 850 851 NS_IMPL_ADDREF_INHERITED(nsBufferedOutputStream, nsBufferedStream) 852 NS_IMPL_RELEASE_INHERITED(nsBufferedOutputStream, nsBufferedStream) 853 // This QI uses NS_INTERFACE_MAP_ENTRY_CONDITIONAL to check for 854 // non-nullness of mSafeStream. 
855 NS_INTERFACE_MAP_BEGIN(nsBufferedOutputStream) 856 NS_INTERFACE_MAP_ENTRY(nsIOutputStream) 857 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISafeOutputStream, mSafeStream) 858 NS_INTERFACE_MAP_ENTRY(nsIBufferedOutputStream) 859 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess) 860 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream) 861 862 nsresult nsBufferedOutputStream::Create(REFNSIID aIID, void** aResult) { 863 RefPtr<nsBufferedOutputStream> stream = new nsBufferedOutputStream(); 864 return stream->QueryInterface(aIID, aResult); 865 } 866 867 NS_IMETHODIMP 868 nsBufferedOutputStream::Init(nsIOutputStream* stream, uint32_t bufferSize) { 869 // QI stream to an nsISafeOutputStream, to see if we should support it 870 mSafeStream = do_QueryInterface(stream); 871 872 return nsBufferedStream::Init(stream, bufferSize); 873 } 874 875 NS_IMETHODIMP 876 nsBufferedOutputStream::Close() { 877 if (!mStream) { 878 return NS_OK; 879 } 880 881 nsresult rv1, rv2 = NS_OK; 882 883 rv1 = Flush(); 884 885 #ifdef DEBUG 886 if (NS_FAILED(rv1)) { 887 NS_WARNING( 888 "(debug) Flush() inside nsBufferedOutputStream::Close() returned error " 889 "(rv1)."); 890 } 891 #endif 892 893 // If we fail to Flush all the data, then we close anyway and drop the 894 // remaining data in the buffer. We do this because it's what Unix does 895 // for fclose and close. However, we report the error from Flush anyway. 896 if (mStream) { 897 rv2 = Sink()->Close(); 898 #ifdef DEBUG 899 if (NS_FAILED(rv2)) { 900 NS_WARNING( 901 "(debug) Sink->Close() inside nsBufferedOutputStream::Close() " 902 "returned error (rv2)."); 903 } 904 #endif 905 } 906 nsBufferedStream::Close(); 907 908 if (NS_FAILED(rv1)) { 909 return rv1; 910 } 911 if (NS_FAILED(rv2)) { 912 return rv2; 913 } 914 return NS_OK; 915 } 916 917 NS_IMETHODIMP 918 nsBufferedOutputStream::StreamStatus() { 919 return mStream ? 
Sink()->StreamStatus() : NS_BASE_STREAM_CLOSED; 920 } 921 922 NS_IMETHODIMP 923 nsBufferedOutputStream::Write(const char* buf, uint32_t count, 924 uint32_t* result) { 925 nsresult rv = NS_OK; 926 uint32_t written = 0; 927 *result = 0; 928 if (!mStream) { 929 // We special case this situation. 930 // We should catch the failure, NS_BASE_STREAM_CLOSED ASAP, here. 931 // If we don't, eventually Flush() is called in the while loop below 932 // after so many writes. 933 // However, Flush() returns NS_OK when mStream is null (!!), 934 // and we don't get a meaningful error, NS_BASE_STREAM_CLOSED, 935 // soon enough when we use buffered output. 936 #ifdef DEBUG 937 NS_WARNING( 938 "(info) nsBufferedOutputStream::Write returns NS_BASE_STREAM_CLOSED " 939 "immediately (mStream==null)."); 940 #endif 941 return NS_BASE_STREAM_CLOSED; 942 } 943 944 RecursiveMutexAutoLock lock(mBufferMutex); 945 while (count > 0) { 946 uint32_t amt = std::min(count, mBufferSize - mCursor); 947 if (amt > 0) { 948 memcpy(mBuffer + mCursor, buf + written, amt); 949 written += amt; 950 count -= amt; 951 mCursor += amt; 952 if (mFillPoint < mCursor) mFillPoint = mCursor; 953 } else { 954 NS_ASSERTION(mFillPoint, "loop in nsBufferedOutputStream::Write!"); 955 rv = Flush(); 956 if (NS_FAILED(rv)) { 957 #ifdef DEBUG 958 NS_WARNING( 959 "(debug) Flush() returned error in nsBufferedOutputStream::Write."); 960 #endif 961 break; 962 } 963 } 964 } 965 *result = written; 966 return (written > 0) ? NS_OK : rv; 967 } 968 969 NS_IMETHODIMP 970 nsBufferedOutputStream::Flush() { 971 nsresult rv; 972 uint32_t amt; 973 if (!mStream) { 974 // Stream already cancelled/flushed; probably because of previous error. 975 return NS_OK; 976 } 977 // optimize : some code within C-C needs to call Seek -> Flush() often. 
978 if (mFillPoint == 0) { 979 return NS_OK; 980 } 981 RecursiveMutexAutoLock lock(mBufferMutex); 982 rv = Sink()->Write(mBuffer, mFillPoint, &amt); 983 if (NS_FAILED(rv)) { 984 return rv; 985 } 986 mBufferStartOffset += amt; 987 if (amt == mFillPoint) { 988 mFillPoint = mCursor = 0; 989 return NS_OK; // flushed everything 990 } 991 992 // slide the remainder down to the start of the buffer 993 // |<-------------->|<---|----->| 994 // b a c s 995 uint32_t rem = mFillPoint - amt; 996 memmove(mBuffer, mBuffer + amt, rem); 997 mFillPoint = mCursor = rem; 998 return NS_ERROR_FAILURE; // didn't flush all 999 } 1000 1001 // nsISafeOutputStream 1002 NS_IMETHODIMP 1003 nsBufferedOutputStream::Finish() { 1004 // flush the stream, to write out any buffered data... 1005 nsresult rv1 = nsBufferedOutputStream::Flush(); 1006 nsresult rv2 = NS_OK; 1007 1008 if (NS_FAILED(rv1)) { 1009 NS_WARNING( 1010 "(debug) nsBufferedOutputStream::Flush() failed in " 1011 "nsBufferedOutputStream::Finish()! Possible dataloss."); 1012 1013 rv2 = Sink()->Close(); 1014 if (NS_FAILED(rv2)) { 1015 NS_WARNING( 1016 "(debug) Sink()->Close() failed in nsBufferedOutputStream::Finish()! " 1017 "Possible dataloss."); 1018 } 1019 } else { 1020 rv2 = mSafeStream->Finish(); 1021 if (NS_FAILED(rv2)) { 1022 NS_WARNING( 1023 "(debug) mSafeStream->Finish() failed within " 1024 "nsBufferedOutputStream::Flush()! Possible dataloss."); 1025 } 1026 } 1027 1028 // ... and close the buffered stream, so any further attempts to flush/close 1029 // the buffered stream won't cause errors. 1030 nsBufferedStream::Close(); 1031 1032 // We want to return the errors precisely from Finish() 1033 // and mimick the existing error handling in 1034 // nsBufferedOutputStream::Close() as reference. 
1035 1036 if (NS_FAILED(rv1)) { 1037 return rv1; 1038 } 1039 if (NS_FAILED(rv2)) { 1040 return rv2; 1041 } 1042 return NS_OK; 1043 } 1044 1045 NS_IMETHODIMP 1046 nsBufferedOutputStream::WriteFrom(nsIInputStream* inStr, uint32_t count, 1047 uint32_t* _retval) { 1048 return WriteSegments(NS_CopyStreamToSegment, inStr, count, _retval); 1049 } 1050 1051 NS_IMETHODIMP 1052 nsBufferedOutputStream::WriteSegments(nsReadSegmentFun reader, void* closure, 1053 uint32_t count, uint32_t* _retval) { 1054 *_retval = 0; 1055 nsresult rv; 1056 RecursiveMutexAutoLock lock(mBufferMutex); 1057 while (count > 0) { 1058 uint32_t left = std::min(count, mBufferSize - mCursor); 1059 if (left == 0) { 1060 rv = Flush(); 1061 if (NS_FAILED(rv)) { 1062 return (*_retval > 0) ? NS_OK : rv; 1063 } 1064 1065 continue; 1066 } 1067 1068 uint32_t read = 0; 1069 rv = reader(this, closure, mBuffer + mCursor, *_retval, left, &read); 1070 1071 if (NS_FAILED(rv)) { // If we have read some data, return ok 1072 return (*_retval > 0) ? 
NS_OK : rv; 1073 } 1074 mCursor += read; 1075 *_retval += read; 1076 count -= read; 1077 mFillPoint = std::max(mFillPoint, mCursor); 1078 } 1079 return NS_OK; 1080 } 1081 1082 NS_IMETHODIMP 1083 nsBufferedOutputStream::IsNonBlocking(bool* aNonBlocking) { 1084 if (mStream) { 1085 return Sink()->IsNonBlocking(aNonBlocking); 1086 } 1087 return NS_ERROR_NOT_INITIALIZED; 1088 } 1089 1090 NS_IMETHODIMP_(char*) 1091 nsBufferedOutputStream::GetBuffer(uint32_t aLength, uint32_t aAlignMask) { 1092 NS_ASSERTION(mGetBufferCount == 0, "nested GetBuffer!"); 1093 if (mGetBufferCount != 0) { 1094 return nullptr; 1095 } 1096 1097 if (mBufferDisabled) { 1098 return nullptr; 1099 } 1100 1101 RecursiveMutexAutoLock lock(mBufferMutex); 1102 char* buf = mBuffer + mCursor; 1103 uint32_t rem = mBufferSize - mCursor; 1104 if (rem == 0) { 1105 if (NS_FAILED(Flush())) { 1106 return nullptr; 1107 } 1108 buf = mBuffer + mCursor; 1109 rem = mBufferSize - mCursor; 1110 } 1111 1112 uint32_t mod = (NS_PTR_TO_INT32(buf) & aAlignMask); 1113 if (mod) { 1114 uint32_t pad = aAlignMask + 1 - mod; 1115 if (pad > rem) { 1116 return nullptr; 1117 } 1118 1119 memset(buf, 0, pad); 1120 mCursor += pad; 1121 buf += pad; 1122 rem -= pad; 1123 } 1124 1125 if (aLength > rem) { 1126 return nullptr; 1127 } 1128 mGetBufferCount++; 1129 return buf; 1130 } 1131 1132 NS_IMETHODIMP_(void) 1133 nsBufferedOutputStream::PutBuffer(char* aBuffer, uint32_t aLength) { 1134 NS_ASSERTION(mGetBufferCount == 1, "stray PutBuffer!"); 1135 if (--mGetBufferCount != 0) { 1136 return; 1137 } 1138 1139 NS_ASSERTION(mCursor + aLength <= mBufferSize, "PutBuffer botch"); 1140 mCursor += aLength; 1141 if (mFillPoint < mCursor) { 1142 mFillPoint = mCursor; 1143 } 1144 } 1145 1146 NS_IMETHODIMP 1147 nsBufferedOutputStream::DisableBuffering() { 1148 NS_ASSERTION(!mBufferDisabled, "redundant call to DisableBuffering!"); 1149 NS_ASSERTION(mGetBufferCount == 0, 1150 "DisableBuffer call between GetBuffer and PutBuffer!"); 1151 if (mGetBufferCount 
!= 0) { 1152 return NS_ERROR_UNEXPECTED; 1153 } 1154 1155 // Empty the buffer so nsBufferedStream::Tell works. 1156 nsresult rv = Flush(); 1157 if (NS_FAILED(rv)) { 1158 return rv; 1159 } 1160 1161 mBufferDisabled = true; 1162 return NS_OK; 1163 } 1164 1165 NS_IMETHODIMP 1166 nsBufferedOutputStream::EnableBuffering() { 1167 NS_ASSERTION(mBufferDisabled, "gratuitous call to EnableBuffering!"); 1168 mBufferDisabled = false; 1169 return NS_OK; 1170 } 1171 1172 NS_IMETHODIMP 1173 nsBufferedOutputStream::GetUnbufferedStream(nsISupports** aStream) { 1174 // Empty the buffer so subsequent i/o trumps any buffered data. 1175 if (mFillPoint) { 1176 nsresult rv = Flush(); 1177 if (NS_FAILED(rv)) { 1178 return rv; 1179 } 1180 } 1181 1182 nsCOMPtr<nsISupports> stream = mStream; 1183 stream.forget(aStream); 1184 return NS_OK; 1185 } 1186 1187 NS_IMETHODIMP 1188 nsBufferedOutputStream::GetData(nsIOutputStream** aResult) { 1189 nsCOMPtr<nsISupports> stream; 1190 nsBufferedStream::GetData(getter_AddRefs(stream)); 1191 nsCOMPtr<nsIOutputStream> outputStream = do_QueryInterface(stream); 1192 outputStream.forget(aResult); 1193 return NS_OK; 1194 } 1195 #undef METER 1196 1197 ////////////////////////////////////////////////////////////////////////////////