stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / JSONChannel.pm
index 86a63e9..fc30741 100644 (file)
@@ -3,21 +3,30 @@ package Tak::JSONChannel;
 use JSON::PP qw(encode_json decode_json);
 use IO::Handle;
 use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
 use Moo;
 
 has read_fh => (is => 'ro', required => 1);
 has write_fh => (is => 'ro', required => 1);
 
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
 sub BUILD { shift->write_fh->autoflush(1); }
 
-sub read_message {
-  my ($self) = @_;
-  if (defined(my $line = readline($self->read_fh))) {
-    if (my $unpacked = $self->_unpack_line($line)) {
-      return $unpacked;
+sub read_messages {
+  my ($self, $cb) = @_;
+  my $rb = $self->_read_buf;
+  if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+    while ($$rb =~ s/^(.*)\n//) {
+      my $line = $1;
+      log_trace { "Received $line" };
+      if (my $unpacked = $self->_unpack_line($line)) {
+        $cb->(@$unpacked);
+      }
     }
   } else {
-    return [ 'close', 'channel' ];
+    log_trace { "Closing" };
+    $cb->('close', 'channel');
   }
 }
 
@@ -50,13 +59,15 @@ sub write_message {
     );
     return;
   }
+  log_trace { "Sending: $json" };
   $self->_raw_write_message($json);
 }
 
 sub _raw_write_message {
   my ($self, $raw) = @_;
 #warn "Sending: ${raw}\n";
-  print { $self->write_fh } $raw."\n";
+  print { $self->write_fh } $raw."\n"
+    or log_error { "Error writing: $!" };
 }
 
 1;