rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 039b7a6794bfbdcfc626257903467c623383c3d6 3723 bytes (raw)
$ git show HEAD:lib/rainbows/event_machine/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
 
# -*- encoding: binary -*-
# :enddoc:
class Rainbows::EventMachine::Client < EM::Connection
  include Rainbows::EvCore
  Rainbows.config!(self, :keepalive_timeout)

  # Keeps a reference to the socket object +io+ behind this connection.
  # +@deferred+ tracks a response that is still being produced; nil
  # means nothing is in flight.
  def initialize(io)
    @_io = io
    @deferred = nil
  end

  # +write+ is EM::Connection#send_data under another name; +hijacked+
  # is EM::Connection#detach and is what the response paths below
  # return once the socket has been taken over.
  alias write send_data
  alias hijacked detach

  # Accepts +data+ read from the peer.  It is also re-invoked with nil
  # (see the next_tick calls and the deferred callbacks in this class)
  # to work on bytes that are already sitting in +@buf+.
  def receive_data(data)
    unless @deferred
      # No response in flight: pass input to on_read, but only when
      # there is something to look at (leftover bytes or fresh data).
      readable = (@buf.size > 0) || data
      return readable ? on_read(data || ''.freeze) : nil
    end

    # To avoid clobbering the current streaming response
    # (often a static file), we do not attempt to process another
    # request on the same connection until the first is complete
    if data
      @buf << data
      # stop reading from a peer that keeps sending while we are busy,
      # once more than 0x1c000 (114688) bytes have been buffered
      @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
    end
    # look at the buffered bytes again on the next reactor tick
    EM.next_tick { receive_data(nil) } unless @buf.empty?
  end

  # Runs the inherited +quit+, then closes the connection once queued
  # output is written, unless a response is still in flight.
  def quit
    super
    return unless nil == @deferred

    close_connection_after_writing
  end

  # Fills in the per-request env entries, calls the Rack application and
  # dispatches its response.  +input+ becomes env['rack.input'].
  def app_call(input)
    # NOTE(review): 0 should switch the EventMachine inactivity timeout
    # off while the application runs -- confirm against the EM docs
    set_comm_inactivity_timeout(0)
    @env['rack.input'] = input
    @env['REMOTE_ADDR'] = @_io.kgio_addr
    @env['async.callback'] = method(:write_async_response)
    @env['async.close'] = EM::DefaultDeferrable.new
    @hp.hijack_setup(@_io)
    # an application may bail out of the call with "throw :async"
    status, headers, body = catch(:async) do
      APP.call(@env.merge!(RACK_DEFAULTS))
    end
    return hijacked if @hp.hijacked?

    # A nil or -1 status means no response is written here; presumably
    # the application answers later through env['async.callback'].
    async = (nil == status || -1 == status)
    return @deferred = true if async

    ev_write_response(status, headers, body, @hp.next?)
  end

  # Registers the failure handler on the in-flight response: close the
  # original body (when it can be closed), clear +@deferred+ and quit.
  def deferred_errback(orig_body)
    @deferred.errback do
      orig_body.close if orig_body.respond_to?(:close)
      @deferred = nil
      quit
    end
  end

  # Registers the success handler on the in-flight response: close the
  # original body (when it can be closed), clear +@deferred+, then
  # either resume reading (+alive+ is true) or quit.
  def deferred_callback(orig_body, alive)
    @deferred.callback do
      orig_body.close if orig_body.respond_to?(:close)
      @deferred = nil
      if alive
        receive_data(nil)
      else
        quit
      end
    end
  end

  # Writes a Rack response (+status+, +headers+, +body+).  +alive+ says
  # whether the connection stays open for another request afterwards.
  # Deferrable bodies, regular files and sockets/pipes are streamed
  # through +@deferred+; everything else goes through write_response.
  # Returns the result of +hijacked+ whenever a header/response writer
  # returns a false value (nil for stream_response_headers).
  def ev_write_response(status, headers, body, alive)
    @state = :headers if alive

    # Body behaves like a deferrable: it tells us when it is finished.
    if body.respond_to?(:errback) && body.respond_to?(:callback)
      return hijacked unless write_headers(status, headers, alive, body)

      @deferred = body
      write_body_each(body)
      deferred_errback(body)
      deferred_callback(body, alive)
      return
    end

    if body.respond_to?(:to_path)
      path = body.to_path
      st = File.stat(path)

      if st.file?
        return hijacked unless write_headers(status, headers, alive, body)

        @deferred = stream_file_data(path)
        deferred_errback(body)
        deferred_callback(body, alive)
        return
      end

      if st.socket? || st.pipe?
        chunk = stream_response_headers(status, headers, alive, body)
        return hijacked if nil == chunk

        @deferred = body
        io = body_to_io(body)
        # a true +chunk+ selects the chunking pipe handler
        handler = if chunk
          Rainbows::EventMachine::ResponseChunkPipe
        else
          Rainbows::EventMachine::ResponsePipe
        end
        watcher = EM.watch(io, handler, self)
        return(watcher.notify_readable = true)
      end
      # char or block device... WTF? fall through to body.each
    end

    return hijacked unless write_response(status, headers, body, alive)

    if alive && @deferred.nil?
      # keepalive with nothing in flight: wait for the next request, or
      # start on bytes that are already buffered
      if @buf.empty?
        set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
      else
        EM.next_tick { receive_data(nil) }
      end
    elsif !alive
      quit unless @deferred
    end
  end

  # Finishes the in-flight response: closes it when it can be closed,
  # clears +@deferred+, then resumes reading or quits depending on
  # whether the parser reports keepalive.
  def next!
    @deferred.close if @deferred.respond_to?(:close)
    @deferred = nil
    if @hp.keepalive?
      receive_data(nil)
    else
      quit
    end
  end

  # Connection teardown.  Does nothing for hijacked sockets; otherwise
  # succeeds env['async.close'] when present, fails an in-flight
  # deferrable and closes the socket object.
  def unbind
    return if @hp.hijacked?

    async_close = @env['async.close']
    async_close.succeed if async_close
    @deferred.fail if @deferred.respond_to?(:fail)
    begin
      @_io.close
    rescue Errno::EBADF
      # EventMachine's EventableDescriptor::Close() may close
      # the underlying file descriptor without invalidating the
      # associated IO object on errors, so @_io.closed? isn't
      # sufficient.
    end
  end
end

git clone https://yhbt.net/rainbows.git