tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

ChannelEventQueue.h (13206B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
      2 * vim: set sw=2 ts=8 et tw=80 :
      3 */
      4 /* This Source Code Form is subject to the terms of the Mozilla Public
      5 * License, v. 2.0. If a copy of the MPL was not distributed with this
      6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      7 
      8 #ifndef mozilla_net_ChannelEventQueue_h
      9 #define mozilla_net_ChannelEventQueue_h
     10 
     11 #include "nsTArray.h"
     12 #include "nsIEventTarget.h"
     13 #include "nsThreadUtils.h"
     14 #include "nsXULAppAPI.h"
     15 #include "mozilla/DebugOnly.h"
     16 #include "mozilla/Mutex.h"
     17 #include "mozilla/RecursiveMutex.h"
     18 #include "mozilla/UniquePtr.h"
     19 #include "mozilla/FlowMarkers.h"
     20 
     21 class nsISupports;
     22 
     23 namespace mozilla {
     24 namespace net {
     25 
// Abstract interface for one unit of queued channel work.  Subclasses
// provide the work itself (Run) and the thread it must run on
// (GetEventTarget).
class ChannelEvent {
public:
 MOZ_COUNTED_DEFAULT_CTOR(ChannelEvent)
 // Run() executes the event's work; it is invoked on the target
 // returned by GetEventTarget().
 MOZ_COUNTED_DTOR_VIRTUAL(ChannelEvent) virtual void Run() = 0;
 // Returns the event target (thread) on which Run() must be executed.
 virtual already_AddRefed<nsIEventTarget> GetEventTarget() = 0;
};
     32 
// A ChannelEvent that always runs on the main thread.  Note that
// MainThreadChannelEvent should not be used in a child process since
// GetEventTarget() directly returns an unlabeled event target.
class MainThreadChannelEvent : public ChannelEvent {
public:
 MOZ_COUNTED_DEFAULT_CTOR(MainThreadChannelEvent)
 MOZ_COUNTED_DTOR_OVERRIDE(MainThreadChannelEvent)

 // Parent-process only (asserted): the returned main-thread target is
 // unlabeled, which is not permitted in content processes.
 already_AddRefed<nsIEventTarget> GetEventTarget() override {
   MOZ_ASSERT(XRE_IsParentProcess());

   return do_AddRef(GetMainThreadSerialEventTarget());
 }
};
     46 
// A ChannelEvent whose work and target thread are supplied as
// std::function callbacks instead of via subclassing.
//
// @param aGetEventTarget - returns the thread the callback must run on.
// @param aCallback - the work to perform when the event runs.
class ChannelFunctionEvent : public ChannelEvent {
public:
 ChannelFunctionEvent(
     std::function<already_AddRefed<nsIEventTarget>()>&& aGetEventTarget,
     std::function<void()>&& aCallback)
     : mGetEventTarget(std::move(aGetEventTarget)),
       mCallback(std::move(aCallback)) {}

 void Run() override { mCallback(); }
 already_AddRefed<nsIEventTarget> GetEventTarget() override {
   return mGetEventTarget();
 }

private:
 const std::function<already_AddRefed<nsIEventTarget>()> mGetEventTarget;
 const std::function<void()> mCallback;
};
     64 
     65 // UnsafePtr is a work-around our static analyzer that requires all
     66 // ref-counted objects to be captured in lambda via a RefPtr
     67 // The ChannelEventQueue makes it safe to capture "this" by pointer only.
     68 // This is required as work-around to prevent cycles until bug 1596295
     69 // is resolved.
     70 template <typename T>
     71 class UnsafePtr {
     72 public:
     73  explicit UnsafePtr(T* aPtr) : mPtr(aPtr) {}
     74 
     75  T& operator*() const { return *mPtr; }
     76  T* operator->() const {
     77    MOZ_ASSERT(mPtr, "dereferencing a null pointer");
     78    return mPtr;
     79  }
     80  operator T*() const& { return mPtr; }
     81  explicit operator bool() const { return mPtr != nullptr; }
     82 
     83 private:
     84  T* const mPtr;
     85 };
     86 
// A ChannelFunctionEvent whose target thread is the channel's necko
// event target (aChild->GetNeckoTarget()).  aChild is captured as an
// UnsafePtr — i.e. without a reference — so the caller must guarantee
// it outlives the event; the ChannelEventQueue provides that guarantee
// (see the UnsafePtr comment above).
class NeckoTargetChannelFunctionEvent : public ChannelFunctionEvent {
public:
 template <typename T>
 NeckoTargetChannelFunctionEvent(T* aChild, std::function<void()>&& aCallback)
     : ChannelFunctionEvent(
           [child = UnsafePtr<T>(aChild)]() {
             MOZ_ASSERT(child);
             return child->GetNeckoTarget();
           },
           std::move(aCallback)) {}
};
     98 
// Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a
// queue if still dispatching previous one(s) to listeners/observers.
// Otherwise synchronous XMLHttpRequests and/or other code that spins the
// event loop (ex: IPDL rpc) could cause listener->OnDataAvailable (for
// instance) to be dispatched and called before mListener->OnStartRequest has
// completed.
// The ChannelEventQueue implementation ensures strict ordering of
// event execution across target threads.

class ChannelEventQueue final {
 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue)

public:
 // @param owner - the channel owning this queue.  Stored as a raw
 //   pointer to avoid a refcount cycle; see mOwner below.
 explicit ChannelEventQueue(nsISupports* owner)
     : mSuspendCount(0),
       mSuspended(false),
       mForcedCount(0),
       mFlushing(false),
       mHasCheckedForAsyncXMLHttpRequest(false),
       mForAsyncXMLHttpRequest(false),
       mOwner(owner),
       mMutex("ChannelEventQueue::mMutex"),
       mRunningMutex("ChannelEventQueue::mRunningMutex") {}

 // Puts IPDL-generated channel event into queue, to be run later
 // automatically when EndForcedQueueing and/or Resume is called.
 // Takes ownership of aCallback (it is wrapped in a UniquePtr).
 //
 // @param aCallback - the ChannelEvent
 // @param aAssertionWhenNotQueued - this optional param will be used in an
 //   assertion when the event is executed directly.
 inline void RunOrEnqueue(ChannelEvent* aCallback,
                          bool aAssertionWhenNotQueued = false);

 // Append ChannelEvent in front of the event queue.
 // Only valid while the queue is suspended or forced queueing is active
 // (asserted), so that a later flush is guaranteed to run the event.
 inline void PrependEvent(UniquePtr<ChannelEvent>&& aEvent);
 inline void PrependEventInternal(UniquePtr<ChannelEvent>&& aEvent)
     MOZ_REQUIRES(mMutex);
 inline void PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents);

 // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing
 // events that will be run/flushed when EndForcedQueueing is called.
 // - Note: queueing may still be required after EndForcedQueueing() (if the
 //   queue is suspended, etc):  always call RunOrEnqueue() to avoid race
 //   conditions.
 inline void StartForcedQueueing();
 inline void EndForcedQueueing();

 // Suspend/resume event queue.  RunOrEnqueue() will start enqueuing
 // events and they will be run/flushed when resume is called.  These should be
 // called when the channel owning the event queue is suspended/resumed.
 void Suspend();
 // Resume flushes the queue asynchronously, i.e. items in queue will be
 // dispatched in a new event on the current thread.
 void Resume();

 // Called by the owning channel when it is being released; clears the
 // raw back-pointer so flushing no longer tries to AddRef the owner.
 void NotifyReleasingOwner() {
   MutexAutoLock lock(mMutex);
   mOwner = nullptr;
 }

#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
 bool IsEmpty() {
   MutexAutoLock lock(mMutex);
   return mEventQueue.IsEmpty();
 }
#endif

private:
 // Private destructor, to discourage deletion outside of Release():
 ~ChannelEventQueue() = default;

 void SuspendInternal() MOZ_REQUIRES(mMutex);
 void ResumeInternal() MOZ_REQUIRES(mMutex);

 bool MaybeSuspendIfEventsAreSuppressed() MOZ_REQUIRES(mMutex);

 inline void MaybeFlushQueue() MOZ_REQUIRES(mMutex);
 void FlushQueue() MOZ_REQUIRES(mMutex);
 inline void CompleteResume();

 ChannelEvent* TakeEvent();

 // Buffered events, in execution order (index 0 runs first).
 nsTArray<UniquePtr<ChannelEvent>> mEventQueue MOZ_GUARDED_BY(mMutex);

 uint32_t mSuspendCount MOZ_GUARDED_BY(mMutex);
 bool mSuspended MOZ_GUARDED_BY(mMutex);
 uint32_t mForcedCount  // Supports forced queueing from multiple threads.
     MOZ_GUARDED_BY(mMutex);
 bool mFlushing MOZ_GUARDED_BY(mMutex);

 // Whether the queue is associated with an async XHR.
 // This is lazily instantiated the first time it is needed.
 // These are MainThread-only.
 bool mHasCheckedForAsyncXMLHttpRequest;
 bool mForAsyncXMLHttpRequest;

 // Keep ptr to avoid refcount cycle: only grab ref during flushing.
 nsISupports* mOwner MOZ_GUARDED_BY(mMutex);

 // For atomic mEventQueue operation and state update
 Mutex mMutex;

 // To guarantee event execution order among threads.
 // Must always be acquired before mMutex (see MOZ_ACQUIRED_BEFORE).
 RecursiveMutex mRunningMutex MOZ_ACQUIRED_BEFORE(mMutex);

 friend class AutoEventEnqueuer;
};
    206 
inline void ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback,
                                           bool aAssertionWhenNotQueued) {
 MOZ_ASSERT(aCallback);
 // Events execution could be a destruction of the channel (and our own
 // destructor) unless we make sure its refcount doesn't drop to 0 while this
 // method is running.
 nsCOMPtr<nsISupports> kungFuDeathGrip;

 // Take ownership of aCallback immediately so it cannot leak on any
 // early-return path below.
 UniquePtr<ChannelEvent> event(aCallback);

 // To guarantee that the running event and all the events generated within
 // it will be finished before events on other threads.
 // NOTE: mRunningMutex must be taken before mMutex (lock-order contract
 // declared on the member via MOZ_ACQUIRED_BEFORE).
 RecursiveMutexAutoLock lock(mRunningMutex);
 {
   MutexAutoLock lock(mMutex);
   kungFuDeathGrip = mOwner;  // must be under the lock

   bool enqueue = !!mForcedCount || mSuspended || mFlushing ||
                  !mEventQueue.IsEmpty() ||
                  MaybeSuspendIfEventsAreSuppressed();
   // To ensure strict ordering of events across multiple threads we buffer the
   // events for the below cases:
   // a. event queuing is forced by AutoEventEnqueuer
   // b. event queue is suspended
   // c. an event is currently flushed/executed from the queue
   // d. queue is non-empty (pending events on remote thread targets)
   if (enqueue) {
     PROFILER_MARKER("ChannelEventQueue::Enqueue", NETWORK, {}, FlowMarker,
                     Flow::FromPointer(event.get()));
     mEventQueue.AppendElement(std::move(event));
     return;
   }

   nsCOMPtr<nsIEventTarget> target = event->GetEventTarget();
   MOZ_ASSERT(target);

   bool isCurrentThread = false;
   DebugOnly<nsresult> rv = target->IsOnCurrentThread(&isCurrentThread);
   MOZ_ASSERT(NS_SUCCEEDED(rv));

   if (!isCurrentThread) {
     // Leverage Suspend/Resume mechanism to trigger flush procedure without
     // creating a new one.
     // The execution of further events in the queue is blocked until the
     // target thread completes the execution of this event.
     // A callback is dispatched to the target thread to flush events from the
     // queue. This is done
     // by ResumeInternal which dispatches a runnable
     // (CompleteResumeRunnable) to the target thread. The target thread will
     // call CompleteResume to flush the queue. All the events are run
     // synchronously in their respective target threads.
     SuspendInternal();

     PROFILER_MARKER("ChannelEventQueue::Enqueue", NETWORK, {}, FlowMarker,
                     Flow::FromPointer(event.get()));

     mEventQueue.AppendElement(std::move(event));
     ResumeInternal();
     return;
   }
 }

 MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued);
 AUTO_PROFILER_TERMINATING_FLOW_MARKER("ChannelEvent", OTHER,
                                       Flow::FromPointer(event.get()));
 // execute the event synchronously if we are not queuing it and
 // the target thread is the current thread
 event->Run();
}
    277 
    278 inline void ChannelEventQueue::StartForcedQueueing() {
    279  MutexAutoLock lock(mMutex);
    280  ++mForcedCount;
    281 }
    282 
    283 inline void ChannelEventQueue::EndForcedQueueing() {
    284  MutexAutoLock lock(mMutex);
    285  MOZ_ASSERT(mForcedCount > 0);
    286  if (!--mForcedCount) {
    287    MaybeFlushQueue();
    288  }
    289 }
    290 
    291 inline void ChannelEventQueue::PrependEvent(UniquePtr<ChannelEvent>&& aEvent) {
    292  MutexAutoLock lock(mMutex);
    293  PrependEventInternal(std::move(aEvent));
    294 }
    295 
// Inserts aEvent at the head of the queue.  Caller must hold mMutex
// (declared via MOZ_REQUIRES and re-checked at runtime below).
inline void ChannelEventQueue::PrependEventInternal(
   UniquePtr<ChannelEvent>&& aEvent) {
 mMutex.AssertCurrentThreadOwns();

 // Prepending an event while no queue flush is foreseen could leave the
 // event (and everything behind it) unexecuted. This assertion guarantees
 // that a flush will happen, triggered by either Resume or
 // EndForcedQueueing, to run the added event.
 MOZ_ASSERT(mSuspended || !!mForcedCount);

 mEventQueue.InsertElementAt(0, std::move(aEvent));
}
    308 
    309 inline void ChannelEventQueue::PrependEvents(
    310    nsTArray<UniquePtr<ChannelEvent>>& aEvents) {
    311  MutexAutoLock lock(mMutex);
    312 
    313  // Prepending event while no queue flush foreseen might cause the following
    314  // channel events not run. This assertion here guarantee there must be a
    315  // queue flush, either triggered by Resume or EndForcedQueueing, to execute
    316  // the added events.
    317  MOZ_ASSERT(mSuspended || !!mForcedCount);
    318 
    319  mEventQueue.InsertElementsAt(0, aEvents.Length());
    320 
    321  for (uint32_t i = 0; i < aEvents.Length(); i++) {
    322    mEventQueue[i] = std::move(aEvents[i]);
    323  }
    324 }
    325 
    326 inline void ChannelEventQueue::CompleteResume() {
    327  MutexAutoLock lock(mMutex);
    328 
    329  // channel may have been suspended again since Resume fired event to call
    330  // this.
    331  if (!mSuspendCount) {
    332    // we need to remain logically suspended (for purposes of queuing incoming
    333    // messages) until this point, else new incoming messages could run before
    334    // queued ones.
    335    mSuspended = false;
    336    MaybeFlushQueue();
    337  }
    338 }
    339 
    340 inline void ChannelEventQueue::MaybeFlushQueue() {
    341  mMutex.AssertCurrentThreadOwns();
    342  // Don't flush if forced queuing on, we're already being flushed, or
    343  // suspended, or there's nothing to flush
    344  bool flushQueue = !mForcedCount && !mFlushing && !mSuspended &&
    345                    !mEventQueue.IsEmpty() &&
    346                    !MaybeSuspendIfEventsAreSuppressed();
    347 
    348  // Only one thread is allowed to run FlushQueue at a time.
    349  if (flushQueue) {
    350    mFlushing = true;
    351    FlushQueue();
    352  }
    353 }
    354 
// Ensures that RunOrEnqueue() will be collecting events during its lifetime
// (letting caller know incoming IPDL msgs should be queued). Flushes the
// queue when it goes out of scope.
class MOZ_STACK_CLASS AutoEventEnqueuer {
public:
 explicit AutoEventEnqueuer(ChannelEventQueue* queue) : mEventQueue(queue) {
   {
     // Probably not actually needed, since NotifyReleasingOwner should
     // only happen after this, but safer to take it in case things change.
     // (mMutex is accessible here because this class is a friend of
     // ChannelEventQueue.)
     MutexAutoLock lock(queue->mMutex);
     mOwner = queue->mOwner;
   }
   mEventQueue->StartForcedQueueing();
 }
 // Balances the StartForcedQueueing() from the constructor; the queue
 // flushes once the forced count drops to zero.
 ~AutoEventEnqueuer() { mEventQueue->EndForcedQueueing(); }

private:
 RefPtr<ChannelEventQueue> mEventQueue;
 // Ensure channel object lives longer than ChannelEventQueue.
 nsCOMPtr<nsISupports> mOwner;
};
    376 
    377 }  // namespace net
    378 }  // namespace mozilla
    379 
    380 #endif