Adding some initializations
[barrelfish] / lib / bulk_transfer / backends / sm / buffers.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
3  * All rights reserved.
4  *
5  * This file is distributed under the terms in the attached LICENSE file.
6  * If you do not find this file, copies can be found by writing to:
7  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8  */
9
10 #include <barrelfish/barrelfish.h>
11
12 #include <bulk_transfer/bulk_transfer.h>
13 #include <bulk_transfer/bulk_sm.h>
14 #include "../../bulk_pool.h"
15 #include "../../bulk_buffer.h"
16 #include "bulk_sm_impl.h"
17 #include "pending_msg.h"
18
19 #if 0
20 #define BULK_DEBUG_PRINT(fmt, msg...) debug_printf(fmt, msg)
21 #else
22 #define BULK_DEBUG_PRINT(fmt, msg...)
23 #endif
24
25 //the same values are necessary for move, pass and copy operations
26 struct pass_data {
27     uint32_t tid;
28     void *meta;
29     size_t metasize;
30     struct capref cap;
31     struct bulk_buffer *buffer;
32     struct bulk_channel *channel;
33     bulk_ctrl_poolid_t poolid;
34 };
35
36 static errval_t bulk_sm_move_send_request(void *a)
37 {
38     struct pass_data         *d       = (struct pass_data*) a;
39     struct bulk_buffer       *buffer  = d->buffer;
40     struct bulk_channel      *channel = d->channel;
41     struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
42
43     struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
44             "bulk_sm_move sent");
45
46     errval_t err;
47     if (channel->trust == BULK_TRUST_NONE) {
48         err = bulk_ctrl_move_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
49                                                 d->tid, d->cap, d->meta, d->metasize);
50     } else {
51         err = bulk_ctrl_move_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
52                                               d->tid, d->meta, d->metasize);
53     }
54
55     if (err_is_ok(err)) {
56         free(d);
57     } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
58         //sending this message will never work, do not retry
59         //notify user the same way as if the other side had an error
60         bulk_sm_move_rx_response(b, err, d->tid);
61         free(d);
62     }
63     return err;
64 }
65
66 errval_t bulk_sm_move(struct bulk_channel      *channel,
67                       struct bulk_buffer       *buffer,
68                       void                     *meta,
69                       struct bulk_continuation cont)
70 {
71     errval_t err;
72     uint32_t tid;
73     size_t   metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
74
75     //store the arguments for when we get the reply later
76     union pending_msg_data pmsg = {
77         .move.continuation = cont,
78         };
79     err = pending_msg_add(channel, &tid, pmsg);
80     assert(err_is_ok(err));//adding should actually never fail
81
82     //if fully trusted, the other side already has the cap, so don't resend it
83     struct capref cap;
84     if (channel->trust == BULK_TRUST_FULL){
85         cap = NULL_CAP;
86     } else {
87         cap = buffer->cap;
88     }
89
90     //send message
91     struct pass_data *d = malloc(sizeof(*d));
92     assert(d);
93     d->tid = tid;
94     d->meta = meta;
95     d->metasize = metasize;
96     d->cap = cap;
97     d->buffer = buffer;
98     d->channel = channel;
99     fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
100
101     //send message (automatically retries if channel busy)
102     bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_move_send_request,d);
103     //if the transmission fails, the user will be notified through the continuation
104     return err;
105 }
106
107 static errval_t bulk_sm_copy_send_request(void *a)
108 {
109     struct pass_data         *d       = (struct pass_data*) a;
110     struct bulk_buffer       *buffer  = d->buffer;
111     struct bulk_channel      *channel = d->channel;
112     struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
113
114     struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
115             "bulk_sm_copy sent");
116     errval_t err;
117     if (channel->trust == BULK_TRUST_NONE) {
118         err = bulk_ctrl_copy_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
119                                                 d->tid, d->cap, d->meta, d->metasize);
120     } else {
121         err = bulk_ctrl_copy_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
122                                               d->tid, d->meta, d->metasize);
123     }
124
125     if (err_is_ok(err)) {
126         free(d);
127     } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
128         //sending this message will never work, do not retry
129         //notify user the same way as if the other side had an error
130         bulk_sm_copy_rx_response(b, err, d->tid);
131         free(d);
132     }
133     return err;
134 }
135
136 errval_t bulk_sm_copy(struct bulk_channel      *channel,
137                       struct bulk_buffer       *buffer,
138                       void                     *meta,
139                       struct bulk_continuation cont)
140 {
141     errval_t err;
142     uint32_t tid;
143     size_t   metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
144
145     //store the arguments for when we get the reply later
146     union pending_msg_data pmsg = {
147         .copy.continuation = cont,
148         };
149     err = pending_msg_add(channel, &tid, pmsg);
150     assert(err_is_ok(err));//adding should actually never fail
151
152     //if fully trusted, the other side already has the cap, so don't resend it
153     struct capref cap;
154     if (channel->trust == BULK_TRUST_FULL){
155         cap = NULL_CAP;
156     } else {
157         cap = buffer->cap;
158     }
159
160     //send message
161     struct pass_data *d = malloc(sizeof(*d));
162     assert(d);
163     d->tid = tid;
164     d->meta = meta;
165     d->metasize = metasize;
166     d->cap = cap;
167     d->buffer = buffer;
168     d->channel = channel;
169     fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
170
171     //send message (automatically retries if channel busy)
172     bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_copy_send_request,d);
173     //if the transmission fails, the user will be notified through the continuation
174     return err;
175 }
176
177 static errval_t bulk_sm_release_send_request(void *a)
178 {
179     struct pass_data         *d       = (struct pass_data*) a;
180     struct bulk_buffer       *buffer  = d->buffer;
181     struct bulk_channel      *channel = d->channel;
182     struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
183
184     struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
185             "bulk_sm_release sent");
186     errval_t err = bulk_ctrl_release_call__tx(b, txcont,
187             d->poolid, buffer->bufferid, d->tid);
188
189     if (err_is_ok(err)) {
190         free(d);
191     } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
192         //sending this message will never work, do not retry
193         //notify user the same way as if the other side had an error
194         bulk_sm_release_rx_response(b, err, d->tid);
195         free(d);
196     }
197     return err;
198 }
199
200
201 errval_t bulk_sm_release(struct bulk_channel      *channel,
202                          struct bulk_buffer       *buffer,
203                          struct bulk_continuation cont)
204 {
205     errval_t err;
206     uint32_t tid;
207
208     //store the arguments for when we get the reply later
209     union pending_msg_data pmsg = {
210         .release.continuation = cont,
211         };
212     err = pending_msg_add(channel, &tid, pmsg);
213     assert(err_is_ok(err));//adding should actually never fail
214
215
216     //send message
217     struct pass_data *d = malloc(sizeof(*d));//could use smaller struct
218     assert(d);
219     d->tid = tid;
220     d->buffer = buffer;
221     d->channel = channel;
222     fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
223
224     //send message (automatically retries if channel busy)
225     bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_release_send_request,d);
226     //if the transmission fails, the user will be notified through the continuation
227     return err;
228 }
229
230 static errval_t bulk_sm_pass_send_request(void *a)
231 {
232     struct pass_data         *d       = (struct pass_data*) a;
233     struct bulk_buffer       *buffer  = d->buffer;
234     struct bulk_channel      *channel = d->channel;
235     struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
236
237     struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
238             "bulk_sm_pass sent");
239
240     errval_t err;
241     if (channel->trust == BULK_TRUST_NONE) {
242         err = bulk_ctrl_pass_untrusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
243                                                 d->tid, d->cap, d->meta, d->metasize);
244     } else {
245         err = bulk_ctrl_pass_trusted_call__tx(b, txcont, d->poolid, buffer->bufferid,
246                                               d->tid, d->meta, d->metasize);
247     }
248
249     if (err_is_ok(err)) {
250         free(d);
251     } else if (err_no(err) != FLOUNDER_ERR_TX_BUSY) {
252         //sending this message will never work, do not retry
253         //notify user the same way as if the other side had an error
254         bulk_sm_pass_rx_response(b, err, d->tid);
255         free(d);
256     }
257     return err;
258 }
259
260 errval_t bulk_sm_pass(struct bulk_channel      *channel,
261                       struct bulk_buffer       *buffer,
262                       void                     *meta,
263                       struct bulk_continuation cont)
264 {
265     errval_t err;
266     uint32_t tid;
267     size_t   metasize = (meta) ? channel->meta_size : 0;//tolerate null pointers
268
269     //store the arguments for when we get the reply later
270     union pending_msg_data pmsg = {
271         .pass.continuation = cont,
272         };
273     err = pending_msg_add(channel, &tid, pmsg);
274     assert(err_is_ok(err));//adding should actually never fail
275
276     //if fully trusted, the other side already has the cap, so don't resend it
277     struct capref cap;
278     if (channel->trust == BULK_TRUST_FULL){
279         cap = NULL_CAP;
280     } else {
281         cap = buffer->cap;
282     }
283
284
285     struct pass_data *d = malloc(sizeof(*d));
286     assert(d);
287     d->tid = tid;
288     d->meta = meta;
289     d->metasize = metasize;
290     d->cap = cap;
291     d->buffer = buffer;
292     d->channel = channel;
293     fill_pool_id_for_flounder(&buffer->pool->id, &d->poolid);
294
295     //send message (automatically retries if channel busy)
296     bulk_sm_flounder_send_fifo_msg_with_arg(channel,bulk_sm_pass_send_request, d);
297     //if the transmission fails, the user will be notified through the continuation
298     return err;
299 }
300
301
302 //--------------- flounder RPC handlers:
303
304 //move, copy, pass and release replies all have the same format
305 struct bulk_sm_reply_data {
306     struct bulk_channel      *channel;
307     struct event_closure     cb;
308     bulk_ctrl_error_t        error;
309     uint32_t                 tid;
310 };
311
312 static errval_t bulk_sm_move_send_reply(void *a)
313 {
314     struct bulk_sm_reply_data   *rdata = a;
315     struct bulk_ctrl_binding    *b     = CHANNEL_BINDING(rdata->channel);
316
317     errval_t err = bulk_ctrl_move_response__tx(b, rdata->cb,
318                                                     rdata->error, rdata->tid);
319
320     if (err_is_ok(err)) {
321         free(rdata);
322     }
323     return err;
324 }
325
326
327 void bulk_sm_move_rx_call(
328         struct bulk_ctrl_binding *b,
329         bulk_ctrl_poolid_t       poolid,
330         uint32_t                 bufferid,
331         uint32_t                 tid,
332         struct capref            cap,
333         uint8_t                  *meta,
334         size_t                   metasize)
335 {
336     errval_t err = SYS_ERR_OK;
337     struct event_closure txcont;
338     struct bulk_buffer *buffer = NULL;
339     struct bulk_channel *channel = VOID2CHANNEL(b->st);
340
341     assert(metasize == channel->meta_size || metasize == 0);
342
343     struct bulk_pool_id b_poolid;
344     fill_pool_id_from_flounder(&b_poolid, &poolid);
345
346     struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
347     if (pool == NULL){
348         err = BULK_TRANSFER_POOL_INVALD;
349     } else if (pool->num_buffers < bufferid){
350         err = BULK_TRANSFER_BUFFER_INVALID;
351     } else {
352         buffer = pool->buffers[bufferid];
353
354         //in the untrusted case, we also received the cap for this buffer
355         if (channel->trust == BULK_TRUST_NONE){
356             //make sure transmitter does not keep a copy for himself
357             err = cap_revoke(cap);
358             assert(err_is_ok(err));
359             err = bulk_buffer_assign_cap(buffer, cap, 0);
360         }
361
362         if (err_is_ok(err)){
363             //automatically remaps if necessary
364             err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
365         }
366     }
367
368     //send reply & inform user
369     if (err_is_ok(err)){
370         if (channel->callbacks->move_received) {
371             channel->callbacks->move_received(channel, buffer, meta);
372         }
373
374         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
375                 "bulk_sm_move_rx_call: reply sent.");
376     } else {
377         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
378                 "bulk_sm_move_rx_call: reply to invalid move sent");
379     }
380
381     struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
382     assert(rdata);
383     rdata->channel = channel;
384     rdata->cb      = txcont;
385     rdata->error   = err;
386     rdata->tid     = tid;
387
388     bulk_sm_flounder_send_fifo_msg_with_arg(channel,
389                                             bulk_sm_move_send_reply, rdata);
390 }
391
392 void bulk_sm_move_trusted_rx_call(
393         struct bulk_ctrl_binding *b,
394         bulk_ctrl_poolid_t       poolid,
395         uint32_t                 bufferid,
396         uint32_t                 tid,
397         uint8_t                  *meta,
398         size_t                   metasize)
399 {
400     //call normal handler with a NULL_CAP
401     bulk_sm_move_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
402 }
403
404
405 void bulk_sm_move_rx_response(
406         struct bulk_ctrl_binding *b,
407         bulk_ctrl_error_t        error,
408         uint32_t                 tid)
409 {
410     //find data associated with this RPC call
411     union pending_msg_data data;
412     errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
413     if (err_is_fail(err)){
414         //no such message data -> ignore?
415         DEBUG_ERR(err, "bulk_sm_move_rx_response");
416         return;
417     }
418
419     //TODO: clean up if error is fail
420
421     //call continuation
422     bulk_continuation_call(data.move.continuation, (errval_t) error,
423             VOID2CHANNEL(b->st));
424 }
425
426 static errval_t bulk_sm_copy_send_reply(void *a)
427 {
428     struct bulk_sm_reply_data   *rdata = a;
429     struct bulk_ctrl_binding    *b     = CHANNEL_BINDING(rdata->channel);
430
431     errval_t err = bulk_ctrl_copy_response__tx(
432                         b, rdata->cb, rdata->error, rdata->tid);
433
434     if (err_is_ok(err)) {
435         free(rdata);
436     }
437     return err;
438 }
439
440 void bulk_sm_copy_rx_call(
441         struct bulk_ctrl_binding *b,
442         bulk_ctrl_poolid_t       poolid,
443         uint32_t                 bufferid,
444         uint32_t                 tid,
445         struct capref            cap,
446         uint8_t                  *meta,
447         size_t                   metasize)
448 {
449     errval_t err = SYS_ERR_OK;
450     struct event_closure txcont;
451     struct bulk_buffer *buffer = NULL;
452     struct bulk_channel *channel = VOID2CHANNEL(b->st);
453
454     assert(metasize == channel->meta_size || metasize == 0);
455
456     struct bulk_pool_id b_poolid;
457     fill_pool_id_from_flounder(&b_poolid, &poolid);
458
459     struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
460     if (pool == NULL){
461         err = BULK_TRANSFER_POOL_INVALD;
462     } else if (pool->num_buffers < bufferid){
463         err = BULK_TRANSFER_BUFFER_INVALID;
464     } else {
465         buffer = pool->buffers[bufferid];
466         //in the untrusted case, we also received the cap for this buffer
467         if (channel->trust == BULK_TRUST_NONE){
468             //TODO: make sure there is no rw cap in transmitters cspace
469             //      the way to do this would be to check that this is a shared_readonly cap
470             err = bulk_buffer_assign_cap(buffer, cap, 0);
471         }
472
473         if (err_is_ok(err)){
474             //automatically remaps if necessary
475             err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_ONLY);
476             //TODO: keep track of copies? adjust refcount? do we let the user do that?
477         }
478     }
479
480     if (err_is_ok(err)){
481         if (channel->callbacks->copy_received) {
482             channel->callbacks->copy_received(channel, buffer, meta);
483         }
484
485         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
486                 "bulk_sm_copy_rx_call: reply sent.");
487     } else {
488         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
489                 "bulk_sm_copy_rx_call: reply to invalid copy sent");
490     }
491
492     struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
493     assert(rdata);
494     rdata->channel = channel;
495     rdata->cb      = txcont;
496     rdata->error   = err;
497     rdata->tid     = tid;
498
499
500     bulk_sm_flounder_send_fifo_msg_with_arg(channel,
501                                             bulk_sm_copy_send_reply, rdata);
502 }
503
504 void bulk_sm_copy_trusted_rx_call(
505         struct bulk_ctrl_binding *b,
506         bulk_ctrl_poolid_t       poolid,
507         uint32_t                 bufferid,
508         uint32_t                 tid,
509         uint8_t                  *meta,
510         size_t                   metasize)
511 {
512     //call normal handler with a NULL_CAP
513     bulk_sm_copy_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
514 }
515
516
517 void bulk_sm_copy_rx_response(
518         struct bulk_ctrl_binding *b,
519         bulk_ctrl_error_t        error,
520         uint32_t                 tid)
521 {
522     //find data associated with this RPC call
523     union pending_msg_data data;
524     errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
525     if (err_is_fail(err)){
526         //no such message data -> ignore?
527         DEBUG_ERR(err, "bulk_sm_copy_rx_response");
528         return;
529     }
530
531     //TODO: clean up if error is fail
532
533     //call continuation
534     bulk_continuation_call(data.copy.continuation, (errval_t) error,
535             VOID2CHANNEL(b->st));
536 }
537
538 static errval_t bulk_sm_pass_send_reply(void *a)
539 {
540     struct bulk_sm_reply_data   *rdata = a;
541     struct bulk_ctrl_binding    *b     = CHANNEL_BINDING(rdata->channel);
542
543     errval_t err = bulk_ctrl_pass_response__tx(b, rdata->cb,
544                                                rdata->error, rdata->tid);
545
546     if (err_is_ok(err)) {
547         free(rdata);
548     }
549     return err;
550 }
551
552
553 void bulk_sm_pass_rx_call(
554         struct bulk_ctrl_binding *b,
555         bulk_ctrl_poolid_t       poolid,
556         uint32_t                 bufferid,
557         uint32_t                 tid,
558         struct capref            cap,
559         uint8_t                  *meta,
560         size_t                   metasize)
561 {
562     BULK_DEBUG_PRINT("%s", "bulk_sm_pass_rx_call called\n");
563
564     errval_t err = SYS_ERR_OK;
565     struct event_closure txcont;
566     struct bulk_buffer *buffer = NULL;
567     struct bulk_channel *channel = VOID2CHANNEL(b->st);
568
569     assert(metasize == channel->meta_size || metasize == 0);
570
571     struct bulk_pool_id b_poolid;
572     fill_pool_id_from_flounder(&b_poolid, &poolid);
573
574     struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
575     if (pool == NULL){
576         err = BULK_TRANSFER_POOL_INVALD;
577     } else if (pool->num_buffers < bufferid){
578         err = BULK_TRANSFER_BUFFER_INVALID;
579     } else {
580         buffer = pool->buffers[bufferid];
581
582         //in the untrusted case, we also received the cap for this buffer
583         if (channel->trust == BULK_TRUST_NONE){
584             //make sure transmitter does not keep a copy for himself
585             err = cap_revoke(cap);
586             assert(err_is_ok(err));
587             err = bulk_buffer_assign_cap(buffer, cap, 0);
588         }
589
590         if (err_is_ok(err)){
591             //automatically remaps if necessary
592             err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
593         }
594     }
595
596     //send reply & inform user
597     if (err_is_ok(err)){
598         if (channel->callbacks->buffer_received) {
599             channel->callbacks->buffer_received(channel, buffer, meta);
600         }
601
602         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
603                 "bulk_sm_pass_rx_call: reply sent.");
604     } else {
605         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
606                 "bulk_sm_pass_rx_call: reply to invalid pass sent");
607     }
608
609     struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
610     assert(rdata);
611     rdata->channel = channel;
612     rdata->cb      = txcont;
613     rdata->error   = err;
614     rdata->tid     = tid;
615
616     bulk_sm_flounder_send_fifo_msg_with_arg(channel,
617                                             bulk_sm_pass_send_reply, rdata);
618 }
619
620 void bulk_sm_pass_trusted_rx_call(
621         struct bulk_ctrl_binding *b,
622         bulk_ctrl_poolid_t       poolid,
623         uint32_t                 bufferid,
624         uint32_t                 tid,
625         uint8_t                  *meta,
626         size_t                   metasize)
627 {
628     //call normal handler with a NULL_CAP
629     bulk_sm_pass_rx_call(b, poolid, bufferid, tid, NULL_CAP, meta, metasize);
630 }
631
632
633 void bulk_sm_pass_rx_response(
634         struct bulk_ctrl_binding *b,
635         bulk_ctrl_error_t        error,
636         uint32_t                 tid)
637 {
638     BULK_DEBUG_PRINT("bulk_sm_pass_rx_response called. TID = %u\n", tid);
639     //find data associated with this RPC call
640     union pending_msg_data data;
641     errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
642     if (err_is_fail(err)){
643         //no such message data -> ignore?
644         DEBUG_ERR(err, "bulk_sm_copy_rx_response");
645         return;
646     }
647
648     //TODO: clean up if error is fail
649
650     //call continuation
651     bulk_continuation_call(data.pass.continuation, (errval_t) error,
652             VOID2CHANNEL(b->st));
653 }
654
655 static errval_t bulk_sm_release_send_reply(void *a)
656 {
657     struct bulk_sm_reply_data   *rdata = a;
658     struct bulk_ctrl_binding    *b     = CHANNEL_BINDING(rdata->channel);
659
660     errval_t err = bulk_ctrl_release_response__tx(b, rdata->cb,
661                                                     rdata->error, rdata->tid);
662
663     if (err_is_fail(err)) {
664         free(rdata);
665     }
666     return err;
667 }
668
669
670 void bulk_sm_release_rx_call(
671         struct bulk_ctrl_binding *b,
672         bulk_ctrl_poolid_t       poolid,
673         uint32_t                 bufferid,
674         uint32_t                 tid)
675 {
676     errval_t err = SYS_ERR_OK;
677     struct event_closure txcont;
678     struct bulk_buffer *buffer = NULL;
679     struct bulk_channel *channel = VOID2CHANNEL(b->st);
680
681     struct bulk_pool_id b_poolid;
682     fill_pool_id_from_flounder(&b_poolid, &poolid);
683
684     struct bulk_pool *pool = bulk_pool_get(&b_poolid, channel);
685     if (pool == NULL){
686         err = BULK_TRANSFER_POOL_INVALD;
687     } else if (pool->num_buffers < bufferid){
688         err = BULK_TRANSFER_BUFFER_INVALID;
689     } else {
690         buffer = pool->buffers[bufferid];
691         buffer->local_ref_count--;
692         //TODO: make the decrease atomic? (so only the last decrease reclaims it)
693         //TODO: find out what the refcount should be to take action (0 or 1?)
694         if (buffer->local_ref_count == 0 && buffer->state == BULK_BUFFER_RO_OWNED){
695             //retake ownership
696             if (channel->trust == BULK_TRUST_NONE){
697                 err = cap_revoke(buffer->cap);
698                 assert(err_is_ok(err));
699             }
700             if (err_is_ok(err)){
701                 //automatically remaps if necessary
702                 err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
703             }
704         }
705         //TODO: what if refcount became 0 but we are not the owner?
706         //should we just let the user callback handle that? (we probably have to)
707     }
708
709     //send reply & inform user
710     if (err_is_ok(err)){
711         if (channel->callbacks->copy_released) {
712             channel->callbacks->copy_released(channel, buffer);
713         }
714
715         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
716                 "bulk_sm_release_rx_call: reply sent.");
717     } else {
718         txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
719                 "bulk_sm_release_rx_call: reply to invalid release sent");
720     }
721
722     struct bulk_sm_reply_data *rdata = malloc(sizeof(*rdata));
723     assert(rdata);
724     rdata->channel = channel;
725     rdata->cb      = txcont;
726     rdata->error   = err;
727     rdata->tid     = tid;
728
729     bulk_sm_flounder_send_fifo_msg_with_arg(channel,
730                                             bulk_sm_release_send_reply, rdata);
731 }
732
733
734 void bulk_sm_release_rx_response(
735         struct bulk_ctrl_binding *b,
736         bulk_ctrl_error_t        error,
737         uint32_t                 tid)
738 {
739     //find data associated with this RPC call
740     union pending_msg_data data;
741     errval_t err = pending_msg_get(VOID2CHANNEL(b->st), tid, &data, true);
742     if (err_is_fail(err)){
743         //no such message data -> ignore?
744         DEBUG_ERR(err, "bulk_sm_release_rx_response");
745         return;
746     }
747
748     //TODO: clean up if error is fail
749
750     //call continuation
751     bulk_continuation_call(data.release.continuation, (errval_t) error,
752             VOID2CHANNEL(b->st));
753 }