stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / JSONChannel.pm
CommitLineData
36cf3bcb 1package Tak::JSONChannel;
2
3use JSON::PP qw(encode_json decode_json);
4use IO::Handle;
5use Scalar::Util qw(weaken);
7b71b06e 6use Log::Contextual qw(:log);
36cf3bcb 7use Moo;
8
# Handle that incoming data is read from; must be passed to the constructor.
has read_fh => (
  is       => 'ro',
  required => 1,
);

# Handle that outgoing messages are printed to; must be passed to the
# constructor.
has write_fh => (
  is       => 'ro',
  required => 1,
);

# Reference to the string holding bytes that have been read but not yet
# consumed as complete lines. The default builds a fresh empty string for
# each object.
has _read_buf => (
  is      => 'ro',
  default => sub {
    my $pending = '';
    return \$pending;
  },
);
13
# Construction hook: turn on autoflush for the write handle so that each
# printed message is flushed immediately instead of sitting in a buffer.
sub BUILD {
  my ($self) = @_;
  $self->write_fh->autoflush(1);
}
15
# Read whatever is currently available on read_fh (at most 1024 bytes per
# call), append it to the internal buffer, and dispatch every complete
# newline-terminated line found there.
#
# Arguments:
#   $cb - code ref; called with the elements of each successfully unpacked
#         message, or with ('close', 'channel') once the handle reaches end
#         of file or fails with a non-transient error.
#
# A trailing partial line stays in the buffer until a later call supplies
# the rest of it. Lines rejected by _unpack_line are not passed to $cb.
sub read_messages {
  my ($self, $cb) = @_;
  my $rb = $self->_read_buf;
  my $got = sysread($self->read_fh, $$rb, 1024, length($$rb));

  # sysread returns undef on error and 0 at end of file; keep the two apart
  # so an interrupted or would-block read is not mistaken for a close.
  unless (defined $got) {
    # Snapshot the errno state before any other call can overwrite $!.
    my $transient = $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK};
    my $error = "$!";
    return if $transient;
    log_error { "Error reading: $error" };
  }

  unless ($got) {
    log_trace { "Closing" };
    $cb->('close', 'channel');
    return;
  }

  # Peel complete lines off the front of the buffer, one at a time.
  while ($$rb =~ s/^(.*)\n//) {
    my $line = $1;
    log_trace { "Received $line" };
    if (my $unpacked = $self->_unpack_line($line)) {
      $cb->(@$unpacked);
    }
  }
  return;
}
32
# Decode one line of JSON and check that it is a usable message.
#
# Arguments:
#   $line - the text of a single received line, without its newline.
#
# Returns the decoded array reference when the line holds a non-empty JSON
# array. Otherwise a 'mistake' message describing the problem is sent via
# write_message and nothing is returned.
sub _unpack_line {
  my ($self, $line) = @_;
  my $decoded = eval { decode_json($line) };

  # Work out which complaint (if any) applies; the checks run in order, so
  # the array dereference is only reached for a genuine ARRAY reference.
  my @mistake
    = !$decoded                ? (invalid_json => $@||'No data and no exception')
    : ref($decoded) ne 'ARRAY' ? (message_format => "Not an ARRAY")
    : !@$decoded               ? (message_format => "Empty request array")
    :                            ();

  if (@mistake) {
    $self->write_message(mistake => @mistake);
    return;
  }
  return $decoded;
}
50
# Encode a message as a JSON array and send it as one line.
#
# Arguments:
#   @msg - the message elements; they are encoded together as one array.
#
# If the elements cannot be encoded, a
# [ failure => invalid_message => <reason> ] line is sent in their place.
sub write_message {
  my ($self, @msg) = @_;
  my $encoded = eval { encode_json(\@msg) };

  if ($encoded) {
    log_trace { "Sending: $encoded" };
    return $self->_raw_write_message($encoded);
  }

  # Encoding failed: tell the peer why rather than sending nothing at all.
  my $report = encode_json(
    [ failure => invalid_message => $@||'No data and no exception' ]
  );
  $self->_raw_write_message($report);
  return;
}
65
# Print an already-encoded message to write_fh, followed by a newline.
#
# Arguments:
#   $raw - the encoded text to send, without a trailing newline.
#
# A failed print is logged as an error; it is not raised to the caller.
sub _raw_write_message {
  my ($self, $raw) = @_;
  my $out = $self->write_fh;
  print {$out} "$raw\n"
    or log_error { "Error writing: $!" };
}
72
731;