diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-06-15 11:22:11 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-06-15 11:22:11 +0000 |
commit | dbe7ad541412a1069487c11582b9b40905b7464f (patch) | |
tree | de29658d1e31451aa912778428721298d9aab4ba | |
parent | 2157035aa98720d36dc6ed8f5be03516b9588811 (diff) | |
download | cmogstored-dbe7ad541412a1069487c11582b9b40905b7464f.tar.gz |
This will help ensure availability when new devices are added, without additional user interaction to manually set aio_threads via sidechannel.
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | cmogstored.c | 11 | ||||
-rw-r--r-- | cmogstored.h | 6 | ||||
-rw-r--r-- | mgmt_fn.c | 4 | ||||
-rw-r--r-- | notify.c | 26 | ||||
-rw-r--r-- | svc_dev.c | 17 | ||||
-rw-r--r-- | thrpool.c | 35 |
7 files changed, 72 insertions, 28 deletions
@@ -4,3 +4,4 @@ * reduce/minimize memory/stack usage * optional fsync/fdatasync/O_SYNC/msync for PUT * fallocate support? slow emulation interfaces can be a problem... +* inotify (and kqueue) support for detecting new device directories diff --git a/cmogstored.c b/cmogstored.c index 1c2f7db..3c3f87c 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -261,10 +261,9 @@ MOG_NOINLINE static void setup(int argc, char *argv[]) master_pid = getpid(); - /* 10 - 100 threads based on number of devices, same as mogstored */ - nthr = mog_mkusage_all() * 10; + /* 10 - ??? threads based on number of devices, same as mogstored */ + nthr = mog_mkusage_all(NULL) * 10; nthr = MAX(10, nthr); - nthr = MIN(100, nthr); } /* Hash iterator function */ @@ -415,11 +414,11 @@ static void upgrade_handler(void) } } -static void main_worker_loop(const pid_t parent) +static void main_worker_loop(struct mog_queue *q, const pid_t parent) { mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { - mog_notify_wait(have_mgmt); + mog_notify_wait(q, have_mgmt); if (sigchld_hit) sigchld_handler(); if (do_upgrade) @@ -454,7 +453,7 @@ static void run_worker(const pid_t parent) if (!iostat_running) syslog(LOG_WARNING, "iostat(1) not available/running"); } - main_worker_loop(parent); + main_worker_loop(q, parent); } static void fork_worker(unsigned worker_id) diff --git a/cmogstored.h b/cmogstored.h index ee6a6fa..f17d45b 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -351,7 +351,7 @@ void mog_pidfile_upgrade_abort(void); bool mog_svc_devstats_broadcast(void *svc, void *ignored); void mog_svc_devstats_subscribe(struct mog_mgmt *); void mog_svc_dev_shutdown(void); -size_t mog_mkusage_all(void); +size_t mog_mkusage_all(struct mog_queue *); /* cloexec_detect.c */ extern bool mog_cloexec_atomic; @@ -386,6 +386,8 @@ void mog_thrpool_start(struct mog_thrpool *, size_t n, void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size); void mog_thrpool_process_queue(void); +void mog_thrpool_update(struct mog_queue *, size_t ndev_old, size_t ndev_new); +extern size_t mog_user_set_aio_threads; /* mgmt.c */ void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); @@ -432,7 +434,7 @@ bool mog_open_expire_retry(struct mog_svc *); /* notify.c */ void mog_notify_init(void); void mog_notify(enum mog_notification); -void mog_notify_wait(bool need_usage_file); +void mog_notify_wait(struct mog_queue *, bool need_usage_file); /* http_parser.rl */ void mog_http_reset_parser(struct mog_http *); @@ -194,8 +194,10 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf) nr = strtoull(nptr, &end, 10); assert(*end == 0 && "ragel misfed mog_mgmt_fn_set_aio_threads"); - if (nr > 0 && nr <= 100) + if (nr > 0 && nr <= (size_t)INT_MAX) { + mog_user_set_aio_threads = (size_t)nr; mog_thrpool_set_n_threads(q, nr); + } IOV_STR(&iov, "\r\n"); mog_mgmt_writev(mgmt, &iov, 1); @@ -34,9 +34,9 @@ void mog_notify_init(void) } } -static void global_mkusage(void) +static void global_mkusage(struct mog_queue *q) { - mog_mkusage_all(); + mog_mkusage_all(q); usage_file_updated_at = time(NULL); } @@ -45,27 +45,27 @@ static inline bool note_xchg(enum mog_notification note, int from, int to) return __sync_bool_compare_and_swap(¬es[note], from, to); } -static void note_run(void) +static void note_run(struct mog_queue *q) { if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0)) - global_mkusage(); + global_mkusage(q); if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0)) mog_thrpool_process_queue(); } /* drain the pipe and process notifications */ -static void note_queue_step(struct mog_fd *mfd) +static void note_queue_step(struct mog_queue *q, struct mog_fd *mfd) { mog_selfwake_drain(mfd); - note_run(); + note_run(q); mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD); } -static void notify_queue_step(struct mog_fd *mfd) +static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd) { switch (mfd->fd_type) { - case MOG_FD_TYPE_SELFWAKE: note_queue_step(mfd); return; + case MOG_FD_TYPE_SELFWAKE: note_queue_step(q, mfd); return; case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return; default: assert(0 && mfd->fd_type && "bad fd_type in queue"); @@ -73,7 +73,7 @@ static void notify_queue_step(struct mog_fd *mfd) } /* this is the main loop of cmogstored */ -void mog_notify_wait(bool need_usage_file) +void mog_notify_wait(struct mog_queue *q, bool need_usage_file) { time_t next = usage_file_updated_at + usage_file_interval; time_t now = time(NULL); @@ -81,14 +81,14 @@ void mog_notify_wait(bool need_usage_file) struct mog_fd *mfd; if (next <= now) - global_mkusage(); + global_mkusage(q); /* * epoll_wait() with timeout==0 can avoid some slow paths, * so take anything that's already ready before sleeping */ while ((mfd = mog_idleq_wait(mog_notify_queue, 0))) - notify_queue_step(mfd); + notify_queue_step(q, mfd); if (need_usage_file == false) timeout = -1; @@ -99,9 +99,9 @@ void mog_notify_wait(bool need_usage_file) mfd = mog_idleq_wait_intr(mog_notify_queue, timeout); if (mfd) - notify_queue_step(mfd); + notify_queue_step(q, mfd); else if (errno == EINTR) - note_run(); + note_run(q); } /* this is async-signal safe */ @@ -4,6 +4,7 @@ */ #include "cmogstored.h" #include "compat_memstream.h" +static size_t ndev; /* * maps multiple "devXXX" directories to the device. @@ -135,7 +136,9 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb) devlist = svc_devlist(svc, dev->st_dev); devhash = devlist->by_mogdevid; - if (cb) rc |= cb(dev, svc); /* mog_dev_mkusage */ + if (cb) + rc |= cb(dev, svc); /* mog_dev_mkusage */ + switch (hash_insert_if_absent(devhash, dev, NULL)) { case 0: free(dev); @@ -291,11 +294,15 @@ static bool svc_mkusage_each(void *svc, void *nr) return true; } -size_t mog_mkusage_all(void) +size_t mog_mkusage_all(struct mog_queue *q) { - size_t nr = 0; + size_t ndev_new = 0; + + mog_svc_each(svc_mkusage_each, &ndev_new); - mog_svc_each(svc_mkusage_each, &nr); + if (q && ndev_new != ndev) + mog_thrpool_update(q, ndev, ndev_new); + ndev = ndev_new; - return nr; + return ndev; } @@ -4,6 +4,9 @@ */ #include "cmogstored.h" +/* this is set and read without a lock, but harmless if racy */ +size_t mog_user_set_aio_threads; + /* * we can lower this if we can test with lower values, NPTL minimum is 16K. * We also use syslog() and *printf() functions which take a lot of @@ -132,6 +135,36 @@ out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } +/* this is only called by the main (notify) thread */ +void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new) +{ + size_t size = ndev_new * 10; + struct mog_thrpool *tp = &q->thrpool; + + if (mog_user_set_aio_threads) { + size_t n_threads; + + CHECK(int, 0, pthread_mutex_lock(&tp->lock)); + n_threads = tp->n_threads; + CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); + + if (n_threads >= ndev_new) + return; + + syslog(LOG_WARNING, + "server aio_threads=%zu is less than devcount=%zu", + n_threads, ndev_new); + + return; + } + + if (ndev_old) + syslog(LOG_INFO, + "devcount(%zu->%zu), updating server aio_threads=%zu", + ndev_old, ndev_new, size); + thrpool_set_size(tp, size); +} + /* * fire and forget, we must run the actual thread count manipulation * in the main notify thread because we may end up terminating the @@ -171,7 +204,7 @@ void mog_thrpool_process_queue(void) if (arg == NULL) return; - syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size); + syslog(LOG_INFO, "server aio_threads=%zu", arg->size); thrpool_set_size(&arg->queue->thrpool, arg->size); free(arg); } |