mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob 4396b8b7cd36ffaab68666acc19907ef100036c3 2375 bytes (raw)
$ git show HEAD:lib/mogilefs/new_file/stream.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
 
# -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code

require 'mogilefs/new_file/writer'

# Internal upload backend that writes a hand-built HTTP/1.1 PUT request
# to a storage node over a socket and streams the body to it.
class MogileFS::NewFile::Stream
  # The socket of the destination selected in #initialize.
  attr_reader :to_io

  # A Digest::MD5 object when a Content-MD5 trailer is in use (set by
  # #start_sock), otherwise nil.
  attr_reader :md5

  include MogileFS::NewFile::Writer
  include MogileFS::NewFile::Common

  # Tries every destination in +dests+ in order and keeps the first one
  # for which connecting and sending the request headers does not raise
  # a SystemCallError.
  #
  # dests:: enumerable of [devid, path] pairs; +path+ is parsed as a URI
  # opts::  options hash; this class reads :content_length and
  #         :content_md5 from it
  #
  # Raises NoStorageNodesError when no destination could be used.  The
  # message lists every failed path together with its error.
  def initialize(dests, opts)
    @opts = opts
    @md5 = nil
    @bytes_uploaded = 0

    # This has to live in method scope: a variable first assigned inside
    # the block below would be block-local, i.e. recreated on every
    # iteration and not visible to the final +raise+.
    errors = []

    dests.each do |devid, path|
      begin
        uri = URI.parse(path)
        sock = MogileFS::Socket.tcp(uri.host, uri.port)
        start_sock(sock, uri) # raise on errors
        @to_io = sock
        @uri = uri
        @devid = devid
        if !@md5 && @opts[:content_length]
          # a Content-Length header was sent, the body goes out unframed
          @writer = @to_io
        else
          # "Transfer-Encoding: chunked" was sent, wrap the socket
          @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
        end
        return
      rescue SystemCallError => e
        # sock is still nil here if the connect itself failed
        sock.close if sock && !sock.closed?
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end

    raise NoStorageNodesError,
          "all paths failed with PUT: #{errors.join(', ')}", []
  end

  # Coerces +buf+ with Kernel#String and passes it to the writer chosen
  # in #initialize.  Empty input is not forwarded.
  #
  # Returns the value returned by the writer's +write+ (0 for empty
  # input); that value is also added to the running upload total.
  def write(buf)
    buf = String buf
    return 0 if buf.empty?
    rv = @writer.write(buf)
    @bytes_uploaded += rv
    rv
  end

  # Flushes the writer, verifies the uploaded size, then calls
  # +read_response+ and +create_close+ (both defined outside this
  # class body).
  #
  # Raises MogileFS::SizeMismatchError when :content_length was given
  # and differs from the number of bytes counted by #write.
  #
  # The socket is closed on every exit path, including errors.
  def commit
    @writer.flush

    clen = @opts[:content_length]
    if clen && @bytes_uploaded != clen
      raise MogileFS::SizeMismatchError,
           "did not upload expected content_length: #{clen} uploaded: " \
           "#@bytes_uploaded"
    end
    read_response(@to_io) # raises on errors
    create_close(@devid, @uri, @bytes_uploaded)
  ensure
    @to_io.close if @to_io && !@to_io.closed?
  end

  # Writes the PUT request line and headers for +uri+ to +sock+.
  #
  # Header selection:
  # * a String :content_md5 is sent as a "Content-MD5" header
  # * a callable :content_md5, the symbol :trailer, or a truthy
  #   MD5_TRAILER_NODES entry for this host:port announces a
  #   "Content-MD5" trailer and sets @md5 to a new Digest::MD5
  # * "Content-Length" is only sent when no trailer digest is set and
  #   :content_length was given; otherwise chunked encoding is announced
  def start_sock(sock, uri)
    host_with_port = "#{uri.host}:#{uri.port}"
    headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
              "Host: #{host_with_port}\r\n"

    content_md5 = @opts[:content_md5]
    if String === content_md5
      headers << "Content-MD5: #{content_md5}\r\n"
    elsif content_md5.respond_to?(:call) ||
          :trailer == content_md5 ||
          MD5_TRAILER_NODES[host_with_port]
      @md5 = Digest::MD5.new
      headers << "Trailer: Content-MD5\r\n".freeze
    end

    if !@md5 && clen = @opts[:content_length]
      headers << "Content-Length: #{clen}\r\n"
    else
      headers << "Transfer-Encoding: chunked\r\n".freeze
    end

    sock.write(headers << "\r\n".freeze)
  end

  alias syswrite write
end

git clone https://yhbt.net/mogilefs-client.git