tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

prmwait.c (40481B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "primpl.h"
      7 #include "pprmwait.h"
      8 
      9 #define _MW_REHASH_MAX 11
     10 
     11 static PRLock* mw_lock = NULL;
     12 static _PRGlobalState* mw_state = NULL;
     13 
     14 static PRIntervalTime max_polling_interval;
     15 
     16 #ifdef WINNT
     17 
/*
** One pending timer event on the NT timer-manager queue.  Lifetime is
** governed by ref_count: CreateTimer hands out the object with a count
** of 2 (one reference for the manager thread, one for the creator).
*/
typedef struct TimerEvent {
  PRIntervalTime absolute; /* interval-clock time at which the timer fires */
  void (*func)(void*);     /* callback run by TimerManager (without tm_vars.ml held) */
  void* arg;               /* opaque argument passed to func */
  LONG ref_count;          /* 2 while both manager and creator hold it; see CancelTimer */
  PRCList links;           /* position in tm_vars.timer_queue, sorted by 'absolute' */
} TimerEvent;
     25 
     26 #  define TIMER_EVENT_PTR(_qp) \
     27    ((TimerEvent*)((char*)(_qp) - offsetof(TimerEvent, links)))
     28 
/*
** Shared state for the NT timer-manager thread.  All fields other than
** 'ml' itself are protected by 'ml'.
*/
struct {
  PRLock* ml;               /* guards the fields below and TimerEvent ref_counts */
  PRCondVar* new_timer;     /* signaled when a timer is queued (wakes the manager) */
  PRCondVar* cancel_timer;  /* broadcast when a fired timer's ref_count reaches 0 */
  PRThread* manager_thread; /* the thread running TimerManager */
  PRCList timer_queue;      /* TimerEvents ordered by ascending 'absolute' */
} tm_vars;
     36 
     37 static PRStatus TimerInit(void);
     38 static void TimerManager(void* arg);
     39 static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
     40                               void* arg);
     41 static PRBool CancelTimer(TimerEvent* timer);
     42 
/*
** Body of the dedicated timer-manager thread (never returns).
**
** Sleeps on tm_vars.new_timer until the queue is nonempty, then fires
** every timer whose 'absolute' time has passed.  The callback is invoked
** with tm_vars.ml dropped; the manager's reference is released afterwards
** and cancel_timer is broadcast so a concurrent CancelTimer can proceed.
*/
static void TimerManager(void* arg) {
  PRIntervalTime now;
  PRIntervalTime timeout;
  PRCList* head;
  TimerEvent* timer;

  PR_Lock(tm_vars.ml);
  while (1) {
    if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) {
      PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
    } else {
      now = PR_IntervalNow();
      head = PR_LIST_HEAD(&tm_vars.timer_queue);
      timer = TIMER_EVENT_PTR(head);
      /* Wrap-safe comparison: has the head timer expired? */
      if ((PRInt32)(now - timer->absolute) >= 0) {
        PR_REMOVE_LINK(head);
        /*
         * make its prev and next point to itself so that
         * it's obvious that it's not on the timer_queue.
         */
        PR_INIT_CLIST(head);
        PR_ASSERT(2 == timer->ref_count);
        /* Run the callback without holding the manager lock. */
        PR_Unlock(tm_vars.ml);
        timer->func(timer->arg);
        PR_Lock(tm_vars.ml);
        timer->ref_count -= 1;
        if (0 == timer->ref_count) {
          /* CancelTimer raced with the firing and is waiting to free it. */
          PR_NotifyAllCondVar(tm_vars.cancel_timer);
        }
      } else {
        /* Head not due yet: sleep until it is, or a new timer arrives. */
        timeout = (PRIntervalTime)(timer->absolute - now);
        PR_WaitCondVar(tm_vars.new_timer, timeout);
      }
    }
  }
  /* Unreachable: the loop above never exits. */
  PR_Unlock(tm_vars.ml);
}
     80 
/*
** Allocate a timer that will invoke func(arg) roughly 'timeout' from now,
** and insert it into the manager's queue, which is kept sorted by
** ascending expiry time.  The new timer starts with ref_count == 2 (one
** reference for the manager thread, one for the caller, who must release
** it via CancelTimer).  Returns NULL with PR_OUT_OF_MEMORY_ERROR on
** allocation failure.
*/
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
                               void* arg) {
  TimerEvent* timer;
  PRCList *links, *tail;
  TimerEvent* elem;

  timer = PR_NEW(TimerEvent);
  if (NULL == timer) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    return timer;
  }
  timer->absolute = PR_IntervalNow() + timeout;
  timer->func = func;
  timer->arg = arg;
  timer->ref_count = 2;
  PR_Lock(tm_vars.ml);
  /*
  ** Walk the circular queue backwards from the tail until we find an
  ** element that expires no later than the new timer; the loop stops at
  ** the list sentinel (whose 'prev' is the tail) after a full wrap.
  */
  tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
  while (links->prev != tail) {
    elem = TIMER_EVENT_PTR(links);
    if ((PRInt32)(timer->absolute - elem->absolute) >= 0) {
      break;
    }
    links = links->prev;
  }
  PR_INSERT_AFTER(&timer->links, links);
  /* The head may have changed; let the manager re-evaluate its sleep. */
  PR_NotifyCondVar(tm_vars.new_timer);
  PR_Unlock(tm_vars.ml);
  return timer;
}
    110 
/*
** Release the caller's reference to 'timer' and free it.  If the timer is
** no longer linked into the queue (its links point at itself), it has
** fired or is firing: wait on cancel_timer until the manager thread drops
** its reference before freeing.  Otherwise unlink it so it never fires.
** Returns PR_TRUE iff the timer was canceled before its callback ran.
*/
static PRBool CancelTimer(TimerEvent* timer) {
  PRBool canceled = PR_FALSE;

  PR_Lock(tm_vars.ml);
  timer->ref_count -= 1;
  if (timer->links.prev == &timer->links) {
    /* Already dequeued by TimerManager; wait out the in-flight callback. */
    while (timer->ref_count == 1) {
      PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
    }
  } else {
    PR_REMOVE_LINK(&timer->links);
    canceled = PR_TRUE;
  }
  PR_Unlock(tm_vars.ml);
  PR_DELETE(timer);
  return canceled;
}
    128 
    129 static PRStatus TimerInit(void) {
    130  tm_vars.ml = PR_NewLock();
    131  if (NULL == tm_vars.ml) {
    132    goto failed;
    133  }
    134  tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
    135  if (NULL == tm_vars.new_timer) {
    136    goto failed;
    137  }
    138  tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
    139  if (NULL == tm_vars.cancel_timer) {
    140    goto failed;
    141  }
    142  PR_INIT_CLIST(&tm_vars.timer_queue);
    143  tm_vars.manager_thread =
    144      PR_CreateThread(PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
    145                      PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
    146  if (NULL == tm_vars.manager_thread) {
    147    goto failed;
    148  }
    149  return PR_SUCCESS;
    150 
    151 failed:
    152  if (NULL != tm_vars.cancel_timer) {
    153    PR_DestroyCondVar(tm_vars.cancel_timer);
    154  }
    155  if (NULL != tm_vars.new_timer) {
    156    PR_DestroyCondVar(tm_vars.new_timer);
    157  }
    158  if (NULL != tm_vars.ml) {
    159    PR_DestroyLock(tm_vars.ml);
    160  }
    161  return PR_FAILURE;
    162 }
    163 
    164 #endif /* WINNT */
    165 
    166 /******************************************************************/
    167 /******************************************************************/
    168 /************************ The private portion *********************/
    169 /******************************************************************/
    170 /******************************************************************/
/*
** Module initialization for the multiwait facility: create the module
** lock and the global state record, and cache the maximum poll interval.
*/
void _PR_InitMW(void) {
#ifdef WINNT
  /*
   * We use NT 4's InterlockedCompareExchange() to operate
   * on PRMWStatus variables.
   */
  PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
  /* NOTE(review): TimerInit()'s PRStatus result is ignored; if it fails
   * the timer machinery is unusable -- confirm that is acceptable here. */
  TimerInit();
#endif
  mw_lock = PR_NewLock();
  PR_ASSERT(NULL != mw_lock);
  mw_state = PR_NEWZAP(_PRGlobalState);
  PR_ASSERT(NULL != mw_state);
  PR_INIT_CLIST(&mw_state->group_list);
  max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
} /* _PR_InitMW */
    187 
/*
** Module teardown: destroy the module lock, the implicit default wait
** group (if one was ever created by MW_Init2), and the global state.
** NOTE(review): mw_lock is destroyed and NULLed *before*
** PR_DestroyWaitGroup() runs; presumably that function tolerates a NULL
** mw_lock -- verify, since its body is not visible in this file chunk.
*/
void _PR_CleanupMW(void) {
  PR_DestroyLock(mw_lock);
  mw_lock = NULL;
  if (mw_state->group) {
    PR_DestroyWaitGroup(mw_state->group);
    /* mw_state->group is set to NULL as a side effect. */
  }
  PR_DELETE(mw_state);
} /* _PR_CleanupMW */
    197 
    198 static PRWaitGroup* MW_Init2(void) {
    199  PRWaitGroup* group = mw_state->group; /* it's the null group */
    200  if (NULL == group)                    /* there is this special case */
    201  {
    202    group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
    203    if (NULL == group) {
    204      goto failed_alloc;
    205    }
    206    PR_Lock(mw_lock);
    207    if (NULL == mw_state->group) {
    208      mw_state->group = group;
    209      group = NULL;
    210    }
    211    PR_Unlock(mw_lock);
    212    if (group != NULL) {
    213      (void)PR_DestroyWaitGroup(group);
    214    }
    215    group = mw_state->group; /* somebody beat us to it */
    216  }
    217 failed_alloc:
    218  return group; /* whatever */
    219 } /* MW_Init2 */
    220 
    221 static _PR_HashStory MW_AddHashInternal(PRRecvWait* desc, _PRWaiterHash* hash) {
    222  /*
    223  ** The entries are put in the table using the fd (PRFileDesc*) of
    224  ** the receive descriptor as the key. This allows us to locate
    225  ** the appropriate entry aqain when the poll operation finishes.
    226  **
    227  ** The pointer to the file descriptor object is first divided by
    228  ** the natural alignment of a pointer in the belief that object
    229  ** will have at least that many zeros in the low order bits.
    230  ** This may not be a good assuption.
    231  **
    232  ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
    233  ** that we declare defeat and force the table to be reconstructed.
    234  ** Since some fds might be added more than once, won't that cause
    235  ** collisions even in an empty table?
    236  */
    237  PRIntn rehash = _MW_REHASH_MAX;
    238  PRRecvWait** waiter;
    239  PRUintn hidx = _MW_HASH(desc->fd, hash->length);
    240  PRUintn hoffset = 0;
    241 
    242  while (rehash-- > 0) {
    243    waiter = &hash->recv_wait;
    244    if (NULL == waiter[hidx]) {
    245      waiter[hidx] = desc;
    246      hash->count += 1;
    247 #if 0
    248            printf("Adding 0x%x->0x%x ", desc, desc->fd);
    249            printf(
    250                "table[%u:%u:*%u]: 0x%x->0x%x\n",
    251                hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
    252 #endif
    253      return _prmw_success;
    254    }
    255    if (desc == waiter[hidx]) {
    256      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
    257      return _prmw_error;
    258    }
    259 #if 0
    260        printf("Failing 0x%x->0x%x ", desc, desc->fd);
    261        printf(
    262            "table[*%u:%u:%u]: 0x%x->0x%x\n",
    263            hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
    264 #endif
    265    if (0 == hoffset) {
    266      hoffset = _MW_HASH2(desc->fd, hash->length);
    267      PR_ASSERT(0 != hoffset);
    268    }
    269    hidx = (hidx + hoffset) % (hash->length);
    270  }
    271  return _prmw_rehash;
    272 } /* MW_AddHashInternal */
    273 
/*
** Grow the group's waiter hash table to the next prime size and re-insert
** every live entry.  If some entry still cannot be placed in the larger
** table (MW_AddHashInternal reports _prmw_rehash), the candidate table is
** discarded and the next prime is tried.  Returns _prmw_success, or
** _prmw_error with PR_OUT_OF_MEMORY_ERROR when allocation fails or the
** largest size is already in use.
*/
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup* group) {
  PRRecvWait** desc;
  PRUint32 pidx, length;
  _PRWaiterHash *newHash, *oldHash = group->waiter;
  PRBool retry;
  _PR_HashStory hrv;

  /* Admissible table sizes; the current length must be one of these. */
  static const PRInt32 prime_number[] = {_PR_DEFAULT_HASH_LENGTH,
                                         179,
                                         521,
                                         907,
                                         1427,
                                         2711,
                                         3917,
                                         5021,
                                         8219,
                                         11549,
                                         18911,
                                         26711,
                                         33749,
                                         44771};
  PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));

  /* look up the next size we'd like to use for the hash table */
  for (pidx = 0; pidx < primes; ++pidx) {
    if (prime_number[pidx] == oldHash->length) {
      break;
    }
  }
  /* table size must be one of the prime numbers */
  PR_ASSERT(pidx < primes);

  /* if pidx == primes - 1, we can't expand the table any more */
  while (pidx < primes - 1) {
    /* next size */
    ++pidx;
    length = prime_number[pidx];

    /* allocate the new hash table and fill it in with the old */
    newHash = (_PRWaiterHash*)PR_CALLOC(sizeof(_PRWaiterHash) +
                                        (length * sizeof(PRRecvWait*)));
    if (NULL == newHash) {
      PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
      return _prmw_error;
    }

    newHash->length = length;
    retry = PR_FALSE;
    /* Walk the old table; stop once every live entry has been copied. */
    for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) {
      PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
      if (NULL != *desc) {
        hrv = MW_AddHashInternal(*desc, newHash);
        PR_ASSERT(_prmw_error != hrv);
        if (_prmw_success != hrv) {
          PR_DELETE(newHash);
          retry = PR_TRUE;
          break;
        }
      }
    }
    if (retry) {
      continue;
    }

    PR_DELETE(group->waiter);
    group->waiter = newHash;
    /* presumably invalidates positions cached elsewhere -- the users of
     * p_timestamp are not visible in this chunk */
    group->p_timestamp += 1;
    return _prmw_success;
  }

  PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  return _prmw_error; /* we're hosed */
} /* MW_ExpandHashInternal */
    347 
    348 #ifndef WINNT
/*
** Mark the wait referenced by the table slot 'waiter' as finished with
** the given outcome: move it onto the group's io_ready list, wake one
** waiting thread, and clear the hash slot.  Caller holds group->ml.
*/
static void _MW_DoneInternal(PRWaitGroup* group, PRRecvWait** waiter,
                             PRMWStatus outcome) {
  /*
  ** Add this receive wait object to the list of finished I/O
  ** operations for this particular group. If there are other
  ** threads waiting on the group, notify one. If not, arrange
  ** for this thread to return.
  */

#  if 0
   printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
#  endif
  (*waiter)->outcome = outcome;
  PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
  PR_NotifyCondVar(group->io_complete);
  PR_ASSERT(0 != group->waiter->count);
  group->waiter->count -= 1;
  *waiter = NULL; /* free the hash slot */
} /* _MW_DoneInternal */
    368 #endif /* WINNT */
    369 
    370 static PRRecvWait** _MW_LookupInternal(PRWaitGroup* group, PRFileDesc* fd) {
    371  /*
    372  ** Find the receive wait object corresponding to the file descriptor.
    373  ** Only search the wait group specified.
    374  */
    375  PRRecvWait** desc;
    376  PRIntn rehash = _MW_REHASH_MAX;
    377  _PRWaiterHash* hash = group->waiter;
    378  PRUintn hidx = _MW_HASH(fd, hash->length);
    379  PRUintn hoffset = 0;
    380 
    381  while (rehash-- > 0) {
    382    desc = (&hash->recv_wait) + hidx;
    383    if ((*desc != NULL) && ((*desc)->fd == fd)) {
    384      return desc;
    385    }
    386    if (0 == hoffset) {
    387      hoffset = _MW_HASH2(fd, hash->length);
    388      PR_ASSERT(0 != hoffset);
    389    }
    390    hidx = (hidx + hoffset) % (hash->length);
    391  }
    392  return NULL;
    393 } /* _MW_LookupInternal */
    394 
    395 #ifndef WINNT
/*
** Run the group's polling loop on the calling thread.  Called (and
** returns) with group->ml held.  The thread registers itself as
** group->poller, then repeatedly: waits for business, sizes the
** PRPollDesc list, expires timed-out waits, polls the rest, and moves
** completed waits onto group->io_ready via _MW_DoneInternal.  It breaks
** out with PR_SUCCESS when it is the last waiting thread and io_ready is
** nonempty (so it must go service a completion itself); PR_FAILURE on
** group abort, poll failure, or allocation failure.
*/
static PRStatus _MW_PollInternal(PRWaitGroup* group) {
  PRRecvWait** waiter;
  PRStatus rv = PR_FAILURE;
  PRInt32 count, count_ready;
  PRIntervalTime polling_interval;

  group->poller = PR_GetCurrentThread();

  while (PR_TRUE) {
    PRIntervalTime now, since_last_poll;
    PRPollDesc* poll_list;

    /* Sleep until at least one descriptor is registered. */
    while (0 == group->waiter->count) {
      PRStatus st;
      st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
      if (_prmw_running != group->state) {
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        goto aborted;
      }
      if (_MW_ABORTED(st)) {
        goto aborted;
      }
    }

    /*
    ** There's something to do. See if our existing polling list
    ** is large enough for what we have to do?
    */

    while (group->polling_count < group->waiter->count) {
      PRUint32 old_count = group->waiter->count;
      PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
      PRSize new_size = sizeof(PRPollDesc) * new_count;
      PRPollDesc* old_polling_list = group->polling_list;

      /* Allocate with the lock dropped; re-validate state afterwards. */
      PR_Unlock(group->ml);
      poll_list = (PRPollDesc*)PR_CALLOC(new_size);
      if (NULL == poll_list) {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        PR_Lock(group->ml);
        goto failed_alloc;
      }
      if (NULL != old_polling_list) {
        PR_DELETE(old_polling_list);
      }
      PR_Lock(group->ml);
      if (_prmw_running != group->state) {
        PR_DELETE(poll_list);
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        goto aborted;
      }
      group->polling_list = poll_list;
      group->polling_count = new_count;
    }

    now = PR_IntervalNow();
    polling_interval = max_polling_interval;
    since_last_poll = now - group->last_poll;

    /*
    ** Scan the hash table: retire waits whose timeout has elapsed and
    ** build the poll list from the survivors.  polling_interval shrinks
    ** to the smallest remaining timeout.  Note that _MW_DoneInternal
    ** decrements group->waiter->count, so the loop bound is live.
    */
    waiter = &group->waiter->recv_wait;
    poll_list = group->polling_list;
    for (count = 0; count < group->waiter->count; ++waiter) {
      PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length);
      if (NULL != *waiter) /* a live one! */
      {
        if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) &&
            (since_last_poll >= (*waiter)->timeout)) {
          _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
        } else {
          if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) {
            /* Charge the elapsed time against this wait's budget. */
            (*waiter)->timeout -= since_last_poll;
            if ((*waiter)->timeout < polling_interval) {
              polling_interval = (*waiter)->timeout;
            }
          }
          PR_ASSERT(poll_list < group->polling_list + group->polling_count);
          poll_list->fd = (*waiter)->fd;
          poll_list->in_flags = PR_POLL_READ;
          poll_list->out_flags = 0;
#  if 0
                    printf(
                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
                        poll_list, count, poll_list->fd, (*waiter)->timeout);
#  endif
          poll_list += 1;
          count += 1;
        }
      }
    }

    PR_ASSERT(count == group->waiter->count);

    /*
    ** If there are no more threads waiting for completion,
    ** we need to return.
    */
    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
        (1 == group->waiting_threads)) {
      break;
    }

    if (0 == count) {
      continue; /* wait for new business */
    }

    group->last_poll = now;

    /* Poll with the lock dropped; anything can change meanwhile. */
    PR_Unlock(group->ml);

    count_ready = PR_Poll(group->polling_list, count, polling_interval);

    PR_Lock(group->ml);

    if (_prmw_running != group->state) {
      PR_SetError(PR_INVALID_STATE_ERROR, 0);
      goto aborted;
    }
    if (-1 == count_ready) {
      goto failed_poll; /* that's a shame */
    } else if (0 < count_ready) {
      for (poll_list = group->polling_list; count > 0; poll_list++, count--) {
        PR_ASSERT(poll_list < group->polling_list + group->polling_count);
        if (poll_list->out_flags != 0) {
          waiter = _MW_LookupInternal(group, poll_list->fd);
          /*
          ** If 'waiter' is NULL, that means the wait receive
          ** descriptor has been canceled.
          */
          if (NULL != waiter) {
            _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
          }
        }
      }
    }
    /*
    ** If there are no more threads waiting for completion,
    ** we need to return.
    ** This thread was "borrowed" to do the polling, but it really
    ** belongs to the client.
    */
    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
        (1 == group->waiting_threads)) {
      break;
    }
  }

  rv = PR_SUCCESS;

aborted:
failed_poll:
failed_alloc:
  group->poller = NULL; /* we were the poller; now we ain't */
  if ((_prmw_running == group->state) && (group->waiting_threads > 1)) {
    /* Wake up one thread to become the new poller. */
    PR_NotifyCondVar(group->io_complete);
  }
  return rv; /* we return with the lock held */
} /* _MW_PollInternal */
    554 #endif /* !WINNT */
    555 
    556 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup* group) {
    557  PRMWGroupState rv = group->state;
    558  /*
    559  ** Looking at the group's fields is safe because
    560  ** once the group's state is no longer running, it
    561  ** cannot revert and there is a safe check on entry
    562  ** to make sure no more threads are made to wait.
    563  */
    564  if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) {
    565    rv = group->state = _prmw_stopped;
    566    PR_NotifyCondVar(group->mw_manage);
    567  }
    568  return rv;
    569 } /* MW_TestForShutdownInternal */
    570 
    571 #ifndef WINNT
    572 static void _MW_InitialRecv(PRCList* io_ready) {
    573  PRRecvWait* desc = (PRRecvWait*)io_ready;
    574  if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) {
    575    desc->bytesRecv = 0;
    576  } else {
    577    desc->bytesRecv = (desc->fd->methods->recv)(
    578        desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout);
    579    if (desc->bytesRecv < 0) { /* SetError should already be there */
    580      desc->outcome = PR_MW_FAILURE;
    581    }
    582  }
    583 } /* _MW_InitialRecv */
    584 #endif
    585 
    586 #ifdef WINNT
/*
** Timer callback (runs on the timer-manager thread) that fires when a
** multiwait recv times out.  Atomically flips the descriptor's outcome
** from PR_MW_PENDING to PR_MW_TIMEOUT; if some other path already
** resolved the wait, does nothing.  Otherwise closes the underlying
** socket to abort the outstanding async I/O request.
*/
static void NT_TimeProc(void* arg) {
  _MDOverlapped* overlapped = (_MDOverlapped*)arg;
  PRRecvWait* desc = overlapped->data.mw.desc;
  PRFileDesc* bottom;

  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_TIMEOUT,
                                 (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) {
    /* This wait recv descriptor has already completed. */
    return;
  }

  /* close the osfd to abort the outstanding async io request */
  /* $$$$
  ** Little late to be checking if NSPR's on the bottom of stack,
  ** but if we don't check, we can't assert that the private data
  ** is what we think it is.
  ** $$$$
  */
  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
  PR_ASSERT(NULL != bottom);
  if (NULL != bottom) /* now what!?!?! */
  {
    bottom->secret->state = _PR_FILEDESC_CLOSED;
    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
      PR_NOT_REACHED("What shall I do?");
    }
  }
  return;
} /* NT_TimeProc */
    617 
    618 static PRStatus NT_HashRemove(PRWaitGroup* group, PRFileDesc* fd) {
    619  PRRecvWait** waiter;
    620 
    621  _PR_MD_LOCK(&group->mdlock);
    622  waiter = _MW_LookupInternal(group, fd);
    623  if (NULL != waiter) {
    624    group->waiter->count -= 1;
    625    *waiter = NULL;
    626  }
    627  _PR_MD_UNLOCK(&group->mdlock);
    628  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
    629 }
    630 
    631 PRStatus NT_HashRemoveInternal(PRWaitGroup* group, PRFileDesc* fd) {
    632  PRRecvWait** waiter;
    633 
    634  waiter = _MW_LookupInternal(group, fd);
    635  if (NULL != waiter) {
    636    group->waiter->count -= 1;
    637    *waiter = NULL;
    638  }
    639  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
    640 }
    641 #endif /* WINNT */
    642 
    643 /******************************************************************/
    644 /******************************************************************/
    645 /********************** The public API portion ********************/
    646 /******************************************************************/
    647 /******************************************************************/
/*
** Register 'desc' with the wait group (the module's default group when
** 'group' is NULL): mark it pending, insert it into the group's waiter
** hash (expanding the table as needed), and wake the poller.  On WINNT,
** additionally create a cancellation timer (if the wait has a timeout)
** and issue the overlapped ReadFile that drives completion.
** Returns PR_SUCCESS if the descriptor was registered (and, on NT, the
** I/O was issued or is pending), PR_FAILURE otherwise.
*/
PR_IMPLEMENT(PRStatus)
PR_AddWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
  _PR_HashStory hrv;
  PRStatus rv = PR_FAILURE;
#ifdef WINNT
  _MDOverlapped* overlapped;
  HANDLE hFile;
  BOOL bResult;
  DWORD dwError;
  PRFileDesc* bottom;
#endif

  if (!_pr_initialized) {
    _PR_ImplicitInitialization();
  }
  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
    return rv;
  }

  PR_ASSERT(NULL != desc->fd);

  desc->outcome = PR_MW_PENDING; /* nice, well known value */
  desc->bytesRecv = 0;           /* likewise, though this value is ambiguous */

  PR_Lock(group->ml);

  if (_prmw_running != group->state) {
    /* Not allowed to add after cancelling the group */
    desc->outcome = PR_MW_INTERRUPT;
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    PR_Unlock(group->ml);
    return rv;
  }

#ifdef WINNT
  _PR_MD_LOCK(&group->mdlock);
#endif

  /*
  ** If the waiter count is zero at this point, there's no telling
  ** how long we've been idle. Therefore, initialize the beginning
  ** of the timing interval. As long as the list doesn't go empty,
  ** it will maintain itself.
  */
  if (0 == group->waiter->count) {
    group->last_poll = PR_IntervalNow();
  }

  /* Insert, growing the hash table whenever it reports overcrowding. */
  do {
    hrv = MW_AddHashInternal(desc, group->waiter);
    if (_prmw_rehash != hrv) {
      break;
    }
    hrv = MW_ExpandHashInternal(group); /* gruesome */
    if (_prmw_success != hrv) {
      break;
    }
  } while (PR_TRUE);

#ifdef WINNT
  _PR_MD_UNLOCK(&group->mdlock);
#endif

  PR_NotifyCondVar(group->new_business); /* tell the world */
  rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
  PR_Unlock(group->ml);

#ifdef WINNT
  overlapped = PR_NEWZAP(_MDOverlapped);
  if (NULL == overlapped) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    NT_HashRemove(group, desc->fd);
    return rv;
  }
  overlapped->ioModel = _MD_MultiWaitIO;
  overlapped->data.mw.desc = desc;
  overlapped->data.mw.group = group;
  if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
    /* NT_TimeProc will abort the I/O if the timer fires first. */
    overlapped->data.mw.timer =
        CreateTimer(desc->timeout, NT_TimeProc, overlapped);
    if (0 == overlapped->data.mw.timer) {
      NT_HashRemove(group, desc->fd);
      PR_DELETE(overlapped);
      /*
       * XXX It appears that a maximum of 16 timer events can
       * be outstanding. GetLastError() returns 0 when I try it.
       */
      PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
      return PR_FAILURE;
    }
  }

  /* Reach to the bottom layer to get the OS fd */
  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
  PR_ASSERT(NULL != bottom);
  if (NULL == bottom) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return PR_FAILURE;
  }
  hFile = (HANDLE)bottom->secret->md.osfd;
  if (!bottom->secret->md.io_model_committed) {
    PRInt32 st;
    st = _md_Associate(hFile);
    PR_ASSERT(0 != st);
    bottom->secret->md.io_model_committed = PR_TRUE;
  }
  bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length,
                     NULL, &overlapped->overlapped);
  if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) {
    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
      /* Only the thread that wins the outcome CAS may cancel the timer. */
      if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_FAILURE,
                                     (LONG)PR_MW_PENDING) ==
          (LONG)PR_MW_PENDING) {
        CancelTimer(overlapped->data.mw.timer);
      }
      NT_HashRemove(group, desc->fd);
      PR_DELETE(overlapped);
    }
    /* NOTE(review): in the no-timeout case 'overlapped' and the hash
     * entry are NOT cleaned up on this failure path -- looks like a
     * leak; confirm against upstream NSPR before changing. */
    _PR_MD_MAP_READ_ERROR(dwError);
    rv = PR_FAILURE;
  }
#endif

  return rv;
} /* PR_AddWaitFileDesc */
    773 
    774 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup* group) {
    775  PRCList* io_ready = NULL;
    776 #ifdef WINNT
    777  PRThread* me = _PR_MD_CURRENT_THREAD();
    778  _MDOverlapped* overlapped;
    779 #endif
    780 
    781  if (!_pr_initialized) {
    782    _PR_ImplicitInitialization();
    783  }
    784  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
    785    goto failed_init;
    786  }
    787 
    788  PR_Lock(group->ml);
    789 
    790  if (_prmw_running != group->state) {
    791    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    792    goto invalid_state;
    793  }
    794 
    795  group->waiting_threads += 1; /* the polling thread is counted */
    796 
    797 #ifdef WINNT
    798  _PR_MD_LOCK(&group->mdlock);
    799  while (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    800    _PR_THREAD_LOCK(me);
    801    me->state = _PR_IO_WAIT;
    802    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
    803    if (!_PR_IS_NATIVE_THREAD(me)) {
    804      _PR_SLEEPQ_LOCK(me->cpu);
    805      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
    806      _PR_SLEEPQ_UNLOCK(me->cpu);
    807    }
    808    _PR_THREAD_UNLOCK(me);
    809    _PR_MD_UNLOCK(&group->mdlock);
    810    PR_Unlock(group->ml);
    811    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
    812    me->state = _PR_RUNNING;
    813    PR_Lock(group->ml);
    814    _PR_MD_LOCK(&group->mdlock);
    815    if (_PR_PENDING_INTERRUPT(me)) {
    816      PR_REMOVE_LINK(&me->waitQLinks);
    817      _PR_MD_UNLOCK(&group->mdlock);
    818      me->flags &= ~_PR_INTERRUPT;
    819      me->io_suspended = PR_FALSE;
    820      PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
    821      goto aborted;
    822    }
    823  }
    824  io_ready = PR_LIST_HEAD(&group->io_ready);
    825  PR_ASSERT(io_ready != NULL);
    826  PR_REMOVE_LINK(io_ready);
    827  _PR_MD_UNLOCK(&group->mdlock);
    828  overlapped =
    829      (_MDOverlapped*)((char*)io_ready - offsetof(_MDOverlapped, data));
    830  io_ready = &overlapped->data.mw.desc->internal;
    831 #else
    832  do {
    833    /*
    834    ** If the I/O ready list isn't empty, have this thread
    835    ** return with the first receive wait object that's available.
    836    */
    837    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    838      /*
    839      ** Is there a polling thread yet? If not, grab this thread
    840      ** and use it.
    841      */
    842      if (NULL == group->poller) {
    843        /*
    844        ** This thread will stay do polling until it becomes the only one
    845        ** left to service a completion. Then it will return and there will
    846        ** be none left to actually poll or to run completions.
    847        **
    848        ** The polling function should only return w/ failure or
    849        ** with some I/O ready.
    850        */
    851        if (PR_FAILURE == _MW_PollInternal(group)) {
    852          goto failed_poll;
    853        }
    854      } else {
    855        /*
    856        ** There are four reasons a thread can be awakened from
    857        ** a wait on the io_complete condition variable.
    858        ** 1. Some I/O has completed, i.e., the io_ready list
    859        **    is nonempty.
    860        ** 2. The wait group is canceled.
    861        ** 3. The thread is interrupted.
    862        ** 4. The current polling thread has to leave and needs
    863        **    a replacement.
    864        ** The logic to find a new polling thread is made more
    865        ** complicated by all the other possible events.
    866        ** I tried my best to write the logic clearly, but
    867        ** it is still full of if's with continue and goto.
    868        */
    869        PRStatus st;
    870        do {
    871          st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
    872          if (_prmw_running != group->state) {
    873            PR_SetError(PR_INVALID_STATE_ERROR, 0);
    874            goto aborted;
    875          }
    876          if (_MW_ABORTED(st) || (NULL == group->poller)) {
    877            break;
    878          }
    879        } while (PR_CLIST_IS_EMPTY(&group->io_ready));
    880 
    881        /*
    882        ** The thread is interrupted and has to leave.  It might
    883        ** have also been awakened to process ready i/o or be the
    884        ** new poller.  To be safe, if either condition is true,
    885        ** we awaken another thread to take its place.
    886        */
    887        if (_MW_ABORTED(st)) {
    888          if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) &&
    889              group->waiting_threads > 1) {
    890            PR_NotifyCondVar(group->io_complete);
    891          }
    892          goto aborted;
    893        }
    894 
    895        /*
    896        ** A new poller is needed, but can I be the new poller?
    897        ** If there is no i/o ready, sure.  But if there is any
    898        ** i/o ready, it has a higher priority.  I want to
    899        ** process the ready i/o first and wake up another
    900        ** thread to be the new poller.
    901        */
    902        if (NULL == group->poller) {
    903          if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    904            continue;
    905          }
    906          if (group->waiting_threads > 1) {
    907            PR_NotifyCondVar(group->io_complete);
    908          }
    909        }
    910      }
    911      PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
    912    }
    913    io_ready = PR_LIST_HEAD(&group->io_ready);
    914    PR_NotifyCondVar(group->io_taken);
    915    PR_ASSERT(io_ready != NULL);
    916    PR_REMOVE_LINK(io_ready);
    917  } while (NULL == io_ready);
    918 
    919 failed_poll:
    920 
    921 #endif
    922 
    923 aborted:
    924 
    925  group->waiting_threads -= 1;
    926 invalid_state:
    927  (void)MW_TestForShutdownInternal(group);
    928  PR_Unlock(group->ml);
    929 
    930 failed_init:
    931  if (NULL != io_ready) {
    932    /* If the operation failed, record the reason why */
    933    switch (((PRRecvWait*)io_ready)->outcome) {
    934      case PR_MW_PENDING:
    935        PR_ASSERT(0);
    936        break;
    937      case PR_MW_SUCCESS:
    938 #ifndef WINNT
    939        _MW_InitialRecv(io_ready);
    940 #endif
    941        break;
    942 #ifdef WINNT
    943      case PR_MW_FAILURE:
    944        _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
    945        break;
    946 #endif
    947      case PR_MW_TIMEOUT:
    948        PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
    949        break;
    950      case PR_MW_INTERRUPT:
    951        PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
    952        break;
    953      default:
    954        break;
    955    }
    956 #ifdef WINNT
    957    if (NULL != overlapped->data.mw.timer) {
    958      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
    959      CancelTimer(overlapped->data.mw.timer);
    960    } else {
    961      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
    962    }
    963    PR_DELETE(overlapped);
    964 #endif
    965  }
    966  return (PRRecvWait*)io_ready;
    967 } /* PR_WaitRecvReady */
    968 
    969 PR_IMPLEMENT(PRStatus)
    970 PR_CancelWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
    971 #if !defined(WINNT)
    972  PRRecvWait** recv_wait;
    973 #endif
    974  PRStatus rv = PR_SUCCESS;
    975  if (NULL == group) {
    976    group = mw_state->group;
    977  }
    978  PR_ASSERT(NULL != group);
    979  if (NULL == group) {
    980    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    981    return PR_FAILURE;
    982  }
    983 
    984  PR_Lock(group->ml);
    985 
    986  if (_prmw_running != group->state) {
    987    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    988    rv = PR_FAILURE;
    989    goto unlock;
    990  }
    991 
    992 #ifdef WINNT
    993  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_INTERRUPT,
    994                                 (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
    995    PRFileDesc* bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
    996    PR_ASSERT(NULL != bottom);
    997    if (NULL == bottom) {
    998      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    999      goto unlock;
   1000    }
   1001    bottom->secret->state = _PR_FILEDESC_CLOSED;
   1002 #  if 0
   1003        fprintf(stderr, "cancel wait recv: closing socket\n");
   1004 #  endif
   1005    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
   1006      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
   1007      exit(1);
   1008    }
   1009  }
   1010 #else
   1011  if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) {
   1012    /* it was in the wait table */
   1013    _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
   1014    goto unlock;
   1015  }
   1016  if (!PR_CLIST_IS_EMPTY(&group->io_ready)) {
   1017    /* is it already complete? */
   1018    PRCList* head = PR_LIST_HEAD(&group->io_ready);
   1019    do {
   1020      PRRecvWait* done = (PRRecvWait*)head;
   1021      if (done == desc) {
   1022        goto unlock;
   1023      }
   1024      head = PR_NEXT_LINK(head);
   1025    } while (head != &group->io_ready);
   1026  }
   1027  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   1028  rv = PR_FAILURE;
   1029 
   1030 #endif
   1031 unlock:
   1032  PR_Unlock(group->ml);
   1033  return rv;
   1034 } /* PR_CancelWaitFileDesc */
   1035 
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup* group) {
  /*
  ** Shut down a wait group: stop accepting new business, mark every
  ** outstanding receive wait as interrupted, wait for in-flight waiters
  ** to drain, and return the first completed descriptor from the ready
  ** list (or NULL with PR_GROUP_EMPTY_ERROR when none remain).
  ** A NULL group designates the default (global) group.
  ** NOTE(review): mw_state is dereferenced without a NULL check — assumes
  ** the multiwait state was initialized earlier; confirm against callers.
  */
  PRRecvWait** desc;
  PRRecvWait* recv_wait = NULL;
#ifdef WINNT
  _MDOverlapped* overlapped;
  PRRecvWait** end;
  PRThread* me = _PR_MD_CURRENT_THREAD();
#endif

  if (NULL == group) {
    group = mw_state->group; /* fall back to the default group */
  }
  PR_ASSERT(NULL != group);
  if (NULL == group) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return NULL;
  }

  PR_Lock(group->ml);
  if (_prmw_stopped != group->state) {
    if (_prmw_running == group->state) {
      group->state = _prmw_stopping; /* so nothing new comes in */
    }
    if (0 == group->waiting_threads) { /* is there anybody else? */
      group->state = _prmw_stopped;    /* we can stop right now */
    } else {
      /* wake every waiter so they can observe the state change and leave */
      PR_NotifyAllCondVar(group->new_business);
      PR_NotifyAllCondVar(group->io_complete);
    }
    /* the last departing waiter flips the state and signals mw_manage */
    while (_prmw_stopped != group->state) {
      (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
    }
  }

#ifdef WINNT
  _PR_MD_LOCK(&group->mdlock);
#endif
  /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
  /*
  ** Scan the whole waiter hash.  For each still-pending entry, atomically
  ** claim it as interrupted and close the underlying socket so the
  ** overlapped I/O completes and releases its waiter.
  */
  end = &group->waiter->recv_wait + group->waiter->length;
  for (desc = &group->waiter->recv_wait; desc < end; ++desc) {
    if (NULL != *desc) {
      if (InterlockedCompareExchange(
              (LONG*)&(*desc)->outcome, (LONG)PR_MW_INTERRUPT,
              (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
        PRFileDesc* bottom =
            PR_GetIdentitiesLayer((*desc)->fd, PR_NSPR_IO_LAYER);
        PR_ASSERT(NULL != bottom);
        if (NULL == bottom) {
          PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
          goto invalid_arg;
        }
        bottom->secret->state = _PR_FILEDESC_CLOSED;
#  if 0
                fprintf(stderr, "cancel wait group: closing socket\n");
#  endif
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
          fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
          exit(1);
        }
      }
    }
  }
  /*
  ** Block this thread (on the group's wait_list) until all claimed
  ** operations have completed and the waiter count drops to zero.
  ** Both locks are dropped around the MD wait and re-taken after.
  */
  while (group->waiter->count > 0) {
    _PR_THREAD_LOCK(me);
    me->state = _PR_IO_WAIT;
    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
    if (!_PR_IS_NATIVE_THREAD(me)) {
      _PR_SLEEPQ_LOCK(me->cpu);
      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
      _PR_SLEEPQ_UNLOCK(me->cpu);
    }
    _PR_THREAD_UNLOCK(me);
    _PR_MD_UNLOCK(&group->mdlock);
    PR_Unlock(group->ml);
    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
    me->state = _PR_RUNNING;
    PR_Lock(group->ml);
    _PR_MD_LOCK(&group->mdlock);
  }
#else
  /* complete every remaining entry as interrupted; count reaches zero */
  for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) {
    PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
    if (NULL != *desc) {
      _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
    }
  }
#endif

  /* take first element of finished list and return it or NULL */
  if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
  } else {
    PRCList* head = PR_LIST_HEAD(&group->io_ready);
    PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
    /* recover the overlapped wrapper, release its timer, and map the
    ** list node back to the caller's PRRecvWait */
    overlapped = (_MDOverlapped*)((char*)head - offsetof(_MDOverlapped, data));
    head = &overlapped->data.mw.desc->internal;
    if (NULL != overlapped->data.mw.timer) {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
      CancelTimer(overlapped->data.mw.timer);
    } else {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
    }
    PR_DELETE(overlapped);
#endif
    recv_wait = (PRRecvWait*)head;
  }
#ifdef WINNT
invalid_arg:
  _PR_MD_UNLOCK(&group->mdlock);
#endif
  PR_Unlock(group->ml);

  return recv_wait;
} /* PR_CancelWaitGroup */
   1152 
   1153 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) {
   1154  PRWaitGroup* wg;
   1155 
   1156  if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) {
   1157    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   1158    goto failed;
   1159  }
   1160  /* the wait group itself */
   1161  wg->ml = PR_NewLock();
   1162  if (NULL == wg->ml) {
   1163    goto failed_lock;
   1164  }
   1165  wg->io_taken = PR_NewCondVar(wg->ml);
   1166  if (NULL == wg->io_taken) {
   1167    goto failed_cvar0;
   1168  }
   1169  wg->io_complete = PR_NewCondVar(wg->ml);
   1170  if (NULL == wg->io_complete) {
   1171    goto failed_cvar1;
   1172  }
   1173  wg->new_business = PR_NewCondVar(wg->ml);
   1174  if (NULL == wg->new_business) {
   1175    goto failed_cvar2;
   1176  }
   1177  wg->mw_manage = PR_NewCondVar(wg->ml);
   1178  if (NULL == wg->mw_manage) {
   1179    goto failed_cvar3;
   1180  }
   1181 
   1182  PR_INIT_CLIST(&wg->group_link);
   1183  PR_INIT_CLIST(&wg->io_ready);
   1184 
   1185  /* the waiters sequence */
   1186  wg->waiter = (_PRWaiterHash*)PR_CALLOC(
   1187      sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
   1188  if (NULL == wg->waiter) {
   1189    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   1190    goto failed_waiter;
   1191  }
   1192  wg->waiter->count = 0;
   1193  wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
   1194 
   1195 #ifdef WINNT
   1196  _PR_MD_NEW_LOCK(&wg->mdlock);
   1197  PR_INIT_CLIST(&wg->wait_list);
   1198 #endif /* WINNT */
   1199 
   1200  PR_Lock(mw_lock);
   1201  PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
   1202  PR_Unlock(mw_lock);
   1203  return wg;
   1204 
   1205 failed_waiter:
   1206  PR_DestroyCondVar(wg->mw_manage);
   1207 failed_cvar3:
   1208  PR_DestroyCondVar(wg->new_business);
   1209 failed_cvar2:
   1210  PR_DestroyCondVar(wg->io_complete);
   1211 failed_cvar1:
   1212  PR_DestroyCondVar(wg->io_taken);
   1213 failed_cvar0:
   1214  PR_DestroyLock(wg->ml);
   1215 failed_lock:
   1216  PR_DELETE(wg);
   1217  wg = NULL;
   1218 
   1219 failed:
   1220  return wg;
   1221 } /* MW_CreateWaitGroup */
   1222 
   1223 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup* group) {
   1224  PRStatus rv = PR_SUCCESS;
   1225  if (NULL == group) {
   1226    group = mw_state->group;
   1227  }
   1228  PR_ASSERT(NULL != group);
   1229  if (NULL != group) {
   1230    PR_Lock(group->ml);
   1231    if ((group->waiting_threads == 0) && (group->waiter->count == 0) &&
   1232        PR_CLIST_IS_EMPTY(&group->io_ready)) {
   1233      group->state = _prmw_stopped;
   1234    } else {
   1235      PR_SetError(PR_INVALID_STATE_ERROR, 0);
   1236      rv = PR_FAILURE;
   1237    }
   1238    PR_Unlock(group->ml);
   1239    if (PR_FAILURE == rv) {
   1240      return rv;
   1241    }
   1242 
   1243    PR_Lock(mw_lock);
   1244    PR_REMOVE_LINK(&group->group_link);
   1245    PR_Unlock(mw_lock);
   1246 
   1247 #ifdef WINNT
   1248    /*
   1249     * XXX make sure wait_list is empty and waiter is empty.
   1250     * These must be checked while holding mdlock.
   1251     */
   1252    _PR_MD_FREE_LOCK(&group->mdlock);
   1253 #endif
   1254 
   1255    PR_DELETE(group->waiter);
   1256    PR_DELETE(group->polling_list);
   1257    PR_DestroyCondVar(group->mw_manage);
   1258    PR_DestroyCondVar(group->new_business);
   1259    PR_DestroyCondVar(group->io_complete);
   1260    PR_DestroyCondVar(group->io_taken);
   1261    PR_DestroyLock(group->ml);
   1262    if (group == mw_state->group) {
   1263      mw_state->group = NULL;
   1264    }
   1265    PR_DELETE(group);
   1266  } else {
   1267    /* The default wait group is not created yet. */
   1268    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   1269    rv = PR_FAILURE;
   1270  }
   1271  return rv;
   1272 } /* PR_DestroyWaitGroup */
   1273 
   1274 /**********************************************************************
   1275 ***********************************************************************
   1276 ******************** Wait group enumerations **************************
   1277 ***********************************************************************
   1278 **********************************************************************/
   1279 
   1280 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup* group) {
   1281  PRMWaitEnumerator* enumerator = PR_NEWZAP(PRMWaitEnumerator);
   1282  if (NULL == enumerator) {
   1283    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   1284  } else {
   1285    enumerator->group = group;
   1286    enumerator->seal = _PR_ENUM_SEALED;
   1287  }
   1288  return enumerator;
   1289 } /* PR_CreateMWaitEnumerator */
   1290 
   1291 PR_IMPLEMENT(PRStatus)
   1292 PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) {
   1293  PR_ASSERT(NULL != enumerator);
   1294  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
   1295  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
   1296    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   1297    return PR_FAILURE;
   1298  }
   1299  enumerator->seal = _PR_ENUM_UNSEALED;
   1300  PR_Free(enumerator);
   1301  return PR_SUCCESS;
   1302 } /* PR_DestroyMWaitEnumerator */
   1303 
PR_IMPLEMENT(PRRecvWait*)
PR_EnumerateWaitGroup(PRMWaitEnumerator* enumerator,
                      const PRRecvWait* previous) {
  /*
  ** Return the next registered receive-wait descriptor in the group's
  ** waiter hash, or NULL when the table is exhausted or an argument is
  ** invalid.  Pass previous == NULL to (re)start the enumeration; pass
  ** the last returned descriptor to continue.  The enumeration must be
  ** continued by the same thread that started it.
  */
  PRRecvWait* result = NULL;

  /* entry point sanity checking */
  PR_ASSERT(NULL != enumerator);
  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
    goto bad_argument;
  }

  /* beginning of enumeration */
  if (NULL == previous) {
    if (NULL == enumerator->group) {
      /* NULL group at creation time means the default group; it must
      ** exist by the time enumeration begins */
      enumerator->group = mw_state->group;
      if (NULL == enumerator->group) {
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
        return NULL;
      }
    }
    /* snapshot the cursor, the table's timestamp, and the owning thread */
    enumerator->waiter = &enumerator->group->waiter->recv_wait;
    enumerator->p_timestamp = enumerator->group->p_timestamp;
    enumerator->thread = PR_GetCurrentThread();
    enumerator->index = 0;
  }
  /* continuing an enumeration */
  else {
    PRThread* me = PR_GetCurrentThread();
    PR_ASSERT(me == enumerator->thread);
    if (me != enumerator->thread) {
      goto bad_argument;
    }

    /* need to restart the enumeration: the table changed (timestamp
    ** mismatch) since we last looked, so our cursor may be stale */
    if (enumerator->p_timestamp != enumerator->group->p_timestamp) {
      return PR_EnumerateWaitGroup(enumerator, NULL);
    }
  }

  /* actually progress the enumeration */
#if defined(WINNT)
  _PR_MD_LOCK(&enumerator->group->mdlock);
#else
  PR_Lock(enumerator->group->ml);
#endif
  /* advance the cursor past empty slots to the next occupied one */
  while (enumerator->index++ < enumerator->group->waiter->length) {
    if (NULL != (result = *(enumerator->waiter)++)) {
      break;
    }
  }
#if defined(WINNT)
  _PR_MD_UNLOCK(&enumerator->group->mdlock);
#else
  PR_Unlock(enumerator->group->ml);
#endif

  return result; /* what we live for */

bad_argument:
  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  return NULL; /* probably ambiguous */
} /* PR_EnumerateWaitGroup */
   1367 
   1368 /* prmwait.c */