WebrtcTaskQueueWrapper.cpp (10358B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 4 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "WebrtcTaskQueueWrapper.h" 7 8 #include "VideoUtils.h" 9 #include "api/task_queue/task_queue_factory.h" 10 #include "mozilla/TaskQueue.h" 11 #include "nsThreadUtils.h" 12 13 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY 14 # include "fmt/format.h" 15 #endif 16 17 namespace mozilla { 18 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY 19 class InvocableRunnable final : public nsIRunnable, public nsINamed { 20 // Storage for caching the name returned from GetName(). 21 Maybe<nsAutoCString> mName; 22 23 // These are used to construct mName. 24 const char* const mTaskQueueName; 25 const WebrtcLocation mLocation; 26 27 absl::AnyInvocable<void() &&> mTask; 28 29 ~InvocableRunnable() = default; 30 31 public: 32 NS_DECL_THREADSAFE_ISUPPORTS 33 34 InvocableRunnable(const char* aTaskQueueName, 35 absl::AnyInvocable<void() &&>&& aTask, 36 const WebrtcLocation& aLocation) 37 : mTaskQueueName(aTaskQueueName), 38 mLocation(aLocation), 39 mTask(std::move(aTask)) {} 40 41 NS_IMETHOD Run() override { 42 std::move(mTask)(); 43 return NS_OK; 44 } 45 46 NS_IMETHOD GetName(nsACString& aName) override { 47 if (mName) { 48 aName = *mName; 49 return NS_OK; 50 } 51 52 Maybe<nsDependentCSubstring> fileName; 53 if (mLocation.mFile) { 54 nsDependentCString f(mLocation.mFile); 55 # ifdef XP_WIN 56 // On Windows, path separators are inconsistent per 57 // https://github.com/llvm/llvm-project/issues/45076. 58 int32_t i = f.RFindCharInSet("/\\"); 59 # else 60 int32_t i = f.RFindChar('/'); 61 # endif 62 if (i == kNotFound) { 63 fileName.emplace(f); 64 } else { 65 fileName.emplace(f, i + 1); 66 } 67 } 68 69 mName.emplace(); 70 if (mLocation.mFunction && fileName && mLocation.mLine) { 71 mName->AppendFmt(FMT_STRING("{} - {} ({}:{})"), mTaskQueueName, 72 mLocation.mFunction, *fileName, mLocation.mLine); 73 } else if (fileName && mLocation.mLine) { 74 mName->AppendFmt(FMT_STRING("{} - InvocableRunnable ({}:{})"), 75 mTaskQueueName, *fileName, mLocation.mLine); 76 } else { 77 mName->AppendFmt(FMT_STRING("{} - InvocableRunnable"), mTaskQueueName); 78 } 79 aName = *mName; 80 81 return NS_OK; 82 } 83 }; 84 85 NS_IMPL_ISUPPORTS(InvocableRunnable, nsIRunnable, nsINamed) 86 #endif 87 88 enum class DeletionPolicy : uint8_t { Blocking, NonBlocking }; 89 90 /** 91 * A wrapper around Mozilla TaskQueues in the shape of a libwebrtc TaskQueue. 92 * 93 * Allows libwebrtc to use Mozilla threads where tooling, e.g. profiling, is set 94 * up and just works. 95 * 96 * Mozilla APIs like Runnables, MozPromise, etc. can also be used with the 97 * wrapped TaskQueue to run things on the right thread when interacting with 98 * libwebrtc. 99 */ 100 template <DeletionPolicy Deletion> 101 class WebrtcTaskQueueWrapper : public webrtc::TaskQueueBase { 102 public: 103 class TaskQueueObserver final : public TaskQueue::Observer { 104 public: 105 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TaskQueueObserver, override); 106 107 template <typename Wrapper> 108 explicit TaskQueueObserver(Wrapper aOwner) 109 : mOwner(std::forward<Wrapper>(aOwner)) {} 110 111 void WillProcessEvent(TaskQueue* aQueue) override { 112 if constexpr (Deletion == DeletionPolicy::Blocking) { 113 mCurrent.emplace(mOwner); 114 } else { 115 static_assert(Deletion == DeletionPolicy::NonBlocking); 116 mCurrent.emplace(mOwner.get()); 117 } 118 } 119 void DidProcessEvent(TaskQueue* aQueue) override { mCurrent = Nothing(); } 120 121 private: 122 ~TaskQueueObserver() override = default; 123 // If NonBlocking, a TaskQueue owns this observer, which owns mOwner, which 124 // must be kept alive. There are no cycles. 125 // 126 // If Blocking, mOwner owns the TaskQueue, which owns us. mOwner is owned 127 // externally. It must be a weak reference here, or we'd have a cycle. 128 // 129 // mOwner is safe because the underlying TaskQueue first finishes shutdown, 130 // then the observer is destroyed, then the WebrtcTaskQueueWrapper is 131 // destroyed. See the WebrtcTaskQueueWrawpper::Delete for more details. 132 std::conditional_t<Deletion == DeletionPolicy::NonBlocking, 133 UniquePtr<WebrtcTaskQueueWrapper>, 134 WebrtcTaskQueueWrapper*> const mOwner; 135 Maybe<CurrentTaskQueueSetter> mCurrent; 136 }; 137 138 public: 139 template <typename Target> 140 WebrtcTaskQueueWrapper(Target aTaskQueue, const nsACString& aName) 141 : mTaskQueue(std::forward<Target>(aTaskQueue)), mName(aName) {} 142 ~WebrtcTaskQueueWrapper() = default; 143 144 void Delete() override { 145 if constexpr (Deletion == DeletionPolicy::Blocking) { 146 MOZ_RELEASE_ASSERT(!mTaskQueue->IsOnCurrentThread()); 147 // Don't call into the task queue if non-blocking as it is in the middle 148 // of its dtor. There'd be nothing to wait for anyway. 149 mTaskQueue->BeginShutdown(); 150 mTaskQueue->AwaitShutdownAndIdle(); 151 mTaskQueue->SetObserver(nullptr); 152 } 153 154 delete this; 155 } 156 157 already_AddRefed<nsIRunnable> WrapInvocable( 158 absl::AnyInvocable<void() &&>&& aTask, const WebrtcLocation& aLocation) { 159 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY 160 return MakeAndAddRef<InvocableRunnable>(mName.get(), std::move(aTask), 161 aLocation); 162 #else 163 return NS_NewRunnableFunction( 164 "InvocableRunnable", 165 [task = std::move(aTask)]() mutable { std::move(task)(); }); 166 #endif 167 } 168 169 void PostTaskImpl(absl::AnyInvocable<void() &&> aTask, 170 const PostTaskTraits& aTraits, 171 const webrtc::Location& aLocation) override { 172 if (NS_FAILED(mTaskQueue->Dispatch( 173 WrapInvocable(std::move(aTask), aLocation), 174 /* Pass NS_DISPATCH_FALLIBLE as documentation, but note 175 that TaskQueue::Dispatch does not leak the task even 176 with NS_DISPATCH_NORMAL, which is what 177 DelayedDispatch below results in. */ 178 NS_DISPATCH_FALLIBLE))) { 179 NS_WARNING( 180 nsFmtCString( 181 FMT_STRING( 182 "TaskQueue '{}' failed to dispatch a task from {} ({}:{})"), 183 mName, aLocation.mFunction, aLocation.mFile, aLocation.mLine) 184 .get()); 185 }; 186 } 187 188 void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> aTask, 189 webrtc::TimeDelta aDelay, 190 const PostDelayedTaskTraits& aTraits, 191 const webrtc::Location& aLocation) override { 192 if (aDelay.ms() == 0) { 193 // AbstractThread::DelayedDispatch doesn't support delay 0 194 PostTaskImpl(std::move(aTask), PostTaskTraits{}, aLocation); 195 return; 196 } 197 if (NS_FAILED(mTaskQueue->DelayedDispatch( 198 WrapInvocable(std::move(aTask), aLocation), aDelay.ms()))) { 199 NS_WARNING(nsFmtCString(FMT_STRING("TaskQueue '{}' failed to dispatch a " 200 "delayed task from {} ({}:{})"), 201 mName, aLocation.mFunction, aLocation.mFile, 202 aLocation.mLine) 203 .get()); 204 } 205 } 206 207 // If Blocking, access is through WebrtcTaskQueueWrapper, which has to keep 208 // mTaskQueue alive. If NonBlocking, mTaskQueue keeps WebrtcTaskQueueWrapper 209 // alive through the observer. We must not create a cycle. 210 const std::conditional_t<Deletion == DeletionPolicy::Blocking, 211 RefPtr<TaskQueue>, TaskQueue*> 212 mTaskQueue; 213 // Storage for mTaskQueue's null-terminated const char* name. 214 const nsCString mName; 215 }; 216 217 } // namespace mozilla 218 219 namespace std { 220 template <mozilla::DeletionPolicy Deletion> 221 struct default_delete<mozilla::WebrtcTaskQueueWrapper<Deletion>> 222 : webrtc::TaskQueueDeleter { 223 void operator()(mozilla::WebrtcTaskQueueWrapper<Deletion>* aPtr) const { 224 webrtc::TaskQueueDeleter::operator()(aPtr); 225 } 226 }; 227 228 } // namespace std 229 230 namespace mozilla { 231 232 std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> 233 CreateWebrtcTaskQueue(already_AddRefed<nsIEventTarget> aTarget, 234 const nsACString& aName, bool aSupportsTailDispatch) { 235 using Wrapper = WebrtcTaskQueueWrapper<DeletionPolicy::Blocking>; 236 const auto& flat = PromiseFlatCString(aName); 237 auto tq = 238 TaskQueue::Create(std::move(aTarget), flat.get(), aSupportsTailDispatch); 239 auto wrapper = MakeUnique<Wrapper>(std::move(tq), flat); 240 auto observer = MakeRefPtr<Wrapper::TaskQueueObserver>(wrapper.get()); 241 wrapper->mTaskQueue->SetObserver(observer); 242 return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>( 243 wrapper.release()); 244 } 245 246 RefPtr<TaskQueue> CreateWebrtcTaskQueueWrapper( 247 already_AddRefed<nsIEventTarget> aTarget, const nsACString& aName, 248 bool aSupportsTailDispatch) { 249 using Wrapper = WebrtcTaskQueueWrapper<DeletionPolicy::NonBlocking>; 250 const auto& flat = PromiseFlatCString(aName); 251 auto tq = 252 TaskQueue::Create(std::move(aTarget), flat.get(), aSupportsTailDispatch); 253 auto wrapper = MakeUnique<Wrapper>(tq.get(), flat); 254 auto observer = MakeRefPtr<Wrapper::TaskQueueObserver>(std::move(wrapper)); 255 tq->SetObserver(observer); 256 return tq; 257 } 258 259 UniquePtr<webrtc::TaskQueueFactory> CreateWebrtcTaskQueueFactory() { 260 class SharedThreadPoolWebRtcTaskQueueFactory 261 : public webrtc::TaskQueueFactory { 262 public: 263 std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> 264 CreateTaskQueue(absl::string_view aName, 265 Priority aPriority) const override { 266 // libwebrtc will dispatch some tasks sync, i.e., block the origin thread 267 // until they've run, and that doesn't play nice with tail dispatching 268 // since there will never be a tail. DeletionPolicy::Blocking because this 269 // is for libwebrtc use and that's what they expect. 270 constexpr bool supportTailDispatch = false; 271 // XXX Do something with aPriority 272 return CreateWebrtcTaskQueue( 273 GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), 274 nsDependentCSubstring(aName.data(), aName.size()), 275 supportTailDispatch); 276 } 277 }; 278 279 return WrapUnique(new SharedThreadPoolWebRtcTaskQueueFactory); 280 } 281 282 } // namespace mozilla