thrpool_server.c (14642B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 /*********************************************************************** 7 ** 8 ** Name: thrpool.c 9 ** 10 ** Description: Test threadpool functionality. 11 ** 12 ** Modification History: 13 */ 14 #include "primpl.h" 15 16 #include "plgetopt.h" 17 18 #include <stdio.h> 19 #include <string.h> 20 #include <errno.h> 21 #ifdef XP_UNIX 22 # include <sys/mman.h> 23 #endif 24 #if defined(_PR_PTHREADS) 25 # include <pthread.h> 26 #endif 27 28 /* for getcwd */ 29 #if defined(XP_UNIX) 30 # include <unistd.h> 31 #elif defined(XP_PC) 32 # include <direct.h> 33 #endif 34 35 #ifdef WIN32 36 # include <process.h> 37 #endif 38 39 static int _debug_on = 0; 40 static char* program_name = NULL; 41 static void serve_client_write(void* arg); 42 43 #include "obsolete/prsem.h" 44 45 #ifdef XP_PC 46 # define mode_t int 47 #endif 48 49 #define DPRINTF(arg) \ 50 if (_debug_on) printf arg 51 52 #define BUF_DATA_SIZE (2 * 1024) 53 #define TCP_MESG_SIZE 1024 54 #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */ 55 56 #define NUM_TCP_CONNECTIONS_PER_CLIENT 10 57 #define NUM_TCP_MESGS_PER_CONNECTION 10 58 #define TCP_SERVER_PORT 10000 59 #define SERVER_MAX_BIND_COUNT 100 60 61 #ifdef WINCE 62 char* getcwd(char* buf, size_t size) { 63 wchar_t wpath[MAX_PATH]; 64 _wgetcwd(wpath, MAX_PATH); 65 WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0); 66 } 67 68 # define perror(s) 69 #endif 70 71 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS; 72 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT; 73 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE; 74 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION; 75 static void TCP_Server_Accept(void* arg); 76 77 int failed_already = 0; 78 typedef struct buffer { 79 char data[BUF_DATA_SIZE]; 80 } buffer; 81 82 typedef struct Server_Param { 83 PRJobIoDesc iod; /* socket to read from/write to */ 84 PRInt32 datalen; /* bytes of data transfered in each read/write */ 85 PRNetAddr netaddr; 86 PRMonitor* exit_mon; /* monitor to signal on exit */ 87 PRInt32* job_counterp; /* counter to decrement, before exit */ 88 PRInt32 conn_counter; /* counter to decrement, before exit */ 89 PRThreadPool* tp; 90 } Server_Param; 91 92 typedef struct Serve_Client_Param { 93 PRJobIoDesc iod; /* socket to read from/write to */ 94 PRInt32 datalen; /* bytes of data transfered in each read/write */ 95 PRMonitor* exit_mon; /* monitor to signal on exit */ 96 PRInt32* job_counterp; /* counter to decrement, before exit */ 97 PRThreadPool* tp; 98 } Serve_Client_Param; 99 100 typedef struct Session { 101 PRJobIoDesc iod; /* socket to read from/write to */ 102 buffer* in_buf; 103 PRInt32 bytes; 104 PRInt32 msg_num; 105 PRInt32 bytes_read; 106 PRMonitor* exit_mon; /* monitor to signal on exit */ 107 PRInt32* job_counterp; /* counter to decrement, before exit */ 108 PRThreadPool* tp; 109 } Session; 110 111 static void serve_client_read(void* arg) { 112 Session* sp = (Session*)arg; 113 int rem; 114 int bytes; 115 int offset; 116 PRFileDesc* sockfd; 117 char* buf; 118 PRJob* jobp; 119 120 PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; 121 122 sockfd = sp->iod.socket; 123 buf = sp->in_buf->data; 124 125 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); 126 PR_ASSERT(sp->bytes_read < sp->bytes); 127 128 offset = sp->bytes_read; 129 rem = sp->bytes - offset; 130 bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout); 131 if (bytes < 0) { 132 return; 133 } 134 sp->bytes_read += bytes; 135 sp->iod.timeout = PR_SecondsToInterval(60); 136 if (sp->bytes_read < sp->bytes) { 137 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); 138 PR_ASSERT(NULL != jobp); 139 return; 140 } 141 PR_ASSERT(sp->bytes_read == sp->bytes); 142 DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num)); 143 144 sp->iod.timeout = PR_SecondsToInterval(60); 145 jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp, PR_FALSE); 146 PR_ASSERT(NULL != jobp); 147 148 return; 149 } 150 151 static void serve_client_write(void* arg) { 152 Session* sp = (Session*)arg; 153 int bytes; 154 PRFileDesc* sockfd; 155 char* buf; 156 PRJob* jobp; 157 158 sockfd = sp->iod.socket; 159 buf = sp->in_buf->data; 160 161 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); 162 163 bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT); 164 PR_ASSERT(bytes == sp->bytes); 165 166 if (bytes < 0) { 167 return; 168 } 169 DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num)); 170 sp->msg_num++; 171 if (sp->msg_num < num_tcp_mesgs_per_connection) { 172 sp->bytes_read = 0; 173 sp->iod.timeout = PR_SecondsToInterval(60); 174 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); 175 PR_ASSERT(NULL != jobp); 176 return; 177 } 178 179 DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num)); 180 if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { 181 fprintf(stderr, "%s: ERROR - PR_Shutdown\n", program_name); 182 } 183 184 PR_Close(sockfd); 185 PR_EnterMonitor(sp->exit_mon); 186 --(*sp->job_counterp); 187 PR_Notify(sp->exit_mon); 188 PR_ExitMonitor(sp->exit_mon); 189 190 PR_DELETE(sp->in_buf); 191 PR_DELETE(sp); 192 193 return; 194 } 195 196 /* 197 * Serve_Client 198 * Thread, started by the server, for serving a client connection. 199 * Reads data from socket and writes it back, unmodified, and 200 * closes the socket 201 */ 202 static void PR_CALLBACK Serve_Client(void* arg) { 203 Serve_Client_Param* scp = (Serve_Client_Param*)arg; 204 buffer* in_buf; 205 Session* sp; 206 PRJob* jobp; 207 208 sp = PR_NEW(Session); 209 sp->iod = scp->iod; 210 211 in_buf = PR_NEW(buffer); 212 if (in_buf == NULL) { 213 fprintf(stderr, "%s: failed to alloc buffer struct\n", program_name); 214 failed_already = 1; 215 return; 216 } 217 218 sp->in_buf = in_buf; 219 sp->bytes = scp->datalen; 220 sp->msg_num = 0; 221 sp->bytes_read = 0; 222 sp->tp = scp->tp; 223 sp->exit_mon = scp->exit_mon; 224 sp->job_counterp = scp->job_counterp; 225 226 sp->iod.timeout = PR_SecondsToInterval(60); 227 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); 228 PR_ASSERT(NULL != jobp); 229 PR_DELETE(scp); 230 } 231 232 static void print_stats(void* arg) { 233 Server_Param* sp = (Server_Param*)arg; 234 PRThreadPool* tp = sp->tp; 235 PRInt32 counter; 236 PRJob* jobp; 237 238 PR_EnterMonitor(sp->exit_mon); 239 counter = (*sp->job_counterp); 240 PR_ExitMonitor(sp->exit_mon); 241 242 printf("PRINT_STATS: #client connections = %d\n", counter); 243 244 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp, 245 PR_FALSE); 246 247 PR_ASSERT(NULL != jobp); 248 } 249 250 static int job_counter = 0; 251 /* 252 * TCP Server 253 * Server binds an address to a socket, starts a client process and 254 * listens for incoming connections. 255 * Each client connects to the server and sends a chunk of data 256 * Starts a Serve_Client job for each incoming connection, to read 257 * the data from the client and send it back to the client, unmodified. 258 * Each client checks that data received from server is same as the 259 * data it sent to the server. 260 * Finally, the threadpool is shutdown 261 */ 262 static void PR_CALLBACK TCP_Server(void* arg) { 263 PRThreadPool* tp = (PRThreadPool*)arg; 264 Server_Param* sp; 265 PRFileDesc* sockfd; 266 PRNetAddr netaddr; 267 PRMonitor* sc_mon; 268 PRJob* jobp; 269 int i; 270 PRStatus rval; 271 272 /* 273 * Create a tcp socket 274 */ 275 if ((sockfd = PR_NewTCPSocket()) == NULL) { 276 fprintf(stderr, "%s: PR_NewTCPSocket failed\n", program_name); 277 return; 278 } 279 memset(&netaddr, 0, sizeof(netaddr)); 280 netaddr.inet.family = PR_AF_INET; 281 netaddr.inet.port = PR_htons(TCP_SERVER_PORT); 282 netaddr.inet.ip = PR_htonl(PR_INADDR_ANY); 283 /* 284 * try a few times to bind server's address, if addresses are in 285 * use 286 */ 287 i = 0; 288 while (PR_Bind(sockfd, &netaddr) < 0) { 289 if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) { 290 netaddr.inet.port += 2; 291 if (i++ < SERVER_MAX_BIND_COUNT) { 292 continue; 293 } 294 } 295 fprintf(stderr, "%s: ERROR - PR_Bind failed\n", program_name); 296 perror("PR_Bind"); 297 failed_already = 1; 298 return; 299 } 300 301 if (PR_Listen(sockfd, 32) < 0) { 302 fprintf(stderr, "%s: ERROR - PR_Listen failed\n", program_name); 303 failed_already = 1; 304 return; 305 } 306 307 if (PR_GetSockName(sockfd, &netaddr) < 0) { 308 fprintf(stderr, "%s: ERROR - PR_GetSockName failed\n", program_name); 309 failed_already = 1; 310 return; 311 } 312 313 DPRINTF( 314 ("TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n", 315 netaddr.inet.ip, netaddr.inet.port)); 316 317 sp = PR_NEW(Server_Param); 318 if (sp == NULL) { 319 fprintf(stderr, "%s: PR_NEW failed\n", program_name); 320 failed_already = 1; 321 return; 322 } 323 sp->iod.socket = sockfd; 324 sp->iod.timeout = PR_SecondsToInterval(60); 325 sp->datalen = tcp_mesg_size; 326 sp->exit_mon = sc_mon; 327 sp->job_counterp = &job_counter; 328 sp->conn_counter = 0; 329 sp->tp = tp; 330 sp->netaddr = netaddr; 331 332 /* create and cancel an io job */ 333 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); 334 PR_ASSERT(NULL != jobp); 335 rval = PR_CancelJob(jobp); 336 PR_ASSERT(PR_SUCCESS == rval); 337 338 /* 339 * create the client process 340 */ 341 { 342 #define MAX_ARGS 4 343 char* argv[MAX_ARGS + 1]; 344 int index = 0; 345 char port[32]; 346 char path[1024 + sizeof("/thrpool_client")]; 347 348 getcwd(path, sizeof(path)); 349 350 (void)strcat(path, "/thrpool_client"); 351 #ifdef XP_PC 352 (void)strcat(path, ".exe"); 353 #endif 354 argv[index++] = path; 355 sprintf(port, "%d", PR_ntohs(netaddr.inet.port)); 356 if (_debug_on) { 357 argv[index++] = "-d"; 358 argv[index++] = "-p"; 359 argv[index++] = port; 360 argv[index++] = NULL; 361 } else { 362 argv[index++] = "-p"; 363 argv[index++] = port; 364 argv[index++] = NULL; 365 } 366 PR_ASSERT(MAX_ARGS >= (index - 1)); 367 368 DPRINTF(("creating client process %s ...\n", path)); 369 if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) { 370 fprintf(stderr, 371 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n"); 372 failed_already = 1; 373 return; 374 } 375 } 376 377 sc_mon = PR_NewMonitor(); 378 if (sc_mon == NULL) { 379 fprintf(stderr, "%s: PR_NewMonitor failed\n", program_name); 380 failed_already = 1; 381 return; 382 } 383 384 sp->iod.socket = sockfd; 385 sp->iod.timeout = PR_SecondsToInterval(60); 386 sp->datalen = tcp_mesg_size; 387 sp->exit_mon = sc_mon; 388 sp->job_counterp = &job_counter; 389 sp->conn_counter = 0; 390 sp->tp = tp; 391 sp->netaddr = netaddr; 392 393 /* create and cancel a timer job */ 394 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000), print_stats, sp, 395 PR_FALSE); 396 PR_ASSERT(NULL != jobp); 397 rval = PR_CancelJob(jobp); 398 PR_ASSERT(PR_SUCCESS == rval); 399 400 DPRINTF(("TCP_Server: Accepting connections \n")); 401 402 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); 403 PR_ASSERT(NULL != jobp); 404 return; 405 } 406 407 static void TCP_Server_Accept(void* arg) { 408 Server_Param* sp = (Server_Param*)arg; 409 PRThreadPool* tp = sp->tp; 410 Serve_Client_Param* scp; 411 PRFileDesc* newsockfd; 412 PRJob* jobp; 413 414 if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr, 415 PR_INTERVAL_NO_TIMEOUT)) == NULL) { 416 fprintf(stderr, "%s: ERROR - PR_Accept failed\n", program_name); 417 failed_already = 1; 418 goto exit; 419 } 420 scp = PR_NEW(Serve_Client_Param); 421 if (scp == NULL) { 422 fprintf(stderr, "%s: PR_NEW failed\n", program_name); 423 failed_already = 1; 424 goto exit; 425 } 426 427 /* 428 * Start a Serve_Client job for each incoming connection 429 */ 430 scp->iod.socket = newsockfd; 431 scp->iod.timeout = PR_SecondsToInterval(60); 432 scp->datalen = tcp_mesg_size; 433 scp->exit_mon = sp->exit_mon; 434 scp->job_counterp = sp->job_counterp; 435 scp->tp = sp->tp; 436 437 PR_EnterMonitor(sp->exit_mon); 438 (*sp->job_counterp)++; 439 PR_ExitMonitor(sp->exit_mon); 440 jobp = PR_QueueJob(tp, Serve_Client, scp, PR_FALSE); 441 442 PR_ASSERT(NULL != jobp); 443 DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp)); 444 445 /* 446 * single-threaded update; no lock needed 447 */ 448 sp->conn_counter++; 449 if (sp->conn_counter < (num_tcp_clients * num_tcp_connections_per_client)) { 450 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); 451 PR_ASSERT(NULL != jobp); 452 return; 453 } 454 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp, 455 PR_FALSE); 456 457 PR_ASSERT(NULL != jobp); 458 DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp)); 459 460 exit: 461 PR_EnterMonitor(sp->exit_mon); 462 /* Wait for server jobs to finish */ 463 while (0 != *sp->job_counterp) { 464 PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT); 465 DPRINTF(("TCP_Server: conn_counter = %d\n", *sp->job_counterp)); 466 } 467 468 PR_ExitMonitor(sp->exit_mon); 469 if (sp->iod.socket) { 470 PR_Close(sp->iod.socket); 471 } 472 PR_DestroyMonitor(sp->exit_mon); 473 printf("%30s", "TCP_Socket_Client_Server_Test:"); 474 printf("%2ld Server %2ld Clients %2ld connections_per_client\n", 1l, 475 num_tcp_clients, num_tcp_connections_per_client); 476 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n", ":", 477 num_tcp_mesgs_per_connection, tcp_mesg_size); 478 479 DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name)); 480 PR_ShutdownThreadPool(sp->tp); 481 PR_DELETE(sp); 482 } 483 484 /************************************************************************/ 485 486 #define DEFAULT_INITIAL_THREADS 4 487 #define DEFAULT_MAX_THREADS 100 488 #define DEFAULT_STACKSIZE (512 * 1024) 489 490 int main(int argc, char** argv) { 491 PRInt32 initial_threads = DEFAULT_INITIAL_THREADS; 492 PRInt32 max_threads = DEFAULT_MAX_THREADS; 493 PRInt32 stacksize = DEFAULT_STACKSIZE; 494 PRThreadPool* tp = NULL; 495 PRStatus rv; 496 PRJob* jobp; 497 498 /* 499 * -d debug mode 500 */ 501 PLOptStatus os; 502 PLOptState* opt; 503 504 program_name = argv[0]; 505 opt = PL_CreateOptState(argc, argv, "d"); 506 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 507 if (PL_OPT_BAD == os) { 508 continue; 509 } 510 switch (opt->option) { 511 case 'd': /* debug mode */ 512 _debug_on = 1; 513 break; 514 default: 515 break; 516 } 517 } 518 PL_DestroyOptState(opt); 519 520 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); 521 522 PR_SetConcurrency(4); 523 524 tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize); 525 if (NULL == tp) { 526 printf("PR_CreateThreadPool failed\n"); 527 failed_already = 1; 528 goto done; 529 } 530 jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE); 531 rv = PR_JoinJob(jobp); 532 PR_ASSERT(PR_SUCCESS == rv); 533 534 DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name)); 535 rv = PR_JoinThreadPool(tp); 536 PR_ASSERT(PR_SUCCESS == rv); 537 DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name)); 538 539 done: 540 PR_Cleanup(); 541 if (failed_already) { 542 return 1; 543 } else { 544 return 0; 545 } 546 }