3 * \brief Waitset and low-level event handling mechanism
5 * A "wait set" is a collection of channels to wait on, much like an
6 * FDSET in POSIX. There should be a default, static wait set for each
7 * dispatcher. Threads which wait for events specify the wait set they
12 * Copyright (c) 2009-2012, ETH Zurich.
13 * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
14 * All rights reserved.
16 * This file is distributed under the terms in the attached LICENSE file.
17 * If you do not find this file, copies can be found by writing to:
18 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/waitset.h>
23 #include <barrelfish/waitset_chan.h>
24 #include <barrelfish/threads.h>
25 #include <barrelfish/dispatch.h>
26 #include "threads_priv.h"
27 #include "waitset_chan_priv.h"
31 #include <flounder/flounder.h>
33 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34 # include <barrelfish/ump_endpoint.h>
37 /// Dequeue a chanstate from a queue
38 static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
40 if (chan->next == chan) {
41 assert(chan->prev == chan);
42 assert(*queue == chan);
45 chan->prev->next = chan->next;
46 chan->next->prev = chan->prev;
51 chan->prev = chan->next = NULL;
54 /// Enqueue a chanstate on a queue
55 static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
59 chan->next = chan->prev = chan;
62 chan->prev = (*queue)->prev;
63 chan->next->prev = chan;
64 chan->prev->next = chan;
68 /// Dequeue a chanstate from polled queue
69 static void dequeue_polled(struct waitset_chanstate **queue,
70 struct waitset_chanstate *chan)
72 if (chan->polled_next == chan) {
73 assert(chan->polled_prev == chan);
74 assert(*queue == chan);
77 chan->polled_prev->polled_next = chan->polled_next;
78 chan->polled_next->polled_prev = chan->polled_prev;
80 *queue = chan->polled_next;
83 chan->polled_prev = chan->polled_next = NULL;
86 /// Enqueue a chanstate on polled queue
87 static void enqueue_polled(struct waitset_chanstate **queue,
88 struct waitset_chanstate *chan)
92 chan->polled_next = chan->polled_prev = chan;
94 chan->polled_next = *queue;
95 chan->polled_prev = (*queue)->polled_prev;
96 chan->polled_next->polled_prev = chan;
97 chan->polled_prev->polled_next = chan;
102 * \brief Initialise a new waitset
104 void waitset_init(struct waitset *ws)
107 ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108 ws->waiting_threads = NULL;
112 * \brief Destroy a previously initialised waitset
114 errval_t waitset_destroy(struct waitset *ws)
118 // FIXME: do we want to support cancelling all the pending events/channels?
119 if (ws->pending || ws->waiting_threads) {
120 return LIB_ERR_WAITSET_IN_USE;
123 // remove idle and polled channels from waitset
124 struct waitset_chanstate *chan, *next;
125 for (chan = ws->idle; chan != NULL; chan = next) {
127 assert(chan->state == CHAN_IDLE);
128 assert(chan->waitset == ws);
129 chan->waitset = NULL;
130 chan->next = chan->prev = NULL;
132 if (next == ws->idle) {
138 for (chan = ws->polled; chan != NULL; chan = next) {
140 assert(chan->state == CHAN_POLLED);
141 assert(chan->waitset == ws);
142 chan->waitset = NULL;
143 chan->next = chan->prev = NULL;
145 if (next == ws->polled) {
154 /// Check if a thread can receive an event
155 static bool waitset_check_token(struct waitset_chanstate *chan,
156 struct thread *thread)
160 if (chan->wait_for) // if a thread is waiting for this specific event
161 res = chan->wait_for == thread;
163 res = (chan->token & 1 && !thread->token) // incoming token is a request
164 // and a thread is not waiting for a token
165 || (!chan->token && chan != thread->channel) // there's no token
166 // and a thread is not waiting specifically for that event
167 || (chan->token == thread->token && chan == thread->channel);
168 // there is a token and it matches thread's token and event
172 /// Returns a channel with a pending event on the given waitset matching
174 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
175 struct waitset_chanstate *chan)
177 struct thread *me = thread_self();
179 if (chan) { // channel that we wait for
180 if (chan->state == CHAN_PENDING && waitset_check_token(chan, me)) {
183 if (chan->state == CHAN_WAITING && waitset_check_token(chan, me)) {
187 // check a waiting queue for matching event
188 for (chan = ws->waiting; chan; ) {
189 if (waitset_check_token(chan, me)) {
190 assert_disabled(chan->state == CHAN_WAITING);
194 if (chan == ws->waiting)
197 // check a pending queue for matching event
198 for (chan = ws->pending; chan;) {
199 if (waitset_check_token(chan, me)) {
200 assert_disabled(chan->state == CHAN_PENDING);
204 if (chan == ws->pending)
210 void arranet_polling_loop_proxy(void) __attribute__((weak));
211 void arranet_polling_loop_proxy(void)
213 USER_PANIC("Network polling not available without Arranet!\n");
216 void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
217 void poll_ahci(struct waitset_chanstate *chan)
219 errval_t err = waitset_chan_trigger(chan);
220 assert(err_is_ok(err)); // should not be able to fail
223 /// Check polled channels
224 void poll_channels_disabled(dispatcher_handle_t handle) {
225 struct dispatcher_generic *dp = get_dispatcher_generic(handle);
226 struct waitset_chanstate *chan;
228 if (!dp->polled_channels)
230 chan = dp->polled_channels;
232 switch (chan->chantype) {
233 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
234 case CHANTYPE_UMP_IN: {
235 if (ump_endpoint_poll(chan)) {
236 errval_t err = waitset_chan_trigger_disabled(chan, handle);
237 assert(err_is_ok(err)); // should not fail
238 if (!dp->polled_channels) // restart scan
240 chan = dp->polled_channels;
243 chan = chan->polled_next;
245 #endif // CONFIG_INTERCONNECT_DRIVER_UMP
246 case CHANTYPE_LWIP_SOCKET:
247 arranet_polling_loop_proxy();
253 assert(!"invalid channel type to poll!");
255 } while (chan != dp->polled_channels);
258 /// Re-register a channel (if persistent)
259 static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
260 dispatcher_handle_t handle)
262 assert(chan->waitset == ws);
263 if (chan->state == CHAN_PENDING) {
264 dequeue(&ws->pending, chan);
266 assert(chan->state == CHAN_WAITING);
267 dequeue(&ws->waiting, chan);
271 if (chan->chantype == CHANTYPE_UMP_IN
272 || chan->chantype == CHANTYPE_LWIP_SOCKET
273 || chan->chantype == CHANTYPE_AHCI) {
274 enqueue(&ws->polled, chan);
275 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
276 chan->state = CHAN_POLLED;
278 enqueue(&ws->idle, chan);
279 chan->state = CHAN_IDLE;
283 /// Find a thread that is able to receive an event
284 static struct thread * find_recipient(struct waitset *ws,
285 struct waitset_chanstate *channel, struct thread *me)
287 struct thread *t = ws->waiting_threads;
292 if (waitset_check_token(channel, t))
295 } while (t != ws->waiting_threads);
296 return ws->waiting_threads;
299 /// Wake up other thread if there's more pending events
300 static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
302 if (ws->pending && ws->waiting_threads) {
305 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
306 assert_disabled(t == NULL); // shouldn't see a remote thread
311 * \brief Get next pending event
313 * Check if there is a pending event that matches current thread and return it.
314 * Pending events are in a pending queue and in a waiting queue.
315 * A pending event then will be removed from a pending/waiting queue and become
316 * unregistered or, if it's persistent, will be re-registered to an idle queue
317 * or a polled queue (UMP channels) of a waitset.
318 * If there's no pending event, block this thread.
319 * If there's a pending event but it doesn't match our thread, don't remove it
320 * from a pending queue and wake up a matching thread.
321 * If there's no matching thread, add it to a waiting queue.
323 * \param ws Waitset with sources of events
324 * \param retchannel Holder of returned event
325 * \param retclosure Holder of returned closure
326 * \param waitfor Specific event that we're waiting for (can be NULL)
327 * \param handle Dispatcher's handle
328 * \param debug Debug mode (not used)
331 errval_t get_next_event_disabled(struct waitset *ws,
332 struct waitset_chanstate **retchannel, struct event_closure *retclosure,
333 struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
335 struct waitset_chanstate * chan;
338 chan = get_pending_event_disabled(ws, waitfor); // get our event
341 *retclosure = chan->closure;
342 chan->wait_for = NULL;
344 if (chan->persistent)
345 reregister_channel(ws, chan, handle);
347 waitset_chan_deregister_disabled(chan, handle);
348 wake_up_other_thread(handle, ws);
351 chan = ws->pending; // check a pending queue
352 if (!chan) { // if nothing then wait
353 thread_block_disabled(handle, &ws->waiting_threads);
355 } else { // something but it's not our event
356 if (!ws->waiting_threads) { // no other thread interested in
357 dequeue(&ws->pending, chan);
358 enqueue(&ws->waiting, chan);
359 chan->state = CHAN_WAITING;
362 // find a matching thread
364 for (t = ws->waiting_threads; t; ) {
365 if (waitset_check_token(chan, t)) { // match found, wake it
366 ws->waiting_threads = t;
367 t = thread_unblock_one_disabled(handle,
368 &ws->waiting_threads, chan);
369 assert_disabled(t == NULL); // shouldn't see a remote thread
373 if (t == ws->waiting_threads) { // no recipient found
374 dequeue(&ws->pending, chan);
375 enqueue(&ws->waiting, chan);
376 chan->state = CHAN_WAITING;
387 * \brief Wait for (block) and return next event on given waitset
389 * Wait until something happens, either activity on some channel, or a deferred
390 * call, and then return the corresponding closure. This is the core of the
391 * event-handling system.
394 * \param retclosure Pointer to storage space for returned event closure
396 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
398 dispatcher_handle_t handle = disp_disable();
399 struct waitset_chanstate *channel;
400 errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
409 * \brief Check if there is an event pending on given waitset
411 * This is essentially a non-blocking variant of get_next_event(). It should be
412 * used with great care, to avoid the creation of busy-waiting loops.
416 * \returns LIB_ERR_NO_EVENT if nothing is pending
418 static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
420 struct waitset_chanstate *chan;
422 poll_channels_disabled(handle);
423 chan = get_pending_event_disabled(ws, NULL);
427 return LIB_ERR_NO_EVENT;
430 errval_t check_for_event(struct waitset *ws)
435 dispatcher_handle_t handle = disp_disable();
436 err = check_for_event_disabled(ws, handle);
442 * \brief Wait for (block) and dispatch next event on given waitset
444 * Wait until something happens, either activity on some channel, or deferred
445 * call, and then call the corresponding closure.
450 errval_t event_dispatch(struct waitset *ws)
452 struct event_closure closure;
453 errval_t err = get_next_event(ws, &closure);
454 if (err_is_fail(err)) {
458 assert(closure.handler != NULL);
459 closure.handler(closure.arg);
463 errval_t event_dispatch_debug(struct waitset *ws)
465 struct event_closure closure;
466 struct waitset_chanstate *channel;
467 dispatcher_handle_t handle = disp_disable();
468 errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
471 if (err_is_fail(err)) {
475 assert(closure.handler != NULL);
476 closure.handler(closure.arg);
481 * \brief Dispatch events until a specific event is received
483 * Wait for events and dispatch them. If a specific event comes, don't call
484 * a closure, just return.
487 * \param waitfor Event, that we are waiting for
488 * \param error_var Error variable that can be changed by closures
491 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
494 assert(waitfor->waitset == ws);
496 struct event_closure closure;
497 struct waitset_chanstate *channel;
499 dispatcher_handle_t handle = disp_disable();
500 errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
503 if (err_is_fail(err)) {
507 if (channel == waitfor) {
510 assert(!channel->wait_for);
511 assert(closure.handler != NULL);
512 closure.handler(closure.arg);
513 if (err_is_fail(*error_var))
519 * \brief check and dispatch next event on given waitset
521 * Check if there is any pending activity on some channel, or deferred
522 * call, and then call the corresponding closure.
524 * Do not wait! In case of no pending events, return err LIB_ERR_NO_EVENT.
528 errval_t event_dispatch_non_block(struct waitset *ws)
530 struct waitset_chanstate *channel;
531 struct event_closure closure;
535 // are there any pending events on the waitset?
536 dispatcher_handle_t handle = disp_disable();
537 errval_t err = check_for_event_disabled(ws, handle);
538 if (err_is_fail(err)) {
542 err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
544 if (err_is_fail(err))
547 assert(closure.handler != NULL);
548 closure.handler(closure.arg);
555 * "Private" functions that are called only by the channel implementations
559 * \brief Initialise per-channel waitset state
561 * \param chan Channel state
562 * \param chantype Channel type
564 void waitset_chanstate_init(struct waitset_chanstate *chan,
565 enum ws_chantype chantype)
567 assert(chan != NULL);
568 chan->waitset = NULL;
569 chan->chantype = chantype;
570 chan->state = CHAN_UNREGISTERED;
572 chan->prev = chan->next = NULL;
574 chan->persistent = false;
576 chan->wait_for = NULL;
580 * \brief Destroy previously-initialised per-channel waitset state
581 * \param chan Channel state
583 void waitset_chanstate_destroy(struct waitset_chanstate *chan)
585 assert(chan != NULL);
586 if (chan->waitset != NULL) {
587 errval_t err = waitset_chan_deregister(chan);
588 assert(err_is_ok(err)); // can't fail if registered
593 * \brief Register a closure to be called when a channel is triggered
595 * In the Future, call the closure on a thread associated with the waitset
596 * when the channel is triggered. Only one closure may be registered per
597 * channel state at any one time.
598 * This function must only be called when disabled.
601 * \param chan Waitset's per-channel state
602 * \param closure Event handler
604 errval_t waitset_chan_register_disabled(struct waitset *ws,
605 struct waitset_chanstate *chan,
606 struct event_closure closure)
608 if (chan->waitset != NULL) {
609 return LIB_ERR_CHAN_ALREADY_REGISTERED;
615 // channel must not already be registered!
616 assert_disabled(chan->next == NULL && chan->prev == NULL);
617 assert_disabled(chan->state == CHAN_UNREGISTERED);
619 // this is probably insane! :)
620 assert_disabled(closure.handler != NULL);
623 chan->closure = closure;
625 // enqueue this channel on the waitset's queue of idle channels
626 enqueue(&ws->idle, chan);
627 chan->state = CHAN_IDLE;
633 * \brief Register a closure on a channel, and mark the channel as polled
635 * In the Future, call the closure on a thread associated with the waitset
636 * when the channel is triggered. Only one closure may be registered per
637 * channel state at any one time. Additionally, mark the channel as polled.
638 * This function must only be called when disabled.
641 * \param chan Waitset's per-channel state
642 * \param closure Event handler
643 * \param disp Current dispatcher pointer
645 errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
646 struct waitset_chanstate *chan,
647 struct event_closure closure,
648 dispatcher_handle_t handle)
650 if (chan->waitset != NULL) {
651 return LIB_ERR_CHAN_ALREADY_REGISTERED;
657 // channel must not already be registered!
658 assert_disabled(chan->next == NULL && chan->prev == NULL);
659 assert_disabled(chan->state == CHAN_UNREGISTERED);
662 chan->closure = closure;
664 // enqueue this channel on the waitset's queue of polled channels
665 enqueue(&ws->polled, chan);
666 chan->state = CHAN_POLLED;
667 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
673 * \brief Register a closure to be called when a channel is triggered
675 * In the Future, call the closure on a thread associated with the waitset
676 * when the channel is triggered. Only one closure may be registered per
677 * channel state at any one time.
678 * This function must only be called when enabled.
681 * \param chan Waitset's per-channel state
682 * \param closure Event handler
684 errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
685 struct event_closure closure)
687 dispatcher_handle_t handle = disp_disable();
688 errval_t err = waitset_chan_register_disabled(ws, chan, closure);
694 * \brief Register a closure on a channel, and mark the channel as polled
696 * In the Future, call the closure on a thread associated with the waitset
697 * when the channel is triggered. Only one closure may be registered per
698 * channel state at any one time. Additionally, mark the channel as polled.
699 * This function must only be called when enabled. It is equivalent to
700 * calling waitset_chan_register() followed by waitset_chan_start_polling().
703 * \param chan Waitset's per-channel state
704 * \param closure Event handler
706 errval_t waitset_chan_register_polled(struct waitset *ws,
707 struct waitset_chanstate *chan,
708 struct event_closure closure)
710 dispatcher_handle_t handle = disp_disable();
711 errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
717 * \brief Cancel a previous callback registration
719 * Remove the registration for a callback on the given channel.
720 * This function must only be called when disabled.
722 * \param chan Waitset's per-channel state
724 errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
725 dispatcher_handle_t handle)
727 assert_disabled(chan != NULL);
728 struct waitset *ws = chan->waitset;
730 return LIB_ERR_CHAN_NOT_REGISTERED;
733 // remove this channel from the queue in which it is waiting
734 chan->waitset = NULL;
735 assert_disabled(chan->next != NULL && chan->prev != NULL);
737 switch (chan->state) {
739 dequeue(&ws->idle, chan);
743 dequeue(&ws->polled, chan);
744 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
748 dequeue(&ws->pending, chan);
752 dequeue(&ws->waiting, chan);
756 assert_disabled(!"invalid channel state in deregister");
758 chan->state = CHAN_UNREGISTERED;
759 chan->wait_for = NULL;
764 * \brief Cancel a previous callback registration
766 * Remove the registration for a callback on the given channel.
767 * This function must only be called when enabled.
769 * \param chan Waitset's per-channel state
771 errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
773 dispatcher_handle_t handle = disp_disable();
774 errval_t err = waitset_chan_deregister_disabled(chan, handle);
780 * \brief Migrate callback registrations to a new waitset.
782 * \param chan Old waitset's per-channel state to migrate
783 * \param new_ws New waitset to migrate to
785 void waitset_chan_migrate(struct waitset_chanstate *chan,
786 struct waitset *new_ws)
788 struct waitset *ws = chan->waitset;
790 // Only when registered
795 switch(chan->state) {
797 dequeue(&ws->idle, chan);
798 enqueue(&new_ws->idle, chan);
802 dequeue(&ws->polled, chan);
803 enqueue(&new_ws->polled, chan);
807 dequeue(&ws->pending, chan);
808 enqueue(&new_ws->pending, chan);
812 dequeue(&ws->waiting, chan);
813 enqueue(&new_ws->waiting, chan);
816 case CHAN_UNREGISTERED:
821 // Remember new waitset association
822 chan->waitset = new_ws;
826 * \brief Trigger an event callback on a channel
828 * Marks the given channel as having a pending event, causing some future call
829 * to get_next_event() to return the registered closure.
830 * This function must only be called when disabled.
832 * \param chan Waitset's per-channel state
833 * \param disp Current dispatcher pointer
835 errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
836 dispatcher_handle_t handle)
838 assert_disabled(chan != NULL);
839 struct waitset *ws = chan->waitset;
840 assert_disabled(ws != NULL);
841 assert_disabled(chan->prev != NULL && chan->next != NULL);
843 // no-op if already pending
844 if (chan->state == CHAN_PENDING) {
848 // remove from previous queue (either idle or polled)
849 if (chan->state == CHAN_IDLE) {
850 dequeue(&ws->idle, chan);
852 assert_disabled(chan->state == CHAN_POLLED);
853 dequeue(&ws->polled, chan);
854 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
857 // else mark channel pending and move to end of pending event queue
858 enqueue(&ws->pending, chan);
859 chan->state = CHAN_PENDING;
861 // is there a thread blocked on this waitset? if so, awaken it with the event
862 struct thread *thread = find_recipient(ws, chan, thread_self());
865 ws->waiting_threads = thread;
866 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
867 assert_disabled(t == NULL);
873 * \brief Trigger an event callback on a channel
875 * Marks the given channel as having a pending event, causing some future call
876 * to get_next_event() to return the registered closure.
877 * This function must only be called when enabled.
879 * \param chan Waitset's per-channel state
880 * \param disp Current dispatcher pointer
882 errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
884 dispatcher_handle_t handle = disp_disable();
885 errval_t err = waitset_chan_trigger_disabled(chan, handle);
891 * \brief Trigger a specific event callback on an unregistered channel
893 * This function is equivalent to waitset_chan_register_disabled() immediately
894 * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
895 * manipulation. This function must only be called when disabled.
898 * \param chan Waitset's per-channel state
899 * \param closure Event handler
900 * \param disp Current dispatcher pointer
902 errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
903 struct waitset_chanstate *chan,
904 struct event_closure closure,
905 dispatcher_handle_t handle)
907 assert_disabled(chan != NULL);
908 assert_disabled(ws != NULL);
910 // check if already registered
911 if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
912 return LIB_ERR_CHAN_ALREADY_REGISTERED;
915 assert_disabled(chan->prev == NULL && chan->next == NULL);
918 chan->closure = closure;
920 // mark channel pending and place on end of pending event queue
922 enqueue(&ws->pending, chan);
924 // ws->pending = chan;
925 chan->state = CHAN_PENDING;
927 // is there a thread blocked on this waitset? if so, awaken it with the event
928 struct thread *thread = find_recipient(ws, chan, thread_self());
931 ws->waiting_threads = thread;
932 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
933 assert_disabled(t == NULL);
940 * \brief Trigger a specific event callback on an unregistered channel
942 * This function is equivalent to waitset_chan_register()
943 * followed by waitset_chan_trigger(), but avoids unneccessary queue
944 * manipulation. This function must only be called when enabled.
947 * \param chan Waitset's per-channel state
948 * \param closure Event handler
950 errval_t waitset_chan_trigger_closure(struct waitset *ws,
951 struct waitset_chanstate *chan,
952 struct event_closure closure)
954 dispatcher_handle_t disp = disp_disable();
955 errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);