'Complex stream'
);
+my $call_op = FromCode->new(
+ code => sub {
+ my ($self, $state) = @_;
+ my @rst = @{$state->return_stack};
+ my $save_scope = $state->scope;
+ my %scope = (S => $save_scope->{S});
+ my $ret_op = FromCode->new(
+ code => sub { $_[1]->new(%{$_[1]}, scope => $save_scope, next_op => $_[0]->next) },
+ next => $self->next,
+ );
+ $state->new(%$state,
+ scope => \%scope,
+ return_stack => [ @rst, $ret_op ],
+ next_op => $op
+ );
+ },
+ next => FromCode->new(
+ code => bind_array(P => \@shells),
+ next => FromCode->new(
+ code => test_values([ qw(S P) ], sub { $shells{$_[1]}{$_[0]} }),
+ )
+ )
+);
+
+my $callstream = DX::ResultStream->new(
+ for_state => make_state([ qw(S P) ], $call_op)
+);
+
+is_deeply(
+ [ $callstream->results ],
+ [
+ { P => 'csh', S => 'jim.example.com' },
+ { P => 'csh', S => 'joe.example.com' },
+ { P => 'bash', S => 'joe.example.com' },
+ { P => 'csh', S => 'bob.example.com' },
+ ],
+ 'Call stream'
+);
+
done_testing;