diff options
Diffstat (limited to 'lib/rainbows/reverse_proxy')
-rw-r--r-- | lib/rainbows/reverse_proxy/coolio.rb | 61 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/ev_client.rb | 39 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/event_machine.rb | 46 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/multi_thread.rb | 6 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/synchronous.rb | 21 |
5 files changed, 173 insertions, 0 deletions
diff --git a/lib/rainbows/reverse_proxy/coolio.rb b/lib/rainbows/reverse_proxy/coolio.rb new file mode 100644 index 0000000..2197958 --- /dev/null +++ b/lib/rainbows/reverse_proxy/coolio.rb @@ -0,0 +1,61 @@ +# -*- encoding: binary -*- +# :enddoc: +# TODO: handle large responses without having it all in memory +module Rainbows::ReverseProxy::Coolio + LOOP = Cool.io::Loop.default + + class Backend < Cool.io::IO + include Rainbows::ReverseProxy::EvClient + + def initialize(env, addr, input) + @env = env + @input = input + @junk, @rbuf = "", "" + @parser = Kcar::Parser.new + @response = @body = nil + @headers = Rack::Utils::HeaderHash.new + super(UpstreamSocket.start(addr)) # kgio-enabled socket + end + + def on_write_complete + if @input + buf = @input.read(16384, @junk) and return write(buf) + @input = nil + end + end + + def on_readable + # avoiding IO#read_nonblock since that's expensive in 1.9.2 + case buf = @_io.kgio_tryread(16384, @junk) + when String + receive_data(buf) + when :wait_readable + return + when nil + @env[AsyncCallback].call(@response) + return close + end while true # we always read until EAGAIN or EOF + + rescue => e + case e + when Errno::ECONNRESET + @env[AsyncCallback].call(@response) + return close + when SystemCallError + else + logger = @env["rack.logger"] + logger.error "#{e} #{e.message}" + e.backtrace.each { |m| logger.error m } + end + @env[AsyncCallback].call(Rainbows::ReverseProxy::E502) + close + end + end + + def call(env) + input = prepare_input!(env) + sock = Backend.new(env, pick_upstream(env), input).attach(LOOP) + sock.write(build_headers(env, input)) + throw :async + end +end diff --git a/lib/rainbows/reverse_proxy/ev_client.rb b/lib/rainbows/reverse_proxy/ev_client.rb new file mode 100644 index 0000000..94e7f82 --- /dev/null +++ b/lib/rainbows/reverse_proxy/ev_client.rb @@ -0,0 +1,39 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'tempfile' +module Rainbows::ReverseProxy::EvClient + include Rainbows::ReverseProxy::Synchronous + AsyncCallback = "async.callback" + CBB = Unicorn::TeeInput.client_body_buffer_size + Content_Length = "Content-Length" + Transfer_Encoding = "Transfer-Encoding" + + def receive_data(buf) + if @body + @body << buf + else + response = @parser.headers(@headers, @rbuf << buf) or return + if (cl = @headers[Content_Length] && cl.to_i > CBB) || + (%r{\bchunked\b} =~ @headers[Transfer_Encoding]) + @body = LargeBody.new("") + @body << @rbuf + @response = response << @body + else + @body = @rbuf.dup + @response = response << [ @body ] + end + end + end + + class LargeBody < Tempfile + def each(&block) + buf = "" + rewind + while read(16384, buf) + yield buf + end + end + + alias close close! + end +end diff --git a/lib/rainbows/reverse_proxy/event_machine.rb b/lib/rainbows/reverse_proxy/event_machine.rb new file mode 100644 index 0000000..8cb382c --- /dev/null +++ b/lib/rainbows/reverse_proxy/event_machine.rb @@ -0,0 +1,46 @@ +# -*- encoding: binary -*- +# :enddoc: +# TODO: handle large responses without having it all in memory +module Rainbows::ReverseProxy::EventMachine + class Backend < EM::Connection + include Rainbows::ReverseProxy::EvClient # provides receive_data + + # +addr+ is a packed sockaddr, so it can be either a UNIX or TCP socket + def initialize(env) + @env = env + @rbuf = "" + @parser = Kcar::Parser.new + @response = @body = nil + @headers = Rack::Utils::HeaderHash.new + end + + # prevents us from sending too much at once and OOM-ing on large uploads + def stream_input(input) + if buf = input.read(16384) + send_data buf + EM.next_tick { stream_input(input) } + end + end + + def on_write_complete + if @input + buf = @input.read(16384, @junk) and return write(buf) + @input = nil + end + end + + def unbind + @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502) + end + end + + UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket + def call(env) + input = prepare_input!(env) + io = UpstreamSocket.start(pick_upstream(env)) + sock = EM.attach(io, Backend, env) + sock.send_data(build_headers(env, input)) + sock.stream_input(input) if input + throw :async + end +end diff --git a/lib/rainbows/reverse_proxy/multi_thread.rb b/lib/rainbows/reverse_proxy/multi_thread.rb new file mode 100644 index 0000000..6714bc0 --- /dev/null +++ b/lib/rainbows/reverse_proxy/multi_thread.rb @@ -0,0 +1,6 @@ +# -*- encoding -*- +module Rainbows::ReverseProxy::MultiThread + def pick_upstream(env) + @lock.synchronize { super(env) } + end +end diff --git a/lib/rainbows/reverse_proxy/synchronous.rb b/lib/rainbows/reverse_proxy/synchronous.rb new file mode 100644 index 0000000..b5911a9 --- /dev/null +++ b/lib/rainbows/reverse_proxy/synchronous.rb @@ -0,0 +1,21 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::ReverseProxy::Synchronous + UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket + + def each_block(input, &block) + buf = "" + while input.read(16384, buf) + yield buf + end + end + + def call(env) + input = prepare_input!(env) + req = build_headers(env, input) + sock = UpstreamSocket.new(pick_upstream(env)) + sock.write(req) + each_block(input) { |buf| sock.kgio_write(buf) } if input + Kcar::Response.new(sock).rack + end +end |