SPSCQueue.h (15182B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 /* Single producer single consumer lock-free and wait-free queue. */ 8 9 #ifndef mozilla_LockFreeQueue_h 10 #define mozilla_LockFreeQueue_h 11 12 #include "mozilla/Assertions.h" 13 #include "mozilla/PodOperations.h" 14 #include <algorithm> 15 #include <atomic> 16 #include <cstddef> 17 #include <limits> 18 #include <memory> 19 #include <thread> 20 #include <type_traits> 21 22 namespace mozilla { 23 24 namespace detail { 25 template <typename T, bool IsPod = std::is_trivial<T>::value> 26 struct MemoryOperations { 27 /** 28 * This allows zeroing (using memset) or default-constructing a number of 29 * elements calling the constructors if necessary. 30 */ 31 static void ConstructDefault(T* aDestination, size_t aCount); 32 /** 33 * This allows either moving (if T supports it) or copying a number of 34 * elements from a `aSource` pointer to a `aDestination` pointer. 35 * If it is safe to do so and this call copies, this uses PodCopy. Otherwise, 36 * constructors and destructors are called in a loop. 
37 */ 38 static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount); 39 }; 40 41 template <typename T> 42 struct MemoryOperations<T, true> { 43 static void ConstructDefault(T* aDestination, size_t aCount) { 44 PodZero(aDestination, aCount); 45 } 46 static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) { 47 PodCopy(aDestination, aSource, aCount); 48 } 49 }; 50 51 template <typename T> 52 struct MemoryOperations<T, false> { 53 static void ConstructDefault(T* aDestination, size_t aCount) { 54 for (size_t i = 0; i < aCount; i++) { 55 aDestination[i] = T(); 56 } 57 } 58 static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) { 59 std::move(aSource, aSource + aCount, aDestination); 60 } 61 }; 62 } // namespace detail 63 64 /** 65 * This data structure allows producing data from one thread, and consuming it 66 * on another thread, safely and without explicit synchronization. 67 * 68 * The role for the producer and the consumer must be constant, i.e., the 69 * producer should always be on one thread and the consumer should always be on 70 * another thread. 71 * 72 * Some words about the inner workings of this class: 73 * - Capacity is fixed. Only one allocation is performed, in the constructor. 74 * When reading and writing, the return value of the method allows checking if 75 * the ring buffer is empty or full. 76 * - We always keep the read index at least one element ahead of the write 77 * index, so we can distinguish between an empty and a full ring buffer: an 78 * empty ring buffer is when the write index is at the same position as the 79 * read index. A full buffer is when the write index is exactly one position 80 * before the read index. 81 * - We synchronize updates to the read index after having read the data, and 82 * the write index after having written the data. This means that the each 83 * thread can only touch a portion of the buffer that is not touched by the 84 * other thread. 85 * - Callers are expected to provide buffers. 
 * When writing to the queue, elements are copied into the internal storage
 * from the buffer passed in. When reading from the queue, the user is
 * expected to provide a buffer. Because this is a ring buffer, data might not
 * be contiguous in memory; providing an external buffer to copy into is an
 * easy way to have linear data for further processing.
 */
template <typename T>
class SPSCRingBufferBase {
 public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation on the heap, but is the only allocation that
   * will happen for the life time of a `SPSCRingBufferBase`.
   *
   * @param aCapacity The maximum number of elements this ring buffer will
   * hold.
   */
  explicit SPSCRingBufferBase(int aCapacity)
      : mReadIndex(0),
        mWriteIndex(0),
        /* One more element to distinguish from empty and full buffer. */
        mCapacity(aCapacity + 1) {
    // `aCapacity + 1` above must not overflow int.
    MOZ_RELEASE_ASSERT(aCapacity != std::numeric_limits<int>::max());
    MOZ_RELEASE_ASSERT(mCapacity > 0);

    mData = std::make_unique<T[]>(StorageCapacity());

    // Make sure the indices and the storage initialized above are visible to
    // whichever threads become the producer and the consumer.
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }
  /**
   * Push `aCount` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param aCount The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  [[nodiscard]] int EnqueueDefault(int aCount) {
    return Enqueue(nullptr, aCount);
  }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param aElement The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  [[nodiscard]] int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
  /**
   * Push `aCount` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param aElements a pointer to a buffer containing at least `aCount`
   * elements. If `aElements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param aCount The number of elements to read from `aElements`.
   * @return The number of elements successfully copied from `aElements` and
   * inserted into the ring buffer.
   */
  [[nodiscard]] int Enqueue(T* aElements, int aCount) {
#ifdef DEBUG
    AssertCorrectThread(mProducerId);
#endif

    // Acquire the consumer's latest published read index. The write index is
    // only ever stored by this (producer) thread, so a relaxed load suffices.
    int rdIdx = mReadIndex.load(std::memory_order_acquire);
    int wrIdx = mWriteIndex.load(std::memory_order_relaxed);

    if (IsFull(rdIdx, wrIdx)) {
      return 0;
    }

    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);

    /* First part, from the write index to the end of the array. */
    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    /* Second part, from the beginning of the array */
    int secondPart = toWrite - firstPart;

    if (aElements) {
      detail::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(
          mData.get(), aElements + firstPart, secondPart);
    } else {
      detail::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
                                                    firstPart);
      detail::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    }

    // Publish the elements written above: this release store pairs with the
    // acquire load of mWriteIndex in Dequeue.
    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
                      std::memory_order_release);

    return toWrite;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, `count` element will be discarded.
   * @param count The maximum number of elements to Dequeue.
   * @return The number of elements written to `elements`.
   */
  [[nodiscard]] int Dequeue(T* elements, int count) {
#ifdef DEBUG
    AssertCorrectThread(mConsumerId);
#endif

    // Acquire the producer's latest published write index. The read index is
    // only ever stored by this (consumer) thread, so a relaxed load suffices.
    int wrIdx = mWriteIndex.load(std::memory_order_acquire);
    int rdIdx = mReadIndex.load(std::memory_order_relaxed);

    if (IsEmpty(rdIdx, wrIdx)) {
      return 0;
    }

    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), count);

    // The readable region may wrap around the end of the storage; copy it in
    // up to two contiguous chunks.
    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    int secondPart = toRead - firstPart;

    if (elements) {
      detail::MemoryOperations<T>::MoveOrCopy(elements, mData.get() + rdIdx,
                                              firstPart);
      detail::MemoryOperations<T>::MoveOrCopy(elements + firstPart, mData.get(),
                                              secondPart);
    }

    // Hand the consumed slots back to the producer: this release store pairs
    // with the acquire load of mReadIndex in Enqueue.
    mReadIndex.store(IncrementIndex(rdIdx, toRead), std::memory_order_release);

    return toRead;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * This can be less than the actual number of elements in the queue, since
   * the mWriteIndex is updated at the very end of the Enqueue method on the
   * producer thread, but consequently always returns a number of elements such
   * that a call to Dequeue return this number of elements.
   *
   * @return The number of available elements for reading.
   */
  int AvailableRead() const {
    return AvailableReadInternal(mReadIndex.load(std::memory_order_relaxed),
                                 mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the number of available elements for writing.
   *
   * This can be less than the actual number of slots that are available,
   * because mReadIndex is updated at the very end of the Dequeue method. It
   * always returns a number such that a call to Enqueue with this number will
   * succeed in enqueuing this number of elements.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int AvailableWrite() const {
    return AvailableWriteInternal(mReadIndex.load(std::memory_order_relaxed),
                                  mWriteIndex.load(std::memory_order_relaxed));
  }
  /**
   * Get the total Capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum Capacity of this ring buffer.
   */
  int Capacity() const { return StorageCapacity() - 1; }

  /**
   * Reset the consumer thread id to the current thread. The caller must
   * guarantee that the last call to Dequeue() on the previous consumer thread
   * has completed, and subsequent calls to Dequeue() will only happen on the
   * current thread.
   */
  void ResetConsumerThreadId() {
#ifdef DEBUG
    mConsumerId = std::this_thread::get_id();
#endif

    // When changing consumer from thread A to B, the last Dequeue on A (synced
    // by mReadIndex.store with memory_order_release) must be picked up by B
    // through an acquire operation.
    std::ignore = mReadIndex.load(std::memory_order_acquire);
  }

  /**
   * Reset the producer thread id to the current thread. The caller must
   * guarantee that the last call to Enqueue() on the previous producer thread
   * has completed, and subsequent calls to Enqueue() will only happen on the
   * current thread.
   */
  void ResetProducerThreadId() {
#ifdef DEBUG
    mProducerId = std::this_thread::get_id();
#endif

    // When changing producer from thread A to B, the last Enqueue on A (synced
    // by mWriteIndex.store with memory_order_release) must be picked up by B
    // through an acquire operation.
    std::ignore = mWriteIndex.load(std::memory_order_acquire);
  }

 private:
  /** Return true if the ring buffer is empty.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    return aWriteIndex == aReadIndex;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * This can be called from the consumer or the producer thread.
   *
   * @param aReadIndex the read index to consider
   * @param aWriteIndex the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool IsFull(int aReadIndex, int aWriteIndex) const {
    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
  }
  /**
   * Return the size of the backing storage. It is one more than the number of
   * elements that can be stored in the buffer (see Capacity()); the extra
   * slot distinguishes a full buffer from an empty one.
   *
   * This can be called from any thread.
   *
   * @return the size of the backing storage, i.e. Capacity() + 1.
   */
  int StorageCapacity() const { return mCapacity; }
  /**
   * Returns the number of elements available for reading.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableRead`.
   *
   * @return the number of available elements for reading.
   */
  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    if (aWriteIndex >= aReadIndex) {
      return aWriteIndex - aReadIndex;
    } else {
      // The readable region wraps around the end of the storage.
      return aWriteIndex + StorageCapacity() - aReadIndex;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * This can be called from the consumer or producer thread, but see the
   * comment in `AvailableWrite`.
   *
   * @return the number of elements that can be written into the array.
   */
  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = aReadIndex - aWriteIndex - 1;
    if (aWriteIndex >= aReadIndex) {
      rv += StorageCapacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * Incrementing `mWriteIndex` can be done on the producer thread.
   * Incrementing `mReadIndex` can be done on the consumer thread.
   *
   * @param aIndex the index to increment.
   * @param aIncrement the number by which `aIndex` is incremented.
   * @return the new index.
   */
  int IncrementIndex(int aIndex, int aIncrement) const {
    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
               aIndex < StorageCapacity());
    return (aIndex + aIncrement) % StorageCapacity();
  }
  /**
   * @brief This allows checking that Enqueue (resp. Dequeue) are always
   * called by the right thread.
   *
   * The role of the thread are assigned the first time they call Enqueue or
   * Dequeue, and cannot change, except by a ResetThreadId method.
   *
   * @param aId the id of the thread that has called the calling method first.
   */
#ifdef DEBUG
  static void AssertCorrectThread(std::thread::id& aId) {
    // First call from any thread claims the role for that thread.
    if (aId == std::thread::id()) {
      aId = std::this_thread::get_id();
      return;
    }
    MOZ_ASSERT(aId == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is. Stored by the consumer, loaded by
   * both threads. */
  std::atomic<int> mReadIndex;
  /** Index at which to write new elements. Stored by the producer, loaded by
   * both threads. The slot just before `mReadIndex` (modulo the storage size)
   * is never written, which is what distinguishes a full buffer from an empty
   * one. */
  std::atomic<int> mWriteIndex;
  /** Size of the backing storage: one more than the user-visible capacity. */
  const int mCapacity;
  /** Data storage, of size `mCapacity` (i.e. Capacity() + 1). */
  std::unique_ptr<T[]> mData;
#ifdef DEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id mConsumerId;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id mProducerId;
#endif
};

/**
 * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template <typename T>
using SPSCQueue = SPSCRingBufferBase<T>;

}  // namespace mozilla

#endif  // mozilla_LockFreeQueue_h