diff options
Diffstat (limited to 'http.c')
-rw-r--r-- | http.c | 106 |
1 files changed, 75 insertions, 31 deletions
@@ -59,14 +59,24 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len) http->_p.buf_off = 0; } -static void -http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len) +static bool +http_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) { struct mog_http *http = &mfd->as.http; + struct mog_dev *dev; assert(http->wbuf == NULL && "processing client with buffered data"); + dev = mog_dev_for(http->svc, http->_p.mog_devid, false); + if (dev && !mog_ioq_ready(&dev->ioq, mfd)) { + if (!http->rbuf) + http->rbuf = mog_rbuf_detach(rbuf); + http->rbuf->rsize = buf_len; + return false; + } + switch (http->_p.http_method) { case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method"); case MOG_HTTP_METHOD_GET: mog_http_get_open(mfd, buf); break; @@ -75,6 +85,8 @@ http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len) case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(mfd, buf); break; case MOG_HTTP_METHOD_PUT: mog_http_put(mfd, buf, buf_len); break; } + + return true; } MOG_NOINLINE static void http_close(struct mog_fd *mfd) @@ -144,9 +156,18 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http) return MOG_NEXT_CLOSE; } -static enum mog_next http_forward_in_progress(struct mog_fd *mfd) +static enum mog_next http_forward_in_progress(struct mog_fd *mfd, bool needq) { - enum mog_http_method method = mfd->as.http._p.http_method; + struct mog_http *http = &mfd->as.http; + enum mog_http_method method = http->_p.http_method; + struct mog_dev *dev; + + if (needq) { + dev = mog_dev_for(http->svc, http->_p.mog_devid, false); + if (dev && !mog_ioq_ready(&dev->ioq, mfd)) + /* no need to setup/stash rbuf, it's been done */ + return MOG_NEXT_IGNORE; + } if (method == MOG_HTTP_METHOD_GET) return mog_http_get_in_progress(mfd); @@ -156,7 +177,35 @@ static enum mog_next http_forward_in_progress(struct mog_fd *mfd) return mog_http_put_in_progress(mfd); } -static enum mog_next http_queue_step(struct mog_fd *mfd) +static enum mog_next http_run(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) +{ + struct mog_http *http = &mfd->as.http; + + if (!http_process_client(mfd, rbuf, buf, buf_len)) + return MOG_NEXT_IGNORE; /* in ioq */ + if (http->wbuf == MOG_WR_ERROR) + return MOG_NEXT_CLOSE; + if (http->wbuf) { + http_defer_rbuf(http, rbuf, buf_len); + return MOG_NEXT_WAIT_WR; + } else if (http->forward) { + http_defer_rbuf(http, rbuf, buf_len); + return http_forward_in_progress(mfd, false); + } else if (!http->_p.persistent) { + return MOG_NEXT_CLOSE; + } else { + /* pipelined request */ + if (buf_len) + TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true)); + + http_defer_rbuf(http, rbuf, buf_len); + mog_http_reset(http); + } + return MOG_NEXT_ACTIVE; +} + +static enum mog_next __http_queue_step(struct mog_fd *mfd) { struct mog_http *http = &mfd->as.http; struct mog_rbuf *rbuf; @@ -169,22 +218,28 @@ static enum mog_next http_queue_step(struct mog_fd *mfd) assert(mfd->fd >= 0 && "http fd is invalid"); if (http->wbuf) return http_wbuf_in_progress(http); - if (http->forward) return http_forward_in_progress(mfd); + if (http->forward) return http_forward_in_progress(mfd, true); /* we may have pipelined data in http->rbuf */ rbuf = http->rbuf ? http->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE); buf = rbuf->rptr; off = http->_p.buf_off; - assert(off < rbuf->rcapa && "offset is too big"); + assert(off >= 0 && "offset is negative"); + assert(off <= rbuf->rcapa && "offset is too big"); + if (http->rbuf) { - /* request got pipelined, resuming now */ buf_len = http->rbuf->rsize; + if (mog_ioq_unblock(mfd)) + return http_run(mfd, rbuf, buf, buf_len); + /* request got pipelined, resuming now */ + assert(off < rbuf->rcapa && "offset is too big"); assert(http->_p.buf_off <= buf_len && "bad offset from pipelining"); assert(buf_len <= http->rbuf->rcapa && "bad rsize stashed"); if (http->_p.buf_off < buf_len) goto parse; } + assert(off < rbuf->rcapa && "offset is too big"); reread: r = read(mfd->fd, buf + off, rbuf->rcapa - off); if (r > 0) { @@ -211,26 +266,7 @@ parse: off = http->_p.buf_off; goto reread; case MOG_PARSER_DONE: - http_process_client(mfd, buf, buf_len); - if (http->wbuf == MOG_WR_ERROR) - return MOG_NEXT_CLOSE; - if (http->wbuf) { - http_defer_rbuf(http, rbuf, buf_len); - return MOG_NEXT_WAIT_WR; - } else if (http->forward) { - http_defer_rbuf(http, rbuf, buf_len); - return http_forward_in_progress(mfd); - } else if (!http->_p.persistent) { - return MOG_NEXT_CLOSE; - } else { - /* pipelined request */ - if (buf_len) - TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true)); - - http_defer_rbuf(http, rbuf, buf_len); - mog_http_reset(http); - } - return MOG_NEXT_ACTIVE; + return http_run(mfd, rbuf, buf, buf_len); } } else if (r == 0) { /* client shut down */ TRACE(CMOGSTORED_HTTP_RDCLOSE(buf_len)); @@ -269,14 +305,22 @@ err400: return MOG_NEXT_CLOSE; } +static enum mog_next http_queue_step(struct mog_fd *mfd) +{ + enum mog_next next = __http_queue_step(mfd); + + /* enqueue any pending waiters before we become enqueued ourselves */ + mog_ioq_next(NULL); + + return next; +} + enum mog_next mog_http_queue_step(struct mog_fd *mfd) { enum mog_next rv = http_queue_step(mfd); if (rv == MOG_NEXT_CLOSE) http_close(mfd); - assert(rv != MOG_NEXT_IGNORE && - "refusing to put HTTP client into ignore state"); return rv; } @@ -301,7 +345,7 @@ void mog_http_quit_step(struct mog_fd *mfd) case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return; case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return; case MOG_NEXT_IGNORE: - assert(0 && "refused to put HTTP client into ignore state"); + return; } } |