implement optional watchdog for remote connections
Tyler Riddle [Fri, 28 Sep 2012 20:45:34 +0000 (13:45 -0700)]
12 files changed:
lib/Object/Remote/Connection.pm
lib/Object/Remote/Connector/Local.pm
lib/Object/Remote/Connector/LocalSudo.pm
lib/Object/Remote/Connector/SSH.pm
lib/Object/Remote/Connector/UNIX.pm
lib/Object/Remote/FatNode.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/Node.pm
lib/Object/Remote/ReadChannel.pm
lib/Object/Remote/Role/Connector.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm
lib/Object/Remote/WatchDog.pm [new file with mode: 0644]

index 4d46618..e23c823 100644 (file)
@@ -1,26 +1,64 @@
 package Object::Remote::Connection;
 
+use Object::Remote::Logging qw (:log :dlog);
 use Object::Remote::Future;
 use Object::Remote::Null;
 use Object::Remote::Handle;
 use Object::Remote::CodeContainer;
 use Object::Remote::GlobProxy;
 use Object::Remote::GlobContainer;
-use Object::Remote::Logging qw (:log :dlog);
 use Object::Remote::Tied;
 use Object::Remote;
 use Symbol;
 use IO::Handle;
+use POSIX ":sys_wait_h";
 use Module::Runtime qw(use_module);
 use Scalar::Util qw(weaken blessed refaddr openhandle);
 use JSON::PP qw(encode_json);
 use Moo;
 
+BEGIN { 
+  #this will reap child processes as soon
+  #as they are done executing so the process
+  #table cleans up as fast as possible but
+  #anything that needs to call waitpid()
+  #in the future to get the exit value of
+  #a child will get trash results if
+  #the signal handler was running. 
+  #If creating a child and getting the
+  #exit value is required then set
+  #a localized version of the signal
+  #handler for CHLD to be 'IGNORE'
+  #in the smallest block possible
+  #and outside the block send
+  #the process a CHLD signal
+  #to reap anything that may
+  #have exited while blocked
+  #in waitpid() 
+  $SIG{CHLD} = sub { 
+    my $kid; 
+    log_debug { "CHLD signal handler is executing" };
+    do {
+      $kid = waitpid(-1, WNOHANG);
+      log_trace { "waitpid() returned '$kid'" };
+    } while $kid > 0;
+    log_trace { "CHLD signal handler is done" };
+  };      
+}
+
+END {
+  log_debug { "Killing all child processes in the process group" };
+    
+  #send SIGINT to the process group for our children
+  kill(1, -2);
+}
+
+
 our $DEBUG = !!$ENV{OBJECT_REMOTE_DEBUG};
 #numbering each connection allows it to be
 #tracked along with file handles in
 #the logs
-BEGIN { our $NEXT_CONNECTION_ID = 0 }
+
 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
 
 has send_to_fh => (
@@ -32,6 +70,9 @@ has send_to_fh => (
   },
 );
 
+#TODO see if this is another case of the same bug below 
+#where trigger never fires because the attribute isn't
+#actually set at any time
 has read_channel => (
   is => 'ro', required => 1,
   trigger => sub {
@@ -52,11 +93,23 @@ has read_channel => (
 #value and the on_close attribute is read only....
 #the future never gets the on_done handler
 #installed 
-sub BUILD {
+sub BUILD { 
   my ($self) = @_; 
-  $self->on_close(CPS::Future->new);  
+  $self->on_close(CPS::Future->new);
 }
 
+after BUILD => sub {
+  my ($self) = @_; 
+  
+  return unless defined $self->child_pid; 
+  
+  log_debug { "Setting process group of child process" };
+  
+  setpgrp($self->child_pid, 1);
+};
+
+
+
 has on_close => (
   is => 'rw', default => sub { CPS::Future->new },
   trigger => sub {
@@ -65,7 +118,7 @@ has on_close => (
     weaken($self);
     $f->on_done(sub {
       Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
-      $self->_fail_outstanding("Connection lost: " . ($f->get)[0]);
+      $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
     });
   }
 );
@@ -153,24 +206,47 @@ sub _build__json {
   ); 
 }
 
+sub _load_if_possible {
+  my ($class) = @_; 
+
+  eval "require $class"; 
+
+  if ($@) {
+    log_debug { "Attempt at loading '$class' failed with '$@'" };
+  }
+
+}
+
 BEGIN {
   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
-  eval { require Object::Remote::Connector::Local };
-  eval { require Object::Remote::Connector::LocalSudo };
-  eval { require Object::Remote::Connector::SSH };
-  eval { require Object::Remote::Connector::UNIX };
+  map _load_if_possible($_), qw(
+    Object::Remote::Connector::Local
+    Object::Remote::Connector::LocalSudo
+    Object::Remote::Connector::SSH
+    Object::Remote::Connector::UNIX
+  ); 
 }
 
-sub new_from_spec {
-  my ($class, $spec) = @_;
-  return $spec if blessed $spec;
-  Dlog_debug { "creating a new connection from spec" };
+sub conn_from_spec {
+  my ($class, $spec, @args) = @_;
   foreach my $poss (do { our @Guess }) {
-    if (my $conn = $poss->($spec)) {
-      return $conn->maybe::start::connect;
+    if (my $conn = $poss->($spec, @args)) {
+      return $conn;
     }
   }
-  die "Couldn't figure out what to do with ${spec}";
+  
+  return undef;
+}
+
+sub new_from_spec {
+  my ($class, $spec) = @_;
+  return $spec if blessed $spec;
+  my $conn = $class->conn_from_spec($spec); 
+  
+  die "Couldn't figure out what to do with ${spec}"
+    unless defined $conn;
+    
+  return $conn->maybe::start::connect;  
 }
 
 sub remote_object {
@@ -279,7 +355,7 @@ sub _send {
       $ret = print $fh $serialized;
       die "print was not successful: $!" unless defined $ret
   };
-  
+    
   if ($@) {
       Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
       my $error = $@; chomp($error);
index a2e52b0..838bd5d 100644 (file)
@@ -9,7 +9,10 @@ no warnings 'once';
 BEGIN {  }
 
 push @Object::Remote::Connection::Guess, sub {
-  if (($_[0]||'') eq '-') { __PACKAGE__->new }
+  if (($_[0]||'') eq '-') {
+      shift(@_); 
+      __PACKAGE__->new(@_); 
+  }
 };
 
 1;
index f6b16ee..044d106 100644 (file)
@@ -91,7 +91,8 @@ push @Object::Remote::Connection::Guess, sub {
   for ($_[0]) {
     # username followed by @
     if (defined and !ref and /^ ([^\@]*?) \@ $/x) {
-      return __PACKAGE__->new(target_user => $1);
+      shift(@_);
+      return __PACKAGE__->new(@_, target_user => $1);
     }
   }
   return;
index ed5218a..bb0b869 100644 (file)
@@ -14,6 +14,9 @@ has ssh_options => (is => 'ro', default => sub { [ '-A' ] });
 
 has ssh_command => (is => 'ro', default => sub { 'ssh' });
 
+#TODO properly integrate if this works
+BEGIN { $ENV{TERM} = 'dumb'; } 
+
 sub _build_ssh_perl_command {
   my ($self) = @_;
   return [
@@ -31,7 +34,8 @@ push @Object::Remote::Connection::Guess, sub {
   for ($_[0]) {
     # 0-9 a-z _ - first char, those or . subsequent - hostnamish
     if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) {
-      return __PACKAGE__->new(ssh_to => $_[0]);
+      my $host = shift(@_);
+      return __PACKAGE__->new(@_, ssh_to => $host);
     }
   }
   return;
index 926f060..6aea150 100644 (file)
@@ -20,7 +20,8 @@ no warnings 'once';
 push @Object::Remote::Connection::Guess, sub { 
   for ($_[0]) {
     if (defined and !ref and /^(?:\.\/|\/)/) {
-      return __PACKAGE__->new(socket_path => $_[0]);
+      my $socket = shift(@_);
+      return __PACKAGE__->new(@_, socket_path => $socket);
     }
   }
   return;
index d60b81d..6da6676 100644 (file)
@@ -106,7 +106,7 @@ my $end = stripspace <<'END_END';
 
   use strictures 1;
   use Object::Remote::Node;
-  Object::Remote::Node->run;
+  Object::Remote::Node->run(watchdog_timeout => $WATCHDOG_TIMEOUT);
 END_END
 
 my %files = map +($mods{$_} => scalar do { local (@ARGV, $/) = ($_); <> }),
index af9ce36..9f615c4 100644 (file)
@@ -112,7 +112,7 @@ sub watch_time {
   my $timers = $self->_timers;
   my $new = [ $at => $code, $watch{every} ];
   $self->_sort_timers($new); 
-  log_debug { "Created new timer that expires at '$at'" };
+  log_debug { "Created new timer with id '$new' that expires at '$at'" };
   return "$new";
 }
 
@@ -158,22 +158,7 @@ sub loop_once {
   my $wait_time = $self->_next_timer_expires_delay;
   log_trace {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
       scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
-  #TODO The docs state that select() in some instances can return a socket as ready to
-  #read data even if reading from it would block and the recomendation is to set
-  #handles used with select() as non-blocking but Perl on Windows can not set a 
-  #handle to use non-blocking IO - If Windows is not one of the operating
-  #systems where select() returns a handle that could block it would work to
-  #enable non-blocking mode only under Posix - the non-blocking sysread()
-  #logic would work unmodified for both blocking and non-blocking handles
-  #under Posix and Windows.
   my ($readable, $writeable) = IO::Select->select(
-    #TODO how come select() isn't used to identify handles with errors on them?
-    #TODO is there a specific reason for a half second maximum wait duration?
-    #The two places I've found for the runloop to be invoked don't return control
-    #to the caller until a controlling variable interrupts the loop that invokes
-    #loop_once() - is this to allow that variable to be polled and exit the
-    #run loop? If so why isn't that behavior event driven and causes select() to
-    #return? 
     $self->_read_select, $self->_write_select, undef, $wait_time
   ); 
   log_trace { 
@@ -208,8 +193,7 @@ sub loop_once {
   log_trace { "Checking timers" };
   while (@$timers and $timers->[0][0] <= $now) {
     my $active = $timers->[0]; 
-    Dlog_debug { "Found timer that needs to be executed: $_" } $active;
-#   my (shift @$timers)->[1]->();
+    Dlog_debug { "Found timer that needs to be executed: '$active'" };
      
     if (defined($active->[2])) {
       #handle the case of an 'every' timer
@@ -231,9 +215,6 @@ sub loop_once {
   return;
 }
 
-#::Node and ::ConnectionServer use the want_run() / want_stop()
-#counter to cause a run-loop to execute while something is active;
-#the futures do this via a different mechanism
 sub want_run {
   my ($self) = @_;
   Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
@@ -258,12 +239,6 @@ sub want_stop {
     --$self->{want_running};
 }
 
-#TODO Hypothesis: Futures invoke run() which gives that future
-#it's own localized is_running attribute - any adjustment to the
-#is_running attribute outside of that future will not effect that
-#future so each future winds up able to call run() and stop() at 
-#will with out interfering with each other - how about having
-#run loop until the future becomes ready? 
 sub run {
   my ($self) = @_;
   log_trace { "Run loop: run() invoked" };
index 027097c..e769e58 100644 (file)
@@ -3,17 +3,24 @@ package Object::Remote::Node;
 use strictures 1;
 use Object::Remote::Connector::STDIO;
 use Object::Remote::Logging qw(:log :dlog);
+use Object::Remote::WatchDog;
 use Object::Remote;
 use CPS::Future;
 
 sub run {
+  my ($class, %args) = @_; 
   log_trace { "run() has been invoked on remote node" };
+  
+  if ($args{watchdog_timeout}) {
+    Object::Remote::WatchDog->new(timeout => $args{watchdog_timeout}); 
+  }
+  
   my $c = Object::Remote::Connector::STDIO->new->connect;
-
+  
   $c->register_class_call_handler;
 
   my $loop = Object::Remote->current_loop;
-
+  
   $c->on_close->on_ready(sub { 
     log_info { "Node connection with call handler has closed" };
     $loop->want_stop 
index fb13c80..6bfc369 100644 (file)
@@ -33,7 +33,7 @@ sub _receive_data_from {
   Dlog_trace { "Preparing to read data from $_" } $fh;
   my $rb = $self->_receive_data_buffer;
   my $len = sysread($fh, $$rb, 32768, length($$rb));
-  my $err = defined($len) ? '' : ": $!";
+  my $err = defined($len) ? 'eof' : ": $!";
   if (defined($len) and $len > 0) {
     log_trace { "Read $len bytes of data" };
     while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
index 4f008d7..dac9939 100644 (file)
@@ -7,8 +7,6 @@ use Moo::Role;
 
 requires '_open2_for';
 
-#TODO return to 10 seconds after debugging
-#has timeout => (is => 'ro', default => sub { { after => 10 } });
 has timeout => (is => 'ro', default => sub { { after => 10 } });
 
 sub connect {
@@ -47,9 +45,6 @@ sub connect {
                   ->watch_time(
                       %{$self->timeout},
                       code => sub {
-#                        log_warn { "Connection timed out for child pid '$child_pid'" };
-#                        $f->fail("Connection timed out") unless $f->is_ready;
-#                        undef($channel);
                         Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready;
                         unless($f->is_ready) {
                             log_warn { "Connection with child pid '$child_pid' has timed out" };
index 6509e36..afbf8bf 100644 (file)
@@ -3,33 +3,21 @@ package Object::Remote::Role::Connector::PerlInterpreter;
 use IPC::Open2;
 use IPC::Open3; 
 use IO::Handle;
+use Object::Remote::Logging qw( :log :dlog );
 use Object::Remote::ModuleSender;
 use Object::Remote::Handle;
 use Object::Remote::Future;
-use Object::Remote::Logging qw( :log :dlog );
-use Scalar::Util qw(blessed);
-use POSIX ":sys_wait_h";
+use Scalar::Util qw(blessed weaken);
 use Moo::Role;
 use Symbol; 
 
 with 'Object::Remote::Role::Connector';
 
-#TODO ugh breaks some of the stuff in System::Introspector::Util by
-#screwing with status value of child
-BEGIN { 
-  $SIG{CHLD} = sub { 
-    my $kid; 
-    do {
-      $kid = waitpid(-1, WNOHANG);
-    } while $kid > 0;
-  } 
-}
-
 has module_sender => (is => 'lazy');
+
 #if no child_stderr file handle is specified then stderr
 #of the child will be connected to stderr of the parent
-has stderr => ( is => 'rw', default => sub { \*STDERR } );
-#has stderr => ( is => 'rw' );
+has stderr => ( is => 'rw', default => sub { undef } );
 
 sub _build_module_sender {
   my ($hook) =
@@ -39,14 +27,16 @@ sub _build_module_sender {
 }
 
 has perl_command => (is => 'lazy');
+has watchdog_timeout => ( is => 'ro', required => 1, default => sub { 0 } );
 
 #TODO convert nice value into optional feature enabled by
 #setting value of attribute
 #ulimit of ~500 megs of v-ram
 #TODO only works with ssh with quotes but only works locally
 #with out quotes
-sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
-#sub _build_perl_command { [ 'perl', '-' ] }
+#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
+sub _build_perl_command { [ 'perl', '-' ] }
+#sub _build_perl_command { [ 'cat' ] }
 
 around connect => sub {
   my ($orig, $self) = (shift, shift);
@@ -54,6 +44,7 @@ around connect => sub {
   return future {
     $f->on_done(sub {
       my ($conn) = $f->get;
+      $self->_setup_watchdog_reset($conn); 
       my $sub = $conn->remote_sub('Object::Remote::Logging::init_logging_forwarding');
       $sub->('Object::Remote::Logging', Object::Remote::Logging->arg_router);
       Object::Remote::Handle->new(
@@ -78,8 +69,15 @@ sub _start_perl {
   Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
     
   if (defined($given_stderr)) {
+      #if the stderr data goes to an existing file handle
+      #an need an anonymous file handle is required
+      #as the other half of a pipe style file handle pair
+      #so the file handles can go into the run loop
       $foreign_stderr = gensym();
   } else {
+      #if no file handle has been specified
+      #for the child's stderr then connect
+      #the child stderr to the parent stderr
       $foreign_stderr = ">&STDERR";
   }
   
@@ -116,34 +114,9 @@ sub _start_perl {
                       );     
   }
       
-  #TODO open2() dupes the child stderr into the calling
-  #process stderr which means if this process exits the
-  #child is still attached to the shell - using open3()
-  #and having the run loop manage the stderr means this
-  #won't happen BUT if the run loop just sends the remote
-  #stderr data to the local stderr the logs will interleave
-  #for sure - a simple test would be to use open3() and just
-  #close the remote stderr and see what happens - a longer
-  #term solution would be for Object::Remote to offer a feature
-  #where the user of a connection species a destination for output
-  #either a file name or their own file handle and the node output
-  #is dumped to it 
-#  my $pid = open2(
-#    my $foreign_stdout,
-#    my $foreign_stdin,
-#    @{$self->final_perl_command},
-#  ) or die "Failed to run perl at '$_[0]': $!";
-#
-#  Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ];
-
-
-   return ($foreign_stdin, $foreign_stdout, $pid);
+  return ($foreign_stdin, $foreign_stdout, $pid);
 }
 
-#TODO open2() forks off a child and I have not been able to locate
-#a mechanism for reaping dead children so they don't become zombies
-#CONFIRMED there is no reaping of children being done, find a safe
-#way to do it
 sub _open2_for {
   my $self = shift;
   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
@@ -174,10 +147,46 @@ sub _open2_for {
   return ($foreign_stdin, $foreign_stdout, $pid);
 }
 
+sub _setup_watchdog_reset {
+    my ($self, $conn) = @_;
+    my $timer_id; 
+    
+    return unless $self->watchdog_timeout; 
+        
+    Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id;
+
+    $timer_id = Object::Remote->current_loop->watch_time(
+        every => $self->watchdog_timeout / 5,
+        code => sub {
+            unless(defined($conn)) {
+                log_trace { "Weak reference to connection in Watchdog was lost, terminating update timer $timer_id" };
+                Object::Remote->current_loop->unwatch_time($timer_id);
+                return;  
+            }
+            
+            Dlog_debug { "Reseting Watchdog for connection id $_" } $conn->_id;
+            #we do not want to block in the run loop so send the
+            #update off and ignore any result, we don't need it
+            #anyway
+            $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset');
+        }
+    );
+    
+    $conn->on_close->on_done(sub { Object::Remote->current_loop->unwatch_time($timer_id) });
+}
+
 sub fatnode_text {
   my ($self) = @_;
-  require Object::Remote::FatNode;
   my $text = '';
+
+  require Object::Remote::FatNode;
+  
+  $text = "my \$WATCHDOG_TIMEOUT = '" . $self->watchdog_timeout . "';\n";
+  
+  if (my $duration = $self->watchdog_timeout) {
+    $text .= "alarm(\$WATCHDOG_TIMEOUT);\n";    
+  }
+
   $text .= 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n"
     if $ENV{OBJECT_REMOTE_DEBUG};
   $text .= <<'END';
@@ -190,6 +199,7 @@ END
 eval $Object::Remote::FatNode::DATA;
 die $@ if $@;
 END
+  
   $text .= "__END__\n";
   return $text;
 }
diff --git a/lib/Object/Remote/WatchDog.pm b/lib/Object/Remote/WatchDog.pm
new file mode 100644 (file)
index 0000000..2a3fc61
--- /dev/null
@@ -0,0 +1,57 @@
+package Object::Remote::WatchDog; 
+
+use Object::Remote::MiniLoop; 
+use Object::Remote::Logging qw ( :log :dlog );
+use Moo; 
+
+BEGIN { 
+  $SIG{ALRM} = sub {
+    #if the Watchdog is killing the process we don't want any chance of the
+    #process not actually exiting and die could be caught by an eval which
+    #doesn't do us any good 
+    log_error { sprintf("Watchdog has expired, terminating the process at file %s line %s", __FILE__, __LINE__ + 1); };
+    exit(1);     
+  };   
+};
+
+has timeout => ( is => 'ro', required => 1 );
+
+around new => sub {
+  my ($orig, $self, @args) = @_; 
+  our ($WATCHDOG);
+    
+  return $WATCHDOG if defined $WATCHDOG;
+  log_trace { "Constructing new instance of global watchdog" };
+  return $WATCHDOG = $self->$orig(@args);    
+};
+
+#start the watchdog
+sub BUILD {
+  my ($self) = @_;
+  Dlog_debug { "Initializing watchdog with timeout of $_ seconds" } $self->timeout;
+  alarm($self->timeout);
+}
+
+#invoke at least once per timeout to stop
+#the watchdog from killing the process 
+sub reset {
+  our ($WATCHDOG);
+  die "Attempt to reset the watchdog before it was constructed"
+    unless defined $WATCHDOG; 
+  
+  log_trace { "Watchdog has been reset" };
+  alarm($WATCHDOG->timeout); 
+}
+
+#must explicitly call this method to stop the
+#watchdog from killing the process - if the
+#watchdog is lost because it goes out of scope
+#it makes sense to still terminate the process
+sub shutdown {
+  my ($self) = @_;
+  alarm(0); 
+}
+
+1;
+
+