rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob e132f188267712fa4db0defa917161e7fd8a0a4f 2175 bytes (raw)
$ git show v0.6.0:lib/rainbows/ev_thread_core.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
 
# -*- encoding: binary -*-
require 'thread' # for Queue
require 'rainbows/ev_core'

module Rainbows

  # base module for mixed Thread + evented models like RevThreadSpawn
  module EvThreadCore
    include EvCore

    def post_init
      super
      @lock = Mutex.new
      @thread = nil
    end

    # we pass ourselves off as a Socket to Unicorn::TeeInput and this
    # is the only method Unicorn::TeeInput requires from the socket
    def readpartial(length, buf = "")
      # we must modify the original buffer if there was one
      length == 0 and return buf.replace("")

      # wait on the main loop to feed us
      while @tbuf.size == 0
        @tbuf.write(@state.pop)
        resume
      end
      buf.replace(@tbuf.read(length))
    end

    def app_spawn(input)
      begin
        @thread.nil? or @thread.join # only one thread per connection
        env = @env.dup
        alive, headers = @hp.keepalive?, @hp.headers?
        @thread = Thread.new(self) do |client|
          begin
            env[REMOTE_ADDR] = @remote_addr
            env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf)
            response = APP.call(env.update(RACK_DEFAULTS))
            if 100 == response.first.to_i
              write(EXPECT_100_RESPONSE)
              env.delete(HTTP_EXPECT)
              response = APP.call(env)
            end

            alive &&= G.alive
            out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers
            response_write(response, out)
          rescue => e
            handle_error(e) rescue nil
          end
        end
        if alive # in case we pipeline
          @hp.reset
          redo if @hp.headers(@env.clear, @buf)
        end
      end while false
    end

    def on_read(data)
      case @state
      when :headers
        @hp.headers(@env, @buf << data) or return
        if 0 == @hp.content_length
          app_spawn(HttpRequest::NULL_IO) # common case
        else # nil or len > 0
          @state, @tbuf = Queue.new, ::IO::Buffer.new
          app_spawn(nil)
        end
      when Queue
        pause
        @state << data
      end
      rescue => e
        handle_error(e)
    end

  end
end

git clone https://yhbt.net/rainbows.git