From 130d086666ccd825fcb13cf02fcf941b8fe661af Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 1 Feb 2011 15:05:52 -0800 Subject: preliminary reverse proxy Rack application This can be a starting point for developing Cool.io or EventMachine-based reverse proxy applications on Rainbows! Eventually Rainbows! could replace nginx for Unicorn users! Just don't consider this code production ready, yet, at all, it doesn't handle any sort of failover and has no automated tests, yet. --- lib/rainbows/reverse_proxy/coolio.rb | 61 +++++++++++++++++++++++++++++ lib/rainbows/reverse_proxy/ev_client.rb | 39 ++++++++++++++++++ lib/rainbows/reverse_proxy/event_machine.rb | 46 ++++++++++++++++++++++ lib/rainbows/reverse_proxy/multi_thread.rb | 6 +++ lib/rainbows/reverse_proxy/synchronous.rb | 21 ++++++++++ 5 files changed, 173 insertions(+) create mode 100644 lib/rainbows/reverse_proxy/coolio.rb create mode 100644 lib/rainbows/reverse_proxy/ev_client.rb create mode 100644 lib/rainbows/reverse_proxy/event_machine.rb create mode 100644 lib/rainbows/reverse_proxy/multi_thread.rb create mode 100644 lib/rainbows/reverse_proxy/synchronous.rb (limited to 'lib/rainbows/reverse_proxy') diff --git a/lib/rainbows/reverse_proxy/coolio.rb b/lib/rainbows/reverse_proxy/coolio.rb new file mode 100644 index 0000000..2197958 --- /dev/null +++ b/lib/rainbows/reverse_proxy/coolio.rb @@ -0,0 +1,61 @@ +# -*- encoding: binary -*- +# :enddoc: +# TODO: handle large responses without having it all in memory +module Rainbows::ReverseProxy::Coolio + LOOP = Cool.io::Loop.default + + class Backend < Cool.io::IO + include Rainbows::ReverseProxy::EvClient + + def initialize(env, addr, input) + @env = env + @input = input + @junk, @rbuf = "", "" + @parser = Kcar::Parser.new + @response = @body = nil + @headers = Rack::Utils::HeaderHash.new + super(UpstreamSocket.start(addr)) # kgio-enabled socket + end + + def on_write_complete + if @input + buf = @input.read(16384, @junk) and return write(buf) + @input = nil + end + end + + def on_readable + # avoiding IO#read_nonblock since that's expensive in 1.9.2 + case buf = @_io.kgio_tryread(16384, @junk) + when String + receive_data(buf) + when :wait_readable + return + when nil + @env[AsyncCallback].call(@response) + return close + end while true # we always read until EAGAIN or EOF + + rescue => e + case e + when Errno::ECONNRESET + @env[AsyncCallback].call(@response) + return close + when SystemCallError + else + logger = @env["rack.logger"] + logger.error "#{e} #{e.message}" + e.backtrace.each { |m| logger.error m } + end + @env[AsyncCallback].call(Rainbows::ReverseProxy::E502) + close + end + end + + def call(env) + input = prepare_input!(env) + sock = Backend.new(env, pick_upstream(env), input).attach(LOOP) + sock.write(build_headers(env, input)) + throw :async + end +end diff --git a/lib/rainbows/reverse_proxy/ev_client.rb b/lib/rainbows/reverse_proxy/ev_client.rb new file mode 100644 index 0000000..94e7f82 --- /dev/null +++ b/lib/rainbows/reverse_proxy/ev_client.rb @@ -0,0 +1,39 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'tempfile' +module Rainbows::ReverseProxy::EvClient + include Rainbows::ReverseProxy::Synchronous + AsyncCallback = "async.callback" + CBB = Unicorn::TeeInput.client_body_buffer_size + Content_Length = "Content-Length" + Transfer_Encoding = "Transfer-Encoding" + + def receive_data(buf) + if @body + @body << buf + else + response = @parser.headers(@headers, @rbuf << buf) or return + if (cl = @headers[Content_Length] && cl.to_i > CBB) || + (%r{\bchunked\b} =~ @headers[Transfer_Encoding]) + @body = LargeBody.new("") + @body << @rbuf + @response = response << @body + else + @body = @rbuf.dup + @response = response << [ @body ] + end + end + end + + class LargeBody < Tempfile + def each(&block) + buf = "" + rewind + while read(16384, buf) + yield buf + end + end + + alias close close! + end +end diff --git a/lib/rainbows/reverse_proxy/event_machine.rb b/lib/rainbows/reverse_proxy/event_machine.rb new file mode 100644 index 0000000..8cb382c --- /dev/null +++ b/lib/rainbows/reverse_proxy/event_machine.rb @@ -0,0 +1,46 @@ +# -*- encoding: binary -*- +# :enddoc: +# TODO: handle large responses without having it all in memory +module Rainbows::ReverseProxy::EventMachine + class Backend < EM::Connection + include Rainbows::ReverseProxy::EvClient # provides receive_data + + # +addr+ is a packed sockaddr, so it can be either a UNIX or TCP socket + def initialize(env) + @env = env + @rbuf = "" + @parser = Kcar::Parser.new + @response = @body = nil + @headers = Rack::Utils::HeaderHash.new + end + + # prevents us from sending too much at once and OOM-ing on large uploads + def stream_input(input) + if buf = input.read(16384) + send_data buf + EM.next_tick { stream_input(input) } + end + end + + def on_write_complete + if @input + buf = @input.read(16384, @junk) and return write(buf) + @input = nil + end + end + + def unbind + @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502) + end + end + + UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket + def call(env) + input = prepare_input!(env) + io = UpstreamSocket.start(pick_upstream(env)) + sock = EM.attach(io, Backend, env) + sock.send_data(build_headers(env, input)) + sock.stream_input(input) if input + throw :async + end +end diff --git a/lib/rainbows/reverse_proxy/multi_thread.rb b/lib/rainbows/reverse_proxy/multi_thread.rb new file mode 100644 index 0000000..6714bc0 --- /dev/null +++ b/lib/rainbows/reverse_proxy/multi_thread.rb @@ -0,0 +1,6 @@ +# -*- encoding -*- +module Rainbows::ReverseProxy::MultiThread + def pick_upstream(env) + @lock.synchronize { super(env) } + end +end diff --git a/lib/rainbows/reverse_proxy/synchronous.rb b/lib/rainbows/reverse_proxy/synchronous.rb new file mode 100644 index 0000000..b5911a9 --- /dev/null +++ b/lib/rainbows/reverse_proxy/synchronous.rb @@ -0,0 +1,21 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::ReverseProxy::Synchronous + UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket + + def each_block(input, &block) + buf = "" + while input.read(16384, buf) + yield buf + end + end + + def call(env) + input = prepare_input!(env) + req = build_headers(env, input) + sock = UpstreamSocket.new(pick_upstream(env)) + sock.write(req) + each_block(input) { |buf| sock.kgio_write(buf) } if input + Kcar::Response.new(sock).rack + end +end -- cgit v1.2.3-24-ge0c7