mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob 8dd4725579cb56ebe757f6272f308a0363fe04a1 2488 bytes (raw)
$ git show pu:lib/mogilefs/new_file/content_range.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
 
# -*- encoding: binary -*-
# here are internal implementation details, do not rely on them in your code
require 'net/http'
require 'mogilefs/new_file/writer'

# an IO-like object
class MogileFS::NewFile::ContentRange
  include MogileFS::NewFile::Writer
  include MogileFS::NewFile::Common
  attr_reader :md5

  # :stopdoc:
  begin
    require 'net/http/persistent'
    NHP = Net::HTTP::Persistent.new('mogilefs')

    def hit(uri, req)
      NHP.request(uri, req).value
    end
  rescue LoadError
    def hit(uri, req)
      Net::HTTP.start(uri.host, uri.port) { |h| h.request(req).value }
    end
  end
  # :startdoc:

  def initialize(dests, opts) # :nodoc:
    @dests = dests
    @opts = opts
    @devid = @uri = @md5 = nil
    @bytes_uploaded = 0
    @errors = []
  end

  def get_dest # :nodoc:
    return [ @devid, @uri ] if @uri
    rv = @dests.shift or no_nodes!
    rv[1] = URI.parse(rv[1])
    rv
  end

  def no_nodes! # :nodoc:
    raise NoStorageNodesError,
          "all paths failed with PUT: #{@errors.join(', ')}", []
  end

  def request_for(uri, buf) # :nodoc:
    put = Net::HTTP::Put.new(uri.path)
    put["Content-Type"] = "application/octet-stream"
    put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp!
    if @bytes_uploaded > 0
      last_byte = @bytes_uploaded + buf.bytesize - 1
      put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*"
    end
    put.body = buf

    put
  end

  # see IO#write
  def write(buf)
    buf = String buf
    len = buf.bytesize
    return 0 if 0 == len

    devid, uri = get_dest
    put = request_for(uri, buf)
    begin
      hit(uri, put) # raises on error
    rescue => e
      raise if @bytes_uploaded > 0

      # nothing uploaded, try another dest
      @errors << "#{uri.to_s} - #{e.message} (#{e.class})"
      devid, uri = get_dest
      put = request_for(uri, buf)
      retry
    end

    @uri, @devid = uri, devid if 0 == @bytes_uploaded
    @bytes_uploaded += len
    len
  end

  # called on close, do not use
  def commit # :nodoc:
    zero_byte_special if @bytes_uploaded == 0

    create_close(@devid, @uri, @bytes_uploaded)
  end

  # special case for zero-byte files :<
  def zero_byte_special # :nodoc:
    @devid, @uri = get_dest
    put = request_for(@uri, "")
    begin
      hit(@uri, put) # raises on error
    rescue => e
      @errors << "#{@uri.to_s} - #{e.message} (#{e.class})"
      @devid, @uri = get_dest
      put = request_for(@uri, "")
      retry
    end
  end
end

git clone https://yhbt.net/mogilefs-client.git