tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

mainloop_pubsub.c (4565B)


      1 /* Copyright (c) 2001, Matej Pfajfar.
      2 * Copyright (c) 2001-2004, Roger Dingledine.
      3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
      4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
      5 /* See LICENSE for licensing information */
      6 
      7 /**
      8 * @file mainloop_pubsub.c
      9 * @brief Connect the publish-subscribe code to the main-loop.
     10 *
     11 * This module is responsible for instantiating all the channels used by the
     12 * publish-subscribe code, and making sure that each one's messages are
     13 * processed when appropriate.
     14 **/
     15 
     16 #include "orconfig.h"
     17 
     18 #include "core/or/or.h"
     19 #include "core/mainloop/mainloop.h"
     20 #include "core/mainloop/mainloop_pubsub.h"
     21 
     22 #include "lib/container/smartlist.h"
     23 #include "lib/dispatch/dispatch.h"
     24 #include "lib/dispatch/dispatch_naming.h"
     25 #include "lib/evloop/compat_libevent.h"
     26 #include "lib/pubsub/pubsub.h"
     27 #include "lib/pubsub/pubsub_build.h"
     28 
     29 /**
     30 * Dispatcher to use for delivering messages.
     31 **/
     32 static dispatch_t *the_dispatcher = NULL;
     33 static pubsub_items_t *the_pubsub_items = NULL;
     34 /**
     35 * A list of mainloop_event_t, indexed by channel ID, to flush the messages
     36 * on a channel.
     37 **/
     38 static smartlist_t *alert_events = NULL;
     39 
     40 /**
     41 * Mainloop event callback: flush all the messages in a channel.
     42 *
     43 * The channel is encoded as a pointer, and passed via arg.
     44 **/
     45 static void
     46 flush_channel_event(mainloop_event_t *ev, void *arg)
     47 {
     48  (void)ev;
     49  if (!the_dispatcher)
     50    return;
     51 
     52  channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
     53  dispatch_flush(the_dispatcher, chan, INT_MAX);
     54 }
     55 
     56 /**
     57 * Construct our global pubsub object from <b>builder</b>. Return 0 on
     58 * success, -1 on failure. */
     59 int
     60 tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
     61 {
     62  int rv = -1;
     63  tor_mainloop_disconnect_pubsub();
     64 
     65  the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
     66  if (! the_dispatcher)
     67    goto err;
     68 
     69  rv = 0;
     70  goto done;
     71 err:
     72  tor_mainloop_disconnect_pubsub();
     73 done:
     74  return rv;
     75 }
     76 
     77 /**
     78 * Install libevent events for all of the pubsub channels.
     79 *
     80 * Invoke this after tor_mainloop_connect_pubsub, and after libevent has been
     81 * initialized.
     82 */
     83 void
     84 tor_mainloop_connect_pubsub_events(void)
     85 {
     86  tor_assert(the_dispatcher);
     87  tor_assert(! alert_events);
     88 
     89  const size_t num_channels = get_num_channel_ids();
     90  alert_events = smartlist_new();
     91  for (size_t i = 0; i < num_channels; ++i) {
     92    smartlist_add(alert_events,
     93                  mainloop_event_postloop_new(flush_channel_event,
     94                                              (void*)(uintptr_t)(i)));
     95  }
     96 }
     97 
     98 /**
     99 * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
    100 **/
    101 static void
    102 alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
    103 {
    104  (void)d;
    105  (void)chan;
    106  (void)arg;
    107 }
    108 
    109 /**
    110 * Dispatch alertfn callback: activate a mainloop event. Implements
    111 * DELIV_PROMPT.
    112 **/
    113 static void
    114 alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
    115 {
    116  (void)d;
    117  (void)chan;
    118  mainloop_event_t *event = arg;
    119  mainloop_event_activate(event);
    120 }
    121 
    122 /**
    123 * Dispatch alertfn callback: flush all messages right now. Implements
    124 * DELIV_IMMEDIATE.
    125 **/
    126 static void
    127 alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
    128 {
    129  (void) arg;
    130  dispatch_flush(d, chan, INT_MAX);
    131 }
    132 
    133 /**
    134 * Set the strategy to be used for delivering messages on the named channel.
    135 *
    136 * This function needs to be called once globally for each channel, to
    137 * set up how messages are delivered.
    138 **/
    139 int
    140 tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
    141                                   deliv_strategy_t strategy)
    142 {
    143  channel_id_t chan = get_channel_id(msg_channel_name);
    144  if (BUG(chan == ERROR_ID) ||
    145      BUG(chan >= smartlist_len(alert_events)))
    146    return -1;
    147 
    148  switch (strategy) {
    149    case DELIV_NEVER:
    150      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
    151      break;
    152    case DELIV_PROMPT:
    153      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
    154                            smartlist_get(alert_events, chan));
    155      break;
    156    case DELIV_IMMEDIATE:
    157      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
    158      break;
    159  }
    160  return 0;
    161 }
    162 
    163 /**
    164 * Remove all pubsub dispatchers and events from the mainloop.
    165 **/
    166 void
    167 tor_mainloop_disconnect_pubsub(void)
    168 {
    169  if (the_pubsub_items) {
    170    pubsub_items_clear_bindings(the_pubsub_items);
    171    pubsub_items_free(the_pubsub_items);
    172  }
    173  if (alert_events) {
    174    SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
    175                      mainloop_event_free(ev));
    176    smartlist_free(alert_events);
    177  }
    178  dispatch_free(the_dispatcher);
    179 }