X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FObject%2FRemote%2FRole%2FConnector.pm;h=4f008d731f89bec0af75fe00f72ae1e02e56a955;hb=f8080c1c188fa6c4589ffcad8793e0cf7a8d4bdb;hp=9c7ac7b26256909eaf00187a073ab588ce72dc42;hpb=47c83a1379a33fc8baa4a128edc1d75d780776b0;p=scpubgit%2FObject-Remote.git diff --git a/lib/Object/Remote/Role/Connector.pm b/lib/Object/Remote/Role/Connector.pm index 9c7ac7b..4f008d7 100644 --- a/lib/Object/Remote/Role/Connector.pm +++ b/lib/Object/Remote/Role/Connector.pm @@ -1,19 +1,67 @@ package Object::Remote::Role::Connector; -use Object::Remote::Connection; +use Module::Runtime qw(use_module); +use Object::Remote::Future; +use Object::Remote::Logging qw(:log :dlog ); use Moo::Role; requires '_open2_for'; +#TODO return to 10 seconds after debugging +#has timeout => (is => 'ro', default => sub { { after => 10 } }); +has timeout => (is => 'ro', default => sub { { after => 10 } }); + sub connect { my $self = shift; - my %args; - @args{qw(send_to_fh receive_from_fh child_pid)} = $self->_open2_for(@_); - my $line = readline($args{receive_from_fh}); - unless ($line eq "Shere\n") { - die "New remote container did not send Shere - got ${line}"; + Dlog_debug { "Perparing to create connection with args of: $_" } @_; + my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_); + my $channel = use_module('Object::Remote::ReadChannel')->new( + fh => $receive_from_fh + ); + return future { + log_trace { "Initializing connection for child pid '$child_pid'" }; + my $f = shift; + $channel->on_line_call(sub { + if ($_[0] eq "Shere") { + log_trace { "Received 'Shere' from child pid '$child_pid'; setting done handler to create connection" }; + $f->done( + use_module('Object::Remote::Connection')->new( + send_to_fh => $send_to_fh, + read_channel => $channel, + child_pid => $child_pid, + ) + ); + } else { + log_warn { "'Shere' was not found in connection data for child pid '$child_pid'" }; + $f->fail("Expected Shere from remote but received: $_[0]"); + } + undef($channel); + }); + $channel->on_close_call(sub { + log_trace { "Connection has been closed" }; + $f->fail("Channel closed without seeing Shere: $_[0]"); + undef($channel); + }); + log_trace { "initialized events on channel for child pid '$child_pid'; creating timeout" }; + Object::Remote->current_loop + ->watch_time( + %{$self->timeout}, + code => sub { +# log_warn { "Connection timed out for child pid '$child_pid'" }; +# $f->fail("Connection timed out") unless $f->is_ready; +# undef($channel); + Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready; + unless($f->is_ready) { + log_warn { "Connection with child pid '$child_pid' has timed out" }; + $f->fail("Connection timed out") unless $f->is_ready; + } + undef($channel); + + } + ); + log_trace { "connection for child pid '$child_pid' has been initialized" }; + $f; } - return Object::Remote::Connection->new(\%args); } 1;