Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectionReceiver; |
2 | |
3 | use Tak::Request; |
4 | use Scalar::Util qw(weaken); |
9964f8e0 |
5 | use Log::Contextual qw(:log); |
77bf1d9b |
6 | use Moo; |
7 | |
8 | with 'Tak::Role::Service'; |
9 | |
10 | has requests => (is => 'ro', default => sub { {} }); |
11 | |
12 | has channel => (is => 'ro', required => 1); |
13 | |
14 | has service => (is => 'ro', required => 1); |
15 | |
986f5290 |
16 | has on_close => (is => 'ro', required => 1); |
17 | |
77bf1d9b |
18 | sub BUILD { |
19 | weaken(my $self = shift); |
20 | my $channel = $self->channel; |
21 | Tak->loop->watch_io( |
22 | handle => $channel->read_fh, |
23 | on_read_ready => sub { |
9964f8e0 |
24 | $channel->read_messages(sub { $self->receive(@_) }); |
77bf1d9b |
25 | } |
26 | ); |
27 | } |
28 | |
29 | sub DEMOLISH { |
30 | Tak->loop->unwatch_io( |
31 | handle => $_[0]->channel->read_fh, |
32 | on_read_ready => 1, |
33 | ); |
34 | } |
35 | |
36 | sub receive_request { |
37 | my ($self, $tag, $meta, @payload) = @_; |
38 | my $channel = $self->channel; |
b8d6fce4 |
39 | unless (ref($meta) eq 'HASH') { |
40 | $channel->write_message(mistake => $tag => 'meta value not a hashref'); |
41 | return; |
42 | } |
77bf1d9b |
43 | my $req = Tak::Request->new( |
44 | ($meta->{progress} |
45 | ? (on_progress => sub { $channel->write_message(progress => $tag => @_) }) |
46 | : ()), |
47 | on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) } |
48 | ); |
49 | $self->service->start_request($req => @payload); |
50 | } |
51 | |
52 | sub receive_progress { |
53 | my ($self, $tag, @payload) = @_; |
54 | $self->requests->{$tag}->progress(@payload); |
55 | } |
56 | |
57 | sub receive_result { |
58 | my ($self, $tag, @payload) = @_; |
986f5290 |
59 | (delete $self->requests->{$tag})->result(@payload); |
77bf1d9b |
60 | } |
61 | |
62 | sub receive_message { |
63 | my ($self, @payload) = @_; |
64 | $self->service->receive(@payload); |
65 | } |
66 | |
986f5290 |
67 | sub receive_close { |
68 | my ($self, @payload) = @_; |
69 | $self->on_close->(@payload); |
70 | } |
71 | |
77bf1d9b |
72 | 1; |