/*
 * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */
10 #include <barrelfish/barrelfish.h>
12 #include <bulk_transfer/bulk_transfer.h>
13 #include <bulk_transfer/bulk_sm.h>
14 #include "../../bulk_pool.h"
15 #include "../../bulk_buffer.h"
16 #include "bulk_sm_impl.h"
17 #include "pending_msg.h"
20 #define BULK_DEBUG_PRINT(fmt, msg...) debug_printf(fmt, msg)
22 #define BULK_DEBUG_PRINT(fmt, msg...)
25 //the same values are necessary for move, pass and copy operations
31 struct bulk_buffer *buffer;
32 struct bulk_channel *channel;
33 bulk_ctrl_poolid_t poolid;
36 static errval_t bulk_sm_move_send_request(void *a)
38 struct pass_data *d = (struct pass_data*) a;
39 struct bulk_buffer *buffer = d->buffer;
40 struct bulk_channel *channel = d->channel;
41 struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
43 struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
47 if (channel->trust == BULK_TRUST_NONE) {
48 err = bulk_ctrl_move_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
49 d->tid, d->cap, d->meta, d->metasize);
51 err = bulk_ctrl_move_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
52 d->tid, d->meta, d->metasize);
57 } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
58 //sending this message will never work, do not retry
59 //notify user the same way as if the other side had an error
60 bulk_sm_move_rx_response(b, err, d->tid);
66 errval_t bulk_sm_move(struct bulk_channel *channel,
67 struct bulk_buffer *buffer,
69 struct bulk_continuation cont)
73 size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
75 //store the arguments for when we get the reply later
76 union pending_msg_data pmsg = {
77 .move.continuation = cont,
79 err = pending_msg_add(channel, &tid, pmsg);
80 assert(err_is_ok(err));//adding should actually never fail
82 //if fully trusted, the other side already has the cap, so don't resend it
84 if (channel->trust == BULK_TRUST_FULL){
91 struct pass_data *d = malloc(sizeof(*d));
95 d->metasize = metasize;
99 fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
101 //send message (automatically retries if channel busy)
102 bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_move_send_request,d);
103 //if the transmission fails, the user will be notified through the continuation
107 static errval_t bulk_sm_copy_send_request(void *a)
109 struct pass_data *d = (struct pass_data*) a;
110 struct bulk_buffer *buffer = d->buffer;
111 struct bulk_channel *channel = d->channel;
112 struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
114 struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
115 "bulk_sm_copy sent");
117 if (channel->trust == BULK_TRUST_NONE) {
118 err = bulk_ctrl_copy_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
119 d->tid, d->cap, d->meta, d->metasize);
121 err = bulk_ctrl_copy_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
122 d->tid, d->meta, d->metasize);
125 if (err_is_ok(err)) {
127 } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
128 //sending this message will never work, do not retry
129 //notify user the same way as if the other side had an error
130 bulk_sm_copy_rx_response(b, err, d->tid);
136 errval_t bulk_sm_copy(struct bulk_channel *channel,
137 struct bulk_buffer *buffer,
139 struct bulk_continuation cont)
143 size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
145 //store the arguments for when we get the reply later
146 union pending_msg_data pmsg = {
147 .copy.continuation = cont,
149 err = pending_msg_add(channel, &tid, pmsg);
150 assert(err_is_ok(err));//adding should actually never fail
152 //if fully trusted, the other side already has the cap, so don't resend it
154 if (channel->trust == BULK_TRUST_FULL){
161 struct pass_data *d = malloc(sizeof(*d));
165 d->metasize = metasize;
168 d->channel = channel;
169 fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
171 //send message (automatically retries if channel busy)
172 bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_copy_send_request,d);
173 //if the transmission fails, the user will be notified through the continuation
177 static errval_t bulk_sm_release_send_request(void *a)
179 struct pass_data *d = (struct pass_data*) a;
180 struct bulk_buffer *buffer = d->buffer;
181 struct bulk_channel *channel = d->channel;
182 struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
184 struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
185 "bulk_sm_release sent");
186 errval_t err = bulk_ctrl_release_call__tx(b, txcont,
187 d->poolid, buffer->bufferid, d->tid);
189 if (err_is_ok(err)) {
191 } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
192 //sending this message will never work, do not retry
193 //notify user the same way as if the other side had an error
194 bulk_sm_release_rx_response(b, err, d->tid);
201 errval_t bulk_sm_release(struct bulk_channel *channel,
202 struct bulk_buffer *buffer,
203 struct bulk_continuation cont)
208 //store the arguments for when we get the reply later
209 union pending_msg_data pmsg = {
210 .release.continuation = cont,
212 err = pending_msg_add(channel, &tid, pmsg);
213 assert(err_is_ok(err));//adding should actually never fail
217 struct pass_data *d = malloc(sizeof(*d));//could use smaller struct
221 d->channel = channel;
222 fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
224 //send message (automatically retries if channel busy)
225 bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_release_send_request,d);
226 //if the transmission fails, the user will be notified through the continuation
230 static errval_t bulk_sm_pass_send_request(void *a)
232 struct pass_data *d = (struct pass_data*) a;
233 struct bulk_buffer *buffer = d->buffer;
234 struct bulk_channel *channel = d->channel;
235 struct bulk_ctrl_binding *b = CHANNEL_BINDING(channel);
237 struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
238 "bulk_sm_pass sent");
241 if (channel->trust == BULK_TRUST_NONE) {
242 err = bulk_ctrl_pass_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
243 d->tid, d->cap, d->meta, d->metasize);
245 err = bulk_ctrl_pass_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
246 d->tid, d->meta, d->metasize);
249 if (err_is_ok(err)) {
251 } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
252 //sending this message will never work, do not retry
253 //notify user the same way as if the other side had an error
254 bulk_sm_pass_rx_response(b, err, d->tid);
260 errval_t bulk_sm_pass(struct bulk_channel *channel,
261 struct bulk_buffer *buffer,
263 struct bulk_continuation cont)
267 size_t metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
269 //store the arguments for when we get the reply later
270 union pending_msg_data pmsg = {
271 .pass.continuation = cont,
273 err = pending_msg_add(channel, &tid, pmsg);
274 assert(err_is_ok(err));//adding should actually never fail
276 //if fully trusted, the other side already has the cap, so don't resend it
278 if (channel->trust == BULK_TRUST_FULL){
285 struct pass_data *d = malloc(sizeof(*d));
289 d->metasize = metasize;
292 d->channel = channel;
293 fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
295 //send message (automatically retries if channel busy)
296 bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_pass_send_request, d);
297 //if the transmission fails, the user will be notified through the continuation
302 //--------------- flounder RPC handlers:
304 //move, copy, pass and release replies all have the same format
305 struct bulk_sm_reply_data {
306 struct bulk_channel *channel;
307 struct event_closure cb;
308 bulk_ctrl_error_t error;
312 static errval_t bulk_sm_move_send_reply(void *a)
314 struct bulk_sm_reply_data *rdata = a;
315 struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
317 errval_t err = bulk_ctrl_move_response__tx(b, rdata->cb,
318 rdata->error, rdata->tid);
320 if (err_is_ok(err)) {
327 void bulk_sm_move_rx_call(
328 struct bulk_ctrl_binding *b,
329 bulk_ctrl_poolid_t poolid,
336 errval_t err = SYS_ERR_OK;
337 struct event_closure txcont;
338 struct bulk_buffer *buffer = NULL;
339 struct bulk_channel *channel = VOID2CHANNEL(b->st);
341 assert(metasize == channel->meta_size || metasize == 0);
343 struct bulk_pool_id b_poolid;
344 fill_pool_id_from_flounder(&b_poolid, &poolid);
346 struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
348 err = BULK_TRANSFER_POOL_INVALD;
349 } else if (pool->num_buffers < bufferid){
350 err = BULK_TRANSFER_BUFFER_INVALID;
352 buffer = pool->buffers[bufferid];
354 //in the untrusted case, we also received the cap for this buffer
355 if (channel->trust == BULK_TRUST_NONE){
356 //make sure transmitter does not keep a copy for himself
357 err = cap_revoke(cap);
358 assert(err_is_ok(err));
359 err = bulk_buffer_assign_cap(buffer, cap, 0);
363 //automatically remaps if necessary
364 err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
368 //send reply & inform user
370 if (channel->callbacks->move_received) {
371 channel->callbacks->move_received(channel, buffer, (CONST_CAST)meta);
374 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
375 "bulk_sm_move_rx_call: reply sent.");
377 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
378 "bulk_sm_move_rx_call: reply to invalid move sent");
381 struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
383 rdata->channel = channel;
388 bulk_sm_flounder_send_fifo_msg_with_arg(channel,
389 bulk_sm_move_send_reply, rdata);
392 void bulk_sm_move_trusted_rx_call(
393 struct bulk_ctrl_binding *b,
394 bulk_ctrl_poolid_t poolid,
400 //call normal handler with a NULL_CAP
401 bulk_sm_move_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
405 void bulk_sm_move_rx_response(
406 struct bulk_ctrl_binding *b,
407 bulk_ctrl_error_t error,
410 //find data associated with this RPC call
411 union pending_msg_data data;
412 errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
413 if (err_is_fail(err)){
414 //no such message data -> ignore?
415 DEBUG_ERR(err, "bulk_sm_move_rx_response");
419 //TODO: clean up if error is fail
422 bulk_continuation_call(data.move.continuation, (errval_t) error,
423 VOID2CHANNEL(b->st));
426 static errval_t bulk_sm_copy_send_reply(void *a)
428 struct bulk_sm_reply_data *rdata = a;
429 struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
431 errval_t err = bulk_ctrl_copy_response__tx(
432 b, rdata->cb, rdata->error, rdata->tid);
434 if (err_is_ok(err)) {
440 void bulk_sm_copy_rx_call(
441 struct bulk_ctrl_binding *b,
442 bulk_ctrl_poolid_t poolid,
449 errval_t err = SYS_ERR_OK;
450 struct event_closure txcont;
451 struct bulk_buffer *buffer = NULL;
452 struct bulk_channel *channel = VOID2CHANNEL(b->st);
454 assert(metasize == channel->meta_size || metasize == 0);
456 struct bulk_pool_id b_poolid;
457 fill_pool_id_from_flounder(&b_poolid, &poolid);
459 struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
461 err = BULK_TRANSFER_POOL_INVALD;
462 } else if (pool->num_buffers < bufferid){
463 err = BULK_TRANSFER_BUFFER_INVALID;
465 buffer = pool->buffers[bufferid];
466 //in the untrusted case, we also received the cap for this buffer
467 if (channel->trust == BULK_TRUST_NONE){
468 //TODO: make sure there is no rw cap in transmitters cspace
469 // the way to do this would be to check that this is a shared_readonly cap
470 err = bulk_buffer_assign_cap(buffer, cap, 0);
474 //automatically remaps if necessary
475 err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_ONLY);
476 //TODO: keep track of copies? adjust refcount? do we let the user do that?
481 if (channel->callbacks->copy_received) {
482 channel->callbacks->copy_received(channel, buffer, (CONST_CAST)meta);
485 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
486 "bulk_sm_copy_rx_call: reply sent.");
488 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
489 "bulk_sm_copy_rx_call: reply to invalid copy sent");
492 struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
494 rdata->channel = channel;
500 bulk_sm_flounder_send_fifo_msg_with_arg(channel,
501 bulk_sm_copy_send_reply, rdata);
504 void bulk_sm_copy_trusted_rx_call(
505 struct bulk_ctrl_binding *b,
506 bulk_ctrl_poolid_t poolid,
512 //call normal handler with a NULL_CAP
513 bulk_sm_copy_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
517 void bulk_sm_copy_rx_response(
518 struct bulk_ctrl_binding *b,
519 bulk_ctrl_error_t error,
522 //find data associated with this RPC call
523 union pending_msg_data data;
524 errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
525 if (err_is_fail(err)){
526 //no such message data -> ignore?
527 DEBUG_ERR(err, "bulk_sm_copy_rx_response");
531 //TODO: clean up if error is fail
534 bulk_continuation_call(data.copy.continuation, (errval_t) error,
535 VOID2CHANNEL(b->st));
538 static errval_t bulk_sm_pass_send_reply(void *a)
540 struct bulk_sm_reply_data *rdata = a;
541 struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
543 errval_t err = bulk_ctrl_pass_response__tx(b, rdata->cb,
544 rdata->error, rdata->tid);
546 if (err_is_ok(err)) {
553 void bulk_sm_pass_rx_call(
554 struct bulk_ctrl_binding *b,
555 bulk_ctrl_poolid_t poolid,
562 BULK_DEBUG_PRINT("%s", "bulk_sm_pass_rx_call called\n");
564 errval_t err = SYS_ERR_OK;
565 struct event_closure txcont;
566 struct bulk_buffer *buffer = NULL;
567 struct bulk_channel *channel = VOID2CHANNEL(b->st);
569 assert(metasize == channel->meta_size || metasize == 0);
571 struct bulk_pool_id b_poolid;
572 fill_pool_id_from_flounder(&b_poolid, &poolid);
574 struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
576 err = BULK_TRANSFER_POOL_INVALD;
577 } else if (pool->num_buffers < bufferid){
578 err = BULK_TRANSFER_BUFFER_INVALID;
580 buffer = pool->buffers[bufferid];
582 //in the untrusted case, we also received the cap for this buffer
583 if (channel->trust == BULK_TRUST_NONE){
584 //make sure transmitter does not keep a copy for himself
585 err = cap_revoke(cap);
586 assert(err_is_ok(err));
587 err = bulk_buffer_assign_cap(buffer, cap, 0);
591 //automatically remaps if necessary
592 err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
596 //send reply & inform user
598 if (channel->callbacks->buffer_received) {
599 channel->callbacks->buffer_received(channel, buffer, (CONST_CAST)meta);
602 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
603 "bulk_sm_pass_rx_call: reply sent.");
605 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
606 "bulk_sm_pass_rx_call: reply to invalid pass sent");
609 struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
611 rdata->channel = channel;
616 bulk_sm_flounder_send_fifo_msg_with_arg(channel,
617 bulk_sm_pass_send_reply, rdata);
620 void bulk_sm_pass_trusted_rx_call(
621 struct bulk_ctrl_binding *b,
622 bulk_ctrl_poolid_t poolid,
628 //call normal handler with a NULL_CAP
629 bulk_sm_pass_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
633 void bulk_sm_pass_rx_response(
634 struct bulk_ctrl_binding *b,
635 bulk_ctrl_error_t error,
638 BULK_DEBUG_PRINT("bulk_sm_pass_rx_response called. TID = %u\n", tid);
639 //find data associated with this RPC call
640 union pending_msg_data data;
641 errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
642 if (err_is_fail(err)){
643 //no such message data -> ignore?
644 DEBUG_ERR(err, "bulk_sm_copy_rx_response");
648 //TODO: clean up if error is fail
651 bulk_continuation_call(data.pass.continuation, (errval_t) error,
652 VOID2CHANNEL(b->st));
655 static errval_t bulk_sm_release_send_reply(void *a)
657 struct bulk_sm_reply_data *rdata = a;
658 struct bulk_ctrl_binding *b = CHANNEL_BINDING(rdata->channel);
660 errval_t err = bulk_ctrl_release_response__tx(b, rdata->cb,
661 rdata->error, rdata->tid);
663 if (err_is_fail(err)) {
670 void bulk_sm_release_rx_call(
671 struct bulk_ctrl_binding *b,
672 bulk_ctrl_poolid_t poolid,
676 errval_t err = SYS_ERR_OK;
677 struct event_closure txcont;
678 struct bulk_buffer *buffer = NULL;
679 struct bulk_channel *channel = VOID2CHANNEL(b->st);
681 struct bulk_pool_id b_poolid;
682 fill_pool_id_from_flounder(&b_poolid, &poolid);
684 struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
686 err = BULK_TRANSFER_POOL_INVALD;
687 } else if (pool->num_buffers < bufferid){
688 err = BULK_TRANSFER_BUFFER_INVALID;
690 buffer = pool->buffers[bufferid];
691 buffer->local_ref_count--;
692 //TODO: make the decrease atomic? (so only the last decrease reclaims it)
693 //TODO: find out what the refcount should be to take action (0 or 1?)
694 if (buffer->local_ref_count == 0 && buffer->state == BULK_BUFFER_RO_OWNED){
696 if (channel->trust == BULK_TRUST_NONE){
697 err = cap_revoke(buffer->cap);
698 assert(err_is_ok(err));
701 //automatically remaps if necessary
702 err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
705 //TODO: what if refcount became 0 but we are not the owner?
706 //should we just let the user callback handle that? (we probably have to)
709 //send reply & inform user
711 if (channel->callbacks->copy_released) {
712 channel->callbacks->copy_released(channel, buffer);
715 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
716 "bulk_sm_release_rx_call: reply sent.");
718 txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
719 "bulk_sm_release_rx_call: reply to invalid release sent");
722 struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
724 rdata->channel = channel;
729 bulk_sm_flounder_send_fifo_msg_with_arg(channel,
730 bulk_sm_release_send_reply, rdata);
734 void bulk_sm_release_rx_response(
735 struct bulk_ctrl_binding *b,
736 bulk_ctrl_error_t error,
739 //find data associated with this RPC call
740 union pending_msg_data data;
741 errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
742 if (err_is_fail(err)){
743 //no such message data -> ignore?
744 DEBUG_ERR(err, "bulk_sm_release_rx_response");
748 //TODO: clean up if error is fail
751 bulk_continuation_call(data.release.continuation, (errval_t) error,
752 VOID2CHANNEL(b->st));