message send_cap_reply(errval err);
message wakeup_thread(genvaddr thread);
- message create_thread(genvaddr start_func, genvaddr arg, uint64 stacksize);
+ message create_thread_request(genvaddr start_func, genvaddr arg, uint64 stacksize, genvaddr req);
+ message create_thread_reply(errval err, genvaddr thread, genvaddr req);
message span_slave();
message span_slave_done();
domain_spanned_callback_t callback,
void *callback_arg);
errval_t domain_thread_create_on(coreid_t core_id, thread_func_t start_func,
- void *arg);
+ void *arg, struct thread **newthread);
errval_t domain_thread_create_on_varstack(coreid_t core_id,
thread_func_t start_func,
- void *arg, size_t stacksize);
+ void *arg, size_t stacksize,
+ struct thread **newthread);
errval_t domain_send_cap(coreid_t core_id, struct capref cap);
errval_t domain_wakeup_on(dispatcher_handle_t disp, struct thread *thread);
errval_t domain_wakeup_on_disabled(dispatcher_handle_t disp,
size_t stacksize);
void thread_yield(void);
void thread_yield_dispatcher(struct capref endpoint);
-void thread_exit(void);
+void thread_exit(int status);
struct thread *thread_self(void);
errval_t thread_join(struct thread *thread, int *retval);
errval_t thread_detach(struct thread *thread);
errval_t posixcompat_unpack_fds(void);
iref_t posixcompat_pts_get_iref(int fd);
+enum pthread_action {
+ PTHREAD_ACTION_CREATE,
+ PTHREAD_ACTION_DESTROY,
+};
+typedef int (*pthread_placement_fn)(enum pthread_action action, int coreid);
+errval_t posixcompat_pthread_set_placement_fn(pthread_placement_fn fn);
+
__END_DECLS
#endif
static void create_thread_request(struct interdisp_binding *b,
genvaddr_t funcaddr, genvaddr_t argaddr,
- uint64_t stacksize)
+ uint64_t stacksize, genvaddr_t req)
{
thread_func_t start_func = (thread_func_t)(uintptr_t)funcaddr;
void *arg = (void *)(uintptr_t)argaddr;
newthread = thread_create(start_func, arg);
}
assert(newthread != NULL);
+ errval_t err = b->tx_vtbl.create_thread_reply(b, NOP_CONT, SYS_ERR_OK,
+ (genvaddr_t)newthread, req);
+ assert(err_is_ok(err));
+}
+
+struct create_thread_req {
+ struct thread *thread;
+ bool reply_received;
+};
+
+static void create_thread_reply(struct interdisp_binding *b,
+ errval_t err, genvaddr_t thread, genvaddr_t req)
+{
+ assert(err_is_ok(err));
+ // fill out request
+ struct create_thread_req *r = (struct create_thread_req*)req;
+ r->thread = (struct thread *)thread;
+ r->reply_received = true;
}
static void wakeup_thread_request(struct interdisp_binding *b,
{
USER_PANIC("shouldn't be called");
free(cs);
- thread_exit();
+ thread_exit(0);
}
static void span_slave_done_request(struct interdisp_binding *b)
.send_cap_request = send_cap_request,
.send_cap_reply = send_cap_reply,
- .wakeup_thread = wakeup_thread_request,
- .create_thread = create_thread_request,
+ .wakeup_thread = wakeup_thread_request,
+ .create_thread_request = create_thread_request,
+ .create_thread_reply = create_thread_reply,
// XXX: Hack to allow domain_new_dispatcher() to proceed when not all
// default waitsets are serviced
errval_t domain_thread_create_on_varstack(coreid_t core_id,
thread_func_t start_func,
- void *arg, size_t stacksize)
+ void *arg, size_t stacksize,
+ struct thread **newthread)
{
if (disp_get_core_id() == core_id) {
struct thread *th = NULL;
th = thread_create_varstack(start_func, arg, stacksize);
}
if (th != NULL) {
+ if (newthread) {
+ *newthread = th;
+ }
return SYS_ERR_OK;
} else {
return LIB_ERR_THREAD_CREATE;
}
struct interdisp_binding *b = domain_state->b[core_id];
- err = b->tx_vtbl.create_thread(b, NOP_CONT,
- (genvaddr_t)(uintptr_t)start_func,
- (genvaddr_t)(uintptr_t)arg,
- stacksize);
+ struct create_thread_req *req = malloc(sizeof(*req));
+ req->reply_received = false;
+ // use special waitset to make sure loop exits properly.
+ struct waitset ws, *old_ws = b->waitset;
+ waitset_init(&ws);
+ b->change_waitset(b, &ws);
+ err = b->tx_vtbl.create_thread_request(b, NOP_CONT,
+ (genvaddr_t)(uintptr_t)start_func,
+ (genvaddr_t)(uintptr_t)arg,
+ stacksize, (genvaddr_t)req);
if (err_is_fail(err)) {
return err;
}
+ while (!req->reply_received) {
+ event_dispatch(&ws);
+ }
+
+ if (newthread) {
+ *newthread = req->thread;
+ }
+ free(req);
+
+ b->change_waitset(b, old_ws);
+
return SYS_ERR_OK;
}
}
errval_t domain_thread_create_on(coreid_t core_id, thread_func_t start_func,
- void *arg)
+ void *arg, struct thread **newthread)
{
- return domain_thread_create_on_varstack(core_id, start_func, arg, 0);
+ return domain_thread_create_on_varstack(core_id, start_func, arg, 0, newthread);
}
/**
}
}
- thread_exit();
+ thread_exit(status);
// If we're not dead by now, we wait
while (1) {}
}
static void thread_entry(thread_func_t start_func, void *start_data)
{
assert((lvaddr_t)start_func >= BASE_PAGE_SIZE);
- start_func(start_data);
- thread_exit();
+ int retval = start_func(start_data);
+ thread_exit(retval);
assert(!"thread_exit returned");
}
/**
* \brief Terminate the calling thread
*/
-void thread_exit(void)
+void thread_exit(int status)
{
struct thread *me = thread_self();
disp_resume(handle, &dg->cleanupthread->regs);
} else {
// We're not detached -- wakeup joiner
- me->return_value = 0; // XXX: Should be an argument to thread_exit()
+ me->return_value = status;
me->state = THREAD_STATE_EXITED;
thread_cond_signal(&me->exit_condition);
thread_func_t func = (thread_func_t) cfunc;
errval_t err = domain_thread_create_on_varstack(actual_id, func, arg,
- thread_stack_size);
+ thread_stack_size, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "domain_thread_create_on failed");
printf("domain_thread_create_on failed on %d\n", actual_id);
/* Wait for the Barrier */
bomp_barrier_wait(work_data->barrier);
thread_detach(thread_self());
- thread_exit();
+ thread_exit(0); // XXX: should return work_fn return value?
return 0;
}
//for (int i = my_core_id + 1; i < nos_threads + my_core_id; i++) {
for (int i = 1; i < nos_threads; ++i) {
coreid_t core = my_core_id + (i * BOMP_DEFAULT_CORE_STRIDE);
- err = domain_thread_create_on(core, remote_init, NULL);
+ err = domain_thread_create_on(core, remote_init, NULL, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "domain_thread_create_on failed");
printf("domain_thread_create_on failed on %d\n", i);
typedef void (*backend_synchronize_fn_t)(void);
typedef void (*backend_set_tls_fn_t)(void *data);
typedef void* (*backend_get_thread_fn_t)(void);
-typedef void (*backend_thread_exit_fn_t)(void);
+typedef void (*backend_thread_exit_fn_t)(int status);
typedef void (*backend_end_processing_fn_t)(void);
typedef struct thread *(*backend_thread_create_fn_t)(bomp_thread_func_t start_func,
void *arg, size_t stacksize);
}
BOMP_DEBUG_NODE("creating thread on core %" PRIuCOREID "\n", core);
- err = domain_thread_create_on(core, bomp_node_msg_handler, node);
+ err = domain_thread_create_on(core, bomp_node_msg_handler, node, NULL);
if (err_is_fail(err)) {
// XXX> error handling
return err;
if(i < num_threads - 1) {
do {
err = domain_thread_create_on(i + my_core_id + 1, thread_loop,
- &tpool->thread_args[i]);
+ &tpool->thread_args[i], NULL);
if (err_no(err) == LIB_ERR_NO_SPANNED_DISP) {
thread_yield();
}
#include <errno.h>
#include <string.h>
+#include <posixcompat.h> // for pthread_placement stuff
+
typedef void (*destructor_fn_t)(void *);
typedef void *(*start_fn_t)(void *);
struct pthread {
struct thread *thread;
+ int core; //< for spanned domains core on which thread is running
const void *keys[PTHREAD_KEYS_MAX];
start_fn_t start_fn;
void *arg;
return 0;
}
+/*
+ * Optional pthread placement policy for spanned domains
+ */
+static pthread_placement_fn pthread_placement = NULL;
+errval_t posixcompat_pthread_set_placement_fn(pthread_placement_fn fn)
+{
+ pthread_placement = fn;
+ return SYS_ERR_OK;
+}
+
int pthread_create(pthread_t *pthread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg)
{
(*pthread)->arg = arg;
// Start the thread
- (*pthread)->thread = thread_create(start_pthread, *pthread);
+ (*pthread)->core = disp_get_core_id();
+ if (pthread_placement) {
+ (*pthread)->core = pthread_placement(PTHREAD_ACTION_CREATE, 0);
+ }
+ struct thread *nt;
+ errval_t err = domain_thread_create_on((*pthread)->core, start_pthread, *pthread, &nt);
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "pthread_create");
+ return 1;
+ }
+ (*pthread)->thread = nt;
+ debug_printf("%s: %p -> %"PRIuPTR"\n", __FUNCTION__, *pthread,
+ thread_get_id((*pthread)->thread));
return 0;
}
int pthread_join(pthread_t thread, void **retval)
{
+ debug_printf("%s: %p\n", __FUNCTION__, thread);
errval_t err = thread_join(thread->thread, NULL);
assert(err_is_ok(err));
+ if (pthread_placement) {
+ pthread_placement(PTHREAD_ACTION_DESTROY, thread->core);
+ }
+
if (retval != NULL) {
*retval = thread->retval;
}
args->origin = disp_get_core_id();
err = domain_thread_create_on(i + disp_get_core_id(), start_worker_thread,
- args);
+ args, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "Failed to run a function on remote core");
}
a.argv = argv;
thread_create_varstack(lu_bootstrap, &a, LU_STACK_SIZE);
- thread_exit();
+ thread_exit(0);
}
// Start all threads
for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
- err = domain_thread_create_on(i, apic_measure_loop, NULL);
+ err = domain_thread_create_on(i, apic_measure_loop, NULL, NULL);
assert(err_is_ok(err));
}
}
for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
- err = domain_thread_create_on(i, remote_init, NULL);
+ err = domain_thread_create_on(i, remote_init, NULL, NULL);
assert(err_is_ok(err));
thread_sem_wait(&init_sem);
}
}
for (int i = my_core_id + 1; i < nthreads + my_core_id; i++) {
- err = domain_thread_create_on(i, remote_init, NULL);
+ err = domain_thread_create_on(i, remote_init, NULL, NULL);
assert(err_is_ok(err));
thread_sem_wait(&init_sem);
}
assert(r == 0);
// Stick around waiting for input
- thread_exit();
+ thread_exit(0);
assert(!"thread_exit returned");
return EXIT_FAILURE;
}
// lwip_demo();
tcp_server();
- thread_exit();
+ thread_exit(0);
assert(!"thread_exit returned");
return EXIT_FAILURE;
}
for(int i = 1; i <= num_span; i++) {
debug_printf("starting thread on %d\n", mycore + i);
- err = domain_thread_create_on(mycore + i, do_hello, NULL);
+ err = domain_thread_create_on(mycore + i, do_hello, NULL, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "failed thread create %d", i);
curr_core_nr = (my_core_id + 1) % nr_available_cores;
- err = domain_thread_create_on(curr_core_nr, writemem, intarray);
+ err = domain_thread_create_on(curr_core_nr, writemem, intarray, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
thread_cond_init(&all_done_condition);
curr_core_nr = (my_core_id + 1) % nr_available_cores;
- err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL);
+ err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
curr_core_nr = (my_core_id) % nr_available_cores;
- err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL);
+ err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
s = 0;
curr_core_nr = (my_core_id + 1) % nr_available_cores;
while (curr_core_nr != my_core_id) {
- err = domain_thread_create_on(curr_core_nr, create_skb_channel, 0);
+ err = domain_thread_create_on(curr_core_nr, create_skb_channel, 0, NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
break;
uint64_t start_l2_cache_misses = rdpmc(0);
for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
- err = domain_thread_create_on(curr_core_nr, initialize_memory, &as[i]);
+ err = domain_thread_create_on(curr_core_nr, initialize_memory, &as[i], NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
curr_core_nr = (my_core_id + 1) % nr_available_cores;
start_l2_cache_misses = rdpmc(0);
for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
- err = domain_thread_create_on(curr_core_nr, initialize_number, &as[i]);
+ err = domain_thread_create_on(curr_core_nr, initialize_number, &as[i], NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
curr_core_nr = (my_core_id + 1) % nr_available_cores;
start_l2_cache_misses = rdpmc(0);
for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
- err = domain_thread_create_on(curr_core_nr, sqr, &as[i]);
+ err = domain_thread_create_on(curr_core_nr, sqr, &as[i], NULL);
if (err_is_fail(err)) {
DEBUG_ERR(err, "could not create thread on core\n");
}
j++;
if(j == 3) {
printf("fputest passed successfully!\n");
- thread_exit();
+ thread_exit(0);
}
}
}
thread_create(fpu_thread, (void *)(uintptr_t)i);
}
- thread_exit();
+ thread_exit(0);
return 0;
}
cFiles = [ "spantest.c" ],
addLibraries = [ "trace" ],
flounderDefs = [ "monitor" ]
+ },
+ build application { target = "xcorejoin",
+ cFiles = [ "xcorejoin.c" ]
}
]
//trace_dump();
for(int i = 1; i < cores; i++) {
- err = domain_thread_create_on(i, remote, NULL);
+ err = domain_thread_create_on(i, remote, NULL, NULL);
assert(err_is_ok(err));
}
assert(err_is_ok(err));
}
- thread_exit();
+ thread_exit(0);
}