initial import with basic test
Matt S Trout [Sun, 21 Mar 2010 19:35:03 +0000 (19:35 +0000)]
lib/IO/Pipeline.pm [new file with mode: 0644]
t/pipeline.t [new file with mode: 0644]

diff --git a/lib/IO/Pipeline.pm b/lib/IO/Pipeline.pm
new file mode 100644 (file)
index 0000000..a3084b8
--- /dev/null
@@ -0,0 +1,88 @@
+package IO::Pipeline;
+
+use strict;
+use warnings FATAL => 'all';
+use Scalar::Util qw(blessed);
+use IO::Handle;
+use Exporter ();
+
+our @ISA = qw(Exporter);
+
+our @EXPORT = qw(pmap pgrep psink);
+
+sub import {
+  warnings->unimport('void');
+  shift->export_to_level(1, @_);
+}
+
+sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
+sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
+sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
+
+use overload
+  '|' => '_pipe_operator',
+  fallback => 1;
+
+sub IO::Pipeline::CodeSink::print {
+  my $code = (shift)->{code};
+  foreach my $line (@_) {
+    local $_ = $line;
+    $code->($line);
+  }
+}
+
+sub from_code_map {
+  bless({ map => [ $_[1] ] }, $_[0]);
+}
+
+sub from_code_grep {
+  my ($class, $grep) = @_;
+  $class->from_code_map(sub { $grep->($_) ? ($_) : () });
+}
+
+sub from_code_sink {
+  bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
+}
+
+sub _pipe_operator {
+  my ($self, $other, $reversed) = @_;
+  if (blessed($other) && $other->isa('IO::Pipeline')) {
+    my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
+    my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
+    die "Right hand side has a source, makes no sense"
+      if $right->{source};
+    $new{source} = $left->{source} if $left->{source};
+    die "Left hand side has a sink, makes no sense"
+      if $left->{sink};
+    $new{sink} = $right->{sink} if $right->{sink};
+    return bless(\%new, ref($self));
+  } else {
+    my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
+    if (my $fail = $self->{$is}) {
+      die "Tried to add ${is} ${other} but we already had ${fail}";
+    }
+    my $new = bless({ $is => $other, %$self }, ref($self));
+    if ($new->{$isnt}) {
+      $new->run;
+      return;
+    } else {
+      return $new;
+    }
+  }
+}
+
+sub run {
+  my ($self) = @_;
+  my $source = $self->{source};
+  my $sink = $self->{sink};
+  LINE: while (defined(my $line = $source->getline)) {
+    my @lines = ($line);
+    foreach my $map (@{$self->{map}}) {
+      @lines = map $map->($_), @lines;
+      next LINE unless @lines;
+    }
+    $sink->print(@lines);
+  }
+}
+
+1;
diff --git a/t/pipeline.t b/t/pipeline.t
new file mode 100644 (file)
index 0000000..073c017
--- /dev/null
@@ -0,0 +1,37 @@
+use strict;
+use warnings FATAL => 'all';
+use IO::Pipeline;
+use Test::More qw(no_plan);
+
+my $source = <<'END';
+2010-03-21 16:15:30 1NtNoI-000658-6V Completed
+2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
+2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
+2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
+2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org
+2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
+2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
+2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
+END
+
+sub input_fh {
+  open my $in, '<', \$source;
+  return $in;
+}
+
+my $out;
+
+my $pipe = input_fh
+  | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
+  | pgrep { $_->[2] =~ /rejected|Completed/ }
+  | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
+  | pmap { join(' ', @$_)."\n" }
+  | psink { $out .= $_ };
+
+is($out, <<'END', 'Output ok');
+2010-03-21 16:15:30 Completed
+2010-03-21 16:17:29 Completed
+2010-03-21 16:20:37 Completed
+2010-03-21 16:28:51 Rejected
+2010-03-21 16:35:59 Rejected
+END