tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

Http2WebTransportStream.cpp (15198B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 // HttpLog.h should generally be included first
      7 #include "HttpLog.h"
      8 
      9 #include <algorithm>
     10 #include "Http2WebTransportStream.h"
     11 #include "Http2WebTransportSession.h"
     12 #include "Capsule.h"
     13 #include "CapsuleEncoder.h"
     14 #include "nsIOService.h"
     15 
     16 namespace mozilla::net {
     17 
     18 NS_IMPL_ISUPPORTS(Http2WebTransportStream, nsIOutputStreamCallback,
     19                  nsIInputStreamCallback)
     20 
     21 Http2WebTransportStream::Http2WebTransportStream(
     22    Http2WebTransportSessionImpl* aWebTransportSession, StreamId aStreamId,
     23    uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData,
     24    std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
     25        aCallback)
     26    : WebTransportStreamBase(aWebTransportSession->GetStreamId(),
     27                             std::move(aCallback)),
     28      mWebTransportSession(aWebTransportSession),
     29      mStreamId(aStreamId),
     30      mOwnerThread(GetCurrentSerialEventTarget()),
     31      mFc(aStreamId, aInitialMaxStreamData),
     32      mReceiverFc(aStreamId, aInitialLocalMaxStreamData) {
     33  LOG(("Http2WebTransportStream outgoing ctor:%p", this));
     34  mStreamRole = OUTGOING;
     35  mStreamType = mStreamId.StreamType();
     36 }
     37 
     38 Http2WebTransportStream::Http2WebTransportStream(
     39    Http2WebTransportSessionImpl* aWebTransportSession,
     40    uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData,
     41    StreamId aStreamId)
     42    : WebTransportStreamBase(aWebTransportSession->GetStreamId(), nullptr),
     43      mWebTransportSession(aWebTransportSession),
     44      mStreamId(aStreamId),
     45      mOwnerThread(GetCurrentSerialEventTarget()),
     46      mFc(aStreamId, aInitialMaxStreamData),
     47      mReceiverFc(aStreamId, aInitialLocalMaxStreamData) {
     48  LOG(("Http2WebTransportStream incoming ctor:%p", this));
     49  mStreamRole = INCOMING;
     50  mStreamType = mStreamId.StreamType();
     51 }
     52 
     53 Http2WebTransportStream::~Http2WebTransportStream() {
     54  LOG(("Http2WebTransportStream dtor:%p", this));
     55 }
     56 
     57 nsresult Http2WebTransportStream::Init() {
     58  nsresult rv = NS_OK;
     59  auto resultCallback = MakeScopeExit([&] {
     60    if (NS_FAILED(rv)) {
     61      mSendState = SEND_DONE;
     62      mRecvState = RECV_DONE;
     63      if (mStreamReadyCallback) {
     64        mStreamReadyCallback(Err(rv));
     65      }
     66 
     67    } else {
     68      mSocketInCondition = NS_OK;
     69      mSocketOutCondition = NS_OK;
     70      RefPtr<WebTransportStreamBase> stream = this;
     71      if (mStreamReadyCallback) {
     72        mStreamReadyCallback(stream);
     73      }
     74    }
     75    mStreamReadyCallback = nullptr;
     76  });
     77 
     78  if (mStreamRole == INCOMING) {
     79    rv = InitInputPipe();
     80    if (NS_FAILED(rv)) {
     81      return rv;
     82    }
     83 
     84    if (mStreamType == WebTransportStreamType::BiDi) {
     85      rv = InitOutputPipe();
     86    }
     87 
     88    return rv;
     89  }
     90 
     91  MOZ_ASSERT(mStreamRole == OUTGOING);
     92  rv = InitOutputPipe();
     93  if (NS_FAILED(rv)) {
     94    return rv;
     95  }
     96 
     97  if (mStreamType == WebTransportStreamType::BiDi) {
     98    rv = InitInputPipe();
     99  }
    100 
    101  if (mSendStreamPipeIn) {
    102    rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
    103  }
    104 
    105  return rv;
    106 }
    107 
    108 class StreamId Http2WebTransportStream::WebTransportStreamId() const {
    109  return mStreamId;
    110 }
    111 
    112 uint64_t Http2WebTransportStream::GetStreamId() const { return mStreamId; }
    113 
    114 void Http2WebTransportStream::SendStopSending(uint8_t aErrorCode) {
    115  if (mSentStopSending || !mWebTransportSession) {
    116    // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.3
    117    // A WT_STOP_SENDING capsule MUST NOT be sent multiple times for the same
    118    // stream.
    119    return;
    120  }
    121 
    122  mSentStopSending = true;
    123  mStopSendingCapsule.emplace(
    124      Capsule::WebTransportStopSending(aErrorCode, mStreamId));
    125  mWebTransportSession->StreamHasCapsuleToSend();
    126  mRecvState = RECV_DONE;
    127 }
    128 
    129 void Http2WebTransportStream::SendFin() {}
    130 
    131 void Http2WebTransportStream::Reset(uint64_t aErrorCode) {
    132  if (mSentReset || !mWebTransportSession || mSendState == SEND_DONE) {
    133    // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2
    134    // A WT_RESET_STREAM capsule MUST NOT be sent after a stream is closed or
    135    // reset.
    136    return;
    137  }
    138 
    139  mSentReset = true;
    140  mStreamResetCapsule.emplace(Capsule::WebTransportResetStream(
    141      aErrorCode, mTotalSent.value(), mStreamId));
    142  mWebTransportSession->StreamHasCapsuleToSend();
    143  mRecvState = RECV_DONE;
    144  mSendState = SEND_DONE;
    145 }
    146 
    147 already_AddRefed<nsIWebTransportSendStreamStats>
    148 Http2WebTransportStream::GetSendStreamStats() {
    149  return nullptr;
    150 }
    151 
    152 already_AddRefed<nsIWebTransportReceiveStreamStats>
    153 Http2WebTransportStream::GetReceiveStreamStats() {
    154  return nullptr;
    155 }
    156 
    157 bool Http2WebTransportStream::RecvDone() const { return false; }
    158 
    159 void Http2WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {}
    160 
    161 NS_IMETHODIMP
    162 Http2WebTransportStream::OnInputStreamReady(nsIAsyncInputStream* aIn) {
    163  LOG1(
    164      ("Http2WebTransportStream::OnInputStreamReady [this=%p stream=%p "
    165       "state=%d]",
    166       this, aIn, mSendState));
    167  if (mSendState == SEND_DONE) {
    168    // already closed
    169    return NS_OK;
    170  }
    171 
    172  uint32_t sendBytes = 0;
    173  return mSendStreamPipeIn->ReadSegments(
    174      ReadRequestSegment, this, nsIOService::gDefaultSegmentSize, &sendBytes);
    175 }
    176 
    177 NS_IMETHODIMP
    178 Http2WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOut) {
    179  if (!mCurrentOut) {
    180    if (mOutgoingQueue.IsEmpty()) {
    181      return NS_OK;
    182    }
    183    mCurrentOut = mOutgoingQueue.Pop();
    184  }
    185 
    186  while (mCurrentOut && mReceiveStreamPipeOut && (mRecvState != RECV_DONE)) {
    187    char* writeBuffer = reinterpret_cast<char*>(const_cast<uint8_t*>(
    188                            mCurrentOut->GetData().Elements())) +
    189                        mWriteOffset;
    190    uint32_t toWrite = mCurrentOut->GetData().Length() - mWriteOffset;
    191    if (mReliableSize) {
    192      if (mTotalReceived + toWrite > *mReliableSize) {
    193        toWrite = *mReliableSize - mTotalReceived;
    194      }
    195    }
    196 
    197    uint32_t wrote = 0;
    198    nsresult rv = mReceiveStreamPipeOut->Write(writeBuffer, toWrite, &wrote);
    199    LOG(("Http2WebTransportStream::Write rv=0x%" PRIx32 " wrote=%" PRIu32
    200         " socketin=%" PRIx32 " [this=%p]",
    201         static_cast<uint32_t>(rv), wrote,
    202         static_cast<uint32_t>(mSocketInCondition), this));
    203    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    204      mSocketInCondition =
    205          mReceiveStreamPipeOut->AsyncWait(this, 0, 0, nullptr);
    206      return mSocketInCondition;
    207    }
    208 
    209    if (NS_FAILED(rv)) {
    210      LOG(("Http2WebTransportStream::OnOutputStreamReady %p failed %u\n", this,
    211           static_cast<uint32_t>(rv)));
    212      // TODO: close this stream
    213      mSocketInCondition = rv;
    214      mCurrentOut = nullptr;
    215      mRecvState = RECV_DONE;
    216      return NS_OK;
    217    }
    218 
    219    // Retire when sending data to the consumer.
    220    mReceiverFc.AddRetired(wrote);
    221    mWebTransportSession->ReceiverFc().AddRetired(wrote);
    222 
    223    mWriteOffset += wrote;
    224    mTotalReceived += wrote;
    225    // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2
    226    // A receiver of a WT_RESET_STREAM capsule can discard any data in excess of
    227    // the Reliable Size indicated, even if that data was already received.
    228    if (mReliableSize && mTotalReceived == *mReliableSize) {
    229      mSocketInCondition = NS_OK;
    230      mWriteOffset = 0;
    231      mCurrentOut = nullptr;
    232      mOutgoingQueue.Clear();
    233      mRecvState = RECV_DONE;
    234      break;
    235    }
    236 
    237    if (toWrite == wrote) {
    238      mWriteOffset = 0;
    239      mCurrentOut = mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop();
    240    }
    241  }
    242  return NS_OK;
    243 }
    244 
    245 // static
    246 nsresult Http2WebTransportStream::ReadRequestSegment(
    247    nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
    248    uint32_t count, uint32_t* countRead) {
    249  Http2WebTransportStream* wtStream = (Http2WebTransportStream*)closure;
    250  LOG(("Http2WebTransportStream::ReadRequestSegment %p count=%u", wtStream,
    251       count));
    252  *countRead = 0;
    253  if (!wtStream->mWebTransportSession) {
    254    return NS_ERROR_UNEXPECTED;
    255  }
    256 
    257  uint64_t limit =
    258      std::min(wtStream->mWebTransportSession->SessionDataFc().Available(),
    259               wtStream->mFc.Available());
    260  if (limit < count) {
    261    if (wtStream->mWebTransportSession->SessionDataFc().Available() < count) {
    262      LOG(("blocked by session level flow control"));
    263      wtStream->mWebTransportSession->SessionDataFc().Blocked();
    264    }
    265    if (wtStream->mFc.Available() < count) {
    266      LOG(("blocked by stream level flow control"));
    267      wtStream->mFc.Blocked();
    268    }
    269    return NS_BASE_STREAM_WOULD_BLOCK;
    270  }
    271 
    272  nsTArray<uint8_t> data;
    273  data.AppendElements(buf, count);
    274  Capsule capsule = Capsule::WebTransportStreamData(wtStream->mStreamId, false,
    275                                                    std::move(data));
    276  UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
    277  encoder->EncodeCapsule(capsule);
    278  wtStream->mCapsuleQueue.Push(std::move(encoder));
    279  *countRead = count;
    280  return NS_OK;
    281 }
    282 
    283 void Http2WebTransportStream::TakeOutputCapsule(
    284    mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
    285  LOG(("Http2WebTransportStream::TakeOutputCapsule %p", this));
    286  if (mCapsuleQueue.IsEmpty()) {
    287    mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
    288    return;
    289  }
    290  while (!mCapsuleQueue.IsEmpty()) {
    291    UniquePtr<CapsuleEncoder> entry = mCapsuleQueue.Pop();
    292    aOutput.Push(std::move(entry));
    293  }
    294  mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
    295 }
    296 
    297 void Http2WebTransportStream::WriteMaintenanceCapsules(
    298    mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
    299  if (mStopSendingCapsule) {
    300    UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
    301    encoder->EncodeCapsule(*mStopSendingCapsule);
    302    mStopSendingCapsule = Nothing();
    303    aOutput.Push(std::move(encoder));
    304  }
    305 
    306  if (mStreamResetCapsule) {
    307    UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
    308    encoder->EncodeCapsule(*mStreamResetCapsule);
    309    mStreamResetCapsule = Nothing();
    310    aOutput.Push(std::move(encoder));
    311  }
    312 
    313  auto dataBlocked = mFc.CreateStreamDataBlockedCapsule();
    314  if (dataBlocked) {
    315    aOutput.Push(MakeUnique<CapsuleEncoder>(dataBlocked.ref()));
    316  }
    317 
    318  auto maxStreamData = mReceiverFc.CreateMaxStreamDataCapsule();
    319  if (maxStreamData) {
    320    aOutput.Push(MakeUnique<CapsuleEncoder>(maxStreamData.ref()));
    321  }
    322 
    323  // Keep reading data from the consumer.
    324  mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
    325 }
    326 
    327 nsresult Http2WebTransportStream::OnCapsule(Capsule&& aCapsule) {
    328  switch (aCapsule.Type()) {
    329    case CapsuleType::WT_STREAM: {
    330      LOG(("Handling WT_STREAM\n"));
    331      WebTransportStreamDataCapsule& streamData =
    332          aCapsule.GetWebTransportStreamDataCapsule();
    333      return HandleStreamData(false, std::move(streamData.mData));
    334    }
    335    case CapsuleType::WT_STREAM_FIN:
    336      LOG(("Handling WT_STREAM_FIN\n"));
    337      break;
    338    case CapsuleType::WT_MAX_STREAM_DATA: {
    339      LOG(("Handling WT_MAX_STREAM_DATA\n"));
    340      WebTransportMaxStreamDataCapsule& maxStreamData =
    341          aCapsule.GetWebTransportMaxStreamDataCapsule();
    342      return HandleMaxStreamData(maxStreamData.mLimit);
    343    }
    344    case CapsuleType::WT_STREAM_DATA_BLOCKED:
    345      LOG(("Handling WT_STREAM_DATA_BLOCKED\n"));
    346      break;
    347    default:
    348      LOG(("Unhandled capsule type\n"));
    349      break;
    350  }
    351  return NS_OK;
    352 }
    353 
    354 nsresult Http2WebTransportStream::HandleMaxStreamData(uint64_t aLimit) {
    355  mFc.Update(aLimit);
    356  return NS_OK;
    357 }
    358 
    359 void Http2WebTransportStream::OnStopSending() { mSendState = SEND_DONE; }
    360 
    361 void Http2WebTransportStream::OnReset(uint64_t aSize) {
    362  if (mReliableSize) {
    363    return;
    364  }
    365 
    366  mReliableSize.emplace(aSize);
    367 
    368  LOG(("Http2WebTransportStream::OnReset %p mReliableSize=%" PRIu64
    369       " mTotalReceived=%" PRIu64,
    370       this, *mReliableSize, mTotalReceived));
    371  if (*mReliableSize < mTotalReceived) {
    372    // A receiver MUST treat the receipt of a WT_RESET_STREAM with a Reliable
    373    // Size smaller than the number of bytes it has received on the stream as a
    374    // session error.
    375    // TODO: find a better error code.
    376    mWebTransportSession->OnError(0);
    377  }
    378 }
    379 
    380 void Http2WebTransportStream::OnStreamDataSent(size_t aCount) {
    381  LOG(("Http2WebTransportStream::OnStreamDataSent %p aCount=%" PRIu64
    382       " mTotalSent=%" PRIu64,
    383       this, static_cast<uint64_t>(aCount), mTotalSent.value()));
    384  mTotalSent += aCount;
    385  if (!mTotalSent.isValid()) {
    386    // TODO: find a better error code.
    387    mWebTransportSession->OnError(0);
    388    return;
    389  }
    390 
    391  mFc.Consume(aCount);
    392  mWebTransportSession->SessionDataFc().Consume(aCount);
    393 }
    394 
    395 void Http2WebTransportStream::Close(nsresult aResult) {
    396  if (mSendStreamPipeIn) {
    397    mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
    398    mSendStreamPipeIn->CloseWithStatus(aResult);
    399  }
    400  if (mReceiveStreamPipeOut) {
    401    mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
    402    mReceiveStreamPipeOut->CloseWithStatus(aResult);
    403  }
    404  mSendState = SEND_DONE;
    405  mRecvState = RECV_DONE;
    406  mWebTransportSession = nullptr;
    407 }
    408 
    409 nsresult Http2WebTransportStream::HandleStreamData(bool aFin,
    410                                                   nsTArray<uint8_t>&& aData) {
    411  LOG(("Http2WebTransportStream::HandleStreamData [this=%p, state=%d aFin=%d",
    412       this, static_cast<uint32_t>(mRecvState), aFin));
    413 
    414  if (NS_FAILED(mSocketInCondition)) {
    415    mRecvState = RECV_DONE;
    416  }
    417 
    418  uint32_t countWrittenSingle = 0;
    419  switch (mRecvState) {
    420    case READING: {
    421      size_t length = aData.Length();
    422      if (length) {
    423        auto newConsumed =
    424            mReceiverFc.SetConsumed(mReceiverFc.Consumed() + length);
    425        if (newConsumed.isErr()) {
    426          mSocketInCondition = newConsumed.unwrapErr();
    427        } else {
    428          if (!mWebTransportSession->ReceiverFc().Consume(
    429                  newConsumed.unwrap())) {
    430            LOG(("Exceed session flow control limit"));
    431            mSocketInCondition = NS_ERROR_NOT_AVAILABLE;
    432          } else {
    433            mOutgoingQueue.Push(MakeUnique<StreamData>(std::move(aData)));
    434            mSocketInCondition = OnOutputStreamReady(mReceiveStreamPipeOut);
    435          }
    436        }
    437      } else {
    438        // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-10.html#section-6.4
    439        // Empty WT_STREAM capsules MUST NOT be used unless they open or close a
    440        // stream
    441        // TODO: Handle empty stream capsule
    442      }
    443 
    444      LOG((
    445          "Http2WebTransportStream::HandleStreamData "
    446          "countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
    447          countWrittenSingle, static_cast<uint32_t>(mSocketInCondition), this));
    448 
    449      if (NS_FAILED(mSocketInCondition)) {
    450        mReceiveStreamPipeOut->Close();
    451        mRecvState = RECV_DONE;
    452      } else {
    453        if (aFin) {
    454          mRecvState = RECEIVED_FIN;
    455        }
    456      }
    457    } break;
    458    case RECEIVED_FIN:
    459      mRecvState = RECV_DONE;
    460      break;
    461    case RECV_DONE:
    462      mSocketInCondition = NS_ERROR_UNEXPECTED;
    463      break;
    464    default:
    465      mSocketInCondition = NS_ERROR_UNEXPECTED;
    466      break;
    467  }
    468 
    469  return mSocketInCondition;
    470 }
    471 
    472 }  // namespace mozilla::net