protect against re-upgrading
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
index bb2ba15..d85b1b4 100644 (file)
@@ -2,6 +2,7 @@ package Tak::ConnectionReceiver;
 
 use Tak::Request;
 use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
 use Moo;
 
 with 'Tak::Role::Service';
@@ -12,15 +13,15 @@ has channel => (is => 'ro', required => 1);
 
 has service => (is => 'ro', required => 1);
 
+has on_close => (is => 'ro', required => 1);
+
 sub BUILD {
   weaken(my $self = shift);
   my $channel = $self->channel;
   Tak->loop->watch_io(
     handle => $channel->read_fh,
     on_read_ready => sub {
-      if (my $message = $channel->read_message) {
-        $self->receive(@$message);
-      }
+      $channel->read_messages(sub { $self->receive(@_) });
     }
   );
 }
@@ -51,7 +52,7 @@ sub receive_progress {
 
 sub receive_result {
   my ($self, $tag, @payload) = @_;
-  $self->requests->{$tag}->result(@payload);
+  (delete $self->requests->{$tag})->result(@payload);
 }
 
 sub receive_message {
@@ -59,4 +60,9 @@ sub receive_message {
   $self->service->receive(@payload);
 }
 
+sub receive_close {
+  my ($self, @payload) = @_;
+  $self->on_close->(@payload);
+}
+
 1;