servr_kk.c (15415B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 /*********************************************************************** 7 ** 8 ** This server simulates a server running in loopback mode. 9 ** 10 ** The idea is that a single server is created. The server initially creates 11 ** a number of worker threads. Then, with the server running, a number of 12 ** clients are created which start requesting service from the server. 13 ** 14 ** 15 ** Modification History: 16 ** 19-May-97 AGarcia- Converted the test to accomodate the debug_mode flag. 17 ** The debug mode will print all of the printfs associated with this 18 *test. 19 ** The regress mode will be the default mode. Since the regress tool 20 *limits 21 ** the output to a one line status:PASS or FAIL,all of the printf 22 *statements 23 ** have been handled with an if (debug_mode) statement. 24 ** 04-June-97 AGarcia removed the Test_Result function. Regress tool has been 25 *updated to 26 ** recognize the return code from tha main program. 27 ***********************************************************************/ 28 29 /*********************************************************************** 30 ** Includes 31 ***********************************************************************/ 32 /* Used to get the command line option */ 33 #include "plgetopt.h" 34 35 #include "nspr.h" 36 #include "pprthred.h" 37 38 #include <string.h> 39 40 #define PORT 15004 41 #define THREAD_STACKSIZE 0 42 43 static int _iterations = 1000; 44 static int _clients = 1; 45 static int _client_data = 250; 46 static int _server_data = (8 * 1024); 47 48 static PRThreadScope ServerScope, ClientScope; 49 50 #define SERVER "Server" 51 #define MAIN "Main" 52 53 #define SERVER_STATE_STARTUP 0 54 #define SERVER_STATE_READY 1 55 #define SERVER_STATE_DYING 2 56 #define SERVER_STATE_DEAD 4 57 int ServerState; 58 PRLock* ServerStateCVLock; 59 PRCondVar* ServerStateCV; 60 61 #ifdef DEBUGPRINTS 62 # define DPRINTF printf 63 #else 64 # define DPRINTF 65 #endif 66 67 PRIntn failed_already = 0; 68 PRIntn debug_mode; 69 static void do_work(void); 70 71 /* --- Server state functions --------------------------------------------- */ 72 void SetServerState(char* waiter, PRInt32 state) { 73 PR_Lock(ServerStateCVLock); 74 ServerState = state; 75 PR_NotifyCondVar(ServerStateCV); 76 77 if (debug_mode) { 78 DPRINTF("\t%s changed state to %d\n", waiter, state); 79 } 80 81 PR_Unlock(ServerStateCVLock); 82 } 83 84 int WaitServerState(char* waiter, PRInt32 state) { 85 PRInt32 rv; 86 87 PR_Lock(ServerStateCVLock); 88 89 if (debug_mode) { 90 DPRINTF("\t%s waiting for state %d\n", waiter, state); 91 } 92 93 while (!(ServerState & state)) { 94 PR_WaitCondVar(ServerStateCV, PR_INTERVAL_NO_TIMEOUT); 95 } 96 rv = ServerState; 97 98 if (debug_mode) 99 DPRINTF("\t%s resuming from wait for state %d; state now %d\n", waiter, 100 state, ServerState); 101 PR_Unlock(ServerStateCVLock); 102 103 return rv; 104 } 105 106 /* --- Server Functions ------------------------------------------- */ 107 108 PRLock* workerThreadsLock; 109 PRInt32 workerThreads; 110 PRInt32 workerThreadsBusy; 111 112 void WorkerThreadFunc(void* _listenSock) { 113 PRFileDesc* listenSock = (PRFileDesc*)_listenSock; 114 PRInt32 bytesRead; 115 PRInt32 bytesWritten; 116 char* dataBuf; 117 char* sendBuf; 118 119 if (debug_mode) 120 DPRINTF("\tServer buffer is %d bytes; %d data, %d netaddrs\n", 121 _client_data + (2 * sizeof(PRNetAddr)) + 32, _client_data, 122 (2 * sizeof(PRNetAddr)) + 32); 123 dataBuf = (char*)PR_MALLOC(_client_data + 2 * sizeof(PRNetAddr) + 32); 124 if (!dataBuf) 125 if (debug_mode) { 126 printf("\tServer could not malloc space!?\n"); 127 } 128 sendBuf = (char*)PR_MALLOC(_server_data * sizeof(char)); 129 if (!sendBuf) 130 if (debug_mode) { 131 printf("\tServer could not malloc space!?\n"); 132 } 133 134 if (debug_mode) { 135 DPRINTF("\tServer worker thread running\n"); 136 } 137 138 while (1) { 139 PRInt32 bytesToRead = _client_data; 140 PRInt32 bytesToWrite = _server_data; 141 PRFileDesc* newSock; 142 PRNetAddr* rAddr; 143 PRInt32 loops = 0; 144 145 loops++; 146 147 if (debug_mode) { 148 DPRINTF("\tServer thread going into accept\n"); 149 } 150 151 bytesRead = PR_AcceptRead(listenSock, &newSock, &rAddr, dataBuf, 152 bytesToRead, PR_INTERVAL_NO_TIMEOUT); 153 154 if (bytesRead < 0) { 155 if (debug_mode) { 156 printf("\tServer error in accept (%d)\n", bytesRead); 157 } 158 continue; 159 } 160 161 if (debug_mode) { 162 DPRINTF("\tServer accepted connection (%d bytes)\n", bytesRead); 163 } 164 165 PR_AtomicIncrement(&workerThreadsBusy); 166 if (workerThreadsBusy == workerThreads) { 167 PR_Lock(workerThreadsLock); 168 if (workerThreadsBusy == workerThreads) { 169 PRThread* WorkerThread; 170 171 WorkerThread = PR_CreateThread( 172 PR_SYSTEM_THREAD, WorkerThreadFunc, listenSock, PR_PRIORITY_NORMAL, 173 ServerScope, PR_UNJOINABLE_THREAD, THREAD_STACKSIZE); 174 175 if (!WorkerThread) { 176 if (debug_mode) { 177 printf("Error creating client thread %d\n", workerThreads); 178 } 179 } else { 180 PR_AtomicIncrement(&workerThreads); 181 if (debug_mode) { 182 DPRINTF("\tServer creates worker (%d)\n", workerThreads); 183 } 184 } 185 } 186 PR_Unlock(workerThreadsLock); 187 } 188 189 bytesToRead -= bytesRead; 190 while (bytesToRead) { 191 bytesRead = 192 PR_Recv(newSock, dataBuf, bytesToRead, 0, PR_INTERVAL_NO_TIMEOUT); 193 if (bytesRead < 0) { 194 if (debug_mode) { 195 printf("\tServer error receiving data (%d)\n", bytesRead); 196 } 197 continue; 198 } 199 if (debug_mode) { 200 DPRINTF("\tServer received %d bytes\n", bytesRead); 201 } 202 } 203 204 bytesWritten = 205 PR_Send(newSock, sendBuf, bytesToWrite, 0, PR_INTERVAL_NO_TIMEOUT); 206 if (bytesWritten != _server_data) { 207 if (debug_mode) 208 printf("\tError sending data to client (%d, %d)\n", bytesWritten, 209 PR_GetOSError()); 210 } else { 211 if (debug_mode) { 212 DPRINTF("\tServer sent %d bytes\n", bytesWritten); 213 } 214 } 215 216 PR_Close(newSock); 217 PR_AtomicDecrement(&workerThreadsBusy); 218 } 219 } 220 221 PRFileDesc* ServerSetup(void) { 222 PRFileDesc* listenSocket; 223 PRSocketOptionData sockOpt; 224 PRNetAddr serverAddr; 225 PRThread* WorkerThread; 226 227 if ((listenSocket = PR_NewTCPSocket()) == NULL) { 228 if (debug_mode) { 229 printf("\tServer error creating listen socket\n"); 230 } else { 231 failed_already = 1; 232 } 233 return NULL; 234 } 235 236 sockOpt.option = PR_SockOpt_Reuseaddr; 237 sockOpt.value.reuse_addr = PR_TRUE; 238 if (PR_SetSocketOption(listenSocket, &sockOpt) == PR_FAILURE) { 239 if (debug_mode) 240 printf("\tServer error setting socket option: OS error %d\n", 241 PR_GetOSError()); 242 else { 243 failed_already = 1; 244 } 245 PR_Close(listenSocket); 246 return NULL; 247 } 248 249 memset(&serverAddr, 0, sizeof(PRNetAddr)); 250 serverAddr.inet.family = PR_AF_INET; 251 serverAddr.inet.port = PR_htons(PORT); 252 serverAddr.inet.ip = PR_htonl(PR_INADDR_ANY); 253 254 if (PR_Bind(listenSocket, &serverAddr) == PR_FAILURE) { 255 if (debug_mode) 256 printf("\tServer error binding to server address: OS error %d\n", 257 PR_GetOSError()); 258 else { 259 failed_already = 1; 260 } 261 PR_Close(listenSocket); 262 return NULL; 263 } 264 265 if (PR_Listen(listenSocket, 128) == PR_FAILURE) { 266 if (debug_mode) { 267 printf("\tServer error listening to server socket\n"); 268 } else { 269 failed_already = 1; 270 } 271 PR_Close(listenSocket); 272 273 return NULL; 274 } 275 276 /* Create Clients */ 277 workerThreads = 0; 278 workerThreadsBusy = 0; 279 280 workerThreadsLock = PR_NewLock(); 281 282 WorkerThread = PR_CreateThread(PR_SYSTEM_THREAD, WorkerThreadFunc, 283 listenSocket, PR_PRIORITY_NORMAL, ServerScope, 284 PR_UNJOINABLE_THREAD, THREAD_STACKSIZE); 285 286 if (!WorkerThread) { 287 if (debug_mode) { 288 printf("error creating working thread\n"); 289 } 290 PR_Close(listenSocket); 291 return NULL; 292 } 293 PR_AtomicIncrement(&workerThreads); 294 if (debug_mode) { 295 DPRINTF("\tServer created primordial worker thread\n"); 296 } 297 298 return listenSocket; 299 } 300 301 /* The main server loop */ 302 void ServerThreadFunc(void* unused) { 303 PRFileDesc* listenSocket; 304 305 /* Do setup */ 306 listenSocket = ServerSetup(); 307 308 if (!listenSocket) { 309 SetServerState(SERVER, SERVER_STATE_DEAD); 310 } else { 311 if (debug_mode) { 312 DPRINTF("\tServer up\n"); 313 } 314 315 /* Tell clients they can start now. */ 316 SetServerState(SERVER, SERVER_STATE_READY); 317 318 /* Now wait for server death signal */ 319 WaitServerState(SERVER, SERVER_STATE_DYING); 320 321 /* Cleanup */ 322 SetServerState(SERVER, SERVER_STATE_DEAD); 323 } 324 } 325 326 /* --- Client Functions ------------------------------------------- */ 327 328 PRInt32 numRequests; 329 PRInt32 numClients; 330 PRMonitor* clientMonitor; 331 332 void ClientThreadFunc(void* unused) { 333 PRNetAddr serverAddr; 334 PRFileDesc* clientSocket; 335 char* sendBuf; 336 char* recvBuf; 337 PRInt32 rv; 338 PRInt32 bytesNeeded; 339 340 sendBuf = (char*)PR_MALLOC(_client_data * sizeof(char)); 341 if (!sendBuf) 342 if (debug_mode) { 343 printf("\tClient could not malloc space!?\n"); 344 } 345 recvBuf = (char*)PR_MALLOC(_server_data * sizeof(char)); 346 if (!recvBuf) 347 if (debug_mode) { 348 printf("\tClient could not malloc space!?\n"); 349 } 350 351 memset(&serverAddr, 0, sizeof(PRNetAddr)); 352 serverAddr.inet.family = PR_AF_INET; 353 serverAddr.inet.port = PR_htons(PORT); 354 serverAddr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK); 355 356 while (numRequests > 0) { 357 if ((numRequests % 10) == 0) 358 if (debug_mode) { 359 printf("."); 360 } 361 if (debug_mode) { 362 DPRINTF("\tClient starting request %d\n", numRequests); 363 } 364 365 clientSocket = PR_NewTCPSocket(); 366 if (!clientSocket) { 367 if (debug_mode) 368 printf("Client error creating socket: OS error %d\n", PR_GetOSError()); 369 continue; 370 } 371 372 if (debug_mode) { 373 DPRINTF("\tClient connecting\n"); 374 } 375 376 rv = PR_Connect(clientSocket, &serverAddr, PR_INTERVAL_NO_TIMEOUT); 377 if (!clientSocket) { 378 if (debug_mode) { 379 printf("\tClient error connecting\n"); 380 } 381 continue; 382 } 383 384 if (debug_mode) { 385 DPRINTF("\tClient connected\n"); 386 } 387 388 rv = 389 PR_Send(clientSocket, sendBuf, _client_data, 0, PR_INTERVAL_NO_TIMEOUT); 390 if (rv != _client_data) { 391 if (debug_mode) { 392 printf("Client error sending data (%d)\n", rv); 393 } 394 PR_Close(clientSocket); 395 continue; 396 } 397 398 if (debug_mode) { 399 DPRINTF("\tClient sent %d bytes\n", rv); 400 } 401 402 bytesNeeded = _server_data; 403 while (bytesNeeded) { 404 rv = PR_Recv(clientSocket, recvBuf, bytesNeeded, 0, 405 PR_INTERVAL_NO_TIMEOUT); 406 if (rv <= 0) { 407 if (debug_mode) 408 printf("Client error receiving data (%d) (%d/%d)\n", rv, 409 (_server_data - bytesNeeded), _server_data); 410 break; 411 } 412 if (debug_mode) { 413 DPRINTF("\tClient received %d bytes; need %d more\n", rv, 414 bytesNeeded - rv); 415 } 416 bytesNeeded -= rv; 417 } 418 419 PR_Close(clientSocket); 420 421 PR_AtomicDecrement(&numRequests); 422 } 423 424 PR_EnterMonitor(clientMonitor); 425 --numClients; 426 PR_Notify(clientMonitor); 427 PR_ExitMonitor(clientMonitor); 428 429 PR_DELETE(sendBuf); 430 PR_DELETE(recvBuf); 431 } 432 433 void RunClients(void) { 434 PRInt32 index; 435 436 numRequests = _iterations; 437 numClients = _clients; 438 clientMonitor = PR_NewMonitor(); 439 440 for (index = 0; index < _clients; index++) { 441 PRThread* clientThread; 442 443 clientThread = PR_CreateThread(PR_USER_THREAD, ClientThreadFunc, NULL, 444 PR_PRIORITY_NORMAL, ClientScope, 445 PR_UNJOINABLE_THREAD, THREAD_STACKSIZE); 446 447 if (!clientThread) { 448 if (debug_mode) { 449 printf("\terror creating client thread %d\n", index); 450 } 451 } else if (debug_mode) { 452 DPRINTF("\tMain created client %d/%d\n", index + 1, _clients); 453 } 454 } 455 456 PR_EnterMonitor(clientMonitor); 457 while (numClients) { 458 PR_Wait(clientMonitor, PR_INTERVAL_NO_TIMEOUT); 459 } 460 PR_ExitMonitor(clientMonitor); 461 } 462 463 /* --- Main Function ---------------------------------------------- */ 464 465 static void do_work() { 466 PRThread* ServerThread; 467 PRInt32 state; 468 469 SetServerState(MAIN, SERVER_STATE_STARTUP); 470 ServerThread = PR_CreateThread(PR_USER_THREAD, ServerThreadFunc, NULL, 471 PR_PRIORITY_NORMAL, ServerScope, 472 PR_JOINABLE_THREAD, THREAD_STACKSIZE); 473 if (!ServerThread) { 474 if (debug_mode) { 475 printf("error creating main server thread\n"); 476 } 477 return; 478 } 479 480 /* Wait for server to be ready */ 481 state = WaitServerState(MAIN, SERVER_STATE_READY | SERVER_STATE_DEAD); 482 483 if (!(state & SERVER_STATE_DEAD)) { 484 /* Run Test Clients */ 485 RunClients(); 486 487 /* Send death signal to server */ 488 SetServerState(MAIN, SERVER_STATE_DYING); 489 } 490 491 PR_JoinThread(ServerThread); 492 } 493 494 static void do_workUU(void) { 495 ServerScope = PR_LOCAL_THREAD; 496 ClientScope = PR_LOCAL_THREAD; 497 do_work(); 498 } 499 500 static void do_workUK(void) { 501 ServerScope = PR_LOCAL_THREAD; 502 ClientScope = PR_GLOBAL_THREAD; 503 do_work(); 504 } 505 506 static void do_workKU(void) { 507 ServerScope = PR_GLOBAL_THREAD; 508 ClientScope = PR_LOCAL_THREAD; 509 do_work(); 510 } 511 512 static void do_workKK(void) { 513 ServerScope = PR_GLOBAL_THREAD; 514 ClientScope = PR_GLOBAL_THREAD; 515 do_work(); 516 } 517 518 static void Measure(void (*func)(void), const char* msg) { 519 PRIntervalTime start, stop; 520 double d; 521 522 start = PR_IntervalNow(); 523 (*func)(); 524 stop = PR_IntervalNow(); 525 526 d = (double)PR_IntervalToMicroseconds(stop - start); 527 528 if (debug_mode) { 529 printf("\n%40s: %6.2f usec\n", msg, d / _iterations); 530 } 531 } 532 533 int main(int argc, char** argv) { 534 /* The command line argument: -d is used to determine if the test is being run 535 in debug mode. The regress tool requires only one line output:PASS or FAIL. 536 All of the printfs associated with this test has been handled with a if 537 (debug_mode) test. Usage: test_name -d 538 */ 539 PLOptStatus os; 540 PLOptState* opt = PL_CreateOptState(argc, argv, "d:"); 541 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 542 if (PL_OPT_BAD == os) { 543 continue; 544 } 545 switch (opt->option) { 546 case 'd': /* debug mode */ 547 debug_mode = 1; 548 break; 549 default: 550 break; 551 } 552 } 553 PL_DestroyOptState(opt); 554 555 /* main test */ 556 if (debug_mode) { 557 printf("Enter number of iterations: \n"); 558 scanf("%d", &_iterations); 559 printf("Enter number of clients : \n"); 560 scanf("%d", &_clients); 561 printf("Enter size of client data : \n"); 562 scanf("%d", &_client_data); 563 printf("Enter size of server data : \n"); 564 scanf("%d", &_server_data); 565 } else { 566 _iterations = 7; 567 _clients = 7; 568 _client_data = 100; 569 _server_data = 100; 570 } 571 572 if (debug_mode) { 573 printf("\n\n%d iterations with %d client threads.\n", _iterations, 574 _clients); 575 printf("Sending %d bytes of client data and %d bytes of server data\n", 576 _client_data, _server_data); 577 } 578 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); 579 580 PR_SetThreadRecycleMode(64); 581 582 ServerStateCVLock = PR_NewLock(); 583 ServerStateCV = PR_NewCondVar(ServerStateCVLock); 584 585 Measure(do_workKK, "server loop kernel/kernel"); 586 587 PR_Cleanup(); 588 589 if (failed_already) { 590 return 1; 591 } else { 592 return 0; 593 } 594 }