about summary refs log tree commit homepage
path: root/lib/rainbows/event_machine
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-01-05 17:39:11 -0800
committerEric Wong <normalperson@yhbt.net>2011-01-06 14:14:30 -0800
commit2cb26ba8084cd37996330616b885de1c780d848e (patch)
tree6e829d0c1d4c49e666e97dacdfcc16e8eea0482d /lib/rainbows/event_machine
parent31a93152c8977f31045bd182ae99df4ebd088abf (diff)
downloadrainbows-2cb26ba8084cd37996330616b885de1c780d848e.tar.gz
This will allow Coolio to use it, too.
Diffstat (limited to 'lib/rainbows/event_machine')
-rw-r--r--lib/rainbows/event_machine/client.rb52
-rw-r--r--lib/rainbows/event_machine/response.rb38
2 files changed, 47 insertions, 43 deletions
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index d8ed6df..5abdc3b 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -1,8 +1,10 @@
 # -*- encoding: binary -*-
 # :enddoc:
+require 'rainbows/event_machine/response'
 class Rainbows::EventMachine::Client < EM::Connection
   attr_writer :body
   include Rainbows::EvCore
+  include Rainbows::EventMachine::Response
 
   def initialize(io)
     @_io = io
@@ -35,67 +37,31 @@ class Rainbows::EventMachine::Client < EM::Connection
     set_comm_inactivity_timeout 0
     @env[RACK_INPUT] = @input
     @env[REMOTE_ADDR] = @_io.kgio_addr
-    @env[ASYNC_CALLBACK] = method(:em_write_response)
+    @env[ASYNC_CALLBACK] = method(:write_async_response)
     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
 
-    response = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) }
+    status, headers, body = catch(:async) {
+      APP.call(@env.merge!(RACK_DEFAULTS))
+    }
 
     # too tricky to support pipelining with :async since the
     # second (pipelined) request could be a stuck behind a
     # long-running async response
-    (response.nil? || -1 == response[0]) and return @state = :close
+    (status.nil? || -1 == status) and return @state = :close
 
     if @hp.next?
       @state = :headers
-      em_write_response(response, true)
+      write_response(status, headers, body, true)
       if @buf.empty?
         set_comm_inactivity_timeout(Rainbows.keepalive_timeout)
       elsif @body.nil?
         EM.next_tick { receive_data(nil) }
       end
     else
-      em_write_response(response, false)
+      write_response(status, headers, body, false)
     end
   end
 
-  # don't change this method signature, "async.callback" relies on it
-  def em_write_response(response, alive = false)
-    status, headers, body = response
-
-    if body.respond_to?(:errback) && body.respond_to?(:callback)
-      @body = body
-      body.callback { quit }
-      body.errback { quit }
-      alive = true
-    elsif body.respond_to?(:to_path)
-      st = File.stat(path = body.to_path)
-
-      if st.file?
-        write_headers(status, headers, alive)
-        @body = stream_file_data(path)
-        @body.errback do
-          body.close if body.respond_to?(:close)
-          quit
-        end
-        @body.callback do
-          body.close if body.respond_to?(:close)
-          @body = nil
-          alive ? receive_data(nil) : quit
-        end
-        return
-      elsif st.socket? || st.pipe?
-        io = body_to_io(@body = body)
-        chunk = stream_response_headers(status, headers, alive)
-        m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
-                    Rainbows::EventMachine::ResponsePipe
-        return EM.watch(io, m, self).notify_readable = true
-      end
-      # char or block device... WTF? fall through to body.each
-    end
-    write_response(status, headers, body, alive)
-    quit unless alive
-  end
-
   def next!
     @body.close if @body.respond_to?(:close)
     @hp.keepalive? ? receive_data(@body = nil) : quit
diff --git a/lib/rainbows/event_machine/response.rb b/lib/rainbows/event_machine/response.rb
new file mode 100644
index 0000000..49bcbd5
--- /dev/null
+++ b/lib/rainbows/event_machine/response.rb
@@ -0,0 +1,38 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::EventMachine::Response
+  def write_response(status, headers, body, alive)
+    if body.respond_to?(:errback) && body.respond_to?(:callback)
+      @body = body
+      body.callback { quit }
+      body.errback { quit }
+      alive = true
+    elsif body.respond_to?(:to_path)
+      st = File.stat(path = body.to_path)
+
+      if st.file?
+        write_headers(status, headers, alive)
+        @body = stream_file_data(path)
+        @body.errback do
+          body.close if body.respond_to?(:close)
+          quit
+        end
+        @body.callback do
+          body.close if body.respond_to?(:close)
+          @body = nil
+          alive ? receive_data(nil) : quit
+        end
+        return
+      elsif st.socket? || st.pipe?
+        io = body_to_io(@body = body)
+        chunk = stream_response_headers(status, headers, alive)
+        m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
+                    Rainbows::EventMachine::ResponsePipe
+        return EM.watch(io, m, self).notify_readable = true
+      end
+      # char or block device... WTF? fall through to body.each
+    end
+    super(status, headers, body, alive)
+    quit unless alive
+  end
+end