about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-02-28 09:00:50 +0000
committerEric Wong <normalperson@yhbt.net>2013-02-28 09:00:50 +0000
commit6b45526295eb68313075c19d66b86e6a524bc0a3 (patch)
tree1793cf7c829dfc5c309bfc6e722cde0d72ea9901
parentadc750ab6600980ba98d77d371efb07b38886f30 (diff)
downloadcmogstored-socket_alive.tar.gz
Clients may start an fsck checksum request and not be around
to read the response.  So detect client death and abort
checksumming if we have a dead socket.

This is not extensively tested and may be overkill.
-rw-r--r--Makefile.am1
-rw-r--r--cmogstored.h3
-rw-r--r--digest.h3
-rw-r--r--mgmt.c13
-rw-r--r--socket_alive.c39
-rw-r--r--test/mgmt.rb53
6 files changed, 90 insertions, 22 deletions
diff --git a/Makefile.am b/Makefile.am
index 6fa520c..862cd53 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -81,6 +81,7 @@ mog_src += queue_step.c
 mog_src += selfwake.h
 mog_src += selfwake.c
 mog_src += sig.c
+mog_src += socket_alive.c
 mog_src += svc.c
 mog_src += svc_dev.c
 mog_src += thrpool.c
diff --git a/cmogstored.h b/cmogstored.h
index e72c071..84d3b69 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -387,6 +387,9 @@ void mog_mgmt_post_accept(int fd, struct mog_svc *);
 enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK;
 void mog_mgmt_quit_step(struct mog_fd *);
 
+/* socket_alive.c */
+bool mog_socket_alive(struct mog_fd *);
+
 /* queue_epoll.c */
 struct mog_queue * mog_queue_new(void);
 void mog_idleq_add(struct mog_queue *, struct mog_fd *, enum mog_qev);
diff --git a/digest.h b/digest.h
index bef35b8..a013efa 100644
--- a/digest.h
+++ b/digest.h
@@ -5,7 +5,8 @@
 enum mog_digest_next {
         MOG_DIGEST_CONTINUE = 0,
         MOG_DIGEST_EOF,
-        MOG_DIGEST_ERROR
+        MOG_DIGEST_ERROR,
+        MOG_DIGEST_ABORT
 };
 
 /* XXX gc_hash_handle is a typedef which hides a pointer, ugh... */
diff --git a/mgmt.c b/mgmt.c
index b6b2aa5..5726ab7 100644
--- a/mgmt.c
+++ b/mgmt.c
@@ -12,6 +12,7 @@ static void mgmt_digest_step(struct mog_fd *mfd)
         struct mog_mgmt *mgmt = &mfd->as.mgmt;
         struct mog_fd *fmfd = mgmt->forward;
         enum mog_digest_next next;
+        struct mog_digest *digest = &fmfd->as.file.digest;
 
         /*
          * MOG_PRIO_FSCK means we're likely the _only_ thread handling
@@ -21,13 +22,16 @@ static void mgmt_digest_step(struct mog_fd *mfd)
                 int ioprio = mog_ioprio_drop();
 
                 do {
-                        next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
+                        if (mog_socket_alive(mfd))
+                                next = mog_digest_read(digest, fmfd->fd);
+                        else
+                                next = MOG_DIGEST_ABORT;
                 } while (next == MOG_DIGEST_CONTINUE);
 
                 if (ioprio != -1)
                         mog_ioprio_restore(ioprio);
         } else {
-                next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
+                next = mog_digest_read(digest, fmfd->fd);
         }
 
         assert(mgmt->wbuf == NULL && "wbuf should be NULL here");
@@ -39,6 +43,11 @@ static void mgmt_digest_step(struct mog_fd *mfd)
                 break;
         case MOG_DIGEST_ERROR:
                 mog_mgmt_fn_digest_err(mgmt);
+                break;
+        case MOG_DIGEST_ABORT:
+                syslog(LOG_ERR, "fd=%d aborted while waiting for fsck",
+                       mfd->fd);
+                mgmt->wbuf = MOG_WR_ERROR; /* fake the error */
         }
 
         if (mgmt->prio == MOG_PRIO_FSCK)
diff --git a/socket_alive.c b/socket_alive.c
new file mode 100644
index 0000000..37e52a1
--- /dev/null
+++ b/socket_alive.c
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+
+/*
+ * check for POLLERR/POLLHUP on the socket to see if it died while
+ * we were in the queue.  This can save us from expensive processing.
+ */
+bool mog_socket_alive(struct mog_fd *mfd)
+{
+        static const short dead_events = POLLERR | POLLHUP;
+        struct pollfd fds = { .fd = mfd->fd, .events = POLLIN };
+        int rc;
+        int nbytes;
+
+        do {
+                rc = poll(&fds, 1, 0);
+        } while (rc < 0 && errno == EINTR);
+
+        if (rc == 1) {
+                if (fds.revents & POLLIN) {
+                        CHECK(int, 0, ioctl(mfd->fd, FIONREAD, &nbytes));
+                        return (nbytes <= 0) ? false : true;
+                }
+                return (fds.revents & dead_events) ? false : true;
+        }
+        if (rc == 0)
+                return true;
+
+        assert(rc < 0 && "poll returned unexpected value");
+
+        if (errno == ENOMEM || errno == EAGAIN)
+                return false; /* kernel is in trouble, abort */
+
+        assert(0 && "poll usage bug?");
+        return false;
+}
diff --git a/test/mgmt.rb b/test/mgmt.rb
index 373cd69..1086f56 100644
--- a/test/mgmt.rb
+++ b/test/mgmt.rb
@@ -4,6 +4,7 @@
 # License: GPLv3 or later (see COPYING for details)
 require 'test/test_helper'
 require 'digest/md5'
+require 'timeout'
 
 class TestMgmt < Test::Unit::TestCase
   def setup
@@ -238,28 +239,42 @@ class TestMgmt < Test::Unit::TestCase
   # ensure aborted requests do not trigger failure in graceful shutdown
   def test_concurrent_md5_fsck_abort
     sparse_file_prepare
+    nabort = 5
     File.open("#@tmpdir/dev666/sparse-file.fid") do |fp|
-      if fp.respond_to?(:advise)
-        # clear the cache
-        fp.advise(:dontneed)
-        req = "MD5 /dev666/sparse-file.fid fsck\r\n"
-        starter = get_client
-        clients = (1..5).map { get_client }
-
-        starter.write(req)
-        threads = clients.map do |c|
-          Thread.new(c) do |client|
-            client.write(req)
-            client.shutdown
-            client.close
-            :ok
-          end
+      unless fp.respond_to?(:advise)
+        skip("IO#advise not supported, skipping test") rescue nil
+        return
+      end
+      # clear the cache
+      fp.advise(:dontneed)
+      req = "MD5 /dev666/sparse-file.fid fsck\r\n"
+      starter = get_client
+      clients = (1..nabort).map { get_client }
+
+      starter.write(req)
+      threads = clients.map do |c|
+        Thread.new(c) do |client|
+          client.write(req)
+          client.shutdown
+          client.close
+          :ok
         end
-        threads.each { |thr| assert_equal :ok, thr.value }
-        line = starter.gets
-        assert_match(%r{\A/dev666/sparse-file\.fid MD5=[a-f0-9]{32}\r\n}, line)
-        starter.close
       end
+      threads.each { |thr| assert_equal :ok, thr.value }
+      line = starter.gets
+      assert_match(%r{\A/dev666/sparse-file\.fid MD5=[a-f0-9]{32}\r\n}, line)
+      starter.close
+
+      lines = nil
+      Timeout.timeout(30) do
+        begin
+          @err.rewind
+          lines = @err.readlines.grep(/aborted while waiting for fsck/)
+        end while lines.empty? && sleep(0.05)
+      end
+      assert lines[0], lines.inspect
+      @err.rewind
+      @err.truncate(0)
     end
   end