mt-waitset: adding masking of channels so a thread won't handle two messages at the...
[barrelfish] / lib / barrelfish / waitset.c
1 /**
2  * \file
3  * \brief Waitset and low-level event handling mechanism
4  *
5  * A "wait set" is a collection of channels to wait on, much like an
6  * FDSET in POSIX. There should be a default, static wait set for each
7  * dispatcher. Threads which wait for events specify the wait set they
8  * are waiting on.
9  */
10
11 /*
12  * Copyright (c) 2009-2012, ETH Zurich.
13  * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14  * All rights reserved.
15  *
16  * This file is distributed under the terms in the attached LICENSE file.
17  * If you do not find this file, copies can be found by writing to:
18  * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
19  */
20
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <flounder/flounder.h>
32
33 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34 #  include <barrelfish/ump_endpoint.h>
35 #endif
36
37 /// Dequeue a chanstate from a queue
38 static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
39 {
40     if (chan->next == chan) {
41         assert(chan->prev == chan);
42         assert(*queue == chan);
43         *queue = NULL;
44     } else {
45         chan->prev->next = chan->next;
46         chan->next->prev = chan->prev;
47         if (*queue == chan) {
48             *queue = chan->next;
49         }
50     }
51     chan->prev = chan->next = NULL;
52 }
53
54 /// Enqueue a chanstate on a queue
55 static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
56 {
57     if (*queue == NULL) {
58         *queue = chan;
59         chan->next = chan->prev = chan;
60     } else {
61         chan->next = *queue;
62         chan->prev = (*queue)->prev;
63         chan->next->prev = chan;
64         chan->prev->next = chan;
65     }
66 }
67
68 /// Dequeue a chanstate from polled queue
69 static void dequeue_polled(struct waitset_chanstate **queue,
70                             struct waitset_chanstate *chan)
71 {
72     if (chan->polled_next == chan) {
73         assert(chan->polled_prev == chan);
74         assert(*queue == chan);
75         *queue = NULL;
76     } else {
77         chan->polled_prev->polled_next = chan->polled_next;
78         chan->polled_next->polled_prev = chan->polled_prev;
79         if (*queue == chan) {
80             *queue = chan->polled_next;
81         }
82     }
83     chan->polled_prev = chan->polled_next = NULL;
84 }
85
86 /// Enqueue a chanstate on polled queue
87 static void enqueue_polled(struct waitset_chanstate **queue,
88                             struct waitset_chanstate *chan)
89 {
90     if (*queue == NULL) {
91         *queue = chan;
92         chan->polled_next = chan->polled_prev = chan;
93     } else {
94         chan->polled_next = *queue;
95         chan->polled_prev = (*queue)->polled_prev;
96         chan->polled_next->polled_prev = chan;
97         chan->polled_prev->polled_next = chan;
98     }
99 }
100
101 /**
102  * \brief Initialise a new waitset
103  */
104 void waitset_init(struct waitset *ws)
105 {
106     assert(ws != NULL);
107     ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108     ws->waiting_threads = NULL;
109 }
110
111 /**
112  * \brief Destroy a previously initialised waitset
113  */
114 errval_t waitset_destroy(struct waitset *ws)
115 {
116     assert(ws != NULL);
117
118     // FIXME: do we want to support cancelling all the pending events/channels?
119     if (ws->pending || ws->waiting_threads) {
120         return LIB_ERR_WAITSET_IN_USE;
121     }
122
123     // remove idle and polled channels from waitset
124     struct waitset_chanstate *chan, *next;
125     for (chan = ws->idle; chan != NULL; chan = next) {
126         next = chan->next;
127         assert(chan->state == CHAN_IDLE);
128         assert(chan->waitset == ws);
129         chan->waitset = NULL;
130         chan->next = chan->prev = NULL;
131
132         if (next == ws->idle) {
133             break;
134         }
135     }
136     ws->idle = NULL;
137
138     for (chan = ws->polled; chan != NULL; chan = next) {
139         next = chan->next;
140         assert(chan->state == CHAN_POLLED);
141         assert(chan->waitset == ws);
142         chan->waitset = NULL;
143         chan->next = chan->prev = NULL;
144
145         if (next == ws->polled) {
146             break;
147         }
148     }
149     ws->polled = NULL;
150
151     return SYS_ERR_OK;
152 }
153
154 /// Check if the thread can receive the event
155 static bool waitset_can_receive(struct waitset_chanstate *chan,
156                                 struct thread *thread)
157 {
158     bool res = false;
159
160     if (!thread->mask_channels || !chan->masked) {
161         if (chan->wait_for) // if a thread is waiting for this specific event
162             res = chan->wait_for == thread;
163         else
164             res = (chan->token & 1 && !thread->token) // incoming token is a request
165                 // and a thread is not waiting for a token
166                 || (!chan->token && chan != thread->channel) // there's no token
167                 // and a thread is not waiting specifically for that event
168                 || (chan->token == thread->token && chan == thread->channel);
169                 // there is a token and it matches thread's token and event
170     }
171     return res;
172 }
173
174 /// Returns a channel with a pending event on the given waitset matching
175 /// our thread
176 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
177                                     struct waitset_chanstate *chan)
178 {
179     struct thread *me = thread_self_disabled();
180
181     if (chan) { // channel that we wait for
182         if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
183             return chan;
184         }
185         if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
186             return chan;
187         }
188     }
189     // check a waiting queue for matching event
190     for (chan = ws->waiting; chan; ) {
191         if (waitset_can_receive(chan, me)) {
192             assert_disabled(chan->state == CHAN_WAITING);
193             return chan;
194         }
195         chan = chan->next;
196         if (chan == ws->waiting)
197             break;
198     }
199     // check a pending queue for matching event
200     for (chan = ws->pending; chan;) {
201         if (waitset_can_receive(chan, me)) {
202             assert_disabled(chan->state == CHAN_PENDING);
203             return chan;
204         }
205         chan = chan->next;
206         if (chan == ws->pending)
207             break;
208     }
209     return NULL;
210 }
211
212 void arranet_polling_loop_proxy(void) __attribute__((weak));
213 void arranet_polling_loop_proxy(void)
214 {
215     USER_PANIC("Network polling not available without Arranet!\n");
216 }
217
218 void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
219 void poll_ahci(struct waitset_chanstate *chan)
220 {
221     errval_t err = waitset_chan_trigger(chan);
222     assert(err_is_ok(err)); // should not be able to fail
223 }
224
225 /// Check polled channels
226 void poll_channels_disabled(dispatcher_handle_t handle) {
227     struct dispatcher_generic *dp = get_dispatcher_generic(handle);
228     struct waitset_chanstate *chan;
229
230     if (!dp->polled_channels)
231         return;
232     chan = dp->polled_channels;
233     do {
234         switch (chan->chantype) {
235 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
236         case CHANTYPE_UMP_IN: {
237             if (ump_endpoint_poll(chan)) {
238                 errval_t err = waitset_chan_trigger_disabled(chan, handle);
239                 assert(err_is_ok(err)); // should not fail
240                 if (!dp->polled_channels) // restart scan
241                     return;
242                 chan = dp->polled_channels;
243                 continue;
244             } else
245                 chan = chan->polled_next;
246         } break;
247 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
248         case CHANTYPE_LWIP_SOCKET:
249             arranet_polling_loop_proxy();
250             break;
251         case CHANTYPE_AHCI:
252             poll_ahci(chan);
253             break;
254         default:
255             assert(!"invalid channel type to poll!");
256         }
257     } while (chan != dp->polled_channels);
258 }
259
260 /// Re-register a channel (if persistent)
261 static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
262                                 dispatcher_handle_t handle)
263 {
264     assert(chan->waitset == ws);
265     if (chan->state == CHAN_PENDING) {
266         dequeue(&ws->pending, chan);
267     } else {
268         assert(chan->state == CHAN_WAITING);
269         dequeue(&ws->waiting, chan);
270     }
271
272     chan->token = 0;
273     if (chan->chantype == CHANTYPE_UMP_IN
274         || chan->chantype == CHANTYPE_LWIP_SOCKET
275         || chan->chantype == CHANTYPE_AHCI) {
276         enqueue(&ws->polled, chan);
277         enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
278         chan->state = CHAN_POLLED;
279     } else {
280         enqueue(&ws->idle, chan);
281         chan->state = CHAN_IDLE;
282     }
283 }
284
285 /// Find a thread that is able to receive an event
286 static struct thread * find_recipient(struct waitset *ws,
287                         struct waitset_chanstate *channel, struct thread *me)
288 {
289     struct thread *t = ws->waiting_threads;
290
291     if (!t)
292         return NULL;
293     do {
294         if (waitset_can_receive(channel, t))
295             return t;
296         t = t->next;
297     } while (t != ws->waiting_threads);
298     return ws->waiting_threads;
299 }
300
301 /// Wake up other thread if there's more pending events
302 static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
303 {
304     if (ws->pending && ws->waiting_threads) {
305         struct thread *t;
306
307         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
308         assert_disabled(t == NULL); // shouldn't see a remote thread
309     }
310 }
311
312 /**
313  * \brief Get next pending event
314  *
315  * Check if there is a pending event that matches current thread and return it.
316  * Pending events are in a pending queue and in a waiting queue.
317  * A pending event then will be removed from a pending/waiting queue and become
318  * unregistered or, if it's persistent, will be re-registered to an idle queue
319  * or a polled queue (UMP channels) of a waitset.
320  * If there's no pending event, block this thread.
321  * If there's a pending event but it doesn't match our thread, don't remove it
322  * from a pending queue and wake up a matching thread.
323  * If there's no matching thread, add it to a waiting queue.
324  *
325  * \param ws Waitset with sources of events
326  * \param retchannel Holder of returned event
327  * \param retclosure Holder of returned closure
328  * \param waitfor Specific event that we're waiting for (can be NULL)
329  * \param handle Dispatcher's handle
330  * \param debug Debug mode (not used)
331  */
332
333 errval_t get_next_event_disabled(struct waitset *ws,
334     struct waitset_chanstate **retchannel, struct event_closure *retclosure,
335     struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
336 {
337     struct waitset_chanstate * chan;
338
339     for (;;) {
340         chan = get_pending_event_disabled(ws, waitfor); // get our event
341         if (chan) {
342             *retchannel = chan;
343             *retclosure = chan->closure;
344             chan->wait_for = NULL;
345             chan->token = 0;
346             if (chan->persistent)
347                 reregister_channel(ws, chan, handle);
348             else
349                 waitset_chan_deregister_disabled(chan, handle);
350             wake_up_other_thread(handle, ws);
351             return SYS_ERR_OK;
352         }
353         chan = ws->pending; // check a pending queue
354         if (!chan) { // if nothing then wait
355             thread_block_disabled(handle, &ws->waiting_threads);
356             disp_disable();
357         } else { // something but it's not our event
358             if (!ws->waiting_threads) { // no other thread interested in
359                 dequeue(&ws->pending, chan);
360                 enqueue(&ws->waiting, chan);
361                 chan->state = CHAN_WAITING;
362                 chan->waitset = ws;
363             } else {
364                 // find a matching thread
365                 struct thread *t;
366                 for (t = ws->waiting_threads; t; ) {
367                     if (waitset_can_receive(chan, t)) { // match found, wake it
368                         ws->waiting_threads = t;
369                         t = thread_unblock_one_disabled(handle,
370                                                     &ws->waiting_threads, chan);
371                         assert_disabled(t == NULL); // shouldn't see a remote thread
372                         break;
373                     }
374                     t = t->next;
375                     if (t == ws->waiting_threads) { // no recipient found
376                         dequeue(&ws->pending, chan);
377                         enqueue(&ws->waiting, chan);
378                         chan->state = CHAN_WAITING;
379                         chan->waitset = ws;
380                         break;
381                     }
382                 }
383             }
384         }
385     }
386 }
387
388 /**
389  * \brief Wait for (block) and return next event on given waitset
390  *
391  * Wait until something happens, either activity on some channel, or a deferred
392  * call, and then return the corresponding closure. This is the core of the
393  * event-handling system.
394  *
395  * \param ws Waitset
396  * \param retclosure Pointer to storage space for returned event closure
397  */
398 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
399 {
400     dispatcher_handle_t handle = disp_disable();
401     struct waitset_chanstate *channel;
402     errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
403                                             handle, false);
404     disp_enable(handle);
405     return err;
406 }
407
408
409
410 /**
411  * \brief Check if there is an event pending on given waitset
412  *
413  * This is essentially a non-blocking variant of get_next_event(). It should be
414  * used with great care, to avoid the creation of busy-waiting loops.
415  *
416  * \param ws Waitset
417  *
418  * \returns LIB_ERR_NO_EVENT if nothing is pending
419  */
420 static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
421 {
422     struct waitset_chanstate *chan;
423
424     poll_channels_disabled(handle);
425     chan = get_pending_event_disabled(ws, NULL);
426     if (chan != NULL) {
427         return SYS_ERR_OK;
428     }
429     return LIB_ERR_NO_EVENT;
430 }
431
432 errval_t check_for_event(struct waitset *ws)
433 {
434     errval_t err;
435
436     assert(ws != NULL);
437     dispatcher_handle_t handle = disp_disable();
438     err = check_for_event_disabled(ws, handle);
439     disp_enable(handle);
440     return err;
441 }
442
443 /**
444  * \brief Wait for (block) and dispatch next event on given waitset
445  *
446  * Wait until something happens, either activity on some channel, or deferred
447  * call, and then call the corresponding closure.
448  *
449  * \param ws Waitset
450  */
451
452 errval_t event_dispatch(struct waitset *ws)
453 {
454     struct event_closure closure;
455     errval_t err = get_next_event(ws, &closure);
456     if (err_is_fail(err)) {
457         return err;
458     }
459
460     assert(closure.handler != NULL);
461     closure.handler(closure.arg);
462     return SYS_ERR_OK;
463 }
464
465 errval_t event_dispatch_debug(struct waitset *ws)
466 {
467     struct event_closure closure;
468     struct waitset_chanstate *channel;
469     dispatcher_handle_t handle = disp_disable();
470     errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
471                                             true);
472     disp_enable(handle);
473     if (err_is_fail(err)) {
474         return err;
475     }
476
477     assert(closure.handler != NULL);
478     closure.handler(closure.arg);
479     return SYS_ERR_OK;
480 }
481
482 /**
483  * \brief Dispatch events until a specific event is received
484  *
485  * Wait for events and dispatch them. If a specific event comes, don't call
486  * a closure, just return.
487  *
488  * \param ws Waitset
489  * \param waitfor Event, that we are waiting for
490  * \param error_var Error variable that can be changed by closures
491  */
492
493 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
494                             errval_t *error_var)
495 {
496     assert(waitfor->waitset == ws);
497     for (;;) {
498         struct event_closure closure;
499         struct waitset_chanstate *channel;
500
501         dispatcher_handle_t handle = disp_disable();
502         errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
503                                                 handle, false);
504         disp_enable(handle);
505         if (err_is_fail(err)) {
506             assert(0);
507             return err;
508         }
509         if (channel == waitfor) {
510             return SYS_ERR_OK;
511         }
512         assert(!channel->wait_for);
513         assert(closure.handler != NULL);
514         closure.handler(closure.arg);
515         if (err_is_fail(*error_var))
516             return *error_var;
517     }
518 }
519
520 /**
521  * \brief check and dispatch next event on given waitset
522  *
523  * Check if there is any pending activity on some channel, or deferred
524  * call, and then call the corresponding closure.
525  *
526  * Do not wait!  In case of no pending events, return err LIB_ERR_NO_EVENT.
527  *
528  * \param ws Waitset
529  */
530 errval_t event_dispatch_non_block(struct waitset *ws)
531 {
532     struct waitset_chanstate *channel;
533     struct event_closure closure;
534
535     assert(ws != NULL);
536
537     // are there any pending events on the waitset?
538     dispatcher_handle_t handle = disp_disable();
539     errval_t err = check_for_event_disabled(ws, handle);
540     if (err_is_fail(err)) {
541         disp_enable(handle);
542         return err;
543     }
544     err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
545                                             false);
546     if (err_is_fail(err))
547         return err;
548     disp_enable(handle);
549     assert(closure.handler != NULL);
550     closure.handler(closure.arg);
551     return SYS_ERR_OK;
552 }
553
554
555 /**
556  * \privatesection
557  * "Private" functions that are called only by the channel implementations
558  */
559
560 /**
561  * \brief Initialise per-channel waitset state
562  *
563  * \param chan Channel state
564  * \param chantype Channel type
565  */
566 void waitset_chanstate_init(struct waitset_chanstate *chan,
567                             enum ws_chantype chantype)
568 {
569     assert(chan != NULL);
570     chan->waitset = NULL;
571     chan->chantype = chantype;
572     chan->state = CHAN_UNREGISTERED;
573 #ifndef NDEBUG
574     chan->prev = chan->next = NULL;
575 #endif
576     chan->persistent = false;
577     chan->token = 0;
578     chan->wait_for = NULL;
579     chan->masked = false;
580 }
581
582 /**
583  * \brief Destroy previously-initialised per-channel waitset state
584  * \param chan Channel state
585  */
586 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
587 {
588     assert(chan != NULL);
589     if (chan->waitset != NULL) {
590         errval_t err = waitset_chan_deregister(chan);
591         assert(err_is_ok(err)); // can't fail if registered
592     }
593 }
594
595 /**
596  * \brief Register a closure to be called when a channel is triggered
597  *
598  * In the Future, call the closure on a thread associated with the waitset
599  * when the channel is triggered. Only one closure may be registered per
600  * channel state at any one time.
601  * This function must only be called when disabled.
602  *
603  * \param ws Waitset
604  * \param chan Waitset's per-channel state
605  * \param closure Event handler
606  */
607 errval_t waitset_chan_register_disabled(struct waitset *ws,
608                                         struct waitset_chanstate *chan,
609                                         struct event_closure closure)
610 {
611     if (chan->waitset != NULL) {
612         return LIB_ERR_CHAN_ALREADY_REGISTERED;
613     }
614
615     chan->waitset = ws;
616     chan->token = 0;
617
618     // channel must not already be registered!
619     assert_disabled(chan->next == NULL && chan->prev == NULL);
620     assert_disabled(chan->state == CHAN_UNREGISTERED);
621
622     // this is probably insane! :)
623     assert_disabled(closure.handler != NULL);
624
625     // store closure
626     chan->closure = closure;
627
628     // enqueue this channel on the waitset's queue of idle channels
629     enqueue(&ws->idle, chan);
630     chan->state = CHAN_IDLE;
631
632     return SYS_ERR_OK;
633 }
634
635 /**
636  * \brief Register a closure on a channel, and mark the channel as polled
637  *
638  * In the Future, call the closure on a thread associated with the waitset
639  * when the channel is triggered. Only one closure may be registered per
640  * channel state at any one time. Additionally, mark the channel as polled.
641  * This function must only be called when disabled.
642  *
643  * \param ws Waitset
644  * \param chan Waitset's per-channel state
645  * \param closure Event handler
646  * \param disp Current dispatcher pointer
647  */
648 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
649                                                struct waitset_chanstate *chan,
650                                                struct event_closure closure,
651                                                dispatcher_handle_t handle)
652 {
653     if (chan->waitset != NULL) {
654         return LIB_ERR_CHAN_ALREADY_REGISTERED;
655     }
656
657     chan->waitset = ws;
658     chan->token = 0;
659
660     // channel must not already be registered!
661     assert_disabled(chan->next == NULL && chan->prev == NULL);
662     assert_disabled(chan->state == CHAN_UNREGISTERED);
663
664     // store closure
665     chan->closure = closure;
666
667     // enqueue this channel on the waitset's queue of polled channels
668     enqueue(&ws->polled, chan);
669     chan->state = CHAN_POLLED;
670     enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
671
672     return SYS_ERR_OK;
673 }
674
675 /**
676  * \brief Register a closure to be called when a channel is triggered
677  *
678  * In the Future, call the closure on a thread associated with the waitset
679  * when the channel is triggered. Only one closure may be registered per
680  * channel state at any one time.
681  * This function must only be called when enabled.
682  *
683  * \param ws Waitset
684  * \param chan Waitset's per-channel state
685  * \param closure Event handler
686  */
687 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
688                                struct event_closure closure)
689 {
690     dispatcher_handle_t handle = disp_disable();
691     errval_t err = waitset_chan_register_disabled(ws, chan, closure);
692     disp_enable(handle);
693     return err;
694 }
695
696 /**
697  * \brief Register a closure on a channel, and mark the channel as polled
698  *
699  * In the Future, call the closure on a thread associated with the waitset
700  * when the channel is triggered. Only one closure may be registered per
701  * channel state at any one time. Additionally, mark the channel as polled.
702  * This function must only be called when enabled. It is equivalent to
703  * calling waitset_chan_register() followed by waitset_chan_start_polling().
704  *
705  * \param ws Waitset
706  * \param chan Waitset's per-channel state
707  * \param closure Event handler
708  */
709 errval_t waitset_chan_register_polled(struct waitset *ws,
710                                       struct waitset_chanstate *chan,
711                                       struct event_closure closure)
712 {
713     dispatcher_handle_t handle = disp_disable();
714     errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
715     disp_enable(handle);
716     return err;
717 }
718
719 /**
720  * \brief Cancel a previous callback registration
721  *
722  * Remove the registration for a callback on the given channel.
723  * This function must only be called when disabled.
724  *
725  * \param chan Waitset's per-channel state
726  */
727 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
728                                           dispatcher_handle_t handle)
729 {
730     assert_disabled(chan != NULL);
731     struct waitset *ws = chan->waitset;
732     if (ws == NULL) {
733         return LIB_ERR_CHAN_NOT_REGISTERED;
734     }
735
736     // remove this channel from the queue in which it is waiting
737     chan->waitset = NULL;
738     assert_disabled(chan->next != NULL && chan->prev != NULL);
739
740     switch (chan->state) {
741     case CHAN_IDLE:
742         dequeue(&ws->idle, chan);
743         break;
744
745     case CHAN_POLLED:
746         dequeue(&ws->polled, chan);
747         dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
748         break;
749
750     case CHAN_PENDING:
751         dequeue(&ws->pending, chan);
752         break;
753
754     case CHAN_WAITING:
755         dequeue(&ws->waiting, chan);
756         break;
757
758     default:
759         assert_disabled(!"invalid channel state in deregister");
760     }
761     chan->state = CHAN_UNREGISTERED;
762     chan->wait_for = NULL;
763     return SYS_ERR_OK;
764 }
765
766 /**
767  * \brief Cancel a previous callback registration
768  *
769  * Remove the registration for a callback on the given channel.
770  * This function must only be called when enabled.
771  *
772  * \param chan Waitset's per-channel state
773  */
774 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
775 {
776     dispatcher_handle_t handle = disp_disable();
777     errval_t err = waitset_chan_deregister_disabled(chan, handle);
778     disp_enable(handle);
779     return err;
780 }
781
782 /**
783  * \brief Migrate callback registrations to a new waitset.
784  *
785  * \param chan Old waitset's per-channel state to migrate
786  * \param new_ws New waitset to migrate to
787  */
788 void waitset_chan_migrate(struct waitset_chanstate *chan,
789                           struct waitset *new_ws)
790 {
791     struct waitset *ws = chan->waitset;
792
793     // Only when registered
794     if(ws == NULL) {
795         return;
796     }
797
798     switch(chan->state) {
799     case CHAN_IDLE:
800         dequeue(&ws->idle, chan);
801         enqueue(&new_ws->idle, chan);
802         break;
803
804     case CHAN_POLLED:
805         dequeue(&ws->polled, chan);
806         enqueue(&new_ws->polled, chan);
807         break;
808
809     case CHAN_PENDING:
810         dequeue(&ws->pending, chan);
811         enqueue(&new_ws->pending, chan);
812         break;
813
814     case CHAN_WAITING:
815         dequeue(&ws->waiting, chan);
816         enqueue(&new_ws->waiting, chan);
817         break;
818
819     case CHAN_UNREGISTERED:
820         // Do nothing
821         break;
822     }
823
824     // Remember new waitset association
825     chan->waitset = new_ws;
826 }
827
828 /**
829  * \brief Trigger an event callback on a channel
830  *
831  * Marks the given channel as having a pending event, causing some future call
832  * to get_next_event() to return the registered closure.
833  * This function must only be called when disabled.
834  *
835  * \param chan Waitset's per-channel state
836  * \param disp Current dispatcher pointer
837  */
838 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
839                                        dispatcher_handle_t handle)
840 {
841     assert_disabled(chan != NULL);
842     struct waitset *ws = chan->waitset;
843     assert_disabled(ws != NULL);
844     assert_disabled(chan->prev != NULL && chan->next != NULL);
845
846     // no-op if already pending
847     if (chan->state == CHAN_PENDING) {
848         return SYS_ERR_OK;
849     }
850
851     // remove from previous queue (either idle or polled)
852     if (chan->state == CHAN_IDLE) {
853         dequeue(&ws->idle, chan);
854     } else {
855         assert_disabled(chan->state == CHAN_POLLED);
856         dequeue(&ws->polled, chan);
857         dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
858     }
859
860     // else mark channel pending and move to end of pending event queue
861     enqueue(&ws->pending, chan);
862     chan->state = CHAN_PENDING;
863
864     // is there a thread blocked on this waitset? if so, awaken it with the event
865     struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
866     if (thread) {
867         struct thread *t;
868         ws->waiting_threads = thread;
869         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
870         assert_disabled(t == NULL);
871     }
872     return SYS_ERR_OK;
873 }
874
875 /**
876  * \brief Trigger an event callback on a channel
877  *
878  * Marks the given channel as having a pending event, causing some future call
879  * to get_next_event() to return the registered closure.
880  * This function must only be called when enabled.
881  *
882  * \param chan Waitset's per-channel state
883  * \param disp Current dispatcher pointer
884  */
885 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
886 {
887     dispatcher_handle_t handle = disp_disable();
888     errval_t err = waitset_chan_trigger_disabled(chan, handle);
889     disp_enable(handle);
890     return err;
891 }
892
893 /**
894  * \brief Trigger a specific event callback on an unregistered channel
895  *
896  * This function is equivalent to waitset_chan_register_disabled() immediately
897  * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
898  * manipulation. This function must only be called when disabled.
899  *
900  * \param ws Waitset
901  * \param chan Waitset's per-channel state
902  * \param closure Event handler
903  * \param disp Current dispatcher pointer
904  */
905 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
906                                                struct waitset_chanstate *chan,
907                                                struct event_closure closure,
908                                                dispatcher_handle_t handle)
909 {
910     assert_disabled(chan != NULL);
911     assert_disabled(ws != NULL);
912
913     // check if already registered
914     if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
915         return LIB_ERR_CHAN_ALREADY_REGISTERED;
916     }
917
918     assert_disabled(chan->prev == NULL && chan->next == NULL);
919
920     // set closure
921     chan->closure = closure;
922
923     // mark channel pending and place on end of pending event queue
924     chan->waitset = ws;
925     enqueue(&ws->pending, chan);
926     // if (first)
927     //     ws->pending = chan;
928     chan->state = CHAN_PENDING;
929
930     // is there a thread blocked on this waitset? if so, awaken it with the event
931     struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
932     if (thread) {
933         struct thread *t;
934         ws->waiting_threads = thread;
935         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
936         assert_disabled(t == NULL);
937     }
938     return SYS_ERR_OK;
939 }
940
941
942 /**
943  * \brief Trigger a specific event callback on an unregistered channel
944  *
945  * This function is equivalent to waitset_chan_register()
946  * followed by waitset_chan_trigger(), but avoids unneccessary queue
947  * manipulation. This function must only be called when enabled.
948  *
949  * \param ws Waitset
950  * \param chan Waitset's per-channel state
951  * \param closure Event handler
952  */
953 errval_t waitset_chan_trigger_closure(struct waitset *ws,
954                                       struct waitset_chanstate *chan,
955                                       struct event_closure closure)
956 {
957     dispatcher_handle_t disp = disp_disable();
958     errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);
959     disp_enable(disp);
960     return err;
961 }