T65: implement proper x-core join.
authorSimon Gerber <simon.gerber@inf.ethz.ch>
Tue, 24 Feb 2015 12:22:28 +0000 (13:22 +0100)
committerSimon Gerber <simon.gerber@inf.ethz.ch>
Mon, 9 Mar 2015 14:36:28 +0000 (15:36 +0100)
Signed-off-by: Simon Gerber <simon.gerber@inf.ethz.ch>

if/interdisp.if
include/barrelfish/domain.h
lib/barrelfish/domain.c
lib/barrelfish/threads.c
lib/posixcompat/pthreads.c
usr/tests/spantest/xcorejoin.c [new file with mode: 0644]

index ea4e2b3..e34a8c5 100644 (file)
@@ -21,6 +21,9 @@ interface interdisp "Interface between domains spanning cores" {
        message create_thread_request(genvaddr start_func, genvaddr arg, uint64 stacksize, genvaddr req);
        message create_thread_reply(errval err, genvaddr thread, genvaddr req);
 
+        message join_thread_request(genvaddr thread, genvaddr req);
+        message join_thread_reply(errval err, uint64 retval, genvaddr req);
+
        message span_slave();
        message span_slave_done();
        message span_eager_connect(coreid core_id);
index f8d4c40..ab185f7 100644 (file)
@@ -75,6 +75,7 @@ errval_t domain_thread_create_on_varstack(coreid_t core_id,
                                           thread_func_t start_func,
                                           void *arg, size_t stacksize,
                                           struct thread **newthread);
+errval_t domain_thread_join(struct thread *thread, int *retval);
 errval_t domain_send_cap(coreid_t core_id, struct capref cap);
 errval_t domain_wakeup_on(dispatcher_handle_t disp, struct thread *thread);
 errval_t domain_wakeup_on_disabled(dispatcher_handle_t disp,
index 6c1a457..8e85322 100644 (file)
@@ -205,6 +205,32 @@ static void wakeup_thread_request(struct interdisp_binding *b,
     disp_enable(handle);
 }
 
+static void join_thread_request(struct interdisp_binding *b,
+                                genvaddr_t taddr, genvaddr_t req)
+{
+    struct thread *thread = (struct thread *)taddr;
+    assert(thread->coreid == disp_get_core_id());
+    int retval = 0;
+    errval_t err = thread_join(thread, &retval);
+    err = b->tx_vtbl.join_thread_reply(b, NOP_CONT, err, retval, req);
+    assert(err_is_ok(err));
+}
+
+struct join_thread_req {
+    errval_t err;
+    int retval;
+    bool reply_received;
+};
+
+static void join_thread_reply(struct interdisp_binding *b,
+                              errval_t err, uint64_t retval, genvaddr_t req)
+{
+    struct join_thread_req *r = (struct join_thread_req *)req;
+    r->err = err;
+    r->retval = retval;
+    r->reply_received = true;
+}
+
 /*
  * XXX: The whole span_slave*() thing is a hack to allow all
  * dispatchers to wait on both the monitor and interdisp waitsets
@@ -273,6 +299,9 @@ static struct interdisp_rx_vtbl interdisp_vtbl = {
     .create_thread_request = create_thread_request,
     .create_thread_reply   = create_thread_reply,
 
+    .join_thread_request   = join_thread_request,
+    .join_thread_reply     = join_thread_reply,
+
     // XXX: Hack to allow domain_new_dispatcher() to proceed when not all
     // default waitsets are serviced
     .span_slave       = span_slave_request,
@@ -952,6 +981,50 @@ errval_t domain_thread_create_on(coreid_t core_id, thread_func_t start_func,
     return domain_thread_create_on_varstack(core_id, start_func, arg, 0, newthread);
 }
 
+errval_t domain_thread_join(struct thread *thread, int *retval)
+{
+    coreid_t core_id = thread->coreid;
+    debug_printf("%s: joining %p, coreid %d\n", __FUNCTION__, thread, core_id);
+    if (disp_get_core_id() == core_id) {
+        return thread_join(thread, retval);
+    } else {
+        struct domain_state *domain_state = get_domain_state();
+        errval_t err;
+
+        if (domain_state->b[core_id] == NULL) {
+            return LIB_ERR_NO_SPANNED_DISP;
+        }
+
+        struct interdisp_binding *b = domain_state->b[core_id];
+        struct join_thread_req *req = malloc(sizeof(*req));
+        req->reply_received = false;
+        // use special waitset to make sure loop exits properly.
+        struct waitset ws, *old_ws = b->waitset;
+        waitset_init(&ws);
+        b->change_waitset(b, &ws);
+        err = b->tx_vtbl.join_thread_request(b, NOP_CONT,
+                                             (genvaddr_t)thread,
+                                             (genvaddr_t)req);
+        if (err_is_fail(err)) {
+            return err;
+        }
+
+        while (!req->reply_received) {
+            event_dispatch(&ws);
+        }
+        // change waitset back
+        b->change_waitset(b, old_ws);
+
+        if (retval) {
+            *retval = req->retval;
+        }
+        err = req->err;
+        free(req);
+
+        return err;
+    }
+}
+
 /**
  * \brief set the core_id.
  *
index f26e311..cdac9f1 100644 (file)
@@ -507,6 +507,8 @@ struct thread *thread_create(thread_func_t start_func, void *arg)
 errval_t thread_join(struct thread *thread, int *retval)
 {
     assert(thread != NULL);
+    // this function should only be called for threads on same core
+    assert(thread->coreid == disp_get_core_id());
 
     thread_mutex_lock(&thread->exit_lock);
     if(thread->detached) {
index d912867..e99f900 100644 (file)
@@ -331,7 +331,7 @@ int pthread_cond_destroy(pthread_cond_t *cond)
 int pthread_join(pthread_t thread, void **retval)
 {
     debug_printf("%s: %p\n", __FUNCTION__, thread);
-    errval_t err = thread_join(thread->thread, NULL);
+    errval_t err = domain_thread_join(thread->thread, NULL);
     assert(err_is_ok(err));
 
     if (pthread_placement) {
diff --git a/usr/tests/spantest/xcorejoin.c b/usr/tests/spantest/xcorejoin.c
new file mode 100644 (file)
index 0000000..3f4c5d0
--- /dev/null
@@ -0,0 +1,78 @@
+/**
+ * \file
+ * \brief Test spanning of domains across cores
+ */
+
+/*
+ * Copyright (c) 2007, 2008, 2010, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <barrelfish/barrelfish.h>
+#include <barrelfish/dispatch.h>
+#include <barrelfish/waitset.h>
+
+
+static int remote(void *dummy)
+{
+    uintptr_t v = (uintptr_t)dummy;
+    debug_printf("v = %"PRIuPTR"\n", v);
+    return v;
+}
+
+int ndispatchers = 1;
+
+static void domain_spanned_callback(void *arg, errval_t err)
+{
+    ndispatchers++;
+}
+
+int main(int argc, char *argv[])
+{
+    errval_t err;
+    if (argc != 2) {
+        printf("Usage %s: <Num additional threads>\n", argv[0]);
+        exit(-1);
+    }
+
+
+    //printf("main running on %d\n", disp_get_core_id());
+
+    int cores = strtol(argv[1], NULL, 10) + 1;
+
+    for(int i = 1; i < cores; i++) {
+        err = domain_new_dispatcher(i + disp_get_core_id(),
+                                    domain_spanned_callback,
+                                    (void*)(uintptr_t)i);
+        if (err_is_fail(err)) {
+            USER_PANIC_ERR(err, "domain_new_dispatcher failed");
+        }
+    }
+
+    while (ndispatchers < cores) {
+        thread_yield();
+    }
+
+    struct thread *threads[cores];
+    for(int i = 1; i < cores; i++) {
+        err = domain_thread_create_on(i, remote, (void*)(uintptr_t)i, &threads[i]);
+        assert(err_is_ok(err));
+        debug_printf("created %p\n", threads[i]);
+    }
+
+    for(int i = 1; i < cores; i++) {
+        int retval;
+        debug_printf("joining %p\n", threads[i]);
+        err = domain_thread_join(threads[i], &retval);
+        assert(err_is_ok(err));
+        debug_printf("retval = %d\n", retval);
+        assert(retval == i);
+    }
+    return 0;
+}