TestBoundedMPSCQueue.cpp (5193B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/BoundedMPSCQueue.h" 8 #include <iostream> 9 #include <thread> 10 #include <chrono> 11 12 using namespace mozilla; 13 14 struct NativeStack { 15 void* mPCs[32]; 16 void* mSPs[32]; 17 size_t mCount; 18 size_t mTid; 19 }; 20 21 void StackWalkCallback(void* aPC, void* aSP, NativeStack* nativeStack) { 22 nativeStack->mSPs[nativeStack->mCount] = aSP; 23 nativeStack->mPCs[nativeStack->mCount] = aPC; 24 nativeStack->mCount++; 25 } 26 27 void FillNativeStack(NativeStack* aStack) { 28 StackWalkCallback((void*)0x1234, (void*)0x9876, aStack); 29 StackWalkCallback((void*)0x3456, (void*)0x5432, aStack); 30 StackWalkCallback((void*)0x7890, (void*)0x1098, aStack); 31 StackWalkCallback((void*)0x1234, (void*)0x7654, aStack); 32 StackWalkCallback((void*)0x5678, (void*)0x3210, aStack); 33 StackWalkCallback((void*)0x9012, (void*)0x9876, aStack); 34 StackWalkCallback((void*)0x1334, (void*)0x9786, aStack); 35 StackWalkCallback((void*)0x3546, (void*)0x5342, aStack); 36 StackWalkCallback((void*)0x7809, (void*)0x0198, aStack); 37 StackWalkCallback((void*)0x4123, (void*)0x7645, aStack); 38 StackWalkCallback((void*)0x5768, (void*)0x3120, aStack); 39 StackWalkCallback((void*)0x9102, (void*)0x9867, aStack); 40 StackWalkCallback((void*)0x1243, (void*)0x8976, aStack); 41 StackWalkCallback((void*)0x6345, (void*)0x4325, aStack); 42 StackWalkCallback((void*)0x8790, (void*)0x1908, aStack); 43 StackWalkCallback((void*)0x134, (void*)0x654, aStack); 44 StackWalkCallback((void*)0x567, (void*)0x320, aStack); 45 StackWalkCallback((void*)0x901, (void*)0x976, aStack); 46 } 47 48 template <size_t Capacity> 49 void BasicAPITestWithStack(BoundedMPSCQueue<NativeStack, Capacity>& aQueue, 50 size_t aCap) { 51 MOZ_RELEASE_ASSERT(aQueue.Capacity() == aCap); 52 53 NativeStack s = {.mCount = 0}; 54 FillNativeStack(&s); 55 MOZ_RELEASE_ASSERT(s.mCount == 18); 56 57 int store = -1; 58 for (size_t i = 0; i < aCap; ++i) { 59 store = aQueue.Send(s); 60 MOZ_RELEASE_ASSERT(store > 0); 61 } 62 63 int retrieve = -1; 64 for (size_t i = 0; i < aCap; ++i) { 65 NativeStack sr{}; 66 retrieve = aQueue.Recv(&sr); 67 68 MOZ_RELEASE_ASSERT(retrieve > 0); 69 MOZ_RELEASE_ASSERT(&s != &sr); 70 MOZ_RELEASE_ASSERT(s.mCount == sr.mCount); 71 72 for (size_t i = 0; i < s.mCount; ++i) { 73 MOZ_RELEASE_ASSERT(s.mPCs[i] == sr.mPCs[i]); 74 MOZ_RELEASE_ASSERT(s.mSPs[i] == sr.mSPs[i]); 75 } 76 } 77 } 78 79 template <size_t Capacity> 80 void BasicAPITestMP(BoundedMPSCQueue<NativeStack, Capacity>& aQueue, 81 size_t aThreads) { 82 MOZ_RELEASE_ASSERT(aQueue.Capacity() == 15); 83 84 std::thread consumer([&aQueue, aThreads] { 85 size_t received = 0; 86 NativeStack v{}; 87 do { 88 int deq = aQueue.Recv(&v); 89 if (deq > 0) { 90 received++; 91 } 92 std::this_thread::sleep_for(std::chrono::microseconds(10)); 93 } while (received < aThreads); 94 }); 95 96 std::thread producers[aThreads]; 97 for (size_t t = 0; t < aThreads; ++t) { 98 producers[t] = std::thread([&aQueue, t] { 99 NativeStack s = {.mCount = 0, .mTid = t}; 100 FillNativeStack(&s); 101 MOZ_RELEASE_ASSERT(s.mCount == 18); 102 103 int sent = 0; 104 // wrap in a do { } while () because Send() will return 0 on message being 105 // dropped so we want to retry 106 do { 107 std::this_thread::sleep_for(std::chrono::microseconds(5)); 108 sent = aQueue.Send(s); 109 } while (sent == 0); 110 }); 111 } 112 113 for (size_t t = 0; t < aThreads; ++t) { 114 producers[t].join(); 115 } 116 consumer.join(); 117 } 118 119 template <size_t Capacity> 120 bool testBasicApi() { 121 BoundedMPSCQueue<NativeStack, Capacity> s; 122 BasicAPITestWithStack(s, Capacity); 123 return true; 124 } 125 126 template <size_t... Capacity> 127 void testBasicApiCapacities() { 128 [[maybe_unused]] std::initializer_list<bool> _ = { 129 testBasicApi<Capacity>()...}; 130 } 131 132 int main() { 133 testBasicApiCapacities<1, 5, 7, 10, 15>(); 134 135 { 136 NativeStack e{}; 137 BoundedMPSCQueue<NativeStack, 2> deq; 138 139 // Dequeue with nothing should return 0 and not fail later 140 int retrieve = deq.Recv(&e); 141 MOZ_RELEASE_ASSERT(retrieve == 0); 142 143 NativeStack real = {.mCount = 0}; 144 FillNativeStack(&real); 145 MOZ_RELEASE_ASSERT(real.mCount == 18); 146 147 int store = deq.Send(real); 148 MOZ_RELEASE_ASSERT(store > 0); 149 store = deq.Send(real); 150 MOZ_RELEASE_ASSERT(store > 0); 151 152 // should be full we should get 0 153 store = deq.Send(real); 154 MOZ_RELEASE_ASSERT(store == 0); 155 156 // try to dequeue 157 NativeStack e1{}; 158 retrieve = deq.Recv(&e1); 159 MOZ_RELEASE_ASSERT(retrieve > 0); 160 MOZ_RELEASE_ASSERT(e1.mCount == 18); 161 162 NativeStack e2{}; 163 retrieve = deq.Recv(&e2); 164 MOZ_RELEASE_ASSERT(retrieve > 0); 165 MOZ_RELEASE_ASSERT(e2.mCount == 18); 166 167 retrieve = deq.Recv(&e); 168 MOZ_RELEASE_ASSERT(retrieve == 0); 169 } 170 171 size_t nbThreads[] = {8, 16, 64, 128, 512, 1024}; 172 for (auto threads : nbThreads) { 173 BoundedMPSCQueue<NativeStack, 15> s; 174 BasicAPITestMP(s, threads); 175 } 176 177 return 0; 178 }