UnboundedMPSCQueue.h (5638B)
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#ifndef mozilla_dom_UnboundedMPSCQueue_h
#define mozilla_dom_UnboundedMPSCQueue_h

#include <atomic>
#include <cstddef>

namespace mozilla {

// This class implements a lock-free multiple producer single consumer queue of
// fixed size log messages, with the following characteristics:
// - Unbounded (uses an intrusive linked list).
// - Push does not allocate: the caller hands over a `Message` it allocated
//   with `new`, and the queue takes ownership of it. Push can be called on any
//   thread.
// - Deallocates on Pop. Pop MUST always be called on the same thread for the
//   life-time of the queue.
//
// In our scenario, the producer threads are real-time, they can't block. The
// consumer thread runs every now and then and empties the queue to a log
// file, on disk.

// Size of one atomic pointer.
// NOTE(review): presumably the per-message space taken by the `mNext` link,
// for callers sizing their payload -- it is not used in this header; confirm
// against the users of this constant.
const std::size_t MPSC_MSG_RESERVED = sizeof(std::atomic<void*>);

template <typename T>
class UnboundedMPSCQueue {
 public:
  // A queue node: the link to the next (more recent) node, plus the payload.
  // Nodes handed to Push() must have been allocated with `new`, since the
  // queue releases them with `delete`.
  struct Message {
    Message() : mNext(nullptr) {}
    Message(const Message& aMessage) = delete;
    void operator=(const Message& aMessage) = delete;

    std::atomic<Message*> mNext;
    T data;
  };

  // Creates a new UnboundedMPSCQueue. Initially, the queue has a single
  // sentinel node, pointed to by both mHead and mTail.
  UnboundedMPSCQueue()
      // At construction, the initial message points to nullptr (it has no
      // successor). It is a sentinel node, that does not contain meaningful
      // data.
      // mTail is initialized from mHead: this relies on mHead being declared
      // before mTail below.
      : mHead(new Message()), mTail(mHead.load(std::memory_order_relaxed)) {}

  // Frees every node still owned by the queue: the current sentinel and all
  // the messages that were pushed but never popped.
  // NOTE(review): this assumes no producer is still inside Push() when the
  // queue is destroyed -- confirm with the owners of the queue.
  ~UnboundedMPSCQueue() {
    // Walk the list from the sentinel to the most recent node. The payloads
    // are simply destroyed with their node, there is no need to copy them
    // out through Pop().
    Message* node = mTail.load(std::memory_order_relaxed);
    while (node != nullptr) {
      Message* next = node->mNext.load(std::memory_order_acquire);
      delete node;
      node = next;
    }
  }

  // Appends aMessage at the end of the queue, and takes ownership of it.
  // aMessage must be non-null and allocated with `new`: it is dereferenced by
  // the following Push and later deleted by Pop or by the destructor.
  void Push(Message* aMessage) {
    // The next two non-commented lines are called A and B in this paragraph.
    // Producer threads i, i-1, etc. are numbered in the order they reached
    // A in time, thread i being the thread that has reached A first.
    // Atomically, on line A the new `mHead` is set to be the node that was
    // just allocated, with strong memory order. From now on, any thread
    // that reaches A will see that the node just allocated is
    // effectively the head of the list, and will make itself the new head
    // of the list.
    // In a bad case (when thread i executes A and then
    // is not scheduled for a long time), it is possible that thread i-1 and
    // subsequent threads create a seemingly disconnected set of nodes, but
    // they all have the correct value for the next node to set as their
    // mNext member on their respective stacks (in `prev`), and this is
    // always correct. When the scheduler resumes, and line B is executed,
    // the correct linkage is resumed.
    // Before line B, since the node that was the last element of the queue
    // still has an mNext of nullptr, Pop will not see the node added.
    // For line A, it's critical to have strong ordering both ways (since
    // it's going to possibly be read and written repeatedly by multiple
    // threads).
    // Line B can have weaker guarantees, it's only going to be written by a
    // single thread, and we just need to ensure it's read properly by a
    // single other one.
    Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
    prev->mNext.store(aMessage, std::memory_order_release);
  }

  // Copy the content of the first message of the queue to aOutput, and
  // frees the message. Returns true if there was a message, in which case
  // `aOutput` contains a valid value. If the queue was empty, returns false,
  // in which case `aOutput` is left untouched.
  bool Pop(T* aOutput) {
    // Similarly, in this paragraph, the two following lines are called A
    // and B, and threads are called thread i, i-1, etc. in order of
    // execution of line A.
    // On line A, the first element of the queue is acquired. It is simply a
    // sentinel node.
    // On line B, we acquire the node that has the data we want. If B is
    // null, then only the sentinel node was present in the queue, we can
    // safely return false.
    // mTail can be loaded with relaxed ordering, since it's not written nor
    // read by any other thread (this queue is single consumer).
    // mNext can be written to by one of the producers, so it's necessary to
    // ensure those writes are seen, hence the stricter ordering.
    Message* tail = mTail.load(std::memory_order_relaxed);
    Message* next = tail->mNext.load(std::memory_order_acquire);

    if (next == nullptr) {
      return false;
    }

    *aOutput = next->data;

    // Simply shift the queue one node further, so that the sentinel node is
    // now pointing to the correct most ancient node. It contains stale data,
    // but this data will never be read again.
    // It's only necessary to ensure the previous load on this thread is not
    // reordered past this line, so release ordering is sufficient here.
    mTail.store(next, std::memory_order_release);

    // This thread is now the only thing that points to `tail`, it can be
    // safely deleted.
    delete tail;

    return true;
  }

 private:
  // An atomic pointer to the most recent message in the queue.
  std::atomic<Message*> mHead;
  // An atomic pointer to a sentinel node, that points to the oldest message
  // in the queue.
  std::atomic<Message*> mTail;

  UnboundedMPSCQueue(const UnboundedMPSCQueue&) = delete;
  void operator=(const UnboundedMPSCQueue&) = delete;
};

}  // namespace mozilla

#endif  // mozilla_dom_UnboundedMPSCQueue_h