about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-06-26 01:03:52 +0000
committerEric Wong <normalperson@yhbt.net>2013-07-10 00:55:50 +0000
commitad961733c0afb96a7ab44dc9837a0f8c8fa239a4 (patch)
tree69bdbbec807c9bd3191dc59bdc1ab6ac37822956
parent70efa665edeef05f53978f9d541f411b0e1a2b2a (diff)
downloadcmogstored-ad961733c0afb96a7ab44dc9837a0f8c8fa239a4.tar.gz
This replaces the fsck_queue internals with a generic
ioq implementation which is based on the MogileFS devid,
and not the operating system devid.
-rw-r--r--Makefile.am1
-rw-r--r--cmogstored.h22
-rw-r--r--dev.c25
-rw-r--r--fsck_queue.c123
-rw-r--r--ioq.c90
-rw-r--r--mgmt.c15
-rw-r--r--svc.c2
-rw-r--r--svc_dev.c2
8 files changed, 151 insertions, 129 deletions
diff --git a/Makefile.am b/Makefile.am
index d143abd..0b24b7b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -52,6 +52,7 @@ mog_src += http_util.h
 mog_src += inherit.c
 mog_src += ioprio.h
 mog_src += ioprio_linux.h
+mog_src += ioq.c
 mog_src += iostat.c
 mog_src += iostat.h
 mog_src += iostat_process.c
diff --git a/cmogstored.h b/cmogstored.h
index 66193b7..d7f37c9 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -96,10 +96,19 @@ enum mog_next {
         MOG_NEXT_IGNORE /* for iostat and fsck MD5 */
 };
 
+struct mog_ioq {
+        size_t cur;
+        size_t max;
+        pthread_mutex_t mtx;
+        SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head;
+        struct mog_svc *svc;
+};
+
 struct mog_wbuf;
 struct mog_dev {
         dev_t st_dev;
         uint32_t devid;
+        struct mog_ioq fsckq;
 };
 
 struct mog_rbuf {
@@ -130,7 +139,6 @@ struct mog_mgmt {
         struct mog_svc *svc;
         enum Gc_hash alg;
         LIST_ENTRY(mog_mgmt) subscribed;
-        SIMPLEQ_ENTRY(mog_mgmt) fsckq;
 };
 
 struct mog_queue;
@@ -301,6 +309,7 @@ struct mog_fd {
                 struct mog_queue queue;
                 struct mog_svc *svc;
         } as;
+        SIMPLEQ_ENTRY(mog_fd) ioqent;
 };
 void mog_fd_put(struct mog_fd *mfd);
 void mog_fdmap_requeue(struct mog_queue *quit_queue);
@@ -348,10 +357,11 @@ void mog_svc_aio_threads_enqueue(struct mog_svc *, size_t nr);
 void mog_svc_aio_threads_handler(void);
 
 /* dev.c */
-struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid);
+struct mog_dev *mog_dev_for(struct mog_svc *, uint32_t mog_devid, bool update);
 int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *);
 size_t mog_dev_hash(const void *, size_t tablesize);
 bool mog_dev_cmp(const void *a, const void *b);
+void mog_dev_free(void *devptr);
 
 /* valid_path.rl */
 int mog_valid_path(const char *buf, size_t len);
@@ -602,3 +612,11 @@ void mog_nameinfo(struct mog_packaddr *, struct mog_ni *);
 
 /* yield.c */
 void mog_yield(void);
+
+
+/* ioq.c */
+extern __thread struct mog_ioq *mog_ioq_current;
+void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val);
+bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
+void mog_ioq_next(struct mog_ioq *);
+void mog_ioq_destroy(struct mog_ioq *);
diff --git a/dev.c b/dev.c
index cd39c5d..aa5429f 100644
--- a/dev.c
+++ b/dev.c
@@ -31,14 +31,17 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid)
         dev = mog_cachealign(sizeof(struct mog_dev));
         dev->devid = mog_devid;
         dev->st_dev = sb.st_dev;
+        mog_ioq_init(&dev->fsckq, svc, 1);
 
         return dev;
 }
 
-struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
+struct mog_dev *
+mog_dev_for(struct mog_svc *svc, uint32_t mog_devid, bool update)
 {
         struct mog_dev finder;
         struct mog_dev *ret;
+        bool need_refresh = false;
 
         finder.devid = mog_devid;
 
@@ -47,6 +50,9 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
         if (ret) {
                 struct stat sb;
 
+                if (!update)
+                        goto out;
+
                 /*
                  * devXXX dir existed before, but is no longer readable
                  * Possible FS/device error, it could come back, so do
@@ -67,13 +73,20 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
                 case 0:
                         assert(0 && "mog_dev existed while adding");
                         abort();
-                case 1: break; /* OK, inserted */
+                case 1:
+                        if (!update)
+                                need_refresh = true;
+                        break; /* OK, inserted */
                 default: mog_oom();
                 }
         }
 out:
         CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock));
 
+        /* we need to get the notify thread to create new worker threads */
+        if (need_refresh)
+                mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
+
         return ret;
 }
 
@@ -222,3 +235,11 @@ out:
         } while (0));
         return errno ? -1 : 0;
 }
+
+void mog_dev_free(void *ptr)
+{
+        struct mog_dev *dev = ptr;
+
+        mog_ioq_destroy(&dev->fsckq);
+        free(dev);
+}
diff --git a/fsck_queue.c b/fsck_queue.c
index c99a9ed..b0895cf 100644
--- a/fsck_queue.c
+++ b/fsck_queue.c
@@ -3,132 +3,17 @@
  * License: GPLv3 or later (see COPYING for details)
  */
 #include "cmogstored.h"
-static pthread_mutex_t fsck_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static Hash_table *fsck_queues;
-
-struct fsck_queue {
-        dev_t st_dev;
-        SIMPLEQ_HEAD(fsck_fd, mog_mgmt) qhead;
-        size_t active;
-        size_t max_active;
-        pthread_mutex_t qlock;
-};
-
-static bool fq_cmp(const void *a, const void *b)
-{
-        const struct fsck_queue *fqa = a;
-        const struct fsck_queue *fqb = b;
-
-        return fqa->st_dev == fqb->st_dev;
-}
-
-static size_t fq_hash(const void *x, size_t tablesize)
-{
-        const struct fsck_queue *fq = x;
-
-        return fq->st_dev % tablesize;
-}
-
-static void fsck_queue_atexit(void)
-{
-        hash_free(fsck_queues);
-}
-
-MOG_NOINLINE static void fsck_queue_once(void)
-{
-        fsck_queues = hash_initialize(7, NULL, fq_hash, fq_cmp, free);
-        mog_oom_if_null(fsck_queues);
-        atexit(fsck_queue_atexit);
-}
-
-static struct fsck_queue * fsck_queue_for(struct mog_mgmt *mgmt)
-{
-        struct fsck_queue tmpq;
-        struct fsck_queue *fq;
-        struct stat sb;
-
-        assert(mgmt->forward && "no file open for fsck MD5");
-
-        if (fstat(mgmt->forward->fd, &sb) < 0) {
-                assert(errno != EBADF && "fstat on closed fd");
-                syslog(LOG_ERR, "fstat() failed for MD5 req: %m");
-                /* continue to a better error handling path */
-                return NULL;
-        }
-
-        tmpq.st_dev = sb.st_dev;
-
-        CHECK(int, 0, pthread_mutex_lock(&fsck_queue_lock));
-
-        if (fsck_queues) {
-                fq = hash_lookup(fsck_queues, &tmpq);
-                if (fq)
-                        goto out;
-        } else {
-                fsck_queue_once();
-        }
-
-        fq = mog_cachealign(sizeof(struct fsck_queue));
-        fq->st_dev = sb.st_dev;
-        SIMPLEQ_INIT(&fq->qhead);
-        fq->active = 0;
-        fq->max_active = 1; /* TODO: tunable */
-        CHECK(int, 0, pthread_mutex_init(&fq->qlock, NULL));
-
-        CHECK(int, 1, hash_insert_if_absent(fsck_queues, fq, NULL));
-out:
-        CHECK(int, 0, pthread_mutex_unlock(&fsck_queue_lock));
-
-        return fq;
-}
 
 bool mog_fsck_queue_ready(struct mog_fd *mfd)
 {
         struct mog_mgmt *mgmt = &mfd->as.mgmt;
-        struct fsck_queue *fq = fsck_queue_for(mgmt);
-        bool rv;
+        struct mog_dev *dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);
 
-        /* hopefully continue to a better error handling path on error */
-        if (fq == NULL)
+        /* we may have been called to fsck a file NOT for MogileFS ... */
+        if (dev == NULL)
                 return true;
 
         assert(mgmt->prio == MOG_PRIO_FSCK && "bad prio");
 
-        CHECK(int, 0, pthread_mutex_lock(&fq->qlock));
-        if (fq->active < fq->max_active) {
-                fq->active++;
-                rv = true;
-        } else {
-                SIMPLEQ_INSERT_TAIL(&fq->qhead, mgmt, fsckq);
-                rv = false;
-        }
-        CHECK(int, 0, pthread_mutex_unlock(&fq->qlock));
-
-        return rv;
-}
-
-void mog_fsck_queue_next(struct mog_fd *mfd)
-{
-        struct mog_mgmt *mgmt = &mfd->as.mgmt;
-        struct fsck_queue *fq = fsck_queue_for(mgmt);
-        struct mog_mgmt *next_mgmt = NULL;
-
-        if (fq == NULL)
-                return; /* hopefully continue to a better error handling path */
-
-        assert(mgmt->prio == MOG_PRIO_FSCK && "bad prio");
-
-        CHECK(int, 0, pthread_mutex_lock(&fq->qlock));
-        fq->active--;
-        if (fq->active < fq->max_active) {
-                next_mgmt = SIMPLEQ_FIRST(&fq->qhead);
-                if (next_mgmt)
-                        SIMPLEQ_REMOVE_HEAD(&fq->qhead, fsckq);
-        }
-        CHECK(int, 0, pthread_mutex_unlock(&fq->qlock));
-
-        if (next_mgmt) {
-                assert(next_mgmt->prio == MOG_PRIO_FSCK && "bad prio");
-                mog_activeq_push(next_mgmt->svc->queue, mog_fd_of(next_mgmt));
-        }
+        return mog_ioq_ready(&dev->fsckq, mfd);
 }
diff --git a/ioq.c b/ioq.c
new file mode 100644
index 0000000..0b36e65
--- /dev/null
+++ b/ioq.c
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+/*
+ * This is a semaphore-like API with explicit queueing and activation,
+ * so contended scheduling/wakeups happen via epoll/kqueue and there
+ * is never blocking of threads (other than the mutex)
+ *
+ * The main operations are mog_ioq_ready and mog_ioq_next
+ *
+ * mog_ioq_next is automatically called when a thread releases a regular file.
+ */
+__thread struct mog_ioq *mog_ioq_current;
+
+void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, size_t val)
+{
+        ioq->cur = val;
+        ioq->max = val;
+        ioq->svc = svc;
+        SIMPLEQ_INIT(&ioq->ioq_head);
+        CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
+}
+
+/*
+ * This is like sem_trywait.  Each thread is only allowed to acquire
+ * one ioq at once.
+ *
+ * client_mfd is the client socket, not the open (regular) file
+ */
+bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd)
+{
+        bool good;
+
+        assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
+        CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
+
+        good = ioq->cur > 0;
+        if (good) { /* uncontended case is simple */
+                ioq->cur--;
+                mog_ioq_current = ioq;
+        } else {
+                SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent);
+        }
+
+        CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
+
+        return good;
+}
+
+/*
+ * analogous to sem_post, this wakes up the next waiter
+ * check_ioq may be NULL; if non-NULL, it is validated against mog_ioq_current
+ */
+void mog_ioq_next(struct mog_ioq *check_ioq)
+{
+        struct mog_fd *client_mfd = NULL;
+
+        if (mog_ioq_current == NULL)
+                return;
+
+        CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));
+
+        assert((check_ioq == NULL) ||
+               (check_ioq == mog_ioq_current && "ioq mismatch (tls vs check)"));
+
+        mog_ioq_current->cur++;
+        if (mog_ioq_current->cur <= mog_ioq_current->max) {
+                /* wake up any waiters */
+                client_mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
+                if (client_mfd)
+                        SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
+        } else {
+                /* mog_ioq_adjust was called and lowered our capacity */
+                mog_ioq_current->cur--;
+        }
+        CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));
+
+        /* wake up the next sleeper on this queue */
+        if (client_mfd)
+                mog_activeq_push(mog_ioq_current->svc->queue, client_mfd);
+
+        mog_ioq_current = NULL;
+}
+
+void mog_ioq_destroy(struct mog_ioq *ioq)
+{
+        CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
+}
diff --git a/mgmt.c b/mgmt.c
index a6a1ff5..d64c1c0 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -42,9 +42,6 @@ static void mgmt_digest_step(struct mog_fd *mfd)
                 mog_mgmt_fn_digest_err(mgmt);
         }
 
-        if (mgmt->prio == MOG_PRIO_FSCK)
-                mog_fsck_queue_next(mfd);
-
         mog_file_close(mgmt->forward);
         mgmt->prio = MOG_PRIO_NONE;
         mgmt->forward = NULL;
@@ -156,7 +153,7 @@ mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
  * this is the main event callback and called whenever mgmt
  * is pulled out of a queue (either idle or active)
  */
-static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
+static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
 {
         struct mog_mgmt *mgmt = &mfd->as.mgmt;
         struct mog_rbuf *rbuf;
@@ -244,6 +241,16 @@ parse:
         return MOG_NEXT_CLOSE;
 }
 
+static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
+{
+        enum mog_next ret = __mgmt_queue_step(mfd);
+
+        /* enqueue any pending waiters before we become enqueued ourselves */
+        mog_ioq_next(NULL);
+
+        return ret;
+}
+
 /*
  * this function is called whenever a mgmt client is pulled out of
  * _any_ queue (listen/idle/active).  Our queueing model should be
diff --git a/svc.c b/svc.c
index 27440ff..24d90fd 100644
--- a/svc.c
+++ b/svc.c
@@ -109,7 +109,7 @@ struct mog_svc * mog_svc_new(const char *docroot)
         CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
         CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
         svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
-                                        mog_dev_cmp, free);
+                                        mog_dev_cmp, mog_dev_free);
         mog_oom_if_null(svc->by_mog_devid);
 
         switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
diff --git a/svc_dev.c b/svc_dev.c
index 5d19580..6fe3396 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -118,7 +118,7 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb)
                 if (*end != 0) continue;
                 if (mog_devid > MOG_DEVID_MAX) continue;
 
-                dev = mog_dev_for(svc, (uint32_t)mog_devid);
+                dev = mog_dev_for(svc, (uint32_t)mog_devid, true);
                 if (!dev) continue;
 
                 devlist = svc_devlist(svc, dev->st_dev);