stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / ConnectionReceiver.pm
CommitLineData
77bf1d9b 1package Tak::ConnectionReceiver;
2
3use Tak::Request;
4use Scalar::Util qw(weaken);
9964f8e0 5use Log::Contextual qw(:log);
77bf1d9b 6use Moo;
7
8with 'Tak::Role::Service';
9
# In-flight requests keyed by tag; entries are looked up by receive_progress
# and removed by receive_result. Defaults to an empty hash.
10has requests => (is => 'ro', default => sub { {} });
11
# Message channel: supplies read_fh/read_messages for input and
# write_message for replies.
12has channel => (is => 'ro', required => 1);
13
# Local service that handles incoming requests (start_request) and plain
# messages (receive).
14has service => (is => 'ro', required => 1);
15
# Code reference invoked with the payload of a received close message.
986f5290 16has on_close => (is => 'ro', required => 1);
17
# Post-construction hook: registers a read watcher for the channel's read
# handle with the Tak event loop. Whenever the handle becomes readable, the
# channel is asked to read messages and each one is passed to ->receive.
sub BUILD {
  my $self = shift;
  # Only a weak reference is held so the callbacks registered below do not
  # themselves keep this object alive.
  weaken($self);

  my $channel  = $self->channel;
  my $dispatch = sub { $self->receive(@_) };

  Tak->loop->watch_io(
    handle        => $channel->read_fh,
    on_read_ready => sub { $channel->read_messages($dispatch) },
  );
}
28
# Destruction hook: removes the read-readiness watcher for the channel's
# read handle from the Tak event loop.
sub DEMOLISH {
  my ($self) = @_;
  my $read_fh = $self->channel->read_fh;
  Tak->loop->unwatch_io(
    handle        => $read_fh,
    on_read_ready => 1,
  );
}
35
# Handles an incoming request message.
#
#   $tag     - identifier echoed back in every reply for this request
#   $meta    - hashref of request options; a true 'progress' entry enables
#              progress replies
#   @payload - passed through to the service's start_request
#
# Builds a Tak::Request whose callbacks write 'progress' / 'result' messages
# for $tag back over the channel, then hands it to the service.
sub receive_request {
  my ($self, $tag, $meta, @payload) = @_;
  my $channel = $self->channel;

  my @handlers;
  if ($meta->{progress}) {
    # Progress replies are only wired up when the requester asked for them.
    push @handlers, on_progress => sub {
      $channel->write_message(progress => $tag => @_);
    };
  }
  push @handlers, on_result => sub {
    $channel->write_message(result => $tag => $_[0]->flatten);
  };

  my $req = Tak::Request->new(@handlers);
  $self->service->start_request($req => @payload);
}
47
# Handles an incoming progress message by forwarding @payload to the
# in-flight request registered under $tag. The request stays registered.
#
# Dies with a descriptive message when no request is registered for $tag,
# rather than failing with an opaque "Can't call method on an undefined
# value" error.
sub receive_progress {
  my ($self, $tag, @payload) = @_;
  my $request = $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? $tag : '(undef)';
    die "Received progress for unknown request tag '$shown'";
  }
  $request->progress(@payload);
}
52
# Handles an incoming result message: removes the in-flight request
# registered under $tag and delivers @payload to it as its result.
#
# Dies with a descriptive message when no request is registered for $tag,
# rather than failing with an opaque "Can't call method on an undefined
# value" error.
sub receive_result {
  my ($self, $tag, @payload) = @_;
  # The entry is removed first: a result completes the request.
  my $request = delete $self->requests->{$tag};
  unless ($request) {
    my $shown = defined $tag ? $tag : '(undef)';
    die "Received result for unknown request tag '$shown'";
  }
  $request->result(@payload);
}
57
# Handles an incoming plain message by passing its payload straight to the
# service's receive method.
sub receive_message {
  my ($self, @payload) = @_;
  my $service = $self->service;
  return $service->receive(@payload);
}
62
# Handles an incoming close message by invoking the on_close callback with
# the message payload.
sub receive_close {
  my ($self, @payload) = @_;
  my $handler = $self->on_close;
  return $handler->(@payload);
}
67
77bf1d9b 681;