trap param error instead of crashing
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
index bb2ba15..9a06dd1 100644 (file)
@@ -2,6 +2,7 @@ package Tak::ConnectionReceiver;
 
 use Tak::Request;
 use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
 use Moo;
 
 with 'Tak::Role::Service';
@@ -12,15 +13,15 @@ has channel => (is => 'ro', required => 1);
 
 has service => (is => 'ro', required => 1);
 
+has on_close => (is => 'ro', required => 1);
+
 sub BUILD {
   weaken(my $self = shift);
   my $channel = $self->channel;
   Tak->loop->watch_io(
     handle => $channel->read_fh,
     on_read_ready => sub {
-      if (my $message = $channel->read_message) {
-        $self->receive(@$message);
-      }
+      $channel->read_messages(sub { $self->receive(@_) });
     }
   );
 }
@@ -35,6 +36,10 @@ sub DEMOLISH {
 sub receive_request {
   my ($self, $tag, $meta, @payload) = @_;
   my $channel = $self->channel;
+  unless (ref($meta) eq 'HASH') {
+    $channel->write_message(mistake => $tag => 'meta value not a hashref');
+    return;
+  }
   my $req = Tak::Request->new(
     ($meta->{progress}
         ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
@@ -51,7 +56,7 @@ sub receive_progress {
 
 sub receive_result {
   my ($self, $tag, @payload) = @_;
-  $self->requests->{$tag}->result(@payload);
+  (delete $self->requests->{$tag})->result(@payload);
 }
 
 sub receive_message {
@@ -59,4 +64,9 @@ sub receive_message {
   $self->service->receive(@payload);
 }
 
+sub receive_close {
+  my ($self, @payload) = @_;
+  $self->on_close->(@payload);
+}
+
 1;