flounder: making const pointers in receiving handlers, using CONST_CAST as a temporar...
[barrelfish] / lib / octopus / server / service.c
1 /**
2  * \file
3  * \brief Contains handler functions for server-side octopus interface RPC call.
4  */
5
6 /*
7  * Copyright (c) 2009, 2010, 2012, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include <stdio.h>
16 #include <string.h>
17
18 #include <barrelfish/barrelfish.h>
19 #include <barrelfish/nameservice_client.h>
20 #include <skb/skb.h> // read list
21 #include <if/octopus_defs.h>
22
23 #include <octopus_server/service.h>
24 #include <octopus_server/query.h>
25 #include <octopus_server/debug.h>
26
27 #include <octopus/parser/ast.h>
28 #include <octopus/definitions.h>
29
30 #include <bench/bench.h>
31
32 #include "queue.h"
33
34 /**
35  * Name prefix used to by the functions set_with_idcap_handler() and
36  * get_with_idcap_handler() to store and retrieve records by idcap.
37  *
38  * This essentially emulates a dedicated namespace for records stored with an
39  * id cap. Octopus and the SKB do not support dedicated namespaces atm.
40  * FIXME: store records set with the function 'set_with_idcap' in a dedicated
41  *        namespace.
42  */
43 #define IDCAPID_NAME_PREFIX "idcapid."
44
45 static uint64_t current_id = 1;
46
47 static inline errval_t check_query_length(const char* query) {
48     if (strlen(query) >= MAX_QUERY_LENGTH) {
49         return OCT_ERR_QUERY_SIZE;
50     }
51
52     return SYS_ERR_OK;
53 }
54
55 errval_t new_oct_reply_state(struct oct_reply_state** drt,
56         oct_reply_handler_fn reply_handler)
57 {
58     assert(*drt == NULL);
59     *drt = malloc(sizeof(struct oct_reply_state));
60     if (*drt == NULL) {
61         return LIB_ERR_MALLOC_FAIL;
62     }
63
64     //memset(*drt, 0, sizeof(struct oct_reply_state));
65     (*drt)->query_state.std_out.buffer[0] = '\0';
66     (*drt)->query_state.std_out.length = 0;
67     (*drt)->query_state.std_err.buffer[0] = '\0';
68     (*drt)->query_state.std_err.length = 0;
69
70     (*drt)->binding = 0;
71     (*drt)->return_record = false;
72     (*drt)->error = 0;
73
74     // For set_watch()
75     (*drt)->mode = 0;
76     (*drt)->client_state = 0;
77     (*drt)->client_handler = 0;
78     (*drt)->server_id = 0;
79
80     (*drt)->reply = reply_handler;
81     (*drt)->next = NULL;
82
83     return SYS_ERR_OK;
84 }
85
86 static void free_oct_reply_state(void* arg)
87 {
88     if (arg != NULL) {
89         struct oct_reply_state* drt = (struct oct_reply_state*) arg;
90         // In case we have to free things in oct_reply_state, free here...
91
92         free(drt);
93     } else {
94         assert(!"free_reply_state with NULL argument?");
95     }
96 }
97
98 static void trigger_send_handler(struct octopus_binding* b,
99         struct oct_reply_state* drs)
100 {
101     char* record = drs->query_state.std_out.buffer[0] != '\0' ?
102             drs->query_state.std_out.buffer : NULL;
103
104     errval_t err;
105     err = b->tx_vtbl.trigger(b, MKCONT(free_oct_reply_state, drs),
106             drs->server_id,
107             drs->client_handler,
108             drs->mode,
109             record,
110             drs->client_state);
111     if (err_is_fail(err)) {
112         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
113             oct_rpc_enqueue_reply(b, drs);
114             return;
115         }
116         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
117     }
118 }
119
120 static inline bool can_install_trigger(octopus_trigger_t trigger, errval_t error)
121 {
122     OCT_DEBUG("%s:%s:%d: trigger.m > 0 = %d\n",
123               __FILE__, __FUNCTION__, __LINE__, trigger.m > 0);
124     OCT_DEBUG("%s:%s:%d: trigger.in_case == err_no(error) = %d\n",
125            __FILE__, __FUNCTION__, __LINE__, trigger.in_case == err_no(error));
126
127     return trigger.m > 0 &&
128            (trigger.in_case == err_no(error) ||
129            (trigger.m & OCT_ALWAYS_SET) != 0 );
130 }
131
132 static inline uint64_t install_trigger(struct octopus_binding* binding,
133         struct ast_object* ast, octopus_trigger_t trigger, errval_t error)
134 {
135     errval_t err;
136     uint64_t watch_id = 0;
137
138     if (can_install_trigger(trigger, error)) {
139         struct oct_reply_state* trigger_reply = NULL;
140         err = new_oct_reply_state(&trigger_reply, trigger_send_handler);
141         assert(err_is_ok(err));
142
143         trigger_reply->client_handler = trigger.trigger;
144         trigger_reply->client_state = trigger.st;
145
146         trigger_reply->binding = (trigger.send_to == octopus_BINDING_RPC) ?
147                 binding : get_event_binding(binding);
148         if (trigger_reply->binding == NULL) {
149             fprintf(stderr, "No event binding for trigger, send events "
150                             "over regular binding.");
151             trigger_reply->binding = binding;
152         }
153
154         err = set_watch(binding, ast, trigger.m, trigger_reply, &watch_id);
155         assert(err_is_ok(err));
156     }
157
158     return watch_id;
159 }
160
161 static void remove_trigger_reply(struct octopus_binding* b,
162         struct oct_reply_state* drs)
163 {
164     errval_t err;
165     err = b->tx_vtbl.remove_trigger_response(b,
166             MKCONT(free_oct_reply_state, drs),
167             drs->error);
168     if (err_is_fail(err)) {
169         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
170             oct_rpc_enqueue_reply(b, drs);
171             return;
172         }
173         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
174     }
175 }
176
177 void remove_trigger_handler(struct octopus_binding *b, octopus_trigger_id_t tid)
178 {
179     struct oct_reply_state* drs = NULL;
180     errval_t err = new_oct_reply_state(&drs, remove_trigger_reply);
181     assert(err_is_ok(err));
182
183     drs->error = del_watch(b, tid, &drs->query_state);
184     drs->reply(b, drs);
185 }
186
187 /*static inline void arrival_rate(void)
188 {
189     static cycles_t measure_time = 10000;
190     static uint64_t arrivals = 0;
191     static cycles_t start = 0;
192     arrivals++;
193     if ( (arrivals % 100) == 0 && bench_tsc_to_ms(bench_tsc() - start) > measure_time) {
194         printf("Get Rate per sec: %lu\n", arrivals / (measure_time / 1000));
195         start = bench_tsc();
196         arrivals = 0;
197     }
198 }*/
199
200 static void get_reply(struct octopus_binding* b, struct oct_reply_state* drt)
201 {
202     errval_t err;
203     char* reply = err_is_ok(drt->error) ?
204             drt->query_state.std_out.buffer : NULL;
205     err = b->tx_vtbl.get_response(b, MKCONT(free_oct_reply_state, drt),
206             reply, drt->server_id, drt->error);
207     if (err_is_fail(err)) {
208         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
209             oct_rpc_enqueue_reply(b, drt);
210             return;
211         }
212         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
213     }
214 }
215
216 void get_handler(struct octopus_binding *b, const char *query,
217                  octopus_trigger_t trigger)
218 {
219     errval_t err = SYS_ERR_OK;
220
221     struct oct_reply_state* drs = NULL;
222     struct ast_object* ast = NULL;
223     err = new_oct_reply_state(&drs, get_reply);
224     assert(err_is_ok(err));
225
226     err = check_query_length(query);
227     if (err_is_fail(err)) {
228         goto out;
229     }
230
231     err = generate_ast(query, &ast);
232     if (err_is_ok(err)) {
233         err = get_record(ast, &drs->query_state);
234         drs->server_id = install_trigger(b, ast, trigger, err);
235     }
236
237 out:
238     drs->error = err;
239     drs->reply(b, drs);
240
241     free_ast(ast);
242 }
243
244 static void get_names_reply(struct octopus_binding* b,
245         struct oct_reply_state* drt)
246 {
247     errval_t err;
248     char* reply = err_is_ok(drt->error) ?
249             drt->query_state.std_out.buffer : NULL;
250     err = b->tx_vtbl.get_names_response(b, MKCONT(free_oct_reply_state, drt),
251             reply, drt->server_id, drt->error);
252     if (err_is_fail(err)) {
253         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
254             oct_rpc_enqueue_reply(b, drt);
255             return;
256         }
257         if (err_no(err) == FLOUNDER_ERR_TX_MSG_SIZE) {
258             debug_printf("max msg size: %u, reply size: %zu\n",
259                     octopus__get_names_response_output_MAX_ARGUMENT_SIZE,
260                     drt->query_state.std_out.length);
261         }
262         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
263     }
264 }
265
266 void get_names_handler(struct octopus_binding *b, const char *query,
267                        octopus_trigger_t t)
268 {
269     OCT_DEBUG(" get_names_handler: %s\n", query);
270
271     errval_t err = SYS_ERR_OK;
272
273     struct oct_reply_state* drs = NULL;
274     struct ast_object* ast = NULL;
275
276     err = new_oct_reply_state(&drs, get_names_reply);
277     assert(err_is_ok(err));
278
279     err = check_query_length(query);
280     if (err_is_fail(err)) {
281         goto out;
282     }
283
284     err = generate_ast(query, &ast);
285     if (err_is_ok(err)) {
286         err = get_record_names(ast, &drs->query_state);
287         drs->server_id = install_trigger(b, ast, t, err);
288     }
289
290 out:
291     drs->error = err;
292     drs->reply(b, drs);
293
294     free_ast(ast);
295 }
296
297 static void set_reply(struct octopus_binding* b, struct oct_reply_state* drs)
298 {
299     char* record = err_is_ok(drs->error) && drs->return_record ?
300             drs->query_state.std_out.buffer : NULL;
301
302     errval_t err;
303     err = b->tx_vtbl.set_response(b, MKCONT(free_oct_reply_state, drs), record,
304             drs->server_id, drs->error);
305     if (err_is_fail(err)) {
306         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
307             oct_rpc_enqueue_reply(b, drs);
308             return;
309         }
310         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
311     }
312 }
313
314 void set_handler(struct octopus_binding *b, const char *query, uint64_t mode,
315         octopus_trigger_t trigger, bool get)
316 {
317     OCT_DEBUG(" set_handler: %s\n", query);
318     errval_t err = SYS_ERR_OK;
319
320     struct oct_reply_state* drs = NULL;
321     struct ast_object* ast = NULL;
322
323     err = new_oct_reply_state(&drs, set_reply);
324     assert(err_is_ok(err));
325
326     err = check_query_length(query);
327     if (err_is_fail(err)) {
328         goto out;
329     }
330
331     err = generate_ast(query, &ast);
332     if (err_is_ok(err)) {
333         if (ast->u.on.name->type == nodeType_Ident) {
334             err = set_record(ast, mode, &drs->query_state);
335             drs->server_id = install_trigger(b, ast, trigger, err);
336         }
337         else {
338             // Since we don't have any ACLs atm. we do not
339             // allow name to be a regex/variable, because
340             // we it's not guaranteed which records get
341             // modified in this case.
342             err = OCT_ERR_NO_RECORD_NAME;
343         }
344     }
345
346 out:
347     drs->error = err;
348     drs->return_record = get;
349     drs->reply(b, drs);
350
351     free_ast(ast);
352 }
353
354 static errval_t build_query_with_idcap(char **query_p, struct capref idcap,
355                                        const char *attributes)
356 {
357     errval_t err;
358     idcap_id_t id = 0;
359     size_t query_size, bytes_written;
360
361     // retrieve id from idcap
362     err = invoke_idcap_identify(idcap, &id);
363     if (err_is_fail(err)) {
364         return err_push(err, OCT_ERR_IDCAP_INVOKE);
365     }
366
367     err = cap_delete(idcap);
368     assert(err_is_ok(err));
369
370     if (attributes == NULL) {
371         attributes = "";
372     }
373
374     // build query using the idcapid and the attributes
375     query_size = snprintf(NULL, 0, IDCAPID_NAME_PREFIX "%" PRIxIDCAPID "%s", id,
376                           attributes);
377     *query_p = (char *) malloc(query_size + 1); // include \0
378     if (*query_p == NULL) {
379         return LIB_ERR_MALLOC_FAIL;
380     }
381     bytes_written = snprintf(*query_p, query_size + 1, IDCAPID_NAME_PREFIX
382                              "%" PRIxIDCAPID "%s", id, attributes);
383
384     return SYS_ERR_OK;
385 }
386
387 static void get_with_idcap_reply(struct octopus_binding *b,
388                                  struct oct_reply_state *drt)
389 {
390     errval_t err;
391     char *reply = err_is_ok(drt->error) ?
392                   drt->query_state.std_out.buffer : NULL;
393     err = b->tx_vtbl.get_with_idcap_response(b,
394                                              MKCONT(free_oct_reply_state, drt),
395                                              reply, drt->server_id, drt->error);
396     if (err_is_fail(err)) {
397         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
398             oct_rpc_enqueue_reply(b, drt);
399             return;
400         }
401         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
402     }
403 }
404
405 void get_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
406                             octopus_trigger_t trigger)
407 {
408     errval_t err;
409     char *query = NULL;
410     struct oct_reply_state *drs = NULL;
411     struct ast_object *ast = NULL;
412
413     OCT_DEBUG("get_with_idcap_handler: %s\n", query);
414
415     err = new_oct_reply_state(&drs, get_with_idcap_reply);
416     assert(err_is_ok(err));
417
418     err = build_query_with_idcap(&query, idcap, "");
419     if (err_is_fail(err)) {
420         goto out;
421     }
422
423     err = check_query_length(query);
424     if (err_is_fail(err)) {
425         goto out;
426     }
427
428     err = generate_ast(query, &ast);
429     if (err_is_ok(err)) {
430         err = get_record(ast, &drs->query_state);
431         drs->server_id = install_trigger(b, ast, trigger, err);
432     }
433
434 out:
435     drs->error = err;
436     drs->reply(b, drs);
437
438     free_ast(ast);
439     if (query != NULL) {
440         free(query);
441     }
442 }
443
444 static void set_with_idcap_reply(struct octopus_binding *b,
445                                  struct oct_reply_state *drs)
446 {
447     char *record = err_is_ok(drs->error) && drs->return_record ?
448             drs->query_state.std_out.buffer : NULL;
449
450     errval_t err;
451     err = b->tx_vtbl.set_with_idcap_response(b,
452                                              MKCONT(free_oct_reply_state, drs),
453                                              record, drs->server_id,
454                                              drs->error);
455     if (err_is_fail(err)) {
456         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
457             oct_rpc_enqueue_reply(b, drs);
458             return;
459         }
460         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
461     }
462 }
463
464 void set_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
465                             const char *attributes, uint64_t mode,
466                             octopus_trigger_t trigger, bool get)
467 {
468     errval_t err;
469     char *query = NULL;
470     struct oct_reply_state *drs = NULL;
471     struct ast_object *ast = NULL;
472
473     err = new_oct_reply_state(&drs, set_with_idcap_reply);
474     assert(err_is_ok(err));
475
476     err = build_query_with_idcap(&query, idcap, attributes);
477     if (err_is_fail(err)) {
478         goto out;
479     }
480     OCT_DEBUG(" set_with_idcap_handler: %s\n", query);
481
482     err = check_query_length(query);
483     if (err_is_fail(err)) {
484         goto out;
485     }
486
487     err = generate_ast(query, &ast);
488     if (err_is_ok(err)) {
489         if (ast->u.on.name->type == nodeType_Ident) {
490             err = set_record(ast, mode, &drs->query_state);
491             drs->server_id = install_trigger(b, ast, trigger, err);
492         } else {
493             err = OCT_ERR_NO_RECORD_NAME;
494         }
495     }
496
497 out:
498     drs->error = err;
499     drs->return_record = get;
500     drs->reply(b, drs);
501
502     free_ast(ast);
503     if (query != NULL) {
504         free(query);
505     }
506
507 }
508
509 static void del_reply(struct octopus_binding* b, struct oct_reply_state* drs)
510 {
511     errval_t err;
512     err = b->tx_vtbl.del_response(b, MKCONT(free_oct_reply_state, drs),
513             drs->server_id, drs->error);
514     if (err_is_fail(err)) {
515         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
516             oct_rpc_enqueue_reply(b, drs);
517             return;
518         }
519         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
520     }
521 }
522
523 void del_handler(struct octopus_binding* b, const char* query,
524                  octopus_trigger_t trigger)
525 {
526     OCT_DEBUG(" del_handler: %s\n", query);
527     errval_t err = SYS_ERR_OK;
528
529     struct oct_reply_state* drs = NULL;
530     struct ast_object* ast = NULL;
531
532     err = new_oct_reply_state(&drs, del_reply);
533     assert(err_is_ok(err));
534
535     err = check_query_length(query);
536     if (err_is_fail(err)) {
537         goto out;
538     }
539
540     err = generate_ast(query, &ast);
541     if (err_is_ok(err)) {
542         if (ast->u.on.name->type == nodeType_Ident) {
543             err = del_record(ast, &drs->query_state);
544             drs->server_id = install_trigger(b, ast, trigger, err);
545         }
546         else {
547             // Since we don't have any ACLs atm. we do not
548             // allow name to be a regex/variable
549             // (see set_handler).
550             err = OCT_ERR_NO_RECORD_NAME;
551         }
552     }
553
554 out:
555     drs->error = err;
556     drs->reply(b, drs);
557
558     free_ast(ast);
559 }
560
561 static void exists_reply(struct octopus_binding* b, struct oct_reply_state* drs)
562 {
563     errval_t err;
564     err = b->tx_vtbl.exists_response(b, MKCONT(free_oct_reply_state, drs),
565             drs->server_id, drs->error);
566
567     if (err_is_fail(err)) {
568         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
569             oct_rpc_enqueue_reply(b, drs);
570             return;
571         }
572         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
573     }
574 }
575
576 void exists_handler(struct octopus_binding* b, const char* query,
577         octopus_trigger_t trigger)
578 {
579     errval_t err = SYS_ERR_OK;
580
581     struct oct_reply_state* drs = NULL;
582     struct ast_object* ast = NULL;
583
584     err = new_oct_reply_state(&drs, exists_reply);
585     assert(err_is_ok(err));
586
587     err = check_query_length(query);
588     if (err_is_fail(err)) {
589         goto out;
590     }
591
592     err = generate_ast(query, &ast);
593     if (err_is_ok(err)) {
594         err = get_record(ast, &drs->query_state);
595         drs->server_id = install_trigger(b, ast, trigger, err);
596     }
597
598 out:
599     drs->error = err;
600     drs->reply(b, drs);
601
602     free_ast(ast);
603 }
604
605 static void wait_for_reply(struct octopus_binding* b, struct oct_reply_state* drs)
606 {
607     errval_t err;
608     err = b->tx_vtbl.wait_for_response(b, MKCONT(free_oct_reply_state, drs),
609             drs->query_state.std_out.buffer, drs->error);
610
611     if (err_is_fail(err)) {
612         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
613             oct_rpc_enqueue_reply(b, drs);
614             return;
615         }
616         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
617     }
618 }
619
620 // XXX: For compatibility reasons with nameserver API
621 void wait_for_handler(struct octopus_binding* b, const char* query) {
622     errval_t err = SYS_ERR_OK;
623     errval_t set_watch_err = SYS_ERR_OK;
624
625     struct oct_reply_state* drs = NULL;
626     struct ast_object* ast = NULL;
627
628     err = new_oct_reply_state(&drs, wait_for_reply);
629     drs->binding = b;
630     assert(err_is_ok(err));
631
632     err = check_query_length(query);
633     if (err_is_fail(err)) {
634         goto out;
635     }
636
637     err = generate_ast(query, &ast);
638     if (err_is_ok(err)) {
639         err = get_record(ast, &drs->query_state);
640         if (err_no(err) == OCT_ERR_NO_RECORD) {
641             uint64_t wid;
642             set_watch_err = set_watch(b, ast, OCT_ON_SET, drs, &wid);
643         }
644     }
645
646 out:
647     if (err_no(err) != OCT_ERR_NO_RECORD || err_is_fail(set_watch_err)) {
648         drs->error = err;
649         if (err_is_fail(set_watch_err)) {
650             // implies err = OCT_ERR_NO_RECORD
651             drs->error = set_watch_err;
652         }
653         drs->reply(b, drs);
654     }
655
656     free_ast(ast);
657 }
658
659 static void subscribe_reply(struct octopus_binding* b,
660         struct oct_reply_state* drs)
661 {
662     errval_t err;
663     err = b->tx_vtbl.subscribe_response(b, MKCONT(free_oct_reply_state, drs),
664             drs->server_id, drs->error);
665
666     if (err_is_fail(err)) {
667         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
668             oct_rpc_enqueue_reply(b, drs);
669             return;
670         }
671         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
672     }
673 }
674
675 void subscribe_handler(struct octopus_binding *b, const char* query,
676         uint64_t trigger_fn, uint64_t state)
677 {
678     OCT_DEBUG("subscribe: query = %s\n", query);
679     errval_t err = SYS_ERR_OK;
680
681     struct oct_reply_state* drs = NULL;
682     struct ast_object* ast = NULL;
683
684     err = new_oct_reply_state(&drs, subscribe_reply);
685     assert(err_is_ok(err));
686
687     err = check_query_length(query);
688     if (err_is_fail(err)) {
689         goto out;
690     }
691
692     err = generate_ast(query, &ast);
693     if (err_is_ok(err)) {
694         err = add_subscription(b, ast, trigger_fn, state, drs);
695     }
696
697 out:
698     drs->error = err;
699     drs->reply(b, drs);
700
701     free_ast(ast);
702 }
703
704 static void unsubscribe_reply(struct octopus_binding* b,
705         struct oct_reply_state* drs)
706 {
707     errval_t err;
708     err = b->tx_vtbl.unsubscribe_response(b, MKCONT(free_oct_reply_state, drs),
709             drs->error);
710     if (err_is_fail(err)) {
711         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
712             oct_rpc_enqueue_reply(b, drs);
713             return;
714         }
715         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
716     }
717 }
718
719 static void send_subscribed_message(struct octopus_binding* b, struct oct_reply_state* drs)
720 {
721     errval_t err = SYS_ERR_OK;
722     char* record = drs->query_state.std_out.buffer[0] != '\0' ?
723             drs->query_state.std_out.buffer : NULL;
724
725     err = b->tx_vtbl.subscription(b, MKCONT(free_oct_reply_state, drs),
726             drs->server_id, drs->client_handler,
727             drs->mode, record, drs->client_state);
728     if (err_is_fail(err)) {
729         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
730             oct_rpc_enqueue_reply(b, drs);
731             return;
732         }
733         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
734     }
735
736 }
737
738 void unsubscribe_handler(struct octopus_binding *b, uint64_t id)
739 {
740     errval_t err = SYS_ERR_OK;
741
742     OCT_DEBUG("unsubscribe: id = %"PRIu64"\n", id);
743
744     struct oct_reply_state* srs = NULL;
745     err = new_oct_reply_state(&srs, unsubscribe_reply);
746     assert(err_is_ok(err));
747
748     err = del_subscription(b, id, &srs->query_state);
749     if (err_is_ok(err)) {
750         uint64_t binding;
751         uint64_t client_handler;
752         uint64_t client_state;
753         uint64_t server_id;
754
755         skb_read_output_at(srs->query_state.std_out.buffer,
756                 "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
757                 &binding, &client_handler, &client_state, &server_id);
758
759         struct oct_reply_state* subscriber = NULL;
760         err = new_oct_reply_state(&subscriber,
761                 send_subscribed_message);
762         assert(err_is_ok(err));
763
764 #if defined(__i386__) || defined(__arm__)
765         subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
766 #else
767         subscriber->binding = (struct octopus_binding*)binding;
768 #endif
769         subscriber->client_handler = client_handler;
770         subscriber->client_state = client_state;
771         subscriber->server_id = server_id;
772         subscriber->mode = OCT_REMOVED;
773
774         OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
775         subscriber->reply(subscriber->binding, subscriber);
776     }
777
778     srs->error = err;
779     srs->reply(b, srs);
780 }
781
782 static void publish_reply(struct octopus_binding* b, struct oct_reply_state* drs)
783 {
784     errval_t err;
785     err = b->tx_vtbl.publish_response(b, MKCONT(free_oct_reply_state, drs),
786             drs->error);
787     if (err_is_fail(err)) {
788         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
789             oct_rpc_enqueue_reply(b, drs);
790             return;
791         }
792         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
793     }
794 }
795
796 void publish_handler(struct octopus_binding *b, const char* record)
797 {
798     OCT_DEBUG("publish_handler query: %s\n", record);
799     errval_t err = SYS_ERR_OK;
800
801     struct oct_reply_state* drs = NULL;
802     err = new_oct_reply_state(&drs, publish_reply);
803     assert(err_is_ok(err));
804
805     err = check_query_length(record);
806     if (err_is_fail(err)) {
807         drs->error = err;
808         drs->reply(b, drs);
809         goto out1;
810     }
811
812     struct ast_object* ast = NULL;
813     err = generate_ast(record, &ast);
814     if (err_is_fail(err)) {
815         drs->error = err;
816         drs->reply(b, drs);
817         goto out2;
818     }
819
820
821     if (err_is_ok(err)) {
822         err = find_subscribers(ast, &drs->query_state);
823         if (err_is_ok(err)) {
824             // Reply to publisher
825             drs->error = err;
826             drs->reply(b, drs);
827
828
829             struct list_parser_status status;
830             skb_read_list_init_offset(&status, drs->query_state.std_out.buffer, 0);
831
832             // TODO remove skb list parser dependency
833             // Send to all subscribers
834             uint64_t binding;
835             uint64_t client_handler;
836             uint64_t client_state;
837             uint64_t server_id;
838
839             while (skb_read_list(&status, "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
840                     &binding, &client_handler, &client_state, &server_id)) {
841
842                 struct oct_reply_state* subscriber = NULL;
843                 err = new_oct_reply_state(&subscriber,
844                         send_subscribed_message);
845                 assert(err_is_ok(err));
846 #if defined(__i386__) || defined(__arm__)
847                 subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
848 #else
849                 subscriber->binding = (struct octopus_binding*)binding;
850 #endif
851                 subscriber->client_handler = client_handler;
852                 strcpy(subscriber->query_state.std_out.buffer, record);
853                 subscriber->client_state = client_state;
854                 subscriber->server_id = server_id;
855                 subscriber->mode = OCT_ON_PUBLISH;
856
857                 OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
858                 subscriber->reply(subscriber->binding, subscriber);
859             }
860         }
861     }
862
863 out2:
864     free_ast(ast);
865 out1:
866     return;
867 }
868
869 void get_identifier(struct octopus_binding* b)
870 {
871     errval_t err = b->tx_vtbl.get_identifier_response(b, NOP_CONT,
872             current_id++);
873     assert(err_is_ok(err));
874 }
875
876 static void identify_binding_reply(struct octopus_binding* b,
877         struct oct_reply_state* drs)
878 {
879     errval_t err;
880     // TODO send drs->error back to client!
881     err = b->tx_vtbl.identify_response(b, MKCONT(free_oct_reply_state, drs));
882     if (err_is_fail(err)) {
883         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
884             oct_rpc_enqueue_reply(b, drs);
885             return;
886         }
887         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
888     }
889
890 }
891
892 void identify_binding(struct octopus_binding* b, uint64_t id,
893         octopus_binding_type_t type)
894 {
895     assert(id <= current_id);
896
897     struct oct_reply_state* drs = NULL;
898     errval_t err = new_oct_reply_state(&drs, identify_binding_reply);
899     assert(err_is_ok(err));
900
901     OCT_DEBUG("set binding: id=%"PRIu64" type=%d\n", id, type);
902     drs->error = set_binding(type, id, b);
903     drs->reply(b, drs);
904 }