flounder: making const pointers in receiving handlers, using CONST_CAST as a temporar...
[barrelfish] / usr / monitor / multihop_support.c
1 /**
2  * \file
3  * \brief Multi-hop channel support at the monitor
4  */
5
6 /*
7  * Copyright (c) 2009, 2010, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include "monitor.h"
16 #include <collections/hash_table.h>
17 #include <bench/bench.h>
18 #include <barrelfish/multihop_chan.h>
19
20 ///////////////////////////////////////////////////////
21
22 // ROUTING TABLE
23
24 ///////////////////////////////////////////////////////
25
26 /* The routing table is used to determine where to
27  * forward a connection set-up request.
28  *
29  * The routing table is constructed by the RTS
30  * (Routing table set-up dispatcher), using
31  * information from the System Knowledge Base (SKB).
32  * The RTS will send the routing table to the monitor
33  * that is first booted once it has constructed the
34  * routing table.
35  *
36  * In cases where there is no SKB, the RTS will also
37  * never send us an routing table. We use direct routing
38  * is this case.
39  */
40
41 // the routing table (as two dimensional array indexed by source and dest core)
42 static coreid_t **routing_table;
43
44 // the maximum source core ID in the routing table
45 static coreid_t routing_table_max_coreid;
46
47 // the number of outstanding entries to expect in the routing table
48 // (this is a kludge used while receiving entries from the rts program)
49 static coreid_t routing_table_nentries;
50
51 // hack for synchronisation when requesting routing table from another monitor
52 static bool saw_routing_table_response;
53
54 /*
55  *  Print the routing table of a monitor, if present.
56  */
57 static void multihop_print_routing_table(void)
58 {
59 #if MULTIHOP_DEBUG_ENABLED
60     if (routing_table == NULL) {
61         MULTIHOP_DEBUG("routing table not present on core %u\n", my_core_id);
62         return;
63     }
64
65     size_t buffer_size = (((size_t)routing_table_max_coreid) + 1) * 5;
66     char buffer[buffer_size];
67
68     // Print the header
69     MULTIHOP_DEBUG("routing table of monitor %u:\n", my_core_id);
70     {
71         char *p = buffer;
72         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
73             p += sprintf(p, " %3u", i);
74         }
75         MULTIHOP_DEBUG("      To:%s\n", buffer);
76     }
77
78     // Print each line
79     for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
80         if (routing_table[src] == NULL) {
81             continue;
82         }
83
84         // convert (my part of) the routing table into a single string
85         char *p = buffer;
86         int total_char = 0, w_char = 0;
87         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
88             w_char = snprintf(p, buffer_size - total_char, " %3u",
89                               routing_table[src][i]);
90             assert(w_char > 0);
91             total_char += w_char;
92             p += w_char;
93         }
94         MULTIHOP_DEBUG("From %3u:%s\n", src, buffer);
95     }
96 #endif // MULTIHOP_DEBUG_ENABLED
97 }
98
99 // start to receive a new routing table from the RTS
100 static void multihop_routing_table_new(struct monitor_binding *b,
101                                        coreid_t max_coreid, coreid_t nentries)
102 {
103     // sanity-check input (FIXME: report errors!)
104     assert(max_coreid >= my_core_id);
105     assert(nentries > 0 && nentries <= (max_coreid + 1));
106
107     // FIXME: we don't yet support changes to the existing routing table
108     assert(routing_table == NULL);
109
110     routing_table_max_coreid = max_coreid;
111     routing_table_nentries = nentries;
112
113     // allocate space for the max core ID
114     routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
115     assert(routing_table != NULL);
116 }
117
118 // receive a part of the routing table from RTS (routing table set-up dispatcher)
119 static void multihop_routing_table_set(struct monitor_binding *b,
120                                        coreid_t from, const coreid_t *to,
121                                        size_t len)
122 {
123     // sanity-check input (FIXME: report errors!)
124     // FIXME: we don't yet support changes to the existing routing table
125     assert(routing_table != NULL);
126     assert(from <= routing_table_max_coreid);
127     assert(routing_table[from] == NULL);
128     assert(len == routing_table_max_coreid + 1);
129     routing_table[from] = memdup(to, len * sizeof(coreid_t));
130
131     if (--routing_table_nentries == 0) {
132         // we have received the complete table!
133         MULTIHOP_DEBUG("monitor on core %d has received the complete"
134                        " routing table (from RTS)\n", my_core_id);
135         multihop_print_routing_table();
136     }
137 }
138
139 /*
140  * Request (my part of) the routing table from another monitor.
141  * This method blocks until a reply is received.
142  */
143 errval_t multihop_request_routing_table(struct intermon_binding *b)
144 {
145     errval_t err;
146
147     // request the routing table
148     err = b->tx_vtbl.multihop_routing_table_request(b, NOP_CONT, my_core_id);
149     if (err_is_fail(err)) {
150         return err;
151     }
152
153     // wait until we have received a reply
154     while (!saw_routing_table_response) {
155         messages_wait_and_handle_next();
156     }
157
158     return SYS_ERR_OK;
159 }
160
161 // handle request for a portion of the routing table from another monitor
162 static void multihop_handle_routing_table_request(struct intermon_binding *b,
163                                                   coreid_t core_id)
164 {
165     errval_t err;
166
167     if (routing_table != NULL && core_id <= routing_table_max_coreid
168         && routing_table[core_id] != NULL) {
169         // if we have a routing table, send routing table to other core
170         err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
171                 SYS_ERR_OK, core_id, routing_table_max_coreid,
172                 routing_table[core_id], routing_table_max_coreid + 1);
173     } else {
174         // if we don't have a routing table, send an error reply
175         err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
176                 MON_ERR_INCOMPLETE_ROUTE, core_id, routing_table_max_coreid,
177                 NULL, 0);
178     }
179
180     assert(err_is_ok(err)); // FIXME
181 }
182
183 // handle the response to a routing table request from the other monitor
184 static void multihop_handle_routing_table_response(struct intermon_binding *b,
185                                                    errval_t err,
186                                                    coreid_t source_coreid,
187                                                    coreid_t max_coreid,
188                                                    const coreid_t *to, size_t len)
189 {
190     assert(routing_table == NULL);
191     assert(source_coreid == my_core_id);
192
193     if (err_is_ok(err)) {
194         assert(to != NULL);
195         routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
196         assert(routing_table != NULL);
197         routing_table_max_coreid = max_coreid;
198
199         assert(len == max_coreid + 1);
200         assert(source_coreid <= max_coreid);
201         routing_table[source_coreid] = memdup(to, len * sizeof(coreid_t));
202     } else {
203         if (err_no(err) != MON_ERR_INCOMPLETE_ROUTE) {
204             DEBUG_ERR(err, "unexpected error retrieving routing table");
205         }
206     }
207
208     saw_routing_table_response = true;
209 }
210
211 // grow the routing table to a set of desination cores, via a given forwarder
212 static void multihop_routing_table_grow(struct intermon_binding *b,
213                                         coreid_t forwarder,
214                                         const coreid_t *destinations,
215                                         size_t ndests)
216 {
217     assert(ndests > 0);
218
219     // check the max core ID in the destinations
220     coreid_t max_coreid = my_core_id;
221     for (unsigned i = 0; i < ndests; i++) {
222         if (destinations[i] > max_coreid) {
223             max_coreid = destinations[i];
224         }
225     }
226
227     // ensure we have an allocated routing table; if necessary, grow it
228     if (routing_table == NULL) {
229         routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
230         assert(routing_table != NULL);
231         routing_table_max_coreid = max_coreid;
232     } else if (max_coreid > routing_table_max_coreid) {
233         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
234             if (routing_table[i] != NULL) {
235                 routing_table[i] = realloc(routing_table[i],
236                                            (((uintptr_t)max_coreid) + 1)
237                                            * sizeof(coreid_t));
238                 assert(routing_table[i] != NULL);
239                 // XXX: the default for the unconfigured part of the routing
240                 // table is direct routing
241                 for (unsigned j = routing_table_max_coreid + 1; j <= max_coreid; j++) {
242                     routing_table[i][j] = j;
243                 }
244             }
245         }
246
247         routing_table = realloc(routing_table, (((uintptr_t)max_coreid) + 1)
248                                                * sizeof(coreid_t *));
249         assert(routing_table != NULL);
250         memset(&routing_table[routing_table_max_coreid + 1], 0,
251                (max_coreid - routing_table_max_coreid) * sizeof(coreid_t *));
252         routing_table_max_coreid = max_coreid;
253     }
254
255     // ensure I have my own routes (the default is direct routing)
256     if (routing_table[my_core_id] == NULL) {
257         routing_table[my_core_id] = malloc((((uintptr_t)routing_table_max_coreid) + 1)
258                                            * sizeof(coreid_t));
259         assert(routing_table[my_core_id] != NULL);
260         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
261             routing_table[my_core_id][i] = i;
262         }
263     }
264
265     // update routes to destinations for all origins in my routing table and myself
266     for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
267         if (routing_table[src] != NULL) {
268             for (unsigned i = 0; i < ndests; i++) {
269                 routing_table[src][destinations[i]] = routing_table[src][forwarder];
270             }
271         }
272     }
273 }
274
275 // return the next hop (based on the routing table)
276 static inline coreid_t get_next_hop(coreid_t dest)
277 {
278
279     assert(dest != my_core_id);
280
281     if (routing_table != NULL
282         && my_core_id <= routing_table_max_coreid
283         && dest <= routing_table_max_coreid
284         && routing_table[my_core_id] != NULL) {
285         // if we have a routing table, look up next hop
286         return routing_table[my_core_id][dest];
287     } else {
288         // if we don't have a routing table, route directly
289         return dest;
290     }
291 }
292
293 ///////////////////////////////////////////////////////
294
295 // FORWARDING (HASH) TABLE
296
297 ///////////////////////////////////////////////////////
298
299 /**
300  * Messages are forwarded based on the forwarding table.
301  * We use a hash table to map virtual circuit identifiers (VCIs)
302  * to a pointer to the channel state.
303  */
304 static collections_hash_table *forwarding_table;
305
306 // is forwarding table initialized?
307 static bool is_forwarding_table_initialized = false;
308
309 struct monitor_multihop_chan_state;
310
311 // initialize the forwarding table
312 static inline void init_forwarding_table(void)
313 {
314
315     if (!is_forwarding_table_initialized) {
316         is_forwarding_table_initialized = true;
317         collections_hash_create_with_buckets(&forwarding_table,
318                 MULTIHOP_FORWARDING_TABLE_BUCKETS, free);
319
320         /**
321          * We initialize the random function with the current time stamp
322          * in order to make assigned VCIs unpredictable. This makes it hard
323          * for an attacker that sends message with manipulated VCIs to actually
324          * find a valid VCI.
325          */
326         srand(bench_tsc());
327     }
328 }
329
330 // insert entry in forwarding table and return VCI
331 static inline multihop_vci_t forwarding_table_insert(
332         struct monitor_multihop_chan_state *chan_state)
333 {
334
335     assert(chan_state != NULL);
336     multihop_vci_t vci;
337
338     // we call initialize before we insert an entry
339     init_forwarding_table();
340
341     do {
342         // we assign VCIs randomly, but need to
343         // make sure, that it is not yet taken
344         vci = (multihop_vci_t) rand();
345     } while (collections_hash_find(forwarding_table, vci) != NULL);
346
347     // insert into forwarding table
348     collections_hash_insert(forwarding_table, vci, chan_state);
349     return vci;
350 }
351
352 // delete entry from forwarding table
353 static inline void forwarding_table_delete(multihop_vci_t vci)
354 {
355     assert(is_forwarding_table_initialized);
356     collections_hash_delete(forwarding_table, vci);
357 }
358
359 // get entry from the forwarding table
360 static inline struct monitor_multihop_chan_state* forwarding_table_lookup(
361         multihop_vci_t vci)
362 {
363
364     assert(is_forwarding_table_initialized);
365     struct monitor_multihop_chan_state *chan_state = collections_hash_find(forwarding_table,
366             vci);
367
368     if (chan_state == NULL) {
369         USER_PANIC("invalid virtual circuit identifier in multi-hop channel");
370     }
371     return chan_state;
372 }
373
374 ///////////////////////////////////////////////////////
375
376 // STRUCT FOR THE PER - CHANNEL STATE
377
378 ///////////////////////////////////////////////////////
379
380 struct monitor_multihop_chan_state {
381     struct direction {
382         enum {
383             MULTIHOP_ENDPOINT, // if this is an endpoint, the communication partner is on the same core
384             MULTIHOP_NODE
385         // communication partner is a monitor on another core
386         } type;
387
388         multihop_vci_t vci; // the virtual circuit identifier to use on outgoing messages
389
390         // bindings to the "next hop"
391         union {
392             struct monitor_binding *monitor_binding; // used at endpoints to identify the dispatcher
393             struct intermon_binding *intermon_binding; // monitor binding of next hop
394         } binding;
395     } dir1, dir2;
396
397     // temporary storage for a virtual circuit identifier
398     multihop_vci_t tmp_vci;
399
400     // connection state
401     enum {
402         MONTIOR_MULTIHOP_DISCONNECTED, // Disconnected
403         MONITOR_MULTIHOP_BIND_WAIT, // Waiting for a bind reply message
404         MONITOR_MULTIHOP_CONNECTED,
405     // Connection established
406     } connstate;
407 };
408
409 // get the direction
410 static inline struct direction* multihop_get_direction(
411         struct monitor_multihop_chan_state *chan_state, uint8_t direction)
412 {
413     if (direction == 1) {
414         return &chan_state->dir1;
415     } else if (direction == 2) {
416         return &chan_state->dir2;
417     } else {
418         USER_PANIC("unknown direction in multihop channel: %d", direction);
419         return NULL;
420     }
421 }
422
423 // get the opposite direction
424 static inline uint8_t multihop_get_opposite_direction(
425         struct monitor_multihop_chan_state *chan_state, uint8_t direction,
426         struct direction **dir)
427 {
428     if (direction == 2) {
429         *dir = &chan_state->dir1;
430         return 1;
431     } else if (direction == 1) {
432         *dir = &chan_state->dir2;
433         return 2;
434     } else {
435         USER_PANIC("unknown direction in multihop channel: %d", direction);
436         return 0;
437     }
438 }
439
440 ////////////////////////////////////////////////////////////
441
442 // MULTI-HOP CHANNEL SETUP
443
444 ////////////////////////////////////////////////////////////
445
446 static void
447 multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
448         struct intermon_msg_queue_elem *e);
449
450 static void
451 multihop_monitor_bind_request_cont(
452         struct monitor_multihop_chan_state *chan_state, iref_t iref,
453         coreid_t core);
454
455 static void
456 multihop_bind_service_request(uintptr_t service_id,
457         struct monitor_multihop_chan_state *chan_state);
458
459 static void
460 multihop_intermon_bind_reply_cont(struct intermon_binding *intermon_binding,
461         multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);
462
463 static void
464 multihop_monitor_bind_reply_client(struct monitor_binding *domain_binding,
465         multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);
466
467 static inline void
468 multihop_monitor_request_error(struct monitor_multihop_chan_state *chan_state,
469         errval_t msgerr);
470
471 /**
472  * \brief This method handles a bind request message from a local dispatcher
473  *
474  * \param b The monitor binding
475  * \param iref The iref of the service
476  * \param vci The vci of the local dispatcher (this vci should be used for messages sent to the dispatcher)
477  */
478 static void multihop_monitor_bind_request_handler(struct monitor_binding *b,
479         iref_t iref, multihop_vci_t vci)
480 {
481
482     errval_t err;
483     coreid_t core_id;
484     struct monitor_multihop_chan_state *chan_state = NULL;
485
486     MULTIHOP_DEBUG(
487             "monitor on core %d received a bind multi-hop message from a local dispatcher, iref: %d\n", my_core_id, (int) iref);
488
489     // Look up core_id from the iref
490     err = iref_get_core_id(iref, &core_id);
491     if (err_is_fail(err)) {
492         debug_err(__FILE__, __func__, __LINE__, err, "iref_get_core_id failed");
493         multihop_monitor_bind_reply_client(b, vci, 0, err); // send back error message
494         return;
495     }
496
497     // allocate local state for the connection
498     chan_state = malloc(sizeof(struct monitor_multihop_chan_state));
499     assert(chan_state != NULL);
500     chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
501     chan_state->dir2.type = MULTIHOP_ENDPOINT;
502     chan_state->dir2.vci = vci;
503     chan_state->dir2.binding.monitor_binding = b;
504
505     // get a virtual circuit identifier (VCI) for this channel
506     // and insert mapping into forwarding table
507     chan_state->tmp_vci = forwarding_table_insert(chan_state);
508
509     // make sure that service is not on same core as the client
510     if (core_id == my_core_id) {
511         multihop_monitor_request_error(chan_state,
512                 LIB_ERR_BIND_MULTIHOP_SAME_CORE);
513         forwarding_table_delete(chan_state->tmp_vci);
514         return;
515     }
516
517     // determine where to forward the message
518     coreid_t next_hop = get_next_hop(core_id);
519
520     // Get connection to the monitor to forward request to
521     err = intermon_binding_get(next_hop,
522             &chan_state->dir1.binding.intermon_binding);
523     if (err_is_fail(err)) {
524         debug_err(__FILE__, __func__, __LINE__, err,
525                 "intermon_binding_get failed");
526         multihop_monitor_request_error(chan_state, err);
527         forwarding_table_delete(chan_state->tmp_vci);
528         return;
529     }
530
531     // call continuation function
532     multihop_monitor_bind_request_cont(chan_state, iref, core_id);
533 }
534
535 struct multihop_monitor_bind_request_state {
536     struct intermon_msg_queue_elem elem;
537     struct monitor_multihop_chan_state *chan_state;
538     iref_t iref;
539     coreid_t core;
540 };
541
542 // called when channel is no longer busy
543 static void multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
544         struct intermon_msg_queue_elem *e)
545 {
546     struct multihop_monitor_bind_request_state *st =
547             (struct multihop_monitor_bind_request_state *) e;
548     multihop_monitor_bind_request_cont(st->chan_state, st->iref, st->core);
549     free(e);
550 }
551
552 /**
553  * \brief Sends a bind request to the "next hop"
554  *
555  * \param chan_state pointer to the channel state
556  * \param iref the iref of the service
557  * \param core core ID of the service
558  */
559 static void multihop_monitor_bind_request_cont(
560         struct monitor_multihop_chan_state *chan_state, iref_t iref,
561         coreid_t core)
562 {
563
564     errval_t err;
565     struct intermon_binding *mon_binding =
566             chan_state->dir1.binding.intermon_binding;
567
568     // send request to next hop
569     err = mon_binding->tx_vtbl.bind_multihop_intermon_request(mon_binding,
570             NOP_CONT, iref, chan_state->tmp_vci, core);
571
572     if (err_is_fail(err)) {
573         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
574             struct multihop_monitor_bind_request_state *me = malloc(
575                     sizeof(struct multihop_monitor_bind_request_state));
576             struct intermon_state *ist = mon_binding->st;
577             me->chan_state = chan_state;
578             me->iref = iref;
579             me->elem.cont = multihop_monitor_bind_request_busy_cont;
580
581             err = intermon_enqueue_send(mon_binding, &ist->queue,
582                     get_default_waitset(), &me->elem.queue);
583             assert(err_is_ok(err));
584             return;
585         }
586         // return error code to client
587         multihop_monitor_request_error(chan_state, err);
588         forwarding_table_delete(chan_state->tmp_vci);
589     }
590 }
591
592 /**
593  * \brief Handles a bind request from another monitor
594  *
595  * \param b intermonitor binding
596  * \param iref the iref of the service
597  * \param vci The vci to use
598  * \param the core ID of the service
599  */
600 static void multihop_intermon_bind_request_handler(struct intermon_binding *b,
601         iref_t iref, multihop_vci_t vci, coreid_t core)
602 {
603     errval_t err;
604
605     MULTIHOP_DEBUG(
606             "monitor on core %d received multi-hop bind request with vci %d\n", my_core_id, (int) vci);
607
608     // allocate channel state & fill in needed information
609     struct monitor_multihop_chan_state *chan_state = malloc(
610             sizeof(struct monitor_multihop_chan_state));
611     chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
612     chan_state->dir2.vci = vci;
613     chan_state->dir2.binding.intermon_binding = b;
614     chan_state->dir2.type = MULTIHOP_NODE;
615     chan_state->tmp_vci = forwarding_table_insert(chan_state);
616
617     if (core == my_core_id) {
618         // service is on same core than this monitor, therefore we forward to local dispatcher
619
620         // get the service's connection
621         err = iref_get_binding(iref, &chan_state->dir1.binding.monitor_binding);
622         if (err_is_fail(err)) {
623             USER_PANIC_ERR(err,
624                     "Multihop set-up: could not get domain-binding for iref");
625         }
626
627         // get the service id
628         uintptr_t service_id = 0;
629         err = iref_get_service_id(iref, &service_id);
630         if (err_is_fail(err)) {
631             USER_PANIC_ERR(err,
632                     "Multihop set-up: could not get service id for iref");
633         }
634
635         // forward request to service
636         multihop_bind_service_request(service_id, chan_state);
637
638     } else {
639         // we have to forward the request to another monitor
640         // we get the core id of the next hop from the routing table
641         coreid_t next_hop = get_next_hop(core);
642
643         // get connection to the "next-hop" monitor
644         err = intermon_binding_get(next_hop,
645                 &chan_state->dir1.binding.intermon_binding);
646         if (err_is_fail(err)) {
647             debug_err(__FILE__, __func__, __LINE__, err,
648                     "intermon_binding_get failed");
649             multihop_monitor_request_error(chan_state, err);
650             forwarding_table_delete(chan_state->tmp_vci);
651             return;
652         }
653
654         // send request to next hop
655         multihop_monitor_bind_request_cont(chan_state, iref, core);
656     }
657 }
658
659 // used if channel is busy while sending request to service
660 struct multihop_bind_service_request_state {
661     struct monitor_msg_queue_elem elem;
662     uintptr_t service_id;
663     struct monitor_multihop_chan_state *chan_state;
664 };
665
666 // used when channel is no longer busy
667 static void multihop_bind_service_busy_cont(struct monitor_binding *b,
668         struct monitor_msg_queue_elem *e)
669 {
670     struct multihop_bind_service_request_state *st =
671             (struct multihop_bind_service_request_state *) e;
672     multihop_bind_service_request(st->service_id, st->chan_state);
673     free(e);
674 }
675
676 /**
677  * \brief Forward bind request to service's dispatcher
678  *
679  * \param domain_binding binding to service
680  * \param service_id Id of the service
681  * \param vci my vci
682  */
683 static void multihop_bind_service_request(uintptr_t service_id,
684         struct monitor_multihop_chan_state *chan_state)
685 {
686     errval_t err;
687     MULTIHOP_DEBUG(
688             "monitor on core %d is forwarding bind request to local dispatcher...\n", my_core_id);
689     err =
690             chan_state->dir1.binding.monitor_binding->tx_vtbl.multihop_bind_service_request(
691                     chan_state->dir1.binding.monitor_binding, NOP_CONT,
692                     service_id, chan_state->tmp_vci);
693     if (err_is_fail(err)) {
694         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
695             struct monitor_binding *monitor_binding =
696                     chan_state->dir1.binding.monitor_binding;
697             struct multihop_bind_service_request_state *me = malloc(
698                     sizeof(struct multihop_bind_service_request_state));
699             struct monitor_state *ist = monitor_binding->st;
700             me->service_id = service_id;
701             me->chan_state = chan_state;
702             me->elem.cont = &multihop_bind_service_busy_cont;
703
704             err = monitor_enqueue_send(monitor_binding, &ist->queue,
705                     get_default_waitset(), &me->elem.queue);
706             assert(err_is_ok(err));
707             return;
708         }
709
710         // return error code to client
711         multihop_monitor_request_error(chan_state, err);
712         forwarding_table_delete(chan_state->tmp_vci);
713     }
714 }
715
716 /**
717  * \brief Handle a reply message coming from service's dispatcher
718  *
719  * \param mon_binding Binding to service's dispatcher
720  * \param my_vci my virtual circuit identifier
721  * \param sender_vci virtual circuit identifier of the sender
722  * \param msgerr error code
723  */
724 static void multihop_monitor_service_bind_reply_handler(
725         struct monitor_binding *mon_binding, multihop_vci_t receiver_vci,
726         multihop_vci_t sender_vci, errval_t msgerr)
727 {
728     MULTIHOP_DEBUG(
729             "monitor on core %d received bind reply message. Status: %s. my_vci: %d\n", my_core_id, err_is_ok(msgerr) ? "success" : "failed", (int) receiver_vci);
730
731     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
732             receiver_vci);
733
734     assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);
735
736     multihop_vci_t next_receiver_vci = chan_state->dir2.vci;
737     struct intermon_binding *next_hop_binding =
738             chan_state->dir2.binding.intermon_binding;
739     if (err_is_ok(msgerr)) { /* bind succeeded */
740         chan_state->dir1.type = MULTIHOP_ENDPOINT;
741         chan_state->dir1.vci = sender_vci;
742         chan_state->dir1.binding.monitor_binding = mon_binding;
743         chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
744     } else {
745         // delete entry from forwarding table
746         forwarding_table_delete(receiver_vci);
747     }
748
749     // (stack-ripped) forward reply to next monitor
750     multihop_intermon_bind_reply_cont(next_hop_binding, next_receiver_vci,
751             receiver_vci, msgerr);
752 }
753
754 struct multihop_intermon_bind_reply_state {
755     struct intermon_msg_queue_elem elem;
756     struct intermon_bind_multihop_intermon_reply__tx_args args;
757 };
758
759 // called when channel is no longer busy
760 static void multihop_intermon_bind_reply_busy_cont(struct intermon_binding *b,
761         struct intermon_msg_queue_elem *e)
762 {
763     struct multihop_intermon_bind_reply_state *st =
764             (struct multihop_intermon_bind_reply_state *) e;
765     multihop_intermon_bind_reply_cont(b, st->args.receiver_vci,
766             st->args.sender_vci, st->args.err);
767     free(e);
768 }
769
770 /**
771  * \brief Forward a bind reply message to the next monitor
772  *
773  * \param intermon_binding binding to the next monitor
774  */
775 static void multihop_intermon_bind_reply_cont(
776         struct intermon_binding *intermon_binding, multihop_vci_t receiver_vci,
777         multihop_vci_t sender_vci, errval_t msgerr)
778 {
779     errval_t err;
780     MULTIHOP_DEBUG("monitor on core %d is forwarding reply\n", my_core_id);
781     err = intermon_binding->tx_vtbl.bind_multihop_intermon_reply(
782             intermon_binding, NOP_CONT, receiver_vci, sender_vci, msgerr);
783     if (err_is_fail(err)) {
784         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
785             struct multihop_intermon_bind_reply_state *me = malloc(
786                     sizeof(struct multihop_intermon_bind_reply_state));
787             struct intermon_state *ist = intermon_binding->st;
788             me->args.sender_vci = sender_vci;
789             me->args.receiver_vci = receiver_vci;
790             me->args.err = msgerr;
791             me->elem.cont = multihop_intermon_bind_reply_busy_cont;
792
793             err = intermon_enqueue_send(intermon_binding, &ist->queue,
794                     get_default_waitset(), &me->elem.queue);
795             assert(err_is_ok(err));
796             return;
797         }USER_PANIC_ERR(err,
798                 "Could not forward bind reply message in multi-hop channel");
799     }
800 }
801
802 /**
803  * \brief Handles a reply message from another monitor
804  *
805  * \param binding Binding to the other monitor
806  * \param my_vci My virtual circuit identifier
807  * \param sender_vci virtual circuit identifier of the sender
808  * \param msgerr error code
809  */
810 static void multihop_intermon_bind_reply_handler(
811         struct intermon_binding *binding, multihop_vci_t receiver_vci,
812         multihop_vci_t sender_vci, errval_t msgerr)
813 {
814     MULTIHOP_DEBUG(
815             "monitor on core %d has received a bind reply\n", my_core_id);
816     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
817             receiver_vci);
818
819     assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);
820
821     if (err_is_ok(msgerr)) {
822         chan_state->dir1.type = MULTIHOP_NODE;
823         chan_state->dir1.binding.intermon_binding = binding;
824         chan_state->dir1.vci = sender_vci;
825         chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
826
827         if (chan_state->dir2.type == MULTIHOP_NODE) {
828             multihop_intermon_bind_reply_cont(
829                     chan_state->dir2.binding.intermon_binding,
830                     chan_state->dir2.vci, receiver_vci, msgerr);
831         } else {
832             multihop_monitor_bind_reply_client(
833                     chan_state->dir2.binding.monitor_binding,
834                     chan_state->dir2.vci, receiver_vci, msgerr);
835         }
836     } else {
837
838         // connection was refused
839
840         if (chan_state->dir2.type == MULTIHOP_NODE) {
841             multihop_intermon_bind_reply_cont(
842                     chan_state->dir2.binding.intermon_binding,
843                     chan_state->dir2.vci, 0, msgerr);
844         } else {
845             multihop_monitor_bind_reply_client(
846                     chan_state->dir2.binding.monitor_binding,
847                     chan_state->dir2.vci, 0, msgerr);
848         }
849
850         // delete entry from forwarding table
851         forwarding_table_delete(receiver_vci);
852     }
853 }
854
855 struct multihop_monitor_bind_reply_state {
856     struct monitor_msg_queue_elem elem;
857     struct monitor_multihop_bind_client_reply__tx_args args;
858 };
859
860 // continue function to forward a message to a dispatcher
861 static void multihop_monitor_bind_reply_busy_cont(struct monitor_binding *b,
862         struct monitor_msg_queue_elem *e)
863 {
864     struct multihop_monitor_bind_reply_state *st =
865             (struct multihop_monitor_bind_reply_state *) e;
866     multihop_monitor_bind_reply_client(b, st->args.receiver_vci,
867             st->args.sender_vci, st->args.err);
868     free(e);
869 }
870
871 /**
872  * \brief Send a reply to the dispatcher who originally sent the request
873  *
874  * \param domain_binding The monitor_binding to use
875  * \param receiver_vci The VCI of the receiver
876  * \param sender_vci The VCI of the sender
877  * \param msgerr The error code
878  */
879 static void multihop_monitor_bind_reply_client(
880         struct monitor_binding *domain_binding, multihop_vci_t receiver_vci,
881         multihop_vci_t sender_vci, errval_t msgerr)
882 {
883     errval_t err;
884     MULTIHOP_DEBUG(
885             "monitor on core %d is sending reply to dispatcher\n", my_core_id);
886     err = domain_binding->tx_vtbl.multihop_bind_client_reply(domain_binding,
887             NOP_CONT, receiver_vci, sender_vci, msgerr);
888     if (err_is_fail(err)) {
889         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
890             struct multihop_monitor_bind_reply_state *me = malloc(
891                     sizeof(struct multihop_monitor_bind_reply_state));
892             assert(me != NULL);
893             struct monitor_state *ist = domain_binding->st;
894             me->args.receiver_vci = receiver_vci;
895             me->args.sender_vci = sender_vci;
896             me->args.err = msgerr;
897             me->elem.cont = multihop_monitor_bind_reply_busy_cont;
898
899             err = monitor_enqueue_send(domain_binding, &ist->queue,
900                     get_default_waitset(), &me->elem.queue);
901             assert(err_is_ok(err));
902             return;
903
904         }
905
906         USER_PANIC_ERR(
907                 err,
908                 "Could not forward bind reply to client's dispatcher in multi-hop channel");
909     }
910 }
911
912 /**
913  * \brief send an error code back
914  *
915  */
916 static inline void multihop_monitor_request_error(
917         struct monitor_multihop_chan_state *chan_state, errval_t msgerr)
918 {
919     assert(chan_state != NULL);
920     if (chan_state->dir2.type == MULTIHOP_NODE) {
921         multihop_intermon_bind_reply_cont(
922                 chan_state->dir2.binding.intermon_binding, chan_state->dir2.vci,
923                 0, msgerr);
924     } else {
925         multihop_monitor_bind_reply_client(
926                 chan_state->dir2.binding.monitor_binding, chan_state->dir2.vci,
927                 0, msgerr);
928     }
929 }
930
931 ///////////////////////////////////////////////////////
932
933 // MESSAGE FORWARDING
934
935 ///////////////////////////////////////////////////////
936
937 static inline void multihop_message_monitor_forward(struct monitor_binding *b,
938         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
939         const uint8_t *payload, size_t size, bool first_try);
940
941 static void multihop_message_forward_continue(struct monitor_binding *b,
942         struct monitor_msg_queue_elem *e);
943
944 static inline void multihop_message_intermon_forward(struct intermon_binding *b,
945         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
946         const uint8_t *payload, size_t size, bool first_try);
947
948 static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
949         struct intermon_msg_queue_elem *e);
950
951 // monitor message forwarding state
952 struct monitor_multihop_message_forwarding_state {
953     struct monitor_msg_queue_elem elem;
954     struct monitor_multihop_message__rx_args args;
955 };
956
957 // inter-monitor forwarding state
958 struct intermon_message_forwarding_state {
959     struct intermon_msg_queue_elem elem;
960     struct intermon_multihop_message__rx_args args;
961 };
962
963 /**
964  * \brief Handle a multi-hop message coming from a local dispatcher.
965  *        The message must be forwarded to the next hop.
966  *
967  * \param mon_binding the monitor binding
968  * \param vci the virtual circuit identifier of the message
969  * \param direction direction of the message
970  * \param flags message flags
971  * \param ack number of messages acknowledged with this message
972  * \param payload pointer to the message payload
973  * \size size of the message payload
974  *
975  */
976 static void multihop_message_handler(struct monitor_binding *mon_binding,
977         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
978         const uint8_t *payload, size_t size)
979 {
980
981     MULTIHOP_DEBUG(
982             "monitor on core %d received multi-hop message (from local dispatcher). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
983
984     // get forwarding information
985     errval_t err;
986     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
987             vci);
988     struct direction *dir = multihop_get_direction(chan_state, direction);
989     struct intermon_binding *b = dir->binding.intermon_binding;
990
991     struct intermon_state *ist = b->st;
992     if (msg_queue_is_empty(&ist->queue)) {
993
994         // if the message queue is empty, we can directly forward
995         // the message
996         multihop_message_intermon_forward(b, dir->vci, direction, flags, ack,
997                 payload, size, true);
998     } else {
999         // if the message queue is not empty, we have to
1000         // enqueue the message (to make sure we do not bypass
1001         // other messages)
1002         struct intermon_message_forwarding_state *me = malloc(
1003                 sizeof(struct intermon_message_forwarding_state));
1004         me->args.vci = dir->vci;
1005         me->args.direction = direction;
1006         me->args.flags = flags;
1007         me->args.ack = ack;
1008         memcpy(me->args.payload, payload, size);
1009
1010         me->args.size = size;
1011         me->elem.cont = multihop_message_intermon_forward_cont;
1012
1013         err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1014                 &me->elem.queue);
1015         assert(err_is_ok(err));
1016     }
1017 }
1018
1019 // continue function for intermonitor message forwarding
1020 static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
1021         struct intermon_msg_queue_elem *e)
1022 {
1023
1024     struct intermon_message_forwarding_state *st =
1025             (struct intermon_message_forwarding_state *) e;
1026
1027     multihop_message_intermon_forward(b, st->args.vci, st->args.direction,
1028             st->args.flags, st->args.ack, st->args.payload, st->args.size,
1029             false);
1030     free(e);
1031 }
1032
1033 /**
1034  * \brief Forward a message to another monitor.
1035  *
1036  */
1037 static inline void multihop_message_intermon_forward(struct intermon_binding *b,
1038         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1039         const uint8_t *payload, size_t size, bool first_try)
1040 {
1041
1042     errval_t err;
1043
1044     // try to forward message
1045     err = b->tx_vtbl.multihop_message(b, NOP_CONT, vci, direction,
1046             flags, ack, payload, size);
1047
1048     if (err_is_fail(err)) {
1049         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1050             struct intermon_message_forwarding_state *me = malloc(
1051                     sizeof(struct intermon_message_forwarding_state));
1052             struct intermon_state *ist = b->st;
1053             me->args.vci = vci;
1054             me->args.direction = direction;
1055             me->args.flags = flags;
1056             me->args.ack = ack;
1057             memcpy(me->args.payload, payload, size);
1058
1059             me->args.size = size;
1060             me->elem.cont = multihop_message_intermon_forward_cont;
1061
1062             if (first_try) {
1063                 // if this is the first time that we try to send this message
1064                 // we can enqueue it at the back of the message queue
1065                 err = intermon_enqueue_send(b, &ist->queue,
1066                         get_default_waitset(), &me->elem.queue);
1067             } else {
1068                 // if this is NOT the first time that we try to send this message
1069                 // we have to enqueue it at the FRONT to make sure that the
1070                 // original message order is preserved
1071                 err = intermon_enqueue_send_at_front(b, &ist->queue,
1072                         get_default_waitset(), &me->elem.queue);
1073             }
1074
1075             assert(err_is_ok(err));
1076             return;
1077         }
1078
1079         USER_PANIC_ERR(err, "Could not forward multi-hop message\n");
1080     }
1081 }
1082
1083 /**
1084  * \brief Handle a message coming from another monitor. We have
1085  *        to forward the message either to another monitor or
1086  *        to a dispatcher
1087  *
1088  * \param mon_binding the monitor binding
1089  * \param vci the virtual circuit identifier of the message
1090  * \param direction direction of the message
1091  * \param flags message flags
1092  * \param ack number of messages acknowledged with this message
1093  * \param payload pointer to the message payload
1094  * \size size of the message payload
1095  */
1096 static void intermon_multihop_message_handler(struct intermon_binding *binding,
1097         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1098         const uint8_t *payload, size_t size)
1099 {
1100
1101     MULTIHOP_DEBUG(
1102             "monitor on core %d received multi-hop message (from other monitor). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
1103
1104     errval_t err;
1105     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1106             vci);
1107     struct direction *dir = multihop_get_direction(chan_state, direction);
1108
1109     if (dir->type == MULTIHOP_ENDPOINT) {
1110         // we have to forward the message to a local dispatcher
1111         struct monitor_binding *b = dir->binding.monitor_binding;
1112         struct monitor_state *ist = b->st;
1113
1114         if (msg_queue_is_empty(&ist->queue)) {
1115             // if the message queue is empty, we can directly forward
1116             // the message
1117             multihop_message_monitor_forward(b, dir->vci, direction, flags, ack,
1118                     payload, size, true);
1119         } else {
1120             // if the message queue is not empty, we have to
1121             // enqueue the message (to make sure we do not bypass
1122             // other messages)
1123             struct monitor_multihop_message_forwarding_state *me = malloc(
1124                     sizeof(struct monitor_multihop_message_forwarding_state));
1125             assert(me != NULL);
1126             me->args.vci = dir->vci;
1127             me->args.direction = direction;
1128             me->args.flags = flags;
1129             me->args.ack = ack;
1130             memcpy(me->args.payload, payload, size);
1131
1132             me->args.size = size;
1133             me->elem.cont = multihop_message_forward_continue;
1134
1135             err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1136                     &me->elem.queue);
1137             assert(err_is_ok(err));
1138         }
1139         return;
1140     } else {
1141         // we have to forward the message to the next hop (--> another monitor)
1142         struct intermon_binding *b = dir->binding.intermon_binding;
1143         struct intermon_state *ist = b->st;
1144
1145         if (msg_queue_is_empty(&ist->queue)) {
1146             // message queue is empty --> send directly
1147             multihop_message_intermon_forward(b, dir->vci, direction, flags,
1148                     ack, payload, size, true);
1149         } else {
1150             // enqueue message
1151             struct intermon_message_forwarding_state *me = malloc(
1152                     sizeof(struct intermon_message_forwarding_state));
1153             me->args.vci = dir->vci;
1154             me->args.direction = direction;
1155             me->args.flags = flags;
1156             me->args.ack = ack;
1157             memcpy(me->args.payload, payload, size);
1158
1159             me->args.size = size;
1160             me->elem.cont = multihop_message_intermon_forward_cont;
1161
1162             err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1163                     &me->elem.queue);
1164             assert(err_is_ok(err));
1165         }
1166         return;
1167     }
1168 }
1169
1170 // continue function to forward a message to a dispatcher
1171 static void multihop_message_forward_continue(struct monitor_binding *b,
1172         struct monitor_msg_queue_elem *e)
1173 {
1174
1175     struct monitor_multihop_message_forwarding_state *st =
1176             (struct monitor_multihop_message_forwarding_state *) e;
1177
1178     multihop_message_monitor_forward(b, st->args.vci, st->args.direction,
1179             st->args.flags, st->args.ack, st->args.payload, st->args.size,
1180             false);
1181     free(e);
1182 }
1183
1184 /**
1185  * \brief Forward a message to a dispatcher
1186  */
1187 static inline void multihop_message_monitor_forward(struct monitor_binding *b,
1188         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1189         const uint8_t *payload, size_t size, bool first_try)
1190 {
1191
1192     errval_t err;
1193
1194     // try to forward message
1195     err = b->tx_vtbl.multihop_message(b, NOP_CONT, vci, direction,
1196             flags, ack, payload, size);
1197
1198     if (err_is_fail(err)) {
1199         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1200             struct monitor_multihop_message_forwarding_state *me = malloc(
1201                     sizeof(struct monitor_multihop_message_forwarding_state));
1202             assert(me != NULL);
1203             struct monitor_state *ist = b->st;
1204             me->args.vci = vci;
1205             me->args.direction = direction;
1206             me->args.flags = flags;
1207             me->args.ack = ack;
1208             memcpy(me->args.payload, payload, size);
1209
1210             me->args.size = size;
1211             me->elem.cont = multihop_message_forward_continue;
1212
1213             if (first_try) {
1214                 err = monitor_enqueue_send(b, &ist->queue,
1215                         get_default_waitset(), &me->elem.queue);
1216             } else {
1217                 err = monitor_enqueue_send_at_front(b, &ist->queue,
1218                         get_default_waitset(), &me->elem.queue);
1219             }
1220
1221             assert(err_is_ok(err));
1222             return;
1223         }
1224
1225         USER_PANIC_ERR(err, "failed forwarding multihop message\n");
1226     }
1227 }
1228
1229 ///////////////////////////////////////////////////////
1230
1231 // CAPABILITY FORWARDING
1232
1233 ///////////////////////////////////////////////////////
1234
1235 static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
1236         struct intermon_msg_queue_elem *e);
1237
1238 static inline void multihop_cap_send_intermon_forward(
1239         struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
1240         uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap,
1241         coreid_t owner);
1242
1243 static void multihop_cap_send_forward_cont(struct monitor_binding *b,
1244         struct monitor_msg_queue_elem *e);
1245
1246 inline static void multihop_cap_send_forward(struct monitor_binding *b,
1247         multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
1248         struct capref cap);
1249
1250 // intermonitor capability forwarding state
1251 struct multihop_intermon_capability_forwarding_state {
1252     struct intermon_msg_queue_elem elem;
1253     struct intermon_multihop_cap_send__tx_args args;
1254 };
1255
1256 // monitor capability forwarding state
1257 struct multihop_capability_forwarding_state {
1258     struct monitor_msg_queue_elem elem;
1259     struct monitor_multihop_cap_send__tx_args args;
1260 };
1261
1262 /**
1263  * \brief Handle capability send request from local monitor.
1264  *        The capability must be forwarded to the next hop.
1265  *
1266  * \param monitor_binding
1267  * \param vci the virtual circuit identifier (VCI)
1268  * \param direction the direction
1269  * \param cap reference to the capability
1270  * \capid ID of the capability
1271  *
1272  */
1273 static void multihop_cap_send_request_handler(
1274         struct monitor_binding *monitor_binding, multihop_vci_t vci,
1275         uint8_t direction, errval_t msgerr, struct capref cap, uint32_t capid)
1276 {
1277
1278     MULTIHOP_DEBUG(
1279             "monitor on core %d received a capability (from local dispatcher). VCI %llu, direction %d, cap ID %d\n", my_core_id, (unsigned long long) vci, direction, capid);
1280
1281     errval_t err;
1282     struct capability capability;
1283     intermon_caprep_t caprep;
1284     coreid_t capowner;
1285     memset(&caprep, 0, sizeof(caprep));
1286     bool null_cap = capref_is_null(cap);
1287
1288     // XXX: this field is ignored when the local dispatcher originates the cap
1289     msgerr = SYS_ERR_OK;
1290
1291     // get forwarding information
1292     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1293             vci);
1294     struct direction *dir = multihop_get_direction(chan_state, direction);
1295     struct intermon_binding *b = dir->binding.intermon_binding;
1296
1297     if (!null_cap) {
1298         // get binary representation of capability
1299         err = monitor_cap_identify(cap, &capability);
1300         if (err_is_fail(err)) {
1301             USER_PANIC_ERR(err, "monitor_cap_identify failed, ignored");
1302             return;
1303         }
1304
1305         err = monitor_get_domcap_owner(get_cap_domref(cap), &capowner);
1306         if (err_is_fail(err)) {
1307             USER_PANIC_ERR(err, "getting owner failed, ignored");
1308             return;
1309         }
1310
1311         // if we can't transfer the cap, it is delivered as NULL
1312         if (!monitor_can_send_cap(&capability)) {
1313             cap = NULL_CAP;
1314             null_cap = true;
1315             msgerr = MON_ERR_CAP_SEND;
1316         }
1317     }
1318
1319     if (!null_cap) {
1320         // FIXME: this seems to be totally bogus. it assumes a give_away cap -AB
1321
1322         // mark capability as remote
1323         err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1324         if (err_is_fail(err)) {
1325             USER_PANIC_ERR(err, "monitor_cap_remote failed");
1326             return;
1327         }
1328
1329         // XXX: This is a typedef of struct that flounder is generating.
1330         // Flounder should not be generating this and we shouldn't be using it.
1331         capability_to_caprep(&capability, &caprep);
1332
1333         // destroy capability on this core
1334         err = cap_destroy(cap);
1335         if (err_is_fail(err)) {
1336             USER_PANIC_ERR(err, "cap destroy failed");
1337         }
1338     }
1339
1340     // enqueue capability in order to be forwarded
1341     struct multihop_intermon_capability_forwarding_state *me = malloc(
1342             sizeof(struct multihop_intermon_capability_forwarding_state));
1343     struct intermon_state *ist = b->st;
1344     me->args.vci = dir->vci;
1345     me->args.direction = direction;
1346     me->args.capid = capid;
1347     me->args.err = msgerr;
1348     me->args.cap = caprep;
1349     me->args.null_cap = null_cap;
1350     me->args.owner = capowner;
1351     me->elem.cont = multihop_cap_send_intermon_forward_cont;
1352
1353     err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1354             &me->elem.queue);
1355     assert(err_is_ok(err));
1356 }
1357
1358 // continue function for intermonitor capability forwarding
1359 static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
1360         struct intermon_msg_queue_elem *e)
1361 {
1362     struct multihop_intermon_capability_forwarding_state *st =
1363             (struct multihop_intermon_capability_forwarding_state *) e;
1364     multihop_cap_send_intermon_forward(b, st->args.vci, st->args.direction,
1365         st->args.capid, st->args.err, st->args.cap, st->args.null_cap,
1366         st->args.owner);
1367     free(e);
1368 }
1369
1370 /**
1371  * \brief Forward capability to the next hop
1372  *
1373  */
1374 static inline void multihop_cap_send_intermon_forward(
1375         struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
1376         uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap,
1377         coreid_t owner)
1378 {
1379
1380     errval_t err;
1381
1382     // try to forward
1383     err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, capid, msgerr,
1384             caprep, null_cap, owner);
1385
1386     if (err_is_fail(err)) {
1387         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1388             struct multihop_intermon_capability_forwarding_state *me =
1389                     malloc(
1390                             sizeof(struct multihop_intermon_capability_forwarding_state));
1391             struct intermon_state *ist = b->st;
1392             me->args.vci = vci;
1393             me->args.direction = direction;
1394             me->args.capid = capid;
1395             me->args.err = msgerr;
1396             me->args.cap = caprep;
1397             me->args.null_cap = null_cap;
1398             me->elem.cont = multihop_cap_send_intermon_forward_cont;
1399
1400             err = intermon_enqueue_send_at_front(b, &ist->queue,
1401                     get_default_waitset(), &me->elem.queue);
1402             assert(err_is_ok(err));
1403             return;
1404         }
1405
1406         USER_PANIC_ERR(err,
1407                 "Could not forward capability over multi-hop channel\n");
1408     }
1409 }
1410
1411 /**
1412  * \brief Handle a capability coming from another monitor.
1413  *        The capability must be either forwarded to another
1414  *        monitor or to a local dispatcher.
1415  *
1416  */
1417 static void multihop_intermon_cap_send_handler(
1418         struct intermon_binding *intermon_binding, multihop_vci_t vci,
1419         uint8_t direction, uint32_t capid, errval_t msgerr,
1420         intermon_caprep_t caprep, bool null_cap, coreid_t owner)
1421 {
1422
1423     MULTIHOP_DEBUG(
1424             "monitor on core %d received a capability (from other monitor). VCI %llu, direction %d, cap ID %d, owner %d\n", my_core_id, (unsigned long long) vci, direction, capid, owner);
1425
1426     errval_t err;
1427     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1428             vci);
1429     struct direction *dir = multihop_get_direction(chan_state, direction);
1430
1431     if (dir->type == MULTIHOP_ENDPOINT) {
1432         // we have to forward the message to a local dispatcher
1433
1434         // Construct the capability
1435         struct capability *capability = (struct capability *) &caprep;
1436         struct capref cap;
1437
1438         if (null_cap) {
1439             cap = NULL_CAP;
1440         } else {
1441             err = slot_alloc(&cap);
1442             if (err_is_fail(err)) {
1443
1444                 // send a msg indicating that we failed
1445                 // to allocate a slot for the capability
1446                 cap = NULL_CAP;
1447                 msgerr = err;
1448                 goto do_send;
1449             }
1450
1451             // create capability
1452             err = monitor_cap_create(cap, capability, owner);
1453             if (err_is_fail(err)) {
1454                 slot_free(cap);
1455
1456                 // send a msg indicating that we failed
1457                 // to create the capability
1458                 cap = NULL_CAP;
1459                 msgerr = err_push(err, MON_ERR_CAP_CREATE);
1460                 goto do_send;
1461             }
1462
1463             // mark capability as remote
1464             err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1465             if (err_is_fail(err)) {
1466                 USER_PANIC_ERR(err, "monitor_cap_remote failed");
1467                 return;
1468             }
1469         }
1470
1471 do_send: ;
1472         // enqueue the capability in order to be forwarded to
1473         // the local dispatcher
1474         struct monitor_binding *b = dir->binding.monitor_binding;
1475         struct multihop_capability_forwarding_state *me = malloc(
1476                 sizeof(struct multihop_capability_forwarding_state));
1477         assert(me != NULL);
1478         struct monitor_state *ist = b->st;
1479         me->args.vci = dir->vci;
1480         me->args.direction = direction;
1481         me->args.cap = cap;
1482         me->args.capid = capid;
1483         me->args.err = msgerr;
1484         me->elem.cont = multihop_cap_send_forward_cont;
1485
1486         err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1487                 &me->elem.queue);
1488         assert(err_is_ok(err));
1489         return;
1490
1491     } else {
1492         // we have to forward the capability to the next hop
1493         // we therefore enqueue the capability
1494         struct intermon_binding *b = dir->binding.intermon_binding;
1495         struct multihop_intermon_capability_forwarding_state *me = malloc(
1496                 sizeof(struct multihop_intermon_capability_forwarding_state));
1497         struct intermon_state *ist = b->st;
1498         me->args.vci = dir->vci;
1499         me->args.direction = direction;
1500         me->args.capid = capid;
1501         me->args.err = msgerr;
1502         me->args.cap = caprep;
1503         me->args.null_cap = null_cap;
1504         me->elem.cont = multihop_cap_send_intermon_forward_cont;
1505
1506         err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1507                 &me->elem.queue);
1508         assert(err_is_ok(err));
1509         return;
1510     }
1511 }
1512
1513 // continue function for monitor capability forwarding
1514 static void multihop_cap_send_forward_cont(struct monitor_binding *b,
1515         struct monitor_msg_queue_elem *e)
1516 {
1517     struct multihop_capability_forwarding_state *st =
1518             (struct multihop_capability_forwarding_state *) e;
1519     multihop_cap_send_forward(b, st->args.vci, st->args.direction,
1520                               st->args.capid, st->args.err, st->args.cap);
1521     free(e);
1522 }
1523
1524 /**
1525  * \brief Forward capability to a local dispatcher
1526  *
1527  */
1528 inline static void multihop_cap_send_forward(struct monitor_binding *b,
1529         multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
1530         struct capref cap)
1531 {
1532     errval_t err;
1533
1534 // try to send
1535     err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, msgerr,
1536                                        cap, capid);
1537
1538     if (err_is_fail(err)) {
1539         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1540             struct multihop_capability_forwarding_state *me = malloc(
1541                     sizeof(struct multihop_capability_forwarding_state));
1542             assert(me != NULL);
1543             struct monitor_state *ist = b->st;
1544             me->args.vci = vci;
1545             me->args.direction = direction;
1546             me->args.cap = cap;
1547             me->args.capid = capid;
1548             me->args.err = msgerr;
1549             me->elem.cont = multihop_cap_send_forward_cont;
1550
1551             err = monitor_enqueue_send_at_front(b, &ist->queue,
1552                     get_default_waitset(), &me->elem.queue);
1553             assert(err_is_ok(err));
1554             return;
1555         }
1556
1557         USER_PANIC_ERR(err,
1558                 "failed to forward capability over multi-hop channel\n");
1559     }
1560 }
1561
1562 ///////////////////////////////////////////////////////
1563
1564 // INITIALIZATION
1565
1566 ///////////////////////////////////////////////////////
1567
1568 // set up receive vtable in the intermonitor interface
1569 errval_t multihop_intermon_init(struct intermon_binding *ib)
1570 {
1571     ib->rx_vtbl.bind_multihop_intermon_request =
1572             &multihop_intermon_bind_request_handler;
1573     ib->rx_vtbl.bind_multihop_intermon_reply =
1574             &multihop_intermon_bind_reply_handler;
1575     ib->rx_vtbl.multihop_message = &intermon_multihop_message_handler;
1576     ib->rx_vtbl.multihop_cap_send = &multihop_intermon_cap_send_handler;
1577     ib->rx_vtbl.multihop_routing_table_request =
1578             &multihop_handle_routing_table_request;
1579     ib->rx_vtbl.multihop_routing_table_response =
1580             &multihop_handle_routing_table_response;
1581     ib->rx_vtbl.multihop_routing_table_grow =
1582             &multihop_routing_table_grow;
1583
1584     return SYS_ERR_OK;
1585 }
1586
1587 // set up receive vtable in the monitor interface
1588 errval_t multihop_monitor_init(struct monitor_binding *mb)
1589 {
1590     mb->rx_vtbl.multihop_bind_client_request =
1591             &multihop_monitor_bind_request_handler;
1592     mb->rx_vtbl.multihop_bind_service_reply =
1593             &multihop_monitor_service_bind_reply_handler;
1594     mb->rx_vtbl.multihop_message = &multihop_message_handler;
1595     mb->rx_vtbl.multihop_cap_send = &multihop_cap_send_request_handler;
1596     mb->rx_vtbl.multihop_routing_table_new =
1597             &multihop_routing_table_new;
1598     mb->rx_vtbl.multihop_routing_table_set =
1599             &multihop_routing_table_set;
1600
1601     return SYS_ERR_OK;
1602 }