BoundedMPSCQueue.h (13805B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 /* 8 * Multiple Producer Single Consumer lock-free queue. 9 * Allocation-free is guaranteed outside of the constructor. 10 * 11 * This is a direct C++ port from 12 * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235 13 * with the exception we are using atomic uint64t to have 15 slots in the ring 14 * buffer (Rust implem is 5 slots, we want a bit more). 15 * */ 16 17 #ifndef mozilla_BoundedMPSCQueue_h 18 #define mozilla_BoundedMPSCQueue_h 19 20 #include "mozilla/Assertions.h" 21 #include <algorithm> 22 #include <atomic> 23 #include <cinttypes> 24 #include <cstddef> 25 #include <cstdint> 26 #include <memory> 27 #include <optional> 28 29 namespace mozilla { 30 31 static constexpr bool MPSC_DEBUG = false; 32 33 /** 34 * This data structure allows producing data from several threads, and consuming 35 * it on one thread, safely and without performing memory allocations or 36 * locking. 37 * 38 * The role for the producers and the consumer must be constant, i.e., the 39 * producer should always be on one thread and the consumer should always be on 40 * another thread. 41 * 42 * Some words about the inner workings of this class: 43 * - Capacity is fixed. Only one allocation is performed, in the constructor. 44 * - Maximum capacity is 15 elements, with 0 being used to denote an empty set. 45 * This is a hard limitation from encoding indexes within the atomic uint64_t. 46 * - This is lock-free but not wait-free, it might spin a little until 47 * compare/exchange succeeds. 48 * - There is no guarantee of forward progression for individual threads. 49 * - This should be safe to use from a signal handler context. 50 */ 51 template <typename T, size_t kCapacity> 52 class MPSCRingBufferBase { 53 static constexpr size_t kMaxCapacity = 16; 54 55 public: 56 explicit MPSCRingBufferBase() : mFree(0), mOccupied(0) { 57 static_assert(kCapacity < kMaxCapacity); 58 59 if constexpr (MPSC_DEBUG) { 60 fprintf(stderr, 61 "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64 62 ", mMask=0x%" PRIx64 " }\n", 63 __PRETTY_FUNCTION__, this, 1 + kCapacity, mBits, mMask); 64 } 65 66 // Leave one empty space in the queue, used to distinguish an empty queue 67 // from a full one, as in the SPSCQueue. 68 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126 69 for (uint64_t i = 1; i < StorageCapacity(); ++i) { 70 MarkSlot(mFree, i); 71 } 72 73 // This should be the only allocation performed, thus it cannot be performed 74 // in a restricted context (e.g., signal handler, real-time thread) 75 mData = std::make_unique<T[]>(Capacity()); 76 77 std::atomic_thread_fence(std::memory_order_seq_cst); 78 } 79 80 /** 81 * @brief Put an element in the queue. The caller MUST check the return value 82 * and maybe loop to try again (or drop if acceptable). 83 * 84 * First it attempts to acquire a slot (storage index) that is known to be 85 * non used. If that is not successful then 0 is returned. If that is 86 * successful, the slot is ours (it has been exclusively acquired) and data 87 * can be copied into the ring buffer at that index. 88 * 89 * @param aElement The element to put in the queue. 90 * 91 * @return 0 if insertion could not be performed, inserted index otherwise 92 */ 93 [[nodiscard]] int Send(T& aElement) { 94 std::optional<uint64_t> empty_idx = UnmarkSlot(mFree); 95 if (empty_idx.has_value()) { 96 std::move(&aElement, &aElement + 1, &mData[*empty_idx - 1]); 97 MarkSlot(mOccupied, *empty_idx); 98 return *empty_idx; 99 } 100 return 0; 101 } 102 103 /** 104 * Retrieve one element from the ring buffer, and copy it to 105 * `aElement`, if non-null. 106 * 107 * It attempts to acquire a slot from the list of used ones. If that is not 108 * successfull, then 0 is returned. Once a slot has been exclusively acquired, 109 * data is copied from it into the non-null pointer passed in parameter. 110 * 111 * @param aElement A pointer to a `T` where data will be copied. 112 * 113 * @return The index from which data was copied, 0 if there was nothing in the 114 * ring buffer. 115 */ 116 [[nodiscard]] int Recv(T* aElement) { 117 std::optional<uint64_t> idx = UnmarkSlot(mOccupied); 118 if (idx.has_value()) { 119 if (aElement) { 120 std::move(&mData[*idx - 1], &mData[*idx], aElement); 121 } 122 MarkSlot(mFree, *idx); 123 return *idx; 124 } 125 return 0; 126 } 127 128 constexpr size_t Capacity() const { return StorageCapacity() - 1; } 129 130 private: 131 /* 132 * Get/Set manipulates the encoding within `aNumber` by storing the index as a 133 * number and shifting it to the left (set) or right (get). 134 * 135 * Initial `aNumber` value is 0. 136 * 137 * Set() with first index value (1), we store the index on mBits and we shift 138 * it to the left, e.g., as follows: 139 * 140 * aNumber=0b00000000000000000000000000000000000000000000000000000000000000 141 * aIndex=0 aValue=1 142 * aNumber=0b00000000000000000000000000000000000000000000000000000000000001 143 * aIndex=1 aValue=33 144 * aNumber=0b00000000000000000000000000000000000000000000000000000000100001 145 * aIndex=2 aValue=801 146 * aNumber=0b00000000000000000000000000000000000000000000000000001100100001 147 * aIndex=3 aValue=17185 148 * aNumber=0b00000000000000000000000000000000000000000000000100001100100001 149 * aIndex=4 aValue=344865 150 * aNumber=0b00000000000000000000000000000000000000000001010100001100100001 151 * aIndex=5 aValue=6636321 152 * aNumber=0b00000000000000000000000000000000000000011001010100001100100001 153 * aIndex=6 aValue=124076833 154 * aNumber=0b00000000000000000000000000000000000111011001010100001100100001 155 * aIndex=7 aValue=2271560481 156 * aNumber=0b00000000000000000000000000000010000111011001010100001100100001 157 * aIndex=8 aValue=40926266145 158 * aNumber=0b00000000000000000000000000100110000111011001010100001100100001 159 * aIndex=9 aValue=728121033505 160 * aNumber=0b00000000000000000000001010100110000111011001010100001100100001 161 * aIndex=10 aValue=12822748939041 162 * aNumber=0b00000000000000000010111010100110000111011001010100001100100001 163 * aIndex=11 aValue=223928981472033 164 * aNumber=0b00000000000000110010111010100110000111011001010100001100100001 165 * aIndex=12 aValue=3883103678710561 166 * aNumber=0b00000000001101110010111010100110000111011001010100001100100001 167 * aIndex=13 aValue=66933498461897505 168 * aNumber=0b00000011101101110010111010100110000111011001010100001100100001 169 * aIndex=14 aValue=1147797409030816545 170 */ 171 [[nodiscard]] uint64_t Get(uint64_t aNumber, uint64_t aIndex) { 172 return (aNumber >> (mBits * aIndex)) & mMask; 173 } 174 175 [[nodiscard]] uint64_t Set(uint64_t aNumber, uint64_t aIndex, 176 uint64_t aValue) { 177 return (aNumber & ~(mMask << (mBits * aIndex))) | 178 (aValue << (mBits * aIndex)); 179 } 180 181 /* 182 * Enqueue a value in the ring buffer at aIndex. 183 * 184 * Takes the current uint64_t value from the atomic and try to acquire a non 185 * used slot in the ring buffer. If unsuccessful, 0 is returned, otherwise 186 * compute the new atomic value that holds the new state of usage of the 187 * slots, and use compare/exchange to perform lock-free synchronization: 188 * compare/exchanges succeeds when the current value and the modified one are 189 * equal, reflecting an acquired lock. If another thread was concurrent to 190 * this one, then it would fail to that operation, and go into the next 191 * iteration of the loop to read the new state value from the atomic, and 192 * acquire a different slot. 193 * 194 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free 195 * thread exclusions 196 * 197 * @param aIndex the index where we want to enqueue. It should come from the 198 * empty queue 199 * */ 200 void MarkSlot(std::atomic<uint64_t>& aSlotStatus, uint64_t aIndex) { 201 uint64_t current = aSlotStatus.load(std::memory_order_relaxed); 202 do { 203 // Attempts to find a slot that is available to enqueue, without 204 // cross-thread synchronization 205 auto empty = [&]() -> std::optional<uint64_t> { 206 for (uint64_t i = 0; i < Capacity(); ++i) { 207 if (Get(current, i) == 0) { 208 return i; 209 } 210 } 211 return {}; 212 }(); 213 if (!empty.has_value()) { 214 // Rust does expect() which would panic: 215 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62 216 // If there's no empty place, then it would be up to the caller to deal 217 // with that 218 MOZ_CRASH("No empty slot available"); 219 } 220 uint64_t modified = Set(current, *empty, aIndex); 221 // This is where the lock-free synchronization happens ; if `current` 222 // matches the content of `aSlotStatus`, then store `modified` in 223 // aSlotStatus and succeeds. Upon success it means no other thread has 224 // tried to change the same value at the same time, so the lock was safely 225 // acquired. 226 // 227 // Upon failure, it means another thread tried at the same time to use the 228 // same slot, so a new iteration of the loop needs to be executed to try 229 // another slot. 230 // 231 // In case of success (`aSlotStatus`'s content is equal to `current`), we 232 // require memory_order_release for the read-modify-write operation 233 // because we want to make sure when acquiring a slot that any concurrent 234 // thread performing a write had a chance to do it. 235 // 236 // In case of failure we require memory_order_relaxed for the load 237 // operation because we dont need synchronization at that point. 238 if (aSlotStatus.compare_exchange_weak(current, modified, 239 std::memory_order_release, 240 std::memory_order_relaxed)) { 241 if constexpr (MPSC_DEBUG) { 242 fprintf(stderr, 243 "[enqueue] modified=0x%" PRIx64 " => index=%" PRIu64 "\n", 244 modified, aIndex); 245 } 246 return; 247 } 248 } while (true); 249 } 250 251 /* 252 * Dequeue a value from the ring buffer. 253 * 254 * Takes the current value from the uint64_t atomic and read the current index 255 * out of it. If that index is 0 then we are facing a lack of slots and we 256 * return, the caller MUST check this and deal with the situation. If the 257 * index is non null we can try to acquire the matching slot in the ring 258 * buffer thanks to the compare/exchange loop. When the compare/exchange call 259 * succeeds, then the slot was acquired. 260 * 261 * @param aSlotStatus a uint64_t atomic that is used to perform lock-free 262 * thread exclusions 263 * */ 264 [[nodiscard]] std::optional<uint64_t> UnmarkSlot( 265 std::atomic<uint64_t>& aSlotStatus) { 266 uint64_t current = aSlotStatus.load(std::memory_order_relaxed); 267 do { 268 uint64_t index = current & mMask; 269 if (index == 0) { 270 // Return a None 271 // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77 272 // If we return None while dequeuing on mFree then we are full and the 273 // caller needs to deal with that. 274 return {}; 275 } 276 uint64_t modified = current >> mBits; 277 // See the comment in MarkSlot for details 278 // 279 // In case of success (`aSlotStatus`'s content is equal to `current`), we 280 // require memory_order_acquire for the read-modify-write operation 281 // because we want to make sure when unmarking a slot that any concurrent 282 // thread performing a read will see the value we are writing. 283 // 284 // In case of failure we require memory_order_relaxed for the load 285 // operation because we don't need synchronization at that point. 286 if (aSlotStatus.compare_exchange_weak(current, modified, 287 std::memory_order_acquire, 288 std::memory_order_relaxed)) { 289 if constexpr (MPSC_DEBUG) { 290 fprintf(stderr, 291 "[dequeue] current=0x%" PRIx64 " => index=%" PRIu64 "\n", 292 current, index); 293 } 294 return index; 295 } 296 } while (true); 297 return {}; 298 } 299 300 // Return the number of elements we can store within the ring buffer, whereas 301 // Capacity() will return the amount of elements in mData, including the 0 302 // value. 303 [[nodiscard]] constexpr size_t StorageCapacity() const { 304 return 1 + kCapacity; 305 } 306 307 // For the atomics below they are manipulated by Get()/Set(), and we are using 308 // them to store the IDs of the ring buffer usage (empty/full). 309 // 310 // We use mBits bits to store an ID (so we are limited to 16 and 0 is 311 // reserved) and append each of them to the atomics. 312 // 313 // A 0 value in one of those denotes we are full for the atomic, i.e., 314 // mFree=0 means we are full and mOccupied=0 means we are empty. 315 316 // Holds the IDs of the free slots in the ring buffer 317 std::atomic<uint64_t> mFree; 318 319 // Holds the IDs of the occupied slots in the ring buffer 320 std::atomic<uint64_t> mOccupied; 321 322 // The actual ring buffer 323 std::unique_ptr<T[]> mData; 324 325 // How we are using the uint64_t atomic above to store the IDs of the ring 326 // buffer. 327 static constexpr uint64_t mBits = 4; 328 static constexpr uint64_t mMask = 0b1111; 329 }; 330 331 /** 332 * Instantiation of the `MPSCRingBufferBase` type. This is safe to use from 333 * several producers threads and one one consumer (that never changes role), 334 * without explicit synchronization nor allocation (outside of the constructor). 335 */ 336 template <typename T, size_t Capacity> 337 using BoundedMPSCQueue = MPSCRingBufferBase<T, Capacity>; 338 339 } // namespace mozilla 340 341 #endif // mozilla_BoundedMPSCQueue_h