From 15631717fce044fbad2f386a7b1c7daf4bdd83d2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Oct 2010 16:25:39 -0700 Subject: code shuffling for kgio Despite the large number of changes, most of it is code movement here. --- lib/rainbows/fiber/io.rb | 232 +++++++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 108 deletions(-) (limited to 'lib/rainbows/fiber/io.rb') diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 571f070..711d95e 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -1,117 +1,133 @@ # -*- encoding: binary -*- -module Rainbows - module Fiber - - # A partially complete IO wrapper, this exports an IO.select()-able - # #to_io method and gives users the illusion of a synchronous - # interface that yields away from the current Fiber whenever - # the underlying IO object cannot read or write - # - # TODO: subclass off IO and include Kgio::SocketMethods instead - class IO < Struct.new(:to_io, :f) - # :stopdoc: - LOCALHOST = Kgio::LOCALHOST - - # needed to write errors with - def write_nonblock(buf) - to_io.write_nonblock(buf) - end - - def kgio_addr - to_io.kgio_addr - end - - # for wrapping output response bodies - def each(&block) - if buf = readpartial(16384) - yield buf - yield buf while readpartial(16384, buf) + +# A Fiber-aware IO class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +# +# This is a stable, legacy interface and should be preserved for all +# future versions of Rainbows! However, new apps should use +# Rainbows::Fiber::IO::Socket or Rainbows::Fiber::IO::Pipe instead. + +class Rainbows::Fiber::IO + attr_accessor :to_io + + # :stopdoc: + class << self + alias :[] :new + end + # :startdoc: + + # needed to write errors with + def write_nonblock(buf) + @to_io.write_nonblock(buf) + end + + def kgio_addr + @to_io.kgio_addr + end + + # for wrapping output response bodies + def each(&block) + buf = readpartial(16384) + yield buf + yield buf while readpartial(16384, buf) + rescue EOFError + self + end + + def closed? + @to_io.closed? + end + + def fileno + @to_io.fileno + end + + def write(buf) + if @to_io.respond_to?(:kgio_trywrite) + begin + case rv = @to_io.kgio_trywrite(buf) + when nil + return + when String + buf = rv + when Kgio::WaitWritable + wait_writable end - rescue EOFError - self - end - - def close - fileno = to_io.fileno - RD[fileno] = WR[fileno] = nil - to_io.close unless to_io.closed? - end - - def closed? - to_io.closed? - end - - def wait_readable - fileno = to_io.fileno - RD[fileno] = self - ::Fiber.yield - RD[fileno] = nil - end - - def wait_writable - fileno = to_io.fileno - WR[fileno] = self - ::Fiber.yield - WR[fileno] = nil - end - - def write(buf) - begin - case rv = to_io.kgio_trywrite(buf) - when nil - return - when String - buf = rv - when Kgio::WaitWritable - wait_writable - end - end while true - end - - # used for reading headers (respecting keepalive_timeout) - def read_timeout - expire = nil - begin - to_io.read_nonblock(16384) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato + end while true + else + begin + (rv = @to_io.write_nonblock(buf)) == buf.bytesize and return + buf = byte_slice(buf, rv..-1) + rescue Errno::EAGAIN + wait_writable + end while true + end + end + + def byte_slice(buf, range) # :nodoc: + if buf.encoding != Encoding::BINARY + buf.dup.force_encoding(Encoding::BINARY)[range] + else + buf[range] + end + end + + # used for reading headers (respecting keepalive_timeout) + def read_timeout + expire = nil + begin + return @to_io.read_nonblock(16384) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + end while true + end + + def readpartial(length, buf = "") + if @to_io.respond_to?(:kgio_tryread) + begin + rv = @to_io.kgio_tryread(length, buf) + case rv + when nil + raise EOFError, "end of file reached", [] + when Kgio::WaitReadable wait_readable - retry - end - end - - def readpartial(length, buf = "") - if to_io.respond_to?(:kgio_tryread) - # TODO: use kgio_read! - begin - rv = to_io.kgio_tryread(length, buf) - case rv - when nil - raise EOFError, "end of file reached", [] - when Kgio::WaitReadable - wait_readable - else - return rv - end - end while true else - begin - to_io.read_nonblock(length, buf) - rescue Errno::EAGAIN - wait_readable - retry - end + return rv end - end + end while true + else + begin + return @to_io.read_nonblock(length, buf) + rescue Errno::EAGAIN + wait_readable + end while true + end + end - def kgio_read(*args) - to_io.kgio_read(*args) - end + def kgio_read(*args) + @to_io.kgio_read(*args) + end - def kgio_read!(*args) - to_io.kgio_read!(*args) - end - end + def kgio_read!(*args) + @to_io.kgio_read!(*args) end + + def kgio_trywrite(*args) + @to_io.kgio_trywrite(*args) + end + + autoload :Socket, 'rainbows/fiber/io/socket' + autoload :Pipe, 'rainbows/fiber/io/pipe' end + +# :stopdoc: +require 'rainbows/fiber/io/methods' +require 'rainbows/fiber/io/compat' +Rainbows::Client.__send__(:include, Rainbows::Fiber::IO::Methods) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Compat) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Methods) +Kgio.wait_readable = :wait_readable +Kgio.wait_writable = :wait_writable -- cgit v1.2.3-24-ge0c7