tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

UnboundedMPSCQueue.h (5638B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 #ifndef mozilla_dom_UnboundedMPSCQueue_h
      6 #define mozilla_dom_UnboundedMPSCQueue_h
      7 
      8 namespace mozilla {
      9 
     10 // This class implements a lock-free multiple producer single consumer queue of
     11 // fixed size log messages, with the following characteristics:
     12 // - Unbounded (uses a intrinsic linked list)
     13 // - Allocates on Push. Push can be called on any thread.
     14 // - Deallocates on Pop. Pop MUST always be called on the same thread for the
     15 // life-time of the queue.
     16 //
     17 // In our scenario, the producer threads are real-time, they can't block. The
     18 // consummer thread runs every now and then and empties the queue to a log
     19 // file, on disk.
     20 const size_t MPSC_MSG_RESERVED = sizeof(std::atomic<void*>);
     21 
     22 template <typename T>
     23 class UnboundedMPSCQueue {
     24 public:
     25  struct Message {
     26    Message() { mNext.store(nullptr, std::memory_order_relaxed); }
     27    Message(const Message& aMessage) = delete;
     28    void operator=(const Message& aMessage) = delete;
     29 
     30    std::atomic<Message*> mNext;
     31    T data;
     32  };
     33 
     34  // Creates a new UnboundedMPSCQueue. Initially, the queue has a single
     35  // sentinel node, pointed to by both mHead and mTail.
     36  UnboundedMPSCQueue()
     37      // At construction, the initial message points to nullptr (it has no
     38      // successor). It is a sentinel node, that does not contain meaningful
     39      // data.
     40      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}
     41 
     42  ~UnboundedMPSCQueue() {
     43    Message dummy;
     44    while (Pop(&dummy.data)) {
     45    }
     46    Message* front = mHead.load(std::memory_order_relaxed);
     47    delete front;
     48  }
     49 
     50  void Push(UnboundedMPSCQueue<T>::Message* aMessage) {
     51    // The next two non-commented line are called A and B in this paragraph.
     52    // Producer threads i, i-1, etc. are numbered in the order they reached
     53    // A in time, thread i being the thread that has reached A first.
     54    // Atomically, on line A the new `mHead` is set to be the node that was
     55    // just allocated, with strong memory order. From now on, any thread
     56    // that reaches A will see that the node just allocated is
     57    // effectively the head of the list, and will make itself the new head
     58    // of the list.
     59    // In a bad case (when thread i executes A and then
     60    // is not scheduled for a long time), it is possible that thread i-1 and
     61    // subsequent threads create a seemingly disconnected set of nodes, but
     62    // they all have the correct value for the next node to set as their
     63    // mNext member on their respective stacks (in `prev`), and this is
     64    // always correct. When the scheduler resumes, and line B is executed,
     65    // the correct linkage is resumed.
     66    // Before line B, since mNext for the node was the last element of
     67    // the queue still has an mNext of nullptr, Pop will not see the node
     68    // added.
     69    // For line A, it's critical to have strong ordering both ways (since
     70    // it's going to possibly be read and write repeatidly by multiple
     71    // threads)
     72    // Line B can have weaker guarantees, it's only going to be written by a
     73    // single thread, and we just need to ensure it's read properly by a
     74    // single other one.
     75    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
     76    prev->mNext.store(aMessage, std::memory_order_release);
     77  }
     78 
     79  // Copy the content of the first message of the queue to aOutput, and
     80  // frees the message. Returns true if there was a message, in which case
     81  // `aOutput` contains a valid value. If the queue was empty, returns false,
     82  // in which case `aOutput` is left untouched.
     83  bool Pop(T* aOutput) {
     84    // Similarly, in this paragraph, the two following lines are called A
     85    // and B, and threads are called thread i, i-1, etc. in order of
     86    // execution of line A.
     87    // On line A, the first element of the queue is acquired. It is simply a
     88    // sentinel node.
     89    // On line B, we acquire the node that has the data we want. If B is
     90    // null, then only the sentinel node was present in the queue, we can
     91    // safely return false.
     92    // mTail can be loaded with relaxed ordering, since it's not written nor
     93    // read by any other thread (this queue is single consumer).
     94    // mNext can be written to by one of the producer, so it's necessary to
     95    // ensure those writes are seen, hence the stricter ordering.
     96    Message* tail = mTail.load(std::memory_order_relaxed);
     97    Message* next = tail->mNext.load(std::memory_order_acquire);
     98 
     99    if (next == nullptr) {
    100      return false;
    101    }
    102 
    103    *aOutput = next->data;
    104 
    105    // Simply shift the queue one node further, so that the sentinel node is
    106    // now pointing to the correct most ancient node. It contains stale data,
    107    // but this data will never be read again.
    108    // It's only necessary to ensure the previous load on this thread is not
    109    // reordered past this line, so release ordering is sufficient here.
    110    mTail.store(next, std::memory_order_release);
    111 
    112    // This thread is now the only thing that points to `tail`, it can be
    113    // safely deleted.
    114    delete tail;
    115 
    116    return true;
    117  }
    118 
    119 private:
    120  // An atomic pointer to the most recent message in the queue.
    121  std::atomic<Message*> mHead;
    122  // An atomic pointer to a sentinel node, that points to the oldest message
    123  // in the queue.
    124  std::atomic<Message*> mTail;
    125 
    126  UnboundedMPSCQueue(const UnboundedMPSCQueue&) = delete;
    127  void operator=(const UnboundedMPSCQueue&) = delete;
    128 };
    129 
    130 }  // namespace mozilla
    131 
    132 #endif  // mozilla_dom_UnboundedMPSCQueue_h