3 * \brief Tests for octopus publish/subscribe API
7 * Copyright (c) 2011, ETH Zurich.
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
19 #include <octopus/octopus.h>
// Barrier name handed to oct_barrier_enter() by both subscriber() and
// publisher(), so the two sides meet on the same barrier.
23 static const char* barrier_name = "d2pubsub_test";
// Set up with thread_sem_init(&ts, 0) in subscriber(); no other use of it is
// visible in this excerpt.
24 static struct thread_sem ts;
/**
 * Subscription callback that subscriber() registers through oct_subscribe().
 *
 * mode   - flag bits; tested against OCT_ON_PUBLISH and OCT_REMOVED below.
 * record - record text; printed and parsed with oct_read().
 * st     - opaque state, interpreted as a size_t* (subscriber() passes the
 *          address of its `received` variable).
 *
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (e.g. the declaration of `name` and any update of *received) --
 * confirm details against the full file.
 */
26 static void message_handler(oct_mode_t mode, char* record, void* st)
28 size_t* received = (size_t*) st;
// Branch taken when the OCT_ON_PUBLISH bit is set in mode.
30 if (mode & OCT_ON_PUBLISH) {
// Expected record names, indexed by the current value of *received.
// "msg_5" is listed twice -- presumably because that record matches two of
// the subscriptions made in subscriber(); TODO confirm.
31 static const char* receive_order[] =
32 { "msg_2", "msg_4", "msg_5", "msg_5", "msg_6", "msg_7" };
// NOTE(review): *received is a size_t but is printed with %lu; %zu is the
// portable conversion for size_t.
35 debug_printf("Message: %s received: %lu\n", record, *received);
// Read the leading string of the record into `name` (declared on a line not
// shown here).
37 errval_t err = oct_read(record, "%s", &name);
// The name read from the record must equal the expected entry at this position.
39 ASSERT_STRING(receive_order[*received], name);
// Branch taken when OCT_ON_PUBLISH is not set but the OCT_REMOVED bit is.
45 else if (mode & OCT_REMOVED) {
46 debug_printf("OCT_REMOVED set...\n");
/**
 * Subscriber side of the test: registers message_handler for several record
 * queries, meets the publisher at the barrier, handles messages until the
 * `received` counter reaches the expected values, unsubscribes, and leaves
 * the barrier.
 *
 * NOTE(review): the declarations of `err` and `received`, the trailing
 * arguments of some oct_subscribe() calls, and the checks between calls are
 * not visible in this excerpt -- confirm against the full file.
 */
52 static void subscriber(void)
// One subscription id per oct_subscribe() call below; each is later passed
// to oct_unsubscribe().
55 subscription_t id1 = 0;
56 subscription_t id2 = 0;
57 subscription_t id3 = 0;
58 subscription_t id4 = 0;
// Filled in by oct_barrier_enter(); later passed to oct_barrier_leave() and freed.
60 char* barrier_record = NULL;
62 thread_sem_init(&ts, 0);
// This query string is expected to be rejected: the call must yield
// OCT_ERR_PARSER_FAIL.
64 err = oct_subscribe(message_handler, &received, &id1, "111 [] attr: 10 }");
65 ASSERT_ERR(err, OCT_ERR_PARSER_FAIL);
// Subscription 1: query on fl: 1.01 and attr: 10 (the leading "_" is
// presumably a wildcard for the record name -- TODO confirm).
67 err = oct_subscribe(message_handler, &received, &id1,
68 "_ { fl: 1.01, attr: 10 }");
70 debug_printf("id is: %lu\n", id1);
// Subscription 2: query on the `str` attribute using an r'%s' pattern; the
// format arguments are on a line not shown (presumably `str`).
72 char* str = "test.txt";
73 err = oct_subscribe(message_handler, &received, &id2, "_ { str: r'%s' }",
76 debug_printf("id is: %lu\n", id2);
// Subscription 3: query "age > %d"; the threshold argument is on a line not shown.
78 err = oct_subscribe(message_handler, &received, &id3, "_ { age > %d }",
81 debug_printf("id is: %lu\n", id3);
// Subscription 4: the query string is on a line not shown.
83 err = oct_subscribe(message_handler, &received, &id4,
86 debug_printf("id is: %lu\n", id4);
88 // Synchronize with publisher
89 err = oct_barrier_enter(barrier_name, &barrier_record, 2);
90 if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
91 assert(err_is_ok(err));
93 // Wait until all messages received
// 6 equals the number of entries in message_handler's receive_order table.
94 while(received != 6) {
95 messages_wait_and_handle_next();
98 // Unsubscribe message handlers
99 err = oct_unsubscribe(id1);
101 err = oct_unsubscribe(id2);
103 err = oct_unsubscribe(id3);
105 err = oct_unsubscribe(id4);
// Keep handling messages until the counter reaches 10 -- presumably one
// additional handler invocation per unsubscribe (6 + 4); TODO confirm where
// the counter is incremented.
108 while(received != 10) {
109 messages_wait_and_handle_next();
// Leave the barrier and release the record obtained from oct_barrier_enter().
112 oct_barrier_leave(barrier_record);
113 free(barrier_record);
115 printf("Subscriber all done.\n");
/**
 * Publisher side of the test: meets the subscriber at the barrier, publishes
 * the records msg_1 .. msg_7, then leaves the barrier.
 *
 * NOTE(review): the declaration of `err` and whatever follows each
 * oct_publish() call are not visible in this excerpt.
 */
118 static void publisher(void)
// Filled in by oct_barrier_enter(); later passed to oct_barrier_leave() and freed.
121 char* barrier_record = NULL;
123 // Synchronize with subscriber
124 err = oct_barrier_enter(barrier_name, &barrier_record, 2);
125 if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
126 assert(err_is_ok(err));
// Of the records published below, msg_1 and msg_3 do not appear in
// message_handler's receive_order table, i.e. the subscriber does not expect
// to be notified about them.
128 err = oct_publish("msg_1 { age: %d }", 9);
131 err = oct_publish("msg_2 { age: %d }", 10);
// Here `str` is a number and `age` is a quoted string, the reverse of the
// other records.
134 err = oct_publish("msg_3 { str: %d, age: '%d' }", 123, 8);
137 err = oct_publish("msg_4 { str: 'test.txt' }");
// msg_5 carries both str: 'test.txt' and the attr/fl pair used by the
// subscriber's first query; it is listed twice in receive_order.
140 err = oct_publish("msg_5 { str: 'test.txt', attr: 10, fl: 1.01 }");
143 err = oct_publish("msg_6 { type: 'test', pattern: '123123' }");
146 err = oct_publish("msg_7 { type: 'test' }");
// Leave the barrier and release the record obtained from oct_barrier_enter().
149 oct_barrier_leave(barrier_record);
150 free(barrier_record);
152 printf("Publisher all done.\n");
/**
 * Program entry point: selects the test role by comparing argv[1] with
 * "subscriber" and "publisher".
 *
 * NOTE(review): the branch bodies, any argc check, and the end of this
 * function are not visible in this excerpt -- presumably the branches call
 * subscriber() and publisher(); confirm against the full file.
 */
155 int main(int argc, char** argv)
160 if (strcmp(argv[1], "subscriber") == 0) {
162 } else if (strcmp(argv[1], "publisher") == 0) {
// NOTE(review): unlike the other printf() messages in this file, this one
// has no trailing newline.
165 printf("Bad arguments (Valid choices are subscriber/publisher).");