break out future polling to be one at a time to try to determine which one is hanging
[scpubgit/Object-Remote.git] / lib / Object / Remote / Future.pm
1 package Object::Remote::Future;
2
3 use strict;
4 use warnings;
5 use base qw(Exporter);
6
7 use CPS::Future;
8
9 our @EXPORT = qw(future await_future await_all);
10
11 sub future (&;$) {
12   my $f = $_[0]->(CPS::Future->new);
13   return $f if ((caller(1+($_[1]||0))||'') eq 'start');
14   await_future($f);
15 }
16
17 our @await;
18
19 sub await_future {
20   my $f = shift;
21   return $f if $f->is_ready;
22   require Object::Remote;
23   my $loop = Object::Remote->current_loop;
24   {
25     local @await = (@await, $f);
26     $f->on_ready(sub {
27       $loop->stop if $f == $await[-1]
28     });
29     warn "running loop for ${f} [@{await}]";
30     $loop->run;
31   }
32   if (@await and $await[-1]->is_ready) {
33     $loop->stop;
34   }
35   return wantarray ? $f->get : ($f->get)[0];
36 }
37
38 sub await_all {
39   my @subs = @_;
40   for my $sub (@subs) {
41     warn "awaiting ${sub}";
42     await_future(CPS::Future->wait_all($sub));
43   }
44   map $_->get, @_;
45 }
46
47 package start;
48
49 our $start = sub { my ($obj, $call) = (shift, shift); $obj->$call(@_); };
50
51 sub AUTOLOAD {
52   my $invocant = shift;
53   my ($method) = our $AUTOLOAD =~ /^start::(.+)$/;
54   my $res;
55   unless (eval { $res = $invocant->$method(@_); 1 }) {
56     my $f = CPS::Future->new;
57     $f->fail($@);
58     return $f;
59   }
60   unless (Scalar::Util::blessed($res) and $res->isa('CPS::Future')) {
61     my $f = CPS::Future->new;
62     $f->done($res);
63     return $f;
64   }
65   return $res;
66 }
67
68 package maybe;
69
70 sub start {
71   my ($obj, $call) = (shift, shift);
72   if ((caller(1)||'') eq 'start') {
73     $obj->$start::start($call => @_);
74   } else {
75     $obj->$call(@_);
76   }
77 }
78
79 package maybe::start;
80
81 sub AUTOLOAD {
82   my $invocant = shift;
83   my ($method) = our $AUTOLOAD =~ /^maybe::start::(.+)$/;
84   $method = "start::${method}" if ((caller(1)||'') eq 'start');
85   $invocant->$method(@_);
86 }
87
88 package then;
89
90 sub AUTOLOAD {
91   my $invocant = shift;
92   my ($method) = our $AUTOLOAD =~ /^then::(.+)$/;
93   my @args = @_;
94   # Need two copies since if we're called on an already complete future
95   # $f will be freed immediately
96   my $ret = my $f = CPS::Future->new;
97   $invocant->on_fail(sub { $f->fail(@_); undef($f); });
98   $invocant->on_done(sub {
99     my ($obj) = @_;
100     my $next = $obj->${\"start::${method}"}(@args);
101     $next->on_done(sub { $f->done(@_); undef($f); });
102     $next->on_fail(sub { $f->fail(@_); undef($f); });
103   });
104   return $ret;
105 }
106
107 1;
108
109 =head1 NAME
110
111 Object::Remote::Future - Asynchronous calling for L<Object::Remote>
112
113 =head1 LAME
114
115 Shipping prioritised over writing this part up. Blame mst.
116
117 =cut