5816b03e80bf9c090c037b7f91f383ec77206737
[scpubgit/DKit.git] / lib / DX / ResultStream.pm
1 package DX::ResultStream;
2
3 use DX::Result;
4 use Moo;
5
6 has for_state => (is => 'ro', required => 1);
7
8 has _current_state => (is => 'rw');
9
10 has is_exhausted => (is => 'rwp');
11
12 has observation_policy => (is => 'ro', default => sub { sub { 0 } });
13
14 sub next {
15   my ($self) = @_;
16   return if $self->is_exhausted;
17   my $state = do {
18     if (my $cur = $self->_current_state) {
19       $cur->push_backtrack;
20     } else {
21       $self->for_state
22     }
23   };
24   STATE: while ($state = $self->_current_state($state->run)) {
25     last if $state->isa('DX::State');
26     if ($state->isa('DX::ObservationRequired')) {
27       if ($self->observation_policy->($state->observer)) {
28         my ($type, $value) = $state->observer->run;
29         $state = $state->resume;
30         $state->facts->{$type}->set_value($value);
31       } else {
32         die "Observation refused";
33       }
34     } else {
35       die "WTF: ".$state;
36     }
37   }
38   unless ($state) {
39     $self->_set_is_exhausted(1);
40     return;
41   }
42   return DX::Result->new(state => $state->copy_vars);
43 }
44
45 sub results {
46   my ($self) = @_;
47   my @all;
48   while (my $next = $self->next) {
49     push @all, $next;
50   }
51   return @all;
52 }
53
54 1;