bench_parallel.cc (7674B)
1 // Copyright 2021 Google LLC 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 // Concurrent, independent sorts for generating more memory traffic and testing 17 // scalability when bandwidth-limited. If you want to use multiple threads for 18 // a single sort, you can use ips4o and integrate vqsort by calling it from 19 // `baseCaseSort` and increasing `IPS4OML_BASE_CASE_SIZE` to say 8192. 20 21 #include <stdint.h> 22 #include <stdio.h> 23 24 #include <condition_variable> //NOLINT 25 #include <functional> 26 #include <mutex> //NOLINT 27 #include <thread> //NOLINT 28 #include <vector> 29 30 #include "hwy/timer.h" 31 32 // clang-format off 33 #undef HWY_TARGET_INCLUDE 34 #define HWY_TARGET_INCLUDE "hwy/contrib/sort/bench_parallel.cc" //NOLINT 35 #include "hwy/foreach_target.h" // IWYU pragma: keep 36 37 // After foreach_target 38 #include "hwy/contrib/sort/algo-inl.h" 39 #include "hwy/contrib/sort/result-inl.h" 40 #include "hwy/aligned_allocator.h" 41 // Last 42 #include "hwy/tests/test_util-inl.h" 43 // clang-format on 44 45 HWY_BEFORE_NAMESPACE(); 46 namespace hwy { 47 namespace HWY_NAMESPACE { 48 namespace { 49 50 class ThreadPool { 51 public: 52 // Starts the given number of worker threads and blocks until they are ready. 53 explicit ThreadPool( 54 const size_t num_threads = std::thread::hardware_concurrency()) 55 : num_threads_(num_threads) { 56 HWY_ASSERT(num_threads_ > 0); 57 threads_.reserve(num_threads_); 58 for (size_t i = 0; i < num_threads_; ++i) { 59 threads_.emplace_back(ThreadFunc, this, i); 60 } 61 62 WorkersReadyBarrier(); 63 } 64 65 ThreadPool(const ThreadPool&) = delete; 66 ThreadPool& operator&(const ThreadPool&) = delete; 67 68 // Waits for all threads to exit. 69 ~ThreadPool() { 70 StartWorkers(kWorkerExit); 71 72 for (std::thread& thread : threads_) { 73 thread.join(); 74 } 75 } 76 77 size_t NumThreads() const { return threads_.size(); } 78 79 template <class Func> 80 void RunOnThreads(size_t max_threads, const Func& func) { 81 task_ = &CallClosure<Func>; 82 data_ = &func; 83 StartWorkers(max_threads); 84 WorkersReadyBarrier(); 85 } 86 87 private: 88 // After construction and between calls to Run, workers are "ready", i.e. 89 // waiting on worker_start_cv_. They are "started" by sending a "command" 90 // and notifying all worker_start_cv_ waiters. (That is why all workers 91 // must be ready/waiting - otherwise, the notification will not reach all of 92 // them and the main thread waits in vain for them to report readiness.) 93 using WorkerCommand = uint64_t; 94 95 static constexpr WorkerCommand kWorkerWait = ~1ULL; 96 static constexpr WorkerCommand kWorkerExit = ~2ULL; 97 98 // Calls a closure (lambda with captures). 99 template <class Closure> 100 static void CallClosure(const void* f, size_t thread) { 101 (*reinterpret_cast<const Closure*>(f))(thread); 102 } 103 104 void WorkersReadyBarrier() { 105 std::unique_lock<std::mutex> lock(mutex_); 106 // Typically only a single iteration. 107 while (workers_ready_ != threads_.size()) { 108 workers_ready_cv_.wait(lock); 109 } 110 workers_ready_ = 0; 111 112 // Safely handle spurious worker wakeups. 113 worker_start_command_ = kWorkerWait; 114 } 115 116 // Precondition: all workers are ready. 117 void StartWorkers(const WorkerCommand worker_command) { 118 std::unique_lock<std::mutex> lock(mutex_); 119 worker_start_command_ = worker_command; 120 // Workers will need this lock, so release it before they wake up. 121 lock.unlock(); 122 worker_start_cv_.notify_all(); 123 } 124 125 static void ThreadFunc(ThreadPool* self, size_t thread) { 126 // Until kWorkerExit command received: 127 for (;;) { 128 std::unique_lock<std::mutex> lock(self->mutex_); 129 // Notify main thread that this thread is ready. 130 if (++self->workers_ready_ == self->num_threads_) { 131 self->workers_ready_cv_.notify_one(); 132 } 133 RESUME_WAIT: 134 // Wait for a command. 135 self->worker_start_cv_.wait(lock); 136 const WorkerCommand command = self->worker_start_command_; 137 switch (command) { 138 case kWorkerWait: // spurious wakeup: 139 goto RESUME_WAIT; // lock still held, avoid incrementing ready. 140 case kWorkerExit: 141 return; // exits thread 142 default: 143 break; 144 } 145 146 lock.unlock(); 147 // Command is the maximum number of threads that should run the task. 148 HWY_ASSERT(command < self->NumThreads()); 149 if (thread < command) { 150 self->task_(self->data_, thread); 151 } 152 } 153 } 154 155 const size_t num_threads_; 156 157 // Unmodified after ctor, but cannot be const because we call thread::join(). 158 std::vector<std::thread> threads_; 159 160 std::mutex mutex_; // guards both cv and their variables. 161 std::condition_variable workers_ready_cv_; 162 size_t workers_ready_ = 0; 163 std::condition_variable worker_start_cv_; 164 WorkerCommand worker_start_command_; 165 166 // Written by main thread, read by workers (after mutex lock/unlock). 167 std::function<void(const void*, size_t)> task_; // points to CallClosure 168 const void* data_; // points to caller's Func 169 }; 170 171 template <class Traits> 172 void RunWithoutVerify(Traits st, const Dist dist, const size_t num_keys, 173 const Algo algo, SharedState& shared, size_t thread) { 174 using LaneType = typename Traits::LaneType; 175 using KeyType = typename Traits::KeyType; 176 using Order = typename Traits::Order; 177 const size_t num_lanes = num_keys * st.LanesPerKey(); 178 auto aligned = hwy::AllocateAligned<LaneType>(num_lanes); 179 180 (void)GenerateInput(dist, aligned.get(), num_lanes); 181 182 const Timestamp t0; 183 Run(algo, reinterpret_cast<KeyType*>(aligned.get()), num_keys, shared, thread, 184 /*k_keys=*/0, Order()); 185 HWY_ASSERT(aligned[0] < aligned[num_lanes - 1]); 186 } 187 188 void BenchParallel() { 189 // Not interested in benchmark results for other targets on x86 190 if (HWY_ARCH_X86 && 191 (HWY_TARGET != HWY_AVX2 && HWY_TARGET != HWY_AVX3 && 192 HWY_TARGET != HWY_AVX3_ZEN4 && HWY_TARGET != HWY_AVX3_SPR)) { 193 return; 194 } 195 196 ThreadPool pool; 197 const size_t NT = pool.NumThreads(); 198 199 detail::SharedTraits<detail::TraitsLane<detail::OrderAscending<int64_t>>> st; 200 using KeyType = typename decltype(st)::KeyType; 201 const size_t num_keys = size_t{100} * 1000 * 1000; 202 203 #if HAVE_IPS4O 204 const Algo algo = Algo::kIPS4O; 205 #else 206 const Algo algo = Algo::kVQSort; 207 #endif 208 const Dist dist = Dist::kUniform32; 209 210 SharedState shared; 211 212 std::vector<SortResult> results; 213 for (size_t nt = 1; nt < NT; nt += HWY_MAX(1, NT / 16)) { 214 Timestamp t0; 215 // Default capture because MSVC wants algo/dist but clang does not. 216 pool.RunOnThreads(nt, [=, &shared](size_t thread) { 217 RunWithoutVerify(st, dist, num_keys, algo, shared, thread); 218 }); 219 const double sec = SecondsSince(t0); 220 results.emplace_back(algo, dist, num_keys, nt, sec, sizeof(KeyType), 221 st.KeyString()); 222 results.back().Print(); 223 } 224 } 225 226 } // namespace 227 // NOLINTNEXTLINE(google-readability-namespace-comments) 228 } // namespace HWY_NAMESPACE 229 } // namespace hwy 230 HWY_AFTER_NAMESPACE(); 231 232 #if HWY_ONCE 233 234 namespace hwy { 235 namespace { 236 HWY_BEFORE_TEST(BenchParallel); 237 HWY_EXPORT_AND_TEST_P(BenchParallel, BenchParallel); 238 HWY_AFTER_TEST(); 239 } // namespace 240 } // namespace hwy 241 242 #endif // HWY_ONCE