stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / CommandService.pm
CommitLineData
b96be97f 1package Tak::CommandService;
2
3use Capture::Tiny qw(capture);
4use IPC::System::Simple qw(runx EXIT_ANY);
9964f8e0 5use IPC::Open3;
6use Symbol qw(gensym);
b96be97f 7use Moo;
8
9with 'Tak::Role::Service';
10
# Run a command to completion and hand back everything it produced.
#
# $command is an array ref (program followed by its arguments) that is
# flattened into the argument list of runx().
#
# Returns a hash ref with the keys stdout, stderr and exit_code.
sub handle_exec {
  my ($self, $command) = @_;

  my $exit_code;
  my ($captured_out, $captured_err) = capture {
    # EXIT_ANY is passed so that runx() hands back the exit value rather
    # than restricting which exit values are acceptable.
    $exit_code = runx(EXIT_ANY, @$command);
  };

  my %result = (
    stdout    => $captured_out,
    stderr    => $captured_err,
    exit_code => $exit_code,
  );
  return \%result;
}
19
# Spawn a command and stream its output back through the request object.
#
# Parameters:
#   $req     - request object; this sub calls ->progress($stream => $text),
#              ->success({ exit_code => ... }) and ->failure($message) on it.
#   $command - array ref: program followed by its arguments, passed to open3().
#
# Complete lines are reported as they arrive via
# $req->progress(stdout|stderr => $line), with the trailing newline removed.
# Any unterminated tail is reported when its stream ends. Once BOTH streams
# have ended the child is reaped and $req->success is called.
#
# Returns whatever the final watch_io() call returns, or the result of
# $req->failure(...) if the process could not be spawned.
sub start_stream_exec_request {
  my ($self, $req, $command) = @_;

  # open3() needs a pre-made glob for the error handle; if it were left
  # undefined, the child's stderr would be sent to the stdout handle.
  my $err = gensym;
  my ($in, $out);

  # open3() reports failure by raising an exception rather than by returning
  # false, so it has to be trapped for the failure path to be reachable.
  my $pid = eval { open3($in, $out, $err, @$command) }
    or return $req->failure("Couldn't spawn process: " . ($@ || $!));
  close($in); # bye

  # The request is only finished once stdout AND stderr have both ended;
  # finishing on stdout alone could drop stderr data still in the pipe.
  my $open_streams = 2;
  my $finish_stream = sub {
    my ($handle) = @_;
    # Stop watching a finished handle: a handle at EOF stays read-ready.
    Tak->loop->unwatch_io(handle => $handle, on_read_ready => 1);
    return if --$open_streams;
    waitpid($pid, 0);
    # NOTE(review): this is the raw wait status in $?, not the exit value
    # ($? >> 8); kept as-is because callers may depend on it - confirm.
    $req->success({ exit_code => $? });
  };

  # Install a line-buffering reader for one stream; each stream gets its own
  # buffer so partial lines on stdout and stderr never mix.
  my $watch_stream = sub {
    my ($name, $handle) = @_;
    my $buffer = '';
    Tak->loop->watch_io(
      handle => $handle,
      on_read_ready => sub {
        # sysread appends at the end of the buffer. It returns 0 at EOF and
        # undef on error; both are treated as the end of this stream.
        if (sysread($handle, $buffer, 1024, length($buffer))) {
          $req->progress($name => $1) while $buffer =~ s/^(.*)\n//;
          return;
        }
        # length() rather than truth, so a final chunk of "0" is not lost.
        $req->progress($name => $buffer) if length($buffer);
        $buffer = '';
        $finish_stream->($handle);
      }
    );
  };

  $watch_stream->(stdout => $out);
  $watch_stream->(stderr => $err);
}
55
b96be97f 561;