T155: libbarrelfish: update copyright notice for event_queue.{c,h}
[barrelfish] / lib / barrelfish / event_queue.c
1 /**
2  * \file
3  * \brief Event queue implementatino
4  *
5  * This code implements a thread-safe queue of pending events which are
6  * serviced by a single waitset.
7  *
8  * [!] WARNING: current realization of event queues is unsuitable for
9  * cross-core operation. Crux of the issue: trigger channel for waitset
10  * belonging to *another* core but waitsets are assumed to be local to a
11  * dispatcher.
12  *
13  * Example scenario:
14  * - Consumer C (on core 0) calls get_next_event and blocks on
15  *   ws->waiting_threads.
16  * - Producer P (on core 1) invokes event_queue_add which in turn triggers
17  *   channel. P disables its dispatcher, unblocks a thread and C is returned.
18  *   Code assumes remote wakeup should not occur but C is on core 0.
19  *
20  * For further details see:
21  *   https://lists.inf.ethz.ch/mailman/private/barrelfish/2013/002746.html
22  */
23
24 /*
25  * Copyright (c) 2010, 2012, 2015, ETH Zurich.
26  * All rights reserved.
27  *
28  * This file is distributed under the terms in the attached LICENSE file.
29  * If you do not find this file, copies can be found by writing to:
30  * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group.
31  */
32
33 #include <barrelfish/barrelfish.h>
34 #include <barrelfish/event_queue.h>
35 #include <barrelfish/waitset_chan.h>
36
37 /**
38  * \brief Initialise a new event queue
39  *
40  * \param q Storage for event queue
41  * \param waitset Waitset that will service the queue
42  * \param mode Operating mode for the queue
43  */
44 void event_queue_init(struct event_queue *q, struct waitset *waitset,
45                       enum event_queue_mode mode)
46 {
47     waitset_chanstate_init(&q->waitset_state, CHANTYPE_EVENT_QUEUE);
48     thread_mutex_init(&q->mutex);
49     q->head = q->tail = NULL;
50     q->waitset = waitset;
51     q->mode = mode;
52 }
53
54 static struct event_queue_node *next_event(struct event_queue *q)
55 {
56     // dequeue the next node from the head
57     struct event_queue_node *qn = q->head;
58
59     if (qn == NULL) {
60         return NULL;
61     }
62
63     assert(qn->prev == NULL);
64
65     if (qn->next == NULL) {
66         assert(q->tail == qn);
67         q->head = q->tail = NULL;
68     } else {
69         qn->next->prev = NULL;
70         q->head = qn->next;
71     }
72
73     return qn;
74 }
75
76 static void event_queue_runner(void *arg)
77 {
78     struct event_queue *q = arg;
79     errval_t err;
80
81     assert(q->mode == EVENT_QUEUE_CONTINUOUS);
82
83     thread_mutex_lock(&q->mutex);
84
85     // dequeue the next node from the head
86     struct event_queue_node *qn = next_event(q);
87     if (qn == NULL) {
88         // an event was cancelled while we were pending
89         thread_mutex_unlock(&q->mutex);
90         return;
91     }
92
93     if (q->head != NULL) {
94         // queue is non-empty: trigger ourselves again
95         // (note: event registrations are single shot)
96         struct event_closure self = {
97             .handler = event_queue_runner,
98             .arg = arg
99         };
100         err = waitset_chan_trigger_closure(q->waitset, &q->waitset_state, self);
101         assert(err_is_ok(err)); // shouldn't fail
102     }
103
104     qn->run = true;
105     thread_mutex_unlock(&q->mutex);
106
107     // run closure
108     qn->event.handler(qn->event.arg);
109 }
110
111 /**
112  * \brief Add a new event to an event queue
113  *
114  * \param q Event queue
115  * \param qn Storage for queue node (uninitialised)
116  * \param event Event closure
117  */
118 void event_queue_add(struct event_queue *q, struct event_queue_node *qn,
119                      struct event_closure event)
120 {
121     errval_t err;
122
123     qn->event = event;
124     qn->run = false;
125
126     thread_mutex_lock(&q->mutex);
127
128     // enqueue at tail
129     if (q->tail == NULL) {
130         assert(q->head == NULL);
131         qn->next = qn->prev = NULL;
132         q->head = q->tail = qn;
133
134         // was empty: need to trigger queue runner if in continuous mode
135         if (q->mode == EVENT_QUEUE_CONTINUOUS) {
136             struct event_closure runner = {
137                 .handler = event_queue_runner,
138                 .arg = q
139             };
140             err = waitset_chan_trigger_closure(q->waitset, &q->waitset_state,
141                                                runner);
142             // apparently there's a situation when we dont really need to
143             // trigger the queue runner here, as we can get an
144             // LIB_ERR_CHAN_ALREADY REGISTERED error from the call.
145             // -SG, 2013-07-31
146             assert(err_is_ok(err) ||
147                    err_no(err) == LIB_ERR_CHAN_ALREADY_REGISTERED);
148         }
149     } else {
150         assert(q->tail != qn); // don't re-enqueue the same node!
151         assert(q->tail->next == NULL);
152         q->tail->next = qn;
153         qn->prev = q->tail;
154         qn->next = NULL;
155         q->tail = qn;
156
157         // runner is already active if it needs to be, don't need to do anything
158     }
159
160     thread_mutex_unlock(&q->mutex);
161 }
162
163 /**
164  * \brief Cancel an event previously added to an event queue
165  *
166  * \param q Event queue
167  * \param qn Queue node which was previously added to #q by event_queue_add()
168  */
169 errval_t event_queue_cancel(struct event_queue *q, struct event_queue_node *qn)
170 {
171     errval_t err;
172
173     if (qn->run) {
174         return LIB_ERR_EVENT_ALREADY_RUN;
175     }
176
177     thread_mutex_lock(&q->mutex);
178
179     if (qn->run) {
180         thread_mutex_unlock(&q->mutex);
181         return LIB_ERR_EVENT_ALREADY_RUN;
182     }
183
184     // dequeue
185     if (qn->next == NULL) {
186         assert(q->tail == qn);
187         q->tail = qn->prev;
188     } else {
189         qn->next->prev = qn->prev;
190     }
191
192     if (qn->prev == NULL) {
193         assert(q->head == qn);
194         q->head = qn->next;
195     } else {
196         qn->prev->next = qn->next;
197     }
198
199     // if the queue is now empty, we should cancel the runner
200     if (q->head == NULL && q->mode == EVENT_QUEUE_CONTINUOUS) {
201         assert(q->tail == NULL);
202         err = waitset_chan_deregister(&q->waitset_state);
203         if (err_is_fail(err)) {
204             // can fail if the event already fired, but this is ok
205             assert(err_no(err) == LIB_ERR_CHAN_NOT_REGISTERED);
206         }
207     }
208
209     thread_mutex_unlock(&q->mutex);
210     return SYS_ERR_OK;
211 }
212
213 /**
214  * \brief Flush all pending events from the event queue.
215  *
216  * \param q Event queue.
217  */
218 errval_t
219 event_queue_flush(struct event_queue *q) {
220
221     thread_mutex_lock(&q->mutex);
222
223     struct event_queue_node *qn;
224     do {
225         qn = next_event(q);
226     } while (qn);
227
228     thread_mutex_unlock(&q->mutex);
229
230     return SYS_ERR_OK;
231 }
232
233 /**
234  * \brief Trigger the next event on a queue which is operating in one-shot mode
235  *
236  * Must not be called before the previously-triggered event has run.
237  *
238  * \param q Event queue
239  */
240 errval_t event_queue_trigger(struct event_queue *q)
241 {
242     assert(q->mode == EVENT_QUEUE_ONESHOT);
243
244     thread_mutex_lock(&q->mutex);
245
246     struct event_queue_node *qn = next_event(q);
247
248     if (qn == NULL) {
249         thread_mutex_unlock(&q->mutex);
250         return LIB_ERR_EVENT_QUEUE_EMPTY;
251     }
252
253     qn->run = true;
254     thread_mutex_unlock(&q->mutex);
255
256     // trigger closure on waitset
257     return waitset_chan_trigger_closure(q->waitset, &q->waitset_state, qn->event);
258 }