From: Matt S Trout Date: Sun, 6 Nov 2011 10:15:13 +0000 (+0000) Subject: repl works again X-Git-Tag: v0.001001~26 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=2791fd73b196072f047b2b3d746b64be53312ca3;p=scpubgit%2FTak.git repl works again --- diff --git a/bin/tak-repl b/bin/tak-repl index 2383219..d6abb6b 100644 --- a/bin/tak-repl +++ b/bin/tak-repl @@ -5,10 +5,27 @@ use Tak::Router; use Tak::REPL; use strictures 1; -my $client = Tak::Client->new(service => Tak::Router->new); +my $iclient = Tak::Client->new(service => Tak::Router->new); -$client->curry('meta')->do(register => eval => 'Tak::EvalService'); +my $rclient; -my $repl = Tak::REPL->new(client => $client->curry('eval')); +if (my $on = $ARGV[0]) { + $iclient->curry('meta')->do(register => connector => 'Tak::ConnectorService'); + my @path = $iclient->curry('connector')->do(create => $ARGV[0]); + $rclient = $iclient->curry('connector', 'connection', @path, 'remote'); + my $lclient = $iclient->curry('connector', 'connection', @path, 'local'); + $lclient->curry('meta')->do(register => module_sender => 'Tak::ModuleSender'); + $rclient->curry('meta')->do( + register => module_loader => 'Tak::ModuleLoader', + expose => { module_sender => [ 'remote', 'module_sender' ] } + ); + $rclient->curry('module_loader')->do('enable'); +} else { + $rclient = $iclient; +} + +$rclient->curry('meta')->do(register => eval => 'Tak::EvalService'); + +my $repl = Tak::REPL->new(client => $rclient->curry('eval')); $repl->run; diff --git a/lib/Tak/ConnectorService.pm b/lib/Tak/ConnectorService.pm index fa39815..2fb1eaf 100644 --- a/lib/Tak/ConnectorService.pm +++ b/lib/Tak/ConnectorService.pm @@ -5,16 +5,18 @@ use IO::All; use Tak::Router; use Tak::Client; use Tak::ConnectionService; +use Net::OpenSSH; use Moo; with 'Tak::Role::Service'; has connections => (is => 'ro', default => sub { Tak::Router->new }); +has ssh => (is => 'ro', default => sub { {} }); + sub handle_create { - my ($self) = @_; - my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-') - or die "Couldn't open2 child: $!"; + my ($self, $on) = @_; + my ($kid_in, $kid_out, $kid_pid) = $self->_open($on); $kid_in->print(io('maint/mk-fat |')->all, "__END__\n"); my $connection = Tak::ConnectionService->new( read_fh => $kid_out, write_fh => $kid_in, @@ -24,16 +26,33 @@ sub handle_create { # actually, we should register with a monotonic id and # stash the pid elsewhere. but meh for now. my $pid = $client->do(meta => 'pid'); - $self->connections->register('|'.$pid, $connection); - return ('proxy', '|'.$pid); + my $name = ($on||'|').':'.$pid; + my $conn_router = Tak::Router->new; + $conn_router->register(local => $connection->receiver->service); + $conn_router->register(remote => $connection); + $self->connections->register($name, $conn_router); + return ($name); +} + +sub _open { + my ($self, $on) = @_; + unless ($on) { + my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-') + or die "Couldn't open2 child: $!"; + return ($kid_in, $kid_out, $kid_pid); + } + my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on); + $ssh->error and + die "Couldn't establish ssh connection: ".$ssh->error; + return $ssh->open2('perl','-'); } -sub start_proxy_request { +sub start_connection_request { my ($self, $req, @payload) = @_;; $self->connections->start_request($req, @payload); } -sub receive_proxy { +sub receive_connection { my ($self, @payload) = @_; $self->connections->receive(@payload); } diff --git a/lib/Tak/Loop.pm b/lib/Tak/Loop.pm index 464eb69..a1a4db3 100644 --- a/lib/Tak/Loop.pm +++ b/lib/Tak/Loop.pm @@ -30,7 +30,9 @@ sub loop_once { my ($self) = @_; my $read = $self->_read_watches; my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5); - die "FFFFFUUUUU: $!" unless $readable; + # I would love to trap errors in the select call but IO::Select doesn't + # differentiate between an error and a timeout. + # -- no, love, mst. foreach my $fh (@$readable) { $read->{$fh}(); } diff --git a/lib/Tak/MetaService.pm b/lib/Tak/MetaService.pm index 4dfe9d4..62bafa8 100644 --- a/lib/Tak/MetaService.pm +++ b/lib/Tak/MetaService.pm @@ -1,5 +1,6 @@ package Tak::MetaService; +use Tak::WeakClient; use Moo; with 'Tak::Role::Service'; @@ -11,10 +12,16 @@ sub handle_pid { } sub handle_register { - my ($self, $name, $class, @args) = @_; + my ($self, $name, $class, %args) = @_; (my $file = $class) =~ s/::/\//g; require "${file}.pm"; - my $new = $class->new(@args); + if (my $expose = delete $args{expose}) { + my $client = Tak::WeakClient->new(service => $self->router); + foreach my $name (%$expose) { + $args{$name} = $client->curry(@{$expose->{$name}}); + } + } + my $new = $class->new(\%args); $self->router->register($name => $new); return "Registered ${name}"; } diff --git a/lib/Tak/ModuleLoader.pm b/lib/Tak/ModuleLoader.pm index 924230b..bea9c49 100644 --- a/lib/Tak/ModuleLoader.pm +++ b/lib/Tak/ModuleLoader.pm @@ -1,23 +1,35 @@ package Tak::ModuleLoader; +use Tak::ModuleLoader::Hook; use Moo; -has remote => (is => 'ro', required => 1); +with 'Tak::Role::Service'; -sub inc_callback { +has module_sender => (is => 'ro', required => 1); + +has inc_hook => (is => 'lazy'); + +sub _build_inc_hook { + my ($self) = @_; + Tak::ModuleLoader::Hook->new(sender => $self->module_sender); +} + +sub handle_enable { my ($self) = @_; - sub { return if our $In_Reentry; $self->maybe_load_module($_[1]) } + push @INC, $self->inc_hook; + return 'enabled'; } -sub maybe_load_module { - my ($self, $module) = @_; - local our $In_Reentry = 1; - my ($status, $code) = $self->remote->blocking_request(source_for => $module); - if ($status eq 'RESULT') { - open my $fh, '<', \$code; - return $fh; - } - return; +sub handle_disable { + my ($self) = @_; + my $hook = $self->inc_hook; + @INC = grep $_ ne $hook, @INC; + return 'disabled'; +} + +sub DEMOLISH { + my ($self) = @_; + $self->handle_disable; } 1; diff --git a/lib/Tak/ModuleLoader/Hook.pm b/lib/Tak/ModuleLoader/Hook.pm new file mode 100644 index 0000000..c1a1627 --- /dev/null +++ b/lib/Tak/ModuleLoader/Hook.pm @@ -0,0 +1,18 @@ +package Tak::ModuleLoader::Hook; + +use Moo; + +has sender => (is => 'ro', required => 1); + +sub Tak::ModuleLoader::Hook::INC { # unqualified INC forced into package main + my ($self, $module) = @_; + my $result = $self->sender->result_of(source_for => $module); + if ($result->is_success) { + my $code = $result->get; + open my $fh, '<', \$code; + return $fh; + } + return; +} + +1; diff --git a/lib/Tak/ModuleSender.pm b/lib/Tak/ModuleSender.pm index e565a26..9ecbe68 100644 --- a/lib/Tak/ModuleSender.pm +++ b/lib/Tak/ModuleSender.pm @@ -3,14 +3,16 @@ package Tak::ModuleSender; use IO::All; use Moo; +with 'Tak::Role::Service'; + sub handle_source_for { my ($self, $module) = @_; my $io = io->dir("$ENV{HOME}/perl5/lib/perl5")->catfile($module); unless ($io->exists) { - return FAILURE => undef; + die [ 'failure' ]; } my $code = $io->all; - return RESULT => $code; + return $code; } 1; diff --git a/lib/Tak/Result.pm b/lib/Tak/Result.pm index 2c06242..38d5e59 100644 --- a/lib/Tak/Result.pm +++ b/lib/Tak/Result.pm @@ -7,9 +7,11 @@ has data => (is => 'ro', required => 1); sub flatten { $_[0]->type, @{$_[0]->data} } +sub is_success { $_[0]->type eq 'success' } + sub get { my ($self) = @_; - $self->throw unless $self->type eq 'success'; + $self->throw unless $self->is_success; return wantarray ? @{$self->data} : $self->data->[0]; } diff --git a/lib/Tak/Router.pm b/lib/Tak/Router.pm index f7e56dd..5c2502b 100644 --- a/lib/Tak/Router.pm +++ b/lib/Tak/Router.pm @@ -1,6 +1,7 @@ package Tak::Router; use Tak::MetaService; +use Scalar::Util qw(weaken); use Moo; has services => (is => 'ro', default => sub { {} }); @@ -12,9 +13,9 @@ sub BUILD { sub start_request { my ($self, $req, $target, @payload) = @_; - $req->mistake("Reached router with no target") + return $req->mistake("Reached router with no target") unless $target; - $req->failure("Reached router with invalid target ${target}") + return $req->failure("Reached router with invalid target ${target}") unless my $next = $self->services->{$target}; $next->start_request($req, @payload); } @@ -31,6 +32,11 @@ sub register { $self->services->{$name} = $service; } +sub register_weak { + my ($self, $name, $service) = @_; + weaken($self->services->{$name} = $service); +} + sub deregister { my ($self, $name) = @_; delete $self->services->{$name} diff --git a/lib/Tak/STDIOSetup.pm b/lib/Tak/STDIOSetup.pm index cb35399..dcf0dc4 100644 --- a/lib/Tak/STDIOSetup.pm +++ b/lib/Tak/STDIOSetup.pm @@ -8,13 +8,21 @@ use strictures 1; sub run { open my $stdin, '<&', \*STDIN; open my $stdout, '>&', \*STDOUT; - close STDIN; close STDOUT; + # if we don't re-open them then 0 and 1 get re-used - which is not + # only potentially bloody confusing but results in warnings like: + # "Filehandle STDOUT reopened as STDIN only for input" + close STDIN; open STDIN, '<', '/dev/null'; + close STDOUT; open STDOUT, '>', '/dev/null'; my $done; my $connection = Tak::ConnectionService->new( read_fh => $stdin, write_fh => $stdout, listening_service => Tak::Router->new, on_close => sub { $done = 1 } ); + $connection->receiver->service->register_weak(remote => $connection); + if ($0 eq '-') { + $0 = 'tak-stdio-node'; + } Tak->loop_until($done); } diff --git a/lib/Tak/WeakClient.pm b/lib/Tak/WeakClient.pm new file mode 100644 index 0000000..dd4ae0e --- /dev/null +++ b/lib/Tak/WeakClient.pm @@ -0,0 +1,9 @@ +package Tak::WeakClient; + +use Moo; + +extends 'Tak::Client'; + +has service => (is => 'ro', required => 1, weak_ref => 1); + +1;