From: Tyler Riddle Date: Fri, 21 Sep 2012 20:20:27 +0000 (-0700) Subject: add non-blocking file handle support for reading in miniloop and change miniloop... X-Git-Tag: v0.003001_01~111 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FObject-Remote.git;a=commitdiff_plain;h=6b7b2732b7cc6b6c626469f2e6300b7012caef07 add non-blocking file handle support for reading in miniloop and change miniloop to be safer under recursion; performing manual management of child process stderr --- diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index ef44655..d338a21 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -239,7 +239,7 @@ sub _send { my $fh = $self->send_to_fh; Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id; my $serialized = $self->_serialize($to_send)."\n"; - Dlog_debug { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh; + Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh; #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra #logic it could easily do short-writes to the remote side my $ret = print $fh $serialized; @@ -252,7 +252,6 @@ sub _send { sub _serialize { my ($self, $data) = @_; local our @New_Ids = (-1); - Dlog_debug { "starting to serialize data for connection $_" } $self->_id; return eval { my $flat = $self->_encode($self->_deobjectify($data)); warn "$$ >>> ${flat}\n" if $DEBUG; diff --git a/lib/Object/Remote/Future.pm b/lib/Object/Remote/Future.pm index 2817539..d6d6081 100644 --- a/lib/Object/Remote/Future.pm +++ b/lib/Object/Remote/Future.pm @@ -29,15 +29,15 @@ sub await_future { $f->on_ready(sub { log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" }; if ($f == $await[-1]) { - log_debug { "This future is not waiting on anything so calling stop on the run loop" }; + log_trace { "This future is not waiting on anything so calling stop on the run loop" }; $loop->stop; } }); - log_debug { "Starting run loop for newly created future" }; + log_trace { "Starting run loop for newly created future" }; $loop->run; } if (@await and $await[-1]->is_ready) { - log_debug { "Last future in await list was ready, stopping run loop" }; + log_trace { "Last future in await list was ready, stopping run loop" }; $loop->stop; } log_trace { "await_future() returning" }; diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm index 5f22eac..18c0b47 100644 --- a/lib/Object/Remote/MiniLoop.pm +++ b/lib/Object/Remote/MiniLoop.pm @@ -43,13 +43,15 @@ sub watch_io { #will need to be integrated in a way that #is compatible with Windows which has no #non-blocking support - Dlog_warn { "setting file handle to be non-blocking: " } $fh; - use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); - my $flags = fcntl($fh, F_GETFL, 0) - or die "Can't get flags for the socket: $!\n"; - $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK) - or die "Can't set flags for the socket: $!\n"; - + if (0) { + Dlog_warn { "setting file handle to be non-blocking: $_" } $fh; + use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); + my $flags = fcntl($fh, F_GETFL, 0) + or die "Can't get flags for the socket: $!\n"; + $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK) + or die "Can't set flags for the socket: $!\n"; + } + if (my $cb = $watch{on_read_ready}) { log_trace { "IO watcher is registering with select for reading" }; $self->_read_select->add($fh); @@ -122,8 +124,6 @@ sub _next_timer_expires_delay { $duration = $delay_max; } - #uncomment for original behavior - #return .5; return $duration; } @@ -131,11 +131,12 @@ sub loop_once { my ($self) = @_; my $read = $self->_read_watches; my $write = $self->_write_watches; + our $Loop_Entered = 1; my $read_count = 0; my $write_count = 0; my @c = caller; my $wait_time = $self->_next_timer_expires_delay; - log_debug { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s", + log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s", scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) }; #TODO The docs state that select() in some instances can return a socket as ready to #read data even if reading from it would block and the recomendation is to set @@ -155,7 +156,7 @@ sub loop_once { #return? $self->_read_select, $self->_write_select, undef, $wait_time ); - log_debug { + log_trace { my $readable_count = defined $readable ? scalar(@$readable) : 0; my $writable_count = defined $writeable ? scalar(@$writeable) : 0; "Run loop: select returned readable:$readable_count writeable:$writable_count"; @@ -163,11 +164,15 @@ sub loop_once { # I would love to trap errors in the select call but IO::Select doesn't # differentiate between an error and a timeout. # -- no, love, mst. + + local $Loop_Entered; + log_trace { "Reading from all ready filehandles" }; foreach my $fh (@$readable) { next unless $read->{$fh}; $read_count++; $read->{$fh}(); + last if $Loop_Entered; # $read->{$fh}() if $read->{$fh}; } log_trace { "Writing to all ready filehandles" }; @@ -175,6 +180,7 @@ sub loop_once { next unless $write->{$fh}; $write_count++; $write->{$fh}(); + last if $Loop_Entered; # $write->{$fh}() if $write->{$fh}; } log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" }; @@ -184,8 +190,9 @@ sub loop_once { while (@$timers and $timers->[0][0] <= $now) { Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0]; (shift @$timers)->[1]->(); + last if $Loop_Entered; } - log_debug { "Run loop: single loop is completed" }; + log_trace { "Run loop: single loop is completed" }; return; } @@ -223,12 +230,12 @@ sub want_stop { #will with out interfering with each other sub run { my ($self) = @_; - log_info { "Run loop: run() invoked" }; + log_trace { "Run loop: run() invoked" }; local $self->{is_running} = 1; while ($self->is_running) { $self->loop_once; } - log_info { "Run loop: run() completed" }; + log_trace { "Run loop: run() completed" }; return; } diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index b1efd9b..291a664 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -1,6 +1,6 @@ package Object::Remote::Role::Connector::PerlInterpreter; -#use IPC::Open2; +use IPC::Open2; use IPC::Open3; use IO::Handle; use Object::Remote::ModuleSender; @@ -13,6 +13,9 @@ use Moo::Role; with 'Object::Remote::Role::Connector'; has module_sender => (is => 'lazy'); +#if no child_stderr file handle is specified then stderr +#of the child will be connected to stderr of the parent +has stderr => ( is => 'rw', default => sub { \*STDERR } ); sub _build_module_sender { my ($hook) = @@ -28,6 +31,7 @@ has perl_command => (is => 'lazy'); #ulimit of ~500 megs of v-ram #TODO only works with ssh with quotes but only works locally #with out quotes +#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 80000; nice -n 15 perl -"' ] } sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 500000; nice -n 15 perl -"' ] } #sub _build_perl_command { [ 'perl', '-' ] } @@ -55,8 +59,52 @@ sub final_perl_command { shift->perl_command } sub _start_perl { my $self = shift; + my $given_stderr = $self->stderr; + my $foreign_stderr; + Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command}; + use Symbol; + + if (defined($given_stderr)) { + $foreign_stderr = gensym(); + } else { + $foreign_stderr = ">&STDERR"; + } + + my $pid = open3( + my $foreign_stdin, + my $foreign_stdout, + $foreign_stderr, + @{$self->final_perl_command}, + ) or die "Failed to run perl at '$_[0]': $!"; + + if (defined($given_stderr)) { + log_warn { "using experimental cat for child stderr" }; + + #TODO refactor if this solves the problem + Object::Remote->current_loop + ->watch_io( + handle => $foreign_stderr, + on_read_ready => sub { + my $buf = ''; + my $len = sysread($foreign_stderr, $buf, 32768); + if (!defined($len) or $len == 0) { + log_trace { "Got EOF or error on child stderr, removing from watcher" }; + $self->stderr(undef); + Object::Remote->current_loop + ->unwatch_io( + handle => $foreign_stderr, + on_read_ready => 1 + ); + } else { + Dlog_trace { "got $len characters of stderr data for connection" }; + print $given_stderr $buf or die "could not send stderr data: $!"; + } + } + ); + } + #TODO open2() dupes the child stderr into the calling #process stderr which means if this process exits the #child is still attached to the shell - using open3() @@ -69,11 +117,11 @@ sub _start_perl { #where the user of a connection species a destination for output #either a file name or their own file handle and the node output #is dumped to it - my $pid = open2( - my $foreign_stdout, - my $foreign_stdin, - @{$self->final_perl_command}, - ) or die "Failed to run perl at '$_[0]': $!"; +# my $pid = open2( +# my $foreign_stdout, +# my $foreign_stdin, +# @{$self->final_perl_command}, +# ) or die "Failed to run perl at '$_[0]': $!"; Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ]; return ($foreign_stdin, $foreign_stdout, $pid);