From: Matt S Trout Date: Sun, 21 Mar 2010 19:35:03 +0000 (+0000) Subject: initial import with basic test X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit%2FIO-Pipeline.git;a=commitdiff_plain;h=d5217b4dab9ef5f4217f1fd4885562f5203d8044 initial import with basic test --- d5217b4dab9ef5f4217f1fd4885562f5203d8044 diff --git a/lib/IO/Pipeline.pm b/lib/IO/Pipeline.pm new file mode 100644 index 0000000..a3084b8 --- /dev/null +++ b/lib/IO/Pipeline.pm @@ -0,0 +1,88 @@ +package IO::Pipeline; + +use strict; +use warnings FATAL => 'all'; +use Scalar::Util qw(blessed); +use IO::Handle; +use Exporter (); + +our @ISA = qw(Exporter); + +our @EXPORT = qw(pmap pgrep psink); + +sub import { + warnings->unimport('void'); + shift->export_to_level(1, @_); +} + +sub pmap (&) { IO::Pipeline->from_code_map($_[0]) } +sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) } +sub psink (&) { IO::Pipeline->from_code_sink($_[0]) } + +use overload + '|' => '_pipe_operator', + fallback => 1; + +sub IO::Pipeline::CodeSink::print { + my $code = (shift)->{code}; + foreach my $line (@_) { + local $_ = $line; + $code->($line); + } +} + +sub from_code_map { + bless({ map => [ $_[1] ] }, $_[0]); +} + +sub from_code_grep { + my ($class, $grep) = @_; + $class->from_code_map(sub { $grep->($_) ? ($_) : () }); +} + +sub from_code_sink { + bless({ code => $_[1] }, 'IO::Pipeline::CodeSink'); +} + +sub _pipe_operator { + my ($self, $other, $reversed) = @_; + if (blessed($other) && $other->isa('IO::Pipeline')) { + my ($left, $right) = $reversed ? ($other, $self) : ($self, $other); + my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]); + die "Right hand side has a source, makes no sense" + if $right->{source}; + $new{source} = $left->{source} if $left->{source}; + die "Left hand side has a sink, makes no sense" + if $left->{sink}; + $new{sink} = $right->{sink} if $right->{sink}; + return bless(\%new, ref($self)); + } else { + my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source); + if (my $fail = $self->{$is}) { + die "Tried to add ${is} ${other} but we already had ${fail}"; + } + my $new = bless({ $is => $other, %$self }, ref($self)); + if ($new->{$isnt}) { + $new->run; + return; + } else { + return $new; + } + } +} + +sub run { + my ($self) = @_; + my $source = $self->{source}; + my $sink = $self->{sink}; + LINE: while (defined(my $line = $source->getline)) { + my @lines = ($line); + foreach my $map (@{$self->{map}}) { + @lines = map $map->($_), @lines; + next LINE unless @lines; + } + $sink->print(@lines); + } +} + +1; diff --git a/t/pipeline.t b/t/pipeline.t new file mode 100644 index 0000000..073c017 --- /dev/null +++ b/t/pipeline.t @@ -0,0 +1,37 @@ +use strict; +use warnings FATAL => 'all'; +use IO::Pipeline; +use Test::More qw(no_plan); + +my $source = <<'END'; +2010-03-21 16:15:30 1NtNoI-000658-6V Completed +2010-03-21 16:17:29 1NtNlx-00062B-0R Completed +2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed +2010-03-21 16:28:37 no host name found for IP address 218.108.42.254 +2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F= rejected RCPT : rejected because 218.108.42.254 is in a black list at zen.spamhaus.org +2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer) +2010-03-21 16:35:57 no host name found for IP address 123.122.231.66 +2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F= rejected RCPT : rejected because 123.122.231.66 is in a black list at zen.spamhaus.org +END + +sub input_fh { + open my $in, '<', \$source; + return $in; +} + +my $out; + +my $pipe = input_fh + | pmap { [ /^(\S+) (\S+) (.*)$/ ] } + | pgrep { $_->[2] =~ /rejected|Completed/ } + | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] } + | pmap { join(' ', @$_)."\n" } + | psink { $out .= $_ }; + +is($out, <<'END', 'Output ok'); +2010-03-21 16:15:30 Completed +2010-03-21 16:17:29 Completed +2010-03-21 16:20:37 Completed +2010-03-21 16:28:51 Rejected +2010-03-21 16:35:59 Rejected +END