From ad961733c0afb96a7ab44dc9837a0f8c8fa239a4 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Wed, 26 Jun 2013 01:03:52 +0000
Subject: introduce generic I/O queue functionality

This replaces the fsck_queue internals with a generic ioq
implementation which is based on the MogileFS devid, and not
the operating system devid.
---
 Makefile.am  |   1 +
 cmogstored.h |  22 ++++++++++-
 dev.c        |  25 +++++++++++-
 fsck_queue.c | 123 ++---------------------------------------------------------
 ioq.c        |  90 +++++++++++++++++++++++++++++++++++++++++++
 mgmt.c       |  15 ++++++--
 svc.c        |   2 +-
 svc_dev.c    |   2 +-
 8 files changed, 151 insertions(+), 129 deletions(-)
 create mode 100644 ioq.c

diff --git a/Makefile.am b/Makefile.am
index d143abd..0b24b7b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -52,6 +52,7 @@ mog_src += http_util.h
 mog_src += inherit.c
 mog_src += ioprio.h
 mog_src += ioprio_linux.h
+mog_src += ioq.c
 mog_src += iostat.c
 mog_src += iostat.h
 mog_src += iostat_process.c

diff --git a/cmogstored.h b/cmogstored.h
index 66193b7..d7f37c9 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -96,10 +96,19 @@ enum mog_next {
 	MOG_NEXT_IGNORE /* for iostat and fsck MD5 */
 };

+struct mog_ioq {
+	size_t cur;
+	size_t max;
+	pthread_mutex_t mtx;
+	SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head;
+	struct mog_svc *svc;
+};
+
 struct mog_wbuf;
 struct mog_dev {
 	dev_t st_dev;
 	uint32_t devid;
+	struct mog_ioq fsckq;
 };

 struct mog_rbuf {
@@ -130,7 +139,6 @@ struct mog_mgmt {
 	struct mog_svc *svc;
 	enum Gc_hash alg;
 	LIST_ENTRY(mog_mgmt) subscribed;
-	SIMPLEQ_ENTRY(mog_mgmt) fsckq;
 };

 struct mog_queue;
@@ -301,6 +309,7 @@ struct mog_fd {
 		struct mog_queue queue;
 		struct mog_svc *svc;
 	} as;
+	SIMPLEQ_ENTRY(mog_fd) ioqent;
 };
 void mog_fd_put(struct mog_fd *mfd);
 void mog_fdmap_requeue(struct mog_queue *quit_queue);
@@ -348,10 +357,11 @@ void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr);
 void mog_svc_aio_threads_handler(void);

 /* dev.c */
-struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid);
+struct mog_dev *mog_dev_for(struct mog_svc *, uint32_t mog_devid, bool update);
 int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *);
 size_t mog_dev_hash(const void *, size_t tablesize);
 bool mog_dev_cmp(const void *a, const void *b);
+void mog_dev_free(void *devptr);

 /* valid_path.rl */
 int mog_valid_path(const char *buf, size_t len);
@@ -602,3 +612,11 @@ void mog_nameinfo(struct mog_packaddr *, struct mog_ni *);

 /* yield.c */
 void mog_yield(void);
+
+
+/* ioq.c */
+extern __thread struct mog_ioq *mog_ioq_current;
+void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val);
+bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
+void mog_ioq_next(struct mog_ioq *);
+void mog_ioq_destroy(struct mog_ioq *);
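A note on the data structures above: the wait list is intrusive.  The
SIMPLEQ_ENTRY(mog_fd) ioqent link is embedded in each mog_fd, so parking
a contended client on a struct mog_ioq never allocates memory.  A minimal
standalone sketch of the same idea (glibc's <sys/queue.h> spells the BSD
SIMPLEQ macros as STAILQ; the struct and field names here are
illustrative, not cmogstored's):

#include <stdio.h>
#include <sys/queue.h>

struct client {
	int fd;
	STAILQ_ENTRY(client) ioqent; /* embedded link, no allocation */
};

STAILQ_HEAD(ioq_head, client);

int main(void)
{
	struct ioq_head waiters = STAILQ_HEAD_INITIALIZER(waiters);
	struct client a = { .fd = 3 };
	struct client b = { .fd = 4 };
	struct client *next;

	/* contended clients are appended in arrival order */
	STAILQ_INSERT_TAIL(&waiters, &a, ioqent);
	STAILQ_INSERT_TAIL(&waiters, &b, ioqent);

	/* a release wakes exactly one waiter, FIFO */
	next = STAILQ_FIRST(&waiters);
	STAILQ_REMOVE_HEAD(&waiters, ioqent);
	printf("wake fd=%d\n", next->fd); /* => wake fd=3 */
	return 0;
}

Because the link is embedded, a client can sit on at most one wait list
at a time; correspondingly, mog_ioq_ready() below asserts that a thread
never holds more than one ioq.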
diff --git a/dev.c b/dev.c
index cd39c5d..aa5429f 100644
--- a/dev.c
+++ b/dev.c
@@ -31,14 +31,17 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid)
 	dev = mog_cachealign(sizeof(struct mog_dev));
 	dev->devid = mog_devid;
 	dev->st_dev = sb.st_dev;
+	mog_ioq_init(&dev->fsckq, svc, 1);

 	return dev;
 }

-struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
+struct mog_dev *
+mog_dev_for(struct mog_svc *svc, uint32_t mog_devid, bool update)
 {
 	struct mog_dev finder;
 	struct mog_dev *ret;
+	bool need_refresh = false;

 	finder.devid = mog_devid;

@@ -47,6 +50,9 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
 	if (ret) {
 		struct stat sb;

+		if (!update)
+			goto out;
+
 		/*
 		 * devXXX dir existed before, but is no longer readable
 		 * Possible FS/device error, it could come back, so do
@@ -67,13 +73,20 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
 		case 0:
 			assert(0 && "mog_dev existed while adding");
 			abort();
-		case 1: break; /* OK, inserted */
+		case 1:
+			if (!update)
+				need_refresh = true;
+			break; /* OK, inserted */
 		default:
 			mog_oom();
 		}
 	}
 out:
 	CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock));

+	/* we need to get the notify thread to create new worker threads */
+	if (need_refresh)
+		mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
+
 	return ret;
 }

@@ -222,3 +235,11 @@ out:
 	} while (0));
 	return errno ? -1 : 0;
 }
+
+void mog_dev_free(void *ptr)
+{
+	struct mog_dev *dev = ptr;
+
+	mog_ioq_destroy(&dev->fsckq);
+	free(dev);
+}
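The new update flag separates mog_dev_for()'s two callers: svc_scandev()
passes true and may re-validate existing devices, while the fsck path
passes false and only looks the device up, creating the entry on demand.
When such a lazy lookup creates a brand-new device, the notify thread is
poked after the lock is released so it can spawn aio worker threads.  A
condensed, runnable model of this get-or-create pattern (a fixed array
stands in for the gnulib hash table and printf for mog_notify(); all
names are illustrative):

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define MAX_DEV 8
static struct dev { unsigned devid; bool used; } devs[MAX_DEV];
static pthread_mutex_t devs_lock = PTHREAD_MUTEX_INITIALIZER;

static void notify_device_refresh(void)
{
	printf("notify: spawn aio threads for the new device\n");
}

static struct dev *dev_for(unsigned devid, bool update)
{
	struct dev *ret = NULL;
	bool need_refresh = false;
	size_t i;

	pthread_mutex_lock(&devs_lock);
	for (i = 0; i < MAX_DEV; i++) {
		if (devs[i].used && devs[i].devid == devid) {
			ret = &devs[i]; /* existing: nothing to signal */
			goto out;       /* (re-validation on update omitted) */
		}
	}
	for (i = 0; i < MAX_DEV; i++) {
		if (!devs[i].used) {
			devs[i].used = true;
			devs[i].devid = devid;
			ret = &devs[i];
			/* a non-scan caller discovered a brand-new device */
			need_refresh = !update;
			break;
		}
	}
out:
	pthread_mutex_unlock(&devs_lock);
	if (need_refresh)
		notify_device_refresh(); /* outside the lock, as in dev.c */
	return ret;
}

int main(void)
{
	dev_for(42, true);  /* scandev path: no notification */
	dev_for(43, false); /* fsck path hit a new device: notify */
	return 0;
}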
diff --git a/fsck_queue.c b/fsck_queue.c
index c99a9ed..b0895cf 100644
--- a/fsck_queue.c
+++ b/fsck_queue.c
@@ -3,132 +3,17 @@
  * License: GPLv3 or later (see COPYING for details)
  */
 #include "cmogstored.h"
-static pthread_mutex_t fsck_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static Hash_table *fsck_queues;
-
-struct fsck_queue {
-	dev_t st_dev;
-	SIMPLEQ_HEAD(fsck_fd, mog_mgmt) qhead;
-	size_t active;
-	size_t max_active;
-	pthread_mutex_t qlock;
-};
-
-static bool fq_cmp(const void *a, const void *b)
-{
-	const struct fsck_queue *fqa = a;
-	const struct fsck_queue *fqb = b;
-
-	return fqa->st_dev == fqb->st_dev;
-}
-
-static size_t fq_hash(const void *x, size_t tablesize)
-{
-	const struct fsck_queue *fq = x;
-
-	return fq->st_dev % tablesize;
-}
-
-static void fsck_queue_atexit(void)
-{
-	hash_free(fsck_queues);
-}
-
-MOG_NOINLINE static void fsck_queue_once(void)
-{
-	fsck_queues = hash_initialize(7, NULL, fq_hash, fq_cmp, free);
-	mog_oom_if_null(fsck_queues);
-	atexit(fsck_queue_atexit);
-}
-
-static struct fsck_queue * fsck_queue_for(struct mog_mgmt *mgmt)
-{
-	struct fsck_queue tmpq;
-	struct fsck_queue *fq;
-	struct stat sb;
-
-	assert(mgmt->forward && "no file open for fsck MD5");
-
-	if (fstat(mgmt->forward->fd, &sb) < 0) {
-		assert(errno != EBADF && "fstat on closed fd");
-		syslog(LOG_ERR, "fstat() failed for MD5 req: %m");
-		/* continue to a better error handling path */
-		return NULL;
-	}
-
-	tmpq.st_dev = sb.st_dev;
-
-	CHECK(int, 0, pthread_mutex_lock(&fsck_queue_lock));
-
-	if (fsck_queues) {
-		fq = hash_lookup(fsck_queues, &tmpq);
-		if (fq)
-			goto out;
-	} else {
-		fsck_queue_once();
-	}
-
-	fq = mog_cachealign(sizeof(struct fsck_queue));
-	fq->st_dev = sb.st_dev;
-	SIMPLEQ_INIT(&fq->qhead);
-	fq->active = 0;
-	fq->max_active = 1; /* TODO: tunable */
-	CHECK(int, 0, pthread_mutex_init(&fq->qlock, NULL));
-
-	CHECK(int, 1, hash_insert_if_absent(fsck_queues, fq, NULL));
-out:
-	CHECK(int, 0, pthread_mutex_unlock(&fsck_queue_lock));
-
-	return fq;
-}

 bool mog_fsck_queue_ready(struct mog_fd *mfd)
 {
 	struct mog_mgmt *mgmt = &mfd->as.mgmt;
-	struct fsck_queue *fq = fsck_queue_for(mgmt);
-	bool rv;
+	struct mog_dev *dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);

-	/* hopefully continue to a better error handling path on error */
-	if (fq == NULL)
+	/* we may have been called to fsck a file NOT for MogileFS ... */
+	if (dev == NULL)
 		return true;

 	assert(mgmt->prio == MOG_PRIO_FSCK && "bad prio");

-	CHECK(int, 0, pthread_mutex_lock(&fq->qlock));
-	if (fq->active < fq->max_active) {
-		fq->active++;
-		rv = true;
-	} else {
-		SIMPLEQ_INSERT_TAIL(&fq->qhead, mgmt, fsckq);
-		rv = false;
-	}
-	CHECK(int, 0, pthread_mutex_unlock(&fq->qlock));
-
-	return rv;
-}
-
-void mog_fsck_queue_next(struct mog_fd *mfd)
-{
-	struct mog_mgmt *mgmt = &mfd->as.mgmt;
-	struct fsck_queue *fq = fsck_queue_for(mgmt);
-	struct mog_mgmt *next_mgmt = NULL;
-
-	if (fq == NULL)
-		return; /* hopefully continue to a better error handling path */
-
-	assert(mgmt->prio == MOG_PRIO_FSCK && "bad prio");
-
-	CHECK(int, 0, pthread_mutex_lock(&fq->qlock));
-	fq->active--;
-	if (fq->active < fq->max_active) {
-		next_mgmt = SIMPLEQ_FIRST(&fq->qhead);
-		if (next_mgmt)
-			SIMPLEQ_REMOVE_HEAD(&fq->qhead, fsckq);
-	}
-	CHECK(int, 0, pthread_mutex_unlock(&fq->qlock));
-
-	if (next_mgmt) {
-		assert(next_mgmt->prio == MOG_PRIO_FSCK && "bad prio");
-		mog_activeq_push(next_mgmt->svc->queue, mog_fd_of(next_mgmt));
-	}
+	return mog_ioq_ready(&dev->fsckq, mfd);
 }

diff --git a/ioq.c b/ioq.c
new file mode 100644
index 0000000..0b36e65
--- /dev/null
+++ b/ioq.c
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2012-2013, Eric Wong
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+/*
+ * This is a semaphore-like API with explicit queueing and activation,
+ * so contended scheduling/wakeups happen via epoll/kqueue and threads
+ * never block (other than briefly on the ioq mutex).
+ *
+ * The main operations are mog_ioq_ready and mog_ioq_next.
+ *
+ * mog_ioq_next is called automatically when a thread releases a regular file.
+ */
+__thread struct mog_ioq *mog_ioq_current;
+
+void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, size_t val)
+{
+	ioq->cur = val;
+	ioq->max = val;
+	ioq->svc = svc;
+	SIMPLEQ_INIT(&ioq->ioq_head);
+	CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
+}
+
+/*
+ * This is like sem_trywait.  Each thread is only allowed to acquire
+ * one ioq at a time.
+ *
+ * client_mfd is the client socket, not the open (regular) file.
+ */
+bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd)
+{
+	bool good;
+
+	assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
+	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
+
+	good = ioq->cur > 0;
+	if (good) { /* uncontended case is simple */
+		ioq->cur--;
+		mog_ioq_current = ioq;
+	} else {
+		SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent);
+	}
+
+	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
+
+	return good;
+}
+
+/*
+ * Analogous to sem_post, this wakes up the next waiter.
+ * check_ioq may be NULL; if non-NULL, it is validated against mog_ioq_current.
+ */
+void mog_ioq_next(struct mog_ioq *check_ioq)
+{
+	struct mog_fd *client_mfd = NULL;
+
+	if (mog_ioq_current == NULL)
+		return;
+
+	CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));
+
+	assert((check_ioq == NULL) ||
+	       (check_ioq == mog_ioq_current && "ioq mismatch (tls vs check)"));
+
+	mog_ioq_current->cur++;
+	if (mog_ioq_current->cur <= mog_ioq_current->max) {
+		/* wake up any waiters */
+		client_mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
+		if (client_mfd)
+			SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
+	} else {
+		/* mog_ioq_adjust was called and lowered our capacity */
+		mog_ioq_current->cur--;
+	}
+	CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));
+
+	/* wake up the next sleeper on this queue */
+	if (client_mfd)
+		mog_activeq_push(mog_ioq_current->svc->queue, client_mfd);
+
+	mog_ioq_current = NULL;
+}
+
+void mog_ioq_destroy(struct mog_ioq *ioq)
+{
+	CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
+}
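The accounting in ioq.c is a counting semaphore whose wait never sleeps:
a failed acquire parks the client on the FIFO, and a release hands the
freed slot to the head of the queue by pushing that client back onto the
event loop, where it retries the acquire.  A self-contained model of the
invariant (toy names; the returned pointer stands in for the
mog_activeq_push() wakeup, and the mog_ioq_current thread-local plus the
capacity-lowering branch are omitted):

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct waiter {
	int id;
	struct waiter *next;
};

struct toy_ioq {
	size_t cur;  /* remaining capacity */
	size_t max;  /* configured capacity */
	pthread_mutex_t mtx;
	struct waiter *head, *tail; /* FIFO of parked waiters */
};

static void toy_ioq_init(struct toy_ioq *q, size_t val)
{
	q->cur = q->max = val;
	q->head = q->tail = NULL;
	pthread_mutex_init(&q->mtx, NULL);
}

/* like sem_trywait: false means "parked, retry when woken" */
static bool toy_ioq_ready(struct toy_ioq *q, struct waiter *w)
{
	bool good;

	pthread_mutex_lock(&q->mtx);
	good = q->cur > 0;
	if (good) {
		q->cur--;
	} else { /* no capacity: park in arrival order */
		w->next = NULL;
		if (q->tail)
			q->tail->next = w;
		else
			q->head = w;
		q->tail = w;
	}
	pthread_mutex_unlock(&q->mtx);
	return good;
}

/* like sem_post: returns the waiter to reactivate, if any */
static struct waiter *toy_ioq_next(struct toy_ioq *q)
{
	struct waiter *w = NULL;

	pthread_mutex_lock(&q->mtx);
	q->cur++;
	if (q->head) {
		w = q->head;
		q->head = w->next;
		if (!q->head)
			q->tail = NULL;
	}
	pthread_mutex_unlock(&q->mtx);
	return w; /* caller pushes this back onto the event loop */
}

int main(void)
{
	struct toy_ioq q;
	struct waiter a = { .id = 1 }, b = { .id = 2 };

	toy_ioq_init(&q, 1);
	printf("a acquired: %d\n", toy_ioq_ready(&q, &a)); /* 1: slot taken */
	printf("b acquired: %d\n", toy_ioq_ready(&q, &b)); /* 0: b parked */
	printf("woken id:   %d\n", toy_ioq_next(&q)->id);  /* 2: b woken */
	printf("b retried:  %d\n", toy_ioq_ready(&q, &b)); /* 1: retry wins */
	return 0;
}

In cmogstored itself, the held queue is also recorded in the
mog_ioq_current thread-local, which is why mgmt_queue_step() in the next
hunk can release with a bare mog_ioq_next(NULL) without knowing which
device the step touched.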
diff --git a/mgmt.c b/mgmt.c
index a6a1ff5..d64c1c0 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -42,9 +42,6 @@ static void mgmt_digest_step(struct mog_fd *mfd)
 		mog_mgmt_fn_digest_err(mgmt);
 	}

-	if (mgmt->prio == MOG_PRIO_FSCK)
-		mog_fsck_queue_next(mfd);
-
 	mog_file_close(mgmt->forward);
 	mgmt->prio = MOG_PRIO_NONE;
 	mgmt->forward = NULL;
@@ -156,7 +153,7 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
  * this is the main event callback and called whenever mgmt
  * is pulled out of a queue (either idle or active)
  */
-static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
+static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
 {
 	struct mog_mgmt *mgmt = &mfd->as.mgmt;
 	struct mog_rbuf *rbuf;
@@ -244,6 +241,16 @@ parse:
 	return MOG_NEXT_CLOSE;
 }

+static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
+{
+	enum mog_next ret = __mgmt_queue_step(mfd);
+
+	/* enqueue any pending waiters before we become enqueued ourselves */
+	mog_ioq_next(NULL);
+
+	return ret;
+}
+
 /*
  * this function is called whenever a mgmt client is pulled out of
  * _any_ queue (listen/idle/active).  Our queueing model should be

diff --git a/svc.c b/svc.c
index 27440ff..24d90fd 100644
--- a/svc.c
+++ b/svc.c
@@ -109,7 +109,7 @@ struct mog_svc * mog_svc_new(const char *docroot)
 	CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
 	CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
 	svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
-					    mog_dev_cmp, free);
+					    mog_dev_cmp, mog_dev_free);
 	mog_oom_if_null(svc->by_mog_devid);

 	switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
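The svc.c hunk is a lifetime fix that falls out of embedding the ioq:
struct mog_dev now owns a pthread mutex, so the per-service device table
may no longer dispose of elements with plain free().  In miniature
(hypothetical names; the last hash_initialize() argument is gnulib's
Hash_data_freer, a plain void (*)(void *)):

#include <pthread.h>
#include <stdlib.h>

struct node {                   /* stands in for struct mog_dev */
	pthread_mutex_t mtx;    /* owned resource: destroy before freeing */
};

/* matches gnulib's Hash_data_freer signature: void (*)(void *) */
static void node_free(void *p)
{
	struct node *n = p;

	pthread_mutex_destroy(&n->mtx); /* release before the memory goes */
	free(n);
}

int main(void)
{
	struct node *n = malloc(sizeof(*n));

	if (!n)
		return 1;
	pthread_mutex_init(&n->mtx, NULL);
	node_free(n); /* what the hash table's disposal does per element */
	return 0;
}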
diff --git a/svc_dev.c b/svc_dev.c
index 5d19580..6fe3396 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -118,7 +118,7 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb)
 		if (*end != 0) continue;
 		if (mog_devid > MOG_DEVID_MAX) continue;

-		dev = mog_dev_for(svc, (uint32_t)mog_devid);
+		dev = mog_dev_for(svc, (uint32_t)mog_devid, true);
 		if (!dev) continue;

 		devlist = svc_devlist(svc, dev->st_dev);
--
cgit v1.2.3-24-ge0c7