parallel fetching of multiple hosts
Robert 'phaylon' Sedlacek [Wed, 11 Jul 2012 19:09:23 +0000 (19:09 +0000)]
lib/System/Introspector/Gatherer.pm
lib/System/Introspector/Gatherer/Bridge.pm
lib/System/Introspector/State.pm
lib/System/Introspector/Util.pm

index 43ffff4..d88f590 100644 (file)
@@ -5,23 +5,31 @@ use Object::Remote::Future;
 use System::Introspector::Gatherer::Bridge;
 use Module::Runtime qw( use_module );
 
-sub gather {
-    my ($self, $class, $args) = @_;
-    return use_module("System::Introspector::Probe::$class")
-        ->new($args)
-        ->gather;
+has introspectors => (is => 'ro', required => 1);
+
+sub gather_all {
+    my ($self) = @_;
+    my %report;
+    for my $spec (@{ $self->introspectors }) {
+        my ($base, $args) = @$spec;
+        $report{$base} = use_module("System::Introspector::Probe::$base")
+            ->new($args)
+            ->gather;
+    }
+    return \%report;
 }
 
 sub _new_direct {
-    my ($class, $remote) = @_;
-    return $class->new::on($remote);
+    my ($class, $remote, $args) = @_;
+    return $class->new::on($remote, $args || {});
 }
 
 sub _new_bridged {
-    my ($class, $bridge, $remote) = @_;
+    my ($class, $bridge, $remote, $args) = @_;
     return System::Introspector::Gatherer::Bridge->new::on($bridge,
         remote_spec  => $remote,
         remote_class => $class,
+        remote_args  => $args || {},
     );
 }
 
@@ -29,21 +37,22 @@ sub new_from_spec {
     my ($class, %arg) = @_;
     my ($user, $host, $sudo_user) = @arg{qw( user host sudo_user )};
     my $sudo = defined($sudo_user) ? sprintf('%s@', $sudo_user) : undef;
+    my $args = { introspectors => $arg{introspectors} };
     if (defined $host) {
         my $remote = join '@', grep defined, $user, $host;
         if (defined $sudo_user) {
-            return $class->_new_bridged($remote, $sudo);
+            return $class->_new_bridged($remote, $sudo, $args);
         }
         else {
-            return $class->_new_direct($remote);
+            return $class->_new_direct($remote, $args);
         }
     }
     else {
         if (defined $sudo_user) {
-            return $class->_new_direct($sudo);
+            return $class->_new_direct($sudo, $args);
         }
         else {
-            return $class->new;
+            return $class->new($args);
         }
     }
 }
index dd6cd19..e802691 100644 (file)
@@ -5,11 +5,13 @@ use Moo;
 
 has remote_spec => (is => 'ro', required => 1);
 has remote_class => (is => 'ro', required => 1);
+has remote_args => (is => 'ro', required => 1);
 has remote => (is => 'lazy');
 
 sub _build_remote {
     my ($self) = @_;
-    return $self->remote_class->new::on($self->remote_spec);
+    return $self->remote_class
+        ->new::on($self->remote_spec, $self->remote_args);
 }
 
 sub gather { (shift)->remote->gather(@_) }
index 3f20876..678bc42 100644 (file)
@@ -2,6 +2,7 @@ package System::Introspector::State;
 use Moo;
 use File::Tree::Snapshot;
 use System::Introspector::Gatherer;
+use Object::Remote::Future qw( await_all );
 
 use JSON::Diffable qw( encode_json );
 
@@ -13,11 +14,31 @@ sub user { $_[0]->config->user }
 
 sub sudo_user { $_[0]->config->sudo_user }
 
+sub _log { shift; printf "[%s] %s\n", scalar(localtime), join '', @_ }
+
 sub gather {
     my ($self, @groups) = @_;
-    for my $host ($self->config->hosts) {
-        $self->fetch_and_store($host, @groups);
+    $self->_log('Start');
+    for my $group (@groups) {
+        my @waiting;
+        for my $host ($self->config->hosts) {
+            $self->_log("Beginning to fetch group '$group' on '$host'");
+            push @waiting, [$host, $self->fetch($host, $group)];
+        }
+        $self->_log("Now waiting for results");
+        my @results = map {
+            my ($host, @futures) = @$_;
+            my $done = [$host, await_all @futures];
+            $self->_log("Received all from group '$group' on '$host'");
+            $done;
+        } @waiting;
+        $self->_log("All gathered for group '$group'");
+        for my $result (@results) {
+            my ($host, @data) = @$result;
+            $self->_store($host, $group, +{ map %$_, @data });
+        }
     }
+    $self->_log('Done');
     return 1;
 }
 
@@ -26,48 +47,31 @@ sub introspectors {
     return $self->config->config_for_group($group)->{introspect};
 }
 
-sub fetch_and_store {
-    my ($self, $host, @groups) = @_;
-    my $data = $self->fetch($host, @groups);
-    return $self->_store($host, $data);
-}
-
 sub fetch {
-    my ($self, $host, @groups) = @_;
-    return +{ map {
-        ($_, $self->fetch_group($host, $_));
-    } @groups };
-}
-
-sub fetch_group {
     my ($self, $host, $group) = @_;
     my $spec = $self->introspectors($group);
     my (@sudo, @nosudo);
     push(@{ $spec->{$_}{sudo} ? \@sudo : \@nosudo}, [$_, $spec->{$_}])
         for sort keys %$spec;
-    my %report;
+    my @futures;
     if (@nosudo) {
-        my $gatherer = $self->_create_gatherer(host => $host);
-        %report = %{ $self->_fetch_with_gatherer($gatherer, @nosudo) || {} };
+        $self->_log("Without sudo: ", join ", ", map $_->[0], @nosudo);
+        my $proxy = $self->_create_gatherer(
+            host => $host,
+            introspectors => [@nosudo],
+        );
+        push @futures, $proxy->start::gather_all;
     }
     if (@sudo) {
-        my $gatherer = $self->_create_gatherer(sudo => 1, host => $host);
-        %report = (%report, %{ $self->_fetch_with_gatherer($gatherer, @sudo) || {} });
-    }
-    return \%report;
-}
-
-sub _fetch_with_gatherer {
-    my ($self, $gatherer, @spec) = @_;
-    my %report;
-    for my $class_spec (@spec) {
-        my ($class_base, $args) = @$class_spec;
-        print "Gathering $class_base data\n";
-        $report{ $class_base } = $gatherer
-            ->gather($class_base, $args);
+        $self->_log("With sudo: ", join ", ", map $_->[0], @nosudo);
+        my $proxy = $self->_create_gatherer(
+            sudo => 1,
+            host => $host,
+            introspectors => [@sudo],
+        );
+        push @futures, $proxy->start::gather_all;
     }
-    print "All gathered\n";
-    return \%report;
+    return @futures;
 }
 
 sub storage {
@@ -82,10 +86,9 @@ sub storage {
 }
 
 sub _store {
-    my ($self, $host, $data) = @_;
-    for my $group (sort keys %$data) {
-        my $storage = $self->storage($host, $group);
-        my $gathered = $data->{$group};
+    my ($self, $host, $group, $gathered) = @_;
+    my $storage = $self->storage($host, $group);
+    my $ok = eval {
         my @files;
         for my $class (sort keys %$gathered) {
             my $file = sprintf '%s.json', join '/',
@@ -94,12 +97,19 @@ sub _store {
                     $_;
                 } split m{::}, $class;
             my $fh = $storage->open('>:utf8', $file, mkpath => 1);
-            print "Writing $file\n";
+            my $full_path = $storage->file($file);
+            $self->_log("Writing $full_path");
             print $fh encode_json($gathered->{$class});
-            push @files, $storage->file($file);
+            push @files, $full_path;
         }
         $self->_cleanup($storage, [@files]);
+        $self->_log("Committing");
         $storage->commit;
+    };
+    unless ($ok) {
+        $self->_log("Rolling back snapshot because of: ", $@ || 'unknown error');
+        $storage->rollback;
+        die $@;
     }
     return 1;
 }
@@ -110,7 +120,7 @@ sub _cleanup {
     my @files = $storage->find_files('json');
     for my $file (@files) {
         next if $known{$file};
-        print "Removing $file\n";
+        $self->_log("Removing $file");
         unlink($file)
             or die "Unable to remove '$file': $!\n";
     }
@@ -120,9 +130,10 @@ sub _cleanup {
 sub _create_gatherer {
     my ($self, %arg) = @_;
     return System::Introspector::Gatherer->new_from_spec(
-        user        => $self->user,
-        host        => $arg{host},
-        sudo_user   => $arg{sudo} && $self->sudo_user,
+        user          => $self->user,
+        host          => $arg{host},
+        sudo_user     => $arg{sudo} && $self->sudo_user,
+        introspectors => $arg{introspectors},
     );
 }
 
index c73572b..a953dc2 100644 (file)
@@ -16,6 +16,7 @@ our @EXPORT_OK = qw(
     lines_from_command
     files_from_dir
     transform_exceptions
+    fail
 );
 
 do {