add 5.8 requirements
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
CommitLineData
77bf1d9b 1package Tak::ConnectionReceiver;
2
3use Tak::Request;
4use Scalar::Util qw(weaken);
5use Moo;
6
7with 'Tak::Role::Service';
8
# Hash ref keyed by tag. The values stored here are expected to respond to
# ->progress and ->result (see receive_progress / receive_result). Nothing in
# this file adds entries to it.
has requests => (
  is      => 'ro',
  default => sub { {} },
);

# Message channel object. This class calls ->read_fh, ->read_message and
# ->write_message on it.
has channel => (
  is       => 'ro',
  required => 1,
);

# Service object that incoming requests and messages are forwarded to via
# ->start_request and ->receive.
has service => (
  is       => 'ro',
  required => 1,
);

# Code ref invoked with the payload of an incoming close.
has on_close => (
  is       => 'ro',
  required => 1,
);
16
# Moo construction hook. Registers a read watcher for the channel's read
# filehandle with the Tak loop. Whenever the handle becomes readable, one
# message is read from the channel and, if a true value came back, its
# elements are passed to ->receive.
sub BUILD {
  my ($receiver) = @_;

  # The loop keeps the callback below; hold the receiver weakly inside it so
  # the callback does not keep this object alive.
  weaken($receiver);

  my $channel = $receiver->channel;

  my $dispatch_incoming = sub {
    my $message = $channel->read_message;
    $receiver->receive(@$message) if $message;
  };

  Tak->loop->watch_io(
    handle        => $channel->read_fh,
    on_read_ready => $dispatch_incoming,
  );
}
29
# Moo destruction hook. Removes the read watcher that BUILD registered for the
# channel's read filehandle.
#
# Arguments:
#   $self                  - the object being destroyed
#   $in_global_destruction - true when called while the interpreter is
#                            shutting down (passed by Moo)
#
# During global destruction objects are freed in no particular order, so the
# channel or the loop may already be gone; touching them could die inside the
# destructor. Nothing needs unregistering at that point, so return early.
sub DEMOLISH {
  my ($self, $in_global_destruction) = @_;
  return if $in_global_destruction;

  Tak->loop->unwatch_io(
    handle        => $self->channel->read_fh,
    on_read_ready => 1,
  );
}
36
# Handles an incoming request: builds a Tak::Request whose callbacks write
# back over the channel, then hands it to the service together with the
# payload.
#
# Arguments:
#   $tag     - tag echoed back in every progress/result message written
#   $meta    - hash ref; a true 'progress' entry enables progress forwarding
#   @payload - passed through to the service's start_request
#
# Returns whatever the service's start_request returns.
sub receive_request {
  my ($self, $tag, $meta, @payload) = @_;
  my $channel = $self->channel;

  my @callbacks;

  # Progress is only forwarded when the request metadata asks for it.
  if ($meta->{progress}) {
    push @callbacks, on_progress => sub {
      $channel->write_message(progress => $tag => @_);
    };
  }

  push @callbacks, on_result => sub {
    my ($result) = @_;
    $channel->write_message(result => $tag => $result->flatten);
  };

  my $request = Tak::Request->new(@callbacks);
  return $self->service->start_request($request => @payload);
}
48
# Handles an incoming progress message by forwarding the payload to the
# ->progress method of the request stored under $tag in ->requests.
#
# Arguments:
#   $tag     - key of the request in the ->requests hash
#   @payload - passed through to the request's ->progress
#
# Returns whatever the request's ->progress returns.
# Dies with a message naming the tag when no request is stored under it
# (instead of an opaque "Can't call method on an undefined value").
sub receive_progress {
  my ($self, $tag, @payload) = @_;

  my $request = $self->requests->{$tag};
  if (!defined $request) {
    my $shown = defined $tag ? "'$tag'" : '(undef)';
    die "Received progress for unknown request tag $shown";
  }

  return $request->progress(@payload);
}
53
# Handles an incoming result message: removes the request stored under $tag
# from ->requests and forwards the payload to its ->result method.
#
# Arguments:
#   $tag     - key of the request in the ->requests hash
#   @payload - passed through to the request's ->result
#
# Returns whatever the request's ->result returns.
# Dies with a message naming the tag when no request is stored under it
# (instead of an opaque "Can't call method on an undefined value").
sub receive_result {
  my ($self, $tag, @payload) = @_;

  # Remove the entry before calling into the request, so it is gone from
  # ->requests by the time ->result runs.
  my $request = delete $self->requests->{$tag};
  if (!defined $request) {
    my $shown = defined $tag ? "'$tag'" : '(undef)';
    die "Received result for unknown request tag $shown";
  }

  return $request->result(@payload);
}
58
# Handles an incoming plain message by passing its payload straight to the
# service's ->receive. Returns whatever that call returns.
sub receive_message {
  my ($self, @message) = @_;
  my $service = $self->service;
  return $service->receive(@message);
}
63
# Handles an incoming close by invoking the on_close callback with the
# payload. Returns whatever the callback returns.
sub receive_close {
  my ($self, @details) = @_;
  my $callback = $self->on_close;
  return $callback->(@details);
}
68
77bf1d9b 691;