1 package DX::ResultStream;
5 has for_state => (is => 'ro', required => 1);
7 has _current_state => (is => 'rw');
9 has is_exhausted => (is => 'rwp');
11 has observation_policy => (is => 'ro', default => sub { sub { 0 } });
15 return if $self->is_exhausted;
17 if (my $cur = $self->_current_state) {
23 STATE: while ($state = $self->_current_state($state->run)) {
24 last if $state->isa('DX::State');
25 if ($state->isa('DX::ObservationRequired')) {
26 if ($self->observation_policy->($state->observer)) {
27 my ($type, $value) = $state->observer->run;
28 $state = $state->resume;
29 $state->facts->{$type}->set_value($value);
31 die "Observation refused";
38 $self->_set_is_exhausted(1);
42 map +($_ => $state->scope_var($_)->copy), keys %{$state->scope}
49 while (my $next = $self->next) {