3 * \brief Waitset and low-level event handling mechanism
5 * A "wait set" is a collection of channels to wait on, much like an
6 * FDSET in POSIX. There should be a default, static wait set for each
7 * dispatcher. Threads which wait for events specify the wait set they
12 * Copyright (c) 2009-2012, ETH Zurich.
13 * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14 * All rights reserved.
16 * This file is distributed under the terms in the attached LICENSE file.
17 * If you do not find this file, copies can be found by writing to:
18 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
31 #include <flounder/flounder.h>
33 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34 # include <barrelfish/ump_endpoint.h>
37 /// Dequeue a chanstate from a queue
38 static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
40 if (chan->next == chan) {
41 assert(chan->prev == chan);
42 assert(*queue == chan);
45 chan->prev->next = chan->next;
46 chan->next->prev = chan->prev;
51 chan->prev = chan->next = NULL;
54 /// Enqueue a chanstate on a queue
55 static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
59 chan->next = chan->prev = chan;
62 chan->prev = (*queue)->prev;
63 chan->next->prev = chan;
64 chan->prev->next = chan;
68 /// Dequeue a chanstate from polled queue
69 static void dequeue_polled(struct waitset_chanstate **queue,
70 struct waitset_chanstate *chan)
72 if (chan->polled_next == chan) {
73 assert(chan->polled_prev == chan);
74 assert(*queue == chan);
77 chan->polled_prev->polled_next = chan->polled_next;
78 chan->polled_next->polled_prev = chan->polled_prev;
80 *queue = chan->polled_next;
83 chan->polled_prev = chan->polled_next = NULL;
86 /// Enqueue a chanstate on polled queue
87 static void enqueue_polled(struct waitset_chanstate **queue,
88 struct waitset_chanstate *chan)
92 chan->polled_next = chan->polled_prev = chan;
94 chan->polled_next = *queue;
95 chan->polled_prev = (*queue)->polled_prev;
96 chan->polled_next->polled_prev = chan;
97 chan->polled_prev->polled_next = chan;
102 * \brief Initialise a new waitset
104 void waitset_init(struct waitset *ws)
107 ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108 ws->waiting_threads = NULL;
112 * \brief Destroy a previously initialised waitset
114 errval_t waitset_destroy(struct waitset *ws)
118 // FIXME: do we want to support cancelling all the pending events/channels?
119 if (ws->pending || ws->waiting_threads) {
120 return LIB_ERR_WAITSET_IN_USE;
123 // remove idle and polled channels from waitset
124 struct waitset_chanstate *chan, *next;
125 for (chan = ws->idle; chan != NULL; chan = next) {
127 assert(chan->state == CHAN_IDLE);
128 assert(chan->waitset == ws);
129 chan->waitset = NULL;
130 chan->next = chan->prev = NULL;
132 if (next == ws->idle) {
138 for (chan = ws->polled; chan != NULL; chan = next) {
140 assert(chan->state == CHAN_POLLED);
141 assert(chan->waitset == ws);
142 chan->waitset = NULL;
143 chan->next = chan->prev = NULL;
145 if (next == ws->polled) {
154 /// Check if the thread can receive the event
155 static bool waitset_can_receive(struct waitset_chanstate *chan,
156 struct thread *thread)
160 if (!thread->mask_channels || !chan->masked) {
161 if (chan->wait_for) // if a thread is waiting for this specific event
162 res = chan->wait_for == thread;
164 res = (chan->token & 1 && !thread->token) // incoming token is a request
165 // and a thread is not waiting for a token
166 || (!chan->token && chan != thread->channel) // there's no token
167 // and a thread is not waiting specifically for that event
168 || (chan->token == thread->token && chan == thread->channel);
169 // there is a token and it matches thread's token and event
174 /// Returns a channel with a pending event on the given waitset matching
176 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
177 struct waitset_chanstate *chan)
179 struct thread *me = thread_self_disabled();
181 if (chan) { // channel that we wait for
182 if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
185 if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
189 // check a waiting queue for matching event
190 for (chan = ws->waiting; chan; ) {
191 if (waitset_can_receive(chan, me)) {
192 assert_disabled(chan->state == CHAN_WAITING);
196 if (chan == ws->waiting)
199 // check a pending queue for matching event
200 for (chan = ws->pending; chan;) {
201 if (waitset_can_receive(chan, me)) {
202 assert_disabled(chan->state == CHAN_PENDING);
206 if (chan == ws->pending)
212 void arranet_polling_loop_proxy(void) __attribute__((weak));
213 void arranet_polling_loop_proxy(void)
215 USER_PANIC("Network polling not available without Arranet!\n");
218 void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
219 void poll_ahci(struct waitset_chanstate *chan)
221 errval_t err = waitset_chan_trigger(chan);
222 assert(err_is_ok(err)); // should not be able to fail
225 /// Check polled channels
226 void poll_channels_disabled(dispatcher_handle_t handle) {
227 struct dispatcher_generic *dp = get_dispatcher_generic(handle);
228 struct waitset_chanstate *chan;
230 if (!dp->polled_channels)
232 chan = dp->polled_channels;
234 switch (chan->chantype) {
235 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
236 case CHANTYPE_UMP_IN: {
237 if (ump_endpoint_poll(chan)) {
238 errval_t err = waitset_chan_trigger_disabled(chan, handle);
239 assert(err_is_ok(err)); // should not fail
240 if (!dp->polled_channels) // restart scan
242 chan = dp->polled_channels;
245 chan = chan->polled_next;
247 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
248 case CHANTYPE_LWIP_SOCKET:
249 arranet_polling_loop_proxy();
255 assert(!"invalid channel type to poll!");
257 } while (chan != dp->polled_channels);
260 /// Re-register a channel (if persistent)
261 static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
262 dispatcher_handle_t handle)
264 assert(chan->waitset == ws);
265 if (chan->state == CHAN_PENDING) {
266 dequeue(&ws->pending, chan);
268 assert(chan->state == CHAN_WAITING);
269 dequeue(&ws->waiting, chan);
273 if (chan->chantype == CHANTYPE_UMP_IN
274 || chan->chantype == CHANTYPE_LWIP_SOCKET
275 || chan->chantype == CHANTYPE_AHCI) {
276 enqueue(&ws->polled, chan);
277 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
278 chan->state = CHAN_POLLED;
280 enqueue(&ws->idle, chan);
281 chan->state = CHAN_IDLE;
285 /// Find a thread that is able to receive an event
286 static struct thread * find_recipient(struct waitset *ws,
287 struct waitset_chanstate *channel, struct thread *me)
289 struct thread *t = ws->waiting_threads;
294 if (waitset_can_receive(channel, t))
297 } while (t != ws->waiting_threads);
298 return ws->waiting_threads;
301 /// Wake up other thread if there's more pending events
302 static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
304 if (ws->pending && ws->waiting_threads) {
307 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
308 assert_disabled(t == NULL); // shouldn't see a remote thread
313 * \brief Get next pending event
315 * Check if there is a pending event that matches current thread and return it.
316 * Pending events are in a pending queue and in a waiting queue.
317 * A pending event then will be removed from a pending/waiting queue and become
318 * unregistered or, if it's persistent, will be re-registered to an idle queue
319 * or a polled queue (UMP channels) of a waitset.
320 * If there's no pending event, block this thread.
321 * If there's a pending event but it doesn't match our thread, don't remove it
322 * from a pending queue and wake up a matching thread.
323 * If there's no matching thread, add it to a waiting queue.
325 * \param ws Waitset with sources of events
326 * \param retchannel Holder of returned event
327 * \param retclosure Holder of returned closure
328 * \param waitfor Specific event that we're waiting for (can be NULL)
329 * \param handle Dispatcher's handle
330 * \param debug Debug mode (not used)
333 errval_t get_next_event_disabled(struct waitset *ws,
334 struct waitset_chanstate **retchannel, struct event_closure *retclosure,
335 struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
337 struct waitset_chanstate * chan;
340 chan = get_pending_event_disabled(ws, waitfor); // get our event
343 *retclosure = chan->closure;
344 chan->wait_for = NULL;
346 if (chan->persistent)
347 reregister_channel(ws, chan, handle);
349 waitset_chan_deregister_disabled(chan, handle);
350 wake_up_other_thread(handle, ws);
353 chan = ws->pending; // check a pending queue
354 if (!chan) { // if nothing then wait
355 thread_block_disabled(handle, &ws->waiting_threads);
357 } else { // something but it's not our event
358 if (!ws->waiting_threads) { // no other thread interested in
359 dequeue(&ws->pending, chan);
360 enqueue(&ws->waiting, chan);
361 chan->state = CHAN_WAITING;
364 // find a matching thread
366 for (t = ws->waiting_threads; t; ) {
367 if (waitset_can_receive(chan, t)) { // match found, wake it
368 ws->waiting_threads = t;
369 t = thread_unblock_one_disabled(handle,
370 &ws->waiting_threads, chan);
371 assert_disabled(t == NULL); // shouldn't see a remote thread
375 if (t == ws->waiting_threads) { // no recipient found
376 dequeue(&ws->pending, chan);
377 enqueue(&ws->waiting, chan);
378 chan->state = CHAN_WAITING;
389 * \brief Wait for (block) and return next event on given waitset
391 * Wait until something happens, either activity on some channel, or a deferred
392 * call, and then return the corresponding closure. This is the core of the
393 * event-handling system.
396 * \param retclosure Pointer to storage space for returned event closure
398 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
400 dispatcher_handle_t handle = disp_disable();
401 struct waitset_chanstate *channel;
402 errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
411 * \brief Check if there is an event pending on given waitset
413 * This is essentially a non-blocking variant of get_next_event(). It should be
414 * used with great care, to avoid the creation of busy-waiting loops.
418 * \returns LIB_ERR_NO_EVENT if nothing is pending
420 static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
422 struct waitset_chanstate *chan;
424 poll_channels_disabled(handle);
425 chan = get_pending_event_disabled(ws, NULL);
429 return LIB_ERR_NO_EVENT;
432 errval_t check_for_event(struct waitset *ws)
437 dispatcher_handle_t handle = disp_disable();
438 err = check_for_event_disabled(ws, handle);
444 * \brief Wait for (block) and dispatch next event on given waitset
446 * Wait until something happens, either activity on some channel, or deferred
447 * call, and then call the corresponding closure.
452 errval_t event_dispatch(struct waitset *ws)
454 struct event_closure closure;
455 errval_t err = get_next_event(ws, &closure);
456 if (err_is_fail(err)) {
460 assert(closure.handler != NULL);
461 closure.handler(closure.arg);
465 errval_t event_dispatch_debug(struct waitset *ws)
467 struct event_closure closure;
468 struct waitset_chanstate *channel;
469 dispatcher_handle_t handle = disp_disable();
470 errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
473 if (err_is_fail(err)) {
477 assert(closure.handler != NULL);
478 closure.handler(closure.arg);
483 * \brief Dispatch events until a specific event is received
485 * Wait for events and dispatch them. If a specific event comes, don't call
486 * a closure, just return.
489 * \param waitfor Event, that we are waiting for
490 * \param error_var Error variable that can be changed by closures
493 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
496 assert(waitfor->waitset == ws);
498 struct event_closure closure;
499 struct waitset_chanstate *channel;
501 dispatcher_handle_t handle = disp_disable();
502 errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
505 if (err_is_fail(err)) {
509 if (channel == waitfor) {
512 assert(!channel->wait_for);
513 assert(closure.handler != NULL);
514 closure.handler(closure.arg);
515 if (err_is_fail(*error_var))
521 * \brief check and dispatch next event on given waitset
523 * Check if there is any pending activity on some channel, or deferred
524 * call, and then call the corresponding closure.
526 * Do not wait! In case of no pending events, return err LIB_ERR_NO_EVENT.
530 errval_t event_dispatch_non_block(struct waitset *ws)
532 struct waitset_chanstate *channel;
533 struct event_closure closure;
537 // are there any pending events on the waitset?
538 dispatcher_handle_t handle = disp_disable();
539 errval_t err = check_for_event_disabled(ws, handle);
540 if (err_is_fail(err)) {
544 err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
546 if (err_is_fail(err))
549 assert(closure.handler != NULL);
550 closure.handler(closure.arg);
557 * "Private" functions that are called only by the channel implementations
561 * \brief Initialise per-channel waitset state
563 * \param chan Channel state
564 * \param chantype Channel type
566 void waitset_chanstate_init(struct waitset_chanstate *chan,
567 enum ws_chantype chantype)
569 assert(chan != NULL);
570 chan->waitset = NULL;
571 chan->chantype = chantype;
572 chan->state = CHAN_UNREGISTERED;
574 chan->prev = chan->next = NULL;
576 chan->persistent = false;
578 chan->wait_for = NULL;
579 chan->masked = false;
583 * \brief Destroy previously-initialised per-channel waitset state
584 * \param chan Channel state
586 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
588 assert(chan != NULL);
589 if (chan->waitset != NULL) {
590 errval_t err = waitset_chan_deregister(chan);
591 assert(err_is_ok(err)); // can't fail if registered
596 * \brief Register a closure to be called when a channel is triggered
598 * In the Future, call the closure on a thread associated with the waitset
599 * when the channel is triggered. Only one closure may be registered per
600 * channel state at any one time.
601 * This function must only be called when disabled.
604 * \param chan Waitset's per-channel state
605 * \param closure Event handler
607 errval_t waitset_chan_register_disabled(struct waitset *ws,
608 struct waitset_chanstate *chan,
609 struct event_closure closure)
611 if (chan->waitset != NULL) {
612 return LIB_ERR_CHAN_ALREADY_REGISTERED;
618 // channel must not already be registered!
619 assert_disabled(chan->next == NULL && chan->prev == NULL);
620 assert_disabled(chan->state == CHAN_UNREGISTERED);
622 // this is probably insane! :)
623 assert_disabled(closure.handler != NULL);
626 chan->closure = closure;
628 // enqueue this channel on the waitset's queue of idle channels
629 enqueue(&ws->idle, chan);
630 chan->state = CHAN_IDLE;
636 * \brief Register a closure on a channel, and mark the channel as polled
638 * In the Future, call the closure on a thread associated with the waitset
639 * when the channel is triggered. Only one closure may be registered per
640 * channel state at any one time. Additionally, mark the channel as polled.
641 * This function must only be called when disabled.
644 * \param chan Waitset's per-channel state
645 * \param closure Event handler
646 * \param disp Current dispatcher pointer
648 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
649 struct waitset_chanstate *chan,
650 struct event_closure closure,
651 dispatcher_handle_t handle)
653 if (chan->waitset != NULL) {
654 return LIB_ERR_CHAN_ALREADY_REGISTERED;
660 // channel must not already be registered!
661 assert_disabled(chan->next == NULL && chan->prev == NULL);
662 assert_disabled(chan->state == CHAN_UNREGISTERED);
665 chan->closure = closure;
667 // enqueue this channel on the waitset's queue of polled channels
668 enqueue(&ws->polled, chan);
669 chan->state = CHAN_POLLED;
670 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
676 * \brief Register a closure to be called when a channel is triggered
678 * In the Future, call the closure on a thread associated with the waitset
679 * when the channel is triggered. Only one closure may be registered per
680 * channel state at any one time.
681 * This function must only be called when enabled.
684 * \param chan Waitset's per-channel state
685 * \param closure Event handler
687 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
688 struct event_closure closure)
690 dispatcher_handle_t handle = disp_disable();
691 errval_t err = waitset_chan_register_disabled(ws, chan, closure);
697 * \brief Register a closure on a channel, and mark the channel as polled
699 * In the Future, call the closure on a thread associated with the waitset
700 * when the channel is triggered. Only one closure may be registered per
701 * channel state at any one time. Additionally, mark the channel as polled.
702 * This function must only be called when enabled. It is equivalent to
703 * calling waitset_chan_register() followed by waitset_chan_start_polling().
706 * \param chan Waitset's per-channel state
707 * \param closure Event handler
709 errval_t waitset_chan_register_polled(struct waitset *ws,
710 struct waitset_chanstate *chan,
711 struct event_closure closure)
713 dispatcher_handle_t handle = disp_disable();
714 errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
720 * \brief Cancel a previous callback registration
722 * Remove the registration for a callback on the given channel.
723 * This function must only be called when disabled.
725 * \param chan Waitset's per-channel state
727 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
728 dispatcher_handle_t handle)
730 assert_disabled(chan != NULL);
731 struct waitset *ws = chan->waitset;
733 return LIB_ERR_CHAN_NOT_REGISTERED;
736 // remove this channel from the queue in which it is waiting
737 chan->waitset = NULL;
738 assert_disabled(chan->next != NULL && chan->prev != NULL);
740 switch (chan->state) {
742 dequeue(&ws->idle, chan);
746 dequeue(&ws->polled, chan);
747 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
751 dequeue(&ws->pending, chan);
755 dequeue(&ws->waiting, chan);
759 assert_disabled(!"invalid channel state in deregister");
761 chan->state = CHAN_UNREGISTERED;
762 chan->wait_for = NULL;
767 * \brief Cancel a previous callback registration
769 * Remove the registration for a callback on the given channel.
770 * This function must only be called when enabled.
772 * \param chan Waitset's per-channel state
774 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
776 dispatcher_handle_t handle = disp_disable();
777 errval_t err = waitset_chan_deregister_disabled(chan, handle);
783 * \brief Migrate callback registrations to a new waitset.
785 * \param chan Old waitset's per-channel state to migrate
786 * \param new_ws New waitset to migrate to
788 void waitset_chan_migrate(struct waitset_chanstate *chan,
789 struct waitset *new_ws)
791 struct waitset *ws = chan->waitset;
793 // Only when registered
798 switch(chan->state) {
800 dequeue(&ws->idle, chan);
801 enqueue(&new_ws->idle, chan);
805 dequeue(&ws->polled, chan);
806 enqueue(&new_ws->polled, chan);
810 dequeue(&ws->pending, chan);
811 enqueue(&new_ws->pending, chan);
815 dequeue(&ws->waiting, chan);
816 enqueue(&new_ws->waiting, chan);
819 case CHAN_UNREGISTERED:
824 // Remember new waitset association
825 chan->waitset = new_ws;
829 * \brief Trigger an event callback on a channel
831 * Marks the given channel as having a pending event, causing some future call
832 * to get_next_event() to return the registered closure.
833 * This function must only be called when disabled.
835 * \param chan Waitset's per-channel state
836 * \param disp Current dispatcher pointer
838 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
839 dispatcher_handle_t handle)
841 assert_disabled(chan != NULL);
842 struct waitset *ws = chan->waitset;
843 assert_disabled(ws != NULL);
844 assert_disabled(chan->prev != NULL && chan->next != NULL);
846 // no-op if already pending
847 if (chan->state == CHAN_PENDING) {
851 // remove from previous queue (either idle or polled)
852 if (chan->state == CHAN_IDLE) {
853 dequeue(&ws->idle, chan);
855 assert_disabled(chan->state == CHAN_POLLED);
856 dequeue(&ws->polled, chan);
857 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
860 // else mark channel pending and move to end of pending event queue
861 enqueue(&ws->pending, chan);
862 chan->state = CHAN_PENDING;
864 // is there a thread blocked on this waitset? if so, awaken it with the event
865 struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
868 ws->waiting_threads = thread;
869 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
870 assert_disabled(t == NULL);
876 * \brief Trigger an event callback on a channel
878 * Marks the given channel as having a pending event, causing some future call
879 * to get_next_event() to return the registered closure.
880 * This function must only be called when enabled.
882 * \param chan Waitset's per-channel state
883 * \param disp Current dispatcher pointer
885 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
887 dispatcher_handle_t handle = disp_disable();
888 errval_t err = waitset_chan_trigger_disabled(chan, handle);
894 * \brief Trigger a specific event callback on an unregistered channel
896 * This function is equivalent to waitset_chan_register_disabled() immediately
897 * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
898 * manipulation. This function must only be called when disabled.
901 * \param chan Waitset's per-channel state
902 * \param closure Event handler
903 * \param disp Current dispatcher pointer
905 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
906 struct waitset_chanstate *chan,
907 struct event_closure closure,
908 dispatcher_handle_t handle)
910 assert_disabled(chan != NULL);
911 assert_disabled(ws != NULL);
913 // check if already registered
914 if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
915 return LIB_ERR_CHAN_ALREADY_REGISTERED;
918 assert_disabled(chan->prev == NULL && chan->next == NULL);
921 chan->closure = closure;
923 // mark channel pending and place on end of pending event queue
925 enqueue(&ws->pending, chan);
927 // ws->pending = chan;
928 chan->state = CHAN_PENDING;
930 // is there a thread blocked on this waitset? if so, awaken it with the event
931 struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
934 ws->waiting_threads = thread;
935 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
936 assert_disabled(t == NULL);
943 * \brief Trigger a specific event callback on an unregistered channel
945 * This function is equivalent to waitset_chan_register()
946 * followed by waitset_chan_trigger(), but avoids unneccessary queue
947 * manipulation. This function must only be called when enabled.
950 * \param chan Waitset's per-channel state
951 * \param closure Event handler
953 errval_t waitset_chan_trigger_closure(struct waitset *ws,
954 struct waitset_chanstate *chan,
955 struct event_closure closure)
957 dispatcher_handle_t disp = disp_disable();
958 errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);