diff options
-rw-r--r-- | lib/rainbows.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/rev.rb | 150 | ||||
-rwxr-xr-x | t/bin/content-md5-put | 36 | ||||
-rw-r--r-- | t/content-md5.ru | 23 | ||||
-rwxr-xr-x | t/t4000-rev-basic.sh | 51 | ||||
-rw-r--r-- | t/t4000.ru | 3 | ||||
-rwxr-xr-x | t/t4002-rev-graceful.sh | 52 | ||||
-rwxr-xr-x | t/t4100-rev-rack-input.sh | 44 | ||||
-rwxr-xr-x | t/t4101-rev-rack-input-trailer.sh | 51 | ||||
-rw-r--r-- | t/test-lib.sh | 8 |
10 files changed, 419 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 94246f5..c2813a5 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -47,6 +47,7 @@ module Rainbows :Revactor => 50, :ThreadSpawn => 30, :ThreadPool => 10, + :Rev => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } autoload model, "rainbows/#{u.downcase!}" diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb new file mode 100644 index 0000000..8cd76ae --- /dev/null +++ b/lib/rainbows/rev.rb @@ -0,0 +1,150 @@ +# -*- encoding: binary -*- +require 'rev' + +# workaround revactor 0.1.4 still using the old Rev::Buffer +# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html +defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer + +module Rainbows + + module Rev + + # global vars because class/instance variables are confusing me :< + # this struct is only accessed inside workers and thus private to each + G = Struct.new(:nr, :max, :logger, :alive, :app).new + + include Base + + class Client < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::Rev::G + + def initialize(io) + G.nr += 1 + super(io) + @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST + @env = {} + @hp = HttpParser.new + @state = :headers # [ :body [ :trailers ] ] :app_call :close + @buf = "" + end + + def handle_error(e) + msg = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + ERROR_500_RESPONSE + when HttpParserError # try to tell the client they're bad + ERROR_400_RESPONSE + else + G.logger.error "Read error: #{e.inspect}" + G.logger.error e.backtrace.join("\n") + ERROR_500_RESPONSE + end + write(msg) + ensure + @state = :close + end + + def app_call + @input.rewind + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + response = G.app.call(@env.update(RACK_DEFAULTS)) + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + HttpResponse.write(self, response, out) + if alive + @env.clear + @hp.reset + @state = :headers + else + @state = :close + end + rescue Object => e + handle_error(e) + end + + def on_write_complete + :close == @state and close + end + + def on_close + G.nr -= 1 + end + + def tmpio + io = Util.tmpio + def io.size + # already sync=true at creation, so no need to flush before stat + stat.size + end + io + end + + # TeeInput doesn't map too well to this right now... + def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + @state = :body + len = @hp.content_length + if len == 0 + @input = HttpRequest::NULL_IO + app_call # common case + else # nil or len > 0 + # since we don't do streaming input, we have no choice but + # to take over 100-continue handling from the Rack application + if @env[HTTP_EXPECT] =~ /\A100-continue\z/i + write(EXPECT_100_RESPONSE) + @env.delete(HTTP_EXPECT) + end + @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio + @hp.filter_body(@buf2 = @buf.dup, @buf) + @input << @buf2 + on_read("") + end + when :body + if @hp.body_eof? + @state = :trailers + on_read(data) + elsif data.size > 0 + @hp.filter_body(@buf2, @buf << data) + @input << @buf2 + on_read("") + end + when :trailers + @hp.trailers(@env, @buf << data) and app_call + end + end + end + + class Server < ::Rev::IO + G = Rainbows::Rev::G + + def on_readable + return if G.nr >= G.max + begin + Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) + rescue Errno::EAGAIN, Errno::ECONNBORTED + end + end + + end + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + init_worker_process(worker) + G.nr = 0 + G.max = worker_connections + G.alive = true + G.logger = logger + G.app = app + LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } + ::Rev::Loop.default.run + end + + end +end diff --git a/t/bin/content-md5-put b/t/bin/content-md5-put new file mode 100755 index 0000000..c35c92c --- /dev/null +++ b/t/bin/content-md5-put @@ -0,0 +1,36 @@ +#!/usr/bin/env ruby +# simple chunked HTTP PUT request generator (and just that), +# it reads stdin and writes to stdout so socat can write to a +# UNIX or TCP socket (or to another filter or file) along with +# a Content-MD5 trailer. +# -*- encoding: binary -*- +require 'digest/md5' +$stdout.sync = $stderr.sync = true +$stdout.binmode +$stdin.binmode + +bs = ENV['bs'] ? ENV['bs'].to_i : 4096 + +if ARGV.grep("--no-headers").empty? + $stdout.write( + "PUT / HTTP/1.1\r\n" \ + "Host: example.com\r\n" \ + "Transfer-Encoding: chunked\r\n" \ + "Trailer: Content-MD5\r\n" \ + "\r\n" + ) +end + +digest = Digest::MD5.new +if buf = $stdin.read(bs) + begin + digest.update(buf) + $stdout.write("%x\r\n" % [ buf.size ]) + $stdout.write(buf) + $stdout.write("\r\n") + end while $stdin.read(bs, buf) +end + +digest = [ digest.digest ].pack('m').strip +$stdout.write("0\r\n") +$stdout.write("Content-MD5: #{digest}\r\n\r\n") diff --git a/t/content-md5.ru b/t/content-md5.ru new file mode 100644 index 0000000..e3ce4d3 --- /dev/null +++ b/t/content-md5.ru @@ -0,0 +1,23 @@ +# SHA1 checksum generator +bs = ENV['bs'] ? ENV['bs'].to_i : 4096 +require 'digest/md5' +use Rack::ContentLength +app = lambda do |env| + /\A100-continue\z/i =~ env['HTTP_EXPECT'] and + return [ 100, {}, [] ] + digest = Digest::MD5.new + input = env['rack.input'] + if buf = input.read(bs) + begin + digest.update(buf) + end while input.read(bs, buf) + end + + expect = env['HTTP_CONTENT_MD5'] + readed = [ digest.digest ].pack('m').strip + body = "expect=#{expect}\nreaded=#{readed}\n" + status = expect == readed ? 200 : 500 + + [ status, {'Content-Type' => 'text/plain'}, [ body ] ] +end +run app diff --git a/t/t4000-rev-basic.sh b/t/t4000-rev-basic.sh new file mode 100755 index 0000000..e5cfcad --- /dev/null +++ b/t/t4000-rev-basic.sh @@ -0,0 +1,51 @@ +#!/bin/sh +. ./test-lib.sh +require_rev + +eval $(unused_listen) +rtmpfiles unicorn_config pid r_err r_out tmp fifo ok +rm -f $fifo +mkfifo $fifo + +nr_client=30 + +cat > $unicorn_config <<EOF +listen "$listen" +pid "$pid" +stderr_path "$r_err" +stdout_path "$r_out" +Rainbows! do + use :Rev + worker_connections 50 +end +EOF + +rainbows -D t4000.ru -c $unicorn_config +wait_for_pid $pid + +echo "single request" +curl -sSfv http://$listen/ + +echo "two requests with keepalive" +curl -sSfv http://$listen/a http://$listen/b > $tmp 2>&1 +grep 'Re-using existing connection' < $tmp + +echo "pipelining partial requests" +req='GET / HTTP/1.1\r\nHost: example.com\r\n' +( + printf "$req"'\r\n'"$req" + cat $fifo > $tmp & + sleep 1 + printf 'Connection: close\r\n\r\n' + wait + echo ok > $ok +) | socat - TCP:$listen > $fifo + +kill $(cat $pid) + +test 2 -eq $(grep '^HTTP/1.1' $tmp | wc -l) +test 2 -eq $(grep '^HTTP/1.1 200 OK' $tmp | wc -l) +test 1 -eq $(grep '^Connection: keep-alive' $tmp | wc -l) +test 1 -eq $(grep '^Connection: close' $tmp | wc -l) +test x"$(cat $ok)" = xok +! grep Error $r_err diff --git a/t/t4000.ru b/t/t4000.ru new file mode 100644 index 0000000..c2355da --- /dev/null +++ b/t/t4000.ru @@ -0,0 +1,3 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| [ 200, {}, [ env.inspect << "\n" ] ] } diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh new file mode 100755 index 0000000..e286886 --- /dev/null +++ b/t/t4002-rev-graceful.sh @@ -0,0 +1,52 @@ +#!/bin/sh +. ./test-lib.sh +require_rev + +eval $(unused_listen) +rtmpfiles unicorn_config tmp pid r_err r_out out +nr_client=10 +cat > $unicorn_config <<EOF +listen "$listen" +stderr_path "$r_err" +stdout_path "$r_out" +pid "$pid" +Rainbows! do + use :Rev +end +EOF + +rainbows -D sleep.ru -c $unicorn_config +wait_for_pid $pid + +for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null) +do + ( + rtmpfiles fifo tmp + rm -f $fifo + mkfifo $fifo + ( + printf 'GET /0 HTTP/1.1\r\n' + cat $fifo > $tmp & + sleep 1 + printf 'Host: example.com\r\n' + sleep 1 + printf 'Connection: close\r\n' + sleep 1 + printf '\r\n' + wait + ) | socat - TCP:$listen > $fifo + fgrep 'Hello' $tmp >> $out || : + rm -f $fifo $tmp + ) & +done + +sleep 2 # potentially racy :< +kill -QUIT $(cat $pid) +wait + +test x"$(wc -l < $out)" = x$nr_client +nr=$(sort < $out | uniq | wc -l) +test "$nr" -eq 1 + +test x$(sort < $out | uniq) = xHello +! grep Error $r_err diff --git a/t/t4100-rev-rack-input.sh b/t/t4100-rev-rack-input.sh new file mode 100755 index 0000000..2a37fed --- /dev/null +++ b/t/t4100-rev-rack-input.sh @@ -0,0 +1,44 @@ +#!/bin/sh +nr_client=${nr_client-25} +nr=${nr-50} + +. ./test-lib.sh +require_rev +test -r random_blob || die "random_blob required, run with 'make $0'" + +eval $(unused_listen) +rtmpfiles unicorn_config curl_out curl_err r_err r_out pid + +cat > $unicorn_config <<EOF +listen "$listen" +pid "$pid" +stderr_path "$r_err" +stdout_path "$r_out" +Rainbows! do + use :Rev + worker_connections $nr +end +EOF + +echo pid=$pid +rainbows -D sha1.ru -c $unicorn_config +wait_for_pid $pid + +start=$(date +%s) +for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null) +do + ( + curl -sSf -T- http://$listen/$i \ + < random_blob >> $curl_out 2>> $curl_err + ) & +done +wait +echo elapsed=$(( $(date +%s) - $start )) + +kill $(cat $pid) +test $nr_client -eq $(wc -l < $curl_out) +test 1 -eq $(sort < $curl_out | uniq | wc -l) +blob_sha1=$( expr "$(sha1sum < random_blob)" : '\([a-f0-9]\+\)') +echo blob_sha1=$blob_sha1 +test x"$blob_sha1" = x"$(sort < $curl_out | uniq)" +! grep Error $r_err diff --git a/t/t4101-rev-rack-input-trailer.sh b/t/t4101-rev-rack-input-trailer.sh new file mode 100755 index 0000000..9dffc43 --- /dev/null +++ b/t/t4101-rev-rack-input-trailer.sh @@ -0,0 +1,51 @@ +#!/bin/sh +nr_client=${nr_client-25} +nr=${nr-50} + +. ./test-lib.sh +require_rev +test -r random_blob || die "random_blob required, run with 'make $0'" + +eval $(unused_listen) +rtmpfiles unicorn_config tmp r_err r_out pid fifo ok +rm -f $fifo +mkfifo $fifo + +cat > $unicorn_config <<EOF +listen "$listen" +pid "$pid" +stderr_path "$r_err" +stdout_path "$r_out" +Rainbows! do + use :Rev +end +EOF + +rainbows -D content-md5.ru -c $unicorn_config +wait_for_pid $pid + +echo "small blob" +( + echo hello world | content-md5-put + cat $fifo > $tmp & + wait + echo ok > $ok +) | socat - TCP:$listen | tee $fifo + +fgrep 'HTTP/1.1 200 OK' $tmp +test xok = x"$(cat $ok)" +! grep Error $r_err + + +echo "big blob" +( + content-md5-put < random_blob + cat $fifo > $tmp & + wait + echo ok > $ok +) | socat - TCP:$listen | tee $fifo + +fgrep 'HTTP/1.1 200 OK' $tmp +test xok = x"$(cat $ok)" +! grep Error $r_err +kill $(cat $pid) diff --git a/t/test-lib.sh b/t/test-lib.sh index d278329..26adfc9 100644 --- a/t/test-lib.sh +++ b/t/test-lib.sh @@ -42,6 +42,14 @@ require_revactor () { fi } +require_rev() { + if ! $ruby -rrev -e "puts Rev::VERSION" >/dev/null 2>&1 + then + echo >&2 "skipping $T since we don't have Rev" + exit 0 + fi +} + # given a list of variable names, create temporary files and assign # the pathnames to those variables rtmpfiles () { |