use Object::Remote::CodeContainer;
use Object::Remote::GlobProxy;
use Object::Remote::GlobContainer;
+use Object::Remote::Logging qw (:log :dlog);
use Object::Remote;
use Symbol;
use IO::Handle;
sub connect {
my ($self, $to) = @_;
+ Dlog_debug { "Creating connection to remote node $_" } $to;
return await_future(
$self->send_class_call(0, 'Object::Remote', connect => $to)
);
sub remote_sub {
my ($self, $sub) = @_;
my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
+ log_debug { "Invoking remote sub '$sub'" };
return await_future($self->send_class_call(0, $pkg, can => $name));
}
sub send_class_call {
my ($self, $ctx, @call) = @_;
+ log_trace { "Sending a non-blocking class call" };
$self->send(call => class_call_handler => $ctx => call => @call);
}
sub register_remote {
my ($self, $remote) = @_;
+ log_trace { my $i = $remote->id; "Registered a remote object with id of '$i'" };
weaken($self->remote_objects_by_id->{$remote->id} = $remote);
return $remote;
}
sub send_free {
my ($self, $id) = @_;
+ log_debug { "sending request to free object '$id'" };
delete $self->remote_objects_by_id->{$id};
$self->_send([ free => $id ]);
}
sub _send {
my ($self, $to_send) = @_;
-
- print { $self->send_to_fh } $self->_serialize($to_send)."\n";
+ my $fh = $self->send_to_fh;
+ my $serialized = $self->_serialize($to_send)."\n";
+ Dlog_debug { my $l = length($serialized); "Sending '$l' characters of serialized data to $_" } $fh;
+ #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra
+ #logic it could easily do short-writes to the remote side
+ my $ret = print $fh $serialized;
+ Dlog_trace { my $r = defined $ret ? $ret : 'undef'; "print() returned $r with $_" } $fh;
+ #TODO hrm reason print's return value was ignored?
+ die "could not write to filehandle: $!" unless $ret;
+ return $ret;
}
sub _serialize {
)->${\$self->connection_callback};
$f->on_ready(sub { undef($c) });
$c->ready_future->done;
+ #TODO see if this runs on the controller or the remote node
+ #if this runs on the controller a poorly behaved remote node
+ #could cause the print() to block but it's a very low probability
print $new "Shere\n" or die "Couldn't send to new socket: $!";
return $c;
}
->watch_io(
handle => $sudo_stderr,
on_read_ready => sub {
+ #TODO is there a specific reason sysread() and syswrite() aren't
+ #a part of ::MiniLoop? It's one spot to handle errors and other
+ #logic involving filehandles
+ log_debug { "LocalSudo: Preparing to read data" };
if (sysread($sudo_stderr, my $buf, 1024) > 0) {
+ log_trace { "LocalSudo: successfully read data, printing it to STDERR" };
print STDERR $buf;
+ log_trace { "LocalSudo: print() to STDERR is done" };
} else {
+ log_debug { "LocalSudo: received EOF or error on file handle, unwatching it" };
Object::Remote->current_loop
->unwatch_io(
handle => $sudo_stderr,
sub await_future {
my $f = shift;
+ log_trace { my $ir = $f->is_ready; "await_future() invoked; is_ready: $ir" };
return $f if $f->is_ready;
require Object::Remote;
my $loop = Object::Remote->current_loop;
{
local @await = (@await, $f);
$f->on_ready(sub {
- $loop->stop if $f == $await[-1]
+ log_trace { my $l = @await; "future has become ready, length of \@await: '$l'" };
+ if ($f == $await[-1]) {
+ log_debug { "This future is not waiting on anything so calling stop on the run loop" };
+ $loop->stop;
+ }
});
+ log_debug { "Starting run loop for newly created future" };
$loop->run;
}
if (@await and $await[-1]->is_ready) {
+ log_debug { "Last future in await list was ready, stopping run loop" };
$loop->stop;
}
+ log_trace { "await_future() returning" };
return wantarray ? $f->get : ($f->get)[0];
}
sub await_all {
+ log_trace { my $l = @_; "await_all() invoked with '$l' futures to wait on" };
await_future(CPS::Future->wait_all(@_));
map $_->get, @_;
}
sub watch_io {
my ($self, %watch) = @_;
my $fh = $watch{handle};
- log_debug { my $type = ref($fh); "Adding watch for ref of type '$type'" };
+ Dlog_debug { my $type = ref($fh); "Adding IO watch for $_" } $fh;
if (my $cb = $watch{on_read_ready}) {
- log_trace { "IO watcher on_read_ready has been invoked" };
+ log_trace { "IO watcher is registering with select() for reading" };
$self->_read_select->add($fh);
$self->_read_watches->{$fh} = $cb;
}
if (my $cb = $watch{on_write_ready}) {
- log_trace { "IO watcher on_write_ready has been invoked" };
+ log_trace { "IO watcher is registering with select() for writing" };
$self->_write_select->add($fh);
$self->_write_watches->{$fh} = $cb;
}
sub unwatch_io {
my ($self, %watch) = @_;
my $fh = $watch{handle};
- log_debug { my $type = ref($fh); "Removing watch for ref of type '$type'" };
+ Dlog_debug { "Removing IO watch for $_" } $fh;
if ($watch{on_read_ready}) {
+ log_trace { "IO watcher is removing read from select()" };
$self->_read_select->remove($fh);
delete $self->_read_watches->{$fh};
}
if ($watch{on_write_ready}) {
+ log_trace { "IO watcher is removing write from select()" };
$self->_write_select->remove($fh);
delete $self->_write_watches->{$fh};
}
#when data is ready - when the system
#deadlocks the chatter from the timeout in
#select clogs up the logs
+ #TODO should make this an attribute
my $delay_max = undef;
return $delay_max unless @$timers;
$duration = $delay_max;
}
- log_trace { "returning $duration as select() timeout period" }
-
+ #uncomment for original behavior
+ #return .5;
return $duration;
}
my $wait_time = $self->_next_timer_expires_delay;
log_debug { sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
+ #TODO The docs state that select() in some instances can return a socket as ready to
+ #read data even if reading from it would block and the recomendation is to set
+ #handles used with select() as non-blocking but Perl on Windows can not set a
+ #handle to use non-blocking IO - If Windows is not one of the operating
+ #systems where select() returns a handle that could block it would work to
+ #enable non-blocking mode only under Posix - the non-blocking sysread()
+ #logic would work unmodified for both blocking and non-blocking handles
+ #under Posix and Windows.
my ($readable, $writeable) = IO::Select->select(
+ #TODO how come select() isn't used to identify handles with errors on them?
+ #TODO is there a specific reason for a half second maximum wait duration?
+ #The two places I've found for the runloop to be invoked don't return control
+ #to the caller until a controlling variable interrupts the loop that invokes
+ #loop_once() - is this to allow that variable to be polled and exit the
+ #run loop? If so why isn't that behavior event driven and causes select() to
+ #return?
$self->_read_select, $self->_write_select, undef, $wait_time
);
log_debug {
return;
}
+#::Node and ::ConnectionServer use the want_run() / want_stop()
+#counter to cause a run-loop to execute while something is active;
+#the futures do this via a different mechanism
sub want_run {
my ($self) = @_;
Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
sub run_while_wanted {
my ($self) = @_;
- log_debug { "Run loop: run_while_wanted() invoked" };
+ log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
$self->loop_once while $self->{want_running};
log_debug { "Run loop: run_while_wanted() completed" };
return;
--$self->{want_running};
}
+#TODO Hypothesis: Futures invoke run() which gives that future
+#it's own localized is_running attribute - any adjustment to the
+#is_running attribute outside of that future will not effect that
+#future so each future winds up able to call run() and stop() at
+#will with out interfering with each other
sub run {
my ($self) = @_;
log_info { "Run loop: run() invoked" };
use strictures 1;
use Object::Remote::Connector::STDIO;
+use Object::Remote::Logging qw(:log);
use Object::Remote;
use CPS::Future;
sub run {
+ log_trace { "run() has been invoked on remote node; creating STDIO connector" };
my $c = Object::Remote::Connector::STDIO->new->connect;
$c->register_class_call_handler;
my $loop = Object::Remote->current_loop;
- $c->on_close->on_ready(sub { $loop->want_stop });
+ $c->on_close->on_ready(sub {
+ log_info { "Node connection with call handler has closed" };
+ $loop->want_stop
+ });
print { $c->send_to_fh } "Shere\n";
+ log_debug { "Node is going to start the run loop" };
$loop->want_run;
$loop->run_while_wanted;
+ log_debug { "Run loop invocation in node has completed" };
}
1;
has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x });
+#TODO confirmed this is the point of the hang - sysread() is invoked on a
+#socket inside the controller that blocks and deadlocks the entire system.
+#The remote nodes are all waiting to receive data at that point.
+#Validated this behavior exists in an unmodified Object::Remote from CPAN
+#by wrapping this sysread() with warns that have the pid in them and pounding
+#my local machine with System::Introspector via ssh and 7 remote perl instances
+#It looks like one of the futures is responding to an event regarding the ability
+#to read from a socket and every once in a while an ordering issue means that
+#there is no actual data to read from the socket
sub _receive_data_from {
my ($self, $fh) = @_;
log_trace { "Preparing to read data" };
+ #use Carp qw(cluck); cluck();
my $rb = $self->_receive_data_buffer;
+ #TODO is there a specific reason sysread() and syswrite() aren't
+ #a part of ::MiniLoop? It's one spot to handle errors and other
+ #logic involving filehandles
+ #TODO why are the buffers so small? BUFSIZ is usually 32768
my $len = sysread($fh, $$rb, 1024, length($$rb));
my $err = defined($len) ? '' : ": $!";
if (defined($len) and $len > 0) {