From 8b76d98f80b5ab231634f6e0253a2d71c41e02f8 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 17 Dec 2015 03:57:38 +0000 Subject: replicate: avoid buffered IO on reads Perl buffered IO is only reading 8K at a time (or only 4K on older versions!) despite us requesting to read in 1MB chunks. This wastes syscalls and can affect TCP window scaling when MogileFS is replicating across long fat networks (LFN). While we're at it, this fixes a long-standing FIXME item to perform proper timeouts when reading headers as we're forced to do sysread instead of line-buffered I/O. ref: https://rt.perl.org/Public/Bug/Display.html?id=126403 (and confirmed by strace-ing replication workers) --- lib/MogileFS/Worker/Replicate.pm | 78 +++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/lib/MogileFS/Worker/Replicate.pm b/lib/MogileFS/Worker/Replicate.pm index f539710..363f20f 100644 --- a/lib/MogileFS/Worker/Replicate.pm +++ b/lib/MogileFS/Worker/Replicate.pm @@ -9,7 +9,7 @@ use fields ( use List::Util (); use MogileFS::Server; -use MogileFS::Util qw(error every debug); +use MogileFS::Util qw(error every debug wait_for_readability); use MogileFS::Config; use MogileFS::ReplicationRequest qw(rr_upgrade); use Digest; @@ -25,6 +25,7 @@ sub new { # replicator wants sub watchdog_timeout { 90; } +use constant SOCK_TIMEOUT => 45; sub work { my $self = shift; @@ -530,20 +531,32 @@ sub replicate { # keep => boolean, whether to keep the connection after reading # len => value of the Content-Length header (integer) # } +# Returns undef on timeout sub read_headers { - my ($sock) = @_; - my %rv = (); - # FIXME: this can block. needs to timeout. - my $line = <$sock>; - return unless defined $line; - $line =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)! or return; - $rv{keep} = $1 >= 1.1; - $rv{code} = $2; + my ($sock, $intercopy_cb) = @_; + my $head = ''; - while (1) { - $line = <$sock>; - return unless defined $line; - last if $line =~ /\A\r?\n\z/; + do { + wait_for_readability(fileno($sock), SOCK_TIMEOUT) or return; + $intercopy_cb->(); + my $r = sysread($sock, $head, 1024, length($head)); + if (defined $r) { + return if $r == 0; # EOF + } elsif ($!{EAGAIN} || $!{EINTR}) { + # loop again + } else { + return; + } + } until ($head =~ /\r?\n\r?\n/); + + my $data; + ($head, $data) = split(/\r?\n\r?\n/, $head, 2); + my @head = split(/\r?\n/, $head); + $head = shift(@head); + $head =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)! or return; + my %rv = ( keep => $1 >= 1.1, code => $2 ); + + foreach my $line (@head) { if ($line =~ /\AConnection:\s*keep-alive\s*\z/is) { $rv{keep} = 1; } elsif ($line =~ /\AConnection:\s*close\s*\z/is) { @@ -552,7 +565,7 @@ sub read_headers { $rv{len} = $1; } } - return \%rv; + return (\%rv, $data); } # copies a file from one Perlbal to another utilizing HTTP @@ -652,10 +665,10 @@ sub http_copy { # plugin set a custom host. $get .= "Host: $shttphost\r\n" if $shttphost; - my $data = ''; my ($sock, $dsock); my ($wcount, $bytes_to_read, $written, $remain); my ($stries, $dtries) = (0, 0); + my ($sres, $data, $bytes); retry: $sconn->close("retrying") if $sconn; @@ -671,7 +684,7 @@ retry: } # we just want a content length - my $sres = read_headers($sock); + ($sres, $data) = read_headers($sock, $intercopy_cb); unless ($sres) { goto retry if $sconn->retryable && $stries == 1; return $error_unreachable->("Error: Resource $surl failed to return an HTTP response"); @@ -696,18 +709,26 @@ retry: } # now read data and print while we're reading. + $bytes = length($data); ($written, $remain) = (0, $clen); $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining $bytes_to_read = $remain if $remain < $bytes_to_read; $wcount = 0; while ($bytes_to_read) { - my $bytes = $sock->read($data, $bytes_to_read); unless (defined $bytes) { - return $src_error->("error reading midway through source: $!"); - } - if ($bytes == 0) { - return $src_error->("EOF reading midway through source: $!"); +read_again: + $bytes = sysread($sock, $data, $bytes_to_read); + unless (defined $bytes) { + if ($!{EAGAIN} || $!{EINTR}) { + wait_for_readability(fileno($sock), SOCK_TIMEOUT) and + goto read_again; + } + return $src_error->("error reading midway through source: $!"); + } + if ($bytes == 0) { + return $src_error->("EOF reading midway through source: $!"); + } } # now we've read in $bytes bytes @@ -716,6 +737,7 @@ retry: $digest->add($data) if $digest; my $data_len = $bytes; + $bytes = undef; my $data_off = 0; while (1) { my $wbytes = syswrite($dsock, $data, $data_len, $data_off); @@ -757,7 +779,7 @@ retry: } # now read in the response line (should be first line) - my $dres = read_headers($dsock); + my ($dres, $ddata) = read_headers($dsock, $intercopy_cb); unless ($dres) { goto retry if (!$wcount && $dconn->retryable && $dtries == 1); return $dest_error->("Error: HTTP response line not recognized writing to $durl"); @@ -765,10 +787,11 @@ retry: # drain the response body if there is one # there may be no dres->{len}/Content-Length if there is no body - if ($dres->{len}) { - my $r = $dsock->read($data, $dres->{len}); # dres->{len} should be tiny + my $dlen = ($dres->{len} || 0) - length($ddata); + if ($dlen > 0) { + my $r = $dsock->read($data, $dlen); # dres->{len} should be tiny if (defined $r) { - if ($r != $dres->{len}) { + if ($r != $dlen) { Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl"); $dres->{keep} = 0; } @@ -776,6 +799,11 @@ retry: Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)"); $dres->{keep} = 0; } + } elsif ($dlen < 0) { + Mgd::error("strange response Content-Length:$dres->{len} with ". + length($ddata) . + " extra bytes for PUT response on $durl ($!)"); + $dres->{keep} = 0; } # return the connection back to the connection pool -- cgit v1.2.3-24-ge0c7