d9d09e7ac624129fff196c2f0348a5159956fe5e
[barrelfish] / lib / octopus / server / service.c
1 /**
2  * \file
3  * \brief Contains handler functions for server-side octopus interface RPC call.
4  */
5
6 /*
7  * Copyright (c) 2009, 2010, 2012, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include <stdio.h>
16 #include <string.h>
17
18 #include <barrelfish/barrelfish.h>
19 #include <barrelfish/nameservice_client.h>
20 #include <skb/skb.h> // read list
21 #include <if/octopus_defs.h>
22
23 #include <octopus_server/service.h>
24 #include <octopus_server/query.h>
25 #include <octopus_server/debug.h>
26
27 #include <octopus/parser/ast.h>
28 #include <octopus/definitions.h>
29
30 #include <bench/bench.h>
31
32 #include "queue.h"
33
34 /**
35  * Name prefix used to by the functions set_with_idcap_handler() and
36  * get_with_idcap_handler() to store and retrieve records by idcap.
37  *
38  * This essentially emulates a dedicated namespace for records stored with an
39  * id cap. Octopus and the SKB do not support dedicated namespaces atm.
40  * FIXME: store records set with the function 'set_with_idcap' in a dedicated
41  *        namespace.
42  */
43 #define IDCAPID_NAME_PREFIX "idcapid."
44
45 static uint64_t current_id = 1;
46
47 static inline errval_t check_query_length(char* query) {
48     if (strlen(query) >= MAX_QUERY_LENGTH) {
49         return OCT_ERR_QUERY_SIZE;
50     }
51
52     return SYS_ERR_OK;
53 }
54
55 errval_t new_oct_reply_state(struct oct_reply_state** drt,
56         oct_reply_handler_fn reply_handler)
57 {
58     assert(*drt == NULL);
59     *drt = malloc(sizeof(struct oct_reply_state));
60     if (*drt == NULL) {
61         return LIB_ERR_MALLOC_FAIL;
62     }
63
64     //memset(*drt, 0, sizeof(struct oct_reply_state));
65     (*drt)->query_state.std_out.buffer[0] = '\0';
66     (*drt)->query_state.std_out.length = 0;
67     (*drt)->query_state.std_err.buffer[0] = '\0';
68     (*drt)->query_state.std_err.length = 0;
69
70     (*drt)->binding = 0;
71     (*drt)->return_record = false;
72     (*drt)->error = 0;
73
74     // For set_watch()
75     (*drt)->mode = 0;
76     (*drt)->client_state = 0;
77     (*drt)->client_handler = 0;
78     (*drt)->server_id = 0;
79
80     (*drt)->reply = reply_handler;
81     (*drt)->next = NULL;
82
83     return SYS_ERR_OK;
84 }
85
86 static void free_oct_reply_state(void* arg)
87 {
88     if (arg != NULL) {
89         struct oct_reply_state* drt = (struct oct_reply_state*) arg;
90         // In case we have to free things in oct_reply_state, free here...
91
92         free(drt);
93     } else {
94         assert(!"free_reply_state with NULL argument?");
95     }
96 }
97
98 static void trigger_send_handler(struct octopus_binding* b,
99         struct oct_reply_state* drs)
100 {
101     char* record = drs->query_state.std_out.buffer[0] != '\0' ?
102             drs->query_state.std_out.buffer : NULL;
103
104     errval_t err;
105     err = b->tx_vtbl.trigger(b, MKCONT(free_oct_reply_state, drs),
106             drs->server_id,
107             drs->client_handler,
108             drs->mode,
109             record,
110             drs->client_state);
111     if (err_is_fail(err)) {
112         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
113             oct_rpc_enqueue_reply(b, drs);
114             return;
115         }
116         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
117     }
118 }
119
120 static inline bool can_install_trigger(octopus_trigger_t trigger, errval_t error)
121 {
122     OCT_DEBUG("%s:%s:%d: trigger.m > 0 = %d\n",
123               __FILE__, __FUNCTION__, __LINE__, trigger.m > 0);
124     OCT_DEBUG("%s:%s:%d: trigger.in_case == err_no(error) = %d\n",
125            __FILE__, __FUNCTION__, __LINE__, trigger.in_case == err_no(error));
126
127     return trigger.m > 0 &&
128            (trigger.in_case == err_no(error) ||
129            (trigger.m & OCT_ALWAYS_SET) != 0 );
130 }
131
132 static inline uint64_t install_trigger(struct octopus_binding* binding,
133         struct ast_object* ast, octopus_trigger_t trigger, errval_t error)
134 {
135     errval_t err;
136     uint64_t watch_id = 0;
137
138     if (can_install_trigger(trigger, error)) {
139         struct oct_reply_state* trigger_reply = NULL;
140         err = new_oct_reply_state(&trigger_reply, trigger_send_handler);
141         assert(err_is_ok(err));
142
143         trigger_reply->client_handler = trigger.trigger;
144         trigger_reply->client_state = trigger.st;
145
146         trigger_reply->binding = (trigger.send_to == octopus_BINDING_RPC) ?
147                 binding : get_event_binding(binding);
148         if (trigger_reply->binding == NULL) {
149             fprintf(stderr, "No event binding for trigger, send events "
150                             "over regular binding.");
151             trigger_reply->binding = binding;
152         }
153
154         err = set_watch(binding, ast, trigger.m, trigger_reply, &watch_id);
155         assert(err_is_ok(err));
156     }
157
158     return watch_id;
159 }
160
161 static void remove_trigger_reply(struct octopus_binding* b,
162         struct oct_reply_state* drs)
163 {
164     errval_t err;
165     err = b->tx_vtbl.remove_trigger_response(b,
166             MKCONT(free_oct_reply_state, drs),
167             drs->error);
168     if (err_is_fail(err)) {
169         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
170             oct_rpc_enqueue_reply(b, drs);
171             return;
172         }
173         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
174     }
175 }
176
177 void remove_trigger_handler(struct octopus_binding *b, octopus_trigger_id_t tid)
178 {
179     struct oct_reply_state* drs = NULL;
180     errval_t err = new_oct_reply_state(&drs, remove_trigger_reply);
181     assert(err_is_ok(err));
182
183     drs->error = del_watch(b, tid, &drs->query_state);
184     drs->reply(b, drs);
185 }
186
187 /*static inline void arrival_rate(void)
188 {
189     static cycles_t measure_time = 10000;
190     static uint64_t arrivals = 0;
191     static cycles_t start = 0;
192     arrivals++;
193     if ( (arrivals % 100) == 0 && bench_tsc_to_ms(bench_tsc() - start) > measure_time) {
194         printf("Get Rate per sec: %lu\n", arrivals / (measure_time / 1000));
195         start = bench_tsc();
196         arrivals = 0;
197     }
198 }*/
199
200 static void get_reply(struct octopus_binding* b, struct oct_reply_state* drt)
201 {
202     errval_t err;
203     char* reply = err_is_ok(drt->error) ?
204             drt->query_state.std_out.buffer : NULL;
205     err = b->tx_vtbl.get_response(b, MKCONT(free_oct_reply_state, drt),
206             reply, drt->server_id, drt->error);
207     if (err_is_fail(err)) {
208         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
209             oct_rpc_enqueue_reply(b, drt);
210             return;
211         }
212         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
213     }
214 }
215
216 void get_handler(struct octopus_binding *b, char *query, octopus_trigger_t trigger)
217 {
218     errval_t err = SYS_ERR_OK;
219
220     struct oct_reply_state* drs = NULL;
221     struct ast_object* ast = NULL;
222     err = new_oct_reply_state(&drs, get_reply);
223     assert(err_is_ok(err));
224
225     err = check_query_length(query);
226     if (err_is_fail(err)) {
227         goto out;
228     }
229
230     err = generate_ast(query, &ast);
231     if (err_is_ok(err)) {
232         err = get_record(ast, &drs->query_state);
233         drs->server_id = install_trigger(b, ast, trigger, err);
234     }
235
236 out:
237     drs->error = err;
238     drs->reply(b, drs);
239
240     free_ast(ast);
241 }
242
243 static void get_names_reply(struct octopus_binding* b,
244         struct oct_reply_state* drt)
245 {
246     errval_t err;
247     char* reply = err_is_ok(drt->error) ?
248             drt->query_state.std_out.buffer : NULL;
249     err = b->tx_vtbl.get_names_response(b, MKCONT(free_oct_reply_state, drt),
250             reply, drt->server_id, drt->error);
251     if (err_is_fail(err)) {
252         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
253             oct_rpc_enqueue_reply(b, drt);
254             return;
255         }
256         if (err_no(err) == FLOUNDER_ERR_TX_MSG_SIZE) {
257             debug_printf("max msg size: %u, reply size: %zu\n",
258                     octopus__get_names_response_output_MAX_ARGUMENT_SIZE,
259                     drt->query_state.std_out.length);
260         }
261         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
262     }
263 }
264
265 void get_names_handler(struct octopus_binding *b, char *query, octopus_trigger_t t)
266 {
267     OCT_DEBUG(" get_names_handler: %s\n", query);
268
269     errval_t err = SYS_ERR_OK;
270
271     struct oct_reply_state* drs = NULL;
272     struct ast_object* ast = NULL;
273
274     err = new_oct_reply_state(&drs, get_names_reply);
275     assert(err_is_ok(err));
276
277     err = check_query_length(query);
278     if (err_is_fail(err)) {
279         goto out;
280     }
281
282     err = generate_ast(query, &ast);
283     if (err_is_ok(err)) {
284         err = get_record_names(ast, &drs->query_state);
285         drs->server_id = install_trigger(b, ast, t, err);
286     }
287
288 out:
289     drs->error = err;
290     drs->reply(b, drs);
291
292     free_ast(ast);
293 }
294
295 static void set_reply(struct octopus_binding* b, struct oct_reply_state* drs)
296 {
297     char* record = err_is_ok(drs->error) && drs->return_record ?
298             drs->query_state.std_out.buffer : NULL;
299
300     errval_t err;
301     err = b->tx_vtbl.set_response(b, MKCONT(free_oct_reply_state, drs), record,
302             drs->server_id, drs->error);
303     if (err_is_fail(err)) {
304         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
305             oct_rpc_enqueue_reply(b, drs);
306             return;
307         }
308         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
309     }
310 }
311
312 void set_handler(struct octopus_binding *b, char *query, uint64_t mode,
313         octopus_trigger_t trigger, bool get)
314 {
315     OCT_DEBUG(" set_handler: %s\n", query);
316     errval_t err = SYS_ERR_OK;
317
318     struct oct_reply_state* drs = NULL;
319     struct ast_object* ast = NULL;
320
321     err = new_oct_reply_state(&drs, set_reply);
322     assert(err_is_ok(err));
323
324     err = check_query_length(query);
325     if (err_is_fail(err)) {
326         goto out;
327     }
328
329     err = generate_ast(query, &ast);
330     if (err_is_ok(err)) {
331         if (ast->u.on.name->type == nodeType_Ident) {
332             err = set_record(ast, mode, &drs->query_state);
333             drs->server_id = install_trigger(b, ast, trigger, err);
334         }
335         else {
336             // Since we don't have any ACLs atm. we do not
337             // allow name to be a regex/variable, because
338             // we it's not guaranteed which records get
339             // modified in this case.
340             err = OCT_ERR_NO_RECORD_NAME;
341         }
342     }
343
344 out:
345     drs->error = err;
346     drs->return_record = get;
347     drs->reply(b, drs);
348
349     free_ast(ast);
350 }
351
352 static errval_t build_query_with_idcap(char **query_p, struct capref idcap,
353                                        char *attributes)
354 {
355     errval_t err;
356     idcap_id_t id = 0;
357     size_t query_size, bytes_written;
358
359     // retrieve id from idcap
360     err = invoke_idcap_identify(idcap, &id);
361     if (err_is_fail(err)) {
362         return err_push(err, OCT_ERR_IDCAP_INVOKE);
363     }
364
365     err = cap_delete(idcap);
366     assert(err_is_ok(err));
367
368     if (attributes == NULL) {
369         attributes = "";
370     }
371
372     // build query using the idcapid and the attributes
373     query_size = snprintf(NULL, 0, IDCAPID_NAME_PREFIX "%" PRIxIDCAPID "%s", id,
374                           attributes);
375     *query_p = (char *) malloc(query_size + 1); // include \0
376     if (*query_p == NULL) {
377         return LIB_ERR_MALLOC_FAIL;
378     }
379     bytes_written = snprintf(*query_p, query_size + 1, IDCAPID_NAME_PREFIX
380                              "%" PRIxIDCAPID "%s", id, attributes);
381
382     return SYS_ERR_OK;
383 }
384
385 static void get_with_idcap_reply(struct octopus_binding *b,
386                                  struct oct_reply_state *drt)
387 {
388     errval_t err;
389     char *reply = err_is_ok(drt->error) ?
390                   drt->query_state.std_out.buffer : NULL;
391     err = b->tx_vtbl.get_with_idcap_response(b,
392                                              MKCONT(free_oct_reply_state, drt),
393                                              reply, drt->server_id, drt->error);
394     if (err_is_fail(err)) {
395         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
396             oct_rpc_enqueue_reply(b, drt);
397             return;
398         }
399         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
400     }
401 }
402
403 void get_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
404                             octopus_trigger_t trigger)
405 {
406     errval_t err;
407     char *query = NULL;
408     struct oct_reply_state *drs = NULL;
409     struct ast_object *ast = NULL;
410
411     OCT_DEBUG("get_with_idcap_handler: %s\n", query);
412
413     err = new_oct_reply_state(&drs, get_with_idcap_reply);
414     assert(err_is_ok(err));
415
416     err = build_query_with_idcap(&query, idcap, "");
417     if (err_is_fail(err)) {
418         goto out;
419     }
420
421     err = check_query_length(query);
422     if (err_is_fail(err)) {
423         goto out;
424     }
425
426     err = generate_ast(query, &ast);
427     if (err_is_ok(err)) {
428         err = get_record(ast, &drs->query_state);
429         drs->server_id = install_trigger(b, ast, trigger, err);
430     }
431
432 out:
433     drs->error = err;
434     drs->reply(b, drs);
435
436     free_ast(ast);
437     if (query != NULL) {
438         free(query);
439     }
440 }
441
442 static void set_with_idcap_reply(struct octopus_binding *b,
443                                  struct oct_reply_state *drs)
444 {
445     char *record = err_is_ok(drs->error) && drs->return_record ?
446             drs->query_state.std_out.buffer : NULL;
447
448     errval_t err;
449     err = b->tx_vtbl.set_with_idcap_response(b,
450                                              MKCONT(free_oct_reply_state, drs),
451                                              record, drs->server_id,
452                                              drs->error);
453     if (err_is_fail(err)) {
454         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
455             oct_rpc_enqueue_reply(b, drs);
456             return;
457         }
458         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
459     }
460 }
461
462 void set_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
463                             char *attributes, uint64_t mode,
464                             octopus_trigger_t trigger, bool get)
465 {
466     errval_t err;
467     char *query = NULL;
468     struct oct_reply_state *drs = NULL;
469     struct ast_object *ast = NULL;
470
471     err = new_oct_reply_state(&drs, set_with_idcap_reply);
472     assert(err_is_ok(err));
473
474     err = build_query_with_idcap(&query, idcap, attributes);
475     if (err_is_fail(err)) {
476         goto out;
477     }
478     OCT_DEBUG(" set_with_idcap_handler: %s\n", query);
479
480     err = check_query_length(query);
481     if (err_is_fail(err)) {
482         goto out;
483     }
484
485     err = generate_ast(query, &ast);
486     if (err_is_ok(err)) {
487         if (ast->u.on.name->type == nodeType_Ident) {
488             err = set_record(ast, mode, &drs->query_state);
489             drs->server_id = install_trigger(b, ast, trigger, err);
490         } else {
491             err = OCT_ERR_NO_RECORD_NAME;
492         }
493     }
494
495 out:
496     drs->error = err;
497     drs->return_record = get;
498     drs->reply(b, drs);
499
500     free_ast(ast);
501     if (query != NULL) {
502         free(query);
503     }
504
505 }
506
507 static void del_reply(struct octopus_binding* b, struct oct_reply_state* drs)
508 {
509     errval_t err;
510     err = b->tx_vtbl.del_response(b, MKCONT(free_oct_reply_state, drs),
511             drs->server_id, drs->error);
512     if (err_is_fail(err)) {
513         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
514             oct_rpc_enqueue_reply(b, drs);
515             return;
516         }
517         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
518     }
519 }
520
521 void del_handler(struct octopus_binding* b, char* query, octopus_trigger_t trigger)
522 {
523     OCT_DEBUG(" del_handler: %s\n", query);
524     errval_t err = SYS_ERR_OK;
525
526     struct oct_reply_state* drs = NULL;
527     struct ast_object* ast = NULL;
528
529     err = new_oct_reply_state(&drs, del_reply);
530     assert(err_is_ok(err));
531
532     err = check_query_length(query);
533     if (err_is_fail(err)) {
534         goto out;
535     }
536
537     err = generate_ast(query, &ast);
538     if (err_is_ok(err)) {
539         if (ast->u.on.name->type == nodeType_Ident) {
540             err = del_record(ast, &drs->query_state);
541             drs->server_id = install_trigger(b, ast, trigger, err);
542         }
543         else {
544             // Since we don't have any ACLs atm. we do not
545             // allow name to be a regex/variable
546             // (see set_handler).
547             err = OCT_ERR_NO_RECORD_NAME;
548         }
549     }
550
551 out:
552     drs->error = err;
553     drs->reply(b, drs);
554
555     free_ast(ast);
556 }
557
558 static void exists_reply(struct octopus_binding* b, struct oct_reply_state* drs)
559 {
560     errval_t err;
561     err = b->tx_vtbl.exists_response(b, MKCONT(free_oct_reply_state, drs),
562             drs->server_id, drs->error);
563
564     if (err_is_fail(err)) {
565         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
566             oct_rpc_enqueue_reply(b, drs);
567             return;
568         }
569         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
570     }
571 }
572
573 void exists_handler(struct octopus_binding* b, char* query,
574         octopus_trigger_t trigger)
575 {
576     errval_t err = SYS_ERR_OK;
577
578     struct oct_reply_state* drs = NULL;
579     struct ast_object* ast = NULL;
580
581     err = new_oct_reply_state(&drs, exists_reply);
582     assert(err_is_ok(err));
583
584     err = check_query_length(query);
585     if (err_is_fail(err)) {
586         goto out;
587     }
588
589     err = generate_ast(query, &ast);
590     if (err_is_ok(err)) {
591         err = get_record(ast, &drs->query_state);
592         drs->server_id = install_trigger(b, ast, trigger, err);
593     }
594
595 out:
596     drs->error = err;
597     drs->reply(b, drs);
598
599     free_ast(ast);
600 }
601
602 static void wait_for_reply(struct octopus_binding* b, struct oct_reply_state* drs)
603 {
604     errval_t err;
605     err = b->tx_vtbl.wait_for_response(b, MKCONT(free_oct_reply_state, drs),
606             drs->query_state.std_out.buffer, drs->error);
607
608     if (err_is_fail(err)) {
609         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
610             oct_rpc_enqueue_reply(b, drs);
611             return;
612         }
613         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
614     }
615 }
616
617 // XXX: For compatibility reasons with nameserver API
618 void wait_for_handler(struct octopus_binding* b, char* query) {
619     errval_t err = SYS_ERR_OK;
620     errval_t set_watch_err = SYS_ERR_OK;
621
622     struct oct_reply_state* drs = NULL;
623     struct ast_object* ast = NULL;
624
625     err = new_oct_reply_state(&drs, wait_for_reply);
626     drs->binding = b;
627     assert(err_is_ok(err));
628
629     err = check_query_length(query);
630     if (err_is_fail(err)) {
631         goto out;
632     }
633
634     err = generate_ast(query, &ast);
635     if (err_is_ok(err)) {
636         err = get_record(ast, &drs->query_state);
637         if (err_no(err) == OCT_ERR_NO_RECORD) {
638             uint64_t wid;
639             set_watch_err = set_watch(b, ast, OCT_ON_SET, drs, &wid);
640         }
641     }
642
643 out:
644     if (err_no(err) != OCT_ERR_NO_RECORD || err_is_fail(set_watch_err)) {
645         drs->error = err;
646         if (err_is_fail(set_watch_err)) {
647             // implies err = OCT_ERR_NO_RECORD
648             drs->error = set_watch_err;
649         }
650         drs->reply(b, drs);
651     }
652
653     free_ast(ast);
654 }
655
656 static void subscribe_reply(struct octopus_binding* b,
657         struct oct_reply_state* drs)
658 {
659     errval_t err;
660     err = b->tx_vtbl.subscribe_response(b, MKCONT(free_oct_reply_state, drs),
661             drs->server_id, drs->error);
662
663     if (err_is_fail(err)) {
664         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
665             oct_rpc_enqueue_reply(b, drs);
666             return;
667         }
668         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
669     }
670 }
671
672 void subscribe_handler(struct octopus_binding *b, char* query,
673         uint64_t trigger_fn, uint64_t state)
674 {
675     OCT_DEBUG("subscribe: query = %s\n", query);
676     errval_t err = SYS_ERR_OK;
677
678     struct oct_reply_state* drs = NULL;
679     struct ast_object* ast = NULL;
680
681     err = new_oct_reply_state(&drs, subscribe_reply);
682     assert(err_is_ok(err));
683
684     err = check_query_length(query);
685     if (err_is_fail(err)) {
686         goto out;
687     }
688
689     err = generate_ast(query, &ast);
690     if (err_is_ok(err)) {
691         err = add_subscription(b, ast, trigger_fn, state, drs);
692     }
693
694 out:
695     drs->error = err;
696     drs->reply(b, drs);
697
698     free_ast(ast);
699 }
700
701 static void unsubscribe_reply(struct octopus_binding* b,
702         struct oct_reply_state* drs)
703 {
704     errval_t err;
705     err = b->tx_vtbl.unsubscribe_response(b, MKCONT(free_oct_reply_state, drs),
706             drs->error);
707     if (err_is_fail(err)) {
708         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
709             oct_rpc_enqueue_reply(b, drs);
710             return;
711         }
712         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
713     }
714 }
715
716 static void send_subscribed_message(struct octopus_binding* b, struct oct_reply_state* drs)
717 {
718     errval_t err = SYS_ERR_OK;
719     char* record = drs->query_state.std_out.buffer[0] != '\0' ?
720             drs->query_state.std_out.buffer : NULL;
721
722     err = b->tx_vtbl.subscription(b, MKCONT(free_oct_reply_state, drs),
723             drs->server_id, drs->client_handler,
724             drs->mode, record, drs->client_state);
725     if (err_is_fail(err)) {
726         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
727             oct_rpc_enqueue_reply(b, drs);
728             return;
729         }
730         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
731     }
732
733 }
734
735 void unsubscribe_handler(struct octopus_binding *b, uint64_t id)
736 {
737     errval_t err = SYS_ERR_OK;
738
739     OCT_DEBUG("unsubscribe: id = %"PRIu64"\n", id);
740
741     struct oct_reply_state* srs = NULL;
742     err = new_oct_reply_state(&srs, unsubscribe_reply);
743     assert(err_is_ok(err));
744
745     err = del_subscription(b, id, &srs->query_state);
746     if (err_is_ok(err)) {
747         uint64_t binding;
748         uint64_t client_handler;
749         uint64_t client_state;
750         uint64_t server_id;
751
752         skb_read_output_at(srs->query_state.std_out.buffer,
753                 "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
754                 &binding, &client_handler, &client_state, &server_id);
755
756         struct oct_reply_state* subscriber = NULL;
757         err = new_oct_reply_state(&subscriber,
758                 send_subscribed_message);
759         assert(err_is_ok(err));
760
761 #if defined(__i386__) || defined(__arm__)
762         subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
763 #else
764         subscriber->binding = (struct octopus_binding*)binding;
765 #endif
766         subscriber->client_handler = client_handler;
767         subscriber->client_state = client_state;
768         subscriber->server_id = server_id;
769         subscriber->mode = OCT_REMOVED;
770
771         OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
772         subscriber->reply(subscriber->binding, subscriber);
773     }
774
775     srs->error = err;
776     srs->reply(b, srs);
777 }
778
779 static void publish_reply(struct octopus_binding* b, struct oct_reply_state* drs)
780 {
781     errval_t err;
782     err = b->tx_vtbl.publish_response(b, MKCONT(free_oct_reply_state, drs),
783             drs->error);
784     if (err_is_fail(err)) {
785         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
786             oct_rpc_enqueue_reply(b, drs);
787             return;
788         }
789         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
790     }
791 }
792
793 void publish_handler(struct octopus_binding *b, char* record)
794 {
795     OCT_DEBUG("publish_handler query: %s\n", record);
796     errval_t err = SYS_ERR_OK;
797
798     struct oct_reply_state* drs = NULL;
799     err = new_oct_reply_state(&drs, publish_reply);
800     assert(err_is_ok(err));
801
802     err = check_query_length(record);
803     if (err_is_fail(err)) {
804         drs->error = err;
805         drs->reply(b, drs);
806         goto out1;
807     }
808
809     struct ast_object* ast = NULL;
810     err = generate_ast(record, &ast);
811     if (err_is_fail(err)) {
812         drs->error = err;
813         drs->reply(b, drs);
814         goto out2;
815     }
816
817
818     if (err_is_ok(err)) {
819         err = find_subscribers(ast, &drs->query_state);
820         if (err_is_ok(err)) {
821             // Reply to publisher
822             drs->error = err;
823             drs->reply(b, drs);
824
825
826             struct list_parser_status status;
827             skb_read_list_init_offset(&status, drs->query_state.std_out.buffer, 0);
828
829             // TODO remove skb list parser dependency
830             // Send to all subscribers
831             uint64_t binding;
832             uint64_t client_handler;
833             uint64_t client_state;
834             uint64_t server_id;
835
836             while (skb_read_list(&status, "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
837                     &binding, &client_handler, &client_state, &server_id)) {
838
839                 struct oct_reply_state* subscriber = NULL;
840                 err = new_oct_reply_state(&subscriber,
841                         send_subscribed_message);
842                 assert(err_is_ok(err));
843 #if defined(__i386__) || defined(__arm__)
844                 subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
845 #else
846                 subscriber->binding = (struct octopus_binding*)binding;
847 #endif
848                 subscriber->client_handler = client_handler;
849                 strcpy(subscriber->query_state.std_out.buffer, record);
850                 subscriber->client_state = client_state;
851                 subscriber->server_id = server_id;
852                 subscriber->mode = OCT_ON_PUBLISH;
853
854                 OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
855                 subscriber->reply(subscriber->binding, subscriber);
856             }
857         }
858     }
859
860 out2:
861     free_ast(ast);
862 out1:
863     return;
864 }
865
866 void get_identifier(struct octopus_binding* b)
867 {
868     errval_t err = b->tx_vtbl.get_identifier_response(b, NOP_CONT,
869             current_id++);
870     assert(err_is_ok(err));
871 }
872
873 static void identify_binding_reply(struct octopus_binding* b,
874         struct oct_reply_state* drs)
875 {
876     errval_t err;
877     // TODO send drs->error back to client!
878     err = b->tx_vtbl.identify_response(b, MKCONT(free_oct_reply_state, drs));
879     if (err_is_fail(err)) {
880         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
881             oct_rpc_enqueue_reply(b, drs);
882             return;
883         }
884         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
885     }
886
887 }
888
889 void identify_binding(struct octopus_binding* b, uint64_t id,
890         octopus_binding_type_t type)
891 {
892     assert(id <= current_id);
893
894     struct oct_reply_state* drs = NULL;
895     errval_t err = new_oct_reply_state(&drs, identify_binding_reply);
896     assert(err_is_ok(err));
897
898     OCT_DEBUG("set binding: id=%"PRIu64" type=%d\n", id, type);
899     drs->error = set_binding(type, id, b);
900     drs->reply(b, drs);
901 }
902