1 package Tak::JSONChannel;
3 use JSON::PP qw(encode_json decode_json);
5 use Scalar::Util qw(weaken);
6 use Log::Contextual qw(:log);
9 has read_fh => (is => 'ro', required => 1);
10 has write_fh => (is => 'ro', required => 1);
12 has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
14 sub BUILD { shift->write_fh->autoflush(1); }
18 my $rb = $self->_read_buf;
19 if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
20 while ($$rb =~ s/^(.*)\n//) {
22 log_trace { "Received $line" };
23 if (my $unpacked = $self->_unpack_line($line)) {
28 log_trace { "Closing" };
29 $cb->('close', 'channel');
34 my ($self, $line) = @_;
35 my $data = eval { decode_json($line) };
37 $self->write_message(mistake => invalid_json => $@||'No data and no exception');
40 unless (ref($data) eq 'ARRAY') {
41 $self->write_message(mistake => message_format => "Not an ARRAY");
45 $self->write_message(mistake => message_format => "Empty request array");
52 my ($self, @msg) = @_;
53 my $json = eval { encode_json(\@msg) };
55 $self->_raw_write_message(
57 [ failure => invalid_message => $@||'No data and no exception' ]
62 log_trace { "Sending: $json" };
63 $self->_raw_write_message($json);
66 sub _raw_write_message {
67 my ($self, $raw) = @_;
68 #warn "Sending: ${raw}\n";
69 print { $self->write_fh } $raw."\n"
70 or log_error { "Error writing: $!" };