about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <normalperson@yhbt.net> 2013-05-30 03:50:45 +0000
committer Eric Wong <normalperson@yhbt.net> 2013-05-30 03:50:45 +0000
commit 43ab652e15b4bd0b4cebfe7170c27f312eceb306 (patch)
tree 36f50266ea4d2ab52b37392f8ee20f6c47ebd195
parent d1c18e26b0cc05009199f3997f0d57b07cdaa331 (diff)
download cmogstored-iosem.tar.gz
This is similar to the AIO channels functionality in Perlbal,
but implemented using semaphores to optimize for the uncontended
case.
-rw-r--r-- cmogstored.h 22
-rw-r--r-- dev.c 30
-rw-r--r-- file.c 108
-rw-r--r-- http_get.c 27
-rw-r--r-- http_put.c 18
-rw-r--r-- mgmt_fn.c 7
-rw-r--r-- queue_epoll.c 20
-rw-r--r-- svc.c 2
-rw-r--r-- svc_dev.c 2
9 files changed, 198 insertions, 38 deletions
diff --git a/cmogstored.h b/cmogstored.h
index 0b5a7bf..9abe175 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -47,6 +47,7 @@
 #include <sched.h>
 #include <error.h> /* GNU */
 #include <poll.h>
+#include <semaphore.h>
 #include "bsd/queue_safe.h"
 #include "bsd/simpleq.h"
 
@@ -100,6 +101,9 @@ struct mog_wbuf;
 struct mog_dev {
         dev_t st_dev;
         uint32_t devid;
+        unsigned max_active;
+        unsigned cur_max;
+        sem_t sem;
         char prefix[FLEXIBLE_ARRAY_MEMBER];
 };
 
@@ -249,6 +253,8 @@ struct mog_file {
         char *tmppath; /* NULL-ed if rename()-ed away */
         void *mmptr;
         struct mog_svc *svc;
+        struct mog_dev *dev;
+        bool dev_held;
         struct mog_digest digest;
 };
 
@@ -334,10 +340,13 @@ size_t mog_svc_each(Hash_processor processor, void *data);
 void mog_svc_upgrade_prepare(void);
 
 /* dev.c */
-struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid);
+struct mog_dev *mog_dev_for(struct mog_svc *, uint32_t mog_devid, bool update);
 int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *);
 size_t mog_dev_hash(const void *, size_t tablesize);
 bool mog_dev_cmp(const void *a, const void *b);
+void mog_dev_wait(struct mog_dev *);
+void mog_dev_post(struct mog_dev *);
+void mog_dev_free(void *);
 
 /* valid_path.rl */
 int mog_valid_path(const char *buf, size_t len);
@@ -434,10 +443,17 @@ void mog_queue_quit_loop(struct mog_queue *queue);
 enum mog_next mog_queue_step(struct mog_fd *mfd) MOG_CHECK;
 
 /* file.c */
-struct mog_fd * mog_file_open_read(struct mog_svc *, char *path);
-struct mog_fd * mog_file_open_put(struct mog_svc *, char *path, int flags);
+struct mog_fd *
+mog_file_open_read(struct mog_svc *, uint32_t mog_devid, char *path);
+struct mog_fd *
+mog_file_open_put(struct mog_svc *, uint32_t mog_devid, char *path, int flags);
 void mog_file_close(struct mog_fd *);
 bool mog_open_expire_retry(struct mog_svc *);
+int mog_stat_req(struct mog_svc *, uint32_t mog_devid,
+        const char *path, struct stat *);
+void mog_file_post(struct mog_fd *);
+void mog_file_wait(struct mog_fd *);
+void mog_file_ensure_wait(struct mog_fd *);
 
 /* notify.c */
 void mog_notify_init(void);
diff --git a/dev.c b/dev.c
index 7454b2d..551b15c 100644
--- a/dev.c
+++ b/dev.c
@@ -23,13 +23,17 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid)
         memcpy(dev->prefix, devprefix, len);
         free(devprefix);
 
+        dev->cur_max = dev->max_active = 1;
+        CHECK(int, 0, sem_init(&dev->sem, 0, 1));
+
         dev->devid = mog_devid;
         dev->st_dev = sb.st_dev;
 
         return dev;
 }
 
-struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
+struct mog_dev *
+mog_dev_for(struct mog_svc *svc, uint32_t mog_devid, bool update)
 {
         struct mog_dev finder;
         struct mog_dev *ret;
@@ -41,6 +45,9 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid)
         if (ret) {
                 struct stat sb;
 
+                if (!update)
+                        goto out;
+
                 /*
                  * devXXX dir existed before, but is no longer readable
                  * Possible FS/device error, it could come back, so do
@@ -216,3 +223,24 @@ out:
         } while (0));
         return errno ? -1 : 0;
 }
+
+
+void mog_dev_wait(struct mog_dev *dev)
+{
+        if (dev)
+                CHECK(int, 0, sem_wait(&dev->sem));
+}
+
+void mog_dev_post(struct mog_dev *dev)
+{
+        if (dev)
+                CHECK(int, 0, sem_post(&dev->sem));
+}
+
+void mog_dev_free(void *ptr)
+{
+        struct mog_dev *dev = ptr;
+
+        CHECK(int, 0, sem_destroy(&dev->sem));
+        free(dev);
+}
diff --git a/file.c b/file.c
index bd47537..62efa27 100644
--- a/file.c
+++ b/file.c
@@ -19,27 +19,53 @@ bool mog_open_expire_retry(struct mog_svc *svc)
 
 /* path must be a free()-able pointer */
 struct mog_fd *
-mog_file_open_read(struct mog_svc *svc, char *path)
+mog_file_open_read(struct mog_svc *svc, uint32_t mog_devid, char *path)
 {
         struct mog_fd *mfd;
         struct mog_file *mfile;
-        int fd = mog_open_read(svc, path);
-
-        if (fd < 0 && mog_open_expire_retry(svc))
-                fd = mog_open_read(svc, path);
-
-        if (fd < 0) return NULL;
+        int fd;
+        struct mog_dev *dev = mog_dev_for(svc, mog_devid, false);
+
+        mog_dev_wait(dev);
+        fd = mog_open_read(svc, path);
+
+        if (fd < 0) {
+                mog_dev_post(dev);
+                if (mog_open_expire_retry(svc)) {
+                        mog_dev_wait(dev);
+                        fd = mog_open_read(svc, path);
+                        if (fd < 0)
+                                mog_dev_post(dev);
+                }
+                if (fd < 0)
+                        return NULL;
+        }
 
         mfd = mog_fd_init(fd, MOG_FD_TYPE_FILE);
 
         mfile = &mfd->as.file;
         memset(mfile, 0, sizeof(struct mog_file));
         mfile->fsize = -1;
+        mfile->dev = dev;
+        mfile->dev_held = true;
         mfile->svc = svc;
 
         return mfd;
 }
 
+int mog_stat_req(struct mog_svc *svc, uint32_t mog_devid,
+                const char *path, struct stat *sb)
+{
+        struct mog_dev *dev = mog_dev_for(svc, mog_devid, false);
+        int rc;
+
+        mog_dev_wait(dev);
+        rc = mog_stat(svc, path, sb);
+        mog_dev_post(dev);
+
+        return rc;
+}
+
 static int mkpath_open_put(struct mog_svc *svc, char *path, int flags)
 {
         int fd = mog_open_put(svc, path, flags);
@@ -59,36 +85,80 @@ static int mkpath_open_put(struct mog_svc *svc, char *path, int flags)
 
 /* path must be a free()-able pointer */
 struct mog_fd *
-mog_file_open_put(struct mog_svc *svc, char *path, int flags)
+mog_file_open_put(struct mog_svc *svc, uint32_t mog_devid, char *path,
+                int flags)
 {
         struct mog_fd *mfd;
         struct mog_file *mfile;
-        int fd = mkpath_open_put(svc, path, flags);
-
-        if (fd < 0 && mog_open_expire_retry(svc))
-                fd = mkpath_open_put(svc, path, flags);
-
-        if (fd < 0) return NULL;
-
+        int fd;
+        struct mog_dev *dev = mog_dev_for(svc, mog_devid, false);
+
+        mog_dev_wait(dev);
+        fd = mkpath_open_put(svc, path, flags);
+
+        if (fd < 0) {
+                mog_dev_post(dev);
+                if (mog_open_expire_retry(svc)) {
+                        mog_dev_wait(dev);
+                        fd = mkpath_open_put(svc, path, flags);
+                        if (fd < 0)
+                                mog_dev_post(dev);
+                }
+                if (fd < 0)
+                        return NULL;
+        }
         mfd = mog_fd_init(fd, MOG_FD_TYPE_FILE);
 
         mfile = &mfd->as.file;
         memset(mfile, 0, sizeof(struct mog_file));
         mfile->svc = svc;
+        mfile->dev = dev;
+        mfile->dev_held = true;
 
         return mfd;
 }
 
+void mog_file_post(struct mog_fd *mfd)
+{
+        struct mog_file *file = &mfd->as.file;
+
+        assert(file->dev_held);
+        file->dev_held = false;
+        mog_dev_post(file->dev);
+}
+
+void mog_file_wait(struct mog_fd *mfd)
+{
+        struct mog_file *file = &mfd->as.file;
+
+        assert(!file->dev_held);
+        file->dev_held = true;
+        mog_dev_wait(file->dev);
+}
+
+void mog_file_ensure_wait(struct mog_fd *mfd)
+{
+        struct mog_file *file = &mfd->as.file;
+
+        if (!file->dev_held) {
+                file->dev_held = true;
+                mog_dev_wait(file->dev);
+        }
+}
+
 void mog_file_close(struct mog_fd *mfd)
 {
-        struct mog_file *mfile = &mfd->as.file;
+        struct mog_file *file = &mfd->as.file;
 
         assert(mfd->fd_type == MOG_FD_TYPE_FILE && "mog_fd is not a file");
 
+        if (file->dev_held)
+                mog_dev_post(file->dev);
+
         /* all of these may already be NULL */
-        free(mfile->path);
-        free(mfile->tmppath);
-        mog_digest_destroy(&mfile->digest);
+        free(file->path);
+        free(file->tmppath);
+        mog_digest_destroy(&file->digest);
 
         mog_fd_put(mfd);
 }
diff --git a/http_get.c b/http_get.c
index 8fc566e..423df99 100644
--- a/http_get.c
+++ b/http_get.c
@@ -197,10 +197,14 @@ void mog_http_get_open(struct mog_fd *mfd, char *buf)
                 sb.st_mtime = 0;
                 sb.st_size = 0;
         } else if (http->_p.http_method == MOG_HTTP_METHOD_HEAD) {
-                if (mog_stat(http->svc, path, &sb) < 0) goto err;
-                if (!S_ISREG(sb.st_mode)) goto forbidden;
+                if (mog_stat_req(http->svc, http->_p.mog_devid, path, &sb) < 0)
+                        goto err;
+                if (!S_ISREG(sb.st_mode))
+                        goto forbidden;
         } else {
-                http->forward = mog_file_open_read(http->svc, path);
+                http->forward = mog_file_open_read(http->svc,
+                                                http->_p.mog_devid, path);
+
                 if (http->forward == NULL)
                         goto err;
 
@@ -264,14 +268,25 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *mfd)
         count = count > max_sendfile ? max_sendfile : count;
         if (count == 0)
                 goto done;
+
+        /*
+         * we want to hold the semaphore across open and initial sendfile,
+         * so this is the only place where we conditionally hold the semaphore
+         */
+        mog_file_ensure_wait(file_mfd);
 retry:
         w = sendfile(mfd->fd, file_mfd->fd, &file->foff, (size_t)count);
         if (w > 0) {
-                if (file->foff == file->fsize) goto done;
+                if (file->foff == file->fsize)
+                        goto done;
+
+                mog_file_post(file_mfd);
                 return MOG_NEXT_ACTIVE;
         } else if (w < 0) {
                 switch (errno) {
-                case_EAGAIN: return MOG_NEXT_WAIT_WR;
+                case_EAGAIN:
+                        mog_file_post(file_mfd);
+                        return MOG_NEXT_WAIT_WR;
                 case EINTR: goto retry;
                 }
                 http->_p.persistent = 0;
@@ -286,7 +301,7 @@ retry:
                        (long long)file->foff);
         }
 done:
-        mog_file_close(http->forward);
+        mog_file_close(file_mfd);
         if (http->_p.persistent) {
                 mog_http_reset(http);
                 return MOG_NEXT_ACTIVE;
diff --git a/http_put.c b/http_put.c
index 9974c93..103c2c1 100644
--- a/http_put.c
+++ b/http_put.c
@@ -320,13 +320,15 @@ static char *tmppath_for(struct mog_http *http, const char *path)
 static struct mog_file * open_put(struct mog_http *http, char *path)
 {
         struct mog_file *file;
+        struct mog_svc *svc = http->svc;
+        uint32_t devid = http->_p.mog_devid;
 
         /*
          * we can't do an atomic rename(2) on successful PUT
          * if we have a partial upload
          */
         if (http->_p.has_content_range) {
-                http->forward = mog_file_open_put(http->svc, path, O_CREAT);
+                http->forward = mog_file_open_put(svc, devid, path, O_CREAT);
                 if (http->forward == NULL)
                         return NULL;
 
@@ -337,13 +339,13 @@ static struct mog_file * open_put(struct mog_http *http, char *path)
                 char *tmp = tmppath_for(http, path);
                 int fl = O_EXCL | O_TRUNC | O_CREAT;
 
-                http->forward = mog_file_open_put(http->svc, tmp, fl);
+                http->forward = mog_file_open_put(svc, devid, tmp, fl);
 
                 /* retry once on EEXIST, don't inf loop if RNG is broken */
                 if (http->forward == NULL && errno == EEXIST) {
                         free(tmp);
                         tmp = tmppath_for(http, path);
-                        http->forward = mog_file_open_put(http->svc, tmp, fl);
+                        http->forward = mog_file_open_put(svc, devid, tmp, fl);
                 }
                 if (http->forward == NULL) {
                         PRESERVE_ERRNO( free(tmp) );
@@ -507,7 +509,9 @@ retry:
         }
         if (r != 0) {
                 switch (errno) {
-                case_EAGAIN: return MOG_NEXT_WAIT_RD;
+                case_EAGAIN:
+                        mog_file_post(http->forward);
+                        return MOG_NEXT_WAIT_RD;
                 case EINTR: goto retry;
                 }
         }
@@ -630,7 +634,9 @@ chunk_state_trailer:
 read_err:
         if (r < 0) {
                 switch (errno) {
-                case_EAGAIN: return MOG_NEXT_WAIT_RD;
+                case_EAGAIN:
+                        mog_file_post(http->forward);
+                        return MOG_NEXT_WAIT_RD;
                 }
         }
         read_err_dbg(mfd, r);
@@ -639,6 +645,8 @@ read_err:
 
 enum mog_next mog_http_put_in_progress(struct mog_fd *mfd)
 {
+        mog_file_ensure_wait(mfd->as.http.forward);
+
         if (mfd->as.http._p.chunked)
                 return chunked_put_in_progress(mfd);
 
diff --git a/mgmt_fn.c b/mgmt_fn.c
index 0ca91f6..f84fcf5 100644
--- a/mgmt_fn.c
+++ b/mgmt_fn.c
@@ -42,10 +42,13 @@ void mog_mgmt_fn_digest(struct mog_mgmt *mgmt, char *buf)
 
         if (!path) return;
 
-        mgmt->forward = mog_file_open_read(mgmt->svc, path);
+        mgmt->forward = mog_file_open_read(mgmt->svc, mgmt->mog_devid, path);
         if (mgmt->forward) {
                 struct mog_file *file = &mgmt->forward->as.file;
 
+                /* use fsck queue instead */
+                mog_file_post(mgmt->forward);
+
                 mog_fadv_noreuse(mgmt->forward->fd, 0, 0 /* ALL */);
                 mog_fadv_sequential(mgmt->forward->fd, 0, 0 /* ALL */);
                 mog_digest_init(&file->digest, mgmt->alg);
@@ -140,7 +143,7 @@ void mog_mgmt_fn_size(struct mog_mgmt *mgmt, char *buf)
 
         if (!path) return;
 
-        if (mog_stat(mgmt->svc, path, &sb) == 0) {
+        if (mog_stat_req(mgmt->svc, mgmt->mog_devid, path, &sb) == 0) {
                 long long size = (long long)sb.st_size;
 
                 iov[1].iov_base = tmp;
diff --git a/queue_epoll.c b/queue_epoll.c
index e2e8222..0a9d065 100644
--- a/queue_epoll.c
+++ b/queue_epoll.c
@@ -209,8 +209,28 @@ fake_epoll_ctl_mod(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
                 epoll_ctl_error(q, mfd);
 }
 
+static inline void assert_not_held(struct mog_fd *mfd)
+{
+        if (!mfd)
+                return;
+
+        assert(! mfd->as.file.dev_held);
+}
+
 void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
 {
+        /* ensure we do not go to sleep while holding a semaphore */
+        switch (mfd->fd_type) {
+        case MOG_FD_TYPE_HTTP:
+        case MOG_FD_TYPE_HTTPGET:
+                assert_not_held(mfd->as.http.forward);
+                break;
+        case MOG_FD_TYPE_MGMT:
+                assert_not_held(mfd->as.mgmt.forward);
+        default:
+                break;
+        }
+
         if (epoll_ctl_mod_buggy)
                 fake_epoll_ctl_mod(q, mfd, ev);
         else
diff --git a/svc.c b/svc.c
index e9d8d6d..8ea5220 100644
--- a/svc.c
+++ b/svc.c
@@ -96,7 +96,7 @@ struct mog_svc * mog_svc_new(const char *docroot)
         CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
         CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
         svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
-                                        mog_dev_cmp, free);
+                                        mog_dev_cmp, mog_dev_free);
         mog_oom_if_null(svc->by_mog_devid);
 
         switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
diff --git a/svc_dev.c b/svc_dev.c
index ab21211..4f2fefe 100644
--- a/svc_dev.c
+++ b/svc_dev.c
@@ -118,7 +118,7 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb)
                 if (*end != 0) continue;
                 if (mog_devid > MOG_DEVID_MAX) continue;
 
-                dev = mog_dev_for(svc, (uint32_t)mog_devid);
+                dev = mog_dev_for(svc, (uint32_t)mog_devid, true);
                 if (!dev) continue;
 
                 devlist = svc_devlist(svc, dev->st_dev);