1 package DX::ResultStream;
# Read-only attribute that must be supplied to the constructor.
# NOTE(review): presumably the state the stream starts searching from; its
# use is not visible in this excerpt -- confirm against the full file.
6 has for_state => (is => 'ro', required => 1);
# Read-write slot for the state the stream is currently working from. The
# search loop stores the result of each `$state->run` call back into it.
8 has _current_state => (is => 'rw');
# Flag consulted before producing a result: when true, the producer returns
# immediately. Declared 'rwp', so it is written through the private
# `_set_is_exhausted` writer rather than through the public accessor.
10 has is_exhausted => (is => 'rwp');
# Coderef called with the observer of a DX::ObservationRequired state; a true
# return value lets that observer run. The default policy returns 0 (false).
# NOTE(review): a false result appears to lead to the "Observation refused"
# die, but that branch is not fully visible here -- confirm.
12 has observation_policy => (is => 'ro', default => sub { sub { 0 } });
16 return if $self->is_exhausted;
18 if (my $cur = $self->_current_state) {
24 STATE: while ($state = $self->_current_state($state->run)) {
25 last if $state->isa('DX::State');
26 if ($state->isa('DX::ObservationRequired')) {
27 if ($self->observation_policy->($state->observer)) {
28 my ($type, $value) = $state->observer->run;
29 $state = $state->resume;
30 $state->facts->{$type}->set_value($value);
32 die "Observation refused";
39 $self->_set_is_exhausted(1);
42 return DX::Result->new(state => $state->copy_vars);
48 while (my $next = $self->next) {