per_thread_sem_test.cc (6240B)
1 // Copyright 2017 The Abseil Authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #include "absl/synchronization/internal/per_thread_sem.h" 16 17 #include <atomic> 18 #include <condition_variable> // NOLINT(build/c++11) 19 #include <functional> 20 #include <limits> 21 #include <mutex> // NOLINT(build/c++11) 22 #include <string> 23 #include <thread> // NOLINT(build/c++11) 24 25 #include "gtest/gtest.h" 26 #include "absl/base/config.h" 27 #include "absl/base/internal/cycleclock.h" 28 #include "absl/base/internal/thread_identity.h" 29 #include "absl/strings/str_cat.h" 30 #include "absl/time/clock.h" 31 #include "absl/time/time.h" 32 33 // In this test we explicitly avoid the use of synchronization 34 // primitives which might use PerThreadSem, most notably absl::Mutex. 35 36 namespace absl { 37 ABSL_NAMESPACE_BEGIN 38 namespace synchronization_internal { 39 40 class SimpleSemaphore { 41 public: 42 SimpleSemaphore() : count_(0) {} 43 44 // Decrements (locks) the semaphore. If the semaphore's value is 45 // greater than zero, then the decrement proceeds, and the function 46 // returns, immediately. If the semaphore currently has the value 47 // zero, then the call blocks until it becomes possible to perform 48 // the decrement. 49 void Wait() { 50 std::unique_lock<std::mutex> lock(mu_); 51 cv_.wait(lock, [this]() { return count_ > 0; }); 52 --count_; 53 cv_.notify_one(); 54 } 55 56 // Increments (unlocks) the semaphore. If the semaphore's value 57 // consequently becomes greater than zero, then another thread 58 // blocked Wait() call will be woken up and proceed to lock the 59 // semaphore. 60 void Post() { 61 std::lock_guard<std::mutex> lock(mu_); 62 ++count_; 63 cv_.notify_one(); 64 } 65 66 private: 67 std::mutex mu_; 68 std::condition_variable cv_; 69 int count_; 70 }; 71 72 struct ThreadData { 73 int num_iterations; // Number of replies to send. 74 SimpleSemaphore identity2_written; // Posted by thread writing identity2. 75 base_internal::ThreadIdentity *identity1; // First Post()-er. 76 base_internal::ThreadIdentity *identity2; // First Wait()-er. 77 KernelTimeout timeout; 78 }; 79 80 // Need friendship with PerThreadSem. 81 class PerThreadSemTest : public testing::Test { 82 public: 83 static void TimingThread(ThreadData* t) { 84 t->identity2 = GetOrCreateCurrentThreadIdentity(); 85 t->identity2_written.Post(); 86 while (t->num_iterations--) { 87 Wait(t->timeout); 88 Post(t->identity1); 89 } 90 } 91 92 void TestTiming(const char *msg, bool timeout) { 93 static const int kNumIterations = 100; 94 ThreadData t; 95 t.num_iterations = kNumIterations; 96 t.timeout = timeout ? 97 KernelTimeout(absl::Now() + absl::Seconds(10000)) // far in the future 98 : KernelTimeout::Never(); 99 t.identity1 = GetOrCreateCurrentThreadIdentity(); 100 101 // We can't use the Thread class here because it uses the Mutex 102 // class which will invoke PerThreadSem, so we use std::thread instead. 103 std::thread partner_thread(std::bind(TimingThread, &t)); 104 105 // Wait for our partner thread to register their identity. 106 t.identity2_written.Wait(); 107 108 int64_t min_cycles = std::numeric_limits<int64_t>::max(); 109 int64_t total_cycles = 0; 110 for (int i = 0; i < kNumIterations; ++i) { 111 absl::SleepFor(absl::Milliseconds(20)); 112 int64_t cycles = base_internal::CycleClock::Now(); 113 Post(t.identity2); 114 Wait(t.timeout); 115 cycles = base_internal::CycleClock::Now() - cycles; 116 min_cycles = std::min(min_cycles, cycles); 117 total_cycles += cycles; 118 } 119 std::string out = StrCat( 120 msg, "min cycle count=", min_cycles, " avg cycle count=", 121 absl::SixDigits(static_cast<double>(total_cycles) / kNumIterations)); 122 printf("%s\n", out.c_str()); 123 124 partner_thread.join(); 125 } 126 127 protected: 128 static void Post(base_internal::ThreadIdentity *id) { 129 PerThreadSem::Post(id); 130 } 131 static bool Wait(KernelTimeout t) { 132 return PerThreadSem::Wait(t); 133 } 134 135 // convenience overload 136 static bool Wait(absl::Time t) { 137 return Wait(KernelTimeout(t)); 138 } 139 140 static void Tick(base_internal::ThreadIdentity *identity) { 141 PerThreadSem::Tick(identity); 142 } 143 }; 144 145 namespace { 146 147 TEST_F(PerThreadSemTest, WithoutTimeout) { 148 PerThreadSemTest::TestTiming("Without timeout: ", false); 149 } 150 151 TEST_F(PerThreadSemTest, WithTimeout) { 152 PerThreadSemTest::TestTiming("With timeout: ", true); 153 } 154 155 TEST_F(PerThreadSemTest, Timeouts) { 156 const absl::Duration delay = absl::Milliseconds(50); 157 const absl::Time start = absl::Now(); 158 EXPECT_FALSE(Wait(start + delay)); 159 const absl::Duration elapsed = absl::Now() - start; 160 // Allow for a slight early return, to account for quality of implementation 161 // issues on various platforms. 162 absl::Duration slop = absl::Milliseconds(1); 163 #ifdef _MSC_VER 164 // Use higher slop on MSVC due to flaky test failures. 165 slop = absl::Milliseconds(16); 166 #endif 167 EXPECT_LE(delay - slop, elapsed) 168 << "Wait returned " << delay - elapsed 169 << " early (with " << slop << " slop), start time was " << start; 170 171 absl::Time negative_timeout = absl::UnixEpoch() - absl::Milliseconds(100); 172 EXPECT_FALSE(Wait(negative_timeout)); 173 EXPECT_LE(negative_timeout, absl::Now() + slop); // trivially true :) 174 175 Post(GetOrCreateCurrentThreadIdentity()); 176 // The wait here has an expired timeout, but we have a wake to consume, 177 // so this should succeed 178 EXPECT_TRUE(Wait(negative_timeout)); 179 } 180 181 TEST_F(PerThreadSemTest, ThreadIdentityReuse) { 182 // Create a base_internal::ThreadIdentity object and keep reusing it. There 183 // should be no memory or resource leaks. 184 for (int i = 0; i < 10000; i++) { 185 std::thread t([]() { GetOrCreateCurrentThreadIdentity(); }); 186 t.join(); 187 } 188 } 189 190 } // namespace 191 192 } // namespace synchronization_internal 193 ABSL_NAMESPACE_END 194 } // namespace absl