From: Robert 'phaylon' Sedlacek Date: Thu, 4 Oct 2012 20:01:13 +0000 (+0000) Subject: allow usage of a pool of mediawiki updaters instead of just single-process X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=8a1868871d37c2f69c4d8861d1a7fb1924dd1f10;p=scpubgit%2FSystem-Introspector-Report.git allow usage of a pool of mediawiki updaters instead of just single-process --- diff --git a/Makefile.PL b/Makefile.PL index 86cba39..d532751 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -18,6 +18,7 @@ WriteMakefile( 'Object::Remote' => '0.001001', 'MediaWiki::API' => '0.39', 'Log::Contextual' => '0.004202', + 'Module::Runtime' => '0.013', 'MRO::Compat' => 0, 'Class::C3' => 0, 'Getopt::Long' => 0, diff --git a/lib/System/Introspector/Report/Publish/MediaWiki.pm b/lib/System/Introspector/Report/Publish/MediaWiki.pm index 768a465..338c3a0 100644 --- a/lib/System/Introspector/Report/Publish/MediaWiki.pm +++ b/lib/System/Introspector/Report/Publish/MediaWiki.pm @@ -1,6 +1,8 @@ package System::Introspector::Report::Publish::MediaWiki; use Moo; use Try::Tiny; +use Object::Remote; +use Object::Remote::Future; use Log::Contextual::WarnLogger; use Log::Contextual qw( :log ), -default_logger => Log::Contextual::WarnLogger->new({ @@ -8,7 +10,7 @@ use Log::Contextual qw( :log ), levels => [qw( error fatal warn )], }); -use aliased 'System::Introspector::Report::Publish::MediaWiki::Connection'; +use aliased 'System::Introspector::Report::Publish::MediaWiki::Updater'; has page_options => ( is => 'ro', @@ -16,32 +18,26 @@ has page_options => ( init_arg => 'page', ); -has connection => (is => 'ro', lazy => 1, builder => 1, handles => { - get_page => 'get', - put_page => 'put', -}); - -has api_uri => (is => 'ro', required => 1); -has username => (is => 'ro', required => 1); -has password => (is => 'ro', required => 1); -has allow_create => (is => 'ro', default => sub { 0 }); -has http_auth => (is => 'ro'); -has http_realm => (is => 'ro'); +has process_count => ( + is => 'ro', + isa => sub { + die "MediaWiki processes count has to be above 0\n" + if $_[0] < 1; + }, + default => sub { 1 }, + init_arg => 'processes', +); -sub _build_producer { Producer->new } +has updater_pool => ( + is => 'lazy', + init_arg => undef, +); -sub _build_connection { +sub _build_updater_pool { my ($self) = @_; - return Connection->new( - api_uri => $self->api_uri, - username => $self->username, - password => $self->password, - allow_create => $self->allow_create, - $self->http_auth ? ( - http_auth => 1, - http_realm => $self->http_realm, - ) : (), - ); + return [map { + Updater->new::on('-', $self->connect_info_pairs); + } 1 .. $self->process_count]; } sub publish { @@ -51,12 +47,32 @@ sub publish { sprintf "Pushing reports to MediaWiki at '%s'", $self->api_uri; }; for my $page (sort keys %$pages) { - $self->_publish_page($reports, $page, $pages->{$page}); + my $options = $pages->{$page}; + if ($page =~ m{\%\(meta:(.+?)\)}) { + return $self->_publish_dynamic($reports, $page, $1, $options); + } + else { + $self->_publish_pages([$reports, $page, $options]); + } } log_info { "Finished pushing to MediaWiki" }; return 1; } +sub _publish_pages { + my ($self, @pages) = @_; + my $stream = sub { + my $next = shift @pages + or return undef; + my ($reports, $page_name, $options) = @$next; + my $sorted = $self->_sort_reports($reports, $options->{report} || []); + return [$page_name, $options, $sorted]; + }; + await_all map { + $_->start::process($stream); + } @{ $self->updater_pool }; +} + sub _sort_reports { my ($self, $reports, $included) = @_; my @matchers = map { @@ -82,49 +98,20 @@ sub _publish_dynamic { next unless defined $value; push @{ $reports_by_key{$value} ||= [] }, $report; } + my @pages; for my $value (keys %reports_by_key) { (my $effective_page = $page_name) =~ s{\%\(meta:\Q$key\E\)}{$value}; my $assoc = $reports_by_key{$value}; - $self->_publish_page($assoc, $effective_page, $options); + push @pages, [$assoc, $effective_page, $options]; } + $self->_publish_pages(@pages); return 1; } -sub _publish_page { - my ($self, $reports, $page_name, $options) = @_; - if ($page_name =~ m{\%\(meta:(.+?)\)}) { - return $self->_publish_dynamic($reports, $page_name, $1, $options); - } - my $sorted = $self->_sort_reports($reports, $options->{report} || []); - unless (scalar @$sorted) { - log_debug { "Skipping page '$page_name': No reports to publish" }; - return 1; - } - my $do_create = $options->{create}; - return try { - my $page = $self->get_page($page_name); - if ($page->is_new and not $do_create) { - log_trace { "Skipping page '$page_name': Does not yet exist" }; - return 1; - } - my $is_changed = $page->update($sorted); - unless ($is_changed) { - log_debug { "Not pushing page '$page_name': No changes" }; - return 1; - } - log_debug { "Updating page '$page_name'" }; - $self->put_page($page); - return 1; - } - catch { - log_error { "Error during page update: $_" }; - return 1; - }; -} - with $_ for qw( System::Introspector::Report::Publish::API + System::Introspector::Report::Publish::MediaWiki::HasConnectInfo ); 1; diff --git a/lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm b/lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm new file mode 100644 index 0000000..96e1d2f --- /dev/null +++ b/lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm @@ -0,0 +1,27 @@ +package System::Introspector::Report::Publish::MediaWiki::HasConnectInfo; +use Moo::Role; + +has connection_class => (is => 'ro', lazy => 1, builder => 1); +has api_uri => (is => 'ro', required => 1); +has username => (is => 'ro', required => 1); +has password => (is => 'ro', required => 1); +has http_auth => (is => 'ro'); +has http_realm => (is => 'ro'); + +sub _build_connection_class { + 'System::Introspector::Report::Publish::MediaWiki::Connection' +} + +sub connect_info_pairs { + my ($self) = @_; + return map { ($_ => $self->$_) } qw( + api_uri + username + password + http_auth + http_realm + connection_class + ); +} + +1; diff --git a/lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm b/lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm new file mode 100644 index 0000000..1ae642b --- /dev/null +++ b/lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm @@ -0,0 +1,71 @@ +package System::Introspector::Report::Publish::MediaWiki::Updater; +use Moo; +use Try::Tiny; +use Module::Runtime qw( use_module ); +use Log::Contextual::WarnLogger; +use Log::Contextual qw( :log ), + -default_logger => Log::Contextual::WarnLogger->new({ + env_prefix => 'SI_REPORT_MEDIAWIKI_UPDATER', + levels => [qw( error fatal warn info trace debug )], + }); + +has connection => (is => 'ro', lazy => 1, builder => 1, handles => { + get_page => 'get', + put_page => 'put', +}); + +sub _build_connection { + my ($self) = @_; + return use_module($self->connection_class)->new( + api_uri => $self->api_uri, + username => $self->username, + password => $self->password, + allow_create => 1, + $self->http_auth ? ( + http_auth => 1, + http_realm => $self->http_realm, + ) : (), + ); +} + +sub process { + my ($self, $stream) = @_; + while (my $report = $stream->()) { + $self->_process_page(@$report); + } + return 1; +} + +sub _process_page { + my ($self, $page_name, $options, $reports) = @_; + unless (scalar @$reports) { + log_debug { "Skipping page '$page_name': No reports to publish" }; + return 1; + } + my $do_create = $options->{create}; + return try { + my $page = $self->get_page($page_name); + if ($page->is_new and not $do_create) { + log_trace { "Skipping page '$page_name': Does not yet exist" }; + return 1; + } + my $is_changed = $page->update($reports); + unless ($is_changed) { + log_debug { "Not pushing page '$page_name': No changes" }; + return 1; + } + log_debug { "Updating page '$page_name'" }; + $self->put_page($page); + return 1; + } + catch { + log_error { "Error during page update: $_" }; + return 1; + }; +} + +with $_ for qw( + System::Introspector::Report::Publish::MediaWiki::HasConnectInfo +); + +1; diff --git a/t/lib/TestConnection.pm b/t/lib/TestConnection.pm new file mode 100644 index 0000000..1956a29 --- /dev/null +++ b/t/lib/TestConnection.pm @@ -0,0 +1,25 @@ +package TestConnection; +use Moo; +use IO::All; +use aliased 'System::Introspector::Report::Publish::MediaWiki::Page'; + +extends 'System::Introspector::Report::Publish::MediaWiki::Connection'; + +sub get { + my ($self, $name) = @_; + return Page->new( + name => $name, + timestamp => 23, + content => scalar(io("$ENV{TEST_SI_REPORT_IN}/$name.txt")->slurp), + ); +} + +sub put { + my ($self, $page) = @_; + my $out = $ENV{TEST_SI_REPORT_OUT} + or die "No output directory"; + io("$out/" . $page->name)->print($page->content); + return 1; +} + +1; diff --git a/t/publish_mediawiki.t b/t/publish_mediawiki.t index a328e8e..1731bdf 100644 --- a/t/publish_mediawiki.t +++ b/t/publish_mediawiki.t @@ -2,38 +2,20 @@ use strictures 1; use Test::More; use IO::All; use FindBin; - +use File::Temp; +use lib "$FindBin::Bin/lib"; use aliased 'System::Introspector::Report::Publish::MediaWiki'; -my %result; - -do { - package TestConnection; - use Moo; - use IO::All; - use aliased 'System::Introspector::Report::Publish::MediaWiki::Page'; +$ENV{TEST_SI_REPORT_OUT} = my $out_dir = File::Temp->newdir; +$ENV{TEST_SI_REPORT_IN} = "$FindBin::Bin/data"; - sub get { - my ($self, $name) = @_; - return Page->new( - name => $name, - timestamp => 23, - content => scalar(io("$FindBin::Bin/data/$name.txt")->slurp), - ); - } - - sub put { - my ($self, $page) = @_; - $result{$page->name} = $page->content; - } -}; +my %result; -my $conn = TestConnection->new; my $wiki = MediaWiki->new( api_uri => 'http://example.com:9999/', + connection_class => 'TestConnection', username => 'foo', password => 'bar', - connection => $conn, page => { foo => { report => ['foo:*'], @@ -89,9 +71,9 @@ my $_despace = sub { }; ## uncomment to regenerate result file -# do { no warnings; $result{foo} > io("$FindBin::Bin/data/result/foo.txt") }; +# do { no warnings; io("$out_dir/foo") > io("$FindBin::Bin/data/result/foo.txt"); fail "regenerated" }; -is $result{foo}->$_despace, +is scalar(io("$out_dir/foo")->slurp)->$_despace, scalar(io("$FindBin::Bin/data/result/foo.txt")->slurp)->$_despace, 'resulting page looks correct';