a4bce2e426d28bf498004e8e1370a28c6130e2cd
[scpubgit/Object-Remote.git] / lib / Object / Remote / Role / Connector.pm
1 package Object::Remote::Role::Connector;
2
3 use Module::Runtime qw(use_module);
4 use Object::Remote::Future;
5 use Object::Remote::Logging qw(:log :dlog );
6 use Moo::Role;
7
# Consuming classes must implement _open2_for. connect() calls it with its
# own arguments and expects a three-element list back:
# (send filehandle, receive filehandle, child pid).
requires '_open2_for';

# How long connect() waits for the remote handshake before failing the
# connection future. Passed as `after` to the loop's watch_time
# (presumably seconds -- confirm against the loop implementation).
has timeout => (
  is      => 'ro',
  default => sub { 10 },
);
11
# Establish a connection through the transport provided by the consuming
# class and wait for the remote side to announce itself.
#
# Arguments: passed through unchanged to the consumer's _open2_for, which
#   must return (send filehandle, receive filehandle, child pid).
#
# Returns: the result of the future { } block (see Object::Remote::Future).
#   The underlying future is
#     * done with a new Object::Remote::Connection once the first line read
#       from the remote is exactly "Shere";
#     * failed if the first line is anything else, if the channel closes
#       before a line is seen, or if `timeout` elapses first.
#
# Three independent event sources (line, close, timer) can try to resolve
# the same future. A future that is already complete croaks when it is
# resolved again, so every completion below is guarded by is_ready; a late
# event must not throw from inside an event loop callback.
sub connect {
  my $self = shift;
  Dlog_debug { "Preparing to create connection with args of: $_" } @_;
  my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_);
  my $channel = use_module('Object::Remote::ReadChannel')->new(
    fh => $receive_from_fh
  );
  return future {
    log_trace { "Initializing connection for child pid '$child_pid'" };
    my $f = shift;

    # Handshake: only the first line matters. The callbacks close over
    # $channel, so each of them drops that reference once it has run.
    $channel->on_line_call(sub {
      my ($line) = @_;
      unless ($f->is_ready) {
        if ($line eq "Shere") {
          log_trace { "Received 'Shere' from child pid '$child_pid'; setting done handler to create connection" };
          $f->done(
            use_module('Object::Remote::Connection')->new(
              send_to_fh => $send_to_fh,
              read_channel => $channel,
              child_pid => $child_pid,
            )
          );
        } else {
          log_warn { "'Shere' was not found in connection data for child pid '$child_pid'" };
          $f->fail("Expected Shere from remote but received: $line");
        }
      }
      undef($channel);
    });

    # The remote went away before (or after) the handshake was decided.
    $channel->on_close_call(sub {
      log_trace { "Connection has been closed" };
      $f->fail("Channel closed without seeing Shere: $_[0]")
        unless $f->is_ready;
      undef($channel);
    });
    log_trace { "initialized events on channel for child pid '$child_pid'; creating timeout" };

    # Give up if neither of the handlers above has resolved the future
    # within the configured timeout.
    Object::Remote->current_loop->watch_time(
      after => $self->timeout,
      code  => sub {
        Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready;
        unless ($f->is_ready) {
          log_warn { "Connection with child pid '$child_pid' has timed out" };
          $f->fail("Connection timed out");
        }
        undef($channel);
      },
    );
    log_trace { "connection for child pid '$child_pid' has been initialized" };
    $f;
  };
}
61
62 1;