tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

prtpool.c (29322B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "nspr.h"
      7 
      8 /*
      9 * Thread pools
     10 *  Thread pools create and manage threads to provide support for
     11 *  scheduling jobs onto one or more threads.
     12 *
     13 */
     14 #ifdef OPT_WINNT
     15 #  include <windows.h>
     16 #endif
     17 
     18 /*
     19 * worker thread
     20 */
     21 typedef struct wthread {
     22  PRCList links;
     23  PRThread* thread;
     24 } wthread;
     25 
     26 /*
     27 * queue of timer jobs
     28 */
     29 typedef struct timer_jobq {
     30  PRCList list;
     31  PRLock* lock;
     32  PRCondVar* cv;
     33  PRInt32 cnt;
     34  PRCList wthreads;
     35 } timer_jobq;
     36 
     37 /*
     38 * queue of jobs
     39 */
     40 typedef struct tp_jobq {
     41  PRCList list;
     42  PRInt32 cnt;
     43  PRLock* lock;
     44  PRCondVar* cv;
     45  PRCList wthreads;
     46 #ifdef OPT_WINNT
     47  HANDLE nt_completion_port;
     48 #endif
     49 } tp_jobq;
     50 
     51 /*
     52 * queue of IO jobs
     53 */
     54 typedef struct io_jobq {
     55  PRCList list;
     56  PRPollDesc* pollfds;
     57  PRInt32 npollfds;
     58  PRJob** polljobs;
     59  PRLock* lock;
     60  PRInt32 cnt;
     61  PRFileDesc* notify_fd;
     62  PRCList wthreads;
     63 } io_jobq;
     64 
     65 /*
     66 * Threadpool
     67 */
     68 struct PRThreadPool {
     69  PRInt32 init_threads;
     70  PRInt32 max_threads;
     71  PRInt32 current_threads;
     72  PRInt32 idle_threads;
     73  PRUint32 stacksize;
     74  tp_jobq jobq;
     75  io_jobq ioq;
     76  timer_jobq timerq;
     77  PRLock* join_lock; /* used with jobp->join_cv */
     78  PRCondVar* shutdown_cv;
     79  PRBool shutdown;
     80 };
     81 
     82 typedef enum io_op_type {
     83  JOB_IO_READ,
     84  JOB_IO_WRITE,
     85  JOB_IO_CONNECT,
     86  JOB_IO_ACCEPT
     87 } io_op_type;
     88 
     89 #ifdef OPT_WINNT
     90 typedef struct NT_notifier {
     91  OVERLAPPED overlapped; /* must be first */
     92  PRJob* jobp;
     93 } NT_notifier;
     94 #endif
     95 
     96 struct PRJob {
     97  PRCList links;    /*  for linking jobs */
     98  PRBool on_ioq;    /* job on ioq */
     99  PRBool on_timerq; /* job on timerq */
    100  PRJobFn job_func;
    101  void* job_arg;
    102  PRCondVar* join_cv;
    103  PRBool join_wait;     /* == PR_TRUE, when waiting to join */
    104  PRCondVar* cancel_cv; /* for cancelling IO jobs */
    105  PRBool cancel_io;     /* for cancelling IO jobs */
    106  PRThreadPool* tpool;  /* back pointer to thread pool */
    107  PRJobIoDesc* iod;
    108  io_op_type io_op;
    109  PRInt16 io_poll_flags;
    110  PRNetAddr* netaddr;
    111  PRIntervalTime timeout; /* relative value */
    112  PRIntervalTime absolute;
    113 #ifdef OPT_WINNT
    114  NT_notifier nt_notifier;
    115 #endif
    116 };
    117 
    118 #define JOB_LINKS_PTR(_qp) ((PRJob*)((char*)(_qp) - offsetof(PRJob, links)))
    119 
    120 #define WTHREAD_LINKS_PTR(_qp) \
    121  ((wthread*)((char*)(_qp) - offsetof(wthread, links)))
    122 
    123 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
    124 
    125 #define JOIN_NOTIFY(_jobp)            \
    126  PR_BEGIN_MACRO                      \
    127  PR_Lock(_jobp->tpool->join_lock);   \
    128  _jobp->join_wait = PR_FALSE;        \
    129  PR_NotifyCondVar(_jobp->join_cv);   \
    130  PR_Unlock(_jobp->tpool->join_lock); \
    131  PR_END_MACRO
    132 
    133 #define CANCEL_IO_JOB(jobp)              \
    134  PR_BEGIN_MACRO                         \
    135  jobp->cancel_io = PR_FALSE;            \
    136  jobp->on_ioq = PR_FALSE;               \
    137  PR_REMOVE_AND_INIT_LINK(&jobp->links); \
    138  tp->ioq.cnt--;                         \
    139  PR_NotifyCondVar(jobp->cancel_cv);     \
    140  PR_END_MACRO
    141 
    142 static void delete_job(PRJob* jobp);
    143 static PRThreadPool* alloc_threadpool(void);
    144 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp);
    145 static void notify_ioq(PRThreadPool* tp);
    146 static void notify_timerq(PRThreadPool* tp);
    147 
    148 /*
    149 * locks are acquired in the following order
    150 *
    151 *  tp->ioq.lock,tp->timerq.lock
    152 *          |
    153 *          V
    154 *      tp->jobq->lock
    155 */
    156 
    157 /*
    158 * worker thread function
    159 */
    160 static void wstart(void* arg) {
    161  PRThreadPool* tp = (PRThreadPool*)arg;
    162  PRCList* head;
    163 
    164  /*
    165   * execute jobs until shutdown
    166   */
    167  while (!tp->shutdown) {
    168    PRJob* jobp;
    169 #ifdef OPT_WINNT
    170    BOOL rv;
    171    DWORD unused, shutdown;
    172    LPOVERLAPPED olp;
    173 
    174    PR_Lock(tp->jobq.lock);
    175    tp->idle_threads++;
    176    PR_Unlock(tp->jobq.lock);
    177    rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused,
    178                                   &shutdown, &olp, INFINITE);
    179 
    180    PR_ASSERT(rv);
    181    if (shutdown) {
    182      break;
    183    }
    184    jobp = ((NT_notifier*)olp)->jobp;
    185    PR_Lock(tp->jobq.lock);
    186    tp->idle_threads--;
    187    tp->jobq.cnt--;
    188    PR_Unlock(tp->jobq.lock);
    189 #else
    190 
    191    PR_Lock(tp->jobq.lock);
    192    while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
    193      tp->idle_threads++;
    194      PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
    195      tp->idle_threads--;
    196    }
    197    if (tp->shutdown) {
    198      PR_Unlock(tp->jobq.lock);
    199      break;
    200    }
    201    head = PR_LIST_HEAD(&tp->jobq.list);
    202    /*
    203     * remove job from queue
    204     */
    205    PR_REMOVE_AND_INIT_LINK(head);
    206    tp->jobq.cnt--;
    207    jobp = JOB_LINKS_PTR(head);
    208    PR_Unlock(tp->jobq.lock);
    209 #endif
    210 
    211    jobp->job_func(jobp->job_arg);
    212    if (!JOINABLE_JOB(jobp)) {
    213      delete_job(jobp);
    214    } else {
    215      JOIN_NOTIFY(jobp);
    216    }
    217  }
    218  PR_Lock(tp->jobq.lock);
    219  tp->current_threads--;
    220  PR_Unlock(tp->jobq.lock);
    221 }
    222 
    223 /*
    224 * add a job to the work queue
    225 */
    226 static void add_to_jobq(PRThreadPool* tp, PRJob* jobp) {
    227  /*
    228   * add to jobq
    229   */
    230 #ifdef OPT_WINNT
    231  PR_Lock(tp->jobq.lock);
    232  tp->jobq.cnt++;
    233  PR_Unlock(tp->jobq.lock);
    234  /*
    235   * notify worker thread(s)
    236   */
    237  PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE,
    238                             &jobp->nt_notifier.overlapped);
    239 #else
    240  PR_Lock(tp->jobq.lock);
    241  PR_APPEND_LINK(&jobp->links, &tp->jobq.list);
    242  tp->jobq.cnt++;
    243  if ((tp->idle_threads < tp->jobq.cnt) &&
    244      (tp->current_threads < tp->max_threads)) {
    245    wthread* wthrp;
    246    /*
    247     * increment thread count and unlock the jobq lock
    248     */
    249    tp->current_threads++;
    250    PR_Unlock(tp->jobq.lock);
    251    /* create new worker thread */
    252    wthrp = PR_NEWZAP(wthread);
    253    if (wthrp) {
    254      wthrp->thread =
    255          PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
    256                          PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, tp->stacksize);
    257      if (NULL == wthrp->thread) {
    258        PR_DELETE(wthrp); /* this sets wthrp to NULL */
    259      }
    260    }
    261    PR_Lock(tp->jobq.lock);
    262    if (NULL == wthrp) {
    263      tp->current_threads--;
    264    } else {
    265      PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
    266    }
    267  }
    268  /*
    269   * wakeup a worker thread
    270   */
    271  PR_NotifyCondVar(tp->jobq.cv);
    272  PR_Unlock(tp->jobq.lock);
    273 #endif
    274 }
    275 
    276 /*
    277 * io worker thread function
    278 */
    279 static void io_wstart(void* arg) {
    280  PRThreadPool* tp = (PRThreadPool*)arg;
    281  int pollfd_cnt, pollfds_used;
    282  int rv;
    283  PRCList *qp, *nextqp;
    284  PRPollDesc* pollfds = NULL;
    285  PRJob** polljobs = NULL;
    286  int poll_timeout;
    287  PRIntervalTime now;
    288 
    289  /*
    290   * scan io_jobq
    291   * construct poll list
    292   * call PR_Poll
    293   * for all fds, for which poll returns true, move the job to
    294   * jobq and wakeup worker thread.
    295   */
    296  while (!tp->shutdown) {
    297    PRJob* jobp;
    298 
    299    pollfd_cnt = tp->ioq.cnt + 10;
    300    if (pollfd_cnt > tp->ioq.npollfds) {
    301      /*
    302       * re-allocate pollfd array if the current one is not large
    303       * enough
    304       */
    305      if (NULL != tp->ioq.pollfds) {
    306        PR_Free(tp->ioq.pollfds);
    307      }
    308      tp->ioq.pollfds = (PRPollDesc*)PR_Malloc(
    309          pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob*)));
    310      PR_ASSERT(NULL != tp->ioq.pollfds);
    311      /*
    312       * array of pollfds
    313       */
    314      pollfds = tp->ioq.pollfds;
    315      tp->ioq.polljobs = (PRJob**)(&tp->ioq.pollfds[pollfd_cnt]);
    316      /*
    317       * parallel array of jobs
    318       */
    319      polljobs = tp->ioq.polljobs;
    320      tp->ioq.npollfds = pollfd_cnt;
    321    }
    322 
    323    pollfds_used = 0;
    324    /*
    325     * add the notify fd; used for unblocking io thread(s)
    326     */
    327    pollfds[pollfds_used].fd = tp->ioq.notify_fd;
    328    pollfds[pollfds_used].in_flags = PR_POLL_READ;
    329    pollfds[pollfds_used].out_flags = 0;
    330    polljobs[pollfds_used] = NULL;
    331    pollfds_used++;
    332    /*
    333     * fill in the pollfd array
    334     */
    335    PR_Lock(tp->ioq.lock);
    336    for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
    337      nextqp = qp->next;
    338      jobp = JOB_LINKS_PTR(qp);
    339      if (jobp->cancel_io) {
    340        CANCEL_IO_JOB(jobp);
    341        continue;
    342      }
    343      if (pollfds_used == (pollfd_cnt)) {
    344        break;
    345      }
    346      pollfds[pollfds_used].fd = jobp->iod->socket;
    347      pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
    348      pollfds[pollfds_used].out_flags = 0;
    349      polljobs[pollfds_used] = jobp;
    350 
    351      pollfds_used++;
    352    }
    353    if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
    354      qp = tp->ioq.list.next;
    355      jobp = JOB_LINKS_PTR(qp);
    356      if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
    357        poll_timeout = PR_INTERVAL_NO_TIMEOUT;
    358      } else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
    359        poll_timeout = PR_INTERVAL_NO_WAIT;
    360      } else {
    361        poll_timeout = jobp->absolute - PR_IntervalNow();
    362        if (poll_timeout <= 0) { /* already timed out */
    363          poll_timeout = PR_INTERVAL_NO_WAIT;
    364        }
    365      }
    366    } else {
    367      poll_timeout = PR_INTERVAL_NO_TIMEOUT;
    368    }
    369    PR_Unlock(tp->ioq.lock);
    370 
    371    /*
    372     * XXXX
    373     * should retry if more jobs have been added to the queue?
    374     *
    375     */
    376    PR_ASSERT(pollfds_used <= pollfd_cnt);
    377    rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
    378 
    379    if (tp->shutdown) {
    380      break;
    381    }
    382 
    383    if (rv > 0) {
    384      /*
    385       * at least one io event is set
    386       */
    387      PRStatus rval_status;
    388      PRInt32 index;
    389 
    390      PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
    391      /*
    392       * reset the pollable event, if notified
    393       */
    394      if (pollfds[0].out_flags & PR_POLL_READ) {
    395        rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
    396        PR_ASSERT(PR_SUCCESS == rval_status);
    397      }
    398 
    399      for (index = 1; index < (pollfds_used); index++) {
    400        PRInt16 events = pollfds[index].in_flags;
    401        PRInt16 revents = pollfds[index].out_flags;
    402        jobp = polljobs[index];
    403 
    404        if ((revents & PR_POLL_NVAL) || /* busted in all cases */
    405            (revents & PR_POLL_ERR) ||
    406            ((events & PR_POLL_WRITE) &&
    407             (revents & PR_POLL_HUP))) { /* write op & hup */
    408          PR_Lock(tp->ioq.lock);
    409          if (jobp->cancel_io) {
    410            CANCEL_IO_JOB(jobp);
    411            PR_Unlock(tp->ioq.lock);
    412            continue;
    413          }
    414          PR_REMOVE_AND_INIT_LINK(&jobp->links);
    415          tp->ioq.cnt--;
    416          jobp->on_ioq = PR_FALSE;
    417          PR_Unlock(tp->ioq.lock);
    418 
    419          /* set error */
    420          if (PR_POLL_NVAL & revents) {
    421            jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
    422          } else if (PR_POLL_HUP & revents) {
    423            jobp->iod->error = PR_CONNECT_RESET_ERROR;
    424          } else {
    425            jobp->iod->error = PR_IO_ERROR;
    426          }
    427 
    428          /*
    429           * add to jobq
    430           */
    431          add_to_jobq(tp, jobp);
    432        } else if (revents) {
    433          /*
    434           * add to jobq
    435           */
    436          PR_Lock(tp->ioq.lock);
    437          if (jobp->cancel_io) {
    438            CANCEL_IO_JOB(jobp);
    439            PR_Unlock(tp->ioq.lock);
    440            continue;
    441          }
    442          PR_REMOVE_AND_INIT_LINK(&jobp->links);
    443          tp->ioq.cnt--;
    444          jobp->on_ioq = PR_FALSE;
    445          PR_Unlock(tp->ioq.lock);
    446 
    447          if (jobp->io_op == JOB_IO_CONNECT) {
    448            if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
    449              jobp->iod->error = 0;
    450            } else {
    451              jobp->iod->error = PR_GetError();
    452            }
    453          } else {
    454            jobp->iod->error = 0;
    455          }
    456 
    457          add_to_jobq(tp, jobp);
    458        }
    459      }
    460    }
    461    /*
    462     * timeout processing
    463     */
    464    now = PR_IntervalNow();
    465    PR_Lock(tp->ioq.lock);
    466    for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
    467      nextqp = qp->next;
    468      jobp = JOB_LINKS_PTR(qp);
    469      if (jobp->cancel_io) {
    470        CANCEL_IO_JOB(jobp);
    471        continue;
    472      }
    473      if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
    474        break;
    475      }
    476      if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
    477          ((PRInt32)(jobp->absolute - now) > 0)) {
    478        break;
    479      }
    480      PR_REMOVE_AND_INIT_LINK(&jobp->links);
    481      tp->ioq.cnt--;
    482      jobp->on_ioq = PR_FALSE;
    483      jobp->iod->error = PR_IO_TIMEOUT_ERROR;
    484      add_to_jobq(tp, jobp);
    485    }
    486    PR_Unlock(tp->ioq.lock);
    487  }
    488 }
    489 
    490 /*
    491 * timer worker thread function
    492 */
    493 static void timer_wstart(void* arg) {
    494  PRThreadPool* tp = (PRThreadPool*)arg;
    495  PRCList* qp;
    496  PRIntervalTime timeout;
    497  PRIntervalTime now;
    498 
    499  /*
    500   * call PR_WaitCondVar with minimum value of all timeouts
    501   */
    502  while (!tp->shutdown) {
    503    PRJob* jobp;
    504 
    505    PR_Lock(tp->timerq.lock);
    506    if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
    507      timeout = PR_INTERVAL_NO_TIMEOUT;
    508    } else {
    509      PRCList* qp;
    510 
    511      qp = tp->timerq.list.next;
    512      jobp = JOB_LINKS_PTR(qp);
    513 
    514      timeout = jobp->absolute - PR_IntervalNow();
    515      if (timeout <= 0) {
    516        timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
    517      }
    518    }
    519    if (PR_INTERVAL_NO_WAIT != timeout) {
    520      PR_WaitCondVar(tp->timerq.cv, timeout);
    521    }
    522    if (tp->shutdown) {
    523      PR_Unlock(tp->timerq.lock);
    524      break;
    525    }
    526    /*
    527     * move expired-timer jobs to jobq
    528     */
    529    now = PR_IntervalNow();
    530    while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
    531      qp = tp->timerq.list.next;
    532      jobp = JOB_LINKS_PTR(qp);
    533 
    534      if ((PRInt32)(jobp->absolute - now) > 0) {
    535        break;
    536      }
    537      /*
    538       * job timed out
    539       */
    540      PR_REMOVE_AND_INIT_LINK(&jobp->links);
    541      tp->timerq.cnt--;
    542      jobp->on_timerq = PR_FALSE;
    543      add_to_jobq(tp, jobp);
    544    }
    545    PR_Unlock(tp->timerq.lock);
    546  }
    547 }
    548 
    549 static void delete_threadpool(PRThreadPool* tp) {
    550  if (NULL != tp) {
    551    if (NULL != tp->shutdown_cv) {
    552      PR_DestroyCondVar(tp->shutdown_cv);
    553    }
    554    if (NULL != tp->jobq.cv) {
    555      PR_DestroyCondVar(tp->jobq.cv);
    556    }
    557    if (NULL != tp->jobq.lock) {
    558      PR_DestroyLock(tp->jobq.lock);
    559    }
    560    if (NULL != tp->join_lock) {
    561      PR_DestroyLock(tp->join_lock);
    562    }
    563 #ifdef OPT_WINNT
    564    if (NULL != tp->jobq.nt_completion_port) {
    565      CloseHandle(tp->jobq.nt_completion_port);
    566    }
    567 #endif
    568    /* Timer queue */
    569    if (NULL != tp->timerq.cv) {
    570      PR_DestroyCondVar(tp->timerq.cv);
    571    }
    572    if (NULL != tp->timerq.lock) {
    573      PR_DestroyLock(tp->timerq.lock);
    574    }
    575 
    576    if (NULL != tp->ioq.lock) {
    577      PR_DestroyLock(tp->ioq.lock);
    578    }
    579    if (NULL != tp->ioq.pollfds) {
    580      PR_Free(tp->ioq.pollfds);
    581    }
    582    if (NULL != tp->ioq.notify_fd) {
    583      PR_DestroyPollableEvent(tp->ioq.notify_fd);
    584    }
    585    PR_Free(tp);
    586  }
    587  return;
    588 }
    589 
    590 static PRThreadPool* alloc_threadpool(void) {
    591  PRThreadPool* tp;
    592 
    593  tp = (PRThreadPool*)PR_CALLOC(sizeof(*tp));
    594  if (NULL == tp) {
    595    goto failed;
    596  }
    597  tp->jobq.lock = PR_NewLock();
    598  if (NULL == tp->jobq.lock) {
    599    goto failed;
    600  }
    601  tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
    602  if (NULL == tp->jobq.cv) {
    603    goto failed;
    604  }
    605  tp->join_lock = PR_NewLock();
    606  if (NULL == tp->join_lock) {
    607    goto failed;
    608  }
    609 #ifdef OPT_WINNT
    610  tp->jobq.nt_completion_port =
    611      CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    612  if (NULL == tp->jobq.nt_completion_port) {
    613    goto failed;
    614  }
    615 #endif
    616 
    617  tp->ioq.lock = PR_NewLock();
    618  if (NULL == tp->ioq.lock) {
    619    goto failed;
    620  }
    621 
    622  /* Timer queue */
    623 
    624  tp->timerq.lock = PR_NewLock();
    625  if (NULL == tp->timerq.lock) {
    626    goto failed;
    627  }
    628  tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
    629  if (NULL == tp->timerq.cv) {
    630    goto failed;
    631  }
    632 
    633  tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
    634  if (NULL == tp->shutdown_cv) {
    635    goto failed;
    636  }
    637  tp->ioq.notify_fd = PR_NewPollableEvent();
    638  if (NULL == tp->ioq.notify_fd) {
    639    goto failed;
    640  }
    641  return tp;
    642 failed:
    643  delete_threadpool(tp);
    644  PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    645  return NULL;
    646 }
    647 
    648 /* Create thread pool */
    649 PR_IMPLEMENT(PRThreadPool*)
    650 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
    651                    PRUint32 stacksize) {
    652  PRThreadPool* tp;
    653  PRThread* thr;
    654  int i;
    655  wthread* wthrp;
    656 
    657  tp = alloc_threadpool();
    658  if (NULL == tp) {
    659    return NULL;
    660  }
    661 
    662  tp->init_threads = initial_threads;
    663  tp->max_threads = max_threads;
    664  tp->stacksize = stacksize;
    665  PR_INIT_CLIST(&tp->jobq.list);
    666  PR_INIT_CLIST(&tp->ioq.list);
    667  PR_INIT_CLIST(&tp->timerq.list);
    668  PR_INIT_CLIST(&tp->jobq.wthreads);
    669  PR_INIT_CLIST(&tp->ioq.wthreads);
    670  PR_INIT_CLIST(&tp->timerq.wthreads);
    671  tp->shutdown = PR_FALSE;
    672 
    673  PR_Lock(tp->jobq.lock);
    674  for (i = 0; i < initial_threads; ++i) {
    675    thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
    676                          PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
    677    PR_ASSERT(thr);
    678    wthrp = PR_NEWZAP(wthread);
    679    PR_ASSERT(wthrp);
    680    wthrp->thread = thr;
    681    PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
    682  }
    683  tp->current_threads = initial_threads;
    684 
    685  thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL,
    686                        PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
    687  PR_ASSERT(thr);
    688  wthrp = PR_NEWZAP(wthread);
    689  PR_ASSERT(wthrp);
    690  wthrp->thread = thr;
    691  PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
    692 
    693  thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL,
    694                        PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
    695  PR_ASSERT(thr);
    696  wthrp = PR_NEWZAP(wthread);
    697  PR_ASSERT(wthrp);
    698  wthrp->thread = thr;
    699  PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
    700 
    701  PR_Unlock(tp->jobq.lock);
    702  return tp;
    703 }
    704 
    705 static void delete_job(PRJob* jobp) {
    706  if (NULL != jobp) {
    707    if (NULL != jobp->join_cv) {
    708      PR_DestroyCondVar(jobp->join_cv);
    709      jobp->join_cv = NULL;
    710    }
    711    if (NULL != jobp->cancel_cv) {
    712      PR_DestroyCondVar(jobp->cancel_cv);
    713      jobp->cancel_cv = NULL;
    714    }
    715    PR_DELETE(jobp);
    716  }
    717 }
    718 
    719 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp) {
    720  PRJob* jobp;
    721 
    722  jobp = PR_NEWZAP(PRJob);
    723  if (NULL == jobp) {
    724    goto failed;
    725  }
    726  if (joinable) {
    727    jobp->join_cv = PR_NewCondVar(tp->join_lock);
    728    jobp->join_wait = PR_TRUE;
    729    if (NULL == jobp->join_cv) {
    730      goto failed;
    731    }
    732  } else {
    733    jobp->join_cv = NULL;
    734  }
    735 #ifdef OPT_WINNT
    736  jobp->nt_notifier.jobp = jobp;
    737 #endif
    738  return jobp;
    739 failed:
    740  delete_job(jobp);
    741  PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    742  return NULL;
    743 }
    744 
    745 /* queue a job */
    746 PR_IMPLEMENT(PRJob*)
    747 PR_QueueJob(PRThreadPool* tpool, PRJobFn fn, void* arg, PRBool joinable) {
    748  PRJob* jobp;
    749 
    750  jobp = alloc_job(joinable, tpool);
    751  if (NULL == jobp) {
    752    return NULL;
    753  }
    754 
    755  jobp->job_func = fn;
    756  jobp->job_arg = arg;
    757  jobp->tpool = tpool;
    758 
    759  add_to_jobq(tpool, jobp);
    760  return jobp;
    761 }
    762 
    763 /* queue a job, when a socket is readable or writeable */
    764 static PRJob* queue_io_job(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn,
    765                           void* arg, PRBool joinable, io_op_type op) {
    766  PRJob* jobp;
    767  PRIntervalTime now;
    768 
    769  jobp = alloc_job(joinable, tpool);
    770  if (NULL == jobp) {
    771    return NULL;
    772  }
    773 
    774  /*
    775   * Add a new job to io_jobq
    776   * wakeup io worker thread
    777   */
    778 
    779  jobp->job_func = fn;
    780  jobp->job_arg = arg;
    781  jobp->tpool = tpool;
    782  jobp->iod = iod;
    783  if (JOB_IO_READ == op) {
    784    jobp->io_op = JOB_IO_READ;
    785    jobp->io_poll_flags = PR_POLL_READ;
    786  } else if (JOB_IO_WRITE == op) {
    787    jobp->io_op = JOB_IO_WRITE;
    788    jobp->io_poll_flags = PR_POLL_WRITE;
    789  } else if (JOB_IO_ACCEPT == op) {
    790    jobp->io_op = JOB_IO_ACCEPT;
    791    jobp->io_poll_flags = PR_POLL_READ;
    792  } else if (JOB_IO_CONNECT == op) {
    793    jobp->io_op = JOB_IO_CONNECT;
    794    jobp->io_poll_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
    795  } else {
    796    delete_job(jobp);
    797    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    798    return NULL;
    799  }
    800 
    801  jobp->timeout = iod->timeout;
    802  if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
    803      (PR_INTERVAL_NO_WAIT == iod->timeout)) {
    804    jobp->absolute = iod->timeout;
    805  } else {
    806    now = PR_IntervalNow();
    807    jobp->absolute = now + iod->timeout;
    808  }
    809 
    810  PR_Lock(tpool->ioq.lock);
    811 
    812  if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
    813      (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
    814    PR_APPEND_LINK(&jobp->links, &tpool->ioq.list);
    815  } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
    816    PR_INSERT_LINK(&jobp->links, &tpool->ioq.list);
    817  } else {
    818    PRCList* qp;
    819    PRJob* tmp_jobp;
    820    /*
    821     * insert into the timeout-sorted ioq
    822     */
    823    for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) {
    824      tmp_jobp = JOB_LINKS_PTR(qp);
    825      if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
    826        break;
    827      }
    828    }
    829    PR_INSERT_AFTER(&jobp->links, qp);
    830  }
    831 
    832  jobp->on_ioq = PR_TRUE;
    833  tpool->ioq.cnt++;
    834  /*
    835   * notify io worker thread(s)
    836   */
    837  PR_Unlock(tpool->ioq.lock);
    838  notify_ioq(tpool);
    839  return jobp;
    840 }
    841 
    842 /* queue a job, when a socket is readable */
    843 PR_IMPLEMENT(PRJob*)
    844 PR_QueueJob_Read(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
    845                 PRBool joinable) {
    846  return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
    847 }
    848 
    849 /* queue a job, when a socket is writeable */
    850 PR_IMPLEMENT(PRJob*)
    851 PR_QueueJob_Write(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
    852                  PRBool joinable) {
    853  return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
    854 }
    855 
    856 /* queue a job, when a socket has a pending connection */
    857 PR_IMPLEMENT(PRJob*)
    858 PR_QueueJob_Accept(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
    859                   PRBool joinable) {
    860  return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
    861 }
    862 
    863 /* queue a job, when a socket can be connected */
    864 PR_IMPLEMENT(PRJob*)
    865 PR_QueueJob_Connect(PRThreadPool* tpool, PRJobIoDesc* iod,
    866                    const PRNetAddr* addr, PRJobFn fn, void* arg,
    867                    PRBool joinable) {
    868  PRStatus rv;
    869  PRErrorCode err;
    870 
    871  rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
    872  if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
    873    /* connection pending */
    874    return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
    875  }
    876  /*
    877   * connection succeeded or failed; add to jobq right away
    878   */
    879  if (rv == PR_FAILURE) {
    880    iod->error = err;
    881  } else {
    882    iod->error = 0;
    883  }
    884  return (PR_QueueJob(tpool, fn, arg, joinable));
    885 }
    886 
    887 /* queue a job, when a timer expires */
    888 PR_IMPLEMENT(PRJob*)
    889 PR_QueueJob_Timer(PRThreadPool* tpool, PRIntervalTime timeout, PRJobFn fn,
    890                  void* arg, PRBool joinable) {
    891  PRIntervalTime now;
    892  PRJob* jobp;
    893 
    894  if (PR_INTERVAL_NO_TIMEOUT == timeout) {
    895    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    896    return NULL;
    897  }
    898  if (PR_INTERVAL_NO_WAIT == timeout) {
    899    /*
    900     * no waiting; add to jobq right away
    901     */
    902    return (PR_QueueJob(tpool, fn, arg, joinable));
    903  }
    904  jobp = alloc_job(joinable, tpool);
    905  if (NULL == jobp) {
    906    return NULL;
    907  }
    908 
    909  /*
    910   * Add a new job to timer_jobq
    911   * wakeup timer worker thread
    912   */
    913 
    914  jobp->job_func = fn;
    915  jobp->job_arg = arg;
    916  jobp->tpool = tpool;
    917  jobp->timeout = timeout;
    918 
    919  now = PR_IntervalNow();
    920  jobp->absolute = now + timeout;
    921 
    922  PR_Lock(tpool->timerq.lock);
    923  jobp->on_timerq = PR_TRUE;
    924  if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
    925    PR_APPEND_LINK(&jobp->links, &tpool->timerq.list);
    926  } else {
    927    PRCList* qp;
    928    PRJob* tmp_jobp;
    929    /*
    930     * insert into the sorted timer jobq
    931     */
    932    for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
    933         qp = qp->prev) {
    934      tmp_jobp = JOB_LINKS_PTR(qp);
    935      if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
    936        break;
    937      }
    938    }
    939    PR_INSERT_AFTER(&jobp->links, qp);
    940  }
    941  tpool->timerq.cnt++;
    942  /*
    943   * notify timer worker thread(s)
    944   */
    945  notify_timerq(tpool);
    946  PR_Unlock(tpool->timerq.lock);
    947  return jobp;
    948 }
    949 
    950 static void notify_timerq(PRThreadPool* tp) {
    951  /*
    952   * wakeup the timer thread(s)
    953   */
    954  PR_NotifyCondVar(tp->timerq.cv);
    955 }
    956 
    957 static void notify_ioq(PRThreadPool* tp) {
    958  PRStatus rval_status;
    959 
    960  /*
    961   * wakeup the io thread(s)
    962   */
    963  rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
    964  PR_ASSERT(PR_SUCCESS == rval_status);
    965 }
    966 
    967 /*
    968 * cancel a job
    969 *
    970 *  XXXX: is this needed? likely to be removed
    971 */
    972 PR_IMPLEMENT(PRStatus)
    973 PR_CancelJob(PRJob* jobp) {
    974  PRStatus rval = PR_FAILURE;
    975  PRThreadPool* tp;
    976 
    977  if (jobp->on_timerq) {
    978    /*
    979     * now, check again while holding the timerq lock
    980     */
    981    tp = jobp->tpool;
    982    PR_Lock(tp->timerq.lock);
    983    if (jobp->on_timerq) {
    984      jobp->on_timerq = PR_FALSE;
    985      PR_REMOVE_AND_INIT_LINK(&jobp->links);
    986      tp->timerq.cnt--;
    987      PR_Unlock(tp->timerq.lock);
    988      if (!JOINABLE_JOB(jobp)) {
    989        delete_job(jobp);
    990      } else {
    991        JOIN_NOTIFY(jobp);
    992      }
    993      rval = PR_SUCCESS;
    994    } else {
    995      PR_Unlock(tp->timerq.lock);
    996    }
    997  } else if (jobp->on_ioq) {
    998    /*
    999     * now, check again while holding the ioq lock
   1000     */
   1001    tp = jobp->tpool;
   1002    PR_Lock(tp->ioq.lock);
   1003    if (jobp->on_ioq) {
   1004      jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
   1005      if (NULL == jobp->cancel_cv) {
   1006        PR_Unlock(tp->ioq.lock);
   1007        PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
   1008        return PR_FAILURE;
   1009      }
   1010      /*
   1011       * mark job 'cancelled' and notify io thread(s)
   1012       * XXXX:
   1013       *      this assumes there is only one io thread; when there
   1014       *      are multiple threads, the io thread processing this job
   1015       *      must be notified.
   1016       */
   1017      jobp->cancel_io = PR_TRUE;
   1018      PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
   1019      notify_ioq(tp);
   1020      PR_Lock(tp->ioq.lock);
   1021      while (jobp->cancel_io) {
   1022        PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
   1023      }
   1024      PR_Unlock(tp->ioq.lock);
   1025      PR_ASSERT(!jobp->on_ioq);
   1026      if (!JOINABLE_JOB(jobp)) {
   1027        delete_job(jobp);
   1028      } else {
   1029        JOIN_NOTIFY(jobp);
   1030      }
   1031      rval = PR_SUCCESS;
   1032    } else {
   1033      PR_Unlock(tp->ioq.lock);
   1034    }
   1035  }
   1036  if (PR_FAILURE == rval) {
   1037    PR_SetError(PR_INVALID_STATE_ERROR, 0);
   1038  }
   1039  return rval;
   1040 }
   1041 
   1042 /* join a job, wait until completion */
   1043 PR_IMPLEMENT(PRStatus)
   1044 PR_JoinJob(PRJob* jobp) {
   1045  if (!JOINABLE_JOB(jobp)) {
   1046    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   1047    return PR_FAILURE;
   1048  }
   1049  PR_Lock(jobp->tpool->join_lock);
   1050  while (jobp->join_wait) {
   1051    PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
   1052  }
   1053  PR_Unlock(jobp->tpool->join_lock);
   1054  delete_job(jobp);
   1055  return PR_SUCCESS;
   1056 }
   1057 
   1058 /* shutdown threadpool */
   1059 PR_IMPLEMENT(PRStatus)
   1060 PR_ShutdownThreadPool(PRThreadPool* tpool) {
   1061  PRStatus rval = PR_SUCCESS;
   1062 
   1063  PR_Lock(tpool->jobq.lock);
   1064  tpool->shutdown = PR_TRUE;
   1065  PR_NotifyAllCondVar(tpool->shutdown_cv);
   1066  PR_Unlock(tpool->jobq.lock);
   1067 
   1068  return rval;
   1069 }
   1070 
   1071 /*
   1072 * join thread pool
   1073 *  wait for termination of worker threads
   1074 *  reclaim threadpool resources
   1075 */
   1076 PR_IMPLEMENT(PRStatus)
   1077 PR_JoinThreadPool(PRThreadPool* tpool) {
   1078  PRStatus rval = PR_SUCCESS;
   1079  PRCList* head;
   1080  PRStatus rval_status;
   1081 
   1082  PR_Lock(tpool->jobq.lock);
   1083  while (!tpool->shutdown) {
   1084    PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
   1085  }
   1086 
   1087  /*
   1088   * wakeup worker threads
   1089   */
   1090 #ifdef OPT_WINNT
   1091  /*
   1092   * post shutdown notification for all threads
   1093   */
   1094  {
   1095    int i;
   1096    for (i = 0; i < tpool->current_threads; i++) {
   1097      PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL);
   1098    }
   1099  }
   1100 #else
   1101  PR_NotifyAllCondVar(tpool->jobq.cv);
   1102 #endif
   1103 
   1104  /*
   1105   * wakeup io thread(s)
   1106   */
   1107  notify_ioq(tpool);
   1108 
   1109  /*
   1110   * wakeup timer thread(s)
   1111   */
   1112  PR_Lock(tpool->timerq.lock);
   1113  notify_timerq(tpool);
   1114  PR_Unlock(tpool->timerq.lock);
   1115 
   1116  while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
   1117    wthread* wthrp;
   1118 
   1119    head = PR_LIST_HEAD(&tpool->jobq.wthreads);
   1120    PR_REMOVE_AND_INIT_LINK(head);
   1121    PR_Unlock(tpool->jobq.lock);
   1122    wthrp = WTHREAD_LINKS_PTR(head);
   1123    rval_status = PR_JoinThread(wthrp->thread);
   1124    PR_ASSERT(PR_SUCCESS == rval_status);
   1125    PR_DELETE(wthrp);
   1126    PR_Lock(tpool->jobq.lock);
   1127  }
   1128  PR_Unlock(tpool->jobq.lock);
   1129  while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
   1130    wthread* wthrp;
   1131 
   1132    head = PR_LIST_HEAD(&tpool->ioq.wthreads);
   1133    PR_REMOVE_AND_INIT_LINK(head);
   1134    wthrp = WTHREAD_LINKS_PTR(head);
   1135    rval_status = PR_JoinThread(wthrp->thread);
   1136    PR_ASSERT(PR_SUCCESS == rval_status);
   1137    PR_DELETE(wthrp);
   1138  }
   1139 
   1140  while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
   1141    wthread* wthrp;
   1142 
   1143    head = PR_LIST_HEAD(&tpool->timerq.wthreads);
   1144    PR_REMOVE_AND_INIT_LINK(head);
   1145    wthrp = WTHREAD_LINKS_PTR(head);
   1146    rval_status = PR_JoinThread(wthrp->thread);
   1147    PR_ASSERT(PR_SUCCESS == rval_status);
   1148    PR_DELETE(wthrp);
   1149  }
   1150 
   1151  /*
   1152   * Delete queued jobs
   1153   */
   1154  while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
   1155    PRJob* jobp;
   1156 
   1157    head = PR_LIST_HEAD(&tpool->jobq.list);
   1158    PR_REMOVE_AND_INIT_LINK(head);
   1159    jobp = JOB_LINKS_PTR(head);
   1160    tpool->jobq.cnt--;
   1161    delete_job(jobp);
   1162  }
   1163 
   1164  /* delete io jobs */
   1165  while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
   1166    PRJob* jobp;
   1167 
   1168    head = PR_LIST_HEAD(&tpool->ioq.list);
   1169    PR_REMOVE_AND_INIT_LINK(head);
   1170    tpool->ioq.cnt--;
   1171    jobp = JOB_LINKS_PTR(head);
   1172    delete_job(jobp);
   1173  }
   1174 
   1175  /* delete timer jobs */
   1176  while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
   1177    PRJob* jobp;
   1178 
   1179    head = PR_LIST_HEAD(&tpool->timerq.list);
   1180    PR_REMOVE_AND_INIT_LINK(head);
   1181    tpool->timerq.cnt--;
   1182    jobp = JOB_LINKS_PTR(head);
   1183    delete_job(jobp);
   1184  }
   1185 
   1186  PR_ASSERT(0 == tpool->jobq.cnt);
   1187  PR_ASSERT(0 == tpool->ioq.cnt);
   1188  PR_ASSERT(0 == tpool->timerq.cnt);
   1189 
   1190  delete_threadpool(tpool);
   1191  return rval;
   1192 }