From: Tyler Riddle Date: Fri, 21 Sep 2012 02:45:40 +0000 (-0700) Subject: experimental move to non-blocking reads in ReadChannel; fix log bugs; annotate fixes... X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=90a3a7f22af359fe9a659c9e8fcf5d332af72ebb;p=scpubgit%2FObject-Remote.git experimental move to non-blocking reads in ReadChannel; fix log bugs; annotate fixes for huge json and stuck process issues --- diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index ddaa74b..2b8483c 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -24,14 +24,18 @@ has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID has send_to_fh => ( is => 'ro', required => 1, - trigger => sub { $_[1]->autoflush(1) }, + trigger => sub { + my $self = $_[0]; + $_[1]->autoflush(1); + Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1]; + }, ); has read_channel => ( is => 'ro', required => 1, trigger => sub { my ($self, $ch) = @_; - Dlog_trace { "trigger for read_channel has been invoked for connection $_" } $self->_id; + Dlog_trace { my $id = $self->_id; "trigger for read_channel has been invoked for connection $id; file handle is " } $ch->fh; weaken($self); $ch->on_line_call(sub { $self->_receive(@_) }); $ch->on_close_call(sub { $self->on_close->done(@_) }); diff --git a/lib/Object/Remote/FatNode.pm b/lib/Object/Remote/FatNode.pm index 419598f..a0fddc0 100644 --- a/lib/Object/Remote/FatNode.pm +++ b/lib/Object/Remote/FatNode.pm @@ -1,5 +1,8 @@ package Object::Remote::FatNode; +#TODO If a file does not end in a new line by itself +#then fat node fails + use strictures 1; use Config; use B qw(perlstring); diff --git a/lib/Object/Remote/Handle.pm b/lib/Object/Remote/Handle.pm index de7a65b..40584b5 100644 --- a/lib/Object/Remote/Handle.pm +++ b/lib/Object/Remote/Handle.pm @@ -34,7 +34,7 @@ sub BUILD { my ($self, $args) = @_; log_debug { "constructing remote handle" }; if ($self->id) { - log_trace { "disaming free for this hanle" }; + log_trace { "disarming free for this handle" }; $self->disarm_free; } else { die "No id supplied and no class either" unless $args->{class}; @@ -49,14 +49,14 @@ sub BUILD { )->{remote}->disarm_free->id ); } - log_trace { "finished constructing remote handle; registering it" . ref($self) }; + log_trace { "finished constructing remote handle; registering it " . ref($self) }; $self->connection->register_remote($self); } sub call { my ($self, $method, @args) = @_; my $w = wantarray; - log_debug { my $def = defined $w; "call() has been invoked on a remote handle; wantarray: '$def'" }; + log_debug { my $def = defined $w ? 1 : 0; "call() has been invoked on a remote handle; wantarray: '$def'" }; $method = "start::${method}" if (caller(0)||'') eq 'start'; future { $self->connection->send(call => $self->id, $w, $method, @args) diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index c23bfa1..5f22eac 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -37,14 +37,26 @@ sub pass_watches_to { sub watch_io { my ($self, %watch) = @_; my $fh = $watch{handle}; - Dlog_debug { my $type = ref($fh); "Adding IO watch for $_" } $fh; + Dlog_debug { "Adding IO watch for $_" } $fh; + + #TODO if this works out non-blocking support + #will need to be integrated in a way that + #is compatible with Windows which has no + #non-blocking support + Dlog_warn { "setting file handle to be non-blocking: " } $fh; + use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); + my $flags = fcntl($fh, F_GETFL, 0) + or die "Can't get flags for the socket: $!\n"; + $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK) + or die "Can't set flags for the socket: $!\n"; + if (my $cb = $watch{on_read_ready}) { - log_trace { "IO watcher is registering with select() for reading" }; + log_trace { "IO watcher is registering with select for reading" }; $self->_read_select->add($fh); $self->_read_watches->{$fh} = $cb; } if (my $cb = $watch{on_write_ready}) { - log_trace { "IO watcher is registering with select() for writing" }; + log_trace { "IO watcher is registering with select for writing" }; $self->_write_select->add($fh); $self->_write_watches->{$fh} = $cb; } diff --git a/lib/Object/Remote/ReadChannel.pm b/lib/Object/Remote/ReadChannel.pm index b328d11..2d3da40 100644 --- a/lib/Object/Remote/ReadChannel.pm +++ b/lib/Object/Remote/ReadChannel.pm @@ -3,6 +3,7 @@ package Object::Remote::ReadChannel; use CPS::Future; use Scalar::Util qw(weaken); use Object::Remote::Logging qw(:log :dlog); +use POSIX; use Moo; has fh => ( @@ -51,7 +52,10 @@ sub _receive_data_from { while (my $cb = $self->on_line_call and $$rb =~ s/^(.*)\n//) { $cb->(my $line = $1); } - } else { + #TODO this isn't compatible with Windows but would be if + #EAGAIN was set to something that could never match + #if on Windows + } elsif ($! != EAGAIN) { log_trace { "Got EOF or error, this read channel is done" }; Object::Remote->current_loop ->unwatch_io( diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index b8e3f75..b1efd9b 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -1,6 +1,7 @@ package Object::Remote::Role::Connector::PerlInterpreter; -use IPC::Open2; +#use IPC::Open2; +use IPC::Open3; use IO::Handle; use Object::Remote::ModuleSender; use Object::Remote::Handle; @@ -55,17 +56,33 @@ sub final_perl_command { shift->perl_command } sub _start_perl { my $self = shift; Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command}; + + #TODO open2() dupes the child stderr into the calling + #process stderr which means if this process exits the + #child is still attached to the shell - using open3() + #and having the run loop manage the stderr means this + #won't happen BUT if the run loop just sends the remote + #stderr data to the local stderr the logs will interleave + #for sure - a simple test would be to use open3() and just + #close the remote stderr and see what happens - a longer + #term solution would be for Object::Remote to offer a feature + #where the user of a connection species a destination for output + #either a file name or their own file handle and the node output + #is dumped to it my $pid = open2( my $foreign_stdout, my $foreign_stdin, @{$self->final_perl_command}, ) or die "Failed to run perl at '$_[0]': $!"; + Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ]; return ($foreign_stdin, $foreign_stdout, $pid); } #TODO open2() forks off a child and I have not been able to locate #a mechanism for reaping dead children so they don't become zombies +#CONFIRMED there is no reaping of children being done, find a safe +#way to do it sub _open2_for { my $self = shift; my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);