8b6df77ddcc9eab5669e3b38320b9775ab2d4939
[barrelfish] / usr / skb / dist / predicates.c
1 /**
2  * \file
3  * \brief Definitions for external C predicates used in Prolog code of
4  * the octopus server implementation.
5  */
6
7 /*
8  * Copyright (c) 2011, ETH Zurich.
9  * All rights reserved.
10  *
11  * This file is distributed under the terms in the attached LICENSE file.
12  * If you do not find this file, copies can be found by writing to:
13  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
14  */
15 #define _USE_XOPEN /* for strdup() */
16
17 #include <stdio.h>
18 #include <string.h>
19
20 #include <eclipse.h>
21 #include <barrelfish/barrelfish.h>
22 #include <include/skb_server.h>
23 #include <collections/hash_table.h>
24
25 #include <octopus_server/debug.h>
26 #include <octopus_server/service.h>
27 #include <octopus/trigger.h> // for trigger modes
28
29 #include "predicates.h"
30 #include "skiplist.h"
31 #include "bitfield.h"
32 #include "fnv.h"
33
34 #define HASH_INDEX_BUCKETS 6151
35 static hash_table* record_index = NULL;
36
37 static hash_table* trigger_index = NULL;
38 static struct bitfield* no_attr_triggers = NULL;
39
40 static hash_table* subscriber_index = NULL;
41 static struct bitfield* no_attr_subscriptions = NULL;
42
43 static inline void init_index(void) {
44     if(record_index == NULL) {
45         hash_create_with_buckets(&record_index, HASH_INDEX_BUCKETS, NULL);
46     }
47
48     if(subscriber_index == NULL) {
49         hash_create_with_buckets(&subscriber_index, HASH_INDEX_BUCKETS, NULL);
50         bitfield_create(&no_attr_subscriptions);
51     }
52
53     if(trigger_index == NULL) {
54         hash_create_with_buckets(&trigger_index, HASH_INDEX_BUCKETS, NULL);
55         bitfield_create(&no_attr_triggers);
56     }
57 }
58
59
60 static int skip_index_insert(hash_table* ht, uint64_t key, char* value)
61 {
62     assert(ht != NULL);
63     assert(value != NULL);
64
65     struct skip_list* sl = (struct skip_list*) hash_find(ht, key);
66     if (sl == NULL) {
67         errval_t err = skip_create_list(&sl);
68         if (err_is_fail(err)) {
69             return PFAIL;
70         }
71         hash_insert(ht, key, sl);
72     }
73
74     skip_insert(sl, value);
75     //skip_print_list(sl);
76
77     return PSUCCEED;
78 }
79
80 static char* skip_index_remove(hash_table* ht, uint64_t key, char* value)
81 {
82     assert(ht != NULL);
83     assert(value != NULL);
84
85     struct skip_list* sl = (struct skip_list*) hash_find(ht, key);
86     if (sl == NULL) {
87         return NULL;
88     }
89
90     char* record_name = skip_delete(sl, value);
91
92     //skip_print_list(sl);
93     return record_name;
94 }
95
96 int p_save_index(void)
97 {
98     OCT_DEBUG("p_save_index\n");
99     init_index();
100
101     char* value = NULL;
102     int res = ec_get_string(ec_arg(3), &value);
103     assert(res == PSUCCEED);
104
105     char* record_name = strdup(value);
106     bool inserted = false;
107
108     pword list, cur, rest;
109     pword attribute_term;
110     for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
111         ec_get_arg(1, cur, &attribute_term);
112
113         char* attribute;
114         ec_get_string(attribute_term, &attribute);
115
116         OCT_DEBUG("insert %s(%p) into index[%s]=", record_name, record_name, attribute);
117         uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
118         int res = skip_index_insert(record_index, key, record_name);
119         assert(res == PSUCCEED);
120         inserted = true;
121     }
122
123     if (!inserted) {
124         free(record_name);
125     }
126
127     return PSUCCEED;
128 }
129
130 int p_remove_index(void)
131 {
132     int res;
133     char* to_free = NULL;
134     init_index();
135
136     char* name = NULL;
137     res = ec_get_string(ec_arg(3), &name);
138     assert(res == PSUCCEED);
139
140     pword list, cur, rest;
141     pword attribute_term;
142     for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
143         ec_get_arg(1, cur, &attribute_term);
144
145         char* attribute;
146         res = ec_get_string(attribute_term, &attribute);
147         assert(res == PSUCCEED);
148
149         uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
150         to_free = skip_index_remove(record_index, key, name);
151         OCT_DEBUG("removed %s(%p) from index[%s]=", name, to_free, attribute);
152         //assert(to_free != NULL);
153     }
154
155     free(to_free);
156     return PSUCCEED;
157 }
158
159 int p_index_intersect(void) /* p_index_intersect(type, -[Attributes], -Current, +Next) */
160 {
161     OCT_DEBUG("p_index_intersect\n");
162     static struct skip_list** sets = NULL;
163     static char* next = NULL;
164     static size_t elems = 0;
165
166     int res;
167     char* key;
168
169     init_index();
170
171     char* index_type = NULL;
172     res = ec_get_string(ec_arg(1), &index_type);
173     if (res != PSUCCEED) {
174         return res;
175     }
176     hash_table* ht = record_index;
177
178     res = ec_get_string(ec_arg(3), &next);
179     if (res != PSUCCEED) {
180         OCT_DEBUG("state is not a string, find skip lists\n");
181         free(sets);
182         pword list, cur, rest;
183
184         elems = 0;
185         for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
186             elems++;
187         }
188         sets = malloc(sizeof(struct skip_list*) * elems);
189
190         size_t i = 0;
191         for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
192             res = ec_get_string(cur, &key);
193             if (res != PSUCCEED) {
194                 return res;
195             }
196
197             uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
198             struct skip_list* sl = hash_find(ht, hash_key);
199             if (sl == NULL) {
200                 return PFAIL;
201             }
202             OCT_DEBUG("skip_intersect found skip list for key: %s\n", key);
203             //skip_print_list(sl);
204
205             sets[i] = sl;
206             i++;
207         }
208         next = NULL;
209     }
210
211     next = skip_intersect(sets, elems, next);
212     OCT_DEBUG("skip_intersect found next: %s\n", next);
213     if(next != NULL) {
214         dident item = ec_did(next, 0);
215         return ec_unify_arg(4, ec_atom(item));
216     }
217
218     return PFAIL;
219 }
220
221 int p_index_union(void) /* p_index_union(type, -[Attributes], -Current, +Next) */
222 {
223     OCT_DEBUG("p_index_union\n");
224     static hash_table* union_ht = NULL;
225     static char* next = NULL;
226
227     int res;
228     char* key;
229
230     init_index();
231
232     char* index_type = NULL;
233     res = ec_get_string(ec_arg(1), &index_type);
234     if (res != PSUCCEED) {
235         return res;
236     }
237     hash_table* ht = record_index; // TODO broken
238
239     res = ec_get_string(ec_arg(3), &next);
240     if (res != PSUCCEED) {
241         OCT_DEBUG("state is not a string, find skip lists\n");
242         if (union_ht != NULL) {
243             hash_release(union_ht);
244             union_ht = NULL;
245         }
246         hash_create_with_buckets(&union_ht, HASH_INDEX_BUCKETS, NULL);
247
248         pword list, cur, rest;
249         for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
250             res = ec_get_string(cur, &key);
251             if (res != PSUCCEED) {
252                 return res;
253             }
254
255             uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
256             struct skip_list* sl = hash_find(ht, hash_key);
257
258             // Insert all entries in union hash table
259             if (sl != NULL) {
260                 OCT_DEBUG("p_index_union found skip list for key: %s\n", key);
261                 //skip_print_list(sl);
262
263                 struct skip_node* sentry = sl->header->forward[0];
264                 while(sentry != NULL) {
265                     uint64_t hash_key = fnv_64a_str(sentry->element, FNV1A_64_INIT);
266                     if(hash_find(union_ht, hash_key) == NULL) {
267                         OCT_DEBUG("p_index_union insert: %s\n", sentry->element);
268                         hash_insert(union_ht, hash_key, sentry->element);
269                     }
270                     sentry = sentry->forward[0];
271                 }
272             }
273
274         }
275         next = NULL;
276         hash_traverse_start(union_ht);
277     }
278
279     uint64_t hash_key;
280     next = hash_traverse_next(union_ht, &hash_key);
281     OCT_DEBUG("skip_union found next: %s\n", next);
282     if(next != NULL) {
283         dident item = ec_did(next, 0);
284         return ec_unify_arg(4, ec_atom(item));
285     }
286     else {
287         hash_traverse_end(union_ht);
288         return PFAIL;
289     }
290 }
291
292
293
294 static int bitfield_index_insert(hash_table* ht, uint64_t key, long int id)
295 {
296     assert(ht != NULL);
297
298     struct bitfield* bf = (struct bitfield*) hash_find(ht, key);
299     if (bf == NULL) {
300         errval_t err = bitfield_create(&bf);
301         if (err_is_fail(err)) {
302             return PFAIL;
303         }
304         hash_insert(ht, key, bf);
305     }
306
307     bitfield_on(bf, id);
308     return PSUCCEED;
309 }
310
311 static int bitfield_index_remove(hash_table* ht, uint64_t key, long int id)
312 {
313     assert(ht != NULL);
314
315     struct bitfield* bf = (struct bitfield*) hash_find(ht, key);
316     if (bf != NULL) {
317         bitfield_off(bf, id);
318     }
319
320     return PSUCCEED;
321 }
322
323 int p_bitfield_add(void) /* p_bitfield_add(Storage, +Name, +[AttributeList], +Id) */
324 {
325     init_index();
326     int res = 0;
327     long int id;
328     bool inserted = false;
329
330     hash_table* ht = NULL;
331     struct bitfield* no_attr_bf = NULL;
332
333     char* storage;
334     res = ec_get_string(ec_arg(1), &storage);
335     if (strcmp(storage, "trigger") == 0) {
336         ht = trigger_index;
337         no_attr_bf = no_attr_triggers;
338     }
339     else {
340         ht = subscriber_index;
341         no_attr_bf = no_attr_subscriptions;
342     }
343
344     res = ec_get_long(ec_arg(3), &id);
345     if (res != PSUCCEED) {
346         return PFAIL;
347     }
348
349     pword list, cur, rest;
350     pword attribute_term;
351     for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
352         ec_get_arg(1, cur, &attribute_term);
353
354         char* attribute;
355         ec_get_string(attribute_term, &attribute);
356         uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
357
358         int res = bitfield_index_insert(ht, key, id);
359         assert(res == PSUCCEED);
360         inserted = true;
361     }
362
363     if (!inserted) {
364         bitfield_on(no_attr_bf, id);
365     }
366
367     return PSUCCEED;
368 }
369
370 int p_bitfield_remove(void) /* p_bitfield_remove(Storage, +Name, +[AttributeList], +Id) */
371 {
372     init_index();
373
374     int res = 0;
375     long int id;
376
377     hash_table* ht = NULL;
378     struct bitfield* no_attr_bf = NULL;
379
380     char* storage;
381     res = ec_get_string(ec_arg(1), &storage);
382     if (strcmp(storage, "trigger") == 0) {
383         ht = trigger_index;
384         no_attr_bf = no_attr_triggers;
385     }
386     else {
387         ht = subscriber_index;
388         no_attr_bf = no_attr_subscriptions;
389     }
390
391     res = ec_get_long(ec_arg(3), &id);
392     if (res != PSUCCEED) {
393         return PFAIL;
394     }
395
396     pword list, cur, rest;
397     pword attribute_term;
398     for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
399         ec_get_arg(1, cur, &attribute_term);
400
401         char* attribute;
402         res = ec_get_string(attribute_term, &attribute);
403         assert(res == PSUCCEED);
404
405         uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
406         bitfield_index_remove(ht, key, id);
407     }
408
409     bitfield_off(no_attr_bf, id);
410     return PSUCCEED;
411 }
412
413 int p_bitfield_union(void) /* p_index_union(Storage, -[Attributes], -Current, +Next) */
414 {
415     OCT_DEBUG("p_bitfield_union\n");
416     static struct bitfield** sets = NULL;
417     static long int next = -1;
418     static size_t elems = 0;
419
420     int res;
421     char* key;
422
423     init_index();
424     hash_table* ht = NULL;
425     struct bitfield* no_attr_bf = NULL;
426
427     char* storage = NULL;
428     res = ec_get_string(ec_arg(1), &storage);
429     if (strcmp(storage, "trigger") == 0) {
430         ht = trigger_index;
431         no_attr_bf = no_attr_triggers;
432     }
433     else {
434         ht = subscriber_index;
435         no_attr_bf = no_attr_subscriptions;
436     }
437
438     res = ec_get_long(ec_arg(3), &next);
439     if (res != PSUCCEED) {
440         OCT_DEBUG("state is not a id, find bitmaps\n");
441         free(sets);
442         pword list, cur, rest;
443
444         elems = 0;
445         for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
446             elems++;
447         }
448         sets = calloc(elems+1, sizeof(struct bitfield*));
449         sets[0] = no_attr_bf;
450
451         elems = 1;
452         for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
453             res = ec_get_string(cur, &key);
454             if (res != PSUCCEED) {
455                 return res;
456             }
457
458             uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
459             struct bitfield* sl = hash_find(ht, hash_key);
460             if (sl != NULL) {
461                 OCT_DEBUG("bitfield_union found bitfield for key: %s\n", key);
462                 sets[elems++] = sl;
463             }
464             // else: no record with this attribute, just ignore
465
466         }
467         next = -1;
468     }
469
470     next = bitfield_union(sets, elems, next);
471     OCT_DEBUG("bitfield_union found next: %ld\n", next);
472     if(next != -1) {
473         pword item = ec_long(next);
474         return ec_unify_arg(4, item);
475     }
476
477     return PFAIL;
478 }
479
480 void oct_rpc_enqueue_reply(struct octopus_binding *b, struct oct_reply_state* st);
481 extern struct bitfield* trigger_ids;
482
483 int p_trigger_watch(void) /* p_trigger_watch(+String, +Mode, +Recipient, +WatchId, -Retract) */
484 {
485     int res;
486     OCT_DEBUG("\n*** p_trigger_watch: start\n");
487
488     // Get arguments
489     char* record = NULL;
490     res = ec_get_string(ec_arg(1), &record);
491     if (res != PSUCCEED) {
492         assert(ec_is_var(ec_arg(1)) == PSUCCEED);
493         // record will be null
494         // can happen in case we send DIST_REMOVED
495     }
496
497     // Action that triggered the event
498     long int action = 0;
499     res = ec_get_long(ec_arg(2), &action);
500     if (res != PSUCCEED) {
501         return res;
502     }
503
504     // Mode of watch
505     long int watch_mode = 0;
506     res = ec_get_long(ec_arg(3), &watch_mode);
507     if (res != PSUCCEED) {
508         return res;
509     }
510
511     struct oct_reply_state* drs = NULL;
512     res = ec_get_long(ec_arg(4), (long int*) &drs);
513     if (res != PSUCCEED) {
514         return res;
515     }
516     assert(drs != NULL);
517     OCT_DEBUG("drs is: %p\n", drs);
518
519     long int watch_id = 0;
520     res = ec_get_long(ec_arg(5), &watch_id);
521     if (res != PSUCCEED) {
522         return res;
523     }
524
525     OCT_DEBUG("p_trigger_watch: %s\n", record);
526     OCT_DEBUG("drs->binding: %p\n", drs->binding);
527     OCT_DEBUG("drs->reply: %p\n", drs->reply);
528
529
530     drs->error = SYS_ERR_OK;
531     bool retract = !(watch_mode & DIST_PERSIST);
532     if (record != NULL) {
533         assert(strlen(record)+1 < MAX_QUERY_LENGTH);
534         strcpy(drs->query_state.stdout.buffer, record);
535     }
536     else {
537         drs->query_state.stdout.buffer[0] = '\0';
538         drs->query_state.stdout.length = 0;
539     }
540
541     if (drs->binding != NULL && drs->reply != NULL) {
542
543         if (!retract) {
544             // Copy reply state because the trigger will stay intact
545             struct oct_reply_state* drs_copy = NULL;
546             errval_t err = new_oct_reply_state(&drs_copy, NULL);
547             assert(err_is_ok(err));
548             memcpy(drs_copy, drs, sizeof(struct oct_reply_state));
549             drs = drs_copy; // overwrite drs
550         }
551         else {
552             assert(trigger_ids != NULL);
553             OCT_DEBUG("turn off trigger id: %lu\n", watch_id);
554             bitfield_off(trigger_ids, watch_id);
555         }
556
557         drs->mode = (retract) ? (action | DIST_REMOVED) : action;
558
559         if (drs->binding->st != NULL) {
560             oct_rpc_enqueue_reply(drs->binding, drs);
561         }
562         else {
563             drs->reply(drs->binding, drs);
564         }
565     }
566     else {
567         USER_PANIC("No binding set for watch_id: %lu", watch_id);
568     }
569
570     OCT_DEBUG("p_trigger_watch: done");
571     return ec_unify_arg(6, ec_long(retract));
572 }