Merge branch 'master' of ssh://code.systems.ethz.ch:8006/diffusion/BFI/barrelfish
[barrelfish] / lib / barrelfish / waitset.c
1 /**
2  * \file
3  * \brief Waitset and low-level event handling mechanism
4  *
5  * A "wait set" is a collection of channels to wait on, much like an
6  * FDSET in POSIX. There should be a default, static wait set for each
7  * dispatcher. Threads which wait for events specify the wait set they
8  * are waiting on.
9  */
10
11 /*
12  * Copyright (c) 2009-2012, ETH Zurich.
13  * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14  * All rights reserved.
15  *
16  * This file is distributed under the terms in the attached LICENSE file.
17  * If you do not find this file, copies can be found by writing to:
18  * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
19  */
20
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <flounder/flounder.h>
32
33 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34 #  include <barrelfish/ump_endpoint.h>
35 #endif
36
37 /// Dequeue a chanstate from a queue
38 static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
39 {
40     if (chan->next == chan) {
41         assert(chan->prev == chan);
42         assert(*queue == chan);
43         *queue = NULL;
44     } else {
45         chan->prev->next = chan->next;
46         chan->next->prev = chan->prev;
47         if (*queue == chan) {
48             *queue = chan->next;
49         }
50     }
51     chan->prev = chan->next = NULL;
52 }
53
54 /// Enqueue a chanstate on a queue
55 static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
56 {
57     if (*queue == NULL) {
58         *queue = chan;
59         chan->next = chan->prev = chan;
60     } else {
61         chan->next = *queue;
62         chan->prev = (*queue)->prev;
63         chan->next->prev = chan;
64         chan->prev->next = chan;
65     }
66 }
67
68 /// Dequeue a chanstate from polled queue
69 static void dequeue_polled(struct waitset_chanstate **queue,
70                             struct waitset_chanstate *chan)
71 {
72     if (chan->polled_next == chan) {
73         assert(chan->polled_prev == chan);
74         assert(*queue == chan);
75         *queue = NULL;
76     } else {
77         chan->polled_prev->polled_next = chan->polled_next;
78         chan->polled_next->polled_prev = chan->polled_prev;
79         if (*queue == chan) {
80             *queue = chan->polled_next;
81         }
82     }
83     chan->polled_prev = chan->polled_next = NULL;
84 }
85
86 /// Enqueue a chanstate on polled queue
87 static void enqueue_polled(struct waitset_chanstate **queue,
88                             struct waitset_chanstate *chan)
89 {
90     if (*queue == NULL) {
91         *queue = chan;
92         chan->polled_next = chan->polled_prev = chan;
93     } else {
94         chan->polled_next = *queue;
95         chan->polled_prev = (*queue)->polled_prev;
96         chan->polled_next->polled_prev = chan;
97         chan->polled_prev->polled_next = chan;
98     }
99 }
100
101 /**
102  * \brief Initialise a new waitset
103  */
104 void waitset_init(struct waitset *ws)
105 {
106     assert(ws != NULL);
107     ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108     ws->waiting_threads = NULL;
109 }
110
111 /**
112  * \brief Destroy a previously initialised waitset
113  */
114 errval_t waitset_destroy(struct waitset *ws)
115 {
116     assert(ws != NULL);
117
118     // FIXME: do we want to support cancelling all the pending events/channels?
119     if (ws->pending || ws->waiting_threads) {
120         return LIB_ERR_WAITSET_IN_USE;
121     }
122
123     // remove idle and polled channels from waitset
124     struct waitset_chanstate *chan, *next;
125     for (chan = ws->idle; chan != NULL; chan = next) {
126         next = chan->next;
127         assert(chan->state == CHAN_IDLE);
128         assert(chan->waitset == ws);
129         chan->waitset = NULL;
130         chan->next = chan->prev = NULL;
131
132         if (next == ws->idle) {
133             break;
134         }
135     }
136     ws->idle = NULL;
137
138     for (chan = ws->polled; chan != NULL; chan = next) {
139         next = chan->next;
140         assert(chan->state == CHAN_POLLED);
141         assert(chan->waitset == ws);
142         chan->waitset = NULL;
143         chan->next = chan->prev = NULL;
144
145         if (next == ws->polled) {
146             break;
147         }
148     }
149     ws->polled = NULL;
150
151     return SYS_ERR_OK;
152 }
153
154 /// Check if a thread can receive an event
155 static bool waitset_check_token(struct waitset_chanstate *chan,
156                                 struct thread *thread)
157 {
158     bool res = false;
159
160     if (chan->wait_for) // if a thread is waiting for this specific event
161         res = chan->wait_for == thread;
162     else
163         res = (chan->token & 1 && !thread->token) // incoming token is a request
164             // and a thread is not waiting for a token
165             || (!chan->token && chan != thread->channel) // there's no token
166             // and a thread is not waiting specifically for that event
167             || (chan->token == thread->token && chan == thread->channel);
168             // there is a token and it matches thread's token and event
169     return res;
170 }
171
172 /// Returns a channel with a pending event on the given waitset matching
173 /// our thread
174 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
175                                     struct waitset_chanstate *chan)
176 {
177     struct thread *me = thread_self();
178
179     if (chan) { // channel that we wait for
180         if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
181             return chan;
182         }
183         if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
184             return chan;
185         }
186     }
187     // check a waiting queue for matching event
188     for (chan = ws->waiting; chan; ) {
189         if (waitset_check_token(chan, me)) {
190             assert_disabled(chan->state == CHAN_WAITING);
191             return chan;
192         }
193         chan = chan->next;
194         if (chan == ws->waiting)
195             break;
196     }
197     // check a pending queue for matching event
198     for (chan = ws->pending; chan;) {
199         if (waitset_check_token(chan, me)) {
200             assert_disabled(chan->state == CHAN_PENDING);
201             return chan;
202         }
203         chan = chan->next;
204         if (chan == ws->pending)
205             break;
206     }
207     return NULL;
208 }
209
210 void arranet_polling_loop_proxy(void) __attribute__((weak));
211 void arranet_polling_loop_proxy(void)
212 {
213     USER_PANIC("Network polling not available without Arranet!\n");
214 }
215
216 void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
217 void poll_ahci(struct waitset_chanstate *chan)
218 {
219     errval_t err = waitset_chan_trigger(chan);
220     assert(err_is_ok(err)); // should not be able to fail
221 }
222
223 /// Check polled channels
224 void poll_channels_disabled(dispatcher_handle_t handle) {
225     struct dispatcher_generic *dp = get_dispatcher_generic(handle);
226     struct waitset_chanstate *chan;
227
228     if (!dp->polled_channels)
229         return;
230     chan = dp->polled_channels;
231     do {
232         switch (chan->chantype) {
233 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
234         case CHANTYPE_UMP_IN: {
235             if (ump_endpoint_poll(chan)) {
236                 errval_t err = waitset_chan_trigger_disabled(chan, handle);
237                 assert(err_is_ok(err)); // should not fail
238                 if (!dp->polled_channels) // restart scan
239                     return;
240                 chan = dp->polled_channels;
241                 continue;
242             } else
243                 chan = chan->polled_next;
244         } break;
245 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
246         case CHANTYPE_LWIP_SOCKET:
247             arranet_polling_loop_proxy();
248             break;
249         case CHANTYPE_AHCI:
250             poll_ahci(chan);
251             break;
252         default:
253             assert(!"invalid channel type to poll!");
254         }
255     } while (chan != dp->polled_channels);
256 }
257
258 /// Re-register a channel (if persistent)
259 static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
260                                 dispatcher_handle_t handle)
261 {
262     assert(chan->waitset == ws);
263     if (chan->state == CHAN_PENDING) {
264         dequeue(&ws->pending, chan);
265     } else {
266         assert(chan->state == CHAN_WAITING);
267         dequeue(&ws->waiting, chan);
268     }
269
270     chan->token = 0;
271     if (chan->chantype == CHANTYPE_UMP_IN
272         || chan->chantype == CHANTYPE_LWIP_SOCKET 
273         || chan->chantype == CHANTYPE_AHCI) {
274         enqueue(&ws->polled, chan);
275         enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
276         chan->state = CHAN_POLLED;
277     } else {
278         enqueue(&ws->idle, chan);
279         chan->state = CHAN_IDLE;
280     }
281 }
282
283 /// Find a thread that is able to receive an event
284 static struct thread * find_recipient(struct waitset *ws,
285                         struct waitset_chanstate *channel, struct thread *me)
286 {
287     struct thread *t = ws->waiting_threads;
288
289     if (!t)
290         return NULL;
291     do {
292         if (waitset_check_token(channel, t))
293             return t;
294         t = t->next;
295     } while (t != ws->waiting_threads);
296     return ws->waiting_threads;
297 }
298
299 /// Wake up other thread if there's more pending events
300 static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
301 {
302     if (ws->pending && ws->waiting_threads) {
303         struct thread *t;
304
305         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
306         assert_disabled(t == NULL); // shouldn't see a remote thread
307     }
308 }
309
310 /**
311  * \brief Get next pending event
312  *
313  * Check if there is a pending event that matches current thread and return it.
314  * Pending events are in a pending queue and in a waiting queue.
315  * A pending event then will be removed from a pending/waiting queue and become
316  * unregistered or, if it's persistent, will be re-registered to an idle queue
317  * or a polled queue (UMP channels) of a waitset.
318  * If there's no pending event, block this thread.
319  * If there's a pending event but it doesn't match our thread, don't remove it
320  * from a pending queue and wake up a matching thread.
321  * If there's no matching thread, add it to a waiting queue.
322  *
323  * \param ws Waitset with sources of events
324  * \param retchannel Holder of returned event
325  * \param retclosure Holder of returned closure
326  * \param waitfor Specific event that we're waiting for (can be NULL)
327  * \param handle Dispatcher's handle
328  * \param debug Debug mode (not used)
329  */
330
331 errval_t get_next_event_disabled(struct waitset *ws,
332     struct waitset_chanstate **retchannel, struct event_closure *retclosure,
333     struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
334 {
335     struct waitset_chanstate * chan;
336
337     for (;;) {
338         chan = get_pending_event_disabled(ws, waitfor); // get our event
339         if (chan) {
340             *retchannel = chan;
341             *retclosure = chan->closure;
342             chan->wait_for = NULL;
343             chan->token = 0;
344             if (chan->persistent)
345                 reregister_channel(ws, chan, handle);
346             else
347                 waitset_chan_deregister_disabled(chan, handle);
348             wake_up_other_thread(handle, ws);
349             return SYS_ERR_OK;
350         }
351         chan = ws->pending; // check a pending queue
352         if (!chan) { // if nothing then wait
353             thread_block_disabled(handle, &ws->waiting_threads);
354             disp_disable();
355         } else { // something but it's not our event
356             if (!ws->waiting_threads) { // no other thread interested in
357                 dequeue(&ws->pending, chan);
358                 enqueue(&ws->waiting, chan);
359                 chan->state = CHAN_WAITING;
360                 chan->waitset = ws;
361             } else {
362                 // find a matching thread
363                 struct thread *t;
364                 for (t = ws->waiting_threads; t; ) {
365                     if (waitset_check_token(chan, t)) { // match found, wake it
366                         ws->waiting_threads = t;
367                         t = thread_unblock_one_disabled(handle,
368                                                     &ws->waiting_threads, chan);
369                         assert_disabled(t == NULL); // shouldn't see a remote thread
370                         break;
371                     }
372                     t = t->next;
373                     if (t == ws->waiting_threads) { // no recipient found
374                         dequeue(&ws->pending, chan);
375                         enqueue(&ws->waiting, chan);
376                         chan->state = CHAN_WAITING;
377                         chan->waitset = ws;
378                         break;
379                     }
380                 }
381             }
382         }
383     }
384 }
385
386 /**
387  * \brief Wait for (block) and return next event on given waitset
388  *
389  * Wait until something happens, either activity on some channel, or a deferred
390  * call, and then return the corresponding closure. This is the core of the
391  * event-handling system.
392  *
393  * \param ws Waitset
394  * \param retclosure Pointer to storage space for returned event closure
395  */
396 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
397 {
398     dispatcher_handle_t handle = disp_disable();
399     struct waitset_chanstate *channel;
400     errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
401                                             handle, false);
402     disp_enable(handle);
403     return err;
404 }
405
406
407
408 /**
409  * \brief Check if there is an event pending on given waitset
410  *
411  * This is essentially a non-blocking variant of get_next_event(). It should be
412  * used with great care, to avoid the creation of busy-waiting loops.
413  *
414  * \param ws Waitset
415  *
416  * \returns LIB_ERR_NO_EVENT if nothing is pending
417  */
418 static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
419 {
420     struct waitset_chanstate *chan;
421
422     poll_channels_disabled(handle);
423     chan = get_pending_event_disabled(ws, NULL);
424     if (chan != NULL) {
425         return SYS_ERR_OK;
426     }
427     return LIB_ERR_NO_EVENT;
428 }
429
430 errval_t check_for_event(struct waitset *ws)
431 {
432     errval_t err;
433
434     assert(ws != NULL);
435     dispatcher_handle_t handle = disp_disable();
436     err = check_for_event_disabled(ws, handle);
437     disp_enable(handle);
438     return err;
439 }
440
441 /**
442  * \brief Wait for (block) and dispatch next event on given waitset
443  *
444  * Wait until something happens, either activity on some channel, or deferred
445  * call, and then call the corresponding closure.
446  *
447  * \param ws Waitset
448  */
449
450 errval_t event_dispatch(struct waitset *ws)
451 {
452     struct event_closure closure;
453     errval_t err = get_next_event(ws, &closure);
454     if (err_is_fail(err)) {
455         return err;
456     }
457
458     assert(closure.handler != NULL);
459     closure.handler(closure.arg);
460     return SYS_ERR_OK;
461 }
462
463 errval_t event_dispatch_debug(struct waitset *ws)
464 {
465     struct event_closure closure;
466     struct waitset_chanstate *channel;
467     dispatcher_handle_t handle = disp_disable();
468     errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
469                                             true);
470     disp_enable(handle);
471     if (err_is_fail(err)) {
472         return err;
473     }
474
475     assert(closure.handler != NULL);
476     closure.handler(closure.arg);
477     return SYS_ERR_OK;
478 }
479
480 /**
481  * \brief Dispatch events until a specific event is received
482  *
483  * Wait for events and dispatch them. If a specific event comes, don't call
484  * a closure, just return.
485  *
486  * \param ws Waitset
487  * \param waitfor Event, that we are waiting for
488  * \param error_var Error variable that can be changed by closures
489  */
490
491 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
492                             errval_t *error_var)
493 {
494     assert(waitfor->waitset == ws);
495     for (;;) {
496         struct event_closure closure;
497         struct waitset_chanstate *channel;
498
499         dispatcher_handle_t handle = disp_disable();
500         errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
501                                                 handle, false);
502         disp_enable(handle);
503         if (err_is_fail(err)) {
504             assert(0);
505             return err;
506         }
507         if (channel == waitfor) {
508             return SYS_ERR_OK;
509         }
510         assert(!channel->wait_for);
511         assert(closure.handler != NULL);
512         closure.handler(closure.arg);
513         if (err_is_fail(*error_var))
514             return *error_var;
515     }
516 }
517
518 /**
519  * \brief check and dispatch next event on given waitset
520  *
521  * Check if there is any pending activity on some channel, or deferred
522  * call, and then call the corresponding closure.
523  *
524  * Do not wait!  In case of no pending events, return err LIB_ERR_NO_EVENT.
525  *
526  * \param ws Waitset
527  */
528 errval_t event_dispatch_non_block(struct waitset *ws)
529 {
530     struct waitset_chanstate *channel;
531     struct event_closure closure;
532
533     assert(ws != NULL);
534
535     // are there any pending events on the waitset?
536     dispatcher_handle_t handle = disp_disable();
537     errval_t err = check_for_event_disabled(ws, handle);
538     if (err_is_fail(err)) {
539         disp_enable(handle);
540         return err;
541     }
542     err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
543                                             false);
544     if (err_is_fail(err))
545         return err;
546     disp_enable(handle);
547     assert(closure.handler != NULL);
548     closure.handler(closure.arg);
549     return SYS_ERR_OK;
550 }
551
552
553 /**
554  * \privatesection
555  * "Private" functions that are called only by the channel implementations
556  */
557
558 /**
559  * \brief Initialise per-channel waitset state
560  *
561  * \param chan Channel state
562  * \param chantype Channel type
563  */
564 void waitset_chanstate_init(struct waitset_chanstate *chan,
565                             enum ws_chantype chantype)
566 {
567     assert(chan != NULL);
568     chan->waitset = NULL;
569     chan->chantype = chantype;
570     chan->state = CHAN_UNREGISTERED;
571 #ifndef NDEBUG
572     chan->prev = chan->next = NULL;
573 #endif
574     chan->persistent = false;
575     chan->token = 0;
576     chan->wait_for = NULL;
577 }
578
579 /**
580  * \brief Destroy previously-initialised per-channel waitset state
581  * \param chan Channel state
582  */
583 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
584 {
585     assert(chan != NULL);
586     if (chan->waitset != NULL) {
587         errval_t err = waitset_chan_deregister(chan);
588         assert(err_is_ok(err)); // can't fail if registered
589     }
590 }
591
592 /**
593  * \brief Register a closure to be called when a channel is triggered
594  *
595  * In the Future, call the closure on a thread associated with the waitset
596  * when the channel is triggered. Only one closure may be registered per
597  * channel state at any one time.
598  * This function must only be called when disabled.
599  *
600  * \param ws Waitset
601  * \param chan Waitset's per-channel state
602  * \param closure Event handler
603  */
604 errval_t waitset_chan_register_disabled(struct waitset *ws,
605                                         struct waitset_chanstate *chan,
606                                         struct event_closure closure)
607 {
608     if (chan->waitset != NULL) {
609         return LIB_ERR_CHAN_ALREADY_REGISTERED;
610     }
611
612     chan->waitset = ws;
613     chan->token = 0;
614
615     // channel must not already be registered!
616     assert_disabled(chan->next == NULL && chan->prev == NULL);
617     assert_disabled(chan->state == CHAN_UNREGISTERED);
618
619     // this is probably insane! :)
620     assert_disabled(closure.handler != NULL);
621
622     // store closure
623     chan->closure = closure;
624
625     // enqueue this channel on the waitset's queue of idle channels
626     enqueue(&ws->idle, chan);
627     chan->state = CHAN_IDLE;
628
629     return SYS_ERR_OK;
630 }
631
632 /**
633  * \brief Register a closure on a channel, and mark the channel as polled
634  *
635  * In the Future, call the closure on a thread associated with the waitset
636  * when the channel is triggered. Only one closure may be registered per
637  * channel state at any one time. Additionally, mark the channel as polled.
638  * This function must only be called when disabled.
639  *
640  * \param ws Waitset
641  * \param chan Waitset's per-channel state
642  * \param closure Event handler
643  * \param disp Current dispatcher pointer
644  */
645 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
646                                                struct waitset_chanstate *chan,
647                                                struct event_closure closure,
648                                                dispatcher_handle_t handle)
649 {
650     if (chan->waitset != NULL) {
651         return LIB_ERR_CHAN_ALREADY_REGISTERED;
652     }
653
654     chan->waitset = ws;
655     chan->token = 0;
656
657     // channel must not already be registered!
658     assert_disabled(chan->next == NULL && chan->prev == NULL);
659     assert_disabled(chan->state == CHAN_UNREGISTERED);
660
661     // store closure
662     chan->closure = closure;
663
664     // enqueue this channel on the waitset's queue of polled channels
665     enqueue(&ws->polled, chan);
666     chan->state = CHAN_POLLED;
667     enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
668
669     return SYS_ERR_OK;
670 }
671
672 /**
673  * \brief Register a closure to be called when a channel is triggered
674  *
675  * In the Future, call the closure on a thread associated with the waitset
676  * when the channel is triggered. Only one closure may be registered per
677  * channel state at any one time.
678  * This function must only be called when enabled.
679  *
680  * \param ws Waitset
681  * \param chan Waitset's per-channel state
682  * \param closure Event handler
683  */
684 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
685                                struct event_closure closure)
686 {
687     dispatcher_handle_t handle = disp_disable();
688     errval_t err = waitset_chan_register_disabled(ws, chan, closure);
689     disp_enable(handle);
690     return err;
691 }
692
693 /**
694  * \brief Register a closure on a channel, and mark the channel as polled
695  *
696  * In the Future, call the closure on a thread associated with the waitset
697  * when the channel is triggered. Only one closure may be registered per
698  * channel state at any one time. Additionally, mark the channel as polled.
699  * This function must only be called when enabled. It is equivalent to
700  * calling waitset_chan_register() followed by waitset_chan_start_polling().
701  *
702  * \param ws Waitset
703  * \param chan Waitset's per-channel state
704  * \param closure Event handler
705  */
706 errval_t waitset_chan_register_polled(struct waitset *ws,
707                                       struct waitset_chanstate *chan,
708                                       struct event_closure closure)
709 {
710     dispatcher_handle_t handle = disp_disable();
711     errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
712     disp_enable(handle);
713     return err;
714 }
715
716 /**
717  * \brief Cancel a previous callback registration
718  *
719  * Remove the registration for a callback on the given channel.
720  * This function must only be called when disabled.
721  *
722  * \param chan Waitset's per-channel state
723  */
724 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
725                                           dispatcher_handle_t handle)
726 {
727     assert_disabled(chan != NULL);
728     struct waitset *ws = chan->waitset;
729     if (ws == NULL) {
730         return LIB_ERR_CHAN_NOT_REGISTERED;
731     }
732
733     // remove this channel from the queue in which it is waiting
734     chan->waitset = NULL;
735     assert_disabled(chan->next != NULL && chan->prev != NULL);
736
737     switch (chan->state) {
738     case CHAN_IDLE:
739         dequeue(&ws->idle, chan);
740         break;
741
742     case CHAN_POLLED:
743         dequeue(&ws->polled, chan);
744         dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
745         break;
746
747     case CHAN_PENDING:
748         dequeue(&ws->pending, chan);
749         break;
750
751     case CHAN_WAITING:
752         dequeue(&ws->waiting, chan);
753         break;
754
755     default:
756         assert_disabled(!"invalid channel state in deregister");
757     }
758     chan->state = CHAN_UNREGISTERED;
759     chan->wait_for = NULL;
760     return SYS_ERR_OK;
761 }
762
763 /**
764  * \brief Cancel a previous callback registration
765  *
766  * Remove the registration for a callback on the given channel.
767  * This function must only be called when enabled.
768  *
769  * \param chan Waitset's per-channel state
770  */
771 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
772 {
773     dispatcher_handle_t handle = disp_disable();
774     errval_t err = waitset_chan_deregister_disabled(chan, handle);
775     disp_enable(handle);
776     return err;
777 }
778
779 /**
780  * \brief Migrate callback registrations to a new waitset.
781  *
782  * \param chan Old waitset's per-channel state to migrate
783  * \param new_ws New waitset to migrate to
784  */
785 void waitset_chan_migrate(struct waitset_chanstate *chan,
786                           struct waitset *new_ws)
787 {
788     struct waitset *ws = chan->waitset;
789
790     // Only when registered
791     if(ws == NULL) {
792         return;
793     }
794
795     switch(chan->state) {
796     case CHAN_IDLE:
797         dequeue(&ws->idle, chan);
798         enqueue(&new_ws->idle, chan);
799         break;
800
801     case CHAN_POLLED:
802         dequeue(&ws->polled, chan);
803         enqueue(&new_ws->polled, chan);
804         break;
805
806     case CHAN_PENDING:
807         dequeue(&ws->pending, chan);
808         enqueue(&new_ws->pending, chan);
809         break;
810
811     case CHAN_WAITING:
812         dequeue(&ws->waiting, chan);
813         enqueue(&new_ws->waiting, chan);
814         break;
815
816     case CHAN_UNREGISTERED:
817         // Do nothing
818         break;
819     }
820
821     // Remember new waitset association
822     chan->waitset = new_ws;
823 }
824
825 /**
826  * \brief Trigger an event callback on a channel
827  *
828  * Marks the given channel as having a pending event, causing some future call
829  * to get_next_event() to return the registered closure.
830  * This function must only be called when disabled.
831  *
832  * \param chan Waitset's per-channel state
833  * \param disp Current dispatcher pointer
834  */
835 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
836                                        dispatcher_handle_t handle)
837 {
838     assert_disabled(chan != NULL);
839     struct waitset *ws = chan->waitset;
840     assert_disabled(ws != NULL);
841     assert_disabled(chan->prev != NULL && chan->next != NULL);
842
843     // no-op if already pending
844     if (chan->state == CHAN_PENDING) {
845         return SYS_ERR_OK;
846     }
847
848     // remove from previous queue (either idle or polled)
849     if (chan->state == CHAN_IDLE) {
850         dequeue(&ws->idle, chan);
851     } else {
852         assert_disabled(chan->state == CHAN_POLLED);
853         dequeue(&ws->polled, chan);
854         dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
855     }
856
857     // else mark channel pending and move to end of pending event queue
858     enqueue(&ws->pending, chan);
859     chan->state = CHAN_PENDING;
860
861     // is there a thread blocked on this waitset? if so, awaken it with the event
862     struct thread *thread = find_recipient(ws, chan, thread_self());
863     if (thread) {
864         struct thread *t;
865         ws->waiting_threads = thread;
866         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
867         assert_disabled(t == NULL);
868     }
869     return SYS_ERR_OK;
870 }
871
872 /**
873  * \brief Trigger an event callback on a channel
874  *
875  * Marks the given channel as having a pending event, causing some future call
876  * to get_next_event() to return the registered closure.
877  * This function must only be called when enabled.
878  *
879  * \param chan Waitset's per-channel state
880  * \param disp Current dispatcher pointer
881  */
882 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
883 {
884     dispatcher_handle_t handle = disp_disable();
885     errval_t err = waitset_chan_trigger_disabled(chan, handle);
886     disp_enable(handle);
887     return err;
888 }
889
890 /**
891  * \brief Trigger a specific event callback on an unregistered channel
892  *
893  * This function is equivalent to waitset_chan_register_disabled() immediately
894  * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
895  * manipulation. This function must only be called when disabled.
896  *
897  * \param ws Waitset
898  * \param chan Waitset's per-channel state
899  * \param closure Event handler
900  * \param disp Current dispatcher pointer
901  */
902 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
903                                                struct waitset_chanstate *chan,
904                                                struct event_closure closure,
905                                                dispatcher_handle_t handle)
906 {
907     assert_disabled(chan != NULL);
908     assert_disabled(ws != NULL);
909
910     // check if already registered
911     if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
912         return LIB_ERR_CHAN_ALREADY_REGISTERED;
913     }
914
915     assert_disabled(chan->prev == NULL && chan->next == NULL);
916
917     // set closure
918     chan->closure = closure;
919
920     // mark channel pending and place on end of pending event queue
921     chan->waitset = ws;
922     enqueue(&ws->pending, chan);
923     // if (first)
924     //     ws->pending = chan;
925     chan->state = CHAN_PENDING;
926
927     // is there a thread blocked on this waitset? if so, awaken it with the event
928     struct thread *thread = find_recipient(ws, chan, thread_self());
929     if (thread) {
930         struct thread *t;
931         ws->waiting_threads = thread;
932         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
933         assert_disabled(t == NULL);
934     }
935     return SYS_ERR_OK;
936 }
937
938
939 /**
940  * \brief Trigger a specific event callback on an unregistered channel
941  *
942  * This function is equivalent to waitset_chan_register()
943  * followed by waitset_chan_trigger(), but avoids unneccessary queue
944  * manipulation. This function must only be called when enabled.
945  *
946  * \param ws Waitset
947  * \param chan Waitset's per-channel state
948  * \param closure Event handler
949  */
950 errval_t waitset_chan_trigger_closure(struct waitset *ws,
951                                       struct waitset_chanstate *chan,
952                                       struct event_closure closure)
953 {
954     dispatcher_handle_t disp = disp_disable();
955     errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);
956     disp_enable(disp);
957     return err;
958 }