experimental move to non-blocking reads in ReadChannel; fix log bugs; annotate fixes...
Tyler Riddle [Fri, 21 Sep 2012 02:45:40 +0000 (19:45 -0700)]
lib/Object/Remote/Connection.pm
lib/Object/Remote/FatNode.pm
lib/Object/Remote/Handle.pm
lib/Object/Remote/MiniLoop.pm
lib/Object/Remote/ReadChannel.pm
lib/Object/Remote/Role/Connector/PerlInterpreter.pm

index b051b0b..ef44655 100644 (file)
@@ -24,14 +24,18 @@ has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID
 
 has send_to_fh => (
   is => 'ro', required => 1,
-  trigger => sub { $_[1]->autoflush(1) },
+  trigger => sub {
+      my $self = $_[0];
+      $_[1]->autoflush(1);
+      Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
+  },
 );
 
 has read_channel => (
   is => 'ro', required => 1,
   trigger => sub {
     my ($self, $ch) = @_;
-    Dlog_trace { "trigger for read_channel has been invoked for connection $_" } $self->_id;
+    Dlog_trace { my $id = $self->_id; "trigger for read_channel has been invoked for connection $id; file handle is " } $ch->fh; 
     weaken($self);
     $ch->on_line_call(sub { $self->_receive(@_) });
     $ch->on_close_call(sub { $self->on_close->done(@_) });
index db1c17c..d60b81d 100644 (file)
@@ -1,5 +1,8 @@
 package Object::Remote::FatNode;
 
+#TODO If a file does not end in a new line by itself
+#then fat node fails
+
 use strictures 1;
 use Config;
 use B qw(perlstring);
index de7a65b..40584b5 100644 (file)
@@ -34,7 +34,7 @@ sub BUILD {
   my ($self, $args) = @_;
   log_debug { "constructing remote handle" };
   if ($self->id) {
-    log_trace { "disaming free for this hanle" };
+    log_trace { "disarming free for this handle" };
     $self->disarm_free;
   } else {
     die "No id supplied and no class either" unless $args->{class};
@@ -49,14 +49,14 @@ sub BUILD {
       )->{remote}->disarm_free->id
     );
   }
-  log_trace { "finished constructing remote handle; registering it" . ref($self) };
+  log_trace { "finished constructing remote handle; registering it " . ref($self) };
   $self->connection->register_remote($self);
 }
 
 sub call {
   my ($self, $method, @args) = @_;
   my $w = wantarray;
-  log_debug { my $def = defined $w; "call() has been invoked on a remote handle; wantarray: '$def'" };
+  log_debug { my $def = defined $w ? 1 : 0; "call() has been invoked on a remote handle; wantarray: '$def'" };
   $method = "start::${method}" if (caller(0)||'') eq 'start';
   future {
     $self->connection->send(call => $self->id, $w, $method, @args)
index c23bfa1..5f22eac 100644 (file)
@@ -37,14 +37,26 @@ sub pass_watches_to {
 sub watch_io {
   my ($self, %watch) = @_;
   my $fh = $watch{handle};
-  Dlog_debug { my $type = ref($fh); "Adding IO watch for $_" } $fh;
+  Dlog_debug { "Adding IO watch for $_" } $fh;
+
+  #TODO if this works out non-blocking support
+  #will need to be integrated in a way that
+  #is compatible with Windows which has no
+  #non-blocking support
+  Dlog_warn { "setting file handle to be non-blocking: " } $fh;
+  use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+  my $flags = fcntl($fh, F_GETFL, 0)
+    or die "Can't get flags for the socket: $!\n";
+  $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+    or die "Can't set flags for the socket: $!\n";
+
   if (my $cb = $watch{on_read_ready}) {
-    log_trace { "IO watcher is registering with select() for reading" };
+    log_trace { "IO watcher is registering with select for reading" };
     $self->_read_select->add($fh);
     $self->_read_watches->{$fh} = $cb;
   }
   if (my $cb = $watch{on_write_ready}) {
-    log_trace { "IO watcher is registering with select() for writing" };
+    log_trace { "IO watcher is registering with select for writing" };
     $self->_write_select->add($fh);
     $self->_write_watches->{$fh} = $cb;
   }
index b328d11..2d3da40 100644 (file)
@@ -3,6 +3,7 @@ package Object::Remote::ReadChannel;
 use CPS::Future;
 use Scalar::Util qw(weaken);
 use Object::Remote::Logging qw(:log :dlog);
+use POSIX;
 use Moo;
 
 has fh => (
@@ -51,7 +52,10 @@ sub _receive_data_from {
     while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) {
       $cb->(my $line = $1);
     }
-  } else {
+  #TODO this isn't compatible with Windows but would be if
+  #EAGAIN was set to something that could never match
+  #if on Windows   
+  } elsif ($! != EAGAIN) {
     log_trace { "Got EOF or error, this read channel is done" };
     Object::Remote->current_loop
                   ->unwatch_io(
index b8e3f75..b1efd9b 100644 (file)
@@ -1,6 +1,7 @@
 package Object::Remote::Role::Connector::PerlInterpreter;
 
-use IPC::Open2;
+#use IPC::Open2;
+use IPC::Open3; 
 use IO::Handle;
 use Object::Remote::ModuleSender;
 use Object::Remote::Handle;
@@ -55,17 +56,33 @@ sub final_perl_command { shift->perl_command }
 sub _start_perl {
   my $self = shift;
   Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
+  
+  #TODO open2() dupes the child stderr into the calling
+  #process stderr which means if this process exits the
+  #child is still attached to the shell - using open3()
+  #and having the run loop manage the stderr means this
+  #won't happen BUT if the run loop just sends the remote
+  #stderr data to the local stderr the logs will interleave
+  #for sure - a simple test would be to use open3() and just
+  #close the remote stderr and see what happens - a longer
+  #term solution would be for Object::Remote to offer a feature
+  #where the user of a connection species a destination for output
+  #either a file name or their own file handle and the node output
+  #is dumped to it 
   my $pid = open2(
     my $foreign_stdout,
     my $foreign_stdin,
     @{$self->final_perl_command},
   ) or die "Failed to run perl at '$_[0]': $!";
+
   Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ];
   return ($foreign_stdin, $foreign_stdout, $pid);
 }
 
 #TODO open2() forks off a child and I have not been able to locate
 #a mechanism for reaping dead children so they don't become zombies
+#CONFIRMED there is no reaping of children being done, find a safe
+#way to do it
 sub _open2_for {
   my $self = shift;
   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);