about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-24 22:13:23 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-26 02:26:22 -0700
commit344f8cf1e000704d53d7841eb896d83b470d7a08 (patch)
tree90c62aaab04269aa34b9728070d56f5551109722
parentbb9939c2397b76628e862c93799fdc57909504f4 (diff)
downloadrainbows-344f8cf1e000704d53d7841eb896d83b470d7a08.tar.gz
EventMachine and Rev models seem to be able to share a lot of
common code, so lets share.  We may support Packet in the
future, too, and end up with a similar programming model there
as well.
-rw-r--r--lib/rainbows/ev_core.rb89
-rw-r--r--lib/rainbows/event_machine.rb84
-rw-r--r--lib/rainbows/rev.rb79
3 files changed, 95 insertions, 157 deletions
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
new file mode 100644
index 0000000..aa0b155
--- /dev/null
+++ b/lib/rainbows/ev_core.rb
@@ -0,0 +1,89 @@
+# -*- encoding: binary -*-
+
+module Rainbows
+
+  # base module for evented models like Rev and EventMachine
+  module EvCore
+    include Unicorn
+    include Rainbows::Const
+    G = Rainbows::G
+
+    def post_init
+      @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST
+      @env = {}
+      @hp = HttpParser.new
+      @state = :headers # [ :body [ :trailers ] ] :app_call :close
+      @buf = ""
+      @deferred_bodies = [] # for (fast) regular files only
+    end
+
+    # graceful exit, like SIGQUIT
+    def quit
+      @deferred_bodies.clear
+      @state = :close
+    end
+
+    def handle_error(e)
+      quit
+      msg = case e
+      when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        ERROR_500_RESPONSE
+      when HttpParserError # try to tell the client they're bad
+        ERROR_400_RESPONSE
+      else
+        G.logger.error "Read error: #{e.inspect}"
+        G.logger.error e.backtrace.join("\n")
+        ERROR_500_RESPONSE
+      end
+      write(msg)
+    end
+
+    def tmpio
+      io = Util.tmpio
+      def io.size
+        # already sync=true at creation, so no need to flush before stat
+        stat.size
+      end
+      io
+    end
+
+    # TeeInput doesn't map too well to this right now...
+    def on_read(data)
+      case @state
+      when :headers
+        @hp.headers(@env, @buf << data) or return
+        @state = :body
+        len = @hp.content_length
+        if len == 0
+          @input = HttpRequest::NULL_IO
+          app_call # common case
+        else # nil or len > 0
+          # since we don't do streaming input, we have no choice but
+          # to take over 100-continue handling from the Rack application
+          if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+            write(EXPECT_100_RESPONSE)
+            @env.delete(HTTP_EXPECT)
+          end
+          @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+          @hp.filter_body(@buf2 = @buf.dup, @buf)
+          @input << @buf2
+          on_read("")
+        end
+      when :body
+        if @hp.body_eof?
+          @state = :trailers
+          on_read(data)
+        elsif data.size > 0
+          @hp.filter_body(@buf2, @buf << data)
+          @input << @buf2
+          on_read("")
+        end
+      when :trailers
+        @hp.trailers(@env, @buf << data) and app_call
+      end
+      rescue Object => e
+        handle_error(e)
+    end
+
+  end
+end
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 3a7349c..2cc0f15 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -1,5 +1,6 @@
 # -*- encoding: binary -*-
 require 'eventmachine'
+require 'rainbows/ev_core'
 
 module Rainbows
 
@@ -26,45 +27,15 @@ module Rainbows
     include Base
 
     class Client < EM::Connection
-      include Unicorn
-      include Rainbows::Const
+      include Rainbows::EvCore
       G = Rainbows::G
 
       def initialize(io)
         @_io = io
       end
 
-      def post_init
-        @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST
-        @env = {}
-        @hp = HttpParser.new
-        @state = :headers # [ :body [ :trailers ] ] :app_call :close
-        @buf = ""
-        @deferred_bodies = [] # for (fast) regular files only
-      end
-
-      # graceful exit, like SIGQUIT
-      def quit
-        @deferred_bodies.clear
-        @state = :close
-      end
-
       alias write send_data
-
-      def handle_error(e)
-        quit
-        msg = case e
-        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
-          ERROR_500_RESPONSE
-        when HttpParserError # try to tell the client they're bad
-          ERROR_400_RESPONSE
-        else
-          G.logger.error "Read error: #{e.inspect}"
-          G.logger.error e.backtrace.join("\n")
-          ERROR_500_RESPONSE
-        end
-        write(msg)
-      end
+      alias receive_data on_read
 
       def app_call
         begin
@@ -108,55 +79,6 @@ module Rainbows
         end
       end
 
-      def tmpio
-        io = Util.tmpio
-        def io.size
-          # already sync=true at creation, so no need to flush before stat
-          stat.size
-        end
-        io
-      end
-
-      alias on_read receive_data
-
-      # TeeInput doesn't map too well to this right now...
-      def receive_data(data)
-        case @state
-        when :headers
-          @hp.headers(@env, @buf << data) or return
-          @state = :body
-          len = @hp.content_length
-          if len == 0
-            @input = HttpRequest::NULL_IO
-            app_call # common case
-          else # nil or len > 0
-            # since we don't do streaming input, we have no choice but
-            # to take over 100-continue handling from the Rack application
-            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
-              write(EXPECT_100_RESPONSE)
-              @env.delete(HTTP_EXPECT)
-            end
-            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
-            @hp.filter_body(@buf2 = @buf.dup, @buf)
-            @input << @buf2
-            on_read("")
-          end
-        when :body
-          if @hp.body_eof?
-            @state = :trailers
-            on_read(data)
-          elsif data.size > 0
-            @hp.filter_body(@buf2, @buf << data)
-            @input << @buf2
-            on_read("")
-          end
-        when :trailers
-          @hp.trailers(@env, @buf << data) and app_call
-        end
-        rescue Object => e
-          handle_error(e)
-      end
-
     end
 
     module Server
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index d27538c..572b88a 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -1,5 +1,6 @@
 # -*- encoding: binary -*-
 require 'rev'
+require 'rainbows/ev_core'
 
 module Rainbows
 
@@ -26,25 +27,13 @@ module Rainbows
     include Base
 
     class Client < ::Rev::IO
-      include Unicorn
-      include Rainbows::Const
+      include Rainbows::EvCore
       G = Rainbows::G
 
       def initialize(io)
         G.cur += 1
         super(io)
-        @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
-        @env = {}
-        @hp = HttpParser.new
-        @state = :headers # [ :body [ :trailers ] ] :app_call :close
-        @buf = ""
-        @deferred_bodies = [] # for (fast) regular files only
-      end
-
-      # graceful exit, like SIGQUIT
-      def quit
-        @deferred_bodies.clear
-        @state = :close
+        post_init
       end
 
       # queued, optional response bodies, it should only be unpollable "fast"
@@ -56,21 +45,6 @@ module Rainbows
         on_write_complete unless @hp.headers? # triggers a write
       end
 
-      def handle_error(e)
-        quit
-        msg = case e
-        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
-          ERROR_500_RESPONSE
-        when HttpParserError # try to tell the client they're bad
-          ERROR_400_RESPONSE
-        else
-          G.logger.error "Read error: #{e.inspect}"
-          G.logger.error e.backtrace.join("\n")
-          ERROR_500_RESPONSE
-        end
-        write(msg)
-      end
-
       def app_call
         begin
           (@env[RACK_INPUT] = @input).rewind
@@ -116,53 +90,6 @@ module Rainbows
       def on_close
         G.cur -= 1
       end
-
-      def tmpio
-        io = Util.tmpio
-        def io.size
-          # already sync=true at creation, so no need to flush before stat
-          stat.size
-        end
-        io
-      end
-
-      # TeeInput doesn't map too well to this right now...
-      def on_read(data)
-        case @state
-        when :headers
-          @hp.headers(@env, @buf << data) or return
-          @state = :body
-          len = @hp.content_length
-          if len == 0
-            @input = HttpRequest::NULL_IO
-            app_call # common case
-          else # nil or len > 0
-            # since we don't do streaming input, we have no choice but
-            # to take over 100-continue handling from the Rack application
-            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
-              write(EXPECT_100_RESPONSE)
-              @env.delete(HTTP_EXPECT)
-            end
-            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
-            @hp.filter_body(@buf2 = @buf.dup, @buf)
-            @input << @buf2
-            on_read("")
-          end
-        when :body
-          if @hp.body_eof?
-            @state = :trailers
-            on_read(data)
-          elsif data.size > 0
-            @hp.filter_body(@buf2, @buf << data)
-            @input << @buf2
-            on_read("")
-          end
-        when :trailers
-          @hp.trailers(@env, @buf << data) and app_call
-        end
-        rescue Object => e
-          handle_error(e)
-      end
     end
 
     class Server < ::Rev::IO