Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectionReceiver; |
2 | |
3 | use Tak::Request; |
4 | use Scalar::Util qw(weaken); |
5 | use Moo; |
6 | |
7 | with 'Tak::Role::Service'; |
8 | |
9 | has requests => (is => 'ro', default => sub { {} }); |
10 | |
11 | has channel => (is => 'ro', required => 1); |
12 | |
13 | has service => (is => 'ro', required => 1); |
14 | |
15 | sub BUILD { |
16 | weaken(my $self = shift); |
17 | my $channel = $self->channel; |
18 | Tak->loop->watch_io( |
19 | handle => $channel->read_fh, |
20 | on_read_ready => sub { |
21 | if (my $message = $channel->read_message) { |
22 | $self->receive(@$message); |
23 | } |
24 | } |
25 | ); |
26 | } |
27 | |
28 | sub DEMOLISH { |
29 | Tak->loop->unwatch_io( |
30 | handle => $_[0]->channel->read_fh, |
31 | on_read_ready => 1, |
32 | ); |
33 | } |
34 | |
35 | sub receive_request { |
36 | my ($self, $tag, $meta, @payload) = @_; |
37 | my $channel = $self->channel; |
38 | my $req = Tak::Request->new( |
39 | ($meta->{progress} |
40 | ? (on_progress => sub { $channel->write_message(progress => $tag => @_) }) |
41 | : ()), |
42 | on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) } |
43 | ); |
44 | $self->service->start_request($req => @payload); |
45 | } |
46 | |
47 | sub receive_progress { |
48 | my ($self, $tag, @payload) = @_; |
49 | $self->requests->{$tag}->progress(@payload); |
50 | } |
51 | |
52 | sub receive_result { |
53 | my ($self, $tag, @payload) = @_; |
54 | $self->requests->{$tag}->result(@payload); |
55 | } |
56 | |
57 | sub receive_message { |
58 | my ($self, @payload) = @_; |
59 | $self->service->receive(@payload); |
60 | } |
61 | |
62 | 1; |