Http2WebTransportSession.cpp (23546B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set sw=2 ts=8 et tw=80 : */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 // HttpLog.h should generally be included first 8 #include "HttpLog.h" 9 10 #include "Capsule.h" 11 #include "CapsuleEncoder.h" 12 #include "Http2WebTransportSession.h" 13 #include "Http2WebTransportStream.h" 14 #include "Http2Session.h" 15 #include "mozilla/net/NeqoHttp3Conn.h" 16 #include "nsIWebTransport.h" 17 #include "nsIOService.h" 18 #include "nsHttp.h" 19 20 namespace mozilla::net { 21 22 Http2WebTransportSessionImpl::CapsuleQueue::CapsuleQueue() = default; 23 24 mozilla::Queue<UniquePtr<CapsuleEncoder>>& 25 Http2WebTransportSessionImpl::CapsuleQueue::operator[]( 26 CapsuleTransmissionPriority aPriority) { 27 if (aPriority == CapsuleTransmissionPriority::Critical) { 28 return mCritical; 29 } 30 if (aPriority == CapsuleTransmissionPriority::Important) { 31 return mImportant; 32 } 33 if (aPriority == CapsuleTransmissionPriority::High) { 34 return mHigh; 35 } 36 if (aPriority == CapsuleTransmissionPriority::Normal) { 37 return mNormal; 38 } 39 40 return mLow; 41 } 42 43 Http2WebTransportSessionImpl::Http2WebTransportSessionImpl( 44 CapsuleIOHandler* aHandler, Http2WebTransportInitialSettings aSettings) 45 : mSettings(aSettings), 46 mRemoteStreamsFlowControl(aSettings.mInitialLocalMaxStreamsBidi, 47 aSettings.mInitialLocalMaxStreamsUnidi), 48 mHandler(aHandler), 49 mSessionDataFc(aSettings.mInitialMaxData), 50 mReceiverFc(aSettings.mInitialLocalMaxData) { 51 LOG(("Http2WebTransportSessionImpl ctor:%p", this)); 52 mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update( 53 mSettings.mInitialMaxStreamsUni); 54 mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update( 55 mSettings.mInitialMaxStreamsBidi); 56 } 57 58 Http2WebTransportSessionImpl::~Http2WebTransportSessionImpl() { 59 LOG(("Http2WebTransportSessionImpl dtor:%p", this)); 60 } 61 62 void Http2WebTransportSessionImpl::CloseSession(uint32_t aStatus, 63 const nsACString& aReason) { 64 LOG(("Http2WebTransportSessionImpl::CloseSession %p aStatus=%x", this, 65 aStatus)); 66 67 mHandler->SetSentFin(); 68 69 Capsule capsule = Capsule::CloseWebTransportSession(aStatus, aReason); 70 UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>(); 71 encoder->EncodeCapsule(capsule); 72 EnqueueOutCapsule(CapsuleTransmissionPriority::Important, std::move(encoder)); 73 } 74 75 uint64_t Http2WebTransportSessionImpl::GetStreamId() const { return mStreamId; } 76 77 void Http2WebTransportSessionImpl::GetMaxDatagramSize() {} 78 79 void Http2WebTransportSessionImpl::SendDatagram(nsTArray<uint8_t>&& aData, 80 uint64_t aTrackingId) { 81 LOG(("Http2WebTransportSession::SendDatagram %p", this)); 82 83 Capsule capsule = Capsule::WebTransportDatagram(std::move(aData)); 84 85 UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>(); 86 encoder->EncodeCapsule(capsule); 87 EnqueueOutCapsule(CapsuleTransmissionPriority::Normal, std::move(encoder)); 88 } 89 90 void Http2WebTransportSessionImpl::CreateOutgoingStreamInternal( 91 StreamId aStreamId, 92 std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&& 93 aCallback) { 94 LOG( 95 ("Http2WebTransportSessionImpl::CreateOutgoingStreamInternal %p " 96 "id:%" PRIx64, 97 this, (uint64_t)aStreamId)); 98 99 RefPtr<Http2WebTransportStream> stream = new Http2WebTransportStream( 100 this, aStreamId, 101 aStreamId.IsBiDi() ? mSettings.mInitialMaxStreamDataBidi 102 : mSettings.mInitialMaxStreamDataUni, 103 aStreamId.IsBiDi() ? mSettings.mInitialLocalMaxStreamDataBidi 104 : mSettings.mInitialLocalMaxStreamDataUnidi, 105 std::move(aCallback)); 106 if (NS_FAILED(stream->Init())) { 107 return; 108 } 109 mOutgoingStreams.InsertOrUpdate(aStreamId, std::move(stream)); 110 } 111 112 void Http2WebTransportSessionImpl::ProcessPendingStreamCallbacks( 113 mozilla::Queue<UniquePtr<PendingStreamCallback>>& aCallbacks, 114 WebTransportStreamType aStreamType) { 115 size_t size = aCallbacks.Count(); 116 for (size_t count = 0; count < size; ++count) { 117 auto id = mLocalStreamsFlowControl.TakeStreamId(aStreamType); 118 if (!id) { 119 break; 120 } 121 122 UniquePtr<PendingStreamCallback> callback = aCallbacks.Pop(); 123 auto cb = callback->TakeCallback(); 124 CreateOutgoingStreamInternal(*id, std::move(cb)); 125 } 126 } 127 128 void Http2WebTransportSessionImpl::CreateOutgoingBidirectionalStream( 129 std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&& 130 aCallback) { 131 auto id = mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::BiDi); 132 if (!id) { 133 mBidiPendingStreamCallbacks.Push( 134 MakeUnique<PendingStreamCallback>(std::move(aCallback))); 135 return; 136 } 137 138 CreateOutgoingStreamInternal(*id, std::move(aCallback)); 139 } 140 141 void Http2WebTransportSessionImpl::CreateOutgoingUnidirectionalStream( 142 std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&& 143 aCallback) { 144 auto id = 145 mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::UniDi); 146 if (!id) { 147 mUnidiPendingStreamCallbacks.Push( 148 MakeUnique<PendingStreamCallback>(std::move(aCallback))); 149 return; 150 } 151 152 CreateOutgoingStreamInternal(*id, std::move(aCallback)); 153 } 154 155 void Http2WebTransportSessionImpl::StartReading() { 156 LOG(("Http2WebTransportSessionImpl::StartReading %p", this)); 157 mHandler->StartReading(); 158 } 159 160 void Http2WebTransportSessionImpl::EnqueueOutCapsule( 161 CapsuleTransmissionPriority aPriority, UniquePtr<CapsuleEncoder>&& aData) { 162 mCapsuleQueue[aPriority].Push(std::move(aData)); 163 mHandler->HasCapsuleToSend(); 164 } 165 166 void Http2WebTransportSessionImpl::SendMaintenanceCapsules( 167 CapsuleTransmissionPriority aPriority) { 168 auto encoder = mSessionDataFc.CreateSessionDataBlockedCapsule(); 169 if (encoder) { 170 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 171 } 172 encoder = mReceiverFc.CreateMaxDataCapsule(); 173 if (encoder) { 174 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 175 } 176 encoder = mLocalStreamsFlowControl[WebTransportStreamType::BiDi] 177 .CreateStreamsBlockedCapsule(); 178 if (encoder) { 179 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 180 } 181 encoder = mLocalStreamsFlowControl[WebTransportStreamType::UniDi] 182 .CreateStreamsBlockedCapsule(); 183 if (encoder) { 184 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 185 } 186 encoder = mRemoteStreamsFlowControl[WebTransportStreamType::BiDi] 187 .FlowControl() 188 .CreateMaxStreamsCapsule(); 189 if (encoder) { 190 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 191 } 192 encoder = mRemoteStreamsFlowControl[WebTransportStreamType::UniDi] 193 .FlowControl() 194 .CreateMaxStreamsCapsule(); 195 if (encoder) { 196 mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref())); 197 } 198 for (const auto& stream : mOutgoingStreams.Values()) { 199 stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]); 200 } 201 for (const auto& stream : mIncomingStreams.Values()) { 202 stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]); 203 } 204 } 205 206 void Http2WebTransportSessionImpl::StreamHasCapsuleToSend() { 207 mHandler->HasCapsuleToSend(); 208 } 209 210 void Http2WebTransportSessionImpl::PrepareCapsulesToSend( 211 mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) { 212 // Like neqo, flow control capsules are at level 213 // CapsuleTransmissionPriority::Important. 214 SendMaintenanceCapsules(CapsuleTransmissionPriority::Important); 215 216 for (const auto& stream : mOutgoingStreams.Values()) { 217 stream->TakeOutputCapsule( 218 mCapsuleQueue[CapsuleTransmissionPriority::Normal]); 219 } 220 for (const auto& stream : mIncomingStreams.Values()) { 221 stream->TakeOutputCapsule( 222 mCapsuleQueue[CapsuleTransmissionPriority::Normal]); 223 } 224 225 static constexpr CapsuleTransmissionPriority priorities[] = { 226 CapsuleTransmissionPriority::Critical, 227 CapsuleTransmissionPriority::Important, 228 CapsuleTransmissionPriority::High, 229 CapsuleTransmissionPriority::Normal, 230 CapsuleTransmissionPriority::Low, 231 }; 232 233 for (CapsuleTransmissionPriority priority : priorities) { 234 auto& queue = mCapsuleQueue[priority]; 235 while (!queue.IsEmpty()) { 236 UniquePtr<CapsuleEncoder> entry = queue.Pop(); 237 aOutput.Push(std::move(entry)); 238 } 239 } 240 } 241 242 void Http2WebTransportSessionImpl::Close(nsresult aReason) { 243 for (const auto& stream : mOutgoingStreams.Values()) { 244 stream->Close(aReason); 245 } 246 for (const auto& stream : mIncomingStreams.Values()) { 247 stream->Close(aReason); 248 } 249 mOutgoingStreams.Clear(); 250 mIncomingStreams.Clear(); 251 } 252 253 void Http2WebTransportSessionImpl::OnStreamClosed( 254 Http2WebTransportStream* aStream) { 255 LOG(("Http2WebTransportSessionImpl::OnStreamClosed %p stream:%p", this, 256 aStream)); 257 RefPtr<Http2WebTransportStream> stream = aStream; 258 StreamId id = stream->WebTransportStreamId(); 259 if (id.IsClientInitiated()) { 260 mOutgoingStreams.Remove(id); 261 } else { 262 mIncomingStreams.Remove(id); 263 mRemoteStreamsFlowControl[id.StreamType()].FlowControl().AddRetired(1); 264 } 265 } 266 267 bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) { 268 switch (aCapsule.Type()) { 269 case CapsuleType::CLOSE_WEBTRANSPORT_SESSION: 270 LOG(("Handling CLOSE_WEBTRANSPORT_SESSION\n")); 271 break; 272 case CapsuleType::DRAIN_WEBTRANSPORT_SESSION: 273 LOG(("Handling DRAIN_WEBTRANSPORT_SESSION\n")); 274 break; 275 case CapsuleType::PADDING: 276 LOG(("Handling PADDING\n")); 277 break; 278 case CapsuleType::WT_RESET_STREAM: { 279 WebTransportResetStreamCapsule& reset = 280 aCapsule.GetWebTransportResetStreamCapsule(); 281 StreamId id = StreamId(reset.mID); 282 if (!HandleStreamResetCapsule(id, std::move(aCapsule))) { 283 return false; 284 } 285 } break; 286 case CapsuleType::WT_STOP_SENDING: { 287 WebTransportStopSendingCapsule& stopSending = 288 aCapsule.GetWebTransportStopSendingCapsule(); 289 StreamId id = StreamId(stopSending.mID); 290 if (!HandleStreamStopSendingCapsule(id, std::move(aCapsule))) { 291 return false; 292 } 293 } break; 294 case CapsuleType::WT_STREAM: { 295 WebTransportStreamDataCapsule& streamData = 296 aCapsule.GetWebTransportStreamDataCapsule(); 297 StreamId id = StreamId(streamData.mID); 298 if (id.IsServerInitiated()) { 299 return ProcessIncomingStreamCapsule(std::move(aCapsule), id, 300 id.StreamType()); 301 } else { 302 RefPtr<Http2WebTransportStream> stream = mOutgoingStreams.Get(id); 303 if (!stream) { 304 LOG( 305 ("Http2WebTransportSessionImpl::OnCapsule - " 306 "stream not found " 307 "stream_id=0x%" PRIx64 " [this=%p].", 308 static_cast<uint64_t>(id), this)); 309 return false; 310 } 311 if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) { 312 return false; 313 } 314 } 315 break; 316 } 317 case CapsuleType::WT_STREAM_FIN: 318 LOG(("Handling WT_STREAM_FIN\n")); 319 break; 320 case CapsuleType::WT_MAX_DATA: { 321 LOG(("Handling WT_MAX_DATA\n")); 322 WebTransportMaxDataCapsule& maxData = 323 aCapsule.GetWebTransportMaxDataCapsule(); 324 mSessionDataFc.Update(maxData.mMaxDataSize); 325 } break; 326 case CapsuleType::WT_MAX_STREAM_DATA: { 327 WebTransportMaxStreamDataCapsule& maxStreamData = 328 aCapsule.GetWebTransportMaxStreamDataCapsule(); 329 StreamId id = StreamId(maxStreamData.mID); 330 if (!HandleMaxStreamDataCapsule(id, std::move(aCapsule))) { 331 return false; 332 } 333 } break; 334 case CapsuleType::WT_MAX_STREAMS_BIDI: { 335 LOG(("Handling WT_MAX_STREAMS_BIDI\n")); 336 WebTransportMaxStreamsCapsule& maxStreams = 337 aCapsule.GetWebTransportMaxStreamsCapsule(); 338 mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update( 339 maxStreams.mLimit); 340 ProcessPendingStreamCallbacks(mBidiPendingStreamCallbacks, 341 WebTransportStreamType::BiDi); 342 break; 343 } 344 case CapsuleType::WT_MAX_STREAMS_UNIDI: { 345 LOG(("Handling WT_MAX_STREAMS_UNIDI\n")); 346 WebTransportMaxStreamsCapsule& maxStreams = 347 aCapsule.GetWebTransportMaxStreamsCapsule(); 348 mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update( 349 maxStreams.mLimit); 350 ProcessPendingStreamCallbacks(mUnidiPendingStreamCallbacks, 351 WebTransportStreamType::UniDi); 352 break; 353 } 354 case CapsuleType::WT_DATA_BLOCKED: 355 LOG(("Handling WT_DATA_BLOCKED\n")); 356 break; 357 case CapsuleType::WT_STREAM_DATA_BLOCKED: 358 LOG(("Handling WT_STREAM_DATA_BLOCKED\n")); 359 break; 360 case CapsuleType::WT_STREAMS_BLOCKED_BIDI: 361 LOG(("Handling WT_STREAMS_BLOCKED_BIDI\n")); 362 break; 363 case CapsuleType::WT_STREAMS_BLOCKED_UNIDI: 364 LOG(("Handling WT_STREAMS_BLOCKED_UNIDI\n")); 365 break; 366 case CapsuleType::DATAGRAM: { 367 LOG(("Handling DATAGRAM\n")); 368 WebTransportDatagramCapsule& datagram = 369 aCapsule.GetWebTransportDatagramCapsule(); 370 if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener = 371 do_QueryInterface(mListener)) { 372 listener->OnDatagramReceivedInternal(std::move(datagram.mPayload)); 373 } 374 break; 375 } 376 default: 377 LOG(("Unhandled capsule type\n")); 378 break; 379 } 380 return true; 381 } 382 383 already_AddRefed<Http2WebTransportStream> 384 Http2WebTransportSessionImpl::GetStream(StreamId aId) { 385 RefPtr<Http2WebTransportStream> stream; 386 if (aId.IsClientInitiated()) { 387 stream = mOutgoingStreams.Get(aId); 388 } else { 389 stream = mIncomingStreams.Get(aId); 390 } 391 392 if (!stream) { 393 LOG( 394 ("Http2WebTransportSessionImpl::GetStream - " 395 "stream not found " 396 "stream_id=0x%" PRIx64 " [this=%p].", 397 static_cast<uint64_t>(aId), this)); 398 return nullptr; 399 } 400 401 return stream.forget(); 402 } 403 404 bool Http2WebTransportSessionImpl::HandleMaxStreamDataCapsule( 405 StreamId aId, Capsule&& aCapsule) { 406 RefPtr<Http2WebTransportStream> stream = GetStream(aId); 407 if (!stream) { 408 return false; 409 } 410 411 if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) { 412 return false; 413 } 414 415 return true; 416 } 417 418 bool Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule( 419 StreamId aId, Capsule&& aCapsule) { 420 RefPtr<Http2WebTransportStream> stream = GetStream(aId); 421 if (!stream) { 422 return false; 423 } 424 425 stream->OnStopSending(); 426 427 WebTransportStopSendingCapsule& stopSending = 428 aCapsule.GetWebTransportStopSendingCapsule(); 429 430 LOG( 431 ("Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule %p " 432 "aID=%" PRIu64 " error=%" PRIu64, 433 this, (uint64_t)aId, stopSending.mErrorCode)); 434 435 uint8_t wtError = Http3ErrorToWebTransportError(stopSending.mErrorCode); 436 nsresult rv = GetNSResultFromWebTransportError(wtError); 437 if (mListener) { 438 mListener->OnStopSending(aId, rv); 439 } 440 return true; 441 } 442 443 bool Http2WebTransportSessionImpl::HandleStreamResetCapsule( 444 StreamId aId, Capsule&& aCapsule) { 445 RefPtr<Http2WebTransportStream> stream = GetStream(aId); 446 if (!stream) { 447 return false; 448 } 449 450 WebTransportResetStreamCapsule& reset = 451 aCapsule.GetWebTransportResetStreamCapsule(); 452 453 stream->OnReset(reset.mReliableSize); 454 455 uint8_t wtError = Http3ErrorToWebTransportError(reset.mErrorCode); 456 nsresult rv = GetNSResultFromWebTransportError(wtError); 457 if (mListener) { 458 mListener->OnResetReceived(aId, rv); 459 } 460 461 return true; 462 } 463 464 void Http2WebTransportSessionImpl::OnStreamDataSent(StreamId aId, 465 size_t aCount) { 466 RefPtr<Http2WebTransportStream> stream = GetStream(aId); 467 if (!stream) { 468 return; 469 } 470 471 stream->OnStreamDataSent(aCount); 472 } 473 474 void Http2WebTransportSessionImpl::OnError(uint64_t aError) { 475 LOG(("Http2WebTransportSessionImpl::OnError %p aError=%" PRIu64, this, 476 aError)); 477 // To be implemented. 478 } 479 480 bool Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule( 481 Capsule&& aCapsule, StreamId aID, WebTransportStreamType aStreamType) { 482 LOG( 483 ("Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule %p " 484 "aID=%" PRIu64 " type:%s", 485 this, (uint64_t)aID, 486 aStreamType == WebTransportStreamType::BiDi ? "BiDi" : "UniDi")); 487 RefPtr<Http2WebTransportStream> stream = mIncomingStreams.Get(aID); 488 if (stream) { 489 return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule))); 490 } 491 492 while (true) { 493 auto res = mRemoteStreamsFlowControl[aStreamType].IsNewStream(aID); 494 if (res.isErr() || !res.unwrap()) { 495 break; 496 } 497 498 StreamId newStreamID = 499 mRemoteStreamsFlowControl[aStreamType].TakeStreamId(); 500 stream = new Http2WebTransportStream( 501 this, 502 aStreamType == WebTransportStreamType::BiDi 503 ? mSettings.mInitialMaxStreamDataBidi 504 : 0, 505 aStreamType == WebTransportStreamType::BiDi 506 ? mSettings.mInitialLocalMaxStreamDataBidi 507 : mSettings.mInitialLocalMaxStreamDataUnidi, 508 newStreamID); 509 if (NS_FAILED(stream->Init())) { 510 return false; 511 } 512 mIncomingStreams.InsertOrUpdate(newStreamID, stream); 513 if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener = 514 do_QueryInterface(mListener)) { 515 listener->OnIncomingStreamAvailableInternal(stream); 516 } 517 } 518 519 stream = mIncomingStreams.Get(aID); 520 if (stream) { 521 return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule))); 522 } 523 524 return true; 525 } 526 527 void Http2WebTransportSessionImpl::OnCapsuleParseFailure(nsresult aError) { 528 mHandler->OnCapsuleParseFailure(aError); 529 } 530 531 NS_IMPL_ISUPPORTS_INHERITED(Http2WebTransportSession, Http2StreamTunnel, 532 nsIOutputStreamCallback, nsIInputStreamCallback) 533 534 Http2WebTransportSession::Http2WebTransportSession( 535 Http2Session* aSession, int32_t aPriority, uint64_t aBcId, 536 nsHttpConnectionInfo* aConnectionInfo, 537 Http2WebTransportInitialSettings aSettings) 538 : Http2StreamTunnel(aSession, aPriority, aBcId, aConnectionInfo), 539 mImpl(MakeRefPtr<Http2WebTransportSessionImpl>(this, aSettings)), 540 mCapsuleParser(MakeUnique<CapsuleParser>(mImpl)) { 541 LOG(("Http2WebTransportSession ctor:%p", this)); 542 } 543 544 Http2WebTransportSession::~Http2WebTransportSession() { 545 LOG(("Http2WebTransportSession dtor:%p", this)); 546 } 547 548 void Http2WebTransportSession::CloseStream(nsresult aReason) { 549 LOG(("Http2WebTransportSession::CloseStream this=%p aReason=%x", this, 550 static_cast<uint32_t>(aReason))); 551 if (mTransaction) { 552 mTransaction->Close(aReason); 553 mTransaction = nullptr; 554 } 555 556 mInput->AsyncWait(nullptr, 0, 0, nullptr); 557 mOutput->AsyncWait(nullptr, 0, 0, nullptr); 558 Http2StreamTunnel::CloseStream(aReason); 559 mCapsuleParser = nullptr; 560 if (mImpl) { 561 mImpl->Close(aReason); 562 mImpl = nullptr; 563 } 564 } 565 566 nsresult Http2WebTransportSession::GenerateHeaders(nsCString& aCompressedData, 567 uint8_t& aFirstFrameFlags) { 568 nsHttpRequestHead* head = mTransaction->RequestHead(); 569 570 nsAutoCString authorityHeader; 571 nsresult rv = head->GetHeader(nsHttp::Host, authorityHeader); 572 NS_ENSURE_SUCCESS(rv, rv); 573 574 RefPtr<Http2Session> session = Session(); 575 LOG3(("Http2WebTransportSession %p Stream ID 0x%X [session=%p] for %s\n", 576 this, mStreamID, session.get(), authorityHeader.get())); 577 578 nsAutoCString path; 579 head->Path(path); 580 581 rv = session->Compressor()->EncodeHeaderBlock( 582 mFlatHttpRequestHeaders, "CONNECT"_ns, path, authorityHeader, "https"_ns, 583 "webtransport"_ns, false, aCompressedData, true); 584 NS_ENSURE_SUCCESS(rv, rv); 585 586 mRequestBodyLenRemaining = 0x0fffffffffffffffULL; 587 588 if (mInput) { 589 mInput->AsyncWait(this, 0, 0, nullptr); 590 } 591 return NS_OK; 592 } 593 594 NS_IMETHODIMP 595 Http2WebTransportSession::OnInputStreamReady(nsIAsyncInputStream* aIn) { 596 char buffer[nsIOService::gDefaultSegmentSize]; 597 uint32_t remainingCapacity = sizeof(buffer); 598 uint32_t read = 0; 599 600 while (remainingCapacity > 0) { 601 uint32_t count = 0; 602 nsresult rv = mInput->Read(buffer + read, remainingCapacity, &count); 603 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 604 break; 605 } 606 607 if (NS_FAILED(rv)) { 608 LOG(("Http2WebTransportSession::OnInputStreamReady %p failed 0x%x\n", 609 this, static_cast<uint32_t>(rv))); 610 // TODO: close connection 611 return rv; 612 } 613 614 // base stream closed 615 if (count == 0) { 616 LOG(( 617 "Http2WebTransportSession::OnInputStreamReady %p connection closed\n", 618 this)); 619 // Close with NS_BASE_STREAM_CLOSED 620 return NS_OK; 621 } 622 623 remainingCapacity -= count; 624 read += count; 625 } 626 627 if (read > 0) { 628 Http2Session::LogIO(nullptr, this, "Http2WebTransportSession", buffer, 629 read); 630 631 mCapsuleParser->ProcessCapsuleData(reinterpret_cast<uint8_t*>(buffer), 632 read); 633 } 634 635 mInput->AsyncWait(this, 0, 0, nullptr); 636 return NS_OK; 637 } 638 639 NS_IMETHODIMP 640 Http2WebTransportSession::OnOutputStreamReady(nsIAsyncOutputStream* aOut) { 641 if (!mCurrentOutCapsule) { 642 mImpl->PrepareCapsulesToSend(mOutgoingQueue); 643 if (mOutgoingQueue.IsEmpty()) { 644 return NS_OK; 645 } 646 mCurrentOutCapsule = mOutgoingQueue.Pop(); 647 } 648 649 while (mCurrentOutCapsule && mOutput) { 650 auto buffer = mCurrentOutCapsule->GetBuffer(); 651 const char* writeBuffer = 652 reinterpret_cast<const char*>(buffer.Elements()) + mWriteOffset; 653 uint32_t toWrite = buffer.Length() - mWriteOffset; 654 655 uint32_t wrote = 0; 656 nsresult rv = mOutput->Write(writeBuffer, toWrite, &wrote); 657 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 658 mOutput->AsyncWait(this, 0, 0, nullptr); 659 return NS_OK; 660 } 661 662 if (NS_FAILED(rv)) { 663 LOG(("Http2WebTransportSession::OnOutputStreamReady %p failed %u\n", this, 664 static_cast<uint32_t>(rv))); 665 // TODO: close connection 666 return NS_OK; 667 } 668 669 mWriteOffset += wrote; 670 671 Maybe<StreamMetadata> metadata = mCurrentOutCapsule->GetStreamMetadata(); 672 // This is a WT_STREAM_DATA capsule, so we need to track how many bytes of 673 // stream data are sent. 674 if (metadata) { 675 if (mWriteOffset > metadata->mStartOfData) { 676 uint64_t dataSent = mWriteOffset - metadata->mStartOfData; 677 mImpl->OnStreamDataSent(StreamId(metadata->mID), dataSent); 678 } 679 } 680 681 if (toWrite == wrote) { 682 mWriteOffset = 0; 683 mCurrentOutCapsule = 684 mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop(); 685 } 686 } 687 688 return NS_OK; 689 } 690 691 void Http2WebTransportSession::HasCapsuleToSend() { 692 LOG(("Http2WebTransportSession::HasCapsuleToSend %p mSendClosed=%d", this, 693 mSendClosed)); 694 if (mSendClosed) { 695 return; 696 } 697 698 mImpl->PrepareCapsulesToSend(mOutgoingQueue); 699 700 if (mOutput) { 701 OnOutputStreamReady(mOutput); 702 } 703 } 704 705 void Http2WebTransportSession::SetSentFin() { 706 Http2StreamTunnel::SetSentFin(true); 707 } 708 709 void Http2WebTransportSession::StartReading() { 710 if (mInput) { 711 mInput->AsyncWait(this, 0, 0, nullptr); 712 } 713 } 714 715 void Http2WebTransportSession::OnCapsuleParseFailure(nsresult aError) { 716 LOG(("Http2WebTransportSession::OnCapsuleParseFailure %p aError=%" PRIX32, 717 this, static_cast<uint32_t>(aError))); 718 } 719 720 } // namespace mozilla::net