multiqueue.c (8466B)
1 // Multi-level queue for selective async event processing. 2 // Not threadsafe; access must be synchronized externally. 3 // 4 // Multiqueue supports a parent-child relationship with these properties: 5 // - pushing a node to a child queue will push a corresponding link node to the 6 // parent queue 7 // - removing a link node from a parent queue will remove the next node 8 // in the linked child queue 9 // - removing a node from a child queue will remove the corresponding link node 10 // in the parent queue 11 // 12 // These properties allow Nvim to organize and process events from different 13 // sources with a certain degree of control. How the multiqueue is used: 14 // 15 // +----------------+ 16 // | Main loop | 17 // +----------------+ 18 // 19 // +----------------+ 20 // +-------------->| Event loop |<------------+ 21 // | +--+-------------+ | 22 // | ^ ^ | 23 // | | | | 24 // +-----------+ +-----------+ +---------+ +---------+ 25 // | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | 26 // +-----------+ +-----------+ +---------+ +---------+ 27 // 28 // 29 // The lower boxes represent event emitters, each with its own private queue 30 // having the event loop queue as the parent. 31 // 32 // When idle, the main loop spins the event loop which queues events from many 33 // sources (channels, jobs, user...). Each event emitter pushes events to its 34 // private queue which is propagated to the event loop queue. When the main loop 35 // consumes an event, the corresponding event is removed from the emitter's 36 // queue. 37 // 38 // The main reason for this queue hierarchy is to allow focusing on a single 39 // event emitter while blocking the main loop. For example, if the `jobwait` 40 // Vimscript function is called on job1, the main loop will temporarily stop polling 41 // the event loop queue and poll job1 queue instead. Same with channels, when 42 // calling `rpcrequest` we want to temporarily stop processing events from 43 // other sources and focus on a specific channel. 44 45 #include <assert.h> 46 #include <stdbool.h> 47 #include <stddef.h> 48 49 #include "nvim/event/defs.h" 50 #include "nvim/event/multiqueue.h" 51 #include "nvim/lib/queue_defs.h" 52 #include "nvim/memory.h" 53 54 typedef struct multiqueue_item MultiQueueItem; 55 struct multiqueue_item { 56 union { 57 MultiQueue *queue; 58 struct { 59 Event event; 60 MultiQueueItem *parent_item; 61 } item; 62 } data; 63 bool link; // true: current item is just a link to a node in a child queue 64 QUEUE node; 65 }; 66 67 struct multiqueue { 68 MultiQueue *parent; 69 QUEUE headtail; // circularly-linked 70 PutCallback on_put; // Called on the parent (if any) when an item is enqueued in a child. 71 void *data; 72 size_t size; 73 }; 74 75 typedef struct { 76 Event event; 77 bool fired; 78 int refcount; 79 } MulticastEvent; ///< Event present on multiple queues. 80 81 #include "event/multiqueue.c.generated.h" 82 83 static Event NILEVENT = { .handler = NULL, .argv = { NULL } }; 84 85 /// Creates a new root (parentless) queue, which may gain child queues via `multiqueue_new_child`. 86 MultiQueue *multiqueue_new(PutCallback on_put, void *data) 87 { 88 return _multiqueue_new(NULL, on_put, data); 89 } 90 91 /// Creates a new queue as a child of a `parent` queue. 92 MultiQueue *multiqueue_new_child(MultiQueue *parent) 93 FUNC_ATTR_NONNULL_ALL 94 { 95 assert(!parent->parent); // parent cannot have a parent, more like a "root" 96 parent->size++; 97 return _multiqueue_new(parent, NULL, NULL); 98 } 99 100 static MultiQueue *_multiqueue_new(MultiQueue *parent, PutCallback on_put, void *data) 101 { 102 MultiQueue *rv = xmalloc(sizeof(MultiQueue)); 103 QUEUE_INIT(&rv->headtail); 104 rv->size = 0; 105 rv->parent = parent; 106 rv->on_put = on_put; 107 rv->data = data; 108 return rv; 109 } 110 111 void multiqueue_free(MultiQueue *self) 112 { 113 assert(self); 114 QUEUE *q; 115 QUEUE_FOREACH(q, &self->headtail, { 116 MultiQueueItem *item = multiqueue_node_data(q); 117 if (self->parent) { 118 QUEUE_REMOVE(&item->data.item.parent_item->node); 119 xfree(item->data.item.parent_item); 120 } 121 QUEUE_REMOVE(q); 122 xfree(item); 123 }) 124 125 xfree(self); 126 } 127 128 /// Removes the next item and returns its Event. 129 Event multiqueue_get(MultiQueue *self) 130 { 131 return multiqueue_empty(self) ? NILEVENT : multiqueue_remove(self); 132 } 133 134 void multiqueue_put_event(MultiQueue *self, Event event) 135 { 136 assert(self); 137 multiqueue_push(self, event); 138 if (self->parent && self->parent->on_put) { 139 self->parent->on_put(self->parent, self->parent->data); 140 } 141 } 142 143 /// Move events from src to dest. 144 void multiqueue_move_events(MultiQueue *dest, MultiQueue *src) 145 FUNC_ATTR_NONNULL_ALL 146 { 147 while (!multiqueue_empty(src)) { 148 Event event = multiqueue_get(src); 149 multiqueue_put_event(dest, event); 150 } 151 } 152 153 void multiqueue_process_events(MultiQueue *self) 154 { 155 assert(self); 156 while (!multiqueue_empty(self)) { 157 Event event = multiqueue_remove(self); 158 if (event.handler) { 159 event.handler(event.argv); 160 } 161 } 162 } 163 164 /// Removes all events without processing them. 165 void multiqueue_purge_events(MultiQueue *self) 166 { 167 assert(self); 168 while (!multiqueue_empty(self)) { 169 multiqueue_remove(self); 170 } 171 } 172 173 bool multiqueue_empty(MultiQueue *self) 174 { 175 assert(self); 176 return QUEUE_EMPTY(&self->headtail); 177 } 178 179 void multiqueue_replace_parent(MultiQueue *self, MultiQueue *new_parent) 180 { 181 assert(multiqueue_empty(self)); 182 self->parent = new_parent; 183 } 184 185 /// Gets the count of all events currently in the queue. 186 size_t multiqueue_size(MultiQueue *self) 187 { 188 return self->size; 189 } 190 191 /// Gets an Event from an item. 192 /// 193 /// @param remove Remove the node from its queue, and free it. 194 static Event multiqueueitem_get_event(MultiQueueItem *item, bool remove) 195 { 196 assert(item != NULL); 197 Event ev; 198 if (item->link) { 199 // get the next node in the linked queue 200 MultiQueue *linked = item->data.queue; 201 assert(!multiqueue_empty(linked)); 202 MultiQueueItem *child = 203 multiqueue_node_data(QUEUE_HEAD(&linked->headtail)); 204 ev = child->data.item.event; 205 // remove the child node 206 if (remove) { 207 QUEUE_REMOVE(&child->node); 208 xfree(child); 209 } 210 } else { 211 // remove the corresponding link node in the parent queue 212 if (remove && item->data.item.parent_item) { 213 QUEUE_REMOVE(&item->data.item.parent_item->node); 214 xfree(item->data.item.parent_item); 215 item->data.item.parent_item = NULL; 216 } 217 ev = item->data.item.event; 218 } 219 return ev; 220 } 221 222 static Event multiqueue_remove(MultiQueue *self) 223 { 224 assert(!multiqueue_empty(self)); 225 QUEUE *h = QUEUE_HEAD(&self->headtail); 226 QUEUE_REMOVE(h); 227 MultiQueueItem *item = multiqueue_node_data(h); 228 assert(!item->link || !self->parent); // Only a parent queue has link-nodes 229 Event ev = multiqueueitem_get_event(item, true); 230 self->size--; 231 xfree(item); 232 return ev; 233 } 234 235 static void multiqueue_push(MultiQueue *self, Event event) 236 { 237 MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem)); 238 item->link = false; 239 item->data.item.event = event; 240 item->data.item.parent_item = NULL; 241 QUEUE_INSERT_TAIL(&self->headtail, &item->node); 242 if (self->parent) { 243 // push link node to the parent queue 244 item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem)); 245 item->data.item.parent_item->link = true; 246 item->data.item.parent_item->data.queue = self; 247 QUEUE_INSERT_TAIL(&self->parent->headtail, 248 &item->data.item.parent_item->node); 249 } 250 self->size++; 251 } 252 253 static MultiQueueItem *multiqueue_node_data(QUEUE *q) 254 FUNC_ATTR_NO_SANITIZE_ADDRESS 255 { 256 return QUEUE_DATA(q, MultiQueueItem, node); 257 } 258 259 /// Multicasts a one-shot event to multiple queues. 260 /// 261 /// The handler will be invoked once by the _first_ queue that consumes the 262 /// event. Later processing will do nothing (just memory cleanup). 263 /// 264 /// @param ev Event 265 /// @param num Number of queues that the event will be put on 266 /// @return Event that is safe to put onto `num` queues 267 Event event_create_oneshot(Event ev, int num) 268 { 269 MulticastEvent *data = xmalloc(sizeof(*data)); 270 data->event = ev; 271 data->fired = false; 272 data->refcount = num; 273 return event_create(multiqueue_oneshot_event, data); 274 } 275 static void multiqueue_oneshot_event(void **argv) 276 { 277 MulticastEvent *data = argv[0]; 278 if (!data->fired) { 279 data->fired = true; 280 if (data->event.handler) { 281 data->event.handler(data->event.argv); 282 } 283 } 284 if ((--data->refcount) == 0) { 285 xfree(data); 286 } 287 }