about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-04 02:15:15 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-04 02:15:15 +0000
commitafe10667b8deeb60e3f70d2ddc7a20ae7e0bc072 (patch)
tree7dc79f89f0bfa1bd6b6d1d927fdabbeeeab593e3
parent8a6680f0209bce12d56ae2ee2613448783f3d55a (diff)
downloadmogilefs-client-afe10667b8deeb60e3f70d2ddc7a20ae7e0bc072.tar.gz
It's only used internally by the deprecated bigfile
implementation, and we don't even need it for that.
-rw-r--r--Manifest.txt1
-rw-r--r--lib/mogilefs/bigfile.rb24
-rw-r--r--lib/mogilefs/network.rb108
-rw-r--r--test/test_network.rb56
4 files changed, 16 insertions, 173 deletions
diff --git a/Manifest.txt b/Manifest.txt
index f11c2f3..2426585 100644
--- a/Manifest.txt
+++ b/Manifest.txt
@@ -14,7 +14,6 @@ lib/mogilefs/client.rb
 lib/mogilefs/httpfile.rb
 lib/mogilefs/mogilefs.rb
 lib/mogilefs/mysql.rb
-lib/mogilefs/network.rb
 lib/mogilefs/pool.rb
 lib/mogilefs/util.rb
 setup.rb
diff --git a/lib/mogilefs/bigfile.rb b/lib/mogilefs/bigfile.rb
index 95e89ad..15e65d9 100644
--- a/lib/mogilefs/bigfile.rb
+++ b/lib/mogilefs/bigfile.rb
@@ -1,7 +1,6 @@
 # -*- encoding: binary -*-
 require 'uri'
 require 'mogilefs/util'
-require 'mogilefs/network'
 
 # Used for reading deprecated "bigfile" objects generated by the deprecated
 # mogtool(1) utility.  This is for reading legacy data and not recommended for
@@ -11,13 +10,17 @@ require 'mogilefs/network'
 
 module MogileFS::Bigfile
   # VALID_TYPES = %w(file tarball partition).map { |x| x.freeze }.freeze
-  include MogileFS::Network
 
   # returns a big_info hash if successful
   def bigfile_stat(key)
     bigfile_parse_info(get_file_data(key))
   end
 
+  def trypath(path)
+    http_read_sock(URI.parse(path))
+    rescue
+  end
+
   # returns total bytes written and the big_info hash if successful, raises an
   # exception if not.  wr_io is expected to be an IO-like object capable of
   # receiving the write method.
@@ -33,16 +36,21 @@ module MogileFS::Bigfile
 
     info[:parts].each_with_index do |part,part_nr|
       next if part_nr == 0 # info[:parts][0] is always empty
-      uris = verify_uris(part[:paths].map { |path| URI.parse(path) })
-      if uris.empty?
+
+      sock = nil
+      part[:paths].each { |path| sock = trypath(path) and break }
+
+      unless sock
         # part[:paths] may not be valid anymore due to rebalancing, however we
         # can get_keys on key,<part_nr> and retry paths if all paths fail
-        part[:paths] = get_paths("#{key.gsub(/^big_info:/, '')},#{part_nr}")
-        uris = verify_uris(part[:paths].map { |path| URI.parse(path) })
-        raise MogileFS::Backend::NoDevices if uris.empty?
+        part_key = "#{key.sub(/^big_info:/, '')},#{part_nr}"
+        get_paths(part_key).each { |path| sock = trypath(path) and break }
+        unless sock
+          raise MogileFS::Backend::NoDevices,
+                "no device for key=#{part_key.inspect}"
+        end
       end
 
-      sock = http_read_sock(uris[0])
       w = copy_stream(sock, wr_io)
 
       wr_io.respond_to?(:md5_check!) and wr_io.md5_check!(part[:md5])
diff --git a/lib/mogilefs/network.rb b/lib/mogilefs/network.rb
deleted file mode 100644
index 52dc257..0000000
--- a/lib/mogilefs/network.rb
+++ /dev/null
@@ -1,108 +0,0 @@
-# -*- encoding: binary -*-
-# here are internal implementation details, do not use them in your code
-require 'mogilefs'
-require 'socket'
-require 'mogilefs/util'
-
-module MogileFS::Network
-  # given an array of URIs, verify that at least one of them is accessible
-  # with the expected HTTP code within the timeout period (in seconds).
-  def verify_uris(uris = [], expect = '200', timeout = 2.00)
-    uri_socks = {}
-
-    # first, we asynchronously connect to all of them
-    uris.each do |uri|
-      sock = MogileFS::Socket.start(uri.host, uri.port) rescue next
-      uri_socks[sock] = uri
-    end
-
-    # wait for at least one of them to finish connecting and send
-    # HTTP requests to the connected ones
-    sockets, timeout = get_writable_set(uri_socks, timeout)
-
-    # Await a response from the sockets we had written to, we only need one
-    # valid response, but we'll take more if they return simultaneously
-    sockets[0] ? get_readable_uris(sockets, uri_socks, expect, timeout) : []
-
-    ensure
-      uri_socks.keys.each { |sock| sock.close rescue nil }
-  end
-
-  private
-    include MogileFS::Util
-
-    # returns an array of writeable Sockets and leftover from timeout
-    def get_writable_set(uri_socks, timeout)
-      sockets = []
-      begin
-        t0 = Time.now
-        r = begin
-         IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0)
-        rescue
-          # get rid of bad descriptors
-          uri_socks.delete_if do |sock, uri|
-            begin
-              sock.recv_nonblock(1)
-              false # should never get here for HTTP, really...
-            rescue Errno::EAGAIN, Errno::EINTR
-              false
-            rescue
-              sock.close rescue nil
-              true
-            end
-          end
-          timeout -= (Time.now - t0)
-          retry if timeout >= 0
-        end
-
-        break unless r && r[1]
-
-        r[1].each do |sock|
-          begin
-            # we don't care about short/interrupted writes here, if the
-            # following request fails or blocks then the server is
-            # flat-out hopeless
-            sock.write_nonblock(
-              "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n")
-            sockets << sock
-          rescue
-            sock.close rescue nil
-          end
-        end
-
-        timeout -= (Time.now - t0)
-      end until (sockets[0] || timeout < 0)
-
-      [ sockets, timeout ]
-    end
-
-    # returns an array of URIs from uri_socks that are good
-    def get_readable_uris(sockets, uri_socks, expect, timeout)
-      ok_uris = []
-
-      begin
-        t0 = Time.now
-        r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0) rescue nil
-
-        (r && r[0] ? r[0] : sockets).each do |sock|
-          buf = begin
-            sock.recv_nonblock(128, Socket::MSG_PEEK)
-          rescue Errno::EAGAIN, Errno::EINTR
-            next
-          rescue
-            sockets.delete(sock) # socket went bad
-            next
-          end
-
-          if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf
-            ok_uris << uri_socks.delete(sock)
-            sock.close rescue nil
-          end
-        end
-        timeout -= (Time.now - t0)
-      end until ok_uris[0] || timeout < 0
-
-      ok_uris
-    end
-
-end # module MogileFS::Network
diff --git a/test/test_network.rb b/test/test_network.rb
deleted file mode 100644
index eda5e04..0000000
--- a/test/test_network.rb
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- encoding: binary -*-
-require './test/setup'
-require 'mogilefs'
-require 'mogilefs/network'
-
-class TestNetwork < Test::Unit::TestCase
-  include MogileFS::Network
-
-  def test_verify_uris
-    good = TempServer.new(Proc.new do |serv,port|
-      client,client_addr = serv.accept
-      client.readpartial(4096)
-      client.syswrite("HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n")
-    end)
-    bad = TempServer.new(Proc.new do |serv,port|
-      client, client_addr = serv.accept
-      client.close rescue nil
-    end)
-
-    good_uri = URI.parse("http://127.0.0.1:#{good.port}/")
-    bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/")
-    ok = verify_uris([ good_uri, bad_uri ])
-    assert_equal [ good_uri ], ok
-    ensure
-      TempServer.destroy_all!
-  end
-
-  def test_verify_non_existent
-    good = TempServer.new(Proc.new do |serv,port|
-      client,client_addr = serv.accept
-      client.readpartial(4096)
-      client.syswrite("HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n")
-    end)
-    bad = TempServer.new(Proc.new { |serv,port| serv.close })
-
-    good_uri = URI.parse("http://127.0.0.1:#{good.port}/")
-    bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/")
-    ok = verify_uris([ good_uri, bad_uri ])
-    assert_equal [ good_uri ], ok
-    ensure
-      TempServer.destroy_all!
-  end
-
-  def test_verify_all_bad
-    good = TempServer.new(Proc.new { |serv,port| serv.close })
-    bad = TempServer.new(Proc.new { |serv,port| serv.close })
-
-    good_uri = URI.parse("http://127.0.0.1:#{good.port}/")
-    bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/")
-    ok = verify_uris([ good_uri, bad_uri ], '200', 1.0)
-    assert ok.empty?, "nothing returned"
-    ensure
-      TempServer.destroy_all!
-  end
-
-end