stream_exec, buffering fixes
Matt S Trout [Fri, 11 Nov 2011 12:21:28 +0000 (12:21 +0000)]
lib/Tak/CommandService.pm
lib/Tak/ConnectionReceiver.pm
lib/Tak/JSONChannel.pm

index b311c82..8e5e251 100644 (file)
@@ -2,6 +2,8 @@ package Tak::CommandService;
 
 use Capture::Tiny qw(capture);
 use IPC::System::Simple qw(runx EXIT_ANY);
+use IPC::Open3;
+use Symbol qw(gensym);
 use Moo;
 
 with 'Tak::Role::Service';
@@ -15,4 +17,40 @@ sub handle_exec {
   return { stdout => $stdout, stderr => $stderr, exit_code => $code };
 }
 
+sub start_stream_exec_request {
+  my ($self, $req, $command) = @_;
+  my $err = gensym;
+  my $pid = open3(my $in, my $out, $err, @$command)
+    or return $req->failure("Couldn't spawn process: $!");
+  close($in); # bye
+  my $done = sub {
+    Tak->loop->unwatch_io(handle => $_, on_read_ready => 1)
+      for ($out, $err);
+    waitpid($pid, 0);
+    $req->success({ exit_code => $? });
+  };
+  my $outbuf = '';
+  my $errbuf = '';
+  Tak->loop->watch_io(
+    handle => $out,
+    on_read_ready => sub {
+      if (sysread($out, $outbuf, 1024, length($outbuf)) > 0) {
+        $req->progress(stdout => $1) while $outbuf =~ s/^(.*)\n//;
+      } else {
+        $req->progress(stdout => $outbuf) if $outbuf;
+        $req->progress(stderr => $errbuf) if $errbuf;
+        $done->();
+      }
+    }
+  );
+  Tak->loop->watch_io(
+    handle => $err,
+    on_read_ready => sub {
+      if (sysread($err, $errbuf, 1024, length($errbuf)) > 0) {
+        $req->progress(stderr => $1) while $errbuf =~ s/^(.*)\n//;
+      }
+    }
+  );
+}
+
 1;
index fe3c33f..d85b1b4 100644 (file)
@@ -2,6 +2,7 @@ package Tak::ConnectionReceiver;
 
 use Tak::Request;
 use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
 use Moo;
 
 with 'Tak::Role::Service';
@@ -20,9 +21,7 @@ sub BUILD {
   Tak->loop->watch_io(
     handle => $channel->read_fh,
     on_read_ready => sub {
-      if (my $message = $channel->read_message) {
-        $self->receive(@$message);
-      }
+      $channel->read_messages(sub { $self->receive(@_) });
     }
   );
 }
index af1aee1..fc30741 100644 (file)
@@ -9,17 +9,24 @@ use Moo;
 has read_fh => (is => 'ro', required => 1);
 has write_fh => (is => 'ro', required => 1);
 
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
 sub BUILD { shift->write_fh->autoflush(1); }
 
-sub read_message {
-  my ($self) = @_;
-  if (defined(my $line = readline($self->read_fh))) {
-    log_trace { "Received $line" };
-    if (my $unpacked = $self->_unpack_line($line)) {
-      return $unpacked;
+sub read_messages {
+  my ($self, $cb) = @_;
+  my $rb = $self->_read_buf;
+  if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+    while ($$rb =~ s/^(.*)\n//) {
+      my $line = $1;
+      log_trace { "Received $line" };
+      if (my $unpacked = $self->_unpack_line($line)) {
+        $cb->(@$unpacked);
+      }
     }
   } else {
-    return [ 'close', 'channel' ];
+    log_trace { "Closing" };
+    $cb->('close', 'channel');
   }
 }
 
@@ -59,7 +66,8 @@ sub write_message {
 sub _raw_write_message {
   my ($self, $raw) = @_;
 #warn "Sending: ${raw}\n";
-  print { $self->write_fh } $raw."\n";
+  print { $self->write_fh } $raw."\n"
+    or log_error { "Error writing: $!" };
 }
 
 1;