3 * \brief Definitions for external C predicates used in Prolog code of
4 * the octopus server implementation.
8 * Copyright (c) 2011, ETH Zurich.
11 * This file is distributed under the terms in the attached LICENSE file.
12 * If you do not find this file, copies can be found by writing to:
13 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
15 #define _USE_XOPEN /* for strdup() */
21 #include <barrelfish/barrelfish.h>
22 #include <include/skb_server.h>
23 #include <collections/hash_table.h>
25 #include <octopus_server/debug.h>
26 #include <octopus_server/service.h>
27 #include <octopus/trigger.h> // for trigger modes
29 #include "predicates.h"
34 #define HASH_INDEX_BUCKETS 6151
35 static hash_table* record_index = NULL;
37 static hash_table* trigger_index = NULL;
38 static struct bitfield* no_attr_triggers = NULL;
40 static hash_table* subscriber_index = NULL;
41 static struct bitfield* no_attr_subscriptions = NULL;
43 static inline void init_index(void) {
44 if(record_index == NULL) {
45 hash_create_with_buckets(&record_index, HASH_INDEX_BUCKETS, NULL);
48 if(subscriber_index == NULL) {
49 hash_create_with_buckets(&subscriber_index, HASH_INDEX_BUCKETS, NULL);
50 bitfield_create(&no_attr_subscriptions);
53 if(trigger_index == NULL) {
54 hash_create_with_buckets(&trigger_index, HASH_INDEX_BUCKETS, NULL);
55 bitfield_create(&no_attr_triggers);
60 static int skip_index_insert(hash_table* ht, uint64_t key, char* value)
63 assert(value != NULL);
65 struct skip_list* sl = (struct skip_list*) hash_find(ht, key);
67 errval_t err = skip_create_list(&sl);
68 if (err_is_fail(err)) {
71 hash_insert(ht, key, sl);
74 skip_insert(sl, value);
75 //skip_print_list(sl);
80 static char* skip_index_remove(hash_table* ht, uint64_t key, char* value)
83 assert(value != NULL);
85 struct skip_list* sl = (struct skip_list*) hash_find(ht, key);
90 char* record_name = skip_delete(sl, value);
92 //skip_print_list(sl);
96 int p_save_index(void)
98 OCT_DEBUG("p_save_index\n");
102 int res = ec_get_string(ec_arg(3), &value);
103 assert(res == PSUCCEED);
105 char* record_name = strdup(value);
106 bool inserted = false;
108 pword list, cur, rest;
109 pword attribute_term;
110 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
111 ec_get_arg(1, cur, &attribute_term);
114 ec_get_string(attribute_term, &attribute);
116 OCT_DEBUG("insert %s(%p) into index[%s]=", record_name, record_name, attribute);
117 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
118 int res = skip_index_insert(record_index, key, record_name);
119 assert(res == PSUCCEED);
130 int p_remove_index(void)
133 char* to_free = NULL;
137 res = ec_get_string(ec_arg(3), &name);
138 assert(res == PSUCCEED);
140 pword list, cur, rest;
141 pword attribute_term;
142 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
143 ec_get_arg(1, cur, &attribute_term);
146 res = ec_get_string(attribute_term, &attribute);
147 assert(res == PSUCCEED);
149 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
150 to_free = skip_index_remove(record_index, key, name);
151 OCT_DEBUG("removed %s(%p) from index[%s]=", name, to_free, attribute);
152 //assert(to_free != NULL);
159 int p_index_intersect(void) /* p_index_intersect(type, -[Attributes], -Current, +Next) */
161 OCT_DEBUG("p_index_intersect\n");
162 static struct skip_list** sets = NULL;
163 static char* next = NULL;
164 static size_t elems = 0;
171 char* index_type = NULL;
172 res = ec_get_string(ec_arg(1), &index_type);
173 if (res != PSUCCEED) {
176 hash_table* ht = record_index;
178 res = ec_get_string(ec_arg(3), &next);
179 if (res != PSUCCEED) {
180 OCT_DEBUG("state is not a string, find skip lists\n");
182 pword list, cur, rest;
185 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
188 sets = malloc(sizeof(struct skip_list*) * elems);
191 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
192 res = ec_get_string(cur, &key);
193 if (res != PSUCCEED) {
197 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
198 struct skip_list* sl = hash_find(ht, hash_key);
202 OCT_DEBUG("skip_intersect found skip list for key: %s\n", key);
203 //skip_print_list(sl);
211 next = skip_intersect(sets, elems, next);
212 OCT_DEBUG("skip_intersect found next: %s\n", next);
214 dident item = ec_did(next, 0);
215 return ec_unify_arg(4, ec_atom(item));
221 int p_index_union(void) /* p_index_union(type, -[Attributes], -Current, +Next) */
223 OCT_DEBUG("p_index_union\n");
224 static hash_table* union_ht = NULL;
225 static char* next = NULL;
232 char* index_type = NULL;
233 res = ec_get_string(ec_arg(1), &index_type);
234 if (res != PSUCCEED) {
237 hash_table* ht = record_index; // TODO broken
239 res = ec_get_string(ec_arg(3), &next);
240 if (res != PSUCCEED) {
241 OCT_DEBUG("state is not a string, find skip lists\n");
242 if (union_ht != NULL) {
243 hash_release(union_ht);
246 hash_create_with_buckets(&union_ht, HASH_INDEX_BUCKETS, NULL);
248 pword list, cur, rest;
249 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
250 res = ec_get_string(cur, &key);
251 if (res != PSUCCEED) {
255 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
256 struct skip_list* sl = hash_find(ht, hash_key);
258 // Insert all entries in union hash table
260 OCT_DEBUG("p_index_union found skip list for key: %s\n", key);
261 //skip_print_list(sl);
263 struct skip_node* sentry = sl->header->forward[0];
264 while(sentry != NULL) {
265 uint64_t hash_key = fnv_64a_str(sentry->element, FNV1A_64_INIT);
266 if(hash_find(union_ht, hash_key) == NULL) {
267 OCT_DEBUG("p_index_union insert: %s\n", sentry->element);
268 hash_insert(union_ht, hash_key, sentry->element);
270 sentry = sentry->forward[0];
276 hash_traverse_start(union_ht);
280 next = hash_traverse_next(union_ht, &hash_key);
281 OCT_DEBUG("skip_union found next: %s\n", next);
283 dident item = ec_did(next, 0);
284 return ec_unify_arg(4, ec_atom(item));
287 hash_traverse_end(union_ht);
294 static int bitfield_index_insert(hash_table* ht, uint64_t key, long int id)
298 struct bitfield* bf = (struct bitfield*) hash_find(ht, key);
300 errval_t err = bitfield_create(&bf);
301 if (err_is_fail(err)) {
304 hash_insert(ht, key, bf);
311 static int bitfield_index_remove(hash_table* ht, uint64_t key, long int id)
315 struct bitfield* bf = (struct bitfield*) hash_find(ht, key);
317 bitfield_off(bf, id);
323 int p_bitfield_add(void) /* p_bitfield_add(Storage, +Name, +[AttributeList], +Id) */
328 bool inserted = false;
330 hash_table* ht = NULL;
331 struct bitfield* no_attr_bf = NULL;
334 res = ec_get_string(ec_arg(1), &storage);
335 if (strcmp(storage, "trigger") == 0) {
337 no_attr_bf = no_attr_triggers;
340 ht = subscriber_index;
341 no_attr_bf = no_attr_subscriptions;
344 res = ec_get_long(ec_arg(3), &id);
345 if (res != PSUCCEED) {
349 pword list, cur, rest;
350 pword attribute_term;
351 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
352 ec_get_arg(1, cur, &attribute_term);
355 ec_get_string(attribute_term, &attribute);
356 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
358 int res = bitfield_index_insert(ht, key, id);
359 assert(res == PSUCCEED);
364 bitfield_on(no_attr_bf, id);
370 int p_bitfield_remove(void) /* p_bitfield_remove(Storage, +Name, +[AttributeList], +Id) */
377 hash_table* ht = NULL;
378 struct bitfield* no_attr_bf = NULL;
381 res = ec_get_string(ec_arg(1), &storage);
382 if (strcmp(storage, "trigger") == 0) {
384 no_attr_bf = no_attr_triggers;
387 ht = subscriber_index;
388 no_attr_bf = no_attr_subscriptions;
391 res = ec_get_long(ec_arg(3), &id);
392 if (res != PSUCCEED) {
396 pword list, cur, rest;
397 pword attribute_term;
398 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
399 ec_get_arg(1, cur, &attribute_term);
402 res = ec_get_string(attribute_term, &attribute);
403 assert(res == PSUCCEED);
405 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
406 bitfield_index_remove(ht, key, id);
409 bitfield_off(no_attr_bf, id);
413 int p_bitfield_union(void) /* p_index_union(Storage, -[Attributes], -Current, +Next) */
415 OCT_DEBUG("p_bitfield_union\n");
416 static struct bitfield** sets = NULL;
417 static long int next = -1;
418 static size_t elems = 0;
424 hash_table* ht = NULL;
425 struct bitfield* no_attr_bf = NULL;
427 char* storage = NULL;
428 res = ec_get_string(ec_arg(1), &storage);
429 if (strcmp(storage, "trigger") == 0) {
431 no_attr_bf = no_attr_triggers;
434 ht = subscriber_index;
435 no_attr_bf = no_attr_subscriptions;
438 res = ec_get_long(ec_arg(3), &next);
439 if (res != PSUCCEED) {
440 OCT_DEBUG("state is not a id, find bitmaps\n");
442 pword list, cur, rest;
445 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
448 sets = calloc(elems+1, sizeof(struct bitfield*));
449 sets[0] = no_attr_bf;
452 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
453 res = ec_get_string(cur, &key);
454 if (res != PSUCCEED) {
458 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
459 struct bitfield* sl = hash_find(ht, hash_key);
461 OCT_DEBUG("bitfield_union found bitfield for key: %s\n", key);
464 // else: no record with this attribute, just ignore
470 next = bitfield_union(sets, elems, next);
471 OCT_DEBUG("bitfield_union found next: %ld\n", next);
473 pword item = ec_long(next);
474 return ec_unify_arg(4, item);
480 void oct_rpc_enqueue_reply(struct octopus_binding *b, struct oct_reply_state* st);
481 extern struct bitfield* trigger_ids;
483 int p_trigger_watch(void) /* p_trigger_watch(+String, +Mode, +Recipient, +WatchId, -Retract) */
486 OCT_DEBUG("\n*** p_trigger_watch: start\n");
490 res = ec_get_string(ec_arg(1), &record);
491 if (res != PSUCCEED) {
492 assert(ec_is_var(ec_arg(1)) == PSUCCEED);
493 // record will be null
494 // can happen in case we send DIST_REMOVED
497 // Action that triggered the event
499 res = ec_get_long(ec_arg(2), &action);
500 if (res != PSUCCEED) {
505 long int watch_mode = 0;
506 res = ec_get_long(ec_arg(3), &watch_mode);
507 if (res != PSUCCEED) {
511 struct oct_reply_state* drs = NULL;
512 res = ec_get_long(ec_arg(4), (long int*) &drs);
513 if (res != PSUCCEED) {
517 OCT_DEBUG("drs is: %p\n", drs);
519 long int watch_id = 0;
520 res = ec_get_long(ec_arg(5), &watch_id);
521 if (res != PSUCCEED) {
525 OCT_DEBUG("p_trigger_watch: %s\n", record);
526 OCT_DEBUG("drs->binding: %p\n", drs->binding);
527 OCT_DEBUG("drs->reply: %p\n", drs->reply);
530 drs->error = SYS_ERR_OK;
531 bool retract = !(watch_mode & DIST_PERSIST);
532 if (record != NULL) {
533 assert(strlen(record)+1 < MAX_QUERY_LENGTH);
534 strcpy(drs->query_state.stdout.buffer, record);
537 drs->query_state.stdout.buffer[0] = '\0';
538 drs->query_state.stdout.length = 0;
541 if (drs->binding != NULL && drs->reply != NULL) {
544 // Copy reply state because the trigger will stay intact
545 struct oct_reply_state* drs_copy = NULL;
546 errval_t err = new_oct_reply_state(&drs_copy, NULL);
547 assert(err_is_ok(err));
548 memcpy(drs_copy, drs, sizeof(struct oct_reply_state));
549 drs = drs_copy; // overwrite drs
552 assert(trigger_ids != NULL);
553 OCT_DEBUG("turn off trigger id: %lu\n", watch_id);
554 bitfield_off(trigger_ids, watch_id);
557 drs->mode = (retract) ? (action | DIST_REMOVED) : action;
559 if (drs->binding->st != NULL) {
560 oct_rpc_enqueue_reply(drs->binding, drs);
563 drs->reply(drs->binding, drs);
567 USER_PANIC("No binding set for watch_id: %lu", watch_id);
570 OCT_DEBUG("p_trigger_watch: done");
571 return ec_unify_arg(6, ec_long(retract));