data_parallel.h (4877B)
1 // Copyright (c) the JPEG XL Project Authors. All rights reserved. 2 // 3 // Use of this source code is governed by a BSD-style 4 // license that can be found in the LICENSE file. 5 6 #ifndef LIB_JXL_BASE_DATA_PARALLEL_H_ 7 #define LIB_JXL_BASE_DATA_PARALLEL_H_ 8 9 // Portable, low-overhead C++11 ThreadPool alternative to OpenMP for 10 // data-parallel computations. 11 12 #include <jxl/parallel_runner.h> 13 14 #include <atomic> 15 #include <cstddef> 16 #include <cstdint> 17 18 #include "lib/jxl/base/compiler_specific.h" 19 #include "lib/jxl/base/status.h" 20 #if JXL_COMPILER_MSVC 21 // suppress warnings about the const & applied to function types 22 #pragma warning(disable : 4180) 23 #endif 24 25 namespace jxl { 26 27 class ThreadPool { 28 public: 29 ThreadPool(JxlParallelRunner runner, void* runner_opaque) 30 : runner_(runner), 31 runner_opaque_(runner ? runner_opaque : static_cast<void*>(this)) {} 32 33 ThreadPool(const ThreadPool&) = delete; 34 ThreadPool& operator&(const ThreadPool&) = delete; 35 36 JxlParallelRunner runner() const { return runner_; } 37 void* runner_opaque() const { return runner_opaque_; } 38 39 // Runs init_func(num_threads) followed by data_func(task, thread) on worker 40 // thread(s) for every task in [begin, end). init_func() must return a Status 41 // indicating whether the initialization succeeded. 42 // "thread" is an integer smaller than num_threads. 43 // Not thread-safe - no two calls to Run may overlap. 44 // Subsequent calls will reuse the same threads. 45 // 46 // Precondition: begin <= end. 47 template <class InitFunc, class DataFunc> 48 Status Run(uint32_t begin, uint32_t end, const InitFunc& init_func, 49 const DataFunc& data_func, const char* caller) { 50 JXL_ENSURE(begin <= end); 51 if (begin == end) return true; 52 RunCallState<InitFunc, DataFunc> call_state(init_func, data_func); 53 // The runner_ uses the C convention and returns 0 in case of error, so we 54 // convert it to a Status. 55 if (!runner_) { 56 void* jpegxl_opaque = static_cast<void*>(&call_state); 57 if (call_state.CallInitFunc(jpegxl_opaque, 1) != 58 JXL_PARALLEL_RET_SUCCESS) { 59 return JXL_FAILURE("Failed to initialize thread"); 60 } 61 for (uint32_t i = begin; i < end; i++) { 62 call_state.CallDataFunc(jpegxl_opaque, i, 0); 63 } 64 if (call_state.HasError()) { 65 return JXL_FAILURE("[%s] failed", caller); 66 } 67 return true; 68 } 69 JxlParallelRetCode ret = (*runner_)( 70 runner_opaque_, static_cast<void*>(&call_state), 71 &call_state.CallInitFunc, &call_state.CallDataFunc, begin, end); 72 73 if (ret != JXL_PARALLEL_RET_SUCCESS || call_state.HasError()) { 74 return JXL_FAILURE("[%s] failed", caller); 75 } 76 return true; 77 } 78 79 // Use this as init_func when no initialization is needed. 80 static Status NoInit(size_t num_threads) { return true; } 81 82 private: 83 // class holding the state of a Run() call to pass to the runner_ as an 84 // opaque_jpegxl pointer. 85 template <class InitFunc, class DataFunc> 86 class RunCallState final { 87 public: 88 RunCallState(const InitFunc& init_func, const DataFunc& data_func) 89 : init_func_(init_func), data_func_(data_func) {} 90 91 // JxlParallelRunInit interface. 92 static int CallInitFunc(void* jpegxl_opaque, size_t num_threads) { 93 auto* self = 94 static_cast<RunCallState<InitFunc, DataFunc>*>(jpegxl_opaque); 95 // Returns -1 when the internal init function returns false Status to 96 // indicate an error. 97 if (!self->init_func_(num_threads)) { 98 self->has_error_ = true; 99 return JXL_PARALLEL_RET_RUNNER_ERROR; 100 } 101 return JXL_PARALLEL_RET_SUCCESS; 102 } 103 104 // JxlParallelRunFunction interface. 105 static void CallDataFunc(void* jpegxl_opaque, uint32_t value, 106 size_t thread_id) { 107 auto* self = 108 static_cast<RunCallState<InitFunc, DataFunc>*>(jpegxl_opaque); 109 if (self->has_error_) return; 110 if (!self->data_func_(value, thread_id)) { 111 self->has_error_ = true; 112 } 113 } 114 115 bool HasError() const { return has_error_; } 116 117 private: 118 const InitFunc& init_func_; 119 const DataFunc& data_func_; 120 std::atomic<bool> has_error_{false}; 121 }; 122 123 // The caller supplied runner function and its opaque void*. 124 const JxlParallelRunner runner_; 125 void* const runner_opaque_; 126 }; 127 128 template <class InitFunc, class DataFunc> 129 Status RunOnPool(ThreadPool* pool, const uint32_t begin, const uint32_t end, 130 const InitFunc& init_func, const DataFunc& data_func, 131 const char* caller) { 132 if (pool == nullptr) { 133 ThreadPool default_pool(nullptr, nullptr); 134 return default_pool.Run(begin, end, init_func, data_func, caller); 135 } else { 136 return pool->Run(begin, end, init_func, data_func, caller); 137 } 138 } 139 140 } // namespace jxl 141 #if JXL_COMPILER_MSVC 142 #pragma warning(default : 4180) 143 #endif 144 145 #endif // LIB_JXL_BASE_DATA_PARALLEL_H_