ump,flounder: moving a flow control from a flounder level to a ump level
authorAdam Turowski <adam.turowski@inf.ethz.ch>
Tue, 3 Jan 2017 12:23:07 +0000 (13:23 +0100)
committerAdam Turowski <adam.turowski@inf.ethz.ch>
Wed, 4 Jan 2017 09:03:12 +0000 (10:03 +0100)
Signed-off-by: Adam Turowski <adam.turowski@inf.ethz.ch>

12 files changed:
include/barrelfish/dispatcher.h
include/barrelfish/ump_chan.h
include/barrelfish/ump_impl.h
include/flounder/flounder_support_ump.h
lib/barrelfish/dispatch.c
lib/barrelfish/flounder_support.c
lib/barrelfish/include/threads_priv.h
lib/barrelfish/ump_chan.c
tools/flounder/BackendCommon.hs
tools/flounder/UMPCommon.hs
usr/bench/ump_bench/latency.c
usr/bench/ump_bench/main.c

index c345c90..845b0e6 100644 (file)
@@ -21,6 +21,7 @@
 #include <barrelfish/threads.h>
 
 struct lmp_chan;
+struct ump_chan;
 struct deferred_event;
 
 // Architecture generic user only dispatcher struct
@@ -45,6 +46,7 @@ struct dispatcher_generic {
 
     /// List of LMP channels waiting to retry a send
     struct lmp_chan *lmp_send_events_list;
+    struct ump_chan *ump_send_events_list;
 
     /// LMP endpoint heap state
     struct heap lmp_endpoint_heap;
index f1a143c..ab4aa66 100644 (file)
@@ -44,7 +44,9 @@ struct ump_chan {
 
     struct ump_chan_state send_chan;       ///< Outgoing UMP channel state
     struct ump_endpoint endpoint;          ///< Incoming UMP endpoint
-
+    struct waitset_chanstate send_waitset;
+    struct ump_chan *next, *prev;
+    
     /// connection state
     enum {UMP_DISCONNECTED,     ///< Disconnected
           UMP_BIND_WAIT,        ///< Waiting for bind reply
@@ -80,6 +82,9 @@ errval_t ump_chan_bind(struct ump_chan *uc, struct ump_bind_continuation cont,
                        struct capref notify_cap);
 errval_t ump_chan_accept(struct ump_chan *uc, uintptr_t mon_id,
                          struct capref frame, size_t inchanlen, size_t outchanlen);
+errval_t ump_chan_register_send(struct ump_chan *uc, struct waitset *ws,
+                                struct event_closure closure);
+void ump_channels_retry_send_disabled(dispatcher_handle_t handle);
 void ump_chan_send_bind_reply(struct monitor_binding *mb,
                               struct ump_chan *uc, errval_t err,
                               uintptr_t monitor_id, struct capref notify_cap);
@@ -134,6 +139,17 @@ static inline volatile struct ump_message *ump_chan_get_next(
     return ump_impl_get_next(&uc->send_chan, ctrl);
 }
 
+static inline bool ump_chan_can_send(struct ump_chan *uc)
+{
+    assert(uc != NULL);
+    return ump_impl_can_send(&uc->send_chan);
+}
+
+static inline void ump_chan_free_message(volatile struct ump_message *msg)
+{
+    ump_impl_free_message(msg);
+}
+
 /**
  * \brief Migrate an event registration made with
  * ump_chan_register_recv() to a new waitset
index bf7cd3b..da77994 100644 (file)
@@ -54,11 +54,11 @@ __BEGIN_DECLS
 
 // control word is 32-bit, because it must be possible to atomically write it
 typedef uint32_t ump_control_t;
-#define UMP_EPOCH_BITS  1
+#define UMP_USED_BITS  1
 #define UMP_HEADER_BITS 31
 
 struct ump_control {
-    ump_control_t epoch:UMP_EPOCH_BITS;
+    ump_control_t used:UMP_USED_BITS;
     ump_control_t header:UMP_HEADER_BITS;
     ump_control_t token:32;
 };
@@ -93,7 +93,6 @@ struct ump_chan_state {
     volatile struct ump_message *buf;  ///< Ring buffer
     ump_index_t        pos;            ///< Current position
     ump_index_t        bufmsgs;        ///< Buffer size in messages
-    bool               epoch;          ///< Next Message epoch
     enum ump_direction dir;            ///< Channel direction
 };
 
@@ -128,7 +127,6 @@ static inline errval_t ump_chan_state_init(struct ump_chan_state *c,
     c->buf = (volatile struct ump_message *) buf;
     c->dir = dir;
     c->bufmsgs = size / UMP_MSG_BYTES;
-    c->epoch = 1;
 
     if(dir == UMP_INCOMING) {
         ump_index_t i;
@@ -150,10 +148,9 @@ static inline errval_t ump_chan_state_init(struct ump_chan_state *c,
 static inline volatile struct ump_message *ump_impl_poll(struct ump_chan_state *c)
 {
     assert(c->dir == UMP_INCOMING);
-    ump_control_t ctrl_epoch =  c->buf[c->pos].header.control.epoch;
-    if (ctrl_epoch == c->epoch) {
+    ump_control_t ctrl_used =  c->buf[c->pos].header.control.used;
+    if (ctrl_used)
         return &c->buf[c->pos];
-    }
     return NULL;
 }
 
@@ -170,10 +167,8 @@ static inline volatile struct ump_message *ump_impl_recv(struct ump_chan_state *
     volatile struct ump_message *msg = ump_impl_poll(c);
 
     if (msg != NULL) {
-        if (++c->pos == c->bufmsgs) {
+        if (++c->pos == c->bufmsgs)
             c->pos = 0;
-            c->epoch = !c->epoch;
-        }
         return msg;
     }
     return NULL;
@@ -194,7 +189,7 @@ static inline volatile struct ump_message *ump_impl_get_next(
     assert(c->dir == UMP_OUTGOING);
 
     // construct header
-    ctrl->epoch = c->epoch;
+    ctrl->used = 1;
     ctrl->token = 0;
 
 #ifdef __x86_64__
@@ -211,16 +206,26 @@ static inline volatile struct ump_message *ump_impl_get_next(
 #endif
 
     volatile struct ump_message *msg = &c->buf[c->pos];
-
+    if (msg->header.control.used)
+        return NULL;
     // update pos
-    if (++c->pos == c->bufmsgs) {
+    if (++c->pos == c->bufmsgs)
         c->pos = 0;
-        c->epoch = !c->epoch;
-    }
-
     return msg;
 }
 
+static inline volatile bool ump_impl_can_send(struct ump_chan_state *c)
+{
+    assert(c->dir == UMP_OUTGOING);
+    volatile struct ump_message *msg = &c->buf[c->pos];
+    return !msg->header.control.used;
+}
+
+static inline void ump_impl_free_message(volatile struct ump_message *msg)
+{
+    msg->header.control.used = 0;
+}
+
 __END_DECLS
 
 #endif // UMP_IMPL_H
index 383b856..9a7cc64 100644 (file)
@@ -28,20 +28,14 @@ __BEGIN_DECLS
 
 /// Special message types
 enum flounder_ump_msgtype {
-    FL_UMP_ACK = 0,
-    FL_UMP_BIND = 1,
-    FL_UMP_BIND_REPLY = 2,
+    FL_UMP_BIND = 0,
+    FL_UMP_BIND_REPLY = 1,
     FL_UMP_CAP_ACK = (1 << FL_UMP_MSGTYPE_BITS) - 1,
 };
 
 struct flounder_ump_state {
     struct ump_chan chan;
 
-    ump_index_t next_id;   ///< Sequence number of next message to be sent
-    ump_index_t seq_id;    ///< Last sequence number received from remote
-    ump_index_t ack_id;    ///< Last sequence number acknowledged by remote
-    ump_index_t last_ack;  ///< Last acknowledgement we sent to remote
-
     struct flounder_cap_state capst; ///< State for indirect cap tx/rx machinery
     uint32_t token;        ///< Outgoing message's token
 };
@@ -65,20 +59,6 @@ errval_t flounder_stub_ump_recv_buf(volatile struct ump_message *msg,
                                     size_t maxsize);
 
 
-/// Computes (from seq/ack numbers) whether we can currently send a non-ack
-/// on the channel
-static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) {
-    bool r = (ump_index_t)(s->next_id - s->ack_id) < s->chan.max_send_msgs;
-    return r;
-}
-
-/// Computes (from seq/ack numbers) whether we can currently send an ack
-/// on the channel
-static inline bool flounder_stub_ump_can_send_ack(struct flounder_ump_state *s) {
-    bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
-    return r;
-}
-
 #define ENABLE_MESSAGE_PASSING_TRACE 1
 /// Prepare a "control" word (header for each UMP message fragment)
 static inline void flounder_stub_ump_control_fill(struct flounder_ump_state *s,
@@ -87,15 +67,12 @@ static inline void flounder_stub_ump_control_fill(struct flounder_ump_state *s,
 {
 #if ENABLE_MESSAGE_PASSING_TRACE
     trace_event_raw((((uint64_t)0xEA)<<56) |
-                    ((uint64_t)s->chan.sendid << 12) |
-                    (s->next_id & 0xffff));
+                    ((uint64_t)s->chan.sendid << 12));
 #endif // ENABLE_MESSAGE_PASSING_TRACE
     assert(s->chan.sendid != 0);
     assert(msgtype < (1 << FL_UMP_MSGTYPE_BITS)); // check for overflow
-    ctrl->header = ((uintptr_t)msgtype << UMP_INDEX_BITS) | (uintptr_t)s->seq_id;
+    ctrl->header = ((uintptr_t)msgtype << UMP_INDEX_BITS);
     ctrl->token = s->token;
-    s->last_ack = s->seq_id;
-    s->next_id++;
 }
 
 /// Process a "control" word
@@ -104,11 +81,8 @@ static inline int flounder_stub_ump_control_process(struct flounder_ump_state *s
 {
 #if ENABLE_MESSAGE_PASSING_TRACE
     trace_event_raw( (((uint64_t)0xEB)<<56) |
-                     ((uint64_t)s->chan.recvid << 12) |
-                     (s->seq_id & 0xffff));
+                     ((uint64_t)s->chan.recvid << 12));
 #endif // ENABLE_MESSAGE_PASSING_TRACE
-    s->ack_id = ctrl.header & UMP_INDEX_MASK;
-    s->seq_id++;
     return ctrl.header >> UMP_INDEX_BITS;
 }
 
@@ -125,30 +99,12 @@ static inline void flounder_stub_ump_barrier(void)
 #endif
 }
 
-/// Should we send an ACK?
-static inline bool flounder_stub_ump_needs_ack(struct flounder_ump_state *s)
-{
-    // send a forced ACK if the channel is full
-    // FIXME: should probably send it only when "nearly" full
-    return (ump_index_t)(s->seq_id - s->last_ack) >=
-        (ump_index_t)(s->chan.max_recv_msgs - 1);
-}
-/// Send an explicit ACK
-static inline void flounder_stub_ump_send_ack(struct flounder_ump_state *s)
-{
-    assert(flounder_stub_ump_can_send_ack(s));
-    struct ump_control ctrl;
-    volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
-    flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_ACK);
-    msg->header.control = ctrl;
-}
-
 /// Send a cap ACK (message that we are ready to receive caps)
 static inline void flounder_stub_ump_send_cap_ack(struct flounder_ump_state *s)
 {
-    assert(flounder_stub_ump_can_send(s));
     struct ump_control ctrl;
     volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
+    assert(msg);
     flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_CAP_ACK);
     msg->header.control = ctrl;
 }
index 78ba05e..c36aea3 100644 (file)
@@ -112,6 +112,7 @@ void disp_run(dispatcher_handle_t handle)
 #endif // CONFIG_INTERCONNECT_DRIVER_LMP
     // Check polled channels
     poll_channels_disabled(handle);
+    ump_channels_retry_send_disabled(handle);
 
     // Run, saving state of previous thread if required
     thread_run_disabled(handle);
index 06c3a6e..46c8acb 100644 (file)
@@ -388,10 +388,6 @@ errval_t flounder_stub_lmp_recv_string(struct lmp_recv_msg *msg, char *str,
 
 void flounder_stub_ump_state_init(struct flounder_ump_state *s, void *binding)
 {
-    s->next_id = 1;
-    s->seq_id = 0;
-    s->ack_id = 0;
-    s->last_ack = 0;
     s->token = 0;
     flounder_stub_cap_state_init(&s->capst, binding);
 }
@@ -406,11 +402,9 @@ errval_t flounder_stub_ump_send_buf(struct flounder_ump_state *s,
     int msgpos;
 
     do {
-        if (!flounder_stub_ump_can_send(s)) {
-            return FLOUNDER_ERR_BUF_SEND_MORE;
-        }
-
         msg = ump_chan_get_next(&s->chan, &ctrl);
+        if (!msg)
+            return FLOUNDER_ERR_BUF_SEND_MORE;
         flounder_stub_ump_control_fill(s, &ctrl, msgnum);
 
         // is this the start of the buffer?
index 905c565..1928353 100644 (file)
@@ -106,6 +106,7 @@ static inline bool havework_disabled(dispatcher_handle_t handle)
             || disp->lmp_send_events_list != NULL
 #endif
             || disp->polled_channels != NULL
+            || disp->ump_send_events_list != NULL
             ;
 }
 
index 131ed46..2c87a0e 100644 (file)
@@ -17,6 +17,9 @@
 #include <barrelfish/ump_chan.h>
 #include <barrelfish/idc_export.h>
 #include <if/monitor_defs.h>
+#include <barrelfish/waitset.h>
+#include <barrelfish/waitset_chan.h>
+#include "waitset_chan_priv.h"
 
 #define UMP_MAP_ATTR VREGION_FLAGS_READ_WRITE
 
@@ -58,7 +61,11 @@ errval_t ump_chan_init(struct ump_chan *uc,
     memset(&uc->cap_handlers, 0, sizeof(uc->cap_handlers));
     uc->iref = 0;
     uc->monitor_binding = get_monitor_binding(); // TODO: expose non-default to caller
-
+    waitset_chanstate_init(&uc->send_waitset, CHANTYPE_OTHER);
+    
+    uc->prev = NULL;
+    uc->next = NULL;
+    
     return SYS_ERR_OK;
 }
 
@@ -337,6 +344,65 @@ errval_t ump_chan_accept(struct ump_chan *uc, uintptr_t mon_id,
     return SYS_ERR_OK;
 }
 
+errval_t ump_chan_register_send(struct ump_chan *uc, struct waitset *ws,
+                                struct event_closure closure)
+{
+    assert(uc != NULL);
+    assert(ws != NULL);
+
+    errval_t err = waitset_chan_register(ws, &uc->send_waitset, closure);
+    assert(err_is_ok(err));
+
+    // enqueue in list of channels with a registered event to retry sending
+    assert(uc->next == NULL && uc->prev == NULL);
+    dispatcher_handle_t handle = disp_disable();
+    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
+    if (dp->ump_send_events_list == NULL) {
+        dp->ump_send_events_list = uc;
+        uc->next = uc->prev = uc;
+    } else {
+        uc->prev = dp->ump_send_events_list->prev;
+        uc->next = dp->ump_send_events_list;
+        uc->prev->next = uc;
+        uc->next->prev = uc;
+    }
+    disp_enable(handle);
+
+    return err;
+}
+
+void ump_channels_retry_send_disabled(dispatcher_handle_t handle)
+{
+    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
+    struct ump_chan *uc, *first = dp->ump_send_events_list, *next;
+    errval_t err;
+
+    for (uc = first; uc != NULL; uc = next) {
+        next = uc->next;
+        assert(next != NULL);
+        bool cs = ump_chan_can_send(uc);
+        if (cs) {
+            if (uc->next == uc) {
+                dp->ump_send_events_list = NULL;
+            } else {
+                uc->prev->next = uc->next;
+                uc->next->prev = uc->prev;
+                if (dp->ump_send_events_list == uc) {
+                    dp->ump_send_events_list = next;
+                    first = next;
+                }
+            }
+            uc->next = uc->prev = NULL;
+            err = waitset_chan_trigger_disabled(&uc->send_waitset, handle);
+            assert_disabled(err_is_ok(err)); // shouldn't fail
+        }
+        if (next == first) {
+            break; // wrapped
+        }
+    }
+}
+
+
 /// Initialise the UMP channel driver
 void ump_init(void)
 {
index fbee525..ddeff37 100644 (file)
@@ -455,8 +455,8 @@ finished_recv drvn ifn typedefs mtype mn msgargs
              C.Ex $ C.CallInd handler (bindvar:args)]
             [C.Ex $ C.Assignment (C.FieldOf message_chanstate "token") binding_incoming_token,
              C.Ex $ C.Call "flounder_support_trigger_chan" [C.AddressOf message_chanstate],
-             C.Ex $ C.Assignment (C.Variable "no_register") (C.NumConstant 1)],
-        C.Ex $ C.Assignment rx_msgnum_field (C.NumConstant 0)]
+             C.Ex $ C.Assignment (C.Variable "no_register") (C.NumConstant 1),
+             C.Ex $ C.Assignment rx_msgnum_field (C.NumConstant 0)]]
     where
         rx_msgnum_field = C.DerefField bindvar "rx_msgnum"
         handler = C.DerefField bindvar "rx_vtbl" `C.FieldOf` mn
index 53759b9..08f7def 100644 (file)
@@ -412,6 +412,7 @@ connect_fn p ifn =
            C.Return $
               C.Call "err_push" [errvar, C.Variable "LIB_ERR_UMP_CHAN_INIT"]]
           [],
+      C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
       C.SBlank,
 
       C.Ex $ C.Assignment (sendvar) (C.DerefField (C.Variable "_frameinfo") "sendbase"),
@@ -472,6 +473,7 @@ accept_fn p ifn =
            C.Return $
               C.Call "err_push" [errvar, C.Variable "LIB_ERR_UMP_CHAN_INIT"]]
           [],
+      C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
       C.SBlank,
 
       C.Ex $ C.Assignment (sendvar) (C.DerefField (C.Variable "_frameinfo") "sendbase"),
@@ -517,11 +519,13 @@ bind_fn p ifn =
         C.Ex $ C.Assignment (common_field "receive_next") (C.Variable $ receive_next_fn_name p ifn),
         C.Ex $ C.Assignment (common_field "get_receiving_chanstate") (C.Variable $ get_receiving_chanstate_fn_name p ifn),
         C.Ex $ C.Assignment (common_field "st") (C.Variable "st"),
+        C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
         C.Ex $ C.Assignment (intf_bind_var `C.FieldOf` "bind_cont") (C.Variable intf_cont_var),
         C.Ex $ C.Assignment (my_bindvar `C.DerefField` "iref") (C.Variable "iref"),
         C.Ex $ C.Assignment (my_bindvar `C.DerefField` "inchanlen") (C.Variable "inchanlen"),
         C.Ex $ C.Assignment (my_bindvar `C.DerefField` "outchanlen") (C.Variable "outchanlen"),
         C.Ex $ C.Assignment (my_bindvar `C.DerefField` "no_cap_transfer") (C.Variable "0"),
+        C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
         C.StmtList $ (ump_binding_extra_fields_init p),
         C.SBlank,
         C.SComment "do we need a new monitor binding?",
@@ -669,6 +673,7 @@ connect_handler_fn p ifn = C.FunctionDef C.NoScope (C.TypeName "errval_t")
     C.Ex $ C.Assignment (common_field "get_receiving_chanstate") (C.Variable $ get_receiving_chanstate_fn_name p ifn),
     C.Ex $ C.Assignment (my_bindvar `C.DerefField` "no_cap_transfer") (C.Variable "0"),
     C.StmtList $ (ump_connect_extra_fields_init p),
+    C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
     C.SBlank,
 
     C.SComment "run user's connect handler",
@@ -887,14 +892,14 @@ tx_bind_msg p ifn =
       localvar (C.Struct "ump_control") "ctrl" Nothing,
       C.SBlank,
 
-      C.SComment "check if we can send another message",
-      C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst])
-          [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
-      C.SBlank,
 
       C.SComment "send the next fragment",
       C.Ex $ C.Assignment ump_token (C.Variable "0"),
       C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+      C.SComment "check if we can send another message",
+      C.If (C.Unary C.Not msgvar)
+          [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
+      C.SBlank,
       C.Ex $ C.Call "flounder_stub_ump_control_fill"
                   [chanst, ctrladdr, C.Variable $ msg_enum_elem_name ifn "__bind"],
 --      C.StmtList
@@ -929,14 +934,12 @@ tx_bind_reply p ifn =
       localvar (C.Struct "ump_control") "ctrl" Nothing,
       C.SBlank,
 
-      C.SComment "check if we can send another message",
-      C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst])
-        [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
-      C.SBlank,
-
       C.SComment "send the next fragment",
       C.Ex $ C.Assignment ump_token (C.Variable "0"),
       C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+      C.SComment "check if we can send another message",
+      C.If (C.Unary C.Not msgvar)
+          [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
       C.Ex $ C.Call "flounder_stub_ump_control_fill"
                   [chanst, ctrladdr, C.Variable $ msg_enum_elem_name ifn "__bind_reply"],
 --      C.StmtList
@@ -973,12 +976,9 @@ tx_handler p ifn msgs =
         C.SBlank,
 
         C.SComment "do we need to (and can we) send a cap ack?",
-        C.If (C.Binary C.And
-                    (capst `C.FieldOf` "tx_cap_ack")
-                    (C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst]))
+        C.If (capst `C.FieldOf` "tx_cap_ack")
             [C.Ex $ C.Call "flounder_stub_ump_send_cap_ack" [C.AddressOf umpst],
-             C.Ex $ C.Assignment (capst `C.FieldOf` "tx_cap_ack") (C.Variable "false"),
-             C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true")] [],
+             C.Ex $ C.Assignment (capst `C.FieldOf` "tx_cap_ack") (C.Variable "false")] [],
         C.SBlank,
 
         C.SComment "Switch on current outgoing message number",
@@ -987,9 +987,16 @@ tx_handler p ifn msgs =
                 report_user_err (C.Variable "FLOUNDER_ERR_INVALID_STATE")],
         C.SBlank,
 
-        C.SComment "Send a notification if necessary",
+        C.SComment "Retry send",
         C.If (C.Variable "tx_notify")
-            (ump_notify p) []
+            [
+            localvar (C.Struct "event_closure") "retry_closure"
+                (Just $ C.StructConstant "event_closure" [
+                ("handler", C.Variable $ tx_handler_name p ifn), ("arg", C.Variable "arg")]),
+            C.Ex $ C.Assignment errvar (C.Call "ump_chan_register_send" [
+                chanaddr, C.DerefField bindvar "waitset", C.Variable "retry_closure"]),
+            C.Ex $ C.Call "assert" [C.Call "err_is_ok" [errvar]]
+            ] []
     ]
     where
         inc_fragnum = C.Ex $ C.PostInc $ C.DerefField bindvar "tx_msg_fragment"
@@ -1047,19 +1054,18 @@ tx_handler p ifn msgs =
 
                 | otherwise = -- more fragments to go
                 [inc_fragnum, C.SComment "fall through to next fragment"]
-
+        statevar = C.DerefField my_bindvar "ump_state"
+        chanaddr = C.AddressOf $ C.FieldOf statevar "chan"
 
 tx_handler_case :: UMPParams -> String -> String -> MsgFragment -> [C.Stmt]
 tx_handler_case p ifn mn (MsgFragment words) = [
-    C.SComment "check if we can send another message",
-    C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [stateaddr])
-        [C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true"),
-         C.Break] [],
-    C.SBlank,
-
     C.SComment "send the next fragment",
     C.Ex $ C.Assignment ump_token binding_outgoing_token,
     C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+    C.SComment "check if we can send another message",
+    C.If (C.Unary C.Not msgvar)
+      [C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true"),
+       C.Break] [],
     C.Ex $ C.Call "flounder_stub_ump_control_fill"
                 [stateaddr, ctrladdr, C.Variable $ msg_enum_elem_name ifn mn],
     C.StmtList
@@ -1265,14 +1271,6 @@ rx_handler p ifn typedefs msgdefs msgs =
         C.Label "out_no_reregister",
         C.Ex $ C.Variable "__attribute__((unused))",
 
-        C.SComment "run our send process, if we need to",
-        C.If tx_is_busy [run_tx] [
-              C.SComment "otherwise send a forced ack if the channel is now full",
-              C.If (C.Call "flounder_stub_ump_needs_ack" [stateaddr])
-                   [C.Ex $ C.Call "flounder_stub_ump_send_ack" [stateaddr],
-                    C.StmtList $ ump_notify p]
-                   []
-             ],
         C.If (C.Variable "call_msgnum") [C.Ex $ C.Assignment rx_msgnum_field (C.NumConstant 0)] [],
         C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "rxtx_mutex"],
         C.Switch (C.Variable "call_msgnum") call_cases [C.Break]
@@ -1293,6 +1291,7 @@ rx_handler p ifn typedefs msgdefs msgs =
                     [C.SComment "real error",
                      report_user_err $ C.Call "err_push"
                                    [errvar, C.Variable "LIB_ERR_UMP_CHAN_RECV"],
+                     C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "rxtx_mutex"],
                      C.ReturnVoid] ]
                 [],
             C.SBlank,
@@ -1304,41 +1303,39 @@ rx_handler p ifn typedefs msgdefs msgs =
                      C.Variable "msg" `C.DerefField` "header" `C.FieldOf` "control"],
             C.SBlank,
 
-            C.SComment "is this a dummy message (ACK)?",
-            C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_ACK"))
-                [C.Goto "loopnext"] [],
-            C.SBlank,
-
             C.SComment "is this a binding message of connect/accept?",
             C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_BIND")) [
-                C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "1")) [
+              C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+                 C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "1")) [
                   C.SComment "Client should not recv bind messages. Ignore.",
-                  C.Goto "loopnext"] [],
+                  C.Continue] [],
               C.SComment "handle bind reply: calling bind callback",
               C.Ex $ C.CallInd (bindvar `C.DerefField` "bind_cont")
                   [bindvar `C.DerefField` "st", errvar, bindvar],
                   C.Ex $ C.Call (tx_bind_reply_fn_name p ifn) [my_bindvar],
-              C.Goto "loopnext"] [],
+              C.Continue] [],
             C.SBlank,
 
             C.SComment "is this a binding reply message of connect/accept?",
             C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_BIND_REPLY")) [
-              C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "0")) [
+               C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+               C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "0")) [
                 C.SComment "Server should not recv bind messages. Ignore.",
-                C.Goto "loopnext"] [],
+                C.Continue] [],
               C.SComment "handle bind: calling connect callback",
               C.Ex $ C.CallInd (bindvar `C.DerefField` "bind_cont")
                   [bindvar `C.DerefField` "st", errvar, bindvar],
-              C.Goto "loopnext"] [],
+              C.Continue] [],
             C.SBlank,
 
             C.SComment "is this a cap ack for a pending tx message",
             C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_CAP_ACK"))
-                [C.Ex $ C.Call "assert" [C.Unary C.Not (capst `C.FieldOf` "rx_cap_ack")],
+                [C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+                 C.Ex $ C.Call "assert" [C.Unary C.Not (capst `C.FieldOf` "rx_cap_ack")],
                  C.Ex $ C.Assignment (capst `C.FieldOf` "rx_cap_ack") (C.Variable "true"),
                  C.If (capst `C.FieldOf` "monitor_mutex_held")
                     [C.Ex $ C.Call (tx_cap_handler_name p ifn) [my_bindvar]] [],
-                 C.Goto "loopnext"]
+                 C.Continue]
                 [],
             C.SBlank,
 
@@ -1350,18 +1347,7 @@ rx_handler p ifn typedefs msgdefs msgs =
             C.SBlank,
 
             C.SComment "switch on message number and fragment number",
-            C.Switch rx_msgnum_field msgnum_cases bad_msgnum,
-            C.SBlank,
-
-            C.Label "loopnext",
-            C.SComment "send an ack if the channel is now full",
-            C.If (C.Call "flounder_stub_ump_needs_ack" [stateaddr])
-                 [C.SComment "run our send process if we need to",
-                  C.If tx_is_busy
-                       [run_tx]
-                       [C.Ex $ C.Call "flounder_stub_ump_send_ack" [stateaddr],
-                        C.StmtList $ ump_notify p]
-                 ] []
+            C.Switch rx_msgnum_field msgnum_cases bad_msgnum
             ]
 
         tx_is_busy = C.Binary C.Or
@@ -1396,8 +1382,9 @@ rx_handler p ifn typedefs msgdefs msgs =
                     start_recv (ump_drv p) ifn typedefs mn msgargs ++
                     (if caps /= [] then [
                         -- + with caps received
-                        C.Ex $ C.Assignment
-                            (capst `C.FieldOf` "tx_cap_ack") (C.Variable "true"),
+                        C.Ex $ C.Call "flounder_stub_ump_send_cap_ack" [C.AddressOf umpst],
+                        -- C.Ex $ C.Assignment
+                        --     (capst `C.FieldOf` "tx_cap_ack") (C.Variable "true"),
                         C.Ex $ C.Assignment
                             (capst `C.FieldOf` "rx_capnum") (C.NumConstant 0)
                         ] else [])
@@ -1406,6 +1393,8 @@ rx_handler p ifn typedefs msgdefs msgs =
                  | (frag, i) <- zip frags [0..] ]
                 bad_msgfrag,
             C.Break]
+            where
+                umpst = C.DerefField my_bindvar "ump_state"
 
         bad_msgnum = [report_user_err $ C.Variable "FLOUNDER_ERR_RX_INVALID_MSGNUM",
                       C.Goto "out"]
@@ -1419,6 +1408,7 @@ rx_handler p ifn typedefs msgdefs msgs =
                                  | (afl, word) <- zip wl [0..]],
             (if isFirst then C.Ex $ C.Assignment binding_incoming_token ump_token else C.SBlank),
             C.SBlank,
+            C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
             C.StmtList $ msgfrag_case_prolog msg caps isLast,
             C.Goto "out"]
             where
@@ -1430,6 +1420,7 @@ rx_handler p ifn typedefs msgdefs msgs =
         msgfrag_case msg@(Message _ mn _ _) (OverflowFragment (StringFragment af)) caps isFirst isLast = [
             C.Ex $ C.Assignment errvar (C.Call "flounder_stub_ump_recv_string" args),
             (if isFirst then C.Ex $ C.Assignment binding_incoming_token ump_token else C.SBlank),
+            C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
             C.If (C.Call "err_is_ok" [errvar])
                 (msgfrag_case_prolog msg caps isLast)
                 -- error from string receive code, check if it's permanent
@@ -1454,6 +1445,7 @@ rx_handler p ifn typedefs msgdefs msgs =
 
         msgfrag_case msg@(Message _ mn _ _) (OverflowFragment (BufferFragment _ afn afl)) caps isFirst isLast = [
             C.Ex $ C.Assignment errvar (C.Call "flounder_stub_ump_recv_buf" args),
+            C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
             C.If (C.Call "err_is_ok" [errvar])
                 (msgfrag_case_prolog msg caps isLast)
                 -- error from receive code, check if it's permanent
index 7df2dc0..95c7c8c 100644 (file)
@@ -36,9 +36,10 @@ void experiment(coreid_t idx)
         volatile struct ump_message *msg;
         struct ump_control ctrl;
         timestamps[i].time0 = bench_tsc();
-        msg = ump_impl_get_next(send, &ctrl);
+        while (!(msg = ump_impl_get_next(send, &ctrl)));
         msg->header.control = ctrl;
-        while (!ump_impl_recv(recv));
+        while (!(msg = ump_impl_recv(recv)));
+        ump_impl_free_message(msg);
         timestamps[i].time1 = bench_tsc();
     }
 
index 8b0648a..b30b3a4 100644 (file)
@@ -62,8 +62,9 @@ static void bind_cb(void *st, errval_t binderr, struct bench_binding *b)
     while (1) {
         volatile struct ump_message *msg;
         struct ump_control ctrl;
-        while (!ump_impl_recv(recv));
-        msg = ump_impl_get_next(send, &ctrl);
+        while (!(msg = ump_impl_recv(recv)));
+        ump_impl_free_message(msg);
+        while (!(msg = ump_impl_get_next(send, &ctrl)));
         msg->header.control = ctrl;
     }
 }