cltsrv.c (36688B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 /* 7 * 8 * Notes: 9 * [1] lth. The call to Sleep() is a hack to get the test case to run 10 * on Windows 95. Without it, the test case fails with an error 11 * WSAECONNRESET following a recv() call. The error is caused by the 12 * server side thread termination without a shutdown() or closesocket() 13 * call. Windows docmunentation suggests that this is predicted 14 * behavior; that other platforms get away with it is ... serindipity. 15 * The test case should shutdown() or closesocket() before 16 * thread termination. I didn't have time to figure out where or how 17 * to do it. The Sleep() call inserts enough delay to allow the 18 * client side to recv() all his data before the server side thread 19 * terminates. Whew! ... 20 * 21 ** Modification History: 22 * 14-May-97 AGarcia- Converted the test to accomodate the debug_mode flag. 23 * The debug mode will print all of the printfs associated with this 24 *test. The regress mode will be the default mode. Since the regress tool limits 25 * the output to a one line status:PASS or FAIL,all of the printf 26 *statements have been handled with an if (debug_mode) statement. 27 */ 28 29 #include "prclist.h" 30 #include "prcvar.h" 31 #include "prerror.h" 32 #include "prinit.h" 33 #include "prinrval.h" 34 #include "prio.h" 35 #include "prlock.h" 36 #include "prlog.h" 37 #include "prtime.h" 38 #include "prmem.h" 39 #include "prnetdb.h" 40 #include "prprf.h" 41 #include "prthread.h" 42 43 #include "pprio.h" 44 #include "primpl.h" 45 46 #include "plstr.h" 47 #include "plerror.h" 48 #include "plgetopt.h" 49 50 #include <stdlib.h> 51 #include <string.h> 52 53 #if defined(XP_UNIX) 54 # include <math.h> 55 #endif 56 57 /* 58 ** This is the beginning of the test 59 */ 60 61 #define RECV_FLAGS 0 62 #define SEND_FLAGS 0 63 #define DEFAULT_LOW 0 64 #define DEFAULT_HIGH 0 65 #define BUFFER_SIZE 1024 66 #define DEFAULT_BACKLOG 5 67 68 #ifdef DEBUG 69 # define PORT_INC_DO +100 70 #else 71 # define PORT_INC_DO 72 #endif 73 #ifdef IS_64 74 # define PORT_INC_3264 +200 75 #else 76 # define PORT_INC_3264 77 #endif 78 79 #define DEFAULT_PORT 12849 PORT_INC_DO PORT_INC_3264 80 81 #define DEFAULT_CLIENTS 1 82 #define ALLOWED_IN_ACCEPT 1 83 #define DEFAULT_CLIPPING 1000 84 #define DEFAULT_WORKERS_MIN 1 85 #define DEFAULT_WORKERS_MAX 1 86 #define DEFAULT_SERVER "localhost" 87 #define DEFAULT_EXECUTION_TIME 10 88 #define DEFAULT_CLIENT_TIMEOUT 4000 89 #define DEFAULT_SERVER_TIMEOUT 4000 90 #define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH 91 92 typedef enum CSState_e { cs_init, cs_run, cs_stop, cs_exit } CSState_t; 93 94 static void PR_CALLBACK Worker(void* arg); 95 typedef struct CSPool_s CSPool_t; 96 typedef struct CSWorker_s CSWorker_t; 97 typedef struct CSServer_s CSServer_t; 98 typedef enum Verbosity { 99 TEST_LOG_ALWAYS, 100 TEST_LOG_ERROR, 101 TEST_LOG_WARNING, 102 TEST_LOG_NOTICE, 103 TEST_LOG_INFO, 104 TEST_LOG_STATUS, 105 TEST_LOG_VERBOSE 106 } Verbosity; 107 108 static PRInt32 domain = AF_INET; 109 static PRInt32 protocol = 6; /* TCP */ 110 static PRFileDesc* debug_out = NULL; 111 static PRBool debug_mode = PR_FALSE; 112 static PRBool pthread_stats = PR_FALSE; 113 static Verbosity verbosity = TEST_LOG_ALWAYS; 114 static PRThreadScope thread_scope = PR_LOCAL_THREAD; 115 116 struct CSWorker_s { 117 PRCList element; /* list of the server's workers */ 118 119 PRThread* thread; /* this worker objects thread */ 120 CSServer_t* server; /* back pointer to server structure */ 121 }; 122 123 struct CSPool_s { 124 PRCondVar* exiting; 125 PRCondVar* acceptComplete; 126 PRUint32 accepting, active, workers; 127 }; 128 129 struct CSServer_s { 130 PRCList list; /* head of worker list */ 131 132 PRLock* ml; 133 PRThread* thread; /* the main server thread */ 134 PRCondVar* stateChange; 135 136 PRUint16 port; /* port we're listening on */ 137 PRUint32 backlog; /* size of our listener backlog */ 138 PRFileDesc* listener; /* the fd accepting connections */ 139 140 CSPool_t pool; /* statistics on worker threads */ 141 CSState_t state; /* the server's state */ 142 struct /* controlling worker counts */ 143 { 144 PRUint32 minimum, maximum, accepting; 145 } workers; 146 147 /* statistics */ 148 PRIntervalTime started, stopped; 149 PRUint32 operations, bytesTransferred; 150 }; 151 152 typedef struct CSDescriptor_s { 153 PRInt32 size; /* size of transfer */ 154 char filename[60]; /* filename, null padded */ 155 } CSDescriptor_t; 156 157 typedef struct CSClient_s { 158 PRLock* ml; 159 PRThread* thread; 160 PRCondVar* stateChange; 161 PRNetAddr serverAddress; 162 163 CSState_t state; 164 165 /* statistics */ 166 PRIntervalTime started, stopped; 167 PRUint32 operations, bytesTransferred; 168 } CSClient_t; 169 170 #define TEST_LOG(l, p, a) \ 171 do { \ 172 if (debug_mode || (p <= verbosity)) printf a; \ 173 } while (0) 174 175 PRLogModuleInfo* cltsrv_log_file = NULL; 176 177 #define MY_ASSERT(_expr) \ 178 ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__)) 179 180 #define TEST_ASSERT(_expr) \ 181 ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__)) 182 183 static void _MY_Assert(const char* s, const char* file, PRIntn ln) { 184 PL_PrintError(NULL); 185 PR_Assert(s, file, ln); 186 } /* _MY_Assert */ 187 188 static PRBool Aborted(PRStatus rv) { 189 return ((PR_FAILURE == rv) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) 190 ? PR_TRUE 191 : PR_FALSE; 192 } 193 194 static void TimeOfDayMessage(const char* msg, PRThread* me) { 195 char buffer[100]; 196 PRExplodedTime tod; 197 PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters, &tod); 198 (void)PR_FormatTime(buffer, sizeof(buffer), "%H:%M:%S", &tod); 199 200 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 201 ("%s(0x%p): %s\n", msg, me, buffer)); 202 } /* TimeOfDayMessage */ 203 204 static void PR_CALLBACK Client(void* arg) { 205 PRStatus rv; 206 PRIntn index; 207 char buffer[1024]; 208 PRFileDesc* fd = NULL; 209 PRUintn clipping = DEFAULT_CLIPPING; 210 PRThread* me = PR_GetCurrentThread(); 211 CSClient_t* client = (CSClient_t*)arg; 212 CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t); 213 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT); 214 215 for (index = 0; index < sizeof(buffer); ++index) { 216 buffer[index] = (char)index; 217 } 218 219 client->started = PR_IntervalNow(); 220 221 PR_Lock(client->ml); 222 client->state = cs_run; 223 PR_NotifyCondVar(client->stateChange); 224 PR_Unlock(client->ml); 225 226 TimeOfDayMessage("Client started at", me); 227 228 while (cs_run == client->state) { 229 PRInt32 bytes, descbytes, filebytes, netbytes; 230 231 (void)PR_NetAddrToString(&client->serverAddress, buffer, sizeof(buffer)); 232 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO, 233 ("\tClient(0x%p): connecting to server at %s\n", me, buffer)); 234 235 fd = PR_Socket(domain, SOCK_STREAM, protocol); 236 TEST_ASSERT(NULL != fd); 237 rv = PR_Connect(fd, &client->serverAddress, timeout); 238 if (PR_FAILURE == rv) { 239 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 240 ("\tClient(0x%p): conection failed (%d, %d)\n", me, 241 PR_GetError(), PR_GetOSError())); 242 goto aborted; 243 } 244 245 memset(descriptor, 0, sizeof(*descriptor)); 246 descriptor->size = PR_htonl(descbytes = rand() % clipping); 247 PR_snprintf(descriptor->filename, sizeof(descriptor->filename), 248 "CS%p%p-%p.dat", client->started, me, client->operations); 249 TEST_LOG( 250 cltsrv_log_file, TEST_LOG_VERBOSE, 251 ("\tClient(0x%p): sending descriptor for %u bytes\n", me, descbytes)); 252 bytes = PR_Send(fd, descriptor, sizeof(*descriptor), SEND_FLAGS, timeout); 253 if (sizeof(CSDescriptor_t) != bytes) { 254 if (Aborted(PR_FAILURE)) { 255 goto aborted; 256 } 257 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 258 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 259 ("\tClient(0x%p): send descriptor timeout\n", me)); 260 goto retry; 261 } 262 } 263 TEST_ASSERT(sizeof(*descriptor) == bytes); 264 265 netbytes = 0; 266 while (netbytes < descbytes) { 267 filebytes = sizeof(buffer); 268 if ((descbytes - netbytes) < filebytes) { 269 filebytes = descbytes - netbytes; 270 } 271 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 272 ("\tClient(0x%p): sending %d bytes\n", me, filebytes)); 273 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout); 274 if (filebytes != bytes) { 275 if (Aborted(PR_FAILURE)) { 276 goto aborted; 277 } 278 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 279 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 280 ("\tClient(0x%p): send data timeout\n", me)); 281 goto retry; 282 } 283 } 284 TEST_ASSERT(bytes == filebytes); 285 netbytes += bytes; 286 } 287 filebytes = 0; 288 while (filebytes < descbytes) { 289 netbytes = sizeof(buffer); 290 if ((descbytes - filebytes) < netbytes) { 291 netbytes = descbytes - filebytes; 292 } 293 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 294 ("\tClient(0x%p): receiving %d bytes\n", me, netbytes)); 295 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout); 296 if (-1 == bytes) { 297 if (Aborted(PR_FAILURE)) { 298 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 299 ("\tClient(0x%p): receive data aborted\n", me)); 300 goto aborted; 301 } else if (PR_IO_TIMEOUT_ERROR == PR_GetError()) 302 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 303 ("\tClient(0x%p): receive data timeout\n", me)); 304 else 305 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 306 ("\tClient(0x%p): receive error (%d, %d)\n", me, 307 PR_GetError(), PR_GetOSError())); 308 goto retry; 309 } 310 if (0 == bytes) { 311 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 312 ("\t\tClient(0x%p): unexpected end of stream\n", 313 PR_GetCurrentThread())); 314 break; 315 } 316 filebytes += bytes; 317 } 318 319 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH); 320 if (Aborted(rv)) { 321 goto aborted; 322 } 323 TEST_ASSERT(PR_SUCCESS == rv); 324 retry: 325 (void)PR_Close(fd); 326 fd = NULL; 327 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO, 328 ("\tClient(0x%p): disconnected from server\n", me)); 329 330 PR_Lock(client->ml); 331 client->operations += 1; 332 client->bytesTransferred += 2 * descbytes; 333 rv = PR_WaitCondVar(client->stateChange, rand() % clipping); 334 PR_Unlock(client->ml); 335 if (Aborted(rv)) { 336 break; 337 } 338 } 339 340 aborted: 341 client->stopped = PR_IntervalNow(); 342 343 PR_ClearInterrupt(); 344 if (NULL != fd) { 345 rv = PR_Close(fd); 346 } 347 348 PR_Lock(client->ml); 349 client->state = cs_exit; 350 PR_NotifyCondVar(client->stateChange); 351 PR_Unlock(client->ml); 352 PR_DELETE(descriptor); 353 TEST_LOG( 354 cltsrv_log_file, TEST_LOG_ALWAYS, 355 ("\tClient(0x%p): stopped after %u operations and %u bytes\n", 356 PR_GetCurrentThread(), client->operations, client->bytesTransferred)); 357 358 } /* Client */ 359 360 static PRStatus ProcessRequest(PRFileDesc* fd, CSServer_t* server) { 361 PRStatus drv, rv; 362 char buffer[1024]; 363 PRFileDesc* file = NULL; 364 PRThread* me = PR_GetCurrentThread(); 365 PRInt32 bytes, descbytes, netbytes, filebytes = 0; 366 CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t); 367 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT); 368 369 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 370 ("\tProcessRequest(0x%p): receiving desciptor\n", me)); 371 bytes = PR_Recv(fd, descriptor, sizeof(*descriptor), RECV_FLAGS, timeout); 372 if (-1 == bytes) { 373 rv = PR_FAILURE; 374 if (Aborted(rv)) { 375 goto exit; 376 } 377 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 378 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 379 ("\tProcessRequest(0x%p): receive timeout\n", me)); 380 } 381 goto exit; 382 } 383 if (0 == bytes) { 384 rv = PR_FAILURE; 385 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 386 ("\tProcessRequest(0x%p): unexpected end of file\n", me)); 387 goto exit; 388 } 389 descbytes = PR_ntohl(descriptor->size); 390 TEST_ASSERT(sizeof(*descriptor) == bytes); 391 392 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 393 ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n", me, 394 descbytes, descriptor->filename)); 395 396 file = PR_Open(descriptor->filename, (PR_CREATE_FILE | PR_WRONLY), 0666); 397 if (NULL == file) { 398 rv = PR_FAILURE; 399 if (Aborted(rv)) { 400 goto aborted; 401 } 402 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 403 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 404 ("\tProcessRequest(0x%p): open file timeout\n", me)); 405 goto aborted; 406 } 407 } 408 TEST_ASSERT(NULL != file); 409 410 filebytes = 0; 411 while (filebytes < descbytes) { 412 netbytes = sizeof(buffer); 413 if ((descbytes - filebytes) < netbytes) { 414 netbytes = descbytes - filebytes; 415 } 416 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 417 ("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes)); 418 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout); 419 if (-1 == bytes) { 420 rv = PR_FAILURE; 421 if (Aborted(rv)) { 422 goto aborted; 423 } 424 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 425 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 426 ("\t\tProcessRequest(0x%p): receive data timeout\n", me)); 427 goto aborted; 428 } 429 /* 430 * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED) 431 * on NT here. This is equivalent to ECONNRESET on Unix. 432 * -wtc 433 */ 434 TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING, 435 ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n", me, 436 PR_GetError(), PR_GetOSError())); 437 goto aborted; 438 } 439 if (0 == bytes) { 440 TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING, 441 ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me)); 442 rv = PR_FAILURE; 443 goto aborted; 444 } 445 filebytes += bytes; 446 netbytes = bytes; 447 /* The byte count for PR_Write should be positive */ 448 MY_ASSERT(netbytes > 0); 449 TEST_LOG( 450 cltsrv_log_file, TEST_LOG_VERBOSE, 451 ("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes)); 452 bytes = PR_Write(file, buffer, netbytes); 453 if (netbytes != bytes) { 454 rv = PR_FAILURE; 455 if (Aborted(rv)) { 456 goto aborted; 457 } 458 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 459 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 460 ("\t\tProcessRequest(0x%p): write file timeout\n", me)); 461 goto aborted; 462 } 463 } 464 TEST_ASSERT(bytes > 0); 465 } 466 467 PR_Lock(server->ml); 468 server->operations += 1; 469 server->bytesTransferred += filebytes; 470 PR_Unlock(server->ml); 471 472 rv = PR_Close(file); 473 if (Aborted(rv)) { 474 goto aborted; 475 } 476 TEST_ASSERT(PR_SUCCESS == rv); 477 file = NULL; 478 479 TEST_LOG( 480 cltsrv_log_file, TEST_LOG_VERBOSE, 481 ("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename)); 482 file = PR_Open(descriptor->filename, PR_RDONLY, 0); 483 if (NULL == file) { 484 rv = PR_FAILURE; 485 if (Aborted(rv)) { 486 goto aborted; 487 } 488 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 489 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 490 ("\t\tProcessRequest(0x%p): open file timeout\n", 491 PR_GetCurrentThread())); 492 goto aborted; 493 } 494 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 495 ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n", me, 496 PR_GetError(), PR_GetOSError())); 497 goto aborted; 498 } 499 TEST_ASSERT(NULL != file); 500 501 netbytes = 0; 502 while (netbytes < descbytes) { 503 filebytes = sizeof(buffer); 504 if ((descbytes - netbytes) < filebytes) { 505 filebytes = descbytes - netbytes; 506 } 507 TEST_LOG( 508 cltsrv_log_file, TEST_LOG_VERBOSE, 509 ("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes)); 510 bytes = PR_Read(file, buffer, filebytes); 511 if (filebytes != bytes) { 512 rv = PR_FAILURE; 513 if (Aborted(rv)) { 514 goto aborted; 515 } 516 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) 517 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 518 ("\t\tProcessRequest(0x%p): read file timeout\n", me)); 519 else 520 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 521 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n", me, 522 PR_GetError(), PR_GetOSError())); 523 goto aborted; 524 } 525 TEST_ASSERT(bytes > 0); 526 netbytes += bytes; 527 filebytes = bytes; 528 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 529 ("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes)); 530 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout); 531 if (filebytes != bytes) { 532 rv = PR_FAILURE; 533 if (Aborted(rv)) { 534 goto aborted; 535 } 536 if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { 537 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 538 ("\t\tProcessRequest(0x%p): send data timeout\n", me)); 539 goto aborted; 540 } 541 break; 542 } 543 TEST_ASSERT(bytes > 0); 544 } 545 546 PR_Lock(server->ml); 547 server->bytesTransferred += filebytes; 548 PR_Unlock(server->ml); 549 550 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH); 551 if (Aborted(rv)) { 552 goto aborted; 553 } 554 555 rv = PR_Close(file); 556 if (Aborted(rv)) { 557 goto aborted; 558 } 559 TEST_ASSERT(PR_SUCCESS == rv); 560 file = NULL; 561 562 aborted: 563 PR_ClearInterrupt(); 564 if (NULL != file) { 565 PR_Close(file); 566 } 567 drv = PR_Delete(descriptor->filename); 568 TEST_ASSERT(PR_SUCCESS == drv); 569 exit: 570 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 571 ("\t\tProcessRequest(0x%p): Finished\n", me)); 572 573 PR_DELETE(descriptor); 574 575 #if defined(WIN95) 576 PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */ 577 #endif 578 return rv; 579 } /* ProcessRequest */ 580 581 static PRStatus CreateWorker(CSServer_t* server, CSPool_t* pool) { 582 CSWorker_t* worker = PR_NEWZAP(CSWorker_t); 583 worker->server = server; 584 PR_INIT_CLIST(&worker->element); 585 worker->thread = 586 PR_CreateThread(PR_USER_THREAD, Worker, worker, DEFAULT_SERVER_PRIORITY, 587 thread_scope, PR_UNJOINABLE_THREAD, 0); 588 if (NULL == worker->thread) { 589 PR_DELETE(worker); 590 return PR_FAILURE; 591 } 592 593 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS, 594 ("\tCreateWorker(0x%p): create new worker (0x%p)\n", 595 PR_GetCurrentThread(), worker->thread)); 596 597 return PR_SUCCESS; 598 } /* CreateWorker */ 599 600 static void PR_CALLBACK Worker(void* arg) { 601 PRStatus rv; 602 PRNetAddr from; 603 PRFileDesc* fd = NULL; 604 PRThread* me = PR_GetCurrentThread(); 605 CSWorker_t* worker = (CSWorker_t*)arg; 606 CSServer_t* server = worker->server; 607 CSPool_t* pool = &server->pool; 608 609 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 610 ("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1)); 611 612 PR_Lock(server->ml); 613 PR_APPEND_LINK(&worker->element, &server->list); 614 pool->workers += 1; /* define our existance */ 615 616 while (cs_run == server->state) { 617 while (pool->accepting >= server->workers.accepting) { 618 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 619 ("\t\tWorker(0x%p): waiting for accept slot[%d]\n", me, 620 pool->accepting)); 621 rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT); 622 if (Aborted(rv) || (cs_run != server->state)) { 623 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 624 ("\tWorker(0x%p): has been %s\n", me, 625 (Aborted(rv) ? "interrupted" : "stopped"))); 626 goto exit; 627 } 628 } 629 pool->accepting += 1; /* how many are really in accept */ 630 PR_Unlock(server->ml); 631 632 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 633 ("\t\tWorker(0x%p): calling accept\n", me)); 634 fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT); 635 636 PR_Lock(server->ml); 637 pool->accepting -= 1; 638 PR_NotifyCondVar(pool->acceptComplete); 639 640 if ((NULL == fd) && Aborted(PR_FAILURE)) { 641 if (NULL != server->listener) { 642 PR_Close(server->listener); 643 server->listener = NULL; 644 } 645 goto exit; 646 } 647 648 if (NULL != fd) { 649 /* 650 ** Create another worker of the total number of workers is 651 ** less than the minimum specified or we have none left in 652 ** accept() AND we're not over the maximum. 653 ** This sort of presumes that the number allowed in accept 654 ** is at least as many as the minimum. Otherwise we'll keep 655 ** creating new threads and deleting them soon after. 656 */ 657 PRBool another = ((pool->workers < server->workers.minimum) || 658 ((0 == pool->accepting) && 659 (pool->workers < server->workers.maximum))) 660 ? PR_TRUE 661 : PR_FALSE; 662 pool->active += 1; 663 PR_Unlock(server->ml); 664 665 if (another) { 666 (void)CreateWorker(server, pool); 667 } 668 669 rv = ProcessRequest(fd, server); 670 if (PR_SUCCESS != rv) 671 TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR, 672 ("\t\tWorker(0x%p): server process ended abnormally\n", me)); 673 (void)PR_Close(fd); 674 fd = NULL; 675 676 PR_Lock(server->ml); 677 pool->active -= 1; 678 } 679 } 680 681 exit: 682 PR_ClearInterrupt(); 683 PR_Unlock(server->ml); 684 685 if (NULL != fd) { 686 (void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH); 687 (void)PR_Close(fd); 688 } 689 690 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 691 ("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(), 692 pool->workers)); 693 694 PR_Lock(server->ml); 695 pool->workers -= 1; /* undefine our existance */ 696 PR_REMOVE_AND_INIT_LINK(&worker->element); 697 PR_NotifyCondVar(pool->exiting); 698 PR_Unlock(server->ml); 699 700 PR_DELETE(worker); /* destruction of the "worker" object */ 701 702 } /* Worker */ 703 704 static void PR_CALLBACK Server(void* arg) { 705 PRStatus rv; 706 PRNetAddr serverAddress; 707 PRThread* me = PR_GetCurrentThread(); 708 CSServer_t* server = (CSServer_t*)arg; 709 PRSocketOptionData sockOpt; 710 711 server->listener = PR_Socket(domain, SOCK_STREAM, protocol); 712 713 sockOpt.option = PR_SockOpt_Reuseaddr; 714 sockOpt.value.reuse_addr = PR_TRUE; 715 rv = PR_SetSocketOption(server->listener, &sockOpt); 716 TEST_ASSERT(PR_SUCCESS == rv); 717 718 memset(&serverAddress, 0, sizeof(serverAddress)); 719 if (PR_AF_INET6 != domain) { 720 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 721 ("server binding to ip port %s\n", DEFAULT_PORT)); 722 rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress); 723 } else { 724 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 725 ("server binding to ipv6 port %s\n", DEFAULT_PORT)); 726 rv = PR_SetNetAddr(PR_IpAddrAny, PR_AF_INET6, DEFAULT_PORT, &serverAddress); 727 } 728 rv = PR_Bind(server->listener, &serverAddress); 729 TEST_ASSERT(PR_SUCCESS == rv); 730 731 rv = PR_Listen(server->listener, server->backlog); 732 TEST_ASSERT(PR_SUCCESS == rv); 733 734 server->started = PR_IntervalNow(); 735 TimeOfDayMessage("Server started at", me); 736 737 PR_Lock(server->ml); 738 server->state = cs_run; 739 PR_NotifyCondVar(server->stateChange); 740 PR_Unlock(server->ml); 741 742 /* 743 ** Create the first worker (actually, a thread that accepts 744 ** connections and then processes the work load as needed). 745 ** From this point on, additional worker threads are created 746 ** as they are needed by existing worker threads. 747 */ 748 rv = CreateWorker(server, &server->pool); 749 TEST_ASSERT(PR_SUCCESS == rv); 750 751 /* 752 ** From here on this thread is merely hanging around as the contact 753 ** point for the main test driver. It's just waiting for the driver 754 ** to declare the test complete. 755 */ 756 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 757 ("\tServer(0x%p): waiting for state change\n", me)); 758 759 PR_Lock(server->ml); 760 while ((cs_run == server->state) && !Aborted(rv)) { 761 rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); 762 } 763 PR_Unlock(server->ml); 764 PR_ClearInterrupt(); 765 766 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO, 767 ("\tServer(0x%p): shutting down workers\n", me)); 768 769 /* 770 ** Get all the worker threads to exit. They know how to 771 ** clean up after themselves, so this is just a matter of 772 ** waiting for clorine in the pool to take effect. During 773 ** this stage we're ignoring interrupts. 774 */ 775 server->workers.minimum = server->workers.maximum = 0; 776 777 PR_Lock(server->ml); 778 while (!PR_CLIST_IS_EMPTY(&server->list)) { 779 PRCList* head = PR_LIST_HEAD(&server->list); 780 CSWorker_t* worker = (CSWorker_t*)head; 781 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 782 ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker)); 783 rv = PR_Interrupt(worker->thread); 784 TEST_ASSERT(PR_SUCCESS == rv); 785 PR_REMOVE_AND_INIT_LINK(head); 786 } 787 788 while (server->pool.workers > 0) { 789 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 790 ("\tServer(0x%p): waiting for %u workers to exit\n", me, 791 server->pool.workers)); 792 (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT); 793 } 794 795 server->state = cs_exit; 796 PR_NotifyCondVar(server->stateChange); 797 PR_Unlock(server->ml); 798 799 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 800 ("\tServer(0x%p): stopped after %u operations and %u bytes\n", me, 801 server->operations, server->bytesTransferred)); 802 803 if (NULL != server->listener) { 804 PR_Close(server->listener); 805 } 806 server->stopped = PR_IntervalNow(); 807 808 } /* Server */ 809 810 static void WaitForCompletion(PRIntn execution) { 811 while (execution > 0) { 812 PRIntn dally = (execution > 30) ? 30 : execution; 813 PR_Sleep(PR_SecondsToInterval(dally)); 814 if (pthread_stats) { 815 PT_FPrintStats(debug_out, "\nPThread Statistics\n"); 816 } 817 execution -= dally; 818 } 819 } /* WaitForCompletion */ 820 821 static void Help(void) { 822 PR_fprintf(debug_out, "cltsrv test program usage:\n"); 823 PR_fprintf(debug_out, 824 "\t-a <n> threads allowed in accept (5)\n"); 825 PR_fprintf(debug_out, 826 "\t-b <n> backlock for listen (5)\n"); 827 PR_fprintf(debug_out, 828 "\t-c <threads> number of clients to create (1)\n"); 829 PR_fprintf(debug_out, 830 "\t-f <low> low water mark for fd caching (0)\n"); 831 PR_fprintf(debug_out, 832 "\t-F <high> high water mark for fd caching (0)\n"); 833 PR_fprintf(debug_out, 834 "\t-w <threads> minimal number of server threads (1)\n"); 835 PR_fprintf(debug_out, 836 "\t-W <threads> maximum number of server threads (1)\n"); 837 PR_fprintf(debug_out, 838 "\t-e <seconds> duration of the test in seconds (10)\n"); 839 PR_fprintf(debug_out, 840 "\t-s <string> dsn name of server (localhost)\n"); 841 PR_fprintf(debug_out, 842 "\t-G use GLOBAL threads (LOCAL)\n"); 843 PR_fprintf(debug_out, 844 "\t-X use XTP as transport (TCP)\n"); 845 PR_fprintf(debug_out, 846 "\t-6 Use IPv6 (IPv4)\n"); 847 PR_fprintf(debug_out, 848 "\t-v verbosity (accumulative) (0)\n"); 849 PR_fprintf(debug_out, 850 "\t-p pthread statistics (FALSE)\n"); 851 PR_fprintf(debug_out, 852 "\t-d debug mode (FALSE)\n"); 853 PR_fprintf(debug_out, "\t-h this message\n"); 854 } /* Help */ 855 856 static Verbosity IncrementVerbosity(void) { 857 PRIntn verboge = (PRIntn)verbosity + 1; 858 return (Verbosity)verboge; 859 } /* IncrementVerbosity */ 860 861 int main(int argc, char** argv) { 862 PRUintn index; 863 PRBool boolean; 864 CSClient_t* client; 865 PRStatus rv, joinStatus; 866 CSServer_t* server = NULL; 867 868 PRUintn backlog = DEFAULT_BACKLOG; 869 PRUintn clients = DEFAULT_CLIENTS; 870 const char* serverName = DEFAULT_SERVER; 871 PRBool serverIsLocal = PR_TRUE; 872 PRUintn accepting = ALLOWED_IN_ACCEPT; 873 PRUintn workersMin = DEFAULT_WORKERS_MIN; 874 PRUintn workersMax = DEFAULT_WORKERS_MAX; 875 PRIntn execution = DEFAULT_EXECUTION_TIME; 876 PRIntn low = DEFAULT_LOW, high = DEFAULT_HIGH; 877 878 /* 879 * -G use global threads 880 * -a <n> threads allowed in accept 881 * -b <n> backlock for listen 882 * -c <threads> number of clients to create 883 * -f <low> low water mark for caching FDs 884 * -F <high> high water mark for caching FDs 885 * -w <threads> minimal number of server threads 886 * -W <threads> maximum number of server threads 887 * -e <seconds> duration of the test in seconds 888 * -s <string> dsn name of server (implies no server here) 889 * -v verbosity 890 */ 891 892 PLOptStatus os; 893 PLOptState* opt = PL_CreateOptState(argc, argv, "GX6b:a:c:f:F:w:W:e:s:vdhp"); 894 895 debug_out = PR_GetSpecialFD(PR_StandardError); 896 897 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 898 if (PL_OPT_BAD == os) { 899 continue; 900 } 901 switch (opt->option) { 902 case 'G': /* use global threads */ 903 thread_scope = PR_GLOBAL_THREAD; 904 break; 905 case 'X': /* use XTP as transport */ 906 protocol = 36; 907 break; 908 case '6': /* Use IPv6 */ 909 domain = PR_AF_INET6; 910 break; 911 case 'a': /* the value for accepting */ 912 accepting = atoi(opt->value); 913 break; 914 case 'b': /* the value for backlock */ 915 backlog = atoi(opt->value); 916 break; 917 case 'c': /* number of client threads */ 918 clients = atoi(opt->value); 919 break; 920 case 'f': /* low water fd cache */ 921 low = atoi(opt->value); 922 break; 923 case 'F': /* low water fd cache */ 924 high = atoi(opt->value); 925 break; 926 case 'w': /* minimum server worker threads */ 927 workersMin = atoi(opt->value); 928 break; 929 case 'W': /* maximum server worker threads */ 930 workersMax = atoi(opt->value); 931 break; 932 case 'e': /* program execution time in seconds */ 933 execution = atoi(opt->value); 934 break; 935 case 's': /* server's address */ 936 serverName = opt->value; 937 break; 938 case 'v': /* verbosity */ 939 verbosity = IncrementVerbosity(); 940 break; 941 case 'd': /* debug mode */ 942 debug_mode = PR_TRUE; 943 break; 944 case 'p': /* pthread mode */ 945 pthread_stats = PR_TRUE; 946 break; 947 case 'h': 948 default: 949 Help(); 950 return 2; 951 } 952 } 953 PL_DestroyOptState(opt); 954 955 if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) { 956 serverIsLocal = PR_FALSE; 957 } 958 if (0 == execution) { 959 execution = DEFAULT_EXECUTION_TIME; 960 } 961 if (0 == workersMax) { 962 workersMax = DEFAULT_WORKERS_MAX; 963 } 964 if (0 == workersMin) { 965 workersMin = DEFAULT_WORKERS_MIN; 966 } 967 if (0 == accepting) { 968 accepting = ALLOWED_IN_ACCEPT; 969 } 970 if (0 == backlog) { 971 backlog = DEFAULT_BACKLOG; 972 } 973 974 if (workersMin > accepting) { 975 accepting = workersMin; 976 } 977 978 TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread()); 979 980 cltsrv_log_file = PR_NewLogModule("cltsrv_log"); 981 MY_ASSERT(NULL != cltsrv_log_file); 982 boolean = PR_SetLogFile("cltsrv.log"); 983 MY_ASSERT(boolean); 984 985 rv = PR_SetFDCacheSize(low, high); 986 PR_ASSERT(PR_SUCCESS == rv); 987 988 if (serverIsLocal) { 989 /* Establish the server */ 990 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO, 991 ("main(0x%p): starting server\n", PR_GetCurrentThread())); 992 993 server = PR_NEWZAP(CSServer_t); 994 PR_INIT_CLIST(&server->list); 995 server->state = cs_init; 996 server->ml = PR_NewLock(); 997 server->backlog = backlog; 998 server->port = DEFAULT_PORT; 999 server->workers.minimum = workersMin; 1000 server->workers.maximum = workersMax; 1001 server->workers.accepting = accepting; 1002 server->stateChange = PR_NewCondVar(server->ml); 1003 server->pool.exiting = PR_NewCondVar(server->ml); 1004 server->pool.acceptComplete = PR_NewCondVar(server->ml); 1005 1006 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 1007 ("main(0x%p): creating server thread\n", PR_GetCurrentThread())); 1008 1009 server->thread = 1010 PR_CreateThread(PR_USER_THREAD, Server, server, PR_PRIORITY_HIGH, 1011 thread_scope, PR_JOINABLE_THREAD, 0); 1012 TEST_ASSERT(NULL != server->thread); 1013 1014 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 1015 ("main(0x%p): waiting for server init\n", PR_GetCurrentThread())); 1016 1017 PR_Lock(server->ml); 1018 while (server->state == cs_init) { 1019 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); 1020 } 1021 PR_Unlock(server->ml); 1022 1023 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 1024 ("main(0x%p): server init complete (port #%d)\n", 1025 PR_GetCurrentThread(), server->port)); 1026 } 1027 1028 if (clients != 0) { 1029 /* Create all of the clients */ 1030 PRHostEnt host; 1031 char buffer[BUFFER_SIZE]; 1032 client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t)); 1033 1034 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 1035 ("main(0x%p): creating %d client threads\n", PR_GetCurrentThread(), 1036 clients)); 1037 1038 if (!serverIsLocal) { 1039 rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host); 1040 if (PR_SUCCESS != rv) { 1041 PL_FPrintError(PR_STDERR, "PR_GetHostByName"); 1042 return 2; 1043 } 1044 } 1045 1046 for (index = 0; index < clients; ++index) { 1047 client[index].state = cs_init; 1048 client[index].ml = PR_NewLock(); 1049 if (serverIsLocal) { 1050 if (PR_AF_INET6 != domain) { 1051 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 1052 ("loopback client ip port %s\n", DEFAULT_PORT)); 1053 (void)PR_InitializeNetAddr(PR_IpAddrLoopback, DEFAULT_PORT, 1054 &client[index].serverAddress); 1055 } else { 1056 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 1057 ("loopback client ipv6 port %s\n", DEFAULT_PORT)); 1058 rv = PR_SetNetAddr(PR_IpAddrLoopback, PR_AF_INET6, DEFAULT_PORT, 1059 &client[index].serverAddress); 1060 } 1061 } else { 1062 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 1063 ("client enumerate port %s\n", DEFAULT_PORT)); 1064 (void)PR_EnumerateHostEnt(0, &host, DEFAULT_PORT, 1065 &client[index].serverAddress); 1066 } 1067 client[index].stateChange = PR_NewCondVar(client[index].ml); 1068 TEST_LOG( 1069 cltsrv_log_file, TEST_LOG_INFO, 1070 ("main(0x%p): creating client threads\n", PR_GetCurrentThread())); 1071 client[index].thread = PR_CreateThread( 1072 PR_USER_THREAD, Client, &client[index], PR_PRIORITY_NORMAL, 1073 thread_scope, PR_JOINABLE_THREAD, 0); 1074 TEST_ASSERT(NULL != client[index].thread); 1075 PR_Lock(client[index].ml); 1076 while (cs_init == client[index].state) { 1077 PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); 1078 } 1079 PR_Unlock(client[index].ml); 1080 } 1081 } 1082 1083 /* Then just let them go at it for a bit */ 1084 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 1085 ("main(0x%p): waiting for execution interval (%d seconds)\n", 1086 PR_GetCurrentThread(), execution)); 1087 1088 WaitForCompletion(execution); 1089 1090 TimeOfDayMessage("Shutting down", PR_GetCurrentThread()); 1091 1092 if (clients != 0) { 1093 for (index = 0; index < clients; ++index) { 1094 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS, 1095 ("main(0x%p): notifying client(0x%p) to stop\n", 1096 PR_GetCurrentThread(), client[index].thread)); 1097 1098 PR_Lock(client[index].ml); 1099 if (cs_run == client[index].state) { 1100 client[index].state = cs_stop; 1101 PR_Interrupt(client[index].thread); 1102 while (cs_stop == client[index].state) 1103 PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); 1104 } 1105 PR_Unlock(client[index].ml); 1106 1107 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, 1108 ("main(0x%p): joining client(0x%p)\n", PR_GetCurrentThread(), 1109 client[index].thread)); 1110 1111 joinStatus = PR_JoinThread(client[index].thread); 1112 TEST_ASSERT(PR_SUCCESS == joinStatus); 1113 PR_DestroyCondVar(client[index].stateChange); 1114 PR_DestroyLock(client[index].ml); 1115 } 1116 PR_DELETE(client); 1117 } 1118 1119 if (NULL != server) { 1120 /* All clients joined - retrieve the server */ 1121 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 1122 ("main(0x%p): notifying server(0x%p) to stop\n", 1123 PR_GetCurrentThread(), server->thread)); 1124 1125 PR_Lock(server->ml); 1126 server->state = cs_stop; 1127 PR_Interrupt(server->thread); 1128 while (cs_exit != server->state) { 1129 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); 1130 } 1131 PR_Unlock(server->ml); 1132 1133 TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE, 1134 ("main(0x%p): joining server(0x%p)\n", PR_GetCurrentThread(), 1135 server->thread)); 1136 joinStatus = PR_JoinThread(server->thread); 1137 TEST_ASSERT(PR_SUCCESS == joinStatus); 1138 1139 PR_DestroyCondVar(server->stateChange); 1140 PR_DestroyCondVar(server->pool.exiting); 1141 PR_DestroyCondVar(server->pool.acceptComplete); 1142 PR_DestroyLock(server->ml); 1143 PR_DELETE(server); 1144 } 1145 1146 TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS, 1147 ("main(0x%p): test complete\n", PR_GetCurrentThread())); 1148 1149 PT_FPrintStats(debug_out, "\nPThread Statistics\n"); 1150 1151 TimeOfDayMessage("Test exiting at", PR_GetCurrentThread()); 1152 PR_Cleanup(); 1153 return 0; 1154 } /* main */ 1155 1156 /* cltsrv.c */