From: Tyler Riddle Date: Thu, 27 Sep 2012 01:44:57 +0000 (-0700) Subject: add in fault tolerance for crashing gatherer; start of watchdog X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=72e615139af7f26f601d311b07a24d40ce703b33;p=scpubgit%2FSystem-Introspector.git add in fault tolerance for crashing gatherer; start of watchdog --- diff --git a/lib/System/Introspector/Gatherer.pm b/lib/System/Introspector/Gatherer.pm index 3fa7cc5..30eba49 100644 --- a/lib/System/Introspector/Gatherer.pm +++ b/lib/System/Introspector/Gatherer.pm @@ -21,6 +21,13 @@ has introspectors => (is => 'ro', required => 1); # return $self; #} +sub ping { + my ($self) = @_; + + log_trace { "Gatherer just got pinged" }; + return 1; +} + sub gather_all { my ($self) = @_; my $report = System::Introspector::Gatherer::Report->new; diff --git a/lib/System/Introspector/Gatherer/Report.pm b/lib/System/Introspector/Gatherer/Report.pm index a909f52..0c6f7a7 100644 --- a/lib/System/Introspector/Gatherer/Report.pm +++ b/lib/System/Introspector/Gatherer/Report.pm @@ -74,6 +74,7 @@ use System::Introspector::Logger qw( :log :dlog ); use Moo; has data => ( is => 'ro', required => 1, default => sub { {} } ); +#set to true to enable chunked transfers using tied report variables has tied_output => ( is => 'ro', required => 1, default => sub { 0 } ); #return list of class names in report diff --git a/lib/System/Introspector/Probe/LibDirs/Perl.pm b/lib/System/Introspector/Probe/LibDirs/Perl.pm index ca0687a..ddfb3af 100644 --- a/lib/System/Introspector/Probe/LibDirs/Perl.pm +++ b/lib/System/Introspector/Probe/LibDirs/Perl.pm @@ -1,7 +1,6 @@ package System::Introspector::Probe::LibDirs::Perl; use Moo; -use Module::Metadata; use Digest::SHA; use ExtUtils::Installed; @@ -46,12 +45,12 @@ sub _gather_libdir_info { log_debug { "Gathering Perl libdir info for '$libdir'" }; my $installed = ExtUtils::Installed->new(inc_override => [ $libdir ]); - + foreach my $module ($installed->modules) { my $packlist = $installed->packlist($module)->packlist_file; Dlog_trace { "Packlist file for '$module' in '$libdir' is '$_'" } $packlist; - + $modules{$module} = {}; if ($self->enumerate_packlists && -f $packlist) { @@ -65,6 +64,8 @@ sub _gather_libdir_info { #sub _enumerate_metadata { # my ($self, $libdir, $module) = @_; # +# require Module::Metadata; +# # my $pipe = $self->_open_locate_pm_pipe($libdir); # while (defined( my $line = <$pipe> )) { # chomp $line; @@ -109,6 +110,7 @@ sub _open_locate_libdirs_pipe { log_debug { "Executing 'locate' to identify Perl 5 library directories" }; return handle_from_command sprintf #lib/perl5 for Local::Lib and debian installed perl? - lib/perl for others? +# q{locate --regex '^%s.*lib/perl$'}, $root; q{locate --regex '^%s.*lib/perl5$'}, $root; } diff --git a/lib/System/Introspector/State.pm b/lib/System/Introspector/State.pm index 82e6c33..e383182 100644 --- a/lib/System/Introspector/State.pm +++ b/lib/System/Introspector/State.pm @@ -2,7 +2,9 @@ package System::Introspector::State; use Moo; use File::Tree::Snapshot; use Object::Remote::Future qw( await_all ); +use Object::Remote::MiniLoop; use JSON::Diffable qw( encode_json ); +use Scalar::Util qw(weaken); use System::Introspector::Gatherer; use System::Introspector::Logger qw( :log :dlog ); use System::Introspector::Report; @@ -33,6 +35,7 @@ sub gather { log_debug { sprintf("There are %i hosts in the waiting list", scalar(@waiting)) }; for my $wait (@waiting) { my ($host, @futures) = @$wait; + my $report; log_debug { "Waiting on futures for host '$host'" }; @@ -42,10 +45,14 @@ sub gather { #they complete - it would cause less RAM consumption for the #system as a whole but requires modifying the future based #syncronization logic - my ($report) = await_all @futures; -# die Dumper($report_proxy); use Data::Dumper; + eval { ($report) = await_all @futures }; + + if ($@) { + log_error { "Failure when probing '$host': $@" }; + next; + } + log_trace { "Received all from group '$group' on '$host'" }; -# $self->_store($host, $group, +{ map %$_, @data }); $self->_store($host, $group, $report); } } @@ -68,20 +75,18 @@ sub fetch { log_info { "Probing host '$host' with " . join ", ", map $_->[0], @nosudo, @sudo }; if (@nosudo) { log_debug { "Preparing to fetch without sudo: " . join ", ", map $_->[0], @nosudo }; - my $proxy = $self->_create_gatherer( + push @futures, $self->_create_gatherer( host => $host, introspectors => [@nosudo], ); - push @futures, $proxy->start::gather_all; } if (@sudo) { log_debug { "Preparing to fetch with sudo: ", join ", ", map $_->[0], @nosudo }; - my $proxy = $self->_create_gatherer( + push @futures, $self->_create_gatherer( sudo => 1, host => $host, introspectors => [@sudo], ); - push @futures, $proxy->start::gather_all; } log_trace { sprintf("Fetching resulted in %i futures being created", scalar(@futures)) }; @@ -115,6 +120,7 @@ sub _store { } split m{::}, $class; my $fh = $storage->open('>:utf8', $file, mkpath => 1); my $full_path = $storage->file($file); + log_trace { "Collecting probe data for '$class' from '$host'" }; my $data = $gathered->get_probe_data($class); log_debug { "Generated file name for storage: '$file'; Writing state to '$full_path'" }; Dlog_trace { "Input to storage engine: $_" } $data; @@ -126,10 +132,11 @@ sub _store { log_trace { "Comitting stored data" }; $storage->commit; }; + unless ($ok) { log_error { "Rolling back snapshot because of: " . $@ || 'unknown error' }; - $storage->rollback; - die $@; + $storage->reset; + return 0; } return 1; } @@ -150,13 +157,90 @@ sub _cleanup { sub _create_gatherer { my ($self, %arg) = @_; - return System::Introspector::Gatherer->new_from_spec( + my $gatherer = System::Introspector::Gatherer->new_from_spec( user => $self->user, host => $arg{host}, sudo_user => $arg{sudo} && $self->sudo_user, introspectors => $arg{introspectors}, - #TODO waiting to see if this really is never coming back - );#->init_logging($self->config->log_level, "probe:$arg{host}"); + ); + + my $future = $gatherer->start::gather_all; + + $self->_create_watchdog($gatherer, $future); + + return $future; +} + +sub _create_watchdog { + my ($self, $gatherer, $future) = @_; + my $timeout = 3; +# my $handler = $self->_create_watchdog_handler($timeout, $gatherer, $future); +# +# $self->_schedule_watchdog( $handler, $timeout ); +} + +sub _schedule_watchdog { + my ($self, $handler, $timeout) = @_; + my $timer_id; + + log_trace { "Setting a watchdog to check a gatherer after $timeout seconds" }; + + $timer_id = Object::Remote->current_loop->watch_time( + every => $timeout, + code => sub { + log_trace { "Watchdog timer has expired; checking on gatherer" }; + unless ($handler->()) { + log_trace { "Not scheduling watchdog to execute again" }; + $handler = undef; + Object::Remote->current_loop->unwatch_time($timer_id); + } + } + ); +} + +sub _create_watchdog_handler { + my ($self, $timeout, $gatherer, $future) = @_; + + return sub { + #handler returns 1 if it should run again or 0 otherwise + my $ping_waiting; + + log_debug { "Watchdog is checking up on gatherer" }; + + if ($future->is_ready || $future->is_cancelled) { + log_trace { "Future for gatherer is ready or cancelled; watchdog is no longer needed" }; + return 0; + } + + if ($ping_waiting) { + log_debug { "This gatherer did not respond to a ping after '$timeout' seconds" }; + $future->fail("Gatherer did not respond to ping"); + return 0; + } + + log_trace { "Watchdog is going to ping the gatherer" }; + + $ping_waiting = 1; + my $ping_resp; + eval { $ping_resp = $gatherer->ping }; + + log_trace { "Ping completed" }; + + if ($@) { + log_debug { "Could not invoke ping method on gatherer: $@" }; + $future->fail("Could not ping gatherer: $@") unless $future->is_ready; + return 0; + } else { + log_debug { "Ping to gatherer did not return true" }; + $future->fail("Ping to gatherer did not return true") unless $future->is_ready; + } + + $ping_waiting = 0; + + log_trace { "The watchdog did not find any problems" }; + + return 1; + } } 1;