interface mt_waitset "Multithreaded waitset test" {
- rpc rpc_method(in uint64 i1, in uint8 s[ss, 2048], in uint32 i2, out uint64 o1, out uint8 r[rs, 2048], out uint32 o2);
+ rpc rpc_method(in uint64 i1, in uint8 s[ss, 4096], in uint32 i2, out uint64 o1, out uint8 r[rs, 4096], out uint32 o2);
};
void thread_set_async_error(errval_t e);
errval_t thread_get_async_error(void);
+void thread_set_mask_channels(bool m);
+bool thread_get_mask_channels(void);
+
extern __thread thread_once_t thread_once_local_epoch;
extern void thread_once_internal(thread_once_t *control, void (*func)(void));
void ump_endpoint_migrate(struct ump_endpoint *ep, struct waitset *ws);
/**
- * \brief Returns true iff there is a message pending on the given UMP endpoint
+ * \brief Returns true if there is a message pending on the given UMP endpoint
*/
static inline bool ump_endpoint_can_recv(struct ump_endpoint *ep)
{
}
}
+/**
+ * \brief Return true if there's a message available
+ *
+ * \param channel UMP channel
+ */
static inline bool ump_endpoint_poll(struct waitset_chanstate *channel)
{
struct ump_endpoint *ep = (struct ump_endpoint *)
((char *)channel - offsetof(struct ump_endpoint, waitset_state));
- if (ump_endpoint_can_recv(ep)) {
- return true;
- }
- return false;
+ return ump_endpoint_can_recv(ep);
}
bool persistent; ///< Channel should be always registered
struct waitset_chanstate *polled_next, *polled_prev; ///< Dispatcher's polled queue
struct thread *wait_for; ///< Thread waiting for this event
+ bool masked; ///< If set, threads with mask_channels set do not receive this channel's events
};
/**
size_t maxsize);
-/// Computes (from seq/ack numbers) whether we can currently send on the channel
+/// Computes (from seq/ack numbers) whether we can currently send a non-ack
+/// on the channel
static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) {
+ bool r = (ump_index_t)(s->next_id - s->ack_id) < s->chan.max_send_msgs;
+ return r;
+}
+
+/// Computes (from seq/ack numbers) whether we can currently send an ack
+/// on the channel
+static inline bool flounder_stub_ump_can_send_ack(struct flounder_ump_state *s) {
bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
return r;
}
/// Send an explicit ACK
static inline void flounder_stub_ump_send_ack(struct flounder_ump_state *s)
{
- assert(flounder_stub_ump_can_send(s));
+ assert(flounder_stub_ump_can_send_ack(s));
struct ump_control ctrl;
volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_ACK);
{
waitset_chanstate_init(wc, CHANTYPE_FLOUNDER);
wc->persistent = true;
+ wc->masked = true;
}
void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc)
}
*len = msg->words[0];
- assert(*len < maxsize);
+ assert(*len <= maxsize);
msgpos = 1;
} else {
msgpos = 0;
bool rpc_in_progress; ///< RPC in progress
errval_t async_error; ///< RPC async error
uint32_t outgoing_token; ///< Token of outgoing message
+ bool mask_channels; ///< If set, this thread ignores events on masked channels
};
void thread_enqueue(struct thread *thread, struct thread **queue);
newthread->rpc_in_progress = false;
newthread->async_error = SYS_ERR_OK;
+ newthread->mask_channels = false;
}
/**
return thread_self()->async_error;
}
+void thread_set_mask_channels(bool m)
+{
+ thread_self()->mask_channels = m;
+}
+
+bool thread_get_mask_channels(void)
+{
+ return thread_self()->mask_channels;
+}
+
/**
* \brief Yield the calling thread
*
errval_t ump_endpoint_register(struct ump_endpoint *ep, struct waitset *ws,
struct event_closure closure)
{
+ bool wd;
+ dispatcher_handle_t handle = disp_try_disable(&wd);
+ errval_t err;
+
assert(ep != NULL);
assert(ws != NULL);
if (ump_endpoint_poll(&ep->waitset_state)) { // trigger event immediately
- return waitset_chan_trigger_closure(ws, &ep->waitset_state, closure);
+ err = waitset_chan_trigger_closure_disabled(ws, &ep->waitset_state, closure, handle);
} else {
- return waitset_chan_register_polled(ws, &ep->waitset_state, closure);
+ err = waitset_chan_register_polled_disabled(ws, &ep->waitset_state, closure, handle);
}
+ if (wd)
+ disp_enable(handle);
+ return err;
}
/**
return SYS_ERR_OK;
}
-/// Check if a thread can receive an event
-static bool waitset_check_token(struct waitset_chanstate *chan,
+/// Check if the thread can receive the event
+static bool waitset_can_receive(struct waitset_chanstate *chan,
struct thread *thread)
{
bool res = false;
- if (chan->wait_for) // if a thread is waiting for this specific event
- res = chan->wait_for == thread;
- else
- res = (chan->token & 1 && !thread->token) // incoming token is a request
- // and a thread is not waiting for a token
- || (!chan->token && chan != thread->channel) // there's no token
- // and a thread is not waiting specifically for that event
- || (chan->token == thread->token && chan == thread->channel);
- // there is a token and it matches thread's token and event
+ if (!thread->mask_channels || !chan->masked) {
+ if (chan->wait_for) // if a thread is waiting for this specific event
+ res = chan->wait_for == thread;
+ else
+ res = (chan->token & 1 && !thread->token) // incoming token is a request
+ // and a thread is not waiting for a token
+ || (!chan->token && chan != thread->channel) // there's no token
+ // and a thread is not waiting specifically for that event
+ || (chan->token == thread->token && chan == thread->channel);
+ // there is a token and it matches thread's token and event
+ }
return res;
}
struct thread *me = thread_self_disabled();
if (chan) { // channel that we wait for
- if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
+ if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
return chan;
}
- if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
+ if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
return chan;
}
}
// check a waiting queue for matching event
for (chan = ws->waiting; chan; ) {
- if (waitset_check_token(chan, me)) {
+ if (waitset_can_receive(chan, me)) {
assert_disabled(chan->state == CHAN_WAITING);
return chan;
}
}
// check a pending queue for matching event
for (chan = ws->pending; chan;) {
- if (waitset_check_token(chan, me)) {
+ if (waitset_can_receive(chan, me)) {
assert_disabled(chan->state == CHAN_PENDING);
return chan;
}
chan->token = 0;
if (chan->chantype == CHANTYPE_UMP_IN
- || chan->chantype == CHANTYPE_LWIP_SOCKET
+ || chan->chantype == CHANTYPE_LWIP_SOCKET
|| chan->chantype == CHANTYPE_AHCI) {
enqueue(&ws->polled, chan);
enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
if (!t)
return NULL;
do {
- if (waitset_check_token(channel, t))
+ if (waitset_can_receive(channel, t))
return t;
t = t->next;
} while (t != ws->waiting_threads);
// find a matching thread
struct thread *t;
for (t = ws->waiting_threads; t; ) {
- if (waitset_check_token(chan, t)) { // match found, wake it
+ if (waitset_can_receive(chan, t)) { // match found, wake it
ws->waiting_threads = t;
t = thread_unblock_one_disabled(handle,
&ws->waiting_threads, chan);
chan->persistent = false;
chan->token = 0;
chan->wait_for = NULL;
+ chan->masked = false;
}
/**
block_sending cont_ex = [
C.If (C.Binary C.Equals (cont_ex `C.FieldOf` "handler") (C.Variable "blocking_cont"))
[C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [
+ C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "true"],
C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel"
- [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error]
+ [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error],
+ C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "false"]
] [
C.Ex $ C.Call "flounder_support_deregister_chan" [tx_cont_chanstate]
]
errval_t err;
rpc_client = arg;
int i, j, k, l;
- uint64_t payload[256];
- uint64_t result[256];
+ uint64_t payload[512];
+ uint64_t result[512];
size_t result_size;
uint64_t o1;
uint32_t o2;
for (k = 0; k < iteration_count; k++) {
uint64_t i2 = (rdtsc() & 0xffffffff) | mmm | (((uint64_t)k & 0xffffL) << 32);
- j = ((i2 >> 5) & 127) + 1;
+ j = ((i2 >> 5) & 511) + 1;
i2 &= 0xfffffffffffff000;
}
}
+ dispatcher_handle_t handle = disp_disable();
+
client_counter--;
debug_printf("Done, threads left:%d\n", client_counter);
if (client_counter == 0) {
+ disp_enable(handle);
// all threads have finished, we're done, inform the server
payload[0] = mmm;
err = rpc_client->vtbl.rpc_method(rpc_client, mmm, (uint8_t *)payload, 8, 65536, &o1, (uint8_t *)result, &result_size, &o2);
show_stats();
- }
+ } else
+ disp_enable(handle);
return 0;
}
response[i] += i;
}
if (k != j && i2 != 65536)
- debug_printf("server_zrob_call: binding:%p %08x %08x %d %d %016lx:%d\n", b, i2, b->incoming_token, k, j, response[0], me);
+ debug_printf("%s: binding:%p %08x %08x %d %d %016lx:%d\n", __func__, b, i2, b->incoming_token, k, j, response[0], me);
if (count == num_cores) {
bool failed = false;
debug_printf("Test PASSED\n");
}
calls++;
- if ((calls % iteration_count) == 0) {
+ if ((calls % 10000) == 0) {
show_stats();
}
memset(server_calls, 0, sizeof(server_calls));
memset(client_calls, 0, sizeof(client_calls));
- debug_printf("Got %d args\n", argc);
-
if (argc == 1) {
debug_printf("Usage: %s server_threads client_threads iteration_count\n", argv[0]);
} else if (argc == 5) {
iteration_count = atoi(argv[2]);
limit = atoi(argv[3]);
- start_client();
-
struct waitset *ws = get_default_waitset();
-
+ start_client();
+ debug_printf("Client process events\n");
for (;;) {
err = event_dispatch(ws);
if (err_is_fail(err)) {