From e50365f275ada4afcd5f25f2ac3328e341a79d71 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 11 Jul 2013 22:15:56 +0000 Subject: ioq: rescale to match user-set aio_threads values Users reducing or increasing thread counts should increase ioq capacity, otherwise there's no point in having more or less threads if they are synched to the ioq capacity. --- cmogstored.h | 3 ++ dev.c | 15 ++++++++ ioq.c | 18 +++++++++ svc.c | 3 ++ svc_dev.c | 32 ++++++++++++++++ test/mgmt_auto_adjust.rb | 99 ++++++++++++++++++++++++++++++++++++++++++++++++ test/ruby.mk | 3 +- 7 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 test/mgmt_auto_adjust.rb diff --git a/cmogstored.h b/cmogstored.h index 487a70d..182568c 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -374,6 +374,7 @@ int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *); size_t mog_dev_hash(const void *, size_t tablesize); bool mog_dev_cmp(const void *a, const void *b); void mog_dev_free(void *devptr); +bool mog_dev_user_rescale_i(void *devp, void *svcp); /* valid_path.rl */ int mog_valid_path(const char *buf, size_t len); @@ -396,6 +397,7 @@ bool mog_svc_devstats_broadcast(void *svc, void *ignored); void mog_svc_devstats_subscribe(struct mog_mgmt *); void mog_svc_dev_shutdown(void); void mog_mkusage_all(void); +void mog_svc_dev_user_rescale(struct mog_svc *, size_t ndev_new); /* cloexec_detect.c */ extern bool mog_cloexec_atomic; @@ -639,5 +641,6 @@ void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val); bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK; bool mog_ioq_contended(void) MOG_CHECK; void mog_ioq_next(struct mog_ioq *); +void mog_ioq_adjust(struct mog_ioq *, unsigned value); void mog_ioq_destroy(struct mog_ioq *); bool mog_ioq_unblock(struct mog_fd *); diff --git a/dev.c b/dev.c index 75b12c5..64deec4 100644 --- a/dev.c +++ b/dev.c @@ -245,3 +245,18 @@ void mog_dev_free(void *ptr) mog_ioq_destroy(&dev->ioq); free(dev); } + +/* + * Only called by the main/notify thread as a hash iterator function + * This increases or decreases the capacity of a given device if a + * a sidechannel user changes the worker thread pool size. + */ +bool mog_dev_user_rescale_i(void *devp, void *svcp) +{ + struct mog_dev *dev = devp; + struct mog_svc *svc = svcp; + + mog_ioq_adjust(&dev->ioq, svc->thr_per_dev); + + return true; /* continue iteration */ +} diff --git a/ioq.c b/ioq.c index 829e00d..0a32c71 100644 --- a/ioq.c +++ b/ioq.c @@ -90,6 +90,7 @@ void mog_ioq_next(struct mog_ioq *check_ioq) } else { /* mog_ioq_adjust was called and lowered our capacity */ mog_ioq_current->cur--; + ioq_set_contended(mog_ioq_current); } CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx)); @@ -121,6 +122,23 @@ bool mog_ioq_contended(void) return __sync_bool_compare_and_swap(&cur->contended, true, false); } +/* + * called by the main/notify thread if the user has ever set + * "server aio_threads = XX" via sidechannel. + */ +void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value) +{ + assert(value > 0 && "mog_ioq_adjust value must be non-zero"); + CHECK(int, 0, pthread_mutex_lock(&ioq->mtx)); + ioq->max = value; + + /* capacity reduced, get some threads to yield themselves */ + if (ioq->cur > ioq->max) + ioq_set_contended(ioq); + + CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx)); +} + void mog_ioq_destroy(struct mog_ioq *ioq) { CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx)); diff --git a/svc.c b/svc.c index 24d90fd..47fb9f7 100644 --- a/svc.c +++ b/svc.c @@ -169,6 +169,7 @@ void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new) /* respect user-setting */ if (svc->user_set_aio_threads) { + mog_svc_dev_user_rescale(svc, ndev_new); if (tp->n_threads >= ndev_new) return; @@ -291,6 +292,8 @@ void mog_svc_aio_threads_handler(void) syslog(LOG_INFO, "server aio_threads=%zu", req_size); svc->user_set_aio_threads = req_size; + if (svc->nmogdev) + mog_svc_dev_user_rescale(svc, svc->nmogdev); mog_thrpool_set_size(&svc->queue->thrpool, req_size); } } diff --git a/svc_dev.c b/svc_dev.c index 6fe3396..e66ca26 100644 --- a/svc_dev.c +++ b/svc_dev.c @@ -293,3 +293,35 @@ void mog_mkusage_all(void) { mog_svc_each(svc_mkusage_each, NULL); } + +/* we should never set ioq_max == 0 */ +static void svc_rescale_warn_fix_capa(struct mog_svc *svc, size_t ndev_new) +{ + if (svc->thr_per_dev != 0) + return; + + syslog(LOG_WARNING, + "serving %s with fewer aio_threads(%zu) than devices(%zu)", + svc->docroot, svc->user_set_aio_threads, ndev_new); + syslog(LOG_WARNING, + "set \"server aio_threads = %zu\" or higher via sidechannel", + ndev_new); + + svc->thr_per_dev = 1; +} + +/* rescaling only happens in the main thread */ +void mog_svc_dev_user_rescale(struct mog_svc *svc, size_t ndev_new) +{ + assert(svc->user_set_aio_threads && + "user did not set aio_threads via sidechannel"); + + svc->thr_per_dev = svc->user_set_aio_threads / ndev_new; + + svc_rescale_warn_fix_capa(svc, ndev_new); + + /* iterate through each device of this svc */ + CHECK(int, 0, pthread_mutex_lock(&svc->by_mog_devid_lock)); + hash_do_for_each(svc->by_mog_devid, mog_dev_user_rescale_i, svc); + CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock)); +} diff --git a/test/mgmt_auto_adjust.rb b/test/mgmt_auto_adjust.rb new file mode 100644 index 0000000..f2e01d6 --- /dev/null +++ b/test/mgmt_auto_adjust.rb @@ -0,0 +1,99 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (see COPYING for details) +require 'test/test_helper' + +class TestMgmtAutoAdjust < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-mgmt-test') + @to_close = [] + @host = TEST_HOST + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + + http = TCPServer.new(@host, 0) + @httpport = http.addr[1] + http.close + + @err = Tempfile.new("stderr") + Dir.mkdir("#@tmpdir/dev666") + cmd = [ "cmogstored", "--docroot=#@tmpdir", "--mgmtlisten=#@host:#@port", + "--httplisten=#@host:#@httpport", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @pid = fork { + ENV["MOG_DISK_USAGE_INTERVAL"] = "1" + $stderr.reopen(@err) + @err.close + exec(*cmd) + } + @client = get_client + end + + def teardown + Process.kill(:QUIT, @pid) rescue nil + _, status = Process.waitpid2(@pid) + @to_close.each { |io| io.close unless io.closed? } + FileUtils.rm_rf(@tmpdir) + @err.rewind + # $stderr.write(@err.read) + assert status.success?, status.inspect + end + + def wait_for_seen_dev + @client.write "watch\r\n" + seen_dev = false + while line = @client.gets + line.strip! + case line + when /^666\t/ + seen_dev = true + when "." + break if seen_dev + end + end + end + + def test_aio_threads_auto_adjust + wait_for_seen_dev + t_yield # wait for threads to spawn + taskdir = "/proc/#@pid/task" + glob = "#{taskdir}/*" + prev_threads = Dir[glob].size if File.directory?(taskdir) + if RUBY_PLATFORM =~ /linux/ + assert File.directory?(taskdir), "/proc not mounted on Linux?" + end + + File.directory?(taskdir) or return + Dir.mkdir("#@tmpdir/dev333") + stop = Time.now + 10 + while prev_threads == Dir[glob].size && Time.now < stop + sleep(0.1) + end + cur_threads = Dir[glob].size + assert_operator cur_threads, :>, prev_threads, + "prev_threads=#{cur_threads} > prev_threads=#{prev_threads}" + assert_match(%r{updating server aio_threads=20}, File.read(@err.path)) + end + + def test_aio_threads_auto_adjust_warn + @client.write "server aio_threads = 1\r\n" + assert_equal "\r\n", @client.gets + stop = Time.now + 60 + expect = "server aio_threads=1" + while (! File.read(@err.path).include?(expect) && Time.now < stop) + sleep 0.1 + end + assert(File.read(@err.path).include?(expect), File.read(@err.path)) + + Dir.mkdir("#@tmpdir/dev333") + warning = "fewer aio_threads(1) than devices(2)" + stop = Time.now + 10 + while (! File.read(@err.path).include?(warning)) && Time.now < stop + sleep 0.1 + end + assert(File.read(@err.path).include?(warning), File.read(@err.path)) + end +end diff --git a/test/ruby.mk b/test/ruby.mk index c82a1ec..b089db8 100644 --- a/test/ruby.mk +++ b/test/ruby.mk @@ -3,5 +3,6 @@ RB_TESTS_FAST = test/cmogstored-cfg.rb test/http_dav.rb test/http_range.rb \ test/http_put6_fail.rb test/epoll_enospc.rb RB_TESTS_SLOW = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \ test/http.rb test/http_put_slow.rb test/http_chunked_put.rb \ - test/graceful_quit.rb test/http_idle_expire.rb + test/graceful_quit.rb test/http_idle_expire.rb \ + test/mgmt_auto_adjust.rb RB_TESTS = $(RB_TESTS_FAST) $(RB_TESTS_SLOW) -- cgit v1.2.3-24-ge0c7