found location of hang; make annotations; added more log lines
Tyler Riddle [Sun, 16 Sep 2012 20:56:46 +0000 (13:56 -0700)]
lib/Object/Remote/Connection.pm
lib/Object/Remote/ConnectionServer.pm
lib/Object/Remote/Connector/LocalSudo.pm
lib/Object/Remote/Future.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/Node.pm
lib/Object/Remote/ReadChannel.pm

index da80bac..ba68d86 100644 (file)
@@ -6,6 +6,7 @@ use Object::Remote::Handle;
 use Object::Remote::CodeContainer;
 use Object::Remote::GlobProxy;
 use Object::Remote::GlobContainer;
+use Object::Remote::Logging qw (:log :dlog);
 use Object::Remote;
 use Symbol;
 use IO::Handle;
@@ -139,6 +140,7 @@ sub remote_object {
 
 sub connect {
   my ($self, $to) = @_;
+  Dlog_debug { "Creating connection to remote node $_" } $to;
   return await_future(
     $self->send_class_call(0, 'Object::Remote', connect => $to)
   );
@@ -147,11 +149,13 @@ sub connect {
 sub remote_sub {
   my ($self, $sub) = @_;
   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
+  log_debug { "Invoking remote sub '$sub'" };
   return await_future($self->send_class_call(0, $pkg, can => $name));
 }
 
 sub send_class_call {
   my ($self, $ctx, @call) = @_;
+  log_trace { "Sending a non-blocking class call" };
   $self->send(call => class_call_handler => $ctx => call => @call);
 }
 
@@ -175,12 +179,14 @@ sub new_class_call_handler {
 
 sub register_remote {
   my ($self, $remote) = @_;
+  log_trace { my $i = $remote->id; "Registered a remote object with id of '$i'" };
   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
   return $remote;
 }
 
 sub send_free {
   my ($self, $id) = @_;
+  log_debug { "sending request to free object '$id'" };
   delete $self->remote_objects_by_id->{$id};
   $self->_send([ free => $id ]);
 }
@@ -215,8 +221,16 @@ sub send_discard {
 
 sub _send {
   my ($self, $to_send) = @_;
-
-  print { $self->send_to_fh } $self->_serialize($to_send)."\n";
+  my $fh = $self->send_to_fh;
+  my $serialized = $self->_serialize($to_send)."\n";
+  Dlog_debug { my $l = length($serialized); "Sending '$l' characters of serialized data to $_" } $fh;
+  #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra
+  #logic it could easily do short-writes to the remote side
+  my $ret = print $fh $serialized;
+  Dlog_trace { my $r = defined $ret ? $ret : 'undef'; "print() returned $r with $_" } $fh;
+  #TODO hrm reason print's return value was ignored?
+  die "could not write to filehandle: $!" unless $ret;
+  return $ret; 
 }
 
 sub _serialize {
index 85869d0..639d456 100644 (file)
@@ -57,6 +57,9 @@ sub _listen_ready {
   )->${\$self->connection_callback};
   $f->on_ready(sub { undef($c) });
   $c->ready_future->done;
+  #TODO see if this runs on the controller or the remote node 
+  #if this runs on the controller a poorly behaved remote node
+  #could cause the print() to block but it's a very low probability
   print $new "Shere\n" or die "Couldn't send to new socket: $!";
   return $c;
 }
index 043b688..0b5f33d 100644 (file)
@@ -64,9 +64,16 @@ sub _start_perl {
                 ->watch_io(
                     handle => $sudo_stderr,
                     on_read_ready => sub {
+  #TODO is there a specific reason sysread() and syswrite() aren't
+  #a part of ::MiniLoop? It's one spot to handle errors and other
+  #logic involving filehandles
+                      log_debug { "LocalSudo: Preparing to read data" };
                       if (sysread($sudo_stderr, my $buf, 1024) > 0) {
+                        log_trace { "LocalSudo: successfully read data, printing it to STDERR" };
                         print STDERR $buf;
+                        log_trace { "LocalSudo: print() to STDERR is done" };                   
                       } else {
+                        log_debug { "LocalSudo: received EOF or error on file handle, unwatching it" };
                         Object::Remote->current_loop
                                       ->unwatch_io(
                                           handle => $sudo_stderr,
index 7afac68..c54a8a9 100644 (file)
@@ -20,23 +20,32 @@ our @await;
 
 sub await_future {
   my $f = shift;
+  log_trace { my $ir = $f->is_ready; "await_future() invoked; is_ready: $ir" };
   return $f if $f->is_ready;
   require Object::Remote;
   my $loop = Object::Remote->current_loop;
   {
     local @await = (@await, $f);
     $f->on_ready(sub {
-      $loop->stop if $f == $await[-1]
+      log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
+      if ($f == $await[-1]) {
+        log_debug { "This future is not waiting on anything so calling stop on the run loop" };
+        $loop->stop;         
+      }
     });
+    log_debug { "Starting run loop for newly created future" };
     $loop->run;
   }
   if (@await and $await[-1]->is_ready) {
+    log_debug { "Last future in await list was ready, stopping run loop" };
     $loop->stop;
   }
+  log_trace { "await_future() returning" };
   return wantarray ? $f->get : ($f->get)[0];
 }
 
 sub await_all {
+  log_trace { my $l = @_; "await_all() invoked with '$l' futures to wait on" };
   await_future(CPS::Future->wait_all(@_));
   map $_->get, @_;
 }
index 932d0b9..c23bfa1 100644 (file)
@@ -37,14 +37,14 @@ sub pass_watches_to {
 sub watch_io {
   my ($self, %watch) = @_;
   my $fh = $watch{handle};
-  log_debug { my $type = ref($fh); "Adding watch for ref of type '$type'" };
+  Dlog_debug { my $type = ref($fh); "Adding IO watch for $_" } $fh;
   if (my $cb = $watch{on_read_ready}) {
-    log_trace { "IO watcher on_read_ready has been invoked" };
+    log_trace { "IO watcher is registering with select() for reading" };
     $self->_read_select->add($fh);
     $self->_read_watches->{$fh} = $cb;
   }
   if (my $cb = $watch{on_write_ready}) {
-    log_trace { "IO watcher on_write_ready has been invoked" };
+    log_trace { "IO watcher is registering with select() for writing" };
     $self->_write_select->add($fh);
     $self->_write_watches->{$fh} = $cb;
   }
@@ -54,12 +54,14 @@ sub watch_io {
 sub unwatch_io {
   my ($self, %watch) = @_;
   my $fh = $watch{handle};
-  log_debug { my $type = ref($fh); "Removing watch for ref of type '$type'" };
+  Dlog_debug { "Removing IO watch for $_" } $fh;
   if ($watch{on_read_ready}) {
+    log_trace { "IO watcher is removing read from select()" };
     $self->_read_select->remove($fh);
     delete $self->_read_watches->{$fh};
   }
   if ($watch{on_write_ready}) {
+    log_trace { "IO watcher is removing write from select()" };
     $self->_write_select->remove($fh);
     delete $self->_write_watches->{$fh};
   }
@@ -94,6 +96,7 @@ sub _next_timer_expires_delay {
   #when data is ready - when the system
   #deadlocks the chatter from the timeout in
   #select clogs up the logs
+  #TODO should make this an attribute
   my $delay_max = undef;
     
   return $delay_max unless @$timers;
@@ -107,8 +110,8 @@ sub _next_timer_expires_delay {
     $duration = $delay_max;
   }
   
-  log_trace { "returning $duration as select() timeout period" }
-    
+  #uncomment for original behavior
+  #return .5;    
   return $duration; 
 }
 
@@ -122,7 +125,22 @@ sub loop_once {
   my $wait_time = $self->_next_timer_expires_delay;
   log_debug {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
       scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
+  #TODO The docs state that select() in some instances can return a socket as ready to
+  #read data even if reading from it would block and the recomendation is to set
+  #handles used with select() as non-blocking but Perl on Windows can not set a 
+  #handle to use non-blocking IO - If Windows is not one of the operating
+  #systems where select() returns a handle that could block it would work to
+  #enable non-blocking mode only under Posix - the non-blocking sysread()
+  #logic would work unmodified for both blocking and non-blocking handles
+  #under Posix and Windows.
   my ($readable, $writeable) = IO::Select->select(
+    #TODO how come select() isn't used to identify handles with errors on them?
+    #TODO is there a specific reason for a half second maximum wait duration?
+    #The two places I've found for the runloop to be invoked don't return control
+    #to the caller until a controlling variable interrupts the loop that invokes
+    #loop_once() - is this to allow that variable to be polled and exit the
+    #run loop? If so why isn't that behavior event driven and causes select() to
+    #return? 
     $self->_read_select, $self->_write_select, undef, $wait_time
   ); 
   log_debug { 
@@ -159,6 +177,9 @@ sub loop_once {
   return;
 }
 
+#::Node and ::ConnectionServer use the want_run() / want_stop()
+#counter to cause a run-loop to execute while something is active;
+#the futures do this via a different mechanism
 sub want_run {
   my ($self) = @_;
   Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
@@ -167,7 +188,7 @@ sub want_run {
 
 sub run_while_wanted {
   my ($self) = @_;
-  log_debug { "Run loop: run_while_wanted() invoked" };
+  log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
   $self->loop_once while $self->{want_running};
   log_debug { "Run loop: run_while_wanted() completed" };
   return;
@@ -183,6 +204,11 @@ sub want_stop {
     --$self->{want_running};
 }
 
+#TODO Hypothesis: Futures invoke run() which gives that future
+#it's own localized is_running attribute - any adjustment to the
+#is_running attribute outside of that future will not effect that
+#future so each future winds up able to call run() and stop() at 
+#will with out interfering with each other 
 sub run {
   my ($self) = @_;
   log_info { "Run loop: run() invoked" };
index dc2563c..58923c8 100644 (file)
@@ -2,22 +2,29 @@ package Object::Remote::Node;
 
 use strictures 1;
 use Object::Remote::Connector::STDIO;
+use Object::Remote::Logging qw(:log);
 use Object::Remote;
 use CPS::Future;
 
 sub run {
+  log_trace { "run() has been invoked on remote node; creating STDIO connector" };
   my $c = Object::Remote::Connector::STDIO->new->connect;
 
   $c->register_class_call_handler;
 
   my $loop = Object::Remote->current_loop;
 
-  $c->on_close->on_ready(sub { $loop->want_stop });
+  $c->on_close->on_ready(sub { 
+    log_info { "Node connection with call handler has closed" };
+    $loop->want_stop 
+  });
 
   print { $c->send_to_fh } "Shere\n";
 
+  log_debug { "Node is going to start the run loop" };
   $loop->want_run;
   $loop->run_while_wanted;
+  log_debug { "Run loop invocation in node has completed" };
 }
 
 1;
index 5de5bf1..6402bdd 100644 (file)
@@ -27,10 +27,24 @@ has on_line_call => (is => 'rw');
 
 has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x });
 
+#TODO confirmed this is the point of the hang - sysread() is invoked on a 
+#socket inside the controller that blocks and deadlocks the entire system.
+#The remote nodes are all waiting to receive data at that point.
+#Validated this behavior exists in an unmodified Object::Remote from CPAN 
+#by wrapping this sysread() with warns that have the pid in them and pounding 
+#my local machine with System::Introspector via ssh and 7 remote perl instances
+#It looks like one of the futures is responding to an event regarding the ability
+#to read from a socket and every once in a while an ordering issue means that
+#there is no actual data to read from the socket
 sub _receive_data_from {
   my ($self, $fh) = @_;
   log_trace { "Preparing to read data" };
+  #use Carp qw(cluck); cluck();
   my $rb = $self->_receive_data_buffer;
+  #TODO is there a specific reason sysread() and syswrite() aren't
+  #a part of ::MiniLoop? It's one spot to handle errors and other
+  #logic involving filehandles
+  #TODO why are the buffers so small? BUFSIZ is usually 32768
   my $len = sysread($fh, $$rb, 1024, length($$rb));
   my $err = defined($len) ? '' : ": $!";
   if (defined($len) and $len > 0) {