0.009002 release commit
[p5sagit/IO-Pipeline.git] / lib / IO / Pipeline.pm
CommitLineData
d5217b4d 1package IO::Pipeline;
2
3use strict;
4use warnings FATAL => 'all';
fe528e03 5use 5.008001;
d5217b4d 6use Scalar::Util qw(blessed);
7use IO::Handle;
8use Exporter ();
9
10our @ISA = qw(Exporter);
11
12our @EXPORT = qw(pmap pgrep psink);
13
bd273c77 14our $VERSION = '0.009002'; # 0.9.2
fe528e03 15
16$VERSION = eval $VERSION;
17
d5217b4d 18sub import {
19 warnings->unimport('void');
20 shift->export_to_level(1, @_);
21}
22
23sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
24sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
25sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
26
27use overload
28 '|' => '_pipe_operator',
29 fallback => 1;
30
31sub IO::Pipeline::CodeSink::print {
32 my $code = (shift)->{code};
33 foreach my $line (@_) {
34 local $_ = $line;
35 $code->($line);
36 }
37}
38
39sub from_code_map {
40 bless({ map => [ $_[1] ] }, $_[0]);
41}
42
43sub from_code_grep {
44 my ($class, $grep) = @_;
45 $class->from_code_map(sub { $grep->($_) ? ($_) : () });
46}
47
48sub from_code_sink {
49 bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
50}
51
52sub _pipe_operator {
53 my ($self, $other, $reversed) = @_;
54 if (blessed($other) && $other->isa('IO::Pipeline')) {
55 my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
56 my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
57 die "Right hand side has a source, makes no sense"
58 if $right->{source};
59 $new{source} = $left->{source} if $left->{source};
60 die "Left hand side has a sink, makes no sense"
61 if $left->{sink};
62 $new{sink} = $right->{sink} if $right->{sink};
63 return bless(\%new, ref($self));
64 } else {
65 my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
66 if (my $fail = $self->{$is}) {
67 die "Tried to add ${is} ${other} but we already had ${fail}";
68 }
69 my $new = bless({ $is => $other, %$self }, ref($self));
70 if ($new->{$isnt}) {
71 $new->run;
72 return;
73 } else {
74 return $new;
75 }
76 }
77}
78
79sub run {
80 my ($self) = @_;
81 my $source = $self->{source};
82 my $sink = $self->{sink};
83 LINE: while (defined(my $line = $source->getline)) {
84 my @lines = ($line);
85 foreach my $map (@{$self->{map}}) {
86 @lines = map $map->($_), @lines;
87 next LINE unless @lines;
88 }
89 $sink->print(@lines);
90 }
91}
92
fe528e03 93=head1 NAME
94
95IO::Pipeline - map and grep for filehandles, unix pipe style
96
97=head1 SYNOPSIS
98
99 my $source = <<'END';
100 2010-03-21 16:15:30 1NtNoI-000658-6V Completed
101 2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
102 2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
103 2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
104 2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org
105 2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
106 2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
107 2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
108 END
109
110 open my $in, '<', \$source
111 or die "Failed to create filehandle from scalar: $!";
112
113 my $out;
114
115 $in
116 | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
117 | pgrep { $_->[2] =~ /rejected|Completed/ }
118 | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
119 | pmap { join(' ', @$_)."\n" }
120 | psink { $out .= $_ };
121
122 print $out;
123
124will print:
125
126 2010-03-21 16:15:30 Completed
127 2010-03-21 16:17:29 Completed
128 2010-03-21 16:20:37 Completed
129 2010-03-21 16:28:51 Rejected
130 2010-03-21 16:35:59 Rejected
131
132=head1 DESCRIPTION
133
134IO::Pipeline was born of the idea that I really like writing map/grep type
135expressions in perl, but writing:
136
137 map { ... } <$fh>;
138
139does a slurp of the filehandle, and when processing big log files I tend
140to Not Want That To Happen. Plus, map restricts us to right-to-left processing
141and I've always been fond of the shell metaphor of connecting commands
142together left-to-read in a pipeline.
143
144So, this module was born.
145
146 use IO::Pipeline;
147
148will export three functions - L</pmap>, L</pgrep> and L</psink>. The first
149two are the meat of the module, the last one is a means to test by sending
150results somewhere other than a filehandle (or to chain IO::Pipeline output
151on to ... well, anywhere else, really).
152
153pmap and pgrep both return pipeline objects (currently of class IO::Pipeline,
154but this is considered an implementation detail, not a feature - so please
155don't write code that relies on it) that provide an overloaded '|' operator.
156
157 my $mapper = pmap { "[header] ".$_ };
158
159 my $filter = pgrep { /ALERT/ };
160
161When you use | to chain two pipeline objects together, you get another
162pipeline object:
163
164 my $combined = $mapper | $filter;
165
166Although since we're going left to right, you probably want to do the grep
167first:
168
169 my $combined = $filter | $mapper;
170
171(but it's all the same to IO::Pipeline, of course)
172
173When you use | with a filehandle on one side, that sets the start or
174finish of the pipeline, so:
175
176 my $combined_with_input = $readable_fh | $combined;
177
178 my $combined_with_output = $combined | $writeable_fh;
179
180and if you don't want a real filehandle for the second option, you can use
181psink:
182
183 my $output = '';
184
185 my $combined_with_output = $combined | psink { $output .= $_ };
186
187Once both an input and an output have been provided, IO::Pipeline runs the
188full pipeline, reading from the input and pushing one line at a time down
189the pipe to the output until the input filehandle is exhausted.
190
191Non-completed pipeline objects are completely re-usable though - so you can
192(and are expected to) do things like:
193
194 my $combined_to_stoud = $combined | \*STDOUT;
195
196 foreach my $file (@files_to_process) {
197
198 open my $in, '<', $file
199 or die "Couldn't open ${file}: $!";
200
201 $in | $combined_to_stdout;
202 }
203
204=head1 EXPORTED FUNCTIONS
205
206=head2 pmap
207
208 my $mapper = pmap { <return zero or more new lines based on $_> };
209
210A pipeline part built with pmap gets invoked for each line on the pipeline,
211with the line in both $_ and $_[0].
212
213It may, as with perl's map operator, return zero or more elements. If it
214returns nothing at all, IO::Pipeline will go back to the start of the pipe
215chain and read another line to restart processing with. If it returns
216one or more lines, each one is fed in turn into the rest of the pipe chain.
217
218Most of the time, you probably just want to modify the line somehow and then
219return it (note that $_ is a copy of the input line so this is safe):
220
221 my $fix_teh = pmap { s/teh/the/g; $_; };
222
223Note that you still need to actively return $_ for the pipe to continue
224(again, as with perl's map operator).
225
226=head2 pgrep
227
228 my $filter = pgrep { <return true or false to keep or throw away $_> };
229
230A pipeline part built with pgrep gets invoked for each line on the pipeline,
231with the line in both $_ and $_[0].
232
233If it returns a true value, the line is passed on to the next stage of the
234pipeline. If it returns a false value, the line is thrown away and IO::Pipeline
235will go back to the start of the pipe chain and read another line to restart
236processing with.
237
238The upshot of this is that any pgrep can be turned trivially into a pmap:
239
240 my $filter = pgrep { /ALERT/ };
241
242is precisely equivalent to:
243
244 my $filter = pmap { /ALERT/ ? ($_) : () };
245
246but the pgrep form is rather clearer.
247
248=head2 psink
249
250 my $output = '';
251
252 my $sink = psink { $output .= $_ };
253
254A pipe sink is an alternative to an output filehandle as the last element
255of a pipeline. Where in the case of a normal filehandle a line would be
256printed to the handle, given a sink IO::Pipeline will call the code block
257provided. So:
258
259 $pipeline | \*STDOUT;
260
261and
262
263 $pipeline | psink { print STDOUT $_; }
264
265will have exactly the same end result.
266
267If you're looking for the source version of this, there isn't one built in
268because L<IO::Handle::Util|Yuval Kogman's IO::Handle::Util module> already
269provides an io_from_getline construct that does that, along with a bunch
270more things that you may find very useful.
271
272=head1 DECONSTRUCTING THE SYNOPSIS
273
274Start with an input filehandle:
275
276 $in
277
278Next, we split the line up - so
279
280 2010-03-21 16:15:30 1NtNoI-000658-6V Completed
281
282becomes
283
284 [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
285
286using a regexp in list context so that all the match values fall out into
287a new anonymous array reference:
288
289 | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
290
291Now we've separated out the message, we want to throw away anything that isn't
292either a 'rejected' or 'Completed' line, so we test the last element of the
293split line for that:
294
295 | pgrep { $_->[2] =~ /rejected|Completed/ }
296
297Now we know which is which, we want to turn
298
299 [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
300
301into
302
303 [ '2010-03-21', '16:15:30', 'Completed' ]
304
305and similarly for rejected lines. Since we know both lines are one or the
306other, we can simply test for 'rejected' in the line -
307
308 $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed'
309
310and then we construct a new array reference consisting of the first two
311elements of the original array
312
313 @{$_}[0, 1]
314
315plus the new value for the third element:
316
317 | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
318
319This done, we can now reassemble the line using join (remembering to add a
320newline since IO::Pipeline doesn't in case you didn't want one)
321
322 | pmap { join(' ', @$_)."\n" }
323
324and then in lieu of sending it somewhere else, since this is just a
325demonstration code fragment, add a sink that appends things onto the end of
326a variable so that we can examine the results:
327
328 | psink { $out .= $_ };
329
330=head1 AUTHOR
331
332Matt S. Trout (mst) <mst@shadowcat.co.uk>
333
334=head2 CONTRIBUTORS
335
336None as yet, though I'm sure that'll change as soon as people spot the
337giant gaping holes that inevitably exist in any software only used by
338the author so far.
339
340=head1 COPYRIGHT
341
bd273c77 342Copyright (c) 2010 the IO::Pipeline L</AUTHOR> and L</CONTRIBUTORS>
fe528e03 343as listed above.
344
345=head1 LICENSE
346
347This library is free software and may be distributed under the same terms
348as perl itself.
349
350=head1 SUPPORT
351
352Right now, your best routes are probably (a) to come ask questions on
353#perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with
354nick mst if nobody else around at the time manages to help you first) or
355(b) to email me directly at the address given in L</AUTHOR> above. You're
356also welcome to use rt.cpan.org to report bugs (which you can do without
357a login by mailing bugs-IO-Pipeline at that domain), but please cc my
358email address as well on grounds of me being a Bad Person and thereby not
359always spotting tickets.
360
361=head1 SOURCE CODE
362
363This code lives in git.shadowcat.co.uk and can be viewed via gitweb using
364
365 http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary
366
367or checked out via git-daemon using
368
369 git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git
370
371=cut
372
d5217b4d 3731;