diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-06-21 03:34:12 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-06-25 21:24:53 +0000 |
commit | e90b43119ff33fb591ffb3bc100cf847537ca5fb (patch) | |
tree | ef181b44f4c323aaf53df2716017982616816b69 | |
parent | 2acbe7f4001de74091282ee199e3cad50c2e3e7f (diff) | |
download | cmogstored-e90b43119ff33fb591ffb3bc100cf847537ca5fb.tar.gz |
This simplifies code, reduces contention, and reduces the chances of independent MogileFS instances (with one instance of cmogstored) stepping over each other. Most cmogstored deployments are single docroot (for a single instance of MogileFS), however cmogstored supports multiple docroots for some rare configurations and we support them here.
-rw-r--r-- | cmogstored.c | 28 | ||||
-rw-r--r-- | cmogstored.h | 5 | ||||
-rw-r--r-- | notify.c | 26 | ||||
-rw-r--r-- | svc_dev.c | 24 | ||||
-rw-r--r-- | thrpool.c | 6 |
5 files changed, 45 insertions, 44 deletions
diff --git a/cmogstored.c b/cmogstored.c index f87cc67..e2da7e2 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -14,8 +14,6 @@ static struct mog_fd *master_selfwake; static sig_atomic_t sigchld_hit; static sig_atomic_t do_exit; static sig_atomic_t do_upgrade; -static size_t nthr; -static bool have_mgmt; static pid_t master_pid; static pid_t upgrade_pid; static unsigned long worker_processes; @@ -261,18 +259,18 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) master_pid = getpid(); - /* 10 - ??? threads based on number of devices, same as mogstored */ - nthr = mog_mkusage_all(NULL) * 10; - nthr = MAX(10, nthr); + /* set svc->nmogdev on all svc */ + mog_mkusage_all(); } /* Hash iterator function */ -static bool svc_start_each(void *svcptr, void *qptr) +static bool svc_start_each(void *svcptr, void *argptr) { struct mog_svc *svc = svcptr; - struct mog_queue *q = qptr; + bool *have_mgmt = argptr; struct mog_accept *ac; size_t athr = (size_t)num_processors(NPROC_CURRENT); + struct mog_queue *q = mog_queue_new(); /* * try to distribute accept() callers between workers more evenly @@ -285,9 +283,11 @@ static bool svc_start_each(void *svcptr, void *qptr) athr = worker_processes > 1 ? 1 : MIN(2, athr); svc->queue = q; + mog_thrpool_start(&q->thrpool, 1, mog_queue_loop, q); + mog_thrpool_update(q, 0, svc->nmogdev); if (svc->mgmt_mfd) { - have_mgmt = true; + *have_mgmt = true; ac = &svc->mgmt_mfd->as.accept; /* @@ -414,11 +414,11 @@ static void upgrade_handler(void) } } -static void main_worker_loop(struct mog_queue *q, const pid_t parent) +static void main_worker_loop(const pid_t parent, bool have_mgmt) { mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { - mog_notify_wait(q, have_mgmt); + mog_notify_wait(have_mgmt); if (sigchld_hit) sigchld_handler(); if (do_upgrade) @@ -441,19 +441,17 @@ static void main_worker_loop(struct mog_queue *q, const pid_t parent) static void run_worker(const pid_t parent) { - struct mog_queue *q = mog_queue_new(); + bool have_mgmt = false; mog_notify_init(); siginit(worker_wakeup_handler); - mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q); - have_mgmt = false; - mog_svc_each(svc_start_each, q); /* this will set have_mgmt */ + mog_svc_each(svc_start_each, &have_mgmt); /* this will set have_mgmt */ if (have_mgmt) { iostat_running = mog_iostat_respawn(0); if (!iostat_running) syslog(LOG_WARNING, "iostat(1) not available/running"); } - main_worker_loop(q, parent); + main_worker_loop(parent, have_mgmt); } static void fork_worker(unsigned worker_id) diff --git a/cmogstored.h b/cmogstored.h index f17d45b..6bbd786 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -137,6 +137,7 @@ struct mog_queue; struct mog_svc { int docroot_fd; const char *docroot; + size_t nmogdev; /* private */ DIR *dir; @@ -351,7 +352,7 @@ void mog_pidfile_upgrade_abort(void); bool mog_svc_devstats_broadcast(void *svc, void *ignored); void mog_svc_devstats_subscribe(struct mog_mgmt *); void mog_svc_dev_shutdown(void); -size_t mog_mkusage_all(struct mog_queue *); +void mog_mkusage_all(void); /* cloexec_detect.c */ extern bool mog_cloexec_atomic; @@ -434,7 +435,7 @@ bool mog_open_expire_retry(struct mog_svc *); /* notify.c */ void mog_notify_init(void); void mog_notify(enum mog_notification); -void mog_notify_wait(struct mog_queue *, bool need_usage_file); +void mog_notify_wait(bool need_usage_file); /* http_parser.rl */ void mog_http_reset_parser(struct mog_http *); @@ -34,9 +34,9 @@ void mog_notify_init(void) } } -static void global_mkusage(struct mog_queue *q) +static void global_mkusage(void) { - mog_mkusage_all(q); + mog_mkusage_all(); usage_file_updated_at = time(NULL); } @@ -45,27 +45,27 @@ static inline bool note_xchg(enum mog_notification note, int from, int to) return __sync_bool_compare_and_swap(¬es[note], from, to); } -static void note_run(struct mog_queue *q) +static void note_run(void) { if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0)) - global_mkusage(q); + global_mkusage(); if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0)) mog_thrpool_process_queue(); } /* drain the pipe and process notifications */ -static void note_queue_step(struct mog_queue *q, struct mog_fd *mfd) +static void note_queue_step(struct mog_fd *mfd) { mog_selfwake_drain(mfd); - note_run(q); + note_run(); mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD); } -static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd) +static void notify_queue_step(struct mog_fd *mfd) { switch (mfd->fd_type) { - case MOG_FD_TYPE_SELFWAKE: note_queue_step(q, mfd); return; + case MOG_FD_TYPE_SELFWAKE: note_queue_step(mfd); return; case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return; default: assert(0 && mfd->fd_type && "bad fd_type in queue"); @@ -73,7 +73,7 @@ static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd) } /* this is the main loop of cmogstored */ -void mog_notify_wait(struct mog_queue *q, bool need_usage_file) +void mog_notify_wait(bool need_usage_file) { time_t next = usage_file_updated_at + usage_file_interval; time_t now = time(NULL); @@ -81,14 +81,14 @@ void mog_notify_wait(struct mog_queue *q, bool need_usage_file) struct mog_fd *mfd; if (next <= now) - global_mkusage(q); + global_mkusage(); /* * epoll_wait() with timeout==0 can avoid some slow paths, * so take anything that's already ready before sleeping */ while ((mfd = mog_idleq_wait(mog_notify_queue, 0))) - notify_queue_step(q, mfd); + notify_queue_step(mfd); if (need_usage_file == false) timeout = -1; @@ -99,9 +99,9 @@ void mog_notify_wait(struct mog_queue *q, bool need_usage_file) mfd = mog_idleq_wait_intr(mog_notify_queue, timeout); if (mfd) - notify_queue_step(q, mfd); + notify_queue_step(mfd); else if (errno == EINTR) - note_run(q); + note_run(); } /* this is async-signal safe */ @@ -4,7 +4,6 @@ */ #include "cmogstored.h" #include "compat_memstream.h" -static size_t ndev; /* * maps multiple "devXXX" directories to the device. @@ -287,22 +286,21 @@ void mog_svc_dev_shutdown(void) mog_svc_each(devstats_shutdown_i, NULL); } -static bool svc_mkusage_each(void *svc, void *nr) +static bool svc_mkusage_each(void *svcptr, void *ignored) { - svc_scandev((struct mog_svc *)svc, nr, mog_dev_mkusage); + struct mog_svc *svc = svcptr; + size_t ndev = 0; + + svc_scandev(svc, &ndev, mog_dev_mkusage); + + if (svc->queue && (svc->nmogdev != ndev)) + mog_thrpool_update(svc->queue, svc->nmogdev, ndev); + svc->nmogdev = ndev; return true; } -size_t mog_mkusage_all(struct mog_queue *q) +void mog_mkusage_all(void) { - size_t ndev_new = 0; - - mog_svc_each(svc_mkusage_each, &ndev_new); - - if (q && ndev_new != ndev) - mog_thrpool_update(q, ndev, ndev_new); - ndev = ndev_new; - - return ndev; + mog_svc_each(svc_mkusage_each, NULL); } @@ -24,6 +24,7 @@ size_t mog_user_set_aio_threads; # define MOG_THR_STACK_SIZE (0) #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; +static const size_t thr_per_dev = 10; static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER; struct sat_arg; @@ -138,7 +139,7 @@ out: /* this is only called by the main (notify) thread */ void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new) { - size_t size = ndev_new * 10; + size_t size = ndev_new * thr_per_dev; struct mog_thrpool *tp = &q->thrpool; if (mog_user_set_aio_threads) { @@ -158,6 +159,9 @@ void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new) return; } + if (size < thr_per_dev) + size = thr_per_dev; + if (ndev_old) syslog(LOG_INFO, "devcount(%zu->%zu), updating server aio_threads=%zu", |