test_pubsub_msg.c (9546B)
1 /* Copyright (c) 2018-2021, The Tor Project, Inc. */ 2 /* See LICENSE for licensing information */ 3 4 #define DISPATCH_PRIVATE 5 6 #include "test/test.h" 7 8 #include "lib/dispatch/dispatch.h" 9 #include "lib/dispatch/dispatch_naming.h" 10 #include "lib/dispatch/dispatch_st.h" 11 #include "lib/dispatch/msgtypes.h" 12 #include "lib/pubsub/pubsub_flags.h" 13 #include "lib/pubsub/pub_binding_st.h" 14 #include "lib/pubsub/pubsub_build.h" 15 #include "lib/pubsub/pubsub_builder_st.h" 16 #include "lib/pubsub/pubsub_connect.h" 17 #include "lib/pubsub/pubsub_publish.h" 18 19 #include "lib/log/escape.h" 20 #include "lib/malloc/malloc.h" 21 #include "lib/string/printf.h" 22 23 #include <stdio.h> 24 #include <string.h> 25 26 static char * 27 ex_str_fmt(msg_aux_data_t aux) 28 { 29 return esc_for_log(aux.ptr); 30 } 31 static void 32 ex_str_free(msg_aux_data_t aux) 33 { 34 tor_free_(aux.ptr); 35 } 36 static dispatch_typefns_t stringfns = { 37 .free_fn = ex_str_free, 38 .fmt_fn = ex_str_fmt 39 }; 40 41 // We're using the lowest-level publish/subscribe logic here, to avoid the 42 // pubsub_macros.h macros and just test the dispatch core. We'll use a string 43 // type for everything. 44 45 #define DECLARE_MESSAGE(suffix) \ 46 static pub_binding_t pub_binding_##suffix; \ 47 static int msg_received_##suffix = 0; \ 48 static void recv_msg_##suffix(const msg_t *m) { \ 49 (void)m; \ 50 ++msg_received_##suffix; \ 51 } \ 52 EAT_SEMICOLON 53 54 #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \ 55 STMT_BEGIN { \ 56 con = pubsub_connector_for_subsystem(builder, \ 57 get_subsys_id(#subsys)); \ 58 pubsub_add_pub_(con, &pub_binding_##binding_suffix, \ 59 get_channel_id(#channel), \ 60 get_message_id(#msg), get_msg_type_id("string"), \ 61 (flags), __FILE__, __LINE__); \ 62 pubsub_connector_free(con); \ 63 } STMT_END 64 65 #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \ 66 STMT_BEGIN { \ 67 con = pubsub_connector_for_subsystem(builder, \ 68 get_subsys_id(#subsys)); \ 69 pubsub_add_sub_(con, recv_msg_##hook_suffix, \ 70 get_channel_id(#channel), \ 71 get_message_id(#msg), get_msg_type_id("string"), \ 72 (flags), __FILE__, __LINE__); \ 73 pubsub_connector_free(con); \ 74 } STMT_END 75 76 #define SEND(binding_suffix, val) \ 77 STMT_BEGIN { \ 78 msg_aux_data_t data_; \ 79 data_.ptr = tor_strdup(val); \ 80 pubsub_pub_(&pub_binding_##binding_suffix, data_); \ 81 } STMT_END 82 83 DECLARE_MESSAGE(msg1); 84 DECLARE_MESSAGE(msg2); 85 DECLARE_MESSAGE(msg3); 86 DECLARE_MESSAGE(msg4); 87 DECLARE_MESSAGE(msg5); 88 89 static smartlist_t *strings_received = NULL; 90 static void 91 recv_msg_copy_string(const msg_t *m) 92 { 93 const char *s = m->aux_data__.ptr; 94 smartlist_add(strings_received, tor_strdup(s)); 95 } 96 97 static void * 98 setup_dispatcher(const struct testcase_t *testcase) 99 { 100 (void)testcase; 101 pubsub_builder_t *builder = pubsub_builder_new(); 102 pubsub_connector_t *con; 103 104 { 105 con = pubsub_connector_for_subsystem(builder, get_subsys_id("types")); 106 pubsub_connector_register_type_(con, 107 get_msg_type_id("string"), 108 &stringfns, 109 "nowhere.c", 99); 110 pubsub_connector_free(con); 111 } 112 // message1 has one publisher and one subscriber. 113 ADD_PUBLISH(msg1, sys1, main, message1, 0); 114 ADD_SUBSCRIBE(msg1, sys2, main, message1, 0); 115 116 // message2 has a publisher and a stub subscriber. 117 ADD_PUBLISH(msg2, sys1, main, message2, 0); 118 ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB); 119 120 // message3 has a publisher and three subscribers. 121 ADD_PUBLISH(msg3, sys1, main, message3, 0); 122 ADD_SUBSCRIBE(msg3, sys2, main, message3, 0); 123 ADD_SUBSCRIBE(msg3, sys3, main, message3, 0); 124 ADD_SUBSCRIBE(msg3, sys4, main, message3, 0); 125 126 // message4 has one publisher and two subscribers, but it's on another 127 // channel. 128 ADD_PUBLISH(msg4, sys2, other, message4, 0); 129 ADD_SUBSCRIBE(msg4, sys1, other, message4, 0); 130 ADD_SUBSCRIBE(msg4, sys3, other, message4, 0); 131 132 // message5 has a huge number of recipients. 133 ADD_PUBLISH(msg5, sys3, main, message5, 0); 134 ADD_SUBSCRIBE(msg5, sys4, main, message5, 0); 135 ADD_SUBSCRIBE(msg5, sys5, main, message5, 0); 136 ADD_SUBSCRIBE(msg5, sys6, main, message5, 0); 137 ADD_SUBSCRIBE(msg5, sys7, main, message5, 0); 138 ADD_SUBSCRIBE(msg5, sys8, main, message5, 0); 139 for (int i = 0; i < 1000-5; ++i) { 140 char *sys; 141 tor_asprintf(&sys, "xsys-%d", i); 142 con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys)); 143 pubsub_add_sub_(con, recv_msg_copy_string, 144 get_channel_id("main"), 145 get_message_id("message5"), 146 get_msg_type_id("string"), 0, "here", 100); 147 pubsub_connector_free(con); 148 tor_free(sys); 149 } 150 151 return pubsub_builder_finalize(builder, NULL); 152 } 153 154 static int 155 cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_) 156 { 157 (void)testcase; 158 dispatch_t *dispatcher = dispatcher_; 159 dispatch_free(dispatcher); 160 return 1; 161 } 162 163 static const struct testcase_setup_t dispatcher_setup = { 164 setup_dispatcher, cleanup_dispatcher 165 }; 166 167 static void 168 test_pubsub_msg_minimal(void *arg) 169 { 170 dispatch_t *d = arg; 171 172 tt_int_op(0, OP_EQ, msg_received_msg1); 173 SEND(msg1, "hello world"); 174 tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. 175 176 tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); 177 tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message! 178 179 done: 180 ; 181 } 182 183 static void 184 test_pubsub_msg_send_to_stub(void *arg) 185 { 186 dispatch_t *d = arg; 187 188 tt_int_op(0, OP_EQ, msg_received_msg2); 189 SEND(msg2, "hello silence"); 190 tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet. 191 192 tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); 193 tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook. 194 195 done: 196 ; 197 } 198 199 static void 200 test_pubsub_msg_cancel_msgs(void *arg) 201 { 202 dispatch_t *d = arg; 203 204 tt_int_op(0, OP_EQ, msg_received_msg1); 205 for (int i = 0; i < 100; ++i) { 206 SEND(msg1, "hello world"); 207 } 208 tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. 209 210 tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10)); 211 tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times. 212 213 // At this point, the dispatcher will be freed with queued, undelivered 214 // messages. 215 done: 216 ; 217 } 218 219 struct alertfn_target { 220 dispatch_t *d; 221 channel_id_t ch; 222 int count; 223 }; 224 static void 225 alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg) 226 { 227 struct alertfn_target *t = arg; 228 tt_ptr_op(d, OP_EQ, t->d); 229 tt_int_op(ch, OP_EQ, t->ch); 230 ++t->count; 231 done: 232 ; 233 } 234 235 static void 236 test_pubsub_msg_alertfns(void *arg) 237 { 238 dispatch_t *d = arg; 239 struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 }; 240 struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 }; 241 242 tt_int_op(0, OP_EQ, 243 dispatch_set_alert_fn(d, get_channel_id("main"), 244 alertfn_generic, &ch1_a)); 245 tt_int_op(0, OP_EQ, 246 dispatch_set_alert_fn(d, get_channel_id("other"), 247 alertfn_generic, &ch2_a)); 248 249 SEND(msg3, "hello"); 250 tt_int_op(ch1_a.count, OP_EQ, 1); 251 SEND(msg3, "world"); 252 tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert 253 tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other' 254 255 SEND(msg4, "worse things happen in C"); 256 tt_int_op(ch2_a.count, OP_EQ, 1); 257 258 // flush the first (main) channel... 259 tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); 260 tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances. 261 262 // now that the main channel is flushed, sending another message on it 263 // starts another alert. 264 tt_int_op(ch1_a.count, OP_EQ, 1); 265 SEND(msg1, "plover"); 266 tt_int_op(ch1_a.count, OP_EQ, 2); 267 tt_int_op(ch2_a.count, OP_EQ, 1); 268 269 done: 270 ; 271 } 272 273 /* try more than N_FAST_FNS hooks on msg5 */ 274 static void 275 test_pubsub_msg_many_hooks(void *arg) 276 { 277 dispatch_t *d = arg; 278 strings_received = smartlist_new(); 279 280 tt_int_op(0, OP_EQ, msg_received_msg5); 281 SEND(msg5, "hello world"); 282 tt_int_op(0, OP_EQ, msg_received_msg5); 283 tt_int_op(0, OP_EQ, smartlist_len(strings_received)); 284 285 tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000)); 286 tt_int_op(5, OP_EQ, msg_received_msg5); 287 tt_int_op(995, OP_EQ, smartlist_len(strings_received)); 288 289 done: 290 SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s)); 291 smartlist_free(strings_received); 292 } 293 294 #define T(name) \ 295 { #name, test_pubsub_msg_ ## name , TT_FORK, \ 296 &dispatcher_setup, NULL } 297 298 struct testcase_t pubsub_msg_tests[] = { 299 T(minimal), 300 T(send_to_stub), 301 T(cancel_msgs), 302 T(alertfns), 303 T(many_hooks), 304 END_OF_TESTCASES 305 };