mainloop_pubsub.c (4565B)
1 /* Copyright (c) 2001, Matej Pfajfar. 2 * Copyright (c) 2001-2004, Roger Dingledine. 3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 4 * Copyright (c) 2007-2021, The Tor Project, Inc. */ 5 /* See LICENSE for licensing information */ 6 7 /** 8 * @file mainloop_pubsub.c 9 * @brief Connect the publish-subscribe code to the main-loop. 10 * 11 * This module is responsible for instantiating all the channels used by the 12 * publish-subscribe code, and making sure that each one's messages are 13 * processed when appropriate. 14 **/ 15 16 #include "orconfig.h" 17 18 #include "core/or/or.h" 19 #include "core/mainloop/mainloop.h" 20 #include "core/mainloop/mainloop_pubsub.h" 21 22 #include "lib/container/smartlist.h" 23 #include "lib/dispatch/dispatch.h" 24 #include "lib/dispatch/dispatch_naming.h" 25 #include "lib/evloop/compat_libevent.h" 26 #include "lib/pubsub/pubsub.h" 27 #include "lib/pubsub/pubsub_build.h" 28 29 /** 30 * Dispatcher to use for delivering messages. 31 **/ 32 static dispatch_t *the_dispatcher = NULL; 33 static pubsub_items_t *the_pubsub_items = NULL; 34 /** 35 * A list of mainloop_event_t, indexed by channel ID, to flush the messages 36 * on a channel. 37 **/ 38 static smartlist_t *alert_events = NULL; 39 40 /** 41 * Mainloop event callback: flush all the messages in a channel. 42 * 43 * The channel is encoded as a pointer, and passed via arg. 44 **/ 45 static void 46 flush_channel_event(mainloop_event_t *ev, void *arg) 47 { 48 (void)ev; 49 if (!the_dispatcher) 50 return; 51 52 channel_id_t chan = (channel_id_t)(uintptr_t)(arg); 53 dispatch_flush(the_dispatcher, chan, INT_MAX); 54 } 55 56 /** 57 * Construct our global pubsub object from <b>builder</b>. Return 0 on 58 * success, -1 on failure. */ 59 int 60 tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder) 61 { 62 int rv = -1; 63 tor_mainloop_disconnect_pubsub(); 64 65 the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items); 66 if (! the_dispatcher) 67 goto err; 68 69 rv = 0; 70 goto done; 71 err: 72 tor_mainloop_disconnect_pubsub(); 73 done: 74 return rv; 75 } 76 77 /** 78 * Install libevent events for all of the pubsub channels. 79 * 80 * Invoke this after tor_mainloop_connect_pubsub, and after libevent has been 81 * initialized. 82 */ 83 void 84 tor_mainloop_connect_pubsub_events(void) 85 { 86 tor_assert(the_dispatcher); 87 tor_assert(! alert_events); 88 89 const size_t num_channels = get_num_channel_ids(); 90 alert_events = smartlist_new(); 91 for (size_t i = 0; i < num_channels; ++i) { 92 smartlist_add(alert_events, 93 mainloop_event_postloop_new(flush_channel_event, 94 (void*)(uintptr_t)(i))); 95 } 96 } 97 98 /** 99 * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER. 100 **/ 101 static void 102 alertfn_never(dispatch_t *d, channel_id_t chan, void *arg) 103 { 104 (void)d; 105 (void)chan; 106 (void)arg; 107 } 108 109 /** 110 * Dispatch alertfn callback: activate a mainloop event. Implements 111 * DELIV_PROMPT. 112 **/ 113 static void 114 alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg) 115 { 116 (void)d; 117 (void)chan; 118 mainloop_event_t *event = arg; 119 mainloop_event_activate(event); 120 } 121 122 /** 123 * Dispatch alertfn callback: flush all messages right now. Implements 124 * DELIV_IMMEDIATE. 125 **/ 126 static void 127 alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg) 128 { 129 (void) arg; 130 dispatch_flush(d, chan, INT_MAX); 131 } 132 133 /** 134 * Set the strategy to be used for delivering messages on the named channel. 135 * 136 * This function needs to be called once globally for each channel, to 137 * set up how messages are delivered. 138 **/ 139 int 140 tor_mainloop_set_delivery_strategy(const char *msg_channel_name, 141 deliv_strategy_t strategy) 142 { 143 channel_id_t chan = get_channel_id(msg_channel_name); 144 if (BUG(chan == ERROR_ID) || 145 BUG(chan >= smartlist_len(alert_events))) 146 return -1; 147 148 switch (strategy) { 149 case DELIV_NEVER: 150 dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL); 151 break; 152 case DELIV_PROMPT: 153 dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt, 154 smartlist_get(alert_events, chan)); 155 break; 156 case DELIV_IMMEDIATE: 157 dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL); 158 break; 159 } 160 return 0; 161 } 162 163 /** 164 * Remove all pubsub dispatchers and events from the mainloop. 165 **/ 166 void 167 tor_mainloop_disconnect_pubsub(void) 168 { 169 if (the_pubsub_items) { 170 pubsub_items_clear_bindings(the_pubsub_items); 171 pubsub_items_free(the_pubsub_items); 172 } 173 if (alert_events) { 174 SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev, 175 mainloop_event_free(ev)); 176 smartlist_free(alert_events); 177 } 178 dispatch_free(the_dispatcher); 179 }