From: Tyler Riddle Date: Tue, 2 Oct 2012 23:08:42 +0000 (-0700) Subject: remove broken watchdog; gatherer can be logged straight to file via run loop; arbitra... X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=d7e10d26b8daa2afb1b957f744e1af9a36dd6468;p=scpubgit%2FSystem-Introspector.git remove broken watchdog; gatherer can be logged straight to file via run loop; arbitrary concurrency limit on number of simultaneous hosts when gathering --- diff --git a/lib/System/Introspector/Gatherer.pm b/lib/System/Introspector/Gatherer.pm index 2c7da3b..b99cfb8 100644 --- a/lib/System/Introspector/Gatherer.pm +++ b/lib/System/Introspector/Gatherer.pm @@ -9,6 +9,7 @@ use Module::Runtime qw( use_module ); use System::Introspector::Logger qw( :log :dlog ); has introspectors => (is => 'ro', required => 1); +has stderr_fh => ( is => 'ro' ); sub gather_all { my ($self) = @_; @@ -23,8 +24,8 @@ sub gather_all { log_debug { "Using '$module' for this gather" }; my $module_name = use_module($module); log_trace { "Finished loading '$module'; returned value was '$module_name'" }; - my $instance = $module_name->new::on('-', $args); - #my $instance = $module_name->new($args); + #my $instance = $module_name->new::on('-', $args); + my $instance = $module_name->new($args); log_trace { "Finished constructing '$module_name'; starting gather" }; my $probe_data = $instance->gather; log_trace { "Gathering completed, storing data in the report for '$module_name'" }; @@ -62,9 +63,11 @@ sub new_from_spec { my ($user, $host, $sudo_user) = @arg{qw( user host sudo_user )}; my $sudo = defined($sudo_user) ? 
sprintf('%s@', $sudo_user) : undef; my $args = { introspectors => $arg{introspectors} }; + #TODO move watchdog_timeout into an attribute for this class + my %connection_args = ( stderr => $arg{stderr_fh}, watchdog_timeout => 300 ); if (defined $host) { my $remote = join '@', grep defined, $user, $host; - my $conn = Object::Remote::Connection->conn_from_spec($remote, watchdog_timeout => 120); + my $conn = Object::Remote::Connection->conn_from_spec($remote, %connection_args); $conn->maybe::start::connect; if (defined $sudo_user) { return $class->_new_bridged($conn->maybe::start::connect, $sudo, $args); @@ -76,7 +79,7 @@ sub new_from_spec { else { if (defined $sudo_user) { #TODO find a better way to achieve this result - my $conn = Object::Remote::Connection->conn_from_spec($sudo_user, watchdog_timeout => 120); + my $conn = Object::Remote::Connection->conn_from_spec($sudo_user, %connection_args); return $class->_new_direct($conn->maybe::start::connect, $args); } @@ -86,6 +89,13 @@ sub new_from_spec { } } +#let the controller shut down this process on command +sub quit { + my ($self) = @_; + log_debug { "this gatherer got a quit command, invoking exit(0)" }; + exit(0); +} + 1; __END__ diff --git a/lib/System/Introspector/Gatherer/Report.pm b/lib/System/Introspector/Gatherer/Report.pm index 0c6f7a7..0e8fd1a 100644 --- a/lib/System/Introspector/Gatherer/Report.pm +++ b/lib/System/Introspector/Gatherer/Report.pm @@ -77,6 +77,7 @@ has data => ( is => 'ro', required => 1, default => sub { {} } ); #set to true to enable chunked transfers using tied report variables has tied_output => ( is => 'ro', required => 1, default => sub { 0 } ); + #return list of class names in report sub probe_names { my ($self) = @_; diff --git a/lib/System/Introspector/State.pm b/lib/System/Introspector/State.pm index e383182..27ec8e7 100644 --- a/lib/System/Introspector/State.pm +++ b/lib/System/Introspector/State.pm @@ -27,35 +27,56 @@ sub gather { my ($self, @groups) = @_; log_debug { "Starting 
to gather results" }; for my $group (@groups) { - my @waiting; - for my $host ($self->config->hosts) { - log_debug { "Adding group '$group' on '$host' to list of active fetches" }; - push @waiting, [$host, $self->fetch($host, $group)]; - } - log_debug { sprintf("There are %i hosts in the waiting list", scalar(@waiting)) }; - for my $wait (@waiting) { - my ($host, @futures) = @$wait; - my $report; - - log_debug { "Waiting on futures for host '$host'" }; - - #TODO another way to solve the huge JSON problem is to - #invoke the probes in from controller directly via proxy - #objects and receive the results from each probe as - #they complete - it would cause less RAM consumption for the - #system as a whole but requires modifying the future based - #syncronization logic - eval { ($report) = await_all @futures }; + my @hosts = $self->config->hosts; + + log_info { my $c = scalar(@hosts); "This gather run is for $c hosts" }; + + while(scalar(@hosts) > 0) { + #TODO turn the splice size into an attribute on this class + my @scan = splice(@hosts, 0, 50); + my @waiting; + + Dlog_trace { my $c = scalar(@scan); "Scanning $c hosts in this gather loop" } @scan; - if ($@) { - log_error { "Failure when probing '$host': $@" }; - next; + for my $host (@scan) { + log_debug { "Adding group '$group' on '$host' to list of active fetches" }; + my $to_fetch; + + eval { $to_fetch = [$host, $self->fetch($host, $group)] }; + + if ($@) { + log_trace { "Could not start fetching for $host: '$@'" }; + next; + } + + push @waiting, $to_fetch; } - - log_trace { "Received all from group '$group' on '$host'" }; - $self->_store($host, $group, $report); - } - } + log_debug { sprintf("There are %i hosts in the waiting list", scalar(@waiting)) }; + for my $wait (@waiting) { + my ($host, @futures) = @$wait; + my $report; + + log_debug { "Waiting on futures for host '$host'" }; + + #TODO another way to solve the huge JSON problem is to + #invoke the probes in from controller directly via proxy + #objects 
and receive the results from each probe as + #they complete - it would cause less RAM consumption for the + #system as a whole but requires modifying the future based + #syncronization logic + eval { ($report) = await_all @futures }; + + if ($@) { + log_error { "Failure when probing '$host': $@" }; + next; + } + + log_trace { "Received all from group '$group' on '$host'" }; + $self->_store($host, $group, $report); + } + } + } + log_debug { "Completed gathering results" }; return 1; } @@ -110,7 +131,6 @@ sub _store { my $storage = $self->storage($host, $group); my $ok = eval { my @files; -# for my $class (sort keys %$gathered) { for my $class ($gathered->probe_names) { log_trace { "Storing data for probe name '$class'" }; my $file = sprintf '%s.json', join '/', @@ -157,92 +177,31 @@ sub _cleanup { sub _create_gatherer { my ($self, %arg) = @_; + my $stderr_fh; + + #TODO move into attribute for this class +if(1) { + my $log_file = "log/$arg{host}.log"; + + log_debug { "Logging stderr for host '$arg{host}' to '$log_file'" }; + + open($stderr_fh, ">>", $log_file) or die "Could not open '$log_file' for write: $!"; + +} + my $gatherer = System::Introspector::Gatherer->new_from_spec( user => $self->user, + stderr_fh => $stderr_fh, host => $arg{host}, sudo_user => $arg{sudo} && $self->sudo_user, introspectors => $arg{introspectors}, ); my $future = $gatherer->start::gather_all; - - $self->_create_watchdog($gatherer, $future); - + return $future; } -sub _create_watchdog { - my ($self, $gatherer, $future) = @_; - my $timeout = 3; -# my $handler = $self->_create_watchdog_handler($timeout, $gatherer, $future); -# -# $self->_schedule_watchdog( $handler, $timeout ); -} - -sub _schedule_watchdog { - my ($self, $handler, $timeout) = @_; - my $timer_id; - - log_trace { "Setting a watchdog to check a gatherer after $timeout seconds" }; - - $timer_id = Object::Remote->current_loop->watch_time( - every => $timeout, - code => sub { - log_trace { "Watchdog timer has expired; checking on 
gatherer" }; - unless ($handler->()) { - log_trace { "Not scheduling watchdog to execute again" }; - $handler = undef; - Object::Remote->current_loop->unwatch_time($timer_id); - } - } - ); -} - -sub _create_watchdog_handler { - my ($self, $timeout, $gatherer, $future) = @_; - - return sub { - #handler returns 1 if it should run again or 0 otherwise - my $ping_waiting; - - log_debug { "Watchdog is checking up on gatherer" }; - - if ($future->is_ready || $future->is_cancelled) { - log_trace { "Future for gatherer is ready or cancelled; watchdog is no longer needed" }; - return 0; - } - - if ($ping_waiting) { - log_debug { "This gatherer did not respond to a ping after '$timeout' seconds" }; - $future->fail("Gatherer did not respond to ping"); - return 0; - } - - log_trace { "Watchdog is going to ping the gatherer" }; - - $ping_waiting = 1; - my $ping_resp; - eval { $ping_resp = $gatherer->ping }; - - log_trace { "Ping completed" }; - - if ($@) { - log_debug { "Could not invoke ping method on gatherer: $@" }; - $future->fail("Could not ping gatherer: $@") unless $future->is_ready; - return 0; - } else { - log_debug { "Ping to gatherer did not return true" }; - $future->fail("Ping to gatherer did not return true") unless $future->is_ready; - } - - $ping_waiting = 0; - - log_trace { "The watchdog did not find any problems" }; - - return 1; - } -} - 1; =head1 NAME