about summary refs log tree commit homepage
path: root/lib/rainbows/event_machine.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-25 22:50:59 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-26 02:26:22 -0700
commitc52c26c126f5c7d3871257fa41651fc91ccfd20e (patch)
tree2d2bcf5512f9831ff4f177aa0c2682bd75a9c436 /lib/rainbows/event_machine.rb
parente4552b3db07fb5fb4816eae89f7feed50e5cd629 (diff)
downloadrainbows-c52c26c126f5c7d3871257fa41651fc91ccfd20e.tar.gz
This means Rainbows::DevFdBody async responses and large
file streaming without slurping.

This is only with eventmachine 0.12.8, it looks like 0.12.10
changes the attach/watch API...
Diffstat (limited to 'lib/rainbows/event_machine.rb')
-rw-r--r--lib/rainbows/event_machine.rb94
1 files changed, 77 insertions, 17 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 176bf51..6fe8e85 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -50,40 +50,99 @@ module Rainbows
           response = G.app.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+          response_write(response, out, alive)
 
-          HttpResponse.write(self, response, out)
           if alive
             @env.clear
             @hp.reset
             @state = :headers
             # keepalive requests are always body-less, so @input is unchanged
             @hp.headers(@env, @buf) and next
-          else
-            quit
           end
           return
         end while true
       end
 
-      def on_write_complete
-        if body = @deferred_bodies.first
-          return if DeferredResponse === body
-          begin
-            begin
-              write(body.sysread(CHUNK_SIZE))
-            rescue EOFError # expected at file EOF
-              @deferred_bodies.shift
-              body.close
-              close if :close == @state && @deferred_bodies.empty?
-            end
-          rescue Object => e
-            handle_error(e)
+      def response_write(response, out, alive)
+        body = response.last
+        unless body.respond_to?(:to_path)
+          HttpResponse.write(self, response, out)
+          quit unless alive
+          return
+        end
+
+        headers = Rack::Utils::HeaderHash.new(response[1])
+        path = body.to_path
+        io = body.to_io if body.respond_to?(:to_io)
+        io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(path, 'rb') # could be a named pipe
+
+        st = io.stat
+        if st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+          response = [ response.first, headers.to_hash, [] ]
+          HttpResponse.write(self, response, out)
+          stream = stream_file_data(path)
+          stream.callback { quit } unless alive
+        elsif st.socket? || st.pipe?
+          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+          if out.nil?
+            do_chunk = false
+          else
+            out[0] = CONN_CLOSE
+          end
+          response = [ response.first, headers.to_hash, [] ]
+          HttpResponse.write(self, response, out)
+          if do_chunk
+            EM.attach(io, ResponseChunkPipe, io, self)
+          else
+            EM.enable_proxy(EM.attach(io, ResponsePipe, io, self), self)
           end
         else
-          close if :close == @state
+          HttpResponse.write(self, response, out)
         end
       end
 
+      def unbind
+        @_io.close
+      end
+    end
+
+    module ResponsePipe
+      def initialize(io, client)
+        @io, @client = io, client
+      end
+
+      def unbind
+        @io.close
+        @client.quit
+      end
+    end
+
+    module ResponseChunkPipe
+      include ResponsePipe
+
+      def unbind
+        @client.write("0\r\n\r\n")
+        super
+      end
+
+      def notify_readable
+        begin
+          data = begin
+            @io.read_nonblock(16384)
+          rescue Errno::EINTR
+            retry
+          rescue Errno::EAGAIN
+            return
+          end
+          @client.send_data(sprintf("%x\r\n", data.size))
+          @client.send_data(data)
+          @client.send_data("\r\n")
+        end while true
+      end
     end
 
     module Server
@@ -116,6 +175,7 @@ module Rainbows
     def worker_loop(worker)
       init_worker_process(worker)
       m = 0
+      logger.info "EventMachine: epoll=#{EM.epoll} kqueue=#{EM.kqueue}"
       EM.run {
         conns = EM.instance_variable_get(:@conns) or
           raise RuntimeError, "EM @conns instance variable not accessible!"