mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob 8784f686d5414edd9a0df1d4f82bb2ee795cdd09 4539 bytes (raw)
$ git show pu:lib/mogilefs/http_file.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
 
# -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code
require 'stringio'
require 'mogilefs/new_file'

##
# HTTPFile wraps up the new file operations for storing files onto an HTTP
# storage node.
#
# You really don't want to create an HTTPFile by hand.  Instead you want to
# create a new file using MogileFS::MogileFS.new_file.
#
# The object is a StringIO, so data written to it is buffered in memory.
# The actual HTTP PUT happens in #commit, which #close calls.
#
class MogileFS::HTTPFile < StringIO
  include MogileFS::NewFile::Common

  ##
  # The big_io name in case we have file > 256M.  When set (and
  # +streaming_io+ is not), #upload sends this instead of the in-memory
  # buffer.  #upload handles an object responding to +stat+, a path
  # (a String or anything responding to +to_path+), or an object
  # responding to +size+.

  attr_accessor :big_io

  ##
  # Callback-style data source.  When set, #upload uses it in preference
  # to +big_io+ and the in-memory buffer.  It must respond to +length+
  # (a nil result is tolerated) and to +call+; +call+ is handed a Proc
  # which writes each piece of data passed to it.

  attr_accessor :streaming_io

  ##
  # Creates a new HTTPFile with MogileFS-specific data.  Use
  # MogileFS::MogileFS#new_file instead of this method.
  #
  # +dests+ is enumerated by #commit as [devid, path] pairs.
  # +opts+ is an options Hash, or an Integer as shorthand for
  # <tt>{ :content_length => opts }</tt>.  This class reads the
  # :content_length and :content_md5 keys.
  #
  # NOTE(review): +opts+ defaults to nil but #request_put indexes it, so
  # callers are presumably expected to always pass a Hash or Integer;
  # confirm against MogileFS::NewFile before changing the default.

  def initialize(dests, opts = nil)
    super ""
    @md5 = @streaming_io = @big_io = @active = nil
    @dests = dests
    @opts = Integer === opts ? { :content_length => opts } : opts
  end

  ##
  # Writes a PUT request for +uri+ to +sock+ and sends the body.
  #
  # The body is copied from +input+ with MogileFS.io.copy_stream, or,
  # when +input+ is nil, produced by the given block, which is yielded
  # the object to write to.
  #
  # If a size is known (+file_size+, falling back to the :content_length
  # option) an HTTP/1.0 request with a Content-Length header is sent.
  # Otherwise an HTTP/1.1 chunked request is sent through
  # MogileFS::Chunker.
  #
  # Returns the result of the copy (or of the block).
  #
  # Raises ArgumentError when :content_md5 is a String but no size is
  # known, and MogileFS::SizeMismatchError when :content_length was given
  # and differs from the returned value.

  def request_put(sock, uri, file_size, input = nil)
    host_with_port = "#{uri.host}:#{uri.port}"
    clen = @opts[:content_length]
    file_size ||= clen

    content_md5 = @opts[:content_md5]
    if String === content_md5
      unless file_size
        raise ArgumentError,
              ":content_length must be specified with :content_md5 String"
      end
      # the Content-MD5 header rides along in the value that is
      # interpolated into the Content-Length line below
      file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
    elsif content_md5.respond_to?(:call) ||
          :trailer == content_md5 ||
          MD5_TRAILER_NODES[host_with_port]
      # the digest is announced as a trailer, so force the chunked path
      file_size = nil
      @md5 = Digest::MD5.new
    end

    if file_size
      sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                 "Content-Length: #{file_size}\r\n\r\n")
      # @active remembers the input so rewind_or_raise! can rewind it
      rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
    else
      trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
      sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                 "Host: #{host_with_port}\r\n#{trailers}" \
                 "Transfer-Encoding: chunked\r\n\r\n")
      tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
      rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
      tmp.flush
    end

    if clen && clen != rv
      raise MogileFS::SizeMismatchError,
            ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
    end
    rv
  end

  ##
  # Uploads via +streaming_io+: the callback is handed a Proc that writes
  # each piece of data to the request body and tallies the bytes written.
  #
  # Returns <tt>@streaming_io.length</tt> when that is set, otherwise the
  # number of bytes tallied.

  def put_streaming_io(sock, uri) # unlikely to be used
    file_size = @streaming_io.length
    written = 0
    request_put(sock, uri, file_size) do |wr|
      @streaming_io.call(Proc.new do |data_to_write|
        written += wr.write(data_to_write)
      end)
    end
    file_size || written
  end

  ##
  # Rewinds the input last handed to #request_put (if any) so another
  # attempt can resend it.  +uri+ and +err+ (the failure that triggered
  # the retry) are only used to build the error message.
  #
  # Raises NonRetryableError if the rewind itself fails.

  def rewind_or_raise!(uri, err)
    @active.rewind if @active
  rescue => e
    msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
          "retrying is impossible as rewind on " \
          "#{@active.inspect} failed with: #{e.message} (#{e.class})"
    raise NonRetryableError, msg, e.backtrace
  end

  ##
  # Writes an HTTP PUT request to +sock+ to upload the file and
  # returns file size if the socket finished writing
  #
  # The body comes from +streaming_io+, else +big_io+, else the in-memory
  # buffer.  SystemCallError and RetryableError are re-raised after the
  # input has been rewound.  NonRetryableError is raised if a +big_io+
  # path cannot be opened or the rewind fails.
  def upload(devid, uri) # :nodoc:
    sock = MogileFS::Socket.tcp(uri.host, uri.port)
    set_socket_options(sock)
    file_size = length

    if @streaming_io
      file_size = put_streaming_io(sock, uri)
    elsif @big_io
      stat = file = size = nil
      if @big_io.respond_to?(:stat)
        stat = @big_io.stat
      elsif String === @big_io || @big_io.respond_to?(:to_path)
        begin
          file = File.open(@big_io)
        rescue => e
          # An unreadable local path is not a storage node failure.  Letting
          # the Errno (a SystemCallError) escape would make commit retry
          # every destination and then blame the PUT.
          msg = "Failed to open input (#{@big_io.inspect}): " \
                "#{e.message} (#{e.class})"
          raise NonRetryableError, msg, e.backtrace
        end
        stat = file.stat
      elsif @big_io.respond_to?(:size)
        size = @big_io.size
      end
      if stat && stat.file?
        size ||= stat.size
        file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
      end
      file_size = request_put(sock, uri, size, file || @big_io)
    else
      rewind
      request_put(sock, uri, file_size, self)
    end

    read_response(sock) # raises on errors
    file_size
  rescue SystemCallError, RetryableError => err
    rewind_or_raise!(uri, err)
    raise
  ensure
    # leave the caller's own object open; close only what differs from it
    file.close if file && @big_io != file
    sock.close if sock
  end

  ##
  # Tries each [devid, path] destination in turn and returns the result
  # of create_close for the first upload that succeeds.
  #
  # Raises NoStorageNodesError (with an empty backtrace) when no
  # destination succeeds, including when there are no destinations at all.

  def commit
    # start with an Array so the join below is safe when @dests is empty
    errors = []
    @dests.each do |devid, path|
      begin
        uri = URI.parse(path)
        bytes_uploaded = upload(devid, uri)
        return create_close(devid, uri, bytes_uploaded)
      rescue SystemCallError, RetryableError => e
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end

    raise NoStorageNodesError,
          "all paths failed with PUT: #{errors.join(', ')}", []
  end

  ##
  # Uploads the data via #commit, then closes the underlying StringIO.
  # If #commit raises, the StringIO is left open.

  def close
    commit
    super
  end
end

git clone https://yhbt.net/mogilefs-client.git