/// Special message types
enum flounder_ump_msgtype {
- FL_UMP_ACK = 0,
- FL_UMP_BIND = 1,
- FL_UMP_BIND_REPLY = 2,
+ FL_UMP_BIND = 0,
+ FL_UMP_BIND_REPLY = 1,
FL_UMP_CAP_ACK = (1 << FL_UMP_MSGTYPE_BITS) - 1,
};
struct flounder_ump_state {
struct ump_chan chan;
- ump_index_t next_id; ///< Sequence number of next message to be sent
- ump_index_t seq_id; ///< Last sequence number received from remote
- ump_index_t ack_id; ///< Last sequence number acknowledged by remote
- ump_index_t last_ack; ///< Last acknowledgement we sent to remote
-
struct flounder_cap_state capst; ///< State for indirect cap tx/rx machinery
uint32_t token; ///< Outgoing message's token
};
size_t maxsize);
-/// Computes (from seq/ack numbers) whether we can currently send a non-ack
-/// on the channel
-static inline bool flounder_stub_ump_can_send(struct flounder_ump_state *s) {
- bool r = (ump_index_t)(s->next_id - s->ack_id) < s->chan.max_send_msgs;
- return r;
-}
-
-/// Computes (from seq/ack numbers) whether we can currently send an ack
-/// on the channel
-static inline bool flounder_stub_ump_can_send_ack(struct flounder_ump_state *s) {
- bool r = (ump_index_t)(s->next_id - s->ack_id) <= s->chan.max_send_msgs;
- return r;
-}
-
#define ENABLE_MESSAGE_PASSING_TRACE 1
/// Prepare a "control" word (header for each UMP message fragment)
static inline void flounder_stub_ump_control_fill(struct flounder_ump_state *s,
{
#if ENABLE_MESSAGE_PASSING_TRACE
trace_event_raw((((uint64_t)0xEA)<<56) |
- ((uint64_t)s->chan.sendid << 12) |
- (s->next_id & 0xffff));
+ ((uint64_t)s->chan.sendid << 12));
#endif // ENABLE_MESSAGE_PASSING_TRACE
assert(s->chan.sendid != 0);
assert(msgtype < (1 << FL_UMP_MSGTYPE_BITS)); // check for overflow
- ctrl->header = ((uintptr_t)msgtype << UMP_INDEX_BITS) | (uintptr_t)s->seq_id;
+ ctrl->header = ((uintptr_t)msgtype << UMP_INDEX_BITS);
ctrl->token = s->token;
- s->last_ack = s->seq_id;
- s->next_id++;
}
/// Process a "control" word
{
#if ENABLE_MESSAGE_PASSING_TRACE
trace_event_raw( (((uint64_t)0xEB)<<56) |
- ((uint64_t)s->chan.recvid << 12) |
- (s->seq_id & 0xffff));
+ ((uint64_t)s->chan.recvid << 12));
#endif // ENABLE_MESSAGE_PASSING_TRACE
- s->ack_id = ctrl.header & UMP_INDEX_MASK;
- s->seq_id++;
return ctrl.header >> UMP_INDEX_BITS;
}
#endif
}
-/// Should we send an ACK?
-static inline bool flounder_stub_ump_needs_ack(struct flounder_ump_state *s)
-{
- // send a forced ACK if the channel is full
- // FIXME: should probably send it only when "nearly" full
- return (ump_index_t)(s->seq_id - s->last_ack) >=
- (ump_index_t)(s->chan.max_recv_msgs - 1);
-}
-/// Send an explicit ACK
-static inline void flounder_stub_ump_send_ack(struct flounder_ump_state *s)
-{
- assert(flounder_stub_ump_can_send_ack(s));
- struct ump_control ctrl;
- volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
- flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_ACK);
- msg->header.control = ctrl;
-}
-
/// Send a cap ACK (message that we are ready to receive caps)
static inline void flounder_stub_ump_send_cap_ack(struct flounder_ump_state *s)
{
- assert(flounder_stub_ump_can_send(s));
struct ump_control ctrl;
volatile struct ump_message *msg = ump_chan_get_next(&s->chan, &ctrl);
+ assert(msg);
flounder_stub_ump_control_fill(s, &ctrl, FL_UMP_CAP_ACK);
msg->header.control = ctrl;
}
C.Return $
C.Call "err_push" [errvar, C.Variable "LIB_ERR_UMP_CHAN_INIT"]]
[],
+ C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
C.SBlank,
C.Ex $ C.Assignment (sendvar) (C.DerefField (C.Variable "_frameinfo") "sendbase"),
C.Return $
C.Call "err_push" [errvar, C.Variable "LIB_ERR_UMP_CHAN_INIT"]]
[],
+ C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
C.SBlank,
C.Ex $ C.Assignment (sendvar) (C.DerefField (C.Variable "_frameinfo") "sendbase"),
C.Ex $ C.Assignment (common_field "receive_next") (C.Variable $ receive_next_fn_name p ifn),
C.Ex $ C.Assignment (common_field "get_receiving_chanstate") (C.Variable $ get_receiving_chanstate_fn_name p ifn),
C.Ex $ C.Assignment (common_field "st") (C.Variable "st"),
+ C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
C.Ex $ C.Assignment (intf_bind_var `C.FieldOf` "bind_cont") (C.Variable intf_cont_var),
C.Ex $ C.Assignment (my_bindvar `C.DerefField` "iref") (C.Variable "iref"),
C.Ex $ C.Assignment (my_bindvar `C.DerefField` "inchanlen") (C.Variable "inchanlen"),
C.Ex $ C.Assignment (my_bindvar `C.DerefField` "outchanlen") (C.Variable "outchanlen"),
C.Ex $ C.Assignment (my_bindvar `C.DerefField` "no_cap_transfer") (C.Variable "0"),
+ C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
C.StmtList $ (ump_binding_extra_fields_init p),
C.SBlank,
C.SComment "do we need a new monitor binding?",
C.Ex $ C.Assignment (common_field "get_receiving_chanstate") (C.Variable $ get_receiving_chanstate_fn_name p ifn),
C.Ex $ C.Assignment (my_bindvar `C.DerefField` "no_cap_transfer") (C.Variable "0"),
C.StmtList $ (ump_connect_extra_fields_init p),
+ C.Ex $ C.Assignment (C.FieldOf (common_field "tx_cont_chanstate") "trigger") (C.AddressOf $ C.FieldOf chanvar "send_waitset"),
C.SBlank,
C.SComment "run user's connect handler",
localvar (C.Struct "ump_control") "ctrl" Nothing,
C.SBlank,
- C.SComment "check if we can send another message",
- C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst])
- [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
- C.SBlank,
C.SComment "send the next fragment",
C.Ex $ C.Assignment ump_token (C.Variable "0"),
C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+ C.SComment "check if we can send another message",
+ C.If (C.Unary C.Not msgvar)
+ [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
+ C.SBlank,
C.Ex $ C.Call "flounder_stub_ump_control_fill"
[chanst, ctrladdr, C.Variable $ msg_enum_elem_name ifn "__bind"],
-- C.StmtList
localvar (C.Struct "ump_control") "ctrl" Nothing,
C.SBlank,
- C.SComment "check if we can send another message",
- C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst])
- [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
- C.SBlank,
-
C.SComment "send the next fragment",
C.Ex $ C.Assignment ump_token (C.Variable "0"),
C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+ C.SComment "check if we can send another message",
+ C.If (C.Unary C.Not msgvar)
+ [C.Return (C.Variable "FLOUNDER_ERR_TX_BUSY")] [],
C.Ex $ C.Call "flounder_stub_ump_control_fill"
[chanst, ctrladdr, C.Variable $ msg_enum_elem_name ifn "__bind_reply"],
-- C.StmtList
C.SBlank,
C.SComment "do we need to (and can we) send a cap ack?",
- C.If (C.Binary C.And
- (capst `C.FieldOf` "tx_cap_ack")
- (C.Call "flounder_stub_ump_can_send" [C.AddressOf umpst]))
+ C.If (capst `C.FieldOf` "tx_cap_ack")
[C.Ex $ C.Call "flounder_stub_ump_send_cap_ack" [C.AddressOf umpst],
- C.Ex $ C.Assignment (capst `C.FieldOf` "tx_cap_ack") (C.Variable "false"),
- C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true")] [],
+ C.Ex $ C.Assignment (capst `C.FieldOf` "tx_cap_ack") (C.Variable "false")] [],
C.SBlank,
C.SComment "Switch on current outgoing message number",
report_user_err (C.Variable "FLOUNDER_ERR_INVALID_STATE")],
C.SBlank,
- C.SComment "Send a notification if necessary",
+ C.SComment "Retry send",
C.If (C.Variable "tx_notify")
- (ump_notify p) []
+ [
+ localvar (C.Struct "event_closure") "retry_closure"
+ (Just $ C.StructConstant "event_closure" [
+ ("handler", C.Variable $ tx_handler_name p ifn), ("arg", C.Variable "arg")]),
+ C.Ex $ C.Assignment errvar (C.Call "ump_chan_register_send" [
+ chanaddr, C.DerefField bindvar "waitset", C.Variable "retry_closure"]),
+ C.Ex $ C.Call "assert" [C.Call "err_is_ok" [errvar]]
+ ] []
]
where
inc_fragnum = C.Ex $ C.PostInc $ C.DerefField bindvar "tx_msg_fragment"
| otherwise = -- more fragments to go
[inc_fragnum, C.SComment "fall through to next fragment"]
-
+ statevar = C.DerefField my_bindvar "ump_state"
+ chanaddr = C.AddressOf $ C.FieldOf statevar "chan"
tx_handler_case :: UMPParams -> String -> String -> MsgFragment -> [C.Stmt]
tx_handler_case p ifn mn (MsgFragment words) = [
- C.SComment "check if we can send another message",
- C.If (C.Unary C.Not $ C.Call "flounder_stub_ump_can_send" [stateaddr])
- [C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true"),
- C.Break] [],
- C.SBlank,
-
C.SComment "send the next fragment",
C.Ex $ C.Assignment ump_token binding_outgoing_token,
C.Ex $ C.Assignment msgvar $ C.Call "ump_chan_get_next" [chanaddr, ctrladdr],
+ C.SComment "check if we can send another message",
+ C.If (C.Unary C.Not msgvar)
+ [C.Ex $ C.Assignment (C.Variable "tx_notify") (C.Variable "true"),
+ C.Break] [],
C.Ex $ C.Call "flounder_stub_ump_control_fill"
[stateaddr, ctrladdr, C.Variable $ msg_enum_elem_name ifn mn],
C.StmtList
C.Label "out_no_reregister",
C.Ex $ C.Variable "__attribute__((unused))",
- C.SComment "run our send process, if we need to",
- C.If tx_is_busy [run_tx] [
- C.SComment "otherwise send a forced ack if the channel is now full",
- C.If (C.Call "flounder_stub_ump_needs_ack" [stateaddr])
- [C.Ex $ C.Call "flounder_stub_ump_send_ack" [stateaddr],
- C.StmtList $ ump_notify p]
- []
- ],
C.If (C.Variable "call_msgnum") [C.Ex $ C.Assignment rx_msgnum_field (C.NumConstant 0)] [],
C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "rxtx_mutex"],
C.Switch (C.Variable "call_msgnum") call_cases [C.Break]
[C.SComment "real error",
report_user_err $ C.Call "err_push"
[errvar, C.Variable "LIB_ERR_UMP_CHAN_RECV"],
+ C.Ex $ C.Call "thread_mutex_unlock" [C.AddressOf $ C.DerefField bindvar "rxtx_mutex"],
C.ReturnVoid] ]
[],
C.SBlank,
C.Variable "msg" `C.DerefField` "header" `C.FieldOf` "control"],
C.SBlank,
- C.SComment "is this a dummy message (ACK)?",
- C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_ACK"))
- [C.Goto "loopnext"] [],
- C.SBlank,
-
C.SComment "is this a binding message of connect/accept?",
C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_BIND")) [
- C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "1")) [
+ C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+ C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "1")) [
C.SComment "Client should not recv bind messages. Ignore.",
- C.Goto "loopnext"] [],
+ C.Continue] [],
C.SComment "handle bind reply: calling bind callback",
C.Ex $ C.CallInd (bindvar `C.DerefField` "bind_cont")
[bindvar `C.DerefField` "st", errvar, bindvar],
C.Ex $ C.Call (tx_bind_reply_fn_name p ifn) [my_bindvar],
- C.Goto "loopnext"] [],
+ C.Continue] [],
C.SBlank,
C.SComment "is this a binding reply message of connect/accept?",
C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_BIND_REPLY")) [
- C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "0")) [
+ C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+ C.If ((C.Binary C.Equals (C.DerefField my_bindvar "is_client")) (C.Variable "0")) [
C.SComment "Server should not recv bind messages. Ignore.",
- C.Goto "loopnext"] [],
+ C.Continue] [],
C.SComment "handle bind: calling connect callback",
C.Ex $ C.CallInd (bindvar `C.DerefField` "bind_cont")
[bindvar `C.DerefField` "st", errvar, bindvar],
- C.Goto "loopnext"] [],
+ C.Continue] [],
C.SBlank,
C.SComment "is this a cap ack for a pending tx message",
C.If (C.Binary C.Equals (C.Variable "msgnum") (C.Variable "FL_UMP_CAP_ACK"))
- [C.Ex $ C.Call "assert" [C.Unary C.Not (capst `C.FieldOf` "rx_cap_ack")],
+ [C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
+ C.Ex $ C.Call "assert" [C.Unary C.Not (capst `C.FieldOf` "rx_cap_ack")],
C.Ex $ C.Assignment (capst `C.FieldOf` "rx_cap_ack") (C.Variable "true"),
C.If (capst `C.FieldOf` "monitor_mutex_held")
[C.Ex $ C.Call (tx_cap_handler_name p ifn) [my_bindvar]] [],
- C.Goto "loopnext"]
+ C.Continue]
[],
C.SBlank,
C.SBlank,
C.SComment "switch on message number and fragment number",
- C.Switch rx_msgnum_field msgnum_cases bad_msgnum,
- C.SBlank,
-
- C.Label "loopnext",
- C.SComment "send an ack if the channel is now full",
- C.If (C.Call "flounder_stub_ump_needs_ack" [stateaddr])
- [C.SComment "run our send process if we need to",
- C.If tx_is_busy
- [run_tx]
- [C.Ex $ C.Call "flounder_stub_ump_send_ack" [stateaddr],
- C.StmtList $ ump_notify p]
- ] []
+ C.Switch rx_msgnum_field msgnum_cases bad_msgnum
]
tx_is_busy = C.Binary C.Or
start_recv (ump_drv p) ifn typedefs mn msgargs ++
(if caps /= [] then [
-- + with caps received
- C.Ex $ C.Assignment
- (capst `C.FieldOf` "tx_cap_ack") (C.Variable "true"),
+ C.Ex $ C.Call "flounder_stub_ump_send_cap_ack" [C.AddressOf umpst],
+ -- C.Ex $ C.Assignment
+ -- (capst `C.FieldOf` "tx_cap_ack") (C.Variable "true"),
C.Ex $ C.Assignment
(capst `C.FieldOf` "rx_capnum") (C.NumConstant 0)
] else [])
| (frag, i) <- zip frags [0..] ]
bad_msgfrag,
C.Break]
+ where
+ umpst = C.DerefField my_bindvar "ump_state"
bad_msgnum = [report_user_err $ C.Variable "FLOUNDER_ERR_RX_INVALID_MSGNUM",
C.Goto "out"]
| (afl, word) <- zip wl [0..]],
(if isFirst then C.Ex $ C.Assignment binding_incoming_token ump_token else C.SBlank),
C.SBlank,
+ C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
C.StmtList $ msgfrag_case_prolog msg caps isLast,
C.Goto "out"]
where
msgfrag_case msg@(Message _ mn _ _) (OverflowFragment (StringFragment af)) caps isFirst isLast = [
C.Ex $ C.Assignment errvar (C.Call "flounder_stub_ump_recv_string" args),
(if isFirst then C.Ex $ C.Assignment binding_incoming_token ump_token else C.SBlank),
+ C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
C.If (C.Call "err_is_ok" [errvar])
(msgfrag_case_prolog msg caps isLast)
-- error from string receive code, check if it's permanent
msgfrag_case msg@(Message _ mn _ _) (OverflowFragment (BufferFragment _ afn afl)) caps isFirst isLast = [
C.Ex $ C.Assignment errvar (C.Call "flounder_stub_ump_recv_buf" args),
+ C.Ex $ C.Call "ump_chan_free_message" [C.Variable "msg"],
C.If (C.Call "err_is_ok" [errvar])
(msgfrag_case_prolog msg caps isLast)
-- error from receive code, check if it's permanent