744251ac970672e0c977bc6f7aecbe7fe5bfe5fb
[barrelfish] / usr / tests / dist2test / d2pubsub.c
1 /**
2  * \file
3  * \brief Tests for octopus publish/subscribe API
4  */
5
6 /*
7  * Copyright (c) 2011, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include <stdlib.h>
16 #include <string.h>
17 #include <stdio.h>
18
19 #include <octopus/octopus.h>
20
21 #include "common.h"
22
23 static const char* barrier_name = "d2pubsub_test";
24 static struct thread_sem ts;
25
26 static void message_handler(oct_mode_t mode, char* record, void* st)
27 {
28     size_t* received = (size_t*) st;
29
30     if (mode & DIST_ON_PUBLISH) {
31         static const char* receive_order[] =
32         { "msg_2", "msg_4", "msg_5", "msg_5", "msg_6", "msg_7" };
33         char* name = NULL;
34
35         debug_printf("Message: %s received: %lu\n", record, *received);
36
37         errval_t err = oct_read(record, "%s", &name);
38         ASSERT_ERR_OK(err);
39         ASSERT_STRING(receive_order[*received], name);
40
41
42         free(name);
43         free(record);
44     }
45     else if (mode & DIST_REMOVED) {
46         debug_printf("DIST_REMOVED set...\n");
47     }
48
49     (*received)++;
50 }
51
52 static void subscriber(void)
53 {
54     errval_t err;
55     subscription_t id1 = 0;
56     subscription_t id2 = 0;
57     subscription_t id3 = 0;
58     subscription_t id4 = 0;
59     size_t received = 0;
60     char* barrier_record = NULL;
61
62     thread_sem_init(&ts, 0);
63
64     err = oct_subscribe(message_handler, &received, &id1, "111 [] attr: 10 }");
65     ASSERT_ERR(err, OCT_ERR_PARSER_FAIL);
66
67     err = oct_subscribe(message_handler, &received, &id1,
68             "_ { fl: 1.01, attr: 10 }");
69     ASSERT_ERR_OK(err);
70     debug_printf("id is: %lu\n", id1);
71
72     char* str = "test.txt";
73     err = oct_subscribe(message_handler, &received, &id2, "_ { str: r'%s' }",
74             str);
75     ASSERT_ERR_OK(err);
76     debug_printf("id is: %lu\n", id2);
77
78     err = oct_subscribe(message_handler, &received, &id3, "_ { age > %d }",
79             9);
80     ASSERT_ERR_OK(err);
81     debug_printf("id is: %lu\n", id3);
82
83     err = oct_subscribe(message_handler, &received, &id4,
84             "r'^msg_(6|7)$'");
85     ASSERT_ERR_OK(err);
86     debug_printf("id is: %lu\n", id4);
87
88     // Synchronize with publisher
89     err = oct_barrier_enter(barrier_name, &barrier_record, 2);
90     if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
91     assert(err_is_ok(err));
92
93     // Wait until all messages received
94     while(received != 6) {
95         messages_wait_and_handle_next();
96     }
97
98     // Unsubscribe message handlers
99     err = oct_unsubscribe(id1);
100     ASSERT_ERR_OK(err);
101     err = oct_unsubscribe(id2);
102     ASSERT_ERR_OK(err);
103     err = oct_unsubscribe(id3);
104     ASSERT_ERR_OK(err);
105     err = oct_unsubscribe(id4);
106     ASSERT_ERR_OK(err);
107
108     while(received != 10) {
109         messages_wait_and_handle_next();
110     }
111
112     oct_barrier_leave(barrier_record);
113     free(barrier_record);
114
115     printf("Subscriber all done.\n");
116 }
117
118 static void publisher(void)
119 {
120     errval_t err;
121     char* barrier_record = NULL;
122
123     // Synchronize with subscriber
124     err = oct_barrier_enter(barrier_name, &barrier_record, 2);
125     if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
126     assert(err_is_ok(err));
127
128     err = oct_publish("msg_1 { age: %d }", 9);
129     ASSERT_ERR_OK(err);
130
131     err = oct_publish("msg_2 { age: %d }", 10);
132     ASSERT_ERR_OK(err);
133
134     err = oct_publish("msg_3 { str: %d, age: '%d' }", 123, 8);
135     ASSERT_ERR_OK(err);
136
137     err = oct_publish("msg_4 { str: 'test.txt' }");
138     ASSERT_ERR_OK(err);
139
140     err = oct_publish("msg_5 { str: 'test.txt', attr: 10, fl: 1.01 }");
141     ASSERT_ERR_OK(err);
142
143     err = oct_publish("msg_6 { type: 'test', pattern: '123123' }");
144     ASSERT_ERR_OK(err);
145
146     err = oct_publish("msg_7 { type: 'test' }");
147     ASSERT_ERR_OK(err);
148
149     oct_barrier_leave(barrier_record);
150     free(barrier_record);
151
152     printf("Publisher all done.\n");
153 }
154
155 int main(int argc, char** argv)
156 {
157     oct_init();
158     assert(argc >= 2);
159
160     if (strcmp(argv[1], "subscriber") == 0) {
161         subscriber();
162     } else if (strcmp(argv[1], "publisher") == 0) {
163         publisher();
164     } else {
165         printf("Bad arguments (Valid choices are subscriber/publisher).");
166     }
167
168     return EXIT_SUCCESS;
169 }
170