author    Eric Wong <normalperson@yhbt.net>  2013-07-11 08:57:02 +0000
committer Eric Wong <normalperson@yhbt.net>  2013-07-11 19:04:38 +0000
commit    daab757f5e52ce36a47e2d713365d68367a0e6dd (patch)
tree      cd3ae71f0e2f674b65374d414475ae62ac48cc69
parent    9302d584dcf68489a9c4739a3a42a468323ccda6 (diff)
This will allow us to detect I/O contention on our queue
and yield the current thread to other clients for fairness.
This can prevent a client from hogging the thread in situations
where the network is much faster than the filesystem/disk.
 cmogstored.h |  2 ++
 http_get.c   |  2 +-
 http_put.c   |  6 ++++++
 ioq.c        | 38 +++++++++++++++++++++++++++++++++++++-
 4 files changed, 46 insertions(+), 2 deletions(-)
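The pattern the commit message describes, in one place: after each unit of disk I/O, a worker polls the new mog_ioq_contended() hint and hands the client back to the event loop instead of looping on the same descriptor. A minimal sketch, not part of the patch: handle_one_io() is a hypothetical stand-in for the read/sendfile step, while struct mog_fd, enum mog_next, and MOG_NEXT_WAIT_RD are taken from the hunks below.

/*
 * sketch only, not cmogstored code: how a per-client I/O loop
 * consumes the mog_ioq_contended() hint added by this patch
 */
static enum mog_next client_loop(struct mog_fd *mfd)
{
        for (;;) {
                if (!handle_one_io(mfd)) /* hypothetical I/O step */
                        return MOG_NEXT_WAIT_RD; /* no data yet, wait */

                /*
                 * other clients are queued on this device's ioq,
                 * so yield back to the event loop for fairness
                 * rather than monopolizing this worker thread
                 */
                if (mog_ioq_contended())
                        return MOG_NEXT_WAIT_RD;
        }
}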
diff --git a/cmogstored.h b/cmogstored.h
index f7fa022..4162931 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -101,6 +101,7 @@ struct mog_ioq {
         size_t max;
         pthread_mutex_t mtx;
         SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head;
+        bool contended;
         struct mog_svc *svc;
 };
 
@@ -635,6 +636,7 @@ void mog_yield(void);
 extern __thread struct mog_ioq *mog_ioq_current;
 void mog_ioq_init(struct mog_ioq *, struct mog_svc *, size_t val);
 bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
+bool mog_ioq_contended(void) MOG_CHECK;
 void mog_ioq_next(struct mog_ioq *);
 void mog_ioq_destroy(struct mog_ioq *);
 bool mog_ioq_unblock(struct mog_fd *);
diff --git a/http_get.c b/http_get.c
index 8fc566e..45470e5 100644
--- a/http_get.c
+++ b/http_get.c
@@ -251,7 +251,7 @@ enum mog_next mog_http_get_in_progress(struct mog_fd *mfd)
         struct mog_file *file;
         ssize_t w;
         off_t count;
-        static const off_t max_sendfile = 1024 * 1024 * 100;
+        off_t max_sendfile = (mog_ioq_contended() ? 1 : 100) * 1024 * 1024;
 
         assert(http->wbuf == NULL && "can't serve file with http->wbuf");
         assert(http->forward && http->forward != MOG_IOSTAT && "bad forward");
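The practical effect of the http_get.c hunk above: under contention the per-call sendfile ceiling drops from 100 MiB to 1 MiB, so a GET worker returns to the fairness check roughly 100 times as often. The same logic expanded for readability (an illustrative form, not what was committed; the values are from the hunk):

        /* illustrative expansion of the ternary above */
        off_t max_sendfile;

        if (mog_ioq_contended())
                max_sendfile = 1 * 1024 * 1024;   /* contended: yield sooner */
        else
                max_sendfile = 100 * 1024 * 1024; /* uncontended: fewer syscalls */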
diff --git a/http_put.c b/http_put.c
index 277faa7..56a6325 100644
--- a/http_put.c
+++ b/http_put.c
@@ -488,6 +488,9 @@ retry:
                 need -= r;
                 if (need == 0)
                         return http_put_commit(http);
+
+                if (mog_ioq_contended())
+                        return MOG_NEXT_WAIT_RD;
                 goto again;
         }
         if (r != 0) {
@@ -543,6 +546,9 @@ again:
                 /* chunk is complete */
                 if (http->_p.content_len == 0)
                         mog_chunk_init(http);
+
+                if (mog_ioq_contended())
+                        return MOG_NEXT_WAIT_RD;
                 goto again;
         case MOG_CHUNK_STATE_TRAILER:
 chunk_state_trailer:
diff --git a/ioq.c b/ioq.c
index 028b225..829e00d 100644
--- a/ioq.c
+++ b/ioq.c
@@ -19,11 +19,20 @@ void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, size_t val)
         ioq->cur = val;
         ioq->max = val;
         ioq->svc = svc;
+        ioq->contended = false;
         SIMPLEQ_INIT(&ioq->ioq_head);
         CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
 }
 
 /*
+ * only a hint, so the store needs no barrier (the reader clears it via CAS)
+ */
+static inline void ioq_set_contended(struct mog_ioq *ioq)
+{
+        ioq->contended = true;
+}
+
+/*
  * This is like sem_trywait.  Each thread is only allowed to acquire
  * one ioq at once.
  *
@@ -43,6 +52,7 @@ bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *client_mfd)
         } else {
                 client_mfd->ioq_blocked = 1;
                 SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, client_mfd, ioqent);
+                ioq_set_contended(ioq);
         }
 
         CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
@@ -70,8 +80,13 @@ void mog_ioq_next(struct mog_ioq *check_ioq)
         if (mog_ioq_current->cur <= mog_ioq_current->max) {
                 /* wake up any waiters */
                 client_mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
-                if (client_mfd)
+                if (client_mfd) {
                         SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
+
+                        /* if another waiter remains queued, we're still contended */
+                        if (SIMPLEQ_FIRST(&mog_ioq_current->ioq_head))
+                                ioq_set_contended(mog_ioq_current);
+                }
         } else {
                 /* mog_ioq_adjust was called and lowered our capacity */
                 mog_ioq_current->cur--;
@@ -85,6 +100,27 @@ void mog_ioq_next(struct mog_ioq *check_ioq)
         mog_ioq_current = NULL;
 }
 
+/*
+ * Returns true if the currently held ioq is contended.
+ * This clears the contended flag if it is set, so the caller
+ * is expected to yield the current thread shortly afterwards.
+ * This is only a hint.
+ */
+bool mog_ioq_contended(void)
+{
+        struct mog_ioq *cur = mog_ioq_current;
+
+        /* assume contended for non /devXXX* paths */
+        if (!cur)
+                return true;
+
+        /*
+         * at most one racing thread should observe true, so we clear
+         * the flag with an atomic compare-and-swap.  This is only a hint.
+         */
+        return __sync_bool_compare_and_swap(&cur->contended, true, false);
+}
+
 void mog_ioq_destroy(struct mog_ioq *ioq)
 {
         CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
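For readers unfamiliar with the GCC/clang __sync builtins, here is a self-contained sketch of the test-and-clear idiom mog_ioq_contended() relies on. Only the compare-and-swap idiom is taken from the patch; the flag and function names are invented for illustration.

/*
 * standalone illustration of the test-and-clear hint (sketch only)
 * build: cc -o hint hint.c
 */
#include <stdbool.h>
#include <stdio.h>

static bool contended;

static void set_hint(void)
{
        contended = true; /* plain store: it is only a hint */
}

static bool take_hint(void)
{
        /*
         * atomically flip true -> false: if several threads race here,
         * at most one observes true per set_hint() call, so at most
         * one of them yields per contention event
         */
        return __sync_bool_compare_and_swap(&contended, true, false);
}

int main(void)
{
        set_hint();
        printf("%d\n", take_hint()); /* 1: this caller consumed the hint */
        printf("%d\n", take_hint()); /* 0: hint already cleared */
        return 0;
}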