From: Robert 'phaylon' Sedlacek Date: Wed, 11 Jul 2012 19:09:23 +0000 (+0000) Subject: parallel fetching of multiple hosts X-Git-Tag: v0.001_001~28 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=949dba9cac17ec1aedcaa6182ff290c6c9b57857;p=scpubgit%2FSystem-Introspector.git parallel fetching of multiple hosts --- diff --git a/lib/System/Introspector/Gatherer.pm b/lib/System/Introspector/Gatherer.pm index 43ffff4..d88f590 100644 --- a/lib/System/Introspector/Gatherer.pm +++ b/lib/System/Introspector/Gatherer.pm @@ -5,23 +5,31 @@ use Object::Remote::Future; use System::Introspector::Gatherer::Bridge; use Module::Runtime qw( use_module ); -sub gather { - my ($self, $class, $args) = @_; - return use_module("System::Introspector::Probe::$class") - ->new($args) - ->gather; +has introspectors => (is => 'ro', required => 1); + +sub gather_all { + my ($self) = @_; + my %report; + for my $spec (@{ $self->introspectors }) { + my ($base, $args) = @$spec; + $report{$base} = use_module("System::Introspector::Probe::$base") + ->new($args) + ->gather; + } + return \%report; } sub _new_direct { - my ($class, $remote) = @_; - return $class->new::on($remote); + my ($class, $remote, $args) = @_; + return $class->new::on($remote, $args || {}); } sub _new_bridged { - my ($class, $bridge, $remote) = @_; + my ($class, $bridge, $remote, $args) = @_; return System::Introspector::Gatherer::Bridge->new::on($bridge, remote_spec => $remote, remote_class => $class, + remote_args => $args || {}, ); } @@ -29,21 +37,22 @@ sub new_from_spec { my ($class, %arg) = @_; my ($user, $host, $sudo_user) = @arg{qw( user host sudo_user )}; my $sudo = defined($sudo_user) ? sprintf('%s@', $sudo_user) : undef; + my $args = { introspectors => $arg{introspectors} }; if (defined $host) { my $remote = join '@', grep defined, $user, $host; if (defined $sudo_user) { - return $class->_new_bridged($remote, $sudo); + return $class->_new_bridged($remote, $sudo, $args); } else { - return $class->_new_direct($remote); + return $class->_new_direct($remote, $args); } } else { if (defined $sudo_user) { - return $class->_new_direct($sudo); + return $class->_new_direct($sudo, $args); } else { - return $class->new; + return $class->new($args); } } } diff --git a/lib/System/Introspector/Gatherer/Bridge.pm b/lib/System/Introspector/Gatherer/Bridge.pm index dd6cd19..e802691 100644 --- a/lib/System/Introspector/Gatherer/Bridge.pm +++ b/lib/System/Introspector/Gatherer/Bridge.pm @@ -5,11 +5,13 @@ use Moo; has remote_spec => (is => 'ro', required => 1); has remote_class => (is => 'ro', required => 1); +has remote_args => (is => 'ro', required => 1); has remote => (is => 'lazy'); sub _build_remote { my ($self) = @_; - return $self->remote_class->new::on($self->remote_spec); + return $self->remote_class + ->new::on($self->remote_spec, $self->remote_args); } sub gather { (shift)->remote->gather(@_) } diff --git a/lib/System/Introspector/State.pm b/lib/System/Introspector/State.pm index 3f20876..678bc42 100644 --- a/lib/System/Introspector/State.pm +++ b/lib/System/Introspector/State.pm @@ -2,6 +2,7 @@ package System::Introspector::State; use Moo; use File::Tree::Snapshot; use System::Introspector::Gatherer; +use Object::Remote::Future qw( await_all ); use JSON::Diffable qw( encode_json ); @@ -13,11 +14,31 @@ sub user { $_[0]->config->user } sub sudo_user { $_[0]->config->sudo_user } +sub _log { shift; printf "[%s] %s\n", scalar(localtime), join '', @_ } + sub gather { my ($self, @groups) = @_; - for my $host ($self->config->hosts) { - $self->fetch_and_store($host, @groups); + $self->_log('Start'); + for my $group (@groups) { + my @waiting; + for my $host ($self->config->hosts) { + $self->_log("Beginning to fetch group '$group' on '$host'"); + push @waiting, [$host, $self->fetch($host, $group)]; + } + $self->_log("Now waiting for results"); + my @results = map { + my ($host, @futures) = @$_; + my $done = [$host, await_all @futures]; + $self->_log("Received all from group '$group' on '$host'"); + $done; + } @waiting; + $self->_log("All gathered for group '$group'"); + for my $result (@results) { + my ($host, @data) = @$result; + $self->_store($host, $group, +{ map %$_, @data }); + } } + $self->_log('Done'); return 1; } @@ -26,48 +47,31 @@ sub introspectors { return $self->config->config_for_group($group)->{introspect}; } -sub fetch_and_store { - my ($self, $host, @groups) = @_; - my $data = $self->fetch($host, @groups); - return $self->_store($host, $data); -} - sub fetch { - my ($self, $host, @groups) = @_; - return +{ map { - ($_, $self->fetch_group($host, $_)); - } @groups }; -} - -sub fetch_group { my ($self, $host, $group) = @_; my $spec = $self->introspectors($group); my (@sudo, @nosudo); push(@{ $spec->{$_}{sudo} ? \@sudo : \@nosudo}, [$_, $spec->{$_}]) for sort keys %$spec; - my %report; + my @futures; if (@nosudo) { - my $gatherer = $self->_create_gatherer(host => $host); - %report = %{ $self->_fetch_with_gatherer($gatherer, @nosudo) || {} }; + $self->_log("Without sudo: ", join ", ", map $_->[0], @nosudo); + my $proxy = $self->_create_gatherer( + host => $host, + introspectors => [@nosudo], + ); + push @futures, $proxy->start::gather_all; } if (@sudo) { - my $gatherer = $self->_create_gatherer(sudo => 1, host => $host); - %report = (%report, %{ $self->_fetch_with_gatherer($gatherer, @sudo) || {} }); - } - return \%report; -} - -sub _fetch_with_gatherer { - my ($self, $gatherer, @spec) = @_; - my %report; - for my $class_spec (@spec) { - my ($class_base, $args) = @$class_spec; - print "Gathering $class_base data\n"; - $report{ $class_base } = $gatherer - ->gather($class_base, $args); + $self->_log("With sudo: ", join ", ", map $_->[0], @nosudo); + my $proxy = $self->_create_gatherer( + sudo => 1, + host => $host, + introspectors => [@sudo], + ); + push @futures, $proxy->start::gather_all; } - print "All gathered\n"; - return \%report; + return @futures; } sub storage { @@ -82,10 +86,9 @@ sub storage { } sub _store { - my ($self, $host, $data) = @_; - for my $group (sort keys %$data) { - my $storage = $self->storage($host, $group); - my $gathered = $data->{$group}; + my ($self, $host, $group, $gathered) = @_; + my $storage = $self->storage($host, $group); + my $ok = eval { my @files; for my $class (sort keys %$gathered) { my $file = sprintf '%s.json', join '/', @@ -94,12 +97,19 @@ sub _store { $_; } split m{::}, $class; my $fh = $storage->open('>:utf8', $file, mkpath => 1); - print "Writing $file\n"; + my $full_path = $storage->file($file); + $self->_log("Writing $full_path"); print $fh encode_json($gathered->{$class}); - push @files, $storage->file($file); + push @files, $full_path; } $self->_cleanup($storage, [@files]); + $self->_log("Committing"); $storage->commit; + }; + unless ($ok) { + $self->_log("Rolling back snapshot because of: ", $@ || 'unknown error'); + $storage->rollback; + die $@; } return 1; } @@ -110,7 +120,7 @@ sub _cleanup { my @files = $storage->find_files('json'); for my $file (@files) { next if $known{$file}; - print "Removing $file\n"; + $self->_log("Removing $file"); unlink($file) or die "Unable to remove '$file': $!\n"; } @@ -120,9 +130,10 @@ sub _cleanup { sub _create_gatherer { my ($self, %arg) = @_; return System::Introspector::Gatherer->new_from_spec( - user => $self->user, - host => $arg{host}, - sudo_user => $arg{sudo} && $self->sudo_user, + user => $self->user, + host => $arg{host}, + sudo_user => $arg{sudo} && $self->sudo_user, + introspectors => $arg{introspectors}, ); } diff --git a/lib/System/Introspector/Util.pm b/lib/System/Introspector/Util.pm index c73572b..a953dc2 100644 --- a/lib/System/Introspector/Util.pm +++ b/lib/System/Introspector/Util.pm @@ -16,6 +16,7 @@ our @EXPORT_OK = qw( lines_from_command files_from_dir transform_exceptions + fail ); do {