#include <stdio.h>
#include <string.h>
+#include <flounder/flounder.h>
+
#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
# include <barrelfish/ump_endpoint.h>
#endif
-#if defined(__k1om__) || defined(__aarch64__)
-#include <barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+/// Dequeue a chanstate from a queue
+static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
{
- return rdtsc();
+ if (chan->next == chan) {
+ assert(chan->prev == chan);
+ assert(*queue == chan);
+ *queue = NULL;
+ } else {
+ chan->prev->next = chan->next;
+ chan->next->prev = chan->prev;
+ if (*queue == chan) {
+ *queue = chan->next;
+ }
+ }
+ chan->prev = chan->next = NULL;
}
-#elif defined(__x86_64__) || defined(__i386__)
-#include <arch/x86/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Enqueue a chanstate on a queue
+static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
{
- return rdtsc();
+ if (*queue == NULL) {
+ *queue = chan;
+ chan->next = chan->prev = chan;
+ } else {
+ chan->next = *queue;
+ chan->prev = (*queue)->prev;
+ chan->next->prev = chan;
+ chan->prev->next = chan;
+ }
}
-#elif defined(__arm__) && defined(__gem5__)
-/**
- * XXX: Gem5 doesn't support the ARM performance monitor extension
- * therefore we just poll a fixed number of times instead of using
- * cycle counts. POLL_COUNT is deliberately set to 42, guess why! ;)
- */
-#define POLL_COUNT 42
-#elif defined(__aarch64__) && defined(__gem5__)
-#define POLL_COUNT 42
-#elif defined(__arm__)
-#include <arch/arm/barrelfish_kpi/asm_inlines_arch.h>
-static inline cycles_t cyclecount(void)
+
+/// Dequeue a chanstate from polled queue
+static void dequeue_polled(struct waitset_chanstate **queue,
+ struct waitset_chanstate *chan)
{
- return get_cycle_count();
+ if (chan->polled_next == chan) {
+ assert(chan->polled_prev == chan);
+ assert(*queue == chan);
+ *queue = NULL;
+ } else {
+ chan->polled_prev->polled_next = chan->polled_next;
+ chan->polled_next->polled_prev = chan->polled_prev;
+ if (*queue == chan) {
+ *queue = chan->polled_next;
+ }
+ }
+ chan->polled_prev = chan->polled_next = NULL;
}
-#else
-static inline cycles_t cyclecount(void)
+
+/// Enqueue a chanstate on polled queue
+static void enqueue_polled(struct waitset_chanstate **queue,
+ struct waitset_chanstate *chan)
{
- USER_PANIC("called on non-x86 architecture. why are we polling?");
- return 0;
+ if (*queue == NULL) {
+ *queue = chan;
+ chan->polled_next = chan->polled_prev = chan;
+ } else {
+ chan->polled_next = *queue;
+ chan->polled_prev = (*queue)->polled_prev;
+ chan->polled_next->polled_prev = chan;
+ chan->polled_prev->polled_next = chan;
+ }
}
-#endif
-
-// FIXME: bogus default value. need to measure this at boot time
-#define WAITSET_POLL_CYCLES_DEFAULT 2000
-
-/// Maximum number of cycles to spend polling channels before yielding CPU
-cycles_t waitset_poll_cycles = WAITSET_POLL_CYCLES_DEFAULT;
/**
* \brief Initialise a new waitset
void waitset_init(struct waitset *ws)
{
assert(ws != NULL);
- ws->pending = ws->polled = ws->idle = NULL;
+ ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
ws->waiting_threads = NULL;
- ws->polling = false;
}
/**
return SYS_ERR_OK;
}
-/// Returns a channel with a pending event on the given waitset, or NULL
-static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws)
+/// Check if a thread can receive an event
+static bool waitset_check_token(struct waitset_chanstate *chan,
+ struct thread *thread)
{
- // are there any pending events on the waitset?
- if (ws->pending == NULL) {
- return NULL;
- }
-
- // dequeue next pending event
- struct waitset_chanstate *chan = ws->pending;
- if (chan->next == chan) {
- assert_disabled(chan->prev == chan);
- ws->pending = NULL;
- } else {
- ws->pending = chan->next;
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- }
-#ifndef NDEBUG
- chan->prev = chan->next = NULL;
-#endif
-
- // mark not pending
- assert_disabled(chan->state == CHAN_PENDING);
- chan->state = CHAN_UNREGISTERED;
- chan->waitset = NULL;
+ bool res = false;
- return chan;
+ if (chan->wait_for) // if a thread is waiting for this specific event
+ res = chan->wait_for == thread;
+ else
+ res = (chan->token & 1 && !thread->token) // incoming token is a request
+ // and a thread is not waiting for a token
+ || (!chan->token && chan != thread->channel) // there's no token
+ // and a thread is not waiting specifically for that event
+ || (chan->token == thread->token && chan == thread->channel);
+ // there is a token and it matches thread's token and event
+ return res;
}
-#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
-/**
- * \brief Poll an incoming UMP endpoint.
- * This is logically part of the UMP endpoint implementation, but placed here
- * for easier inlining.
- */
-static inline void ump_endpoint_poll(struct waitset_chanstate *chan)
+/// Returns a channel with a pending event on the given waitset matching
+/// our thread
+static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
+ struct waitset_chanstate *chan)
{
- /* XXX: calculate location of endpoint from waitset channel state */
- struct ump_endpoint *ep = (struct ump_endpoint *)
- ((char *)chan - offsetof(struct ump_endpoint, waitset_state));
+ struct thread *me = thread_self();
- if (ump_endpoint_can_recv(ep)) {
- errval_t err = waitset_chan_trigger(chan);
- assert(err_is_ok(err)); // should not be able to fail
+ if (chan) { // channel that we wait for
+ if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
+ return chan;
+ }
+ if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
+ return chan;
+ }
+ }
+ // check a waiting queue for matching event
+ for (chan = ws->waiting; chan; ) {
+ if (waitset_check_token(chan, me)) {
+ assert_disabled(chan->state == CHAN_WAITING);
+ return chan;
+ }
+ chan = chan->next;
+ if (chan == ws->waiting)
+ break;
+ }
+ // check a pending queue for matching event
+ for (chan = ws->pending; chan;) {
+ if (waitset_check_token(chan, me)) {
+ assert_disabled(chan->state == CHAN_PENDING);
+ return chan;
+ }
+ chan = chan->next;
+ if (chan == ws->pending)
+ break;
}
+ return NULL;
}
-#endif // CONFIG_INTERCONNECT_DRIVER_UMP
-
void arranet_polling_loop_proxy(void) __attribute__((weak));
void arranet_polling_loop_proxy(void)
assert(err_is_ok(err)); // should not be able to fail
}
-/// Helper function that knows how to poll the given channel, based on its type
-static void poll_channel(struct waitset_chanstate *chan)
-{
- switch (chan->chantype) {
+/// Check polled channels
+void poll_channels_disabled(dispatcher_handle_t handle) {
+ struct dispatcher_generic *dp = get_dispatcher_generic(handle);
+ struct waitset_chanstate *chan;
+
+ if (!dp->polled_channels)
+ return;
+ chan = dp->polled_channels;
+ do {
+ switch (chan->chantype) {
#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
- case CHANTYPE_UMP_IN:
- ump_endpoint_poll(chan);
- break;
+ case CHANTYPE_UMP_IN: {
+ if (ump_endpoint_poll(chan)) {
+ errval_t err = waitset_chan_trigger_disabled(chan, handle);
+ assert(err_is_ok(err)); // should not fail
+ if (!dp->polled_channels) // restart scan
+ return;
+ chan = dp->polled_channels;
+ continue;
+ } else
+ chan = chan->polled_next;
+ } break;
#endif // CONFIG_INTERCONNECT_DRIVER_UMP
-
- case CHANTYPE_LWIP_SOCKET:
- arranet_polling_loop_proxy();
- break;
-
- case CHANTYPE_AHCI:
- poll_ahci(chan);
- break;
-
- default:
- assert(!"invalid channel type to poll!");
- }
+ case CHANTYPE_LWIP_SOCKET:
+ arranet_polling_loop_proxy();
+ break;
+ case CHANTYPE_AHCI:
+ poll_ahci(chan);
+ break;
+ default:
+ assert(!"invalid channel type to poll!");
+ }
+ } while (chan != dp->polled_channels);
}
-// pollcycles_*: arch-specific implementation for polling.
-// Used by get_next_event().
-//
-// pollcycles_reset() -- return the number of pollcycles we want to poll for
-// pollcycles_update() -- update the pollcycles variable. This is needed for
-// implementations where we don't have a cycle counter
-// and we just count the number of polling operations
-// performed
-// pollcycles_expired() -- check if pollcycles have expired
-//
-// We might want to move them to architecture-specific files, and/or create a
-// cleaner interface. For now, I just wanted to keep them out of
-// get_next_event()
-
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
- && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_reset(void)
+/// Re-register a channel (if persistent)
+static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
+ dispatcher_handle_t handle)
{
- cycles_t pollcycles;
-#if defined(__arm__) && !defined(__gem5__)
- reset_cycle_counter();
- pollcycles = waitset_poll_cycles;
-#elif defined(__arm__) && defined(__gem5__)
- pollcycles = 0;
-#elif defined(__aarch64__) && defined(__gem5__)
- pollcycles = 0;
-#else
- pollcycles = cyclecount() + waitset_poll_cycles;
-#endif
- return pollcycles;
-}
+ assert(chan->waitset == ws);
+ if (chan->state == CHAN_PENDING) {
+ dequeue(&ws->pending, chan);
+ } else {
+ assert(chan->state == CHAN_WAITING);
+ dequeue(&ws->waiting, chan);
+ }
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
- && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-cycles_t pollcycles_update(cycles_t pollcycles)
-{
- cycles_t ret = pollcycles;
- #if defined(__arm__) && defined(__gem5__)
- ret++;
- #elif defined(__aarch64__) && defined(__gem5__)
- ret++;
- #endif
- return ret;
+ chan->token = 0;
+ if (chan->chantype == CHANTYPE_UMP_IN
+ || chan->chantype == CHANTYPE_LWIP_SOCKET
+ || chan->chantype == CHANTYPE_AHCI) {
+ enqueue(&ws->polled, chan);
+ enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+ chan->state = CHAN_POLLED;
+ } else {
+ enqueue(&ws->idle, chan);
+ chan->state = CHAN_IDLE;
+ }
}
-#if defined(__ARM_ARCH_7A__) && defined(__GNUC__) \
- && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && __GNUC_PATCHLEVEL__ <= 3
-static __attribute__((noinline, unused))
-#else
-static inline
-#endif
-bool pollcycles_expired(cycles_t pollcycles)
+/// Find a thread that is able to receive an event
+static struct thread * find_recipient(struct waitset *ws,
+ struct waitset_chanstate *channel, struct thread *me)
{
- bool ret;
- #if defined(__arm__) && !defined(__gem5__)
- ret = (cyclecount() > pollcycles || is_cycle_counter_overflow());
- #elif defined(__arm__) && defined(__gem5__)
- ret = pollcycles >= POLL_COUNT;
- #elif defined(__aarch64__) && defined(__gem5__)
- ret = pollcycles >= POLL_COUNT;
- #else
- ret = cyclecount() > pollcycles;
- #endif
- return ret;
+ struct thread *t = ws->waiting_threads;
+
+ if (!t)
+ return NULL;
+ do {
+ if (waitset_check_token(channel, t))
+ return t;
+ t = t->next;
+ } while (t != ws->waiting_threads);
+ return ws->waiting_threads;
}
-static errval_t get_next_event_debug(struct waitset *ws,
- struct event_closure *retclosure, bool debug)
+/// Wake up other thread if there's more pending events
+static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
{
- struct waitset_chanstate *chan;
- bool was_polling = false;
- cycles_t pollcycles;
-
- assert(ws != NULL);
- assert(retclosure != NULL);
-
- // unconditionally disable ourselves and check for events
- // if we decide we have to start polling, we'll jump back up here
- goto check_for_events;
-
- /* ------------ POLLING LOOP; RUNS WHILE ENABLED ------------ */
-polling_loop:
- was_polling = true;
- assert(ws->polling); // this thread is polling
- // get the amount of cycles we want to poll for
- pollcycles = pollcycles_reset();
-
- // while there are no pending events, poll channels
- while (ws->polled != NULL && ws->pending == NULL) {
- struct waitset_chanstate *nextchan = NULL;
- // NB: Polling policy is to return as soon as a pending event
- // appears, not bother looking at the rest of the polling queue
- for (chan = ws->polled;
- chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED
- && ws->pending == NULL;
- chan = nextchan) {
-
- nextchan = chan->next;
- poll_channel(chan);
- // update pollcycles
- pollcycles = pollcycles_update(pollcycles);
- // yield the thread if we exceed the cycle count limit
- if (ws->pending == NULL && pollcycles_expired(pollcycles)) {
- if (debug) {
- if (strcmp(disp_name(), "netd") != 0) {
- // Print the callback trace so that we know which call is leading
- // the schedule removal and
- printf("%s: callstack: %p %p %p %p\n", disp_name(),
- __builtin_return_address(0),
- __builtin_return_address(1),
- __builtin_return_address(2),
- __builtin_return_address(3));
- }
-
- }
- thread_yield();
- pollcycles = pollcycles_reset();
- }
- }
+ if (ws->pending && ws->waiting_threads) {
+ struct thread *t;
- // ensure that we restart polling from the place we left off here,
- // if the next channel is a valid one
- if (nextchan != NULL && nextchan->waitset == ws
- && nextchan->state == CHAN_POLLED) {
- ws->polled = nextchan;
- }
+ t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
+ assert_disabled(t == NULL); // shouldn't see a remote thread
}
+}
- /* ------------ STATE MACHINERY; RUNS WHILE DISABLED ------------ */
-check_for_events: ;
- dispatcher_handle_t handle = disp_disable();
+/**
+ * \brief Get next pending event
+ *
+ * Check if there is a pending event that matches current thread and return it.
+ * Pending events are in a pending queue and in a waiting queue.
+ * A pending event then will be removed from a pending/waiting queue and become
+ * unregistered or, if it's persistent, will be re-registered to an idle queue
+ * or a polled queue (UMP channels) of a waitset.
+ * If there's no pending event, block this thread.
+ * If there's a pending event but it doesn't match our thread, don't remove it
+ * from a pending queue and wake up a matching thread.
+ * If there's no matching thread, add it to a waiting queue.
+ *
+ * \param ws Waitset with sources of events
+ * \param retchannel Holder of returned event
+ * \param retclosure Holder of returned closure
+ * \param waitfor Specific event that we're waiting for (can be NULL)
+ * \param handle Dispatcher's handle
+ * \param debug Debug mode (not used)
+ */
- // are there any pending events on the waitset?
- chan = get_pending_event_disabled(ws);
- if (chan != NULL) {
- // if we need to poll, and we have a blocked thread, wake it up to do so
- if (was_polling && ws->polled != NULL && ws->waiting_threads != NULL) {
- // start a blocked thread polling
- struct thread *t;
- t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
- assert_disabled(t == NULL); // shouldn't see a remote thread
- } else if (was_polling) {
- // I'm stopping polling, and there is nobody else
- assert_disabled(ws->polling);
- ws->polling = false;
+errval_t get_next_event_disabled(struct waitset *ws,
+ struct waitset_chanstate **retchannel, struct event_closure *retclosure,
+ struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
+{
+ struct waitset_chanstate * chan;
+
+ for (;;) {
+ chan = get_pending_event_disabled(ws, waitfor); // get our event
+ if (chan) {
+ *retchannel = chan;
+ *retclosure = chan->closure;
+ chan->wait_for = NULL;
+ chan->token = 0;
+ if (chan->persistent)
+ reregister_channel(ws, chan, handle);
+ else
+ waitset_chan_deregister_disabled(chan, handle);
+ wake_up_other_thread(handle, ws);
+ return SYS_ERR_OK;
}
- disp_enable(handle);
-
- *retclosure = chan->closure;
- return SYS_ERR_OK;
- }
-
- // If we got here and there are channels to poll but no-one is polling,
- // then either we never polled, or we lost a race on the channel we picked.
- // Either way, we'd better start polling again.
- if (ws->polled != NULL && (was_polling || !ws->polling)) {
- if (!was_polling) {
- ws->polling = true;
+ chan = ws->pending; // check a pending queue
+ if (!chan) { // if nothing then wait
+ thread_block_disabled(handle, &ws->waiting_threads);
+ disp_disable();
+ } else { // something but it's not our event
+ if (!ws->waiting_threads) { // no other thread interested in
+ dequeue(&ws->pending, chan);
+ enqueue(&ws->waiting, chan);
+ chan->state = CHAN_WAITING;
+ chan->waitset = ws;
+ } else {
+ // find a matching thread
+ struct thread *t;
+ for (t = ws->waiting_threads; t; ) {
+ if (waitset_check_token(chan, t)) { // match found, wake it
+ ws->waiting_threads = t;
+ t = thread_unblock_one_disabled(handle,
+ &ws->waiting_threads, chan);
+ assert_disabled(t == NULL); // shouldn't see a remote thread
+ break;
+ }
+ t = t->next;
+ if (t == ws->waiting_threads) { // no recipient found
+ dequeue(&ws->pending, chan);
+ enqueue(&ws->waiting, chan);
+ chan->state = CHAN_WAITING;
+ chan->waitset = ws;
+ break;
+ }
+ }
+ }
}
- disp_enable(handle);
- goto polling_loop;
- }
-
- // otherwise block awaiting an event
- chan = thread_block_disabled(handle, &ws->waiting_threads);
-
- if (chan == NULL) {
- // not a real event, just a wakeup to get us to start polling!
- assert(ws->polling);
- goto polling_loop;
- } else {
- *retclosure = chan->closure;
- return SYS_ERR_OK;
}
}
*/
errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
{
- return get_next_event_debug(ws, retclosure, false);
+ dispatcher_handle_t handle = disp_disable();
+ struct waitset_chanstate *channel;
+ errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
+ handle, false);
+ disp_enable(handle);
+ return err;
}
/**
- * \brief Return next event on given waitset, if one is already pending
+ * \brief Check if there is an event pending on given waitset
*
* This is essentially a non-blocking variant of get_next_event(). It should be
* used with great care, to avoid the creation of busy-waiting loops.
*
* \param ws Waitset
- * \param retclosure Pointer to storage space for returned event closure
*
* \returns LIB_ERR_NO_EVENT if nothing is pending
*/
-errval_t check_for_event(struct waitset *ws, struct event_closure *retclosure)
+static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
{
struct waitset_chanstate *chan;
- int pollcount = 0;
-
- assert(ws != NULL);
- assert(retclosure != NULL);
- recheck: ;
- // are there any pending events on the waitset?
- dispatcher_handle_t handle = disp_disable();
- chan = get_pending_event_disabled(ws);
- disp_enable(handle);
+ poll_channels_disabled(handle);
+ chan = get_pending_event_disabled(ws, NULL);
if (chan != NULL) {
- *retclosure = chan->closure;
return SYS_ERR_OK;
}
+ return LIB_ERR_NO_EVENT;
+}
- // if there are no pending events, poll all channels once
- if (ws->polled != NULL && pollcount++ == 0) {
- for (chan = ws->polled;
- chan != NULL && chan->waitset == ws && chan->state == CHAN_POLLED;
- chan = chan->next) {
-
- poll_channel(chan);
- if (ws->pending != NULL) {
- goto recheck;
- }
-
- if (chan->next == ws->polled) { // reached the start of the queue
- break;
- }
- }
- }
+errval_t check_for_event(struct waitset *ws)
+{
+ errval_t err;
- return LIB_ERR_NO_EVENT;
+ assert(ws != NULL);
+ dispatcher_handle_t handle = disp_disable();
+ err = check_for_event_disabled(ws, handle);
+ disp_enable(handle);
+ return err;
}
/**
errval_t event_dispatch_debug(struct waitset *ws)
{
struct event_closure closure;
- errval_t err = get_next_event_debug(ws, &closure, false);
+ struct waitset_chanstate *channel;
+ dispatcher_handle_t handle = disp_disable();
+ errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+ true);
+ disp_enable(handle);
if (err_is_fail(err)) {
return err;
}
assert(closure.handler != NULL);
-// printf("%s: event_dispatch: %p: \n", disp_name(), closure.handler);
-
-
closure.handler(closure.arg);
return SYS_ERR_OK;
}
/**
+ * \brief Dispatch events until a specific event is received
+ *
+ * Wait for events and dispatch them. If a specific event comes, don't call
+ * a closure, just return.
+ *
+ * \param ws Waitset
+ * \param waitfor Event, that we are waiting for
+ * \param error_var Error variable that can be changed by closures
+ */
+
+errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
+ errval_t *error_var)
+{
+ assert(waitfor->waitset == ws);
+ for (;;) {
+ struct event_closure closure;
+ struct waitset_chanstate *channel;
+
+ dispatcher_handle_t handle = disp_disable();
+ errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
+ handle, false);
+ disp_enable(handle);
+ if (err_is_fail(err)) {
+ assert(0);
+ return err;
+ }
+ if (channel == waitfor) {
+ return SYS_ERR_OK;
+ }
+ assert(!channel->wait_for);
+ assert(closure.handler != NULL);
+ closure.handler(closure.arg);
+ if (err_is_fail(*error_var))
+ return *error_var;
+ }
+}
+
+/**
* \brief check and dispatch next event on given waitset
*
* Check if there is any pending activity on some channel, or deferred
*/
errval_t event_dispatch_non_block(struct waitset *ws)
{
- assert(ws != NULL);
+ struct waitset_chanstate *channel;
struct event_closure closure;
- errval_t err = check_for_event(ws, &closure);
+ assert(ws != NULL);
+
+ // are there any pending events on the waitset?
+ dispatcher_handle_t handle = disp_disable();
+ errval_t err = check_for_event_disabled(ws, handle);
if (err_is_fail(err)) {
+ disp_enable(handle);
return err;
}
-
+ err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+ false);
+ if (err_is_fail(err))
+ return err;
+ disp_enable(handle);
assert(closure.handler != NULL);
closure.handler(closure.arg);
return SYS_ERR_OK;
#ifndef NDEBUG
chan->prev = chan->next = NULL;
#endif
+ chan->persistent = false;
+ chan->token = 0;
+ chan->wait_for = NULL;
}
/**
}
chan->waitset = ws;
+ chan->token = 0;
// channel must not already be registered!
assert_disabled(chan->next == NULL && chan->prev == NULL);
chan->closure = closure;
// enqueue this channel on the waitset's queue of idle channels
- if (ws->idle == NULL) {
- chan->next = chan->prev = chan;
- ws->idle = chan;
- } else {
- chan->next = ws->idle;
- chan->prev = chan->next->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
+ enqueue(&ws->idle, chan);
chan->state = CHAN_IDLE;
return SYS_ERR_OK;
}
chan->waitset = ws;
+ chan->token = 0;
// channel must not already be registered!
assert_disabled(chan->next == NULL && chan->prev == NULL);
chan->closure = closure;
// enqueue this channel on the waitset's queue of polled channels
- if (ws->polled == NULL) {
- chan->next = chan->prev = chan;
- ws->polled = chan;
- if (ws->waiting_threads != NULL && !ws->polling) {
- // start a blocked thread polling
- ws->polling = true;
- struct thread *t;
- t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
- assert_disabled(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
- }
- } else {
- chan->next = ws->polled;
- chan->prev = chan->next->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
+ enqueue(&ws->polled, chan);
chan->state = CHAN_POLLED;
+ enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
return SYS_ERR_OK;
}
}
/**
- * \brief Mark an idle channel as polled
- *
- * The given channel will periodically have its poll function called.
- * The channel must already be registered.
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_start_polling(struct waitset_chanstate *chan)
-{
- errval_t err = SYS_ERR_OK;
-
- dispatcher_handle_t handle = disp_disable();
-
- struct waitset *ws = chan->waitset;
- if (ws == NULL) {
- err = LIB_ERR_CHAN_NOT_REGISTERED;
- goto out;
- }
-
- assert(chan->state != CHAN_UNREGISTERED);
- if (chan->state != CHAN_IDLE) {
- goto out; // no-op if polled or pending
- }
-
- // remove from idle queue
- if (chan->next == chan) {
- assert(chan->prev == chan);
- assert(ws->idle == chan);
- ws->idle = NULL;
- } else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (ws->idle == chan) {
- ws->idle = chan->next;
- }
- }
-
- // enqueue on polled queue
- if (ws->polled == NULL) {
- ws->polled = chan;
- chan->next = chan->prev = chan;
- if (ws->waiting_threads != NULL && !ws->polling) {
- // start a blocked thread polling
- ws->polling = true;
- struct thread *t;
- t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
- assert(t == NULL); // shouldn't see a remote thread: waitsets are per-dispatcher
- }
- } else {
- chan->next = ws->polled;
- chan->prev = ws->polled->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
- chan->state = CHAN_POLLED;
-
-out:
- disp_enable(handle);
- return err;
-}
-
-/**
- * \brief Stop polling the given channel, making it idle again
- *
- * \param chan Waitset's per-channel state
- */
-errval_t waitset_chan_stop_polling(struct waitset_chanstate *chan)
-{
- errval_t err = SYS_ERR_OK;
-
- dispatcher_handle_t handle = disp_disable();
-
- struct waitset *ws = chan->waitset;
- if (ws == NULL) {
- err = LIB_ERR_CHAN_NOT_REGISTERED;
- goto out;
- }
-
- assert(chan->state != CHAN_UNREGISTERED);
- if (chan->state != CHAN_POLLED) {
- goto out; // no-op if idle or pending
- }
-
- // remove from polled queue
- if (chan->next == chan) {
- assert(chan->prev == chan);
- assert(ws->polled == chan);
- ws->polled = NULL;
- } else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (ws->polled == chan) {
- ws->polled = chan->next;
- }
- }
-
- // enqueue on idle queue
- if (ws->idle == NULL) {
- ws->idle = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = ws->idle;
- chan->prev = ws->idle->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
- chan->state = CHAN_IDLE;
-
-out:
- disp_enable(handle);
- return err;
-}
-
-/**
* \brief Cancel a previous callback registration
*
* Remove the registration for a callback on the given channel.
*
* \param chan Waitset's per-channel state
*/
-errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan)
+errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
+ dispatcher_handle_t handle)
{
assert_disabled(chan != NULL);
struct waitset *ws = chan->waitset;
chan->waitset = NULL;
assert_disabled(chan->next != NULL && chan->prev != NULL);
- if (chan->next == chan) {
- // only thing in the list: must be the head
- assert_disabled(chan->prev == chan);
- switch (chan->state) {
- case CHAN_IDLE:
- assert_disabled(chan == ws->idle);
- ws->idle = NULL;
- break;
-
- case CHAN_POLLED:
- assert_disabled(chan == ws->polled);
- ws->polled = NULL;
- break;
-
- case CHAN_PENDING:
- assert_disabled(chan == ws->pending);
- ws->pending = NULL;
- break;
+ switch (chan->state) {
+ case CHAN_IDLE:
+ dequeue(&ws->idle, chan);
+ break;
- default:
- assert_disabled(!"invalid channel state in deregister");
- }
- } else {
- assert_disabled(chan->prev != chan);
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- switch (chan->state) {
- case CHAN_IDLE:
- if (chan == ws->idle) {
- ws->idle = chan->next;
- }
- break;
+ case CHAN_POLLED:
+ dequeue(&ws->polled, chan);
+ dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
+ break;
- case CHAN_POLLED:
- if (chan == ws->polled) {
- ws->polled = chan->next;
- }
- break;
+ case CHAN_PENDING:
+ dequeue(&ws->pending, chan);
+ break;
- case CHAN_PENDING:
- if (chan == ws->pending) {
- ws->pending = chan->next;
- }
- break;
+ case CHAN_WAITING:
+ dequeue(&ws->waiting, chan);
+ break;
- default:
- assert_disabled(!"invalid channel state in deregister");
- }
+ default:
+ assert_disabled(!"invalid channel state in deregister");
}
chan->state = CHAN_UNREGISTERED;
-
-#ifndef NDEBUG
- chan->prev = chan->next = NULL;
-#endif
-
+ chan->wait_for = NULL;
return SYS_ERR_OK;
}
errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
{
dispatcher_handle_t handle = disp_disable();
- errval_t err = waitset_chan_deregister_disabled(chan);
+ errval_t err = waitset_chan_deregister_disabled(chan, handle);
disp_enable(handle);
return err;
}
switch(chan->state) {
case CHAN_IDLE:
- if (chan->next == chan) {
- assert(chan->prev == chan);
- assert(ws->idle == chan);
- ws->idle = NULL;
- } else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (ws->idle == chan) {
- ws->idle = chan->next;
- }
- }
-
- if (new_ws->idle == NULL) {
- new_ws->idle = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = new_ws->idle;
- chan->prev = new_ws->idle->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
+ dequeue(&ws->idle, chan);
+ enqueue(&new_ws->idle, chan);
break;
case CHAN_POLLED:
- if (chan->next == chan) {
- assert(chan->prev == chan);
- assert(ws->polled == chan);
- ws->polled = NULL;
- } else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (ws->polled == chan) {
- ws->polled = chan->next;
- }
- }
-
- if (new_ws->polled == NULL) {
- new_ws->polled = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = new_ws->polled;
- chan->prev = new_ws->polled->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
+ dequeue(&ws->polled, chan);
+ enqueue(&new_ws->polled, chan);
break;
case CHAN_PENDING:
- if (chan->next == chan) {
- assert(chan->prev == chan);
- assert(ws->pending == chan);
- ws->pending = NULL;
- } else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (ws->pending == chan) {
- ws->pending = chan->next;
- }
- }
+ dequeue(&ws->pending, chan);
+ enqueue(&new_ws->pending, chan);
+ break;
- if (new_ws->pending == NULL) {
- new_ws->pending = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = new_ws->pending;
- chan->prev = new_ws->pending->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
- }
+ case CHAN_WAITING:
+ dequeue(&ws->waiting, chan);
+ enqueue(&new_ws->waiting, chan);
break;
case CHAN_UNREGISTERED:
}
// remove from previous queue (either idle or polled)
- if (chan->next == chan) {
- assert_disabled(chan->prev == chan);
- if (chan->state == CHAN_IDLE) {
- assert_disabled(ws->idle == chan);
- ws->idle = NULL;
- } else {
- assert_disabled(chan->state == CHAN_POLLED);
- assert_disabled(ws->polled == chan);
- ws->polled = NULL;
- }
+ if (chan->state == CHAN_IDLE) {
+ dequeue(&ws->idle, chan);
} else {
- chan->prev->next = chan->next;
- chan->next->prev = chan->prev;
- if (chan->state == CHAN_IDLE) {
- if (ws->idle == chan) {
- ws->idle = chan->next;
- }
- } else {
- assert_disabled(chan->state == CHAN_POLLED);
- if (ws->polled == chan) {
- ws->polled = chan->next;
- }
- }
+ assert_disabled(chan->state == CHAN_POLLED);
+ dequeue(&ws->polled, chan);
+ dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
}
+ // else mark channel pending and move to end of pending event queue
+ enqueue(&ws->pending, chan);
+ chan->state = CHAN_PENDING;
+
// is there a thread blocked on this waitset? if so, awaken it with the event
- if (ws->waiting_threads != NULL) {
- chan->waitset = NULL;
-#ifndef NDEBUG
- chan->prev = chan->next = NULL;
-#endif
- chan->state = CHAN_UNREGISTERED;
+ struct thread *thread = find_recipient(ws, chan, thread_self());
+ if (thread) {
struct thread *t;
+ ws->waiting_threads = thread;
t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
assert_disabled(t == NULL);
- return SYS_ERR_OK;
- }
-
- // else mark channel pending and move to end of pending event queue
- chan->state = CHAN_PENDING;
- if (ws->pending == NULL) {
- ws->pending = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = ws->pending;
- chan->prev = ws->pending->prev;
- assert_disabled(ws->pending->next != NULL);
- assert_disabled(ws->pending->prev != NULL);
- assert_disabled(chan->prev != NULL);
- chan->next->prev = chan;
- chan->prev->next = chan;
}
-
return SYS_ERR_OK;
}
*/
errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
{
- dispatcher_handle_t disp = disp_disable();
- errval_t err = waitset_chan_trigger_disabled(chan, disp);
- disp_enable(disp);
+ dispatcher_handle_t handle = disp_disable();
+ errval_t err = waitset_chan_trigger_disabled(chan, handle);
+ disp_enable(handle);
return err;
}
// set closure
chan->closure = closure;
+ // mark channel pending and place on end of pending event queue
+ chan->waitset = ws;
+ enqueue(&ws->pending, chan);
+ // if (first)
+ // ws->pending = chan;
+ chan->state = CHAN_PENDING;
+
// is there a thread blocked on this waitset? if so, awaken it with the event
- if (ws->waiting_threads != NULL) {
+ struct thread *thread = find_recipient(ws, chan, thread_self());
+ if (thread) {
struct thread *t;
+ ws->waiting_threads = thread;
t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
assert_disabled(t == NULL);
- return SYS_ERR_OK;
- }
-
- // mark channel pending and place on end of pending event queue
- chan->waitset = ws;
- chan->state = CHAN_PENDING;
- if (ws->pending == NULL) {
- ws->pending = chan;
- chan->next = chan->prev = chan;
- } else {
- chan->next = ws->pending;
- chan->prev = ws->pending->prev;
- chan->next->prev = chan;
- chan->prev->next = chan;
}
-
- assert(ws->pending->prev != NULL && ws->pending->next != NULL);
-
return SYS_ERR_OK;
}