Multi-threaded waitsets, tokens added to IPC messages.
author     Adam Turowski <adam.turowski@inf.ethz.ch>
           Tue, 26 Jul 2016 08:22:50 +0000 (10:22 +0200)
committer  Adam Turowski <adam.turowski@inf.ethz.ch>
           Tue, 26 Jul 2016 08:22:50 +0000 (10:22 +0200)
Signed-off-by: Adam Turowski <adam.turowski@inf.ethz.ch>

24 files changed:
include/barrelfish/barrelfish.h
include/barrelfish/dispatcher.h
include/barrelfish/lmp_chan.h
include/barrelfish/threads.h
include/barrelfish/ump_chan.h
include/barrelfish/ump_endpoint.h
include/barrelfish/ump_impl.h
include/barrelfish/waitset.h
include/barrelfish_kpi/dispatcher_shared.h
include/flounder/flounder.h
include/flounder/flounder_support.h
include/flounder/flounder_support_lmp.h
include/flounder/flounder_support_ump.h
lib/barrelfish/dispatch.c
lib/barrelfish/flounder_support.c
lib/barrelfish/include/threads_priv.h
lib/barrelfish/include/waitset_chan_priv.h
lib/barrelfish/lmp_chan.c
lib/barrelfish/lmp_endpoints.c
lib/barrelfish/multihop_chan.c
lib/barrelfish/threads.c
lib/barrelfish/ump_chan.c
lib/barrelfish/ump_endpoint.c
lib/barrelfish/waitset.c

index 5f91cad..1fad374 100644 (include/barrelfish/barrelfish.h)
@@ -82,6 +82,14 @@ static inline uint8_t log2ceil(uintptr_t num)
     }
 }
 
+/// Duplicate memory
+static inline void * memdup(const void *ptr, size_t size) {
+    void *res = malloc(size);
+    assert(res);
+    memcpy(res, ptr, size);
+    return res;
+}
+
 /* XXX: glue junk for old IDC system, to be removed!! */
 
 void messages_wait_and_handle_next(void);
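
The new memdup() mirrors strdup() for arbitrary buffers: allocate size bytes, assert on allocation failure, copy, and hand ownership to the caller. A minimal standalone sketch of its use (the helper is reproduced so the example compiles on its own):

    #include <assert.h>
    #include <stdlib.h>
    #include <string.h>

    /* The helper added above, reproduced for a standalone example. */
    static inline void * memdup(const void *ptr, size_t size) {
        void *res = malloc(size);
        assert(res);
        memcpy(res, ptr, size);
        return res;
    }

    int main(void)
    {
        int orig[4] = { 1, 2, 3, 4 };
        int *copy = memdup(orig, sizeof(orig));   // caller owns the copy
        assert(copy[3] == 4);
        free(copy);
        return 0;
    }
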
index 42123fc..d59164b 100644 (include/barrelfish/dispatcher.h)
@@ -79,11 +79,14 @@ struct dispatcher_generic {
     /// size of the eh frame
     size_t   eh_frame_size;
 
-        /// virtual address of the eh_frame
+    /// virtual address of the eh_frame
     lvaddr_t eh_frame_hdr;
 
     /// size of the eh frame
     size_t   eh_frame_hdr_size;
+
+    /// list of polled channels
+    struct waitset_chanstate *polled_channels;
 };
 
 #endif // BARRELFISH_DISPATCHER_H
index 271a6f1..0a99f37 100644 (include/barrelfish/lmp_chan.h)
@@ -134,6 +134,17 @@ static inline errval_t lmp_chan_recv(struct lmp_chan *lc,
 }
 
 /**
+ * \brief Check if a channel has data to receive
+ */
+
+static inline bool lmp_chan_can_recv(struct lmp_chan *lc)
+{
+    assert(lc);
+    assert(lc->endpoint);
+    return lmp_endpoint_can_recv(lc->endpoint);
+}
+
+/**
  * \brief Set the receive capability slot for an LMP channel
  *
  * \param lc LMP channel
@@ -179,6 +190,15 @@ static inline lmp_send_flags_t idc_control_to_lmp_flags(idc_control_t control,
     }
 }
 
+/**
+ * \brief Get the receiving chanstate of an LMP channel
+ */
+static inline struct waitset_chanstate * lmp_chan_get_receiving_channel(struct lmp_chan *chan)
+{
+    assert(chan->endpoint);
+    return &chan->endpoint->waitset_state;
+}
+
 #include <barrelfish/lmp_chan_arch.h>
 
 __END_DECLS
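
lmp_chan_can_recv() gives callers a cheap way to poll an LMP channel, and lmp_chan_get_receiving_channel() exposes the endpoint's chanstate for the token-aware waitset calls added in this commit. A hedged sketch of a non-blocking drain loop built on the first helper (drain_lmp and the message handling are illustrative):

    /* Illustrative: drain an initialised LMP channel without blocking. */
    static void drain_lmp(struct lmp_chan *lc)
    {
        while (lmp_chan_can_recv(lc)) {
            struct lmp_recv_msg msg = LMP_RECV_MSG_INIT;
            struct capref cap;
            errval_t err = lmp_chan_recv(lc, &msg, &cap);
            if (err_is_fail(err)) {
                break;              // leave retry policy to the caller
            }
            /* ... handle msg.words / cap here ... */
        }
    }
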
index f4afb01..5b7adf3 100644 (include/barrelfish/threads.h)
@@ -23,6 +23,7 @@
 #include <barrelfish_kpi/registers_arch.h>
 #include <barrelfish_kpi/dispatcher_handle.h>
 #include <errors/errno.h>
+#include <barrelfish/waitset.h>
 
 __BEGIN_DECLS
 
@@ -42,7 +43,7 @@ errval_t thread_join(struct thread *thread, int *retval);
 errval_t thread_detach(struct thread *thread);
 
 void thread_pause(struct thread *thread);
-void thread_pause_and_capture_state(struct thread *thread, 
+void thread_pause_and_capture_state(struct thread *thread,
                                     arch_registers_state_t **ret_regs,
                                     arch_registers_fpu_state_t **ret_fpuregs);
 void thread_resume(struct thread *thread);
@@ -75,6 +76,20 @@ uintptr_t thread_id(void);
 uintptr_t thread_get_id(struct thread *t);
 void thread_set_id(uintptr_t id);
 
+uint32_t thread_set_token(struct waitset_chanstate *channel);
+void thread_clear_token(struct waitset_chanstate *channel);
+uint32_t thread_current_token(void);
+
+void thread_set_outgoing_token(uint32_t token);
+void thread_get_outgoing_token(uint32_t *token);
+
+struct flounder_rpc_context;
+
+void thread_set_rpc_in_progress(bool v);
+bool thread_get_rpc_in_progress(void);
+void thread_set_async_error(errval_t e);
+errval_t thread_get_async_error(void);
+
 extern __thread thread_once_t thread_once_local_epoch;
 extern void thread_once_internal(thread_once_t *control, void (*func)(void));
 
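
This block is the core of the multi-threaded RPC scheme: the calling thread stamps its request with a fresh token via thread_set_token() (which also records the channel the reply must arrive on), and the waitset later delivers only a matching-token event to that thread. A hedged sketch of a round-trip, with send_request() as a hypothetical stand-in for a Flounder-generated send:

    extern errval_t send_request(void);     // hypothetical generated send

    /* Sketch: one RPC round-trip. rx_chan is the binding's receive
     * chanstate, e.g. from lmp_chan_get_receiving_channel(). */
    static errval_t rpc_call_sketch(struct waitset *ws,
                                    struct waitset_chanstate *rx_chan)
    {
        errval_t async_err = SYS_ERR_OK;

        uint32_t token = thread_set_token(rx_chan); // reply must echo this
        thread_set_outgoing_token(token);           // stamp the next send
        thread_set_rpc_in_progress(true);

        errval_t err = send_request();
        if (err_is_ok(err)) {
            // dispatch other events until our tokened reply fires
            err = wait_for_channel(ws, rx_chan, &async_err);
        }

        thread_set_rpc_in_progress(false);
        thread_clear_token(rx_chan);
        return err_is_fail(err) ? err : async_err;
    }
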
index a5089c7..dee287c 100644 (include/barrelfish/ump_chan.h)
@@ -118,9 +118,16 @@ static inline errval_t ump_chan_recv(struct ump_chan *uc,
                                      volatile struct ump_message **msg)
 {
     assert(msg != NULL);
+    assert(uc != NULL);
     return ump_endpoint_recv(&uc->endpoint, msg);
 }
 
+static inline bool ump_chan_can_recv(struct ump_chan *uc)
+{
+    assert(uc != NULL);
+    return ump_endpoint_can_recv(&uc->endpoint);
+}
+
 static inline volatile struct ump_message *ump_chan_get_next(
                                 struct ump_chan *uc, struct ump_control *ctrl)
 {
@@ -140,6 +147,12 @@ static inline void ump_chan_migrate_recv(struct ump_chan *lc,
     ump_endpoint_migrate(&lc->endpoint, ws);
 }
 
+static inline struct waitset_chanstate * ump_chan_get_receiving_channel(struct ump_chan *chan)
+{
+    return &chan->endpoint.waitset_state;
+}
+
+
 __END_DECLS
 
 #endif // BARRELFISH_UMP_CHAN_H
index 899f024..f6800ef 100644 (include/barrelfish/ump_endpoint.h)
@@ -64,6 +64,18 @@ static inline errval_t ump_endpoint_recv(struct ump_endpoint *ep,
     }
 }
 
+static inline bool ump_endpoint_poll(struct waitset_chanstate *channel)
+{
+    struct ump_endpoint *ep = (struct ump_endpoint *)
+        ((char *)channel - offsetof(struct ump_endpoint, waitset_state));
+
+    if (ump_endpoint_can_recv(ep)) {
+        return true;
+    }
+    return false;
+}
+
+
 __END_DECLS
 
 #endif // LIBBARRELFISH_UMP_ENDPOINT_H
index f73304e..bf7cd3b 100644 (include/barrelfish/ump_impl.h)
@@ -44,10 +44,10 @@ __BEGIN_DECLS
 // This needs to be such that ump_payload defined in params in flounder/UMP.hs
 // and the size of the UMP headers fits into it. It also needs to be a multiple
 // of a cache-line.
-#define UMP_PAYLOAD_BYTES  64
-#define UMP_PAYLOAD_WORDS  (UMP_PAYLOAD_BYTES / sizeof(uintptr_t) - 1)
-#define UMP_MSG_WORDS      (UMP_PAYLOAD_WORDS + 1)
-#define UMP_MSG_BYTES      (UMP_MSG_WORDS * sizeof(uintptr_t))
+#define UMP_MSG_BYTES      64
+#define UMP_MSG_WORDS      (UMP_MSG_BYTES / sizeof(uintptr_t))
+#define UMP_PAYLOAD_BYTES  (UMP_MSG_BYTES - sizeof(uint64_t))
+#define UMP_PAYLOAD_WORDS  (UMP_PAYLOAD_BYTES / sizeof(uintptr_t))
 
 /// Default size of a unidirectional UMP message buffer, in bytes
 #define DEFAULT_UMP_BUFLEN  (BASE_PAGE_SIZE / 2 / UMP_MSG_BYTES * UMP_MSG_BYTES)
@@ -60,16 +60,17 @@ typedef uint32_t ump_control_t;
 struct ump_control {
     ump_control_t epoch:UMP_EPOCH_BITS;
     ump_control_t header:UMP_HEADER_BITS;
+    ump_control_t token:32;
 };
 
 struct ump_message {
     uintptr_t data[UMP_PAYLOAD_WORDS] __attribute__((aligned (CACHELINE_BYTES)));
     union {
         struct ump_control control;
-        uintptr_t raw;
+        uint64_t raw;
     } header;
 };
-STATIC_ASSERT((sizeof(struct ump_message)%CACHELINE_BYTES)==0, 
+STATIC_ASSERT((sizeof(struct ump_message)%CACHELINE_BYTES)==0,
                "Size of UMP message is not a multiple of cache-line size");
 
 /// Type used for indices of UMP message slots
@@ -152,9 +153,8 @@ static inline volatile struct ump_message *ump_impl_poll(struct ump_chan_state *
     ump_control_t ctrl_epoch =  c->buf[c->pos].header.control.epoch;
     if (ctrl_epoch == c->epoch) {
         return &c->buf[c->pos];
-    } else {
-        return NULL;
     }
+    return NULL;
 }
 
 /**
@@ -169,15 +169,14 @@ static inline volatile struct ump_message *ump_impl_recv(struct ump_chan_state *
 {
     volatile struct ump_message *msg = ump_impl_poll(c);
 
-    if(msg != NULL) {
+    if (msg != NULL) {
         if (++c->pos == c->bufmsgs) {
             c->pos = 0;
             c->epoch = !c->epoch;
         }
         return msg;
-    } else {
-        return NULL;
     }
+    return NULL;
 }
 
 /**
@@ -196,6 +195,7 @@ static inline volatile struct ump_message *ump_impl_get_next(
 
     // construct header
     ctrl->epoch = c->epoch;
+    ctrl->token = 0;
 
 #ifdef __x86_64__
     if(debug_notify_syscall) {
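
The reshuffled macros keep the cache-line invariant while making room for the 32-bit token: with UMP_MSG_BYTES fixed at 64 and a 64-bit uintptr_t, a message holds eight words, one of which is the raw 64-bit header (epoch + header bits + token), leaving seven payload words (56 bytes). A standalone check of that arithmetic, assuming an LP64 target:

    #include <assert.h>
    #include <stdint.h>

    /* The macro definitions from ump_impl.h, reproduced for a
     * standalone check; values assume a 64-bit uintptr_t. */
    #define UMP_MSG_BYTES      64
    #define UMP_MSG_WORDS      (UMP_MSG_BYTES / sizeof(uintptr_t))
    #define UMP_PAYLOAD_BYTES  (UMP_MSG_BYTES - sizeof(uint64_t))
    #define UMP_PAYLOAD_WORDS  (UMP_PAYLOAD_BYTES / sizeof(uintptr_t))

    int main(void)
    {
        assert(UMP_MSG_WORDS == 8);      // 64 bytes / 8-byte words
        assert(UMP_PAYLOAD_BYTES == 56); // one 64-bit header word reserved
        assert(UMP_PAYLOAD_WORDS == 7);  // epoch + header + token fit the header word
        return 0;
    }
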
index aecda10..f2e208b 100644 (include/barrelfish/waitset.h)
@@ -19,6 +19,7 @@
 #include <barrelfish/types.h>
 #include <errors/errno.h>
 #include <sys/cdefs.h>
+#include <barrelfish/dispatch.h>
 
 __BEGIN_DECLS
 
@@ -64,7 +65,8 @@ enum ws_chanstate {
     CHAN_UNREGISTERED,  ///< Initialised, but not yet registered on a waitset
     CHAN_IDLE,          ///< Has a registered event handler, but the event has not fired
     CHAN_POLLED,        ///< Idle and polled. Channel implementation must be called to check for pending events
-    CHAN_PENDING        ///< Has a pending event waiting to be delivered
+    CHAN_PENDING,       ///< Has a pending event waiting to be delivered
+    CHAN_WAITING        ///< Has a pending event, but no registered handler (yet)
 };
 
 /**
@@ -79,6 +81,11 @@ struct waitset_chanstate {
     struct event_closure closure;           ///< Event closure to run when channel is ready
     enum ws_chantype chantype;              ///< Channel type
     enum ws_chanstate state;                ///< Channel event state
+
+    uint32_t token;                         ///< Token of an event
+    bool persistent;                        ///< Channel should be always registered
+    struct waitset_chanstate *polled_next, *polled_prev;    ///< Dispatcher's polled queue
+    struct thread *wait_for;                ///< Thread waiting for this event
 };
 
 /**
@@ -90,21 +97,25 @@ struct waitset_chanstate {
 struct waitset {
     struct waitset_chanstate *pending, ///< Channels with pending events
                              *polled,  ///< Channels that need to be polled
-                             *idle;    ///< All other channels on this waitset
+                             *idle,    ///< All other channels on this waitset
+                             *waiting; ///< Channels waiting for an event handler registration
 
     /// Queue of threads blocked on this waitset (when no events are pending)
     struct thread *waiting_threads;
-
-    /// Is a thread currently polling this waitset?
-    volatile bool polling;
 };
 
+void poll_channels_disabled(dispatcher_handle_t handle);
+
 void waitset_init(struct waitset *ws);
 errval_t waitset_destroy(struct waitset *ws);
 
 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure);
-errval_t check_for_event(struct waitset *ws, struct event_closure *retclosure);
+errval_t get_next_event_disabled(struct waitset *ws, struct waitset_chanstate **retchan,
+        struct event_closure *retclosure, struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug);
+errval_t check_for_event(struct waitset *ws);
 errval_t event_dispatch(struct waitset *ws);
+errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *channel, errval_t *error_var);
+errval_t event_dispatch_disabled(struct waitset *ws, dispatcher_handle_t handle);
 errval_t event_dispatch_debug(struct waitset *ws);
 errval_t event_dispatch_non_block(struct waitset *ws);
 
index fc3e5eb..85003fd 100644 (include/barrelfish_kpi/dispatcher_shared.h)
@@ -38,10 +38,10 @@ enum task_type {
 
 ///< Architecture generic kernel/user shared dispatcher struct
 struct dispatcher_shared_generic {
-    uint32_t   disabled;           ///< Disabled flag (Must be able to change atomically)
-    uint32_t   haswork;            ///< Has work (ie. is runnable) (Must be able to change atomically)
+    uint32_t   disabled;                        ///< Disabled flag (Must be able to change atomically)
+    uint32_t   haswork;                         ///< Has work (ie. is runnable) (Must be able to change atomically)
 
-    lvaddr_t    udisp;             ///< User-mode pointer to dispatcher
+    lvaddr_t    udisp;                          ///< User-mode pointer to dispatcher
     uint32_t    lmp_delivered, lmp_seen;        ///< # LMP words delivered and seen
     lvaddr_t    lmp_hint;                       ///< Hint for location of LMP
     lvaddr_t    dispatcher_run;                 ///< Run entry
@@ -50,14 +50,14 @@ struct dispatcher_shared_generic {
     lvaddr_t    dispatcher_pagefault_disabled;  ///< Disabled pagefault entry
     lvaddr_t    dispatcher_trap;                ///< Trap entry
 
-    systime_t   systime; ///< System time when last dispatched/resumed (W/O to kernel)
-    systime_t   wakeup;  ///< System time at which to wake dispatcher from sleep (R/O by kernel, on yield)
+    systime_t   systime;                        ///< System time when last dispatched/resumed (W/O to kernel)
+    systime_t   wakeup;                         ///< System time at which to wake dispatcher from sleep (R/O by kernel, on yield)
 
-    char        name[DISP_NAME_LEN];///< Name of domain, for debugging purposes
-    uint32_t    fpu_used;               ///< Was FPU used while disabled?
-    uint32_t    fpu_trap;               ///< State of FPU trap
-       
-    coreid_t    curr_core_id;  ///< Core id of current core, in this part so kernel can update
+    char        name[DISP_NAME_LEN];            ///< Name of domain, for debugging purposes
+    uint32_t    fpu_used;                       ///< Was FPU used while disabled?
+    uint32_t    fpu_trap;                       ///< State of FPU trap
+
+    coreid_t    curr_core_id;                   ///< Core id of current core, in this part so kernel can update
 #ifdef __k1om__
     uint8_t     xeon_phi_id;
 #endif
index a5d6aaf..cf093fd 100644 (include/flounder/flounder.h)
 /// No-op continuation, to be passed to message send functions
 #define NOP_CONT    NOP_CLOSURE
 
+/// Blocking continuation: blocks the sender until the send completes
+void blocking_cont(void *v);
+#define BLOCKING_CONT   MKCLOSURE(blocking_cont, NULL)
+
 /// Utility macro to construct a continuation structure (handler & arg)
 #define MKCONT(h,a) MKCLOSURE(h,a)
 
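
BLOCKING_CONT is a sentinel rather than a real callback: flounder_support_register() (in lib/barrelfish/flounder_support.c, below) compares the handler against blocking_cont and, on a match, tags the chanstate with thread_self() so only the sending thread is woken; the handler itself must never run. An illustrative use, where my_binding and its ping() tx function are hypothetical placeholders for Flounder-generated code:

    /* Illustrative only: my_binding and ping() are hypothetical. */
    static errval_t send_ping_blocking(struct my_binding *b)
    {
        // the send machinery blocks this thread until the message is sent
        return b->tx_vtbl.ping(b, BLOCKING_CONT, 42);
    }
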
index 600412d..cbdb87c 100644 (include/flounder/flounder_support.h)
@@ -43,6 +43,7 @@ errval_t flounder_support_register(struct waitset *ws,
                                    bool trigger_now);
 void flounder_support_deregister_chan(struct waitset_chanstate *wc);
 void flounder_support_waitset_chanstate_init(struct waitset_chanstate *wc);
+void flounder_support_waitset_chanstate_init_persistent(struct waitset_chanstate *wc);
 void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc);
 errval_t flounder_support_change_monitor_waitset(struct monitor_binding *mb,
                                                  struct waitset *ws);
index 42f8e98..c36ae4e 100644 (include/flounder/flounder_support_lmp.h)
@@ -26,14 +26,14 @@ errval_t flounder_stub_lmp_send_string(struct lmp_chan *chan,
                                        lmp_send_flags_t flags,
                                        const char *str,
                                        size_t *pos, size_t *len);
-errval_t flounder_stub_lmp_recv_string(struct lmp_recv_msg *msg, char **str,
-                                       size_t *pos, size_t *len);
+errval_t flounder_stub_lmp_recv_string(struct lmp_recv_msg *msg, char *str,
+                                       size_t *pos, size_t *len, size_t maxsize);
 
 errval_t flounder_stub_lmp_send_buf(struct lmp_chan *chan,
                                     lmp_send_flags_t flags, const void *buf,
                                     size_t len, size_t *pos);
-errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void **buf,
-                                    size_t *len, size_t *pos);
+errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void *buf,
+                                    size_t *len, size_t *pos, size_t maxsize);
 
 __END_DECLS
 
index 31f6a4f..963b7ca 100644 (include/flounder/flounder_support_ump.h)
@@ -43,6 +43,7 @@ struct flounder_ump_state {
     ump_index_t last_ack;  ///< Last acknowledgement we sent to remote
 
     struct flounder_cap_state capst; ///< State for indirect cap tx/rx machinery
+    uint32_t token;        ///< Outgoing message's token
 };
 
 void flounder_stub_ump_state_init(struct flounder_ump_state *s, void *binding);
@@ -52,18 +53,22 @@ errval_t flounder_stub_ump_send_string(struct flounder_ump_state *s,
                                        size_t *pos, size_t *len);
 
 errval_t flounder_stub_ump_recv_string(volatile struct ump_message *msg,
-                                       char **str, size_t *pos, size_t *len);
+                                       char *str, size_t *pos, size_t *len,
+                                       size_t maxsize);
 
 errval_t flounder_stub_ump_send_buf(struct flounder_ump_state *s,
                                        int msgnum, const void *buf,
                                        size_t len, size_t *pos);
 
 errval_t flounder_stub_ump_recv_buf(volatile struct ump_message *msg,
-                                    void **buf, size_t *len, size_t *pos);
+                                    void *buf, size_t *len, size_t *pos,
+                                    size_t maxsize);
+
 
 /// Computes (from seq/ack numbers) whether we can currently send on the channel
 static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) {
-    return (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
+    bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
+    return r;
 }
 
 #define ENABLE_MESSAGE_PASSING_TRACE 1
@@ -80,6 +85,7 @@ static inline void flounder_stub_ump_control_fill(struct flounder_ump_state *s,
     assert(s->chan.sendid != 0);
     assert(msgtype < (1 << FL_UMP_MSGTYPE_BITS)); // check for overflow
     ctrl->header = ((uintptr_t)msgtype << UMP_INDEX_BITS) | (uintptr_t)s->seq_id;
+    ctrl->token = s->token;
     s->last_ack = s->seq_id;
     s->next_id++;
 }
index 7b64646..01030a6 100644 (lib/barrelfish/dispatch.c)
@@ -110,6 +110,8 @@ void disp_run(dispatcher_handle_t handle)
     // Trigger any send events for LMP channels
     lmp_channels_retry_send_disabled(handle);
 #endif // CONFIG_INTERCONNECT_DRIVER_LMP
+    // Check polled channels
+    poll_channels_disabled(handle);
 
     // Run, saving state of previous thread if required
     thread_run_disabled(handle);
index bbf0cc4..e0132a6 100644 (lib/barrelfish/flounder_support.c)
 #include <flounder/flounder_support_caps.h>
 #include <if/monitor_defs.h>
 
+/// Special continuation for blocking
+void blocking_cont(void *v)
+{
+    debug_printf("%s: should never be called!\n", __func__);
+    assert(0);
+}
+
 /*
  * NB: many of these functions are trivial, but exist so that we don't need to
  * expose private libbarrelfish headers or generated flounder headers to every
@@ -61,6 +68,10 @@ errval_t flounder_support_register(struct waitset *ws,
     if (trigger_now) {
         return waitset_chan_trigger_closure(ws, wc, ec);
     } else {
+        if (ec.handler == blocking_cont) {
+            assert(!wc->wait_for);          // this event should be received
+            wc->wait_for = thread_self();   // only by our thread
+        }
         return waitset_chan_register(ws, wc, ec);
     }
 }
@@ -70,6 +81,12 @@ void flounder_support_waitset_chanstate_init(struct waitset_chanstate *wc)
     waitset_chanstate_init(wc, CHANTYPE_FLOUNDER);
 }
 
+void flounder_support_waitset_chanstate_init_persistent(struct waitset_chanstate *wc)
+{
+    waitset_chanstate_init(wc, CHANTYPE_FLOUNDER);
+    wc->persistent = true;
+}
+
 void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc)
 {
     waitset_chanstate_destroy(wc);
@@ -169,7 +186,7 @@ static void putword(uintptr_t word, uint8_t *buf, size_t *pos, size_t len)
     }
 
     for (int i = 0; *pos < len && i < sizeof(uintptr_t); i++) {
-        buf[(*pos)++] = (word & ((uintptr_t)0xff << shift_bits)) >> shift_bits;
+        buf[(*pos)++] = word >> shift_bits;
         word <<= NBBY;
     }
 }
@@ -289,11 +306,13 @@ errval_t flounder_stub_lmp_send_buf(struct lmp_chan *chan,
     return err;
 }
 
-errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void **bufp,
-                                    size_t *len, size_t *pos)
+errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void *buf,
+                                    size_t *len, size_t *pos, size_t maxsize)
 {
     int msgpos;
 
+    assert(buf);
+
     // is this the first fragment?
     // if so, unmarshall the length and allocate a buffer
     if (*pos == 0) {
@@ -302,21 +321,12 @@ errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void **bufp,
         }
 
         *len = msg->words[0];
-        if (*len == 0) {
-            *bufp = NULL;
-        } else {
-            *bufp = malloc(*len);
-            if (*bufp == NULL) {
-                return LIB_ERR_MALLOC_FAIL;
-            }
-        }
+        assert(*len < maxsize);
         msgpos = 1;
     } else {
         msgpos = 0;
     }
 
-    uint8_t *buf = *bufp;
-
     // copy remainder of fragment to buffer
     for (; msgpos < msg->buf.msglen && *pos < *len; msgpos++) {
         putword(msg->words[msgpos], buf, pos, *len);
@@ -350,10 +360,16 @@ errval_t flounder_stub_lmp_send_string(struct lmp_chan *chan,
     return flounder_stub_lmp_send_buf(chan, flags, str, *len, pos);
 }
 
-errval_t flounder_stub_lmp_recv_string(struct lmp_recv_msg *msg, char **strp,
-                                       size_t *pos, size_t *len)
+errval_t flounder_stub_lmp_recv_string(struct lmp_recv_msg *msg, char *str,
+                                       size_t *pos, size_t *len, size_t maxsize)
 {
-    return flounder_stub_lmp_recv_buf(msg, (void **)strp, len, pos);
+    errval_t err;
+
+    err = flounder_stub_lmp_recv_buf(msg, (void *)str, len, pos, maxsize);
+    if (*len == 0) {
+        str[0] = '\0';
+    }
+    return err;
 }
 #endif // CONFIG_INTERCONNECT_DRIVER_LMP
 
@@ -368,6 +384,7 @@ void flounder_stub_ump_state_init(struct flounder_ump_state *s, void *binding)
     s->seq_id = 0;
     s->ack_id = 0;
     s->last_ack = 0;
+    s->token = 0;
     flounder_stub_cap_state_init(&s->capst, binding);
 }
 
@@ -415,31 +432,24 @@ errval_t flounder_stub_ump_send_buf(struct flounder_ump_state *s,
 }
 
 errval_t flounder_stub_ump_recv_buf(volatile struct ump_message *msg,
-                                    void **bufp, size_t *len, size_t *pos)
+                                    void *buf, size_t *len, size_t *pos,
+                                    size_t maxsize)
 {
     int msgpos;
 
+    assert(buf);
+
     // is this the first fragment?
     // if so, unmarshall the length and allocate a buffer
     if (*pos == 0) {
         *len = msg->data[0];
-        if (*len == 0) {
-            *bufp = NULL;
-        } else {
-            *bufp = malloc(*len);
-            if (*bufp == NULL) {
-                return LIB_ERR_MALLOC_FAIL;
-            }
-        }
-
+        assert(*len <= maxsize);
         // XXX: skip as many words as the largest word size
         msgpos = (sizeof(uint64_t) / sizeof(uintptr_t));
     } else {
         msgpos = 0;
     }
 
-    uint8_t *buf = *bufp;
-
     // copy remainder of fragment to buffer
     for (; msgpos < UMP_PAYLOAD_WORDS && *pos < *len; msgpos++) {
         putword(msg->data[msgpos], buf, pos, *len);
@@ -473,9 +483,16 @@ errval_t flounder_stub_ump_send_string(struct flounder_ump_state *s,
 }
 
 errval_t flounder_stub_ump_recv_string(volatile struct ump_message *msg,
-                                       char **strp, size_t *pos, size_t *len)
+                                       char *str, size_t *pos, size_t *len,
+                                       size_t maxsize)
 {
-    return flounder_stub_ump_recv_buf(msg, (void **)strp, len, pos);
+    errval_t err;
+
+    err = flounder_stub_ump_recv_buf(msg, (void *)str, len, pos, maxsize);
+    if (*len == 0) {
+        str[0] = '\0';
+    }
+    return err;
 }
 
 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
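
The recv_buf/recv_string stubs now fill a caller-provided, fixed-size buffer instead of malloc'ing on the first fragment, with maxsize bounding the unmarshalled length. A hedged sketch of a fragment handler under the new contract (MAX_BUF and on_fragment are illustrative; the partial-receive code is assumed to be the usual FLOUNDER_ERR_BUF_RECV_MORE):

    #include <assert.h>
    #include <stddef.h>
    #include <stdint.h>

    #define MAX_BUF 4096            // illustrative bound

    static uint8_t rxbuf[MAX_BUF];
    static size_t  rxpos, rxlen;

    /* Called once per incoming fragment; pos/len persist across calls. */
    static void on_fragment(struct lmp_recv_msg *msg)
    {
        // maxsize bounds the unmarshalled length, replacing the old
        // malloc-on-first-fragment behaviour
        errval_t err = flounder_stub_lmp_recv_buf(msg, rxbuf, &rxlen,
                                                  &rxpos, MAX_BUF);
        assert(err_is_ok(err) ||
               err_no(err) == FLOUNDER_ERR_BUF_RECV_MORE);
    }
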
index 760450b..692bc47 100644 (lib/barrelfish/include/threads_priv.h)
@@ -65,7 +65,7 @@ struct thread {
     bool                paused;             ///< Thread is paused (not runnable)
     bool                detached;           ///< true if detached
     bool                joining;            ///< true if someone is joining
-    bool                in_exception;       ///< true iff running exception handler
+    bool                in_exception;       ///< true if running exception handler
     bool                used_fpu;           ///< Ever used FPU?
 #if defined(__x86_64__)
     uint16_t            thread_seg_selector; ///< Segment selector for TCB
@@ -73,6 +73,14 @@ struct thread {
     arch_registers_fpu_state_t fpu_state;   ///< FPU state
     void                *slab;              ///< Base of slab block containing this TCB
     uintptr_t           id;                 ///< User-defined thread identifier
+
+    uint32_t            token_number;       ///< Next RPC token number
+    uint32_t            token;              ///< Token to be received
+    struct waitset_chanstate *channel;      ///< Channel to receive it on
+
+    bool                rpc_in_progress;    ///< RPC in progress
+    errval_t            async_error;        ///< RPC async error
+    uint32_t            outgoing_token;     ///< Token of outgoing message
 };
 
 void thread_enqueue(struct thread *thread, struct thread **queue);
@@ -82,7 +90,7 @@ void thread_remove_from_queue(struct thread **queue, struct thread *thread);
 /* must only be called by dispatcher, while disabled */
 void thread_init_disabled(dispatcher_handle_t handle, bool init_domain);
 
-/// Returns true iff there is non-threaded work to be done on this dispatcher
+/// Returns true if there is non-threaded work to be done on this dispatcher
 /// (ie. if we still need to run)
 static inline bool havework_disabled(dispatcher_handle_t handle)
 {
@@ -91,6 +99,7 @@ static inline bool havework_disabled(dispatcher_handle_t handle)
 #ifdef CONFIG_INTERCONNECT_DRIVER_LMP
             || disp->lmp_send_events_list != NULL
 #endif
+            || disp->polled_channels != NULL
             ;
 }
 
index 37408ee..ba695a1 100644 (lib/barrelfish/include/waitset_chan_priv.h)
@@ -23,7 +23,8 @@ errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
                                                struct waitset_chanstate *chan,
                                                struct event_closure closure,
                                                dispatcher_handle_t handle);
-errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan);
+errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
+                                          dispatcher_handle_t handle);
 errval_t waitset_chan_register_disabled(struct waitset *ws,
                                         struct waitset_chanstate *chan,
                                         struct event_closure closure);
@@ -31,7 +32,5 @@ errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
                                                struct waitset_chanstate *chan,
                                                struct event_closure closure,
                                                dispatcher_handle_t handle);
-errval_t waitset_chan_start_polling(struct waitset_chanstate *chan);
-errval_t waitset_chan_stop_polling(struct waitset_chanstate *chan);
 
 #endif // BARRELFISH_WAITSET_CHAN_PRIV_H
index e321bf4..c41c4dc 100644 (lib/barrelfish/lmp_chan.c)
@@ -213,7 +213,7 @@ void lmp_chan_destroy(struct lmp_chan *lc)
 struct bind_lmp_reply_state {
     struct monitor_binding *b;
     struct lmp_chan *lc;
-    struct monitor_bind_lmp_reply_monitor__args args;
+    struct monitor_bind_lmp_reply_monitor__tx_args args;
     struct event_queue_node qnode;
 };
 
index 102468b..59441ce 100644 (lib/barrelfish/lmp_endpoints.c)
@@ -90,7 +90,7 @@ void lmp_endpoint_free(struct lmp_endpoint *ep)
  * \param dest    Location of empty slot in which to create endpoint
  * \param retep   Double pointer to LMP endpoint, filled-in with allocated EP
  *
- * This function mints into the given slot an endpoint capability to the 
+ * This function mints into the given slot an endpoint capability to the
  * current dispatcher.
  */
 errval_t lmp_endpoint_create_in_slot(size_t buflen, struct capref dest,
@@ -318,7 +318,7 @@ errval_t lmp_endpoint_deregister(struct lmp_endpoint *ep)
     dispatcher_handle_t handle = disp_disable();
     struct dispatcher_generic *dp = get_dispatcher_generic(handle);
 
-    errval_t err = waitset_chan_deregister_disabled(&ep->waitset_state);
+    errval_t err = waitset_chan_deregister_disabled(&ep->waitset_state, handle);
     if (err_is_ok(err)) {
         /* dequeue from poll list */
         if (ep->next == ep) {
index 428fc6a..7b90382 100644 (lib/barrelfish/multihop_chan.c)
@@ -243,7 +243,7 @@ static void multihop_new_monitor_binding_continuation2(void *st, errval_t err,
 struct bind_multihop_reply_state {
     struct multihop_chan *mc;
     struct monitor_binding *monitor_binding;
-    struct monitor_multihop_bind_service_reply__args args;
+    struct monitor_multihop_bind_service_reply__tx_args args;
     struct event_queue_node qnode;
 };
 
index c4658a2..203e6e5 100644 (lib/barrelfish/threads.c)
@@ -251,6 +251,11 @@ static void thread_init(dispatcher_handle_t disp, struct thread *newthread)
     newthread->used_fpu = false;
     newthread->paused = false;
     newthread->slab = NULL;
+    newthread->token = 0;
+    newthread->token_number = 1;
+
+    newthread->rpc_in_progress = false;
+    newthread->async_error = SYS_ERR_OK;
 }
 
 /**
@@ -602,6 +607,70 @@ void thread_set_id(uintptr_t id)
     me->id = id;
 }
 
+uint32_t thread_set_token(struct waitset_chanstate *channel)
+{
+    struct thread *me = thread_self();
+    // generate new token
+    uint32_t outgoing_token = (uint32_t)((me->id << 16) |
+         (me->coreid << 24) | ((me->token_number & 255) << 8)) | 1;
+    assert(me->token == 0);
+    me->token_number++;
+    me->token = outgoing_token & ~1;    // wait for this token
+    me->channel = channel;              // on that channel
+    return outgoing_token;
+}
+
+void thread_clear_token(struct waitset_chanstate *channel)
+{
+    struct thread *me = thread_self();
+
+    me->token = 0;      // don't wait anymore
+    me->channel = NULL;
+}
+
+uint32_t thread_current_token(void)
+{
+    return thread_self()->token;
+}
+
+void thread_set_outgoing_token(uint32_t token)
+{
+    struct thread *me = thread_self();
+
+    assert(!me->outgoing_token);
+    me->outgoing_token = token;
+}
+
+void thread_get_outgoing_token(uint32_t *token)
+{
+    struct thread *me = thread_self();
+    // if thread's outgoing token is set, get it
+    if (me->outgoing_token) {
+        *token = me->outgoing_token;
+        me->outgoing_token = 0;
+    }
+}
+
+void thread_set_rpc_in_progress(bool v)
+{
+    thread_self()->rpc_in_progress = v;
+}
+
+bool thread_get_rpc_in_progress(void)
+{
+    return thread_self()->rpc_in_progress;
+}
+
+void thread_set_async_error(errval_t e)
+{
+    thread_self()->async_error = e;
+}
+
+errval_t thread_get_async_error(void)
+{
+    return thread_self()->async_error;
+}
+
 /**
  * \brief Yield the calling thread
  *
@@ -629,6 +698,8 @@ void thread_yield(void)
         }
     } while(next->yield_epoch == disp_gen->timeslice);
 
+    poll_channels_disabled(handle);
+
     if (next != me) {
         fpu_context_switch(disp_gen, next);
         disp_gen->current = next;
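
thread_set_token() packs the thread id, core id, and a per-thread sequence number into a 32-bit token whose low bit marks a request; the caller then waits for the same token with the request bit cleared (the reply). A standalone sketch of that layout (make_token reproduces the expression above for illustration):

    #include <assert.h>
    #include <stdint.h>

    /* Token layout built by thread_set_token():
     *   bit 0      request flag (set on the outgoing request)
     *   bits 8-15  per-thread sequence number
     *   bits 16+   thread id, with the core id OR-ed into bits 24-31.
     * The waiting thread matches on token & ~1, i.e. the reply must
     * echo the request token with the request bit cleared. */
    static uint32_t make_token(uintptr_t id, uint8_t coreid, uint32_t seq)
    {
        return (uint32_t)((id << 16) | ((uintptr_t)coreid << 24) |
                          ((seq & 255) << 8)) | 1;
    }

    int main(void)
    {
        uint32_t req = make_token(3, 1, 7);
        assert(req & 1);                      // requests have the low bit set
        uint32_t reply = req & ~1u;           // what the caller waits for
        assert(((reply & 0xff00) >> 8) == 7); // sequence number survives
        return 0;
    }
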
index ef748be..6bda739 100644 (lib/barrelfish/ump_chan.c)
@@ -93,7 +93,7 @@ static void bind_ump_reply_handler(struct monitor_binding *b, uintptr_t mon_id,
 struct bind_ump_reply_state {
     struct monitor_binding *b;
     struct ump_chan *uc;
-    struct monitor_bind_ump_reply_monitor__args args;
+    struct monitor_bind_ump_reply_monitor__tx_args args;
     struct event_queue_node qnode;
 };
 
@@ -105,7 +105,7 @@ static void send_bind_reply(void *arg)
 
     // send back a bind success/failure message to the monitor
     err =
-        st->b->tx_vtbl.bind_ump_reply_monitor(st->b, NOP_CONT, st->args.mon_id,
+        st->b->tx_vtbl.bind_ump_reply_monitor(b, NOP_CONT, st->args.mon_id,
                                               st->args.conn_id, st->args.err,
                                               st->args.notify);
     if (err_is_ok(err)) {
@@ -242,7 +242,7 @@ errval_t ump_chan_bind(struct ump_chan *uc, struct ump_bind_continuation cont,
     void *buf;
     err = vspace_map_one_frame_attr(&buf, framesize, uc->frame, UMP_MAP_ATTR,
                                     NULL, &uc->vregion);
-    if (err_is_fail(err)) { 
+    if (err_is_fail(err)) {
         cap_destroy(uc->frame);
         return err_push(err, LIB_ERR_VSPACE_MAP);
     }
index 76869c1..cdf694d 100644 (lib/barrelfish/ump_endpoint.c)
@@ -63,7 +63,7 @@ errval_t ump_endpoint_register(struct ump_endpoint *ep, struct waitset *ws,
     assert(ep != NULL);
     assert(ws != NULL);
 
-    if (ump_endpoint_can_recv(ep)) { // trigger event immediately
+    if (ump_endpoint_poll(&ep->waitset_state)) { // trigger event immediately
         return waitset_chan_trigger_closure(ws, &ep->waitset_state, closure);
     } else {
         return waitset_chan_register_polled(ws, &ep->waitset_state, closure);
index 4cbda73..c676029 100644 (lib/barrelfish/waitset.c)
 #include <stdio.h>
 #include <string.h>
 
+#include <flounder/flounder.h>
+
 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
 #  include <barrelfish/ump_endpoint.h>
 #endif
 
-#if defined(__k1om__) || defined(__aarch64__)
-#include <barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+/// Dequeue a chanstate from a queue
+static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
 {
-    return rdtsc();
+    if (chan->next == chan) {
+        assert(chan->prev == chan);
+        assert(*queue == chan);
+        *queue = NULL;
+    } else {
+        chan->prev->next = chan->next;
+        chan->next->prev = chan->prev;
+        if (*queue == chan) {
+            *queue = chan->next;
+        }
+    }
+    chan->prev = chan->next = NULL;
 }
-#elif defined(__x86_64__) || defined(__i386__)
-#include <arch/x86/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Enqueue a chanstate on a queue
+static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
 {
-    return rdtsc();
+    if (*queue == NULL) {
+        *queue = chan;
+        chan->next = chan->prev = chan;
+    } else {
+        chan->next = *queue;
+        chan->prev = (*queue)->prev;
+        chan->next->prev = chan;
+        chan->prev->next = chan;
+    }
 }
-#elif defined(__arm__) && defined(__gem5__)
-/**
- * XXX: Gem5 doesn't support the ARM performance monitor extension
- * therefore we just poll a fixed number of times instead of using
- * cycle counts. POLL_COUNT is deliberately set to 42, guess why! ;)
- */
-#define POLL_COUNT     42
-#elif defined(__aarch64__) && defined(__gem5__)
-#define POLL_COUNT  42
-#elif defined(__arm__)
-#include <arch/arm/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Dequeue a chanstate from polled queue
+static void dequeue_polled(struct waitset_chanstate **queue,
+                            struct waitset_chanstate *chan)
 {
-    return get_cycle_count();
+    if (chan->polled_next == chan) {
+        assert(chan->polled_prev == chan);
+        assert(*queue == chan);
+        *queue = NULL;
+    } else {
+        chan->polled_prev->polled_next = chan->polled_next;
+        chan->polled_next->polled_prev = chan->polled_prev;
+        if (*queue == chan) {
+            *queue = chan->polled_next;
+        }
+    }
+    chan->polled_prev = chan->polled_next = NULL;
 }
-#else
-static inline cycles_t cyclecount(void)
+
+/// Enqueue a chanstate on polled queue
+static void enqueue_polled(struct waitset_chanstate **queue,
+                            struct waitset_chanstate *chan)
 {
-    USER_PANIC("called on non-x86 architecture. why are we polling?");
-    return 0;
+    if (*queue == NULL) {
+        *queue = chan;
+        chan->polled_next = chan->polled_prev = chan;
+    } else {
+        chan->polled_next = *queue;
+        chan->polled_prev = (*queue)->polled_prev;
+        chan->polled_next->polled_prev = chan;
+        chan->polled_prev->polled_next = chan;
+    }
 }
-#endif
-
-// FIXME: bogus default value. need to measure this at boot time
-#define WAITSET_POLL_CYCLES_DEFAULT 2000
-
-/// Maximum number of cycles to spend polling channels before yielding CPU
-cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
 
 /**
  * \brief Initialise a new waitset
@@ -79,9 +104,8 @@ cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
 void waitset_init(struct waitset *ws)
 {
     assert(ws != NULL);
-    ws->pending = ws->polled = ws->idle = NULL;
+    ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
     ws->waiting_threads = NULL;
-    ws->polling = false;
 }
 
 /**
@@ -127,55 +151,61 @@ errval_t waitset_destroy(struct waitset *ws)
     return SYS_ERR_OK;
 }
 
-/// Returns a channel with a pending event on the given waitset, or NULL
-static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws)
+/// Check if a thread can receive an event
+static bool waitset_check_token(struct waitset_chanstate *chan,
+                                struct thread *thread)
 {
-    // are there any pending events on the waitset?
-    if (ws->pending == NULL) {
-        return NULL;
-    }
-
-    // dequeue next pending event
-    struct waitset_chanstate *chan = ws->pending;
-    if (chan->next == chan) {
-        assert_disabled(chan->prev == chan);
-        ws->pending = NULL;
-    } else {
-        ws->pending = chan->next;
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-    }
-#ifndef NDEBUG
-    chan->prev = chan->next = NULL;
-#endif
-
-    // mark not pending
-    assert_disabled(chan->state == CHAN_PENDING);
-    chan->state = CHAN_UNREGISTERED;
-    chan->waitset = NULL;
+    bool res = false;
 
-    return chan;
+    if (chan->wait_for) // if a thread is waiting for this specific event
+        res = chan->wait_for == thread;
+    else
+        res = (chan->token & 1 && !thread->token) // incoming token is a request
+            // and a thread is not waiting for a token
+            || (!chan->token && chan != thread->channel) // there's no token
+            // and a thread is not waiting specifically for that event
+            || (chan->token == thread->token && chan == thread->channel);
+            // there is a token and it matches thread's token and event
+    return res;
 }
 
-#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
-/**
- * \brief Poll an incoming UMP endpoint.
- * This is logically part of the UMP endpoint implementation, but placed here
- * for easier inlining.
- */
-static inline void ump_endpoint_poll(struct waitset_chanstate *chan)
+/// Returns a channel with a pending event on the given waitset matching
+/// our thread
+static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
+                                    struct waitset_chanstate *chan)
 {
-    /* XXX: calculate location of endpoint from waitset channel state */
-    struct ump_endpoint *ep = (struct ump_endpoint *)
-        ((char *)chan - offsetof(struct ump_endpoint, waitset_state));
+    struct thread *me = thread_self();
 
-    if (ump_endpoint_can_recv(ep)) {
-        errval_t err = waitset_chan_trigger(chan);
-        assert(err_is_ok(err)); // should not be able to fail
+    if (chan) { // the specific channel we are waiting for
+        if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
+            return chan;
+        }
+        if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
+            return chan;
+        }
     }
+    // check the waiting queue for a matching event
+    for (chan = ws->waiting; chan; ) {
+        if (waitset_check_token(chan, me)) {
+            assert_disabled(chan->state == CHAN_WAITING);
+            return chan;
+        }
+        chan = chan->next;
+        if (chan == ws->waiting)
+            break;
+    }
+    // check the pending queue for a matching event
+    for (chan = ws->pending; chan;) {
+        if (waitset_check_token(chan, me)) {
+            assert_disabled(chan->state == CHAN_PENDING);
+            return chan;
+        }
+        chan = chan->next;
+        if (chan == ws->pending)
+            break;
+    }
+    return NULL;
 }
-#endif // CONFIG_INTERCONNECT_DRIVER_UMP
-
 
 void arranet_polling_loop_proxy(void) __attribute__((weak));
 void arranet_polling_loop_proxy(void)
@@ -183,206 +213,162 @@ void arranet_polling_loop_proxy(void)
     USER_PANIC("Network polling not available without Arranet!\n");
 }
 
-/// Helper function that knows how to poll the given channel, based on its type
-static void poll_channel(struct waitset_chanstate *chan)
-{
-    switch (chan->chantype) {
+/// Check polled channels
+void poll_channels_disabled(dispatcher_handle_t handle) {
+    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
+    struct waitset_chanstate *chan;
+
+    if (!dp->polled_channels)
+        return;
+    chan = dp->polled_channels;
+    do {
+        switch (chan->chantype) {
 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
-    case CHANTYPE_UMP_IN:
-        ump_endpoint_poll(chan);
-        break;
+        case CHANTYPE_UMP_IN: {
+            if (ump_endpoint_poll(chan)) {
+                errval_t err = waitset_chan_trigger_disabled(chan, handle);
+                assert(err_is_ok(err)); // should not fail
+                if (!dp->polled_channels) // restart scan
+                    return;
+                chan = dp->polled_channels;
+                continue;
+            } else
+                chan = chan->polled_next;
+        } break;
 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
+        case CHANTYPE_LWIP_SOCKET:
+            arranet_polling_loop_proxy();
+            break;
 
-    case CHANTYPE_LWIP_SOCKET:
-        arranet_polling_loop_proxy();
-        break;
-
-    default:
-        assert(!"invalid channel type to poll!");
-    }
+        default:
+            assert(!"invalid channel type to poll!");
+        }
+    } while (chan != dp->polled_channels);
 }
 
-// pollcycles_*: arch-specific implementation for polling.
-//               Used by get_next_event().
-//
-//   pollcycles_reset()  -- return the number of pollcycles we want to poll for
-//   pollcycles_update() -- update the pollcycles variable. This is needed for
-//                          implementations where we don't have a cycle counter
-//                          and we just count the number of polling operations
-//                          performed
-//   pollcycles_expired() -- check if pollcycles have expired
-//
-// We might want to move them to architecture-specific files, and/or create a
-// cleaner interface. For now, I just wanted to keep them out of
-// get_next_event()
-
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_reset(void)
+/// Re-register a channel (if persistent)
+static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
+                                dispatcher_handle_t handle)
 {
-    cycles_t pollcycles;
-#if defined(__arm__) && !defined(__gem5__)
-    reset_cycle_counter();
-    pollcycles = waitset_poll_cycles;
-#elif defined(__arm__) && defined(__gem5__)
-    pollcycles = 0;
-#elif defined(__aarch64__) && defined(__gem5__)
-    pollcycles = 0;
-#else
-    pollcycles = cyclecount() + waitset_poll_cycles;
-#endif
-    return pollcycles;
-}
+    assert(chan->waitset == ws);
+    if (chan->state == CHAN_PENDING) {
+        dequeue(&ws->pending, chan);
+    } else {
+        assert(chan->state == CHAN_WAITING);
+        dequeue(&ws->waiting, chan);
+    }
 
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_update(cycles_t pollcycles)
-{
-    cycles_t ret = pollcycles;
-    #if defined(__arm__) && defined(__gem5__)
-    ret++;
-       #elif defined(__aarch64__) && defined(__gem5__)
-       ret++;
-    #endif
-    return ret;
+    chan->token = 0;
+    if (chan->chantype == CHANTYPE_UMP_IN) {
+        enqueue(&ws->polled, chan);
+        enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+        chan->state = CHAN_POLLED;
+    } else {
+        enqueue(&ws->idle, chan);
+        chan->state = CHAN_IDLE;
+    }
 }
 
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
-       && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-bool pollcycles_expired(cycles_t pollcycles)
+/// Find a thread that is able to receive an event
+static struct thread * find_recipient(struct waitset *ws,
+                        struct waitset_chanstate *channel, struct thread *me)
 {
-    bool ret;
-    #if defined(__arm__) && !defined(__gem5__)
-    ret = (cyclecount() > pollcycles || is_cycle_counter_overflow());
-    #elif defined(__arm__) && defined(__gem5__)
-    ret = pollcycles >= POLL_COUNT;
-    #elif defined(__aarch64__) && defined(__gem5__)
-    ret = pollcycles >= POLL_COUNT;
-    #else
-    ret = cyclecount() > pollcycles;
-    #endif
-    return ret;
+    struct thread *t = ws->waiting_threads;
+
+    if (!t)
+        return NULL;
+    do {
+        if (waitset_check_token(channel, t))
+            return t;
+        t = t->next;
+    } while (t != ws->waiting_threads);
+    return ws->waiting_threads;
 }
 
-static errval_t get_next_event_debug(struct waitset *ws,
-        struct event_closure *retclosure, bool debug)
+/// Wake up other thread if there's more pending events
+static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
 {
-    struct waitset_chanstate *chan;
-    bool was_polling = false;
-    cycles_t pollcycles;
-
-    assert(ws != NULL);
-    assert(retclosure != NULL);
-
-    // unconditionally disable ourselves and check for events
-    // if we decide we have to start polling, we'll jump back up here
-    goto check_for_events;
-
-    /* ------------ POLLING LOOP; RUNS WHILE ENABLED ------------ */
-polling_loop:
-    was_polling = true;
-    assert(ws->polling); // this thread is polling
-    // get the amount of cycles we want to poll for
-    pollcycles = pollcycles_reset();
-
-    // while there are no pending events, poll channels
-    while (ws->polled != NULL && ws->pending == NULL) {
-        struct waitset_chanstate *nextchan = NULL;
-        // NB: Polling policy is to return as soon as a pending event
-        // appears, not bother looking at the rest of the polling queue
-        for (chan = ws->polled;
-             chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED
-                 && ws->pending == NULL;
-             chan = nextchan) {
-
-            nextchan = chan->next;
-            poll_channel(chan);
-            // update pollcycles
-            pollcycles = pollcycles_update(pollcycles);
-            // yield the thread if we exceed the cycle count limit
-            if (ws->pending == NULL && pollcycles_expired(pollcycles)) {
-                if (debug) {
-                if (strcmp(disp_name(), "netd") != 0) {
-                    // Print the callback trace so that we know which call is leading
-                    // the schedule removal and
-                    printf("%s: callstack: %p %p %p %p\n", disp_name(),
-                            __builtin_return_address(0),
-                            __builtin_return_address(1),
-                            __builtin_return_address(2),
-                            __builtin_return_address(3));
-                }
-
-                }
-                thread_yield();
-                pollcycles = pollcycles_reset();
-            }
-        }
+    if (ws->pending && ws->waiting_threads) {
+        struct thread *t;
 
-        // ensure that we restart polling from the place we left off here,
-        // if the next channel is a valid one
-        if (nextchan != NULL && nextchan->waitset == ws
-            && nextchan->state == CHAN_POLLED) {
-            ws->polled = nextchan;
-        }
+        t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
+        assert_disabled(t == NULL); // shouldn't see a remote thread
     }
+}
 
-    /* ------------ STATE MACHINERY; RUNS WHILE DISABLED ------------ */
-check_for_events: ;
-    dispatcher_handle_t handle = disp_disable();
+/**
+ * \brief Get next pending event
+ *
+ * Check if there is a pending event that matches the current thread and
+ * return it. Pending events live on the pending queue and the waiting queue.
+ * A matching event is removed from its queue and becomes unregistered or,
+ * if it is persistent, is re-registered on the waitset's idle queue or
+ * polled queue (for UMP channels).
+ * If there is no pending event, block this thread.
+ * If there is a pending event that doesn't match our thread, leave it
+ * queued and wake up a matching thread instead.
+ * If no matching thread exists, move the event to the waiting queue.
+ *
+ * \param ws Waitset with sources of events
+ * \param retchannel Holder of returned event
+ * \param retclosure Holder of returned closure
+ * \param waitfor Specific event that we're waiting for (can be NULL)
+ * \param handle Dispatcher's handle
+ * \param debug Debug mode (not used)
+ */
 
-    // are there any pending events on the waitset?
-    chan = get_pending_event_disabled(ws);
-    if (chan != NULL) {
-        // if we need to poll, and we have a blocked thread, wake it up to do so
-        if (was_polling && ws->polled != NULL && ws->waiting_threads != NULL) {
-            // start a blocked thread polling
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert_disabled(t == NULL); // shouldn't see a remote thread
-        } else if (was_polling) {
-            // I'm stopping polling, and there is nobody else
-            assert_disabled(ws->polling);
-            ws->polling = false;
+errval_t get_next_event_disabled(struct waitset *ws,
+    struct waitset_chanstate **retchannel, struct event_closure *retclosure,
+    struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
+{
+    struct waitset_chanstate * chan;
+
+    for (;;) {
+        chan = get_pending_event_disabled(ws, waitfor); // get our event
+        if (chan) {
+            *retchannel = chan;
+            *retclosure = chan->closure;
+            chan->wait_for = NULL;
+            chan->token = 0;
+            if (chan->persistent)
+                reregister_channel(ws, chan, handle);
+            else
+                waitset_chan_deregister_disabled(chan, handle);
+            wake_up_other_thread(handle, ws);
+            return SYS_ERR_OK;
         }
-        disp_enable(handle);
-
-        *retclosure = chan->closure;
-        return SYS_ERR_OK;
-    }
-
-    // If we got here and there are channels to poll but no-one is polling,
-    // then either we never polled, or we lost a race on the channel we picked.
-    // Either way, we'd better start polling again.
-    if (ws->polled != NULL && (was_polling || !ws->polling)) {
-        if (!was_polling) {
-            ws->polling = true;
+        chan = ws->pending; // peek at the pending queue
+        if (!chan) { // nothing pending, so block
+            thread_block_disabled(handle, &ws->waiting_threads);
+            disp_disable();
+        } else { // an event is pending, but it's not ours
+            if (!ws->waiting_threads) { // no other thread is interested
+                dequeue(&ws->pending, chan);
+                enqueue(&ws->waiting, chan);
+                chan->state = CHAN_WAITING;
+                chan->waitset = ws;
+            } else {
+                // find a matching thread
+                struct thread *t;
+                for (t = ws->waiting_threads; t; ) {
+                    if (waitset_check_token(chan, t)) { // match found, wake it
+                        ws->waiting_threads = t;
+                        t = thread_unblock_one_disabled(handle,
+                                                    &ws->waiting_threads, chan);
+                        assert_disabled(t == NULL); // shouldn't see a remote thread
+                        break;
+                    }
+                    t = t->next;
+                    if (t == ws->waiting_threads) { // no recipient found
+                        dequeue(&ws->pending, chan);
+                        enqueue(&ws->waiting, chan);
+                        chan->state = CHAN_WAITING;
+                        chan->waitset = ws;
+                        break;
+                    }
+                }
+            }
         }
-        disp_enable(handle);
-        goto polling_loop;
-    }
-
-    // otherwise block awaiting an event
-    chan = thread_block_disabled(handle, &ws->waiting_threads);
-
-    if (chan == NULL) {
-        // not a real event, just a wakeup to get us to start polling!
-        assert(ws->polling);
-        goto polling_loop;
-    } else {
-        *retclosure = chan->closure;
-        return SYS_ERR_OK;
     }
 }
 
@@ -398,58 +384,47 @@ check_for_events: ;
  */
 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
 {
-    return get_next_event_debug(ws, retclosure, false);
+    dispatcher_handle_t handle = disp_disable();
+    struct waitset_chanstate *channel;
+    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
+                                            handle, false);
+    disp_enable(handle);
+    return err;
 }
 
 
 
 /**
- * \brief Return next event on given waitset, if one is already pending
+ * \brief Check if there is an event pending on given waitset
  *
  * This is essentially a non-blocking variant of get_next_event(). It should be
  * used with great care, to avoid the creation of busy-waiting loops.
  *
  * \param ws Waitset
- * \param retclosure Pointer to storage space for returned event closure
  *
  * \returns LIB_ERR_NO_EVENT if nothing is pending
  */
-errval_t check_for_event(struct waitset *ws, struct event_closure *retclosure)
+static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
 {
     struct waitset_chanstate *chan;
-    int pollcount = 0;
 
-    assert(ws != NULL);
-    assert(retclosure != NULL);
-
- recheck: ;
-    // are there any pending events on the waitset?
-    dispatcher_handle_t handle = disp_disable();
-    chan = get_pending_event_disabled(ws);
-    disp_enable(handle);
+    poll_channels_disabled(handle);
+    chan = get_pending_event_disabled(ws, NULL);
     if (chan != NULL) {
-        *retclosure = chan->closure;
         return SYS_ERR_OK;
     }
+    return LIB_ERR_NO_EVENT;
+}
 
-    // if there are no pending events, poll all channels once
-    if (ws->polled != NULL && pollcount++ == 0) {
-        for (chan = ws->polled;
-             chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED;
-             chan = chan->next) {
-
-            poll_channel(chan);
-            if (ws->pending != NULL) {
-                goto recheck;
-            }
-
-            if (chan->next == ws->polled) { // reached the start of the queue
-                break;
-            }
-        }
-    }
+errval_t check_for_event(struct waitset *ws)
+{
+    errval_t err;
 
-    return LIB_ERR_NO_EVENT;
+    assert(ws != NULL);
+    dispatcher_handle_t handle = disp_disable();
+    err = check_for_event_disabled(ws, handle);
+    disp_enable(handle);
+    return err;
 }
 
 /**
@@ -477,20 +452,59 @@ errval_t event_dispatch(struct waitset *ws)
 errval_t event_dispatch_debug(struct waitset *ws)
 {
     struct event_closure closure;
-    errval_t err = get_next_event_debug(ws, &closure, false);
+    struct waitset_chanstate *channel;
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+                                            true);
+    disp_enable(handle);
     if (err_is_fail(err)) {
         return err;
     }
 
     assert(closure.handler != NULL);
-//    printf("%s: event_dispatch: %p: \n", disp_name(), closure.handler);
-
-
     closure.handler(closure.arg);
     return SYS_ERR_OK;
 }
 
 /**
+ * \brief Dispatch events until a specific event is received
+ *
+ * Wait for events and dispatch them. If a specific event comes, don't call
+ * a closure, just return.
+ *
+ * \param ws Waitset
+ * \param waitfor Event, that we are waiting for
+ * \param error_var Error variable that can be changed by closures
+ */
+
+errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
+                            errval_t *error_var)
+{
+    assert(waitfor->waitset == ws);
+    for (;;) {
+        struct event_closure closure;
+        struct waitset_chanstate *channel;
+
+        dispatcher_handle_t handle = disp_disable();
+        errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
+                                                handle, false);
+        disp_enable(handle);
+        if (err_is_fail(err)) {
+            // get_next_event_disabled() is not expected to fail here
+            assert(0);
+            return err;
+        }
+        if (channel == waitfor) {
+            return SYS_ERR_OK;
+        }
+        assert(!channel->wait_for);
+        assert(closure.handler != NULL);
+        closure.handler(closure.arg);
+        if (err_is_fail(*error_var)) {
+            return *error_var;
+        }
+    }
+}
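+
+/* Usage sketch (hypothetical caller): block on one specific channel while
+ * still dispatching unrelated events. `chan` must already be registered on
+ * `ws` (the function asserts this).
+ *
+ *   errval_t err_var = SYS_ERR_OK;
+ *   errval_t err = wait_for_channel(ws, chan, &err_var);
+ *   // err is SYS_ERR_OK once `chan` itself triggers, or the failure a
+ *   // dispatched closure stored in err_var
+ */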
+
+/**
  * \brief check and dispatch next event on given waitset
  *
  * Check if there is any pending activity on some channel, or deferred
@@ -502,14 +516,23 @@ errval_t event_dispatch_debug(struct waitset *ws)
  */
 errval_t event_dispatch_non_block(struct waitset *ws)
 {
-    assert(ws != NULL);
+    struct waitset_chanstate *channel;
     struct event_closure closure;
-    errval_t err = check_for_event(ws, &closure);
 
+    assert(ws != NULL);
+
+    // are there any pending events on the waitset?
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = check_for_event_disabled(ws, handle);
     if (err_is_fail(err)) {
+        disp_enable(handle);
         return err;
     }
-
+    err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+                                  false);
+    disp_enable(handle);
+    if (err_is_fail(err)) {
+        return err;
+    }
     assert(closure.handler != NULL);
     closure.handler(closure.arg);
     return SYS_ERR_OK;
@@ -537,6 +560,9 @@ void waitset_chanstate_init(struct waitset_chanstate *chan,
 #ifndef NDEBUG
     chan->prev = chan->next = NULL;
 #endif
+    chan->persistent = false;
+    chan->token = 0;
+    chan->wait_for = NULL;
 }
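+
+/* Roles of the new fields, as far as this diff shows: `token` tags an IPC
+ * message so a reply can be matched to the request that caused it;
+ * `wait_for` links a channel to a thread blocked in wait_for_channel();
+ * `persistent` presumably keeps the registration alive across triggers
+ * instead of deregistering the channel once it fires. */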
 
 /**
@@ -573,6 +599,7 @@ errval_t waitset_chan_register_disabled(struct waitset *ws,
     }
 
     chan->waitset = ws;
+    chan->token = 0;
 
     // channel must not already be registered!
     assert_disabled(chan->next == NULL && chan->prev == NULL);
@@ -585,15 +612,7 @@ errval_t waitset_chan_register_disabled(struct waitset *ws,
     chan->closure = closure;
 
     // enqueue this channel on the waitset's queue of idle channels
-    if (ws->idle == NULL) {
-        chan->next = chan->prev = chan;
-        ws->idle = chan;
-    } else {
-        chan->next = ws->idle;
-        chan->prev = chan->next->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
+    enqueue(&ws->idle, chan);
     chan->state = CHAN_IDLE;
 
     return SYS_ERR_OK;
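+
+/* enqueue()/dequeue() are assumed to be static circular-list helpers
+ * defined elsewhere in waitset.c (not visible in this excerpt), replacing
+ * the open-coded manipulation deleted above. A minimal sketch of the
+ * insert, mirroring the removed code:
+ *
+ *   static void enqueue(struct waitset_chanstate **queue,
+ *                       struct waitset_chanstate *chan)
+ *   {
+ *       if (*queue == NULL) {
+ *           *queue = chan;                    // empty: chan becomes head
+ *           chan->next = chan->prev = chan;   // and links to itself
+ *       } else {
+ *           chan->next = *queue;              // insert before the head,
+ *           chan->prev = (*queue)->prev;      // i.e. at the tail
+ *           chan->next->prev = chan;
+ *           chan->prev->next = chan;
+ *       }
+ *   }
+ */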
@@ -622,6 +641,7 @@ errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
     }
 
     chan->waitset = ws;
+    chan->token = 0;
 
     // channel must not already be registered!
     assert_disabled(chan->next == NULL && chan->prev == NULL);
@@ -631,23 +651,9 @@ errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
     chan->closure = closure;
 
     // enqueue this channel on the waitset's queue of polled channels
-    if (ws->polled == NULL) {
-        chan->next = chan->prev = chan;
-        ws->polled = chan;
-        if (ws->waiting_threads != NULL && !ws->polling) {
-            // start a blocked thread polling
-            ws->polling = true;
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert_disabled(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
-        }
-    } else {
-        chan->next = ws->polled;
-        chan->prev = chan->next->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
+    enqueue(&ws->polled, chan);
     chan->state = CHAN_POLLED;
+    enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
 
     return SYS_ERR_OK;
 }
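+
+/* A polled channel is now tracked twice: on the waitset's own `polled`
+ * queue and, via enqueue_polled(), on the dispatcher-wide `polled_channels`
+ * list, which poll_channels_disabled() presumably walks so a single pass
+ * can poll every polled channel of the dispatcher, regardless of waitset. */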
@@ -697,120 +703,6 @@ errval_t waitset_chan_register_polled(struct waitset *ws,
 }
 
 /**
- * \brief Mark an idle channel as polled
- *
- * The given channel will periodically have its poll function called.
- * The channel must already be registered.
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_start_polling(struct waitset_chanstate *chan)
-{
-    errval_t err = SYS_ERR_OK;
-
-    dispatcher_handle_t handle = disp_disable();
-
-    struct waitset *ws = chan->waitset;
-    if (ws == NULL) {
-        err = LIB_ERR_CHAN_NOT_REGISTERED;
-        goto out;
-    }
-
-    assert(chan->state != CHAN_UNREGISTERED);
-    if (chan->state != CHAN_IDLE) {
-        goto out; // no-op if polled or pending
-    }
-
-    // remove from idle queue
-    if (chan->next == chan) {
-        assert(chan->prev == chan);
-        assert(ws->idle == chan);
-        ws->idle = NULL;
-    } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (ws->idle == chan) {
-            ws->idle = chan->next;
-        }
-    }
-
-    // enqueue on polled queue
-    if (ws->polled == NULL) {
-        ws->polled = chan;
-        chan->next = chan->prev = chan;
-        if (ws->waiting_threads != NULL && !ws->polling) {
-            // start a blocked thread polling
-            ws->polling = true;
-            struct thread *t;
-            t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
-            assert(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
-        }
-    } else {
-        chan->next = ws->polled;
-        chan->prev = ws->polled->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
-    chan->state = CHAN_POLLED;
-
-out:
-    disp_enable(handle);
-    return err;
-}
-
-/**
- * \brief Stop polling the given channel, making it idle again
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_stop_polling(struct waitset_chanstate *chan)
-{
-    errval_t err = SYS_ERR_OK;
-
-    dispatcher_handle_t handle = disp_disable();
-
-    struct waitset *ws = chan->waitset;
-    if (ws == NULL) {
-        err = LIB_ERR_CHAN_NOT_REGISTERED;
-        goto out;
-    }
-
-    assert(chan->state != CHAN_UNREGISTERED);
-    if (chan->state != CHAN_POLLED) {
-        goto out; // no-op if idle or pending
-    }
-
-    // remove from polled queue
-    if (chan->next == chan) {
-        assert(chan->prev == chan);
-        assert(ws->polled == chan);
-        ws->polled = NULL;
-    } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (ws->polled == chan) {
-            ws->polled = chan->next;
-        }
-    }
-
-    // enqueue on idle queue
-    if (ws->idle == NULL) {
-        ws->idle = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->idle;
-        chan->prev = ws->idle->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
-    }
-    chan->state = CHAN_IDLE;
-
-out:
-    disp_enable(handle);
-    return err;
-}
-
-/**
  * \brief Cancel a previous callback registration
  *
  * Remove the registration for a callback on the given channel.
@@ -818,7 +710,8 @@ out:
  *
  * \param chan Waitset's per-channel state
  */
-errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
+errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
+                                          dispatcher_handle_t handle)
 {
     assert_disabled(chan != NULL);
     struct waitset *ws = chan->waitset;
@@ -830,61 +723,29 @@ errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
     chan->waitset = NULL;
     assert_disabled(chan->next != NULL && chan->prev != NULL);
 
-    if (chan->next == chan) {
-        // only thing in the list: must be the head
-        assert_disabled(chan->prev == chan);
-        switch (chan->state) {
-        case CHAN_IDLE:
-            assert_disabled(chan == ws->idle);
-            ws->idle = NULL;
-            break;
-
-        case CHAN_POLLED:
-            assert_disabled(chan == ws->polled);
-            ws->polled = NULL;
-            break;
-
-        case CHAN_PENDING:
-            assert_disabled(chan == ws->pending);
-            ws->pending = NULL;
-            break;
+    switch (chan->state) {
+    case CHAN_IDLE:
+        dequeue(&ws->idle, chan);
+        break;
 
-        default:
-            assert_disabled(!"invalid channel state in deregister");
-        }
-    } else {
-        assert_disabled(chan->prev != chan);
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        switch (chan->state) {
-        case CHAN_IDLE:
-            if (chan == ws->idle) {
-                ws->idle = chan->next;
-            }
-            break;
+    case CHAN_POLLED:
+        dequeue(&ws->polled, chan);
+        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+        break;
 
-        case CHAN_POLLED:
-            if (chan == ws->polled) {
-                ws->polled = chan->next;
-            }
-            break;
+    case CHAN_PENDING:
+        dequeue(&ws->pending, chan);
+        break;
 
-        case CHAN_PENDING:
-            if (chan == ws->pending) {
-                ws->pending = chan->next;
-            }
-            break;
+    case CHAN_WAITING:
+        dequeue(&ws->waiting, chan);
+        break;
 
-        default:
-            assert_disabled(!"invalid channel state in deregister");
-        }
+    default:
+        assert_disabled(!"invalid channel state in deregister");
     }
     chan->state = CHAN_UNREGISTERED;
-
-#ifndef NDEBUG
-    chan->prev = chan->next = NULL;
-#endif
-
+    chan->wait_for = NULL;
     return SYS_ERR_OK;
 }
 
@@ -899,7 +760,7 @@ errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
 {
     dispatcher_handle_t handle = disp_disable();
-    errval_t err = waitset_chan_deregister_disabled(chan);
+    errval_t err = waitset_chan_deregister_disabled(chan, handle);
     disp_enable(handle);
     return err;
 }
@@ -922,75 +783,23 @@ void waitset_chan_migrate(struct waitset_chanstate *chan,
 
     switch(chan->state) {
     case CHAN_IDLE:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->idle == chan);
-            ws->idle = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->idle == chan) {
-                ws->idle = chan->next;
-            }
-        }
-
-        if (new_ws->idle == NULL) {
-            new_ws->idle = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->idle;
-            chan->prev = new_ws->idle->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+        dequeue(&ws->idle, chan);
+        enqueue(&new_ws->idle, chan);
         break;
 
     case CHAN_POLLED:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->polled == chan);
-            ws->polled = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->polled == chan) {
-                ws->polled = chan->next;
-            }
-        }
-
-        if (new_ws->polled == NULL) {
-            new_ws->polled = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->polled;
-            chan->prev = new_ws->polled->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+        dequeue(&ws->polled, chan);
+        enqueue(&new_ws->polled, chan);
         break;
 
     case CHAN_PENDING:
-        if (chan->next == chan) {
-            assert(chan->prev == chan);
-            assert(ws->pending == chan);
-            ws->pending = NULL;
-        } else {
-            chan->prev->next = chan->next;
-            chan->next->prev = chan->prev;
-            if (ws->pending == chan) {
-                ws->pending = chan->next;
-            }
-        }
+        dequeue(&ws->pending, chan);
+        enqueue(&new_ws->pending, chan);
+        break;
 
-        if (new_ws->pending == NULL) {
-            new_ws->pending = chan;
-            chan->next = chan->prev = chan;
-        } else {
-            chan->next = new_ws->pending;
-            chan->prev = new_ws->pending->prev;
-            chan->next->prev = chan;
-            chan->prev->next = chan;
-        }
+    case CHAN_WAITING:
+        dequeue(&ws->waiting, chan);
+        enqueue(&new_ws->waiting, chan);
         break;
 
     case CHAN_UNREGISTERED:
@@ -1026,59 +835,26 @@ errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
     }
 
     // remove from previous queue (either idle or polled)
-    if (chan->next == chan) {
-        assert_disabled(chan->prev == chan);
-        if (chan->state == CHAN_IDLE) {
-            assert_disabled(ws->idle == chan);
-            ws->idle = NULL;
-        } else {
-            assert_disabled(chan->state == CHAN_POLLED);
-            assert_disabled(ws->polled == chan);
-            ws->polled = NULL;
-        }
+    if (chan->state == CHAN_IDLE) {
+        dequeue(&ws->idle, chan);
     } else {
-        chan->prev->next = chan->next;
-        chan->next->prev = chan->prev;
-        if (chan->state == CHAN_IDLE) {
-            if (ws->idle == chan) {
-                ws->idle = chan->next;
-            }
-        } else {
-            assert_disabled(chan->state == CHAN_POLLED);
-            if (ws->polled == chan) {
-                ws->polled = chan->next;
-            }
-        }
+        assert_disabled(chan->state == CHAN_POLLED);
+        dequeue(&ws->polled, chan);
+        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
     }
 
+    // mark channel pending and move to end of pending event queue
+    enqueue(&ws->pending, chan);
+    chan->state = CHAN_PENDING;
+
     // is there a thread blocked on this waitset? if so, awaken it with the event
-    if (ws->waiting_threads != NULL) {
-        chan->waitset = NULL;
-#ifndef NDEBUG
-        chan->prev = chan->next = NULL;
-#endif
-        chan->state = CHAN_UNREGISTERED;
+    struct thread *thread = find_recipient(ws, chan, thread_self());
+    if (thread) {
         struct thread *t;
+        ws->waiting_threads = thread;
         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
         assert_disabled(t == NULL);
-        return SYS_ERR_OK;
-    }
-
-    // else mark channel pending and move to end of pending event queue
-    chan->state = CHAN_PENDING;
-    if (ws->pending == NULL) {
-        ws->pending = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->pending;
-        chan->prev = ws->pending->prev;
-        assert_disabled(ws->pending->next != NULL);
-        assert_disabled(ws->pending->prev != NULL);
-        assert_disabled(chan->prev != NULL);
-        chan->next->prev = chan;
-        chan->prev->next = chan;
     }
-
     return SYS_ERR_OK;
 }
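+
+/* find_recipient() is not shown in this diff; from its uses it presumably
+ * selects the waiting thread that should receive this event (for example,
+ * one whose wait_for matches `chan`), preferring the current thread, and
+ * returns the new head for the waiting-thread queue, which is why
+ * ws->waiting_threads is reassigned before the unblock. */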
 
@@ -1094,9 +870,9 @@ errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
  */
 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
 {
-    dispatcher_handle_t disp = disp_disable();
-    errval_t err = waitset_chan_trigger_disabled(chan, disp);
-    disp_enable(disp);
+    dispatcher_handle_t handle = disp_disable();
+    errval_t err = waitset_chan_trigger_disabled(chan, handle);
+    disp_enable(handle);
     return err;
 }
 
@@ -1130,29 +906,21 @@ errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
     // set closure
     chan->closure = closure;
 
+    // mark channel pending and place on end of pending event queue
+    chan->waitset = ws;
+    enqueue(&ws->pending, chan);
+    chan->state = CHAN_PENDING;
+
     // is there a thread blocked on this waitset? if so, awaken it with the event
-    if (ws->waiting_threads != NULL) {
+    struct thread *thread = find_recipient(ws, chan, thread_self());
+    if (thread) {
         struct thread *t;
+        ws->waiting_threads = thread;
         t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
         assert_disabled(t == NULL);
-        return SYS_ERR_OK;
-    }
-
-    // mark channel pending and place on end of pending event queue
-    chan->waitset = ws;
-    chan->state = CHAN_PENDING;
-    if (ws->pending == NULL) {
-        ws->pending = chan;
-        chan->next = chan->prev = chan;
-    } else {
-        chan->next = ws->pending;
-        chan->prev = ws->pending->prev;
-        chan->next->prev = chan;
-        chan->prev->next = chan;
     }
-
-    assert(ws->pending->prev != NULL && ws->pending->next != NULL);
-
     return SYS_ERR_OK;
 }
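+
+/* Usage sketch (hypothetical caller): hand-deliver a one-off event. A
+ * non-disabled wrapper waitset_chan_trigger_closure() is assumed to exist,
+ * following the disp_disable()/disp_enable() wrapper pattern used above;
+ * my_handler/my_arg are placeholders.
+ *
+ *   static struct waitset_chanstate cs;
+ *   waitset_chanstate_init(&cs, CHANTYPE_OTHER); // chantype is a guess
+ *   errval_t err = waitset_chan_trigger_closure(ws, &cs,
+ *                      MKCLOSURE(my_handler, my_arg));
+ */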