about summary refs log tree commit homepage
path: root/mgmt.c
diff options
context:
space:
mode:
Diffstat (limited to 'mgmt.c')
-rw-r--r--mgmt.c70
1 files changed, 52 insertions, 18 deletions
diff --git a/mgmt.c b/mgmt.c
index 1aacb41..6b8d6a6 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -149,14 +149,33 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
         mgmt->buf_off = 0;
 }
 
-static void mgmt_process_client(struct mog_fd *mfd, char *buf)
+static bool
+mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
 {
         struct mog_mgmt *mgmt = &mfd->as.mgmt;
+        struct mog_dev *dev;
+        struct mog_ioq *ioq;
+
+        /* we handle non-filesystem-using commands inline in the parser */
+        if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE)
+                return true;
+
+        dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);
+
+        if (dev) {
+                ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq;
+                if (!mog_ioq_ready(ioq, mfd)) {
+                        if (!mgmt->rbuf)
+                                mgmt->rbuf = mog_rbuf_detach(rbuf);
+                        mgmt->rbuf->rsize = buf_len;
+                        return false;
+                }
+        }
 
         switch (mgmt->mgmt_method) {
         case MOG_MGMT_METHOD_NONE:
-                /* we handle filesystem-using commands inline in the parser */
-                return;
+                assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE");
         case MOG_MGMT_METHOD_SIZE:
                 mog_mgmt_fn_size(mgmt, buf);
                 break;
@@ -165,6 +184,26 @@ static void mgmt_process_client(struct mog_fd *mfd, char *buf)
                 break;
         }
         mgmt->mgmt_method = MOG_MGMT_METHOD_NONE;
+        return true;
+}
+
+static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
+{
+        struct mog_mgmt *mgmt = &mfd->as.mgmt;
+
+        if (!mgmt_process_client(mfd, rbuf, buf, buf_len))
+                return MOG_NEXT_IGNORE; /* in ioq */
+        if (mgmt->wbuf == MOG_WR_ERROR)
+                return MOG_NEXT_CLOSE;
+        if (mgmt->forward == MOG_IOSTAT)
+                return mgmt_iostat_forever(mgmt);
+
+        /* stash unread portion in a new buffer */
+        mgmt_defer_rbuf(mgmt, rbuf, buf_len);
+        mog_mgmt_reset_parser(mgmt);
+        assert(mgmt->wbuf != MOG_WR_ERROR);
+        return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
 }
 
 /*
@@ -191,12 +230,17 @@ static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
         buf = rbuf->rptr;
         off = mgmt->buf_off;
         assert(off >= 0 && "offset is negative");
-        assert(off < rbuf->rcapa && "offset is too big");
-        if (mgmt->rbuf && off == 0) {
-                /* request got "pipelined", resuming now */
+        assert(off <= rbuf->rcapa && "offset is too big");
+
+        if (mgmt->rbuf) {
                 buf_len = mgmt->rbuf->rsize;
-                goto parse;
+                if (mog_ioq_unblock(mfd))
+                        return mgmt_run(mfd, rbuf, buf, buf_len);
+                assert(off < rbuf->rcapa && "offset is too big");
+                if (off == 0) /* request got "pipelined", resuming now */
+                        goto parse;
         }
+        assert(off < rbuf->rcapa && "offset is too big");
 reread:
         r = read(mfd->fd, buf + off, rbuf->rcapa - off);
         if (r > 0) {
@@ -222,17 +266,7 @@ parse:
                         off = mgmt->buf_off;
                         goto reread;
                 case MOG_PARSER_DONE:
-                        mgmt_process_client(mfd, buf);
-                        if (mgmt->wbuf == MOG_WR_ERROR)
-                                return MOG_NEXT_CLOSE;
-                        if (mgmt->forward == MOG_IOSTAT)
-                                return mgmt_iostat_forever(mgmt);
-
-                        /* stash unread portion in a new buffer */
-                        mgmt_defer_rbuf(mgmt, rbuf, buf_len);
-                        mog_mgmt_reset_parser(mgmt);
-                        assert(mgmt->wbuf != MOG_WR_ERROR);
-                        return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
+                        return mgmt_run(mfd, rbuf, buf, buf_len);
                 }
         } else if (r == 0) { /* client shut down */
                 TRACE(CMOGSTORED_MGMT_RDCLOSE(mfd, buf_len));