0.009002 release commit
[p5sagit/IO-Pipeline.git] / lib / IO / Pipeline.pm
1 package IO::Pipeline;
2
3 use strict;
4 use warnings FATAL => 'all';
5 use 5.008001;
6 use Scalar::Util qw(blessed);
7 use IO::Handle;
8 use Exporter ();
9
10 our @ISA = qw(Exporter);
11
12 our @EXPORT = qw(pmap pgrep psink);
13
14 our $VERSION = '0.009002'; # 0.9.2
15
16 $VERSION = eval $VERSION;
17
18 sub import {
19   warnings->unimport('void');
20   shift->export_to_level(1, @_);
21 }
22
23 sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
24 sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
25 sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
26
27 use overload
28   '|' => '_pipe_operator',
29   fallback => 1;
30
31 sub IO::Pipeline::CodeSink::print {
32   my $code = (shift)->{code};
33   foreach my $line (@_) {
34     local $_ = $line;
35     $code->($line);
36   }
37 }
38
39 sub from_code_map {
40   bless({ map => [ $_[1] ] }, $_[0]);
41 }
42
43 sub from_code_grep {
44   my ($class, $grep) = @_;
45   $class->from_code_map(sub { $grep->($_) ? ($_) : () });
46 }
47
48 sub from_code_sink {
49   bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
50 }
51
52 sub _pipe_operator {
53   my ($self, $other, $reversed) = @_;
54   if (blessed($other) && $other->isa('IO::Pipeline')) {
55     my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
56     my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
57     die "Right hand side has a source, makes no sense"
58       if $right->{source};
59     $new{source} = $left->{source} if $left->{source};
60     die "Left hand side has a sink, makes no sense"
61       if $left->{sink};
62     $new{sink} = $right->{sink} if $right->{sink};
63     return bless(\%new, ref($self));
64   } else {
65     my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
66     if (my $fail = $self->{$is}) {
67       die "Tried to add ${is} ${other} but we already had ${fail}";
68     }
69     my $new = bless({ $is => $other, %$self }, ref($self));
70     if ($new->{$isnt}) {
71       $new->run;
72       return;
73     } else {
74       return $new;
75     }
76   }
77 }
78
79 sub run {
80   my ($self) = @_;
81   my $source = $self->{source};
82   my $sink = $self->{sink};
83   LINE: while (defined(my $line = $source->getline)) {
84     my @lines = ($line);
85     foreach my $map (@{$self->{map}}) {
86       @lines = map $map->($_), @lines;
87       next LINE unless @lines;
88     }
89     $sink->print(@lines);
90   }
91 }
92
93 =head1 NAME
94
95 IO::Pipeline - map and grep for filehandles, unix pipe style
96
97 =head1 SYNOPSIS
98
99   my $source = <<'END';
100   2010-03-21 16:15:30 1NtNoI-000658-6V Completed
101   2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
102   2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
103   2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
104   2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org 
105   2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
106   2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
107   2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
108   END 
109   
110   open my $in, '<', \$source
111     or die "Failed to create filehandle from scalar: $!";
112   
113   my $out;
114   
115   $in
116     | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
117     | pgrep { $_->[2] =~ /rejected|Completed/ }
118     | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
119     | pmap { join(' ', @$_)."\n" }
120     | psink { $out .= $_ };
121   
122   print $out;
123
124 will print:
125
126   2010-03-21 16:15:30 Completed
127   2010-03-21 16:17:29 Completed
128   2010-03-21 16:20:37 Completed
129   2010-03-21 16:28:51 Rejected
130   2010-03-21 16:35:59 Rejected
131
132 =head1 DESCRIPTION
133
134 IO::Pipeline was born of the idea that I really like writing map/grep type
135 expressions in perl, but writing:
136
137   map { ... } <$fh>;
138
139 does a slurp of the filehandle, and when processing big log files I tend
140 to Not Want That To Happen. Plus, map restricts us to right-to-left processing
141 and I've always been fond of the shell metaphor of connecting commands
142 together left-to-read in a pipeline.
143
144 So, this module was born.
145
146   use IO::Pipeline;
147
148 will export three functions - L</pmap>, L</pgrep> and L</psink>. The first
149 two are the meat of the module, the last one is a means to test by sending
150 results somewhere other than a filehandle (or to chain IO::Pipeline output
151 on to ... well, anywhere else, really).
152
153 pmap and pgrep both return pipeline objects (currently of class IO::Pipeline,
154 but this is considered an implementation detail, not a feature - so please
155 don't write code that relies on it) that provide an overloaded '|' operator.
156
157   my $mapper = pmap { "[header] ".$_ };
158
159   my $filter = pgrep { /ALERT/ };
160
161 When you use | to chain two pipeline objects together, you get another
162 pipeline object:
163
164   my $combined = $mapper | $filter;
165
166 Although since we're going left to right, you probably want to do the grep
167 first:
168
169   my $combined = $filter | $mapper;
170
171 (but it's all the same to IO::Pipeline, of course)
172
173 When you use | with a filehandle on one side, that sets the start or
174 finish of the pipeline, so:
175
176   my $combined_with_input = $readable_fh | $combined;
177
178   my $combined_with_output = $combined | $writeable_fh;
179
180 and if you don't want a real filehandle for the second option, you can use
181 psink:
182
183   my $output = '';
184   
185   my $combined_with_output = $combined | psink { $output .= $_ };
186
187 Once both an input and an output have been provided, IO::Pipeline runs the
188 full pipeline, reading from the input and pushing one line at a time down
189 the pipe to the output until the input filehandle is exhausted.
190
191 Non-completed pipeline objects are completely re-usable though - so you can
192 (and are expected to) do things like:
193
194   my $combined_to_stoud = $combined | \*STDOUT;
195   
196   foreach my $file (@files_to_process) {
197   
198     open my $in, '<', $file
199       or die "Couldn't open ${file}: $!";
200   
201     $in | $combined_to_stdout;
202   }
203
204 =head1 EXPORTED FUNCTIONS
205
206 =head2 pmap
207
208   my $mapper = pmap { <return zero or more new lines based on $_> };
209
210 A pipeline part built with pmap gets invoked for each line on the pipeline,
211 with the line in both $_ and $_[0].
212
213 It may, as with perl's map operator, return zero or more elements. If it
214 returns nothing at all, IO::Pipeline will go back to the start of the pipe
215 chain and read another line to restart processing with. If it returns
216 one or more lines, each one is fed in turn into the rest of the pipe chain.
217
218 Most of the time, you probably just want to modify the line somehow and then
219 return it (note that $_ is a copy of the input line so this is safe):
220
221   my $fix_teh = pmap { s/teh/the/g; $_; };
222
223 Note that you still need to actively return $_ for the pipe to continue
224 (again, as with perl's map operator).
225
226 =head2 pgrep
227
228   my $filter = pgrep { <return true or false to keep or throw away $_> };
229
230 A pipeline part built with pgrep gets invoked for each line on the pipeline,
231 with the line in both $_ and $_[0].
232
233 If it returns a true value, the line is passed on to the next stage of the
234 pipeline. If it returns a false value, the line is thrown away and IO::Pipeline
235 will go back to the start of the pipe chain and read another line to restart
236 processing with.
237
238 The upshot of this is that any pgrep can be turned trivially into a pmap:
239
240   my $filter = pgrep { /ALERT/ };
241
242 is precisely equivalent to:
243
244   my $filter = pmap { /ALERT/ ? ($_) : () };
245
246 but the pgrep form is rather clearer.
247
248 =head2 psink
249
250   my $output = '';
251   
252   my $sink = psink { $output .= $_ };
253
254 A pipe sink is an alternative to an output filehandle as the last element
255 of a pipeline. Where in the case of a normal filehandle a line would be
256 printed to the handle, given a sink IO::Pipeline will call the code block
257 provided. So:
258
259   $pipeline | \*STDOUT;
260
261 and
262
263   $pipeline | psink { print STDOUT $_; }
264
265 will have exactly the same end result.
266
267 If you're looking for the source version of this, there isn't one built in
268 because L<IO::Handle::Util|Yuval Kogman's IO::Handle::Util module> already
269 provides an io_from_getline construct that does that, along with a bunch
270 more things that you may find very useful.
271
272 =head1 DECONSTRUCTING THE SYNOPSIS
273
274 Start with an input filehandle:
275
276   $in
277
278 Next, we split the line up - so
279
280   2010-03-21 16:15:30 1NtNoI-000658-6V Completed
281
282 becomes
283
284   [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
285
286 using a regexp in list context so that all the match values fall out into
287 a new anonymous array reference:
288
289     | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
290
291 Now we've separated out the message, we want to throw away anything that isn't
292 either a 'rejected' or 'Completed' line, so we test the last element of the
293 split line for that:
294
295     | pgrep { $_->[2] =~ /rejected|Completed/ }
296
297 Now we know which is which, we want to turn
298
299   [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
300
301 into
302
303   [ '2010-03-21', '16:15:30', 'Completed' ]
304
305 and similarly for rejected lines. Since we know both lines are one or the
306 other, we can simply test for 'rejected' in the line -
307
308   $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed'
309
310 and then we construct a new array reference consisting of the first two
311 elements of the original array
312
313   @{$_}[0, 1]
314
315 plus the new value for the third element:
316
317     | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
318
319 This done, we can now reassemble the line using join (remembering to add a
320 newline since IO::Pipeline doesn't in case you didn't want one)
321
322     | pmap { join(' ', @$_)."\n" }
323
324 and then in lieu of sending it somewhere else, since this is just a
325 demonstration code fragment, add a sink that appends things onto the end of
326 a variable so that we can examine the results:
327
328     | psink { $out .= $_ };
329
330 =head1 AUTHOR
331
332 Matt S. Trout (mst) <mst@shadowcat.co.uk>
333
334 =head2 CONTRIBUTORS
335
336 None as yet, though I'm sure that'll change as soon as people spot the
337 giant gaping holes that inevitably exist in any software only used by
338 the author so far.
339
340 =head1 COPYRIGHT
341
342 Copyright (c) 2010 the IO::Pipeline L</AUTHOR> and L</CONTRIBUTORS>
343 as listed above.
344
345 =head1 LICENSE
346
347 This library is free software and may be distributed under the same terms
348 as perl itself.
349
350 =head1 SUPPORT
351
352 Right now, your best routes are probably (a) to come ask questions on
353 #perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with
354 nick mst if nobody else around at the time manages to help you first) or
355 (b) to email me directly at the address given in L</AUTHOR> above. You're
356 also welcome to use rt.cpan.org to report bugs (which you can do without
357 a login by mailing bugs-IO-Pipeline at that domain), but please cc my
358 email address as well on grounds of me being a Bad Person and thereby not
359 always spotting tickets.
360
361 =head1 SOURCE CODE
362
363 This code lives in git.shadowcat.co.uk and can be viewed via gitweb using
364
365   http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary
366
367 or checked out via git-daemon using
368
369   git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git
370
371 =cut
372
373 1;