neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

multiqueue.c (8466B)


      1 // Multi-level queue for selective async event processing.
      2 // Not threadsafe; access must be synchronized externally.
      3 //
      4 // Multiqueue supports a parent-child relationship with these properties:
      5 // - pushing a node to a child queue will push a corresponding link node to the
      6 //   parent queue
      7 // - removing a link node from a parent queue will remove the next node
      8 //   in the linked child queue
      9 // - removing a node from a child queue will remove the corresponding link node
     10 //   in the parent queue
     11 //
     12 // These properties allow Nvim to organize and process events from different
     13 // sources with a certain degree of control. How the multiqueue is used:
     14 //
     15 //                         +----------------+
     16 //                         |   Main loop    |
     17 //                         +----------------+
     18 //
     19 //                         +----------------+
     20 //         +-------------->|   Event loop   |<------------+
     21 //         |               +--+-------------+             |
     22 //         |                  ^           ^               |
     23 //         |                  |           |               |
     24 //    +-----------+   +-----------+    +---------+    +---------+
     25 //    | Channel 1 |   | Channel 2 |    |  Job 1  |    |  Job 2  |
     26 //    +-----------+   +-----------+    +---------+    +---------+
     27 //
     28 //
     29 // The lower boxes represent event emitters, each with its own private queue
     30 // having the event loop queue as the parent.
     31 //
     32 // When idle, the main loop spins the event loop which queues events from many
     33 // sources (channels, jobs, user...). Each event emitter pushes events to its
     34 // private queue which is propagated to the event loop queue. When the main loop
     35 // consumes an event, the corresponding event is removed from the emitter's
     36 // queue.
     37 //
     38 // The main reason for this queue hierarchy is to allow focusing on a single
     39 // event emitter while blocking the main loop. For example, if the `jobwait`
     40 // Vimscript function is called on job1, the main loop will temporarily stop polling
     41 // the event loop queue and poll job1 queue instead. Same with channels, when
     42 // calling `rpcrequest` we want to temporarily stop processing events from
     43 // other sources and focus on a specific channel.
     44 
     45 #include <assert.h>
     46 #include <stdbool.h>
     47 #include <stddef.h>
     48 
     49 #include "nvim/event/defs.h"
     50 #include "nvim/event/multiqueue.h"
     51 #include "nvim/lib/queue_defs.h"
     52 #include "nvim/memory.h"
     53 
     54 typedef struct multiqueue_item MultiQueueItem;
     55 struct multiqueue_item {
     56  union {
     57    MultiQueue *queue;
     58    struct {
     59      Event event;
     60      MultiQueueItem *parent_item;
     61    } item;
     62  } data;
     63  bool link;  // true: current item is just a link to a node in a child queue
     64  QUEUE node;
     65 };
     66 
     67 struct multiqueue {
     68  MultiQueue *parent;
     69  QUEUE headtail;  // circularly-linked
     70  PutCallback on_put;  // Called on the parent (if any) when an item is enqueued in a child.
     71  void *data;
     72  size_t size;
     73 };
     74 
     75 typedef struct {
     76  Event event;
     77  bool fired;
     78  int refcount;
     79 } MulticastEvent;  ///< Event present on multiple queues.
     80 
     81 #include "event/multiqueue.c.generated.h"
     82 
     83 static Event NILEVENT = { .handler = NULL, .argv = { NULL } };
     84 
     85 /// Creates a new root (parentless) queue, which may gain child queues via `multiqueue_new_child`.
     86 MultiQueue *multiqueue_new(PutCallback on_put, void *data)
     87 {
     88  return _multiqueue_new(NULL, on_put, data);
     89 }
     90 
     91 /// Creates a new queue as a child of a `parent` queue.
     92 MultiQueue *multiqueue_new_child(MultiQueue *parent)
     93  FUNC_ATTR_NONNULL_ALL
     94 {
     95  assert(!parent->parent);  // parent cannot have a parent, more like a "root"
     96  parent->size++;
     97  return _multiqueue_new(parent, NULL, NULL);
     98 }
     99 
    100 static MultiQueue *_multiqueue_new(MultiQueue *parent, PutCallback on_put, void *data)
    101 {
    102  MultiQueue *rv = xmalloc(sizeof(MultiQueue));
    103  QUEUE_INIT(&rv->headtail);
    104  rv->size = 0;
    105  rv->parent = parent;
    106  rv->on_put = on_put;
    107  rv->data = data;
    108  return rv;
    109 }
    110 
    111 void multiqueue_free(MultiQueue *self)
    112 {
    113  assert(self);
    114  QUEUE *q;
    115  QUEUE_FOREACH(q, &self->headtail, {
    116    MultiQueueItem *item = multiqueue_node_data(q);
    117    if (self->parent) {
    118      QUEUE_REMOVE(&item->data.item.parent_item->node);
    119      xfree(item->data.item.parent_item);
    120    }
    121    QUEUE_REMOVE(q);
    122    xfree(item);
    123  })
    124 
    125  xfree(self);
    126 }
    127 
    128 /// Removes the next item and returns its Event.
    129 Event multiqueue_get(MultiQueue *self)
    130 {
    131  return multiqueue_empty(self) ? NILEVENT : multiqueue_remove(self);
    132 }
    133 
    134 void multiqueue_put_event(MultiQueue *self, Event event)
    135 {
    136  assert(self);
    137  multiqueue_push(self, event);
    138  if (self->parent && self->parent->on_put) {
    139    self->parent->on_put(self->parent, self->parent->data);
    140  }
    141 }
    142 
    143 /// Move events from src to dest.
    144 void multiqueue_move_events(MultiQueue *dest, MultiQueue *src)
    145  FUNC_ATTR_NONNULL_ALL
    146 {
    147  while (!multiqueue_empty(src)) {
    148    Event event = multiqueue_get(src);
    149    multiqueue_put_event(dest, event);
    150  }
    151 }
    152 
    153 void multiqueue_process_events(MultiQueue *self)
    154 {
    155  assert(self);
    156  while (!multiqueue_empty(self)) {
    157    Event event = multiqueue_remove(self);
    158    if (event.handler) {
    159      event.handler(event.argv);
    160    }
    161  }
    162 }
    163 
    164 /// Removes all events without processing them.
    165 void multiqueue_purge_events(MultiQueue *self)
    166 {
    167  assert(self);
    168  while (!multiqueue_empty(self)) {
    169    multiqueue_remove(self);
    170  }
    171 }
    172 
    173 bool multiqueue_empty(MultiQueue *self)
    174 {
    175  assert(self);
    176  return QUEUE_EMPTY(&self->headtail);
    177 }
    178 
    179 void multiqueue_replace_parent(MultiQueue *self, MultiQueue *new_parent)
    180 {
    181  assert(multiqueue_empty(self));
    182  self->parent = new_parent;
    183 }
    184 
    185 /// Gets the count of all events currently in the queue.
    186 size_t multiqueue_size(MultiQueue *self)
    187 {
    188  return self->size;
    189 }
    190 
    191 /// Gets an Event from an item.
    192 ///
    193 /// @param remove   Remove the node from its queue, and free it.
    194 static Event multiqueueitem_get_event(MultiQueueItem *item, bool remove)
    195 {
    196  assert(item != NULL);
    197  Event ev;
    198  if (item->link) {
    199    // get the next node in the linked queue
    200    MultiQueue *linked = item->data.queue;
    201    assert(!multiqueue_empty(linked));
    202    MultiQueueItem *child =
    203      multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
    204    ev = child->data.item.event;
    205    // remove the child node
    206    if (remove) {
    207      QUEUE_REMOVE(&child->node);
    208      xfree(child);
    209    }
    210  } else {
    211    // remove the corresponding link node in the parent queue
    212    if (remove && item->data.item.parent_item) {
    213      QUEUE_REMOVE(&item->data.item.parent_item->node);
    214      xfree(item->data.item.parent_item);
    215      item->data.item.parent_item = NULL;
    216    }
    217    ev = item->data.item.event;
    218  }
    219  return ev;
    220 }
    221 
    222 static Event multiqueue_remove(MultiQueue *self)
    223 {
    224  assert(!multiqueue_empty(self));
    225  QUEUE *h = QUEUE_HEAD(&self->headtail);
    226  QUEUE_REMOVE(h);
    227  MultiQueueItem *item = multiqueue_node_data(h);
    228  assert(!item->link || !self->parent);  // Only a parent queue has link-nodes
    229  Event ev = multiqueueitem_get_event(item, true);
    230  self->size--;
    231  xfree(item);
    232  return ev;
    233 }
    234 
    235 static void multiqueue_push(MultiQueue *self, Event event)
    236 {
    237  MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem));
    238  item->link = false;
    239  item->data.item.event = event;
    240  item->data.item.parent_item = NULL;
    241  QUEUE_INSERT_TAIL(&self->headtail, &item->node);
    242  if (self->parent) {
    243    // push link node to the parent queue
    244    item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem));
    245    item->data.item.parent_item->link = true;
    246    item->data.item.parent_item->data.queue = self;
    247    QUEUE_INSERT_TAIL(&self->parent->headtail,
    248                      &item->data.item.parent_item->node);
    249  }
    250  self->size++;
    251 }
    252 
    253 static MultiQueueItem *multiqueue_node_data(QUEUE *q)
    254  FUNC_ATTR_NO_SANITIZE_ADDRESS
    255 {
    256  return QUEUE_DATA(q, MultiQueueItem, node);
    257 }
    258 
    259 /// Multicasts a one-shot event to multiple queues.
    260 ///
    261 /// The handler will be invoked once by the _first_ queue that consumes the
    262 /// event. Later processing will do nothing (just memory cleanup).
    263 ///
    264 /// @param ev  Event
    265 /// @param num  Number of queues that the event will be put on
    266 /// @return Event that is safe to put onto `num` queues
    267 Event event_create_oneshot(Event ev, int num)
    268 {
    269  MulticastEvent *data = xmalloc(sizeof(*data));
    270  data->event = ev;
    271  data->fired = false;
    272  data->refcount = num;
    273  return event_create(multiqueue_oneshot_event, data);
    274 }
    275 static void multiqueue_oneshot_event(void **argv)
    276 {
    277  MulticastEvent *data = argv[0];
    278  if (!data->fired) {
    279    data->fired = true;
    280    if (data->event.handler) {
    281      data->event.handler(data->event.argv);
    282    }
    283  }
    284  if ((--data->refcount) == 0) {
    285    xfree(data);
    286  }
    287 }