Merge branch 'master' of ssh://code.systems.ethz.ch:8006/diffusion/BFI/barrelfish
[barrelfish] / lib / barrelfish / waitset.c
index 8c772a0..e13437e 100644 (file)
 #include <stdio.h>
 #include <string.h>
 
+#include <flounder/flounder.h>
+
 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
 #  include <barrelfish/ump_endpoint.h>
 #endif
 
-#if defined(__k1om__) || defined(__aarch64__)
-#include <barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+/// Unlink chan from the circular doubly-linked queue headed by *queue
+static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
 {
-    return rdtsc();
+    struct waitset_chanstate *before = chan->prev, *after = chan->next;
+    if (after != chan) { // other elements remain: splice chan out
+        before->next = after;
+        after->prev = before;
+        if (*queue == chan)
+            *queue = after; // head moves on to the successor
+    } else { // chan was the only element, so the queue becomes empty
+        assert(chan->prev == chan);
+        assert(*queue == chan);
+        *queue = NULL;
+    }
+    chan->prev = chan->next = NULL; // chan is no longer linked anywhere
 }
-#elif defined(__x86_64__) || defined(__i386__)
-#include <arch/x86/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Append chan to the circular doubly-linked queue headed by *queue
+static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
 {
-    return rdtsc();
+    struct waitset_chanstate *head = *queue;
+    if (head != NULL) { // link in just before the head, i.e. at the tail
+        chan->next = head;
+        chan->prev = head->prev;
+        head->prev->next = chan;
+        head->prev = chan;
+    } else { // empty queue: chan becomes a one-element ring and the head
+        *queue = chan->next = chan->prev = chan;
+    }
 }
-#elif defined(__arm__) && defined(__gem5__)
-/**
- * XXX: Gem5 doesn't support the ARM performance monitor extension
- * therefore we just poll a fixed number of times instead of using
- * cycle counts. POLL_COUNT is deliberately set to 42, guess why! ;)
- */
-#define POLL_COUNT     42
-#elif defined(__aarch64__) && defined(__gem5__)
-#define POLL_COUNT  42
-#elif defined(__arm__)
-#include <arch/arm/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Unlink chan from the circular polled list headed by *queue
+static void dequeue_polled(struct waitset_chanstate **queue,
+                            struct waitset_chanstate *chan)
 {
-    return get_cycle_count();
+    struct waitset_chanstate *before = chan->polled_prev, *after = chan->polled_next;
+    if (after != chan) { // other elements remain: splice chan out
+        before->polled_next = after;
+        after->polled_prev = before;
+        if (*queue == chan)
+            *queue = after; // head moves on to the successor
+    } else { // chan was the only element, so the list becomes empty
+        assert(chan->polled_prev == chan);
+        assert(*queue == chan);
+        *queue = NULL;
+    }
+    chan->polled_prev = chan->polled_next = NULL; // chan is on no polled list now
 }
-#else
-static inline cycles_t cyclecount(void)
+
+/// Append chan to the circular polled list headed by *queue
+static void enqueue_polled(struct waitset_chanstate **queue,
+                            struct waitset_chanstate *chan)
 {
-    USER_PANIC("called on non-x86 architecture. why are we polling?");
-    return 0;
+    struct waitset_chanstate *head = *queue;
+    if (head != NULL) { // link in just before the head, i.e. at the tail
+        chan->polled_next = head;
+        chan->polled_prev = head->polled_prev;
+        head->polled_prev->polled_next = chan;
+        head->polled_prev = chan;
+    } else { // empty list: chan becomes a one-element ring and the head
+        *queue = chan->polled_next = chan->polled_prev = chan;
+    }
 }
-#endif
-
-// FIXME: bogus default value. need to measure this at boot time
-#define WAITSET_POLL_CYCLES_DEFAULT 2000
-
-/// Maximum number of cycles to spend polling channels before yielding CPU
-cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
 
 /**
  * \brief Initialise a new waitset
@@ -79,9 +104,8 @@ cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
 void waitset_init(struct waitset *ws)
 {
     assert(ws != NULL);
-    ws->pending = ws->polled = ws->idle = NULL;
+    ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
     ws->waiting_threads = NULL;
-    ws->polling = false;
 }
 
 /**
@@ -127,55 +151,61 @@ errval_t waitset_destroy(struct waitset *ws)
     return SYS_ERR_OK;
 }
 
-/// Returns a channel with a pending event on the given waitset, or NULL
-static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws)
+/// Decide whether the given thread may receive the event on chan
+static bool waitset_check_token(struct waitset_chanstate *chan,
+                                struct thread *thread)
 {
-    // are there any pending events on the waitset?
-    if (ws->pending == NULL) {
-        return NULL;
-    }
-
-    // dequeue next pending event
-    struct waitset_chanstate *chan = ws->pending;
-    if (chan->next == chan) {
-        assert_disabled(chan->prev == chan);
-        ws->pending = NULL;
-    } else {
-        ws->pending = chan->next;
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-    }
-#ifndef NDEBUG
-    chan->prev = chan->next = NULL;
-#endif
-
-    // mark not pending
-    assert_disabled(chan->state == CHAN_PENDING);
-    chan->state = CHAN_UNREGISTERED;
-    chan->waitset = NULL;
+    struct thread *waiter = chan->wait_for; // thread pinned to this event, if any
 
-    return chan;
+    if (waiter != NULL)
+        return waiter == thread; // only the pinned thread may take it
+    // a request token (low bit set) goes to a thread not awaiting a token
+    if ((chan->token & 1) && !thread->token)
+        return true;
+    // an event without a token goes to any thread not bound to this channel
+    if (!chan->token && chan != thread->channel)
+        return true;
+    // otherwise the token and the channel must both match the thread's
+    return chan->token == thread->token && chan == thread->channel;
 }
 
-#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
-/**
- * \brief Poll an incoming UMP endpoint.
- * This is logically part of the UMP endpoint implementation, but placed here
- * for easier inlining.
- */
-static inline void ump_endpoint_poll(struct waitset_chanstate *chan)
+/// Find an event on the given waitset that the calling thread may receive;
+/// returns NULL if nothing matches. The event is not removed from its queue.
+static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
+                                    struct waitset_chanstate *chan)
 {
-    /* XXX: calculate location of endpoint from waitset channel state */
-    struct ump_endpoint *ep = (struct ump_endpoint *)
-        ((char *)chan - offsetof(struct ump_endpoint, waitset_state));
+    struct thread *self = thread_self();
 
-    if (ump_endpoint_can_recv(ep)) {
-        errval_t err = waitset_chan_trigger(chan);
-        assert(err_is_ok(err)); // should not be able to fail
+    // the channel the caller waits for is preferred if it is deliverable
+    if (chan != NULL
+        && (chan->state == CHAN_PENDING || chan->state == CHAN_WAITING)
+        && waitset_check_token(chan, self)) {
+        return chan;
+    }
+    // next, walk the circular waiting queue once
+    chan = ws->waiting;
+    while (chan != NULL) {
+        if (waitset_check_token(chan, self)) {
+            assert_disabled(chan->state == CHAN_WAITING);
+            return chan;
+        }
+        chan = chan->next;
+        if (chan == ws->waiting)
+            break;
+    }
+    // finally, walk the circular pending queue once
+    chan = ws->pending;
+    while (chan != NULL) {
+        if (waitset_check_token(chan, self)) {
+            assert_disabled(chan->state == CHAN_PENDING);
+            return chan;
+        }
+        chan = chan->next;
+        if (chan == ws->pending)
+            break;
     }
+    return NULL;
 }
-#endif // CONFIG_INTERCONNECT_DRIVER_UMP
-
 
 void arranet_polling_loop_proxy(void) __attribute__((weak));
 void arranet_polling_loop_proxy(void)
@@ -190,210 +220,166 @@ void poll_ahci(struct waitset_chanstate *chan)
     assert(err_is_ok(err)); // should not be able to fail
 }
 
-/// Helper function that knows how to poll the given channel, based on its type
-static void poll_channel(struct waitset_chanstate *chan)
-{
-    switch (chan->chantype) {
+/// Poll each channel on the dispatcher's polled list (dispatcher disabled)
+void poll_channels_disabled(dispatcher_handle_t handle) {
+    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
+    struct waitset_chanstate *chan = dp->polled_channels, *next;
+
+    if (!chan) // nothing registered for polling
+        return;
+    do {
+        next = chan->polled_next; // read first, in case the poll unlinks chan
+        switch (chan->chantype) {
 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
-    case CHANTYPE_UMP_IN:
-        ump_endpoint_poll(chan);
-        break;
+        case CHANTYPE_UMP_IN:
+            if (ump_endpoint_poll(chan)) {
+                errval_t err = waitset_chan_trigger_disabled(chan, handle);
+                assert(err_is_ok(err)); // should not fail
+                if (!dp->polled_channels) // the polled list is empty now
+                    return;
+                chan = dp->polled_channels;
+                continue; // loop test sees chan == head, so this pass ends
+            }
+            break;
 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
-
-    case CHANTYPE_LWIP_SOCKET:
-        arranet_polling_loop_proxy();
-        break;
-
-    case CHANTYPE_AHCI:
-        poll_ahci(chan);
-        break;
-
-    default:
-        assert(!"invalid channel type to poll!");
-    }
+        case CHANTYPE_LWIP_SOCKET:
+            arranet_polling_loop_proxy();
+            break;
+        case CHANTYPE_AHCI:
+            poll_ahci(chan);
+            break;
+        default:
+            assert(!"invalid channel type to poll!");
+        }
+        chan = next; // every non-trigger path advances, so the loop terminates
+    } while (dp->polled_channels && chan != dp->polled_channels);
 }
 
-// pollcycles_*: arch-specific implementation for polling.
-//               Used by get_next_event().
-//
-//   pollcycles_reset()  -- return the number of pollcycles we want to poll for
-//   pollcycles_update() -- update the pollcycles variable. This is needed for
-//                          implementations where we don't have a cycle counter
-//                          and we just count the number of polling operations
-//                          performed
-//   pollcycles_expired() -- check if pollcycles have expired
-//
-// We might want to move them to architecture-specific files, and/or create a
-// cleaner interface. For now, I just wanted to keep them out of
-// get_next_event()
-
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_reset(void)
+/// Move a delivered persistent channel back to the polled or idle queue
+static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
+                                dispatcher_handle_t handle)
 {
-    cycles_t pollcycles;
-#if defined(__arm__) && !defined(__gem5__)
-    reset_cycle_counter();
-    pollcycles = waitset_poll_cycles;
-#elif defined(__arm__) && defined(__gem5__)
-    pollcycles = 0;
-#elif defined(__aarch64__) && defined(__gem5__)
-    pollcycles = 0;
-#else
-    pollcycles = cyclecount() + waitset_poll_cycles;
-#endif
-    return pollcycles;
-}
+    assert(chan->waitset == ws);
+    if (chan->state != CHAN_PENDING) { // not pending, so it must be waiting
+        assert(chan->state == CHAN_WAITING);
+        dequeue(&ws->waiting, chan);
+    } else {
+        dequeue(&ws->pending, chan);
+    }
 
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_update(cycles_t pollcycles)
-{
-    cycles_t ret = pollcycles;
-    #if defined(__arm__) && defined(__gem5__)
-    ret++;
-       #elif defined(__aarch64__) && defined(__gem5__)
-       ret++;
-    #endif
-    return ret;
+    // these channel types go back to being polled
+    bool polled_type = chan->chantype == CHANTYPE_UMP_IN
+        || chan->chantype == CHANTYPE_LWIP_SOCKET
+        || chan->chantype == CHANTYPE_AHCI;
+
+    chan->token = 0;
+    // polled types rejoin the waitset's polled queue, all others the idle one
+    enqueue(polled_type ? &ws->polled : &ws->idle, chan);
+    if (polled_type) // ...and also the dispatcher's list of polled channels
+        enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+    chan->state = polled_type ? CHAN_POLLED : CHAN_IDLE;
 }
 
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-bool pollcycles_expired(cycles_t pollcycles)
+/// Pick a blocked thread that may receive channel's event (me is unused)
+static struct thread * find_recipient(struct waitset *ws,
+                        struct waitset_chanstate *channel, struct thread *me)
 {
-    bool ret;
-    #if defined(__arm__) && !defined(__gem5__)
-    ret = (cyclecount() > pollcycles || is_cycle_counter_overflow());
-    #elif defined(__arm__) && defined(__gem5__)
-    ret = pollcycles >= POLL_COUNT;
-    #elif defined(__aarch64__) && defined(__gem5__)
-    ret = pollcycles >= POLL_COUNT;
-    #else
-    ret = cyclecount() > pollcycles;
-    #endif
-    return ret;
+    struct thread *const head = ws->waiting_threads;
+    struct thread *t = head;
+
+    if (head == NULL)
+        return NULL;
+    do { // one full lap around the circular list of blocked threads
+        if (waitset_check_token(channel, t))
+            return t;
+    } while ((t = t->next) != head);
+    return head; // no thread matched: fall back to the head of the list
 }
 
-static errval_t get_next_event_debug(struct waitset *ws,
-        struct event_closure *retclosure, bool debug)
+/// If events are still pending and a thread is blocked, unblock one thread
+static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
 {
-    struct waitset_chanstate *chan;
-    bool was_polling = false;
-    cycles_t pollcycles;
-
-    assert(ws != NULL);
-    assert(retclosure != NULL);
-
-    // unconditionally disable ourselves and check for events
-    // if we decide we have to start polling, we'll jump back up here
-    goto check_for_events;
-
-    /* ------------ POLLING LOOP; RUNS WHILE ENABLED ------------ */
-polling_loop:
-    was_polling = true;
-    assert(ws->polling); // this thread is polling
-    // get the amount of cycles we want to poll for
-    pollcycles = pollcycles_reset();
-
-    // while there are no pending events, poll channels
-    while (ws->polled != NULL && ws->pending == NULL) {
-        struct waitset_chanstate *nextchan = NULL;
-        // NB: Polling policy is to return as soon as a pending event
-        // appears, not bother looking at the rest of the polling queue
-        for (chan = ws->polled;
-             chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED
-                 && ws->pending == NULL;
-             chan = nextchan) {
-
-            nextchan = chan->next;
-            poll_channel(chan);
-            // update pollcycles
-            pollcycles = pollcycles_update(pollcycles);
-            // yield the thread if we exceed the cycle count limit
-            if (ws->pending == NULL && pollcycles_expired(pollcycles)) {
-                if (debug) {
-                if (strcmp(disp_name(), "netd") != 0) {
-                    // Print the callback trace so that we know which call is leading
-                    // the schedule removal and
-                    printf("%s: callstack: %p %p %p %p\n", disp_name(),
-                            __builtin_return_address(0),
-                            __builtin_return_address(1),
-                            __builtin_return_address(2),
-                            __builtin_return_address(3));
-                }
-
-                }
-                thread_yield();
-                pollcycles = pollcycles_reset();
-            }
-        }
+    if (ws->waiting_threads != NULL && ws->pending != NULL) {
+        struct thread *remote; // result of the wakeup; expected to be NULL
 
-        // ensure that we restart polling from the place we left off here,
-        // if the next channel is a valid one
-        if (nextchan != NULL && nextchan->waitset == ws
-            && nextchan->state == CHAN_POLLED) {
-            ws->polled = nextchan;
-        }
+        remote = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
+        assert_disabled(remote == NULL); // shouldn't see a remote thread
     }
+}
 
-    /* ------------ STATE MACHINERY; RUNS WHILE DISABLED ------------ */
-check_for_events: ;
-    dispatcher_handle_t handle = disp_disable();
+/**
+ * \brief Get the next pending event for the calling thread
+ *
+ * Runs with the dispatcher disabled. Looks for an event that matches the
+ * current thread: the waitfor channel first, then the waiting queue, then the
+ * pending queue. A match is removed from its queue and deregistered or, if it's
+ * persistent, re-registered on the idle queue or the polled queue (UMP, LWIP
+ * socket and AHCI channel types). With no match and an empty pending queue,
+ * this thread blocks. If the head of the pending queue doesn't match our thread,
+ * a matching blocked thread is woken up and the event stays pending; if there's
+ * no matching thread, the event is moved to the waiting queue.
+ *
+ * \param ws Waitset with sources of events
+ * \param retchannel Holder of returned event
+ * \param retclosure Holder of returned closure
+ * \param waitfor Specific event that we're waiting for (can be NULL)
+ * \param handle Dispatcher's handle
+ * \param debug Debug mode (not used)
+ * \returns SYS_ERR_OK; the loop is only left by returning an event
+ */
+
-    // are there any pending events on the waitset?
-    chan = get_pending_event_disabled(ws);
-    if (chan != NULL) {
-        // if we need to poll, and we have a blocked thread, wake it up to do so
-        if (was_polling && ws->polled != NULL && ws->waiting_threads != NULL) {
-            // start a blocked thread polling
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert_disabled(t == NULL); // shouldn't see a remote thread
-        } else if (was_polling) {
-            // I'm stopping polling, and there is nobody else
-            assert_disabled(ws->polling);
-            ws->polling = false;
+errval_t get_next_event_disabled(struct waitset *ws,
+    struct waitset_chanstate **retchannel, struct event_closure *retclosure,
+    struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
+{
+    struct waitset_chanstate * chan;
+
+    for (;;) {
+        chan = get_pending_event_disabled(ws, waitfor); // event for this thread, or NULL
+        if (chan) {
+            *retchannel = chan;
+            *retclosure = chan->closure;
+            chan->wait_for = NULL; // the event is consumed: clear its recipient
+            chan->token = 0;
+            if (chan->persistent)
+                reregister_channel(ws, chan, handle);
+            else
+                waitset_chan_deregister_disabled(chan, handle);
+            wake_up_other_thread(handle, ws); // more may be pending for others
+            return SYS_ERR_OK;
         }
-        disp_enable(handle);
-
-        *retclosure = chan->closure;
-        return SYS_ERR_OK;
-    }
-
-    // If we got here and there are channels to poll but no-one is polling,
-    // then either we never polled, or we lost a race on the channel we picked.
-    // Either way, we'd better start polling again.
-    if (ws->polled != NULL && (was_polling || !ws->polling)) {
-        if (!was_polling) {
-            ws->polling = true;
+        chan = ws->pending; // nothing for us: look at the head of the pending queue
+        if (!chan) { // if nothing then wait
+            thread_block_disabled(handle, &ws->waiting_threads);
+            disp_disable(); // NOTE(review): presumably we resume enabled after blocking - confirm
+        } else { // something but it's not our event
+            if (!ws->waiting_threads) { // no other thread interested in
+                dequeue(&ws->pending, chan);
+                enqueue(&ws->waiting, chan);
+                chan->state = CHAN_WAITING;
+                chan->waitset = ws;
+            } else {
+                // find a matching thread
+                struct thread *t;
+                for (t = ws->waiting_threads; t; ) {
+                    if (waitset_check_token(chan, t)) { // match found, wake it
+                        ws->waiting_threads = t; // presumably the list head is the one unblocked - confirm
+                        t = thread_unblock_one_disabled(handle,
+                                                    &ws->waiting_threads, chan);
+                        assert_disabled(t == NULL); // shouldn't see a remote thread
+                        break;
+                    }
+                    t = t->next;
+                    if (t == ws->waiting_threads) { // no recipient found
+                        dequeue(&ws->pending, chan);
+                        enqueue(&ws->waiting, chan);
+                        chan->state = CHAN_WAITING;
+                        chan->waitset = ws;
+                        break;
+                    }
+                }
+            }
         }
-        disp_enable(handle);
-        goto polling_loop;
-    }
-
-    // otherwise block awaiting an event
-    chan = thread_block_disabled(handle, &ws->waiting_threads);
-
-    if (chan == NULL) {
-        // not a real event, just a wakeup to get us to start polling!
-        assert(ws->polling);
-        goto polling_loop;
-    } else {
-        *retclosure = chan->closure;
-        return SYS_ERR_OK;
     }
 }
 
@@ -409,58 +395,47 @@ check_for_events: ;
  */
 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
 {
-    return get_next_event_debug(ws, retclosure, false);
+    dispatcher_handle_t handle = disp_disable();
+    struct waitset_chanstate *channel;
+    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
+                                            handle, false);
+    disp_enable(handle);
+    return err;
 }
 
 
 
 /**
- * \brief Return next event on given waitset, if one is already pending
+ * \brief Poll channels, then check if an event is pending on given waitset
  *
  * This is essentially a non-blocking variant of get_next_event(). It should be
  * used with great care, to avoid the creation of busy-waiting loops.
  *
  * \param ws Waitset
- * \param retclosure Pointer to storage space for returned event closure
  *
  * \returns LIB_ERR_NO_EVENT if nothing is pending
  */
-errval_t check_for_event(struct waitset *ws, struct event_closure *retclosure)
+static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
 {
     struct waitset_chanstate *chan;
-    int pollcount = 0;
-
-    assert(ws != NULL);
-    assert(retclosure != NULL);
 
- recheck: ;
-    // are there any pending events on the waitset?
-    dispatcher_handle_t handle = disp_disable();
-    chan = get_pending_event_disabled(ws);
-    disp_enable(handle);
+    poll_channels_disabled(handle); // give polled channels a chance to trigger first
+    chan = get_pending_event_disabled(ws, NULL); // lookup only: the event stays queued
     if (chan != NULL) {
-        *retclosure = chan->closure;
         return SYS_ERR_OK;
     }
+    return LIB_ERR_NO_EVENT; // nothing this thread could receive right now
+}
 
-    // if there are no pending events, poll all channels once
-    if (ws->polled != NULL && pollcount++ == 0) {
-        for (chan = ws->polled;
-             chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED;
-             chan = chan->next) {
-
-            poll_channel(chan);
-            if (ws->pending != NULL) {
-                goto recheck;
-            }
-
-            if (chan->next == ws->polled) { // reached the start of the queue
-                break;
-            }
-        }
-    }
+/// Non-blocking check for an event: wraps check_for_event_disabled()
+errval_t check_for_event(struct waitset *ws)
+{
+    assert(ws != NULL);
 
-    return LIB_ERR_NO_EVENT;
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = check_for_event_disabled(ws, handle);
+    disp_enable(handle);
+    return err;
 }
 
 /**
@@ -488,20 +463,59 @@ errval_t event_dispatch(struct waitset *ws)
 errval_t event_dispatch_debug(struct waitset *ws)
 {
     struct event_closure closure;
-    errval_t err = get_next_event_debug(ws, &closure, false);
+    struct waitset_chanstate *channel;
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+                                            true);
+    disp_enable(handle);
     if (err_is_fail(err)) {
         return err;
     }
 
     assert(closure.handler != NULL);
-//    printf("%s: event_dispatch: %p: \n", disp_name(), closure.handler);
-
-
     closure.handler(closure.arg);
     return SYS_ERR_OK;
 }
 
 /**
+ * \brief Dispatch events until a specific event is received
+ *
+ * Wait for events and dispatch them. If a specific event comes, don't call
+ * a closure, just return.
+ *
+ * \param ws Waitset
+ * \param waitfor Event, that we are waiting for
+ * \param error_var Error variable that can be changed by closures
+ */
+
+errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
+                            errval_t *error_var)
+{
+    assert(waitfor->waitset == ws);
+    for (;;) {
+        struct event_closure closure;
+        struct waitset_chanstate *channel;
+
+        dispatcher_handle_t handle = disp_disable();
+        errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
+                                                handle, false);
+        disp_enable(handle);
+        if (err_is_fail(err)) {
+            assert(0);
+            return err;
+        }
+        if (channel == waitfor) {
+            return SYS_ERR_OK;
+        }
+        assert(!channel->wait_for);
+        assert(closure.handler != NULL);
+        closure.handler(closure.arg);
+        if (err_is_fail(*error_var))
+            return *error_var;
+    }
+}
+
+/**
  * \brief check and dispatch next event on given waitset
  *
  * Check if there is any pending activity on some channel, or deferred
@@ -513,14 +527,23 @@ errval_t event_dispatch_debug(struct waitset *ws)
  */
 errval_t event_dispatch_non_block(struct waitset *ws)
 {
-    assert(ws != NULL);
+    struct waitset_chanstate *channel;
     struct event_closure closure;
-    errval_t err = check_for_event(ws, &closure);
 
+    assert(ws != NULL);
+
+    // are there any pending events on the waitset?
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = check_for_event_disabled(ws, handle);
     if (err_is_fail(err)) {
+        disp_enable(handle);
         return err;
     }
-
+    err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+                                            false);
+    if (err_is_fail(err))
+        return err;
+    disp_enable(handle);
     assert(closure.handler != NULL);
     closure.handler(closure.arg);
     return SYS_ERR_OK;
@@ -548,6 +571,9 @@ void waitset_chanstate_init(struct waitset_chanstate *chan,
 #ifndef NDEBUG
     chan->prev = chan->next = NULL;
 #endif
+    chan->persistent = false;
+    chan->token = 0;
+    chan->wait_for = NULL;
 }
 
 /**
@@ -584,6 +610,7 @@ errval_t waitset_chan_register_disabled(struct waitset *ws,
     }
 
     chan->waitset = ws;
+    chan->token = 0;
 
     // channel must not already be registered!
     assert_disabled(chan->next == NULL && chan->prev == NULL);
@@ -596,15 +623,7 @@ errval_t waitset_chan_register_disabled(struct waitset *ws,
     chan->closure = closure;
 
     // enqueue this channel on the waitset's queue of idle channels
-    if (ws->idle == NULL) {
-        chan->next = chan->prev = chan;
-        ws->idle = chan;
-    } else {
-        chan->next = ws->idle;
-        chan->prev = chan->next->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
+    enqueue(&ws->idle, chan);
     chan->state = CHAN_IDLE;
 
     return SYS_ERR_OK;
@@ -633,6 +652,7 @@ errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
     }
 
     chan->waitset = ws;
+    chan->token = 0;
 
     // channel must not already be registered!
     assert_disabled(chan->next == NULL && chan->prev == NULL);
@@ -642,23 +662,9 @@ errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
     chan->closure = closure;
 
     // enqueue this channel on the waitset's queue of polled channels
-    if (ws->polled == NULL) {
-        chan->next = chan->prev = chan;
-        ws->polled = chan;
-        if (ws->waiting_threads != NULL && !ws->polling) {
-            // start a blocked thread polling
-            ws->polling = true;
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert_disabled(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
-        }
-    } else {
-        chan->next = ws->polled;
-        chan->prev = chan->next->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
+    enqueue(&ws->polled, chan);
     chan->state = CHAN_POLLED;
+    enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
 
     return SYS_ERR_OK;
 }
@@ -708,120 +714,6 @@ errval_t waitset_chan_register_polled(struct waitset *ws,
 }
 
 /**
- * \brief Mark an idle channel as polled
- *
- * The given channel will periodically have its poll function called.
- * The channel must already be registered.
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_start_polling(struct waitset_chanstate *chan)
-{
-    errval_t err = SYS_ERR_OK;
-
-    dispatcher_handle_t handle = disp_disable();
-
-    struct waitset *ws = chan->waitset;
-    if (ws == NULL) {
-        err = LIB_ERR_CHAN_NOT_REGISTERED;
-        goto out;
-    }
-
-    assert(chan->state != CHAN_UNREGISTERED);
-    if (chan->state != CHAN_IDLE) {
-        goto out; // no-op if polled or pending
-    }
-
-    // remove from idle queue
-    if (chan->next == chan) {
-        assert(chan->prev == chan);
-        assert(ws->idle == chan);
-        ws->idle = NULL;
-    } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (ws->idle == chan) {
-            ws->idle = chan->next;
-        }
-    }
-
-    // enqueue on polled queue
-    if (ws->polled == NULL) {
-        ws->polled = chan;
-        chan->next = chan->prev = chan;
-        if (ws->waiting_threads != NULL && !ws->polling) {
-            // start a blocked thread polling
-            ws->polling = true;
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
-        }
-    } else {
-        chan->next = ws->polled;
-        chan->prev = ws->polled->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
-    chan->state = CHAN_POLLED;
-
-out:
-    disp_enable(handle);
-    return err;
-}
-
-/**
- * \brief Stop polling the given channel, making it idle again
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_stop_polling(struct waitset_chanstate *chan)
-{
-    errval_t err = SYS_ERR_OK;
-
-    dispatcher_handle_t handle = disp_disable();
-
-    struct waitset *ws = chan->waitset;
-    if (ws == NULL) {
-        err = LIB_ERR_CHAN_NOT_REGISTERED;
-        goto out;
-    }
-
-    assert(chan->state != CHAN_UNREGISTERED);
-    if (chan->state != CHAN_POLLED) {
-        goto out; // no-op if idle or pending
-    }
-
-    // remove from polled queue
-    if (chan->next == chan) {
-        assert(chan->prev == chan);
-        assert(ws->polled == chan);
-        ws->polled = NULL;
-    } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (ws->polled == chan) {
-            ws->polled = chan->next;
-        }
-    }
-
-    // enqueue on idle queue
-    if (ws->idle == NULL) {
-        ws->idle = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->idle;
-        chan->prev = ws->idle->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
-    chan->state = CHAN_IDLE;
-
-out:
-    disp_enable(handle);
-    return err;
-}
-
-/**
  * \brief Cancel a previous callback registration
  *
  * Remove the registration for a callback on the given channel.
@@ -829,7 +721,8 @@ out:
  *
  * \param chan Waitset's per-channel state
  */
-errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
+errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
+                                          dispatcher_handle_t handle)
 {
     assert_disabled(chan != NULL);
     struct waitset *ws = chan->waitset;
@@ -841,61 +734,29 @@ errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
     chan->waitset = NULL;
     assert_disabled(chan->next != NULL && chan->prev != NULL);
 
-    if (chan->next == chan) {
-        // only thing in the list: must be the head
-        assert_disabled(chan->prev == chan);
-        switch (chan->state) {
-        case CHAN_IDLE:
-            assert_disabled(chan == ws->idle);
-            ws->idle = NULL;
-            break;
-
-        case CHAN_POLLED:
-            assert_disabled(chan == ws->polled);
-            ws->polled = NULL;
-            break;
-
-        case CHAN_PENDING:
-            assert_disabled(chan == ws->pending);
-            ws->pending = NULL;
-            break;
+    switch (chan->state) {
+    case CHAN_IDLE:
+        dequeue(&ws->idle, chan);
+        break;
 
-        default:
-            assert_disabled(!"invalid channel state in deregister");
-        }
-    } else {
-        assert_disabled(chan->prev != chan);
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        switch (chan->state) {
-        case CHAN_IDLE:
-            if (chan == ws->idle) {
-                ws->idle = chan->next;
-            }
-            break;
+    case CHAN_POLLED:
+        dequeue(&ws->polled, chan);
+        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+        break;
 
-        case CHAN_POLLED:
-            if (chan == ws->polled) {
-                ws->polled = chan->next;
-            }
-            break;
+    case CHAN_PENDING:
+        dequeue(&ws->pending, chan);
+        break;
 
-        case CHAN_PENDING:
-            if (chan == ws->pending) {
-                ws->pending = chan->next;
-            }
-            break;
+    case CHAN_WAITING:
+        dequeue(&ws->waiting, chan);
+        break;
 
-        default:
-            assert_disabled(!"invalid channel state in deregister");
-        }
+    default:
+        assert_disabled(!"invalid channel state in deregister");
     }
     chan->state = CHAN_UNREGISTERED;
-
-#ifndef NDEBUG
-    chan->prev = chan->next = NULL;
-#endif
-
+    chan->wait_for = NULL;
     return SYS_ERR_OK;
 }
 
@@ -910,7 +771,7 @@ errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
 {
     dispatcher_handle_t handle = disp_disable();
-    errval_t err = waitset_chan_deregister_disabled(chan);
+    errval_t err = waitset_chan_deregister_disabled(chan, handle);
     disp_enable(handle);
     return err;
 }
@@ -933,75 +794,23 @@ void waitset_chan_migrate(struct waitset_chanstate *chan,
 
     switch(chan->state) {
     case CHAN_IDLE:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->idle == chan);
-            ws->idle = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->idle == chan) {
-                ws->idle = chan->next;
-            }
-        }
-
-        if (new_ws->idle == NULL) {
-            new_ws->idle = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->idle;
-            chan->prev = new_ws->idle->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+        dequeue(&ws->idle, chan);
+        enqueue(&new_ws->idle, chan);
         break;
 
     case CHAN_POLLED:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->polled == chan);
-            ws->polled = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->polled == chan) {
-                ws->polled = chan->next;
-            }
-        }
-
-        if (new_ws->polled == NULL) {
-            new_ws->polled = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->polled;
-            chan->prev = new_ws->polled->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+        dequeue(&ws->polled, chan);
+        enqueue(&new_ws->polled, chan);
         break;
 
     case CHAN_PENDING:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->pending == chan);
-            ws->pending = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->pending == chan) {
-                ws->pending = chan->next;
-            }
-        }
+        dequeue(&ws->pending, chan);
+        enqueue(&new_ws->pending, chan);
+        break;
 
-        if (new_ws->pending == NULL) {
-            new_ws->pending = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->pending;
-            chan->prev = new_ws->pending->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+    case CHAN_WAITING:
+        dequeue(&ws->waiting, chan);
+        enqueue(&new_ws->waiting, chan);
         break;
 
     case CHAN_UNREGISTERED:
@@ -1037,59 +846,26 @@ errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
     }
 
     // remove from previous queue (either idle or polled)
-    if (chan->next == chan) {
-        assert_disabled(chan->prev == chan);
-        if (chan->state == CHAN_IDLE) {
-            assert_disabled(ws->idle == chan);
-            ws->idle = NULL;
-        } else {
-            assert_disabled(chan->state == CHAN_POLLED);
-            assert_disabled(ws->polled == chan);
-            ws->polled = NULL;
-        }
+    if (chan->state == CHAN_IDLE) {
+        dequeue(&ws->idle, chan);
     } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (chan->state == CHAN_IDLE) {
-            if (ws->idle == chan) {
-                ws->idle = chan->next;
-            }
-        } else {
-            assert_disabled(chan->state == CHAN_POLLED);
-            if (ws->polled == chan) {
-                ws->polled = chan->next;
-            }
-        }
+        assert_disabled(chan->state == CHAN_POLLED);
+        dequeue(&ws->polled, chan);
+        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
     }
 
+    // mark channel pending and move to end of pending event queue
+    enqueue(&ws->pending, chan);
+    chan->state = CHAN_PENDING;
+
     // is there a thread blocked on this waitset? if so, awaken it with the event
-    if (ws->waiting_threads != NULL) {
-        chan->waitset = NULL;
-#ifndef NDEBUG
-        chan->prev = chan->next = NULL;
-#endif
-        chan->state = CHAN_UNREGISTERED;
+    struct thread *thread = find_recipient(ws, chan, thread_self());
+    if (thread) {
         struct thread *t;
+        ws->waiting_threads = thread;
         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
         assert_disabled(t == NULL);
-        return SYS_ERR_OK;
-    }
-
-    // else mark channel pending and move to end of pending event queue
-    chan->state = CHAN_PENDING;
-    if (ws->pending == NULL) {
-        ws->pending = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->pending;
-        chan->prev = ws->pending->prev;
-        assert_disabled(ws->pending->next != NULL);
-        assert_disabled(ws->pending->prev != NULL);
-        assert_disabled(chan->prev != NULL);
-        chan->next->prev = chan;
-        chan->prev->next = chan;
     }
-
     return SYS_ERR_OK;
 }
 
@@ -1105,9 +881,9 @@ errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
  */
 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
 {
-    dispatcher_handle_t disp = disp_disable();
-    errval_t err = waitset_chan_trigger_disabled(chan, disp);
-    disp_enable(disp);
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = waitset_chan_trigger_disabled(chan, handle);
+    disp_enable(handle);
     return err;
 }
 
@@ -1141,29 +917,21 @@ errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
     // set closure
     chan->closure = closure;
 
+    // mark channel pending and place on end of pending event queue
+    chan->waitset = ws;
+    enqueue(&ws->pending, chan);
+    // if (first)
+    //     ws->pending = chan;
+    chan->state = CHAN_PENDING;
+
     // is there a thread blocked on this waitset? if so, awaken it with the event
-    if (ws->waiting_threads != NULL) {
+    struct thread *thread = find_recipient(ws, chan, thread_self());
+    if (thread) {
         struct thread *t;
+        ws->waiting_threads = thread;
         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
         assert_disabled(t == NULL);
-        return SYS_ERR_OK;
-    }
-
-    // mark channel pending and place on end of pending event queue
-    chan->waitset = ws;
-    chan->state = CHAN_PENDING;
-    if (ws->pending == NULL) {
-        ws->pending = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->pending;
-        chan->prev = ws->pending->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
     }
-
-    assert(ws->pending->prev != NULL && ws->pending->next != NULL);
-
     return SYS_ERR_OK;
 }