tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

cubeb_ringbuffer.h (15215B)


      1 /*
      2 * Copyright © 2016 Mozilla Foundation
      3 *
      4 * This program is made available under an ISC-style license.  See the
      5 * accompanying file LICENSE for details.
      6 */
      7 
      8 #ifndef CUBEB_RING_BUFFER_H
      9 #define CUBEB_RING_BUFFER_H
     10 
     11 #include "cubeb_utils.h"
     12 #include <algorithm>
     13 #include <atomic>
     14 #include <cstdint>
     15 #include <memory>
     16 #include <thread>
     17 
     18 /**
     19 * Single producer single consumer lock-free and wait-free ring buffer.
     20 *
     21 * This data structure allows producing data from one thread, and consuming it
     22 * on another thread, safely and without explicit synchronization. If used on
     23 * two threads, this data structure uses atomics for thread safety. It is
     24 * possible to disable the use of atomics at compile time and only use this data
     25 * structure on one thread.
     26 *
     27 * The role for the producer and the consumer must be constant, i.e., the
     28 * producer should always be on one thread and the consumer should always be on
     29 * another thread.
     30 *
     31 * Some words about the inner workings of this class:
     32 * - Capacity is fixed. Only one allocation is performed, in the constructor.
     33 *   When reading and writing, the return value of the method allows checking if
     34 *   the ring buffer is empty or full.
     35 * - We always keep the read index at least one element ahead of the write
     36 *   index, so we can distinguish between an empty and a full ring buffer: an
     37 *   empty ring buffer is when the write index is at the same position as the
     38 *   read index. A full buffer is when the write index is exactly one position
     39 *   before the read index.
     40 * - We synchronize updates to the read index after having read the data, and
     41 *   the write index after having written the data. This means that the each
     42 *   thread can only touch a portion of the buffer that is not touched by the
     43 *   other thread.
     44 * - Callers are expected to provide buffers. When writing to the queue,
     45 *   elements are copied into the internal storage from the buffer passed in.
     46 *   When reading from the queue, the user is expected to provide a buffer.
     47 *   Because this is a ring buffer, data might not be contiguous in memory,
     48 *   providing an external buffer to copy into is an easy way to have linear
     49 *   data for further processing.
     50 */
     51 template <typename T> class ring_buffer_base {
     52 public:
     53  /**
     54   * Constructor for a ring buffer.
     55   *
     56   * This performs an allocation, but is the only allocation that will happen
     57   * for the life time of a `ring_buffer_base`.
     58   *
     59   * @param capacity The maximum number of element this ring buffer will hold.
     60   */
     61  ring_buffer_base(int capacity)
     62      /* One more element to distinguish from empty and full buffer. */
     63      : capacity_(capacity + 1)
     64  {
     65    assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
     66           "buffer too large for the type of index used.");
     67    assert(capacity_ > 0);
     68 
     69    data_.reset(new T[storage_capacity()]);
     70    /* If this queue is using atomics, initializing those members as the last
     71     * action in the constructor acts as a full barrier, and allow capacity() to
     72     * be thread-safe. */
     73    write_index_ = 0;
     74    read_index_ = 0;
     75  }
     76  /**
     77   * Push `count` zero or default constructed elements in the array.
     78   *
     79   * Only safely called on the producer thread.
     80   *
     81   * @param count The number of elements to enqueue.
     82   * @return The number of element enqueued.
     83   */
     84  int enqueue_default(int count) { return enqueue(nullptr, count); }
     85  /**
     86   * @brief Put an element in the queue
     87   *
     88   * Only safely called on the producer thread.
     89   *
     90   * @param element The element to put in the queue.
     91   *
     92   * @return 1 if the element was inserted, 0 otherwise.
     93   */
     94  int enqueue(T & element) { return enqueue(&element, 1); }
     95  /**
     96   * Push `count` elements in the ring buffer.
     97   *
     98   * Only safely called on the producer thread.
     99   *
    100   * @param elements a pointer to a buffer containing at least `count` elements.
    101   * If `elements` is nullptr, zero or default constructed elements are
    102   * enqueued.
    103   * @param count The number of elements to read from `elements`
    104   * @return The number of elements successfully coped from `elements` and
    105   * inserted into the ring buffer.
    106   */
    107  int enqueue(T * elements, int count)
    108  {
    109 #ifndef NDEBUG
    110    assert_correct_thread(producer_id);
    111 #endif
    112 
    113    int wr_idx = write_index_.load(std::memory_order_relaxed);
    114    int rd_idx = read_index_.load(std::memory_order_acquire);
    115 
    116    if (full_internal(rd_idx, wr_idx)) {
    117      return 0;
    118    }
    119 
    120    int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);
    121 
    122    /* First part, from the write index to the end of the array. */
    123    int first_part = std::min(storage_capacity() - wr_idx, to_write);
    124    /* Second part, from the beginning of the array */
    125    int second_part = to_write - first_part;
    126 
    127    if (elements) {
    128      Copy(data_.get() + wr_idx, elements, first_part);
    129      Copy(data_.get(), elements + first_part, second_part);
    130    } else {
    131      ConstructDefault(data_.get() + wr_idx, first_part);
    132      ConstructDefault(data_.get(), second_part);
    133    }
    134 
    135    write_index_.store(increment_index(wr_idx, to_write),
    136                       std::memory_order_release);
    137 
    138    return to_write;
    139  }
    140  /**
    141   * Retrieve at most `count` elements from the ring buffer, and copy them to
    142   * `elements`, if non-null.
    143   *
    144   * Only safely called on the consumer side.
    145   *
    146   * @param elements A pointer to a buffer with space for at least `count`
    147   * elements. If `elements` is `nullptr`, `count` element will be discarded.
    148   * @param count The maximum number of elements to dequeue.
    149   * @return The number of elements written to `elements`.
    150   */
    151  int dequeue(T * elements, int count)
    152  {
    153 #ifndef NDEBUG
    154    assert_correct_thread(consumer_id);
    155 #endif
    156 
    157    int rd_idx = read_index_.load(std::memory_order_relaxed);
    158    int wr_idx = write_index_.load(std::memory_order_acquire);
    159 
    160    if (empty_internal(rd_idx, wr_idx)) {
    161      return 0;
    162    }
    163 
    164    int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);
    165 
    166    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    167    int second_part = to_read - first_part;
    168 
    169    if (elements) {
    170      Copy(elements, data_.get() + rd_idx, first_part);
    171      Copy(elements + first_part, data_.get(), second_part);
    172    }
    173 
    174    read_index_.store(increment_index(rd_idx, to_read),
    175                      std::memory_order_release);
    176 
    177    return to_read;
    178  }
    179  /**
    180   * Get the number of available element for consuming.
    181   *
    182   * Only safely called on the consumer thread.
    183   *
    184   * @return The number of available elements for reading.
    185   */
    186  int available_read() const
    187  {
    188 #ifndef NDEBUG
    189    assert_correct_thread(consumer_id);
    190 #endif
    191    return available_read_internal(
    192        read_index_.load(std::memory_order_relaxed),
    193        write_index_.load(std::memory_order_acquire));
    194  }
    195  /**
    196   * Get the number of available elements for consuming.
    197   *
    198   * Only safely called on the producer thread.
    199   *
    200   * @return The number of empty slots in the buffer, available for writing.
    201   */
    202  int available_write() const
    203  {
    204 #ifndef NDEBUG
    205    assert_correct_thread(producer_id);
    206 #endif
    207    return available_write_internal(
    208        read_index_.load(std::memory_order_acquire),
    209        write_index_.load(std::memory_order_relaxed));
    210  }
    211  /**
    212   * Get the total capacity, for this ring buffer.
    213   *
    214   * Can be called safely on any thread.
    215   *
    216   * @return The maximum capacity of this ring buffer.
    217   */
    218  int capacity() const { return storage_capacity() - 1; }
    219  /**
    220   * Reset the consumer and producer thread identifier, in case the thread are
    221   * being changed. This has to be externally synchronized. This is no-op when
    222   * asserts are disabled.
    223   */
    224  void reset_thread_ids()
    225  {
    226 #ifndef NDEBUG
    227    consumer_id = producer_id = std::thread::id();
    228 #endif
    229  }
    230 
    231 private:
    232  /** Return true if the ring buffer is empty.
    233   *
    234   * @param read_index the read index to consider
    235   * @param write_index the write index to consider
    236   * @return true if the ring buffer is empty, false otherwise.
    237   **/
    238  bool empty_internal(int read_index, int write_index) const
    239  {
    240    return write_index == read_index;
    241  }
    242  /** Return true if the ring buffer is full.
    243   *
    244   * This happens if the write index is exactly one element behind the read
    245   * index.
    246   *
    247   * @param read_index the read index to consider
    248   * @param write_index the write index to consider
    249   * @return true if the ring buffer is full, false otherwise.
    250   **/
    251  bool full_internal(int read_index, int write_index) const
    252  {
    253    return (write_index + 1) % storage_capacity() == read_index;
    254  }
    255  /**
    256   * Return the size of the storage. It is one more than the number of elements
    257   * that can be stored in the buffer.
    258   *
    259   * @return the number of elements that can be stored in the buffer.
    260   */
    261  int storage_capacity() const { return capacity_; }
    262  /**
    263   * Returns the number of elements available for reading.
    264   *
    265   * @return the number of available elements for reading.
    266   */
    267  int available_read_internal(int read_index, int write_index) const
    268  {
    269    if (write_index >= read_index) {
    270      return write_index - read_index;
    271    } else {
    272      return write_index + storage_capacity() - read_index;
    273    }
    274  }
    275  /**
    276   * Returns the number of empty elements, available for writing.
    277   *
    278   * @return the number of elements that can be written into the array.
    279   */
    280  int available_write_internal(int read_index, int write_index) const
    281  {
    282    /* We substract one element here to always keep at least one sample
    283     * free in the buffer, to distinguish between full and empty array. */
    284    int rv = read_index - write_index - 1;
    285    if (write_index >= read_index) {
    286      rv += storage_capacity();
    287    }
    288    return rv;
    289  }
    290  /**
    291   * Increments an index, wrapping it around the storage.
    292   *
    293   * @param index a reference to the index to increment.
    294   * @param increment the number by which `index` is incremented.
    295   * @return the new index.
    296   */
    297  int increment_index(int index, int increment) const
    298  {
    299    assert(increment >= 0);
    300    return (index + increment) % storage_capacity();
    301  }
    302  /**
    303   * @brief This allows checking that enqueue (resp. dequeue) are always called
    304   * by the right thread.
    305   *
    306   * @param id the id of the thread that has called the calling method first.
    307   */
    308 #ifndef NDEBUG
    309  static void assert_correct_thread(std::thread::id & id)
    310  {
    311    if (id == std::thread::id()) {
    312      id = std::this_thread::get_id();
    313      return;
    314    }
    315    assert(id == std::this_thread::get_id());
    316  }
    317 #endif
    318  /** Index at which the oldest element is at, in samples. */
    319  std::atomic<int> read_index_;
    320  /** Index at which to write new elements. `write_index` is always at
    321   * least one element ahead of `read_index_`. */
    322  std::atomic<int> write_index_;
    323  /** Maximum number of elements that can be stored in the ring buffer. */
    324  const int capacity_;
    325  /** Data storage */
    326  std::unique_ptr<T[]> data_;
    327 #ifndef NDEBUG
    328  /** The id of the only thread that is allowed to read from the queue. */
    329  mutable std::thread::id consumer_id;
    330  /** The id of the only thread that is allowed to write from the queue. */
    331  mutable std::thread::id producer_id;
    332 #endif
    333 };
    334 
    335 /**
    336 * Adapter for `ring_buffer_base` that exposes an interface in frames.
    337 */
    338 template <typename T> class audio_ring_buffer_base {
    339 public:
    340  /**
    341   * @brief Constructor.
    342   *
    343   * @param channel_count       Number of channels.
    344   * @param capacity_in_frames  The capacity in frames.
    345   */
    346  audio_ring_buffer_base(int channel_count, int capacity_in_frames)
    347      : channel_count(channel_count),
    348        ring_buffer(frames_to_samples(capacity_in_frames))
    349  {
    350    assert(channel_count > 0);
    351  }
    352  /**
    353   * @brief Enqueue silence.
    354   *
    355   * Only safely called on the producer thread.
    356   *
    357   * @param frame_count The number of frames of silence to enqueue.
    358   * @return  The number of frames of silence actually written to the queue.
    359   */
    360  int enqueue_default(int frame_count)
    361  {
    362    return samples_to_frames(
    363        ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
    364  }
    365  /**
    366   * @brief Enqueue `frames_count` frames of audio.
    367   *
    368   * Only safely called from the producer thread.
    369   *
    370   * @param [in] frames If non-null, the frames to enqueue.
    371   *                    Otherwise, silent frames are enqueued.
    372   * @param frame_count The number of frames to enqueue.
    373   *
    374   * @return The number of frames enqueued
    375   */
    376 
    377  int enqueue(T * frames, int frame_count)
    378  {
    379    return samples_to_frames(
    380        ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
    381  }
    382 
    383  /**
    384   * @brief Removes `frame_count` frames from the buffer, and
    385   *        write them to `frames` if it is non-null.
    386   *
    387   * Only safely called on the consumer thread.
    388   *
    389   * @param frames      If non-null, the frames are copied to `frames`.
    390   *                    Otherwise, they are dropped.
    391   * @param frame_count The number of frames to remove.
    392   *
    393   * @return  The number of frames actually dequeud.
    394   */
    395  int dequeue(T * frames, int frame_count)
    396  {
    397    return samples_to_frames(
    398        ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
    399  }
    400  /**
    401   * Get the number of available frames of audio for consuming.
    402   *
    403   * Only safely called on the consumer thread.
    404   *
    405   * @return The number of available frames of audio for reading.
    406   */
    407  int available_read() const
    408  {
    409    return samples_to_frames(ring_buffer.available_read());
    410  }
    411  /**
    412   * Get the number of available frames of audio for consuming.
    413   *
    414   * Only safely called on the producer thread.
    415   *
    416   * @return The number of empty slots in the buffer, available for writing.
    417   */
    418  int available_write() const
    419  {
    420    return samples_to_frames(ring_buffer.available_write());
    421  }
    422  /**
    423   * Get the total capacity, for this ring buffer.
    424   *
    425   * Can be called safely on any thread.
    426   *
    427   * @return The maximum capacity of this ring buffer.
    428   */
    429  int capacity() const { return samples_to_frames(ring_buffer.capacity()); }
    430 
    431 private:
    432  /**
    433   * @brief Frames to samples conversion.
    434   *
    435   * @param frames The number of frames.
    436   *
    437   * @return  A number of samples.
    438   */
    439  int frames_to_samples(int frames) const { return frames * channel_count; }
    440  /**
    441   * @brief Samples to frames conversion.
    442   *
    443   * @param samples The number of samples.
    444   *
    445   * @return  A number of frames.
    446   */
    447  int samples_to_frames(int samples) const { return samples / channel_count; }
    448  /** Number of channels of audio that will stream through this ring buffer. */
    449  int channel_count;
    450  /** The underlying ring buffer that is used to store the data. */
    451  ring_buffer_base<T> ring_buffer;
    452 };
    453 
    454 /**
    455 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
    456 * from two threads, one producer, one consumer (that never change role),
    457 * without explicit synchronization.
    458 */
    459 template <typename T> using lock_free_queue = ring_buffer_base<T>;
    460 /**
    461 * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
    462 * from two threads, one producer, one consumer (that never change role),
    463 * without explicit synchronization.
    464 */
    465 template <typename T>
    466 using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
    467 
    468 #endif // CUBEB_RING_BUFFER_H