Commit | Line | Data |
d5217b4d |
1 | package IO::Pipeline; |
2 | |
3 | use strict; |
4 | use warnings FATAL => 'all'; |
fe528e03 |
5 | use 5.008001; |
d5217b4d |
6 | use Scalar::Util qw(blessed); |
7 | use IO::Handle; |
8 | use Exporter (); |
9 | |
# Exporter supplies export_to_level(), which the custom import sub in this
# package uses; everything listed in @EXPORT is exported by default.
our @ISA    = ('Exporter');
our @EXPORT = ('pmap', 'pgrep', 'psink');

our $VERSION = '0.009002'; # 0.9.2

# Numify the version string (the usual CPAN idiom, which would also strip a
# developer-release underscore if one were present).
$VERSION = eval $VERSION;
17 | |
d5217b4d |
# Called by `use IO::Pipeline ...` at the caller's compile time.
#
# Besides exporting, it switches off the 'void' warnings category in the
# *caller's* lexical scope (import runs while the caller is being compiled).
# Statement-level pipelines such as `$in | pmap { ... } | psink { ... };`
# discard the final '|' result, and would otherwise produce a
# "Useless use of ... in void context" warning.
#
# Exporter's export_to_level() takes ($level, $package, @symbols) and
# throws $package away. The class must therefore be passed explicitly as
# that placeholder. Passing the symbol list straight through silently
# dropped the first requested symbol: `use IO::Pipeline qw(pmap pgrep)`
# exported only pgrep.
sub import {
    my $class = shift;
    warnings->unimport('void');
    $class->export_to_level(1, $class, @_);
}
22 | |
# Exported constructors. The (&) prototype lets callers pass a bare block,
# e.g. `pmap { ... }`, just like the built-in map and grep.

# Build a pipeline stage that maps each line to zero or more lines.
sub pmap (&) {
    my ($code) = @_;
    return IO::Pipeline->from_code_map($code);
}

# Build a pipeline stage that keeps only lines for which the block is true.
sub pgrep (&) {
    my ($code) = @_;
    return IO::Pipeline->from_code_grep($code);
}

# Build a sink that calls the block once per output line.
sub psink (&) {
    my ($code) = @_;
    return IO::Pipeline->from_code_sink($code);
}
26 | |
27 | use overload |
28 | '|' => '_pipe_operator', |
29 | fallback => 1; |
30 | |
# print() for psink objects, so a code sink can stand in for a filehandle.
# Calls the stored callback once per line, with the line available both in
# a localized $_ and as the first argument.
sub IO::Pipeline::CodeSink::print {
    my $sink_obj = shift;
    my $callback = $sink_obj->{code};
    for my $item (@_) {
        local $_ = $item;
        $callback->($item);
    }
}
38 | |
# Class-method constructor: a pipeline whose only stage is $code.
sub from_code_map {
    my ($class, $code) = @_;
    my %self = (map => [$code]);
    return bless(\%self, $class);
}
42 | |
# Class-method constructor for a filter stage. It wraps the predicate in a
# map stage that passes $_ through when the predicate is true and returns
# an empty list otherwise.
sub from_code_grep {
    my $class     = shift;
    my $predicate = shift;
    my $filter = sub {
        return $predicate->($_) ? ($_) : ();
    };
    return $class->from_code_map($filter);
}
47 | |
# Class-method constructor for a code sink. The invocant class is ignored:
# sinks are always blessed into IO::Pipeline::CodeSink.
sub from_code_sink {
    my (undef, $code) = @_;
    my $sink = { code => $code };
    return bless($sink, 'IO::Pipeline::CodeSink');
}
51 | |
# Overload handler for '|'. $reversed is true when the pipeline object was
# the right-hand operand (e.g. `$fh | $pipeline`).
#
# - pipeline | pipeline: concatenate the map stages left-to-right, keeping
#   the left side's source and the right side's sink.
# - pipeline | other (or other | pipeline): attach `other` as the sink
#   (or as the source when reversed).
#
# Whenever the result has both a source and a sink, it is run immediately
# and nothing is returned. Otherwise the new, incomplete pipeline object is
# returned. Neither operand is modified, which keeps incomplete pipelines
# reusable.
sub _pipe_operator {
    my ($self, $other, $reversed) = @_;
    if (blessed($other) && $other->isa('IO::Pipeline')) {
        my ($left, $right) = $reversed ? ($other, $self) : ($self, $other);
        my %new = (map => [ @{$left->{map}}, @{$right->{map}} ]);
        die "Right hand side has a source, makes no sense"
            if $right->{source};
        $new{source} = $left->{source} if $left->{source};
        die "Left hand side has a sink, makes no sense"
            if $left->{sink};
        $new{sink} = $right->{sink} if $right->{sink};
        my $new = bless(\%new, ref($self));
        # Joining a pipeline that has a source to one that has a sink
        # completes it. Run it, just as the filehandle branch does; before
        # this check the complete pipeline was returned and never ran.
        if ($new->{source} && $new->{sink}) {
            $new->run;
            return;
        }
        return $new;
    } else {
        my ($is, $isnt) = $reversed ? qw(source sink) : qw(sink source);
        if (my $fail = $self->{$is}) {
            die "Tried to add ${is} ${other} but we already had ${fail}";
        }
        # Copy $self first so the newly attached endpoint always wins, even
        # if $self carries a false-valued entry under the same key.
        my $new = bless({ %$self, $is => $other }, ref($self));
        if ($new->{$isnt}) {
            $new->run;
            return;
        }
        return $new;
    }
}
78 | |
# Drive a complete pipeline.
#
# Reads one line at a time from the source via getline() and pushes it
# through every map stage in order; each stage may turn each line into zero
# or more lines. Whatever survives all stages is handed to the sink's
# print(). A stage producing no lines abandons the current input line.
# Stops when getline() returns undef.
sub run {
    my ($self) = @_;
    my ($source, $sink) = @{$self}{qw(source sink)};
  READ:
    while (1) {
        my $line = $source->getline;
        last READ unless defined $line;
        my @batch = ($line);
        for my $stage (@{ $self->{map} }) {
            @batch = map { $stage->($_) } @batch;
            next READ if !@batch;
        }
        $sink->print(@batch);
    }
}
92 | |
fe528e03 |
93 | =head1 NAME |
94 | |
95 | IO::Pipeline - map and grep for filehandles, unix pipe style |
96 | |
97 | =head1 SYNOPSIS |
98 | |
99 | my $source = <<'END'; |
100 | 2010-03-21 16:15:30 1NtNoI-000658-6V Completed |
101 | 2010-03-21 16:17:29 1NtNlx-00062B-0R Completed |
102 | 2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed |
103 | 2010-03-21 16:28:37 no host name found for IP address 218.108.42.254 |
104 | 2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org |
105 | 2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer) |
106 | 2010-03-21 16:35:57 no host name found for IP address 123.122.231.66 |
107 | 2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org |
108 | END |
109 | |
110 | open my $in, '<', \$source |
111 | or die "Failed to create filehandle from scalar: $!"; |
112 | |
113 | my $out; |
114 | |
115 | $in |
116 | | pmap { [ /^(\S+) (\S+) (.*)$/ ] } |
117 | | pgrep { $_->[2] =~ /rejected|Completed/ } |
118 | | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] } |
119 | | pmap { join(' ', @$_)."\n" } |
120 | | psink { $out .= $_ }; |
121 | |
122 | print $out; |
123 | |
124 | will print: |
125 | |
126 | 2010-03-21 16:15:30 Completed |
127 | 2010-03-21 16:17:29 Completed |
128 | 2010-03-21 16:20:37 Completed |
129 | 2010-03-21 16:28:51 Rejected |
130 | 2010-03-21 16:35:59 Rejected |
131 | |
132 | =head1 DESCRIPTION |
133 | |
134 | IO::Pipeline was born of the idea that I really like writing map/grep type |
135 | expressions in perl, but writing: |
136 | |
137 | map { ... } <$fh>; |
138 | |
139 | does a slurp of the filehandle, and when processing big log files I tend |
140 | to Not Want That To Happen. Plus, map restricts us to right-to-left processing |
141 | and I've always been fond of the shell metaphor of connecting commands |
142 | together left-to-right in a pipeline. |
143 | |
144 | So, this module was born. |
145 | |
146 | use IO::Pipeline; |
147 | |
148 | will export three functions - L</pmap>, L</pgrep> and L</psink>. The first |
149 | two are the meat of the module, the last one is a means to test by sending |
150 | results somewhere other than a filehandle (or to chain IO::Pipeline output |
151 | on to ... well, anywhere else, really). |
152 | |
153 | pmap and pgrep both return pipeline objects (currently of class IO::Pipeline, |
154 | but this is considered an implementation detail, not a feature - so please |
155 | don't write code that relies on it) that provide an overloaded '|' operator. |
156 | |
157 | my $mapper = pmap { "[header] ".$_ }; |
158 | |
159 | my $filter = pgrep { /ALERT/ }; |
160 | |
161 | When you use | to chain two pipeline objects together, you get another |
162 | pipeline object: |
163 | |
164 | my $combined = $mapper | $filter; |
165 | |
166 | Although since we're going left to right, you probably want to do the grep |
167 | first: |
168 | |
169 | my $combined = $filter | $mapper; |
170 | |
171 | (but it's all the same to IO::Pipeline, of course) |
172 | |
173 | When you use | with a filehandle on one side, that sets the start or |
174 | finish of the pipeline, so: |
175 | |
176 | my $combined_with_input = $readable_fh | $combined; |
177 | |
178 | my $combined_with_output = $combined | $writeable_fh; |
179 | |
180 | and if you don't want a real filehandle for the second option, you can use |
181 | psink: |
182 | |
183 | my $output = ''; |
184 | |
185 | my $combined_with_output = $combined | psink { $output .= $_ }; |
186 | |
187 | Once both an input and an output have been provided, IO::Pipeline runs the |
188 | full pipeline, reading from the input and pushing one line at a time down |
189 | the pipe to the output until the input filehandle is exhausted. |
190 | |
191 | Non-completed pipeline objects are completely re-usable though - so you can |
192 | (and are expected to) do things like: |
193 | |
20f0adf3 |
194 | my $combined_to_stdout = $combined | \*STDOUT; |
fe528e03 |
195 | |
196 | foreach my $file (@files_to_process) { |
197 | |
198 | open my $in, '<', $file |
199 | or die "Couldn't open ${file}: $!"; |
200 | |
201 | $in | $combined_to_stdout; |
202 | } |
203 | |
204 | =head1 EXPORTED FUNCTIONS |
205 | |
206 | =head2 pmap |
207 | |
208 | my $mapper = pmap { <return zero or more new lines based on $_> }; |
209 | |
210 | A pipeline part built with pmap gets invoked for each line on the pipeline, |
211 | with the line in both $_ and $_[0]. |
212 | |
213 | It may, as with perl's map operator, return zero or more elements. If it |
214 | returns nothing at all, IO::Pipeline will go back to the start of the pipe |
215 | chain and read another line to restart processing with. If it returns |
216 | one or more lines, each one is fed in turn into the rest of the pipe chain. |
217 | |
218 | Most of the time, you probably just want to modify the line somehow and then |
219 | return it (note that $_ is a copy of the input line so this is safe): |
220 | |
221 | my $fix_teh = pmap { s/teh/the/g; $_; }; |
222 | |
223 | Note that you still need to actively return $_ for the pipe to continue |
224 | (again, as with perl's map operator). |
225 | |
226 | =head2 pgrep |
227 | |
228 | my $filter = pgrep { <return true or false to keep or throw away $_> }; |
229 | |
230 | A pipeline part built with pgrep gets invoked for each line on the pipeline, |
231 | with the line in both $_ and $_[0]. |
232 | |
233 | If it returns a true value, the line is passed on to the next stage of the |
234 | pipeline. If it returns a false value, the line is thrown away and IO::Pipeline |
235 | will go back to the start of the pipe chain and read another line to restart |
236 | processing with. |
237 | |
238 | The upshot of this is that any pgrep can be turned trivially into a pmap: |
239 | |
240 | my $filter = pgrep { /ALERT/ }; |
241 | |
242 | is precisely equivalent to: |
243 | |
244 | my $filter = pmap { /ALERT/ ? ($_) : () }; |
245 | |
246 | but the pgrep form is rather clearer. |
247 | |
248 | =head2 psink |
249 | |
250 | my $output = ''; |
251 | |
252 | my $sink = psink { $output .= $_ }; |
253 | |
254 | A pipe sink is an alternative to an output filehandle as the last element |
255 | of a pipeline. Where in the case of a normal filehandle a line would be |
256 | printed to the handle, given a sink IO::Pipeline will call the code block |
257 | provided. So: |
258 | |
259 | $pipeline | \*STDOUT; |
260 | |
261 | and |
262 | |
263 | $pipeline | psink { print STDOUT $_; } |
264 | |
265 | will have exactly the same end result. |
266 | |
267 | If you're looking for the source version of this, there isn't one built in |
268 | because L<Yuval Kogman's IO::Handle::Util module|IO::Handle::Util> already |
269 | provides an io_from_getline construct that does that, along with a bunch |
270 | more things that you may find very useful. |
271 | |
272 | =head1 DECONSTRUCTING THE SYNOPSIS |
273 | |
274 | Start with an input filehandle: |
275 | |
276 | $in |
277 | |
278 | Next, we split the line up - so |
279 | |
280 | 2010-03-21 16:15:30 1NtNoI-000658-6V Completed |
281 | |
282 | becomes |
283 | |
284 | [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ] |
285 | |
286 | using a regexp in list context so that all the match values fall out into |
287 | a new anonymous array reference: |
288 | |
289 | | pmap { [ /^(\S+) (\S+) (.*)$/ ] } |
290 | |
291 | Now we've separated out the message, we want to throw away anything that isn't |
292 | either a 'rejected' or 'Completed' line, so we test the last element of the |
293 | split line for that: |
294 | |
295 | | pgrep { $_->[2] =~ /rejected|Completed/ } |
296 | |
297 | Now we know which is which, we want to turn |
298 | |
299 | [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ] |
300 | |
301 | into |
302 | |
303 | [ '2010-03-21', '16:15:30', 'Completed' ] |
304 | |
305 | and similarly for rejected lines. Since we know every remaining line is one or the |
306 | other, we can simply test for 'rejected' in the line - |
307 | |
308 | $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' |
309 | |
310 | and then we construct a new array reference consisting of the first two |
311 | elements of the original array |
312 | |
313 | @{$_}[0, 1] |
314 | |
315 | plus the new value for the third element: |
316 | |
317 | | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] } |
318 | |
319 | This done, we can now reassemble the line using join (remembering to add a |
320 | newline, since IO::Pipeline doesn't add one for you in case you didn't want one) |
321 | |
322 | | pmap { join(' ', @$_)."\n" } |
323 | |
324 | and then in lieu of sending it somewhere else, since this is just a |
325 | demonstration code fragment, add a sink that appends things onto the end of |
326 | a variable so that we can examine the results: |
327 | |
328 | | psink { $out .= $_ }; |
329 | |
330 | =head1 AUTHOR |
331 | |
332 | Matt S. Trout (mst) <mst@shadowcat.co.uk> |
333 | |
d6633b53 |
334 | =head1 CONTRIBUTORS |
fe528e03 |
335 | |
336 | None as yet, though I'm sure that'll change as soon as people spot the |
337 | giant gaping holes that inevitably exist in any software only used by |
338 | the author so far. |
339 | |
340 | =head1 COPYRIGHT |
341 | |
bd273c77 |
342 | Copyright (c) 2010 the IO::Pipeline L</AUTHOR> and L</CONTRIBUTORS> |
fe528e03 |
343 | as listed above. |
344 | |
345 | =head1 LICENSE |
346 | |
347 | This library is free software and may be distributed under the same terms |
348 | as perl itself. |
349 | |
350 | =head1 SUPPORT |
351 | |
352 | Right now, your best routes are probably (a) to come ask questions on |
353 | #perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with |
354 | nick mst if nobody else around at the time manages to help you first) or |
355 | (b) to email me directly at the address given in L</AUTHOR> above. You're |
356 | also welcome to use rt.cpan.org to report bugs (which you can do without |
357 | a login by mailing bugs-IO-Pipeline at that domain), but please cc my |
358 | email address as well on grounds of me being a Bad Person and thereby not |
359 | always spotting tickets. |
360 | |
361 | =head1 SOURCE CODE |
362 | |
363 | This code lives in git.shadowcat.co.uk and can be viewed via gitweb using |
364 | |
365 | http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary |
366 | |
367 | or checked out via git-daemon using |
368 | |
369 | git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git |
370 | |
371 | =cut |
372 | |
d5217b4d |
373 | 1; |