about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-06-21 03:34:28 +0000
committerEric Wong <normalperson@yhbt.net>2013-06-25 22:05:48 +0000
commit328623972837345dbcf3ed372293201e3bc4fe3c (patch)
treee3385166859612c64b1d4e34f44a61e70b91149c
parent40f84cd0924958c619d434a9147e7ed2b6abaadc (diff)
downloadcmogstored-328623972837345dbcf3ed372293201e3bc4fe3c.tar.gz
This should prevent one class of "accidental" failures.
(The sidechannel has never been meant to be secure and exposed
 to the public).
-rw-r--r--cmogstored.h3
-rw-r--r--svc.c46
-rw-r--r--test/mgmt.rb33
3 files changed, 57 insertions, 25 deletions
diff --git a/cmogstored.h b/cmogstored.h
index bc6ee59..a91f393 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -135,11 +135,13 @@ struct mog_mgmt {
 };
 
 struct mog_queue;
+struct mog_svc;
 struct mog_svc {
         int docroot_fd;
         const char *docroot;
         size_t nmogdev;
         size_t user_set_aio_threads; /* only touched by main/notify thread */
+        size_t user_req_aio_threads; /* protected by aio_threads_lock */
         size_t thr_per_dev;
 
         /* private */
@@ -150,6 +152,7 @@ struct mog_svc {
         pthread_mutex_t devstats_lock;
         struct mog_queue *queue;
         LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers;
+        SIMPLEQ_ENTRY(mog_svc) qentry;
         mode_t put_perms;
         mode_t mkcol_perms;
         struct mog_fd *http_mfd;
diff --git a/svc.c b/svc.c
index cf6a083..27440ff 100644
--- a/svc.c
+++ b/svc.c
@@ -21,17 +21,9 @@ static mode_t mog_umask;
  * to the notify thread.
  */
 static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
-struct aio_threads_req;
-struct aio_threads_req {
-        struct mog_svc *svc;
-        size_t size;
-        SIMPLEQ_ENTRY(aio_threads_req) qentry;
-};
-
-static SIMPLEQ_HEAD(sq, aio_threads_req) aio_threads_qhead =
+static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead =
                                 SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);
 
-
 static void svc_free(void *ptr)
 {
         struct mog_svc *svc = ptr;
@@ -256,16 +248,16 @@ bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
  */
 void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size)
 {
-        struct aio_threads_req *req;
-
-        /* this gets free'ed in mog_thrpool_process_queue() */
-        req = xmalloc(sizeof(struct aio_threads_req));
-        req->size = size;
-        req->svc = svc;
+        size_t prev_enq;
 
-        /* put into the queue so main thread can process it */
         CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
-        SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, req, qentry);
+
+        prev_enq = svc->user_req_aio_threads;
+        svc->user_req_aio_threads = size;
+        if (!prev_enq)
+                /* put into the queue so main thread can process it */
+                SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry);
+
         CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));
 
         /* wake up the main thread so it can process the queue */
@@ -275,26 +267,30 @@ void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size)
 /* this runs in the main (notify) thread */
 void mog_svc_aio_threads_handler(void)
 {
-        struct aio_threads_req *req;
+        struct mog_svc *svc;
 
         /* guard against requests bundled in one wakeup by looping here */
         for (;;) {
+                size_t req_size = 0;
+
                 CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
-                req = SIMPLEQ_FIRST(&aio_threads_qhead);
-                if (req)
+                svc = SIMPLEQ_FIRST(&aio_threads_qhead);
+                if (svc) {
                         SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
+                        req_size = svc->user_req_aio_threads;
+                        svc->user_req_aio_threads = 0;
+                }
                 CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));
 
                 /*
                  * spurious wakeup is possible since we loop here
                  * (and we must loop, see above comment)
                  */
-                if (req == NULL)
+                if (svc == NULL || req_size == 0)
                         return;
 
-                syslog(LOG_INFO, "server aio_threads=%zu", req->size);
-                req->svc->user_set_aio_threads = req->size;
-                mog_thrpool_set_size(&req->svc->queue->thrpool, req->size);
-                free(req);
+                syslog(LOG_INFO, "server aio_threads=%zu", req_size);
+                svc->user_set_aio_threads = req_size;
+                mog_thrpool_set_size(&svc->queue->thrpool, req_size);
         }
 }
diff --git a/test/mgmt.rb b/test/mgmt.rb
index deec383..7101245 100644
--- a/test/mgmt.rb
+++ b/test/mgmt.rb
@@ -290,6 +290,39 @@ class TestMgmt < Test::Unit::TestCase
     assert_match(%r{ERROR: unknown command}, @client.gets)
   end
 
+  def test_aio_threads_spam
+    tries = 1000
+    @client.write "WTF\r\n"
+    assert_match(%r{ERROR: unknown command}, @client.gets)
+    t_yield # wait for threads to spawn
+    100.times do |i|
+      @client.write "server aio_threads = 1\r\n"
+      assert_equal "\r\n", @client.readpartial(4)
+      @client.write "server aio_threads = 2\r\n"
+      assert_equal "\r\n", @client.readpartial(4)
+    end
+  end
+
+  def test_giant_devid_skip
+    max = 16777215 # devid is MEDIUMINT in DB
+    Dir.mkdir("#@tmpdir/dev#{max}")
+    Dir.mkdir("#@tmpdir/dev#{max + 1}")
+    @client.write "watch\n"
+    lines = []
+
+    2.times do # 2 times in case we're slow
+      begin
+        line = @client.gets
+        lines << line
+      end until line == ".\n"
+    end
+
+    assert lines.grep(/\b#{max}\b/)[0]
+    assert_nil lines.grep(/\b#{max + 1}\b/)[0]
+    assert File.exist?("#@tmpdir/dev#{max}/usage")
+    assert ! File.exist?("#@tmpdir/dev#{max + 1}/usage")
+  end
+
   def test_iostat_watch
     Dir.mkdir("#@tmpdir/dev666")
     @client.write "watch\n"