trap param error instead of crashing
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
1 package Tak::ConnectionReceiver;
2
3 use Tak::Request;
4 use Scalar::Util qw(weaken);
5 use Log::Contextual qw(:log);
6 use Moo;
7
8 with 'Tak::Role::Service';
9
10 has requests => (is => 'ro', default => sub { {} });
11
12 has channel => (is => 'ro', required => 1);
13
14 has service => (is => 'ro', required => 1);
15
16 has on_close => (is => 'ro', required => 1);
17
18 sub BUILD {
19   weaken(my $self = shift);
20   my $channel = $self->channel;
21   Tak->loop->watch_io(
22     handle => $channel->read_fh,
23     on_read_ready => sub {
24       $channel->read_messages(sub { $self->receive(@_) });
25     }
26   );
27 }
28
29 sub DEMOLISH {
30   Tak->loop->unwatch_io(
31     handle => $_[0]->channel->read_fh,
32     on_read_ready => 1,
33   );
34 }
35
36 sub receive_request {
37   my ($self, $tag, $meta, @payload) = @_;
38   my $channel = $self->channel;
39   unless (ref($meta) eq 'HASH') {
40     $channel->write_message(mistake => $tag => 'meta value not a hashref');
41     return;
42   }
43   my $req = Tak::Request->new(
44     ($meta->{progress}
45         ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
46         : ()),
47     on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
48   );
49   $self->service->start_request($req => @payload);
50 }
51
52 sub receive_progress {
53   my ($self, $tag, @payload) = @_;
54   $self->requests->{$tag}->progress(@payload);
55 }
56
57 sub receive_result {
58   my ($self, $tag, @payload) = @_;
59   (delete $self->requests->{$tag})->result(@payload);
60 }
61
62 sub receive_message {
63   my ($self, @payload) = @_;
64   $self->service->receive(@payload);
65 }
66
67 sub receive_close {
68   my ($self, @payload) = @_;
69   $self->on_close->(@payload);
70 }
71
72 1;