Http2WebTransportStream.cpp (15198B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 // HttpLog.h should generally be included first 7 #include "HttpLog.h" 8 9 #include <algorithm> 10 #include "Http2WebTransportStream.h" 11 #include "Http2WebTransportSession.h" 12 #include "Capsule.h" 13 #include "CapsuleEncoder.h" 14 #include "nsIOService.h" 15 16 namespace mozilla::net { 17 18 NS_IMPL_ISUPPORTS(Http2WebTransportStream, nsIOutputStreamCallback, 19 nsIInputStreamCallback) 20 21 Http2WebTransportStream::Http2WebTransportStream( 22 Http2WebTransportSessionImpl* aWebTransportSession, StreamId aStreamId, 23 uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData, 24 std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&& 25 aCallback) 26 : WebTransportStreamBase(aWebTransportSession->GetStreamId(), 27 std::move(aCallback)), 28 mWebTransportSession(aWebTransportSession), 29 mStreamId(aStreamId), 30 mOwnerThread(GetCurrentSerialEventTarget()), 31 mFc(aStreamId, aInitialMaxStreamData), 32 mReceiverFc(aStreamId, aInitialLocalMaxStreamData) { 33 LOG(("Http2WebTransportStream outgoing ctor:%p", this)); 34 mStreamRole = OUTGOING; 35 mStreamType = mStreamId.StreamType(); 36 } 37 38 Http2WebTransportStream::Http2WebTransportStream( 39 Http2WebTransportSessionImpl* aWebTransportSession, 40 uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData, 41 StreamId aStreamId) 42 : WebTransportStreamBase(aWebTransportSession->GetStreamId(), nullptr), 43 mWebTransportSession(aWebTransportSession), 44 mStreamId(aStreamId), 45 mOwnerThread(GetCurrentSerialEventTarget()), 46 mFc(aStreamId, aInitialMaxStreamData), 47 mReceiverFc(aStreamId, aInitialLocalMaxStreamData) { 48 LOG(("Http2WebTransportStream incoming ctor:%p", this)); 49 mStreamRole = INCOMING; 50 mStreamType = mStreamId.StreamType(); 51 } 52 53 Http2WebTransportStream::~Http2WebTransportStream() { 54 LOG(("Http2WebTransportStream dtor:%p", this)); 55 } 56 57 nsresult Http2WebTransportStream::Init() { 58 nsresult rv = NS_OK; 59 auto resultCallback = MakeScopeExit([&] { 60 if (NS_FAILED(rv)) { 61 mSendState = SEND_DONE; 62 mRecvState = RECV_DONE; 63 if (mStreamReadyCallback) { 64 mStreamReadyCallback(Err(rv)); 65 } 66 67 } else { 68 mSocketInCondition = NS_OK; 69 mSocketOutCondition = NS_OK; 70 RefPtr<WebTransportStreamBase> stream = this; 71 if (mStreamReadyCallback) { 72 mStreamReadyCallback(stream); 73 } 74 } 75 mStreamReadyCallback = nullptr; 76 }); 77 78 if (mStreamRole == INCOMING) { 79 rv = InitInputPipe(); 80 if (NS_FAILED(rv)) { 81 return rv; 82 } 83 84 if (mStreamType == WebTransportStreamType::BiDi) { 85 rv = InitOutputPipe(); 86 } 87 88 return rv; 89 } 90 91 MOZ_ASSERT(mStreamRole == OUTGOING); 92 rv = InitOutputPipe(); 93 if (NS_FAILED(rv)) { 94 return rv; 95 } 96 97 if (mStreamType == WebTransportStreamType::BiDi) { 98 rv = InitInputPipe(); 99 } 100 101 if (mSendStreamPipeIn) { 102 rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); 103 } 104 105 return rv; 106 } 107 108 class StreamId Http2WebTransportStream::WebTransportStreamId() const { 109 return mStreamId; 110 } 111 112 uint64_t Http2WebTransportStream::GetStreamId() const { return mStreamId; } 113 114 void Http2WebTransportStream::SendStopSending(uint8_t aErrorCode) { 115 if (mSentStopSending || !mWebTransportSession) { 116 // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.3 117 // A WT_STOP_SENDING capsule MUST NOT be sent multiple times for the same 118 // stream. 119 return; 120 } 121 122 mSentStopSending = true; 123 mStopSendingCapsule.emplace( 124 Capsule::WebTransportStopSending(aErrorCode, mStreamId)); 125 mWebTransportSession->StreamHasCapsuleToSend(); 126 mRecvState = RECV_DONE; 127 } 128 129 void Http2WebTransportStream::SendFin() {} 130 131 void Http2WebTransportStream::Reset(uint64_t aErrorCode) { 132 if (mSentReset || !mWebTransportSession || mSendState == SEND_DONE) { 133 // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2 134 // A WT_RESET_STREAM capsule MUST NOT be sent after a stream is closed or 135 // reset. 136 return; 137 } 138 139 mSentReset = true; 140 mStreamResetCapsule.emplace(Capsule::WebTransportResetStream( 141 aErrorCode, mTotalSent.value(), mStreamId)); 142 mWebTransportSession->StreamHasCapsuleToSend(); 143 mRecvState = RECV_DONE; 144 mSendState = SEND_DONE; 145 } 146 147 already_AddRefed<nsIWebTransportSendStreamStats> 148 Http2WebTransportStream::GetSendStreamStats() { 149 return nullptr; 150 } 151 152 already_AddRefed<nsIWebTransportReceiveStreamStats> 153 Http2WebTransportStream::GetReceiveStreamStats() { 154 return nullptr; 155 } 156 157 bool Http2WebTransportStream::RecvDone() const { return false; } 158 159 void Http2WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {} 160 161 NS_IMETHODIMP 162 Http2WebTransportStream::OnInputStreamReady(nsIAsyncInputStream* aIn) { 163 LOG1( 164 ("Http2WebTransportStream::OnInputStreamReady [this=%p stream=%p " 165 "state=%d]", 166 this, aIn, mSendState)); 167 if (mSendState == SEND_DONE) { 168 // already closed 169 return NS_OK; 170 } 171 172 uint32_t sendBytes = 0; 173 return mSendStreamPipeIn->ReadSegments( 174 ReadRequestSegment, this, nsIOService::gDefaultSegmentSize, &sendBytes); 175 } 176 177 NS_IMETHODIMP 178 Http2WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOut) { 179 if (!mCurrentOut) { 180 if (mOutgoingQueue.IsEmpty()) { 181 return NS_OK; 182 } 183 mCurrentOut = mOutgoingQueue.Pop(); 184 } 185 186 while (mCurrentOut && mReceiveStreamPipeOut && (mRecvState != RECV_DONE)) { 187 char* writeBuffer = reinterpret_cast<char*>(const_cast<uint8_t*>( 188 mCurrentOut->GetData().Elements())) + 189 mWriteOffset; 190 uint32_t toWrite = mCurrentOut->GetData().Length() - mWriteOffset; 191 if (mReliableSize) { 192 if (mTotalReceived + toWrite > *mReliableSize) { 193 toWrite = *mReliableSize - mTotalReceived; 194 } 195 } 196 197 uint32_t wrote = 0; 198 nsresult rv = mReceiveStreamPipeOut->Write(writeBuffer, toWrite, &wrote); 199 LOG(("Http2WebTransportStream::Write rv=0x%" PRIx32 " wrote=%" PRIu32 200 " socketin=%" PRIx32 " [this=%p]", 201 static_cast<uint32_t>(rv), wrote, 202 static_cast<uint32_t>(mSocketInCondition), this)); 203 if (rv == NS_BASE_STREAM_WOULD_BLOCK) { 204 mSocketInCondition = 205 mReceiveStreamPipeOut->AsyncWait(this, 0, 0, nullptr); 206 return mSocketInCondition; 207 } 208 209 if (NS_FAILED(rv)) { 210 LOG(("Http2WebTransportStream::OnOutputStreamReady %p failed %u\n", this, 211 static_cast<uint32_t>(rv))); 212 // TODO: close this stream 213 mSocketInCondition = rv; 214 mCurrentOut = nullptr; 215 mRecvState = RECV_DONE; 216 return NS_OK; 217 } 218 219 // Retire when sending data to the consumer. 220 mReceiverFc.AddRetired(wrote); 221 mWebTransportSession->ReceiverFc().AddRetired(wrote); 222 223 mWriteOffset += wrote; 224 mTotalReceived += wrote; 225 // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2 226 // A receiver of a WT_RESET_STREAM capsule can discard any data in excess of 227 // the Reliable Size indicated, even if that data was already received. 228 if (mReliableSize && mTotalReceived == *mReliableSize) { 229 mSocketInCondition = NS_OK; 230 mWriteOffset = 0; 231 mCurrentOut = nullptr; 232 mOutgoingQueue.Clear(); 233 mRecvState = RECV_DONE; 234 break; 235 } 236 237 if (toWrite == wrote) { 238 mWriteOffset = 0; 239 mCurrentOut = mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop(); 240 } 241 } 242 return NS_OK; 243 } 244 245 // static 246 nsresult Http2WebTransportStream::ReadRequestSegment( 247 nsIInputStream* stream, void* closure, const char* buf, uint32_t offset, 248 uint32_t count, uint32_t* countRead) { 249 Http2WebTransportStream* wtStream = (Http2WebTransportStream*)closure; 250 LOG(("Http2WebTransportStream::ReadRequestSegment %p count=%u", wtStream, 251 count)); 252 *countRead = 0; 253 if (!wtStream->mWebTransportSession) { 254 return NS_ERROR_UNEXPECTED; 255 } 256 257 uint64_t limit = 258 std::min(wtStream->mWebTransportSession->SessionDataFc().Available(), 259 wtStream->mFc.Available()); 260 if (limit < count) { 261 if (wtStream->mWebTransportSession->SessionDataFc().Available() < count) { 262 LOG(("blocked by session level flow control")); 263 wtStream->mWebTransportSession->SessionDataFc().Blocked(); 264 } 265 if (wtStream->mFc.Available() < count) { 266 LOG(("blocked by stream level flow control")); 267 wtStream->mFc.Blocked(); 268 } 269 return NS_BASE_STREAM_WOULD_BLOCK; 270 } 271 272 nsTArray<uint8_t> data; 273 data.AppendElements(buf, count); 274 Capsule capsule = Capsule::WebTransportStreamData(wtStream->mStreamId, false, 275 std::move(data)); 276 UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>(); 277 encoder->EncodeCapsule(capsule); 278 wtStream->mCapsuleQueue.Push(std::move(encoder)); 279 *countRead = count; 280 return NS_OK; 281 } 282 283 void Http2WebTransportStream::TakeOutputCapsule( 284 mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) { 285 LOG(("Http2WebTransportStream::TakeOutputCapsule %p", this)); 286 if (mCapsuleQueue.IsEmpty()) { 287 mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); 288 return; 289 } 290 while (!mCapsuleQueue.IsEmpty()) { 291 UniquePtr<CapsuleEncoder> entry = mCapsuleQueue.Pop(); 292 aOutput.Push(std::move(entry)); 293 } 294 mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); 295 } 296 297 void Http2WebTransportStream::WriteMaintenanceCapsules( 298 mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) { 299 if (mStopSendingCapsule) { 300 UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>(); 301 encoder->EncodeCapsule(*mStopSendingCapsule); 302 mStopSendingCapsule = Nothing(); 303 aOutput.Push(std::move(encoder)); 304 } 305 306 if (mStreamResetCapsule) { 307 UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>(); 308 encoder->EncodeCapsule(*mStreamResetCapsule); 309 mStreamResetCapsule = Nothing(); 310 aOutput.Push(std::move(encoder)); 311 } 312 313 auto dataBlocked = mFc.CreateStreamDataBlockedCapsule(); 314 if (dataBlocked) { 315 aOutput.Push(MakeUnique<CapsuleEncoder>(dataBlocked.ref())); 316 } 317 318 auto maxStreamData = mReceiverFc.CreateMaxStreamDataCapsule(); 319 if (maxStreamData) { 320 aOutput.Push(MakeUnique<CapsuleEncoder>(maxStreamData.ref())); 321 } 322 323 // Keep reading data from the consumer. 324 mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); 325 } 326 327 nsresult Http2WebTransportStream::OnCapsule(Capsule&& aCapsule) { 328 switch (aCapsule.Type()) { 329 case CapsuleType::WT_STREAM: { 330 LOG(("Handling WT_STREAM\n")); 331 WebTransportStreamDataCapsule& streamData = 332 aCapsule.GetWebTransportStreamDataCapsule(); 333 return HandleStreamData(false, std::move(streamData.mData)); 334 } 335 case CapsuleType::WT_STREAM_FIN: 336 LOG(("Handling WT_STREAM_FIN\n")); 337 break; 338 case CapsuleType::WT_MAX_STREAM_DATA: { 339 LOG(("Handling WT_MAX_STREAM_DATA\n")); 340 WebTransportMaxStreamDataCapsule& maxStreamData = 341 aCapsule.GetWebTransportMaxStreamDataCapsule(); 342 return HandleMaxStreamData(maxStreamData.mLimit); 343 } 344 case CapsuleType::WT_STREAM_DATA_BLOCKED: 345 LOG(("Handling WT_STREAM_DATA_BLOCKED\n")); 346 break; 347 default: 348 LOG(("Unhandled capsule type\n")); 349 break; 350 } 351 return NS_OK; 352 } 353 354 nsresult Http2WebTransportStream::HandleMaxStreamData(uint64_t aLimit) { 355 mFc.Update(aLimit); 356 return NS_OK; 357 } 358 359 void Http2WebTransportStream::OnStopSending() { mSendState = SEND_DONE; } 360 361 void Http2WebTransportStream::OnReset(uint64_t aSize) { 362 if (mReliableSize) { 363 return; 364 } 365 366 mReliableSize.emplace(aSize); 367 368 LOG(("Http2WebTransportStream::OnReset %p mReliableSize=%" PRIu64 369 " mTotalReceived=%" PRIu64, 370 this, *mReliableSize, mTotalReceived)); 371 if (*mReliableSize < mTotalReceived) { 372 // A receiver MUST treat the receipt of a WT_RESET_STREAM with a Reliable 373 // Size smaller than the number of bytes it has received on the stream as a 374 // session error. 375 // TODO: find a better error code. 376 mWebTransportSession->OnError(0); 377 } 378 } 379 380 void Http2WebTransportStream::OnStreamDataSent(size_t aCount) { 381 LOG(("Http2WebTransportStream::OnStreamDataSent %p aCount=%" PRIu64 382 " mTotalSent=%" PRIu64, 383 this, static_cast<uint64_t>(aCount), mTotalSent.value())); 384 mTotalSent += aCount; 385 if (!mTotalSent.isValid()) { 386 // TODO: find a better error code. 387 mWebTransportSession->OnError(0); 388 return; 389 } 390 391 mFc.Consume(aCount); 392 mWebTransportSession->SessionDataFc().Consume(aCount); 393 } 394 395 void Http2WebTransportStream::Close(nsresult aResult) { 396 if (mSendStreamPipeIn) { 397 mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr); 398 mSendStreamPipeIn->CloseWithStatus(aResult); 399 } 400 if (mReceiveStreamPipeOut) { 401 mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr); 402 mReceiveStreamPipeOut->CloseWithStatus(aResult); 403 } 404 mSendState = SEND_DONE; 405 mRecvState = RECV_DONE; 406 mWebTransportSession = nullptr; 407 } 408 409 nsresult Http2WebTransportStream::HandleStreamData(bool aFin, 410 nsTArray<uint8_t>&& aData) { 411 LOG(("Http2WebTransportStream::HandleStreamData [this=%p, state=%d aFin=%d", 412 this, static_cast<uint32_t>(mRecvState), aFin)); 413 414 if (NS_FAILED(mSocketInCondition)) { 415 mRecvState = RECV_DONE; 416 } 417 418 uint32_t countWrittenSingle = 0; 419 switch (mRecvState) { 420 case READING: { 421 size_t length = aData.Length(); 422 if (length) { 423 auto newConsumed = 424 mReceiverFc.SetConsumed(mReceiverFc.Consumed() + length); 425 if (newConsumed.isErr()) { 426 mSocketInCondition = newConsumed.unwrapErr(); 427 } else { 428 if (!mWebTransportSession->ReceiverFc().Consume( 429 newConsumed.unwrap())) { 430 LOG(("Exceed session flow control limit")); 431 mSocketInCondition = NS_ERROR_NOT_AVAILABLE; 432 } else { 433 mOutgoingQueue.Push(MakeUnique<StreamData>(std::move(aData))); 434 mSocketInCondition = OnOutputStreamReady(mReceiveStreamPipeOut); 435 } 436 } 437 } else { 438 // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-10.html#section-6.4 439 // Empty WT_STREAM capsules MUST NOT be used unless they open or close a 440 // stream 441 // TODO: Handle empty stream capsule 442 } 443 444 LOG(( 445 "Http2WebTransportStream::HandleStreamData " 446 "countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]", 447 countWrittenSingle, static_cast<uint32_t>(mSocketInCondition), this)); 448 449 if (NS_FAILED(mSocketInCondition)) { 450 mReceiveStreamPipeOut->Close(); 451 mRecvState = RECV_DONE; 452 } else { 453 if (aFin) { 454 mRecvState = RECEIVED_FIN; 455 } 456 } 457 } break; 458 case RECEIVED_FIN: 459 mRecvState = RECV_DONE; 460 break; 461 case RECV_DONE: 462 mSocketInCondition = NS_ERROR_UNEXPECTED; 463 break; 464 default: 465 mSocketInCondition = NS_ERROR_UNEXPECTED; 466 break; 467 } 468 469 return mSocketInCondition; 470 } 471 472 } // namespace mozilla::net