tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

BoundedMPSCQueue.h (13805B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 /*
      8 * Multiple Producer Single Consumer lock-free queue.
      9 * No allocation is performed outside of the constructor.
     10 *
     11 * This is a direct C++ port of
     12 * https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#1-235
     13 * except that we use an atomic uint64_t to get 15 slots in the ring buffer
     14 * (the Rust implementation has 5 slots; we want a bit more).
     15 * */
     16 
     17 #ifndef mozilla_BoundedMPSCQueue_h
     18 #define mozilla_BoundedMPSCQueue_h
     19 
     20 #include "mozilla/Assertions.h"
     21 #include <algorithm>
     22 #include <atomic>
     23 #include <cinttypes>
     24 #include <cstddef>
     25 #include <cstdint>
     26 #include <memory>
     27 #include <optional>
     28 
     29 namespace mozilla {
     30 
     31 static constexpr bool MPSC_DEBUG = false;
     32 
     33 /**
     34 * This data structure allows producing data from several threads, and consuming
     35 * it on one thread, safely and without performing memory allocations or
     36 * locking.
     37 *
     38 * The role for the producers and the consumer must be constant, i.e., the
     39 * producer should always be on one thread and the consumer should always be on
     40 * another thread.
     41 *
     42 * Some words about the inner workings of this class:
     43 * - Capacity is fixed. Only one allocation is performed, in the constructor.
     44 * - Maximum capacity is 15 elements, with 0 being used to denote an empty set.
     45 *   This is a hard limitation from encoding indexes within the atomic uint64_t.
     46 * - This is lock-free but not wait-free, it might spin a little until
     47 *   compare/exchange succeeds.
     48 * - There is no guarantee of forward progression for individual threads.
     49 * - This should be safe to use from a signal handler context.
     50 */
     51 template <typename T, size_t kCapacity>
     52 class MPSCRingBufferBase {
     53  static constexpr size_t kMaxCapacity = 16;
     54 
     55 public:
     56  explicit MPSCRingBufferBase() : mFree(0), mOccupied(0) {
     57    static_assert(kCapacity < kMaxCapacity);
     58 
     59    if constexpr (MPSC_DEBUG) {
     60      fprintf(stderr,
     61              "[%s] this=%p { mCapacity=%zu, mBits=%" PRIu64
     62              ", mMask=0x%" PRIx64 " }\n",
     63              __PRETTY_FUNCTION__, this, 1 + kCapacity, mBits, mMask);
     64    }
     65 
     66    // Leave one empty space in the queue, used to distinguish an empty queue
     67    // from a full one, as in the SPSCQueue.
     68    // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#126
     69    for (uint64_t i = 1; i < StorageCapacity(); ++i) {
     70      MarkSlot(mFree, i);
     71    }
     72 
     73    // This should be the only allocation performed, thus it cannot be performed
     74    // in a restricted context (e.g., signal handler, real-time thread)
     75    mData = std::make_unique<T[]>(Capacity());
     76 
     77    std::atomic_thread_fence(std::memory_order_seq_cst);
     78  }
     79 
     80  /**
     81   * @brief Put an element in the queue. The caller MUST check the return value
     82   * and maybe loop to try again (or drop if acceptable).
     83   *
     84   * First it attempts to acquire a slot (storage index) that is known to be
     85   * non used. If that is not successful then 0 is returned. If that is
     86   * successful, the slot is ours (it has been exclusively acquired) and data
     87   * can be copied into the ring buffer at that index.
     88   *
     89   * @param aElement The element to put in the queue.
     90   *
     91   * @return 0 if insertion could not be performed, inserted index otherwise
     92   */
     93  [[nodiscard]] int Send(T& aElement) {
     94    std::optional<uint64_t> empty_idx = UnmarkSlot(mFree);
     95    if (empty_idx.has_value()) {
     96      std::move(&aElement, &aElement + 1, &mData[*empty_idx - 1]);
     97      MarkSlot(mOccupied, *empty_idx);
     98      return *empty_idx;
     99    }
    100    return 0;
    101  }
    102 
    103  /**
    104   * Retrieve one element from the ring buffer, and copy it to
    105   * `aElement`, if non-null.
    106   *
    107   * It attempts to acquire a slot from the list of used ones. If that is not
    108   * successfull, then 0 is returned. Once a slot has been exclusively acquired,
    109   * data is copied from it into the non-null pointer passed in parameter.
    110   *
    111   * @param aElement A pointer to a `T` where data will be copied.
    112   *
    113   * @return The index from which data was copied, 0 if there was nothing in the
    114   * ring buffer.
    115   */
    116  [[nodiscard]] int Recv(T* aElement) {
    117    std::optional<uint64_t> idx = UnmarkSlot(mOccupied);
    118    if (idx.has_value()) {
    119      if (aElement) {
    120        std::move(&mData[*idx - 1], &mData[*idx], aElement);
    121      }
    122      MarkSlot(mFree, *idx);
    123      return *idx;
    124    }
    125    return 0;
    126  }
    127 
    128  constexpr size_t Capacity() const { return StorageCapacity() - 1; }
    129 
    130 private:
    131  /*
    132   * Get/Set manipulates the encoding within `aNumber` by storing the index as a
    133   * number and shifting it to the left (set) or right (get).
    134   *
    135   * Initial `aNumber` value is 0.
    136   *
    137   * Set() with first index value (1), we store the index on mBits and we shift
    138   * it to the left, e.g., as follows:
    139   *
    140   * aNumber=0b00000000000000000000000000000000000000000000000000000000000000
    141   * aIndex=0 aValue=1
    142   * aNumber=0b00000000000000000000000000000000000000000000000000000000000001
    143   * aIndex=1 aValue=33
    144   * aNumber=0b00000000000000000000000000000000000000000000000000000000100001
    145   * aIndex=2 aValue=801
    146   * aNumber=0b00000000000000000000000000000000000000000000000000001100100001
    147   * aIndex=3 aValue=17185
    148   * aNumber=0b00000000000000000000000000000000000000000000000100001100100001
    149   * aIndex=4 aValue=344865
    150   * aNumber=0b00000000000000000000000000000000000000000001010100001100100001
    151   * aIndex=5 aValue=6636321
    152   * aNumber=0b00000000000000000000000000000000000000011001010100001100100001
    153   * aIndex=6 aValue=124076833
    154   * aNumber=0b00000000000000000000000000000000000111011001010100001100100001
    155   * aIndex=7 aValue=2271560481
    156   * aNumber=0b00000000000000000000000000000010000111011001010100001100100001
    157   * aIndex=8 aValue=40926266145
    158   * aNumber=0b00000000000000000000000000100110000111011001010100001100100001
    159   * aIndex=9 aValue=728121033505
    160   * aNumber=0b00000000000000000000001010100110000111011001010100001100100001
    161   * aIndex=10 aValue=12822748939041
    162   * aNumber=0b00000000000000000010111010100110000111011001010100001100100001
    163   * aIndex=11 aValue=223928981472033
    164   * aNumber=0b00000000000000110010111010100110000111011001010100001100100001
    165   * aIndex=12 aValue=3883103678710561
    166   * aNumber=0b00000000001101110010111010100110000111011001010100001100100001
    167   * aIndex=13 aValue=66933498461897505
    168   * aNumber=0b00000011101101110010111010100110000111011001010100001100100001
    169   * aIndex=14 aValue=1147797409030816545
    170   */
    171  [[nodiscard]] uint64_t Get(uint64_t aNumber, uint64_t aIndex) {
    172    return (aNumber >> (mBits * aIndex)) & mMask;
    173  }
    174 
    175  [[nodiscard]] uint64_t Set(uint64_t aNumber, uint64_t aIndex,
    176                             uint64_t aValue) {
    177    return (aNumber & ~(mMask << (mBits * aIndex))) |
    178           (aValue << (mBits * aIndex));
    179  }
    180 
    181  /*
    182   * Enqueue a value in the ring buffer at aIndex.
    183   *
    184   * Takes the current uint64_t value from the atomic and try to acquire a non
    185   * used slot in the ring buffer. If unsuccessful, 0 is returned, otherwise
    186   * compute the new atomic value that holds the new state of usage of the
    187   * slots, and use compare/exchange to perform lock-free synchronization:
    188   * compare/exchanges succeeds when the current value and the modified one are
    189   * equal, reflecting an acquired lock. If another thread was concurrent to
    190   * this one, then it would fail to that operation, and go into the next
    191   * iteration of the loop to read the new state value from the atomic, and
    192   * acquire a different slot.
    193   *
    194   * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
    195   * thread exclusions
    196   *
    197   * @param aIndex the index where we want to enqueue. It should come from the
    198   * empty queue
    199   * */
    200  void MarkSlot(std::atomic<uint64_t>& aSlotStatus, uint64_t aIndex) {
    201    uint64_t current = aSlotStatus.load(std::memory_order_relaxed);
    202    do {
    203      // Attempts to find a slot that is available to enqueue, without
    204      // cross-thread synchronization
    205      auto empty = [&]() -> std::optional<uint64_t> {
    206        for (uint64_t i = 0; i < Capacity(); ++i) {
    207          if (Get(current, i) == 0) {
    208            return i;
    209          }
    210        }
    211        return {};
    212      }();
    213      if (!empty.has_value()) {
    214        // Rust does expect() which would panic:
    215        // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#62
    216        // If there's no empty place, then it would be up to the caller to deal
    217        // with that
    218        MOZ_CRASH("No empty slot available");
    219      }
    220      uint64_t modified = Set(current, *empty, aIndex);
    221      // This is where the lock-free synchronization happens ; if `current`
    222      // matches the content of `aSlotStatus`, then store `modified` in
    223      // aSlotStatus and succeeds. Upon success it means no other thread has
    224      // tried to change the same value at the same time, so the lock was safely
    225      // acquired.
    226      //
    227      // Upon failure, it means another thread tried at the same time to use the
    228      // same slot, so a new iteration of the loop needs to be executed to try
    229      // another slot.
    230      //
    231      // In case of success (`aSlotStatus`'s content is equal to `current`), we
    232      // require memory_order_release for the read-modify-write operation
    233      // because we want to make sure when acquiring a slot that any concurrent
    234      // thread performing a write had a chance to do it.
    235      //
    236      // In case of failure we require memory_order_relaxed for the load
    237      // operation because we dont need synchronization at that point.
    238      if (aSlotStatus.compare_exchange_weak(current, modified,
    239                                            std::memory_order_release,
    240                                            std::memory_order_relaxed)) {
    241        if constexpr (MPSC_DEBUG) {
    242          fprintf(stderr,
    243                  "[enqueue] modified=0x%" PRIx64 " => index=%" PRIu64 "\n",
    244                  modified, aIndex);
    245        }
    246        return;
    247      }
    248    } while (true);
    249  }
    250 
    251  /*
    252   * Dequeue a value from the ring buffer.
    253   *
    254   * Takes the current value from the uint64_t atomic and read the current index
    255   * out of it. If that index is 0 then we are facing a lack of slots and we
    256   * return, the caller MUST check this and deal with the situation. If the
    257   * index is non null we can try to acquire the matching slot in the ring
    258   * buffer thanks to the compare/exchange loop. When the compare/exchange call
    259   * succeeds, then the slot was acquired.
    260   *
    261   * @param aSlotStatus a uint64_t atomic that is used to perform lock-free
    262   * thread exclusions
    263   * */
    264  [[nodiscard]] std::optional<uint64_t> UnmarkSlot(
    265      std::atomic<uint64_t>& aSlotStatus) {
    266    uint64_t current = aSlotStatus.load(std::memory_order_relaxed);
    267    do {
    268      uint64_t index = current & mMask;
    269      if (index == 0) {
    270        // Return a None
    271        // https://docs.rs/signal-hook/0.3.17/src/signal_hook/low_level/channel.rs.html#77
    272        // If we return None while dequeuing on mFree then we are full and the
    273        // caller needs to deal with that.
    274        return {};
    275      }
    276      uint64_t modified = current >> mBits;
    277      // See the comment in MarkSlot for details
    278      //
    279      // In case of success (`aSlotStatus`'s content is equal to `current`), we
    280      // require memory_order_acquire for the read-modify-write operation
    281      // because we want to make sure when unmarking a slot that any concurrent
    282      // thread performing a read will see the value we are writing.
    283      //
    284      // In case of failure we require memory_order_relaxed for the load
    285      // operation because we don't need synchronization at that point.
    286      if (aSlotStatus.compare_exchange_weak(current, modified,
    287                                            std::memory_order_acquire,
    288                                            std::memory_order_relaxed)) {
    289        if constexpr (MPSC_DEBUG) {
    290          fprintf(stderr,
    291                  "[dequeue] current=0x%" PRIx64 " => index=%" PRIu64 "\n",
    292                  current, index);
    293        }
    294        return index;
    295      }
    296    } while (true);
    297    return {};
    298  }
    299 
    300  // Return the number of elements we can store within the ring buffer, whereas
    301  // Capacity() will return the amount of elements in mData, including the 0
    302  // value.
    303  [[nodiscard]] constexpr size_t StorageCapacity() const {
    304    return 1 + kCapacity;
    305  }
    306 
    307  // For the atomics below they are manipulated by Get()/Set(), and we are using
    308  // them to store the IDs of the ring buffer usage (empty/full).
    309  //
    310  // We use mBits bits to store an ID (so we are limited to 16 and 0 is
    311  // reserved) and append each of them to the atomics.
    312  //
    313  // A 0 value in one of those denotes we are full for the atomic, i.e.,
    314  // mFree=0 means we are full and mOccupied=0 means we are empty.
    315 
    316  // Holds the IDs of the free slots in the ring buffer
    317  std::atomic<uint64_t> mFree;
    318 
    319  // Holds the IDs of the occupied slots in the ring buffer
    320  std::atomic<uint64_t> mOccupied;
    321 
    322  // The actual ring buffer
    323  std::unique_ptr<T[]> mData;
    324 
    325  // How we are using the uint64_t atomic above to store the IDs of the ring
    326  // buffer.
    327  static constexpr uint64_t mBits = 4;
    328  static constexpr uint64_t mMask = 0b1111;
    329 };
    330 
/**
 * Public name of the `MPSCRingBufferBase` template. It is safe to use from
 * several producer threads and one consumer thread (roles never change),
 * without explicit synchronization nor allocation (outside of the constructor).
 */
template <typename T, size_t Capacity>
using BoundedMPSCQueue = MPSCRingBufferBase<T, Capacity>;
    338 
    339 }  // namespace mozilla
    340 
    341 #endif  // mozilla_BoundedMPSCQueue_h