capsend broadcast: can now remember set of destination cores for operations involving...
authorSimon Gerber <simon.gerber@inf.ethz.ch>
Fri, 28 Nov 2014 15:26:12 +0000 (16:26 +0100)
committerSimon Gerber <simon.gerber@inf.ethz.ch>
Fri, 28 Nov 2014 15:26:12 +0000 (16:26 +0100)
usr/monitor/capops/capsend.c
usr/monitor/capops/revoke.c
usr/monitor/include/capsend.h

index 7ed2456..1f9ca9e 100644 (file)
@@ -157,42 +157,82 @@ bool capsend_handle_mc_reply(struct capsend_mc_st *st)
  */
 
 static errval_t
-capsend_broadcast(struct capsend_mc_st *bc_st, struct capability *cap, capsend_send_fn send_cont)
+capsend_broadcast(struct capsend_mc_st *bc_st, struct capsend_destset *dests,
+        struct capability *cap, capsend_send_fn send_cont)
 {
     errval_t err;
+    size_t dest_count;
+    bool init_destset = false;
     size_t online_monitors = num_monitors_online();
     // do not count self when calculating #dest cores
-    int dest_count = online_monitors - 1;
+    dest_count = online_monitors - 1;
     DEBUG_CAPOPS("%s: dest_count = %d\n", __FUNCTION__, dest_count);
     DEBUG_CAPOPS("%s: num_queued = %d\n", __FUNCTION__, bc_st->num_queued);
     DEBUG_CAPOPS("%s: num_pending = %d\n", __FUNCTION__, bc_st->num_pending);
+    if (dests && dests->set == NULL) {
+        dests->set = calloc(dest_count, sizeof(coreid_t));
+        dests->capacity = dest_count;
+        dests->count = 0;
+        init_destset = true;
+    } else if (dests) {
+        dest_count = dests->count;
+    }
     err = capsend_mc_init(bc_st, cap, send_cont, dest_count, true);
     if (err_is_fail(err)) {
         free(bc_st);
     }
 
-    for (coreid_t dest = 0; dest < MAX_COREID && bc_st->num_queued < dest_count; dest++)
-    {
-        if (dest == my_core_id) {
-            // do not send to self
-            continue;
+    if (init_destset || !dests) {
+        for (coreid_t dest = 0; dest < MAX_COREID && bc_st->num_queued < dest_count; dest++)
+        {
+            if (dest == my_core_id) {
+                // do not send to self
+                continue;
+            }
+            err = capsend_mc_enqueue(bc_st, dest);
+            if (err_is_ok(err) && dests) {
+                // if we're initializing destination set, add destination
+                // cores that we were able to enqueue msg for to set.
+                dests->set[dests->count++] = dest;
+            }
+            if (err_no(err) == MON_ERR_NO_MONITOR_FOR_CORE) {
+                // no connection for this core, skip
+                continue;
+            } else if (err_no(err) == MON_ERR_CAPOPS_BUSY) {
+                debug_printf("monitor.%d not ready to participate in distops, skipping\n",
+                        dest);
+            } else if (err_is_fail(err)) {
+                // failure, disable broadcast
+                bc_st->do_send = false;
+                if (!bc_st->num_queued) {
+                    // only cleanup of no messages have been enqueued
+                    free(bc_st->msg_st_arr);
+                    free(bc_st);
+                }
+                return err;
+            }
         }
-        err = capsend_mc_enqueue(bc_st, dest);
-        if (err_no(err) == MON_ERR_NO_MONITOR_FOR_CORE) {
-            // no connection for this core, skip
-            continue;
-        } else if (err_no(err) == MON_ERR_CAPOPS_BUSY) {
-            debug_printf("monitor.%d not ready to participate in distops, skipping\n",
-                    dest);
-        } else if (err_is_fail(err)) {
-            // failure, disable broadcast
-            bc_st->do_send = false;
-            if (!bc_st->num_queued) {
-                // only cleanup of no messages have been enqueued
-                free(bc_st->msg_st_arr);
-                free(bc_st);
+    } else {
+        for (int i = 0; i < dest_count; i++) {
+            coreid_t dest = dests->set[i];
+
+            err = capsend_mc_enqueue(bc_st, dest);
+            if (err_no(err) == MON_ERR_NO_MONITOR_FOR_CORE) {
+                // no connection for this core, skip
+                continue;
+            } else if (err_no(err) == MON_ERR_CAPOPS_BUSY) {
+                debug_printf("monitor.%d not ready to participate in distops, skipping\n",
+                        dest);
+            } else if (err_is_fail(err)) {
+                // failure, disable broadcast
+                bc_st->do_send = false;
+                if (!bc_st->num_queued) {
+                    // only cleanup of no messages have been enqueued
+                    free(bc_st->msg_st_arr);
+                    free(bc_st);
+                }
+                return err;
             }
-            return err;
         }
     }
 
@@ -238,7 +278,7 @@ capsend_find_cap(struct capability *cap, capsend_find_cap_result_fn result_handl
     bc_st->found = false;
     bc_st->st = st;
 
-    return capsend_broadcast((struct capsend_mc_st*)bc_st, cap, find_cap_broadcast_send_cont);
+    return capsend_broadcast((struct capsend_mc_st*)bc_st, NULL, cap, find_cap_broadcast_send_cont);
 }
 
 /*
@@ -392,7 +432,8 @@ capsend_find_descendants(struct domcapref src, capsend_result_fn result_fn, void
     mc_st->result_fn = result_fn;
     mc_st->st = st;
     mc_st->have_result = false;
-    return capsend_relations(&cap, find_descendants_send_cont, (struct capsend_mc_st*)mc_st);
+    return capsend_relations(&cap, find_descendants_send_cont,
+            (struct capsend_mc_st*)mc_st, NULL);
 }
 
 
@@ -513,7 +554,7 @@ capsend_update_owner(struct domcapref capref, struct event_closure completion_co
     }
     bc_st->completion_continuation = completion_continuation;
 
-    return capsend_broadcast((struct capsend_mc_st*)bc_st, &cap, update_owner_broadcast_send_cont);
+    return capsend_broadcast((struct capsend_mc_st*)bc_st, NULL, &cap, update_owner_broadcast_send_cont);
 }
 
 /*
@@ -621,14 +662,15 @@ capsend_copies(struct capability *cap,
             struct capsend_mc_st *mc_st)
 {
     // this is currently just a broadcast
-    return capsend_broadcast(mc_st, cap, send_fn);
+    return capsend_broadcast(mc_st, NULL, cap, send_fn);
 }
 
 errval_t
 capsend_relations(struct capability *cap,
                   capsend_send_fn send_fn,
-                  struct capsend_mc_st *mc_st)
+                  struct capsend_mc_st *mc_st,
+                  struct capsend_destset *dests)
 {
     // this is currently just a broadcast
-    return capsend_broadcast(mc_st, cap, send_fn);
+    return capsend_broadcast(mc_st, dests, cap, send_fn);
 }
index ef1a31d..7368f77 100644 (file)
@@ -24,6 +24,7 @@ struct revoke_master_st {
     struct domcapref cap;
     struct capability rawcap;
     struct capsend_mc_st revoke_mc_st;
+    struct capsend_destset dests;
     revoke_result_handler_t result_handler;
     void *st;
     bool local_fin, remote_fin;
@@ -163,7 +164,8 @@ revoke_local(struct revoke_master_st *st)
     PANIC_IF_ERR(err, "marking revoke");
 
     // XXX: could check whether remote copies exist here(?), -SG, 2014-11-05
-    err = capsend_relations(&st->rawcap, revoke_mark__send, &st->revoke_mc_st);
+    err = capsend_relations(&st->rawcap, revoke_mark__send,
+            &st->revoke_mc_st, &st->dests);
     PANIC_IF_ERR(err, "initiating revoke mark multicast");
 }
 
@@ -286,7 +288,8 @@ revoke_ready__rx(struct intermon_binding *b, genvaddr_t st)
     }
 
     DEBUG_CAPOPS("%s: sending commit\n", __FUNCTION__);
-    err = capsend_relations(&rvk_st->rawcap, revoke_commit__send, &rvk_st->revoke_mc_st);
+    err = capsend_relations(&rvk_st->rawcap, revoke_commit__send,
+            &rvk_st->revoke_mc_st, &rvk_st->dests);
     PANIC_IF_ERR(err, "enqueing revoke_commit multicast");
 
     delete_steps_resume();
index 9c29db8..975b06d 100644 (file)
@@ -25,6 +25,12 @@ typedef errval_t (*capsend_send_fn)(struct intermon_binding* /*binding*/,
 
 bool capsend_handle_mc_reply(struct capsend_mc_st *mc_st); /* returns true if was last reply */
 
+struct capsend_destset {
+    coreid_t *set;
+    size_t count;
+    size_t capacity;
+};
+
 struct capsend_mc_st {
     struct capsend_mc_msg_st *msg_st_arr;
     int num_pending;
@@ -49,7 +55,8 @@ errval_t capsend_copies(struct capability *cap,
 
 errval_t capsend_relations(struct capability *cap,
                            capsend_send_fn send_fn,
-                           struct capsend_mc_st *mc_st);
+                           struct capsend_mc_st *mc_st,
+                           struct capsend_destset *dests);
 
 typedef void (*capsend_find_cap_result_fn)(errval_t, coreid_t, void*);