1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- encoding: binary -*-
# These are internal implementation details; do not use them in your code.
require 'stringio'
require 'uri'
require 'mogilefs/chunker'
##
# HTTPFile wraps up the new file operations for storing files onto an HTTP
# storage node.
#
# You really don't want to create an HTTPFile by hand. Instead you want to
# create a new file using MogileFS::MogileFS.new_file.
#
class MogileFS::HTTPFile < StringIO
# Raised by #upload when no response line could be read from the storage node.
class EmptyResponseError < MogileFS::Error
end

# Raised by #upload when the storage node answers with a non-2xx HTTP status.
class BadResponseError < MogileFS::Error
end

# Raised by #upload when the response is not a recognizable HTTP status line.
class UnparseableResponseError < MogileFS::Error
end

# Raised by #commit once every destination path has failed.
#
# NOTE: #message is overridden, so it always returns the fixed text below,
# whatever message was handed to +raise+.
class NoStorageNodesError < MogileFS::Error
  def message
    'Unable to open socket to storage node'
  end
end

# Raised by #rewind_or_raise! when the active input cannot be rewound;
# #commit re-raises it immediately instead of trying further destinations.
class NonRetryableError < MogileFS::Error
end
##
# The URI this file will be stored to.
attr_reader :uri
attr_reader :devid
##
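# The source to upload from when the file is too large to buffer in memory
# (e.g. > 256M): a path String, an object responding to +to_path+, or an
# IO-like object that is streamed to the storage node.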
attr_accessor :big_io
attr_accessor :streaming_io
##
# Builds an HTTPFile carrying MogileFS-specific data.  Use
# MogileFS::MogileFS#new_file instead of calling this directly.
#
# +dests+ is stored untouched; #commit later iterates it as (devid, path)
# pairs.  +content_length+ is accepted but not used by this method.
#
# The underlying StringIO starts out with an empty buffer.
def initialize(dests, content_length)
  super("")
  # Upload state: nothing is active and no destination has been chosen yet.
  @active = nil
  @devid = nil
  @uri = nil
  @big_io = nil
  @streaming_io = nil
  @dests = dests
end
# Writes the head of an HTTP PUT request for +uri+ to +sock+ and then sends
# the request body.
#
# With a +file_size+, an HTTP/1.0 request carrying Content-Length is written
# and the body goes straight to +sock+.  Without one, an HTTP/1.1 request
# using chunked transfer encoding is written and the body goes through a
# MogileFS::Chunker wrapped around +sock+, which is flushed afterwards.
#
# The body is taken from +input+ (copied with MogileFS::X.copy_stream and
# remembered in @active) or, when +input+ is not given, from the block,
# which is yielded the object it should write to.
#
# Returns whatever the copy or the block returned.
def request_put(sock, uri, file_size, input = nil)
  if file_size
    head = "PUT #{uri.request_uri} HTTP/1.0\r\n" \
           "Content-Length: #{file_size}\r\n\r\n"
    sock.write(head)
    return yield(sock) unless input

    @active = input
    return MogileFS::X.copy_stream(input, sock)
  end

  # Size unknown: fall back to chunked transfer encoding.
  head = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
         "Host: #{uri.host}:#{uri.port}\r\n" \
         "Transfer-Encoding: chunked\r\n\r\n"
  sock.write(head)
  chunker = MogileFS::Chunker.new(sock)
  result =
    if input
      @active = input
      MogileFS::X.copy_stream(input, chunker)
    else
      yield(chunker)
    end
  chunker.flush
  result
end
# Uploads a body produced by @streaming_io through #request_put.
#
# @streaming_io must respond to +length+ (the declared size, possibly nil)
# and to +call+; it is called with a proc that writes each piece of data it
# is given and keeps a running total of what the writes returned.
#
# Returns the declared length when there is one, otherwise the total
# accumulated from the writes.
def put_streaming_io(sock, uri) # unlikely to be used
  declared = @streaming_io.length
  total = 0
  request_put(sock, uri, declared) do |dest|
    writer = Proc.new { |chunk| total += dest.write(chunk) }
    @streaming_io.call(writer)
  end
  declared || total
end
# Rewinds the input recorded in @active (if any) so an upload can be retried.
#
# +uri+ and +err+ describe the upload failure that triggered the rewind and
# are only used to build the error message.
#
# Raises NonRetryableError, carrying the rewind failure's backtrace, when
# the rewind itself fails.
def rewind_or_raise!(uri, err)
  return unless @active

  begin
    @active.rewind
  rescue => rewind_err
    raise NonRetryableError,
          "#{uri} failed with #{err.message} (#{err.class}) and retrying " \
          "is impossible as rewind on #{@active.inspect} failed with: " \
          "#{rewind_err.message} (#{rewind_err.class})",
          rewind_err.backtrace
  end
end
##
# Opens a socket to the storage node at +uri+, sends the file as an HTTP PUT
# and checks the status line of the response.
#
# The body comes from @streaming_io if set, else from @big_io (a path or an
# IO-like object) if set, else from this object's own buffer.  +devid+ is
# not used by this method.
#
# Returns the file size when the node answers with a 2xx status.
#
# Raises EmptyResponseError, BadResponseError or UnparseableResponseError
# depending on the response.  On any StandardError the active input is
# rewound via #rewind_or_raise! before the error is re-raised, and the
# socket is closed in every case.
def upload(devid, uri) # :nodoc:
  sock = MogileFS::Socket.tcp(uri.host, uri.port)
  file_size = length

  if @streaming_io
    file_size = put_streaming_io(sock, uri)
  elsif !@big_io
    # Small file: the whole buffer goes out in a single write.
    sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
               "Content-Length: #{file_size}\r\n\r\n#{string}")
  elsif String === @big_io || @big_io.respond_to?(:to_path)
    file_size = File.open(@big_io) do |rd|
      st = rd.stat
      # Only regular files have a size usable for Content-Length.
      request_put(sock, uri, st.file? ? st.size : nil, rd)
    end
  else
    known_size =
      if @big_io.respond_to?(:stat)
        st = @big_io.stat
        st.file? ? st.size : nil
      elsif @big_io.respond_to?(:size)
        @big_io.size
      end
    file_size = request_put(sock, uri, known_size, @big_io)
  end

  # NOTE(review): presumably reads up to 23 bytes of the status line;
  # confirm against MogileFS::Socket#timed_read.
  status_line = sock.timed_read(23, "")
  case status_line
  when nil
    raise EmptyResponseError, 'Unable to read response line from server'
  when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
    file_size
  when %r{^HTTP/\d\.\d\s+(\d+)}
    raise BadResponseError,
          "HTTP response status from upload: #{Regexp.last_match(1)}"
  else
    raise UnparseableResponseError,
          "Response line not understood: #{status_line}"
  end
rescue => err
  rewind_or_raise!(uri, err)
  raise
ensure
  sock.close if sock
end
##
# Uploads the file to the first destination in @dests that accepts it.
#
# Each entry of @dests is a (devid, path) pair; +path+ is parsed as a URI
# and handed to #upload.  On the first success @devid and @uri are set to
# that destination and the value returned by #upload is returned.
#
# A NonRetryableError is re-raised immediately, without trying the
# remaining destinations.  Any other StandardError is recorded and the next
# destination is tried.
#
# Raises NoStorageNodesError (with an empty backtrace) when every
# destination failed, or when @dests is empty.
def commit
  # Start with an empty list so an empty @dests still ends in
  # NoStorageNodesError rather than a NoMethodError on nil.
  errors = []
  @dests.each do |devid, path|
    begin
      uri = URI.parse(path)
      bytes_uploaded = upload(devid, uri)
      @devid, @uri = devid, uri
      return bytes_uploaded
    rescue NonRetryableError
      raise
    rescue => e
      errors << "#{path} - #{e.message} (#{e.class})"
    end
  end

  detail = errors.empty? ? 'no destination paths given' : errors.join(', ')
  raise NoStorageNodesError, "all paths failed with PUT: #{detail}", []
end
end
|