ChannelEventQueue.h (13206B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- 2 * vim: set sw=2 ts=8 et tw=80 : 3 */ 4 /* This Source Code Form is subject to the terms of the Mozilla Public 5 * License, v. 2.0. If a copy of the MPL was not distributed with this 6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 7 8 #ifndef mozilla_net_ChannelEventQueue_h 9 #define mozilla_net_ChannelEventQueue_h 10 11 #include "nsTArray.h" 12 #include "nsIEventTarget.h" 13 #include "nsThreadUtils.h" 14 #include "nsXULAppAPI.h" 15 #include "mozilla/DebugOnly.h" 16 #include "mozilla/Mutex.h" 17 #include "mozilla/RecursiveMutex.h" 18 #include "mozilla/UniquePtr.h" 19 #include "mozilla/FlowMarkers.h" 20 21 class nsISupports; 22 23 namespace mozilla { 24 namespace net { 25 26 class ChannelEvent { 27 public: 28 MOZ_COUNTED_DEFAULT_CTOR(ChannelEvent) 29 MOZ_COUNTED_DTOR_VIRTUAL(ChannelEvent) virtual void Run() = 0; 30 virtual already_AddRefed<nsIEventTarget> GetEventTarget() = 0; 31 }; 32 33 // Note that MainThreadChannelEvent should not be used in child process since 34 // GetEventTarget() directly returns an unlabeled event target. 35 class MainThreadChannelEvent : public ChannelEvent { 36 public: 37 MOZ_COUNTED_DEFAULT_CTOR(MainThreadChannelEvent) 38 MOZ_COUNTED_DTOR_OVERRIDE(MainThreadChannelEvent) 39 40 already_AddRefed<nsIEventTarget> GetEventTarget() override { 41 MOZ_ASSERT(XRE_IsParentProcess()); 42 43 return do_AddRef(GetMainThreadSerialEventTarget()); 44 } 45 }; 46 47 class ChannelFunctionEvent : public ChannelEvent { 48 public: 49 ChannelFunctionEvent( 50 std::function<already_AddRefed<nsIEventTarget>()>&& aGetEventTarget, 51 std::function<void()>&& aCallback) 52 : mGetEventTarget(std::move(aGetEventTarget)), 53 mCallback(std::move(aCallback)) {} 54 55 void Run() override { mCallback(); } 56 already_AddRefed<nsIEventTarget> GetEventTarget() override { 57 return mGetEventTarget(); 58 } 59 60 private: 61 const std::function<already_AddRefed<nsIEventTarget>()> mGetEventTarget; 62 const std::function<void()> mCallback; 63 }; 64 65 // UnsafePtr is a work-around our static analyzer that requires all 66 // ref-counted objects to be captured in lambda via a RefPtr 67 // The ChannelEventQueue makes it safe to capture "this" by pointer only. 68 // This is required as work-around to prevent cycles until bug 1596295 69 // is resolved. 70 template <typename T> 71 class UnsafePtr { 72 public: 73 explicit UnsafePtr(T* aPtr) : mPtr(aPtr) {} 74 75 T& operator*() const { return *mPtr; } 76 T* operator->() const { 77 MOZ_ASSERT(mPtr, "dereferencing a null pointer"); 78 return mPtr; 79 } 80 operator T*() const& { return mPtr; } 81 explicit operator bool() const { return mPtr != nullptr; } 82 83 private: 84 T* const mPtr; 85 }; 86 87 class NeckoTargetChannelFunctionEvent : public ChannelFunctionEvent { 88 public: 89 template <typename T> 90 NeckoTargetChannelFunctionEvent(T* aChild, std::function<void()>&& aCallback) 91 : ChannelFunctionEvent( 92 [child = UnsafePtr<T>(aChild)]() { 93 MOZ_ASSERT(child); 94 return child->GetNeckoTarget(); 95 }, 96 std::move(aCallback)) {} 97 }; 98 99 // Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a 100 // queue if still dispatching previous one(s) to listeners/observers. 101 // Otherwise synchronous XMLHttpRequests and/or other code that spins the 102 // event loop (ex: IPDL rpc) could cause listener->OnDataAvailable (for 103 // instance) to be dispatched and called before mListener->OnStartRequest has 104 // completed. 105 // The ChannelEventQueue implementation ensures strict ordering of 106 // event execution across target threads. 107 108 class ChannelEventQueue final { 109 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue) 110 111 public: 112 explicit ChannelEventQueue(nsISupports* owner) 113 : mSuspendCount(0), 114 mSuspended(false), 115 mForcedCount(0), 116 mFlushing(false), 117 mHasCheckedForAsyncXMLHttpRequest(false), 118 mForAsyncXMLHttpRequest(false), 119 mOwner(owner), 120 mMutex("ChannelEventQueue::mMutex"), 121 mRunningMutex("ChannelEventQueue::mRunningMutex") {} 122 123 // Puts IPDL-generated channel event into queue, to be run later 124 // automatically when EndForcedQueueing and/or Resume is called. 125 // 126 // @param aCallback - the ChannelEvent 127 // @param aAssertionWhenNotQueued - this optional param will be used in an 128 // assertion when the event is executed directly. 129 inline void RunOrEnqueue(ChannelEvent* aCallback, 130 bool aAssertionWhenNotQueued = false); 131 132 // Append ChannelEvent in front of the event queue. 133 inline void PrependEvent(UniquePtr<ChannelEvent>&& aEvent); 134 inline void PrependEventInternal(UniquePtr<ChannelEvent>&& aEvent) 135 MOZ_REQUIRES(mMutex); 136 inline void PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents); 137 138 // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing 139 // events that will be run/flushed when EndForcedQueueing is called. 140 // - Note: queueing may still be required after EndForcedQueueing() (if the 141 // queue is suspended, etc): always call RunOrEnqueue() to avoid race 142 // conditions. 143 inline void StartForcedQueueing(); 144 inline void EndForcedQueueing(); 145 146 // Suspend/resume event queue. RunOrEnqueue() will start enqueuing 147 // events and they will be run/flushed when resume is called. These should be 148 // called when the channel owning the event queue is suspended/resumed. 149 void Suspend(); 150 // Resume flushes the queue asynchronously, i.e. items in queue will be 151 // dispatched in a new event on the current thread. 152 void Resume(); 153 154 void NotifyReleasingOwner() { 155 MutexAutoLock lock(mMutex); 156 mOwner = nullptr; 157 } 158 159 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED 160 bool IsEmpty() { 161 MutexAutoLock lock(mMutex); 162 return mEventQueue.IsEmpty(); 163 } 164 #endif 165 166 private: 167 // Private destructor, to discourage deletion outside of Release(): 168 ~ChannelEventQueue() = default; 169 170 void SuspendInternal() MOZ_REQUIRES(mMutex); 171 void ResumeInternal() MOZ_REQUIRES(mMutex); 172 173 bool MaybeSuspendIfEventsAreSuppressed() MOZ_REQUIRES(mMutex); 174 175 inline void MaybeFlushQueue() MOZ_REQUIRES(mMutex); 176 void FlushQueue() MOZ_REQUIRES(mMutex); 177 inline void CompleteResume(); 178 179 ChannelEvent* TakeEvent(); 180 181 nsTArray<UniquePtr<ChannelEvent>> mEventQueue MOZ_GUARDED_BY(mMutex); 182 183 uint32_t mSuspendCount MOZ_GUARDED_BY(mMutex); 184 bool mSuspended MOZ_GUARDED_BY(mMutex); 185 uint32_t mForcedCount // Support ForcedQueueing on multiple thread. 186 MOZ_GUARDED_BY(mMutex); 187 bool mFlushing MOZ_GUARDED_BY(mMutex); 188 189 // Whether the queue is associated with an ssync XHR. 190 // This is lazily instantiated the first time it is needed. 191 // These are MainThread-only. 192 bool mHasCheckedForAsyncXMLHttpRequest; 193 bool mForAsyncXMLHttpRequest; 194 195 // Keep ptr to avoid refcount cycle: only grab ref during flushing. 196 nsISupports* mOwner MOZ_GUARDED_BY(mMutex); 197 198 // For atomic mEventQueue operation and state update 199 Mutex mMutex; 200 201 // To guarantee event execution order among threads 202 RecursiveMutex mRunningMutex MOZ_ACQUIRED_BEFORE(mMutex); 203 204 friend class AutoEventEnqueuer; 205 }; 206 207 inline void ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback, 208 bool aAssertionWhenNotQueued) { 209 MOZ_ASSERT(aCallback); 210 // Events execution could be a destruction of the channel (and our own 211 // destructor) unless we make sure its refcount doesn't drop to 0 while this 212 // method is running. 213 nsCOMPtr<nsISupports> kungFuDeathGrip; 214 215 // To avoid leaks. 216 UniquePtr<ChannelEvent> event(aCallback); 217 218 // To guarantee that the running event and all the events generated within 219 // it will be finished before events on other threads. 220 RecursiveMutexAutoLock lock(mRunningMutex); 221 { 222 MutexAutoLock lock(mMutex); 223 kungFuDeathGrip = mOwner; // must be under the lock 224 225 bool enqueue = !!mForcedCount || mSuspended || mFlushing || 226 !mEventQueue.IsEmpty() || 227 MaybeSuspendIfEventsAreSuppressed(); 228 // To ensure strict ordering of events across multiple threads we buffer the 229 // events for the below cases: 230 // a. event queuing is forced by AutoEventEnqueuer 231 // b. event queue is suspended 232 // c. an event is currently flushed/executed from the queue 233 // d. queue is non-empty (pending events on remote thread targets) 234 if (enqueue) { 235 PROFILER_MARKER("ChannelEventQueue::Enqueue", NETWORK, {}, FlowMarker, 236 Flow::FromPointer(event.get())); 237 mEventQueue.AppendElement(std::move(event)); 238 return; 239 } 240 241 nsCOMPtr<nsIEventTarget> target = event->GetEventTarget(); 242 MOZ_ASSERT(target); 243 244 bool isCurrentThread = false; 245 DebugOnly<nsresult> rv = target->IsOnCurrentThread(&isCurrentThread); 246 MOZ_ASSERT(NS_SUCCEEDED(rv)); 247 248 if (!isCurrentThread) { 249 // Leverage Suspend/Resume mechanism to trigger flush procedure without 250 // creating a new one. 251 // The execution of further events in the queue is blocked until the 252 // target thread completes the execution of this event. 253 // A callback is dispatched to the target thread to flush events from the 254 // queue. This is done 255 // by ResumeInternal which dispatches a runnable 256 // (CompleteResumeRunnable) to the target thread. The target thread will 257 // call CompleteResume to flush the queue. All the events are run 258 // synchronously in their respective target threads. 259 SuspendInternal(); 260 261 PROFILER_MARKER("ChannelEventQueue::Enqueue", NETWORK, {}, FlowMarker, 262 Flow::FromPointer(event.get())); 263 264 mEventQueue.AppendElement(std::move(event)); 265 ResumeInternal(); 266 return; 267 } 268 } 269 270 MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued); 271 AUTO_PROFILER_TERMINATING_FLOW_MARKER("ChannelEvent", OTHER, 272 Flow::FromPointer(event.get())); 273 // execute the event synchronously if we are not queuing it and 274 // the target thread is the current thread 275 event->Run(); 276 } 277 278 inline void ChannelEventQueue::StartForcedQueueing() { 279 MutexAutoLock lock(mMutex); 280 ++mForcedCount; 281 } 282 283 inline void ChannelEventQueue::EndForcedQueueing() { 284 MutexAutoLock lock(mMutex); 285 MOZ_ASSERT(mForcedCount > 0); 286 if (!--mForcedCount) { 287 MaybeFlushQueue(); 288 } 289 } 290 291 inline void ChannelEventQueue::PrependEvent(UniquePtr<ChannelEvent>&& aEvent) { 292 MutexAutoLock lock(mMutex); 293 PrependEventInternal(std::move(aEvent)); 294 } 295 296 inline void ChannelEventQueue::PrependEventInternal( 297 UniquePtr<ChannelEvent>&& aEvent) { 298 mMutex.AssertCurrentThreadOwns(); 299 300 // Prepending event while no queue flush foreseen might cause the following 301 // channel events not run. This assertion here guarantee there must be a 302 // queue flush, either triggered by Resume or EndForcedQueueing, to execute 303 // the added event. 304 MOZ_ASSERT(mSuspended || !!mForcedCount); 305 306 mEventQueue.InsertElementAt(0, std::move(aEvent)); 307 } 308 309 inline void ChannelEventQueue::PrependEvents( 310 nsTArray<UniquePtr<ChannelEvent>>& aEvents) { 311 MutexAutoLock lock(mMutex); 312 313 // Prepending event while no queue flush foreseen might cause the following 314 // channel events not run. This assertion here guarantee there must be a 315 // queue flush, either triggered by Resume or EndForcedQueueing, to execute 316 // the added events. 317 MOZ_ASSERT(mSuspended || !!mForcedCount); 318 319 mEventQueue.InsertElementsAt(0, aEvents.Length()); 320 321 for (uint32_t i = 0; i < aEvents.Length(); i++) { 322 mEventQueue[i] = std::move(aEvents[i]); 323 } 324 } 325 326 inline void ChannelEventQueue::CompleteResume() { 327 MutexAutoLock lock(mMutex); 328 329 // channel may have been suspended again since Resume fired event to call 330 // this. 331 if (!mSuspendCount) { 332 // we need to remain logically suspended (for purposes of queuing incoming 333 // messages) until this point, else new incoming messages could run before 334 // queued ones. 335 mSuspended = false; 336 MaybeFlushQueue(); 337 } 338 } 339 340 inline void ChannelEventQueue::MaybeFlushQueue() { 341 mMutex.AssertCurrentThreadOwns(); 342 // Don't flush if forced queuing on, we're already being flushed, or 343 // suspended, or there's nothing to flush 344 bool flushQueue = !mForcedCount && !mFlushing && !mSuspended && 345 !mEventQueue.IsEmpty() && 346 !MaybeSuspendIfEventsAreSuppressed(); 347 348 // Only one thread is allowed to run FlushQueue at a time. 349 if (flushQueue) { 350 mFlushing = true; 351 FlushQueue(); 352 } 353 } 354 355 // Ensures that RunOrEnqueue() will be collecting events during its lifetime 356 // (letting caller know incoming IPDL msgs should be queued). Flushes the 357 // queue when it goes out of scope. 358 class MOZ_STACK_CLASS AutoEventEnqueuer { 359 public: 360 explicit AutoEventEnqueuer(ChannelEventQueue* queue) : mEventQueue(queue) { 361 { 362 // Probably not actually needed, since NotifyReleasingOwner should 363 // only happen after this, but safer to take it in case things change 364 MutexAutoLock lock(queue->mMutex); 365 mOwner = queue->mOwner; 366 } 367 mEventQueue->StartForcedQueueing(); 368 } 369 ~AutoEventEnqueuer() { mEventQueue->EndForcedQueueing(); } 370 371 private: 372 RefPtr<ChannelEventQueue> mEventQueue; 373 // Ensure channel object lives longer than ChannelEventQueue. 374 nsCOMPtr<nsISupports> mOwner; 375 }; 376 377 } // namespace net 378 } // namespace mozilla 379 380 #endif