961c9ec5c7b553cb4bfa273c12dc257a78549cca
[barrelfish] / lib / octopus / server / service.c
1 /**
2  * \file
3  * \brief Contains handler functions for server-side octopus interface RPC call.
4  */
5
6 /*
7  * Copyright (c) 2009, 2010, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include <stdio.h>
16 #include <string.h>
17
18 #include <barrelfish/barrelfish.h>
19 #include <barrelfish/nameservice_client.h>
20 #include <skb/skb.h> // read list
21 #include <if/octopus_defs.h>
22
23 #include <octopus_server/service.h>
24 #include <octopus_server/query.h>
25 #include <octopus_server/debug.h>
26
27 #include <octopus/parser/ast.h>
28 #include <octopus/definitions.h>
29
30 #include <bench/bench.h>
31
32 #include "queue.h"
33
34 static uint64_t current_id = 1;
35
36 static inline errval_t check_query_length(char* query) {
37     if (strlen(query) >= MAX_QUERY_LENGTH) {
38         return OCT_ERR_QUERY_SIZE;
39     }
40
41     return SYS_ERR_OK;
42 }
43
44 errval_t new_oct_reply_state(struct oct_reply_state** drt,
45         oct_reply_handler_fn reply_handler)
46 {
47     assert(*drt == NULL);
48     *drt = malloc(sizeof(struct oct_reply_state));
49     if (*drt == NULL) {
50         return LIB_ERR_MALLOC_FAIL;
51     }
52
53     //memset(*drt, 0, sizeof(struct oct_reply_state));
54     (*drt)->query_state.stdout.buffer[0] = '\0';
55     (*drt)->query_state.stdout.length = 0;
56     (*drt)->query_state.stderr.buffer[0] = '\0';
57     (*drt)->query_state.stderr.length = 0;
58
59     (*drt)->binding = 0;
60     (*drt)->return_record = false;
61     (*drt)->error = 0;
62
63     // For set_watch()
64     (*drt)->mode = 0;
65     (*drt)->client_state = 0;
66     (*drt)->client_handler = 0;
67     (*drt)->server_id = 0;
68
69     (*drt)->reply = reply_handler;
70     (*drt)->next = NULL;
71
72     return SYS_ERR_OK;
73 }
74
75 static void free_oct_reply_state(void* arg)
76 {
77     if (arg != NULL) {
78         struct oct_reply_state* drt = (struct oct_reply_state*) arg;
79         // In case we have to free things in oct_reply_state, free here...
80
81         free(drt);
82     } else {
83         assert(!"free_reply_state with NULL argument?");
84     }
85 }
86
87 static void trigger_send_handler(struct octopus_binding* b,
88         struct oct_reply_state* drs)
89 {
90     char* record = drs->query_state.stdout.buffer[0] != '\0' ?
91             drs->query_state.stdout.buffer : NULL;
92
93     errval_t err;
94     err = b->tx_vtbl.trigger(b, MKCONT(free_oct_reply_state, drs),
95             drs->server_id,
96             drs->client_handler,
97             drs->mode,
98             record,
99             drs->client_state);
100     if (err_is_fail(err)) {
101         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
102             oct_rpc_enqueue_reply(b, drs);
103             return;
104         }
105         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
106     }
107 }
108
109 static inline bool can_install_trigger(octopus_trigger_t trigger, errval_t error)
110 {
111     return trigger.m > 0 &&
112            (trigger.in_case == err_no(error) ||
113            (trigger.m & DIST_ALWAYS_SET) != 0 );
114 }
115
116 static inline uint64_t install_trigger(struct octopus_binding* binding,
117         struct ast_object* ast, octopus_trigger_t trigger, errval_t error)
118 {
119     errval_t err;
120     uint64_t watch_id = 0;
121
122     if (can_install_trigger(trigger, error)) {
123         struct oct_reply_state* trigger_reply = NULL;
124         err = new_oct_reply_state(&trigger_reply, trigger_send_handler);
125         assert(err_is_ok(err));
126
127         trigger_reply->client_handler = trigger.trigger;
128         trigger_reply->client_state = trigger.st;
129
130         trigger_reply->binding = (trigger.send_to == octopus_BINDING_RPC) ?
131                 binding : get_event_binding(binding);
132         if (trigger_reply->binding == NULL) {
133             fprintf(stderr, "No event binding for trigger, send events "
134                             "over regular binding.");
135             trigger_reply->binding = binding;
136         }
137
138         err = set_watch(binding, ast, trigger.m, trigger_reply, &watch_id);
139         assert(err_is_ok(err));
140     }
141
142     return watch_id;
143 }
144
145 static void remove_trigger_reply(struct octopus_binding* b,
146         struct oct_reply_state* drs)
147 {
148     errval_t err;
149     err = b->tx_vtbl.remove_trigger_response(b,
150             MKCONT(free_oct_reply_state, drs),
151             drs->error);
152     if (err_is_fail(err)) {
153         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
154             oct_rpc_enqueue_reply(b, drs);
155             return;
156         }
157         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
158     }
159 }
160
161 void remove_trigger_handler(struct octopus_binding *b, octopus_trigger_id_t tid)
162 {
163     struct oct_reply_state* drs = NULL;
164     errval_t err = new_oct_reply_state(&drs, remove_trigger_reply);
165     assert(err_is_ok(err));
166
167     drs->error = del_watch(b, tid, &drs->query_state);
168     drs->reply(b, drs);
169 }
170
171 /*static inline void arrival_rate(void)
172 {
173     static cycles_t measure_time = 10000;
174     static uint64_t arrivals = 0;
175     static cycles_t start = 0;
176     arrivals++;
177     if ( (arrivals % 100) == 0 && bench_tsc_to_ms(bench_tsc() - start) > measure_time) {
178         printf("Get Rate per sec: %lu\n", arrivals / (measure_time / 1000));
179         start = bench_tsc();
180         arrivals = 0;
181     }
182 }*/
183
184 static void get_reply(struct octopus_binding* b, struct oct_reply_state* drt)
185 {
186     errval_t err;
187     char* reply = err_is_ok(drt->error) ?
188             drt->query_state.stdout.buffer : NULL;
189     err = b->tx_vtbl.get_response(b, MKCONT(free_oct_reply_state, drt),
190             reply, drt->server_id, drt->error);
191     if (err_is_fail(err)) {
192         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
193             oct_rpc_enqueue_reply(b, drt);
194             return;
195         }
196         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
197     }
198 }
199
200 void get_handler(struct octopus_binding *b, char *query, octopus_trigger_t trigger)
201 {
202     errval_t err = SYS_ERR_OK;
203
204     struct oct_reply_state* drs = NULL;
205     struct ast_object* ast = NULL;
206     err = new_oct_reply_state(&drs, get_reply);
207     assert(err_is_ok(err));
208
209     err = check_query_length(query);
210     if (err_is_fail(err)) {
211         goto out;
212     }
213
214     err = generate_ast(query, &ast);
215     if (err_is_ok(err)) {
216         err = get_record(ast, &drs->query_state);
217         drs->server_id = install_trigger(b, ast, trigger, err);
218     }
219
220 out:
221     drs->error = err;
222     drs->reply(b, drs);
223
224     free_ast(ast);
225     free(query);
226 }
227
228 static void get_names_reply(struct octopus_binding* b,
229         struct oct_reply_state* drt)
230 {
231     errval_t err;
232     char* reply = err_is_ok(drt->error) ?
233             drt->query_state.stdout.buffer : NULL;
234     err = b->tx_vtbl.get_names_response(b, MKCONT(free_oct_reply_state, drt),
235             reply, drt->server_id, drt->error);
236     if (err_is_fail(err)) {
237         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
238             oct_rpc_enqueue_reply(b, drt);
239             return;
240         }
241         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
242     }
243 }
244
245 void get_names_handler(struct octopus_binding *b, char *query, octopus_trigger_t t)
246 {
247     OCT_DEBUG(" get_names_handler: %s\n", query);
248
249     errval_t err = SYS_ERR_OK;
250
251     struct oct_reply_state* drs = NULL;
252     struct ast_object* ast = NULL;
253
254     err = new_oct_reply_state(&drs, get_names_reply);
255     assert(err_is_ok(err));
256
257     err = check_query_length(query);
258     if (err_is_fail(err)) {
259         goto out;
260     }
261
262     err = generate_ast(query, &ast);
263     if (err_is_ok(err)) {
264         err = get_record_names(ast, &drs->query_state);
265         drs->server_id = install_trigger(b, ast, t, err);
266     }
267
268 out:
269     drs->error = err;
270     drs->reply(b, drs);
271
272     free_ast(ast);
273     free(query);
274 }
275
276 static void set_reply(struct octopus_binding* b, struct oct_reply_state* drs)
277 {
278     char* record = err_is_ok(drs->error) && drs->return_record ?
279             drs->query_state.stdout.buffer : NULL;
280
281     errval_t err;
282     err = b->tx_vtbl.set_response(b, MKCONT(free_oct_reply_state, drs), record,
283             drs->server_id, drs->error);
284     if (err_is_fail(err)) {
285         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
286             oct_rpc_enqueue_reply(b, drs);
287             return;
288         }
289         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
290     }
291 }
292
293 void set_handler(struct octopus_binding *b, char *query, uint64_t mode,
294         octopus_trigger_t trigger, bool get)
295 {
296     OCT_DEBUG(" set_handler: %s\n", query);
297     errval_t err = SYS_ERR_OK;
298
299     struct oct_reply_state* drs = NULL;
300     struct ast_object* ast = NULL;
301
302     err = new_oct_reply_state(&drs, set_reply);
303     assert(err_is_ok(err));
304
305     err = check_query_length(query);
306     if (err_is_fail(err)) {
307         goto out;
308     }
309
310     err = generate_ast(query, &ast);
311     if (err_is_ok(err)) {
312         if (ast->u.on.name->type == nodeType_Ident) {
313             err = set_record(ast, mode, &drs->query_state);
314             drs->server_id = install_trigger(b, ast, trigger, err);
315         }
316         else {
317             // Since we don't have any ACLs atm. we do not
318             // allow name to be a regex/variable, because
319             // we it's not guaranteed which records get
320             // modified in this case.
321             err = OCT_ERR_NO_RECORD_NAME;
322         }
323     }
324
325 out:
326     drs->error = err;
327     drs->return_record = get;
328     drs->reply(b, drs);
329
330     free_ast(ast);
331     free(query);
332 }
333
334 static void del_reply(struct octopus_binding* b, struct oct_reply_state* drs)
335 {
336     errval_t err;
337     err = b->tx_vtbl.del_response(b, MKCONT(free_oct_reply_state, drs),
338             drs->server_id, drs->error);
339     if (err_is_fail(err)) {
340         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
341             oct_rpc_enqueue_reply(b, drs);
342             return;
343         }
344         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
345     }
346 }
347
348 void del_handler(struct octopus_binding* b, char* query, octopus_trigger_t trigger)
349 {
350     OCT_DEBUG(" del_handler: %s\n", query);
351     errval_t err = SYS_ERR_OK;
352
353     struct oct_reply_state* drs = NULL;
354     struct ast_object* ast = NULL;
355
356     err = new_oct_reply_state(&drs, del_reply);
357     assert(err_is_ok(err));
358
359     err = check_query_length(query);
360     if (err_is_fail(err)) {
361         goto out;
362     }
363
364     err = generate_ast(query, &ast);
365     if (err_is_ok(err)) {
366         if (ast->u.on.name->type == nodeType_Ident) {
367             err = del_record(ast, &drs->query_state);
368             drs->server_id = install_trigger(b, ast, trigger, err);
369         }
370         else {
371             // Since we don't have any ACLs atm. we do not
372             // allow name to be a regex/variable
373             // (see set_handler).
374             err = OCT_ERR_NO_RECORD_NAME;
375         }
376     }
377
378 out:
379     drs->error = err;
380     drs->reply(b, drs);
381
382     free_ast(ast);
383     free(query);
384 }
385
386 static void exists_reply(struct octopus_binding* b, struct oct_reply_state* drs)
387 {
388     errval_t err;
389     err = b->tx_vtbl.exists_response(b, MKCONT(free_oct_reply_state, drs),
390             drs->server_id, drs->error);
391
392     if (err_is_fail(err)) {
393         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
394             oct_rpc_enqueue_reply(b, drs);
395             return;
396         }
397         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
398     }
399 }
400
401 void exists_handler(struct octopus_binding* b, char* query,
402         octopus_trigger_t trigger)
403 {
404     errval_t err = SYS_ERR_OK;
405
406     struct oct_reply_state* drs = NULL;
407     struct ast_object* ast = NULL;
408
409     err = new_oct_reply_state(&drs, exists_reply);
410     assert(err_is_ok(err));
411
412     err = check_query_length(query);
413     if (err_is_fail(err)) {
414         goto out;
415     }
416
417     err = generate_ast(query, &ast);
418     if (err_is_ok(err)) {
419         err = get_record(ast, &drs->query_state);
420         drs->server_id = install_trigger(b, ast, trigger, err);
421     }
422
423 out:
424     drs->error = err;
425     drs->reply(b, drs);
426
427     free_ast(ast);
428     free(query);
429 }
430
431 static void wait_for_reply(struct octopus_binding* b, struct oct_reply_state* drs)
432 {
433     errval_t err;
434     err = b->tx_vtbl.wait_for_response(b, MKCONT(free_oct_reply_state, drs),
435             drs->query_state.stdout.buffer, drs->error);
436
437     if (err_is_fail(err)) {
438         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
439             oct_rpc_enqueue_reply(b, drs);
440             return;
441         }
442         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
443     }
444 }
445
446 // XXX: For compatibility reasons with nameserver API
447 void wait_for_handler(struct octopus_binding* b, char* query) {
448     errval_t err = SYS_ERR_OK;
449     errval_t set_watch_err = SYS_ERR_OK;
450
451     struct oct_reply_state* drs = NULL;
452     struct ast_object* ast = NULL;
453
454     err = new_oct_reply_state(&drs, wait_for_reply);
455     drs->binding = b;
456     assert(err_is_ok(err));
457
458     err = check_query_length(query);
459     if (err_is_fail(err)) {
460         goto out;
461     }
462
463     err = generate_ast(query, &ast);
464     if (err_is_ok(err)) {
465         err = get_record(ast, &drs->query_state);
466         if (err_no(err) == OCT_ERR_NO_RECORD) {
467             debug_printf("waiting for: %s\n", query);
468             uint64_t wid;
469             set_watch_err = set_watch(b, ast, DIST_ON_SET, drs, &wid);
470         }
471     }
472
473 out:
474     if (err_no(err) != OCT_ERR_NO_RECORD || err_is_fail(set_watch_err)) {
475         drs->error = err;
476         if (err_is_fail(set_watch_err)) {
477             // implies err = OCT_ERR_NO_RECORD
478             drs->error = set_watch_err;
479         }
480         drs->reply(b, drs);
481     }
482
483     free_ast(ast);
484     free(query);
485 }
486
487 static void subscribe_reply(struct octopus_binding* b,
488         struct oct_reply_state* drs)
489 {
490     errval_t err;
491     err = b->tx_vtbl.subscribe_response(b, MKCONT(free_oct_reply_state, drs),
492             drs->server_id, drs->error);
493
494     if (err_is_fail(err)) {
495         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
496             oct_rpc_enqueue_reply(b, drs);
497             return;
498         }
499         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
500     }
501 }
502
503 void subscribe_handler(struct octopus_binding *b, char* query,
504         uint64_t trigger_fn, uint64_t state)
505 {
506     OCT_DEBUG("subscribe: query = %s\n", query);
507     errval_t err = SYS_ERR_OK;
508
509     struct oct_reply_state* drs = NULL;
510     struct ast_object* ast = NULL;
511
512     err = new_oct_reply_state(&drs, subscribe_reply);
513     assert(err_is_ok(err));
514
515     err = check_query_length(query);
516     if (err_is_fail(err)) {
517         goto out;
518     }
519
520     err = generate_ast(query, &ast);
521     if (err_is_ok(err)) {
522         err = add_subscription(b, ast, trigger_fn, state, drs);
523     }
524
525 out:
526     drs->error = err;
527     drs->reply(b, drs);
528
529     free_ast(ast);
530     free(query);
531 }
532
533 static void unsubscribe_reply(struct octopus_binding* b,
534         struct oct_reply_state* drs)
535 {
536     errval_t err;
537     err = b->tx_vtbl.unsubscribe_response(b, MKCONT(free_oct_reply_state, drs),
538             drs->error);
539     if (err_is_fail(err)) {
540         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
541             oct_rpc_enqueue_reply(b, drs);
542             return;
543         }
544         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
545     }
546 }
547
548 static void send_subscribed_message(struct octopus_binding* b, struct oct_reply_state* drs)
549 {
550     errval_t err = SYS_ERR_OK;
551     char* record = drs->query_state.stdout.buffer[0] != '\0' ?
552             drs->query_state.stdout.buffer : NULL;
553
554     err = b->tx_vtbl.subscription(b, MKCONT(free_oct_reply_state, drs),
555             drs->server_id, drs->client_handler,
556             drs->mode, record, drs->client_state);
557     if (err_is_fail(err)) {
558         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
559             oct_rpc_enqueue_reply(b, drs);
560             return;
561         }
562         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
563     }
564
565 }
566
567 void unsubscribe_handler(struct octopus_binding *b, uint64_t id)
568 {
569     errval_t err = SYS_ERR_OK;
570
571     OCT_DEBUG("unsubscribe: id = %lu\n", id);
572
573     struct oct_reply_state* srs = NULL;
574     err = new_oct_reply_state(&srs, unsubscribe_reply);
575     assert(err_is_ok(err));
576
577     err = del_subscription(b, id, &srs->query_state);
578     if (err_is_ok(err)) {
579         uint64_t binding;
580         uint64_t client_handler;
581         uint64_t client_state;
582         uint64_t server_id;
583
584         skb_read_output_at(srs->query_state.stdout.buffer,
585                 "subscriber(%lu, %lu, %lu, %lu)",
586                 &binding, &client_handler, &client_state, &server_id);
587
588         struct oct_reply_state* subscriber = NULL;
589         err = new_oct_reply_state(&subscriber,
590                 send_subscribed_message);
591         assert(err_is_ok(err));
592
593         subscriber->binding = (struct octopus_binding*)binding;
594         subscriber->client_handler = client_handler;
595         subscriber->client_state = client_state;
596         subscriber->server_id = server_id;
597         subscriber->mode = DIST_REMOVED;
598
599         OCT_DEBUG("publish msg to: recipient:%lu id:%lu\n", binding, server_id);
600         subscriber->reply(subscriber->binding, subscriber);
601     }
602
603     srs->error = err;
604     srs->reply(b, srs);
605 }
606
607 static void publish_reply(struct octopus_binding* b, struct oct_reply_state* drs)
608 {
609     errval_t err;
610     err = b->tx_vtbl.publish_response(b, MKCONT(free_oct_reply_state, drs),
611             drs->error);
612     if (err_is_fail(err)) {
613         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
614             oct_rpc_enqueue_reply(b, drs);
615             return;
616         }
617         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
618     }
619 }
620
621 void publish_handler(struct octopus_binding *b, char* record)
622 {
623     OCT_DEBUG("publish_handler query: %s\n", record);
624     errval_t err = SYS_ERR_OK;
625
626     struct oct_reply_state* drs = NULL;
627     err = new_oct_reply_state(&drs, publish_reply);
628     assert(err_is_ok(err));
629
630     err = check_query_length(record);
631     if (err_is_fail(err)) {
632         drs->error = err;
633         drs->reply(b, drs);
634         goto out;
635     }
636
637     struct ast_object* ast = NULL;
638     err = generate_ast(record, &ast);
639     if (err_is_fail(err)) {
640         drs->error = err;
641         drs->reply(b, drs);
642         goto out;
643     }
644
645
646     if (err_is_ok(err)) {
647         err = find_subscribers(ast, &drs->query_state);
648         if (err_is_ok(err)) {
649             // Reply to publisher
650             drs->error = err;
651             drs->reply(b, drs);
652
653
654             struct list_parser_status status;
655             skb_read_list_init_offset(&status, drs->query_state.stdout.buffer, 0);
656
657             // TODO remove skb list parser dependency
658             // Send to all subscribers
659             uint64_t binding;
660             uint64_t client_handler;
661             uint64_t client_state;
662             uint64_t server_id;
663
664             while (skb_read_list(&status, "subscriber(%lu, %lu, %lu, %lu)",
665                     &binding, &client_handler, &client_state, &server_id)) {
666
667                 struct oct_reply_state* subscriber = NULL;
668                 err = new_oct_reply_state(&subscriber,
669                         send_subscribed_message);
670                 assert(err_is_ok(err));
671
672                 subscriber->binding = (struct octopus_binding*)binding;
673                 subscriber->client_handler = client_handler;
674                 strcpy(subscriber->query_state.stdout.buffer, record);
675                 subscriber->client_state = client_state;
676                 subscriber->server_id = server_id;
677                 subscriber->mode = DIST_ON_PUBLISH;
678
679                 OCT_DEBUG("publish msg to: recipient:%lu id:%lu\n", binding, server_id);
680                 subscriber->reply(subscriber->binding, subscriber);
681             }
682         }
683     }
684
685 out:
686     free(record);
687     free_ast(ast);
688 }
689
690 void get_identifier(struct octopus_binding* b)
691 {
692     errval_t err = b->tx_vtbl.get_identifier_response(b, NOP_CONT,
693             current_id++);
694     assert(err_is_ok(err));
695 }
696
697 static void identify_binding_reply(struct octopus_binding* b,
698         struct oct_reply_state* drs)
699 {
700     errval_t err;
701     // TODO send drs->error back to client!
702     err = b->tx_vtbl.identify_response(b, MKCONT(free_oct_reply_state, drs));
703     if (err_is_fail(err)) {
704         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
705             oct_rpc_enqueue_reply(b, drs);
706             return;
707         }
708         USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
709     }
710
711 }
712
713 void identify_binding(struct octopus_binding* b, uint64_t id,
714         octopus_binding_type_t type)
715 {
716     assert(id <= current_id);
717
718     struct oct_reply_state* drs = NULL;
719     errval_t err = new_oct_reply_state(&drs, identify_binding_reply);
720     assert(err_is_ok(err));
721
722     OCT_DEBUG("set binding: id=%lu type=%d\n", id, type);
723     drs->error = set_binding(type, id, b);
724     drs->reply(b, drs);
725 }
726