From: Matt S Trout Date: Thu, 19 Jul 2012 20:21:54 +0000 (+0000) Subject: parallelise connection setup X-Git-Tag: v0.002002~5 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=fbd3b8ecbd2c9004f0e56ff1c0bc30f677a19c62;p=scpubgit%2FObject-Remote.git parallelise connection setup --- diff --git a/Changes b/Changes index e24ef1b..be4beb9 100644 --- a/Changes +++ b/Changes @@ -1,3 +1,4 @@ + - support Object::Remote->start::connect - timer support in MiniLoop 0.002001 - 2012-07-18 diff --git a/lib/Object/Remote.pm b/lib/Object/Remote.pm index 0e9a559..1d3edf9 100644 --- a/lib/Object/Remote.pm +++ b/lib/Object/Remote.pm @@ -25,7 +25,7 @@ sub new { sub connect { my ($class, $to) = @_; - use_module('Object::Remote::Connection')->new_from_spec($to); + use_module('Object::Remote::Connection')->maybe::start::new_from_spec($to); } sub current_loop { diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index 21bcec4..ed78e0e 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -119,7 +119,9 @@ sub new_from_spec { my ($class, $spec) = @_; return $spec if blessed $spec; foreach my $poss (do { our @Guess }) { - if (my $obj = $poss->($spec)) { return $obj } + if (my $conn = $poss->($spec)) { + return $conn->maybe::start::connect; + } } die "Couldn't figure out what to do with ${spec}"; } diff --git a/lib/Object/Remote/Connector/Local.pm b/lib/Object/Remote/Connector/Local.pm index bfe6639..729bcd7 100644 --- a/lib/Object/Remote/Connector/Local.pm +++ b/lib/Object/Remote/Connector/Local.pm @@ -7,7 +7,7 @@ with 'Object::Remote::Role::Connector::PerlInterpreter'; no warnings 'once'; push @Object::Remote::Connection::Guess, sub { - if (($_[0]||'') eq '-') { __PACKAGE__->new->connect } + if (($_[0]||'') eq '-') { __PACKAGE__->new } }; 1; diff --git a/lib/Object/Remote/Connector/LocalSudo.pm b/lib/Object/Remote/Connector/LocalSudo.pm index 0ff34b3..304b3e9 100644 --- a/lib/Object/Remote/Connector/LocalSudo.pm +++ b/lib/Object/Remote/Connector/LocalSudo.pm @@ -82,7 +82,7 @@ push @Object::Remote::Connection::Guess, sub { for ($_[0]) { # username followed by @ if (defined and !ref and /^ ([^\@]*?) \@ $/x) { - return __PACKAGE__->new(target_user => $1)->connect; + return __PACKAGE__->new(target_user => $1); } } return; diff --git a/lib/Object/Remote/Connector/SSH.pm b/lib/Object/Remote/Connector/SSH.pm index bf24a0a..fb6ed4b 100644 --- a/lib/Object/Remote/Connector/SSH.pm +++ b/lib/Object/Remote/Connector/SSH.pm @@ -6,9 +6,11 @@ use Moo; with 'Object::Remote::Role::Connector::PerlInterpreter'; +has ssh_to => (is => 'ro', required => 1); + around _perl_command => sub { - my ($orig, $self, $target) = @_; - return 'ssh', '-A', $target, $self->$orig($target); + my ($orig, $self) = @_; + return 'ssh', '-A', $self->ssh_to, $self->$orig; }; no warnings 'once'; @@ -17,7 +19,7 @@ push @Object::Remote::Connection::Guess, sub { for ($_[0]) { # 0-9 a-z _ - first char, those or . subsequent - hostnamish if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) { - return __PACKAGE__->new->connect($_[0]); + return __PACKAGE__->new(ssh_to => $_[0]); } } return; diff --git a/lib/Object/Remote/Connector/UNIX.pm b/lib/Object/Remote/Connector/UNIX.pm index 614609a..926f060 100644 --- a/lib/Object/Remote/Connector/UNIX.pm +++ b/lib/Object/Remote/Connector/UNIX.pm @@ -5,8 +5,11 @@ use Moo; with 'Object::Remote::Role::Connector'; +has socket_path => (is => 'ro', required => 1); + sub _open2_for { - my ($self,$path) = @_; + my ($self) = @_; + my $path = $self->socket_path; my $sock = IO::Socket::UNIX->new($path) or die "Couldn't open socket ${path}: $!"; ($sock, $sock, undef); @@ -17,7 +20,7 @@ no warnings 'once'; push @Object::Remote::Connection::Guess, sub { for ($_[0]) { if (defined and !ref and /^(?:\.\/|\/)/) { - return __PACKAGE__->new->connect($_[0]); + return __PACKAGE__->new(socket_path => $_[0]); } } return; diff --git a/lib/Object/Remote/Future.pm b/lib/Object/Remote/Future.pm index 609799a..893fb6d 100644 --- a/lib/Object/Remote/Future.pm +++ b/lib/Object/Remote/Future.pm @@ -8,19 +8,29 @@ use CPS::Future; our @EXPORT = qw(future await_future await_all); -sub future (&) { +sub future (&;$) { my $f = $_[0]->(CPS::Future->new); - return $f if ((caller(1)||'') eq 'start'); + return $f if ((caller(1+($_[1]||0))||'') eq 'start'); await_future($f); } +our @await; + sub await_future { my $f = shift; return $f if $f->is_ready; require Object::Remote; my $loop = Object::Remote->current_loop; - $f->on_ready(sub { $loop->stop }); - $loop->run; + { + local @await = (@await, $f); + $f->on_ready(sub { + $loop->stop if $f == $await[-1] + }); + $loop->run; + } + if (@await and $await[-1]->is_ready) { + $loop->stop; + } return wantarray ? $f->get : ($f->get)[0]; } @@ -31,6 +41,8 @@ sub await_all { package start; +our $start = sub { my ($obj, $call) = (shift, shift); $obj->$call(@_); }; + sub AUTOLOAD { my $invocant = shift; my ($method) = our $AUTOLOAD =~ /^start::(.+)$/; @@ -48,6 +60,17 @@ sub AUTOLOAD { return $res; } +package maybe; + +sub start { + my ($obj, $call) = (shift, shift); + if ((caller(1)||'') eq 'start') { + $obj->$start::start($call => @_); + } else { + $obj->$call(@_); + } +} + package maybe::start; sub AUTOLOAD { diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index de2515f..e7be944 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -11,6 +11,9 @@ has is_running => (is => 'ro', clearer => 'stop'); has _read_watches => (is => 'ro', default => sub { {} }); has _read_select => (is => 'ro', default => sub { IO::Select->new }); +has _write_watches => (is => 'ro', default => sub { {} }); +has _write_select => (is => 'ro', default => sub { IO::Select->new }); + has _timers => (is => 'ro', default => sub { [] }); sub pass_watches_to { @@ -21,6 +24,12 @@ sub pass_watches_to { on_read_ready => $self->_read_watches->{$fh} ); } + foreach my $fh ($self->_write_select->handles) { + $new_loop->watch_io( + handle => $fh, + on_write_ready => $self->_write_watches->{$fh} + ); + } } sub watch_io { @@ -30,6 +39,10 @@ sub watch_io { $self->_read_select->add($fh); $self->_read_watches->{$fh} = $cb; } + if (my $cb = $watch{on_write_ready}) { + $self->_write_select->add($fh); + $self->_write_watches->{$fh} = $cb; + } } sub unwatch_io { @@ -39,6 +52,10 @@ sub unwatch_io { $self->_read_select->remove($fh); delete $self->_read_watches->{$fh}; } + if ($watch{on_write_ready}) { + $self->_write_select->remove($fh); + delete $self->_write_watches->{$fh}; + } return; } @@ -64,12 +81,18 @@ sub unwatch_time { sub loop_once { my ($self) = @_; my $read = $self->_read_watches; - my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5); + my $write = $self->_write_watches; + my ($readable, $writeable) = IO::Select->select( + $self->_read_select, $self->_write_select, undef, 0.5 + ); # I would love to trap errors in the select call but IO::Select doesn't # differentiate between an error and a timeout. # -- no, love, mst. foreach my $fh (@$readable) { - $read->{$fh}(); + $read->{$fh}() if $read->{$fh}; + } + foreach my $fh (@$writeable) { + $write->{$fh}() if $write->{$fh}; } my $timers = $self->_timers; my $now = time(); diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index f304a8d..23ff0ea 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -4,6 +4,7 @@ use IPC::Open2; use IO::Handle; use Object::Remote::ModuleSender; use Object::Remote::Handle; +use Object::Remote::Future; use Scalar::Util qw(blessed); use Moo::Role; @@ -20,15 +21,20 @@ sub _build_module_sender { around connect => sub { my ($orig, $self) = (shift, shift); - my $conn = $self->$orig(@_); - Object::Remote::Handle->new( - connection => $conn, - class => 'Object::Remote::ModuleLoader', - args => { module_sender => $self->module_sender } - )->disarm_free; - require Object::Remote::Prompt; - Object::Remote::Prompt::maybe_set_prompt_command_on($conn); - return $conn; + my $f = $self->$start::start($orig => @_); + return future { + $f->on_done(sub { + my ($conn) = $f->get; + Object::Remote::Handle->new( + connection => $conn, + class => 'Object::Remote::ModuleLoader', + args => { module_sender => $self->module_sender } + )->disarm_free; + require Object::Remote::Prompt; + Object::Remote::Prompt::maybe_set_prompt_command_on($conn); + }); + $f; + } 2; }; sub _perl_command { 'perl', '-' } @@ -46,11 +52,26 @@ sub _start_perl { sub _open2_for { my $self = shift; my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_); - $foreign_stdin->autoflush(1); - print $foreign_stdin 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n" - if $ENV{OBJECT_REMOTE_DEBUG}; - print $foreign_stdin $self->fatnode_text - or die "Failed to send fatpacked data to new node on '$_[0]': $!"; + my $to_send = $self->fatnode_text; + Object::Remote->current_loop + ->watch_io( + handle => $foreign_stdin, + on_write_ready => sub { + my $len = syswrite($foreign_stdin, $to_send, 4096); + if (defined $len) { + substr($to_send, 0, $len) = ''; + } + # if the stdin went away, we'll never get Shere + # so it's not a big deal to simply give up on !defined + if (!defined($len) or 0 == length($to_send)) { + Object::Remote->current_loop + ->unwatch_io( + handle => $foreign_stdin, + on_write_ready => 1 + ); + } + } + ); return ($foreign_stdin, $foreign_stdout, $pid); } @@ -64,7 +85,7 @@ sub fatnode_text { $INC{'Object/Remote/FatNode.pm'} = __FILE__; $Object::Remote::FatNode::DATA = <<'ENDFAT'; END - $text .= $Object::Remote::FatNode::DATA; + $text .= do { no warnings 'once'; $Object::Remote::FatNode::DATA }; $text .= "ENDFAT\n"; $text .= <<'END'; eval $Object::Remote::FatNode::DATA; diff --git a/xt/local-sudo.t b/xt/local-sudo.t index ef2f1fd..c109463 100644 --- a/xt/local-sudo.t +++ b/xt/local-sudo.t @@ -9,7 +9,9 @@ use Object::Remote; my $user = $ENV{TEST_SUDOUSER} or plan skip_all => q{Requires TEST_SUDOUSER to be set}; -my $remote = TestFindUser->new::on("${user}\@"); +my $conn = Object::Remote->connect('-')->connect("${user}\@"); + +my $remote = TestFindUser->new::on($conn); my $remote_user = $remote->user; like $remote_user, qr/^\d+$/, 'returned an int'; isnt $remote_user, $<, 'ran as different user';