about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/rev.rb150
-rwxr-xr-xt/bin/content-md5-put36
-rw-r--r--t/content-md5.ru23
-rwxr-xr-xt/t4000-rev-basic.sh51
-rw-r--r--t/t4000.ru3
-rwxr-xr-xt/t4002-rev-graceful.sh52
-rwxr-xr-xt/t4100-rev-rack-input.sh44
-rwxr-xr-xt/t4101-rev-rack-input-trailer.sh51
-rw-r--r--t/test-lib.sh8
10 files changed, 419 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 94246f5..c2813a5 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -47,6 +47,7 @@ module Rainbows
     :Revactor => 50,
     :ThreadSpawn => 30,
     :ThreadPool => 10,
+    :Rev => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
     autoload model, "rainbows/#{u.downcase!}"
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
new file mode 100644
index 0000000..8cd76ae
--- /dev/null
+++ b/lib/rainbows/rev.rb
@@ -0,0 +1,150 @@
+# -*- encoding: binary -*-
+require 'rev'
+
+# workaround revactor 0.1.4 still using the old Rev::Buffer
+# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html
+defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer
+
+module Rainbows
+
+  module Rev
+
+    # global vars because class/instance variables are confusing me :<
+    # this struct is only accessed inside workers and thus private to each
+    G = Struct.new(:nr, :max, :logger, :alive, :app).new
+
+    include Base
+
+    class Client < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::Rev::G
+
+      def initialize(io)
+        G.nr += 1
+        super(io)
+        @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
+        @env = {}
+        @hp = HttpParser.new
+        @state = :headers # [ :body [ :trailers ] ] :app_call :close
+        @buf = ""
+      end
+
+      def handle_error(e)
+        msg = case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+          ERROR_500_RESPONSE
+        when HttpParserError # try to tell the client they're bad
+          ERROR_400_RESPONSE
+        else
+          G.logger.error "Read error: #{e.inspect}"
+          G.logger.error e.backtrace.join("\n")
+          ERROR_500_RESPONSE
+        end
+        write(msg)
+        ensure
+          @state = :close
+      end
+
+      def app_call
+        @input.rewind
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        response = G.app.call(@env.update(RACK_DEFAULTS))
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+        HttpResponse.write(self, response, out)
+        if alive
+          @env.clear
+          @hp.reset
+          @state = :headers
+        else
+          @state = :close
+        end
+        rescue Object => e
+          handle_error(e)
+      end
+
+      def on_write_complete
+        :close == @state and close
+      end
+
+      def on_close
+        G.nr -= 1
+      end
+
+      def tmpio
+        io = Util.tmpio
+        def io.size
+          # already sync=true at creation, so no need to flush before stat
+          stat.size
+        end
+        io
+      end
+
+      # TeeInput doesn't map too well to this right now...
+      def on_read(data)
+        case @state
+        when :headers
+          @hp.headers(@env, @buf << data) or return
+          @state = :body
+          len = @hp.content_length
+          if len == 0
+            @input = HttpRequest::NULL_IO
+            app_call # common case
+          else # nil or len > 0
+            # since we don't do streaming input, we have no choice but
+            # to take over 100-continue handling from the Rack application
+            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+              write(EXPECT_100_RESPONSE)
+              @env.delete(HTTP_EXPECT)
+            end
+            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+            @hp.filter_body(@buf2 = @buf.dup, @buf)
+            @input << @buf2
+            on_read("")
+          end
+        when :body
+          if @hp.body_eof?
+            @state = :trailers
+            on_read(data)
+          elsif data.size > 0
+            @hp.filter_body(@buf2, @buf << data)
+            @input << @buf2
+            on_read("")
+          end
+        when :trailers
+          @hp.trailers(@env, @buf << data) and app_call
+        end
+      end
+    end
+
+    class Server < ::Rev::IO
+      G = Rainbows::Rev::G
+
+      def on_readable
+        return if G.nr >= G.max
+        begin
+          Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
+        rescue Errno::EAGAIN, Errno::ECONNBORTED
+        end
+      end
+
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given a INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      init_worker_process(worker)
+      G.nr = 0
+      G.max = worker_connections
+      G.alive = true
+      G.logger = logger
+      G.app = app
+      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
+      ::Rev::Loop.default.run
+    end
+
+  end
+end
diff --git a/t/bin/content-md5-put b/t/bin/content-md5-put
new file mode 100755
index 0000000..c35c92c
--- /dev/null
+++ b/t/bin/content-md5-put
@@ -0,0 +1,36 @@
+#!/usr/bin/env ruby
+# simple chunked HTTP PUT request generator (and just that),
+# it reads stdin and writes to stdout so socat can write to a
+# UNIX or TCP socket (or to another filter or file) along with
+# a Content-MD5 trailer.
+# -*- encoding: binary -*-
+require 'digest/md5'
+$stdout.sync = $stderr.sync = true
+$stdout.binmode
+$stdin.binmode
+
+bs = ENV['bs'] ? ENV['bs'].to_i : 4096
+
+if ARGV.grep("--no-headers").empty?
+  $stdout.write(
+      "PUT / HTTP/1.1\r\n" \
+      "Host: example.com\r\n" \
+      "Transfer-Encoding: chunked\r\n" \
+      "Trailer: Content-MD5\r\n" \
+      "\r\n"
+    )
+end
+
+digest = Digest::MD5.new
+if buf = $stdin.read(bs)
+  begin
+    digest.update(buf)
+    $stdout.write("%x\r\n" % [ buf.size ])
+    $stdout.write(buf)
+    $stdout.write("\r\n")
+  end while $stdin.read(bs, buf)
+end
+
+digest = [ digest.digest ].pack('m').strip
+$stdout.write("0\r\n")
+$stdout.write("Content-MD5: #{digest}\r\n\r\n")
diff --git a/t/content-md5.ru b/t/content-md5.ru
new file mode 100644
index 0000000..e3ce4d3
--- /dev/null
+++ b/t/content-md5.ru
@@ -0,0 +1,23 @@
+# SHA1 checksum generator
+bs = ENV['bs'] ? ENV['bs'].to_i : 4096
+require 'digest/md5'
+use Rack::ContentLength
+app = lambda do |env|
+  /\A100-continue\z/i =~ env['HTTP_EXPECT'] and
+    return [ 100, {}, [] ]
+  digest = Digest::MD5.new
+  input = env['rack.input']
+  if buf = input.read(bs)
+    begin
+      digest.update(buf)
+    end while input.read(bs, buf)
+  end
+
+  expect = env['HTTP_CONTENT_MD5']
+  readed = [ digest.digest ].pack('m').strip
+  body = "expect=#{expect}\nreaded=#{readed}\n"
+  status = expect == readed ? 200 : 500
+
+  [ status, {'Content-Type' => 'text/plain'}, [ body ] ]
+end
+run app
diff --git a/t/t4000-rev-basic.sh b/t/t4000-rev-basic.sh
new file mode 100755
index 0000000..e5cfcad
--- /dev/null
+++ b/t/t4000-rev-basic.sh
@@ -0,0 +1,51 @@
+#!/bin/sh
+. ./test-lib.sh
+require_rev
+
+eval $(unused_listen)
+rtmpfiles unicorn_config pid r_err r_out tmp fifo ok
+rm -f $fifo
+mkfifo $fifo
+
+nr_client=30
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+  worker_connections 50
+end
+EOF
+
+rainbows -D t4000.ru -c $unicorn_config
+wait_for_pid $pid
+
+echo "single request"
+curl -sSfv http://$listen/
+
+echo "two requests with keepalive"
+curl -sSfv http://$listen/a http://$listen/b > $tmp 2>&1
+grep 'Re-using existing connection' < $tmp
+
+echo "pipelining partial requests"
+req='GET / HTTP/1.1\r\nHost: example.com\r\n'
+(
+        printf "$req"'\r\n'"$req"
+        cat $fifo > $tmp &
+        sleep 1
+        printf 'Connection: close\r\n\r\n'
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen > $fifo
+
+kill $(cat $pid)
+
+test 2 -eq $(grep '^HTTP/1.1' $tmp | wc -l)
+test 2 -eq $(grep '^HTTP/1.1 200 OK' $tmp | wc -l)
+test 1 -eq $(grep '^Connection: keep-alive' $tmp | wc -l)
+test 1 -eq $(grep '^Connection: close' $tmp | wc -l)
+test x"$(cat $ok)" = xok
+! grep Error $r_err
diff --git a/t/t4000.ru b/t/t4000.ru
new file mode 100644
index 0000000..c2355da
--- /dev/null
+++ b/t/t4000.ru
@@ -0,0 +1,3 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env| [ 200, {}, [ env.inspect << "\n" ] ] }
diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh
new file mode 100755
index 0000000..e286886
--- /dev/null
+++ b/t/t4002-rev-graceful.sh
@@ -0,0 +1,52 @@
+#!/bin/sh
+. ./test-lib.sh
+require_rev
+
+eval $(unused_listen)
+rtmpfiles unicorn_config tmp pid r_err r_out out
+nr_client=10
+cat > $unicorn_config <<EOF
+listen "$listen"
+stderr_path "$r_err"
+stdout_path "$r_out"
+pid "$pid"
+Rainbows! do
+  use :Rev
+end
+EOF
+
+rainbows -D sleep.ru -c $unicorn_config
+wait_for_pid $pid
+
+for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
+do
+        (
+                rtmpfiles fifo tmp
+                rm -f $fifo
+                mkfifo $fifo
+                (
+                        printf 'GET /0 HTTP/1.1\r\n'
+                        cat $fifo > $tmp &
+                        sleep 1
+                        printf 'Host: example.com\r\n'
+                        sleep 1
+                        printf 'Connection: close\r\n'
+                        sleep 1
+                        printf '\r\n'
+                        wait
+                ) | socat - TCP:$listen > $fifo
+                fgrep 'Hello' $tmp >> $out || :
+                rm -f $fifo $tmp
+        ) &
+done
+
+sleep 2 # potentially racy :<
+kill -QUIT $(cat $pid)
+wait
+
+test x"$(wc -l < $out)" = x$nr_client
+nr=$(sort < $out | uniq | wc -l)
+test "$nr" -eq 1
+
+test x$(sort < $out | uniq) = xHello
+! grep Error $r_err
diff --git a/t/t4100-rev-rack-input.sh b/t/t4100-rev-rack-input.sh
new file mode 100755
index 0000000..2a37fed
--- /dev/null
+++ b/t/t4100-rev-rack-input.sh
@@ -0,0 +1,44 @@
+#!/bin/sh
+nr_client=${nr_client-25}
+nr=${nr-50}
+
+. ./test-lib.sh
+require_rev
+test -r random_blob || die "random_blob required, run with 'make $0'"
+
+eval $(unused_listen)
+rtmpfiles unicorn_config curl_out curl_err r_err r_out pid
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+  worker_connections $nr
+end
+EOF
+
+echo pid=$pid
+rainbows -D sha1.ru -c $unicorn_config
+wait_for_pid $pid
+
+start=$(date +%s)
+for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
+do
+        (
+                curl -sSf -T- http://$listen/$i \
+                  < random_blob >> $curl_out 2>> $curl_err
+        ) &
+done
+wait
+echo elapsed=$(( $(date +%s) - $start ))
+
+kill $(cat $pid)
+test $nr_client -eq $(wc -l < $curl_out)
+test 1 -eq $(sort < $curl_out | uniq | wc -l)
+blob_sha1=$( expr "$(sha1sum < random_blob)" : '\([a-f0-9]\+\)')
+echo blob_sha1=$blob_sha1
+test x"$blob_sha1" = x"$(sort < $curl_out | uniq)"
+! grep Error $r_err
diff --git a/t/t4101-rev-rack-input-trailer.sh b/t/t4101-rev-rack-input-trailer.sh
new file mode 100755
index 0000000..9dffc43
--- /dev/null
+++ b/t/t4101-rev-rack-input-trailer.sh
@@ -0,0 +1,51 @@
+#!/bin/sh
+nr_client=${nr_client-25}
+nr=${nr-50}
+
+. ./test-lib.sh
+require_rev
+test -r random_blob || die "random_blob required, run with 'make $0'"
+
+eval $(unused_listen)
+rtmpfiles unicorn_config tmp r_err r_out pid fifo ok
+rm -f $fifo
+mkfifo $fifo
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+end
+EOF
+
+rainbows -D content-md5.ru -c $unicorn_config
+wait_for_pid $pid
+
+echo "small blob"
+(
+        echo hello world | content-md5-put
+        cat $fifo > $tmp &
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen | tee $fifo
+
+fgrep 'HTTP/1.1 200 OK' $tmp
+test xok = x"$(cat $ok)"
+! grep Error $r_err
+
+
+echo "big blob"
+(
+        content-md5-put < random_blob
+        cat $fifo > $tmp &
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen | tee $fifo
+
+fgrep 'HTTP/1.1 200 OK' $tmp
+test xok = x"$(cat $ok)"
+! grep Error $r_err
+kill $(cat $pid)
diff --git a/t/test-lib.sh b/t/test-lib.sh
index d278329..26adfc9 100644
--- a/t/test-lib.sh
+++ b/t/test-lib.sh
@@ -42,6 +42,14 @@ require_revactor () {
         fi
 }
 
+require_rev() {
+        if ! $ruby -rrev -e "puts Rev::VERSION" >/dev/null 2>&1
+        then
+                echo >&2 "skipping $T since we don't have Rev"
+                exit 0
+        fi
+}
+
 # given a list of variable names, create temporary files and assign
 # the pathnames to those variables
 rtmpfiles () {