sequence_lock_test.cc (5800B)
1 // Copyright 2020 The Abseil Authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #include "absl/flags/internal/sequence_lock.h" 15 16 #include <algorithm> 17 #include <atomic> 18 #include <thread> // NOLINT(build/c++11) 19 #include <tuple> 20 #include <vector> 21 22 #include "gtest/gtest.h" 23 #include "absl/base/internal/sysinfo.h" 24 #include "absl/container/fixed_array.h" 25 #include "absl/time/clock.h" 26 27 namespace { 28 29 namespace flags = absl::flags_internal; 30 31 class ConcurrentSequenceLockTest 32 : public testing::TestWithParam<std::tuple<int, int>> { 33 public: 34 ConcurrentSequenceLockTest() 35 : buf_bytes_(std::get<0>(GetParam())), 36 num_threads_(std::get<1>(GetParam())) {} 37 38 protected: 39 const int buf_bytes_; 40 const int num_threads_; 41 }; 42 43 TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) { 44 const int buf_words = 45 flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t); 46 47 // The buffer that will be protected by the SequenceLock. 48 absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words); 49 for (auto& v : protected_buf) v = -1; 50 51 flags::SequenceLock seq_lock; 52 std::atomic<bool> stop{false}; 53 std::atomic<int64_t> bad_reads{0}; 54 std::atomic<int64_t> good_reads{0}; 55 std::atomic<int64_t> unsuccessful_reads{0}; 56 57 // Start a bunch of threads which read 'protected_buf' under the sequence 58 // lock. The main thread will concurrently update 'protected_buf'. The updates 59 // always consist of an array of identical integers. The reader ensures that 60 // any data it reads matches that pattern (i.e. the reads are not "torn"). 61 std::vector<std::thread> threads; 62 for (int i = 0; i < num_threads_; i++) { 63 threads.emplace_back([&]() { 64 absl::FixedArray<char> local_buf(buf_bytes_); 65 while (!stop.load(std::memory_order_relaxed)) { 66 if (seq_lock.TryRead(local_buf.data(), protected_buf.data(), 67 buf_bytes_)) { 68 bool good = true; 69 for (const auto& v : local_buf) { 70 if (v != local_buf[0]) good = false; 71 } 72 if (good) { 73 good_reads.fetch_add(1, std::memory_order_relaxed); 74 } else { 75 bad_reads.fetch_add(1, std::memory_order_relaxed); 76 } 77 } else { 78 unsuccessful_reads.fetch_add(1, std::memory_order_relaxed); 79 } 80 } 81 }); 82 } 83 while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) { 84 absl::SleepFor(absl::Milliseconds(1)); 85 } 86 seq_lock.MarkInitialized(); 87 88 // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems 89 // somewhat unfair and without an explicit timeout for this loop, the tests 90 // can run a long time. 91 absl::Time deadline = absl::Now() + absl::Seconds(5); 92 for (int i = 0; i < 100 && absl::Now() < deadline; i++) { 93 absl::FixedArray<char> writer_buf(buf_bytes_); 94 for (auto& v : writer_buf) v = i; 95 seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_); 96 absl::SleepFor(absl::Microseconds(10)); 97 } 98 stop.store(true, std::memory_order_relaxed); 99 for (auto& t : threads) t.join(); 100 ASSERT_GE(good_reads, 0); 101 ASSERT_EQ(bad_reads, 0); 102 } 103 104 // Simple helper for generating a range of thread counts. 105 // Generates [low, low*scale, low*scale^2, ...high) 106 // (even if high is between low*scale^k and low*scale^(k+1)). 107 std::vector<int> MultiplicativeRange(int low, int high, int scale) { 108 std::vector<int> result; 109 for (int current = low; current < high; current *= scale) { 110 result.push_back(current); 111 } 112 result.push_back(high); 113 return result; 114 } 115 116 #ifndef ABSL_HAVE_THREAD_SANITIZER 117 const int kMaxThreads = absl::base_internal::NumCPUs(); 118 #else 119 // With TSAN, a lot of threads contending for atomic access on the sequence 120 // lock make this test run too slowly. 121 const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4); 122 #endif 123 124 // Return all of the interesting buffer sizes worth testing: 125 // powers of two and adjacent values. 126 std::vector<int> InterestingBufferSizes() { 127 std::vector<int> ret; 128 for (int v : MultiplicativeRange(1, 128, 2)) { 129 ret.push_back(v); 130 if (v > 1) { 131 ret.push_back(v - 1); 132 } 133 ret.push_back(v + 1); 134 } 135 return ret; 136 } 137 138 INSTANTIATE_TEST_SUITE_P( 139 TestManyByteSizes, ConcurrentSequenceLockTest, 140 testing::Combine( 141 // Buffer size (bytes). 142 testing::ValuesIn(InterestingBufferSizes()), 143 // Number of reader threads. 144 testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2)))); 145 146 // Simple single-threaded test, parameterized by the size of the buffer to be 147 // protected. 148 class SequenceLockTest : public testing::TestWithParam<int> {}; 149 150 TEST_P(SequenceLockTest, SingleThreaded) { 151 const int size = GetParam(); 152 absl::FixedArray<std::atomic<uint64_t>> protected_buf( 153 flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t)); 154 155 flags::SequenceLock seq_lock; 156 seq_lock.MarkInitialized(); 157 158 std::vector<char> src_buf(size, 'x'); 159 seq_lock.Write(protected_buf.data(), src_buf.data(), size); 160 161 std::vector<char> dst_buf(size, '0'); 162 ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size)); 163 ASSERT_EQ(src_buf, dst_buf); 164 } 165 INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest, 166 // Buffer size (bytes). 167 testing::Range(1, 128)); 168 169 } // namespace