From 013e903340a75b12523bd795d15fe5f23d725be9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 29 Jun 2013 03:14:54 +0000 Subject: ioq: implement and enable generic I/O queues This will allow us to limit concurrency on a per-device basis with limited impact on HTTP header reading/parsing. This prevents pathological slowness on a single device from bringing down an entire host. This also allows users to more safely run with fewer aio_threads (e.g. 1:1 thread:device mapping) on fast devices with smaller low-level (kernel/hardware) I/O queues. --- cmogstored.h | 5 ++- dev.c | 2 ++ fdmap.c | 1 + http.c | 106 ++++++++++++++++++++++++++++++++++++++++++----------------- ioq.c | 10 ++++++ mgmt.c | 70 +++++++++++++++++++++++++++++---------- test/http.rb | 67 ++++++++++++++++++++++++++++++++++++- 7 files changed, 210 insertions(+), 51 deletions(-) diff --git a/cmogstored.h b/cmogstored.h index bb7455d..7d32817 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -108,6 +108,7 @@ struct mog_wbuf; struct mog_dev { dev_t st_dev; uint32_t devid; + struct mog_ioq ioq; struct mog_ioq fsckq; }; @@ -302,7 +303,8 @@ enum mog_fd_type { /* fdmap.c */ struct mog_fd { - enum mog_fd_type fd_type; + enum mog_fd_type fd_type:16; + uint16_t ioq_blocked; int fd; pthread_spinlock_t expiring; union { @@ -628,3 +630,4 @@ void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val); bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK; void mog_ioq_next(struct mog_ioq *); void mog_ioq_destroy(struct mog_ioq *); +bool mog_ioq_unblock(struct mog_fd *); diff --git a/dev.c b/dev.c index aa5429f..75b12c5 100644 --- a/dev.c +++ b/dev.c @@ -32,6 +32,7 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) dev->devid = mog_devid; dev->st_dev = sb.st_dev; mog_ioq_init(&dev->fsckq, svc, 1); + mog_ioq_init(&dev->ioq, svc, svc->thr_per_dev); return dev; } @@ -241,5 +242,6 @@ void mog_dev_free(void *ptr) struct mog_dev *dev = ptr; mog_ioq_destroy(&dev->fsckq); + mog_ioq_destroy(&dev->ioq); free(dev); } diff --git a/fdmap.c b/fdmap.c index 7ef8aff..14da274 100644 --- a/fdmap.c +++ b/fdmap.c @@ -193,6 +193,7 @@ struct mog_fd * mog_fd_init(int fd, enum mog_fd_type fd_type) assert(mfd->fd == fd && "mfd->fd incorrect"); mfd_expiring_lock(mfd); mfd->fd_type = fd_type; + mfd->ioq_blocked = 0; mfd_expiring_unlock(mfd); return mfd; diff --git a/http.c b/http.c index 2f0b435..e440d2b 100644 --- a/http.c +++ b/http.c @@ -59,14 +59,24 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len) http->_p.buf_off = 0; } -static void -http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len) +static bool +http_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) { struct mog_http *http = &mfd->as.http; + struct mog_dev *dev; assert(http->wbuf == NULL && "processing client with buffered data"); + dev = mog_dev_for(http->svc, http->_p.mog_devid, false); + if (dev && !mog_ioq_ready(&dev->ioq, mfd)) { + if (!http->rbuf) + http->rbuf = mog_rbuf_detach(rbuf); + http->rbuf->rsize = buf_len; + return false; + } + switch (http->_p.http_method) { case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method"); case MOG_HTTP_METHOD_GET: mog_http_get_open(mfd, buf); break; @@ -75,6 +85,8 @@ http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len) case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(mfd, buf); break; case MOG_HTTP_METHOD_PUT: mog_http_put(mfd, buf, buf_len); break; } + + return true; } MOG_NOINLINE static void http_close(struct mog_fd *mfd) @@ 
-144,9 +156,18 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http) return MOG_NEXT_CLOSE; } -static enum mog_next http_forward_in_progress(struct mog_fd *mfd) +static enum mog_next http_forward_in_progress(struct mog_fd *mfd, bool needq) { - enum mog_http_method method = mfd->as.http._p.http_method; + struct mog_http *http = &mfd->as.http; + enum mog_http_method method = http->_p.http_method; + struct mog_dev *dev; + + if (needq) { + dev = mog_dev_for(http->svc, http->_p.mog_devid, false); + if (dev && !mog_ioq_ready(&dev->ioq, mfd)) + /* no need to setup/stash rbuf, it's been done */ + return MOG_NEXT_IGNORE; + } if (method == MOG_HTTP_METHOD_GET) return mog_http_get_in_progress(mfd); @@ -156,7 +177,35 @@ static enum mog_next http_forward_in_progress(struct mog_fd *mfd) return mog_http_put_in_progress(mfd); } -static enum mog_next http_queue_step(struct mog_fd *mfd) +static enum mog_next http_run(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) +{ + struct mog_http *http = &mfd->as.http; + + if (!http_process_client(mfd, rbuf, buf, buf_len)) + return MOG_NEXT_IGNORE; /* in ioq */ + if (http->wbuf == MOG_WR_ERROR) + return MOG_NEXT_CLOSE; + if (http->wbuf) { + http_defer_rbuf(http, rbuf, buf_len); + return MOG_NEXT_WAIT_WR; + } else if (http->forward) { + http_defer_rbuf(http, rbuf, buf_len); + return http_forward_in_progress(mfd, false); + } else if (!http->_p.persistent) { + return MOG_NEXT_CLOSE; + } else { + /* pipelined request */ + if (buf_len) + TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true)); + + http_defer_rbuf(http, rbuf, buf_len); + mog_http_reset(http); + } + return MOG_NEXT_ACTIVE; +} + +static enum mog_next __http_queue_step(struct mog_fd *mfd) { struct mog_http *http = &mfd->as.http; struct mog_rbuf *rbuf; @@ -169,22 +218,28 @@ static enum mog_next http_queue_step(struct mog_fd *mfd) assert(mfd->fd >= 0 && "http fd is invalid"); if (http->wbuf) return http_wbuf_in_progress(http); - if (http->forward) return http_forward_in_progress(mfd); + if (http->forward) return http_forward_in_progress(mfd, true); /* we may have pipelined data in http->rbuf */ rbuf = http->rbuf ? 
http->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE); buf = rbuf->rptr; off = http->_p.buf_off; - assert(off < rbuf->rcapa && "offset is too big"); + assert(off >= 0 && "offset is negative"); + assert(off <= rbuf->rcapa && "offset is too big"); + if (http->rbuf) { - /* request got pipelined, resuming now */ buf_len = http->rbuf->rsize; + if (mog_ioq_unblock(mfd)) + return http_run(mfd, rbuf, buf, buf_len); + /* request got pipelined, resuming now */ + assert(off < rbuf->rcapa && "offset is too big"); assert(http->_p.buf_off <= buf_len && "bad offset from pipelining"); assert(buf_len <= http->rbuf->rcapa && "bad rsize stashed"); if (http->_p.buf_off < buf_len) goto parse; } + assert(off < rbuf->rcapa && "offset is too big"); reread: r = read(mfd->fd, buf + off, rbuf->rcapa - off); if (r > 0) { @@ -211,26 +266,7 @@ parse: off = http->_p.buf_off; goto reread; case MOG_PARSER_DONE: - http_process_client(mfd, buf, buf_len); - if (http->wbuf == MOG_WR_ERROR) - return MOG_NEXT_CLOSE; - if (http->wbuf) { - http_defer_rbuf(http, rbuf, buf_len); - return MOG_NEXT_WAIT_WR; - } else if (http->forward) { - http_defer_rbuf(http, rbuf, buf_len); - return http_forward_in_progress(mfd); - } else if (!http->_p.persistent) { - return MOG_NEXT_CLOSE; - } else { - /* pipelined request */ - if (buf_len) - TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true)); - - http_defer_rbuf(http, rbuf, buf_len); - mog_http_reset(http); - } - return MOG_NEXT_ACTIVE; + return http_run(mfd, rbuf, buf, buf_len); } } else if (r == 0) { /* client shut down */ TRACE(CMOGSTORED_HTTP_RDCLOSE(buf_len)); @@ -269,14 +305,22 @@ err400: return MOG_NEXT_CLOSE; } +static enum mog_next http_queue_step(struct mog_fd *mfd) +{ + enum mog_next next = __http_queue_step(mfd); + + /* enqueue any pending waiters before we become enqueued ourselves */ + mog_ioq_next(NULL); + + return next; +} + enum mog_next mog_http_queue_step(struct mog_fd *mfd) { enum mog_next rv = http_queue_step(mfd); if (rv == MOG_NEXT_CLOSE) http_close(mfd); - assert(rv != MOG_NEXT_IGNORE && - "refusing to put HTTP client into ignore state"); return rv; } @@ -301,7 +345,7 @@ void mog_http_quit_step(struct mog_fd *mfd) case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return; case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return; case MOG_NEXT_IGNORE: - assert(0 && "refused to put HTTP client into ignore state"); + return; } } diff --git a/ioq.c b/ioq.c index 0b36e65..028b225 100644 --- a/ioq.c +++ b/ioq.c @@ -41,6 +41,7 @@ bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd) ioq->cur--; mog_ioq_current = ioq; } else { + client_mfd->ioq_blocked = 1; SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent); } @@ -88,3 +89,12 @@ void mog_ioq_destroy(struct mog_ioq *ioq) { CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx)); } + +bool mog_ioq_unblock(struct mog_fd *mfd) +{ + if (mfd->ioq_blocked == 0) + return false; + + mfd->ioq_blocked = 0; + return true; +} diff --git a/mgmt.c b/mgmt.c index 1aacb41..6b8d6a6 100644 --- a/mgmt.c +++ b/mgmt.c @@ -149,14 +149,33 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len) mgmt->buf_off = 0; } -static void mgmt_process_client(struct mog_fd *mfd, char *buf) +static bool +mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) { struct mog_mgmt *mgmt = &mfd->as.mgmt; + struct mog_dev *dev; + struct mog_ioq *ioq; + + /* we handle non-filesystem-using commands inline in the parser */ + if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE) + return true; + + dev = mog_dev_for(mgmt->svc, 
mgmt->mog_devid, false); + + if (dev) { + ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq; + if (!mog_ioq_ready(ioq, mfd)) { + if (!mgmt->rbuf) + mgmt->rbuf = mog_rbuf_detach(rbuf); + mgmt->rbuf->rsize = buf_len; + return false; + } + } switch (mgmt->mgmt_method) { case MOG_MGMT_METHOD_NONE: - /* we handle filesystem-using commands inline in the parser */ - return; + assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE"); case MOG_MGMT_METHOD_SIZE: mog_mgmt_fn_size(mgmt, buf); break; @@ -165,6 +184,26 @@ static void mgmt_process_client(struct mog_fd *mfd, char *buf) break; } mgmt->mgmt_method = MOG_MGMT_METHOD_NONE; + return true; +} + +static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf, + char *buf, size_t buf_len) +{ + struct mog_mgmt *mgmt = &mfd->as.mgmt; + + if (!mgmt_process_client(mfd, rbuf, buf, buf_len)) + return MOG_NEXT_IGNORE; /* in ioq */ + if (mgmt->wbuf == MOG_WR_ERROR) + return MOG_NEXT_CLOSE; + if (mgmt->forward == MOG_IOSTAT) + return mgmt_iostat_forever(mgmt); + + /* stash unread portion in a new buffer */ + mgmt_defer_rbuf(mgmt, rbuf, buf_len); + mog_mgmt_reset_parser(mgmt); + assert(mgmt->wbuf != MOG_WR_ERROR); + return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE; } /* @@ -191,12 +230,17 @@ static enum mog_next __mgmt_queue_step(struct mog_fd *mfd) buf = rbuf->rptr; off = mgmt->buf_off; assert(off >= 0 && "offset is negative"); - assert(off < rbuf->rcapa && "offset is too big"); - if (mgmt->rbuf && off == 0) { - /* request got "pipelined", resuming now */ + assert(off <= rbuf->rcapa && "offset is too big"); + + if (mgmt->rbuf) { buf_len = mgmt->rbuf->rsize; - goto parse; + if (mog_ioq_unblock(mfd)) + return mgmt_run(mfd, rbuf, buf, buf_len); + assert(off < rbuf->rcapa && "offset is too big"); + if (off == 0) /* request got "pipelined", resuming now */ + goto parse; } + assert(off < rbuf->rcapa && "offset is too big"); reread: r = read(mfd->fd, buf + off, rbuf->rcapa - off); if (r > 0) { @@ -222,17 +266,7 @@ parse: off = mgmt->buf_off; goto reread; case MOG_PARSER_DONE: - mgmt_process_client(mfd, buf); - if (mgmt->wbuf == MOG_WR_ERROR) - return MOG_NEXT_CLOSE; - if (mgmt->forward == MOG_IOSTAT) - return mgmt_iostat_forever(mgmt); - - /* stash unread portion in a new buffer */ - mgmt_defer_rbuf(mgmt, rbuf, buf_len); - mog_mgmt_reset_parser(mgmt); - assert(mgmt->wbuf != MOG_WR_ERROR); - return mgmt->wbuf ? 
MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE; + return mgmt_run(mfd, rbuf, buf, buf_len); } } else if (r == 0) { /* client shut down */ TRACE(CMOGSTORED_MGMT_RDCLOSE(mfd, buf_len)); diff --git a/test/http.rb b/test/http.rb index df51836..b1fd3b3 100644 --- a/test/http.rb +++ b/test/http.rb @@ -6,7 +6,7 @@ require 'test/test_helper' require 'digest/md5' require 'net/http' require 'time' -$stderr.sync = $stdout.sync = Thread.abort_on_exception = true +require 'timeout' class TestHTTP < Test::Unit::TestCase def setup @@ -221,4 +221,69 @@ class TestHTTP < Test::Unit::TestCase buf = @client.readpartial(666) assert_match %r{\AHTTP/1\.1 400 Bad Request\r\n}, buf end + + def test_iosem_concurrency + fifo = "#@tmpdir/dev666/fifo.%u.fid" + Dir.mkdir("#@tmpdir/dev666") + Dir.mkdir("#@tmpdir/dev333") + File.open("#@tmpdir/dev333/fast.fid", "w") { |fp| fp.write('.') } + File.open("#@tmpdir/dev666/fast.fid", "w") { |fp| fp.write('.') } + + # create 10 threads which are blocked on the FIFO read + nr = 10 + nr.times do |i| + assert system("mkfifo", fifo % i), "mkfifo #{fifo % i}" + end + threads = [] + nr.times do |i| + thr = Thread.new(i) do |_i| + res = nil + Net::HTTP.start(@host, @port) do |http| + res = http.request(Net::HTTP::Get.new("/dev666/fifo.%u.fid" % _i)) + end + res + end + threads << thr + end + + # start a fast request to the bogged down device, it should get queued + # FIXME: still racy + t_yield + @client.write("GET /dev666/fast.fid HTTP/1.0\r\n") + t_yield + @client.write("\r\n") + + # fast request to a free device + Net::HTTP.start(@host, @port) do |http| + res = http.request(Net::HTTP::Get.new("/dev333/fast.fid")) + assert_equal 200, res.code.to_i + end + + # slow device should still be stuck + assert_equal nil, IO.select([@client], nil, nil, 2) + cr = Thread.new do + val = @client.read + [ val, Time.now ] + end + wr_start = Time.now + + # wake up the blocked threads + writer = Thread.new do + nr.times do |i| + File.open(fifo % i, "w") { |fp| fp.write(i.to_s) } + end + end + + # blocked threads return + threads.each do |t| + assert_equal 403, t.value.code.to_i + end + writer.join + + # fast device should be readable, now + fast_response, fast_finish = Timeout.timeout(5) { cr.value } + assert_match(%r{\AHTTP/1\.1 200 OK}, fast_response) + assert_match(%r{\r\n\r\n\.\z}, fast_response) + assert_operator fast_finish, :>, wr_start + end end -- cgit v1.2.3-24-ge0c7
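
For readers unfamiliar with the mechanism this patch introduces, below is a
minimal standalone sketch of the same pattern: a per-device counting
semaphore with a FIFO of parked clients.  Every name in it (struct ioq,
struct client, ioq_try_acquire, ioq_release, ioq_consume_wakeup) is invented
for illustration and is NOT cmogstored's API; only the behavior mirrors what
the patch does with mog_ioq_ready(), mog_ioq_next() and mog_ioq_unblock():
a try-acquire that parks the client FIFO and sets its ioq_blocked flag when
the device is saturated, a release that hands the slot directly to the
oldest waiter, and a one-shot "was I just woken?" check on resume.

/*
 * Standalone sketch only; names are hypothetical.  cmogstored sizes the
 * per-device capacity from svc->thr_per_dev (see the dev.c hunk above).
 */
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct client {
	struct client *ioq_next; /* FIFO link while parked on the queue */
	int blocked;             /* mirrors mfd->ioq_blocked in the patch */
};

struct ioq {
	pthread_mutex_t mtx;
	unsigned cur;                /* free I/O slots for this device */
	struct client *head, *tail;  /* parked clients, oldest first */
};

static void ioq_init(struct ioq *q, unsigned capa)
{
	pthread_mutex_init(&q->mtx, NULL);
	q->cur = capa;
	q->head = q->tail = NULL;
}

/*
 * Try to take an I/O slot for @c.  On false, @c has been parked and
 * marked blocked; the caller stashes its read buffer and returns to the
 * event loop without polling the fd (MOG_NEXT_IGNORE in the patch).
 */
static bool ioq_try_acquire(struct ioq *q, struct client *c)
{
	bool ready;

	pthread_mutex_lock(&q->mtx);
	if ((ready = (q->cur > 0))) {
		q->cur--;
	} else {
		c->blocked = 1;
		c->ioq_next = NULL;
		if (q->tail)
			q->tail->ioq_next = c;
		else
			q->head = c;
		q->tail = c;
	}
	pthread_mutex_unlock(&q->mtx);
	return ready;
}

/*
 * Drop a slot after finishing I/O.  If anyone is parked, slot ownership
 * passes directly to the oldest waiter, which the caller reactivates
 * (cmogstored pushes it back onto the active queue).
 */
static struct client *ioq_release(struct ioq *q)
{
	struct client *next;

	pthread_mutex_lock(&q->mtx);
	if ((next = q->head)) {
		q->head = next->ioq_next;
		if (!q->head)
			q->tail = NULL;
	} else {
		q->cur++;
	}
	pthread_mutex_unlock(&q->mtx);
	return next;
}

/* Analog of mog_ioq_unblock(): one-shot check-and-clear on wakeup. */
static bool ioq_consume_wakeup(struct client *c)
{
	if (!c->blocked)
		return false;
	c->blocked = 0;
	return true;
}

int main(void)
{
	struct ioq q;
	struct client c1 = { NULL, 0 }, c2 = { NULL, 0 };

	ioq_init(&q, 1);                   /* one slot per device */
	assert(ioq_try_acquire(&q, &c1));  /* c1 may do disk I/O now */
	assert(!ioq_try_acquire(&q, &c2)); /* c2 parks instead of blocking */
	assert(ioq_release(&q) == &c2);    /* c1 done; slot handed to c2 */
	assert(ioq_consume_wakeup(&c2));   /* c2 resumes stashed request */
	return 0;
}

This is also why the patch stops asserting against MOG_NEXT_IGNORE for HTTP
clients: a parked fd sits in the ioq only, out of both epoll and the active
queue.  When the slot holder finishes, http_queue_step() runs
mog_ioq_next(NULL) to reactivate the oldest waiter, and on re-entry
__http_queue_step() sees mog_ioq_unblock() return true and replays the
stashed rbuf via http_run() instead of read()ing the socket again.
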