From: Tyler Riddle Date: Tue, 2 Oct 2012 23:07:25 +0000 (-0700) Subject: fix some non-blocking behavior but it's not right yet; log some signals X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=bd20b1bfd1714aad8fd5c52450c8d8d84d0c8526;p=scpubgit%2FObject-Remote.git fix some non-blocking behavior but it's not right yet; log some signals --- diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index 2c688a6..02c874c 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -37,13 +37,15 @@ BEGIN { #in waitpid() $SIG{CHLD} = sub { my $kid; - log_debug { "CHLD signal handler is executing" }; + log_trace { "CHLD signal handler is executing" }; do { $kid = waitpid(-1, WNOHANG); - log_trace { "waitpid() returned '$kid'" }; + log_debug { "waitpid() returned '$kid'" }; } while $kid > 0; log_trace { "CHLD signal handler is done" }; - }; + }; + + $SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } }; } END { @@ -109,7 +111,6 @@ after BUILD => sub { }; - has on_close => ( is => 'rw', default => sub { CPS::Future->new }, trigger => sub { @@ -141,7 +142,7 @@ sub _fail_outstanding { my ($self, $error) = @_; Dlog_debug { "Failing outstanding futures with '$error' for connection $_" } $self->_id; my $outstanding = $self->outstanding_futures; - $_->fail($error) for values %$outstanding; + $_->fail("$error\n") for values %$outstanding; %$outstanding = (); return; } diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index 9f615c4..a4a835c 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -193,7 +193,7 @@ sub loop_once { log_trace { "Checking timers" }; while (@$timers and $timers->[0][0] <= $now) { my $active = $timers->[0]; - Dlog_debug { "Found timer that needs to be executed: '$active'" }; + Dlog_trace { "Found timer that needs to be executed: '$active'" }; if (defined($active->[2])) { #handle the case of an 'every' timer diff --git a/lib/Object/Remote/ReadChannel.pm b/lib/Object/Remote/ReadChannel.pm index 6bfc369..f6a7cca 100644 --- a/lib/Object/Remote/ReadChannel.pm +++ b/lib/Object/Remote/ReadChannel.pm @@ -1,7 +1,7 @@ package Object::Remote::ReadChannel; use CPS::Future; -use Scalar::Util qw(weaken); +use Scalar::Util qw(weaken openhandle); use Object::Remote::Logging qw(:log :dlog); use POSIX; use Moo; @@ -58,11 +58,14 @@ sub DEMOLISH { my ($self, $gd) = @_; return if $gd; log_trace { "read channel is being demolished" }; + Object::Remote->current_loop ->unwatch_io( handle => $self->fh, on_read_ready => 1 ); + + } 1; diff --git a/lib/Object/Remote/Role/Connector.pm b/lib/Object/Remote/Role/Connector.pm index dac9939..d3b43c5 100644 --- a/lib/Object/Remote/Role/Connector.pm +++ b/lib/Object/Remote/Role/Connector.pm @@ -11,7 +11,7 @@ has timeout => (is => 'ro', default => sub { { after => 10 } }); sub connect { my $self = shift; - Dlog_debug { "Perparing to create connection with args of: $_" } @_; + Dlog_debug { "Preparing to create connection with args of: $_" } @_; my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_); my $channel = use_module('Object::Remote::ReadChannel')->new( fh => $receive_from_fh diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index 31e13e2..b3e82d0 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -8,6 +8,7 @@ use Object::Remote::ModuleSender; use Object::Remote::Handle; use Object::Remote::Future; use Scalar::Util qw(blessed weaken); +use POSIX; use Moo::Role; use Symbol; @@ -98,7 +99,7 @@ sub _start_perl { on_read_ready => sub { my $buf = ''; my $len = sysread($foreign_stderr, $buf, 32768); - if (!defined($len) or $len == 0) { + if ((!defined($len) && $! != EAGAIN) or $len == 0) { log_trace { "Got EOF or error on child stderr, removing from watcher" }; $self->stderr(undef); Object::Remote->current_loop @@ -132,7 +133,7 @@ sub _open2_for { } # if the stdin went away, we'll never get Shere # so it's not a big deal to simply give up on !defined - if (!defined($len) or 0 == length($to_send)) { + if ((!defined($len) && $! != EAGAIN) or 0 == length($to_send)) { log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" }; Object::Remote->current_loop ->unwatch_io( @@ -154,7 +155,9 @@ sub _setup_watchdog_reset { return unless $self->watchdog_timeout; Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id; - + + weaken($conn); + $timer_id = Object::Remote->current_loop->watch_time( every => $self->watchdog_timeout / 3, code => sub { @@ -164,15 +167,13 @@ sub _setup_watchdog_reset { return; } - Dlog_debug { "Reseting Watchdog for connection id $_" } $conn->_id; + Dlog_trace { "Reseting Watchdog for connection id $_" } $conn->_id; #we do not want to block in the run loop so send the #update off and ignore any result, we don't need it #anyway $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset'); } - ); - - $conn->on_close->on_done(sub { Object::Remote->current_loop->unwatch_time($timer_id) }); + ); } sub fatnode_text { @@ -181,12 +182,13 @@ sub fatnode_text { require Object::Remote::FatNode; - $text = "my \$WATCHDOG_TIMEOUT = '" . $self->watchdog_timeout . "';\n"; - - if (my $duration = $self->watchdog_timeout) { + if (defined($self->watchdog_timeout)) { + $text = "my \$WATCHDOG_TIMEOUT = '" . $self->watchdog_timeout . "';\n"; $text .= "alarm(\$WATCHDOG_TIMEOUT);\n"; + } else { + $text = "my \$WATCHDOG_TIMEOUT = undef;\n"; } - + $text .= 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n" if $ENV{OBJECT_REMOTE_DEBUG}; $text .= <<'END';