about summary refs log tree commit homepage
path: root/http.c
diff options
context:
space:
mode:
Diffstat (limited to 'http.c')
-rw-r--r--http.c106
1 files changed, 75 insertions, 31 deletions
diff --git a/http.c b/http.c
index 2f0b435..e440d2b 100644
--- a/http.c
+++ b/http.c
@@ -59,14 +59,24 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len)
         http->_p.buf_off = 0;
 }
 
-static void
-http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len)
+static bool
+http_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
 {
         struct mog_http *http = &mfd->as.http;
+        struct mog_dev *dev;
 
         assert(http->wbuf == NULL &&
                "processing client with buffered data");
 
+        dev = mog_dev_for(http->svc, http->_p.mog_devid, false);
+        if (dev && !mog_ioq_ready(&dev->ioq, mfd)) {
+                if (!http->rbuf)
+                        http->rbuf = mog_rbuf_detach(rbuf);
+                http->rbuf->rsize = buf_len;
+                return false;
+        }
+
         switch (http->_p.http_method) {
         case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method");
         case MOG_HTTP_METHOD_GET: mog_http_get_open(mfd, buf); break;
@@ -75,6 +85,8 @@ http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len)
         case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(mfd, buf); break;
         case MOG_HTTP_METHOD_PUT: mog_http_put(mfd, buf, buf_len); break;
         }
+
+        return true;
 }
 
 MOG_NOINLINE static void http_close(struct mog_fd *mfd)
@@ -144,9 +156,18 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http)
         return MOG_NEXT_CLOSE;
 }
 
-static enum mog_next http_forward_in_progress(struct mog_fd *mfd)
+static enum mog_next http_forward_in_progress(struct mog_fd *mfd, bool needq)
 {
-        enum mog_http_method method = mfd->as.http._p.http_method;
+        struct mog_http *http = &mfd->as.http;
+        enum mog_http_method method = http->_p.http_method;
+        struct mog_dev *dev;
+
+        if (needq) {
+                dev = mog_dev_for(http->svc, http->_p.mog_devid, false);
+                if (dev && !mog_ioq_ready(&dev->ioq, mfd))
+                        /* no need to setup/stash rbuf, it's been done */
+                        return MOG_NEXT_IGNORE;
+        }
 
         if (method == MOG_HTTP_METHOD_GET)
                 return mog_http_get_in_progress(mfd);
@@ -156,7 +177,35 @@ static enum mog_next http_forward_in_progress(struct mog_fd *mfd)
         return mog_http_put_in_progress(mfd);
 }
 
-static enum mog_next http_queue_step(struct mog_fd *mfd)
+static enum mog_next http_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
+{
+        struct mog_http *http = &mfd->as.http;
+
+        if (!http_process_client(mfd, rbuf, buf, buf_len))
+                return MOG_NEXT_IGNORE; /* in ioq */
+        if (http->wbuf == MOG_WR_ERROR)
+                return MOG_NEXT_CLOSE;
+        if (http->wbuf) {
+                http_defer_rbuf(http, rbuf, buf_len);
+                return MOG_NEXT_WAIT_WR;
+        } else if (http->forward) {
+                http_defer_rbuf(http, rbuf, buf_len);
+                return http_forward_in_progress(mfd, false);
+        } else if (!http->_p.persistent) {
+                return MOG_NEXT_CLOSE;
+        } else {
+                /* pipelined request */
+                if (buf_len)
+                        TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true));
+
+                http_defer_rbuf(http, rbuf, buf_len);
+                mog_http_reset(http);
+        }
+        return MOG_NEXT_ACTIVE;
+}
+
+static enum mog_next __http_queue_step(struct mog_fd *mfd)
 {
         struct mog_http *http = &mfd->as.http;
         struct mog_rbuf *rbuf;
@@ -169,22 +218,28 @@ static enum mog_next http_queue_step(struct mog_fd *mfd)
         assert(mfd->fd >= 0 && "http fd is invalid");
 
         if (http->wbuf) return http_wbuf_in_progress(http);
-        if (http->forward) return http_forward_in_progress(mfd);
+        if (http->forward) return http_forward_in_progress(mfd, true);
 
         /* we may have pipelined data in http->rbuf */
         rbuf = http->rbuf ? http->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE);
         buf = rbuf->rptr;
         off = http->_p.buf_off;
-        assert(off < rbuf->rcapa && "offset is too big");
+        assert(off >= 0 && "offset is negative");
+        assert(off <= rbuf->rcapa && "offset is too big");
+
         if (http->rbuf) {
-                /* request got pipelined, resuming now */
                 buf_len = http->rbuf->rsize;
+                if (mog_ioq_unblock(mfd))
+                        return http_run(mfd, rbuf, buf, buf_len);
+                /* request got pipelined, resuming now */
+                assert(off < rbuf->rcapa && "offset is too big");
                 assert(http->_p.buf_off <= buf_len
                         && "bad offset from pipelining");
                 assert(buf_len <= http->rbuf->rcapa && "bad rsize stashed");
                 if (http->_p.buf_off < buf_len)
                         goto parse;
         }
+        assert(off < rbuf->rcapa && "offset is too big");
 reread:
         r = read(mfd->fd, buf + off, rbuf->rcapa - off);
         if (r > 0) {
@@ -211,26 +266,7 @@ parse:
                         off = http->_p.buf_off;
                         goto reread;
                 case MOG_PARSER_DONE:
-                        http_process_client(mfd, buf, buf_len);
-                        if (http->wbuf == MOG_WR_ERROR)
-                                return MOG_NEXT_CLOSE;
-                        if (http->wbuf) {
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                return MOG_NEXT_WAIT_WR;
-                        } else if (http->forward) {
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                return http_forward_in_progress(mfd);
-                        } else if (!http->_p.persistent) {
-                                return MOG_NEXT_CLOSE;
-                        } else {
-                                /* pipelined request */
-                                if (buf_len)
-                                        TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true));
-
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                mog_http_reset(http);
-                        }
-                        return MOG_NEXT_ACTIVE;
+                        return http_run(mfd, rbuf, buf, buf_len);
                 }
         } else if (r == 0) { /* client shut down */
                 TRACE(CMOGSTORED_HTTP_RDCLOSE(buf_len));
@@ -269,14 +305,22 @@ err400:
         return MOG_NEXT_CLOSE;
 }
 
+static enum mog_next http_queue_step(struct mog_fd *mfd)
+{
+        enum mog_next next = __http_queue_step(mfd);
+
+        /* enqueue any pending waiters before we become enqueued ourselves */
+        mog_ioq_next(NULL);
+
+        return next;
+}
+
 enum mog_next mog_http_queue_step(struct mog_fd *mfd)
 {
         enum mog_next rv = http_queue_step(mfd);
 
         if (rv == MOG_NEXT_CLOSE)
                 http_close(mfd);
-        assert(rv != MOG_NEXT_IGNORE &&
-               "refusing to put HTTP client into ignore state");
         return rv;
 }
 
@@ -301,7 +345,7 @@ void mog_http_quit_step(struct mog_fd *mfd)
         case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
         case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
         case MOG_NEXT_IGNORE:
-                assert(0 && "refused to put HTTP client into ignore state");
+                return;
         }
 }