diff options
author | Eric Wong <normalperson@yhbt.net> | 2012-11-09 04:46:43 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2012-11-12 20:29:12 +0000 |
commit | 5c73abe7d6acc66ff7177e31dd0c75f67c53d9f0 (patch) | |
tree | 295819f818fc64dc1074745fc700efeed4cfd315 /thrpool.c | |
parent | 1d7604267fc661f45a95e3d4ebda5bababe7bb8a (diff) | |
download | cmogstored-5c73abe7d6acc66ff7177e31dd0c75f67c53d9f0.tar.gz |
mgmt: support "server aio_threads = <digit>"
This allows tunable thread counts at runtime like regular mogstored (using Perlbal).
Diffstat (limited to 'thrpool.c')
-rw-r--r-- | thrpool.c | 138 |
1 files changed, 107 insertions, 31 deletions
@@ -22,20 +22,27 @@ #endif static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; -void -mog_thrpool_start(struct mog_thrpool *tp, size_t n, - void *(*start_fn)(void *), void *arg) +static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER; +struct sat_arg; +struct sat_arg { + struct mog_queue *queue; + size_t size; + SIMPLEQ_ENTRY(sat_arg) qentry; +}; + +static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead); + +static void +thrpool_set_size(struct mog_thrpool *tp, size_t size, struct mog_queue *q) { - size_t i; pthread_t *thr; - if (n == 0) - n = 1; - tp->threads = thr = xmalloc(sizeof(pthread_t) * n); - tp->n_threads = n; - - for (i = 0; i < n; i++, thr++) { + CHECK(int, 0, pthread_mutex_lock(&tp->lock)); + while (size > tp->n_threads) { pthread_attr_t attr; + size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t); + + tp->threads = xrealloc(tp->threads, bytes); CHECK(int, 0, pthread_attr_init(&attr)); @@ -44,34 +51,103 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n, pthread_attr_setstacksize(&attr, stacksize)); } - CHECK(int, 0, pthread_create(thr, &attr, start_fn, arg)); + thr = tp->threads + tp->n_threads; + + CHECK(int, 0, + pthread_create(thr, &attr, tp->start_fn, tp->start_arg)); CHECK(int, 0, pthread_attr_destroy(&attr)); + tp->n_threads++; } + + if (tp->n_threads > size) { + size_t i; + + for (i = size; i < tp->n_threads; i++) + CHECK(int, 0, pthread_cancel(tp->threads[i])); + for (i = size; i < tp->n_threads; i++) { + int err; + + thr = tp->threads + i; + if (q) { + /* + * if we can't rely on cancellation, + * keep poking the thread until it wakes + * up from cancellation. + */ + while ((err = pthread_kill(*thr, SIGURG)) == 0) + sched_yield(); + assert(err == ESRCH && + "pthread_kill() usage bug"); + } + CHECK(int, 0, pthread_join(*thr, NULL)); + } + tp->n_threads = size; + } + CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } -void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q) +/* + * fire and forget, we must run the actual thread count manipulation + * in the main notify thread because we may end up terminating the + * thread which invoked this. + */ +void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size) { - size_t i; - - for (i = 0; i < tp->n_threads; i++) - CHECK(int, 0, pthread_cancel(tp->threads[i])); - - for (i = 0; i < tp->n_threads; i++) { - pthread_t thr = tp->threads[i]; - int err; - - if (q) { - /* - * if we can't rely on cancellation, keep poking - * the thread until it wakes up from cancellation. - */ - while ((err = pthread_kill(thr, SIGURG)) == 0) - sched_yield(); - assert(err == ESRCH && "pthread_kill() usage bug"); - } + struct sat_arg *arg; - CHECK(int, 0, pthread_join(thr, NULL)); + /* this gets free'ed in mog_thrpool_process_queue() */ + arg = xmalloc(sizeof(struct sat_arg)); + arg->size = size; + arg->queue = q; + + /* put into the queue so main thread can process it */ + CHECK(int, 0, pthread_mutex_lock(&sat_lock)); + SIMPLEQ_INSERT_TAIL(&satqhead, arg, qentry); + CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); + + /* wake up the main thread so it can process the queue */ + mog_notify(MOG_NOTIFY_SET_N_THREADS); +} + +/* this runs in the main (notify) thread */ +void mog_thrpool_process_queue(void) +{ + /* guard against requests bundled in one wakeup by looping here */ + for (;;) { + struct sat_arg *arg; + + CHECK(int, 0, pthread_mutex_lock(&sat_lock)); + arg = SIMPLEQ_FIRST(&satqhead); + if (arg) + SIMPLEQ_REMOVE_HEAD(&satqhead, qentry); + CHECK(int, 0, pthread_mutex_unlock(&sat_lock)); + + if (arg == NULL) + return; + + syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size); + thrpool_set_size(&arg->queue->thrpool, arg->size, arg->queue); + free(arg); } +} +void +mog_thrpool_start(struct mog_thrpool *tp, size_t n, + void *(*start_fn)(void *), void *arg) +{ + if (n == 0) + n = 1; + tp->threads = NULL; + tp->n_threads = 0; + tp->start_fn = start_fn; + tp->start_arg = arg; + CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL)); + thrpool_set_size(tp, n, NULL); +} + +void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q) +{ + thrpool_set_size(tp, 0, q); + CHECK(int, 0, pthread_mutex_destroy(&tp->lock)); mog_free_and_null(&tp->threads); } |