thrpool_client.c (8989B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 /*********************************************************************** 7 ** 8 ** Name: thrpool_client.c 9 ** 10 ** Description: Test threadpool functionality. 11 ** 12 ** Modification History: 13 */ 14 #include "primpl.h" 15 16 #include "plgetopt.h" 17 18 #include <stdio.h> 19 #include <string.h> 20 #include <errno.h> 21 #ifdef XP_UNIX 22 # include <sys/mman.h> 23 #endif 24 #if defined(_PR_PTHREADS) 25 # include <pthread.h> 26 #endif 27 28 #ifdef WIN32 29 # include <process.h> 30 #endif 31 32 static int _debug_on = 0; 33 static int server_port = -1; 34 static char* program_name = NULL; 35 36 #include "obsolete/prsem.h" 37 38 #ifdef XP_PC 39 # define mode_t int 40 #endif 41 42 #define DPRINTF(arg) \ 43 if (_debug_on) printf arg 44 45 #define BUF_DATA_SIZE (2 * 1024) 46 #define TCP_MESG_SIZE 1024 47 #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */ 48 49 #define NUM_TCP_CONNECTIONS_PER_CLIENT 10 50 #define NUM_TCP_MESGS_PER_CONNECTION 10 51 #define TCP_SERVER_PORT 10000 52 53 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS; 54 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT; 55 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE; 56 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION; 57 58 int failed_already = 0; 59 60 typedef struct buffer { 61 char data[BUF_DATA_SIZE]; 62 } buffer; 63 64 PRNetAddr tcp_server_addr, udp_server_addr; 65 66 typedef struct Client_Param { 67 PRNetAddr server_addr; 68 PRMonitor* exit_mon; /* monitor to signal on exit */ 69 PRInt32* exit_counter; /* counter to decrement, before exit */ 70 PRInt32 datalen; 71 } Client_Param; 72 73 /* 74 * readn 75 * read data from sockfd into buf 76 */ 77 static PRInt32 readn(PRFileDesc* sockfd, char* buf, int len) { 78 int rem; 79 int bytes; 80 int offset = 0; 81 PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; 82 83 for (rem = len; rem; offset += bytes, rem -= bytes) { 84 DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n", 85 PR_GetCurrentThread(), rem)); 86 bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout); 87 DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n", 88 PR_GetCurrentThread(), bytes)); 89 if (bytes < 0) { 90 return -1; 91 } 92 } 93 return len; 94 } 95 96 /* 97 * writen 98 * write data from buf to sockfd 99 */ 100 static PRInt32 writen(PRFileDesc* sockfd, char* buf, int len) { 101 int rem; 102 int bytes; 103 int offset = 0; 104 105 for (rem = len; rem; offset += bytes, rem -= bytes) { 106 DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n", 107 PR_GetCurrentThread(), rem)); 108 bytes = PR_Send(sockfd, buf + offset, rem, 0, PR_INTERVAL_NO_TIMEOUT); 109 DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n", 110 PR_GetCurrentThread(), bytes)); 111 if (bytes <= 0) { 112 return -1; 113 } 114 } 115 return len; 116 } 117 118 /* 119 * TCP_Client 120 * Client job 121 * Connect to the server at the address specified in the argument. 122 * Fill in a buffer, write data to server, read it back and check 123 * for data corruption. 124 * Close the socket for server connection 125 */ 126 static void PR_CALLBACK TCP_Client(void* arg) { 127 Client_Param* cp = (Client_Param*)arg; 128 PRFileDesc* sockfd; 129 buffer *in_buf, *out_buf; 130 union PRNetAddr netaddr; 131 PRInt32 bytes, i, j; 132 133 DPRINTF(("TCP client started\n")); 134 bytes = cp->datalen; 135 out_buf = PR_NEW(buffer); 136 if (out_buf == NULL) { 137 fprintf(stderr, "%s: failed to alloc buffer struct\n", program_name); 138 failed_already = 1; 139 return; 140 } 141 in_buf = PR_NEW(buffer); 142 if (in_buf == NULL) { 143 fprintf(stderr, "%s: failed to alloc buffer struct\n", program_name); 144 failed_already = 1; 145 return; 146 } 147 netaddr.inet.family = cp->server_addr.inet.family; 148 netaddr.inet.port = cp->server_addr.inet.port; 149 netaddr.inet.ip = cp->server_addr.inet.ip; 150 151 for (i = 0; i < num_tcp_connections_per_client; i++) { 152 if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) { 153 fprintf(stderr, "%s: PR_OpenTCPSocket failed\n", program_name); 154 failed_already = 1; 155 return; 156 } 157 158 DPRINTF(("TCP client connecting to server:%d\n", server_port)); 159 if (PR_Connect(sockfd, &netaddr, PR_INTERVAL_NO_TIMEOUT) < 0) { 160 fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n", PR_GetError(), 161 PR_GetOSError()); 162 failed_already = 1; 163 return; 164 } 165 for (j = 0; j < num_tcp_mesgs_per_connection; j++) { 166 /* 167 * fill in random data 168 */ 169 memset(out_buf->data, ((PRInt32)(&netaddr)) + i + j, bytes); 170 /* 171 * write to server 172 */ 173 if (writen(sockfd, out_buf->data, bytes) < bytes) { 174 fprintf(stderr, "%s: ERROR - TCP_Client:writen\n", program_name); 175 failed_already = 1; 176 return; 177 } 178 /* 179 DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n", 180 PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data)))); 181 */ 182 if (readn(sockfd, in_buf->data, bytes) < bytes) { 183 fprintf(stderr, "%s: ERROR - TCP_Client:readn\n", program_name); 184 failed_already = 1; 185 return; 186 } 187 /* 188 * verify the data read 189 */ 190 if (memcmp(in_buf->data, out_buf->data, bytes) != 0) { 191 fprintf(stderr, "%s: ERROR - data corruption\n", program_name); 192 failed_already = 1; 193 return; 194 } 195 } 196 /* 197 * shutdown reads and writes 198 */ 199 if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { 200 fprintf(stderr, "%s: ERROR - PR_Shutdown\n", program_name); 201 failed_already = 1; 202 } 203 PR_Close(sockfd); 204 } 205 206 PR_DELETE(out_buf); 207 PR_DELETE(in_buf); 208 209 /* 210 * Decrement exit_counter and notify parent thread 211 */ 212 213 PR_EnterMonitor(cp->exit_mon); 214 --(*cp->exit_counter); 215 PR_Notify(cp->exit_mon); 216 PR_ExitMonitor(cp->exit_mon); 217 DPRINTF(("TCP_Client exiting\n")); 218 } 219 220 /* 221 * TCP_Socket_Client_Server_Test - concurrent server test 222 * 223 * Each client connects to the server and sends a chunk of data 224 * For each connection, server reads the data 225 * from the client and sends it back to the client, unmodified. 226 * Each client checks that data received from server is same as the 227 * data it sent to the server. 228 * 229 */ 230 231 static PRInt32 TCP_Socket_Client_Server_Test(void) { 232 int i; 233 Client_Param* cparamp; 234 PRMonitor* mon2; 235 PRInt32 datalen; 236 PRInt32 connections = 0; 237 PRThread* thr; 238 239 datalen = tcp_mesg_size; 240 connections = 0; 241 242 mon2 = PR_NewMonitor(); 243 if (mon2 == NULL) { 244 fprintf(stderr, "%s: PR_NewMonitor failed\n", program_name); 245 failed_already = 1; 246 return -1; 247 } 248 249 /* 250 * Start client jobs 251 */ 252 cparamp = PR_NEW(Client_Param); 253 if (cparamp == NULL) { 254 fprintf(stderr, "%s: PR_NEW failed\n", program_name); 255 failed_already = 1; 256 return -1; 257 } 258 cparamp->server_addr.inet.family = PR_AF_INET; 259 cparamp->server_addr.inet.port = PR_htons(server_port); 260 cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK); 261 cparamp->exit_mon = mon2; 262 cparamp->exit_counter = &connections; 263 cparamp->datalen = datalen; 264 for (i = 0; i < num_tcp_clients; i++) { 265 thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void*)cparamp, 266 PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, 267 PR_UNJOINABLE_THREAD, 0); 268 if (NULL == thr) { 269 fprintf(stderr, "%s: PR_CreateThread failed\n", program_name); 270 failed_already = 1; 271 return -1; 272 } 273 PR_EnterMonitor(mon2); 274 connections++; 275 PR_ExitMonitor(mon2); 276 DPRINTF(("Created TCP client = 0x%lx\n", thr)); 277 } 278 /* Wait for client jobs to exit */ 279 PR_EnterMonitor(mon2); 280 while (0 != connections) { 281 PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT); 282 DPRINTF(("Client job count = %d\n", connections)); 283 } 284 PR_ExitMonitor(mon2); 285 printf("%30s", "TCP_Socket_Client_Server_Test:"); 286 printf("%2ld Server %2ld Clients %2ld connections_per_client\n", 1l, 287 num_tcp_clients, num_tcp_connections_per_client); 288 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n", ":", 289 num_tcp_mesgs_per_connection, tcp_mesg_size); 290 291 PR_DELETE(cparamp); 292 return 0; 293 } 294 295 /************************************************************************/ 296 297 int main(int argc, char** argv) { 298 /* 299 * -d debug mode 300 */ 301 PLOptStatus os; 302 PLOptState* opt; 303 program_name = argv[0]; 304 305 opt = PL_CreateOptState(argc, argv, "dp:"); 306 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 307 if (PL_OPT_BAD == os) { 308 continue; 309 } 310 switch (opt->option) { 311 case 'd': /* debug mode */ 312 _debug_on = 1; 313 break; 314 case 'p': 315 server_port = atoi(opt->value); 316 break; 317 default: 318 break; 319 } 320 } 321 PL_DestroyOptState(opt); 322 323 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); 324 325 PR_SetConcurrency(4); 326 327 TCP_Socket_Client_Server_Test(); 328 329 PR_Cleanup(); 330 if (failed_already) { 331 return 1; 332 } else { 333 return 0; 334 } 335 }