// MultiWriterQueue.h
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#ifndef mozilla_MultiWriterQueue_h_
#define mozilla_MultiWriterQueue_h_

#include <cstdint>
#include <utility>

#include "RollingNumber.h"
#include "mozilla/Atomics.h"
#include "mozilla/MemoryReporting.h"
#include "mozilla/Mutex.h"
#include "prthread.h"

namespace mozilla {

// Default reader locking strategy, using a mutex to ensure that concurrent
// PopAll calls won't overlap.
// The MOZ_CAPABILITY annotations feed Clang's thread-safety analysis.
class MOZ_CAPABILITY("mutex") MultiWriterQueueReaderLocking_Mutex {
 public:
  MultiWriterQueueReaderLocking_Mutex()
      : mMutex("MultiWriterQueueReaderLocking_Mutex") {}
  void Lock() MOZ_CAPABILITY_ACQUIRE(mMutex) { mMutex.Lock(); };
  void Unlock() MOZ_CAPABILITY_RELEASE(mMutex) { mMutex.Unlock(); };

 private:
  Mutex mMutex;
};

// Reader non-locking strategy, trusting that PopAll will never be called
// concurrently (e.g., by only calling it from a specific thread).
class MOZ_CAPABILITY("dummy lock") MultiWriterQueueReaderLocking_None {
 public:
#ifndef DEBUG
  // Release builds: Lock/Unlock are no-ops, fully optimized away.
  void Lock() MOZ_CAPABILITY_ACQUIRE() {};
  void Unlock() MOZ_CAPABILITY_RELEASE() {};
#else
  // DEBUG-mode checks to catch concurrent misuses.
  // compareExchange(false, true) fails -- and the assertion fires -- if
  // another thread is already between Lock() and Unlock().
  void Lock() MOZ_CAPABILITY_ACQUIRE() {
    MOZ_ASSERT(mLocked.compareExchange(false, true));
  };
  void Unlock() MOZ_CAPABILITY_RELEASE() {
    MOZ_ASSERT(mLocked.compareExchange(true, false));
  };

 private:
  Atomic<bool> mLocked{false};
#endif
};

static constexpr uint32_t MultiWriterQueueDefaultBufferSize = 8192;

// Multi-writer, single-reader queue of elements of type `T`.
// Elements are bunched together in buffers of `BufferSize` elements.
//
// This queue is heavily optimized for pushing. In most cases pushes will only
// cost a couple of atomic reads and a few non-atomic reads. Worst cases:
// - Once per buffer, a push will allocate or reuse a buffer for later pushes;
// - During the above new-buffer push, other pushes will be blocked.
//
// By default, popping is protected by mutex; it may be disabled if popping is
// guaranteed never to be concurrent.
// In any case, popping will never negatively impact pushes.
// (However, *not* popping will add runtime costs, as unread buffers will not
// be freed, or made available to future pushes; Push functions provide
// feedback as to when popping would be most efficient.)
template <typename T, uint32_t BufferSize = MultiWriterQueueDefaultBufferSize,
          typename ReaderLocking = MultiWriterQueueReaderLocking_Mutex>
class MultiWriterQueue {
  static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer");

 public:
  // Constructor.
  // Allocates the initial buffer that will receive the first `BufferSize`
  // elements. Also allocates one reusable buffer, which will definitely be
  // needed after the first `BufferSize` elements have been pushed.
  // Ideally (if the reader can process each buffer quickly enough), there
  // won't be a need for more buffer allocations.
  MultiWriterQueue()
      : mBuffersCoverAtLeastUpTo(BufferSize - 1),
        mMostRecentBuffer(new Buffer{}),
        mReusableBuffers(new Buffer{}),
        mOldestBuffer(static_cast<Buffer*>(mMostRecentBuffer)),
        mLiveBuffersStats(1),
        mReusableBuffersStats(1),
        mAllocatedBuffersStats(2) {}

  ~MultiWriterQueue() {
    // Free both buffer lists; each list is linked through its Older() pointer.
    auto DestroyList = [](Buffer* aBuffer) {
      while (aBuffer) {
        Buffer* older = aBuffer->Older();
        delete aBuffer;
        aBuffer = older;
      }
    };
    DestroyList(mMostRecentBuffer);
    DestroyList(mReusableBuffers);
  }

  // We need the index to be order-resistant to overflow, i.e., numbers before
  // an overflow should test smaller-than numbers after the overflow.
  // This is because we keep pushing elements with increasing Index, and this
  // Index is used to find the appropriate buffer based on a range; and this
  // need to work smoothly when crossing the overflow boundary.
  using Index = RollingNumber<uint32_t>;

  // Pushes indicate whether they have just reached the end of a buffer.
  using DidReachEndOfBuffer = bool;

  // Push new element and call aF on it.
  // Element may be in just-created state, or recycled after a PopAll call.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  template <typename F>
  DidReachEndOfBuffer PushF(F&& aF) {
    // Atomically claim ownership of the next available element.
    const Index index{mNextElementToWrite++};
    // And now go and set that element.
    for (;;) {
      Index lastIndex{mBuffersCoverAtLeastUpTo};

      if (MOZ_UNLIKELY(index == lastIndex)) {
        // We have claimed the last element in the current head -> Allocate a
        // new head in advance of more pushes. Make it point at the current
        // most-recent buffer.
        // This whole process is effectively guarded:
        // - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to
        //   one that can accept their claimed index.
        // - Readers will stop until the last element is marked as valid.
        Buffer* ourBuffer = mMostRecentBuffer;
        Buffer* newBuffer = NewBuffer(ourBuffer, index + 1);
        // Because we have claimed this very specific index, we should be the
        // only one touching the most-recent buffer pointer.
        MOZ_ASSERT(mMostRecentBuffer == ourBuffer);
        // Just pivot the most-recent buffer pointer to our new buffer.
        mMostRecentBuffer = newBuffer;
        // Because we have claimed this very specific index, we should be the
        // only one touching the buffer coverage watermark.
        MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value());
        // Update it to include the just-added most-recent buffer.
        // This ReleaseAcquire store is what unblocks pushes waiting below.
        mBuffersCoverAtLeastUpTo = index.Value() + BufferSize;
        // We know for sure that `ourBuffer` is the correct one for this index.
        ourBuffer->SetAndValidateElement(aF, index);
        // And indicate that we have reached the end of a buffer.
        return true;
      }

      if (MOZ_UNLIKELY(index > lastIndex)) {
        // We have claimed an element in a yet-unavailable buffer, wait for our
        // target buffer to be created (see above).
        while (Index(mBuffersCoverAtLeastUpTo) < index) {
          PR_Sleep(PR_INTERVAL_NO_WAIT);  // Yield the thread's time slice.
        }
        // Then loop to examine the new situation.
        continue;
      }

      // Here, we have claimed a number that is covered by current buffers.
      // These buffers cannot be destroyed, because our buffer is not filled
      // yet (we haven't written in it yet), therefore the reader thread will
      // have to stop there (or before) and won't destroy our buffer or more
      // recent ones.
      MOZ_ASSERT(index < lastIndex);
      Buffer* ourBuffer = mMostRecentBuffer;

      // In rare situations, another thread may have had the time to create a
      // new more-recent buffer, in which case we need to find our older buffer.
      while (MOZ_UNLIKELY(index < ourBuffer->Origin())) {
        // We assume that older buffers with still-invalid elements (e.g., the
        // one we have just claimed) cannot be destroyed.
        MOZ_ASSERT(ourBuffer->Older());
        ourBuffer = ourBuffer->Older();
      }

      // Now we can set&validate the claimed element, and indicate that we have
      // not reached the end of a buffer.
      ourBuffer->SetAndValidateElement(aF, index);
      return false;
    }
  }

  // Push new element and assign it a value.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  DidReachEndOfBuffer Push(const T& aT) {
    return PushF([&aT](T& aElement, Index) { aElement = aT; });
  }

  // Push new element and move-assign it a value.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  DidReachEndOfBuffer Push(T&& aT) {
    return PushF([&aT](T& aElement, Index) { aElement = std::move(aT); });
  }

  // Pop all elements before the first invalid one, running aF on each of them
  // in FIFO order.
  // Thread-safety with other PopAll calls is controlled by the `Locking`
  // template argument.
  // Concurrent pushes are always allowed, because:
  // - PopAll won't read elements until valid,
  // - Pushes do not interfere with pop-related members -- except for
  //   mReusableBuffers, which is accessed atomically.
  template <typename F>
  void PopAll(F&& aF) {
    mReaderLocking.Lock();
    // Destroy every second fully-read buffer (the others are recycled).
    // TODO: Research a better algorithm, probably based on stats.
    bool destroy = false;
    for (;;) {
      Buffer* b = mOldestBuffer;
      MOZ_ASSERT(!b->Older());
      // The next element to pop must be in that oldest buffer.
      MOZ_ASSERT(mNextElementToPop >= b->Origin());
      MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize);

      // Start reading each element.
      if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) {
        // Found an invalid element, stop popping.
        mReaderLocking.Unlock();
        return;
      }

      // Reached the end of this oldest buffer
      MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize);
      // Delete this oldest buffer.
      // Since the last element was valid, it must mean that there is a newer
      // buffer.
      MOZ_ASSERT(b->Newer());
      MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin());
      StopUsing(b, destroy);
      // Alternate between destroying and recycling fully-read buffers.
      destroy = !destroy;

      // We will loop and start reading the now-oldest buffer.
    }
  }

  // Size of all buffers (used, or recyclable), excluding external data.
  size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
    // NOTE(review): aMallocSizeOf is unused here; this reports a sizeof-based
    // estimate over the allocated-buffer count rather than measured malloc
    // sizes.
    return mAllocatedBuffersStats.Count() * sizeof(Buffer);
  }

  // Snapshot of a stats counter: current count and its high-watermark.
  struct CountAndWatermark {
    int mCount;
    int mWatermark;
  };

  CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats.Get(); }
  CountAndWatermark ReusableBuffersStats() const {
    return mReusableBuffersStats.Get();
  }
  CountAndWatermark AllocatedBuffersStats() const {
    return mAllocatedBuffersStats.Get();
  }

 private:
  // Structure containing the element to be stored, and a validity-marker.
  class BufferedElement {
   public:
    // Run aF on an invalid element, and mark it as valid.
    template <typename F>
    void SetAndValidate(F&& aF, Index aIndex) {
      MOZ_ASSERT(!mValid);
      aF(mT, aIndex);
      mValid = true;
    }

    // Run aF on a valid element and mark it as invalid, return true.
    // Return false if element was invalid.
    template <typename F>
    bool ReadAndInvalidate(F&& aF) {
      if (!mValid) {
        return false;
      }
      aF(mT);
      mValid = false;
      return true;
    }

   private:
    T mT;
    // mValid should be atomically changed to true *after* mT has been written,
    // so that the reader can only see valid data.
    // ReleaseAcquire, because when set to `true`, we want the just-written mT
    // to be visible to the thread reading this `true`; and when set to
    // `false`, we want the previous reads to have completed.
    Atomic<bool, ReleaseAcquire> mValid{false};
  };

  // Buffer contains a sequence of BufferedElements starting at a specific
  // index, and it points to the next-older buffer (if any).
  class Buffer {
   public:
    // Constructor of the very first buffer.
    Buffer() : mOlder(nullptr), mNewer(nullptr), mOrigin(0) {}

    // Constructor of later buffers; links itself as aOlder's newer buffer.
    Buffer(Buffer* aOlder, Index aOrigin)
        : mOlder(aOlder), mNewer(nullptr), mOrigin(aOrigin) {
      MOZ_ASSERT(aOlder);
      aOlder->mNewer = this;
    }

    Buffer* Older() const { return mOlder; }
    void SetOlder(Buffer* aOlder) { mOlder = aOlder; }

    Buffer* Newer() const { return mNewer; }
    void SetNewer(Buffer* aNewer) { mNewer = aNewer; }

    Index Origin() const { return mOrigin; }
    void SetOrigin(Index aOrigin) { mOrigin = aOrigin; }

    // Run aF on a yet-invalid element.
    // Not thread-safe by itself, but nothing else should write this element,
    // and reader won't access it until after it becomes valid.
    template <typename F>
    void SetAndValidateElement(F&& aF, Index aIndex) {
      MOZ_ASSERT(aIndex >= Origin());
      MOZ_ASSERT(aIndex < Origin() + BufferSize);
      mElements[aIndex - Origin()].SetAndValidate(aF, aIndex);
    }

    using DidReadLastElement = bool;

    // Read all valid elements starting at aIndex, marking them invalid and
    // updating aIndex.
    // Returns true if we ended up reading the last element in this buffer.
    // Accessing the validity bit is thread-safe (as it's atomic), but once
    // an element is valid, the reading itself is not thread-safe and should be
    // guarded.
    template <typename F>
    DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex) {
      MOZ_ASSERT(aIndex >= Origin());
      MOZ_ASSERT(aIndex < Origin() + BufferSize);
      for (; aIndex < Origin() + BufferSize; ++aIndex) {
        if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) {
          // Found an invalid element, stop here. (aIndex will not be updated
          // past it, so we will start from here next time.)
          return false;
        }
      }
      return true;
    }

   private:
    Buffer* mOlder;
    Buffer* mNewer;
    Index mOrigin;
    BufferedElement mElements[BufferSize];
  };

  // Reuse a buffer, or create a new one.
  // All buffered elements will be invalid.
  Buffer* NewBuffer(Buffer* aOlder, Index aOrigin) {
    MOZ_ASSERT(aOlder);
    for (;;) {
      Buffer* head = mReusableBuffers;
      if (!head) {
        // No reusable buffer available -> allocate a brand-new one.
        ++mAllocatedBuffersStats;
        ++mLiveBuffersStats;
        Buffer* buffer = new Buffer(aOlder, aOrigin);
        return buffer;
      }
      Buffer* older = head->Older();
      // Try to pivot the reusable-buffer pointer from the current head to the
      // next buffer in line.
      if (mReusableBuffers.compareExchange(head, older)) {
        // Success! The reusable-buffer pointer now points at the older buffer,
        // so we can recycle this ex-head.
        --mReusableBuffersStats;
        ++mLiveBuffersStats;
        head->SetOlder(aOlder);
        aOlder->SetNewer(head);
        // We will be the newest; newer-pointer should already be null.
        MOZ_ASSERT(!head->Newer());
        head->SetOrigin(aOrigin);
        return head;
      }
      // Failure, someone else must have touched the list, loop to try again.
    }
  }

  // Discard a fully-read buffer.
  // If aDestroy is true, delete it.
  // If aDestroy is false, move the buffer to a reusable-buffer stack.
  void StopUsing(Buffer* aBuffer, bool aDestroy) {
    --mLiveBuffersStats;

    // We should only stop using the oldest buffer.
    MOZ_ASSERT(!aBuffer->Older());
    // The newest buffer should not be modified here.
    MOZ_ASSERT(aBuffer->Newer());
    MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer);
    // Detach from the second-oldest buffer.
    aBuffer->Newer()->SetOlder(nullptr);
    // Make the second-oldest buffer the now-oldest buffer.
    mOldestBuffer = aBuffer->Newer();

    if (aDestroy) {
      --mAllocatedBuffersStats;
      delete aBuffer;
    } else {
      ++mReusableBuffersStats;
      // The recycling stack only uses mOlder; mNewer is not needed.
      aBuffer->SetNewer(nullptr);

      // Make the given buffer the new head of reusable buffers:
      // point our older-link at the current head, then try to swing the head
      // to us; retry on contention.
      for (;;) {
        Buffer* head = mReusableBuffers;
        aBuffer->SetOlder(head);
        if (mReusableBuffers.compareExchange(head, aBuffer)) {
          break;
        }
      }
    }
  }

  // Index of the next element to write. Modified when an element index is
  // claimed for a push. If the last element of a buffer is claimed, that push
  // will be responsible for adding a new head buffer.
  // Relaxed, because there is no synchronization based on this variable, each
  // thread just needs to get a different value, and will then write different
  // things (which themselves have some atomic validation before they may be
  // read elsewhere, independent of this `mNextElementToWrite`.)
  Atomic<Index::ValueType, Relaxed> mNextElementToWrite{0};

  // Index that a live recent buffer reaches. If a push claims a lesser-or-
  // equal number, the corresponding buffer is guaranteed to still be alive:
  // - It will have been created before this index was updated,
  // - It will not be destroyed until all its values have been written,
  //   including the one that just claimed a position within it.
  // Also, the push that claims this exact number is responsible for adding the
  // next buffer and updating this value accordingly.
  // ReleaseAcquire, because when set to a certain value, the just-created
  // buffer covering the new range must be visible to readers.
  Atomic<Index::ValueType, ReleaseAcquire> mBuffersCoverAtLeastUpTo;

  // Pointer to the most recent buffer. Never null.
  // This is the most recent of a deque of yet-unread buffers.
  // Only modified when adding a new head buffer.
  // ReleaseAcquire, because when modified, the just-created new buffer must be
  // visible to readers.
  Atomic<Buffer*, ReleaseAcquire> mMostRecentBuffer;

  // Stack of reusable buffers.
  // ReleaseAcquire, because when modified, the just-added buffer must be
  // visible to readers.
  Atomic<Buffer*, ReleaseAcquire> mReusableBuffers;

  // Template-provided locking mechanism to protect PopAll()-only member
  // variables below.
  ReaderLocking mReaderLocking;

  // Pointer to the oldest buffer, which contains the next element to be
  // popped. Never null.
  Buffer* mOldestBuffer;

  // Index of the next element to be popped.
  Index mNextElementToPop{0};

  // Stats: a counter plus the highest value it has ever reached.
  class AtomicCountAndWatermark {
   public:
    explicit AtomicCountAndWatermark(int aCount)
        : mCount(aCount), mWatermark(aCount) {}

    int Count() const { return int(mCount); }

    CountAndWatermark Get() const {
      return CountAndWatermark{int(mCount), int(mWatermark)};
    }

    int operator++() {
      int count = int(++mCount);
      // Update watermark, racing against other increments via CAS.
      for (;;) {
        int watermark = int(mWatermark);
        if (watermark >= count) {
          // Watermark already at least as high; nothing to do.
          // printf("++[%p] -=> %d-%d\n", this, count, watermark);
          break;
        }
        if (mWatermark.compareExchange(watermark, count)) {
          // printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark,
          // count);
          break;
        }
      }
      return count;
    }

    int operator--() {
      int count = int(--mCount);
      // printf("--[%p] -> %d\n", this, count);
      return count;
    }

   private:
    // Relaxed, as these are just gathering stats, so consistency is not
    // critical.
    Atomic<int, Relaxed> mCount;
    Atomic<int, Relaxed> mWatermark;
  };
  // All buffers in the mMostRecentBuffer deque.
  AtomicCountAndWatermark mLiveBuffersStats;
  // All buffers in the mReusableBuffers stack.
  AtomicCountAndWatermark mReusableBuffersStats;
  // All allocated buffers (sum of above).
  AtomicCountAndWatermark mAllocatedBuffersStats;
};

}  // namespace mozilla

#endif  // mozilla_MultiWriterQueue_h_