add in fault tolerance for crashing gatherer; start of watchdog
Tyler Riddle [Thu, 27 Sep 2012 01:44:57 +0000 (18:44 -0700)]
lib/System/Introspector/Gatherer.pm
lib/System/Introspector/Gatherer/Report.pm
lib/System/Introspector/Probe/LibDirs/Perl.pm
lib/System/Introspector/State.pm

index 3fa7cc5..30eba49 100644 (file)
@@ -21,6 +21,13 @@ has introspectors => (is => 'ro', required => 1);
 #    return $self;
 #}
 
+sub ping {
+    my ($self) = @_; 
+    
+    log_trace { "Gatherer just got pinged" };
+    return 1; 
+}
+
 sub gather_all {
     my ($self) = @_;
     my $report = System::Introspector::Gatherer::Report->new; 
index a909f52..0c6f7a7 100644 (file)
@@ -74,6 +74,7 @@ use System::Introspector::Logger qw( :log :dlog );
 use Moo;
 
 has data => ( is => 'ro', required => 1, default => sub { {} } );
+#set to true to enable chunked transfers using tied report variables
 has tied_output => ( is => 'ro', required => 1, default => sub { 0 } );
 
 #return list of class names in report
index ca0687a..ddfb3af 100644 (file)
@@ -1,7 +1,6 @@
 package System::Introspector::Probe::LibDirs::Perl;
 
 use Moo;
-use Module::Metadata;
 use Digest::SHA;
 use ExtUtils::Installed; 
 
@@ -46,12 +45,12 @@ sub _gather_libdir_info {
     log_debug { "Gathering Perl libdir info for '$libdir'" };
 
     my $installed = ExtUtils::Installed->new(inc_override => [ $libdir ]);
-    
+        
     foreach my $module ($installed->modules) {
         my $packlist = $installed->packlist($module)->packlist_file;
         
         Dlog_trace { "Packlist file for '$module' in '$libdir' is '$_'" } $packlist;
-        
+                
         $modules{$module} = {};
                 
         if ($self->enumerate_packlists && -f $packlist) {
@@ -65,6 +64,8 @@ sub _gather_libdir_info {
 #sub _enumerate_metadata {
 #    my ($self, $libdir, $module) = @_;
 #    
+#    require Module::Metadata;
+#
 #    my $pipe = $self->_open_locate_pm_pipe($libdir);
 #    while (defined( my $line = <$pipe> )) {
 #        chomp $line;
@@ -109,6 +110,7 @@ sub _open_locate_libdirs_pipe {
     log_debug { "Executing 'locate' to identify Perl 5 library directories" };
     return handle_from_command sprintf
         #lib/perl5 for Local::Lib and debian installed perl? - lib/perl for others?
+#        q{locate --regex '^%s.*lib/perl$'}, $root;
         q{locate --regex '^%s.*lib/perl5$'}, $root;
 }
 
index 82e6c33..e383182 100644 (file)
@@ -2,7 +2,9 @@ package System::Introspector::State;
 use Moo;
 use File::Tree::Snapshot;
 use Object::Remote::Future qw( await_all );
+use Object::Remote::MiniLoop; 
 use JSON::Diffable qw( encode_json );
+use Scalar::Util qw(weaken);
 use System::Introspector::Gatherer;
 use System::Introspector::Logger qw( :log :dlog );
 use System::Introspector::Report;
@@ -33,6 +35,7 @@ sub gather {
         log_debug { sprintf("There are %i hosts in the waiting list", scalar(@waiting)) };
         for my $wait (@waiting) {
             my ($host, @futures) = @$wait;
+            my $report; 
             
             log_debug { "Waiting on futures for host '$host'" };
             
@@ -42,10 +45,14 @@ sub gather {
             #they complete - it would cause less RAM consumption for the
             #system as a whole but requires modifying the future based
             #syncronization logic
-            my ($report) = await_all @futures;
-#            die Dumper($report_proxy); use Data::Dumper; 
+            eval { ($report) = await_all @futures }; 
+            
+            if ($@) {
+                log_error { "Failure when probing '$host': $@" };
+                next; 
+            }
+            
             log_trace { "Received all from group '$group' on '$host'" };
-#            $self->_store($host, $group, +{ map %$_, @data });
             $self->_store($host, $group, $report);
         }
     }   
@@ -68,20 +75,18 @@ sub fetch {
     log_info { "Probing host '$host' with " . join ", ", map $_->[0], @nosudo, @sudo };   
     if (@nosudo) {
         log_debug { "Preparing to fetch without sudo: " . join ", ", map $_->[0], @nosudo };
-        my $proxy = $self->_create_gatherer(
+        push @futures, $self->_create_gatherer(
             host => $host,
             introspectors => [@nosudo],
         );
-        push @futures, $proxy->start::gather_all;
     }
     if (@sudo) {
         log_debug { "Preparing to fetch with sudo: ", join ", ", map $_->[0], @nosudo };
-        my $proxy = $self->_create_gatherer(
+        push @futures, $self->_create_gatherer(
             sudo => 1,
             host => $host,
             introspectors => [@sudo],
         );
-        push @futures, $proxy->start::gather_all;
     }
     
     log_trace { sprintf("Fetching resulted in %i futures being created", scalar(@futures)) };
@@ -115,6 +120,7 @@ sub _store {
                 } split m{::}, $class;
             my $fh = $storage->open('>:utf8', $file, mkpath => 1);
             my $full_path = $storage->file($file);
+            log_trace { "Collecting probe data for '$class' from '$host'" };
             my $data = $gathered->get_probe_data($class);
             log_debug { "Generated file name for storage: '$file'; Writing state to '$full_path'" };
             Dlog_trace { "Input to storage engine: $_" } $data; 
@@ -126,10 +132,11 @@ sub _store {
         log_trace { "Comitting stored data" };
         $storage->commit;
     };
+    
     unless ($ok) {
         log_error { "Rolling back snapshot because of: " . $@ || 'unknown error' };
-        $storage->rollback;
-        die $@;
+        $storage->reset;
+        return 0; 
     }
     return 1;
 }
@@ -150,13 +157,90 @@ sub _cleanup {
 
 sub _create_gatherer {
     my ($self, %arg) = @_;
-    return System::Introspector::Gatherer->new_from_spec(
+    my $gatherer = System::Introspector::Gatherer->new_from_spec(
         user          => $self->user,
         host          => $arg{host},
         sudo_user     => $arg{sudo} && $self->sudo_user,
         introspectors => $arg{introspectors},
-    #TODO waiting to see if this really is never coming back
-    );#->init_logging($self->config->log_level, "probe:$arg{host}");
+    );
+    
+    my $future = $gatherer->start::gather_all;
+    
+    $self->_create_watchdog($gatherer, $future); 
+    
+    return $future; 
+}
+
+sub _create_watchdog {
+    my ($self, $gatherer, $future) = @_; 
+    my $timeout = 3; 
+#    my $handler = $self->_create_watchdog_handler($timeout, $gatherer, $future);
+#    
+#    $self->_schedule_watchdog( $handler, $timeout );
+}
+
+sub _schedule_watchdog {
+    my ($self, $handler, $timeout) = @_; 
+    my $timer_id; 
+    
+    log_trace { "Setting a watchdog to check a gatherer after $timeout seconds" };
+    
+    $timer_id = Object::Remote->current_loop->watch_time(
+        every => $timeout,
+        code => sub {
+            log_trace { "Watchdog timer has expired; checking on gatherer" };
+            unless ($handler->()) {
+                log_trace { "Not scheduling watchdog to execute again" };
+                $handler = undef; 
+                Object::Remote->current_loop->unwatch_time($timer_id);
+            }
+        }
+    );
+}
+
+sub _create_watchdog_handler {
+    my ($self, $timeout, $gatherer, $future) = @_; 
+        
+    return sub {
+        #handler returns 1 if it should run again or 0 otherwise
+        my $ping_waiting; 
+        
+        log_debug { "Watchdog is checking up on gatherer" };
+        
+        if ($future->is_ready || $future->is_cancelled) {
+            log_trace { "Future for gatherer is ready or cancelled; watchdog is no longer needed" };
+            return 0; 
+        }
+        
+        if ($ping_waiting) {
+            log_debug { "This gatherer did not respond to a ping after '$timeout' seconds" };
+            $future->fail("Gatherer did not respond to ping");
+            return 0; 
+        }
+        
+        log_trace { "Watchdog is going to ping the gatherer" };
+        
+        $ping_waiting = 1;
+        my $ping_resp; 
+        eval { $ping_resp = $gatherer->ping };
+        
+        log_trace { "Ping completed" };
+        
+        if ($@) {
+            log_debug { "Could not invoke ping method on gatherer: $@" };
+            $future->fail("Could not ping gatherer: $@") unless $future->is_ready;
+            return 0; 
+        } else {
+            log_debug { "Ping to gatherer did not return true" };
+            $future->fail("Ping to gatherer did not return true") unless $future->is_ready;
+        }
+        
+        $ping_waiting = 0;
+        
+        log_trace { "The watchdog did not find any problems" };
+        
+        return 1; 
+    }
 }
 
 1;