T65: properly implement x-core thread creation.
authorSimon Gerber <simon.gerber@inf.ethz.ch>
Mon, 23 Feb 2015 15:45:40 +0000 (16:45 +0100)
committerSimon Gerber <simon.gerber@inf.ethz.ch>
Mon, 9 Mar 2015 14:36:28 +0000 (15:36 +0100)
Signed-off-by: Simon Gerber <simon.gerber@inf.ethz.ch>

25 files changed:
if/interdisp.if
include/barrelfish/domain.h
include/barrelfish/threads.h
include/posixcompat.h
lib/barrelfish/domain.c
lib/barrelfish/init.c
lib/barrelfish/threads.c
lib/bomp/backends/bomp_shared.c
lib/bomp/include/bomp_backend.h
lib/bomp_new/bomp_node.c
lib/phoenix/tpool.c
lib/posixcompat/pthreads.c
lib/tweed/tweed.c
usr/bench/rcce/lu/lu.c
usr/bench/scheduling/clockdrift.c
usr/bench/scheduling/phases.c
usr/bench/scheduling/phases_scale.c
usr/drivers/rtc/main.c
usr/drivers/tulip/main.c
usr/examples/xmpl-span/span.c
usr/skb/testapps/threadalloc.c
usr/tests/fputest/fputest.c
usr/tests/spantest/Hakefile
usr/tests/spantest/spantest.c
usr/tests/yield_test/yield_test.c

index e917fdc..ea4e2b3 100644 (file)
@@ -18,7 +18,8 @@ interface interdisp "Interface between domains spanning cores" {
        message send_cap_reply(errval err);
 
        message wakeup_thread(genvaddr thread);
-       message create_thread(genvaddr start_func, genvaddr arg, uint64 stacksize);
+       message create_thread_request(genvaddr start_func, genvaddr arg, uint64 stacksize, genvaddr req);
+       message create_thread_reply(errval err, genvaddr thread, genvaddr req);
 
        message span_slave();
        message span_slave_done();
index 8284cd5..f8d4c40 100644 (file)
@@ -70,10 +70,11 @@ errval_t domain_new_dispatcher(coreid_t core_id,
                                domain_spanned_callback_t callback,
                                void *callback_arg);
 errval_t domain_thread_create_on(coreid_t core_id, thread_func_t start_func,
-                                 void *arg);
+                                 void *arg, struct thread **newthread);
 errval_t domain_thread_create_on_varstack(coreid_t core_id,
                                           thread_func_t start_func,
-                                          void *arg, size_t stacksize);
+                                          void *arg, size_t stacksize,
+                                          struct thread **newthread);
 errval_t domain_send_cap(coreid_t core_id, struct capref cap);
 errval_t domain_wakeup_on(dispatcher_handle_t disp, struct thread *thread);
 errval_t domain_wakeup_on_disabled(dispatcher_handle_t disp,
index 0e29720..83a2fbc 100644 (file)
@@ -35,7 +35,7 @@ struct thread *thread_create_varstack(thread_func_t start_func, void *arg,
                                       size_t stacksize);
 void thread_yield(void);
 void thread_yield_dispatcher(struct capref endpoint);
-void thread_exit(void);
+void thread_exit(int status);
 struct thread *thread_self(void);
 errval_t thread_join(struct thread *thread, int *retval);
 errval_t thread_detach(struct thread *thread);
index 7ad94e6..d225c5b 100644 (file)
@@ -23,6 +23,13 @@ errval_t spawn_setup_fds(struct capref *frame, int rfd);
 errval_t posixcompat_unpack_fds(void);
 iref_t posixcompat_pts_get_iref(int fd);
 
+enum pthread_action {
+    PTHREAD_ACTION_CREATE,
+    PTHREAD_ACTION_DESTROY,
+};
+typedef int (*pthread_placement_fn)(enum pthread_action action, int coreid);
+errval_t posixcompat_pthread_set_placement_fn(pthread_placement_fn fn);
+
 __END_DECLS
 
 #endif
index 841ac99..6c1a457 100644 (file)
@@ -158,7 +158,7 @@ static void send_cap_reply(struct interdisp_binding *st, errval_t err)
 
 static void create_thread_request(struct interdisp_binding *b,
                                   genvaddr_t funcaddr, genvaddr_t argaddr,
-                                  uint64_t stacksize)
+                                  uint64_t stacksize, genvaddr_t req)
 {
     thread_func_t start_func = (thread_func_t)(uintptr_t)funcaddr;
     void *arg = (void *)(uintptr_t)argaddr;
@@ -171,6 +171,24 @@ static void create_thread_request(struct interdisp_binding *b,
         newthread = thread_create(start_func, arg);
     }
     assert(newthread != NULL);
+    errval_t err = b->tx_vtbl.create_thread_reply(b, NOP_CONT, SYS_ERR_OK,
+                                                  (genvaddr_t)newthread, req);
+    assert(err_is_ok(err));
+}
+
+struct create_thread_req {
+    struct thread *thread;
+    bool reply_received;
+};
+
+static void create_thread_reply(struct interdisp_binding *b,
+                                errval_t err, genvaddr_t thread, genvaddr_t req)
+{
+    assert(err_is_ok(err));
+    // fill out request
+    struct create_thread_req *r = (struct create_thread_req*)req;
+    r->thread = (struct thread *)thread;
+    r->reply_received = true;
 }
 
 static void wakeup_thread_request(struct interdisp_binding *b,
@@ -215,7 +233,7 @@ static void span_slave_done_handler(void *cs)
 {
     USER_PANIC("shouldn't be called");
     free(cs);
-    thread_exit();
+    thread_exit(0);
 }
 
 static void span_slave_done_request(struct interdisp_binding *b)
@@ -251,8 +269,9 @@ static struct interdisp_rx_vtbl interdisp_vtbl = {
     .send_cap_request = send_cap_request,
     .send_cap_reply = send_cap_reply,
 
-    .wakeup_thread    = wakeup_thread_request,
-    .create_thread    = create_thread_request,
+    .wakeup_thread         = wakeup_thread_request,
+    .create_thread_request = create_thread_request,
+    .create_thread_reply   = create_thread_reply,
 
     // XXX: Hack to allow domain_new_dispatcher() to proceed when not all
     // default waitsets are serviced
@@ -871,7 +890,8 @@ errval_t domain_thread_move_to(struct thread *thread, coreid_t core_id)
 
 errval_t domain_thread_create_on_varstack(coreid_t core_id,
                                           thread_func_t start_func,
-                                          void *arg, size_t stacksize)
+                                          void *arg, size_t stacksize,
+                                          struct thread **newthread)
 {
     if (disp_get_core_id() == core_id) {
         struct thread *th = NULL;
@@ -881,6 +901,9 @@ errval_t domain_thread_create_on_varstack(coreid_t core_id,
             th = thread_create_varstack(start_func, arg, stacksize);
         }
         if (th != NULL) {
+            if (newthread) {
+                *newthread = th;
+            }
             return SYS_ERR_OK;
         } else {
             return LIB_ERR_THREAD_CREATE;
@@ -894,22 +917,39 @@ errval_t domain_thread_create_on_varstack(coreid_t core_id,
         }
 
         struct interdisp_binding *b = domain_state->b[core_id];
-        err = b->tx_vtbl.create_thread(b, NOP_CONT,
-                                       (genvaddr_t)(uintptr_t)start_func,
-                                       (genvaddr_t)(uintptr_t)arg,
-                                       stacksize);
+        struct create_thread_req *req = malloc(sizeof(*req));
+        req->reply_received = false;
+        // use special waitset to make sure loop exits properly.
+        struct waitset ws, *old_ws = b->waitset;
+        waitset_init(&ws);
+        b->change_waitset(b, &ws);
+        err = b->tx_vtbl.create_thread_request(b, NOP_CONT,
+                                               (genvaddr_t)(uintptr_t)start_func,
+                                               (genvaddr_t)(uintptr_t)arg,
+                                               stacksize, (genvaddr_t)req);
         if (err_is_fail(err)) {
             return err;
         }
 
+        while (!req->reply_received) {
+            event_dispatch(&ws);
+        }
+
+        if (newthread) {
+            *newthread = req->thread;
+        }
+        free(req);
+
+        b->change_waitset(b, old_ws);
+
         return SYS_ERR_OK;
     }
 }
 
 errval_t domain_thread_create_on(coreid_t core_id, thread_func_t start_func,
-                                 void *arg)
+                                 void *arg, struct thread **newthread)
 {
-    return domain_thread_create_on_varstack(core_id, start_func, arg, 0);
+    return domain_thread_create_on_varstack(core_id, start_func, arg, 0, newthread);
 }
 
 /**
index cd9b1d0..6e18d67 100644 (file)
@@ -72,7 +72,7 @@ void libc_exit(int status)
         }
     }
 
-    thread_exit();
+    thread_exit(status);
     // If we're not dead by now, we wait
     while (1) {}
 }
index 316ade1..f26e311 100644 (file)
@@ -88,8 +88,8 @@ __attribute__((unused)) static bool stack_warned=0;
 static void thread_entry(thread_func_t start_func, void *start_data)
 {
     assert((lvaddr_t)start_func >= BASE_PAGE_SIZE);
-    start_func(start_data);
-    thread_exit();
+    int retval = start_func(start_data);
+    thread_exit(retval);
     assert(!"thread_exit returned");
 }
 
@@ -707,7 +707,7 @@ static int cleanup_thread(void *arg)
 /**
  * \brief Terminate the calling thread
  */
-void thread_exit(void)
+void thread_exit(int status)
 {
     struct thread *me = thread_self();
 
@@ -785,7 +785,7 @@ void thread_exit(void)
         disp_resume(handle, &dg->cleanupthread->regs);
     } else {
         // We're not detached -- wakeup joiner
-        me->return_value = 0;   // XXX: Should be an argument to thread_exit()
+        me->return_value = status;
         me->state = THREAD_STATE_EXITED;
         thread_cond_signal(&me->exit_condition);
 
index b0386f7..1fcf63a 100644 (file)
@@ -45,7 +45,7 @@ static void bomp_run_on(int core_id,
     thread_func_t func = (thread_func_t) cfunc;
 
     errval_t err = domain_thread_create_on_varstack(actual_id, func, arg,
-                                                    thread_stack_size);
+                                                    thread_stack_size, NULL);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "domain_thread_create_on failed");
         printf("domain_thread_create_on failed on %d\n", actual_id);
@@ -64,7 +64,7 @@ static int bomp_thread_fn(void *xdata)
     /* Wait for the Barrier */
     bomp_barrier_wait(work_data->barrier);
     thread_detach(thread_self());
-    thread_exit();
+    thread_exit(0); // XXX: should return work_fn return value?
     return 0;
 }
 
@@ -200,7 +200,7 @@ static void bomp_span_domain(int nos_threads,
     //for (int i = my_core_id + 1; i < nos_threads + my_core_id; i++) {
     for (int i = 1; i < nos_threads; ++i) {
         coreid_t core = my_core_id + (i * BOMP_DEFAULT_CORE_STRIDE);
-        err = domain_thread_create_on(core, remote_init, NULL);
+        err = domain_thread_create_on(core, remote_init, NULL, NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "domain_thread_create_on failed");
             printf("domain_thread_create_on failed on %d\n", i);
index c2cf138..d50bc2d 100644 (file)
@@ -21,7 +21,7 @@ typedef void* (*backend_get_tls_fn_t)(void);
 typedef void (*backend_synchronize_fn_t)(void);
 typedef void (*backend_set_tls_fn_t)(void *data);
 typedef void* (*backend_get_thread_fn_t)(void);
-typedef void (*backend_thread_exit_fn_t)(void);
+typedef void (*backend_thread_exit_fn_t)(int status);
 typedef void (*backend_end_processing_fn_t)(void);
 typedef struct thread *(*backend_thread_create_fn_t)(bomp_thread_func_t start_func,
                                               void *arg, size_t stacksize);
index 5fddab9..2e1e8cb 100644 (file)
@@ -203,7 +203,7 @@ static errval_t bomp_node_init_local(nodeid_t nodeid,
     }
 
     BOMP_DEBUG_NODE("creating thread on core %" PRIuCOREID "\n", core);
-    err = domain_thread_create_on(core, bomp_node_msg_handler, node);
+    err = domain_thread_create_on(core, bomp_node_msg_handler, node, NULL);
     if (err_is_fail(err)) {
         // XXX> error handling
         return err;
index a0828bc..f2952ee 100644 (file)
@@ -122,7 +122,7 @@ tpool_t* tpool_create (int num_threads)
         if(i < num_threads - 1) {
             do {
                 err = domain_thread_create_on(i + my_core_id + 1, thread_loop,
-                                              &tpool->thread_args[i]);
+                                              &tpool->thread_args[i], NULL);
                 if (err_no(err) == LIB_ERR_NO_SPANNED_DISP) {
                     thread_yield();
                 }
index c1df329..d912867 100644 (file)
@@ -4,6 +4,8 @@
 #include <errno.h>
 #include <string.h>
 
+#include <posixcompat.h> // for pthread_placement stuff
+
 typedef void (*destructor_fn_t)(void *);
 typedef void *(*start_fn_t)(void *);
 
@@ -44,6 +46,7 @@ struct pthread_rwlock
 
 struct pthread {
     struct thread *thread;
+    int core; //< for spanned domains core on which thread is running
     const void *keys[PTHREAD_KEYS_MAX];
     start_fn_t start_fn;
     void *arg;
@@ -77,6 +80,16 @@ static int start_pthread(void *arg)
     return 0;
 }
 
+/*
+ * Optional pthread placement policy for spanned domains
+ */
+static pthread_placement_fn pthread_placement = NULL;
+errval_t posixcompat_pthread_set_placement_fn(pthread_placement_fn fn)
+{
+    pthread_placement = fn;
+    return SYS_ERR_OK;
+}
+
 int pthread_create(pthread_t *pthread, const pthread_attr_t *attr,
                    void *(*start_routine) (void *), void *arg)
 {
@@ -89,7 +102,19 @@ int pthread_create(pthread_t *pthread, const pthread_attr_t *attr,
     (*pthread)->arg = arg;
 
     // Start the thread
-    (*pthread)->thread = thread_create(start_pthread, *pthread);
+    (*pthread)->core = disp_get_core_id();
+    if (pthread_placement) {
+        (*pthread)->core = pthread_placement(PTHREAD_ACTION_CREATE, 0);
+    }
+    struct thread *nt;
+    errval_t err = domain_thread_create_on((*pthread)->core, start_pthread, *pthread, &nt);
+    if (err_is_fail(err)) {
+        DEBUG_ERR(err, "pthread_create");
+        return 1;
+    }
+    (*pthread)->thread = nt;
+    debug_printf("%s: %p -> %"PRIuPTR"\n", __FUNCTION__, *pthread,
+            thread_get_id((*pthread)->thread));
     return 0;
 }
 
@@ -305,9 +330,14 @@ int pthread_cond_destroy(pthread_cond_t *cond)
 
 int pthread_join(pthread_t thread, void **retval)
 {
+    debug_printf("%s: %p\n", __FUNCTION__, thread);
     errval_t err = thread_join(thread->thread, NULL);
     assert(err_is_ok(err));
 
+    if (pthread_placement) {
+        pthread_placement(PTHREAD_ACTION_DESTROY, thread->core);
+    }
+
     if (retval != NULL) {
         *retval = thread->retval;
     }
index 9916551..19ee2a2 100644 (file)
@@ -178,7 +178,7 @@ int init_tweed(int workers_requested,
         args->origin = disp_get_core_id();
 
         err = domain_thread_create_on(i + disp_get_core_id(), start_worker_thread, 
-                                 args);
+                                 args, NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "Failed to run a function on remote core");
         }
index f58c46e..1e28c62 100644 (file)
@@ -198,5 +198,5 @@ int main(int argc, char **argv)
     a.argv = argv;
 
     thread_create_varstack(lu_bootstrap, &a, LU_STACK_SIZE);
-    thread_exit();
+    thread_exit(0);
 }
index 5b3ce8f..86b5c88 100644 (file)
@@ -124,7 +124,7 @@ int main(int argc, char *argv[])
 
     // Start all threads
     for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
-        err = domain_thread_create_on(i, apic_measure_loop, NULL);
+        err = domain_thread_create_on(i, apic_measure_loop, NULL, NULL);
         assert(err_is_ok(err));
     }
 
index d09f224..dd521fc 100644 (file)
@@ -78,7 +78,7 @@ int main(int argc, char *argv[])
     }
 
     for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
-        err = domain_thread_create_on(i, remote_init, NULL);
+        err = domain_thread_create_on(i, remote_init, NULL, NULL);
         assert(err_is_ok(err));
         thread_sem_wait(&init_sem);
     }
index de699b4..6a59633 100644 (file)
@@ -92,7 +92,7 @@ int main(int argc, char *argv[])
     }
 
     for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
-        err = domain_thread_create_on(i, remote_init, NULL);
+        err = domain_thread_create_on(i, remote_init, NULL, NULL);
         assert(err_is_ok(err));
         thread_sem_wait(&init_sem);
     }
index 61ad228..4422691 100644 (file)
@@ -60,7 +60,7 @@ int main(int argc, char *argv[])
     assert(r == 0);
 
     // Stick around waiting for input
-    thread_exit();
+    thread_exit(0);
     assert(!"thread_exit returned");
     return EXIT_FAILURE;
 }
index 991a619..c6cdc09 100644 (file)
@@ -45,7 +45,7 @@ int main(int argc, char *argv[])
     // lwip_demo();
     tcp_server();
 
-    thread_exit();
+    thread_exit(0);
     assert(!"thread_exit returned");
     return EXIT_FAILURE;
 }
index 750d9ee..17a0b51 100644 (file)
@@ -75,7 +75,7 @@ int main(int argc, char *argv[])
     for(int i = 1; i <= num_span; i++) {
         debug_printf("starting thread on %d\n", mycore + i); 
 
-        err = domain_thread_create_on(mycore + i, do_hello, NULL);
+        err = domain_thread_create_on(mycore + i, do_hello, NULL, NULL);
 
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "failed thread create %d", i);
index 48a6e1d..ed29bb7 100644 (file)
@@ -243,7 +243,7 @@ int main(int argc, char **argv)
 
 
     curr_core_nr = (my_core_id + 1) % nr_available_cores;
-    err = domain_thread_create_on(curr_core_nr, writemem, intarray);
+    err = domain_thread_create_on(curr_core_nr, writemem, intarray, NULL);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "could not create thread on core\n");
     }
@@ -258,12 +258,12 @@ int main(int argc, char **argv)
     thread_cond_init(&all_done_condition);
     
     curr_core_nr = (my_core_id + 1) % nr_available_cores;
-    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL);
+    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "could not create thread on core\n");
     }
     curr_core_nr = (my_core_id) % nr_available_cores;
-    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL);
+    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "could not create thread on core\n");
     }
@@ -289,7 +289,7 @@ int main(int argc, char **argv)
     s = 0;
     curr_core_nr = (my_core_id + 1) % nr_available_cores;
     while (curr_core_nr != my_core_id) {
-        err = domain_thread_create_on(curr_core_nr, create_skb_channel, 0);
+        err = domain_thread_create_on(curr_core_nr, create_skb_channel, 0, NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "could not create thread on core\n");
             break;
@@ -339,7 +339,7 @@ int main(int argc, char **argv)
 
     uint64_t start_l2_cache_misses = rdpmc(0);
     for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
-        err = domain_thread_create_on(curr_core_nr, initialize_memory, &as[i]);
+        err = domain_thread_create_on(curr_core_nr, initialize_memory, &as[i], NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "could not create thread on core\n");
         }
@@ -362,7 +362,7 @@ int main(int argc, char **argv)
     curr_core_nr = (my_core_id + 1) % nr_available_cores;
     start_l2_cache_misses = rdpmc(0);
     for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
-        err = domain_thread_create_on(curr_core_nr, initialize_number, &as[i]);
+        err = domain_thread_create_on(curr_core_nr, initialize_number, &as[i], NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "could not create thread on core\n");
         }
@@ -385,7 +385,7 @@ int main(int argc, char **argv)
     curr_core_nr = (my_core_id + 1) % nr_available_cores;
     start_l2_cache_misses = rdpmc(0);
     for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
-        err = domain_thread_create_on(curr_core_nr, sqr, &as[i]);
+        err = domain_thread_create_on(curr_core_nr, sqr, &as[i], NULL);
         if (err_is_fail(err)) {
             DEBUG_ERR(err, "could not create thread on core\n");
         }
index 627010d..e4e95c8 100644 (file)
@@ -43,7 +43,7 @@ static int fpu_thread(void *arg)
             j++;
             if(j == 3) {
                 printf("fputest passed successfully!\n");
-                thread_exit();
+                thread_exit(0);
             }
         }
     }
@@ -63,6 +63,6 @@ int main(int argc, char *argv[])
         thread_create(fpu_thread, (void *)(uintptr_t)i);
     }
 
-    thread_exit();
+    thread_exit(0);
     return 0;
 }
index 732b51b..e164059 100644 (file)
@@ -14,5 +14,8 @@
                       cFiles = [ "spantest.c" ],
                       addLibraries = [ "trace" ],
                       flounderDefs = [ "monitor" ]
+                    },
+  build application { target = "xcorejoin",
+                      cFiles = [ "xcorejoin.c" ]
                     }
 ]
index 899911c..0de2cdc 100644 (file)
@@ -367,7 +367,7 @@ int main(int argc, char *argv[])
     //trace_dump();
 
     for(int i = 1; i < cores; i++) {
-        err = domain_thread_create_on(i, remote, NULL);
+        err = domain_thread_create_on(i, remote, NULL, NULL);
         assert(err_is_ok(err));
     }
 
index 9fff2d7..f8c0f33 100644 (file)
@@ -46,5 +46,5 @@ int main(int argc, char *argv[])
         assert(err_is_ok(err));
     }
 
-    thread_exit();
+    thread_exit(0);
 }