tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

pubsub_build.c (7907B)


      1 /* Copyright (c) 2001, Matej Pfajfar.
      2 * Copyright (c) 2001-2004, Roger Dingledine.
      3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
      4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
      5 /* See LICENSE for licensing information */
      6 
      7 /**
      8 * @file pubsub_build.c
      9 * @brief Construct a dispatch_t in a safer, more OO way.
     10 **/
     11 
     12 #define PUBSUB_PRIVATE
     13 
     14 #include "lib/dispatch/dispatch.h"
     15 #include "lib/dispatch/dispatch_cfg.h"
     16 #include "lib/dispatch/dispatch_naming.h"
     17 #include "lib/dispatch/msgtypes.h"
     18 #include "lib/pubsub/pubsub_flags.h"
     19 #include "lib/pubsub/pub_binding_st.h"
     20 #include "lib/pubsub/pubsub_build.h"
     21 #include "lib/pubsub/pubsub_builder_st.h"
     22 #include "lib/pubsub/pubsub_connect.h"
     23 
     24 #include "lib/container/smartlist.h"
     25 #include "lib/log/util_bug.h"
     26 #include "lib/malloc/malloc.h"
     27 
     28 #include <string.h>
     29 
     30 /** Construct and return a new empty pubsub_items_t. */
     31 static pubsub_items_t *
     32 pubsub_items_new(void)
     33 {
     34  pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
     35  cfg->items = smartlist_new();
     36  cfg->type_items = smartlist_new();
     37  return cfg;
     38 }
     39 
     40 /** Release all storage held in a pubsub_items_t. */
     41 void
     42 pubsub_items_free_(pubsub_items_t *cfg)
     43 {
     44  if (! cfg)
     45    return;
     46  SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
     47  SMARTLIST_FOREACH(cfg->type_items,
     48                    pubsub_type_cfg_t *, item, tor_free(item));
     49  smartlist_free(cfg->items);
     50  smartlist_free(cfg->type_items);
     51  tor_free(cfg);
     52 }
     53 
     54 /** Construct and return a new pubsub_builder_t. */
     55 pubsub_builder_t *
     56 pubsub_builder_new(void)
     57 {
     58  dispatch_naming_init();
     59 
     60  pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
     61  pb->cfg = dcfg_new();
     62  pb->items = pubsub_items_new();
     63  return pb;
     64 }
     65 
     66 /**
     67 * Release all storage held by a pubsub_builder_t.
     68 *
     69 * You'll (mostly) only want to call this function on an error case: if you're
     70 * constructing a dispatch_t instead, you should call
     71 * pubsub_builder_finalize() to consume the pubsub_builder_t.
     72 */
     73 void
     74 pubsub_builder_free_(pubsub_builder_t *pb)
     75 {
     76  if (pb == NULL)
     77    return;
     78  pubsub_items_free(pb->items);
     79  dcfg_free(pb->cfg);
     80  tor_free(pb);
     81 }
     82 
     83 /**
     84 * Create and return a pubsub_connector_t for the subsystem with ID
     85 * <b>subsys</b> to use in adding publications, subscriptions, and types to
     86 * <b>builder</b>.
     87 **/
     88 pubsub_connector_t *
     89 pubsub_connector_for_subsystem(pubsub_builder_t *builder,
     90                               subsys_id_t subsys)
     91 {
     92  tor_assert(builder);
     93  ++builder->n_connectors;
     94 
     95  pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
     96 
     97  con->builder = builder;
     98  con->subsys_id = subsys;
     99 
    100  return con;
    101 }
    102 
    103 /**
    104 * Release all storage held by a pubsub_connector_t.
    105 **/
    106 void
    107 pubsub_connector_free_(pubsub_connector_t *con)
    108 {
    109  if (!con)
    110    return;
    111 
    112  if (con->builder) {
    113    --con->builder->n_connectors;
    114    tor_assert(con->builder->n_connectors >= 0);
    115  }
    116  tor_free(con);
    117 }
    118 
    119 /**
    120 * Use <b>con</b> to add a request for being able to publish messages of type
    121 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
    122 **/
    123 int
    124 pubsub_add_pub_(pubsub_connector_t *con,
    125                pub_binding_t *out,
    126                channel_id_t channel,
    127                message_id_t msg,
    128                msg_type_id_t type,
    129                unsigned flags,
    130                const char *file,
    131                unsigned line)
    132 {
    133  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
    134 
    135  memset(out, 0, sizeof(*out));
    136  cfg->is_publish = true;
    137 
    138  out->msg_template.sender = cfg->subsys = con->subsys_id;
    139  out->msg_template.channel = cfg->channel = channel;
    140  out->msg_template.msg = cfg->msg = msg;
    141  out->msg_template.type = cfg->type = type;
    142 
    143  cfg->flags = flags;
    144  cfg->added_by_file = file;
    145  cfg->added_by_line = line;
    146 
    147  /* We're grabbing a pointer to the pub_binding_t so we can tell it about
    148   * the dispatcher later on.
    149   */
    150  cfg->pub_binding = out;
    151 
    152  smartlist_add(con->builder->items->items, cfg);
    153 
    154  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
    155    goto err;
    156  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
    157    goto err;
    158 
    159  return 0;
    160 err:
    161  ++con->builder->n_errors;
    162  return -1;
    163 }
    164 
    165 /**
    166 * Use <b>con</b> to add a request for being able to publish messages of type
    167 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
    168 * passing them to the callback in <b>recv_fn</b>.
    169 **/
    170 int
    171 pubsub_add_sub_(pubsub_connector_t *con,
    172                recv_fn_t recv_fn,
    173                channel_id_t channel,
    174                message_id_t msg,
    175                msg_type_id_t type,
    176                unsigned flags,
    177                const char *file,
    178                unsigned line)
    179 {
    180  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
    181 
    182  cfg->is_publish = false;
    183  cfg->subsys = con->subsys_id;
    184  cfg->channel = channel;
    185  cfg->msg = msg;
    186  cfg->type = type;
    187  cfg->flags = flags;
    188  cfg->added_by_file = file;
    189  cfg->added_by_line = line;
    190 
    191  cfg->recv_fn = recv_fn;
    192 
    193  smartlist_add(con->builder->items->items, cfg);
    194 
    195  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
    196    goto err;
    197  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
    198    goto err;
    199  if (! (flags & DISP_FLAG_STUB)) {
    200    if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
    201      goto err;
    202  }
    203 
    204  return 0;
    205 err:
    206  ++con->builder->n_errors;
    207  return -1;
    208 }
    209 
    210 /**
    211 * Use <b>con</b> to define the functions to use for manipulating the type
    212 * <b>type</b>.  Any function pointers left as NULL will be implemented as
    213 * no-ops.
    214 **/
    215 int
    216 pubsub_connector_register_type_(pubsub_connector_t *con,
    217                                msg_type_id_t type,
    218                                dispatch_typefns_t *fns,
    219                                const char *file,
    220                                unsigned line)
    221 {
    222  pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
    223  cfg->type = type;
    224  memcpy(&cfg->fns, fns, sizeof(*fns));
    225  cfg->subsys = con->subsys_id;
    226  cfg->added_by_file = file;
    227  cfg->added_by_line = line;
    228 
    229  smartlist_add(con->builder->items->type_items, cfg);
    230 
    231  if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
    232    goto err;
    233 
    234  return 0;
    235 err:
    236  ++con->builder->n_errors;
    237  return -1;
    238 }
    239 
    240 /**
    241 * Initialize the dispatch_ptr field in every relevant publish binding
    242 * for <b>d</b>.
    243 */
    244 static void
    245 pubsub_items_install_bindings(pubsub_items_t *items,
    246                              dispatch_t *d)
    247 {
    248  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
    249    if (cfg->pub_binding) {
    250      // XXXX we could skip this for STUB publishers, and for any publishers
    251      // XXXX where all subscribers are STUB.
    252      cfg->pub_binding->dispatch_ptr = d;
    253    }
    254  } SMARTLIST_FOREACH_END(cfg);
    255 }
    256 
    257 /**
    258 * Remove the dispatch_ptr fields for all the relevant publish bindings
    259 * in <b>items</b>.  The prevents subsequent dispatch_pub_() calls from
    260 * sending messages to a dispatcher that has been freed.
    261 **/
    262 void
    263 pubsub_items_clear_bindings(pubsub_items_t *items)
    264 {
    265  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
    266    if (cfg->pub_binding) {
    267      cfg->pub_binding->dispatch_ptr = NULL;
    268    }
    269  } SMARTLIST_FOREACH_END(cfg);
    270 }
    271 
    272 /**
    273 * Create a new dispatcher as configured in a pubsub_builder_t.
    274 *
    275 * Consumes and frees its input.
    276 **/
    277 dispatch_t *
    278 pubsub_builder_finalize(pubsub_builder_t *builder,
    279                        pubsub_items_t **items_out)
    280 {
    281  dispatch_t *dispatcher = NULL;
    282  tor_assert_nonfatal(builder->n_connectors == 0);
    283 
    284  if (pubsub_builder_check(builder) < 0)
    285    goto err;
    286 
    287  if (builder->n_errors) {
    288    log_warn(LD_GENERAL, "At least one error occurred previously when "
    289             "configuring the dispatcher.");
    290    goto err;
    291  }
    292 
    293  dispatcher = dispatch_new(builder->cfg);
    294 
    295  if (!dispatcher)
    296    goto err;
    297 
    298  pubsub_items_install_bindings(builder->items, dispatcher);
    299  if (items_out) {
    300    *items_out = builder->items;
    301    builder->items = NULL; /* Prevent free */
    302  }
    303 
    304 err:
    305  pubsub_builder_free(builder);
    306  return dispatcher;
    307 }