trap param error instead of crashing
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
CommitLineData
77bf1d9b 1package Tak::ConnectionReceiver;
2
3use Tak::Request;
4use Scalar::Util qw(weaken);
9964f8e0 5use Log::Contextual qw(:log);
77bf1d9b 6use Moo;
7
8with 'Tak::Role::Service';
9
10has requests => (is => 'ro', default => sub { {} });
11
12has channel => (is => 'ro', required => 1);
13
14has service => (is => 'ro', required => 1);
15
986f5290 16has on_close => (is => 'ro', required => 1);
17
77bf1d9b 18sub BUILD {
19 weaken(my $self = shift);
20 my $channel = $self->channel;
21 Tak->loop->watch_io(
22 handle => $channel->read_fh,
23 on_read_ready => sub {
9964f8e0 24 $channel->read_messages(sub { $self->receive(@_) });
77bf1d9b 25 }
26 );
27}
28
29sub DEMOLISH {
30 Tak->loop->unwatch_io(
31 handle => $_[0]->channel->read_fh,
32 on_read_ready => 1,
33 );
34}
35
36sub receive_request {
37 my ($self, $tag, $meta, @payload) = @_;
38 my $channel = $self->channel;
b8d6fce4 39 unless (ref($meta) eq 'HASH') {
40 $channel->write_message(mistake => $tag => 'meta value not a hashref');
41 return;
42 }
77bf1d9b 43 my $req = Tak::Request->new(
44 ($meta->{progress}
45 ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
46 : ()),
47 on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
48 );
49 $self->service->start_request($req => @payload);
50}
51
52sub receive_progress {
53 my ($self, $tag, @payload) = @_;
54 $self->requests->{$tag}->progress(@payload);
55}
56
57sub receive_result {
58 my ($self, $tag, @payload) = @_;
986f5290 59 (delete $self->requests->{$tag})->result(@payload);
77bf1d9b 60}
61
62sub receive_message {
63 my ($self, @payload) = @_;
64 $self->service->receive(@payload);
65}
66
986f5290 67sub receive_close {
68 my ($self, @payload) = @_;
69 $self->on_close->(@payload);
70}
71
77bf1d9b 721;