3 * \brief Waitset and low-level event handling mechanism
5 * A "wait set" is a collection of channels to wait on, much like an
6 * FDSET in POSIX. There should be a default, static wait set for each
7 * dispatcher. Threads which wait for events specify the wait set they
12 * Copyright (c) 2009-2012, ETH Zurich.
13 * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14 * All rights reserved.
16 * This file is distributed under the terms in the attached LICENSE file.
17 * If you do not find this file, copies can be found by writing to:
18 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
31 #include <flounder/flounder.h>
33 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34 # include <barrelfish/ump_endpoint.h>
37 /// Dequeue a chanstate from a queue
/// The queue is a circular doubly-linked list threaded through the chanstate's
/// next/prev fields; *queue points at one of its elements (the head).
38 static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
// chan is its own successor, so it is the only element on the queue
40 if (chan->next == chan) {
41 assert(chan->prev == chan);
42 assert(*queue == chan);
// unlink chan by making its two neighbours point at each other
45 chan->prev->next = chan->next;
46 chan->next->prev = chan->prev;
// leave chan without any queue links
51 chan->prev = chan->next = NULL;
54 /// Enqueue a chanstate on a queue
/// Links chan into the circular next/prev list whose head is *queue.
55 static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
// a chanstate linked only to itself forms a one-element circular list
59 chan->next = chan->prev = chan;
// chan's predecessor becomes the element currently before the head (the tail);
// NOTE(review): chan->next is presumably set to *queue on a line not shown
// here, mirroring enqueue_polled() - confirm
62 chan->prev = (*queue)->prev;
// fix up both neighbours so they point back at chan
63 chan->next->prev = chan;
64 chan->prev->next = chan;
68 /// Dequeue a chanstate from polled queue
/// The polled queue is a separate circular doubly-linked list threaded through
/// the polled_next/polled_prev fields; *queue points at its head.
69 static void dequeue_polled(struct waitset_chanstate **queue,
70 struct waitset_chanstate *chan)
// chan is its own successor, so it is the only element on the polled queue
72 if (chan->polled_next == chan) {
73 assert(chan->polled_prev == chan);
74 assert(*queue == chan);
// unlink chan by making its two neighbours point at each other
77 chan->polled_prev->polled_next = chan->polled_next;
78 chan->polled_next->polled_prev = chan->polled_prev;
// move the head on to chan's successor
80 *queue = chan->polled_next;
// leave chan without any polled-queue links
83 chan->polled_prev = chan->polled_next = NULL;
86 /// Enqueue a chanstate on polled queue
/// Inserts chan into the circular polled_next/polled_prev list directly
/// before the current head *queue, i.e. at the tail of the ring.
87 static void enqueue_polled(struct waitset_chanstate **queue,
88 struct waitset_chanstate *chan)
// a chanstate linked only to itself forms a one-element circular list
92 chan->polled_next = chan->polled_prev = chan;
// place chan between the old tail ((*queue)->polled_prev) and the head
94 chan->polled_next = *queue;
95 chan->polled_prev = (*queue)->polled_prev;
// fix up both neighbours so they point back at chan
96 chan->polled_next->polled_prev = chan;
97 chan->polled_prev->polled_next = chan;
102 * \brief Initialise a new waitset
// Starts the waitset off empty: all four channel queues (pending, polled,
// idle, waiting) and the list of blocked threads are set to NULL.
104 void waitset_init(struct waitset *ws)
107 ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108 ws->waiting_threads = NULL;
112 * \brief Destroy a previously initialised waitset
// Detaches every idle and polled channel from ws. Refuses with
// LIB_ERR_WAITSET_IN_USE while ws still has pending channels or blocked threads.
114 errval_t waitset_destroy(struct waitset *ws)
118 // FIXME: do we want to support cancelling all the pending events/channels?
119 if (ws->pending || ws->waiting_threads) {
120 return LIB_ERR_WAITSET_IN_USE;
123 // remove idle and polled channels from waitset
// `next` carries the successor across iterations, because the loop body wipes
// chan's own links (its assignment is on a line not shown here)
124 struct waitset_chanstate *chan, *next;
125 for (chan = ws->idle; chan != NULL; chan = next) {
127 assert(chan->state == CHAN_IDLE);
128 assert(chan->waitset == ws);
// cut the channel loose from this waitset
129 chan->waitset = NULL;
130 chan->next = chan->prev = NULL;
// the list is circular: reaching the head again means every element was visited
132 if (next == ws->idle) {
// same walk over the polled queue
// NOTE(review): the visible lines clear only next/prev; polled_next and
// polled_prev are not touched here - confirm whether these channels must also
// be taken off the dispatcher's polled_channels list
138 for (chan = ws->polled; chan != NULL; chan = next) {
140 assert(chan->state == CHAN_POLLED);
141 assert(chan->waitset == ws);
142 chan->waitset = NULL;
143 chan->next = chan->prev = NULL;
145 if (next == ws->polled) {
154 /// Check if a thread can receive an event
/// The decision is made from chan->wait_for, chan->token, thread->token and
/// thread->channel, and is collected in `res`.
155 static bool waitset_check_token(struct waitset_chanstate *chan,
156 struct thread *thread)
160 if (chan->wait_for) // if a thread is waiting for this specific event
// only the thread recorded in wait_for may take the event
161 res = chan->wait_for == thread;
// otherwise any one of the three token conditions below is sufficient;
// `chan->token & 1 && !thread->token` groups as
// `(chan->token & 1) && !thread->token`, because & binds tighter than &&
163 res = (chan->token & 1 && !thread->token) // incoming token is a request
164 // and a thread is not waiting for a token
165 || (!chan->token && chan != thread->channel) // there's no token
166 // and a thread is not waiting specifically for that event
167 || (chan->token == thread->token && chan == thread->channel);
168 // there is a token and it matches thread's token and event
172 /// Returns a channel with a pending event on the given waitset matching the calling thread
/// Candidates are accepted only if waitset_check_token() passes for
/// thread_self(). `chan` may name one specific channel to test first; callers
/// in this file pass NULL when they have no such channel.
174 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
175 struct waitset_chanstate *chan)
177 struct thread *me = thread_self();
179 if (chan) { // channel that we wait for
// the requested channel qualifies in either the pending or the waiting state
180 if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
183 if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
187 // check a waiting queue for matching event
// `chan` is reused as the cursor; the for-header has no step expression, so
// advancing happens inside the body (on a line not shown here)
188 for (chan = ws->waiting; chan; ) {
189 if (waitset_check_token(chan, me)) {
190 assert_disabled(chan->state == CHAN_WAITING);
// the queue is circular: being back at the head ends the scan
194 if (chan == ws->waiting)
197 // check a pending queue for matching event
198 for (chan = ws->pending; chan;) {
199 if (waitset_check_token(chan, me)) {
200 assert_disabled(chan->state == CHAN_PENDING);
// the queue is circular: being back at the head ends the scan
204 if (chan == ws->pending)
// Fallback used by poll_channels_disabled() for CHANTYPE_LWIP_SOCKET channels.
// The symbol is declared weak, so a non-weak definition linked into the
// program replaces this one; this default only panics.
210 void arranet_polling_loop_proxy(void) __attribute__((weak));
211 void arranet_polling_loop_proxy(void)
213 USER_PANIC("Network polling not available without Arranet!\n");
216 /// Check polled channels
/// Walks the dispatcher's ring of polled channels (linked via polled_next),
/// starting at dp->polled_channels, and handles each entry by channel type.
217 void poll_channels_disabled(dispatcher_handle_t handle) {
218 struct dispatcher_generic *dp = get_dispatcher_generic(handle);
219 struct waitset_chanstate *chan;
// no channel is registered for polling
221 if (!dp->polled_channels)
223 chan = dp->polled_channels;
225 switch (chan->chantype) {
226 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
227 case CHANTYPE_UMP_IN: {
// when ump_endpoint_poll() reports true, raise the event on this channel
228 if (ump_endpoint_poll(chan)) {
229 errval_t err = waitset_chan_trigger_disabled(chan, handle);
230 assert(err_is_ok(err)); // should not fail
// triggering takes a polled channel off the polled list, so the cursor is
// re-read from the head of the list
231 if (!dp->polled_channels) // restart scan
233 chan = dp->polled_channels;
// step to the next channel in the ring
236 chan = chan->polled_next;
238 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
239 case CHANTYPE_LWIP_SOCKET:
// panics unless the weak default of this function has been overridden
240 arranet_polling_loop_proxy();
// always-false assertion: reached for a channel type that cannot be polled
244 assert(!"invalid channel type to poll!");
// stop once the walk is back at the head of the ring
246 } while (chan != dp->polled_channels);
249 /// Re-register a channel (if persistent)
/// Takes chan off the pending or waiting queue of ws and puts it back on the
/// polled queue (CHANTYPE_UMP_IN) or on the idle queue. This function does not
/// test chan->persistent itself; get_next_event_disabled() does so before
/// calling it.
250 static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
251 dispatcher_handle_t handle)
253 assert(chan->waitset == ws);
// leave whichever queue the channel is currently on
254 if (chan->state == CHAN_PENDING) {
255 dequeue(&ws->pending, chan);
257 assert(chan->state == CHAN_WAITING);
258 dequeue(&ws->waiting, chan);
// UMP-in channels go back on the waitset's polled queue and on the
// dispatcher-wide polled_channels ring
262 if (chan->chantype == CHANTYPE_UMP_IN) {
263 enqueue(&ws->polled, chan);
264 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
265 chan->state = CHAN_POLLED;
// idle-queue path
267 enqueue(&ws->idle, chan);
268 chan->state = CHAN_IDLE;
272 /// Find a thread that is able to receive an event
/// Goes round the ring of threads blocked on ws, testing each one against
/// `channel` with waitset_check_token().
/// NOTE(review): `me` is not used in any line visible here - confirm.
273 static struct thread * find_recipient(struct waitset *ws,
274 struct waitset_chanstate *channel, struct thread *me)
276 struct thread *t = ws->waiting_threads;
281 if (waitset_check_token(channel, t))
// the loop ends when the cursor is back at the head of the ring
284 } while (t != ws->waiting_threads);
// loop completed: hand back the head of the waiting-thread list
285 return ws->waiting_threads;
288 /// Wake up other thread if there's more pending events
289 static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
// act only when there is both a pending channel and a blocked thread
291 if (ws->pending && ws->waiting_threads) {
// unblock one thread from the waiting list; the third argument is NULL here,
// whereas the trigger paths in this file pass the triggering channel
294 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
295 assert_disabled(t == NULL); // shouldn't see a remote thread
300 * \brief Get next pending event
302 * Check if there is a pending event that matches current thread and return it.
303 * Pending events are in a pending queue and in a waiting queue.
304 * A pending event then will be removed from a pending/waiting queue and become
305 * unregistered or, if it's persistent, will be re-registered to an idle queue
306 * or a polled queue (UMP channels) of a waitset.
307 * If there's no pending event, block this thread.
308 * If there's a pending event but it doesn't match our thread, don't remove it
309 * from a pending queue and wake up a matching thread.
310 * If there's no matching thread, add it to a waiting queue.
312 * \param ws Waitset with sources of events
313 * \param retchannel Holder of returned event
314 * \param retclosure Holder of returned closure
315 * \param waitfor Specific event that we're waiting for (can be NULL)
316 * \param handle Dispatcher's handle
317 * \param debug Debug mode (not used)
// Core event fetch. Callers in this file obtain `handle` from disp_disable()
// immediately before calling. The visible lines fall into two parts: the
// event was found (hand it out), or it was not (block, or route the head of
// the pending queue to another thread or to the waiting queue).
320 errval_t get_next_event_disabled(struct waitset *ws,
321 struct waitset_chanstate **retchannel, struct event_closure *retclosure,
322 struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
324 struct waitset_chanstate * chan;
327 chan = get_pending_event_disabled(ws, waitfor); // get our event
// hand the registered closure to the caller and drop any thread binding
330 *retclosure = chan->closure;
331 chan->wait_for = NULL;
// persistent channels go back to the idle/polled queue, others are deregistered
333 if (chan->persistent)
334 reregister_channel(ws, chan, handle);
336 waitset_chan_deregister_disabled(chan, handle);
// more events may be pending: give another blocked thread a chance
337 wake_up_other_thread(handle, ws);
340 chan = ws->pending; // check a pending queue
341 if (!chan) { // if nothing then wait
// put this thread on the waitset's list of waiting threads
342 thread_block_disabled(handle, &ws->waiting_threads);
344 } else { // something but it's not our event
345 if (!ws->waiting_threads) { // no other thread interested in
// park the event on the waiting queue
346 dequeue(&ws->pending, chan);
347 enqueue(&ws->waiting, chan);
348 chan->state = CHAN_WAITING;
351 // find a matching thread
353 for (t = ws->waiting_threads; t; ) {
354 if (waitset_check_token(chan, t)) { // match found, wake it
// make the matching thread the head of the list, then unblock one thread
// from that list, passing the channel along
355 ws->waiting_threads = t;
356 t = thread_unblock_one_disabled(handle,
357 &ws->waiting_threads, chan);
358 assert_disabled(t == NULL); // shouldn't see a remote thread
// the cursor is back at the head: every waiting thread was tried
362 if (t == ws->waiting_threads) { // no recipient found
363 dequeue(&ws->pending, chan);
364 enqueue(&ws->waiting, chan);
365 chan->state = CHAN_WAITING;
376 * \brief Wait for (block) and return next event on given waitset
378 * Wait until something happens, either activity on some channel, or a deferred
379 * call, and then return the corresponding closure. This is the core of the
380 * event-handling system.
383 * \param retclosure Pointer to storage space for returned event closure
// Wrapper for callers that run enabled: disables the dispatcher and calls
// get_next_event_disabled() with no specific channel to wait for (NULL).
385 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
387 dispatcher_handle_t handle = disp_disable();
// filled in through the retchannel out-parameter
388 struct waitset_chanstate *channel;
389 errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
398 * \brief Check if there is an event pending on given waitset
400 * This is essentially a non-blocking variant of get_next_event(). It should be
401 * used with great care, to avoid the creation of busy-waiting loops.
405 * \returns LIB_ERR_NO_EVENT if nothing is pending
// Non-blocking probe, run with the dispatcher already disabled: polls the
// polled channels, then looks for an event the calling thread could take.
407 static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
409 struct waitset_chanstate *chan;
// polling first lets polled channels be triggered before the lookup
411 poll_channels_disabled(handle);
// NULL: no particular channel is being waited for
412 chan = get_pending_event_disabled(ws, NULL);
416 return LIB_ERR_NO_EVENT;
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to check_for_event_disabled().
419 errval_t check_for_event(struct waitset *ws)
424 dispatcher_handle_t handle = disp_disable();
425 err = check_for_event_disabled(ws, handle);
431 * \brief Wait for (block) and dispatch next event on given waitset
433 * Wait until something happens, either activity on some channel, or deferred
434 * call, and then call the corresponding closure.
// Fetches the next event with get_next_event() and then invokes the returned
// closure's handler with the closure's argument.
439 errval_t event_dispatch(struct waitset *ws)
441 struct event_closure closure;
442 errval_t err = get_next_event(ws, &closure);
443 if (err_is_fail(err)) {
// a delivered event is expected to carry a handler
447 assert(closure.handler != NULL);
448 closure.handler(closure.arg);
// Variant of event_dispatch() that disables the dispatcher itself and calls
// get_next_event_disabled() directly instead of going through get_next_event().
452 errval_t event_dispatch_debug(struct waitset *ws)
454 struct event_closure closure;
// filled in through the retchannel out-parameter
455 struct waitset_chanstate *channel;
456 dispatcher_handle_t handle = disp_disable();
457 errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
460 if (err_is_fail(err)) {
// a delivered event is expected to carry a handler
464 assert(closure.handler != NULL);
465 closure.handler(closure.arg);
470 * \brief Dispatch events until a specific event is received
472 * Wait for events and dispatch them. If a specific event comes, don't call
473 * a closure, just return.
476 * \param waitfor Event, that we are waiting for
477 * \param error_var Error variable that can be changed by closures
// Fetches events for the calling thread with `waitfor` as the channel of
// interest. Events from other channels have their closure run here; the
// event from `waitfor` itself is compared against and treated separately.
480 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
// the awaited channel must be registered on this very waitset
483 assert(waitfor->waitset == ws);
485 struct event_closure closure;
486 struct waitset_chanstate *channel;
488 dispatcher_handle_t handle = disp_disable();
489 errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
492 if (err_is_fail(err)) {
// the awaited event arrived
496 if (channel == waitfor) {
// a channel handed out by get_next_event_disabled() has had wait_for cleared
499 assert(!channel->wait_for);
500 assert(closure.handler != NULL);
501 closure.handler(closure.arg);
// *error_var is inspected after each handler has run
502 if (err_is_fail(*error_var))
508 * \brief check and dispatch next event on given waitset
510 * Check if there is any pending activity on some channel, or deferred
511 * call, and then call the corresponding closure.
513 * Do not wait! In case of no pending events, return err LIB_ERR_NO_EVENT.
// Non-blocking dispatch: first probes for an event with
// check_for_event_disabled(), then fetches it with get_next_event_disabled()
// and runs its closure.
517 errval_t event_dispatch_non_block(struct waitset *ws)
519 struct waitset_chanstate *channel;
520 struct event_closure closure;
524 // are there any pending events on the waitset?
525 dispatcher_handle_t handle = disp_disable();
526 errval_t err = check_for_event_disabled(ws, handle);
527 if (err_is_fail(err)) {
// probe succeeded: fetch the event (no specific channel requested)
531 err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
533 if (err_is_fail(err))
// a delivered event is expected to carry a handler
536 assert(closure.handler != NULL);
537 closure.handler(closure.arg);
544 * "Private" functions that are called only by the channel implementations
548 * \brief Initialise per-channel waitset state
550 * \param chan Channel state
551 * \param chantype Channel type
// Puts chan into its unregistered starting state: no waitset, no queue links,
// not persistent, no thread bound via wait_for; records the channel type.
553 void waitset_chanstate_init(struct waitset_chanstate *chan,
554 enum ws_chantype chantype)
556 assert(chan != NULL);
557 chan->waitset = NULL;
558 chan->chantype = chantype;
559 chan->state = CHAN_UNREGISTERED;
561 chan->prev = chan->next = NULL;
563 chan->persistent = false;
565 chan->wait_for = NULL;
569 * \brief Destroy previously-initialised per-channel waitset state
570 * \param chan Channel state
// Deregisters chan if it is still attached to a waitset.
572 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
574 assert(chan != NULL);
// a non-NULL waitset pointer means the channel is still registered
575 if (chan->waitset != NULL) {
576 errval_t err = waitset_chan_deregister(chan);
577 assert(err_is_ok(err)); // can't fail if registered
582 * \brief Register a closure to be called when a channel is triggered
584 * In the Future, call the closure on a thread associated with the waitset
585 * when the channel is triggered. Only one closure may be registered per
586 * channel state at any one time.
587 * This function must only be called when disabled.
590 * \param chan Waitset's per-channel state
591 * \param closure Event handler
// Attaches `closure` to chan and places chan on ws's idle queue. Returns
// LIB_ERR_CHAN_ALREADY_REGISTERED if chan already belongs to a waitset.
593 errval_t waitset_chan_register_disabled(struct waitset *ws,
594 struct waitset_chanstate *chan,
595 struct event_closure closure)
// a non-NULL waitset pointer marks an existing registration
597 if (chan->waitset != NULL) {
598 return LIB_ERR_CHAN_ALREADY_REGISTERED;
604 // channel must not already be registered!
605 assert_disabled(chan->next == NULL && chan->prev == NULL);
606 assert_disabled(chan->state == CHAN_UNREGISTERED);
608 // this is probably insane! :)
609 assert_disabled(closure.handler != NULL);
// remember what to run when the channel fires
612 chan->closure = closure;
614 // enqueue this channel on the waitset's queue of idle channels
615 enqueue(&ws->idle, chan);
616 chan->state = CHAN_IDLE;
622 * \brief Register a closure on a channel, and mark the channel as polled
624 * In the Future, call the closure on a thread associated with the waitset
625 * when the channel is triggered. Only one closure may be registered per
626 * channel state at any one time. Additionally, mark the channel as polled.
627 * This function must only be called when disabled.
630 * \param chan Waitset's per-channel state
631 * \param closure Event handler
632 * \param handle Current dispatcher handle
// Attaches `closure` to chan and places chan on ws's polled queue and on the
// dispatcher-wide polled_channels ring. Returns
// LIB_ERR_CHAN_ALREADY_REGISTERED if chan already belongs to a waitset.
634 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
635 struct waitset_chanstate *chan,
636 struct event_closure closure,
637 dispatcher_handle_t handle)
// a non-NULL waitset pointer marks an existing registration
639 if (chan->waitset != NULL) {
640 return LIB_ERR_CHAN_ALREADY_REGISTERED;
646 // channel must not already be registered!
647 assert_disabled(chan->next == NULL && chan->prev == NULL);
648 assert_disabled(chan->state == CHAN_UNREGISTERED);
// remember what to run when the channel fires
651 chan->closure = closure;
653 // enqueue this channel on the waitset's queue of polled channels
654 enqueue(&ws->polled, chan);
655 chan->state = CHAN_POLLED;
// also link it into the list that poll_channels_disabled() walks
656 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
662 * \brief Register a closure to be called when a channel is triggered
664 * In the Future, call the closure on a thread associated with the waitset
665 * when the channel is triggered. Only one closure may be registered per
666 * channel state at any one time.
667 * This function must only be called when enabled.
670 * \param chan Waitset's per-channel state
671 * \param closure Event handler
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to waitset_chan_register_disabled().
673 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
674 struct event_closure closure)
676 dispatcher_handle_t handle = disp_disable();
677 errval_t err = waitset_chan_register_disabled(ws, chan, closure);
683 * \brief Register a closure on a channel, and mark the channel as polled
685 * In the Future, call the closure on a thread associated with the waitset
686 * when the channel is triggered. Only one closure may be registered per
687 * channel state at any one time. Additionally, mark the channel as polled.
688 * This function must only be called when enabled. It is equivalent to
689 * calling waitset_chan_register() followed by waitset_chan_start_polling().
692 * \param chan Waitset's per-channel state
693 * \param closure Event handler
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to waitset_chan_register_polled_disabled().
695 errval_t waitset_chan_register_polled(struct waitset *ws,
696 struct waitset_chanstate *chan,
697 struct event_closure closure)
699 dispatcher_handle_t handle = disp_disable();
700 errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
706 * \brief Cancel a previous callback registration
708 * Remove the registration for a callback on the given channel.
709 * This function must only be called when disabled.
711 * \param chan Waitset's per-channel state
// Removes chan from whichever queue of its waitset it is on (chosen by
// chan->state) and returns it to the unregistered state. `handle` is needed
// to reach the dispatcher-wide polled_channels list for polled channels.
713 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
714 dispatcher_handle_t handle)
716 assert_disabled(chan != NULL);
// keep the waitset in a local: chan->waitset is cleared before the dequeue
717 struct waitset *ws = chan->waitset;
719 return LIB_ERR_CHAN_NOT_REGISTERED;
722 // remove this channel from the queue in which it is waiting
723 chan->waitset = NULL;
// a registered channel is always linked into some queue
724 assert_disabled(chan->next != NULL && chan->prev != NULL);
// one dequeue per state (the case labels are on lines not shown here)
726 switch (chan->state) {
728 dequeue(&ws->idle, chan);
// polled channels sit on two lists: the waitset's and the dispatcher's
732 dequeue(&ws->polled, chan);
733 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
737 dequeue(&ws->pending, chan);
741 dequeue(&ws->waiting, chan);
// always-false assertion for a state that has no queue to leave
745 assert_disabled(!"invalid channel state in deregister");
747 chan->state = CHAN_UNREGISTERED;
748 chan->wait_for = NULL;
753 * \brief Cancel a previous callback registration
755 * Remove the registration for a callback on the given channel.
756 * This function must only be called when enabled.
758 * \param chan Waitset's per-channel state
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to waitset_chan_deregister_disabled().
760 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
762 dispatcher_handle_t handle = disp_disable();
763 errval_t err = waitset_chan_deregister_disabled(chan, handle);
769 * \brief Migrate callback registrations to a new waitset.
771 * \param chan Old waitset's per-channel state to migrate
772 * \param new_ws New waitset to migrate to
// Moves chan from the queue it occupies on its current waitset to the
// same-named queue on new_ws, then points chan->waitset at new_ws.
// NOTE(review): no disp_disable() call is visible in this function, unlike the
// other public entry points in this file - confirm the expected calling context.
774 void waitset_chan_migrate(struct waitset_chanstate *chan,
775 struct waitset *new_ws)
777 struct waitset *ws = chan->waitset;
779 // Only when registered
// the channel keeps its state; only the owning waitset's queue changes
784 switch(chan->state) {
786 dequeue(&ws->idle, chan);
787 enqueue(&new_ws->idle, chan);
// only the per-waitset polled queue changes here; the polled_next/polled_prev
// links are not touched by these lines
791 dequeue(&ws->polled, chan);
792 enqueue(&new_ws->polled, chan);
796 dequeue(&ws->pending, chan);
797 enqueue(&new_ws->pending, chan);
801 dequeue(&ws->waiting, chan);
802 enqueue(&new_ws->waiting, chan);
805 case CHAN_UNREGISTERED:
810 // Remember new waitset association
811 chan->waitset = new_ws;
815 * \brief Trigger an event callback on a channel
817 * Marks the given channel as having a pending event, causing some future call
818 * to get_next_event() to return the registered closure.
819 * This function must only be called when disabled.
821 * \param chan Waitset's per-channel state
822 * \param handle Current dispatcher handle
// Moves a registered idle or polled channel to the pending queue of its
// waitset, then unblocks a thread from the waitset's waiting list.
824 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
825 dispatcher_handle_t handle)
827 assert_disabled(chan != NULL);
828 struct waitset *ws = chan->waitset;
// the channel must be registered and linked into a queue
829 assert_disabled(ws != NULL);
830 assert_disabled(chan->prev != NULL && chan->next != NULL);
832 // no-op if already pending
833 if (chan->state == CHAN_PENDING) {
837 // remove from previous queue (either idle or polled)
838 if (chan->state == CHAN_IDLE) {
839 dequeue(&ws->idle, chan);
// NOTE(review): a channel in CHAN_WAITING would appear to reach this assertion
// and fail it - confirm that triggering a waiting channel cannot happen
841 assert_disabled(chan->state == CHAN_POLLED);
// polled channels sit on two lists: the waitset's and the dispatcher's
842 dequeue(&ws->polled, chan);
843 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
846 // else mark channel pending and move to end of pending event queue
847 enqueue(&ws->pending, chan);
848 chan->state = CHAN_PENDING;
850 // is there a thread blocked on this waitset? if so, awaken it with the event
851 struct thread *thread = find_recipient(ws, chan, thread_self());
// make the chosen thread the head of the waiting list, then unblock one
// thread from that list, passing the channel along
854 ws->waiting_threads = thread;
855 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
856 assert_disabled(t == NULL);
862 * \brief Trigger an event callback on a channel
864 * Marks the given channel as having a pending event, causing some future call
865 * to get_next_event() to return the registered closure.
866 * This function must only be called when enabled.
868 * \param chan Waitset's per-channel state
869 * \param disp Current dispatcher pointer
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to waitset_chan_trigger_disabled().
871 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
873 dispatcher_handle_t handle = disp_disable();
874 errval_t err = waitset_chan_trigger_disabled(chan, handle);
880 * \brief Trigger a specific event callback on an unregistered channel
882 * This function is equivalent to waitset_chan_register_disabled() immediately
883 * followed by waitset_chan_trigger_disabled(), but avoids unnecessary queue
884 * manipulation. This function must only be called when disabled.
887 * \param chan Waitset's per-channel state
888 * \param closure Event handler
889 * \param handle Current dispatcher handle
// Takes an unregistered channel straight to the pending state on ws with the
// given closure, then unblocks a thread from the waitset's waiting list.
// Returns LIB_ERR_CHAN_ALREADY_REGISTERED if chan is attached to a waitset or
// is not in the unregistered state.
891 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
892 struct waitset_chanstate *chan,
893 struct event_closure closure,
894 dispatcher_handle_t handle)
896 assert_disabled(chan != NULL);
897 assert_disabled(ws != NULL);
899 // check if already registered
900 if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
901 return LIB_ERR_CHAN_ALREADY_REGISTERED;
// an unregistered channel has no queue links
904 assert_disabled(chan->prev == NULL && chan->next == NULL);
// remember what to run when the event is delivered
907 chan->closure = closure;
909 // mark channel pending and place on end of pending event queue
911 enqueue(&ws->pending, chan);
913 // ws->pending = chan;
914 chan->state = CHAN_PENDING;
916 // is there a thread blocked on this waitset? if so, awaken it with the event
917 struct thread *thread = find_recipient(ws, chan, thread_self());
// make the chosen thread the head of the waiting list, then unblock one
// thread from that list, passing the channel along
920 ws->waiting_threads = thread;
921 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
922 assert_disabled(t == NULL);
929 * \brief Trigger a specific event callback on an unregistered channel
931 * This function is equivalent to waitset_chan_register()
932 * followed by waitset_chan_trigger(), but avoids unnecessary queue
933 * manipulation. This function must only be called when enabled.
936 * \param chan Waitset's per-channel state
937 * \param closure Event handler
// Wrapper for callers that run enabled: disables the dispatcher and delegates
// to waitset_chan_trigger_closure_disabled().
939 errval_t waitset_chan_trigger_closure(struct waitset *ws,
940 struct waitset_chanstate *chan,
941 struct event_closure closure)
943 dispatcher_handle_t disp = disp_disable();
944 errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);