1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- encoding: binary -*-
# internal implementation details here, do not rely on them in your code
# This class is needed because Net::HTTP streaming is still inefficient
# for reading huge response bodies over fast LANs.
# A socket that has already sent an HTTP GET request and consumed the
# response header, so that subsequent reads yield only the response body.
#
# +content_length+ holds the integer parsed from the Content-Length response
# header and +uri+ holds the parsed URI the body is being fetched from; both
# are assigned by HTTPReader.try.
class MogileFS::HTTPReader < MogileFS::Socket
  attr_accessor :content_length, :uri

  # backwards compat, if anybody cares
  alias mogilefs_size content_length # :nodoc:

  # Reads the whole response body into a single String and returns it.
  # This may OOM your system on large files.
  #
  # Raises MogileFS::SizeMismatchError (with an empty backtrace) if the
  # number of bytes read differs from +content_length+.
  def to_s
    buf = ""
    read(@content_length, buf)
    return buf if buf.size == @content_length

    raise MogileFS::SizeMismatchError,
          "read=#{buf.size} bytes, expected=#@content_length from #@uri", []
  end

  # Copies the response body to +dest+ via MogileFS.io.copy_stream and
  # returns that call's result (compared against +content_length+, so
  # presumably the number of bytes copied -- confirm against MogileFS.io).
  #
  # Raises MogileFS::SizeMismatchError (with an empty backtrace) if the
  # result differs from +content_length+.
  def stream_to(dest)
    rv = MogileFS.io.copy_stream(self, dest)
    return rv if rv == @content_length

    raise MogileFS::SizeMismatchError,
          "read=#{rv} bytes, expected=#@content_length from #@uri", []
  end

  # Tries each entry of +paths+ in order and returns the first socket that
  # HTTPReader.try hands back.
  #
  # +timeout+ is passed through to HTTPReader.try for every path.
  # +range+, when given, is indexed as range[0] and range[1] to build a
  # "Range: bytes=first-last" request header.
  #
  # Raises MogileFS::Error (with an empty backtrace) listing each per-path
  # failure when no path yields a socket. This includes the case where
  # +paths+ is empty.
  def self.first(paths, timeout, range = nil)
    # Start with an Array (not nil) so the final join is safe even when the
    # loop body never reaches the rescue clause, e.g. for empty +paths+.
    errors = []
    range = "Range: bytes=#{range[0]}-#{range[1]}\r\n" if range
    paths.each do |path|
      begin
        sock = try(path, timeout, range)
        return sock if sock
      rescue => e
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end
    raise MogileFS::Error,
          "all paths failed with GET: #{errors.join(', ')}", []
  end

  # given a path, this returns a readable socket with ready data from the
  # body of the response.
  #
  # +range+ is either nil or a complete "Range: ...\r\n" header line; it is
  # interpolated verbatim into the request. A 206 status is required when
  # +range+ is set and a 200 status otherwise; in both cases the response
  # must carry a Content-Length header.
  #
  # Raises MogileFS::Timeout when the header has not fully arrived by the
  # deadline computed from +timeout+, and MogileFS::InvalidResponseError on
  # EOF before the end of the header or on an unexpected status line or
  # missing Content-Length. The socket is closed before any error
  # propagates to the caller.
  def self.try(path, timeout, range) # :nodoc:
    uri = URI.parse(path)
    expire_at = MogileFS.now + timeout
    sock = tcp(uri.host, uri.port, timeout)
    buf = "GET #{uri.request_uri} HTTP/1.0\r\n#{range}\r\n" # no chunking
    sock.timed_write(buf, timeout)

    # Keep peeking (reusing +buf+ as the destination) until the blank line
    # that terminates the header block shows up.
    begin
      raise MogileFS::Timeout if MogileFS.now > expire_at
      sock.timed_peek(2048, buf, timeout) or
        raise MogileFS::InvalidResponseError, "EOF while reading header", []
    end until /\r\n\r\n/ =~ buf

    head, _ = buf.split(/\r\n\r\n/, 2)

    status_ok = if range
                  head =~ %r{\AHTTP/\d+\.\d+\s+206\s*}
                else
                  head =~ %r{\AHTTP/\d+\.\d+\s+200\s*}
                end

    # The Content-Length match must be evaluated last so that $1 refers to
    # its capture group.
    if status_ok && head =~ %r{^Content-Length:\s*(\d+)}i
      sock.content_length = $1.to_i
      sock.uri = uri
      # Read the header plus its 4-byte "\r\n\r\n" terminator so the next
      # read starts at the body (presumably timed_peek left these bytes
      # unconsumed -- confirm against MogileFS::Socket).
      sock.timed_read(head.bytesize + 4, buf, 0)
      return sock
    end

    msg = range ? "Expected 206 w/#{range.strip}: " : "header="
    msg << head.inspect
    raise MogileFS::InvalidResponseError, msg, []
  rescue
    sock.close if sock
    raise
  end
end
|