1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
| | # -*- encoding: binary -*-
# :enddoc:
class Rainbows::EventMachine::Client < EM::Connection
attr_writer :body
include Rainbows::EvCore
def initialize(io)
@_io = io
@body = nil
end
alias write send_data
def receive_data(data)
# To avoid clobbering the current streaming response
# (often a static file), we do not attempt to process another
# request on the same connection until the first is complete
if @body
if data
@buf << data
@_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
end
EM.next_tick { receive_data(nil) } unless @buf.empty?
else
on_read(data || "") if (@buf.size > 0) || data
end
end
def quit
super
close_connection_after_writing
end
def app_call
set_comm_inactivity_timeout 0
@env[RACK_INPUT] = @input
@env[REMOTE_ADDR] = @_io.kgio_addr
@env[ASYNC_CALLBACK] = method(:em_write_response)
@env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
# too tricky to support pipelining with :async since the
# second (pipelined) request could be a stuck behind a
# long-running async response
(response.nil? || -1 == response[0]) and return @state = :close
if @hp.next? && G.alive && G.kato > 0
@state = :headers
em_write_response(response, true)
if @buf.empty?
set_comm_inactivity_timeout(G.kato)
elsif @body.nil?
EM.next_tick { receive_data(nil) }
end
else
em_write_response(response, false)
end
end
def em_write_response(response, alive = false)
status, headers, body = response
if @hp.headers?
headers = HH.new(headers)
headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
else
headers = nil
end
if body.respond_to?(:errback) && body.respond_to?(:callback)
@body = body
body.callback { quit }
body.errback { quit }
# async response, this could be a trickle as is in comet-style apps
headers[CONNECTION] = CLOSE if headers
alive = true
elsif body.respond_to?(:to_path)
st = File.stat(path = body.to_path)
if st.file?
write(response_header(status, headers)) if headers
@body = stream_file_data(path)
@body.errback do
body.close if body.respond_to?(:close)
quit
end
@body.callback do
body.close if body.respond_to?(:close)
@body = nil
alive ? receive_data(nil) : quit
end
return
elsif st.socket? || st.pipe?
@body = io = body_to_io(body)
chunk = stream_response_headers(status, headers) if headers
m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
Rainbows::EventMachine::ResponsePipe
return EM.watch(io, m, self, alive, body).notify_readable = true
end
# char or block device... WTF? fall through to body.each
end
write(response_header(status, headers)) if headers
write_body_each(self, body)
quit unless alive
end
def next!
@hp.keepalive? ? receive_data(@body = nil) : quit
end
def unbind
async_close = @env[ASYNC_CLOSE] and async_close.succeed
@body.respond_to?(:fail) and @body.fail
begin
@_io.close
rescue Errno::EBADF
# EventMachine's EventableDescriptor::Close() may close
# the underlying file descriptor without invalidating the
# associated IO object on errors, so @_io.closed? isn't
# sufficient.
end
end
end
|