new remote code
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
1 package Tak::ConnectionReceiver;
2
3 use Tak::Request;
4 use Scalar::Util qw(weaken);
5 use Moo;
6
7 with 'Tak::Role::Service';
8
9 has requests => (is => 'ro', default => sub { {} });
10
11 has channel => (is => 'ro', required => 1);
12
13 has service => (is => 'ro', required => 1);
14
15 sub BUILD {
16   weaken(my $self = shift);
17   my $channel = $self->channel;
18   Tak->loop->watch_io(
19     handle => $channel->read_fh,
20     on_read_ready => sub {
21       if (my $message = $channel->read_message) {
22         $self->receive(@$message);
23       }
24     }
25   );
26 }
27
28 sub DEMOLISH {
29   Tak->loop->unwatch_io(
30     handle => $_[0]->channel->read_fh,
31     on_read_ready => 1,
32   );
33 }
34
35 sub receive_request {
36   my ($self, $tag, $meta, @payload) = @_;
37   my $channel = $self->channel;
38   my $req = Tak::Request->new(
39     ($meta->{progress}
40         ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
41         : ()),
42     on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
43   );
44   $self->service->start_request($req => @payload);
45 }
46
47 sub receive_progress {
48   my ($self, $tag, @payload) = @_;
49   $self->requests->{$tag}->progress(@payload);
50 }
51
52 sub receive_result {
53   my ($self, $tag, @payload) = @_;
54   $self->requests->{$tag}->result(@payload);
55 }
56
57 sub receive_message {
58   my ($self, @payload) = @_;
59   $self->service->receive(@payload);
60 }
61
62 1;