author     Eric Wong <normalperson@yhbt.net>    2013-06-21 03:34:12 +0000
committer  Eric Wong <normalperson@yhbt.net>    2013-06-25 21:24:53 +0000
commit     e90b43119ff33fb591ffb3bc100cf847537ca5fb (patch)
tree       ef181b44f4c323aaf53df2716017982616816b69
parent     2acbe7f4001de74091282ee199e3cad50c2e3e7f (diff)
download   cmogstored-e90b43119ff33fb591ffb3bc100cf847537ca5fb.tar.gz
This simplifies code, reduces contention, and reduces the
chances of independent MogileFS instances (sharing one
cmogstored instance) stepping over each other.

Most cmogstored deployments use a single docroot (for a single
instance of MogileFS); however, cmogstored supports multiple
docroots for some rare configurations, and we support them here.
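
As an illustration of the per-svc sizing this commit introduces, the
standalone sketch below models one thread pool per service/docroot,
sized at 10 threads per device with a floor of 10 (matching
thr_per_dev and the new minimum added in thrpool.c).  It is not
cmogstored source: struct svc, pool_size(), and the docroot paths are
hypothetical stand-ins for the real mog_svc / mog_thrpool code.

#include <stddef.h>
#include <stdio.h>

#define THR_PER_DEV 10	/* mirrors thr_per_dev in thrpool.c */

struct svc {
	const char *docroot;	/* one MogileFS instance per docroot */
	size_t nmogdev;		/* devices found under this docroot */
	size_t nthreads;	/* size of this svc's private pool */
};

/* size one service's pool independently of every other service */
static size_t pool_size(size_t ndev)
{
	size_t size = ndev * THR_PER_DEV;

	return size < THR_PER_DEV ? THR_PER_DEV : size;
}

int main(void)
{
	/* two independent MogileFS instances sharing one cmogstored */
	struct svc svcs[] = {
		{ .docroot = "/var/mogdata/a", .nmogdev = 4 },
		{ .docroot = "/var/mogdata/b", .nmogdev = 1 },
	};
	size_t i;

	for (i = 0; i < sizeof(svcs) / sizeof(svcs[0]); i++) {
		svcs[i].nthreads = pool_size(svcs[i].nmogdev);
		printf("%s: %zu devices -> %zu aio threads\n",
		       svcs[i].docroot, svcs[i].nmogdev, svcs[i].nthreads);
	}
	return 0;
}

With per-svc pools, a device appearing or disappearing under one
docroot only resizes that docroot's pool (see mog_thrpool_update() in
svc_mkusage_each() below), instead of resizing a single global pool
shared by every MogileFS instance.
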
-rw-r--r--  cmogstored.c  28
-rw-r--r--  cmogstored.h   5
-rw-r--r--  notify.c      26
-rw-r--r--  svc_dev.c     24
-rw-r--r--  thrpool.c      6
5 files changed, 45 insertions, 44 deletions
diff --git a/cmogstored.c b/cmogstored.c
index f87cc67..e2da7e2 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -14,8 +14,6 @@ static struct mog_fd *master_selfwake;
 static sig_atomic_t sigchld_hit;
 static sig_atomic_t do_exit;
 static sig_atomic_t do_upgrade;
-static size_t nthr;
-static bool have_mgmt;
 static pid_t master_pid;
 static pid_t upgrade_pid;
 static unsigned long worker_processes;
@@ -261,18 +259,18 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
 
         master_pid = getpid();
 
-        /* 10 - ??? threads based on number of devices, same as mogstored */
-        nthr = mog_mkusage_all(NULL) * 10;
-        nthr = MAX(10, nthr);
+        /* set svc->nmogdev on all svc */
+        mog_mkusage_all();
 }
 
 /* Hash iterator function */
-static bool svc_start_each(void *svcptr, void *qptr)
+static bool svc_start_each(void *svcptr, void *argptr)
 {
         struct mog_svc *svc = svcptr;
-        struct mog_queue *q = qptr;
+        bool *have_mgmt = argptr;
         struct mog_accept *ac;
         size_t athr = (size_t)num_processors(NPROC_CURRENT);
+        struct mog_queue *q = mog_queue_new();
 
         /*
          * try to distribute accept() callers between workers more evenly
@@ -285,9 +283,11 @@ static bool svc_start_each(void *svcptr, void *qptr)
         athr = worker_processes > 1 ? 1 : MIN(2, athr);
 
         svc->queue = q;
+        mog_thrpool_start(&q->thrpool, 1, mog_queue_loop, q);
+        mog_thrpool_update(q, 0, svc->nmogdev);
 
         if (svc->mgmt_mfd) {
-                have_mgmt = true;
+                *have_mgmt = true;
                 ac = &svc->mgmt_mfd->as.accept;
 
                 /*
@@ -414,11 +414,11 @@ static void upgrade_handler(void)
         }
 }
 
-static void main_worker_loop(struct mog_queue *q, const pid_t parent)
+static void main_worker_loop(const pid_t parent, bool have_mgmt)
 {
         mog_cancel_disable(); /* mog_idleq_wait() now relies on this */
         while (parent == 0 || parent == getppid()) {
-                mog_notify_wait(q, have_mgmt);
+                mog_notify_wait(have_mgmt);
                 if (sigchld_hit)
                         sigchld_handler();
                 if (do_upgrade)
@@ -441,19 +441,17 @@ static void main_worker_loop(struct mog_queue *q, const pid_t parent)
 
 static void run_worker(const pid_t parent)
 {
-        struct mog_queue *q = mog_queue_new();
+        bool have_mgmt = false;
 
         mog_notify_init();
         siginit(worker_wakeup_handler);
-        mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);
-        have_mgmt = false;
-        mog_svc_each(svc_start_each, q); /* this will set have_mgmt */
+        mog_svc_each(svc_start_each, &have_mgmt); /* this will set have_mgmt */
         if (have_mgmt) {
                 iostat_running = mog_iostat_respawn(0);
                 if (!iostat_running)
                         syslog(LOG_WARNING, "iostat(1) not available/running");
         }
-        main_worker_loop(q, parent);
+        main_worker_loop(parent, have_mgmt);
 }
 
 static void fork_worker(unsigned worker_id)
diff --git a/cmogstored.h b/cmogstored.h
index f17d45b..6bbd786 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -137,6 +137,7 @@ struct mog_queue;
 struct mog_svc {
         int docroot_fd;
         const char *docroot;
+        size_t nmogdev;
 
         /* private */
         DIR *dir;
@@ -351,7 +352,7 @@ void mog_pidfile_upgrade_abort(void);
 bool mog_svc_devstats_broadcast(void *svc, void *ignored);
 void mog_svc_devstats_subscribe(struct mog_mgmt *);
 void mog_svc_dev_shutdown(void);
-size_t mog_mkusage_all(struct mog_queue *);
+void mog_mkusage_all(void);
 
 /* cloexec_detect.c */
 extern bool mog_cloexec_atomic;
@@ -434,7 +435,7 @@ bool mog_open_expire_retry(struct mog_svc *);
 /* notify.c */
 void mog_notify_init(void);
 void mog_notify(enum mog_notification);
-void mog_notify_wait(struct mog_queue *, bool need_usage_file);
+void mog_notify_wait(bool need_usage_file);
 
 /* http_parser.rl */
 void mog_http_reset_parser(struct mog_http *);
diff --git a/notify.c b/notify.c
index a794e51..be9221b 100644
--- a/notify.c
+++ b/notify.c
@@ -34,9 +34,9 @@ void mog_notify_init(void)
         }
 }
 
-static void global_mkusage(struct mog_queue *q)
+static void global_mkusage(void)
 {
-        mog_mkusage_all(q);
+        mog_mkusage_all();
         usage_file_updated_at = time(NULL);
 }
 
@@ -45,27 +45,27 @@ static inline bool note_xchg(enum mog_notification note, int from, int to)
         return __sync_bool_compare_and_swap(&notes[note], from, to);
 }
 
-static void note_run(struct mog_queue *q)
+static void note_run(void)
 {
         if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0))
-                global_mkusage(q);
+                global_mkusage();
 
         if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0))
                 mog_thrpool_process_queue();
 }
 
 /* drain the pipe and process notifications */
-static void note_queue_step(struct mog_queue *q, struct mog_fd *mfd)
+static void note_queue_step(struct mog_fd *mfd)
 {
         mog_selfwake_drain(mfd);
-        note_run(q);
+        note_run();
         mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD);
 }
 
-static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd)
+static void notify_queue_step(struct mog_fd *mfd)
 {
         switch (mfd->fd_type) {
-        case MOG_FD_TYPE_SELFWAKE: note_queue_step(q, mfd); return;
+        case MOG_FD_TYPE_SELFWAKE: note_queue_step(mfd); return;
         case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return;
         default:
                 assert(0 && mfd->fd_type && "bad fd_type in queue");
@@ -73,7 +73,7 @@ static void notify_queue_step(struct mog_queue *q, struct mog_fd *mfd)
 }
 
 /* this is the main loop of cmogstored */
-void mog_notify_wait(struct mog_queue *q, bool need_usage_file)
+void mog_notify_wait(bool need_usage_file)
 {
         time_t next = usage_file_updated_at + usage_file_interval;
         time_t now = time(NULL);
@@ -81,14 +81,14 @@ void mog_notify_wait(struct mog_queue *q, bool need_usage_file)
         struct mog_fd *mfd;
 
         if (next <= now)
-                global_mkusage(q);
+                global_mkusage();
 
         /*
          * epoll_wait() with timeout==0 can avoid some slow paths,
          * so take anything that's already ready before sleeping
          */
         while ((mfd = mog_idleq_wait(mog_notify_queue, 0)))
-                notify_queue_step(q, mfd);
+                notify_queue_step(mfd);
 
         if (need_usage_file == false)
                 timeout = -1;
@@ -99,9 +99,9 @@ void mog_notify_wait(struct mog_queue *q, bool need_usage_file)
 
         mfd = mog_idleq_wait_intr(mog_notify_queue, timeout);
         if (mfd)
-                notify_queue_step(q, mfd);
+                notify_queue_step(mfd);
         else if (errno == EINTR)
-                note_run(q);
+                note_run();
 }
 
 /* this is async-signal safe */
diff --git a/svc_dev.c b/svc_dev.c
index 8786a50..566602c 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -4,7 +4,6 @@
  */
 #include "cmogstored.h"
 #include "compat_memstream.h"
-static size_t ndev;
 
 /*
  * maps multiple "devXXX" directories to the device.
@@ -287,22 +286,21 @@ void mog_svc_dev_shutdown(void)
         mog_svc_each(devstats_shutdown_i, NULL);
 }
 
-static bool svc_mkusage_each(void *svc, void *nr)
+static bool svc_mkusage_each(void *svcptr, void *ignored)
 {
-        svc_scandev((struct mog_svc *)svc, nr, mog_dev_mkusage);
+        struct mog_svc *svc = svcptr;
+        size_t ndev = 0;
+
+        svc_scandev(svc, &ndev, mog_dev_mkusage);
+
+        if (svc->queue && (svc->nmogdev != ndev))
+                mog_thrpool_update(svc->queue, svc->nmogdev, ndev);
+        svc->nmogdev = ndev;
 
         return true;
 }
 
-size_t mog_mkusage_all(struct mog_queue *q)
+void mog_mkusage_all(void)
 {
-        size_t ndev_new = 0;
-
-        mog_svc_each(svc_mkusage_each, &ndev_new);
-
-        if (q && ndev_new != ndev)
-                mog_thrpool_update(q, ndev, ndev_new);
-        ndev = ndev_new;
-
-        return ndev;
+        mog_svc_each(svc_mkusage_each, NULL);
 }
diff --git a/thrpool.c b/thrpool.c
index cf1afba..7167239 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -24,6 +24,7 @@ size_t mog_user_set_aio_threads;
 #  define MOG_THR_STACK_SIZE (0)
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
+static const size_t thr_per_dev = 10;
 
 static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER;
 struct sat_arg;
@@ -138,7 +139,7 @@ out:
 /* this is only called by the main (notify) thread */
 void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new)
 {
-        size_t size = ndev_new * 10;
+        size_t size = ndev_new * thr_per_dev;
         struct mog_thrpool *tp = &q->thrpool;
 
         if (mog_user_set_aio_threads) {
@@ -158,6 +159,9 @@ void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new)
                 return;
         }
 
+        if (size < thr_per_dev)
+                size = thr_per_dev;
+
         if (ndev_old)
                 syslog(LOG_INFO,
                        "devcount(%zu->%zu), updating server aio_threads=%zu",