use Tak::REPL;
use strictures 1;
-my $client = Tak::Client->new(service => Tak::Router->new);
+my $iclient = Tak::Client->new(service => Tak::Router->new);
-$client->curry('meta')->do(register => eval => 'Tak::EvalService');
+my $rclient;
-my $repl = Tak::REPL->new(client => $client->curry('eval'));
+if (my $on = $ARGV[0]) {
+ $iclient->curry('meta')->do(register => connector => 'Tak::ConnectorService');
+ my @path = $iclient->curry('connector')->do(create => $ARGV[0]);
+ $rclient = $iclient->curry('connector', 'connection', @path, 'remote');
+ my $lclient = $iclient->curry('connector', 'connection', @path, 'local');
+ $lclient->curry('meta')->do(register => module_sender => 'Tak::ModuleSender');
+ $rclient->curry('meta')->do(
+ register => module_loader => 'Tak::ModuleLoader',
+ expose => { module_sender => [ 'remote', 'module_sender' ] }
+ );
+ $rclient->curry('module_loader')->do('enable');
+} else {
+ $rclient = $iclient;
+}
+
+$rclient->curry('meta')->do(register => eval => 'Tak::EvalService');
+
+my $repl = Tak::REPL->new(client => $rclient->curry('eval'));
$repl->run;
use Tak::Router;
use Tak::Client;
use Tak::ConnectionService;
+use Net::OpenSSH;
use Moo;
with 'Tak::Role::Service';
has connections => (is => 'ro', default => sub { Tak::Router->new });
+has ssh => (is => 'ro', default => sub { {} });
+
sub handle_create {
- my ($self) = @_;
- my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-')
- or die "Couldn't open2 child: $!";
+ my ($self, $on) = @_;
+ my ($kid_in, $kid_out, $kid_pid) = $self->_open($on);
$kid_in->print(io('maint/mk-fat |')->all, "__END__\n");
my $connection = Tak::ConnectionService->new(
read_fh => $kid_out, write_fh => $kid_in,
# actually, we should register with a monotonic id and
# stash the pid elsewhere. but meh for now.
my $pid = $client->do(meta => 'pid');
- $self->connections->register('|'.$pid, $connection);
- return ('proxy', '|'.$pid);
+ my $name = ($on||'|').':'.$pid;
+ my $conn_router = Tak::Router->new;
+ $conn_router->register(local => $connection->receiver->service);
+ $conn_router->register(remote => $connection);
+ $self->connections->register($name, $conn_router);
+ return ($name);
+}
+
+sub _open {
+ my ($self, $on) = @_;
+ unless ($on) {
+ my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-')
+ or die "Couldn't open2 child: $!";
+ return ($kid_in, $kid_out, $kid_pid);
+ }
+ my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on);
+ $ssh->error and
+ die "Couldn't establish ssh connection: ".$ssh->error;
+ return $ssh->open2('perl','-');
}
-sub start_proxy_request {
+sub start_connection_request {
my ($self, $req, @payload) = @_;;
$self->connections->start_request($req, @payload);
}
-sub receive_proxy {
+sub receive_connection {
my ($self, @payload) = @_;
$self->connections->receive(@payload);
}
my ($self) = @_;
my $read = $self->_read_watches;
my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
- die "FFFFFUUUUU: $!" unless $readable;
+ # I would love to trap errors in the select call but IO::Select doesn't
+ # differentiate between an error and a timeout.
+ # -- no, love, mst.
foreach my $fh (@$readable) {
$read->{$fh}();
}
package Tak::MetaService;
+use Tak::WeakClient;
use Moo;
with 'Tak::Role::Service';
}
sub handle_register {
- my ($self, $name, $class, @args) = @_;
+ my ($self, $name, $class, %args) = @_;
(my $file = $class) =~ s/::/\//g;
require "${file}.pm";
- my $new = $class->new(@args);
+ if (my $expose = delete $args{expose}) {
+ my $client = Tak::WeakClient->new(service => $self->router);
+ foreach my $name (%$expose) {
+ $args{$name} = $client->curry(@{$expose->{$name}});
+ }
+ }
+ my $new = $class->new(\%args);
$self->router->register($name => $new);
return "Registered ${name}";
}
package Tak::ModuleLoader;
+use Tak::ModuleLoader::Hook;
use Moo;
-has remote => (is => 'ro', required => 1);
+with 'Tak::Role::Service';
-sub inc_callback {
+has module_sender => (is => 'ro', required => 1);
+
+has inc_hook => (is => 'lazy');
+
+sub _build_inc_hook {
+ my ($self) = @_;
+ Tak::ModuleLoader::Hook->new(sender => $self->module_sender);
+}
+
+sub handle_enable {
my ($self) = @_;
- sub { return if our $In_Reentry; $self->maybe_load_module($_[1]) }
+ push @INC, $self->inc_hook;
+ return 'enabled';
}
-sub maybe_load_module {
- my ($self, $module) = @_;
- local our $In_Reentry = 1;
- my ($status, $code) = $self->remote->blocking_request(source_for => $module);
- if ($status eq 'RESULT') {
- open my $fh, '<', \$code;
- return $fh;
- }
- return;
+sub handle_disable {
+ my ($self) = @_;
+ my $hook = $self->inc_hook;
+ @INC = grep $_ ne $hook, @INC;
+ return 'disabled';
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+ $self->handle_disable;
}
1;
--- /dev/null
+package Tak::ModuleLoader::Hook;
+
+use Moo;
+
+has sender => (is => 'ro', required => 1);
+
+sub Tak::ModuleLoader::Hook::INC { # unqualified INC forced into package main
+ my ($self, $module) = @_;
+ my $result = $self->sender->result_of(source_for => $module);
+ if ($result->is_success) {
+ my $code = $result->get;
+ open my $fh, '<', \$code;
+ return $fh;
+ }
+ return;
+}
+
+1;
use IO::All;
use Moo;
+with 'Tak::Role::Service';
+
sub handle_source_for {
my ($self, $module) = @_;
my $io = io->dir("$ENV{HOME}/perl5/lib/perl5")->catfile($module);
unless ($io->exists) {
- return FAILURE => undef;
+ die [ 'failure' ];
}
my $code = $io->all;
- return RESULT => $code;
+ return $code;
}
1;
sub flatten { $_[0]->type, @{$_[0]->data} }
+sub is_success { $_[0]->type eq 'success' }
+
sub get {
my ($self) = @_;
- $self->throw unless $self->type eq 'success';
+ $self->throw unless $self->is_success;
return wantarray ? @{$self->data} : $self->data->[0];
}
package Tak::Router;
use Tak::MetaService;
+use Scalar::Util qw(weaken);
use Moo;
has services => (is => 'ro', default => sub { {} });
sub start_request {
my ($self, $req, $target, @payload) = @_;
- $req->mistake("Reached router with no target")
+ return $req->mistake("Reached router with no target")
unless $target;
- $req->failure("Reached router with invalid target ${target}")
+ return $req->failure("Reached router with invalid target ${target}")
unless my $next = $self->services->{$target};
$next->start_request($req, @payload);
}
$self->services->{$name} = $service;
}
+sub register_weak {
+ my ($self, $name, $service) = @_;
+ weaken($self->services->{$name} = $service);
+}
+
sub deregister {
my ($self, $name) = @_;
delete $self->services->{$name}
sub run {
open my $stdin, '<&', \*STDIN;
open my $stdout, '>&', \*STDOUT;
- close STDIN; close STDOUT;
+ # if we don't re-open them then 0 and 1 get re-used - which is not
+ # only potentially bloody confusing but results in warnings like:
+ # "Filehandle STDOUT reopened as STDIN only for input"
+ close STDIN; open STDIN, '<', '/dev/null';
+ close STDOUT; open STDOUT, '>', '/dev/null';
my $done;
my $connection = Tak::ConnectionService->new(
read_fh => $stdin, write_fh => $stdout,
listening_service => Tak::Router->new,
on_close => sub { $done = 1 }
);
+ $connection->receiver->service->register_weak(remote => $connection);
+ if ($0 eq '-') {
+ $0 = 'tak-stdio-node';
+ }
Tak->loop_until($done);
}
--- /dev/null
+package Tak::WeakClient;
+
+use Moo;
+
+extends 'Tak::Client';
+
+has service => (is => 'ro', required => 1, weak_ref => 1);
+
+1;