=head1 KNOWN ISSUES
-=over 4
+=over 4
=item Large data structures
=item Deadlocks
-Deadlocks can happen quite easily because of flaws in programs that use Object::Remote or
+Deadlocks can happen quite easily because of flaws in programs that use Object::Remote or
Object::Remote itself so the C<Object::Remote::WatchDog> is available. When used the run
loop will periodically update the watch dog object on the remote Perl interpreter. If the
watch dog goes longer than the configured interval with out being updated then it will
END {
our %child_pids;
-
+
log_trace { "END handler is being invoked in " . __PACKAGE__ };
-
+
foreach(keys(%child_pids)) {
log_debug { "Killing child process '$_'" };
kill('TERM', $_);
is => 'ro', required => 1,
trigger => sub {
my ($self, $ch) = @_;
- my $id = $self->_id;
+ my $id = $self->_id;
Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
weaken($self);
$ch->on_line_call(sub { $self->_receive(@_) });
- $ch->on_close_call(sub {
+ $ch->on_close_call(sub {
log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
$self->on_close->done(@_);
});
sub is_valid {
my ($self) = @_;
my $valid = ! $self->on_close->is_ready;
-
+
log_trace {
my $id = $self->_id;
my $text;
}
"Connection '$id' is valid: '$text'"
};
-
+
return $valid;
}
sub _fail_outstanding {
my ($self, $error) = @_;
my $outstanding = $self->outstanding_futures;
-
- Dlog_debug {
+
+ Dlog_debug {
sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
};
"Remote Perl interpreter exited with value '$exit_value'"
};
}
-
+
delete $child_pids{$pid};
});
- return $f;
+ return $f;
};
sub _id_to_remote_object {
}
sub _load_if_possible {
- my ($class) = @_;
+ my ($class) = @_;
- use_module($class);
+ use_module($class);
if ($@) {
log_debug { "Attempt at loading '$class' failed with '$@'" };
Object::Remote::Connector::LocalSudo
Object::Remote::Connector::SSH
Object::Remote::Connector::UNIX
- );
+ );
}
sub conn_from_spec {
return $conn;
}
}
-
+
return undef;
}
my ($class, $spec, @args) = @_;
return $spec if blessed $spec;
my $conn = $class->conn_from_spec($spec, @args);
-
+
die "Couldn't figure out what to do with ${spec}"
unless defined $conn;
-
- return $conn->maybe::start::connect;
+
+ return $conn->maybe::start::connect;
}
sub remote_object {
sub _send {
my ($self, $to_send) = @_;
my $fh = $self->send_to_fh;
-
+
unless ($self->is_valid) {
croak "Attempt to invoke _send on a connection that is not valid";
}
-
+
Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
my $serialized = $self->_serialize($to_send)."\n";
Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
- my $ret;
- eval {
+ my $ret;
+ eval {
#TODO this should be converted over to a non-blocking ::WriteChannel class
die "filehandle is not open" unless openhandle($fh);
log_trace { "file handle has passed openhandle() test; printing to it" };
$ret = print $fh $serialized;
die "print was not successful: $!" unless defined $ret
};
-
+
if ($@) {
Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
my $error = $@;
chomp($error);
$self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
- return;
+ return;
}
-
- return $ret;
+
+ return $ret;
}
sub _serialize {
if ($ref eq 'HASH') {
my $tied_to = tied(%$data);
if(defined($tied_to)) {
- return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
+ return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
} else {
return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
}
} elsif ($ref eq 'ARRAY') {
my $tied_to = tied(@$data);
if (defined($tied_to)) {
- return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
+ return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
} else {
return [ map $self->_deobjectify($_), @$data ];
}
Object::Remote::Connection - An underlying connection for L<Object::Remote>
use Object::Remote;
-
+
my %opts = (
nice => '10', ulimit => '-v 400000',
watchdog_timeout => 120, stderr => \*STDERR,
);
-
+
my $local = Object::Remote->connect('-');
my $remote = Object::Remote->connect('myserver', nice => 5);
my $remote_user = Object::Remote->connect('user@myserver', %opts);
my $local_sudo = Object::Remote->connect('user@');
-
+
#$remote can be any other connection object
my $hostname = Sys::Hostname->can::on($remote, 'hostname');
-
+
=head1 DESCRIPTION
This is the class that supports connections to a Perl interpreter that is executed in a
$f->on_ready(sub { undef($c) });
log_trace { "marking the future as done" };
$c->ready_future->done;
- Dlog_trace { "Sending 'Shere' to socket $_" } $new;
+ Dlog_trace { "Sending 'Shere' to socket $_" } $new;
print $new "Shere\n" or die "Couldn't send to new socket: $!";
log_debug { "Connection has been fully handled" };
return $c;
push @Object::Remote::Connection::Guess, sub {
if (($_[0]||'') eq '-') {
- shift(@_);
- __PACKAGE__->new(@_);
+ shift(@_);
+ __PACKAGE__->new(@_);
}
};
package Object::Remote::Connector::LocalSudo;
-
+
use Object::Remote::Logging qw (:log :dlog);
use Symbol qw(gensym);
use Module::Runtime qw(use_module);
if (sysread($sudo_stderr, my $buf, 32768) > 0) {
log_trace { "LocalSudo: successfully read data, printing it to STDERR" };
print STDERR $buf;
- log_trace { "LocalSudo: print() to STDERR is done" };
+ log_trace { "LocalSudo: print() to STDERR is done" };
} else {
log_debug { "LocalSudo: received EOF or error on file handle, unwatching it" };
Object::Remote->current_loop
sub _build_ssh_perl_command {
my ($self) = @_;
- my $perl_command = $self->perl_command;
+ my $perl_command = $self->perl_command;
return [
do { my $c = $self->ssh_command; ref($c) ? @$c : $c },
no warnings 'once';
-push @Object::Remote::Connection::Guess, sub {
+push @Object::Remote::Connection::Guess, sub {
for ($_[0]) {
# 0-9 a-z _ - first char, those or . subsequent - hostnamish
if (defined and !ref and /^(?:.*?\@)?[\w\-][\w\-\.]/) {
no warnings 'once';
-push @Object::Remote::Connection::Guess, sub {
+push @Object::Remote::Connection::Guess, sub {
for ($_[0]) {
if (defined and !ref and /^(?:\.\/|\/)/) {
my $socket = shift(@_);
foreach(keys(%mods)) {
if ($exclude{ $mods{$_} }) {
- delete($mods{$_});
+ delete($mods{$_});
}
}
push @non_core_non_arch, grep +(
not (
#some of the config variables can be empty which will eval as a matching regex
- $Config{privlibexp} ne '' && /^\Q$Config{privlibexp}/
+ $Config{privlibexp} ne '' && /^\Q$Config{privlibexp}/
or $Config{archlibexp} ne '' && /^\Q$Config{archlibexp}/
or $Config{vendorarchexp} ne '' && /^\Q$Config{vendorarchexp}/
or $Config{sitearchexp} ne '' && /^\Q$Config{sitearchexp}/
if (my $fat = $_[0]->{$_[1]}) {
if ($exclude{$_[1]}) {
warn "Will not pre-load '$_[1]'";
- return undef;
+ return undef;
}
-
+
#warn "Handling $_[1]";
open my $fh, '<', \$fat;
return $fh;
}
-
+
#Uncomment this to find brokenness
#warn "Missing $_[1]";
return;
use strictures 1;
use Object::Remote::Node;
-
+
unless ($Object::Remote::FatNode::INHIBIT_RUN_NODE) {
- Object::Remote::Node->run(watchdog_timeout => $WATCHDOG_TIMEOUT);
+ Object::Remote::Node->run(watchdog_timeout => $WATCHDOG_TIMEOUT);
}
-
+
END_END
my %files = map +($mods{$_} => scalar do { local (@ARGV, $/) = ($_); <> }),
log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
if ($f == $await[-1]) {
log_trace { "This future is not waiting on anything so calling stop on the run loop" };
- $loop->stop;
+ $loop->stop;
}
});
log_trace { "Starting run loop for newly created future" };
my ($self, $method, @args) = @_;
my $w = wantarray;
my $id = $self->id;
-
+
$method = "start::${method}" if (caller(0)||'') eq 'start';
log_trace { "call('$method') has been invoked on remote handle '$id'; creating future" };
$DID_INIT = 1;
init_logging();
}
-
+
$class->SUPER::before_import($importer, $spec);
}
sub _parse_selections {
my ($selections_string) = @_;
my %log_ok;
-
+
#example string:
#" * -Object::Remote::Logging Foo::Bar::Baz "
foreach(split(/\s+/, $selections_string)) {
$log_ok{$_} = 1;
}
}
-
+
return %log_ok;
}
my $selections = $ENV{OBJECT_REMOTE_LOG_SELECTIONS};
my $test_logging = $ENV{OBJECT_REMOTE_TEST_LOGGER};
my %controller_should_log;
-
+
unless (defined $ENV{OBJECT_REMOTE_LOG_FORWARDING} && $ENV{OBJECT_REMOTE_LOG_FORWARDING} ne '') {
$ENV{OBJECT_REMOTE_LOG_FORWARDING} = 0;
}
-
+
if ($test_logging) {
require Object::Remote::Logging::TestLogger;
router->connect(Object::Remote::Logging::TestLogger->new(
#the connection id for the remote node comes in later
#as the controlling node inits remote logging
router()->_remote_metadata({ connection_id => undef });
- }
+ }
}
return unless defined $level && $level ne '';
-
+
$format = "[%l %r] %s" unless defined $format;
$selections = __PACKAGE__ unless defined $selections;
%controller_should_log = _parse_selections($selections);
level_names => Object::Remote::Logging::arg_levels(),
);
- router()->connect(sub {
+ router()->connect(sub {
my $controller = $_[1]->{exporter};
my $will_log = $controller_should_log{$controller};
my $remote_info = $_[1]->{object_remote};
-
+
$will_log = $controller_should_log{'*'} unless defined $will_log;
-
+
return unless $will_log;
#skip things from remote hosts because they log to STDERR
#when OBJECT_REMOTE_LOG_LEVEL is in effect
#on the remote nodes
sub init_remote_logging {
my ($self, %controller_info) = @_;
-
+
router()->_remote_metadata(\%controller_info);
router()->_forward_destination($controller_info{router}) if $ENV{OBJECT_REMOTE_LOG_FORWARDING};
}
=head1 SYNOPSIS
use Object::Remote::Logging qw( :log :dlog arg_levels router );
-
+
@levels = qw( trace debug verbose info warn error fatal );
@levels = arg_levels(); #same result
-
+
$ENV{OBJECT_REMOTE_LOG_LEVEL} = 'trace'; #or other level name
$ENV{OBJECT_REMOTE_LOG_FORMAT} = '%l %t: %p::%m %s'; #and more
$ENV{OBJECT_REMOTE_LOG_SELECTIONS} = 'Object::Remote::Logging Some::Other::Subclass';
$ENV{OBJECT_REMOTE_LOG_SELECTIONS} = '* -Object::Remote::Logging';
$ENV{OBJECT_REMOTE_LOG_FORWARDING} = 0; #default 1
-
+
log_info { 'Trace log event' };
Dlog_verbose { "Debug event with Data::Dumper::Concise: $_" } { foo => 'bar' };
=head1 DESCRIPTION
This is the logging framework for Object::Remote implemented as a subclass of
-L<Log::Contextual> with a slightly incompatible API. This system allows
+L<Log::Contextual> with a slightly incompatible API. This system allows
developers using Object::Remote and end users of that software to control
Object::Remote logging so operation can be tracked if needed. This is also
the API used to generate log messages inside the Object::Remote source code.
Object::Remote logging is not enabled by default. If you need to immediately start
debugging set the OBJECT_REMOTE_LOG_LEVEL environment variable to either 'trace'
-or 'debug'. This will enable logging to STDERR on the local and all remote Perl
+or 'debug'. This will enable logging to STDERR on the local and all remote Perl
interpreters. By default STDERR for all remote interpreters is passed through
unmodified so this is sufficient to receive logs generated anywhere Object::Remote
is running.
id and other metadata is available in the log output via a log format string that can
be set via the OBJECT_REMOTE_LOG_FORMAT environment variable. The format string and
available metadata is documented in L<Object::Remote::Logging::Logger>. Setting this
-environment variable on the local interpreter will cause it to be propagated to the
+environment variable on the local interpreter will cause it to be propagated to the
remote interpreter so all logs will be formated the same way.
This class is designed so any module can create their own logging sub-class using it.
With out any additional configuration the consumers of this logging class will
-automatically be enabled via OBJECT_REMOTE_LOG_LEVEL and formated with
-OBJECT_REMOTE_LOG_FORMAT but those additional log messages are not sent to STDERR.
+automatically be enabled via OBJECT_REMOTE_LOG_LEVEL and formated with
+OBJECT_REMOTE_LOG_FORMAT but those additional log messages are not sent to STDERR.
By setting the OBJECT_REMOTE_LOG_SELECTIONS environment variable to a list of
class names seperated by spaces then logs generated by packages that use those classes
will be sent to STDERR. If the asterisk character (*) is used in the place of a class
=item log_<level> and Dlog_<level>
-These methods come direct from L<Log::Contextual>; see that documentation for a
+These methods come direct from L<Log::Contextual>; see that documentation for a
complete reference. For each of the log level names there are subroutines with the log_
and Dlog_ prefix that will generate the log message. The first argument is a code block
that returns the log message contents and the optional further arguments are both passed
=item debug
-Messages about operations that could hang as well as internal state changes,
+Messages about operations that could hang as well as internal state changes,
results from method invocations, and information useful when looking for faults.
Double verbose operation (-v -v).
my $generator;
my $log_contextual_level;
our %LEVEL_NAME_MAP;
-
+
#just a proof of concept - support for the is_ methods can
#be done but requires modifications to the router
return 1 if $log_level =~ m/^is_/;
#skip DESTROY and friends
return if $log_level =~ m/^[A-Z]+$/;
-
+
if ($log_contextual_level = $LEVEL_NAME_MAP{$log_level}) {
$generator = sub { @content };
} elsif(($log_level =~ s/f$//) && ($log_contextual_level = $LEVEL_NAME_MAP{$log_level})) {
} else {
croak "invalid log level: $log_level";
}
-
+
router->handle_log_request({
- controller => 'Log::Any',
+ controller => 'Log::Any',
package => scalar(caller),
caller_level => 1,
level => $log_contextual_level,
}, $generator);
-
+
return;
}
$self->_log($level_name, @_);
};
-
+
return $self->$level_name(@_);
}
}
sub _build__level_active {
- my ($self) = @_;
+ my ($self) = @_;
my $should_log = 0;
my $min_level = $self->min_level;
my $max_level = $self->max_level;
my %active;
-
+
foreach my $level (@{$self->level_names}) {
if($level eq $min_level) {
- $should_log = 1;
+ $should_log = 1;
}
$active{$level} = $should_log;
-
+
if (defined $max_level && $level eq $max_level) {
$should_log = 0;
}
sub _create_format_lookup {
my ($self, $level, $metadata, $content) = @_;
my $method = $metadata->{method};
-
+
$method = '(none)' unless defined $method;
-
- return {
+
+ return {
'%' => '%', 'n' => "\n",
t => $self->_render_time($metadata->{timestamp}),
r => $self->_render_remote($metadata->{object_remote}),
- s => $self->_render_log(@$content), l => $level,
+ s => $self->_render_log(@$content), l => $level,
c => $metadata->{exporter}, p => $metadata->{caller_package}, m => $method,
- f => $metadata->{filename}, i => $metadata->{line},
+ f => $metadata->{filename}, i => $metadata->{line},
h => $metadata->{hostname}, P => $metadata->{pid},
};
}
my ($self, $level, $metadata, @content) = @_;
my $var_table = $self->_create_format_lookup($level, $metadata, [@content]);
my $template = $self->format;
-
+
$template =~ s/%([\w%])/$self->_get_format_var_value($1, $var_table)/ge;
-
+
chomp($template);
$template =~ s/\n/\n /g;
$template .= "\n";
use Object::Remote::Logging::Logger;
use Object::Remote::Logging qw( router arg_levels );
-
+
my $app_output = Object::Remote::Logging::Logger->new(
level_names => arg_levels, format => '%t %s',
min_level => 'verbose', max_level => 'info',
#disconnect the selector from the router
undef($selector);
-
+
#router will hold this logger forever
#and send it all log messages
router->connect(Object::Remote::Logging::Logger->new(
A logger object receives the log messages that are generated and converts them to
formatted log entries then displays them to the end user. Each logger has a set
of active log levels and will only output a log entry if the log message is at
-an active log level.
+an active log level.
To gain access to the stream of log messages a connection is made to the log router.
A logger can directly connect to the router and receive an unfiltered stream of
=item level_names
This is a required attribute. Must be an array ref with the list of log level names
-in it. The list must be ordered with the lowest level as element 0 and the highest
+in it. The list must be ordered with the lowest level as element 0 and the highest
level as the last element. There is no default value.
=item min_level
=item %s
-Log message rendered into a string with a leading space before any additional lines in a
+Log message rendered into a string with a leading space before any additional lines in a
multiple line message.
=item %t
$self->_clean_connections if $need_clean;
- return @loggers;
+ return @loggers;
}
#overloadable so a router can invoke a logger
my $generator = $message_info{message_sub};
my $args = $message_info{message_args};
my $level = $message_info{message_level};
-
+
return unless @loggers > 0;
#this is the point where the user provided log message code block is executed
my @content = $generator->(@$args);
my @caller_info = caller($caller_level);
$message_info{filename} = $caller_info[1];
$message_info{line} = $caller_info[2];
-
+
@caller_info = caller($caller_level + 1);
$message_info{method} = $caller_info[3];
$message_info{method} =~ s/^${package}::// if defined $message_info{method};
sub connect {
my ($self, $destination, $is_weak) = @_;
- my $wrapped;
+ my $wrapped;
if (ref($destination) ne 'CODE') {
$wrapped = sub { $destination };
use Object::Remote::Logging qw( :log :dlog router );
use Moo;
-BEGIN {
+BEGIN {
$SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } };
-
+
router()->exclude_forwarding
}
my ($self, %watch) = @_;
my $fh = $watch{handle};
Dlog_debug { "Adding IO watch for $_" } $fh;
-
+
if (my $cb = $watch{on_read_ready}) {
log_trace { "IO watcher is registering with select for reading" };
$self->_read_select->add($fh);
sub _sort_timers {
my ($self, @new) = @_;
- my $timers = $self->_timers;
-
+ my $timers = $self->_timers;
+
log_trace { "Sorting timers" };
-
+
@{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
- return;
+ return;
}
sub watch_time {
my ($self, %watch) = @_;
- my $at;
-
+ my $at;
+
Dlog_trace { "watch_time() invoked with $_" } \%watch;
-
+
if (exists($watch{every})) {
$at = time() + $watch{every};
} elsif (exists($watch{after})) {
- $at = time() + $watch{after};
+ $at = time() + $watch{after};
} elsif (exists($watch{at})) {
- $at = $watch{at};
+ $at = $watch{at};
} else {
die "watch_time requires every, after or at";
}
-
+
die "watch_time requires code" unless my $code = $watch{code};
my $timers = $self->_timers;
my $new = [ $at => $code, $watch{every} ];
- $self->_sort_timers($new);
+ $self->_sort_timers($new);
log_debug { "Created new timer with id '$new' that expires at '$at'" };
return "$new";
}
my ($self) = @_;
my $timers = $self->_timers;
my $delay_max = $self->block_duration;
-
+
return $delay_max unless @$timers;
my $duration = $timers->[0]->[0] - time;
log_trace { "next timer fires in '$duration' seconds" };
-
+
if ($duration < 0) {
- $duration = 0;
+ $duration = 0;
} elsif (defined $delay_max && $duration > $delay_max) {
$duration = $delay_max;
}
-
- return $duration;
+
+ return $duration;
}
sub loop_once {
my $read = $self->_read_watches;
my $write = $self->_write_watches;
my $read_count = 0;
- my $write_count = 0;
+ my $write_count = 0;
my @c = caller;
my $wait_time = $self->_next_timer_expires_delay;
log_trace {
};
my ($readable, $writeable) = IO::Select->select(
$self->_read_select, $self->_write_select, undef, $wait_time
- );
- log_trace {
+ );
+ log_trace {
my $readable_count = defined $readable ? scalar(@$readable) : 0;
my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
"Run loop: select returned readable:$readable_count writeable:$writable_count";
#under load
last;
}
-
+
#moving the timers above the read() section exposes a deadlock
log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
my $timers = $self->_timers;
my $now = time();
log_trace { "Checking timers" };
while (@$timers and $timers->[0][0] <= $now) {
- my $active = $timers->[0];
+ my $active = $timers->[0];
Dlog_trace { "Found timer that needs to be executed: '$active'" };
-
+
if (defined($active->[2])) {
#handle the case of an 'every' timer
$active->[0] = time() + $active->[2];
Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
$self->_sort_timers;
} else {
- #it doesn't repeat again so get rid of it
+ #it doesn't repeat again so get rid of it
shift(@$timers);
}
#execute the timer
$active->[1]->();
}
-
+
log_trace { "Run loop: single loop is completed" };
return;
}
my ($self) = @_;
if (! $self->{want_running}) {
log_debug { "Run loop: want_stop() was called but want_running was not true" };
- return;
+ return;
}
Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
--$self->{want_running};
log_debug { "Constructing module builder hook" };
my $hook = Object::Remote::ModuleLoader::Hook->new(sender => $self->module_sender);
log_trace { "Done constructing module builder hook" };
- return $hook;
+ return $hook;
}
sub BUILD { shift->enable }
use CPS::Future;
sub run {
- my ($class, %args) = @_;
+ my ($class, %args) = @_;
log_trace { "run() has been invoked on remote node" };
-
+
my $c = Object::Remote::Connector::STDIO->new->connect;
-
+
$c->register_class_call_handler;
my $loop = Object::Remote->current_loop;
-
- $c->on_close->on_ready(sub {
+
+ $c->on_close->on_ready(sub {
log_debug { "Node connection with call handler has closed" };
- $loop->want_stop
+ $loop->want_stop
});
Dlog_trace { "Node is sending 'Shere' to $_" } $c->send_to_fh;
log_debug { "Node is going to start the run loop" };
#TODO the alarm should be reset after the run loop starts
#at a minimum - the remote side node should probably send
- #a command that clears the alarm in all instances - even
- #if the Object::Remote::Watchdog is not being used
+ #a command that clears the alarm in all instances - even
+ #if the Object::Remote::Watchdog is not being used
if ($args{watchdog_timeout}) {
Object::Remote::WatchDog->new(timeout => $args{watchdog_timeout});
} else {
if ((caller(0)||'') eq 'start') {
$to_fire = "start::${to_fire}";
}
-
+
unless ($self->{remote}->is_valid) {
croak "Attempt to use Object::Remote::Proxy backed by an invalid handle";
}
-
+
$self->{remote}->$to_fire($method => @_);
}
my ($self, $gd) = @_;
return if $gd;
log_trace { "read channel is being demolished" };
-
+
Object::Remote->current_loop
->unwatch_io(
handle => $self->fh,
on_read_ready => 1
);
-
-
}
1;
$f->fail("Connection timed out") unless $f->is_ready;
}
undef($channel);
-
+
}
);
- log_trace { "connection for child pid '$child_pid' has been initialized" };
+ log_trace { "connection for child pid '$child_pid' has been initialized" };
$f;
}
}
package Object::Remote::Role::Connector::PerlInterpreter;
-use IPC::Open3;
+use IPC::Open3;
use IO::Handle;
-use Symbol;
+use Symbol;
use Object::Remote::Logging qw(:log :dlog router);
use Object::Remote::ModuleSender;
use Object::Remote::Handle;
#By policy object-remote does not invoke a shell
sub _build_perl_command {
my $perl_bin = 'perl';
-
+
if (exists $ENV{OBJECT_REMOTE_PERL_BIN}) {
$perl_bin = $ENV{OBJECT_REMOTE_PERL_BIN};
}
my $self = shift;
my $given_stderr = $self->stderr;
my $foreign_stderr;
-
+
Dlog_verbose {
s/\n/ /g; "invoking connection to perl interpreter using command line: $_"
} @{$self->final_perl_command};
-
+
if (defined($given_stderr)) {
#if the stderr data goes to an existing file handle
#an anonymous file handle is required
#the child stderr to the parent stderr
$foreign_stderr = ">&STDERR";
}
-
+
my $pid = open3(
my $foreign_stdin,
my $foreign_stdout,
$foreign_stderr,
@{$self->final_perl_command},
) or die "Failed to run perl at '$_[0]': $!";
-
+
$self->_set_pid($pid);
-
- if (defined($given_stderr)) {
+
+ if (defined($given_stderr)) {
Dlog_debug { "Child process STDERR is being handled via run loop" };
-
+
Object::Remote->current_loop
->watch_io(
handle => $foreign_stderr,
on_read_ready => sub {
- my $buf = '';
+ my $buf = '';
my $len = sysread($foreign_stderr, $buf, 32768);
if (!defined($len) or $len == 0) {
log_trace { "Got EOF or error on child stderr, removing from watcher" };
Dlog_trace { "got $len characters of stderr data for connection" };
print $given_stderr $buf or die "could not send stderr data: $!";
}
- }
- );
+ }
+ );
}
-
+
return ($foreign_stdin, $foreign_stdout, $pid);
}
sub _setup_watchdog_reset {
my ($self, $conn) = @_;
- my $timer_id;
-
+ my $timer_id;
+
return unless $self->watchdog_timeout;
-
+
Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id;
-
+
weaken($conn);
-
+
$timer_id = Object::Remote->current_loop->watch_time(
every => $self->watchdog_timeout / 3,
code => sub {
Object::Remote->current_loop->unwatch_time($timer_id);
return;
}
-
+
unless($conn->is_valid) {
log_warn { "Watchdog timer found an invalid connection, removing the timer" };
Object::Remote->current_loop->unwatch_time($timer_id);
return;
}
-
+
Dlog_trace { "Reseting Watchdog for connection id $_" } $conn->_id;
#we do not want to block in the run loop so send the
#update off and ignore any result, we don't need it
$conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset');
}
);
-
+
$conn->on_close->on_ready(sub {
log_debug { "Removing watchdog for connection that is now closed" };
Object::Remote->current_loop->unwatch_time($timer_id);
my $text = '';
require Object::Remote::FatNode;
-
+
if (defined($connection_timeout)) {
$text .= "alarm($connection_timeout);\n";
}
-
+
if (defined($watchdog_timeout)) {
$text .= "my \$WATCHDOG_TIMEOUT = $watchdog_timeout;\n";
} else {
$text .= "my \$WATCHDOG_TIMEOUT = undef;\n";
}
-
+
$text .= $self->_create_env_forward(@{$self->forward_env});
#Action at a distance but at least it's not spooky - the logging
#setup on the remote side yet so this flag allows a graceful
#degredation to happen
$text .= '$Object::Remote::FatNode::REMOTE_NODE = "1";' . "\n";
-
+
$text .= <<'END';
$INC{'Object/Remote/FatNode.pm'} = __FILE__;
$Object::Remote::FatNode::DATA = <<'ENDFAT';
eval $Object::Remote::FatNode::DATA;
die $@ if $@;
END
-
+
$text .= "__END__\n";
return $text;
}
my $package = $message_info{caller_package};
my $destination = $self->_forward_destination;
our $reentrant;
-
+
if (defined $message_info{object_remote}) {
$message_info{object_remote} = { %{$message_info{object_remote}} };
}
-
+
$message_info{object_remote}->{forwarded} = 1;
return unless $self->enable_forward;
warn "log forwarding went reentrant. bottom: '$reentrant' top: '$package'";
return;
}
-
+
local $reentrant = $package;
-
+
eval { $destination->_deliver_message(%message_info) };
-
+
if ($@ && $@ !~ /^Attempt to use Object::Remote::Proxy backed by an invalid handle/) {
die $@;
}
package Object::Remote::Tied;
-use strictures 1;
+use strictures 1;
#a proxied tied object just ties to the
#proxy object that exists on the remote
#side of the actual tied variable - when
#creating the remote tied variable the proxy
-#is passed to the constructor
+#is passed to the constructor
sub TIEHASH {
return $_[1];
return $_[1];
}
-
1;
-package Object::Remote::WatchDog;
+package Object::Remote::WatchDog;
-use Object::Remote::MiniLoop;
+use Object::Remote::MiniLoop;
use Object::Remote::Logging qw (:log :dlog router);
-use Moo;
+use Moo;
has timeout => ( is => 'ro', required => 1 );
BEGIN { router()->exclude_forwarding; }
around new => sub {
- my ($orig, $self, @args) = @_;
+ my ($orig, $self, @args) = @_;
our ($WATCHDOG);
-
+
return $WATCHDOG if defined $WATCHDOG;
log_trace { "Constructing new instance of global watchdog" };
return $WATCHDOG = $self->$orig(@args);
#start the watchdog
sub BUILD {
my ($self) = @_;
-
+
$SIG{ALRM} = sub {
#if the Watchdog is killing the process we don't want any chance of the
#process not actually exiting and die could be caught by an eval which
- #doesn't do us any good
+ #doesn't do us any good
log_fatal { "Watchdog has expired, terminating the process" };
exit(1);
- };
-
+ };
+
Dlog_debug { "Initializing watchdog with timeout of $_ seconds" } $self->timeout;
alarm($self->timeout);
}
#invoke at least once per timeout to stop
-#the watchdog from killing the process
+#the watchdog from killing the process
sub reset {
our ($WATCHDOG);
die "Attempt to reset the watchdog before it was constructed"
- unless defined $WATCHDOG;
-
+ unless defined $WATCHDOG;
+
log_debug { "Watchdog has been reset" };
- alarm($WATCHDOG->timeout);
+ alarm($WATCHDOG->timeout);
}
#must explicitly call this method to stop the
sub shutdown {
my ($self) = @_;
log_debug { "Watchdog is shutting down" };
- alarm(0);
+ alarm(0);
}
1;
my ($self) = @_;
$self->feedback_output(undef);
$self->feedback_input(undef);
-
+
ok(! defined $self->feedback_output && ! defined $self->feedback_input, 'Reset successful');
}
sub _log {
my $self = shift;
-
+
$self->feedback_input([@_]);
-
+
$self->SUPER::_log(@_);
}
package ORTestTiedRemote;
-use Moo;
+use Moo;
use Tie::Array;
-use Tie::Hash;
+use Tie::Hash;
has hash => ( is => 'ro', builder => 1 );
has array => ( is => 'ro', builder => 1 );
tie(my %hash, 'Tie::StdHash');
%hash = ( akey => 'a value');
return \%hash;
-}
+}
sub _build_array {
tie(my @array, 'Tie::StdArray');
sub sum_array {
my ($self) = @_;
my $sum = 0;
-
+
foreach(@{$self->array}) {
$sum += $_;
- }
-
- return $sum;
+ }
+
+ return $sum;
}
sub sum_hash {
- my ($self) = @_;
+ my ($self) = @_;
my $sum = 0;
-
+
foreach(values(%{$self->hash})) {
$sum += $_;
}
-
- return $sum;
+
+ return $sum;
}
1;
router->connect($selector, 1);
log_info { "Test message" };
return [$logger->feedback_output, __LINE__ - 1];
-
}
my $controller_name = 'Test::Log::Controller';
my $generator = sub { "Generator output" };
my %metadata = (
- exporter => $controller_name,
+ exporter => $controller_name,
caller_package => __PACKAGE__, caller_level => 0,
message_level => 'test1', message_sub => $generator, message_args => [],
);
$ENV{OBJECT_REMOTE_TEST_LOGGER} = 1;
-use Object::Remote::Connector::Local;
-use Object::Remote::Connector::SSH;
+use Object::Remote::Connector::Local;
+use Object::Remote::Connector::SSH;
my $defaults = Object::Remote::Connector::Local->new;
my $normal = $defaults->final_perl_command;
is_deeply($normal, ['perl', '-'], 'Default Perl interpreter arguments correct');
is_deeply($ssh, [qw(ssh -A testhost), "perl -"], "Arguments using ssh are correct");
-done_testing;
+done_testing;
use lib 't/lib';
use Tie::Array;
-use Tie::Hash;
+use Tie::Hash;
$ENV{OBJECT_REMOTE_TEST_LOGGER} = 1;
-use Object::Remote;
+use Object::Remote;
use ORTestTiedRemote;
my @test_data = qw(1 5 10 30 80);
isa_ok($remote, 'Object::Remote::Proxy');
my $remote_array = $remote->array;
-my $remote_hash = $remote->hash;
+my $remote_hash = $remote->hash;
is(ref($remote_array), 'ARRAY', 'Array ref is array ref');
is(ref(tied(@$remote_array)), 'Object::Remote::Proxy', 'Array is tied to proxy object');
%$remote_hash = ();
do { my $i = 0; map { $remote_hash->{++$i} = $_ } @test_data };
-is($remote->sum_hash, $test_sum, 'Sum of hash values matches sum of test data');
+is($remote->sum_hash, $test_sum, 'Sum of hash values matches sum of test data');
done_testing;
-
-
\ No newline at end of file
$ENV{OBJECT_REMOTE_TEST_LOGGER} = 1;
use Object::Remote::Connection;
-use Object::Remote::FromData;
+use Object::Remote::FromData;
$SIG{ALRM} = sub { fail("Watchdog killed remote process in time"); die "test failed" };
eval { $remote->hang };
-like($@, qr/^Object::Remote connection lost: eof/, "Correct error message");
+like($@, qr/^Object::Remote connection lost: eof/, "Correct error message");
-done_testing;
+done_testing;
__DATA__
use Moo;
sub alive {
- return 1;
+ return 1;
}
sub hang {
while(1) {
- sleep(1);
+ sleep(1);
}
}
$ENV{OBJECT_REMOTE_TEST_LOGGER} = 1;
-use Object::Remote::Connector::Local;
+use Object::Remote::Connector::Local;
$SIG{ALRM} = sub { die "alarm signal\n" };
#if it's not handled right
eval {
no warnings 'once';
- $Object::Remote::FatNode::INHIBIT_RUN_NODE = 1;
+ $Object::Remote::FatNode::INHIBIT_RUN_NODE = 1;
eval $fatnode_text;
-
+
if ($@) {
die "could not eval fatnode text: $@";
- }
-
+ }
+
while(1) {
sleep(1);
}
is($@, "alarm signal\n", "Alarm handler was invoked");
-done_testing;
+done_testing;