make connector time out correctly
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
1 package Tak::ConnectionReceiver;
2
3 use Tak::Request;
4 use Scalar::Util qw(weaken);
5 use Moo;
6
7 with 'Tak::Role::Service';
8
9 has requests => (is => 'ro', default => sub { {} });
10
11 has channel => (is => 'ro', required => 1);
12
13 has service => (is => 'ro', required => 1);
14
15 has on_close => (is => 'ro', required => 1);
16
17 sub BUILD {
18   weaken(my $self = shift);
19   my $channel = $self->channel;
20   Tak->loop->watch_io(
21     handle => $channel->read_fh,
22     on_read_ready => sub {
23       if (my $message = $channel->read_message) {
24         $self->receive(@$message);
25       }
26     }
27   );
28 }
29
30 sub DEMOLISH {
31   Tak->loop->unwatch_io(
32     handle => $_[0]->channel->read_fh,
33     on_read_ready => 1,
34   );
35 }
36
37 sub receive_request {
38   my ($self, $tag, $meta, @payload) = @_;
39   my $channel = $self->channel;
40   my $req = Tak::Request->new(
41     ($meta->{progress}
42         ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
43         : ()),
44     on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
45   );
46   $self->service->start_request($req => @payload);
47 }
48
49 sub receive_progress {
50   my ($self, $tag, @payload) = @_;
51   $self->requests->{$tag}->progress(@payload);
52 }
53
54 sub receive_result {
55   my ($self, $tag, @payload) = @_;
56   (delete $self->requests->{$tag})->result(@payload);
57 }
58
59 sub receive_message {
60   my ($self, @payload) = @_;
61   $self->service->receive(@payload);
62 }
63
64 sub receive_close {
65   my ($self, @payload) = @_;
66   $self->on_close->(@payload);
67 }
68
69 1;