parallel fetching of multiple hosts
[scpubgit/System-Introspector.git] / lib / System / Introspector / State.pm
1 package System::Introspector::State;
2 use Moo;
3 use File::Tree::Snapshot;
4 use System::Introspector::Gatherer;
5 use Object::Remote::Future qw( await_all );
6
7 use JSON::Diffable qw( encode_json );
8
9 has config => (is => 'ro', required => 1);
10
11 has root => (is => 'ro', required => 1);
12
13 sub user { $_[0]->config->user }
14
15 sub sudo_user { $_[0]->config->sudo_user }
16
17 sub _log { shift; printf "[%s] %s\n", scalar(localtime), join '', @_ }
18
19 sub gather {
20     my ($self, @groups) = @_;
21     $self->_log('Start');
22     for my $group (@groups) {
23         my @waiting;
24         for my $host ($self->config->hosts) {
25             $self->_log("Beginning to fetch group '$group' on '$host'");
26             push @waiting, [$host, $self->fetch($host, $group)];
27         }
28         $self->_log("Now waiting for results");
29         my @results = map {
30             my ($host, @futures) = @$_;
31             my $done = [$host, await_all @futures];
32             $self->_log("Received all from group '$group' on '$host'");
33             $done;
34         } @waiting;
35         $self->_log("All gathered for group '$group'");
36         for my $result (@results) {
37             my ($host, @data) = @$result;
38             $self->_store($host, $group, +{ map %$_, @data });
39         }
40     }
41     $self->_log('Done');
42     return 1;
43 }
44
45 sub introspectors {
46     my ($self, $group) = @_;
47     return $self->config->config_for_group($group)->{introspect};
48 }
49
50 sub fetch {
51     my ($self, $host, $group) = @_;
52     my $spec = $self->introspectors($group);
53     my (@sudo, @nosudo);
54     push(@{ $spec->{$_}{sudo} ? \@sudo : \@nosudo}, [$_, $spec->{$_}])
55         for sort keys %$spec;
56     my @futures;
57     if (@nosudo) {
58         $self->_log("Without sudo: ", join ", ", map $_->[0], @nosudo);
59         my $proxy = $self->_create_gatherer(
60             host => $host,
61             introspectors => [@nosudo],
62         );
63         push @futures, $proxy->start::gather_all;
64     }
65     if (@sudo) {
66         $self->_log("With sudo: ", join ", ", map $_->[0], @nosudo);
67         my $proxy = $self->_create_gatherer(
68             sudo => 1,
69             host => $host,
70             introspectors => [@sudo],
71         );
72         push @futures, $proxy->start::gather_all;
73     }
74     return @futures;
75 }
76
77 sub storage {
78     my ($self, @path) = @_;
79     my $storage = File::Tree::Snapshot->new(
80         allow_empty  => 0,
81         storage_path => join('/', $self->root, @path),
82     );
83     $storage->create
84         unless $storage->exists;
85     return $storage;
86 }
87
88 sub _store {
89     my ($self, $host, $group, $gathered) = @_;
90     my $storage = $self->storage($host, $group);
91     my $ok = eval {
92         my @files;
93         for my $class (sort keys %$gathered) {
94             my $file = sprintf '%s.json', join '/',
95                 map lc, map {
96                     s{([a-z0-9])([A-Z])}{${1}_${2}}g;
97                     $_;
98                 } split m{::}, $class;
99             my $fh = $storage->open('>:utf8', $file, mkpath => 1);
100             my $full_path = $storage->file($file);
101             $self->_log("Writing $full_path");
102             print $fh encode_json($gathered->{$class});
103             push @files, $full_path;
104         }
105         $self->_cleanup($storage, [@files]);
106         $self->_log("Committing");
107         $storage->commit;
108     };
109     unless ($ok) {
110         $self->_log("Rolling back snapshot because of: ", $@ || 'unknown error');
111         $storage->rollback;
112         die $@;
113     }
114     return 1;
115 }
116
117 sub _cleanup {
118     my ($self, $storage, $known_files) = @_;
119     my %known = map { ($_ => 1) } @$known_files;
120     my @files = $storage->find_files('json');
121     for my $file (@files) {
122         next if $known{$file};
123         $self->_log("Removing $file");
124         unlink($file)
125             or die "Unable to remove '$file': $!\n";
126     }
127     return 1;
128 }
129
130 sub _create_gatherer {
131     my ($self, %arg) = @_;
132     return System::Introspector::Gatherer->new_from_spec(
133         user          => $self->user,
134         host          => $arg{host},
135         sudo_user     => $arg{sudo} && $self->sudo_user,
136         introspectors => $arg{introspectors},
137     );
138 }
139
140 1;
141
142 =head1 NAME
143
144 System::Introspector::State - Gather system state
145
146 =head1 SYNOPSIS
147
148     my $state = System::Introspector::State->new(
149         host    => 'foo.example.com',
150         storage => $storage_obj,
151         config  => {
152             introspect => [qw( ProbeName )],
153         },
154     );
155
156     my $data = $state->fetch;
157     $state->fetch_and_store;
158
159 =head1 DESCRIPTION
160
161 Gathers system introspection data based on configuration and stores
162 it with a L<File::Tree::Snapshot> object.
163
164 =head1 ATTRIBUTES
165
166 =head2 config
167
168 A hash reference containing a C<introspect> key with an array reference
169 value containing a list of probe names without the
170 C<System::Introspector::Probe::> prefix. This attribute is required.
171
172 =head2 host
173
174 An optional hostname. If no hostname is supplied, the local configuration
175 data will be fetched.
176
177 =head2 storage
178
179 A L<File::Tree::Snapshot> object.
180
181 =head1 METHODS
182
183 =head2 fetch
184
185     my $data = $state->fetch;
186
187 Fetches all probe data.
188
189 =head2 fetch_and_store
190
191     $state->fetch_and_store;
192
193 Fetches all probe data and stores it in the L</storage>.
194
195 =cut