tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

multiwait.c (21115B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 #include "prio.h"
      7 #include "prprf.h"
      8 #include "prlog.h"
      9 #include "prmem.h"
     10 #include "pratom.h"
     11 #include "prlock.h"
     12 #include "prmwait.h"
     13 #include "prclist.h"
     14 #include "prerror.h"
     15 #include "prinrval.h"
     16 #include "prnetdb.h"
     17 #include "prthread.h"
     18 
     19 #include "plstr.h"
     20 #include "plerror.h"
     21 #include "plgetopt.h"
     22 
     23 #include <string.h>
     24 
/*
 * Per-test state handed to each test function and to the threads it spawns.
 * Built by MakeShared() and released by DestroyShared().
 */
typedef struct Shared {
  const char* title;      /* test name; used as the prefix of log lines */
  PRLock* list_lock;      /* created and destroyed with the struct; never
                           * acquired anywhere in this file */
  PRWaitGroup* group;     /* wait group the test adds its descriptors to */
  PRIntervalTime timeout; /* descriptor timeout and sleep interval; starts
                           * at one second (see MakeShared) */
} Shared;
     31 
/*
 * Logging levels in increasing order of chattiness.  The code relies on the
 * declaration order: levels are compared with < and > and are stepped up or
 * down by one through ChangeVerbosity().
 */
typedef enum Verbosity { silent, quiet, chatty, noisy } Verbosity;
     33 
/*
 * Offsets appended textually to the default port number below: +100 in
 * DEBUG builds and +200 when IS_64 is defined, nothing otherwise.
 * NOTE(review): presumably this keeps differently-built copies of the test
 * from colliding on the same port when run side by side -- confirm.
 */
#ifdef DEBUG
#  define PORT_INC_DO +100
#else
#  define PORT_INC_DO
#endif
#ifdef IS_64
#  define PORT_INC_3264 +200
#else
#  define PORT_INC_3264
#endif

/* Log stream; main() points it at standard error unless verbosity is
 * silent, in which case it stays NULL. */
static PRFileDesc* debug = NULL;
/* Count of live descriptors: bumped by CreateRecvWait(), dropped by
 * DestroyRecvWait(); asserted to be zero after each test. */
static PRInt32 desc_allocated = 0;
/* TCP port used by the client/server test; overridable with -p. */
static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264;
/* Current logging level; adjusted with -d / -q. */
static enum Verbosity verbosity = quiet;
/* ops_required: operations each test must complete (-o).
 * ops_done: progress counter, reset to 0 by RunThisOne() before each test. */
static PRInt32 ops_required = 1000, ops_done = 0;
/* Scope for every thread this program creates; -G selects global threads. */
static PRThreadScope thread_scope = PR_LOCAL_THREAD;
/* Sizing knobs: client threads (-c), worker/waiting threads (-t) and wait
 * descriptors per test (-w). */
static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
     52 
/*
 * MW_ASSERT(expr): test-local assertion.
 *
 * In DEBUG builds a false expression is routed to _MW_Assert(), which first
 * prints the current NSPR error to the debug stream (when one is open) and
 * then calls PR_Assert() with the stringified expression, file and line.
 * In other builds the macro expands to nothing, so the expression is not
 * evaluated at all -- do not put required side effects inside it.
 */
#if defined(DEBUG)
#  define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__))
static void _MW_Assert(const char* s, const char* file, PRIntn ln) {
  if (NULL != debug) {
    PL_FPrintError(debug, NULL);
  }
  PR_Assert(s, file, ln);
} /* _MW_Assert */
#else
#  define MW_ASSERT(_expr)
#endif
     65 
     66 static void PrintRecvDesc(PRRecvWait* desc, const char* msg) {
     67  const char* tag[] = {"PR_MW_INTERRUPT", "PR_MW_TIMEOUT", "PR_MW_FAILURE",
     68                       "PR_MW_SUCCESS", "PR_MW_PENDING"};
     69  PR_fprintf(debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
     70             msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
     71 } /* PrintRecvDesc */
     72 
     73 static Shared* MakeShared(const char* title) {
     74  Shared* shared = PR_NEWZAP(Shared);
     75  shared->group = PR_CreateWaitGroup(1);
     76  shared->timeout = PR_SecondsToInterval(1);
     77  shared->list_lock = PR_NewLock();
     78  shared->title = title;
     79  return shared;
     80 } /* MakeShared */
     81 
     82 static void DestroyShared(Shared* shared) {
     83  PRStatus rv;
     84  if (verbosity > quiet) {
     85    PR_fprintf(debug, "%s: destroying group\n", shared->title);
     86  }
     87  rv = PR_DestroyWaitGroup(shared->group);
     88  MW_ASSERT(PR_SUCCESS == rv);
     89  PR_DestroyLock(shared->list_lock);
     90  PR_DELETE(shared);
     91 } /* DestroyShared */
     92 
     93 static PRRecvWait* CreateRecvWait(PRFileDesc* fd, PRIntervalTime timeout) {
     94  PRRecvWait* desc_out = PR_NEWZAP(PRRecvWait);
     95  MW_ASSERT(NULL != desc_out);
     96 
     97  MW_ASSERT(NULL != fd);
     98  desc_out->fd = fd;
     99  desc_out->timeout = timeout;
    100  desc_out->buffer.length = 120;
    101  desc_out->buffer.start = PR_CALLOC(120);
    102 
    103  PR_AtomicIncrement(&desc_allocated);
    104 
    105  if (verbosity > chatty) {
    106    PrintRecvDesc(desc_out, "Allocated");
    107  }
    108  return desc_out;
    109 } /* CreateRecvWait */
    110 
    111 static void DestroyRecvWait(PRRecvWait* desc_out) {
    112  if (verbosity > chatty) {
    113    PrintRecvDesc(desc_out, "Destroying");
    114  }
    115  PR_Close(desc_out->fd);
    116  if (NULL != desc_out->buffer.start) {
    117    PR_DELETE(desc_out->buffer.start);
    118  }
    119  PR_Free(desc_out);
    120  (void)PR_AtomicDecrement(&desc_allocated);
    121 } /* DestroyRecvWait */
    122 
    123 static void CancelGroup(Shared* shared) {
    124  PRRecvWait* desc_out;
    125 
    126  if (verbosity > quiet) {
    127    PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
    128  }
    129 
    130  do {
    131    desc_out = PR_CancelWaitGroup(shared->group);
    132    if (NULL != desc_out) {
    133      DestroyRecvWait(desc_out);
    134    }
    135  } while (NULL != desc_out);
    136 
    137  MW_ASSERT(0 == desc_allocated);
    138  MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
    139 } /* CancelGroup */
    140 
    141 static void PR_CALLBACK ClientThread(void* arg) {
    142  PRStatus rv;
    143  PRInt32 bytes;
    144  PRIntn empty_flags = 0;
    145  PRNetAddr server_address;
    146  unsigned char buffer[100];
    147  Shared* shared = (Shared*)arg;
    148  PRFileDesc* server = PR_NewTCPSocket();
    149  if ((NULL == server) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
    150    return;
    151  }
    152  MW_ASSERT(NULL != server);
    153 
    154  if (verbosity > chatty) {
    155    PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
    156  }
    157 
    158  /* Initialize the buffer so that Purify won't complain */
    159  memset(buffer, 0, sizeof(buffer));
    160 
    161  rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
    162  MW_ASSERT(PR_SUCCESS == rv);
    163 
    164  if (verbosity > quiet) {
    165    PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
    166  }
    167  rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
    168 
    169  if (PR_FAILURE == rv) {
    170    if (verbosity > silent) {
    171      PL_FPrintError(debug, "Client connect failed");
    172    }
    173    return;
    174  }
    175 
    176  while (ops_done < ops_required) {
    177    bytes = PR_Send(server, buffer, sizeof(buffer), empty_flags,
    178                    PR_INTERVAL_NO_TIMEOUT);
    179    if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
    180      break;
    181    }
    182    MW_ASSERT(sizeof(buffer) == bytes);
    183    if (verbosity > chatty)
    184      PR_fprintf(debug, "%s: Client sent %d bytes\n", shared->title,
    185                 sizeof(buffer));
    186    bytes = PR_Recv(server, buffer, sizeof(buffer), empty_flags,
    187                    PR_INTERVAL_NO_TIMEOUT);
    188    if (verbosity > chatty)
    189      PR_fprintf(debug, "%s: Client received %d bytes\n", shared->title,
    190                 sizeof(buffer));
    191    if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
    192      break;
    193    }
    194    MW_ASSERT(sizeof(buffer) == bytes);
    195    PR_Sleep(shared->timeout);
    196  }
    197  rv = PR_Close(server);
    198  MW_ASSERT(PR_SUCCESS == rv);
    199 
    200 } /* ClientThread */
    201 
    202 static void OneInThenCancelled(Shared* shared) {
    203  PRStatus rv;
    204  PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
    205 
    206  shared->timeout = PR_INTERVAL_NO_TIMEOUT;
    207 
    208  desc_in->fd = PR_NewTCPSocket();
    209  desc_in->timeout = shared->timeout;
    210 
    211  if (verbosity > chatty) {
    212    PrintRecvDesc(desc_in, "Adding desc");
    213  }
    214 
    215  rv = PR_AddWaitFileDesc(shared->group, desc_in);
    216  MW_ASSERT(PR_SUCCESS == rv);
    217 
    218  if (verbosity > chatty) {
    219    PrintRecvDesc(desc_in, "Cancelling");
    220  }
    221  rv = PR_CancelWaitFileDesc(shared->group, desc_in);
    222  MW_ASSERT(PR_SUCCESS == rv);
    223 
    224  desc_out = PR_WaitRecvReady(shared->group);
    225  MW_ASSERT(desc_out == desc_in);
    226  MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
    227  MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    228  if (verbosity > chatty) {
    229    PrintRecvDesc(desc_out, "Ready");
    230  }
    231 
    232  rv = PR_Close(desc_in->fd);
    233  MW_ASSERT(PR_SUCCESS == rv);
    234 
    235  if (verbosity > quiet) {
    236    PR_fprintf(debug, "%s: destroying group\n", shared->title);
    237  }
    238 
    239  PR_DELETE(desc_in);
    240 } /* OneInThenCancelled */
    241 
    242 static void OneOpOneThread(Shared* shared) {
    243  PRStatus rv;
    244  PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
    245 
    246  desc_in->fd = PR_NewTCPSocket();
    247  desc_in->timeout = shared->timeout;
    248 
    249  if (verbosity > chatty) {
    250    PrintRecvDesc(desc_in, "Adding desc");
    251  }
    252 
    253  rv = PR_AddWaitFileDesc(shared->group, desc_in);
    254  MW_ASSERT(PR_SUCCESS == rv);
    255  desc_out = PR_WaitRecvReady(shared->group);
    256  MW_ASSERT(desc_out == desc_in);
    257  MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    258  MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    259  if (verbosity > chatty) {
    260    PrintRecvDesc(desc_out, "Ready");
    261  }
    262 
    263  rv = PR_Close(desc_in->fd);
    264  MW_ASSERT(PR_SUCCESS == rv);
    265 
    266  PR_DELETE(desc_in);
    267 } /* OneOpOneThread */
    268 
    269 static void ManyOpOneThread(Shared* shared) {
    270  PRStatus rv;
    271  PRIntn index;
    272  PRRecvWait* desc_in;
    273  PRRecvWait* desc_out;
    274 
    275  if (verbosity > quiet) {
    276    PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
    277  }
    278 
    279  for (index = 0; index < wait_objects; ++index) {
    280    desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
    281 
    282    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    283    MW_ASSERT(PR_SUCCESS == rv);
    284  }
    285 
    286  while (ops_done < ops_required) {
    287    desc_out = PR_WaitRecvReady(shared->group);
    288    MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    289    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    290    if (verbosity > chatty) {
    291      PrintRecvDesc(desc_out, "Ready/readding");
    292    }
    293    rv = PR_AddWaitFileDesc(shared->group, desc_out);
    294    MW_ASSERT(PR_SUCCESS == rv);
    295    (void)PR_AtomicIncrement(&ops_done);
    296  }
    297 
    298  CancelGroup(shared);
    299 } /* ManyOpOneThread */
    300 
    301 static void PR_CALLBACK SomeOpsThread(void* arg) {
    302  PRRecvWait* desc_out;
    303  PRStatus rv = PR_SUCCESS;
    304  Shared* shared = (Shared*)arg;
    305  do /* until interrupted */
    306  {
    307    desc_out = PR_WaitRecvReady(shared->group);
    308    if (NULL == desc_out) {
    309      MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    310      if (verbosity > quiet) {
    311        PR_fprintf(debug, "Aborted\n");
    312      }
    313      break;
    314    }
    315    MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    316    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    317    if (verbosity > chatty) {
    318      PrintRecvDesc(desc_out, "Ready");
    319    }
    320 
    321    if (verbosity > chatty) {
    322      PrintRecvDesc(desc_out, "Re-Adding");
    323    }
    324    desc_out->timeout = shared->timeout;
    325    rv = PR_AddWaitFileDesc(shared->group, desc_out);
    326    PR_AtomicIncrement(&ops_done);
    327    if (ops_done > ops_required) {
    328      break;
    329    }
    330  } while (PR_SUCCESS == rv);
    331  MW_ASSERT(PR_SUCCESS == rv);
    332 } /* SomeOpsThread */
    333 
    334 static void SomeOpsSomeThreads(Shared* shared) {
    335  PRStatus rv;
    336  PRThread** thread;
    337  PRIntn index;
    338  PRRecvWait* desc_in;
    339 
    340  thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
    341 
    342  /* Create some threads */
    343 
    344  if (verbosity > quiet) {
    345    PR_fprintf(debug, "%s: creating threads\n", shared->title);
    346  }
    347  for (index = 0; index < worker_threads; ++index) {
    348    thread[index] =
    349        PR_CreateThread(PR_USER_THREAD, SomeOpsThread, shared, PR_PRIORITY_HIGH,
    350                        thread_scope, PR_JOINABLE_THREAD, 16 * 1024);
    351  }
    352 
    353  /* then create some operations */
    354  if (verbosity > quiet) {
    355    PR_fprintf(debug, "%s: creating desc\n", shared->title);
    356  }
    357  for (index = 0; index < wait_objects; ++index) {
    358    desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
    359    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    360    MW_ASSERT(PR_SUCCESS == rv);
    361  }
    362 
    363  if (verbosity > quiet) {
    364    PR_fprintf(debug, "%s: sleeping\n", shared->title);
    365  }
    366  while (ops_done < ops_required) {
    367    PR_Sleep(shared->timeout);
    368  }
    369 
    370  if (verbosity > quiet) {
    371    PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
    372  }
    373  for (index = 0; index < worker_threads; ++index) {
    374    rv = PR_Interrupt(thread[index]);
    375    MW_ASSERT(PR_SUCCESS == rv);
    376    rv = PR_JoinThread(thread[index]);
    377    MW_ASSERT(PR_SUCCESS == rv);
    378  }
    379  PR_DELETE(thread);
    380 
    381  CancelGroup(shared);
    382 } /* SomeOpsSomeThreads */
    383 
    384 static PRStatus ServiceRequest(Shared* shared, PRRecvWait* desc) {
    385  PRInt32 bytes_out;
    386 
    387  if (verbosity > chatty)
    388    PR_fprintf(debug, "%s: Service received %d bytes\n", shared->title,
    389               desc->bytesRecv);
    390 
    391  if (0 == desc->bytesRecv) {
    392    goto quitting;
    393  }
    394  if ((-1 == desc->bytesRecv) &&
    395      (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
    396    goto aborted;
    397  }
    398 
    399  bytes_out = PR_Send(desc->fd, desc->buffer.start, desc->bytesRecv, 0,
    400                      shared->timeout);
    401  if (verbosity > chatty)
    402    PR_fprintf(debug, "%s: Service sent %d bytes\n", shared->title, bytes_out);
    403 
    404  if ((-1 == bytes_out) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
    405    goto aborted;
    406  }
    407  MW_ASSERT(bytes_out == desc->bytesRecv);
    408 
    409  return PR_SUCCESS;
    410 
    411 aborted:
    412 quitting:
    413  return PR_FAILURE;
    414 } /* ServiceRequest */
    415 
    416 static void PR_CALLBACK ServiceThread(void* arg) {
    417  PRStatus rv = PR_SUCCESS;
    418  PRRecvWait* desc_out = NULL;
    419  Shared* shared = (Shared*)arg;
    420  do /* until interrupted */
    421  {
    422    if (NULL != desc_out) {
    423      desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
    424      if (verbosity > chatty) {
    425        PrintRecvDesc(desc_out, "Service re-adding");
    426      }
    427      rv = PR_AddWaitFileDesc(shared->group, desc_out);
    428      MW_ASSERT(PR_SUCCESS == rv);
    429    }
    430 
    431    desc_out = PR_WaitRecvReady(shared->group);
    432    if (NULL == desc_out) {
    433      MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    434      break;
    435    }
    436 
    437    switch (desc_out->outcome) {
    438      case PR_MW_SUCCESS: {
    439        PR_AtomicIncrement(&ops_done);
    440        if (verbosity > chatty) {
    441          PrintRecvDesc(desc_out, "Service ready");
    442        }
    443        rv = ServiceRequest(shared, desc_out);
    444        break;
    445      }
    446      case PR_MW_INTERRUPT:
    447        MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    448        rv = PR_FAILURE; /* if interrupted, then exit */
    449        break;
    450      case PR_MW_TIMEOUT:
    451        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    452      case PR_MW_FAILURE:
    453        if (verbosity > silent) {
    454          PL_FPrintError(debug, "RecvReady failure");
    455        }
    456        break;
    457      default:
    458        break;
    459    }
    460  } while (PR_SUCCESS == rv);
    461 
    462  if (NULL != desc_out) {
    463    DestroyRecvWait(desc_out);
    464  }
    465 
    466 } /* ServiceThread */
    467 
    468 static void PR_CALLBACK EnumerationThread(void* arg) {
    469  PRStatus rv;
    470  PRIntn count;
    471  PRRecvWait* desc;
    472  Shared* shared = (Shared*)arg;
    473  PRIntervalTime five_seconds = PR_SecondsToInterval(5);
    474  PRMWaitEnumerator* enumerator = PR_CreateMWaitEnumerator(shared->group);
    475  MW_ASSERT(NULL != enumerator);
    476 
    477  while (PR_SUCCESS == PR_Sleep(five_seconds)) {
    478    count = 0;
    479    desc = NULL;
    480    while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) {
    481      if (verbosity > chatty) {
    482        PrintRecvDesc(desc, shared->title);
    483      }
    484      count += 1;
    485    }
    486    if (verbosity > silent)
    487      PR_fprintf(debug, "%s Enumerated %d objects\n", shared->title, count);
    488  }
    489 
    490  MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    491 
    492  rv = PR_DestroyMWaitEnumerator(enumerator);
    493  MW_ASSERT(PR_SUCCESS == rv);
    494 } /* EnumerationThread */
    495 
    496 static void PR_CALLBACK ServerThread(void* arg) {
    497  PRStatus rv;
    498  PRIntn index;
    499  PRRecvWait* desc_in;
    500  PRThread** worker_thread;
    501  Shared* shared = (Shared*)arg;
    502  PRFileDesc *listener, *service;
    503  PRNetAddr server_address, client_address;
    504 
    505  worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
    506  if (verbosity > quiet) {
    507    PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
    508  }
    509  for (index = 0; index < worker_threads; ++index) {
    510    worker_thread[index] =
    511        PR_CreateThread(PR_USER_THREAD, ServiceThread, shared, PR_PRIORITY_HIGH,
    512                        thread_scope, PR_JOINABLE_THREAD, 16 * 1024);
    513  }
    514 
    515  rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
    516  MW_ASSERT(PR_SUCCESS == rv);
    517 
    518  listener = PR_NewTCPSocket();
    519  MW_ASSERT(NULL != listener);
    520  if (verbosity > chatty)
    521    PR_fprintf(debug, "%s: Server listener socket @0x%x\n", shared->title,
    522               listener);
    523  rv = PR_Bind(listener, &server_address);
    524  MW_ASSERT(PR_SUCCESS == rv);
    525  rv = PR_Listen(listener, 10);
    526  MW_ASSERT(PR_SUCCESS == rv);
    527  while (ops_done < ops_required) {
    528    if (verbosity > quiet) {
    529      PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
    530    }
    531    service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
    532    if (NULL == service) {
    533      if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) {
    534        break;
    535      }
    536      PL_PrintError("Accept failed");
    537      MW_ASSERT(PR_FALSE && "Accept failed");
    538    } else {
    539      desc_in = CreateRecvWait(service, shared->timeout);
    540      desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
    541      if (verbosity > chatty) {
    542        PrintRecvDesc(desc_in, "Service adding");
    543      }
    544      rv = PR_AddWaitFileDesc(shared->group, desc_in);
    545      MW_ASSERT(PR_SUCCESS == rv);
    546    }
    547  }
    548 
    549  if (verbosity > quiet) {
    550    PR_fprintf(debug, "%s: Server interrupting worker_threads\n",
    551               shared->title);
    552  }
    553  for (index = 0; index < worker_threads; ++index) {
    554    rv = PR_Interrupt(worker_thread[index]);
    555    MW_ASSERT(PR_SUCCESS == rv);
    556    rv = PR_JoinThread(worker_thread[index]);
    557    MW_ASSERT(PR_SUCCESS == rv);
    558  }
    559  PR_DELETE(worker_thread);
    560 
    561  PR_Close(listener);
    562 
    563  CancelGroup(shared);
    564 
    565 } /* ServerThread */
    566 
    567 static void RealOneGroupIO(Shared* shared) {
    568  /*
    569  ** Create a server that listens for connections and then services
    570  ** requests that come in over those connections. The server never
    571  ** deletes a connection and assumes a basic RPC model of operation.
    572  **
    573  ** Use worker_threads threads to service how every many open ports
    574  ** there might be.
    575  **
    576  ** Oh, ya. Almost forget. Create (some) clients as well.
    577  */
    578  PRStatus rv;
    579  PRIntn index;
    580  PRThread *server_thread, *enumeration_thread, **client_thread;
    581 
    582  if (verbosity > quiet) {
    583    PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
    584  }
    585 
    586  server_thread =
    587      PR_CreateThread(PR_USER_THREAD, ServerThread, shared, PR_PRIORITY_HIGH,
    588                      thread_scope, PR_JOINABLE_THREAD, 16 * 1024);
    589 
    590  if (verbosity > quiet) {
    591    PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
    592  }
    593 
    594  enumeration_thread = PR_CreateThread(PR_USER_THREAD, EnumerationThread,
    595                                       shared, PR_PRIORITY_HIGH, thread_scope,
    596                                       PR_JOINABLE_THREAD, 16 * 1024);
    597 
    598  if (verbosity > quiet) {
    599    PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
    600  }
    601  PR_Sleep(5 * shared->timeout);
    602 
    603  if (verbosity > quiet) {
    604    PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
    605  }
    606  client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
    607  for (index = 0; index < client_threads; ++index) {
    608    client_thread[index] = PR_CreateThread(PR_USER_THREAD, ClientThread, shared,
    609                                           PR_PRIORITY_NORMAL, thread_scope,
    610                                           PR_JOINABLE_THREAD, 16 * 1024);
    611  }
    612 
    613  while (ops_done < ops_required) {
    614    PR_Sleep(shared->timeout);
    615  }
    616 
    617  if (verbosity > quiet) {
    618    PR_fprintf(debug, "%s: interrupting/joining client_threads\n",
    619               shared->title);
    620  }
    621  for (index = 0; index < client_threads; ++index) {
    622    rv = PR_Interrupt(client_thread[index]);
    623    MW_ASSERT(PR_SUCCESS == rv);
    624    rv = PR_JoinThread(client_thread[index]);
    625    MW_ASSERT(PR_SUCCESS == rv);
    626  }
    627  PR_DELETE(client_thread);
    628 
    629  if (verbosity > quiet) {
    630    PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n",
    631               shared->title);
    632  }
    633  rv = PR_Interrupt(enumeration_thread);
    634  MW_ASSERT(PR_SUCCESS == rv);
    635  rv = PR_JoinThread(enumeration_thread);
    636  MW_ASSERT(PR_SUCCESS == rv);
    637 
    638  if (verbosity > quiet) {
    639    PR_fprintf(debug, "%s: interrupting/joining server_thread\n",
    640               shared->title);
    641  }
    642  rv = PR_Interrupt(server_thread);
    643  MW_ASSERT(PR_SUCCESS == rv);
    644  rv = PR_JoinThread(server_thread);
    645  MW_ASSERT(PR_SUCCESS == rv);
    646 } /* RealOneGroupIO */
    647 
    648 static void RunThisOne(void (*func)(Shared*), const char* name,
    649                       const char* test_name) {
    650  Shared* shared;
    651  if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) {
    652    if (verbosity > silent) {
    653      PR_fprintf(debug, "%s()\n", name);
    654    }
    655    shared = MakeShared(name);
    656    ops_done = 0;
    657    func(shared); /* run the test */
    658    MW_ASSERT(0 == desc_allocated);
    659    DestroyShared(shared);
    660  }
    661 } /* RunThisOne */
    662 
    663 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) {
    664  return (Verbosity)(((PRIntn)verbosity) + delta);
    665 } /* ChangeVerbosity */
    666 
    667 int main(int argc, char** argv) {
    668  PLOptStatus os;
    669  const char* test_name = NULL;
    670  PLOptState* opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
    671 
    672  while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) {
    673    if (PL_OPT_BAD == os) {
    674      continue;
    675    }
    676    switch (opt->option) {
    677      case 0:
    678        test_name = opt->value;
    679        break;
    680      case 'd': /* debug mode */
    681        if (verbosity < noisy) {
    682          verbosity = ChangeVerbosity(verbosity, 1);
    683        }
    684        break;
    685      case 'q': /* debug mode */
    686        if (verbosity > silent) {
    687          verbosity = ChangeVerbosity(verbosity, -1);
    688        }
    689        break;
    690      case 'G': /* use global threads */
    691        thread_scope = PR_GLOBAL_THREAD;
    692        break;
    693      case 'c': /* number of client threads */
    694        client_threads = atoi(opt->value);
    695        break;
    696      case 'o': /* operations to compelete */
    697        ops_required = atoi(opt->value);
    698        break;
    699      case 'p': /* default port */
    700        default_port = atoi(opt->value);
    701        break;
    702      case 't': /* number of threads waiting */
    703        worker_threads = atoi(opt->value);
    704        break;
    705      case 'w': /* number of wait objects */
    706        wait_objects = atoi(opt->value);
    707        break;
    708      default:
    709        break;
    710    }
    711  }
    712  PL_DestroyOptState(opt);
    713 
    714  if (verbosity > 0) {
    715    debug = PR_GetSpecialFD(PR_StandardError);
    716  }
    717 
    718  RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
    719  RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
    720  RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
    721  RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
    722  RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
    723  return 0;
    724 } /* main */
    725 
/* multiwait.c */