about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-06-29 03:14:54 +0000
committerEric Wong <normalperson@yhbt.net>2013-07-10 00:56:17 +0000
commit013e903340a75b12523bd795d15fe5f23d725be9 (patch)
treec1b6e9410cd768a0062827c0021bf72bf0ed3094
parentfef978104cf134dc6629115456b27dfa2856ded7 (diff)
downloadcmogstored-013e903340a75b12523bd795d15fe5f23d725be9.tar.gz
This will allow us to limit concurrency on a per-device basis with
limited impact on HTTP header reading/parsing.  This prevents
pathological slowness on a single device from bringing down an entire
host.  This also allows users to more safely run with fewer aio_threads
(e.g. 1:1 thread:device mapping) on fast devices with smaller low-level
(kernel/hardware) I/O queues.
-rw-r--r--cmogstored.h5
-rw-r--r--dev.c2
-rw-r--r--fdmap.c1
-rw-r--r--http.c106
-rw-r--r--ioq.c10
-rw-r--r--mgmt.c70
-rw-r--r--test/http.rb67
7 files changed, 210 insertions, 51 deletions
diff --git a/cmogstored.h b/cmogstored.h
index bb7455d..7d32817 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -108,6 +108,7 @@ struct mog_wbuf;
 struct mog_dev {
         dev_t st_dev;
         uint32_t devid;
+        struct mog_ioq ioq;
         struct mog_ioq fsckq;
 };
 
@@ -302,7 +303,8 @@ enum mog_fd_type {
 
 /* fdmap.c */
 struct mog_fd {
-        enum mog_fd_type fd_type;
+        enum mog_fd_type fd_type:16;
+        uint16_t ioq_blocked;
         int fd;
         pthread_spinlock_t expiring;
         union {
@@ -628,3 +630,4 @@ void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val);
 bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
 void mog_ioq_next(struct mog_ioq *);
 void mog_ioq_destroy(struct mog_ioq *);
+bool mog_ioq_unblock(struct mog_fd *);
diff --git a/dev.c b/dev.c
index aa5429f..75b12c5 100644
--- a/dev.c
+++ b/dev.c
@@ -32,6 +32,7 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid)
         dev->devid = mog_devid;
         dev->st_dev = sb.st_dev;
         mog_ioq_init(&dev->fsckq, svc, 1);
+        mog_ioq_init(&dev->ioq, svc, svc->thr_per_dev);
 
         return dev;
 }
@@ -241,5 +242,6 @@ void mog_dev_free(void *ptr)
         struct mog_dev *dev = ptr;
 
         mog_ioq_destroy(&dev->fsckq);
+        mog_ioq_destroy(&dev->ioq);
         free(dev);
 }
diff --git a/fdmap.c b/fdmap.c
index 7ef8aff..14da274 100644
--- a/fdmap.c
+++ b/fdmap.c
@@ -193,6 +193,7 @@ struct mog_fd * mog_fd_init(int fd, enum mog_fd_type fd_type)
         assert(mfd->fd == fd && "mfd->fd incorrect");
         mfd_expiring_lock(mfd);
         mfd->fd_type = fd_type;
+        mfd->ioq_blocked = 0;
         mfd_expiring_unlock(mfd);
 
         return mfd;
diff --git a/http.c b/http.c
index 2f0b435..e440d2b 100644
--- a/http.c
+++ b/http.c
@@ -59,14 +59,24 @@ http_defer_rbuf(struct mog_http *http, struct mog_rbuf *rbuf, size_t buf_len)
         http->_p.buf_off = 0;
 }
 
-static void
-http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len)
+static bool
+http_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
 {
         struct mog_http *http = &mfd->as.http;
+        struct mog_dev *dev;
 
         assert(http->wbuf == NULL &&
                "processing client with buffered data");
 
+        dev = mog_dev_for(http->svc, http->_p.mog_devid, false);
+        if (dev && !mog_ioq_ready(&dev->ioq, mfd)) {
+                if (!http->rbuf)
+                        http->rbuf = mog_rbuf_detach(rbuf);
+                http->rbuf->rsize = buf_len;
+                return false;
+        }
+
         switch (http->_p.http_method) {
         case MOG_HTTP_METHOD_NONE: assert(0 && "BUG: unset HTTP method");
         case MOG_HTTP_METHOD_GET: mog_http_get_open(mfd, buf); break;
@@ -75,6 +85,8 @@ http_process_client(struct mog_fd *mfd, char *buf, size_t buf_len)
         case MOG_HTTP_METHOD_MKCOL: mog_http_mkcol(mfd, buf); break;
         case MOG_HTTP_METHOD_PUT: mog_http_put(mfd, buf, buf_len); break;
         }
+
+        return true;
 }
 
 MOG_NOINLINE static void http_close(struct mog_fd *mfd)
@@ -144,9 +156,18 @@ static enum mog_next http_wbuf_in_progress(struct mog_http *http)
         return MOG_NEXT_CLOSE;
 }
 
-static enum mog_next http_forward_in_progress(struct mog_fd *mfd)
+static enum mog_next http_forward_in_progress(struct mog_fd *mfd, bool needq)
 {
-        enum mog_http_method method = mfd->as.http._p.http_method;
+        struct mog_http *http = &mfd->as.http;
+        enum mog_http_method method = http->_p.http_method;
+        struct mog_dev *dev;
+
+        if (needq) {
+                dev = mog_dev_for(http->svc, http->_p.mog_devid, false);
+                if (dev && !mog_ioq_ready(&dev->ioq, mfd))
+                        /* no need to setup/stash rbuf, it's been done */
+                        return MOG_NEXT_IGNORE;
+        }
 
         if (method == MOG_HTTP_METHOD_GET)
                 return mog_http_get_in_progress(mfd);
@@ -156,7 +177,35 @@ static enum mog_next http_forward_in_progress(struct mog_fd *mfd)
         return mog_http_put_in_progress(mfd);
 }
 
-static enum mog_next http_queue_step(struct mog_fd *mfd)
+static enum mog_next http_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
+{
+        struct mog_http *http = &mfd->as.http;
+
+        if (!http_process_client(mfd, rbuf, buf, buf_len))
+                return MOG_NEXT_IGNORE; /* in ioq */
+        if (http->wbuf == MOG_WR_ERROR)
+                return MOG_NEXT_CLOSE;
+        if (http->wbuf) {
+                http_defer_rbuf(http, rbuf, buf_len);
+                return MOG_NEXT_WAIT_WR;
+        } else if (http->forward) {
+                http_defer_rbuf(http, rbuf, buf_len);
+                return http_forward_in_progress(mfd, false);
+        } else if (!http->_p.persistent) {
+                return MOG_NEXT_CLOSE;
+        } else {
+                /* pipelined request */
+                if (buf_len)
+                        TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true));
+
+                http_defer_rbuf(http, rbuf, buf_len);
+                mog_http_reset(http);
+        }
+        return MOG_NEXT_ACTIVE;
+}
+
+static enum mog_next __http_queue_step(struct mog_fd *mfd)
 {
         struct mog_http *http = &mfd->as.http;
         struct mog_rbuf *rbuf;
@@ -169,22 +218,28 @@ static enum mog_next http_queue_step(struct mog_fd *mfd)
         assert(mfd->fd >= 0 && "http fd is invalid");
 
         if (http->wbuf) return http_wbuf_in_progress(http);
-        if (http->forward) return http_forward_in_progress(mfd);
+        if (http->forward) return http_forward_in_progress(mfd, true);
 
         /* we may have pipelined data in http->rbuf */
         rbuf = http->rbuf ? http->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE);
         buf = rbuf->rptr;
         off = http->_p.buf_off;
-        assert(off < rbuf->rcapa && "offset is too big");
+        assert(off >= 0 && "offset is negative");
+        assert(off <= rbuf->rcapa && "offset is too big");
+
         if (http->rbuf) {
-                /* request got pipelined, resuming now */
                 buf_len = http->rbuf->rsize;
+                if (mog_ioq_unblock(mfd))
+                        return http_run(mfd, rbuf, buf, buf_len);
+                /* request got pipelined, resuming now */
+                assert(off < rbuf->rcapa && "offset is too big");
                 assert(http->_p.buf_off <= buf_len
                         && "bad offset from pipelining");
                 assert(buf_len <= http->rbuf->rcapa && "bad rsize stashed");
                 if (http->_p.buf_off < buf_len)
                         goto parse;
         }
+        assert(off < rbuf->rcapa && "offset is too big");
 reread:
         r = read(mfd->fd, buf + off, rbuf->rcapa - off);
         if (r > 0) {
@@ -211,26 +266,7 @@ parse:
                         off = http->_p.buf_off;
                         goto reread;
                 case MOG_PARSER_DONE:
-                        http_process_client(mfd, buf, buf_len);
-                        if (http->wbuf == MOG_WR_ERROR)
-                                return MOG_NEXT_CLOSE;
-                        if (http->wbuf) {
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                return MOG_NEXT_WAIT_WR;
-                        } else if (http->forward) {
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                return http_forward_in_progress(mfd);
-                        } else if (!http->_p.persistent) {
-                                return MOG_NEXT_CLOSE;
-                        } else {
-                                /* pipelined request */
-                                if (buf_len)
-                                        TRACE(CMOGSTORED_HTTP_REQ_BEGIN(true));
-
-                                http_defer_rbuf(http, rbuf, buf_len);
-                                mog_http_reset(http);
-                        }
-                        return MOG_NEXT_ACTIVE;
+                        return http_run(mfd, rbuf, buf, buf_len);
                 }
         } else if (r == 0) { /* client shut down */
                 TRACE(CMOGSTORED_HTTP_RDCLOSE(buf_len));
@@ -269,14 +305,22 @@ err400:
         return MOG_NEXT_CLOSE;
 }
 
+static enum mog_next http_queue_step(struct mog_fd *mfd)
+{
+        enum mog_next next = __http_queue_step(mfd);
+
+        /* enqueue any pending waiters before we become enqueued ourselves */
+        mog_ioq_next(NULL);
+
+        return next;
+}
+
 enum mog_next mog_http_queue_step(struct mog_fd *mfd)
 {
         enum mog_next rv = http_queue_step(mfd);
 
         if (rv == MOG_NEXT_CLOSE)
                 http_close(mfd);
-        assert(rv != MOG_NEXT_IGNORE &&
-               "refusing to put HTTP client into ignore state");
         return rv;
 }
 
@@ -301,7 +345,7 @@ void mog_http_quit_step(struct mog_fd *mfd)
         case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
         case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
         case MOG_NEXT_IGNORE:
-                assert(0 && "refused to put HTTP client into ignore state");
+                return;
         }
 }
 
diff --git a/ioq.c b/ioq.c
index 0b36e65..028b225 100644
--- a/ioq.c
+++ b/ioq.c
@@ -41,6 +41,7 @@ bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd)
                 ioq->cur--;
                 mog_ioq_current = ioq;
         } else {
+                client_mfd->ioq_blocked = 1;
                 SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent);
         }
 
@@ -88,3 +89,12 @@ void mog_ioq_destroy(struct mog_ioq *ioq)
 {
         CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
 }
+
+bool mog_ioq_unblock(struct mog_fd *mfd)
+{
+        if (mfd->ioq_blocked == 0)
+                return false;
+
+        mfd->ioq_blocked = 0;
+        return true;
+}
diff --git a/mgmt.c b/mgmt.c
index 1aacb41..6b8d6a6 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -149,14 +149,33 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
         mgmt->buf_off = 0;
 }
 
-static void mgmt_process_client(struct mog_fd *mfd, char *buf)
+static bool
+mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
 {
         struct mog_mgmt *mgmt = &mfd->as.mgmt;
+        struct mog_dev *dev;
+        struct mog_ioq *ioq;
+
+        /* we handle non-filesystem-using commands inline in the parser */
+        if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE)
+                return true;
+
+        dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);
+
+        if (dev) {
+                ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq;
+                if (!mog_ioq_ready(ioq, mfd)) {
+                        if (!mgmt->rbuf)
+                                mgmt->rbuf = mog_rbuf_detach(rbuf);
+                        mgmt->rbuf->rsize = buf_len;
+                        return false;
+                }
+        }
 
         switch (mgmt->mgmt_method) {
         case MOG_MGMT_METHOD_NONE:
-                /* we handle filesystem-using commands inline in the parser */
-                return;
+                assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE");
         case MOG_MGMT_METHOD_SIZE:
                 mog_mgmt_fn_size(mgmt, buf);
                 break;
@@ -165,6 +184,26 @@ static void mgmt_process_client(struct mog_fd *mfd, char *buf)
                 break;
         }
         mgmt->mgmt_method = MOG_MGMT_METHOD_NONE;
+        return true;
+}
+
+static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
+                        char *buf, size_t buf_len)
+{
+        struct mog_mgmt *mgmt = &mfd->as.mgmt;
+
+        if (!mgmt_process_client(mfd, rbuf, buf, buf_len))
+                return MOG_NEXT_IGNORE; /* in ioq */
+        if (mgmt->wbuf == MOG_WR_ERROR)
+                return MOG_NEXT_CLOSE;
+        if (mgmt->forward == MOG_IOSTAT)
+                return mgmt_iostat_forever(mgmt);
+
+        /* stash unread portion in a new buffer */
+        mgmt_defer_rbuf(mgmt, rbuf, buf_len);
+        mog_mgmt_reset_parser(mgmt);
+        assert(mgmt->wbuf != MOG_WR_ERROR);
+        return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
 }
 
 /*
@@ -191,12 +230,17 @@ static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
         buf = rbuf->rptr;
         off = mgmt->buf_off;
         assert(off >= 0 && "offset is negative");
-        assert(off < rbuf->rcapa && "offset is too big");
-        if (mgmt->rbuf && off == 0) {
-                /* request got "pipelined", resuming now */
+        assert(off <= rbuf->rcapa && "offset is too big");
+
+        if (mgmt->rbuf) {
                 buf_len = mgmt->rbuf->rsize;
-                goto parse;
+                if (mog_ioq_unblock(mfd))
+                        return mgmt_run(mfd, rbuf, buf, buf_len);
+                assert(off < rbuf->rcapa && "offset is too big");
+                if (off == 0) /* request got "pipelined", resuming now */
+                        goto parse;
         }
+        assert(off < rbuf->rcapa && "offset is too big");
 reread:
         r = read(mfd->fd, buf + off, rbuf->rcapa - off);
         if (r > 0) {
@@ -222,17 +266,7 @@ parse:
                         off = mgmt->buf_off;
                         goto reread;
                 case MOG_PARSER_DONE:
-                        mgmt_process_client(mfd, buf);
-                        if (mgmt->wbuf == MOG_WR_ERROR)
-                                return MOG_NEXT_CLOSE;
-                        if (mgmt->forward == MOG_IOSTAT)
-                                return mgmt_iostat_forever(mgmt);
-
-                        /* stash unread portion in a new buffer */
-                        mgmt_defer_rbuf(mgmt, rbuf, buf_len);
-                        mog_mgmt_reset_parser(mgmt);
-                        assert(mgmt->wbuf != MOG_WR_ERROR);
-                        return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
+                        return mgmt_run(mfd, rbuf, buf, buf_len);
                 }
         } else if (r == 0) { /* client shut down */
                 TRACE(CMOGSTORED_MGMT_RDCLOSE(mfd, buf_len));
diff --git a/test/http.rb b/test/http.rb
index df51836..b1fd3b3 100644
--- a/test/http.rb
+++ b/test/http.rb
@@ -6,7 +6,7 @@ require 'test/test_helper'
 require 'digest/md5'
 require 'net/http'
 require 'time'
-$stderr.sync = $stdout.sync = Thread.abort_on_exception = true
+require 'timeout'
 
 class TestHTTP < Test::Unit::TestCase
   def setup
@@ -221,4 +221,69 @@ class TestHTTP < Test::Unit::TestCase
     buf = @client.readpartial(666)
     assert_match %r{\AHTTP/1\.1 400 Bad Request\r\n}, buf
   end
+
+  def test_iosem_concurrency
+    fifo = "#@tmpdir/dev666/fifo.%u.fid"
+    Dir.mkdir("#@tmpdir/dev666")
+    Dir.mkdir("#@tmpdir/dev333")
+    File.open("#@tmpdir/dev333/fast.fid", "w") { |fp| fp.write('.') }
+    File.open("#@tmpdir/dev666/fast.fid", "w") { |fp| fp.write('.') }
+
+    # create 10 threads which are blocked on the FIFO read
+    nr = 10
+    nr.times do |i|
+      assert system("mkfifo", fifo % i), "mkfifo #{fifo % i}"
+    end
+    threads = []
+    nr.times do |i|
+      thr = Thread.new(i) do |_i|
+        res = nil
+        Net::HTTP.start(@host, @port) do |http|
+          res = http.request(Net::HTTP::Get.new("/dev666/fifo.%u.fid" % _i))
+        end
+        res
+      end
+      threads << thr
+    end
+
+    # start a fast request to the bogged down device, it should get queued
+    # FIXME: still racy
+    t_yield
+    @client.write("GET /dev666/fast.fid HTTP/1.0\r\n")
+    t_yield
+    @client.write("\r\n")
+
+    # fast request to a free device
+    Net::HTTP.start(@host, @port) do |http|
+      res = http.request(Net::HTTP::Get.new("/dev333/fast.fid"))
+      assert_equal 200, res.code.to_i
+    end
+
+    # slow device should still be stuck
+    assert_equal nil, IO.select([@client], nil, nil, 2)
+    cr = Thread.new do
+      val = @client.read
+      [ val, Time.now ]
+    end
+    wr_start = Time.now
+
+    # wake up the blocked threads
+    writer = Thread.new do
+      nr.times do |i|
+        File.open(fifo % i, "w") { |fp| fp.write(i.to_s) }
+      end
+    end
+
+    # blocked threads return
+    threads.each do |t|
+      assert_equal 403, t.value.code.to_i
+    end
+    writer.join
+
+    # fast device should be readable, now
+    fast_response, fast_finish = Timeout.timeout(5) { cr.value }
+    assert_match(%r{\AHTTP/1\.1 200 OK}, fast_response)
+    assert_match(%r{\r\n\r\n\.\z}, fast_response)
+    assert_operator fast_finish, :>, wr_start
+  end
 end