sub {
my ($evt, $stream) = @_;
my $emit = $self->_stream_from_array(@$events);
- if ($evt->{is_in_place_close}) {
- return [ $evt, $emit ];
- }
- my ($filtered_evt, $coll) = @{$self->collect(undef, 1)->(@_)};
- return [ $filtered_evt, $self->_stream_concat($coll, $emit) ];
+ my $coll = $self->collect(undef, 1)->(@_);
+ return ref($coll) eq 'HASH' # single event, no collect
+ ? [ $coll, $emit ]
+ : [ $coll->[0], $self->_stream_concat($coll->[1], $emit) ];
};
}
sub {
my ($evt, $stream) = @_;
my $emit = $self->_stream_from_array(@$events);
- if ($evt->{is_in_place_close}) {
- return $emit
- }
- return $self->_stream_concat($emit, $self->collect->(@_));
+ my $coll = $self->collect->(@_);
+ return $coll ? $self->_stream_concat($emit, $coll) : $emit;
};
}