thruput.c (11139B)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* This Source Code Form is subject to the terms of the Mozilla Public 3 * License, v. 2.0. If a copy of the MPL was not distributed with this 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 5 6 /* 7 ** File: thruput.c 8 ** Description: Test server's throughput capability comparing various 9 ** implmentation strategies. 10 ** 11 ** Note: Requires a server machine and an aribitrary number of 12 ** clients to bang on it. Trust the numbers on the server 13 ** more than those being displayed by the various clients. 14 */ 15 16 #include "prerror.h" 17 #include "prinrval.h" 18 #include "prinit.h" 19 #include "prio.h" 20 #include "prlock.h" 21 #include "prmem.h" 22 #include "prnetdb.h" 23 #include "prprf.h" 24 #include "prthread.h" 25 #include "pprio.h" 26 #include "plerror.h" 27 #include "plgetopt.h" 28 29 #define ADDR_BUFFER 100 30 31 #ifdef DEBUG 32 # define PORT_INC_DO +100 33 #else 34 # define PORT_INC_DO 35 #endif 36 #ifdef IS_64 37 # define PORT_INC_3264 +200 38 #else 39 # define PORT_INC_3264 40 #endif 41 42 #define PORT_NUMBER 51877 PORT_INC_DO PORT_INC_3264 43 44 #define SAMPLING_INTERVAL 10 45 #define BUFFER_SIZE (32 * 1024) 46 47 static PRInt32 domain = PR_AF_INET; 48 static PRInt32 protocol = 6; /* TCP */ 49 static PRFileDesc* err = NULL; 50 static PRIntn concurrency = 1; 51 static PRInt32 xport_buffer = -1; 52 static PRUint32 initial_streams = 1; 53 static PRInt32 buffer_size = BUFFER_SIZE; 54 static PRThreadScope thread_scope = PR_LOCAL_THREAD; 55 56 typedef struct Shared { 57 PRLock* ml; 58 PRUint32 sampled; 59 PRUint32 threads; 60 PRIntervalTime timein; 61 PRNetAddr server_address; 62 } Shared; 63 64 static Shared* shared = NULL; 65 66 static PRStatus PrintAddress(const PRNetAddr* address) { 67 char buffer[ADDR_BUFFER]; 68 PRStatus rv = PR_NetAddrToString(address, buffer, sizeof(buffer)); 69 if (PR_SUCCESS == rv) { 70 PR_fprintf(err, "%s:%u\n", buffer, PR_ntohs(address->inet.port)); 71 } else { 72 PL_FPrintError(err, "PR_NetAddrToString"); 73 } 74 return rv; 75 } /* PrintAddress */ 76 77 static void PR_CALLBACK Clientel(void* arg) { 78 PRStatus rv; 79 PRFileDesc* xport; 80 PRInt32 bytes, sampled; 81 PRIntervalTime now, interval; 82 PRBool do_display = PR_FALSE; 83 Shared* shared = (Shared*)arg; 84 char* buffer = (char*)PR_Malloc(buffer_size); 85 PRNetAddr* server_address = &shared->server_address; 86 PRIntervalTime connect_timeout = PR_SecondsToInterval(5); 87 PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL); 88 89 PR_fprintf(err, "Client connecting to "); 90 (void)PrintAddress(server_address); 91 92 do { 93 xport = PR_Socket(domain, PR_SOCK_STREAM, protocol); 94 if (NULL == xport) { 95 PL_FPrintError(err, "PR_Socket"); 96 return; 97 } 98 99 if (xport_buffer != -1) { 100 PRSocketOptionData data; 101 data.option = PR_SockOpt_RecvBufferSize; 102 data.value.recv_buffer_size = (PRSize)xport_buffer; 103 rv = PR_SetSocketOption(xport, &data); 104 if (PR_FAILURE == rv) { 105 PL_FPrintError(err, "PR_SetSocketOption - ignored"); 106 } 107 data.option = PR_SockOpt_SendBufferSize; 108 data.value.send_buffer_size = (PRSize)xport_buffer; 109 rv = PR_SetSocketOption(xport, &data); 110 if (PR_FAILURE == rv) { 111 PL_FPrintError(err, "PR_SetSocketOption - ignored"); 112 } 113 } 114 115 rv = PR_Connect(xport, server_address, connect_timeout); 116 if (PR_FAILURE == rv) { 117 PL_FPrintError(err, "PR_Connect"); 118 if (PR_IO_TIMEOUT_ERROR != PR_GetError()) { 119 PR_Sleep(connect_timeout); 120 } 121 PR_Close(xport); /* delete it and start over */ 122 } 123 } while (PR_FAILURE == rv); 124 125 do { 126 bytes = PR_Recv(xport, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT); 127 PR_Lock(shared->ml); 128 now = PR_IntervalNow(); 129 shared->sampled += bytes; 130 interval = now - shared->timein; 131 if (interval > sampling_interval) { 132 sampled = shared->sampled; 133 shared->timein = now; 134 shared->sampled = 0; 135 do_display = PR_TRUE; 136 } 137 PR_Unlock(shared->ml); 138 139 if (do_display) { 140 PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval); 141 PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate); 142 do_display = PR_FALSE; 143 } 144 145 } while (bytes > 0); 146 } /* Clientel */ 147 148 static void Client(const char* server_name) { 149 PRStatus rv; 150 PRHostEnt host; 151 char buffer[PR_NETDB_BUF_SIZE]; 152 PRIntervalTime dally = PR_SecondsToInterval(60); 153 PR_fprintf(err, "Translating the name %s\n", server_name); 154 rv = PR_GetHostByName(server_name, buffer, sizeof(buffer), &host); 155 if (PR_FAILURE == rv) { 156 PL_FPrintError(err, "PR_GetHostByName"); 157 } else { 158 if (PR_EnumerateHostEnt(0, &host, PORT_NUMBER, &shared->server_address) < 159 0) { 160 PL_FPrintError(err, "PR_EnumerateHostEnt"); 161 } else { 162 do { 163 shared->threads += 1; 164 (void)PR_CreateThread(PR_USER_THREAD, Clientel, shared, 165 PR_PRIORITY_NORMAL, thread_scope, 166 PR_UNJOINABLE_THREAD, 8 * 1024); 167 if (shared->threads == initial_streams) { 168 PR_Sleep(dally); 169 initial_streams += 1; 170 } 171 } while (PR_TRUE); 172 } 173 } 174 } 175 176 static void PR_CALLBACK Servette(void* arg) { 177 PRInt32 bytes, sampled; 178 PRIntervalTime now, interval; 179 PRBool do_display = PR_FALSE; 180 PRFileDesc* client = (PRFileDesc*)arg; 181 char* buffer = (char*)PR_Malloc(buffer_size); 182 PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL); 183 184 if (xport_buffer != -1) { 185 PRStatus rv; 186 PRSocketOptionData data; 187 data.option = PR_SockOpt_RecvBufferSize; 188 data.value.recv_buffer_size = (PRSize)xport_buffer; 189 rv = PR_SetSocketOption(client, &data); 190 if (PR_FAILURE == rv) { 191 PL_FPrintError(err, "PR_SetSocketOption - ignored"); 192 } 193 data.option = PR_SockOpt_SendBufferSize; 194 data.value.send_buffer_size = (PRSize)xport_buffer; 195 rv = PR_SetSocketOption(client, &data); 196 if (PR_FAILURE == rv) { 197 PL_FPrintError(err, "PR_SetSocketOption - ignored"); 198 } 199 } 200 201 do { 202 bytes = PR_Send(client, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT); 203 204 PR_Lock(shared->ml); 205 now = PR_IntervalNow(); 206 shared->sampled += bytes; 207 interval = now - shared->timein; 208 if (interval > sampling_interval) { 209 sampled = shared->sampled; 210 shared->timein = now; 211 shared->sampled = 0; 212 do_display = PR_TRUE; 213 } 214 PR_Unlock(shared->ml); 215 216 if (do_display) { 217 PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval); 218 PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate); 219 do_display = PR_FALSE; 220 } 221 } while (bytes > 0); 222 } /* Servette */ 223 224 static void Server(void) { 225 PRStatus rv; 226 PRNetAddr server_address, client_address; 227 PRFileDesc* xport = PR_Socket(domain, PR_SOCK_STREAM, protocol); 228 229 if (NULL == xport) { 230 PL_FPrintError(err, "PR_Socket"); 231 return; 232 } 233 234 rv = PR_InitializeNetAddr(PR_IpAddrAny, PORT_NUMBER, &server_address); 235 if (PR_FAILURE == rv) { 236 PL_FPrintError(err, "PR_InitializeNetAddr"); 237 } else { 238 rv = PR_Bind(xport, &server_address); 239 if (PR_FAILURE == rv) { 240 PL_FPrintError(err, "PR_Bind"); 241 } else { 242 PRFileDesc* client; 243 rv = PR_Listen(xport, 10); 244 PR_fprintf(err, "Server listening on "); 245 (void)PrintAddress(&server_address); 246 do { 247 client = PR_Accept(xport, &client_address, PR_INTERVAL_NO_TIMEOUT); 248 if (NULL == client) { 249 PL_FPrintError(err, "PR_Accept"); 250 } else { 251 PR_fprintf(err, "Server accepting from "); 252 (void)PrintAddress(&client_address); 253 shared->threads += 1; 254 (void)PR_CreateThread(PR_USER_THREAD, Servette, client, 255 PR_PRIORITY_NORMAL, thread_scope, 256 PR_UNJOINABLE_THREAD, 8 * 1024); 257 } 258 } while (PR_TRUE); 259 } 260 } 261 } /* Server */ 262 263 static void Help(void) { 264 PR_fprintf(err, "Usage: [-h] [<server>]\n"); 265 PR_fprintf(err, "\t-s <n> Initial # of connections (default: 1)\n"); 266 PR_fprintf(err, "\t-C <n> Set 'concurrency' (default: 1)\n"); 267 PR_fprintf(err, 268 "\t-b <nK> Client buffer size (default: 32k)\n"); 269 PR_fprintf(err, 270 "\t-B <nK> Transport recv/send buffer size (default: sys)\n"); 271 PR_fprintf(err, 272 "\t-G Use GLOBAL threads (default: LOCAL)\n"); 273 PR_fprintf(err, 274 "\t-X Use XTP transport (default: TCP)\n"); 275 PR_fprintf(err, 276 "\t-6 Use IPv6 (default: IPv4)\n"); 277 PR_fprintf(err, "\t-h This message and nothing else\n"); 278 PR_fprintf(err, "\t<server> DNS name of server\n"); 279 PR_fprintf(err, "\t\tIf <server> is not specified, this host will be\n"); 280 PR_fprintf(err, "\t\tthe server and not act as a client.\n"); 281 } /* Help */ 282 283 int main(int argc, char** argv) { 284 PLOptStatus os; 285 const char* server_name = NULL; 286 PLOptState* opt = PL_CreateOptState(argc, argv, "hGX6C:b:s:B:"); 287 288 err = PR_GetSpecialFD(PR_StandardError); 289 290 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { 291 if (PL_OPT_BAD == os) { 292 continue; 293 } 294 switch (opt->option) { 295 case 0: /* Name of server */ 296 server_name = opt->value; 297 break; 298 case 'G': /* Globular threads */ 299 thread_scope = PR_GLOBAL_THREAD; 300 break; 301 case 'X': /* Use XTP as the transport */ 302 protocol = 36; 303 break; 304 case '6': /* Use IPv6 */ 305 domain = PR_AF_INET6; 306 break; 307 case 's': /* initial_streams */ 308 initial_streams = atoi(opt->value); 309 break; 310 case 'C': /* concurrency */ 311 concurrency = atoi(opt->value); 312 break; 313 case 'b': /* buffer size */ 314 buffer_size = 1024 * atoi(opt->value); 315 break; 316 case 'B': /* buffer size */ 317 xport_buffer = 1024 * atoi(opt->value); 318 break; 319 case 'h': /* user wants some guidance */ 320 default: 321 Help(); /* so give him an earful */ 322 return 2; /* but not a lot else */ 323 } 324 } 325 PL_DestroyOptState(opt); 326 327 shared = PR_NEWZAP(Shared); 328 shared->ml = PR_NewLock(); 329 330 PR_fprintf(err, "This machine is %s\n", 331 (NULL == server_name) ? "the SERVER" : "a CLIENT"); 332 333 PR_fprintf(err, "Transport being used is %s\n", 334 (6 == protocol) ? "TCP" : "XTP"); 335 336 if (PR_GLOBAL_THREAD == thread_scope) { 337 if (1 != concurrency) { 338 PR_fprintf(err, " **Concurrency > 1 and GLOBAL threads!?!?\n"); 339 PR_fprintf(err, " **Ignoring concurrency\n"); 340 concurrency = 1; 341 } 342 } 343 344 if (1 != concurrency) { 345 PR_SetConcurrency(concurrency); 346 PR_fprintf(err, "Concurrency set to %u\n", concurrency); 347 } 348 349 PR_fprintf(err, "All threads will be %s\n", 350 (PR_GLOBAL_THREAD == thread_scope) ? "GLOBAL" : "LOCAL"); 351 352 PR_fprintf(err, "Client buffer size will be %u\n", buffer_size); 353 354 if (-1 != xport_buffer) 355 PR_fprintf(err, "Transport send & receive buffer size will be %u\n", 356 xport_buffer); 357 358 if (NULL == server_name) { 359 Server(); 360 } else { 361 Client(server_name); 362 } 363 364 return 0; 365 } /* main */ 366 367 /* thruput.c */