e439ce9d05094bfd04a50042e2a1a03b194c8310
[scpubgit/DKit.git] / lib / DX / ResultStream.pm
1 package DX::ResultStream;
2
3 use Moo;
4
5 has for_state => (is => 'ro', required => 1);
6
7 has _current_state => (is => 'rw');
8
9 has is_exhausted => (is => 'rwp');
10
11 has observation_policy => (is => 'ro', default => sub { sub { 0 } });
12
13 sub next {
14   my ($self) = @_;
15   return if $self->is_exhausted;
16   my $state = do {
17     if (my $cur = $self->_current_state) {
18       $cur->push_backtrack;
19     } else {
20       $self->for_state
21     }
22   };
23   STATE: while ($state = $self->_current_state($state->run)) {
24     last if $state->isa('DX::State');
25     if ($state->isa('DX::ObservationRequired')) {
26       if ($self->observation_policy->($state->observer)) {
27         my ($type, $value) = $state->observer->run;
28         $state = $state->resume;
29         $state->facts->{$type}->set_value($value);
30       } else {
31         die "Observation refused";
32       }
33     } else {
34       die "WTF: ".$state;
35     }
36   }
37   unless ($state) {
38     $self->_set_is_exhausted(1);
39     return;
40   }
41   return +{
42     map +($_ => $state->scope_var($_)->copy), keys %{$state->scope}
43   };
44 }
45
46 sub results {
47   my ($self) = @_;
48   my @all;
49   while (my $next = $self->next) {
50     push @all, $next;
51   }
52   return @all;
53 }
54
55 1;