sub new::on {
my ($class, $on, @args) = @_;
my $conn = __PACKAGE__->connect($on);
- log_debug { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) };
+ log_trace { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) };
return $conn->remote_object(class => $class, args => \@args);
}
sub can::on {
my ($class, $on, $name) = @_;
my $conn = __PACKAGE__->connect($on);
+ log_trace { "Invoking remote \$class->can('$name')" };
return $conn->remote_sub(join('::', $class, $name));
}
has subscriptions => ( is => 'ro', required => 1, default => sub { [] } );
sub select {
- my ($self, $router, $selector) = @_;
- my $subscription = $router->subscribe($self->logger, $selector);
- push(@{ $self->subscriptions }, $subscription);
- return $subscription;
+ my ($self, $router, $selector) = @_;
+ my $subscription = $router->subscribe($self->logger, $selector);
+ push(@{ $self->subscriptions }, $subscription);
+ return $subscription;
}
sub connect {
- my ($self, $router) = @_;
- return $self->select($router, sub { 1 });
+ my ($self, $router) = @_;
+ return $self->select($router, sub { 1 });
}
1;
sub before_import { }
sub after_import {
- my ($self, $controller, $importer, $config) = @_;
- my $logger = $controller->arg_logger($config->{logger});
+ my ($self, $controller, $importer, $config) = @_;
+ my $logger = $controller->arg_logger($config->{logger});
# TODO need to review this concept, ignore these configuration values for now
# my $package_logger = $controller->arg_package_logger($config->{package_logger});
}
sub subscribe {
- my ($self, $logger, $selector, $is_temp) = @_;
- my $subscription_list = $self->subscriptions;
+ my ($self, $logger, $selector, $is_temp) = @_;
+ my $subscription_list = $self->subscriptions;
- if(ref $logger ne 'CODE') {
- die 'logger was not a CodeRef or a logger object. Please try again.'
- unless blessed($logger);
- $logger = do { my $l = $logger; sub { $l } }
- }
+ if(ref $logger ne 'CODE') {
+ die 'logger was not a CodeRef or a logger object. Please try again.'
+ unless blessed($logger);
+ $logger = do { my $l = $logger; sub { $l } }
+ }
my $subscription = [ $logger, $selector ];
$is_temp = 0 unless defined $is_temp;
push(@$subscription_list, $subscription);
if ($is_temp) {
- #weaken($subscription->[-1]);
+ #weaken($subscription->[-1]);
}
return $subscription;
}
#TODO turn this logic into a role
sub handle_log_message {
- my ($self, $caller, $level, $log_meth, @values) = @_;
- my $should_clean = 0;
+ my ($self, $caller, $level, $log_meth, @values) = @_;
+ my $should_clean = 0;
- foreach(@{ $self->subscriptions }) {
- unless(defined($_)) {
- $should_clean = 1;
- next;
- }
- my ($logger, $selector) = @$_;
- #TODO this is not a firm part of the api but providing
- #this info to the selector is a good feature
- local($_) = { level => $level, package => $caller };
- if ($selector->(@values)) {
- #TODO resolve caller_level issues with routing
- #idea: the caller level will differ in distance from the
- #start of the call stack but it's a constant distance from
- #the end of the call stack - can that be exploited to calculate
- #the distance from the start right before it's used?
- #
- #newer idea: in order for log4perl to work right the logger
- #must be invoked in the exported log_* method directly
- #so by passing the logger down the chain of routers
- #it can be invoked in that location and the caller level
- #problem doesn't exist anymore
- $logger = $logger->($caller, { caller_level => -1 });
+ foreach(@{ $self->subscriptions }) {
+ unless(defined($_)) {
+ $should_clean = 1;
+ next;
+ }
+ my ($logger, $selector) = @$_;
+ #TODO this is not a firm part of the api but providing
+ #this info to the selector is a good feature
+ local($_) = { level => $level, package => $caller };
+ if ($selector->(@values)) {
+ #TODO resolve caller_level issues with routing
+ #idea: the caller level will differ in distance from the
+ #start of the call stack but it's a constant distance from
+ #the end of the call stack - can that be exploited to calculate
+ #the distance from the start right before it's used?
+ #
+ #newer idea: in order for log4perl to work right the logger
+ #must be invoked in the exported log_* method directly
+ #so by passing the logger down the chain of routers
+ #it can be invoked in that location and the caller level
+ #problem doesn't exist anymore
+ $logger = $logger->($caller, { caller_level => -1 });
- $logger->$level($log_meth->(@values))
- if $logger->${\"is_$level"};
- }
+ $logger->$level($log_meth->(@values))
+ if $logger->${\"is_$level"};
+ }
}
if ($should_clean) {
- $self->_remove_dead_subscriptions;
+ $self->_remove_dead_subscriptions;
}
return;
}
sub _remove_dead_subscriptions {
- my ($self) = @_;
- my @ok = grep { defined $_ } @{$self->subscriptions};
- @{$self->subscriptions} = @ok;
- return;
+ my ($self) = @_;
+ my @ok = grep { defined $_ } @{$self->subscriptions};
+ @{$self->subscriptions} = @ok;
+ return;
}
use base qw(Log::Contextual);
sub arg_router {
- return $_[1] if defined $_[1];
- our $Router_Instance;
+ return $_[1] if defined $_[1];
+ our $Router_Instance;
- return $Router_Instance if defined $Router_Instance;
+ return $Router_Instance if defined $Router_Instance;
- $Router_Instance = Object::Remote::LogRouter->new(
- description => $_[0],
- );
+ $Router_Instance = Object::Remote::LogRouter->new(
+ description => $_[0],
+ );
}
sub init_logging {
- my ($class) = @_;
- our $Did_Init;
+ my ($class) = @_;
+ our $Did_Init;
- return if $Did_Init;
- $Did_Init = 1;
+ return if $Did_Init;
+ $Did_Init = 1;
- if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) {
- $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL});
- }
+ if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) {
+ $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL});
+ }
}
sub init_logging_stderr {
- my ($class, $level) = @_;
- our $Log_Level = $level;
- chomp(my $hostname = `hostname`);
- our $Log_Output = Object::Remote::LogDestination->new(
- logger => Log::Contextual::SimpleLogger->new({
- levels_upto => $Log_Level,
- coderef => sub {
- my @t = localtime();
- my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]);
- warn "[$hostname $$] $time ", @_
- },
- })
- );
- $Log_Output->connect($class->arg_router);
+ my ($class, $level) = @_;
+ our $Log_Level = $level;
+ chomp(my $hostname = `hostname`);
+ our $Log_Output = Object::Remote::LogDestination->new(
+ logger => Log::Contextual::SimpleLogger->new({
+ levels_upto => $Log_Level,
+ coderef => sub {
+ my @t = localtime();
+ my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]);
+ warn "[$hostname $$] $time ", @_
+ },
+ })
+ );
+ $Log_Output->connect($class->arg_router);
}
sub init_logging_forwarding {
-# my ($class, $remote_parent) = @_;
-# chomp(my $host = `hostname`);
-# $class->arg_router->description("$$ $host");
-# $class->arg_router->parent_router($remote_parent);
-# $remote_parent->add_child_router($class->arg_router);
+# my ($class, $remote_parent) = @_;
+# chomp(my $host = `hostname`);
+# $class->arg_router->description("$$ $host");
+# $class->arg_router->parent_router($remote_parent);
+# $remote_parent->add_child_router($class->arg_router);
}
1;
use IO::Select;
use Time::HiRes qw(time);
-use Object::Remote::Logging qw( :log );
+use Object::Remote::Logging qw( :log :dlog );
use Moo;
# this is ro because we only actually set it using local in sub run
my $fh = $watch{handle};
log_debug { my $type = ref($fh); "Adding watch for ref of type '$type'" };
if (my $cb = $watch{on_read_ready}) {
+ log_trace { "IO watcher on_read_ready has been invoked" };
$self->_read_select->add($fh);
$self->_read_watches->{$fh} = $cb;
}
if (my $cb = $watch{on_write_ready}) {
+ log_trace { "IO watcher on_write_ready has been invoked" };
$self->_write_select->add($fh);
$self->_write_watches->{$fh} = $cb;
}
my $timers = $self->_timers;
my $new = [ $at => $code ];
@{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
- log_debug { "Created new timer with id of '$new' that expires at '$at'" };
+ log_debug { "Created new timer that expires at '$at'" };
return "$new";
}
return;
}
+sub _next_timer_expires_delay {
+ my ($self) = @_;
+ my $timers = $self->_timers;
+ #undef means no timeout, only returns
+ #when data is ready - when the system
+ #deadlocks the chatter from the timeout in
+ #select clogs up the logs
+ my $delay_max = undef;
+
+ return $delay_max unless @$timers;
+ my $duration = $timers->[0]->[0] - time;
+
+ log_trace { "next timer fires in '$duration' seconds " };
+
+ if ($duration < 0) {
+ $duration = 0;
+ } elsif (! defined($delay_max)) {
+ $duration = undef;
+ } elsif ($duration > $delay_max) {
+ $duration = $delay_max;
+ }
+
+ return $duration;
+}
+
sub loop_once {
my ($self) = @_;
my $read = $self->_read_watches;
my $write = $self->_write_watches;
+ my $read_count = 0;
+ my $write_count = 0;
my @c = caller;
- log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i", scalar(keys(%$read)), scalar(keys(%$write))) };
+ my $wait_time = $self->_next_timer_expires_delay;
+ log_debug { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
+ scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
my ($readable, $writeable) = IO::Select->select(
- $self->_read_select, $self->_write_select, undef, 0.5
- );
+ $self->_read_select, $self->_write_select, undef, $wait_time
+ );
log_debug {
- my $readable_count = defined $readable ? scalar(@$readable) : 0;
- my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
- "run loop has readable:$readable_count writeable:$writable_count";
+ my $readable_count = defined $readable ? scalar(@$readable) : 0;
+ my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
+ "Run loop: select returned readable:$readable_count writeable:$writable_count";
};
# I would love to trap errors in the select call but IO::Select doesn't
# differentiate between an error and a timeout.
# -- no, love, mst.
log_trace { "Reading from all ready filehandles" };
foreach my $fh (@$readable) {
- $read->{$fh}() if $read->{$fh};
+ next unless $read->{$fh};
+ $read_count++;
+ $read->{$fh}();
+# $read->{$fh}() if $read->{$fh};
}
log_trace { "Writing to all ready filehandles" };
foreach my $fh (@$writeable) {
- $write->{$fh}() if $write->{$fh};
+ next unless $write->{$fh};
+ $write_count++;
+ $write->{$fh}();
+# $write->{$fh}() if $write->{$fh};
}
+ log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
my $timers = $self->_timers;
my $now = time();
log_trace { "Checking timers" };
while (@$timers and $timers->[0][0] <= $now) {
+ Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0];
(shift @$timers)->[1]->();
}
log_debug { "Run loop: single loop is completed" };
sub want_run {
my ($self) = @_;
- $self->{want_running}++;
+ Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
+ ++$self->{want_running};
}
sub run_while_wanted {
sub want_stop {
my ($self) = @_;
- $self->{want_running}-- if $self->{want_running};
+ if (! $self->{want_running}) {
+ log_debug { "Run loop: want_stop() was called but want_running was not true" };
+ return;
+ }
+ Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
+ --$self->{want_running};
}
sub run {
use CPS::Future;
use Scalar::Util qw(weaken);
+use Object::Remote::Logging qw(:log);
use Moo;
has fh => (
trigger => sub {
my ($self, $fh) = @_;
weaken($self);
+ log_trace { "Watching filehandle via trigger on 'fh' attribute in Object::Remote::ReadChannel" };
Object::Remote->current_loop
->watch_io(
handle => $fh,
sub _receive_data_from {
my ($self, $fh) = @_;
+ log_trace { "Preparing to read data" };
my $rb = $self->_receive_data_buffer;
my $len = sysread($fh, $$rb, 1024, length($$rb));
my $err = defined($len) ? '' : ": $!";
if (defined($len) and $len > 0) {
+ log_trace { "Read $len bytes of data" };
while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
$cb->(my $line = $1);
}
} else {
+ log_trace { "Got EOF or error, this read channel is done" };
Object::Remote->current_loop
->unwatch_io(
handle => $self->fh,
sub DEMOLISH {
my ($self, $gd) = @_;
return if $gd;
+ log_trace { "read channel is being demolished" };
Object::Remote->current_loop
->unwatch_io(
handle => $self->fh,
use Module::Runtime qw(use_module);
use Object::Remote::Future;
+use Object::Remote::Logging qw(:log :dlog );
use Moo::Role;
requires '_open2_for';
sub connect {
my $self = shift;
+ Dlog_debug { "Perparing to create connection with args of: $_" } @_;
my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_);
my $channel = use_module('Object::Remote::ReadChannel')->new(
fh => $receive_from_fh
);
return future {
+ log_trace { "Initializing connection for child pid '$child_pid'" };
my $f = shift;
$channel->on_line_call(sub {
if ($_[0] eq "Shere") {
+ log_trace { "Received 'Shere' from child pid '$child_pid'; setting done handler to create connection" };
$f->done(
use_module('Object::Remote::Connection')->new(
send_to_fh => $send_to_fh,
)
);
} else {
+ log_warn { "'Shere' was not found in connection data for child pid '$child_pid'" };
$f->fail("Expected Shere from remote but received: $_[0]");
}
undef($channel);
$f->fail("Channel closed without seeing Shere: $_[0]");
undef($channel);
});
+ log_trace { "initialized events on channel for child pid '$child_pid'; creating timeout" };
Object::Remote->current_loop
->watch_time(
%{$self->timeout},
code => sub {
- $f->fail("Connection timed out") unless $f->is_ready;
+# log_warn { "Connection timed out for child pid '$child_pid'" };
+# $f->fail("Connection timed out") unless $f->is_ready;
+# undef($channel);
+ Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready;
+ unless($f->is_ready) {
+ log_warn { "Connection with child pid '$child_pid' has timed out" };
+ $f->fail("Connection timed out") unless $f->is_ready;
+ }
+ #TODO hrm was this supposed to be conditional on the is_ready ?
+ #a connection is only good for timeout seconds?
undef($channel);
}
);
+ log_trace { "connection for child pid '$child_pid' has been initialized" };
$f;
}
}
return ($foreign_stdin, $foreign_stdout, $pid);
}
+#TODO open2() forks off a child and I have not been able to locate
+#a mechanism for reaping dead children so they don't become zombies
sub _open2_for {
my $self = shift;
my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
my $to_send = $self->fatnode_text;
+ log_debug { my $len = length($to_send); "Sending contents of fat node to remote node; size is '$len' characters" };
Object::Remote->current_loop
->watch_io(
handle => $foreign_stdin,
# if the stdin went away, we'll never get Shere
# so it's not a big deal to simply give up on !defined
if (!defined($len) or 0 == length($to_send)) {
+ log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" };
Object::Remote->current_loop
->unwatch_io(
handle => $foreign_stdin,
on_write_ready => 1
);
+ } else {
+ log_trace { "Sent $len bytes of fatnode data to remote side" };
}
}
);
sub BUILD { }
after BUILD => sub {
- my ($self) = @_;
-# my $parent = $self->parent_router;
-# return unless defined $parent ;
-# $parent->add_child_router($self);
+ my ($self) = @_;
+# my $parent = $self->parent_router;
+# return unless defined $parent ;
+# $parent->add_child_router($self);
};
sub describe {
- my ($self, $depth) = @_;
- $depth = -1 unless defined $depth;
- $depth++;
- my $buf = "\t" x $depth . $self->description . "\n";
- foreach my $child (@{$self->child_routers}) {
- next unless defined $child;
- $buf .= $child->describe($depth);
- }
+ my ($self, $depth) = @_;
+ $depth = -1 unless defined $depth;
+ $depth++;
+ my $buf = "\t" x $depth . $self->description . "\n";
+ foreach my $child (@{$self->child_routers}) {
+ next unless defined $child;
+ $buf .= $child->describe($depth);
+ }
- return $buf;
+ return $buf;
}
sub add_child_router {
- my ($self, $router) = @_;
- push(@{ $self->child_routers }, $router);
+ my ($self, $router) = @_;
+ push(@{ $self->child_routers }, $router);
# weaken(${ $self->child_routers }[-1]);
- return;
+ return;
}
#sub remove_child_router {
#}
after handle_log_message => sub {
- my ($self, @args) = @_;
- my $parent = $self->parent_router;
+ my ($self, @args) = @_;
+ my $parent = $self->parent_router;
- return unless defined $parent;
- $parent->handle_log_message(@args);
+ return unless defined $parent;
+ $parent->handle_log_message(@args);
};
1;
+