per_thread_sem.cc (3877B)
1 // Copyright 2017 The Abseil Authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 // This file is a no-op if the required LowLevelAlloc support is missing. 16 #include "absl/base/internal/low_level_alloc.h" 17 #ifndef ABSL_LOW_LEVEL_ALLOC_MISSING 18 19 #include "absl/synchronization/internal/per_thread_sem.h" 20 21 #include <atomic> 22 23 #include "absl/base/attributes.h" 24 #include "absl/base/internal/thread_identity.h" 25 #include "absl/synchronization/internal/waiter.h" 26 27 namespace absl { 28 ABSL_NAMESPACE_BEGIN 29 namespace synchronization_internal { 30 31 void PerThreadSem::SetThreadBlockedCounter(std::atomic<int> *counter) { 32 base_internal::ThreadIdentity *identity; 33 identity = GetOrCreateCurrentThreadIdentity(); 34 identity->blocked_count_ptr = counter; 35 } 36 37 std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() { 38 base_internal::ThreadIdentity *identity; 39 identity = GetOrCreateCurrentThreadIdentity(); 40 return identity->blocked_count_ptr; 41 } 42 43 void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { 44 const int ticker = 45 identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1; 46 const int wait_start = identity->wait_start.load(std::memory_order_relaxed); 47 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed); 48 if (wait_start && (ticker - wait_start > Waiter::kIdlePeriods) && !is_idle) { 49 // Wakeup the waiting thread since it is time for it to become idle. 50 ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPoke)(identity); 51 } 52 } 53 54 } // namespace synchronization_internal 55 ABSL_NAMESPACE_END 56 } // namespace absl 57 58 extern "C" { 59 60 ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemInit)( 61 absl::base_internal::ThreadIdentity *identity) { 62 new (absl::synchronization_internal::Waiter::GetWaiter(identity)) 63 absl::synchronization_internal::Waiter(); 64 } 65 66 ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)( 67 absl::base_internal::ThreadIdentity *identity) { 68 absl::synchronization_internal::Waiter::GetWaiter(identity)->Post(); 69 } 70 71 ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPoke)( 72 absl::base_internal::ThreadIdentity *identity) { 73 absl::synchronization_internal::Waiter::GetWaiter(identity)->Poke(); 74 } 75 76 ABSL_ATTRIBUTE_WEAK bool ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)( 77 absl::synchronization_internal::KernelTimeout t) { 78 bool timeout = false; 79 absl::base_internal::ThreadIdentity *identity; 80 identity = absl::synchronization_internal::GetOrCreateCurrentThreadIdentity(); 81 82 // Ensure wait_start != 0. 83 int ticker = identity->ticker.load(std::memory_order_relaxed); 84 identity->wait_start.store(ticker ? ticker : 1, std::memory_order_relaxed); 85 identity->is_idle.store(false, std::memory_order_relaxed); 86 87 if (identity->blocked_count_ptr != nullptr) { 88 // Increment count of threads blocked in a given thread pool. 89 identity->blocked_count_ptr->fetch_add(1, std::memory_order_relaxed); 90 } 91 92 timeout = 93 !absl::synchronization_internal::Waiter::GetWaiter(identity)->Wait(t); 94 95 if (identity->blocked_count_ptr != nullptr) { 96 identity->blocked_count_ptr->fetch_sub(1, std::memory_order_relaxed); 97 } 98 99 identity->is_idle.store(false, std::memory_order_relaxed); 100 identity->wait_start.store(0, std::memory_order_relaxed); 101 return !timeout; 102 } 103 104 } // extern "C" 105 106 #endif // ABSL_LOW_LEVEL_ALLOC_MISSING