allow usage of a pool of mediawiki updaters instead of just single-process
Robert 'phaylon' Sedlacek [Thu, 4 Oct 2012 20:01:13 +0000 (20:01 +0000)]
Makefile.PL
lib/System/Introspector/Report/Publish/MediaWiki.pm
lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm [new file with mode: 0644]
lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm [new file with mode: 0644]
t/lib/TestConnection.pm [new file with mode: 0644]
t/publish_mediawiki.t

index 86cba39..d532751 100644 (file)
@@ -18,6 +18,7 @@ WriteMakefile(
     'Object::Remote'    => '0.001001',
     'MediaWiki::API'    => '0.39',
     'Log::Contextual'   => '0.004202',
+    'Module::Runtime'   => '0.013',
     'MRO::Compat'       => 0,
     'Class::C3'         => 0,
     'Getopt::Long'      => 0,
index 768a465..338c3a0 100644 (file)
@@ -1,6 +1,8 @@
 package System::Introspector::Report::Publish::MediaWiki;
 use Moo;
 use Try::Tiny;
+use Object::Remote;
+use Object::Remote::Future;
 use Log::Contextual::WarnLogger;
 use Log::Contextual qw( :log ),
   -default_logger => Log::Contextual::WarnLogger->new({
@@ -8,7 +10,7 @@ use Log::Contextual qw( :log ),
     levels      => [qw( error fatal warn )],
   });
 
-use aliased 'System::Introspector::Report::Publish::MediaWiki::Connection';
+use aliased 'System::Introspector::Report::Publish::MediaWiki::Updater';
 
 has page_options => (
   is => 'ro',
@@ -16,32 +18,26 @@ has page_options => (
   init_arg => 'page',
 );
 
-has connection => (is => 'ro', lazy => 1, builder => 1, handles => {
-  get_page => 'get',
-  put_page => 'put',
-});
-
-has api_uri         => (is => 'ro', required => 1);
-has username        => (is => 'ro', required => 1);
-has password        => (is => 'ro', required => 1);
-has allow_create    => (is => 'ro', default => sub { 0 });
-has http_auth       => (is => 'ro');
-has http_realm      => (is => 'ro');
+has process_count => (
+  is => 'ro',
+  isa => sub {
+    die "MediaWiki processes count has to be above 0\n"
+      if $_[0] < 1;
+  },
+  default => sub { 1 },
+  init_arg => 'processes',
+);
 
-sub _build_producer { Producer->new }
+has updater_pool => (
+  is => 'lazy',
+  init_arg => undef,
+);
 
-sub _build_connection {
+sub _build_updater_pool {
   my ($self) = @_;
-  return Connection->new(
-    api_uri         => $self->api_uri,
-    username        => $self->username,
-    password        => $self->password,
-    allow_create    => $self->allow_create,
-    $self->http_auth ? (
-      http_auth  => 1,
-      http_realm => $self->http_realm,
-    ) : (),
-  );
+  return [map {
+    Updater->new::on('-', $self->connect_info_pairs);
+  } 1 .. $self->process_count];
 }
 
 sub publish {
@@ -51,12 +47,32 @@ sub publish {
     sprintf "Pushing reports to MediaWiki at '%s'", $self->api_uri;
   };
   for my $page (sort keys %$pages) {
-    $self->_publish_page($reports, $page, $pages->{$page});
+    my $options = $pages->{$page};
+    if ($page =~ m{\%\(meta:(.+?)\)}) {
+      return $self->_publish_dynamic($reports, $page, $1, $options);
+    }
+    else {
+      $self->_publish_pages([$reports, $page, $options]);
+    }
   }
   log_info { "Finished pushing to MediaWiki" };
   return 1;
 }
 
+sub _publish_pages {
+  my ($self, @pages) = @_;
+  my $stream = sub {
+    my $next = shift @pages
+      or return undef;
+    my ($reports, $page_name, $options) = @$next;
+    my $sorted = $self->_sort_reports($reports, $options->{report} || []);
+    return [$page_name, $options, $sorted];
+  };
+  await_all map {
+    $_->start::process($stream);
+  } @{ $self->updater_pool };
+}
+
 sub _sort_reports {
   my ($self, $reports, $included) = @_;
   my @matchers = map {
@@ -82,49 +98,20 @@ sub _publish_dynamic {
     next unless defined $value;
     push @{ $reports_by_key{$value} ||= [] }, $report;
   }
+  my @pages;
   for my $value (keys %reports_by_key) {
     (my $effective_page = $page_name)
       =~ s{\%\(meta:\Q$key\E\)}{$value};
     my $assoc = $reports_by_key{$value};
-    $self->_publish_page($assoc, $effective_page, $options);
+    push @pages, [$assoc, $effective_page, $options];
   }
+  $self->_publish_pages(@pages);
   return 1;
 }
 
-sub _publish_page {
-  my ($self, $reports, $page_name, $options) = @_;
-  if ($page_name =~ m{\%\(meta:(.+?)\)}) {
-    return $self->_publish_dynamic($reports, $page_name, $1, $options);
-  }
-  my $sorted = $self->_sort_reports($reports, $options->{report} || []);
-  unless (scalar @$sorted) {
-    log_debug { "Skipping page '$page_name': No reports to publish" };
-    return 1;
-  }
-  my $do_create = $options->{create};
-  return try {
-    my $page = $self->get_page($page_name);
-    if ($page->is_new and not $do_create) {
-      log_trace { "Skipping page '$page_name': Does not yet exist" };
-      return 1;
-    }
-    my $is_changed = $page->update($sorted);
-    unless ($is_changed) {
-      log_debug { "Not pushing page '$page_name': No changes" };
-      return 1;
-    }
-    log_debug { "Updating page '$page_name'" };
-    $self->put_page($page);
-    return 1;
-  }
-  catch {
-    log_error { "Error during page update: $_" };
-    return 1;
-  };
-}
-
 with $_ for qw(
   System::Introspector::Report::Publish::API
+  System::Introspector::Report::Publish::MediaWiki::HasConnectInfo
 );
 
 1;
diff --git a/lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm b/lib/System/Introspector/Report/Publish/MediaWiki/HasConnectInfo.pm
new file mode 100644 (file)
index 0000000..96e1d2f
--- /dev/null
@@ -0,0 +1,27 @@
+package System::Introspector::Report::Publish::MediaWiki::HasConnectInfo;
+use Moo::Role;
+
+has connection_class    => (is => 'ro', lazy => 1, builder => 1);
+has api_uri             => (is => 'ro', required => 1);
+has username            => (is => 'ro', required => 1);
+has password            => (is => 'ro', required => 1);
+has http_auth           => (is => 'ro');
+has http_realm          => (is => 'ro');
+
+sub _build_connection_class {
+  'System::Introspector::Report::Publish::MediaWiki::Connection'
+}
+
+sub connect_info_pairs {
+  my ($self) = @_;
+  return map { ($_ => $self->$_) } qw(
+    api_uri
+    username
+    password
+    http_auth
+    http_realm
+    connection_class
+  );
+}
+
+1;
diff --git a/lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm b/lib/System/Introspector/Report/Publish/MediaWiki/Updater.pm
new file mode 100644 (file)
index 0000000..1ae642b
--- /dev/null
@@ -0,0 +1,71 @@
+package System::Introspector::Report::Publish::MediaWiki::Updater;
+use Moo;
+use Try::Tiny;
+use Module::Runtime qw( use_module );
+use Log::Contextual::WarnLogger;
+use Log::Contextual qw( :log ),
+  -default_logger => Log::Contextual::WarnLogger->new({
+    env_prefix => 'SI_REPORT_MEDIAWIKI_UPDATER',
+    levels      => [qw( error fatal warn info trace debug )],
+  });
+
+has connection => (is => 'ro', lazy => 1, builder => 1, handles => {
+  get_page => 'get',
+  put_page => 'put',
+});
+
+sub _build_connection {
+  my ($self) = @_;
+  return use_module($self->connection_class)->new(
+    api_uri         => $self->api_uri,
+    username        => $self->username,
+    password        => $self->password,
+    allow_create    => 1,
+    $self->http_auth ? (
+      http_auth  => 1,
+      http_realm => $self->http_realm,
+    ) : (),
+  );
+}
+
+sub process {
+  my ($self, $stream) = @_;
+  while (my $report = $stream->()) {
+    $self->_process_page(@$report);
+  }
+  return 1;
+}
+
+sub _process_page {
+  my ($self, $page_name, $options, $reports) = @_;
+  unless (scalar @$reports) {
+    log_debug { "Skipping page '$page_name': No reports to publish" };
+    return 1;
+  }
+  my $do_create = $options->{create};
+  return try {
+    my $page = $self->get_page($page_name);
+    if ($page->is_new and not $do_create) {
+      log_trace { "Skipping page '$page_name': Does not yet exist" };
+      return 1;
+    }
+    my $is_changed = $page->update($reports);
+    unless ($is_changed) {
+      log_debug { "Not pushing page '$page_name': No changes" };
+      return 1;
+    }
+    log_debug { "Updating page '$page_name'" };
+    $self->put_page($page);
+    return 1;
+  }
+  catch {
+    log_error { "Error during page update: $_" };
+    return 1;
+  };
+}
+
+with $_ for qw(
+  System::Introspector::Report::Publish::MediaWiki::HasConnectInfo
+);
+
+1;
diff --git a/t/lib/TestConnection.pm b/t/lib/TestConnection.pm
new file mode 100644 (file)
index 0000000..1956a29
--- /dev/null
@@ -0,0 +1,25 @@
+package TestConnection;
+use Moo;
+use IO::All;
+use aliased 'System::Introspector::Report::Publish::MediaWiki::Page';
+
+extends 'System::Introspector::Report::Publish::MediaWiki::Connection';
+
+sub get {
+  my ($self, $name) = @_;
+  return Page->new(
+    name      => $name,
+    timestamp => 23,
+    content   => scalar(io("$ENV{TEST_SI_REPORT_IN}/$name.txt")->slurp),
+  );
+}
+
+sub put {
+  my ($self, $page) = @_;
+  my $out = $ENV{TEST_SI_REPORT_OUT}
+    or die "No output directory";
+  io("$out/" . $page->name)->print($page->content);
+  return 1;
+}
+
+1;
index a328e8e..1731bdf 100644 (file)
@@ -2,38 +2,20 @@ use strictures 1;
 use Test::More;
 use IO::All;
 use FindBin;
-
+use File::Temp;
+use lib "$FindBin::Bin/lib";
 use aliased 'System::Introspector::Report::Publish::MediaWiki';
 
-my %result;
-
-do {
-  package TestConnection;
-  use Moo;
-  use IO::All;
-  use aliased 'System::Introspector::Report::Publish::MediaWiki::Page';
+$ENV{TEST_SI_REPORT_OUT} = my $out_dir = File::Temp->newdir;
+$ENV{TEST_SI_REPORT_IN}  = "$FindBin::Bin/data";
 
-  sub get {
-    my ($self, $name) = @_;
-    return Page->new(
-      name      => $name,
-      timestamp => 23,
-      content   => scalar(io("$FindBin::Bin/data/$name.txt")->slurp),
-    );
-  }
-
-  sub put {
-    my ($self, $page) = @_;
-    $result{$page->name} = $page->content;
-  }
-};
+my %result;
 
-my $conn = TestConnection->new;
 my $wiki = MediaWiki->new(
   api_uri => 'http://example.com:9999/',
+  connection_class => 'TestConnection',
   username => 'foo',
   password => 'bar',
-  connection => $conn,
   page => {
     foo => {
       report => ['foo:*'],
@@ -89,9 +71,9 @@ my $_despace = sub {
 };
 
 ## uncomment to regenerate result file
-# do { no warnings; $result{foo} > io("$FindBin::Bin/data/result/foo.txt") };
+# do { no warnings; io("$out_dir/foo") > io("$FindBin::Bin/data/result/foo.txt"); fail "regenerated" };
 
-is $result{foo}->$_despace,
+is scalar(io("$out_dir/foo")->slurp)->$_despace,
    scalar(io("$FindBin::Bin/data/result/foo.txt")->slurp)->$_despace,
    'resulting page looks correct';