WorkerThread.cpp (9600B)
1 // 2 // Copyright 2016 The ANGLE Project Authors. All rights reserved. 3 // Use of this source code is governed by a BSD-style license that can be 4 // found in the LICENSE file. 5 // 6 // WorkerThread: 7 // Task running thread for ANGLE, similar to a TaskRunner in Chromium. 8 // Might be implemented differently depending on platform. 9 // 10 11 #include "libANGLE/WorkerThread.h" 12 13 #include "libANGLE/trace.h" 14 15 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 16 # include <condition_variable> 17 # include <future> 18 # include <mutex> 19 # include <queue> 20 # include <thread> 21 #endif // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 22 23 namespace angle 24 { 25 26 WaitableEvent::WaitableEvent() = default; 27 WaitableEvent::~WaitableEvent() = default; 28 29 void WaitableEventDone::wait() {} 30 31 bool WaitableEventDone::isReady() 32 { 33 return true; 34 } 35 36 WorkerThreadPool::WorkerThreadPool() = default; 37 WorkerThreadPool::~WorkerThreadPool() = default; 38 39 class SingleThreadedWaitableEvent final : public WaitableEvent 40 { 41 public: 42 SingleThreadedWaitableEvent() = default; 43 ~SingleThreadedWaitableEvent() override = default; 44 45 void wait() override; 46 bool isReady() override; 47 }; 48 49 void SingleThreadedWaitableEvent::wait() {} 50 51 bool SingleThreadedWaitableEvent::isReady() 52 { 53 return true; 54 } 55 56 class SingleThreadedWorkerPool final : public WorkerThreadPool 57 { 58 public: 59 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override; 60 void setMaxThreads(size_t maxThreads) override; 61 bool isAsync() override; 62 }; 63 64 // SingleThreadedWorkerPool implementation. 65 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask( 66 std::shared_ptr<Closure> task) 67 { 68 (*task)(); 69 return std::make_shared<SingleThreadedWaitableEvent>(); 70 } 71 72 void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {} 73 74 bool SingleThreadedWorkerPool::isAsync() 75 { 76 return false; 77 } 78 79 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 80 class AsyncWaitableEvent final : public WaitableEvent 81 { 82 public: 83 AsyncWaitableEvent() : mIsPending(true) {} 84 ~AsyncWaitableEvent() override = default; 85 86 void wait() override; 87 bool isReady() override; 88 89 private: 90 friend class AsyncWorkerPool; 91 void setFuture(std::future<void> &&future); 92 93 // To block wait() when the task is still in queue to be run. 94 // Also to protect the concurrent accesses from both main thread and 95 // background threads to the member fields. 96 std::mutex mMutex; 97 98 bool mIsPending; 99 std::condition_variable mCondition; 100 std::future<void> mFuture; 101 }; 102 103 void AsyncWaitableEvent::setFuture(std::future<void> &&future) 104 { 105 mFuture = std::move(future); 106 } 107 108 void AsyncWaitableEvent::wait() 109 { 110 ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait"); 111 { 112 std::unique_lock<std::mutex> lock(mMutex); 113 mCondition.wait(lock, [this] { return !mIsPending; }); 114 } 115 116 ASSERT(mFuture.valid()); 117 mFuture.wait(); 118 } 119 120 bool AsyncWaitableEvent::isReady() 121 { 122 std::lock_guard<std::mutex> lock(mMutex); 123 if (mIsPending) 124 { 125 return false; 126 } 127 ASSERT(mFuture.valid()); 128 return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready; 129 } 130 131 class AsyncWorkerPool final : public WorkerThreadPool 132 { 133 public: 134 AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {} 135 ~AsyncWorkerPool() override = default; 136 137 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override; 138 void setMaxThreads(size_t maxThreads) override; 139 bool isAsync() override; 140 141 private: 142 void checkToRunPendingTasks(); 143 144 // To protect the concurrent accesses from both main thread and background 145 // threads to the member fields. 146 std::mutex mMutex; 147 148 size_t mMaxThreads; 149 size_t mRunningThreads; 150 std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue; 151 }; 152 153 // AsyncWorkerPool implementation. 154 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task) 155 { 156 ASSERT(mMaxThreads > 0); 157 158 auto waitable = std::make_shared<AsyncWaitableEvent>(); 159 { 160 std::lock_guard<std::mutex> lock(mMutex); 161 mTaskQueue.push(std::make_pair(waitable, task)); 162 } 163 checkToRunPendingTasks(); 164 return std::move(waitable); 165 } 166 167 void AsyncWorkerPool::setMaxThreads(size_t maxThreads) 168 { 169 { 170 std::lock_guard<std::mutex> lock(mMutex); 171 mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads); 172 } 173 checkToRunPendingTasks(); 174 } 175 176 bool AsyncWorkerPool::isAsync() 177 { 178 return true; 179 } 180 181 void AsyncWorkerPool::checkToRunPendingTasks() 182 { 183 std::lock_guard<std::mutex> lock(mMutex); 184 while (mRunningThreads < mMaxThreads && !mTaskQueue.empty()) 185 { 186 auto task = mTaskQueue.front(); 187 mTaskQueue.pop(); 188 auto waitable = task.first; 189 auto closure = task.second; 190 191 auto future = std::async(std::launch::async, [closure, this] { 192 { 193 ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask"); 194 (*closure)(); 195 } 196 { 197 std::lock_guard<std::mutex> lock(mMutex); 198 ASSERT(mRunningThreads != 0); 199 --mRunningThreads; 200 } 201 checkToRunPendingTasks(); 202 }); 203 204 ++mRunningThreads; 205 206 { 207 std::lock_guard<std::mutex> waitableLock(waitable->mMutex); 208 waitable->mIsPending = false; 209 waitable->setFuture(std::move(future)); 210 } 211 waitable->mCondition.notify_all(); 212 } 213 } 214 #endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 215 216 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) 217 class DelegateWaitableEvent final : public WaitableEvent 218 { 219 public: 220 DelegateWaitableEvent() = default; 221 ~DelegateWaitableEvent() override = default; 222 223 void wait() override; 224 bool isReady() override; 225 226 void markAsReady(); 227 228 private: 229 // To protect the concurrent accesses from both main thread and background 230 // threads to the member fields. 231 std::mutex mMutex; 232 233 bool mIsReady = false; 234 std::condition_variable mCondition; 235 }; 236 237 void DelegateWaitableEvent::markAsReady() 238 { 239 std::lock_guard<std::mutex> lock(mMutex); 240 mIsReady = true; 241 mCondition.notify_all(); 242 } 243 244 void DelegateWaitableEvent::wait() 245 { 246 std::unique_lock<std::mutex> lock(mMutex); 247 mCondition.wait(lock, [this] { return mIsReady; }); 248 } 249 250 bool DelegateWaitableEvent::isReady() 251 { 252 std::lock_guard<std::mutex> lock(mMutex); 253 return mIsReady; 254 } 255 256 class DelegateWorkerPool final : public WorkerThreadPool 257 { 258 public: 259 DelegateWorkerPool() = default; 260 ~DelegateWorkerPool() override = default; 261 262 std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override; 263 264 void setMaxThreads(size_t maxThreads) override; 265 bool isAsync() override; 266 }; 267 268 // A function wrapper to execute the closure and to notify the waitable 269 // event after the execution. 270 class DelegateWorkerTask 271 { 272 public: 273 DelegateWorkerTask(std::shared_ptr<Closure> task, 274 std::shared_ptr<DelegateWaitableEvent> waitable) 275 : mTask(task), mWaitable(waitable) 276 {} 277 DelegateWorkerTask() = delete; 278 DelegateWorkerTask(DelegateWorkerTask &) = delete; 279 280 static void RunTask(void *userData) 281 { 282 DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData); 283 (*workerTask->mTask)(); 284 workerTask->mWaitable->markAsReady(); 285 286 // Delete the task after its execution. 287 delete workerTask; 288 } 289 290 private: 291 ~DelegateWorkerTask() = default; 292 293 std::shared_ptr<Closure> mTask; 294 std::shared_ptr<DelegateWaitableEvent> mWaitable; 295 }; 296 297 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task) 298 { 299 auto waitable = std::make_shared<DelegateWaitableEvent>(); 300 301 // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution. 302 DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable); 303 auto *platform = ANGLEPlatformCurrent(); 304 platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask); 305 306 return std::move(waitable); 307 } 308 309 void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {} 310 311 bool DelegateWorkerPool::isAsync() 312 { 313 return true; 314 } 315 #endif 316 317 // static 318 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded) 319 { 320 std::shared_ptr<WorkerThreadPool> pool(nullptr); 321 322 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) 323 const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask; 324 if (hasPostWorkerTaskImpl && multithreaded) 325 { 326 pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool()); 327 } 328 #endif 329 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED) 330 if (!pool && multithreaded) 331 { 332 pool = std::shared_ptr<WorkerThreadPool>( 333 new AsyncWorkerPool(std::thread::hardware_concurrency())); 334 } 335 #endif 336 if (!pool) 337 { 338 return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool()); 339 } 340 return pool; 341 } 342 343 // static 344 std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask( 345 std::shared_ptr<WorkerThreadPool> pool, 346 std::shared_ptr<Closure> task) 347 { 348 std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task); 349 if (event.get()) 350 { 351 event->setWorkerThreadPool(pool); 352 } 353 return event; 354 } 355 356 } // namespace angle