thread_parallel_runner_test.cc (4173B)
1 // Copyright (c) the JPEG XL Project Authors. All rights reserved. 2 // 3 // Use of this source code is governed by a BSD-style 4 // license that can be found in the LICENSE file. 5 6 #include <algorithm> 7 #include <atomic> 8 #include <cstdint> 9 #include <vector> 10 11 #include "lib/jxl/base/data_parallel.h" 12 #include "lib/jxl/base/status.h" 13 #include "lib/jxl/test_utils.h" 14 #include "lib/jxl/testing.h" 15 16 using ::jxl::test::ThreadPoolForTests; 17 18 namespace jpegxl { 19 namespace { 20 21 int PopulationCount(uint64_t bits) { 22 int num_set = 0; 23 while (bits != 0) { 24 num_set += bits & 1; 25 bits >>= 1; 26 } 27 return num_set; 28 } 29 30 // Ensures task parameter is in bounds, every parameter is reached, 31 // pool can be reused (multiple consecutive Run calls), pool can be destroyed 32 // (joining with its threads), num_threads=0 works (runs on current thread). 33 TEST(ThreadParallelRunnerTest, TestPool) { 34 for (int num_threads = 0; num_threads <= 18; ++num_threads) { 35 ThreadPoolForTests pool(num_threads); 36 for (int num_tasks = 0; num_tasks < 32; ++num_tasks) { 37 std::vector<int> mementos(num_tasks); 38 for (int begin = 0; begin < 32; ++begin) { 39 std::fill(mementos.begin(), mementos.end(), 0); 40 const auto do_task = [begin, num_tasks, &mementos]( 41 const int task, 42 const int thread) -> jxl::Status { 43 // Parameter is in the given range 44 EXPECT_GE(task, begin); 45 EXPECT_LT(task, begin + num_tasks); 46 47 // Store mementos to be sure we visited each task. 48 mementos.at(task - begin) = 1000 + task; 49 return true; 50 }; 51 EXPECT_TRUE(RunOnPool(pool.get(), begin, begin + num_tasks, 52 jxl::ThreadPool::NoInit, do_task, "TestPool")); 53 for (int task = begin; task < begin + num_tasks; ++task) { 54 EXPECT_EQ(1000 + task, mementos.at(task - begin)); 55 } 56 } 57 } 58 } 59 } 60 61 // Verify "thread" parameter when processing few tasks. 62 TEST(ThreadParallelRunnerTest, TestSmallAssignments) { 63 const int kMaxThreads = 8; 64 for (int num_threads = 1; num_threads <= kMaxThreads; ++num_threads) { 65 ThreadPoolForTests pool(num_threads); 66 67 // (Avoid mutex because it may perturb the worker thread scheduling) 68 std::atomic<uint64_t> id_bits{0}; 69 std::atomic<int> num_calls{0}; 70 const auto do_task = [&num_calls, num_threads, &id_bits]( 71 const int task, const int thread) -> jxl::Status { 72 num_calls.fetch_add(1, std::memory_order_relaxed); 73 74 EXPECT_LT(thread, num_threads); 75 uint64_t bits = id_bits.load(std::memory_order_relaxed); 76 while (!id_bits.compare_exchange_weak(bits, bits | (1ULL << thread))) { 77 // lock-free retry-loop 78 } 79 return true; 80 }; 81 EXPECT_TRUE(RunOnPool(pool.get(), 0, num_threads, jxl::ThreadPool::NoInit, 82 do_task, "TestSmallAssignments")); 83 84 // Correct number of tasks. 85 EXPECT_EQ(num_threads, num_calls.load()); 86 87 const int num_participants = PopulationCount(id_bits.load()); 88 // Can't expect equality because other workers may have woken up too late. 89 EXPECT_LE(num_participants, num_threads); 90 } 91 } 92 93 struct Counter { 94 Counter() { 95 // Suppress "unused-field" warning. 96 (void)padding; 97 } 98 void Assimilate(const Counter& victim) { counter += victim.counter; } 99 int counter = 0; 100 int padding[31]; 101 }; 102 103 TEST(ThreadParallelRunnerTest, TestCounter) { 104 const int kNumThreads = 12; 105 ThreadPoolForTests pool(kNumThreads); 106 alignas(128) Counter counters[kNumThreads]; 107 108 const int kNumTasks = kNumThreads * 19; 109 const auto count = [&counters](const int task, 110 const int thread) -> jxl::Status { 111 counters[thread].counter += task; 112 return true; 113 }; 114 EXPECT_TRUE(RunOnPool(pool.get(), 0, kNumTasks, jxl::ThreadPool::NoInit, 115 count, "TestCounter")); 116 117 int expected = 0; 118 for (int i = 0; i < kNumTasks; ++i) { 119 expected += i; 120 } 121 122 for (int i = 1; i < kNumThreads; ++i) { 123 counters[0].Assimilate(counters[i]); 124 } 125 EXPECT_EQ(expected, counters[0].counter); 126 } 127 128 } // namespace 129 } // namespace jpegxl