mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob aa0bcf2434245fb0a4ebb49b6ebd7ba1f95ac4e4 5622 bytes (raw)
$ git show HEAD:lib/mogilefs/http_file.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
 
# -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code
require 'stringio'
require 'mogilefs/new_file'

##
# HTTPFile wraps up the new file operations for storing files onto an HTTP
# storage node.
#
# You really don't want to create an HTTPFile by hand.  Instead you want to
# create a new file using MogileFS::MogileFS.new_file.
#
class MogileFS::HTTPFile < StringIO
  include MogileFS::NewFile::Common

  ##
  # The big_io name in case we have file > 256M

  attr_accessor :big_io

  attr_accessor :streaming_io

  ##
  # Creates a new HTTPFile with MogileFS-specific data.  Use
  # MogileFS::MogileFS#new_file instead of this method.

  def initialize(dests, opts = nil)
    super ""
    @md5 = @streaming_io = @big_io = @active = nil
    @dests = dests
    @opts = Integer === opts ? { :content_length => opts } : opts
  end

  def request_put(sock, uri, file_size, input = nil)
    host_with_port = "#{uri.host}:#{uri.port}"
    clen = @opts[:content_length]
    file_size ||= clen

    content_md5 = @opts[:content_md5]
    if String === content_md5
      file_size or
        raise ArgumentError,
              ":content_length must be specified with :content_md5 String"
      file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
    elsif content_md5.respond_to?(:call) ||
          :trailer == content_md5 ||
          MD5_TRAILER_NODES[host_with_port]
      file_size = nil
      @md5 = Digest::MD5.new
    end

    if file_size
      sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                 "Content-Length: #{file_size}\r\n\r\n")
      rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
    else
      trailers = @md5 ? "Trailer: Content-MD5\r\n".freeze : "".freeze
      sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                 "Host: #{host_with_port}\r\n#{trailers}" \
                 "Transfer-Encoding: chunked\r\n\r\n")
      tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
      rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
      tmp.flush
    end

    if clen && clen != rv
      raise MogileFS::SizeMismatchError,
            ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
    end
    rv
  end

  def put_streaming_io(sock, uri) # unlikely to be used
    file_size = @streaming_io.length
    written = 0
    request_put(sock, uri, file_size) do |wr|
      @streaming_io.call(Proc.new do |data_to_write|
        written += wr.write(data_to_write)
      end)
    end
    file_size ? file_size : written
  end

  def rewind_or_raise!(uri, err)
    @active.rewind if @active
  rescue => e
    msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
          "retrying is impossible as rewind on " \
          "#{@active.inspect} failed with: #{e.message} (#{e.class})"
    raise NonRetryableError, msg, e.backtrace
  end

  ##
  # Writes an HTTP PUT request to +sock+ to upload the file and
  # returns file size if the socket finished writing
  def upload(devid, uri) # :nodoc:
    sock = MogileFS::Socket.tcp(uri.host, uri.port)
    file_size = length

    if @streaming_io
      file_size = put_streaming_io(sock, uri)
    elsif @big_io
      stat = file = size = nil
      if @big_io.respond_to?(:stat)
        stat = @big_io.stat
      elsif String === @big_io || @big_io.respond_to?(:to_path)
        begin
          file = File.open(@big_io)
        rescue => e
          msg = "Failed to open input (#{@big_io.inspect}): " \
                "#{e.message} (#{e.class})"
          raise NonRetryableError, msg, e.backtrace
        end
        stat = file.stat
      elsif @big_io.respond_to?(:size)
        size = @big_io.size
      end
      if stat && stat.file?
        size ||= stat.size
        file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
      end
      file_size = request_put(sock, uri, size, file || @big_io)
    else
      rewind
      request_put(sock, uri, file_size, self)
    end

    read_response(sock) # raises on errors
    file_size
  rescue SystemCallError, RetryableError => err
    rewind_or_raise!(uri, err)
    raise
  ensure
    file.close if file && @big_io != file
    sock.close if sock
  end

  def nhp_put(devid, uri)
    clen = @opts[:content_length]
    if clen && clen != size
      raise MogileFS::SizeMismatchError,
        ":content_length expected: #{clen.inspect}, actual: #{size}"
    end

    put = Net::HTTP::Put.new(uri.path)
    put["Content-Type".freeze] = "application/octet-stream".freeze
    if md5 = @opts[:content_md5]
      if md5.respond_to?(:call)
        md5 = md5.call.strip
      elsif md5 == :trailer
        md5 = [ Digest::MD5.digest(string) ].pack("m".freeze).chomp!
      end
      put["Content-MD5".freeze] = md5
    end
    put.body = string
    res = @opts[:nhp_put].request(uri, put)
    return size if Net::HTTPSuccess === res
    raise BadResponseError, "#{res.code} #{res.message}"
  rescue => e
    /\ANet::/ =~ "#{e.class}" and
        raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
    raise
  end

  def commit
    errors = nil
    @dests.each do |devid, path|
      begin
        uri = URI.parse(path)
        bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
        return create_close(devid, uri, bytes_uploaded)
      rescue Timeout::Error, SystemCallError, RetryableError => e
        errors ||= []
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end

    raise NoStorageNodesError,
          "all paths failed with PUT: #{errors.join(', ')}", []
  end

  def close
    commit
    super
  end
end

git clone https://yhbt.net/mogilefs-client.git