tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

test_pubsub_msg.c (9546B)


      1 /* Copyright (c) 2018-2021, The Tor Project, Inc. */
      2 /* See LICENSE for licensing information */
      3 
      4 #define DISPATCH_PRIVATE
      5 
      6 #include "test/test.h"
      7 
      8 #include "lib/dispatch/dispatch.h"
      9 #include "lib/dispatch/dispatch_naming.h"
     10 #include "lib/dispatch/dispatch_st.h"
     11 #include "lib/dispatch/msgtypes.h"
     12 #include "lib/pubsub/pubsub_flags.h"
     13 #include "lib/pubsub/pub_binding_st.h"
     14 #include "lib/pubsub/pubsub_build.h"
     15 #include "lib/pubsub/pubsub_builder_st.h"
     16 #include "lib/pubsub/pubsub_connect.h"
     17 #include "lib/pubsub/pubsub_publish.h"
     18 
     19 #include "lib/log/escape.h"
     20 #include "lib/malloc/malloc.h"
     21 #include "lib/string/printf.h"
     22 
     23 #include <stdio.h>
     24 #include <string.h>
     25 
     26 static char *
     27 ex_str_fmt(msg_aux_data_t aux)
     28 {
     29  return esc_for_log(aux.ptr);
     30 }
     31 static void
     32 ex_str_free(msg_aux_data_t aux)
     33 {
     34  tor_free_(aux.ptr);
     35 }
     36 static dispatch_typefns_t stringfns = {
     37  .free_fn = ex_str_free,
     38  .fmt_fn = ex_str_fmt
     39 };
     40 
     41 // We're using the lowest-level publish/subscribe logic here, to avoid the
     42 // pubsub_macros.h macros and just test the dispatch core.  We'll use a string
     43 // type for everything.
     44 
     45 #define DECLARE_MESSAGE(suffix)                         \
     46  static pub_binding_t pub_binding_##suffix;            \
     47  static int msg_received_##suffix = 0;                 \
     48  static void recv_msg_##suffix(const msg_t *m) {       \
     49    (void)m;                                            \
     50    ++msg_received_##suffix;                            \
     51  }                                                     \
     52  EAT_SEMICOLON
     53 
     54 #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags)        \
     55  STMT_BEGIN {                                                          \
     56    con = pubsub_connector_for_subsystem(builder,                       \
     57                                           get_subsys_id(#subsys));     \
     58    pubsub_add_pub_(con, &pub_binding_##binding_suffix,                 \
     59                      get_channel_id(#channel),                         \
     60                      get_message_id(#msg), get_msg_type_id("string"),  \
     61                      (flags), __FILE__, __LINE__);                     \
     62    pubsub_connector_free(con);                                         \
     63  } STMT_END
     64 
     65 #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags)         \
     66  STMT_BEGIN {                                                          \
     67    con = pubsub_connector_for_subsystem(builder,                       \
     68                                           get_subsys_id(#subsys));     \
     69    pubsub_add_sub_(con, recv_msg_##hook_suffix,                        \
     70                      get_channel_id(#channel),                         \
     71                      get_message_id(#msg), get_msg_type_id("string"),  \
     72                      (flags), __FILE__, __LINE__);                     \
     73    pubsub_connector_free(con);                                         \
     74  } STMT_END
     75 
     76 #define SEND(binding_suffix, val)                          \
     77  STMT_BEGIN {                                             \
     78    msg_aux_data_t data_;                                  \
     79    data_.ptr = tor_strdup(val);                           \
     80    pubsub_pub_(&pub_binding_##binding_suffix, data_);     \
     81  } STMT_END
     82 
     83 DECLARE_MESSAGE(msg1);
     84 DECLARE_MESSAGE(msg2);
     85 DECLARE_MESSAGE(msg3);
     86 DECLARE_MESSAGE(msg4);
     87 DECLARE_MESSAGE(msg5);
     88 
     89 static smartlist_t *strings_received = NULL;
     90 static void
     91 recv_msg_copy_string(const msg_t *m)
     92 {
     93  const char *s = m->aux_data__.ptr;
     94  smartlist_add(strings_received, tor_strdup(s));
     95 }
     96 
     97 static void *
     98 setup_dispatcher(const struct testcase_t *testcase)
     99 {
    100  (void)testcase;
    101  pubsub_builder_t *builder = pubsub_builder_new();
    102  pubsub_connector_t *con;
    103 
    104  {
    105    con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
    106    pubsub_connector_register_type_(con,
    107                                    get_msg_type_id("string"),
    108                                    &stringfns,
    109                                    "nowhere.c", 99);
    110    pubsub_connector_free(con);
    111  }
    112  // message1 has one publisher and one subscriber.
    113  ADD_PUBLISH(msg1, sys1, main, message1, 0);
    114  ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
    115 
    116  // message2 has a publisher and a stub subscriber.
    117  ADD_PUBLISH(msg2, sys1, main, message2, 0);
    118  ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
    119 
    120  // message3 has a publisher and three subscribers.
    121  ADD_PUBLISH(msg3, sys1, main, message3, 0);
    122  ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
    123  ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
    124  ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
    125 
    126  // message4 has one publisher and two subscribers, but it's on another
    127  // channel.
    128  ADD_PUBLISH(msg4, sys2, other, message4, 0);
    129  ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
    130  ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
    131 
    132  // message5 has a huge number of recipients.
    133  ADD_PUBLISH(msg5, sys3, main, message5, 0);
    134  ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
    135  ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
    136  ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
    137  ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
    138  ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
    139  for (int i = 0; i < 1000-5; ++i) {
    140    char *sys;
    141    tor_asprintf(&sys, "xsys-%d", i);
    142    con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
    143    pubsub_add_sub_(con, recv_msg_copy_string,
    144                    get_channel_id("main"),
    145                    get_message_id("message5"),
    146                    get_msg_type_id("string"), 0, "here", 100);
    147    pubsub_connector_free(con);
    148    tor_free(sys);
    149  }
    150 
    151  return pubsub_builder_finalize(builder, NULL);
    152 }
    153 
    154 static int
    155 cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
    156 {
    157  (void)testcase;
    158  dispatch_t *dispatcher = dispatcher_;
    159  dispatch_free(dispatcher);
    160  return 1;
    161 }
    162 
    163 static const struct testcase_setup_t dispatcher_setup = {
    164  setup_dispatcher, cleanup_dispatcher
    165 };
    166 
    167 static void
    168 test_pubsub_msg_minimal(void *arg)
    169 {
    170  dispatch_t *d = arg;
    171 
    172  tt_int_op(0, OP_EQ, msg_received_msg1);
    173  SEND(msg1, "hello world");
    174  tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
    175 
    176  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
    177  tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
    178 
    179 done:
    180  ;
    181 }
    182 
    183 static void
    184 test_pubsub_msg_send_to_stub(void *arg)
    185 {
    186  dispatch_t *d = arg;
    187 
    188  tt_int_op(0, OP_EQ, msg_received_msg2);
    189  SEND(msg2, "hello silence");
    190  tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
    191 
    192  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
    193  tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
    194 
    195 done:
    196  ;
    197 }
    198 
    199 static void
    200 test_pubsub_msg_cancel_msgs(void *arg)
    201 {
    202  dispatch_t *d = arg;
    203 
    204  tt_int_op(0, OP_EQ, msg_received_msg1);
    205  for (int i = 0; i < 100; ++i) {
    206    SEND(msg1, "hello world");
    207  }
    208  tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
    209 
    210  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
    211  tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
    212 
    213  // At this point, the dispatcher will be freed with queued, undelivered
    214  // messages.
    215 done:
    216  ;
    217 }
    218 
    219 struct alertfn_target {
    220  dispatch_t *d;
    221  channel_id_t ch;
    222  int count;
    223 };
    224 static void
    225 alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
    226 {
    227  struct alertfn_target *t = arg;
    228  tt_ptr_op(d, OP_EQ, t->d);
    229  tt_int_op(ch, OP_EQ, t->ch);
    230  ++t->count;
    231 done:
    232  ;
    233 }
    234 
    235 static void
    236 test_pubsub_msg_alertfns(void *arg)
    237 {
    238  dispatch_t *d = arg;
    239  struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
    240  struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
    241 
    242  tt_int_op(0, OP_EQ,
    243            dispatch_set_alert_fn(d, get_channel_id("main"),
    244                                  alertfn_generic, &ch1_a));
    245  tt_int_op(0, OP_EQ,
    246            dispatch_set_alert_fn(d, get_channel_id("other"),
    247                                  alertfn_generic, &ch2_a));
    248 
    249  SEND(msg3, "hello");
    250  tt_int_op(ch1_a.count, OP_EQ, 1);
    251  SEND(msg3, "world");
    252  tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
    253  tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
    254 
    255  SEND(msg4, "worse things happen in C");
    256  tt_int_op(ch2_a.count, OP_EQ, 1);
    257 
    258  // flush the first (main) channel...
    259  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
    260  tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
    261 
    262  // now that the main channel is flushed, sending another message on it
    263  // starts another alert.
    264  tt_int_op(ch1_a.count, OP_EQ, 1);
    265  SEND(msg1, "plover");
    266  tt_int_op(ch1_a.count, OP_EQ, 2);
    267  tt_int_op(ch2_a.count, OP_EQ, 1);
    268 
    269 done:
    270  ;
    271 }
    272 
    273 /* try more than N_FAST_FNS hooks on msg5 */
    274 static void
    275 test_pubsub_msg_many_hooks(void *arg)
    276 {
    277  dispatch_t *d = arg;
    278  strings_received = smartlist_new();
    279 
    280  tt_int_op(0, OP_EQ, msg_received_msg5);
    281  SEND(msg5, "hello world");
    282  tt_int_op(0, OP_EQ, msg_received_msg5);
    283  tt_int_op(0, OP_EQ, smartlist_len(strings_received));
    284 
    285  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
    286  tt_int_op(5, OP_EQ, msg_received_msg5);
    287  tt_int_op(995, OP_EQ, smartlist_len(strings_received));
    288 
    289 done:
    290  SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
    291  smartlist_free(strings_received);
    292 }
    293 
    294 #define T(name)                                                 \
    295  { #name, test_pubsub_msg_ ## name , TT_FORK,                \
    296      &dispatcher_setup, NULL }
    297 
    298 struct testcase_t pubsub_msg_tests[] = {
    299  T(minimal),
    300  T(send_to_stub),
    301  T(cancel_msgs),
    302  T(alertfns),
    303  T(many_hooks),
    304  END_OF_TESTCASES
    305 };