flounder,waitsets: adding chanstate dependency in order to wait for a specific chanst...
authorAdam Turowski <adam.turowski@inf.ethz.ch>
Wed, 21 Dec 2016 09:46:29 +0000 (10:46 +0100)
committerAdam Turowski <adam.turowski@inf.ethz.ch>
Wed, 28 Dec 2016 10:22:32 +0000 (11:22 +0100)
Signed-off-by: Adam Turowski <adam.turowski@inf.ethz.ch>

17 files changed:
include/barrelfish/threads.h
include/barrelfish/ump_chan.h
include/barrelfish/waitset.h
lib/barrelfish/flounder_support.c
lib/barrelfish/include/threads_priv.h
lib/barrelfish/spawn_client.c
lib/barrelfish/threads.c
lib/barrelfish/ump_chan.c
lib/barrelfish/waitset.c
tools/flounder/BackendCommon.hs
tools/flounder/GCBackend.hs
tools/flounder/GHBackend.hs
tools/flounder/LMP.hs
tools/flounder/Multihop.hs
tools/flounder/RPCClient.hs
tools/flounder/UMPCommon.hs
usr/mem_serv_dist/steal.c

index 9c0a00a..cfd0774 100644 (file)
@@ -84,6 +84,10 @@ uint32_t thread_current_token(void);
 void thread_set_outgoing_token(uint32_t token);
 void thread_get_outgoing_token(uint32_t *token);
 
+/// Set/get a local trigger for currently processed event channel
+void thread_set_local_trigger(struct waitset_chanstate *trigger);
+struct waitset_chanstate * thread_get_local_trigger(void);
+
 struct flounder_rpc_context;
 
 void thread_set_rpc_in_progress(bool v);
@@ -91,9 +95,6 @@ bool thread_get_rpc_in_progress(void);
 void thread_set_async_error(errval_t e);
 errval_t thread_get_async_error(void);
 
-void thread_set_mask_channels(bool m);
-bool thread_get_mask_channels(void);
-
 void thread_store_recv_slot(struct capref recv_slot);
 struct capref thread_get_next_recv_slot(void);
 
index dee287c..f1a143c 100644 (file)
@@ -152,6 +152,7 @@ static inline struct waitset_chanstate * ump_chan_get_receiving_channel(struct u
     return &chan->endpoint.waitset_state;
 }
 
+struct waitset_chanstate * monitor_bind_get_receiving_chanstate(struct monitor_binding *b);
 
 __END_DECLS
 
index 6016465..1ac6e09 100644 (file)
@@ -84,7 +84,7 @@ struct waitset_chanstate {
     bool persistent;                        ///< Channel should be always registered
     struct waitset_chanstate *polled_next, *polled_prev;    ///< Dispatcher's polled queue
     struct thread *wait_for;                ///< Thread waiting for this event
-    bool masked;
+    struct waitset_chanstate *trigger;      ///< Chanstate that triggers this chanstate 
 };
 
 /**
@@ -110,7 +110,8 @@ errval_t waitset_destroy(struct waitset *ws);
 
 errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure);
 errval_t get_next_event_disabled(struct waitset *ws, struct waitset_chanstate **retchan,
-        struct event_closure *retclosure, struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug);
+    struct event_closure *retclosure, struct waitset_chanstate *waitfor,
+    struct waitset_chanstate *waitfor2, dispatcher_handle_t handle, bool debug);
 errval_t check_for_event(struct waitset *ws);
 errval_t event_dispatch(struct waitset *ws);
 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *channel, errval_t *error_var);
index 176e85e..06c3a6e 100644 (file)
@@ -85,7 +85,6 @@ void flounder_support_waitset_chanstate_init_persistent(struct waitset_chanstate
 {
     waitset_chanstate_init(wc, CHANTYPE_FLOUNDER);
     wc->persistent = true;
-    wc->masked = true;
 }
 
 void flounder_support_waitset_chanstate_destroy(struct waitset_chanstate *wc)
@@ -147,10 +146,13 @@ errval_t flounder_stub_send_cap(struct flounder_cap_state *s,
                                            monitor_id, cap, s->tx_capnum);
     }
     if (err_is_ok(err)) {
+        thread_set_local_trigger(&mb->tx_cont_chanstate);
         s->tx_capnum++;
         return err;
     } else if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-        // register to retry
+        assert(0); // this should never happen
+        thread_set_local_trigger(&mb->tx_cont_chanstate); // not sure if this is
+        // ok since I don't know how to test this case
         return mb->register_send(mb, mb->waitset, MKCONT(cap_send_cont, s));
     } else {
         return err_push(err, LIB_ERR_MONITOR_CAP_SEND);
index 123a8a7..905c565 100644 (file)
@@ -84,9 +84,9 @@ struct thread {
     bool    rpc_in_progress;               ///< RPC in progress
     errval_t    async_error;                ///< RPC async error
     uint32_t    outgoing_token;             ///< Token of outgoing message
-    bool    mask_channels;
     struct capref recv_slots[MAX_RECV_SLOTS];///< Queued cap recv slots
     int8_t recv_slot_count;                 ///< number of currently queued recv slots
+    struct waitset_chanstate *local_trigger; ///< Trigger for a local thread event
 };
 
 void thread_enqueue(struct thread *thread, struct thread **queue);
index 79d7cc2..ea26708 100644 (file)
@@ -213,12 +213,6 @@ errval_t spawn_program_with_caps(coreid_t coreid, const char *path,
         path = pathbuf;
     }
 
-    // XXX: HACK: change waitset on monitor binding temporarily so we get UMP
-    // cap transfer notifications!
-    struct monitor_binding *mb = get_monitor_binding();
-    struct waitset *mon_ws = mb->waitset;
-    mb->change_waitset(mb, &cl->rpc_waitset);
-
     if (capref_is_null(inheritcn_cap) && capref_is_null(argcn_cap)) {
         err = cl->vtbl.spawn_domain(cl, path, argstr, argstrlen,
                                     envstr, envstrlen, flags,
@@ -239,7 +233,6 @@ errval_t spawn_program_with_caps(coreid_t coreid, const char *path,
     }
 
 out:
-    mb->change_waitset(mb, mon_ws);
     return msgerr;
 }
 
index 2663c23..6eacf55 100644 (file)
@@ -256,7 +256,7 @@ static void thread_init(dispatcher_handle_t disp, struct thread *newthread)
 
     newthread->rpc_in_progress = false;
     newthread->async_error = SYS_ERR_OK;
-    newthread->mask_channels = false;
+    newthread->local_trigger = NULL;
 }
 
 /**
@@ -663,6 +663,18 @@ void thread_get_outgoing_token(uint32_t *token)
     }
 }
 
+void thread_set_local_trigger(struct waitset_chanstate *trigger)
+{
+    struct thread *me = thread_self();
+    me->local_trigger = trigger;
+}
+
+struct waitset_chanstate * thread_get_local_trigger(void)
+{
+    struct thread *me = thread_self();
+    return me->local_trigger;
+}
+
 void thread_set_rpc_in_progress(bool v)
 {
     thread_self()->rpc_in_progress = v;
@@ -683,16 +695,6 @@ errval_t thread_get_async_error(void)
     return thread_self()->async_error;
 }
 
-void thread_set_mask_channels(bool m)
-{
-    thread_self()->mask_channels = m;
-}
-
-bool thread_get_mask_channels(void)
-{
-    return thread_self()->mask_channels;
-}
-
 /**
  * \brief Store receive slot provided by rpc in thread state
  */
index 95dcc68..131ed46 100644 (file)
@@ -344,3 +344,8 @@ void ump_init(void)
     mcb->rx_vtbl.bind_ump_reply_client = bind_ump_reply_handler;
     mcb->rx_vtbl.bind_ump_service_request = bind_ump_service_request_handler;
 }
+
+struct waitset_chanstate * monitor_bind_get_receiving_chanstate(struct monitor_binding *b)
+{
+    return b->get_receiving_chanstate(b);
+}
index e6c0840..866f8c0 100644 (file)
@@ -157,35 +157,37 @@ static bool waitset_can_receive(struct waitset_chanstate *chan,
 {
     bool res = false;
 
-    if (!thread->mask_channels || !chan->masked) {
-        if (chan->wait_for) // if a thread is waiting for this specific event
-            res = chan->wait_for == thread;
-        else
-            res = (chan->token & 1 && !thread->token) // incoming token is a request
-                // and a thread is not waiting for a token
-                || (!chan->token && chan != thread->channel) // there's no token
-                // and a thread is not waiting specifically for that event
-                || (chan->token == thread->token && chan == thread->channel);
-                // there is a token and it matches thread's token and event
-    }
+    if (chan->wait_for) // if a thread is waiting for this specific event
+        res = chan->wait_for == thread;
+    else
+        res = (chan->token & 1 && !thread->token) // incoming token is a request
+            // and a thread is not waiting for a token
+            || (!chan->token && chan != thread->channel) // there's no token
+            // and a thread is not waiting specifically for that event
+            || (chan->token == thread->token && chan == thread->channel);
+            // there is a token and it matches thread's token and event
     return res;
 }
 
 /// Returns a channel with a pending event on the given waitset matching
 /// our thread
 static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
-                                    struct waitset_chanstate *chan)
+          struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2)
 {
     struct thread *me = thread_self_disabled();
 
-    if (chan) { // channel that we wait for
-        if (chan->state == CHAN_PENDING && waitset_can_receive(chan, me)) {
-            return chan;
+    if (waitfor) { // channel that we wait for
+        if ((waitfor->state == CHAN_PENDING || waitfor->state == CHAN_WAITING)
+            && waitset_can_receive(waitfor, me)) {
+            return waitfor;
         }
-        if (chan->state == CHAN_WAITING && waitset_can_receive(chan, me)) {
-            return chan;
+        if (waitfor2 && (waitfor2->state == CHAN_PENDING || waitfor2->state == CHAN_WAITING)
+            && waitset_can_receive(waitfor2, me)) {
+            return waitfor2;
         }
+        return NULL;
     }
+    struct waitset_chanstate *chan;
     // check a waiting queue for matching event
     for (chan = ws->waiting; chan; ) {
         if (waitset_can_receive(chan, me)) {
@@ -332,12 +334,14 @@ static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
 
 errval_t get_next_event_disabled(struct waitset *ws,
     struct waitset_chanstate **retchannel, struct event_closure *retclosure,
-    struct waitset_chanstate *waitfor, dispatcher_handle_t handle, bool debug)
+    struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2,
+    dispatcher_handle_t handle, bool debug)
 {
     struct waitset_chanstate * chan;
 
+// debug_printf("%s: %p %p %p %p\n", __func__, __builtin_return_address(0), __builtin_return_address(1), __builtin_return_address(2), __builtin_return_address(3));
     for (;;) {
-        chan = get_pending_event_disabled(ws, waitfor); // get our event
+        chan = get_pending_event_disabled(ws, waitfor, waitfor2); // get our event
         if (chan) {
             *retchannel = chan;
             *retclosure = chan->closure;
@@ -348,6 +352,7 @@ errval_t get_next_event_disabled(struct waitset *ws,
             else
                 waitset_chan_deregister_disabled(chan, handle);
             wake_up_other_thread(handle, ws);
+    // debug_printf("%s.%d: %p\n", __func__, __LINE__, retclosure->handler);
             return SYS_ERR_OK;
         }
         chan = ws->pending; // check a pending queue
@@ -399,7 +404,7 @@ errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
 {
     dispatcher_handle_t handle = disp_disable();
     struct waitset_chanstate *channel;
-    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL,
+    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL, NULL,
                                             handle, false);
     disp_enable(handle);
     return err;
@@ -422,7 +427,7 @@ static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t
     struct waitset_chanstate *chan;
 
     poll_channels_disabled(handle);
-    chan = get_pending_event_disabled(ws, NULL);
+    chan = get_pending_event_disabled(ws, NULL, NULL);
     if (chan != NULL) {
         return SYS_ERR_OK;
     }
@@ -467,8 +472,8 @@ errval_t event_dispatch_debug(struct waitset *ws)
     struct event_closure closure;
     struct waitset_chanstate *channel;
     dispatcher_handle_t handle = disp_disable();
-    errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
-                                            true);
+    errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL,
+                                           handle, true);
     disp_enable(handle);
     if (err_is_fail(err)) {
         return err;
@@ -491,24 +496,26 @@ errval_t event_dispatch_debug(struct waitset *ws)
  */
 
 errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
-                            errval_t *error_var)
+                          errval_t *error_var)
 {
     assert(waitfor->waitset == ws);
     for (;;) {
         struct event_closure closure;
-        struct waitset_chanstate *channel;
+        struct waitset_chanstate *channel, *trigger;
 
+        trigger = thread_get_local_trigger();
         dispatcher_handle_t handle = disp_disable();
         errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
-                                                handle, false);
+            trigger ? trigger: waitfor->trigger, handle, false);
+        if (trigger)
+            thread_set_local_trigger(NULL);
         disp_enable(handle);
         if (err_is_fail(err)) {
             assert(0);
             return err;
         }
-        if (channel == waitfor) {
+        if (channel == waitfor)
             return SYS_ERR_OK;
-        }
         assert(!channel->wait_for);
         assert(closure.handler != NULL);
         closure.handler(closure.arg);
@@ -541,7 +548,7 @@ errval_t event_dispatch_non_block(struct waitset *ws)
         disp_enable(handle);
         return err;
     }
-    err = get_next_event_disabled(ws, &channel, &closure, NULL, handle,
+    err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL, handle,
                                             false);
     if (err_is_fail(err))
         return err;
@@ -576,7 +583,7 @@ void waitset_chanstate_init(struct waitset_chanstate *chan,
     chan->persistent = false;
     chan->token = 0;
     chan->wait_for = NULL;
-    chan->masked = false;
+    chan->trigger = NULL;
 }
 
 /**
index e75f31a..fbee525 100644 (file)
@@ -279,7 +279,6 @@ binding_struct_init :: String -> String -> C.Expr -> C.Expr ->  C.Expr -> [C.Stm
 binding_struct_init drv ifn binding_var waitset_ex tx_vtbl_ex = [
     C.Ex $ C.Assignment (C.FieldOf binding_var "st") (C.Variable "NULL"),
     C.Ex $ C.Assignment (C.FieldOf binding_var "waitset") waitset_ex,
-    C.Ex $ C.Assignment (C.FieldOf binding_var "send_waitset") (C.Variable "NULL"),
     C.Ex $ C.Call "event_mutex_init" [C.AddressOf $ C.FieldOf binding_var "mutex", waitset_ex],
     C.Ex $ C.Call "thread_mutex_init" [C.AddressOf $ C.FieldOf binding_var "rxtx_mutex"],
     C.Ex $ C.Call "thread_mutex_init" [C.AddressOf $ C.FieldOf binding_var "send_mutex"],
@@ -382,7 +381,7 @@ register_txcont cont_ex = [
     C.If (C.Binary C.NotEquals (cont_ex `C.FieldOf` "handler") (C.Variable "NULL"))
         [localvar (C.TypeName "errval_t") "_err" Nothing,
          C.Ex $ C.Assignment errvar $ C.Call "flounder_support_register"
-            [C.Variable "send_waitset",
+            [C.DerefField bindvar "waitset",
              C.AddressOf $ bindvar `C.DerefField` "tx_cont_chanstate",
              cont_ex,
              C.Variable "false"],
@@ -404,10 +403,8 @@ block_sending :: C.Expr -> [C.Stmt]
 block_sending cont_ex = [
     C.If (C.Binary C.Equals (cont_ex `C.FieldOf` "handler") (C.Variable "blocking_cont"))
         [C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [
-            C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "true"],
             C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel"
-                [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error],
-            C.Ex $ C.Call "thread_set_mask_channels" [C.Variable "false"]
+                [C.DerefField bindvar "waitset", tx_cont_chanstate, C.AddressOf binding_error]
             ] [
             C.Ex $ C.Call "flounder_support_deregister_chan" [tx_cont_chanstate]
             ]
index d9abcfd..a96b540 100644 (file)
@@ -156,6 +156,7 @@ connect_handler :: String -> MessageDef -> C.Stmt
 connect_handler n msg@(Message _ mn _ _) = C.StmtList [
     C.Ex $ C.Call "flounder_support_waitset_chanstate_init_persistent" [message_chanstate],
     C.Ex $ C.Assignment errvar $ C.Call "flounder_support_register" [waitset, message_chanstate, closure, C.Variable "false"],
+    C.Ex $ C.Assignment (C.DerefField message_chanstate "trigger") $ C.CallInd (bindvar `C.DerefField` "get_receiving_chanstate") [bindvar],
     C.Ex $ C.Call "assert" [C.Call "err_is_ok" [errvar]]
     ]
     where
index d6bfa68..4ba01c3 100644 (file)
@@ -363,10 +363,6 @@ binding_struct n ml = C.StructDecl (intf_bind_type n) fields
         C.ParamComment "Message channels",
         C.Param (C.Array (toInteger ((length ml) + 3)) (C.Struct "waitset_chanstate")) "message_chanstate",
 
-        C.ParamComment "Waitset used for send continuations",
-        C.Param (C.Ptr $ C.Struct "waitset") "send_waitset",
-        C.ParamBlank,
-
         C.ParamComment "Private state belonging to the binding implementation",
         C.Param (C.Union $ binding_arg_union_type TX n) "tx_union",
         C.Param (C.Union $ binding_arg_union_type RX n) "rx_union",
index 3caeb0c..6ddd4fa 100644 (file)
@@ -226,6 +226,7 @@ lmp_init_fn :: String -> C.Unit
 lmp_init_fn ifn = C.FunctionDef C.NoScope C.Void (lmp_init_fn_name ifn) params [
     C.StmtList common_init,
     C.Ex $ C.Call "lmp_chan_init" [C.AddressOf $ C.DerefField lmp_bind_var "chan"],
+    C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf (C.DerefField lmp_bind_var "chan") "send_waitset"),
     C.Ex $ C.Assignment (common_field "change_waitset") (C.Variable $ change_waitset_fn_name ifn),
     C.Ex $ C.Assignment (common_field "control") (C.Variable $ control_fn_name ifn),
     C.Ex $ C.Assignment (common_field "receive_next") (C.Variable $ receive_next_fn_name ifn),
@@ -278,6 +279,8 @@ lmp_bind_cont_fn ifn =
     C.FunctionDef C.Static C.Void (lmp_bind_cont_fn_name ifn) params [
         localvar (C.Ptr $ C.Struct $ lmp_bind_type ifn)
             lmp_bind_var_name (Just $ C.Variable "st"),
+        localvar (C.Ptr $ C.Struct $ intf_bind_type ifn)
+             intf_bind_var (Just $ C.AddressOf $ lmp_bind_var `C.DerefField` "b"),
         C.SBlank,
 
         C.If (C.Call "err_is_ok" [errvar])
@@ -301,7 +304,8 @@ lmp_bind_cont_fn ifn =
                 [C.Ex $ C.Assignment errvar $
                         C.Call "err_push"
                                 [errvar, C.Variable "LIB_ERR_CHAN_REGISTER_RECV"],
-                 C.Goto "fail"] []]
+                 C.Goto "fail"] [],
+             C.Ex $ C.Call (connect_handlers_fn_name ifn) [C.Variable intf_bind_var]]
             [C.Label "fail",
              C.Ex $ C.Call (lmp_destroy_fn_name ifn) [lmp_bind_var]],
         C.SBlank,
@@ -594,7 +598,6 @@ tx_fn ifn typedefs msg@(Message mtype n args _) =
             C.StmtList [ tx_fn_arg_check_size ifn typedefs n a | a <- args ],
             C.SComment "check that we can accept an outgoing message",
             C.Ex $ C.Call "thread_mutex_lock" [C.AddressOf $ C.DerefField bindvar "send_mutex"],
-            localvar (C.Ptr $ C.Struct "waitset") "send_waitset" (Just $ C.DerefField bindvar "waitset"),
             C.Ex $ C.Assignment binding_error (C.Variable "SYS_ERR_OK"),
             C.If (C.Binary C.NotEquals tx_msgnum_field (C.NumConstant 0))
                 [C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "send_mutex"],
index 451c8aa..5945e6e 100644 (file)
@@ -728,7 +728,6 @@ tx_fn ifn typedefs msg@(Message _ n args _) =
             C.If (C.Binary C.NotEquals tx_msgnum_field (C.NumConstant 0))
             [C.Return $ C.Variable "FLOUNDER_ERR_TX_BUSY"] [],
             C.SBlank,
-            localvar (C.Ptr $ C.Struct "waitset") "send_waitset" (Just $ C.DerefField bindvar "waitset"),
             C.SComment "register send continuation",
             C.StmtList $ register_txcont (C.Variable intf_cont_var),
             C.SBlank,
index 5147f75..eceeccc 100644 (file)
@@ -101,9 +101,7 @@ rpc_binding_struct name = C.StructDecl (rpc_bind_type name) fields
   where
     fields = [
         C.Param (C.Ptr $ C.Struct $ intf_bind_type name) "b",
-        C.Param (C.Struct $ rpc_vtbl_type name) "vtbl",
-        C.Param (C.Struct "waitset") "rpc_waitset",
-        C.Param (C.Struct "waitset_chanstate") "dummy_chanstate"]
+        C.Param (C.Struct $ rpc_vtbl_type name) "vtbl"]
 
 rpc_init_fn_proto :: String -> C.Unit
 rpc_init_fn_proto n =
@@ -185,7 +183,7 @@ rpc_fn ifn typedefs msg@(RPC n args _) =
         rpcvar = C.Variable rpc_bind_var
         rpc_progress_var = C.Call "thread_get_rpc_in_progress" []
         async_err_var = C.Call "thread_get_async_error" []
-        waitset_var = C.AddressOf $ C.DerefField rpcvar "rpc_waitset"
+        waitset_var = C.DerefField bindvar "waitset"
         bindvar = C.DerefField rpcvar "b"
         tx_func = C.DerefField bindvar "tx_vtbl" `C.FieldOf` (rpc_call_name n)
         tx_func_args = [bindvar, C.Variable "BLOCKING_CONT"] ++ (map C.Variable $ concat $ map mkargs txargs)
@@ -245,39 +243,22 @@ rpc_error_fn ifn = C.FunctionDef C.Static C.Void (rpc_error_fn_name ifn)
      C.If (C.Call "thread_get_rpc_in_progress" [])
         [C.Ex $ C.Call "assert" [C.Call "err_is_fail" [errvar]],
          C.Ex $ C.Call "thread_set_async_error" [errvar],
-         C.SComment "kick waitset with dummy event",
-         C.Ex $ C.Call "flounder_support_register"
-                    [waitset_addr, chanstate_addr,
-                     C.Variable "dummy_event_closure", C.Variable "true"]]
+         C.SComment "kick waitset with dummy event"]
         [C.Ex $ C.Call "USER_PANIC_ERR" [errvar, C.StringConstant "async error in RPC"]]
     ]
     where
         rpcvar = C.Variable rpc_bind_var
-        waitset_addr = C.AddressOf $ C.DerefField rpcvar "rpc_waitset"
-        chanstate_addr = C.AddressOf $ C.DerefField rpcvar "dummy_chanstate"
 
 rpc_init_fn :: String -> [MessageDef] -> C.Unit
 rpc_init_fn ifn ml = C.FunctionDef C.NoScope (C.TypeName "errval_t")
                             (rpc_init_fn_name ifn) (rpc_init_fn_params ifn) $
-    [localvar (C.TypeName "errval_t") errvar_name Nothing,
+    [
      C.SBlank,
      C.SComment "Setup state of RPC client object",
      C.Ex $ C.Assignment (C.DerefField rpcvar "b") bindvar,
-     C.Ex $ C.Call "waitset_init" [waitset_addr],
-     C.Ex $ C.Call "flounder_support_waitset_chanstate_init"
-                        [C.AddressOf $ C.DerefField rpcvar "dummy_chanstate"],
      C.Ex $ C.Assignment (C.DerefField rpcvar "vtbl") (C.Variable $ rpc_vtbl_name ifn),
      C.Ex $ C.Assignment (C.DerefField bindvar "st") rpcvar,
      C.SBlank,
-     C.SComment "Change waitset on binding",
-     C.Ex $ C.Assignment errvar $
-        C.CallInd (C.DerefField bindvar "change_waitset")
-                [bindvar, waitset_addr],
-     C.If (C.Call "err_is_fail" [errvar])
-        [C.Ex $ C.Call "waitset_destroy" [waitset_addr],
-         C.Return $ C.Call "err_push" [errvar, C.Variable "FLOUNDER_ERR_CHANGE_WAITSET"]]
-        [],
-     C.SBlank,
      C.SComment "Set RX handlers on binding object for RPCs",
      C.StmtList [C.Ex $ C.Assignment (C.FieldOf (C.DerefField bindvar "rx_vtbl")
                                         (rpc_resp_name mn))
@@ -292,7 +273,7 @@ rpc_init_fn ifn ml = C.FunctionDef C.NoScope (C.TypeName "errval_t")
     where
         rpcvar = C.Variable "rpc"
         bindvar = C.Variable "binding"
-        waitset_addr = C.AddressOf $ C.DerefField rpcvar "rpc_waitset"
+        waitset_addr = C.AddressOf $ C.DerefField bindvar "waitset"
 
 rpc_init_fn_params n = [C.Param (C.Ptr $ C.Struct (rpc_bind_type n)) "rpc",
                         C.Param (C.Ptr $ C.Struct (intf_bind_type n)) "binding"]
index 6464d79..53759b9 100644 (file)
@@ -1144,13 +1144,6 @@ tx_fn p ifn typedefs msg@(Message mtype n args _) (MsgSpec _ _ caps) =
             C.StmtList [ tx_fn_arg_check_size ifn typedefs n a | a <- args ],
             C.Ex $ C.Call "thread_mutex_lock" [C.AddressOf $ C.DerefField bindvar "send_mutex"],
             C.Ex $ C.Assignment binding_error (C.Variable "SYS_ERR_OK"),
-            localvar (C.Ptr $ C.Struct "waitset") "send_waitset" (Just $ C.DerefField bindvar "waitset"),
-            -- localvar (C.Struct "waitset") "tmp_waitset" Nothing,
-            -- C.If (C.Binary C.Equals ((C.Variable intf_cont_var) `C.FieldOf` "handler") (C.Variable "blocking_cont")) [
-            --     C.Ex $ C.Assignment (C.Variable "send_waitset") (C.AddressOf $ C.Variable "tmp_waitset"),
-            --     C.Ex $ C.Call "waitset_init" [C.Variable "send_waitset"]
-            -- ] [],
-
             C.SComment "check that we can accept an outgoing message",
             C.If (C.Binary C.NotEquals tx_msgnum_field (C.NumConstant 0))
                 [C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "send_mutex"],
@@ -1211,7 +1204,8 @@ block_sending_with_caps p ifn cont_ex = [
 
             C.Ex $ C.Assignment binding_error $ C.Call "flounder_support_change_monitor_waitset" [monitor_binding, C.DerefField bindvar "waitset"],
             C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [
-                C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel" [C.Variable "send_waitset", tx_cont_chanstate, C.AddressOf binding_error]] [],
+                C.Ex $ C.Assignment (C.DerefField tx_cont_chanstate "trigger") $ C.CallInd (bindvar `C.DerefField` "get_receiving_chanstate") [bindvar],
+                C.Ex $ C.Assignment binding_error $ C.Call "wait_for_channel" [C.DerefField bindvar "waitset", tx_cont_chanstate, C.AddressOf binding_error]] [],
             C.If (C.Binary C.Equals binding_error (C.Variable "SYS_ERR_OK")) [
                 C.Ex $ C.Assignment binding_error $ C.Call "flounder_support_change_monitor_waitset" [monitor_binding, C.Variable "ws"]] []
             ] [
@@ -1386,7 +1380,7 @@ rx_handler p ifn typedefs msgdefs msgs =
         rx_msgfrag_field = C.DerefField bindvar "rx_msg_fragment"
 
         call_cases = [C.Case (C.Variable $ msg_enum_elem_name ifn mn) (call_msgnum_case msgdef msg)
-                            | (msgdef, msg@(MsgSpec mn _ _)) <- zip msgdefs msgs]
+                            | (msgdef, msg@(MsgSpec mn _ caps)) <- zip msgdefs msgs, caps == []]
 
         call_msgnum_case msgdef@(Message mtype mn msgargs _) (MsgSpec _ frags caps) =
             [C.StmtList $ call_handler (ump_drv p) ifn typedefs mtype mn msgargs, C.Break]
@@ -1493,15 +1487,12 @@ rx_handler p ifn typedefs msgdefs msgs =
             | caps == [] = call_callback
             | otherwise = [
                 rx_fragment_increment,
-                C.If (C.Binary C.Equals
-                                    (capst `C.FieldOf` "rx_capnum")
-                                    (C.NumConstant $ toInteger $ length caps))
-                    call_callback
-                    [C.SComment "don't process anything else until we're done",
-                     C.Goto "out_no_reregister"]]
+                C.Ex $ C.Assignment (C.DerefField message_chanstate "trigger") $ C.Call "monitor_bind_get_receiving_chanstate" [ump_chan `C.DerefField` "monitor_binding"],
+                C.Goto "out_no_reregister"]
              where
                 call_callback = [C.StmtList $ finished_recv_nocall (ump_drv p) ifn typedefs mtype mn msgargs, C.Goto "out"]
                 ump_chan = C.AddressOf $ statevar `C.FieldOf` "chan"
+                message_chanstate = C.Binary C.Plus (C.DerefField bindvar "message_chanstate") (C.Variable $ msg_enum_elem_name ifn mn)
 
         rx_fragment_increment
             = C.Ex $ C.PostInc $ C.DerefField bindvar "rx_msg_fragment"
@@ -1548,7 +1539,7 @@ cap_rx_handler p ifn typedefs msgdefs msgspecs
                  | (MsgSpec mn frags caps, msgdef) <- zip msgspecs msgdefs, caps /= []]
         rx_msgnum_field = C.DerefField bindvar "rx_msgnum"
         call_cases = [C.Case (C.Variable $ msg_enum_elem_name ifn mn) (call_msgnum_case msgdef msg)
-                            | (msgdef, msg@(MsgSpec mn _ _)) <- zip msgdefs msgspecs]
+                            | (msgdef, msg@(MsgSpec mn _ caps)) <- zip msgdefs msgspecs, caps /= []]
 
         call_msgnum_case msgdef@(Message mtype mn msgargs _) (MsgSpec _ frags caps) =
             [C.StmtList $ call_handler (ump_drv p) ifn typedefs mtype mn msgargs, C.Break]
@@ -1575,6 +1566,7 @@ cap_rx_handler_case p ifn typedefs mn (Message mtype _ msgargs _) nfrags caps =
                 C.If (C.Binary C.Equals rx_msgfrag_field (C.NumConstant $ toInteger nfrags))
                     [
                         C.StmtList $ finished_recv_nocall (ump_drv p) ifn typedefs mtype mn msgargs,
+                        C.Ex $ C.Assignment (C.DerefField message_chanstate "trigger") $ C.CallInd (bindvar `C.DerefField` "get_receiving_chanstate") [bindvar],
                         C.If (C.Unary C.Not (C.Variable "no_register"))
                             [C.StmtList $ register_recv p ifn] [],
                         C.SBlank
@@ -1586,6 +1578,7 @@ cap_rx_handler_case p ifn typedefs mn (Message mtype _ msgargs _) nfrags caps =
                 is_last = (ncap + 1 == length caps)
                 statevar = C.DerefField my_bindvar "ump_state"
                 ump_chan = C.AddressOf $ statevar `C.FieldOf` "chan"
+                message_chanstate = C.Binary C.Plus (C.DerefField bindvar "message_chanstate") (C.Variable $ msg_enum_elem_name ifn mn)
 
 -- generate the code to register for receive notification
 register_recv :: UMPParams -> String -> [C.Stmt]
index 21e8ea3..48186f4 100644 (file)
@@ -41,7 +41,7 @@
 struct peer_core {
     coreid_t id;
     bool is_bound;
-    struct mem_thc_client_binding_t cl; 
+    struct mem_thc_client_binding_t cl;
     thc_lock_t lock;
 };
 
@@ -49,7 +49,7 @@ coreid_t mycore;
 static struct peer_core *peer_cores;
 static int num_peers;
 
-// FIXME: possible race if handling two concurrent alloc request that both 
+// FIXME: possible race if handling two concurrent alloc request that both
 // try to connect to the same peer
 static errval_t connect_peer(struct peer_core *peer)
 {
@@ -83,10 +83,10 @@ static errval_t connect_peer(struct peer_core *peer)
     return SYS_ERR_OK;
 }
 
-static errval_t steal_from_serv(struct peer_core *peer, 
-                                struct capref *ret_cap, 
-                                uint8_t bits, 
-                                genpaddr_t minbase, 
+static errval_t steal_from_serv(struct peer_core *peer,
+                                struct capref *ret_cap,
+                                uint8_t bits,
+                                genpaddr_t minbase,
                                 genpaddr_t maxlimit)
 {
     assert(peer != NULL);
@@ -103,10 +103,10 @@ static errval_t steal_from_serv(struct peer_core *peer,
         peer->is_bound = true;
     }
 
-    // due to the single-waiter rule of thc we need to make sure we only 
+    // due to the single-waiter rule of thc we need to make sure we only
     // ever have one of these rpcs outstanding at a time.
     thc_lock_acquire(&peer->lock);
-    peer->cl.call_seq.steal(&peer->cl, bits, minbase, maxlimit, 
+    peer->cl.call_seq.steal(&peer->cl, bits, minbase, maxlimit,
                                 &err, ret_cap);
     thc_lock_release(&peer->lock);
 
@@ -146,12 +146,12 @@ static errval_t rr_steal(struct capref *ret_cap, uint8_t bits,
     }
     */
 
-    return err; 
+    return err;
 }
 
 
-static errval_t steal_and_alloc(struct capref *ret_cap, uint8_t steal_bits, 
-                                uint8_t alloc_bits, 
+static errval_t steal_and_alloc(struct capref *ret_cap, uint8_t steal_bits,
+                                uint8_t alloc_bits,
                                 genpaddr_t minbase, genpaddr_t maxlimit)
 {
     errval_t err;
@@ -170,16 +170,6 @@ static errval_t steal_and_alloc(struct capref *ret_cap, uint8_t steal_bits,
         return err;
     }
 
-    // XXX: Hack to allow monitor to allocate memory while we call RPC into it
-    // These calls should just be avoided...
-    struct monitor_blocking_rpc_client *mc = get_monitor_blocking_rpc_client();
-    assert(mc != NULL);
-    struct waitset *oldws = monitor_mem_binding->waitset;
-    err = monitor_mem_binding->change_waitset(monitor_mem_binding, &mc->rpc_waitset);
-    if(err_is_fail(err)) {
-        USER_PANIC_ERR(err, "change_waitset");
-    }
-
     // XXX: Mark as local to this core, until we have x-core cap management
     err = monitor_cap_set_remote(ramcap, false);
     if(err_is_fail(err)) {
@@ -191,17 +181,10 @@ static errval_t steal_and_alloc(struct capref *ret_cap, uint8_t steal_bits,
     if (err_is_fail(err)) {
         return err_push(err, MON_ERR_CAP_IDENTIFY);
     }
-
-    // XXX: Reset waitset before THC becomes active again
-    err = monitor_mem_binding->change_waitset(monitor_mem_binding, oldws);
-    if(err_is_fail(err)) {
-        USER_PANIC_ERR(err, "change_waitset");
-    }
-
 #if 0
     debug_printf("STOLEN cap is type %d Ram base 0x%"PRIxGENPADDR
                  " (%"PRIuGENPADDR") Bits %d\n",
-                 info.type, info.u.ram.base, info.u.ram.base, 
+                 info.type, info.u.ram.base, info.u.ram.base,
                  info.u.ram.bits);
 #endif
     if(steal_bits != log2ceil(info.u.ram.bytes)) {
@@ -228,10 +211,10 @@ static errval_t steal_and_alloc(struct capref *ret_cap, uint8_t steal_bits,
 
     mem_avail += mem_to_add;
 
-    err = percore_alloc(ret_cap, alloc_bits, minbase, maxlimit);    
+    err = percore_alloc(ret_cap, alloc_bits, minbase, maxlimit);
 
     return err;
-} 
+}
 
 
 void try_steal(errval_t *ret, struct capref *cap, uint8_t bits,
@@ -245,7 +228,7 @@ void try_steal(errval_t *ret, struct capref *cap, uint8_t bits,
                                                                        __builtin_return_address(3),
                                                                        __builtin_return_address(4),
                                                                        __builtin_return_address(5));
-    //DEBUG_ERR(*ret, "allocation of %d bits in 0x%" PRIxGENPADDR 
+    //DEBUG_ERR(*ret, "allocation of %d bits in 0x%" PRIxGENPADDR
     //           "-0x%" PRIxGENPADDR " failed", bits, minbase, maxlimit);
     *ret = steal_and_alloc(cap, bits+1, bits, minbase, maxlimit);
     if (err_is_fail(*ret)) {
@@ -257,9 +240,9 @@ void try_steal(errval_t *ret, struct capref *cap, uint8_t bits,
 //     *cap = NULL_CAP;
 }
 
-errval_t init_peers(coreid_t core, int len_cores, coreid_t *cores) 
+errval_t init_peers(coreid_t core, int len_cores, coreid_t *cores)
 {
-    // initialise info about our peers 
+    // initialise info about our peers
     mycore = core;
     num_peers = len_cores;
     peer_cores = malloc(num_peers * sizeof(struct peer_core));
@@ -276,7 +259,7 @@ errval_t init_peers(coreid_t core, int len_cores, coreid_t *cores)
 }
 
 errval_t percore_steal_handler_common(uint8_t bits,
-                                      genpaddr_t minbase, 
+                                      genpaddr_t minbase,
                                       genpaddr_t maxlimit,
                                       struct capref *retcap)
 {
@@ -286,7 +269,7 @@ errval_t percore_steal_handler_common(uint8_t bits,
     trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_ALLOC, bits);
     /* debug_printf("%d: percore steal request: bits: %d\n", disp_get_core_id(), bits); */
 
-    // refill slot allocator if needed 
+    // refill slot allocator if needed
     err = slot_prealloc_refill(mm_percore.slot_alloc_inst);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "Warning: failure in slot_prealloc_refill\n");
@@ -294,17 +277,17 @@ errval_t percore_steal_handler_common(uint8_t bits,
         return err;
     }
 
-    // refill slab allocator if needed 
+    // refill slab allocator if needed
     err = slab_refill(&mm_percore.slabs);
     if (err_is_fail(err)) {
         DEBUG_ERR(err, "Warning: failure when refilling mm_percore slab\n");
     }
 
-    // get actual ram cap 
+    // get actual ram cap
     ret = percore_alloc(&cap, bits, minbase, maxlimit);
     if (err_is_fail(ret)){
         // debug_printf("percore steal request failed\n");
-        //DEBUG_ERR(ret, "allocation of stolen %d bits in 0x%" PRIxGENPADDR 
+        //DEBUG_ERR(ret, "allocation of stolen %d bits in 0x%" PRIxGENPADDR
         //          "-0x%" PRIxGENPADDR " failed", bits, minbase, maxlimit);
         cap = NULL_CAP;
     }