diff options
author | Eric Wong <normalperson@yhbt.net> | 2010-07-29 07:05:48 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2010-07-29 08:04:03 +0000 |
commit | 5479b15c766204e31495e87a64fa689141cc38a3 (patch) | |
tree | 022b17ec84c6ceebc95b5cdf6088878245bf12e5 /lib/rainbows/revactor | |
parent | f309cfaf70cbffd7a39208da869e47784e4cb41b (diff) | |
download | rainbows-5479b15c766204e31495e87a64fa689141cc38a3.tar.gz |
Proxying regular Ruby IO objects while Revactor is in use is highly suboptimal, so proxy it with an Actor-aware wrapper for better scheduling.
Diffstat (limited to 'lib/rainbows/revactor')
-rw-r--r-- | lib/rainbows/revactor/proxy.rb | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/lib/rainbows/revactor/proxy.rb b/lib/rainbows/revactor/proxy.rb new file mode 100644 index 0000000..a7d3be1 --- /dev/null +++ b/lib/rainbows/revactor/proxy.rb @@ -0,0 +1,55 @@ +# -*- encoding: binary -*- +# :enddoc: +# Generic IO wrapper for proxying pipe and socket objects +# this behaves more like Rainbows::Fiber::IO than anything, +# making it highly suitable for proxying data from pipes/sockets +class Rainbows::Revactor::Proxy < Rev::IO + def initialize(io) + @receiver = Actor.current + super(io) + attach(Rev::Loop.default) + end + + def close + if @_io + super + @_io = nil + end + end + + def each(&block) + # when yield-ing, Revactor::TCP#write may raise EOFError + # (instead of Errno::EPIPE), so we need to limit the rescue + # to just readpartial and let EOFErrors during yield bubble up + begin + buf = readpartial(INPUT_SIZE) + rescue EOFError + break + end while yield(buf) || true + end + + # this may return more than the specified length, Rainbows! won't care... + def readpartial(length) + @receiver = Actor.current + enable if attached? && ! enabled? + + Actor.receive do |filter| + filter.when(T[:rainbows_io_input, self]) do |_, _, data| + return data + end + + filter.when(T[:rainbows_io_closed, self]) do + raise EOFError, "connection closed" + end + end + end + + def on_close + @receiver << T[:rainbows_io_closed, self] + end + + def on_read(data) + @receiver << T[:rainbows_io_input, self, data ] + disable + end +end |