about summary refs log tree commit homepage
diff options
context:
space:
mode:
author	Eric Wong <normalperson@yhbt.net>	2013-06-15 11:22:11 +0000
committer	Eric Wong <normalperson@yhbt.net>	2013-06-15 11:22:11 +0000
commit	dbe7ad541412a1069487c11582b9b40905b7464f (patch)
tree	de29658d1e31451aa912778428721298d9aab4ba
parent	2157035aa98720d36dc6ed8f5be03516b9588811 (diff)
download	cmogstored-dbe7ad541412a1069487c11582b9b40905b7464f.tar.gz
This will help ensure availability when new devices are added,
without additional user interaction to manually set aio_threads
via sidechannel.
-rw-r--r--	TODO	1
-rw-r--r--	cmogstored.c	11
-rw-r--r--	cmogstored.h	6
-rw-r--r--	mgmt_fn.c	4
-rw-r--r--	notify.c	26
-rw-r--r--	svc_dev.c	17
-rw-r--r--	thrpool.c	35
7 files changed, 72 insertions, 28 deletions
diff --git a/TODO b/TODO
index 3237765..34f2d99 100644
--- a/TODO
+++ b/TODO
@@ -4,3 +4,4 @@
 * reduce/minimize memory/stack usage
 * optional fsync/fdatasync/O_SYNC/msync for PUT
 * fallocate support?  slow emulation interfaces can be a problem...
+* inotify (and kqueue) support for detecting new device directories
diff --git a/cmogstored.c b/cmogstored.c
index 1c2f7db..3c3f87c 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -261,10 +261,9 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         master_pid = getpid();
 
-        /* 10 - 100 threads based on number of devices, same as mogstored */
-        nthr = mog_mkusage_all() * 10;
+        /* 10 - ??? threads based on number of devices, same as mogstored */
+        nthr = mog_mkusage_all(NULL) * 10;
         nthr = MAX(10, nthr);
-        nthr = MIN(100, nthr);
 }
 
 /* Hash iterator function */
@@ -415,11 +414,11 @@ static void upgrade_handler(void)
         }
 }
 
-static void main_worker_loop(const pid_t parent)
+static void main_worker_loop(struct mog_queue *q, const pid_t parent)
 {
         mog_cancel_disable(); /* mog_idleq_wait() now relies on this */
         while (parent == 0 || parent == getppid()) {
-                mog_notify_wait(have_mgmt);
+                mog_notify_wait(q, have_mgmt);
                 if (sigchld_hit)
                         sigchld_handler();
                 if (do_upgrade)
@@ -454,7 +453,7 @@ static void run_worker(const pid_t parent)
                 if (!iostat_running)
                         syslog(LOG_WARNING, "iostat(1) not available/running");
         }
-        main_worker_loop(parent);
+        main_worker_loop(q, parent);
 }
 
 static void fork_worker(unsigned worker_id)
diff --git a/cmogstored.h b/cmogstored.h
index ee6a6fa..f17d45b 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -351,7 +351,7 @@ void mog_pidfile_upgrade_abort(void);
 bool mog_svc_devstats_broadcast(void *svc, void *ignored);
 void mog_svc_devstats_subscribe(struct mog_mgmt *);
 void mog_svc_dev_shutdown(void);
-size_t mog_mkusage_all(void);
+size_t mog_mkusage_all(struct mog_queue *);
 
 /* cloexec_detect.c */
 extern bool mog_cloexec_atomic;
@@ -386,6 +386,8 @@ void mog_thrpool_start(struct mog_thrpool *, size_t n,
 void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
 void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size);
 void mog_thrpool_process_queue(void);
+void mog_thrpool_update(struct mog_queue *, size_t ndev_old, size_t ndev_new);
+extern size_t mog_user_set_aio_threads;
 
 /* mgmt.c */
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
@@ -432,7 +434,7 @@ bool mog_open_expire_retry(struct mog_svc *);
 /* notify.c */
 void mog_notify_init(void);
 void mog_notify(enum mog_notification);
-void mog_notify_wait(bool need_usage_file);
+void mog_notify_wait(struct mog_queue *, bool need_usage_file);
 
 /* http_parser.rl */
 void mog_http_reset_parser(struct mog_http *);
diff --git a/mgmt_fn.c b/mgmt_fn.c
index 90dfe40..0ca91f6 100644
--- a/mgmt_fn.c
+++ b/mgmt_fn.c
@@ -194,8 +194,10 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf)
         nr = strtoull(nptr, &end, 10);
         assert(*end == 0 && "ragel misfed mog_mgmt_fn_set_aio_threads");
 
-        if (nr > 0 && nr <= 100)
+        if (nr > 0 && nr <= (size_t)INT_MAX) {
+                mog_user_set_aio_threads = (size_t)nr;
                 mog_thrpool_set_n_threads(q, nr);
+        }
 
         IOV_STR(&iov, "\r\n");
         mog_mgmt_writev(mgmt, &iov, 1);
diff --git a/notify.c b/notify.c
index be9221b..a794e51 100644
--- a/notify.c
+++ b/notify.c
@@ -34,9 +34,9 @@ void mog_notify_init(void)
         }
 }
 
-static void global_mkusage(void)
+static void global_mkusage(struct mog_queue *q)
 {
-        mog_mkusage_all();
+        mog_mkusage_all(q);
         usage_file_updated_at = time(NULL);
 }
 
@@ -45,27 +45,27 @@ static inline bool note_xchg(enum mog_notification note, int from, int to)
         return __sync_bool_compare_and_swap(&notes[note], from, to);
 }
 
-static void note_run(void)
+static void note_run(struct mog_queue *q)
 {
         if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0))
-                global_mkusage();
+                global_mkusage(q);
 
         if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0))
                 mog_thrpool_process_queue();
 }
 
 /* drain the pipe and process notifications */
-static void note_queue_step(struct mog_fd *mfd)
+static void note_queue_step(struct mog_queue *q, struct mog_fd *mfd)
 {
         mog_selfwake_drain(mfd);
-        note_run();
+        note_run(q);
         mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD);
 }
 
-static void notify_queue_step(struct mog_fd *mfd)
+static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd)
 {
         switch (mfd->fd_type) {
-        case MOG_FD_TYPE_SELFWAKE: note_queue_step(mfd); return;
+        case MOG_FD_TYPE_SELFWAKE: note_queue_step(q, mfd); return;
         case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return;
         default:
                 assert(0 && mfd->fd_type && "bad fd_type in queue");
@@ -73,7 +73,7 @@ static void notify_queue_step(struct mog_fd *mfd)
 }
 
 /* this is the main loop of cmogstored */
-void mog_notify_wait(bool need_usage_file)
+void mog_notify_wait(struct mog_queue *q, bool need_usage_file)
 {
         time_t next = usage_file_updated_at + usage_file_interval;
         time_t now = time(NULL);
@@ -81,14 +81,14 @@ void mog_notify_wait(bool need_usage_file)
         struct mog_fd *mfd;
 
         if (next <= now)
-                global_mkusage();
+                global_mkusage(q);
 
         /*
          * epoll_wait() with timeout==0 can avoid some slow paths,
          * so take anything that's already ready before sleeping
          */
         while ((mfd = mog_idleq_wait(mog_notify_queue, 0)))
-                notify_queue_step(mfd);
+                notify_queue_step(q, mfd);
 
         if (need_usage_file == false)
                 timeout = -1;
@@ -99,9 +99,9 @@ void mog_notify_wait(bool need_usage_file)
 
         mfd = mog_idleq_wait_intr(mog_notify_queue, timeout);
         if (mfd)
-                notify_queue_step(mfd);
+                notify_queue_step(q, mfd);
         else if (errno == EINTR)
-                note_run();
+                note_run(q);
 }
 
 /* this is async-signal safe */
diff --git a/svc_dev.c b/svc_dev.c
index 47a67da..8786a50 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -4,6 +4,7 @@
  */
 #include "cmogstored.h"
 #include "compat_memstream.h"
+static size_t ndev;
 
 /*
  * maps multiple "devXXX" directories to the device.
@@ -135,7 +136,9 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb)
                 devlist = svc_devlist(svc, dev->st_dev);
                 devhash = devlist->by_mogdevid;
 
-                if (cb) rc |= cb(dev, svc); /* mog_dev_mkusage */
+                if (cb)
+                        rc |= cb(dev, svc); /* mog_dev_mkusage */
+
                 switch (hash_insert_if_absent(devhash, dev, NULL)) {
                 case 0:
                         free(dev);
@@ -291,11 +294,15 @@ static bool svc_mkusage_each(void *svc, void *nr)
         return true;
 }
 
-size_t mog_mkusage_all(void)
+size_t mog_mkusage_all(struct mog_queue *q)
 {
-        size_t nr = 0;
+        size_t ndev_new = 0;
+
+        mog_svc_each(svc_mkusage_each, &ndev_new);
 
-        mog_svc_each(svc_mkusage_each, &nr);
+        if (q && ndev_new != ndev)
+                mog_thrpool_update(q, ndev, ndev_new);
+        ndev = ndev_new;
 
-        return nr;
+        return ndev;
 }
diff --git a/thrpool.c b/thrpool.c
index 8030e19..3bff31e 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,6 +4,9 @@
  */
 #include "cmogstored.h"
 
+/* this is set and read without a lock, but harmless if racy */
+size_t mog_user_set_aio_threads;
+
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -132,6 +135,36 @@ out:
         CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }
 
+/* this is only called by the main (notify) thread */
+void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new)
+{
+        size_t size = ndev_new * 10;
+        struct mog_thrpool *tp = &q->thrpool;
+
+        if (mog_user_set_aio_threads) {
+                size_t n_threads;
+
+                CHECK(int, 0, pthread_mutex_lock(&tp->lock));
+                n_threads = tp->n_threads;
+                CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
+
+                if (n_threads >= ndev_new)
+                        return;
+
+                syslog(LOG_WARNING,
+                        "server aio_threads=%zu is less than devcount=%zu",
+                        n_threads, ndev_new);
+
+                return;
+        }
+
+        if (ndev_old)
+                syslog(LOG_INFO,
+                       "devcount(%zu->%zu), updating server aio_threads=%zu",
+                       ndev_old, ndev_new, size);
+        thrpool_set_size(tp, size);
+}
+
 /*
  * fire and forget, we must run the actual thread count manipulation
  * in the main notify thread because we may end up terminating the
@@ -171,7 +204,7 @@ void mog_thrpool_process_queue(void)
                 if (arg == NULL)
                         return;
 
-                syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size);
+                syslog(LOG_INFO, "server aio_threads=%zu", arg->size);
                 thrpool_set_size(&arg->queue->thrpool, arg->size);
                 free(arg);
         }