package System::Introspector::Report::Publish::MediaWiki;
use Moo;
use Try::Tiny;
+use Object::Remote;
+use Object::Remote::Future;
use Log::Contextual::WarnLogger;
use Log::Contextual qw( :log ),
-default_logger => Log::Contextual::WarnLogger->new({
levels => [qw( error fatal warn )],
});
-use aliased 'System::Introspector::Report::Publish::MediaWiki::Connection';
+use aliased 'System::Introspector::Report::Publish::MediaWiki::Updater';
has page_options => (
is => 'ro',
init_arg => 'page',
);
-has connection => (is => 'ro', lazy => 1, builder => 1, handles => {
- get_page => 'get',
- put_page => 'put',
-});
-
-has api_uri => (is => 'ro', required => 1);
-has username => (is => 'ro', required => 1);
-has password => (is => 'ro', required => 1);
-has allow_create => (is => 'ro', default => sub { 0 });
-has http_auth => (is => 'ro');
-has http_realm => (is => 'ro');
+has process_count => (
+ is => 'ro',
+ isa => sub {
+ die "MediaWiki processes count has to be above 0\n"
+ if $_[0] < 1;
+ },
+ default => sub { 1 },
+ init_arg => 'processes',
+);
-sub _build_producer { Producer->new }
+has updater_pool => (
+ is => 'lazy',
+ init_arg => undef,
+);
-sub _build_connection {
+sub _build_updater_pool {
my ($self) = @_;
- return Connection->new(
- api_uri => $self->api_uri,
- username => $self->username,
- password => $self->password,
- allow_create => $self->allow_create,
- $self->http_auth ? (
- http_auth => 1,
- http_realm => $self->http_realm,
- ) : (),
- );
+ return [map {
+ Updater->new::on('-', $self->connect_info_pairs);
+ } 1 .. $self->process_count];
}
sub publish {
sprintf "Pushing reports to MediaWiki at '%s'", $self->api_uri;
};
for my $page (sort keys %$pages) {
- $self->_publish_page($reports, $page, $pages->{$page});
+ my $options = $pages->{$page};
+ if ($page =~ m{\%\(meta:(.+?)\)}) {
+ return $self->_publish_dynamic($reports, $page, $1, $options);
+ }
+ else {
+ $self->_publish_pages([$reports, $page, $options]);
+ }
}
log_info { "Finished pushing to MediaWiki" };
return 1;
}
+sub _publish_pages {
+ my ($self, @pages) = @_;
+ my $stream = sub {
+ my $next = shift @pages
+ or return undef;
+ my ($reports, $page_name, $options) = @$next;
+ my $sorted = $self->_sort_reports($reports, $options->{report} || []);
+ return [$page_name, $options, $sorted];
+ };
+ await_all map {
+ $_->start::process($stream);
+ } @{ $self->updater_pool };
+}
+
sub _sort_reports {
my ($self, $reports, $included) = @_;
my @matchers = map {
next unless defined $value;
push @{ $reports_by_key{$value} ||= [] }, $report;
}
+ my @pages;
for my $value (keys %reports_by_key) {
(my $effective_page = $page_name)
=~ s{\%\(meta:\Q$key\E\)}{$value};
my $assoc = $reports_by_key{$value};
- $self->_publish_page($assoc, $effective_page, $options);
+ push @pages, [$assoc, $effective_page, $options];
}
+ $self->_publish_pages(@pages);
return 1;
}
-sub _publish_page {
- my ($self, $reports, $page_name, $options) = @_;
- if ($page_name =~ m{\%\(meta:(.+?)\)}) {
- return $self->_publish_dynamic($reports, $page_name, $1, $options);
- }
- my $sorted = $self->_sort_reports($reports, $options->{report} || []);
- unless (scalar @$sorted) {
- log_debug { "Skipping page '$page_name': No reports to publish" };
- return 1;
- }
- my $do_create = $options->{create};
- return try {
- my $page = $self->get_page($page_name);
- if ($page->is_new and not $do_create) {
- log_trace { "Skipping page '$page_name': Does not yet exist" };
- return 1;
- }
- my $is_changed = $page->update($sorted);
- unless ($is_changed) {
- log_debug { "Not pushing page '$page_name': No changes" };
- return 1;
- }
- log_debug { "Updating page '$page_name'" };
- $self->put_page($page);
- return 1;
- }
- catch {
- log_error { "Error during page update: $_" };
- return 1;
- };
-}
-
with $_ for qw(
System::Introspector::Report::Publish::API
+ System::Introspector::Report::Publish::MediaWiki::HasConnectInfo
);
1;
--- /dev/null
+package System::Introspector::Report::Publish::MediaWiki::Updater;
+use Moo;
+use Try::Tiny;
+use Module::Runtime qw( use_module );
+use Log::Contextual::WarnLogger;
+use Log::Contextual qw( :log ),
+ -default_logger => Log::Contextual::WarnLogger->new({
+ env_prefix => 'SI_REPORT_MEDIAWIKI_UPDATER',
+ levels => [qw( error fatal warn info trace debug )],
+ });
+
+has connection => (is => 'ro', lazy => 1, builder => 1, handles => {
+ get_page => 'get',
+ put_page => 'put',
+});
+
+sub _build_connection {
+ my ($self) = @_;
+ return use_module($self->connection_class)->new(
+ api_uri => $self->api_uri,
+ username => $self->username,
+ password => $self->password,
+ allow_create => 1,
+ $self->http_auth ? (
+ http_auth => 1,
+ http_realm => $self->http_realm,
+ ) : (),
+ );
+}
+
+sub process {
+ my ($self, $stream) = @_;
+ while (my $report = $stream->()) {
+ $self->_process_page(@$report);
+ }
+ return 1;
+}
+
+sub _process_page {
+ my ($self, $page_name, $options, $reports) = @_;
+ unless (scalar @$reports) {
+ log_debug { "Skipping page '$page_name': No reports to publish" };
+ return 1;
+ }
+ my $do_create = $options->{create};
+ return try {
+ my $page = $self->get_page($page_name);
+ if ($page->is_new and not $do_create) {
+ log_trace { "Skipping page '$page_name': Does not yet exist" };
+ return 1;
+ }
+ my $is_changed = $page->update($reports);
+ unless ($is_changed) {
+ log_debug { "Not pushing page '$page_name': No changes" };
+ return 1;
+ }
+ log_debug { "Updating page '$page_name'" };
+ $self->put_page($page);
+ return 1;
+ }
+ catch {
+ log_error { "Error during page update: $_" };
+ return 1;
+ };
+}
+
+with $_ for qw(
+ System::Introspector::Report::Publish::MediaWiki::HasConnectInfo
+);
+
+1;
use Test::More;
use IO::All;
use FindBin;
-
+use File::Temp;
+use lib "$FindBin::Bin/lib";
use aliased 'System::Introspector::Report::Publish::MediaWiki';
-my %result;
-
-do {
- package TestConnection;
- use Moo;
- use IO::All;
- use aliased 'System::Introspector::Report::Publish::MediaWiki::Page';
+$ENV{TEST_SI_REPORT_OUT} = my $out_dir = File::Temp->newdir;
+$ENV{TEST_SI_REPORT_IN} = "$FindBin::Bin/data";
- sub get {
- my ($self, $name) = @_;
- return Page->new(
- name => $name,
- timestamp => 23,
- content => scalar(io("$FindBin::Bin/data/$name.txt")->slurp),
- );
- }
-
- sub put {
- my ($self, $page) = @_;
- $result{$page->name} = $page->content;
- }
-};
+my %result;
-my $conn = TestConnection->new;
my $wiki = MediaWiki->new(
api_uri => 'http://example.com:9999/',
+ connection_class => 'TestConnection',
username => 'foo',
password => 'bar',
- connection => $conn,
page => {
foo => {
report => ['foo:*'],
};
## uncomment to regenerate result file
-# do { no warnings; $result{foo} > io("$FindBin::Bin/data/result/foo.txt") };
+# do { no warnings; io("$out_dir/foo") > io("$FindBin::Bin/data/result/foo.txt"); fail "regenerated" };
-is $result{foo}->$_despace,
+is scalar(io("$out_dir/foo")->slurp)->$_despace,
scalar(io("$FindBin::Bin/data/result/foo.txt")->slurp)->$_despace,
'resulting page looks correct';