diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-05-30 03:50:45 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-05-30 03:50:45 +0000 |
commit | 43ab652e15b4bd0b4cebfe7170c27f312eceb306 (patch) | |
tree | 36f50266ea4d2ab52b37392f8ee20f6c47ebd195 | |
parent | d1c18e26b0cc05009199f3997f0d57b07cdaa331 (diff) | |
download | cmogstored-iosem.tar.gz |
This is similar to the AIO channels functionality in Perlbal, but implemented using semaphores to optimize for the uncontended case.
-rw-r--r-- | cmogstored.h | 22 | ||||
-rw-r--r-- | dev.c | 30 | ||||
-rw-r--r-- | file.c | 108 | ||||
-rw-r--r-- | http_get.c | 27 | ||||
-rw-r--r-- | http_put.c | 18 | ||||
-rw-r--r-- | mgmt_fn.c | 7 | ||||
-rw-r--r-- | queue_epoll.c | 20 | ||||
-rw-r--r-- | svc.c | 2 | ||||
-rw-r--r-- | svc_dev.c | 2 |
9 files changed, 198 insertions, 38 deletions
diff --git a/cmogstored.h b/cmogstored.h index 0b5a7bf..9abe175 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -47,6 +47,7 @@ #include <sched.h> #include <error.h> /* GNU */ #include <poll.h> +#include <semaphore.h> #include "bsd/queue_safe.h" #include "bsd/simpleq.h" @@ -100,6 +101,9 @@ struct mog_wbuf; struct mog_dev { dev_t st_dev; uint32_t devid; + unsigned max_active; + unsigned cur_max; + sem_t sem; char prefix[FLEXIBLE_ARRAY_MEMBER]; }; @@ -249,6 +253,8 @@ struct mog_file { char *tmppath; /* NULL-ed if rename()-ed away */ void *mmptr; struct mog_svc *svc; + struct mog_dev *dev; + bool dev_held; struct mog_digest digest; }; @@ -334,10 +340,13 @@ size_t mog_svc_each(Hash_processor processor, void *data); void mog_svc_upgrade_prepare(void); /* dev.c */ -struct mog_dev * mog_dev_for(struct mog_svc *, uint32_t mog_devid); +struct mog_dev *mog_dev_for(struct mog_svc *, uint32_t mog_devid, bool update); int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *); size_t mog_dev_hash(const void *, size_t tablesize); bool mog_dev_cmp(const void *a, const void *b); +void mog_dev_wait(struct mog_dev *); +void mog_dev_post(struct mog_dev *); +void mog_dev_free(void *); /* valid_path.rl */ int mog_valid_path(const char *buf, size_t len); @@ -434,10 +443,17 @@ void mog_queue_quit_loop(struct mog_queue *queue); enum mog_next mog_queue_step(struct mog_fd *mfd) MOG_CHECK; /* file.c */ -struct mog_fd * mog_file_open_read(struct mog_svc *, char *path); -struct mog_fd * mog_file_open_put(struct mog_svc *, char *path, int flags); +struct mog_fd * +mog_file_open_read(struct mog_svc *, uint32_t mog_devid, char *path); +struct mog_fd * +mog_file_open_put(struct mog_svc *, uint32_t mog_devid, char *path, int flags); void mog_file_close(struct mog_fd *); bool mog_open_expire_retry(struct mog_svc *); +int mog_stat_req(struct mog_svc *, uint32_t mog_devid, + const char *path, struct stat *); +void mog_file_post(struct mog_fd *); +void mog_file_wait(struct mog_fd *); +void mog_file_ensure_wait(struct mog_fd *); /* notify.c */ void mog_notify_init(void); @@ -23,13 +23,17 @@ static struct mog_dev *mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) memcpy(dev->prefix, devprefix, len); free(devprefix); + dev->cur_max = dev->max_active = 1; + CHECK(int, 0, sem_init(&dev->sem, 0, 1)); + dev->devid = mog_devid; dev->st_dev = sb.st_dev; return dev; } -struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid) +struct mog_dev * +mog_dev_for(struct mog_svc *svc, uint32_t mog_devid, bool update) { struct mog_dev finder; struct mog_dev *ret; @@ -41,6 +45,9 @@ struct mog_dev *mog_dev_for(struct mog_svc *svc, uint32_t mog_devid) if (ret) { struct stat sb; + if (!update) + goto out; + /* * devXXX dir existed before, but is no longer readable * Possible FS/device error, it could come back, so do @@ -216,3 +223,24 @@ out: } while (0)); return errno ? -1 : 0; } + + +void mog_dev_wait(struct mog_dev *dev) +{ + if (dev) + CHECK(int, 0, sem_wait(&dev->sem)); +} + +void mog_dev_post(struct mog_dev *dev) +{ + if (dev) + CHECK(int, 0, sem_post(&dev->sem)); +} + +void mog_dev_free(void *ptr) +{ + struct mog_dev *dev = ptr; + + CHECK(int, 0, sem_destroy(&dev->sem)); + free(dev); +} @@ -19,27 +19,53 @@ bool mog_open_expire_retry(struct mog_svc *svc) /* path must be a free()-able pointer */ struct mog_fd * -mog_file_open_read(struct mog_svc *svc, char *path) +mog_file_open_read(struct mog_svc *svc, uint32_t mog_devid, char *path) { struct mog_fd *mfd; struct mog_file *mfile; - int fd = mog_open_read(svc, path); - - if (fd < 0 && mog_open_expire_retry(svc)) - fd = mog_open_read(svc, path); - - if (fd < 0) return NULL; + int fd; + struct mog_dev *dev = mog_dev_for(svc, mog_devid, false); + + mog_dev_wait(dev); + fd = mog_open_read(svc, path); + + if (fd < 0) { + mog_dev_post(dev); + if (mog_open_expire_retry(svc)) { + mog_dev_wait(dev); + fd = mog_open_read(svc, path); + if (fd < 0) + mog_dev_post(dev); + } + if (fd < 0) + return NULL; + } mfd = mog_fd_init(fd, MOG_FD_TYPE_FILE); mfile = &mfd->as.file; memset(mfile, 0, sizeof(struct mog_file)); mfile->fsize = -1; + mfile->dev = dev; + mfile->dev_held = true; mfile->svc = svc; return mfd; } +int mog_stat_req(struct mog_svc *svc, uint32_t mog_devid, + const char *path, struct stat *sb) +{ + struct mog_dev *dev = mog_dev_for(svc, mog_devid, false); + int rc; + + mog_dev_wait(dev); + rc = mog_stat(svc, path, sb); + mog_dev_post(dev); + + return rc; +} + static int mkpath_open_put(struct mog_svc *svc, char *path, int flags) { int fd = mog_open_put(svc, path, flags); @@ -59,36 +85,80 @@ static int mkpath_open_put(struct mog_svc *svc, char *path, int flags) /* path must be a free()-able pointer */ struct mog_fd * -mog_file_open_put(struct mog_svc *svc, char *path, int flags) +mog_file_open_put(struct mog_svc *svc, uint32_t mog_devid, char *path, + int flags) { struct mog_fd *mfd; struct mog_file *mfile; - int fd = mkpath_open_put(svc, path, flags); - - if (fd < 0 && mog_open_expire_retry(svc)) - fd = mkpath_open_put(svc, path, flags); - - if (fd < 0) return NULL; - + int fd; + struct mog_dev *dev = mog_dev_for(svc, mog_devid, false); + + mog_dev_wait(dev); + fd = mkpath_open_put(svc, path, flags); + + if (fd < 0) { + mog_dev_post(dev); + if (mog_open_expire_retry(svc)) { + mog_dev_wait(dev); + fd = mkpath_open_put(svc, path, flags); + if (fd < 0) + mog_dev_post(dev); + } + if (fd < 0) + return NULL; + } mfd = mog_fd_init(fd, MOG_FD_TYPE_FILE); mfile = &mfd->as.file; memset(mfile, 0, sizeof(struct mog_file)); mfile->svc = svc; + mfile->dev = dev; + mfile->dev_held = true; return mfd; } +void mog_file_post(struct mog_fd *mfd) +{ + struct mog_file *file = &mfd->as.file; + + assert(file->dev_held); + file->dev_held = false; + mog_dev_post(file->dev); +} + +void mog_file_wait(struct mog_fd *mfd) +{ + struct mog_file *file = &mfd->as.file; + + assert(!file->dev_held); + file->dev_held = true; + mog_dev_wait(file->dev); +} + +void mog_file_ensure_wait(struct mog_fd *mfd) +{ + struct mog_file *file = &mfd->as.file; + + if (!file->dev_held) { + file->dev_held = true; + mog_dev_wait(file->dev); + } +} + void mog_file_close(struct mog_fd *mfd) { - struct mog_file *mfile = &mfd->as.file; + struct mog_file *file = &mfd->as.file; assert(mfd->fd_type == MOG_FD_TYPE_FILE && "mog_fd is not a file"); + if (file->dev_held) + mog_dev_post(file->dev); + /* all of these may already be NULL */ - free(mfile->path); - free(mfile->tmppath); - mog_digest_destroy(&mfile->digest); + free(file->path); + free(file->tmppath); + mog_digest_destroy(&file->digest); mog_fd_put(mfd); } @@ -197,10 +197,14 @@ void mog_http_get_open(struct mog_fd *mfd, char *buf) sb.st_mtime = 0; sb.st_size = 0; } else if (http->_p.http_method == MOG_HTTP_METHOD_HEAD) { - if (mog_stat(http->svc, path, &sb) < 0) goto err; - if (!S_ISREG(sb.st_mode)) goto forbidden; + if (mog_stat_req(http->svc, http->_p.mog_devid, path, &sb) < 0) + goto err; + if (!S_ISREG(sb.st_mode)) + goto forbidden; } else { - http->forward = mog_file_open_read(http->svc, path); + http->forward = mog_file_open_read(http->svc, + http->_p.mog_devid, path); + if (http->forward == NULL) goto err; @@ -264,14 +268,25 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *mfd) count = count > max_sendfile ? max_sendfile : count; if (count == 0) goto done; + + /* + * we want to hold the semaphore across open and initial sendfile, + * so this is the only place were we conditionally hold the semaphore + */ + mog_file_ensure_wait(file_mfd); retry: w = sendfile(mfd->fd, file_mfd->fd, &file->foff, (size_t)count); if (w > 0) { - if (file->foff == file->fsize) goto done; + if (file->foff == file->fsize) + goto done; + + mog_file_post(file_mfd); return MOG_NEXT_ACTIVE; } else if (w < 0) { switch (errno) { - case_EAGAIN: return MOG_NEXT_WAIT_WR; + case_EAGAIN: + mog_file_post(file_mfd); + return MOG_NEXT_WAIT_WR; case EINTR: goto retry; } http->_p.persistent = 0; @@ -286,7 +301,7 @@ retry: (long long)file->foff); } done: - mog_file_close(http->forward); + mog_file_close(file_mfd); if (http->_p.persistent) { mog_http_reset(http); return MOG_NEXT_ACTIVE; @@ -320,13 +320,15 @@ static char *tmppath_for(struct mog_http *http, const char *path) static struct mog_file * open_put(struct mog_http *http, char *path) { struct mog_file *file; + struct mog_svc *svc = http->svc; + uint32_t devid = http->_p.mog_devid; /* * we can't do an atomic rename(2) on successful PUT * if we have a partial upload */ if (http->_p.has_content_range) { - http->forward = mog_file_open_put(http->svc, path, O_CREAT); + http->forward = mog_file_open_put(svc, devid, path, O_CREAT); if (http->forward == NULL) return NULL; @@ -337,13 +339,13 @@ static struct mog_file * open_put(struct mog_http *http, char *path) char *tmp = tmppath_for(http, path); int fl = O_EXCL | O_TRUNC | O_CREAT; - http->forward = mog_file_open_put(http->svc, tmp, fl); + http->forward = mog_file_open_put(svc, devid, tmp, fl); /* retry once on EEXIST, don't inf loop if RNG is broken */ if (http->forward == NULL && errno == EEXIST) { free(tmp); tmp = tmppath_for(http, path); - http->forward = mog_file_open_put(http->svc, tmp, fl); + http->forward = mog_file_open_put(svc, devid, tmp, fl); } if (http->forward == NULL) { PRESERVE_ERRNO( free(tmp) ); @@ -507,7 +509,9 @@ retry: } if (r != 0) { switch (errno) { - case_EAGAIN: return MOG_NEXT_WAIT_RD; + case_EAGAIN: + mog_file_post(http->forward); + return MOG_NEXT_WAIT_RD; case EINTR: goto retry; } } @@ -630,7 +634,9 @@ chunk_state_trailer: read_err: if (r < 0) { switch (errno) { - case_EAGAIN: return MOG_NEXT_WAIT_RD; + case_EAGAIN: + mog_file_post(http->forward); + return MOG_NEXT_WAIT_RD; } } read_err_dbg(mfd, r); @@ -639,6 +645,8 @@ read_err: enum mog_next mog_http_put_in_progress(struct mog_fd *mfd) { + mog_file_ensure_wait(mfd->as.http.forward); + if (mfd->as.http._p.chunked) return chunked_put_in_progress(mfd); @@ -42,10 +42,13 @@ void mog_mgmt_fn_digest(struct mog_mgmt *mgmt, char *buf) if (!path) return; - mgmt->forward = mog_file_open_read(mgmt->svc, path); + mgmt->forward = mog_file_open_read(mgmt->svc, mgmt->mog_devid, path); if (mgmt->forward) { struct mog_file *file = &mgmt->forward->as.file; + /* use fsck queue instead */ + mog_file_post(mgmt->forward); + mog_fadv_noreuse(mgmt->forward->fd, 0, 0 /* ALL */); mog_fadv_sequential(mgmt->forward->fd, 0, 0 /* ALL */); mog_digest_init(&file->digest, mgmt->alg); @@ -140,7 +143,7 @@ void mog_mgmt_fn_size(struct mog_mgmt *mgmt, char *buf) if (!path) return; - if (mog_stat(mgmt->svc, path, &sb) == 0) { + if (mog_stat_req(mgmt->svc, mgmt->mog_devid, path, &sb) == 0) { long long size = (long long)sb.st_size; iov[1].iov_base = tmp; diff --git a/queue_epoll.c b/queue_epoll.c index e2e8222..0a9d065 100644 --- a/queue_epoll.c +++ b/queue_epoll.c @@ -209,8 +209,28 @@ fake_epoll_ctl_mod(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev) epoll_ctl_error(q, mfd); } +static inline void assert_not_held(struct mog_fd *mfd) +{ + if (!mfd) + return; + + assert(! mfd->as.file.dev_held); +} + void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev) { + /* ensure we do not go to sleep while holding a semaphore */ + switch (mfd->fd_type) { + case MOG_FD_TYPE_HTTP: + case MOG_FD_TYPE_HTTPGET: + assert_not_held(mfd->as.http.forward); + break; + case MOG_FD_TYPE_MGMT: + assert_not_held(mfd->as.mgmt.forward); + default: + break; + } + if (epoll_ctl_mod_buggy) fake_epoll_ctl_mod(q, mfd, ev); else @@ -96,7 +96,7 @@ struct mog_svc * mog_svc_new(const char *docroot) CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL)); CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL)); svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash, - mog_dev_cmp, free); + mog_dev_cmp, mog_dev_free); mog_oom_if_null(svc->by_mog_devid); switch (hash_insert_if_absent(by_docroot, svc, NULL)) { @@ -118,7 +118,7 @@ static int svc_scandev(struct mog_svc *svc, size_t *nr, mog_scandev_cb cb) if (*end != 0) continue; if (mog_devid > MOG_DEVID_MAX) continue; - dev = mog_dev_for(svc, (uint32_t)mog_devid); + dev = mog_dev_for(svc, (uint32_t)mog_devid, true); if (!dev) continue; devlist = svc_devlist(svc, dev->st_dev); |