Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectionReceiver; |
2 | |
3 | use Tak::Request; |
4 | use Scalar::Util qw(weaken); |
9964f8e0 |
5 | use Log::Contextual qw(:log); |
77bf1d9b |
6 | use Moo; |
7 | |
8 | with 'Tak::Role::Service'; |
9 | |
# Hash reference keyed by request tag. receive_progress and receive_result
# look up the object stored under a tag and call ->progress / ->result on it.
# Nothing in this class inserts entries - presumably other code sharing the
# hash does (TODO confirm).
has requests => (
  is      => 'ro',
  default => sub { return {} },
);

# Channel object. This class calls read_fh, read_messages and write_message
# on it.
has channel => (
  is       => 'ro',
  required => 1,
);

# Object that incoming requests (start_request) and plain messages (receive)
# are handed to.
has service => (
  is       => 'ro',
  required => 1,
);

# Code reference invoked with the payload of an incoming close message.
has on_close => (
  is       => 'ro',
  required => 1,
);
17 | |
77bf1d9b |
# Construction hook: registers an on_read_ready watcher for the channel's
# read filehandle with Tak->loop. When the watcher fires, the channel is asked
# to read messages, with a callback that forwards its arguments to
# $self->receive.
sub BUILD {
  my ($self) = @_;

  # The callbacks below close over $self; keep that reference weak so the
  # registered watcher does not hold a strong reference to this object.
  weaken($self);

  my $channel = $self->channel;

  my $on_ready = sub {
    my $forward = sub { $self->receive(@_) };
    $channel->read_messages($forward);
  };

  Tak->loop->watch_io(
    handle        => $channel->read_fh,
    on_read_ready => $on_ready,
  );
}
28 | |
# Destruction hook: asks Tak->loop to stop watching the channel's read
# filehandle. The on_read_ready flag presumably selects the read watcher
# installed by BUILD (verify against the loop's unwatch_io).
sub DEMOLISH {
  my ($self) = @_;
  my $loop = Tak->loop;
  $loop->unwatch_io(
    handle        => $self->channel->read_fh,
    on_read_ready => 1,
  );
}
35 | |
# Handles an incoming request: builds a Tak::Request whose callbacks write
# replies back over the channel, then passes it and the payload to the
# service's start_request.
#
#   $tag     - tag echoed back in every reply written for this request
#   $meta    - hash reference; a true 'progress' entry enables the
#              on_progress callback
#   @payload - passed through to start_request unchanged
sub receive_request {
  my ($self, $tag, $meta, @payload) = @_;
  my $channel = $self->channel;

  my @callbacks;

  # Progress replies are only wired up when the request metadata asks for
  # them.
  if ($meta->{progress}) {
    push @callbacks, on_progress => sub {
      $channel->write_message(progress => $tag => @_);
    };
  }

  push @callbacks, on_result => sub {
    my ($result) = @_;
    $channel->write_message(result => $tag => $result->flatten);
  };

  my $req = Tak::Request->new(@callbacks);
  return $self->service->start_request($req => @payload);
}
47 | |
# Handles an incoming progress message: calls ->progress(@payload) on the
# request object stored under $tag in the requests table. The entry is left
# in place.
#
# Dies with a message naming the tag when no request is stored under it,
# rather than failing with "Can't call method ... on an undefined value".
sub receive_progress {
  my ($self, $tag, @payload) = @_;

  my $request = $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? "'$tag'" : '(undef)';
    die "Received progress for unknown request tag $shown";
  }

  return $request->progress(@payload);
}
52 | |
# Handles an incoming result message: removes the request object stored under
# $tag from the requests table and calls ->result(@payload) on it.
#
# Dies with a message naming the tag when no request is stored under it,
# rather than failing with "Can't call method ... on an undefined value".
sub receive_result {
  my ($self, $tag, @payload) = @_;

  # Remove the entry before dispatching, so the table no longer holds the
  # request once its result has been delivered.
  my $request = delete $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? "'$tag'" : '(undef)';
    die "Received result for unknown request tag $shown";
  }

  return $request->result(@payload);
}
57 | |
# Handles an incoming plain message by passing the payload, unchanged, to the
# service's receive method.
sub receive_message {
  my $self    = shift;
  my $service = $self->service;
  return $service->receive(@_);
}
62 | |
986f5290 |
# Handles an incoming close message by invoking the on_close code reference
# with the payload.
sub receive_close {
  my $self     = shift;
  my $on_close = $self->on_close;
  return $on_close->(@_);
}
67 | |
77bf1d9b |
68 | 1; |