make connector time out correctly
Matt S Trout [Sat, 5 Nov 2011 22:22:50 +0000 (22:22 +0000)]
bin/tak-repl
lib/Tak/ConnectionReceiver.pm
lib/Tak/ConnectionService.pm
lib/Tak/ConnectorService.pm
lib/Tak/JSONChannel.pm
lib/Tak/Loop.pm
lib/Tak/STDIOSetup.pm

index 2148f2d..2383219 100644 (file)
@@ -2,15 +2,10 @@
 
 use Tak::Client;
 use Tak::Router;
-use Tak::MetaService;
 use Tak::REPL;
 use strictures 1;
 
-my $router = Tak::Router->new;
-
-$router->register(meta => Tak::MetaService->new(router => $router));
-
-my $client = Tak::Client->new(service => $router);
+my $client = Tak::Client->new(service => Tak::Router->new);
 
 $client->curry('meta')->do(register => eval => 'Tak::EvalService');
 
index bb2ba15..fe3c33f 100644 (file)
@@ -12,6 +12,8 @@ has channel => (is => 'ro', required => 1);
 
 has service => (is => 'ro', required => 1);
 
+has on_close => (is => 'ro', required => 1);
+
 sub BUILD {
   weaken(my $self = shift);
   my $channel = $self->channel;
@@ -51,7 +53,7 @@ sub receive_progress {
 
 sub receive_result {
   my ($self, $tag, @payload) = @_;
-  $self->requests->{$tag}->result(@payload);
+  (delete $self->requests->{$tag})->result(@payload);
 }
 
 sub receive_message {
@@ -59,4 +61,9 @@ sub receive_message {
   $self->service->receive(@payload);
 }
 
+sub receive_close {
+  my ($self, @payload) = @_;
+  $self->on_close->(@payload);
+}
+
 1;
index 6390e80..f947851 100644 (file)
@@ -15,7 +15,8 @@ sub BUILD {
   );
   my $receiver = $self->_set_receiver(
     Tak::ConnectionReceiver->new(
-      channel => $channel, service => $args->{listening_service}
+      channel => $channel, service => $args->{listening_service},
+      on_close => $args->{on_close},
     )
   );
 }
index 8a244a2..fa39815 100644 (file)
@@ -15,7 +15,7 @@ sub handle_create {
   my ($self) = @_;
   my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-')
     or die "Couldn't open2 child: $!";
-  io($kid_in)->print(io('maint/mk-fat |')->all, "__END__\n");
+  $kid_in->print(io('maint/mk-fat |')->all, "__END__\n");
   my $connection = Tak::ConnectionService->new(
     read_fh => $kid_out, write_fh => $kid_in,
     listening_service => Tak::Router->new
index c06054e..86a63e9 100644 (file)
@@ -16,6 +16,8 @@ sub read_message {
     if (my $unpacked = $self->_unpack_line($line)) {
       return $unpacked;
     }
+  } else {
+    return [ 'close', 'channel' ];
   }
 }
 
index 888136c..464eb69 100644 (file)
@@ -29,7 +29,8 @@ sub unwatch_io {
 sub loop_once {
   my ($self) = @_;
   my $read = $self->_read_watches;
-  my ($readable) = IO::Select->select($self->_read_select);
+  my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
+  die "FFFFFUUUUU: $!" unless $readable;
   foreach my $fh (@$readable) {
     $read->{$fh}();
   }
index fed4656..cb35399 100644 (file)
@@ -9,11 +9,13 @@ sub run {
   open my $stdin, '<&', \*STDIN;
   open my $stdout, '>&', \*STDOUT;
   close STDIN; close STDOUT;
+  my $done;
   my $connection = Tak::ConnectionService->new(
     read_fh => $stdin, write_fh => $stdout,
-    listening_service => Tak::Router->new
+    listening_service => Tak::Router->new,
+    on_close => sub { $done = 1 }
   );
-  Tak->loop->loop_forever;
+  Tak->loop_until($done);
 }
 
 1;