From: Matt S Trout Date: Fri, 11 Nov 2011 12:21:28 +0000 (+0000) Subject: stream_exec, buffering fixes X-Git-Tag: v0.001001~14 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=9964f8e0924c0375b8f9e4e7690a4ab371624bc4;p=scpubgit%2FTak.git stream_exec, buffering fixes --- diff --git a/lib/Tak/CommandService.pm b/lib/Tak/CommandService.pm index b311c82..8e5e251 100644 --- a/lib/Tak/CommandService.pm +++ b/lib/Tak/CommandService.pm @@ -2,6 +2,8 @@ package Tak::CommandService; use Capture::Tiny qw(capture); use IPC::System::Simple qw(runx EXIT_ANY); +use IPC::Open3; +use Symbol qw(gensym); use Moo; with 'Tak::Role::Service'; @@ -15,4 +17,40 @@ sub handle_exec { return { stdout => $stdout, stderr => $stderr, exit_code => $code }; } +sub start_stream_exec_request { + my ($self, $req, $command) = @_; + my $err = gensym; + my $pid = open3(my $in, my $out, $err, @$command) + or return $req->failure("Couldn't spawn process: $!"); + close($in); # bye + my $done = sub { + Tak->loop->unwatch_io(handle => $_, on_read_ready => 1) + for ($out, $err); + waitpid($pid, 0); + $req->success({ exit_code => $? }); + }; + my $outbuf = ''; + my $errbuf = ''; + Tak->loop->watch_io( + handle => $out, + on_read_ready => sub { + if (sysread($out, $outbuf, 1024, length($outbuf)) > 0) { + $req->progress(stdout => $1) while $outbuf =~ s/^(.*)\n//; + } else { + $req->progress(stdout => $outbuf) if $outbuf; + $req->progress(stderr => $errbuf) if $errbuf; + $done->(); + } + } + ); + Tak->loop->watch_io( + handle => $err, + on_read_ready => sub { + if (sysread($err, $errbuf, 1024, length($errbuf)) > 0) { + $req->progress(stderr => $1) while $errbuf =~ s/^(.*)\n//; + } + } + ); +} + 1; diff --git a/lib/Tak/ConnectionReceiver.pm b/lib/Tak/ConnectionReceiver.pm index fe3c33f..d85b1b4 100644 --- a/lib/Tak/ConnectionReceiver.pm +++ b/lib/Tak/ConnectionReceiver.pm @@ -2,6 +2,7 @@ package Tak::ConnectionReceiver; use Tak::Request; use Scalar::Util qw(weaken); +use Log::Contextual qw(:log); use Moo; with 'Tak::Role::Service'; @@ -20,9 +21,7 @@ sub BUILD { Tak->loop->watch_io( handle => $channel->read_fh, on_read_ready => sub { - if (my $message = $channel->read_message) { - $self->receive(@$message); - } + $channel->read_messages(sub { $self->receive(@_) }); } ); } diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm index af1aee1..fc30741 100644 --- a/lib/Tak/JSONChannel.pm +++ b/lib/Tak/JSONChannel.pm @@ -9,17 +9,24 @@ use Moo; has read_fh => (is => 'ro', required => 1); has write_fh => (is => 'ro', required => 1); +has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x }); + sub BUILD { shift->write_fh->autoflush(1); } -sub read_message { - my ($self) = @_; - if (defined(my $line = readline($self->read_fh))) { - log_trace { "Received $line" }; - if (my $unpacked = $self->_unpack_line($line)) { - return $unpacked; +sub read_messages { + my ($self, $cb) = @_; + my $rb = $self->_read_buf; + if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) { + while ($$rb =~ s/^(.*)\n//) { + my $line = $1; + log_trace { "Received $line" }; + if (my $unpacked = $self->_unpack_line($line)) { + $cb->(@$unpacked); + } } } else { - return [ 'close', 'channel' ]; + log_trace { "Closing" }; + $cb->('close', 'channel'); } } @@ -59,7 +66,8 @@ sub write_message { sub _raw_write_message { my ($self, $raw) = @_; #warn "Sending: ${raw}\n"; - print { $self->write_fh } $raw."\n"; + print { $self->write_fh } $raw."\n" + or log_error { "Error writing: $!" }; } 1;