task_queue_test.cc (12529B)
1 /* 2 * Copyright 2019 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 #include "api/task_queue/task_queue_test.h" 11 12 #include <cstdint> 13 #include <memory> 14 #include <utility> 15 #include <vector> 16 17 #include "absl/cleanup/cleanup.h" 18 #include "absl/strings/string_view.h" 19 #include "api/ref_count.h" 20 #include "api/task_queue/task_queue_base.h" 21 #include "api/task_queue/task_queue_factory.h" 22 #include "api/units/time_delta.h" 23 #include "rtc_base/event.h" 24 #include "rtc_base/ref_counter.h" 25 #include "rtc_base/time_utils.h" 26 #include "test/gtest.h" 27 28 namespace webrtc { 29 namespace { 30 31 // Avoids a dependency to system_wrappers. 32 void SleepFor(TimeDelta duration) { 33 ScopedAllowBaseSyncPrimitivesForTesting allow; 34 Event event; 35 event.Wait(duration); 36 } 37 38 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue( 39 const std::unique_ptr<TaskQueueFactory>& factory, 40 absl::string_view task_queue_name, 41 TaskQueueFactory::Priority priority = TaskQueueFactory::Priority::NORMAL) { 42 return factory->CreateTaskQueue(task_queue_name, priority); 43 } 44 45 TEST_P(TaskQueueTest, Construct) { 46 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 47 auto queue = CreateTaskQueue(factory, "Construct"); 48 EXPECT_FALSE(queue->IsCurrent()); 49 } 50 51 TEST_P(TaskQueueTest, PostAndCheckCurrent) { 52 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 53 Event event; 54 auto queue = CreateTaskQueue(factory, "PostAndCheckCurrent"); 55 56 // We're not running a task, so `queue` shouldn't be current. 57 // Note that because Thread also supports the TQ interface and 58 // TestMainImpl::Init wraps the main test thread (bugs.webrtc.org/9714), that 59 // means that TaskQueueBase::Current() will still return a valid value. 60 EXPECT_FALSE(queue->IsCurrent()); 61 62 queue->PostTask([&event, &queue] { 63 EXPECT_TRUE(queue->IsCurrent()); 64 event.Set(); 65 }); 66 EXPECT_TRUE(event.Wait(TimeDelta::Seconds(1))); 67 } 68 69 TEST_P(TaskQueueTest, PostCustomTask) { 70 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 71 Event ran; 72 auto queue = CreateTaskQueue(factory, "PostCustomImplementation"); 73 74 class CustomTask { 75 public: 76 explicit CustomTask(Event* ran) : ran_(ran) {} 77 78 void operator()() { ran_->Set(); } 79 80 private: 81 Event* const ran_; 82 } my_task(&ran); 83 84 queue->PostTask(my_task); 85 EXPECT_TRUE(ran.Wait(TimeDelta::Seconds(1))); 86 } 87 88 TEST_P(TaskQueueTest, PostDelayedZero) { 89 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 90 Event event; 91 auto queue = CreateTaskQueue(factory, "PostDelayedZero"); 92 93 queue->PostDelayedTask([&event] { event.Set(); }, TimeDelta::Zero()); 94 EXPECT_TRUE(event.Wait(TimeDelta::Seconds(1))); 95 } 96 97 TEST_P(TaskQueueTest, PostFromQueue) { 98 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 99 Event event; 100 auto queue = CreateTaskQueue(factory, "PostFromQueue"); 101 102 queue->PostTask( 103 [&event, &queue] { queue->PostTask([&event] { event.Set(); }); }); 104 EXPECT_TRUE(event.Wait(TimeDelta::Seconds(1))); 105 } 106 107 TEST_P(TaskQueueTest, PostDelayed) { 108 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 109 Event event; 110 auto queue = 111 CreateTaskQueue(factory, "PostDelayed", TaskQueueFactory::Priority::HIGH); 112 113 int64_t start = TimeMillis(); 114 queue->PostDelayedTask( 115 [&event, &queue] { 116 EXPECT_TRUE(queue->IsCurrent()); 117 event.Set(); 118 }, 119 TimeDelta::Millis(100)); 120 EXPECT_TRUE(event.Wait(TimeDelta::Seconds(1))); 121 int64_t end = TimeMillis(); 122 // These tests are a little relaxed due to how "powerful" our test bots can 123 // be. Most recently we've seen windows bots fire the callback after 94-99ms, 124 // which is why we have a little bit of leeway backwards as well. 125 EXPECT_GE(end - start, 90u); 126 EXPECT_NEAR(end - start, 190u, 100u); // Accept 90-290. 127 } 128 129 TEST_P(TaskQueueTest, PostMultipleDelayed) { 130 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 131 auto queue = CreateTaskQueue(factory, "PostMultipleDelayed"); 132 133 std::vector<Event> events(100); 134 for (int i = 0; i < 100; ++i) { 135 Event* event = &events[i]; 136 queue->PostDelayedTask( 137 [event, &queue] { 138 EXPECT_TRUE(queue->IsCurrent()); 139 event->Set(); 140 }, 141 TimeDelta::Millis(i)); 142 } 143 144 for (Event& e : events) 145 EXPECT_TRUE(e.Wait(TimeDelta::Seconds(1))); 146 } 147 148 TEST_P(TaskQueueTest, PostDelayedAfterDestruct) { 149 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 150 Event run; 151 Event deleted; 152 auto queue = CreateTaskQueue(factory, "PostDelayedAfterDestruct"); 153 absl::Cleanup cleanup = [&deleted] { deleted.Set(); }; 154 queue->PostDelayedTask([&run, cleanup = std::move(cleanup)] { run.Set(); }, 155 TimeDelta::Millis(100)); 156 // Destroy the queue. 157 queue = nullptr; 158 // Task might outlive the TaskQueue, but still should be deleted. 159 EXPECT_TRUE(deleted.Wait(TimeDelta::Seconds(1))); 160 EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run. 161 } 162 163 TEST_P(TaskQueueTest, PostDelayedHighPrecisionAfterDestruct) { 164 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 165 Event run; 166 Event deleted; 167 auto queue = 168 CreateTaskQueue(factory, "PostDelayedHighPrecisionAfterDestruct"); 169 absl::Cleanup cleanup = [&deleted] { deleted.Set(); }; 170 queue->PostDelayedHighPrecisionTask( 171 [&run, cleanup = std::move(cleanup)] { run.Set(); }, 172 TimeDelta::Millis(100)); 173 // Destroy the queue. 174 queue = nullptr; 175 // Task might outlive the TaskQueue, but still should be deleted. 176 EXPECT_TRUE(deleted.Wait(TimeDelta::Seconds(1))); 177 EXPECT_FALSE(run.Wait(TimeDelta::Zero())); // and should not run. 178 } 179 180 TEST_P(TaskQueueTest, PostedUnexecutedClosureDestroyedOnTaskQueue) { 181 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 182 auto queue = 183 CreateTaskQueue(factory, "PostedUnexecutedClosureDestroyedOnTaskQueue"); 184 TaskQueueBase* queue_ptr = queue.get(); 185 queue->PostTask([] { SleepFor(TimeDelta::Millis(100)); }); 186 // Give the task queue a chance to start executing the first lambda. 187 SleepFor(TimeDelta::Millis(10)); 188 Event finished; 189 // Then ensure the next lambda (which is likely not executing yet) is 190 // destroyed in the task queue context when the queue is deleted. 191 auto cleanup = absl::Cleanup([queue_ptr, &finished] { 192 EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); 193 finished.Set(); 194 }); 195 queue->PostTask([cleanup = std::move(cleanup)] {}); 196 queue = nullptr; 197 finished.Wait(TimeDelta::Seconds(1)); 198 } 199 200 TEST_P(TaskQueueTest, PostedClosureDestroyedOnTaskQueue) { 201 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 202 auto queue = CreateTaskQueue(factory, "PostedClosureDestroyedOnTaskQueue"); 203 TaskQueueBase* queue_ptr = queue.get(); 204 Event finished; 205 auto cleanup = absl::Cleanup([queue_ptr, &finished] { 206 EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); 207 finished.Set(); 208 }); 209 // The cleanup task may or may not have had time to execute when the task 210 // queue is destroyed. Regardless, the task should be destroyed on the 211 // queue. 212 queue->PostTask([cleanup = std::move(cleanup)] {}); 213 queue = nullptr; 214 finished.Wait(TimeDelta::Seconds(1)); 215 } 216 217 TEST_P(TaskQueueTest, PostedExecutedClosureDestroyedOnTaskQueue) { 218 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 219 auto queue = 220 CreateTaskQueue(factory, "PostedExecutedClosureDestroyedOnTaskQueue"); 221 TaskQueueBase* queue_ptr = queue.get(); 222 // Ensure an executed lambda is destroyed on the task queue. 223 Event finished; 224 queue->PostTask([cleanup = absl::Cleanup([queue_ptr, &finished] { 225 EXPECT_EQ(queue_ptr, TaskQueueBase::Current()); 226 finished.Set(); 227 })] {}); 228 finished.Wait(TimeDelta::Seconds(1)); 229 } 230 231 TEST_P(TaskQueueTest, PostAndReuse) { 232 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 233 Event event; 234 auto post_queue = CreateTaskQueue(factory, "PostQueue"); 235 auto reply_queue = CreateTaskQueue(factory, "ReplyQueue"); 236 237 int call_count = 0; 238 239 class ReusedTask { 240 public: 241 ReusedTask(int* counter, TaskQueueBase* reply_queue, Event* event) 242 : counter_(*counter), reply_queue_(reply_queue), event_(*event) { 243 EXPECT_EQ(counter_, 0); 244 } 245 ReusedTask(ReusedTask&&) = default; 246 ReusedTask& operator=(ReusedTask&&) = delete; 247 248 void operator()() && { 249 if (++counter_ == 1) { 250 reply_queue_->PostTask(std::move(*this)); 251 // At this point, the object is in the moved-from state. 252 } else { 253 EXPECT_EQ(counter_, 2); 254 EXPECT_TRUE(reply_queue_->IsCurrent()); 255 event_.Set(); 256 } 257 } 258 259 private: 260 int& counter_; 261 TaskQueueBase* const reply_queue_; 262 Event& event_; 263 }; 264 265 ReusedTask task(&call_count, reply_queue.get(), &event); 266 post_queue->PostTask(std::move(task)); 267 EXPECT_TRUE(event.Wait(TimeDelta::Seconds(1))); 268 } 269 270 TEST_P(TaskQueueTest, PostALot) { 271 // Waits until DecrementCount called `count` times. Thread safe. 272 class BlockingCounter { 273 public: 274 explicit BlockingCounter(int initial_count) : count_(initial_count) {} 275 276 void DecrementCount() { 277 if (count_.DecRef() == RefCountReleaseStatus::kDroppedLastRef) { 278 event_.Set(); 279 } 280 } 281 bool Wait(TimeDelta give_up_after) { return event_.Wait(give_up_after); } 282 283 private: 284 webrtc_impl::RefCounter count_; 285 Event event_; 286 }; 287 288 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 289 static constexpr int kTaskCount = 0xffff; 290 Event posting_done; 291 BlockingCounter all_destroyed(kTaskCount); 292 293 int tasks_executed = 0; 294 auto task_queue = CreateTaskQueue(factory, "PostALot"); 295 296 task_queue->PostTask([&] { 297 // Post tasks from the queue to guarantee that the 1st task won't be 298 // executed before the last one is posted. 299 for (int i = 0; i < kTaskCount; ++i) { 300 absl::Cleanup cleanup = [&] { all_destroyed.DecrementCount(); }; 301 task_queue->PostTask([&tasks_executed, cleanup = std::move(cleanup)] { 302 ++tasks_executed; 303 }); 304 } 305 306 posting_done.Set(); 307 }); 308 309 // Before destroying the task queue wait until all child tasks are posted. 310 posting_done.Wait(Event::kForever); 311 // Destroy the task queue. 312 task_queue = nullptr; 313 314 // Expect all tasks are destroyed eventually. In some task queue 315 // implementations that might happen on a different thread after task queue is 316 // destroyed. 317 EXPECT_TRUE(all_destroyed.Wait(TimeDelta::Minutes(1))); 318 EXPECT_LE(tasks_executed, kTaskCount); 319 } 320 321 // Test posting two tasks that have shared state not protected by a 322 // lock. The TaskQueue should guarantee memory read-write order and 323 // FIFO task execution order, so the second task should always see the 324 // changes that were made by the first task. 325 // 326 // If the TaskQueue doesn't properly synchronize the execution of 327 // tasks, there will be a data race, which is undefined behavior. The 328 // EXPECT calls may randomly catch this, but to make the most of this 329 // unit test, run it under TSan or some other tool that is able to 330 // directly detect data races. 331 TEST_P(TaskQueueTest, PostTwoWithSharedUnprotectedState) { 332 std::unique_ptr<TaskQueueFactory> factory = GetParam()(nullptr); 333 struct SharedState { 334 // First task will set this value to 1 and second will assert it. 335 int state = 0; 336 } state; 337 338 auto queue = CreateTaskQueue(factory, "PostTwoWithSharedUnprotectedState"); 339 Event done; 340 queue->PostTask([&state, &queue, &done] { 341 // Post tasks from queue to guarantee, that 1st task won't be 342 // executed before the second one will be posted. 343 queue->PostTask([&state] { state.state = 1; }); 344 queue->PostTask([&state, &done] { 345 EXPECT_EQ(state.state, 1); 346 done.Set(); 347 }); 348 // Check, that state changing tasks didn't start yet. 349 EXPECT_EQ(state.state, 0); 350 }); 351 EXPECT_TRUE(done.Wait(TimeDelta::Seconds(1))); 352 } 353 354 // TaskQueueTest is a set of tests for any implementation of the TaskQueueBase. 355 // Tests are instantiated next to the concrete implementation(s). 356 // https://github.com/google/googletest/blob/master/googletest/docs/advanced.md#creating-value-parameterized-abstract-tests 357 GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(TaskQueueTest); 358 359 } // namespace 360 } // namespace webrtc