mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob c338e70cebc625926a7bbb12c32a5635634b41fb 2524 bytes (raw)
$ git show pipeline:lib/mogilefs/http_reader.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
 
# -*- encoding: binary -*-
# internal implementation details here, do not rely on them in your code

# This class is needed because Net::HTTP streaming is still inefficient
# for reading huge response bodies over fast LANs.
class MogileFS::HTTPReader < MogileFS::Socket
  attr_accessor :content_length, :uri

  # backwards compat, if anybody cares
  alias mogilefs_size content_length # :nodoc:

  # this may OOM your system on large files
  def to_s
    buf = ""
    read(@content_length, buf)
    return buf if buf.size == @content_length

    raise MogileFS::SizeMismatchError,
          "read=#{buf.size} bytes, expected=#@content_length from #@uri", []
  end

  def stream_to(dest)
    rv = MogileFS::X.copy_stream(self, dest)
    return rv if rv == @content_length
    raise MogileFS::SizeMismatchError,
          "read=#{rv} bytes, expected=#@content_length from #@uri", []
  end

  def self.first(paths, timeout, count = nil, offset = nil)
    errors = nil
    if offset || count
      offset ||= 0
      range_end = count ? offset + count - 1 : ""
      range = "Range: bytes=#{offset}-#{range_end}\r\n"
    end

    paths.each do |path|
      begin
        sock = try(path, timeout, range) and return sock
      rescue => e
        errors ||= []
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end
    raise MogileFS::Error,
          "all paths failed with GET: #{errors.join(', ')}", []
  end

  # given a path, this returns a readable socket with ready data from the
  # body of the response.
  def self.try(path, timeout, range) # :nodoc:
    uri = URI.parse(path)
    sock = tcp(uri.host, uri.port, timeout)
    buf = "GET #{uri.request_uri} HTTP/1.0\r\n#{range}\r\n" # no chunking
    sock.timed_write(buf, timeout)

    sock.timed_peek(2048, buf, timeout) or
      raise MogileFS::InvalidResponseError, "EOF while reading header", []

    head, _ = buf.split(/\r\n\r\n/, 2)

    # we're dealing with a seriously slow/stupid HTTP server if we can't
    # get the header in a single recv(2) syscall.
    if ((range && head =~ %r{\AHTTP/\d+\.\d+\s+206\s*}) ||
        (!range && head =~ %r{\AHTTP/\d+\.\d+\s+200\s*})) &&
       head =~ %r{^Content-Length:\s*(\d+)}i
      sock.content_length = $1.to_i
      sock.uri = uri
      sock.timed_read(head.bytesize + 4, buf, 0)
      return sock
    end
    msg = range ? "Expected 206 w/#{range.strip}: " : "header="
    msg << head.inspect
    raise MogileFS::InvalidResponseError, msg, []
  rescue
    sock.close if sock
    raise
  end
end

git clone https://yhbt.net/mogilefs-client.git