dispatch_core.c (6244B)
1 /* Copyright (c) 2001, Matej Pfajfar. 2 * Copyright (c) 2001-2004, Roger Dingledine. 3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 4 * Copyright (c) 2007-2021, The Tor Project, Inc. */ 5 /* See LICENSE for licensing information */ 6 7 /** 8 * \file dispatch_core.c 9 * \brief Core module for sending and receiving messages. 10 */ 11 12 #define DISPATCH_PRIVATE 13 #include "orconfig.h" 14 15 #include "lib/dispatch/dispatch.h" 16 #include "lib/dispatch/dispatch_st.h" 17 #include "lib/dispatch/dispatch_naming.h" 18 19 #include "lib/malloc/malloc.h" 20 #include "lib/log/util_bug.h" 21 22 #include <string.h> 23 24 /** 25 * Use <b>d</b> to drop all storage held for <b>msg</b>. 26 * 27 * (We need the dispatcher so we know how to free the auxiliary data.) 28 **/ 29 void 30 dispatch_free_msg_(const dispatch_t *d, msg_t *msg) 31 { 32 if (!msg) 33 return; 34 35 d->typefns[msg->type].free_fn(msg->aux_data__); 36 tor_free(msg); 37 } 38 39 /** 40 * Format the auxiliary data held by msg. 41 **/ 42 char * 43 dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg) 44 { 45 if (!msg) 46 return NULL; 47 48 return d->typefns[msg->type].fmt_fn(msg->aux_data__); 49 } 50 51 /** 52 * Release all storage held by <b>d</b>. 53 **/ 54 void 55 dispatch_free_(dispatch_t *d) 56 { 57 if (d == NULL) 58 return; 59 60 size_t n_queues = d->n_queues; 61 for (size_t i = 0; i < n_queues; ++i) { 62 msg_t *m, *mtmp; 63 TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) { 64 dispatch_free_msg(d, m); 65 } 66 } 67 68 size_t n_msgs = d->n_msgs; 69 70 for (size_t i = 0; i < n_msgs; ++i) { 71 tor_free(d->table[i]); 72 } 73 tor_free(d->table); 74 tor_free(d->typefns); 75 tor_free(d->queues); 76 77 // This is the only time we will treat d->cfg as non-const. 78 //dispatch_cfg_free_((dispatch_items_t *) d->cfg); 79 80 tor_free(d); 81 } 82 83 /** 84 * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever 85 * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error. 86 **/ 87 int 88 dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, 89 dispatch_alertfn_t fn, void *userdata) 90 { 91 if (BUG(chan >= d->n_queues)) 92 return -1; 93 94 dqueue_t *q = &d->queues[chan]; 95 q->alert_fn = fn; 96 q->alert_fn_arg = userdata; 97 return 0; 98 } 99 100 /** 101 * Send a message on the appropriate channel notifying that channel if 102 * necessary. 103 * 104 * This function takes ownership of the auxiliary data; it can't be static or 105 * stack-allocated, and the caller is not allowed to use it afterwards. 106 * 107 * This function does not check the various vields of the message object for 108 * consistency. 109 **/ 110 int 111 dispatch_send(dispatch_t *d, 112 subsys_id_t sender, 113 channel_id_t channel, 114 message_id_t msg, 115 msg_type_id_t type, 116 msg_aux_data_t auxdata) 117 { 118 if (!d->table[msg]) { 119 /* Fast path: nobody wants this data. */ 120 121 d->typefns[type].free_fn(auxdata); 122 return 0; 123 } 124 125 msg_t *m = tor_malloc(sizeof(msg_t)); 126 127 m->sender = sender; 128 m->channel = channel; 129 m->msg = msg; 130 m->type = type; 131 memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t)); 132 133 return dispatch_send_msg(d, m); 134 } 135 136 int 137 dispatch_send_msg(dispatch_t *d, msg_t *m) 138 { 139 if (BUG(!d)) 140 goto err; 141 if (BUG(!m)) 142 goto err; 143 if (BUG(m->channel >= d->n_queues)) 144 goto err; 145 if (BUG(m->msg >= d->n_msgs)) 146 goto err; 147 148 dtbl_entry_t *ent = d->table[m->msg]; 149 if (ent) { 150 if (BUG(m->type != ent->type)) 151 goto err; 152 if (BUG(m->channel != ent->channel)) 153 goto err; 154 } 155 156 return dispatch_send_msg_unchecked(d, m); 157 err: 158 /* Probably it isn't safe to free m, since type could be wrong. */ 159 return -1; 160 } 161 162 /** 163 * Send a message on the appropriate queue, notifying that queue if necessary. 164 * 165 * This function takes ownership of the message object and its auxiliary data; 166 * it can't be static or stack-allocated, and the caller isn't allowed to use 167 * it afterwards. 168 * 169 * This function does not check the various fields of the message object for 170 * consistency, and can crash if they are out of range. Only functions that 171 * have already constructed the message in a safe way, or checked it for 172 * correctness themselves, should call this function. 173 **/ 174 int 175 dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m) 176 { 177 /* Find the right queue. */ 178 dqueue_t *q = &d->queues[m->channel]; 179 bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue); 180 181 /* Append the message. */ 182 TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next); 183 184 if (debug_logging_enabled()) { 185 char *arg = dispatch_fmt_msg_data(d, m); 186 log_debug(LD_MESG, 187 "Queued: %s (%s) from %s, on %s.", 188 get_message_id_name(m->msg), 189 arg, 190 get_subsys_id_name(m->sender), 191 get_channel_id_name(m->channel)); 192 tor_free(arg); 193 } 194 195 /* If we just made the queue nonempty for the first time, call the alert 196 * function. */ 197 if (was_empty) { 198 q->alert_fn(d, m->channel, q->alert_fn_arg); 199 } 200 201 return 0; 202 } 203 204 /** 205 * Run all of the callbacks on <b>d</b> associated with <b>m</b>. 206 **/ 207 static void 208 dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m) 209 { 210 tor_assert(m->msg <= d->n_msgs); 211 dtbl_entry_t *ent = d->table[m->msg]; 212 int n_fns = ent->n_fns; 213 214 if (debug_logging_enabled()) { 215 char *arg = dispatch_fmt_msg_data(d, m); 216 log_debug(LD_MESG, 217 "Delivering: %s (%s) from %s, on %s:", 218 get_message_id_name(m->msg), 219 arg, 220 get_subsys_id_name(m->sender), 221 get_channel_id_name(m->channel)); 222 tor_free(arg); 223 } 224 225 int i; 226 for (i=0; i < n_fns; ++i) { 227 if (ent->rcv[i].enabled) { 228 log_debug(LD_MESG, " Delivering to %s.", 229 get_subsys_id_name(ent->rcv[i].sys)); 230 ent->rcv[i].fn(m); 231 } 232 } 233 } 234 235 /** 236 * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b> 237 * on the given dispatcher. Return 0 on success or recoverable failure, 238 * -1 on unrecoverable error. 239 **/ 240 int 241 dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs) 242 { 243 if (BUG(ch >= d->n_queues)) 244 return 0; 245 246 int n_flushed = 0; 247 dqueue_t *q = &d->queues[ch]; 248 249 while (n_flushed < max_msgs) { 250 msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue); 251 if (!m) 252 break; 253 TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next); 254 dispatcher_run_msg_cbs(d, m); 255 dispatch_free_msg(d, m); 256 ++n_flushed; 257 } 258 259 return 0; 260 }