pubsub_build.c (7907B)
1 /* Copyright (c) 2001, Matej Pfajfar. 2 * Copyright (c) 2001-2004, Roger Dingledine. 3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 4 * Copyright (c) 2007-2021, The Tor Project, Inc. */ 5 /* See LICENSE for licensing information */ 6 7 /** 8 * @file pubsub_build.c 9 * @brief Construct a dispatch_t in safer, more OO way. 10 **/ 11 12 #define PUBSUB_PRIVATE 13 14 #include "lib/dispatch/dispatch.h" 15 #include "lib/dispatch/dispatch_cfg.h" 16 #include "lib/dispatch/dispatch_naming.h" 17 #include "lib/dispatch/msgtypes.h" 18 #include "lib/pubsub/pubsub_flags.h" 19 #include "lib/pubsub/pub_binding_st.h" 20 #include "lib/pubsub/pubsub_build.h" 21 #include "lib/pubsub/pubsub_builder_st.h" 22 #include "lib/pubsub/pubsub_connect.h" 23 24 #include "lib/container/smartlist.h" 25 #include "lib/log/util_bug.h" 26 #include "lib/malloc/malloc.h" 27 28 #include <string.h> 29 30 /** Construct and return a new empty pubsub_items_t. */ 31 static pubsub_items_t * 32 pubsub_items_new(void) 33 { 34 pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg)); 35 cfg->items = smartlist_new(); 36 cfg->type_items = smartlist_new(); 37 return cfg; 38 } 39 40 /** Release all storage held in a pubsub_items_t. */ 41 void 42 pubsub_items_free_(pubsub_items_t *cfg) 43 { 44 if (! cfg) 45 return; 46 SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item)); 47 SMARTLIST_FOREACH(cfg->type_items, 48 pubsub_type_cfg_t *, item, tor_free(item)); 49 smartlist_free(cfg->items); 50 smartlist_free(cfg->type_items); 51 tor_free(cfg); 52 } 53 54 /** Construct and return a new pubsub_builder_t. */ 55 pubsub_builder_t * 56 pubsub_builder_new(void) 57 { 58 dispatch_naming_init(); 59 60 pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb)); 61 pb->cfg = dcfg_new(); 62 pb->items = pubsub_items_new(); 63 return pb; 64 } 65 66 /** 67 * Release all storage held by a pubsub_builder_t. 68 * 69 * You'll (mostly) only want to call this function on an error case: if you're 70 * constructing a dispatch_t instead, you should call 71 * pubsub_builder_finalize() to consume the pubsub_builder_t. 72 */ 73 void 74 pubsub_builder_free_(pubsub_builder_t *pb) 75 { 76 if (pb == NULL) 77 return; 78 pubsub_items_free(pb->items); 79 dcfg_free(pb->cfg); 80 tor_free(pb); 81 } 82 83 /** 84 * Create and return a pubsub_connector_t for the subsystem with ID 85 * <b>subsys</b> to use in adding publications, subscriptions, and types to 86 * <b>builder</b>. 87 **/ 88 pubsub_connector_t * 89 pubsub_connector_for_subsystem(pubsub_builder_t *builder, 90 subsys_id_t subsys) 91 { 92 tor_assert(builder); 93 ++builder->n_connectors; 94 95 pubsub_connector_t *con = tor_malloc_zero(sizeof(*con)); 96 97 con->builder = builder; 98 con->subsys_id = subsys; 99 100 return con; 101 } 102 103 /** 104 * Release all storage held by a pubsub_connector_t. 105 **/ 106 void 107 pubsub_connector_free_(pubsub_connector_t *con) 108 { 109 if (!con) 110 return; 111 112 if (con->builder) { 113 --con->builder->n_connectors; 114 tor_assert(con->builder->n_connectors >= 0); 115 } 116 tor_free(con); 117 } 118 119 /** 120 * Use <b>con</b> to add a request for being able to publish messages of type 121 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>. 122 **/ 123 int 124 pubsub_add_pub_(pubsub_connector_t *con, 125 pub_binding_t *out, 126 channel_id_t channel, 127 message_id_t msg, 128 msg_type_id_t type, 129 unsigned flags, 130 const char *file, 131 unsigned line) 132 { 133 pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); 134 135 memset(out, 0, sizeof(*out)); 136 cfg->is_publish = true; 137 138 out->msg_template.sender = cfg->subsys = con->subsys_id; 139 out->msg_template.channel = cfg->channel = channel; 140 out->msg_template.msg = cfg->msg = msg; 141 out->msg_template.type = cfg->type = type; 142 143 cfg->flags = flags; 144 cfg->added_by_file = file; 145 cfg->added_by_line = line; 146 147 /* We're grabbing a pointer to the pub_binding_t so we can tell it about 148 * the dispatcher later on. 149 */ 150 cfg->pub_binding = out; 151 152 smartlist_add(con->builder->items->items, cfg); 153 154 if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0) 155 goto err; 156 if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0) 157 goto err; 158 159 return 0; 160 err: 161 ++con->builder->n_errors; 162 return -1; 163 } 164 165 /** 166 * Use <b>con</b> to add a request for being able to publish messages of type 167 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>, 168 * passing them to the callback in <b>recv_fn</b>. 169 **/ 170 int 171 pubsub_add_sub_(pubsub_connector_t *con, 172 recv_fn_t recv_fn, 173 channel_id_t channel, 174 message_id_t msg, 175 msg_type_id_t type, 176 unsigned flags, 177 const char *file, 178 unsigned line) 179 { 180 pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); 181 182 cfg->is_publish = false; 183 cfg->subsys = con->subsys_id; 184 cfg->channel = channel; 185 cfg->msg = msg; 186 cfg->type = type; 187 cfg->flags = flags; 188 cfg->added_by_file = file; 189 cfg->added_by_line = line; 190 191 cfg->recv_fn = recv_fn; 192 193 smartlist_add(con->builder->items->items, cfg); 194 195 if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0) 196 goto err; 197 if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0) 198 goto err; 199 if (! (flags & DISP_FLAG_STUB)) { 200 if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0) 201 goto err; 202 } 203 204 return 0; 205 err: 206 ++con->builder->n_errors; 207 return -1; 208 } 209 210 /** 211 * Use <b>con</b> to define the functions to use for manipulating the type 212 * <b>type</b>. Any function pointers left as NULL will be implemented as 213 * no-ops. 214 **/ 215 int 216 pubsub_connector_register_type_(pubsub_connector_t *con, 217 msg_type_id_t type, 218 dispatch_typefns_t *fns, 219 const char *file, 220 unsigned line) 221 { 222 pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); 223 cfg->type = type; 224 memcpy(&cfg->fns, fns, sizeof(*fns)); 225 cfg->subsys = con->subsys_id; 226 cfg->added_by_file = file; 227 cfg->added_by_line = line; 228 229 smartlist_add(con->builder->items->type_items, cfg); 230 231 if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0) 232 goto err; 233 234 return 0; 235 err: 236 ++con->builder->n_errors; 237 return -1; 238 } 239 240 /** 241 * Initialize the dispatch_ptr field in every relevant publish binding 242 * for <b>d</b>. 243 */ 244 static void 245 pubsub_items_install_bindings(pubsub_items_t *items, 246 dispatch_t *d) 247 { 248 SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) { 249 if (cfg->pub_binding) { 250 // XXXX we could skip this for STUB publishers, and for any publishers 251 // XXXX where all subscribers are STUB. 252 cfg->pub_binding->dispatch_ptr = d; 253 } 254 } SMARTLIST_FOREACH_END(cfg); 255 } 256 257 /** 258 * Remove the dispatch_ptr fields for all the relevant publish bindings 259 * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from 260 * sending messages to a dispatcher that has been freed. 261 **/ 262 void 263 pubsub_items_clear_bindings(pubsub_items_t *items) 264 { 265 SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) { 266 if (cfg->pub_binding) { 267 cfg->pub_binding->dispatch_ptr = NULL; 268 } 269 } SMARTLIST_FOREACH_END(cfg); 270 } 271 272 /** 273 * Create a new dispatcher as configured in a pubsub_builder_t. 274 * 275 * Consumes and frees its input. 276 **/ 277 dispatch_t * 278 pubsub_builder_finalize(pubsub_builder_t *builder, 279 pubsub_items_t **items_out) 280 { 281 dispatch_t *dispatcher = NULL; 282 tor_assert_nonfatal(builder->n_connectors == 0); 283 284 if (pubsub_builder_check(builder) < 0) 285 goto err; 286 287 if (builder->n_errors) { 288 log_warn(LD_GENERAL, "At least one error occurred previously when " 289 "configuring the dispatcher."); 290 goto err; 291 } 292 293 dispatcher = dispatch_new(builder->cfg); 294 295 if (!dispatcher) 296 goto err; 297 298 pubsub_items_install_bindings(builder->items, dispatcher); 299 if (items_out) { 300 *items_out = builder->items; 301 builder->items = NULL; /* Prevent free */ 302 } 303 304 err: 305 pubsub_builder_free(builder); 306 return dispatcher; 307 }