From b0e04368bb148daeef031ed5130b326a789be31a Mon Sep 17 00:00:00 2001 From: Adam Turowski Date: Tue, 20 Sep 2016 11:54:51 +0200 Subject: [PATCH] mt-waitset: adding masking of channels so a thread won't handle two messages at the same time Signed-off-by: Adam Turowski --- if/mt_waitset.if | 2 +- include/barrelfish/threads.h | 3 ++ include/barrelfish/ump_endpoint.h | 12 +++++---- include/barrelfish/waitset.h | 1 + include/flounder/flounder_support_ump.h | 12 ++++++++- lib/barrelfish/flounder_support.c | 3 +- lib/barrelfish/include/threads_priv.h | 1 + lib/barrelfish/threads.c | 11 ++++++++ lib/barrelfish/ump_endpoint.c | 11 +++++++- lib/barrelfish/waitset.c | 39 ++++++++++++++++-------------- tools/flounder/BackendCommon.hs | 4 ++- usr/tests/mt_waitset/main.c | 23 +++++++++-------- 12 files changed, 81 insertions(+), 41 deletions(-) diff --git a/if/mt_waitset.if b/if/mt_waitset.if index 206b871..75b360e 100644 --- a/if/mt_waitset.if +++ b/if/mt_waitset.if @@ -1,3 +1,3 @@ interface mt_waitset "Multithreaded waitset test" { - rpc rpc_method(in uint64 i1, in uint8 s[ss, 2048], in uint32 i2, out uint64 o1, out uint8 r[rs, 2048], out uint32 o2); + rpc rpc_method(in uint64 i1, in uint8 s[ss, 4096], in uint32 i2, out uint64 o1, out uint8 r[rs, 4096], out uint32 o2); }; diff --git a/include/barrelfish/threads.h b/include/barrelfish/threads.h index f64e1fb..97980d5 100644 --- a/include/barrelfish/threads.h +++ b/include/barrelfish/threads.h @@ -91,6 +91,9 @@ bool thread_get_rpc_in_progress(void); void thread_set_async_error(errval_t e); errval_t thread_get_async_error(void); +void thread_set_mask_channels(bool m); +bool thread_get_mask_channels(void); + extern __thread thread_once_t thread_once_local_epoch; extern void thread_once_internal(thread_once_t *control, void (*func)(void)); diff --git a/include/barrelfish/ump_endpoint.h b/include/barrelfish/ump_endpoint.h index f6800ef..e7bc47d 100644 --- a/include/barrelfish/ump_endpoint.h +++ b/include/barrelfish/ump_endpoint.h @@ -37,7 +37,7 @@ errval_t ump_endpoint_deregister(struct ump_endpoint *ep); void ump_endpoint_migrate(struct ump_endpoint *ep, struct waitset *ws); /** - * \brief Returns true iff there is a message pending on the given UMP endpoint + * \brief Returns true if there is a message pending on the given UMP endpoint */ static inline bool ump_endpoint_can_recv(struct ump_endpoint *ep) { @@ -64,15 +64,17 @@ static inline errval_t ump_endpoint_recv(struct ump_endpoint *ep, } } +/** + * \brief Return true if there's a message available + * + * \param channel UMP channal + */ static inline bool ump_endpoint_poll(struct waitset_chanstate *channel) { struct ump_endpoint *ep = (struct ump_endpoint *) ((char *)channel - offsetof(struct ump_endpoint, waitset_state)); - if (ump_endpoint_can_recv(ep)) { - return true; - } - return false; + return ump_endpoint_can_recv(ep); } diff --git a/include/barrelfish/waitset.h b/include/barrelfish/waitset.h index f2e208b..371e2cb 100644 --- a/include/barrelfish/waitset.h +++ b/include/barrelfish/waitset.h @@ -86,6 +86,7 @@ struct waitset_chanstate { bool persistent; ///< Channel should be always registered struct waitset_chanstate *polled_next, *polled_prev; ///< Dispatcher's polled queue struct thread *wait_for; ///< Thread waiting for this event + bool masked; }; /** diff --git a/include/flounder/flounder_support_ump.h b/include/flounder/flounder_support_ump.h index 963b7ca..383b856 100644 --- a/include/flounder/flounder_support_ump.h +++ b/include/flounder/flounder_support_ump.h @@ -65,8 +65,16 @@ errval_t flounder_stub_ump_recv_buf(volatile struct ump_message *msg, size_t maxsize); -/// Computes (from seq/ack numbers) whether we can currently send on the channel +/// Computes (from seq/ack numbers) whether we can currently send a non-ack +/// on the channel static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) { + bool r = (ump_index_t)(s->next_id - s->ack_id) < s->chan.max_send_msgs; + return r; +} + +/// Computes (from seq/ack numbers) whether we can currently send an ack +/// on the channel +static inline bool flounder_stub_ump_can_send_ack(struct flounder_ump_state *s) { bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs; return r; } @@ -128,7 +136,7 @@ static inline bool flounder_stub_ump_needs_ack(struct flounder_ump_state *s) /// Send an explicit ACK static inline void flounder_stub_ump_send_ack(struct flounder_ump_state *s) { - assert(flounder_stub_ump_can_send(s)); + assert(flounder_stub_ump_can_send_ack(s)); struct ump_control ctrl; volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl); flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_ACK); diff --git a/lib/barrelfish/flounder_support.c b/lib/barrelfish/flounder_support.c index 17928ae..176e85e 100644 --- a/lib/barrelfish/flounder_support.c +++ b/lib/barrelfish/flounder_support.c @@ -85,6 +85,7 @@ void flounder_support_waitset_chanstate_init_persistent(struct waitset_chanstate { waitset_chanstate_init(wc, CHANTYPE_FLOUNDER); wc->persistent = true; + wc->masked = true; } void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc) @@ -326,7 +327,7 @@ errval_t flounder_stub_lmp_recv_buf(struct lmp_recv_msg *msg, void *buf, } *len = msg->words[0]; - assert(*len < maxsize); + assert(*len <= maxsize); msgpos = 1; } else { msgpos = 0; diff --git a/lib/barrelfish/include/threads_priv.h b/lib/barrelfish/include/threads_priv.h index 692bc47..9fb381c 100644 --- a/lib/barrelfish/include/threads_priv.h +++ b/lib/barrelfish/include/threads_priv.h @@ -81,6 +81,7 @@ struct thread { bool rpc_in_progress; ///< RPC in progress errval_t async_error; ///< RPC async error uint32_t outgoing_token; ///< Token of outgoing message + bool mask_channels; }; void thread_enqueue(struct thread *thread, struct thread **queue); diff --git a/lib/barrelfish/threads.c b/lib/barrelfish/threads.c index 8d9e849..3e0ad29 100644 --- a/lib/barrelfish/threads.c +++ b/lib/barrelfish/threads.c @@ -256,6 +256,7 @@ static void thread_init(dispatcher_handle_t disp, struct thread *newthread) newthread->rpc_in_progress = false; newthread->async_error = SYS_ERR_OK; + newthread->mask_channels = false; } /** @@ -682,6 +683,16 @@ errval_t thread_get_async_error(void) return thread_self()->async_error; } +void thread_set_mask_channels(bool m) +{ + thread_self()->mask_channels = m; +} + +bool thread_get_mask_channels(void) +{ + return thread_self()->mask_channels; +} + /** * \brief Yield the calling thread * diff --git a/lib/barrelfish/ump_endpoint.c b/lib/barrelfish/ump_endpoint.c index cdf694d..52aa653 100644 --- a/lib/barrelfish/ump_endpoint.c +++ b/lib/barrelfish/ump_endpoint.c @@ -60,14 +60,21 @@ void ump_endpoint_destroy(struct ump_endpoint *ep) errval_t ump_endpoint_register(struct ump_endpoint *ep, struct waitset *ws, struct event_closure closure) { + bool wd; + dispatcher_handle_t handle = disp_try_disable(&wd); + errval_t err; + assert(ep != NULL); assert(ws != NULL); if (ump_endpoint_poll(&ep->waitset_state)) { // trigger event immediately - return waitset_chan_trigger_closure(ws, &ep->waitset_state, closure); + err = waitset_chan_trigger_closure_disabled(ws, &ep->waitset_state, closure, handle); } else { - return waitset_chan_register_polled(ws, &ep->waitset_state, closure); + err = waitset_chan_register_polled_disabled(ws, &ep->waitset_state, closure, handle); } + if (wd) + disp_enable(handle); + return err; } /** diff --git a/lib/barrelfish/waitset.c b/lib/barrelfish/waitset.c index 69ec7bd..e6c0840 100644 --- a/lib/barrelfish/waitset.c +++ b/lib/barrelfish/waitset.c @@ -151,21 +151,23 @@ errval_t waitset_destroy(struct waitset *ws) return SYS_ERR_OK; } -/// Check if a thread can receive an event -static bool waitset_check_token(struct waitset_chanstate *chan, +/// Check if the thread can receive the event +static bool waitset_can_receive(struct waitset_chanstate *chan, struct thread *thread) { bool res = false; - if (chan->wait_for) // if a thread is waiting for this specific event - res = chan->wait_for == thread; - else - res = (chan->token & 1 && !thread->token) // incoming token is a request - // and a thread is not waiting for a token - || (!chan->token && chan != thread->channel) // there's no token - // and a thread is not waiting specifically for that event - || (chan->token == thread->token && chan == thread->channel); - // there is a token and it matches thread's token and event + if (!thread->mask_channels || !chan->masked) { + if (chan->wait_for) // if a thread is waiting for this specific event + res = chan->wait_for == thread; + else + res = (chan->token & 1 && !thread->token) // incoming token is a request + // and a thread is not waiting for a token + || (!chan->token && chan != thread->channel) // there's no token + // and a thread is not waiting specifically for that event + || (chan->token == thread->token && chan == thread->channel); + // there is a token and it matches thread's token and event + } return res; } @@ -177,16 +179,16 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws, struct thread *me = thread_self_disabled(); if (chan) { // channel that we wait for - if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) { + if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) { return chan; } - if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) { + if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) { return chan; } } // check a waiting queue for matching event for (chan = ws->waiting; chan; ) { - if (waitset_check_token(chan, me)) { + if (waitset_can_receive(chan, me)) { assert_disabled(chan->state == CHAN_WAITING); return chan; } @@ -196,7 +198,7 @@ static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws, } // check a pending queue for matching event for (chan = ws->pending; chan;) { - if (waitset_check_token(chan, me)) { + if (waitset_can_receive(chan, me)) { assert_disabled(chan->state == CHAN_PENDING); return chan; } @@ -269,7 +271,7 @@ static void reregister_channel(struct waitset *ws, struct waitset_chanstate *cha chan->token = 0; if (chan->chantype == CHANTYPE_UMP_IN - || chan->chantype == CHANTYPE_LWIP_SOCKET + || chan->chantype == CHANTYPE_LWIP_SOCKET || chan->chantype == CHANTYPE_AHCI) { enqueue(&ws->polled, chan); enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan); @@ -289,7 +291,7 @@ static struct thread * find_recipient(struct waitset *ws, if (!t) return NULL; do { - if (waitset_check_token(channel, t)) + if (waitset_can_receive(channel, t)) return t; t = t->next; } while (t != ws->waiting_threads); @@ -362,7 +364,7 @@ errval_t get_next_event_disabled(struct waitset *ws, // find a matching thread struct thread *t; for (t = ws->waiting_threads; t; ) { - if (waitset_check_token(chan, t)) { // match found, wake it + if (waitset_can_receive(chan, t)) { // match found, wake it ws->waiting_threads = t; t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan); @@ -574,6 +576,7 @@ void waitset_chanstate_init(struct waitset_chanstate *chan, chan->persistent = false; chan->token = 0; chan->wait_for = NULL; + chan->masked = false; } /** diff --git a/tools/flounder/BackendCommon.hs b/tools/flounder/BackendCommon.hs index d8932ba..e75f31a 100644 --- a/tools/flounder/BackendCommon.hs +++ b/tools/flounder/BackendCommon.hs @@ -404,8 +404,10 @@ block_sending :: C.Expr -> [C.Stmt] block_sending cont_ex = [ C.If (C.Binary C.Equals (cont_ex `C.FieldOf` "handler") (C.Variable "blocking_cont")) [C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [ + C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "true"], C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel" - [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error] + [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error], + C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "false"] ] [ C.Ex $ C.Call "flounder_support_deregister_chan" [tx_cont_chanstate] ] diff --git a/usr/tests/mt_waitset/main.c b/usr/tests/mt_waitset/main.c index ba61361..507c495 100644 --- a/usr/tests/mt_waitset/main.c +++ b/usr/tests/mt_waitset/main.c @@ -55,8 +55,8 @@ static int client_thread(void * arg) errval_t err; rpc_client = arg; int i, j, k, l; - uint64_t payload[256]; - uint64_t result[256]; + uint64_t payload[512]; + uint64_t result[512]; size_t result_size; uint64_t o1; uint32_t o2; @@ -68,7 +68,7 @@ static int client_thread(void * arg) for (k = 0; k < iteration_count; k++) { uint64_t i2 = (rdtsc() & 0xffffffff) | mmm | (((uint64_t)k & 0xffffL) << 32); - j = ((i2 >> 5) & 127) + 1; + j = ((i2 >> 5) & 511) + 1; i2 &= 0xfffffffffffff000; @@ -95,15 +95,19 @@ static int client_thread(void * arg) } } + dispatcher_handle_t handle = disp_disable(); + client_counter--; debug_printf("Done, threads left:%d\n", client_counter); if (client_counter == 0) { + disp_enable(handle); // all threads have finished, we're done, inform the server payload[0] = mmm; err = rpc_client->vtbl.rpc_method(rpc_client, mmm, (uint8_t *)payload, 8, 65536, &o1, (uint8_t *)result, &result_size, &o2); show_stats(); - } + } else + disp_enable(handle); return 0; } @@ -182,7 +186,7 @@ static errval_t server_rpc_method_call(struct mt_waitset_binding *b, uint64_t i1 response[i] += i; } if (k != j && i2 != 65536) - debug_printf("server_zrob_call: binding:%p %08x %08x %d %d %016lx:%d\n", b, i2, b->incoming_token, k, j, response[0], me); + debug_printf("%s: binding:%p %08x %08x %d %d %016lx:%d\n", __func__, b, i2, b->incoming_token, k, j, response[0], me); if (count == num_cores) { bool failed = false; @@ -204,7 +208,7 @@ out: debug_printf("Test PASSED\n"); } calls++; - if ((calls % iteration_count) == 0) { + if ((calls % 10000) == 0) { show_stats(); } @@ -273,8 +277,6 @@ int main(int argc, char *argv[]) memset(server_calls, 0, sizeof(server_calls)); memset(client_calls, 0, sizeof(client_calls)); - debug_printf("Got %d args\n", argc); - if (argc == 1) { debug_printf("Usage: %s server_threads client_threads iteration_count\n", argv[0]); } else if (argc == 5) { @@ -306,10 +308,9 @@ int main(int argc, char *argv[]) iteration_count = atoi(argv[2]); limit = atoi(argv[3]); - start_client(); - struct waitset *ws = get_default_waitset(); - + start_client(); + debug_printf("Client process events\n"); for (;;) { err = event_dispatch(ws); if (err_is_fail(err)) { -- 1.7.2.5