package Object::Remote::Connection;
+use Object::Remote::Logging qw (:log :dlog);
use Object::Remote::Future;
use Object::Remote::Null;
use Object::Remote::Handle;
use Object::Remote::CodeContainer;
use Object::Remote::GlobProxy;
use Object::Remote::GlobContainer;
-use Object::Remote::Logging qw (:log :dlog);
use Object::Remote::Tied;
use Object::Remote;
use Symbol;
use IO::Handle;
+use POSIX ":sys_wait_h";
use Module::Runtime qw(use_module);
use Scalar::Util qw(weaken blessed refaddr openhandle);
use JSON::PP qw(encode_json);
use Moo;
+BEGIN {
+  #Install a process-global SIGCHLD reaper so finished child
+  #processes never linger as zombies in the process table.
+  #this will reap child processes as soon
+  #as they are done executing so the process
+  #table cleans up as fast as possible but
+  #anything that needs to call waitpid()
+  #in the future to get the exit value of
+  #a child will get trash results if
+  #the signal handler was running.
+  #If creating a child and getting the
+  #exit value is required then set
+  #a localized version of the signal
+  #handler for CHLD to be 'IGNORE'
+  #in the smallest block possible
+  #and outside the block send
+  #the process a CHLD signal
+  #to reap anything that may
+  #have exited while blocked
+  #in waitpid()
+  $SIG{CHLD} = sub {
+    my $kid;
+    log_debug { "CHLD signal handler is executing" };
+    do {
+      #WNOHANG: collect every already-exited child without blocking
+      $kid = waitpid(-1, WNOHANG);
+      log_trace { "waitpid() returned '$kid'" };
+    } while $kid > 0;
+    log_trace { "CHLD signal handler is done" };
+  };
+}
+
+END {
+  log_debug { "Killing all child processes in the process group" };
+
+  #send SIGINT to the process group for our children
+  #NOTE(review): kill(1, -2) sends signal number 1 (SIGHUP on POSIX),
+  #not SIGINT (signal 2), and targets process group 2 rather than this
+  #process's own group — confirm intent; kill('INT', -$$) may be meant.
+  kill(1, -2);
+}
+
+
our $DEBUG = !!$ENV{OBJECT_REMOTE_DEBUG};
#numbering each connection allows it to be
#tracked along with file handles in
#the logs
-BEGIN { our $NEXT_CONNECTION_ID = 0 }
+
has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
has send_to_fh => (
},
);
+#TODO see if this is another case of the same bug below
+#where trigger never fires because the attribute isn't
+#actually set at any time
has read_channel => (
is => 'ro', required => 1,
trigger => sub {
#value and the on_close attribute is read only....
#the future never gets the on_done handler
#installed
-sub BUILD {
+sub BUILD {
my ($self) = @_;
- $self->on_close(CPS::Future->new);
+ $self->on_close(CPS::Future->new);
}
+after BUILD => sub {
+ my ($self) = @_;
+
+ return unless defined $self->child_pid;
+
+ log_debug { "Setting process group of child process" };
+
+ setpgrp($self->child_pid, 1);
+};
+
+
+
has on_close => (
is => 'rw', default => sub { CPS::Future->new },
trigger => sub {
weaken($self);
$f->on_done(sub {
Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
- $self->_fail_outstanding("Connection lost: " . ($f->get)[0]);
+ $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
});
}
);
);
}
+sub _load_if_possible {
+  my ($class) = @_;
+
+  #Best-effort class load for optional connector backends: a missing
+  #or broken class is logged at debug level and otherwise ignored,
+  #so the caller can probe several connectors without dying.
+  eval "require $class";
+
+  if ($@) {
+    log_debug { "Attempt at loading '$class' failed with '$@'" };
+  }
+
+}
+
BEGIN {
unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
- eval { require Object::Remote::Connector::Local };
- eval { require Object::Remote::Connector::LocalSudo };
- eval { require Object::Remote::Connector::SSH };
- eval { require Object::Remote::Connector::UNIX };
+ map _load_if_possible($_), qw(
+ Object::Remote::Connector::Local
+ Object::Remote::Connector::LocalSudo
+ Object::Remote::Connector::SSH
+ Object::Remote::Connector::UNIX
+ );
}
-sub new_from_spec {
- my ($class, $spec) = @_;
- return $spec if blessed $spec;
- Dlog_debug { "creating a new connection from spec" };
+sub conn_from_spec {
+ my ($class, $spec, @args) = @_;
foreach my $poss (do { our @Guess }) {
- if (my $conn = $poss->($spec)) {
- return $conn->maybe::start::connect;
+ if (my $conn = $poss->($spec, @args)) {
+ return $conn;
}
}
- die "Couldn't figure out what to do with ${spec}";
+
+ return undef;
+}
+
+sub new_from_spec {
+ my ($class, $spec) = @_;
+ return $spec if blessed $spec;
+ my $conn = $class->conn_from_spec($spec);
+
+ die "Couldn't figure out what to do with ${spec}"
+ unless defined $conn;
+
+ return $conn->maybe::start::connect;
}
sub remote_object {
$ret = print $fh $serialized;
die "print was not successful: $!" unless defined $ret
};
-
+
if ($@) {
Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
my $error = $@; chomp($error);
BEGIN { }
push @Object::Remote::Connection::Guess, sub {
- if (($_[0]||'') eq '-') { __PACKAGE__->new }
+ if (($_[0]||'') eq '-') {
+ shift(@_);
+ __PACKAGE__->new(@_);
+ }
};
1;
for ($_[0]) {
# username followed by @
if (defined and !ref and /^ ([^\@]*?) \@ $/x) {
- return __PACKAGE__->new(target_user => $1);
+ shift(@_);
+ return __PACKAGE__->new(@_, target_user => $1);
}
}
return;
has ssh_command => (is => 'ro', default => sub { 'ssh' });
+#TODO properly integrate if this works
+BEGIN { $ENV{TERM} = 'dumb'; }
+
sub _build_ssh_perl_command {
my ($self) = @_;
return [
for ($_[0]) {
# 0-9 a-z _ - first char, those or . subsequent - hostnamish
if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) {
- return __PACKAGE__->new(ssh_to => $_[0]);
+ my $host = shift(@_);
+ return __PACKAGE__->new(@_, ssh_to => $host);
}
}
return;
push @Object::Remote::Connection::Guess, sub {
for ($_[0]) {
if (defined and !ref and /^(?:\.\/|\/)/) {
- return __PACKAGE__->new(socket_path => $_[0]);
+ my $socket = shift(@_);
+ return __PACKAGE__->new(@_, socket_path => $socket);
}
}
return;
use strictures 1;
use Object::Remote::Node;
- Object::Remote::Node->run;
+ Object::Remote::Node->run(watchdog_timeout => $WATCHDOG_TIMEOUT);
END_END
my %files = map +($mods{$_} => scalar do { local (@ARGV, $/) = ($_); <> }),
my $timers = $self->_timers;
my $new = [ $at => $code, $watch{every} ];
$self->_sort_timers($new);
- log_debug { "Created new timer that expires at '$at'" };
+ log_debug { "Created new timer with id '$new' that expires at '$at'" };
return "$new";
}
my $wait_time = $self->_next_timer_expires_delay;
log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
- #TODO The docs state that select() in some instances can return a socket as ready to
- #read data even if reading from it would block and the recomendation is to set
- #handles used with select() as non-blocking but Perl on Windows can not set a
- #handle to use non-blocking IO - If Windows is not one of the operating
- #systems where select() returns a handle that could block it would work to
- #enable non-blocking mode only under Posix - the non-blocking sysread()
- #logic would work unmodified for both blocking and non-blocking handles
- #under Posix and Windows.
my ($readable, $writeable) = IO::Select->select(
- #TODO how come select() isn't used to identify handles with errors on them?
- #TODO is there a specific reason for a half second maximum wait duration?
- #The two places I've found for the runloop to be invoked don't return control
- #to the caller until a controlling variable interrupts the loop that invokes
- #loop_once() - is this to allow that variable to be polled and exit the
- #run loop? If so why isn't that behavior event driven and causes select() to
- #return?
$self->_read_select, $self->_write_select, undef, $wait_time
);
log_trace {
log_trace { "Checking timers" };
while (@$timers and $timers->[0][0] <= $now) {
my $active = $timers->[0];
- Dlog_debug { "Found timer that needs to be executed: $_" } $active;
-# my (shift @$timers)->[1]->();
+ Dlog_debug { "Found timer that needs to be executed: '$active'" };
if (defined($active->[2])) {
#handle the case of an 'every' timer
return;
}
-#::Node and ::ConnectionServer use the want_run() / want_stop()
-#counter to cause a run-loop to execute while something is active;
-#the futures do this via a different mechanism
sub want_run {
my ($self) = @_;
Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
--$self->{want_running};
}
-#TODO Hypothesis: Futures invoke run() which gives that future
-#it's own localized is_running attribute - any adjustment to the
-#is_running attribute outside of that future will not effect that
-#future so each future winds up able to call run() and stop() at
-#will with out interfering with each other - how about having
-#run loop until the future becomes ready?
sub run {
my ($self) = @_;
log_trace { "Run loop: run() invoked" };
use strictures 1;
use Object::Remote::Connector::STDIO;
use Object::Remote::Logging qw(:log :dlog);
+use Object::Remote::WatchDog;
use Object::Remote;
use CPS::Future;
sub run {
+ my ($class, %args) = @_;
log_trace { "run() has been invoked on remote node" };
+
+ if ($args{watchdog_timeout}) {
+ Object::Remote::WatchDog->new(timeout => $args{watchdog_timeout});
+ }
+
my $c = Object::Remote::Connector::STDIO->new->connect;
-
+
$c->register_class_call_handler;
my $loop = Object::Remote->current_loop;
-
+
$c->on_close->on_ready(sub {
log_info { "Node connection with call handler has closed" };
$loop->want_stop
Dlog_trace { "Preparing to read data from $_" } $fh;
my $rb = $self->_receive_data_buffer;
my $len = sysread($fh, $$rb, 32768, length($$rb));
- my $err = defined($len) ? '' : ": $!";
+ my $err = defined($len) ? 'eof' : ": $!";
if (defined($len) and $len > 0) {
log_trace { "Read $len bytes of data" };
while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
requires '_open2_for';
-#TODO return to 10 seconds after debugging
-#has timeout => (is => 'ro', default => sub { { after => 10 } });
has timeout => (is => 'ro', default => sub { { after => 10 } });
sub connect {
->watch_time(
%{$self->timeout},
code => sub {
-# log_warn { "Connection timed out for child pid '$child_pid'" };
-# $f->fail("Connection timed out") unless $f->is_ready;
-# undef($channel);
Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready;
unless($f->is_ready) {
log_warn { "Connection with child pid '$child_pid' has timed out" };
use IPC::Open2;
use IPC::Open3;
use IO::Handle;
+use Object::Remote::Logging qw( :log :dlog );
use Object::Remote::ModuleSender;
use Object::Remote::Handle;
use Object::Remote::Future;
-use Object::Remote::Logging qw( :log :dlog );
-use Scalar::Util qw(blessed);
-use POSIX ":sys_wait_h";
+use Scalar::Util qw(blessed weaken);
use Moo::Role;
use Symbol;
with 'Object::Remote::Role::Connector';
-#TODO ugh breaks some of the stuff in System::Introspector::Util by
-#screwing with status value of child
-BEGIN {
- $SIG{CHLD} = sub {
- my $kid;
- do {
- $kid = waitpid(-1, WNOHANG);
- } while $kid > 0;
- }
-}
-
has module_sender => (is => 'lazy');
+
#if no child_stderr file handle is specified then stderr
#of the child will be connected to stderr of the parent
-has stderr => ( is => 'rw', default => sub { \*STDERR } );
-#has stderr => ( is => 'rw' );
+has stderr => ( is => 'rw', default => sub { undef } );
sub _build_module_sender {
my ($hook) =
}
has perl_command => (is => 'lazy');
+has watchdog_timeout => ( is => 'ro', required => 1, default => sub { 0 } );
#TODO convert nice value into optional feature enabled by
#setting value of attribute
#ulimit of ~500 megs of v-ram
#TODO only works with ssh with quotes but only works locally
#with out quotes
-sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
-#sub _build_perl_command { [ 'perl', '-' ] }
+#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
+sub _build_perl_command { [ 'perl', '-' ] }
+#sub _build_perl_command { [ 'cat' ] }
around connect => sub {
my ($orig, $self) = (shift, shift);
return future {
$f->on_done(sub {
my ($conn) = $f->get;
+ $self->_setup_watchdog_reset($conn);
my $sub = $conn->remote_sub('Object::Remote::Logging::init_logging_forwarding');
$sub->('Object::Remote::Logging', Object::Remote::Logging->arg_router);
Object::Remote::Handle->new(
Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
if (defined($given_stderr)) {
+      #if the stderr data goes to an existing file handle
+      #then an anonymous file handle is required
+      #as the other half of a pipe-style file handle pair
+      #so the file handles can go into the run loop
$foreign_stderr = gensym();
} else {
+ #if no file handle has been specified
+ #for the child's stderr then connect
+ #the child stderr to the parent stderr
$foreign_stderr = ">&STDERR";
}
);
}
- #TODO open2() dupes the child stderr into the calling
- #process stderr which means if this process exits the
- #child is still attached to the shell - using open3()
- #and having the run loop manage the stderr means this
- #won't happen BUT if the run loop just sends the remote
- #stderr data to the local stderr the logs will interleave
- #for sure - a simple test would be to use open3() and just
- #close the remote stderr and see what happens - a longer
- #term solution would be for Object::Remote to offer a feature
- #where the user of a connection species a destination for output
- #either a file name or their own file handle and the node output
- #is dumped to it
-# my $pid = open2(
-# my $foreign_stdout,
-# my $foreign_stdin,
-# @{$self->final_perl_command},
-# ) or die "Failed to run perl at '$_[0]': $!";
-#
-# Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ];
-
-
- return ($foreign_stdin, $foreign_stdout, $pid);
+ return ($foreign_stdin, $foreign_stdout, $pid);
}
-#TODO open2() forks off a child and I have not been able to locate
-#a mechanism for reaping dead children so they don't become zombies
-#CONFIRMED there is no reaping of children being done, find a safe
-#way to do it
sub _open2_for {
my $self = shift;
my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
return ($foreign_stdin, $foreign_stdout, $pid);
}
+sub _setup_watchdog_reset {
+  my ($self, $conn) = @_;
+  my $timer_id;
+
+  #Arrange a repeating timer (at 1/5th of the watchdog timeout) that
+  #sends a fire-and-forget 'reset' class call to the remote node's
+  #watchdog so it does not terminate the node while we are connected.
+  return unless $self->watchdog_timeout;
+
+  Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id;
+
+  $timer_id = Object::Remote->current_loop->watch_time(
+    every => $self->watchdog_timeout / 5,
+    code => sub {
+      #NOTE(review): the log message below refers to a weak reference,
+      #but no weaken($conn) is visible in this sub, so $conn appears to
+      #be captured strongly by this closure and this branch may never
+      #fire — confirm; cleanup currently relies on the on_close hook.
+      unless(defined($conn)) {
+        log_trace { "Weak reference to connection in Watchdog was lost, terminating update timer $timer_id" };
+        Object::Remote->current_loop->unwatch_time($timer_id);
+        return;
+      }
+
+      Dlog_debug { "Reseting Watchdog for connection id $_" } $conn->_id;
+      #we do not want to block in the run loop so send the
+      #update off and ignore any result, we don't need it
+      #anyway
+      $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset');
+    }
+  );
+
+  #stop resetting the remote watchdog once the connection closes
+  $conn->on_close->on_done(sub { Object::Remote->current_loop->unwatch_time($timer_id) });
+}
+
sub fatnode_text {
my ($self) = @_;
- require Object::Remote::FatNode;
my $text = '';
+
+ require Object::Remote::FatNode;
+
+ $text = "my \$WATCHDOG_TIMEOUT = '" . $self->watchdog_timeout . "';\n";
+
+ if (my $duration = $self->watchdog_timeout) {
+ $text .= "alarm(\$WATCHDOG_TIMEOUT);\n";
+ }
+
$text .= 'BEGIN { $ENV{OBJECT_REMOTE_DEBUG} = 1 }'."\n"
if $ENV{OBJECT_REMOTE_DEBUG};
$text .= <<'END';
eval $Object::Remote::FatNode::DATA;
die $@ if $@;
END
+
$text .= "__END__\n";
return $text;
}
--- /dev/null
+package Object::Remote::WatchDog;
+
+use Object::Remote::MiniLoop;
+use Object::Remote::Logging qw ( :log :dlog );
+use Moo;
+
+#Process-wide singleton: terminates the process via SIGALRM unless
+#reset() is invoked at least once per 'timeout' seconds.
+BEGIN {
+  $SIG{ALRM} = sub {
+    #if the Watchdog is killing the process we don't want any chance of the
+    #process not actually exiting and die could be caught by an eval which
+    #doesn't do us any good
+    log_error { sprintf("Watchdog has expired, terminating the process at file %s line %s", __FILE__, __LINE__ + 1); };
+    exit(1);
+  };
+};
+
+#seconds of inactivity allowed before the ALRM handler fires
+has timeout => ( is => 'ro', required => 1 );
+
+#enforce one shared instance per process; constructor arguments on
+#any call after the first are ignored since the cached instance wins
+around new => sub {
+  my ($orig, $self, @args) = @_;
+  our ($WATCHDOG);
+
+  return $WATCHDOG if defined $WATCHDOG;
+  log_trace { "Constructing new instance of global watchdog" };
+  return $WATCHDOG = $self->$orig(@args);
+};
+
+#start the watchdog
+sub BUILD {
+  my ($self) = @_;
+  Dlog_debug { "Initializing watchdog with timeout of $_ seconds" } $self->timeout;
+  alarm($self->timeout);
+}
+
+#invoke at least once per timeout to stop
+#the watchdog from killing the process
+sub reset {
+  our ($WATCHDOG);
+  die "Attempt to reset the watchdog before it was constructed"
+    unless defined $WATCHDOG;
+
+  log_trace { "Watchdog has been reset" };
+  #rearm the alarm for a full timeout period
+  alarm($WATCHDOG->timeout);
+}
+
+#must explicitly call this method to stop the
+#watchdog from killing the process - if the
+#watchdog is lost because it goes out of scope
+#it makes sense to still terminate the process
+sub shutdown {
+  my ($self) = @_;
+  #alarm(0) cancels any pending SIGALRM
+  alarm(0);
+}
+
+1;
+
+