factor out ArrayStream, update new stream types to respect peek
[catagits/HTML-Zoom.git] / lib / HTML / Zoom / StreamUtils.pm
index fcd341c..3b2f921 100644 (file)
@@ -4,8 +4,11 @@ use strict;
 use warnings FATAL => 'all';
 use base qw(HTML::Zoom::SubObject);
 
+use Scalar::Util ();
+
 use HTML::Zoom::CodeStream;
 use HTML::Zoom::FilterStream;
+use HTML::Zoom::ArrayStream;
 
 sub stream_from_code {
   my ($self, $code) = @_;
@@ -18,10 +21,10 @@ sub stream_from_code {
 sub stream_from_array {
   my $self = shift;
   my @array = @_;
-  $self->stream_from_code(sub {
-    return unless @array;
-    return shift @array;
-  });
+  HTML::Zoom::ArrayStream->new({
+    array => \@array,
+    zconfig => $self->_zconfig,
+  })
 }
 
 sub stream_concat {
@@ -42,6 +45,9 @@ sub stream_from_proto {
     return $proto->();
   } elsif ($ref eq 'SCALAR') {
     return $self->_zconfig->parser->html_to_stream($$proto);
+  } elsif (Scalar::Util::blessed($proto) && $proto->can('to_stream')) {
+    my $stream = $proto->to_stream;
+    return $self->stream_from_code(sub { $stream->next });
   }
   die "Don't know how to turn $proto (ref $ref) into a stream";
 }
@@ -56,4 +62,28 @@ sub wrap_with_filter {
   })
 }
 
+sub stream_to_array {
+  my $stream = $_[1];
+  my @array;
+  while (my ($evt) = $stream->next) { push @array, $evt }
+  return @array;
+}
+
+sub flatten_stream_of_streams {
+  my ($self, $source_stream) = @_;
+  my $cur_stream;
+  HTML::Zoom::CodeStream->new({
+    code => sub {
+      return unless $source_stream;
+      my $next;
+      until (($next) = ($cur_stream ? $cur_stream->next : ())) {
+        unless (($cur_stream) = $source_stream->next) {
+          undef $source_stream; return;
+        }
+      }
+      return $next;
+    }
+  });
+}
+
 1;