d5d528a3a971e9b8d697aabda1d48eb33e917424
[scpubgit/Object-Remote.git] / lib / Object / Remote / Future.pm
1 package Object::Remote::Future;
2
3 use strict;
4 use warnings;
5 use base qw(Exporter);
6
7 use Object::Remote::Logging qw( :log router );
8
9 BEGIN { router()->exclude_forwarding }
10
11 use CPS::Future;
12
13 our @EXPORT = qw(future await_future await_all);
14
15 sub future (&;$) {
16   my $f = $_[0]->(CPS::Future->new);
17   return $f if ((caller(1+($_[1]||0))||'') eq 'start');
18   await_future($f);
19 }
20
21 our @await;
22
23 sub await_future {
24   my $f = shift;
25   log_trace { my $ir = $f->is_ready; "await_future() invoked; is_ready: $ir" };
26   return $f if $f->is_ready;
27   require Object::Remote;
28   my $loop = Object::Remote->current_loop;
29   {
30     local @await = (@await, $f);
31     $f->on_ready(sub {
32       log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
33       if ($f == $await[-1]) {
34         log_trace { "This future is not waiting on anything so calling stop on the run loop" };
35         $loop->stop;
36       }
37     });
38     log_trace { "Starting run loop for newly created future" };
39     $loop->run;
40   }
41   if (@await and $await[-1]->is_ready) {
42     log_trace { "Last future in await list was ready, stopping run loop" };
43     $loop->stop;
44   }
45   log_trace { "await_future() returning" };
46   return wantarray ? $f->get : ($f->get)[0];
47 }
48
49 sub await_all {
50   log_trace { my $l = @_; "await_all() invoked with '$l' futures to wait on" };
51   await_future(CPS::Future->wait_all(@_));
52   map $_->get, @_;
53 }
54
55 package start;
56
57 our $start = sub { my ($obj, $call) = (shift, shift); $obj->$call(@_); };
58
59 sub AUTOLOAD {
60   my $invocant = shift;
61   my ($method) = our $AUTOLOAD =~ /^start::(.+)$/;
62   my $res;
63   unless (eval { $res = $invocant->$method(@_); 1 }) {
64     my $f = CPS::Future->new;
65     $f->fail($@);
66     return $f;
67   }
68   unless (Scalar::Util::blessed($res) and $res->isa('CPS::Future')) {
69     my $f = CPS::Future->new;
70     $f->done($res);
71     return $f;
72   }
73   return $res;
74 }
75
76 package maybe;
77
78 sub start {
79   my ($obj, $call) = (shift, shift);
80   if ((caller(1)||'') eq 'start') {
81     $obj->$start::start($call => @_);
82   } else {
83     $obj->$call(@_);
84   }
85 }
86
87 package maybe::start;
88
89 sub AUTOLOAD {
90   my $invocant = shift;
91   my ($method) = our $AUTOLOAD =~ /^maybe::start::(.+)$/;
92   $method = "start::${method}" if ((caller(1)||'') eq 'start');
93   $invocant->$method(@_);
94 }
95
96 package then;
97
98 sub AUTOLOAD {
99   my $invocant = shift;
100   my ($method) = our $AUTOLOAD =~ /^then::(.+)$/;
101   my @args = @_;
102   # Need two copies since if we're called on an already complete future
103   # $f will be freed immediately
104   my $ret = my $f = CPS::Future->new;
105   $invocant->on_fail(sub { $f->fail(@_); undef($f); });
106   $invocant->on_done(sub {
107     my ($obj) = @_;
108     my $next = $obj->${\"start::${method}"}(@args);
109     $next->on_done(sub { $f->done(@_); undef($f); });
110     $next->on_fail(sub { $f->fail(@_); undef($f); });
111   });
112   return $ret;
113 }
114
115 1;
116
117 =head1 NAME
118
119 Object::Remote::Future - Asynchronous calling for L<Object::Remote>
120
121 =head1 LAME
122
123 Shipping prioritised over writing this part up. Blame mst.
124
125 =cut