5688f1cc2ef890b3020a30c2586e4cead6abfe06
[barrelfish] / usr / monitor / multihop_support.c
1 /**
2  * \file
3  * \brief Multi-hop channel support at the monitor
4  */
5
6 /*
7  * Copyright (c) 2009, 2010, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include "monitor.h"
16 #include <collections/hash_table.h>
17 #include <bench/bench.h>
18 #include <barrelfish/multihop_chan.h>
19
20 ///////////////////////////////////////////////////////
21
22 // ROUTING TABLE
23
24 ///////////////////////////////////////////////////////
25
26 /* The routing table is used to determine where to
27  * forward a connection set-up request.
28  *
29  * The routing table is constructed by the RTS
30  * (Routing table set-up dispatcher), using
31  * information from the System Knowledge Base (SKB).
32  * The RTS will send the routing table to the monitor
33  * that is first booted once it has constructed the
34  * routing table.
35  *
36  * In cases where there is no SKB, the RTS will also
37  * never send us an routing table. We use direct routing
38  * is this case.
39  */
40
41 // the routing table (as two dimensional array indexed by source and dest core)
42 static coreid_t **routing_table;
43
44 // the maximum source core ID in the routing table
45 static coreid_t routing_table_max_coreid;
46
47 // the number of outstanding entries to expect in the routing table
48 // (this is a kludge used while receiving entries from the rts program)
49 static coreid_t routing_table_nentries;
50
51 // hack for synchronisation when requesting routing table from another monitor
52 static bool saw_routing_table_response;
53
54 /*
55  *  Print the routing table of a monitor, if present.
56  */
57 static void multihop_print_routing_table(void)
58 {
59 #if MULTIHOP_DEBUG_ENABLED
60     if (routing_table == NULL) {
61         MULTIHOP_DEBUG("routing table not present on core %u\n", my_core_id);
62         return;
63     }
64
65     size_t buffer_size = (((size_t)routing_table_max_coreid) + 1) * 5;
66     char buffer[buffer_size];
67
68     // Print the header
69     MULTIHOP_DEBUG("routing table of monitor %u:\n", my_core_id);
70     {
71         char *p = buffer;
72         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
73             p += sprintf(p, " %3u", i);
74         }
75         MULTIHOP_DEBUG("      To:%s\n", buffer);
76     }
77
78     // Print each line
79     for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
80         if (routing_table[src] == NULL) {
81             continue;
82         }
83
84         // convert (my part of) the routing table into a single string
85         char *p = buffer;
86         int total_char = 0, w_char = 0;
87         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
88             w_char = snprintf(p, buffer_size - total_char, " %3u",
89                               routing_table[src][i]);
90             assert(w_char > 0);
91             total_char += w_char;
92             p += w_char;
93         }
94         MULTIHOP_DEBUG("From %3u:%s\n", src, buffer);
95     }
96 #endif // MULTIHOP_DEBUG_ENABLED
97 }
98
99 // start to receive a new routing table from the RTS
100 static void multihop_routing_table_new(struct monitor_binding *b,
101                                        coreid_t max_coreid, coreid_t nentries)
102 {
103     // sanity-check input (FIXME: report errors!)
104     assert(max_coreid >= my_core_id);
105     assert(nentries > 0 && nentries <= (max_coreid + 1));
106
107     // FIXME: we don't yet support changes to the existing routing table
108     assert(routing_table == NULL);
109
110     routing_table_max_coreid = max_coreid;
111     routing_table_nentries = nentries;
112
113     // allocate space for the max core ID
114     routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
115     assert(routing_table != NULL);
116 }
117
118 // receive a part of the routing table from RTS (routing table set-up dispatcher)
119 static void multihop_routing_table_set(struct monitor_binding *b,
120                                        coreid_t from, coreid_t *to, size_t len)
121 {
122     // sanity-check input (FIXME: report errors!)
123     // FIXME: we don't yet support changes to the existing routing table
124     assert(routing_table != NULL);
125     assert(from <= routing_table_max_coreid);
126     assert(routing_table[from] == NULL);
127     assert(len == routing_table_max_coreid + 1);
128     routing_table[from] = to;
129
130     if (--routing_table_nentries == 0) {
131         // we have received the complete table!
132         MULTIHOP_DEBUG("monitor on core %d has received the complete"
133                        " routing table (from RTS)\n", my_core_id);
134         multihop_print_routing_table();
135     }
136 }
137
138 /*
139  * Request (my part of) the routing table from another monitor.
140  * This method blocks until a reply is received.
141  */
142 errval_t multihop_request_routing_table(struct intermon_binding *b)
143 {
144     errval_t err;
145
146     // request the routing table
147     err = b->tx_vtbl.multihop_routing_table_request(b, NOP_CONT, my_core_id);
148     if (err_is_fail(err)) {
149         return err;
150     }
151
152     // wait until we have received a reply
153     while (!saw_routing_table_response) {
154         messages_wait_and_handle_next();
155     }
156
157     return SYS_ERR_OK;
158 }
159
160 // handle request for a portion of the routing table from another monitor
161 static void multihop_handle_routing_table_request(struct intermon_binding *b,
162                                                   coreid_t core_id)
163 {
164     errval_t err;
165
166     if (routing_table != NULL && core_id <= routing_table_max_coreid
167         && routing_table[core_id] != NULL) {
168         // if we have a routing table, send routing table to other core
169         err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
170                 SYS_ERR_OK, core_id, routing_table_max_coreid,
171                 routing_table[core_id], routing_table_max_coreid + 1);
172     } else {
173         // if we don't have a routing table, send an error reply
174         err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
175                 MON_ERR_INCOMPLETE_ROUTE, core_id, routing_table_max_coreid,
176                 NULL, 0);
177     }
178
179     assert(err_is_ok(err)); // FIXME
180 }
181
182 // handle the response to a routing table request from the other monitor
183 static void multihop_handle_routing_table_response(struct intermon_binding *b,
184                                                    errval_t err,
185                                                    coreid_t source_coreid,
186                                                    coreid_t max_coreid,
187                                                    coreid_t *to, size_t len)
188 {
189     assert(routing_table == NULL);
190     assert(source_coreid == my_core_id);
191
192     if (err_is_ok(err)) {
193         assert(to != NULL);
194         routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
195         assert(routing_table != NULL);
196         routing_table_max_coreid = max_coreid;
197
198         assert(len == max_coreid + 1);
199         assert(source_coreid <= max_coreid);
200         routing_table[source_coreid] = to;
201     } else {
202         assert(to == NULL);
203
204         if (err_no(err) != MON_ERR_INCOMPLETE_ROUTE) {
205             DEBUG_ERR(err, "unexpected error retrieving routing table");
206         }
207     }
208
209     saw_routing_table_response = true;
210 }
211
212 // grow the routing table to a set of desination cores, via a given forwarder
213 static void multihop_routing_table_grow(struct intermon_binding *b,
214                                         coreid_t forwarder,
215                                         coreid_t *destinations, size_t ndests)
216 {
217     assert(ndests > 0);
218
219     // check the max core ID in the destinations
220     coreid_t max_coreid = my_core_id;
221     for (unsigned i = 0; i < ndests; i++) {
222         if (destinations[i] > max_coreid) {
223             max_coreid = destinations[i];
224         }
225     }
226
227     // ensure we have an allocated routing table; if necessary, grow it
228     if (routing_table == NULL) {
229         routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
230         assert(routing_table != NULL);
231         routing_table_max_coreid = max_coreid;
232     } else if (max_coreid > routing_table_max_coreid) {
233         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
234             if (routing_table[i] != NULL) {
235                 routing_table[i] = realloc(routing_table[i],
236                                            (((uintptr_t)max_coreid) + 1)
237                                            * sizeof(coreid_t));
238                 assert(routing_table[i] != NULL);
239                 // XXX: the default for the unconfigured part of the routing
240                 // table is direct routing
241                 for (unsigned j = routing_table_max_coreid + 1; j <= max_coreid; j++) {
242                     routing_table[i][j] = j;
243                 }
244             }
245         }
246
247         routing_table = realloc(routing_table, (((uintptr_t)max_coreid) + 1)
248                                                * sizeof(coreid_t *));
249         assert(routing_table != NULL);
250         memset(&routing_table[routing_table_max_coreid + 1], 0,
251                (max_coreid - routing_table_max_coreid) * sizeof(coreid_t *));
252         routing_table_max_coreid = max_coreid;
253     }
254
255     // ensure I have my own routes (the default is direct routing)
256     if (routing_table[my_core_id] == NULL) {
257         routing_table[my_core_id] = malloc((((uintptr_t)routing_table_max_coreid) + 1)
258                                            * sizeof(coreid_t));
259         assert(routing_table[my_core_id] != NULL);
260         for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
261             routing_table[my_core_id][i] = i;
262         }
263     }
264
265     // update routes to destinations for all origins in my routing table and myself
266     for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
267         if (routing_table[src] != NULL) {
268             for (unsigned i = 0; i < ndests; i++) {
269                 routing_table[src][destinations[i]] = routing_table[src][forwarder];
270             }
271         }
272     }
273
274     free(destinations);
275 }
276
277 // return the next hop (based on the routing table)
278 static inline coreid_t get_next_hop(coreid_t dest)
279 {
280
281     assert(dest != my_core_id);
282
283     if (routing_table != NULL
284         && my_core_id <= routing_table_max_coreid
285         && dest <= routing_table_max_coreid
286         && routing_table[my_core_id] != NULL) {
287         // if we have a routing table, look up next hop
288         return routing_table[my_core_id][dest];
289     } else {
290         // if we don't have a routing table, route directly
291         return dest;
292     }
293 }
294
295 ///////////////////////////////////////////////////////
296
297 // FORWARDING (HASH) TABLE
298
299 ///////////////////////////////////////////////////////
300
301 /**
302  * Messages are forwarded based on the forwarding table.
303  * We use a hash table to map virtual circuit identifiers (VCIs)
304  * to a pointer to the channel state.
305  */
306 static collections_hash_table *forwarding_table;
307
308 // is forwarding table initialized?
309 static bool is_forwarding_table_initialized = false;
310
311 struct monitor_multihop_chan_state;
312
313 // initialize the forwarding table
314 static inline void init_forwarding_table(void)
315 {
316
317     if (!is_forwarding_table_initialized) {
318         is_forwarding_table_initialized = true;
319         collections_hash_create_with_buckets(&forwarding_table,
320                 MULTIHOP_FORWARDING_TABLE_BUCKETS, free);
321
322         /**
323          * We initialize the random function with the current time stamp
324          * in order to make assigned VCIs unpredictable. This makes it hard
325          * for an attacker that sends message with manipulated VCIs to actually
326          * find a valid VCI.
327          */
328         srand(bench_tsc());
329     }
330 }
331
332 // insert entry in forwarding table and return VCI
333 static inline multihop_vci_t forwarding_table_insert(
334         struct monitor_multihop_chan_state *chan_state)
335 {
336
337     assert(chan_state != NULL);
338     multihop_vci_t vci;
339
340     // we call initialize before we insert an entry
341     init_forwarding_table();
342
343     do {
344         // we assign VCIs randomly, but need to
345         // make sure, that it is not yet taken
346         vci = (multihop_vci_t) rand();
347     } while (collections_hash_find(forwarding_table, vci) != NULL);
348
349     // insert into forwarding table
350     collections_hash_insert(forwarding_table, vci, chan_state);
351     return vci;
352 }
353
354 // delete entry from forwarding table
355 static inline void forwarding_table_delete(multihop_vci_t vci)
356 {
357     assert(is_forwarding_table_initialized);
358     collections_hash_delete(forwarding_table, vci);
359 }
360
361 // get entry from the forwarding table
362 static inline struct monitor_multihop_chan_state* forwarding_table_lookup(
363         multihop_vci_t vci)
364 {
365
366     assert(is_forwarding_table_initialized);
367     struct monitor_multihop_chan_state *chan_state = collections_hash_find(forwarding_table,
368             vci);
369
370     if (chan_state == NULL) {
371         USER_PANIC("invalid virtual circuit identifier in multi-hop channel");
372     }
373     return chan_state;
374 }
375
376 ///////////////////////////////////////////////////////
377
378 // STRUCT FOR THE PER - CHANNEL STATE
379
380 ///////////////////////////////////////////////////////
381
382 struct monitor_multihop_chan_state {
383     struct direction {
384         enum {
385             MULTIHOP_ENDPOINT, // if this is an endpoint, the communication partner is on the same core
386             MULTIHOP_NODE
387         // communication partner is a monitor on another core
388         } type;
389
390         multihop_vci_t vci; // the virtual circuit identifier to use on outgoing messages
391
392         // bindings to the "next hop"
393         union {
394             struct monitor_binding *monitor_binding; // used at endpoints to identify the dispatcher
395             struct intermon_binding *intermon_binding; // monitor binding of next hop
396         } binding;
397     } dir1, dir2;
398
399     // temporary storage for a virtual circuit identifier
400     multihop_vci_t tmp_vci;
401
402     // connection state
403     enum {
404         MONTIOR_MULTIHOP_DISCONNECTED, // Disconnected
405         MONITOR_MULTIHOP_BIND_WAIT, // Waiting for a bind reply message
406         MONITOR_MULTIHOP_CONNECTED,
407     // Connection established
408     } connstate;
409 };
410
411 // get the direction
412 static inline struct direction* multihop_get_direction(
413         struct monitor_multihop_chan_state *chan_state, uint8_t direction)
414 {
415     if (direction == 1) {
416         return &chan_state->dir1;
417     } else if (direction == 2) {
418         return &chan_state->dir2;
419     } else {
420         USER_PANIC("unknown direction in multihop channel: %d", direction);
421         return NULL;
422     }
423 }
424
425 // get the opposite direction
426 static inline uint8_t multihop_get_opposite_direction(
427         struct monitor_multihop_chan_state *chan_state, uint8_t direction,
428         struct direction **dir)
429 {
430     if (direction == 2) {
431         *dir = &chan_state->dir1;
432         return 1;
433     } else if (direction == 1) {
434         *dir = &chan_state->dir2;
435         return 2;
436     } else {
437         USER_PANIC("unknown direction in multihop channel: %d", direction);
438         return 0;
439     }
440 }
441
442 ////////////////////////////////////////////////////////////
443
444 // MULTI-HOP CHANNEL SETUP
445
446 ////////////////////////////////////////////////////////////
447
448 static void
449 multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
450         struct intermon_msg_queue_elem *e);
451
452 static void
453 multihop_monitor_bind_request_cont(
454         struct monitor_multihop_chan_state *chan_state, iref_t iref,
455         coreid_t core);
456
457 static void
458 multihop_bind_service_request(uintptr_t service_id,
459         struct monitor_multihop_chan_state *chan_state);
460
461 static void
462 multihop_intermon_bind_reply_cont(struct intermon_binding *intermon_binding,
463         multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);
464
465 static void
466 multihop_monitor_bind_reply_client(struct monitor_binding *domain_binding,
467         multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);
468
469 static inline void
470 multihop_monitor_request_error(struct monitor_multihop_chan_state *chan_state,
471         errval_t msgerr);
472
473 /**
474  * \brief This method handles a bind request message from a local dispatcher
475  *
476  * \param b The monitor binding
477  * \param iref The iref of the service
478  * \param vci The vci of the local dispatcher (this vci should be used for messages sent to the dispatcher)
479  */
480 static void multihop_monitor_bind_request_handler(struct monitor_binding *b,
481         iref_t iref, multihop_vci_t vci)
482 {
483
484     errval_t err;
485     coreid_t core_id;
486     struct monitor_multihop_chan_state *chan_state = NULL;
487
488     MULTIHOP_DEBUG(
489             "monitor on core %d received a bind multi-hop message from a local dispatcher, iref: %d\n", my_core_id, (int) iref);
490
491     // Look up core_id from the iref
492     err = iref_get_core_id(iref, &core_id);
493     if (err_is_fail(err)) {
494         debug_err(__FILE__, __func__, __LINE__, err, "iref_get_core_id failed");
495         multihop_monitor_bind_reply_client(b, vci, 0, err); // send back error message
496         return;
497     }
498
499     // allocate local state for the connection
500     chan_state = malloc(sizeof(struct monitor_multihop_chan_state));
501     assert(chan_state != NULL);
502     chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
503     chan_state->dir2.type = MULTIHOP_ENDPOINT;
504     chan_state->dir2.vci = vci;
505     chan_state->dir2.binding.monitor_binding = b;
506
507     // get a virtual circuit identifier (VCI) for this channel
508     // and insert mapping into forwarding table
509     chan_state->tmp_vci = forwarding_table_insert(chan_state);
510
511     // make sure that service is not on same core as the client
512     if (core_id == my_core_id) {
513         multihop_monitor_request_error(chan_state,
514                 LIB_ERR_BIND_MULTIHOP_SAME_CORE);
515         forwarding_table_delete(chan_state->tmp_vci);
516         return;
517     }
518
519     // determine where to forward the message
520     coreid_t next_hop = get_next_hop(core_id);
521
522     // Get connection to the monitor to forward request to
523     err = intermon_binding_get(next_hop,
524             &chan_state->dir1.binding.intermon_binding);
525     if (err_is_fail(err)) {
526         debug_err(__FILE__, __func__, __LINE__, err,
527                 "intermon_binding_get failed");
528         multihop_monitor_request_error(chan_state, err);
529         forwarding_table_delete(chan_state->tmp_vci);
530         return;
531     }
532
533     // call continuation function
534     multihop_monitor_bind_request_cont(chan_state, iref, core_id);
535 }
536
537 struct multihop_monitor_bind_request_state {
538     struct intermon_msg_queue_elem elem;
539     struct monitor_multihop_chan_state *chan_state;
540     iref_t iref;
541     coreid_t core;
542 };
543
544 // called when channel is no longer busy
545 static void multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
546         struct intermon_msg_queue_elem *e)
547 {
548     struct multihop_monitor_bind_request_state *st =
549             (struct multihop_monitor_bind_request_state *) e;
550     multihop_monitor_bind_request_cont(st->chan_state, st->iref, st->core);
551     free(e);
552 }
553
554 /**
555  * \brief Sends a bind request to the "next hop"
556  *
557  * \param chan_state pointer to the channel state
558  * \param iref the iref of the service
559  * \param core core ID of the service
560  */
561 static void multihop_monitor_bind_request_cont(
562         struct monitor_multihop_chan_state *chan_state, iref_t iref,
563         coreid_t core)
564 {
565
566     errval_t err;
567     struct intermon_binding *mon_binding =
568             chan_state->dir1.binding.intermon_binding;
569
570     // send request to next hop
571     err = mon_binding->tx_vtbl.bind_multihop_intermon_request(mon_binding,
572             NOP_CONT, iref, chan_state->tmp_vci, core);
573
574     if (err_is_fail(err)) {
575         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
576             struct multihop_monitor_bind_request_state *me = malloc(
577                     sizeof(struct multihop_monitor_bind_request_state));
578             struct intermon_state *ist = mon_binding->st;
579             me->chan_state = chan_state;
580             me->iref = iref;
581             me->elem.cont = multihop_monitor_bind_request_busy_cont;
582
583             err = intermon_enqueue_send(mon_binding, &ist->queue,
584                     get_default_waitset(), &me->elem.queue);
585             assert(err_is_ok(err));
586             return;
587         }
588         // return error code to client
589         multihop_monitor_request_error(chan_state, err);
590         forwarding_table_delete(chan_state->tmp_vci);
591     }
592 }
593
594 /**
595  * \brief Handles a bind request from another monitor
596  *
597  * \param b intermonitor binding
598  * \param iref the iref of the service
599  * \param vci The vci to use
600  * \param the core ID of the service
601  */
602 static void multihop_intermon_bind_request_handler(struct intermon_binding *b,
603         iref_t iref, multihop_vci_t vci, coreid_t core)
604 {
605     errval_t err;
606
607     MULTIHOP_DEBUG(
608             "monitor on core %d received multi-hop bind request with vci %d\n", my_core_id, (int) vci);
609
610     // allocate channel state & fill in needed information
611     struct monitor_multihop_chan_state *chan_state = malloc(
612             sizeof(struct monitor_multihop_chan_state));
613     chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
614     chan_state->dir2.vci = vci;
615     chan_state->dir2.binding.intermon_binding = b;
616     chan_state->dir2.type = MULTIHOP_NODE;
617     chan_state->tmp_vci = forwarding_table_insert(chan_state);
618
619     if (core == my_core_id) {
620         // service is on same core than this monitor, therefore we forward to local dispatcher
621
622         // get the service's connection
623         err = iref_get_binding(iref, &chan_state->dir1.binding.monitor_binding);
624         if (err_is_fail(err)) {
625             USER_PANIC_ERR(err,
626                     "Multihop set-up: could not get domain-binding for iref");
627         }
628
629         // get the service id
630         uintptr_t service_id = 0;
631         err = iref_get_service_id(iref, &service_id);
632         if (err_is_fail(err)) {
633             USER_PANIC_ERR(err,
634                     "Multihop set-up: could not get service id for iref");
635         }
636
637         // forward request to service
638         multihop_bind_service_request(service_id, chan_state);
639
640     } else {
641         // we have to forward the request to another monitor
642         // we get the core id of the next hop from the routing table
643         coreid_t next_hop = get_next_hop(core);
644
645         // get connection to the "next-hop" monitor
646         err = intermon_binding_get(next_hop,
647                 &chan_state->dir1.binding.intermon_binding);
648         if (err_is_fail(err)) {
649             debug_err(__FILE__, __func__, __LINE__, err,
650                     "intermon_binding_get failed");
651             multihop_monitor_request_error(chan_state, err);
652             forwarding_table_delete(chan_state->tmp_vci);
653             return;
654         }
655
656         // send request to next hop
657         multihop_monitor_bind_request_cont(chan_state, iref, core);
658     }
659 }
660
661 // used if channel is busy while sending request to service
662 struct multihop_bind_service_request_state {
663     struct monitor_msg_queue_elem elem;
664     uintptr_t service_id;
665     struct monitor_multihop_chan_state *chan_state;
666 };
667
668 // used when channel is no longer busy
669 static void multihop_bind_service_busy_cont(struct monitor_binding *b,
670         struct monitor_msg_queue_elem *e)
671 {
672     struct multihop_bind_service_request_state *st =
673             (struct multihop_bind_service_request_state *) e;
674     multihop_bind_service_request(st->service_id, st->chan_state);
675     free(e);
676 }
677
678 /**
679  * \brief Forward bind request to service's dispatcher
680  *
681  * \param domain_binding binding to service
682  * \param service_id Id of the service
683  * \param vci my vci
684  */
685 static void multihop_bind_service_request(uintptr_t service_id,
686         struct monitor_multihop_chan_state *chan_state)
687 {
688     errval_t err;
689     MULTIHOP_DEBUG(
690             "monitor on core %d is forwarding bind request to local dispatcher...\n", my_core_id);
691     err =
692             chan_state->dir1.binding.monitor_binding->tx_vtbl.multihop_bind_service_request(
693                     chan_state->dir1.binding.monitor_binding, NOP_CONT,
694                     service_id, chan_state->tmp_vci);
695     if (err_is_fail(err)) {
696         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
697             struct monitor_binding *monitor_binding =
698                     chan_state->dir1.binding.monitor_binding;
699             struct multihop_bind_service_request_state *me = malloc(
700                     sizeof(struct multihop_bind_service_request_state));
701             struct monitor_state *ist = monitor_binding->st;
702             me->service_id = service_id;
703             me->chan_state = chan_state;
704             me->elem.cont = &multihop_bind_service_busy_cont;
705
706             err = monitor_enqueue_send(monitor_binding, &ist->queue,
707                     get_default_waitset(), &me->elem.queue);
708             assert(err_is_ok(err));
709             return;
710         }
711
712         // return error code to client
713         multihop_monitor_request_error(chan_state, err);
714         forwarding_table_delete(chan_state->tmp_vci);
715     }
716 }
717
718 /**
719  * \brief Handle a reply message coming from service's dispatcher
720  *
721  * \param mon_binding Binding to service's dispatcher
722  * \param my_vci my virtual circuit identifier
723  * \param sender_vci virtual circuit identifier of the sender
724  * \param msgerr error code
725  */
726 static void multihop_monitor_service_bind_reply_handler(
727         struct monitor_binding *mon_binding, multihop_vci_t receiver_vci,
728         multihop_vci_t sender_vci, errval_t msgerr)
729 {
730     MULTIHOP_DEBUG(
731             "monitor on core %d received bind reply message. Status: %s. my_vci: %d\n", my_core_id, err_is_ok(msgerr) ? "success" : "failed", (int) receiver_vci);
732
733     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
734             receiver_vci);
735
736     assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);
737
738     multihop_vci_t next_receiver_vci = chan_state->dir2.vci;
739     struct intermon_binding *next_hop_binding =
740             chan_state->dir2.binding.intermon_binding;
741     if (err_is_ok(msgerr)) { /* bind succeeded */
742         chan_state->dir1.type = MULTIHOP_ENDPOINT;
743         chan_state->dir1.vci = sender_vci;
744         chan_state->dir1.binding.monitor_binding = mon_binding;
745         chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
746     } else {
747         // delete entry from forwarding table
748         forwarding_table_delete(receiver_vci);
749     }
750
751     // (stack-ripped) forward reply to next monitor
752     multihop_intermon_bind_reply_cont(next_hop_binding, next_receiver_vci,
753             receiver_vci, msgerr);
754 }
755
756 struct multihop_intermon_bind_reply_state {
757     struct intermon_msg_queue_elem elem;
758     struct intermon_bind_multihop_intermon_reply__args args;
759 };
760
761 // called when channel is no longer busy
762 static void multihop_intermon_bind_reply_busy_cont(struct intermon_binding *b,
763         struct intermon_msg_queue_elem *e)
764 {
765     struct multihop_intermon_bind_reply_state *st =
766             (struct multihop_intermon_bind_reply_state *) e;
767     multihop_intermon_bind_reply_cont(b, st->args.receiver_vci,
768             st->args.sender_vci, st->args.err);
769     free(e);
770 }
771
772 /**
773  * \brief Forward a bind reply message to the next monitor
774  *
775  * \param intermon_binding binding to the next monitor
776  */
777 static void multihop_intermon_bind_reply_cont(
778         struct intermon_binding *intermon_binding, multihop_vci_t receiver_vci,
779         multihop_vci_t sender_vci, errval_t msgerr)
780 {
781     errval_t err;
782     MULTIHOP_DEBUG("monitor on core %d is forwarding reply\n", my_core_id);
783     err = intermon_binding->tx_vtbl.bind_multihop_intermon_reply(
784             intermon_binding, NOP_CONT, receiver_vci, sender_vci, msgerr);
785     if (err_is_fail(err)) {
786         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
787             struct multihop_intermon_bind_reply_state *me = malloc(
788                     sizeof(struct multihop_intermon_bind_reply_state));
789             struct intermon_state *ist = intermon_binding->st;
790             me->args.sender_vci = sender_vci;
791             me->args.receiver_vci = receiver_vci;
792             me->args.err = msgerr;
793             me->elem.cont = multihop_intermon_bind_reply_busy_cont;
794
795             err = intermon_enqueue_send(intermon_binding, &ist->queue,
796                     get_default_waitset(), &me->elem.queue);
797             assert(err_is_ok(err));
798             return;
799         }USER_PANIC_ERR(err,
800                 "Could not forward bind reply message in multi-hop channel");
801     }
802 }
803
804 /**
805  * \brief Handles a reply message from another monitor
806  *
807  * \param binding Binding to the other monitor
808  * \param my_vci My virtual circuit identifier
809  * \param sender_vci virtual circuit identifier of the sender
810  * \param msgerr error code
811  */
812 static void multihop_intermon_bind_reply_handler(
813         struct intermon_binding *binding, multihop_vci_t receiver_vci,
814         multihop_vci_t sender_vci, errval_t msgerr)
815 {
816     MULTIHOP_DEBUG(
817             "monitor on core %d has received a bind reply\n", my_core_id);
818     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
819             receiver_vci);
820
821     assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);
822
823     if (err_is_ok(msgerr)) {
824         chan_state->dir1.type = MULTIHOP_NODE;
825         chan_state->dir1.binding.intermon_binding = binding;
826         chan_state->dir1.vci = sender_vci;
827         chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
828
829         if (chan_state->dir2.type == MULTIHOP_NODE) {
830             multihop_intermon_bind_reply_cont(
831                     chan_state->dir2.binding.intermon_binding,
832                     chan_state->dir2.vci, receiver_vci, msgerr);
833         } else {
834             multihop_monitor_bind_reply_client(
835                     chan_state->dir2.binding.monitor_binding,
836                     chan_state->dir2.vci, receiver_vci, msgerr);
837         }
838     } else {
839
840         // connection was refused
841
842         if (chan_state->dir2.type == MULTIHOP_NODE) {
843             multihop_intermon_bind_reply_cont(
844                     chan_state->dir2.binding.intermon_binding,
845                     chan_state->dir2.vci, 0, msgerr);
846         } else {
847             multihop_monitor_bind_reply_client(
848                     chan_state->dir2.binding.monitor_binding,
849                     chan_state->dir2.vci, 0, msgerr);
850         }
851
852         // delete entry from forwarding table
853         forwarding_table_delete(receiver_vci);
854     }
855 }
856
857 struct multihop_monitor_bind_reply_state {
858     struct monitor_msg_queue_elem elem;
859     struct monitor_multihop_bind_client_reply__args args;
860 };
861
862 // continue function to forward a message to a dispatcher
863 static void multihop_monitor_bind_reply_busy_cont(struct monitor_binding *b,
864         struct monitor_msg_queue_elem *e)
865 {
866     struct multihop_monitor_bind_reply_state *st =
867             (struct multihop_monitor_bind_reply_state *) e;
868     multihop_monitor_bind_reply_client(b, st->args.receiver_vci,
869             st->args.sender_vci, st->args.err);
870     free(e);
871 }
872
873 /**
874  * \brief Send a reply to the dispatcher who originally sent the request
875  *
876  * \param domain_binding The monitor_binding to use
877  * \param receiver_vci The VCI of the receiver
878  * \param sender_vci The VCI of the sender
879  * \param msgerr The error code
880  */
881 static void multihop_monitor_bind_reply_client(
882         struct monitor_binding *domain_binding, multihop_vci_t receiver_vci,
883         multihop_vci_t sender_vci, errval_t msgerr)
884 {
885     errval_t err;
886     MULTIHOP_DEBUG(
887             "monitor on core %d is sending reply to dispatcher\n", my_core_id);
888     err = domain_binding->tx_vtbl.multihop_bind_client_reply(domain_binding,
889             NOP_CONT, receiver_vci, sender_vci, msgerr);
890     if (err_is_fail(err)) {
891         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
892             struct multihop_monitor_bind_reply_state *me = malloc(
893                     sizeof(struct multihop_monitor_bind_reply_state));
894             assert(me != NULL);
895             struct monitor_state *ist = domain_binding->st;
896             me->args.receiver_vci = receiver_vci;
897             me->args.sender_vci = sender_vci;
898             me->args.err = msgerr;
899             me->elem.cont = multihop_monitor_bind_reply_busy_cont;
900
901             err = monitor_enqueue_send(domain_binding, &ist->queue,
902                     get_default_waitset(), &me->elem.queue);
903             assert(err_is_ok(err));
904             return;
905
906         }
907
908         USER_PANIC_ERR(
909                 err,
910                 "Could not forward bind reply to client's dispatcher in multi-hop channel");
911     }
912 }
913
914 /**
915  * \brief send an error code back
916  *
917  */
918 static inline void multihop_monitor_request_error(
919         struct monitor_multihop_chan_state *chan_state, errval_t msgerr)
920 {
921     assert(chan_state != NULL);
922     if (chan_state->dir2.type == MULTIHOP_NODE) {
923         multihop_intermon_bind_reply_cont(
924                 chan_state->dir2.binding.intermon_binding, chan_state->dir2.vci,
925                 0, msgerr);
926     } else {
927         multihop_monitor_bind_reply_client(
928                 chan_state->dir2.binding.monitor_binding, chan_state->dir2.vci,
929                 0, msgerr);
930     }
931 }
932
933 ///////////////////////////////////////////////////////
934
935 // MESSAGE FORWARDING
936
937 ///////////////////////////////////////////////////////
938
939 static inline void multihop_message_monitor_forward(struct monitor_binding *b,
940         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
941         uint8_t *payload, size_t size, bool first_try);
942
943 static void multihop_message_forward_continue(struct monitor_binding *b,
944         struct monitor_msg_queue_elem *e);
945
946 static inline void multihop_message_intermon_forward(struct intermon_binding *b,
947         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
948         uint8_t *payload, size_t size, bool first_try);
949
950 static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
951         struct intermon_msg_queue_elem *e);
952
953 // monitor message forwarding state
954 struct monitor_multihop_message_forwarding_state {
955     struct monitor_msg_queue_elem elem;
956     struct monitor_multihop_message__args args;
957 };
958
959 // inter-monitor forwarding state
960 struct intermon_message_forwarding_state {
961     struct intermon_msg_queue_elem elem;
962     struct intermon_multihop_message__args args;
963 };
964
965 /**
966  * \brief Handle a multi-hop message coming from a local dispatcher.
967  *        The message must be forwarded to the next hop.
968  *
969  * \param mon_binding the monitor binding
970  * \param vci the virtual circuit identifier of the message
971  * \param direction direction of the message
972  * \param flags message flags
973  * \param ack number of messages acknowledged with this message
974  * \param payload pointer to the message payload
975  * \size size of the message payload
976  *
977  */
978 static void multihop_message_handler(struct monitor_binding *mon_binding,
979         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
980         uint8_t *payload, size_t size)
981 {
982
983     MULTIHOP_DEBUG(
984             "monitor on core %d received multi-hop message (from local dispatcher). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
985
986     // get forwarding information
987     errval_t err;
988     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
989             vci);
990     struct direction *dir = multihop_get_direction(chan_state, direction);
991     struct intermon_binding *b = dir->binding.intermon_binding;
992
993     struct intermon_state *ist = b->st;
994     if (msg_queue_is_empty(&ist->queue)) {
995
996         // if the message queue is empty, we can directly forward
997         // the message
998         multihop_message_intermon_forward(b, dir->vci, direction, flags, ack,
999                 payload, size, true);
1000     } else {
1001         // if the message queue is not empty, we have to
1002         // enqueue the message (to make sure we do not bypass
1003         // other messages)
1004         struct intermon_message_forwarding_state *me = malloc(
1005                 sizeof(struct intermon_message_forwarding_state));
1006         me->args.vci = dir->vci;
1007         me->args.direction = direction;
1008         me->args.flags = flags;
1009         me->args.ack = ack;
1010         me->args.payload = payload;
1011         me->args.size = size;
1012         me->elem.cont = multihop_message_intermon_forward_cont;
1013
1014         err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1015                 &me->elem.queue);
1016         assert(err_is_ok(err));
1017     }
1018 }
1019
1020 // continue function for intermonitor message forwarding
1021 static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
1022         struct intermon_msg_queue_elem *e)
1023 {
1024
1025     struct intermon_message_forwarding_state *st =
1026             (struct intermon_message_forwarding_state *) e;
1027
1028     multihop_message_intermon_forward(b, st->args.vci, st->args.direction,
1029             st->args.flags, st->args.ack, st->args.payload, st->args.size,
1030             false);
1031     free(e);
1032 }
1033
1034 /**
1035  * \brief Forward a message to another monitor.
1036  *
1037  */
1038 static inline void multihop_message_intermon_forward(struct intermon_binding *b,
1039         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1040         uint8_t *payload, size_t size, bool first_try)
1041 {
1042
1043     errval_t err;
1044
1045     // try to forward message
1046     err = b->tx_vtbl.multihop_message(b, MKCONT(free, payload), vci, direction,
1047             flags, ack, payload, size);
1048
1049     if (err_is_fail(err)) {
1050         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1051             struct intermon_message_forwarding_state *me = malloc(
1052                     sizeof(struct intermon_message_forwarding_state));
1053             struct intermon_state *ist = b->st;
1054             me->args.vci = vci;
1055             me->args.direction = direction;
1056             me->args.flags = flags;
1057             me->args.ack = ack;
1058             me->args.payload = payload;
1059             me->args.size = size;
1060             me->elem.cont = multihop_message_intermon_forward_cont;
1061
1062             if (first_try) {
1063                 // if this is the first time that we try to send this message
1064                 // we can enqueue it at the back of the message queue
1065                 err = intermon_enqueue_send(b, &ist->queue,
1066                         get_default_waitset(), &me->elem.queue);
1067             } else {
1068                 // if this is NOT the first time that we try to send this message
1069                 // we have to enqueue it at the FRONT to make sure that the
1070                 // original message order is preserved
1071                 err = intermon_enqueue_send_at_front(b, &ist->queue,
1072                         get_default_waitset(), &me->elem.queue);
1073             }
1074
1075             assert(err_is_ok(err));
1076             return;
1077         }
1078
1079         USER_PANIC_ERR(err, "Could not forward multi-hop message\n");
1080     }
1081 }
1082
1083 /**
1084  * \brief Handle a message coming from another monitor. We have
1085  *        to forward the message either to another monitor or
1086  *        to a dispatcher
1087  *
1088  * \param mon_binding the monitor binding
1089  * \param vci the virtual circuit identifier of the message
1090  * \param direction direction of the message
1091  * \param flags message flags
1092  * \param ack number of messages acknowledged with this message
1093  * \param payload pointer to the message payload
1094  * \size size of the message payload
1095  */
1096 static void intermon_multihop_message_handler(struct intermon_binding *binding,
1097         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1098         uint8_t *payload, size_t size)
1099 {
1100
1101     MULTIHOP_DEBUG(
1102             "monitor on core %d received multi-hop message (from other monitor). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
1103
1104     errval_t err;
1105     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1106             vci);
1107     struct direction *dir = multihop_get_direction(chan_state, direction);
1108
1109     if (dir->type == MULTIHOP_ENDPOINT) {
1110         // we have to forward the message to a local dispatcher
1111         struct monitor_binding *b = dir->binding.monitor_binding;
1112         struct monitor_state *ist = b->st;
1113
1114         if (msg_queue_is_empty(&ist->queue)) {
1115             // if the message queue is empty, we can directly forward
1116             // the message
1117             multihop_message_monitor_forward(b, dir->vci, direction, flags, ack,
1118                     payload, size, true);
1119         } else {
1120             // if the message queue is not empty, we have to
1121             // enqueue the message (to make sure we do not bypass
1122             // other messages)
1123             struct monitor_multihop_message_forwarding_state *me = malloc(
1124                     sizeof(struct monitor_multihop_message_forwarding_state));
1125             assert(me != NULL);
1126             me->args.vci = dir->vci;
1127             me->args.direction = direction;
1128             me->args.flags = flags;
1129             me->args.ack = ack;
1130             me->args.payload = payload;
1131             me->args.size = size;
1132             me->elem.cont = multihop_message_forward_continue;
1133
1134             err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1135                     &me->elem.queue);
1136             assert(err_is_ok(err));
1137         }
1138         return;
1139     } else {
1140         // we have to forward the message to the next hop (--> another monitor)
1141         struct intermon_binding *b = dir->binding.intermon_binding;
1142         struct intermon_state *ist = b->st;
1143
1144         if (msg_queue_is_empty(&ist->queue)) {
1145             // message queue is empty --> send directly
1146             multihop_message_intermon_forward(b, dir->vci, direction, flags,
1147                     ack, payload, size, true);
1148         } else {
1149             // enqueue message
1150             struct intermon_message_forwarding_state *me = malloc(
1151                     sizeof(struct intermon_message_forwarding_state));
1152             me->args.vci = dir->vci;
1153             me->args.direction = direction;
1154             me->args.flags = flags;
1155             me->args.ack = ack;
1156             me->args.payload = payload;
1157             me->args.size = size;
1158             me->elem.cont = multihop_message_intermon_forward_cont;
1159
1160             err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1161                     &me->elem.queue);
1162             assert(err_is_ok(err));
1163         }
1164         return;
1165     }
1166 }
1167
1168 // continue function to forward a message to a dispatcher
1169 static void multihop_message_forward_continue(struct monitor_binding *b,
1170         struct monitor_msg_queue_elem *e)
1171 {
1172
1173     struct monitor_multihop_message_forwarding_state *st =
1174             (struct monitor_multihop_message_forwarding_state *) e;
1175
1176     multihop_message_monitor_forward(b, st->args.vci, st->args.direction,
1177             st->args.flags, st->args.ack, st->args.payload, st->args.size,
1178             false);
1179     free(e);
1180 }
1181
1182 /**
1183  * \brief Forward a message to a dispatcher
1184  */
1185 static inline void multihop_message_monitor_forward(struct monitor_binding *b,
1186         multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1187         uint8_t *payload, size_t size, bool first_try)
1188 {
1189
1190     errval_t err;
1191
1192     // try to forward message
1193     err = b->tx_vtbl.multihop_message(b, MKCONT(free, payload), vci, direction,
1194             flags, ack, payload, size);
1195
1196     if (err_is_fail(err)) {
1197         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1198             struct monitor_multihop_message_forwarding_state *me = malloc(
1199                     sizeof(struct monitor_multihop_message_forwarding_state));
1200             assert(me != NULL);
1201             struct monitor_state *ist = b->st;
1202             me->args.vci = vci;
1203             me->args.direction = direction;
1204             me->args.flags = flags;
1205             me->args.ack = ack;
1206             me->args.payload = payload;
1207             me->args.size = size;
1208             me->elem.cont = multihop_message_forward_continue;
1209
1210             if (first_try) {
1211                 err = monitor_enqueue_send(b, &ist->queue,
1212                         get_default_waitset(), &me->elem.queue);
1213             } else {
1214                 err = monitor_enqueue_send_at_front(b, &ist->queue,
1215                         get_default_waitset(), &me->elem.queue);
1216             }
1217
1218             assert(err_is_ok(err));
1219             return;
1220         }
1221
1222         USER_PANIC_ERR(err, "failed forwarding multihop message\n");
1223     }
1224 }
1225
1226 ///////////////////////////////////////////////////////
1227
1228 // CAPABILITY FORWARDING
1229
1230 ///////////////////////////////////////////////////////
1231
1232 static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
1233         struct intermon_msg_queue_elem *e);
1234
1235 static inline void multihop_cap_send_intermon_forward(
1236         struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
1237         uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap);
1238
1239 static void multihop_cap_send_forward_cont(struct monitor_binding *b,
1240         struct monitor_msg_queue_elem *e);
1241
1242 inline static void multihop_cap_send_forward(struct monitor_binding *b,
1243         multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
1244         struct capref cap);
1245
1246 // intermonitor capability forwarding state
1247 struct multihop_intermon_capability_forwarding_state {
1248     struct intermon_msg_queue_elem elem;
1249     struct intermon_multihop_cap_send__args args;
1250 };
1251
1252 // monitor capability forwarding state
1253 struct multihop_capability_forwarding_state {
1254     struct monitor_msg_queue_elem elem;
1255     struct monitor_multihop_cap_send__args args;
1256 };
1257
1258 /**
1259  * \brief Handle capability send request from local monitor.
1260  *        The capability must be forwarded to the next hop.
1261  *
1262  * \param monitor_binding
1263  * \param vci the virtual circuit identifier (VCI)
1264  * \param direction the direction
1265  * \param cap reference to the capability
1266  * \capid ID of the capability
1267  *
1268  */
1269 static void multihop_cap_send_request_handler(
1270         struct monitor_binding *monitor_binding, multihop_vci_t vci,
1271         uint8_t direction, errval_t msgerr, struct capref cap, uint32_t capid)
1272 {
1273
1274     MULTIHOP_DEBUG(
1275             "monitor on core %d received a capability (from local dispatcher). VCI %llu, direction %d, cap ID %d\n", my_core_id, (unsigned long long) vci, direction, capid);
1276
1277     errval_t err;
1278     struct capability capability;
1279     intermon_caprep_t caprep;
1280     memset(&caprep, 0, sizeof(caprep));
1281     bool null_cap = capref_is_null(cap);
1282
1283     // XXX: this field is ignored when the local dispatcher originates the cap
1284     msgerr = SYS_ERR_OK;
1285
1286     // get forwarding information
1287     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1288             vci);
1289     struct direction *dir = multihop_get_direction(chan_state, direction);
1290     struct intermon_binding *b = dir->binding.intermon_binding;
1291
1292     if (!null_cap) {
1293         // get binary representation of capability
1294         err = monitor_cap_identify(cap, &capability);
1295         if (err_is_fail(err)) {
1296             USER_PANIC_ERR(err, "monitor_cap_identify failed, ignored");
1297             return;
1298         }
1299
1300         // if we can't transfer the cap, it is delivered as NULL
1301         if (!monitor_can_send_cap(&capability)) {
1302             cap = NULL_CAP;
1303             null_cap = true;
1304             msgerr = MON_ERR_CAP_SEND;
1305         }
1306     }
1307
1308     if (!null_cap) {
1309         // FIXME: this seems to be totally bogus. it assumes a give_away cap -AB
1310
1311         // mark capability as remote
1312         err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1313         if (err_is_fail(err)) {
1314             USER_PANIC_ERR(err, "monitor_cap_remote failed");
1315             return;
1316         }
1317
1318         // XXX: This is a typedef of struct that flounder is generating.
1319         // Flounder should not be generating this and we shouldn't be using it.
1320         capability_to_caprep(&capability, &caprep);
1321
1322         // destroy capability on this core
1323         err = cap_destroy(cap);
1324         if (err_is_fail(err)) {
1325             USER_PANIC_ERR(err, "cap destroy failed");
1326         }
1327     }
1328
1329     // enqueue capability in order to be forwarded
1330     struct multihop_intermon_capability_forwarding_state *me = malloc(
1331             sizeof(struct multihop_intermon_capability_forwarding_state));
1332     struct intermon_state *ist = b->st;
1333     me->args.vci = dir->vci;
1334     me->args.direction = direction;
1335     me->args.capid = capid;
1336     me->args.err = msgerr;
1337     me->args.cap = caprep;
1338     me->args.null_cap = null_cap;
1339     me->elem.cont = multihop_cap_send_intermon_forward_cont;
1340
1341     err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1342             &me->elem.queue);
1343     assert(err_is_ok(err));
1344 }
1345
1346 // continue function for intermonitor capability forwarding
1347 static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
1348         struct intermon_msg_queue_elem *e)
1349 {
1350     struct multihop_intermon_capability_forwarding_state *st =
1351             (struct multihop_intermon_capability_forwarding_state *) e;
1352     multihop_cap_send_intermon_forward(b, st->args.vci, st->args.direction,
1353         st->args.capid, st->args.err, st->args.cap, st->args.null_cap);
1354     free(e);
1355 }
1356
1357 /**
1358  * \brief Forward capability to the next hop
1359  *
1360  */
1361 static inline void multihop_cap_send_intermon_forward(
1362         struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
1363         uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap)
1364 {
1365
1366     errval_t err;
1367
1368     // try to forward
1369     err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, capid, msgerr,
1370             caprep, null_cap);
1371
1372     if (err_is_fail(err)) {
1373         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1374             struct multihop_intermon_capability_forwarding_state *me =
1375                     malloc(
1376                             sizeof(struct multihop_intermon_capability_forwarding_state));
1377             struct intermon_state *ist = b->st;
1378             me->args.vci = vci;
1379             me->args.direction = direction;
1380             me->args.capid = capid;
1381             me->args.err = msgerr;
1382             me->args.cap = caprep;
1383             me->args.null_cap = null_cap;
1384             me->elem.cont = multihop_cap_send_intermon_forward_cont;
1385
1386             err = intermon_enqueue_send_at_front(b, &ist->queue,
1387                     get_default_waitset(), &me->elem.queue);
1388             assert(err_is_ok(err));
1389             return;
1390         }
1391
1392         USER_PANIC_ERR(err,
1393                 "Could not forward capability over multi-hop channel\n");
1394     }
1395 }
1396
1397 /**
1398  * \brief Handle a capability coming from another monitor.
1399  *        The capability must be either forwarded to another
1400  *        monitor or to a local dispatcher.
1401  *
1402  */
1403 static void multihop_intermon_cap_send_handler(
1404         struct intermon_binding *intermon_binding, multihop_vci_t vci,
1405         uint8_t direction, uint32_t capid, errval_t msgerr,
1406         intermon_caprep_t caprep, bool null_cap)
1407 {
1408
1409     MULTIHOP_DEBUG(
1410             "monitor on core %d received a capability (from other monitor). VCI %llu, direction %d, cap ID %d\n", my_core_id, (unsigned long long) vci, direction, capid);
1411
1412     errval_t err;
1413     struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1414             vci);
1415     struct direction *dir = multihop_get_direction(chan_state, direction);
1416
1417     if (dir->type == MULTIHOP_ENDPOINT) {
1418         // we have to forward the message to a local dispatcher
1419
1420         // Construct the capability
1421         struct capability *capability = (struct capability *) &caprep;
1422         struct capref cap;
1423
1424         if (null_cap) {
1425             cap = NULL_CAP;
1426         } else {
1427             err = slot_alloc(&cap);
1428             if (err_is_fail(err)) {
1429
1430                 // send a msg indicating that we failed
1431                 // to allocate a slot for the capability
1432                 cap = NULL_CAP;
1433                 msgerr = err;
1434                 goto do_send;
1435             }
1436
1437             // create capability
1438             // note that we just pass anything as core_id, because
1439             // it is not being used
1440             err = monitor_cap_create(cap, capability, my_core_id);
1441             if (err_is_fail(err)) {
1442                 slot_free(cap);
1443
1444                 // send a msg indicating that we failed
1445                 // to create the capability
1446                 cap = NULL_CAP;
1447                 msgerr = err_push(err, MON_ERR_CAP_CREATE);
1448                 goto do_send;
1449             }
1450
1451             // mark capability as remote
1452             err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1453             if (err_is_fail(err)) {
1454                 USER_PANIC_ERR(err, "monitor_cap_remote failed");
1455                 return;
1456             }
1457         }
1458
1459 do_send: ;
1460         // enqueue the capability in order to be forwarded to
1461         // the local dispatcher
1462         struct monitor_binding *b = dir->binding.monitor_binding;
1463         struct multihop_capability_forwarding_state *me = malloc(
1464                 sizeof(struct multihop_capability_forwarding_state));
1465         assert(me != NULL);
1466         struct monitor_state *ist = b->st;
1467         me->args.vci = dir->vci;
1468         me->args.direction = direction;
1469         me->args.cap = cap;
1470         me->args.capid = capid;
1471         me->args.err = msgerr;
1472         me->elem.cont = multihop_cap_send_forward_cont;
1473
1474         err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1475                 &me->elem.queue);
1476         assert(err_is_ok(err));
1477         return;
1478
1479     } else {
1480         // we have to forward the capability to the next hop
1481         // we therefore enqueue the capability
1482         struct intermon_binding *b = dir->binding.intermon_binding;
1483         struct multihop_intermon_capability_forwarding_state *me = malloc(
1484                 sizeof(struct multihop_intermon_capability_forwarding_state));
1485         struct intermon_state *ist = b->st;
1486         me->args.vci = dir->vci;
1487         me->args.direction = direction;
1488         me->args.capid = capid;
1489         me->args.err = msgerr;
1490         me->args.cap = caprep;
1491         me->args.null_cap = null_cap;
1492         me->elem.cont = multihop_cap_send_intermon_forward_cont;
1493
1494         err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1495                 &me->elem.queue);
1496         assert(err_is_ok(err));
1497         return;
1498     }
1499 }
1500
1501 // continue function for monitor capability forwarding
1502 static void multihop_cap_send_forward_cont(struct monitor_binding *b,
1503         struct monitor_msg_queue_elem *e)
1504 {
1505     struct multihop_capability_forwarding_state *st =
1506             (struct multihop_capability_forwarding_state *) e;
1507     multihop_cap_send_forward(b, st->args.vci, st->args.direction,
1508                               st->args.capid, st->args.err, st->args.cap);
1509     free(e);
1510 }
1511
1512 /**
1513  * \brief Forward capability to a local dispatcher
1514  *
1515  */
1516 inline static void multihop_cap_send_forward(struct monitor_binding *b,
1517         multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
1518         struct capref cap)
1519 {
1520     errval_t err;
1521
1522 // try to send
1523     err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, msgerr,
1524                                        cap, capid);
1525
1526     if (err_is_fail(err)) {
1527         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1528             struct multihop_capability_forwarding_state *me = malloc(
1529                     sizeof(struct multihop_capability_forwarding_state));
1530             assert(me != NULL);
1531             struct monitor_state *ist = b->st;
1532             me->args.vci = vci;
1533             me->args.direction = direction;
1534             me->args.cap = cap;
1535             me->args.capid = capid;
1536             me->args.err = msgerr;
1537             me->elem.cont = multihop_cap_send_forward_cont;
1538
1539             err = monitor_enqueue_send_at_front(b, &ist->queue,
1540                     get_default_waitset(), &me->elem.queue);
1541             assert(err_is_ok(err));
1542             return;
1543         }
1544
1545         USER_PANIC_ERR(err,
1546                 "failed to forward capability over multi-hop channel\n");
1547     }
1548 }
1549
1550 ///////////////////////////////////////////////////////
1551
1552 // INITIALIZATION
1553
1554 ///////////////////////////////////////////////////////
1555
1556 // set up receive vtable in the intermonitor interface
1557 errval_t multihop_intermon_init(struct intermon_binding *ib)
1558 {
1559     ib->rx_vtbl.bind_multihop_intermon_request =
1560             &multihop_intermon_bind_request_handler;
1561     ib->rx_vtbl.bind_multihop_intermon_reply =
1562             &multihop_intermon_bind_reply_handler;
1563     ib->rx_vtbl.multihop_message = &intermon_multihop_message_handler;
1564     ib->rx_vtbl.multihop_cap_send = &multihop_intermon_cap_send_handler;
1565     ib->rx_vtbl.multihop_routing_table_request =
1566             &multihop_handle_routing_table_request;
1567     ib->rx_vtbl.multihop_routing_table_response =
1568             &multihop_handle_routing_table_response;
1569     ib->rx_vtbl.multihop_routing_table_grow =
1570             &multihop_routing_table_grow;
1571
1572     return SYS_ERR_OK;
1573 }
1574
1575 // set up receive vtable in the monitor interface
1576 errval_t multihop_monitor_init(struct monitor_binding *mb)
1577 {
1578     mb->rx_vtbl.multihop_bind_client_request =
1579             &multihop_monitor_bind_request_handler;
1580     mb->rx_vtbl.multihop_bind_service_reply =
1581             &multihop_monitor_service_bind_reply_handler;
1582     mb->rx_vtbl.multihop_message = &multihop_message_handler;
1583     mb->rx_vtbl.multihop_cap_send = &multihop_cap_send_request_handler;
1584     mb->rx_vtbl.multihop_routing_table_new =
1585             &multihop_routing_table_new;
1586     mb->rx_vtbl.multihop_routing_table_set =
1587             &multihop_routing_table_set;
1588
1589     return SYS_ERR_OK;
1590 }