prtpool.c (29322B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "nspr.h" 7 8 /* 9 * Thread pools 10 * Thread pools create and manage threads to provide support for 11 * scheduling jobs onto one or more threads. 12 * 13 */ 14 #ifdef OPT_WINNT 15 # include <windows.h> 16 #endif 17 18 /* 19 * worker thread 20 */ 21 typedef struct wthread { 22 PRCList links; 23 PRThread* thread; 24 } wthread; 25 26 /* 27 * queue of timer jobs 28 */ 29 typedef struct timer_jobq { 30 PRCList list; 31 PRLock* lock; 32 PRCondVar* cv; 33 PRInt32 cnt; 34 PRCList wthreads; 35 } timer_jobq; 36 37 /* 38 * queue of jobs 39 */ 40 typedef struct tp_jobq { 41 PRCList list; 42 PRInt32 cnt; 43 PRLock* lock; 44 PRCondVar* cv; 45 PRCList wthreads; 46 #ifdef OPT_WINNT 47 HANDLE nt_completion_port; 48 #endif 49 } tp_jobq; 50 51 /* 52 * queue of IO jobs 53 */ 54 typedef struct io_jobq { 55 PRCList list; 56 PRPollDesc* pollfds; 57 PRInt32 npollfds; 58 PRJob** polljobs; 59 PRLock* lock; 60 PRInt32 cnt; 61 PRFileDesc* notify_fd; 62 PRCList wthreads; 63 } io_jobq; 64 65 /* 66 * Threadpool 67 */ 68 struct PRThreadPool { 69 PRInt32 init_threads; 70 PRInt32 max_threads; 71 PRInt32 current_threads; 72 PRInt32 idle_threads; 73 PRUint32 stacksize; 74 tp_jobq jobq; 75 io_jobq ioq; 76 timer_jobq timerq; 77 PRLock* join_lock; /* used with jobp->join_cv */ 78 PRCondVar* shutdown_cv; 79 PRBool shutdown; 80 }; 81 82 typedef enum io_op_type { 83 JOB_IO_READ, 84 JOB_IO_WRITE, 85 JOB_IO_CONNECT, 86 JOB_IO_ACCEPT 87 } io_op_type; 88 89 #ifdef OPT_WINNT 90 typedef struct NT_notifier { 91 OVERLAPPED overlapped; /* must be first */ 92 PRJob* jobp; 93 } NT_notifier; 94 #endif 95 96 struct PRJob { 97 PRCList links; /* for linking jobs */ 98 PRBool on_ioq; /* job on ioq */ 99 PRBool on_timerq; /* job on timerq */ 100 PRJobFn job_func; 101 void* job_arg; 102 PRCondVar* join_cv; 103 PRBool join_wait; /* == PR_TRUE, when waiting to join */ 104 PRCondVar* cancel_cv; /* for cancelling IO jobs */ 105 PRBool cancel_io; /* for cancelling IO jobs */ 106 PRThreadPool* tpool; /* back pointer to thread pool */ 107 PRJobIoDesc* iod; 108 io_op_type io_op; 109 PRInt16 io_poll_flags; 110 PRNetAddr* netaddr; 111 PRIntervalTime timeout; /* relative value */ 112 PRIntervalTime absolute; 113 #ifdef OPT_WINNT 114 NT_notifier nt_notifier; 115 #endif 116 }; 117 118 #define JOB_LINKS_PTR(_qp) ((PRJob*)((char*)(_qp) - offsetof(PRJob, links))) 119 120 #define WTHREAD_LINKS_PTR(_qp) \ 121 ((wthread*)((char*)(_qp) - offsetof(wthread, links))) 122 123 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) 124 125 #define JOIN_NOTIFY(_jobp) \ 126 PR_BEGIN_MACRO \ 127 PR_Lock(_jobp->tpool->join_lock); \ 128 _jobp->join_wait = PR_FALSE; \ 129 PR_NotifyCondVar(_jobp->join_cv); \ 130 PR_Unlock(_jobp->tpool->join_lock); \ 131 PR_END_MACRO 132 133 #define CANCEL_IO_JOB(jobp) \ 134 PR_BEGIN_MACRO \ 135 jobp->cancel_io = PR_FALSE; \ 136 jobp->on_ioq = PR_FALSE; \ 137 PR_REMOVE_AND_INIT_LINK(&jobp->links); \ 138 tp->ioq.cnt--; \ 139 PR_NotifyCondVar(jobp->cancel_cv); \ 140 PR_END_MACRO 141 142 static void delete_job(PRJob* jobp); 143 static PRThreadPool* alloc_threadpool(void); 144 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp); 145 static void notify_ioq(PRThreadPool* tp); 146 static void notify_timerq(PRThreadPool* tp); 147 148 /* 149 * locks are acquired in the following order 150 * 151 * tp->ioq.lock,tp->timerq.lock 152 * | 153 * V 154 * tp->jobq->lock 155 */ 156 157 /* 158 * worker thread function 159 */ 160 static void wstart(void* arg) { 161 PRThreadPool* tp = (PRThreadPool*)arg; 162 PRCList* head; 163 164 /* 165 * execute jobs until shutdown 166 */ 167 while (!tp->shutdown) { 168 PRJob* jobp; 169 #ifdef OPT_WINNT 170 BOOL rv; 171 DWORD unused, shutdown; 172 LPOVERLAPPED olp; 173 174 PR_Lock(tp->jobq.lock); 175 tp->idle_threads++; 176 PR_Unlock(tp->jobq.lock); 177 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused, 178 &shutdown, &olp, INFINITE); 179 180 PR_ASSERT(rv); 181 if (shutdown) { 182 break; 183 } 184 jobp = ((NT_notifier*)olp)->jobp; 185 PR_Lock(tp->jobq.lock); 186 tp->idle_threads--; 187 tp->jobq.cnt--; 188 PR_Unlock(tp->jobq.lock); 189 #else 190 191 PR_Lock(tp->jobq.lock); 192 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { 193 tp->idle_threads++; 194 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); 195 tp->idle_threads--; 196 } 197 if (tp->shutdown) { 198 PR_Unlock(tp->jobq.lock); 199 break; 200 } 201 head = PR_LIST_HEAD(&tp->jobq.list); 202 /* 203 * remove job from queue 204 */ 205 PR_REMOVE_AND_INIT_LINK(head); 206 tp->jobq.cnt--; 207 jobp = JOB_LINKS_PTR(head); 208 PR_Unlock(tp->jobq.lock); 209 #endif 210 211 jobp->job_func(jobp->job_arg); 212 if (!JOINABLE_JOB(jobp)) { 213 delete_job(jobp); 214 } else { 215 JOIN_NOTIFY(jobp); 216 } 217 } 218 PR_Lock(tp->jobq.lock); 219 tp->current_threads--; 220 PR_Unlock(tp->jobq.lock); 221 } 222 223 /* 224 * add a job to the work queue 225 */ 226 static void add_to_jobq(PRThreadPool* tp, PRJob* jobp) { 227 /* 228 * add to jobq 229 */ 230 #ifdef OPT_WINNT 231 PR_Lock(tp->jobq.lock); 232 tp->jobq.cnt++; 233 PR_Unlock(tp->jobq.lock); 234 /* 235 * notify worker thread(s) 236 */ 237 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE, 238 &jobp->nt_notifier.overlapped); 239 #else 240 PR_Lock(tp->jobq.lock); 241 PR_APPEND_LINK(&jobp->links, &tp->jobq.list); 242 tp->jobq.cnt++; 243 if ((tp->idle_threads < tp->jobq.cnt) && 244 (tp->current_threads < tp->max_threads)) { 245 wthread* wthrp; 246 /* 247 * increment thread count and unlock the jobq lock 248 */ 249 tp->current_threads++; 250 PR_Unlock(tp->jobq.lock); 251 /* create new worker thread */ 252 wthrp = PR_NEWZAP(wthread); 253 if (wthrp) { 254 wthrp->thread = 255 PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, 256 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, tp->stacksize); 257 if (NULL == wthrp->thread) { 258 PR_DELETE(wthrp); /* this sets wthrp to NULL */ 259 } 260 } 261 PR_Lock(tp->jobq.lock); 262 if (NULL == wthrp) { 263 tp->current_threads--; 264 } else { 265 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); 266 } 267 } 268 /* 269 * wakeup a worker thread 270 */ 271 PR_NotifyCondVar(tp->jobq.cv); 272 PR_Unlock(tp->jobq.lock); 273 #endif 274 } 275 276 /* 277 * io worker thread function 278 */ 279 static void io_wstart(void* arg) { 280 PRThreadPool* tp = (PRThreadPool*)arg; 281 int pollfd_cnt, pollfds_used; 282 int rv; 283 PRCList *qp, *nextqp; 284 PRPollDesc* pollfds = NULL; 285 PRJob** polljobs = NULL; 286 int poll_timeout; 287 PRIntervalTime now; 288 289 /* 290 * scan io_jobq 291 * construct poll list 292 * call PR_Poll 293 * for all fds, for which poll returns true, move the job to 294 * jobq and wakeup worker thread. 295 */ 296 while (!tp->shutdown) { 297 PRJob* jobp; 298 299 pollfd_cnt = tp->ioq.cnt + 10; 300 if (pollfd_cnt > tp->ioq.npollfds) { 301 /* 302 * re-allocate pollfd array if the current one is not large 303 * enough 304 */ 305 if (NULL != tp->ioq.pollfds) { 306 PR_Free(tp->ioq.pollfds); 307 } 308 tp->ioq.pollfds = (PRPollDesc*)PR_Malloc( 309 pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob*))); 310 PR_ASSERT(NULL != tp->ioq.pollfds); 311 /* 312 * array of pollfds 313 */ 314 pollfds = tp->ioq.pollfds; 315 tp->ioq.polljobs = (PRJob**)(&tp->ioq.pollfds[pollfd_cnt]); 316 /* 317 * parallel array of jobs 318 */ 319 polljobs = tp->ioq.polljobs; 320 tp->ioq.npollfds = pollfd_cnt; 321 } 322 323 pollfds_used = 0; 324 /* 325 * add the notify fd; used for unblocking io thread(s) 326 */ 327 pollfds[pollfds_used].fd = tp->ioq.notify_fd; 328 pollfds[pollfds_used].in_flags = PR_POLL_READ; 329 pollfds[pollfds_used].out_flags = 0; 330 polljobs[pollfds_used] = NULL; 331 pollfds_used++; 332 /* 333 * fill in the pollfd array 334 */ 335 PR_Lock(tp->ioq.lock); 336 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { 337 nextqp = qp->next; 338 jobp = JOB_LINKS_PTR(qp); 339 if (jobp->cancel_io) { 340 CANCEL_IO_JOB(jobp); 341 continue; 342 } 343 if (pollfds_used == (pollfd_cnt)) { 344 break; 345 } 346 pollfds[pollfds_used].fd = jobp->iod->socket; 347 pollfds[pollfds_used].in_flags = jobp->io_poll_flags; 348 pollfds[pollfds_used].out_flags = 0; 349 polljobs[pollfds_used] = jobp; 350 351 pollfds_used++; 352 } 353 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { 354 qp = tp->ioq.list.next; 355 jobp = JOB_LINKS_PTR(qp); 356 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { 357 poll_timeout = PR_INTERVAL_NO_TIMEOUT; 358 } else if (PR_INTERVAL_NO_WAIT == jobp->timeout) { 359 poll_timeout = PR_INTERVAL_NO_WAIT; 360 } else { 361 poll_timeout = jobp->absolute - PR_IntervalNow(); 362 if (poll_timeout <= 0) { /* already timed out */ 363 poll_timeout = PR_INTERVAL_NO_WAIT; 364 } 365 } 366 } else { 367 poll_timeout = PR_INTERVAL_NO_TIMEOUT; 368 } 369 PR_Unlock(tp->ioq.lock); 370 371 /* 372 * XXXX 373 * should retry if more jobs have been added to the queue? 374 * 375 */ 376 PR_ASSERT(pollfds_used <= pollfd_cnt); 377 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); 378 379 if (tp->shutdown) { 380 break; 381 } 382 383 if (rv > 0) { 384 /* 385 * at least one io event is set 386 */ 387 PRStatus rval_status; 388 PRInt32 index; 389 390 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); 391 /* 392 * reset the pollable event, if notified 393 */ 394 if (pollfds[0].out_flags & PR_POLL_READ) { 395 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); 396 PR_ASSERT(PR_SUCCESS == rval_status); 397 } 398 399 for (index = 1; index < (pollfds_used); index++) { 400 PRInt16 events = pollfds[index].in_flags; 401 PRInt16 revents = pollfds[index].out_flags; 402 jobp = polljobs[index]; 403 404 if ((revents & PR_POLL_NVAL) || /* busted in all cases */ 405 (revents & PR_POLL_ERR) || 406 ((events & PR_POLL_WRITE) && 407 (revents & PR_POLL_HUP))) { /* write op & hup */ 408 PR_Lock(tp->ioq.lock); 409 if (jobp->cancel_io) { 410 CANCEL_IO_JOB(jobp); 411 PR_Unlock(tp->ioq.lock); 412 continue; 413 } 414 PR_REMOVE_AND_INIT_LINK(&jobp->links); 415 tp->ioq.cnt--; 416 jobp->on_ioq = PR_FALSE; 417 PR_Unlock(tp->ioq.lock); 418 419 /* set error */ 420 if (PR_POLL_NVAL & revents) { 421 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; 422 } else if (PR_POLL_HUP & revents) { 423 jobp->iod->error = PR_CONNECT_RESET_ERROR; 424 } else { 425 jobp->iod->error = PR_IO_ERROR; 426 } 427 428 /* 429 * add to jobq 430 */ 431 add_to_jobq(tp, jobp); 432 } else if (revents) { 433 /* 434 * add to jobq 435 */ 436 PR_Lock(tp->ioq.lock); 437 if (jobp->cancel_io) { 438 CANCEL_IO_JOB(jobp); 439 PR_Unlock(tp->ioq.lock); 440 continue; 441 } 442 PR_REMOVE_AND_INIT_LINK(&jobp->links); 443 tp->ioq.cnt--; 444 jobp->on_ioq = PR_FALSE; 445 PR_Unlock(tp->ioq.lock); 446 447 if (jobp->io_op == JOB_IO_CONNECT) { 448 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) { 449 jobp->iod->error = 0; 450 } else { 451 jobp->iod->error = PR_GetError(); 452 } 453 } else { 454 jobp->iod->error = 0; 455 } 456 457 add_to_jobq(tp, jobp); 458 } 459 } 460 } 461 /* 462 * timeout processing 463 */ 464 now = PR_IntervalNow(); 465 PR_Lock(tp->ioq.lock); 466 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { 467 nextqp = qp->next; 468 jobp = JOB_LINKS_PTR(qp); 469 if (jobp->cancel_io) { 470 CANCEL_IO_JOB(jobp); 471 continue; 472 } 473 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { 474 break; 475 } 476 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && 477 ((PRInt32)(jobp->absolute - now) > 0)) { 478 break; 479 } 480 PR_REMOVE_AND_INIT_LINK(&jobp->links); 481 tp->ioq.cnt--; 482 jobp->on_ioq = PR_FALSE; 483 jobp->iod->error = PR_IO_TIMEOUT_ERROR; 484 add_to_jobq(tp, jobp); 485 } 486 PR_Unlock(tp->ioq.lock); 487 } 488 } 489 490 /* 491 * timer worker thread function 492 */ 493 static void timer_wstart(void* arg) { 494 PRThreadPool* tp = (PRThreadPool*)arg; 495 PRCList* qp; 496 PRIntervalTime timeout; 497 PRIntervalTime now; 498 499 /* 500 * call PR_WaitCondVar with minimum value of all timeouts 501 */ 502 while (!tp->shutdown) { 503 PRJob* jobp; 504 505 PR_Lock(tp->timerq.lock); 506 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { 507 timeout = PR_INTERVAL_NO_TIMEOUT; 508 } else { 509 PRCList* qp; 510 511 qp = tp->timerq.list.next; 512 jobp = JOB_LINKS_PTR(qp); 513 514 timeout = jobp->absolute - PR_IntervalNow(); 515 if (timeout <= 0) { 516 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ 517 } 518 } 519 if (PR_INTERVAL_NO_WAIT != timeout) { 520 PR_WaitCondVar(tp->timerq.cv, timeout); 521 } 522 if (tp->shutdown) { 523 PR_Unlock(tp->timerq.lock); 524 break; 525 } 526 /* 527 * move expired-timer jobs to jobq 528 */ 529 now = PR_IntervalNow(); 530 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { 531 qp = tp->timerq.list.next; 532 jobp = JOB_LINKS_PTR(qp); 533 534 if ((PRInt32)(jobp->absolute - now) > 0) { 535 break; 536 } 537 /* 538 * job timed out 539 */ 540 PR_REMOVE_AND_INIT_LINK(&jobp->links); 541 tp->timerq.cnt--; 542 jobp->on_timerq = PR_FALSE; 543 add_to_jobq(tp, jobp); 544 } 545 PR_Unlock(tp->timerq.lock); 546 } 547 } 548 549 static void delete_threadpool(PRThreadPool* tp) { 550 if (NULL != tp) { 551 if (NULL != tp->shutdown_cv) { 552 PR_DestroyCondVar(tp->shutdown_cv); 553 } 554 if (NULL != tp->jobq.cv) { 555 PR_DestroyCondVar(tp->jobq.cv); 556 } 557 if (NULL != tp->jobq.lock) { 558 PR_DestroyLock(tp->jobq.lock); 559 } 560 if (NULL != tp->join_lock) { 561 PR_DestroyLock(tp->join_lock); 562 } 563 #ifdef OPT_WINNT 564 if (NULL != tp->jobq.nt_completion_port) { 565 CloseHandle(tp->jobq.nt_completion_port); 566 } 567 #endif 568 /* Timer queue */ 569 if (NULL != tp->timerq.cv) { 570 PR_DestroyCondVar(tp->timerq.cv); 571 } 572 if (NULL != tp->timerq.lock) { 573 PR_DestroyLock(tp->timerq.lock); 574 } 575 576 if (NULL != tp->ioq.lock) { 577 PR_DestroyLock(tp->ioq.lock); 578 } 579 if (NULL != tp->ioq.pollfds) { 580 PR_Free(tp->ioq.pollfds); 581 } 582 if (NULL != tp->ioq.notify_fd) { 583 PR_DestroyPollableEvent(tp->ioq.notify_fd); 584 } 585 PR_Free(tp); 586 } 587 return; 588 } 589 590 static PRThreadPool* alloc_threadpool(void) { 591 PRThreadPool* tp; 592 593 tp = (PRThreadPool*)PR_CALLOC(sizeof(*tp)); 594 if (NULL == tp) { 595 goto failed; 596 } 597 tp->jobq.lock = PR_NewLock(); 598 if (NULL == tp->jobq.lock) { 599 goto failed; 600 } 601 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); 602 if (NULL == tp->jobq.cv) { 603 goto failed; 604 } 605 tp->join_lock = PR_NewLock(); 606 if (NULL == tp->join_lock) { 607 goto failed; 608 } 609 #ifdef OPT_WINNT 610 tp->jobq.nt_completion_port = 611 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 612 if (NULL == tp->jobq.nt_completion_port) { 613 goto failed; 614 } 615 #endif 616 617 tp->ioq.lock = PR_NewLock(); 618 if (NULL == tp->ioq.lock) { 619 goto failed; 620 } 621 622 /* Timer queue */ 623 624 tp->timerq.lock = PR_NewLock(); 625 if (NULL == tp->timerq.lock) { 626 goto failed; 627 } 628 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); 629 if (NULL == tp->timerq.cv) { 630 goto failed; 631 } 632 633 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); 634 if (NULL == tp->shutdown_cv) { 635 goto failed; 636 } 637 tp->ioq.notify_fd = PR_NewPollableEvent(); 638 if (NULL == tp->ioq.notify_fd) { 639 goto failed; 640 } 641 return tp; 642 failed: 643 delete_threadpool(tp); 644 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 645 return NULL; 646 } 647 648 /* Create thread pool */ 649 PR_IMPLEMENT(PRThreadPool*) 650 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, 651 PRUint32 stacksize) { 652 PRThreadPool* tp; 653 PRThread* thr; 654 int i; 655 wthread* wthrp; 656 657 tp = alloc_threadpool(); 658 if (NULL == tp) { 659 return NULL; 660 } 661 662 tp->init_threads = initial_threads; 663 tp->max_threads = max_threads; 664 tp->stacksize = stacksize; 665 PR_INIT_CLIST(&tp->jobq.list); 666 PR_INIT_CLIST(&tp->ioq.list); 667 PR_INIT_CLIST(&tp->timerq.list); 668 PR_INIT_CLIST(&tp->jobq.wthreads); 669 PR_INIT_CLIST(&tp->ioq.wthreads); 670 PR_INIT_CLIST(&tp->timerq.wthreads); 671 tp->shutdown = PR_FALSE; 672 673 PR_Lock(tp->jobq.lock); 674 for (i = 0; i < initial_threads; ++i) { 675 thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, 676 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize); 677 PR_ASSERT(thr); 678 wthrp = PR_NEWZAP(wthread); 679 PR_ASSERT(wthrp); 680 wthrp->thread = thr; 681 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); 682 } 683 tp->current_threads = initial_threads; 684 685 thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL, 686 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize); 687 PR_ASSERT(thr); 688 wthrp = PR_NEWZAP(wthread); 689 PR_ASSERT(wthrp); 690 wthrp->thread = thr; 691 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); 692 693 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL, 694 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize); 695 PR_ASSERT(thr); 696 wthrp = PR_NEWZAP(wthread); 697 PR_ASSERT(wthrp); 698 wthrp->thread = thr; 699 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); 700 701 PR_Unlock(tp->jobq.lock); 702 return tp; 703 } 704 705 static void delete_job(PRJob* jobp) { 706 if (NULL != jobp) { 707 if (NULL != jobp->join_cv) { 708 PR_DestroyCondVar(jobp->join_cv); 709 jobp->join_cv = NULL; 710 } 711 if (NULL != jobp->cancel_cv) { 712 PR_DestroyCondVar(jobp->cancel_cv); 713 jobp->cancel_cv = NULL; 714 } 715 PR_DELETE(jobp); 716 } 717 } 718 719 static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp) { 720 PRJob* jobp; 721 722 jobp = PR_NEWZAP(PRJob); 723 if (NULL == jobp) { 724 goto failed; 725 } 726 if (joinable) { 727 jobp->join_cv = PR_NewCondVar(tp->join_lock); 728 jobp->join_wait = PR_TRUE; 729 if (NULL == jobp->join_cv) { 730 goto failed; 731 } 732 } else { 733 jobp->join_cv = NULL; 734 } 735 #ifdef OPT_WINNT 736 jobp->nt_notifier.jobp = jobp; 737 #endif 738 return jobp; 739 failed: 740 delete_job(jobp); 741 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 742 return NULL; 743 } 744 745 /* queue a job */ 746 PR_IMPLEMENT(PRJob*) 747 PR_QueueJob(PRThreadPool* tpool, PRJobFn fn, void* arg, PRBool joinable) { 748 PRJob* jobp; 749 750 jobp = alloc_job(joinable, tpool); 751 if (NULL == jobp) { 752 return NULL; 753 } 754 755 jobp->job_func = fn; 756 jobp->job_arg = arg; 757 jobp->tpool = tpool; 758 759 add_to_jobq(tpool, jobp); 760 return jobp; 761 } 762 763 /* queue a job, when a socket is readable or writeable */ 764 static PRJob* queue_io_job(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, 765 void* arg, PRBool joinable, io_op_type op) { 766 PRJob* jobp; 767 PRIntervalTime now; 768 769 jobp = alloc_job(joinable, tpool); 770 if (NULL == jobp) { 771 return NULL; 772 } 773 774 /* 775 * Add a new job to io_jobq 776 * wakeup io worker thread 777 */ 778 779 jobp->job_func = fn; 780 jobp->job_arg = arg; 781 jobp->tpool = tpool; 782 jobp->iod = iod; 783 if (JOB_IO_READ == op) { 784 jobp->io_op = JOB_IO_READ; 785 jobp->io_poll_flags = PR_POLL_READ; 786 } else if (JOB_IO_WRITE == op) { 787 jobp->io_op = JOB_IO_WRITE; 788 jobp->io_poll_flags = PR_POLL_WRITE; 789 } else if (JOB_IO_ACCEPT == op) { 790 jobp->io_op = JOB_IO_ACCEPT; 791 jobp->io_poll_flags = PR_POLL_READ; 792 } else if (JOB_IO_CONNECT == op) { 793 jobp->io_op = JOB_IO_CONNECT; 794 jobp->io_poll_flags = PR_POLL_WRITE | PR_POLL_EXCEPT; 795 } else { 796 delete_job(jobp); 797 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 798 return NULL; 799 } 800 801 jobp->timeout = iod->timeout; 802 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || 803 (PR_INTERVAL_NO_WAIT == iod->timeout)) { 804 jobp->absolute = iod->timeout; 805 } else { 806 now = PR_IntervalNow(); 807 jobp->absolute = now + iod->timeout; 808 } 809 810 PR_Lock(tpool->ioq.lock); 811 812 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || 813 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { 814 PR_APPEND_LINK(&jobp->links, &tpool->ioq.list); 815 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { 816 PR_INSERT_LINK(&jobp->links, &tpool->ioq.list); 817 } else { 818 PRCList* qp; 819 PRJob* tmp_jobp; 820 /* 821 * insert into the timeout-sorted ioq 822 */ 823 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) { 824 tmp_jobp = JOB_LINKS_PTR(qp); 825 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { 826 break; 827 } 828 } 829 PR_INSERT_AFTER(&jobp->links, qp); 830 } 831 832 jobp->on_ioq = PR_TRUE; 833 tpool->ioq.cnt++; 834 /* 835 * notify io worker thread(s) 836 */ 837 PR_Unlock(tpool->ioq.lock); 838 notify_ioq(tpool); 839 return jobp; 840 } 841 842 /* queue a job, when a socket is readable */ 843 PR_IMPLEMENT(PRJob*) 844 PR_QueueJob_Read(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg, 845 PRBool joinable) { 846 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); 847 } 848 849 /* queue a job, when a socket is writeable */ 850 PR_IMPLEMENT(PRJob*) 851 PR_QueueJob_Write(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg, 852 PRBool joinable) { 853 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); 854 } 855 856 /* queue a job, when a socket has a pending connection */ 857 PR_IMPLEMENT(PRJob*) 858 PR_QueueJob_Accept(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg, 859 PRBool joinable) { 860 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); 861 } 862 863 /* queue a job, when a socket can be connected */ 864 PR_IMPLEMENT(PRJob*) 865 PR_QueueJob_Connect(PRThreadPool* tpool, PRJobIoDesc* iod, 866 const PRNetAddr* addr, PRJobFn fn, void* arg, 867 PRBool joinable) { 868 PRStatus rv; 869 PRErrorCode err; 870 871 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); 872 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) { 873 /* connection pending */ 874 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); 875 } 876 /* 877 * connection succeeded or failed; add to jobq right away 878 */ 879 if (rv == PR_FAILURE) { 880 iod->error = err; 881 } else { 882 iod->error = 0; 883 } 884 return (PR_QueueJob(tpool, fn, arg, joinable)); 885 } 886 887 /* queue a job, when a timer expires */ 888 PR_IMPLEMENT(PRJob*) 889 PR_QueueJob_Timer(PRThreadPool* tpool, PRIntervalTime timeout, PRJobFn fn, 890 void* arg, PRBool joinable) { 891 PRIntervalTime now; 892 PRJob* jobp; 893 894 if (PR_INTERVAL_NO_TIMEOUT == timeout) { 895 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 896 return NULL; 897 } 898 if (PR_INTERVAL_NO_WAIT == timeout) { 899 /* 900 * no waiting; add to jobq right away 901 */ 902 return (PR_QueueJob(tpool, fn, arg, joinable)); 903 } 904 jobp = alloc_job(joinable, tpool); 905 if (NULL == jobp) { 906 return NULL; 907 } 908 909 /* 910 * Add a new job to timer_jobq 911 * wakeup timer worker thread 912 */ 913 914 jobp->job_func = fn; 915 jobp->job_arg = arg; 916 jobp->tpool = tpool; 917 jobp->timeout = timeout; 918 919 now = PR_IntervalNow(); 920 jobp->absolute = now + timeout; 921 922 PR_Lock(tpool->timerq.lock); 923 jobp->on_timerq = PR_TRUE; 924 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { 925 PR_APPEND_LINK(&jobp->links, &tpool->timerq.list); 926 } else { 927 PRCList* qp; 928 PRJob* tmp_jobp; 929 /* 930 * insert into the sorted timer jobq 931 */ 932 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; 933 qp = qp->prev) { 934 tmp_jobp = JOB_LINKS_PTR(qp); 935 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { 936 break; 937 } 938 } 939 PR_INSERT_AFTER(&jobp->links, qp); 940 } 941 tpool->timerq.cnt++; 942 /* 943 * notify timer worker thread(s) 944 */ 945 notify_timerq(tpool); 946 PR_Unlock(tpool->timerq.lock); 947 return jobp; 948 } 949 950 static void notify_timerq(PRThreadPool* tp) { 951 /* 952 * wakeup the timer thread(s) 953 */ 954 PR_NotifyCondVar(tp->timerq.cv); 955 } 956 957 static void notify_ioq(PRThreadPool* tp) { 958 PRStatus rval_status; 959 960 /* 961 * wakeup the io thread(s) 962 */ 963 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); 964 PR_ASSERT(PR_SUCCESS == rval_status); 965 } 966 967 /* 968 * cancel a job 969 * 970 * XXXX: is this needed? likely to be removed 971 */ 972 PR_IMPLEMENT(PRStatus) 973 PR_CancelJob(PRJob* jobp) { 974 PRStatus rval = PR_FAILURE; 975 PRThreadPool* tp; 976 977 if (jobp->on_timerq) { 978 /* 979 * now, check again while holding the timerq lock 980 */ 981 tp = jobp->tpool; 982 PR_Lock(tp->timerq.lock); 983 if (jobp->on_timerq) { 984 jobp->on_timerq = PR_FALSE; 985 PR_REMOVE_AND_INIT_LINK(&jobp->links); 986 tp->timerq.cnt--; 987 PR_Unlock(tp->timerq.lock); 988 if (!JOINABLE_JOB(jobp)) { 989 delete_job(jobp); 990 } else { 991 JOIN_NOTIFY(jobp); 992 } 993 rval = PR_SUCCESS; 994 } else { 995 PR_Unlock(tp->timerq.lock); 996 } 997 } else if (jobp->on_ioq) { 998 /* 999 * now, check again while holding the ioq lock 1000 */ 1001 tp = jobp->tpool; 1002 PR_Lock(tp->ioq.lock); 1003 if (jobp->on_ioq) { 1004 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); 1005 if (NULL == jobp->cancel_cv) { 1006 PR_Unlock(tp->ioq.lock); 1007 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); 1008 return PR_FAILURE; 1009 } 1010 /* 1011 * mark job 'cancelled' and notify io thread(s) 1012 * XXXX: 1013 * this assumes there is only one io thread; when there 1014 * are multiple threads, the io thread processing this job 1015 * must be notified. 1016 */ 1017 jobp->cancel_io = PR_TRUE; 1018 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ 1019 notify_ioq(tp); 1020 PR_Lock(tp->ioq.lock); 1021 while (jobp->cancel_io) { 1022 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); 1023 } 1024 PR_Unlock(tp->ioq.lock); 1025 PR_ASSERT(!jobp->on_ioq); 1026 if (!JOINABLE_JOB(jobp)) { 1027 delete_job(jobp); 1028 } else { 1029 JOIN_NOTIFY(jobp); 1030 } 1031 rval = PR_SUCCESS; 1032 } else { 1033 PR_Unlock(tp->ioq.lock); 1034 } 1035 } 1036 if (PR_FAILURE == rval) { 1037 PR_SetError(PR_INVALID_STATE_ERROR, 0); 1038 } 1039 return rval; 1040 } 1041 1042 /* join a job, wait until completion */ 1043 PR_IMPLEMENT(PRStatus) 1044 PR_JoinJob(PRJob* jobp) { 1045 if (!JOINABLE_JOB(jobp)) { 1046 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1047 return PR_FAILURE; 1048 } 1049 PR_Lock(jobp->tpool->join_lock); 1050 while (jobp->join_wait) { 1051 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); 1052 } 1053 PR_Unlock(jobp->tpool->join_lock); 1054 delete_job(jobp); 1055 return PR_SUCCESS; 1056 } 1057 1058 /* shutdown threadpool */ 1059 PR_IMPLEMENT(PRStatus) 1060 PR_ShutdownThreadPool(PRThreadPool* tpool) { 1061 PRStatus rval = PR_SUCCESS; 1062 1063 PR_Lock(tpool->jobq.lock); 1064 tpool->shutdown = PR_TRUE; 1065 PR_NotifyAllCondVar(tpool->shutdown_cv); 1066 PR_Unlock(tpool->jobq.lock); 1067 1068 return rval; 1069 } 1070 1071 /* 1072 * join thread pool 1073 * wait for termination of worker threads 1074 * reclaim threadpool resources 1075 */ 1076 PR_IMPLEMENT(PRStatus) 1077 PR_JoinThreadPool(PRThreadPool* tpool) { 1078 PRStatus rval = PR_SUCCESS; 1079 PRCList* head; 1080 PRStatus rval_status; 1081 1082 PR_Lock(tpool->jobq.lock); 1083 while (!tpool->shutdown) { 1084 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); 1085 } 1086 1087 /* 1088 * wakeup worker threads 1089 */ 1090 #ifdef OPT_WINNT 1091 /* 1092 * post shutdown notification for all threads 1093 */ 1094 { 1095 int i; 1096 for (i = 0; i < tpool->current_threads; i++) { 1097 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL); 1098 } 1099 } 1100 #else 1101 PR_NotifyAllCondVar(tpool->jobq.cv); 1102 #endif 1103 1104 /* 1105 * wakeup io thread(s) 1106 */ 1107 notify_ioq(tpool); 1108 1109 /* 1110 * wakeup timer thread(s) 1111 */ 1112 PR_Lock(tpool->timerq.lock); 1113 notify_timerq(tpool); 1114 PR_Unlock(tpool->timerq.lock); 1115 1116 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { 1117 wthread* wthrp; 1118 1119 head = PR_LIST_HEAD(&tpool->jobq.wthreads); 1120 PR_REMOVE_AND_INIT_LINK(head); 1121 PR_Unlock(tpool->jobq.lock); 1122 wthrp = WTHREAD_LINKS_PTR(head); 1123 rval_status = PR_JoinThread(wthrp->thread); 1124 PR_ASSERT(PR_SUCCESS == rval_status); 1125 PR_DELETE(wthrp); 1126 PR_Lock(tpool->jobq.lock); 1127 } 1128 PR_Unlock(tpool->jobq.lock); 1129 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { 1130 wthread* wthrp; 1131 1132 head = PR_LIST_HEAD(&tpool->ioq.wthreads); 1133 PR_REMOVE_AND_INIT_LINK(head); 1134 wthrp = WTHREAD_LINKS_PTR(head); 1135 rval_status = PR_JoinThread(wthrp->thread); 1136 PR_ASSERT(PR_SUCCESS == rval_status); 1137 PR_DELETE(wthrp); 1138 } 1139 1140 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { 1141 wthread* wthrp; 1142 1143 head = PR_LIST_HEAD(&tpool->timerq.wthreads); 1144 PR_REMOVE_AND_INIT_LINK(head); 1145 wthrp = WTHREAD_LINKS_PTR(head); 1146 rval_status = PR_JoinThread(wthrp->thread); 1147 PR_ASSERT(PR_SUCCESS == rval_status); 1148 PR_DELETE(wthrp); 1149 } 1150 1151 /* 1152 * Delete queued jobs 1153 */ 1154 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { 1155 PRJob* jobp; 1156 1157 head = PR_LIST_HEAD(&tpool->jobq.list); 1158 PR_REMOVE_AND_INIT_LINK(head); 1159 jobp = JOB_LINKS_PTR(head); 1160 tpool->jobq.cnt--; 1161 delete_job(jobp); 1162 } 1163 1164 /* delete io jobs */ 1165 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { 1166 PRJob* jobp; 1167 1168 head = PR_LIST_HEAD(&tpool->ioq.list); 1169 PR_REMOVE_AND_INIT_LINK(head); 1170 tpool->ioq.cnt--; 1171 jobp = JOB_LINKS_PTR(head); 1172 delete_job(jobp); 1173 } 1174 1175 /* delete timer jobs */ 1176 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { 1177 PRJob* jobp; 1178 1179 head = PR_LIST_HEAD(&tpool->timerq.list); 1180 PR_REMOVE_AND_INIT_LINK(head); 1181 tpool->timerq.cnt--; 1182 jobp = JOB_LINKS_PTR(head); 1183 delete_job(jobp); 1184 } 1185 1186 PR_ASSERT(0 == tpool->jobq.cnt); 1187 PR_ASSERT(0 == tpool->ioq.cnt); 1188 PR_ASSERT(0 == tpool->timerq.cnt); 1189 1190 delete_threadpool(tpool); 1191 return rval; 1192 }