--- /dev/null
+package IO::Pipeline;
+
+use strict;
+use warnings FATAL => 'all';
+use Scalar::Util qw(blessed);
+use IO::Handle;
+use Exporter ();
+
+our @ISA = qw(Exporter);
+
+our @EXPORT = qw(pmap pgrep psink);
+
+sub import {
+ warnings->unimport('void');
+ shift->export_to_level(1, @_);
+}
+
+sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
+sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
+sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
+
+use overload
+ '|' => '_pipe_operator',
+ fallback => 1;
+
+sub IO::Pipeline::CodeSink::print {
+ my $code = (shift)->{code};
+ foreach my $line (@_) {
+ local $_ = $line;
+ $code->($line);
+ }
+}
+
+sub from_code_map {
+ bless({ map => [ $_[1] ] }, $_[0]);
+}
+
+sub from_code_grep {
+ my ($class, $grep) = @_;
+ $class->from_code_map(sub { $grep->($_) ? ($_) : () });
+}
+
+sub from_code_sink {
+ bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
+}
+
+sub _pipe_operator {
+ my ($self, $other, $reversed) = @_;
+ if (blessed($other) && $other->isa('IO::Pipeline')) {
+ my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
+ my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
+ die "Right hand side has a source, makes no sense"
+ if $right->{source};
+ $new{source} = $left->{source} if $left->{source};
+ die "Left hand side has a sink, makes no sense"
+ if $left->{sink};
+ $new{sink} = $right->{sink} if $right->{sink};
+ return bless(\%new, ref($self));
+ } else {
+ my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
+ if (my $fail = $self->{$is}) {
+ die "Tried to add ${is} ${other} but we already had ${fail}";
+ }
+ my $new = bless({ $is => $other, %$self }, ref($self));
+ if ($new->{$isnt}) {
+ $new->run;
+ return;
+ } else {
+ return $new;
+ }
+ }
+}
+
+sub run {
+ my ($self) = @_;
+ my $source = $self->{source};
+ my $sink = $self->{sink};
+ LINE: while (defined(my $line = $source->getline)) {
+ my @lines = ($line);
+ foreach my $map (@{$self->{map}}) {
+ @lines = map $map->($_), @lines;
+ next LINE unless @lines;
+ }
+ $sink->print(@lines);
+ }
+}
+
+1;
--- /dev/null
+use strict;
+use warnings FATAL => 'all';
+use IO::Pipeline;
+use Test::More qw(no_plan);
+
+my $source = <<'END';
+2010-03-21 16:15:30 1NtNoI-000658-6V Completed
+2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
+2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
+2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
+2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org
+2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
+2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
+2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
+END
+
+sub input_fh {
+ open my $in, '<', \$source;
+ return $in;
+}
+
+my $out;
+
+my $pipe = input_fh
+ | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
+ | pgrep { $_->[2] =~ /rejected|Completed/ }
+ | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
+ | pmap { join(' ', @$_)."\n" }
+ | psink { $out .= $_ };
+
+is($out, <<'END', 'Output ok');
+2010-03-21 16:15:30 Completed
+2010-03-21 16:17:29 Completed
+2010-03-21 16:20:37 Completed
+2010-03-21 16:28:51 Rejected
+2010-03-21 16:35:59 Rejected
+END