tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

Http3WebTransportStream.cpp (20667B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "Http3WebTransportStream.h"
      7 
      8 #include "HttpLog.h"
      9 #include "Http3Session.h"
     10 #include "Http3WebTransportSession.h"
     11 #include "mozilla/TimeStamp.h"
     12 #include "nsHttpHandler.h"
     13 #include "nsIOService.h"
     14 #include "nsIPipe.h"
     15 #include "nsSocketTransportService2.h"
     16 #include "nsIWebTransportStream.h"
     17 
     18 namespace mozilla::net {
     19 
     20 namespace {
     21 
     22 // This is an nsAHttpTransaction that does nothing.
     23 class DummyWebTransportStreamTransaction : public nsAHttpTransaction {
     24 public:
     25  NS_DECL_THREADSAFE_ISUPPORTS
     26 
     27  DummyWebTransportStreamTransaction() = default;
     28 
     29  void SetConnection(nsAHttpConnection*) override {}
     30  nsAHttpConnection* Connection() override { return nullptr; }
     31  void GetSecurityCallbacks(nsIInterfaceRequestor**) override {}
     32  void OnTransportStatus(nsITransport* transport, nsresult status,
     33                         int64_t progress) override {}
     34  bool IsDone() override { return false; }
     35  nsresult Status() override { return NS_OK; }
     36  uint32_t Caps() override { return 0; }
     37  [[nodiscard]] nsresult ReadSegments(nsAHttpSegmentReader*, uint32_t,
     38                                      uint32_t*) override {
     39    return NS_OK;
     40  }
     41  [[nodiscard]] nsresult WriteSegments(nsAHttpSegmentWriter*, uint32_t,
     42                                       uint32_t*) override {
     43    return NS_OK;
     44  }
     45  void Close(nsresult reason) override {}
     46  nsHttpConnectionInfo* ConnectionInfo() override { return nullptr; }
     47  void SetProxyConnectFailed() override {}
     48  nsHttpRequestHead* RequestHead() override { return nullptr; }
     49  uint32_t Http1xTransactionCount() override { return 0; }
     50  [[nodiscard]] nsresult TakeSubTransactions(
     51      nsTArray<RefPtr<nsAHttpTransaction>>& outTransactions) override {
     52    return NS_OK;
     53  }
     54 
     55 private:
     56  virtual ~DummyWebTransportStreamTransaction() = default;
     57 };
     58 
     59 NS_IMPL_ISUPPORTS(DummyWebTransportStreamTransaction, nsISupportsWeakReference)
     60 
     61 class WebTransportSendStreamStats : public nsIWebTransportSendStreamStats {
     62 public:
     63  NS_DECL_THREADSAFE_ISUPPORTS
     64 
     65  explicit WebTransportSendStreamStats(uint64_t aSent, uint64_t aAcked)
     66      : mTimeStamp(TimeStamp::Now()),
     67        mTotalSent(aSent),
     68        mTotalAcknowledged(aAcked) {}
     69 
     70  NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
     71    *aTimestamp = mTimeStamp;
     72    return NS_OK;
     73  }
     74  NS_IMETHOD GetBytesSent(uint64_t* aBytesSent) override {
     75    *aBytesSent = mTotalSent;
     76    return NS_OK;
     77  }
     78  NS_IMETHOD GetBytesAcknowledged(uint64_t* aBytesAcknowledged) override {
     79    *aBytesAcknowledged = mTotalAcknowledged;
     80    return NS_OK;
     81  }
     82 
     83 private:
     84  virtual ~WebTransportSendStreamStats() = default;
     85 
     86  TimeStamp mTimeStamp;
     87  uint64_t mTotalSent;
     88  uint64_t mTotalAcknowledged;
     89 };
     90 
     91 NS_IMPL_ISUPPORTS(WebTransportSendStreamStats, nsIWebTransportSendStreamStats)
     92 
     93 class WebTransportReceiveStreamStats
     94    : public nsIWebTransportReceiveStreamStats {
     95 public:
     96  NS_DECL_THREADSAFE_ISUPPORTS
     97 
     98  explicit WebTransportReceiveStreamStats(uint64_t aReceived)
     99      : mTimeStamp(TimeStamp::Now()), mTotalReceived(aReceived) {}
    100 
    101  NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
    102    *aTimestamp = mTimeStamp;
    103    return NS_OK;
    104  }
    105  NS_IMETHOD GetBytesReceived(uint64_t* aByteReceived) override {
    106    *aByteReceived = mTotalReceived;
    107    return NS_OK;
    108  }
    109 
    110 private:
    111  virtual ~WebTransportReceiveStreamStats() = default;
    112 
    113  TimeStamp mTimeStamp;
    114  uint64_t mTotalReceived;
    115 };
    116 
    117 NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
    118                  nsIWebTransportReceiveStreamStats)
    119 
    120 }  // namespace
    121 
    122 NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback,
    123                  nsIOutputStreamCallback)
    124 
    125 Http3WebTransportStream::Http3WebTransportStream(
    126    Http3SessionBase* aSession, uint64_t aSessionId,
    127    WebTransportStreamType aType,
    128    std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
    129        aCallback)
    130    : WebTransportStreamBase(aSessionId, std::move(aCallback)),
    131      Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession) {
    132  LOG(("Http3WebTransportStream outgoing ctor %p", this));
    133  mStreamRole = OUTGOING;
    134  mStreamType = aType;
    135 }
    136 
    137 Http3WebTransportStream::Http3WebTransportStream(Http3SessionBase* aSession,
    138                                                 uint64_t aSessionId,
    139                                                 WebTransportStreamType aType,
    140                                                 uint64_t aStreamId)
    141    : WebTransportStreamBase(aSessionId, nullptr),
    142      Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession) {
    143  LOG(("Http3WebTransportStream incoming ctor %p", this));
    144  mStreamId = aStreamId;
    145  mStreamRole = INCOMING;
    146  mStreamType = aType;
    147  // WAITING_DATA indicates we are waiting
    148  // Http3WebTransportStream::OnInputStreamReady to be called.
    149  mSendState = WAITING_DATA;
    150 }
    151 
    152 Http3WebTransportStream::~Http3WebTransportStream() {
    153  LOG(("Http3WebTransportStream dtor %p", this));
    154 }
    155 
    156 StreamId Http3WebTransportStream::WebTransportStreamId() const {
    157  return StreamId::From(Http3StreamBase::StreamId());
    158 }
    159 
    160 uint64_t Http3WebTransportStream::GetStreamId() const {
    161  return Http3StreamBase::StreamId();
    162 }
    163 
    164 nsresult Http3WebTransportStream::TryActivating() {
    165  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    166  return mSession->TryActivatingWebTransportStream(&mStreamId, this);
    167 }
    168 
    169 NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
    170    nsIAsyncInputStream* aStream) {
    171  LOG1(
    172      ("Http3WebTransportStream::OnInputStreamReady [this=%p stream=%p "
    173       "state=%d]",
    174       this, aStream, mSendState));
    175  if (mSendState == SEND_DONE) {
    176    // already closed
    177    return NS_OK;
    178  }
    179 
    180  mSendState = SENDING;
    181  mSession->StreamHasDataToWrite(this);
    182  return NS_OK;
    183 }
    184 
    185 NS_IMETHODIMP
    186 Http3WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOutStream) {
    187  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    188  if (!mSession) {
    189    return NS_OK;
    190  }
    191 
    192  mSession->ConnectSlowConsumer(this);
    193  return NS_OK;
    194 }
    195 
    196 already_AddRefed<nsIWebTransportSendStreamStats>
    197 Http3WebTransportStream::GetSendStreamStats() {
    198  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    199 
    200  nsCOMPtr<nsIWebTransportSendStreamStats> stats =
    201      new WebTransportSendStreamStats(mTotalSent, mTotalAcknowledged);
    202  return stats.forget();
    203 }
    204 
    205 already_AddRefed<nsIWebTransportReceiveStreamStats>
    206 Http3WebTransportStream::GetReceiveStreamStats() {
    207  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    208 
    209  nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
    210      new WebTransportReceiveStreamStats(mTotalReceived);
    211  return stats.forget();
    212 }
    213 
    214 nsresult Http3WebTransportStream::OnReadSegment(const char* buf, uint32_t count,
    215                                                uint32_t* countRead) {
    216  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    217 
    218  LOG(("Http3WebTransportStream::OnReadSegment count=%u state=%d [this=%p]",
    219       count, mSendState, this));
    220 
    221  nsresult rv = NS_OK;
    222 
    223  switch (mSendState) {
    224    case WAITING_TO_ACTIVATE: {
    225      rv = TryActivating();
    226      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    227        LOG3(
    228            ("Http3WebTransportStream::OnReadSegment %p cannot activate now. "
    229             "queued.\n",
    230             this));
    231        break;
    232      }
    233      if (NS_FAILED(rv)) {
    234        LOG3(
    235            ("Http3WebTransportStream::OnReadSegment %p cannot activate "
    236             "error=0x%" PRIx32 ".",
    237             this, static_cast<uint32_t>(rv)));
    238        mStreamReadyCallback(Err(rv));
    239        mStreamReadyCallback = nullptr;
    240        break;
    241      }
    242 
    243      rv = InitOutputPipe();
    244      if (NS_SUCCEEDED(rv)) {
    245        mSendState = WAITING_DATA;
    246        if (mStreamType == WebTransportStreamType::BiDi) {
    247          rv = InitInputPipe();
    248          if (NS_SUCCEEDED(rv)) {
    249            mRecvState = READING;
    250          }
    251        }
    252      }
    253      if (NS_FAILED(rv)) {
    254        LOG3(
    255            ("Http3WebTransportStream::OnReadSegment %p failed to create pipe "
    256             "error=0x%" PRIx32 ".",
    257             this, static_cast<uint32_t>(rv)));
    258        mSendState = SEND_DONE;
    259        mStreamReadyCallback(Err(rv));
    260        mStreamReadyCallback = nullptr;
    261        break;
    262      }
    263 
    264      // Successfully activated.
    265      RefPtr<WebTransportStreamBase> stream = this;
    266      mStreamReadyCallback(stream);
    267      mStreamReadyCallback = nullptr;
    268    } break;
    269    case SENDING: {
    270      rv = mSession->SendRequestBody(mStreamId, buf, count, countRead);
    271      LOG3(
    272          ("Http3WebTransportStream::OnReadSegment %p sending body returns "
    273           "error=0x%" PRIx32 ".",
    274           this, static_cast<uint32_t>(rv)));
    275      mTotalSent += *countRead;
    276    } break;
    277    case WAITING_DATA:
    278      // Still waiting
    279      LOG3((
    280          "Http3WebTransportStream::OnReadSegment %p Still waiting for data...",
    281          this));
    282      break;
    283    case SEND_DONE:
    284      LOG3(("Http3WebTransportStream::OnReadSegment %p called after SEND_DONE ",
    285            this));
    286      MOZ_ASSERT(false, "We are done sending this request!");
    287      MOZ_ASSERT(mStreamReadyCallback);
    288      rv = NS_ERROR_UNEXPECTED;
    289      mStreamReadyCallback(Err(rv));
    290      mStreamReadyCallback = nullptr;
    291      break;
    292  }
    293 
    294  mSocketOutCondition = rv;
    295 
    296  return mSocketOutCondition;
    297 }
    298 
    299 // static
    300 nsresult Http3WebTransportStream::ReadRequestSegment(
    301    nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
    302    uint32_t count, uint32_t* countRead) {
    303  Http3WebTransportStream* wtStream = (Http3WebTransportStream*)closure;
    304  nsresult rv = wtStream->OnReadSegment(buf, count, countRead);
    305  LOG(("Http3WebTransportStream::ReadRequestSegment %p read=%u", wtStream,
    306       *countRead));
    307  return rv;
    308 }
    309 
    310 nsresult Http3WebTransportStream::ReadSegments() {
    311  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    312  LOG(("Http3WebTransportStream::ReadSegments [this=%p]", this));
    313  nsresult rv = NS_OK;
    314  uint32_t sendBytes = 0;
    315  bool again = true;
    316  do {
    317    sendBytes = 0;
    318    rv = mSocketOutCondition = NS_OK;
    319    LOG(("Http3WebTransportStream::ReadSegments state=%d [this=%p]", mSendState,
    320         this));
    321    switch (mSendState) {
    322      case WAITING_TO_ACTIVATE: {
    323        LOG3(
    324            ("Http3WebTransportStream %p ReadSegments forcing OnReadSegment "
    325             "call\n",
    326             this));
    327        uint32_t wasted = 0;
    328        nsresult rv2 = OnReadSegment("", 0, &wasted);
    329        LOG3(("  OnReadSegment returned 0x%08" PRIx32,
    330              static_cast<uint32_t>(rv2)));
    331        if (mSendState != WAITING_DATA) {
    332          break;
    333        }
    334      }
    335        [[fallthrough]];
    336      case WAITING_DATA:
    337        [[fallthrough]];
    338      case SENDING: {
    339        if (mStreamRole == INCOMING &&
    340            mStreamType == WebTransportStreamType::UniDi) {
    341          rv = NS_OK;
    342          break;
    343        }
    344        mSendState = SENDING;
    345        rv = mSendStreamPipeIn->ReadSegments(ReadRequestSegment, this,
    346                                             nsIOService::gDefaultSegmentSize,
    347                                             &sendBytes);
    348      } break;
    349      case SEND_DONE: {
    350        return NS_OK;
    351      }
    352      default:
    353        sendBytes = 0;
    354        rv = NS_OK;
    355        break;
    356    }
    357 
    358    LOG(("Http3WebTransportStream::ReadSegments rv=0x%" PRIx32
    359         " read=%u sock-cond=%" PRIx32 " again=%d mSendFin=%d [this=%p]",
    360         static_cast<uint32_t>(rv), sendBytes,
    361         static_cast<uint32_t>(mSocketOutCondition), again, mSendFin, this));
    362 
    363    // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
    364    if (rv == NS_BASE_STREAM_CLOSED || !mPendingTasks.IsEmpty()) {
    365      rv = NS_OK;
    366      sendBytes = 0;
    367    }
    368 
    369    if (NS_FAILED(rv)) {
    370      // if the writer didn't want to write any more data, then
    371      // wait for the transaction to call ResumeSend.
    372      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    373        mSendState = WAITING_DATA;
    374        rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
    375      }
    376      again = false;
    377 
    378      // Got a  WebTransport specific error
    379      if (rv >= NS_ERROR_WEBTRANSPORT_CODE_BASE &&
    380          rv <= NS_ERROR_WEBTRANSPORT_CODE_END) {
    381        uint8_t errorCode = GetWebTransportErrorFromNSResult(rv);
    382        mSendState = SEND_DONE;
    383        Reset(WebTransportErrorToHttp3Error(errorCode));
    384        rv = NS_OK;
    385      }
    386    } else if (NS_FAILED(mSocketOutCondition)) {
    387      if (mSocketOutCondition != NS_BASE_STREAM_WOULD_BLOCK) {
    388        rv = mSocketOutCondition;
    389      }
    390      again = false;
    391    } else if (!sendBytes) {
    392      mSendState = SEND_DONE;
    393      rv = NS_OK;
    394      again = false;
    395      if (!mPendingTasks.IsEmpty()) {
    396        LOG(("Has pending tasks to do"));
    397        nsTArray<std::function<void()>> tasks = std::move(mPendingTasks);
    398        for (const auto& task : tasks) {
    399          task();
    400        }
    401      }
    402      // Tell the underlying stream we're done
    403      SendFin();
    404    }
    405 
    406    // write more to the socket until error or end-of-request...
    407  } while (again && gHttpHandler->Active());
    408  return rv;
    409 }
    410 
    411 nsresult Http3WebTransportStream::OnWriteSegment(char* buf, uint32_t count,
    412                                                 uint32_t* countWritten) {
    413  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    414 
    415  LOG(("Http3WebTransportStream::OnWriteSegment [this=%p, state=%d", this,
    416       static_cast<uint32_t>(mRecvState)));
    417  nsresult rv = NS_OK;
    418  switch (mRecvState) {
    419    case READING: {
    420      rv = mSession->ReadResponseData(mStreamId, buf, count, countWritten,
    421                                      &mFin);
    422      if (*countWritten == 0) {
    423        if (mFin) {
    424          mRecvState = RECV_DONE;
    425          rv = NS_BASE_STREAM_CLOSED;
    426        } else {
    427          rv = NS_BASE_STREAM_WOULD_BLOCK;
    428        }
    429      } else {
    430        mTotalReceived += *countWritten;
    431        if (mFin) {
    432          mRecvState = RECEIVED_FIN;
    433        }
    434      }
    435    } break;
    436    case RECEIVED_FIN:
    437      rv = NS_BASE_STREAM_CLOSED;
    438      mRecvState = RECV_DONE;
    439      break;
    440    case RECV_DONE:
    441      rv = NS_ERROR_UNEXPECTED;
    442      break;
    443    default:
    444      rv = NS_ERROR_UNEXPECTED;
    445      break;
    446  }
    447 
    448  // Remember the error received from lower layers. A stream pipe may overwrite
    449  // it.
    450  // If rv == NS_OK this will reset mSocketInCondition.
    451  mSocketInCondition = rv;
    452 
    453  return rv;
    454 }
    455 
    456 // static
    457 nsresult Http3WebTransportStream::WritePipeSegment(nsIOutputStream* stream,
    458                                                   void* closure, char* buf,
    459                                                   uint32_t offset,
    460                                                   uint32_t count,
    461                                                   uint32_t* countWritten) {
    462  Http3WebTransportStream* self = (Http3WebTransportStream*)closure;
    463 
    464  nsresult rv = self->OnWriteSegment(buf, count, countWritten);
    465  if (NS_FAILED(rv)) {
    466    return rv;
    467  }
    468 
    469  LOG(("Http3WebTransportStream::WritePipeSegment %p written=%u", self,
    470       *countWritten));
    471 
    472  return rv;
    473 }
    474 
    475 nsresult Http3WebTransportStream::WriteSegments() {
    476  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    477  if (!mReceiveStreamPipeOut) {
    478    return NS_OK;
    479  }
    480 
    481  LOG(("Http3WebTransportStream::WriteSegments [this=%p]", this));
    482 
    483  nsresult rv = NS_OK;
    484  uint32_t countWrittenSingle = 0;
    485  bool again = true;
    486 
    487  do {
    488    mSocketInCondition = NS_OK;
    489    countWrittenSingle = 0;
    490    rv = mReceiveStreamPipeOut->WriteSegments(WritePipeSegment, this,
    491                                              nsIOService::gDefaultSegmentSize,
    492                                              &countWrittenSingle);
    493    LOG(("Http3WebTransportStream::WriteSegments rv=0x%" PRIx32
    494         " countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
    495         static_cast<uint32_t>(rv), countWrittenSingle,
    496         static_cast<uint32_t>(mSocketInCondition), this));
    497    if (NS_FAILED(rv)) {
    498      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    499        nsCOMPtr<nsIEventTarget> target;
    500        (void)gHttpHandler->GetSocketThreadTarget(getter_AddRefs(target));
    501        if (target) {
    502          mReceiveStreamPipeOut->AsyncWait(this, 0, 0, target);
    503          rv = NS_OK;
    504        } else {
    505          rv = NS_ERROR_UNEXPECTED;
    506        }
    507      }
    508      if (rv == NS_BASE_STREAM_CLOSED) {
    509        mReceiveStreamPipeOut->Close();
    510        rv = NS_OK;
    511      }
    512      again = false;
    513    } else if (NS_FAILED(mSocketInCondition)) {
    514      if (mSocketInCondition != NS_BASE_STREAM_WOULD_BLOCK) {
    515        rv = mSocketInCondition;
    516        if (rv == NS_BASE_STREAM_CLOSED) {
    517          mReceiveStreamPipeOut->Close();
    518          rv = NS_OK;
    519        }
    520      }
    521      again = false;
    522    }
    523    // read more from the socket until error...
    524  } while (again && gHttpHandler->Active());
    525 
    526  return rv;
    527 }
    528 
    529 bool Http3WebTransportStream::Done() const {
    530  return mSendState == SEND_DONE && mRecvState == RECV_DONE;
    531 }
    532 
    533 void Http3WebTransportStream::Close(nsresult aResult) {
    534  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    535  LOG(("Http3WebTransportStream::Close [this=%p]", this));
    536  mTransaction = nullptr;
    537  if (mSendStreamPipeIn) {
    538    mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
    539    mSendStreamPipeIn->CloseWithStatus(aResult);
    540  }
    541  if (mReceiveStreamPipeOut) {
    542    mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
    543    mReceiveStreamPipeOut->CloseWithStatus(aResult);
    544  }
    545  mSendState = SEND_DONE;
    546  mRecvState = RECV_DONE;
    547  mSession = nullptr;
    548 }
    549 
    550 void Http3WebTransportStream::SendFin() {
    551  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    552  LOG(("Http3WebTransportStream::SendFin [this=%p mSendState=%d]", this,
    553       mSendState));
    554 
    555  if (mSendFin || !mSession || mResetError) {
    556    // Already closed.
    557    return;
    558  }
    559 
    560  mSendFin = true;
    561 
    562  switch (mSendState) {
    563    case SENDING: {
    564      mPendingTasks.AppendElement([self = RefPtr{this}]() {
    565        self->mSession->CloseSendingSide(self->mStreamId);
    566      });
    567    } break;
    568    case WAITING_DATA:
    569      mSendState = SEND_DONE;
    570      [[fallthrough]];
    571    case SEND_DONE:
    572      mSession->CloseSendingSide(mStreamId);
    573      // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
    574      mSession->StreamHasDataToWrite(this);
    575      break;
    576    default:
    577      MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
    578      break;
    579  }
    580 }
    581 
    582 void Http3WebTransportStream::Reset(uint64_t aErrorCode) {
    583  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    584  LOG(("Http3WebTransportStream::Reset [this=%p, mSendState=%d]", this,
    585       mSendState));
    586 
    587  if (mResetError || !mSession || mSendFin) {
    588    // The stream is already reset.
    589    return;
    590  }
    591 
    592  mResetError = Some(aErrorCode);
    593 
    594  switch (mSendState) {
    595    case SENDING: {
    596      LOG(("Http3WebTransportStream::Reset [this=%p] reset after sending data",
    597           this));
    598      mPendingTasks.AppendElement([self = RefPtr{this}]() {
    599        // "Reset" needs a special treatment here. If we are sending data and
    600        // ResetWebTransportStream is called before Http3Session::ProcessOutput,
    601        // neqo will drop the last piece of data.
    602        NS_DispatchToCurrentThread(
    603            NS_NewRunnableFunction("Http3WebTransportStream::Reset", [self]() {
    604              self->mSession->ResetWebTransportStream(self, *self->mResetError);
    605              self->mSession->StreamHasDataToWrite(self);
    606              self->mSession->ConnectSlowConsumer(self);
    607            }));
    608      });
    609    } break;
    610    case WAITING_DATA:
    611      mSendState = SEND_DONE;
    612      [[fallthrough]];
    613    case SEND_DONE:
    614      mSession->ResetWebTransportStream(this, *mResetError);
    615      // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
    616      mSession->StreamHasDataToWrite(this);
    617      mSession->ConnectSlowConsumer(this);
    618      break;
    619    default:
    620      MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
    621      break;
    622  }
    623 }
    624 
    625 void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
    626  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    627  LOG(("Http3WebTransportStream::SendStopSending [this=%p, mSendState=%d]",
    628       this, mSendState));
    629 
    630  if (mSendState == WAITING_TO_ACTIVATE) {
    631    return;
    632  }
    633 
    634  if (mStopSendingError || !mSession) {
    635    return;
    636  }
    637 
    638  mStopSendingError = Some(aErrorCode);
    639 
    640  mSession->StreamStopSending(this, *mStopSendingError);
    641  // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
    642  mSession->StreamHasDataToWrite(this);
    643 }
    644 
    645 void Http3WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {
    646  if (!mSession) {
    647    return;
    648  }
    649  mSession->SetSendOrder(this, aSendOrder);
    650 }
    651 
    652 }  // namespace mozilla::net