X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FTak%2FConnectionReceiver.pm;h=9a06dd18401ddca397ff5308c9334c9a6c7af6c7;hb=b8d6fce44d99e86b2b116f00c3752ef340950402;hp=bb2ba151b9404ccee427cc400d28603bcbb68ad5;hpb=77bf1d9b5b6832894676ab549ee5664cb7200d33;p=scpubgit%2FTak.git diff --git a/lib/Tak/ConnectionReceiver.pm b/lib/Tak/ConnectionReceiver.pm index bb2ba15..9a06dd1 100644 --- a/lib/Tak/ConnectionReceiver.pm +++ b/lib/Tak/ConnectionReceiver.pm @@ -2,6 +2,7 @@ package Tak::ConnectionReceiver; use Tak::Request; use Scalar::Util qw(weaken); +use Log::Contextual qw(:log); use Moo; with 'Tak::Role::Service'; @@ -12,15 +13,15 @@ has channel => (is => 'ro', required => 1); has service => (is => 'ro', required => 1); +has on_close => (is => 'ro', required => 1); + sub BUILD { weaken(my $self = shift); my $channel = $self->channel; Tak->loop->watch_io( handle => $channel->read_fh, on_read_ready => sub { - if (my $message = $channel->read_message) { - $self->receive(@$message); - } + $channel->read_messages(sub { $self->receive(@_) }); } ); } @@ -35,6 +36,10 @@ sub DEMOLISH { sub receive_request { my ($self, $tag, $meta, @payload) = @_; my $channel = $self->channel; + unless (ref($meta) eq 'HASH') { + $channel->write_message(mistake => $tag => 'meta value not a hashref'); + return; + } my $req = Tak::Request->new( ($meta->{progress} ? (on_progress => sub { $channel->write_message(progress => $tag => @_) }) @@ -51,7 +56,7 @@ sub receive_progress { sub receive_result { my ($self, $tag, @payload) = @_; - $self->requests->{$tag}->result(@payload); + (delete $self->requests->{$tag})->result(@payload); } sub receive_message { @@ -59,4 +64,9 @@ sub receive_message { $self->service->receive(@payload); } +sub receive_close { + my ($self, @payload) = @_; + $self->on_close->(@payload); +} + 1;