parallelise connection setup
Matt S Trout [Thu, 19 Jul 2012 20:21:54 +0000 (20:21 +0000)]
Changes
lib/Object/Remote.pm
lib/Object/Remote/Connection.pm
lib/Object/Remote/Connector/Local.pm
lib/Object/Remote/Connector/LocalSudo.pm
lib/Object/Remote/Connector/SSH.pm
lib/Object/Remote/Connector/UNIX.pm
lib/Object/Remote/Future.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm
xt/local-sudo.t

diff --git a/Changes b/Changes
index e24ef1b..be4beb9 100644 (file)
--- a/Changes
+++ b/Changes
@@ -1,3 +1,4 @@
+  - support Object::Remote->start::connect
   - timer support in MiniLoop
 
 0.002001 - 2012-07-18
index 0e9a559..1d3edf9 100644 (file)
@@ -25,7 +25,7 @@ sub new {
 
 sub connect {
   my ($class, $to) = @_;
-  use_module('Object::Remote::Connection')->new_from_spec($to);
+  use_module('Object::Remote::Connection')->maybe::start::new_from_spec($to);
 }
 
 sub current_loop {
index 21bcec4..ed78e0e 100644 (file)
@@ -119,7 +119,9 @@ sub new_from_spec {
   my ($class, $spec) = @_;
   return $spec if blessed $spec;
   foreach my $poss (do { our @Guess }) {
-    if (my $obj = $poss->($spec)) { return $obj }
+    if (my $conn = $poss->($spec)) {
+      return $conn->maybe::start::connect;
+    }
   }
   die "Couldn't figure out what to do with ${spec}";
 }
index bfe6639..729bcd7 100644 (file)
@@ -7,7 +7,7 @@ with 'Object::Remote::Role::Connector::PerlInterpreter';
 no warnings 'once';
 
 push @Object::Remote::Connection::Guess, sub {
-  if (($_[0]||'') eq '-') { __PACKAGE__->new->connect }
+  if (($_[0]||'') eq '-') { __PACKAGE__->new }
 };
 
 1;
index 0ff34b3..304b3e9 100644 (file)
@@ -82,7 +82,7 @@ push @Object::Remote::Connection::Guess, sub {
   for ($_[0]) {
     # username followed by @
     if (defined and !ref and /^ ([^\@]*?) \@ $/x) {
-      return __PACKAGE__->new(target_user => $1)->connect;
+      return __PACKAGE__->new(target_user => $1);
     }
   }
   return;
index bf24a0a..fb6ed4b 100644 (file)
@@ -6,9 +6,11 @@ use Moo;
 
 with 'Object::Remote::Role::Connector::PerlInterpreter';
 
+has ssh_to => (is => 'ro', required => 1);
+
 around _perl_command => sub {
-  my ($orig, $self, $target) = @_;
-  return 'ssh', '-A', $target, $self->$orig($target);
+  my ($orig, $self) = @_;
+  return 'ssh', '-A', $self->ssh_to, $self->$orig;
 };
 
 no warnings 'once';
@@ -17,7 +19,7 @@ push @Object::Remote::Connection::Guess, sub {
   for ($_[0]) {
     # 0-9 a-z _ - first char, those or . subsequent - hostnamish
     if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) {
-      return __PACKAGE__->new->connect($_[0]);
+      return __PACKAGE__->new(ssh_to => $_[0]);
     }
   }
   return;
index 614609a..926f060 100644 (file)
@@ -5,8 +5,11 @@ use Moo;
 
 with 'Object::Remote::Role::Connector';
 
+has socket_path => (is => 'ro', required => 1);
+
 sub _open2_for {
-  my ($self,$path) = @_;
+  my ($self) = @_;
+  my $path = $self->socket_path;
   my $sock = IO::Socket::UNIX->new($path)
     or die "Couldn't open socket ${path}: $!";
   ($sock, $sock, undef);
@@ -17,7 +20,7 @@ no warnings 'once';
 push @Object::Remote::Connection::Guess, sub { 
   for ($_[0]) {
     if (defined and !ref and /^(?:\.\/|\/)/) {
-      return __PACKAGE__->new->connect($_[0]);
+      return __PACKAGE__->new(socket_path => $_[0]);
     }
   }
   return;
index 609799a..893fb6d 100644 (file)
@@ -8,19 +8,29 @@ use CPS::Future;
 
 our @EXPORT = qw(future await_future await_all);
 
-sub future (&) {
+sub future (&;$) {
   my $f = $_[0]->(CPS::Future->new);
-  return $f if ((caller(1)||'') eq 'start');
+  return $f if ((caller(1+($_[1]||0))||'') eq 'start');
   await_future($f);
 }
 
+our @await;
+
 sub await_future {
   my $f = shift;
   return $f if $f->is_ready;
   require Object::Remote;
   my $loop = Object::Remote->current_loop;
-  $f->on_ready(sub { $loop->stop });
-  $loop->run;
+  {
+    local @await = (@await, $f);
+    $f->on_ready(sub {
+      $loop->stop if $f == $await[-1]
+    });
+    $loop->run;
+  }
+  if (@await and $await[-1]->is_ready) {
+    $loop->stop;
+  }
   return wantarray ? $f->get : ($f->get)[0];
 }
 
@@ -31,6 +41,8 @@ sub await_all {
 
 package start;
 
+our $start = sub { my ($obj, $call) = (shift, shift); $obj->$call(@_); };
+
 sub AUTOLOAD {
   my $invocant = shift;
   my ($method) = our $AUTOLOAD =~ /^start::(.+)$/;
@@ -48,6 +60,17 @@ sub AUTOLOAD {
   return $res;
 }
 
+package maybe;
+
+sub start {
+  my ($obj, $call) = (shift, shift);
+  if ((caller(1)||'') eq 'start') {
+    $obj->$start::start($call => @_);
+  } else {
+    $obj->$call(@_);
+  }
+}
+
 package maybe::start;
 
 sub AUTOLOAD {
index de2515f..e7be944 100644 (file)
@@ -11,6 +11,9 @@ has is_running => (is => 'ro', clearer => 'stop');
 has _read_watches => (is => 'ro', default => sub { {} });
 has _read_select => (is => 'ro', default => sub { IO::Select->new });
 
+has _write_watches => (is => 'ro', default => sub { {} });
+has _write_select => (is => 'ro', default => sub { IO::Select->new });
+
 has _timers => (is => 'ro', default => sub { [] });
 
 sub pass_watches_to {
@@ -21,6 +24,12 @@ sub pass_watches_to {
       on_read_ready => $self->_read_watches->{$fh}
     );
   }
+  foreach my $fh ($self->_write_select->handles) {
+    $new_loop->watch_io(
+      handle => $fh,
+      on_write_ready => $self->_write_watches->{$fh}
+    );
+  }
 }
 
 sub watch_io {
@@ -30,6 +39,10 @@ sub watch_io {
     $self->_read_select->add($fh);
     $self->_read_watches->{$fh} = $cb;
   }
+  if (my $cb = $watch{on_write_ready}) {
+    $self->_write_select->add($fh);
+    $self->_write_watches->{$fh} = $cb;
+  }
 }
 
 sub unwatch_io {
@@ -39,6 +52,10 @@ sub unwatch_io {
     $self->_read_select->remove($fh);
     delete $self->_read_watches->{$fh};
   }
+  if ($watch{on_write_ready}) {
+    $self->_write_select->remove($fh);
+    delete $self->_write_watches->{$fh};
+  }
   return;
 }
 
@@ -64,12 +81,18 @@ sub unwatch_time {
 sub loop_once {
   my ($self) = @_;
   my $read = $self->_read_watches;
-  my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
+  my $write = $self->_write_watches;
+  my ($readable, $writeable) = IO::Select->select(
+    $self->_read_select, $self->_write_select, undef, 0.5
+  );
   # I would love to trap errors in the select call but IO::Select doesn't
   # differentiate between an error and a timeout.
   #   -- no, love, mst.
   foreach my $fh (@$readable) {
-    $read->{$fh}();
+    $read->{$fh}() if $read->{$fh};
+  }
+  foreach my $fh (@$writeable) {
+    $write->{$fh}() if $write->{$fh};
   }
   my $timers = $self->_timers;
   my $now = time();
index f304a8d..23ff0ea 100644 (file)
@@ -4,6 +4,7 @@ use IPC::Open2;
 use IO::Handle;
 use Object::Remote::ModuleSender;
 use Object::Remote::Handle;
+use Object::Remote::Future;
 use Scalar::Util qw(blessed);
 use Moo::Role;
 
@@ -20,15 +21,20 @@ sub _build_module_sender {
 
 around connect => sub {
   my ($orig, $self) = (shift, shift);
-  my $conn = $self->$orig(@_);
-  Object::Remote::Handle->new(
-    connection => $conn,
-    class => 'Object::Remote::ModuleLoader',
-    args => { module_sender => $self->module_sender }
-  )->disarm_free;
-  require Object::Remote::Prompt;
-  Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
-  return $conn;
+  my $f = $self->$start::start($orig => @_);
+  return future {
+    $f->on_done(sub {
+      my ($conn) = $f->get;
+      Object::Remote::Handle->new(
+        connection => $conn,
+        class => 'Object::Remote::ModuleLoader',
+        args => { module_sender => $self->module_sender }
+      )->disarm_free;
+      require Object::Remote::Prompt;
+      Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
+    });
+    $f;
+  } 2;
 };
 
 sub _perl_command { 'perl', '-' }
@@ -46,11 +52,26 @@ sub _start_perl {
 sub _open2_for {
   my $self = shift;
   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
-  $foreign_stdin->autoflush(1);
-  print $foreign_stdin 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n"
-    if $ENV{OBJECT_REMOTE_DEBUG};
-  print $foreign_stdin $self->fatnode_text
-    or die "Failed to send fatpacked data to new node on '$_[0]': $!";
+  my $to_send = $self->fatnode_text;
+  Object::Remote->current_loop
+                ->watch_io(
+                    handle => $foreign_stdin,
+                    on_write_ready => sub {
+                      my $len = syswrite($foreign_stdin, $to_send, 4096);
+                      if (defined $len) {
+                        substr($to_send, 0, $len) = '';
+                      }
+                      # if the stdin went away, we'll never get Shere
+                      # so it's not a big deal to simply give up on !defined
+                      if (!defined($len) or 0 == length($to_send)) {
+                        Object::Remote->current_loop
+                                      ->unwatch_io(
+                                          handle => $foreign_stdin,
+                                          on_write_ready => 1
+                                        );
+                      }
+                    }
+                  );
   return ($foreign_stdin, $foreign_stdout, $pid);
 }
 
@@ -64,7 +85,7 @@ sub fatnode_text {
 $INC{'Object/Remote/FatNode.pm'} = __FILE__;
 $Object::Remote::FatNode::DATA = <<'ENDFAT';
 END
-  $text .= $Object::Remote::FatNode::DATA;
+  $text .= do { no warnings 'once'; $Object::Remote::FatNode::DATA };
   $text .= "ENDFAT\n";
   $text .= <<'END';
 eval $Object::Remote::FatNode::DATA;
index ef2f1fd..c109463 100644 (file)
@@ -9,7 +9,9 @@ use Object::Remote;
 my $user = $ENV{TEST_SUDOUSER}
     or plan skip_all => q{Requires TEST_SUDOUSER to be set};
 
-my $remote = TestFindUser->new::on("${user}\@");
+my $conn = Object::Remote->connect('-')->connect("${user}\@");
+
+my $remote = TestFindUser->new::on($conn);
 my $remote_user = $remote->user;
 like $remote_user, qr/^\d+$/, 'returned an int';
 isnt $remote_user, $<, 'ran as different user';