tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

thread_parallel_runner_internal.h (6305B)


      1 // Copyright (c) the JPEG XL Project Authors. All rights reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style
      4 // license that can be found in the LICENSE file.
      5 //
      6 
      7 // C++ implementation using std::thread of a ::JxlParallelRunner.
      8 
      9 // The main class in this module, ThreadParallelRunner, implements a static
     10 // method ThreadParallelRunner::Runner than can be passed as a
     11 // JxlParallelRunner when using the JPEG XL library. This uses std::thread
     12 // internally and related synchronization functions. The number of threads
     13 // created is fixed at construction time and the threads are re-used for every
     14 // ThreadParallelRunner::Runner call. Only one concurrent Runner() call per
     15 // instance is allowed at a time.
     16 //
     17 // This is a scalable, lower-overhead thread pool runner, especially suitable
     18 // for data-parallel computations in the fork-join model, where clients need to
     19 // know when all tasks have completed.
     20 //
     21 // This thread pool can efficiently load-balance millions of tasks using an
     22 // atomic counter, thus avoiding per-task virtual or system calls. With 48
     23 // hyperthreads and 1M tasks that add to an atomic counter, overall runtime is
     24 // 10-20x higher when using std::async, and ~200x for a queue-based thread
     25 // pool.
     26 //
     27 // Usage:
     28 //   ThreadParallelRunner runner;
     29 //   JxlDecode(
     30 //       ... , &ThreadParallelRunner::Runner, static_cast<void*>(&runner));
     31 
     32 #ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
     33 #define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
     34 
     35 #include <jxl/memory_manager.h>
     36 #include <jxl/parallel_runner.h>
     37 
     38 #include <atomic>
     39 #include <condition_variable>  //NOLINT
     40 #include <cstddef>
     41 #include <cstdint>
     42 #include <cstdlib>
     43 #include <mutex>               //NOLINT
     44 #include <thread>              //NOLINT
     45 #include <vector>
     46 
     47 namespace jpegxl {
     48 
     49 // Main helper class implementing the ::JxlParallelRunner interface.
     50 class ThreadParallelRunner {
     51 public:
     52  // ::JxlParallelRunner interface.
     53  static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque,
     54                                   JxlParallelRunInit init,
     55                                   JxlParallelRunFunction func,
     56                                   uint32_t start_range, uint32_t end_range);
     57 
     58  // Starts the given number of worker threads and blocks until they are ready.
     59  // "num_worker_threads" defaults to one per hyperthread. If zero, all tasks
     60  // run on the main thread.
     61  explicit ThreadParallelRunner(
     62      int num_worker_threads = std::thread::hardware_concurrency());
     63 
     64  // Waits for all threads to exit.
     65  ~ThreadParallelRunner();
     66 
     67  // Returns maximum number of main/worker threads that may call Func. Useful
     68  // for allocating per-thread storage.
     69  size_t NumThreads() const { return num_threads_; }
     70 
     71  // Runs func(thread, thread) on all thread(s) that may participate in Run.
     72  // If NumThreads() == 0, runs on the main thread with thread == 0, otherwise
     73  // concurrently called by each worker thread in [0, NumThreads()).
     74  template <class Func>
     75  void RunOnEachThread(const Func& func) {
     76    if (num_worker_threads_ == 0) {
     77      const int thread = 0;
     78      func(thread, thread);
     79      return;
     80    }
     81 
     82    data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>);
     83    jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func));
     84    StartWorkers(kWorkerOnce);
     85    WorkersReadyBarrier();
     86  }
     87 
     88  JxlMemoryManager memory_manager;
     89 
     90 private:
     91  // After construction and between calls to Run, workers are "ready", i.e.
     92  // waiting on worker_start_cv_. They are "started" by sending a "command"
     93  // and notifying all worker_start_cv_ waiters. (That is why all workers
     94  // must be ready/waiting - otherwise, the notification will not reach all of
     95  // them and the main thread waits in vain for them to report readiness.)
     96  using WorkerCommand = uint64_t;
     97 
     98  // Special values; all others encode the begin/end parameters. Note that all
     99  // these are no-op ranges (begin >= end) and therefore never used to encode
    100  // ranges.
    101  static constexpr WorkerCommand kWorkerWait = ~1ULL;
    102  static constexpr WorkerCommand kWorkerOnce = ~2ULL;
    103  static constexpr WorkerCommand kWorkerExit = ~3ULL;
    104 
    105  // Calls f(task, thread). Used for type erasure of Func arguments. The
    106  // signature must match JxlParallelRunFunction, hence a void* argument.
    107  template <class Closure>
    108  static void CallClosure(void* f, const uint32_t task, const size_t thread) {
    109    (*reinterpret_cast<const Closure*>(f))(task, thread);
    110  }
    111 
    112  void WorkersReadyBarrier() {
    113    std::unique_lock<std::mutex> lock(mutex_);
    114    // Typically only a single iteration.
    115    while (workers_ready_ != threads_.size()) {
    116      workers_ready_cv_.wait(lock);
    117    }
    118    workers_ready_ = 0;
    119 
    120    // Safely handle spurious worker wakeups.
    121    worker_start_command_ = kWorkerWait;
    122  }
    123 
    124  // Precondition: all workers are ready.
    125  void StartWorkers(const WorkerCommand worker_command) {
    126    mutex_.lock();
    127    worker_start_command_ = worker_command;
    128    // Workers will need this lock, so release it before they wake up.
    129    mutex_.unlock();
    130    worker_start_cv_.notify_all();
    131  }
    132 
    133  // Attempts to reserve and perform some work from the global range of tasks,
    134  // which is encoded within "command". Returns after all tasks are reserved.
    135  static void RunRange(ThreadParallelRunner* self, WorkerCommand command,
    136                       int thread);
    137 
    138  static void ThreadFunc(ThreadParallelRunner* self, int thread);
    139 
    140  // Unmodified after ctor, but cannot be const because we call thread::join().
    141  std::vector<std::thread> threads_;
    142 
    143  const uint32_t num_worker_threads_;  // == threads_.size()
    144  const uint32_t num_threads_;
    145 
    146  std::atomic<int> depth_{0};  // detects if Run is re-entered (not supported).
    147 
    148  std::mutex mutex_;  // guards both cv and their variables.
    149  std::condition_variable workers_ready_cv_;
    150  uint32_t workers_ready_ = 0;
    151  std::condition_variable worker_start_cv_;
    152  WorkerCommand worker_start_command_;
    153 
    154  // Written by main thread, read by workers (after mutex lock/unlock).
    155  JxlParallelRunFunction data_func_;
    156  void* jpegxl_opaque_;
    157 
    158  // Updated by workers; padding avoids false sharing.
    159  uint8_t padding1[64];
    160  std::atomic<uint32_t> num_reserved_{0};
    161  uint8_t padding2[64];
    162 };
    163 
    164 }  // namespace jpegxl
    165 
    166 #endif  // LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_