about summary refs log tree commit homepage
path: root/lib/rainbows/reverse_proxy
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/reverse_proxy')
-rw-r--r--lib/rainbows/reverse_proxy/coolio.rb61
-rw-r--r--lib/rainbows/reverse_proxy/ev_client.rb39
-rw-r--r--lib/rainbows/reverse_proxy/event_machine.rb46
-rw-r--r--lib/rainbows/reverse_proxy/multi_thread.rb6
-rw-r--r--lib/rainbows/reverse_proxy/synchronous.rb21
5 files changed, 173 insertions, 0 deletions
diff --git a/lib/rainbows/reverse_proxy/coolio.rb b/lib/rainbows/reverse_proxy/coolio.rb
new file mode 100644
index 0000000..2197958
--- /dev/null
+++ b/lib/rainbows/reverse_proxy/coolio.rb
@@ -0,0 +1,61 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# TODO: handle large responses without having it all in memory
+module Rainbows::ReverseProxy::Coolio
+  LOOP = Cool.io::Loop.default
+
+  class Backend < Cool.io::IO
+    include Rainbows::ReverseProxy::EvClient
+
+    def initialize(env, addr, input)
+      @env = env
+      @input = input
+      @junk, @rbuf = "", ""
+      @parser = Kcar::Parser.new
+      @response = @body = nil
+      @headers = Rack::Utils::HeaderHash.new
+      super(UpstreamSocket.start(addr)) # kgio-enabled socket
+    end
+
+    def on_write_complete
+      if @input
+        buf = @input.read(16384, @junk) and return write(buf)
+        @input = nil
+      end
+    end
+
+    def on_readable
+      # avoiding IO#read_nonblock since that's expensive in 1.9.2
+      case buf = @_io.kgio_tryread(16384, @junk)
+      when String
+        receive_data(buf)
+      when :wait_readable
+        return
+      when nil
+        @env[AsyncCallback].call(@response)
+        return close
+      end while true # we always read until EAGAIN or EOF
+
+      rescue => e
+        case e
+        when Errno::ECONNRESET
+          @env[AsyncCallback].call(@response)
+          return close
+        when SystemCallError
+        else
+          logger = @env["rack.logger"]
+          logger.error "#{e} #{e.message}"
+          e.backtrace.each { |m| logger.error m }
+        end
+        @env[AsyncCallback].call(Rainbows::ReverseProxy::E502)
+        close
+    end
+  end
+
+  def call(env)
+    input = prepare_input!(env)
+    sock = Backend.new(env, pick_upstream(env), input).attach(LOOP)
+    sock.write(build_headers(env, input))
+    throw :async
+  end
+end
diff --git a/lib/rainbows/reverse_proxy/ev_client.rb b/lib/rainbows/reverse_proxy/ev_client.rb
new file mode 100644
index 0000000..94e7f82
--- /dev/null
+++ b/lib/rainbows/reverse_proxy/ev_client.rb
@@ -0,0 +1,39 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'tempfile'
+module Rainbows::ReverseProxy::EvClient
+  include Rainbows::ReverseProxy::Synchronous
+  AsyncCallback = "async.callback"
+  CBB = Unicorn::TeeInput.client_body_buffer_size
+  Content_Length = "Content-Length"
+  Transfer_Encoding = "Transfer-Encoding"
+
+  def receive_data(buf)
+    if @body
+      @body << buf
+    else
+      response = @parser.headers(@headers, @rbuf << buf) or return
+      if (cl = @headers[Content_Length] && cl.to_i > CBB) ||
+         (%r{\bchunked\b} =~ @headers[Transfer_Encoding])
+        @body = LargeBody.new("")
+        @body << @rbuf
+        @response = response << @body
+      else
+        @body = @rbuf.dup
+        @response = response << [ @body ]
+      end
+    end
+  end
+
+  class LargeBody < Tempfile
+    def each(&block)
+      buf = ""
+      rewind
+      while read(16384, buf)
+        yield buf
+      end
+    end
+
+    alias close close!
+  end
+end
diff --git a/lib/rainbows/reverse_proxy/event_machine.rb b/lib/rainbows/reverse_proxy/event_machine.rb
new file mode 100644
index 0000000..8cb382c
--- /dev/null
+++ b/lib/rainbows/reverse_proxy/event_machine.rb
@@ -0,0 +1,46 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# TODO: handle large responses without having it all in memory
+module Rainbows::ReverseProxy::EventMachine
+  class Backend < EM::Connection
+    include Rainbows::ReverseProxy::EvClient # provides receive_data
+
+    # +addr+ is a packed sockaddr, so it can be either a UNIX or TCP socket
+    def initialize(env)
+      @env = env
+      @rbuf = ""
+      @parser = Kcar::Parser.new
+      @response = @body = nil
+      @headers = Rack::Utils::HeaderHash.new
+    end
+
+    # prevents us from sending too much at once and OOM-ing on large uploads
+    def stream_input(input)
+      if buf = input.read(16384)
+        send_data buf
+        EM.next_tick { stream_input(input) }
+      end
+    end
+
+    def on_write_complete
+      if @input
+        buf = @input.read(16384, @junk) and return write(buf)
+        @input = nil
+      end
+    end
+
+    def unbind
+      @env[AsyncCallback].call(@response || Rainbows::ReverseProxy::E502)
+    end
+  end
+
+  UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket
+  def call(env)
+    input = prepare_input!(env)
+    io = UpstreamSocket.start(pick_upstream(env))
+    sock = EM.attach(io, Backend, env)
+    sock.send_data(build_headers(env, input))
+    sock.stream_input(input) if input
+    throw :async
+  end
+end
diff --git a/lib/rainbows/reverse_proxy/multi_thread.rb b/lib/rainbows/reverse_proxy/multi_thread.rb
new file mode 100644
index 0000000..6714bc0
--- /dev/null
+++ b/lib/rainbows/reverse_proxy/multi_thread.rb
@@ -0,0 +1,6 @@
+# -*- encoding -*-
+module Rainbows::ReverseProxy::MultiThread
+  def pick_upstream(env)
+    @lock.synchronize { super(env) }
+  end
+end
diff --git a/lib/rainbows/reverse_proxy/synchronous.rb b/lib/rainbows/reverse_proxy/synchronous.rb
new file mode 100644
index 0000000..b5911a9
--- /dev/null
+++ b/lib/rainbows/reverse_proxy/synchronous.rb
@@ -0,0 +1,21 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::ReverseProxy::Synchronous
+  UpstreamSocket = Rainbows::ReverseProxy::UpstreamSocket
+
+  def each_block(input, &block)
+    buf = ""
+    while input.read(16384, buf)
+      yield buf
+    end
+  end
+
+  def call(env)
+    input = prepare_input!(env)
+    req = build_headers(env, input)
+    sock = UpstreamSocket.new(pick_upstream(env))
+    sock.write(req)
+    each_block(input) { |buf| sock.kgio_write(buf) } if input
+    Kcar::Response.new(sock).rack
+  end
+end