tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

DataPipe.cpp (28025B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "DataPipe.h"
      8 #include "mozilla/AlreadyAddRefed.h"
      9 #include "mozilla/Assertions.h"
     10 #include "mozilla/CheckedInt.h"
     11 #include "mozilla/ErrorNames.h"
     12 #include "mozilla/Logging.h"
     13 #include "mozilla/MoveOnlyFunction.h"
     14 #include "mozilla/ipc/InputStreamParams.h"
     15 #include "nsIAsyncInputStream.h"
     16 #include "nsStreamUtils.h"
     17 #include "nsThreadUtils.h"
     18 
     19 namespace mozilla {
     20 namespace ipc {
     21 
// Log module (named "DataPipe") used by the MOZ_LOG calls in this file.
LazyLogModule gDataPipeLog("DataPipe");
     23 
     24 namespace data_pipe_detail {
     25 
     26 // Helper for queueing up actions to be run once the mutex has been unlocked.
     27 // Actions will be run in-order.
     28 class MOZ_SCOPED_CAPABILITY DataPipeAutoLock {
     29 public:
     30  explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex)
     31      : mMutex(aMutex) {
     32    mMutex.Lock();
     33  }
     34  DataPipeAutoLock(const DataPipeAutoLock&) = delete;
     35  DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;
     36 
     37  template <typename F>
     38  void AddUnlockAction(F aAction) {
     39    mActions.AppendElement(std::move(aAction));
     40  }
     41 
     42  ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() {
     43    mMutex.Unlock();
     44    for (auto& action : mActions) {
     45      action();
     46    }
     47  }
     48 
     49 private:
     50  Mutex& mMutex;
     51  AutoTArray<MoveOnlyFunction<void()>, 4> mActions;
     52 };
     53 
     54 static void DoNotifyOnUnlock(DataPipeAutoLock& aLock,
     55                             already_AddRefed<nsIRunnable> aCallback,
     56                             already_AddRefed<nsIEventTarget> aTarget) {
     57  nsCOMPtr<nsIRunnable> callback{std::move(aCallback)};
     58  nsCOMPtr<nsIEventTarget> target{std::move(aTarget)};
     59  if (callback) {
     60    aLock.AddUnlockAction(
     61        [callback = std::move(callback), target = std::move(target)]() mutable {
     62          if (target) {
     63            target->Dispatch(callback.forget());
     64          } else {
     65            NS_DispatchBackgroundTask(callback.forget());
     66          }
     67        });
     68  }
     69 }
     70 
     71 class DataPipeLink : public NodeController::PortObserver {
     72 public:
     73  DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex,
     74               ScopedPort aPort, MutableSharedMemoryHandle&& aShmemHandle,
     75               const std::shared_ptr<SharedMemoryMapping> aShmem,
     76               uint32_t aCapacity, nsresult aPeerStatus, uint32_t aOffset,
     77               uint32_t aAvailable)
     78      : mMutex(std::move(aMutex)),
     79        mPort(std::move(aPort)),
     80        mShmemHandle(std::move(aShmemHandle)),
     81        mShmem(aShmem),
     82        mCapacity(aCapacity),
     83        mReceiverSide(aReceiverSide),
     84        mPeerStatus(aPeerStatus),
     85        mOffset(aOffset),
     86        mAvailable(aAvailable) {}
     87 
     88  void Init() MOZ_EXCLUDES(*mMutex) {
     89    {
     90      DataPipeAutoLock lock(*mMutex);
     91      if (NS_FAILED(mPeerStatus)) {
     92        return;
     93      }
     94      MOZ_ASSERT(mPort.IsValid());
     95      mPort.Controller()->SetPortObserver(mPort.Port(), this);
     96    }
     97    OnPortStatusChanged();
     98  }
     99 
    100  void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex);
    101 
    102  // Add a task to notify the callback after `aLock` is unlocked.
    103  //
    104  // This method is safe to call multiple times, as after the first time it is
    105  // called, `mCallback` will be cleared.
    106  void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) {
    107    DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
    108  }
    109 
    110  void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes)
    111      MOZ_REQUIRES(*mMutex) {
    112    MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
    113            ("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get()));
    114    if (NS_FAILED(mPeerStatus)) {
    115      return;
    116    }
    117 
    118    // `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
    119    // but before we send the message. The strong controller and port references
    120    // will allow us to try to send the message anyway, and it will be safely
    121    // dropped if the port has already been closed. CONSUMED messages are safe
    122    // to deliver out-of-order, so we don't need to worry about ordering here.
    123    aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
    124                           port = mPort.Port(), aBytes]() mutable {
    125      auto message = MakeUnique<IPC::Message>(
    126          MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
    127      IPC::MessageWriter writer(*message);
    128      WriteParam(&writer, aBytes);
    129      controller->SendUserMessage(port, std::move(message));
    130    });
    131  }
    132 
    133  void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
    134                    bool aSendClosed = false) MOZ_REQUIRES(*mMutex) {
    135    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
    136            ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
    137             aSendClosed ? ", send" : "", Describe(aLock).get()));
    138    // The pipe was closed or errored. Clear the observer reference back
    139    // to this type from the port layer, and ensure we notify waiters.
    140    MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
    141    mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
    142    aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
    143      if (aSendClosed) {
    144        auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
    145                                                DATA_PIPE_CLOSED_MESSAGE_TYPE);
    146        IPC::MessageWriter writer(*message);
    147        WriteParam(&writer, aStatus);
    148        port.Controller()->SendUserMessage(port.Port(), std::move(message));
    149      }
    150      // The `ScopedPort` being destroyed with this action will close it,
    151      // clearing the observer reference from the ports layer.
    152    });
    153    NotifyOnUnlock(aLock);
    154  }
    155 
    156  nsCString Describe(DataPipeAutoLock& aLock) const MOZ_REQUIRES(*mMutex) {
    157    return nsPrintfCString(
    158        "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
    159        mReceiverSide ? "Receiver" : "Sender", this, mCapacity,
    160        GetStaticErrorName(mPeerStatus), mOffset, mAvailable,
    161        mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no");
    162  }
    163 
    164  // This mutex is shared with the `DataPipeBase` which owns this
    165  // `DataPipeLink`.
    166  std::shared_ptr<Mutex> mMutex;
    167 
    168  ScopedPort mPort MOZ_GUARDED_BY(*mMutex);
    169  MutableSharedMemoryHandle mShmemHandle MOZ_GUARDED_BY(*mMutex);
    170  const std::shared_ptr<SharedMemoryMapping> mShmem;
    171  const uint32_t mCapacity;
    172  const bool mReceiverSide;
    173 
    174  bool mProcessingSegment MOZ_GUARDED_BY(*mMutex) = false;
    175 
    176  nsresult mPeerStatus MOZ_GUARDED_BY(*mMutex) = NS_OK;
    177  uint32_t mOffset MOZ_GUARDED_BY(*mMutex) = 0;
    178  uint32_t mAvailable MOZ_GUARDED_BY(*mMutex) = 0;
    179 
    180  bool mCallbackClosureOnly MOZ_GUARDED_BY(*mMutex) = false;
    181  nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(*mMutex);
    182  nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(*mMutex);
    183 };
    184 
// Drain and handle every message currently queued on `mPort`.
//
// Runs with `*mMutex` held for the whole loop. The loop stops when no message
// is left, when the peer status has become a failure code, or when a message
// is malformed or unexpected (in which case the peer is marked as errored).
void DataPipeLink::OnPortStatusChanged() {
  DataPipeAutoLock lock(*mMutex);

  while (NS_SUCCEEDED(mPeerStatus)) {
    UniquePtr<IPC::Message> message;
    // A failed `GetMessage` is treated as the peer having closed.
    if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) {
      SetPeerError(lock, NS_BASE_STREAM_CLOSED);
      return;
    }
    if (!message) {
      return;  // no more messages
    }

    IPC::MessageReader reader(*message);
    switch (message->type()) {
      // The peer closed its side, optionally with an error status.
      case DATA_PIPE_CLOSED_MESSAGE_TYPE: {
        nsresult status = NS_OK;
        if (!ReadParam(&reader, &status)) {
          NS_WARNING("Unable to parse nsresult error from peer");
          status = NS_ERROR_UNEXPECTED;
        }
        MOZ_LOG(gDataPipeLog, LogLevel::Debug,
                ("Got CLOSED(%s) %s", GetStaticErrorName(status),
                 Describe(lock).get()));
        SetPeerError(lock, status);
        return;
      }
      // The peer processed `consumed` bytes, which become available to us.
      case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: {
        uint32_t consumed = 0;
        if (!ReadParam(&reader, &consumed)) {
          NS_WARNING("Unable to parse bytes consumed from peer");
          SetPeerError(lock, NS_ERROR_UNEXPECTED);
          return;
        }

        MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
                ("Got CONSUMED(%u) %s", consumed, Describe(lock).get()));
        // Reject counts which overflow or would exceed the buffer capacity.
        auto newAvailable = CheckedUint32{mAvailable} + consumed;
        if (!newAvailable.isValid() || newAvailable.value() > mCapacity) {
          NS_WARNING("Illegal bytes consumed message received from peer");
          SetPeerError(lock, NS_ERROR_UNEXPECTED);
          return;
        }
        mAvailable = newAvailable.value();
        // Closure-only waiters are not woken by newly available bytes.
        if (!mCallbackClosureOnly) {
          NotifyOnUnlock(lock);
        }
        break;
      }
      default: {
        NS_WARNING("Illegal message type received from peer");
        SetPeerError(lock, NS_ERROR_UNEXPECTED);
        return;
      }
    }
  }
}
    242 
    243 DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError)
    244    : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
    245                                                   : "DataPipeSender")),
    246      mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {}
    247 
// Construct an open pipe end backed by a new `DataPipeLink`.
//
// The link shares `mMutex` with this object and takes ownership of the port
// and shared memory handle. `Init()` is called on the link from the
// constructor body, after all members have been initialized.
DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort,
                           MutableSharedMemoryHandle&& aShmemHandle,
                           const std::shared_ptr<SharedMemoryMapping>& aShmem,
                           uint32_t aCapacity, nsresult aPeerStatus,
                           uint32_t aOffset, uint32_t aAvailable)
    : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
                                                   : "DataPipeSender")),
      mStatus(NS_OK),
      mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort),
                             std::move(aShmemHandle), aShmem, aCapacity,
                             aPeerStatus, aOffset, aAvailable)) {
  mLink->Init();
}
    261 
    262 DataPipeBase::~DataPipeBase() {
    263  DataPipeAutoLock lock(*mMutex);
    264  CloseInternal(lock, NS_BASE_STREAM_CLOSED);
    265 }
    266 
    267 void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) {
    268  if (NS_FAILED(mStatus)) {
    269    return;
    270  }
    271 
    272  MOZ_LOG(
    273      gDataPipeLog, LogLevel::Debug,
    274      ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get()));
    275 
    276  // Set our status to an errored status.
    277  mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
    278  RefPtr<DataPipeLink> link = mLink.forget();
    279  AssertSameMutex(link->mMutex);
    280  link->NotifyOnUnlock(aLock);
    281 
    282  // If our peer hasn't disappeared yet, clean up our connection to it.
    283  if (NS_SUCCEEDED(link->mPeerStatus)) {
    284    link->SetPeerError(aLock, mStatus, /* aSendClosed */ true);
    285  }
    286 }
    287 
// Process up to `aCount` bytes of the shared buffer, one contiguous segment
// at a time, by repeatedly invoking `aProcessSegment`.
//
// `*aProcessedCount` receives the total number of bytes processed.
//
// Returns:
//  - NS_OK if any bytes were processed, if the pipe status is
//    NS_BASE_STREAM_CLOSED, or if `aProcessSegment` failed or processed
//    nothing (its error code is not propagated);
//  - NS_BASE_STREAM_WOULD_BLOCK if nothing was processed and no bytes are
//    currently available;
//  - the pipe's error status otherwise.
//
// The mutex is held while bookkeeping is read and updated, and is released
// around the calls to `aProcessSegment`.
nsresult DataPipeBase::ProcessSegmentsInternal(
    uint32_t aCount, ProcessSegmentFun aProcessSegment,
    uint32_t* aProcessedCount) {
  *aProcessedCount = 0;

  while (*aProcessedCount < aCount) {
    // A fresh lock per segment: unlock actions queued during one iteration
    // run when `lock` goes out of scope at the end of that iteration.
    DataPipeAutoLock lock(*mMutex);
    mMutex->AssertCurrentThreadOwns();

    MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
            ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount,
             Describe(lock).get()));

    nsresult status = CheckStatus(lock);
    if (NS_FAILED(status)) {
      // Report partial progress as success; a plain close is also not an
      // error for the caller.
      if (*aProcessedCount > 0) {
        return NS_OK;
      }
      return status == NS_BASE_STREAM_CLOSED ? NS_OK : status;
    }

    RefPtr<DataPipeLink> link = mLink;
    AssertSameMutex(link->mMutex);
    if (!link->mAvailable) {
      MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus),
                            "CheckStatus will have returned an error");
      return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK;
    }

    MOZ_RELEASE_ASSERT(!link->mProcessingSegment,
                       "Only one thread may be processing a segment at a time");

    // Extract an iterator over the next contiguous region of the shared memory
    // buffer which will be used. The region is bounded by the bytes still
    // requested, the bytes available, and the distance to the end of the
    // buffer.
    char* start = link->mShmem->DataAs<char>() + link->mOffset;
    char* iter = start;
    char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable,
                                  link->mCapacity - link->mOffset});

    // Record the consumed region from our segment when exiting this scope,
    // telling our peer how many bytes were consumed. Hold on to `mLink` to keep
    // the shmem mapped and make sure we can clean up even if we're closed while
    // processing the shmem region.
    link->mProcessingSegment = true;
    auto scopeExit = MakeScopeExit([&] {
      mMutex->AssertCurrentThreadOwns();  // should still be held
      AssertSameMutex(link->mMutex);

      MOZ_RELEASE_ASSERT(link->mProcessingSegment);
      link->mProcessingSegment = false;
      uint32_t totalProcessed = iter - start;
      if (totalProcessed > 0) {
        link->mOffset += totalProcessed;
        MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity);
        // Wrap back to the start once the end of the buffer is reached.
        if (link->mOffset == link->mCapacity) {
          link->mOffset = 0;
        }
        link->mAvailable -= totalProcessed;
        link->SendBytesConsumedOnUnlock(lock, totalProcessed);
      }
      // NOTE(review): `end - start` is a pointer difference (signed) but is
      // formatted with `%zu`; `%td` looks like the matching specifier.
      MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
              ("Processed Segment(%u of %zu) %s", totalProcessed, end - start,
               Describe(lock).get()));
    });

    {
      // Release the mutex while the caller's function runs; it is re-acquired
      // when `unlock` leaves scope, before `scopeExit` runs.
      MutexAutoUnlock unlock(*mMutex);
      while (iter < end) {
        uint32_t processed = 0;
        Span segment{iter, end};
        nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed);
        // Stop on failure or when no progress was made; the error code is not
        // returned to the caller.
        if (NS_FAILED(rv) || processed == 0) {
          return NS_OK;
        }

        MOZ_RELEASE_ASSERT(processed <= segment.Length());
        iter += processed;
        *aProcessedCount += processed;
      }
    }
  }
  MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount == aCount,
                        "Must have processed exactly aCount");
  return NS_OK;
}
    373 
    374 void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
    375                                     already_AddRefed<nsIEventTarget> aTarget,
    376                                     bool aClosureOnly) {
    377  RefPtr<nsIRunnable> callback = std::move(aCallback);
    378  RefPtr<nsIEventTarget> target = std::move(aTarget);
    379 
    380  DataPipeAutoLock lock(*mMutex);
    381  MOZ_LOG(gDataPipeLog, LogLevel::Debug,
    382          ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)",
    383           callback.get(), Describe(lock).get()));
    384 
    385  if (NS_FAILED(CheckStatus(lock))) {
    386 #ifdef DEBUG
    387    if (mLink) {
    388      AssertSameMutex(mLink->mMutex);
    389      MOZ_ASSERT(!mLink->mCallback);
    390    }
    391 #endif
    392    DoNotifyOnUnlock(lock, callback.forget(), target.forget());
    393    return;
    394  }
    395 
    396  AssertSameMutex(mLink->mMutex);
    397 
    398  // NOTE: After this point, `mLink` may have previously had a callback which is
    399  // now being cancelled, make sure we clear `mCallback` even if we're going to
    400  // call `aCallback` immediately.
    401  mLink->mCallback = callback.forget();
    402  mLink->mCallbackTarget = target.forget();
    403  mLink->mCallbackClosureOnly = aClosureOnly;
    404  if (!aClosureOnly && mLink->mAvailable) {
    405    mLink->NotifyOnUnlock(lock);
    406  }
    407 }
    408 
    409 nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
    410  // If our peer has closed or errored, we may need to close our local side to
    411  // reflect the error code our peer provided. If we're a sender, we want to
    412  // become closed immediately, whereas if we're a receiver we want to wait
    413  // until our available buffer has been exhausted.
    414  //
    415  // NOTE: There may still be 2-stage writes/reads ongoing at this point, which
    416  // will continue due to `mLink` being kept alive by the
    417  // `ProcessSegmentsInternal` function.
    418  if (NS_FAILED(mStatus)) {
    419    return mStatus;
    420  }
    421  AssertSameMutex(mLink->mMutex);
    422  if (NS_FAILED(mLink->mPeerStatus) &&
    423      (!mLink->mReceiverSide || !mLink->mAvailable)) {
    424    CloseInternal(aLock, mLink->mPeerStatus);
    425  }
    426  return mStatus;
    427 }
    428 
    429 nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) {
    430  if (mLink) {
    431    AssertSameMutex(mLink->mMutex);
    432    return mLink->Describe(aLock);
    433  }
    434  return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus));
    435 }
    436 
// Serialize the pipe end `aParam` into `aWriter`, transferring it away.
//
// The status is always written. When it is a success code, the port, shared
// memory handle, capacity, peer status, offset and available count follow in
// that order (the order `DataPipeRead` reads them back in), and the local
// object is then closed with NS_ERROR_NOT_INITIALIZED.
template <typename T>
void DataPipeWrite(IPC::MessageWriter* aWriter, T* aParam) {
  DataPipeAutoLock lock(*aParam->mMutex);
  MOZ_LOG(gDataPipeLog, LogLevel::Debug,
          ("IPC Write: %s", aParam->Describe(lock).get()));

  WriteParam(aWriter, aParam->mStatus);
  // A closed/errored pipe is fully described by its status.
  if (NS_FAILED(aParam->mStatus)) {
    return;
  }

  aParam->AssertSameMutex(aParam->mLink->mMutex);
  MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment,
                     "cannot transfer while processing a segment");

  // Serialize relevant parameters to our peer.
  WriteParam(aWriter, std::move(aParam->mLink->mPort));
  WriteParam(aWriter, std::move(aParam->mLink->mShmemHandle));
  WriteParam(aWriter, aParam->mLink->mCapacity);
  WriteParam(aWriter, aParam->mLink->mPeerStatus);
  WriteParam(aWriter, aParam->mLink->mOffset);
  WriteParam(aWriter, aParam->mLink->mAvailable);

  // Mark our peer as closed so we don't try to send to it when closing.
  aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED;
  aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED);
}
    464 
// Deserialize a pipe end of type `T` from `aReader` into `*aResult`.
//
// Returns false (after reporting a fatal error on `aReader`) when a field
// cannot be read, when the received values are inconsistent, or when the
// shared memory cannot be mapped at the expected size. A serialized failure
// status yields a closed `T` constructed from that status alone.
template <typename T>
bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) {
  nsresult rv = NS_OK;
  if (!ReadParam(aReader, &rv)) {
    aReader->FatalError("failed to read DataPipe status");
    return false;
  }
  // The sender only wrote a status for a closed/errored pipe.
  if (NS_FAILED(rv)) {
    *aResult = new T(rv);
    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
            ("IPC Read: [status=%s]", GetStaticErrorName(rv)));
    return true;
  }

  ScopedPort port;
  if (!ReadParam(aReader, &port)) {
    aReader->FatalError("failed to read DataPipe port");
    return false;
  }
  MutableSharedMemoryHandle shmemHandle;
  if (!ReadParam(aReader, &shmemHandle)) {
    aReader->FatalError("failed to read DataPipe shmem");
    return false;
  }

  if (!shmemHandle) {
    aReader->FatalError("failed to create DataPipe shmem handle");
    return false;
  }

  uint32_t capacity = 0;
  nsresult peerStatus = NS_OK;
  uint32_t offset = 0;
  uint32_t available = 0;
  if (!ReadParam(aReader, &capacity) || !ReadParam(aReader, &peerStatus) ||
      !ReadParam(aReader, &offset) || !ReadParam(aReader, &available)) {
    aReader->FatalError("failed to read DataPipe fields");
    return false;
  }
  // Validate the values before they are used to index into the mapping.
  if (!capacity || offset >= capacity || available > capacity) {
    aReader->FatalError("received DataPipe state values are inconsistent");
    return false;
  }
  // The mapped size must match the page-aligned size for `capacity` exactly.
  auto mapping = std::make_shared<SharedMemoryMapping>(shmemHandle.Map());
  if (!*mapping ||
      mapping->Size() != shared_memory::PageAlignedSize(capacity)) {
    aReader->FatalError("failed to map DataPipe shared memory region");
    return false;
  }

  *aResult = new T(std::move(port), std::move(shmemHandle), mapping, capacity,
                   peerStatus, offset, available);
  // Only take the lock to describe the new pipe when the log is enabled.
  if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) {
    DataPipeAutoLock lock(*(*aResult)->mMutex);
    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
            ("IPC Read: %s", (*aResult)->Describe(lock).get()));
  }
  return true;
}
    524 
    525 }  // namespace data_pipe_detail
    526 
    527 //-----------------------------------------------------------------------------
    528 // DataPipeSender
    529 //-----------------------------------------------------------------------------
    530 
// XPCOM interface map for DataPipeSender; the concrete class is listed too.
NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream,
                  DataPipeSender)
    533 
    534 // nsIOutputStream
    535 
    536 NS_IMETHODIMP DataPipeSender::Close() {
    537  return CloseWithStatus(NS_BASE_STREAM_CLOSED);
    538 }
    539 
    540 NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; }
    541 
    542 NS_IMETHODIMP DataPipeSender::StreamStatus() {
    543  data_pipe_detail::DataPipeAutoLock lock(*mMutex);
    544  return CheckStatus(lock);
    545 }
    546 
    547 NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount,
    548                                    uint32_t* aWriteCount) {
    549  return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount,
    550                       aWriteCount);
    551 }
    552 
    553 NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream,
    554                                        uint32_t aCount,
    555                                        uint32_t* aWriteCount) {
    556  return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount,
    557                       aWriteCount);
    558 }
    559 
    560 NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader,
    561                                            void* aClosure, uint32_t aCount,
    562                                            uint32_t* aWriteCount) {
    563  auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
    564                            uint32_t* aReadCount) -> nsresult {
    565    return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
    566                   aReadCount);
    567  };
    568  return ProcessSegmentsInternal(aCount, processSegment, aWriteCount);
    569 }
    570 
    571 NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) {
    572  *_retval = true;
    573  return NS_OK;
    574 }
    575 
    576 // nsIAsyncOutputStream
    577 
    578 NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) {
    579  data_pipe_detail::DataPipeAutoLock lock(*mMutex);
    580  CloseInternal(lock, reason);
    581  return NS_OK;
    582 }
    583 
    584 NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback,
    585                                        uint32_t aFlags,
    586                                        uint32_t aRequestedCount,
    587                                        nsIEventTarget* aTarget) {
    588  AsyncWaitInternal(
    589      aCallback ? NS_NewCancelableRunnableFunction(
    590                      "DataPipeSender::AsyncWait",
    591                      [self = RefPtr{this}, callback = RefPtr{aCallback}] {
    592                        MOZ_LOG(gDataPipeLog, LogLevel::Debug,
    593                                ("Calling OnOutputStreamReady(%p, %p)",
    594                                 callback.get(), self.get()));
    595                        callback->OnOutputStreamReady(self);
    596                      })
    597                : nullptr,
    598      do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
    599  return NS_OK;
    600 }
    601 
    602 //-----------------------------------------------------------------------------
    603 // DataPipeReceiver
    604 //-----------------------------------------------------------------------------
    605 
// XPCOM interface map for DataPipeReceiver; the concrete class is listed too.
NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream,
                  nsIIPCSerializableInputStream, DataPipeReceiver)
    608 
    609 // nsIInputStream
    610 
    611 NS_IMETHODIMP DataPipeReceiver::Close() {
    612  return CloseWithStatus(NS_BASE_STREAM_CLOSED);
    613 }
    614 
    615 NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) {
    616  data_pipe_detail::DataPipeAutoLock lock(*mMutex);
    617  nsresult rv = CheckStatus(lock);
    618  if (NS_FAILED(rv)) {
    619    return rv;
    620  }
    621  AssertSameMutex(mLink->mMutex);
    622  *_retval = mLink->mAvailable;
    623  return NS_OK;
    624 }
    625 
    626 NS_IMETHODIMP DataPipeReceiver::StreamStatus() {
    627  data_pipe_detail::DataPipeAutoLock lock(*mMutex);
    628  return CheckStatus(lock);
    629 }
    630 
    631 NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount,
    632                                     uint32_t* aReadCount) {
    633  return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount);
    634 }
    635 
    636 NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter,
    637                                             void* aClosure, uint32_t aCount,
    638                                             uint32_t* aReadCount) {
    639  auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
    640                            uint32_t* aWriteCount) -> nsresult {
    641    return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
    642                   aWriteCount);
    643  };
    644  return ProcessSegmentsInternal(aCount, processSegment, aReadCount);
    645 }
    646 
    647 NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) {
    648  *_retval = true;
    649  return NS_OK;
    650 }
    651 
    652 // nsIAsyncInputStream
    653 
    654 NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) {
    655  data_pipe_detail::DataPipeAutoLock lock(*mMutex);
    656  CloseInternal(lock, aStatus);
    657  return NS_OK;
    658 }
    659 
    660 NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback,
    661                                          uint32_t aFlags,
    662                                          uint32_t aRequestedCount,
    663                                          nsIEventTarget* aTarget) {
    664  AsyncWaitInternal(
    665      aCallback ? NS_NewCancelableRunnableFunction(
    666                      "DataPipeReceiver::AsyncWait",
    667                      [self = RefPtr{this}, callback = RefPtr{aCallback}] {
    668                        MOZ_LOG(gDataPipeLog, LogLevel::Debug,
    669                                ("Calling OnInputStreamReady(%p, %p)",
    670                                 callback.get(), self.get()));
    671                        callback->OnInputStreamReady(self);
    672                      })
    673                : nullptr,
    674      do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
    675  return NS_OK;
    676 }
    677 
    678 // nsIIPCSerializableInputStream
    679 
// Report the cost of serializing this stream. Only `*aTransferables` is
// written; `aMaxSize`, `aSizeUsed` and `aPipes` are left untouched.
void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize,
                                            uint32_t* aSizeUsed,
                                            uint32_t* aPipes,
                                            uint32_t* aTransferables) {
  // We report DataPipeReceiver as taking one transferrable to serialize, rather
  // than one pipe, as we aren't starting a new pipe for this purpose, and are
  // instead transferring an existing pipe.
  *aTransferables = 1;
}
    689 
// Serialize this receiver into `aParams` as a `DataPipeReceiverStreamParams`
// wrapping `this`. Reports zero bytes of inline size used; `aMaxSize` is not
// consulted.
void DataPipeReceiver::Serialize(InputStreamParams& aParams, uint32_t aMaxSize,
                                 uint32_t* aSizeUsed) {
  *aSizeUsed = 0;
  aParams = DataPipeReceiverStreamParams(WrapNotNull(this));
}
    695 
// Never expected to be called: reaching this method crashes the process.
bool DataPipeReceiver::Deserialize(const InputStreamParams& aParams) {
  MOZ_CRASH("Handled directly in `DeserializeInputStream`");
}
    699 
    700 //-----------------------------------------------------------------------------
    701 // NewDataPipe
    702 //-----------------------------------------------------------------------------
    703 
    704 nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender,
    705                     DataPipeReceiver** aReceiver) {
    706  if (!aCapacity) {
    707    aCapacity = kDefaultDataPipeCapacity;
    708  }
    709 
    710  RefPtr<NodeController> controller = NodeController::GetSingleton();
    711  if (!controller) {
    712    return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
    713  }
    714 
    715  // Allocate a pair of ports for messaging between the sender & receiver.
    716  auto [senderPort, receiverPort] = controller->CreatePortPair();
    717 
    718  // Create and allocate the shared memory region.
    719  size_t alignedCapacity = shared_memory::PageAlignedSize(aCapacity);
    720  auto handle = shared_memory::Create(alignedCapacity);
    721  if (!handle) {
    722    return NS_ERROR_OUT_OF_MEMORY;
    723  }
    724  auto mapping = std::make_shared<SharedMemoryMapping>(handle.Map());
    725  if (!*mapping) {
    726    return NS_ERROR_OUT_OF_MEMORY;
    727  }
    728 
    729  // We'll first clone then take the handle from the region so that the sender &
    730  // receiver each have a handle. This avoids the need to duplicate the handle
    731  // when serializing, when errors are non-recoverable.
    732  auto senderShmemHandle = handle.Clone();
    733  auto receiverShmemHandle = std::move(handle);
    734  if (!senderShmemHandle || !receiverShmemHandle) {
    735    return NS_ERROR_OUT_OF_MEMORY;
    736  }
    737 
    738  RefPtr sender =
    739      new DataPipeSender(std::move(senderPort), std::move(senderShmemHandle),
    740                         mapping, aCapacity, NS_OK, 0, aCapacity);
    741  RefPtr receiver = new DataPipeReceiver(std::move(receiverPort),
    742                                         std::move(receiverShmemHandle),
    743                                         mapping, aCapacity, NS_OK, 0, 0);
    744  sender.forget(aSender);
    745  receiver.forget(aReceiver);
    746  return NS_OK;
    747 }
    748 
    749 }  // namespace ipc
    750 }  // namespace mozilla
    751 
// IPC serialization of a DataPipeSender; forwards to the shared
// `DataPipeWrite` helper.
void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write(
    MessageWriter* aWriter, mozilla::ipc::DataPipeSender* aParam) {
  mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam);
}
    756 
// IPC deserialization of a DataPipeSender; forwards to the shared
// `DataPipeRead` helper and returns its result.
bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read(
    MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeSender>* aResult) {
  return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult);
}
    761 
// IPC serialization of a DataPipeReceiver; forwards to the shared
// `DataPipeWrite` helper.
void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write(
    MessageWriter* aWriter, mozilla::ipc::DataPipeReceiver* aParam) {
  mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam);
}
    766 
// IPC deserialization of a DataPipeReceiver; forwards to the shared
// `DataPipeRead` helper and returns its result.
bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read(
    MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) {
  return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult);
}