use System::Introspector::Gatherer::Bridge;
use Module::Runtime qw( use_module );
-sub gather {
- my ($self, $class, $args) = @_;
- return use_module("System::Introspector::Probe::$class")
- ->new($args)
- ->gather;
+has introspectors => (is => 'ro', required => 1);
+
+sub gather_all {
+ my ($self) = @_;
+ my %report;
+ for my $spec (@{ $self->introspectors }) {
+ my ($base, $args) = @$spec;
+ $report{$base} = use_module("System::Introspector::Probe::$base")
+ ->new($args)
+ ->gather;
+ }
+ return \%report;
}
sub _new_direct {
- my ($class, $remote) = @_;
- return $class->new::on($remote);
+ my ($class, $remote, $args) = @_;
+ return $class->new::on($remote, $args || {});
}
sub _new_bridged {
- my ($class, $bridge, $remote) = @_;
+ my ($class, $bridge, $remote, $args) = @_;
return System::Introspector::Gatherer::Bridge->new::on($bridge,
remote_spec => $remote,
remote_class => $class,
+ remote_args => $args || {},
);
}
my ($class, %arg) = @_;
my ($user, $host, $sudo_user) = @arg{qw( user host sudo_user )};
my $sudo = defined($sudo_user) ? sprintf('%s@', $sudo_user) : undef;
+ my $args = { introspectors => $arg{introspectors} };
if (defined $host) {
my $remote = join '@', grep defined, $user, $host;
if (defined $sudo_user) {
- return $class->_new_bridged($remote, $sudo);
+ return $class->_new_bridged($remote, $sudo, $args);
}
else {
- return $class->_new_direct($remote);
+ return $class->_new_direct($remote, $args);
}
}
else {
if (defined $sudo_user) {
- return $class->_new_direct($sudo);
+ return $class->_new_direct($sudo, $args);
}
else {
- return $class->new;
+ return $class->new($args);
}
}
}
use Moo;
use File::Tree::Snapshot;
use System::Introspector::Gatherer;
+use Object::Remote::Future qw( await_all );
use JSON::Diffable qw( encode_json );
sub sudo_user { $_[0]->config->sudo_user }
+sub _log { shift; printf "[%s] %s\n", scalar(localtime), join '', @_ }
+
sub gather {
my ($self, @groups) = @_;
- for my $host ($self->config->hosts) {
- $self->fetch_and_store($host, @groups);
+ $self->_log('Start');
+ for my $group (@groups) {
+ my @waiting;
+ for my $host ($self->config->hosts) {
+ $self->_log("Beginning to fetch group '$group' on '$host'");
+ push @waiting, [$host, $self->fetch($host, $group)];
+ }
+ $self->_log("Now waiting for results");
+ my @results = map {
+ my ($host, @futures) = @$_;
+ my $done = [$host, await_all @futures];
+ $self->_log("Received all from group '$group' on '$host'");
+ $done;
+ } @waiting;
+ $self->_log("All gathered for group '$group'");
+ for my $result (@results) {
+ my ($host, @data) = @$result;
+ $self->_store($host, $group, +{ map %$_, @data });
+ }
}
+ $self->_log('Done');
return 1;
}
return $self->config->config_for_group($group)->{introspect};
}
-sub fetch_and_store {
- my ($self, $host, @groups) = @_;
- my $data = $self->fetch($host, @groups);
- return $self->_store($host, $data);
-}
-
sub fetch {
- my ($self, $host, @groups) = @_;
- return +{ map {
- ($_, $self->fetch_group($host, $_));
- } @groups };
-}
-
-sub fetch_group {
my ($self, $host, $group) = @_;
my $spec = $self->introspectors($group);
my (@sudo, @nosudo);
push(@{ $spec->{$_}{sudo} ? \@sudo : \@nosudo}, [$_, $spec->{$_}])
for sort keys %$spec;
- my %report;
+ my @futures;
if (@nosudo) {
- my $gatherer = $self->_create_gatherer(host => $host);
- %report = %{ $self->_fetch_with_gatherer($gatherer, @nosudo) || {} };
+ $self->_log("Without sudo: ", join ", ", map $_->[0], @nosudo);
+ my $proxy = $self->_create_gatherer(
+ host => $host,
+ introspectors => [@nosudo],
+ );
+ push @futures, $proxy->start::gather_all;
}
if (@sudo) {
- my $gatherer = $self->_create_gatherer(sudo => 1, host => $host);
- %report = (%report, %{ $self->_fetch_with_gatherer($gatherer, @sudo) || {} });
- }
- return \%report;
-}
-
-sub _fetch_with_gatherer {
- my ($self, $gatherer, @spec) = @_;
- my %report;
- for my $class_spec (@spec) {
- my ($class_base, $args) = @$class_spec;
- print "Gathering $class_base data\n";
- $report{ $class_base } = $gatherer
- ->gather($class_base, $args);
+ $self->_log("With sudo: ", join ", ", map $_->[0], @nosudo);
+ my $proxy = $self->_create_gatherer(
+ sudo => 1,
+ host => $host,
+ introspectors => [@sudo],
+ );
+ push @futures, $proxy->start::gather_all;
}
- print "All gathered\n";
- return \%report;
+ return @futures;
}
sub storage {
}
sub _store {
- my ($self, $host, $data) = @_;
- for my $group (sort keys %$data) {
- my $storage = $self->storage($host, $group);
- my $gathered = $data->{$group};
+ my ($self, $host, $group, $gathered) = @_;
+ my $storage = $self->storage($host, $group);
+ my $ok = eval {
my @files;
for my $class (sort keys %$gathered) {
my $file = sprintf '%s.json', join '/',
$_;
} split m{::}, $class;
my $fh = $storage->open('>:utf8', $file, mkpath => 1);
- print "Writing $file\n";
+ my $full_path = $storage->file($file);
+ $self->_log("Writing $full_path");
print $fh encode_json($gathered->{$class});
- push @files, $storage->file($file);
+ push @files, $full_path;
}
$self->_cleanup($storage, [@files]);
+ $self->_log("Committing");
$storage->commit;
+ };
+ unless ($ok) {
+ $self->_log("Rolling back snapshot because of: ", $@ || 'unknown error');
+ $storage->rollback;
+ die $@;
}
return 1;
}
my @files = $storage->find_files('json');
for my $file (@files) {
next if $known{$file};
- print "Removing $file\n";
+ $self->_log("Removing $file");
unlink($file)
or die "Unable to remove '$file': $!\n";
}
sub _create_gatherer {
my ($self, %arg) = @_;
return System::Introspector::Gatherer->new_from_spec(
- user => $self->user,
- host => $arg{host},
- sudo_user => $arg{sudo} && $self->sudo_user,
+ user => $self->user,
+ host => $arg{host},
+ sudo_user => $arg{sudo} && $self->sudo_user,
+ introspectors => $arg{introspectors},
);
}