tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

thread_parallel_runner_internal.cc (5792B)


      1 // Copyright (c) the JPEG XL Project Authors. All rights reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style
      4 // license that can be found in the LICENSE file.
      5 
      6 #include "lib/threads/thread_parallel_runner_internal.h"
      7 
      8 #include <jxl/parallel_runner.h>
      9 #include <jxl/types.h>
     10 
     11 #include <algorithm>
     12 #include <atomic>
     13 #include <cstddef>
     14 #include <cstdint>
     15 #include <mutex>
     16 #include <thread>
     17 
     18 #include "lib/jxl/base/compiler_specific.h"
     19 
     20 namespace jpegxl {
     21 
     22 // static
     23 JxlParallelRetCode ThreadParallelRunner::Runner(
     24    void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
     25    JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
     26  ThreadParallelRunner* self =
     27      static_cast<ThreadParallelRunner*>(runner_opaque);
     28  if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR;
     29  if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS;
     30 
     31  int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
     32  if (ret != JXL_PARALLEL_RET_SUCCESS) return ret;
     33 
     34  // Use a sequential run when num_worker_threads_ is zero since we have no
     35  // worker threads.
     36  if (self->num_worker_threads_ == 0) {
     37    const size_t thread = 0;
     38    for (uint32_t task = start_range; task < end_range; ++task) {
     39      func(jpegxl_opaque, task, thread);
     40    }
     41    return JXL_PARALLEL_RET_SUCCESS;
     42  }
     43 
     44  if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
     45    return JXL_PARALLEL_RET_RUNNER_ERROR;  // Must not re-enter.
     46  }
     47 
     48  const WorkerCommand worker_command =
     49      (static_cast<WorkerCommand>(start_range) << 32) + end_range;
     50  // Ensure the inputs do not result in a reserved command.
     51  if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) ||
     52      (worker_command == kWorkerExit)) {
     53    return JXL_PARALLEL_RET_RUNNER_ERROR;
     54  }
     55 
     56  self->data_func_ = func;
     57  self->jpegxl_opaque_ = jpegxl_opaque;
     58  self->num_reserved_.store(0, std::memory_order_relaxed);
     59 
     60  self->StartWorkers(worker_command);
     61  self->WorkersReadyBarrier();
     62 
     63  if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
     64    return JXL_PARALLEL_RET_RUNNER_ERROR;
     65  }
     66  return JXL_PARALLEL_RET_SUCCESS;
     67 }
     68 
     69 // static
     70 void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
     71                                    const WorkerCommand command,
     72                                    const int thread) {
     73  const uint32_t begin = command >> 32;
     74  const uint32_t end = command & 0xFFFFFFFF;
     75  const uint32_t num_tasks = end - begin;
     76  const uint32_t num_worker_threads = self->num_worker_threads_;
     77 
     78  // OpenMP introduced several "schedule" strategies:
     79  // "single" (static assignment of exactly one chunk per thread): slower.
     80  // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
     81  // "guided" (allocates k tasks, decreases k): computing k = remaining/n
     82  //   is faster than halving k each iteration. We prefer this strategy
     83  //   because it avoids user-specified parameters.
     84 
     85  for (;;) {
     86 #if JXL_FALSE
     87    // dynamic
     88    const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
     89 #else
     90    // guided
     91    const uint32_t num_reserved =
     92        self->num_reserved_.load(std::memory_order_relaxed);
     93    // It is possible that more tasks are reserved than ready to run.
     94    const uint32_t num_remaining =
     95        num_tasks - std::min(num_reserved, num_tasks);
     96    const uint32_t my_size =
     97        std::max(num_remaining / (num_worker_threads * 4), 1u);
     98 #endif
     99    const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
    100                                          my_size, std::memory_order_relaxed);
    101    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    102    // Another thread already reserved the last task.
    103    if (my_begin >= my_end) {
    104      break;
    105    }
    106    for (uint32_t task = my_begin; task < my_end; ++task) {
    107      self->data_func_(self->jpegxl_opaque_, task, thread);
    108    }
    109  }
    110 }
    111 
    112 // static
    113 void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
    114                                      const int thread) {
    115  // Until kWorkerExit command received:
    116  for (;;) {
    117    std::unique_lock<std::mutex> lock(self->mutex_);
    118    // Notify main thread that this thread is ready.
    119    if (++self->workers_ready_ == self->num_threads_) {
    120      self->workers_ready_cv_.notify_one();
    121    }
    122  RESUME_WAIT:
    123    // Wait for a command.
    124    self->worker_start_cv_.wait(lock);
    125    const WorkerCommand command = self->worker_start_command_;
    126    switch (command) {
    127      case kWorkerWait:    // spurious wakeup:
    128        goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
    129      case kWorkerOnce:
    130        lock.unlock();
    131        self->data_func_(self->jpegxl_opaque_, thread, thread);
    132        break;
    133      case kWorkerExit:
    134        return;  // exits thread
    135      default:
    136        lock.unlock();
    137        RunRange(self, command, thread);
    138        break;
    139    }
    140  }
    141 }
    142 
    143 ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    144    : num_worker_threads_(num_worker_threads),
    145      num_threads_(std::max(num_worker_threads, 1)) {
    146  threads_.reserve(num_worker_threads_);
    147 
    148  // Suppress "unused-private-field" warning.
    149  (void)padding1;
    150  (void)padding2;
    151 
    152  // Safely handle spurious worker wakeups.
    153  worker_start_command_ = kWorkerWait;
    154 
    155  for (uint32_t i = 0; i < num_worker_threads_; ++i) {
    156    threads_.emplace_back(ThreadFunc, this, i);
    157  }
    158 
    159  if (num_worker_threads_ != 0) {
    160    WorkersReadyBarrier();
    161  }
    162 }
    163 
    164 ThreadParallelRunner::~ThreadParallelRunner() {
    165  if (num_worker_threads_ != 0) {
    166    StartWorkers(kWorkerExit);
    167  }
    168 
    169  for (std::thread& thread : threads_) {
    170    if (thread.joinable()) {
    171      thread.join();
    172    } else {
    173 #if JXL_IS_DEBUG_BUILD
    174      JXL_PRINT_STACK_TRACE();
    175      JXL_CRASH();
    176 #endif
    177    }
    178  }
    179 }
    180 }  // namespace jpegxl