message create_thread_request(genvaddr start_func, genvaddr arg, uint64 stacksize, genvaddr req);
message create_thread_reply(errval err, genvaddr thread, genvaddr req);
+ message join_thread_request(genvaddr thread, genvaddr req);
+ message join_thread_reply(errval err, uint64 retval, genvaddr req);
+
message span_slave();
message span_slave_done();
message span_eager_connect(coreid core_id);
thread_func_t start_func,
void *arg, size_t stacksize,
struct thread **newthread);
+errval_t domain_thread_join(struct thread *thread, int *retval);
errval_t domain_send_cap(coreid_t core_id, struct capref cap);
errval_t domain_wakeup_on(dispatcher_handle_t disp, struct thread *thread);
errval_t domain_wakeup_on_disabled(dispatcher_handle_t disp,
disp_enable(handle);
}
+static void join_thread_request(struct interdisp_binding *b,
+ genvaddr_t taddr, genvaddr_t req)
+{
+ struct thread *thread = (struct thread *)taddr;
+ assert(thread->coreid == disp_get_core_id());
+ int retval = 0;
+ errval_t err = thread_join(thread, &retval);
+ err = b->tx_vtbl.join_thread_reply(b, NOP_CONT, err, retval, req);
+ assert(err_is_ok(err));
+}
+
+struct join_thread_req {
+ errval_t err;
+ int retval;
+ bool reply_received;
+};
+
+static void join_thread_reply(struct interdisp_binding *b,
+ errval_t err, uint64_t retval, genvaddr_t req)
+{
+ struct join_thread_req *r = (struct join_thread_req *)req;
+ r->err = err;
+ r->retval = retval;
+ r->reply_received = true;
+}
+
/*
* XXX: The whole span_slave*() thing is a hack to allow all
* dispatchers to wait on both the monitor and interdisp waitsets
.create_thread_request = create_thread_request,
.create_thread_reply = create_thread_reply,
+ .join_thread_request = join_thread_request,
+ .join_thread_reply = join_thread_reply,
+
// XXX: Hack to allow domain_new_dispatcher() to proceed when not all
// default waitsets are serviced
.span_slave = span_slave_request,
return domain_thread_create_on_varstack(core_id, start_func, arg, 0, newthread);
}
+errval_t domain_thread_join(struct thread *thread, int *retval)
+{
+ coreid_t core_id = thread->coreid;
+ debug_printf("%s: joining %p, coreid %d\n", __FUNCTION__, thread, core_id);
+ if (disp_get_core_id() == core_id) {
+ return thread_join(thread, retval);
+ } else {
+ struct domain_state *domain_state = get_domain_state();
+ errval_t err;
+
+ if (domain_state->b[core_id] == NULL) {
+ return LIB_ERR_NO_SPANNED_DISP;
+ }
+
+ struct interdisp_binding *b = domain_state->b[core_id];
+ struct join_thread_req *req = malloc(sizeof(*req));
+ req->reply_received = false;
+ // use special waitset to make sure loop exits properly.
+ struct waitset ws, *old_ws = b->waitset;
+ waitset_init(&ws);
+ b->change_waitset(b, &ws);
+ err = b->tx_vtbl.join_thread_request(b, NOP_CONT,
+ (genvaddr_t)thread,
+ (genvaddr_t)req);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ while (!req->reply_received) {
+ event_dispatch(&ws);
+ }
+ // change waitset back
+ b->change_waitset(b, old_ws);
+
+ if (retval) {
+ *retval = req->retval;
+ }
+ err = req->err;
+ free(req);
+
+ return err;
+ }
+}
+
/**
* \brief set the core_id.
*
errval_t thread_join(struct thread *thread, int *retval)
{
assert(thread != NULL);
+ // this function should only be called for threads on same core
+ assert(thread->coreid == disp_get_core_id());
thread_mutex_lock(&thread->exit_lock);
if(thread->detached) {
int pthread_join(pthread_t thread, void **retval)
{
debug_printf("%s: %p\n", __FUNCTION__, thread);
- errval_t err = thread_join(thread->thread, NULL);
+ errval_t err = domain_thread_join(thread->thread, NULL);
assert(err_is_ok(err));
if (pthread_placement) {
--- /dev/null
+/**
+ * \file
+ * \brief Test spanning of domains across cores
+ */
+
+/*
+ * Copyright (c) 2007, 2008, 2010, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <barrelfish/barrelfish.h>
+#include <barrelfish/dispatch.h>
+#include <barrelfish/waitset.h>
+
+
+static int remote(void *dummy)
+{
+ uintptr_t v = (uintptr_t)dummy;
+ debug_printf("v = %"PRIuPTR"\n", v);
+ return v;
+}
+
+int ndispatchers = 1;
+
+static void domain_spanned_callback(void *arg, errval_t err)
+{
+ ndispatchers++;
+}
+
+int main(int argc, char *argv[])
+{
+ errval_t err;
+ if (argc != 2) {
+ printf("Usage %s: <Num additional threads>\n", argv[0]);
+ exit(-1);
+ }
+
+
+ //printf("main running on %d\n", disp_get_core_id());
+
+ int cores = strtol(argv[1], NULL, 10) + 1;
+
+ for(int i = 1; i < cores; i++) {
+ err = domain_new_dispatcher(i + disp_get_core_id(),
+ domain_spanned_callback,
+ (void*)(uintptr_t)i);
+ if (err_is_fail(err)) {
+ USER_PANIC_ERR(err, "domain_new_dispatcher failed");
+ }
+ }
+
+ while (ndispatchers < cores) {
+ thread_yield();
+ }
+
+ struct thread *threads[cores];
+ for(int i = 1; i < cores; i++) {
+ err = domain_thread_create_on(i, remote, (void*)(uintptr_t)i, &threads[i]);
+ assert(err_is_ok(err));
+ debug_printf("created %p\n", threads[i]);
+ }
+
+ for(int i = 1; i < cores; i++) {
+ int retval;
+ debug_printf("joining %p\n", threads[i]);
+ err = domain_thread_join(threads[i], &retval);
+ assert(err_is_ok(err));
+ debug_printf("retval = %d\n", retval);
+ assert(retval == i);
+ }
+ return 0;
+}