diff options
Diffstat (limited to 'lib/rainbows')
-rw-r--r-- | lib/rainbows/coolio/client.rb | 24 | ||||
-rw-r--r-- | lib/rainbows/coolio/thread_client.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/epoll/client.rb | 20 | ||||
-rw-r--r-- | lib/rainbows/ev_core.rb | 9 | ||||
-rw-r--r-- | lib/rainbows/event_machine/client.rb | 13 | ||||
-rw-r--r-- | lib/rainbows/process_client.rb | 10 | ||||
-rw-r--r-- | lib/rainbows/response.rb | 72 | ||||
-rw-r--r-- | lib/rainbows/revactor/client/methods.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/stream_response_epoll.rb | 49 | ||||
-rw-r--r-- | lib/rainbows/stream_response_epoll/client.rb | 14 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_pool/client.rb | 2 |
11 files changed, 160 insertions, 57 deletions
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index 8d48bbf..843f574 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -86,6 +86,12 @@ class Rainbows::Coolio::Client < Coolio::IO @deferred = true end + def hijacked + CONN.delete(self) + detach + nil + end + def write_response_path(status, headers, body, alive) io = body_to_io(body) st = io.stat @@ -93,7 +99,8 @@ class Rainbows::Coolio::Client < Coolio::IO if st.file? defer_file(status, headers, body, alive, io, st) elsif st.socket? || st.pipe? - chunk = stream_response_headers(status, headers, alive) + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk stream_response_body(body, io, chunk) else # char or block device... WTF? @@ -103,10 +110,11 @@ class Rainbows::Coolio::Client < Coolio::IO def ev_write_response(status, headers, body, alive) if body.respond_to?(:to_path) - write_response_path(status, headers, body, alive) + body = write_response_path(status, headers, body, alive) else - write_response(status, headers, body, alive) + body = write_response(status, headers, body, alive) end + return hijacked unless body return quit unless alive && :close != @state @state = :headers end @@ -117,9 +125,11 @@ class Rainbows::Coolio::Client < Coolio::IO @env[RACK_INPUT] = input @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:write_async_response) + @hp.hijack_setup(@env, @_io) status, headers, body = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + return hijacked if @hp.hijacked? (nil == status || -1 == status) ? @deferred = true : ev_write_response(status, headers, body, @hp.next?) @@ -186,12 +196,13 @@ class Rainbows::Coolio::Client < Coolio::IO def defer_file(status, headers, body, alive, io, st) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) + body = write_headers(status, headers, alive, body) or return hijacked range and defer_file_stream(range[0], range[1], io, body) else - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) end + body end def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object @@ -207,8 +218,9 @@ class Rainbows::Coolio::Client < Coolio::IO end else def defer_file(status, headers, body, alive, io, st) - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) + body end def stream_file_chunk(body) diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb index abc11d2..ee9fa04 100644 --- a/lib/rainbows/coolio/thread_client.rb +++ b/lib/rainbows/coolio/thread_client.rb @@ -14,6 +14,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client # this is only called in the master thread def response_write(response) + return hijacked if @hp.hijacked? ev_write_response(*response, @hp.next?) rescue => e handle_error(e) @@ -25,6 +26,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client def app_response begin @env[REMOTE_ADDR] = @_io.kgio_addr + @hp.hijack_setup(@env, @_io) APP.call(@env.merge!(RACK_DEFAULTS)) rescue => e Rainbows::Error.app(e) # we guarantee this does not raise diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb index d72696b..f6af6fa 100644 --- a/lib/rainbows/epoll/client.rb +++ b/lib/rainbows/epoll/client.rb @@ -52,6 +52,7 @@ module Rainbows::Epoll::Client when String on_read(rv) return if @wr_queue[0] || closed? + return hijacked if @hp.hijacked? when :wait_readable KATO[self] = @@last_expire if :headers == @state return EP.set(self, IN) @@ -67,7 +68,9 @@ module Rainbows::Epoll::Client def app_call input # called by on_read() @env[RACK_INPUT] = input @env[REMOTE_ADDR] = kgio_addr + @hp.hijack_setup(@env, self) status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS)) + return hijacked if @hp.hijacked? ev_write_response(status, headers, body, @hp.next?) end @@ -78,7 +81,8 @@ module Rainbows::Epoll::Client if st.file? defer_file(status, headers, body, alive, io, st) elsif st.socket? || st.pipe? - chunk = stream_response_headers(status, headers, alive) + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk stream_response_body(body, io, chunk) else # char or block device... WTF? @@ -102,10 +106,18 @@ module Rainbows::Epoll::Client else write_response(status, headers, body, alive) end + return hijacked if @hp.hijacked? # try to read more if we didn't have to buffer writes next_request if alive && 0 == @wr_queue.size end + def hijacked + KATO.delete(self) + Server.decr # no other place to do this + EP.delete(self) + nil + end + def next_request if 0 == @buf.size want_more @@ -113,6 +125,7 @@ module Rainbows::Epoll::Client # pipelined request (already in buffer) on_read(Z) return if @wr_queue[0] || closed? + return hijacked if @hp.hijacked? close if :close == @state end end @@ -197,13 +210,14 @@ module Rainbows::Epoll::Client true end + # Rack apps should not hijack here, but they may... def defer_file(status, headers, body, alive, io, st) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked range and defer_file_stream(range[0], range[1], io, body) else - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) end end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 46feaff..5c3c5b8 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -52,16 +52,17 @@ module Rainbows::EvCore end # returns whether to enable response chunking for autochunk models - def stream_response_headers(status, headers, alive) + # returns nil if request was hijacked in response stage + def stream_response_headers(status, headers, alive, body) headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers if headers.include?(Content_Length) - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return return false end case @env[HTTP_VERSION] when "HTTP/1.0" # disable HTTP/1.0 keepalive to stream - write_headers(status, headers, false) + write_headers(status, headers, false, body) or return @hp.clear false when nil # "HTTP/0.9" @@ -69,7 +70,7 @@ module Rainbows::EvCore else rv = !!(headers[Transfer_Encoding] =~ %r{\Achunked\z}i) rv = false unless @env["rainbows.autochunk"] - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return rv end end diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index 26f0dbd..9871c09 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -10,6 +10,7 @@ class Rainbows::EventMachine::Client < EM::Connection end alias write send_data + alias hijacked detach def receive_data(data) # To avoid clobbering the current streaming response @@ -37,9 +38,11 @@ class Rainbows::EventMachine::Client < EM::Connection @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + @hp.hijack_setup(@env, @_io) status, headers, body = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + return hijacked if @hp.hijacked? if (nil == status || -1 == status) @deferred = true @@ -67,8 +70,8 @@ class Rainbows::EventMachine::Client < EM::Connection def ev_write_response(status, headers, body, alive) @state = :headers if alive if body.respond_to?(:errback) && body.respond_to?(:callback) + write_headers(status, headers, alive, body) or return hijacked @deferred = body - write_headers(status, headers, alive) write_body_each(body) deferred_errback(body) deferred_callback(body, alive) @@ -77,21 +80,22 @@ class Rainbows::EventMachine::Client < EM::Connection st = File.stat(path = body.to_path) if st.file? - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked @deferred = stream_file_data(path) deferred_errback(body) deferred_callback(body, alive) return elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk io = body_to_io(@deferred = body) - chunk = stream_response_headers(status, headers, alive) m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - write_response(status, headers, body, alive) + write_response(status, headers, body, alive) or return hijacked if alive if @deferred.nil? if @buf.empty? @@ -112,6 +116,7 @@ class Rainbows::EventMachine::Client < EM::Connection end def unbind + return if @hp.hijacked? async_close = @env[ASYNC_CLOSE] and async_close.succeed @deferred.respond_to?(:fail) and @deferred.fail begin diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index b685001..f58770c 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -40,6 +40,7 @@ module Rainbows::ProcessClient set_input(env, hp) env[REMOTE_ADDR] = kgio_addr + hp.hijack_setup(env, to_io) status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i @@ -47,7 +48,8 @@ module Rainbows::ProcessClient env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - write_response(status, headers, body, alive = @hp.next?) + return if hp.hijacked? + write_response(status, headers, body, alive = hp.next?) or return end while alive # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up @@ -56,7 +58,7 @@ module Rainbows::ProcessClient rescue => e handle_error(e) ensure - close unless closed? + close unless closed? || hp.hijacked? end def handle_error(e) @@ -71,13 +73,15 @@ module Rainbows::ProcessClient begin set_input(env, hp) env[REMOTE_ADDR] = kgio_addr + hp.hijack_setup(env, to_io) status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - write_response(status, headers, body, alive = hp.next?) + return if hp.hijacked? + write_response(status, headers, body, alive = hp.next?) or return end while alive && pipeline_ready(hp) alive or close rescue => e diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index f8b0831..8a0daf8 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -19,23 +19,56 @@ module Rainbows::Response Rainbows::HttpParser.keepalive_requests = 0 end - def write_headers(status, headers, alive) - @hp.headers? or return + # Rack 1.5.0 (protocol version 1.2) adds response hijacking support + if ((Rack::VERSION[0] << 8) | Rack::VERSION[1]) >= 0x0102 + RACK_HIJACK = "rack.hijack" + + def hijack_prepare(value) + value + end + + def hijack_socket + @hp.env[RACK_HIJACK].call + end + else + def hijack_prepare(_) + end + end + + # returns the original body on success + # returns nil if the headers hijacked the response body + def write_headers(status, headers, alive, body) + @hp.headers? or return body + hijack = nil status = CODES[status.to_i] || status buf = "HTTP/1.1 #{status}\r\n" \ "Date: #{httpdate}\r\n" \ - "Status: #{status}\r\n" \ - "Connection: #{alive ? KeepAlive : Close}\r\n" + "Status: #{status}\r\n" headers.each do |key, value| - next if %r{\A(?:Date\z|Connection\z)}i =~ key - if value =~ /\n/ - # avoiding blank, key-only cookies with /\n+/ - buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + case key + when %r{\A(?:Date\z|Connection\z)}i + next + when "rack.hijack" + # this was an illegal key in Rack < 1.5, so it should be + # OK to silently discard it for those older versions + hijack = hijack_prepare(value) + alive = false # No persistent connections for hijacking else - buf << "#{key}: #{value}\r\n" + if /\n/ =~ value + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end end end - write(buf << CRLF) + write(buf << "Connection: #{alive ? KeepAlive : Close}\r\n\r\n") + + if hijack + body = nil # ensure caller does not close body + hijack.call(hijack_socket) + end + body end def close_if_private(io) @@ -70,8 +103,9 @@ module Rainbows::Response # generic response writer, used for most dynamically-generated responses # and also when copy_stream and/or IO#trysendfile is unavailable def write_response(status, headers, body, alive) - write_headers(status, headers, alive) - write_body_each(body) + body = write_headers(status, headers, alive, body) + write_body_each(body) if body + body ensure body.close if body.respond_to?(:close) end @@ -166,21 +200,23 @@ module Rainbows::Response if File.file?(body.to_path) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) - write_body_file(body, range) if range + body = write_headers(status, headers, alive, body) + write_body_file(body, range) if body && range else - write_headers(status, headers, alive) - write_body_file(body, nil) + body = write_headers(status, headers, alive, body) + write_body_file(body, nil) if body end else - write_headers(status, headers, alive) - write_body_stream(body) + body = write_headers(status, headers, alive, body) + write_body_stream(body) if body end + body ensure body.close if body.respond_to?(:close) end module ToPath + # returns nil if hijacked def write_response(status, headers, body, alive) if body.respond_to?(:to_path) write_response_path(status, headers, body, alive) diff --git a/lib/rainbows/revactor/client/methods.rb b/lib/rainbows/revactor/client/methods.rb index b2e1847..592c996 100644 --- a/lib/rainbows/revactor/client/methods.rb +++ b/lib/rainbows/revactor/client/methods.rb @@ -36,7 +36,7 @@ module Rainbows::Revactor::Client::Methods end def write_response(status, headers, body, alive) - super(status, headers, body, alive) + super(status, headers, body, alive) or return alive && @ts and @hp.buf << @ts.leftover end diff --git a/lib/rainbows/stream_response_epoll.rb b/lib/rainbows/stream_response_epoll.rb index 3bb3540..33d7386 100644 --- a/lib/rainbows/stream_response_epoll.rb +++ b/lib/rainbows/stream_response_epoll.rb @@ -26,18 +26,24 @@ module Rainbows::StreamResponseEpoll def http_response_write(socket, status, headers, body) status = CODES[status.to_i] || status - ep_client = false + hijack = ep_client = false if headers # don't set extra headers here, this is only intended for # consuming by nginx. buf = "HTTP/1.0 #{status}\r\nStatus: #{status}\r\n" headers.each do |key, value| - if value =~ /\n/ - # avoiding blank, key-only cookies with /\n+/ - buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + case key + when "rack.hijack" + hijack = hijack_prepare(value) + body = nil # ensure we do not close body else - buf << "#{key}: #{value}\r\n" + if /\n/ =~ value + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end end end buf << HEADER_END @@ -48,11 +54,22 @@ module Rainbows::StreamResponseEpoll buf = rv when :wait_writable ep_client = Client.new(socket, buf) - body.each { |chunk| ep_client.write(chunk) } - return ep_client.close + if hijack + ep_client.hijack(hijack) + else + body.each { |chunk| ep_client.write(chunk) } + ep_client.close + end + # body is nil on hijack, in which case ep_client is never closed by us + return end while true end + if hijack + hijack.call(socket) + return + end + body.each do |chunk| if ep_client ep_client.write(chunk) @@ -67,14 +84,15 @@ module Rainbows::StreamResponseEpoll end while true end end - ensure - body.respond_to?(:close) and body.close - if ep_client - ep_client.close - else - socket.shutdown - socket.close - end + ensure + return if hijack + body.respond_to?(:close) and body.close + if ep_client + ep_client.close + else + socket.shutdown + socket.close + end end # once a client is accepted, it is processed in its entirety here @@ -88,6 +106,7 @@ module Rainbows::StreamResponseEpoll status, headers, body = @app.call(env) end @request.headers? or headers = nil + return if @request.hijacked? http_response_write(client, status, headers, body) rescue => e handle_error(client, e) diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb index db303b0..dc226d6 100644 --- a/lib/rainbows/stream_response_epoll/client.rb +++ b/lib/rainbows/stream_response_epoll/client.rb @@ -18,7 +18,7 @@ class Rainbows::StreamResponseEpoll::Client attr_reader :to_io def initialize(io, unwritten) - @closed = false + @finish = false @to_io = io @wr_queue = [ unwritten.dup ] EP.set(self, OUT) @@ -29,7 +29,11 @@ class Rainbows::StreamResponseEpoll::Client end def close - @closed = true + @finish = true + end + + def hijack(hijack) + @finish = hijack end def epoll_run @@ -49,10 +53,14 @@ class Rainbows::StreamResponseEpoll::Client end def on_write_complete - if @closed + if true == @finish @to_io.shutdown @to_io.close N.decr(0, 1) + elsif @finish.respond_to?(:call) # hijacked + EP.delete(self) + N.decr(0, 1) + @finish.call(@to_io) end end end diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb index 4df7f49..e02d6a8 100644 --- a/lib/rainbows/writer_thread_pool/client.rb +++ b/lib/rainbows/writer_thread_pool/client.rb @@ -8,11 +8,13 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) module Methods def write_body_each(body) + return if @hp.hijacked? q << [ to_io, :write_body_each, body ] end def write_response_close(status, headers, body, alive) to_io.instance_variable_set(:@hp, @hp) # XXX ugh + return if @hp.hijacked? Rainbows::SyncClose.new(body) { |sync_body| q << [ to_io, :write_response, status, headers, sync_body, alive ] } |