my $fh = $self->send_to_fh;
Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
my $serialized = $self->_serialize($to_send)."\n";
- Dlog_debug { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
+ Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
#TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra
#logic it could easily do short-writes to the remote side
my $ret = print $fh $serialized;
sub _serialize {
my ($self, $data) = @_;
local our @New_Ids = (-1);
- Dlog_debug { "starting to serialize data for connection $_" } $self->_id;
return eval {
my $flat = $self->_encode($self->_deobjectify($data));
warn "$$ >>> ${flat}\n" if $DEBUG;
$f->on_ready(sub {
log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
if ($f == $await[-1]) {
- log_debug { "This future is not waiting on anything so calling stop on the run loop" };
+ log_trace { "This future is not waiting on anything so calling stop on the run loop" };
$loop->stop;
}
});
- log_debug { "Starting run loop for newly created future" };
+ log_trace { "Starting run loop for newly created future" };
$loop->run;
}
if (@await and $await[-1]->is_ready) {
- log_debug { "Last future in await list was ready, stopping run loop" };
+ log_trace { "Last future in await list was ready, stopping run loop" };
$loop->stop;
}
log_trace { "await_future() returning" };
#will need to be integrated in a way that
#is compatible with Windows which has no
#non-blocking support
- Dlog_warn { "setting file handle to be non-blocking: " } $fh;
- use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
- my $flags = fcntl($fh, F_GETFL, 0)
- or die "Can't get flags for the socket: $!\n";
- $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
- or die "Can't set flags for the socket: $!\n";
-
+ if (0) {
+ Dlog_warn { "setting file handle to be non-blocking: $_" } $fh;
+ use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+ my $flags = fcntl($fh, F_GETFL, 0)
+ or die "Can't get flags for the socket: $!\n";
+ $flags = fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+ or die "Can't set flags for the socket: $!\n";
+ }
+
if (my $cb = $watch{on_read_ready}) {
log_trace { "IO watcher is registering with select for reading" };
$self->_read_select->add($fh);
$duration = $delay_max;
}
- #uncomment for original behavior
- #return .5;
return $duration;
}
my ($self) = @_;
my $read = $self->_read_watches;
my $write = $self->_write_watches;
+ our $Loop_Entered = 1;
my $read_count = 0;
my $write_count = 0;
my @c = caller;
my $wait_time = $self->_next_timer_expires_delay;
- log_debug { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
+ log_trace { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
#TODO The docs state that select() in some instances can return a socket as ready to
#read data even if reading from it would block and the recomendation is to set
#return?
$self->_read_select, $self->_write_select, undef, $wait_time
);
- log_debug {
+ log_trace {
my $readable_count = defined $readable ? scalar(@$readable) : 0;
my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
"Run loop: select returned readable:$readable_count writeable:$writable_count";
# I would love to trap errors in the select call but IO::Select doesn't
# differentiate between an error and a timeout.
# -- no, love, mst.
+
+ local $Loop_Entered;
+
log_trace { "Reading from all ready filehandles" };
foreach my $fh (@$readable) {
next unless $read->{$fh};
$read_count++;
$read->{$fh}();
+ last if $Loop_Entered;
# $read->{$fh}() if $read->{$fh};
}
log_trace { "Writing to all ready filehandles" };
next unless $write->{$fh};
$write_count++;
$write->{$fh}();
+ last if $Loop_Entered;
# $write->{$fh}() if $write->{$fh};
}
log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
while (@$timers and $timers->[0][0] <= $now) {
Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0];
(shift @$timers)->[1]->();
+ last if $Loop_Entered;
}
- log_debug { "Run loop: single loop is completed" };
+ log_trace { "Run loop: single loop is completed" };
return;
}
#will with out interfering with each other
sub run {
my ($self) = @_;
- log_info { "Run loop: run() invoked" };
+ log_trace { "Run loop: run() invoked" };
local $self->{is_running} = 1;
while ($self->is_running) {
$self->loop_once;
}
- log_info { "Run loop: run() completed" };
+ log_trace { "Run loop: run() completed" };
return;
}
package Object::Remote::Role::Connector::PerlInterpreter;
-#use IPC::Open2;
+use IPC::Open2;
use IPC::Open3;
use IO::Handle;
use Object::Remote::ModuleSender;
with 'Object::Remote::Role::Connector';
has module_sender => (is => 'lazy');
+#if no child_stderr file handle is specified then stderr
+#of the child will be connected to stderr of the parent
+has stderr => ( is => 'rw', default => sub { \*STDERR } );
sub _build_module_sender {
my ($hook) =
#ulimit of ~500 megs of v-ram
#TODO only works with ssh with quotes but only works locally
#with out quotes
+#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 80000; nice -n 15 perl -"' ] }
sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 500000; nice -n 15 perl -"' ] }
#sub _build_perl_command { [ 'perl', '-' ] }
sub _start_perl {
my $self = shift;
+ my $given_stderr = $self->stderr;
+ my $foreign_stderr;
+
Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
+ use Symbol;
+
+ if (defined($given_stderr)) {
+ $foreign_stderr = gensym();
+ } else {
+ $foreign_stderr = ">&STDERR";
+ }
+
+ my $pid = open3(
+ my $foreign_stdin,
+ my $foreign_stdout,
+ $foreign_stderr,
+ @{$self->final_perl_command},
+ ) or die "Failed to run perl at '$_[0]': $!";
+
+ if (defined($given_stderr)) {
+ log_warn { "using experimental cat for child stderr" };
+
+ #TODO refactor if this solves the problem
+ Object::Remote->current_loop
+ ->watch_io(
+ handle => $foreign_stderr,
+ on_read_ready => sub {
+ my $buf = '';
+ my $len = sysread($foreign_stderr, $buf, 32768);
+ if (!defined($len) or $len == 0) {
+ log_trace { "Got EOF or error on child stderr, removing from watcher" };
+ $self->stderr(undef);
+ Object::Remote->current_loop
+ ->unwatch_io(
+ handle => $foreign_stderr,
+ on_read_ready => 1
+ );
+ } else {
+ Dlog_trace { "got $len characters of stderr data for connection" };
+ print $given_stderr $buf or die "could not send stderr data: $!";
+ }
+ }
+ );
+ }
+
#TODO open2() dupes the child stderr into the calling
#process stderr which means if this process exits the
#child is still attached to the shell - using open3()
#where the user of a connection species a destination for output
#either a file name or their own file handle and the node output
#is dumped to it
- my $pid = open2(
- my $foreign_stdout,
- my $foreign_stdin,
- @{$self->final_perl_command},
- ) or die "Failed to run perl at '$_[0]': $!";
+# my $pid = open2(
+# my $foreign_stdout,
+# my $foreign_stdin,
+# @{$self->final_perl_command},
+# ) or die "Failed to run perl at '$_[0]': $!";
Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ];
return ($foreign_stdin, $foreign_stdout, $pid);