From daab757f5e52ce36a47e2d713365d68367a0e6dd Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 11 Jul 2013 08:57:02 +0000 Subject: ioq: introduce mog_ioq_contended hint This will allow us to detect I/O contention on our queue and yield the current thread to other clients for fairness. This can prevent a client from hogging the thread in situations where the network is much faster than the filesystem/disk. --- cmogstored.h | 2 ++ http_get.c | 2 +- http_put.c | 6 ++++++ ioq.c | 38 +++++++++++++++++++++++++++++++++++++- 4 files changed, 46 insertions(+), 2 deletions(-) diff --git a/cmogstored.h b/cmogstored.h index f7fa022..4162931 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -101,6 +101,7 @@ struct mog_ioq { size_t max; pthread_mutex_t mtx; SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head; + bool contended; struct mog_svc *svc; }; @@ -635,6 +636,7 @@ void mog_yield(void); extern __thread struct mog_ioq *mog_ioq_current; void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val); bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK; +bool mog_ioq_contended(void) MOG_CHECK; void mog_ioq_next(struct mog_ioq *); void mog_ioq_destroy(struct mog_ioq *); bool mog_ioq_unblock(struct mog_fd *); diff --git a/http_get.c b/http_get.c index 8fc566e..45470e5 100644 --- a/http_get.c +++ b/http_get.c @@ -251,7 +251,7 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *mfd) struct mog_file *file; ssize_t w; off_t count; - static const off_t max_sendfile = 1024 * 1024 * 100; + off_t max_sendfile = (mog_ioq_contended() ? 1 : 100) * 1024 * 1024; assert(http->wbuf == NULL && "can't serve file with http->wbuf"); assert(http->forward && http->forward != MOG_IOSTAT && "bad forward"); diff --git a/http_put.c b/http_put.c index 277faa7..56a6325 100644 --- a/http_put.c +++ b/http_put.c @@ -488,6 +488,9 @@ retry: need -= r; if (need == 0) return http_put_commit(http); + + if (mog_ioq_contended()) + return MOG_NEXT_WAIT_RD; goto again; } if (r != 0) { @@ -543,6 +546,9 @@ again: /* chunk is complete */ if (http->_p.content_len == 0) mog_chunk_init(http); + + if (mog_ioq_contended()) + return MOG_NEXT_WAIT_RD; goto again; case MOG_CHUNK_STATE_TRAILER: chunk_state_trailer: diff --git a/ioq.c b/ioq.c index 028b225..829e00d 100644 --- a/ioq.c +++ b/ioq.c @@ -19,10 +19,19 @@ void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, size_t val) ioq->cur = val; ioq->max = val; ioq->svc = svc; + ioq->contended = false; SIMPLEQ_INIT(&ioq->ioq_head); CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL)); } +/* + * this is only a hint, so no explicit memory barriers or atomics + */ +static inline void ioq_set_contended(struct mog_ioq *ioq) +{ + ioq->contended = true; +} + /* * This is like sem_trywait. Each thread is only allowed to acquire * one ioq at once. @@ -43,6 +52,7 @@ bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd) } else { client_mfd->ioq_blocked = 1; SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent); + ioq_set_contended(ioq); } CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx)); @@ -70,8 +80,13 @@ void mog_ioq_next(struct mog_ioq *check_ioq) if (mog_ioq_current->cur <= mog_ioq_current->max) { /* wake up any waiters */ client_mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head); - if (client_mfd) + if (client_mfd) { SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent); + + /* if there's another head, we're still contended */ + if (SIMPLEQ_FIRST(&mog_ioq_current->ioq_head)) + ioq_set_contended(mog_ioq_current); + } } else { /* mog_ioq_adjust was called and lowered our capacity */ mog_ioq_current->cur--; @@ -85,6 +100,27 @@ void mog_ioq_next(struct mog_ioq *check_ioq) mog_ioq_current = NULL; } +/* + * Returns true if the currently held ioq is contended. + * This releases the contended flag if it is set, so the caller + * is expected to yield the current thread shortly afterwards. + * This is only a hint. + */ +bool mog_ioq_contended(void) +{ + struct mog_ioq *cur = mog_ioq_current; + + /* assume contended for non /devXXX* paths */ + if (!cur) + return true; + + /* + * we only want to minimize the threads hitting true, so we use + * an atomic exchange and hope for the best. This is only a hint. + */ + return __sync_bool_compare_and_swap(&cur->contended, true, false); +} + void mog_ioq_destroy(struct mog_ioq *ioq) { CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx)); -- cgit v1.2.3-24-ge0c7