tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

WebTransportStreamProxy.cpp (12815B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "WebTransportStreamProxy.h"
      7 
      8 #include "WebTransportLog.h"
      9 #include "nsProxyRelease.h"
     10 #include "nsSocketTransportService2.h"
     11 
     12 namespace mozilla::net {
     13 
     14 NS_IMPL_ADDREF(WebTransportStreamProxy)
     15 NS_IMPL_RELEASE(WebTransportStreamProxy)
     16 
     17 NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy)
     18  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIWebTransportReceiveStream)
     19  NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream)
     20  NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream)
     21  NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream)
     22 NS_INTERFACE_MAP_END
     23 
     24 WebTransportStreamProxy::WebTransportStreamProxy(
     25    WebTransportStreamBase* aStream)
     26    : mWebTransportStream(aStream) {
     27  nsCOMPtr<nsIAsyncInputStream> inputStream;
     28  nsCOMPtr<nsIAsyncOutputStream> outputStream;
     29  mWebTransportStream->GetWriterAndReader(getter_AddRefs(outputStream),
     30                                          getter_AddRefs(inputStream));
     31  if (outputStream) {
     32    mWriter = new AsyncOutputStreamWrapper(outputStream);
     33  }
     34  if (inputStream) {
     35    mReader = new AsyncInputStreamWrapper(inputStream, mWebTransportStream);
     36  }
     37 }
     38 
     39 WebTransportStreamProxy::~WebTransportStreamProxy() {
     40  // mWebTransportStream needs to be destroyed on the socket thread.
     41  NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy",
     42                  gSocketTransportService, mWebTransportStream.forget());
     43 }
     44 
     45 NS_IMETHODIMP WebTransportStreamProxy::SendStopSending(uint8_t aError) {
     46  if (!OnSocketThread()) {
     47    RefPtr<WebTransportStreamProxy> self(this);
     48    return gSocketTransportService->Dispatch(
     49        NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending",
     50                               [self{std::move(self)}, error(aError)]() {
     51                                 self->SendStopSending(error);
     52                               }));
     53  }
     54 
     55  mWebTransportStream->SendStopSending(aError);
     56  return NS_OK;
     57 }
     58 
     59 NS_IMETHODIMP WebTransportStreamProxy::SendFin(void) {
     60  if (!mWriter) {
     61    return NS_ERROR_UNEXPECTED;
     62  }
     63 
     64  mWriter->Close();
     65 
     66  if (!OnSocketThread()) {
     67    RefPtr<WebTransportStreamProxy> self(this);
     68    return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
     69        "WebTransportStreamProxy::SendFin",
     70        [self{std::move(self)}]() { self->mWebTransportStream->SendFin(); }));
     71  }
     72 
     73  mWebTransportStream->SendFin();
     74  return NS_OK;
     75 }
     76 
     77 NS_IMETHODIMP WebTransportStreamProxy::Reset(uint8_t aErrorCode) {
     78  if (!mWriter) {
     79    return NS_ERROR_UNEXPECTED;
     80  }
     81 
     82  mWriter->Close();
     83 
     84  if (!OnSocketThread()) {
     85    RefPtr<WebTransportStreamProxy> self(this);
     86    return gSocketTransportService->Dispatch(
     87        NS_NewRunnableFunction("WebTransportStreamProxy::Reset",
     88                               [self{std::move(self)}, error(aErrorCode)]() {
     89                                 self->mWebTransportStream->Reset(error);
     90                               }));
     91  }
     92 
     93  mWebTransportStream->Reset(aErrorCode);
     94  return NS_OK;
     95 }
     96 
     97 namespace {
     98 
     99 class StatsCallbackWrapper : public nsIWebTransportStreamStatsCallback {
    100 public:
    101  NS_DECL_THREADSAFE_ISUPPORTS
    102 
    103  explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback* aCallback)
    104      : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()) {}
    105 
    106  NS_IMETHOD OnSendStatsAvailable(
    107      nsIWebTransportSendStreamStats* aStats) override {
    108    if (!mTarget->IsOnCurrentThread()) {
    109      RefPtr<StatsCallbackWrapper> self(this);
    110      nsCOMPtr<nsIWebTransportSendStreamStats> stats = aStats;
    111      (void)mTarget->Dispatch(NS_NewRunnableFunction(
    112          "StatsCallbackWrapper::OnSendStatsAvailable",
    113          [self{std::move(self)}, stats{std::move(stats)}]() {
    114            self->OnSendStatsAvailable(stats);
    115          }));
    116      return NS_OK;
    117    }
    118 
    119    mCallback->OnSendStatsAvailable(aStats);
    120    return NS_OK;
    121  }
    122 
    123  NS_IMETHOD OnReceiveStatsAvailable(
    124      nsIWebTransportReceiveStreamStats* aStats) override {
    125    if (!mTarget->IsOnCurrentThread()) {
    126      RefPtr<StatsCallbackWrapper> self(this);
    127      nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = aStats;
    128      (void)mTarget->Dispatch(NS_NewRunnableFunction(
    129          "StatsCallbackWrapper::OnReceiveStatsAvailable",
    130          [self{std::move(self)}, stats{std::move(stats)}]() {
    131            self->OnReceiveStatsAvailable(stats);
    132          }));
    133      return NS_OK;
    134    }
    135 
    136    mCallback->OnReceiveStatsAvailable(aStats);
    137    return NS_OK;
    138  }
    139 
    140 private:
    141  virtual ~StatsCallbackWrapper() {
    142    NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget,
    143                    mCallback.forget());
    144  }
    145 
    146  nsCOMPtr<nsIWebTransportStreamStatsCallback> mCallback;
    147  nsCOMPtr<nsIEventTarget> mTarget;
    148 };
    149 
    150 NS_IMPL_ISUPPORTS(StatsCallbackWrapper, nsIWebTransportStreamStatsCallback)
    151 
    152 }  // namespace
    153 
    154 NS_IMETHODIMP WebTransportStreamProxy::GetSendStreamStats(
    155    nsIWebTransportStreamStatsCallback* aCallback) {
    156  if (!OnSocketThread()) {
    157    RefPtr<WebTransportStreamProxy> self(this);
    158    nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
    159        new StatsCallbackWrapper(aCallback);
    160    return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
    161        "WebTransportStreamProxy::GetSendStreamStats",
    162        [self{std::move(self)}, callback{std::move(callback)}]() {
    163          self->GetSendStreamStats(callback);
    164        }));
    165  }
    166 
    167  nsCOMPtr<nsIWebTransportSendStreamStats> stats =
    168      mWebTransportStream->GetSendStreamStats();
    169  aCallback->OnSendStatsAvailable(stats);
    170  return NS_OK;
    171 }
    172 
    173 NS_IMETHODIMP WebTransportStreamProxy::GetReceiveStreamStats(
    174    nsIWebTransportStreamStatsCallback* aCallback) {
    175  if (!OnSocketThread()) {
    176    RefPtr<WebTransportStreamProxy> self(this);
    177    nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
    178        new StatsCallbackWrapper(aCallback);
    179    return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
    180        "WebTransportStreamProxy::GetReceiveStreamStats",
    181        [self{std::move(self)}, callback{std::move(callback)}]() {
    182          self->GetReceiveStreamStats(callback);
    183        }));
    184  }
    185 
    186  nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
    187      mWebTransportStream->GetReceiveStreamStats();
    188  aCallback->OnReceiveStatsAvailable(stats);
    189  return NS_OK;
    190 }
    191 
    192 NS_IMETHODIMP WebTransportStreamProxy::GetHasReceivedFIN(
    193    bool* aHasReceivedFIN) {
    194  *aHasReceivedFIN = mWebTransportStream->RecvDone();
    195  return NS_OK;
    196 }
    197 
    198 NS_IMETHODIMP WebTransportStreamProxy::GetInputStream(
    199    nsIAsyncInputStream** aOut) {
    200  if (!mReader) {
    201    return NS_ERROR_NOT_AVAILABLE;
    202  }
    203 
    204  RefPtr<AsyncInputStreamWrapper> stream = mReader;
    205  stream.forget(aOut);
    206  return NS_OK;
    207 }
    208 
    209 NS_IMETHODIMP WebTransportStreamProxy::GetOutputStream(
    210    nsIAsyncOutputStream** aOut) {
    211  if (!mWriter) {
    212    return NS_ERROR_NOT_AVAILABLE;
    213  }
    214 
    215  RefPtr<AsyncOutputStreamWrapper> stream = mWriter;
    216  stream.forget(aOut);
    217  return NS_OK;
    218 }
    219 
    220 NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) {
    221  *aId = mWebTransportStream->GetStreamId();
    222  return NS_OK;
    223 }
    224 
    225 NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(Maybe<int64_t> aSendOrder) {
    226  if (!OnSocketThread()) {
    227    return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
    228        "SetSendOrder", [stream = mWebTransportStream, aSendOrder]() {
    229          stream->SetSendOrder(aSendOrder);
    230        }));
    231  }
    232  mWebTransportStream->SetSendOrder(aSendOrder);
    233  return NS_OK;
    234 }
    235 
    236 //------------------------------------------------------------------------------
    237 // WebTransportStreamProxy::AsyncInputStreamWrapper
    238 //------------------------------------------------------------------------------
    239 
    240 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper,
    241                  nsIInputStream, nsIAsyncInputStream)
    242 
    243 WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper(
    244    nsIAsyncInputStream* aStream, WebTransportStreamBase* aWebTransportStream)
    245    : mStream(aStream), mWebTransportStream(aWebTransportStream) {}
    246 
    247 WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() =
    248    default;
    249 
    250 void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() {
    251  if (!mWebTransportStream->RecvDone()) {
    252    return;
    253  }
    254 
    255  uint64_t available = 0;
    256  (void)Available(&available);
    257  if (available) {
    258    // Don't close the InputStream if there's unread data available, since it
    259    // would be lost. We exit above unless we know no more data will be received
    260    // for the stream.
    261    return;
    262  }
    263 
    264  LOG(
    265      ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN "
    266       "stream=%p",
    267       mWebTransportStream.get()));
    268  Close();
    269 }
    270 
    271 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Close() {
    272  return mStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
    273 }
    274 
    275 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Available(
    276    uint64_t* aAvailable) {
    277  return mStream->Available(aAvailable);
    278 }
    279 
    280 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() {
    281  return mStream->StreamStatus();
    282 }
    283 
    284 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Read(
    285    char* aBuf, uint32_t aCount, uint32_t* aResult) {
    286  LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this));
    287  nsresult rv = mStream->Read(aBuf, aCount, aResult);
    288  MaybeCloseStream();
    289  return rv;
    290 }
    291 
    292 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments(
    293    nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
    294    uint32_t* aResult) {
    295  LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p",
    296       this));
    297  nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
    298  if (*aResult > 0) {
    299    LOG(("   Read %u bytes", *aResult));
    300  }
    301  MaybeCloseStream();
    302  return rv;
    303 }
    304 
    305 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking(
    306    bool* aResult) {
    307  return mStream->IsNonBlocking(aResult);
    308 }
    309 
    310 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus(
    311    nsresult aStatus) {
    312  return mStream->CloseWithStatus(aStatus);
    313 }
    314 
    315 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait(
    316    nsIInputStreamCallback* aCallback, uint32_t aFlags,
    317    uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
    318  return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
    319 }
    320 
    321 //------------------------------------------------------------------------------
    322 // WebTransportStreamProxy::AsyncOutputStreamWrapper
    323 //------------------------------------------------------------------------------
    324 
    325 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper,
    326                  nsIOutputStream, nsIAsyncOutputStream)
    327 
    328 WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper(
    329    nsIAsyncOutputStream* aStream)
    330    : mStream(aStream) {}
    331 
    332 WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() =
    333    default;
    334 
    335 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() {
    336  return mStream->Flush();
    337 }
    338 
    339 NS_IMETHODIMP
    340 WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
    341  return mStream->StreamStatus();
    342 }
    343 
    344 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
    345    const char* aBuf, uint32_t aCount, uint32_t* aResult) {
    346  LOG(
    347      ("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes, "
    348       "first byte %c",
    349       this, aCount, aBuf[0]));
    350  return mStream->Write(aBuf, aCount, aResult);
    351 }
    352 
    353 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom(
    354    nsIInputStream* aFromStream, uint32_t aCount, uint32_t* aResult) {
    355  return mStream->WriteFrom(aFromStream, aCount, aResult);
    356 }
    357 
    358 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments(
    359    nsReadSegmentFun aReader, void* aClosure, uint32_t aCount,
    360    uint32_t* aResult) {
    361  return mStream->WriteSegments(aReader, aClosure, aCount, aResult);
    362 }
    363 
    364 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait(
    365    nsIOutputStreamCallback* aCallback, uint32_t aFlags,
    366    uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
    367  return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
    368 }
    369 
    370 NS_IMETHODIMP
    371 WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus(
    372    nsresult aStatus) {
    373  return mStream->CloseWithStatus(aStatus);
    374 }
    375 
    376 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() {
    377  return mStream->Close();
    378 }
    379 
    380 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking(
    381    bool* aResult) {
    382  return mStream->IsNonBlocking(aResult);
    383 }
    384 
    385 }  // namespace mozilla::net