about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-25 22:50:59 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-26 02:26:22 -0700
commitc52c26c126f5c7d3871257fa41651fc91ccfd20e (patch)
tree2d2bcf5512f9831ff4f177aa0c2682bd75a9c436
parente4552b3db07fb5fb4816eae89f7feed50e5cd629 (diff)
downloadrainbows-c52c26c126f5c7d3871257fa41651fc91ccfd20e.tar.gz
This means Rainbows::DevFdBody async responses and large
file streaming without slurping.

This is only with eventmachine 0.12.8, it looks like 0.12.10
changes the attach/watch API...
-rw-r--r--lib/rainbows/event_machine.rb94
-rw-r--r--lib/rainbows/http_server.rb5
-rw-r--r--local.mk.sample2
-rw-r--r--t/GNUmakefile4
4 files changed, 84 insertions, 21 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 176bf51..6fe8e85 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -50,40 +50,99 @@ module Rainbows
           response = G.app.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+          response_write(response, out, alive)
 
-          HttpResponse.write(self, response, out)
           if alive
             @env.clear
             @hp.reset
             @state = :headers
             # keepalive requests are always body-less, so @input is unchanged
             @hp.headers(@env, @buf) and next
-          else
-            quit
           end
           return
         end while true
       end
 
-      def on_write_complete
-        if body = @deferred_bodies.first
-          return if DeferredResponse === body
-          begin
-            begin
-              write(body.sysread(CHUNK_SIZE))
-            rescue EOFError # expected at file EOF
-              @deferred_bodies.shift
-              body.close
-              close if :close == @state && @deferred_bodies.empty?
-            end
-          rescue Object => e
-            handle_error(e)
+      def response_write(response, out, alive)
+        body = response.last
+        unless body.respond_to?(:to_path)
+          HttpResponse.write(self, response, out)
+          quit unless alive
+          return
+        end
+
+        headers = Rack::Utils::HeaderHash.new(response[1])
+        path = body.to_path
+        io = body.to_io if body.respond_to?(:to_io)
+        io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(path, 'rb') # could be a named pipe
+
+        st = io.stat
+        if st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+          response = [ response.first, headers.to_hash, [] ]
+          HttpResponse.write(self, response, out)
+          stream = stream_file_data(path)
+          stream.callback { quit } unless alive
+        elsif st.socket? || st.pipe?
+          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+          if out.nil?
+            do_chunk = false
+          else
+            out[0] = CONN_CLOSE
+          end
+          response = [ response.first, headers.to_hash, [] ]
+          HttpResponse.write(self, response, out)
+          if do_chunk
+            EM.attach(io, ResponseChunkPipe, io, self)
+          else
+            EM.enable_proxy(EM.attach(io, ResponsePipe, io, self), self)
           end
         else
-          close if :close == @state
+          HttpResponse.write(self, response, out)
         end
       end
 
+      def unbind
+        @_io.close
+      end
+    end
+
+    module ResponsePipe
+      def initialize(io, client)
+        @io, @client = io, client
+      end
+
+      def unbind
+        @io.close
+        @client.quit
+      end
+    end
+
+    module ResponseChunkPipe
+      include ResponsePipe
+
+      def unbind
+        @client.write("0\r\n\r\n")
+        super
+      end
+
+      def notify_readable
+        begin
+          data = begin
+            @io.read_nonblock(16384)
+          rescue Errno::EINTR
+            retry
+          rescue Errno::EAGAIN
+            return
+          end
+          @client.send_data(sprintf("%x\r\n", data.size))
+          @client.send_data(data)
+          @client.send_data("\r\n")
+        end while true
+      end
     end
 
     module Server
@@ -116,6 +175,7 @@ module Rainbows
     def worker_loop(worker)
       init_worker_process(worker)
       m = 0
+      logger.info "EventMachine: epoll=#{EM.epoll} kqueue=#{EM.kqueue}"
       EM.run {
         conns = EM.instance_variable_get(:@conns) or
           raise RuntimeError, "EM @conns instance variable not accessible!"
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
index 5521513..4c4b63b 100644
--- a/lib/rainbows/http_server.rb
+++ b/lib/rainbows/http_server.rb
@@ -33,7 +33,10 @@ module Rainbows
       extend(mod)
       Const::RACK_DEFAULTS['rainbows.model'] = @use = model
       Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s)
-      Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev")
+      case model
+      when :Rev, :EventMachine
+        Const::RACK_DEFAULTS['rainbows.autochunk'] = true
+      end
     end
 
     def worker_connections(*args)
diff --git a/local.mk.sample b/local.mk.sample
index 59a8e6a..2b01c46 100644
--- a/local.mk.sample
+++ b/local.mk.sample
@@ -5,7 +5,7 @@
 # This is depends on a bunch of GNU-isms from bash, sed, touch.
 
 DLEXT := so
-gems := rev-0.3.1 rack-1.0.0 iobuffer-0.1.1
+gems := rev-0.3.1 rack-1.0.0 iobuffer-0.1.1 eventmachine-0.12.8
 
 # Avoid loading rubygems to speed up tests because gmake is
 # fork+exec heavy with Ruby.
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 4fcbc81..4cfcfbf 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -16,8 +16,8 @@ else
 endif
 export RUBYLIB RUBY_VERSION
 
-models := ThreadPool ThreadSpawn Revactor Rev
-all_models := $(models) Base EventMachine
+models := ThreadPool ThreadSpawn Revactor Rev EventMachine
+all_models := $(models) Base
 
 T = $(wildcard t[0-9][0-9][0-9][0-9]-*.sh)