+ - support Object::Remote->start::connect
- timer support in MiniLoop
0.002001 - 2012-07-18
sub connect {
my ($class, $to) = @_;
- use_module('Object::Remote::Connection')->new_from_spec($to);
+ use_module('Object::Remote::Connection')->maybe::start::new_from_spec($to);
}
sub current_loop {
my ($class, $spec) = @_;
return $spec if blessed $spec;
foreach my $poss (do { our @Guess }) {
- if (my $obj = $poss->($spec)) { return $obj }
+ if (my $conn = $poss->($spec)) {
+ return $conn->maybe::start::connect;
+ }
}
die "Couldn't figure out what to do with ${spec}";
}
no warnings 'once';
push @Object::Remote::Connection::Guess, sub {
- if (($_[0]||'') eq '-') { __PACKAGE__->new->connect }
+ if (($_[0]||'') eq '-') { __PACKAGE__->new }
};
1;
for ($_[0]) {
# username followed by @
if (defined and !ref and /^ ([^\@]*?) \@ $/x) {
- return __PACKAGE__->new(target_user => $1)->connect;
+ return __PACKAGE__->new(target_user => $1);
}
}
return;
with 'Object::Remote::Role::Connector::PerlInterpreter';
+has ssh_to => (is => 'ro', required => 1);
+
around _perl_command => sub {
- my ($orig, $self, $target) = @_;
- return 'ssh', '-A', $target, $self->$orig($target);
+ my ($orig, $self) = @_;
+ return 'ssh', '-A', $self->ssh_to, $self->$orig;
};
no warnings 'once';
for ($_[0]) {
# 0-9 a-z _ - first char, those or . subsequent - hostnamish
if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) {
- return __PACKAGE__->new->connect($_[0]);
+ return __PACKAGE__->new(ssh_to => $_[0]);
}
}
return;
with 'Object::Remote::Role::Connector';
+has socket_path => (is => 'ro', required => 1);
+
sub _open2_for {
- my ($self,$path) = @_;
+ my ($self) = @_;
+ my $path = $self->socket_path;
my $sock = IO::Socket::UNIX->new($path)
or die "Couldn't open socket ${path}: $!";
($sock, $sock, undef);
push @Object::Remote::Connection::Guess, sub {
for ($_[0]) {
if (defined and !ref and /^(?:\.\/|\/)/) {
- return __PACKAGE__->new->connect($_[0]);
+ return __PACKAGE__->new(socket_path => $_[0]);
}
}
return;
our @EXPORT = qw(future await_future await_all);
-sub future (&) {
+sub future (&;$) {
my $f = $_[0]->(CPS::Future->new);
- return $f if ((caller(1)||'') eq 'start');
+ return $f if ((caller(1+($_[1]||0))||'') eq 'start');
await_future($f);
}
+our @await;
+
sub await_future {
my $f = shift;
return $f if $f->is_ready;
require Object::Remote;
my $loop = Object::Remote->current_loop;
- $f->on_ready(sub { $loop->stop });
- $loop->run;
+ {
+ local @await = (@await, $f);
+ $f->on_ready(sub {
+ $loop->stop if $f == $await[-1]
+ });
+ $loop->run;
+ }
+ if (@await and $await[-1]->is_ready) {
+ $loop->stop;
+ }
return wantarray ? $f->get : ($f->get)[0];
}
package start;
+our $start = sub { my ($obj, $call) = (shift, shift); $obj->$call(@_); };
+
sub AUTOLOAD {
my $invocant = shift;
my ($method) = our $AUTOLOAD =~ /^start::(.+)$/;
return $res;
}
+package maybe;
+
+sub start {
+ my ($obj, $call) = (shift, shift);
+ if ((caller(1)||'') eq 'start') {
+ $obj->$start::start($call => @_);
+ } else {
+ $obj->$call(@_);
+ }
+}
+
package maybe::start;
sub AUTOLOAD {
has _read_watches => (is => 'ro', default => sub { {} });
has _read_select => (is => 'ro', default => sub { IO::Select->new });
+has _write_watches => (is => 'ro', default => sub { {} });
+has _write_select => (is => 'ro', default => sub { IO::Select->new });
+
has _timers => (is => 'ro', default => sub { [] });
sub pass_watches_to {
on_read_ready => $self->_read_watches->{$fh}
);
}
+ foreach my $fh ($self->_write_select->handles) {
+ $new_loop->watch_io(
+ handle => $fh,
+ on_write_ready => $self->_write_watches->{$fh}
+ );
+ }
}
sub watch_io {
$self->_read_select->add($fh);
$self->_read_watches->{$fh} = $cb;
}
+ if (my $cb = $watch{on_write_ready}) {
+ $self->_write_select->add($fh);
+ $self->_write_watches->{$fh} = $cb;
+ }
}
sub unwatch_io {
$self->_read_select->remove($fh);
delete $self->_read_watches->{$fh};
}
+ if ($watch{on_write_ready}) {
+ $self->_write_select->remove($fh);
+ delete $self->_write_watches->{$fh};
+ }
return;
}
sub loop_once {
my ($self) = @_;
my $read = $self->_read_watches;
- my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
+ my $write = $self->_write_watches;
+ my ($readable, $writeable) = IO::Select->select(
+ $self->_read_select, $self->_write_select, undef, 0.5
+ );
# I would love to trap errors in the select call but IO::Select doesn't
# differentiate between an error and a timeout.
# -- no, love, mst.
foreach my $fh (@$readable) {
- $read->{$fh}();
+ $read->{$fh}() if $read->{$fh};
+ }
+ foreach my $fh (@$writeable) {
+ $write->{$fh}() if $write->{$fh};
}
my $timers = $self->_timers;
my $now = time();
use IO::Handle;
use Object::Remote::ModuleSender;
use Object::Remote::Handle;
+use Object::Remote::Future;
use Scalar::Util qw(blessed);
use Moo::Role;
around connect => sub {
my ($orig, $self) = (shift, shift);
- my $conn = $self->$orig(@_);
- Object::Remote::Handle->new(
- connection => $conn,
- class => 'Object::Remote::ModuleLoader',
- args => { module_sender => $self->module_sender }
- )->disarm_free;
- require Object::Remote::Prompt;
- Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
- return $conn;
+ my $f = $self->$start::start($orig => @_);
+ return future {
+ $f->on_done(sub {
+ my ($conn) = $f->get;
+ Object::Remote::Handle->new(
+ connection => $conn,
+ class => 'Object::Remote::ModuleLoader',
+ args => { module_sender => $self->module_sender }
+ )->disarm_free;
+ require Object::Remote::Prompt;
+ Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
+ });
+ $f;
+ } 2;
};
sub _perl_command { 'perl', '-' }
sub _open2_for {
my $self = shift;
my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
- $foreign_stdin->autoflush(1);
- print $foreign_stdin 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n"
- if $ENV{OBJECT_REMOTE_DEBUG};
- print $foreign_stdin $self->fatnode_text
- or die "Failed to send fatpacked data to new node on '$_[0]': $!";
+ my $to_send = $self->fatnode_text;
+ Object::Remote->current_loop
+ ->watch_io(
+ handle => $foreign_stdin,
+ on_write_ready => sub {
+ my $len = syswrite($foreign_stdin, $to_send, 4096);
+ if (defined $len) {
+ substr($to_send, 0, $len) = '';
+ }
+ # if the stdin went away, we'll never get Shere
+ # so it's not a big deal to simply give up on !defined
+ if (!defined($len) or 0 == length($to_send)) {
+ Object::Remote->current_loop
+ ->unwatch_io(
+ handle => $foreign_stdin,
+ on_write_ready => 1
+ );
+ }
+ }
+ );
return ($foreign_stdin, $foreign_stdout, $pid);
}
$INC{'Object/Remote/FatNode.pm'} = __FILE__;
$Object::Remote::FatNode::DATA = <<'ENDFAT';
END
- $text .= $Object::Remote::FatNode::DATA;
+ $text .= do { no warnings 'once'; $Object::Remote::FatNode::DATA };
$text .= "ENDFAT\n";
$text .= <<'END';
eval $Object::Remote::FatNode::DATA;
my $user = $ENV{TEST_SUDOUSER}
or plan skip_all => q{Requires TEST_SUDOUSER to be set};
-my $remote = TestFindUser->new::on("${user}\@");
+my $conn = Object::Remote->connect('-')->connect("${user}\@");
+
+my $remote = TestFindUser->new::on($conn);
my $remote_user = $remote->user;
like $remote_user, qr/^\d+$/, 'returned an int';
isnt $remote_user, $<, 'ran as different user';