WebTransportStreamProxy.cpp (12815B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "WebTransportStreamProxy.h" 7 8 #include "WebTransportLog.h" 9 #include "nsProxyRelease.h" 10 #include "nsSocketTransportService2.h" 11 12 namespace mozilla::net { 13 14 NS_IMPL_ADDREF(WebTransportStreamProxy) 15 NS_IMPL_RELEASE(WebTransportStreamProxy) 16 17 NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy) 18 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIWebTransportReceiveStream) 19 NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream) 20 NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream) 21 NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream) 22 NS_INTERFACE_MAP_END 23 24 WebTransportStreamProxy::WebTransportStreamProxy( 25 WebTransportStreamBase* aStream) 26 : mWebTransportStream(aStream) { 27 nsCOMPtr<nsIAsyncInputStream> inputStream; 28 nsCOMPtr<nsIAsyncOutputStream> outputStream; 29 mWebTransportStream->GetWriterAndReader(getter_AddRefs(outputStream), 30 getter_AddRefs(inputStream)); 31 if (outputStream) { 32 mWriter = new AsyncOutputStreamWrapper(outputStream); 33 } 34 if (inputStream) { 35 mReader = new AsyncInputStreamWrapper(inputStream, mWebTransportStream); 36 } 37 } 38 39 WebTransportStreamProxy::~WebTransportStreamProxy() { 40 // mWebTransportStream needs to be destroyed on the socket thread. 41 NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy", 42 gSocketTransportService, mWebTransportStream.forget()); 43 } 44 45 NS_IMETHODIMP WebTransportStreamProxy::SendStopSending(uint8_t aError) { 46 if (!OnSocketThread()) { 47 RefPtr<WebTransportStreamProxy> self(this); 48 return gSocketTransportService->Dispatch( 49 NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending", 50 [self{std::move(self)}, error(aError)]() { 51 self->SendStopSending(error); 52 })); 53 } 54 55 mWebTransportStream->SendStopSending(aError); 56 return NS_OK; 57 } 58 59 NS_IMETHODIMP WebTransportStreamProxy::SendFin(void) { 60 if (!mWriter) { 61 return NS_ERROR_UNEXPECTED; 62 } 63 64 mWriter->Close(); 65 66 if (!OnSocketThread()) { 67 RefPtr<WebTransportStreamProxy> self(this); 68 return gSocketTransportService->Dispatch(NS_NewRunnableFunction( 69 "WebTransportStreamProxy::SendFin", 70 [self{std::move(self)}]() { self->mWebTransportStream->SendFin(); })); 71 } 72 73 mWebTransportStream->SendFin(); 74 return NS_OK; 75 } 76 77 NS_IMETHODIMP WebTransportStreamProxy::Reset(uint8_t aErrorCode) { 78 if (!mWriter) { 79 return NS_ERROR_UNEXPECTED; 80 } 81 82 mWriter->Close(); 83 84 if (!OnSocketThread()) { 85 RefPtr<WebTransportStreamProxy> self(this); 86 return gSocketTransportService->Dispatch( 87 NS_NewRunnableFunction("WebTransportStreamProxy::Reset", 88 [self{std::move(self)}, error(aErrorCode)]() { 89 self->mWebTransportStream->Reset(error); 90 })); 91 } 92 93 mWebTransportStream->Reset(aErrorCode); 94 return NS_OK; 95 } 96 97 namespace { 98 99 class StatsCallbackWrapper : public nsIWebTransportStreamStatsCallback { 100 public: 101 NS_DECL_THREADSAFE_ISUPPORTS 102 103 explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback* aCallback) 104 : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()) {} 105 106 NS_IMETHOD OnSendStatsAvailable( 107 nsIWebTransportSendStreamStats* aStats) override { 108 if (!mTarget->IsOnCurrentThread()) { 109 RefPtr<StatsCallbackWrapper> self(this); 110 nsCOMPtr<nsIWebTransportSendStreamStats> stats = aStats; 111 (void)mTarget->Dispatch(NS_NewRunnableFunction( 112 "StatsCallbackWrapper::OnSendStatsAvailable", 113 [self{std::move(self)}, stats{std::move(stats)}]() { 114 self->OnSendStatsAvailable(stats); 115 })); 116 return NS_OK; 117 } 118 119 mCallback->OnSendStatsAvailable(aStats); 120 return NS_OK; 121 } 122 123 NS_IMETHOD OnReceiveStatsAvailable( 124 nsIWebTransportReceiveStreamStats* aStats) override { 125 if (!mTarget->IsOnCurrentThread()) { 126 RefPtr<StatsCallbackWrapper> self(this); 127 nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = aStats; 128 (void)mTarget->Dispatch(NS_NewRunnableFunction( 129 "StatsCallbackWrapper::OnReceiveStatsAvailable", 130 [self{std::move(self)}, stats{std::move(stats)}]() { 131 self->OnReceiveStatsAvailable(stats); 132 })); 133 return NS_OK; 134 } 135 136 mCallback->OnReceiveStatsAvailable(aStats); 137 return NS_OK; 138 } 139 140 private: 141 virtual ~StatsCallbackWrapper() { 142 NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget, 143 mCallback.forget()); 144 } 145 146 nsCOMPtr<nsIWebTransportStreamStatsCallback> mCallback; 147 nsCOMPtr<nsIEventTarget> mTarget; 148 }; 149 150 NS_IMPL_ISUPPORTS(StatsCallbackWrapper, nsIWebTransportStreamStatsCallback) 151 152 } // namespace 153 154 NS_IMETHODIMP WebTransportStreamProxy::GetSendStreamStats( 155 nsIWebTransportStreamStatsCallback* aCallback) { 156 if (!OnSocketThread()) { 157 RefPtr<WebTransportStreamProxy> self(this); 158 nsCOMPtr<nsIWebTransportStreamStatsCallback> callback = 159 new StatsCallbackWrapper(aCallback); 160 return gSocketTransportService->Dispatch(NS_NewRunnableFunction( 161 "WebTransportStreamProxy::GetSendStreamStats", 162 [self{std::move(self)}, callback{std::move(callback)}]() { 163 self->GetSendStreamStats(callback); 164 })); 165 } 166 167 nsCOMPtr<nsIWebTransportSendStreamStats> stats = 168 mWebTransportStream->GetSendStreamStats(); 169 aCallback->OnSendStatsAvailable(stats); 170 return NS_OK; 171 } 172 173 NS_IMETHODIMP WebTransportStreamProxy::GetReceiveStreamStats( 174 nsIWebTransportStreamStatsCallback* aCallback) { 175 if (!OnSocketThread()) { 176 RefPtr<WebTransportStreamProxy> self(this); 177 nsCOMPtr<nsIWebTransportStreamStatsCallback> callback = 178 new StatsCallbackWrapper(aCallback); 179 return gSocketTransportService->Dispatch(NS_NewRunnableFunction( 180 "WebTransportStreamProxy::GetReceiveStreamStats", 181 [self{std::move(self)}, callback{std::move(callback)}]() { 182 self->GetReceiveStreamStats(callback); 183 })); 184 } 185 186 nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = 187 mWebTransportStream->GetReceiveStreamStats(); 188 aCallback->OnReceiveStatsAvailable(stats); 189 return NS_OK; 190 } 191 192 NS_IMETHODIMP WebTransportStreamProxy::GetHasReceivedFIN( 193 bool* aHasReceivedFIN) { 194 *aHasReceivedFIN = mWebTransportStream->RecvDone(); 195 return NS_OK; 196 } 197 198 NS_IMETHODIMP WebTransportStreamProxy::GetInputStream( 199 nsIAsyncInputStream** aOut) { 200 if (!mReader) { 201 return NS_ERROR_NOT_AVAILABLE; 202 } 203 204 RefPtr<AsyncInputStreamWrapper> stream = mReader; 205 stream.forget(aOut); 206 return NS_OK; 207 } 208 209 NS_IMETHODIMP WebTransportStreamProxy::GetOutputStream( 210 nsIAsyncOutputStream** aOut) { 211 if (!mWriter) { 212 return NS_ERROR_NOT_AVAILABLE; 213 } 214 215 RefPtr<AsyncOutputStreamWrapper> stream = mWriter; 216 stream.forget(aOut); 217 return NS_OK; 218 } 219 220 NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) { 221 *aId = mWebTransportStream->GetStreamId(); 222 return NS_OK; 223 } 224 225 NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(Maybe<int64_t> aSendOrder) { 226 if (!OnSocketThread()) { 227 return gSocketTransportService->Dispatch(NS_NewRunnableFunction( 228 "SetSendOrder", [stream = mWebTransportStream, aSendOrder]() { 229 stream->SetSendOrder(aSendOrder); 230 })); 231 } 232 mWebTransportStream->SetSendOrder(aSendOrder); 233 return NS_OK; 234 } 235 236 //------------------------------------------------------------------------------ 237 // WebTransportStreamProxy::AsyncInputStreamWrapper 238 //------------------------------------------------------------------------------ 239 240 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper, 241 nsIInputStream, nsIAsyncInputStream) 242 243 WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper( 244 nsIAsyncInputStream* aStream, WebTransportStreamBase* aWebTransportStream) 245 : mStream(aStream), mWebTransportStream(aWebTransportStream) {} 246 247 WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() = 248 default; 249 250 void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() { 251 if (!mWebTransportStream->RecvDone()) { 252 return; 253 } 254 255 uint64_t available = 0; 256 (void)Available(&available); 257 if (available) { 258 // Don't close the InputStream if there's unread data available, since it 259 // would be lost. We exit above unless we know no more data will be received 260 // for the stream. 261 return; 262 } 263 264 LOG( 265 ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN " 266 "stream=%p", 267 mWebTransportStream.get())); 268 Close(); 269 } 270 271 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Close() { 272 return mStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); 273 } 274 275 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Available( 276 uint64_t* aAvailable) { 277 return mStream->Available(aAvailable); 278 } 279 280 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() { 281 return mStream->StreamStatus(); 282 } 283 284 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Read( 285 char* aBuf, uint32_t aCount, uint32_t* aResult) { 286 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this)); 287 nsresult rv = mStream->Read(aBuf, aCount, aResult); 288 MaybeCloseStream(); 289 return rv; 290 } 291 292 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments( 293 nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, 294 uint32_t* aResult) { 295 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p", 296 this)); 297 nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult); 298 if (*aResult > 0) { 299 LOG((" Read %u bytes", *aResult)); 300 } 301 MaybeCloseStream(); 302 return rv; 303 } 304 305 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking( 306 bool* aResult) { 307 return mStream->IsNonBlocking(aResult); 308 } 309 310 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus( 311 nsresult aStatus) { 312 return mStream->CloseWithStatus(aStatus); 313 } 314 315 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait( 316 nsIInputStreamCallback* aCallback, uint32_t aFlags, 317 uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { 318 return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget); 319 } 320 321 //------------------------------------------------------------------------------ 322 // WebTransportStreamProxy::AsyncOutputStreamWrapper 323 //------------------------------------------------------------------------------ 324 325 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper, 326 nsIOutputStream, nsIAsyncOutputStream) 327 328 WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper( 329 nsIAsyncOutputStream* aStream) 330 : mStream(aStream) {} 331 332 WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() = 333 default; 334 335 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() { 336 return mStream->Flush(); 337 } 338 339 NS_IMETHODIMP 340 WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() { 341 return mStream->StreamStatus(); 342 } 343 344 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write( 345 const char* aBuf, uint32_t aCount, uint32_t* aResult) { 346 LOG( 347 ("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes, " 348 "first byte %c", 349 this, aCount, aBuf[0])); 350 return mStream->Write(aBuf, aCount, aResult); 351 } 352 353 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom( 354 nsIInputStream* aFromStream, uint32_t aCount, uint32_t* aResult) { 355 return mStream->WriteFrom(aFromStream, aCount, aResult); 356 } 357 358 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments( 359 nsReadSegmentFun aReader, void* aClosure, uint32_t aCount, 360 uint32_t* aResult) { 361 return mStream->WriteSegments(aReader, aClosure, aCount, aResult); 362 } 363 364 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait( 365 nsIOutputStreamCallback* aCallback, uint32_t aFlags, 366 uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { 367 return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget); 368 } 369 370 NS_IMETHODIMP 371 WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus( 372 nsresult aStatus) { 373 return mStream->CloseWithStatus(aStatus); 374 } 375 376 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() { 377 return mStream->Close(); 378 } 379 380 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking( 381 bool* aResult) { 382 return mStream->IsNonBlocking(aResult); 383 } 384 385 } // namespace mozilla::net