X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FTak%2FCommandService.pm;h=8e5e251f09f8a295b19a30b36e302bfae53b6054;hb=9964f8e0924c0375b8f9e4e7690a4ab371624bc4;hp=b311c82ca2de37ba96b07ff6826805c303a73c64;hpb=b96be97f13c5b13762d63e25fc0adc3cb8736e60;p=scpubgit%2FTak.git diff --git a/lib/Tak/CommandService.pm b/lib/Tak/CommandService.pm index b311c82..8e5e251 100644 --- a/lib/Tak/CommandService.pm +++ b/lib/Tak/CommandService.pm @@ -2,6 +2,8 @@ package Tak::CommandService; use Capture::Tiny qw(capture); use IPC::System::Simple qw(runx EXIT_ANY); +use IPC::Open3; +use Symbol qw(gensym); use Moo; with 'Tak::Role::Service'; @@ -15,4 +17,40 @@ sub handle_exec { return { stdout => $stdout, stderr => $stderr, exit_code => $code }; } +sub start_stream_exec_request { + my ($self, $req, $command) = @_; + my $err = gensym; + my $pid = open3(my $in, my $out, $err, @$command) + or return $req->failure("Couldn't spawn process: $!"); + close($in); # bye + my $done = sub { + Tak->loop->unwatch_io(handle => $_, on_read_ready => 1) + for ($out, $err); + waitpid($pid, 0); + $req->success({ exit_code => $? }); + }; + my $outbuf = ''; + my $errbuf = ''; + Tak->loop->watch_io( + handle => $out, + on_read_ready => sub { + if (sysread($out, $outbuf, 1024, length($outbuf)) > 0) { + $req->progress(stdout => $1) while $outbuf =~ s/^(.*)\n//; + } else { + $req->progress(stdout => $outbuf) if $outbuf; + $req->progress(stderr => $errbuf) if $errbuf; + $done->(); + } + } + ); + Tak->loop->watch_io( + handle => $err, + on_read_ready => sub { + if (sysread($err, $errbuf, 1024, length($errbuf)) > 0) { + $req->progress(stderr => $1) while $errbuf =~ s/^(.*)\n//; + } + } + ); +} + 1;