From: Matt S Trout Date: Sat, 5 Nov 2011 22:22:50 +0000 (+0000) Subject: make connector time out correctly X-Git-Tag: v0.001001~27 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=986f5290d9338118f95f2293f705368fb142d270;p=scpubgit%2FTak.git make connector time out correctly --- diff --git a/bin/tak-repl b/bin/tak-repl index 2148f2d..2383219 100644 --- a/bin/tak-repl +++ b/bin/tak-repl @@ -2,15 +2,10 @@ use Tak::Client; use Tak::Router; -use Tak::MetaService; use Tak::REPL; use strictures 1; -my $router = Tak::Router->new; - -$router->register(meta => Tak::MetaService->new(router => $router)); - -my $client = Tak::Client->new(service => $router); +my $client = Tak::Client->new(service => Tak::Router->new); $client->curry('meta')->do(register => eval => 'Tak::EvalService'); diff --git a/lib/Tak/ConnectionReceiver.pm b/lib/Tak/ConnectionReceiver.pm index bb2ba15..fe3c33f 100644 --- a/lib/Tak/ConnectionReceiver.pm +++ b/lib/Tak/ConnectionReceiver.pm @@ -12,6 +12,8 @@ has channel => (is => 'ro', required => 1); has service => (is => 'ro', required => 1); +has on_close => (is => 'ro', required => 1); + sub BUILD { weaken(my $self = shift); my $channel = $self->channel; @@ -51,7 +53,7 @@ sub receive_progress { sub receive_result { my ($self, $tag, @payload) = @_; - $self->requests->{$tag}->result(@payload); + (delete $self->requests->{$tag})->result(@payload); } sub receive_message { @@ -59,4 +61,9 @@ sub receive_message { $self->service->receive(@payload); } +sub receive_close { + my ($self, @payload) = @_; + $self->on_close->(@payload); +} + 1; diff --git a/lib/Tak/ConnectionService.pm b/lib/Tak/ConnectionService.pm index 6390e80..f947851 100644 --- a/lib/Tak/ConnectionService.pm +++ b/lib/Tak/ConnectionService.pm @@ -15,7 +15,8 @@ sub BUILD { ); my $receiver = $self->_set_receiver( Tak::ConnectionReceiver->new( - channel => $channel, service => $args->{listening_service} + channel => $channel, service => $args->{listening_service}, + on_close => $args->{on_close}, ) ); } diff --git a/lib/Tak/ConnectorService.pm b/lib/Tak/ConnectorService.pm index 8a244a2..fa39815 100644 --- a/lib/Tak/ConnectorService.pm +++ b/lib/Tak/ConnectorService.pm @@ -15,7 +15,7 @@ sub handle_create { my ($self) = @_; my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-') or die "Couldn't open2 child: $!"; - io($kid_in)->print(io('maint/mk-fat |')->all, "__END__\n"); + $kid_in->print(io('maint/mk-fat |')->all, "__END__\n"); my $connection = Tak::ConnectionService->new( read_fh => $kid_out, write_fh => $kid_in, listening_service => Tak::Router->new diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm index c06054e..86a63e9 100644 --- a/lib/Tak/JSONChannel.pm +++ b/lib/Tak/JSONChannel.pm @@ -16,6 +16,8 @@ sub read_message { if (my $unpacked = $self->_unpack_line($line)) { return $unpacked; } + } else { + return [ 'close', 'channel' ]; } } diff --git a/lib/Tak/Loop.pm b/lib/Tak/Loop.pm index 888136c..464eb69 100644 --- a/lib/Tak/Loop.pm +++ b/lib/Tak/Loop.pm @@ -29,7 +29,8 @@ sub unwatch_io { sub loop_once { my ($self) = @_; my $read = $self->_read_watches; - my ($readable) = IO::Select->select($self->_read_select); + my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5); + die "FFFFFUUUUU: $!" unless $readable; foreach my $fh (@$readable) { $read->{$fh}(); } diff --git a/lib/Tak/STDIOSetup.pm b/lib/Tak/STDIOSetup.pm index fed4656..cb35399 100644 --- a/lib/Tak/STDIOSetup.pm +++ b/lib/Tak/STDIOSetup.pm @@ -9,11 +9,13 @@ sub run { open my $stdin, '<&', \*STDIN; open my $stdout, '>&', \*STDOUT; close STDIN; close STDOUT; + my $done; my $connection = Tak::ConnectionService->new( read_fh => $stdin, write_fh => $stdout, - listening_service => Tak::Router->new + listening_service => Tak::Router->new, + on_close => sub { $done = 1 } ); - Tak->loop->loop_forever; + Tak->loop_until($done); } 1;