mt-waitset: adding masking of channels so a thread won't handle two messages at the...
[barrelfish] / lib / barrelfish / waitset.c
index 69ec7bd..e6c0840 100644 (file)
@@ -151,21 +151,23 @@ errval_t waitset_destroy(struct waitset *ws)
     return SYS_ERR_OK;
 }
 
-/// Check if a thread can receive an event
-static bool waitset_check_token(struct waitset_chanstate *chan,
+/// Check if the thread can receive the event
+static bool waitset_can_receive(struct waitset_chanstate *chan,
                                 struct thread *thread)
 {
     bool res = false;
 
-    if (chan->wait_for) // if a thread is waiting for this specific event
-        res = chan->wait_for == thread;
-    else
-        res = (chan->token & 1 && !thread->token) // incoming token is a request
-            // and a thread is not waiting for a token
-            || (!chan->token && chan != thread->channel) // there's no token
-            // and a thread is not waiting specifically for that event
-            || (chan->token == thread->token && chan == thread->channel);
-            // there is a token and it matches thread's token and event
+    if (!thread->mask_channels || !chan->masked) {
+        if (chan->wait_for) // if a thread is waiting for this specific event
+            res = chan->wait_for == thread;
+        else
+            res = (chan->token & 1 && !thread->token) // incoming token is a request
+                // and a thread is not waiting for a token
+                || (!chan->token && chan != thread->channel) // there's no token
+                // and a thread is not waiting specifically for that event
+                || (chan->token == thread->token && chan == thread->channel);
+                // there is a token and it matches thread's token and event
+    }
     return res;
 }
 
@@ -177,16 +179,16 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
     struct thread *me = thread_self_disabled();
 
     if (chan) { // channel that we wait for
-        if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
+        if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
             return chan;
         }
-        if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
+        if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
             return chan;
         }
     }
     // check a waiting queue for matching event
     for (chan = ws->waiting; chan; ) {
-        if (waitset_check_token(chan, me)) {
+        if (waitset_can_receive(chan, me)) {
             assert_disabled(chan->state == CHAN_WAITING);
             return chan;
         }
@@ -196,7 +198,7 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
     }
     // check a pending queue for matching event
     for (chan = ws->pending; chan;) {
-        if (waitset_check_token(chan, me)) {
+        if (waitset_can_receive(chan, me)) {
             assert_disabled(chan->state == CHAN_PENDING);
             return chan;
         }
@@ -269,7 +271,7 @@ static void reregister_channel(struct waitset *ws, struct waitset_chanstate *cha
 
     chan->token = 0;
     if (chan->chantype == CHANTYPE_UMP_IN
-        || chan->chantype == CHANTYPE_LWIP_SOCKET 
+        || chan->chantype == CHANTYPE_LWIP_SOCKET
         || chan->chantype == CHANTYPE_AHCI) {
         enqueue(&ws->polled, chan);
         enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
@@ -289,7 +291,7 @@ static struct thread * find_recipient(struct waitset *ws,
     if (!t)
         return NULL;
     do {
-        if (waitset_check_token(channel, t))
+        if (waitset_can_receive(channel, t))
             return t;
         t = t->next;
     } while (t != ws->waiting_threads);
@@ -362,7 +364,7 @@ errval_t get_next_event_disabled(struct waitset *ws,
                 // find a matching thread
                 struct thread *t;
                 for (t = ws->waiting_threads; t; ) {
-                    if (waitset_check_token(chan, t)) { // match found, wake it
+                    if (waitset_can_receive(chan, t)) { // match found, wake it
                         ws->waiting_threads = t;
                         t = thread_unblock_one_disabled(handle,
                                                     &ws->waiting_threads, chan);
@@ -574,6 +576,7 @@ void waitset_chanstate_init(struct waitset_chanstate *chan,
     chan->persistent = false;
     chan->token = 0;
     chan->wait_for = NULL;
+    chan->masked = false;
 }
 
 /**