tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

TestMultiWriterQueue.cpp (14564B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include <gtest/gtest.h>
      8 
      9 #include <type_traits>
     10 
     11 #include "DDTimeStamp.h"
     12 #include "MultiWriterQueue.h"
     13 #include "mozilla/gtest/MozAssertions.h"
     14 #include "nsDeque.h"
     15 #include "nsIThread.h"
     16 #include "nsThreadUtils.h"
     17 
     18 using mozilla::MultiWriterQueue;
     19 using mozilla::MultiWriterQueueDefaultBufferSize;
     20 using mozilla::MultiWriterQueueReaderLocking_Mutex;
     21 using mozilla::MultiWriterQueueReaderLocking_None;
     22 
     23 template <size_t BufferSize>
     24 static void TestMultiWriterQueueST(const int loops) {
     25  using Q = MultiWriterQueue<int, BufferSize>;
     26  Q q;
     27 
     28  int pushes = 0;
     29  // Go through 2 cycles of pushes&pops, to exercize reusable buffers.
     30  for (int max = loops; max <= loops * 2; max *= 2) {
     31    // Push all numbers.
     32    for (int i = 1; i <= max; ++i) {
     33      bool newBuffer = q.Push(i);
     34      // A new buffer should be added at the last push of each buffer.
     35      EXPECT_EQ(++pushes % BufferSize == 0, newBuffer);
     36    }
     37 
     38    // Pop numbers, should be FIFO.
     39    int x = 0;
     40    q.PopAll([&](int& i) { EXPECT_EQ(++x, i); });
     41 
     42    // We should have got all numbers.
     43    EXPECT_EQ(max, x);
     44 
     45    // Nothing left.
     46    q.PopAll([&](int&) { EXPECT_TRUE(false); });
     47  }
     48 }
     49 
     50 TEST(MultiWriterQueue, SingleThreaded)
     51 {
     52  TestMultiWriterQueueST<1>(10);
     53  TestMultiWriterQueueST<2>(10);
     54  TestMultiWriterQueueST<4>(10);
     55 
     56  TestMultiWriterQueueST<10>(9);
     57  TestMultiWriterQueueST<10>(10);
     58  TestMultiWriterQueueST<10>(11);
     59  TestMultiWriterQueueST<10>(19);
     60  TestMultiWriterQueueST<10>(20);
     61  TestMultiWriterQueueST<10>(21);
     62  TestMultiWriterQueueST<10>(999);
     63  TestMultiWriterQueueST<10>(1000);
     64  TestMultiWriterQueueST<10>(1001);
     65 
     66  TestMultiWriterQueueST<8192>(8192 * 4 + 1);
     67 }
     68 
     69 template <typename Q>
     70 static void TestMultiWriterQueueMT(int aWriterThreads, int aReaderThreads,
     71                                   int aTotalLoops, const char* aPrintPrefix) {
     72  Q q;
     73 
     74  const int threads = aWriterThreads + aReaderThreads;
     75  const int loops = aTotalLoops / aWriterThreads;
     76 
     77  nsIThread** array = new nsIThread*[threads];
     78 
     79  mozilla::Atomic<int> pushThreadsCompleted{0};
     80  int pops = 0;
     81 
     82  nsCOMPtr<nsIRunnable> popper = NS_NewRunnableFunction("MWQPopper", [&]() {
     83    // int popsBefore = pops;
     84    // int allocsBefore = q.AllocatedBuffersStats().mCount;
     85    q.PopAll([&pops](const int& i) { ++pops; });
     86    // if (pops != popsBefore ||
     87    //     q.AllocatedBuffersStats().mCount != allocsBefore) {
     88    //   printf("%s threads=1+%d loops/thread=%d pops=%d "
     89    //          "buffers: live=%d (w %d) reusable=%d (w %d) "
     90    //          "alloc=%d (w %d)\n",
     91    //          aPrintPrefix,
     92    //          aWriterThreads,
     93    //          loops,
     94    //          pops,
     95    //          q.LiveBuffersStats().mCount,
     96    //          q.LiveBuffersStats().mWatermark,
     97    //          q.ReusableBuffersStats().mCount,
     98    //          q.ReusableBuffersStats().mWatermark,
     99    //          q.AllocatedBuffersStats().mCount,
    100    //          q.AllocatedBuffersStats().mWatermark);
    101    // }
    102  });
    103  // Cycle through reader threads.
    104  mozilla::Atomic<size_t> readerThread{0};
    105 
    106  double start = mozilla::ToSeconds(mozilla::DDNow());
    107 
    108  for (int k = 0; k < threads; k++) {
    109    // First `aReaderThreads` threads to pop, all others to push.
    110    if (k < aReaderThreads) {
    111      nsCOMPtr<nsIThread> t;
    112      nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t));
    113      EXPECT_NS_SUCCEEDED(rv);
    114      NS_ADDREF(array[k] = t);
    115    } else {
    116      nsCOMPtr<nsIThread> t;
    117      nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction("MWQPusher", [&, k]() {
    118        // Give a bit of breathing space to construct other threads.
    119        PR_Sleep(PR_MillisecondsToInterval(100));
    120 
    121        for (int i = 0; i < loops; ++i) {
    122          if (q.Push(k * threads + i) && aReaderThreads != 0) {
    123            // Run a popper task every time we push the last element of a
    124            // buffer.
    125            array[++readerThread % aReaderThreads]->Dispatch(
    126                popper, nsIThread::DISPATCH_NORMAL);
    127          }
    128        }
    129        ++pushThreadsCompleted;
    130      });
    131      nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t), r);
    132      EXPECT_NS_SUCCEEDED(rv);
    133      NS_ADDREF(array[k] = t);
    134    }
    135  }
    136 
    137  for (int k = threads - 1; k >= 0; k--) {
    138    array[k]->Shutdown();
    139    NS_RELEASE(array[k]);
    140  }
    141  delete[] array;
    142 
    143  // There may be a few more elements that haven't been read yet.
    144  q.PopAll([&pops](const int& i) { ++pops; });
    145  const int pushes = aWriterThreads * loops;
    146  EXPECT_EQ(pushes, pops);
    147  q.PopAll([](const int& i) { EXPECT_TRUE(false); });
    148 
    149  double duration = mozilla::ToSeconds(mozilla::DDNow()) - start - 0.1;
    150  printf(
    151      "%s threads=%dw+%dr loops/thread=%d pushes=pops=%d duration=%fs "
    152      "pushes/s=%f buffers: live=%d (w %d) reusable=%d (w %d) "
    153      "alloc=%d (w %d)\n",
    154      aPrintPrefix, aWriterThreads, aReaderThreads, loops, pushes, duration,
    155      pushes / duration, q.LiveBuffersStats().mCount,
    156      q.LiveBuffersStats().mWatermark, q.ReusableBuffersStats().mCount,
    157      q.ReusableBuffersStats().mWatermark, q.AllocatedBuffersStats().mCount,
    158      q.AllocatedBuffersStats().mWatermark);
    159 }
    160 
    161 // skip test on windows10-aarch64 due to unexpected test timeout at
    162 // MultiWriterSingleReader, bug 1526001
    163 #if !defined(_M_ARM64)
    164 TEST(MultiWriterQueue, MultiWriterSingleReader)
    165 {
    166  // Small BufferSize, to exercize the buffer management code.
    167  TestMultiWriterQueueMT<
    168      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    169      1, 0, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    170  TestMultiWriterQueueMT<
    171      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    172      1, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    173  TestMultiWriterQueueMT<
    174      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    175      2, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    176  TestMultiWriterQueueMT<
    177      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    178      3, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    179  TestMultiWriterQueueMT<
    180      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    181      4, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    182  TestMultiWriterQueueMT<
    183      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    184      5, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    185  TestMultiWriterQueueMT<
    186      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    187      6, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    188  TestMultiWriterQueueMT<
    189      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    190      7, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    191  TestMultiWriterQueueMT<
    192      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    193      8, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    194  TestMultiWriterQueueMT<
    195      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    196      9, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    197  TestMultiWriterQueueMT<
    198      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    199      10, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    200  TestMultiWriterQueueMT<
    201      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    202      16, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    203  TestMultiWriterQueueMT<
    204      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    205      32, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    206  TestMultiWriterQueueMT<
    207      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>(
    208      64, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>");
    209 
    210  // A more real-life buffer size.
    211  TestMultiWriterQueueMT<
    212      MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize,
    213                       MultiWriterQueueReaderLocking_None>>(
    214      64, 1, 2 * 1024 * 1024,
    215      "MultiWriterQueue<int, DefaultBufferSize, Locking_None>");
    216 
    217  // DEBUG-mode thread-safety checks should make the following (multi-reader
    218  // with no locking) crash; uncomment to verify.
    219  // TestMultiWriterQueueMT<
    220  //   MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize,
    221  //   MultiWriterQueueReaderLocking_None>>(64, 2, 2*1024*1024);
    222 }
    223 #endif
    224 
    225 // skip test on windows10-aarch64 due to unexpected test timeout at
    226 // MultiWriterMultiReade, bug 1526001
    227 #if !defined(_M_ARM64)
    228 TEST(MultiWriterQueue, MultiWriterMultiReader)
    229 {
    230  static_assert(
    231      std::is_same_v<
    232          MultiWriterQueue<int, 10>,
    233          MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>,
    234      "MultiWriterQueue reader locking should use Mutex by default");
    235 
    236  // Small BufferSize, to exercize the buffer management code.
    237  TestMultiWriterQueueMT<
    238      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    239      1, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    240  TestMultiWriterQueueMT<
    241      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    242      2, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    243  TestMultiWriterQueueMT<
    244      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    245      3, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    246  TestMultiWriterQueueMT<
    247      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    248      4, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    249  TestMultiWriterQueueMT<
    250      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    251      5, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    252  TestMultiWriterQueueMT<
    253      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    254      6, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    255  TestMultiWriterQueueMT<
    256      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    257      7, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    258  TestMultiWriterQueueMT<
    259      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    260      8, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    261  TestMultiWriterQueueMT<
    262      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    263      9, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    264  TestMultiWriterQueueMT<
    265      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    266      10, 4, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    267  TestMultiWriterQueueMT<
    268      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    269      16, 8, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    270  TestMultiWriterQueueMT<
    271      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    272      32, 16, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    273  TestMultiWriterQueueMT<
    274      MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>(
    275      64, 32, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>");
    276 
    277  // A more real-life buffer size.
    278  TestMultiWriterQueueMT<
    279      MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize,
    280                       MultiWriterQueueReaderLocking_Mutex>>(
    281      64, 32, 1024 * 1024,
    282      "MultiWriterQueue<int, DefaultBufferSize, Locking_Mutex>");
    283 }
    284 #endif
    285 
    286 // Single-threaded use only.
    287 struct DequeWrapperST {
    288  nsDeque<void> mDQ;
    289 
    290  bool Push(int i) {
    291    mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i)));
    292    return true;
    293  }
    294  template <typename F>
    295  void PopAll(F&& aF) {
    296    while (mDQ.GetSize() != 0) {
    297      int i = static_cast<int>(reinterpret_cast<uintptr_t>(mDQ.Pop()));
    298      aF(i);
    299    }
    300  }
    301 
    302  struct CountAndWatermark {
    303    int mCount = 0;
    304    int mWatermark = 0;
    305  } mLiveBuffersStats, mReusableBuffersStats, mAllocatedBuffersStats;
    306 
    307  CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats; }
    308  CountAndWatermark ReusableBuffersStats() const {
    309    return mReusableBuffersStats;
    310  }
    311  CountAndWatermark AllocatedBuffersStats() const {
    312    return mAllocatedBuffersStats;
    313  }
    314 };
    315 
    316 // Multi-thread (atomic) writes allowed, make sure you don't pop unless writes
    317 // can't happen.
    318 struct DequeWrapperAW : DequeWrapperST {
    319  mozilla::Atomic<bool> mWriting{false};
    320 
    321  bool Push(int i) {
    322    while (!mWriting.compareExchange(false, true)) {
    323    }
    324    mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i)));
    325    mWriting = false;
    326    return true;
    327  }
    328 };
    329 
    330 // Multi-thread writes allowed, make sure you don't pop unless writes can't
    331 // happen.
    332 struct DequeWrapperMW : DequeWrapperST {
    333  mozilla::Mutex mMutex MOZ_UNANNOTATED;
    334 
    335  DequeWrapperMW() : mMutex("DequeWrapperMW/MT") {}
    336 
    337  bool Push(int i) {
    338    mozilla::MutexAutoLock lock(mMutex);
    339    mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i)));
    340    return true;
    341  }
    342 };
    343 
    344 // Multi-thread read&writes allowed.
    345 struct DequeWrapperMT : DequeWrapperMW {
    346  template <typename F>
    347  void PopAll(F&& aF) {
    348    while (mDQ.GetSize() != 0) {
    349      int i;
    350      {
    351        mozilla::MutexAutoLock lock(mMutex);
    352        i = static_cast<int>(reinterpret_cast<uintptr_t>(mDQ.Pop()));
    353      }
    354      aF(i);
    355    }
    356  }
    357 };
    358 
    359 TEST(MultiWriterQueue, nsDequeBenchmark)
    360 {
    361  TestMultiWriterQueueMT<DequeWrapperST>(1, 0, 2 * 1024 * 1024,
    362                                         "DequeWrapperST ");
    363 
    364  TestMultiWriterQueueMT<DequeWrapperAW>(1, 0, 2 * 1024 * 1024,
    365                                         "DequeWrapperAW ");
    366  TestMultiWriterQueueMT<DequeWrapperMW>(1, 0, 2 * 1024 * 1024,
    367                                         "DequeWrapperMW ");
    368  TestMultiWriterQueueMT<DequeWrapperMT>(1, 0, 2 * 1024 * 1024,
    369                                         "DequeWrapperMT ");
    370  TestMultiWriterQueueMT<DequeWrapperMT>(1, 1, 2 * 1024 * 1024,
    371                                         "DequeWrapperMT ");
    372 
    373  TestMultiWriterQueueMT<DequeWrapperAW>(8, 0, 2 * 1024 * 1024,
    374                                         "DequeWrapperAW ");
    375  TestMultiWriterQueueMT<DequeWrapperMW>(8, 0, 2 * 1024 * 1024,
    376                                         "DequeWrapperMW ");
    377  TestMultiWriterQueueMT<DequeWrapperMT>(8, 0, 2 * 1024 * 1024,
    378                                         "DequeWrapperMT ");
    379  TestMultiWriterQueueMT<DequeWrapperMT>(8, 1, 2 * 1024 * 1024,
    380                                         "DequeWrapperMT ");
    381 }