about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-07-11 22:15:56 +0000
committerEric Wong <normalperson@yhbt.net>2013-07-12 00:21:06 +0000
commite50365f275ada4afcd5f25f2ac3328e341a79d71 (patch)
tree0e0d1b5ee52424ab2c0fa125b4f31fedbd83e2a4
parentf83d0466afc32542f3f4ff962105c817a1be2c96 (diff)
downloadcmogstored-e50365f275ada4afcd5f25f2ac3328e341a79d71.tar.gz
Users reducing or increasing thread counts should increase
ioq capacity, otherwise there's no point in having more or
fewer threads if they are synched to the ioq capacity.
-rw-r--r--cmogstored.h3
-rw-r--r--dev.c15
-rw-r--r--ioq.c18
-rw-r--r--svc.c3
-rw-r--r--svc_dev.c32
-rw-r--r--test/mgmt_auto_adjust.rb99
-rw-r--r--test/ruby.mk3
7 files changed, 172 insertions, 1 deletions
diff --git a/cmogstored.h b/cmogstored.h
index 487a70d..182568c 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -374,6 +374,7 @@ int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *);
 size_t mog_dev_hash(const void *, size_t tablesize);
 bool mog_dev_cmp(const void *a, const void *b);
 void mog_dev_free(void *devptr);
+bool mog_dev_user_rescale_i(void *devp, void *svcp);
 
 /* valid_path.rl */
 int mog_valid_path(const char *buf, size_t len);
@@ -396,6 +397,7 @@ bool mog_svc_devstats_broadcast(void *svc, void *ignored);
 void mog_svc_devstats_subscribe(struct mog_mgmt *);
 void mog_svc_dev_shutdown(void);
 void mog_mkusage_all(void);
+void mog_svc_dev_user_rescale(struct mog_svc *, size_t ndev_new);
 
 /* cloexec_detect.c */
 extern bool mog_cloexec_atomic;
@@ -639,5 +641,6 @@ void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val);
 bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
 bool mog_ioq_contended(void) MOG_CHECK;
 void mog_ioq_next(struct mog_ioq *);
+void mog_ioq_adjust(struct mog_ioq *, unsigned value);
 void mog_ioq_destroy(struct mog_ioq *);
 bool mog_ioq_unblock(struct mog_fd *);
diff --git a/dev.c b/dev.c
index 75b12c5..64deec4 100644
--- a/dev.c
+++ b/dev.c
@@ -245,3 +245,18 @@ void mog_dev_free(void *ptr)
         mog_ioq_destroy(&dev->ioq);
         free(dev);
 }
+
+/*
+ * Only called by the main/notify thread as a hash iterator function
+ * This increases or decreases the capacity of a given device if a
+ * sidechannel user changes the worker thread pool size.
+ */
+bool mog_dev_user_rescale_i(void *devp, void *svcp)
+{
+        struct mog_dev *dev = devp;
+        struct mog_svc *svc = svcp;
+
+        mog_ioq_adjust(&dev->ioq, svc->thr_per_dev);
+
+        return true; /* continue iteration */
+}
diff --git a/ioq.c b/ioq.c
index 829e00d..0a32c71 100644
--- a/ioq.c
+++ b/ioq.c
@@ -90,6 +90,7 @@ void mog_ioq_next(struct mog_ioq *check_ioq)
         } else {
                 /* mog_ioq_adjust was called and lowered our capacity */
                 mog_ioq_current->cur--;
+                ioq_set_contended(mog_ioq_current);
         }
         CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));
 
@@ -121,6 +122,23 @@ bool mog_ioq_contended(void)
         return __sync_bool_compare_and_swap(&cur->contended, true, false);
 }
 
+/*
+ * called by the main/notify thread if the user has ever set
+ * "server aio_threads = XX" via sidechannel.
+ */
+void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value)
+{
+        assert(value > 0 && "mog_ioq_adjust value must be non-zero");
+        CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
+        ioq->max = value;
+
+        /* capacity reduced, get some threads to yield themselves */
+        if (ioq->cur > ioq->max)
+                ioq_set_contended(ioq);
+
+        CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
+}
+
 void mog_ioq_destroy(struct mog_ioq *ioq)
 {
         CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
diff --git a/svc.c b/svc.c
index 24d90fd..47fb9f7 100644
--- a/svc.c
+++ b/svc.c
@@ -169,6 +169,7 @@ void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new)
 
         /* respect user-setting */
         if (svc->user_set_aio_threads) {
+                mog_svc_dev_user_rescale(svc, ndev_new);
                 if (tp->n_threads >= ndev_new)
                         return;
 
@@ -291,6 +292,8 @@ void mog_svc_aio_threads_handler(void)
 
                 syslog(LOG_INFO, "server aio_threads=%zu", req_size);
                 svc->user_set_aio_threads = req_size;
+                if (svc->nmogdev)
+                        mog_svc_dev_user_rescale(svc, svc->nmogdev);
                 mog_thrpool_set_size(&svc->queue->thrpool, req_size);
         }
 }
diff --git a/svc_dev.c b/svc_dev.c
index 6fe3396..e66ca26 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -293,3 +293,35 @@ void mog_mkusage_all(void)
 {
         mog_svc_each(svc_mkusage_each, NULL);
 }
+
+/* we should never set ioq_max == 0 */
+static void svc_rescale_warn_fix_capa(struct mog_svc *svc, size_t ndev_new)
+{
+        if (svc->thr_per_dev != 0)
+                return;
+
+        syslog(LOG_WARNING,
+               "serving %s with fewer aio_threads(%zu) than devices(%zu)",
+               svc->docroot, svc->user_set_aio_threads, ndev_new);
+        syslog(LOG_WARNING,
+               "set \"server aio_threads = %zu\" or higher via sidechannel",
+               ndev_new);
+
+        svc->thr_per_dev = 1;
+}
+
+/* rescaling only happens in the main thread */
+void mog_svc_dev_user_rescale(struct mog_svc *svc, size_t ndev_new)
+{
+        assert(svc->user_set_aio_threads &&
+               "user did not set aio_threads via sidechannel");
+
+        svc->thr_per_dev = svc->user_set_aio_threads / ndev_new;
+
+        svc_rescale_warn_fix_capa(svc, ndev_new);
+
+        /* iterate through each device of this svc */
+        CHECK(int, 0, pthread_mutex_lock(&svc->by_mog_devid_lock));
+        hash_do_for_each(svc->by_mog_devid, mog_dev_user_rescale_i, svc);
+        CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock));
+}
diff --git a/test/mgmt_auto_adjust.rb b/test/mgmt_auto_adjust.rb
new file mode 100644
index 0000000..f2e01d6
--- /dev/null
+++ b/test/mgmt_auto_adjust.rb
@@ -0,0 +1,99 @@
+#!/usr/bin/env ruby
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (see COPYING for details)
+require 'test/test_helper'
+
+class TestMgmtAutoAdjust < Test::Unit::TestCase
+  def setup
+    @tmpdir = Dir.mktmpdir('cmogstored-mgmt-test')
+    @to_close = []
+    @host = TEST_HOST
+    srv = TCPServer.new(@host, 0)
+    @port = srv.addr[1]
+    srv.close
+
+    http = TCPServer.new(@host, 0)
+    @httpport = http.addr[1]
+    http.close
+
+    @err = Tempfile.new("stderr")
+    Dir.mkdir("#@tmpdir/dev666")
+    cmd = [ "cmogstored", "--docroot=#@tmpdir", "--mgmtlisten=#@host:#@port",
+            "--httplisten=#@host:#@httpport",
+            "--maxconns=500" ]
+    vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd)
+    @pid = fork {
+      ENV["MOG_DISK_USAGE_INTERVAL"] = "1"
+      $stderr.reopen(@err)
+      @err.close
+      exec(*cmd)
+    }
+    @client = get_client
+  end
+
+  def teardown
+    Process.kill(:QUIT, @pid) rescue nil
+    _, status = Process.waitpid2(@pid)
+    @to_close.each { |io| io.close unless io.closed? }
+    FileUtils.rm_rf(@tmpdir)
+    @err.rewind
+    # $stderr.write(@err.read)
+    assert status.success?, status.inspect
+  end
+
+  def wait_for_seen_dev
+    @client.write "watch\r\n"
+    seen_dev = false
+    while line = @client.gets
+      line.strip!
+      case line
+      when /^666\t/
+        seen_dev = true
+      when "."
+        break if seen_dev
+      end
+    end
+  end
+
+  def test_aio_threads_auto_adjust
+    wait_for_seen_dev
+    t_yield # wait for threads to spawn
+    taskdir = "/proc/#@pid/task"
+    glob = "#{taskdir}/*"
+    prev_threads = Dir[glob].size if File.directory?(taskdir)
+    if RUBY_PLATFORM =~ /linux/
+      assert File.directory?(taskdir), "/proc not mounted on Linux?"
+    end
+
+    File.directory?(taskdir) or return
+    Dir.mkdir("#@tmpdir/dev333")
+    stop = Time.now + 10
+    while prev_threads == Dir[glob].size && Time.now < stop
+      sleep(0.1)
+    end
+    cur_threads = Dir[glob].size
+    assert_operator cur_threads, :>, prev_threads,
+           "cur_threads=#{cur_threads} > prev_threads=#{prev_threads}"
+    assert_match(%r{updating server aio_threads=20}, File.read(@err.path))
+  end
+
+  def test_aio_threads_auto_adjust_warn
+    @client.write "server aio_threads = 1\r\n"
+    assert_equal "\r\n", @client.gets
+    stop = Time.now + 60
+    expect = "server aio_threads=1"
+    while (! File.read(@err.path).include?(expect) && Time.now < stop)
+      sleep 0.1
+    end
+    assert(File.read(@err.path).include?(expect), File.read(@err.path))
+
+    Dir.mkdir("#@tmpdir/dev333")
+    warning = "fewer aio_threads(1) than devices(2)"
+    stop = Time.now + 10
+    while (! File.read(@err.path).include?(warning)) && Time.now < stop
+      sleep 0.1
+    end
+    assert(File.read(@err.path).include?(warning), File.read(@err.path))
+  end
+end
diff --git a/test/ruby.mk b/test/ruby.mk
index c82a1ec..b089db8 100644
--- a/test/ruby.mk
+++ b/test/ruby.mk
@@ -3,5 +3,6 @@ RB_TESTS_FAST = test/cmogstored-cfg.rb test/http_dav.rb test/http_range.rb \
   test/http_put6_fail.rb test/epoll_enospc.rb
 RB_TESTS_SLOW = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \
  test/http.rb test/http_put_slow.rb test/http_chunked_put.rb \
- test/graceful_quit.rb test/http_idle_expire.rb
+ test/graceful_quit.rb test/http_idle_expire.rb \
+ test/mgmt_auto_adjust.rb
 RB_TESTS = $(RB_TESTS_FAST) $(RB_TESTS_SLOW)