DataPipe.cpp (28025B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "DataPipe.h" 8 #include "mozilla/AlreadyAddRefed.h" 9 #include "mozilla/Assertions.h" 10 #include "mozilla/CheckedInt.h" 11 #include "mozilla/ErrorNames.h" 12 #include "mozilla/Logging.h" 13 #include "mozilla/MoveOnlyFunction.h" 14 #include "mozilla/ipc/InputStreamParams.h" 15 #include "nsIAsyncInputStream.h" 16 #include "nsStreamUtils.h" 17 #include "nsThreadUtils.h" 18 19 namespace mozilla { 20 namespace ipc { 21 22 LazyLogModule gDataPipeLog("DataPipe"); 23 24 namespace data_pipe_detail { 25 26 // Helper for queueing up actions to be run once the mutex has been unlocked. 27 // Actions will be run in-order. 28 class MOZ_SCOPED_CAPABILITY DataPipeAutoLock { 29 public: 30 explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex) 31 : mMutex(aMutex) { 32 mMutex.Lock(); 33 } 34 DataPipeAutoLock(const DataPipeAutoLock&) = delete; 35 DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete; 36 37 template <typename F> 38 void AddUnlockAction(F aAction) { 39 mActions.AppendElement(std::move(aAction)); 40 } 41 42 ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() { 43 mMutex.Unlock(); 44 for (auto& action : mActions) { 45 action(); 46 } 47 } 48 49 private: 50 Mutex& mMutex; 51 AutoTArray<MoveOnlyFunction<void()>, 4> mActions; 52 }; 53 54 static void DoNotifyOnUnlock(DataPipeAutoLock& aLock, 55 already_AddRefed<nsIRunnable> aCallback, 56 already_AddRefed<nsIEventTarget> aTarget) { 57 nsCOMPtr<nsIRunnable> callback{std::move(aCallback)}; 58 nsCOMPtr<nsIEventTarget> target{std::move(aTarget)}; 59 if (callback) { 60 aLock.AddUnlockAction( 61 [callback = std::move(callback), target = std::move(target)]() mutable { 62 if (target) { 63 target->Dispatch(callback.forget()); 64 } else { 65 NS_DispatchBackgroundTask(callback.forget()); 66 } 67 }); 68 } 69 } 70 71 class DataPipeLink : public NodeController::PortObserver { 72 public: 73 DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex, 74 ScopedPort aPort, MutableSharedMemoryHandle&& aShmemHandle, 75 const std::shared_ptr<SharedMemoryMapping> aShmem, 76 uint32_t aCapacity, nsresult aPeerStatus, uint32_t aOffset, 77 uint32_t aAvailable) 78 : mMutex(std::move(aMutex)), 79 mPort(std::move(aPort)), 80 mShmemHandle(std::move(aShmemHandle)), 81 mShmem(aShmem), 82 mCapacity(aCapacity), 83 mReceiverSide(aReceiverSide), 84 mPeerStatus(aPeerStatus), 85 mOffset(aOffset), 86 mAvailable(aAvailable) {} 87 88 void Init() MOZ_EXCLUDES(*mMutex) { 89 { 90 DataPipeAutoLock lock(*mMutex); 91 if (NS_FAILED(mPeerStatus)) { 92 return; 93 } 94 MOZ_ASSERT(mPort.IsValid()); 95 mPort.Controller()->SetPortObserver(mPort.Port(), this); 96 } 97 OnPortStatusChanged(); 98 } 99 100 void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex); 101 102 // Add a task to notify the callback after `aLock` is unlocked. 103 // 104 // This method is safe to call multiple times, as after the first time it is 105 // called, `mCallback` will be cleared. 106 void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) { 107 DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget()); 108 } 109 110 void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes) 111 MOZ_REQUIRES(*mMutex) { 112 MOZ_LOG(gDataPipeLog, LogLevel::Verbose, 113 ("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get())); 114 if (NS_FAILED(mPeerStatus)) { 115 return; 116 } 117 118 // `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked 119 // but before we send the message. The strong controller and port references 120 // will allow us to try to send the message anyway, and it will be safely 121 // dropped if the port has already been closed. CONSUMED messages are safe 122 // to deliver out-of-order, so we don't need to worry about ordering here. 123 aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()}, 124 port = mPort.Port(), aBytes]() mutable { 125 auto message = MakeUnique<IPC::Message>( 126 MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE); 127 IPC::MessageWriter writer(*message); 128 WriteParam(&writer, aBytes); 129 controller->SendUserMessage(port, std::move(message)); 130 }); 131 } 132 133 void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus, 134 bool aSendClosed = false) MOZ_REQUIRES(*mMutex) { 135 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 136 ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus), 137 aSendClosed ? ", send" : "", Describe(aLock).get())); 138 // The pipe was closed or errored. Clear the observer reference back 139 // to this type from the port layer, and ensure we notify waiters. 140 MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus)); 141 mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; 142 aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] { 143 if (aSendClosed) { 144 auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE, 145 DATA_PIPE_CLOSED_MESSAGE_TYPE); 146 IPC::MessageWriter writer(*message); 147 WriteParam(&writer, aStatus); 148 port.Controller()->SendUserMessage(port.Port(), std::move(message)); 149 } 150 // The `ScopedPort` being destroyed with this action will close it, 151 // clearing the observer reference from the ports layer. 152 }); 153 NotifyOnUnlock(aLock); 154 } 155 156 nsCString Describe(DataPipeAutoLock& aLock) const MOZ_REQUIRES(*mMutex) { 157 return nsPrintfCString( 158 "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]", 159 mReceiverSide ? "Receiver" : "Sender", this, mCapacity, 160 GetStaticErrorName(mPeerStatus), mOffset, mAvailable, 161 mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no"); 162 } 163 164 // This mutex is shared with the `DataPipeBase` which owns this 165 // `DataPipeLink`. 166 std::shared_ptr<Mutex> mMutex; 167 168 ScopedPort mPort MOZ_GUARDED_BY(*mMutex); 169 MutableSharedMemoryHandle mShmemHandle MOZ_GUARDED_BY(*mMutex); 170 const std::shared_ptr<SharedMemoryMapping> mShmem; 171 const uint32_t mCapacity; 172 const bool mReceiverSide; 173 174 bool mProcessingSegment MOZ_GUARDED_BY(*mMutex) = false; 175 176 nsresult mPeerStatus MOZ_GUARDED_BY(*mMutex) = NS_OK; 177 uint32_t mOffset MOZ_GUARDED_BY(*mMutex) = 0; 178 uint32_t mAvailable MOZ_GUARDED_BY(*mMutex) = 0; 179 180 bool mCallbackClosureOnly MOZ_GUARDED_BY(*mMutex) = false; 181 nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(*mMutex); 182 nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(*mMutex); 183 }; 184 185 void DataPipeLink::OnPortStatusChanged() { 186 DataPipeAutoLock lock(*mMutex); 187 188 while (NS_SUCCEEDED(mPeerStatus)) { 189 UniquePtr<IPC::Message> message; 190 if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) { 191 SetPeerError(lock, NS_BASE_STREAM_CLOSED); 192 return; 193 } 194 if (!message) { 195 return; // no more messages 196 } 197 198 IPC::MessageReader reader(*message); 199 switch (message->type()) { 200 case DATA_PIPE_CLOSED_MESSAGE_TYPE: { 201 nsresult status = NS_OK; 202 if (!ReadParam(&reader, &status)) { 203 NS_WARNING("Unable to parse nsresult error from peer"); 204 status = NS_ERROR_UNEXPECTED; 205 } 206 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 207 ("Got CLOSED(%s) %s", GetStaticErrorName(status), 208 Describe(lock).get())); 209 SetPeerError(lock, status); 210 return; 211 } 212 case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: { 213 uint32_t consumed = 0; 214 if (!ReadParam(&reader, &consumed)) { 215 NS_WARNING("Unable to parse bytes consumed from peer"); 216 SetPeerError(lock, NS_ERROR_UNEXPECTED); 217 return; 218 } 219 220 MOZ_LOG(gDataPipeLog, LogLevel::Verbose, 221 ("Got CONSUMED(%u) %s", consumed, Describe(lock).get())); 222 auto newAvailable = CheckedUint32{mAvailable} + consumed; 223 if (!newAvailable.isValid() || newAvailable.value() > mCapacity) { 224 NS_WARNING("Illegal bytes consumed message received from peer"); 225 SetPeerError(lock, NS_ERROR_UNEXPECTED); 226 return; 227 } 228 mAvailable = newAvailable.value(); 229 if (!mCallbackClosureOnly) { 230 NotifyOnUnlock(lock); 231 } 232 break; 233 } 234 default: { 235 NS_WARNING("Illegal message type received from peer"); 236 SetPeerError(lock, NS_ERROR_UNEXPECTED); 237 return; 238 } 239 } 240 } 241 } 242 243 DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError) 244 : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver" 245 : "DataPipeSender")), 246 mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {} 247 248 DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort, 249 MutableSharedMemoryHandle&& aShmemHandle, 250 const std::shared_ptr<SharedMemoryMapping>& aShmem, 251 uint32_t aCapacity, nsresult aPeerStatus, 252 uint32_t aOffset, uint32_t aAvailable) 253 : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver" 254 : "DataPipeSender")), 255 mStatus(NS_OK), 256 mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort), 257 std::move(aShmemHandle), aShmem, aCapacity, 258 aPeerStatus, aOffset, aAvailable)) { 259 mLink->Init(); 260 } 261 262 DataPipeBase::~DataPipeBase() { 263 DataPipeAutoLock lock(*mMutex); 264 CloseInternal(lock, NS_BASE_STREAM_CLOSED); 265 } 266 267 void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) { 268 if (NS_FAILED(mStatus)) { 269 return; 270 } 271 272 MOZ_LOG( 273 gDataPipeLog, LogLevel::Debug, 274 ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get())); 275 276 // Set our status to an errored status. 277 mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; 278 RefPtr<DataPipeLink> link = mLink.forget(); 279 AssertSameMutex(link->mMutex); 280 link->NotifyOnUnlock(aLock); 281 282 // If our peer hasn't disappeared yet, clean up our connection to it. 283 if (NS_SUCCEEDED(link->mPeerStatus)) { 284 link->SetPeerError(aLock, mStatus, /* aSendClosed */ true); 285 } 286 } 287 288 nsresult DataPipeBase::ProcessSegmentsInternal( 289 uint32_t aCount, ProcessSegmentFun aProcessSegment, 290 uint32_t* aProcessedCount) { 291 *aProcessedCount = 0; 292 293 while (*aProcessedCount < aCount) { 294 DataPipeAutoLock lock(*mMutex); 295 mMutex->AssertCurrentThreadOwns(); 296 297 MOZ_LOG(gDataPipeLog, LogLevel::Verbose, 298 ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount, 299 Describe(lock).get())); 300 301 nsresult status = CheckStatus(lock); 302 if (NS_FAILED(status)) { 303 if (*aProcessedCount > 0) { 304 return NS_OK; 305 } 306 return status == NS_BASE_STREAM_CLOSED ? NS_OK : status; 307 } 308 309 RefPtr<DataPipeLink> link = mLink; 310 AssertSameMutex(link->mMutex); 311 if (!link->mAvailable) { 312 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus), 313 "CheckStatus will have returned an error"); 314 return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK; 315 } 316 317 MOZ_RELEASE_ASSERT(!link->mProcessingSegment, 318 "Only one thread may be processing a segment at a time"); 319 320 // Extract an iterator over the next contiguous region of the shared memory 321 // buffer which will be used . 322 char* start = link->mShmem->DataAs<char>() + link->mOffset; 323 char* iter = start; 324 char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable, 325 link->mCapacity - link->mOffset}); 326 327 // Record the consumed region from our segment when exiting this scope, 328 // telling our peer how many bytes were consumed. Hold on to `mLink` to keep 329 // the shmem mapped and make sure we can clean up even if we're closed while 330 // processing the shmem region. 331 link->mProcessingSegment = true; 332 auto scopeExit = MakeScopeExit([&] { 333 mMutex->AssertCurrentThreadOwns(); // should still be held 334 AssertSameMutex(link->mMutex); 335 336 MOZ_RELEASE_ASSERT(link->mProcessingSegment); 337 link->mProcessingSegment = false; 338 uint32_t totalProcessed = iter - start; 339 if (totalProcessed > 0) { 340 link->mOffset += totalProcessed; 341 MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity); 342 if (link->mOffset == link->mCapacity) { 343 link->mOffset = 0; 344 } 345 link->mAvailable -= totalProcessed; 346 link->SendBytesConsumedOnUnlock(lock, totalProcessed); 347 } 348 MOZ_LOG(gDataPipeLog, LogLevel::Verbose, 349 ("Processed Segment(%u of %zu) %s", totalProcessed, end - start, 350 Describe(lock).get())); 351 }); 352 353 { 354 MutexAutoUnlock unlock(*mMutex); 355 while (iter < end) { 356 uint32_t processed = 0; 357 Span segment{iter, end}; 358 nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed); 359 if (NS_FAILED(rv) || processed == 0) { 360 return NS_OK; 361 } 362 363 MOZ_RELEASE_ASSERT(processed <= segment.Length()); 364 iter += processed; 365 *aProcessedCount += processed; 366 } 367 } 368 } 369 MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount == aCount, 370 "Must have processed exactly aCount"); 371 return NS_OK; 372 } 373 374 void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback, 375 already_AddRefed<nsIEventTarget> aTarget, 376 bool aClosureOnly) { 377 RefPtr<nsIRunnable> callback = std::move(aCallback); 378 RefPtr<nsIEventTarget> target = std::move(aTarget); 379 380 DataPipeAutoLock lock(*mMutex); 381 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 382 ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)", 383 callback.get(), Describe(lock).get())); 384 385 if (NS_FAILED(CheckStatus(lock))) { 386 #ifdef DEBUG 387 if (mLink) { 388 AssertSameMutex(mLink->mMutex); 389 MOZ_ASSERT(!mLink->mCallback); 390 } 391 #endif 392 DoNotifyOnUnlock(lock, callback.forget(), target.forget()); 393 return; 394 } 395 396 AssertSameMutex(mLink->mMutex); 397 398 // NOTE: After this point, `mLink` may have previously had a callback which is 399 // now being cancelled, make sure we clear `mCallback` even if we're going to 400 // call `aCallback` immediately. 401 mLink->mCallback = callback.forget(); 402 mLink->mCallbackTarget = target.forget(); 403 mLink->mCallbackClosureOnly = aClosureOnly; 404 if (!aClosureOnly && mLink->mAvailable) { 405 mLink->NotifyOnUnlock(lock); 406 } 407 } 408 409 nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) { 410 // If our peer has closed or errored, we may need to close our local side to 411 // reflect the error code our peer provided. If we're a sender, we want to 412 // become closed immediately, whereas if we're a receiver we want to wait 413 // until our available buffer has been exhausted. 414 // 415 // NOTE: There may still be 2-stage writes/reads ongoing at this point, which 416 // will continue due to `mLink` being kept alive by the 417 // `ProcessSegmentsInternal` function. 418 if (NS_FAILED(mStatus)) { 419 return mStatus; 420 } 421 AssertSameMutex(mLink->mMutex); 422 if (NS_FAILED(mLink->mPeerStatus) && 423 (!mLink->mReceiverSide || !mLink->mAvailable)) { 424 CloseInternal(aLock, mLink->mPeerStatus); 425 } 426 return mStatus; 427 } 428 429 nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) { 430 if (mLink) { 431 AssertSameMutex(mLink->mMutex); 432 return mLink->Describe(aLock); 433 } 434 return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus)); 435 } 436 437 template <typename T> 438 void DataPipeWrite(IPC::MessageWriter* aWriter, T* aParam) { 439 DataPipeAutoLock lock(*aParam->mMutex); 440 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 441 ("IPC Write: %s", aParam->Describe(lock).get())); 442 443 WriteParam(aWriter, aParam->mStatus); 444 if (NS_FAILED(aParam->mStatus)) { 445 return; 446 } 447 448 aParam->AssertSameMutex(aParam->mLink->mMutex); 449 MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment, 450 "cannot transfer while processing a segment"); 451 452 // Serialize relevant parameters to our peer. 453 WriteParam(aWriter, std::move(aParam->mLink->mPort)); 454 WriteParam(aWriter, std::move(aParam->mLink->mShmemHandle)); 455 WriteParam(aWriter, aParam->mLink->mCapacity); 456 WriteParam(aWriter, aParam->mLink->mPeerStatus); 457 WriteParam(aWriter, aParam->mLink->mOffset); 458 WriteParam(aWriter, aParam->mLink->mAvailable); 459 460 // Mark our peer as closed so we don't try to send to it when closing. 461 aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED; 462 aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED); 463 } 464 465 template <typename T> 466 bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) { 467 nsresult rv = NS_OK; 468 if (!ReadParam(aReader, &rv)) { 469 aReader->FatalError("failed to read DataPipe status"); 470 return false; 471 } 472 if (NS_FAILED(rv)) { 473 *aResult = new T(rv); 474 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 475 ("IPC Read: [status=%s]", GetStaticErrorName(rv))); 476 return true; 477 } 478 479 ScopedPort port; 480 if (!ReadParam(aReader, &port)) { 481 aReader->FatalError("failed to read DataPipe port"); 482 return false; 483 } 484 MutableSharedMemoryHandle shmemHandle; 485 if (!ReadParam(aReader, &shmemHandle)) { 486 aReader->FatalError("failed to read DataPipe shmem"); 487 return false; 488 } 489 490 if (!shmemHandle) { 491 aReader->FatalError("failed to create DataPipe shmem handle"); 492 return false; 493 } 494 495 uint32_t capacity = 0; 496 nsresult peerStatus = NS_OK; 497 uint32_t offset = 0; 498 uint32_t available = 0; 499 if (!ReadParam(aReader, &capacity) || !ReadParam(aReader, &peerStatus) || 500 !ReadParam(aReader, &offset) || !ReadParam(aReader, &available)) { 501 aReader->FatalError("failed to read DataPipe fields"); 502 return false; 503 } 504 if (!capacity || offset >= capacity || available > capacity) { 505 aReader->FatalError("received DataPipe state values are inconsistent"); 506 return false; 507 } 508 auto mapping = std::make_shared<SharedMemoryMapping>(shmemHandle.Map()); 509 if (!*mapping || 510 mapping->Size() != shared_memory::PageAlignedSize(capacity)) { 511 aReader->FatalError("failed to map DataPipe shared memory region"); 512 return false; 513 } 514 515 *aResult = new T(std::move(port), std::move(shmemHandle), mapping, capacity, 516 peerStatus, offset, available); 517 if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) { 518 DataPipeAutoLock lock(*(*aResult)->mMutex); 519 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 520 ("IPC Read: %s", (*aResult)->Describe(lock).get())); 521 } 522 return true; 523 } 524 525 } // namespace data_pipe_detail 526 527 //----------------------------------------------------------------------------- 528 // DataPipeSender 529 //----------------------------------------------------------------------------- 530 531 NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream, 532 DataPipeSender) 533 534 // nsIOutputStream 535 536 NS_IMETHODIMP DataPipeSender::Close() { 537 return CloseWithStatus(NS_BASE_STREAM_CLOSED); 538 } 539 540 NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; } 541 542 NS_IMETHODIMP DataPipeSender::StreamStatus() { 543 data_pipe_detail::DataPipeAutoLock lock(*mMutex); 544 return CheckStatus(lock); 545 } 546 547 NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount, 548 uint32_t* aWriteCount) { 549 return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount, 550 aWriteCount); 551 } 552 553 NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream, 554 uint32_t aCount, 555 uint32_t* aWriteCount) { 556 return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount, 557 aWriteCount); 558 } 559 560 NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader, 561 void* aClosure, uint32_t aCount, 562 uint32_t* aWriteCount) { 563 auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset, 564 uint32_t* aReadCount) -> nsresult { 565 return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), 566 aReadCount); 567 }; 568 return ProcessSegmentsInternal(aCount, processSegment, aWriteCount); 569 } 570 571 NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) { 572 *_retval = true; 573 return NS_OK; 574 } 575 576 // nsIAsyncOutputStream 577 578 NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) { 579 data_pipe_detail::DataPipeAutoLock lock(*mMutex); 580 CloseInternal(lock, reason); 581 return NS_OK; 582 } 583 584 NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback, 585 uint32_t aFlags, 586 uint32_t aRequestedCount, 587 nsIEventTarget* aTarget) { 588 AsyncWaitInternal( 589 aCallback ? NS_NewCancelableRunnableFunction( 590 "DataPipeSender::AsyncWait", 591 [self = RefPtr{this}, callback = RefPtr{aCallback}] { 592 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 593 ("Calling OnOutputStreamReady(%p, %p)", 594 callback.get(), self.get())); 595 callback->OnOutputStreamReady(self); 596 }) 597 : nullptr, 598 do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); 599 return NS_OK; 600 } 601 602 //----------------------------------------------------------------------------- 603 // DataPipeReceiver 604 //----------------------------------------------------------------------------- 605 606 NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream, 607 nsIIPCSerializableInputStream, DataPipeReceiver) 608 609 // nsIInputStream 610 611 NS_IMETHODIMP DataPipeReceiver::Close() { 612 return CloseWithStatus(NS_BASE_STREAM_CLOSED); 613 } 614 615 NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) { 616 data_pipe_detail::DataPipeAutoLock lock(*mMutex); 617 nsresult rv = CheckStatus(lock); 618 if (NS_FAILED(rv)) { 619 return rv; 620 } 621 AssertSameMutex(mLink->mMutex); 622 *_retval = mLink->mAvailable; 623 return NS_OK; 624 } 625 626 NS_IMETHODIMP DataPipeReceiver::StreamStatus() { 627 data_pipe_detail::DataPipeAutoLock lock(*mMutex); 628 return CheckStatus(lock); 629 } 630 631 NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount, 632 uint32_t* aReadCount) { 633 return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount); 634 } 635 636 NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter, 637 void* aClosure, uint32_t aCount, 638 uint32_t* aReadCount) { 639 auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset, 640 uint32_t* aWriteCount) -> nsresult { 641 return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), 642 aWriteCount); 643 }; 644 return ProcessSegmentsInternal(aCount, processSegment, aReadCount); 645 } 646 647 NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) { 648 *_retval = true; 649 return NS_OK; 650 } 651 652 // nsIAsyncInputStream 653 654 NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) { 655 data_pipe_detail::DataPipeAutoLock lock(*mMutex); 656 CloseInternal(lock, aStatus); 657 return NS_OK; 658 } 659 660 NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback, 661 uint32_t aFlags, 662 uint32_t aRequestedCount, 663 nsIEventTarget* aTarget) { 664 AsyncWaitInternal( 665 aCallback ? NS_NewCancelableRunnableFunction( 666 "DataPipeReceiver::AsyncWait", 667 [self = RefPtr{this}, callback = RefPtr{aCallback}] { 668 MOZ_LOG(gDataPipeLog, LogLevel::Debug, 669 ("Calling OnInputStreamReady(%p, %p)", 670 callback.get(), self.get())); 671 callback->OnInputStreamReady(self); 672 }) 673 : nullptr, 674 do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); 675 return NS_OK; 676 } 677 678 // nsIIPCSerializableInputStream 679 680 void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize, 681 uint32_t* aSizeUsed, 682 uint32_t* aPipes, 683 uint32_t* aTransferables) { 684 // We report DataPipeReceiver as taking one transferrable to serialize, rather 685 // than one pipe, as we aren't starting a new pipe for this purpose, and are 686 // instead transferring an existing pipe. 687 *aTransferables = 1; 688 } 689 690 void DataPipeReceiver::Serialize(InputStreamParams& aParams, uint32_t aMaxSize, 691 uint32_t* aSizeUsed) { 692 *aSizeUsed = 0; 693 aParams = DataPipeReceiverStreamParams(WrapNotNull(this)); 694 } 695 696 bool DataPipeReceiver::Deserialize(const InputStreamParams& aParams) { 697 MOZ_CRASH("Handled directly in `DeserializeInputStream`"); 698 } 699 700 //----------------------------------------------------------------------------- 701 // NewDataPipe 702 //----------------------------------------------------------------------------- 703 704 nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender, 705 DataPipeReceiver** aReceiver) { 706 if (!aCapacity) { 707 aCapacity = kDefaultDataPipeCapacity; 708 } 709 710 RefPtr<NodeController> controller = NodeController::GetSingleton(); 711 if (!controller) { 712 return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; 713 } 714 715 // Allocate a pair of ports for messaging between the sender & receiver. 716 auto [senderPort, receiverPort] = controller->CreatePortPair(); 717 718 // Create and allocate the shared memory region. 719 size_t alignedCapacity = shared_memory::PageAlignedSize(aCapacity); 720 auto handle = shared_memory::Create(alignedCapacity); 721 if (!handle) { 722 return NS_ERROR_OUT_OF_MEMORY; 723 } 724 auto mapping = std::make_shared<SharedMemoryMapping>(handle.Map()); 725 if (!*mapping) { 726 return NS_ERROR_OUT_OF_MEMORY; 727 } 728 729 // We'll first clone then take the handle from the region so that the sender & 730 // receiver each have a handle. This avoids the need to duplicate the handle 731 // when serializing, when errors are non-recoverable. 732 auto senderShmemHandle = handle.Clone(); 733 auto receiverShmemHandle = std::move(handle); 734 if (!senderShmemHandle || !receiverShmemHandle) { 735 return NS_ERROR_OUT_OF_MEMORY; 736 } 737 738 RefPtr sender = 739 new DataPipeSender(std::move(senderPort), std::move(senderShmemHandle), 740 mapping, aCapacity, NS_OK, 0, aCapacity); 741 RefPtr receiver = new DataPipeReceiver(std::move(receiverPort), 742 std::move(receiverShmemHandle), 743 mapping, aCapacity, NS_OK, 0, 0); 744 sender.forget(aSender); 745 receiver.forget(aReceiver); 746 return NS_OK; 747 } 748 749 } // namespace ipc 750 } // namespace mozilla 751 752 void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write( 753 MessageWriter* aWriter, mozilla::ipc::DataPipeSender* aParam) { 754 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam); 755 } 756 757 bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read( 758 MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeSender>* aResult) { 759 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult); 760 } 761 762 void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write( 763 MessageWriter* aWriter, mozilla::ipc::DataPipeReceiver* aParam) { 764 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam); 765 } 766 767 bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read( 768 MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) { 769 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult); 770 }