diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-06-21 03:34:21 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-06-25 21:27:53 +0000 |
commit | d5a52618ca1f9b5d7f6998716fbfe7714f927112 (patch) | |
tree | 927845365d19c847368b4f678e39697303802bf1 | |
parent | 03c2391078e19dc36ea62c75fa6745569b5cbef6 (diff) | |
download | cmogstored-d5a52618ca1f9b5d7f6998716fbfe7714f927112.tar.gz |
We now use per-svc thread pools, so the different MogileFS instances we serve no longer affect each other. This means that changing the aio_threads count affects only the svc whose sidechannel port triggered the change.
-rw-r--r-- | cmogstored.c | 86 | ||||
-rw-r--r-- | cmogstored.h | 15 | ||||
-rw-r--r-- | mgmt_fn.c | 7 | ||||
-rw-r--r-- | notify.c | 6 | ||||
-rw-r--r-- | notify.h | 2 | ||||
-rw-r--r-- | svc.c | 148 | ||||
-rw-r--r-- | svc_dev.c | 2 | ||||
-rw-r--r-- | test/mgmt.rb | 8 | ||||
-rw-r--r-- | thrpool.c | 98 |
9 files changed, 195 insertions, 177 deletions
diff --git a/cmogstored.c b/cmogstored.c index e2da7e2..e7cc154 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -16,9 +16,10 @@ static sig_atomic_t do_exit; static sig_atomic_t do_upgrade; static pid_t master_pid; static pid_t upgrade_pid; -static unsigned long worker_processes; static bool iostat_running; +static struct mog_main mog_main; + #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1) static struct argp_option options[] = { { .name = "daemonize", .key = 'd', @@ -123,8 +124,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) break; case -'M': mog_cfg_multi = true; break; case -'W': - check_strtoul(&worker_processes, arg, "worker-processes"); - if (worker_processes > UINT_MAX) + check_strtoul(&mog_main.worker_processes, arg, + "worker-processes"); + if (mog_main.worker_processes > UINT_MAX) die("--worker-processes exceeded (max=%u)", UINT_MAX); break; case ARGP_KEY_ARG: @@ -263,53 +265,6 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) mog_mkusage_all(); } -/* Hash iterator function */ -static bool svc_start_each(void *svcptr, void *argptr) -{ - struct mog_svc *svc = svcptr; - bool *have_mgmt = argptr; - struct mog_accept *ac; - size_t athr = (size_t)num_processors(NPROC_CURRENT); - struct mog_queue *q = mog_queue_new(); - - /* - * try to distribute accept() callers between workers more evenly - * with wake-one accept() behavior by trimming down on acceptors - * having too many acceptor threads does not make sense, these - * threads are only bounded by lock contention and local bus speeds. - * Increasing threads here just leads to lock contention inside the - * kernel (accept/accept4/EPOLL_CTL_ADD) - */ - athr = worker_processes > 1 ? 
1 : MIN(2, athr); - - svc->queue = q; - mog_thrpool_start(&q->thrpool, 1, mog_queue_loop, q); - mog_thrpool_update(q, 0, svc->nmogdev); - - if (svc->mgmt_mfd) { - *have_mgmt = true; - ac = &svc->mgmt_mfd->as.accept; - - /* - * mgmt port is rarely used and always persistent, so it - * does not need multiple threads for blocking accept() - */ - mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac); - } - - if (svc->http_mfd) { - ac = &svc->http_mfd->as.accept; - mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); - } - - if (svc->httpget_mfd) { - ac = &svc->httpget_mfd->as.accept; - mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); - } - - return true; -} - static void worker_wakeup_handler(int signum) { switch (signum) { @@ -414,20 +369,20 @@ static void upgrade_handler(void) } } -static void main_worker_loop(const pid_t parent, bool have_mgmt) +static void main_worker_loop(const pid_t parent) { mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { - mog_notify_wait(have_mgmt); + mog_notify_wait(mog_main.have_mgmt); if (sigchld_hit) sigchld_handler(); if (do_upgrade) upgrade_handler(); if (do_exit) cmogstored_exit(); - if (have_mgmt) + if (mog_main.have_mgmt) mog_mnt_refresh(); - else if (have_mgmt && !iostat_running && !do_exit) + else if (mog_main.have_mgmt && !iostat_running && !do_exit) /* * maybe iostat was not installed/available/usable at * startup, but became usable later @@ -441,17 +396,18 @@ static void main_worker_loop(const pid_t parent, bool have_mgmt) static void run_worker(const pid_t parent) { - bool have_mgmt = false; - mog_notify_init(); siginit(worker_wakeup_handler); - mog_svc_each(svc_start_each, &have_mgmt); /* this will set have_mgmt */ - if (have_mgmt) { + + /* this can set mog_main->have_mgmt */ + mog_svc_each(mog_svc_start_each, &mog_main); + + if (mog_main.have_mgmt) { iostat_running = mog_iostat_respawn(0); if (!iostat_running) syslog(LOG_WARNING, "iostat(1) not 
available/running"); } - main_worker_loop(parent, have_mgmt); + main_worker_loop(parent); } static void fork_worker(unsigned worker_id) @@ -463,8 +419,6 @@ static void fork_worker(unsigned worker_id) if (pid > 0) { mog_process_register(pid, worker_id); } else if (pid == 0) { - /* workers have no workers of their own */ - worker_processes = 0; mog_process_reset(); /* worker will call mog_intr_enable() later in notify loop */ @@ -508,7 +462,7 @@ static void process_died(pid_t pid, int status) switch (id) { case MOG_PROC_IOSTAT: - assert(worker_processes == 0 && + assert(mog_main.worker_processes == 0 && "master process registered iostat process"); iostat_died(pid, status); return; @@ -532,12 +486,12 @@ static void process_died(pid_t pid, int status) static void run_master(void) { unsigned id; - size_t running = worker_processes; + size_t running = mog_main.worker_processes; master_selfwake = mog_selfwake_new(); siginit(master_wakeup_handler); - for (id = 0; id < worker_processes; id++) + for (id = 0; id < mog_main.worker_processes; id++) fork_worker(id); while (running > 0) { @@ -561,8 +515,8 @@ int main(int argc, char *argv[], char *envp[]) mog_intr_disable(); setup(argc, argv); /* this daemonizes */ - mog_process_init(worker_processes); - if (worker_processes == 0) + mog_process_init(mog_main.worker_processes); + if (mog_main.worker_processes == 0) run_worker(0); else run_master(); diff --git a/cmogstored.h b/cmogstored.h index 0b5a7bf..fd10e6d 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -139,6 +139,7 @@ struct mog_svc { int docroot_fd; const char *docroot; size_t nmogdev; + size_t user_set_aio_threads; /* only touched by main/notify thread */ /* private */ DIR *dir; @@ -332,6 +333,10 @@ struct mog_svc *mog_svc_new(const char *docroot); typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *); size_t mog_svc_each(Hash_processor processor, void *data); void mog_svc_upgrade_prepare(void); +bool mog_svc_start_each(void *svc_ptr, void 
*have_mgmt_ptr); +void mog_svc_thrpool_rescale(struct mog_svc *, size_t ndev_new); +void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr); +void mog_svc_aio_threads_handler(void); /* dev.c */ struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid); @@ -392,10 +397,8 @@ char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg); void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); -void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size); void mog_thrpool_process_queue(void); -void mog_thrpool_update(struct mog_queue *, size_t ndev_old, size_t ndev_new); -extern size_t mog_user_set_aio_threads; +void mog_thrpool_set_size(struct mog_thrpool *, size_t size); /* mgmt.c */ void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); @@ -530,6 +533,12 @@ void mog_iou_active(dev_t); # define MOG_TCP_NOPUSH (0) #endif +/* publically visible attributes of the current process */ +struct mog_main { + unsigned long worker_processes; + bool have_mgmt; +}; + /* cmogstored.c */ void cmogstored_quit(void); @@ -183,7 +183,6 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf) { char *end; unsigned long long nr; - struct mog_queue *q = mgmt->svc->queue; char *nptr = buf + mgmt->mark[0]; char *eor = nptr + mgmt->mark[1] - mgmt->mark[0]; struct iovec iov; @@ -194,10 +193,8 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf) nr = strtoull(nptr, &end, 10); assert(*end == 0 && "ragel misfed mog_mgmt_fn_set_aio_threads"); - if (nr > 0 && nr <= (size_t)INT_MAX) { - mog_user_set_aio_threads = (size_t)nr; - mog_thrpool_set_n_threads(q, nr); - } + if (nr > 0 && nr <= (size_t)INT_MAX) + mog_svc_aio_threads_enqueue(mgmt->svc, nr); IOV_STR(&iov, "\r\n"); mog_mgmt_writev(mgmt, &iov, 1); @@ -50,8 +50,8 @@ static void note_run(void) if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0)) global_mkusage(); - if 
(note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0)) - mog_thrpool_process_queue(); + if (note_xchg(MOG_NOTIFY_AIO_THREADS, 1, 0)) + mog_svc_aio_threads_handler(); } /* drain the pipe and process notifications */ @@ -109,7 +109,7 @@ void mog_notify(enum mog_notification note) { switch (note) { case MOG_NOTIFY_DEVICE_REFRESH: - case MOG_NOTIFY_SET_N_THREADS: + case MOG_NOTIFY_AIO_THREADS: note_xchg(note, 0, 1); mog_selfwake_interrupt(); break; @@ -5,7 +5,7 @@ enum mog_notification { MOG_NOTIFY_SIGNAL = -1, MOG_NOTIFY_DEVICE_REFRESH = 0, - MOG_NOTIFY_SET_N_THREADS = 1, + MOG_NOTIFY_AIO_THREADS = 1, MOG_NOTIFY_MAX }; @@ -11,6 +11,27 @@ static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER; static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */ static mode_t mog_umask; +static const size_t thr_per_dev = 10; + +/* + * maintain an internal queue of requests for the "server aio_threads = N" + * command in the side channel. The worker handling the client request must + * tell the main thread do change thread counts asynchronously (because + * the worker thread handling the request may die from a thread count + * reduction, so we have a worker thread make a fire-and-forget request + * to the notify thread. 
+ */ +static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER; +struct aio_threads_req; +struct aio_threads_req { + struct mog_svc *svc; + size_t size; + SIMPLEQ_ENTRY(aio_threads_req) qentry; +}; + +static SIMPLEQ_HEAD(sq, aio_threads_req) aio_threads_qhead = + SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead); + static void svc_free(void *ptr) { @@ -147,3 +168,130 @@ void mog_svc_upgrade_prepare(void) { (void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL); } + +/* this is only called by the main (notify) thread */ +void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new) +{ + size_t size = ndev_new * thr_per_dev; + struct mog_thrpool *tp = &svc->queue->thrpool; + + /* respect user-setting */ + if (svc->user_set_aio_threads) { + if (tp->n_threads >= ndev_new) + return; + + syslog(LOG_WARNING, + "server aio_threads=%zu is less than devcount=%zu", + tp->n_threads, ndev_new); + + return; + } + + if (size < thr_per_dev) + size = thr_per_dev; + + if (svc->nmogdev) + syslog(LOG_INFO, + "devcount(%zu->%zu), updating server aio_threads=%zu", + svc->nmogdev, ndev_new, size); + mog_thrpool_set_size(tp, size); +} + +/* Hash iterator function */ +bool mog_svc_start_each(void *svc_ptr, void *main_ptr) +{ + struct mog_svc *svc = svc_ptr; + struct mog_main *mog_main = main_ptr; + struct mog_accept *ac; + size_t athr = (size_t)num_processors(NPROC_CURRENT); + struct mog_queue *q = mog_queue_new(); + size_t nthr = svc->nmogdev ? svc->nmogdev * thr_per_dev : thr_per_dev; + + /* + * try to distribute accept() callers between workers more evenly + * with wake-one accept() behavior by trimming down on acceptors + * having too many acceptor threads does not make sense, these + * threads are only bounded by lock contention and local bus speeds. + * Increasing threads here just leads to lock contention inside the + * kernel (accept/accept4/EPOLL_CTL_ADD) + */ + athr = mog_main->worker_processes > 1 ? 
1 : MIN(2, athr); + + svc->queue = q; + mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q); + + if (svc->mgmt_mfd) { + mog_main->have_mgmt = true; + ac = &svc->mgmt_mfd->as.accept; + + /* + * mgmt port is rarely used and always persistent, so it + * does not need multiple threads for blocking accept() + */ + mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac); + } + + if (svc->http_mfd) { + ac = &svc->http_mfd->as.accept; + mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); + } + + if (svc->httpget_mfd) { + ac = &svc->httpget_mfd->as.accept; + mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac); + } + + return true; +} + +/* + * Fire and forget, we must run the actual thread count manipulation + * in the main notify thread because we may end up terminating the + * thread which invoked this. + * + * Called by threads inside the thrpool to wake-up the main/notify thread. + */ +void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size) +{ + struct aio_threads_req *req; + + /* this gets free'ed in mog_thrpool_process_queue() */ + req = xmalloc(sizeof(struct aio_threads_req)); + req->size = size; + req->svc = svc; + + /* put into the queue so main thread can process it */ + CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock)); + SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, req, qentry); + CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock)); + + /* wake up the main thread so it can process the queue */ + mog_notify(MOG_NOTIFY_AIO_THREADS); +} + +/* this runs in the main (notify) thread */ +void mog_svc_aio_threads_handler(void) +{ + struct aio_threads_req *req; + + /* guard against requests bundled in one wakeup by looping here */ + for (;;) { + CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock)); + req = SIMPLEQ_FIRST(&aio_threads_qhead); + if (req) + SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry); + CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock)); + + /* + * spurious wakeup is possible since we loop here + * (and we must loop, 
see above comment) + */ + if (req == NULL) + return; + + syslog(LOG_INFO, "server aio_threads=%zu", req->size); + req->svc->user_set_aio_threads = req->size; + mog_thrpool_set_size(&req->svc->queue->thrpool, req->size); + free(req); + } +} @@ -283,7 +283,7 @@ static bool svc_mkusage_each(void *svcptr, void *ignored) svc_scandev(svc, &ndev, mog_dev_mkusage); if (svc->queue && (svc->nmogdev != ndev)) - mog_thrpool_update(svc->queue, svc->nmogdev, ndev); + mog_svc_thrpool_rescale(svc, ndev); svc->nmogdev = ndev; return true; diff --git a/test/mgmt.rb b/test/mgmt.rb index 373cd69..deec383 100644 --- a/test/mgmt.rb +++ b/test/mgmt.rb @@ -270,17 +270,19 @@ class TestMgmt < Test::Unit::TestCase t_yield # wait for threads to spawn taskdir = "/proc/#@pid/task" glob = "#{taskdir}/*" - nr_threads = Dir[glob].size if File.directory?(taskdir) + prev_threads = Dir[glob].size if File.directory?(taskdir) @client.write "server aio_threads = 1\r\n" assert_equal "\r\n", @client.gets if RUBY_PLATFORM =~ /linux/ assert File.directory?(taskdir), "/proc not mounted on Linux?" end if File.directory?(taskdir) - while nr_threads == Dir[glob].size && (tries -= 1) > 0 + while prev_threads == Dir[glob].size && (tries -= 1) > 0 sleep(0.1) end - assert nr_threads != Dir[glob].size + cur_threads = Dir[glob].size + assert prev_threads != cur_threads, + "prev_threads=#{prev_threads} != cur_threads=#{cur_threads}" end @client.write "server aio_threads=6\r\n" assert_equal "\r\n", @client.gets @@ -4,9 +4,6 @@ */ #include "cmogstored.h" -/* this is set and read without a lock, but harmless if racy */ -size_t mog_user_set_aio_threads; - /* * we can lower this if we can test with lower values, NPTL minimum is 16K. 
* We also use syslog() and *printf() functions which take a lot of @@ -24,17 +21,6 @@ size_t mog_user_set_aio_threads; # define MOG_THR_STACK_SIZE (0) #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; -static const size_t thr_per_dev = 10; - -static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER; -struct sat_arg; -struct sat_arg { - struct mog_queue *queue; - size_t size; - SIMPLEQ_ENTRY(sat_arg) qentry; -}; - -static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead); /* * kevent() sleep is not a cancellation point, so it's possible for @@ -72,7 +58,7 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size, } } -static void thrpool_set_size(struct mog_thrpool *tp, size_t size) +void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) { unsigned long nr_eagain = 0; @@ -136,84 +122,6 @@ out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } -/* this is only called by the main (notify) thread */ -void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new) -{ - size_t size = ndev_new * thr_per_dev; - struct mog_thrpool *tp = &q->thrpool; - - if (mog_user_set_aio_threads) { - size_t n_threads; - - CHECK(int, 0, pthread_mutex_lock(&tp->lock)); - n_threads = tp->n_threads; - CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); - - if (n_threads >= ndev_new) - return; - - syslog(LOG_WARNING, - "server aio_threads=%zu is less than devcount=%zu", - n_threads, ndev_new); - - return; - } - - if (size < thr_per_dev) - size = thr_per_dev; - - if (ndev_old) - syslog(LOG_INFO, - "devcount(%zu->%zu), updating server aio_threads=%zu", - ndev_old, ndev_new, size); - thrpool_set_size(tp, size); -} - -/* - * fire and forget, we must run the actual thread count manipulation - * in the main notify thread because we may end up terminating the - * thread which invoked this. 
- */ -void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size) -{ - struct sat_arg *arg; - - /* this gets free'ed in mog_thrpool_process_queue() */ - arg = xmalloc(sizeof(struct sat_arg)); - arg->size = size; - arg->queue = q; - - /* put into the queue so main thread can process it */ - CHECK(int, 0, pthread_mutex_lock(&sat_lock)); - SIMPLEQ_INSERT_TAIL(&satqhead, arg, qentry); - CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); - - /* wake up the main thread so it can process the queue */ - mog_notify(MOG_NOTIFY_SET_N_THREADS); -} - -/* this runs in the main (notify) thread */ -void mog_thrpool_process_queue(void) -{ - /* guard against requests bundled in one wakeup by looping here */ - for (;;) { - struct sat_arg *arg; - - CHECK(int, 0, pthread_mutex_lock(&sat_lock)); - arg = SIMPLEQ_FIRST(&satqhead); - if (arg) - SIMPLEQ_REMOVE_HEAD(&satqhead, qentry); - CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); - - if (arg == NULL) - return; - - syslog(LOG_INFO, "server aio_threads=%zu", arg->size); - thrpool_set_size(&arg->queue->thrpool, arg->size); - free(arg); - } -} - void mog_thrpool_start(struct mog_thrpool *tp, size_t n, void *(*start_fn)(void *), void *arg) @@ -227,12 +135,12 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n, tp->start_fn = start_fn; tp->start_arg = arg; CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL)); - thrpool_set_size(tp, n); + mog_thrpool_set_size(tp, n); } void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q) { - thrpool_set_size(tp, 0); + mog_thrpool_set_size(tp, 0); CHECK(int, 0, pthread_mutex_destroy(&tp->lock)); mog_free_and_null(&tp->threads); } |