From: Tyler Riddle Date: Sat, 15 Sep 2012 20:38:52 +0000 (-0700) Subject: more log lines - found deadlock where controller blocks on read seemingly outside... X-Git-Tag: v0.003001_01~116 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FObject-Remote.git;a=commitdiff_plain;h=5d59cb9859e004df5cde5d83aa7230e621a28b95;hp=a63cd862186adf328e26dd1294e7a3b1adc42ed6 more log lines - found deadlock where controller blocks on read seemingly outside of runloop --- diff --git a/lib/Object/Remote.pm b/lib/Object/Remote.pm index 675b2fb..9dbb70e 100644 --- a/lib/Object/Remote.pm +++ b/lib/Object/Remote.pm @@ -14,13 +14,14 @@ BEGIN { sub new::on { my ($class, $on, @args) = @_; my $conn = __PACKAGE__->connect($on); - log_debug { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) }; + log_trace { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) }; return $conn->remote_object(class => $class, args => \@args); } sub can::on { my ($class, $on, $name) = @_; my $conn = __PACKAGE__->connect($on); + log_trace { "Invoking remote \$class->can('$name')" }; return $conn->remote_sub(join('::', $class, $name)); } diff --git a/lib/Object/Remote/LogDestination.pm b/lib/Object/Remote/LogDestination.pm index af4235b..67f436a 100644 --- a/lib/Object/Remote/LogDestination.pm +++ b/lib/Object/Remote/LogDestination.pm @@ -7,15 +7,15 @@ has logger => ( is => 'ro', required => 1 ); has subscriptions => ( is => 'ro', required => 1, default => sub { [] } ); sub select { - my ($self, $router, $selector) = @_; - my $subscription = $router->subscribe($self->logger, $selector); - push(@{ $self->subscriptions }, $subscription); - return $subscription; + my ($self, $router, $selector) = @_; + my $subscription = $router->subscribe($self->logger, $selector); + push(@{ $self->subscriptions }, $subscription); + return $subscription; } sub connect { - my ($self, $router) = @_; - return $self->select($router, sub { 1 }); + my ($self, $router) = @_; + return $self->select($router, sub { 1 }); } 1; diff --git a/lib/Object/Remote/LogRouter.pm b/lib/Object/Remote/LogRouter.pm index 9fa20e0..17108dc 100644 --- a/lib/Object/Remote/LogRouter.pm +++ b/lib/Object/Remote/LogRouter.pm @@ -10,8 +10,8 @@ has description => ( is => 'rw', required => 1 ); sub before_import { } sub after_import { - my ($self, $controller, $importer, $config) = @_; - my $logger = $controller->arg_logger($config->{logger}); + my ($self, $controller, $importer, $config) = @_; + my $logger = $controller->arg_logger($config->{logger}); # TODO need to review this concept, ignore these configuration values for now # my $package_logger = $controller->arg_package_logger($config->{package_logger}); @@ -37,70 +37,70 @@ sub after_import { } sub subscribe { - my ($self, $logger, $selector, $is_temp) = @_; - my $subscription_list = $self->subscriptions; + my ($self, $logger, $selector, $is_temp) = @_; + my $subscription_list = $self->subscriptions; - if(ref $logger ne 'CODE') { - die 'logger was not a CodeRef or a logger object. Please try again.' - unless blessed($logger); - $logger = do { my $l = $logger; sub { $l } } - } + if(ref $logger ne 'CODE') { + die 'logger was not a CodeRef or a logger object. Please try again.' + unless blessed($logger); + $logger = do { my $l = $logger; sub { $l } } + } my $subscription = [ $logger, $selector ]; $is_temp = 0 unless defined $is_temp; push(@$subscription_list, $subscription); if ($is_temp) { - #weaken($subscription->[-1]); + #weaken($subscription->[-1]); } return $subscription; } #TODO turn this logic into a role sub handle_log_message { - my ($self, $caller, $level, $log_meth, @values) = @_; - my $should_clean = 0; + my ($self, $caller, $level, $log_meth, @values) = @_; + my $should_clean = 0; - foreach(@{ $self->subscriptions }) { - unless(defined($_)) { - $should_clean = 1; - next; - } - my ($logger, $selector) = @$_; - #TODO this is not a firm part of the api but providing - #this info to the selector is a good feature - local($_) = { level => $level, package => $caller }; - if ($selector->(@values)) { - #TODO resolve caller_level issues with routing - #idea: the caller level will differ in distance from the - #start of the call stack but it's a constant distance from - #the end of the call stack - can that be exploited to calculate - #the distance from the start right before it's used? - # - #newer idea: in order for log4perl to work right the logger - #must be invoked in the exported log_* method directly - #so by passing the logger down the chain of routers - #it can be invoked in that location and the caller level - #problem doesn't exist anymore - $logger = $logger->($caller, { caller_level => -1 }); + foreach(@{ $self->subscriptions }) { + unless(defined($_)) { + $should_clean = 1; + next; + } + my ($logger, $selector) = @$_; + #TODO this is not a firm part of the api but providing + #this info to the selector is a good feature + local($_) = { level => $level, package => $caller }; + if ($selector->(@values)) { + #TODO resolve caller_level issues with routing + #idea: the caller level will differ in distance from the + #start of the call stack but it's a constant distance from + #the end of the call stack - can that be exploited to calculate + #the distance from the start right before it's used? + # + #newer idea: in order for log4perl to work right the logger + #must be invoked in the exported log_* method directly + #so by passing the logger down the chain of routers + #it can be invoked in that location and the caller level + #problem doesn't exist anymore + $logger = $logger->($caller, { caller_level => -1 }); - $logger->$level($log_meth->(@values)) - if $logger->${\"is_$level"}; - } + $logger->$level($log_meth->(@values)) + if $logger->${\"is_$level"}; + } } if ($should_clean) { - $self->_remove_dead_subscriptions; + $self->_remove_dead_subscriptions; } return; } sub _remove_dead_subscriptions { - my ($self) = @_; - my @ok = grep { defined $_ } @{$self->subscriptions}; - @{$self->subscriptions} = @ok; - return; + my ($self) = @_; + my @ok = grep { defined $_ } @{$self->subscriptions}; + @{$self->subscriptions} = @ok; + return; } diff --git a/lib/Object/Remote/Logging.pm b/lib/Object/Remote/Logging.pm index 432b797..f30a318 100644 --- a/lib/Object/Remote/Logging.pm +++ b/lib/Object/Remote/Logging.pm @@ -11,51 +11,51 @@ use Carp qw(cluck); use base qw(Log::Contextual); sub arg_router { - return $_[1] if defined $_[1]; - our $Router_Instance; + return $_[1] if defined $_[1]; + our $Router_Instance; - return $Router_Instance if defined $Router_Instance; + return $Router_Instance if defined $Router_Instance; - $Router_Instance = Object::Remote::LogRouter->new( - description => $_[0], - ); + $Router_Instance = Object::Remote::LogRouter->new( + description => $_[0], + ); } sub init_logging { - my ($class) = @_; - our $Did_Init; + my ($class) = @_; + our $Did_Init; - return if $Did_Init; - $Did_Init = 1; + return if $Did_Init; + $Did_Init = 1; - if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) { - $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL}); - } + if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) { + $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL}); + } } sub init_logging_stderr { - my ($class, $level) = @_; - our $Log_Level = $level; - chomp(my $hostname = `hostname`); - our $Log_Output = Object::Remote::LogDestination->new( - logger => Log::Contextual::SimpleLogger->new({ - levels_upto => $Log_Level, - coderef => sub { - my @t = localtime(); - my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]); - warn "[$hostname $$] $time ", @_ - }, - }) - ); - $Log_Output->connect($class->arg_router); + my ($class, $level) = @_; + our $Log_Level = $level; + chomp(my $hostname = `hostname`); + our $Log_Output = Object::Remote::LogDestination->new( + logger => Log::Contextual::SimpleLogger->new({ + levels_upto => $Log_Level, + coderef => sub { + my @t = localtime(); + my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]); + warn "[$hostname $$] $time ", @_ + }, + }) + ); + $Log_Output->connect($class->arg_router); } sub init_logging_forwarding { -# my ($class, $remote_parent) = @_; -# chomp(my $host = `hostname`); -# $class->arg_router->description("$$ $host"); -# $class->arg_router->parent_router($remote_parent); -# $remote_parent->add_child_router($class->arg_router); +# my ($class, $remote_parent) = @_; +# chomp(my $host = `hostname`); +# $class->arg_router->description("$$ $host"); +# $class->arg_router->parent_router($remote_parent); +# $remote_parent->add_child_router($class->arg_router); } 1; diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index 4531946..16216de 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -2,7 +2,7 @@ package Object::Remote::MiniLoop; use IO::Select; use Time::HiRes qw(time); -use Object::Remote::Logging qw( :log ); +use Object::Remote::Logging qw( :log :dlog ); use Moo; # this is ro because we only actually set it using local in sub run @@ -39,10 +39,12 @@ sub watch_io { my $fh = $watch{handle}; log_debug { my $type = ref($fh); "Adding watch for ref of type '$type'" }; if (my $cb = $watch{on_read_ready}) { + log_trace { "IO watcher on_read_ready has been invoked" }; $self->_read_select->add($fh); $self->_read_watches->{$fh} = $cb; } if (my $cb = $watch{on_write_ready}) { + log_trace { "IO watcher on_write_ready has been invoked" }; $self->_write_select->add($fh); $self->_write_watches->{$fh} = $cb; } @@ -74,7 +76,7 @@ sub watch_time { my $timers = $self->_timers; my $new = [ $at => $code ]; @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new; - log_debug { "Created new timer with id of '$new' that expires at '$at'" }; + log_debug { "Created new timer that expires at '$at'" }; return "$new"; } @@ -85,35 +87,72 @@ sub unwatch_time { return; } +sub _next_timer_expires_delay { + my ($self) = @_; + my $timers = $self->_timers; + #undef means no timeout, only returns + #when data is ready - when the system + #deadlocks the chatter from the timeout in + #select clogs up the logs + my $delay_max = undef; + + return $delay_max unless @$timers; + my $duration = $timers->[0]->[0] - time; + + log_trace { "next timer fires in '$duration' seconds " }; + + if ($duration < 0) { + $duration = 0; + } elsif (! defined($delay_max)) { + $duration = undef; + } elsif ($duration > $delay_max) { + $duration = $delay_max; + } + + return $duration; +} + sub loop_once { my ($self) = @_; my $read = $self->_read_watches; my $write = $self->_write_watches; + my $read_count = 0; + my $write_count = 0; my @c = caller; - log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i", scalar(keys(%$read)), scalar(keys(%$write))) }; + my $wait_time = $self->_next_timer_expires_delay; + log_debug { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s", + scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) }; my ($readable, $writeable) = IO::Select->select( - $self->_read_select, $self->_write_select, undef, 0.5 - ); + $self->_read_select, $self->_write_select, undef, $wait_time + ); log_debug { - my $readable_count = defined $readable ? scalar(@$readable) : 0; - my $writable_count = defined $writeable ? scalar(@$writeable) : 0; - "run loop has readable:$readable_count writeable:$writable_count"; + my $readable_count = defined $readable ? scalar(@$readable) : 0; + my $writable_count = defined $writeable ? scalar(@$writeable) : 0; + "Run loop: select returned readable:$readable_count writeable:$writable_count"; }; # I would love to trap errors in the select call but IO::Select doesn't # differentiate between an error and a timeout. # -- no, love, mst. log_trace { "Reading from all ready filehandles" }; foreach my $fh (@$readable) { - $read->{$fh}() if $read->{$fh}; + next unless $read->{$fh}; + $read_count++; + $read->{$fh}(); +# $read->{$fh}() if $read->{$fh}; } log_trace { "Writing to all ready filehandles" }; foreach my $fh (@$writeable) { - $write->{$fh}() if $write->{$fh}; + next unless $write->{$fh}; + $write_count++; + $write->{$fh}(); +# $write->{$fh}() if $write->{$fh}; } + log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" }; my $timers = $self->_timers; my $now = time(); log_trace { "Checking timers" }; while (@$timers and $timers->[0][0] <= $now) { + Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0]; (shift @$timers)->[1]->(); } log_debug { "Run loop: single loop is completed" }; @@ -122,7 +161,8 @@ sub loop_once { sub want_run { my ($self) = @_; - $self->{want_running}++; + Dlog_debug { "Run loop: Incrimenting want_running, is now $_" } + ++$self->{want_running}; } sub run_while_wanted { @@ -135,7 +175,12 @@ sub run_while_wanted { sub want_stop { my ($self) = @_; - $self->{want_running}-- if $self->{want_running}; + if (! $self->{want_running}) { + log_debug { "Run loop: want_stop() was called but want_running was not true" }; + return; + } + Dlog_debug { "Run loop: decrimenting want_running, is now $_" } + --$self->{want_running}; } sub run { diff --git a/lib/Object/Remote/ReadChannel.pm b/lib/Object/Remote/ReadChannel.pm index 84b3270..5de5bf1 100644 --- a/lib/Object/Remote/ReadChannel.pm +++ b/lib/Object/Remote/ReadChannel.pm @@ -2,6 +2,7 @@ package Object::Remote::ReadChannel; use CPS::Future; use Scalar::Util qw(weaken); +use Object::Remote::Logging qw(:log); use Moo; has fh => ( @@ -9,6 +10,7 @@ has fh => ( trigger => sub { my ($self, $fh) = @_; weaken($self); + log_trace { "Watching filehandle via trigger on 'fh' attribute in Object::Remote::ReadChannel" }; Object::Remote->current_loop ->watch_io( handle => $fh, @@ -27,14 +29,17 @@ has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x }); sub _receive_data_from { my ($self, $fh) = @_; + log_trace { "Preparing to read data" }; my $rb = $self->_receive_data_buffer; my $len = sysread($fh, $$rb, 1024, length($$rb)); my $err = defined($len) ? '' : ": $!"; if (defined($len) and $len > 0) { + log_trace { "Read $len bytes of data" }; while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) { $cb->(my $line = $1); } } else { + log_trace { "Got EOF or error, this read channel is done" }; Object::Remote->current_loop ->unwatch_io( handle => $self->fh, @@ -47,6 +52,7 @@ sub _receive_data_from { sub DEMOLISH { my ($self, $gd) = @_; return if $gd; + log_trace { "read channel is being demolished" }; Object::Remote->current_loop ->unwatch_io( handle => $self->fh, diff --git a/lib/Object/Remote/Role/Connector.pm b/lib/Object/Remote/Role/Connector.pm index 6da4a9c..83bfcea 100644 --- a/lib/Object/Remote/Role/Connector.pm +++ b/lib/Object/Remote/Role/Connector.pm @@ -2,6 +2,7 @@ package Object::Remote::Role::Connector; use Module::Runtime qw(use_module); use Object::Remote::Future; +use Object::Remote::Logging qw(:log :dlog ); use Moo::Role; requires '_open2_for'; @@ -10,14 +11,17 @@ has timeout => (is => 'ro', default => sub { { after => 10 } }); sub connect { my $self = shift; + Dlog_debug { "Perparing to create connection with args of: $_" } @_; my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_); my $channel = use_module('Object::Remote::ReadChannel')->new( fh => $receive_from_fh ); return future { + log_trace { "Initializing connection for child pid '$child_pid'" }; my $f = shift; $channel->on_line_call(sub { if ($_[0] eq "Shere") { + log_trace { "Received 'Shere' from child pid '$child_pid'; setting done handler to create connection" }; $f->done( use_module('Object::Remote::Connection')->new( send_to_fh => $send_to_fh, @@ -26,6 +30,7 @@ sub connect { ) ); } else { + log_warn { "'Shere' was not found in connection data for child pid '$child_pid'" }; $f->fail("Expected Shere from remote but received: $_[0]"); } undef($channel); @@ -34,14 +39,25 @@ sub connect { $f->fail("Channel closed without seeing Shere: $_[0]"); undef($channel); }); + log_trace { "initialized events on channel for child pid '$child_pid'; creating timeout" }; Object::Remote->current_loop ->watch_time( %{$self->timeout}, code => sub { - $f->fail("Connection timed out") unless $f->is_ready; +# log_warn { "Connection timed out for child pid '$child_pid'" }; +# $f->fail("Connection timed out") unless $f->is_ready; +# undef($channel); + Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready; + unless($f->is_ready) { + log_warn { "Connection with child pid '$child_pid' has timed out" }; + $f->fail("Connection timed out") unless $f->is_ready; + } + #TODO hrm was this supposed to be conditional on the is_ready ? + #a connection is only good for timeout seconds? undef($channel); } ); + log_trace { "connection for child pid '$child_pid' has been initialized" }; $f; } } diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index d418c23..815ef3e 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -56,10 +56,13 @@ sub _start_perl { return ($foreign_stdin, $foreign_stdout, $pid); } +#TODO open2() forks off a child and I have not been able to locate +#a mechanism for reaping dead children so they don't become zombies sub _open2_for { my $self = shift; my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_); my $to_send = $self->fatnode_text; + log_debug { my $len = length($to_send); "Sending contents of fat node to remote node; size is '$len' characters" }; Object::Remote->current_loop ->watch_io( handle => $foreign_stdin, @@ -71,11 +74,14 @@ sub _open2_for { # if the stdin went away, we'll never get Shere # so it's not a big deal to simply give up on !defined if (!defined($len) or 0 == length($to_send)) { + log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" }; Object::Remote->current_loop ->unwatch_io( handle => $foreign_stdin, on_write_ready => 1 ); + } else { + log_trace { "Sent $len bytes of fatnode data to remote side" }; } } ); diff --git a/lib/Object/Remote/Role/LogForwarder.pm b/lib/Object/Remote/Role/LogForwarder.pm index 3c1ef62..4a4f4af 100644 --- a/lib/Object/Remote/Role/LogForwarder.pm +++ b/lib/Object/Remote/Role/LogForwarder.pm @@ -16,30 +16,30 @@ has parent_router => ( is => 'rw', );#weak_ref => 1 ); sub BUILD { } after BUILD => sub { - my ($self) = @_; -# my $parent = $self->parent_router; -# return unless defined $parent ; -# $parent->add_child_router($self); + my ($self) = @_; +# my $parent = $self->parent_router; +# return unless defined $parent ; +# $parent->add_child_router($self); }; sub describe { - my ($self, $depth) = @_; - $depth = -1 unless defined $depth; - $depth++; - my $buf = "\t" x $depth . $self->description . "\n"; - foreach my $child (@{$self->child_routers}) { - next unless defined $child; - $buf .= $child->describe($depth); - } + my ($self, $depth) = @_; + $depth = -1 unless defined $depth; + $depth++; + my $buf = "\t" x $depth . $self->description . "\n"; + foreach my $child (@{$self->child_routers}) { + next unless defined $child; + $buf .= $child->describe($depth); + } - return $buf; + return $buf; } sub add_child_router { - my ($self, $router) = @_; - push(@{ $self->child_routers }, $router); + my ($self, $router) = @_; + push(@{ $self->child_routers }, $router); # weaken(${ $self->child_routers }[-1]); - return; + return; } #sub remove_child_router { @@ -48,11 +48,12 @@ sub add_child_router { #} after handle_log_message => sub { - my ($self, @args) = @_; - my $parent = $self->parent_router; + my ($self, @args) = @_; + my $parent = $self->parent_router; - return unless defined $parent; - $parent->handle_log_message(@args); + return unless defined $parent; + $parent->handle_log_message(@args); }; 1; +