c262b1eb178602926a12a517c78d3724612a83c8
[barrelfish] / usr / monitor / inter.c
1 /**
2  * \file
3  * \brief Inter-monitor communication
4  *
5  * Welcome to stack-rip hell.
6  */
7
8 /*
9  * Copyright (c) 2009, 2010, 2011, ETH Zurich.
10  * All rights reserved.
11  *
12  * This file is distributed under the terms in the attached LICENSE file.
13  * If you do not find this file, copies can be found by writing to:
14  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
15  */
16
17 #include <inttypes.h>
18 #include <sys/param.h>
19 #include <monitor.h>
20 #include <barrelfish/dispatch.h>
21 #include <barrelfish/proc_mgmt_client.h>
22 #include <trace/trace.h>
23 #include "send_cap.h"
24 #include "capops.h"
25 #include <trace_definitions/trace_defs.h>
26
27 static bool* notification_sent = NULL;
28 static bool* monitor_ready = NULL;
29 static errval_t new_monitor_notify(coreid_t id)
30 {
31     if (notification_sent == NULL) {
32         notification_sent = calloc(MAX_COREID*MAX_COREID, sizeof(bool));
33     }
34
35     struct intermon_binding *b;
36     errval_t err;
37     // XXX: this is stupid...
38     // XXX: I changed this a bit to keep track of what cores are ready
39     // and who has gotten a notification, this allows to boot cores
40     // in parallel thus speeding up the boot process
41     for (int i = 0; i < MAX_COREID; i++) {
42         if (i != my_core_id && i != id) {
43
44             coreid_t min = MIN(id, i);
45             coreid_t max = MAX(id, i);
46
47             err = intermon_binding_get(i, &b);
48             if (err_is_ok(err) && !notification_sent[min*MAX_COREID+max] && monitor_ready[i]) {
49                 while (1) {
50                     err = b->tx_vtbl.new_monitor_notify(b, NOP_CONT, id);
51                     if (err_is_ok(err)) {
52                         notification_sent[min*MAX_COREID+max] = true;
53                         break;
54                     }
55                     if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
56                         messages_wait_and_handle_next();
57                     } else {
58                         return err;
59                     }
60                 }
61             }
62         }
63     }
64
65     return SYS_ERR_OK;
66 }
67
68 /**
69  * \brief A newly booted monitor indicates that can participate in capability
70  * operations
71  */
72 static void capops_ready(struct intermon_binding *b)
73 {
74     struct intermon_state *st = b->st;
75     st->capops_ready = true;
76 }
77
78 /**
79  * \brief A newly booted monitor indicates that it has initialized
80  */
81
82 static void boot_core_reply_handler(struct monitor_binding *b,
83                                     struct monitor_msg_queue_elem *e);
84
85 struct boot_core_reply_state {
86     struct monitor_msg_queue_elem elem;
87     struct monitor_boot_core_reply__tx_args args;
88 };
89
90 static void
91 boot_core_reply_enqueue(struct monitor_binding *domain_binding,
92                             errval_t error_code)
93 {
94     errval_t err;
95
96     struct boot_core_reply_state *me =
97         malloc(sizeof(struct boot_core_reply_state));
98     assert(me != NULL);
99     me->args.err = error_code;
100     me->elem.cont = boot_core_reply_handler;
101
102     struct monitor_state *st = domain_binding->st;
103     err = monitor_enqueue_send(domain_binding, &st->queue,
104                                get_default_waitset(), &me->elem.queue);
105     if (err_is_fail(err)) {
106         USER_PANIC_ERR(err, "monitor_enqueue_send failed");
107     }
108 }
109
110 static void
111 boot_core_reply_cont(struct monitor_binding *domain_binding,
112                      errval_t error_code)
113 {
114     assert(domain_binding != NULL);
115     errval_t err;
116     DEBUG_CAPOPS("boot_core_reply_cont: %s (%"PRIuERRV")\n",
117             err_getstring(error_code), error_code);
118     err = domain_binding->tx_vtbl.
119             boot_core_reply(domain_binding, NOP_CONT, error_code);
120     if (err_is_fail(err)) {
121         if(err_no(err) == FLOUNDER_ERR_TX_BUSY) {
122             boot_core_reply_enqueue(domain_binding, error_code);
123         } else {
124             USER_PANIC_ERR(err, "error delivering boot_core_reply");
125         }
126     }
127 }
128
129 static void boot_core_reply_handler(struct monitor_binding *b,
130                                     struct monitor_msg_queue_elem *e)
131 {
132     struct boot_core_reply_state *st =
133         (struct boot_core_reply_state *)e;
134     boot_core_reply_cont(b, st->args.err);
135     free(e);
136 }
137
138 static void monitor_initialized(struct intermon_binding *b)
139 {
140     if (monitor_ready == NULL) {
141         monitor_ready = calloc(MAX_COREID, sizeof(bool));
142     }
143
144     struct intermon_state *st = b->st;
145     errval_t err = SYS_ERR_OK;
146     assert(st->capops_ready);
147
148     // Inform other monitors of this new monitor
149     monitor_ready[st->core_id] = true;
150     err = new_monitor_notify(st->core_id);
151     if (err_is_fail(err)) {
152         err = err_push(err, MON_ERR_INTERN_NEW_MONITOR);
153     }
154
155     // New plan, do timing sync for every time a monitor has come up...
156     /*if(num_monitors > 1) {
157         printf("monitor: synchronizing clocks\n");
158         err = timing_sync_timer();
159         assert(err_is_ok(err) || err_no(err) == SYS_ERR_SYNC_MISS);
160         if(err_no(err) == SYS_ERR_SYNC_MISS) {
161             printf("monitor: failed to sync clocks. Bad reference clock?\n");
162         }
163     }*/
164
165     // Tell the client that asked us to boot this core what happened
166     struct monitor_binding *client = st->originating_client;
167     boot_core_reply_cont(client, err);
168 }
169
170 static void cap_receive_request_handler(struct monitor_binding *b,
171                                         struct monitor_msg_queue_elem *e);
172
173 struct cap_receive_request_state {
174     struct monitor_msg_queue_elem elem;
175     struct monitor_cap_receive_request__tx_args args;
176     uintptr_t your_mon_id;
177     struct intermon_binding *b;
178 };
179
180 static void
181 cap_receive_request_enqueue(struct monitor_binding *domain_binding,
182                             uintptr_t domain_id, errval_t msgerr,
183                             struct capref cap, uint32_t capid,
184                             uintptr_t your_mon_id,
185                             struct intermon_binding *b, bool first)
186 {
187     DEBUG_CAPOPS("%s\n", __FUNCTION__);
188     errval_t err;
189
190     struct cap_receive_request_state *me =
191         malloc(sizeof(struct cap_receive_request_state));
192     assert(me != NULL);
193     me->args.conn_id = domain_id;
194     me->args.err = msgerr;
195     me->args.cap = cap;
196     me->args.capid = capid;
197     me->your_mon_id = your_mon_id;
198     me->b = b;
199     me->elem.cont = cap_receive_request_handler;
200
201     struct monitor_state *st = domain_binding->st;
202     if (first) {
203         err = monitor_enqueue_send_at_front(domain_binding, &st->queue,
204                                    get_default_waitset(), &me->elem.queue);
205     } else {
206         err = monitor_enqueue_send(domain_binding, &st->queue,
207                                    get_default_waitset(), &me->elem.queue);
208     }
209     if (err_is_fail(err)) {
210         USER_PANIC_ERR(err, "monitor_enqueue_send failed");
211     }
212 }
213
214 static void
215 cap_receive_request_cont(struct monitor_binding *domain_binding,
216                          uintptr_t domain_id, errval_t msgerr,
217                          struct capref cap,
218                          uint32_t capid, uintptr_t your_mon_id,
219                          struct intermon_binding *b, bool first)
220 {
221     DEBUG_CAPOPS("%s ->%"PRIuPTR", %s\n", __FUNCTION__, domain_id, err_getstring(msgerr));
222     errval_t err, err2;
223     struct capref *capp = caprefdup(cap);
224
225     err = domain_binding->tx_vtbl.
226         cap_receive_request(domain_binding, MKCONT(free, capp), domain_id, msgerr, cap, capid);
227
228     if (err_is_fail(err)) {
229         DEBUG_CAPOPS("%s: send failed: %s\n", __FUNCTION__, err_getstring(err));
230         free(capp);
231         if(err_no(err) == FLOUNDER_ERR_TX_BUSY) {
232             DEBUG_CAPOPS("%s: enqueueing message b/c flounder busy\n", __FUNCTION__);
233             cap_receive_request_enqueue(domain_binding, domain_id, msgerr, cap,
234                                         capid, your_mon_id, b, first);
235         } else {
236             if (!capref_is_null(cap)) {
237                 err2 = cap_destroy(cap);
238                 if (err_is_fail(err2)) {
239                     USER_PANIC_ERR(err, "cap_destroy failed");
240                 }
241             }
242             // TODO: handle sanely: kill dispatcher/teardown binding/etc.
243             USER_PANIC_ERR(err, "error delivering cap to local dispatcher");
244         }
245     }
246 }
247
248 static void cap_receive_request_handler(struct monitor_binding *b,
249                                         struct monitor_msg_queue_elem *e)
250 {
251     struct cap_receive_request_state *st =
252         (struct cap_receive_request_state *)e;
253     cap_receive_request_cont(b, st->args.conn_id, st->args.err, st->args.cap,
254                              st->args.capid, st->your_mon_id, st->b, true);
255     free(e);
256 }
257
258 struct cap_send_request_st {
259     struct captx_recv_state captx_st;
260     struct intermon_binding *b;
261     uint32_t capid;
262     uintptr_t my_mon_id;
263 };
264
265 static void
266 cap_send_request_caprecv_cont(errval_t err, struct captx_recv_state *captx_st,
267                               struct capref cap, void *st_)
268 {
269 #if defined (DEBUG_MONITOR_CAPOPS)
270     char buf[256];
271     debug_print_capref(buf,256,cap);
272     DEBUG_CAPOPS("%s: %s (cap: %s)\n", __FUNCTION__, err_getstring(err), buf);
273 #endif
274     struct cap_send_request_st *st = (struct cap_send_request_st*)st_;
275
276     uintptr_t my_mon_id = st->my_mon_id;
277     struct remote_conn_state *conn = remote_conn_lookup(my_mon_id);
278     assert(conn != NULL);
279     uintptr_t your_mon_id = conn->mon_id;
280
281     // Get the user domain's connection and connection id
282     struct monitor_binding *domain_binding = conn->domain_binding;
283     uintptr_t domain_id = conn->domain_id;
284
285     // Try to send cap to the user domain, but only if the queue is empty
286     struct monitor_state *mst = domain_binding->st;
287     if (msg_queue_is_empty(&mst->queue)) {
288         DEBUG_CAPOPS("deliver cap to user domain 0x%"PRIxPTR"\n", domain_id);
289         cap_receive_request_cont(domain_binding, domain_id, err, cap, st->capid,
290                                  your_mon_id, st->b, false);
291     } else {
292         DEBUG_CAPOPS("enqueue cap for delivery to user domain\n");
293         // don't allow sends to bypass the queue
294         cap_receive_request_enqueue(domain_binding, domain_id, err, cap, st->capid,
295                                     your_mon_id, st->b, false);
296     }
297 }
298
299 static void
300 cap_send_request(struct intermon_binding *b, mon_id_t my_mon_id,
301                  uint32_t capid, intermon_captx_t captx)
302 {
303     DEBUG_CAPOPS("intermon: %s\n", __FUNCTION__);
304     errval_t err;
305
306     struct cap_send_request_st *st;
307     st = malloc(sizeof(*st));
308     if (!st) {
309         err = LIB_ERR_MALLOC_FAIL;
310         goto send_err;
311     }
312
313     st->capid = capid;
314     st->my_mon_id = my_mon_id;
315     st->b = b;
316
317     captx_handle_recv(&captx, &st->captx_st,
318                       cap_send_request_caprecv_cont, st);
319
320     return;
321
322 send_err:
323     // XXX... should send error here
324     DEBUG_ERR(err, "error while handling intermon send_request");
325 }
326
327 static void span_domain_request(struct intermon_binding *b,
328                                 state_id_t state_id, genpaddr_t vnodebase,
329                                 genpaddr_t framebase, gensize_t framebytes)
330 {
331     errval_t err, err2;
332
333     /* Sender's core_id */
334     struct intermon_state *st = b->st;
335     coreid_t core_id = st->core_id;
336
337     trace_event(TRACE_SUBSYS_MONITOR, TRACE_EVENT_MONITOR_SPAN, disp_get_core_id());
338
339     /* Contruct vroot */
340     struct capability vnode_cap = {
341         .type = ObjType_VNode_x86_64_pml4,
342         .rights = CAPRIGHTS_READ_WRITE, // XXX
343         .u.vnode_x86_64_pml4 = {
344             .base = vnodebase,
345         }
346     };
347     struct capref vroot;
348     err = slot_alloc(&vroot);
349     if (err_is_fail(err)) {
350         err_push(err, LIB_ERR_SLOT_ALLOC);
351         goto reply;
352     }
353     err = monitor_cap_create(vroot, &vnode_cap, core_id);
354     if (err_is_fail(err)) {
355         err_push(err, MON_ERR_CAP_CREATE);
356         goto reply;
357     }
358
359     /* Construct disp frame */
360     struct capability dispframe_cap = {
361         .type = ObjType_Frame,
362         .rights = CAPRIGHTS_READ_WRITE, // XXX
363         .u.frame = {
364             .base = framebase,
365             .bytes = framebytes
366         }
367     };
368     struct capref disp;
369     err = slot_alloc(&disp);
370     if (err_is_fail(err)) {
371         err_push(err, LIB_ERR_SLOT_ALLOC);
372         goto reply;
373     }
374     err = monitor_cap_create(disp, &dispframe_cap, core_id);
375     if (err_is_fail(err)) {
376         err_push(err, MON_ERR_CAP_CREATE);
377         goto reply;
378     }
379
380     err = monitor_remote_relations(disp, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
381     if (err_is_fail(err)) {
382         USER_PANIC_ERR(err, "monitor_remote_relations failed");
383         return;
384     }
385     err = monitor_remote_relations(vroot, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
386     if (err_is_fail(err)) {
387         USER_PANIC_ERR(err, "monitor_remote_relations failed");
388         return;
389     }
390
391     err = span_domain(vroot, disp);
392     if (err_is_fail(err)) {
393         err_push(err, MON_ERR_SPAN_DOMAIN);
394     }
395
396  reply:
397     err2 = b->tx_vtbl.span_domain_reply(b, NOP_CONT, state_id, err);
398     if (err_is_fail(err2)) {
399         USER_PANIC_ERR(err2, "Failed to reply to the monitor");
400     }
401     err2 = cap_destroy(vroot);
402     if (err_is_fail(err2)) {
403         USER_PANIC_ERR(err2, "Failed to destroy span_vroot cap");
404     }
405     err2 = cap_destroy(disp);
406     if (err_is_fail(err2)) {
407         USER_PANIC_ERR(err2, "Failed to destroy disp cap");
408     }
409 }
410
411 static void span_domain_reply(struct intermon_binding *b,
412                               uint64_t state_id, errval_t msgerr)
413 {
414     errval_t err;
415     struct span_state *state = span_state_lookup(state_id);
416     err = state->mb->tx_vtbl.span_domain_reply(state->mb, NOP_CONT, msgerr,
417                                                state->domain_id);
418     if (err_is_fail(err)) {
419         USER_PANIC_ERR(err, "Replying to the domain failed");
420     }
421
422     err = span_state_free(state_id);
423     if (err_is_fail(err)) {
424         USER_PANIC_ERR(err, "Freeing span state failed");
425     }
426 }
427
428 static void add_spawnd(struct intermon_binding *b, iref_t iref)
429 {
430     struct intermon_state *st = (struct intermon_state*) b->st;
431     errval_t err = proc_mgmt_add_spawnd(iref, st->core_id);
432     if (err_is_fail(err)) {
433         USER_PANIC_ERR(err, "Sending proc_mgmt_add_spawnd request failed");
434     }
435 }
436
437 static void trace_caps_request(struct intermon_binding *b)
438 {
439     errval_t err;
440
441     /* Identify the frame cap */
442     struct capref tracecap = {
443         .cnode  = cnode_task,
444         .slot   = TASKCN_SLOT_TRACEBUF
445     };
446     struct capability tracecapa;
447     err = monitor_cap_identify(tracecap, &tracecapa);
448     if (err_is_fail(err)) {
449         USER_PANIC_ERR(err, "monitor_cap_identify failed");
450         return;
451     }
452
453     intermon_caprep_t caprep;
454     capability_to_caprep(&tracecapa, &caprep);
455
456     err = b->tx_vtbl.trace_caps_reply(b, NOP_CONT, caprep);
457     if (err_is_fail(err)) {
458         USER_PANIC_ERR(err, "sending trace_caps_reply failed");
459     }
460 }
461
462 static void trace_caps_reply(struct intermon_binding *b,
463                              intermon_caprep_t caprep)
464 {
465     struct capability capability;
466     caprep_to_capability(&caprep, &capability);
467     assert(capability.type != ObjType_Null);
468
469     trace_cap.cnode = cnode_task;
470     trace_cap.slot = TASKCN_SLOT_TRACEBUF;
471
472     errval_t err = monitor_cap_create(trace_cap, &capability, my_core_id);
473     if (err_is_fail(err)) {
474         USER_PANIC_ERR(err, "monitor_cap_create failed");
475     }
476 }
477
478 static void mem_serv_iref_request(struct intermon_binding *b)
479 {
480     errval_t err;
481     err = b->tx_vtbl.mem_serv_iref_reply(b, NOP_CONT, mem_serv_iref);
482     if (err_is_fail(err)) {
483         USER_PANIC_ERR(err, "sending mem_serv_iref_reply failed");
484     }
485 }
486
487 static void mem_serv_iref_reply(struct intermon_binding *b, iref_t iref)
488 {
489     assert(mem_serv_iref == 0);
490     mem_serv_iref = iref;
491 }
492
493 static void name_serv_iref_request(struct intermon_binding *b)
494 {
495     errval_t err;
496     err = b->tx_vtbl.name_serv_iref_reply(b, NOP_CONT, name_serv_iref);
497     if (err_is_fail(err)) {
498         USER_PANIC_ERR(err, "sending mem_serv_iref_reply failed");
499     }
500 }
501
502 static void name_serv_iref_reply(struct intermon_binding *b, iref_t iref)
503 {
504     assert(name_serv_iref == 0);
505     name_serv_iref = iref;
506 }
507
508 static void monitor_mem_iref_request(struct intermon_binding *b)
509 {
510     errval_t err;
511     err = b->tx_vtbl.monitor_mem_iref_reply(b, NOP_CONT, monitor_mem_iref);
512     if (err_is_fail(err)) {
513         USER_PANIC_ERR(err, "sending mem_serv_iref_reply failed");
514     }
515 }
516
517 static void monitor_mem_iref_reply(struct intermon_binding *b, iref_t iref)
518 {
519     assert(monitor_mem_iref == 0);
520     monitor_mem_iref = iref;
521 }
522
523 static void ramfs_serv_iref_request(struct intermon_binding *b)
524 {
525     errval_t err;
526     err = b->tx_vtbl.ramfs_serv_iref_reply(b, NOP_CONT, ramfs_serv_iref);
527     if (err_is_fail(err)) {
528         USER_PANIC_ERR(err, "sending ramfs_serv_iref_request failed");
529     }
530 }
531
532 static void ramfs_serv_iref_reply(struct intermon_binding *b, iref_t iref)
533 {
534     assert(ramfs_serv_iref == 0);
535     ramfs_serv_iref = iref;
536 }
537
538 static void inter_rsrc_join(struct intermon_binding *b,
539                             rsrcid_t id, uint8_t coreid)
540 {
541     errval_t err = rsrc_join_satellite(id, coreid);
542     if (err_is_fail(err)) {
543         USER_PANIC_ERR(err, "rsrc_join_satellite failed");
544     }
545 }
546
547 struct rsrc_timer_sync__args {
548     errval_t error;
549 };
550
551 struct rsrc_timer_sync_state {
552     struct intermon_msg_queue_elem elem;
553     struct rsrc_timer_sync__args args;
554 };
555
556 static void inter_rsrc_timer_sync_retry(struct intermon_binding *b,
557                                         struct intermon_msg_queue_elem *e);
558
559 static void inter_rsrc_timer_sync_cont(struct intermon_binding *b,
560                                        errval_t msgerr)
561 {
562     errval_t err;
563     err = b->tx_vtbl.rsrc_timer_sync_reply(b, NOP_CONT, msgerr);
564     if(err_is_fail(err)) {
565         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
566             struct rsrc_timer_sync_state *me =
567                 malloc(sizeof(struct rsrc_timer_sync_state));
568             assert(me);
569             struct intermon_state *st = b->st;
570             me->elem.cont = inter_rsrc_timer_sync_retry;
571             me->args.error = msgerr;
572             err = intermon_enqueue_send(b, &st->queue, get_default_waitset(),
573                                         &me->elem.queue);
574             if (err_is_fail(err)) {
575                 USER_PANIC_ERR(err, "intermon_enqueue_send failed");
576             }
577         } else {
578             USER_PANIC_ERR(err, "sending rsrc_timer_sync_reply failed");
579         }
580     }
581 }
582
583 static void inter_rsrc_timer_sync_retry(struct intermon_binding *b,
584                                         struct intermon_msg_queue_elem *e)
585 {
586     struct rsrc_timer_sync_state *st = (struct rsrc_timer_sync_state*)e;
587     inter_rsrc_timer_sync_cont(b, st->args.error);
588 }
589
590 static void inter_rsrc_timer_sync(struct intermon_binding *b,
591                                   uint64_t timestamp)
592 {
593     errval_t err = invoke_monitor_sync_timer(timestamp);
594     inter_rsrc_timer_sync_cont(b, err);
595 }
596
597 static void inter_rsrc_timer_sync_reply(struct intermon_binding *b,
598                                         errval_t err)
599 {
600     // Relay to timing code
601     timing_sync_timer_reply(err);
602 }
603
604 static void inter_rsrc_phase(struct intermon_binding *b, rsrcid_t id,
605                              uint32_t phase, uint64_t timestamp)
606 {
607     errval_t err = rsrc_set_phase_inter(id, phase, timestamp);
608     if (err_is_fail(err)) {
609         USER_PANIC_ERR(err, "rsrc_set_phase_inter failed");
610     }
611 }
612
613 static void inter_rsrc_phase_data(struct intermon_binding *b, rsrcid_t id,
614                                   uint32_t phase, const uint8_t *data, size_t len)
615 {
616     errval_t err = rsrc_set_phase_data(id, phase, (CONST_CAST)data, len);
617     assert(err_is_ok(err));
618 }
619
620 static void inter_rsrc_join_complete(struct intermon_binding *b, rsrcid_t id)
621 {
622     struct monitor_blocking_binding *mb = rsrc_get_binding(id);
623
624     assert(mb != NULL);
625     errval_t err = mb->tx_vtbl.rsrc_join_response(mb, NOP_CONT, SYS_ERR_OK);
626     assert(err_is_ok(err));
627 }
628
629 static void spawnd_image_request(struct intermon_binding *b)
630 {
631     assert(bsp_monitor);
632     errval_t err;
633
634     struct mem_region *mod = multiboot_find_module(bi, "/spawnd");
635     if (mod == NULL) {
636         USER_PANIC("didn't find spawnd module in multiboot image");
637     }
638
639     assert(mod->mr_type == RegionType_Module);
640
641     err = b->tx_vtbl.spawnd_image_reply(b, NOP_CONT, mod->mr_base, mod->mrmod_size);
642     assert(err_is_ok(err));
643 }
644
645 static void give_kcb_request(struct intermon_binding *b, intermon_caprep_t kcb_rep)
646 {
647     errval_t err;
648     struct capability kcb_cap;
649
650     caprep_to_capability(&kcb_rep, &kcb_cap);
651     assert(kcb_cap.type != ObjType_Null);
652
653     struct capref kcb_capref;
654     err = slot_alloc(&kcb_capref);
655     if (err_is_fail(err)) {
656         USER_PANIC_ERR(err, "Can't allocate slot for kcb_capref.");
657     }
658
659     err = monitor_cap_create(kcb_capref, &kcb_cap, my_core_id);
660     if (err_is_fail(err)) {
661         USER_PANIC_ERR(err, "monitor_cap_create failed");
662     }
663
664     printf("%s:%s:%d: Remote monitor: give kcb to kernel\n",
665                 __FILE__, __FUNCTION__, __LINE__);
666     uintptr_t kcb_base = (uintptr_t)kcb_cap.u.kernelcontrolblock.kcb;
667     err = invoke_monitor_add_kcb(kcb_base);
668     if (err_is_fail(err)) {
669         USER_PANIC_ERR(err, "invoke_monitor_add_kcb failed.");
670     }
671
672     err = b->tx_vtbl.give_kcb_response(b, NOP_CONT, SYS_ERR_OK);
673     assert(err_is_ok(err));
674 }
675
676 static void give_kcb_response(struct intermon_binding *ib, errval_t error)
677 {
678     printf("%s:%s:%d: Local monitor received answer\n",
679                 __FILE__, __FUNCTION__, __LINE__);
680     if (err_is_fail(error)) {
681         USER_PANIC_ERR(error, "give kcb did not work.");
682     }
683
684     struct monitor_blocking_binding * b = (struct monitor_blocking_binding *) ib->st;
685     if (b != NULL) {
686         errval_t err = b->tx_vtbl.forward_kcb_request_response(b, NOP_CONT, error);
687         assert(err_is_ok(err));
688     }
689 }
690
691 static void forward_kcb_rm_request(struct intermon_binding *b, uint64_t kcb_base)
692 {
693     errval_t err;
694     // don't switch kcbs on the current core
695     err = invoke_monitor_suspend_kcb_scheduler(true);
696     assert(err_is_ok(err));
697     // remove kcb from ring
698     err = invoke_monitor_remove_kcb((uintptr_t) kcb_base);
699     assert(err_is_ok(err));
700     // send reply
701     err = b->tx_vtbl.forward_kcb_rm_response(b, NOP_CONT, SYS_ERR_OK);
702     assert(err_is_ok(err));
703     // disp_save_rm_kcb -> next kcb -> enable kcb switching again
704     disp_save_rm_kcb();
705     // send monitor initialized when we're back up
706     //err = b->tx_vtbl.monitor_initialized(b, NOP_CONT);
707     //assert(err_is_ok(err));
708 }
709
710 static void forward_kcb_rm_response(struct intermon_binding *b, errval_t error)
711 {
712     //XXX: HACK
713     struct monitor_blocking_binding *mb =
714         (struct monitor_blocking_binding*)
715         ((struct intermon_state*)b->st)->originating_client;
716
717     debug_printf("received kcb_rm response on %d, forwarding to %p\n", my_core_id, mb);
718
719     mb->tx_vtbl.forward_kcb_rm_request_response(mb, NOP_CONT, error);
720 }
721
722 static struct intermon_rx_vtbl the_intermon_vtable = {
723     .trace_caps_request = trace_caps_request,
724     .trace_caps_reply = trace_caps_reply,
725     .mem_serv_iref_request = mem_serv_iref_request,
726     .mem_serv_iref_reply = mem_serv_iref_reply,
727     .name_serv_iref_request = name_serv_iref_request,
728     .name_serv_iref_reply = name_serv_iref_reply,
729     .ramfs_serv_iref_request = ramfs_serv_iref_request,
730     .ramfs_serv_iref_reply = ramfs_serv_iref_reply,
731     .monitor_mem_iref_request = monitor_mem_iref_request,
732     .monitor_mem_iref_reply = monitor_mem_iref_reply,
733
734     .capops_ready              = capops_ready,
735     .monitor_initialized       = monitor_initialized,
736
737     .spawnd_image_request      = spawnd_image_request,
738
739     .cap_send_request          = cap_send_request,
740
741     .span_domain_request       = span_domain_request,
742     .span_domain_reply         = span_domain_reply,
743
744     .add_spawnd                = add_spawnd,
745
746     .rsrc_join                 = inter_rsrc_join,
747     .rsrc_join_complete        = inter_rsrc_join_complete,
748     .rsrc_timer_sync           = inter_rsrc_timer_sync,
749     .rsrc_timer_sync_reply     = inter_rsrc_timer_sync_reply,
750     .rsrc_phase                = inter_rsrc_phase,
751     .rsrc_phase_data           = inter_rsrc_phase_data,
752
753     .give_kcb_request = give_kcb_request,
754     .give_kcb_response = give_kcb_response,
755
756     .forward_kcb_rm_request = forward_kcb_rm_request,
757     .forward_kcb_rm_response = forward_kcb_rm_response,
758 };
759
760 errval_t intermon_init(struct intermon_binding *b, coreid_t coreid)
761 {
762     errval_t err;
763
764     struct intermon_state *st = malloc(sizeof(struct intermon_state));
765     assert(st != NULL);
766
767     st->core_id = coreid;
768     st->binding = b;
769     st->queue.head = st->queue.tail = NULL;
770     st->rsrcid_inflight = false;
771     st->capops_ready = true;
772     st->originating_client = NULL;
773     b->st = st;
774     b->rx_vtbl = the_intermon_vtable;
775
776 #ifdef CONFIG_INTERCONNECT_DRIVER_UMP
777     err = ump_intermon_init(b);
778     if (err_is_fail(err)) {
779         USER_PANIC_ERR(err, "ump_intermon_init failed");
780         return err;
781     }
782 #endif
783
784 #ifdef CONFIG_INTERCONNECT_DRIVER_MULTIHOP
785     err = multihop_intermon_init(b);
786     if (err_is_fail(err)) {
787         USER_PANIC_ERR(err, "multihop_intermon_init failed");
788         return err;
789     }
790 #endif
791
792 #if CONFIG_TRACE
793     err = trace_intermon_init(b);
794     if (err_is_fail(err)) {
795         USER_PANIC_ERR(err, "trace_intermon_init failed");
796         return err;
797     }
798
799     err = bfscope_intermon_init(b);
800     if (err_is_fail(err)) {
801         USER_PANIC_ERR(err, "bfscope_intermon_init failed");
802         return err;
803     }
804 #endif
805
806     err = arch_intermon_init(b);
807     if (err_is_fail(err)) {
808         USER_PANIC_ERR(err, "arch_intermon_init failed");
809         return err;
810     }
811
812     err = capops_init(b->waitset, b);
813     if (err_is_fail(err)) {
814         USER_PANIC_ERR(err, "capops_intermon_init failed");
815         return err;
816     }
817
818     err = intermon_binding_set(st);
819     if (err_is_fail(err)) {
820         USER_PANIC_ERR(err, "intermon_binding_set failed");
821         return err;
822     }
823
824     return SYS_ERR_OK;
825 }