tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

thrpool_server.c (14642B)


      1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this
      4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 /***********************************************************************
      7 **
** Name: thrpool_server.c
      9 **
     10 ** Description: Test threadpool functionality.
     11 **
     12 ** Modification History:
     13 */
     14 #include "primpl.h"
     15 
     16 #include "plgetopt.h"
     17 
     18 #include <stdio.h>
     19 #include <string.h>
     20 #include <errno.h>
     21 #ifdef XP_UNIX
     22 #  include <sys/mman.h>
     23 #endif
     24 #if defined(_PR_PTHREADS)
     25 #  include <pthread.h>
     26 #endif
     27 
     28 /* for getcwd */
     29 #if defined(XP_UNIX)
     30 #  include <unistd.h>
     31 #elif defined(XP_PC)
     32 #  include <direct.h>
     33 #endif
     34 
     35 #ifdef WIN32
     36 #  include <process.h>
     37 #endif
     38 
     39 static int _debug_on = 0;
     40 static char* program_name = NULL;
     41 static void serve_client_write(void* arg);
     42 
     43 #include "obsolete/prsem.h"
     44 
     45 #ifdef XP_PC
     46 #  define mode_t int
     47 #endif
     48 
     49 #define DPRINTF(arg) \
     50  if (_debug_on) printf arg
     51 
     52 #define BUF_DATA_SIZE (2 * 1024)
     53 #define TCP_MESG_SIZE 1024
     54 #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */
     55 
     56 #define NUM_TCP_CONNECTIONS_PER_CLIENT 10
     57 #define NUM_TCP_MESGS_PER_CONNECTION 10
     58 #define TCP_SERVER_PORT 10000
     59 #define SERVER_MAX_BIND_COUNT 100
     60 
     61 #ifdef WINCE
     62 char* getcwd(char* buf, size_t size) {
     63  wchar_t wpath[MAX_PATH];
     64  _wgetcwd(wpath, MAX_PATH);
     65  WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
     66 }
     67 
     68 #  define perror(s)
     69 #endif
     70 
     71 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
     72 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
     73 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
     74 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
     75 static void TCP_Server_Accept(void* arg);
     76 
     77 int failed_already = 0;
     78 typedef struct buffer {
     79  char data[BUF_DATA_SIZE];
     80 } buffer;
     81 
     82 typedef struct Server_Param {
     83  PRJobIoDesc iod; /* socket to read from/write to    */
     84  PRInt32 datalen; /* bytes of data transfered in each read/write */
     85  PRNetAddr netaddr;
     86  PRMonitor* exit_mon;   /* monitor to signal on exit            */
     87  PRInt32* job_counterp; /* counter to decrement, before exit        */
     88  PRInt32 conn_counter;  /* counter to decrement, before exit        */
     89  PRThreadPool* tp;
     90 } Server_Param;
     91 
     92 typedef struct Serve_Client_Param {
     93  PRJobIoDesc iod;       /* socket to read from/write to    */
     94  PRInt32 datalen;       /* bytes of data transfered in each read/write */
     95  PRMonitor* exit_mon;   /* monitor to signal on exit            */
     96  PRInt32* job_counterp; /* counter to decrement, before exit        */
     97  PRThreadPool* tp;
     98 } Serve_Client_Param;
     99 
    100 typedef struct Session {
    101  PRJobIoDesc iod; /* socket to read from/write to    */
    102  buffer* in_buf;
    103  PRInt32 bytes;
    104  PRInt32 msg_num;
    105  PRInt32 bytes_read;
    106  PRMonitor* exit_mon;   /* monitor to signal on exit            */
    107  PRInt32* job_counterp; /* counter to decrement, before exit        */
    108  PRThreadPool* tp;
    109 } Session;
    110 
    111 static void serve_client_read(void* arg) {
    112  Session* sp = (Session*)arg;
    113  int rem;
    114  int bytes;
    115  int offset;
    116  PRFileDesc* sockfd;
    117  char* buf;
    118  PRJob* jobp;
    119 
    120  PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
    121 
    122  sockfd = sp->iod.socket;
    123  buf = sp->in_buf->data;
    124 
    125  PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
    126  PR_ASSERT(sp->bytes_read < sp->bytes);
    127 
    128  offset = sp->bytes_read;
    129  rem = sp->bytes - offset;
    130  bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
    131  if (bytes < 0) {
    132    return;
    133  }
    134  sp->bytes_read += bytes;
    135  sp->iod.timeout = PR_SecondsToInterval(60);
    136  if (sp->bytes_read < sp->bytes) {
    137    jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE);
    138    PR_ASSERT(NULL != jobp);
    139    return;
    140  }
    141  PR_ASSERT(sp->bytes_read == sp->bytes);
    142  DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
    143 
    144  sp->iod.timeout = PR_SecondsToInterval(60);
    145  jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp, PR_FALSE);
    146  PR_ASSERT(NULL != jobp);
    147 
    148  return;
    149 }
    150 
    151 static void serve_client_write(void* arg) {
    152  Session* sp = (Session*)arg;
    153  int bytes;
    154  PRFileDesc* sockfd;
    155  char* buf;
    156  PRJob* jobp;
    157 
    158  sockfd = sp->iod.socket;
    159  buf = sp->in_buf->data;
    160 
    161  PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
    162 
    163  bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
    164  PR_ASSERT(bytes == sp->bytes);
    165 
    166  if (bytes < 0) {
    167    return;
    168  }
    169  DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
    170  sp->msg_num++;
    171  if (sp->msg_num < num_tcp_mesgs_per_connection) {
    172    sp->bytes_read = 0;
    173    sp->iod.timeout = PR_SecondsToInterval(60);
    174    jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE);
    175    PR_ASSERT(NULL != jobp);
    176    return;
    177  }
    178 
    179  DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
    180  if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
    181    fprintf(stderr, "%s: ERROR - PR_Shutdown\n", program_name);
    182  }
    183 
    184  PR_Close(sockfd);
    185  PR_EnterMonitor(sp->exit_mon);
    186  --(*sp->job_counterp);
    187  PR_Notify(sp->exit_mon);
    188  PR_ExitMonitor(sp->exit_mon);
    189 
    190  PR_DELETE(sp->in_buf);
    191  PR_DELETE(sp);
    192 
    193  return;
    194 }
    195 
    196 /*
    197 * Serve_Client
    198 *    Thread, started by the server, for serving a client connection.
    199 *    Reads data from socket and writes it back, unmodified, and
    200 *    closes the socket
    201 */
    202 static void PR_CALLBACK Serve_Client(void* arg) {
    203  Serve_Client_Param* scp = (Serve_Client_Param*)arg;
    204  buffer* in_buf;
    205  Session* sp;
    206  PRJob* jobp;
    207 
    208  sp = PR_NEW(Session);
    209  sp->iod = scp->iod;
    210 
    211  in_buf = PR_NEW(buffer);
    212  if (in_buf == NULL) {
    213    fprintf(stderr, "%s: failed to alloc buffer struct\n", program_name);
    214    failed_already = 1;
    215    return;
    216  }
    217 
    218  sp->in_buf = in_buf;
    219  sp->bytes = scp->datalen;
    220  sp->msg_num = 0;
    221  sp->bytes_read = 0;
    222  sp->tp = scp->tp;
    223  sp->exit_mon = scp->exit_mon;
    224  sp->job_counterp = scp->job_counterp;
    225 
    226  sp->iod.timeout = PR_SecondsToInterval(60);
    227  jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE);
    228  PR_ASSERT(NULL != jobp);
    229  PR_DELETE(scp);
    230 }
    231 
    232 static void print_stats(void* arg) {
    233  Server_Param* sp = (Server_Param*)arg;
    234  PRThreadPool* tp = sp->tp;
    235  PRInt32 counter;
    236  PRJob* jobp;
    237 
    238  PR_EnterMonitor(sp->exit_mon);
    239  counter = (*sp->job_counterp);
    240  PR_ExitMonitor(sp->exit_mon);
    241 
    242  printf("PRINT_STATS: #client connections = %d\n", counter);
    243 
    244  jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp,
    245                           PR_FALSE);
    246 
    247  PR_ASSERT(NULL != jobp);
    248 }
    249 
    250 static int job_counter = 0;
    251 /*
    252 * TCP Server
    253 *    Server binds an address to a socket, starts a client process and
    254 *    listens for incoming connections.
    255 *    Each client connects to the server and sends a chunk of data
    256 *    Starts a Serve_Client job for each incoming connection, to read
    257 *    the data from the client and send it back to the client, unmodified.
    258 *    Each client checks that data received from server is same as the
    259 *    data it sent to the server.
    260 *    Finally, the threadpool is shutdown
    261 */
    262 static void PR_CALLBACK TCP_Server(void* arg) {
    263  PRThreadPool* tp = (PRThreadPool*)arg;
    264  Server_Param* sp;
    265  PRFileDesc* sockfd;
    266  PRNetAddr netaddr;
    267  PRMonitor* sc_mon;
    268  PRJob* jobp;
    269  int i;
    270  PRStatus rval;
    271 
    272  /*
    273   * Create a tcp socket
    274   */
    275  if ((sockfd = PR_NewTCPSocket()) == NULL) {
    276    fprintf(stderr, "%s: PR_NewTCPSocket failed\n", program_name);
    277    return;
    278  }
    279  memset(&netaddr, 0, sizeof(netaddr));
    280  netaddr.inet.family = PR_AF_INET;
    281  netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
    282  netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
    283  /*
    284   * try a few times to bind server's address, if addresses are in
    285   * use
    286   */
    287  i = 0;
    288  while (PR_Bind(sockfd, &netaddr) < 0) {
    289    if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
    290      netaddr.inet.port += 2;
    291      if (i++ < SERVER_MAX_BIND_COUNT) {
    292        continue;
    293      }
    294    }
    295    fprintf(stderr, "%s: ERROR - PR_Bind failed\n", program_name);
    296    perror("PR_Bind");
    297    failed_already = 1;
    298    return;
    299  }
    300 
    301  if (PR_Listen(sockfd, 32) < 0) {
    302    fprintf(stderr, "%s: ERROR - PR_Listen failed\n", program_name);
    303    failed_already = 1;
    304    return;
    305  }
    306 
    307  if (PR_GetSockName(sockfd, &netaddr) < 0) {
    308    fprintf(stderr, "%s: ERROR - PR_GetSockName failed\n", program_name);
    309    failed_already = 1;
    310    return;
    311  }
    312 
    313  DPRINTF(
    314      ("TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
    315       netaddr.inet.ip, netaddr.inet.port));
    316 
    317  sp = PR_NEW(Server_Param);
    318  if (sp == NULL) {
    319    fprintf(stderr, "%s: PR_NEW failed\n", program_name);
    320    failed_already = 1;
    321    return;
    322  }
    323  sp->iod.socket = sockfd;
    324  sp->iod.timeout = PR_SecondsToInterval(60);
    325  sp->datalen = tcp_mesg_size;
    326  sp->exit_mon = sc_mon;
    327  sp->job_counterp = &job_counter;
    328  sp->conn_counter = 0;
    329  sp->tp = tp;
    330  sp->netaddr = netaddr;
    331 
    332  /* create and cancel an io job */
    333  jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE);
    334  PR_ASSERT(NULL != jobp);
    335  rval = PR_CancelJob(jobp);
    336  PR_ASSERT(PR_SUCCESS == rval);
    337 
    338  /*
    339   * create the client process
    340   */
    341  {
    342 #define MAX_ARGS 4
    343    char* argv[MAX_ARGS + 1];
    344    int index = 0;
    345    char port[32];
    346    char path[1024 + sizeof("/thrpool_client")];
    347 
    348    getcwd(path, sizeof(path));
    349 
    350    (void)strcat(path, "/thrpool_client");
    351 #ifdef XP_PC
    352    (void)strcat(path, ".exe");
    353 #endif
    354    argv[index++] = path;
    355    sprintf(port, "%d", PR_ntohs(netaddr.inet.port));
    356    if (_debug_on) {
    357      argv[index++] = "-d";
    358      argv[index++] = "-p";
    359      argv[index++] = port;
    360      argv[index++] = NULL;
    361    } else {
    362      argv[index++] = "-p";
    363      argv[index++] = port;
    364      argv[index++] = NULL;
    365    }
    366    PR_ASSERT(MAX_ARGS >= (index - 1));
    367 
    368    DPRINTF(("creating client process %s ...\n", path));
    369    if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
    370      fprintf(stderr,
    371              "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
    372      failed_already = 1;
    373      return;
    374    }
    375  }
    376 
    377  sc_mon = PR_NewMonitor();
    378  if (sc_mon == NULL) {
    379    fprintf(stderr, "%s: PR_NewMonitor failed\n", program_name);
    380    failed_already = 1;
    381    return;
    382  }
    383 
    384  sp->iod.socket = sockfd;
    385  sp->iod.timeout = PR_SecondsToInterval(60);
    386  sp->datalen = tcp_mesg_size;
    387  sp->exit_mon = sc_mon;
    388  sp->job_counterp = &job_counter;
    389  sp->conn_counter = 0;
    390  sp->tp = tp;
    391  sp->netaddr = netaddr;
    392 
    393  /* create and cancel a timer job */
    394  jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000), print_stats, sp,
    395                           PR_FALSE);
    396  PR_ASSERT(NULL != jobp);
    397  rval = PR_CancelJob(jobp);
    398  PR_ASSERT(PR_SUCCESS == rval);
    399 
    400  DPRINTF(("TCP_Server: Accepting connections \n"));
    401 
    402  jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE);
    403  PR_ASSERT(NULL != jobp);
    404  return;
    405 }
    406 
    407 static void TCP_Server_Accept(void* arg) {
    408  Server_Param* sp = (Server_Param*)arg;
    409  PRThreadPool* tp = sp->tp;
    410  Serve_Client_Param* scp;
    411  PRFileDesc* newsockfd;
    412  PRJob* jobp;
    413 
    414  if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
    415                             PR_INTERVAL_NO_TIMEOUT)) == NULL) {
    416    fprintf(stderr, "%s: ERROR - PR_Accept failed\n", program_name);
    417    failed_already = 1;
    418    goto exit;
    419  }
    420  scp = PR_NEW(Serve_Client_Param);
    421  if (scp == NULL) {
    422    fprintf(stderr, "%s: PR_NEW failed\n", program_name);
    423    failed_already = 1;
    424    goto exit;
    425  }
    426 
    427  /*
    428   * Start a Serve_Client job for each incoming connection
    429   */
    430  scp->iod.socket = newsockfd;
    431  scp->iod.timeout = PR_SecondsToInterval(60);
    432  scp->datalen = tcp_mesg_size;
    433  scp->exit_mon = sp->exit_mon;
    434  scp->job_counterp = sp->job_counterp;
    435  scp->tp = sp->tp;
    436 
    437  PR_EnterMonitor(sp->exit_mon);
    438  (*sp->job_counterp)++;
    439  PR_ExitMonitor(sp->exit_mon);
    440  jobp = PR_QueueJob(tp, Serve_Client, scp, PR_FALSE);
    441 
    442  PR_ASSERT(NULL != jobp);
    443  DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
    444 
    445  /*
    446   * single-threaded update; no lock needed
    447   */
    448  sp->conn_counter++;
    449  if (sp->conn_counter < (num_tcp_clients * num_tcp_connections_per_client)) {
    450    jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE);
    451    PR_ASSERT(NULL != jobp);
    452    return;
    453  }
    454  jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp,
    455                           PR_FALSE);
    456 
    457  PR_ASSERT(NULL != jobp);
    458  DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
    459 
    460 exit:
    461  PR_EnterMonitor(sp->exit_mon);
    462  /* Wait for server jobs to finish */
    463  while (0 != *sp->job_counterp) {
    464    PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
    465    DPRINTF(("TCP_Server: conn_counter = %d\n", *sp->job_counterp));
    466  }
    467 
    468  PR_ExitMonitor(sp->exit_mon);
    469  if (sp->iod.socket) {
    470    PR_Close(sp->iod.socket);
    471  }
    472  PR_DestroyMonitor(sp->exit_mon);
    473  printf("%30s", "TCP_Socket_Client_Server_Test:");
    474  printf("%2ld Server %2ld Clients %2ld connections_per_client\n", 1l,
    475         num_tcp_clients, num_tcp_connections_per_client);
    476  printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n", ":",
    477         num_tcp_mesgs_per_connection, tcp_mesg_size);
    478 
    479  DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
    480  PR_ShutdownThreadPool(sp->tp);
    481  PR_DELETE(sp);
    482 }
    483 
    484 /************************************************************************/
    485 
    486 #define DEFAULT_INITIAL_THREADS 4
    487 #define DEFAULT_MAX_THREADS 100
    488 #define DEFAULT_STACKSIZE (512 * 1024)
    489 
    490 int main(int argc, char** argv) {
    491  PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
    492  PRInt32 max_threads = DEFAULT_MAX_THREADS;
    493  PRInt32 stacksize = DEFAULT_STACKSIZE;
    494  PRThreadPool* tp = NULL;
    495  PRStatus rv;
    496  PRJob* jobp;
    497 
    498  /*
    499   * -d           debug mode
    500   */
    501  PLOptStatus os;
    502  PLOptState* opt;
    503 
    504  program_name = argv[0];
    505  opt = PL_CreateOptState(argc, argv, "d");
    506  while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) {
    507    if (PL_OPT_BAD == os) {
    508      continue;
    509    }
    510    switch (opt->option) {
    511      case 'd': /* debug mode */
    512        _debug_on = 1;
    513        break;
    514      default:
    515        break;
    516    }
    517  }
    518  PL_DestroyOptState(opt);
    519 
    520  PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
    521 
    522  PR_SetConcurrency(4);
    523 
    524  tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
    525  if (NULL == tp) {
    526    printf("PR_CreateThreadPool failed\n");
    527    failed_already = 1;
    528    goto done;
    529  }
    530  jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
    531  rv = PR_JoinJob(jobp);
    532  PR_ASSERT(PR_SUCCESS == rv);
    533 
    534  DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
    535  rv = PR_JoinThreadPool(tp);
    536  PR_ASSERT(PR_SUCCESS == rv);
    537  DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
    538 
    539 done:
    540  PR_Cleanup();
    541  if (failed_already) {
    542    return 1;
    543  } else {
    544    return 0;
    545  }
    546 }