use Capture::Tiny qw(capture);
use IPC::System::Simple qw(runx EXIT_ANY);
+use IPC::Open3;
+use Symbol qw(gensym);
use Moo;
with 'Tak::Role::Service';
return { stdout => $stdout, stderr => $stderr, exit_code => $code };
}
+sub start_stream_exec_request {
+ my ($self, $req, $command) = @_;
+ my $err = gensym;
+ my $pid = open3(my $in, my $out, $err, @$command)
+ or return $req->failure("Couldn't spawn process: $!");
+ close($in); # bye
+ my $done = sub {
+ Tak->loop->unwatch_io(handle => $_, on_read_ready => 1)
+ for ($out, $err);
+ waitpid($pid, 0);
+ $req->success({ exit_code => $? });
+ };
+ my $outbuf = '';
+ my $errbuf = '';
+ Tak->loop->watch_io(
+ handle => $out,
+ on_read_ready => sub {
+ if (sysread($out, $outbuf, 1024, length($outbuf)) > 0) {
+ $req->progress(stdout => $1) while $outbuf =~ s/^(.*)\n//;
+ } else {
+ $req->progress(stdout => $outbuf) if $outbuf;
+ $req->progress(stderr => $errbuf) if $errbuf;
+ $done->();
+ }
+ }
+ );
+ Tak->loop->watch_io(
+ handle => $err,
+ on_read_ready => sub {
+ if (sysread($err, $errbuf, 1024, length($errbuf)) > 0) {
+ $req->progress(stderr => $1) while $errbuf =~ s/^(.*)\n//;
+ }
+ }
+ );
+}
+
1;
use Tak::Request;
use Scalar::Util qw(weaken);
+use Log::Contextual qw(:log);
use Moo;
with 'Tak::Role::Service';
Tak->loop->watch_io(
handle => $channel->read_fh,
on_read_ready => sub {
- if (my $message = $channel->read_message) {
- $self->receive(@$message);
- }
+ $channel->read_messages(sub { $self->receive(@_) });
}
);
}
has read_fh => (is => 'ro', required => 1);
has write_fh => (is => 'ro', required => 1);
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
sub BUILD { shift->write_fh->autoflush(1); }
-sub read_message {
- my ($self) = @_;
- if (defined(my $line = readline($self->read_fh))) {
- log_trace { "Received $line" };
- if (my $unpacked = $self->_unpack_line($line)) {
- return $unpacked;
+sub read_messages {
+ my ($self, $cb) = @_;
+ my $rb = $self->_read_buf;
+ if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+ while ($$rb =~ s/^(.*)\n//) {
+ my $line = $1;
+ log_trace { "Received $line" };
+ if (my $unpacked = $self->_unpack_line($line)) {
+ $cb->(@$unpacked);
+ }
}
} else {
- return [ 'close', 'channel' ];
+ log_trace { "Closing" };
+ $cb->('close', 'channel');
}
}
sub _raw_write_message {
my ($self, $raw) = @_;
#warn "Sending: ${raw}\n";
- print { $self->write_fh } $raw."\n";
+ print { $self->write_fh } $raw."\n"
+ or log_error { "Error writing: $!" };
}
1;