tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

dispatch_core.c (6244B)


      1 /* Copyright (c) 2001, Matej Pfajfar.
      2 * Copyright (c) 2001-2004, Roger Dingledine.
      3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
      4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
      5 /* See LICENSE for licensing information */
      6 
      7 /**
      8 * \file dispatch_core.c
      9 * \brief Core module for sending and receiving messages.
     10 */
     11 
     12 #define DISPATCH_PRIVATE
     13 #include "orconfig.h"
     14 
     15 #include "lib/dispatch/dispatch.h"
     16 #include "lib/dispatch/dispatch_st.h"
     17 #include "lib/dispatch/dispatch_naming.h"
     18 
     19 #include "lib/malloc/malloc.h"
     20 #include "lib/log/util_bug.h"
     21 
     22 #include <string.h>
     23 
     24 /**
     25 * Use <b>d</b> to drop all storage held for <b>msg</b>.
     26 *
     27 * (We need the dispatcher so we know how to free the auxiliary data.)
     28 **/
     29 void
     30 dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
     31 {
     32  if (!msg)
     33    return;
     34 
     35  d->typefns[msg->type].free_fn(msg->aux_data__);
     36  tor_free(msg);
     37 }
     38 
     39 /**
     40 * Format the auxiliary data held by msg.
     41 **/
     42 char *
     43 dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
     44 {
     45  if (!msg)
     46    return NULL;
     47 
     48  return d->typefns[msg->type].fmt_fn(msg->aux_data__);
     49 }
     50 
     51 /**
     52 * Release all storage held by <b>d</b>.
     53 **/
     54 void
     55 dispatch_free_(dispatch_t *d)
     56 {
     57  if (d == NULL)
     58    return;
     59 
     60  size_t n_queues = d->n_queues;
     61  for (size_t i = 0; i < n_queues; ++i) {
     62    msg_t *m, *mtmp;
     63    TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
     64      dispatch_free_msg(d, m);
     65    }
     66  }
     67 
     68  size_t n_msgs = d->n_msgs;
     69 
     70  for (size_t i = 0; i < n_msgs; ++i) {
     71    tor_free(d->table[i]);
     72  }
     73  tor_free(d->table);
     74  tor_free(d->typefns);
     75  tor_free(d->queues);
     76 
     77  // This is the only time we will treat d->cfg as non-const.
     78  //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
     79 
     80  tor_free(d);
     81 }
     82 
     83 /**
     84 * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
     85 * <b>chan</b> becomes nonempty.  Return 0 on success, -1 on error.
     86 **/
     87 int
     88 dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
     89                      dispatch_alertfn_t fn, void *userdata)
     90 {
     91  if (BUG(chan >= d->n_queues))
     92    return -1;
     93 
     94  dqueue_t *q = &d->queues[chan];
     95  q->alert_fn = fn;
     96  q->alert_fn_arg = userdata;
     97  return 0;
     98 }
     99 
    100 /**
    101 * Send a message on the appropriate channel notifying that channel if
    102 * necessary.
    103 *
    104 * This function takes ownership of the auxiliary data; it can't be static or
    105 * stack-allocated, and the caller is not allowed to use it afterwards.
    106 *
    107 * This function does not check the various vields of the message object for
    108 * consistency.
    109 **/
    110 int
    111 dispatch_send(dispatch_t *d,
    112              subsys_id_t sender,
    113              channel_id_t channel,
    114              message_id_t msg,
    115              msg_type_id_t type,
    116              msg_aux_data_t auxdata)
    117 {
    118  if (!d->table[msg]) {
    119    /* Fast path: nobody wants this data. */
    120 
    121    d->typefns[type].free_fn(auxdata);
    122    return 0;
    123  }
    124 
    125  msg_t *m = tor_malloc(sizeof(msg_t));
    126 
    127  m->sender = sender;
    128  m->channel = channel;
    129  m->msg = msg;
    130  m->type = type;
    131  memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
    132 
    133  return dispatch_send_msg(d, m);
    134 }
    135 
    136 int
    137 dispatch_send_msg(dispatch_t *d, msg_t *m)
    138 {
    139  if (BUG(!d))
    140    goto err;
    141  if (BUG(!m))
    142    goto err;
    143  if (BUG(m->channel >= d->n_queues))
    144    goto err;
    145  if (BUG(m->msg >= d->n_msgs))
    146    goto err;
    147 
    148  dtbl_entry_t *ent = d->table[m->msg];
    149  if (ent) {
    150    if (BUG(m->type != ent->type))
    151      goto err;
    152    if (BUG(m->channel != ent->channel))
    153      goto err;
    154  }
    155 
    156  return dispatch_send_msg_unchecked(d, m);
    157 err:
    158  /* Probably it isn't safe to free m, since type could be wrong. */
    159  return -1;
    160 }
    161 
    162 /**
    163 * Send a message on the appropriate queue, notifying that queue if necessary.
    164 *
    165 * This function takes ownership of the message object and its auxiliary data;
    166 * it can't be static or stack-allocated, and the caller isn't allowed to use
    167 * it afterwards.
    168 *
    169 * This function does not check the various fields of the message object for
    170 * consistency, and can crash if they are out of range.  Only functions that
    171 * have already constructed the message in a safe way, or checked it for
    172 * correctness themselves, should call this function.
    173 **/
    174 int
    175 dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
    176 {
    177  /* Find the right queue. */
    178  dqueue_t *q = &d->queues[m->channel];
    179  bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
    180 
    181  /* Append the message. */
    182  TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
    183 
    184  if (debug_logging_enabled()) {
    185    char *arg = dispatch_fmt_msg_data(d, m);
    186    log_debug(LD_MESG,
    187              "Queued: %s (%s) from %s, on %s.",
    188              get_message_id_name(m->msg),
    189              arg,
    190              get_subsys_id_name(m->sender),
    191              get_channel_id_name(m->channel));
    192    tor_free(arg);
    193  }
    194 
    195  /* If we just made the queue nonempty for the first time, call the alert
    196   * function. */
    197  if (was_empty) {
    198    q->alert_fn(d, m->channel, q->alert_fn_arg);
    199  }
    200 
    201  return 0;
    202 }
    203 
    204 /**
    205 * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
    206 **/
    207 static void
    208 dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
    209 {
    210  tor_assert(m->msg <= d->n_msgs);
    211  dtbl_entry_t *ent = d->table[m->msg];
    212  int n_fns = ent->n_fns;
    213 
    214  if (debug_logging_enabled()) {
    215    char *arg = dispatch_fmt_msg_data(d, m);
    216    log_debug(LD_MESG,
    217              "Delivering: %s (%s) from %s, on %s:",
    218              get_message_id_name(m->msg),
    219              arg,
    220              get_subsys_id_name(m->sender),
    221              get_channel_id_name(m->channel));
    222    tor_free(arg);
    223  }
    224 
    225  int i;
    226  for (i=0; i < n_fns; ++i) {
    227    if (ent->rcv[i].enabled) {
    228      log_debug(LD_MESG, "  Delivering to %s.",
    229                get_subsys_id_name(ent->rcv[i].sys));
    230      ent->rcv[i].fn(m);
    231    }
    232  }
    233 }
    234 
    235 /**
    236 * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
    237 * on the given dispatcher.  Return 0 on success or recoverable failure,
    238 * -1 on unrecoverable error.
    239 **/
    240 int
    241 dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
    242 {
    243  if (BUG(ch >= d->n_queues))
    244    return 0;
    245 
    246  int n_flushed = 0;
    247  dqueue_t *q = &d->queues[ch];
    248 
    249  while (n_flushed < max_msgs) {
    250    msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
    251    if (!m)
    252      break;
    253    TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
    254    dispatcher_run_msg_cbs(d, m);
    255    dispatch_free_msg(d, m);
    256    ++n_flushed;
    257  }
    258 
    259  return 0;
    260 }