allow usage of a pool of mediawiki updaters instead of just single-process
[scpubgit/System-Introspector-Report.git] / lib / System / Introspector / Report / Publish / MediaWiki.pm
index 768a465..338c3a0 100644 (file)
@@ -1,6 +1,8 @@
 package System::Introspector::Report::Publish::MediaWiki;
 use Moo;
 use Try::Tiny;
+use Object::Remote;
+use Object::Remote::Future;
 use Log::Contextual::WarnLogger;
 use Log::Contextual qw( :log ),
   -default_logger => Log::Contextual::WarnLogger->new({
@@ -8,7 +10,7 @@ use Log::Contextual qw( :log ),
     levels      => [qw( error fatal warn )],
   });
 
-use aliased 'System::Introspector::Report::Publish::MediaWiki::Connection';
+use aliased 'System::Introspector::Report::Publish::MediaWiki::Updater';
 
 has page_options => (
   is => 'ro',
@@ -16,32 +18,26 @@ has page_options => (
   init_arg => 'page',
 );
 
-has connection => (is => 'ro', lazy => 1, builder => 1, handles => {
-  get_page => 'get',
-  put_page => 'put',
-});
-
-has api_uri         => (is => 'ro', required => 1);
-has username        => (is => 'ro', required => 1);
-has password        => (is => 'ro', required => 1);
-has allow_create    => (is => 'ro', default => sub { 0 });
-has http_auth       => (is => 'ro');
-has http_realm      => (is => 'ro');
+has process_count => (
+  is => 'ro',
+  isa => sub {
+    die "MediaWiki processes count has to be above 0\n"
+      if $_[0] < 1;
+  },
+  default => sub { 1 },
+  init_arg => 'processes',
+);
 
-sub _build_producer { Producer->new }
+has updater_pool => (
+  is => 'lazy',
+  init_arg => undef,
+);
 
-sub _build_connection {
+sub _build_updater_pool {
   my ($self) = @_;
-  return Connection->new(
-    api_uri         => $self->api_uri,
-    username        => $self->username,
-    password        => $self->password,
-    allow_create    => $self->allow_create,
-    $self->http_auth ? (
-      http_auth  => 1,
-      http_realm => $self->http_realm,
-    ) : (),
-  );
+  return [map {
+    Updater->new::on('-', $self->connect_info_pairs);
+  } 1 .. $self->process_count];
 }
 
 sub publish {
@@ -51,12 +47,32 @@ sub publish {
     sprintf "Pushing reports to MediaWiki at '%s'", $self->api_uri;
   };
   for my $page (sort keys %$pages) {
-    $self->_publish_page($reports, $page, $pages->{$page});
+    my $options = $pages->{$page};
+    if ($page =~ m{\%\(meta:(.+?)\)}) {
+      return $self->_publish_dynamic($reports, $page, $1, $options);
+    }
+    else {
+      $self->_publish_pages([$reports, $page, $options]);
+    }
   }
   log_info { "Finished pushing to MediaWiki" };
   return 1;
 }
 
+sub _publish_pages {
+  my ($self, @pages) = @_;
+  my $stream = sub {
+    my $next = shift @pages
+      or return undef;
+    my ($reports, $page_name, $options) = @$next;
+    my $sorted = $self->_sort_reports($reports, $options->{report} || []);
+    return [$page_name, $options, $sorted];
+  };
+  await_all map {
+    $_->start::process($stream);
+  } @{ $self->updater_pool };
+}
+
 sub _sort_reports {
   my ($self, $reports, $included) = @_;
   my @matchers = map {
@@ -82,49 +98,20 @@ sub _publish_dynamic {
     next unless defined $value;
     push @{ $reports_by_key{$value} ||= [] }, $report;
   }
+  my @pages;
   for my $value (keys %reports_by_key) {
     (my $effective_page = $page_name)
       =~ s{\%\(meta:\Q$key\E\)}{$value};
     my $assoc = $reports_by_key{$value};
-    $self->_publish_page($assoc, $effective_page, $options);
+    push @pages, [$assoc, $effective_page, $options];
   }
+  $self->_publish_pages(@pages);
   return 1;
 }
 
-sub _publish_page {
-  my ($self, $reports, $page_name, $options) = @_;
-  if ($page_name =~ m{\%\(meta:(.+?)\)}) {
-    return $self->_publish_dynamic($reports, $page_name, $1, $options);
-  }
-  my $sorted = $self->_sort_reports($reports, $options->{report} || []);
-  unless (scalar @$sorted) {
-    log_debug { "Skipping page '$page_name': No reports to publish" };
-    return 1;
-  }
-  my $do_create = $options->{create};
-  return try {
-    my $page = $self->get_page($page_name);
-    if ($page->is_new and not $do_create) {
-      log_trace { "Skipping page '$page_name': Does not yet exist" };
-      return 1;
-    }
-    my $is_changed = $page->update($sorted);
-    unless ($is_changed) {
-      log_debug { "Not pushing page '$page_name': No changes" };
-      return 1;
-    }
-    log_debug { "Updating page '$page_name'" };
-    $self->put_page($page);
-    return 1;
-  }
-  catch {
-    log_error { "Error during page update: $_" };
-    return 1;
-  };
-}
-
 with $_ for qw(
   System::Introspector::Report::Publish::API
+  System::Introspector::Report::Publish::MediaWiki::HasConnectInfo
 );
 
 1;