4 use warnings FATAL => 'all';
6 use Scalar::Util qw(blessed);
10 our @ISA = qw(Exporter);
12 our @EXPORT = qw(pmap pgrep psink);
14 our $VERSION = '0.009002'; # 0.9.2
16 $VERSION = eval $VERSION;
19 warnings->unimport('void');
20 shift->export_to_level(1, @_);
23 sub pmap (&) { IO::Pipeline->from_code_map($_[0]) }
24 sub pgrep (&) { IO::Pipeline->from_code_grep($_[0]) }
25 sub psink (&) { IO::Pipeline->from_code_sink($_[0]) }
28 '|' => '_pipe_operator',
31 sub IO::Pipeline::CodeSink::print {
32 my $code = (shift)->{code};
33 foreach my $line (@_) {
40 bless({ map => [ $_[1] ] }, $_[0]);
44 my ($class, $grep) = @_;
45 $class->from_code_map(sub { $grep->($_) ? ($_) : () });
49 bless({ code => $_[1] }, 'IO::Pipeline::CodeSink');
53 my ($self, $other, $reversed) = @_;
54 if (blessed($other) && $other->isa('IO::Pipeline')) {
55 my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
56 my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
57 die "Right hand side has a source, makes no sense"
59 $new{source} = $left->{source} if $left->{source};
60 die "Left hand side has a sink, makes no sense"
62 $new{sink} = $right->{sink} if $right->{sink};
63 return bless(\%new, ref($self));
65 my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
66 if (my $fail = $self->{$is}) {
67 die "Tried to add ${is} ${other} but we already had ${fail}";
69 my $new = bless({ $is => $other, %$self }, ref($self));
81 my $source = $self->{source};
82 my $sink = $self->{sink};
83 LINE: while (defined(my $line = $source->getline)) {
85 foreach my $map (@{$self->{map}}) {
86 @lines = map $map->($_), @lines;
87 next LINE unless @lines;
95 IO::Pipeline - map and grep for filehandles, unix pipe style
100 2010-03-21 16:15:30 1NtNoI-000658-6V Completed
101 2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
102 2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
103 2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
104 2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org
105 2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
106 2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
107 2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
110 open my $in, '<', \$source
111 or die "Failed to create filehandle from scalar: $!";
116 | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
117 | pgrep { $_->[2] =~ /rejected|Completed/ }
118 | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
119 | pmap { join(' ', @$_)."\n" }
120 | psink { $out .= $_ };
126 2010-03-21 16:15:30 Completed
127 2010-03-21 16:17:29 Completed
128 2010-03-21 16:20:37 Completed
129 2010-03-21 16:28:51 Rejected
130 2010-03-21 16:35:59 Rejected
134 IO::Pipeline was born of the idea that I really like writing map/grep type
135 expressions in perl, but writing:
139 does a slurp of the filehandle, and when processing big log files I tend
140 to Not Want That To Happen. Plus, map restricts us to right-to-left processing
141 and I've always been fond of the shell metaphor of connecting commands
142 together left-to-read in a pipeline.
144 So, this module was born.
148 will export three functions - L</pmap>, L</pgrep> and L</psink>. The first
149 two are the meat of the module, the last one is a means to test by sending
150 results somewhere other than a filehandle (or to chain IO::Pipeline output
151 on to ... well, anywhere else, really).
153 pmap and pgrep both return pipeline objects (currently of class IO::Pipeline,
154 but this is considered an implementation detail, not a feature - so please
155 don't write code that relies on it) that provide an overloaded '|' operator.
157 my $mapper = pmap { "[header] ".$_ };
159 my $filter = pgrep { /ALERT/ };
161 When you use | to chain two pipeline objects together, you get another
164 my $combined = $mapper | $filter;
166 Although since we're going left to right, you probably want to do the grep
169 my $combined = $filter | $mapper;
171 (but it's all the same to IO::Pipeline, of course)
173 When you use | with a filehandle on one side, that sets the start or
174 finish of the pipeline, so:
176 my $combined_with_input = $readable_fh | $combined;
178 my $combined_with_output = $combined | $writeable_fh;
180 and if you don't want a real filehandle for the second option, you can use
185 my $combined_with_output = $combined | psink { $output .= $_ };
187 Once both an input and an output have been provided, IO::Pipeline runs the
188 full pipeline, reading from the input and pushing one line at a time down
189 the pipe to the output until the input filehandle is exhausted.
191 Non-completed pipeline objects are completely re-usable though - so you can
192 (and are expected to) do things like:
194 my $combined_to_stdout = $combined | \*STDOUT;
196 foreach my $file (@files_to_process) {
198 open my $in, '<', $file
199 or die "Couldn't open ${file}: $!";
201 $in | $combined_to_stdout;
204 =head1 EXPORTED FUNCTIONS
208 my $mapper = pmap { <return zero or more new lines based on $_> };
210 A pipeline part built with pmap gets invoked for each line on the pipeline,
211 with the line in both $_ and $_[0].
213 It may, as with perl's map operator, return zero or more elements. If it
214 returns nothing at all, IO::Pipeline will go back to the start of the pipe
215 chain and read another line to restart processing with. If it returns
216 one or more lines, each one is fed in turn into the rest of the pipe chain.
218 Most of the time, you probably just want to modify the line somehow and then
219 return it (note that $_ is a copy of the input line so this is safe):
221 my $fix_teh = pmap { s/teh/the/g; $_; };
223 Note that you still need to actively return $_ for the pipe to continue
224 (again, as with perl's map operator).
228 my $filter = pgrep { <return true or false to keep or throw away $_> };
230 A pipeline part built with pgrep gets invoked for each line on the pipeline,
231 with the line in both $_ and $_[0].
233 If it returns a true value, the line is passed on to the next stage of the
234 pipeline. If it returns a false value, the line is thrown away and IO::Pipeline
235 will go back to the start of the pipe chain and read another line to restart
238 The upshot of this is that any pgrep can be turned trivially into a pmap:
240 my $filter = pgrep { /ALERT/ };
242 is precisely equivalent to:
244 my $filter = pmap { /ALERT/ ? ($_) : () };
246 but the pgrep form is rather clearer.
252 my $sink = psink { $output .= $_ };
254 A pipe sink is an alternative to an output filehandle as the last element
255 of a pipeline. Where in the case of a normal filehandle a line would be
256 printed to the handle, given a sink IO::Pipeline will call the code block
259 $pipeline | \*STDOUT;
263 $pipeline | psink { print STDOUT $_; }
265 will have exactly the same end result.
267 If you're looking for the source version of this, there isn't one built in
268 because L<IO::Handle::Util|Yuval Kogman's IO::Handle::Util module> already
269 provides an io_from_getline construct that does that, along with a bunch
270 more things that you may find very useful.
272 =head1 DECONSTRUCTING THE SYNOPSIS
274 Start with an input filehandle:
278 Next, we split the line up - so
280 2010-03-21 16:15:30 1NtNoI-000658-6V Completed
284 [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
286 using a regexp in list context so that all the match values fall out into
287 a new anonymous array reference:
289 | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
291 Now we've separated out the message, we want to throw away anything that isn't
292 either a 'rejected' or 'Completed' line, so we test the last element of the
295 | pgrep { $_->[2] =~ /rejected|Completed/ }
297 Now we know which is which, we want to turn
299 [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]
303 [ '2010-03-21', '16:15:30', 'Completed' ]
305 and similarly for rejected lines. Since we know both lines are one or the
306 other, we can simply test for 'rejected' in the line -
308 $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed'
310 and then we construct a new array reference consisting of the first two
311 elements of the original array
315 plus the new value for the third element:
317 | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
319 This done, we can now reassemble the line using join (remembering to add a
320 newline since IO::Pipeline doesn't in case you didn't want one)
322 | pmap { join(' ', @$_)."\n" }
324 and then in lieu of sending it somewhere else, since this is just a
325 demonstration code fragment, add a sink that appends things onto the end of
326 a variable so that we can examine the results:
328 | psink { $out .= $_ };
332 Matt S. Trout (mst) <mst@shadowcat.co.uk>
336 None as yet, though I'm sure that'll change as soon as people spot the
337 giant gaping holes that inevitably exist in any software only used by
342 Copyright (c) 2010 the IO::Pipeline L</AUTHOR> and L</CONTRIBUTORS>
347 This library is free software and may be distributed under the same terms
352 Right now, your best routes are probably (a) to come ask questions on
353 #perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with
354 nick mst if nobody else around at the time manages to help you first) or
355 (b) to email me directly at the address given in L</AUTHOR> above. You're
356 also welcome to use rt.cpan.org to report bugs (which you can do without
357 a login by mailing bugs-IO-Pipeline at that domain), but please cc my
358 email address as well on grounds of me being a Bad Person and thereby not
359 always spotting tickets.
363 This code lives in git.shadowcat.co.uk and can be viewed via gitweb using
365 http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary
367 or checked out via git-daemon using
369 git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git