fix bug where dead connections would not execute cleanup code; add in repeating timer...
Tyler Riddle [Thu, 27 Sep 2012 01:47:42 +0000 (18:47 -0700)]
lib/Object/Remote/Connection.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/ReadChannel.pm
lib/Object/Remote/Role/Connector.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm

index bfd9996..4d46618 100644 (file)
@@ -12,7 +12,7 @@ use Object::Remote;
 use Symbol;
 use IO::Handle;
 use Module::Runtime qw(use_module);
-use Scalar::Util qw(weaken blessed refaddr);
+use Scalar::Util qw(weaken blessed refaddr openhandle);
 use JSON::PP qw(encode_json);
 use Moo;
 
@@ -36,21 +36,36 @@ has read_channel => (
   is => 'ro', required => 1,
   trigger => sub {
     my ($self, $ch) = @_;
-    Dlog_trace { my $id = $self->_id; "trigger for read_channel has been invoked for connection $id; file handle is " } $ch->fh; 
+    my $id = $self->_id; 
+    Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh; 
     weaken($self);
     $ch->on_line_call(sub { $self->_receive(@_) });
-    $ch->on_close_call(sub { $self->on_close->done(@_) });
+    $ch->on_close_call(sub { 
+        log_trace { "invoking 'done' on on_close handler for connection id '$id'" }; 
+        $self->on_close->done(@_);
+    });
   },
 );
 
+#TODO properly fix this bug -
+#trigger can't ever be invoked with a default
+#value and the on_close attribute is read only....
+#the future never gets the on_done handler
+#installed 
+sub BUILD {
+  my ($self) = @_; 
+  $self->on_close(CPS::Future->new);  
+}
+
 has on_close => (
-  is => 'ro', default => sub { CPS::Future->new },
+  is => 'rw', default => sub { CPS::Future->new },
   trigger => sub {
     my ($self, $f) = @_;
     Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id;
     weaken($self);
     $f->on_done(sub {
-      $self->_fail_outstanding("Connection lost: ".($f->get)[0]);
+      Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
+      $self->_fail_outstanding("Connection lost: " . ($f->get)[0]);
     });
   }
 );
@@ -256,10 +271,22 @@ sub _send {
   #logic it could easily do short-writes to the remote side - how about taking this entire buffer
   #and having the run loop send it to the file handle so this doesn't block while the sending
   #is happening? 
-  my $ret = print $fh $serialized;
-  Dlog_trace { my $r = defined $ret ? $ret : 'undef'; "print() returned $r with $_" } $fh;
-  #TODO hrm reason print's return value was ignored?
-  die "could not write to filehandle: $!" unless $ret;
+  my $ret; 
+  eval { 
+      local($SIG{PIPE}) = 'IGNORE';
+      die "filehandle is not open" unless openhandle($fh);
+      log_trace { "file handle has passed openhandle() test; printing to it" };
+      $ret = print $fh $serialized;
+      die "print was not successful: $!" unless defined $ret
+  };
+  
+  if ($@) {
+      Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
+      my $error = $@; chomp($error);
+      $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
+      return; 
+  }
+      
   return $ret; 
 }
 
index 60b7c27..af9ce36 100644 (file)
@@ -82,16 +82,36 @@ sub unwatch_io {
   return;
 }
 
+sub _sort_timers {
+  my ($self, @new) = @_;
+  my $timers = $self->_timers; 
+  
+  log_trace { "Sorting timers" };
+  
+  @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
+  return;   
+}
+
 sub watch_time {
   my ($self, %watch) = @_;
-  my $at = $watch{at} || do {
-    die "watch_time requires at or after" unless my $after = $watch{after};
-    time() + $after;
-  };
+  my $at; 
+  
+  Dlog_trace { "watch_time() invoked with $_" } \%watch;
+  if (exists($watch{every})) {
+    $at = time() + $watch{every};
+  } elsif (exists($watch{after})) {
+    $at = time() + $watch{after}; 
+  } elsif (exists($watch{at})) {
+      $at = $watch{at}; 
+  } else {
+      die "watch_time requires every, after or at";
+  }
+  
   die "watch_time requires code" unless my $code = $watch{code};
   my $timers = $self->_timers;
-  my $new = [ $at => $code ];
-  @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
+  my $new = [ $at => $code, $watch{every} ];
+  $self->_sort_timers($new); 
   log_debug { "Created new timer that expires at '$at'" };
   return "$new";
 }
@@ -187,10 +207,26 @@ sub loop_once {
   my $now = time();
   log_trace { "Checking timers" };
   while (@$timers and $timers->[0][0] <= $now) {
-    Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0];
-    (shift @$timers)->[1]->();
+    my $active = $timers->[0]; 
+    Dlog_debug { "Found timer that needs to be executed: $_" } $active;
+#   my (shift @$timers)->[1]->();
+     
+    if (defined($active->[2])) {
+      #handle the case of an 'every' timer
+      $active->[0] = time() + $active->[2]; 
+      Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
+      $self->_sort_timers;
+    } else {
+      #it doesn't repeat again so get rid of it  
+      shift(@$timers);    
+    }
+
+    #execute the timer
+    $active->[1]->();
+     
     last if $Loop_Entered;
   }
+  
   log_trace { "Run loop: single loop is completed" };
   return;
 }
index 6b27348..fb13c80 100644 (file)
@@ -49,6 +49,7 @@ sub _receive_data_from {
                       handle => $self->fh,
                       on_read_ready => 1
                     );
+    log_trace { "Invoking on_close_call() for dead read channel" };
     $self->on_close_call->($err);
   }
 }
index d5d46d7..4f008d7 100644 (file)
@@ -38,6 +38,7 @@ sub connect {
       undef($channel);
     });
     $channel->on_close_call(sub {
+      log_trace { "Connection has been closed" };
       $f->fail("Channel closed without seeing Shere: $_[0]");
       undef($channel);
     });
index 810c822..6509e36 100644 (file)
@@ -10,6 +10,7 @@ use Object::Remote::Logging qw( :log :dlog );
 use Scalar::Util qw(blessed);
 use POSIX ":sys_wait_h";
 use Moo::Role;
+use Symbol; 
 
 with 'Object::Remote::Role::Connector';
 
@@ -44,7 +45,6 @@ has perl_command => (is => 'lazy');
 #ulimit of ~500 megs of v-ram
 #TODO only works with ssh with quotes but only works locally
 #with out quotes
-#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 80000; nice -n 15 perl -"' ] }
 sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
 #sub _build_perl_command { [ 'perl', '-' ] }
 
@@ -76,9 +76,7 @@ sub _start_perl {
   my $foreign_stderr;
  
   Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
-  
-  use Symbol; 
-  
+    
   if (defined($given_stderr)) {
       $foreign_stderr = gensym();
   } else {