tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

MultiWriterQueue.h (19910B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #ifndef mozilla_MultiWriterQueue_h_
      8 #define mozilla_MultiWriterQueue_h_
      9 
     10 #include <cstdint>
     11 #include <utility>
     12 
     13 #include "RollingNumber.h"
     14 #include "mozilla/Atomics.h"
     15 #include "mozilla/MemoryReporting.h"
     16 #include "mozilla/Mutex.h"
     17 #include "prthread.h"
     18 
     19 namespace mozilla {
     20 
     21 // Default reader locking strategy, using a mutex to ensure that concurrent
     22 // PopAll calls won't overlap.
     23 class MOZ_CAPABILITY("mutex") MultiWriterQueueReaderLocking_Mutex {
     24 public:
     25  MultiWriterQueueReaderLocking_Mutex()
     26      : mMutex("MultiWriterQueueReaderLocking_Mutex") {}
     27  void Lock() MOZ_CAPABILITY_ACQUIRE(mMutex) { mMutex.Lock(); };
     28  void Unlock() MOZ_CAPABILITY_RELEASE(mMutex) { mMutex.Unlock(); };
     29 
     30 private:
     31  Mutex mMutex;
     32 };
     33 
     34 // Reader non-locking strategy, trusting that PopAll will never be called
     35 // concurrently (e.g., by only calling it from a specific thread).
     36 class MOZ_CAPABILITY("dummy lock") MultiWriterQueueReaderLocking_None {
     37 public:
     38 #ifndef DEBUG
     39  void Lock() MOZ_CAPABILITY_ACQUIRE() {};
     40  void Unlock() MOZ_CAPABILITY_RELEASE() {};
     41 #else
     42  // DEBUG-mode checks to catch concurrent misuses.
     43  void Lock() MOZ_CAPABILITY_ACQUIRE() {
     44    MOZ_ASSERT(mLocked.compareExchange(false, true));
     45  };
     46  void Unlock() MOZ_CAPABILITY_RELEASE() {
     47    MOZ_ASSERT(mLocked.compareExchange(true, false));
     48  };
     49 
     50 private:
     51  Atomic<bool> mLocked{false};
     52 #endif
     53 };
     54 
     55 static constexpr uint32_t MultiWriterQueueDefaultBufferSize = 8192;
     56 
     57 // Multi-writer, single-reader queue of elements of type `T`.
     58 // Elements are bunched together in buffers of `BufferSize` elements.
     59 //
     60 // This queue is heavily optimized for pushing. In most cases pushes will only
     61 // cost a couple of atomic reads and a few non-atomic reads. Worst cases:
     62 // - Once per buffer, a push will allocate or reuse a buffer for later pushes;
     63 // - During the above new-buffer push, other pushes will be blocked.
     64 //
     65 // By default, popping is protected by mutex; it may be disabled if popping is
     66 // guaranteed never to be concurrent.
     67 // In any case, popping will never negatively impact pushes.
     68 // (However, *not* popping will add runtime costs, as unread buffers will not
     69 // be freed, or made available to future pushes; Push functions provide
     70 // feedback as to when popping would be most efficient.)
     71 template <typename T, uint32_t BufferSize = MultiWriterQueueDefaultBufferSize,
     72          typename ReaderLocking = MultiWriterQueueReaderLocking_Mutex>
     73 class MultiWriterQueue {
     74  static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer");
     75 
     76 public:
     77  // Constructor.
     78  // Allocates the initial buffer that will receive the first `BufferSize`
     79  // elements. Also allocates one reusable buffer, which will definitely be
     80  // needed after the first `BufferSize` elements have been pushed.
     81  // Ideally (if the reader can process each buffer quickly enough), there
     82  // won't be a need for more buffer allocations.
     83  MultiWriterQueue()
     84      : mBuffersCoverAtLeastUpTo(BufferSize - 1),
     85        mMostRecentBuffer(new Buffer{}),
     86        mReusableBuffers(new Buffer{}),
     87        mOldestBuffer(static_cast<Buffer*>(mMostRecentBuffer)),
     88        mLiveBuffersStats(1),
     89        mReusableBuffersStats(1),
     90        mAllocatedBuffersStats(2) {}
     91 
     92  ~MultiWriterQueue() {
     93    auto DestroyList = [](Buffer* aBuffer) {
     94      while (aBuffer) {
     95        Buffer* older = aBuffer->Older();
     96        delete aBuffer;
     97        aBuffer = older;
     98      }
     99    };
    100    DestroyList(mMostRecentBuffer);
    101    DestroyList(mReusableBuffers);
    102  }
    103 
    104  // We need the index to be order-resistant to overflow, i.e., numbers before
    105  // an overflow should test smaller-than numbers after the overflow.
    106  // This is because we keep pushing elements with increasing Index, and this
    107  // Index is used to find the appropriate buffer based on a range; and this
    108  // need to work smoothly when crossing the overflow boundary.
    109  using Index = RollingNumber<uint32_t>;
    110 
    111  // Pushes indicate whether they have just reached the end of a buffer.
    112  using DidReachEndOfBuffer = bool;
    113 
    114  // Push new element and call aF on it.
    115  // Element may be in just-created state, or recycled after a PopAll call.
    116  // Atomically thread-safe; in the worst case some pushes may be blocked
    117  // while a new buffer is created/reused for them.
    118  // Returns whether that push reached the end of a buffer; useful if caller
    119  // wants to trigger processing regularly at the most efficient time.
    120  template <typename F>
    121  DidReachEndOfBuffer PushF(F&& aF) {
    122    // Atomically claim ownership of the next available element.
    123    const Index index{mNextElementToWrite++};
    124    // And now go and set that element.
    125    for (;;) {
    126      Index lastIndex{mBuffersCoverAtLeastUpTo};
    127 
    128      if (MOZ_UNLIKELY(index == lastIndex)) {
    129        // We have claimed the last element in the current head -> Allocate a
    130        // new head in advance of more pushes. Make it point at the current
    131        // most-recent buffer.
    132        // This whole process is effectively guarded:
    133        // - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to
    134        //   one that can accept their claimed index.
    135        // - Readers will stop until the last element is marked as valid.
    136        Buffer* ourBuffer = mMostRecentBuffer;
    137        Buffer* newBuffer = NewBuffer(ourBuffer, index + 1);
    138        // Because we have claimed this very specific index, we should be the
    139        // only one touching the most-recent buffer pointer.
    140        MOZ_ASSERT(mMostRecentBuffer == ourBuffer);
    141        // Just pivot the most-recent buffer pointer to our new buffer.
    142        mMostRecentBuffer = newBuffer;
    143        // Because we have claimed this very specific index, we should be the
    144        // only one touching the buffer coverage watermark.
    145        MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value());
    146        // Update it to include the just-added most-recent buffer.
    147        mBuffersCoverAtLeastUpTo = index.Value() + BufferSize;
    148        // We know for sure that `ourBuffer` is the correct one for this index.
    149        ourBuffer->SetAndValidateElement(aF, index);
    150        // And indicate that we have reached the end of a buffer.
    151        return true;
    152      }
    153 
    154      if (MOZ_UNLIKELY(index > lastIndex)) {
    155        // We have claimed an element in a yet-unavailable buffer, wait for our
    156        // target buffer to be created (see above).
    157        while (Index(mBuffersCoverAtLeastUpTo) < index) {
    158          PR_Sleep(PR_INTERVAL_NO_WAIT);  // Yield
    159        }
    160        // Then loop to examine the new situation.
    161        continue;
    162      }
    163 
    164      // Here, we have claimed a number that is covered by current buffers.
    165      // These buffers cannot be destroyed, because our buffer is not filled
    166      // yet (we haven't written in it yet), therefore the reader thread will
    167      // have to stop there (or before) and won't destroy our buffer or more
    168      // recent ones.
    169      MOZ_ASSERT(index < lastIndex);
    170      Buffer* ourBuffer = mMostRecentBuffer;
    171 
    172      // In rare situations, another thread may have had the time to create a
    173      // new more-recent buffer, in which case we need to find our older buffer.
    174      while (MOZ_UNLIKELY(index < ourBuffer->Origin())) {
    175        // We assume that older buffers with still-invalid elements (e.g., the
    176        // one we have just claimed) cannot be destroyed.
    177        MOZ_ASSERT(ourBuffer->Older());
    178        ourBuffer = ourBuffer->Older();
    179      }
    180 
    181      // Now we can set&validate the claimed element, and indicate that we have
    182      // not reached the end of a buffer.
    183      ourBuffer->SetAndValidateElement(aF, index);
    184      return false;
    185    }
    186  }
    187 
    188  // Push new element and assign it a value.
    189  // Atomically thread-safe; in the worst case some pushes may be blocked
    190  // while a new buffer is created/reused for them.
    191  // Returns whether that push reached the end of a buffer; useful if caller
    192  // wants to trigger processing regularly at the most efficient time.
    193  DidReachEndOfBuffer Push(const T& aT) {
    194    return PushF([&aT](T& aElement, Index) { aElement = aT; });
    195  }
    196 
    197  // Push new element and move-assign it a value.
    198  // Atomically thread-safe; in the worst case some pushes may be blocked
    199  // while a new buffer is created/reused for them.
    200  // Returns whether that push reached the end of a buffer; useful if caller
    201  // wants to trigger processing regularly at the most efficient time.
    202  DidReachEndOfBuffer Push(T&& aT) {
    203    return PushF([&aT](T& aElement, Index) { aElement = std::move(aT); });
    204  }
    205 
    206  // Pop all elements before the first invalid one, running aF on each of them
    207  // in FIFO order.
    208  // Thread-safety with other PopAll calls is controlled by the `Locking`
    209  // template argument.
    210  // Concurrent pushes are always allowed, because:
    211  // - PopAll won't read elements until valid,
    212  // - Pushes do not interfere with pop-related members -- except for
    213  //   mReusableBuffers, which is accessed atomically.
    214  template <typename F>
    215  void PopAll(F&& aF) {
    216    mReaderLocking.Lock();
    217    // Destroy every second fully-read buffer.
    218    // TODO: Research a better algorithm, probably based on stats.
    219    bool destroy = false;
    220    for (;;) {
    221      Buffer* b = mOldestBuffer;
    222      MOZ_ASSERT(!b->Older());
    223      // The next element to pop must be in that oldest buffer.
    224      MOZ_ASSERT(mNextElementToPop >= b->Origin());
    225      MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize);
    226 
    227      // Start reading each element.
    228      if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) {
    229        // Found an invalid element, stop popping.
    230        mReaderLocking.Unlock();
    231        return;
    232      }
    233 
    234      // Reached the end of this oldest buffer
    235      MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize);
    236      // Delete this oldest buffer.
    237      // Since the last element was valid, it must mean that there is a newer
    238      // buffer.
    239      MOZ_ASSERT(b->Newer());
    240      MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin());
    241      StopUsing(b, destroy);
    242      destroy = !destroy;
    243 
    244      // We will loop and start reading the now-oldest buffer.
    245    }
    246  }
    247 
    248  // Size of all buffers (used, or recyclable), excluding external data.
    249  size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
    250    return mAllocatedBuffersStats.Count() * sizeof(Buffer);
    251  }
    252 
    253  struct CountAndWatermark {
    254    int mCount;
    255    int mWatermark;
    256  };
    257 
    258  CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats.Get(); }
    259  CountAndWatermark ReusableBuffersStats() const {
    260    return mReusableBuffersStats.Get();
    261  }
    262  CountAndWatermark AllocatedBuffersStats() const {
    263    return mAllocatedBuffersStats.Get();
    264  }
    265 
    266 private:
    267  // Structure containing the element to be stored, and a validity-marker.
    268  class BufferedElement {
    269   public:
    270    // Run aF on an invalid element, and mark it as valid.
    271    template <typename F>
    272    void SetAndValidate(F&& aF, Index aIndex) {
    273      MOZ_ASSERT(!mValid);
    274      aF(mT, aIndex);
    275      mValid = true;
    276    }
    277 
    278    // Run aF on a valid element and mark it as invalid, return true.
    279    // Return false if element was invalid.
    280    template <typename F>
    281    bool ReadAndInvalidate(F&& aF) {
    282      if (!mValid) {
    283        return false;
    284      }
    285      aF(mT);
    286      mValid = false;
    287      return true;
    288    }
    289 
    290   private:
    291    T mT;
    292    // mValid should be atomically changed to true *after* mT has been written,
    293    // so that the reader can only see valid data.
    294    // ReleaseAcquire, because when set to `true`, we want the just-written mT
    295    // to be visible to the thread reading this `true`; and when set to `false`,
    296    // we want the previous reads to have completed.
    297    Atomic<bool, ReleaseAcquire> mValid{false};
    298  };
    299 
    300  // Buffer contains a sequence of BufferedElements starting at a specific
    301  // index, and it points to the next-older buffer (if any).
    302  class Buffer {
    303   public:
    304    // Constructor of the very first buffer.
    305    Buffer() : mOlder(nullptr), mNewer(nullptr), mOrigin(0) {}
    306 
    307    // Constructor of later buffers.
    308    Buffer(Buffer* aOlder, Index aOrigin)
    309        : mOlder(aOlder), mNewer(nullptr), mOrigin(aOrigin) {
    310      MOZ_ASSERT(aOlder);
    311      aOlder->mNewer = this;
    312    }
    313 
    314    Buffer* Older() const { return mOlder; }
    315    void SetOlder(Buffer* aOlder) { mOlder = aOlder; }
    316 
    317    Buffer* Newer() const { return mNewer; }
    318    void SetNewer(Buffer* aNewer) { mNewer = aNewer; }
    319 
    320    Index Origin() const { return mOrigin; }
    321    void SetOrigin(Index aOrigin) { mOrigin = aOrigin; }
    322 
    323    // Run aF on a yet-invalid element.
    324    // Not thread-safe by itself, but nothing else should write this element,
    325    // and reader won't access it until after it becomes valid.
    326    template <typename F>
    327    void SetAndValidateElement(F&& aF, Index aIndex) {
    328      MOZ_ASSERT(aIndex >= Origin());
    329      MOZ_ASSERT(aIndex < Origin() + BufferSize);
    330      mElements[aIndex - Origin()].SetAndValidate(aF, aIndex);
    331    }
    332 
    333    using DidReadLastElement = bool;
    334 
    335    // Read all valid elements starting at aIndex, marking them invalid and
    336    // updating aIndex.
    337    // Returns true if we ended up reading the last element in this buffer.
    338    // Accessing the validity bit is thread-safe (as it's atomic), but once
    339    // an element is valid, the reading itself is not thread-safe and should be
    340    // guarded.
    341    template <typename F>
    342    DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex) {
    343      MOZ_ASSERT(aIndex >= Origin());
    344      MOZ_ASSERT(aIndex < Origin() + BufferSize);
    345      for (; aIndex < Origin() + BufferSize; ++aIndex) {
    346        if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) {
    347          // Found an invalid element, stop here. (aIndex will not be updated
    348          // past it, so we will start from here next time.)
    349          return false;
    350        }
    351      }
    352      return true;
    353    }
    354 
    355   private:
    356    Buffer* mOlder;
    357    Buffer* mNewer;
    358    Index mOrigin;
    359    BufferedElement mElements[BufferSize];
    360  };
    361 
    362  // Reuse a buffer, or create a new one.
    363  // All buffered elements will be invalid.
    364  Buffer* NewBuffer(Buffer* aOlder, Index aOrigin) {
    365    MOZ_ASSERT(aOlder);
    366    for (;;) {
    367      Buffer* head = mReusableBuffers;
    368      if (!head) {
    369        ++mAllocatedBuffersStats;
    370        ++mLiveBuffersStats;
    371        Buffer* buffer = new Buffer(aOlder, aOrigin);
    372        return buffer;
    373      }
    374      Buffer* older = head->Older();
    375      // Try to pivot the reusable-buffer pointer from the current head to the
    376      // next buffer in line.
    377      if (mReusableBuffers.compareExchange(head, older)) {
    378        // Success! The reusable-buffer pointer now points at the older buffer,
    379        // so we can recycle this ex-head.
    380        --mReusableBuffersStats;
    381        ++mLiveBuffersStats;
    382        head->SetOlder(aOlder);
    383        aOlder->SetNewer(head);
    384        // We will be the newest; newer-pointer should already be null.
    385        MOZ_ASSERT(!head->Newer());
    386        head->SetOrigin(aOrigin);
    387        return head;
    388      }
    389      // Failure, someone else must have touched the list, loop to try again.
    390    }
    391  }
    392 
    393  // Discard a fully-read buffer.
    394  // If aDestroy is true, delete it.
    395  // If aDestroy is false, move the buffer to a reusable-buffer stack.
    396  void StopUsing(Buffer* aBuffer, bool aDestroy) {
    397    --mLiveBuffersStats;
    398 
    399    // We should only stop using the oldest buffer.
    400    MOZ_ASSERT(!aBuffer->Older());
    401    // The newest buffer should not be modified here.
    402    MOZ_ASSERT(aBuffer->Newer());
    403    MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer);
    404    // Detach from the second-oldest buffer.
    405    aBuffer->Newer()->SetOlder(nullptr);
    406    // Make the second-oldest buffer the now-oldest buffer.
    407    mOldestBuffer = aBuffer->Newer();
    408 
    409    if (aDestroy) {
    410      --mAllocatedBuffersStats;
    411      delete aBuffer;
    412    } else {
    413      ++mReusableBuffersStats;
    414      // The recycling stack only uses mOlder; mNewer is not needed.
    415      aBuffer->SetNewer(nullptr);
    416 
    417      // Make the given buffer the new head of reusable buffers.
    418      for (;;) {
    419        Buffer* head = mReusableBuffers;
    420        aBuffer->SetOlder(head);
    421        if (mReusableBuffers.compareExchange(head, aBuffer)) {
    422          break;
    423        }
    424      }
    425    }
    426  }
    427 
    428  // Index of the next element to write. Modified when an element index is
    429  // claimed for a push. If the last element of a buffer is claimed, that push
    430  // will be responsible for adding a new head buffer.
    431  // Relaxed, because there is no synchronization based on this variable, each
    432  // thread just needs to get a different value, and will then write different
    433  // things (which themselves have some atomic validation before they may be
    434  // read elsewhere, independent of this `mNextElementToWrite`.)
    435  Atomic<Index::ValueType, Relaxed> mNextElementToWrite{0};
    436 
    437  // Index that a live recent buffer reaches. If a push claims a lesser-or-
    438  // equal number, the corresponding buffer is guaranteed to still be alive:
    439  // - It will have been created before this index was updated,
    440  // - It will not be destroyed until all its values have been written,
    441  //   including the one that just claimed a position within it.
    442  // Also, the push that claims this exact number is responsible for adding the
    443  // next buffer and updating this value accordingly.
    444  // ReleaseAcquire, because when set to a certain value, the just-created
    445  // buffer covering the new range must be visible to readers.
    446  Atomic<Index::ValueType, ReleaseAcquire> mBuffersCoverAtLeastUpTo;
    447 
    448  // Pointer to the most recent buffer. Never null.
    449  // This is the most recent of a deque of yet-unread buffers.
    450  // Only modified when adding a new head buffer.
    451  // ReleaseAcquire, because when modified, the just-created new buffer must be
    452  // visible to readers.
    453  Atomic<Buffer*, ReleaseAcquire> mMostRecentBuffer;
    454 
    455  // Stack of reusable buffers.
    456  // ReleaseAcquire, because when modified, the just-added buffer must be
    457  // visible to readers.
    458  Atomic<Buffer*, ReleaseAcquire> mReusableBuffers;
    459 
    460  // Template-provided locking mechanism to protect PopAll()-only member
    461  // variables below.
    462  ReaderLocking mReaderLocking;
    463 
    464  // Pointer to the oldest buffer, which contains the new element to be popped.
    465  // Never null.
    466  Buffer* mOldestBuffer;
    467 
    468  // Index of the next element to be popped.
    469  Index mNextElementToPop{0};
    470 
    471  // Stats.
    472  class AtomicCountAndWatermark {
    473   public:
    474    explicit AtomicCountAndWatermark(int aCount)
    475        : mCount(aCount), mWatermark(aCount) {}
    476 
    477    int Count() const { return int(mCount); }
    478 
    479    CountAndWatermark Get() const {
    480      return CountAndWatermark{int(mCount), int(mWatermark)};
    481    }
    482 
    483    int operator++() {
    484      int count = int(++mCount);
    485      // Update watermark.
    486      for (;;) {
    487        int watermark = int(mWatermark);
    488        if (watermark >= count) {
    489          // printf("++[%p] -=> %d-%d\n", this, count, watermark);
    490          break;
    491        }
    492        if (mWatermark.compareExchange(watermark, count)) {
    493          // printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark,
    494          // count);
    495          break;
    496        }
    497      }
    498      return count;
    499    }
    500 
    501    int operator--() {
    502      int count = int(--mCount);
    503      // printf("--[%p] -> %d\n", this, count);
    504      return count;
    505    }
    506 
    507   private:
    508    // Relaxed, as these are just gathering stats, so consistency is not
    509    // critical.
    510    Atomic<int, Relaxed> mCount;
    511    Atomic<int, Relaxed> mWatermark;
    512  };
    513  // All buffers in the mMostRecentBuffer deque.
    514  AtomicCountAndWatermark mLiveBuffersStats;
    515  // All buffers in the mReusableBuffers stack.
    516  AtomicCountAndWatermark mReusableBuffersStats;
    517  // All allocated buffers (sum of above).
    518  AtomicCountAndWatermark mAllocatedBuffersStats;
    519 };
    520 
    521 }  // namespace mozilla
    522 
    523 #endif  // mozilla_MultiWriterQueue_h_