tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

nsStreamTransportService.cpp (10951B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 #include "nsStreamTransportService.h"
      6 #include "ErrorList.h"
      7 #include "nsXPCOMCIDInternal.h"
      8 #include "nsNetSegmentUtils.h"
      9 #include "nsTransportUtils.h"
     10 #include "nsStreamUtils.h"
     11 #include "nsError.h"
     12 #include "nsNetCID.h"
     13 
     14 #include "nsIAsyncInputStream.h"
     15 #include "nsIAsyncOutputStream.h"
     16 #include "nsIPipe.h"
     17 #include "nsITransport.h"
     18 #include "nsIObserverService.h"
     19 #include "nsThreadPool.h"
     20 #include "mozilla/Components.h"
     21 #include "mozilla/Services.h"
     22 
     23 namespace mozilla {
     24 namespace net {
     25 
     26 //-----------------------------------------------------------------------------
     27 // nsInputStreamTransport
     28 //
     29 // Implements nsIInputStream as a wrapper around the real input stream.  This
     30 // allows the transport to support seeking, range-limiting, progress reporting,
     31 // and close-when-done semantics while utilizing NS_AsyncCopy.
     32 //-----------------------------------------------------------------------------
     33 
     34 class nsInputStreamTransport : public nsITransport,
     35                               public nsIAsyncInputStream,
     36                               public nsIInputStreamCallback {
     37 public:
     38  NS_DECL_THREADSAFE_ISUPPORTS
     39  NS_DECL_NSITRANSPORT
     40  NS_DECL_NSIINPUTSTREAM
     41  NS_DECL_NSIASYNCINPUTSTREAM
     42  NS_DECL_NSIINPUTSTREAMCALLBACK
     43 
     44  nsInputStreamTransport(nsIInputStream* source, bool closeWhenDone)
     45      : mSource(source), mCloseWhenDone(closeWhenDone) {
     46    mAsyncSource = do_QueryInterface(mSource);
     47  }
     48 
     49 private:
     50  virtual ~nsInputStreamTransport() = default;
     51 
     52  Mutex mMutex MOZ_UNANNOTATED{"nsInputStreamTransport::mMutex"};
     53 
     54  // This value is protected by mutex.
     55  nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
     56 
     57  nsCOMPtr<nsIAsyncInputStream> mPipeIn;
     58 
     59  // while the copy is active, these members may only be accessed from the
     60  // nsIInputStream implementation.
     61  nsCOMPtr<nsITransportEventSink> mEventSink;
     62  nsCOMPtr<nsIInputStream> mSource;
     63 
     64  // It can be null.
     65  nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
     66 
     67  int64_t mOffset{0};
     68  const bool mCloseWhenDone;
     69 
     70  // this variable serves as a lock to prevent the state of the transport
     71  // from being modified once the copy is in progress.
     72  bool mInProgress{false};
     73 };
     74 
     75 NS_IMPL_ADDREF(nsInputStreamTransport);
     76 NS_IMPL_RELEASE(nsInputStreamTransport);
     77 
     78 NS_INTERFACE_MAP_BEGIN(nsInputStreamTransport)
     79  NS_INTERFACE_MAP_ENTRY(nsITransport)
     80  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
     81  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, !!mAsyncSource)
     82  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, !!mAsyncSource)
     83  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsITransport)
     84 NS_INTERFACE_MAP_END
     85 
     86 /** nsITransport **/
     87 
     88 NS_IMETHODIMP
     89 nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize,
     90                                        uint32_t segcount,
     91                                        nsIInputStream** result) {
     92  NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
     93 
     94  nsresult rv;
     95  nsCOMPtr<nsIEventTarget> target;
     96  target = mozilla::components::StreamTransport::Service(&rv);
     97  if (NS_FAILED(rv)) return rv;
     98 
     99  // XXX if the caller requests an unbuffered stream, then perhaps
    100  //     we'd want to simply return mSource; however, then we would
    101  //     not be reading mSource on a background thread.  is this ok?
    102 
    103  bool nonblocking = !(flags & OPEN_BLOCKING);
    104 
    105  net_ResolveSegmentParams(segsize, segcount);
    106 
    107  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
    108  NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut), nonblocking,
    109              true, segsize, segcount);
    110 
    111  mInProgress = true;
    112 
    113  // startup async copy process...
    114  rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
    115                    segsize);
    116  if (NS_FAILED(rv)) {
    117    return rv;
    118  }
    119  *result = do_AddRef(mPipeIn).take();
    120  return NS_OK;
    121 }
    122 
    123 NS_IMETHODIMP
    124 nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize,
    125                                         uint32_t segcount,
    126                                         nsIOutputStream** result) {
    127  // this transport only supports reading!
    128  MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream");
    129  return NS_ERROR_UNEXPECTED;
    130 }
    131 
    132 NS_IMETHODIMP
    133 nsInputStreamTransport::Close(nsresult reason) {
    134  if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED;
    135 
    136  return mPipeIn->CloseWithStatus(reason);
    137 }
    138 
    139 NS_IMETHODIMP
    140 nsInputStreamTransport::SetEventSink(nsITransportEventSink* sink,
    141                                     nsIEventTarget* target) {
    142  NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
    143 
    144  if (target) {
    145    return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink,
    146                                          target);
    147  }
    148 
    149  mEventSink = sink;
    150  return NS_OK;
    151 }
    152 
    153 /** nsIInputStream **/
    154 
    155 NS_IMETHODIMP
    156 nsInputStreamTransport::Close() {
    157  if (mCloseWhenDone) mSource->Close();
    158 
    159  // make additional reads return early...
    160  mOffset = 0;
    161  return NS_OK;
    162 }
    163 
    164 NS_IMETHODIMP
    165 nsInputStreamTransport::Available(uint64_t* result) {
    166  return NS_ERROR_NOT_IMPLEMENTED;
    167 }
    168 
    169 NS_IMETHODIMP
    170 nsInputStreamTransport::StreamStatus() { return mSource->StreamStatus(); }
    171 
    172 NS_IMETHODIMP
    173 nsInputStreamTransport::Read(char* buf, uint32_t count, uint32_t* result) {
    174  nsresult rv = mSource->Read(buf, count, result);
    175 
    176  if (NS_SUCCEEDED(rv)) {
    177    mOffset += *result;
    178    if (mEventSink) {
    179      mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1);
    180    }
    181  }
    182  return rv;
    183 }
    184 
    185 NS_IMETHODIMP
    186 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void* closure,
    187                                     uint32_t count, uint32_t* result) {
    188  return NS_ERROR_NOT_IMPLEMENTED;
    189 }
    190 
    191 NS_IMETHODIMP
    192 nsInputStreamTransport::IsNonBlocking(bool* result) {
    193  *result = false;
    194  return NS_OK;
    195 }
    196 
    197 // nsIAsyncInputStream interface
    198 
    199 NS_IMETHODIMP
    200 nsInputStreamTransport::CloseWithStatus(nsresult aStatus) { return Close(); }
    201 
    202 NS_IMETHODIMP
    203 nsInputStreamTransport::AsyncWait(nsIInputStreamCallback* aCallback,
    204                                  uint32_t aFlags, uint32_t aRequestedCount,
    205                                  nsIEventTarget* aEventTarget) {
    206  NS_ENSURE_STATE(!!mAsyncSource);
    207 
    208  nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
    209 
    210  {
    211    MutexAutoLock lock(mMutex);
    212 
    213    if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
    214                   mAsyncWaitCallback != aCallback)) {
    215      return NS_ERROR_FAILURE;
    216    }
    217 
    218    mAsyncWaitCallback = aCallback;
    219  }
    220 
    221  return mAsyncSource->AsyncWait(callback, aFlags, aRequestedCount,
    222                                 aEventTarget);
    223 }
    224 
    225 // nsIInputStreamCallback
    226 
    227 NS_IMETHODIMP
    228 nsInputStreamTransport::OnInputStreamReady(nsIAsyncInputStream* aStream) {
    229  nsCOMPtr<nsIInputStreamCallback> callback;
    230  {
    231    MutexAutoLock lock(mMutex);
    232 
    233    // We have been canceled in the meanwhile.
    234    if (!mAsyncWaitCallback) {
    235      return NS_OK;
    236    }
    237 
    238    callback.swap(mAsyncWaitCallback);
    239  }
    240 
    241  MOZ_ASSERT(callback);
    242  return callback->OnInputStreamReady(this);
    243 }
    244 
    245 //-----------------------------------------------------------------------------
    246 // nsStreamTransportService
    247 //-----------------------------------------------------------------------------
    248 
    249 nsStreamTransportService::nsStreamTransportService() = default;
    250 
    251 nsStreamTransportService::~nsStreamTransportService() {
    252  NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
    253 }
    254 
    255 nsresult nsStreamTransportService::Init() {
    256  // Can't be used multithreaded before this
    257  MOZ_PUSH_IGNORE_THREAD_SAFETY
    258  MOZ_ASSERT(!mPool);
    259  mPool = new nsThreadPool();
    260 
    261  // Configure the pool
    262  mPool->SetName("StreamTrans"_ns);
    263  // TODO: Make these settings configurable.
    264  mPool->SetThreadLimit(25);
    265  mPool->SetIdleThreadLimit(4);
    266  mPool->SetIdleThreadMaximumTimeout(30 * 1000);
    267  mPool->SetIdleThreadGraceTimeout(500);
    268  MOZ_POP_THREAD_SAFETY
    269 
    270  nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService();
    271  if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
    272  return NS_OK;
    273 }
    274 
    275 NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService,
    276                  nsIEventTarget, nsIObserver)
    277 
    278 NS_IMETHODIMP
    279 nsStreamTransportService::DispatchFromScript(nsIRunnable* task,
    280                                             DispatchFlags flags) {
    281  return Dispatch(do_AddRef(task), flags);
    282 }
    283 
    284 NS_IMETHODIMP
    285 nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task,
    286                                   DispatchFlags flags) {
    287  // NOTE: To maintain existing behaviour, we never leak task on error, even if
    288  // NS_DISPATCH_FALLIBLE is not specified.
    289  nsCOMPtr<nsIRunnable> event(task);  // so it gets released on failure paths
    290  nsCOMPtr<nsIThreadPool> pool;
    291  {
    292    mozilla::MutexAutoLock lock(mShutdownLock);
    293    if (mIsShutdown) {
    294      return NS_ERROR_NOT_INITIALIZED;
    295    }
    296    pool = mPool;
    297  }
    298  NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
    299  return pool->Dispatch(event.forget(), flags);
    300 }
    301 
    302 NS_IMETHODIMP
    303 nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable> aEvent,
    304                                          uint32_t aDelayMs) {
    305  return NS_ERROR_NOT_IMPLEMENTED;
    306 }
    307 
    308 NS_IMETHODIMP
    309 nsStreamTransportService::RegisterShutdownTask(nsITargetShutdownTask*) {
    310  return NS_ERROR_NOT_IMPLEMENTED;
    311 }
    312 
    313 NS_IMETHODIMP
    314 nsStreamTransportService::UnregisterShutdownTask(nsITargetShutdownTask*) {
    315  return NS_ERROR_NOT_IMPLEMENTED;
    316 }
    317 
    318 NS_IMETHODIMP_(bool)
    319 nsStreamTransportService::IsOnCurrentThreadInfallible() {
    320  nsCOMPtr<nsIThreadPool> pool;
    321  {
    322    mozilla::MutexAutoLock lock(mShutdownLock);
    323    pool = mPool;
    324  }
    325  if (!pool) {
    326    return false;
    327  }
    328  return pool->IsOnCurrentThread();
    329 }
    330 
    331 NS_IMETHODIMP
    332 nsStreamTransportService::IsOnCurrentThread(bool* result) {
    333  nsCOMPtr<nsIThreadPool> pool;
    334  {
    335    mozilla::MutexAutoLock lock(mShutdownLock);
    336    if (mIsShutdown) {
    337      return NS_ERROR_NOT_INITIALIZED;
    338    }
    339    pool = mPool;
    340  }
    341  NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
    342  return pool->IsOnCurrentThread(result);
    343 }
    344 
    345 NS_IMETHODIMP
    346 nsStreamTransportService::CreateInputTransport(nsIInputStream* stream,
    347                                               bool closeWhenDone,
    348                                               nsITransport** result) {
    349  RefPtr<nsInputStreamTransport> trans =
    350      new nsInputStreamTransport(stream, closeWhenDone);
    351  trans.forget(result);
    352  return NS_OK;
    353 }
    354 
    355 NS_IMETHODIMP
    356 nsStreamTransportService::Observe(nsISupports* subject, const char* topic,
    357                                  const char16_t* data) {
    358  NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
    359 
    360  {
    361    nsCOMPtr<nsIThreadPool> pool;
    362    {
    363      mozilla::MutexAutoLock lock(mShutdownLock);
    364      mIsShutdown = true;
    365      pool = mPool.forget();
    366    }
    367 
    368    if (pool) {
    369      pool->Shutdown();
    370    }
    371  }
    372  return NS_OK;
    373 }
    374 
    375 }  // namespace net
    376 }  // namespace mozilla