nsAsyncStreamCopier.cpp (11597B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 #include "nsAsyncStreamCopier.h" 6 #include "nsComponentManagerUtils.h" 7 #include "nsIOService.h" 8 #include "nsIEventTarget.h" 9 #include "nsStreamUtils.h" 10 #include "nsThreadUtils.h" 11 #include "nsNetUtil.h" 12 #include "nsNetCID.h" 13 #include "nsIBufferedStreams.h" 14 #include "nsIRequestObserver.h" 15 #include "mozilla/Components.h" 16 #include "mozilla/Logging.h" 17 18 using namespace mozilla; 19 using namespace mozilla::net; 20 21 #undef LOG 22 // 23 // MOZ_LOG=nsStreamCopier:5 24 // 25 static LazyLogModule gStreamCopierLog("nsStreamCopier"); 26 #define LOG(args) MOZ_LOG(gStreamCopierLog, mozilla::LogLevel::Debug, args) 27 28 /** 29 * An event used to perform initialization off the main thread. 30 */ 31 class AsyncApplyBufferingPolicyEvent final : public Runnable { 32 public: 33 /** 34 * @param aCopier 35 * The nsAsyncStreamCopier requesting the information. 36 */ 37 explicit AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier) 38 : mozilla::Runnable("AsyncApplyBufferingPolicyEvent"), 39 mCopier(aCopier), 40 mTarget(GetCurrentSerialEventTarget()) {} 41 42 NS_IMETHOD Run() override { 43 nsresult rv = mCopier->ApplyBufferingPolicy(); 44 if (NS_FAILED(rv)) { 45 mCopier->Cancel(rv); 46 return NS_OK; 47 } 48 49 rv = mTarget->Dispatch( 50 NewRunnableMethod("nsAsyncStreamCopier::AsyncCopyInternal", mCopier, 51 &nsAsyncStreamCopier::AsyncCopyInternal), 52 NS_DISPATCH_NORMAL); 53 MOZ_ASSERT(NS_SUCCEEDED(rv)); 54 55 if (NS_FAILED(rv)) { 56 mCopier->Cancel(rv); 57 } 58 return NS_OK; 59 } 60 61 private: 62 RefPtr<nsAsyncStreamCopier> mCopier; 63 nsCOMPtr<nsIEventTarget> mTarget; 64 }; 65 66 //----------------------------------------------------------------------------- 67 68 nsAsyncStreamCopier::nsAsyncStreamCopier() 69 : mChunkSize(nsIOService::gDefaultSegmentSize) { 70 LOG(("Creating nsAsyncStreamCopier @%p\n", this)); 71 } 72 73 nsAsyncStreamCopier::~nsAsyncStreamCopier() { 74 LOG(("Destroying nsAsyncStreamCopier @%p\n", this)); 75 } 76 77 bool nsAsyncStreamCopier::IsComplete(nsresult* status) { 78 MutexAutoLock lock(mLock); 79 if (status) *status = mStatus; 80 return !mIsPending; 81 } 82 83 nsIRequest* nsAsyncStreamCopier::AsRequest() { 84 return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this)); 85 } 86 87 void nsAsyncStreamCopier::Complete(nsresult status) { 88 LOG(("nsAsyncStreamCopier::Complete [this=%p status=%" PRIx32 "]\n", this, 89 static_cast<uint32_t>(status))); 90 91 nsCOMPtr<nsIRequestObserver> observer; 92 nsCOMPtr<nsISupports> ctx; 93 { 94 MutexAutoLock lock(mLock); 95 mCopierCtx = nullptr; 96 97 if (mIsPending) { 98 mIsPending = false; 99 mStatus = status; 100 101 // setup OnStopRequest callback and release references... 102 observer = mObserver; 103 mObserver = nullptr; 104 } 105 } 106 107 if (observer) { 108 LOG((" calling OnStopRequest [status=%" PRIx32 "]\n", 109 static_cast<uint32_t>(status))); 110 observer->OnStopRequest(AsRequest(), status); 111 } 112 } 113 114 void nsAsyncStreamCopier::OnAsyncCopyComplete(void* closure, nsresult status) { 115 // AddRef'd in AsyncCopy. Will be released at the end of the method. 116 RefPtr<nsAsyncStreamCopier> self = dont_AddRef((nsAsyncStreamCopier*)closure); 117 self->Complete(status); 118 } 119 120 //----------------------------------------------------------------------------- 121 // nsISupports 122 123 // We cannot use simply NS_IMPL_ISUPPORTSx as both 124 // nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest 125 126 NS_IMPL_ADDREF(nsAsyncStreamCopier) 127 NS_IMPL_RELEASE(nsAsyncStreamCopier) 128 NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier) 129 NS_INTERFACE_TABLE_BEGIN 130 NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier) 131 NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2) 132 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, 133 nsIAsyncStreamCopier) 134 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, 135 nsIAsyncStreamCopier) 136 NS_INTERFACE_TABLE_END 137 NS_INTERFACE_TABLE_TAIL 138 139 //----------------------------------------------------------------------------- 140 // nsIRequest 141 142 NS_IMETHODIMP 143 nsAsyncStreamCopier::GetName(nsACString& name) { 144 name.Truncate(); 145 return NS_OK; 146 } 147 148 NS_IMETHODIMP 149 nsAsyncStreamCopier::IsPending(bool* result) { 150 *result = !IsComplete(); 151 return NS_OK; 152 } 153 154 NS_IMETHODIMP 155 nsAsyncStreamCopier::GetStatus(nsresult* status) { 156 IsComplete(status); 157 return NS_OK; 158 } 159 160 NS_IMETHODIMP nsAsyncStreamCopier::SetCanceledReason( 161 const nsACString& aReason) { 162 return nsIAsyncStreamCopier::SetCanceledReasonImpl(aReason); 163 } 164 165 NS_IMETHODIMP nsAsyncStreamCopier::GetCanceledReason(nsACString& aReason) { 166 return nsIAsyncStreamCopier::GetCanceledReasonImpl(aReason); 167 } 168 169 NS_IMETHODIMP nsAsyncStreamCopier::CancelWithReason(nsresult aStatus, 170 const nsACString& aReason) { 171 return nsIAsyncStreamCopier::CancelWithReasonImpl(aStatus, aReason); 172 } 173 174 NS_IMETHODIMP 175 nsAsyncStreamCopier::Cancel(nsresult status) { 176 nsCOMPtr<nsISupports> copierCtx; 177 { 178 MutexAutoLock lock(mLock); 179 if (!mIsPending) { 180 return NS_OK; 181 } 182 copierCtx.swap(mCopierCtx); 183 } 184 185 if (NS_SUCCEEDED(status)) { 186 NS_WARNING("cancel with non-failure status code"); 187 status = NS_BASE_STREAM_CLOSED; 188 } 189 190 if (copierCtx) NS_CancelAsyncCopy(copierCtx, status); 191 192 return NS_OK; 193 } 194 195 NS_IMETHODIMP 196 nsAsyncStreamCopier::Suspend() { 197 MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Suspend"); 198 return NS_ERROR_NOT_IMPLEMENTED; 199 } 200 201 NS_IMETHODIMP 202 nsAsyncStreamCopier::Resume() { 203 MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Resume"); 204 return NS_ERROR_NOT_IMPLEMENTED; 205 } 206 207 NS_IMETHODIMP 208 nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags* aLoadFlags) { 209 *aLoadFlags = LOAD_NORMAL; 210 return NS_OK; 211 } 212 213 NS_IMETHODIMP 214 nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags) { return NS_OK; } 215 216 NS_IMETHODIMP 217 nsAsyncStreamCopier::GetTRRMode(nsIRequest::TRRMode* aTRRMode) { 218 return nsIAsyncStreamCopier::GetTRRModeImpl(aTRRMode); 219 } 220 221 NS_IMETHODIMP 222 nsAsyncStreamCopier::SetTRRMode(nsIRequest::TRRMode aTRRMode) { 223 return nsIAsyncStreamCopier::SetTRRModeImpl(aTRRMode); 224 } 225 226 NS_IMETHODIMP 227 nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup** aLoadGroup) { 228 *aLoadGroup = nullptr; 229 return NS_OK; 230 } 231 232 NS_IMETHODIMP 233 nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup* aLoadGroup) { return NS_OK; } 234 235 // Can't be accessed by multiple threads yet 236 nsresult nsAsyncStreamCopier::InitInternal( 237 nsIInputStream* source, nsIOutputStream* sink, nsIEventTarget* target, 238 uint32_t chunkSize, bool closeSource, 239 bool closeSink) MOZ_NO_THREAD_SAFETY_ANALYSIS { 240 NS_ASSERTION(!mSource && !mSink, "Init() called more than once"); 241 if (chunkSize == 0) { 242 chunkSize = nsIOService::gDefaultSegmentSize; 243 } 244 mChunkSize = chunkSize; 245 246 mSource = source; 247 mSink = sink; 248 mCloseSource = closeSource; 249 mCloseSink = closeSink; 250 251 if (target) { 252 mTarget = target; 253 } else { 254 nsresult rv; 255 mTarget = mozilla::components::StreamTransport::Service(&rv); 256 if (NS_FAILED(rv)) { 257 return rv; 258 } 259 } 260 261 return NS_OK; 262 } 263 264 //----------------------------------------------------------------------------- 265 // nsIAsyncStreamCopier 266 267 NS_IMETHODIMP 268 nsAsyncStreamCopier::Init(nsIInputStream* source, nsIOutputStream* sink, 269 nsIEventTarget* target, bool sourceBuffered, 270 bool sinkBuffered, uint32_t chunkSize, 271 bool closeSource, bool closeSink) { 272 NS_ASSERTION(sourceBuffered || sinkBuffered, 273 "at least one stream must be buffered"); 274 mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS 275 : NS_ASYNCCOPY_VIA_WRITESEGMENTS; 276 277 return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); 278 } 279 280 //----------------------------------------------------------------------------- 281 // nsIAsyncStreamCopier2 282 283 NS_IMETHODIMP 284 nsAsyncStreamCopier::Init(nsIInputStream* source, nsIOutputStream* sink, 285 nsIEventTarget* target, uint32_t chunkSize, 286 bool closeSource, bool closeSink) { 287 mShouldSniffBuffering = true; 288 289 return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); 290 } 291 292 /** 293 * Detect whether the input or the output stream is buffered, 294 * bufferize one of them if neither is buffered. 295 */ 296 nsresult nsAsyncStreamCopier::ApplyBufferingPolicy() { 297 // This function causes I/O, it must not be executed on the main 298 // thread. 299 MOZ_ASSERT(!NS_IsMainThread()); 300 301 if (NS_OutputStreamIsBuffered(mSink)) { 302 // Sink is buffered, no need to perform additional buffering 303 mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; 304 return NS_OK; 305 } 306 if (NS_InputStreamIsBuffered(mSource)) { 307 // Source is buffered, no need to perform additional buffering 308 mMode = NS_ASYNCCOPY_VIA_READSEGMENTS; 309 return NS_OK; 310 } 311 312 // No buffering, let's buffer the sink 313 nsresult rv; 314 nsCOMPtr<nsIBufferedOutputStream> sink = 315 do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv); 316 if (NS_FAILED(rv)) { 317 return rv; 318 } 319 320 rv = sink->Init(mSink, mChunkSize); 321 if (NS_FAILED(rv)) { 322 return rv; 323 } 324 325 mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; 326 mSink = sink; 327 return NS_OK; 328 } 329 330 //----------------------------------------------------------------------------- 331 // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2 332 333 NS_IMETHODIMP 334 nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver* observer, nsISupports* ctx) { 335 LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%p]\n", this, 336 observer)); 337 338 NS_ASSERTION(mSource && mSink, "not initialized"); 339 nsresult rv; 340 341 if (observer) { 342 // build proxy for observer events 343 rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx); 344 if (NS_FAILED(rv)) return rv; 345 } 346 347 // from this point forward, AsyncCopy is going to return NS_OK. any errors 348 // will be reported via OnStopRequest. 349 { 350 MutexAutoLock lock(mLock); 351 mIsPending = true; 352 } 353 354 if (mObserver) { 355 rv = mObserver->OnStartRequest(AsRequest()); 356 if (NS_FAILED(rv)) Cancel(rv); 357 } 358 359 if (!mShouldSniffBuffering) { 360 // No buffer sniffing required, let's proceed 361 AsyncCopyInternal(); 362 return NS_OK; 363 } 364 365 if (NS_IsMainThread()) { 366 // Don't perform buffer sniffing on the main thread 367 nsCOMPtr<nsIRunnable> event = new AsyncApplyBufferingPolicyEvent(this); 368 rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); 369 if (NS_FAILED(rv)) { 370 Cancel(rv); 371 } 372 return NS_OK; 373 } 374 375 // We're not going to block the main thread, so let's sniff here 376 rv = ApplyBufferingPolicy(); 377 if (NS_FAILED(rv)) { 378 Cancel(rv); 379 } 380 AsyncCopyInternal(); 381 return NS_OK; 382 } 383 384 // Launch async copy. 385 // All errors are reported through the observer. 386 void nsAsyncStreamCopier::AsyncCopyInternal() { 387 MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS || 388 mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS); 389 390 nsresult rv; 391 // We want to receive progress notifications; release happens in 392 // OnAsyncCopyComplete. 393 RefPtr<nsAsyncStreamCopier> self = this; 394 { 395 MutexAutoLock lock(mLock); 396 rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize, 397 OnAsyncCopyComplete, this, mCloseSource, mCloseSink, 398 getter_AddRefs(mCopierCtx)); 399 } 400 if (NS_FAILED(rv)) { 401 Cancel(rv); 402 return; // release self 403 } 404 405 self.forget().leak(); // Will be released in OnAsyncCopyComplete 406 }