Add extra layer of queuing above the Flounder UMP one.
authorRazvan Damachi <razvan.damachi@gmail.com>
Thu, 13 Jul 2017 14:16:22 +0000 (16:16 +0200)
committerSimon Gerber <simon.gerber@inf.ethz.ch>
Thu, 31 Aug 2017 14:35:09 +0000 (16:35 +0200)
The process manager now handles its own high-level queues, over the ones
provided by Flounder for UMP transmission. This is meant so that when many
clients have requests that the process manager needs to forward to the same
spawnd instance, "register_send" does not return an "already registered" error.

The queuing implementation follows the one in usr/monitor/queue.c.

Signed-off-by: Razvan Damachi <razvan.damachi@gmail.com>

usr/proc_mgmt/service.c
usr/proc_mgmt/spawnd_state.c
usr/proc_mgmt/spawnd_state.h

index fff5189..40a8416 100644 (file)
@@ -72,9 +72,9 @@ static void cleanup_reply_handler(struct spawn_binding *b,
                                   struct capref domain_cap,
                                   errval_t cleanup_err);
 
-static void spawn_request_sender(void *arg)
+static bool spawn_request_sender(struct msg_queue_elem *m)
 {
-    struct pending_spawn *spawn = (struct pending_spawn*) arg;
+    struct pending_spawn *spawn = (struct pending_spawn*) m->st;
 
     errval_t err;
     bool with_caps = !(capref_is_null(spawn->inheritcn_cap) &&
@@ -100,171 +100,112 @@ static void spawn_request_sender(void *arg)
                                               spawn->envbuf, spawn->envbytes,
                                               spawn->flags);
     }
-    if (err_is_ok(err)) {
-        free(spawn);
-    } else {
+
+    if (err_is_fail(err)) {
         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-            err = spawn->b->register_send(spawn->b, spawn->b->waitset,
-                                          MKCONT(spawn_request_sender, arg));
-            if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for spawn request");
-                pending_clients_release(spawn->domain_cap,
-                                        with_caps ? ClientType_SpawnWithCaps
-                                                  : ClientType_Spawn,
-                                        NULL);
-                free(spawn);
-            }
+            return false;
         } else {
-            DEBUG_ERR(err, "sending spawn request");
-            pending_clients_release(spawn->domain_cap,
-                                    with_caps ? ClientType_SpawnWithCaps
-                                              : ClientType_Spawn,
-                                    NULL);
-            free(spawn);
+            USER_PANIC_ERR(err, "sending spawn request");
         }
     }
+
+    free(spawn);
+    free(m);
+
+    return true;
 }
 
-static void span_request_sender(void *arg)
+static bool span_request_sender(struct msg_queue_elem *m)
 {
-    struct pending_span *span = (struct pending_span*) arg;
+    struct pending_span *span = (struct pending_span*) m->st;
 
     errval_t err;
     span->b->rx_vtbl.span_reply = span_reply_handler;
     err = span->b->tx_vtbl.span_request(span->b, NOP_CONT, cap_procmng,
                                         span->domain_cap, span->vroot,
                                         span->dispframe);
-    if (err_is_ok(err)) {
-        err = domain_span(span->domain_cap, span->core_id);
-        if (err_is_fail(err)) {
-            DEBUG_ERR(err, "failed domain_span to core %u\n", span->core_id);
-        }
-        free(span);
-    } else {
+
+    if (err_is_fail(err)) {
         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-            err = span->b->register_send(span->b, span->b->waitset,
-                                         MKCONT(span_request_sender, arg));
-            if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for span request");
-                pending_clients_release(span->domain_cap, ClientType_Span,
-                                        NULL);
-                free(span);
-            }
+            return false;
         } else {
-            DEBUG_ERR(err, "sending span request");
-            pending_clients_release(span->domain_cap, ClientType_Span, NULL);
-            free(span);
+            USER_PANIC_ERR(err, "sending span request");
         }
     }
+
+    free(span);
+    free(m);
+
+    return true;
 }
 
-static void kill_request_sender(void *arg)
+static bool kill_request_sender(struct msg_queue_elem *m)
 {
-    struct pending_kill_exit_cleanup *kill = (struct pending_kill_exit_cleanup*) arg;
+    struct pending_kill_exit_cleanup *kill = (struct pending_kill_exit_cleanup*) m->st;
 
     errval_t err;
     kill->sb->rx_vtbl.kill_reply = kill_reply_handler;
     err = kill->sb->tx_vtbl.kill_request(kill->sb, NOP_CONT, cap_procmng,
                                         kill->domain_cap);
-    if (err_is_ok(err)) {
-        free(kill);
-    } else {
-        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-            err = kill->sb->register_send(kill->sb, kill->sb->waitset,
-                                         MKCONT(kill_request_sender, arg));
-            if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for kill request");
-
-                struct pending_client *cl;
-                err = pending_clients_release_one(kill->domain_cap,
-                                                  ClientType_Kill,
-                                                  kill->pmb, &cl);
-                if (err_is_ok(err)) {
-                    while (cl != NULL) {
-                        struct pending_client *tmp = cl;
-                        cl = cl->next;
-                        free(tmp);
-                    }
-                }
 
-                free(kill);
-            }
+    if (err_is_fail(err)) {
+        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
+            return false;
         } else {
-            DEBUG_ERR(err, "sending kill request");
-            
-            struct pending_client *cl;
-            err = pending_clients_release_one(kill->domain_cap,
-                                              ClientType_Kill,
-                                              kill->pmb, &cl);
-            if (err_is_ok(err)) {
-                while (cl != NULL) {
-                    struct pending_client *tmp = cl;
-                    cl = cl->next;
-                    free(tmp);
-                }
-            }
-
-            free(kill);
+            USER_PANIC_ERR(err, "sending kill request");
         }
     }
+
+    free(kill);
+    free(m);
+
+    return true;
 }
 
-static void exit_request_sender(void *arg)
+static bool exit_request_sender(struct msg_queue_elem *m)
 {
-    struct pending_kill_exit_cleanup *exit = (struct pending_kill_exit_cleanup*) arg;
+    struct pending_kill_exit_cleanup *exit = (struct pending_kill_exit_cleanup*) m->st;
 
     errval_t err;
     exit->sb->rx_vtbl.exit_reply = exit_reply_handler;
     err = exit->sb->tx_vtbl.exit_request(exit->sb, NOP_CONT, cap_procmng,
                                         exit->domain_cap);
-    if (err_is_ok(err)) {
-        free(exit);
-    } else {
+
+    if (err_is_fail(err)) {
         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-            err = exit->sb->register_send(exit->sb, exit->sb->waitset,
-                                         MKCONT(exit_request_sender, arg));
-            if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for exit request");
-                err = pending_clients_release(exit->domain_cap, ClientType_Exit,
-                                              NULL);
-                free(exit);
-            }
+            return false;
         } else {
-            DEBUG_ERR(err, "sending exit request");
-            err = pending_clients_release(exit->domain_cap, ClientType_Exit,
-                                          NULL);
-            free(exit);
+            USER_PANIC_ERR(err, "sending exit request");
         }
     }
+
+    free(exit);
+    free(m);
+
+    return true;
 }
 
-static void cleanup_request_sender(void *arg)
+static bool cleanup_request_sender(struct msg_queue_elem *m)
 {
-    struct pending_kill_exit_cleanup *cleanup = (struct pending_kill_exit_cleanup*) arg;
+    struct pending_kill_exit_cleanup *cleanup = (struct pending_kill_exit_cleanup*) m->st;
 
     errval_t err;
     cleanup->sb->rx_vtbl.cleanup_reply = cleanup_reply_handler;
     err = cleanup->sb->tx_vtbl.cleanup_request(cleanup->sb, NOP_CONT, cap_procmng,
                                               cleanup->domain_cap);
-    if (err_is_ok(err)) {
-        free(cleanup);
-    } else {
+
+    if (err_is_fail(err)) {
         if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
-            err = cleanup->sb->register_send(cleanup->sb, cleanup->sb->waitset,
-                                            MKCONT(cleanup_request_sender, arg));
-            if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for cleanup request");
-                pending_clients_release(cleanup->domain_cap, ClientType_Cleanup,
-                                        NULL);
-                free(cleanup);
-            }
+            return false;
         } else {
-            DEBUG_ERR(err, "sending cleanup request");
-            pending_clients_release(cleanup->domain_cap, ClientType_Cleanup,
-                                    NULL);
-            free(cleanup);
+            USER_PANIC_ERR(err, "sending cleanup request");
         }
     }
+
+    free(cleanup);
+    free(m);
+
+    return true;
 }
 
 static void spawn_reply_handler(struct spawn_binding *b,
@@ -498,12 +439,17 @@ static void kill_reply_handler(struct spawn_binding *b,
             cleanup->sb = spb;
             cleanup->domain_cap = domain_cap;
 
-            spb->rx_vtbl.cleanup_reply = cleanup_reply_handler;
-            err = spb->register_send(spb, spb->waitset,
-                                     MKCONT(cleanup_request_sender, cleanup));
+            struct msg_queue_elem *msg = (struct msg_queue_elem*) malloc(
+                    sizeof(struct msg_queue_elem));
+            msg->st = cleanup;
+            msg->cont = cleanup_request_sender;
+
+            err = spawnd_state_enqueue_send(entry->spawnds[i], msg);
+
             if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for cleanup request");
+                DEBUG_ERR(err, "enqueuing cleanup request");
                 free(cleanup);
+                free(msg);
             }
         }
     } else {
@@ -583,12 +529,17 @@ static void exit_reply_handler(struct spawn_binding *b,
             cleanup->sb = spb;
             cleanup->domain_cap = domain_cap;
 
-            spb->rx_vtbl.cleanup_reply = cleanup_reply_handler;
-            err = spb->register_send(spb, spb->waitset,
-                                     MKCONT(cleanup_request_sender, cleanup));
+            struct msg_queue_elem *msg = (struct msg_queue_elem*) malloc(
+                    sizeof(struct msg_queue_elem));
+            msg->st = cleanup;
+            msg->cont = cleanup_request_sender;
+
+            err = spawnd_state_enqueue_send(entry->spawnds[i], msg);
+
             if (err_is_fail(err)) {
-                DEBUG_ERR(err, "registering for cleanup request");
+                DEBUG_ERR(err, "enqueuing cleanup request");
                 free(cleanup);
+                free(msg);
             }
         }
     } else {
@@ -612,9 +563,9 @@ static errval_t spawn_handler_common(struct proc_mgmt_binding *b,
         return PROC_MGMT_ERR_INVALID_SPAWND;
     }
 
-    struct spawnd_state *state = spawnd_state_get(core_id);
-    assert(state != NULL);
-    struct spawn_binding *cl = state->b;
+    struct spawnd_state *spawnd = spawnd_state_get(core_id);
+    assert(spawnd != NULL);
+    struct spawn_binding *cl = spawnd->b;
     assert(cl != NULL);
 
     struct capref domain_cap;
@@ -649,11 +600,17 @@ static errval_t spawn_handler_common(struct proc_mgmt_binding *b,
     spawn->argcn_cap = argcn_cap;
     spawn->flags = flags;
 
-    err = cl->register_send(cl, cl->waitset,
-                            MKCONT(spawn_request_sender, spawn));
+    struct msg_queue_elem *msg = (struct msg_queue_elem*) malloc(
+            sizeof(struct msg_queue_elem));
+    msg->st = spawn;
+    msg->cont = spawn_request_sender;
+
+    err = spawnd_state_enqueue_send(spawnd, msg);
+
     if (err_is_fail(err)) {
-        DEBUG_ERR(err, "registering for spawn request");
+        DEBUG_ERR(err, "enqueuing spawn request");
         free(spawn);
+        free(msg);
     }
 
     return SYS_ERR_OK;
@@ -717,9 +674,9 @@ static void span_handler(struct proc_mgmt_binding *b, struct capref domain_cap,
         goto respond_with_err;
     }
 
-    struct spawnd_state *state = spawnd_state_get(core_id);
-    assert(state != NULL);
-    struct spawn_binding *cl = state->b;
+    struct spawnd_state *spawnd = spawnd_state_get(core_id);
+    assert(spawnd != NULL);
+    struct spawn_binding *cl = spawnd->b;
     assert(cl != NULL);
 
     err = pending_clients_add(domain_cap, b, ClientType_Span, core_id);
@@ -735,11 +692,17 @@ static void span_handler(struct proc_mgmt_binding *b, struct capref domain_cap,
     span->vroot = vroot;
     span->dispframe = dispframe;
 
-    err = cl->register_send(cl, cl->waitset,
-                            MKCONT(span_request_sender, span));
+    struct msg_queue_elem *msg = (struct msg_queue_elem*) malloc(
+            sizeof(struct msg_queue_elem));
+    msg->st = span;
+    msg->cont = span_request_sender;
+
+    err = spawnd_state_enqueue_send(spawnd, msg);
+
     if (err_is_fail(err)) {
-        DEBUG_ERR(err, "registering for span request");
+        DEBUG_ERR(err, "enqueuing span request");
         free(span);
+        free(msg);
     }
 
 respond_with_err:
@@ -780,25 +743,36 @@ static errval_t kill_handler_common(struct proc_mgmt_binding *b,
         cmd->domain_cap = domain_cap;
         cmd->sb = spb;
 
+        struct msg_queue_elem *msg = (struct msg_queue_elem*) malloc(
+                sizeof(struct msg_queue_elem));
+        msg->st = cmd;
+
         switch (type) {
             case ClientType_Kill:
                 cmd->pmb = b;
-                err = spb->register_send(spb, spb->waitset,
-                                         MKCONT(kill_request_sender, cmd));
+                msg->cont = kill_request_sender;
+
+                err = spawnd_state_enqueue_send(entry->spawnds[i], msg);
+
                 if (err_is_fail(err)) {
-                    DEBUG_ERR(err, "registering for kill request");
+                    DEBUG_ERR(err, "enqueuing kill request");
                     free(cmd);
+                    free(msg);
                 }
                 break;
 
             case ClientType_Exit:
-                err = spb->register_send(spb, spb->waitset,
-                                         MKCONT(exit_request_sender, cmd));
+                msg->cont = exit_request_sender;
+
+                err = spawnd_state_enqueue_send(entry->spawnds[i], msg);
+
                 if (err_is_fail(err)) {
-                    DEBUG_ERR(err, "registering for exit request");
+                    DEBUG_ERR(err, "enqueuing exit request");
                     free(cmd);
+                    free(msg);
                 }
                 break;
+
             default:
                 USER_PANIC("invalid client type for kill: %u\n", type);
         }
index 75f3022..219a000 100644 (file)
@@ -25,6 +25,8 @@ errval_t spawnd_state_alloc(coreid_t core_id, struct spawn_binding *b)
 
     spawnds[core_id]->b = b;
     spawnds[core_id]->core_id = core_id;
+    spawnds[core_id]->queue.head = NULL;
+    spawnds[core_id]->queue.tail = NULL;
 
     return SYS_ERR_OK;
 }
@@ -45,3 +47,112 @@ inline struct spawnd_state *spawnd_state_get(coreid_t core_id)
 {
     return spawnds[core_id];
 }
+
+/**
+ * \brief Enqueue on a waitset queue.
+ *
+ * \param q    Pointer to queue to enqueue on
+ * \param m    Pointer to element to enqueue
+ *
+ * \return true if queue was empty, false if not.
+ */
+static bool enqueue_send(struct msg_queue *q, struct msg_queue_elem *m)
+{
+    assert(m->next == NULL);
+
+    // Enqueue on the queue
+    if(q->tail != NULL) {
+        q->tail->next = m;
+    } else {
+        assert(q->head == NULL);
+        q->head = m;
+    }
+    q->tail = m;
+
+    return q->head == q->tail ? true : false;
+}
+
+/**
+ * \brief Dequeues from a waitset queue.
+ *
+ * \param q    Pointer to queue to dequeue from
+ *
+ * \return the newly dequeued element.
+ */
+static struct msg_queue_elem *dequeue_send(struct msg_queue *q)
+{
+    // Queue should have at least one element
+    assert(q->head != NULL && q->tail != NULL);
+
+    struct msg_queue_elem *e = q->head;
+    q->head = e->next;
+    if(q->head == NULL) {
+        q->tail = NULL;
+    }
+
+    return e;
+}
+
+/**
+ * \brief Enqueue an element on a waitset queue IN FRONT.
+ *
+ * \param q    Pointer to queue to enqueue on
+ * \param m    Pointer to element to enqueue
+ *
+ * \return true if queue was empty, false if not.
+ */
+static bool enqueue_send_at_front(struct msg_queue *q, struct msg_queue_elem *m)
+{
+    assert(m->next == NULL);
+    if(q->tail == NULL) {
+        assert(q->head == NULL);
+        q->head = m;
+        q->tail = m;
+    } else {
+        m->next = q->head;
+        q->head = m;
+    }
+    return q->head == q->tail ? true : false;
+}
+
+static void spawnd_send_handler(void *arg)
+{
+    struct spawnd_state *spawnd = (struct spawnd_state*) arg;
+    struct msg_queue *q = &spawnd->queue;
+
+    // Dequeue next element from the queue
+    struct msg_queue_elem *m = (struct msg_queue_elem*) dequeue_send(q);
+
+    assert(m->cont != NULL);
+    if (!m->cont(m)) {
+        // Flounder TX busy, need to re-enqueue message.
+        // TODO(razvan): Re-enqueuing at the front of the queue, to preserve
+        // original message order. Could a different strategy be preferrable?
+        enqueue_send_at_front(q, m);
+    }
+
+    if (q->head != NULL) {
+        // Queue is non-empty, therefore re-register.
+        errval_t err = spawnd->b->register_send(spawnd->b, spawnd->b->waitset,
+                                                MKCONT(spawnd_send_handler,
+                                                       arg));
+        if (err_is_fail(err)) {
+            DEBUG_ERR(err, "regitering for spawnd send");
+            return;
+        }
+    }
+}
+
+errval_t spawnd_state_enqueue_send(struct spawnd_state *spawnd,
+                                   struct msg_queue_elem *msg)
+{
+    msg->next = NULL;
+
+    // If queue was empty, enqueue on waitset
+    if(enqueue_send(&spawnd->queue, msg)) {
+        return spawnd->b->register_send(spawnd->b, spawnd->b->waitset,
+                                        MKCONT(spawnd_send_handler, spawnd));
+    } else {
+        return SYS_ERR_OK;
+    }
+}
\ No newline at end of file
index ad2e845..7fbc39f 100644 (file)
 #include <if/spawn_defs.h>
 #include <barrelfish/barrelfish.h>
 
+struct spawnd_state;
+struct msg_queue_elem;
+typedef bool (*msg_cont_handler_fn)(struct msg_queue_elem*);
+
+struct msg_queue_elem {
+       void *st;
+       msg_cont_handler_fn cont;
+
+    struct msg_queue_elem *next;
+};
+
+struct msg_queue {
+    struct msg_queue_elem *head, *tail;
+};
+
 struct spawnd_state {
     coreid_t core_id;
     struct spawn_binding *b;
+
+    struct msg_queue queue;
 };
 
 errval_t spawnd_state_alloc(coreid_t core_id, struct spawn_binding *b);
@@ -27,4 +44,7 @@ void spawnd_state_free(coreid_t core_id);
 bool spawnd_state_exists(coreid_t core_id);
 struct spawnd_state *spawnd_state_get(coreid_t core_id);
 
+errval_t spawnd_state_enqueue_send(struct spawnd_state *spawnd,
+                                   struct msg_queue_elem *msg);
+
 #endif  // SPAWND_STATE