mt-waitset: adding masking of channels so a thread won't handle two messages at the...
authorAdam Turowski <adam.turowski@inf.ethz.ch>
Tue, 20 Sep 2016 09:54:51 +0000 (11:54 +0200)
committerAdam Turowski <adam.turowski@inf.ethz.ch>
Tue, 20 Sep 2016 09:54:51 +0000 (11:54 +0200)
Signed-off-by: Adam Turowski <adam.turowski@inf.ethz.ch>

12 files changed:
if/mt_waitset.if
include/barrelfish/threads.h
include/barrelfish/ump_endpoint.h
include/barrelfish/waitset.h
include/flounder/flounder_support_ump.h
lib/barrelfish/flounder_support.c
lib/barrelfish/include/threads_priv.h
lib/barrelfish/threads.c
lib/barrelfish/ump_endpoint.c
lib/barrelfish/waitset.c
tools/flounder/BackendCommon.hs
usr/tests/mt_waitset/main.c

index 206b871..75b360e 100644 (file)
@@ -1,3 +1,3 @@
 interface mt_waitset "Multithreaded waitset test" {
-    rpc rpc_method(in uint64 i1, in uint8 s[ss, 2048], in uint32 i2, out uint64 o1, out uint8 r[rs, 2048], out uint32 o2);
+    rpc rpc_method(in uint64 i1, in uint8 s[ss, 4096], in uint32 i2, out uint64 o1, out uint8 r[rs, 4096], out uint32 o2);
 };
index f64e1fb..97980d5 100644 (file)
@@ -91,6 +91,9 @@ bool thread_get_rpc_in_progress(void);
 void thread_set_async_error(errval_t e);
 errval_t thread_get_async_error(void);
 
+void thread_set_mask_channels(bool m);
+bool thread_get_mask_channels(void);
+
 extern __thread thread_once_t thread_once_local_epoch;
 extern void thread_once_internal(thread_once_t *control, void (*func)(void));
 
index f6800ef..e7bc47d 100644 (file)
@@ -37,7 +37,7 @@ errval_t ump_endpoint_deregister(struct ump_endpoint *ep);
 void ump_endpoint_migrate(struct ump_endpoint *ep, struct waitset *ws);
 
 /**
- * \brief Returns true iff there is a message pending on the given UMP endpoint
+ * \brief Returns true if there is a message pending on the given UMP endpoint
  */
 static inline bool ump_endpoint_can_recv(struct ump_endpoint *ep)
 {
@@ -64,15 +64,17 @@ static inline errval_t ump_endpoint_recv(struct ump_endpoint *ep,
     }
 }
 
+/**
+ * \brief Return true if there's a message available
+ *
+ * \param channel UMP channal
+ */
 static inline bool ump_endpoint_poll(struct waitset_chanstate *channel)
 {
     struct ump_endpoint *ep = (struct ump_endpoint *)
         ((char *)channel - offsetof(struct ump_endpoint, waitset_state));
 
-    if (ump_endpoint_can_recv(ep)) {
-        return true;
-    }
-    return false;
+    return ump_endpoint_can_recv(ep);
 }
 
 
index f2e208b..371e2cb 100644 (file)
@@ -86,6 +86,7 @@ struct waitset_chanstate {
     bool persistent;                        ///< Channel should be always registered
     struct waitset_chanstate *polled_next, *polled_prev;    ///< Dispatcher's polled queue
     struct thread *wait_for;                ///< Thread waiting for this event
+    bool masked;
 };
 
 /**
index 963b7ca..383b856 100644 (file)
@@ -65,8 +65,16 @@ errval_t flounder_stub_ump_recv_buf(volatile struct ump_message *msg,
                                     size_t maxsize);
 
 
-/// Computes (from seq/ack numbers) whether we can currently send on the channel
+/// Computes (from seq/ack numbers) whether we can currently send a non-ack
+/// on the channel
 static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) {
+    bool r = (ump_index_t)(s->next_id - s->ack_id) < s->chan.max_send_msgs;
+    return r;
+}
+
+/// Computes (from seq/ack numbers) whether we can currently send an ack
+/// on the channel
+static inline bool flounder_stub_ump_can_send_ack(struct flounder_ump_state *s) {
     bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
     return r;
 }
@@ -128,7 +136,7 @@ static inline bool flounder_stub_ump_needs_ack(struct flounder_ump_state *s)
 /// Send an explicit ACK
 static inline void flounder_stub_ump_send_ack(struct flounder_ump_state *s)
 {
-    assert(flounder_stub_ump_can_send(s));
+    assert(flounder_stub_ump_can_send_ack(s));
     struct ump_control ctrl;
     volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
     flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_ACK);
index 17928ae..176e85e 100644 (file)
@@ -85,6 +85,7 @@ void flounder_support_waitset_chanstate_init_persistent(struct waitset_chanstate
 {
     waitset_chanstate_init(wc, CHANTYPE_FLOUNDER);
     wc->persistent = true;
+    wc->masked = true;
 }
 
 void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc)
@@ -326,7 +327,7 @@ errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void *buf,
         }
 
         *len = msg->words[0];
-        assert(*len < maxsize);
+        assert(*len <= maxsize);
         msgpos = 1;
     } else {
         msgpos = 0;
index 692bc47..9fb381c 100644 (file)
@@ -81,6 +81,7 @@ struct thread {
     bool    rpc_in_progress;               ///< RPC in progress
     errval_t    async_error;                ///< RPC async error
     uint32_t    outgoing_token;             ///< Token of outgoing message
+    bool    mask_channels;
 };
 
 void thread_enqueue(struct thread *thread, struct thread **queue);
index 8d9e849..3e0ad29 100644 (file)
@@ -256,6 +256,7 @@ static void thread_init(dispatcher_handle_t disp, struct thread *newthread)
 
     newthread->rpc_in_progress = false;
     newthread->async_error = SYS_ERR_OK;
+    newthread->mask_channels = false;
 }
 
 /**
@@ -682,6 +683,16 @@ errval_t thread_get_async_error(void)
     return thread_self()->async_error;
 }
 
+void thread_set_mask_channels(bool m)
+{
+    thread_self()->mask_channels = m;
+}
+
+bool thread_get_mask_channels(void)
+{
+    return thread_self()->mask_channels;
+}
+
 /**
  * \brief Yield the calling thread
  *
index cdf694d..52aa653 100644 (file)
@@ -60,14 +60,21 @@ void ump_endpoint_destroy(struct ump_endpoint *ep)
 errval_t ump_endpoint_register(struct ump_endpoint *ep, struct waitset *ws,
                                 struct event_closure closure)
 {
+    bool wd;
+    dispatcher_handle_t handle = disp_try_disable(&wd);
+    errval_t err;
+
     assert(ep != NULL);
     assert(ws != NULL);
 
     if (ump_endpoint_poll(&ep->waitset_state)) { // trigger event immediately
-        return waitset_chan_trigger_closure(ws, &ep->waitset_state, closure);
+        err = waitset_chan_trigger_closure_disabled(ws, &ep->waitset_state, closure, handle);
     } else {
-        return waitset_chan_register_polled(ws, &ep->waitset_state, closure);
+        err = waitset_chan_register_polled_disabled(ws, &ep->waitset_state, closure, handle);
     }
+    if (wd)
+        disp_enable(handle);
+    return err;
 }
 
 /**
index 69ec7bd..e6c0840 100644 (file)
@@ -151,21 +151,23 @@ errval_t waitset_destroy(struct waitset *ws)
     return SYS_ERR_OK;
 }
 
-/// Check if a thread can receive an event
-static bool waitset_check_token(struct waitset_chanstate *chan,
+/// Check if the thread can receive the event
+static bool waitset_can_receive(struct waitset_chanstate *chan,
                                 struct thread *thread)
 {
     bool res = false;
 
-    if (chan->wait_for) // if a thread is waiting for this specific event
-        res = chan->wait_for == thread;
-    else
-        res = (chan->token & 1 && !thread->token) // incoming token is a request
-            // and a thread is not waiting for a token
-            || (!chan->token && chan != thread->channel) // there's no token
-            // and a thread is not waiting specifically for that event
-            || (chan->token == thread->token && chan == thread->channel);
-            // there is a token and it matches thread's token and event
+    if (!thread->mask_channels || !chan->masked) {
+        if (chan->wait_for) // if a thread is waiting for this specific event
+            res = chan->wait_for == thread;
+        else
+            res = (chan->token & 1 && !thread->token) // incoming token is a request
+                // and a thread is not waiting for a token
+                || (!chan->token && chan != thread->channel) // there's no token
+                // and a thread is not waiting specifically for that event
+                || (chan->token == thread->token && chan == thread->channel);
+                // there is a token and it matches thread's token and event
+    }
     return res;
 }
 
@@ -177,16 +179,16 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
     struct thread *me = thread_self_disabled();
 
     if (chan) { // channel that we wait for
-        if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
+        if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
             return chan;
         }
-        if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
+        if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
             return chan;
         }
     }
     // check a waiting queue for matching event
     for (chan = ws->waiting; chan; ) {
-        if (waitset_check_token(chan, me)) {
+        if (waitset_can_receive(chan, me)) {
             assert_disabled(chan->state == CHAN_WAITING);
             return chan;
         }
@@ -196,7 +198,7 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
     }
     // check a pending queue for matching event
     for (chan = ws->pending; chan;) {
-        if (waitset_check_token(chan, me)) {
+        if (waitset_can_receive(chan, me)) {
             assert_disabled(chan->state == CHAN_PENDING);
             return chan;
         }
@@ -269,7 +271,7 @@ static void reregister_channel(struct waitset *ws, struct waitset_chanstate *cha
 
     chan->token = 0;
     if (chan->chantype == CHANTYPE_UMP_IN
-        || chan->chantype == CHANTYPE_LWIP_SOCKET 
+        || chan->chantype == CHANTYPE_LWIP_SOCKET
         || chan->chantype == CHANTYPE_AHCI) {
         enqueue(&ws->polled, chan);
         enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
@@ -289,7 +291,7 @@ static struct thread * find_recipient(struct waitset *ws,
     if (!t)
         return NULL;
     do {
-        if (waitset_check_token(channel, t))
+        if (waitset_can_receive(channel, t))
             return t;
         t = t->next;
     } while (t != ws->waiting_threads);
@@ -362,7 +364,7 @@ errval_t get_next_event_disabled(struct waitset *ws,
                 // find a matching thread
                 struct thread *t;
                 for (t = ws->waiting_threads; t; ) {
-                    if (waitset_check_token(chan, t)) { // match found, wake it
+                    if (waitset_can_receive(chan, t)) { // match found, wake it
                         ws->waiting_threads = t;
                         t = thread_unblock_one_disabled(handle,
                                                     &ws->waiting_threads, chan);
@@ -574,6 +576,7 @@ void waitset_chanstate_init(struct waitset_chanstate *chan,
     chan->persistent = false;
     chan->token = 0;
     chan->wait_for = NULL;
+    chan->masked = false;
 }
 
 /**
index d8932ba..e75f31a 100644 (file)
@@ -404,8 +404,10 @@ block_sending :: C.Expr -> [C.Stmt]
 block_sending cont_ex = [
     C.If (C.Binary C.Equals (cont_ex `C.FieldOf` "handler") (C.Variable "blocking_cont"))
         [C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [
+            C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "true"],
             C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel"
-                [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error]
+                [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error],
+            C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "false"]
             ] [
             C.Ex $ C.Call "flounder_support_deregister_chan" [tx_cont_chanstate]
             ]
index ba61361..507c495 100644 (file)
@@ -55,8 +55,8 @@ static int client_thread(void * arg)
     errval_t err;
     rpc_client = arg;
     int i, j, k, l;
-    uint64_t payload[256];
-    uint64_t result[256];
+    uint64_t payload[512];
+    uint64_t result[512];
     size_t result_size;
     uint64_t o1;
     uint32_t o2;
@@ -68,7 +68,7 @@ static int client_thread(void * arg)
     for (k = 0; k < iteration_count; k++) {
         uint64_t i2 = (rdtsc() & 0xffffffff) | mmm | (((uint64_t)k & 0xffffL) << 32);
 
-        j = ((i2 >> 5) & 127) + 1;
+        j = ((i2 >> 5) & 511) + 1;
 
         i2 &= 0xfffffffffffff000;
 
@@ -95,15 +95,19 @@ static int client_thread(void * arg)
         }
     }
 
+    dispatcher_handle_t handle = disp_disable();
+
     client_counter--;
     debug_printf("Done, threads left:%d\n", client_counter);
 
     if (client_counter == 0) {
+        disp_enable(handle);
         // all threads have finished, we're done, inform the server
         payload[0] = mmm;
         err = rpc_client->vtbl.rpc_method(rpc_client, mmm, (uint8_t *)payload, 8, 65536, &o1, (uint8_t *)result, &result_size, &o2);
         show_stats();
-    }
+    } else
+        disp_enable(handle);
     return 0;
 }
 
@@ -182,7 +186,7 @@ static errval_t server_rpc_method_call(struct mt_waitset_binding *b, uint64_t i1
         response[i] += i;
     }
     if (k != j && i2 != 65536)
-        debug_printf("server_zrob_call: binding:%p %08x %08x  %d %d   %016lx:%d\n", b, i2, b->incoming_token, k, j, response[0], me);
+        debug_printf("%s: binding:%p %08x %08x  %d %d   %016lx:%d\n", __func__, b, i2, b->incoming_token, k, j, response[0], me);
     if (count == num_cores) {
         bool failed = false;
 
@@ -204,7 +208,7 @@ out:
             debug_printf("Test PASSED\n");
     }
     calls++;
-    if ((calls % iteration_count) == 0) {
+    if ((calls % 10000) == 0) {
         show_stats();
     }
 
@@ -273,8 +277,6 @@ int main(int argc, char *argv[])
     memset(server_calls, 0, sizeof(server_calls));
     memset(client_calls, 0, sizeof(client_calls));
 
-    debug_printf("Got %d args\n", argc);
-
     if (argc == 1) {
         debug_printf("Usage: %s server_threads client_threads iteration_count\n", argv[0]);
     } else if (argc == 5) {
@@ -306,10 +308,9 @@ int main(int argc, char *argv[])
         iteration_count = atoi(argv[2]);
         limit = atoi(argv[3]);
 
-        start_client();
-
         struct waitset *ws = get_default_waitset();
-
+        start_client();
+        debug_printf("Client process events\n");
         for (;;) {
             err = event_dispatch(ws);
             if (err_is_fail(err)) {