diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-07-11 08:57:02 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-07-11 19:04:38 +0000 |
commit | daab757f5e52ce36a47e2d713365d68367a0e6dd (patch) | |
tree | cd3ae71f0e2f674b65374d414475ae62ac48cc69 | |
parent | 9302d584dcf68489a9c4739a3a42a468323ccda6 (diff) | |
download | cmogstored-daab757f5e52ce36a47e2d713365d68367a0e6dd.tar.gz |
This will allow us to detect I/O contention on our queue and yield the current thread to other clients for fairness. This can prevent a client from hogging the thread in situations where the network is much faster than the filesystem/disk.
-rw-r--r-- | cmogstored.h | 2 | ||||
-rw-r--r-- | http_get.c | 2 | ||||
-rw-r--r-- | http_put.c | 6 | ||||
-rw-r--r-- | ioq.c | 38 |
4 files changed, 46 insertions, 2 deletions
diff --git a/cmogstored.h b/cmogstored.h index f7fa022..4162931 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -101,6 +101,7 @@ struct mog_ioq { size_t max; pthread_mutex_t mtx; SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head; + bool contended; struct mog_svc *svc; }; @@ -635,6 +636,7 @@ void mog_yield(void); extern __thread struct mog_ioq *mog_ioq_current; void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val); bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK; +bool mog_ioq_contended(void) MOG_CHECK; void mog_ioq_next(struct mog_ioq *); void mog_ioq_destroy(struct mog_ioq *); bool mog_ioq_unblock(struct mog_fd *); @@ -251,7 +251,7 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *mfd) struct mog_file *file; ssize_t w; off_t count; - static const off_t max_sendfile = 1024 * 1024 * 100; + off_t max_sendfile = (mog_ioq_contended() ? 1 : 100) * 1024 * 1024; assert(http->wbuf == NULL && "can't serve file with http->wbuf"); assert(http->forward && http->forward != MOG_IOSTAT && "bad forward"); @@ -488,6 +488,9 @@ retry: need -= r; if (need == 0) return http_put_commit(http); + + if (mog_ioq_contended()) + return MOG_NEXT_WAIT_RD; goto again; } if (r != 0) { @@ -543,6 +546,9 @@ again: /* chunk is complete */ if (http->_p.content_len == 0) mog_chunk_init(http); + + if (mog_ioq_contended()) + return MOG_NEXT_WAIT_RD; goto again; case MOG_CHUNK_STATE_TRAILER: chunk_state_trailer: @@ -19,11 +19,20 @@ void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, size_t val) ioq->cur = val; ioq->max = val; ioq->svc = svc; + ioq->contended = false; SIMPLEQ_INIT(&ioq->ioq_head); CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL)); } /* + * this is only a hint, so no explicit memory barriers or atomics + */ +static inline void ioq_set_contended(struct mog_ioq *ioq) +{ + ioq->contended = true; +} + +/* * This is like sem_trywait. Each thread is only allowed to acquire * one ioq at once. * @@ -43,6 +52,7 @@ bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd) } else { client_mfd->ioq_blocked = 1; SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent); + ioq_set_contended(ioq); } CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx)); @@ -70,8 +80,13 @@ void mog_ioq_next(struct mog_ioq *check_ioq) if (mog_ioq_current->cur <= mog_ioq_current->max) { /* wake up any waiters */ client_mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head); - if (client_mfd) + if (client_mfd) { SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent); + + /* if there's another head, we're still contended */ + if (SIMPLEQ_FIRST(&mog_ioq_current->ioq_head)) + ioq_set_contended(mog_ioq_current); + } } else { /* mog_ioq_adjust was called and lowered our capacity */ mog_ioq_current->cur--; @@ -85,6 +100,27 @@ void mog_ioq_next(struct mog_ioq *check_ioq) mog_ioq_current = NULL; } +/* + * Returns true if the currently held ioq is contended. + * This releases the contended flag if it is set, so the caller + * is expected to yield the current thread shortly afterwards. + * This is only a hint. + */ +bool mog_ioq_contended(void) +{ + struct mog_ioq *cur = mog_ioq_current; + + /* assume contended for non /devXXX* paths */ + if (!cur) + return true; + + /* + * we only want to minimize the threads hitting true, so we use + * an atomic exchange and hope for the best. This is only a hint. + */ + return __sync_bool_compare_and_swap(&cur->contended, true, false); +} + void mog_ioq_destroy(struct mog_ioq *ioq) { CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx)); |