TestMultiWriterQueue.cpp (14564B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include <gtest/gtest.h> 8 9 #include <type_traits> 10 11 #include "DDTimeStamp.h" 12 #include "MultiWriterQueue.h" 13 #include "mozilla/gtest/MozAssertions.h" 14 #include "nsDeque.h" 15 #include "nsIThread.h" 16 #include "nsThreadUtils.h" 17 18 using mozilla::MultiWriterQueue; 19 using mozilla::MultiWriterQueueDefaultBufferSize; 20 using mozilla::MultiWriterQueueReaderLocking_Mutex; 21 using mozilla::MultiWriterQueueReaderLocking_None; 22 23 template <size_t BufferSize> 24 static void TestMultiWriterQueueST(const int loops) { 25 using Q = MultiWriterQueue<int, BufferSize>; 26 Q q; 27 28 int pushes = 0; 29 // Go through 2 cycles of pushes&pops, to exercize reusable buffers. 30 for (int max = loops; max <= loops * 2; max *= 2) { 31 // Push all numbers. 32 for (int i = 1; i <= max; ++i) { 33 bool newBuffer = q.Push(i); 34 // A new buffer should be added at the last push of each buffer. 35 EXPECT_EQ(++pushes % BufferSize == 0, newBuffer); 36 } 37 38 // Pop numbers, should be FIFO. 39 int x = 0; 40 q.PopAll([&](int& i) { EXPECT_EQ(++x, i); }); 41 42 // We should have got all numbers. 43 EXPECT_EQ(max, x); 44 45 // Nothing left. 46 q.PopAll([&](int&) { EXPECT_TRUE(false); }); 47 } 48 } 49 50 TEST(MultiWriterQueue, SingleThreaded) 51 { 52 TestMultiWriterQueueST<1>(10); 53 TestMultiWriterQueueST<2>(10); 54 TestMultiWriterQueueST<4>(10); 55 56 TestMultiWriterQueueST<10>(9); 57 TestMultiWriterQueueST<10>(10); 58 TestMultiWriterQueueST<10>(11); 59 TestMultiWriterQueueST<10>(19); 60 TestMultiWriterQueueST<10>(20); 61 TestMultiWriterQueueST<10>(21); 62 TestMultiWriterQueueST<10>(999); 63 TestMultiWriterQueueST<10>(1000); 64 TestMultiWriterQueueST<10>(1001); 65 66 TestMultiWriterQueueST<8192>(8192 * 4 + 1); 67 } 68 69 template <typename Q> 70 static void TestMultiWriterQueueMT(int aWriterThreads, int aReaderThreads, 71 int aTotalLoops, const char* aPrintPrefix) { 72 Q q; 73 74 const int threads = aWriterThreads + aReaderThreads; 75 const int loops = aTotalLoops / aWriterThreads; 76 77 nsIThread** array = new nsIThread*[threads]; 78 79 mozilla::Atomic<int> pushThreadsCompleted{0}; 80 int pops = 0; 81 82 nsCOMPtr<nsIRunnable> popper = NS_NewRunnableFunction("MWQPopper", [&]() { 83 // int popsBefore = pops; 84 // int allocsBefore = q.AllocatedBuffersStats().mCount; 85 q.PopAll([&pops](const int& i) { ++pops; }); 86 // if (pops != popsBefore || 87 // q.AllocatedBuffersStats().mCount != allocsBefore) { 88 // printf("%s threads=1+%d loops/thread=%d pops=%d " 89 // "buffers: live=%d (w %d) reusable=%d (w %d) " 90 // "alloc=%d (w %d)\n", 91 // aPrintPrefix, 92 // aWriterThreads, 93 // loops, 94 // pops, 95 // q.LiveBuffersStats().mCount, 96 // q.LiveBuffersStats().mWatermark, 97 // q.ReusableBuffersStats().mCount, 98 // q.ReusableBuffersStats().mWatermark, 99 // q.AllocatedBuffersStats().mCount, 100 // q.AllocatedBuffersStats().mWatermark); 101 // } 102 }); 103 // Cycle through reader threads. 104 mozilla::Atomic<size_t> readerThread{0}; 105 106 double start = mozilla::ToSeconds(mozilla::DDNow()); 107 108 for (int k = 0; k < threads; k++) { 109 // First `aReaderThreads` threads to pop, all others to push. 110 if (k < aReaderThreads) { 111 nsCOMPtr<nsIThread> t; 112 nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t)); 113 EXPECT_NS_SUCCEEDED(rv); 114 NS_ADDREF(array[k] = t); 115 } else { 116 nsCOMPtr<nsIThread> t; 117 nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction("MWQPusher", [&, k]() { 118 // Give a bit of breathing space to construct other threads. 119 PR_Sleep(PR_MillisecondsToInterval(100)); 120 121 for (int i = 0; i < loops; ++i) { 122 if (q.Push(k * threads + i) && aReaderThreads != 0) { 123 // Run a popper task every time we push the last element of a 124 // buffer. 125 array[++readerThread % aReaderThreads]->Dispatch( 126 popper, nsIThread::DISPATCH_NORMAL); 127 } 128 } 129 ++pushThreadsCompleted; 130 }); 131 nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t), r); 132 EXPECT_NS_SUCCEEDED(rv); 133 NS_ADDREF(array[k] = t); 134 } 135 } 136 137 for (int k = threads - 1; k >= 0; k--) { 138 array[k]->Shutdown(); 139 NS_RELEASE(array[k]); 140 } 141 delete[] array; 142 143 // There may be a few more elements that haven't been read yet. 144 q.PopAll([&pops](const int& i) { ++pops; }); 145 const int pushes = aWriterThreads * loops; 146 EXPECT_EQ(pushes, pops); 147 q.PopAll([](const int& i) { EXPECT_TRUE(false); }); 148 149 double duration = mozilla::ToSeconds(mozilla::DDNow()) - start - 0.1; 150 printf( 151 "%s threads=%dw+%dr loops/thread=%d pushes=pops=%d duration=%fs " 152 "pushes/s=%f buffers: live=%d (w %d) reusable=%d (w %d) " 153 "alloc=%d (w %d)\n", 154 aPrintPrefix, aWriterThreads, aReaderThreads, loops, pushes, duration, 155 pushes / duration, q.LiveBuffersStats().mCount, 156 q.LiveBuffersStats().mWatermark, q.ReusableBuffersStats().mCount, 157 q.ReusableBuffersStats().mWatermark, q.AllocatedBuffersStats().mCount, 158 q.AllocatedBuffersStats().mWatermark); 159 } 160 161 // skip test on windows10-aarch64 due to unexpected test timeout at 162 // MultiWriterSingleReader, bug 1526001 163 #if !defined(_M_ARM64) 164 TEST(MultiWriterQueue, MultiWriterSingleReader) 165 { 166 // Small BufferSize, to exercize the buffer management code. 167 TestMultiWriterQueueMT< 168 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 169 1, 0, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 170 TestMultiWriterQueueMT< 171 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 172 1, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 173 TestMultiWriterQueueMT< 174 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 175 2, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 176 TestMultiWriterQueueMT< 177 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 178 3, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 179 TestMultiWriterQueueMT< 180 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 181 4, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 182 TestMultiWriterQueueMT< 183 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 184 5, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 185 TestMultiWriterQueueMT< 186 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 187 6, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 188 TestMultiWriterQueueMT< 189 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 190 7, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 191 TestMultiWriterQueueMT< 192 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 193 8, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 194 TestMultiWriterQueueMT< 195 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 196 9, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 197 TestMultiWriterQueueMT< 198 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 199 10, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 200 TestMultiWriterQueueMT< 201 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 202 16, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 203 TestMultiWriterQueueMT< 204 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 205 32, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 206 TestMultiWriterQueueMT< 207 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_None>>( 208 64, 1, 2 * 1024 * 1024, "MultiWriterQueue<int, 10, Locking_None>"); 209 210 // A more real-life buffer size. 211 TestMultiWriterQueueMT< 212 MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize, 213 MultiWriterQueueReaderLocking_None>>( 214 64, 1, 2 * 1024 * 1024, 215 "MultiWriterQueue<int, DefaultBufferSize, Locking_None>"); 216 217 // DEBUG-mode thread-safety checks should make the following (multi-reader 218 // with no locking) crash; uncomment to verify. 219 // TestMultiWriterQueueMT< 220 // MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize, 221 // MultiWriterQueueReaderLocking_None>>(64, 2, 2*1024*1024); 222 } 223 #endif 224 225 // skip test on windows10-aarch64 due to unexpected test timeout at 226 // MultiWriterMultiReade, bug 1526001 227 #if !defined(_M_ARM64) 228 TEST(MultiWriterQueue, MultiWriterMultiReader) 229 { 230 static_assert( 231 std::is_same_v< 232 MultiWriterQueue<int, 10>, 233 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>, 234 "MultiWriterQueue reader locking should use Mutex by default"); 235 236 // Small BufferSize, to exercize the buffer management code. 237 TestMultiWriterQueueMT< 238 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 239 1, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 240 TestMultiWriterQueueMT< 241 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 242 2, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 243 TestMultiWriterQueueMT< 244 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 245 3, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 246 TestMultiWriterQueueMT< 247 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 248 4, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 249 TestMultiWriterQueueMT< 250 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 251 5, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 252 TestMultiWriterQueueMT< 253 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 254 6, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 255 TestMultiWriterQueueMT< 256 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 257 7, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 258 TestMultiWriterQueueMT< 259 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 260 8, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 261 TestMultiWriterQueueMT< 262 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 263 9, 2, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 264 TestMultiWriterQueueMT< 265 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 266 10, 4, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 267 TestMultiWriterQueueMT< 268 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 269 16, 8, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 270 TestMultiWriterQueueMT< 271 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 272 32, 16, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 273 TestMultiWriterQueueMT< 274 MultiWriterQueue<int, 10, MultiWriterQueueReaderLocking_Mutex>>( 275 64, 32, 1024 * 1024, "MultiWriterQueue<int, 10, Locking_Mutex>"); 276 277 // A more real-life buffer size. 278 TestMultiWriterQueueMT< 279 MultiWriterQueue<int, MultiWriterQueueDefaultBufferSize, 280 MultiWriterQueueReaderLocking_Mutex>>( 281 64, 32, 1024 * 1024, 282 "MultiWriterQueue<int, DefaultBufferSize, Locking_Mutex>"); 283 } 284 #endif 285 286 // Single-threaded use only. 287 struct DequeWrapperST { 288 nsDeque<void> mDQ; 289 290 bool Push(int i) { 291 mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i))); 292 return true; 293 } 294 template <typename F> 295 void PopAll(F&& aF) { 296 while (mDQ.GetSize() != 0) { 297 int i = static_cast<int>(reinterpret_cast<uintptr_t>(mDQ.Pop())); 298 aF(i); 299 } 300 } 301 302 struct CountAndWatermark { 303 int mCount = 0; 304 int mWatermark = 0; 305 } mLiveBuffersStats, mReusableBuffersStats, mAllocatedBuffersStats; 306 307 CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats; } 308 CountAndWatermark ReusableBuffersStats() const { 309 return mReusableBuffersStats; 310 } 311 CountAndWatermark AllocatedBuffersStats() const { 312 return mAllocatedBuffersStats; 313 } 314 }; 315 316 // Multi-thread (atomic) writes allowed, make sure you don't pop unless writes 317 // can't happen. 318 struct DequeWrapperAW : DequeWrapperST { 319 mozilla::Atomic<bool> mWriting{false}; 320 321 bool Push(int i) { 322 while (!mWriting.compareExchange(false, true)) { 323 } 324 mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i))); 325 mWriting = false; 326 return true; 327 } 328 }; 329 330 // Multi-thread writes allowed, make sure you don't pop unless writes can't 331 // happen. 332 struct DequeWrapperMW : DequeWrapperST { 333 mozilla::Mutex mMutex MOZ_UNANNOTATED; 334 335 DequeWrapperMW() : mMutex("DequeWrapperMW/MT") {} 336 337 bool Push(int i) { 338 mozilla::MutexAutoLock lock(mMutex); 339 mDQ.PushFront(reinterpret_cast<void*>(static_cast<uintptr_t>(i))); 340 return true; 341 } 342 }; 343 344 // Multi-thread read&writes allowed. 345 struct DequeWrapperMT : DequeWrapperMW { 346 template <typename F> 347 void PopAll(F&& aF) { 348 while (mDQ.GetSize() != 0) { 349 int i; 350 { 351 mozilla::MutexAutoLock lock(mMutex); 352 i = static_cast<int>(reinterpret_cast<uintptr_t>(mDQ.Pop())); 353 } 354 aF(i); 355 } 356 } 357 }; 358 359 TEST(MultiWriterQueue, nsDequeBenchmark) 360 { 361 TestMultiWriterQueueMT<DequeWrapperST>(1, 0, 2 * 1024 * 1024, 362 "DequeWrapperST "); 363 364 TestMultiWriterQueueMT<DequeWrapperAW>(1, 0, 2 * 1024 * 1024, 365 "DequeWrapperAW "); 366 TestMultiWriterQueueMT<DequeWrapperMW>(1, 0, 2 * 1024 * 1024, 367 "DequeWrapperMW "); 368 TestMultiWriterQueueMT<DequeWrapperMT>(1, 0, 2 * 1024 * 1024, 369 "DequeWrapperMT "); 370 TestMultiWriterQueueMT<DequeWrapperMT>(1, 1, 2 * 1024 * 1024, 371 "DequeWrapperMT "); 372 373 TestMultiWriterQueueMT<DequeWrapperAW>(8, 0, 2 * 1024 * 1024, 374 "DequeWrapperAW "); 375 TestMultiWriterQueueMT<DequeWrapperMW>(8, 0, 2 * 1024 * 1024, 376 "DequeWrapperMW "); 377 TestMultiWriterQueueMT<DequeWrapperMT>(8, 0, 2 * 1024 * 1024, 378 "DequeWrapperMT "); 379 TestMultiWriterQueueMT<DequeWrapperMT>(8, 1, 2 * 1024 * 1024, 380 "DequeWrapperMT "); 381 }