1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
| | # -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code
require 'socket'
require 'uri'
require 'digest/md5'
require 'mogilefs/chunker'
module MogileFS::NewFile::Common
# :stopdoc:
RetryableError = Class.new(MogileFS::Error)
EmptyResponseError = Class.new(RetryableError)
BadResponseError = Class.new(RetryableError)
UnparseableResponseError = Class.new(RetryableError)
class NoStorageNodesError < MogileFS::Error
def message; 'Unable to open socket to storage node'; end
end
NonRetryableError = Class.new(MogileFS::Error)
MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
def read_response(sock)
tout = @opts[:new_file_max_time] || 3600.0
start_time = @opts[:start_time] and tout -= MogileFS.now - start_time
set_socket_options(sock)
case line = sock.timed_read(23, "", tout > 0.0 ? tout : 0)
when %r{^HTTP/\d\.\d\s+(?:2\d\d)\s} # success!
when nil
raise EmptyResponseError, 'Unable to read response line from server'
when %r{^HTTP/\d\.\d\s+(\d+)}
raise BadResponseError, "HTTP response status from upload: #$1"
else
raise UnparseableResponseError,
"Response line not understood: #{line.inspect}"
end
end
def create_close(devid, uri, bytes_uploaded)
close_args = @opts[:create_close_args]
dest_info = @opts[:info] ||= {}
dest_info["fid"] = @opts[:fid].to_i
dest_info["key"] = @opts[:key]
dest_info["domain"] = @opts[:domain]
dest_info[:devid] = devid
dest_info[:path] = uri.to_s
dest_info[:size] = bytes_uploaded
if @md5
dest_info["checksum"] = "MD5:#{@md5.hexdigest}"
elsif String === @opts[:content_md5]
hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
dest_info["checksum"] = "MD5:#{hex}"
end
dest_info[:checksumverify] = 1 if @opts[:checksumverify]
backend = @opts[:backend]
# upload could've taken a long time, ping and try to ensure socket
# is valid to minimize (but not completely eliminate) the chance
# create_close hits a stale socket (while reading the response after
# writing to it) and becomes non-retryable. We treat create_close
# specially as its less idempotent than any other command
# (even other non-idempotent ones). There may be no hope of retrying
# the upload at all if data was streamed and calling create_close
# twice will hurt us...
backend.noop
backend.create_close(close_args ? close_args.merge(dest_info) : dest_info)
# make this look like file_info + get_uris
dest_info.delete(:checksumverify)
dest_info.delete(:path)
dest_info[:uris] = [ uri ]
dest_info["devcount"] = 1
dest_info["devids"] = [ dest_info.delete(:devid).to_i ]
dest_info["length"] = dest_info.delete(:size)
bytes_uploaded
end
# aggressive keepalive settings on Linux + Ruby 1.9.2+
TCP_KEEPALIVE = {
:TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent
:TCP_KEEPINTVL => 5,
:TCP_KEEPCNT => 2, # number of retries
}
req_consts = TCP_KEEPALIVE.keys
if (Socket.constants & req_consts).size == req_consts.size
def set_socket_options(sock)
sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
TCP_KEEPALIVE.each do |k,v|
sock.setsockopt(:IPPROTO_TCP, k, v)
end
end
else
def set_socket_options(sock)
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
end
end
# :startdoc:
end
|