tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

thruput.c (11139B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 /*
      7 ** File:        thruput.c
      8 ** Description: Test server's throughput capability comparing various
      9 **              implementation strategies.
     10 **
     11 ** Note:        Requires a server machine and an arbitrary number of
     12 **              clients to bang on it. Trust the numbers on the server
     13 **              more than those being displayed by the various clients.
     14 */
     15 
     16 #include "prerror.h"
     17 #include "prinrval.h"
     18 #include "prinit.h"
     19 #include "prio.h"
     20 #include "prlock.h"
     21 #include "prmem.h"
     22 #include "prnetdb.h"
     23 #include "prprf.h"
     24 #include "prthread.h"
     25 #include "pprio.h"
     26 #include "plerror.h"
     27 #include "plgetopt.h"
     28 
     29 #define ADDR_BUFFER 100
     30 
     31 #ifdef DEBUG
     32 #  define PORT_INC_DO +100
     33 #else
     34 #  define PORT_INC_DO
     35 #endif
     36 #ifdef IS_64
     37 #  define PORT_INC_3264 +200
     38 #else
     39 #  define PORT_INC_3264
     40 #endif
     41 
     42 #define PORT_NUMBER 51877 PORT_INC_DO PORT_INC_3264
     43 
     44 #define SAMPLING_INTERVAL 10
     45 #define BUFFER_SIZE (32 * 1024)
     46 
     47 static PRInt32 domain = PR_AF_INET;
     48 static PRInt32 protocol = 6; /* TCP */
     49 static PRFileDesc* err = NULL;
     50 static PRIntn concurrency = 1;
     51 static PRInt32 xport_buffer = -1;
     52 static PRUint32 initial_streams = 1;
     53 static PRInt32 buffer_size = BUFFER_SIZE;
     54 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
     55 
     56 typedef struct Shared {
     57  PRLock* ml;
     58  PRUint32 sampled;
     59  PRUint32 threads;
     60  PRIntervalTime timein;
     61  PRNetAddr server_address;
     62 } Shared;
     63 
     64 static Shared* shared = NULL;
     65 
     66 static PRStatus PrintAddress(const PRNetAddr* address) {
     67  char buffer[ADDR_BUFFER];
     68  PRStatus rv = PR_NetAddrToString(address, buffer, sizeof(buffer));
     69  if (PR_SUCCESS == rv) {
     70    PR_fprintf(err, "%s:%u\n", buffer, PR_ntohs(address->inet.port));
     71  } else {
     72    PL_FPrintError(err, "PR_NetAddrToString");
     73  }
     74  return rv;
     75 } /* PrintAddress */
     76 
     77 static void PR_CALLBACK Clientel(void* arg) {
     78  PRStatus rv;
     79  PRFileDesc* xport;
     80  PRInt32 bytes, sampled;
     81  PRIntervalTime now, interval;
     82  PRBool do_display = PR_FALSE;
     83  Shared* shared = (Shared*)arg;
     84  char* buffer = (char*)PR_Malloc(buffer_size);
     85  PRNetAddr* server_address = &shared->server_address;
     86  PRIntervalTime connect_timeout = PR_SecondsToInterval(5);
     87  PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL);
     88 
     89  PR_fprintf(err, "Client connecting to ");
     90  (void)PrintAddress(server_address);
     91 
     92  do {
     93    xport = PR_Socket(domain, PR_SOCK_STREAM, protocol);
     94    if (NULL == xport) {
     95      PL_FPrintError(err, "PR_Socket");
     96      return;
     97    }
     98 
     99    if (xport_buffer != -1) {
    100      PRSocketOptionData data;
    101      data.option = PR_SockOpt_RecvBufferSize;
    102      data.value.recv_buffer_size = (PRSize)xport_buffer;
    103      rv = PR_SetSocketOption(xport, &data);
    104      if (PR_FAILURE == rv) {
    105        PL_FPrintError(err, "PR_SetSocketOption - ignored");
    106      }
    107      data.option = PR_SockOpt_SendBufferSize;
    108      data.value.send_buffer_size = (PRSize)xport_buffer;
    109      rv = PR_SetSocketOption(xport, &data);
    110      if (PR_FAILURE == rv) {
    111        PL_FPrintError(err, "PR_SetSocketOption - ignored");
    112      }
    113    }
    114 
    115    rv = PR_Connect(xport, server_address, connect_timeout);
    116    if (PR_FAILURE == rv) {
    117      PL_FPrintError(err, "PR_Connect");
    118      if (PR_IO_TIMEOUT_ERROR != PR_GetError()) {
    119        PR_Sleep(connect_timeout);
    120      }
    121      PR_Close(xport); /* delete it and start over */
    122    }
    123  } while (PR_FAILURE == rv);
    124 
    125  do {
    126    bytes = PR_Recv(xport, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT);
    127    PR_Lock(shared->ml);
    128    now = PR_IntervalNow();
    129    shared->sampled += bytes;
    130    interval = now - shared->timein;
    131    if (interval > sampling_interval) {
    132      sampled = shared->sampled;
    133      shared->timein = now;
    134      shared->sampled = 0;
    135      do_display = PR_TRUE;
    136    }
    137    PR_Unlock(shared->ml);
    138 
    139    if (do_display) {
    140      PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval);
    141      PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate);
    142      do_display = PR_FALSE;
    143    }
    144 
    145  } while (bytes > 0);
    146 } /* Clientel */
    147 
    148 static void Client(const char* server_name) {
    149  PRStatus rv;
    150  PRHostEnt host;
    151  char buffer[PR_NETDB_BUF_SIZE];
    152  PRIntervalTime dally = PR_SecondsToInterval(60);
    153  PR_fprintf(err, "Translating the name %s\n", server_name);
    154  rv = PR_GetHostByName(server_name, buffer, sizeof(buffer), &host);
    155  if (PR_FAILURE == rv) {
    156    PL_FPrintError(err, "PR_GetHostByName");
    157  } else {
    158    if (PR_EnumerateHostEnt(0, &host, PORT_NUMBER, &shared->server_address) <
    159        0) {
    160      PL_FPrintError(err, "PR_EnumerateHostEnt");
    161    } else {
    162      do {
    163        shared->threads += 1;
    164        (void)PR_CreateThread(PR_USER_THREAD, Clientel, shared,
    165                              PR_PRIORITY_NORMAL, thread_scope,
    166                              PR_UNJOINABLE_THREAD, 8 * 1024);
    167        if (shared->threads == initial_streams) {
    168          PR_Sleep(dally);
    169          initial_streams += 1;
    170        }
    171      } while (PR_TRUE);
    172    }
    173  }
    174 }
    175 
    176 static void PR_CALLBACK Servette(void* arg) {
    177  PRInt32 bytes, sampled;
    178  PRIntervalTime now, interval;
    179  PRBool do_display = PR_FALSE;
    180  PRFileDesc* client = (PRFileDesc*)arg;
    181  char* buffer = (char*)PR_Malloc(buffer_size);
    182  PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL);
    183 
    184  if (xport_buffer != -1) {
    185    PRStatus rv;
    186    PRSocketOptionData data;
    187    data.option = PR_SockOpt_RecvBufferSize;
    188    data.value.recv_buffer_size = (PRSize)xport_buffer;
    189    rv = PR_SetSocketOption(client, &data);
    190    if (PR_FAILURE == rv) {
    191      PL_FPrintError(err, "PR_SetSocketOption - ignored");
    192    }
    193    data.option = PR_SockOpt_SendBufferSize;
    194    data.value.send_buffer_size = (PRSize)xport_buffer;
    195    rv = PR_SetSocketOption(client, &data);
    196    if (PR_FAILURE == rv) {
    197      PL_FPrintError(err, "PR_SetSocketOption - ignored");
    198    }
    199  }
    200 
    201  do {
    202    bytes = PR_Send(client, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT);
    203 
    204    PR_Lock(shared->ml);
    205    now = PR_IntervalNow();
    206    shared->sampled += bytes;
    207    interval = now - shared->timein;
    208    if (interval > sampling_interval) {
    209      sampled = shared->sampled;
    210      shared->timein = now;
    211      shared->sampled = 0;
    212      do_display = PR_TRUE;
    213    }
    214    PR_Unlock(shared->ml);
    215 
    216    if (do_display) {
    217      PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval);
    218      PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate);
    219      do_display = PR_FALSE;
    220    }
    221  } while (bytes > 0);
    222 } /* Servette */
    223 
    224 static void Server(void) {
    225  PRStatus rv;
    226  PRNetAddr server_address, client_address;
    227  PRFileDesc* xport = PR_Socket(domain, PR_SOCK_STREAM, protocol);
    228 
    229  if (NULL == xport) {
    230    PL_FPrintError(err, "PR_Socket");
    231    return;
    232  }
    233 
    234  rv = PR_InitializeNetAddr(PR_IpAddrAny, PORT_NUMBER, &server_address);
    235  if (PR_FAILURE == rv) {
    236    PL_FPrintError(err, "PR_InitializeNetAddr");
    237  } else {
    238    rv = PR_Bind(xport, &server_address);
    239    if (PR_FAILURE == rv) {
    240      PL_FPrintError(err, "PR_Bind");
    241    } else {
    242      PRFileDesc* client;
    243      rv = PR_Listen(xport, 10);
    244      PR_fprintf(err, "Server listening on ");
    245      (void)PrintAddress(&server_address);
    246      do {
    247        client = PR_Accept(xport, &client_address, PR_INTERVAL_NO_TIMEOUT);
    248        if (NULL == client) {
    249          PL_FPrintError(err, "PR_Accept");
    250        } else {
    251          PR_fprintf(err, "Server accepting from ");
    252          (void)PrintAddress(&client_address);
    253          shared->threads += 1;
    254          (void)PR_CreateThread(PR_USER_THREAD, Servette, client,
    255                                PR_PRIORITY_NORMAL, thread_scope,
    256                                PR_UNJOINABLE_THREAD, 8 * 1024);
    257        }
    258      } while (PR_TRUE);
    259    }
    260  }
    261 } /* Server */
    262 
    263 static void Help(void) {
    264  PR_fprintf(err, "Usage: [-h] [<server>]\n");
    265  PR_fprintf(err, "\t-s <n>   Initial # of connections        (default: 1)\n");
    266  PR_fprintf(err, "\t-C <n>   Set 'concurrency'               (default: 1)\n");
    267  PR_fprintf(err,
    268             "\t-b <nK>  Client buffer size              (default: 32k)\n");
    269  PR_fprintf(err,
    270             "\t-B <nK>  Transport recv/send buffer size (default: sys)\n");
    271  PR_fprintf(err,
    272             "\t-G       Use GLOBAL threads              (default: LOCAL)\n");
    273  PR_fprintf(err,
    274             "\t-X       Use XTP transport               (default: TCP)\n");
    275  PR_fprintf(err,
    276             "\t-6       Use IPv6                        (default: IPv4)\n");
    277  PR_fprintf(err, "\t-h       This message and nothing else\n");
    278  PR_fprintf(err, "\t<server> DNS name of server\n");
    279  PR_fprintf(err, "\t\tIf <server> is not specified, this host will be\n");
    280  PR_fprintf(err, "\t\tthe server and not act as a client.\n");
    281 } /* Help */
    282 
    283 int main(int argc, char** argv) {
    284  PLOptStatus os;
    285  const char* server_name = NULL;
    286  PLOptState* opt = PL_CreateOptState(argc, argv, "hGX6C:b:s:B:");
    287 
    288  err = PR_GetSpecialFD(PR_StandardError);
    289 
    290  while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) {
    291    if (PL_OPT_BAD == os) {
    292      continue;
    293    }
    294    switch (opt->option) {
    295      case 0: /* Name of server */
    296        server_name = opt->value;
    297        break;
    298      case 'G': /* Globular threads */
    299        thread_scope = PR_GLOBAL_THREAD;
    300        break;
    301      case 'X': /* Use XTP as the transport */
    302        protocol = 36;
    303        break;
    304      case '6': /* Use IPv6 */
    305        domain = PR_AF_INET6;
    306        break;
    307      case 's': /* initial_streams */
    308        initial_streams = atoi(opt->value);
    309        break;
    310      case 'C': /* concurrency */
    311        concurrency = atoi(opt->value);
    312        break;
    313      case 'b': /* buffer size */
    314        buffer_size = 1024 * atoi(opt->value);
    315        break;
    316      case 'B': /* buffer size */
    317        xport_buffer = 1024 * atoi(opt->value);
    318        break;
    319      case 'h': /* user wants some guidance */
    320      default:
    321        Help();   /* so give him an earful */
    322        return 2; /* but not a lot else */
    323    }
    324  }
    325  PL_DestroyOptState(opt);
    326 
    327  shared = PR_NEWZAP(Shared);
    328  shared->ml = PR_NewLock();
    329 
    330  PR_fprintf(err, "This machine is %s\n",
    331             (NULL == server_name) ? "the SERVER" : "a CLIENT");
    332 
    333  PR_fprintf(err, "Transport being used is %s\n",
    334             (6 == protocol) ? "TCP" : "XTP");
    335 
    336  if (PR_GLOBAL_THREAD == thread_scope) {
    337    if (1 != concurrency) {
    338      PR_fprintf(err, "  **Concurrency > 1 and GLOBAL threads!?!?\n");
    339      PR_fprintf(err, "  **Ignoring concurrency\n");
    340      concurrency = 1;
    341    }
    342  }
    343 
    344  if (1 != concurrency) {
    345    PR_SetConcurrency(concurrency);
    346    PR_fprintf(err, "Concurrency set to %u\n", concurrency);
    347  }
    348 
    349  PR_fprintf(err, "All threads will be %s\n",
    350             (PR_GLOBAL_THREAD == thread_scope) ? "GLOBAL" : "LOCAL");
    351 
    352  PR_fprintf(err, "Client buffer size will be %u\n", buffer_size);
    353 
    354  if (-1 != xport_buffer)
    355    PR_fprintf(err, "Transport send & receive buffer size will be %u\n",
    356               xport_buffer);
    357 
    358  if (NULL == server_name) {
    359    Server();
    360  } else {
    361    Client(server_name);
    362  }
    363 
    364  return 0;
    365 } /* main */
    366 
    367 /* thruput.c */