prmwait.c (40481B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "primpl.h" 7 #include "pprmwait.h" 8 9 #define _MW_REHASH_MAX 11 10 11 static PRLock* mw_lock = NULL; 12 static _PRGlobalState* mw_state = NULL; 13 14 static PRIntervalTime max_polling_interval; 15 16 #ifdef WINNT 17 18 typedef struct TimerEvent { 19 PRIntervalTime absolute; 20 void (*func)(void*); 21 void* arg; 22 LONG ref_count; 23 PRCList links; 24 } TimerEvent; 25 26 # define TIMER_EVENT_PTR(_qp) \ 27 ((TimerEvent*)((char*)(_qp) - offsetof(TimerEvent, links))) 28 29 struct { 30 PRLock* ml; 31 PRCondVar* new_timer; 32 PRCondVar* cancel_timer; 33 PRThread* manager_thread; 34 PRCList timer_queue; 35 } tm_vars; 36 37 static PRStatus TimerInit(void); 38 static void TimerManager(void* arg); 39 static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*), 40 void* arg); 41 static PRBool CancelTimer(TimerEvent* timer); 42 43 static void TimerManager(void* arg) { 44 PRIntervalTime now; 45 PRIntervalTime timeout; 46 PRCList* head; 47 TimerEvent* timer; 48 49 PR_Lock(tm_vars.ml); 50 while (1) { 51 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) { 52 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT); 53 } else { 54 now = PR_IntervalNow(); 55 head = PR_LIST_HEAD(&tm_vars.timer_queue); 56 timer = TIMER_EVENT_PTR(head); 57 if ((PRInt32)(now - timer->absolute) >= 0) { 58 PR_REMOVE_LINK(head); 59 /* 60 * make its prev and next point to itself so that 61 * it's obvious that it's not on the timer_queue. 62 */ 63 PR_INIT_CLIST(head); 64 PR_ASSERT(2 == timer->ref_count); 65 PR_Unlock(tm_vars.ml); 66 timer->func(timer->arg); 67 PR_Lock(tm_vars.ml); 68 timer->ref_count -= 1; 69 if (0 == timer->ref_count) { 70 PR_NotifyAllCondVar(tm_vars.cancel_timer); 71 } 72 } else { 73 timeout = (PRIntervalTime)(timer->absolute - now); 74 PR_WaitCondVar(tm_vars.new_timer, timeout); 75 } 76 } 77 } 78 PR_Unlock(tm_vars.ml); 79 } 80 81 static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*), 82 void* arg) { 83 TimerEvent* timer; 84 PRCList *links, *tail; 85 TimerEvent* elem; 86 87 timer = PR_NEW(TimerEvent); 88 if (NULL == timer) { 89 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 90 return timer; 91 } 92 timer->absolute = PR_IntervalNow() + timeout; 93 timer->func = func; 94 timer->arg = arg; 95 timer->ref_count = 2; 96 PR_Lock(tm_vars.ml); 97 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue); 98 while (links->prev != tail) { 99 elem = TIMER_EVENT_PTR(links); 100 if ((PRInt32)(timer->absolute - elem->absolute) >= 0) { 101 break; 102 } 103 links = links->prev; 104 } 105 PR_INSERT_AFTER(&timer->links, links); 106 PR_NotifyCondVar(tm_vars.new_timer); 107 PR_Unlock(tm_vars.ml); 108 return timer; 109 } 110 111 static PRBool CancelTimer(TimerEvent* timer) { 112 PRBool canceled = PR_FALSE; 113 114 PR_Lock(tm_vars.ml); 115 timer->ref_count -= 1; 116 if (timer->links.prev == &timer->links) { 117 while (timer->ref_count == 1) { 118 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT); 119 } 120 } else { 121 PR_REMOVE_LINK(&timer->links); 122 canceled = PR_TRUE; 123 } 124 PR_Unlock(tm_vars.ml); 125 PR_DELETE(timer); 126 return canceled; 127 } 128 129 static PRStatus TimerInit(void) { 130 tm_vars.ml = PR_NewLock(); 131 if (NULL == tm_vars.ml) { 132 goto failed; 133 } 134 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml); 135 if (NULL == tm_vars.new_timer) { 136 goto failed; 137 } 138 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml); 139 if (NULL == tm_vars.cancel_timer) { 140 goto failed; 141 } 142 PR_INIT_CLIST(&tm_vars.timer_queue); 143 tm_vars.manager_thread = 144 PR_CreateThread(PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL, 145 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0); 146 if (NULL == tm_vars.manager_thread) { 147 goto failed; 148 } 149 return PR_SUCCESS; 150 151 failed: 152 if (NULL != tm_vars.cancel_timer) { 153 PR_DestroyCondVar(tm_vars.cancel_timer); 154 } 155 if (NULL != tm_vars.new_timer) { 156 PR_DestroyCondVar(tm_vars.new_timer); 157 } 158 if (NULL != tm_vars.ml) { 159 PR_DestroyLock(tm_vars.ml); 160 } 161 return PR_FAILURE; 162 } 163 164 #endif /* WINNT */ 165 166 /******************************************************************/ 167 /******************************************************************/ 168 /************************ The private portion *********************/ 169 /******************************************************************/ 170 /******************************************************************/ 171 void _PR_InitMW(void) { 172 #ifdef WINNT 173 /* 174 * We use NT 4's InterlockedCompareExchange() to operate 175 * on PRMWStatus variables. 176 */ 177 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus)); 178 TimerInit(); 179 #endif 180 mw_lock = PR_NewLock(); 181 PR_ASSERT(NULL != mw_lock); 182 mw_state = PR_NEWZAP(_PRGlobalState); 183 PR_ASSERT(NULL != mw_state); 184 PR_INIT_CLIST(&mw_state->group_list); 185 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL); 186 } /* _PR_InitMW */ 187 188 void _PR_CleanupMW(void) { 189 PR_DestroyLock(mw_lock); 190 mw_lock = NULL; 191 if (mw_state->group) { 192 PR_DestroyWaitGroup(mw_state->group); 193 /* mw_state->group is set to NULL as a side effect. */ 194 } 195 PR_DELETE(mw_state); 196 } /* _PR_CleanupMW */ 197 198 static PRWaitGroup* MW_Init2(void) { 199 PRWaitGroup* group = mw_state->group; /* it's the null group */ 200 if (NULL == group) /* there is this special case */ 201 { 202 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH); 203 if (NULL == group) { 204 goto failed_alloc; 205 } 206 PR_Lock(mw_lock); 207 if (NULL == mw_state->group) { 208 mw_state->group = group; 209 group = NULL; 210 } 211 PR_Unlock(mw_lock); 212 if (group != NULL) { 213 (void)PR_DestroyWaitGroup(group); 214 } 215 group = mw_state->group; /* somebody beat us to it */ 216 } 217 failed_alloc: 218 return group; /* whatever */ 219 } /* MW_Init2 */ 220 221 static _PR_HashStory MW_AddHashInternal(PRRecvWait* desc, _PRWaiterHash* hash) { 222 /* 223 ** The entries are put in the table using the fd (PRFileDesc*) of 224 ** the receive descriptor as the key. This allows us to locate 225 ** the appropriate entry aqain when the poll operation finishes. 226 ** 227 ** The pointer to the file descriptor object is first divided by 228 ** the natural alignment of a pointer in the belief that object 229 ** will have at least that many zeros in the low order bits. 230 ** This may not be a good assuption. 231 ** 232 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After 233 ** that we declare defeat and force the table to be reconstructed. 234 ** Since some fds might be added more than once, won't that cause 235 ** collisions even in an empty table? 236 */ 237 PRIntn rehash = _MW_REHASH_MAX; 238 PRRecvWait** waiter; 239 PRUintn hidx = _MW_HASH(desc->fd, hash->length); 240 PRUintn hoffset = 0; 241 242 while (rehash-- > 0) { 243 waiter = &hash->recv_wait; 244 if (NULL == waiter[hidx]) { 245 waiter[hidx] = desc; 246 hash->count += 1; 247 #if 0 248 printf("Adding 0x%x->0x%x ", desc, desc->fd); 249 printf( 250 "table[%u:%u:*%u]: 0x%x->0x%x\n", 251 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd); 252 #endif 253 return _prmw_success; 254 } 255 if (desc == waiter[hidx]) { 256 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */ 257 return _prmw_error; 258 } 259 #if 0 260 printf("Failing 0x%x->0x%x ", desc, desc->fd); 261 printf( 262 "table[*%u:%u:%u]: 0x%x->0x%x\n", 263 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd); 264 #endif 265 if (0 == hoffset) { 266 hoffset = _MW_HASH2(desc->fd, hash->length); 267 PR_ASSERT(0 != hoffset); 268 } 269 hidx = (hidx + hoffset) % (hash->length); 270 } 271 return _prmw_rehash; 272 } /* MW_AddHashInternal */ 273 274 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup* group) { 275 PRRecvWait** desc; 276 PRUint32 pidx, length; 277 _PRWaiterHash *newHash, *oldHash = group->waiter; 278 PRBool retry; 279 _PR_HashStory hrv; 280 281 static const PRInt32 prime_number[] = {_PR_DEFAULT_HASH_LENGTH, 282 179, 283 521, 284 907, 285 1427, 286 2711, 287 3917, 288 5021, 289 8219, 290 11549, 291 18911, 292 26711, 293 33749, 294 44771}; 295 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32)); 296 297 /* look up the next size we'd like to use for the hash table */ 298 for (pidx = 0; pidx < primes; ++pidx) { 299 if (prime_number[pidx] == oldHash->length) { 300 break; 301 } 302 } 303 /* table size must be one of the prime numbers */ 304 PR_ASSERT(pidx < primes); 305 306 /* if pidx == primes - 1, we can't expand the table any more */ 307 while (pidx < primes - 1) { 308 /* next size */ 309 ++pidx; 310 length = prime_number[pidx]; 311 312 /* allocate the new hash table and fill it in with the old */ 313 newHash = (_PRWaiterHash*)PR_CALLOC(sizeof(_PRWaiterHash) + 314 (length * sizeof(PRRecvWait*))); 315 if (NULL == newHash) { 316 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 317 return _prmw_error; 318 } 319 320 newHash->length = length; 321 retry = PR_FALSE; 322 for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) { 323 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length); 324 if (NULL != *desc) { 325 hrv = MW_AddHashInternal(*desc, newHash); 326 PR_ASSERT(_prmw_error != hrv); 327 if (_prmw_success != hrv) { 328 PR_DELETE(newHash); 329 retry = PR_TRUE; 330 break; 331 } 332 } 333 } 334 if (retry) { 335 continue; 336 } 337 338 PR_DELETE(group->waiter); 339 group->waiter = newHash; 340 group->p_timestamp += 1; 341 return _prmw_success; 342 } 343 344 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 345 return _prmw_error; /* we're hosed */ 346 } /* MW_ExpandHashInternal */ 347 348 #ifndef WINNT 349 static void _MW_DoneInternal(PRWaitGroup* group, PRRecvWait** waiter, 350 PRMWStatus outcome) { 351 /* 352 ** Add this receive wait object to the list of finished I/O 353 ** operations for this particular group. If there are other 354 ** threads waiting on the group, notify one. If not, arrange 355 ** for this thread to return. 356 */ 357 358 # if 0 359 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd); 360 # endif 361 (*waiter)->outcome = outcome; 362 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready); 363 PR_NotifyCondVar(group->io_complete); 364 PR_ASSERT(0 != group->waiter->count); 365 group->waiter->count -= 1; 366 *waiter = NULL; 367 } /* _MW_DoneInternal */ 368 #endif /* WINNT */ 369 370 static PRRecvWait** _MW_LookupInternal(PRWaitGroup* group, PRFileDesc* fd) { 371 /* 372 ** Find the receive wait object corresponding to the file descriptor. 373 ** Only search the wait group specified. 374 */ 375 PRRecvWait** desc; 376 PRIntn rehash = _MW_REHASH_MAX; 377 _PRWaiterHash* hash = group->waiter; 378 PRUintn hidx = _MW_HASH(fd, hash->length); 379 PRUintn hoffset = 0; 380 381 while (rehash-- > 0) { 382 desc = (&hash->recv_wait) + hidx; 383 if ((*desc != NULL) && ((*desc)->fd == fd)) { 384 return desc; 385 } 386 if (0 == hoffset) { 387 hoffset = _MW_HASH2(fd, hash->length); 388 PR_ASSERT(0 != hoffset); 389 } 390 hidx = (hidx + hoffset) % (hash->length); 391 } 392 return NULL; 393 } /* _MW_LookupInternal */ 394 395 #ifndef WINNT 396 static PRStatus _MW_PollInternal(PRWaitGroup* group) { 397 PRRecvWait** waiter; 398 PRStatus rv = PR_FAILURE; 399 PRInt32 count, count_ready; 400 PRIntervalTime polling_interval; 401 402 group->poller = PR_GetCurrentThread(); 403 404 while (PR_TRUE) { 405 PRIntervalTime now, since_last_poll; 406 PRPollDesc* poll_list; 407 408 while (0 == group->waiter->count) { 409 PRStatus st; 410 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT); 411 if (_prmw_running != group->state) { 412 PR_SetError(PR_INVALID_STATE_ERROR, 0); 413 goto aborted; 414 } 415 if (_MW_ABORTED(st)) { 416 goto aborted; 417 } 418 } 419 420 /* 421 ** There's something to do. See if our existing polling list 422 ** is large enough for what we have to do? 423 */ 424 425 while (group->polling_count < group->waiter->count) { 426 PRUint32 old_count = group->waiter->count; 427 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE); 428 PRSize new_size = sizeof(PRPollDesc) * new_count; 429 PRPollDesc* old_polling_list = group->polling_list; 430 431 PR_Unlock(group->ml); 432 poll_list = (PRPollDesc*)PR_CALLOC(new_size); 433 if (NULL == poll_list) { 434 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 435 PR_Lock(group->ml); 436 goto failed_alloc; 437 } 438 if (NULL != old_polling_list) { 439 PR_DELETE(old_polling_list); 440 } 441 PR_Lock(group->ml); 442 if (_prmw_running != group->state) { 443 PR_DELETE(poll_list); 444 PR_SetError(PR_INVALID_STATE_ERROR, 0); 445 goto aborted; 446 } 447 group->polling_list = poll_list; 448 group->polling_count = new_count; 449 } 450 451 now = PR_IntervalNow(); 452 polling_interval = max_polling_interval; 453 since_last_poll = now - group->last_poll; 454 455 waiter = &group->waiter->recv_wait; 456 poll_list = group->polling_list; 457 for (count = 0; count < group->waiter->count; ++waiter) { 458 PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length); 459 if (NULL != *waiter) /* a live one! */ 460 { 461 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) && 462 (since_last_poll >= (*waiter)->timeout)) { 463 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT); 464 } else { 465 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) { 466 (*waiter)->timeout -= since_last_poll; 467 if ((*waiter)->timeout < polling_interval) { 468 polling_interval = (*waiter)->timeout; 469 } 470 } 471 PR_ASSERT(poll_list < group->polling_list + group->polling_count); 472 poll_list->fd = (*waiter)->fd; 473 poll_list->in_flags = PR_POLL_READ; 474 poll_list->out_flags = 0; 475 # if 0 476 printf( 477 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n", 478 poll_list, count, poll_list->fd, (*waiter)->timeout); 479 # endif 480 poll_list += 1; 481 count += 1; 482 } 483 } 484 } 485 486 PR_ASSERT(count == group->waiter->count); 487 488 /* 489 ** If there are no more threads waiting for completion, 490 ** we need to return. 491 */ 492 if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) && 493 (1 == group->waiting_threads)) { 494 break; 495 } 496 497 if (0 == count) { 498 continue; /* wait for new business */ 499 } 500 501 group->last_poll = now; 502 503 PR_Unlock(group->ml); 504 505 count_ready = PR_Poll(group->polling_list, count, polling_interval); 506 507 PR_Lock(group->ml); 508 509 if (_prmw_running != group->state) { 510 PR_SetError(PR_INVALID_STATE_ERROR, 0); 511 goto aborted; 512 } 513 if (-1 == count_ready) { 514 goto failed_poll; /* that's a shame */ 515 } else if (0 < count_ready) { 516 for (poll_list = group->polling_list; count > 0; poll_list++, count--) { 517 PR_ASSERT(poll_list < group->polling_list + group->polling_count); 518 if (poll_list->out_flags != 0) { 519 waiter = _MW_LookupInternal(group, poll_list->fd); 520 /* 521 ** If 'waiter' is NULL, that means the wait receive 522 ** descriptor has been canceled. 523 */ 524 if (NULL != waiter) { 525 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS); 526 } 527 } 528 } 529 } 530 /* 531 ** If there are no more threads waiting for completion, 532 ** we need to return. 533 ** This thread was "borrowed" to do the polling, but it really 534 ** belongs to the client. 535 */ 536 if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) && 537 (1 == group->waiting_threads)) { 538 break; 539 } 540 } 541 542 rv = PR_SUCCESS; 543 544 aborted: 545 failed_poll: 546 failed_alloc: 547 group->poller = NULL; /* we were that, not we ain't */ 548 if ((_prmw_running == group->state) && (group->waiting_threads > 1)) { 549 /* Wake up one thread to become the new poller. */ 550 PR_NotifyCondVar(group->io_complete); 551 } 552 return rv; /* we return with the lock held */ 553 } /* _MW_PollInternal */ 554 #endif /* !WINNT */ 555 556 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup* group) { 557 PRMWGroupState rv = group->state; 558 /* 559 ** Looking at the group's fields is safe because 560 ** once the group's state is no longer running, it 561 ** cannot revert and there is a safe check on entry 562 ** to make sure no more threads are made to wait. 563 */ 564 if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) { 565 rv = group->state = _prmw_stopped; 566 PR_NotifyCondVar(group->mw_manage); 567 } 568 return rv; 569 } /* MW_TestForShutdownInternal */ 570 571 #ifndef WINNT 572 static void _MW_InitialRecv(PRCList* io_ready) { 573 PRRecvWait* desc = (PRRecvWait*)io_ready; 574 if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) { 575 desc->bytesRecv = 0; 576 } else { 577 desc->bytesRecv = (desc->fd->methods->recv)( 578 desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout); 579 if (desc->bytesRecv < 0) { /* SetError should already be there */ 580 desc->outcome = PR_MW_FAILURE; 581 } 582 } 583 } /* _MW_InitialRecv */ 584 #endif 585 586 #ifdef WINNT 587 static void NT_TimeProc(void* arg) { 588 _MDOverlapped* overlapped = (_MDOverlapped*)arg; 589 PRRecvWait* desc = overlapped->data.mw.desc; 590 PRFileDesc* bottom; 591 592 if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_TIMEOUT, 593 (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) { 594 /* This wait recv descriptor has already completed. */ 595 return; 596 } 597 598 /* close the osfd to abort the outstanding async io request */ 599 /* $$$$ 600 ** Little late to be checking if NSPR's on the bottom of stack, 601 ** but if we don't check, we can't assert that the private data 602 ** is what we think it is. 603 ** $$$$ 604 */ 605 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); 606 PR_ASSERT(NULL != bottom); 607 if (NULL != bottom) /* now what!?!?! */ 608 { 609 bottom->secret->state = _PR_FILEDESC_CLOSED; 610 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { 611 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); 612 PR_NOT_REACHED("What shall I do?"); 613 } 614 } 615 return; 616 } /* NT_TimeProc */ 617 618 static PRStatus NT_HashRemove(PRWaitGroup* group, PRFileDesc* fd) { 619 PRRecvWait** waiter; 620 621 _PR_MD_LOCK(&group->mdlock); 622 waiter = _MW_LookupInternal(group, fd); 623 if (NULL != waiter) { 624 group->waiter->count -= 1; 625 *waiter = NULL; 626 } 627 _PR_MD_UNLOCK(&group->mdlock); 628 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE; 629 } 630 631 PRStatus NT_HashRemoveInternal(PRWaitGroup* group, PRFileDesc* fd) { 632 PRRecvWait** waiter; 633 634 waiter = _MW_LookupInternal(group, fd); 635 if (NULL != waiter) { 636 group->waiter->count -= 1; 637 *waiter = NULL; 638 } 639 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE; 640 } 641 #endif /* WINNT */ 642 643 /******************************************************************/ 644 /******************************************************************/ 645 /********************** The public API portion ********************/ 646 /******************************************************************/ 647 /******************************************************************/ 648 PR_IMPLEMENT(PRStatus) 649 PR_AddWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) { 650 _PR_HashStory hrv; 651 PRStatus rv = PR_FAILURE; 652 #ifdef WINNT 653 _MDOverlapped* overlapped; 654 HANDLE hFile; 655 BOOL bResult; 656 DWORD dwError; 657 PRFileDesc* bottom; 658 #endif 659 660 if (!_pr_initialized) { 661 _PR_ImplicitInitialization(); 662 } 663 if ((NULL == group) && (NULL == (group = MW_Init2()))) { 664 return rv; 665 } 666 667 PR_ASSERT(NULL != desc->fd); 668 669 desc->outcome = PR_MW_PENDING; /* nice, well known value */ 670 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */ 671 672 PR_Lock(group->ml); 673 674 if (_prmw_running != group->state) { 675 /* Not allowed to add after cancelling the group */ 676 desc->outcome = PR_MW_INTERRUPT; 677 PR_SetError(PR_INVALID_STATE_ERROR, 0); 678 PR_Unlock(group->ml); 679 return rv; 680 } 681 682 #ifdef WINNT 683 _PR_MD_LOCK(&group->mdlock); 684 #endif 685 686 /* 687 ** If the waiter count is zero at this point, there's no telling 688 ** how long we've been idle. Therefore, initialize the beginning 689 ** of the timing interval. As long as the list doesn't go empty, 690 ** it will maintain itself. 691 */ 692 if (0 == group->waiter->count) { 693 group->last_poll = PR_IntervalNow(); 694 } 695 696 do { 697 hrv = MW_AddHashInternal(desc, group->waiter); 698 if (_prmw_rehash != hrv) { 699 break; 700 } 701 hrv = MW_ExpandHashInternal(group); /* gruesome */ 702 if (_prmw_success != hrv) { 703 break; 704 } 705 } while (PR_TRUE); 706 707 #ifdef WINNT 708 _PR_MD_UNLOCK(&group->mdlock); 709 #endif 710 711 PR_NotifyCondVar(group->new_business); /* tell the world */ 712 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE; 713 PR_Unlock(group->ml); 714 715 #ifdef WINNT 716 overlapped = PR_NEWZAP(_MDOverlapped); 717 if (NULL == overlapped) { 718 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 719 NT_HashRemove(group, desc->fd); 720 return rv; 721 } 722 overlapped->ioModel = _MD_MultiWaitIO; 723 overlapped->data.mw.desc = desc; 724 overlapped->data.mw.group = group; 725 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) { 726 overlapped->data.mw.timer = 727 CreateTimer(desc->timeout, NT_TimeProc, overlapped); 728 if (0 == overlapped->data.mw.timer) { 729 NT_HashRemove(group, desc->fd); 730 PR_DELETE(overlapped); 731 /* 732 * XXX It appears that a maximum of 16 timer events can 733 * be outstanding. GetLastError() returns 0 when I try it. 734 */ 735 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError()); 736 return PR_FAILURE; 737 } 738 } 739 740 /* Reach to the bottom layer to get the OS fd */ 741 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); 742 PR_ASSERT(NULL != bottom); 743 if (NULL == bottom) { 744 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 745 return PR_FAILURE; 746 } 747 hFile = (HANDLE)bottom->secret->md.osfd; 748 if (!bottom->secret->md.io_model_committed) { 749 PRInt32 st; 750 st = _md_Associate(hFile); 751 PR_ASSERT(0 != st); 752 bottom->secret->md.io_model_committed = PR_TRUE; 753 } 754 bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length, 755 NULL, &overlapped->overlapped); 756 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) { 757 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) { 758 if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_FAILURE, 759 (LONG)PR_MW_PENDING) == 760 (LONG)PR_MW_PENDING) { 761 CancelTimer(overlapped->data.mw.timer); 762 } 763 NT_HashRemove(group, desc->fd); 764 PR_DELETE(overlapped); 765 } 766 _PR_MD_MAP_READ_ERROR(dwError); 767 rv = PR_FAILURE; 768 } 769 #endif 770 771 return rv; 772 } /* PR_AddWaitFileDesc */ 773 774 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup* group) { 775 PRCList* io_ready = NULL; 776 #ifdef WINNT 777 PRThread* me = _PR_MD_CURRENT_THREAD(); 778 _MDOverlapped* overlapped; 779 #endif 780 781 if (!_pr_initialized) { 782 _PR_ImplicitInitialization(); 783 } 784 if ((NULL == group) && (NULL == (group = MW_Init2()))) { 785 goto failed_init; 786 } 787 788 PR_Lock(group->ml); 789 790 if (_prmw_running != group->state) { 791 PR_SetError(PR_INVALID_STATE_ERROR, 0); 792 goto invalid_state; 793 } 794 795 group->waiting_threads += 1; /* the polling thread is counted */ 796 797 #ifdef WINNT 798 _PR_MD_LOCK(&group->mdlock); 799 while (PR_CLIST_IS_EMPTY(&group->io_ready)) { 800 _PR_THREAD_LOCK(me); 801 me->state = _PR_IO_WAIT; 802 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); 803 if (!_PR_IS_NATIVE_THREAD(me)) { 804 _PR_SLEEPQ_LOCK(me->cpu); 805 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); 806 _PR_SLEEPQ_UNLOCK(me->cpu); 807 } 808 _PR_THREAD_UNLOCK(me); 809 _PR_MD_UNLOCK(&group->mdlock); 810 PR_Unlock(group->ml); 811 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); 812 me->state = _PR_RUNNING; 813 PR_Lock(group->ml); 814 _PR_MD_LOCK(&group->mdlock); 815 if (_PR_PENDING_INTERRUPT(me)) { 816 PR_REMOVE_LINK(&me->waitQLinks); 817 _PR_MD_UNLOCK(&group->mdlock); 818 me->flags &= ~_PR_INTERRUPT; 819 me->io_suspended = PR_FALSE; 820 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); 821 goto aborted; 822 } 823 } 824 io_ready = PR_LIST_HEAD(&group->io_ready); 825 PR_ASSERT(io_ready != NULL); 826 PR_REMOVE_LINK(io_ready); 827 _PR_MD_UNLOCK(&group->mdlock); 828 overlapped = 829 (_MDOverlapped*)((char*)io_ready - offsetof(_MDOverlapped, data)); 830 io_ready = &overlapped->data.mw.desc->internal; 831 #else 832 do { 833 /* 834 ** If the I/O ready list isn't empty, have this thread 835 ** return with the first receive wait object that's available. 836 */ 837 if (PR_CLIST_IS_EMPTY(&group->io_ready)) { 838 /* 839 ** Is there a polling thread yet? If not, grab this thread 840 ** and use it. 841 */ 842 if (NULL == group->poller) { 843 /* 844 ** This thread will stay do polling until it becomes the only one 845 ** left to service a completion. Then it will return and there will 846 ** be none left to actually poll or to run completions. 847 ** 848 ** The polling function should only return w/ failure or 849 ** with some I/O ready. 850 */ 851 if (PR_FAILURE == _MW_PollInternal(group)) { 852 goto failed_poll; 853 } 854 } else { 855 /* 856 ** There are four reasons a thread can be awakened from 857 ** a wait on the io_complete condition variable. 858 ** 1. Some I/O has completed, i.e., the io_ready list 859 ** is nonempty. 860 ** 2. The wait group is canceled. 861 ** 3. The thread is interrupted. 862 ** 4. The current polling thread has to leave and needs 863 ** a replacement. 864 ** The logic to find a new polling thread is made more 865 ** complicated by all the other possible events. 866 ** I tried my best to write the logic clearly, but 867 ** it is still full of if's with continue and goto. 868 */ 869 PRStatus st; 870 do { 871 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT); 872 if (_prmw_running != group->state) { 873 PR_SetError(PR_INVALID_STATE_ERROR, 0); 874 goto aborted; 875 } 876 if (_MW_ABORTED(st) || (NULL == group->poller)) { 877 break; 878 } 879 } while (PR_CLIST_IS_EMPTY(&group->io_ready)); 880 881 /* 882 ** The thread is interrupted and has to leave. It might 883 ** have also been awakened to process ready i/o or be the 884 ** new poller. To be safe, if either condition is true, 885 ** we awaken another thread to take its place. 886 */ 887 if (_MW_ABORTED(st)) { 888 if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) && 889 group->waiting_threads > 1) { 890 PR_NotifyCondVar(group->io_complete); 891 } 892 goto aborted; 893 } 894 895 /* 896 ** A new poller is needed, but can I be the new poller? 897 ** If there is no i/o ready, sure. But if there is any 898 ** i/o ready, it has a higher priority. I want to 899 ** process the ready i/o first and wake up another 900 ** thread to be the new poller. 901 */ 902 if (NULL == group->poller) { 903 if (PR_CLIST_IS_EMPTY(&group->io_ready)) { 904 continue; 905 } 906 if (group->waiting_threads > 1) { 907 PR_NotifyCondVar(group->io_complete); 908 } 909 } 910 } 911 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready)); 912 } 913 io_ready = PR_LIST_HEAD(&group->io_ready); 914 PR_NotifyCondVar(group->io_taken); 915 PR_ASSERT(io_ready != NULL); 916 PR_REMOVE_LINK(io_ready); 917 } while (NULL == io_ready); 918 919 failed_poll: 920 921 #endif 922 923 aborted: 924 925 group->waiting_threads -= 1; 926 invalid_state: 927 (void)MW_TestForShutdownInternal(group); 928 PR_Unlock(group->ml); 929 930 failed_init: 931 if (NULL != io_ready) { 932 /* If the operation failed, record the reason why */ 933 switch (((PRRecvWait*)io_ready)->outcome) { 934 case PR_MW_PENDING: 935 PR_ASSERT(0); 936 break; 937 case PR_MW_SUCCESS: 938 #ifndef WINNT 939 _MW_InitialRecv(io_ready); 940 #endif 941 break; 942 #ifdef WINNT 943 case PR_MW_FAILURE: 944 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error); 945 break; 946 #endif 947 case PR_MW_TIMEOUT: 948 PR_SetError(PR_IO_TIMEOUT_ERROR, 0); 949 break; 950 case PR_MW_INTERRUPT: 951 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); 952 break; 953 default: 954 break; 955 } 956 #ifdef WINNT 957 if (NULL != overlapped->data.mw.timer) { 958 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout); 959 CancelTimer(overlapped->data.mw.timer); 960 } else { 961 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout); 962 } 963 PR_DELETE(overlapped); 964 #endif 965 } 966 return (PRRecvWait*)io_ready; 967 } /* PR_WaitRecvReady */ 968 969 PR_IMPLEMENT(PRStatus) 970 PR_CancelWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) { 971 #if !defined(WINNT) 972 PRRecvWait** recv_wait; 973 #endif 974 PRStatus rv = PR_SUCCESS; 975 if (NULL == group) { 976 group = mw_state->group; 977 } 978 PR_ASSERT(NULL != group); 979 if (NULL == group) { 980 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 981 return PR_FAILURE; 982 } 983 984 PR_Lock(group->ml); 985 986 if (_prmw_running != group->state) { 987 PR_SetError(PR_INVALID_STATE_ERROR, 0); 988 rv = PR_FAILURE; 989 goto unlock; 990 } 991 992 #ifdef WINNT 993 if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_INTERRUPT, 994 (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) { 995 PRFileDesc* bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); 996 PR_ASSERT(NULL != bottom); 997 if (NULL == bottom) { 998 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 999 goto unlock; 1000 } 1001 bottom->secret->state = _PR_FILEDESC_CLOSED; 1002 # if 0 1003 fprintf(stderr, "cancel wait recv: closing socket\n"); 1004 # endif 1005 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { 1006 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); 1007 exit(1); 1008 } 1009 } 1010 #else 1011 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) { 1012 /* it was in the wait table */ 1013 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT); 1014 goto unlock; 1015 } 1016 if (!PR_CLIST_IS_EMPTY(&group->io_ready)) { 1017 /* is it already complete? */ 1018 PRCList* head = PR_LIST_HEAD(&group->io_ready); 1019 do { 1020 PRRecvWait* done = (PRRecvWait*)head; 1021 if (done == desc) { 1022 goto unlock; 1023 } 1024 head = PR_NEXT_LINK(head); 1025 } while (head != &group->io_ready); 1026 } 1027 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1028 rv = PR_FAILURE; 1029 1030 #endif 1031 unlock: 1032 PR_Unlock(group->ml); 1033 return rv; 1034 } /* PR_CancelWaitFileDesc */ 1035 1036 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup* group) { 1037 PRRecvWait** desc; 1038 PRRecvWait* recv_wait = NULL; 1039 #ifdef WINNT 1040 _MDOverlapped* overlapped; 1041 PRRecvWait** end; 1042 PRThread* me = _PR_MD_CURRENT_THREAD(); 1043 #endif 1044 1045 if (NULL == group) { 1046 group = mw_state->group; 1047 } 1048 PR_ASSERT(NULL != group); 1049 if (NULL == group) { 1050 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1051 return NULL; 1052 } 1053 1054 PR_Lock(group->ml); 1055 if (_prmw_stopped != group->state) { 1056 if (_prmw_running == group->state) { 1057 group->state = _prmw_stopping; /* so nothing new comes in */ 1058 } 1059 if (0 == group->waiting_threads) { /* is there anybody else? */ 1060 group->state = _prmw_stopped; /* we can stop right now */ 1061 } else { 1062 PR_NotifyAllCondVar(group->new_business); 1063 PR_NotifyAllCondVar(group->io_complete); 1064 } 1065 while (_prmw_stopped != group->state) { 1066 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT); 1067 } 1068 } 1069 1070 #ifdef WINNT 1071 _PR_MD_LOCK(&group->mdlock); 1072 #endif 1073 /* make all the existing descriptors look done/interrupted */ 1074 #ifdef WINNT 1075 end = &group->waiter->recv_wait + group->waiter->length; 1076 for (desc = &group->waiter->recv_wait; desc < end; ++desc) { 1077 if (NULL != *desc) { 1078 if (InterlockedCompareExchange( 1079 (LONG*)&(*desc)->outcome, (LONG)PR_MW_INTERRUPT, 1080 (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) { 1081 PRFileDesc* bottom = 1082 PR_GetIdentitiesLayer((*desc)->fd, PR_NSPR_IO_LAYER); 1083 PR_ASSERT(NULL != bottom); 1084 if (NULL == bottom) { 1085 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1086 goto invalid_arg; 1087 } 1088 bottom->secret->state = _PR_FILEDESC_CLOSED; 1089 # if 0 1090 fprintf(stderr, "cancel wait group: closing socket\n"); 1091 # endif 1092 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) { 1093 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); 1094 exit(1); 1095 } 1096 } 1097 } 1098 } 1099 while (group->waiter->count > 0) { 1100 _PR_THREAD_LOCK(me); 1101 me->state = _PR_IO_WAIT; 1102 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); 1103 if (!_PR_IS_NATIVE_THREAD(me)) { 1104 _PR_SLEEPQ_LOCK(me->cpu); 1105 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); 1106 _PR_SLEEPQ_UNLOCK(me->cpu); 1107 } 1108 _PR_THREAD_UNLOCK(me); 1109 _PR_MD_UNLOCK(&group->mdlock); 1110 PR_Unlock(group->ml); 1111 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); 1112 me->state = _PR_RUNNING; 1113 PR_Lock(group->ml); 1114 _PR_MD_LOCK(&group->mdlock); 1115 } 1116 #else 1117 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) { 1118 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length); 1119 if (NULL != *desc) { 1120 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT); 1121 } 1122 } 1123 #endif 1124 1125 /* take first element of finished list and return it or NULL */ 1126 if (PR_CLIST_IS_EMPTY(&group->io_ready)) { 1127 PR_SetError(PR_GROUP_EMPTY_ERROR, 0); 1128 } else { 1129 PRCList* head = PR_LIST_HEAD(&group->io_ready); 1130 PR_REMOVE_AND_INIT_LINK(head); 1131 #ifdef WINNT 1132 overlapped = (_MDOverlapped*)((char*)head - offsetof(_MDOverlapped, data)); 1133 head = &overlapped->data.mw.desc->internal; 1134 if (NULL != overlapped->data.mw.timer) { 1135 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout); 1136 CancelTimer(overlapped->data.mw.timer); 1137 } else { 1138 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout); 1139 } 1140 PR_DELETE(overlapped); 1141 #endif 1142 recv_wait = (PRRecvWait*)head; 1143 } 1144 #ifdef WINNT 1145 invalid_arg: 1146 _PR_MD_UNLOCK(&group->mdlock); 1147 #endif 1148 PR_Unlock(group->ml); 1149 1150 return recv_wait; 1151 } /* PR_CancelWaitGroup */ 1152 1153 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) { 1154 PRWaitGroup* wg; 1155 1156 if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) { 1157 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 1158 goto failed; 1159 } 1160 /* the wait group itself */ 1161 wg->ml = PR_NewLock(); 1162 if (NULL == wg->ml) { 1163 goto failed_lock; 1164 } 1165 wg->io_taken = PR_NewCondVar(wg->ml); 1166 if (NULL == wg->io_taken) { 1167 goto failed_cvar0; 1168 } 1169 wg->io_complete = PR_NewCondVar(wg->ml); 1170 if (NULL == wg->io_complete) { 1171 goto failed_cvar1; 1172 } 1173 wg->new_business = PR_NewCondVar(wg->ml); 1174 if (NULL == wg->new_business) { 1175 goto failed_cvar2; 1176 } 1177 wg->mw_manage = PR_NewCondVar(wg->ml); 1178 if (NULL == wg->mw_manage) { 1179 goto failed_cvar3; 1180 } 1181 1182 PR_INIT_CLIST(&wg->group_link); 1183 PR_INIT_CLIST(&wg->io_ready); 1184 1185 /* the waiters sequence */ 1186 wg->waiter = (_PRWaiterHash*)PR_CALLOC( 1187 sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*))); 1188 if (NULL == wg->waiter) { 1189 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 1190 goto failed_waiter; 1191 } 1192 wg->waiter->count = 0; 1193 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH; 1194 1195 #ifdef WINNT 1196 _PR_MD_NEW_LOCK(&wg->mdlock); 1197 PR_INIT_CLIST(&wg->wait_list); 1198 #endif /* WINNT */ 1199 1200 PR_Lock(mw_lock); 1201 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list); 1202 PR_Unlock(mw_lock); 1203 return wg; 1204 1205 failed_waiter: 1206 PR_DestroyCondVar(wg->mw_manage); 1207 failed_cvar3: 1208 PR_DestroyCondVar(wg->new_business); 1209 failed_cvar2: 1210 PR_DestroyCondVar(wg->io_complete); 1211 failed_cvar1: 1212 PR_DestroyCondVar(wg->io_taken); 1213 failed_cvar0: 1214 PR_DestroyLock(wg->ml); 1215 failed_lock: 1216 PR_DELETE(wg); 1217 wg = NULL; 1218 1219 failed: 1220 return wg; 1221 } /* MW_CreateWaitGroup */ 1222 1223 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup* group) { 1224 PRStatus rv = PR_SUCCESS; 1225 if (NULL == group) { 1226 group = mw_state->group; 1227 } 1228 PR_ASSERT(NULL != group); 1229 if (NULL != group) { 1230 PR_Lock(group->ml); 1231 if ((group->waiting_threads == 0) && (group->waiter->count == 0) && 1232 PR_CLIST_IS_EMPTY(&group->io_ready)) { 1233 group->state = _prmw_stopped; 1234 } else { 1235 PR_SetError(PR_INVALID_STATE_ERROR, 0); 1236 rv = PR_FAILURE; 1237 } 1238 PR_Unlock(group->ml); 1239 if (PR_FAILURE == rv) { 1240 return rv; 1241 } 1242 1243 PR_Lock(mw_lock); 1244 PR_REMOVE_LINK(&group->group_link); 1245 PR_Unlock(mw_lock); 1246 1247 #ifdef WINNT 1248 /* 1249 * XXX make sure wait_list is empty and waiter is empty. 1250 * These must be checked while holding mdlock. 1251 */ 1252 _PR_MD_FREE_LOCK(&group->mdlock); 1253 #endif 1254 1255 PR_DELETE(group->waiter); 1256 PR_DELETE(group->polling_list); 1257 PR_DestroyCondVar(group->mw_manage); 1258 PR_DestroyCondVar(group->new_business); 1259 PR_DestroyCondVar(group->io_complete); 1260 PR_DestroyCondVar(group->io_taken); 1261 PR_DestroyLock(group->ml); 1262 if (group == mw_state->group) { 1263 mw_state->group = NULL; 1264 } 1265 PR_DELETE(group); 1266 } else { 1267 /* The default wait group is not created yet. */ 1268 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1269 rv = PR_FAILURE; 1270 } 1271 return rv; 1272 } /* PR_DestroyWaitGroup */ 1273 1274 /********************************************************************** 1275 *********************************************************************** 1276 ******************** Wait group enumerations ************************** 1277 *********************************************************************** 1278 **********************************************************************/ 1279 1280 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup* group) { 1281 PRMWaitEnumerator* enumerator = PR_NEWZAP(PRMWaitEnumerator); 1282 if (NULL == enumerator) { 1283 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 1284 } else { 1285 enumerator->group = group; 1286 enumerator->seal = _PR_ENUM_SEALED; 1287 } 1288 return enumerator; 1289 } /* PR_CreateMWaitEnumerator */ 1290 1291 PR_IMPLEMENT(PRStatus) 1292 PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) { 1293 PR_ASSERT(NULL != enumerator); 1294 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); 1295 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) { 1296 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1297 return PR_FAILURE; 1298 } 1299 enumerator->seal = _PR_ENUM_UNSEALED; 1300 PR_Free(enumerator); 1301 return PR_SUCCESS; 1302 } /* PR_DestroyMWaitEnumerator */ 1303 1304 PR_IMPLEMENT(PRRecvWait*) 1305 PR_EnumerateWaitGroup(PRMWaitEnumerator* enumerator, 1306 const PRRecvWait* previous) { 1307 PRRecvWait* result = NULL; 1308 1309 /* entry point sanity checking */ 1310 PR_ASSERT(NULL != enumerator); 1311 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); 1312 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) { 1313 goto bad_argument; 1314 } 1315 1316 /* beginning of enumeration */ 1317 if (NULL == previous) { 1318 if (NULL == enumerator->group) { 1319 enumerator->group = mw_state->group; 1320 if (NULL == enumerator->group) { 1321 PR_SetError(PR_GROUP_EMPTY_ERROR, 0); 1322 return NULL; 1323 } 1324 } 1325 enumerator->waiter = &enumerator->group->waiter->recv_wait; 1326 enumerator->p_timestamp = enumerator->group->p_timestamp; 1327 enumerator->thread = PR_GetCurrentThread(); 1328 enumerator->index = 0; 1329 } 1330 /* continuing an enumeration */ 1331 else { 1332 PRThread* me = PR_GetCurrentThread(); 1333 PR_ASSERT(me == enumerator->thread); 1334 if (me != enumerator->thread) { 1335 goto bad_argument; 1336 } 1337 1338 /* need to restart the enumeration */ 1339 if (enumerator->p_timestamp != enumerator->group->p_timestamp) { 1340 return PR_EnumerateWaitGroup(enumerator, NULL); 1341 } 1342 } 1343 1344 /* actually progress the enumeration */ 1345 #if defined(WINNT) 1346 _PR_MD_LOCK(&enumerator->group->mdlock); 1347 #else 1348 PR_Lock(enumerator->group->ml); 1349 #endif 1350 while (enumerator->index++ < enumerator->group->waiter->length) { 1351 if (NULL != (result = *(enumerator->waiter)++)) { 1352 break; 1353 } 1354 } 1355 #if defined(WINNT) 1356 _PR_MD_UNLOCK(&enumerator->group->mdlock); 1357 #else 1358 PR_Unlock(enumerator->group->ml); 1359 #endif 1360 1361 return result; /* what we live for */ 1362 1363 bad_argument: 1364 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1365 return NULL; /* probably ambiguous */ 1366 } /* PR_EnumerateWaitGroup */ 1367 1368 /* prmwait.c */