author     Eric Wong <normalperson@yhbt.net>   2013-06-21 03:34:21 +0000
committer  Eric Wong <normalperson@yhbt.net>   2013-06-25 21:27:53 +0000
commit     d5a52618ca1f9b5d7f6998716fbfe7714f927112
tree       927845365d19c847368b4f678e39697303802bf1
parent     03c2391078e19dc36ea62c75fa6745569b5cbef6
download   cmogstored-d5a52618ca1f9b5d7f6998716fbfe7714f927112.tar.gz
We now use per-svc thread pools, so the different MogileFS
instances we serve no longer affect each other.  This means
changing the aio_threads count only affects the svc of the
sidechannel port which triggered the change.
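
Not part of the patch, for illustration only: a throwaway client
exercising the command this commit re-plumbs.  The address is an
assumption (7501 is the conventional MogileFS sidechannel port; any
configured mgmt listener works), and the bare "\r\n" reply matches
what mgmt_fn.c writes back below.

#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>

int main(void)
{
        static const char req[] = "server aio_threads = 10\r\n";
        struct addrinfo hints = { .ai_socktype = SOCK_STREAM };
        struct addrinfo *res;
        char buf[16];
        int fd;

        if (getaddrinfo("127.0.0.1", "7501", &hints, &res) != 0)
                return 1;
        fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
        if (fd < 0 || connect(fd, res->ai_addr, res->ai_addrlen) != 0)
                return 1;
        freeaddrinfo(res);
        if (write(fd, req, sizeof(req) - 1) < 0)
                return 1;
        /* a bare "\r\n" acknowledges the request was queued */
        if (read(fd, buf, sizeof(buf)) < 0)
                return 1;
        return close(fd);
}
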
-rw-r--r--  cmogstored.c   86
-rw-r--r--  cmogstored.h   15
-rw-r--r--  mgmt_fn.c       7
-rw-r--r--  notify.c        6
-rw-r--r--  notify.h        2
-rw-r--r--  svc.c         148
-rw-r--r--  svc_dev.c       2
-rw-r--r--  test/mgmt.rb    8
-rw-r--r--  thrpool.c      98
9 files changed, 195 insertions(+), 177 deletions(-)
diff --git a/cmogstored.c b/cmogstored.c
index e2da7e2..e7cc154 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -16,9 +16,10 @@ static sig_atomic_t do_exit;
 static sig_atomic_t do_upgrade;
 static pid_t master_pid;
 static pid_t upgrade_pid;
-static unsigned long worker_processes;
 static bool iostat_running;
 
+static struct mog_main mog_main;
+
 #define CFG_KEY(f) -((int)offsetof(struct mog_cfg,f) + 1)
 static struct argp_option options[] = {
         { .name = "daemonize", .key = 'd',
@@ -123,8 +124,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
                 break;
         case -'M': mog_cfg_multi = true; break;
         case -'W':
-                check_strtoul(&worker_processes, arg, "worker-processes");
-                if (worker_processes > UINT_MAX)
+                check_strtoul(&mog_main.worker_processes, arg,
+                                "worker-processes");
+                if (mog_main.worker_processes > UINT_MAX)
                         die("--worker-processes exceeded (max=%u)", UINT_MAX);
                 break;
         case ARGP_KEY_ARG:
@@ -263,53 +265,6 @@ MOG_NOINLINE static void setup(int argc, char *argv[])
         mog_mkusage_all();
 }
 
-/* Hash iterator function */
-static bool svc_start_each(void *svcptr, void *argptr)
-{
-        struct mog_svc *svc = svcptr;
-        bool *have_mgmt = argptr;
-        struct mog_accept *ac;
-        size_t athr = (size_t)num_processors(NPROC_CURRENT);
-        struct mog_queue *q = mog_queue_new();
-
-        /*
-         * try to distribute accept() callers between workers more evenly
-         * with wake-one accept() behavior by trimming down on acceptors
-         * having too many acceptor threads does not make sense, these
-         * threads are only bounded by lock contention and local bus speeds.
-         * Increasing threads here just leads to lock contention inside the
-         * kernel (accept/accept4/EPOLL_CTL_ADD)
-         */
-        athr = worker_processes > 1 ? 1 : MIN(2, athr);
-
-        svc->queue = q;
-        mog_thrpool_start(&q->thrpool, 1, mog_queue_loop, q);
-        mog_thrpool_update(q, 0, svc->nmogdev);
-
-        if (svc->mgmt_mfd) {
-                *have_mgmt = true;
-                ac = &svc->mgmt_mfd->as.accept;
-
-                /*
-                 * mgmt port is rarely used and always persistent, so it
-                 * does not need multiple threads for blocking accept()
-                 */
-                mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
-        }
-
-        if (svc->http_mfd) {
-                ac = &svc->http_mfd->as.accept;
-                mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
-        }
-
-        if (svc->httpget_mfd) {
-                ac = &svc->httpget_mfd->as.accept;
-                mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
-        }
-
-        return true;
-}
-
 static void worker_wakeup_handler(int signum)
 {
         switch (signum) {
@@ -414,20 +369,20 @@ static void upgrade_handler(void)
         }
 }
 
-static void main_worker_loop(const pid_t parent, bool have_mgmt)
+static void main_worker_loop(const pid_t parent)
 {
         mog_cancel_disable(); /* mog_idleq_wait() now relies on this */
         while (parent == 0 || parent == getppid()) {
-                mog_notify_wait(have_mgmt);
+                mog_notify_wait(mog_main.have_mgmt);
                 if (sigchld_hit)
                         sigchld_handler();
                 if (do_upgrade)
                         upgrade_handler();
                 if (do_exit)
                         cmogstored_exit();
-                if (have_mgmt)
+                if (mog_main.have_mgmt)
                         mog_mnt_refresh();
-                else if (have_mgmt && !iostat_running && !do_exit)
+                else if (mog_main.have_mgmt && !iostat_running && !do_exit)
                         /*
                          * maybe iostat was not installed/available/usable at
                          * startup, but became usable later
@@ -441,17 +396,18 @@ static void main_worker_loop(const pid_t parent, bool have_mgmt)
 
 static void run_worker(const pid_t parent)
 {
-        bool have_mgmt = false;
-
         mog_notify_init();
         siginit(worker_wakeup_handler);
-        mog_svc_each(svc_start_each, &have_mgmt); /* this will set have_mgmt */
-        if (have_mgmt) {
+
+        /* this can set mog_main->have_mgmt */
+        mog_svc_each(mog_svc_start_each, &mog_main);
+
+        if (mog_main.have_mgmt) {
                 iostat_running = mog_iostat_respawn(0);
                 if (!iostat_running)
                         syslog(LOG_WARNING, "iostat(1) not available/running");
         }
-        main_worker_loop(parent, have_mgmt);
+        main_worker_loop(parent);
 }
 
 static void fork_worker(unsigned worker_id)
@@ -463,8 +419,6 @@ static void fork_worker(unsigned worker_id)
         if (pid > 0) {
                 mog_process_register(pid, worker_id);
         } else if (pid == 0) {
-                /* workers have no workers of their own */
-                worker_processes = 0;
                 mog_process_reset();
 
                 /* worker will call mog_intr_enable() later in notify loop */
@@ -508,7 +462,7 @@ static void process_died(pid_t pid, int status)
 
         switch (id) {
         case MOG_PROC_IOSTAT:
-                assert(worker_processes == 0 &&
+                assert(mog_main.worker_processes == 0 &&
                        "master process registered iostat process");
                 iostat_died(pid, status);
                 return;
@@ -532,12 +486,12 @@ static void process_died(pid_t pid, int status)
 static void run_master(void)
 {
         unsigned id;
-        size_t running = worker_processes;
+        size_t running = mog_main.worker_processes;
 
         master_selfwake = mog_selfwake_new();
         siginit(master_wakeup_handler);
 
-        for (id = 0; id < worker_processes; id++)
+        for (id = 0; id < mog_main.worker_processes; id++)
                 fork_worker(id);
 
         while (running > 0) {
@@ -561,8 +515,8 @@ int main(int argc, char *argv[], char *envp[])
         mog_intr_disable();
         setup(argc, argv); /* this daemonizes */
 
-        mog_process_init(worker_processes);
-        if (worker_processes == 0)
+        mog_process_init(mog_main.worker_processes);
+        if (mog_main.worker_processes == 0)
                 run_worker(0);
         else
                 run_master();
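
A side note on the main_worker_loop() signature change above: the
loop condition doubles as the worker's master-death detector.  A
minimal restatement of just that predicate, outside the patch:

#include <sys/types.h>
#include <unistd.h>

/*
 * A forked worker notices master death because getppid() stops
 * returning the saved parent pid once the worker is reparented;
 * parent == 0 means "no master", so a standalone process loops
 * forever.
 */
static int keep_running(pid_t parent)
{
        return parent == 0 || parent == getppid();
}
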
diff --git a/cmogstored.h b/cmogstored.h
index 0b5a7bf..fd10e6d 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -139,6 +139,7 @@ struct mog_svc {
         int docroot_fd;
         const char *docroot;
         size_t nmogdev;
+        size_t user_set_aio_threads; /* only touched by main/notify thread */
 
         /* private */
         DIR *dir;
@@ -332,6 +333,10 @@ struct mog_svc *mog_svc_new(const char *docroot);
 typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *);
 size_t mog_svc_each(Hash_processor processor, void *data);
 void mog_svc_upgrade_prepare(void);
+bool mog_svc_start_each(void *svc_ptr, void *have_mgmt_ptr);
+void mog_svc_thrpool_rescale(struct mog_svc *, size_t ndev_new);
+void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr);
+void mog_svc_aio_threads_handler(void);
 
 /* dev.c */
 struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid);
@@ -392,10 +397,8 @@ char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
 void mog_thrpool_start(struct mog_thrpool *, size_t n,
                        void *(*start_fn)(void *), void *arg);
 void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
-void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size);
 void mog_thrpool_process_queue(void);
-void mog_thrpool_update(struct mog_queue *, size_t ndev_old, size_t ndev_new);
-extern size_t mog_user_set_aio_threads;
+void mog_thrpool_set_size(struct mog_thrpool *, size_t size);
 
 /* mgmt.c */
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
@@ -530,6 +533,12 @@ void mog_iou_active(dev_t);
 #  define MOG_TCP_NOPUSH (0)
 #endif
 
+/* publicly visible attributes of the current process */
+struct mog_main {
+        unsigned long worker_processes;
+        bool have_mgmt;
+};
+
 /* cmogstored.c */
 void cmogstored_quit(void);
 
diff --git a/mgmt_fn.c b/mgmt_fn.c
index 0ca91f6..b64c4ac 100644
--- a/mgmt_fn.c
+++ b/mgmt_fn.c
@@ -183,7 +183,6 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf)
 {
         char *end;
         unsigned long long nr;
-        struct mog_queue *q = mgmt->svc->queue;
         char *nptr = buf + mgmt->mark[0];
         char *eor = nptr + mgmt->mark[1] - mgmt->mark[0];
         struct iovec iov;
@@ -194,10 +193,8 @@ void mog_mgmt_fn_aio_threads(struct mog_mgmt *mgmt, char *buf)
         nr = strtoull(nptr, &end, 10);
         assert(*end == 0 && "ragel misfed mog_mgmt_fn_set_aio_threads");
 
-        if (nr > 0 && nr <= (size_t)INT_MAX) {
-                mog_user_set_aio_threads = (size_t)nr;
-                mog_thrpool_set_n_threads(q, nr);
-        }
+        if (nr > 0 && nr <= (size_t)INT_MAX)
+                mog_svc_aio_threads_enqueue(mgmt->svc, nr);
 
         IOV_STR(&iov, "\r\n");
         mog_mgmt_writev(mgmt, &iov, 1);
diff --git a/notify.c b/notify.c
index be9221b..0dad6c6 100644
--- a/notify.c
+++ b/notify.c
@@ -50,8 +50,8 @@ static void note_run(void)
         if (note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0))
                 global_mkusage();
 
-        if (note_xchg(MOG_NOTIFY_SET_N_THREADS, 1, 0))
-                mog_thrpool_process_queue();
+        if (note_xchg(MOG_NOTIFY_AIO_THREADS, 1, 0))
+                mog_svc_aio_threads_handler();
 }
 
 /* drain the pipe and process notifications */
@@ -109,7 +109,7 @@ void mog_notify(enum mog_notification note)
 {
         switch (note) {
         case MOG_NOTIFY_DEVICE_REFRESH:
-        case MOG_NOTIFY_SET_N_THREADS:
+        case MOG_NOTIFY_AIO_THREADS:
                 note_xchg(note, 0, 1);
                 mog_selfwake_interrupt();
                 break;
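
The renames above are mechanical, but the shape of the mechanism
explains the looping handler added in svc.c: each notification type
is one flag, set by producers and cleared by the main thread, so any
number of notifications between drains collapse into a single unit of
work.  A condensed sketch, assuming note_xchg() is a per-note
compare-and-swap (its body lives in notify.c and is outside this
diff):

#include <stdbool.h>

/* local stand-ins; the real names live in notify.[ch] */
enum note { NOTE_DEVICE_REFRESH, NOTE_AIO_THREADS, NOTE_MAX };
static int notes[NOTE_MAX];

static void wake_main_thread(void)
{
        /* stub: cmogstored calls mog_selfwake_interrupt() here */
}

static bool note_xchg_sketch(enum note n, int from, int to)
{
        /* GCC/Clang atomic builtin; true only for the caller that flips it */
        return __sync_bool_compare_and_swap(&notes[n], from, to);
}

/* producer side: setting the flag is idempotent, so repeated calls
 * before the next drain leave just one pending unit of work */
static void notify_sketch(enum note n)
{
        note_xchg_sketch(n, 0, 1);
        wake_main_thread();
}

/* main-thread side: consume the flag once per pass; since one flag
 * may cover many queued requests, the handler added in svc.c must
 * loop until its queue is empty */
static bool note_pending(enum note n)
{
        return note_xchg_sketch(n, 1, 0);
}
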
diff --git a/notify.h b/notify.h
index f6a6f37..66b065a 100644
--- a/notify.h
+++ b/notify.h
@@ -5,7 +5,7 @@
 enum mog_notification {
         MOG_NOTIFY_SIGNAL = -1,
         MOG_NOTIFY_DEVICE_REFRESH = 0,
-        MOG_NOTIFY_SET_N_THREADS = 1,
+        MOG_NOTIFY_AIO_THREADS = 1,
         MOG_NOTIFY_MAX
 };
 
diff --git a/svc.c b/svc.c
index e9d8d6d..834117d 100644
--- a/svc.c
+++ b/svc.c
@@ -11,6 +11,27 @@
 static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER;
 static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */
 static mode_t mog_umask;
+static const size_t thr_per_dev = 10;
+
+/*
+ * maintain an internal queue of requests for the "server aio_threads = N"
+ * command in the side channel.  The worker handling the client request must
+ * tell the main thread to change thread counts asynchronously (because
+ * the worker thread handling the request may die from a thread count
+ * reduction), so we have a worker thread make a fire-and-forget request
+ * to the notify thread.
+ */
+static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+struct aio_threads_req;
+struct aio_threads_req {
+        struct mog_svc *svc;
+        size_t size;
+        SIMPLEQ_ENTRY(aio_threads_req) qentry;
+};
+
+static SIMPLEQ_HEAD(sq, aio_threads_req) aio_threads_qhead =
+                                SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);
+
 
 static void svc_free(void *ptr)
 {
@@ -147,3 +168,130 @@ void mog_svc_upgrade_prepare(void)
 {
         (void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL);
 }
+
+/* this is only called by the main (notify) thread */
+void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new)
+{
+        size_t size = ndev_new * thr_per_dev;
+        struct mog_thrpool *tp = &svc->queue->thrpool;
+
+        /* respect the user-set thread count */
+        if (svc->user_set_aio_threads) {
+                if (tp->n_threads >= ndev_new)
+                        return;
+
+                syslog(LOG_WARNING,
+                        "server aio_threads=%zu is less than devcount=%zu",
+                        tp->n_threads, ndev_new);
+
+                return;
+        }
+
+        if (size < thr_per_dev)
+                size = thr_per_dev;
+
+        if (svc->nmogdev)
+                syslog(LOG_INFO,
+                       "devcount(%zu->%zu), updating server aio_threads=%zu",
+                       svc->nmogdev, ndev_new, size);
+        mog_thrpool_set_size(tp, size);
+}
+
+/* Hash iterator function */
+bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
+{
+        struct mog_svc *svc = svc_ptr;
+        struct mog_main *mog_main = main_ptr;
+        struct mog_accept *ac;
+        size_t athr = (size_t)num_processors(NPROC_CURRENT);
+        struct mog_queue *q = mog_queue_new();
+        size_t nthr = svc->nmogdev ? svc->nmogdev * thr_per_dev : thr_per_dev;
+
+        /*
+         * try to distribute accept() callers between workers more evenly
+         * with wake-one accept() behavior by trimming down on acceptors
+         * having too many acceptor threads does not make sense, these
+         * threads are only bounded by lock contention and local bus speeds.
+         * Increasing threads here just leads to lock contention inside the
+         * kernel (accept/accept4/EPOLL_CTL_ADD)
+         */
+        athr = mog_main->worker_processes > 1 ? 1 : MIN(2, athr);
+
+        svc->queue = q;
+        mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);
+
+        if (svc->mgmt_mfd) {
+                mog_main->have_mgmt = true;
+                ac = &svc->mgmt_mfd->as.accept;
+
+                /*
+                 * mgmt port is rarely used and always persistent, so it
+                 * does not need multiple threads for blocking accept()
+                 */
+                mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
+        }
+
+        if (svc->http_mfd) {
+                ac = &svc->http_mfd->as.accept;
+                mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
+        }
+
+        if (svc->httpget_mfd) {
+                ac = &svc->httpget_mfd->as.accept;
+                mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
+        }
+
+        return true;
+}
+
+/*
+ * Fire and forget, we must run the actual thread count manipulation
+ * in the main notify thread because we may end up terminating the
+ * thread which invoked this.
+ *
+ * Called by threads inside the thrpool to wake-up the main/notify thread.
+ */
+void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size)
+{
+        struct aio_threads_req *req;
+
+        /* this gets freed in mog_svc_aio_threads_handler() */
+        req = xmalloc(sizeof(struct aio_threads_req));
+        req->size = size;
+        req->svc = svc;
+
+        /* put into the queue so main thread can process it */
+        CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
+        SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, req, qentry);
+        CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));
+
+        /* wake up the main thread so it can process the queue */
+        mog_notify(MOG_NOTIFY_AIO_THREADS);
+}
+
+/* this runs in the main (notify) thread */
+void mog_svc_aio_threads_handler(void)
+{
+        struct aio_threads_req *req;
+
+        /* multiple requests may be bundled into one wakeup, so loop here */
+        for (;;) {
+                CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
+                req = SIMPLEQ_FIRST(&aio_threads_qhead);
+                if (req)
+                        SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
+                CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));
+
+                /*
+                 * spurious wakeup is possible since we loop here
+                 * (and we must loop, see above comment)
+                 */
+                if (req == NULL)
+                        return;
+
+                syslog(LOG_INFO, "server aio_threads=%zu", req->size);
+                req->svc->user_set_aio_threads = req->size;
+                mog_thrpool_set_size(&req->svc->queue->thrpool, req->size);
+                free(req);
+        }
+}
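
The enqueue()/handler() pair above is the core of the commit: a
worker may not resize its own pool, because the resize can terminate
the calling thread, so it hands the request to the main thread.  The
same pattern reduced to a self-contained sketch, with plain libc in
place of xmalloc()/CHECK() and the mog_notify() wakeup left as a
comment (SIMPLEQ_* is the BSD <sys/queue.h> macro family the diff
already uses; glibc spells it STAILQ_*, and cmogstored carries a
compatible definition):

#include <pthread.h>
#include <stdlib.h>
#include <sys/queue.h>

struct req {
        size_t size;
        SIMPLEQ_ENTRY(req) qentry;
};

static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static SIMPLEQ_HEAD(rq, req) qhead = SIMPLEQ_HEAD_INITIALIZER(qhead);

/* producer: any worker thread; returns immediately, never blocks on
 * the resize itself since the resize may kill this very thread */
static void enqueue(size_t size)
{
        struct req *req = malloc(sizeof(*req));

        if (!req)
                abort(); /* stand-in for xmalloc() */
        req->size = size;
        pthread_mutex_lock(&qlock);
        SIMPLEQ_INSERT_TAIL(&qhead, req, qentry);
        pthread_mutex_unlock(&qlock);
        /* then: mog_notify(MOG_NOTIFY_AIO_THREADS) to wake the drainer */
}

/* consumer: the main (notify) thread; one wakeup may cover many
 * requests, so drain until the queue is empty */
static void drain(void)
{
        for (;;) {
                struct req *req;

                pthread_mutex_lock(&qlock);
                req = SIMPLEQ_FIRST(&qhead);
                if (req)
                        SIMPLEQ_REMOVE_HEAD(&qhead, qentry);
                pthread_mutex_unlock(&qlock);
                if (!req)
                        return;
                /* apply req->size (mog_thrpool_set_size() in the real code) */
                free(req);
        }
}
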
diff --git a/svc_dev.c b/svc_dev.c
index ab21211..5d19580 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -283,7 +283,7 @@ static bool svc_mkusage_each(void *svcptr, void *ignored)
         svc_scandev(svc, &ndev, mog_dev_mkusage);
 
         if (svc->queue && (svc->nmogdev != ndev))
-                mog_thrpool_update(svc->queue, svc->nmogdev, ndev);
+                mog_svc_thrpool_rescale(svc, ndev);
         svc->nmogdev = ndev;
 
         return true;
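
For a concrete check of the sizing rule mog_svc_thrpool_rescale()
applies when the device count changes: ten threads per device,
floored at one device's worth.  The rule isolated as a pure function,
with worked checks; nothing here goes beyond what the diff shows:

#include <assert.h>
#include <stddef.h>

static size_t aio_threads_for(size_t ndev)
{
        const size_t thr_per_dev = 10; /* same constant as in svc.c */
        size_t size = ndev * thr_per_dev;

        return size < thr_per_dev ? thr_per_dev : size;
}

int main(void)
{
        assert(aio_threads_for(0) == 10);  /* floor applies */
        assert(aio_threads_for(1) == 10);
        assert(aio_threads_for(25) == 250);
        return 0;
}
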
diff --git a/test/mgmt.rb b/test/mgmt.rb
index 373cd69..deec383 100644
--- a/test/mgmt.rb
+++ b/test/mgmt.rb
@@ -270,17 +270,19 @@ class TestMgmt < Test::Unit::TestCase
     t_yield # wait for threads to spawn
     taskdir = "/proc/#@pid/task"
     glob = "#{taskdir}/*"
-    nr_threads = Dir[glob].size if File.directory?(taskdir)
+    prev_threads = Dir[glob].size if File.directory?(taskdir)
     @client.write "server aio_threads = 1\r\n"
     assert_equal "\r\n", @client.gets
     if RUBY_PLATFORM =~ /linux/
       assert File.directory?(taskdir), "/proc not mounted on Linux?"
     end
     if File.directory?(taskdir)
-      while nr_threads == Dir[glob].size && (tries -= 1) > 0
+      while prev_threads == Dir[glob].size && (tries -= 1) > 0
         sleep(0.1)
       end
-      assert nr_threads != Dir[glob].size
+      cur_threads = Dir[glob].size
+      assert prev_threads != cur_threads,
+             "prev_threads=#{prev_threads} != cur_threads=#{cur_threads}"
     end
     @client.write "server aio_threads=6\r\n"
     assert_equal "\r\n", @client.gets
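
The test observes the resize from outside by counting entries in
/proc/PID/task, which exists only on Linux.  The same measurement in
C, for illustration (taskdir is the caller's choice):

#include <dirent.h>

/* count kernel tasks (threads) of a process via /proc, Linux-only */
static int count_tasks(const char *taskdir) /* e.g. "/proc/self/task" */
{
        DIR *dir = opendir(taskdir);
        struct dirent *ent;
        int n = 0;

        if (!dir)
                return -1; /* no /proc: the test skips this case, too */
        while ((ent = readdir(dir)) != NULL)
                if (ent->d_name[0] != '.') /* skip "." and ".." */
                        n++;
        closedir(dir);
        return n;
}
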
diff --git a/thrpool.c b/thrpool.c
index 7167239..96246a8 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,9 +4,6 @@
  */
 #include "cmogstored.h"
 
-/* this is set and read without a lock, but harmless if racy */
-size_t mog_user_set_aio_threads;
-
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -24,17 +21,6 @@ size_t mog_user_set_aio_threads;
 #  define MOG_THR_STACK_SIZE (0)
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
-static const size_t thr_per_dev = 10;
-
-static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER;
-struct sat_arg;
-struct sat_arg {
-        struct mog_queue *queue;
-        size_t size;
-        SIMPLEQ_ENTRY(sat_arg) qentry;
-};
-
-static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead);
 
 /*
  * kevent() sleep is not a cancellation point, so it's possible for
@@ -72,7 +58,7 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size,
         }
 }
 
-static void thrpool_set_size(struct mog_thrpool *tp, size_t size)
+void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
 {
         unsigned long nr_eagain = 0;
 
@@ -136,84 +122,6 @@ out:
         CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }
 
-/* this is only called by the main (notify) thread */
-void mog_thrpool_update(struct mog_queue *q, size_t ndev_old, size_t ndev_new)
-{
-        size_t size = ndev_new * thr_per_dev;
-        struct mog_thrpool *tp = &q->thrpool;
-
-        if (mog_user_set_aio_threads) {
-                size_t n_threads;
-
-                CHECK(int, 0, pthread_mutex_lock(&tp->lock));
-                n_threads = tp->n_threads;
-                CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
-
-                if (n_threads >= ndev_new)
-                        return;
-
-                syslog(LOG_WARNING,
-                        "server aio_threads=%zu is less than devcount=%zu",
-                        n_threads, ndev_new);
-
-                return;
-        }
-
-        if (size < thr_per_dev)
-                size = thr_per_dev;
-
-        if (ndev_old)
-                syslog(LOG_INFO,
-                       "devcount(%zu->%zu), updating server aio_threads=%zu",
-                       ndev_old, ndev_new, size);
-        thrpool_set_size(tp, size);
-}
-
-/*
- * fire and forget, we must run the actual thread count manipulation
- * in the main notify thread because we may end up terminating the
- * thread which invoked this.
- */
-void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size)
-{
-        struct sat_arg *arg;
-
-        /* this gets free'ed in mog_thrpool_process_queue() */
-        arg = xmalloc(sizeof(struct sat_arg));
-        arg->size = size;
-        arg->queue = q;
-
-        /* put into the queue so main thread can process it */
-        CHECK(int, 0, pthread_mutex_lock(&sat_lock));
-        SIMPLEQ_INSERT_TAIL(&satqhead, arg, qentry);
-        CHECK(int, 0, pthread_mutex_unlock(&sat_lock));
-
-        /* wake up the main thread so it can process the queue */
-        mog_notify(MOG_NOTIFY_SET_N_THREADS);
-}
-
-/* this runs in the main (notify) thread */
-void mog_thrpool_process_queue(void)
-{
-        /* guard against requests bundled in one wakeup by looping here */
-        for (;;) {
-                struct sat_arg *arg;
-
-                CHECK(int, 0, pthread_mutex_lock(&sat_lock));
-                arg = SIMPLEQ_FIRST(&satqhead);
-                if (arg)
-                        SIMPLEQ_REMOVE_HEAD(&satqhead, qentry);
-                CHECK(int, 0, pthread_mutex_unlock(&sat_lock));
-
-                if (arg == NULL)
-                        return;
-
-                syslog(LOG_INFO, "server aio_threads=%zu", arg->size);
-                thrpool_set_size(&arg->queue->thrpool, arg->size);
-                free(arg);
-        }
-}
-
 void
 mog_thrpool_start(struct mog_thrpool *tp, size_t n,
                   void *(*start_fn)(void *), void *arg)
@@ -227,12 +135,12 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n,
         tp->start_fn = start_fn;
         tp->start_arg = arg;
         CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL));
-        thrpool_set_size(tp, n);
+        mog_thrpool_set_size(tp, n);
 }
 
 void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
 {
-        thrpool_set_size(tp, 0);
+        mog_thrpool_set_size(tp, 0);
         CHECK(int, 0, pthread_mutex_destroy(&tp->lock));
         mog_free_and_null(&tp->threads);
 }