tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

bench_parallel.cc (7674B)


      1 // Copyright 2021 Google LLC
      2 // SPDX-License-Identifier: Apache-2.0
      3 //
      4 // Licensed under the Apache License, Version 2.0 (the "License");
      5 // you may not use this file except in compliance with the License.
      6 // You may obtain a copy of the License at
      7 //
      8 //      http://www.apache.org/licenses/LICENSE-2.0
      9 //
     10 // Unless required by applicable law or agreed to in writing, software
     11 // distributed under the License is distributed on an "AS IS" BASIS,
     12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 // See the License for the specific language governing permissions and
     14 // limitations under the License.
     15 
     16 // Concurrent, independent sorts for generating more memory traffic and testing
     17 // scalability when bandwidth-limited. If you want to use multiple threads for
     18 // a single sort, you can use ips4o and integrate vqsort by calling it from
     19 // `baseCaseSort` and increasing `IPS4OML_BASE_CASE_SIZE` to say 8192.
     20 
     21 #include <stdint.h>
     22 #include <stdio.h>
     23 
     24 #include <condition_variable>  //NOLINT
     25 #include <functional>
     26 #include <mutex>   //NOLINT
     27 #include <thread>  //NOLINT
     28 #include <vector>
     29 
     30 #include "hwy/timer.h"
     31 
     32 // clang-format off
     33 #undef HWY_TARGET_INCLUDE
     34 #define HWY_TARGET_INCLUDE "hwy/contrib/sort/bench_parallel.cc"  //NOLINT
     35 #include "hwy/foreach_target.h"  // IWYU pragma: keep
     36 
     37 // After foreach_target
     38 #include "hwy/contrib/sort/algo-inl.h"
     39 #include "hwy/contrib/sort/result-inl.h"
     40 #include "hwy/aligned_allocator.h"
     41 // Last
     42 #include "hwy/tests/test_util-inl.h"
     43 // clang-format on
     44 
     45 HWY_BEFORE_NAMESPACE();
     46 namespace hwy {
     47 namespace HWY_NAMESPACE {
     48 namespace {
     49 
     50 class ThreadPool {
     51 public:
     52  // Starts the given number of worker threads and blocks until they are ready.
     53  explicit ThreadPool(
     54      const size_t num_threads = std::thread::hardware_concurrency())
     55      : num_threads_(num_threads) {
     56    HWY_ASSERT(num_threads_ > 0);
     57    threads_.reserve(num_threads_);
     58    for (size_t i = 0; i < num_threads_; ++i) {
     59      threads_.emplace_back(ThreadFunc, this, i);
     60    }
     61 
     62    WorkersReadyBarrier();
     63  }
     64 
     65  ThreadPool(const ThreadPool&) = delete;
     66  ThreadPool& operator&(const ThreadPool&) = delete;
     67 
     68  // Waits for all threads to exit.
     69  ~ThreadPool() {
     70    StartWorkers(kWorkerExit);
     71 
     72    for (std::thread& thread : threads_) {
     73      thread.join();
     74    }
     75  }
     76 
     77  size_t NumThreads() const { return threads_.size(); }
     78 
     79  template <class Func>
     80  void RunOnThreads(size_t max_threads, const Func& func) {
     81    task_ = &CallClosure<Func>;
     82    data_ = &func;
     83    StartWorkers(max_threads);
     84    WorkersReadyBarrier();
     85  }
     86 
     87 private:
     88  // After construction and between calls to Run, workers are "ready", i.e.
     89  // waiting on worker_start_cv_. They are "started" by sending a "command"
     90  // and notifying all worker_start_cv_ waiters. (That is why all workers
     91  // must be ready/waiting - otherwise, the notification will not reach all of
     92  // them and the main thread waits in vain for them to report readiness.)
     93  using WorkerCommand = uint64_t;
     94 
     95  static constexpr WorkerCommand kWorkerWait = ~1ULL;
     96  static constexpr WorkerCommand kWorkerExit = ~2ULL;
     97 
     98  // Calls a closure (lambda with captures).
     99  template <class Closure>
    100  static void CallClosure(const void* f, size_t thread) {
    101    (*reinterpret_cast<const Closure*>(f))(thread);
    102  }
    103 
    104  void WorkersReadyBarrier() {
    105    std::unique_lock<std::mutex> lock(mutex_);
    106    // Typically only a single iteration.
    107    while (workers_ready_ != threads_.size()) {
    108      workers_ready_cv_.wait(lock);
    109    }
    110    workers_ready_ = 0;
    111 
    112    // Safely handle spurious worker wakeups.
    113    worker_start_command_ = kWorkerWait;
    114  }
    115 
    116  // Precondition: all workers are ready.
    117  void StartWorkers(const WorkerCommand worker_command) {
    118    std::unique_lock<std::mutex> lock(mutex_);
    119    worker_start_command_ = worker_command;
    120    // Workers will need this lock, so release it before they wake up.
    121    lock.unlock();
    122    worker_start_cv_.notify_all();
    123  }
    124 
    125  static void ThreadFunc(ThreadPool* self, size_t thread) {
    126    // Until kWorkerExit command received:
    127    for (;;) {
    128      std::unique_lock<std::mutex> lock(self->mutex_);
    129      // Notify main thread that this thread is ready.
    130      if (++self->workers_ready_ == self->num_threads_) {
    131        self->workers_ready_cv_.notify_one();
    132      }
    133    RESUME_WAIT:
    134      // Wait for a command.
    135      self->worker_start_cv_.wait(lock);
    136      const WorkerCommand command = self->worker_start_command_;
    137      switch (command) {
    138        case kWorkerWait:    // spurious wakeup:
    139          goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
    140        case kWorkerExit:
    141          return;  // exits thread
    142        default:
    143          break;
    144      }
    145 
    146      lock.unlock();
    147      // Command is the maximum number of threads that should run the task.
    148      HWY_ASSERT(command < self->NumThreads());
    149      if (thread < command) {
    150        self->task_(self->data_, thread);
    151      }
    152    }
    153  }
    154 
    155  const size_t num_threads_;
    156 
    157  // Unmodified after ctor, but cannot be const because we call thread::join().
    158  std::vector<std::thread> threads_;
    159 
    160  std::mutex mutex_;  // guards both cv and their variables.
    161  std::condition_variable workers_ready_cv_;
    162  size_t workers_ready_ = 0;
    163  std::condition_variable worker_start_cv_;
    164  WorkerCommand worker_start_command_;
    165 
    166  // Written by main thread, read by workers (after mutex lock/unlock).
    167  std::function<void(const void*, size_t)> task_;  // points to CallClosure
    168  const void* data_;                               // points to caller's Func
    169 };
    170 
    171 template <class Traits>
    172 void RunWithoutVerify(Traits st, const Dist dist, const size_t num_keys,
    173                      const Algo algo, SharedState& shared, size_t thread) {
    174  using LaneType = typename Traits::LaneType;
    175  using KeyType = typename Traits::KeyType;
    176  using Order = typename Traits::Order;
    177  const size_t num_lanes = num_keys * st.LanesPerKey();
    178  auto aligned = hwy::AllocateAligned<LaneType>(num_lanes);
    179 
    180  (void)GenerateInput(dist, aligned.get(), num_lanes);
    181 
    182  const Timestamp t0;
    183  Run(algo, reinterpret_cast<KeyType*>(aligned.get()), num_keys, shared, thread,
    184      /*k_keys=*/0, Order());
    185  HWY_ASSERT(aligned[0] < aligned[num_lanes - 1]);
    186 }
    187 
    188 void BenchParallel() {
    189  // Not interested in benchmark results for other targets on x86
    190  if (HWY_ARCH_X86 &&
    191      (HWY_TARGET != HWY_AVX2 && HWY_TARGET != HWY_AVX3 &&
    192       HWY_TARGET != HWY_AVX3_ZEN4 && HWY_TARGET != HWY_AVX3_SPR)) {
    193    return;
    194  }
    195 
    196  ThreadPool pool;
    197  const size_t NT = pool.NumThreads();
    198 
    199  detail::SharedTraits<detail::TraitsLane<detail::OrderAscending<int64_t>>> st;
    200  using KeyType = typename decltype(st)::KeyType;
    201  const size_t num_keys = size_t{100} * 1000 * 1000;
    202 
    203 #if HAVE_IPS4O
    204  const Algo algo = Algo::kIPS4O;
    205 #else
    206  const Algo algo = Algo::kVQSort;
    207 #endif
    208  const Dist dist = Dist::kUniform32;
    209 
    210  SharedState shared;
    211 
    212  std::vector<SortResult> results;
    213  for (size_t nt = 1; nt < NT; nt += HWY_MAX(1, NT / 16)) {
    214    Timestamp t0;
    215    // Default capture because MSVC wants algo/dist but clang does not.
    216    pool.RunOnThreads(nt, [=, &shared](size_t thread) {
    217      RunWithoutVerify(st, dist, num_keys, algo, shared, thread);
    218    });
    219    const double sec = SecondsSince(t0);
    220    results.emplace_back(algo, dist, num_keys, nt, sec, sizeof(KeyType),
    221                         st.KeyString());
    222    results.back().Print();
    223  }
    224 }
    225 
    226 }  // namespace
    227 // NOLINTNEXTLINE(google-readability-namespace-comments)
    228 }  // namespace HWY_NAMESPACE
    229 }  // namespace hwy
    230 HWY_AFTER_NAMESPACE();
    231 
#if HWY_ONCE

// Compiled once (not per-target): registers the per-target BenchParallel
// instantiations with the test harness.
namespace hwy {
namespace {
HWY_BEFORE_TEST(BenchParallel);
HWY_EXPORT_AND_TEST_P(BenchParallel, BenchParallel);
HWY_AFTER_TEST();
}  // namespace
}  // namespace hwy

#endif  // HWY_ONCE