tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

SPSCQueue.h (15182B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 /* Single producer single consumer lock-free and wait-free queue. */
      8 
      9 #ifndef mozilla_LockFreeQueue_h
     10 #define mozilla_LockFreeQueue_h
     11 
     12 #include "mozilla/Assertions.h"
     13 #include "mozilla/PodOperations.h"
     14 #include <algorithm>
     15 #include <atomic>
     16 #include <cstddef>
     17 #include <limits>
     18 #include <memory>
     19 #include <thread>
     20 #include <type_traits>
     21 
     22 namespace mozilla {
     23 
     24 namespace detail {
     25 template <typename T, bool IsPod = std::is_trivial<T>::value>
     26 struct MemoryOperations {
     27  /**
     28   * This allows zeroing (using memset) or default-constructing a number of
     29   * elements calling the constructors if necessary.
     30   */
     31  static void ConstructDefault(T* aDestination, size_t aCount);
     32  /**
     33   * This allows either moving (if T supports it) or copying a number of
     34   * elements from a `aSource` pointer to a `aDestination` pointer.
     35   * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
     36   * constructors and destructors are called in a loop.
     37   */
     38  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
     39 };
     40 
     41 template <typename T>
     42 struct MemoryOperations<T, true> {
     43  static void ConstructDefault(T* aDestination, size_t aCount) {
     44    PodZero(aDestination, aCount);
     45  }
     46  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
     47    PodCopy(aDestination, aSource, aCount);
     48  }
     49 };
     50 
     51 template <typename T>
     52 struct MemoryOperations<T, false> {
     53  static void ConstructDefault(T* aDestination, size_t aCount) {
     54    for (size_t i = 0; i < aCount; i++) {
     55      aDestination[i] = T();
     56    }
     57  }
     58  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) {
     59    std::move(aSource, aSource + aCount, aDestination);
     60  }
     61 };
     62 }  // namespace detail
     63 
     64 /**
     65 * This data structure allows producing data from one thread, and consuming it
     66 * on another thread, safely and without explicit synchronization.
     67 *
     68 * The role for the producer and the consumer must be constant, i.e., the
     69 * producer should always be on one thread and the consumer should always be on
     70 * another thread.
     71 *
     72 * Some words about the inner workings of this class:
     73 * - Capacity is fixed. Only one allocation is performed, in the constructor.
     74 *   When reading and writing, the return value of the method allows checking if
     75 *   the ring buffer is empty or full.
     76 * - We always keep the read index at least one element ahead of the write
     77 *   index, so we can distinguish between an empty and a full ring buffer: an
     78 *   empty ring buffer is when the write index is at the same position as the
     79 *   read index. A full buffer is when the write index is exactly one position
     80 *   before the read index.
     81 * - We synchronize updates to the read index after having read the data, and
     82 *   the write index after having written the data. This means that the each
     83 *   thread can only touch a portion of the buffer that is not touched by the
     84 *   other thread.
     85 * - Callers are expected to provide buffers. When writing to the queue,
     86 *   elements are copied into the internal storage from the buffer passed in.
     87 *   When reading from the queue, the user is expected to provide a buffer.
     88 *   Because this is a ring buffer, data might not be contiguous in memory;
     89 *   providing an external buffer to copy into is an easy way to have linear
     90 *   data for further processing.
     91 */
     92 template <typename T>
     93 class SPSCRingBufferBase {
     94 public:
     95  /**
     96   * Constructor for a ring buffer.
     97   *
     98   * This performs an allocation on the heap, but is the only allocation that
     99   * will happen for the life time of a `SPSCRingBufferBase`.
    100   *
    101   * @param Capacity The maximum number of element this ring buffer will hold.
    102   */
    103  explicit SPSCRingBufferBase(int aCapacity)
    104      : mReadIndex(0),
    105        mWriteIndex(0),
    106        /* One more element to distinguish from empty and full buffer. */
    107        mCapacity(aCapacity + 1) {
    108    MOZ_RELEASE_ASSERT(aCapacity != std::numeric_limits<int>::max());
    109    MOZ_RELEASE_ASSERT(mCapacity > 0);
    110 
    111    mData = std::make_unique<T[]>(StorageCapacity());
    112 
    113    std::atomic_thread_fence(std::memory_order_seq_cst);
    114  }
    115  /**
    116   * Push `aCount` zero or default constructed elements in the array.
    117   *
    118   * Only safely called on the producer thread.
    119   *
    120   * @param count The number of elements to enqueue.
    121   * @return The number of element enqueued.
    122   */
    123  [[nodiscard]] int EnqueueDefault(int aCount) {
    124    return Enqueue(nullptr, aCount);
    125  }
    126  /**
    127   * @brief Put an element in the queue.
    128   *
    129   * Only safely called on the producer thread.
    130   *
    131   * @param element The element to put in the queue.
    132   *
    133   * @return 1 if the element was inserted, 0 otherwise.
    134   */
    135  [[nodiscard]] int Enqueue(T& aElement) { return Enqueue(&aElement, 1); }
    136  /**
    137   * Push `aCount` elements in the ring buffer.
    138   *
    139   * Only safely called on the producer thread.
    140   *
    141   * @param elements a pointer to a buffer containing at least `count` elements.
    142   * If `elements` is nullptr, zero or default constructed elements are enqueud.
    143   * @param count The number of elements to read from `elements`
    144   * @return The number of elements successfully coped from `elements` and
    145   * inserted into the ring buffer.
    146   */
    147  [[nodiscard]] int Enqueue(T* aElements, int aCount) {
    148 #ifdef DEBUG
    149    AssertCorrectThread(mProducerId);
    150 #endif
    151 
    152    int rdIdx = mReadIndex.load(std::memory_order_acquire);
    153    int wrIdx = mWriteIndex.load(std::memory_order_relaxed);
    154 
    155    if (IsFull(rdIdx, wrIdx)) {
    156      return 0;
    157    }
    158 
    159    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);
    160 
    161    /* First part, from the write index to the end of the array. */
    162    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
    163    /* Second part, from the beginning of the array */
    164    int secondPart = toWrite - firstPart;
    165 
    166    if (aElements) {
    167      detail::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements,
    168                                              firstPart);
    169      detail::MemoryOperations<T>::MoveOrCopy(
    170          mData.get(), aElements + firstPart, secondPart);
    171    } else {
    172      detail::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx,
    173                                                    firstPart);
    174      detail::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
    175    }
    176 
    177    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
    178                      std::memory_order_release);
    179 
    180    return toWrite;
    181  }
    182  /**
    183   * Retrieve at most `count` elements from the ring buffer, and copy them to
    184   * `elements`, if non-null.
    185   *
    186   * Only safely called on the consumer side.
    187   *
    188   * @param elements A pointer to a buffer with space for at least `count`
    189   * elements. If `elements` is `nullptr`, `count` element will be discarded.
    190   * @param count The maximum number of elements to Dequeue.
    191   * @return The number of elements written to `elements`.
    192   */
    193  [[nodiscard]] int Dequeue(T* elements, int count) {
    194 #ifdef DEBUG
    195    AssertCorrectThread(mConsumerId);
    196 #endif
    197 
    198    int wrIdx = mWriteIndex.load(std::memory_order_acquire);
    199    int rdIdx = mReadIndex.load(std::memory_order_relaxed);
    200 
    201    if (IsEmpty(rdIdx, wrIdx)) {
    202      return 0;
    203    }
    204 
    205    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), count);
    206 
    207    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
    208    int secondPart = toRead - firstPart;
    209 
    210    if (elements) {
    211      detail::MemoryOperations<T>::MoveOrCopy(elements, mData.get() + rdIdx,
    212                                              firstPart);
    213      detail::MemoryOperations<T>::MoveOrCopy(elements + firstPart, mData.get(),
    214                                              secondPart);
    215    }
    216 
    217    mReadIndex.store(IncrementIndex(rdIdx, toRead), std::memory_order_release);
    218 
    219    return toRead;
    220  }
    221  /**
    222   * Get the number of available elements for consuming.
    223   *
    224   * This can be less than the actual number of elements in the queue, since the
    225   * mWriteIndex is updated at the very end of the Enqueue method on the
    226   * producer thread, but consequently always returns a number of elements such
    227   * that a call to Dequeue return this number of elements.
    228   *
    229   * @return The number of available elements for reading.
    230   */
    231  int AvailableRead() const {
    232    return AvailableReadInternal(mReadIndex.load(std::memory_order_relaxed),
    233                                 mWriteIndex.load(std::memory_order_relaxed));
    234  }
    235  /**
    236   * Get the number of available elements for writing.
    237   *
    238   * This can be less than than the actual number of slots that are available,
    239   * because mReadIndex is updated at the very end of the Deque method. It
    240   * always returns a number such that a call to Enqueue with this number will
    241   * succeed in enqueuing this number of elements.
    242   *
    243   * @return The number of empty slots in the buffer, available for writing.
    244   */
    245  int AvailableWrite() const {
    246    return AvailableWriteInternal(mReadIndex.load(std::memory_order_relaxed),
    247                                  mWriteIndex.load(std::memory_order_relaxed));
    248  }
    249  /**
    250   * Get the total Capacity, for this ring buffer.
    251   *
    252   * Can be called safely on any thread.
    253   *
    254   * @return The maximum Capacity of this ring buffer.
    255   */
    256  int Capacity() const { return StorageCapacity() - 1; }
    257 
    258  /**
    259   * Reset the consumer thread id to the current thread. The caller must
    260   * guarantee that the last call to Dequeue() on the previous consumer thread
    261   * has completed, and subsequent calls to Dequeue() will only happen on the
    262   * current thread.
    263   */
    264  void ResetConsumerThreadId() {
    265 #ifdef DEBUG
    266    mConsumerId = std::this_thread::get_id();
    267 #endif
    268 
    269    // When changing consumer from thread A to B, the last Dequeue on A (synced
    270    // by mReadIndex.store with memory_order_release) must be picked up by B
    271    // through an acquire operation.
    272    std::ignore = mReadIndex.load(std::memory_order_acquire);
    273  }
    274 
    275  /**
    276   * Reset the producer thread id to the current thread. The caller must
    277   * guarantee that the last call to Enqueue() on the previous consumer thread
    278   * has completed, and subsequent calls to Dequeue() will only happen on the
    279   * current thread.
    280   */
    281  void ResetProducerThreadId() {
    282 #ifdef DEBUG
    283    mProducerId = std::this_thread::get_id();
    284 #endif
    285 
    286    // When changing producer from thread A to B, the last Enqueue on A (synced
    287    // by mWriteIndex.store with memory_order_release) must be picked up by B
    288    // through an acquire operation.
    289    std::ignore = mWriteIndex.load(std::memory_order_acquire);
    290  }
    291 
    292 private:
    293  /** Return true if the ring buffer is empty.
    294   *
    295   * This can be called from the consumer or the producer thread.
    296   *
    297   * @param aReadIndex the read index to consider
    298   * @param writeIndex the write index to consider
    299   * @return true if the ring buffer is empty, false otherwise.
    300   **/
    301  bool IsEmpty(int aReadIndex, int aWriteIndex) const {
    302    return aWriteIndex == aReadIndex;
    303  }
    304  /** Return true if the ring buffer is full.
    305   *
    306   * This happens if the write index is exactly one element behind the read
    307   * index.
    308   *
    309   * This can be called from the consummer or the producer thread.
    310   *
    311   * @param aReadIndex the read index to consider
    312   * @param writeIndex the write index to consider
    313   * @return true if the ring buffer is full, false otherwise.
    314   **/
    315  bool IsFull(int aReadIndex, int aWriteIndex) const {
    316    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
    317  }
    318  /**
    319   * Return the size of the storage. It is one more than the number of elements
    320   * that can be stored in the buffer.
    321   *
    322   * This can be called from any thread.
    323   *
    324   * @return the number of elements that can be stored in the buffer.
    325   */
    326  int StorageCapacity() const { return mCapacity; }
    327  /**
    328   * Returns the number of elements available for reading.
    329   *
    330   * This can be called from the consummer or producer thread, but see the
    331   * comment in `AvailableRead`.
    332   *
    333   * @return the number of available elements for reading.
    334   */
    335  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const {
    336    if (aWriteIndex >= aReadIndex) {
    337      return aWriteIndex - aReadIndex;
    338    } else {
    339      return aWriteIndex + StorageCapacity() - aReadIndex;
    340    }
    341  }
    342  /**
    343   * Returns the number of empty elements, available for writing.
    344   *
    345   * This can be called from the consummer or producer thread, but see the
    346   * comment in `AvailableWrite`.
    347   *
    348   * @return the number of elements that can be written into the array.
    349   */
    350  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const {
    351    /* We subtract one element here to always keep at least one sample
    352     * free in the buffer, to distinguish between full and empty array. */
    353    int rv = aReadIndex - aWriteIndex - 1;
    354    if (aWriteIndex >= aReadIndex) {
    355      rv += StorageCapacity();
    356    }
    357    return rv;
    358  }
    359  /**
    360   * Increments an index, wrapping it around the storage.
    361   *
    362   * Incrementing `mWriteIndex` can be done on the producer thread.
    363   * Incrementing `mReadIndex` can be done on the consummer thread.
    364   *
    365   * @param index a reference to the index to increment.
    366   * @param increment the number by which `index` is incremented.
    367   * @return the new index.
    368   */
    369  int IncrementIndex(int aIndex, int aIncrement) const {
    370    MOZ_ASSERT(aIncrement >= 0 && aIncrement < StorageCapacity() &&
    371               aIndex < StorageCapacity());
    372    return (aIndex + aIncrement) % StorageCapacity();
    373  }
    374  /**
    375   * @brief This allows checking that Enqueue (resp. Dequeue) are always
    376   * called by the right thread.
    377   *
    378   * The role of the thread are assigned the first time they call Enqueue or
    379   * Dequeue, and cannot change, except by a ResetThreadId method.
    380   *
    381   * @param id the id of the thread that has called the calling method first.
    382   */
    383 #ifdef DEBUG
    384  static void AssertCorrectThread(std::thread::id& aId) {
    385    if (aId == std::thread::id()) {
    386      aId = std::this_thread::get_id();
    387      return;
    388    }
    389    MOZ_ASSERT(aId == std::this_thread::get_id());
    390  }
    391 #endif
    392  /** Index at which the oldest element is. */
    393  std::atomic<int> mReadIndex;
    394  /** Index at which to write new elements. `mWriteIndex` is always at
    395   * least one element ahead of `mReadIndex`. */
    396  std::atomic<int> mWriteIndex;
    397  /** Maximum number of elements that can be stored in the ring buffer. */
    398  const int mCapacity;
    399  /** Data storage, of size `mCapacity + 1` */
    400  std::unique_ptr<T[]> mData;
    401 #ifdef DEBUG
    402  /** The id of the only thread that is allowed to read from the queue. */
    403  mutable std::thread::id mConsumerId;
    404  /** The id of the only thread that is allowed to write from the queue. */
    405  mutable std::thread::id mProducerId;
    406 #endif
    407 };
    408 
    409 /**
    410 * Instantiation of the `SPSCRingBufferBase` type. This is safe to use
    411 * from two threads, one producer, one consumer (that never change role),
    412 * without explicit synchronization.
    413 */
    414 template <typename T>
    415 using SPSCQueue = SPSCRingBufferBase<T>;
    416 
    417 }  // namespace mozilla
    418 
    419 #endif  // mozilla_LockFreeQueue_h