stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / JSONChannel.pm
index 9716768..fc30741 100644 (file)
@@ -3,19 +3,30 @@ package Tak::JSONChannel;
 use JSON::PP qw(encode_json decode_json);
 use IO::Handle;
 use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
 use Moo;
 
 has read_fh => (is => 'ro', required => 1);
 has write_fh => (is => 'ro', required => 1);
 
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
 sub BUILD { shift->write_fh->autoflush(1); }
 
-sub receive {
-  my ($self) = @_;
-  while (my $line = readline($self->read_fh)) {
-    if (my $unpacked = $self->_unpack_line($line)) {
-      return $unpacked;
+sub read_messages {
+  my ($self, $cb) = @_;
+  my $rb = $self->_read_buf;
+  if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+    while ($$rb =~ s/^(.*)\n//) {
+      my $line = $1;
+      log_trace { "Received $line" };
+      if (my $unpacked = $self->_unpack_line($line)) {
+        $cb->(@$unpacked);
+      }
     }
+  } else {
+    log_trace { "Closing" };
+    $cb->('close', 'channel');
   }
 }
 
@@ -23,36 +34,40 @@ sub _unpack_line {
   my ($self, $line) = @_;
   my $data = eval { decode_json($line) };
   unless ($data) {
-    $self->send(MISTAKE => invalid_json => $@||'No data and no exception');
+    $self->write_message(mistake => invalid_json => $@||'No data and no exception');
     return;
   }
   unless (ref($data) eq 'ARRAY') {
-    $self->send(MISTAKE => message_format => "Not an ARRAY");
+    $self->write_message(mistake => message_format => "Not an ARRAY");
     return;
   }
   unless (@$data > 0) {
-    $self->send(MISTAKE => message_format => "Empty request array");
+    $self->write_message(mistake => message_format => "Empty request array");
     return;
   }
   $data;
 }
 
-sub send {
+sub write_message {
   my ($self, @msg) = @_;
   my $json = eval { encode_json(\@msg) };
   unless ($json) {
-    $self->_raw_send(
+    $self->_raw_write_message(
       encode_json(
-        [ FAILURE => invalid_message => $@||'No data and no exception' ]
+        [ failure => invalid_message => $@||'No data and no exception' ]
       )
     );
+    return;
   }
-  $self->_raw_send($json);
+  log_trace { "Sending: $json" };
+  $self->_raw_write_message($json);
 }
 
-sub _raw_send {
+sub _raw_write_message {
   my ($self, $raw) = @_;
-  print { $self->write_fh } $raw."\n";
+#warn "Sending: ${raw}\n";
+  print { $self->write_fh } $raw."\n"
+    or log_error { "Error writing: $!" };
 }
 
 1;