From 40445641f11f01c6a24bf96c8b80eed5fd33a512 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 28 Dec 2010 17:59:27 -0800 Subject: complete Rev => Coolio renaming We use Cool.io internally everywhere now, but preserve Rev-based models for anybody using them. --- lib/rainbows/coolio/client.rb | 191 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 lib/rainbows/coolio/client.rb (limited to 'lib/rainbows/coolio/client.rb') diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb new file mode 100644 index 0000000..7ecea3c --- /dev/null +++ b/lib/rainbows/coolio/client.rb @@ -0,0 +1,191 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Coolio::Client < Coolio::IO + include Rainbows::EvCore + G = Rainbows::G + SF = Rainbows::StreamFile + CONN = Rainbows::Coolio::CONN + KATO = Rainbows::Coolio::KATO + DeferredResponse = Rainbows::Coolio::DeferredResponse + DeferredChunkResponse = Rainbows::Coolio::DeferredChunkResponse + + def initialize(io) + CONN[self] = false + super(io) + post_init + @deferred = nil + end + + def want_more + enable unless enabled? + end + + def quit + super + close if @deferred.nil? && @_write_buffer.empty? + end + + # override the Coolio::IO#write method try to write directly to the + # kernel socket buffers to avoid an extra userspace copy if + # possible. + def write(buf) + if @_write_buffer.empty? + begin + case rv = @_io.kgio_trywrite(buf) + when nil + return enable_write_watcher + when :wait_writable + break # fall through to super(buf) + when String + buf = rv # retry, skb could grow or been drained + end + rescue => e + return handle_error(e) + end while true + end + super(buf) + end + + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick DeferredResponse bodies in + # here to prevent connections from being closed on us. + def defer_body(io) + @deferred = io + enable_write_watcher + end + + # allows enabling of write watcher even when read watcher is disabled + def evloop + LOOP # this constant is set in when a worker starts + end + + def next! + attached? or return + @deferred = nil + enable_write_watcher + end + + def timeout? + @deferred.nil? && @_write_buffer.empty? and close.nil? + end + + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + c = stream_response_headers(status, headers) if headers + # we only want to attach to the Coolio::Loop belonging to the + # main thread in Ruby 1.9 + io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) + defer_body(io.attach(LOOP)) + end + + def coolio_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + offset, count = 0, st.size + if headers + if range = make_range!(@env, status, headers) + status, offset, count = range + end + write(response_header(status, headers)) + end + return defer_body(SF.new(offset, count, io, body)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + write(response_header(status, headers)) if headers + write_body_each(self, body, nil) + end + + def app_call + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + coolio_write_response(response, alive = @hp.next? && G.alive) + return quit unless alive && :close != @state + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF + close_deferred + end + end + + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @buf.empty? + unless enabled? + enable + KATO[self] = Time.now + end + else + on_read("") + end + end + rescue => e + handle_error(e) + end + + def handle_error(e) + close_deferred + if msg = Rainbows::Error.response(e) + @_io.kgio_trywrite(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + + def close_deferred + case @deferred + when DeferredResponse, NilClass + else + begin + @deferred.close + rescue => e + G.server.logger.error("closing #@deferred: #{e}") + end + @deferred = nil + end + end + + def on_close + close_deferred + CONN.delete(self) + KATO.delete(self) + end +end -- cgit v1.2.3-24-ge0c7