tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

thread_pool.h (64831B)


      1 // Copyright 2023 Google LLC
      2 // SPDX-License-Identifier: Apache-2.0
      3 //
      4 // Licensed under the Apache License, Version 2.0 (the "License");
      5 // you may not use this file except in compliance with the License.
      6 // You may obtain a copy of the License at
      7 //
      8 //      http://www.apache.org/licenses/LICENSE-2.0
      9 //
     10 // Unless required by applicable law or agreed to in writing, software
     11 // distributed under the License is distributed on an "AS IS" BASIS,
     12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 // See the License for the specific language governing permissions and
     14 // limitations under the License.
     15 
     16 // Modified from BSD-licensed code
     17 // Copyright (c) the JPEG XL Project Authors. All rights reserved.
     18 // See https://github.com/libjxl/libjxl/blob/main/LICENSE.
     19 
     20 #ifndef HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
     21 #define HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
     22 
     23 #include <stddef.h>
     24 #include <stdint.h>
     25 #include <stdio.h>  // snprintf
     26 #include <string.h>
     27 
     28 #include <array>
     29 #include <atomic>
     30 #include <string>
     31 #include <thread>  // NOLINT
     32 #include <vector>
     33 
     34 #include "hwy/aligned_allocator.h"  // HWY_ALIGNMENT
     35 #include "hwy/auto_tune.h"
     36 #include "hwy/base.h"
     37 #include "hwy/cache_control.h"  // Pause
     38 #include "hwy/contrib/thread_pool/futex.h"
     39 #include "hwy/contrib/thread_pool/spin.h"
     40 #include "hwy/contrib/thread_pool/topology.h"
     41 #include "hwy/profiler.h"
     42 #include "hwy/stats.h"
     43 #include "hwy/timer.h"
     44 
     45 #if HWY_OS_APPLE
     46 #include <AvailabilityMacros.h>
     47 #endif
     48 
     49 #if PROFILER_ENABLED
     50 #include <algorithm>  // std::sort
     51 
     52 #include "hwy/bit_set.h"
     53 #endif
     54 
     55 namespace hwy {
     56 
// Sets the name of the current thread to the format string `format`, which must
// include %d for `thread`. Currently only implemented for pthreads (*nix and
// OSX); Windows involves throwing an exception.
static inline void SetThreadName(const char* format, int thread) {
  char buf[16] = {};  // Linux limit, including \0
  const int chars_written = snprintf(buf, sizeof(buf), format, thread);
  // The formatted name must fit, including the terminating \0; otherwise the
  // kernel would silently truncate it.
  HWY_ASSERT(0 < chars_written &&
             chars_written <= static_cast<int>(sizeof(buf) - 1));

#if (HWY_OS_LINUX && (!defined(__ANDROID__) || __ANDROID_API__ >= 19)) || \
    HWY_OS_FREEBSD
  // Note that FreeBSD pthread_set_name_np does not return a value (#2669).
  HWY_ASSERT(0 == pthread_setname_np(pthread_self(), buf));
#elif HWY_OS_APPLE && (MAC_OS_X_VERSION_MIN_REQUIRED >= 1060)
  // Different interface: single argument, current thread only.
  HWY_ASSERT(0 == pthread_setname_np(buf));
#elif defined(__EMSCRIPTEN__)
  emscripten_set_thread_name(pthread_self(), buf);
#else
  // Unsupported platform: the name is still formatted and validated above,
  // but not applied.
  (void)format;
  (void)thread;
#endif
}
     80 
// Whether workers should block or spin.
enum class PoolWaitMode : uint8_t { kBlock = 1, kSpin };

// How a worker should terminate. NOTE(review): semantics inferred from the
// enumerator names (no exit / leave loop / exit thread); the worker loop that
// interprets these is outside this chunk — confirm there.
enum class Exit : uint32_t { kNone, kLoop, kThread };

// Upper bound on non-empty `ThreadPool` (single-worker pools do not count).
// Turin has 16 clusters. Add one for the across-cluster pool.
HWY_INLINE_VAR constexpr size_t kMaxClusters = 32 + 1;

// Use the last slot so that `PoolWorkerMapping` does not have to know the
// total number of clusters.
HWY_INLINE_VAR constexpr size_t kAllClusters = kMaxClusters - 1;
     93 
     94 // Argument to `ThreadPool`: how to map local worker_idx to global.
     95 class PoolWorkerMapping {
     96 public:
     97  // Backward-compatible mode: returns local worker index.
     98  PoolWorkerMapping() : cluster_idx_(0), max_cluster_workers_(0) {}
     99  PoolWorkerMapping(size_t cluster_idx, size_t max_cluster_workers)
    100      : cluster_idx_(cluster_idx), max_cluster_workers_(max_cluster_workers) {
    101    HWY_DASSERT(cluster_idx <= kAllClusters);
    102    // Only use this ctor for the new global worker index mode. If this were
    103    // zero, we would still return local indices.
    104    HWY_DASSERT(max_cluster_workers != 0);
    105  }
    106 
    107  size_t ClusterIdx() const { return cluster_idx_; }
    108  size_t MaxClusterWorkers() const { return max_cluster_workers_; }
    109 
    110  // Returns global_idx, or unchanged local worker_idx if default-constructed.
    111  size_t operator()(size_t worker_idx) const {
    112    if (cluster_idx_ == kAllClusters) {
    113      const size_t cluster_idx = worker_idx;
    114      HWY_DASSERT(cluster_idx < kAllClusters);
    115      // First index within the N-th cluster. The main thread is the first.
    116      return cluster_idx * max_cluster_workers_;
    117    }
    118    HWY_DASSERT(max_cluster_workers_ == 0 || worker_idx < max_cluster_workers_);
    119    return cluster_idx_ * max_cluster_workers_ + worker_idx;
    120  }
    121 
    122 private:
    123  size_t cluster_idx_;
    124  size_t max_cluster_workers_;
    125 };
    126 
    127 namespace pool {
    128 
// Compile-time verbosity knob; define HWY_POOL_VERBOSITY before including
// this header to override.
#ifndef HWY_POOL_VERBOSITY
#define HWY_POOL_VERBOSITY 0
#endif

static constexpr int kVerbosity = HWY_POOL_VERBOSITY;

// Some CPUs already have more than this many threads, but rather than one
// large pool, we assume applications create multiple pools, ideally per
// cluster (cores sharing a cache), because this improves locality and barrier
// latency. In that case, this is a generous upper bound.
static constexpr size_t kMaxThreads = 127;
    140 
// Generates a random permutation of [0, size). O(1) storage.
class ShuffledIota {
 public:
  ShuffledIota() : coprime_(1) {}  // for Worker
  explicit ShuffledIota(uint32_t coprime) : coprime_(coprime) {}

  // Returns the next after `current`, using an LCG-like generator.
  uint32_t Next(uint32_t current, const Divisor64& divisor) const {
    HWY_DASSERT(current < divisor.GetDivisor());
    // (coprime * i + current) % size, see https://lemire.me/blog/2017/09/18/.
    return static_cast<uint32_t>(divisor.Remainder(current + coprime_));
  }

  // Returns true if a and b have no common denominator except 1. Based on
  // binary GCD. Assumes a and b are nonzero. Also used in tests.
  static bool CoprimeNonzero(uint32_t a, uint32_t b) {
    const size_t trailing_a = Num0BitsBelowLS1Bit_Nonzero32(a);
    const size_t trailing_b = Num0BitsBelowLS1Bit_Nonzero32(b);
    // If both have at least one trailing zero, they are both divisible by 2.
    if (HWY_MIN(trailing_a, trailing_b) != 0) return false;

    // If one of them has a trailing zero, shift it out.
    a >>= trailing_a;
    b >>= trailing_b;

    // Loop invariant: b is odd, so any factor of 2 in (a - b) below cannot be
    // a common factor and may be shifted out.
    for (;;) {
      // Swap such that a >= b.
      const uint32_t tmp_a = a;
      a = HWY_MAX(tmp_a, b);
      b = HWY_MIN(tmp_a, b);

      // When the smaller number is 1, they were coprime.
      if (b == 1) return true;

      a -= b;
      // a == b means there was a common factor, so not coprime.
      if (a == 0) return false;
      a >>= Num0BitsBelowLS1Bit_Nonzero32(a);
    }
  }

  // Returns another coprime >= `start`, or 1 for small `size`.
  // Used to seed independent ShuffledIota instances.
  static uint32_t FindAnotherCoprime(uint32_t size, uint32_t start) {
    if (size <= 2) {
      return 1;
    }

    // Avoids even x for even sizes, which are sure to be rejected.
    const uint32_t inc = (size & 1) ? 1 : 2;

    // A coprime must exist within a window of `size` odd numbers, hence the
    // generous 16x bound before declaring this unreachable.
    for (uint32_t x = start | 1; x < start + size * 16; x += inc) {
      if (CoprimeNonzero(x, static_cast<uint32_t>(size))) {
        return x;
      }
    }

    HWY_UNREACHABLE;
  }

  uint32_t coprime_;
};
    203 
// 'Policies' suitable for various worker counts and locality. To define a
// new class, add an enum and update `ToString` plus `CallWithConfig`. The
// enumerators must be contiguous so we can iterate over them.
enum class WaitType : uint8_t {
  kBlock,         // futex-based blocking wait; the only non-spin policy
  kSpin1,         // "Single" in ToString; presumably one shared spin location
  kSpinSeparate,  // "Separate"; presumably per-worker spin locations
  kSentinel       // Must be last.
};
    213 
    214 // For printing which is in use.
    215 static inline const char* ToString(WaitType type) {
    216  switch (type) {
    217    case WaitType::kBlock:
    218      return "Block";
    219    case WaitType::kSpin1:
    220      return "Single";
    221    case WaitType::kSpinSeparate:
    222      return "Separate";
    223    case WaitType::kSentinel:
    224      return nullptr;
    225  }
    226 }
    227 
    228 // Parameters governing the main and worker thread behavior. Can be updated at
    229 // runtime via `SetWaitMode`, which calls `SendConfig`. Both have copies which
    230 // are carefully synchronized. 32 bits leave room for two future fields.
    231 // 64 bits would also be fine because this does not go through futex.
    232 struct Config {  // 4 bytes
    233  static std::vector<Config> AllCandidates(PoolWaitMode wait_mode) {
    234    std::vector<Config> candidates;
    235 
    236    if (wait_mode == PoolWaitMode::kSpin) {
    237      std::vector<SpinType> spin_types;
    238      spin_types.reserve(2);
    239      spin_types.push_back(DetectSpin());
    240      // Monitor-based spin may be slower, so also try Pause.
    241      if (spin_types[0] != SpinType::kPause) {
    242        spin_types.push_back(SpinType::kPause);
    243      }
    244 
    245      // All except `kBlock`.
    246      std::vector<WaitType> wait_types;
    247      for (size_t wait = 0;; ++wait) {
    248        const WaitType wait_type = static_cast<WaitType>(wait);
    249        if (wait_type == WaitType::kSentinel) break;
    250        if (wait_type != WaitType::kBlock) wait_types.push_back(wait_type);
    251      }
    252 
    253      candidates.reserve(spin_types.size() * wait_types.size());
    254      for (const SpinType spin_type : spin_types) {
    255        for (const WaitType wait_type : wait_types) {
    256          candidates.emplace_back(spin_type, wait_type);
    257        }
    258      }
    259    } else {
    260      // kBlock does not use spin, so there is only one candidate.
    261      candidates.emplace_back(SpinType::kPause, WaitType::kBlock);
    262    }
    263 
    264    return candidates;
    265  }
    266 
    267  std::string ToString() const {
    268    char buf[128];
    269    snprintf(buf, sizeof(buf), "%-14s %-9s", hwy::ToString(spin_type),
    270             pool::ToString(wait_type));
    271    return buf;
    272  }
    273 
    274  Config(SpinType spin_type_in, WaitType wait_type_in)
    275      : spin_type(spin_type_in), wait_type(wait_type_in) {}
    276  // Workers initially spin until ThreadPool sends them their actual config.
    277  Config() : Config(SpinType::kPause, WaitType::kSpinSeparate) {}
    278 
    279  SpinType spin_type;
    280  WaitType wait_type;
    281  HWY_MEMBER_VAR_MAYBE_UNUSED uint8_t reserved[2];
    282 };
    283 static_assert(sizeof(Config) == 4, "");
    284 
    285 #if PROFILER_ENABLED
    286 
// Accumulates timings and stats from main thread and workers.
class Stats {
  // Up to `HWY_ALIGNMENT / 8` slots/offsets, passed to `PerThread`.
  static constexpr size_t kDWait = 0;
  static constexpr size_t kWaitReps = 1;
  static constexpr size_t kTBeforeRun = 2;
  static constexpr size_t kDRun = 3;
  static constexpr size_t kTasksStatic = 4;
  static constexpr size_t kTasksDynamic = 5;
  static constexpr size_t kTasksStolen = 6;
  static constexpr size_t kDFuncStatic = 7;
  static constexpr size_t kDFuncDynamic = 8;
  static constexpr size_t kSentinel = 9;

 public:
  Stats() {
    // Zero all per-thread slots once here; `Reset` deliberately does not
    // touch them (see comment at the end of `Reset`).
    for (size_t thread_idx = 0; thread_idx < kMaxThreads; ++thread_idx) {
      for (size_t offset = 0; offset < kSentinel; ++offset) {
        PerThread(thread_idx, offset) = 0;
      }
    }
    Reset();
  }

  // Called by the N lowest-indexed workers that got one of the N tasks, which
  // includes the main thread because its index is 0.
  // `d_*` denotes "difference" (of timestamps) and thus also duration.
  void NotifyRunStatic(size_t worker_idx, timer::Ticks d_func) {
    if (worker_idx == 0) {  // main thread
      num_run_static_++;
      sum_tasks_static_++;
      sum_d_func_static_ += d_func;
    } else {
      const size_t thread_idx = worker_idx - 1;
      // Defer the sums until `NotifyMainRun` to avoid atomic RMW.
      PerThread(thread_idx, kTasksStatic)++;
      PerThread(thread_idx, kDFuncStatic) += d_func;
    }
  }

  // Called by all workers, including the main thread, regardless of whether
  // they actually stole or even ran a task.
  void NotifyRunDynamic(size_t worker_idx, size_t tasks, size_t stolen,
                        timer::Ticks d_func) {
    if (worker_idx == 0) {  // main thread
      num_run_dynamic_++;
      sum_tasks_dynamic_ += tasks;
      sum_tasks_stolen_ += stolen;
      sum_d_func_dynamic_ += d_func;
    } else {
      const size_t thread_idx = worker_idx - 1;
      // Defer the sums until `NotifyMainRun` to avoid atomic RMW.
      PerThread(thread_idx, kTasksDynamic) += tasks;
      PerThread(thread_idx, kTasksStolen) += stolen;
      PerThread(thread_idx, kDFuncDynamic) += d_func;
    }
  }

  // Called concurrently by non-main worker threads after their `WorkerRun` and
  // before the barrier.
  void NotifyThreadRun(size_t worker_idx, timer::Ticks d_wait, size_t wait_reps,
                       timer::Ticks t_before_run, timer::Ticks d_run) {
    HWY_DASSERT(worker_idx != 0);  // Not called by main thread.
    const size_t thread_idx = worker_idx - 1;
    // Slots must have been consumed (zeroed) by the previous `NotifyMainRun`.
    HWY_DASSERT(PerThread(thread_idx, kDWait) == 0);
    HWY_DASSERT(PerThread(thread_idx, kWaitReps) == 0);
    HWY_DASSERT(PerThread(thread_idx, kTBeforeRun) == 0);
    HWY_DASSERT(PerThread(thread_idx, kDRun) == 0);
    PerThread(thread_idx, kDWait) = d_wait;
    PerThread(thread_idx, kWaitReps) = wait_reps;
    PerThread(thread_idx, kTBeforeRun) = t_before_run;  // For wake latency.
    PerThread(thread_idx, kDRun) = d_run;
  }

  // Called by the main thread after the barrier, whose store-release and
  // load-acquire publishes all prior writes. Note: only the main thread can
  // store `after_barrier`. If workers did, which by definition happens after
  // the barrier, then they would race with this function's reads.
  void NotifyMainRun(size_t num_threads, timer::Ticks t_before_wake,
                     timer::Ticks d_wake, timer::Ticks d_main_run,
                     timer::Ticks d_barrier) {
    HWY_DASSERT(num_threads <= kMaxThreads);

    timer::Ticks min_d_run = ~timer::Ticks{0};
    timer::Ticks max_d_run = 0;
    timer::Ticks sum_d_run = 0;
    // Reduce the deferred per-thread slots into the member sums, then zero
    // the slots for the next run.
    for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
      sum_tasks_static_ += PerThread(thread_idx, kTasksStatic);
      sum_tasks_dynamic_ += PerThread(thread_idx, kTasksDynamic);
      sum_tasks_stolen_ += PerThread(thread_idx, kTasksStolen);
      sum_d_func_static_ += PerThread(thread_idx, kDFuncStatic);
      sum_d_func_dynamic_ += PerThread(thread_idx, kDFuncDynamic);
      sum_d_wait_ += PerThread(thread_idx, kDWait);
      sum_wait_reps_ += PerThread(thread_idx, kWaitReps);
      const timer::Ticks d_thread_run = PerThread(thread_idx, kDRun);
      min_d_run = HWY_MIN(min_d_run, d_thread_run);
      max_d_run = HWY_MAX(max_d_run, d_thread_run);
      sum_d_run += d_thread_run;
      const timer::Ticks t_before_run = PerThread(thread_idx, kTBeforeRun);

      for (size_t offset = 0; offset < kSentinel; ++offset) {
        PerThread(thread_idx, offset) = 0;
      }

      // Wake latency: time from the main thread's wake request until this
      // thread started running.
      HWY_DASSERT(t_before_run != 0);
      const timer::Ticks d_latency = t_before_run - t_before_wake;
      sum_wake_latency_ += d_latency;
      max_wake_latency_ = HWY_MAX(max_wake_latency_, d_latency);
    }
    const double inv_avg_d_run =
        static_cast<double>(num_threads) / static_cast<double>(sum_d_run);
    // Ratios of min and max run times to the average, for this pool.Run.
    const double r_min = static_cast<double>(min_d_run) * inv_avg_d_run;
    const double r_max = static_cast<double>(max_d_run) * inv_avg_d_run;

    num_run_++;  // `num_run_*` are incremented by `NotifyRun*`.
    sum_d_run_ += sum_d_run;
    sum_r_min_ += r_min;  // For average across all pool.Run.
    sum_r_max_ += r_max;

    sum_d_wake_ += d_wake;  // `*wake_latency_` are updated above.
    sum_d_barrier_ += d_barrier;

    // Main thread's run time counts toward the total and is also tracked
    // separately for the main:worker balance ratio.
    sum_d_run_ += d_main_run;
    sum_d_run_main_ += d_main_run;
  }

  void PrintAndReset(size_t num_threads, timer::Ticks d_thread_lifetime_ticks) {
    // This is unconditionally called via `ProfilerFunc`. If the pool was unused
    // in this invocation, skip it.
    if (num_run_ == 0) return;
    HWY_ASSERT(num_run_ == num_run_static_ + num_run_dynamic_);

    const double d_func_static = Seconds(sum_d_func_static_);
    const double d_func_dynamic = Seconds(sum_d_func_dynamic_);
    const double sum_d_run = Seconds(sum_d_run_);
    // Sanity check: time inside user functions should account for nearly all
    // of the total run time; otherwise overhead is unexpectedly high.
    const double func_div_run = (d_func_static + d_func_dynamic) / sum_d_run;
    if (!(0.95 <= func_div_run && func_div_run <= 1.0)) {
      HWY_WARN("Func time %f should be similar to total run %f.",
               d_func_static + d_func_dynamic, sum_d_run);
    }
    const double sum_d_run_main = Seconds(sum_d_run_main_);
    const double max_wake_latency = Seconds(max_wake_latency_);
    const double sum_d_wait = Seconds(sum_d_wait_);
    const double d_thread_lifetime = Seconds(d_thread_lifetime_ticks);

    const double inv_run = 1.0 / static_cast<double>(num_run_);
    const auto per_run = [inv_run](double sum) { return sum * inv_run; };
    const double avg_d_wake = per_run(Seconds(sum_d_wake_));
    const double avg_wake_latency = per_run(Seconds(sum_wake_latency_));
    const double avg_d_wait = per_run(sum_d_wait);
    const double avg_wait_reps = per_run(static_cast<double>(sum_wait_reps_));
    const double avg_d_barrier = per_run(Seconds(sum_d_barrier_));
    const double avg_r_min = per_run(sum_r_min_);
    const double avg_r_max = per_run(sum_r_max_);

    const size_t num_workers = 1 + num_threads;
    const double avg_tasks_static =
        Avg(sum_tasks_static_, num_run_static_ * num_workers);
    const double avg_tasks_dynamic =
        Avg(sum_tasks_dynamic_, num_run_dynamic_ * num_workers);
    const double avg_steals =
        Avg(sum_tasks_stolen_, num_run_dynamic_ * num_workers);
    const double avg_d_run = sum_d_run / num_workers;

    // Percentages of total thread lifetime spent waiting and running.
    const double pc_wait = sum_d_wait / d_thread_lifetime * 100.0;
    const double pc_run = sum_d_run / d_thread_lifetime * 100.0;
    const double pc_main = sum_d_run_main / avg_d_run * 100.0;

    const auto us = [](double sec) { return sec * 1E6; };
    const auto ns = [](double sec) { return sec * 1E9; };
    printf(
        "%3zu: %5d x %.2f/%5d x %4.1f tasks, %.2f steals; "
        "wake %7.3f ns, latency %6.3f < %7.3f us, barrier %7.3f us; "
        "wait %.1f us (%6.0f reps, %4.1f%%), balance %4.1f%%-%5.1f%%, "
        "func: %6.3f + %7.3f, "
        "%.1f%% of thread time %7.3f s; main:worker %5.1f%%\n",
        num_threads, num_run_static_, avg_tasks_static, num_run_dynamic_,
        avg_tasks_dynamic, avg_steals, ns(avg_d_wake), us(avg_wake_latency),
        us(max_wake_latency), us(avg_d_barrier), us(avg_d_wait), avg_wait_reps,
        pc_wait, avg_r_min * 100.0, avg_r_max * 100.0, d_func_static,
        d_func_dynamic, pc_run, d_thread_lifetime, pc_main);

    Reset(num_threads);
  }

  // `num_threads` is currently unused in the body: per-thread slots are
  // already zeroed by the ctor and by `NotifyMainRun`.
  void Reset(size_t num_threads = kMaxThreads) {
    num_run_ = 0;
    num_run_static_ = 0;
    num_run_dynamic_ = 0;

    sum_tasks_stolen_ = 0;
    sum_tasks_static_ = 0;
    sum_tasks_dynamic_ = 0;

    sum_d_wake_ = 0;
    sum_wake_latency_ = 0;
    max_wake_latency_ = 0;
    sum_d_wait_ = 0;
    sum_wait_reps_ = 0;
    sum_d_barrier_ = 0;

    sum_d_func_static_ = 0;
    sum_d_func_dynamic_ = 0;
    sum_r_min_ = 0.0;
    sum_r_max_ = 0.0;
    sum_d_run_ = 0;
    sum_d_run_main_ = 0;
    // ctor and `NotifyMainRun` already reset `PerThread`.
  }

 private:
  // Safe average: returns 0 when the divisor is 0.
  template <typename T>
  static double Avg(T sum, size_t div) {
    return div == 0 ? 0.0 : static_cast<double>(sum) / static_cast<double>(div);
  }

  static constexpr size_t kU64PerLine = HWY_ALIGNMENT / sizeof(uint64_t);

  // Returns the `offset`-th slot within thread `thread_idx`'s cache line.
  uint64_t& PerThread(size_t thread_idx, size_t offset) {
    HWY_DASSERT(thread_idx < kMaxThreads);
    HWY_DASSERT(offset < kSentinel);
    return per_thread_[thread_idx * kU64PerLine + offset];
  }

  int32_t num_run_;
  int32_t num_run_static_;
  int32_t num_run_dynamic_;

  int32_t sum_tasks_stolen_;
  int64_t sum_tasks_static_;
  int64_t sum_tasks_dynamic_;

  timer::Ticks sum_d_wake_;
  timer::Ticks sum_wake_latency_;
  timer::Ticks max_wake_latency_;
  timer::Ticks sum_d_wait_;
  uint64_t sum_wait_reps_;
  timer::Ticks sum_d_barrier_;

  timer::Ticks sum_d_func_static_;
  timer::Ticks sum_d_func_dynamic_;
  double sum_r_min_;
  double sum_r_max_;
  timer::Ticks sum_d_run_;
  timer::Ticks sum_d_run_main_;

  // One cache line per pool thread to avoid false sharing.
  uint64_t per_thread_[kMaxThreads * kU64PerLine];
};
// Enables shift rather than multiplication.
static_assert(sizeof(Stats) == (kMaxThreads + 1) * HWY_ALIGNMENT, "Wrong size");
    539 
// Maximum distinct `Run` call sites tracked per cluster.
// Non-power of two to avoid 2K aliasing.
HWY_INLINE_VAR constexpr size_t kMaxCallers = 60;
    542 
// Per-caller stats, stored in `PerCluster`.
class CallerAccumulator {
 public:
  // True if `Add` has been called since construction/reset.
  bool Any() const { return calls_ != 0; }

  // Records one `Run` by this caller with the given task/worker counts and
  // timings.
  void Add(size_t tasks, size_t workers, bool is_root, timer::Ticks wait_before,
           timer::Ticks elapsed) {
    calls_++;
    root_ += is_root;
    workers_ += workers;
    min_tasks_ = HWY_MIN(min_tasks_, tasks);
    max_tasks_ = HWY_MAX(max_tasks_, tasks);
    tasks_ += tasks;
    wait_before_ += wait_before;
    elapsed_ += elapsed;
  }

  // Merges `other` into this, e.g. when reducing per-cluster accumulators.
  void AddFrom(const CallerAccumulator& other) {
    calls_ += other.calls_;
    root_ += other.root_;
    workers_ += other.workers_;
    min_tasks_ = HWY_MIN(min_tasks_, other.min_tasks_);
    max_tasks_ = HWY_MAX(max_tasks_, other.max_tasks_);
    tasks_ += other.tasks_;
    wait_before_ += other.wait_before_;
    elapsed_ += other.elapsed_;
  }

  // Orders by descending elapsed time (used by `PerCluster::Sorted`).
  bool operator>(const CallerAccumulator& other) const {
    return elapsed_ > other.elapsed_;
  }

  void PrintAndReset(const char* caller, size_t active_clusters) {
    if (!Any()) return;
    HWY_ASSERT(root_ <= calls_);
    const double inv_calls = 1.0 / static_cast<double>(calls_);
    const double pc_root = static_cast<double>(root_) * inv_calls * 100.0;
    const double avg_workers = static_cast<double>(workers_) * inv_calls;
    const double avg_tasks = static_cast<double>(tasks_) * inv_calls;
    const double avg_tasks_per_worker = avg_tasks / avg_workers;
    const double inv_freq = 1.0 / platform::InvariantTicksPerSecond();
    const double sum_wait_before = static_cast<double>(wait_before_) * inv_freq;
    // Wait time is only accumulated for root runs, hence divide by `root_`.
    const double avg_wait_before =
        root_ ? sum_wait_before / static_cast<double>(root_) : 0.0;
    const double elapsed = static_cast<double>(elapsed_) * inv_freq;
    const double avg_elapsed = elapsed * inv_calls;
    const double task_len = avg_elapsed / avg_tasks_per_worker;
    printf(
        "%40s: %7.0f x (%3.0f%%) %2zu clusters, %4.1f workers @ "
        "%5.1f tasks (%5u-%5u), "
        "%5.0f us wait, %6.1E us run (task len %6.1E us), total %6.2f s\n",
        caller, static_cast<double>(calls_), pc_root, active_clusters,
        avg_workers, avg_tasks_per_worker, static_cast<uint32_t>(min_tasks_),
        static_cast<uint32_t>(max_tasks_), avg_wait_before * 1E6,
        avg_elapsed * 1E6, task_len * 1E6, elapsed);
    *this = CallerAccumulator();
  }

  // For the grand total, only print calls and elapsed because averaging the
  // the other stats is not very useful. No need to reset because this is called
  // on a temporary.
  void PrintTotal() {
    if (!Any()) return;
    HWY_ASSERT(root_ <= calls_);
    const double elapsed =
        static_cast<double>(elapsed_) / platform::InvariantTicksPerSecond();
    printf("TOTAL: %7.0f x run %6.2f s\n", static_cast<double>(calls_),
           elapsed);
  }

 private:
  int64_t calls_ = 0;
  int64_t root_ = 0;
  uint64_t workers_ = 0;
  uint64_t min_tasks_ = ~uint64_t{0};
  uint64_t max_tasks_ = 0;
  uint64_t tasks_ = 0;
  // both are wall time for root Run, otherwise CPU time.
  timer::Ticks wait_before_ = 0;
  timer::Ticks elapsed_ = 0;
};
static_assert(sizeof(CallerAccumulator) == 64, "");
    625 
    626 class PerCluster {
    627 public:
    628  CallerAccumulator& Get(size_t caller_idx) {
    629    HWY_DASSERT(caller_idx < kMaxCallers);
    630    callers_.Set(caller_idx);
    631    return accumulators_[caller_idx];
    632  }
    633 
    634  template <class Func>
    635  void ForeachCaller(Func&& func) {
    636    callers_.Foreach([&](size_t caller_idx) {
    637      func(caller_idx, accumulators_[caller_idx]);
    638    });
    639  }
    640 
    641  // Returns indices (required for `StringTable::Name`) in descending order of
    642  // elapsed time.
    643  std::vector<size_t> Sorted() {
    644    std::vector<size_t> vec;
    645    vec.reserve(kMaxCallers);
    646    ForeachCaller([&](size_t caller_idx, CallerAccumulator&) {
    647      vec.push_back(caller_idx);
    648    });
    649    std::sort(vec.begin(), vec.end(), [&](size_t a, size_t b) {
    650      return accumulators_[a] > accumulators_[b];
    651    });
    652    return vec;
    653  }
    654 
    655  // Caller takes care of resetting `accumulators_`.
    656  void ResetBits() { callers_ = hwy::BitSet<kMaxCallers>(); }
    657 
    658 private:
    659  CallerAccumulator accumulators_[kMaxCallers];
    660  hwy::BitSet<kMaxCallers> callers_;
    661 };
    662 
    663 // Type-safe wrapper.
    664 class Caller {
    665 public:
    666  Caller() : idx_(0) {}  // `AddCaller` never returns 0.
    667  explicit Caller(size_t idx) : idx_(idx) { HWY_DASSERT(idx < kMaxCallers); }
    668  size_t Idx() const { return idx_; }
    669 
    670 private:
    671  size_t idx_;
    672 };
    673 
// Singleton, shared by all ThreadPool.
class Shared {
 public:
  static HWY_DLLEXPORT Shared& Get();  // Thread-safe.

  Stopwatch MakeStopwatch() const { return Stopwatch(timer_); }
  // Mutable stopwatch shared across pools; written/read by callers.
  Stopwatch& LastRootEnd() { return last_root_end_; }

  // Thread-safe. Calls with the same `name` return the same `Caller`.
  Caller AddCaller(const char* name) { return Caller(callers_.Add(name)); }

  // Returns the per-cluster accumulator set for `cluster_idx`.
  PerCluster& Cluster(size_t cluster_idx) {
    HWY_DASSERT(cluster_idx < kMaxClusters);
    return per_cluster_[cluster_idx];
  }

  // Called from the main thread via `Profiler::PrintResults`.
  void PrintAndReset() {
    // Start counting pools (= one per cluster) invoked by each caller.
    size_t active_clusters[kMaxCallers] = {};
    per_cluster_[0].ForeachCaller(
        [&](size_t caller_idx, CallerAccumulator& acc) {
          active_clusters[caller_idx] = acc.Any();
        });
    // Reduce per-cluster accumulators into the first cluster.
    for (size_t cluster_idx = 1; cluster_idx < kMaxClusters; ++cluster_idx) {
      per_cluster_[cluster_idx].ForeachCaller(
          [&](size_t caller_idx, CallerAccumulator& acc) {
            active_clusters[caller_idx] += acc.Any();
            per_cluster_[0].Get(caller_idx).AddFrom(acc);
            acc = CallerAccumulator();  // reset; ResetBits clears the bitset.
          });
      per_cluster_[cluster_idx].ResetBits();
    }

    // Print callers in descending order of elapsed time, then a grand total.
    CallerAccumulator total;
    for (size_t caller_idx : per_cluster_[0].Sorted()) {
      CallerAccumulator& acc = per_cluster_[0].Get(caller_idx);
      total.AddFrom(acc);  // must be before PrintAndReset.
      acc.PrintAndReset(callers_.Name(caller_idx), active_clusters[caller_idx]);
    }
    total.PrintTotal();
    per_cluster_[0].ResetBits();
  }

 private:
  Shared()  // called via Get().
      : last_root_end_(timer_),
        send_config(callers_.Add("SendConfig")),
        dtor(callers_.Add("PoolDtor")),
        print_stats(callers_.Add("PrintStats")) {
    // Register so the profiler prints our stats alongside its own.
    Profiler::Get().AddFunc(this, [this]() { PrintAndReset(); });
    // Can skip `RemoveFunc` because the singleton never dies.
  }

  const Timer timer_;
  Stopwatch last_root_end_;

  PerCluster per_cluster_[kMaxClusters];
  StringTable<kMaxCallers> callers_;

 public:
  // Returned from `callers_.Add`:
  Caller send_config;
  Caller dtor;
  Caller print_stats;
};
    741 
    742 #else
    743 
// No-op stand-in used when the profiler is disabled (this is the #else branch
// of PROFILER_ENABLED): all notification hooks compile to nothing, so call
// sites need no #if guards of their own.
struct Stats {
  void NotifyRunStatic(size_t, timer::Ticks) {}
  void NotifyRunDynamic(size_t, size_t, size_t, timer::Ticks) {}
  void NotifyThreadRun(size_t, timer::Ticks, size_t, timer::Ticks,
                       timer::Ticks) {}
  void NotifyMainRun(size_t, timer::Ticks, timer::Ticks, timer::Ticks,
                     timer::Ticks) {}
  void PrintAndReset(size_t, timer::Ticks) {}
  void Reset(size_t = kMaxThreads) {}
};
    754 
// Empty stand-in for the profiler-enabled `Caller` (a string-table handle
// returned by `callers_.Add`); lets call sites pass callers unconditionally
// when the profiler is disabled.
struct Caller {};
    756 
// Minimal profiler-disabled variant of `Shared`: keeps only the timer needed
// by `MakeStopwatch` plus no-op `Caller` members, so its interface matches the
// PROFILER_ENABLED variant and call sites compile unchanged.
class Shared {
 public:
  static HWY_DLLEXPORT Shared& Get();  // Thread-safe.

  // Stopwatches all share the singleton's timer.
  Stopwatch MakeStopwatch() const { return Stopwatch(timer_); }

  // No string table when the profiler is disabled; returns an empty token.
  Caller AddCaller(const char*) { return Caller(); }

 private:
  Shared() {}  // only constructed via Get().

  const Timer timer_;

 public:
  // Mirrors the PROFILER_ENABLED members referenced by ThreadPool:
  Caller send_config;
  Caller dtor;
  Caller print_stats;
};
    775 
    776 #endif  // PROFILER_ENABLED
    777 
// Per-worker state used by both main and worker threads. `ThreadFunc`
// (threads) and `ThreadPool` (main) have a few additional members of their own.
class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
  static constexpr size_t kMaxVictims = 4;

  // Shorthand for the only memory orders used by this class; see the
  // "Threads wait" comment block below for why RMW is mostly avoided.
  static constexpr auto kAcq = std::memory_order_acquire;
  static constexpr auto kRel = std::memory_order_release;

  // True if this worker is responsible for Reserve/FreeWorker of its
  // `global_idx_` profiler slot; prevents double reservation when pools nest.
  bool OwnsGlobalIdx() const {
#if PROFILER_ENABLED
    if (global_idx_ >= profiler::kMaxWorkers) {
      HWY_WARN("Windows-only bug? global_idx %zu >= %zu.", global_idx_,
               profiler::kMaxWorkers);
    }
#endif  // PROFILER_ENABLED
    // Across-cluster pool owns all except the main thread, which is reserved by
    // profiler.cc.
    if (cluster_idx_ == kAllClusters) return global_idx_ != 0;
    // Within-cluster pool owns all except *its* main thread, because that is
    // owned by the across-cluster pool.
    return worker_ != 0;
  }

 public:
  // `worker` is this worker's index in [0, num_threads]; 0 is the main thread.
  Worker(const size_t worker, const size_t num_threads,
         const PoolWorkerMapping mapping, const Divisor64& div_workers,
         const Stopwatch& stopwatch)
      : workers_(this - worker),  // element 0 of the contiguous Worker array
        worker_(worker),
        num_threads_(num_threads),
        stopwatch_(stopwatch),
        // If `num_threads == 0`, we might be in an inner pool and must use
        // the `global_idx` we are currently running on.
        global_idx_(num_threads == 0 ? Profiler::GlobalIdx() : mapping(worker)),
        cluster_idx_(mapping.ClusterIdx()) {
    HWY_DASSERT(IsAligned(this, HWY_ALIGNMENT));
    HWY_DASSERT(worker <= num_threads);
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    num_victims_ = static_cast<uint32_t>(HWY_MIN(kMaxVictims, num_workers));

    // Increase gap between coprimes to reduce collisions.
    const uint32_t coprime = ShuffledIota::FindAnotherCoprime(
        static_cast<uint32_t>(num_workers),
        static_cast<uint32_t>((worker + 1) * 257 + worker * 13));
    const ShuffledIota shuffled_iota(coprime);

    // To simplify `WorkerRun`, this worker is the first to 'steal' from.
    victims_[0] = static_cast<uint32_t>(worker);
    for (uint32_t i = 1; i < num_victims_; ++i) {
      victims_[i] = shuffled_iota.Next(victims_[i - 1], div_workers);
      HWY_DASSERT(victims_[i] != worker);
    }

    HWY_IF_CONSTEXPR(PROFILER_ENABLED) {
      if (HWY_LIKELY(OwnsGlobalIdx())) {
        Profiler::Get().ReserveWorker(global_idx_);
      }
    }
  }

  ~Worker() {
    // Release the profiler slot reserved in the ctor, if we own it.
    HWY_IF_CONSTEXPR(PROFILER_ENABLED) {
      if (HWY_LIKELY(OwnsGlobalIdx())) {
        Profiler::Get().FreeWorker(global_idx_);
      }
    }
  }

  // Placement-newed by `WorkerLifecycle`, we do not expect any copying.
  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;

  size_t Index() const { return worker_; }
  // For work stealing.
  Worker* AllWorkers() { return workers_; }
  const Worker* AllWorkers() const { return workers_; }
  size_t NumThreads() const { return num_threads_; }

  size_t GlobalIdx() const { return global_idx_; }
  size_t ClusterIdx() const { return cluster_idx_; }

  void SetStartTime() { stopwatch_.Reset(); }
  timer::Ticks ElapsedTime() { return stopwatch_.Elapsed(); }

  // ------------------------ Per-worker storage for `SendConfig`

  Config NextConfig() const { return next_config_; }
  // Called during `SendConfig` by workers and now also the main thread. This
  // avoids a separate `ThreadPool` member which risks going out of sync.
  void SetNextConfig(Config copy) { next_config_ = copy; }

  Exit GetExit() const { return exit_; }
  void SetExit(Exit exit) { exit_ = exit; }

  uint32_t WorkerEpoch() const { return worker_epoch_; }
  uint32_t AdvanceWorkerEpoch() { return ++worker_epoch_; }

  // ------------------------ Task assignment

  // Called from the main thread.
  void SetRange(const uint64_t begin, const uint64_t end) {
    my_begin_.store(begin, kRel);
    my_end_.store(end, kRel);
  }

  uint64_t MyEnd() const { return my_end_.load(kAcq); }

  // Randomized victim list; victims_[0] is this worker itself.
  Span<const uint32_t> Victims() const {
    return hwy::Span<const uint32_t>(victims_.data(),
                                     static_cast<size_t>(num_victims_));
  }

  // Returns the next task to execute. If >= MyEnd(), it must be skipped.
  uint64_t WorkerReserveTask() {
    // TODO(janwas): replace with cooperative work-stealing.
    return my_begin_.fetch_add(1, std::memory_order_relaxed);
  }

  // ------------------------ Waiter: Threads wait for tasks

  // WARNING: some `Wait*` do not set this for all Worker instances. For
  // example, `WaitType::kBlock` only uses the first worker's `Waiter` because
  // one futex can wake multiple waiters. Hence we never load this directly
  // without going through `Wait*` policy classes, and must ensure all threads
  // use the same wait mode.

  const std::atomic<uint32_t>& Waiter() const { return wait_epoch_; }
  std::atomic<uint32_t>& MutableWaiter() { return wait_epoch_; }  // futex
  void StoreWaiter(uint32_t epoch) { wait_epoch_.store(epoch, kRel); }

  // ------------------------ Barrier: Main thread waits for workers

  // For use by `HasReached` and `UntilReached`.
  const std::atomic<uint32_t>& Barrier() const { return barrier_epoch_; }
  // Setting to `epoch` signals that the worker has reached the barrier.
  void StoreBarrier(uint32_t epoch) { barrier_epoch_.store(epoch, kRel); }

 private:
  // Set by `SetRange`:
  std::atomic<uint64_t> my_begin_;
  std::atomic<uint64_t> my_end_;

  Worker* const workers_;  // start of the contiguous Worker array
  const size_t worker_;
  const size_t num_threads_;

  Stopwatch stopwatch_;  // Reset by `SetStartTime`.
  const size_t global_idx_;
  const size_t cluster_idx_;

  // Use u32 to match futex.h. These must start at the initial value of
  // `worker_epoch_`.
  std::atomic<uint32_t> wait_epoch_{1};
  std::atomic<uint32_t> barrier_epoch_{1};

  uint32_t num_victims_;  // <= kMaxVictims
  std::array<uint32_t, kMaxVictims> victims_;

  // Written and read by the same thread, hence not atomic.
  Config next_config_;
  Exit exit_ = Exit::kNone;
  // thread_pool_test requires nonzero epoch.
  uint32_t worker_epoch_ = 1;

  // Pads the class to exactly HWY_ALIGNMENT bytes (see static_assert below).
  HWY_MEMBER_VAR_MAYBE_UNUSED uint8_t
      padding_[HWY_ALIGNMENT - 56 - 6 * sizeof(void*) - sizeof(victims_)];
};
    945 static_assert(sizeof(Worker) == HWY_ALIGNMENT, "");
    946 
    947 // Creates/destroys `Worker` using preallocated storage. See comment at
    948 // `ThreadPool::worker_bytes_` for why we do not dynamically allocate.
    949 class WorkerLifecycle {  // 0 bytes
    950 public:
    951  // Placement new for `Worker` into `storage` because its ctor requires
    952  // the worker index. Returns array of all workers.
    953  static Worker* Init(uint8_t* storage, size_t num_threads,
    954                      PoolWorkerMapping mapping, const Divisor64& div_workers,
    955                      Shared& shared) {
    956    Worker* workers = new (storage)
    957        Worker(0, num_threads, mapping, div_workers, shared.MakeStopwatch());
    958    for (size_t worker = 1; worker <= num_threads; ++worker) {
    959      new (Addr(storage, worker)) Worker(worker, num_threads, mapping,
    960                                         div_workers, shared.MakeStopwatch());
    961      // Ensure pointer arithmetic is the same (will be used in Destroy).
    962      HWY_DASSERT(reinterpret_cast<uintptr_t>(workers + worker) ==
    963                  reinterpret_cast<uintptr_t>(Addr(storage, worker)));
    964    }
    965 
    966    // Publish non-atomic stores in `workers`.
    967    std::atomic_thread_fence(std::memory_order_release);
    968 
    969    return workers;
    970  }
    971 
    972  static void Destroy(Worker* workers, size_t num_threads) {
    973    for (size_t worker = 0; worker <= num_threads; ++worker) {
    974      workers[worker].~Worker();
    975    }
    976  }
    977 
    978 private:
    979  static uint8_t* Addr(uint8_t* storage, size_t worker) {
    980    return storage + worker * sizeof(Worker);
    981  }
    982 };
    983 
// Stores arguments to `Run`: the function and range of task indices. Set by
// the main thread, read by workers including the main thread.
class Tasks {
  static constexpr auto kAcq = std::memory_order_acquire;

  // Signature of the (internal) function called from workers(s) for each
  // `task` in the [`begin`, `end`) passed to Run(). Closures (lambdas) do not
  // receive the first argument, which points to the lambda object.
  typedef void (*RunFunc)(const void* opaque, uint64_t task, size_t worker);

 public:
  Tasks() { HWY_DASSERT(IsAligned(this, 8)); }

  // Publishes the task range and type-erased closure for workers to read.
  // NOTE(review): `closure` is captured by address; the caller must keep it
  // alive until workers are done reading/calling it — confirm at call sites.
  template <class Closure>
  void Set(uint64_t begin, uint64_t end, const Closure& closure) {
    constexpr auto kRel = std::memory_order_release;
    // `TestTasks` and `SetWaitMode` call this with `begin == end`.
    HWY_DASSERT(begin <= end);
    begin_.store(begin, kRel);
    end_.store(end, kRel);
    func_.store(static_cast<RunFunc>(&CallClosure<Closure>), kRel);
    opaque_.store(reinterpret_cast<const void*>(&closure), kRel);
  }

  // Assigns workers their share of `[begin, end)`. Called from the main
  // thread; workers are initializing or waiting for a command.
  // Negligible CPU time.
  static void DivideRangeAmongWorkers(const uint64_t begin, const uint64_t end,
                                      const Divisor64& div_workers,
                                      Worker* workers) {
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    HWY_DASSERT(num_workers > 1);  // Else Run() runs on the main thread.
    HWY_DASSERT(begin <= end);
    const size_t num_tasks = static_cast<size_t>(end - begin);

    // Assigning all remainders to the last worker causes imbalance. We instead
    // give one more to each worker whose index is less. This may be zero when
    // called from `TestTasks`.
    const size_t min_tasks = static_cast<size_t>(div_workers.Divide(num_tasks));
    const size_t remainder =
        static_cast<size_t>(div_workers.Remainder(num_tasks));

    uint64_t my_begin = begin;
    for (size_t worker = 0; worker < num_workers; ++worker) {
      const uint64_t my_end = my_begin + min_tasks + (worker < remainder);
      workers[worker].SetRange(my_begin, my_end);
      my_begin = my_end;
    }
    HWY_DASSERT(my_begin == end);  // all tasks assigned, none dropped
  }

  // Runs the worker's assigned range of tasks, plus work stealing if needed.
  void WorkerRun(Worker* worker, const Shared& shared, Stats& stats) const {
    // Stealing only pays off with more than one task per worker; otherwise
    // each worker runs at most one task and the static path suffices.
    if (NumTasks() > worker->NumThreads() + 1) {
      WorkerRunDynamic(worker, shared, stats);
    } else {
      WorkerRunStatic(worker, shared, stats);
    }
  }

 private:
  // Special case for <= 1 task per worker, where stealing is unnecessary.
  void WorkerRunStatic(Worker* worker, const Shared& shared,
                       Stats& stats) const {
    const uint64_t begin = begin_.load(kAcq);
    const uint64_t end = end_.load(kAcq);
    HWY_DASSERT(begin <= end);
    const size_t index = worker->Index();

    // Each worker runs the single task matching its own index.
    const uint64_t task = begin + index;
    // We might still have more workers than tasks, so check first.
    if (HWY_LIKELY(task < end)) {
      const void* opaque = Opaque();
      const RunFunc func = Func();
      Stopwatch stopwatch = shared.MakeStopwatch();
      func(opaque, task, index);
      stats.NotifyRunStatic(index, stopwatch.Elapsed());
    }
  }

  // Must be called for each `worker` in [0, num_workers).
  //
  // A prior version of this code attempted to assign only as much work as a
  // worker will actually use. As with OpenMP's 'guided' strategy, we assigned
  // remaining/(k*num_threads) in each iteration. Although the worst-case
  // imbalance is bounded, this required several rounds of work allocation, and
  // the atomic counter did not scale to > 30 threads.
  //
  // We now use work stealing instead, where already-finished workers look for
  // and perform work from others, as if they were that worker. This deals with
  // imbalances as they arise, but care is required to reduce contention. We
  // randomize the order in which threads choose victims to steal from.
  void WorkerRunDynamic(Worker* worker, const Shared& shared,
                        Stats& stats) const {
    Worker* workers = worker->AllWorkers();
    const size_t index = worker->Index();
    const RunFunc func = Func();
    const void* opaque = Opaque();

    size_t sum_tasks = 0;         // tasks executed by this worker
    size_t sum_stolen = 0;        // subset of sum_tasks taken from others
    timer::Ticks sum_d_func = 0;  // total time spent inside `func`
    // For each worker in random order, starting with our own, attempt to do
    // all their work.
    for (uint32_t victim : worker->Victims()) {
      Worker* other_worker = workers + victim;

      // Until all of other_worker's work is done:
      const uint64_t other_end = other_worker->MyEnd();
      for (;;) {
        // The worker that first sets `task` to `other_end` exits this loop.
        // After that, `task` can be incremented up to `num_workers - 1` times,
        // once per other worker.
        const uint64_t task = other_worker->WorkerReserveTask();
        if (HWY_UNLIKELY(task >= other_end)) {
          hwy::Pause();  // Reduce coherency traffic while stealing.
          break;
        }
        Stopwatch stopwatch = shared.MakeStopwatch();
        // Pass the index we are actually running on; this is important
        // because it is the TLS index for user code.
        func(opaque, task, index);
        sum_tasks++;
        sum_stolen += worker != other_worker;
        sum_d_func += stopwatch.Elapsed();
      }
    }
    stats.NotifyRunDynamic(index, sum_tasks, sum_stolen, sum_d_func);
  }

  // Number of tasks in the currently published `[begin, end)` range.
  size_t NumTasks() const {
    return static_cast<size_t>(end_.load(kAcq) - begin_.load(kAcq));
  }

  const void* Opaque() const { return opaque_.load(kAcq); }
  RunFunc Func() const { return func_.load(kAcq); }

  // Calls closure(task, worker). Signature must match `RunFunc`.
  template <class Closure>
  static void CallClosure(const void* opaque, uint64_t task, size_t worker) {
    (*reinterpret_cast<const Closure*>(opaque))(task, worker);
  }

  std::atomic<uint64_t> begin_;
  std::atomic<uint64_t> end_;
  std::atomic<RunFunc> func_;
  std::atomic<const void*> opaque_;
};
   1132 static_assert(sizeof(Tasks) == 16 + 2 * sizeof(void*), "");
   1133 
   1134 // ------------------------------ Threads wait, main wakes them
   1135 
   1136 // Considerations:
   1137 // - uint32_t storage per `Worker` so we can use `futex.h`.
   1138 // - avoid atomic read-modify-write. These are implemented on x86 using a LOCK
   1139 //   prefix, which interferes with other cores' cache-coherency transactions
   1140 //   and drains our core's store buffer. We use only store-release and
   1141 //   load-acquire. Although expressed using `std::atomic`, these are normal
   1142 //   loads/stores in the strong x86 memory model.
   1143 // - prefer to avoid resetting the state. "Sense-reversing" (flipping a flag)
//   would work, but we prefer an 'epoch' counter because it is more useful
   1145 //   and easier to understand/debug, and as fast.
   1146 
   1147 // Both the main thread and each worker maintain their own counter, which are
   1148 // implicitly synchronized by the barrier. To wake, the main thread does a
   1149 // store-release, and each worker does a load-acquire. The policy classes differ
   1150 // in whether they block or spin (with pause/monitor to reduce power), and
   1151 // whether workers check their own counter or a shared one.
   1152 //
   1153 // All methods are const because they only use storage in `Worker`, and we
   1154 // prefer to pass const-references to empty classes to enable type deduction.
   1155 
   1156 // Futex: blocking reduces apparent CPU usage, but has higher wake latency.
   1157 struct WaitBlock {
   1158  // Wakes all workers by storing the current `epoch`.
   1159  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
   1160    HWY_DASSERT(epoch != 0);
   1161    workers[1].StoreWaiter(epoch);
   1162    WakeAll(workers[1].MutableWaiter());  // futex: expensive syscall
   1163  }
   1164 
   1165  // Waits until `WakeWorkers(_, epoch)` has been called.
   1166  template <class Spin>
   1167  size_t UntilWoken(const Worker& worker, const Spin& /*spin*/) const {
   1168    HWY_DASSERT(worker.Index() != 0);  // main is 0
   1169    const uint32_t epoch = worker.WorkerEpoch();
   1170    const Worker* workers = worker.AllWorkers();
   1171    BlockUntilDifferent(epoch - 1, workers[1].Waiter());
   1172    return 1;  // iterations
   1173  }
   1174 };
   1175 
   1176 // Single u32: single store by the main thread. All worker threads poll this
   1177 // one cache line and thus have it in a shared state, which means the store
   1178 // will invalidate each of them, leading to more transactions than SpinSeparate.
   1179 struct WaitSpin1 {
   1180  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
   1181    workers[1].StoreWaiter(epoch);
   1182  }
   1183 
   1184  // Returns the number of spin-wait iterations.
   1185  template <class Spin>
   1186  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
   1187    HWY_DASSERT(worker.Index() != 0);  // main is 0
   1188    const Worker* workers = worker.AllWorkers();
   1189    const uint32_t epoch = worker.WorkerEpoch();
   1190    return spin.UntilEqual(epoch, workers[1].Waiter());
   1191  }
   1192 };
   1193 
   1194 // Separate u32 per thread: more stores for the main thread, but each worker
   1195 // only polls its own cache line, leading to fewer cache-coherency transactions.
   1196 struct WaitSpinSeparate {
   1197  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
   1198    for (size_t thread = 0; thread < workers->NumThreads(); ++thread) {
   1199      workers[1 + thread].StoreWaiter(epoch);
   1200    }
   1201  }
   1202 
   1203  template <class Spin>
   1204  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
   1205    HWY_DASSERT(worker.Index() != 0);  // main is 0
   1206    const uint32_t epoch = worker.WorkerEpoch();
   1207    return spin.UntilEqual(epoch, worker.Waiter());
   1208  }
   1209 };
   1210 
// Calls unrolled code selected by all config enums: dispatches on the wait
// type, then (for spin waits) on the spin type via `CallWithSpin`, so `func`
// is instantiated once per (spin, wait) combination.
template <class Func, typename... Args>
HWY_INLINE void CallWithConfig(const Config& config, Func&& func,
                               Args&&... args) {
  switch (config.wait_type) {
    case WaitType::kBlock:
      // `WaitBlock::UntilWoken` ignores its spin argument, so the spin type
      // is irrelevant here; pass `SpinPause` directly.
      return func(SpinPause(), WaitBlock(), std::forward<Args>(args)...);
    case WaitType::kSpin1:
      return CallWithSpin(config.spin_type, func, WaitSpin1(),
                          std::forward<Args>(args)...);
    case WaitType::kSpinSeparate:
      return CallWithSpin(config.spin_type, func, WaitSpinSeparate(),
                          std::forward<Args>(args)...);
    case WaitType::kSentinel:
      HWY_UNREACHABLE;
  }
}
   1228 
   1229 // ------------------------------ Barrier: Main thread waits for workers
   1230 
   1231 // Similar to `WaitSpinSeparate`, a store-release of the same local epoch
   1232 // counter serves as a "have arrived" flag that does not require resetting.
   1233 class Barrier {
   1234 public:
   1235  void WorkerReached(Worker& worker, uint32_t epoch) const {
   1236    HWY_DASSERT(worker.Index() != 0);  // main is 0
   1237    worker.StoreBarrier(epoch);
   1238  }
   1239 
   1240  // Returns true if `worker` (can be the main thread) reached the barrier.
   1241  bool HasReached(const Worker* worker, uint32_t epoch) const {
   1242    const uint32_t barrier = worker->Barrier().load(std::memory_order_acquire);
   1243    HWY_DASSERT(barrier <= epoch);
   1244    return barrier == epoch;
   1245  }
   1246 
   1247  // Main thread loops over each worker. A "group of 2 or 4" barrier was not
   1248  // competitive on Skylake, Granite Rapids and Zen5.
   1249  template <class Spin>
   1250  void UntilReached(size_t num_threads, Worker* workers, const Spin& spin,
   1251                    uint32_t epoch) const {
   1252    workers[0].StoreBarrier(epoch);  // for main thread HasReached.
   1253 
   1254    for (size_t i = 0; i < num_threads; ++i) {
   1255      // TODO: log number of spin-wait iterations.
   1256      (void)spin.UntilEqual(epoch, workers[1 + i].Barrier());
   1257    }
   1258  }
   1259 };
   1260 
// In debug builds, detects when functions are re-entered.
class BusyFlag {
 public:
  // Asserts the flag was not already set. NOTE: `test_and_set` runs only when
  // HWY_DASSERT evaluates its argument (presumably debug builds only — this
  // relies on the assert's side effect; both Set and Clear are then no-ops in
  // release builds, which keeps them consistent).
  void Set() { HWY_DASSERT(!busy_.test_and_set()); }
  // Debug-only reset so that the next `Set` succeeds.
  void Clear() { HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) busy_.clear(); }

 private:
  std::atomic_flag busy_ = ATOMIC_FLAG_INIT;
};
   1270 
   1271 }  // namespace pool
   1272 
   1273 // Highly efficient parallel-for, intended for workloads with thousands of
   1274 // fork-join regions which consist of calling tasks[t](i) for a few hundred i,
   1275 // using dozens of threads.
   1276 //
   1277 // To reduce scheduling overhead, we assume that tasks are statically known and
   1278 // that threads do not schedule new work themselves. This allows us to avoid
   1279 // queues and only store a counter plus the current task. The latter is a
   1280 // pointer to a lambda function, without the allocation/indirection required for
   1281 // `std::function`.
   1282 //
   1283 // To reduce fork/join latency, we choose an efficient barrier, optionally
   1284 // enable spin-waits via `SetWaitMode`, and avoid any mutex/lock. We largely
   1285 // even avoid atomic RMW operations (LOCK prefix): currently for the wait and
   1286 // barrier, in future hopefully also for work stealing.
   1287 //
   1288 // To eliminate false sharing and enable reasoning about cache line traffic, the
   1289 // class is aligned and holds all worker state.
   1290 //
   1291 // For load-balancing, we use work stealing in random order.
   1292 class alignas(HWY_ALIGNMENT) ThreadPool {
   1293  // Used to initialize `num_threads_` from the ctor argument.
   1294  static size_t ClampedNumThreads(size_t num_threads) {
   1295    // Upper bound is required for `worker_bytes_`.
   1296    if (HWY_UNLIKELY(num_threads > pool::kMaxThreads)) {
   1297      HWY_WARN("ThreadPool: clamping num_threads %zu to %zu.", num_threads,
   1298               pool::kMaxThreads);
   1299      num_threads = pool::kMaxThreads;
   1300    }
   1301    return num_threads;
   1302  }
   1303 
   1304 public:
   1305  // This typically includes hyperthreads, hence it is a loose upper bound.
   1306  // -1 because these are in addition to the main thread.
   1307  static size_t MaxThreads() {
   1308    LogicalProcessorSet lps;
   1309    // This is OS dependent, but more accurate if available because it takes
   1310    // into account restrictions set by cgroups or numactl/taskset.
   1311    if (GetThreadAffinity(lps)) {
   1312      return lps.Count() - 1;
   1313    }
   1314    return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
   1315  }
   1316 
  // `num_threads` is the number of *additional* threads to spawn, which should
  // not exceed `MaxThreads()`. Note that the main thread also performs work.
  // `mapping` indicates how to map local worker_idx to global.
  ThreadPool(size_t num_threads,
             PoolWorkerMapping mapping = PoolWorkerMapping())
      : num_threads_(ClampedNumThreads(num_threads)),
        div_workers_(1 + num_threads_),  // workers = threads + main thread
        shared_(pool::Shared::Get()),  // on first call, calls ReserveWorker(0)!
        workers_(pool::WorkerLifecycle::Init(worker_bytes_, num_threads_,
                                             mapping, div_workers_, shared_)) {
    // Leaves the default wait mode as `kBlock`, which means futex, because
    // spinning only makes sense when threads are pinned and wake latency is
    // important, so it must explicitly be requested by calling `SetWaitMode`.
    for (PoolWaitMode mode : {PoolWaitMode::kSpin, PoolWaitMode::kBlock}) {
      wait_mode_ = mode;  // for AutoTuner
      AutoTuner().SetCandidates(
          pool::Config::AllCandidates(mode));
    }

    // Skip empty pools because they do not update stats anyway.
    if (num_threads_ > 0) {
      Profiler::Get().AddFunc(this, [this]() { PrintStats(); });
    }

    // Worker 0 is the main thread; spawned threads use workers_[1..].
    threads_.reserve(num_threads_);
    for (size_t thread = 0; thread < num_threads_; ++thread) {
      threads_.emplace_back(
          ThreadFunc(workers_[1 + thread], tasks_, shared_, stats_));
    }

    // Threads' `Config` defaults to spinning. Change to `kBlock` (see above).
    // This also ensures all threads have started before we return, so that
    // startup latency is billed to the ctor, not the first `Run`.
    SendConfig(AutoTuner().Candidates()[0]);
  }
   1352 
  // If we created threads, waits for them all to exit.
  ~ThreadPool() {
    // There is no portable way to request threads to exit like `ExitThread` on
    // Windows, otherwise we could call that from `Run`. Instead, we must cause
    // the thread to wake up and exit. We can just use `Run`.
    (void)RunWithoutAutotune(
        0, NumWorkers(), shared_.dtor,
        [this](HWY_MAYBE_UNUSED uint64_t task, size_t worker) {
          HWY_DASSERT(task == worker);
          workers_[worker].SetExit(Exit::kThread);
        });

    // Wait for all threads to observe the exit flag and finish.
    for (std::thread& thread : threads_) {
      HWY_DASSERT(thread.joinable());
      thread.join();
    }

    // Mirrors the ctor's `AddFunc`, which is also skipped for empty pools.
    if (num_threads_ > 0) {
      Profiler::Get().RemoveFunc(this);
    }

    pool::WorkerLifecycle::Destroy(workers_, num_threads_);
  }
   1376 
   1377  ThreadPool(const ThreadPool&) = delete;
   1378  ThreadPool& operator&(const ThreadPool&) = delete;
   1379 
   1380  // Returns number of Worker, i.e., one more than the largest `worker`
   1381  // argument. Useful for callers that want to allocate thread-local storage.
   1382  size_t NumWorkers() const {
   1383    return static_cast<size_t>(div_workers_.GetDivisor());
   1384  }
   1385 
  // `mode` defaults to `kBlock`, which means futex. Switching to `kSpin`
  // reduces fork-join overhead especially when there are many calls to `Run`,
  // but wastes power when waiting over long intervals. Must not be called
  // concurrently with any `Run`, because this uses the same waiter/barrier.
  void SetWaitMode(PoolWaitMode mode) {
    wait_mode_ = mode;  // selects which AutoTuner is used below
    // Use the tuned best config if tuning finished, else keep tuning.
    SendConfig(AutoTuneComplete() ? *AutoTuner().Best()
                                  : AutoTuner().NextConfig());
  }
   1395 
  // For printing which config is in use; reads worker 0's copy, stored via
  // `Worker::SetNextConfig` during `SendConfig`.
  pool::Config config() const { return workers_[0].NextConfig(); }
   1398 
  // True once autotuning has settled on a best config.
  bool AutoTuneComplete() const { return AutoTuner().Best(); }
  // Per-candidate cost measurements gathered so far; for diagnostics.
  Span<CostDistribution> AutoTuneCosts() { return AutoTuner().Costs(); }
   1401 
  // Registers `name` for per-caller statistics; static because it forwards to
  // the process-wide `Shared` singleton rather than this pool instance.
  static pool::Caller AddCaller(const char* name) {
    return pool::Shared::Get().AddCaller(name);
  }
   1405 
  // parallel-for: Runs `closure(task, worker)` on workers for every `task` in
  // `[begin, end)`. Note that the unit of work should be large enough to
  // amortize the function call overhead, but small enough that each worker
  // processes a few tasks. Thus each `task` is usually a loop.
  //
  // Not thread-safe - concurrent parallel-for in the same `ThreadPool` are
  // forbidden unless `NumWorkers() == 1` or `end <= begin + 1`.
  template <class Closure>
  void Run(uint64_t begin, uint64_t end, pool::Caller caller,
           const Closure& closure) {
    AutoTuneT& auto_tuner = AutoTuner();
    // Already finished tuning: run without time measurement.
    if (HWY_LIKELY(auto_tuner.Best())) {
      // Don't care whether threads ran, we are done either way.
      (void)RunWithoutAutotune(begin, end, caller, closure);
      return;
    }

    // Not yet finished: measure time and notify autotuner.
    Stopwatch stopwatch(shared_.MakeStopwatch());
    // Skip update if threads didn't actually run.
    if (!RunWithoutAutotune(begin, end, caller, closure)) return;
    auto_tuner.NotifyCost(stopwatch.Elapsed());

    pool::Config next = auto_tuner.NextConfig();  // may be overwritten below
    if (auto_tuner.Best()) {  // just finished
      next = *auto_tuner.Best();
      // Optionally report the winning config and its margin over the rest.
      HWY_IF_CONSTEXPR(pool::kVerbosity >= 1) {
        const size_t idx_best = static_cast<size_t>(
            auto_tuner.Best() - auto_tuner.Candidates().data());
        HWY_DASSERT(idx_best < auto_tuner.Costs().size());
        auto& AT = auto_tuner.Costs()[idx_best];
        const double best_cost = AT.EstimateCost();
        HWY_DASSERT(best_cost > 0.0);  // will divide by this below

        // Ratios of each other candidate's cost to the best ('gain').
        Stats s_ratio;
        for (size_t i = 0; i < auto_tuner.Costs().size(); ++i) {
          if (i == idx_best) continue;
          const double cost = auto_tuner.Costs()[i].EstimateCost();
          s_ratio.Notify(static_cast<float>(cost / best_cost));
        }

        fprintf(stderr,
                "Pool %3zu: %s %8.0f +/- %6.0f. Gain %.2fx [%.2fx, %.2fx]\n",
                NumWorkers(), auto_tuner.Best()->ToString().c_str(), best_cost,
                AT.Stddev(), s_ratio.GeometricMean(),
                static_cast<double>(s_ratio.Min()),
                static_cast<double>(s_ratio.Max()));
      }
    }
    SendConfig(next);
  }
   1458 
   1459  // Backward-compatible version without Caller.
   1460  template <class Closure>
   1461  void Run(uint64_t begin, uint64_t end, const Closure& closure) {
   1462    Run(begin, end, pool::Caller(), closure);
   1463  }
   1464 
private:
 // Functor invoked on the main thread via `CallWithConfig`: wakes the worker
 // threads, runs a share of the work itself, then waits at the barrier until
 // all worker threads have finished, recording the duration of each phase
 // (wake, run, barrier) in `stats`.
 struct MainWakeAndBarrier {
   template <class Spin, class Wait>
   void operator()(const Spin& spin, const Wait& wait, pool::Worker& main,
                   const pool::Tasks& tasks, const pool::Shared& shared,
                   pool::Stats& stats) const {
     const pool::Barrier barrier;
     pool::Worker* workers = main.AllWorkers();
     HWY_DASSERT(&main == main.AllWorkers());  // main is first.
     const size_t num_threads = main.NumThreads();
     // `WorkerLoop` also advances its epoch, so main and worker threads
     // agree on the barrier epoch for this fork-join.
     const uint32_t epoch = main.AdvanceWorkerEpoch();

     // Sanity check: no worker may already have reached the new epoch.
     HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) {
       for (size_t i = 0; i < 1 + num_threads; ++i) {
         HWY_DASSERT(!barrier.HasReached(workers + i, epoch));
       }
     }

     Stopwatch stopwatch(shared.MakeStopwatch());
     const timer::Ticks t_before_wake = stopwatch.Origin();
     wait.WakeWorkers(workers, epoch);
     const timer::Ticks d_wake = stopwatch.Elapsed();

     // Also perform work on the main thread before the barrier.
     tasks.WorkerRun(&main, shared, stats);
     const timer::Ticks d_run = stopwatch.Elapsed();

     // Spin-waits until all worker *threads* (not `main`, because it already
     // knows it is here) called `WorkerReached`.
     barrier.UntilReached(num_threads, workers, spin, epoch);
     const timer::Ticks d_barrier = stopwatch.Elapsed();
     stats.NotifyMainRun(main.NumThreads(), t_before_wake, d_wake, d_run,
                         d_barrier);

     // Sanity check: all workers (including main) reached the epoch.
     HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) {
       for (size_t i = 0; i < 1 + num_threads; ++i) {
         HWY_DASSERT(barrier.HasReached(workers + i, epoch));
       }
     }

     // Threads are or will soon be waiting `UntilWoken`, which serves as the
     // 'release' phase of the barrier.
   }
 };
   1510 
 // Thread body: called by `std::thread`. Could also be a lambda.
 class ThreadFunc {
   // Functor called by `CallWithConfig`. Loops until `SendConfig` changes the
   // Spin or Wait policy or the pool is destroyed.
   struct WorkerLoop {
     template <class Spin, class Wait>
     void operator()(const Spin& spin, const Wait& wait, pool::Worker& worker,
                     pool::Tasks& tasks, const pool::Shared& shared,
                     pool::Stats& stats) const {
       do {
         // Main worker also calls this, so their epochs match.
         const uint32_t epoch = worker.AdvanceWorkerEpoch();

         Stopwatch stopwatch(shared.MakeStopwatch());

         // Waits (per the `Wait` policy, possibly spinning) until the main
         // thread wakes this worker for the new epoch.
         const size_t wait_reps = wait.UntilWoken(worker, spin);
         const timer::Ticks d_wait = stopwatch.Elapsed();
         const timer::Ticks t_before_run = stopwatch.Origin();

         tasks.WorkerRun(&worker, shared, stats);
         const timer::Ticks d_run = stopwatch.Elapsed();
         stats.NotifyThreadRun(worker.Index(), d_wait, wait_reps, t_before_run,
                               d_run);

         // Notify barrier after `WorkerRun`. Note that we cannot send an
         // after-barrier timestamp, see above.
         pool::Barrier().WorkerReached(worker, epoch);
         // Check after `WorkerReached`, otherwise the main thread deadlocks.
       } while (worker.GetExit() == Exit::kNone);
     }
   };

  public:
   ThreadFunc(pool::Worker& worker, pool::Tasks& tasks,
              const pool::Shared& shared, pool::Stats& stats)
       : worker_(worker), tasks_(tasks), shared_(shared), stats_(stats) {}

   // Thread entry point: repeatedly runs `WorkerLoop` (re-entering it with a
   // new config after each `SendConfig`) until the pool is destroyed.
   void operator()() {
     // Ensure main thread's writes are visible (synchronizes with fence in
     // `WorkerLifecycle::Init`).
     std::atomic_thread_fence(std::memory_order_acquire);

     HWY_DASSERT(worker_.Index() != 0);  // main is 0
     SetThreadName("worker%03zu", static_cast<int>(worker_.Index() - 1));

     worker_.SetStartTime();
     Profiler& profiler = Profiler::Get();
     profiler.SetGlobalIdx(worker_.GlobalIdx());
     // No Zone here because it would only exit after `GetExit`, which may be
     // after the main thread's `PROFILER_END_ROOT_RUN`, and thus too late to
     // be counted. Instead, `ProfilerFunc` records the elapsed time.

     // Loop termination via `GetExit` is triggered by `~ThreadPool`.
     for (;;) {
       // Uses the initial config, or the last one set during WorkerRun.
       CallWithConfig(worker_.NextConfig(), WorkerLoop(), worker_, tasks_,
                      shared_, stats_);

       // Exit or reset the flag and return to WorkerLoop with a new config.
       if (worker_.GetExit() == Exit::kThread) break;
       worker_.SetExit(Exit::kNone);
     }

     profiler.SetGlobalIdx(~size_t{0});

     // Defer `FreeWorker` until workers are destroyed to ensure the profiler
     // is not still using the worker.
   }

  private:
   pool::Worker& worker_;
   pool::Tasks& tasks_;
   const pool::Shared& shared_;
   pool::Stats& stats_;
 };
   1586 
   1587  void PrintStats() {
   1588    // Total run time from all non-main threads.
   1589    std::atomic<timer::Ticks> sum_thread_elapsed{0};
   1590    (void)RunWithoutAutotune(
   1591        0, NumWorkers(), shared_.print_stats,
   1592        [this, &sum_thread_elapsed](HWY_MAYBE_UNUSED uint64_t task,
   1593                                    size_t worker) {
   1594          HWY_DASSERT(task == worker);
   1595          // Skip any main thread(s) because they did not init the stopwatch.
   1596          if (worker != 0) {
   1597            sum_thread_elapsed.fetch_add(workers_[worker].ElapsedTime());
   1598          }
   1599        });
   1600    const timer::Ticks thread_total =
   1601        sum_thread_elapsed.load(std::memory_order_acquire);
   1602    stats_.PrintAndReset(num_threads_, thread_total);
   1603  }
   1604 
 // Runs `closure(task, worker)` for `[begin, end)` using the current config,
 // without any auto-tune time measurement. Returns whether threads were
 // used. If not, there is no need to update the autotuner config.
 template <class Closure>
 bool RunWithoutAutotune(uint64_t begin, uint64_t end, pool::Caller caller,
                         const Closure& closure) {
   pool::Worker& main = workers_[0];

   const size_t num_tasks = static_cast<size_t>(end - begin);
   const size_t num_workers = NumWorkers();

   // If zero or one task, or no extra threads, run on the main thread without
   // setting any member variables, because we may be re-entering Run.
   if (HWY_UNLIKELY(num_tasks <= 1 || num_workers == 1)) {
     for (uint64_t task = begin; task < end; ++task) {
       closure(task, /*worker=*/0);
     }
     return false;
   }

   // Marks the pool busy for the duration of the fork-join below.
   busy_.Set();

#if PROFILER_ENABLED
   const bool is_root = PROFILER_IS_ROOT_RUN();
   Stopwatch stopwatch(shared_.MakeStopwatch());
   // Time since the previous root run ended (see `LastRootEnd().Reset()`
   // below), i.e. time spent outside of root runs.
   const timer::Ticks wait_before =
       is_root ? shared_.LastRootEnd().Elapsed() : 0;
#endif

   // Publishes the task range and closure for workers to read.
   tasks_.Set(begin, end, closure);

   // More than one task per worker: use work stealing.
   if (HWY_LIKELY(num_tasks > num_workers)) {
     pool::Tasks::DivideRangeAmongWorkers(begin, end, div_workers_, workers_);
   }

   // Runs `MainWakeAndBarrier` with the first worker slot.
   CallWithConfig(config(), MainWakeAndBarrier(), main, tasks_, shared_,
                  stats_);

#if PROFILER_ENABLED
   // Accumulates per-caller statistics for this cluster.
   pool::CallerAccumulator& acc =
       shared_.Cluster(main.ClusterIdx()).Get(caller.Idx());
   acc.Add(num_tasks, num_workers, is_root, wait_before, stopwatch.Elapsed());
   if (is_root) {
     PROFILER_END_ROOT_RUN();
     shared_.LastRootEnd().Reset();
   }
#else
   (void)caller;
#endif

   busy_.Clear();
   return true;
 }
   1659 
   1660  // Sends `next_config` to workers:
   1661  // - Main wakes threads using the current config.
   1662  // - Threads copy `next_config` into their `Worker` during `WorkerRun`.
   1663  // - Threads notify the (same) barrier and already wait for the next wake
   1664  //   using `next_config`.
   1665  HWY_NOINLINE void SendConfig(pool::Config next_config) {
   1666    (void)RunWithoutAutotune(
   1667        0, NumWorkers(), shared_.send_config,
   1668        [this, next_config](HWY_MAYBE_UNUSED uint64_t task, size_t worker) {
   1669          HWY_DASSERT(task == worker);  // one task per worker
   1670          workers_[worker].SetNextConfig(next_config);
   1671          workers_[worker].SetExit(Exit::kLoop);
   1672        });
   1673 
   1674    // All have woken and are, or will be, waiting per `next_config`. Now we
   1675    // can entirely switch the main thread's config for the next wake.
   1676    workers_[0].SetNextConfig(next_config);
   1677  }
   1678 
   1679  using AutoTuneT = AutoTune<pool::Config, 30>;
   1680  AutoTuneT& AutoTuner() {
   1681    static_assert(static_cast<size_t>(PoolWaitMode::kBlock) == 1, "");
   1682    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
   1683  }
   1684  const AutoTuneT& AutoTuner() const {
   1685    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
   1686  }
   1687 
 const size_t num_threads_;  // not including main thread
 const Divisor64 div_workers_;  // for dividing task ranges among workers
 pool::Shared& shared_;
 pool::Worker* const workers_;  // points into `worker_bytes_`

 alignas(HWY_ALIGNMENT) pool::Stats stats_;

 // This is written by the main thread and read by workers, via reference
 // passed to `ThreadFunc`. Padding ensures that the workers' cache lines are
 // not unnecessarily invalidated when the main thread writes other members.
 alignas(HWY_ALIGNMENT) pool::Tasks tasks_;
 HWY_MEMBER_VAR_MAYBE_UNUSED char
     padding_[HWY_ALIGNMENT - sizeof(pool::Tasks)];

 // Set while a parallel `RunWithoutAutotune` is in flight, cleared after.
 pool::BusyFlag busy_;

 // Unmodified after ctor, but cannot be const because we call thread::join().
 std::vector<std::thread> threads_;

 // Current wait mode; selects which element of `auto_tune_` is active.
 PoolWaitMode wait_mode_;
 AutoTuneT auto_tune_[2];  // accessed via `AutoTuner`

 // Last because it is large. Store inside `ThreadPool` so that callers can
 // bind it to the NUMA node's memory. Not stored inside `WorkerLifecycle`
 // because that class would be initialized after `workers_`.
 alignas(HWY_ALIGNMENT) uint8_t
     worker_bytes_[sizeof(pool::Worker) * (pool::kMaxThreads + 1)];
   1715 };
   1716 
   1717 }  // namespace hwy
   1718 
   1719 #endif  // HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_