nsStreamTransportService.cpp (10951B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 #include "nsStreamTransportService.h" 6 #include "ErrorList.h" 7 #include "nsXPCOMCIDInternal.h" 8 #include "nsNetSegmentUtils.h" 9 #include "nsTransportUtils.h" 10 #include "nsStreamUtils.h" 11 #include "nsError.h" 12 #include "nsNetCID.h" 13 14 #include "nsIAsyncInputStream.h" 15 #include "nsIAsyncOutputStream.h" 16 #include "nsIPipe.h" 17 #include "nsITransport.h" 18 #include "nsIObserverService.h" 19 #include "nsThreadPool.h" 20 #include "mozilla/Components.h" 21 #include "mozilla/Services.h" 22 23 namespace mozilla { 24 namespace net { 25 26 //----------------------------------------------------------------------------- 27 // nsInputStreamTransport 28 // 29 // Implements nsIInputStream as a wrapper around the real input stream. This 30 // allows the transport to support seeking, range-limiting, progress reporting, 31 // and close-when-done semantics while utilizing NS_AsyncCopy. 32 //----------------------------------------------------------------------------- 33 34 class nsInputStreamTransport : public nsITransport, 35 public nsIAsyncInputStream, 36 public nsIInputStreamCallback { 37 public: 38 NS_DECL_THREADSAFE_ISUPPORTS 39 NS_DECL_NSITRANSPORT 40 NS_DECL_NSIINPUTSTREAM 41 NS_DECL_NSIASYNCINPUTSTREAM 42 NS_DECL_NSIINPUTSTREAMCALLBACK 43 44 nsInputStreamTransport(nsIInputStream* source, bool closeWhenDone) 45 : mSource(source), mCloseWhenDone(closeWhenDone) { 46 mAsyncSource = do_QueryInterface(mSource); 47 } 48 49 private: 50 virtual ~nsInputStreamTransport() = default; 51 52 Mutex mMutex MOZ_UNANNOTATED{"nsInputStreamTransport::mMutex"}; 53 54 // This value is protected by mutex. 55 nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback; 56 57 nsCOMPtr<nsIAsyncInputStream> mPipeIn; 58 59 // while the copy is active, these members may only be accessed from the 60 // nsIInputStream implementation. 61 nsCOMPtr<nsITransportEventSink> mEventSink; 62 nsCOMPtr<nsIInputStream> mSource; 63 64 // It can be null. 65 nsCOMPtr<nsIAsyncInputStream> mAsyncSource; 66 67 int64_t mOffset{0}; 68 const bool mCloseWhenDone; 69 70 // this variable serves as a lock to prevent the state of the transport 71 // from being modified once the copy is in progress. 72 bool mInProgress{false}; 73 }; 74 75 NS_IMPL_ADDREF(nsInputStreamTransport); 76 NS_IMPL_RELEASE(nsInputStreamTransport); 77 78 NS_INTERFACE_MAP_BEGIN(nsInputStreamTransport) 79 NS_INTERFACE_MAP_ENTRY(nsITransport) 80 NS_INTERFACE_MAP_ENTRY(nsIInputStream) 81 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, !!mAsyncSource) 82 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, !!mAsyncSource) 83 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsITransport) 84 NS_INTERFACE_MAP_END 85 86 /** nsITransport **/ 87 88 NS_IMETHODIMP 89 nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize, 90 uint32_t segcount, 91 nsIInputStream** result) { 92 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 93 94 nsresult rv; 95 nsCOMPtr<nsIEventTarget> target; 96 target = mozilla::components::StreamTransport::Service(&rv); 97 if (NS_FAILED(rv)) return rv; 98 99 // XXX if the caller requests an unbuffered stream, then perhaps 100 // we'd want to simply return mSource; however, then we would 101 // not be reading mSource on a background thread. is this ok? 102 103 bool nonblocking = !(flags & OPEN_BLOCKING); 104 105 net_ResolveSegmentParams(segsize, segcount); 106 107 nsCOMPtr<nsIAsyncOutputStream> pipeOut; 108 NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut), nonblocking, 109 true, segsize, segcount); 110 111 mInProgress = true; 112 113 // startup async copy process... 114 rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS, 115 segsize); 116 if (NS_FAILED(rv)) { 117 return rv; 118 } 119 *result = do_AddRef(mPipeIn).take(); 120 return NS_OK; 121 } 122 123 NS_IMETHODIMP 124 nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize, 125 uint32_t segcount, 126 nsIOutputStream** result) { 127 // this transport only supports reading! 128 MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream"); 129 return NS_ERROR_UNEXPECTED; 130 } 131 132 NS_IMETHODIMP 133 nsInputStreamTransport::Close(nsresult reason) { 134 if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED; 135 136 return mPipeIn->CloseWithStatus(reason); 137 } 138 139 NS_IMETHODIMP 140 nsInputStreamTransport::SetEventSink(nsITransportEventSink* sink, 141 nsIEventTarget* target) { 142 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 143 144 if (target) { 145 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink, 146 target); 147 } 148 149 mEventSink = sink; 150 return NS_OK; 151 } 152 153 /** nsIInputStream **/ 154 155 NS_IMETHODIMP 156 nsInputStreamTransport::Close() { 157 if (mCloseWhenDone) mSource->Close(); 158 159 // make additional reads return early... 160 mOffset = 0; 161 return NS_OK; 162 } 163 164 NS_IMETHODIMP 165 nsInputStreamTransport::Available(uint64_t* result) { 166 return NS_ERROR_NOT_IMPLEMENTED; 167 } 168 169 NS_IMETHODIMP 170 nsInputStreamTransport::StreamStatus() { return mSource->StreamStatus(); } 171 172 NS_IMETHODIMP 173 nsInputStreamTransport::Read(char* buf, uint32_t count, uint32_t* result) { 174 nsresult rv = mSource->Read(buf, count, result); 175 176 if (NS_SUCCEEDED(rv)) { 177 mOffset += *result; 178 if (mEventSink) { 179 mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1); 180 } 181 } 182 return rv; 183 } 184 185 NS_IMETHODIMP 186 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void* closure, 187 uint32_t count, uint32_t* result) { 188 return NS_ERROR_NOT_IMPLEMENTED; 189 } 190 191 NS_IMETHODIMP 192 nsInputStreamTransport::IsNonBlocking(bool* result) { 193 *result = false; 194 return NS_OK; 195 } 196 197 // nsIAsyncInputStream interface 198 199 NS_IMETHODIMP 200 nsInputStreamTransport::CloseWithStatus(nsresult aStatus) { return Close(); } 201 202 NS_IMETHODIMP 203 nsInputStreamTransport::AsyncWait(nsIInputStreamCallback* aCallback, 204 uint32_t aFlags, uint32_t aRequestedCount, 205 nsIEventTarget* aEventTarget) { 206 NS_ENSURE_STATE(!!mAsyncSource); 207 208 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr; 209 210 { 211 MutexAutoLock lock(mMutex); 212 213 if (NS_WARN_IF(mAsyncWaitCallback && aCallback && 214 mAsyncWaitCallback != aCallback)) { 215 return NS_ERROR_FAILURE; 216 } 217 218 mAsyncWaitCallback = aCallback; 219 } 220 221 return mAsyncSource->AsyncWait(callback, aFlags, aRequestedCount, 222 aEventTarget); 223 } 224 225 // nsIInputStreamCallback 226 227 NS_IMETHODIMP 228 nsInputStreamTransport::OnInputStreamReady(nsIAsyncInputStream* aStream) { 229 nsCOMPtr<nsIInputStreamCallback> callback; 230 { 231 MutexAutoLock lock(mMutex); 232 233 // We have been canceled in the meanwhile. 234 if (!mAsyncWaitCallback) { 235 return NS_OK; 236 } 237 238 callback.swap(mAsyncWaitCallback); 239 } 240 241 MOZ_ASSERT(callback); 242 return callback->OnInputStreamReady(this); 243 } 244 245 //----------------------------------------------------------------------------- 246 // nsStreamTransportService 247 //----------------------------------------------------------------------------- 248 249 nsStreamTransportService::nsStreamTransportService() = default; 250 251 nsStreamTransportService::~nsStreamTransportService() { 252 NS_ASSERTION(!mPool, "thread pool wasn't shutdown"); 253 } 254 255 nsresult nsStreamTransportService::Init() { 256 // Can't be used multithreaded before this 257 MOZ_PUSH_IGNORE_THREAD_SAFETY 258 MOZ_ASSERT(!mPool); 259 mPool = new nsThreadPool(); 260 261 // Configure the pool 262 mPool->SetName("StreamTrans"_ns); 263 // TODO: Make these settings configurable. 264 mPool->SetThreadLimit(25); 265 mPool->SetIdleThreadLimit(4); 266 mPool->SetIdleThreadMaximumTimeout(30 * 1000); 267 mPool->SetIdleThreadGraceTimeout(500); 268 MOZ_POP_THREAD_SAFETY 269 270 nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService(); 271 if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false); 272 return NS_OK; 273 } 274 275 NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService, 276 nsIEventTarget, nsIObserver) 277 278 NS_IMETHODIMP 279 nsStreamTransportService::DispatchFromScript(nsIRunnable* task, 280 DispatchFlags flags) { 281 return Dispatch(do_AddRef(task), flags); 282 } 283 284 NS_IMETHODIMP 285 nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task, 286 DispatchFlags flags) { 287 // NOTE: To maintain existing behaviour, we never leak task on error, even if 288 // NS_DISPATCH_FALLIBLE is not specified. 289 nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths 290 nsCOMPtr<nsIThreadPool> pool; 291 { 292 mozilla::MutexAutoLock lock(mShutdownLock); 293 if (mIsShutdown) { 294 return NS_ERROR_NOT_INITIALIZED; 295 } 296 pool = mPool; 297 } 298 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); 299 return pool->Dispatch(event.forget(), flags); 300 } 301 302 NS_IMETHODIMP 303 nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable> aEvent, 304 uint32_t aDelayMs) { 305 return NS_ERROR_NOT_IMPLEMENTED; 306 } 307 308 NS_IMETHODIMP 309 nsStreamTransportService::RegisterShutdownTask(nsITargetShutdownTask*) { 310 return NS_ERROR_NOT_IMPLEMENTED; 311 } 312 313 NS_IMETHODIMP 314 nsStreamTransportService::UnregisterShutdownTask(nsITargetShutdownTask*) { 315 return NS_ERROR_NOT_IMPLEMENTED; 316 } 317 318 NS_IMETHODIMP_(bool) 319 nsStreamTransportService::IsOnCurrentThreadInfallible() { 320 nsCOMPtr<nsIThreadPool> pool; 321 { 322 mozilla::MutexAutoLock lock(mShutdownLock); 323 pool = mPool; 324 } 325 if (!pool) { 326 return false; 327 } 328 return pool->IsOnCurrentThread(); 329 } 330 331 NS_IMETHODIMP 332 nsStreamTransportService::IsOnCurrentThread(bool* result) { 333 nsCOMPtr<nsIThreadPool> pool; 334 { 335 mozilla::MutexAutoLock lock(mShutdownLock); 336 if (mIsShutdown) { 337 return NS_ERROR_NOT_INITIALIZED; 338 } 339 pool = mPool; 340 } 341 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); 342 return pool->IsOnCurrentThread(result); 343 } 344 345 NS_IMETHODIMP 346 nsStreamTransportService::CreateInputTransport(nsIInputStream* stream, 347 bool closeWhenDone, 348 nsITransport** result) { 349 RefPtr<nsInputStreamTransport> trans = 350 new nsInputStreamTransport(stream, closeWhenDone); 351 trans.forget(result); 352 return NS_OK; 353 } 354 355 NS_IMETHODIMP 356 nsStreamTransportService::Observe(nsISupports* subject, const char* topic, 357 const char16_t* data) { 358 NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops"); 359 360 { 361 nsCOMPtr<nsIThreadPool> pool; 362 { 363 mozilla::MutexAutoLock lock(mShutdownLock); 364 mIsShutdown = true; 365 pool = mPool.forget(); 366 } 367 368 if (pool) { 369 pool->Shutdown(); 370 } 371 } 372 return NS_OK; 373 } 374 375 } // namespace net 376 } // namespace mozilla