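package Data::Stream::Bulk;
use Moose::Role;

use namespace::clean -except => 'meta';

# consuming classes must implement these two methods
requires qw(next is_done);

# items of the next block as a list, or an empty list once exhausted
sub items {
	my $self = shift;
	if ( my $block = $self->next ) {
		return @$block;
	}
	return ();
}

# drain the stream, collecting every remaining item
sub all {
	my $self = shift;
	my @ret;
	while ( my $next = $self->next ) {
		push @ret, @$next;
	}
	return @ret;
}

sub cat {
	my ( $self, @streams ) = @_;
	return $self unless @streams;
	my @cat = $self->list_cat(@streams);
	if ( !@cat ) {
		return Data::Stream::Bulk::Nil->new;
	} elsif ( @cat == 1 ) {
		return $cat[0];
	} else {
		return Data::Stream::Bulk::Cat->new( streams => \@cat );
	}
}

sub list_cat {
	my ( $self, $head, @tail ) = @_;
	return $self unless $head;
	return ( $self, $head->list_cat(@tail) );
}

sub filter {
	my ( $self, $filter ) = @_;
	return Data::Stream::Bulk::Filter->new(
		filter => $filter,
		stream => $self,
	);
}

# false by default; see the POD below for when to override it
sub loaded { 0 }

# load these *after* the entire role is defined
require Data::Stream::Bulk::Cat;
require Data::Stream::Bulk::Nil;
require Data::Stream::Bulk::Filter;

__PACKAGE__;

__END__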
=pod

=head1 NAME

Data::Stream::Bulk - N at a time iteration API

=head1 SYNOPSIS

	# get a bulk stream from somewhere
	my $s = Data::Stream::Bulk::Foo->new( ... );

	# can be used like this:
	until ( $s->is_done ) {
		foreach my $item ( $s->items ) {
			# do something with $item
		}
	}

	# or like this:
	while ( my $block = $s->next ) {
		foreach my $item ( @$block ) {
			# do something with $item
		}
	}
=head1 DESCRIPTION

This module tries to find a middle ground between one-at-a-time and
all-at-once processing of data sets.

The purpose of this module is to avoid the overhead of implementing an
iterative API when this isn't necessary, without breaking forward
compatibility in case that becomes necessary later on.

The API optimizes for the case where a data set typically fits in memory and
is returned as an array, but the consumer cannot assume that the data set is
bounded.

The API is destructive in order to minimize the chance that resultsets are
leaked due to improper usage: each call to C<next> consumes the block it
returns, so the stream does not keep handing out data it has already
delivered.
=head2 Required Methods

The API requires two methods to be implemented:

=over 4

=item is_done

Should return true if the stream is exhausted.

As long as this method returns a false value (not done), C<next> could
potentially return another block.

=item next

Returns the next block, as an array reference.

Note that C<next> is not guaranteed to return an array reference, even if
C<is_done> returned false prior to calling it. Callers should therefore test
the return value of C<next> rather than rely on C<is_done> alone.

=back
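To make this contract concrete, here is a minimal sketch of a stream that
implements the two required methods using nothing beyond core Perl. The class
C<My::ChunkStream> is hypothetical: it is not part of this distribution and
does not consume the Moose role; it simply hands out an in-memory list a few
items at a time. Its C<items> and C<all> methods imitate the convenience
methods of the same name.

```perl
use strict;
use warnings;

# HYPOTHETICAL class for illustration only: it is not part of
# Data::Stream::Bulk and does not consume the Moose role. It implements
# the two required methods over an in-memory list, N items per block.
package My::ChunkStream;

sub new {
	my ( $class, $size, @items ) = @_;
	return bless { size => $size, items => [@items] }, $class;
}

# true once every item has been handed out
sub is_done {
	my $self = shift;
	return !@{ $self->{items} };
}

# next block as an array reference, or undef once exhausted; splice
# removes the block, so it is never returned twice (destructive API)
sub next {
	my $self = shift;
	return if $self->is_done;
	return [ splice @{ $self->{items} }, 0, $self->{size} ];
}

# imitates the role's items(): one block, dereferenced
sub items {
	my $self  = shift;
	my $block = $self->next;
	return $block ? @$block : ();
}

# imitates the role's all(): drain every remaining block
sub all {
	my $self = shift;
	my @ret;
	while ( my $block = $self->next ) {
		push @ret, @$block;
	}
	return @ret;
}

package main;

my $s      = My::ChunkStream->new( 2, 1 .. 5 );
my $first  = $s->next;     # [1, 2]
my @second = $s->items;    # (3, 4)
my @rest   = $s->all;      # (5)
print "@$first | @second | @rest\n";    # prints "1 2 | 3 4 | 5"
```

Because C<next> uses C<splice>, each block leaves the stream as it is
returned. A stream backed by an external source may only discover that it is
exhausted when a fetch comes back empty, which is why the loops test the
return value of C<next> instead of trusting C<is_done>.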
=head2 Convenience Methods

=over 4

=item items

This method calls C<next> and dereferences the result if there are pending
items. Once the stream is exhausted it returns an empty list.

=item all

Force evaluation of the entire resultset.

Note that for large data sets this might cause swap thrashing or various other
undesired effects. Use with caution.

=item cat @streams

Concatenates this stream with @streams, returning a single stream. If no
streams are given, the stream itself is returned.

=item list_cat @streams

Returns a possibly cleaned-up list of streams.

Overridden by L<Data::Stream::Bulk::Array>, L<Data::Stream::Bulk::Cat> and
L<Data::Stream::Bulk::Nil> to implement some simple short circuiting.

=item filter $filter

Applies a per-block filter to the stream.

Returns a possibly new stream with the filtering layered.

C<$filter> is invoked once per block and should return an array reference to
the filtered block.

=item loaded

Should be overridden to return true if all the items are already realized
(e.g. in the case of L<Data::Stream::Bulk::Array>).

Returns false by default.

When true, calling C<all> is supposed to be safe (memory usage should be of
the same order of magnitude as the stream's own usage).

This is typically useful when transforming an array is easier than
transforming a stream (e.g. optional duplicate filtering).

=back
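The following sketch illustrates how concatenation and per-block filtering can
be layered on top of nothing more than C<next> and C<is_done>. It is plain
Perl and does not reproduce the actual code of L<Data::Stream::Bulk::Cat> or
L<Data::Stream::Bulk::Filter>: C<My::CodeStream>, C<blocks_stream>,
C<cat_streams> and C<filter_stream> are hypothetical names introduced only for
this example, and unlike the role's C<cat> the helper does no short
circuiting.

```perl
use strict;
use warnings;

# HYPOTHETICAL callback-driven stream used only for this sketch.
package My::CodeStream;

sub new {
	my ( $class, $cb ) = @_;
	return bless { cb => $cb, done => 0 }, $class;
}

# only known to be exhausted after the callback has returned undef once
sub is_done { $_[0]{done} }

sub next {
	my $self = shift;
	return if $self->{done};
	my $block = $self->{cb}->();
	$self->{done} = 1 unless $block;
	return $block;
}

package main;

# hypothetical helper: a stream over a fixed list of blocks
sub blocks_stream {
	my @blocks = @_;
	return My::CodeStream->new( sub { shift @blocks } );
}

# hypothetical helper: drain each input stream in turn (no short circuiting)
sub cat_streams {
	my @streams = @_;
	return My::CodeStream->new( sub {
		while (@streams) {
			my $block = $streams[0]->next;
			return $block if $block;
			shift @streams;    # exhausted, move on to the next stream
		}
		return;
	} );
}

# hypothetical helper: apply $filter (array ref in, array ref out) per block
sub filter_stream {
	my ( $stream, $filter ) = @_;
	return My::CodeStream->new( sub {
		my $block = $stream->next or return;
		return $filter->($block) || [];    # a false result becomes an empty block
	} );
}

my $s = filter_stream(
	cat_streams( blocks_stream( [ 1, 2 ], [3] ), blocks_stream( [ 4, 5, 6 ] ) ),
	sub { [ grep { $_ % 2 } @{ $_[0] } ] },    # keep odd numbers only
);

my @out;
while ( my $block = $s->next ) {
	push @out, @$block;
}
print "@out\n";    # prints "1 3 5"
```

Here C<My::CodeStream> only reports C<is_done> after its callback has returned
C<undef>, so C<next> can return C<undef> while C<is_done> was still false:
exactly the situation the C<next> documentation warns about.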
=head1 CLASSES

=over 4

=item L<Data::Stream::Bulk::Array>

This class is not a stream at all, but just one block. When the data set easily
fits in memory this class can be used, while retaining forward compatibility
with larger data sets.

=item L<Data::Stream::Bulk::Callback>

Callback-driven iteration.

=item L<Data::Stream::Bulk::DBI>

Bulk fetching of data from L<DBI> statement handles.

=item L<Data::Stream::Bulk::DBIC>

L<DBIx::Class::ResultSet> iteration.

=item L<Data::Stream::Bulk::Nil>

An empty stream.

=item L<Data::Stream::Bulk::Cat>

A concatenation of several streams.

=item L<Data::Stream::Bulk::Filter>

A filter wrapping a stream.

=back
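A single-block stream of the kind L<Data::Stream::Bulk::Array> is described as
can be sketched as follows, together with a consumer that only realizes the
whole data set when C<loaded> says that is safe. C<My::OneBlock> and
C<unique_items> are hypothetical: this is not the Array class's actual
implementation, and the duplicate filtering merely echoes the example given
for C<loaded>.

```perl
use strict;
use warnings;

# HYPOTHETICAL single-block stream modelled on the description of
# Data::Stream::Bulk::Array; this is not that class's code.
package My::OneBlock;

sub new {
	my ( $class, @items ) = @_;
	return bless { block => [@items] }, $class;
}

sub is_done { !$_[0]{block} }

# hand out the whole array once, then forget it
sub next { delete $_[0]{block} }

# every item is already in memory, so realizing the stream is safe
sub loaded { 1 }

sub all {
	my $self  = shift;
	my $block = $self->next;
	return $block ? @$block : ();
}

package main;

# hypothetical consumer: de-duplicate across the whole data set, but only
# when the stream says that calling all() will not exhaust memory
sub unique_items {
	my $stream = shift;
	die "refusing to realize a stream that is not loaded\n"
		unless $stream->loaded;
	my %seen;
	return grep { !$seen{$_}++ } $stream->all;
}

my $s = My::OneBlock->new(qw(a b a c b));
my @u = unique_items($s);
print "@u\n";    # prints "a b c"
```

A stream whose C<loaded> returns false, such as one reading from a database
cursor, would make C<unique_items> die instead of silently pulling an
unbounded result set into memory.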
=head1 SEE ALSO

L<HOP::Stream>, L<Iterator>, L<Class::Iterator> etc. for one-by-one iteration

L<DBI>, L<DBIx::Class::ResultSet>

L<Parallel::Iterator>

L<http://en.wikipedia.org/wiki/MapReduce>, LISP, and all that other kool aid
=head1 TODO

=over 4

=item Sorted streams

Add a hint for sorted streams (like C<loaded> but as an attribute in the base
role).

Introduce a C<merge> operation for merging of sorted streams.

Optimize C<unique> to make use of sorting hints for constant-space uniquing.

=item More utility functions

To assist in processing and creating streams.

=item Coercion tables

L<Moose::Util::TypeConstraints>

=back
=head1 VERSION CONTROL

This module is maintained using git. You can get the latest version from
L<http://github.com/nothingmuch/data-stream-bulk/>.

=head1 AUTHOR

Yuval Kogman E<lt>nothingmuch@woobling.orgE<gt>

=head1 COPYRIGHT

	Copyright (c) 2008 Yuval Kogman. All rights reserved.
	This program is free software; you can redistribute
	it and/or modify it under the same terms as Perl itself.

=cut