diff options
-rw-r--r-- | examples/reverse_proxy.ru | 9 | ||||
-rw-r--r-- | lib/rainbows.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy.rb | 185 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/coolio.rb | 61 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/ev_client.rb | 39 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/event_machine.rb | 46 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/multi_thread.rb | 6 | ||||
-rw-r--r-- | lib/rainbows/reverse_proxy/synchronous.rb | 21 |
8 files changed, 368 insertions, 0 deletions
diff --git a/examples/reverse_proxy.ru b/examples/reverse_proxy.ru new file mode 100644 index 0000000..fcb9e74 --- /dev/null +++ b/examples/reverse_proxy.ru @@ -0,0 +1,9 @@ +# see Rainbows::ReverseProxy RDoc +cfg = { + :upstreams => [ + "/tmp/.r.sock", + "http://bogomips.org/", + [ "http://10.6.6.6:666/", { :weight => 666 } ], + ] +} +run Rainbows::ReverseProxy.new(cfg) diff --git a/lib/rainbows.rb b/lib/rainbows.rb index f22a927..8c1bed9 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -137,6 +137,7 @@ module Rainbows autoload :ThreadTimeout, 'rainbows/thread_timeout' autoload :WorkerYield, 'rainbows/worker_yield' autoload :SyncClose, 'rainbows/sync_close' + autoload :ReverseProxy, 'rainbows/reverse_proxy' end require 'rainbows/error' diff --git a/lib/rainbows/reverse_proxy.rb b/lib/rainbows/reverse_proxy.rb new file mode 100644 index 0000000..0a8fd2b --- /dev/null +++ b/lib/rainbows/reverse_proxy.rb @@ -0,0 +1,185 @@ +# -*- encoding: binary -*- +require 'socket' +require 'thread' +require 'uri' +require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar + +# A reverse proxy implementation for \Rainbows! It is a Rack application +# compatible and optimized for most \Rainbows! concurrency models. +# +# It makes HTTP/1.0 connections without keepalive to backends, so +# it is only recommended for proxying to upstreams on the same LAN +# or machine. It can proxy to TCP hosts as well as UNIX domain sockets. +# +# Currently it only does simple round-robin balancing and does not +# know to retry connections from failed backends. +# +# Buffering-behavior is currently dependent on the concurrency model selected: +# +# Fully-buffered (uploads and response bodies): +# Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool +# If you're proxying to Unicorn, fully-buffered is the way to go. 
#
# Buffered input only (uploads, but not response bodies):
#    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
#
# It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
# to host this application.  However, you may proxy to a backend running
# one of these concurrency models with a fully-buffering concurrency model.
#
# See the {example config}[link:examples/reverse_proxy.ru] for a sample
# configuration
#
# TODO: Revactor support
# TODO: Support HTTP trailers
# TODO: optional streaming input for synchronous
# TODO: error handling
#
# WARNING! this is only lightly tested and has no automated tests, yet!
class Rainbows::ReverseProxy
  autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
  autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
  autoload :Coolio, 'rainbows/reverse_proxy/coolio'
  autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
  autoload :EvClient, 'rainbows/reverse_proxy/ev_client'

  HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
  REMOTE_ADDR = "REMOTE_ADDR"
  REQUEST_METHOD = "REQUEST_METHOD"
  REQUEST_URI = "REQUEST_URI"
  CRLF = "\r\n"
  TR = %w(_ -)

  # Canned Rack response used when no usable upstream response exists.
  E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]

  # Builds the round-robin upstream table from +opts+[:upstreams].
  #
  # Each entry is either a String or a two-element Array of
  # [ String, Hash ].  A String starting with "http://" is parsed as a URI
  # and packed with Socket.sockaddr_in; anything else is treated as the
  # path of a UNIX domain socket (a leading "unix:" is stripped and a
  # leading "~" is expanded).  The optional Hash may carry :weight, the
  # number of slots the upstream occupies in the table (default: 1).
  def initialize(opts)
    @lock = Mutex.new
    upstreams = opts[:upstreams]
    @upstreams = []
    upstreams.each do |url|
      url, cfg = *url if Array === url
      if url =~ %r{\Ahttp://}
        uri = URI.parse(url)
        sockaddr = Socket.sockaddr_in(uri.port, uri.host)
      else
        path = url.gsub(%r{\Aunix:}, "") # nginx compat
        %r{\A~} =~ path and path = File.expand_path(path)
        sockaddr = Socket.sockaddr_un(path)
      end
      ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
    end
    @nr = 0
  end

  # detects the concurrency model at first run and replaces itself
  #
  # The first caller to grab @lock extends this object with the module
  # matching env["rainbows.model"]; that module's +call+ then shadows this
  # method.  Concurrent callers wait on the lock and then re-dispatch.
  def call(env)
    if @lock.try_lock
      begin
        case model = env["rainbows.model"]
        when :EventMachine, :NeverBlock
          extend(EventMachine)
        when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
          extend(Coolio)
        when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
          warn "#{model} is not *well* supported with #{self.class}"
          warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
          extend(Synchronous)
        when :Revactor
          warn "Revactor is not *well* supported with #{self.class} yet"
          extend(Synchronous)
        when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
          extend(Synchronous)
          Synchronous::UpstreamSocket.
            __send__(:include, Rainbows::Fiber::IO::Methods)
        when :WriterThreadSpawn, :WriterThreadPool
          warn "#{model} is not recommended for use with #{self.class}"
          extend(Synchronous)
        else
          extend(Synchronous)
        end
        extend(MultiThread) if env["rack.multithread"]
      ensure
        # always release, otherwise a failed extend/autoload would leave
        # the mutex held and block every later request
        @lock.unlock
      end
    else
      @lock.synchronize {} # wait for the first locker to finish
    end
    call(env)
  end

  CONTENT_LENGTH = "CONTENT_LENGTH"
  CONTENT_TYPE = "CONTENT_TYPE"
  HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
  RackInput = "rack.input"

  # returns request headers for sending to the upstream as a string
  #
  # +env+ is the Rack environment of the client request, +input+ is the
  # value returned by prepare_input! (nil when there is no request body).
  # The request line is always HTTP/1.0 with "Connection: close", and the
  # client address is appended to any existing X-Forwarded-For value.
  def build_headers(env, input)
    remote_addr = env[REMOTE_ADDR]
    xff = env[HTTP_X_FORWARDED_FOR]
    xff = xff ? "#{xff},#{remote_addr}" : remote_addr
    req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
          "Connection: close\r\n" \
          "X-Forwarded-For: #{xff}\r\n"
    uscore, dash = *TR
    env.each do |key, value|
      %r{\AHTTP_(\w+)\z} =~ key or next
      key = $1
      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
      key.tr!(uscore, dash)
      req << "#{key}: #{value}\r\n"
    end

    # Rack keeps the request Content-Type under CONTENT_TYPE (no HTTP_
    # prefix), so the loop above never sees it; forward it explicitly.
    ctype = env[CONTENT_TYPE]
    req << "Content-Type: #{ctype}\r\n" if ctype && !ctype.empty?

    input and req << (input.respond_to?(:size) ?
                     "Content-Length: #{input.size}\r\n" :
                     "Transfer-Encoding: chunked\r\n")
    req << CRLF
  end

  # Returns the packed sockaddr of the next upstream in round-robin order.
  # Not synchronized here; MultiThread wraps this method with @lock.
  def pick_upstream(env) # +env+ is reserved for future expansion
    @nr += 1
    @upstreams[@nr %= @upstreams.size]
  end

  # Returns a rewindable, sized IO-like object holding the request body,
  # or nil when the request carries no body (no positive CONTENT_LENGTH
  # and no "chunked" Transfer-Encoding).
  #
  # May mutate +env+: HTTP_TRANSFER_ENCODING is deleted when it is
  # consulted (i.e. when CONTENT_LENGTH is absent).
  def prepare_input!(env)
    if cl = env[CONTENT_LENGTH]
      size = cl.to_i
      size > 0 or return
    elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
      # do people use multiple transfer-encodings?
    else
      return
    end

    input = env[RackInput]
    if input.respond_to?(:rewind)
      if input.respond_to?(:size)
        input.size # TeeInput-specific behavior
        return input
      else
        return SizedInput.new(input, size)
      end
    end

    # non-rewindable input: copy it into memory (small, known size) or
    # into a temporary file (large or unknown size)
    tmp = (size && size < 0x4000) ? StringIO.new("") : Unicorn::TmpIO.new
    each_block(input) { |x| tmp.syswrite(x) }
    tmp.rewind
    tmp
  end

  # Wraps a rewindable input that does not respond to #size so that
  # build_headers can emit a Content-Length for it.
  class SizedInput
    attr_reader :size

    # +n+ is the body size in bytes; when nil, +input+ is read to the end
    # to count its bytes and then rewound.
    def initialize(input, n)
      buf = ""
      if n.nil?
        n = 0
        while input.read(16384, buf)
          n += buf.size
        end
        input.rewind
      end
      @input, @size = input, n
    end

    # Delegates to the wrapped input's #read.
    def read(*args)
      @input.read(*args)
    end
  end

  # Kgio socket that also answers to #readpartial (aliased to kgio_read!).
  class UpstreamSocket < Kgio::Socket
    alias readpartial kgio_read!
  end
end

# ---------------------------------------------------------------------------
# diff --git a/lib/rainbows/reverse_proxy/coolio.rb
#            b/lib/rainbows/reverse_proxy/coolio.rb
# new file mode 100644, index 0000000..2197958
# ---------------------------------------------------------------------------
# -*- encoding: binary -*-
# :enddoc:
# TODO: handle large responses without having it all in memory
module Rainbows::ReverseProxy::Coolio
  LOOP = Cool.io::Loop.default

  # Cool.io watcher for one upstream connection: writes the request body
  # and buffers the upstream response until EOF.
  class Backend < Cool.io::IO
    include Rainbows::ReverseProxy::EvClient

    # +env+ is the client's Rack environment, +addr+ the packed sockaddr
    # of the upstream, +input+ the prepared request body (or nil).
    def initialize(env, addr, input)
      @env = env
      @input = input
      @junk, @rbuf = "", ""
      @parser = Kcar::Parser.new
      @response = @body = nil
      @headers = Rack::Utils::HeaderHash.new
      super(UpstreamSocket.start(addr)) # kgio-enabled socket
    end

    # Feeds the next block of the request body to the upstream each time
    # the previous write has drained; drops @input once it is exhausted.
    def on_write_complete
      if @input
        buf = @input.read(16384, @junk) and return write(buf)
        @input = nil
      end
    end

    # Reads everything currently available from the upstream.  On EOF the
    # buffered response is handed to the async callback; if the upstream
    # went away before any headers were parsed, E502 is sent instead of
    # nil (same fallback as EventMachine::Backend#unbind).
    def on_readable
      # avoiding IO#read_nonblock since that's expensive in 1.9.2
      case buf = @_io.kgio_tryread(16384, @junk)
      when String
        receive_data(buf)
      when :wait_readable
        return
      when nil
        @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502)
        return close
      end while true # we always read until EAGAIN or EOF

    rescue => e
      case e
      when Errno::ECONNRESET
        @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502)
        return close
      when SystemCallError
        # other socket-level failures are not logged, just answered with 502
      else
        logger = @env["rack.logger"]
        logger.error "#{e} #{e.message}"
        e.backtrace.each { |m| logger.error m }
      end
      @env[AsyncCallback].call(Rainbows::ReverseProxy::E502)
      close
    end
  end

  # Rack entry point for Cool.io-based models: starts the upstream request
  # on the default loop and throws :async; the response is delivered later
  # through env["async.callback"].
  def call(env)
    input = prepare_input!(env)
    sock = Backend.new(env, pick_upstream(env), input).attach(LOOP)
    sock.write(build_headers(env, input))
    throw :async
  end
end

# ---------------------------------------------------------------------------
# diff --git a/lib/rainbows/reverse_proxy/ev_client.rb
#            b/lib/rainbows/reverse_proxy/ev_client.rb
# new file mode 100644, index 0000000..94e7f82
# ---------------------------------------------------------------------------
# -*- encoding: binary -*-
# :enddoc:
require 'tempfile'
module Rainbows::ReverseProxy::EvClient
  include Rainbows::ReverseProxy::Synchronous
  AsyncCallback = "async.callback"
  CBB = Unicorn::TeeInput.client_body_buffer_size
  Content_Length = "Content-Length"
  Transfer_Encoding = "Transfer-Encoding"

  # Consumes one chunk of upstream data.  Until the response headers are
  # complete, data accumulates in @rbuf; afterwards it is appended to
  # @body.  Bodies announced as larger than CBB bytes, or sent with a
  # chunked Transfer-Encoding, are spooled to a LargeBody tempfile;
  # everything else stays in memory.
  def receive_data(buf)
    if @body
      @body << buf
    else
      response = @parser.headers(@headers, @rbuf << buf) or return

      # assign first, compare second: "cl = hdr && cl.to_i > CBB" would
      # bind cl to the result of the && and never see the header value
      cl = @headers[Content_Length]
      if (cl && cl.to_i > CBB) ||
         (%r{\bchunked\b} =~ @headers[Transfer_Encoding])
        @body = LargeBody.new("")
        @body << @rbuf
        @response = response << @body
      else
        @body = @rbuf.dup
        @response = response << [ @body ]
      end
    end
  end

  # Tempfile-backed Rack response body; closing it unlinks the file.
  class LargeBody < Tempfile
    # Rewinds and yields the file contents in blocks of up to 16384 bytes.
    # The same buffer object is reused for every yield.
    def each(&block)
      buf = ""
      rewind
      while read(16384, buf)
        yield buf
      end
    end

    alias close close!
  end
end

# ---------------------------------------------------------------------------
# diff --git a/lib/rainbows/reverse_proxy/event_machine.rb
#            b/lib/rainbows/reverse_proxy/event_machine.rb
# new file mode 100644, index 0000000..8cb382c
# ---------------------------------------------------------------------------
# -*- encoding: binary -*-
# :enddoc:
# TODO: handle large responses without having it all in memory
module Rainbows::ReverseProxy::EventMachine
  # EventMachine connection for one upstream request.
  class Backend < EM::Connection
    include Rainbows::ReverseProxy::EvClient # provides receive_data

    # +env+ is the Rack environment of the client request.  The upstream
    # socket itself is created in #call and handed over via EM.attach.
    def initialize(env)
      @env = env
      @rbuf = ""
      @parser = Kcar::Parser.new
      @response = @body = nil
      @headers = Rack::Utils::HeaderHash.new
    end

    # prevents us from sending too much at once and OOM-ing on large uploads
    def stream_input(input)
      if buf = input.read(16384)
        send_data buf
        EM.next_tick { stream_input(input) }
      end
    end

    # NOTE(review): this mirrors Coolio::Backend#on_write_complete, but
    # @input and @junk are never assigned in this class, so the body is a
    # no-op here -- confirm whether anything is expected to call it.
    def on_write_complete
      if @input
        buf = @input.read(16384, @junk) and return write(buf)
        @input = nil
      end
    end

    # Delivers the buffered response (or E502 if none was parsed) to the
    # async callback once the upstream connection is gone.
    def unbind
      @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502)
    end
  end

  UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket

  # Rack entry point for EventMachine-based models: attaches a freshly
  # started upstream socket to the reactor, sends the request and throws
  # :async; the response arrives later through env["async.callback"].
  def call(env)
    input = prepare_input!(env)
    io = UpstreamSocket.start(pick_upstream(env))
    sock = EM.attach(io, Backend, env)
    sock.send_data(build_headers(env, input))
    sock.stream_input(input) if input
    throw :async
  end
end

# ---------------------------------------------------------------------------
# diff --git a/lib/rainbows/reverse_proxy/multi_thread.rb
#            b/lib/rainbows/reverse_proxy/multi_thread.rb
# new file mode 100644, index 0000000..6714bc0
# ---------------------------------------------------------------------------
# -*- encoding: binary -*-
module Rainbows::ReverseProxy::MultiThread
  # Serializes upstream selection through @lock; mixed in by
  # ReverseProxy#call when env["rack.multithread"] is set.
  def pick_upstream(env)
    @lock.synchronize { super(env) }
  end
end

# ---------------------------------------------------------------------------
# diff --git a/lib/rainbows/reverse_proxy/synchronous.rb
#            b/lib/rainbows/reverse_proxy/synchronous.rb
# new file mode 100644, index 0000000..b5911a9
# ---------------------------------------------------------------------------
# -*- encoding: binary -*-
# :enddoc:
module Rainbows::ReverseProxy::Synchronous
  UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket

  # Yields +input+ to the caller in pieces of up to 16384 bytes until
  # +input.read+ returns a false value.  The same buffer object is reused
  # for every yield, so callers must consume it before the next iteration.
  def each_block(input, &block)
    chunk = ""
    yield chunk while input.read(16384, chunk)
  end

  # Blocking Rack entry point: prepares the request body, connects to the
  # next upstream, writes the request head followed by the body (if any)
  # and returns the upstream's reply as parsed by Kcar::Response#rack.
  def call(env)
    body = prepare_input!(env)
    head = build_headers(env, body)
    upstream = UpstreamSocket.new(pick_upstream(env))
    upstream.write(head)
    if body
      each_block(body) { |piece| upstream.kgio_write(piece) }
    end
    Kcar::Response.new(upstream).rack
  end
end