stream_exec, buffering fixes
[scpubgit/Tak.git] / lib / Tak / CommandService.pm
1 package Tak::CommandService;
2
3 use Capture::Tiny qw(capture);
4 use IPC::System::Simple qw(runx EXIT_ANY);
5 use IPC::Open3;
6 use Symbol qw(gensym);
7 use Moo;
8
9 with 'Tak::Role::Service';
10
11 sub handle_exec {
12   my ($self, $command) = @_;
13   my $code;
14   my ($stdout, $stderr) = capture {
15     $code = runx(EXIT_ANY, @$command);
16   };
17   return { stdout => $stdout, stderr => $stderr, exit_code => $code };
18 }
19
20 sub start_stream_exec_request {
21   my ($self, $req, $command) = @_;
22   my $err = gensym;
23   my $pid = open3(my $in, my $out, $err, @$command)
24     or return $req->failure("Couldn't spawn process: $!");
25   close($in); # bye
26   my $done = sub {
27     Tak->loop->unwatch_io(handle => $_, on_read_ready => 1)
28       for ($out, $err);
29     waitpid($pid, 0);
30     $req->success({ exit_code => $? });
31   };
32   my $outbuf = '';
33   my $errbuf = '';
34   Tak->loop->watch_io(
35     handle => $out,
36     on_read_ready => sub {
37       if (sysread($out, $outbuf, 1024, length($outbuf)) > 0) {
38         $req->progress(stdout => $1) while $outbuf =~ s/^(.*)\n//;
39       } else {
40         $req->progress(stdout => $outbuf) if $outbuf;
41         $req->progress(stderr => $errbuf) if $errbuf;
42         $done->();
43       }
44     }
45   );
46   Tak->loop->watch_io(
47     handle => $err,
48     on_read_ready => sub {
49       if (sysread($err, $errbuf, 1024, length($errbuf)) > 0) {
50         $req->progress(stderr => $1) while $errbuf =~ s/^(.*)\n//;
51       }
52     }
53   );
54 }
55
56 1;