Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectionReceiver; |
2 | |
3 | use Tak::Request; |
4 | use Scalar::Util qw(weaken); |
5 | use Moo; |
6 | |
7 | with 'Tak::Role::Service'; |
8 | |
# Table of request objects indexed by tag. receive_progress and
# receive_result look entries up here by the tag carried in the incoming
# message; every instance starts with its own empty hashref.
has requests => (
  is      => 'ro',
  default => sub { {} },
);

# Message channel for this connection. The code in this class calls
# read_fh, read_message and write_message on it.
has channel => (
  is       => 'ro',
  required => 1,
);

# Object that does the actual work: it is given incoming requests via
# start_request and plain messages via receive.
has service => (
  is       => 'ro',
  required => 1,
);

# Code reference invoked by receive_close with the payload of the close
# message.
has on_close => (
  is       => 'ro',
  required => 1,
);
16 | |
77bf1d9b |
# Moo post-construction hook. Registers a read-ready watcher for the
# channel's read handle with Tak's loop; each time the watcher fires, one
# message is read from the channel and, if one was available, its elements
# are passed to $self->receive (presumably supplied by Tak::Role::Service,
# which is not visible in this file).
sub BUILD {
  my ($self) = @_;

  # The callback below closes over $self. Keep only a weak reference so the
  # watcher registration does not keep this object alive.
  weaken($self);

  my $channel = $self->channel;

  my $on_readable = sub {
    my $message = $channel->read_message;
    $self->receive(@$message) if $message;
  };

  Tak->loop->watch_io(
    handle        => $channel->read_fh,
    on_read_ready => $on_readable,
  );
}
29 | |
# Moo destruction hook. Removes the read-ready watcher that BUILD
# registered for the channel's read handle.
sub DEMOLISH {
  my ($self) = @_;
  my $loop = Tak->loop;
  return $loop->unwatch_io(
    handle        => $self->channel->read_fh,
    on_read_ready => 1,
  );
}
36 | |
# Handles an incoming request message.
#
#   $tag     - identifier echoed back in every reply for this request
#   $meta    - hashref of request options; only the 'progress' flag is read
#   @payload - passed through untouched to the service
#
# Builds a Tak::Request whose callbacks write replies to the channel and
# hands it, together with the payload, to the service's start_request.
sub receive_request {
  my ($self, $tag, $meta, @payload) = @_;
  my $channel = $self->channel;

  my @callbacks;

  # Progress reports are only forwarded when the request asked for them.
  if ($meta->{progress}) {
    push @callbacks, on_progress => sub {
      $channel->write_message(progress => $tag => @_);
    };
  }

  # The result is always sent back, flattened into the message.
  push @callbacks, on_result => sub {
    my ($result) = @_;
    $channel->write_message(result => $tag => $result->flatten);
  };

  my $req = Tak::Request->new(@callbacks);
  $self->service->start_request($req => @payload);
}
48 | |
# Handles an incoming progress message by forwarding @payload to the
# progress method of the request stored under $tag in the requests table.
#
# Dies with a descriptive message when no request is stored under $tag.
# Previously this case died with the opaque "Can't call method "progress"
# on an undefined value", which gave no hint of which tag was at fault.
sub receive_progress {
  my ($self, $tag, @payload) = @_;
  my $request = $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? "'$tag'" : 'undef';
    die "Received progress for unknown request tag $shown";
  }
  $request->progress(@payload);
}
53 | |
# Handles an incoming result message. The request stored under $tag is
# removed from the requests table and @payload is passed to its result
# method.
#
# Dies with a descriptive message when no request is stored under $tag
# (for example a tag that was never issued, or a second result for the same
# tag). Previously this case died with the opaque "Can't call method
# "result" on an undefined value".
sub receive_result {
  my ($self, $tag, @payload) = @_;
  my $request = delete $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? "'$tag'" : 'undef';
    die "Received result for unknown request tag $shown";
  }
  $request->result(@payload);
}
58 | |
# Handles an incoming plain message by passing its payload unchanged to the
# service's receive method.
sub receive_message {
  my $self    = shift;
  my @payload = @_;
  my $service = $self->service;
  return $service->receive(@payload);
}
63 | |
986f5290 |
# Handles an incoming close message by invoking the on_close callback with
# the message payload.
sub receive_close {
  my $self     = shift;
  my @payload  = @_;
  my $on_close = $self->on_close;
  return $on_close->(@payload);
}
68 | |
77bf1d9b |
69 | 1; |