Commit | Line | Data |
b96be97f |
1 | package Tak::CommandService; |
2 | |
3 | use Capture::Tiny qw(capture); |
4 | use IPC::System::Simple qw(runx EXIT_ANY); |
9964f8e0 |
5 | use IPC::Open3; |
6 | use Symbol qw(gensym); |
b96be97f |
7 | use Moo; |
8 | |
9 | with 'Tak::Role::Service'; |
10 | |
# Run a command to completion and hand back everything it produced.
#
# Arguments:
#   $command - array reference; its elements are passed to runx() as a list.
#
# Returns a hash reference with three keys:
#   stdout    - text captured from standard output while the command ran
#   stderr    - text captured from standard error while the command ran
#   exit_code - the value returned by runx(EXIT_ANY, ...)
sub handle_exec {
  my ($self, $command) = @_;

  # Assigned inside the capture block; declared here so it outlives it.
  my $exit_status;

  my ($captured_out, $captured_err) = capture {
    $exit_status = runx(EXIT_ANY, @$command);
  };

  my %result = (
    stdout    => $captured_out,
    stderr    => $captured_err,
    exit_code => $exit_status,
  );
  return \%result;
}
19 | |
9964f8e0 |
# Spawn @$command and stream its output back through $req, line by line.
#
# Arguments:
#   $req     - request object; this sub calls progress(), success() and
#              failure() on it.
#   $command - array reference holding the program and its arguments; it is
#              handed to open3() in list form.
#
# Behaviour:
#   * Every complete line read from the child's stdout / stderr is reported
#     via $req->progress(stdout => $line) / $req->progress(stderr => $line),
#     with the trailing newline removed.
#   * When stdout reaches end-of-file, whatever is still pending on stderr is
#     read, any unterminated final line of either stream is reported, the
#     child is reaped and $req->success({ exit_code => $? }) is called.
#   * If the child cannot be spawned, returns $req->failure(...) with the
#     reason.
#
# NOTE(review): exit_code is the raw wait status from $?, whereas handle_exec
# returns whatever runx() yields -- confirm with callers whether these are
# meant to agree before changing either.
sub start_stream_exec_request {
  my ($self, $req, $command) = @_;
  my $err = gensym;
  my ($in, $out);

  # open3() reports a failed spawn by dying (message starts with "open3:"),
  # not by returning false, so the failure has to be trapped with eval.
  my $pid = eval { open3($in, $out, $err, @$command) };
  unless ($pid) {
    my $reason = $@ || $!;
    chomp($reason);
    return $req->failure("Couldn't spawn process: $reason");
  }
  close($in); # bye

  my $outbuf = '';
  my $errbuf = '';

  # Report and strip every complete line currently sitting in a buffer.
  my $emit_lines = sub {
    my ($stream, $bufref) = @_;
    while ($$bufref =~ s/^(.*)\n//) {
      my $line = $1; # copy: $1 is global and progress() may run a regex
      $req->progress($stream => $line);
    }
  };

  # The stderr watch can be removed from two places (its own EOF, or overall
  # completion); the flag makes sure unwatch_io is only called once for it.
  my $err_watched = 1;
  my $stop_err_watch = sub {
    return unless $err_watched;
    $err_watched = 0;
    Tak->loop->unwatch_io(handle => $err, on_read_ready => 1);
  };

  # Read whatever is left on stderr. This blocks until the handle reaches
  # EOF, just as the waitpid() in $done blocks until the child exits.
  my $drain_err = sub {
    return unless $err_watched; # EOF already seen on stderr
    while (1) {
      my $got = sysread($err, $errbuf, 1024, length($errbuf));
      next if !defined($got) && $!{EINTR};
      last unless $got;
      $emit_lines->(stderr => \$errbuf);
    }
  };

  my $done = sub {
    Tak->loop->unwatch_io(handle => $out, on_read_ready => 1);
    $stop_err_watch->();
    waitpid($pid, 0);
    $req->success({ exit_code => $? });
  };

  Tak->loop->watch_io(
    handle => $out,
    on_read_ready => sub {
      my $got = sysread($out, $outbuf, 1024, length($outbuf));
      # A transient error is not EOF; wait for the next readiness callback.
      return if !defined($got) && ($!{EINTR} || $!{EAGAIN});
      if ($got) {
        $emit_lines->(stdout => \$outbuf);
        return;
      }
      # EOF (or an unrecoverable read error) on stdout: finish up.
      $drain_err->();
      # length(), not truthiness: a final unterminated line of "0" is data.
      $req->progress(stdout => $outbuf) if length($outbuf);
      $req->progress(stderr => $errbuf) if length($errbuf);
      $done->();
    }
  );
  Tak->loop->watch_io(
    handle => $err,
    on_read_ready => sub {
      my $got = sysread($err, $errbuf, 1024, length($errbuf));
      if ($got) {
        $emit_lines->(stderr => \$errbuf);
      }
      elsif (defined($got) || !($!{EINTR} || $!{EAGAIN})) {
        # EOF or hard error: a handle at EOF is always "readable", so stop
        # watching it instead of being called back over and over.
        $stop_err_watch->();
      }
    }
  );
}
55 | |
b96be97f |
56 | 1; |