register rlec rc addr(base, 0x04040) "Receive Length Error Count"
type(uint32);
+ // 8.2.3.23.23
+ register prc1522 rc addr(base, 0x04070) "Packets Received [1024 to Max Bytes] Count"
+ type(uint32);
+
// 8.2.3.23.26
register gprc ro addr(base, 0x04074) "Good Packets Received Count"
type(uint32);
errors bulk_transfer BULK_TRANSFER_ {
failure MEM "Internal error: not enough memory",
+ failure NO_CALLBACK "No callback was set",
failure CHAN_CREATE "Channel create operation failed.",
failure CHAN_BIND "Channel bind operation failed.",
failure CHAN_ASSIGN_POOL "Channel assign pool operation failed.",
failure CHAN_STATE "Channel has a wrong state",
failure CHAN_TRUST "Channel has an invalid trust level.",
failure CHAN_INVALID_EP "Channel has an invalid endpoint.",
+ failure CHAN_DIRECTION "The channel has the wrong direction for this operation.",
failure POOL_INVALD "The pool does not match.",
failure POOL_NOT_ASSIGNED "The pool has not yet been assigned to a channel.",
+ failure POOL_ASSIGN_VETO "The assignment request of the pool has been vetoed.",
failure POOL_MAP "Mapping of the pool failed",
failure POOL_UNMAP "The unmapping of the pool failed",
failure POOL_ALREADY_ASSIGNED "The pool has already been assigned to this channel.",
failure BUFFER_UNMAP "The unmapping of the buffer failed.",
failure ALLOC_BUFFER_SIZE "The supplied buffer size is not valid.",
failure ALLOC_BUFFER_COUNT "The supplied buffer count is not valid.",
+ failure INVALID_ARGUMENT "The supplied argument is invalid.",
+ failure SM_NO_PENDING_MSG "No pending message associated with that tid",
+ failure SM_EXCLUSIVE_WS "BULK_SM: Exclusive waitset required per channel.",
+ failure NET_MAX_QUEUES "The maximum number of queues has been reached",
+ failure NET_POOL_USED "The pool is already used over a no-copy channel.",
};
## Block Service
module /x86_64/sbin/block_server
module /x86_64/sbin/block_server_client
+module /x86_64/sbin/bs_user
# General user domains
module /x86_64/sbin/serial
sbin/lshw \
sbin/block_server \
sbin/block_server_client \
+ sbin/bs_user \
+ sbin/bulk_shm \
+ sbin/bulk_transfer_passthrough \
+ sbin/bulkbench_micro_echo \
+ sbin/bulkbench_micro_throughput \
+ sbin/bulkbench_micro_rtt \
# the following are broken in the newidc system
"xmplthc",
"unixsock",
"bcache",
- "replay",
- "block_service",
- "bulk_ctrl",
- "empty"
- ],
+ "replay",
+ "block_service",
+ "bulk_ctrl",
+ "empty"
+ ],
arch <- allArchitectures
] ++
*/
interface block_service "block service interface" {
- message write(uint32 start_block, uint32 count);
- message read(uint32 start_block, uint32 count);
+ message read(uint32 start_block, uint32 count, uint32 seqn);
+ message setup(iref tx_iref, iref rx_iref);
+ message status(errval err, uint32 seqn, uint32 req);
};
* ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
*/
+interface bulk_ctrl "bulk control channel interface" {
+
+ alias error uint64;
+
+ typedef enum {
+ SOURCE,
+ SINK
+ } direction;
+
+ typedef enum {
+ MASTER,
+ SLAVE,
+ GENERIC
+ } role;
+
+ typedef enum {
+ NONE,
+ HALF,
+ FULL
+ } trust;
+
+ typedef struct {
+ uint32 pool_id_machine;
+ uint32 pool_id_dom;
+ uint32 pool_id_local;
+ } poolid;
+
+ typedef struct {
+ uint32 pool_id_machine;
+ uint32 pool_id_dom;
+ uint32 pool_id_local;
+ trust trust;
+ uint32 buffer_size;
+ uint32 num_buffers;
+ cap cap;
+ } pool;
+
+ /*
+ rpc negotiate(in role role,
+ in trust trust,
+ out error error,
+ out direction match_direction,
+ out role match_role,
+ out uint64 meta_size);
+ */
+
+ message negotiate_call(role role,
+ trust trust);
+
+ message negotiate_response(error error,
+ direction match_direction,
+ role match_role,
+ uint64 meta_size);
+
+ /*
+ rpc assign_pool(in pool pool,
+ in uint64 id,
+ out error error,
+ out uint64 id);
+ */
+
+ message assign_pool_call(pool pool,
+ uint64 id);
+
+ message assign_pool_response(error error,
+ uint64 id);
/*
- * TODO: Decide if we send the aggregate object over the control channel
- * or if we split it up and send raw buffers
+ * buffer movement operations:
+ * we have to differentiate between the trusted and the untrusted case, because
+ * some flounder backends (such as UMP) go to great lengths to send NULL_CAPs
+ * (UMP always takes a monitor mutex before sending any message carrying caprefs,
+ * which slows down the communication drastically).
+ * tid is a transaction id, used to match a reply with its request; a dispatch
+ * sketch follows the interface definition below.
*
*/
+
+ /*
+ rpc move(in poolid poolid,
+ in uint32 bufferid,
+ in uint32 tid,
+ in cap cap,
+ in uint8 meta[metasize],
+ out error error,
+ out uint32 tid);
+ */
-interface bulk_ctrl "bulk control channel interface" {
- message copy_start(uint32 count);
- message copy(uint32 poolid, uint32 offset, uint32 length);
-
- message move_start(uint32 count);
- message move(uint32 poolid, uint32 offset, uint32 length);
-
- message free_start(uint32 count);
- message free(uint32 poolid, uint32 offset, uint32 length);
-
- message assign_pool(uint32 bufsize, uint32 count);
- message transfer_cap();
-
-
+ message move_untrusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ cap cap,
+ uint8 meta[metasize]);
+
+ message move_trusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ uint8 meta[metasize]);
+
+
+ message move_response(error error,
+ uint32 tid);
+
+ /*
+ rpc copy(in poolid poolid,
+ in uint32 bufferid,
+ in uint32 tid,
+ in cap cap,
+ in uint8 meta[metasize],
+ out error error,
+ out uint32 tid);
+ */
+
+ message copy_untrusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ cap cap,
+ uint8 meta[metasize]);
+
+ message copy_trusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ uint8 meta[metasize]);
+
+
+ message copy_response(error error,
+ uint32 tid);
+
+ /*
+ rpc pass(in poolid poolid,
+ in uint32 bufferid,
+ in uint32 tid,
+ in cap cap,
+ in uint8 meta[metasize],
+ out error error,
+ out uint32 tid);
+ */
+
+ message pass_untrusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ cap cap,
+ uint8 meta[metasize]);
+
+ message pass_trusted_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid,
+ uint8 meta[metasize]);
+
+ message pass_response(error error,
+ uint32 tid);
+
+ /*
+ rpc release(in poolid poolid,
+ in uint32 bufferid,
+ in uint32 tid,
+ out error error,
+ out uint32 tid);
+ */
+
+ message release_call(poolid poolid,
+ uint32 bufferid,
+ uint32 tid);
+
+ message release_response(error error,
+ uint32 tid);
};
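The trusted/untrusted split described in the comment above has to be resolved by the backend at run time. The following sketch illustrates the intended dispatch; the helper name is made up and the stub signatures are assumed from the usual Flounder tx_vtbl conventions, so treat it as an illustration rather than the actual implementation.

/* sketch only: stub signatures assumed from Flounder conventions */
static errval_t send_move_request(struct bulk_ctrl_binding *b,
                                  struct bulk_channel *chan,
                                  bulk_ctrl_poolid_t poolid,
                                  uint32_t bufferid, uint32_t tid,
                                  struct capref cap,
                                  const uint8_t *meta, size_t metasize)
{
    if (chan->trust == BULK_TRUST_FULL) {
        /* trusted: no cap transfer, avoids the monitor mutex in UMP */
        return b->tx_vtbl.move_trusted_call(b, NOP_CONT, poolid, bufferid,
                                            tid, meta, metasize);
    }
    /* untrusted: the buffer cap has to travel with the request */
    return b->tx_vtbl.move_untrusted_call(b, NOP_CONT, poolid, bufferid,
                                          tid, cap, meta, metasize);
}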
cap txhwb,
cap rx,
uint32 rxbufsz,
+ uint32 rxhdrsz,
int16 msix_intvec,
uint8 msix_intdest,
bool use_irq,
struct memobj m; ///< public memobj interface
size_t count; ///< the number of frames
size_t chunk_size; ///< the size of the frames
- struct vregion *vregion; ///< the associated vregion
+ struct vregion *vregion; ///< the associated vregion
struct capref *frames; ///< the tracked frames
+ lpaddr_t *offsets; ///< the offset into the tracked frames
};
errval_t memobj_create_pinned(struct memobj_pinned *memobj, size_t size,
CHANTYPE_FLOUNDER,
CHANTYPE_AHCI,
CHANTYPE_LWIP_SOCKET,
+ CHANTYPE_BULK_E10K,
CHANTYPE_OTHER
};
errval_t waitset_chan_deregister(struct waitset_chanstate *chan);
errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
struct event_closure closure);
+errval_t waitset_chan_register_polled(struct waitset *ws,
+ struct waitset_chanstate *chan,
+ struct event_closure closure);
void waitset_chan_migrate(struct waitset_chanstate *chan,
struct waitset *new_ws);
*/
struct bulk_allocator {
struct bulk_pool *pool;
+ struct bulk_buffer_mng *mngs;
size_t num_free;
struct bulk_buffer_mng *free_buffers;
};
uintptr_t range_min;
uintptr_t range_max;
uintptr_t alignment;
+    // TRUST_NONE or TRUST_HALF implies separate capabilities per buffer
+ enum bulk_trust_level trust;
};
#define BULK_NET_H
#include <bulk_transfer/bulk_transfer.h>
+#include <barrelfish/event_queue.h>
+#include <lwip/tcp.h>
+#include <dev/e10k_dev.h>
-/* TODO: correct inlude */
-struct ip_addr {
- int foo;
+struct bulk_net_msgdesc;
+struct e10k_binding;
+struct e10k_queue;
+
+struct stack_allocator {
+ size_t size;
+ size_t top;
+ void **stack;
+};
+
+struct bulk_e10k {
+ bool ready;
+ size_t buffer_size;
+ size_t ring_size;
+ uint8_t qi;
+ uint8_t int_core;
+ uint8_t int_vector;
+ struct e10k_binding *binding;
+ e10k_t d;
+ struct e10k_queue *q;
+ uint64_t mac;
+ struct capref rxframe;
+ struct capref txframe;
+ struct capref txhwbframe;
+ void (*received)(struct bulk_e10k *,
+ struct bulk_net_msgdesc *);
+ void (*transmitted)(struct bulk_e10k *,
+ void *);
+
+ struct event_queue event_queue;
+ struct stack_allocator rx_event_alloc;
+ struct stack_allocator tx_event_alloc;
+
+ struct waitset *waitset;
+ struct waitset_chanstate wscs;
+
+ void *opaque;
};
+
+
/**
* endpoint descriptor for the bulk network interface.
*
/* start of implementation specific part */
struct ip_addr ip; ///< ip address
uint16_t port; ///< port
- uint64_t rx_queue_id; ///< rx queue (control channel)
- uint64_t tx_queue_id; ///< tx queue (control channel)
+ /* from transparent endpoint */
+ char *cardname;
+ uint8_t queue;
+ uint8_t max_queues;
+ size_t buffer_size;
+ size_t buffer_count;
/*
* XXX: do we want to add the connection information here as well?
* e.g. tcp_pcb ?
struct ip_addr ip; ///< the ip address of the endpoint
uint16_t port; ///< the port of the endpoint
/* possibly queue id */
- uint64_t rx_queue_id; ///< rx queue (control channel)
- uint64_t tx_queue_id; ///< tx queue (control channel)
-};
-
-
-/**
- * enumeration of possible message types over the control channel
- */
-enum bulk_net_msg_type {
- BULK_NET_MSG_BIND, ///< binding to the channel
- BULK_NET_MSG_POOL_ASSIGN, ///< pool has been assigned to the channel
- BULK_NET_MSG_POOL_REMOVE, ///< pool has been removed from the channel
- BULK_NET_MSG_DATA ///< data arrives over the channel
-};
-
-struct bulk_net_header {
- enum bulk_net_msg_type type;
- size_t size;
-};
-
-struct bulk_net_msg {
- struct bulk_net_header header;
- /* meta data of the bulk data */
- union {
- struct {
- struct bulk_pool_id id;
- size_t buffer_size;
- size_t num_buffers;
- } pool_assign;
-
- struct {
- struct bulk_pool_id id;
- } pool_remove;
-
- struct {
- struct bulk_net_ep_setup ep_setup;
- struct bulk_channel_setup channel_setup;
- } bind;
-
- struct {
-
-
- } data;
-
- } msg;
+ uint8_t queue;
+ uint8_t max_queues;
+ size_t buffer_size;
+ size_t buffer_count;
+ char *cardname;
+ bool no_copy;
};
-/*
- * ---------------------------------------------------------------------------
- * Implementation Specific Interface Functions >>>
- *
- * TODO: make asynchronous
- */
-errval_t bulk_net_channel_pass(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont);
-
-errval_t bulk_net_channel_copy(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont);
-
-errval_t bulk_net_channel_release(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- struct bulk_continuation cont);
-
-errval_t bulk_net_channel_move(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont);
-
-errval_t bulk_net_channel_assign_pool(struct bulk_channel *channel,
- struct bulk_pool *pool);
-
-errval_t bulk_net_channel_bind(struct bulk_channel *channel);
-
-errval_t bulk_net_channel_create(struct bulk_channel *channel);
-
-struct bulk_implementation *bulk_net_get_implementation(void);
-
-/*
- * <<< Implementation Specific Interface Functions
- * ---------------------------------------------------------------------------
- */
-
/**
* Creates a new bulk endpoint which uses the network backend
*
 * @param port      the port where the other side listens
*/
errval_t bulk_net_ep_create_remote(struct bulk_net_endpoint_descriptor *ep_desc,
- struct ip_addr ip, uint16_t port);
+ struct bulk_net_ep_setup *setup);
#endif /* BULK_NET_H */
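For illustration, a hedged sketch of how a remote endpoint could be set up with the new bulk_net_ep_setup argument; all field values are placeholders, and the ip field (an lwIP struct ip_addr) is left to the application.

struct bulk_net_endpoint_descriptor ep_desc;
struct bulk_net_ep_setup setup = {
    .port         = 6666,     /* assumed */
    .queue        = 1,        /* assumed */
    .max_queues   = 2,        /* assumed */
    .buffer_size  = 0x1000,   /* assumed */
    .buffer_count = 255,      /* assumed */
    .cardname     = "e10k",   /* assumed */
    .no_copy      = false,
};
errval_t err = bulk_net_ep_create_remote(&ep_desc, &setup);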
--- /dev/null
+/**
+ * \file
+ * \brief Proxy for connecting bulk transfer channels over a network connection
+ */
+
+/*
+ * Copyright (c) 2014, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#ifndef BULK_NET_PROXY_H
+#define BULK_NET_PROXY_H
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_net.h>
+
+#include <dev/e10k_dev.h>
+
+
+/* Internal stuff, only here to avoid unnecessary mallocs */
+struct transmit_buffer;
+struct receive_buffer;
+
+
+/** Proxy handle struct */
+struct bulk_net_proxy {
+ struct waitset *ws;
+ struct bulk_channel channel;
+ size_t buffer_size;
+
+ errval_t err;
+ bool bulk_bound;
+ bool net_bound;
+ void (*connected)(struct bulk_net_proxy *);
+
+ struct bulk_e10k transfer;
+
+ struct stack_allocator rb_stack;
+ struct transmit_buffer *tb;
+ struct stack_allocator tb_stack;
+
+ struct bulk_continuation panic_cont;
+ void *zero_meta;
+
+ // Coordinates for network connection
+ const char *card;
+ uint8_t queue;
+ uint16_t l_port;
+ uint16_t r_port;
+ uint32_t l_ip;
+ uint32_t r_ip;
+ uint64_t l_mac;
+ uint64_t r_mac;
+
+ void *user_state;
+};
+
+/**
+ * Start listening proxy on the specified port. Note that the proxy will bind to
+ * the bulk channel during the initialization phase, before actually receiving a
+ * connection request. But transfers on the channel (including the passing of
+ * buffers for the receive queue) must only be executed after the connected
+ * callback is called.
+ *
+ * @param p Proxy struct
+ * @param desc Bulk endpoint to bind to
+ * @param ws Waitset
+ * @param buffer_size Size of buffers to be transmitted over this proxy
+ * @param card NIC name in barrelfish
+ * @param queue Queue ID to use
+ * @param port Port number to listen on (host byte order)
+ * @param connected Callback, will be invoked once an incoming connection has
+ * been established.
+ */
+errval_t bulk_net_proxy_listen(struct bulk_net_proxy *p,
+ struct bulk_endpoint_descriptor *desc,
+ struct waitset *ws,
+ size_t buffer_size,
+ const char *card,
+ uint8_t queue,
+ uint16_t port,
+ void (*connected)(struct bulk_net_proxy *));
+
+/**
+ * Connect to listening proxy. Note that the proxy will bind to the bulk channel
+ * during the initialization phase, before actually receiving a connection
+ * request. But transfers on the channel (including the passing of buffers for
+ * the receive queue) must only be executed after the connected callback is
+ * called.
+ * @param p Proxy struct
+ * @param desc Bulk endpoint to bind to
+ * @param ws Waitset
+ * @param buffer_size Size of buffers to be transmitted over this proxy
+ * @param card NIC name in barrelfish
+ * @param queue Queue ID to use
+ * @param ip IP to connect to (host byte order)
+ * @param port Port number to connect to (host byte order)
+ * @param connected Callback, will be invoked once the connection has
+ * been established.
+ *
+ */
+errval_t bulk_net_proxy_connect(struct bulk_net_proxy *p,
+ struct bulk_endpoint_descriptor *desc,
+ struct waitset *ws,
+ size_t buffer_size,
+ const char *card,
+ uint8_t queue,
+ uint32_t ip,
+ uint16_t port,
+ void (*connected)(struct bulk_net_proxy *));
+
+#endif /* BULK_NET_PROXY_H */
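A minimal usage sketch of the listening side, under the assumption that the endpoint descriptor has already been created and that the default waitset is used; buffer size, card name, queue and port are placeholders.

static void proxy_connected(struct bulk_net_proxy *p)
{
    /* only from here on may buffers be passed/moved on p->channel */
}

static errval_t start_listening_proxy(struct bulk_net_proxy *p,
                                      struct bulk_endpoint_descriptor *ep)
{
    return bulk_net_proxy_listen(p, ep, get_default_waitset(),
                                 0x1000,   /* buffer_size (assumed) */
                                 "e10k",   /* card (assumed) */
                                 1,        /* queue (assumed) */
                                 6666,     /* port (assumed) */
                                 proxy_connected);
}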
#ifndef BULK_SM_H
#define BULK_SM_H
+#include <bulk_transfer/bulk_transfer.h>
+
+#include <if/bulk_ctrl_defs.h>
+
+// Shared memory specific structs -----------------------------------------
+
+enum bulk_sm_endpoint_state {
+ BULK_EPSTATE_CREATED, ///< dummy endpoint after creation
+ BULK_EPSTATE_IREF_EXPORTED ///< endpoint's bulk_ctrl is exported on the ep's iref
+};
+
struct bulk_sm_endpoint_descriptor {
/* generic part */
struct bulk_endpoint_descriptor ep_generic; ///< generic endpoint part
+
/* start of implementation specific part */
- /* TODO: references to iref / flounder data structure */
+ volatile enum bulk_sm_endpoint_state state;
+ iref_t iref;
+ errval_t err;
+};
+
+struct bulk_sm_resend_item {
+ struct bulk_sm_resend_item *next;
+ struct event_closure event;
+};
+
+struct bulk_sm_impl_data {
+ struct bulk_ctrl_binding *b;
+ struct bulk_sm_pending_msg *root;
+ struct thread_mutex mutex;//used for changes to pending_msg
+ //TODO: remove binding stuff from here, use pending message system instead
+ errval_t bind_error;
+ struct bulk_continuation bind_cont;
+
+ // resending of flounder messages
+ struct thread_mutex resend_lock;
+ struct bulk_sm_resend_item *resend_closure;
};
+// Shared memory implementation callbacks ---------------------------------
+
+errval_t bulk_sm_channel_create(struct bulk_channel *channel);
+
+errval_t bulk_sm_channel_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_channel_destroy(struct bulk_channel *channel);
+
+errval_t bulk_sm_assign_pool(struct bulk_channel *channel,
+ struct bulk_pool *pool,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_remove_pool(struct bulk_channel *channel,
+ struct bulk_pool *pool,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_move(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_copy(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_release(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ struct bulk_continuation cont);
+
+errval_t bulk_sm_pass(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont);
+
+struct bulk_implementation *bulk_sm_get_implementation(void);
+
+// Shared memory support functions ----------------------------------------
+
+/**
+ * Creates a new bulk endpoint which uses the shared memory backend
+ *
+ * @param ep_desc memory location to create the endpoint in
+ *
+ * This function is intended to be used by the creator. (exporting side)
+ */
+errval_t bulk_sm_ep_create(struct bulk_sm_endpoint_descriptor *ep_desc);
+
+/**
+ * Creates a new bulk endpoint which uses the shared memory backend
+ *
+ * @param ep_desc memory location to create the endpoint in
+ * @param remote_iref the iref of the exported service on the other side
+ *
+ * This function is intended to be used by the binding side
+ */
+errval_t bulk_sm_ep_create_remote(struct bulk_sm_endpoint_descriptor *ep_desc,
+ iref_t remote_iref);
+
+// Helpers to deal with multiple waitsets (each channel requires one) -----
+
+/**
+ * List of waitsets to dispatch.
+ */
+struct bulk_sm_ws_item {
+ struct bulk_sm_ws_item *next;
+ struct waitset *ws; ///< waitset to dispatch. may be NULL.
+};
+
+/**
+ * Dispatches events on a list of waitsets, non-blocking.
+ * @returns: LIB_ERR_NO_EVENT if no event dispatched.
+ */
+errval_t bulk_sm_multiple_event_dispatch_non_block(struct bulk_sm_ws_item *item);
+
+/**
+ * Dispatches events on a list of waitsets. Returns once at least one event has been dispatched.
+ */
+errval_t bulk_sm_multiple_event_dispatch(struct bulk_sm_ws_item *item);
#endif /* BULK_SM_H */
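Since every shared-memory channel brings its own waitset, an application event loop chains them into a bulk_sm_ws_item list; a minimal sketch (the channel waitset variables are made up):

struct bulk_sm_ws_item item2 = { .next = NULL,   .ws = channel2_ws };
struct bulk_sm_ws_item item1 = { .next = &item2, .ws = channel1_ws };

while (keep_running) {
    errval_t err = bulk_sm_multiple_event_dispatch(&item1);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "event dispatch failed");
    }
}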
BULK_STATE_UNINITIALIZED, ///< channel not initialized, no endpoint assigned
BULK_STATE_INITIALIZED, ///< local endpoint assigned, ready for binding
BULK_STATE_BINDING, ///< binding is in progress
+ BULK_STATE_BIND_NEGOTIATE, ///< channel properties are negotiated (role, trust)
BULK_STATE_CONNECTED, ///< binding is completed and ready for use
BULK_STATE_TEARDOWN, ///< teardown is initiated
BULK_STATE_CLOSED ///< the channel has been closed
void *arg;
};
+#define MK_BULK_CONT(h,a) ((struct bulk_continuation) {.handler=(h), .arg=(a)})
+#define BULK_CONT_NOP MK_BULK_CONT(NULL, NULL)
+
+/**
+ * Helper function to call a bulk continuation with given arguments.
+ */
+static inline void bulk_continuation_call(struct bulk_continuation cont,
+ errval_t err,
+ struct bulk_channel *channel)
+{
+ if (cont.handler) {
+ cont.handler(cont.arg, err, channel);
+ }
+}
+
+
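As a usage sketch (the callback and its argument are illustrative only): an asynchronous operation such as pool assignment is started with a continuation built via MK_BULK_CONT, and the backend later completes it through bulk_continuation_call.

static void pool_assign_done(void *arg, errval_t err, struct bulk_channel *chan)
{
    debug_printf("pool assigned: %s\n", err_getstring(err));
}

/* ... */
err = bulk_channel_assign_pool(channel, pool,
                               MK_BULK_CONT(pool_assign_done, NULL));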
/**
* Function pointers provided by an implementation of the bulk transfer
* mechanism over a specific backend. Functions correspond closely to the
struct bulk_implementation {
errval_t (*channel_create)(struct bulk_channel *channel);
- errval_t (*channel_bind)(struct bulk_channel *channel);
+ errval_t (*channel_bind)(struct bulk_channel *channel,
+ struct bulk_continuation cont);
errval_t (*channel_destroy)(struct bulk_channel *channel);
errval_t (*assign_pool)(struct bulk_channel *channel,
- struct bulk_pool *pool);
+ struct bulk_pool *pool,
+ struct bulk_continuation cont);
errval_t (*remove_pool)(struct bulk_channel *channel,
struct bulk_pool *pool,
struct bulk_channel_callbacks {
/**
* For exporting side: other endpoint successfully bound
- * For binding side: binding succeeded
*/
- void (*bind_done)(struct bulk_channel *channel, errval_t err);
+ errval_t (*bind_received)(struct bulk_channel *channel);
/**
* the other side wants to teardown the channel
* error code is sent to the other side (veto).
*/
errval_t (*pool_assigned)(struct bulk_channel *channel,
- struct bulk_pool *pool);
+ struct bulk_pool *pool);
/**
* The other endpoint wants to remove a pool from this channel
*/
errval_t (*pool_removed)(struct bulk_channel *channel,
- struct bulk_pool *pool);
+ struct bulk_pool *pool);
/** Incoming moved buffer (sink) */
void (*move_received)(struct bulk_channel *channel,
*/
struct bulk_pool_id {
uint32_t machine;
- domainid_t dom;
+ uint32_t dom;//warning: disp_get_domain_id() is core-local
uint32_t local;
};
size_t buffer_size;
/** pool trust level depending on first assignment */
enum bulk_trust_level trust;
- /** pointer to an implementation dependent data structure */
- void *impl_data;
/** capability for the entire pool */
struct capref pool_cap;
/** the maximum number of buffers in this pool */
struct bulk_buffer {
/** the virtual address of the buffer */
void *address;
+ /** the physical address */
+ uintptr_t phys;
/** XXX: maybe we have to use the pool_id here */
struct bulk_pool *pool;
+ /** index of this buffer within the pool's array of buffers */
+ uint32_t bufferid;
/** capability for this buffer */
struct capref cap;
/** offset in the capability */
errval_t bulk_channel_bind(struct bulk_channel *channel,
struct bulk_endpoint_descriptor *remote_ep_desc,
struct bulk_channel_callbacks *callbacks,
- struct bulk_channel_bind_params *params);
+ struct bulk_channel_bind_params *params,
+ struct bulk_continuation cont);
/**
* * There is the pool assigned callback that serves as a continuation for this.
*/
errval_t bulk_channel_assign_pool(struct bulk_channel *channel,
- struct bulk_pool *pool);
+ struct bulk_pool *pool,
+ struct bulk_continuation cont);
/**
* Remove a pool from a channel
*
* @param channel Channel to be freed
*/
-errval_t bulk_channel_destroy(struct bulk_channel *channel);
+errval_t bulk_channel_destroy(struct bulk_channel *channel,
+ struct bulk_continuation cont);
/*
* ---------------------------------------------------------------------------
paging_set_flags(entry, kpi_paging_flags);
}
- return SYS_ERR_OK;
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
paging_set_flags(entry, kpi_paging_flags);
}
- return SYS_ERR_OK;
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
paging_set_flags(entry, kpi_paging_flags);
}
- return SYS_ERR_OK;
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
paging_set_flags(entry, kpi_paging_flags);
}
- return SYS_ERR_OK;
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
paging_x86_32_modify_flags(entry, flags);
}
- return SYS_ERR_OK;
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
return unmapped_pages;
}
-errval_t page_mappings_unmap(struct capability *pgtable, struct cte *mapping,
+errval_t page_mappings_unmap(struct capability *pgtable, struct cte *mapping,
size_t slot, size_t num_pages)
{
assert(type_is_vnode(pgtable->type));
paging_x86_64_modify_flags(entry, flags);
}
- return SYS_ERR_OK;
+ /* flush affected TLB entries and return */
+ return paging_tlb_flush_range(mapping, pages);
}
void paging_dump_tables(struct dcb *dispatcher)
errval_t compile_vaddr(struct cte *ptable, size_t entry, genvaddr_t *retvaddr);
errval_t unmap_capability(struct cte *mem);
errval_t lookup_cap_for_mapping(genpaddr_t paddr, lvaddr_t pte, struct cte **retcte);
+errval_t paging_tlb_flush_range(struct cte *frame, size_t pages);
#endif // PAGING_H
// if we get here, we have not found a matching cap
return SYS_ERR_CAP_NOT_FOUND;
}
+
+errval_t paging_tlb_flush_range(struct cte *frame, size_t pages)
+{
+ // reconstruct first virtual address for TLB flushing
+ struct cte *leaf_pt;
+ errval_t err;
+ err = mdb_find_cap_for_address(frame->mapping_info.pte, &leaf_pt);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ genvaddr_t vaddr;
+ size_t entry = (frame->mapping_info.pte - get_address(&leaf_pt->cap)) /
+ sizeof(union x86_64_ptable_entry);
+ err = compile_vaddr(leaf_pt, entry, &vaddr);
+ if (err_is_fail(err)) {
+ if (err_no(err) == SYS_ERR_VNODE_NOT_INSTALLED) {
+ debug(SUBSYS_PAGING, "couldn't reconstruct virtual address\n");
+ }
+ else {
+ return err;
+ }
+ }
+ debug(SUBSYS_PAGING, "flushing TLB entries for vaddrs 0x%"
+ PRIxGENVADDR"--0x%"PRIxGENVADDR"\n",
+ vaddr, vaddr+(pages * BASE_PAGE_SIZE));
+ // flush TLB entries for all modified pages
+ for (int i = 0; i < pages; i++) {
+ do_one_tlb_flush(vaddr);
+ vaddr += BASE_PAGE_SIZE;
+ }
+
+ return SYS_ERR_OK;
+}
return;
}
- if (q->head == NULL) {
+ if (q->head != NULL) {
// queue is non-empty: trigger ourselves again
struct event_closure self = {
.handler = event_queue_runner,
errval_t waitset_chan_register_disabled(struct waitset *ws,
struct waitset_chanstate *chan,
struct event_closure closure);
-errval_t waitset_chan_register_polled(struct waitset *ws,
+/*errval_t waitset_chan_register_polled(struct waitset *ws,
struct waitset_chanstate *chan,
- struct event_closure closure);
+ struct event_closure closure);*/
errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
struct waitset_chanstate *chan,
struct event_closure closure,
genvaddr_t base = vregion_get_base_addr(vregion);
genvaddr_t vregion_offset = vregion_get_offset(vregion);
errval_t err;
-
+ size_t ret_size;
err = pmap->f.modify_flags(pmap, base + offset + vregion_offset, range,
- flags, &range);
+ flags, &ret_size);
if (err_is_fail(err)) {
return err_push(err, LIB_ERR_PMAP_MODIFY_FLAGS);
}
* \param memobj The memory object
* \param offset Offset into the memory object
* \param frame The frame cap for the offset
- * \param size The size of frame cap
+ * \param frame_offset The offset into the frame cap
*
* Pagefault relies on frames inserted in order
*/
static errval_t fill(struct memobj *memobj,
genvaddr_t offset,
struct capref frame,
- size_t size)
+ lpaddr_t frame_offset)
{
struct memobj_fixed *fixed = (struct memobj_fixed*) memobj;
- if (offset % fixed->chunk_size || size != fixed->chunk_size) {
+ if (offset % fixed->chunk_size) {
return LIB_ERR_MEMOBJ_FILL;
}
}
fixed->frames[slot] = frame;
+ fixed->offsets[slot] = frame_offset;
return SYS_ERR_OK;
}
genvaddr_t vregion_offset = vregion_get_offset(vregion);
err = pmap->f.unmap(pmap, vregion_base + vregion_offset + offset,
- fixed->chunk_size, &retsize);
+ fixed->chunk_size, &retsize);
if (err_is_fail(err)) {
return err_push(err, LIB_ERR_PMAP_UNMAP);
}
vregion_flags_t flags = vregion_get_flags(vregion);
err = pmap->f.map(pmap, base + vregion_offset + offset, fixed->frames[slot],
- 0, fixed->chunk_size, flags, NULL, NULL);
+ fixed->offsets[slot], fixed->chunk_size, flags,
+ NULL, NULL);
if (err_is_fail(err)) {
return err_push(err, LIB_ERR_PMAP_MAP);
}
* The frames are mapped in on demand.
*/
errval_t memobj_create_fixed(struct memobj_fixed *fixed,
- size_t size,
+                             size_t size,
memobj_flags_t flags,
size_t count,
size_t chunk_size)
memobj->f.pagefault = pagefault;
memobj->f.pager_free = pager_free;
-
assert(size == count * chunk_size);
assert((chunk_size % BASE_PAGE_SIZE)==0);
/* specific portion */
fixed->count = count;
fixed->chunk_size = chunk_size;
+ fixed->vregion = NULL;
fixed->frames = malloc(count * sizeof(struct capref));
if (!fixed->frames) {
}
memset(fixed->frames, 0, count * sizeof(struct capref));
+    fixed->offsets = malloc(count * sizeof(lpaddr_t));
+    if (!fixed->offsets) {
+        free(fixed->frames);
+        return LIB_ERR_MALLOC_FAIL;
+    }
+ memset(fixed->offsets, 0, count * sizeof(lpaddr_t));
+
return SYS_ERR_OK;
}
err = vregion_destroy(vregion);
free(m->frames);
+ free(m->offsets);
return err;
}
}
#endif // CONFIG_INTERCONNECT_DRIVER_UMP
+void bulk_e10k_poll(struct waitset_chanstate *chan) __attribute__((weak));
+void bulk_e10k_poll(struct waitset_chanstate *chan) { }
+
/// Helper function that knows how to poll the given channel, based on its type
static void poll_channel(struct waitset_chanstate *chan)
{
ump_endpoint_poll(chan);
break;
#endif // CONFIG_INTERCONNECT_DRIVER_UMP
+ case CHANTYPE_BULK_E10K:
+ bulk_e10k_poll(chan);
+ break;
default:
assert(!"invalid channel type to poll!");
-- ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
--
-- Hakefile for lib/bulk_transfer
---
+--
--
--------------------------------------------------------------------------
[
- build library {
+ build library {
target = "bulk_transfer",
cFiles = [ "bulk_allocator.c",
- "bulk_transfer.c",
- "bulk_buffer.c",
+ "bulk_transfer.c",
+ "bulk_buffer.c",
"bulk_channel.c",
- "bulk_endpoint.c",
- "control_channel.c",
- "bulk_pool.c",
+ "bulk_endpoint.c",
+ "control_channel.c",
+ "bulk_pool.c",
+ "backends/sm/flounder_helpers.c",
+ "backends/sm/ws_helpers.c",
+ "backends/sm/endpoint.c",
+ "backends/sm/channel.c",
+ "backends/sm/pool.c",
+ "backends/sm/buffers.c",
+ "backends/sm/pending_msg.c",
"backends/net/bulk_net_endpoint.c",
- "backends/net/bulk_net_channel.c",
+ "backends/net/bulk_net_e10k.c",
+ "backends/net/bulk_net_transfer.c",
+ "backends/net/bulk_net_proxy.c",
+ "backends/net/bulk_net_transparent.c",
+ "backends/net/bulk_net_no_copy.c",
+ "backends/net/stack_allocator.c",
"backends/local/control_channel.c"
],
flounderDefs = [ "bulk_ctrl" ],
- flounderBindings = [ "bulk_ctrl" ]
+ flounderBindings = [ "bulk_ctrl", "net_ports", "net_ARP", "e10k" ],
+ flounderExtraBindings = [ ("net_ports", ["rpcclient"]),
+ ("net_ARP", ["rpcclient"]) ],
+
+ mackerelDevices = [ "e10k", "e10k_q" ],
+ addLibraries = [ "pci" ]
+
}
]
#include <barrelfish/event_queue.h>
#include <string.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_allocator.h>
#include <bulk_transfer/bulk_local.h>
+#include "../../bulk_pool.h"
+#include "../../bulk_buffer.h"
+
#include "../../helpers.h"
+//#define IMPL_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg);
+#define IMPL_DEBUG(fmt, msg...)
+
+//#define EVENT_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg);
+#define EVENT_DEBUG(fmt, msg...)
+//#define EVENT_DEBUG_TRACE debug_printf("%s\n", __func__);
+#define EVENT_DEBUG_TRACE
-struct local_channel {
+struct local_channel
+{
struct bulk_channel *other;
- struct event_queue events;
+ struct event_queue events;
};
-struct local_event {
+struct local_event
+{
struct event_queue_node eqn;
- struct bulk_channel *channel;
- void (*handler)(struct local_event *);
+ struct bulk_channel *channel;
+ void (*handler)(struct local_event *);
+ struct bulk_continuation cont;
+ union
+ {
+ struct
+ {
+ errval_t err;
+ } status;
- union {
- struct {
+ struct
+ {
errval_t err;
} bind_done;
- struct {
+ struct
+ {
+ errval_t err;
struct bulk_pool *pool;
} pool_assigned;
- struct {
+ struct
+ {
+ struct bulk_pool_id pool_id;
+ size_t buf_id;
struct bulk_buffer *buffer;
- void *meta;
+ void *meta;
} move_received;
- struct {
+ struct
+ {
struct bulk_buffer *buffer;
- void *meta;
+ struct bulk_pool_id pool_id;
+ size_t buf_id;
+ void *meta;
} buffer_received;
- struct {
+ struct
+ {
+ struct bulk_pool_id pool_id;
+ size_t buf_id;
struct bulk_buffer *buffer;
- void *meta;
+ void *meta;
} copy_received;
- struct {
+ struct
+ {
+ struct bulk_pool_id pool_id;
+ size_t buf_id;
struct bulk_buffer *buffer;
} copy_released;
} params;
};
+static void event_handler(void *arg);
+
static errval_t impl_create(struct bulk_channel *channel);
-static errval_t impl_bind(struct bulk_channel *channel);
+static errval_t impl_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont);
static errval_t impl_assign_pool(struct bulk_channel *channel,
- struct bulk_pool *pool);
+ struct bulk_pool *pool,
+ struct bulk_continuation cont);
static errval_t impl_move(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont);
static errval_t impl_pass(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont);
static errval_t impl_copy(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont);
static errval_t impl_release(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
+ struct bulk_buffer *buffer,
struct bulk_continuation cont);
static struct bulk_implementation implementation = {
.channel_create = impl_create,
- .channel_bind = impl_bind,
- .assign_pool = impl_assign_pool,
- .move = impl_move,
- .pass = impl_pass,
- .copy = impl_copy,
- .release = impl_release,
-};
+ .channel_bind = impl_bind,
+ .assign_pool = impl_assign_pool,
+ .move = impl_move,
+ .pass = impl_pass,
+ .copy = impl_copy,
+    .release = impl_release,
+};
static errval_t init_channel(struct bulk_channel *channel)
{
return SYS_ERR_OK;
}
+/* ----------------------- event management --------------------------------*/
+
+/**
+ * allocates a new event
+ */
static errval_t event_alloc(struct bulk_channel *channel,
struct local_event **ev,
- void (*handler)(struct local_event *),
- size_t extra)
+ void (*handler)(struct local_event *),
+ size_t extra)
{
- *ev = malloc(sizeof(*ev) + extra);
+ *ev = malloc(sizeof(struct local_event) + extra);
if (*ev == NULL) {
return BULK_TRANSFER_MEM;
}
return SYS_ERR_OK;
}
-static void event_handler(void *arg)
-{
- struct local_event *lev = arg;
- lev->handler(lev);
- free(lev);
-}
-
+/**
+ * enqueues the local event to the event queue of the channel
+ */
static void event_enqueue(struct local_event *lev)
{
struct local_channel *local = lev->channel->impl_data;
- event_queue_add(&local->events, &lev->eqn,
- MKCLOSURE(event_handler, lev));
+ event_queue_add(&local->events, &lev->eqn, MKCLOSURE(event_handler, lev));
}
-static void event_bind_done(struct local_event *lev)
-{
- lev->channel->callbacks->bind_done(
- lev->channel,
- lev->params.bind_done.err);
-}
+/* ========================= EVENT HANDLERS ================================ */
-static void event_pool_assigned(struct local_event *lev)
-{
- lev->channel->callbacks->pool_assigned(
- lev->channel,
- lev->params.pool_assigned.pool);
-}
+/* ------------------------- event handlers -------------------------------- */
-static void event_move_received(struct local_event *lev)
+/**
+ *
+ */
+static void event_handler(void *arg)
{
- lev->channel->callbacks->move_received(
- lev->channel,
- lev->params.move_received.buffer,
- lev->params.move_received.meta);
+ struct local_event *lev = arg;
+ lev->handler(lev);
+ free(lev);
}
-static void event_buffer_received(struct local_event *lev)
+static void event_op_done(struct local_event *lev)
{
- lev->channel->callbacks->buffer_received(
- lev->channel,
- lev->params.buffer_received.buffer,
- lev->params.buffer_received.meta);
+ EVENT_DEBUG_TRACE
+ if (lev->cont.handler) {
+ lev->cont.handler(NULL, SYS_ERR_OK, lev->channel);
+ } else {
+ EVENT_DEBUG("event_op_done(): no handler set...\n");
+ }
+ free(lev);
}
-static void event_copy_received(struct local_event *lev)
-{
- lev->channel->callbacks->copy_received(
- lev->channel,
- lev->params.copy_received.buffer,
- lev->params.copy_received.meta);
-}
+/* -------------------------- event bind ----------------------------------- */
-static void event_copy_released(struct local_event *lev)
+/**
+ * Gets called when the binding procedure is over.
+ *
+ * Side: Binding Domain.
+ */
+static void event_bind_done(struct local_event *lev)
{
- lev->channel->callbacks->copy_released(
- lev->channel,
- lev->params.copy_released.buffer);
-}
-
+ EVENT_DEBUG_TRACE;
+ assert(lev);
+ if (lev->cont.handler == NULL) {
+ EVENT_DEBUG("%s", "handler not set");
+ return;
+ }
+ lev->cont.handler(lev->cont.arg, lev->params.bind_done.err, lev->channel);
+}
-static errval_t impl_create(struct bulk_channel *channel)
+/**
+ * Gets called when a bind request has been received
+ *
+ * Side: Creating Side
+ */
+static void event_bind_received(struct local_event *lev)
{
- return init_channel(channel);
+ EVENT_DEBUG_TRACE
+
+ errval_t err, reply = SYS_ERR_OK;
+ struct local_event *ev;
+ struct local_channel *l = lev->channel->impl_data;
+
+ /* do the callback to the application to inform about the binding */
+ if (lev->channel->callbacks->bind_received) {
+ err = lev->channel->callbacks->bind_received(lev->channel);
+ if (err_is_fail(err)) {
+ reply = err;
+ }
+ } else {
+ /* XXX: or if no cb set, just say SYS_ERR_OK ? */
+ reply = BULK_TRANSFER_NO_CALLBACK;
+ }
+
+ /* allocate and trigger event bind_done */
+ err = event_alloc(l->other, &ev, event_bind_done, 0);
+ if (!err_is_fail(err)) {
+ ev->params.bind_done.err = reply;
+ ev->cont = lev->cont;
+ event_enqueue(ev);
+ }
}
-static errval_t impl_bind(struct bulk_channel *channel)
+/**
+ * Implementation specific bind procedure
+ *
+ * Side: Binding Side
+ */
+static errval_t impl_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont)
{
errval_t err;
struct local_channel *l, *o_l;
struct bulk_local_endpoint *ep;
- struct local_event *ev, *o_ev;
+ struct local_event *ev;
+ /* Initialize the channel */
err = init_channel(channel);
if (err_is_fail(err)) {
return err;
}
+ /* setting the pointers to the other channel */
ep = (struct bulk_local_endpoint *) channel->ep;
l = channel->impl_data;
l->other = ep->other_channel;
o_l = l->other->impl_data;
o_l->other = channel;
- // Set channel parameters from other side
+ /* set channel parameters from the other side */
channel->role = bulk_role_other(l->other->role);
channel->direction = bulk_direction_other(l->other->role);
channel->meta_size = l->other->meta_size;
- // Allocate events
- err = event_alloc(channel, &ev, event_bind_done, 0);
- if (err_is_fail(err)) {
- goto error;
- }
- err = event_alloc(l->other, &o_ev, event_bind_done, 0);
+ /* update the channel state */
+ channel->state = BULK_STATE_CONNECTED;
+ l->other->state = BULK_STATE_CONNECTED;
+
+ /* allocate and trigger the bind event to the other side */
+ err = event_alloc(l->other, &ev, event_bind_received, 0);
if (err_is_fail(err)) {
- free(ev);
goto error;
}
- // Now we're sure that we can succeed
- channel->state = BULK_STATE_CONNECTED;
- l->other->state = BULK_STATE_CONNECTED;
-
// Trigger first event
- ev->params.bind_done.err = SYS_ERR_OK;
+ ev->cont = cont;
event_enqueue(ev);
- // Trigger second event
- o_ev->params.bind_done.err = SYS_ERR_OK;
- event_enqueue(o_ev);
return SYS_ERR_OK;
-error:
- free(l);
+error:
+    free(l);
+
return err;
}
-static errval_t impl_assign_pool(struct bulk_channel *channel,
- struct bulk_pool *pool)
+/* -------------------------- event pool assign ---------------------------- */
+
+/**
+ * Gets called when the pool assignment on the other side is completed
+ *
+ * Side: Assigning Side
+ */
+static void event_pool_assigned(struct local_event *lev)
{
- struct local_channel *l = channel->impl_data;
- struct local_event *ev;
+ EVENT_DEBUG_TRACE
+
+ errval_t err;
+ errval_t result = lev->params.pool_assigned.err;
+
+ if (lev->cont.handler) {
+ if (!err_is_fail(result)) {
+ err = bulk_pool_assign(lev->params.pool_assigned.pool,
+ lev->channel);
+ if (err_is_fail(err)) {
+ result = err;
+ }
+ }
+
+ EVENT_DEBUG(" > [%s]", (result==SYS_ERR_OK) ? "Success", "Failure");
+
+ /* call the continuation */
+ lev->cont.handler(lev->params.pool_assigned.pool, result, lev->channel);
+ } else {
+ EVENT_DEBUG("%s", "continuation handler not set");
+ }
+}
+
+/**
+ * Gets called when a pool is assigned to the channel
+ *
+ * Side: Other
+ */
+static void event_pool_assign(struct local_event *lev)
+{
+ EVENT_DEBUG_TRACE
+
errval_t err;
+ errval_t assigned;
+ struct local_event *ev;
+
+ struct bulk_pool *pool = lev->params.pool_assigned.pool;
+ struct bulk_channel *chan = lev->channel;
+ struct local_channel *l = chan->impl_data;
+
+ if (bulk_pool_is_assigned(pool, chan)) {
+ /* channel is already assigned */
+ EVENT_DEBUG("pool [%p] is already assigned to channel.", pool);
+ err = event_alloc(l->other, &ev, event_pool_assigned, 0);
+ if (!err_is_fail(err)) {
+ ev->params.pool_assigned.err = BULK_TRANSFER_POOL_ALREADY_ASSIGNED;
+ ev->cont = lev->cont;
+ event_enqueue(ev);
+ }
+ return;
+ }
+
+ /* allocate the structures for the pool */
+ err = bulk_pool_alloc_with_id(&pool, pool->num_buffers, pool->buffer_size,
+ pool->id);
+ if (err_is_fail(err)) {
+ USER_PANIC_ERR(err, "Failed to allocate pool struct\n");
+ return;
+ }
+
+ /*
+ * prepare the cap
+ */
+ if (lev->params.pool_assigned.pool->trust == BULK_TRUST_FULL) {
+ err = slot_alloc(&pool->pool_cap);
+ if (err_is_fail(err)) {
+ EVENT_DEBUG("could not allocate a new slot for the cap: %s",
+ err_getstring(err));
+ assigned = err;
+ goto done;
+ }
+
+ err = cap_copy(pool->pool_cap,
+ lev->params.pool_assigned.pool->pool_cap);
+ if (err_is_fail(err)) {
+ EVENT_DEBUG("could not allocate a new slot for the cap: %s",
+ err_getstring(err));
+ assigned = err;
+ goto done;
+ }
+ }
+
+ pool->trust = lev->params.pool_assigned.pool->trust;
+
+ /*
+ * XXX: we set the trust level to none here to avoid the creation of
+ * the buffer caps. These have already been created and cannot
+ * be created a second time. [SYS_ERR_REVOKE_FIRST]
+ */
+
+ err = bulk_pool_map(pool);
+ if (err_is_fail(err)) {
+ assigned = err;
+ goto done;
+ }
+
+ assert(lev->channel->callbacks->pool_assigned);
+ if (lev->channel->callbacks->pool_assigned) {
+ assigned = lev->channel->callbacks->pool_assigned(lev->channel, pool);
+ } else {
+ /* XXX: or if no cb set, just say SYS_ERR_OK ? */
+ assigned = BULK_TRANSFER_NO_CALLBACK;
+ }
+ done:
+
+ if (err_is_fail(assigned)) {
+ bulk_pool_unmap(pool);
+ bulk_pool_dealloc(pool);
+ } else {
+ err = bulk_pool_assign(pool, lev->channel);
+ if (err_is_fail(err)) {
+ USER_PANIC_ERR(err, "Failed to assign the pool to the channel\n");
+ }
+ }
err = event_alloc(l->other, &ev, event_pool_assigned, 0);
if (!err_is_fail(err)) {
+ ev->params.pool_assigned.err = assigned;
+ ev->params.pool_assigned.pool = lev->params.pool_assigned.pool;
+ ev->cont = lev->cont;
+ event_enqueue(ev);
+ }
+
+}
+
+/**
+ * Implementation specific handler for pool assignment requests
+ *
+ * Side: Assigning Side
+ */
+static errval_t impl_assign_pool(struct bulk_channel *channel,
+ struct bulk_pool *pool,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+ struct local_event *ev;
+ struct local_channel *l = channel->impl_data;
+
+ /* allocate and trigger the event */
+ err = event_alloc(l->other, &ev, event_pool_assign, 0);
+ if (!err_is_fail(err)) {
ev->params.pool_assigned.pool = pool;
+ ev->cont = cont;
event_enqueue(ev);
}
return err;
}
+/* -------------------------- event buffer move ---------------------------- */
+
+/**
+ * Gets called when a buffer arrives via move operation
+ *
+ * Side: Receiving Side (SINK)
+ */
+static void event_move_received(struct local_event *lev)
+{
+ errval_t err;
+
+ struct bulk_pool *pool = bulk_pool_get(&lev->params.move_received.pool_id,
+ lev->channel);
+    size_t bufid = lev->params.move_received.buf_id;
+
+ EVENT_DEBUG(" > pool=%p, bufid=%x", pool, (unsigned int )bufid);
+ assert(pool);
+
+ struct bulk_buffer *buf = pool->buffers[bufid];
+
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ if (err_is_fail(err)) {
+ USER_PANIC_ERR(err, "could not change the state of the buffer.");
+ }
+
+ if (lev->channel->callbacks->move_received) {
+ lev->channel->callbacks->move_received(lev->channel, buf,
+ lev->params.move_received.meta);
+ }
+}
+
+/**
+ * Implementation specific handler of the buffer move operation
+ *
+ * Side: Sending Side (SOURCE)
+ */
static errval_t impl_move(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont)
{
+ errval_t err;
+ struct local_event *ev, *ev2;
+
struct local_channel *l = channel->impl_data;
- struct local_event *ev;
+
void *m;
- errval_t err;
- err = event_alloc(l->other, &ev, event_move_received, channel->meta_size);
+ IMPL_DEBUG(" > buffer=%p", buffer->address);
+
+ /* trigger event to other channel */
+ size_t meta_size = 0;
+ if (meta) {
+ meta_size = channel->meta_size;
+ }
+ err = event_alloc(l->other, &ev, event_move_received, meta_size);
if (!err_is_fail(err)) {
- m = ev + 1;
- memcpy(m, meta, channel->meta_size);
+ ev->params.move_received.meta = NULL;
+ if (meta) {
+ /* copy the meta data */
+ m = ev + 1;
+ memcpy(m, meta, channel->meta_size);
+ ev->params.move_received.meta = m;
+ }
+ /* set parameters of the event */
+ ev->params.move_received.pool_id = buffer->pool->id;
+ ev->params.move_received.buf_id = ((lvaddr_t) buffer->address
+ - buffer->pool->base_address)
+ / buffer->pool->buffer_size;
ev->params.move_received.buffer = buffer;
- ev->params.move_received.meta = m;
event_enqueue(ev);
}
+
+ /* trigger operation done event to this channel */
+ err = event_alloc(channel, &ev2, event_op_done, 0);
+ if (!err_is_fail(err)) {
+ ev2->cont = cont;
+ event_op_done(ev2);
+ }
+
return err;
}
+/* -------------------------- event buffer pass ---------------------------- */
+
+/**
+ * Gets called when a buffer pass event occurs on the sending side
+ *
+ * Side: Sending Side (SOURCE)
+ */
+static void event_buffer_received(struct local_event *lev)
+{
+ errval_t err;
+
+ struct bulk_pool *pool = bulk_pool_get(&lev->params.buffer_received.pool_id,
+ lev->channel);
+ assert(pool);
+
+ size_t bufid = lev->params.buffer_received.buf_id;
+ struct bulk_buffer *buf = pool->buffers[bufid];
+
+ assert(bufid < pool->num_buffers);
+
+ EVENT_DEBUG(" > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid);
+
+ /* we need to change the state of the buffer */
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ if (err_is_fail(err)) {
+ /* TODO: error handling */
+ USER_PANIC_ERR(err, "could not change the state of the buffer.");
+ }
+
+ /* inform the application */
+ if (lev->channel->callbacks->buffer_received) {
+ lev->channel->callbacks->buffer_received(
+ lev->channel, buf, lev->params.buffer_received.meta);
+ }
+}
+
+/**
+ * Backend specific handler for buffer pass operations
+ *
+ * Side: Receiving Side (SINK)
+ */
static errval_t impl_pass(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont)
{
- struct local_channel *l = channel->impl_data;
- struct local_event *ev;
- void *m;
errval_t err;
+ struct local_event *ev, *ev2;
+ void *m;
+ struct local_channel *l = channel->impl_data;
+
+ IMPL_DEBUG(" > buffer=%p", buffer->address);
- err = event_alloc(l->other, &ev, event_buffer_received, channel->meta_size);
+ size_t meta_size = 0;
+ if (meta) {
+ meta_size = channel->meta_size;
+ }
+ /* allocate and trigger event */
+ err = event_alloc(l->other, &ev, event_buffer_received, meta_size);
if (!err_is_fail(err)) {
- m = ev + 1;
- memcpy(m, meta, channel->meta_size);
+ ev->params.buffer_received.meta = NULL;
+
+ if (meta) {
+ /* copy meta data */
+ m = ev + 1;
+ memcpy(m, meta, channel->meta_size);
+ ev->params.buffer_received.meta = m;
+
+ }
+ /* set event params */
+ ev->params.buffer_received.pool_id = buffer->pool->id;
+ ev->params.buffer_received.buf_id = ((lvaddr_t) buffer->address
+ - buffer->pool->base_address)
+ / buffer->pool->buffer_size;
ev->params.buffer_received.buffer = buffer;
- ev->params.buffer_received.meta = m;
event_enqueue(ev);
}
+
+ /* trigger op done event */
+ err = event_alloc(channel, &ev2, event_op_done, 0);
+ if (!err_is_fail(err)) {
+ ev2->cont = cont;
+ event_op_done(ev2);
+ }
return err;
}
+/* -------------------------- event buffer copy ---------------------------- */
+
+static void event_copy_received(struct local_event *lev)
+{
+ struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_received.pool_id,
+ lev->channel);
+    size_t bufid = lev->params.copy_received.buf_id;
+ EVENT_DEBUG(" > pool=%p, bufid=%x", pool, (unsigned int )bufid);
+ assert(pool);
+
+ struct bulk_buffer *buf = pool->buffers[bufid];
+
+ errval_t err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_ONLY);
+ if (err_is_fail(err)) {
+ /* TODO: error handling */
+ USER_PANIC_ERR(err, "failed to change the state");
+ }
+ if (lev->channel->callbacks->copy_received) {
+ lev->channel->callbacks->copy_received(lev->channel, buf,
+ lev->params.copy_received.meta);
+
+ }
+}
+
static errval_t impl_copy(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
+ struct bulk_buffer *buffer,
+ void *meta,
struct bulk_continuation cont)
{
+ IMPL_DEBUG(" > buffer=%p", buffer->address);
struct local_channel *l = channel->impl_data;
- struct local_event *ev;
+ struct local_event *ev, *ev2;
void *m;
errval_t err;
-
- err = event_alloc(l->other, &ev, event_copy_received, channel->meta_size);
+ size_t meta_size = 0;
+ if (meta) {
+ meta_size = channel->meta_size;
+ }
+ err = event_alloc(l->other, &ev, event_copy_received, meta_size);
if (!err_is_fail(err)) {
- m = ev + 1;
- memcpy(m, meta, channel->meta_size);
+ ev->params.copy_received.meta = NULL;
+ if (meta) {
+ m = ev + 1;
+ memcpy(m, meta, channel->meta_size);
+ ev->params.copy_received.meta = m;
+ }
ev->params.copy_received.buffer = buffer;
- ev->params.copy_received.meta = m;
+ ev->params.copy_received.pool_id = buffer->pool->id;
+ ev->params.copy_received.buf_id = ((lvaddr_t) buffer->address
+ - buffer->pool->base_address)
+ / buffer->pool->buffer_size;
event_enqueue(ev);
}
+
+ /* trigger op done event */
+ err = event_alloc(channel, &ev2, event_op_done, 0);
+ if (!err_is_fail(err)) {
+ ev2->cont = cont;
+ event_op_done(ev2);
+ }
return err;
}
+/* -------------------------- event copy release --------------------------- */
+
+/**
+ * Gets called when a copy release event occurred
+ *
+ * Side: Sending Side (SOURCE)
+ */
+static void event_copy_released(struct local_event *lev)
+{
+ errval_t err;
+
+ struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_released.pool_id,
+ lev->channel);
+ assert(pool);
+
+ size_t bufid = lev->params.copy_released.buf_id;
+ struct bulk_buffer *buf = pool->buffers[bufid];
+
+ buf->local_ref_count--;
+
+ EVENT_DEBUG(" > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid);
+
+ /* change the state of the buffer */
+ if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) {
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ if (err_is_fail(err)) {
+ USER_PANIC_ERR(err, "failed to change the state");
+ }
+ }
+
+ /* inform the application */
+ if (lev->channel->callbacks->copy_released) {
+ lev->channel->callbacks->copy_released(lev->channel, buf);
+ }
+}
+
+/**
+ * Implementation specific handler of the buffer release operation
+ */
static errval_t impl_release(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
+ struct bulk_buffer *buffer,
struct bulk_continuation cont)
{
- struct local_channel *l = channel->impl_data;
- struct local_event *ev;
+ struct local_event *ev, *ev2;
errval_t err;
+ struct local_channel *l = channel->impl_data;
+
+ IMPL_DEBUG(" > buffer=%p", buffer->address);
+ /* allocate and trigger event */
err = event_alloc(l->other, &ev, event_copy_released, 0);
if (!err_is_fail(err)) {
ev->params.copy_released.buffer = buffer;
+ ev->params.copy_released.pool_id = buffer->pool->id;
+ ev->params.copy_released.buf_id = ((lvaddr_t) buffer->address
+ - buffer->pool->base_address)
+ / buffer->pool->buffer_size;
+
event_enqueue(ev);
}
+
+ /* trigger op done event */
+ err = event_alloc(channel, &ev2, event_op_done, 0);
+ if (!err_is_fail(err)) {
+ ev2->cont = cont;
+ event_op_done(ev2);
+ }
return err;
}
+/* -------------------------- channel creation ----------------------------- */
+/**
+ * Implementation specific handler for channel creation
+ */
+static errval_t impl_create(struct bulk_channel *channel)
+{
+ return init_channel(channel);
+}
+
+/**
+ * initializes the local endpoint
+ */
void bulk_local_init_endpoint(struct bulk_local_endpoint *endpoint,
- struct bulk_channel *other_channel)
+ struct bulk_channel *other_channel)
{
endpoint->generic.f = &implementation;
endpoint->other_channel = other_channel;
* ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
*/
-struct bulk_backend_vtbl {
+#ifndef BULK_NET_BACKEND_H
+#define BULK_NET_BACKEND_H
+#include <if/e10k_defs.h>
+#include <dev/e10k_dev.h>
+#include <bulk_transfer/bulk_net_proxy.h>
+#include "e10k_queue.h"
+
+
+/*
+ * DEFINES FOR DEBUG OUTPUT
+ */
+/// enables/disables the entire debug outputs of the bulk net backend
+#define BULK_NET_ENABLE_DEBUG 1
+
+/// enables/disables status messages
+#define BULK_NET_ENABLE_STATUS 1
+
+/// enables/disables the tracing debug output
+#define BULK_NET_ENABLE_TRACE 0
+
+/// enables/disables the debug output for the e10k module
+#define BULK_NET_ENABLE_DEBUG_E10K 0
+#define BULK_NET_ENABLE_STATUS_E10K 0
+
+/// enables/disables the debug output for the transfer module
+#define BULK_NET_ENABLE_DEBUG_TRANSF 0
+#define BULK_NET_ENABLE_STATUS_TRANSF 0
+
+/// enables/disables the debug output for the backend module
+#define BULK_NET_ENABLE_DEBUG_BACKEND 1
+#define BULK_NET_ENABLE_STATUS_BACKEND 1
+
+#if BULK_NET_ENABLE_DEBUG
+#define BULK_NET_DEBUG(fmt, msg...) debug_printf("%s(): "fmt"\n", __func__, msg);
+#else
+#define BULK_NET_DEBUG(x...) do {} while(0);
+#endif
+
+#if BULK_NET_ENABLE_STATUS
+#define BULK_NET_STATUS(fmt, msg...) debug_printf("%s(): "fmt"\n", __func__, msg);
+#else
+#define BULK_NET_STATUS(x...) do {} while(0);
+#endif
+
+#if BULK_NET_ENABLE_DEBUG && BULK_NET_ENABLE_TRACE
+#define BULK_NET_TRACE debug_printf("%s\n", __func__);
+#else
+#define BULK_NET_TRACE do{} while(0);
+#endif
+
+
+/*
+ * the following values are used in the endpoint creation
+ */
+#define BULK_NET_DEFAULT_BUFFER_SIZE 0x1000
+#define BULK_NET_DEFAULT_BUFFER_COUNT 0xFF
+#define BULK_NET_NOCOPY_SPARE_BUFFERS 1.5
+#define BULK_NET_DEFAULT_QUEUES 2
+
+#define BULK_NET_NOCOPY_META_BUFFER_SIZE 512
+
+#define BULK_NET_TRANSFER_NUM_DESCS 1024
+
+#define BULK_NET_TRANSFER_DESCLEN 4
+
+#define BULK_NET_INTERNAL_BUFER_SIZE 512
+
+struct bulk_implementation *bulk_net_get_impl(void);
+struct bulk_implementation *bulk_net_get_impl_no_copy(void);
+
+
+
+/// switch to turn on message dumping
+#define DO_MSG_DUMP 0
+
+
+#define BULK_NET_DESCLEN 4
+
+struct receive_buffer {
+ void *hdr_virt;
+ uintptr_t hdr_phys;
+
+ void *virt;
+ uintptr_t phys;
+
+ struct bulk_buffer *buffer;
+ bool is_meta;
};
+#define INT_BUFSZ 512
+struct transmit_buffer {
+ void *hdr_virt;
+ uintptr_t hdr_phys;
+
+ bool is_copy;
+ struct bulk_buffer *buffer;
+ struct bulk_continuation cont;
+
+ void *int_virt;
+ uintptr_t int_phys;
+};
+
+
+struct packet_header {
+ struct {
+ uint8_t dmac[6];
+ uint8_t smac[6];
+ uint16_t type;
+ } __attribute__((packed)) l2;
+ struct {
+ uint8_t ver_ihl;
+ uint8_t dscp;
+ uint16_t len;
+ uint16_t id;
+ uint16_t offset;
+ uint8_t ttl;
+ uint8_t proto;
+ uint16_t checksum;
+ uint32_t s_ip;
+ uint32_t d_ip;
+ } __attribute__((packed)) l3;
+ struct {
+ uint16_t s_port;
+ uint16_t d_port;
+ uint16_t len;
+ uint16_t checksum;
+ } __attribute__((packed)) l4;
+} __attribute__((packed));
+
+
+/**
+ * Descriptor for passing around buffer chains with a reasonable length. Note
+ * that only the parts up to the first one with size == 0 are considered.
+ */
+struct bulk_net_msgdesc {
+ struct {
+ uint64_t phys;
+ size_t size;
+ void *opaque;
+ } parts[BULK_NET_DESCLEN];
+};
+
+
+#define E10K_HDRSZ 128
+#define E10K_DESCSZ (sizeof(e10k_q_tdesc_adv_wb_array_t))
+
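A short sketch of building such a chain; the physical addresses and the transmit-buffer pointer are placeholders. Zeroing the descriptor first guarantees that the consumer stops at the first entry with size == 0.

struct bulk_net_msgdesc desc;
memset(&desc, 0, sizeof(desc));
desc.parts[0].phys   = tb->hdr_phys;               /* packet header (assumed) */
desc.parts[0].size   = E10K_HDRSZ;
desc.parts[0].opaque = tb;
desc.parts[1].phys   = buffer->phys;               /* payload buffer */
desc.parts[1].size   = buffer->pool->buffer_size;
desc.parts[1].opaque = buffer;
/* parts[2..] remain zero -> end of chain */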
+
+void stack_alloc_init(struct stack_allocator *alloc, size_t size);
+bool stack_alloc_free(struct stack_allocator *alloc, void *el);
+void *stack_alloc_alloc(struct stack_allocator *alloc);
+
+
+
+/******************************************************************************/
+/* e10k direct access channel */
+
+/**
+ * Initialize directly mapped RX/TX queue pair with e10k NIC.
+ *
+ * @param bu Channel struct
+ * @param ws Waitset
+ * @param card Card name
+ * @param queue Queue ID to use
+ * @param buffer_size Size of receive buffers in bytes
+ * @param ring_size Number of descriptors in the RX/TX rings
+ * @param received Callback for a received packet
+ * @param transmitted Callback for a transmitted packet
+ */
+errval_t bulk_e10k_init(struct bulk_e10k *bu,
+ struct waitset *ws,
+ const char *card,
+ uint8_t queue,
+ size_t buffer_size,
+ size_t ring_size,
+ void (*received)(struct bulk_e10k *,
+ struct bulk_net_msgdesc *),
+ void (*transmitted)(struct bulk_e10k *, void *));
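+
+/*
+ * Illustrative call sequence (ws, queue_id, my_rx_cb and my_tx_cb are
+ * placeholders):
+ *
+ *   struct bulk_e10k bu;
+ *   err = bulk_e10k_init(&bu, ws, "e10k", queue_id, 0x1000, 512,
+ *                        my_rx_cb, my_tx_cb);
+ *   // afterwards, post buffers with bulk_e10k_rx_add() and transmit chains
+ *   // with bulk_e10k_send()
+ */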
+
+/**
+ * Add a buffer to the receive queue.
+ *
+ * @param bu Channel struct
+ * @param phys Physical address of buffer
+ * @param header Physical address of header buffer (needs E10K_HDRSZ bytes)
+ * @param opaque      User data for this buffer; it is handed back when the
+ *                    buffer is used for a received packet.
+ */
+errval_t bulk_e10k_rx_add(struct bulk_e10k *bu, uint64_t phys, uint64_t header,
+ void *opaque);
+
+/**
+ * Send out a packet.
+ *
+ * @param bu Channel struct
+ * @param decs Descriptor for buffer chain to transmit
+ */
+errval_t bulk_e10k_send(struct bulk_e10k *bu, struct bulk_net_msgdesc *desc);
+
+
+/**
+ * Steer a specific UDP port to this queue.
+ *
+ * @param bu Channel struct
+ * @param port Port to allocate (in host byte order)
+ */
+errval_t bulk_e10k_port_add(struct bulk_e10k *bu, uint16_t port);
+
+/**
+ * Allocate an unused UDP port and steer it to this queue.
+ *
+ * @param bu Channel struct
+ * @param port Pointer to variable where port number will be stored (host byte
+ * order)
+ */
+errval_t bulk_e10k_port_alloc(struct bulk_e10k *bu, uint16_t *port);
+
+/**
+ * Get IP address configured for this interface.
+ *
+ * @param bu Channel struct
+ * @param ip Pointer to variable where IP will be stored (host byte order)
+ */
+errval_t bulk_e10k_ip_info(struct bulk_e10k *bu, uint32_t *ip);
+
+/**
+ * Do an ARP lookup on this interface
+ *
+ * @param bu           Channel struct
+ * @param ip IP address to resolve (in host byte order)
+ * @param mac Pointer to variable where MAC address will be stored
+ */
+errval_t bulk_e10k_arp_lookup(struct bulk_e10k *bu, uint32_t ip, uint64_t *mac);
+
+/******************************************************************************/
+
+
+/** Allocate and map a frame */
+static inline errval_t allocmap_frame(size_t size, void **virt, uintptr_t *phys,
+ struct capref *cap)
+{
+ errval_t err;
+ struct frame_identity fid = { 0, 0 };
+ struct capref c;
+
+
+ err = frame_alloc(&c, size, NULL);
+ assert(err_is_ok(err));
+
+ if (phys) {
+ invoke_frame_identify(c, &fid);
+ *phys = fid.base;
+ }
+
+ err = vspace_map_one_frame_attr(virt, size, c,
+ VREGION_FLAGS_READ_WRITE, NULL, NULL);
+
+ if (cap != NULL) {
+ *cap = c;
+ }
+ return err;
+
+}
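+
+/*
+ * Example (illustrative): allocate and map a 4 KiB DMA-able frame.
+ *
+ *   void *virt;
+ *   uintptr_t phys;
+ *   errval_t err = allocmap_frame(0x1000, &virt, &phys, NULL);
+ */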
+#endif /* BULK_NET_BACKEND_H */
+++ /dev/null
-/**
- * \file
- * \brief Unidirectional bulk data transfer via shared memory
- */
-
-/*
- * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
- * All rights reserved.
- *
- * This file is distributed under the terms in the attached LICENSE file.
- * If you do not find this file, copies can be found by writing to:
- * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
- */
-
-#include <barrelfish/barrelfish.h>
-
-#include <bulk_transfer/bulk_transfer.h>
-#include <bulk_transfer/bulk_net.h>
-
-
+++ /dev/null
-/**
- * \file
- * \brief Unidirectional bulk data transfer via shared memory
- */
-
-/*
- * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
- * All rights reserved.
- *
- * This file is distributed under the terms in the attached LICENSE file.
- * If you do not find this file, copies can be found by writing to:
- * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
- */
-
-#include <barrelfish/barrelfish.h>
-
-#include <bulk_transfer/bulk_transfer.h>
-#include <bulk_transfer/bulk_net.h>
-
-
-errval_t bulk_net_channel_create(struct bulk_channel *channel)
-{
- assert(!"NYI: bulk_net_channel_create");
-
- /*
- * TODO:
- * - initialize the channel struct
- * - initialize the network queues
- *
- * bool lwip_init(const char *card_name, uint64_t queueid)
- * -> does this has to be done once per domain or for every channel?
- *
- * struct tcp_pcb *pcb = tcp_new();
- * // err_t e = tcp_bind(pcb, IP_ADDR_ANY, (HTTP_PORT + disp_get_core_id()));
- * err_t e = tcp_bind(pcb, IP_ADDR_ANY, HTTP_PORT);
- * assert(e == ERR_OK);
- * pcb = tcp_listen(pcb);
- * assert(pcb != NULL);
- * tcp_arg(pcb, pcb);
- * tcp_accept(pcb, http_server_accept);
- * - start listening on the queue
- */
- return SYS_ERR_OK;
-}
-
-errval_t bulk_net_channel_bind(struct bulk_channel *channel)
-{
- assert(!"NYI: bulk_net_channel_bind");
-
- /*
- * TODO:
- * - initialize the channel struct
- * - initialize local queues
- * struct tcp_pcb *pcb = tcp_new();
- * tcp_connect();
- * tcp_write...
- * - connect to the network server
- */
-
- return SYS_ERR_OK;
-}
-
-errval_t bulk_net_channel_assign_pool(struct bulk_channel *channel,
- struct bulk_pool *pool)
-{
- assert(!"NYI: bulk_net_channel_assign_pool");
-
- /*
- * TODO:
- * - send control message with pool information to the other side
- */
-
- return SYS_ERR_OK;
-}
-
-errval_t bulk_net_channel_move(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont)
-{
- assert(!"NYI: bulk_net_channel_move");
-
-
-
- /**
- *
- * TODO:
- * - prepend meta data / header
- * - enqueue buffer on send queue
- * - register sent callback
- * - if (owner then set to read/write))
- * - free up buffer and hand it back to the pool
- */
-
- return SYS_ERR_OK;
-}
-
-/**
- *
- */
-errval_t bulk_net_channel_pass(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont)
-{
- /* this is a no-op over the network hop,
- * just do a buffer_release at this point */
- return bulk_net_channel_release(channel, buffer, cont);
-}
-
-
-errval_t bulk_net_channel_copy(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- void *meta,
- struct bulk_continuation cont)
-{
- assert(!"NYI: bulk_net_channel_copy");
-
- /*
- * TODO:
- * - prepend meta data / header
- * - enqueue buffer on send queue
- * - register sent callback
- * - if (owner then set to read/write))
- */
-
- return SYS_ERR_OK;
-}
-
-errval_t bulk_net_channel_release(struct bulk_channel *channel,
- struct bulk_buffer *buffer,
- struct bulk_continuation cont)
-{
- /* this is a no-op over the network hop */
- return SYS_ERR_OK;
-}
-
-struct bulk_implementation bulk_net_implementation = {
- .channel_create = bulk_net_channel_create,
- .channel_bind = bulk_net_channel_bind,
- .assign_pool = bulk_net_channel_assign_pool,
- .move = bulk_net_channel_move,
- .pass = bulk_net_channel_pass,
- .copy = bulk_net_channel_copy,
- .release = bulk_net_channel_release
-};
-
-struct bulk_implementation *bulk_net_get_implementation(void) {
- return &bulk_net_implementation;
-}
+++ /dev/null
-/**
- * \file
- * \brief Unidirectional bulk data transfer via shared memory
- */
-
-/*
- * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
- * All rights reserved.
- *
- * This file is distributed under the terms in the attached LICENSE file.
- * If you do not find this file, copies can be found by writing to:
- * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
- */
-
-#include <barrelfish/barrelfish.h>
-
-#include <bulk_transfer/bulk_transfer.h>
-#include <bulk_transfer/bulk_net.h>
-
-
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <stddef.h>
+
+#include <barrelfish/barrelfish.h>
+#include <barrelfish/nameservice_client.h>
+#include <barrelfish/threads.h>
+#include <barrelfish/waitset_chan.h>
+#include <ipv4/lwip/inet.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <pci/pci.h>
+
+#include <if/net_ports_defs.h>
+#include <if/net_ports_rpcclient_defs.h>
+#include <if/net_ARP_defs.h>
+#include <if/net_ARP_rpcclient_defs.h>
+#include <if/e10k_defs.h>
+
+#include <dev/e10k_dev.h>
+#include <dev/e10k_q_dev.h>
+
+#include "bulk_net_backend.h"
+
+#define E10K_MNG_SUF "_e10kmng"
+
+#define ETHHDR_LEN 14
+#define IPHDR_LEN 20
+
+
+#if BULK_NET_ENABLE_DEBUG && BULK_NET_ENABLE_DEBUG_E10K
+#define DEBUG(x...) debug_printf("e10k: " x)
+#if BULK_NET_ENABLE_TRACE
+#define BULK_NET_ENABLE_E10K_TRACE 1
+#endif
+#else
+#define DEBUG(x...)
+#endif
+
+#define USE_INTERRUPTS 0
+#define USE_WSPOLL 1
+
+struct e10k_rx_event {
+ struct bulk_e10k *bu;
+ struct bulk_net_msgdesc msg;
+ struct event_queue_node eqn;
+};
+
+struct e10k_tx_event {
+ struct bulk_e10k *bu;
+ void *op;
+ struct event_queue_node eqn;
+};
+
+
+static struct net_ports_rpc_client net_ports_rpc;
+static bool net_ports_connected = false;
+
+static struct net_ARP_rpc_client net_arp_rpc;
+static bool net_arp_connected = false;
+
+static errval_t update_rxtail(void *opaque, size_t tail);
+static errval_t update_txtail(void *opaque, size_t tail);
+#if USE_INTERRUPTS
+static void interrupt_handler(void *arg);
+#endif
+#if USE_WSPOLL
+void bulk_e10k_poll(struct waitset_chanstate *chan);
+#endif
+
+/* Declarations for e10k flounder interface */
+static void idc_request_device_info(struct bulk_e10k *bu);
+static void idc_register_queue_memory(struct bulk_e10k *bu);
+static void idc_queue_init_data(struct e10k_binding *b, struct capref registers,
+ uint64_t macaddr);
+static void idc_queue_memory_registered(struct e10k_binding *b);
+static void idc_write_queue_tails(struct e10k_binding *b);
+
+static struct e10k_rx_vtbl rx_vtbl = {
+ .queue_init_data = idc_queue_init_data,
+ .queue_memory_registered = idc_queue_memory_registered,
+ .write_queue_tails = idc_write_queue_tails,
+};
+
+
+/*****************************************************************************/
+/* Port manager client */
+
+/** Bind specific port to queue */
+static errval_t port_bind(uint64_t b_rx, uint64_t b_tx, uint64_t q,
+ uint16_t port)
+{
+ errval_t err, msgerr;
+
+ err = net_ports_rpc.vtbl.bind_port(&net_ports_rpc, net_ports_PORT_UDP, port,
+ b_rx, b_tx, 0, q, &msgerr);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ return msgerr;
+}
+
+/** Get any free port and bind it to the queue */
+static errval_t port_get(uint64_t b_rx, uint64_t b_tx, uint64_t q,
+ uint16_t *port)
+{
+ errval_t err, msgerr;
+
+ err = net_ports_rpc.vtbl.get_port(&net_ports_rpc, net_ports_PORT_UDP,
+ b_rx, b_tx, 0, q, &msgerr, port);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ return msgerr;
+}
+
+static void p_bind_cb(void *st, errval_t err, struct net_ports_binding *b)
+{
+ assert(err_is_ok(err));
+ err = net_ports_rpc_client_init(&net_ports_rpc, b);
+ assert(err_is_ok(err));
+ net_ports_connected = true;
+}
+
+/** Bind to ports service (currently blocking) */
+static void bind_ports(struct waitset *ws)
+{
+ errval_t err;
+ iref_t iref;
+
+ DEBUG("bind_ports()\n");
+ err = nameservice_blocking_lookup("e10k_PORTS_MNG", &iref);
+ assert(err_is_ok(err));
+ DEBUG("resolved\n");
+
+ err = net_ports_bind(iref, p_bind_cb, NULL, ws, IDC_BIND_FLAGS_DEFAULT);
+ assert(err_is_ok(err));
+ DEBUG("binding initiated\n");
+
+ while (!net_ports_connected) {
+ event_dispatch_non_block(ws);
+ event_dispatch_non_block(get_default_waitset());
+ }
+ DEBUG("bound_ports\n");
+}
+
+
+/*****************************************************************************/
+/* ARP service client */
+
+/** Get information about the local TCP/IP configuration */
+static errval_t arp_ip_info(uint32_t *ip, uint32_t *gw, uint32_t *mask)
+{
+ errval_t err, msgerr;
+
+ err = net_arp_rpc.vtbl.ip_info(&net_arp_rpc, 0, &msgerr, ip, gw, mask);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ return msgerr;
+}
+
+/** Do an ARP lookup of an IP address */
+static errval_t arp_lookup(uint32_t ip, uint64_t *mac)
+{
+ errval_t err, msgerr;
+
+ err = net_arp_rpc.vtbl.ARP_lookup(&net_arp_rpc, ip, 0, true, &msgerr, mac);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ return msgerr;
+}
+
+static void a_bind_cb(void *st, errval_t err, struct net_ARP_binding *b)
+{
+ assert(err_is_ok(err));
+ err = net_ARP_rpc_client_init(&net_arp_rpc, b);
+ assert(err_is_ok(err));
+ net_arp_connected = true;
+}
+
+/** Bind to ARP service (currently blocking) */
+static void bind_arp(struct waitset *ws)
+{
+ errval_t err;
+ iref_t iref;
+
+ DEBUG("bind_arp()\n");
+ err = nameservice_blocking_lookup("e10k_ARP", &iref);
+ assert(err_is_ok(err));
+ DEBUG("resolved\n");
+
+ err = net_ARP_bind(iref, a_bind_cb, NULL, ws, IDC_BIND_FLAGS_DEFAULT);
+ assert(err_is_ok(err));
+ DEBUG("binding initiated\n");
+
+ while (!net_arp_connected) {
+ event_dispatch_non_block(ws);
+ event_dispatch_non_block(get_default_waitset());
+ }
+ DEBUG("bound_arp\n");
+}
+
+
+/******************************************************************************/
+/* e10k card driver interface */
+
+/** e10k interface: callback for a successful binding */
+static void bind_cb(void *st, errval_t err, struct e10k_binding *b)
+{
+ DEBUG("bind_cb()\n");
+ struct bulk_e10k *bu = st;
+
+ assert(err_is_ok(err));
+
+ b->rx_vtbl = rx_vtbl;
+ b->st = bu;
+ bu->binding = b;
+
+ idc_request_device_info(bu);
+}
+
+
+/** e10k interface: Send request for device information */
+static void idc_request_device_info(struct bulk_e10k *bu)
+{
+ errval_t err;
+
+ DEBUG("idc_request_device_info()\n");
+
+ err = e10k_request_device_info__tx(bu->binding, NOP_CONT);
+ assert(err_is_ok(err));
+
+}
+
+/** e10k interface: Register memory for descriptor rings */
+static void idc_register_queue_memory(struct bulk_e10k *bu)
+{
+ errval_t r;
+ DEBUG("idc_register_queue_memory()\n");
+
+ r = e10k_register_queue_memory__tx(bu->binding, NOP_CONT, bu->qi,
+ bu->txframe, bu->txhwbframe, bu->rxframe, bu->buffer_size, E10K_HDRSZ,
+ bu->int_vector, bu->int_core, USE_INTERRUPTS, false);
+ assert(err_is_ok(r));
+}
+
+/** e10k interface: Callback for request device info */
+static void idc_queue_init_data(struct e10k_binding *b, struct capref registers,
+ uint64_t macaddr)
+{
+ DEBUG("idc_queue_init_data()\n");
+
+ errval_t err;
+ struct bulk_e10k *bu = b->st;
+ struct frame_identity fid = { .base = 0, .bits = 0 };
+ void *virt, *rx, *tx, *txhwb;
+ uint8_t core;
+ struct e10k_queue_ops ops = {
+ .update_txtail = update_txtail,
+ .update_rxtail = update_rxtail
+ };
+
+ bu->mac = macaddr;
+
+ // Map registers
+ invoke_frame_identify(registers, &fid);
+ err = vspace_map_one_frame_attr(&virt, 1 << fid.bits, registers,
+ VREGION_FLAGS_READ_WRITE_NOCACHE, NULL, NULL);
+ assert(err_is_ok(err));
+
+ // Initialize mackerel device (must only be used for queue index register)
+ e10k_initialize(&bu->d, virt);
+
+ // Allocate and initialize memory for queues
+ err = allocmap_frame(bu->ring_size * E10K_DESCSZ, &rx, NULL, &bu->rxframe);
+ assert(err_is_ok(err));
+ err = allocmap_frame(bu->ring_size * E10K_DESCSZ, &tx, NULL, &bu->txframe);
+ assert(err_is_ok(err));
+ err = allocmap_frame(0x1000, &txhwb, NULL, &bu->txhwbframe);
+ assert(err_is_ok(err));
+
+ bu->q = e10k_queue_init(tx, bu->ring_size, txhwb, rx, bu->ring_size,
+ &ops, bu);
+
+ // Setup interrupt
+#if USE_INTERRUPTS
+ err = pci_setup_inthandler(interrupt_handler, bu, &bu->int_vector);
+ assert(err_is_ok(err));
+ bu->int_core = disp_get_core_id();
+
+#endif
+
+ DEBUG("idc_queue_init_data: done\n");
+
+ // Register ring memory with driver
+ core = disp_get_core_id();
+ idc_register_queue_memory(bu);
+}
+
+/** e10k interface: Callback for register queue memory */
+static void idc_queue_memory_registered(struct e10k_binding *b)
+{
+ struct bulk_e10k *bu = b->st;
+ DEBUG("idc_queue_memory_registered()\n");
+
+ bu->ready = true;
+}
+
+/**
+ * e10k interface: Callback for writing out queue tails (needed in case of card
+ * hangs)
+ */
+static void idc_write_queue_tails(struct e10k_binding *b)
+{
+ struct bulk_e10k *bu = b->st;
+ DEBUG("idc_write_queue_tails()\n");
+ e10k_queue_bump_rxtail(bu->q);
+ e10k_queue_bump_txtail(bu->q);
+}
+
+
+/*****************************************************************************/
+/* e10k queue management */
+
+static void recv_event(void *arg)
+{
+ DEBUG("recv_event\n");
+ struct e10k_rx_event *rxe = arg;
+ rxe->bu->received(rxe->bu, &rxe->msg);
+ stack_alloc_free(&rxe->bu->rx_event_alloc, rxe);
+}
+
+/** Try to process one packet in the receive queue */
+static bool recv_one(struct bulk_e10k *bu)
+{
+ void *op;
+ size_t len, hdrlen, i;
+ int last = 0, res;
+ uint64_t flags = 0;
+ struct e10k_rx_event *rxe = NULL; // Fix compile bug -- jb
+
+
+ i = 0;
+ do {
+ res = e10k_queue_get_rxbuf(bu->q, &op, &hdrlen, &len, &last, &flags);
+ if (res == 0) {
+ if (i == 0) {
+ rxe = stack_alloc_alloc(&bu->rx_event_alloc);
+ assert(rxe != NULL); // should never happen
+ }
+ DEBUG(" Received part[%"PRId64"] of packet op=%p hl=%"PRIx64" l=%"
+ PRIx64" f=%"PRIx64"\n", i, op, hdrlen, len, flags);
+ }
+ if (i == 0 && res != 0) {
+ return false;
+ } else if (res != 0) {
+ continue;
+ } else if ((i + !!hdrlen) >= BULK_NET_DESCLEN) {
+ USER_PANIC("Buffer chain longer than supported");
+ }
+
+ if (hdrlen > 0) {
+ rxe->msg.parts[i].size = hdrlen;
+ rxe->msg.parts[i].opaque = op;
+ i++;
+ }
+
+ rxe->msg.parts[i].size = len;
+ rxe->msg.parts[i].opaque = op;
+
+ i++;
+ } while (last != 1);
+
+ if (i < BULK_NET_DESCLEN) {
+ memset(&rxe->msg.parts[i], 0, sizeof(rxe->msg.parts[i]));
+ }
+
+#if !USE_INTERRUPTS && USE_WSPOLL
+ recv_event(rxe);
+#else
+ event_queue_add(&bu->event_queue, &rxe->eqn,
+ MKCLOSURE(recv_event, rxe));
+#endif
+
+ return true;
+}
+
+static void tx_event(void *arg)
+{
+ DEBUG("tx_event\n");
+ struct e10k_tx_event *txe = arg;
+ txe->bu->transmitted(txe->bu, txe->op);
+ stack_alloc_free(&txe->bu->tx_event_alloc, txe);
+
+}
+
+/** Check the TX queue for transmits that have finished */
+static bool check_tx(struct bulk_e10k *bu)
+{
+ void *op = NULL;
+ bool had = false;
+ struct e10k_tx_event *txe;
+
+#if 0
+ if (e10k_tdt_rd(&bu->d, bu->qi) != e10k_tdh_rd(&bu->d, bu->qi)) {
+ DEBUG("Nonempty: %"PRIx32" %"PRIx32"\n", e10k_tdt_rd(&bu->d,
+ bu->qi), e10k_tdh_rd(&bu->d, bu->qi));
+ }
+#endif
+ if (e10k_queue_get_txbuf(bu->q, &op) == 0) {
+ DEBUG("e10k packet sent\n");
+ txe = stack_alloc_alloc(&bu->tx_event_alloc);
+ assert(txe != NULL); // should never happen
+ txe->op = op;
+#if !USE_INTERRUPTS && USE_WSPOLL
+ tx_event(txe);
+#else
+ event_queue_add(&bu->event_queue, &txe->eqn,
+ MKCLOSURE(tx_event, txe));
+#endif
+ had = true;
+ }
+ return had;
+}
+
+#if USE_INTERRUPTS
+/** Interrupt handler for RX and TX events */
+static void interrupt_handler(void *arg)
+{
+ struct bulk_e10k *bu = arg;
+ DEBUG("Interrupt!\n");
+ while (recv_one(bu));
+ while (check_tx(bu));
+}
+#else
+#if USE_WSPOLL
+
+static inline struct bulk_e10k *wscs_to_e10k(struct waitset_chanstate *chan)
+{
+ return (struct bulk_e10k *)
+ ((uintptr_t) chan - offsetof(struct bulk_e10k, wscs));
+}
+
+static void ws_event(void *arg)
+{
+ struct bulk_e10k *bu = arg;
+ bool found, cur;
+ do {
+ found = false;
+ do {
+ cur = recv_one(bu);
+ found = found || cur;
+ } while (cur);
+ do {
+ cur = check_tx(bu);
+ found = found || cur;
+ } while (cur);
+ } while (found);
+
+ waitset_chan_register_polled(bu->waitset, &bu->wscs,
+ MKCLOSURE(ws_event, bu));
+
+}
+
+void bulk_e10k_poll(struct waitset_chanstate *chan)
+{
+ struct bulk_e10k *bu = wscs_to_e10k(chan);
+ // Check TX queue first, since it is cheaper
+ if (e10k_queue_get_txpoll(bu->q) != 0 &&
+ e10k_queue_rxpoll(bu->q) != 0)
+ {
+ return;
+ }
+
+ waitset_chan_trigger(chan);
+}
+
+#else
+
+/** Thread polling rx and tx queues */
+static int recv_thread(void *arg)
+{
+ struct bulk_e10k *bu = arg;
+ DEBUG("Start receiving thread...\n");
+ bool found;
+ while (1) {
+ found = check_tx(bu);
+ found = recv_one(bu) || found;
+ if (!found) {
+ thread_yield();
+ }
+ }
+ return 0;
+}
+#endif
+#endif
+
+
+/** Callback for queue manager (writes tx tail index) */
+static errval_t update_txtail(void *opaque, size_t tail)
+{
+ struct bulk_e10k *bu = opaque;
+ e10k_tdt_wr(&bu->d, bu->qi, tail);
+ return SYS_ERR_OK;
+}
+
+/** Callback for queue manager (writes rx tail index) */
+static errval_t update_rxtail(void *opaque, size_t tail)
+{
+ struct bulk_e10k *bu = opaque;
+ e10k_rdt_1_wr(&bu->d, bu->qi, tail);
+ return SYS_ERR_OK;
+}
+
+
+/*****************************************************************************/
+/* Public interface */
+
+/**
+ * Initialize directly mapped RX/TX queue pair with e10k NIC.
+ *
+ * @param bu Channel struct
+ * @param ws Waitset
+ * @param card Card name
+ * @param queue Queue ID to use
+ * @param buffer_size Size of receive buffers in bytes
+ * @param ring_size Number of descriptors in the RX/TX rings
+ * @param received Callback for a received packet
+ * @param transmitted Callback for a transmitted packet
+ */
+errval_t bulk_e10k_init(struct bulk_e10k *bu,
+ struct waitset *ws,
+ const char *card,
+ uint8_t queue,
+ size_t buffer_size,
+ size_t ring_size,
+ void (*received)(struct bulk_e10k *,
+ struct bulk_net_msgdesc *),
+ void (*transmitted)(struct bulk_e10k *, void *))
+{
+ errval_t err;
+ char name[strlen(card) + strlen(E10K_MNG_SUF) + 1];
+ iref_t iref;
+ struct e10k_rx_event *rxe;
+ struct e10k_tx_event *txe;
+ size_t i;
+
+
+ bu->qi = queue;
+ bu->ready = false;
+ bu->received = received;
+ bu->transmitted = transmitted;
+ bu->buffer_size = buffer_size;
+ bu->ring_size = ring_size;
+ bu->waitset = ws;
+
+ // Allocate events
+ stack_alloc_init(&bu->rx_event_alloc, ring_size);
+ stack_alloc_init(&bu->tx_event_alloc, ring_size);
+ rxe = calloc(ring_size, sizeof(*rxe));
+ txe = calloc(ring_size, sizeof(*txe));
+ for (i = 0; i < ring_size; i++) {
+ rxe[i].bu = bu;
+ txe[i].bu = bu;
+ stack_alloc_free(&bu->rx_event_alloc, rxe + i);
+ stack_alloc_free(&bu->tx_event_alloc, txe + i);
+ }
+
+ // Connect to port management service
+ bind_ports(ws);
+ bind_arp(ws);
+
+ // Bind to e10k card driver
+ strcpy(name, card);
+ strcat(name, E10K_MNG_SUF);
+ err = nameservice_blocking_lookup(name, &iref);
+ assert(err_is_ok(err));
+
+ DEBUG("Start binding\n");
+ err = e10k_bind(iref, bind_cb, bu, ws, IDC_BIND_FLAGS_DEFAULT);
+ assert(err_is_ok(err));
+
+ while (!bu->ready) {
+ event_dispatch_non_block(ws);
+ event_dispatch_non_block(get_default_waitset());
+ }
+
+#if USE_INTERRUPTS || !USE_WSPOLL
+ event_queue_init(&bu->event_queue, ws, EVENT_QUEUE_CONTINUOUS);
+#endif
+#if !USE_INTERRUPTS
+#if USE_WSPOLL
+ waitset_chanstate_init(&bu->wscs, CHANTYPE_BULK_E10K);
+ waitset_chan_register_polled(ws, &bu->wscs,
+ MKCLOSURE(ws_event, bu));
+#else
+ thread_create(recv_thread, bu);
+#endif
+#endif
+
+ return SYS_ERR_OK;
+}
+
+/**
+ * Add a buffer to the receive queue.
+ *
+ * @param bu Channel struct
+ * @param phys Physical address of buffer
+ * @param header Physical address of header buffer (needs E10K_HDRSZ bytes)
+ * @param opaque User data for this buffer; it is handed back when the buffer
+ * is used for a received packet.
+ */
+errval_t bulk_e10k_rx_add(struct bulk_e10k *bu, uint64_t phys, uint64_t header,
+ void *opaque)
+{
+ DEBUG("bulk_e10k_rx_add(transfer=%p, phy=%"PRIx64",header=%"PRIx64",opaque=%p)\n",
+ bu, phys, header, opaque);
+ int r = e10k_queue_add_rxbuf(bu->q, phys, header, opaque);
+ assert(r == 0);
+ e10k_queue_bump_rxtail(bu->q);
+ return SYS_ERR_OK;
+}
+
+/**
+ * Send out a packet.
+ *
+ * @param bu Channel struct
+ * @param decs Descriptor for buffer chain to transmit
+ */
+errval_t bulk_e10k_send(struct bulk_e10k *bu, struct bulk_net_msgdesc *desc)
+{
+ size_t totallen = 0;
+ size_t cnt = 0;
+ size_t i;
+ for (i = 0; i < BULK_NET_DESCLEN; i++) {
+ if (desc->parts[i].size == 0) {
+ break;
+ }
+ cnt++;
+ totallen += desc->parts[i].size;
+ }
+ DEBUG("bulk_e10k_send(len=%"PRIx64")\n", totallen);
+
+ e10k_queue_add_txcontext(bu->q, 0, ETHHDR_LEN, IPHDR_LEN, 0, 0);
+ e10k_queue_add_txbuf_ctx(bu->q, desc->parts[0].phys,
+ desc->parts[0].size, desc->parts[0].opaque, 1, cnt == 1,
+ totallen, 0, true, false);
+
+ for (i = 1; i < cnt; i++) {
+ e10k_queue_add_txbuf(bu->q, desc->parts[i].phys,
+ desc->parts[i].size, desc->parts[i].opaque, 0, i == cnt - 1,
+ totallen);
+ }
+ e10k_queue_bump_txtail(bu->q);
+ DEBUG("bulk_e10k_send_done\n");
+ return SYS_ERR_OK;
+}
+
+/**
+ * Steer a specific UDP port to this queue.
+ *
+ * @param bu Channel struct
+ * @param port Port to allocate (in host byte order)
+ */
+errval_t bulk_e10k_port_add(struct bulk_e10k *bu, uint16_t port)
+{
+ errval_t err;
+
+ // Register port
+ err = port_bind(0, 0, bu->qi, port);
+ assert(err_is_ok(err));
+ DEBUG("Port registered\n");
+
+ return SYS_ERR_OK;
+}
+
+/**
+ * Allocate an unused UDP port and steer it to this queue.
+ *
+ * @param bu Channel struct
+ * @param port Pointer to variable where port number will be stored (host byte
+ * order)
+ */
+errval_t bulk_e10k_port_alloc(struct bulk_e10k *bu, uint16_t *port)
+{
+ return port_get(0, 0, bu->qi, port);
+}
+
+/**
+ * Get IP address configured for this interface.
+ *
+ * @param bu Channel struct
+ * @param ip Pointer to variable where IP will be stored (host byte order)
+ */
+errval_t bulk_e10k_ip_info(struct bulk_e10k *bu, uint32_t *ip)
+{
+ errval_t err;
+ uint32_t gw, mask;
+ err = arp_ip_info(ip, &gw, &mask);
+ *ip = ntohl(*ip);
+ return err;
+}
+
+/**
+ * Do an ARP lookup on this interface
+ *
+ * @param bu    Channel struct
+ * @param ip IP address to resolve (in host byte order)
+ * @param mac Pointer to variable where MAC address will be stored
+ */
+errval_t bulk_e10k_arp_lookup(struct bulk_e10k *bu, uint32_t ip, uint64_t *mac)
+{
+ return arp_lookup(htonl(ip), mac);
+}
+
--- /dev/null
+/**
+ * \file
+ * \brief Proxy for connecting bulk transfer channels over a network connection
+ */
+
+/*
+ * Copyright (c) 2014, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#ifndef BULK_NET_E10K_H
+#define BULK_NET_E10K_H
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+
+#include <dev/e10k_dev.h>
+
+#if 0
+
+struct bulk_net_msgdesc;
+struct e10k_binding;
+struct e10k_queue;
+struct bulk_e10k {
+ bool ready;
+ size_t buffer_size;
+ size_t ring_size;
+ uint8_t qi;
+ struct e10k_binding *binding;
+ e10k_t d;
+ struct e10k_queue *q;
+ uint64_t mac;
+ struct capref rxframe;
+ struct capref txframe;
+ void (*received)(struct bulk_e10k *,
+ struct bulk_net_msgdesc *);
+ void (*transmitted)(struct bulk_e10k *,
+ void *);
+
+ void *opaque;
+};
+#endif
+
+#endif
#include <bulk_transfer/bulk_transfer.h>
#include <bulk_transfer/bulk_net.h>
+#include "bulk_net_backend.h"
+
+static char *default_cardname = "e10k";
/**
* Creates a new bulk endpoint which uses the network backend
{
assert(ep_desc);
- ep_desc->ip = setup->ip;
- ep_desc->port = setup->port;
-
- ep_desc->ep_generic.f = bulk_net_get_implementation();
+ if (setup->port == 0 || setup->queue == 0) {
+ return BULK_TRANSFER_INVALID_ARGUMENT;
+ }
+ ep_desc->ip.addr = htonl(setup->ip.addr);
+ ep_desc->port = setup->port;
+ ep_desc->queue = setup->queue;
+
+ if (setup->cardname) {
+ ep_desc->cardname = setup->cardname;
+ } else {
+ ep_desc->cardname = default_cardname;
+ }
+
+ if (setup->buffer_size == 0) {
+ ep_desc->buffer_size = BULK_NET_DEFAULT_BUFFER_SIZE;
+ } else {
+ ep_desc->buffer_size = setup->buffer_size;
+ }
+
+ if (setup->buffer_count == 0) {
+ ep_desc->buffer_count = BULK_NET_DEFAULT_BUFFER_COUNT;
+ } else {
+ ep_desc->buffer_count = setup->buffer_count;
+ }
+
+ if (setup->max_queues == 0) {
+ ep_desc->max_queues = BULK_NET_DEFAULT_QUEUES;
+ } else {
+ ep_desc->max_queues = setup->max_queues;
+ }
+
+
+ if(setup->no_copy) {
+ ep_desc->ep_generic.f = bulk_net_get_impl_no_copy();
+ } else {
+ ep_desc->ep_generic.f = bulk_net_get_impl();
+ }
- /*
- * XXX: Do we want to initialize the network queues and the
- * tcp connection at this point ?
- * Alternatively just prepare it and finalize it when the channel
- * gets created,
- * - RA
- */
return SYS_ERR_OK;
}
* a nameservice like lookup should return the correct endpoint descriptor.
*/
errval_t bulk_net_ep_create_remote(struct bulk_net_endpoint_descriptor *ep_desc,
- struct ip_addr ip, uint16_t port)
+ struct bulk_net_ep_setup *setup)
{
assert(ep_desc);
- ep_desc->ip = ip;
- ep_desc->port = port;
- ep_desc->ep_generic.f = bulk_net_get_implementation();
+ if (setup->port == 0 || setup->queue == 0) {
+ return BULK_TRANSFER_INVALID_ARGUMENT;
+ }
+
+ ep_desc->ip.addr = htonl(setup->ip.addr);
+ ep_desc->port = setup->port;
+ ep_desc->queue = setup->queue;
+
+ if (setup->cardname) {
+ ep_desc->cardname = setup->cardname;
+ } else {
+ ep_desc->cardname = default_cardname;
+ }
+
+ if (setup->buffer_size == 0) {
+ ep_desc->buffer_size = BULK_NET_DEFAULT_BUFFER_SIZE;
+ } else {
+ ep_desc->buffer_size = setup->buffer_size;
+ }
+
+ if (setup->buffer_count == 0) {
+ ep_desc->buffer_count = BULK_NET_DEFAULT_BUFFER_COUNT;
+ } else {
+ ep_desc->buffer_count = setup->buffer_count;
+ }
+
+
+ if(setup->no_copy) {
+ ep_desc->ep_generic.f = bulk_net_get_impl_no_copy();
+ } else {
+ ep_desc->ep_generic.f = bulk_net_get_impl();
+ }
- /*
- * this endpoint is used to specify the remote endpoint to bind to.
- * no need to create a listener for new connections
- *
- * potential receiving queues are created upon channel bind
- */
return SYS_ERR_OK;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <sys/param.h>
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_net.h>
+#include <bulk_transfer/bulk_allocator.h>
+#include <ipv4/lwip/inet.h>
+
+#include "../../bulk_pool.h"
+#include "../../bulk_buffer.h"
+#include "../../helpers.h"
+
+#include "bulk_net_backend.h"
+#include "bulk_net_transfer.h"
+
+#if BULK_NET_ENABLE_DEBUG_BACKEND
+#define BNT_DEBUG_TRACE BULK_NET_TRACE
+#define BNT_DEBUG(fmt, msg...) BULK_NET_DEBUG(fmt, msg)
+#else
+#define BNT_DEBUG(fmt, msg...) do{}while(0);
+#define BNT_DEBUG_TRACE do{}while(0);
+#endif
+
+#if BULK_NET_ENABLE_STATUS_BACKEND
+#define BNT_STATUS(fmt, msg...) BULK_NET_STATUS(fmt, msg)
+#else
+#define BNT_STATUS(fmt, msg...) do{} while(0);
+#endif
+
+#define BULK_NET_CTRL_CHANNEL_BUF_SIZE 256
+
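+/**
+ * Per-channel state of the no-copy network backend. One instance serves as the
+ * control channel (its bulk_control pointer refers to itself); one additional
+ * instance is created for every assigned pool and carries that pool's data
+ * queue and meta receive buffers.
+ */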
+struct bulk_net_nocopy
+{
+ struct bulk_net_control net_ctrl;
+
+ struct bulk_net_nocopy *bulk_control;
+ struct bulk_channel *channel;
+ struct bulk_pool *pool;
+ struct bulk_continuation bind_cont;
+ struct pending_pool_request *pending_pool_requests;
+ struct receive_buffer *meta_rb;
+ errval_t err;
+ bool bound;
+ struct bulk_continuation panic_cont;
+ void *zero_meta;
+
+ void *user_state;
+};
+
+enum proto_msg
+{
+ PROTO_INVALID,
+ PROTO_BIND_REQUEST,
+ PROTO_BIND_RESPONSE,
+ PROTO_POOL_REQUEST,
+ PROTO_POOL_RESPONSE,
+ PROTO_BUFFER_MOVE,
+ PROTO_BUFFER_COPY,
+ PROTO_BUFFER_PASS,
+ PROTO_BUFFER_RELEASE,
+ PROTO_STATUS,
+
+ /* NOT IMPLEMENTED */
+ PROTO_POOL_REMOVE,
+ PROTO_TEARDOWN
+};
+
+struct proto_trail_bind_req
+{
+ uint32_t buffer_size;
+ uint8_t trust_level;
+ uint8_t role;
+ /* XXX: there are no constraints on this channel */
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_bind_resp
+{
+ uint32_t buffer_size; ///< XXX: given by the creator side
+ uint32_t meta_size; ///< XXX: given by the creator side
+ uint8_t direction;
+ uint8_t trust_level;
+ uint8_t role;
+ errval_t err;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pool_req
+{
+ uint32_t buffer_count;
+ uint32_t buffer_size;
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint16_t port;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pool_resp
+{
+ errval_t err;
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint16_t port;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_move
+{
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_copy
+{
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pass
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_release
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_status
+{
+ errval_t err;
+
+ uint8_t type;
+}__attribute__((packed));
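+
+/*
+ * Note on the wire format (as implemented by the senders/handlers below):
+ * every message ends with one of the proto_trail_* structs in its last
+ * fragment, and for move/copy/pass/release the channel's metadata (meta_size
+ * bytes) directly precedes that trailer. The receive path dispatches on the
+ * last byte of the last fragment, which holds the trailer's 'type' field
+ * (see tcb_received).
+ */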
+
+static void tcb_received(struct bulk_e10k* bu, struct bulk_net_msgdesc *msg);
+static void tcb_transmitted(struct bulk_e10k *bu, void *opaque);
+
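+/**
+ * Outstanding pool-assignment request sent over the control channel; matched
+ * against the pool id when the response arrives (handle_pool_assign_response).
+ */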
+struct pending_pool_request
+{
+ struct bulk_pool *pool;
+ struct bulk_net_nocopy *bnt;
+ struct bulk_continuation cont;
+ struct pending_pool_request *next;
+};
+
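+/**
+ * Backend-private data attached to a pool: the bulk_net_nocopy instance that
+ * serves it and the local<->remote buffer id translation tables.
+ */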
+struct bulk_net_pool_data
+{
+ struct bulk_net_nocopy *p;
+ uint32_t *buf_id_local_to_remote;
+ uint32_t *buf_id_remote_to_local;
+};
+
+/* ----------------------------- pools ------------------------------------- */
+static inline struct bulk_net_pool_data *get_pool_data(struct bulk_pool *pool)
+{
+ return ((struct bulk_pool_internal*) pool)->impl_data;
+}
+
+static inline struct bulk_net_nocopy *get_net_nocopy(struct bulk_pool *pool)
+{
+ struct bulk_net_pool_data *pd = get_pool_data(pool);
+ if (pd) {
+ return pd->p;
+ }
+ return NULL;
+}
+
+static inline struct bulk_buffer *get_buffer(struct bulk_channel *chan,
+ struct bulk_pool_id *pool_id,
+ uint32_t buffer_id)
+{
+ struct bulk_pool *pool = bulk_pool_get(pool_id, chan);
+ assert(pool);
+ assert(buffer_id < pool->num_buffers);
+ return pool->buffers[buffer_id];
+}
+
+/* ---------------------------- buffer id translation ---------------------- */
+
+static inline uint32_t get_local_bufid(struct bulk_buffer *buf)
+{
+ return ((lvaddr_t) buf->address - buf->pool->base_address)
+ / buf->pool->buffer_size;
+}
+
+/// XXX: assumes the pool is only used over a single network channel
+static inline uint32_t get_remote_bufid(struct bulk_pool *pool,
+ uint32_t local_buf_id)
+{
+ struct bulk_net_pool_data *pd = get_pool_data(pool);
+ assert(pd);
+ return pd->buf_id_local_to_remote[local_buf_id];
+}
+
+static inline void set_remote_bufid(struct bulk_pool *pool,
+ uint32_t local_buf_id,
+ uint32_t remote_buf_id)
+{
+ struct bulk_net_pool_data *pd = get_pool_data(pool);
+ assert(pd);
+ pd->buf_id_local_to_remote[local_buf_id] = remote_buf_id;
+ pd->buf_id_remote_to_local[remote_buf_id] = local_buf_id;
+}
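+
+/*
+ * Illustrative example: after set_remote_bufid(pool, 3, 7), local buffer 3 is
+ * paired with remote id 7, so get_remote_bufid(pool, 3) returns 7 and
+ * pd->buf_id_remote_to_local[7] is 3.
+ */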
+
+static errval_t bulk_net_init_meta_rb(struct receive_buffer *rbs,
+ uint32_t num,
+ uint32_t size)
+{
+ errval_t err;
+ struct receive_buffer tmp_rb, *rb;
+
+ if (BULK_NET_NOCOPY_META_BUFFER_SIZE) {
+ size = BULK_NET_NOCOPY_META_BUFFER_SIZE;
+ }
+
+ err = allocmap_frame(num * size, &tmp_rb.virt, &tmp_rb.phys, NULL);
+ assert(err_is_ok(err));
+
+ for (uint32_t j = 0; j < num; ++j) {
+ rb = rbs + j;
+ rb->buffer = NULL;
+ rb->is_meta = true;
+ rb->virt = tmp_rb.virt + (j * size);
+ rb->phys = tmp_rb.phys + (j * size);
+ }
+ return SYS_ERR_OK;
+}
+
+/* --------------------------- binding ------------------------------------- */
+
+static void send_bind_response(struct bulk_net_nocopy *p,
+ uint32_t buffer_size,
+ uint32_t meta_size,
+ uint8_t direction,
+ uint8_t role,
+ uint8_t trust,
+ errval_t err)
+{
+ BNT_DEBUG_TRACE
+
+ struct proto_trail_bind_resp *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->err = err;
+ t->buffer_size = buffer_size;
+ t->meta_size = meta_size;
+ t->direction = direction;
+ t->role = role;
+ t->trust_level = trust;
+ t->type = PROTO_BIND_RESPONSE;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_bind_response(struct bulk_net_nocopy *p,
+ struct proto_trail_bind_resp *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ struct bulk_channel *chan = p->channel;
+
+ if (p->bound) {
+ BNT_DEBUG("channel [%p] already bound. request ignored.", chan);
+ goto free_rx;
+ }
+
+ assert(chan->state == BULK_STATE_BINDING);
+
+ chan->meta_size = t->meta_size;
+ chan->trust = t->trust_level;
+ chan->role = t->role;
+ chan->direction = t->direction;
+
+ if (err_is_fail(t->err)) {
+ BNT_STATUS("ERROR: binding failed on channel [%p].", chan);
+ chan->state = BULK_STATE_CLOSED;
+ } else {
+ BNT_STATUS("SUCCESS: channel [%p] bound.", chan);
+ chan->state = BULK_STATE_CONNECTED;
+ p->bound = true;
+ }
+
+ if (p->bind_cont.handler) {
+ p->bind_cont.handler(p->bind_cont.arg, t->err, p->channel);
+ }
+
+ free_rx: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+static void send_bind_request(struct bulk_net_nocopy *p,
+ uint32_t buffer_size,
+ uint8_t trust_level,
+ uint8_t role)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct proto_trail_bind_req *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->buffer_size = buffer_size;
+ t->role = role;
+ t->trust_level = trust_level;
+ t->type = PROTO_BIND_REQUEST;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_bind_request(struct bulk_net_nocopy *p,
+ struct proto_trail_bind_req *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ assert(p->bulk_control == p);
+
+ struct receive_buffer *rb = msg->parts[0].opaque;
+ struct packet_header *hdr = rb->hdr_virt;
+
+ if (p->bound) {
+ BNT_DEBUG("channel [%p] already bound. request ignored.", p->channel);
+
+ goto free_rx;
+ }
+
+ /* update mac address */
+ p->net_ctrl.r_mac = 0;
+ memcpy(&p->net_ctrl.r_mac, hdr->l2.smac, 6);
+
+ /* set the remote ip and ports */
+ p->net_ctrl.r_ip = ntohl(hdr->l3.s_ip);
+ p->net_ctrl.r_port = ntohs(hdr->l4.s_port);
+
+ /* update the TX headers */
+ bulk_net_transfer_update_tx_headers(&p->net_ctrl);
+
+ if (t->buffer_size != p->net_ctrl.buffer_size) {
+ BNT_DEBUG("ERROR: wrong buffer size: [%x] [%x]", t->buffer_size,
+ (uint32_t )p->net_ctrl.buffer_size);
+ err = BULK_TRANSFER_ALLOC_BUFFER_SIZE;
+ goto send_and_free;
+ }
+
+ /* update the roles */
+ if (p->channel->role == BULK_ROLE_GENERIC) {
+ if (t->role == BULK_ROLE_GENERIC) {
+ p->channel->role = BULK_ROLE_MASTER;
+ } else {
+ p->channel->role = bulk_role_other(t->role);
+ }
+ }
+
+ /* update the trust level */
+ if (p->channel->trust != t->trust_level) {
+ /* TODO: choose an appropriate trust level */
+ if (p->channel->trust == BULK_TRUST_FULL) {
+ p->channel->trust = t->trust_level;
+ } else if (p->channel->trust == BULK_TRUST_HALF) {
+ if (t->trust_level == BULK_TRUST_NONE) {
+ p->channel->trust = BULK_TRUST_NONE;
+ }
+ }
+ }
+
+ /* do the callback to the application */
+ err = p->channel->callbacks->bind_received(p->channel);
+
+ /* update the connection state */
+ p->channel->state = BULK_STATE_CONNECTED;
+ p->bound = true;
+
+ send_and_free: send_bind_response(
+ p, p->net_ctrl.buffer_size, p->channel->meta_size,
+ bulk_direction_other(p->channel->direction),
+ bulk_role_other(p->channel->role), p->channel->trust, err);
+
+ free_rx: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+/* -------------------------- pool assignment -------------------------------*/
+
+static void send_pool_assign_response(struct bulk_net_nocopy *p,
+ errval_t err,
+ struct bulk_pool *pool,
+ uint16_t l_port)
+{
+ BNT_DEBUG_TRACE
+
+ struct proto_trail_pool_resp *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->err = err;
+
+ t->pool_domain_id = pool->id.dom;
+ t->pool_machine_id = pool->id.machine;
+ t->pool_local_id = pool->id.local;
+
+ t->port = l_port;
+
+ t->type = PROTO_POOL_RESPONSE;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_pool_assign_response(struct bulk_net_nocopy *p,
+ struct proto_trail_pool_resp *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ assert(p->bulk_control == p);
+
+ struct pending_pool_request *ppr = p->pending_pool_requests;
+ struct pending_pool_request *prev = NULL;
+
+ struct bulk_pool_id id = {
+ .dom = t->pool_domain_id,
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id };
+
+ while (ppr) {
+ if (bulk_pool_cmp_id(&id, &ppr->pool->id) == 0) {
+ if (prev == NULL) {
+ p->pending_pool_requests = ppr->next;
+ } else {
+ prev->next = ppr->next;
+ }
+ break;
+ }
+ prev = ppr;
+ ppr = ppr->next;
+ }
+
+ if (ppr == NULL) {
+ BNT_DEBUG("ERROR: no pending binding request (ignored). [%i, %i]",
+ (uint32_t )id.dom, id.local);
+ goto free_and_cont;
+ }
+
+ struct bulk_pool *pool = ppr->pool;
+ if (err_is_fail(t->err)) {
+ BNT_STATUS("FAILED: Pool [%x, %x, %x] assign to channel [%p] vetoed",
+ pool->id.machine, pool->id.dom, pool->id.local, p->channel);
+ goto free_and_cont;
+ }
+
+ err = bulk_pool_assign(pool, p->channel);
+ if (err_is_fail(err)) {
+ BNT_STATUS("FAILED: Pool [%x, %x, %x] assignment to channel [%p] \n%s",
+ pool->id.machine, pool->id.dom, pool->id.local, p->channel,
+ err_getstring(err));
+ goto free_and_cont;
+ }
+
+ /* update status values */
+ struct bulk_net_nocopy *bnt = ppr->bnt;
+ bnt->bound = true;
+
+ /* update port information */
+ bnt->net_ctrl.r_port = t->port;
+ bulk_net_transfer_update_tx_headers(&bnt->net_ctrl);
+
+ BNT_STATUS("SUCCESS: Pool [%x, %x, %x] assigned to channel [%p]",
+ pool->id.machine, pool->id.dom, pool->id.local, p->channel);
+
+ free_and_cont:
+
+ if (ppr->cont.handler) {
+ ppr->cont.handler(ppr->cont.arg, t->err, p->channel);
+ }
+
+ if (err_is_fail(t->err)) {
+ assert(!"NYI: Cleaning up of network structs...");
+ }
+
+ free(ppr);
+ bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+static void send_pool_assign_request(struct bulk_net_nocopy *p,
+ struct bulk_pool *pool,
+ uint16_t l_port)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct proto_trail_pool_req *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->buffer_size = pool->buffer_size;
+ t->buffer_count = pool->num_buffers;
+ t->pool_domain_id = pool->id.dom;
+ t->pool_machine_id = pool->id.machine;
+ t->pool_local_id = pool->id.local;
+ t->port = l_port;
+ t->type = PROTO_POOL_REQUEST;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+
+}
+
+static void handle_pool_assign_request(struct bulk_net_nocopy *p,
+ struct proto_trail_pool_req *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control == p);
+
+ errval_t err;
+ uint16_t port = 0;
+ uint8_t first_assignment = 0;
+
+ struct bulk_net_pool_data *pd = NULL;
+ struct bulk_net_nocopy *bnt = NULL;
+
+ /* calculate the new queue */
+ uint8_t queueid = p->net_ctrl.queue + p->net_ctrl.num_queues;
+ p->net_ctrl.num_queues++;
+
+ /* check if the pool is already present in the domain */
+ struct bulk_pool_id id = {
+ .dom = t->pool_domain_id,
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id };
+
+ struct bulk_pool *pool = bulk_pool_domain_list_get(&id);
+
+ if (p->net_ctrl.num_queues == p->net_ctrl.max_queues) {
+ err = BULK_TRANSFER_NET_MAX_QUEUES;
+ goto send_and_free;
+ }
+
+ /* there is no such pool */
+ if (pool == NULL) {
+ struct bulk_allocator pool_alloc;
+
+ struct bulk_pool_constraints constr = {
+ .range_min = p->channel->constraints.mem_range_min,
+ .range_max = p->channel->constraints.mem_range_max,
+ .alignment = p->channel->constraints.men_align,
+ .trust = p->channel->trust };
+
+ err = bulk_alloc_init(&pool_alloc, t->buffer_count, t->buffer_size,
+ &constr);
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "Failed to allocate memory for the pool\n");
+ goto send_and_free;
+ }
+
+ /* Free allocator memory */
+ free(pool_alloc.mngs);
+
+ /* overwrite the ID */
+ pool = pool_alloc.pool;
+ pool->id = id;
+
+ first_assignment = 1;
+
+ BNT_DEBUG("New pool allocated: [%x, %x, %x]", pool->id.machine,
+ pool->id.dom, pool->id.local)
+
+ } else {
+ BNT_DEBUG("Pool already present in domain: [%x, %x, %x]",
+ pool->id.machine, pool->id.dom, pool->id.local);
+ if (get_net_nocopy(pool)) {
+ err = BULK_TRANSFER_NET_POOL_USED;
+ goto send_and_free;
+ }
+
+ if (bulk_pool_is_assigned(pool, p->channel)) {
+ err = BULK_TRANSFER_POOL_ALREADY_ASSIGNED;
+ goto send_and_free;
+ }
+ }
+
+ /* we have a pool and it is not yet used over a no-copy network channel */
+
+ struct bulk_pool_internal *pool_int = (struct bulk_pool_internal *) pool;
+
+ assert(!pool_int->impl_data);
+
+ size_t pd_size = sizeof(struct bulk_net_pool_data)
+ + 2 * t->buffer_count * sizeof(uint32_t);
+
+ pd = malloc(pd_size);
+
+ if (!pd) {
+ err = BULK_TRANSFER_MEM;
+ goto send_and_free;
+ }
+
+ pd->buf_id_local_to_remote = (uint32_t *) (pd + 1);
+ pd->buf_id_remote_to_local = (pd->buf_id_local_to_remote + t->buffer_count);
+
+ for (uint32_t i = 0; i < t->buffer_count; ++i) {
+ pd->buf_id_remote_to_local[i] = 0;
+ pd->buf_id_local_to_remote[i] = 0;
+ }
+
+ pool_int->impl_data = pd;
+
+ bnt =
+ calloc(1,
+ sizeof(struct bulk_net_nocopy)
+ + t->buffer_count
+ * sizeof(struct receive_buffer));
+ if (!bnt) {
+ err = BULK_TRANSFER_MEM;
+ goto send_and_free;
+ }
+
+ bnt->meta_rb = (struct receive_buffer *) (bnt + 1);
+ err = bulk_net_init_meta_rb(bnt->meta_rb, t->buffer_count,
+ pool->buffer_size);
+ assert(!err_is_fail(err));
+
+ memcpy(&bnt->net_ctrl, &p->net_ctrl, sizeof(bnt->net_ctrl));
+ bnt->net_ctrl.queue = queueid;
+ bnt->net_ctrl.buffer_count = 0;
+
+ pd->p = bnt;
+ bnt->net_ctrl.r_port = t->port;
+ /* this is the control channel, has just two buffers */
+
+ bnt->bulk_control = p;
+ bnt->channel = p->channel;
+ bnt->pool = pool;
+
+ err = bulk_net_transfer_bind(&bnt->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ goto send_and_free;
+ }
+
+ err = p->channel->callbacks->pool_assigned(p->channel, pool);
+ if (err_is_fail(err)) {
+ BNT_STATUS("VETO: Pool [%x, %x, %x] not assigned to channel [%p]",
+ pool->id.machine, pool->id.dom, pool->id.local, p->channel);
+ goto send_and_free;
+ }
+
+ err = bulk_pool_assign(pool, p->channel);
+ assert(!err_is_fail(err)); // should not fail
+
+ BNT_STATUS("SUCCESS: Pool [%x, %x, %x] assigned to channel [%p]",
+ pool->id.machine, pool->id.dom, pool->id.local, p->channel);
+
+ /* update status */
+ bnt->bound = true;
+
+ /* we must make sure that the buffers are ready for receiving */
+ if (p->channel->direction == BULK_DIRECTION_RX) {
+ BNT_STATUS("Adding %i receive buffers.", (uint32_t )pool->num_buffers);
+ for (uint32_t i = 0; i < pool->num_buffers; ++i) {
+ struct receive_buffer *rb;
+ struct bulk_buffer *buffer = pool->buffers[i];
+ rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
+ assert(rb != NULL);
+
+ rb->virt = buffer->address;
+ rb->phys = buffer->phys;
+ rb->buffer = buffer;
+
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys,
+ rb->hdr_phys, rb);
+ assert(err_is_ok(err));
+
+ rb = bnt->meta_rb + i;
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys,
+ rb->hdr_phys, rb);
+ assert(err_is_ok(err));
+ }
+
+ }
+
+ port = bnt->net_ctrl.l_port;
+
+ if (!pool) {
+ struct bulk_pool tmp_pool;
+ tmp_pool.id = id;
+ pool = &tmp_pool;
+ }
+
+ send_and_free: send_pool_assign_response(p, err, pool, port);
+
+ if (err_is_fail(err)) {
+ if (pd) {
+ free(pd);
+ }
+ if (bnt) {
+ /* TODO: Free up net resources */
+ free(bnt);
+ }
+ if (first_assignment) {
+ bulk_pool_dealloc(pool);
+ }
+ }
+
+ bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+/* ---------------------------- move operation ----------------------------- */
+
+static void send_buffer_move(struct bulk_net_nocopy *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control != p);
+ assert(p->channel->direction == BULK_DIRECTION_TX);
+
+ errval_t err;
+ struct proto_trail_move *t;
+ struct transmit_buffer *tb_d, *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb_d != NULL);
+ tb_d->buffer = b;
+ tb_d->is_copy = false;
+ tb_d->cont = cont;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_MOVE;
+
+ uint32_t local_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+ t->buffer_id = get_remote_bufid(b->pool, local_id);
+
+ msg.parts[1].phys = b->phys;
+ msg.parts[1].size = p->net_ctrl.buffer_size;
+ msg.parts[1].opaque = tb_d;
+ msg.parts[2].phys = tb->int_phys;
+ msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[2].opaque = tb;
+ msg.parts[3].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_move(struct bulk_net_nocopy *p,
+ struct proto_trail_move *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control != p);
+
+ errval_t err;
+ struct receive_buffer *rb;
+ struct bulk_buffer *buffer;
+
+ rb = msg->parts[1].opaque;
+ buffer = rb->buffer;
+ stack_alloc_free(&p->net_ctrl.rb_stack, rb);
+
+ uint32_t local_id =
+ ((lvaddr_t) buffer->address - buffer->pool->base_address)
+ / buffer->pool->buffer_size;
+
+ set_remote_bufid(buffer->pool, local_id, t->buffer_id);
+
+ err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+
+ rb = msg->parts[2].opaque;
+
+ p->channel->callbacks->move_received(p->channel, buffer, rb->virt);
+
+ assert(rb->is_meta == true);
+
+ // bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ----------------------------- copy operation ---------------------------- */
+
+static void send_buffer_copy(struct bulk_net_nocopy *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control != p);
+ assert(p->channel->direction == BULK_DIRECTION_TX);
+
+ errval_t err;
+ struct proto_trail_copy *t;
+ struct transmit_buffer *tb_d, *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb_d != NULL);
+ tb_d->buffer = b;
+ tb_d->is_copy = true;
+ tb_d->cont = cont;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_COPY;
+
+ uint32_t local_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+ t->buffer_id = get_remote_bufid(b->pool, local_id);
+
+ msg.parts[1].phys = b->phys;
+ msg.parts[1].size = p->net_ctrl.buffer_size;
+ msg.parts[1].opaque = tb_d;
+ msg.parts[2].phys = tb->int_phys;
+ msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[2].opaque = tb;
+ msg.parts[3].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_copy(struct bulk_net_nocopy *p,
+ struct proto_trail_copy *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control != p);
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ rb = msg->parts[2].opaque;
+
+ struct bulk_buffer *buf = rb->buffer;
+
+ assert(buf);
+
+ uint32_t local_id = ((lvaddr_t) buf->address - buf->pool->base_address)
+ / buf->pool->buffer_size;
+
+ set_remote_bufid(buf->pool, local_id, t->buffer_id);
+
+ enum bulk_buffer_state st = BULK_BUFFER_READ_ONLY;
+ if (bulk_buffer_is_owner(buf)) {
+ st = BULK_BUFFER_RO_OWNED;
+ }
+ err = bulk_buffer_change_state(buf, st);
+ assert(!err_is_fail(err));
+
+ p->channel->callbacks->copy_received(p->channel, buf, rb->virt);
+
+ assert(rb->is_meta == true);
+
+ // bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ------------------------------ pass operation --------------------------- */
+
+static void send_buffer_pass(struct bulk_net_nocopy *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control == p);
+
+ errval_t err;
+ struct proto_trail_pass *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+ tb->cont = cont;
+ tb->buffer = b;
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_PASS;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+
+ uint32_t local_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+
+ t->buffer_id = get_remote_bufid(b->pool, local_id);
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_pass(struct bulk_net_nocopy *p,
+ struct proto_trail_pass *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control == p);
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ assert(buf);
+
+ uint32_t local_id = ((lvaddr_t) buf->address - buf->pool->base_address)
+ / buf->pool->buffer_size;
+
+ set_remote_bufid(buf->pool, local_id, t->buffer_id);
+
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+
+ rb = msg->parts[1].opaque;
+
+ p->channel->callbacks->buffer_received(p->channel, buf, rb->virt);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ----------------------------- release operation ------------------------- */
+
+static void send_buffer_release(struct bulk_net_nocopy *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct proto_trail_release *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ assert(p->bulk_control == p);
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+ tb->cont = cont;
+ tb->buffer = b;
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_RELEASE;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+
+ uint32_t local_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+ t->buffer_id = get_remote_bufid(b->pool, local_id);
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_release(struct bulk_net_nocopy *p,
+ struct proto_trail_release *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ assert(p->bulk_control == p);
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ assert(buf);
+
+ uint32_t local_id = ((lvaddr_t) buf->address - buf->pool->base_address)
+ / buf->pool->buffer_size;
+
+ set_remote_bufid(buf->pool, local_id, t->buffer_id);
+
+ rb = msg->parts[1].opaque;
+
+ buf->local_ref_count--;
+
+ if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) {
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+ }
+
+ p->channel->callbacks->copy_released(p->channel, buf);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ---------------------------- status message ----------------------------- */
+static void send_status_msg(void)
+{
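+ /* XXX: status messages are currently a no-op in the no-copy backend */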
+
+}
+
+static void handle_status_msg(struct bulk_net_nocopy *p,
+ struct proto_trail_status *t,
+ struct bulk_net_msgdesc *msg)
+{
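+ /* XXX: incoming status messages are currently ignored */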
+
+}
+
+/* ------------------------ network management ----------------------------- */
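+/**
+ * Transmit-done callback: invokes the pending continuation (if any) and
+ * returns the transmit buffer to the stack allocator.
+ */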
+static void tcb_transmitted(struct bulk_e10k *bu, void *opaque)
+{
+ BNT_DEBUG_TRACE
+
+ struct bulk_net_nocopy *p = bu->opaque;
+ struct transmit_buffer *tb = opaque;
+
+ if (opaque == NULL) {
+ // We can ignore the header buffers
+ return;
+ }
+
+ if (tb->buffer != NULL) {
+ if (tb->cont.handler) {
+ tb->cont.handler(tb->cont.arg, SYS_ERR_OK, p->channel);
+ }
+ tb->buffer = NULL;
+ tb->cont = BULK_CONT_NOP;
+ }
+
+ stack_alloc_free(&p->net_ctrl.tb_stack, tb);
+}
+
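+/**
+ * Receive callback: strips the padding, locates the trailer in the last
+ * non-empty message part and dispatches on its type byte.
+ */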
+static void tcb_received(struct bulk_e10k* bu, struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ struct bulk_net_nocopy *p = bu->opaque;
+ size_t i;
+ struct receive_buffer *rb;
+ uint8_t *t;
+
+ assert(msg->parts[0].size == sizeof(struct packet_header));
+ bulk_net_transfer_strip_padding(msg);
+
+ for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++)
+ ;
+ i--;
+
+ rb = msg->parts[i].opaque;
+ t = rb->virt;
+
+ switch (t[msg->parts[i].size - 1]) {
+ case PROTO_BIND_REQUEST:
+ handle_bind_request(
+ p,
+ (struct proto_trail_bind_req *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_bind_req)),
+ msg);
+ break;
+ case PROTO_BIND_RESPONSE:
+ handle_bind_response(
+ p,
+ (struct proto_trail_bind_resp *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_bind_resp)),
+ msg);
+ break;
+ case PROTO_POOL_REQUEST:
+ handle_pool_assign_request(
+ p,
+ (struct proto_trail_pool_req *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_pool_req)),
+ msg);
+ break;
+ case PROTO_POOL_RESPONSE:
+ handle_pool_assign_response(
+ p,
+ (struct proto_trail_pool_resp *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_pool_resp)),
+ msg);
+ break;
+ case PROTO_BUFFER_MOVE:
+ handle_buffer_move(
+ p,
+ (struct proto_trail_move *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_move)),
+ msg);
+ break;
+ case PROTO_BUFFER_COPY:
+ handle_buffer_copy(
+ p,
+ (struct proto_trail_copy *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_copy)),
+ msg);
+ break;
+ case PROTO_BUFFER_PASS:
+ handle_buffer_pass(
+ p,
+ (struct proto_trail_pass *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_pass)),
+ msg);
+ break;
+ case PROTO_BUFFER_RELEASE:
+ handle_buffer_release(
+ p,
+ (struct proto_trail_release *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_release)),
+ msg);
+ break;
+ case PROTO_STATUS:
+ handle_status_msg(
+ p,
+ (struct proto_trail_status *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_status)),
+ msg);
+ break;
+ default:
+ USER_PANIC("Unsupported request type");
+ break;
+
+ }
+}
+
+/* --------------------- implementation callbacks -------------------------- */
+
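+/** Creates a new channel (export side) and sets up its control-channel network transfer. */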
+static errval_t impl_channel_create(struct bulk_channel *channel)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ BNT_STATUS("Creating new bulk channel [%p] using net.no-copy backend",
+ channel);
+
+ struct bulk_net_nocopy *p = calloc(1, sizeof(struct bulk_net_nocopy));
+ if (p == NULL) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ struct bulk_net_endpoint_descriptor *ep =
+ (struct bulk_net_endpoint_descriptor *) channel->ep;
+
+ p->net_ctrl.card = ep->cardname;
+ p->net_ctrl.l_port = ep->port;
+ p->net_ctrl.queue = ep->queue;
+ p->net_ctrl.ws = channel->waitset;
+ p->net_ctrl.buffer_size = BULK_NET_CTRL_CHANNEL_BUF_SIZE;
+ /* this is the control channel; it only needs a few buffers */
+ p->net_ctrl.buffer_count = ep->buffer_count;
+ p->net_ctrl.max_queues = ep->max_queues;
+ p->net_ctrl.num_queues = 1;
+
+ p->bulk_control = p;
+
+ err = bulk_net_transfer_export(&p->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ free(p);
+ return err;
+ }
+
+ p->net_ctrl.buffer_size = ep->buffer_size;
+
+ channel->state = BULK_STATE_BINDING;
+ p->channel = channel;
+ channel->impl_data = p;
+
+ return err;
+}
+
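+/** Binds to an existing channel: sets up the network transfer and sends the initial bind request. */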
+static errval_t impl_channel_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ BNT_STATUS("Binding new bulk channel [%p] using net.no-copy backend",
+ channel);
+
+ struct bulk_net_nocopy *bnt = calloc(1, sizeof(struct bulk_net_nocopy));
+ if (!bnt) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ struct bulk_net_endpoint_descriptor *ep =
+ (struct bulk_net_endpoint_descriptor *) channel->ep;
+
+ bnt->net_ctrl.card = ep->cardname;
+ bnt->net_ctrl.r_port = ep->port;
+ bnt->net_ctrl.r_ip = ep->ip.addr; ///XXX: IP already in network byte order
+ bnt->net_ctrl.queue = ep->queue;
+ bnt->net_ctrl.ws = channel->waitset;
+ bnt->net_ctrl.buffer_size = BULK_NET_CTRL_CHANNEL_BUF_SIZE;
+ /* this is the control channel; it only needs a few buffers */
+ bnt->net_ctrl.buffer_count = ep->buffer_count;
+ bnt->net_ctrl.max_queues = ep->max_queues;
+ bnt->net_ctrl.num_queues = 1;
+ bnt->bulk_control = bnt;
+
+ err = bulk_net_transfer_bind(&bnt->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ free(bnt);
+ return err;
+ }
+
+ bnt->net_ctrl.buffer_size = ep->buffer_size;
+
+ channel->impl_data = bnt;
+ bnt->channel = channel;
+
+ channel->state = BULK_STATE_BINDING;
+
+ bnt->bind_cont = cont;
+
+ send_bind_request(bnt, bnt->net_ctrl.buffer_size, channel->trust,
+ channel->role);
+ send_status_msg();
+
+ return SYS_ERR_OK;
+}
+
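+/**
+ * Assigns a pool to the channel: allocates a dedicated network queue for the
+ * pool, sets up the local/remote buffer id tables and sends the pool
+ * assignment request over the control channel.
+ */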
+static errval_t impl_channel_assign_pool(struct bulk_channel *channel,
+ struct bulk_pool *pool,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct bulk_net_nocopy *bnt, *p;
+ struct bulk_pool_internal *pool_int = (struct bulk_pool_internal *) pool;
+
+ p = get_net_nocopy(pool);
+ if (p) {
+ return BULK_TRANSFER_NET_POOL_USED;
+ }
+
+ bnt = (struct bulk_net_nocopy *) channel->impl_data;
+
+ /* need to take the control channel for this */
+ assert(bnt->bulk_control == bnt);
+
+ if (bnt->net_ctrl.buffer_size != pool->buffer_size) {
+ return BULK_TRANSFER_ALLOC_BUFFER_SIZE;
+ }
+
+ if (bnt->net_ctrl.num_queues == bnt->net_ctrl.max_queues) {
+ return BULK_TRANSFER_NET_MAX_QUEUES;
+ }
+
+ uint8_t queueid = bnt->net_ctrl.queue + bnt->net_ctrl.num_queues;
+ bnt->net_ctrl.num_queues++;
+
+ /* allocate a new queue for this pool */
+ p = calloc(1,
+ sizeof(struct bulk_net_nocopy)
+ + pool->num_buffers
+ * sizeof(struct receive_buffer));
+ if (p == NULL) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ p->meta_rb = (struct receive_buffer *) (p + 1);
+ err = bulk_net_init_meta_rb(p->meta_rb, pool->num_buffers,
+ pool->buffer_size);
+ assert(!err_is_fail(err));
+
+ memcpy(&p->net_ctrl, &bnt->net_ctrl, sizeof(p->net_ctrl));
+ p->net_ctrl.queue = queueid;
+ p->net_ctrl.buffer_count = 0;
+
+ err = bulk_net_transfer_bind(&p->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ free(p);
+ return err;
+ }
+
+ p->channel = channel;
+ p->pool = pool;
+ p->bulk_control = bnt;
+ p->bind_cont = cont;
+
+ assert(!pool_int->impl_data);
+
+ size_t pd_size = sizeof(struct bulk_net_pool_data)
+ + 2 * pool->num_buffers * sizeof(uint32_t);
+
+ struct bulk_net_pool_data *pd = malloc(pd_size);
+
+ if (!pd) {
+ free(p);
+ /* TODO: Free network resources */
+ return BULK_TRANSFER_MEM;
+ }
+
+ pd->buf_id_local_to_remote = (uint32_t *) (pd + 1);
+ pd->buf_id_remote_to_local =
+ (pd->buf_id_local_to_remote + pool->num_buffers);
+ for (uint32_t i = 0; i < pool->num_buffers; ++i) {
+ pd->buf_id_local_to_remote[i] = i;
+ pd->buf_id_remote_to_local[i] = i;
+ }
+ pd->p = p;
+ pool_int->impl_data = pd;
+
+ struct pending_pool_request *req = malloc(
+ sizeof(struct pending_pool_request));
+ if (!req) {
+ free(p);
+ free(pd);
+ /* TODO: free network resources */
+ return BULK_TRANSFER_MEM;
+ }
+
+ req->cont = cont;
+ req->pool = pool;
+ req->bnt = p;
+ /* prepend to the list of pending pool requests */
+ req->next = bnt->pending_pool_requests;
+ bnt->pending_pool_requests = req;
+
+ send_pool_assign_request(bnt, pool, p->net_ctrl.l_port);
+
+ return SYS_ERR_OK;
+}
+
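+/** Moves a buffer (and its meta data) to the other endpoint over the pool's dedicated data queue. */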
+static errval_t impl_channel_move(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
+ struct bulk_net_pool_data *pd = pool->impl_data;
+ struct bulk_net_nocopy *bnt = pd->p;
+
+ send_buffer_move(bnt, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
+/**
+ * Passes a buffer back to the other side: the buffer and its meta-data
+ * receive buffer are re-added to the pool's receive queue, and the pass
+ * message is sent over the control channel.
+ */
+static errval_t impl_channel_pass(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+
+ struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
+ struct bulk_net_pool_data *pd = pool->impl_data;
+ struct bulk_net_nocopy *bnt = pd->p;
+ struct bulk_net_nocopy *p = channel->impl_data;
+
+ if (channel->direction == BULK_DIRECTION_TX) {
+ return BULK_TRANSFER_CHAN_DIRECTION;
+ }
+
+ assert(bnt != NULL);
+
+ struct receive_buffer *rb;
+ rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
+ assert(rb != NULL);
+
+ rb->virt = buffer->address;
+ rb->phys = buffer->phys;
+ rb->buffer = buffer;
+
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ uint32_t local_id = get_local_bufid(buffer);
+ rb = bnt->meta_rb + local_id;
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ /* send the buffer pass over the control channel */
+ send_buffer_pass(p, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
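+/** Sends a copy of the buffer to the other endpoint over the pool's dedicated data queue. */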
+static errval_t impl_channel_copy(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
+ struct bulk_net_pool_data *pd = pool->impl_data;
+ struct bulk_net_nocopy *bnt = pd->p;
+
+ send_buffer_copy(bnt, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
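+/**
+ * Releases a received copy: re-adds the buffer (and its meta-data receive
+ * buffer) to the pool's receive queue and sends the release message over the
+ * control channel.
+ */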
+static errval_t impl_channel_release(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+
+ struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
+ struct bulk_net_pool_data *pd = pool->impl_data;
+ struct bulk_net_nocopy *bnt = pd->p;
+ struct bulk_net_nocopy *p = channel->impl_data;
+
+ if (channel->direction == BULK_DIRECTION_TX) {
+ return BULK_TRANSFER_CHAN_DIRECTION;
+ }
+
+ struct receive_buffer *rb;
+ rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
+ assert(rb != NULL);
+
+ rb->virt = buffer->address;
+ rb->phys = buffer->phys;
+ rb->buffer = buffer;
+
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ uint32_t local_id = get_local_bufid(buffer);
+ rb = bnt->meta_rb + local_id;
+ err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+ /* send the buffer release over the control channel */
+ send_buffer_release(p, buffer, bnt->zero_meta, cont);
+ return SYS_ERR_OK;
+}
+
+static struct bulk_implementation bulk_net_implementation = {
+ .channel_create = impl_channel_create,
+ .channel_bind = impl_channel_bind,
+ .assign_pool = impl_channel_assign_pool,
+ .move = impl_channel_move,
+ .pass = impl_channel_pass,
+ .copy = impl_channel_copy,
+ .release = impl_channel_release };
+
+struct bulk_implementation *bulk_net_get_impl_no_copy(void)
+{
+ return &bulk_net_implementation;
+}
+
+++ /dev/null
-/**
- * \file
- * \brief Unidirectional bulk data transfer via shared memory
- */
-
-/*
- * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
- * All rights reserved.
- *
- * This file is distributed under the terms in the attached LICENSE file.
- * If you do not find this file, copies can be found by writing to:
- * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
- */
-
-#include <barrelfish/barrelfish.h>
-
-#include <bulk_transfer/bulk_transfer.h>
-#include <bulk_transfer/bulk_net.h>
-
-
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <sys/param.h>
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <ipv4/lwip/inet.h>
+
+#include "bulk_net_backend.h"
+
+#define L234HEADER_SIZE (14 + 20 + 16)
+#define NDESCS 1024
+
+//#define DEBUG(x...) debug_printf(x)
+#define DEBUG(x...) do {} while (0)
+
+
+/******************************************************************************/
+/* Protocol used on the wire */
+
+/** Message types */
+enum proto_msg {
+ PROTO_INVALID = 0,
+ PROTO_BIND_REQUEST,
+ PROTO_BIND_RESPONSE,
+ PROTO_DATA_TRANSFER,
+};
+
+/** Bind request */
+struct proto_trail_bind_req {
+ uint32_t buffer_size;
+ uint32_t meta_size;
+
+ uint8_t type;
+} __attribute__((packed));
+
+/** Bind response */
+struct proto_trail_bind_resp {
+ errval_t err;
+
+ uint8_t type;
+} __attribute__((packed));
+
+/** Data transfer */
+struct proto_trail_data_transfer {
+ uint8_t type;
+} __attribute__((packed));
+
+
+
+/*errval_t (*bind_received)(struct bulk_channel *channel);
+void (*teardown_received)(struct bulk_channel *channel);
+errval_t (*pool_removed)(struct bulk_channel *channel,
+ struct bulk_pool *pool);
+void (*copy_released)(struct bulk_channel *channel,
+ struct bulk_buffer *buffer);*/
+
+static errval_t cb_pool_assigned(struct bulk_channel *channel,
+ struct bulk_pool *pool);
+static void cb_move_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta);
+static void cb_buffer_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta);
+static void cb_copy_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta);
+
+static struct bulk_channel_callbacks callbacks = {
+ .pool_assigned = cb_pool_assigned,
+ .move_received = cb_move_received,
+ .buffer_received = cb_buffer_received,
+ .copy_received = cb_copy_received,
+};
+
+
+/** Adapt MAC/IP/Port combinations in all transmit buffers (header part) */
+static void update_tx_headers(struct bulk_net_proxy *p)
+{
+ size_t i;
+ struct packet_header *hdr;
+ DEBUG("Updating TX headers %"PRIx64" %"PRIx64" frst=%"PRIx64"\n",
+ p->r_mac, p->l_mac, p->tb[NDESCS-2].hdr_phys);
+ for (i = 0; i < NDESCS - 1; i++) {
+ hdr = p->tb[i].hdr_virt;
+ memset(hdr, 0, sizeof(*hdr));
+ memcpy(hdr->l2.dmac, &p->r_mac, 6);
+ memcpy(hdr->l2.smac, &p->l_mac, 6);
+ hdr->l2.type = htons(0x0800);
+
+ hdr->l3.ver_ihl = 5 | (4 << 4);
+ hdr->l3.ttl = 64;
+ hdr->l3.proto = 0x11;
+ hdr->l3.s_ip = htonl(p->l_ip);
+ hdr->l3.d_ip = htonl(p->r_ip);
+
+ hdr->l4.s_port = htons(p->l_port);
+ hdr->l4.d_port = htons(p->r_port);
+ }
+}
+
+static void add_header(struct bulk_net_msgdesc *msg)
+{
+ struct transmit_buffer *tb = msg->parts[1].opaque;
+ struct packet_header *h = tb->hdr_virt;
+ size_t i;
+ size_t len = 0;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ len += msg->parts[i].size;
+ }
+
+ msg->parts[0].phys = tb->hdr_phys;
+ msg->parts[0].size = sizeof(*h);
+ msg->parts[0].opaque = NULL;
+
+ h->l4.len = htons(len + 8);
+ h->l3.len = htons(len + 8 + 20);
+}
+
+static void strip_padding(struct bulk_net_msgdesc *msg)
+{
+ struct receive_buffer *rb = msg->parts[0].opaque;
+ struct packet_header *h = rb->hdr_virt;
+ size_t len = ntohs(h->l4.len) - 8;
+ size_t i;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ msg->parts[i].size = MIN(msg->parts[i].size, len);
+ len -= msg->parts[i].size;
+ }
+}
+
+static void dump_rx_msg(struct bulk_net_msgdesc *msg)
+{
+ size_t i, j;
+ uint8_t *data;
+ uintptr_t phys;
+ struct receive_buffer *rb;
+
+#if !DO_MSG_DUMP
+ return;
+#endif
+
+ DEBUG("dump_rx_msg():\n");
+ for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ rb = msg->parts[i].opaque;
+ DEBUG(" parts[%"PRId64"]: size=%"PRIx64" op=%p ",
+ i, msg->parts[i].size, rb);
+ if (i == 0) {
+ data = rb->hdr_virt;
+ phys = rb->hdr_phys;
+ } else {
+ data = rb->virt;
+ phys = rb->phys;
+ }
+ printf(" phys=%"PRIx64" virt=%p ", phys, data);
+ for (j = 0; j < msg->parts[i].size; j++) {
+ printf("%02"PRIx8" ", data[j]);
+ }
+ printf("\n");
+ }
+}
+
+static void dump_tx_msg(struct bulk_net_msgdesc *msg)
+{
+#if !DO_MSG_DUMP
+ return;
+#endif
+ size_t i, j;
+ uint8_t *data;
+ struct transmit_buffer *tb;
+
+ DEBUG("dump_tx_msg():\n");
+ for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ DEBUG(" parts[%"PRId64"]: size=%"PRIx64" ", i,
+ msg->parts[i].size);
+ tb = msg->parts[i].opaque;
+ if (i == 0) {
+ tb = msg->parts[1].opaque;
+ data = tb->hdr_virt;
+ } else if (tb->buffer == NULL) {
+ data = tb->int_virt;
+ } else {
+ data = tb->buffer->address;
+ }
+ for (j = 0; j < msg->parts[i].size; j++) {
+ printf("%02"PRIx8" ", data[j]);
+ }
+ printf("\n");
+ }
+}
+
+/******************************************************************************/
+/* Sending messages to other end */
+
+/** Send out bind request */
+static void send_bind_request(struct bulk_net_proxy *p,
+ size_t buffer_size,
+ size_t meta_size)
+{
+ errval_t err;
+ struct proto_trail_bind_req *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->buffer_size = buffer_size;
+ t->meta_size = meta_size;
+ t->type = PROTO_BIND_REQUEST;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ add_header(&msg);
+ err = bulk_e10k_send(&p->transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+/** Send out bind response */
+static void send_bind_response(struct bulk_net_proxy *p,
+ errval_t err)
+{
+ struct proto_trail_bind_resp *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->err = err;
+ t->type = PROTO_BIND_RESPONSE;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ add_header(&msg);
+ err = bulk_e10k_send(&p->transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+/** Send data transfer */
+static void send_data_transfer(struct bulk_net_proxy *p,
+ struct bulk_buffer *b,
+ void *meta,
+ bool is_copy)
+{
+ DEBUG("send_data_transfer()\n");
+ errval_t err;
+ struct proto_trail_data_transfer *t;
+ struct transmit_buffer *tb_d, *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb_d = stack_alloc_alloc(&p->tb_stack);
+ assert(tb_d != NULL);
+ tb_d->buffer = b;
+ tb_d->is_copy = is_copy;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->tb_stack);
+ assert(tb != NULL);
+
+ memcpy(tb->int_virt, meta, p->channel.meta_size);
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel.meta_size);
+ t->type = PROTO_DATA_TRANSFER;
+
+ msg.parts[1].phys = b->phys;
+ msg.parts[1].size = p->buffer_size;
+ msg.parts[1].opaque = tb_d;
+ msg.parts[2].phys = tb->int_phys;
+ msg.parts[2].size = sizeof(*t) + p->channel.meta_size;
+ msg.parts[2].opaque = tb;
+ msg.parts[3].size = 0;
+
+ add_header(&msg);
+ dump_tx_msg(&msg);
+ err = bulk_e10k_send(&p->transfer, &msg);
+ assert(err_is_ok(err));
+ DEBUG("sent_data_transfer()\n");
+}
+
+
+/******************************************************************************/
+/* Receiving messages from other end */
+
+static void free_rb(struct bulk_net_proxy *p,
+ struct receive_buffer *rb)
+{
+ if (rb->buffer == NULL) {
+ // Temporary initialization buffer -> do not reenqueue after
+ // initialization is done
+ if (p->net_bound) {
+ // TODO: free, currently leaking here
+ stack_alloc_free(&p->rb_stack, rb);
+ return;
+ }
+ }
+
+ bulk_e10k_rx_add(&p->transfer, rb->phys, rb->hdr_phys, rb);
+}
+
+static void free_rx(struct bulk_net_proxy *p,
+ struct bulk_net_msgdesc *msg)
+{
+ size_t i;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ free_rb(p, msg->parts[i].opaque);
+ }
+}
+
+/** Handle received bind request */
+static void bind_req_received(struct bulk_net_proxy *p,
+ struct proto_trail_bind_req *t,
+ struct bulk_net_msgdesc *msg)
+{
+ struct receive_buffer *rb = msg->parts[0].opaque;
+ struct packet_header *hdr = rb->hdr_virt;
+
+ if (p->net_bound) {
+ DEBUG("Ignoring bind request to already bound proxy\n");
+ goto free;
+ }
+
+ p->r_mac = 0;
+ memcpy(&p->r_mac, hdr->l2.smac, 6);
+ p->r_ip = ntohl(hdr->l3.s_ip);
+ p->r_port = ntohs(hdr->l4.s_port);
+
+ update_tx_headers(p);
+
+ assert(t->buffer_size == p->buffer_size);
+ send_bind_response(p, SYS_ERR_OK);
+ p->net_bound = true;
+ p->connected(p);
+
+free:
+ free_rx(p, msg);
+}
+
+/** Handle received bind response */
+static void bind_resp_received(struct bulk_net_proxy *p,
+ struct proto_trail_bind_resp *t,
+ struct bulk_net_msgdesc *msg)
+{
+ if (p->net_bound) {
+ DEBUG("Ignoring bind response to already bound proxy\n");
+ goto free;
+ }
+
+ if (err_is_ok(t->err)) {
+ p->net_bound = true;
+ p->connected(p);
+ } else {
+ USER_PANIC("Remote bind attempt failed\n");
+ }
+
+free:
+ free_rx(p, msg);
+}
+
+/** Handle received data transfer */
+static void data_transfer_received(struct bulk_net_proxy *p,
+ struct proto_trail_data_transfer *t,
+ struct bulk_net_msgdesc *msg)
+{
+ errval_t err;
+ struct receive_buffer *rb;
+ struct bulk_buffer *buffer;
+
+ assert(msg->parts[1].size == p->buffer_size);
+ // TODO: assumes that meta_size has a reasonably small size
+ assert(msg->parts[2].size == p->channel.meta_size + sizeof(*t));
+ assert(msg->parts[3].size == 0);
+
+ rb = msg->parts[1].opaque;
+ buffer = rb->buffer;
+ stack_alloc_free(&p->rb_stack, rb);
+
+ rb = msg->parts[2].opaque;
+
+ err = bulk_channel_move(&p->channel, buffer, rb->virt, p->panic_cont);
+ assert(err_is_ok(err));
+
+ free_rb(p, rb);
+}
+
+static void tcb_received(struct bulk_e10k* bu, struct bulk_net_msgdesc *msg)
+{
+ struct bulk_net_proxy *p = bu->opaque;
+ size_t i;
+ struct receive_buffer *rb;
+ uint8_t *t;
+ DEBUG("tcb_received()\n");
+
+ assert(msg->parts[0].size == sizeof(struct packet_header));
+ dump_rx_msg(msg);
+ strip_padding(msg);
+ dump_rx_msg(msg);
+
+ for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++);
+ i--;
+
+ rb = msg->parts[i].opaque;
+ t = rb->virt;
+ switch (t[msg->parts[i].size - 1]) {
+ case PROTO_BIND_REQUEST:
+ DEBUG("Received bind request\n");
+ bind_req_received(p, (struct proto_trail_bind_req *) (t +
+ msg->parts[i].size - sizeof(struct proto_trail_bind_req)),
+ msg);
+ break;
+
+ case PROTO_BIND_RESPONSE:
+ DEBUG("Received bind response\n");
+ bind_resp_received(p, (struct proto_trail_bind_resp *) (t +
+ msg->parts[i].size - sizeof(struct proto_trail_bind_resp)),
+ msg);
+ break;
+
+ case PROTO_DATA_TRANSFER:
+ DEBUG("Received data transfer\n");
+ data_transfer_received(p, (struct proto_trail_data_transfer *) (t +
+ msg->parts[i].size -
+ sizeof(struct proto_trail_data_transfer)), msg);
+ break;
+
+ default:
+ USER_PANIC("Unexpected message type received\n");
+ }
+}
+
+
+/******************************************************************************/
+/* Management of network channel */
+
+static void tcb_transmitted(struct bulk_e10k *bu, void *opaque)
+{
+ struct bulk_net_proxy *p = bu->opaque;
+ struct transmit_buffer *tb = opaque;
+ errval_t err;
+ DEBUG("tcb_transmitted()\n");
+
+ if (opaque == NULL) {
+ // We can ignore the header buffers
+ return;
+ }
+
+ // If there is a bulk buffer attached, need to pass it back
+ if (tb->buffer != NULL) {
+ if (tb->is_copy) {
+ err = bulk_channel_release(&p->channel, tb->buffer, p->panic_cont);
+ } else {
+ err = bulk_channel_pass(&p->channel, tb->buffer, p->zero_meta,
+ p->panic_cont);
+ }
+ assert(err_is_ok(err));
+ tb->buffer = NULL;
+ }
+ stack_alloc_free(&p->tb_stack, tb);
+}
+
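+/**
+ * Common setup for the proxy's network transfer: initializes the e10k queue
+ * and allocates the header and internal buffers backing the receive and
+ * transmit buffer stacks.
+ */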
+static errval_t t_init(struct bulk_net_proxy *p)
+{
+ errval_t err;
+ size_t i;
+ size_t n = NDESCS - 1;
+ struct receive_buffer *rb;
+ struct transmit_buffer *tb;
+ void *h_vbase, *i_vbase;
+ uintptr_t h_pbase, i_pbase;
+
+ p->net_bound = false;
+ p->transfer.opaque = p;
+
+ err = bulk_e10k_init(&p->transfer, p->ws, p->card, p->queue, p->buffer_size,
+ NDESCS, tcb_received, tcb_transmitted);
+
+ stack_alloc_init(&p->rb_stack, n);
+ stack_alloc_init(&p->tb_stack, n);
+ rb = calloc(n, sizeof(*rb));
+ p->tb = tb = calloc(n, sizeof(*tb));
+
+ err = allocmap_frame(E10K_HDRSZ * n * 2, &h_vbase, &h_pbase, NULL);
+ assert(err_is_ok(err));
+ err = allocmap_frame(INT_BUFSZ * n, &i_vbase, &i_pbase, NULL);
+ assert(err_is_ok(err));
+
+ for (i = 0; i < n; i++) {
+ rb[i].hdr_virt = h_vbase;
+ rb[i].hdr_phys = h_pbase;
+ h_pbase += E10K_HDRSZ;
+ h_vbase = (void *) ((uintptr_t) h_vbase + E10K_HDRSZ);
+
+ tb[i].hdr_virt = h_vbase;
+ tb[i].hdr_phys = h_pbase;
+ tb[i].int_virt = i_vbase;
+ tb[i].int_phys = i_pbase;
+ h_pbase += E10K_HDRSZ;
+ h_vbase = (void *) ((uintptr_t) h_vbase + E10K_HDRSZ);
+ i_pbase += INT_BUFSZ;
+ i_vbase = (void *) ((uintptr_t) i_vbase + INT_BUFSZ);
+
+ stack_alloc_free(&p->rb_stack, rb + i);
+ stack_alloc_free(&p->tb_stack, tb + i);
+ }
+
+ rb = stack_alloc_alloc(&p->rb_stack);
+ rb->buffer = NULL;
+ err = allocmap_frame(p->buffer_size, &rb->virt, &rb->phys, NULL);
+ assert(err_is_ok(err));
+
+ err = bulk_e10k_rx_add(&p->transfer, rb->phys, rb->hdr_phys, rb);
+ assert(err_is_ok(err));
+
+ p->l_mac = p->transfer.mac;
+ bulk_e10k_ip_info(&p->transfer, &p->l_ip);
+ return err;
+}
+
+static errval_t t_export(struct bulk_net_proxy *p)
+{
+ errval_t err;
+
+ err = t_init(p);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_port_add(&p->transfer, p->l_port);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ return err;
+}
+
+static errval_t t_bind(struct bulk_net_proxy *p)
+{
+ errval_t err;
+
+ err = t_init(p);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_arp_lookup(&p->transfer, p->r_ip, &p->r_mac);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_port_alloc(&p->transfer, &p->l_port);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ update_tx_headers(p);
+ return err;
+}
+
+/******************************************************************************/
+/* Bulk transfer callbacks */
+
+static errval_t cb_pool_assigned(struct bulk_channel *channel,
+ struct bulk_pool *pool)
+{
+ struct bulk_net_proxy *p = channel->user_state;
+ assert(pool->buffer_size == p->buffer_size);
+ return SYS_ERR_OK;
+}
+
+static void cb_move_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta)
+{
+ DEBUG("cb_move_received()\n");
+ struct bulk_net_proxy *p = channel->user_state;
+ assert(p->bulk_bound && p->net_bound);
+ send_data_transfer(p, buffer, meta, false);
+}
+
+static void cb_buffer_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta)
+{
+ DEBUG("cb_buffer_received(b=%p,b->p=%"PRIx64")\n", buffer,
+ buffer->phys);
+ errval_t err;
+ struct bulk_net_proxy *p = channel->user_state;
+ struct receive_buffer *rb;
+ assert(p->bulk_bound && p->net_bound);
+
+ rb = stack_alloc_alloc(&p->rb_stack);
+ assert(rb != NULL);
+
+ rb->virt = buffer->address;
+ rb->phys = buffer->phys;
+ rb->buffer = buffer;
+
+ err = bulk_e10k_rx_add(&p->transfer, rb->phys, rb->hdr_phys, rb);
+ assert(err_is_ok(err));
+ DEBUG("added buffer to rx queue\n");
+}
+
+static void cb_copy_received(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta)
+{
+ DEBUG("cb_copy_received()\n");
+ struct bulk_net_proxy *p = channel->user_state;
+ assert(p->bulk_bound && p->net_bound);
+ send_data_transfer(p, buffer, meta, true);
+}
+
+
+/******************************************************************************/
+/* Initialization */
+
+static void cb_bind(void *arg, errval_t err, struct bulk_channel *c)
+{
+ struct bulk_net_proxy *p = arg;
+ p->err = err;
+ p->bulk_bound = true;
+}
+
+static errval_t channel_bind(struct bulk_net_proxy *p,
+ struct bulk_endpoint_descriptor *epd)
+{
+ errval_t err;
+ struct bulk_channel_bind_params bind_params = {
+ .role = BULK_ROLE_SLAVE,
+ .trust = BULK_TRUST_FULL,
+ .waitset = p->ws,
+ };
+ struct bulk_continuation cont = {
+ .handler = cb_bind,
+ .arg = p,
+ };
+ DEBUG("before bulk_channel_bind, %p\n", epd->f->channel_bind);
+
+
+ p->bulk_bound = false;
+ err = bulk_channel_bind(&p->channel, epd, &callbacks, &bind_params, cont);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ p->channel.user_state = p;
+
+ while (!p->bulk_bound) {
+ event_dispatch(p->ws);
+ }
+
+ p->zero_meta = calloc(1, p->channel.meta_size);
+
+ return p->err;
+}
+
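+/**
+ * Sets up the proxy in listening mode: binds the local bulk channel and
+ * exports the network transfer on the given port. The connected callback
+ * fires once the remote side's bind request has been handled.
+ */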
+errval_t bulk_net_proxy_listen(struct bulk_net_proxy *p,
+ struct bulk_endpoint_descriptor *desc,
+ struct waitset *ws,
+ size_t buffer_size,
+ const char *card,
+ uint8_t queue,
+ uint16_t port,
+ void (*connected)(struct bulk_net_proxy *))
+{
+ errval_t err;
+ p->card = card;
+ p->queue = queue;
+ p->ws = ws;
+ p->buffer_size = buffer_size;
+ p->l_port = port;
+ p->connected = connected;
+
+ err = channel_bind(p, desc);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ return t_export(p);
+}
+
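+/**
+ * Sets up the connecting side of the proxy: binds the local bulk channel,
+ * resolves the remote address and sends the bind request. The connected
+ * callback fires once the bind response arrives.
+ */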
+errval_t bulk_net_proxy_connect(struct bulk_net_proxy *p,
+ struct bulk_endpoint_descriptor *desc,
+ struct waitset *ws,
+ size_t buffer_size,
+ const char *card,
+ uint8_t queue,
+ uint32_t ip,
+ uint16_t port,
+ void (*connected)(struct bulk_net_proxy *))
+{
+ errval_t err;
+ DEBUG("inside proxy connect, %p\n", ws);
+ p->ws = ws;
+ p->card = card;
+ p->queue = queue;
+ p->r_port = port;
+ p->buffer_size = buffer_size;
+ p->r_ip = ip;
+ p->connected = connected;
+
+ DEBUG("before channel bind. %p, %p, %p\n", p, desc, desc->f->channel_bind);
+ err = channel_bind(p, desc);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ DEBUG("before tbind\n");
+ err = t_bind(p);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ DEBUG("Sending bind request...\n");
+ send_bind_request(p, p->buffer_size, p->channel.meta_size);
+ DEBUG("Sent bind request\n");
+ return SYS_ERR_OK;
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <sys/param.h>
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <ipv4/lwip/inet.h>
+
+#include "bulk_net_backend.h"
+#include "bulk_net_transfer.h"
+
+#if BULK_NET_ENABLE_DEBUG_TRANSF
+#define BT_DEBUG_TRACE BULK_NET_TRACE
+#define BT_DEBUG(fmt, msg...) BULK_NET_DEBUG(fmt, msg)
+#else
+#define BT_DEBUG(fmt, msg...) do{}while(0);
+#define BT_DEBUG_TRACE do{}while(0);
+#endif
+
+#if BULK_NET_ENABLE_STATUS_TRANSF
+#define BT_STATUS(fmt, msg...) BULK_NET_STATUS(fmt, msg)
+#else
+#define BT_STATUS(fmt, msg...) do{} while(0);
+#endif
+
+errval_t bulk_net_transfer_bind(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg))
+{
+ errval_t err;
+
+ err = bulk_net_transfer_init(tc, tx_cb, rx_cb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_arp_lookup(&tc->transfer, tc->r_ip, &tc->r_mac);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_port_alloc(&tc->transfer, &tc->l_port);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ bulk_net_transfer_update_tx_headers(tc);
+ return err;
+}
+
+errval_t bulk_net_transfer_export(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg))
+{
+ errval_t err;
+
+ err = bulk_net_transfer_init(tc, tx_cb, rx_cb);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ err = bulk_e10k_port_add(&tc->transfer, tc->l_port);
+ if (err_is_fail(err)) {
+ return err;
+ }
+
+ return err;
+}
+
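+/**
+ * Common initialization: sets up the e10k transfer, allocates the header and
+ * internal buffer memory and pre-populates the receive queue with
+ * buffer_count temporary buffers.
+ */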
+errval_t bulk_net_transfer_init(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg))
+{
+ errval_t err;
+ size_t i;
+ size_t n = BULK_NET_TRANSFER_NUM_DESCS - 1;
+ struct receive_buffer *rb;
+ struct transmit_buffer *tb;
+ void *h_vbase, *i_vbase;
+ uintptr_t h_pbase, i_pbase;
+
+ tc->transfer.opaque = tc;
+
+ err = bulk_e10k_init(&tc->transfer, tc->ws, tc->card, tc->queue,
+ tc->buffer_size,
+ BULK_NET_TRANSFER_NUM_DESCS,
+ rx_cb, tx_cb);
+
+ stack_alloc_init(&tc->rb_stack, n);
+ stack_alloc_init(&tc->tb_stack, n);
+ rb = calloc(n, sizeof(*rb));
+ tc->tb = tb = calloc(n, sizeof(*tb));
+
+ err = allocmap_frame(E10K_HDRSZ * n * 2, &h_vbase, &h_pbase, NULL);
+ assert(err_is_ok(err));
+ err = allocmap_frame(BULK_NET_INTERNAL_BUFER_SIZE * n, &i_vbase, &i_pbase, NULL);
+ assert(err_is_ok(err));
+
+ for (i = 0; i < n; i++) {
+ rb[i].hdr_virt = h_vbase;
+ rb[i].hdr_phys = h_pbase;
+ h_pbase += E10K_HDRSZ;
+ h_vbase = (void *) ((uintptr_t) h_vbase + E10K_HDRSZ);
+
+ tb[i].hdr_virt = h_vbase;
+ tb[i].hdr_phys = h_pbase;
+ tb[i].int_virt = i_vbase;
+ tb[i].int_phys = i_pbase;
+ h_pbase += E10K_HDRSZ;
+ h_vbase = (void *) ((uintptr_t) h_vbase + E10K_HDRSZ);
+ i_pbase += BULK_NET_INTERNAL_BUFER_SIZE;
+ i_vbase = (void *) ((uintptr_t) i_vbase + BULK_NET_INTERNAL_BUFER_SIZE);
+
+ stack_alloc_free(&tc->rb_stack, rb + i);
+ stack_alloc_free(&tc->tb_stack, tb + i);
+ }
+
+ for (uint32_t j=0; j < tc->buffer_count; ++j) {
+ rb = stack_alloc_alloc(&tc->rb_stack);
+ rb->buffer = NULL;
+ err = allocmap_frame(tc->buffer_size, &rb->virt, &rb->phys, NULL);
+ assert(err_is_ok(err));
+
+ err = bulk_e10k_rx_add(&tc->transfer, rb->phys, rb->hdr_phys, rb);
+ assert(err_is_ok(err));
+ }
+ tc->l_mac = tc->transfer.mac;
+ bulk_e10k_ip_info(&tc->transfer, &tc->l_ip);
+ return err;
+}
+
+void bulk_net_transfer_update_tx_headers(struct bulk_net_control *p)
+{
+ size_t i;
+ struct packet_header *hdr;
+
+ BT_DEBUG("Updating TX headers %"PRIx64" %"PRIx64" frst=%"PRIx64"\n",
+ p->r_mac, p->l_mac,
+ p->tb[BULK_NET_TRANSFER_NUM_DESCS - 2].hdr_phys);
+
+ for (i = 0; i < BULK_NET_TRANSFER_NUM_DESCS - 1; i++) {
+ hdr = p->tb[i].hdr_virt;
+ memset(hdr, 0, sizeof(*hdr));
+ memcpy(hdr->l2.dmac, &p->r_mac, 6);
+ memcpy(hdr->l2.smac, &p->l_mac, 6);
+ hdr->l2.type = htons(0x0800);
+
+ hdr->l3.ver_ihl = 5 | (4 << 4);
+ hdr->l3.ttl = 64;
+ hdr->l3.proto = 0x11;
+ hdr->l3.s_ip = htonl(p->l_ip);
+ hdr->l3.d_ip = htonl(p->r_ip);
+
+ hdr->l4.s_port = htons(p->l_port);
+ hdr->l4.d_port = htons(p->r_port);
+ }
+}
+
+void bulk_net_transfer_add_header(struct bulk_net_msgdesc *msg)
+{
+ struct transmit_buffer *tb = msg->parts[1].opaque;
+ struct packet_header *h = tb->hdr_virt;
+ size_t i;
+ size_t len = 0;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ len += msg->parts[i].size;
+ }
+
+ msg->parts[0].phys = tb->hdr_phys;
+ msg->parts[0].size = sizeof(*h);
+ msg->parts[0].opaque = NULL;
+
+ h->l4.len = htons(len + 8);
+ h->l3.len = htons(len + 8 + 20);
+}
+
+void bulk_net_transfer_strip_padding(struct bulk_net_msgdesc *msg)
+{
+ struct receive_buffer *rb = msg->parts[0].opaque;
+ struct packet_header *h = rb->hdr_virt;
+ size_t len = ntohs(h->l4.len) - 8;
+ size_t i;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ msg->parts[i].size = MIN(msg->parts[i].size, len);
+ len -= msg->parts[i].size;
+ }
+}
+
+void bulk_net_transfer_free_rb(struct bulk_net_control *tc,
+ struct receive_buffer *rb)
+{
+ if (rb->buffer == NULL) {
+ // Temporary initialization buffer -> do not reenqueue after
+ // initialization is done
+ if (false) {
+ // TODO: free, currently leaking here
+ stack_alloc_free(&tc->rb_stack, rb);
+ return;
+ }
+ }
+
+ bulk_e10k_rx_add(&tc->transfer, rb->phys, rb->hdr_phys, rb);
+}
+
+void bulk_net_transfer_free_rx(struct bulk_net_control *tc,
+ struct bulk_net_msgdesc *msg)
+{
+ size_t i;
+
+ for (i = 1; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++) {
+ bulk_net_transfer_free_rb(tc, msg->parts[i].opaque);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#ifndef BULK_NET_TRANSFER_H
+#define BULK_NET_TRANSFER_H
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <ipv4/lwip/inet.h>
+
+#include "bulk_net_backend.h"
+#include "bulk_net_e10k.h"
+
+
+
+struct bulk_net_control
+{
+ struct bulk_e10k transfer;
+
+ size_t buffer_size;
+ size_t buffer_count;
+
+ struct waitset *ws;
+
+ struct stack_allocator rb_stack;
+ struct transmit_buffer *tb;
+ struct stack_allocator tb_stack;
+
+ const char *card;
+ uint8_t queue;
+ uint8_t max_queues;
+ uint8_t num_queues;
+ uint16_t l_port;
+ uint16_t r_port;
+ uint32_t l_ip;
+ uint32_t r_ip;
+ uint64_t l_mac;
+ uint64_t r_mac;
+};
+
+errval_t bulk_net_transfer_bind(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg));
+
+errval_t bulk_net_transfer_export(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg));
+
+errval_t bulk_net_transfer_init(struct bulk_net_control *tc,
+ void (*tx_cb)(struct bulk_e10k *bu,
+ void *opaque),
+ void (*rx_cb)(struct bulk_e10k* bu,
+ struct bulk_net_msgdesc *msg));
+
+void bulk_net_transfer_strip_padding(struct bulk_net_msgdesc *msg);
+
+void bulk_net_transfer_add_header(struct bulk_net_msgdesc *msg);
+
+void bulk_net_transfer_update_tx_headers(struct bulk_net_control *p);
+
+void bulk_net_transfer_free_rb(struct bulk_net_control *tc,
+ struct receive_buffer *rb);
+
+void bulk_net_transfer_free_rx(struct bulk_net_control *tc,
+ struct bulk_net_msgdesc *msg);
+
+#endif /* BULK_NET_TRANSFER_H */
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+#include <sys/param.h>
+
+#include <barrelfish/barrelfish.h>
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_net.h>
+#include <bulk_transfer/bulk_allocator.h>
+#include <ipv4/lwip/inet.h>
+
+#include "../../bulk_pool.h"
+#include "../../bulk_buffer.h"
+#include "../../helpers.h"
+
+#include "bulk_net_backend.h"
+#include "bulk_net_transfer.h"
+
+#if BULK_NET_ENABLE_DEBUG_BACKEND
+#define BNT_DEBUG_TRACE BULK_NET_TRACE
+#define BNT_DEBUG(fmt, msg...) BULK_NET_DEBUG(fmt, msg)
+#else
+#define BNT_DEBUG(fmt, msg...) do{}while(0);
+#define BNT_DEBUG_TRACE do{}while(0);
+#endif
+
+#if BULK_NET_ENABLE_STATUS_BACKEND
+#define BNT_STATUS(fmt, msg...) debug_printf("%s(): "fmt"\n", __func__, msg);
+#else
+#define BNT_STATUS(fmt, msg...) do{} while(0);
+#endif
+
+#define BULK_NET_BUFFER_SIZE 0x1000
+
+struct pending_pool_request
+{
+ struct bulk_pool *pool;
+ struct bulk_continuation cont;
+ struct pending_pool_request *next;
+};
+
+struct transmit_buffer;
+struct receive_buffer;
+struct bulk_net_transp
+{
+ struct bulk_net_control net_ctrl;
+
+ struct bulk_channel *channel;
+ struct bulk_continuation bind_cont;
+ struct pending_pool_request *pending_pool_requests;
+
+ errval_t err;
+ bool bound;
+ struct bulk_continuation panic_cont;
+ void *zero_meta;
+
+ void *user_state;
+};
+
+enum proto_msg
+{
+ PROTO_INVALID,
+ PROTO_BIND_REQUEST,
+ PROTO_BIND_RESPONSE,
+ PROTO_POOL_REQUEST,
+ PROTO_POOL_RESPONSE,
+ PROTO_BUFFER_MOVE,
+ PROTO_BUFFER_COPY,
+ PROTO_BUFFER_PASS,
+ PROTO_BUFFER_RELEASE,
+ PROTO_STATUS,
+
+ /* NOT IMPLEMENTED */
+ PROTO_POOL_REMOVE,
+ PROTO_TEARDOWN
+};
+
+struct proto_trail_bind_req
+{
+ uint32_t buffer_size;
+ uint8_t trust_level;
+ uint8_t role;
+ /* XXX: there are no constraints on this channel */
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_bind_resp
+{
+ uint32_t buffer_size; ///< XXX: given by the creator side
+ uint32_t meta_size; ///< XXX: given by the creator side
+ uint8_t direction;
+ uint8_t trust_level;
+ uint8_t role;
+ errval_t err;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pool_req
+{
+ uint32_t buffer_count;
+ uint32_t buffer_size;
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pool_resp
+{
+ errval_t err;
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_move
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_copy
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_pass
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_release
+{
+ uint32_t pool_machine_id;
+ domainid_t pool_domain_id;
+ uint32_t pool_local_id;
+ uint32_t buffer_id;
+
+ uint8_t type;
+}__attribute__((packed));
+
+struct proto_trail_status
+{
+ errval_t err;
+
+ uint8_t type;
+}__attribute__((packed));
+
+/* ----------------------------- pools ------------------------------------- */
+static inline struct bulk_buffer *get_buffer(struct bulk_channel *chan,
+ struct bulk_pool_id *pool_id,
+ uint32_t buffer_id)
+{
+ struct bulk_pool *pool = bulk_pool_get(pool_id, chan);
+ assert(pool);
+ assert(buffer_id < pool->num_buffers);
+ return pool->buffers[buffer_id];
+}
+
+/* --------------------------- binding ------------------------------------- */
+
+static void send_bind_response(struct bulk_net_transp *p,
+ uint32_t buffer_size,
+ uint32_t meta_size,
+ uint8_t direction,
+ uint8_t role,
+ uint8_t trust,
+ errval_t err)
+{
+ BNT_DEBUG_TRACE
+
+ struct proto_trail_bind_resp *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->err = err;
+ t->buffer_size = buffer_size;
+ t->meta_size = meta_size;
+ t->direction = direction;
+ t->role = role;
+ t->trust_level = trust;
+ t->type = PROTO_BIND_RESPONSE;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_bind_response(struct bulk_net_transp *p,
+ struct proto_trail_bind_resp *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ if (p->bound) {
+ debug_printf("Ignoring bind response to already bound\n");
+ goto free;
+ }
+
+ struct bulk_channel *chan = p->channel;
+
+ assert(chan->state == BULK_STATE_BINDING);
+
+ chan->meta_size = t->meta_size;
+ chan->trust = t->trust_level;
+ chan->role = t->role;
+ chan->direction = t->direction;
+
+ if (err_is_fail(t->err)) {
+ BNT_DEBUG("ERROR: binding failed. %s\n", err_getstring(t->err));
+ chan->state = BULK_STATE_CLOSED;
+ } else {
+ BNT_DEBUG("SUCCESS: channel %p bound", chan);
+ chan->state = BULK_STATE_CONNECTED;
+ p->bound = true;
+ }
+
+ p->zero_meta = calloc(1, chan->meta_size);
+
+ if (p->bind_cont.handler) {
+ p->bind_cont.handler(p->bind_cont.arg, t->err, p->channel);
+ }
+
+ free: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+static void send_bind_request(struct bulk_net_transp *p,
+ uint32_t buffer_size,
+ uint8_t trust_level,
+ uint8_t role)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct proto_trail_bind_req *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->buffer_size = buffer_size;
+ t->role = role;
+ t->trust_level = trust_level;
+ t->type = PROTO_BIND_REQUEST;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_bind_request(struct bulk_net_transp *p,
+ struct proto_trail_bind_req *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct receive_buffer *rb = msg->parts[0].opaque;
+ struct packet_header *hdr = rb->hdr_virt;
+
+ if (p->bound) {
+ debug_printf("Already bound!\n");
+ goto free;
+ }
+
+ p->net_ctrl.r_mac = 0;
+ memcpy(&p->net_ctrl.r_mac, hdr->l2.smac, 6);
+ p->net_ctrl.r_ip = ntohl(hdr->l3.s_ip);
+ p->net_ctrl.r_port = ntohs(hdr->l4.s_port);
+
+ bulk_net_transfer_update_tx_headers(&p->net_ctrl);
+
+ if (t->buffer_size != p->net_ctrl.buffer_size) {
+ BNT_DEBUG("ERROR: buffer sizes do not match: %i", t->buffer_size);
+ err = BULK_TRANSFER_ALLOC_BUFFER_SIZE;
+ goto send_and_free;
+ }
+
+ err = p->channel->callbacks->bind_received(p->channel);
+ if (err_is_fail(err)) {
+ BNT_DEBUG("ERROR: bind request rejected. %s", err_getstring(err));
+ goto send_and_free;
+ } else {
+ p->bound = true;
+ }
+
+ if (p->channel->role == BULK_ROLE_GENERIC) {
+ if (t->role == BULK_ROLE_GENERIC) {
+ p->channel->role = BULK_ROLE_MASTER;
+ } else {
+ p->channel->role = bulk_role_other(t->role);
+ }
+ }
+
+ if (p->channel->trust != t->trust_level) {
+ /* TODO: choose an appropriate trust level */
+ p->channel->trust = BULK_TRUST_NONE;
+ }
+
+ p->channel->state = BULK_STATE_CONNECTED;
+
+ send_and_free: send_bind_response(
+ p, p->net_ctrl.buffer_size, p->channel->meta_size,
+ bulk_direction_other(p->channel->direction),
+ bulk_role_other(p->channel->role), p->channel->trust, err);
+
+ free: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+/* -------------------------- pool assignment -------------------------------*/
+
+static void send_pool_assign_response(struct bulk_net_transp *p,
+ errval_t err,
+ struct bulk_pool *pool)
+{
+ BNT_DEBUG_TRACE
+
+ struct proto_trail_pool_resp *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->err = err;
+ t->pool_domain_id = pool->id.dom;
+ t->pool_machine_id = pool->id.machine;
+ t->pool_local_id = pool->id.local;
+ t->type = PROTO_POOL_RESPONSE;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_pool_assign_response(struct bulk_net_transp *p,
+ struct proto_trail_pool_resp *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct pending_pool_request *ppr = p->pending_pool_requests;
+ struct pending_pool_request *prev = NULL;
+
+ struct bulk_pool_id id = {
+ .dom = t->pool_domain_id,
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id };
+
+ while (ppr) {
+ if (bulk_pool_cmp_id(&id, &ppr->pool->id) == 0) {
+ if (prev == NULL) {
+ p->pending_pool_requests = ppr->next;
+ } else {
+ prev->next = ppr->next;
+ }
+ break;
+ }
+ prev = ppr;
+ ppr = ppr->next;
+ }
+ if (ppr == NULL) {
+ BNT_DEBUG("NOTICE: no pending binding request (ignored). [%i, %i]",
+ (uint32_t )id.dom, id.local);
+ goto free;
+ }
+
+ struct bulk_pool *pool = ppr->pool;
+ if (err_is_fail(t->err)) {
+ DEBUG_ERR(t->err, "remote pool assignment failed");
+ goto free;
+ }
+
+ err = bulk_pool_assign(pool, p->channel);
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "assignment to the channel failed");
+ goto free;
+ }
+
+ free: if (ppr != NULL && ppr->cont.handler) {
+ ppr->cont.handler(ppr->cont.arg, t->err, p->channel);
+ }
+
+ free(ppr);
+ bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+static void send_pool_assign_request(struct bulk_net_transp *p,
+ struct bulk_pool *pool)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct proto_trail_pool_req *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ t = tb->int_virt;
+ t->buffer_size = pool->buffer_size;
+ t->buffer_count = pool->num_buffers;
+ t->pool_domain_id = pool->id.dom;
+ t->pool_machine_id = pool->id.machine;
+ t->pool_local_id = pool->id.local;
+ t->type = PROTO_POOL_REQUEST;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t);
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+
+}
+
+static void handle_pool_assign_request(struct bulk_net_transp *p,
+ struct proto_trail_pool_req *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+
+ struct bulk_pool_id id = {
+ .dom = t->pool_domain_id,
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id };
+
+ struct bulk_pool *pool = bulk_pool_domain_list_get(&id);
+
+ uint8_t first_assignment = 0;
+
+ if (pool == NULL) {
+ struct bulk_allocator pool_alloc;
+
+ /* TODO: pool constraints */
+
+ err = bulk_alloc_init(&pool_alloc, t->buffer_count, t->buffer_size,
+ NULL);
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "Failed to allocate memory for the pool\n");
+ goto send_and_free;
+ }
+ /* overwrite the ID */
+ pool_alloc.pool->id = id;
+
+ pool = pool_alloc.pool;
+
+ first_assignment = 1;
+
+ /* TODO: Free the allocator resources */
+
+ BNT_DEBUG("Pool created. [%i, %i]", (uint32_t )id.dom, id.local)
+ } else {
+ BNT_DEBUG("Pool reuse. [%i, %i]", (uint32_t )id.dom, id.local)
+ }
+
+ err = p->channel->callbacks->pool_assigned(p->channel, pool);
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "User revokes pool assignment\n");
+ if (first_assignment) {
+ bulk_pool_domain_list_remove(pool);
+ }
+ bulk_pool_dealloc(pool);
+ goto send_and_free;
+ }
+
+ err = bulk_pool_assign(pool, p->channel);
+ assert(!err_is_fail(err)); // should not fail
+
+ send_and_free: send_pool_assign_response(p, err, pool);
+
+ bulk_net_transfer_free_rx(&p->net_ctrl, msg);
+}
+
+/* ---------------------------- move operation ----------------------------- */
+
+static void send_buffer_move(struct bulk_net_transp *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct proto_trail_move *t;
+ struct transmit_buffer *tb_d, *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb_d != NULL);
+ tb_d->buffer = b;
+ tb_d->is_copy = false;
+ tb_d->cont = cont;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_MOVE;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+ t->buffer_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+
+ msg.parts[1].phys = b->phys;
+ msg.parts[1].size = p->net_ctrl.buffer_size;
+ msg.parts[1].opaque = tb_d;
+ msg.parts[2].phys = tb->int_phys;
+ msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[2].opaque = tb;
+ msg.parts[3].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_move(struct bulk_net_transp *p,
+ struct proto_trail_move *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ assert(buf);
+
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+
+ rb = msg->parts[1].opaque;
+ memcpy(buf->address, rb->virt, buf->pool->buffer_size);
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+
+ rb = msg->parts[2].opaque;
+
+ p->channel->callbacks->move_received(p->channel, buf, rb->virt);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ----------------------------- copy operation ---------------------------- */
+
+static void send_buffer_copy(struct bulk_net_transp *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct proto_trail_copy *t;
+ struct transmit_buffer *tb_d, *tb;
+ struct bulk_net_msgdesc msg;
+
+ tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb_d != NULL);
+ tb_d->buffer = b;
+ tb_d->is_copy = true;
+ tb_d->cont = cont;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_COPY;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+ t->buffer_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+
+ msg.parts[1].phys = b->phys;
+ msg.parts[1].size = p->net_ctrl.buffer_size;
+ msg.parts[1].opaque = tb_d;
+ msg.parts[2].phys = tb->int_phys;
+ msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[2].opaque = tb;
+ msg.parts[3].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_copy(struct bulk_net_transp *p,
+ struct proto_trail_copy *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ BNT_DEBUG("buf=%p, id=%i", buf, t->buffer_id);
+
+ assert(buf);
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+
+ rb = msg->parts[1].opaque;
+ memcpy(buf->address, rb->virt, buf->pool->buffer_size);
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+
+ rb = msg->parts[2].opaque;
+
+ enum bulk_buffer_state st = BULK_BUFFER_READ_ONLY;
+ if (bulk_buffer_is_owner(buf)) {
+ st = BULK_BUFFER_RO_OWNED;
+ }
+ err = bulk_buffer_change_state(buf, st);
+ assert(!err_is_fail(err));
+
+ p->channel->callbacks->copy_received(p->channel, buf, rb->virt);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ------------------------------ pass operation --------------------------- */
+
+static void send_buffer_pass(struct bulk_net_transp *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct proto_trail_pass *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+ tb->cont = cont;
+ tb->buffer = b;
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_PASS;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+ t->buffer_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_pass(struct bulk_net_transp *p,
+ struct proto_trail_pass *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ assert(buf);
+
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+
+ rb = msg->parts[1].opaque;
+
+ p->channel->callbacks->buffer_received(p->channel, buf, rb->virt);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ----------------------------- release operation ------------------------- */
+
+static void send_buffer_release(struct bulk_net_transp *p,
+ struct bulk_buffer *b,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+    struct proto_trail_release *t;
+ struct transmit_buffer *tb;
+ struct bulk_net_msgdesc msg;
+
+ // prepare trailer
+ tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
+ assert(tb != NULL);
+ tb->cont = cont;
+ tb->buffer = b;
+ if (meta != NULL) {
+ memcpy(tb->int_virt, meta, p->channel->meta_size);
+ } else {
+ memset(tb->int_virt, 0, p->channel->meta_size);
+ }
+ t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
+ t->type = PROTO_BUFFER_RELEASE;
+ t->pool_domain_id = b->pool->id.dom;
+ t->pool_local_id = b->pool->id.local;
+ t->pool_machine_id = b->pool->id.machine;
+ t->buffer_id = ((lvaddr_t) b->address - b->pool->base_address)
+ / b->pool->buffer_size;
+
+ msg.parts[1].phys = tb->int_phys;
+ msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
+ msg.parts[1].opaque = tb;
+ msg.parts[2].size = 0;
+
+ bulk_net_transfer_add_header(&msg);
+ err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
+ assert(err_is_ok(err));
+}
+
+static void handle_buffer_release(struct bulk_net_transp *p,
+ struct proto_trail_release *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+ errval_t err;
+ struct receive_buffer *rb;
+
+ struct bulk_pool_id id = {
+ .machine = t->pool_machine_id,
+ .local = t->pool_local_id,
+ .dom = t->pool_domain_id, };
+
+ struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);
+
+ assert(buf);
+
+ rb = msg->parts[1].opaque;
+
+ buf->local_ref_count--;
+
+ if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) {
+ err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
+ assert(!err_is_fail(err));
+ }
+
+ p->channel->callbacks->copy_released(p->channel, buf);
+
+ bulk_net_transfer_free_rb(&p->net_ctrl, rb);
+}
+
+/* ---------------------------- status message ----------------------------- */
+static void send_status_msg(void)
+{
+ BNT_DEBUG_TRACE
+
+}
+
+static void handle_status_msg(struct bulk_net_transp *p,
+ struct proto_trail_status *t,
+ struct bulk_net_msgdesc *msg)
+{
+ BNT_DEBUG_TRACE
+
+}
+
+/* ------------------------ network managements ---------------------------- */
+
+static void tcb_transmitted(struct bulk_e10k *bu, void *opaque)
+{
+ struct bulk_net_transp *p = bu->opaque;
+ struct transmit_buffer *tb = opaque;
+
+ if (opaque == NULL) {
+ // We can ignore the header buffers
+ return;
+ }
+
+ if (tb->buffer != NULL) {
+ if (tb->cont.handler) {
+ tb->cont.handler(tb->cont.arg, SYS_ERR_OK, p->channel);
+ }
+ tb->buffer = NULL;
+ tb->cont = BULK_CONT_NOP;
+ }
+ assert(tb != NULL);
+ stack_alloc_free(&p->net_ctrl.tb_stack, tb);
+}
+
+static void tcb_received(struct bulk_e10k* bu, struct bulk_net_msgdesc *msg)
+{
+ struct bulk_net_transp *p = bu->opaque;
+ size_t i;
+ struct receive_buffer *rb;
+ uint8_t *t;
+
+ assert(msg->parts[0].size == sizeof(struct packet_header));
+ bulk_net_transfer_strip_padding(msg);
+
+ for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++)
+ ;
+ i--;
+
+ rb = msg->parts[i].opaque;
+ t = rb->virt;
+
+ switch (t[msg->parts[i].size - 1]) {
+ case PROTO_BIND_REQUEST:
+ handle_bind_request(
+ p,
+ (struct proto_trail_bind_req *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_bind_req)),
+ msg);
+ break;
+ case PROTO_BIND_RESPONSE:
+ handle_bind_response(
+ p,
+ (struct proto_trail_bind_resp *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_bind_resp)),
+ msg);
+ break;
+ case PROTO_POOL_REQUEST:
+ handle_pool_assign_request(
+ p,
+ (struct proto_trail_pool_req *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_pool_req)),
+ msg);
+ break;
+ case PROTO_POOL_RESPONSE:
+ handle_pool_assign_response(
+ p,
+ (struct proto_trail_pool_resp *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_pool_resp)),
+ msg);
+ break;
+ case PROTO_BUFFER_MOVE:
+ handle_buffer_move(
+ p,
+ (struct proto_trail_move *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_move)),
+ msg);
+ break;
+ case PROTO_BUFFER_COPY:
+ handle_buffer_copy(
+ p,
+ (struct proto_trail_copy *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_copy)),
+ msg);
+ break;
+ case PROTO_BUFFER_PASS:
+ handle_buffer_pass(
+ p,
+ (struct proto_trail_pass *) (t + msg->parts[i].size
+ - sizeof(struct proto_trail_pass)),
+ msg);
+ break;
+ case PROTO_BUFFER_RELEASE:
+ handle_buffer_release(
+ p,
+ (struct proto_trail_release *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_release)),
+ msg);
+ break;
+ case PROTO_STATUS:
+ handle_status_msg(
+ p,
+ (struct proto_trail_status *) (t
+ + msg->parts[i].size
+ - sizeof(struct proto_trail_status)),
+ msg);
+ break;
+ default:
+        USER_PANIC("Unsupported Request");
+ break;
+
+ }
+}
+
+/* --------------------- implementation callbacks -------------------------- */
+
+static errval_t impl_channel_create(struct bulk_channel *channel)
+{
+ errval_t err;
+
+ BNT_STATUS("Creating new bulk channel [%p] using net.transparent backend",
+ channel);
+
+ struct bulk_net_transp *p = malloc(sizeof(struct bulk_net_transp));
+ if (p == NULL) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ struct bulk_net_endpoint_descriptor *ep =
+ (struct bulk_net_endpoint_descriptor *) channel->ep;
+
+ p->net_ctrl.card = ep->cardname;
+ p->net_ctrl.l_port = ep->port;
+ p->net_ctrl.queue = ep->queue;
+ p->net_ctrl.ws = channel->waitset;
+ p->net_ctrl.buffer_size = ep->buffer_size;
+ p->net_ctrl.buffer_count = ep->buffer_count;
+ p->net_ctrl.max_queues = ep->max_queues;
+ p->net_ctrl.num_queues = 1;
+
+ channel->state = BULK_STATE_BINDING;
+
+ err = bulk_net_transfer_export(&p->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ free(p);
+ return err;
+ }
+
+ p->channel = channel;
+ channel->impl_data = p;
+
+ p->zero_meta = calloc(1, p->channel->meta_size);
+
+ return err;
+}
+
+static errval_t impl_channel_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+
+ BNT_STATUS("Binding new bulk channel [%p] using net.transparent backend",
+ channel);
+
+ struct bulk_net_transp *bnt = malloc(sizeof(struct bulk_net_transp));
+ if (!bnt) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ struct bulk_net_endpoint_descriptor *ep =
+ (struct bulk_net_endpoint_descriptor *) channel->ep;
+
+ bnt->net_ctrl.card = ep->cardname;
+ bnt->net_ctrl.r_port = ep->port;
+ bnt->net_ctrl.r_ip = ep->ip.addr;
+ bnt->net_ctrl.queue = ep->queue;
+ bnt->net_ctrl.ws = channel->waitset;
+ bnt->net_ctrl.buffer_size = ep->buffer_size;
+ bnt->net_ctrl.buffer_count = ep->buffer_count;
+ bnt->net_ctrl.max_queues = ep->max_queues;
+ bnt->net_ctrl.num_queues = 1;
+
+ err = bulk_net_transfer_bind(&bnt->net_ctrl, tcb_transmitted, tcb_received);
+ if (err_is_fail(err)) {
+ free(bnt);
+ return err;
+ }
+
+ channel->impl_data = bnt;
+ bnt->channel = channel;
+ channel->state = BULK_STATE_BINDING;
+ bnt->bind_cont = cont;
+
+ send_bind_request(bnt, bnt->net_ctrl.buffer_size, channel->trust,
+ channel->role);
+ send_status_msg();
+
+ return SYS_ERR_OK;
+}
+
+static errval_t impl_channel_assign_pool(struct bulk_channel *channel,
+ struct bulk_pool *pool,
+ struct bulk_continuation cont)
+{
+
+ struct bulk_net_transp *bnt = (struct bulk_net_transp *) channel->impl_data;
+
+ if (bnt->net_ctrl.buffer_size != pool->buffer_size) {
+ /* TODO: change to buffer size */
+ debug_printf("ERROR: only pools with matching buffer size can be assigned\n");
+ return BULK_TRANSFER_ALLOC_BUFFER_SIZE;
+ }
+
+ struct pending_pool_request *req = malloc(
+ sizeof(struct pending_pool_request));
+ if (!req) {
+ return BULK_TRANSFER_MEM;
+ }
+
+ req->cont = cont;
+ req->pool = pool;
+    /* prepend the request to the list of pending pool requests */
+    req->next = bnt->pending_pool_requests;
+ bnt->pending_pool_requests = req;
+
+ send_pool_assign_request(bnt, pool);
+
+ return SYS_ERR_OK;
+}
+
+static errval_t impl_channel_move(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ struct bulk_net_transp *bnt = (struct bulk_net_transp *) channel->impl_data;
+ send_buffer_move(bnt, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
+/**
+ * Implementation callback for the buffer pass operation.
+ */
+static errval_t impl_channel_pass(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ struct bulk_net_transp *bnt = (struct bulk_net_transp *) channel->impl_data;
+ send_buffer_pass(bnt, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
+static errval_t impl_channel_copy(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ struct bulk_net_transp *bnt = (struct bulk_net_transp *) channel->impl_data;
+ send_buffer_copy(bnt, buffer, meta, cont);
+ return SYS_ERR_OK;
+}
+
+static errval_t impl_channel_release(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ struct bulk_continuation cont)
+{
+ struct bulk_net_transp *bnt = (struct bulk_net_transp *) channel->impl_data;
+ send_buffer_release(bnt, buffer, bnt->zero_meta, cont);
+ return SYS_ERR_OK;
+}
+
+static struct bulk_implementation bulk_net_implementation = {
+ .channel_create = impl_channel_create,
+ .channel_bind = impl_channel_bind,
+ .assign_pool = impl_channel_assign_pool,
+ .move = impl_channel_move,
+ .pass = impl_channel_pass,
+ .copy = impl_channel_copy,
+ .release = impl_channel_release };
+
+struct bulk_implementation *bulk_net_get_impl(void)
+{
+ return &bulk_net_implementation;
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2007-2011, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#ifndef E10K_QUEUE_H_
+#define E10K_QUEUE_H_
+
+#include <string.h>
+#include <stdlib.h>
+#include <arch/x86/barrelfish_kpi/asm_inlines_arch.h>
+#include <net_interfaces/flags.h>
+
+#include "e10k_q_dev.h"
+
+struct e10k_queue_ops {
+ errval_t (*update_txtail)(void*, size_t);
+ errval_t (*update_rxtail)(void*, size_t);
+};
+
+/**
+ * Context structure for RX descriptors. This is needed to implement RSC, since
+ * we need to be able to chain buffers together.
+ */
+struct e10k_queue_rxctx {
+ void *opaque;
+ struct e10k_queue_rxctx *previous;
+ bool used;
+};
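+
+/*
+ * How the chain is used (see e10k_queue_rxpoll/e10k_queue_get_rxbuf below):
+ * every segment of a multi-descriptor receive stores a pointer to the context
+ * of the segment that preceded it, so get_rxbuf can walk the `previous' links
+ * back to the first segment that has not been handed to the caller yet.
+ */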
+
+struct e10k_queue {
+ // FIXME: Look for appropriate type for the _head/tail/size fields
+ e10k_q_tdesc_adv_wb_array_t* tx_ring;
+ void** tx_opaque;
+ bool* tx_isctx;
+ size_t tx_head;
+ size_t tx_tail;
+ size_t tx_size;
+ uint32_t* tx_hwb;
+
+ e10k_q_rdesc_adv_wb_array_t* rx_ring;
+ struct e10k_queue_rxctx* rx_context;
+ bool rx_first;
+ size_t rx_head;
+ size_t rx_tail;
+ size_t rx_size;
+
+ struct e10k_queue_ops ops;
+ void* opaque;
+};
+
+typedef struct e10k_queue e10k_queue_t;
+
+static inline e10k_queue_t* e10k_queue_init(void* tx, size_t tx_size,
+ uint32_t* tx_hwb, void* rx, size_t rx_size, struct e10k_queue_ops* ops,
+ void* opaque)
+{
+ e10k_queue_t* q = malloc(sizeof(*q));
+
+ q->tx_ring = tx;
+ q->tx_opaque = calloc(tx_size, sizeof(void*));
+ q->tx_isctx = calloc(tx_size, sizeof(bool));
+ q->tx_head = 0;
+ q->tx_tail = 0;
+ q->tx_size = tx_size;
+ q->tx_hwb = tx_hwb;
+
+ q->rx_ring = rx;
+ q->rx_context = calloc(rx_size, sizeof(*q->rx_context));
+ q->rx_first = true;
+ q->rx_head = 0;
+ q->rx_tail = 0;
+ q->rx_size = rx_size;
+
+ q->ops = *ops;
+ q->opaque = opaque;
+
+ // Initialize ring memory with zero
+ memset(tx, 0, tx_size * e10k_q_tdesc_adv_wb_size);
+ memset(rx, 0, rx_size * e10k_q_rdesc_adv_wb_size);
+
+ return q;
+}
+
+static inline int e10k_queue_add_txcontext(e10k_queue_t* q, uint8_t idx,
+ uint8_t maclen, uint16_t iplen, uint8_t l4len, e10k_q_l4_type_t l4t)
+{
+ e10k_q_tdesc_adv_ctx_t d;
+ size_t tail = q->tx_tail;
+
+ // TODO: Check if there is room in the queue
+ q->tx_isctx[tail] = true;
+ d = q->tx_ring[tail];
+
+ e10k_q_tdesc_adv_rd_dtyp_insert(d, e10k_q_adv_ctx);
+ e10k_q_tdesc_adv_rd_dext_insert(d, 1);
+
+ e10k_q_tdesc_adv_ctx_idx_insert(d, idx);
+ e10k_q_tdesc_adv_ctx_maclen_insert(d, maclen);
+ e10k_q_tdesc_adv_ctx_iplen_insert(d, iplen);
+ e10k_q_tdesc_adv_ctx_ipv4_insert(d, 1);
+ e10k_q_tdesc_adv_ctx_l4len_insert(d, l4len);
+ e10k_q_tdesc_adv_ctx_l4t_insert(d, l4t);
+
+ q->tx_tail = (tail + 1) % q->tx_size;
+ return 0;
+}
+
+
+static inline int e10k_queue_add_txbuf_ctx(e10k_queue_t* q, uint64_t phys,
+ size_t len, void* opaque, int first, int last, size_t totallen,
+ uint8_t ctx, bool ixsm, bool txsm)
+{
+ e10k_q_tdesc_adv_rd_t d;
+ size_t tail = q->tx_tail;
+
+ // TODO: Check if there is room in the queue
+ q->tx_isctx[tail] = false;
+ q->tx_opaque[tail] = opaque;
+ d = q->tx_ring[tail];
+
+ e10k_q_tdesc_adv_rd_buffer_insert(d, phys);
+ e10k_q_tdesc_adv_rd_dtalen_insert(d, len);
+ if (first) {
+ e10k_q_tdesc_adv_rd_paylen_insert(d, totallen);
+ }
+ e10k_q_tdesc_adv_rd_dtyp_insert(d, e10k_q_adv_data);
+ e10k_q_tdesc_adv_rd_dext_insert(d, 1);
+ e10k_q_tdesc_adv_rd_rs_insert(d, (last == 1));
+ e10k_q_tdesc_adv_rd_ifcs_insert(d, 1);
+ e10k_q_tdesc_adv_rd_eop_insert(d, last);
+
+    if (ctx != (uint8_t) -1) {
+ e10k_q_tdesc_adv_rd_idx_insert(d, ctx);
+ e10k_q_tdesc_adv_rd_cc_insert(d, 1);
+ e10k_q_tdesc_adv_rd_ixsm_insert(d, ixsm);
+ e10k_q_tdesc_adv_rd_txsm_insert(d, txsm);
+ }
+
+ q->tx_tail = (tail + 1) % q->tx_size;
+ return 0;
+}
+
+static inline int e10k_queue_add_txbuf(e10k_queue_t* q, uint64_t phys,
+ size_t len, void* opaque, int first, int last, size_t totallen)
+{
+ return e10k_queue_add_txbuf_ctx(q, phys, len, opaque, first, last, totallen,
+ -1, false, false);
+}
+
+static inline int e10k_queue_get_txpoll(e10k_queue_t* q)
+{
+ e10k_q_tdesc_adv_wb_t d;
+ size_t head = q->tx_head;
+
+ // If HWB is enabled, we can skip reading the descriptor if nothing happened
+ if (q->tx_hwb && *q->tx_hwb == head) {
+ return 1;
+ }
+
+ d = q->tx_ring[head];
+ if (!q->tx_hwb && !e10k_q_tdesc_adv_wb_dd_extract(d)) {
+ return 1;
+ }
+
+ if (!q->tx_isctx[head]) {
+ return 0;
+ }
+
+ memset(d, 0, e10k_q_tdesc_adv_wb_size);
+ q->tx_head = (head + 1) % q->tx_size;
+ return 1;
+}
+
+static inline int e10k_queue_get_txbuf(e10k_queue_t* q, void** opaque)
+{
+ e10k_q_tdesc_adv_wb_t d;
+ size_t head = q->tx_head;
+
+ if (e10k_queue_get_txpoll(q)) {
+ return 1;
+ }
+
+ d = q->tx_ring[head];
+ *opaque = q->tx_opaque[head];
+
+ memset(d, 0, e10k_q_tdesc_adv_wb_size);
+ q->tx_head = (head + 1) % q->tx_size;
+ return 0;
+}
+
+static inline errval_t e10k_queue_bump_txtail(e10k_queue_t* q)
+{
+ return q->ops.update_txtail(q->opaque, q->tx_tail);
+}
+
+static inline size_t e10k_queue_free_txslots(e10k_queue_t* q)
+{
+ size_t head = q->tx_head;
+ size_t tail = q->tx_tail;
+ size_t size = q->tx_size;
+
+ if (tail >= head) {
+ return size - (tail - head) - 1; // TODO: could this be off by 1?
+ } else {
+ return size - (tail + size - head) - 1; // TODO: off by 1?
+ }
+
+}
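+
+/*
+ * Quick sanity check of the free-slot arithmetic above (the same reasoning
+ * applies to e10k_queue_free_rxslots below), assuming the usual ring-buffer
+ * convention that one descriptor slot is kept unused to tell a full ring from
+ * an empty one: with size = 8, head = 2, tail = 5 there are 3 descriptors in
+ * flight and size - (tail - head) - 1 = 8 - 3 - 1 = 4 slots free; with
+ * wrap-around (head = 5, tail = 2) there are tail + size - head = 5 in flight
+ * and size - 5 - 1 = 2 slots free. Both branches agree with that convention.
+ */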
+
+#include <stdio.h>
+static inline int e10k_queue_add_rxbuf(e10k_queue_t* q, uint64_t phys,
+ uint64_t header, void* opaque)
+{
+ e10k_q_rdesc_adv_rd_t d;
+ size_t tail = q->rx_tail;
+ struct e10k_queue_rxctx *ctx;
+
+ ctx = q->rx_context + tail;
+ if (ctx->used) {
+ printf("e10k: Already used!\n");
+ return 1;
+ }
+
+ // TODO: Check if there is room in the queue
+ ctx->opaque = opaque;
+ ctx->used = true;
+ d = (e10k_q_rdesc_adv_rd_t) q->rx_ring[tail];
+
+ e10k_q_rdesc_adv_rd_buffer_insert(d, phys);
+ // TODO: Does this make sense for RSC?
+ e10k_q_rdesc_adv_rd_hdr_buffer_insert(d, header);
+
+ q->rx_tail = (tail + 1) % q->rx_size;
+
+ return 0;
+}
+
+static inline uint64_t e10k_queue_convert_rxflags(e10k_q_rdesc_adv_wb_t d)
+{
+ uint64_t flags = 0;
+
+ // IP checksum
+ if (e10k_q_rdesc_adv_wb_ipcs_extract(d)) {
+ flags |= NETIF_RXFLAG_IPCHECKSUM;
+ if (!e10k_q_rdesc_adv_wb_ipe_extract(d)) {
+ flags |= NETIF_RXFLAG_IPCHECKSUM_GOOD;
+ }
+ }
+
+ // L4 checksum
+ if (e10k_q_rdesc_adv_wb_l4i_extract(d)) {
+ flags |= NETIF_RXFLAG_L4CHECKSUM;
+ if (!e10k_q_rdesc_adv_wb_l4e_extract(d)) {
+ flags |= NETIF_RXFLAG_L4CHECKSUM_GOOD;
+ }
+ }
+
+ // Packet type
+ if (e10k_q_rdesc_adv_wb_pt_ipv4_extract(d)) {
+ flags |= NETIF_RXFLAG_TYPE_IPV4;
+ }
+ if (e10k_q_rdesc_adv_wb_pt_tcp_extract(d)) {
+ flags |= NETIF_RXFLAG_TYPE_TCP;
+ }
+ if (e10k_q_rdesc_adv_wb_pt_udp_extract(d)) {
+ flags |= NETIF_RXFLAG_TYPE_UDP;
+ }
+
+ return flags;
+}
+
+static inline int e10k_queue_rxpoll(e10k_queue_t* q)
+{
+ e10k_q_rdesc_adv_wb_t d;
+ size_t head;
+ struct e10k_queue_rxctx *ctx;
+ struct e10k_queue_rxctx *ctx_next;
+ size_t nextp;
+
+ while (1) {
+ head = q->rx_head;
+ d = q->rx_ring[head];
+ ctx = q->rx_context + head;
+
+ if (!e10k_q_rdesc_adv_wb_dd_extract(d)) {
+ return 1;
+ }
+
+        // Barrier needed (according to the Linux driver) to make sure nothing
+        // else is read before the DD bit. TODO: verify this is required
+ lfence();
+
+ if (e10k_q_rdesc_adv_wb_rsccnt_extract(d)) {
+ printf("e10k.q0: Part of a large receive\n");
+ }
+
+ if (e10k_q_rdesc_adv_wb_eop_extract(d)) {
+ return 0;
+ }
+
+ // Not the last part of this packet yet
+ if (e10k_q_rdesc_adv_wb_rsccnt_extract(d)) {
+            // RSC: we just received part of a large receive (not the last
+            // part of the packet); chain this buffer to the indicated next one
+ printf("e10k: RSC chained\n");
+ e10k_q_rdesc_adv_wb_nl_t n = d;
+ nextp = e10k_q_rdesc_adv_wb_nl_nextp_extract(n);
+ assert(nextp < q->rx_size);
+
+ ctx_next = q->rx_context + nextp;
+ assert(ctx_next->used);
+ ctx_next->previous = ctx;
+ } else {
+ ctx_next = q->rx_context + ((head + 1) % q->rx_size);
+ ctx_next->previous = ctx;
+ }
+
+ q->rx_head = (head + 1) % q->rx_size;
+ }
+}
+
+static inline size_t e10k_queue_get_rxbuf(e10k_queue_t* q, void** opaque,
+ size_t *hdr_len, size_t* len, int* last, uint64_t *flags)
+{
+ e10k_q_rdesc_adv_wb_t d;
+ size_t head;
+ struct e10k_queue_rxctx *ctx;
+ struct e10k_queue_rxctx *ctx_next;
+
+
+ if (e10k_queue_rxpoll(q)) {
+ return 1;
+ }
+
+ head = q->rx_head;
+ d = q->rx_ring[head];
+ ctx = q->rx_context + head;
+
+ // Look for first segment not returned yet
+ ctx_next = NULL;
+ while (ctx->previous != NULL) {
+ ctx_next = ctx;
+ ctx = ctx->previous;
+ }
+ if (ctx_next != NULL) {
+ ctx_next->previous = NULL;
+ }
+
+ // Flags can only be read from last descriptor, just return these for
+ // every part of the packet
+ *flags = e10k_queue_convert_rxflags(d);
+
+ // Look for corresponding descriptor
+ head = ctx - q->rx_context;
+ d = q->rx_ring[head];
+
+ // Set return values
+ // TODO: Extract status (okay/error)
+ *last = e10k_q_rdesc_adv_wb_eop_extract(d);
+ *len = e10k_q_rdesc_adv_wb_pkt_len_extract(d);
+ *opaque = ctx->opaque;
+
+ // First segment of this packet
+ if (hdr_len != NULL) {
+ *hdr_len = (q->rx_first && e10k_q_rdesc_adv_wb_sph_extract(d) ?
+ e10k_q_rdesc_adv_wb_hdr_len_extract(d) : 0);
+ }
+
+ // If it is the last segment, we can move on to the next descriptor
+ if (e10k_q_rdesc_adv_wb_eop_extract(d)) {
+ q->rx_first = true;
+ q->rx_head = (head + 1) % q->rx_size;
+ } else {
+ q->rx_first = false;
+ }
+
+ ctx->used = false;
+ memset(d, 0, e10k_q_rdesc_adv_wb_size);
+ return 0;
+}
+
+static inline errval_t e10k_queue_bump_rxtail(e10k_queue_t* q)
+{
+ return q->ops.update_rxtail(q->opaque, q->rx_tail);
+}
+
+static inline size_t e10k_queue_free_rxslots(e10k_queue_t* q)
+{
+ size_t head = q->rx_head;
+ size_t tail = q->rx_tail;
+ size_t size = q->rx_size;
+
+ if (tail >= head) {
+ return size - (tail - head) - 1; // TODO: could this be off by 1?
+ } else {
+ return size - (tail + size - head) - 1; // TODO: off by 1?
+ }
+}
+
+
+#endif // ndef E10K_QUEUE_H_
--- /dev/null
+/*
+ * Copyright (c) 2014 ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <stdio.h>
+
+#include <barrelfish/barrelfish.h>
+
+#include "bulk_net_backend.h"
+
+void stack_alloc_init(struct stack_allocator *alloc, size_t size)
+{
+ alloc->size = size;
+ alloc->top = 0;
+ alloc->stack = calloc(size, sizeof(void *));
+}
+
+bool stack_alloc_free(struct stack_allocator *alloc, void *el)
+{
+ if (alloc->top >= alloc->size) {
+ return false;
+ }
+
+ alloc->stack[alloc->top++] = el;
+ return true;
+}
+
+void *stack_alloc_alloc(struct stack_allocator *alloc)
+{
+ if (alloc->top == 0) {
+ return NULL;
+ }
+ return alloc->stack[--alloc->top];
+}
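+
+/*
+ * Minimal usage sketch (illustrative only): the allocator is a plain LIFO free
+ * list of pre-registered elements, as used for the transmit-buffer stacks in
+ * the net backends above. `some_element' stands for any element owned by the
+ * caller.
+ *
+ *   struct stack_allocator alloc;
+ *   stack_alloc_init(&alloc, 16);            // room for 16 elements
+ *   stack_alloc_free(&alloc, some_element);  // register / return an element
+ *   void *el = stack_alloc_alloc(&alloc);    // grab one back (NULL if empty)
+ */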
+
--- /dev/null
+/*
+ * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <barrelfish/barrelfish.h>
+
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_sm.h>
+#include "../../bulk_pool.h"
+#include "../../bulk_buffer.h"
+#include "bulk_sm_impl.h"
+#include "pending_msg.h"
+
+#if 0
+#define BULK_DEBUG_PRINT(fmt, msg...) debug_printf(fmt, msg)
+#else
+#define BULK_DEBUG_PRINT(fmt, msg...)
+#endif
+
+//the same values are necessary for move, pass and copy operations
+//(release reuses the same struct, but does not need all of its fields)
+struct pass_data {
+ uint32_t tid;
+ void *meta;
+ size_t metasize;
+ struct capref cap;
+ struct bulk_buffer *buffer;
+ struct bulk_channel *channel;
+ bulk_ctrl_poolid_t poolid;
+};
+
+static errval_t bulk_sm_move_send_request(void *a)
+{
+ struct pass_data *d = (struct pass_data*) a;
+ struct bulk_buffer *buffer = d->buffer;
+ struct bulk_channel *channel = d->channel;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_move sent");
+
+ errval_t err;
+ if (channel->trust == BULK_TRUST_NONE) {
+ err = bulk_ctrl_move_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->cap, d->meta, d->metasize);
+ } else {
+ err = bulk_ctrl_move_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->meta, d->metasize);
+ }
+
+ if (err_is_ok(err)) {
+ free(d);
+ } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
+ //sending this message will never work, do not retry
+ //notify user the same way as if the other side had an error
+ bulk_sm_move_rx_response(b, err, d->tid);
+ free(d);
+ }
+ return err;
+}
+
+errval_t bulk_sm_move(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+ uint32_t tid;
+ size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
+
+ //store the arguments for when we get the reply later
+ union pending_msg_data pmsg = {
+ .move.continuation = cont,
+ };
+ err = pending_msg_add(channel, &tid, pmsg);
+ assert(err_is_ok(err));//adding should actually never fail
+
+ //if fully trusted, the other side already has the cap, so don't resend it
+ struct capref cap;
+ if (channel->trust == BULK_TRUST_FULL){
+ cap = NULL_CAP;
+ } else {
+ cap = buffer->cap;
+ }
+
+ //send message
+ struct pass_data *d = malloc(sizeof(*d));
+ assert(d);
+ d->tid = tid;
+ d->meta = meta;
+ d->metasize = metasize;
+ d->cap = cap;
+ d->buffer = buffer;
+ d->channel = channel;
+ fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
+
+ //send message (automatically retries if channel busy)
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_move_send_request,d);
+ //if the transmission fails, the user will be notified through the continuation
+ return err;
+}
+
+static errval_t bulk_sm_copy_send_request(void *a)
+{
+ struct pass_data *d = (struct pass_data*) a;
+ struct bulk_buffer *buffer = d->buffer;
+ struct bulk_channel *channel = d->channel;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_copy sent");
+ errval_t err;
+ if (channel->trust == BULK_TRUST_NONE) {
+ err = bulk_ctrl_copy_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->cap, d->meta, d->metasize);
+ } else {
+ err = bulk_ctrl_copy_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->meta, d->metasize);
+ }
+
+ if (err_is_ok(err)) {
+ free(d);
+ } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
+ //sending this message will never work, do not retry
+ //notify user the same way as if the other side had an error
+ bulk_sm_copy_rx_response(b, err, d->tid);
+ free(d);
+ }
+ return err;
+}
+
+errval_t bulk_sm_copy(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+ uint32_t tid;
+ size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
+
+ //store the arguments for when we get the reply later
+ union pending_msg_data pmsg = {
+ .copy.continuation = cont,
+ };
+ err = pending_msg_add(channel, &tid, pmsg);
+ assert(err_is_ok(err));//adding should actually never fail
+
+ //if fully trusted, the other side already has the cap, so don't resend it
+ struct capref cap;
+ if (channel->trust == BULK_TRUST_FULL){
+ cap = NULL_CAP;
+ } else {
+ cap = buffer->cap;
+ }
+
+ //send message
+ struct pass_data *d = malloc(sizeof(*d));
+ assert(d);
+ d->tid = tid;
+ d->meta = meta;
+ d->metasize = metasize;
+ d->cap = cap;
+ d->buffer = buffer;
+ d->channel = channel;
+ fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
+
+ //send message (automatically retries if channel busy)
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_copy_send_request,d);
+ //if the transmission fails, the user will be notified through the continuation
+ return err;
+}
+
+static errval_t bulk_sm_release_send_request(void *a)
+{
+ struct pass_data *d = (struct pass_data*) a;
+ struct bulk_buffer *buffer = d->buffer;
+ struct bulk_channel *channel = d->channel;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_release sent");
+ errval_t err = bulk_ctrl_release_call__tx(b, txcont,
+ d->poolid, buffer->bufferid, d->tid);
+
+ if (err_is_ok(err)) {
+ free(d);
+ } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
+ //sending this message will never work, do not retry
+ //notify user the same way as if the other side had an error
+ bulk_sm_release_rx_response(b, err, d->tid);
+ free(d);
+ }
+ return err;
+}
+
+
+errval_t bulk_sm_release(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+ uint32_t tid;
+
+ //store the arguments for when we get the reply later
+ union pending_msg_data pmsg = {
+ .release.continuation = cont,
+ };
+ err = pending_msg_add(channel, &tid, pmsg);
+ assert(err_is_ok(err));//adding should actually never fail
+
+
+ //send message
+ struct pass_data *d = malloc(sizeof(*d));//could use smaller struct
+ assert(d);
+ d->tid = tid;
+ d->buffer = buffer;
+ d->channel = channel;
+ fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
+
+ //send message (automatically retries if channel busy)
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_release_send_request,d);
+ //if the transmission fails, the user will be notified through the continuation
+ return err;
+}
+
+static errval_t bulk_sm_pass_send_request(void *a)
+{
+ struct pass_data *d = (struct pass_data*) a;
+ struct bulk_buffer *buffer = d->buffer;
+ struct bulk_channel *channel = d->channel;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_pass sent");
+
+ errval_t err;
+ if (channel->trust == BULK_TRUST_NONE) {
+ err = bulk_ctrl_pass_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->cap, d->meta, d->metasize);
+ } else {
+ err = bulk_ctrl_pass_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
+ d->tid, d->meta, d->metasize);
+ }
+
+ if (err_is_ok(err)) {
+ free(d);
+ } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
+ //sending this message will never work, do not retry
+ //notify user the same way as if the other side had an error
+ bulk_sm_pass_rx_response(b, err, d->tid);
+ free(d);
+ }
+ return err;
+}
+
+errval_t bulk_sm_pass(struct bulk_channel *channel,
+ struct bulk_buffer *buffer,
+ void *meta,
+ struct bulk_continuation cont)
+{
+ errval_t err;
+ uint32_t tid;
+ size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
+
+ //store the arguments for when we get the reply later
+ union pending_msg_data pmsg = {
+ .pass.continuation = cont,
+ };
+ err = pending_msg_add(channel, &tid, pmsg);
+ assert(err_is_ok(err));//adding should actually never fail
+
+ //if fully trusted, the other side already has the cap, so don't resend it
+ struct capref cap;
+ if (channel->trust == BULK_TRUST_FULL){
+ cap = NULL_CAP;
+ } else {
+ cap = buffer->cap;
+ }
+
+    //send message
+    struct pass_data *d = malloc(sizeof(*d));
+ assert(d);
+ d->tid = tid;
+ d->meta = meta;
+ d->metasize = metasize;
+ d->cap = cap;
+ d->buffer = buffer;
+ d->channel = channel;
+ fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
+
+ //send message (automatically retries if channel busy)
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_pass_send_request, d);
+ //if the transmission fails, the user will be notified through the continuation
+ return err;
+}
+
+
+//--------------- flounder RPC handlers:
+
+//move, copy, pass and release replies all have the same format
+struct bulk_sm_reply_data {
+ struct bulk_channel *channel;
+ struct event_closure cb;
+ bulk_ctrl_error_t error;
+ uint32_t tid;
+};
+
+static errval_t bulk_sm_move_send_reply(void *a)
+{
+ struct bulk_sm_reply_data *rdata = a;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
+
+ errval_t err = bulk_ctrl_move_response__tx(b, rdata->cb,
+ rdata->error, rdata->tid);
+
+ if (err_is_ok(err)) {
+ free(rdata);
+ }
+ return err;
+}
+
+
+void bulk_sm_move_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize)
+{
+ errval_t err = SYS_ERR_OK;
+ struct event_closure txcont;
+ struct bulk_buffer *buffer;
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+
+ assert(metasize == channel->meta_size || metasize == 0);
+
+ struct bulk_pool_id b_poolid;
+ fill_pool_id_from_flounder(&b_poolid, &poolid);
+
+ struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
+ if (pool == NULL){
+ err = BULK_TRANSFER_POOL_INVALD;
+    } else if (bufferid >= pool->num_buffers){
+ err = BULK_TRANSFER_BUFFER_INVALID;
+ } else {
+ buffer = pool->buffers[bufferid];
+
+ //in the untrusted case, we also received the cap for this buffer
+ if (channel->trust == BULK_TRUST_NONE){
+            //make sure the transmitter does not keep a copy for itself
+ err = cap_revoke(cap);
+ assert(err_is_ok(err));
+ err = bulk_buffer_assign_cap(buffer, cap, 0);
+ }
+
+ if (err_is_ok(err)){
+ //automatically remaps if necessary
+ err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
+ }
+ }
+
+ //send reply & inform user
+ if (err_is_ok(err)){
+ if (channel->callbacks->move_received) {
+ channel->callbacks->move_received(channel, buffer, meta);
+ }
+
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_move_rx_call: reply sent.");
+ } else {
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_move_rx_call: reply to invalid move sent");
+ }
+
+ struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
+ assert(rdata);
+ rdata->channel = channel;
+ rdata->cb = txcont;
+ rdata->error = err;
+ rdata->tid = tid;
+
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,
+ bulk_sm_move_send_reply, rdata);
+}
+
+void bulk_sm_move_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize)
+{
+ //call normal handler with a NULL_CAP
+ bulk_sm_move_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
+}
+
+
+void bulk_sm_move_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid)
+{
+ //find data associated with this RPC call
+ union pending_msg_data data;
+ errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
+ if (err_is_fail(err)){
+ //no such message data -> ignore?
+ DEBUG_ERR(err, "bulk_sm_move_rx_response");
+ return;
+ }
+
+ //TODO: clean up if error is fail
+
+ //call continuation
+ bulk_continuation_call(data.move.continuation, (errval_t) error,
+ VOID2CHANNEL(b->st));
+}
+
+static errval_t bulk_sm_copy_send_reply(void *a)
+{
+ struct bulk_sm_reply_data *rdata = a;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
+
+ errval_t err = bulk_ctrl_copy_response__tx(
+ b, rdata->cb, rdata->error, rdata->tid);
+
+ if (err_is_ok(err)) {
+ free(rdata);
+ }
+ return err;
+}
+
+void bulk_sm_copy_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize)
+{
+ errval_t err = SYS_ERR_OK;
+ struct event_closure txcont;
+ struct bulk_buffer *buffer;
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+
+ assert(metasize == channel->meta_size || metasize == 0);
+
+ struct bulk_pool_id b_poolid;
+ fill_pool_id_from_flounder(&b_poolid, &poolid);
+
+ struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
+ if (pool == NULL){
+ err = BULK_TRANSFER_POOL_INVALD;
+    } else if (bufferid >= pool->num_buffers){
+ err = BULK_TRANSFER_BUFFER_INVALID;
+ } else {
+ buffer = pool->buffers[bufferid];
+ //in the untrusted case, we also received the cap for this buffer
+ if (channel->trust == BULK_TRUST_NONE){
+            //TODO: make sure there is no rw cap in the transmitter's cspace
+ // the way to do this would be to check that this is a shared_readonly cap
+ err = bulk_buffer_assign_cap(buffer, cap, 0);
+ }
+
+ if (err_is_ok(err)){
+ //automatically remaps if necessary
+ err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_ONLY);
+ //TODO: keep track of copies? adjust refcount? do we let the user do that?
+ }
+ }
+
+ if (err_is_ok(err)){
+ if (channel->callbacks->copy_received) {
+ channel->callbacks->copy_received(channel, buffer, meta);
+ }
+
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_copy_rx_call: reply sent.");
+ } else {
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_copy_rx_call: reply to invalid copy sent");
+ }
+
+ struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
+ assert(rdata);
+ rdata->channel = channel;
+ rdata->cb = txcont;
+ rdata->error = err;
+ rdata->tid = tid;
+
+
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,
+ bulk_sm_copy_send_reply, rdata);
+}
+
+void bulk_sm_copy_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize)
+{
+ //call normal handler with a NULL_CAP
+ bulk_sm_copy_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
+}
+
+
+void bulk_sm_copy_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid)
+{
+ //find data associated with this RPC call
+ union pending_msg_data data;
+ errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
+ if (err_is_fail(err)){
+ //no such message data -> ignore?
+ DEBUG_ERR(err, "bulk_sm_copy_rx_response");
+ return;
+ }
+
+ //TODO: clean up if error is fail
+
+ //call continuation
+ bulk_continuation_call(data.copy.continuation, (errval_t) error,
+ VOID2CHANNEL(b->st));
+}
+
+static errval_t bulk_sm_pass_send_reply(void *a)
+{
+ struct bulk_sm_reply_data *rdata = a;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
+
+ errval_t err = bulk_ctrl_pass_response__tx(b, rdata->cb,
+ rdata->error, rdata->tid);
+
+ if (err_is_ok(err)) {
+ free(rdata);
+ }
+ return err;
+}
+
+
+void bulk_sm_pass_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize)
+{
+ BULK_DEBUG_PRINT("%s", "bulk_sm_pass_rx_call called\n");
+
+ errval_t err = SYS_ERR_OK;
+ struct event_closure txcont;
+ struct bulk_buffer *buffer;
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+
+ assert(metasize == channel->meta_size || metasize == 0);
+
+ struct bulk_pool_id b_poolid;
+ fill_pool_id_from_flounder(&b_poolid, &poolid);
+
+ struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
+ if (pool == NULL){
+ err = BULK_TRANSFER_POOL_INVALD;
+    } else if (bufferid >= pool->num_buffers){
+ err = BULK_TRANSFER_BUFFER_INVALID;
+ } else {
+ buffer = pool->buffers[bufferid];
+
+ //in the untrusted case, we also received the cap for this buffer
+ if (channel->trust == BULK_TRUST_NONE){
+            //make sure the transmitter does not keep a copy for itself
+ err = cap_revoke(cap);
+ assert(err_is_ok(err));
+ err = bulk_buffer_assign_cap(buffer, cap, 0);
+ }
+
+ if (err_is_ok(err)){
+ //automatically remaps if necessary
+ err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
+ }
+ }
+
+ //send reply & inform user
+ if (err_is_ok(err)){
+ if (channel->callbacks->buffer_received) {
+ channel->callbacks->buffer_received(channel, buffer, meta);
+ }
+
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_pass_rx_call: reply sent.");
+ } else {
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_pass_rx_call: reply to invalid pass sent");
+ }
+
+ struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
+ assert(rdata);
+ rdata->channel = channel;
+ rdata->cb = txcont;
+ rdata->error = err;
+ rdata->tid = tid;
+
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,
+ bulk_sm_pass_send_reply, rdata);
+}
+
+void bulk_sm_pass_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize)
+{
+ //call normal handler with a NULL_CAP
+ bulk_sm_pass_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
+}
+
+
+void bulk_sm_pass_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid)
+{
+ BULK_DEBUG_PRINT("bulk_sm_pass_rx_response called. TID = %u\n", tid);
+ //find data associated with this RPC call
+ union pending_msg_data data;
+ errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
+ if (err_is_fail(err)){
+ //no such message data -> ignore?
+        DEBUG_ERR(err, "bulk_sm_pass_rx_response");
+ return;
+ }
+
+ //TODO: clean up if error is fail
+
+ //call continuation
+ bulk_continuation_call(data.pass.continuation, (errval_t) error,
+ VOID2CHANNEL(b->st));
+}
+
+static errval_t bulk_sm_release_send_reply(void *a)
+{
+ struct bulk_sm_reply_data *rdata = a;
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
+
+ errval_t err = bulk_ctrl_release_response__tx(b, rdata->cb,
+ rdata->error, rdata->tid);
+
+    if (err_is_ok(err)) {
+ free(rdata);
+ }
+ return err;
+}
+
+
+void bulk_sm_release_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid)
+{
+ errval_t err = SYS_ERR_OK;
+ struct event_closure txcont;
+ struct bulk_buffer *buffer;
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+
+ struct bulk_pool_id b_poolid;
+ fill_pool_id_from_flounder(&b_poolid, &poolid);
+
+ struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
+ if (pool == NULL){
+ err = BULK_TRANSFER_POOL_INVALD;
+    } else if (bufferid >= pool->num_buffers){
+ err = BULK_TRANSFER_BUFFER_INVALID;
+ } else {
+ buffer = pool->buffers[bufferid];
+ buffer->local_ref_count--;
+ //TODO: make the decrease atomic? (so only the last decrease reclaims it)
+ //TODO: find out what the refcount should be to take action (0 or 1?)
+ if (buffer->local_ref_count == 0 && buffer->state == BULK_BUFFER_RO_OWNED){
+ //retake ownership
+ if (channel->trust == BULK_TRUST_NONE){
+ err = cap_revoke(buffer->cap);
+ assert(err_is_ok(err));
+ }
+ if (err_is_ok(err)){
+ //automatically remaps if necessary
+ err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
+ }
+ }
+ //TODO: what if refcount became 0 but we are not the owner?
+ //should we just let the user callback handle that? (we probably have to)
+ }
+
+ //send reply & inform user
+ if (err_is_ok(err)){
+ if (channel->callbacks->copy_released) {
+ channel->callbacks->copy_released(channel, buffer);
+ }
+
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_release_rx_call: reply sent.");
+ } else {
+ txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_release_rx_call: reply to invalid release sent");
+ }
+
+ struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
+ assert(rdata);
+ rdata->channel = channel;
+ rdata->cb = txcont;
+ rdata->error = err;
+ rdata->tid = tid;
+
+ bulk_sm_flounder_send_fifo_msg_with_arg(channel,
+ bulk_sm_release_send_reply, rdata);
+}
+
+
+void bulk_sm_release_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid)
+{
+ //find data associated with this RPC call
+ union pending_msg_data data;
+ errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
+ if (err_is_fail(err)){
+ //no such message data -> ignore?
+ DEBUG_ERR(err, "bulk_sm_release_rx_response");
+ return;
+ }
+
+ //TODO: clean up if error is fail
+
+ //call continuation
+ bulk_continuation_call(data.release.continuation, (errval_t) error,
+ VOID2CHANNEL(b->st));
+}
--- /dev/null
+/*
+ * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#ifndef BULK_SM_IMPL_H
+#define BULK_SM_IMPL_H
+
+#include <barrelfish/barrelfish.h>
+
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_sm.h>
+
+#define VOID2CHANNEL(a) ((struct bulk_channel*)(a))
+#define CHANNEL_EP(c) ((struct bulk_sm_endpoint_descriptor*)(c)->ep)
+#define CHANNEL_DATA(c) ((struct bulk_sm_impl_data*)(c)->impl_data)
+#define CHANNEL_BINDING(c) (CHANNEL_DATA(c)->b)
+
+// Flounder call/receive handler ------------------------------------------
+
+void bulk_sm_channel_negotiate_rx_call(
+ struct bulk_ctrl_binding *b,
+ enum bulk_ctrl_role_t role,
+ enum bulk_ctrl_trust_t trust);
+
+void bulk_sm_channel_negotiate_rx_reply(
+ struct bulk_ctrl_binding *b,
+ uint64_t error,
+ enum bulk_ctrl_direction_t match_direction,
+ enum bulk_ctrl_role_t match_role,
+ uint64_t meta_size);
+
+void bulk_sm_assign_pool_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_pool_t pool,
+ uint64_t id);
+
+void bulk_sm_assign_pool_rx_response(
+ struct bulk_ctrl_binding *b,
+ uint64_t error,
+ uint64_t id);
+
+
+void bulk_sm_move_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_move_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_move_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid);
+
+void bulk_sm_copy_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_copy_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_copy_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid);
+
+void bulk_sm_pass_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ struct capref cap,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_pass_trusted_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid,
+ uint8_t *meta,
+ size_t metasize);
+
+void bulk_sm_pass_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid);
+
+void bulk_sm_release_rx_call(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_poolid_t poolid,
+ uint32_t bufferid,
+ uint32_t tid);
+
+void bulk_sm_release_rx_response(
+ struct bulk_ctrl_binding *b,
+ bulk_ctrl_error_t error,
+ uint32_t tid);
+
+// Flounder generic callbacks ---------------------------------------------
+
+//printing handler for asynchronous errors
+void bulk_sm_error_handler_debug(struct bulk_ctrl_binding *_binding, errval_t err);
+
+/**
+ * Callback for use with flounder. Prints the debug string provided via (void *a).
+ */
+void bulk_sm_flounder_msg_sent_debug_cb(void *a);
+
+/**
+ * Generic send function that preserves FIFO order: it enqueues the message
+ * and registers a resend handler if necessary.
+ *
+ * @param channel: Channel of interest. Used as argument for the callback.
+ * @param send_fn: Send function to be called. This should be a very simple
+ *                 function that just reads arg, tries to send it and
+ *                 returns the error code.
+ */
+void bulk_sm_flounder_send_fifo_msg(struct bulk_channel *channel,
+ errval_t (*send_fn)(void *arg));
+
+void bulk_sm_flounder_send_fifo_msg_with_arg(struct bulk_channel *channel,
+ errval_t (*send_fn)(void *arg),
+ void *arg);
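+
+/*
+ * Illustrative sketch of the send_fn contract (the real callers, e.g.
+ * bulk_sm_move_send_request, additionally notify the user on permanent
+ * failures): read the argument, attempt the flounder transmit, free the
+ * argument only once the transmit succeeded, and return the error so the
+ * helper can re-enqueue the message on FLOUNDER_ERR_TX_BUSY.
+ * `my_msg_state' and `try_to_send' are hypothetical placeholders.
+ *
+ *   static errval_t example_send_fn(void *arg)
+ *   {
+ *       struct my_msg_state *st = arg;
+ *       errval_t err = try_to_send(st);
+ *       if (err_is_ok(err)) {
+ *           free(st);
+ *       }
+ *       return err;
+ *   }
+ */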
+
+
+
+// Flounder type conversion helpers ---------------------------------------
+
+static inline enum bulk_channel_direction flounder2bulk_direction(
+ enum bulk_ctrl_direction_t direction)
+{
+ return ((direction == bulk_ctrl_SOURCE) ? BULK_DIRECTION_TX :
+ BULK_DIRECTION_RX );
+}
+
+static inline enum bulk_channel_role flounder2bulk_role(
+ enum bulk_ctrl_role_t role)
+{
+ return ((role == bulk_ctrl_GENERIC) ? BULK_ROLE_GENERIC :
+ (role == bulk_ctrl_MASTER) ? BULK_ROLE_MASTER :
+ BULK_ROLE_SLAVE );
+}
+
+static inline enum bulk_trust_level flounder2bulk_trust(
+ enum bulk_ctrl_trust_t trust)
+{
+ return ((trust == bulk_ctrl_NONE) ? BULK_TRUST_NONE :
+ (trust == bulk_ctrl_HALF) ? BULK_TRUST_HALF :
+ BULK_TRUST_FULL );
+}
+
+static inline enum bulk_ctrl_direction_t bulk2flounder_direction(
+ enum bulk_channel_direction direction)
+{
+ return ((direction == BULK_DIRECTION_TX) ? bulk_ctrl_SOURCE :
+ bulk_ctrl_SINK );
+}
+
+static inline enum bulk_ctrl_role_t bulk2flounder_role(
+ enum bulk_channel_role role)
+{
+ return ((role == BULK_ROLE_GENERIC) ? bulk_ctrl_GENERIC :
+ (role == BULK_ROLE_MASTER) ? bulk_ctrl_MASTER :
+ bulk_ctrl_SLAVE );
+}
+
+static inline enum bulk_ctrl_trust_t bulk2flounder_trust(
+ enum bulk_trust_level trust)
+{
+ assert(trust != BULK_TRUST_UNINITIALIZED); // what to do with that?
+
+ return ((trust == BULK_TRUST_NONE) ? bulk_ctrl_NONE :
+ (trust == BULK_TRUST_HALF) ? bulk_ctrl_HALF :
+ bulk_ctrl_FULL );
+}
+
+/**
+ * Allocates and initializes a new bulk_pool struct based on flounder data.
+ *
+ * @param: pool Location where the pointer to the newly allocated pool is stored.
+ * @param: f_pool Flounder data to create pool from.
+ */
+errval_t create_pool_from_flounder(struct bulk_pool **pool,
+ const bulk_ctrl_pool_t *f_pool);
+
+/**
+ * Creates a flounder pool struct from a bulk_pool.
+ *
+ * @param: pool Pool to be represented
+ * @param: f_pool Pointer to a flounder pool struct to be filled in.
+ */
+void generate_pool_for_flounder(const struct bulk_pool *pool,
+ bulk_ctrl_pool_t *f_pool);
+
+
+/**
+ * Sets the fields of a bulk_pool_id struct from a flounder poolid struct.
+ * Does not allocate any new memory.
+ */
+void fill_pool_id_from_flounder(struct bulk_pool_id *poolid,
+ const bulk_ctrl_poolid_t *f_poolid);
+
+/**
+ * Sets the fields of a flounder poolid struct from a bulk_pool_id struct.
+ * Does not allocate any new memory.
+ */
+void fill_pool_id_for_flounder(const struct bulk_pool_id *poolid,
+ bulk_ctrl_poolid_t *f_poolid);
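+
+/*
+ * For reference, a sketch of the expected field mapping (assuming the
+ * flounder-generated bulk_ctrl_poolid_t keeps the field names from the
+ * interface definition, and struct bulk_pool_id as used in the handlers
+ * above); fill_pool_id_for_flounder is simply the inverse:
+ *
+ *   poolid->machine = f_poolid->pool_id_machine;
+ *   poolid->dom     = f_poolid->pool_id_dom;
+ *   poolid->local   = f_poolid->pool_id_local;
+ */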
+
+#endif // BULK_SM_IMPL_H
--- /dev/null
+/**
+ * \file
+ * \brief Unidirectional bulk data transfer via shared memory
+ */
+
+/*
+ * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
+ * All rights reserved.
+ *
+ * This file is distributed under the terms in the attached LICENSE file.
+ * If you do not find this file, copies can be found by writing to:
+ * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
+ */
+
+#include <barrelfish/barrelfish.h>
+
+#include <bulk_transfer/bulk_transfer.h>
+#include <bulk_transfer/bulk_sm.h>
+
+#include "../../helpers.h"
+#include "bulk_sm_impl.h" // XXX legal? where to put impl headers?
+
+
+struct bulk_implementation bulk_sm_implementation = {
+ .channel_create = bulk_sm_channel_create,
+ .channel_bind = bulk_sm_channel_bind,
+ .channel_destroy = bulk_sm_channel_destroy,
+ .assign_pool = bulk_sm_assign_pool,
+ .remove_pool = bulk_sm_remove_pool,
+ .move = bulk_sm_move,
+ .copy = bulk_sm_copy,
+ .release = bulk_sm_release,
+ .pass = bulk_sm_pass,
+ .request = NULL, // supported?
+};
+
+struct bulk_implementation *bulk_sm_get_implementation(void)
+{
+ return &bulk_sm_implementation;
+}
+
+struct bulk_ctrl_rx_vtbl bulk_ctrl_rx_vtbl = {
+ .negotiate_call = bulk_sm_channel_negotiate_rx_call,
+ .negotiate_response = bulk_sm_channel_negotiate_rx_reply,
+ .assign_pool_call = bulk_sm_assign_pool_rx_call,
+ .assign_pool_response = bulk_sm_assign_pool_rx_response,
+ .move_untrusted_call = bulk_sm_move_rx_call,
+ .move_trusted_call = bulk_sm_move_trusted_rx_call,
+ .move_response = bulk_sm_move_rx_response,
+ .copy_untrusted_call = bulk_sm_copy_rx_call,
+ .copy_trusted_call = bulk_sm_copy_trusted_rx_call,
+ .copy_response = bulk_sm_copy_rx_response,
+ .pass_untrusted_call = bulk_sm_pass_rx_call,
+ .pass_trusted_call = bulk_sm_pass_trusted_rx_call,
+ .pass_response = bulk_sm_pass_rx_response,
+ .release_call = bulk_sm_release_rx_call,
+ .release_response = bulk_sm_release_rx_response,
+};
+
+// Channel Management -----------------------------------------------------
+
+// Functions involved (C = Creator, B = Binder) (+ = public interface)
+// ==================
+//
+// Endpoint creation:
+// C + bulk_sm_ep_create
+//
+// Channel creation
+// C + bulk_sm_channel_create
+// C bulk_sm_channel_create_cb (Flounder: if.export_cb)
+//
+// Channel binding 1: establish flounder channel
+// B + bulk_sm_channel_bind
+// C bulk_sm_channel_connect (Flounder: if.connect_cb)
+// B bulk_sm_channel_bind_cb (Flounder: if.bind_cb)
+//
+// Channel binding 2: negotiate direction, role and trust level
+// B bulk_sm_channel_negotiate
+// C bulk_sm_channel_negotiate_rx_call (Flounder: if.rx_bind_negotiate_call)
+// C bulk_sm_channel_negotiate_send_reply
+// C bulk_sm_channel_negotiate_replied (Flounder: if.send_bind_negotiate_response_cb)
+// B bulk_sm_channel_negotiate_rx_reply (Flounder: if.rx_bind_negotiate_response)
+//
+// Generalized functions used by multiple of the above (Flounder helpers):
+// bulk_sm_flounder_msg_sent_debug_cb()
+
+// Channel binding 2: negotiate direction, role and trust level -----------
+
+struct bulk_sm_properties {
+ enum bulk_channel_role role;
+ enum bulk_trust_level trust;
+};
+
+void bulk_sm_channel_negotiate_rx_reply(
+ struct bulk_ctrl_binding *b,
+ uint64_t error,
+ enum bulk_ctrl_direction_t match_direction,
+ enum bulk_ctrl_role_t match_role,
+ uint64_t meta_size)
+{
+ assert( sizeof(errval_t) == sizeof(uint64_t) );
+ errval_t err = (errval_t) error;
+
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+ struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
+
+ if (err_is_ok(err)) {
+ channel->direction = flounder2bulk_direction(match_direction);
+ channel->role = flounder2bulk_role(match_role);
+ channel->meta_size = meta_size;
+ channel->state = BULK_STATE_CONNECTED;
+ } else {
+ channel->state = BULK_STATE_UNINITIALIZED;
+ }
+
+ // notify user
+ bulk_continuation_call(data->bind_cont, err, channel);
+}
+
+static void bulk_sm_channel_negotiate_replied(void *a)
+{
+ /*
+ struct bulk_channel *channel = VOID2CHANNEL(a);
+
+ if (channel->state == BULK_STATE_BIND_NEGOTIATE) {
+ // negotiate was successful
+ channel->state = BULK_STATE_CONNECTED;
+
+ if (channel->callbacks->bind_received) {
+ channel->callbacks->bind_received(channel);
+ }
+ } else {
+ // negotiation failed. go back to wait for binding
+ channel->state = BULK_STATE_INITIALIZED;
+ }
+ */
+}
+
+static errval_t bulk_sm_channel_negotiate_send_reply(void *a)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(a);
+ struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ struct event_closure txcont = MKCONT(bulk_sm_channel_negotiate_replied,
+ channel);
+
+ bulk_ctrl_direction_t peer_direction =
+ bulk2flounder_direction(bulk_direction_other(channel->direction));
+ bulk_ctrl_role_t peer_role =
+ bulk2flounder_role(bulk_role_other(channel->role));
+
+ errval_t err = bulk_ctrl_negotiate_response__tx(b, txcont,
+ data->bind_error, peer_direction, peer_role, channel->meta_size);
+
+ if (err_is_ok(err)) {
+ // set new channel state. don't do this in
+ // bulk_sm_channel_negotiate_replied.
+ // Reason: when peer receives negotiate_reply, binding is done. If
+ // peer then no longer dispatches events on the waitset, we never get
+ // the above notification.
+
+ if (channel->state == BULK_STATE_BIND_NEGOTIATE) {
+ // negotiate was successful
+ channel->state = BULK_STATE_CONNECTED;
+
+ if (channel->callbacks->bind_received) {
+ channel->callbacks->bind_received(channel);
+ }
+ } else {
+ // negotiation failed. go back to wait for binding
+ channel->state = BULK_STATE_INITIALIZED;
+ }
+ }
+
+ return err;
+}
+
+void bulk_sm_channel_negotiate_rx_call(
+ struct bulk_ctrl_binding *b,
+ enum bulk_ctrl_role_t role,
+ enum bulk_ctrl_trust_t trust)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(b->st);
+ struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
+
+ assert(channel->state == BULK_STATE_BIND_NEGOTIATE);
+
+ // helper structs
+ struct bulk_sm_properties me = {
+ .role = channel->role,
+ .trust = channel->trust,
+ };
+
+ struct bulk_sm_properties peer = {
+ .role = flounder2bulk_role(role),
+ .trust = flounder2bulk_trust(trust),
+ };
+
+ // Let's decide on the properties.
+ bool valid = true;
+
+ if (me.role == BULK_ROLE_GENERIC) {
+ if (peer.role == BULK_ROLE_GENERIC) {
+ me.role = BULK_ROLE_MASTER;
+ peer.role = BULK_ROLE_SLAVE;
+ } else {
+ me.role = bulk_role_other(peer.role);
+ }
+ } else {
+ if (peer.role == BULK_ROLE_GENERIC) {
+ peer.role = bulk_role_other(me.role);
+ } else {
+ valid = valid && (me.role == bulk_role_other(peer.role));
+ }
+ }
+
+ valid = valid && (bulk_trust_compare(me.trust, peer.trust) == 0);
+
+ // Successful?
+ if (valid) {
+        // adopt the (possibly updated) role
+ channel->role = me.role;
+ data->bind_error = SYS_ERR_OK;
+ } else {
+ // reset binding state
+ channel->state = BULK_STATE_BINDING;
+ data->bind_error = BULK_TRANSFER_CHAN_BIND;
+ }
+
+ bulk_sm_flounder_send_fifo_msg(channel,
+ bulk_sm_channel_negotiate_send_reply);
+}
+
+static errval_t bulk_sm_channel_negotiate(void *a)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(a);
+ struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
+
+ assert(channel->state == BULK_STATE_BIND_NEGOTIATE);
+
+ struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
+ "bulk_sm_channel_negotiate sent");
+
+ errval_t err = bulk_ctrl_negotiate_call__tx(b, txcont,
+ bulk2flounder_role(channel->role),
+ bulk2flounder_trust(channel->trust)
+ );
+
+ return err;
+}
+
+// Channel binding 1: establish flounder channel --------------------------
+
+static void bulk_sm_channel_bind_cb(void *st,
+ errval_t err,
+ struct bulk_ctrl_binding *b)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(st);
+ struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
+ assert(channel);
+
+ assert(err_is_ok(err)); // current implementation doesn't generate failure
+
+ // mutual pointers
+ b->rx_vtbl = bulk_ctrl_rx_vtbl;
+ b->st = channel;
+ data->b = b;
+
+ // channel update
+ channel->state = BULK_STATE_BIND_NEGOTIATE;
+
+ // Flounder channel established. let's negotiate channel properties
+ bulk_sm_flounder_send_fifo_msg(channel, bulk_sm_channel_negotiate);
+}
+
+static errval_t bulk_sm_channel_connect(void *st,
+ struct bulk_ctrl_binding *b)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(st);
+ assert(channel);
+
+ struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
+
+ // mutual pointers
+ b->rx_vtbl = bulk_ctrl_rx_vtbl;
+ b->error_handler = bulk_sm_error_handler_debug;
+ b->st = channel;
+ data->b = b;
+
+ // channel update
+ channel->state = BULK_STATE_BIND_NEGOTIATE;
+
+    // Let the binding side advance the channel state and start negotiating properties.
+ return SYS_ERR_OK;
+}
+
+errval_t bulk_sm_channel_bind(struct bulk_channel *channel,
+ struct bulk_continuation cont)
+{
+ assert(channel);
+ assert(channel->state == BULK_STATE_UNINITIALIZED);
+ assert(channel->waitset);
+ assert(channel->ep);
+
+ struct bulk_sm_endpoint_descriptor *ep = CHANNEL_EP(channel);
+
+ assert(ep->state == BULK_EPSTATE_IREF_EXPORTED);
+
+ // allocate implementation-specific data
+ struct bulk_sm_impl_data *data = malloc(sizeof(struct bulk_sm_impl_data));
+ channel->impl_data = data;
+ if (!data) {
+ return BULK_TRANSFER_MEM;
+ }
+ data->root = NULL;
+ thread_mutex_init(&data->mutex);
+
+ thread_mutex_init(&data->resend_lock);
+ data->resend_closure = NULL;
+
+ // Bind to iref
+ errval_t err = bulk_ctrl_bind(ep->iref,
+ bulk_sm_channel_bind_cb,
+ channel,
+ channel->waitset,
+ IDC_EXPORT_FLAGS_DEFAULT);
+
+ if (err_is_fail(err)) {
+ DEBUG_ERR(err, "bulk_sm_channel_bind");
+ free(channel->impl_data);
+ channel->impl_data = NULL;
+ return BULK_TRANSFER_CHAN_BIND;
+ }
+
+ data->bind_cont = cont;
+ channel->state = BULK_STATE_BINDING;
+
+ return SYS_ERR_OK;
+}
+
+// Channel Creation -------------------------------------------------------
+
+static void bulk_sm_channel_create_cb(void *st, errval_t err, iref_t iref)
+{
+ struct bulk_channel *channel = VOID2CHANNEL(st);
+
+ assert(channel);
+
+ struct bulk_sm_endpoint_descriptor *ep = CHANNEL_EP(channel);
+
+ assert(ep);
+ assert(ep->state == BULK_EPSTATE_CREATED);
+
+ ep->iref = iref;
+ ep->err = err;
+ ep->state = BULK_EPSTATE_IREF_EXPORTED;
+}
+
+errval_t bulk_sm_channel_create(struct bulk_channel *channel)
+{
+ // We cannot use bulk_continuation here as we do not have a channel.
+ // Given the interface, we cannot take a barrelfish-style continuation.
+ // Mixing different continuation styles in the same library is ugly anyway.
+ // Thus, this call is blocking.
+
+ assert(channel);
+ assert(channel->state == BULK_STATE_UNINITIALIZED);
+ assert(channel->waitset);