stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / JSONChannel.pm
1 package Tak::JSONChannel;
2
3 use JSON::PP qw(encode_json decode_json);
4 use IO::Handle;
5 use Scalar::Util qw(weaken);
6 use Log::Contextual qw(:log);
7 use Moo;
8
9 has read_fh => (is => 'ro', required => 1);
10 has write_fh => (is => 'ro', required => 1);
11
12 has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
13
14 sub BUILD { shift->write_fh->autoflush(1); }
15
16 sub read_messages {
17   my ($self, $cb) = @_;
18   my $rb = $self->_read_buf;
19   if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
20     while ($$rb =~ s/^(.*)\n//) {
21       my $line = $1;
22       log_trace { "Received $line" };
23       if (my $unpacked = $self->_unpack_line($line)) {
24         $cb->(@$unpacked);
25       }
26     }
27   } else {
28     log_trace { "Closing" };
29     $cb->('close', 'channel');
30   }
31 }
32
33 sub _unpack_line {
34   my ($self, $line) = @_;
35   my $data = eval { decode_json($line) };
36   unless ($data) {
37     $self->write_message(mistake => invalid_json => $@||'No data and no exception');
38     return;
39   }
40   unless (ref($data) eq 'ARRAY') {
41     $self->write_message(mistake => message_format => "Not an ARRAY");
42     return;
43   }
44   unless (@$data > 0) {
45     $self->write_message(mistake => message_format => "Empty request array");
46     return;
47   }
48   $data;
49 }
50
51 sub write_message {
52   my ($self, @msg) = @_;
53   my $json = eval { encode_json(\@msg) };
54   unless ($json) {
55     $self->_raw_write_message(
56       encode_json(
57         [ failure => invalid_message => $@||'No data and no exception' ]
58       )
59     );
60     return;
61   }
62   log_trace { "Sending: $json" };
63   $self->_raw_write_message($json);
64 }
65
66 sub _raw_write_message {
67   my ($self, $raw) = @_;
68 #warn "Sending: ${raw}\n";
69   print { $self->write_fh } $raw."\n"
70     or log_error { "Error writing: $!" };
71 }
72
73 1;