thread_pool.h (64831B)
1 // Copyright 2023 Google LLC 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 // Modified from BSD-licensed code 17 // Copyright (c) the JPEG XL Project Authors. All rights reserved. 18 // See https://github.com/libjxl/libjxl/blob/main/LICENSE. 19 20 #ifndef HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_ 21 #define HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_ 22 23 #include <stddef.h> 24 #include <stdint.h> 25 #include <stdio.h> // snprintf 26 #include <string.h> 27 28 #include <array> 29 #include <atomic> 30 #include <string> 31 #include <thread> // NOLINT 32 #include <vector> 33 34 #include "hwy/aligned_allocator.h" // HWY_ALIGNMENT 35 #include "hwy/auto_tune.h" 36 #include "hwy/base.h" 37 #include "hwy/cache_control.h" // Pause 38 #include "hwy/contrib/thread_pool/futex.h" 39 #include "hwy/contrib/thread_pool/spin.h" 40 #include "hwy/contrib/thread_pool/topology.h" 41 #include "hwy/profiler.h" 42 #include "hwy/stats.h" 43 #include "hwy/timer.h" 44 45 #if HWY_OS_APPLE 46 #include <AvailabilityMacros.h> 47 #endif 48 49 #if PROFILER_ENABLED 50 #include <algorithm> // std::sort 51 52 #include "hwy/bit_set.h" 53 #endif 54 55 namespace hwy { 56 57 // Sets the name of the current thread to the format string `format`, which must 58 // include %d for `thread`. Currently only implemented for pthreads (*nix and 59 // OSX); Windows involves throwing an exception. 
// Formats `format` (which must contain %d for `thread`) into a bounded buffer
// and applies it as the current thread's name via the platform-specific API.
static inline void SetThreadName(const char* format, int thread) {
  char buf[16] = {};  // Linux limit, including \0
  const int chars_written = snprintf(buf, sizeof(buf), format, thread);
  // snprintf returns the would-be length; > sizeof(buf)-1 means truncation.
  HWY_ASSERT(0 < chars_written &&
             chars_written <= static_cast<int>(sizeof(buf) - 1));

#if (HWY_OS_LINUX && (!defined(__ANDROID__) || __ANDROID_API__ >= 19)) || \
    HWY_OS_FREEBSD
  // Note that FreeBSD pthread_set_name_np does not return a value (#2669).
  HWY_ASSERT(0 == pthread_setname_np(pthread_self(), buf));
#elif HWY_OS_APPLE && (MAC_OS_X_VERSION_MIN_REQUIRED >= 1060)
  // Different interface: single argument, current thread only.
  HWY_ASSERT(0 == pthread_setname_np(buf));
#elif defined(__EMSCRIPTEN__)
  emscripten_set_thread_name(pthread_self(), buf);
#else
  // No supported naming API on this platform; avoid unused-parameter warnings.
  (void)format;
  (void)thread;
#endif
}

// Whether workers should block or spin.
enum class PoolWaitMode : uint8_t { kBlock = 1, kSpin };

// Requested worker shutdown scope: none, leave the run loop, or exit thread.
enum class Exit : uint32_t { kNone, kLoop, kThread };

// Upper bound on non-empty `ThreadPool` (single-worker pools do not count).
// Turin has 16 clusters. Add one for the across-cluster pool.
HWY_INLINE_VAR constexpr size_t kMaxClusters = 32 + 1;

// Use the last slot so that `PoolWorkerMapping` does not have to know the
// total number of clusters.
HWY_INLINE_VAR constexpr size_t kAllClusters = kMaxClusters - 1;

// Argument to `ThreadPool`: how to map local worker_idx to global.
class PoolWorkerMapping {
 public:
  // Backward-compatible mode: returns local worker index.
  PoolWorkerMapping() : cluster_idx_(0), max_cluster_workers_(0) {}
  // Global-index mode: workers of cluster `cluster_idx` occupy a contiguous
  // range of `max_cluster_workers` global slots.
  PoolWorkerMapping(size_t cluster_idx, size_t max_cluster_workers)
      : cluster_idx_(cluster_idx), max_cluster_workers_(max_cluster_workers) {
    HWY_DASSERT(cluster_idx <= kAllClusters);
    // Only use this ctor for the new global worker index mode. If this were
    // zero, we would still return local indices.
    HWY_DASSERT(max_cluster_workers != 0);
  }

  size_t ClusterIdx() const { return cluster_idx_; }
  size_t MaxClusterWorkers() const { return max_cluster_workers_; }

  // Returns global_idx, or unchanged local worker_idx if default-constructed
  // (then max_cluster_workers_ is 0 and the product below is also 0).
  size_t operator()(size_t worker_idx) const {
    if (cluster_idx_ == kAllClusters) {
      // Across-cluster pool: worker i is the representative of cluster i.
      const size_t cluster_idx = worker_idx;
      HWY_DASSERT(cluster_idx < kAllClusters);
      // First index within the N-th cluster. The main thread is the first.
      return cluster_idx * max_cluster_workers_;
    }
    HWY_DASSERT(max_cluster_workers_ == 0 || worker_idx < max_cluster_workers_);
    return cluster_idx_ * max_cluster_workers_ + worker_idx;
  }

 private:
  size_t cluster_idx_;
  size_t max_cluster_workers_;
};

namespace pool {

#ifndef HWY_POOL_VERBOSITY
#define HWY_POOL_VERBOSITY 0
#endif

static constexpr int kVerbosity = HWY_POOL_VERBOSITY;

// Some CPUs already have more than this many threads, but rather than one
// large pool, we assume applications create multiple pools, ideally per
// cluster (cores sharing a cache), because this improves locality and barrier
// latency. In that case, this is a generous upper bound.
static constexpr size_t kMaxThreads = 127;

// Generates a random permutation of [0, size). O(1) storage.
class ShuffledIota {
 public:
  ShuffledIota() : coprime_(1) {}  // for Worker
  explicit ShuffledIota(uint32_t coprime) : coprime_(coprime) {}

  // Returns the next after `current`, using an LCG-like generator.
  uint32_t Next(uint32_t current, const Divisor64& divisor) const {
    HWY_DASSERT(current < divisor.GetDivisor());
    // (coprime * i + current) % size, see https://lemire.me/blog/2017/09/18/.
    return static_cast<uint32_t>(divisor.Remainder(current + coprime_));
  }

  // Returns true if a and b have no common denominator except 1. Based on
  // binary GCD. Assumes a and b are nonzero. Also used in tests.
  static bool CoprimeNonzero(uint32_t a, uint32_t b) {
    const size_t trailing_a = Num0BitsBelowLS1Bit_Nonzero32(a);
    const size_t trailing_b = Num0BitsBelowLS1Bit_Nonzero32(b);
    // If both have at least one trailing zero, they are both divisible by 2.
    if (HWY_MIN(trailing_a, trailing_b) != 0) return false;

    // If one of them has a trailing zero, shift it out.
    a >>= trailing_a;
    b >>= trailing_b;

    for (;;) {
      // Swap such that a >= b.
      const uint32_t tmp_a = a;
      a = HWY_MAX(tmp_a, b);
      b = HWY_MIN(tmp_a, b);

      // When the smaller number is 1, they were coprime.
      if (b == 1) return true;

      a -= b;
      // a == b means there was a common factor, so not coprime.
      if (a == 0) return false;
      // Shift out factors of 2; b is odd here, so this loses no common factor.
      a >>= Num0BitsBelowLS1Bit_Nonzero32(a);
    }
  }

  // Returns another coprime >= `start`, or 1 for small `size`.
  // Used to seed independent ShuffledIota instances.
  static uint32_t FindAnotherCoprime(uint32_t size, uint32_t start) {
    if (size <= 2) {
      return 1;
    }

    // Avoids even x for even sizes, which are sure to be rejected.
    const uint32_t inc = (size & 1) ? 1 : 2;

    // A coprime must exist within a window of `size` consecutive odd values;
    // 16x is a generous margin, hence falling out of the loop is impossible.
    for (uint32_t x = start | 1; x < start + size * 16; x += inc) {
      if (CoprimeNonzero(x, static_cast<uint32_t>(size))) {
        return x;
      }
    }

    HWY_UNREACHABLE;
  }

  uint32_t coprime_;
};

// 'Policies' suitable for various worker counts and locality. To define a
// new class, add an enum and update `ToString` plus `CallWithConfig`. The
// enumerators must be contiguous so we can iterate over them.
enum class WaitType : uint8_t {
  kBlock,
  kSpin1,
  kSpinSeparate,
  kSentinel  // Must be last.
};

// For printing which is in use.
215 static inline const char* ToString(WaitType type) { 216 switch (type) { 217 case WaitType::kBlock: 218 return "Block"; 219 case WaitType::kSpin1: 220 return "Single"; 221 case WaitType::kSpinSeparate: 222 return "Separate"; 223 case WaitType::kSentinel: 224 return nullptr; 225 } 226 } 227 228 // Parameters governing the main and worker thread behavior. Can be updated at 229 // runtime via `SetWaitMode`, which calls `SendConfig`. Both have copies which 230 // are carefully synchronized. 32 bits leave room for two future fields. 231 // 64 bits would also be fine because this does not go through futex. 232 struct Config { // 4 bytes 233 static std::vector<Config> AllCandidates(PoolWaitMode wait_mode) { 234 std::vector<Config> candidates; 235 236 if (wait_mode == PoolWaitMode::kSpin) { 237 std::vector<SpinType> spin_types; 238 spin_types.reserve(2); 239 spin_types.push_back(DetectSpin()); 240 // Monitor-based spin may be slower, so also try Pause. 241 if (spin_types[0] != SpinType::kPause) { 242 spin_types.push_back(SpinType::kPause); 243 } 244 245 // All except `kBlock`. 246 std::vector<WaitType> wait_types; 247 for (size_t wait = 0;; ++wait) { 248 const WaitType wait_type = static_cast<WaitType>(wait); 249 if (wait_type == WaitType::kSentinel) break; 250 if (wait_type != WaitType::kBlock) wait_types.push_back(wait_type); 251 } 252 253 candidates.reserve(spin_types.size() * wait_types.size()); 254 for (const SpinType spin_type : spin_types) { 255 for (const WaitType wait_type : wait_types) { 256 candidates.emplace_back(spin_type, wait_type); 257 } 258 } 259 } else { 260 // kBlock does not use spin, so there is only one candidate. 
261 candidates.emplace_back(SpinType::kPause, WaitType::kBlock); 262 } 263 264 return candidates; 265 } 266 267 std::string ToString() const { 268 char buf[128]; 269 snprintf(buf, sizeof(buf), "%-14s %-9s", hwy::ToString(spin_type), 270 pool::ToString(wait_type)); 271 return buf; 272 } 273 274 Config(SpinType spin_type_in, WaitType wait_type_in) 275 : spin_type(spin_type_in), wait_type(wait_type_in) {} 276 // Workers initially spin until ThreadPool sends them their actual config. 277 Config() : Config(SpinType::kPause, WaitType::kSpinSeparate) {} 278 279 SpinType spin_type; 280 WaitType wait_type; 281 HWY_MEMBER_VAR_MAYBE_UNUSED uint8_t reserved[2]; 282 }; 283 static_assert(sizeof(Config) == 4, ""); 284 285 #if PROFILER_ENABLED 286 287 // Accumulates timings and stats from main thread and workers. 288 class Stats { 289 // Up to `HWY_ALIGNMENT / 8` slots/offsets, passed to `PerThread`. 290 static constexpr size_t kDWait = 0; 291 static constexpr size_t kWaitReps = 1; 292 static constexpr size_t kTBeforeRun = 2; 293 static constexpr size_t kDRun = 3; 294 static constexpr size_t kTasksStatic = 4; 295 static constexpr size_t kTasksDynamic = 5; 296 static constexpr size_t kTasksStolen = 6; 297 static constexpr size_t kDFuncStatic = 7; 298 static constexpr size_t kDFuncDynamic = 8; 299 static constexpr size_t kSentinel = 9; 300 301 public: 302 Stats() { 303 for (size_t thread_idx = 0; thread_idx < kMaxThreads; ++thread_idx) { 304 for (size_t offset = 0; offset < kSentinel; ++offset) { 305 PerThread(thread_idx, offset) = 0; 306 } 307 } 308 Reset(); 309 } 310 311 // Called by the N lowest-indexed workers that got one of the N tasks, which 312 // includes the main thread because its index is 0. 313 // `d_*` denotes "difference" (of timestamps) and thus also duration. 
314 void NotifyRunStatic(size_t worker_idx, timer::Ticks d_func) { 315 if (worker_idx == 0) { // main thread 316 num_run_static_++; 317 sum_tasks_static_++; 318 sum_d_func_static_ += d_func; 319 } else { 320 const size_t thread_idx = worker_idx - 1; 321 // Defer the sums until `NotifyMainRun` to avoid atomic RMW. 322 PerThread(thread_idx, kTasksStatic)++; 323 PerThread(thread_idx, kDFuncStatic) += d_func; 324 } 325 } 326 327 // Called by all workers, including the main thread, regardless of whether 328 // they actually stole or even ran a task. 329 void NotifyRunDynamic(size_t worker_idx, size_t tasks, size_t stolen, 330 timer::Ticks d_func) { 331 if (worker_idx == 0) { // main thread 332 num_run_dynamic_++; 333 sum_tasks_dynamic_ += tasks; 334 sum_tasks_stolen_ += stolen; 335 sum_d_func_dynamic_ += d_func; 336 } else { 337 const size_t thread_idx = worker_idx - 1; 338 // Defer the sums until `NotifyMainRun` to avoid atomic RMW. 339 PerThread(thread_idx, kTasksDynamic) += tasks; 340 PerThread(thread_idx, kTasksStolen) += stolen; 341 PerThread(thread_idx, kDFuncDynamic) += d_func; 342 } 343 } 344 345 // Called concurrently by non-main worker threads after their `WorkerRun` and 346 // before the barrier. 347 void NotifyThreadRun(size_t worker_idx, timer::Ticks d_wait, size_t wait_reps, 348 timer::Ticks t_before_run, timer::Ticks d_run) { 349 HWY_DASSERT(worker_idx != 0); // Not called by main thread. 350 const size_t thread_idx = worker_idx - 1; 351 HWY_DASSERT(PerThread(thread_idx, kDWait) == 0); 352 HWY_DASSERT(PerThread(thread_idx, kWaitReps) == 0); 353 HWY_DASSERT(PerThread(thread_idx, kTBeforeRun) == 0); 354 HWY_DASSERT(PerThread(thread_idx, kDRun) == 0); 355 PerThread(thread_idx, kDWait) = d_wait; 356 PerThread(thread_idx, kWaitReps) = wait_reps; 357 PerThread(thread_idx, kTBeforeRun) = t_before_run; // For wake latency. 
358 PerThread(thread_idx, kDRun) = d_run; 359 } 360 361 // Called by the main thread after the barrier, whose store-release and 362 // load-acquire publishes all prior writes. Note: only the main thread can 363 // store `after_barrier`. If workers did, which by definition happens after 364 // the barrier, then they would race with this function's reads. 365 void NotifyMainRun(size_t num_threads, timer::Ticks t_before_wake, 366 timer::Ticks d_wake, timer::Ticks d_main_run, 367 timer::Ticks d_barrier) { 368 HWY_DASSERT(num_threads <= kMaxThreads); 369 370 timer::Ticks min_d_run = ~timer::Ticks{0}; 371 timer::Ticks max_d_run = 0; 372 timer::Ticks sum_d_run = 0; 373 for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) { 374 sum_tasks_static_ += PerThread(thread_idx, kTasksStatic); 375 sum_tasks_dynamic_ += PerThread(thread_idx, kTasksDynamic); 376 sum_tasks_stolen_ += PerThread(thread_idx, kTasksStolen); 377 sum_d_func_static_ += PerThread(thread_idx, kDFuncStatic); 378 sum_d_func_dynamic_ += PerThread(thread_idx, kDFuncDynamic); 379 sum_d_wait_ += PerThread(thread_idx, kDWait); 380 sum_wait_reps_ += PerThread(thread_idx, kWaitReps); 381 const timer::Ticks d_thread_run = PerThread(thread_idx, kDRun); 382 min_d_run = HWY_MIN(min_d_run, d_thread_run); 383 max_d_run = HWY_MAX(max_d_run, d_thread_run); 384 sum_d_run += d_thread_run; 385 const timer::Ticks t_before_run = PerThread(thread_idx, kTBeforeRun); 386 387 for (size_t offset = 0; offset < kSentinel; ++offset) { 388 PerThread(thread_idx, offset) = 0; 389 } 390 391 HWY_DASSERT(t_before_run != 0); 392 const timer::Ticks d_latency = t_before_run - t_before_wake; 393 sum_wake_latency_ += d_latency; 394 max_wake_latency_ = HWY_MAX(max_wake_latency_, d_latency); 395 } 396 const double inv_avg_d_run = 397 static_cast<double>(num_threads) / static_cast<double>(sum_d_run); 398 // Ratios of min and max run times to the average, for this pool.Run. 
399 const double r_min = static_cast<double>(min_d_run) * inv_avg_d_run; 400 const double r_max = static_cast<double>(max_d_run) * inv_avg_d_run; 401 402 num_run_++; // `num_run_*` are incremented by `NotifyRun*`. 403 sum_d_run_ += sum_d_run; 404 sum_r_min_ += r_min; // For average across all pool.Run. 405 sum_r_max_ += r_max; 406 407 sum_d_wake_ += d_wake; // `*wake_latency_` are updated above. 408 sum_d_barrier_ += d_barrier; 409 410 sum_d_run_ += d_main_run; 411 sum_d_run_main_ += d_main_run; 412 } 413 414 void PrintAndReset(size_t num_threads, timer::Ticks d_thread_lifetime_ticks) { 415 // This is unconditionally called via `ProfilerFunc`. If the pool was unused 416 // in this invocation, skip it. 417 if (num_run_ == 0) return; 418 HWY_ASSERT(num_run_ == num_run_static_ + num_run_dynamic_); 419 420 const double d_func_static = Seconds(sum_d_func_static_); 421 const double d_func_dynamic = Seconds(sum_d_func_dynamic_); 422 const double sum_d_run = Seconds(sum_d_run_); 423 const double func_div_run = (d_func_static + d_func_dynamic) / sum_d_run; 424 if (!(0.95 <= func_div_run && func_div_run <= 1.0)) { 425 HWY_WARN("Func time %f should be similar to total run %f.", 426 d_func_static + d_func_dynamic, sum_d_run); 427 } 428 const double sum_d_run_main = Seconds(sum_d_run_main_); 429 const double max_wake_latency = Seconds(max_wake_latency_); 430 const double sum_d_wait = Seconds(sum_d_wait_); 431 const double d_thread_lifetime = Seconds(d_thread_lifetime_ticks); 432 433 const double inv_run = 1.0 / static_cast<double>(num_run_); 434 const auto per_run = [inv_run](double sum) { return sum * inv_run; }; 435 const double avg_d_wake = per_run(Seconds(sum_d_wake_)); 436 const double avg_wake_latency = per_run(Seconds(sum_wake_latency_)); 437 const double avg_d_wait = per_run(sum_d_wait); 438 const double avg_wait_reps = per_run(static_cast<double>(sum_wait_reps_)); 439 const double avg_d_barrier = per_run(Seconds(sum_d_barrier_)); 440 const double avg_r_min = 
per_run(sum_r_min_); 441 const double avg_r_max = per_run(sum_r_max_); 442 443 const size_t num_workers = 1 + num_threads; 444 const double avg_tasks_static = 445 Avg(sum_tasks_static_, num_run_static_ * num_workers); 446 const double avg_tasks_dynamic = 447 Avg(sum_tasks_dynamic_, num_run_dynamic_ * num_workers); 448 const double avg_steals = 449 Avg(sum_tasks_stolen_, num_run_dynamic_ * num_workers); 450 const double avg_d_run = sum_d_run / num_workers; 451 452 const double pc_wait = sum_d_wait / d_thread_lifetime * 100.0; 453 const double pc_run = sum_d_run / d_thread_lifetime * 100.0; 454 const double pc_main = sum_d_run_main / avg_d_run * 100.0; 455 456 const auto us = [](double sec) { return sec * 1E6; }; 457 const auto ns = [](double sec) { return sec * 1E9; }; 458 printf( 459 "%3zu: %5d x %.2f/%5d x %4.1f tasks, %.2f steals; " 460 "wake %7.3f ns, latency %6.3f < %7.3f us, barrier %7.3f us; " 461 "wait %.1f us (%6.0f reps, %4.1f%%), balance %4.1f%%-%5.1f%%, " 462 "func: %6.3f + %7.3f, " 463 "%.1f%% of thread time %7.3f s; main:worker %5.1f%%\n", 464 num_threads, num_run_static_, avg_tasks_static, num_run_dynamic_, 465 avg_tasks_dynamic, avg_steals, ns(avg_d_wake), us(avg_wake_latency), 466 us(max_wake_latency), us(avg_d_barrier), us(avg_d_wait), avg_wait_reps, 467 pc_wait, avg_r_min * 100.0, avg_r_max * 100.0, d_func_static, 468 d_func_dynamic, pc_run, d_thread_lifetime, pc_main); 469 470 Reset(num_threads); 471 } 472 473 void Reset(size_t num_threads = kMaxThreads) { 474 num_run_ = 0; 475 num_run_static_ = 0; 476 num_run_dynamic_ = 0; 477 478 sum_tasks_stolen_ = 0; 479 sum_tasks_static_ = 0; 480 sum_tasks_dynamic_ = 0; 481 482 sum_d_wake_ = 0; 483 sum_wake_latency_ = 0; 484 max_wake_latency_ = 0; 485 sum_d_wait_ = 0; 486 sum_wait_reps_ = 0; 487 sum_d_barrier_ = 0; 488 489 sum_d_func_static_ = 0; 490 sum_d_func_dynamic_ = 0; 491 sum_r_min_ = 0.0; 492 sum_r_max_ = 0.0; 493 sum_d_run_ = 0; 494 sum_d_run_main_ = 0; 495 // ctor and `NotifyMainRun` already reset 
`PerThread`. 496 } 497 498 private: 499 template <typename T> 500 static double Avg(T sum, size_t div) { 501 return div == 0 ? 0.0 : static_cast<double>(sum) / static_cast<double>(div); 502 } 503 504 static constexpr size_t kU64PerLine = HWY_ALIGNMENT / sizeof(uint64_t); 505 506 uint64_t& PerThread(size_t thread_idx, size_t offset) { 507 HWY_DASSERT(thread_idx < kMaxThreads); 508 HWY_DASSERT(offset < kSentinel); 509 return per_thread_[thread_idx * kU64PerLine + offset]; 510 } 511 512 int32_t num_run_; 513 int32_t num_run_static_; 514 int32_t num_run_dynamic_; 515 516 int32_t sum_tasks_stolen_; 517 int64_t sum_tasks_static_; 518 int64_t sum_tasks_dynamic_; 519 520 timer::Ticks sum_d_wake_; 521 timer::Ticks sum_wake_latency_; 522 timer::Ticks max_wake_latency_; 523 timer::Ticks sum_d_wait_; 524 uint64_t sum_wait_reps_; 525 timer::Ticks sum_d_barrier_; 526 527 timer::Ticks sum_d_func_static_; 528 timer::Ticks sum_d_func_dynamic_; 529 double sum_r_min_; 530 double sum_r_max_; 531 timer::Ticks sum_d_run_; 532 timer::Ticks sum_d_run_main_; 533 534 // One cache line per pool thread to avoid false sharing. 535 uint64_t per_thread_[kMaxThreads * kU64PerLine]; 536 }; 537 // Enables shift rather than multiplication. 538 static_assert(sizeof(Stats) == (kMaxThreads + 1) * HWY_ALIGNMENT, "Wrong size"); 539 540 // Non-power of two to avoid 2K aliasing. 541 HWY_INLINE_VAR constexpr size_t kMaxCallers = 60; 542 543 // Per-caller stats, stored in `PerCluster`. 
class CallerAccumulator {
 public:
  // True if `Add` has been called since the last reset.
  bool Any() const { return calls_ != 0; }

  void Add(size_t tasks, size_t workers, bool is_root, timer::Ticks wait_before,
           timer::Ticks elapsed) {
    calls_++;
    root_ += is_root;
    workers_ += workers;
    min_tasks_ = HWY_MIN(min_tasks_, tasks);
    max_tasks_ = HWY_MAX(max_tasks_, tasks);
    tasks_ += tasks;
    wait_before_ += wait_before;
    elapsed_ += elapsed;
  }

  // Merges `other` into this accumulator (used to reduce per-cluster stats).
  void AddFrom(const CallerAccumulator& other) {
    calls_ += other.calls_;
    root_ += other.root_;
    workers_ += other.workers_;
    min_tasks_ = HWY_MIN(min_tasks_, other.min_tasks_);
    max_tasks_ = HWY_MAX(max_tasks_, other.max_tasks_);
    tasks_ += other.tasks_;
    wait_before_ += other.wait_before_;
    elapsed_ += other.elapsed_;
  }

  // Orders by total elapsed time, for sorting callers most-expensive first.
  bool operator>(const CallerAccumulator& other) const {
    return elapsed_ > other.elapsed_;
  }

  void PrintAndReset(const char* caller, size_t active_clusters) {
    if (!Any()) return;
    HWY_ASSERT(root_ <= calls_);
    const double inv_calls = 1.0 / static_cast<double>(calls_);
    const double pc_root = static_cast<double>(root_) * inv_calls * 100.0;
    const double avg_workers = static_cast<double>(workers_) * inv_calls;
    const double avg_tasks = static_cast<double>(tasks_) * inv_calls;
    const double avg_tasks_per_worker = avg_tasks / avg_workers;
    const double inv_freq = 1.0 / platform::InvariantTicksPerSecond();
    const double sum_wait_before = static_cast<double>(wait_before_) * inv_freq;
    // wait_before is only accumulated for root runs, hence divide by root_.
    const double avg_wait_before =
        root_ ? sum_wait_before / static_cast<double>(root_) : 0.0;
    const double elapsed = static_cast<double>(elapsed_) * inv_freq;
    const double avg_elapsed = elapsed * inv_calls;
    const double task_len = avg_elapsed / avg_tasks_per_worker;
    printf(
        "%40s: %7.0f x (%3.0f%%) %2zu clusters, %4.1f workers @ "
        "%5.1f tasks (%5u-%5u), "
        "%5.0f us wait, %6.1E us run (task len %6.1E us), total %6.2f s\n",
        caller, static_cast<double>(calls_), pc_root, active_clusters,
        avg_workers, avg_tasks_per_worker, static_cast<uint32_t>(min_tasks_),
        static_cast<uint32_t>(max_tasks_), avg_wait_before * 1E6,
        avg_elapsed * 1E6, task_len * 1E6, elapsed);
    *this = CallerAccumulator();
  }

  // For the grand total, only print calls and elapsed because averaging the
  // other stats is not very useful. No need to reset because this is called
  // on a temporary.
  void PrintTotal() {
    if (!Any()) return;
    HWY_ASSERT(root_ <= calls_);
    const double elapsed =
        static_cast<double>(elapsed_) / platform::InvariantTicksPerSecond();
    printf("TOTAL: %7.0f x run %6.2f s\n", static_cast<double>(calls_),
           elapsed);
  }

 private:
  int64_t calls_ = 0;
  int64_t root_ = 0;
  uint64_t workers_ = 0;
  uint64_t min_tasks_ = ~uint64_t{0};
  uint64_t max_tasks_ = 0;
  uint64_t tasks_ = 0;
  // both are wall time for root Run, otherwise CPU time.
  timer::Ticks wait_before_ = 0;
  timer::Ticks elapsed_ = 0;
};
static_assert(sizeof(CallerAccumulator) == 64, "");

// Per-cluster set of `CallerAccumulator`, indexed by caller.
class PerCluster {
 public:
  // Marks `caller_idx` active and returns its accumulator.
  CallerAccumulator& Get(size_t caller_idx) {
    HWY_DASSERT(caller_idx < kMaxCallers);
    callers_.Set(caller_idx);
    return accumulators_[caller_idx];
  }

  // Invokes `func(caller_idx, accumulator)` for each active caller.
  template <class Func>
  void ForeachCaller(Func&& func) {
    callers_.Foreach([&](size_t caller_idx) {
      func(caller_idx, accumulators_[caller_idx]);
    });
  }

  // Returns indices (required for `StringTable::Name`) in descending order of
  // elapsed time.
  std::vector<size_t> Sorted() {
    std::vector<size_t> vec;
    vec.reserve(kMaxCallers);
    ForeachCaller([&](size_t caller_idx, CallerAccumulator&) {
      vec.push_back(caller_idx);
    });
    std::sort(vec.begin(), vec.end(), [&](size_t a, size_t b) {
      return accumulators_[a] > accumulators_[b];
    });
    return vec;
  }

  // Caller takes care of resetting `accumulators_`.
  void ResetBits() { callers_ = hwy::BitSet<kMaxCallers>(); }

 private:
  CallerAccumulator accumulators_[kMaxCallers];
  hwy::BitSet<kMaxCallers> callers_;
};

// Type-safe wrapper.
class Caller {
 public:
  Caller() : idx_(0) {}  // `AddCaller` never returns 0.
  explicit Caller(size_t idx) : idx_(idx) { HWY_DASSERT(idx < kMaxCallers); }
  size_t Idx() const { return idx_; }

 private:
  size_t idx_;
};

// Singleton, shared by all ThreadPool.
class Shared {
 public:
  static HWY_DLLEXPORT Shared& Get();  // Thread-safe.

  Stopwatch MakeStopwatch() const { return Stopwatch(timer_); }
  Stopwatch& LastRootEnd() { return last_root_end_; }

  // Thread-safe. Calls with the same `name` return the same `Caller`.
  Caller AddCaller(const char* name) { return Caller(callers_.Add(name)); }

  PerCluster& Cluster(size_t cluster_idx) {
    HWY_DASSERT(cluster_idx < kMaxClusters);
    return per_cluster_[cluster_idx];
  }

  // Called from the main thread via `Profiler::PrintResults`.
  void PrintAndReset() {
    // Start counting pools (= one per cluster) invoked by each caller.
    size_t active_clusters[kMaxCallers] = {};
    per_cluster_[0].ForeachCaller(
        [&](size_t caller_idx, CallerAccumulator& acc) {
          active_clusters[caller_idx] = acc.Any();
        });
    // Reduce per-cluster accumulators into the first cluster.
    for (size_t cluster_idx = 1; cluster_idx < kMaxClusters; ++cluster_idx) {
      per_cluster_[cluster_idx].ForeachCaller(
          [&](size_t caller_idx, CallerAccumulator& acc) {
            active_clusters[caller_idx] += acc.Any();
            per_cluster_[0].Get(caller_idx).AddFrom(acc);
            acc = CallerAccumulator();
          });
      per_cluster_[cluster_idx].ResetBits();
    }

    CallerAccumulator total;
    for (size_t caller_idx : per_cluster_[0].Sorted()) {
      CallerAccumulator& acc = per_cluster_[0].Get(caller_idx);
      total.AddFrom(acc);  // must be before PrintAndReset.
      acc.PrintAndReset(callers_.Name(caller_idx), active_clusters[caller_idx]);
    }
    total.PrintTotal();
    per_cluster_[0].ResetBits();
  }

 private:
  Shared()  // called via Get().
      : last_root_end_(timer_),
        send_config(callers_.Add("SendConfig")),
        dtor(callers_.Add("PoolDtor")),
        print_stats(callers_.Add("PrintStats")) {
    Profiler::Get().AddFunc(this, [this]() { PrintAndReset(); });
    // Can skip `RemoveFunc` because the singleton never dies.
  }

  const Timer timer_;
  Stopwatch last_root_end_;

  PerCluster per_cluster_[kMaxClusters];
  StringTable<kMaxCallers> callers_;

 public:
  // Returned from `callers_.Add`:
  Caller send_config;
  Caller dtor;
  Caller print_stats;
};

#else

// No-op stand-ins when the profiler is disabled; same interface as above so
// call sites need no #ifdefs.
struct Stats {
  void NotifyRunStatic(size_t, timer::Ticks) {}
  void NotifyRunDynamic(size_t, size_t, size_t, timer::Ticks) {}
  void NotifyThreadRun(size_t, timer::Ticks, size_t, timer::Ticks,
                       timer::Ticks) {}
  void NotifyMainRun(size_t, timer::Ticks, timer::Ticks, timer::Ticks,
                     timer::Ticks) {}
  void PrintAndReset(size_t, timer::Ticks) {}
  void Reset(size_t = kMaxThreads) {}
};

struct Caller {};

class Shared {
 public:
  static HWY_DLLEXPORT Shared& Get();  // Thread-safe.

  Stopwatch MakeStopwatch() const { return Stopwatch(timer_); }

  Caller AddCaller(const char*) { return Caller(); }

 private:
  Shared() {}

  const Timer timer_;

 public:
  Caller send_config;
  Caller dtor;
  Caller print_stats;
};

#endif  // PROFILER_ENABLED

// Per-worker state used by both main and worker threads. `ThreadFunc`
// (threads) and `ThreadPool` (main) have a few additional members of their own.
class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
  static constexpr size_t kMaxVictims = 4;

  static constexpr auto kAcq = std::memory_order_acquire;
  static constexpr auto kRel = std::memory_order_release;

  // Whether this Worker is responsible for reserving/freeing its profiler
  // `global_idx_` slot (only one pool may own each slot).
  bool OwnsGlobalIdx() const {
#if PROFILER_ENABLED
    if (global_idx_ >= profiler::kMaxWorkers) {
      HWY_WARN("Windows-only bug? global_idx %zu >= %zu.", global_idx_,
               profiler::kMaxWorkers);
    }
#endif  // PROFILER_ENABLED
    // Across-cluster pool owns all except the main thread, which is reserved by
    // profiler.cc.
    if (cluster_idx_ == kAllClusters) return global_idx_ != 0;
    // Within-cluster pool owns all except *its* main thread, because that is
    // owned by the across-cluster pool.
    return worker_ != 0;
  }

 public:
  Worker(const size_t worker, const size_t num_threads,
         const PoolWorkerMapping mapping, const Divisor64& div_workers,
         const Stopwatch& stopwatch)
      // Workers are stored contiguously (see WorkerLifecycle), so the array
      // base is this object minus its own index.
      : workers_(this - worker),
        worker_(worker),
        num_threads_(num_threads),
        stopwatch_(stopwatch),
        // If `num_threads == 0`, we might be in an inner pool and must use
        // the `global_idx` we are currently running on.
        global_idx_(num_threads == 0 ? Profiler::GlobalIdx() : mapping(worker)),
        cluster_idx_(mapping.ClusterIdx()) {
    HWY_DASSERT(IsAligned(this, HWY_ALIGNMENT));
    HWY_DASSERT(worker <= num_threads);
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    num_victims_ = static_cast<uint32_t>(HWY_MIN(kMaxVictims, num_workers));

    // Increase gap between coprimes to reduce collisions.
    const uint32_t coprime = ShuffledIota::FindAnotherCoprime(
        static_cast<uint32_t>(num_workers),
        static_cast<uint32_t>((worker + 1) * 257 + worker * 13));
    const ShuffledIota shuffled_iota(coprime);

    // To simplify `WorkerRun`, this worker is the first to 'steal' from.
    victims_[0] = static_cast<uint32_t>(worker);
    for (uint32_t i = 1; i < num_victims_; ++i) {
      victims_[i] = shuffled_iota.Next(victims_[i - 1], div_workers);
      HWY_DASSERT(victims_[i] != worker);
    }

    HWY_IF_CONSTEXPR(PROFILER_ENABLED) {
      if (HWY_LIKELY(OwnsGlobalIdx())) {
        Profiler::Get().ReserveWorker(global_idx_);
      }
    }
  }

  ~Worker() {
    HWY_IF_CONSTEXPR(PROFILER_ENABLED) {
      if (HWY_LIKELY(OwnsGlobalIdx())) {
        Profiler::Get().FreeWorker(global_idx_);
      }
    }
  }

  // Placement-newed by `WorkerLifecycle`, we do not expect any copying.
  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;

  size_t Index() const { return worker_; }
  // For work stealing.
  Worker* AllWorkers() { return workers_; }
  const Worker* AllWorkers() const { return workers_; }
  size_t NumThreads() const { return num_threads_; }

  size_t GlobalIdx() const { return global_idx_; }
  size_t ClusterIdx() const { return cluster_idx_; }

  void SetStartTime() { stopwatch_.Reset(); }
  timer::Ticks ElapsedTime() { return stopwatch_.Elapsed(); }

  // ------------------------ Per-worker storage for `SendConfig`

  Config NextConfig() const { return next_config_; }
  // Called during `SendConfig` by workers and now also the main thread. This
  // avoids a separate `ThreadPool` member which risks going out of sync.
  void SetNextConfig(Config copy) { next_config_ = copy; }

  Exit GetExit() const { return exit_; }
  void SetExit(Exit exit) { exit_ = exit; }

  uint32_t WorkerEpoch() const { return worker_epoch_; }
  uint32_t AdvanceWorkerEpoch() { return ++worker_epoch_; }

  // ------------------------ Task assignment

  // Called from the main thread.
  void SetRange(const uint64_t begin, const uint64_t end) {
    my_begin_.store(begin, kRel);
    my_end_.store(end, kRel);
  }

  uint64_t MyEnd() const { return my_end_.load(kAcq); }

  Span<const uint32_t> Victims() const {
    return hwy::Span<const uint32_t>(victims_.data(),
                                     static_cast<size_t>(num_victims_));
  }

  // Returns the next task to execute. If >= MyEnd(), it must be skipped.
  uint64_t WorkerReserveTask() {
    // TODO(janwas): replace with cooperative work-stealing.
    return my_begin_.fetch_add(1, std::memory_order_relaxed);
  }

  // ------------------------ Waiter: Threads wait for tasks

  // WARNING: some `Wait*` do not set this for all Worker instances. For
  // example, `WaitType::kBlock` only uses the first worker's `Waiter` because
  // one futex can wake multiple waiters. Hence we never load this directly
  // without going through `Wait*` policy classes, and must ensure all threads
  // use the same wait mode.

  const std::atomic<uint32_t>& Waiter() const { return wait_epoch_; }
  std::atomic<uint32_t>& MutableWaiter() { return wait_epoch_; }  // futex
  void StoreWaiter(uint32_t epoch) { wait_epoch_.store(epoch, kRel); }

  // ------------------------ Barrier: Main thread waits for workers

  // For use by `HasReached` and `UntilReached`.
  const std::atomic<uint32_t>& Barrier() const { return barrier_epoch_; }
  // Setting to `epoch` signals that the worker has reached the barrier.
  void StoreBarrier(uint32_t epoch) { barrier_epoch_.store(epoch, kRel); }

 private:
  // Set by `SetRange`:
  std::atomic<uint64_t> my_begin_;
  std::atomic<uint64_t> my_end_;

  Worker* const workers_;
  const size_t worker_;
  const size_t num_threads_;

  Stopwatch stopwatch_;  // Reset by `SetStartTime`.
  const size_t global_idx_;
  const size_t cluster_idx_;

  // Use u32 to match futex.h. These must start at the initial value of
  // `worker_epoch_`.
  std::atomic<uint32_t> wait_epoch_{1};
  std::atomic<uint32_t> barrier_epoch_{1};

  uint32_t num_victims_;  // <= kPoolMaxVictims
  std::array<uint32_t, kMaxVictims> victims_;

  // Written and read by the same thread, hence not atomic.
  Config next_config_;
  Exit exit_ = Exit::kNone;
  // thread_pool_test requires nonzero epoch.
  uint32_t worker_epoch_ = 1;

  // Pads the object to exactly HWY_ALIGNMENT (checked by static_assert below).
  HWY_MEMBER_VAR_MAYBE_UNUSED uint8_t
      padding_[HWY_ALIGNMENT - 56 - 6 * sizeof(void*) - sizeof(victims_)];
};
static_assert(sizeof(Worker) == HWY_ALIGNMENT, "");

// Creates/destroys `Worker` using preallocated storage.
// See comment at `ThreadPool::worker_bytes_` for why we do not
// dynamically allocate.
class WorkerLifecycle {  // 0 bytes
 public:
  // Placement new for `Worker` into `storage` because its ctor requires
  // the worker index. Returns array of all workers.
  static Worker* Init(uint8_t* storage, size_t num_threads,
                      PoolWorkerMapping mapping, const Divisor64& div_workers,
                      Shared& shared) {
    Worker* workers = new (storage)
        Worker(0, num_threads, mapping, div_workers, shared.MakeStopwatch());
    for (size_t worker = 1; worker <= num_threads; ++worker) {
      new (Addr(storage, worker)) Worker(worker, num_threads, mapping,
                                         div_workers, shared.MakeStopwatch());
      // Ensure pointer arithmetic is the same (will be used in Destroy).
      HWY_DASSERT(reinterpret_cast<uintptr_t>(workers + worker) ==
                  reinterpret_cast<uintptr_t>(Addr(storage, worker)));
    }

    // Publish non-atomic stores in `workers`.
    std::atomic_thread_fence(std::memory_order_release);

    return workers;
  }

  // Runs dtors in-place; `storage` itself is owned by `ThreadPool`.
  static void Destroy(Worker* workers, size_t num_threads) {
    for (size_t worker = 0; worker <= num_threads; ++worker) {
      workers[worker].~Worker();
    }
  }

 private:
  // Address of the `worker`-th slot within `storage`.
  static uint8_t* Addr(uint8_t* storage, size_t worker) {
    return storage + worker * sizeof(Worker);
  }
};

// Stores arguments to `Run`: the function and range of task indices. Set by
// the main thread, read by workers including the main thread.
class Tasks {
  static constexpr auto kAcq = std::memory_order_acquire;

  // Signature of the (internal) function called from worker(s) for each
  // `task` in the [`begin`, `end`) passed to Run(). Closures (lambdas) do not
  // receive the first argument, which points to the lambda object.
  typedef void (*RunFunc)(const void* opaque, uint64_t task, size_t worker);

 public:
  Tasks() { HWY_DASSERT(IsAligned(this, 8)); }

  // Publishes the task range and type-erased `closure` via release stores.
  // The caller must keep `closure` alive until all workers finish.
  template <class Closure>
  void Set(uint64_t begin, uint64_t end, const Closure& closure) {
    constexpr auto kRel = std::memory_order_release;
    // `TestTasks` and `SetWaitMode` call this with `begin == end`.
    HWY_DASSERT(begin <= end);
    begin_.store(begin, kRel);
    end_.store(end, kRel);
    func_.store(static_cast<RunFunc>(&CallClosure<Closure>), kRel);
    opaque_.store(reinterpret_cast<const void*>(&closure), kRel);
  }

  // Assigns workers their share of `[begin, end)`. Called from the main
  // thread; workers are initializing or waiting for a command.
  // Negligible CPU time.
  static void DivideRangeAmongWorkers(const uint64_t begin, const uint64_t end,
                                      const Divisor64& div_workers,
                                      Worker* workers) {
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    HWY_DASSERT(num_workers > 1);  // Else Run() runs on the main thread.
    HWY_DASSERT(begin <= end);
    const size_t num_tasks = static_cast<size_t>(end - begin);

    // Assigning all remainders to the last worker causes imbalance. We instead
    // give one more to each worker whose index is less. This may be zero when
    // called from `TestTasks`.
    const size_t min_tasks = static_cast<size_t>(div_workers.Divide(num_tasks));
    const size_t remainder =
        static_cast<size_t>(div_workers.Remainder(num_tasks));

    uint64_t my_begin = begin;
    for (size_t worker = 0; worker < num_workers; ++worker) {
      const uint64_t my_end = my_begin + min_tasks + (worker < remainder);
      workers[worker].SetRange(my_begin, my_end);
      my_begin = my_end;
    }
    HWY_DASSERT(my_begin == end);
  }

  // Runs the worker's assigned range of tasks, plus work stealing if needed.
  void WorkerRun(Worker* worker, const Shared& shared, Stats& stats) const {
    // More tasks than workers: stealing may pay off. Otherwise each worker
    // has at most one task and a static assignment suffices.
    if (NumTasks() > worker->NumThreads() + 1) {
      WorkerRunDynamic(worker, shared, stats);
    } else {
      WorkerRunStatic(worker, shared, stats);
    }
  }

 private:
  // Special case for <= 1 task per worker, where stealing is unnecessary.
  void WorkerRunStatic(Worker* worker, const Shared& shared,
                       Stats& stats) const {
    const uint64_t begin = begin_.load(kAcq);
    const uint64_t end = end_.load(kAcq);
    HWY_DASSERT(begin <= end);
    const size_t index = worker->Index();

    // Each worker runs (at most) the single task matching its index.
    const uint64_t task = begin + index;
    // We might still have more workers than tasks, so check first.
    if (HWY_LIKELY(task < end)) {
      const void* opaque = Opaque();
      const RunFunc func = Func();
      Stopwatch stopwatch = shared.MakeStopwatch();
      func(opaque, task, index);
      stats.NotifyRunStatic(index, stopwatch.Elapsed());
    }
  }

  // Must be called for each `worker` in [0, num_workers).
  //
  // A prior version of this code attempted to assign only as much work as a
  // worker will actually use. As with OpenMP's 'guided' strategy, we assigned
  // remaining/(k*num_threads) in each iteration. Although the worst-case
  // imbalance is bounded, this required several rounds of work allocation, and
  // the atomic counter did not scale to > 30 threads.
  //
  // We now use work stealing instead, where already-finished workers look for
  // and perform work from others, as if they were that worker. This deals with
  // imbalances as they arise, but care is required to reduce contention. We
  // randomize the order in which threads choose victims to steal from.
  void WorkerRunDynamic(Worker* worker, const Shared& shared,
                        Stats& stats) const {
    Worker* workers = worker->AllWorkers();
    const size_t index = worker->Index();
    const RunFunc func = Func();
    const void* opaque = Opaque();

    size_t sum_tasks = 0;
    size_t sum_stolen = 0;
    timer::Ticks sum_d_func = 0;
    // For each worker in random order, starting with our own, attempt to do
    // all their work.
    for (uint32_t victim : worker->Victims()) {
      Worker* other_worker = workers + victim;

      // Until all of other_worker's work is done:
      const uint64_t other_end = other_worker->MyEnd();
      for (;;) {
        // The worker that first sets `task` to `other_end` exits this loop.
        // After that, `task` can be incremented up to `num_workers - 1` times,
        // once per other worker.
        const uint64_t task = other_worker->WorkerReserveTask();
        if (HWY_UNLIKELY(task >= other_end)) {
          hwy::Pause();  // Reduce coherency traffic while stealing.
          break;
        }
        Stopwatch stopwatch = shared.MakeStopwatch();
        // Pass the index we are actually running on; this is important
        // because it is the TLS index for user code.
        func(opaque, task, index);
        sum_tasks++;
        sum_stolen += worker != other_worker;
        sum_d_func += stopwatch.Elapsed();
      }
    }
    stats.NotifyRunDynamic(index, sum_tasks, sum_stolen, sum_d_func);
  }

  // Size of the currently published task range.
  size_t NumTasks() const {
    return static_cast<size_t>(end_.load(kAcq) - begin_.load(kAcq));
  }

  const void* Opaque() const { return opaque_.load(kAcq); }
  RunFunc Func() const { return func_.load(kAcq); }

  // Calls closure(task, worker). Signature must match `RunFunc`.
  template <class Closure>
  static void CallClosure(const void* opaque, uint64_t task, size_t worker) {
    (*reinterpret_cast<const Closure*>(opaque))(task, worker);
  }

  std::atomic<uint64_t> begin_;
  std::atomic<uint64_t> end_;
  std::atomic<RunFunc> func_;
  std::atomic<const void*> opaque_;
};
static_assert(sizeof(Tasks) == 16 + 2 * sizeof(void*), "");

// ------------------------------ Threads wait, main wakes them

// Considerations:
// - uint32_t storage per `Worker` so we can use `futex.h`.
// - avoid atomic read-modify-write. These are implemented on x86 using a LOCK
//   prefix, which interferes with other cores' cache-coherency transactions
//   and drains our core's store buffer. We use only store-release and
//   load-acquire. Although expressed using `std::atomic`, these are normal
//   loads/stores in the strong x86 memory model.
// - prefer to avoid resetting the state. "Sense-reversing" (flipping a flag)
//   would work, but we prefer an 'epoch' counter because it is more useful
//   and easier to understand/debug, and as fast.

// Both the main thread and each worker maintain their own counter, which are
// implicitly synchronized by the barrier. To wake, the main thread does a
// store-release, and each worker does a load-acquire. The policy classes differ
// in whether they block or spin (with pause/monitor to reduce power), and
// whether workers check their own counter or a shared one.
//
// All methods are const because they only use storage in `Worker`, and we
// prefer to pass const-references to empty classes to enable type deduction.

// Futex: blocking reduces apparent CPU usage, but has higher wake latency.
struct WaitBlock {
  // Wakes all workers by storing the current `epoch`.
  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    HWY_DASSERT(epoch != 0);
    // Only the first worker's slot is used; one futex wakes all waiters.
    workers[1].StoreWaiter(epoch);
    WakeAll(workers[1].MutableWaiter());  // futex: expensive syscall
  }

  // Waits until `WakeWorkers(_, epoch)` has been called.
  template <class Spin>
  size_t UntilWoken(const Worker& worker, const Spin& /*spin*/) const {
    HWY_DASSERT(worker.Index() != 0);  // main is 0
    const uint32_t epoch = worker.WorkerEpoch();
    const Worker* workers = worker.AllWorkers();
    // Blocks while the shared waiter still holds the previous epoch.
    BlockUntilDifferent(epoch - 1, workers[1].Waiter());
    return 1;  // iterations
  }
};

// Single u32: single store by the main thread. All worker threads poll this
// one cache line and thus have it in a shared state, which means the store
// will invalidate each of them, leading to more transactions than SpinSeparate.
struct WaitSpin1 {
  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    workers[1].StoreWaiter(epoch);
  }

  // Returns the number of spin-wait iterations.
  template <class Spin>
  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
    HWY_DASSERT(worker.Index() != 0);  // main is 0
    const Worker* workers = worker.AllWorkers();
    const uint32_t epoch = worker.WorkerEpoch();
    return spin.UntilEqual(epoch, workers[1].Waiter());
  }
};

// Separate u32 per thread: more stores for the main thread, but each worker
// only polls its own cache line, leading to fewer cache-coherency transactions.
struct WaitSpinSeparate {
  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    for (size_t thread = 0; thread < workers->NumThreads(); ++thread) {
      workers[1 + thread].StoreWaiter(epoch);
    }
  }

  // Returns the number of spin-wait iterations on this worker's own line.
  template <class Spin>
  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
    HWY_DASSERT(worker.Index() != 0);  // main is 0
    const uint32_t epoch = worker.WorkerEpoch();
    return spin.UntilEqual(epoch, worker.Waiter());
  }
};

// Calls unrolled code selected by all config enums.
template <class Func, typename... Args>
HWY_INLINE void CallWithConfig(const Config& config, Func&& func,
                               Args&&... args) {
  switch (config.wait_type) {
    case WaitType::kBlock:
      // Blocking wait never spins, so the spin policy is fixed to SpinPause.
      return func(SpinPause(), WaitBlock(), std::forward<Args>(args)...);
    case WaitType::kSpin1:
      return CallWithSpin(config.spin_type, func, WaitSpin1(),
                          std::forward<Args>(args)...);
    case WaitType::kSpinSeparate:
      return CallWithSpin(config.spin_type, func, WaitSpinSeparate(),
                          std::forward<Args>(args)...);
    case WaitType::kSentinel:
      HWY_UNREACHABLE;
  }
}

// ------------------------------ Barrier: Main thread waits for workers

// Similar to `WaitSpinSeparate`, a store-release of the same local epoch
// counter serves as a "have arrived" flag that does not require resetting.
class Barrier {
 public:
  void WorkerReached(Worker& worker, uint32_t epoch) const {
    HWY_DASSERT(worker.Index() != 0);  // main is 0
    worker.StoreBarrier(epoch);
  }

  // Returns true if `worker` (can be the main thread) reached the barrier.
  bool HasReached(const Worker* worker, uint32_t epoch) const {
    const uint32_t barrier = worker->Barrier().load(std::memory_order_acquire);
    HWY_DASSERT(barrier <= epoch);
    return barrier == epoch;
  }

  // Main thread loops over each worker. A "group of 2 or 4" barrier was not
  // competitive on Skylake, Granite Rapids and Zen5.
  template <class Spin>
  void UntilReached(size_t num_threads, Worker* workers, const Spin& spin,
                    uint32_t epoch) const {
    workers[0].StoreBarrier(epoch);  // for main thread HasReached.

    for (size_t i = 0; i < num_threads; ++i) {
      // TODO: log number of spin-wait iterations.
      (void)spin.UntilEqual(epoch, workers[1 + i].Barrier());
    }
  }
};

// In debug builds, detects when functions are re-entered.
class BusyFlag {
 public:
  // `HWY_DASSERT` compiles out in release builds, hence no overhead there.
  void Set() { HWY_DASSERT(!busy_.test_and_set()); }
  void Clear() { HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) busy_.clear(); }

 private:
  std::atomic_flag busy_ = ATOMIC_FLAG_INIT;
};

}  // namespace pool

// Highly efficient parallel-for, intended for workloads with thousands of
// fork-join regions which consist of calling tasks[t](i) for a few hundred i,
// using dozens of threads.
//
// To reduce scheduling overhead, we assume that tasks are statically known and
// that threads do not schedule new work themselves. This allows us to avoid
// queues and only store a counter plus the current task. The latter is a
// pointer to a lambda function, without the allocation/indirection required for
// `std::function`.
//
// To reduce fork/join latency, we choose an efficient barrier, optionally
// enable spin-waits via `SetWaitMode`, and avoid any mutex/lock. We largely
// even avoid atomic RMW operations (LOCK prefix): currently for the wait and
// barrier, in future hopefully also for work stealing.
//
// To eliminate false sharing and enable reasoning about cache line traffic, the
// class is aligned and holds all worker state.
//
// For load-balancing, we use work stealing in random order.
1292 class alignas(HWY_ALIGNMENT) ThreadPool { 1293 // Used to initialize `num_threads_` from the ctor argument. 1294 static size_t ClampedNumThreads(size_t num_threads) { 1295 // Upper bound is required for `worker_bytes_`. 1296 if (HWY_UNLIKELY(num_threads > pool::kMaxThreads)) { 1297 HWY_WARN("ThreadPool: clamping num_threads %zu to %zu.", num_threads, 1298 pool::kMaxThreads); 1299 num_threads = pool::kMaxThreads; 1300 } 1301 return num_threads; 1302 } 1303 1304 public: 1305 // This typically includes hyperthreads, hence it is a loose upper bound. 1306 // -1 because these are in addition to the main thread. 1307 static size_t MaxThreads() { 1308 LogicalProcessorSet lps; 1309 // This is OS dependent, but more accurate if available because it takes 1310 // into account restrictions set by cgroups or numactl/taskset. 1311 if (GetThreadAffinity(lps)) { 1312 return lps.Count() - 1; 1313 } 1314 return static_cast<size_t>(std::thread::hardware_concurrency() - 1); 1315 } 1316 1317 // `num_threads` is the number of *additional* threads to spawn, which should 1318 // not exceed `MaxThreads()`. Note that the main thread also performs work. 1319 // `mapping` indicates how to map local worker_idx to global. 1320 ThreadPool(size_t num_threads, 1321 PoolWorkerMapping mapping = PoolWorkerMapping()) 1322 : num_threads_(ClampedNumThreads(num_threads)), 1323 div_workers_(1 + num_threads_), 1324 shared_(pool::Shared::Get()), // on first call, calls ReserveWorker(0)! 1325 workers_(pool::WorkerLifecycle::Init(worker_bytes_, num_threads_, 1326 mapping, div_workers_, shared_)) { 1327 // Leaves the default wait mode as `kBlock`, which means futex, because 1328 // spinning only makes sense when threads are pinned and wake latency is 1329 // important, so it must explicitly be requested by calling `SetWaitMode`. 
1330 for (PoolWaitMode mode : {PoolWaitMode::kSpin, PoolWaitMode::kBlock}) { 1331 wait_mode_ = mode; // for AutoTuner 1332 AutoTuner().SetCandidates( 1333 pool::Config::AllCandidates(mode)); 1334 } 1335 1336 // Skip empty pools because they do not update stats anyway. 1337 if (num_threads_ > 0) { 1338 Profiler::Get().AddFunc(this, [this]() { PrintStats(); }); 1339 } 1340 1341 threads_.reserve(num_threads_); 1342 for (size_t thread = 0; thread < num_threads_; ++thread) { 1343 threads_.emplace_back( 1344 ThreadFunc(workers_[1 + thread], tasks_, shared_, stats_)); 1345 } 1346 1347 // Threads' `Config` defaults to spinning. Change to `kBlock` (see above). 1348 // This also ensures all threads have started before we return, so that 1349 // startup latency is billed to the ctor, not the first `Run`. 1350 SendConfig(AutoTuner().Candidates()[0]); 1351 } 1352 1353 // If we created threads, waits for them all to exit. 1354 ~ThreadPool() { 1355 // There is no portable way to request threads to exit like `ExitThread` on 1356 // Windows, otherwise we could call that from `Run`. Instead, we must cause 1357 // the thread to wake up and exit. We can just use `Run`. 1358 (void)RunWithoutAutotune( 1359 0, NumWorkers(), shared_.dtor, 1360 [this](HWY_MAYBE_UNUSED uint64_t task, size_t worker) { 1361 HWY_DASSERT(task == worker); 1362 workers_[worker].SetExit(Exit::kThread); 1363 }); 1364 1365 for (std::thread& thread : threads_) { 1366 HWY_DASSERT(thread.joinable()); 1367 thread.join(); 1368 } 1369 1370 if (num_threads_ > 0) { 1371 Profiler::Get().RemoveFunc(this); 1372 } 1373 1374 pool::WorkerLifecycle::Destroy(workers_, num_threads_); 1375 } 1376 1377 ThreadPool(const ThreadPool&) = delete; 1378 ThreadPool& operator&(const ThreadPool&) = delete; 1379 1380 // Returns number of Worker, i.e., one more than the largest `worker` 1381 // argument. Useful for callers that want to allocate thread-local storage. 
  size_t NumWorkers() const {
    return static_cast<size_t>(div_workers_.GetDivisor());
  }

  // `mode` defaults to `kBlock`, which means futex. Switching to `kSpin`
  // reduces fork-join overhead especially when there are many calls to `Run`,
  // but wastes power when waiting over long intervals. Must not be called
  // concurrently with any `Run`, because this uses the same waiter/barrier.
  void SetWaitMode(PoolWaitMode mode) {
    wait_mode_ = mode;
    // Use the tuned best if available, otherwise the next candidate to try.
    SendConfig(AutoTuneComplete() ? *AutoTuner().Best()
                                  : AutoTuner().NextConfig());
  }

  // For printing which is in use.
  pool::Config config() const { return workers_[0].NextConfig(); }

  bool AutoTuneComplete() const { return AutoTuner().Best(); }
  Span<CostDistribution> AutoTuneCosts() { return AutoTuner().Costs(); }

  static pool::Caller AddCaller(const char* name) {
    return pool::Shared::Get().AddCaller(name);
  }

  // parallel-for: Runs `closure(task, worker)` on workers for every `task` in
  // `[begin, end)`. Note that the unit of work should be large enough to
  // amortize the function call overhead, but small enough that each worker
  // processes a few tasks. Thus each `task` is usually a loop.
  //
  // Not thread-safe - concurrent parallel-for in the same `ThreadPool` are
  // forbidden unless `NumWorkers() == 1` or `end <= begin + 1`.
  template <class Closure>
  void Run(uint64_t begin, uint64_t end, pool::Caller caller,
           const Closure& closure) {
    AutoTuneT& auto_tuner = AutoTuner();
    // Already finished tuning: run without time measurement.
    if (HWY_LIKELY(auto_tuner.Best())) {
      // Don't care whether threads ran, we are done either way.
      (void)RunWithoutAutotune(begin, end, caller, closure);
      return;
    }

    // Not yet finished: measure time and notify autotuner.
    Stopwatch stopwatch(shared_.MakeStopwatch());
    // Skip update if threads didn't actually run.
    if (!RunWithoutAutotune(begin, end, caller, closure)) return;
    auto_tuner.NotifyCost(stopwatch.Elapsed());

    pool::Config next = auto_tuner.NextConfig();  // may be overwritten below
    if (auto_tuner.Best()) {  // just finished
      next = *auto_tuner.Best();
      // Optionally report the winning config and its speedup vs the others.
      HWY_IF_CONSTEXPR(pool::kVerbosity >= 1) {
        const size_t idx_best = static_cast<size_t>(
            auto_tuner.Best() - auto_tuner.Candidates().data());
        HWY_DASSERT(idx_best < auto_tuner.Costs().size());
        auto& AT = auto_tuner.Costs()[idx_best];
        const double best_cost = AT.EstimateCost();
        HWY_DASSERT(best_cost > 0.0);  // will divide by this below

        Stats s_ratio;
        for (size_t i = 0; i < auto_tuner.Costs().size(); ++i) {
          if (i == idx_best) continue;
          const double cost = auto_tuner.Costs()[i].EstimateCost();
          s_ratio.Notify(static_cast<float>(cost / best_cost));
        }

        fprintf(stderr,
                "Pool %3zu: %s %8.0f +/- %6.0f. Gain %.2fx [%.2fx, %.2fx]\n",
                NumWorkers(), auto_tuner.Best()->ToString().c_str(), best_cost,
                AT.Stddev(), s_ratio.GeometricMean(),
                static_cast<double>(s_ratio.Min()),
                static_cast<double>(s_ratio.Max()));
      }
    }
    SendConfig(next);
  }

  // Backward-compatible version without Caller.
  template <class Closure>
  void Run(uint64_t begin, uint64_t end, const Closure& closure) {
    Run(begin, end, pool::Caller(), closure);
  }

 private:
  // Called via `CallWithConfig`.
  struct MainWakeAndBarrier {
    template <class Spin, class Wait>
    void operator()(const Spin& spin, const Wait& wait, pool::Worker& main,
                    const pool::Tasks& tasks, const pool::Shared& shared,
                    pool::Stats& stats) const {
      const pool::Barrier barrier;
      pool::Worker* workers = main.AllWorkers();
      HWY_DASSERT(&main == main.AllWorkers());  // main is first.
1475 const size_t num_threads = main.NumThreads(); 1476 const uint32_t epoch = main.AdvanceWorkerEpoch(); 1477 1478 HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) { 1479 for (size_t i = 0; i < 1 + num_threads; ++i) { 1480 HWY_DASSERT(!barrier.HasReached(workers + i, epoch)); 1481 } 1482 } 1483 1484 Stopwatch stopwatch(shared.MakeStopwatch()); 1485 const timer::Ticks t_before_wake = stopwatch.Origin(); 1486 wait.WakeWorkers(workers, epoch); 1487 const timer::Ticks d_wake = stopwatch.Elapsed(); 1488 1489 // Also perform work on the main thread before the barrier. 1490 tasks.WorkerRun(&main, shared, stats); 1491 const timer::Ticks d_run = stopwatch.Elapsed(); 1492 1493 // Spin-waits until all worker *threads* (not `main`, because it already 1494 // knows it is here) called `WorkerReached`. 1495 barrier.UntilReached(num_threads, workers, spin, epoch); 1496 const timer::Ticks d_barrier = stopwatch.Elapsed(); 1497 stats.NotifyMainRun(main.NumThreads(), t_before_wake, d_wake, d_run, 1498 d_barrier); 1499 1500 HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) { 1501 for (size_t i = 0; i < 1 + num_threads; ++i) { 1502 HWY_DASSERT(barrier.HasReached(workers + i, epoch)); 1503 } 1504 } 1505 1506 // Threads are or will soon be waiting `UntilWoken`, which serves as the 1507 // 'release' phase of the barrier. 1508 } 1509 }; 1510 1511 // Called by `std::thread`. Could also be a lambda. 1512 class ThreadFunc { 1513 // Functor called by `CallWithConfig`. Loops until `SendConfig` changes the 1514 // Spin or Wait policy or the pool is destroyed. 1515 struct WorkerLoop { 1516 template <class Spin, class Wait> 1517 void operator()(const Spin& spin, const Wait& wait, pool::Worker& worker, 1518 pool::Tasks& tasks, const pool::Shared& shared, 1519 pool::Stats& stats) const { 1520 do { 1521 // Main worker also calls this, so their epochs match. 
1522 const uint32_t epoch = worker.AdvanceWorkerEpoch(); 1523 1524 Stopwatch stopwatch(shared.MakeStopwatch()); 1525 1526 const size_t wait_reps = wait.UntilWoken(worker, spin); 1527 const timer::Ticks d_wait = stopwatch.Elapsed(); 1528 const timer::Ticks t_before_run = stopwatch.Origin(); 1529 1530 tasks.WorkerRun(&worker, shared, stats); 1531 const timer::Ticks d_run = stopwatch.Elapsed(); 1532 stats.NotifyThreadRun(worker.Index(), d_wait, wait_reps, t_before_run, 1533 d_run); 1534 1535 // Notify barrier after `WorkerRun`. Note that we cannot send an 1536 // after-barrier timestamp, see above. 1537 pool::Barrier().WorkerReached(worker, epoch); 1538 // Check after `WorkerReached`, otherwise the main thread deadlocks. 1539 } while (worker.GetExit() == Exit::kNone); 1540 } 1541 }; 1542 1543 public: 1544 ThreadFunc(pool::Worker& worker, pool::Tasks& tasks, 1545 const pool::Shared& shared, pool::Stats& stats) 1546 : worker_(worker), tasks_(tasks), shared_(shared), stats_(stats) {} 1547 1548 void operator()() { 1549 // Ensure main thread's writes are visible (synchronizes with fence in 1550 // `WorkerLifecycle::Init`). 1551 std::atomic_thread_fence(std::memory_order_acquire); 1552 1553 HWY_DASSERT(worker_.Index() != 0); // main is 0 1554 SetThreadName("worker%03zu", static_cast<int>(worker_.Index() - 1)); 1555 1556 worker_.SetStartTime(); 1557 Profiler& profiler = Profiler::Get(); 1558 profiler.SetGlobalIdx(worker_.GlobalIdx()); 1559 // No Zone here because it would only exit after `GetExit`, which may be 1560 // after the main thread's `PROFILER_END_ROOT_RUN`, and thus too late to 1561 // be counted. Instead, `ProfilerFunc` records the elapsed time. 1562 1563 // Loop termination via `GetExit` is triggered by `~ThreadPool`. 1564 for (;;) { 1565 // Uses the initial config, or the last one set during WorkerRun. 
1566 CallWithConfig(worker_.NextConfig(), WorkerLoop(), worker_, tasks_, 1567 shared_, stats_); 1568 1569 // Exit or reset the flag and return to WorkerLoop with a new config. 1570 if (worker_.GetExit() == Exit::kThread) break; 1571 worker_.SetExit(Exit::kNone); 1572 } 1573 1574 profiler.SetGlobalIdx(~size_t{0}); 1575 1576 // Defer `FreeWorker` until workers are destroyed to ensure the profiler 1577 // is not still using the worker. 1578 } 1579 1580 private: 1581 pool::Worker& worker_; 1582 pool::Tasks& tasks_; 1583 const pool::Shared& shared_; 1584 pool::Stats& stats_; 1585 }; 1586 1587 void PrintStats() { 1588 // Total run time from all non-main threads. 1589 std::atomic<timer::Ticks> sum_thread_elapsed{0}; 1590 (void)RunWithoutAutotune( 1591 0, NumWorkers(), shared_.print_stats, 1592 [this, &sum_thread_elapsed](HWY_MAYBE_UNUSED uint64_t task, 1593 size_t worker) { 1594 HWY_DASSERT(task == worker); 1595 // Skip any main thread(s) because they did not init the stopwatch. 1596 if (worker != 0) { 1597 sum_thread_elapsed.fetch_add(workers_[worker].ElapsedTime()); 1598 } 1599 }); 1600 const timer::Ticks thread_total = 1601 sum_thread_elapsed.load(std::memory_order_acquire); 1602 stats_.PrintAndReset(num_threads_, thread_total); 1603 } 1604 1605 // Returns whether threads were used. If not, there is no need to update 1606 // the autotuner config. 1607 template <class Closure> 1608 bool RunWithoutAutotune(uint64_t begin, uint64_t end, pool::Caller caller, 1609 const Closure& closure) { 1610 pool::Worker& main = workers_[0]; 1611 1612 const size_t num_tasks = static_cast<size_t>(end - begin); 1613 const size_t num_workers = NumWorkers(); 1614 1615 // If zero or one task, or no extra threads, run on the main thread without 1616 // setting any member variables, because we may be re-entering Run. 
    if (HWY_UNLIKELY(num_tasks <= 1 || num_workers == 1)) {
      for (uint64_t task = begin; task < end; ++task) {
        closure(task, /*worker=*/0);
      }
      return false;
    }

    // Debug-only re-entrancy check; see `BusyFlag`.
    busy_.Set();

#if PROFILER_ENABLED
    const bool is_root = PROFILER_IS_ROOT_RUN();
    Stopwatch stopwatch(shared_.MakeStopwatch());
    const timer::Ticks wait_before =
        is_root ? shared_.LastRootEnd().Elapsed() : 0;
#endif

    tasks_.Set(begin, end, closure);

    // More than one task per worker: use work stealing.
    if (HWY_LIKELY(num_tasks > num_workers)) {
      pool::Tasks::DivideRangeAmongWorkers(begin, end, div_workers_, workers_);
    }

    // Runs `MainWakeAndBarrier` with the first worker slot.
    CallWithConfig(config(), MainWakeAndBarrier(), main, tasks_, shared_,
                   stats_);

#if PROFILER_ENABLED
    pool::CallerAccumulator& acc =
        shared_.Cluster(main.ClusterIdx()).Get(caller.Idx());
    acc.Add(num_tasks, num_workers, is_root, wait_before, stopwatch.Elapsed());
    if (is_root) {
      PROFILER_END_ROOT_RUN();
      shared_.LastRootEnd().Reset();
    }
#else
    (void)caller;
#endif

    busy_.Clear();
    return true;
  }

  // Sends `next_config` to workers:
  // - Main wakes threads using the current config.
  // - Threads copy `next_config` into their `Worker` during `WorkerRun`.
  // - Threads notify the (same) barrier and already wait for the next wake
  //   using `next_config`.
  HWY_NOINLINE void SendConfig(pool::Config next_config) {
    (void)RunWithoutAutotune(
        0, NumWorkers(), shared_.send_config,
        [this, next_config](HWY_MAYBE_UNUSED uint64_t task, size_t worker) {
          HWY_DASSERT(task == worker);  // one task per worker
          workers_[worker].SetNextConfig(next_config);
          // `Exit::kLoop` makes `WorkerLoop` return so the thread re-enters
          // `CallWithConfig` with the new policy classes.
          workers_[worker].SetExit(Exit::kLoop);
        });

    // All have woken and are, or will be, waiting per `next_config`. Now we
    // can entirely switch the main thread's config for the next wake.
    workers_[0].SetNextConfig(next_config);
  }

  using AutoTuneT = AutoTune<pool::Config, 30>;
  // Selects the tuner matching the current wait mode (kBlock = 1 maps to 0).
  AutoTuneT& AutoTuner() {
    static_assert(static_cast<size_t>(PoolWaitMode::kBlock) == 1, "");
    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
  }
  const AutoTuneT& AutoTuner() const {
    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
  }

  const size_t num_threads_;  // not including main thread
  const Divisor64 div_workers_;
  pool::Shared& shared_;
  pool::Worker* const workers_;  // points into `worker_bytes_`

  alignas(HWY_ALIGNMENT) pool::Stats stats_;

  // This is written by the main thread and read by workers, via reference
  // passed to `ThreadFunc`. Padding ensures that the workers' cache lines are
  // not unnecessarily invalidated when the main thread writes other members.
  alignas(HWY_ALIGNMENT) pool::Tasks tasks_;
  HWY_MEMBER_VAR_MAYBE_UNUSED char
      padding_[HWY_ALIGNMENT - sizeof(pool::Tasks)];

  pool::BusyFlag busy_;

  // Unmodified after ctor, but cannot be const because we call thread::join().
  std::vector<std::thread> threads_;

  PoolWaitMode wait_mode_;
  AutoTuneT auto_tune_[2];  // accessed via `AutoTuner`

  // Last because it is large. Store inside `ThreadPool` so that callers can
  // bind it to the NUMA node's memory. Not stored inside `WorkerLifecycle`
  // because that class would be initialized after `workers_`.
  alignas(HWY_ALIGNMENT) uint8_t
      worker_bytes_[sizeof(pool::Worker) * (pool::kMaxThreads + 1)];
};

}  // namespace hwy

#endif  // HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_