}
sub do {
+ shift->result_of(@_)->get;
+}
+
+sub result_of {
my ($self, @payload) = @_;
my $done;
my $result;
on_result => sub { $result = shift },
}, @payload);
Tak->loop_until($result);
- $result->get;
+ return $result;
}
1;
--- /dev/null
+package Tak::ConnectionReceiver;
+
+use Tak::Request;
+use Scalar::Util qw(weaken);
+use Moo;
+
+with 'Tak::Role::Service';
+
+has requests => (is => 'ro', default => sub { {} });
+
+has channel => (is => 'ro', required => 1);
+
+has service => (is => 'ro', required => 1);
+
+sub BUILD {
+ weaken(my $self = shift);
+ my $channel = $self->channel;
+ Tak->loop->watch_io(
+ handle => $channel->read_fh,
+ on_read_ready => sub {
+ if (my $message = $channel->read_message) {
+ $self->receive(@$message);
+ }
+ }
+ );
+}
+
+sub DEMOLISH {
+ Tak->loop->unwatch_io(
+ handle => $_[0]->channel->read_fh,
+ on_read_ready => 1,
+ );
+}
+
+sub receive_request {
+ my ($self, $tag, $meta, @payload) = @_;
+ my $channel = $self->channel;
+ my $req = Tak::Request->new(
+ ($meta->{progress}
+ ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
+ : ()),
+ on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
+ );
+ $self->service->start_request($req => @payload);
+}
+
+sub receive_progress {
+ my ($self, $tag, @payload) = @_;
+ $self->requests->{$tag}->progress(@payload);
+}
+
+sub receive_result {
+ my ($self, $tag, @payload) = @_;
+ $self->requests->{$tag}->result(@payload);
+}
+
+sub receive_message {
+ my ($self, @payload) = @_;
+ $self->service->receive(@payload);
+}
+
+1;
--- /dev/null
+package Tak::ConnectionService;
+
+use Tak::ConnectionReceiver;
+use Tak::JSONChannel;
+use Moo;
+
+has receiver => (is => 'ro', writer => '_set_receiver');
+
+has channel => (is => 'ro', writer => '_set_channel');
+
+sub BUILD {
+ my ($self, $args) = @_;
+ my $channel = $self->_set_channel(
+ Tak::JSONChannel->new(map +($_ => $args->{$_}), qw(read_fh write_fh))
+ );
+ my $receiver = $self->_set_receiver(
+ Tak::ConnectionReceiver->new(
+ channel => $channel, service => $args->{listening_service}
+ )
+ );
+}
+
+sub start_request {
+ my ($self, $req, @payload) = @_;
+ $self->receiver->requests->{my $tag = "$req"} = $req;
+ my $meta = { progress => !!$req->on_progress };
+ $self->channel->write_message(request => $tag => $meta => @payload);
+}
+
+sub receive {
+ my ($self, @payload) = @_;
+ $self->channel->write_message(message => @payload);
+}
+
+1;
--- /dev/null
+package Tak::ConnectorService;
+
+use IPC::Open2;
+use IO::All;
+use Tak::Router;
+use Tak::Client;
+use Tak::ConnectionService;
+use Moo;
+
+with 'Tak::Role::Service';
+
+has connections => (is => 'ro', default => sub { Tak::Router->new });
+
+sub handle_create {
+ my ($self) = @_;
+ my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-')
+ or die "Couldn't open2 child: $!";
+ io($kid_in)->print(io('maint/mk-fat |')->all, "__END__\n");
+ my $connection = Tak::ConnectionService->new(
+ read_fh => $kid_out, write_fh => $kid_in,
+ listening_service => Tak::Router->new
+ );
+ my $client = Tak::Client->new(service => $connection);
+ # actually, we should register with a monotonic id and
+ # stash the pid elsewhere. but meh for now.
+ my $pid = $client->do(meta => 'pid');
+ $self->connections->register('|'.$pid, $connection);
+ return ('proxy', '|'.$pid);
+}
+
+sub start_proxy_request {
+ my ($self, $req, @payload) = @_;;
+ $self->connections->start_request($req, @payload);
+}
+
+sub receive_proxy {
+ my ($self, @payload) = @_;
+ $self->connections->receive(@payload);
+}
+
+1;
sub BUILD { shift->write_fh->autoflush(1); }
-sub receive {
+sub read_message {
my ($self) = @_;
- while (my $line = readline($self->read_fh)) {
+ if (defined(my $line = readline($self->read_fh))) {
if (my $unpacked = $self->_unpack_line($line)) {
return $unpacked;
}
my ($self, $line) = @_;
my $data = eval { decode_json($line) };
unless ($data) {
- $self->send(MISTAKE => invalid_json => $@||'No data and no exception');
+ $self->write_message(mistake => invalid_json => $@||'No data and no exception');
return;
}
unless (ref($data) eq 'ARRAY') {
- $self->send(MISTAKE => message_format => "Not an ARRAY");
+ $self->write_message(mistake => message_format => "Not an ARRAY");
return;
}
unless (@$data > 0) {
- $self->send(MISTAKE => message_format => "Empty request array");
+ $self->write_message(mistake => message_format => "Empty request array");
return;
}
$data;
}
-sub send {
+sub write_message {
my ($self, @msg) = @_;
my $json = eval { encode_json(\@msg) };
unless ($json) {
- $self->_raw_send(
+ $self->_raw_write_message(
encode_json(
- [ FAILURE => invalid_message => $@||'No data and no exception' ]
+ [ failure => invalid_message => $@||'No data and no exception' ]
)
);
return;
}
- $self->_raw_send($json);
+ $self->_raw_write_message($json);
}
-sub _raw_send {
+sub _raw_write_message {
my ($self, $raw) = @_;
#warn "Sending: ${raw}\n";
print { $self->write_fh } $raw."\n";
package Tak::Loop;
+use IO::Select;
use Moo;
+has is_running => (is => 'rw', clearer => 'loop_stop');
+
+has _read_watches => (is => 'ro', default => sub { {} });
+has _read_select => (is => 'ro', default => sub { IO::Select->new });
+
+sub watch_io {
+ my ($self, %watch) = @_;
+ my $fh = $watch{handle};
+ if (my $cb = $watch{on_read_ready}) {
+ $self->_read_select->add($fh);
+ $self->_read_watches->{$fh} = $cb;
+ }
+}
+
+sub unwatch_io {
+ my ($self, %watch) = @_;
+ my $fh = $watch{handle};
+ if ($watch{on_read_ready}) {
+ $self->_read_select->remove($fh);
+ delete $self->_read_watches->{$fh};
+ }
+}
+
+sub loop_once {
+ my ($self) = @_;
+ my $read = $self->_read_watches;
+ my ($readable) = IO::Select->select($self->_read_select);
+ foreach my $fh (@$readable) {
+ $read->{$fh}();
+ }
+}
+
+sub loop_forever {
+ my ($self) = @_;
+ $self->is_running(1);
+ while ($self->is_running) {
+ $self->loop_once;
+ }
+}
+
1;
has router => (is => 'ro', required => 1, weak_ref => 1);
+sub handle_pid {
+ return $$;
+}
+
sub handle_register {
my ($self, $name, $class, @args) = @_;
(my $file = $class) =~ s/::/\//g;
$self->on_result->(Tak::Result->new(type => $type, data => \@data));
}
+sub flatten {
+ my ($self) = @_;
+ return ($self->type, @{$self->data});
+}
+
sub success { shift->result(success => @_) }
sub mistake { shift->result(mistake => @_) }
sub failure { shift->result(failure => @_) }
has type => (is => 'ro', required => 1);
has data => (is => 'ro', required => 1);
+sub flatten { $_[0]->type, @{$_[0]->data} }
+
sub get {
my ($self) = @_;
$self->throw unless $self->type eq 'success';
use Moo::Role;
sub start_request {
- my ($self, $req, $type, @args) = @_;
+ my ($self, $req, $type, @payload) = @_;
unless ($type) {
$req->mistake(request_type => "No request type given");
return;
}
if (my $meth = $self->can("handle_${type}")) {
my @result;
- if (eval { @result = $self->$meth(@args); 1 }) {
+ if (eval { @result = $self->$meth(@payload); 1 }) {
$req->success(@result);
} else {
if (ref($@) eq 'ARRAY') {
$req->failure(exception => $@);
}
}
+ } elsif ($meth = $self->can("start_${type}_request")) {
+ $self->$meth($req => @payload);
} else {
$req->mistake(request_type => "Unknown request type ${type}");
}
}
-sub receive { }
+sub receive {
+ my ($self, $type, @payload) = @_;
+ if (my $meth = $self->can("receive_${type}")) {
+ $self->$meth(@payload);
+ }
+}
1;
package Tak::Router;
+use Tak::MetaService;
use Moo;
has services => (is => 'ro', default => sub { {} });
+sub BUILD {
+ my ($self) = @_;
+ $self->register(meta => Tak::MetaService->new(router => $self));
+}
+
sub start_request {
my ($self, $req, $target, @payload) = @_;
$req->mistake("Reached router with no target")
--- /dev/null
+package Tak::STDIOSetup;
+
+use Tak::ConnectionService;
+use Tak::Router;
+use Tak;
+use strictures 1;
+
+sub run {
+ open my $stdin, '<&', \*STDIN;
+ open my $stdout, '>&', \*STDOUT;
+ close STDIN; close STDOUT;
+ my $connection = Tak::ConnectionService->new(
+ read_fh => $stdin, write_fh => $stdout,
+ listening_service => Tak::Router->new
+ );
+ Tak->loop->loop_forever;
+}
+
+1;
#fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm)
fatpack file
#rm -r fatlib
-echo "use lib 'lib'; use Tak::World; Tak::World->new_from_stdio->run;"
+echo "use lib 'lib'; use Tak::STDIOSetup; Tak::STDIOSetup->run;"