tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

WorkerThread.cpp (9600B)


      1 //
      2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
      3 // Use of this source code is governed by a BSD-style license that can be
      4 // found in the LICENSE file.
      5 //
      6 // WorkerThread:
      7 //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
      8 //   Might be implemented differently depending on platform.
      9 //
     10 
     11 #include "libANGLE/WorkerThread.h"
     12 
     13 #include "libANGLE/trace.h"
     14 
     15 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
     16 #    include <condition_variable>
     17 #    include <future>
     18 #    include <mutex>
     19 #    include <queue>
     20 #    include <thread>
     21 #endif  // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
     22 
     23 namespace angle
     24 {
     25 
     26 WaitableEvent::WaitableEvent()  = default;
     27 WaitableEvent::~WaitableEvent() = default;
     28 
     29 void WaitableEventDone::wait() {}
     30 
     31 bool WaitableEventDone::isReady()
     32 {
     33    return true;
     34 }
     35 
     36 WorkerThreadPool::WorkerThreadPool()  = default;
     37 WorkerThreadPool::~WorkerThreadPool() = default;
     38 
     39 class SingleThreadedWaitableEvent final : public WaitableEvent
     40 {
     41  public:
     42    SingleThreadedWaitableEvent()           = default;
     43    ~SingleThreadedWaitableEvent() override = default;
     44 
     45    void wait() override;
     46    bool isReady() override;
     47 };
     48 
     49 void SingleThreadedWaitableEvent::wait() {}
     50 
     51 bool SingleThreadedWaitableEvent::isReady()
     52 {
     53    return true;
     54 }
     55 
     56 class SingleThreadedWorkerPool final : public WorkerThreadPool
     57 {
     58  public:
     59    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
     60    void setMaxThreads(size_t maxThreads) override;
     61    bool isAsync() override;
     62 };
     63 
     64 // SingleThreadedWorkerPool implementation.
     65 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
     66    std::shared_ptr<Closure> task)
     67 {
     68    (*task)();
     69    return std::make_shared<SingleThreadedWaitableEvent>();
     70 }
     71 
     72 void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
     73 
     74 bool SingleThreadedWorkerPool::isAsync()
     75 {
     76    return false;
     77 }
     78 
     79 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
     80 class AsyncWaitableEvent final : public WaitableEvent
     81 {
     82  public:
     83    AsyncWaitableEvent() : mIsPending(true) {}
     84    ~AsyncWaitableEvent() override = default;
     85 
     86    void wait() override;
     87    bool isReady() override;
     88 
     89  private:
     90    friend class AsyncWorkerPool;
     91    void setFuture(std::future<void> &&future);
     92 
     93    // To block wait() when the task is still in queue to be run.
     94    // Also to protect the concurrent accesses from both main thread and
     95    // background threads to the member fields.
     96    std::mutex mMutex;
     97 
     98    bool mIsPending;
     99    std::condition_variable mCondition;
    100    std::future<void> mFuture;
    101 };
    102 
    103 void AsyncWaitableEvent::setFuture(std::future<void> &&future)
    104 {
    105    mFuture = std::move(future);
    106 }
    107 
    108 void AsyncWaitableEvent::wait()
    109 {
    110    ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
    111    {
    112        std::unique_lock<std::mutex> lock(mMutex);
    113        mCondition.wait(lock, [this] { return !mIsPending; });
    114    }
    115 
    116    ASSERT(mFuture.valid());
    117    mFuture.wait();
    118 }
    119 
    120 bool AsyncWaitableEvent::isReady()
    121 {
    122    std::lock_guard<std::mutex> lock(mMutex);
    123    if (mIsPending)
    124    {
    125        return false;
    126    }
    127    ASSERT(mFuture.valid());
    128    return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    129 }
    130 
    131 class AsyncWorkerPool final : public WorkerThreadPool
    132 {
    133  public:
    134    AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
    135    ~AsyncWorkerPool() override = default;
    136 
    137    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
    138    void setMaxThreads(size_t maxThreads) override;
    139    bool isAsync() override;
    140 
    141  private:
    142    void checkToRunPendingTasks();
    143 
    144    // To protect the concurrent accesses from both main thread and background
    145    // threads to the member fields.
    146    std::mutex mMutex;
    147 
    148    size_t mMaxThreads;
    149    size_t mRunningThreads;
    150    std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
    151 };
    152 
    153 // AsyncWorkerPool implementation.
    154 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
    155 {
    156    ASSERT(mMaxThreads > 0);
    157 
    158    auto waitable = std::make_shared<AsyncWaitableEvent>();
    159    {
    160        std::lock_guard<std::mutex> lock(mMutex);
    161        mTaskQueue.push(std::make_pair(waitable, task));
    162    }
    163    checkToRunPendingTasks();
    164    return std::move(waitable);
    165 }
    166 
    167 void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
    168 {
    169    {
    170        std::lock_guard<std::mutex> lock(mMutex);
    171        mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
    172    }
    173    checkToRunPendingTasks();
    174 }
    175 
    176 bool AsyncWorkerPool::isAsync()
    177 {
    178    return true;
    179 }
    180 
    181 void AsyncWorkerPool::checkToRunPendingTasks()
    182 {
    183    std::lock_guard<std::mutex> lock(mMutex);
    184    while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
    185    {
    186        auto task = mTaskQueue.front();
    187        mTaskQueue.pop();
    188        auto waitable = task.first;
    189        auto closure  = task.second;
    190 
    191        auto future = std::async(std::launch::async, [closure, this] {
    192            {
    193                ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
    194                (*closure)();
    195            }
    196            {
    197                std::lock_guard<std::mutex> lock(mMutex);
    198                ASSERT(mRunningThreads != 0);
    199                --mRunningThreads;
    200            }
    201            checkToRunPendingTasks();
    202        });
    203 
    204        ++mRunningThreads;
    205 
    206        {
    207            std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
    208            waitable->mIsPending = false;
    209            waitable->setFuture(std::move(future));
    210        }
    211        waitable->mCondition.notify_all();
    212    }
    213 }
    214 #endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    215 
    216 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
    217 class DelegateWaitableEvent final : public WaitableEvent
    218 {
    219  public:
    220    DelegateWaitableEvent()           = default;
    221    ~DelegateWaitableEvent() override = default;
    222 
    223    void wait() override;
    224    bool isReady() override;
    225 
    226    void markAsReady();
    227 
    228  private:
    229    // To protect the concurrent accesses from both main thread and background
    230    // threads to the member fields.
    231    std::mutex mMutex;
    232 
    233    bool mIsReady = false;
    234    std::condition_variable mCondition;
    235 };
    236 
    237 void DelegateWaitableEvent::markAsReady()
    238 {
    239    std::lock_guard<std::mutex> lock(mMutex);
    240    mIsReady = true;
    241    mCondition.notify_all();
    242 }
    243 
    244 void DelegateWaitableEvent::wait()
    245 {
    246    std::unique_lock<std::mutex> lock(mMutex);
    247    mCondition.wait(lock, [this] { return mIsReady; });
    248 }
    249 
    250 bool DelegateWaitableEvent::isReady()
    251 {
    252    std::lock_guard<std::mutex> lock(mMutex);
    253    return mIsReady;
    254 }
    255 
    256 class DelegateWorkerPool final : public WorkerThreadPool
    257 {
    258  public:
    259    DelegateWorkerPool()           = default;
    260    ~DelegateWorkerPool() override = default;
    261 
    262    std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
    263 
    264    void setMaxThreads(size_t maxThreads) override;
    265    bool isAsync() override;
    266 };
    267 
    268 // A function wrapper to execute the closure and to notify the waitable
    269 // event after the execution.
    270 class DelegateWorkerTask
    271 {
    272  public:
    273    DelegateWorkerTask(std::shared_ptr<Closure> task,
    274                       std::shared_ptr<DelegateWaitableEvent> waitable)
    275        : mTask(task), mWaitable(waitable)
    276    {}
    277    DelegateWorkerTask()                     = delete;
    278    DelegateWorkerTask(DelegateWorkerTask &) = delete;
    279 
    280    static void RunTask(void *userData)
    281    {
    282        DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
    283        (*workerTask->mTask)();
    284        workerTask->mWaitable->markAsReady();
    285 
    286        // Delete the task after its execution.
    287        delete workerTask;
    288    }
    289 
    290  private:
    291    ~DelegateWorkerTask() = default;
    292 
    293    std::shared_ptr<Closure> mTask;
    294    std::shared_ptr<DelegateWaitableEvent> mWaitable;
    295 };
    296 
    297 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
    298 {
    299    auto waitable = std::make_shared<DelegateWaitableEvent>();
    300 
    301    // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
    302    DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
    303    auto *platform                 = ANGLEPlatformCurrent();
    304    platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
    305 
    306    return std::move(waitable);
    307 }
    308 
    309 void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
    310 
    311 bool DelegateWorkerPool::isAsync()
    312 {
    313    return true;
    314 }
    315 #endif
    316 
    317 // static
    318 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
    319 {
    320    std::shared_ptr<WorkerThreadPool> pool(nullptr);
    321 
    322 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
    323    const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask;
    324    if (hasPostWorkerTaskImpl && multithreaded)
    325    {
    326        pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
    327    }
    328 #endif
    329 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    330    if (!pool && multithreaded)
    331    {
    332        pool = std::shared_ptr<WorkerThreadPool>(
    333            new AsyncWorkerPool(std::thread::hardware_concurrency()));
    334    }
    335 #endif
    336    if (!pool)
    337    {
    338        return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
    339    }
    340    return pool;
    341 }
    342 
    343 // static
    344 std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
    345    std::shared_ptr<WorkerThreadPool> pool,
    346    std::shared_ptr<Closure> task)
    347 {
    348    std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
    349    if (event.get())
    350    {
    351        event->setWorkerThreadPool(pool);
    352    }
    353    return event;
    354 }
    355 
    356 }  // namespace angle