stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / JSONChannel.pm
index af1aee1..fc30741 100644 (file)
@@ -9,17 +9,24 @@ use Moo;
 has read_fh => (is => 'ro', required => 1);
 has write_fh => (is => 'ro', required => 1);
 
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
 sub BUILD { shift->write_fh->autoflush(1); }
 
-sub read_message {
-  my ($self) = @_;
-  if (defined(my $line = readline($self->read_fh))) {
-    log_trace { "Received $line" };
-    if (my $unpacked = $self->_unpack_line($line)) {
-      return $unpacked;
+sub read_messages {
+  my ($self, $cb) = @_;
+  my $rb = $self->_read_buf;
+  if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+    while ($$rb =~ s/^(.*)\n//) {
+      my $line = $1;
+      log_trace { "Received $line" };
+      if (my $unpacked = $self->_unpack_line($line)) {
+        $cb->(@$unpacked);
+      }
     }
   } else {
-    return [ 'close', 'channel' ];
+    log_trace { "Closing" };
+    $cb->('close', 'channel');
   }
 }
 
@@ -59,7 +66,8 @@ sub write_message {
 sub _raw_write_message {
   my ($self, $raw) = @_;
 #warn "Sending: ${raw}\n";
-  print { $self->write_fh } $raw."\n";
+  print { $self->write_fh } $raw."\n"
+    or log_error { "Error writing: $!" };
 }
 
 1;