1 package Tak::ConnectionReceiver;
4 use Scalar::Util qw(weaken);
5 use Log::Contextual qw(:log);
# Compose the Tak::Role::Service role into this class.
8 with 'Tak::Role::Service';
# requests: read-only attribute; defaults to a fresh empty hashref per
# object. Elsewhere in this file it is indexed by request tag, and the
# stored values have progress/result methods called on them.
10 has requests => (is => 'ro', default => sub { {} });
# channel: required, read-only. This file calls read_fh, read_messages and
# write_message on it.
12 has channel => (is => 'ro', required => 1);
# service: required, read-only. This file calls start_request and receive
# on it.
14 has service => (is => 'ro', required => 1);
# on_close: required, read-only. Invoked as a code reference with the
# payload of a close notification.
16 has on_close => (is => 'ro', required => 1);
# NOTE(review): the enclosing sub header and the call that receives the
# handle/on_read_ready arguments below are not visible in this excerpt.
#
# Take the invocant off @_ and weaken the local reference to it; the
# on_read_ready closure below captures $self (presumably this avoids the
# callback keeping the object alive -- confirm against the full file).
19 weaken(my $self = shift);
20 my $channel = $self->channel;
# Named arguments: the channel's read file handle, and a callback to run
# when it becomes readable.
22 handle => $channel->read_fh,
23 on_read_ready => sub {
# Ask the channel to read messages, giving it a callback that forwards its
# arguments to $self->receive.
24 $channel->read_messages(sub { $self->receive(@_) });
# Ask the Tak loop to stop watching the channel's read file handle.
# $_[0] is the first argument of the enclosing sub (presumably the object
# itself; the sub header and the rest of this call are not visible here).
30 Tak->loop->unwatch_io(
31 handle => $_[0]->channel->read_fh,
# NOTE(review): the enclosing sub header and several interior lines (the
# end of the unless block, parts of the Tak::Request->new argument list)
# are not visible in this excerpt.
#
# Arguments: the request tag, a meta value, and the request payload.
37 my ($self, $tag, $meta, @payload) = @_;
38 my $channel = $self->channel;
# The meta value must be a plain hash reference; otherwise report a
# 'mistake' for this tag back over the channel. ref() returns the class
# name for blessed references, so a blessed hash is also rejected here.
39 unless (ref($meta) eq 'HASH') {
40 $channel->write_message(mistake => $tag => 'meta value not a hashref');
# Build the request object with callbacks that write back over the channel.
43 my $req = Tak::Request->new(
# Conditional on_progress callback (the condition itself is not visible):
# writes a 'progress' message for this tag carrying the callback arguments.
45 ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
# on_result callback: writes a 'result' message for this tag carrying the
# flattened form of the callback's first argument.
47 on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
# Hand the request and the payload to the service.
49 $self->service->start_request($req => @payload);
# Forward a progress notification to the pending request stored under $tag.
#
# Arguments:
#   $tag     - key into the requests hash identifying the pending request
#   @payload - values passed through unchanged to the request's progress
#              method
#
# Returns whatever the request's progress method returns.
#
# Dies with a descriptive message when $tag is undefined or no request is
# stored under it, instead of failing with Perl's generic "Can't call
# method ... on an undefined value" error.
sub receive_progress {
  my ($self, $tag, @payload) = @_;

  # Look the request up without touching the hash when the tag is undef,
  # which would otherwise emit an "uninitialized value" warning.
  my $request = defined $tag ? $self->requests->{$tag} : undef;

  unless ($request) {
    my $shown = defined $tag ? "'$tag'" : 'undef';
    die "progress received for unknown request tag $shown";
  }

  return $request->progress(@payload);
}
# NOTE(review): the enclosing sub header is not visible in this excerpt.
#
# Arguments: the request tag and the result payload.
58 my ($self, $tag, @payload) = @_;
# Remove the request stored under $tag from the requests hash and call its
# result method with the payload. If nothing is stored under $tag, delete
# returns undef and the method call dies.
59 (delete $self->requests->{$tag})->result(@payload);
# NOTE(review): the enclosing sub header is not visible in this excerpt.
#
# Pass the payload through unchanged to the service's receive method.
63 my ($self, @payload) = @_;
64 $self->service->receive(@payload);
# NOTE(review): the enclosing sub header is not visible in this excerpt.
#
# Invoke the on_close code reference with the payload.
68 my ($self, @payload) = @_;
69 $self->on_close->(@payload);