tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

WebrtcTaskQueueWrapper.cpp (10358B)


      1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      4 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "WebrtcTaskQueueWrapper.h"
      7 
      8 #include "VideoUtils.h"
      9 #include "api/task_queue/task_queue_factory.h"
     10 #include "mozilla/TaskQueue.h"
     11 #include "nsThreadUtils.h"
     12 
     13 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY
     14 #  include "fmt/format.h"
     15 #endif
     16 
     17 namespace mozilla {
     18 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY
     19 class InvocableRunnable final : public nsIRunnable, public nsINamed {
     20  // Storage for caching the name returned from GetName().
     21  Maybe<nsAutoCString> mName;
     22 
     23  // These are used to construct mName.
     24  const char* const mTaskQueueName;
     25  const WebrtcLocation mLocation;
     26 
     27  absl::AnyInvocable<void() &&> mTask;
     28 
     29  ~InvocableRunnable() = default;
     30 
     31 public:
     32  NS_DECL_THREADSAFE_ISUPPORTS
     33 
     34  InvocableRunnable(const char* aTaskQueueName,
     35                    absl::AnyInvocable<void() &&>&& aTask,
     36                    const WebrtcLocation& aLocation)
     37      : mTaskQueueName(aTaskQueueName),
     38        mLocation(aLocation),
     39        mTask(std::move(aTask)) {}
     40 
     41  NS_IMETHOD Run() override {
     42    std::move(mTask)();
     43    return NS_OK;
     44  }
     45 
     46  NS_IMETHOD GetName(nsACString& aName) override {
     47    if (mName) {
     48      aName = *mName;
     49      return NS_OK;
     50    }
     51 
     52    Maybe<nsDependentCSubstring> fileName;
     53    if (mLocation.mFile) {
     54      nsDependentCString f(mLocation.mFile);
     55 #  ifdef XP_WIN
     56      // On Windows, path separators are inconsistent per
     57      // https://github.com/llvm/llvm-project/issues/45076.
     58      int32_t i = f.RFindCharInSet("/\\");
     59 #  else
     60      int32_t i = f.RFindChar('/');
     61 #  endif
     62      if (i == kNotFound) {
     63        fileName.emplace(f);
     64      } else {
     65        fileName.emplace(f, i + 1);
     66      }
     67    }
     68 
     69    mName.emplace();
     70    if (mLocation.mFunction && fileName && mLocation.mLine) {
     71      mName->AppendFmt(FMT_STRING("{} - {} ({}:{})"), mTaskQueueName,
     72                       mLocation.mFunction, *fileName, mLocation.mLine);
     73    } else if (fileName && mLocation.mLine) {
     74      mName->AppendFmt(FMT_STRING("{} - InvocableRunnable ({}:{})"),
     75                       mTaskQueueName, *fileName, mLocation.mLine);
     76    } else {
     77      mName->AppendFmt(FMT_STRING("{} - InvocableRunnable"), mTaskQueueName);
     78    }
     79    aName = *mName;
     80 
     81    return NS_OK;
     82  }
     83 };
     84 
     85 NS_IMPL_ISUPPORTS(InvocableRunnable, nsIRunnable, nsINamed)
     86 #endif
     87 
     88 enum class DeletionPolicy : uint8_t { Blocking, NonBlocking };
     89 
     90 /**
     91 * A wrapper around Mozilla TaskQueues in the shape of a libwebrtc TaskQueue.
     92 *
     93 * Allows libwebrtc to use Mozilla threads where tooling, e.g. profiling, is set
     94 * up and just works.
     95 *
     96 * Mozilla APIs like Runnables, MozPromise, etc. can also be used with the
     97 * wrapped TaskQueue to run things on the right thread when interacting with
     98 * libwebrtc.
     99 */
    100 template <DeletionPolicy Deletion>
    101 class WebrtcTaskQueueWrapper : public webrtc::TaskQueueBase {
    102 public:
    103  class TaskQueueObserver final : public TaskQueue::Observer {
    104   public:
    105    NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TaskQueueObserver, override);
    106 
    107    template <typename Wrapper>
    108    explicit TaskQueueObserver(Wrapper aOwner)
    109        : mOwner(std::forward<Wrapper>(aOwner)) {}
    110 
    111    void WillProcessEvent(TaskQueue* aQueue) override {
    112      if constexpr (Deletion == DeletionPolicy::Blocking) {
    113        mCurrent.emplace(mOwner);
    114      } else {
    115        static_assert(Deletion == DeletionPolicy::NonBlocking);
    116        mCurrent.emplace(mOwner.get());
    117      }
    118    }
    119    void DidProcessEvent(TaskQueue* aQueue) override { mCurrent = Nothing(); }
    120 
    121   private:
    122    ~TaskQueueObserver() override = default;
    123    // If NonBlocking, a TaskQueue owns this observer, which owns mOwner, which
    124    // must be kept alive. There are no cycles.
    125    //
    126    // If Blocking, mOwner owns the TaskQueue, which owns us. mOwner is owned
    127    // externally. It must be a weak reference here, or we'd have a cycle.
    128    //
    129    // mOwner is safe because the underlying TaskQueue first finishes shutdown,
    130    // then the observer is destroyed, then the WebrtcTaskQueueWrapper is
    131    // destroyed. See the WebrtcTaskQueueWrawpper::Delete for more details.
    132    std::conditional_t<Deletion == DeletionPolicy::NonBlocking,
    133                       UniquePtr<WebrtcTaskQueueWrapper>,
    134                       WebrtcTaskQueueWrapper*> const mOwner;
    135    Maybe<CurrentTaskQueueSetter> mCurrent;
    136  };
    137 
    138 public:
    139  template <typename Target>
    140  WebrtcTaskQueueWrapper(Target aTaskQueue, const nsACString& aName)
    141      : mTaskQueue(std::forward<Target>(aTaskQueue)), mName(aName) {}
    142  ~WebrtcTaskQueueWrapper() = default;
    143 
    144  void Delete() override {
    145    if constexpr (Deletion == DeletionPolicy::Blocking) {
    146      MOZ_RELEASE_ASSERT(!mTaskQueue->IsOnCurrentThread());
    147      // Don't call into the task queue if non-blocking as it is in the middle
    148      // of its dtor. There'd be nothing to wait for anyway.
    149      mTaskQueue->BeginShutdown();
    150      mTaskQueue->AwaitShutdownAndIdle();
    151      mTaskQueue->SetObserver(nullptr);
    152    }
    153 
    154    delete this;
    155  }
    156 
    157  already_AddRefed<nsIRunnable> WrapInvocable(
    158      absl::AnyInvocable<void() &&>&& aTask, const WebrtcLocation& aLocation) {
    159 #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY
    160    return MakeAndAddRef<InvocableRunnable>(mName.get(), std::move(aTask),
    161                                            aLocation);
    162 #else
    163    return NS_NewRunnableFunction(
    164        "InvocableRunnable",
    165        [task = std::move(aTask)]() mutable { std::move(task)(); });
    166 #endif
    167  }
    168 
    169  void PostTaskImpl(absl::AnyInvocable<void() &&> aTask,
    170                    const PostTaskTraits& aTraits,
    171                    const webrtc::Location& aLocation) override {
    172    if (NS_FAILED(mTaskQueue->Dispatch(
    173            WrapInvocable(std::move(aTask), aLocation),
    174            /* Pass NS_DISPATCH_FALLIBLE as documentation, but note
    175               that TaskQueue::Dispatch does not leak the task even
    176               with NS_DISPATCH_NORMAL, which is what
    177               DelayedDispatch below results in. */
    178            NS_DISPATCH_FALLIBLE))) {
    179      NS_WARNING(
    180          nsFmtCString(
    181              FMT_STRING(
    182                  "TaskQueue '{}' failed to dispatch a task from {} ({}:{})"),
    183              mName, aLocation.mFunction, aLocation.mFile, aLocation.mLine)
    184              .get());
    185    };
    186  }
    187 
    188  void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> aTask,
    189                           webrtc::TimeDelta aDelay,
    190                           const PostDelayedTaskTraits& aTraits,
    191                           const webrtc::Location& aLocation) override {
    192    if (aDelay.ms() == 0) {
    193      // AbstractThread::DelayedDispatch doesn't support delay 0
    194      PostTaskImpl(std::move(aTask), PostTaskTraits{}, aLocation);
    195      return;
    196    }
    197    if (NS_FAILED(mTaskQueue->DelayedDispatch(
    198            WrapInvocable(std::move(aTask), aLocation), aDelay.ms()))) {
    199      NS_WARNING(nsFmtCString(FMT_STRING("TaskQueue '{}' failed to dispatch a "
    200                                         "delayed task from {} ({}:{})"),
    201                              mName, aLocation.mFunction, aLocation.mFile,
    202                              aLocation.mLine)
    203                     .get());
    204    }
    205  }
    206 
    207  // If Blocking, access is through WebrtcTaskQueueWrapper, which has to keep
    208  // mTaskQueue alive. If NonBlocking, mTaskQueue keeps WebrtcTaskQueueWrapper
    209  // alive through the observer. We must not create a cycle.
    210  const std::conditional_t<Deletion == DeletionPolicy::Blocking,
    211                           RefPtr<TaskQueue>, TaskQueue*>
    212      mTaskQueue;
    213  // Storage for mTaskQueue's null-terminated const char* name.
    214  const nsCString mName;
    215 };
    216 
    217 }  // namespace mozilla
    218 
    219 namespace std {
    220 template <mozilla::DeletionPolicy Deletion>
    221 struct default_delete<mozilla::WebrtcTaskQueueWrapper<Deletion>>
    222    : webrtc::TaskQueueDeleter {
    223  void operator()(mozilla::WebrtcTaskQueueWrapper<Deletion>* aPtr) const {
    224    webrtc::TaskQueueDeleter::operator()(aPtr);
    225  }
    226 };
    227 
    228 }  // namespace std
    229 
    230 namespace mozilla {
    231 
    232 std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
    233 CreateWebrtcTaskQueue(already_AddRefed<nsIEventTarget> aTarget,
    234                      const nsACString& aName, bool aSupportsTailDispatch) {
    235  using Wrapper = WebrtcTaskQueueWrapper<DeletionPolicy::Blocking>;
    236  const auto& flat = PromiseFlatCString(aName);
    237  auto tq =
    238      TaskQueue::Create(std::move(aTarget), flat.get(), aSupportsTailDispatch);
    239  auto wrapper = MakeUnique<Wrapper>(std::move(tq), flat);
    240  auto observer = MakeRefPtr<Wrapper::TaskQueueObserver>(wrapper.get());
    241  wrapper->mTaskQueue->SetObserver(observer);
    242  return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
    243      wrapper.release());
    244 }
    245 
    246 RefPtr<TaskQueue> CreateWebrtcTaskQueueWrapper(
    247    already_AddRefed<nsIEventTarget> aTarget, const nsACString& aName,
    248    bool aSupportsTailDispatch) {
    249  using Wrapper = WebrtcTaskQueueWrapper<DeletionPolicy::NonBlocking>;
    250  const auto& flat = PromiseFlatCString(aName);
    251  auto tq =
    252      TaskQueue::Create(std::move(aTarget), flat.get(), aSupportsTailDispatch);
    253  auto wrapper = MakeUnique<Wrapper>(tq.get(), flat);
    254  auto observer = MakeRefPtr<Wrapper::TaskQueueObserver>(std::move(wrapper));
    255  tq->SetObserver(observer);
    256  return tq;
    257 }
    258 
    259 UniquePtr<webrtc::TaskQueueFactory> CreateWebrtcTaskQueueFactory() {
    260  class SharedThreadPoolWebRtcTaskQueueFactory
    261      : public webrtc::TaskQueueFactory {
    262   public:
    263    std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
    264    CreateTaskQueue(absl::string_view aName,
    265                    Priority aPriority) const override {
    266      // libwebrtc will dispatch some tasks sync, i.e., block the origin thread
    267      // until they've run, and that doesn't play nice with tail dispatching
    268      // since there will never be a tail. DeletionPolicy::Blocking because this
    269      // is for libwebrtc use and that's what they expect.
    270      constexpr bool supportTailDispatch = false;
    271      // XXX Do something with aPriority
    272      return CreateWebrtcTaskQueue(
    273          GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
    274          nsDependentCSubstring(aName.data(), aName.size()),
    275          supportTailDispatch);
    276    }
    277  };
    278 
    279  return WrapUnique(new SharedThreadPoolWebRtcTaskQueueFactory);
    280 }
    281 
    282 }  // namespace mozilla