mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob 0fd03ad0f554bb885cd3cdac508f675dd5225d82 4377 bytes (raw)
$ git show pipeline:lib/mogilefs/http_file.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
 
# -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code
require 'stringio'
require 'uri'
require 'mogilefs/chunker'

##
# HTTPFile wraps up the new file operations for storing files onto an HTTP
# storage node.
#
# You really don't want to create an HTTPFile by hand.  Instead you want to
# create a new file using MogileFS::MogileFS.new_file.
#
class MogileFS::HTTPFile < StringIO
  class EmptyResponseError < MogileFS::Error; end
  class BadResponseError < MogileFS::Error; end
  class UnparseableResponseError < MogileFS::Error; end
  class NoStorageNodesError < MogileFS::Error
    def message; 'Unable to open socket to storage node'; end
  end
  class NonRetryableError < MogileFS::Error; end

  ##
  # The URI this file will be stored to.

  attr_reader :uri

  attr_reader :devid

  ##
  # The big_io name in case we have file > 256M

  attr_accessor :big_io

  attr_accessor :streaming_io

  ##
  # Creates a new HTTPFile with MogileFS-specific data.  Use
  # MogileFS::MogileFS#new_file instead of this method.

  def initialize(dests, content_length)
    super ""
    @streaming_io = @big_io = @uri = @devid = @active = nil
    @dests = dests
  end

  def request_put(sock, uri, file_size, input = nil)
    if file_size
      sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                 "Content-Length: #{file_size}\r\n\r\n")
      input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock)
    else
      sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                 "Host: #{uri.host}:#{uri.port}\r\n" \
                 "Transfer-Encoding: chunked\r\n\r\n")
      tmp = MogileFS::Chunker.new(sock)
      rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp)
      tmp.flush
      rv
    end
  end

  def put_streaming_io(sock, uri) # unlikely to be used
    file_size = @streaming_io.length
    written = 0
    request_put(sock, uri, file_size) do |wr|
      @streaming_io.call(Proc.new do |data_to_write|
        written += wr.write(data_to_write)
      end)
    end
    file_size ? file_size : written
  end

  def rewind_or_raise!(uri, err)
    @active.rewind if @active
    rescue => e
      msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
            "retrying is impossible as rewind on " \
            "#{@active.inspect} failed with: #{e.message} (#{e.class})"
      raise NonRetryableError, msg, e.backtrace
  end

  ##
  # Writes an HTTP PUT request to +sock+ to upload the file and
  # returns file size if the socket finished writing
  def upload(devid, uri) # :nodoc:
    sock = MogileFS::Socket.tcp(uri.host, uri.port)
    file_size = length

    if @streaming_io
      file_size = put_streaming_io(sock, uri)
    elsif @big_io
      if String === @big_io || @big_io.respond_to?(:to_path)
        File.open(@big_io) do |rd|
          stat = rd.stat
          file_size = request_put(sock, uri, stat.file? ? stat.size : nil, rd)
        end
      else
        size = nil
        if @big_io.respond_to?(:stat)
          stat = @big_io.stat
          size = stat.size if stat.file?
        elsif @big_io.respond_to?(:size)
          size = @big_io.size
        end
        file_size = request_put(sock, uri, size, @big_io)
      end
    else
      sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                 "Content-Length: #{file_size}\r\n\r\n#{string}")
    end

    case line = sock.timed_read(23, "")
    when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
      file_size
    when nil
      raise EmptyResponseError, 'Unable to read response line from server'
    when %r{^HTTP/\d\.\d\s+(\d+)}
      raise BadResponseError, "HTTP response status from upload: #$1"
    else
      raise UnparseableResponseError, "Response line not understood: #{line}"
    end
    rescue => err
      rewind_or_raise!(uri, err)
      raise
    ensure
      sock.close if sock
  end

  def commit
    errors = nil
    @dests.each do |devid, path|
      begin
        uri = URI.parse(path)
        bytes_uploaded = upload(devid, uri)
        @devid, @uri = devid, uri
        return bytes_uploaded
      rescue NonRetryableError
        raise
      rescue => e
        errors ||= []
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end

    raise NoStorageNodesError,
          "all paths failed with PUT: #{errors.join(', ')}", []
  end
end

git clone https://yhbt.net/mogilefs-client.git