new remote code
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
CommitLineData
77bf1d9b 1package Tak::ConnectionReceiver;
2
3use Tak::Request;
4use Scalar::Util qw(weaken);
5use Moo;
6
7with 'Tak::Role::Service';
8
9has requests => (is => 'ro', default => sub { {} });
10
11has channel => (is => 'ro', required => 1);
12
13has service => (is => 'ro', required => 1);
14
15sub BUILD {
16 weaken(my $self = shift);
17 my $channel = $self->channel;
18 Tak->loop->watch_io(
19 handle => $channel->read_fh,
20 on_read_ready => sub {
21 if (my $message = $channel->read_message) {
22 $self->receive(@$message);
23 }
24 }
25 );
26}
27
28sub DEMOLISH {
29 Tak->loop->unwatch_io(
30 handle => $_[0]->channel->read_fh,
31 on_read_ready => 1,
32 );
33}
34
35sub receive_request {
36 my ($self, $tag, $meta, @payload) = @_;
37 my $channel = $self->channel;
38 my $req = Tak::Request->new(
39 ($meta->{progress}
40 ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
41 : ()),
42 on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
43 );
44 $self->service->start_request($req => @payload);
45}
46
47sub receive_progress {
48 my ($self, $tag, @payload) = @_;
49 $self->requests->{$tag}->progress(@payload);
50}
51
52sub receive_result {
53 my ($self, $tag, @payload) = @_;
54 $self->requests->{$tag}->result(@payload);
55}
56
57sub receive_message {
58 my ($self, @payload) = @_;
59 $self->service->receive(@payload);
60}
61
621;