multiwait.c (21115B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 #include "prio.h" 7 #include "prprf.h" 8 #include "prlog.h" 9 #include "prmem.h" 10 #include "pratom.h" 11 #include "prlock.h" 12 #include "prmwait.h" 13 #include "prclist.h" 14 #include "prerror.h" 15 #include "prinrval.h" 16 #include "prnetdb.h" 17 #include "prthread.h" 18 19 #include "plstr.h" 20 #include "plerror.h" 21 #include "plgetopt.h" 22 23 #include <string.h> 24 25 typedef struct Shared { 26 const char* title; 27 PRLock* list_lock; 28 PRWaitGroup* group; 29 PRIntervalTime timeout; 30 } Shared; 31 32 typedef enum Verbosity { silent, quiet, chatty, noisy } Verbosity; 33 34 #ifdef DEBUG 35 # define PORT_INC_DO +100 36 #else 37 # define PORT_INC_DO 38 #endif 39 #ifdef IS_64 40 # define PORT_INC_3264 +200 41 #else 42 # define PORT_INC_3264 43 #endif 44 45 static PRFileDesc* debug = NULL; 46 static PRInt32 desc_allocated = 0; 47 static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264; 48 static enum Verbosity verbosity = quiet; 49 static PRInt32 ops_required = 1000, ops_done = 0; 50 static PRThreadScope thread_scope = PR_LOCAL_THREAD; 51 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; 52 53 #if defined(DEBUG) 54 # define MW_ASSERT(_expr) \ 55 ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__)) 56 static void _MW_Assert(const char* s, const char* file, PRIntn ln) { 57 if (NULL != debug) { 58 PL_FPrintError(debug, NULL); 59 } 60 PR_Assert(s, file, ln); 61 } /* _MW_Assert */ 62 #else 63 # define MW_ASSERT(_expr) 64 #endif 65 66 static void PrintRecvDesc(PRRecvWait* desc, const char* msg) { 67 const char* tag[] = {"PR_MW_INTERRUPT", "PR_MW_TIMEOUT", "PR_MW_FAILURE", 68 "PR_MW_SUCCESS", "PR_MW_PENDING"}; 69 PR_fprintf(debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", 70 msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); 71 } /* PrintRecvDesc */ 72 73 static Shared* MakeShared(const char* title) { 74 Shared* shared = PR_NEWZAP(Shared); 75 shared->group = PR_CreateWaitGroup(1); 76 shared->timeout = PR_SecondsToInterval(1); 77 shared->list_lock = PR_NewLock(); 78 shared->title = title; 79 return shared; 80 } /* MakeShared */ 81 82 static void DestroyShared(Shared* shared) { 83 PRStatus rv; 84 if (verbosity > quiet) { 85 PR_fprintf(debug, "%s: destroying group\n", shared->title); 86 } 87 rv = PR_DestroyWaitGroup(shared->group); 88 MW_ASSERT(PR_SUCCESS == rv); 89 PR_DestroyLock(shared->list_lock); 90 PR_DELETE(shared); 91 } /* DestroyShared */ 92 93 static PRRecvWait* CreateRecvWait(PRFileDesc* fd, PRIntervalTime timeout) { 94 PRRecvWait* desc_out = PR_NEWZAP(PRRecvWait); 95 MW_ASSERT(NULL != desc_out); 96 97 MW_ASSERT(NULL != fd); 98 desc_out->fd = fd; 99 desc_out->timeout = timeout; 100 desc_out->buffer.length = 120; 101 desc_out->buffer.start = PR_CALLOC(120); 102 103 PR_AtomicIncrement(&desc_allocated); 104 105 if (verbosity > chatty) { 106 PrintRecvDesc(desc_out, "Allocated"); 107 } 108 return desc_out; 109 } /* CreateRecvWait */ 110 111 static void DestroyRecvWait(PRRecvWait* desc_out) { 112 if (verbosity > chatty) { 113 PrintRecvDesc(desc_out, "Destroying"); 114 } 115 PR_Close(desc_out->fd); 116 if (NULL != desc_out->buffer.start) { 117 PR_DELETE(desc_out->buffer.start); 118 } 119 PR_Free(desc_out); 120 (void)PR_AtomicDecrement(&desc_allocated); 121 } /* DestroyRecvWait */ 122 123 static void CancelGroup(Shared* shared) { 124 PRRecvWait* desc_out; 125 126 if (verbosity > quiet) { 127 PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); 128 } 129 130 do { 131 desc_out = PR_CancelWaitGroup(shared->group); 132 if (NULL != desc_out) { 133 DestroyRecvWait(desc_out); 134 } 135 } while (NULL != desc_out); 136 137 MW_ASSERT(0 == desc_allocated); 138 MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError()); 139 } /* CancelGroup */ 140 141 static void PR_CALLBACK ClientThread(void* arg) { 142 PRStatus rv; 143 PRInt32 bytes; 144 PRIntn empty_flags = 0; 145 PRNetAddr server_address; 146 unsigned char buffer[100]; 147 Shared* shared = (Shared*)arg; 148 PRFileDesc* server = PR_NewTCPSocket(); 149 if ((NULL == server) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { 150 return; 151 } 152 MW_ASSERT(NULL != server); 153 154 if (verbosity > chatty) { 155 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); 156 } 157 158 /* Initialize the buffer so that Purify won't complain */ 159 memset(buffer, 0, sizeof(buffer)); 160 161 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); 162 MW_ASSERT(PR_SUCCESS == rv); 163 164 if (verbosity > quiet) { 165 PR_fprintf(debug, "%s: Client opening connection\n", shared->title); 166 } 167 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); 168 169 if (PR_FAILURE == rv) { 170 if (verbosity > silent) { 171 PL_FPrintError(debug, "Client connect failed"); 172 } 173 return; 174 } 175 176 while (ops_done < ops_required) { 177 bytes = PR_Send(server, buffer, sizeof(buffer), empty_flags, 178 PR_INTERVAL_NO_TIMEOUT); 179 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { 180 break; 181 } 182 MW_ASSERT(sizeof(buffer) == bytes); 183 if (verbosity > chatty) 184 PR_fprintf(debug, "%s: Client sent %d bytes\n", shared->title, 185 sizeof(buffer)); 186 bytes = PR_Recv(server, buffer, sizeof(buffer), empty_flags, 187 PR_INTERVAL_NO_TIMEOUT); 188 if (verbosity > chatty) 189 PR_fprintf(debug, "%s: Client received %d bytes\n", shared->title, 190 sizeof(buffer)); 191 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { 192 break; 193 } 194 MW_ASSERT(sizeof(buffer) == bytes); 195 PR_Sleep(shared->timeout); 196 } 197 rv = PR_Close(server); 198 MW_ASSERT(PR_SUCCESS == rv); 199 200 } /* ClientThread */ 201 202 static void OneInThenCancelled(Shared* shared) { 203 PRStatus rv; 204 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); 205 206 shared->timeout = PR_INTERVAL_NO_TIMEOUT; 207 208 desc_in->fd = PR_NewTCPSocket(); 209 desc_in->timeout = shared->timeout; 210 211 if (verbosity > chatty) { 212 PrintRecvDesc(desc_in, "Adding desc"); 213 } 214 215 rv = PR_AddWaitFileDesc(shared->group, desc_in); 216 MW_ASSERT(PR_SUCCESS == rv); 217 218 if (verbosity > chatty) { 219 PrintRecvDesc(desc_in, "Cancelling"); 220 } 221 rv = PR_CancelWaitFileDesc(shared->group, desc_in); 222 MW_ASSERT(PR_SUCCESS == rv); 223 224 desc_out = PR_WaitRecvReady(shared->group); 225 MW_ASSERT(desc_out == desc_in); 226 MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); 227 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 228 if (verbosity > chatty) { 229 PrintRecvDesc(desc_out, "Ready"); 230 } 231 232 rv = PR_Close(desc_in->fd); 233 MW_ASSERT(PR_SUCCESS == rv); 234 235 if (verbosity > quiet) { 236 PR_fprintf(debug, "%s: destroying group\n", shared->title); 237 } 238 239 PR_DELETE(desc_in); 240 } /* OneInThenCancelled */ 241 242 static void OneOpOneThread(Shared* shared) { 243 PRStatus rv; 244 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); 245 246 desc_in->fd = PR_NewTCPSocket(); 247 desc_in->timeout = shared->timeout; 248 249 if (verbosity > chatty) { 250 PrintRecvDesc(desc_in, "Adding desc"); 251 } 252 253 rv = PR_AddWaitFileDesc(shared->group, desc_in); 254 MW_ASSERT(PR_SUCCESS == rv); 255 desc_out = PR_WaitRecvReady(shared->group); 256 MW_ASSERT(desc_out == desc_in); 257 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 258 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 259 if (verbosity > chatty) { 260 PrintRecvDesc(desc_out, "Ready"); 261 } 262 263 rv = PR_Close(desc_in->fd); 264 MW_ASSERT(PR_SUCCESS == rv); 265 266 PR_DELETE(desc_in); 267 } /* OneOpOneThread */ 268 269 static void ManyOpOneThread(Shared* shared) { 270 PRStatus rv; 271 PRIntn index; 272 PRRecvWait* desc_in; 273 PRRecvWait* desc_out; 274 275 if (verbosity > quiet) { 276 PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); 277 } 278 279 for (index = 0; index < wait_objects; ++index) { 280 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); 281 282 rv = PR_AddWaitFileDesc(shared->group, desc_in); 283 MW_ASSERT(PR_SUCCESS == rv); 284 } 285 286 while (ops_done < ops_required) { 287 desc_out = PR_WaitRecvReady(shared->group); 288 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 289 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 290 if (verbosity > chatty) { 291 PrintRecvDesc(desc_out, "Ready/readding"); 292 } 293 rv = PR_AddWaitFileDesc(shared->group, desc_out); 294 MW_ASSERT(PR_SUCCESS == rv); 295 (void)PR_AtomicIncrement(&ops_done); 296 } 297 298 CancelGroup(shared); 299 } /* ManyOpOneThread */ 300 301 static void PR_CALLBACK SomeOpsThread(void* arg) { 302 PRRecvWait* desc_out; 303 PRStatus rv = PR_SUCCESS; 304 Shared* shared = (Shared*)arg; 305 do /* until interrupted */ 306 { 307 desc_out = PR_WaitRecvReady(shared->group); 308 if (NULL == desc_out) { 309 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 310 if (verbosity > quiet) { 311 PR_fprintf(debug, "Aborted\n"); 312 } 313 break; 314 } 315 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 316 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 317 if (verbosity > chatty) { 318 PrintRecvDesc(desc_out, "Ready"); 319 } 320 321 if (verbosity > chatty) { 322 PrintRecvDesc(desc_out, "Re-Adding"); 323 } 324 desc_out->timeout = shared->timeout; 325 rv = PR_AddWaitFileDesc(shared->group, desc_out); 326 PR_AtomicIncrement(&ops_done); 327 if (ops_done > ops_required) { 328 break; 329 } 330 } while (PR_SUCCESS == rv); 331 MW_ASSERT(PR_SUCCESS == rv); 332 } /* SomeOpsThread */ 333 334 static void SomeOpsSomeThreads(Shared* shared) { 335 PRStatus rv; 336 PRThread** thread; 337 PRIntn index; 338 PRRecvWait* desc_in; 339 340 thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); 341 342 /* Create some threads */ 343 344 if (verbosity > quiet) { 345 PR_fprintf(debug, "%s: creating threads\n", shared->title); 346 } 347 for (index = 0; index < worker_threads; ++index) { 348 thread[index] = 349 PR_CreateThread(PR_USER_THREAD, SomeOpsThread, shared, PR_PRIORITY_HIGH, 350 thread_scope, PR_JOINABLE_THREAD, 16 * 1024); 351 } 352 353 /* then create some operations */ 354 if (verbosity > quiet) { 355 PR_fprintf(debug, "%s: creating desc\n", shared->title); 356 } 357 for (index = 0; index < wait_objects; ++index) { 358 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); 359 rv = PR_AddWaitFileDesc(shared->group, desc_in); 360 MW_ASSERT(PR_SUCCESS == rv); 361 } 362 363 if (verbosity > quiet) { 364 PR_fprintf(debug, "%s: sleeping\n", shared->title); 365 } 366 while (ops_done < ops_required) { 367 PR_Sleep(shared->timeout); 368 } 369 370 if (verbosity > quiet) { 371 PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); 372 } 373 for (index = 0; index < worker_threads; ++index) { 374 rv = PR_Interrupt(thread[index]); 375 MW_ASSERT(PR_SUCCESS == rv); 376 rv = PR_JoinThread(thread[index]); 377 MW_ASSERT(PR_SUCCESS == rv); 378 } 379 PR_DELETE(thread); 380 381 CancelGroup(shared); 382 } /* SomeOpsSomeThreads */ 383 384 static PRStatus ServiceRequest(Shared* shared, PRRecvWait* desc) { 385 PRInt32 bytes_out; 386 387 if (verbosity > chatty) 388 PR_fprintf(debug, "%s: Service received %d bytes\n", shared->title, 389 desc->bytesRecv); 390 391 if (0 == desc->bytesRecv) { 392 goto quitting; 393 } 394 if ((-1 == desc->bytesRecv) && 395 (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { 396 goto aborted; 397 } 398 399 bytes_out = PR_Send(desc->fd, desc->buffer.start, desc->bytesRecv, 0, 400 shared->timeout); 401 if (verbosity > chatty) 402 PR_fprintf(debug, "%s: Service sent %d bytes\n", shared->title, bytes_out); 403 404 if ((-1 == bytes_out) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { 405 goto aborted; 406 } 407 MW_ASSERT(bytes_out == desc->bytesRecv); 408 409 return PR_SUCCESS; 410 411 aborted: 412 quitting: 413 return PR_FAILURE; 414 } /* ServiceRequest */ 415 416 static void PR_CALLBACK ServiceThread(void* arg) { 417 PRStatus rv = PR_SUCCESS; 418 PRRecvWait* desc_out = NULL; 419 Shared* shared = (Shared*)arg; 420 do /* until interrupted */ 421 { 422 if (NULL != desc_out) { 423 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; 424 if (verbosity > chatty) { 425 PrintRecvDesc(desc_out, "Service re-adding"); 426 } 427 rv = PR_AddWaitFileDesc(shared->group, desc_out); 428 MW_ASSERT(PR_SUCCESS == rv); 429 } 430 431 desc_out = PR_WaitRecvReady(shared->group); 432 if (NULL == desc_out) { 433 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 434 break; 435 } 436 437 switch (desc_out->outcome) { 438 case PR_MW_SUCCESS: { 439 PR_AtomicIncrement(&ops_done); 440 if (verbosity > chatty) { 441 PrintRecvDesc(desc_out, "Service ready"); 442 } 443 rv = ServiceRequest(shared, desc_out); 444 break; 445 } 446 case PR_MW_INTERRUPT: 447 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 448 rv = PR_FAILURE; /* if interrupted, then exit */ 449 break; 450 case PR_MW_TIMEOUT: 451 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 452 case PR_MW_FAILURE: 453 if (verbosity > silent) { 454 PL_FPrintError(debug, "RecvReady failure"); 455 } 456 break; 457 default: 458 break; 459 } 460 } while (PR_SUCCESS == rv); 461 462 if (NULL != desc_out) { 463 DestroyRecvWait(desc_out); 464 } 465 466 } /* ServiceThread */ 467 468 static void PR_CALLBACK EnumerationThread(void* arg) { 469 PRStatus rv; 470 PRIntn count; 471 PRRecvWait* desc; 472 Shared* shared = (Shared*)arg; 473 PRIntervalTime five_seconds = PR_SecondsToInterval(5); 474 PRMWaitEnumerator* enumerator = PR_CreateMWaitEnumerator(shared->group); 475 MW_ASSERT(NULL != enumerator); 476 477 while (PR_SUCCESS == PR_Sleep(five_seconds)) { 478 count = 0; 479 desc = NULL; 480 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) { 481 if (verbosity > chatty) { 482 PrintRecvDesc(desc, shared->title); 483 } 484 count += 1; 485 } 486 if (verbosity > silent) 487 PR_fprintf(debug, "%s Enumerated %d objects\n", shared->title, count); 488 } 489 490 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 491 492 rv = PR_DestroyMWaitEnumerator(enumerator); 493 MW_ASSERT(PR_SUCCESS == rv); 494 } /* EnumerationThread */ 495 496 static void PR_CALLBACK ServerThread(void* arg) { 497 PRStatus rv; 498 PRIntn index; 499 PRRecvWait* desc_in; 500 PRThread** worker_thread; 501 Shared* shared = (Shared*)arg; 502 PRFileDesc *listener, *service; 503 PRNetAddr server_address, client_address; 504 505 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); 506 if (verbosity > quiet) { 507 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); 508 } 509 for (index = 0; index < worker_threads; ++index) { 510 worker_thread[index] = 511 PR_CreateThread(PR_USER_THREAD, ServiceThread, shared, PR_PRIORITY_HIGH, 512 thread_scope, PR_JOINABLE_THREAD, 16 * 1024); 513 } 514 515 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); 516 MW_ASSERT(PR_SUCCESS == rv); 517 518 listener = PR_NewTCPSocket(); 519 MW_ASSERT(NULL != listener); 520 if (verbosity > chatty) 521 PR_fprintf(debug, "%s: Server listener socket @0x%x\n", shared->title, 522 listener); 523 rv = PR_Bind(listener, &server_address); 524 MW_ASSERT(PR_SUCCESS == rv); 525 rv = PR_Listen(listener, 10); 526 MW_ASSERT(PR_SUCCESS == rv); 527 while (ops_done < ops_required) { 528 if (verbosity > quiet) { 529 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); 530 } 531 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); 532 if (NULL == service) { 533 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) { 534 break; 535 } 536 PL_PrintError("Accept failed"); 537 MW_ASSERT(PR_FALSE && "Accept failed"); 538 } else { 539 desc_in = CreateRecvWait(service, shared->timeout); 540 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; 541 if (verbosity > chatty) { 542 PrintRecvDesc(desc_in, "Service adding"); 543 } 544 rv = PR_AddWaitFileDesc(shared->group, desc_in); 545 MW_ASSERT(PR_SUCCESS == rv); 546 } 547 } 548 549 if (verbosity > quiet) { 550 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", 551 shared->title); 552 } 553 for (index = 0; index < worker_threads; ++index) { 554 rv = PR_Interrupt(worker_thread[index]); 555 MW_ASSERT(PR_SUCCESS == rv); 556 rv = PR_JoinThread(worker_thread[index]); 557 MW_ASSERT(PR_SUCCESS == rv); 558 } 559 PR_DELETE(worker_thread); 560 561 PR_Close(listener); 562 563 CancelGroup(shared); 564 565 } /* ServerThread */ 566 567 static void RealOneGroupIO(Shared* shared) { 568 /* 569 ** Create a server that listens for connections and then services 570 ** requests that come in over those connections. The server never 571 ** deletes a connection and assumes a basic RPC model of operation. 572 ** 573 ** Use worker_threads threads to service how every many open ports 574 ** there might be. 575 ** 576 ** Oh, ya. Almost forget. Create (some) clients as well. 577 */ 578 PRStatus rv; 579 PRIntn index; 580 PRThread *server_thread, *enumeration_thread, **client_thread; 581 582 if (verbosity > quiet) { 583 PR_fprintf(debug, "%s: creating server_thread\n", shared->title); 584 } 585 586 server_thread = 587 PR_CreateThread(PR_USER_THREAD, ServerThread, shared, PR_PRIORITY_HIGH, 588 thread_scope, PR_JOINABLE_THREAD, 16 * 1024); 589 590 if (verbosity > quiet) { 591 PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); 592 } 593 594 enumeration_thread = PR_CreateThread(PR_USER_THREAD, EnumerationThread, 595 shared, PR_PRIORITY_HIGH, thread_scope, 596 PR_JOINABLE_THREAD, 16 * 1024); 597 598 if (verbosity > quiet) { 599 PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); 600 } 601 PR_Sleep(5 * shared->timeout); 602 603 if (verbosity > quiet) { 604 PR_fprintf(debug, "%s: creating client_threads\n", shared->title); 605 } 606 client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); 607 for (index = 0; index < client_threads; ++index) { 608 client_thread[index] = PR_CreateThread(PR_USER_THREAD, ClientThread, shared, 609 PR_PRIORITY_NORMAL, thread_scope, 610 PR_JOINABLE_THREAD, 16 * 1024); 611 } 612 613 while (ops_done < ops_required) { 614 PR_Sleep(shared->timeout); 615 } 616 617 if (verbosity > quiet) { 618 PR_fprintf(debug, "%s: interrupting/joining client_threads\n", 619 shared->title); 620 } 621 for (index = 0; index < client_threads; ++index) { 622 rv = PR_Interrupt(client_thread[index]); 623 MW_ASSERT(PR_SUCCESS == rv); 624 rv = PR_JoinThread(client_thread[index]); 625 MW_ASSERT(PR_SUCCESS == rv); 626 } 627 PR_DELETE(client_thread); 628 629 if (verbosity > quiet) { 630 PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", 631 shared->title); 632 } 633 rv = PR_Interrupt(enumeration_thread); 634 MW_ASSERT(PR_SUCCESS == rv); 635 rv = PR_JoinThread(enumeration_thread); 636 MW_ASSERT(PR_SUCCESS == rv); 637 638 if (verbosity > quiet) { 639 PR_fprintf(debug, "%s: interrupting/joining server_thread\n", 640 shared->title); 641 } 642 rv = PR_Interrupt(server_thread); 643 MW_ASSERT(PR_SUCCESS == rv); 644 rv = PR_JoinThread(server_thread); 645 MW_ASSERT(PR_SUCCESS == rv); 646 } /* RealOneGroupIO */ 647 648 static void RunThisOne(void (*func)(Shared*), const char* name, 649 const char* test_name) { 650 Shared* shared; 651 if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) { 652 if (verbosity > silent) { 653 PR_fprintf(debug, "%s()\n", name); 654 } 655 shared = MakeShared(name); 656 ops_done = 0; 657 func(shared); /* run the test */ 658 MW_ASSERT(0 == desc_allocated); 659 DestroyShared(shared); 660 } 661 } /* RunThisOne */ 662 663 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) { 664 return (Verbosity)(((PRIntn)verbosity) + delta); 665 } /* ChangeVerbosity */ 666 667 int main(int argc, char** argv) { 668 PLOptStatus os; 669 const char* test_name = NULL; 670 PLOptState* opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:"); 671 672 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 673 if (PL_OPT_BAD == os) { 674 continue; 675 } 676 switch (opt->option) { 677 case 0: 678 test_name = opt->value; 679 break; 680 case 'd': /* debug mode */ 681 if (verbosity < noisy) { 682 verbosity = ChangeVerbosity(verbosity, 1); 683 } 684 break; 685 case 'q': /* debug mode */ 686 if (verbosity > silent) { 687 verbosity = ChangeVerbosity(verbosity, -1); 688 } 689 break; 690 case 'G': /* use global threads */ 691 thread_scope = PR_GLOBAL_THREAD; 692 break; 693 case 'c': /* number of client threads */ 694 client_threads = atoi(opt->value); 695 break; 696 case 'o': /* operations to compelete */ 697 ops_required = atoi(opt->value); 698 break; 699 case 'p': /* default port */ 700 default_port = atoi(opt->value); 701 break; 702 case 't': /* number of threads waiting */ 703 worker_threads = atoi(opt->value); 704 break; 705 case 'w': /* number of wait objects */ 706 wait_objects = atoi(opt->value); 707 break; 708 default: 709 break; 710 } 711 } 712 PL_DestroyOptState(opt); 713 714 if (verbosity > 0) { 715 debug = PR_GetSpecialFD(PR_StandardError); 716 } 717 718 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); 719 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); 720 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name); 721 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name); 722 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name); 723 return 0; 724 } /* main */ 725 726 /* multwait.c */