a3084b8b200d3f4352c0bab400b940b192ffe2b1
[p5sagit/IO-Pipeline.git] / lib / IO / Pipeline.pm
1 package IO::Pipeline;
2
3 use strict;
4 use warnings FATAL => 'all';
5 use Scalar::Util qw(blessed);
6 use IO::Handle;
7 use Exporter ();
8
9 our @ISA = qw(Exporter);
10
11 our @EXPORT = qw(pmap pgrep psink);
12
13 sub import {
14   warnings->unimport('void');
15   shift->export_to_level(1, @_);
16 }
17
18 sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
19 sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
20 sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
21
22 use overload
23   '|' => '_pipe_operator',
24   fallback => 1;
25
26 sub IO::Pipeline::CodeSink::print {
27   my $code = (shift)->{code};
28   foreach my $line (@_) {
29     local $_ = $line;
30     $code->($line);
31   }
32 }
33
34 sub from_code_map {
35   bless({ map => [ $_[1] ] }, $_[0]);
36 }
37
38 sub from_code_grep {
39   my ($class, $grep) = @_;
40   $class->from_code_map(sub { $grep->($_) ? ($_) : () });
41 }
42
43 sub from_code_sink {
44   bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
45 }
46
47 sub _pipe_operator {
48   my ($self, $other, $reversed) = @_;
49   if (blessed($other) && $other->isa('IO::Pipeline')) {
50     my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
51     my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
52     die "Right hand side has a source, makes no sense"
53       if $right->{source};
54     $new{source} = $left->{source} if $left->{source};
55     die "Left hand side has a sink, makes no sense"
56       if $left->{sink};
57     $new{sink} = $right->{sink} if $right->{sink};
58     return bless(\%new, ref($self));
59   } else {
60     my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
61     if (my $fail = $self->{$is}) {
62       die "Tried to add ${is} ${other} but we already had ${fail}";
63     }
64     my $new = bless({ $is => $other, %$self }, ref($self));
65     if ($new->{$isnt}) {
66       $new->run;
67       return;
68     } else {
69       return $new;
70     }
71   }
72 }
73
74 sub run {
75   my ($self) = @_;
76   my $source = $self->{source};
77   my $sink = $self->{sink};
78   LINE: while (defined(my $line = $source->getline)) {
79     my @lines = ($line);
80     foreach my $map (@{$self->{map}}) {
81       @lines = map $map->($_), @lines;
82       next LINE unless @lines;
83     }
84     $sink->print(@lines);
85   }
86 }
87
88 1;