thread_parallel_runner_internal.cc (5792B)
1 // Copyright (c) the JPEG XL Project Authors. All rights reserved. 2 // 3 // Use of this source code is governed by a BSD-style 4 // license that can be found in the LICENSE file. 5 6 #include "lib/threads/thread_parallel_runner_internal.h" 7 8 #include <jxl/parallel_runner.h> 9 #include <jxl/types.h> 10 11 #include <algorithm> 12 #include <atomic> 13 #include <cstddef> 14 #include <cstdint> 15 #include <mutex> 16 #include <thread> 17 18 #include "lib/jxl/base/compiler_specific.h" 19 20 namespace jpegxl { 21 22 // static 23 JxlParallelRetCode ThreadParallelRunner::Runner( 24 void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init, 25 JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) { 26 ThreadParallelRunner* self = 27 static_cast<ThreadParallelRunner*>(runner_opaque); 28 if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR; 29 if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS; 30 31 int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1)); 32 if (ret != JXL_PARALLEL_RET_SUCCESS) return ret; 33 34 // Use a sequential run when num_worker_threads_ is zero since we have no 35 // worker threads. 36 if (self->num_worker_threads_ == 0) { 37 const size_t thread = 0; 38 for (uint32_t task = start_range; task < end_range; ++task) { 39 func(jpegxl_opaque, task, thread); 40 } 41 return JXL_PARALLEL_RET_SUCCESS; 42 } 43 44 if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) { 45 return JXL_PARALLEL_RET_RUNNER_ERROR; // Must not re-enter. 46 } 47 48 const WorkerCommand worker_command = 49 (static_cast<WorkerCommand>(start_range) << 32) + end_range; 50 // Ensure the inputs do not result in a reserved command. 51 if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) || 52 (worker_command == kWorkerExit)) { 53 return JXL_PARALLEL_RET_RUNNER_ERROR; 54 } 55 56 self->data_func_ = func; 57 self->jpegxl_opaque_ = jpegxl_opaque; 58 self->num_reserved_.store(0, std::memory_order_relaxed); 59 60 self->StartWorkers(worker_command); 61 self->WorkersReadyBarrier(); 62 63 if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) { 64 return JXL_PARALLEL_RET_RUNNER_ERROR; 65 } 66 return JXL_PARALLEL_RET_SUCCESS; 67 } 68 69 // static 70 void ThreadParallelRunner::RunRange(ThreadParallelRunner* self, 71 const WorkerCommand command, 72 const int thread) { 73 const uint32_t begin = command >> 32; 74 const uint32_t end = command & 0xFFFFFFFF; 75 const uint32_t num_tasks = end - begin; 76 const uint32_t num_worker_threads = self->num_worker_threads_; 77 78 // OpenMP introduced several "schedule" strategies: 79 // "single" (static assignment of exactly one chunk per thread): slower. 80 // "dynamic" (allocates k tasks at a time): competitive for well-chosen k. 81 // "guided" (allocates k tasks, decreases k): computing k = remaining/n 82 // is faster than halving k each iteration. We prefer this strategy 83 // because it avoids user-specified parameters. 84 85 for (;;) { 86 #if JXL_FALSE 87 // dynamic 88 const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1); 89 #else 90 // guided 91 const uint32_t num_reserved = 92 self->num_reserved_.load(std::memory_order_relaxed); 93 // It is possible that more tasks are reserved than ready to run. 94 const uint32_t num_remaining = 95 num_tasks - std::min(num_reserved, num_tasks); 96 const uint32_t my_size = 97 std::max(num_remaining / (num_worker_threads * 4), 1u); 98 #endif 99 const uint32_t my_begin = begin + self->num_reserved_.fetch_add( 100 my_size, std::memory_order_relaxed); 101 const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks); 102 // Another thread already reserved the last task. 103 if (my_begin >= my_end) { 104 break; 105 } 106 for (uint32_t task = my_begin; task < my_end; ++task) { 107 self->data_func_(self->jpegxl_opaque_, task, thread); 108 } 109 } 110 } 111 112 // static 113 void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self, 114 const int thread) { 115 // Until kWorkerExit command received: 116 for (;;) { 117 std::unique_lock<std::mutex> lock(self->mutex_); 118 // Notify main thread that this thread is ready. 119 if (++self->workers_ready_ == self->num_threads_) { 120 self->workers_ready_cv_.notify_one(); 121 } 122 RESUME_WAIT: 123 // Wait for a command. 124 self->worker_start_cv_.wait(lock); 125 const WorkerCommand command = self->worker_start_command_; 126 switch (command) { 127 case kWorkerWait: // spurious wakeup: 128 goto RESUME_WAIT; // lock still held, avoid incrementing ready. 129 case kWorkerOnce: 130 lock.unlock(); 131 self->data_func_(self->jpegxl_opaque_, thread, thread); 132 break; 133 case kWorkerExit: 134 return; // exits thread 135 default: 136 lock.unlock(); 137 RunRange(self, command, thread); 138 break; 139 } 140 } 141 } 142 143 ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads) 144 : num_worker_threads_(num_worker_threads), 145 num_threads_(std::max(num_worker_threads, 1)) { 146 threads_.reserve(num_worker_threads_); 147 148 // Suppress "unused-private-field" warning. 149 (void)padding1; 150 (void)padding2; 151 152 // Safely handle spurious worker wakeups. 153 worker_start_command_ = kWorkerWait; 154 155 for (uint32_t i = 0; i < num_worker_threads_; ++i) { 156 threads_.emplace_back(ThreadFunc, this, i); 157 } 158 159 if (num_worker_threads_ != 0) { 160 WorkersReadyBarrier(); 161 } 162 } 163 164 ThreadParallelRunner::~ThreadParallelRunner() { 165 if (num_worker_threads_ != 0) { 166 StartWorkers(kWorkerExit); 167 } 168 169 for (std::thread& thread : threads_) { 170 if (thread.joinable()) { 171 thread.join(); 172 } else { 173 #if JXL_IS_DEBUG_BUILD 174 JXL_PRINT_STACK_TRACE(); 175 JXL_CRASH(); 176 #endif 177 } 178 } 179 } 180 } // namespace jpegxl