tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

Http2WebTransportSession.cpp (23546B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set sw=2 ts=8 et tw=80 : */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 // HttpLog.h should generally be included first
      8 #include "HttpLog.h"
      9 
     10 #include "Capsule.h"
     11 #include "CapsuleEncoder.h"
     12 #include "Http2WebTransportSession.h"
     13 #include "Http2WebTransportStream.h"
     14 #include "Http2Session.h"
     15 #include "mozilla/net/NeqoHttp3Conn.h"
     16 #include "nsIWebTransport.h"
     17 #include "nsIOService.h"
     18 #include "nsHttp.h"
     19 
     20 namespace mozilla::net {
     21 
     22 Http2WebTransportSessionImpl::CapsuleQueue::CapsuleQueue() = default;
     23 
     24 mozilla::Queue<UniquePtr<CapsuleEncoder>>&
     25 Http2WebTransportSessionImpl::CapsuleQueue::operator[](
     26    CapsuleTransmissionPriority aPriority) {
     27  if (aPriority == CapsuleTransmissionPriority::Critical) {
     28    return mCritical;
     29  }
     30  if (aPriority == CapsuleTransmissionPriority::Important) {
     31    return mImportant;
     32  }
     33  if (aPriority == CapsuleTransmissionPriority::High) {
     34    return mHigh;
     35  }
     36  if (aPriority == CapsuleTransmissionPriority::Normal) {
     37    return mNormal;
     38  }
     39 
     40  return mLow;
     41 }
     42 
     43 Http2WebTransportSessionImpl::Http2WebTransportSessionImpl(
     44    CapsuleIOHandler* aHandler, Http2WebTransportInitialSettings aSettings)
     45    : mSettings(aSettings),
     46      mRemoteStreamsFlowControl(aSettings.mInitialLocalMaxStreamsBidi,
     47                                aSettings.mInitialLocalMaxStreamsUnidi),
     48      mHandler(aHandler),
     49      mSessionDataFc(aSettings.mInitialMaxData),
     50      mReceiverFc(aSettings.mInitialLocalMaxData) {
     51  LOG(("Http2WebTransportSessionImpl ctor:%p", this));
     52  mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update(
     53      mSettings.mInitialMaxStreamsUni);
     54  mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update(
     55      mSettings.mInitialMaxStreamsBidi);
     56 }
     57 
     58 Http2WebTransportSessionImpl::~Http2WebTransportSessionImpl() {
     59  LOG(("Http2WebTransportSessionImpl dtor:%p", this));
     60 }
     61 
     62 void Http2WebTransportSessionImpl::CloseSession(uint32_t aStatus,
     63                                                const nsACString& aReason) {
     64  LOG(("Http2WebTransportSessionImpl::CloseSession %p aStatus=%x", this,
     65       aStatus));
     66 
     67  mHandler->SetSentFin();
     68 
     69  Capsule capsule = Capsule::CloseWebTransportSession(aStatus, aReason);
     70  UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
     71  encoder->EncodeCapsule(capsule);
     72  EnqueueOutCapsule(CapsuleTransmissionPriority::Important, std::move(encoder));
     73 }
     74 
     75 uint64_t Http2WebTransportSessionImpl::GetStreamId() const { return mStreamId; }
     76 
     77 void Http2WebTransportSessionImpl::GetMaxDatagramSize() {}
     78 
     79 void Http2WebTransportSessionImpl::SendDatagram(nsTArray<uint8_t>&& aData,
     80                                                uint64_t aTrackingId) {
     81  LOG(("Http2WebTransportSession::SendDatagram %p", this));
     82 
     83  Capsule capsule = Capsule::WebTransportDatagram(std::move(aData));
     84 
     85  UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
     86  encoder->EncodeCapsule(capsule);
     87  EnqueueOutCapsule(CapsuleTransmissionPriority::Normal, std::move(encoder));
     88 }
     89 
     90 void Http2WebTransportSessionImpl::CreateOutgoingStreamInternal(
     91    StreamId aStreamId,
     92    std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
     93        aCallback) {
     94  LOG(
     95      ("Http2WebTransportSessionImpl::CreateOutgoingStreamInternal %p "
     96       "id:%" PRIx64,
     97       this, (uint64_t)aStreamId));
     98 
     99  RefPtr<Http2WebTransportStream> stream = new Http2WebTransportStream(
    100      this, aStreamId,
    101      aStreamId.IsBiDi() ? mSettings.mInitialMaxStreamDataBidi
    102                         : mSettings.mInitialMaxStreamDataUni,
    103      aStreamId.IsBiDi() ? mSettings.mInitialLocalMaxStreamDataBidi
    104                         : mSettings.mInitialLocalMaxStreamDataUnidi,
    105      std::move(aCallback));
    106  if (NS_FAILED(stream->Init())) {
    107    return;
    108  }
    109  mOutgoingStreams.InsertOrUpdate(aStreamId, std::move(stream));
    110 }
    111 
    112 void Http2WebTransportSessionImpl::ProcessPendingStreamCallbacks(
    113    mozilla::Queue<UniquePtr<PendingStreamCallback>>& aCallbacks,
    114    WebTransportStreamType aStreamType) {
    115  size_t size = aCallbacks.Count();
    116  for (size_t count = 0; count < size; ++count) {
    117    auto id = mLocalStreamsFlowControl.TakeStreamId(aStreamType);
    118    if (!id) {
    119      break;
    120    }
    121 
    122    UniquePtr<PendingStreamCallback> callback = aCallbacks.Pop();
    123    auto cb = callback->TakeCallback();
    124    CreateOutgoingStreamInternal(*id, std::move(cb));
    125  }
    126 }
    127 
    128 void Http2WebTransportSessionImpl::CreateOutgoingBidirectionalStream(
    129    std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
    130        aCallback) {
    131  auto id = mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::BiDi);
    132  if (!id) {
    133    mBidiPendingStreamCallbacks.Push(
    134        MakeUnique<PendingStreamCallback>(std::move(aCallback)));
    135    return;
    136  }
    137 
    138  CreateOutgoingStreamInternal(*id, std::move(aCallback));
    139 }
    140 
    141 void Http2WebTransportSessionImpl::CreateOutgoingUnidirectionalStream(
    142    std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
    143        aCallback) {
    144  auto id =
    145      mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::UniDi);
    146  if (!id) {
    147    mUnidiPendingStreamCallbacks.Push(
    148        MakeUnique<PendingStreamCallback>(std::move(aCallback)));
    149    return;
    150  }
    151 
    152  CreateOutgoingStreamInternal(*id, std::move(aCallback));
    153 }
    154 
    155 void Http2WebTransportSessionImpl::StartReading() {
    156  LOG(("Http2WebTransportSessionImpl::StartReading %p", this));
    157  mHandler->StartReading();
    158 }
    159 
    160 void Http2WebTransportSessionImpl::EnqueueOutCapsule(
    161    CapsuleTransmissionPriority aPriority, UniquePtr<CapsuleEncoder>&& aData) {
    162  mCapsuleQueue[aPriority].Push(std::move(aData));
    163  mHandler->HasCapsuleToSend();
    164 }
    165 
    166 void Http2WebTransportSessionImpl::SendMaintenanceCapsules(
    167    CapsuleTransmissionPriority aPriority) {
    168  auto encoder = mSessionDataFc.CreateSessionDataBlockedCapsule();
    169  if (encoder) {
    170    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    171  }
    172  encoder = mReceiverFc.CreateMaxDataCapsule();
    173  if (encoder) {
    174    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    175  }
    176  encoder = mLocalStreamsFlowControl[WebTransportStreamType::BiDi]
    177                .CreateStreamsBlockedCapsule();
    178  if (encoder) {
    179    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    180  }
    181  encoder = mLocalStreamsFlowControl[WebTransportStreamType::UniDi]
    182                .CreateStreamsBlockedCapsule();
    183  if (encoder) {
    184    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    185  }
    186  encoder = mRemoteStreamsFlowControl[WebTransportStreamType::BiDi]
    187                .FlowControl()
    188                .CreateMaxStreamsCapsule();
    189  if (encoder) {
    190    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    191  }
    192  encoder = mRemoteStreamsFlowControl[WebTransportStreamType::UniDi]
    193                .FlowControl()
    194                .CreateMaxStreamsCapsule();
    195  if (encoder) {
    196    mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
    197  }
    198  for (const auto& stream : mOutgoingStreams.Values()) {
    199    stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]);
    200  }
    201  for (const auto& stream : mIncomingStreams.Values()) {
    202    stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]);
    203  }
    204 }
    205 
    206 void Http2WebTransportSessionImpl::StreamHasCapsuleToSend() {
    207  mHandler->HasCapsuleToSend();
    208 }
    209 
    210 void Http2WebTransportSessionImpl::PrepareCapsulesToSend(
    211    mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
    212  // Like neqo, flow control capsules are at level
    213  // CapsuleTransmissionPriority::Important.
    214  SendMaintenanceCapsules(CapsuleTransmissionPriority::Important);
    215 
    216  for (const auto& stream : mOutgoingStreams.Values()) {
    217    stream->TakeOutputCapsule(
    218        mCapsuleQueue[CapsuleTransmissionPriority::Normal]);
    219  }
    220  for (const auto& stream : mIncomingStreams.Values()) {
    221    stream->TakeOutputCapsule(
    222        mCapsuleQueue[CapsuleTransmissionPriority::Normal]);
    223  }
    224 
    225  static constexpr CapsuleTransmissionPriority priorities[] = {
    226      CapsuleTransmissionPriority::Critical,
    227      CapsuleTransmissionPriority::Important,
    228      CapsuleTransmissionPriority::High,
    229      CapsuleTransmissionPriority::Normal,
    230      CapsuleTransmissionPriority::Low,
    231  };
    232 
    233  for (CapsuleTransmissionPriority priority : priorities) {
    234    auto& queue = mCapsuleQueue[priority];
    235    while (!queue.IsEmpty()) {
    236      UniquePtr<CapsuleEncoder> entry = queue.Pop();
    237      aOutput.Push(std::move(entry));
    238    }
    239  }
    240 }
    241 
    242 void Http2WebTransportSessionImpl::Close(nsresult aReason) {
    243  for (const auto& stream : mOutgoingStreams.Values()) {
    244    stream->Close(aReason);
    245  }
    246  for (const auto& stream : mIncomingStreams.Values()) {
    247    stream->Close(aReason);
    248  }
    249  mOutgoingStreams.Clear();
    250  mIncomingStreams.Clear();
    251 }
    252 
    253 void Http2WebTransportSessionImpl::OnStreamClosed(
    254    Http2WebTransportStream* aStream) {
    255  LOG(("Http2WebTransportSessionImpl::OnStreamClosed %p stream:%p", this,
    256       aStream));
    257  RefPtr<Http2WebTransportStream> stream = aStream;
    258  StreamId id = stream->WebTransportStreamId();
    259  if (id.IsClientInitiated()) {
    260    mOutgoingStreams.Remove(id);
    261  } else {
    262    mIncomingStreams.Remove(id);
    263    mRemoteStreamsFlowControl[id.StreamType()].FlowControl().AddRetired(1);
    264  }
    265 }
    266 
    267 bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) {
    268  switch (aCapsule.Type()) {
    269    case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
    270      LOG(("Handling CLOSE_WEBTRANSPORT_SESSION\n"));
    271      break;
    272    case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
    273      LOG(("Handling DRAIN_WEBTRANSPORT_SESSION\n"));
    274      break;
    275    case CapsuleType::PADDING:
    276      LOG(("Handling PADDING\n"));
    277      break;
    278    case CapsuleType::WT_RESET_STREAM: {
    279      WebTransportResetStreamCapsule& reset =
    280          aCapsule.GetWebTransportResetStreamCapsule();
    281      StreamId id = StreamId(reset.mID);
    282      if (!HandleStreamResetCapsule(id, std::move(aCapsule))) {
    283        return false;
    284      }
    285    } break;
    286    case CapsuleType::WT_STOP_SENDING: {
    287      WebTransportStopSendingCapsule& stopSending =
    288          aCapsule.GetWebTransportStopSendingCapsule();
    289      StreamId id = StreamId(stopSending.mID);
    290      if (!HandleStreamStopSendingCapsule(id, std::move(aCapsule))) {
    291        return false;
    292      }
    293    } break;
    294    case CapsuleType::WT_STREAM: {
    295      WebTransportStreamDataCapsule& streamData =
    296          aCapsule.GetWebTransportStreamDataCapsule();
    297      StreamId id = StreamId(streamData.mID);
    298      if (id.IsServerInitiated()) {
    299        return ProcessIncomingStreamCapsule(std::move(aCapsule), id,
    300                                            id.StreamType());
    301      } else {
    302        RefPtr<Http2WebTransportStream> stream = mOutgoingStreams.Get(id);
    303        if (!stream) {
    304          LOG(
    305              ("Http2WebTransportSessionImpl::OnCapsule - "
    306               "stream not found "
    307               "stream_id=0x%" PRIx64 " [this=%p].",
    308               static_cast<uint64_t>(id), this));
    309          return false;
    310        }
    311        if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) {
    312          return false;
    313        }
    314      }
    315      break;
    316    }
    317    case CapsuleType::WT_STREAM_FIN:
    318      LOG(("Handling WT_STREAM_FIN\n"));
    319      break;
    320    case CapsuleType::WT_MAX_DATA: {
    321      LOG(("Handling WT_MAX_DATA\n"));
    322      WebTransportMaxDataCapsule& maxData =
    323          aCapsule.GetWebTransportMaxDataCapsule();
    324      mSessionDataFc.Update(maxData.mMaxDataSize);
    325    } break;
    326    case CapsuleType::WT_MAX_STREAM_DATA: {
    327      WebTransportMaxStreamDataCapsule& maxStreamData =
    328          aCapsule.GetWebTransportMaxStreamDataCapsule();
    329      StreamId id = StreamId(maxStreamData.mID);
    330      if (!HandleMaxStreamDataCapsule(id, std::move(aCapsule))) {
    331        return false;
    332      }
    333    } break;
    334    case CapsuleType::WT_MAX_STREAMS_BIDI: {
    335      LOG(("Handling WT_MAX_STREAMS_BIDI\n"));
    336      WebTransportMaxStreamsCapsule& maxStreams =
    337          aCapsule.GetWebTransportMaxStreamsCapsule();
    338      mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update(
    339          maxStreams.mLimit);
    340      ProcessPendingStreamCallbacks(mBidiPendingStreamCallbacks,
    341                                    WebTransportStreamType::BiDi);
    342      break;
    343    }
    344    case CapsuleType::WT_MAX_STREAMS_UNIDI: {
    345      LOG(("Handling WT_MAX_STREAMS_UNIDI\n"));
    346      WebTransportMaxStreamsCapsule& maxStreams =
    347          aCapsule.GetWebTransportMaxStreamsCapsule();
    348      mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update(
    349          maxStreams.mLimit);
    350      ProcessPendingStreamCallbacks(mUnidiPendingStreamCallbacks,
    351                                    WebTransportStreamType::UniDi);
    352      break;
    353    }
    354    case CapsuleType::WT_DATA_BLOCKED:
    355      LOG(("Handling WT_DATA_BLOCKED\n"));
    356      break;
    357    case CapsuleType::WT_STREAM_DATA_BLOCKED:
    358      LOG(("Handling WT_STREAM_DATA_BLOCKED\n"));
    359      break;
    360    case CapsuleType::WT_STREAMS_BLOCKED_BIDI:
    361      LOG(("Handling WT_STREAMS_BLOCKED_BIDI\n"));
    362      break;
    363    case CapsuleType::WT_STREAMS_BLOCKED_UNIDI:
    364      LOG(("Handling WT_STREAMS_BLOCKED_UNIDI\n"));
    365      break;
    366    case CapsuleType::DATAGRAM: {
    367      LOG(("Handling DATAGRAM\n"));
    368      WebTransportDatagramCapsule& datagram =
    369          aCapsule.GetWebTransportDatagramCapsule();
    370      if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
    371              do_QueryInterface(mListener)) {
    372        listener->OnDatagramReceivedInternal(std::move(datagram.mPayload));
    373      }
    374      break;
    375    }
    376    default:
    377      LOG(("Unhandled capsule type\n"));
    378      break;
    379  }
    380  return true;
    381 }
    382 
    383 already_AddRefed<Http2WebTransportStream>
    384 Http2WebTransportSessionImpl::GetStream(StreamId aId) {
    385  RefPtr<Http2WebTransportStream> stream;
    386  if (aId.IsClientInitiated()) {
    387    stream = mOutgoingStreams.Get(aId);
    388  } else {
    389    stream = mIncomingStreams.Get(aId);
    390  }
    391 
    392  if (!stream) {
    393    LOG(
    394        ("Http2WebTransportSessionImpl::GetStream - "
    395         "stream not found "
    396         "stream_id=0x%" PRIx64 " [this=%p].",
    397         static_cast<uint64_t>(aId), this));
    398    return nullptr;
    399  }
    400 
    401  return stream.forget();
    402 }
    403 
    404 bool Http2WebTransportSessionImpl::HandleMaxStreamDataCapsule(
    405    StreamId aId, Capsule&& aCapsule) {
    406  RefPtr<Http2WebTransportStream> stream = GetStream(aId);
    407  if (!stream) {
    408    return false;
    409  }
    410 
    411  if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) {
    412    return false;
    413  }
    414 
    415  return true;
    416 }
    417 
    418 bool Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule(
    419    StreamId aId, Capsule&& aCapsule) {
    420  RefPtr<Http2WebTransportStream> stream = GetStream(aId);
    421  if (!stream) {
    422    return false;
    423  }
    424 
    425  stream->OnStopSending();
    426 
    427  WebTransportStopSendingCapsule& stopSending =
    428      aCapsule.GetWebTransportStopSendingCapsule();
    429 
    430  LOG(
    431      ("Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule %p "
    432       "aID=%" PRIu64 " error=%" PRIu64,
    433       this, (uint64_t)aId, stopSending.mErrorCode));
    434 
    435  uint8_t wtError = Http3ErrorToWebTransportError(stopSending.mErrorCode);
    436  nsresult rv = GetNSResultFromWebTransportError(wtError);
    437  if (mListener) {
    438    mListener->OnStopSending(aId, rv);
    439  }
    440  return true;
    441 }
    442 
    443 bool Http2WebTransportSessionImpl::HandleStreamResetCapsule(
    444    StreamId aId, Capsule&& aCapsule) {
    445  RefPtr<Http2WebTransportStream> stream = GetStream(aId);
    446  if (!stream) {
    447    return false;
    448  }
    449 
    450  WebTransportResetStreamCapsule& reset =
    451      aCapsule.GetWebTransportResetStreamCapsule();
    452 
    453  stream->OnReset(reset.mReliableSize);
    454 
    455  uint8_t wtError = Http3ErrorToWebTransportError(reset.mErrorCode);
    456  nsresult rv = GetNSResultFromWebTransportError(wtError);
    457  if (mListener) {
    458    mListener->OnResetReceived(aId, rv);
    459  }
    460 
    461  return true;
    462 }
    463 
    464 void Http2WebTransportSessionImpl::OnStreamDataSent(StreamId aId,
    465                                                    size_t aCount) {
    466  RefPtr<Http2WebTransportStream> stream = GetStream(aId);
    467  if (!stream) {
    468    return;
    469  }
    470 
    471  stream->OnStreamDataSent(aCount);
    472 }
    473 
    474 void Http2WebTransportSessionImpl::OnError(uint64_t aError) {
    475  LOG(("Http2WebTransportSessionImpl::OnError %p aError=%" PRIu64, this,
    476       aError));
    477  // To be implemented.
    478 }
    479 
    480 bool Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule(
    481    Capsule&& aCapsule, StreamId aID, WebTransportStreamType aStreamType) {
    482  LOG(
    483      ("Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule %p "
    484       "aID=%" PRIu64 " type:%s",
    485       this, (uint64_t)aID,
    486       aStreamType == WebTransportStreamType::BiDi ? "BiDi" : "UniDi"));
    487  RefPtr<Http2WebTransportStream> stream = mIncomingStreams.Get(aID);
    488  if (stream) {
    489    return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
    490  }
    491 
    492  while (true) {
    493    auto res = mRemoteStreamsFlowControl[aStreamType].IsNewStream(aID);
    494    if (res.isErr() || !res.unwrap()) {
    495      break;
    496    }
    497 
    498    StreamId newStreamID =
    499        mRemoteStreamsFlowControl[aStreamType].TakeStreamId();
    500    stream = new Http2WebTransportStream(
    501        this,
    502        aStreamType == WebTransportStreamType::BiDi
    503            ? mSettings.mInitialMaxStreamDataBidi
    504            : 0,
    505        aStreamType == WebTransportStreamType::BiDi
    506            ? mSettings.mInitialLocalMaxStreamDataBidi
    507            : mSettings.mInitialLocalMaxStreamDataUnidi,
    508        newStreamID);
    509    if (NS_FAILED(stream->Init())) {
    510      return false;
    511    }
    512    mIncomingStreams.InsertOrUpdate(newStreamID, stream);
    513    if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
    514            do_QueryInterface(mListener)) {
    515      listener->OnIncomingStreamAvailableInternal(stream);
    516    }
    517  }
    518 
    519  stream = mIncomingStreams.Get(aID);
    520  if (stream) {
    521    return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
    522  }
    523 
    524  return true;
    525 }
    526 
    527 void Http2WebTransportSessionImpl::OnCapsuleParseFailure(nsresult aError) {
    528  mHandler->OnCapsuleParseFailure(aError);
    529 }
    530 
    531 NS_IMPL_ISUPPORTS_INHERITED(Http2WebTransportSession, Http2StreamTunnel,
    532                            nsIOutputStreamCallback, nsIInputStreamCallback)
    533 
    534 Http2WebTransportSession::Http2WebTransportSession(
    535    Http2Session* aSession, int32_t aPriority, uint64_t aBcId,
    536    nsHttpConnectionInfo* aConnectionInfo,
    537    Http2WebTransportInitialSettings aSettings)
    538    : Http2StreamTunnel(aSession, aPriority, aBcId, aConnectionInfo),
    539      mImpl(MakeRefPtr<Http2WebTransportSessionImpl>(this, aSettings)),
    540      mCapsuleParser(MakeUnique<CapsuleParser>(mImpl)) {
    541  LOG(("Http2WebTransportSession ctor:%p", this));
    542 }
    543 
    544 Http2WebTransportSession::~Http2WebTransportSession() {
    545  LOG(("Http2WebTransportSession dtor:%p", this));
    546 }
    547 
    548 void Http2WebTransportSession::CloseStream(nsresult aReason) {
    549  LOG(("Http2WebTransportSession::CloseStream this=%p aReason=%x", this,
    550       static_cast<uint32_t>(aReason)));
    551  if (mTransaction) {
    552    mTransaction->Close(aReason);
    553    mTransaction = nullptr;
    554  }
    555 
    556  mInput->AsyncWait(nullptr, 0, 0, nullptr);
    557  mOutput->AsyncWait(nullptr, 0, 0, nullptr);
    558  Http2StreamTunnel::CloseStream(aReason);
    559  mCapsuleParser = nullptr;
    560  if (mImpl) {
    561    mImpl->Close(aReason);
    562    mImpl = nullptr;
    563  }
    564 }
    565 
    566 nsresult Http2WebTransportSession::GenerateHeaders(nsCString& aCompressedData,
    567                                                   uint8_t& aFirstFrameFlags) {
    568  nsHttpRequestHead* head = mTransaction->RequestHead();
    569 
    570  nsAutoCString authorityHeader;
    571  nsresult rv = head->GetHeader(nsHttp::Host, authorityHeader);
    572  NS_ENSURE_SUCCESS(rv, rv);
    573 
    574  RefPtr<Http2Session> session = Session();
    575  LOG3(("Http2WebTransportSession %p Stream ID 0x%X [session=%p] for %s\n",
    576        this, mStreamID, session.get(), authorityHeader.get()));
    577 
    578  nsAutoCString path;
    579  head->Path(path);
    580 
    581  rv = session->Compressor()->EncodeHeaderBlock(
    582      mFlatHttpRequestHeaders, "CONNECT"_ns, path, authorityHeader, "https"_ns,
    583      "webtransport"_ns, false, aCompressedData, true);
    584  NS_ENSURE_SUCCESS(rv, rv);
    585 
    586  mRequestBodyLenRemaining = 0x0fffffffffffffffULL;
    587 
    588  if (mInput) {
    589    mInput->AsyncWait(this, 0, 0, nullptr);
    590  }
    591  return NS_OK;
    592 }
    593 
    594 NS_IMETHODIMP
    595 Http2WebTransportSession::OnInputStreamReady(nsIAsyncInputStream* aIn) {
    596  char buffer[nsIOService::gDefaultSegmentSize];
    597  uint32_t remainingCapacity = sizeof(buffer);
    598  uint32_t read = 0;
    599 
    600  while (remainingCapacity > 0) {
    601    uint32_t count = 0;
    602    nsresult rv = mInput->Read(buffer + read, remainingCapacity, &count);
    603    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    604      break;
    605    }
    606 
    607    if (NS_FAILED(rv)) {
    608      LOG(("Http2WebTransportSession::OnInputStreamReady %p failed 0x%x\n",
    609           this, static_cast<uint32_t>(rv)));
    610      // TODO: close connection
    611      return rv;
    612    }
    613 
    614    // base stream closed
    615    if (count == 0) {
    616      LOG((
    617          "Http2WebTransportSession::OnInputStreamReady %p connection closed\n",
    618          this));
    619      // Close with NS_BASE_STREAM_CLOSED
    620      return NS_OK;
    621    }
    622 
    623    remainingCapacity -= count;
    624    read += count;
    625  }
    626 
    627  if (read > 0) {
    628    Http2Session::LogIO(nullptr, this, "Http2WebTransportSession", buffer,
    629                        read);
    630 
    631    mCapsuleParser->ProcessCapsuleData(reinterpret_cast<uint8_t*>(buffer),
    632                                       read);
    633  }
    634 
    635  mInput->AsyncWait(this, 0, 0, nullptr);
    636  return NS_OK;
    637 }
    638 
    639 NS_IMETHODIMP
    640 Http2WebTransportSession::OnOutputStreamReady(nsIAsyncOutputStream* aOut) {
    641  if (!mCurrentOutCapsule) {
    642    mImpl->PrepareCapsulesToSend(mOutgoingQueue);
    643    if (mOutgoingQueue.IsEmpty()) {
    644      return NS_OK;
    645    }
    646    mCurrentOutCapsule = mOutgoingQueue.Pop();
    647  }
    648 
    649  while (mCurrentOutCapsule && mOutput) {
    650    auto buffer = mCurrentOutCapsule->GetBuffer();
    651    const char* writeBuffer =
    652        reinterpret_cast<const char*>(buffer.Elements()) + mWriteOffset;
    653    uint32_t toWrite = buffer.Length() - mWriteOffset;
    654 
    655    uint32_t wrote = 0;
    656    nsresult rv = mOutput->Write(writeBuffer, toWrite, &wrote);
    657    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    658      mOutput->AsyncWait(this, 0, 0, nullptr);
    659      return NS_OK;
    660    }
    661 
    662    if (NS_FAILED(rv)) {
    663      LOG(("Http2WebTransportSession::OnOutputStreamReady %p failed %u\n", this,
    664           static_cast<uint32_t>(rv)));
    665      // TODO: close connection
    666      return NS_OK;
    667    }
    668 
    669    mWriteOffset += wrote;
    670 
    671    Maybe<StreamMetadata> metadata = mCurrentOutCapsule->GetStreamMetadata();
    672    // This is a WT_STREAM_DATA capsule, so we need to track how many bytes of
    673    // stream data are sent.
    674    if (metadata) {
    675      if (mWriteOffset > metadata->mStartOfData) {
    676        uint64_t dataSent = mWriteOffset - metadata->mStartOfData;
    677        mImpl->OnStreamDataSent(StreamId(metadata->mID), dataSent);
    678      }
    679    }
    680 
    681    if (toWrite == wrote) {
    682      mWriteOffset = 0;
    683      mCurrentOutCapsule =
    684          mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop();
    685    }
    686  }
    687 
    688  return NS_OK;
    689 }
    690 
    691 void Http2WebTransportSession::HasCapsuleToSend() {
    692  LOG(("Http2WebTransportSession::HasCapsuleToSend %p mSendClosed=%d", this,
    693       mSendClosed));
    694  if (mSendClosed) {
    695    return;
    696  }
    697 
    698  mImpl->PrepareCapsulesToSend(mOutgoingQueue);
    699 
    700  if (mOutput) {
    701    OnOutputStreamReady(mOutput);
    702  }
    703 }
    704 
    705 void Http2WebTransportSession::SetSentFin() {
    706  Http2StreamTunnel::SetSentFin(true);
    707 }
    708 
    709 void Http2WebTransportSession::StartReading() {
    710  if (mInput) {
    711    mInput->AsyncWait(this, 0, 0, nullptr);
    712  }
    713 }
    714 
    715 void Http2WebTransportSession::OnCapsuleParseFailure(nsresult aError) {
    716  LOG(("Http2WebTransportSession::OnCapsuleParseFailure %p aError=%" PRIX32,
    717       this, static_cast<uint32_t>(aError)));
    718 }
    719 
    720 }  // namespace mozilla::net