more log lines - found deadlock where controller blocks on read seemingly outside...
Tyler Riddle [Sat, 15 Sep 2012 20:38:52 +0000 (13:38 -0700)]
lib/Object/Remote.pm
lib/Object/Remote/LogDestination.pm
lib/Object/Remote/LogRouter.pm
lib/Object/Remote/Logging.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/ReadChannel.pm
lib/Object/Remote/Role/Connector.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm
lib/Object/Remote/Role/LogForwarder.pm

index 675b2fb..9dbb70e 100644 (file)
@@ -14,13 +14,14 @@ BEGIN {
 sub new::on {
   my ($class, $on, @args) = @_;
   my $conn = __PACKAGE__->connect($on);
-  log_debug { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) };
+  log_trace { sprintf("constructing instance of $class on connection for child pid of %i", $conn->child_pid) };
   return $conn->remote_object(class => $class, args => \@args);
 }
 
 sub can::on {
   my ($class, $on, $name) = @_;
   my $conn = __PACKAGE__->connect($on);
+  log_trace { "Invoking remote \$class->can('$name')" };
   return $conn->remote_sub(join('::', $class, $name));
 }
 
index af4235b..67f436a 100644 (file)
@@ -7,15 +7,15 @@ has logger => ( is => 'ro', required => 1 );
 has subscriptions => ( is => 'ro', required => 1, default => sub { [] } ); 
 
 sub select {
-   my ($self, $router, $selector) = @_; 
-   my $subscription = $router->subscribe($self->logger, $selector); 
-   push(@{ $self->subscriptions }, $subscription);
-   return $subscription; 
+  my ($self, $router, $selector) = @_; 
+  my $subscription = $router->subscribe($self->logger, $selector); 
+  push(@{ $self->subscriptions }, $subscription);
+  return $subscription; 
 }
 
 sub connect {
-   my ($self, $router) = @_; 
-   return $self->select($router, sub { 1 });
+  my ($self, $router) = @_; 
+  return $self->select($router, sub { 1 });
 }
 
 1; 
index 9fa20e0..17108dc 100644 (file)
@@ -10,8 +10,8 @@ has description => ( is => 'rw', required => 1 );
 
 sub before_import { }
 sub after_import {   
-   my ($self, $controller, $importer, $config) = @_;
-   my $logger = $controller->arg_logger($config->{logger});
+  my ($self, $controller, $importer, $config) = @_;
+  my $logger = $controller->arg_logger($config->{logger});
    
 # TODO need to review this concept, ignore these configuration values for now
 #   my $package_logger = $controller->arg_package_logger($config->{package_logger});
@@ -37,70 +37,70 @@ sub after_import {
 }
 
 sub subscribe {
-   my ($self, $logger, $selector, $is_temp) = @_; 
-   my $subscription_list = $self->subscriptions;
+  my ($self, $logger, $selector, $is_temp) = @_; 
+  my $subscription_list = $self->subscriptions;
    
-   if(ref $logger ne 'CODE') {
-      die 'logger was not a CodeRef or a logger object.  Please try again.'
-         unless blessed($logger);
-      $logger = do { my $l = $logger; sub { $l } }
-   }
+  if(ref $logger ne 'CODE') {
+    die 'logger was not a CodeRef or a logger object.  Please try again.'
+      unless blessed($logger);
+    $logger = do { my $l = $logger; sub { $l } }
+  }
   
    my $subscription = [ $logger, $selector ];
   
    $is_temp = 0 unless defined $is_temp; 
    push(@$subscription_list, $subscription);
    if ($is_temp) {
-      #weaken($subscription->[-1]);
+     #weaken($subscription->[-1]);
    }
    return $subscription; 
 }
 
 #TODO turn this logic into a role
 sub handle_log_message {
-   my ($self, $caller, $level, $log_meth, @values) = @_; 
-   my $should_clean = 0; 
+  my ($self, $caller, $level, $log_meth, @values) = @_; 
+  my $should_clean = 0; 
       
-   foreach(@{ $self->subscriptions }) {
-      unless(defined($_)) {
-         $should_clean = 1;
-         next; 
-      }
-      my ($logger, $selector) = @$_;
-      #TODO this is not a firm part of the api but providing
-      #this info to the selector is a good feature
-      local($_) = { level => $level, package => $caller };
-      if ($selector->(@values)) {
-         #TODO resolve caller_level issues with routing
-         #idea: the caller level will differ in distance from the
-         #start of the call stack but it's a constant distance from
-         #the end of the call stack - can that be exploited to calculate
-         #the distance from the start right before it's used?
-         #
-         #newer idea: in order for log4perl to work right the logger
-         #must be invoked in the exported log_* method directly
-         #so by passing the logger down the chain of routers
-         #it can be invoked in that location and the caller level
-         #problem doesn't exist anymore
-         $logger = $logger->($caller, { caller_level => -1 });
+  foreach(@{ $self->subscriptions }) {
+    unless(defined($_)) {
+      $should_clean = 1;
+        next; 
+     }
+     my ($logger, $selector) = @$_;
+     #TODO this is not a firm part of the api but providing
+     #this info to the selector is a good feature
+     local($_) = { level => $level, package => $caller };
+     if ($selector->(@values)) {
+        #TODO resolve caller_level issues with routing
+        #idea: the caller level will differ in distance from the
+        #start of the call stack but it's a constant distance from
+        #the end of the call stack - can that be exploited to calculate
+        #the distance from the start right before it's used?
+        #
+        #newer idea: in order for log4perl to work right the logger
+        #must be invoked in the exported log_* method directly
+        #so by passing the logger down the chain of routers
+        #it can be invoked in that location and the caller level
+        #problem doesn't exist anymore
+        $logger = $logger->($caller, { caller_level => -1 });
          
-         $logger->$level($log_meth->(@values))
-            if $logger->${\"is_$level"};
-      }
+        $logger->$level($log_meth->(@values))
+          if $logger->${\"is_$level"};
+     }
    }
    
    if ($should_clean) {
-      $self->_remove_dead_subscriptions; 
+     $self->_remove_dead_subscriptions; 
    }
    
    return; 
 }
 
 sub _remove_dead_subscriptions {
-   my ($self) = @_; 
-   my @ok = grep { defined $_ } @{$self->subscriptions}; 
-   @{$self->subscriptions} = @ok; 
-   return; 
+  my ($self) = @_; 
+  my @ok = grep { defined $_ } @{$self->subscriptions}; 
+  @{$self->subscriptions} = @ok; 
+  return; 
 }
 
 
index 432b797..f30a318 100644 (file)
@@ -11,51 +11,51 @@ use Carp qw(cluck);
 use base qw(Log::Contextual); 
 
 sub arg_router {
-    return $_[1] if defined $_[1]; 
-    our $Router_Instance;
+  return $_[1] if defined $_[1]; 
+  our $Router_Instance;
  
-    return $Router_Instance if defined $Router_Instance; 
+  return $Router_Instance if defined $Router_Instance; 
  
-    $Router_Instance = Object::Remote::LogRouter->new(
-        description => $_[0],
-    );
+  $Router_Instance = Object::Remote::LogRouter->new(
+    description => $_[0],
+  );
 }
 
 sub init_logging {
-    my ($class) = @_; 
-    our $Did_Init;
+  my ($class) = @_; 
+  our $Did_Init;
     
-    return if $Did_Init;
-    $Did_Init = 1; 
+  return if $Did_Init;
+  $Did_Init = 1; 
     
-    if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) {
-        $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL});
-    }
+  if ($ENV{OBJECT_REMOTE_LOG_LEVEL}) {
+    $class->init_logging_stderr($ENV{OBJECT_REMOTE_LOG_LEVEL});
+  }
 }
 
 sub init_logging_stderr {
-    my ($class, $level) = @_;
-    our $Log_Level = $level;
-    chomp(my $hostname = `hostname`);
-    our $Log_Output = Object::Remote::LogDestination->new(
-        logger => Log::Contextual::SimpleLogger->new({ 
-            levels_upto => $Log_Level,
-            coderef => sub { 
-                my @t = localtime();
-                my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]);
-                warn "[$hostname $$] $time ", @_ 
-            },
-        })
-    );
-    $Log_Output->connect($class->arg_router);
+  my ($class, $level) = @_;
+  our $Log_Level = $level;
+  chomp(my $hostname = `hostname`);
+  our $Log_Output = Object::Remote::LogDestination->new(
+    logger => Log::Contextual::SimpleLogger->new({ 
+      levels_upto => $Log_Level,
+      coderef => sub { 
+        my @t = localtime();
+        my $time = sprintf("%0.2i:%0.2i:%0.2i", $t[2], $t[1], $t[0]);
+        warn "[$hostname $$] $time ", @_ 
+      },
+    })
+  );
+  $Log_Output->connect($class->arg_router);
 }
 
 sub init_logging_forwarding {
-#    my ($class, $remote_parent) = @_; 
-#    chomp(my $host = `hostname`);
-#    $class->arg_router->description("$$ $host");
-#    $class->arg_router->parent_router($remote_parent);
-#    $remote_parent->add_child_router($class->arg_router);
+#  my ($class, $remote_parent) = @_; 
+#  chomp(my $host = `hostname`);
+#  $class->arg_router->description("$$ $host");
+#  $class->arg_router->parent_router($remote_parent);
+#  $remote_parent->add_child_router($class->arg_router);
 }
 
 1;
index 4531946..16216de 100644 (file)
@@ -2,7 +2,7 @@ package Object::Remote::MiniLoop;
 
 use IO::Select;
 use Time::HiRes qw(time);
-use Object::Remote::Logging qw( :log );
+use Object::Remote::Logging qw( :log :dlog );
 use Moo;
 
 # this is ro because we only actually set it using local in sub run
@@ -39,10 +39,12 @@ sub watch_io {
   my $fh = $watch{handle};
   log_debug { my $type = ref($fh); "Adding watch for ref of type '$type'" };
   if (my $cb = $watch{on_read_ready}) {
+    log_trace { "IO watcher on_read_ready has been invoked" };
     $self->_read_select->add($fh);
     $self->_read_watches->{$fh} = $cb;
   }
   if (my $cb = $watch{on_write_ready}) {
+    log_trace { "IO watcher on_write_ready has been invoked" };
     $self->_write_select->add($fh);
     $self->_write_watches->{$fh} = $cb;
   }
@@ -74,7 +76,7 @@ sub watch_time {
   my $timers = $self->_timers;
   my $new = [ $at => $code ];
   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
-  log_debug { "Created new timer with id of '$new' that expires at '$at'" };
+  log_debug { "Created new timer that expires at '$at'" };
   return "$new";
 }
 
@@ -85,35 +87,72 @@ sub unwatch_time {
   return;
 }
 
+sub _next_timer_expires_delay {
+  my ($self) = @_;
+  my $timers = $self->_timers;
+  #undef means no timeout, only returns
+  #when data is ready - when the system
+  #deadlocks the chatter from the timeout in
+  #select clogs up the logs
+  my $delay_max = undef;
+    
+  return $delay_max unless @$timers;
+  my $duration = $timers->[0]->[0] - time;
+
+  log_trace { "next timer fires in '$duration' seconds " };
+  
+  if ($duration < 0) {
+    $duration = 0; 
+  } elsif (! defined($delay_max)) {
+    $duration = undef; 
+  } elsif ($duration > $delay_max) {
+    $duration = $delay_max; 
+  }
+    
+  return $duration; 
+}
+
 sub loop_once {
   my ($self) = @_;
   my $read = $self->_read_watches;
   my $write = $self->_write_watches;
+  my $read_count = 0;
+  my $write_count = 0; 
   my @c = caller;
-  log_trace {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i", scalar(keys(%$read)), scalar(keys(%$write))) };
+  my $wait_time = $self->_next_timer_expires_delay;
+  log_debug {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
+      scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
   my ($readable, $writeable) = IO::Select->select(
-    $self->_read_select, $self->_write_select, undef, 0.5
-  );
+    $self->_read_select, $self->_write_select, undef, $wait_time
+  ); 
   log_debug { 
-      my $readable_count = defined $readable ? scalar(@$readable) : 0;
-      my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
-      "run loop has readable:$readable_count writeable:$writable_count";
+    my $readable_count = defined $readable ? scalar(@$readable) : 0;
+    my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
+    "Run loop: select returned readable:$readable_count writeable:$writable_count";
   };
   # I would love to trap errors in the select call but IO::Select doesn't
   # differentiate between an error and a timeout.
   #   -- no, love, mst.
   log_trace { "Reading from all ready filehandles" };
   foreach my $fh (@$readable) {
-    $read->{$fh}() if $read->{$fh};
+    next unless $read->{$fh};
+    $read_count++;
+    $read->{$fh}();
+#    $read->{$fh}() if $read->{$fh};
   }
   log_trace { "Writing to all ready filehandles" };
   foreach my $fh (@$writeable) {
-    $write->{$fh}() if $write->{$fh};
+    next unless $write->{$fh};
+    $write_count++;
+    $write->{$fh}();
+#    $write->{$fh}() if $write->{$fh};
   }
+  log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
   my $timers = $self->_timers;
   my $now = time();
   log_trace { "Checking timers" };
   while (@$timers and $timers->[0][0] <= $now) {
+    Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0];
     (shift @$timers)->[1]->();
   }
   log_debug { "Run loop: single loop is completed" };
@@ -122,7 +161,8 @@ sub loop_once {
 
 sub want_run {
   my ($self) = @_;
-  $self->{want_running}++;
+  Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
+    ++$self->{want_running};
 }
 
 sub run_while_wanted {
@@ -135,7 +175,12 @@ sub run_while_wanted {
 
 sub want_stop {
   my ($self) = @_;
-  $self->{want_running}-- if $self->{want_running};
+  if (! $self->{want_running}) {
+    log_debug { "Run loop: want_stop() was called but want_running was not true" };
+    return; 
+  }
+  Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
+    --$self->{want_running};
 }
 
 sub run {
index 84b3270..5de5bf1 100644 (file)
@@ -2,6 +2,7 @@ package Object::Remote::ReadChannel;
 
 use CPS::Future;
 use Scalar::Util qw(weaken);
+use Object::Remote::Logging qw(:log);
 use Moo;
 
 has fh => (
@@ -9,6 +10,7 @@ has fh => (
   trigger => sub {
     my ($self, $fh) = @_;
     weaken($self);
+    log_trace { "Watching filehandle via trigger on 'fh' attribute in Object::Remote::ReadChannel" };
     Object::Remote->current_loop
                   ->watch_io(
                       handle => $fh,
@@ -27,14 +29,17 @@ has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x });
 
 sub _receive_data_from {
   my ($self, $fh) = @_;
+  log_trace { "Preparing to read data" };
   my $rb = $self->_receive_data_buffer;
   my $len = sysread($fh, $$rb, 1024, length($$rb));
   my $err = defined($len) ? '' : ": $!";
   if (defined($len) and $len > 0) {
+    log_trace { "Read $len bytes of data" };
     while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
       $cb->(my $line = $1);
     }
   } else {
+    log_trace { "Got EOF or error, this read channel is done" };
     Object::Remote->current_loop
                   ->unwatch_io(
                       handle => $self->fh,
@@ -47,6 +52,7 @@ sub _receive_data_from {
 sub DEMOLISH {
   my ($self, $gd) = @_;
   return if $gd;
+  log_trace { "read channel is being demolished" };
   Object::Remote->current_loop
                 ->unwatch_io(
                     handle => $self->fh,
index 6da4a9c..83bfcea 100644 (file)
@@ -2,6 +2,7 @@ package Object::Remote::Role::Connector;
 
 use Module::Runtime qw(use_module);
 use Object::Remote::Future;
+use Object::Remote::Logging qw(:log :dlog );
 use Moo::Role;
 
 requires '_open2_for';
@@ -10,14 +11,17 @@ has timeout => (is => 'ro', default => sub { { after => 10 } });
 
 sub connect {
   my $self = shift;
+  Dlog_debug { "Perparing to create connection with args of: $_" } @_;
   my ($send_to_fh, $receive_from_fh, $child_pid) = $self->_open2_for(@_);
   my $channel = use_module('Object::Remote::ReadChannel')->new(
     fh => $receive_from_fh
   );
   return future {
+    log_trace { "Initializing connection for child pid '$child_pid'" };
     my $f = shift;
     $channel->on_line_call(sub {
       if ($_[0] eq "Shere") {
+        log_trace { "Received 'Shere' from child pid '$child_pid'; setting done handler to create connection" };
         $f->done(
           use_module('Object::Remote::Connection')->new(
             send_to_fh => $send_to_fh,
@@ -26,6 +30,7 @@ sub connect {
           )
         );
       } else {
+        log_warn { "'Shere' was not found in connection data for child pid '$child_pid'" };
         $f->fail("Expected Shere from remote but received: $_[0]");
       }
       undef($channel);
@@ -34,14 +39,25 @@ sub connect {
       $f->fail("Channel closed without seeing Shere: $_[0]");
       undef($channel);
     });
+    log_trace { "initialized events on channel for child pid '$child_pid'; creating timeout" };
     Object::Remote->current_loop
                   ->watch_time(
                       %{$self->timeout},
                       code => sub {
-                        $f->fail("Connection timed out") unless $f->is_ready;
+#                        log_warn { "Connection timed out for child pid '$child_pid'" };
+#                        $f->fail("Connection timed out") unless $f->is_ready;
+#                        undef($channel);
+                        Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready;
+                        unless($f->is_ready) {
+                            log_warn { "Connection with child pid '$child_pid' has timed out" };
+                            $f->fail("Connection timed out") unless $f->is_ready;                    
+                        }
+                        #TODO hrm was this supposed to be conditional on the is_ready ? 
+                        #a connection is only good for timeout seconds?
                         undef($channel);
                       }
                     );
+    log_trace { "connection for child pid '$child_pid' has been initialized" }; 
     $f;
   }
 }
index d418c23..815ef3e 100644 (file)
@@ -56,10 +56,13 @@ sub _start_perl {
   return ($foreign_stdin, $foreign_stdout, $pid);
 }
 
+#TODO open2() forks off a child and I have not been able to locate
+#a mechanism for reaping dead children so they don't become zombies
 sub _open2_for {
   my $self = shift;
   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
   my $to_send = $self->fatnode_text;
+  log_debug { my $len = length($to_send); "Sending contents of fat node to remote node; size is '$len' characters"  };
   Object::Remote->current_loop
                 ->watch_io(
                     handle => $foreign_stdin,
@@ -71,11 +74,14 @@ sub _open2_for {
                       # if the stdin went away, we'll never get Shere
                       # so it's not a big deal to simply give up on !defined
                       if (!defined($len) or 0 == length($to_send)) {
+                        log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" };
                         Object::Remote->current_loop
                                       ->unwatch_io(
                                           handle => $foreign_stdin,
                                           on_write_ready => 1
                                         );
+                      } else {
+                          log_trace { "Sent $len bytes of fatnode data to remote side" };
                       }
                     }
                   );
index 3c1ef62..4a4f4af 100644 (file)
@@ -16,30 +16,30 @@ has parent_router => ( is => 'rw', );#weak_ref => 1 );
 sub BUILD { }
 
 after BUILD => sub {
-    my ($self) = @_; 
-#    my $parent = $self->parent_router; 
-#    return unless defined $parent ; 
-#    $parent->add_child_router($self);
+  my ($self) = @_; 
+#  my $parent = $self->parent_router; 
+#  return unless defined $parent ; 
+#  $parent->add_child_router($self);
 };
 
 sub describe {
-    my ($self, $depth) = @_; 
-    $depth = -1 unless defined $depth; 
-    $depth++;
-    my $buf = "\t" x $depth . $self->description . "\n";
-    foreach my $child (@{$self->child_routers}) {
-        next unless defined $child; 
-        $buf .= $child->describe($depth);
-    }
+  my ($self, $depth) = @_; 
+  $depth = -1 unless defined $depth; 
+  $depth++;
+  my $buf = "\t" x $depth . $self->description . "\n";
+  foreach my $child (@{$self->child_routers}) {
+      next unless defined $child; 
+      $buf .= $child->describe($depth);
+  }
     
-    return $buf; 
+  return $buf; 
 }
 
 sub add_child_router {
-   my ($self, $router) = @_;
-   push(@{ $self->child_routers }, $router);
+  my ($self, $router) = @_;
+  push(@{ $self->child_routers }, $router);
 #   weaken(${ $self->child_routers }[-1]);
-   return; 
+  return; 
 }
 
 #sub remove_child_router {
@@ -48,11 +48,12 @@ sub add_child_router {
 #}
 
 after handle_log_message => sub {
-   my ($self, @args) = @_;
-   my $parent = $self->parent_router;
+  my ($self, @args) = @_;
+  my $parent = $self->parent_router;
       
-   return unless defined $parent;
-   $parent->handle_log_message(@args);
+  return unless defined $parent;
+  $parent->handle_log_message(@args);
 };
 
 1;
+