use Tak::Request;
use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
use Moo;
with 'Tak::Role::Service';
has service => (is => 'ro', required => 1);
+has on_close => (is => 'ro', required => 1);
+
sub BUILD {
weaken(my $self = shift);
my $channel = $self->channel;
Tak->loop->watch_io(
handle => $channel->read_fh,
on_read_ready => sub {
- if (my $message = $channel->read_message) {
- $self->receive(@$message);
- }
+ $channel->read_messages(sub { $self->receive(@_) });
}
);
}
sub receive_result {
my ($self, $tag, @payload) = @_;
- $self->requests->{$tag}->result(@payload);
+ (delete $self->requests->{$tag})->result(@payload);
}
sub receive_message {
$self->service->receive(@payload);
}
+sub receive_close {
+ my ($self, @payload) = @_;
+ $self->on_close->(@payload);
+}
+
1;