remove incomplete non-blocking support; make select() timeout duration configurable...
Tyler Riddle [Wed, 3 Oct 2012 15:10:47 +0000 (08:10 -0700)]
lib/Object/Remote/Connection.pm
lib/Object/Remote/ConnectionServer.pm
lib/Object/Remote/Connector/LocalSudo.pm
lib/Object/Remote/Connector/SSH.pm
lib/Object/Remote/FatNode.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/ModuleLoader.pm
lib/Object/Remote/ReadChannel.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm

index 02c874c..acede83 100644 (file)
@@ -55,11 +55,7 @@ END {
   kill(1, -2);
 }
 
-
 our $DEBUG = !!$ENV{OBJECT_REMOTE_DEBUG};
-#numbering each connection allows it to be
-#tracked along with file handles in
-#the logs
 
 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
 
@@ -72,9 +68,6 @@ has send_to_fh => (
   },
 );
 
-#TODO see if this is another case of the same bug below 
-#where trigger never fires because the attribute isn't
-#actually set at any time
 has read_channel => (
   is => 'ro', required => 1,
   trigger => sub {
@@ -90,38 +83,9 @@ has read_channel => (
   },
 );
 
-#TODO properly fix this bug -
-#trigger can't ever be invoked with a default
-#value and the on_close attribute is read only....
-#the future never gets the on_done handler
-#installed 
-sub BUILD { 
-  my ($self) = @_; 
-  $self->on_close(CPS::Future->new);
-}
-
-after BUILD => sub {
-  my ($self) = @_; 
-  
-  return unless defined $self->child_pid; 
-  
-  log_debug { "Setting process group of child process" };
-  
-  setpgrp($self->child_pid, 1);
-};
-
-
 has on_close => (
-  is => 'rw', default => sub { CPS::Future->new },
-  trigger => sub {
-    my ($self, $f) = @_;
-    Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id;
-    weaken($self);
-    $f->on_done(sub {
-      Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
-      $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
-    });
-  }
+  is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
+  trigger => \&_install_future_handlers,
 );
 
 has child_pid => (is => 'ro');
@@ -138,6 +102,26 @@ has remote_objects_by_id => (
 
 has outstanding_futures => (is => 'ro', default => sub { {} });
 
+has _json => (
+  is => 'lazy',
+  handles => {
+    _deserialize => 'decode',
+    _encode => 'encode',
+  },
+);
+
+after BUILD => sub {
+  my ($self) = @_; 
+  
+  return unless defined $self->child_pid; 
+  
+  log_debug { "Setting process group of child process" };
+  
+  setpgrp($self->child_pid, 1);
+};
+
+sub BUILD { }
+
 sub _fail_outstanding {
   my ($self, $error) = @_;
   Dlog_debug { "Failing outstanding futures with '$error' for connection $_" } $self->_id;
@@ -147,13 +131,16 @@ sub _fail_outstanding {
   return;
 }
 
-has _json => (
-  is => 'lazy',
-  handles => {
-    _deserialize => 'decode',
-    _encode => 'encode',
-  },
-);
+sub _install_future_handlers {
+    my ($self, $f) = @_;
+    Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id;
+    weaken($self);
+    $f->on_done(sub {
+      Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
+      $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
+    });
+    return $f; 
+};
 
 sub _id_to_remote_object {
   my ($self, $id) = @_;
@@ -340,24 +327,20 @@ sub _send {
   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
   my $serialized = $self->_serialize($to_send)."\n";
   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
-  #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra
-  #logic it could easily do short-writes to the remote side - how about taking this entire buffer
-  #and having the run loop send it to the file handle so this doesn't block while the sending
-  #is happening? 
   my $ret; 
   eval { 
-      local($SIG{PIPE}) = 'IGNORE';
-      die "filehandle is not open" unless openhandle($fh);
-      log_trace { "file handle has passed openhandle() test; printing to it" };
-      $ret = print $fh $serialized;
-      die "print was not successful: $!" unless defined $ret
+    #TODO this should be converted over to a non-blocking ::WriteChannel class
+    die "filehandle is not open" unless openhandle($fh);
+    log_trace { "file handle has passed openhandle() test; printing to it" };
+    $ret = print $fh $serialized;
+    die "print was not successful: $!" unless defined $ret
   };
     
   if ($@) {
-      Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
-      my $error = $@; chomp($error);
-      $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
-      return; 
+    Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
+    my $error = $@; chomp($error);
+    $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
+    return; 
   }
       
   return $ret; 
index 13bf811..330cb24 100644 (file)
@@ -5,7 +5,6 @@ use Module::Runtime qw(use_module);
 use Object::Remote;
 use Object::Remote::Logging qw( :log :dlog );
 use IO::Socket::UNIX;
-use POSIX ();
 use Moo;
 
 has listen_on => (
@@ -65,9 +64,6 @@ sub _listen_ready {
   $f->on_ready(sub { undef($c) });
   log_trace { "marking the future as done" };
   $c->ready_future->done;
-  #TODO see if this runs on the controller or the remote node 
-  #if this runs on the controller a poorly behaved remote node
-  #could cause the print() to block but it's a very low probability
   Dlog_trace { "Sending 'Shere' to socket $_" } $new; 
   print $new "Shere\n" or die "Couldn't send to new socket: $!";
   log_debug { "Connection has been fully handled" };
index 044d106..e9bbb96 100644 (file)
@@ -64,9 +64,6 @@ sub _start_perl {
                 ->watch_io(
                     handle => $sudo_stderr,
                     on_read_ready => sub {
-  #TODO is there a specific reason sysread() and syswrite() aren't
-  #a part of ::MiniLoop? It's one spot to handle errors and other
-  #logic involving filehandles
                       Dlog_debug { "LocalSudo: Preparing to read data from $_" } $sudo_stderr;
                       if (sysread($sudo_stderr, my $buf, 32768) > 0) {
                         log_trace { "LocalSudo: successfully read data, printing it to STDERR" };
index fcfb445..363e214 100644 (file)
@@ -14,9 +14,6 @@ has ssh_options => (is => 'ro', default => sub { [ '-A' ] });
 
 has ssh_command => (is => 'ro', default => sub { 'ssh' });
 
-#TODO properly integrate if this works
-BEGIN { $ENV{TERM} = 'dumb'; } 
-
 sub _escape_shell_arg { 
     my ($self, $str) = (@_);
     $str =~ s/((?:^|[^\\])(?:\\\\)*)'/$1\\'/g;
index c4725ab..9b00483 100644 (file)
@@ -1,8 +1,5 @@
 package Object::Remote::FatNode;
 
-#TODO If a file does not end in a new line by itself
-#then fat node fails
-
 use strictures 1;
 use Config;
 use B qw(perlstring);
index a4a835c..5be3100 100644 (file)
@@ -8,6 +8,9 @@ use Moo;
 # this is ro because we only actually set it using local in sub run
 
 has is_running => (is => 'ro', clearer => 'stop');
+#maximum duration that select() will block - undef means indefinite,
+#0 means no blocking, otherwise maximum time in seconds
+has block_duration => ( is => 'rw' );
 
 has _read_watches => (is => 'ro', default => sub { {} });
 has _read_select => (is => 'ro', default => sub { IO::Select->new });
@@ -38,19 +41,6 @@ sub watch_io {
   my ($self, %watch) = @_;
   my $fh = $watch{handle};
   Dlog_debug { "Adding IO watch for $_" } $fh;
-
-  #TODO if this works out non-blocking support
-  #will need to be integrated in a way that
-  #is compatible with Windows which has no
-  #non-blocking support - see also ::ReadChannel
-  if (0) {
-    Dlog_warn { "setting file handle to be non-blocking: $_" } $fh;
-    use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-    my $flags = fcntl($fh, F_GETFL, 0)
-      or die "Can't get flags for the socket: $!\n";
-    $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
-      or die "Can't set flags for the socket: $!\n"; 
-  }
   
   if (my $cb = $watch{on_read_ready}) {
     log_trace { "IO watcher is registering with select for reading" };
@@ -126,12 +116,7 @@ sub unwatch_time {
 sub _next_timer_expires_delay {
   my ($self) = @_;
   my $timers = $self->_timers;
-  #undef means no timeout, select only returns
-  #when data is ready - when the system
-  #deadlocks the chatter from the timeout in
-  #select clogs up the logs
-  #TODO should make this an attribute
-  my $delay_max = undef;
+  my $delay_max = $self->block_duration;
     
   return $delay_max unless @$timers;
   my $duration = $timers->[0]->[0] - time;
index a5b8d97..0e59604 100644 (file)
@@ -9,7 +9,6 @@ BEGIN {
   # unqualified INC forced into package main
   sub Object::Remote::ModuleLoader::Hook::INC {
     my ($self, $module) = @_;
-    #TODO not logging - timing issue?
     log_debug { "Loading $module via " . ref($self) };
     if (my $code = $self->sender->source_for($module)) {
       open my $fh, '<', \$code;
index f6a7cca..5d7e9f0 100644 (file)
@@ -3,7 +3,6 @@ package Object::Remote::ReadChannel;
 use CPS::Future;
 use Scalar::Util qw(weaken openhandle);
 use Object::Remote::Logging qw(:log :dlog);
-use POSIX;
 use Moo;
 
 has fh => (
@@ -39,10 +38,7 @@ sub _receive_data_from {
     while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
       $cb->(my $line = $1);
     }
-  #TODO this isn't compatible with Windows but would be if
-  #EAGAIN was set to something that could never match
-  #if on Windows   
-  } elsif ($! != EAGAIN) {
+  } else {
     log_trace { "Got EOF or error, this read channel is done" };
     Object::Remote->current_loop
                   ->unwatch_io(
index b3e82d0..4f74078 100644 (file)
@@ -3,14 +3,13 @@ package Object::Remote::Role::Connector::PerlInterpreter;
 use IPC::Open2;
 use IPC::Open3; 
 use IO::Handle;
+use Symbol; 
 use Object::Remote::Logging qw( :log :dlog );
 use Object::Remote::ModuleSender;
 use Object::Remote::Handle;
 use Object::Remote::Future;
 use Scalar::Util qw(blessed weaken);
-use POSIX;
 use Moo::Role;
-use Symbol; 
 
 with 'Object::Remote::Role::Connector';
 
@@ -30,14 +29,8 @@ sub _build_module_sender {
 has perl_command => (is => 'lazy');
 has watchdog_timeout => ( is => 'ro', required => 1, default => sub { 0 } );
 
-#TODO convert nice value into optional feature enabled by
-#setting value of attribute
-#ulimit of ~500 megs of v-ram
-#TODO only works with ssh with quotes but only works locally
-#with out quotes
+#TODO convert the ulimit and nice values into configurable attributes
 sub _build_perl_command {[ 'sh -c "ulimit -v 200000; nice -n 15 perl -"' ] }
-#sub _build_perl_command { [ 'perl', '-' ] }
-#sub _build_perl_command { [ 'cat' ] }
 
 around connect => sub {
   my ($orig, $self) = (shift, shift);
@@ -90,16 +83,15 @@ sub _start_perl {
   ) or die "Failed to run perl at '$_[0]': $!";
   
   if (defined($given_stderr)) {   
-      log_warn { "using experimental cat for child stderr" };
+      Dlog_debug { "Child process STDERR is being handled via run loop" };
         
-      #TODO refactor if this solves the problem
       Object::Remote->current_loop
                     ->watch_io(
                         handle => $foreign_stderr,
                         on_read_ready => sub {
                           my $buf = ''; 
                           my $len = sysread($foreign_stderr, $buf, 32768);
-                          if ((!defined($len) && $! != EAGAIN) or $len == 0) {
+                          if (!defined($len) or $len == 0) {
                             log_trace { "Got EOF or error on child stderr, removing from watcher" };
                             $self->stderr(undef);
                             Object::Remote->current_loop
@@ -133,7 +125,7 @@ sub _open2_for {
                       }
                       # if the stdin went away, we'll never get Shere
                       # so it's not a big deal to simply give up on !defined
-                      if ((!defined($len) && $! != EAGAIN) or 0 == length($to_send)) {
+                      if (!defined($len) or 0 == length($to_send)) {
                         log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" };
                         Object::Remote->current_loop
                                       ->unwatch_io(