8c772a09c349af51826f6df25571a159721877c7
[barrelfish] / lib / barrelfish / waitset.c
1 /**
2  * \file
3  * \brief Waitset and low-level event handling mechanism
4  *
5  * A "wait set" is a collection of channels to wait on, much like an
6  * FDSET in POSIX. There should be a default, static wait set for each
7  * dispatcher. Threads which wait for events specify the wait set they
8  * are waiting on.
9  */
10
11 /*
12  * Copyright (c) 2009-2012, ETH Zurich.
13  * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14  * All rights reserved.
15  *
16  * This file is distributed under the terms in the attached LICENSE file.
17  * If you do not find this file, copies can be found by writing to:
18  * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
19  */
20
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
28 #include <stdio.h>
29 #include <string.h>
30
31 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
32 #  include <barrelfish/ump_endpoint.h>
33 #endif
34
35 #if defined(__k1om__) || defined(__aarch64__)
36 #include <barrelfish_kpi/asm_inlines_arch.h>
37 static inline cycles_t cyclecount(void)
38 {
39     return rdtsc();
40 }
41 #elif defined(__x86_64__) || defined(__i386__)
42 #include <arch/x86/barrelfish_kpi/asm_inlines_arch.h>
43 static inline cycles_t cyclecount(void)
44 {
45     return rdtsc();
46 }
47 #elif defined(__arm__) && defined(__gem5__)
48 /**
49  * XXX: Gem5 doesn't support the ARM performance monitor extension
50  * therefore we just poll a fixed number of times instead of using
51  * cycle counts. POLL_COUNT is deliberately set to 42, guess why! ;)
52  */
53 #define POLL_COUNT      42
54 #elif defined(__aarch64__) && defined(__gem5__)
55 #define POLL_COUNT  42
56 #elif defined(__arm__)
57 #include <arch/arm/barrelfish_kpi/asm_inlines_arch.h>
58 static inline cycles_t cyclecount(void)
59 {
60     return get_cycle_count();
61 }
62 #else
63 static inline cycles_t cyclecount(void)
64 {
65     USER_PANIC("called on non-x86 architecture. why are we polling?");
66     return 0;
67 }
68 #endif
69
70 // FIXME: bogus default value. need to measure this at boot time
71 #define WAITSET_POLL_CYCLES_DEFAULT 2000
72
73 /// Maximum number of cycles to spend polling channels before yielding CPU
74 cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
75
76 /**
77  * \brief Initialise a new waitset
78  */
79 void waitset_init(struct waitset *ws)
80 {
81     assert(ws != NULL);
82     ws->pending = ws->polled = ws->idle = NULL;
83     ws->waiting_threads = NULL;
84     ws->polling = false;
85 }
86
87 /**
88  * \brief Destroy a previously initialised waitset
89  */
90 errval_t waitset_destroy(struct waitset *ws)
91 {
92     assert(ws != NULL);
93
94     // FIXME: do we want to support cancelling all the pending events/channels?
95     if (ws->pending || ws->waiting_threads) {
96         return LIB_ERR_WAITSET_IN_USE;
97     }
98
99     // remove idle and polled channels from waitset
100     struct waitset_chanstate *chan, *next;
101     for (chan = ws->idle; chan != NULL; chan = next) {
102         next = chan->next;
103         assert(chan->state == CHAN_IDLE);
104         assert(chan->waitset == ws);
105         chan->waitset = NULL;
106         chan->next = chan->prev = NULL;
107
108         if (next == ws->idle) {
109             break;
110         }
111     }
112     ws->idle = NULL;
113
114     for (chan = ws->polled; chan != NULL; chan = next) {
115         next = chan->next;
116         assert(chan->state == CHAN_POLLED);
117         assert(chan->waitset == ws);
118         chan->waitset = NULL;
119         chan->next = chan->prev = NULL;
120
121         if (next == ws->polled) {
122             break;
123         }
124     }
125     ws->polled = NULL;
126
127     return SYS_ERR_OK;
128 }
129
130 /// Returns a channel with a pending event on the given waitset, or NULL
131 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws)
132 {
133     // are there any pending events on the waitset?
134     if (ws->pending == NULL) {
135         return NULL;
136     }
137
138     // dequeue next pending event
139     struct waitset_chanstate *chan = ws->pending;
140     if (chan->next == chan) {
141         assert_disabled(chan->prev == chan);
142         ws->pending = NULL;
143     } else {
144         ws->pending = chan->next;
145         chan->prev->next = chan->next;
146         chan->next->prev = chan->prev;
147     }
148 #ifndef NDEBUG
149     chan->prev = chan->next = NULL;
150 #endif
151
152     // mark not pending
153     assert_disabled(chan->state == CHAN_PENDING);
154     chan->state = CHAN_UNREGISTERED;
155     chan->waitset = NULL;
156
157     return chan;
158 }
159
160 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
161 /**
162  * \brief Poll an incoming UMP endpoint.
163  * This is logically part of the UMP endpoint implementation, but placed here
164  * for easier inlining.
165  */
166 static inline void ump_endpoint_poll(struct waitset_chanstate *chan)
167 {
168     /* XXX: calculate location of endpoint from waitset channel state */
169     struct ump_endpoint *ep = (struct ump_endpoint *)
170         ((char *)chan - offsetof(struct ump_endpoint, waitset_state));
171
172     if (ump_endpoint_can_recv(ep)) {
173         errval_t err = waitset_chan_trigger(chan);
174         assert(err_is_ok(err)); // should not be able to fail
175     }
176 }
177 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
178
179
180 void arranet_polling_loop_proxy(void) __attribute__((weak));
181 void arranet_polling_loop_proxy(void)
182 {
183     USER_PANIC("Network polling not available without Arranet!\n");
184 }
185
186 void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
187 void poll_ahci(struct waitset_chanstate *chan)
188 {
189     errval_t err = waitset_chan_trigger(chan);
190     assert(err_is_ok(err)); // should not be able to fail
191 }
192
193 /// Helper function that knows how to poll the given channel, based on its type
194 static void poll_channel(struct waitset_chanstate *chan)
195 {
196     switch (chan->chantype) {
197 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
198     case CHANTYPE_UMP_IN:
199         ump_endpoint_poll(chan);
200         break;
201 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
202
203     case CHANTYPE_LWIP_SOCKET:
204         arranet_polling_loop_proxy();
205         break;
206
207     case CHANTYPE_AHCI:
208         poll_ahci(chan);
209         break;
210
211     default:
212         assert(!"invalid channel type to poll!");
213     }
214 }
215
216 // pollcycles_*: arch-specific implementation for polling.
217 //               Used by get_next_event().
218 //
219 //   pollcycles_reset()  -- return the number of pollcycles we want to poll for
220 //   pollcycles_update() -- update the pollcycles variable. This is needed for
221 //                          implementations where we don't have a cycle counter
222 //                          and we just count the number of polling operations
223 //                          performed
224 //   pollcycles_expired() -- check if pollcycles have expired
225 //
226 // We might want to move them to architecture-specific files, and/or create a
227 // cleaner interface. For now, I just wanted to keep them out of
228 // get_next_event()
229
230 #if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
231         && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
232 static __attribute__((noinline, unused))
233 #else
234 static inline
235 #endif
236 cycles_t pollcycles_reset(void)
237 {
238     cycles_t pollcycles;
239 #if defined(__arm__) && !defined(__gem5__)
240     reset_cycle_counter();
241     pollcycles = waitset_poll_cycles;
242 #elif defined(__arm__) && defined(__gem5__)
243     pollcycles = 0;
244 #elif defined(__aarch64__) && defined(__gem5__)
245     pollcycles = 0;
246 #else
247     pollcycles = cyclecount() + waitset_poll_cycles;
248 #endif
249     return pollcycles;
250 }
251
252 #if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
253         && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
254 static __attribute__((noinline, unused))
255 #else
256 static inline
257 #endif
258 cycles_t pollcycles_update(cycles_t pollcycles)
259 {
260     cycles_t ret = pollcycles;
261     #if defined(__arm__) && defined(__gem5__)
262     ret++;
263         #elif defined(__aarch64__) && defined(__gem5__)
264         ret++;
265     #endif
266     return ret;
267 }
268
269 #if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
270         && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
271 static __attribute__((noinline, unused))
272 #else
273 static inline
274 #endif
275 bool pollcycles_expired(cycles_t pollcycles)
276 {
277     bool ret;
278     #if defined(__arm__) && !defined(__gem5__)
279     ret = (cyclecount() > pollcycles || is_cycle_counter_overflow());
280     #elif defined(__arm__) && defined(__gem5__)
281     ret = pollcycles >= POLL_COUNT;
282     #elif defined(__aarch64__) && defined(__gem5__)
283     ret = pollcycles >= POLL_COUNT;
284     #else
285     ret = cyclecount() > pollcycles;
286     #endif
287     return ret;
288 }
289
290 static errval_t get_next_event_debug(struct waitset *ws,
291         struct event_closure *retclosure, bool debug)
292 {
293     struct waitset_chanstate *chan;
294     bool was_polling = false;
295     cycles_t pollcycles;
296
297     assert(ws != NULL);
298     assert(retclosure != NULL);
299
300     // unconditionally disable ourselves and check for events
301     // if we decide we have to start polling, we'll jump back up here
302     goto check_for_events;
303
304     /* ------------ POLLING LOOP; RUNS WHILE ENABLED ------------ */
305 polling_loop:
306     was_polling = true;
307     assert(ws->polling); // this thread is polling
308     // get the amount of cycles we want to poll for
309     pollcycles = pollcycles_reset();
310
311     // while there are no pending events, poll channels
312     while (ws->polled != NULL && ws->pending == NULL) {
313         struct waitset_chanstate *nextchan = NULL;
314         // NB: Polling policy is to return as soon as a pending event
315         // appears, not bother looking at the rest of the polling queue
316         for (chan = ws->polled;
317              chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED
318                  && ws->pending == NULL;
319              chan = nextchan) {
320
321             nextchan = chan->next;
322             poll_channel(chan);
323             // update pollcycles
324             pollcycles = pollcycles_update(pollcycles);
325             // yield the thread if we exceed the cycle count limit
326             if (ws->pending == NULL && pollcycles_expired(pollcycles)) {
327                 if (debug) {
328                 if (strcmp(disp_name(), "netd") != 0) {
329                     // Print the callback trace so that we know which call is leading
330                     // the schedule removal and
331                     printf("%s: callstack: %p %p %p %p\n", disp_name(),
332                             __builtin_return_address(0),
333                             __builtin_return_address(1),
334                             __builtin_return_address(2),
335                             __builtin_return_address(3));
336                 }
337
338                 }
339                 thread_yield();
340                 pollcycles = pollcycles_reset();
341             }
342         }
343
344         // ensure that we restart polling from the place we left off here,
345         // if the next channel is a valid one
346         if (nextchan != NULL && nextchan->waitset == ws
347             && nextchan->state == CHAN_POLLED) {
348             ws->polled = nextchan;
349         }
350     }
351
352     /* ------------ STATE MACHINERY; RUNS WHILE DISABLED ------------ */
353 check_for_events: ;
354     dispatcher_handle_t handle = disp_disable();
355
356     // are there any pending events on the waitset?
357     chan = get_pending_event_disabled(ws);
358     if (chan != NULL) {
359         // if we need to poll, and we have a blocked thread, wake it up to do so
360         if (was_polling && ws->polled != NULL && ws->waiting_threads != NULL) {
361             // start a blocked thread polling
362             struct thread *t;
363             t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
364             assert_disabled(t == NULL); // shouldn't see a remote thread
365         } else if (was_polling) {
366             // I'm stopping polling, and there is nobody else
367             assert_disabled(ws->polling);
368             ws->polling = false;
369         }
370         disp_enable(handle);
371
372         *retclosure = chan->closure;
373         return SYS_ERR_OK;
374     }
375
376     // If we got here and there are channels to poll but no-one is polling,
377     // then either we never polled, or we lost a race on the channel we picked.
378     // Either way, we'd better start polling again.
379     if (ws->polled != NULL && (was_polling || !ws->polling)) {
380         if (!was_polling) {
381             ws->polling = true;
382         }
383         disp_enable(handle);
384         goto polling_loop;
385     }
386
387     // otherwise block awaiting an event
388     chan = thread_block_disabled(handle, &ws->waiting_threads);
389
390     if (chan == NULL) {
391         // not a real event, just a wakeup to get us to start polling!
392         assert(ws->polling);
393         goto polling_loop;
394     } else {
395         *retclosure = chan->closure;
396         return SYS_ERR_OK;
397     }
398 }
399
400 /**
401  * \brief Wait for (block) and return next event on given waitset
402  *
403  * Wait until something happens, either activity on some channel, or a deferred
404  * call, and then return the corresponding closure. This is the core of the
405  * event-handling system.
406  *
407  * \param ws Waitset
408  * \param retclosure Pointer to storage space for returned event closure
409  */
410 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
411 {
412     return get_next_event_debug(ws, retclosure, false);
413 }
414
415
416
417 /**
418  * \brief Return next event on given waitset, if one is already pending
419  *
420  * This is essentially a non-blocking variant of get_next_event(). It should be
421  * used with great care, to avoid the creation of busy-waiting loops.
422  *
423  * \param ws Waitset
424  * \param retclosure Pointer to storage space for returned event closure
425  *
426  * \returns LIB_ERR_NO_EVENT if nothing is pending
427  */
428 errval_t check_for_event(struct waitset *ws, struct event_closure *retclosure)
429 {
430     struct waitset_chanstate *chan;
431     int pollcount = 0;
432
433     assert(ws != NULL);
434     assert(retclosure != NULL);
435
436  recheck: ;
437     // are there any pending events on the waitset?
438     dispatcher_handle_t handle = disp_disable();
439     chan = get_pending_event_disabled(ws);
440     disp_enable(handle);
441     if (chan != NULL) {
442         *retclosure = chan->closure;
443         return SYS_ERR_OK;
444     }
445
446     // if there are no pending events, poll all channels once
447     if (ws->polled != NULL && pollcount++ == 0) {
448         for (chan = ws->polled;
449              chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED;
450              chan = chan->next) {
451
452             poll_channel(chan);
453             if (ws->pending != NULL) {
454                 goto recheck;
455             }
456
457             if (chan->next == ws->polled) { // reached the start of the queue
458                 break;
459             }
460         }
461     }
462
463     return LIB_ERR_NO_EVENT;
464 }
465
466 /**
467  * \brief Wait for (block) and dispatch next event on given waitset
468  *
469  * Wait until something happens, either activity on some channel, or deferred
470  * call, and then call the corresponding closure.
471  *
472  * \param ws Waitset
473  */
474
475 errval_t event_dispatch(struct waitset *ws)
476 {
477     struct event_closure closure;
478     errval_t err = get_next_event(ws, &closure);
479     if (err_is_fail(err)) {
480         return err;
481     }
482
483     assert(closure.handler != NULL);
484     closure.handler(closure.arg);
485     return SYS_ERR_OK;
486 }
487
488 errval_t event_dispatch_debug(struct waitset *ws)
489 {
490     struct event_closure closure;
491     errval_t err = get_next_event_debug(ws, &closure, false);
492     if (err_is_fail(err)) {
493         return err;
494     }
495
496     assert(closure.handler != NULL);
497 //    printf("%s: event_dispatch: %p: \n", disp_name(), closure.handler);
498
499
500     closure.handler(closure.arg);
501     return SYS_ERR_OK;
502 }
503
504 /**
505  * \brief check and dispatch next event on given waitset
506  *
507  * Check if there is any pending activity on some channel, or deferred
508  * call, and then call the corresponding closure.
509  *
510  * Do not wait!  In case of no pending events, return err LIB_ERR_NO_EVENT.
511  *
512  * \param ws Waitset
513  */
514 errval_t event_dispatch_non_block(struct waitset *ws)
515 {
516     assert(ws != NULL);
517     struct event_closure closure;
518     errval_t err = check_for_event(ws, &closure);
519
520     if (err_is_fail(err)) {
521         return err;
522     }
523
524     assert(closure.handler != NULL);
525     closure.handler(closure.arg);
526     return SYS_ERR_OK;
527 }
528
529
530 /**
531  * \privatesection
532  * "Private" functions that are called only by the channel implementations
533  */
534
535 /**
536  * \brief Initialise per-channel waitset state
537  *
538  * \param chan Channel state
539  * \param chantype Channel type
540  */
541 void waitset_chanstate_init(struct waitset_chanstate *chan,
542                             enum ws_chantype chantype)
543 {
544     assert(chan != NULL);
545     chan->waitset = NULL;
546     chan->chantype = chantype;
547     chan->state = CHAN_UNREGISTERED;
548 #ifndef NDEBUG
549     chan->prev = chan->next = NULL;
550 #endif
551 }
552
553 /**
554  * \brief Destroy previously-initialised per-channel waitset state
555  * \param chan Channel state
556  */
557 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
558 {
559     assert(chan != NULL);
560     if (chan->waitset != NULL) {
561         errval_t err = waitset_chan_deregister(chan);
562         assert(err_is_ok(err)); // can't fail if registered
563     }
564 }
565
566 /**
567  * \brief Register a closure to be called when a channel is triggered
568  *
569  * In the Future, call the closure on a thread associated with the waitset
570  * when the channel is triggered. Only one closure may be registered per
571  * channel state at any one time.
572  * This function must only be called when disabled.
573  *
574  * \param ws Waitset
575  * \param chan Waitset's per-channel state
576  * \param closure Event handler
577  */
578 errval_t waitset_chan_register_disabled(struct waitset *ws,
579                                         struct waitset_chanstate *chan,
580                                         struct event_closure closure)
581 {
582     if (chan->waitset != NULL) {
583         return LIB_ERR_CHAN_ALREADY_REGISTERED;
584     }
585
586     chan->waitset = ws;
587
588     // channel must not already be registered!
589     assert_disabled(chan->next == NULL && chan->prev == NULL);
590     assert_disabled(chan->state == CHAN_UNREGISTERED);
591
592     // this is probably insane! :)
593     assert_disabled(closure.handler != NULL);
594
595     // store closure
596     chan->closure = closure;
597
598     // enqueue this channel on the waitset's queue of idle channels
599     if (ws->idle == NULL) {
600         chan->next = chan->prev = chan;
601         ws->idle = chan;
602     } else {
603         chan->next = ws->idle;
604         chan->prev = chan->next->prev;
605         chan->next->prev = chan;
606         chan->prev->next = chan;
607     }
608     chan->state = CHAN_IDLE;
609
610     return SYS_ERR_OK;
611 }
612
613 /**
614  * \brief Register a closure on a channel, and mark the channel as polled
615  *
616  * In the Future, call the closure on a thread associated with the waitset
617  * when the channel is triggered. Only one closure may be registered per
618  * channel state at any one time. Additionally, mark the channel as polled.
619  * This function must only be called when disabled.
620  *
621  * \param ws Waitset
622  * \param chan Waitset's per-channel state
623  * \param closure Event handler
624  * \param disp Current dispatcher pointer
625  */
626 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
627                                                struct waitset_chanstate *chan,
628                                                struct event_closure closure,
629                                                dispatcher_handle_t handle)
630 {
631     if (chan->waitset != NULL) {
632         return LIB_ERR_CHAN_ALREADY_REGISTERED;
633     }
634
635     chan->waitset = ws;
636
637     // channel must not already be registered!
638     assert_disabled(chan->next == NULL && chan->prev == NULL);
639     assert_disabled(chan->state == CHAN_UNREGISTERED);
640
641     // store closure
642     chan->closure = closure;
643
644     // enqueue this channel on the waitset's queue of polled channels
645     if (ws->polled == NULL) {
646         chan->next = chan->prev = chan;
647         ws->polled = chan;
648         if (ws->waiting_threads != NULL && !ws->polling) {
649             // start a blocked thread polling
650             ws->polling = true;
651             struct thread *t;
652             t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
653             assert_disabled(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
654         }
655     } else {
656         chan->next = ws->polled;
657         chan->prev = chan->next->prev;
658         chan->next->prev = chan;
659         chan->prev->next = chan;
660     }
661     chan->state = CHAN_POLLED;
662
663     return SYS_ERR_OK;
664 }
665
666 /**
667  * \brief Register a closure to be called when a channel is triggered
668  *
669  * In the Future, call the closure on a thread associated with the waitset
670  * when the channel is triggered. Only one closure may be registered per
671  * channel state at any one time.
672  * This function must only be called when enabled.
673  *
674  * \param ws Waitset
675  * \param chan Waitset's per-channel state
676  * \param closure Event handler
677  */
678 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
679                                struct event_closure closure)
680 {
681     dispatcher_handle_t handle = disp_disable();
682     errval_t err = waitset_chan_register_disabled(ws, chan, closure);
683     disp_enable(handle);
684     return err;
685 }
686
687 /**
688  * \brief Register a closure on a channel, and mark the channel as polled
689  *
690  * In the Future, call the closure on a thread associated with the waitset
691  * when the channel is triggered. Only one closure may be registered per
692  * channel state at any one time. Additionally, mark the channel as polled.
693  * This function must only be called when enabled. It is equivalent to
694  * calling waitset_chan_register() followed by waitset_chan_start_polling().
695  *
696  * \param ws Waitset
697  * \param chan Waitset's per-channel state
698  * \param closure Event handler
699  */
700 errval_t waitset_chan_register_polled(struct waitset *ws,
701                                       struct waitset_chanstate *chan,
702                                       struct event_closure closure)
703 {
704     dispatcher_handle_t handle = disp_disable();
705     errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
706     disp_enable(handle);
707     return err;
708 }
709
710 /**
711  * \brief Mark an idle channel as polled
712  *
713  * The given channel will periodically have its poll function called.
714  * The channel must already be registered.
715  *
716  * \param chan Waitset's per-channel state
717  */
718 errval_t waitset_chan_start_polling(struct waitset_chanstate *chan)
719 {
720     errval_t err = SYS_ERR_OK;
721
722     dispatcher_handle_t handle = disp_disable();
723
724     struct waitset *ws = chan->waitset;
725     if (ws == NULL) {
726         err = LIB_ERR_CHAN_NOT_REGISTERED;
727         goto out;
728     }
729
730     assert(chan->state != CHAN_UNREGISTERED);
731     if (chan->state != CHAN_IDLE) {
732         goto out; // no-op if polled or pending
733     }
734
735     // remove from idle queue
736     if (chan->next == chan) {
737         assert(chan->prev == chan);
738         assert(ws->idle == chan);
739         ws->idle = NULL;
740     } else {
741         chan->prev->next = chan->next;
742         chan->next->prev = chan->prev;
743         if (ws->idle == chan) {
744             ws->idle = chan->next;
745         }
746     }
747
748     // enqueue on polled queue
749     if (ws->polled == NULL) {
750         ws->polled = chan;
751         chan->next = chan->prev = chan;
752         if (ws->waiting_threads != NULL && !ws->polling) {
753             // start a blocked thread polling
754             ws->polling = true;
755             struct thread *t;
756             t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
757             assert(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
758         }
759     } else {
760         chan->next = ws->polled;
761         chan->prev = ws->polled->prev;
762         chan->next->prev = chan;
763         chan->prev->next = chan;
764     }
765     chan->state = CHAN_POLLED;
766
767 out:
768     disp_enable(handle);
769     return err;
770 }
771
772 /**
773  * \brief Stop polling the given channel, making it idle again
774  *
775  * \param chan Waitset's per-channel state
776  */
777 errval_t waitset_chan_stop_polling(struct waitset_chanstate *chan)
778 {
779     errval_t err = SYS_ERR_OK;
780
781     dispatcher_handle_t handle = disp_disable();
782
783     struct waitset *ws = chan->waitset;
784     if (ws == NULL) {
785         err = LIB_ERR_CHAN_NOT_REGISTERED;
786         goto out;
787     }
788
789     assert(chan->state != CHAN_UNREGISTERED);
790     if (chan->state != CHAN_POLLED) {
791         goto out; // no-op if idle or pending
792     }
793
794     // remove from polled queue
795     if (chan->next == chan) {
796         assert(chan->prev == chan);
797         assert(ws->polled == chan);
798         ws->polled = NULL;
799     } else {
800         chan->prev->next = chan->next;
801         chan->next->prev = chan->prev;
802         if (ws->polled == chan) {
803             ws->polled = chan->next;
804         }
805     }
806
807     // enqueue on idle queue
808     if (ws->idle == NULL) {
809         ws->idle = chan;
810         chan->next = chan->prev = chan;
811     } else {
812         chan->next = ws->idle;
813         chan->prev = ws->idle->prev;
814         chan->next->prev = chan;
815         chan->prev->next = chan;
816     }
817     chan->state = CHAN_IDLE;
818
819 out:
820     disp_enable(handle);
821     return err;
822 }
823
824 /**
825  * \brief Cancel a previous callback registration
826  *
827  * Remove the registration for a callback on the given channel.
828  * This function must only be called when disabled.
829  *
830  * \param chan Waitset's per-channel state
831  */
832 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
833 {
834     assert_disabled(chan != NULL);
835     struct waitset *ws = chan->waitset;
836     if (ws == NULL) {
837         return LIB_ERR_CHAN_NOT_REGISTERED;
838     }
839
840     // remove this channel from the queue in which it is waiting
841     chan->waitset = NULL;
842     assert_disabled(chan->next != NULL && chan->prev != NULL);
843
844     if (chan->next == chan) {
845         // only thing in the list: must be the head
846         assert_disabled(chan->prev == chan);
847         switch (chan->state) {
848         case CHAN_IDLE:
849             assert_disabled(chan == ws->idle);
850             ws->idle = NULL;
851             break;
852
853         case CHAN_POLLED:
854             assert_disabled(chan == ws->polled);
855             ws->polled = NULL;
856             break;
857
858         case CHAN_PENDING:
859             assert_disabled(chan == ws->pending);
860             ws->pending = NULL;
861             break;
862
863         default:
864             assert_disabled(!"invalid channel state in deregister");
865         }
866     } else {
867         assert_disabled(chan->prev != chan);
868         chan->prev->next = chan->next;
869         chan->next->prev = chan->prev;
870         switch (chan->state) {
871         case CHAN_IDLE:
872             if (chan == ws->idle) {
873                 ws->idle = chan->next;
874             }
875             break;
876
877         case CHAN_POLLED:
878             if (chan == ws->polled) {
879                 ws->polled = chan->next;
880             }
881             break;
882
883         case CHAN_PENDING:
884             if (chan == ws->pending) {
885                 ws->pending = chan->next;
886             }
887             break;
888
889         default:
890             assert_disabled(!"invalid channel state in deregister");
891         }
892     }
893     chan->state = CHAN_UNREGISTERED;
894
895 #ifndef NDEBUG
896     chan->prev = chan->next = NULL;
897 #endif
898
899     return SYS_ERR_OK;
900 }
901
902 /**
903  * \brief Cancel a previous callback registration
904  *
905  * Remove the registration for a callback on the given channel.
906  * This function must only be called when enabled.
907  *
908  * \param chan Waitset's per-channel state
909  */
910 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
911 {
912     dispatcher_handle_t handle = disp_disable();
913     errval_t err = waitset_chan_deregister_disabled(chan);
914     disp_enable(handle);
915     return err;
916 }
917
918 /**
919  * \brief Migrate callback registrations to a new waitset.
920  *
921  * \param chan Old waitset's per-channel state to migrate
922  * \param new_ws New waitset to migrate to
923  */
924 void waitset_chan_migrate(struct waitset_chanstate *chan,
925                           struct waitset *new_ws)
926 {
927     struct waitset *ws = chan->waitset;
928
929     // Only when registered
930     if(ws == NULL) {
931         return;
932     }
933
934     switch(chan->state) {
935     case CHAN_IDLE:
936         if (chan->next == chan) {
937             assert(chan->prev == chan);
938             assert(ws->idle == chan);
939             ws->idle = NULL;
940         } else {
941             chan->prev->next = chan->next;
942             chan->next->prev = chan->prev;
943             if (ws->idle == chan) {
944                 ws->idle = chan->next;
945             }
946         }
947
948         if (new_ws->idle == NULL) {
949             new_ws->idle = chan;
950             chan->next = chan->prev = chan;
951         } else {
952             chan->next = new_ws->idle;
953             chan->prev = new_ws->idle->prev;
954             chan->next->prev = chan;
955             chan->prev->next = chan;
956         }
957         break;
958
959     case CHAN_POLLED:
960         if (chan->next == chan) {
961             assert(chan->prev == chan);
962             assert(ws->polled == chan);
963             ws->polled = NULL;
964         } else {
965             chan->prev->next = chan->next;
966             chan->next->prev = chan->prev;
967             if (ws->polled == chan) {
968                 ws->polled = chan->next;
969             }
970         }
971
972         if (new_ws->polled == NULL) {
973             new_ws->polled = chan;
974             chan->next = chan->prev = chan;
975         } else {
976             chan->next = new_ws->polled;
977             chan->prev = new_ws->polled->prev;
978             chan->next->prev = chan;
979             chan->prev->next = chan;
980         }
981         break;
982
983     case CHAN_PENDING:
984         if (chan->next == chan) {
985             assert(chan->prev == chan);
986             assert(ws->pending == chan);
987             ws->pending = NULL;
988         } else {
989             chan->prev->next = chan->next;
990             chan->next->prev = chan->prev;
991             if (ws->pending == chan) {
992                 ws->pending = chan->next;
993             }
994         }
995
996         if (new_ws->pending == NULL) {
997             new_ws->pending = chan;
998             chan->next = chan->prev = chan;
999         } else {
1000             chan->next = new_ws->pending;
1001             chan->prev = new_ws->pending->prev;
1002             chan->next->prev = chan;
1003             chan->prev->next = chan;
1004         }
1005         break;
1006
1007     case CHAN_UNREGISTERED:
1008         // Do nothing
1009         break;
1010     }
1011
1012     // Remember new waitset association
1013     chan->waitset = new_ws;
1014 }
1015
1016 /**
1017  * \brief Trigger an event callback on a channel
1018  *
1019  * Marks the given channel as having a pending event, causing some future call
1020  * to get_next_event() to return the registered closure.
1021  * This function must only be called when disabled.
1022  *
1023  * \param chan Waitset's per-channel state
1024  * \param disp Current dispatcher pointer
1025  */
1026 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
1027                                        dispatcher_handle_t handle)
1028 {
1029     assert_disabled(chan != NULL);
1030     struct waitset *ws = chan->waitset;
1031     assert_disabled(ws != NULL);
1032     assert_disabled(chan->prev != NULL && chan->next != NULL);
1033
1034     // no-op if already pending
1035     if (chan->state == CHAN_PENDING) {
1036         return SYS_ERR_OK;
1037     }
1038
1039     // remove from previous queue (either idle or polled)
1040     if (chan->next == chan) {
1041         assert_disabled(chan->prev == chan);
1042         if (chan->state == CHAN_IDLE) {
1043             assert_disabled(ws->idle == chan);
1044             ws->idle = NULL;
1045         } else {
1046             assert_disabled(chan->state == CHAN_POLLED);
1047             assert_disabled(ws->polled == chan);
1048             ws->polled = NULL;
1049         }
1050     } else {
1051         chan->prev->next = chan->next;
1052         chan->next->prev = chan->prev;
1053         if (chan->state == CHAN_IDLE) {
1054             if (ws->idle == chan) {
1055                 ws->idle = chan->next;
1056             }
1057         } else {
1058             assert_disabled(chan->state == CHAN_POLLED);
1059             if (ws->polled == chan) {
1060                 ws->polled = chan->next;
1061             }
1062         }
1063     }
1064
1065     // is there a thread blocked on this waitset? if so, awaken it with the event
1066     if (ws->waiting_threads != NULL) {
1067         chan->waitset = NULL;
1068 #ifndef NDEBUG
1069         chan->prev = chan->next = NULL;
1070 #endif
1071         chan->state = CHAN_UNREGISTERED;
1072         struct thread *t;
1073         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
1074         assert_disabled(t == NULL);
1075         return SYS_ERR_OK;
1076     }
1077
1078     // else mark channel pending and move to end of pending event queue
1079     chan->state = CHAN_PENDING;
1080     if (ws->pending == NULL) {
1081         ws->pending = chan;
1082         chan->next = chan->prev = chan;
1083     } else {
1084         chan->next = ws->pending;
1085         chan->prev = ws->pending->prev;
1086         assert_disabled(ws->pending->next != NULL);
1087         assert_disabled(ws->pending->prev != NULL);
1088         assert_disabled(chan->prev != NULL);
1089         chan->next->prev = chan;
1090         chan->prev->next = chan;
1091     }
1092
1093     return SYS_ERR_OK;
1094 }
1095
1096 /**
1097  * \brief Trigger an event callback on a channel
1098  *
1099  * Marks the given channel as having a pending event, causing some future call
1100  * to get_next_event() to return the registered closure.
1101  * This function must only be called when enabled.
1102  *
1103  * \param chan Waitset's per-channel state
1104  * \param disp Current dispatcher pointer
1105  */
1106 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
1107 {
1108     dispatcher_handle_t disp = disp_disable();
1109     errval_t err = waitset_chan_trigger_disabled(chan, disp);
1110     disp_enable(disp);
1111     return err;
1112 }
1113
1114 /**
1115  * \brief Trigger a specific event callback on an unregistered channel
1116  *
1117  * This function is equivalent to waitset_chan_register_disabled() immediately
1118  * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
1119  * manipulation. This function must only be called when disabled.
1120  *
1121  * \param ws Waitset
1122  * \param chan Waitset's per-channel state
1123  * \param closure Event handler
1124  * \param disp Current dispatcher pointer
1125  */
1126 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
1127                                                struct waitset_chanstate *chan,
1128                                                struct event_closure closure,
1129                                                dispatcher_handle_t handle)
1130 {
1131     assert_disabled(chan != NULL);
1132     assert_disabled(ws != NULL);
1133
1134     // check if already registered
1135     if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
1136         return LIB_ERR_CHAN_ALREADY_REGISTERED;
1137     }
1138
1139     assert_disabled(chan->prev == NULL && chan->next == NULL);
1140
1141     // set closure
1142     chan->closure = closure;
1143
1144     // is there a thread blocked on this waitset? if so, awaken it with the event
1145     if (ws->waiting_threads != NULL) {
1146         struct thread *t;
1147         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
1148         assert_disabled(t == NULL);
1149         return SYS_ERR_OK;
1150     }
1151
1152     // mark channel pending and place on end of pending event queue
1153     chan->waitset = ws;
1154     chan->state = CHAN_PENDING;
1155     if (ws->pending == NULL) {
1156         ws->pending = chan;
1157         chan->next = chan->prev = chan;
1158     } else {
1159         chan->next = ws->pending;
1160         chan->prev = ws->pending->prev;
1161         chan->next->prev = chan;
1162         chan->prev->next = chan;
1163     }
1164
1165     assert(ws->pending->prev != NULL && ws->pending->next != NULL);
1166
1167     return SYS_ERR_OK;
1168 }
1169
1170
1171 /**
1172  * \brief Trigger a specific event callback on an unregistered channel
1173  *
1174  * This function is equivalent to waitset_chan_register()
1175  * followed by waitset_chan_trigger(), but avoids unneccessary queue
1176  * manipulation. This function must only be called when enabled.
1177  *
1178  * \param ws Waitset
1179  * \param chan Waitset's per-channel state
1180  * \param closure Event handler
1181  */
1182 errval_t waitset_chan_trigger_closure(struct waitset *ws,
1183                                       struct waitset_chanstate *chan,
1184                                       struct event_closure closure)
1185 {
1186     dispatcher_handle_t disp = disp_disable();
1187     errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);
1188     disp_enable(disp);
1189     return err;
1190 }