From: Tyler Riddle Date: Fri, 28 Sep 2012 20:45:34 +0000 (-0700) Subject: implement optional watchdog for remote connections X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=69aaad21b20345a6bdc1655c74acb770fee1fbcf;p=scpubgit%2FObject-Remote.git implement optional watchdog for remote connections --- diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index 400fdf1..2c688a6 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -1,26 +1,64 @@ package Object::Remote::Connection; +use Object::Remote::Logging qw (:log :dlog); use Object::Remote::Future; use Object::Remote::Null; use Object::Remote::Handle; use Object::Remote::CodeContainer; use Object::Remote::GlobProxy; use Object::Remote::GlobContainer; -use Object::Remote::Logging qw (:log :dlog); use Object::Remote::Tied; use Object::Remote; use Symbol; use IO::Handle; +use POSIX ":sys_wait_h"; use Module::Runtime qw(use_module); use Scalar::Util qw(weaken blessed refaddr openhandle); use JSON::PP qw(encode_json); use Moo; +BEGIN { + #this will reap child processes as soon + #as they are done executing so the process + #table cleans up as fast as possible but + #anything that needs to call waitpid() + #in the future to get the exit value of + #a child will get trash results if + #the signal handler was running. + #If creating a child and getting the + #exit value is required then set + #a localized version of the signal + #handler for CHLD to be 'IGNORE' + #in the smallest block possible + #and outside the block send + #the process a CHLD signal + #to reap anything that may + #have exited while blocked + #in waitpid() + $SIG{CHLD} = sub { + my $kid; + log_debug { "CHLD signal handler is executing" }; + do { + $kid = waitpid(-1, WNOHANG); + log_trace { "waitpid() returned '$kid'" }; + } while $kid > 0; + log_trace { "CHLD signal handler is done" }; + }; +} + +END { + log_debug { "Killing all child processes in the process group" }; + + #send SIGINT to the process group for our children + kill(1, -2); +} + + our $DEBUG = !!$ENV{OBJECT_REMOTE_DEBUG}; #numbering each connection allows it to be #tracked along with file handles in #the logs -BEGIN { our $NEXT_CONNECTION_ID = 0 } + has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } ); has send_to_fh => ( @@ -32,6 +70,9 @@ has send_to_fh => ( }, ); +#TODO see if this is another case of the same bug below +#where trigger never fires because the attribute isn't +#actually set at any time has read_channel => ( is => 'ro', required => 1, trigger => sub { @@ -52,11 +93,23 @@ has read_channel => ( #value and the on_close attribute is read only.... #the future never gets the on_done handler #installed -sub BUILD { +sub BUILD { my ($self) = @_; - $self->on_close(CPS::Future->new); + $self->on_close(CPS::Future->new); } +after BUILD => sub { + my ($self) = @_; + + return unless defined $self->child_pid; + + log_debug { "Setting process group of child process" }; + + setpgrp($self->child_pid, 1); +}; + + + has on_close => ( is => 'rw', default => sub { CPS::Future->new }, trigger => sub { @@ -65,7 +118,7 @@ has on_close => ( weaken($self); $f->on_done(sub { Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id; - $self->_fail_outstanding("Connection lost: " . ($f->get)[0]); + $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]); }); } ); @@ -149,24 +202,47 @@ sub _build__json { ); } +sub _load_if_possible { + my ($class) = @_; + + eval "require $class"; + + if ($@) { + log_debug { "Attempt at loading '$class' failed with '$@'" }; + } + +} + BEGIN { unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef }; - eval { require Object::Remote::Connector::Local }; - eval { require Object::Remote::Connector::LocalSudo }; - eval { require Object::Remote::Connector::SSH }; - eval { require Object::Remote::Connector::UNIX }; + map _load_if_possible($_), qw( + Object::Remote::Connector::Local + Object::Remote::Connector::LocalSudo + Object::Remote::Connector::SSH + Object::Remote::Connector::UNIX + ); } -sub new_from_spec { - my ($class, $spec) = @_; - return $spec if blessed $spec; - Dlog_debug { "creating a new connection from spec" }; +sub conn_from_spec { + my ($class, $spec, @args) = @_; foreach my $poss (do { our @Guess }) { - if (my $conn = $poss->($spec)) { - return $conn->maybe::start::connect; + if (my $conn = $poss->($spec, @args)) { + return $conn; } } - die "Couldn't figure out what to do with ${spec}"; + + return undef; +} + +sub new_from_spec { + my ($class, $spec) = @_; + return $spec if blessed $spec; + my $conn = $class->conn_from_spec($spec); + + die "Couldn't figure out what to do with ${spec}" + unless defined $conn; + + return $conn->maybe::start::connect; } sub remote_object { @@ -275,7 +351,7 @@ sub _send { $ret = print $fh $serialized; die "print was not successful: $!" unless defined $ret }; - + if ($@) { Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh; my $error = $@; chomp($error); diff --git a/lib/Object/Remote/Connector/Local.pm b/lib/Object/Remote/Connector/Local.pm index a2e52b0..838bd5d 100644 --- a/lib/Object/Remote/Connector/Local.pm +++ b/lib/Object/Remote/Connector/Local.pm @@ -9,7 +9,10 @@ no warnings 'once'; BEGIN { } push @Object::Remote::Connection::Guess, sub { - if (($_[0]||'') eq '-') { __PACKAGE__->new } + if (($_[0]||'') eq '-') { + shift(@_); + __PACKAGE__->new(@_); + } }; 1; diff --git a/lib/Object/Remote/Connector/LocalSudo.pm b/lib/Object/Remote/Connector/LocalSudo.pm index f6b16ee..044d106 100644 --- a/lib/Object/Remote/Connector/LocalSudo.pm +++ b/lib/Object/Remote/Connector/LocalSudo.pm @@ -91,7 +91,8 @@ push @Object::Remote::Connection::Guess, sub { for ($_[0]) { # username followed by @ if (defined and !ref and /^ ([^\@]*?) \@ $/x) { - return __PACKAGE__->new(target_user => $1); + shift(@_); + return __PACKAGE__->new(@_, target_user => $1); } } return; diff --git a/lib/Object/Remote/Connector/SSH.pm b/lib/Object/Remote/Connector/SSH.pm index ed5218a..bb0b869 100644 --- a/lib/Object/Remote/Connector/SSH.pm +++ b/lib/Object/Remote/Connector/SSH.pm @@ -14,6 +14,9 @@ has ssh_options => (is => 'ro', default => sub { [ '-A' ] }); has ssh_command => (is => 'ro', default => sub { 'ssh' }); +#TODO properly integrate if this works +BEGIN { $ENV{TERM} = 'dumb'; } + sub _build_ssh_perl_command { my ($self) = @_; return [ @@ -31,7 +34,8 @@ push @Object::Remote::Connection::Guess, sub { for ($_[0]) { # 0-9 a-z _ - first char, those or . subsequent - hostnamish if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) { - return __PACKAGE__->new(ssh_to => $_[0]); + my $host = shift(@_); + return __PACKAGE__->new(@_, ssh_to => $host); } } return; diff --git a/lib/Object/Remote/Connector/UNIX.pm b/lib/Object/Remote/Connector/UNIX.pm index 926f060..6aea150 100644 --- a/lib/Object/Remote/Connector/UNIX.pm +++ b/lib/Object/Remote/Connector/UNIX.pm @@ -20,7 +20,8 @@ no warnings 'once'; push @Object::Remote::Connection::Guess, sub { for ($_[0]) { if (defined and !ref and /^(?:\.\/|\/)/) { - return __PACKAGE__->new(socket_path => $_[0]); + my $socket = shift(@_); + return __PACKAGE__->new(@_, socket_path => $socket); } } return; diff --git a/lib/Object/Remote/FatNode.pm b/lib/Object/Remote/FatNode.pm index a0fddc0..c4725ab 100644 --- a/lib/Object/Remote/FatNode.pm +++ b/lib/Object/Remote/FatNode.pm @@ -90,7 +90,7 @@ my $end = stripspace <<'END_END'; use strictures 1; use Object::Remote::Node; - Object::Remote::Node->run; + Object::Remote::Node->run(watchdog_timeout => $WATCHDOG_TIMEOUT); END_END my %files = map +($mods{$_} => scalar do { local (@ARGV, $/) = ($_); <> }), diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index af9ce36..9f615c4 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -112,7 +112,7 @@ sub watch_time { my $timers = $self->_timers; my $new = [ $at => $code, $watch{every} ]; $self->_sort_timers($new); - log_debug { "Created new timer that expires at '$at'" }; + log_debug { "Created new timer with id '$new' that expires at '$at'" }; return "$new"; } @@ -158,22 +158,7 @@ sub loop_once { my $wait_time = $self->_next_timer_expires_delay; log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s", scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) }; - #TODO The docs state that select() in some instances can return a socket as ready to - #read data even if reading from it would block and the recomendation is to set - #handles used with select() as non-blocking but Perl on Windows can not set a - #handle to use non-blocking IO - If Windows is not one of the operating - #systems where select() returns a handle that could block it would work to - #enable non-blocking mode only under Posix - the non-blocking sysread() - #logic would work unmodified for both blocking and non-blocking handles - #under Posix and Windows. my ($readable, $writeable) = IO::Select->select( - #TODO how come select() isn't used to identify handles with errors on them? - #TODO is there a specific reason for a half second maximum wait duration? - #The two places I've found for the runloop to be invoked don't return control - #to the caller until a controlling variable interrupts the loop that invokes - #loop_once() - is this to allow that variable to be polled and exit the - #run loop? If so why isn't that behavior event driven and causes select() to - #return? $self->_read_select, $self->_write_select, undef, $wait_time ); log_trace { @@ -208,8 +193,7 @@ sub loop_once { log_trace { "Checking timers" }; while (@$timers and $timers->[0][0] <= $now) { my $active = $timers->[0]; - Dlog_debug { "Found timer that needs to be executed: $_" } $active; -# my (shift @$timers)->[1]->(); + Dlog_debug { "Found timer that needs to be executed: '$active'" }; if (defined($active->[2])) { #handle the case of an 'every' timer @@ -231,9 +215,6 @@ sub loop_once { return; } -#::Node and ::ConnectionServer use the want_run() / want_stop() -#counter to cause a run-loop to execute while something is active; -#the futures do this via a different mechanism sub want_run { my ($self) = @_; Dlog_debug { "Run loop: Incrimenting want_running, is now $_" } @@ -258,12 +239,6 @@ sub want_stop { --$self->{want_running}; } -#TODO Hypothesis: Futures invoke run() which gives that future -#it's own localized is_running attribute - any adjustment to the -#is_running attribute outside of that future will not effect that -#future so each future winds up able to call run() and stop() at -#will with out interfering with each other - how about having -#run loop until the future becomes ready? sub run { my ($self) = @_; log_trace { "Run loop: run() invoked" }; diff --git a/lib/Object/Remote/Node.pm b/lib/Object/Remote/Node.pm index 027097c..e769e58 100644 --- a/lib/Object/Remote/Node.pm +++ b/lib/Object/Remote/Node.pm @@ -3,17 +3,24 @@ package Object::Remote::Node; use strictures 1; use Object::Remote::Connector::STDIO; use Object::Remote::Logging qw(:log :dlog); +use Object::Remote::WatchDog; use Object::Remote; use CPS::Future; sub run { + my ($class, %args) = @_; log_trace { "run() has been invoked on remote node" }; + + if ($args{watchdog_timeout}) { + Object::Remote::WatchDog->new(timeout => $args{watchdog_timeout}); + } + my $c = Object::Remote::Connector::STDIO->new->connect; - + $c->register_class_call_handler; my $loop = Object::Remote->current_loop; - + $c->on_close->on_ready(sub { log_info { "Node connection with call handler has closed" }; $loop->want_stop diff --git a/lib/Object/Remote/ReadChannel.pm b/lib/Object/Remote/ReadChannel.pm index fb13c80..6bfc369 100644 --- a/lib/Object/Remote/ReadChannel.pm +++ b/lib/Object/Remote/ReadChannel.pm @@ -33,7 +33,7 @@ sub _receive_data_from { Dlog_trace { "Preparing to read data from $_" } $fh; my $rb = $self->_receive_data_buffer; my $len = sysread($fh, $$rb, 32768, length($$rb)); - my $err = defined($len) ? '' : ": $!"; + my $err = defined($len) ? 'eof' : ": $!"; if (defined($len) and $len > 0) { log_trace { "Read $len bytes of data" }; while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) { diff --git a/lib/Object/Remote/Role/Connector.pm b/lib/Object/Remote/Role/Connector.pm index 4f008d7..dac9939 100644 --- a/lib/Object/Remote/Role/Connector.pm +++ b/lib/Object/Remote/Role/Connector.pm @@ -7,8 +7,6 @@ use Moo::Role; requires '_open2_for'; -#TODO return to 10 seconds after debugging -#has timeout => (is => 'ro', default => sub { { after => 10 } }); has timeout => (is => 'ro', default => sub { { after => 10 } }); sub connect { @@ -47,9 +45,6 @@ sub connect { ->watch_time( %{$self->timeout}, code => sub { -# log_warn { "Connection timed out for child pid '$child_pid'" }; -# $f->fail("Connection timed out") unless $f->is_ready; -# undef($channel); Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready; unless($f->is_ready) { log_warn { "Connection with child pid '$child_pid' has timed out" }; diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index 6509e36..afbf8bf 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -3,33 +3,21 @@ package Object::Remote::Role::Connector::PerlInterpreter; use IPC::Open2; use IPC::Open3; use IO::Handle; +use Object::Remote::Logging qw( :log :dlog ); use Object::Remote::ModuleSender; use Object::Remote::Handle; use Object::Remote::Future; -use Object::Remote::Logging qw( :log :dlog ); -use Scalar::Util qw(blessed); -use POSIX ":sys_wait_h"; +use Scalar::Util qw(blessed weaken); use Moo::Role; use Symbol; with 'Object::Remote::Role::Connector'; -#TODO ugh breaks some of the stuff in System::Introspector::Util by -#screwing with status value of child -BEGIN { - $SIG{CHLD} = sub { - my $kid; - do { - $kid = waitpid(-1, WNOHANG); - } while $kid > 0; - } -} - has module_sender => (is => 'lazy'); + #if no child_stderr file handle is specified then stderr #of the child will be connected to stderr of the parent -has stderr => ( is => 'rw', default => sub { \*STDERR } ); -#has stderr => ( is => 'rw' ); +has stderr => ( is => 'rw', default => sub { undef } ); sub _build_module_sender { my ($hook) = @@ -39,14 +27,16 @@ sub _build_module_sender { } has perl_command => (is => 'lazy'); +has watchdog_timeout => ( is => 'ro', required => 1, default => sub { 0 } ); #TODO convert nice value into optional feature enabled by #setting value of attribute #ulimit of ~500 megs of v-ram #TODO only works with ssh with quotes but only works locally #with out quotes -sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] } -#sub _build_perl_command { [ 'perl', '-' ] } +#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] } +sub _build_perl_command { [ 'perl', '-' ] } +#sub _build_perl_command { [ 'cat' ] } around connect => sub { my ($orig, $self) = (shift, shift); @@ -54,6 +44,7 @@ around connect => sub { return future { $f->on_done(sub { my ($conn) = $f->get; + $self->_setup_watchdog_reset($conn); my $sub = $conn->remote_sub('Object::Remote::Logging::init_logging_forwarding'); $sub->('Object::Remote::Logging', Object::Remote::Logging->arg_router); Object::Remote::Handle->new( @@ -78,8 +69,15 @@ sub _start_perl { Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command}; if (defined($given_stderr)) { + #if the stderr data goes to an existing file handle + #an need an anonymous file handle is required + #as the other half of a pipe style file handle pair + #so the file handles can go into the run loop $foreign_stderr = gensym(); } else { + #if no file handle has been specified + #for the child's stderr then connect + #the child stderr to the parent stderr $foreign_stderr = ">&STDERR"; } @@ -116,34 +114,9 @@ sub _start_perl { ); } - #TODO open2() dupes the child stderr into the calling - #process stderr which means if this process exits the - #child is still attached to the shell - using open3() - #and having the run loop manage the stderr means this - #won't happen BUT if the run loop just sends the remote - #stderr data to the local stderr the logs will interleave - #for sure - a simple test would be to use open3() and just - #close the remote stderr and see what happens - a longer - #term solution would be for Object::Remote to offer a feature - #where the user of a connection species a destination for output - #either a file name or their own file handle and the node output - #is dumped to it -# my $pid = open2( -# my $foreign_stdout, -# my $foreign_stdin, -# @{$self->final_perl_command}, -# ) or die "Failed to run perl at '$_[0]': $!"; -# -# Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ]; - - - return ($foreign_stdin, $foreign_stdout, $pid); + return ($foreign_stdin, $foreign_stdout, $pid); } -#TODO open2() forks off a child and I have not been able to locate -#a mechanism for reaping dead children so they don't become zombies -#CONFIRMED there is no reaping of children being done, find a safe -#way to do it sub _open2_for { my $self = shift; my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_); @@ -174,10 +147,46 @@ sub _open2_for { return ($foreign_stdin, $foreign_stdout, $pid); } +sub _setup_watchdog_reset { + my ($self, $conn) = @_; + my $timer_id; + + return unless $self->watchdog_timeout; + + Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id; + + $timer_id = Object::Remote->current_loop->watch_time( + every => $self->watchdog_timeout / 5, + code => sub { + unless(defined($conn)) { + log_trace { "Weak reference to connection in Watchdog was lost, terminating update timer $timer_id" }; + Object::Remote->current_loop->unwatch_time($timer_id); + return; + } + + Dlog_debug { "Reseting Watchdog for connection id $_" } $conn->_id; + #we do not want to block in the run loop so send the + #update off and ignore any result, we don't need it + #anyway + $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset'); + } + ); + + $conn->on_close->on_done(sub { Object::Remote->current_loop->unwatch_time($timer_id) }); +} + sub fatnode_text { my ($self) = @_; - require Object::Remote::FatNode; my $text = ''; + + require Object::Remote::FatNode; + + $text = "my \$WATCHDOG_TIMEOUT = '" . $self->watchdog_timeout . "';\n"; + + if (my $duration = $self->watchdog_timeout) { + $text .= "alarm(\$WATCHDOG_TIMEOUT);\n"; + } + $text .= 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n" if $ENV{OBJECT_REMOTE_DEBUG}; $text .= <<'END'; @@ -190,6 +199,7 @@ END eval $Object::Remote::FatNode::DATA; die $@ if $@; END + $text .= "__END__\n"; return $text; } diff --git a/lib/Object/Remote/WatchDog.pm b/lib/Object/Remote/WatchDog.pm new file mode 100644 index 0000000..2a3fc61 --- /dev/null +++ b/lib/Object/Remote/WatchDog.pm @@ -0,0 +1,57 @@ +package Object::Remote::WatchDog; + +use Object::Remote::MiniLoop; +use Object::Remote::Logging qw ( :log :dlog ); +use Moo; + +BEGIN { + $SIG{ALRM} = sub { + #if the Watchdog is killing the process we don't want any chance of the + #process not actually exiting and die could be caught by an eval which + #doesn't do us any good + log_error { sprintf("Watchdog has expired, terminating the process at file %s line %s", __FILE__, __LINE__ + 1); }; + exit(1); + }; +}; + +has timeout => ( is => 'ro', required => 1 ); + +around new => sub { + my ($orig, $self, @args) = @_; + our ($WATCHDOG); + + return $WATCHDOG if defined $WATCHDOG; + log_trace { "Constructing new instance of global watchdog" }; + return $WATCHDOG = $self->$orig(@args); +}; + +#start the watchdog +sub BUILD { + my ($self) = @_; + Dlog_debug { "Initializing watchdog with timeout of $_ seconds" } $self->timeout; + alarm($self->timeout); +} + +#invoke at least once per timeout to stop +#the watchdog from killing the process +sub reset { + our ($WATCHDOG); + die "Attempt to reset the watchdog before it was constructed" + unless defined $WATCHDOG; + + log_trace { "Watchdog has been reset" }; + alarm($WATCHDOG->timeout); +} + +#must explicitly call this method to stop the +#watchdog from killing the process - if the +#watchdog is lost because it goes out of scope +#it makes sense to still terminate the process +sub shutdown { + my ($self) = @_; + alarm(0); +} + +1; + +