sub {
my ($evt, $stream) = @_;
my $emit = $self->_stream_from_array(@$events);
- if ($evt->{is_in_place_close}) {
- return [ $evt, $emit ];
- }
- my ($filtered_evt, $coll) = @{$self->collect(undef, 1)->(@_)};
- return [ $filtered_evt, $self->_stream_concat($coll, $emit) ];
+ my $coll = $self->collect(undef, { passthrough => 1 })->(@_);
+ return ref($coll) eq 'HASH' # single event, no collect
+ ? [ $coll, $emit ]
+ : [ $coll->[0], $self->_stream_concat($coll->[1], $emit) ];
};
}
sub {
my ($evt, $stream) = @_;
my $emit = $self->_stream_from_array(@$events);
- if ($evt->{is_in_place_close}) {
- return $emit
- }
- return $self->_stream_concat($emit, $self->collect->(@_));
+ my $coll = $self->collect->(@_);
+ return $coll ? $self->_stream_concat($emit, $coll) : $emit;
};
}
sub collect {
- my ($self, $into, $passthrough) = @_;
+ my ($self, $into, $attrs) = @_;
+ my ($passthrough) = @{$attrs}{qw(passthrough)};
sub {
my ($evt, $stream) = @_;
push(@$into, $evt) if $into;