TestSPSCQueue.cpp (8064B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/SPSCQueue.h" 8 #include "mozilla/PodOperations.h" 9 #include <vector> 10 #include <iostream> 11 #include <thread> 12 #include <chrono> 13 #include <memory> 14 #include <string> 15 16 #ifdef _WIN32 17 # include <windows.h> 18 #endif 19 20 using namespace mozilla; 21 22 /* Generate a monotonically increasing sequence of numbers. */ 23 template <typename T> 24 class SequenceGenerator { 25 public: 26 SequenceGenerator() = default; 27 void Get(T* aElements, size_t aCount) { 28 for (size_t i = 0; i < aCount; i++) { 29 aElements[i] = static_cast<T>(mIndex); 30 mIndex++; 31 } 32 } 33 void Rewind(size_t aCount) { mIndex -= aCount; } 34 35 private: 36 size_t mIndex = 0; 37 }; 38 39 /* Checks that a sequence is monotonically increasing. */ 40 template <typename T> 41 class SequenceVerifier { 42 public: 43 SequenceVerifier() = default; 44 void Check(T* aElements, size_t aCount) { 45 for (size_t i = 0; i < aCount; i++) { 46 if (aElements[i] != static_cast<T>(mIndex)) { 47 std::cerr << "Element " << i << " is different. Expected " 48 << static_cast<T>(mIndex) << ", got " << aElements[i] << "." 49 << std::endl; 50 MOZ_RELEASE_ASSERT(false); 51 } 52 mIndex++; 53 } 54 } 55 56 private: 57 size_t mIndex = 0; 58 }; 59 60 const int BLOCK_SIZE = 127; 61 62 template <typename T> 63 void TestRing(int capacity) { 64 SPSCQueue<T> buf(capacity); 65 std::unique_ptr<T[]> seq(new T[capacity]); 66 SequenceGenerator<T> gen; 67 SequenceVerifier<T> checker; 68 69 int iterations = 1002; 70 71 while (iterations--) { 72 gen.Get(seq.get(), BLOCK_SIZE); 73 int rv = buf.Enqueue(seq.get(), BLOCK_SIZE); 74 MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE); 75 PodZero(seq.get(), BLOCK_SIZE); 76 rv = buf.Dequeue(seq.get(), BLOCK_SIZE); 77 MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE); 78 checker.Check(seq.get(), BLOCK_SIZE); 79 } 80 } 81 82 void Delay() { 83 // On Windows and x86 Android, the timer resolution is so bad that, even if 84 // we used `timeBeginPeriod(1)`, any nonzero sleep from the test's inner loops 85 // would make this program take far too long. 86 #ifdef _WIN32 87 Sleep(0); 88 #elif defined(ANDROID) 89 std::this_thread::sleep_for(std::chrono::microseconds(0)); 90 #else 91 std::this_thread::sleep_for(std::chrono::microseconds(10)); 92 #endif 93 } 94 95 template <typename T> 96 void TestRingMultiThread(int capacity) { 97 SPSCQueue<T> buf(capacity); 98 SequenceVerifier<T> checker; 99 std::unique_ptr<T[]> outBuffer(new T[capacity]); 100 101 std::thread t([&buf, capacity] { 102 int iterations = 1002; 103 std::unique_ptr<T[]> inBuffer(new T[capacity]); 104 SequenceGenerator<T> gen; 105 106 while (iterations--) { 107 Delay(); 108 gen.Get(inBuffer.get(), BLOCK_SIZE); 109 int rv = buf.Enqueue(inBuffer.get(), BLOCK_SIZE); 110 MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE); 111 if (rv != BLOCK_SIZE) { 112 gen.Rewind(BLOCK_SIZE - rv); 113 } 114 } 115 }); 116 117 int remaining = 1002; 118 119 while (remaining--) { 120 Delay(); 121 int rv = buf.Dequeue(outBuffer.get(), BLOCK_SIZE); 122 MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE); 123 checker.Check(outBuffer.get(), rv); 124 } 125 126 t.join(); 127 } 128 129 template <typename T> 130 void BasicAPITest(T& ring) { 131 MOZ_RELEASE_ASSERT(ring.Capacity() == 128); 132 133 MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0); 134 MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128); 135 136 int rv = ring.EnqueueDefault(63); 137 138 MOZ_RELEASE_ASSERT(rv == 63); 139 MOZ_RELEASE_ASSERT(ring.AvailableRead() == 63); 140 MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 65); 141 142 rv = ring.EnqueueDefault(65); 143 144 MOZ_RELEASE_ASSERT(rv == 65); 145 MOZ_RELEASE_ASSERT(ring.AvailableRead() == 128); 146 MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 0); 147 148 rv = ring.Dequeue(nullptr, 63); 149 150 MOZ_RELEASE_ASSERT(ring.AvailableRead() == 65); 151 MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 63); 152 153 rv = ring.Dequeue(nullptr, 65); 154 155 MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0); 156 MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128); 157 } 158 159 const size_t RING_BUFFER_SIZE = 128; 160 const size_t ENQUEUE_SIZE = RING_BUFFER_SIZE / 2; 161 162 void TestResetAPI() { 163 SPSCQueue<float> ring(RING_BUFFER_SIZE); 164 std::thread p([&ring] { 165 std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); 166 int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); 167 MOZ_RELEASE_ASSERT(rv > 0); 168 }); 169 170 p.join(); 171 172 std::thread c([&ring] { 173 std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]); 174 int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE); 175 MOZ_RELEASE_ASSERT(rv > 0); 176 }); 177 178 c.join(); 179 180 // Enqueue with a different thread. We reset the thread ID in the ring buffer, 181 // this should work. 182 std::thread p2([&ring] { 183 ring.ResetProducerThreadId(); 184 std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); 185 int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); 186 MOZ_RELEASE_ASSERT(rv > 0); 187 }); 188 189 p2.join(); 190 191 // Dequeue with a different thread. We reset the thread ID in the ring buffer, 192 // this should work. 193 std::thread c2([&ring] { 194 ring.ResetConsumerThreadId(); 195 std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]); 196 int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE); 197 MOZ_RELEASE_ASSERT(rv > 0); 198 }); 199 200 c2.join(); 201 202 // Similarly, but do the Enqueues without a Dequeue in between, since a 203 // Dequeue could affect memory ordering. 204 std::thread p4; 205 std::thread p3([&] { 206 ring.ResetProducerThreadId(); 207 std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); 208 int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); 209 MOZ_RELEASE_ASSERT(rv > 0); 210 p4 = std::thread([&ring] { 211 ring.ResetProducerThreadId(); 212 std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); 213 int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); 214 MOZ_RELEASE_ASSERT(rv > 0); 215 }); 216 }); 217 218 p3.join(); 219 p4.join(); 220 221 std::thread c4; 222 std::thread c3([&] { 223 ring.ResetConsumerThreadId(); 224 std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]); 225 int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE); 226 MOZ_RELEASE_ASSERT(rv > 0); 227 c4 = std::thread([&ring] { 228 ring.ResetConsumerThreadId(); 229 std::unique_ptr<float[]> outBuffer(new float[ENQUEUE_SIZE]); 230 int rv = ring.Dequeue(outBuffer.get(), ENQUEUE_SIZE); 231 MOZ_RELEASE_ASSERT(rv > 0); 232 }); 233 }); 234 235 c3.join(); 236 c4.join(); 237 } 238 239 void TestMove() { 240 const size_t ELEMENT_COUNT = 16; 241 struct Thing { 242 Thing() : mStr("") {} 243 explicit Thing(const std::string& aStr) : mStr(aStr) {} 244 Thing(Thing&& aOtherThing) { 245 mStr = std::move(aOtherThing.mStr); 246 // aOtherThing.mStr.clear(); 247 } 248 Thing& operator=(Thing&& aOtherThing) { 249 mStr = std::move(aOtherThing.mStr); 250 return *this; 251 } 252 std::string mStr; 253 }; 254 255 std::vector<Thing> vec_in; 256 std::vector<Thing> vec_out; 257 258 for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { 259 vec_in.push_back(Thing(std::to_string(i))); 260 vec_out.push_back(Thing()); 261 } 262 263 SPSCQueue<Thing> queue(ELEMENT_COUNT); 264 265 int rv = queue.Enqueue(&vec_in[0], ELEMENT_COUNT); 266 MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT); 267 268 // Check that we've moved the std::string into the queue. 269 for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { 270 MOZ_RELEASE_ASSERT(vec_in[i].mStr.empty()); 271 } 272 273 rv = queue.Dequeue(&vec_out[0], ELEMENT_COUNT); 274 MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT); 275 276 for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { 277 MOZ_RELEASE_ASSERT(std::stoul(vec_out[i].mStr) == i); 278 } 279 } 280 281 int main() { 282 const int minCapacity = 199; 283 const int maxCapacity = 1277; 284 const int capacityIncrement = 27; 285 286 SPSCQueue<float> q1(128); 287 BasicAPITest(q1); 288 SPSCQueue<char> q2(128); 289 BasicAPITest(q2); 290 291 for (uint32_t i = minCapacity; i < maxCapacity; i += capacityIncrement) { 292 TestRing<uint32_t>(i); 293 TestRingMultiThread<uint32_t>(i); 294 TestRing<float>(i); 295 TestRingMultiThread<float>(i); 296 } 297 298 TestResetAPI(); 299 TestMove(); 300 301 return 0; 302 }