diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-11-04 02:15:15 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-11-04 02:15:15 +0000 |
commit | afe10667b8deeb60e3f70d2ddc7a20ae7e0bc072 (patch) | |
tree | 7dc79f89f0bfa1bd6b6d1d927fdabbeeeab593e3 | |
parent | 8a6680f0209bce12d56ae2ee2613448783f3d55a (diff) | |
download | mogilefs-client-afe10667b8deeb60e3f70d2ddc7a20ae7e0bc072.tar.gz |
It's only used internally by the deprecated bigfile implementation, and we don't even need it for that.
-rw-r--r-- | Manifest.txt | 1 | ||||
-rw-r--r-- | lib/mogilefs/bigfile.rb | 24 | ||||
-rw-r--r-- | lib/mogilefs/network.rb | 108 | ||||
-rw-r--r-- | test/test_network.rb | 56 |
4 files changed, 16 insertions, 173 deletions
diff --git a/Manifest.txt b/Manifest.txt index f11c2f3..2426585 100644 --- a/Manifest.txt +++ b/Manifest.txt @@ -14,7 +14,6 @@ lib/mogilefs/client.rb lib/mogilefs/httpfile.rb lib/mogilefs/mogilefs.rb lib/mogilefs/mysql.rb -lib/mogilefs/network.rb lib/mogilefs/pool.rb lib/mogilefs/util.rb setup.rb diff --git a/lib/mogilefs/bigfile.rb b/lib/mogilefs/bigfile.rb index 95e89ad..15e65d9 100644 --- a/lib/mogilefs/bigfile.rb +++ b/lib/mogilefs/bigfile.rb @@ -1,7 +1,6 @@ # -*- encoding: binary -*- require 'uri' require 'mogilefs/util' -require 'mogilefs/network' # Used for reading deprecated "bigfile" objects generated by the deprecated # mogtool(1) utility. This is for reading legacy data and not recommended for @@ -11,13 +10,17 @@ require 'mogilefs/network' module MogileFS::Bigfile # VALID_TYPES = %w(file tarball partition).map { |x| x.freeze }.freeze - include MogileFS::Network # returns a big_info hash if successful def bigfile_stat(key) bigfile_parse_info(get_file_data(key)) end + def trypath(path) + http_read_sock(URI.parse(path)) + rescue + end + # returns total bytes written and the big_info hash if successful, raises an # exception if not. wr_io is expected to be an IO-like object capable of # receiving the write method. @@ -33,16 +36,21 @@ module MogileFS::Bigfile info[:parts].each_with_index do |part,part_nr| next if part_nr == 0 # info[:parts][0] is always empty - uris = verify_uris(part[:paths].map { |path| URI.parse(path) }) - if uris.empty? + + sock = nil + part[:paths].each { |path| sock = trypath(path) and break } + + unless sock # part[:paths] may not be valid anymore due to rebalancing, however we # can get_keys on key,<part_nr> and retry paths if all paths fail - part[:paths] = get_paths("#{key.gsub(/^big_info:/, '')},#{part_nr}") - uris = verify_uris(part[:paths].map { |path| URI.parse(path) }) - raise MogileFS::Backend::NoDevices if uris.empty? + part_key = "#{key.sub(/^big_info:/, '')},#{part_nr}" + get_paths(part_key).each { |path| sock = trypath(path) and break } + unless sock + raise MogileFS::Backend::NoDevices, + "no device for key=#{part_key.inspect}" + end end - sock = http_read_sock(uris[0]) w = copy_stream(sock, wr_io) wr_io.respond_to?(:md5_check!) and wr_io.md5_check!(part[:md5]) diff --git a/lib/mogilefs/network.rb b/lib/mogilefs/network.rb deleted file mode 100644 index 52dc257..0000000 --- a/lib/mogilefs/network.rb +++ /dev/null @@ -1,108 +0,0 @@ -# -*- encoding: binary -*- -# here are internal implementation details, do not use them in your code -require 'mogilefs' -require 'socket' -require 'mogilefs/util' - -module MogileFS::Network - # given an array of URIs, verify that at least one of them is accessible - # with the expected HTTP code within the timeout period (in seconds). - def verify_uris(uris = [], expect = '200', timeout = 2.00) - uri_socks = {} - - # first, we asynchronously connect to all of them - uris.each do |uri| - sock = MogileFS::Socket.start(uri.host, uri.port) rescue next - uri_socks[sock] = uri - end - - # wait for at least one of them to finish connecting and send - # HTTP requests to the connected ones - sockets, timeout = get_writable_set(uri_socks, timeout) - - # Await a response from the sockets we had written to, we only need one - # valid response, but we'll take more if they return simultaneously - sockets[0] ? get_readable_uris(sockets, uri_socks, expect, timeout) : [] - - ensure - uri_socks.keys.each { |sock| sock.close rescue nil } - end - - private - include MogileFS::Util - - # returns an array of writeable Sockets and leftover from timeout - def get_writable_set(uri_socks, timeout) - sockets = [] - begin - t0 = Time.now - r = begin - IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0) - rescue - # get rid of bad descriptors - uri_socks.delete_if do |sock, uri| - begin - sock.recv_nonblock(1) - false # should never get here for HTTP, really... - rescue Errno::EAGAIN, Errno::EINTR - false - rescue - sock.close rescue nil - true - end - end - timeout -= (Time.now - t0) - retry if timeout >= 0 - end - - break unless r && r[1] - - r[1].each do |sock| - begin - # we don't care about short/interrupted writes here, if the - # following request fails or blocks then the server is - # flat-out hopeless - sock.write_nonblock( - "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n") - sockets << sock - rescue - sock.close rescue nil - end - end - - timeout -= (Time.now - t0) - end until (sockets[0] || timeout < 0) - - [ sockets, timeout ] - end - - # returns an array of URIs from uri_socks that are good - def get_readable_uris(sockets, uri_socks, expect, timeout) - ok_uris = [] - - begin - t0 = Time.now - r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0) rescue nil - - (r && r[0] ? r[0] : sockets).each do |sock| - buf = begin - sock.recv_nonblock(128, Socket::MSG_PEEK) - rescue Errno::EAGAIN, Errno::EINTR - next - rescue - sockets.delete(sock) # socket went bad - next - end - - if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf - ok_uris << uri_socks.delete(sock) - sock.close rescue nil - end - end - timeout -= (Time.now - t0) - end until ok_uris[0] || timeout < 0 - - ok_uris - end - -end # module MogileFS::Network diff --git a/test/test_network.rb b/test/test_network.rb deleted file mode 100644 index eda5e04..0000000 --- a/test/test_network.rb +++ /dev/null @@ -1,56 +0,0 @@ -# -*- encoding: binary -*- -require './test/setup' -require 'mogilefs' -require 'mogilefs/network' - -class TestNetwork < Test::Unit::TestCase - include MogileFS::Network - - def test_verify_uris - good = TempServer.new(Proc.new do |serv,port| - client,client_addr = serv.accept - client.readpartial(4096) - client.syswrite("HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n") - end) - bad = TempServer.new(Proc.new do |serv,port| - client, client_addr = serv.accept - client.close rescue nil - end) - - good_uri = URI.parse("http://127.0.0.1:#{good.port}/") - bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/") - ok = verify_uris([ good_uri, bad_uri ]) - assert_equal [ good_uri ], ok - ensure - TempServer.destroy_all! - end - - def test_verify_non_existent - good = TempServer.new(Proc.new do |serv,port| - client,client_addr = serv.accept - client.readpartial(4096) - client.syswrite("HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n") - end) - bad = TempServer.new(Proc.new { |serv,port| serv.close }) - - good_uri = URI.parse("http://127.0.0.1:#{good.port}/") - bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/") - ok = verify_uris([ good_uri, bad_uri ]) - assert_equal [ good_uri ], ok - ensure - TempServer.destroy_all! - end - - def test_verify_all_bad - good = TempServer.new(Proc.new { |serv,port| serv.close }) - bad = TempServer.new(Proc.new { |serv,port| serv.close }) - - good_uri = URI.parse("http://127.0.0.1:#{good.port}/") - bad_uri = URI.parse("http://127.0.0.1:#{bad.port}/") - ok = verify_uris([ good_uri, bad_uri ], '200', 1.0) - assert ok.empty?, "nothing returned" - ensure - TempServer.destroy_all! - end - -end |