From: Tyler Riddle Date: Thu, 20 Sep 2012 19:08:42 +0000 (-0700) Subject: got all general logging done, start of adding ids to objects and incorporating ids... X-Git-Tag: v0.003001_01~113 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=9031635d18e754da303557b656c63ce8e7eb8e77;p=scpubgit%2FObject-Remote.git got all general logging done, start of adding ids to objects and incorporating ids in the logs --- diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index ba68d86..b051b0b 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -16,6 +16,11 @@ use JSON::PP qw(encode_json); use Moo; our $DEBUG = !!$ENV{OBJECT_REMOTE_DEBUG}; +#numbering each connection allows it to be +#tracked along with file handles in +#the logs +BEGIN { our $NEXT_CONNECTION_ID = 0 } +has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } ); has send_to_fh => ( is => 'ro', required => 1, @@ -26,6 +31,7 @@ has read_channel => ( is => 'ro', required => 1, trigger => sub { my ($self, $ch) = @_; + Dlog_trace { "trigger for read_channel has been invoked for connection $_" } $self->_id; weaken($self); $ch->on_line_call(sub { $self->_receive(@_) }); $ch->on_close_call(sub { $self->on_close->done(@_) }); @@ -34,8 +40,9 @@ has read_channel => ( has on_close => ( is => 'ro', default => sub { CPS::Future->new }, - trigger => sub { + trigger => sub { my ($self, $f) = @_; + Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id; weaken($self); $f->on_done(sub { $self->_fail_outstanding("Connection lost: ".($f->get)[0]); @@ -59,6 +66,7 @@ has outstanding_futures => (is => 'ro', default => sub { {} }); sub _fail_outstanding { my ($self, $error) = @_; + Dlog_debug { "Failing outstanding futures with '$error' for connection $_" } $self->_id; my $outstanding = $self->outstanding_futures; $_->fail($error) for values %$outstanding; %$outstanding = (); @@ -75,6 +83,7 @@ has _json => ( sub _id_to_remote_object { my ($self, $id) = @_; + Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id; return bless({}, 'Object::Remote::Null') if $id eq 'NULL'; ( $self->remote_objects_by_id->{$id} @@ -123,8 +132,10 @@ BEGIN { sub new_from_spec { my ($class, $spec) = @_; return $spec if blessed $spec; + Dlog_debug { "creating a new connection from spec" }; foreach my $poss (do { our @Guess }) { if (my $conn = $poss->($spec)) { + #Dlog_debug { my $id = $conn->_id; "created connection $id for spec $_" } $spec; return $conn->maybe::start::connect; } } @@ -140,7 +151,7 @@ sub remote_object { sub connect { my ($self, $to) = @_; - Dlog_debug { "Creating connection to remote node $_" } $to; + Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id; return await_future( $self->send_class_call(0, 'Object::Remote', connect => $to) ); @@ -149,13 +160,13 @@ sub connect { sub remote_sub { my ($self, $sub) = @_; my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/; - log_debug { "Invoking remote sub '$sub'" }; + Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id; return await_future($self->send_class_call(0, $pkg, can => $name)); } sub send_class_call { my ($self, $ctx, @call) = @_; - log_trace { "Sending a non-blocking class call" }; + Dlog_trace { "Sending a class call for connection $_" } $self->_id; $self->send(call => class_call_handler => $ctx => call => @call); } @@ -179,14 +190,14 @@ sub new_class_call_handler { sub register_remote { my ($self, $remote) = @_; - log_trace { my $i = $remote->id; "Registered a remote object with id of '$i'" }; + Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id; weaken($self->remote_objects_by_id->{$remote->id} = $remote); return $remote; } sub send_free { my ($self, $id) = @_; - log_debug { "sending request to free object '$id'" }; + Dlog_debug { "sending request to free object '$id' for connection $_" } $self->_id; delete $self->remote_objects_by_id->{$id}; $self->_send([ free => $id ]); } @@ -222,8 +233,9 @@ sub send_discard { sub _send { my ($self, $to_send) = @_; my $fh = $self->send_to_fh; + Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id; my $serialized = $self->_serialize($to_send)."\n"; - Dlog_debug { my $l = length($serialized); "Sending '$l' characters of serialized data to $_" } $fh; + Dlog_debug { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh; #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra #logic it could easily do short-writes to the remote side my $ret = print $fh $serialized; @@ -236,6 +248,7 @@ sub _send { sub _serialize { my ($self, $data) = @_; local our @New_Ids = (-1); + Dlog_debug { "starting to serialize data for connection $_" } $self->_id; return eval { my $flat = $self->_encode($self->_deobjectify($data)); warn "$$ >>> ${flat}\n" if $DEBUG; @@ -295,8 +308,10 @@ sub _deobjectify { sub _receive { my ($self, $flat) = @_; warn "$$ <<< $flat\n" if $DEBUG; + Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id; my ($type, @rest) = eval { @{$self->_deserialize($flat)} } or do { warn "Deserialize failed for ${flat}: $@"; return }; + Dlog_trace { "deserialization complete for connection $_" } $self->_id; eval { $self->${\"receive_${type}"}(@rest); 1 } or do { warn "Receive failed for ${flat}: $@"; return }; return; @@ -304,6 +319,7 @@ sub _receive { sub receive_free { my ($self, $id) = @_; + Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id; delete $self->local_objects_by_id->{$id} or warn "Free: no such object $id"; return; @@ -311,6 +327,7 @@ sub receive_free { sub receive_call { my ($self, $future_id, $id, @rest) = @_; + Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id; my $future = $self->_id_to_remote_object($future_id); $future->{method} = 'call_discard_free'; my $local = $self->local_objects_by_id->{$id} @@ -320,12 +337,14 @@ sub receive_call { sub receive_call_free { my ($self, $future, $id, @rest) = @_; + Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id; $self->receive_call($future, $id, undef, @rest); $self->receive_free($id); } sub _invoke { my ($self, $future, $local, $ctx, $method, @args) = @_; + Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id; if ($method =~ /^start::/) { my $f = $local->$method(@args); $f->on_done(sub { undef($f); $future->done(@_) }); diff --git a/lib/Object/Remote/ConnectionServer.pm b/lib/Object/Remote/ConnectionServer.pm index 639d456..13bf811 100644 --- a/lib/Object/Remote/ConnectionServer.pm +++ b/lib/Object/Remote/ConnectionServer.pm @@ -3,6 +3,7 @@ package Object::Remote::ConnectionServer; use Scalar::Util qw(blessed weaken); use Module::Runtime qw(use_module); use Object::Remote; +use Object::Remote::Logging qw( :log :dlog ); use IO::Socket::UNIX; use POSIX (); use Moo; @@ -19,6 +20,7 @@ has listen_on => ( }, trigger => sub { my ($self, $fh) = @_; + log_debug { "adding connection server to run loop because the trigger has executed" }; weaken($self); Object::Remote->current_loop ->watch_io( @@ -37,18 +39,23 @@ has connection_callback => ( ); sub BUILD { + log_debug { "A connection server has been built; calling want_run on run loop" }; Object::Remote->current_loop->want_run; } sub run { + log_debug { "Connection server is calling run_while_wanted on the run loop" }; Object::Remote->current_loop->run_while_wanted; } sub _listen_ready { my ($self, $fh) = @_; + log_debug { "Got a connection, calling accept on the file handle" }; my $new = $fh->accept or die "Couldn't accept: $!"; + log_trace { "Setting file handle non-blocking" }; $new->blocking(0); my $f = CPS::Future->new; + log_trace { "Creating a new connection with the remote node" }; my $c = use_module('Object::Remote::Connection')->new( receive_from_fh => $new, send_to_fh => $new, @@ -56,25 +63,32 @@ sub _listen_ready { @{$self->connection_args} )->${\$self->connection_callback}; $f->on_ready(sub { undef($c) }); + log_trace { "marking the future as done" }; $c->ready_future->done; #TODO see if this runs on the controller or the remote node #if this runs on the controller a poorly behaved remote node #could cause the print() to block but it's a very low probability + Dlog_trace { "Sending 'Shere' to socket $_" } $new; print $new "Shere\n" or die "Couldn't send to new socket: $!"; + log_debug { "Connection has been fully handled" }; return $c; } sub DEMOLISH { my ($self, $gd) = @_; + log_debug { "A connection server is being destroyed; global destruction: '$gd'" }; return if $gd; + log_trace { "Removing the connection server IO watcher from run loop" }; Object::Remote->current_loop ->unwatch_io( handle => $self->listen_on, on_read_ready => 1 ); if ($self->listen_on->can('hostpath')) { + log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" }; unlink($self->listen_on->hostpath); } + log_trace { "calling want_stop on the run loop" }; Object::Remote->current_loop->want_stop; } diff --git a/lib/Object/Remote/Connector/Local.pm b/lib/Object/Remote/Connector/Local.pm index 729bcd7..a2e52b0 100644 --- a/lib/Object/Remote/Connector/Local.pm +++ b/lib/Object/Remote/Connector/Local.pm @@ -6,6 +6,8 @@ with 'Object::Remote::Role::Connector::PerlInterpreter'; no warnings 'once'; +BEGIN { } + push @Object::Remote::Connection::Guess, sub { if (($_[0]||'') eq '-') { __PACKAGE__->new } }; diff --git a/lib/Object/Remote/Connector/LocalSudo.pm b/lib/Object/Remote/Connector/LocalSudo.pm index 0b5f33d..f6b16ee 100644 --- a/lib/Object/Remote/Connector/LocalSudo.pm +++ b/lib/Object/Remote/Connector/LocalSudo.pm @@ -67,8 +67,8 @@ sub _start_perl { #TODO is there a specific reason sysread() and syswrite() aren't #a part of ::MiniLoop? It's one spot to handle errors and other #logic involving filehandles - log_debug { "LocalSudo: Preparing to read data" }; - if (sysread($sudo_stderr, my $buf, 1024) > 0) { + Dlog_debug { "LocalSudo: Preparing to read data from $_" } $sudo_stderr; + if (sysread($sudo_stderr, my $buf, 32768) > 0) { log_trace { "LocalSudo: successfully read data, printing it to STDERR" }; print STDERR $buf; log_trace { "LocalSudo: print() to STDERR is done" }; diff --git a/lib/Object/Remote/FromData.pm b/lib/Object/Remote/FromData.pm index 3738b66..4a1bbe1 100644 --- a/lib/Object/Remote/FromData.pm +++ b/lib/Object/Remote/FromData.pm @@ -2,6 +2,7 @@ package Object::Remote::FromData; use strictures 1; use Object::Remote; +use Object::Remote::Logging qw ( :log ); our %Modules; our %Not_Loaded_Yet; @@ -9,12 +10,16 @@ our %Seen; sub import { my $target = caller; + log_trace { "import has been invoked by '$target' on " . __PACKAGE__ }; return if $Seen{$target}; + log_debug { "'$target' has not yet loaded " . __PACKAGE__ }; $Seen{$target} = $Not_Loaded_Yet{$target} = 1; } sub flush_loaded { + log_debug { "flushing the loaded classes" }; foreach my $key (keys %Not_Loaded_Yet) { + log_trace { "flushing '$key'" }; my $data_fh = do { no strict 'refs'; *{"${key}::DATA"} }; my $data = do { local $/; <$data_fh> }; my %modules = reverse( @@ -24,6 +29,7 @@ sub flush_loaded { @Modules{keys %modules} = values %modules; delete $Not_Loaded_Yet{$key}; } + log_trace { "done flushing loaded classes" }; } sub find_module { diff --git a/lib/Object/Remote/Future.pm b/lib/Object/Remote/Future.pm index c54a8a9..2817539 100644 --- a/lib/Object/Remote/Future.pm +++ b/lib/Object/Remote/Future.pm @@ -4,7 +4,7 @@ use strict; use warnings; use base qw(Exporter); -use Object::Remote::Logging qw( :log ); +use Object::Remote::Logging qw( :log Dlog_trace ); use CPS::Future; diff --git a/lib/Object/Remote/Handle.pm b/lib/Object/Remote/Handle.pm index a3cf33a..de7a65b 100644 --- a/lib/Object/Remote/Handle.pm +++ b/lib/Object/Remote/Handle.pm @@ -2,6 +2,7 @@ package Object::Remote::Handle; use Object::Remote::Proxy; use Scalar::Util qw(weaken blessed); +use Object::Remote::Logging qw ( :log ); use Object::Remote::Future; #must find way to exclude certain log events #from being forwarded - log events generated in @@ -31,12 +32,14 @@ sub proxy { sub BUILD { my ($self, $args) = @_; -# log_debug { "constructing instance of " . ref($self) }; + log_debug { "constructing remote handle" }; if ($self->id) { + log_trace { "disaming free for this hanle" }; $self->disarm_free; } else { die "No id supplied and no class either" unless $args->{class}; ref($_) eq 'HASH' and $_ = [ %$_ ] for $args->{args}; + log_trace { "fetching id for handle and disarming free on remote side" }; $self->_set_id( await_future( $self->connection->send_class_call( @@ -46,13 +49,14 @@ sub BUILD { )->{remote}->disarm_free->id ); } -# log_trace { "finished constructing " . ref($self) }; + log_trace { "finished constructing remote handle; registering it" . ref($self) }; $self->connection->register_remote($self); } sub call { my ($self, $method, @args) = @_; my $w = wantarray; + log_debug { my $def = defined $w; "call() has been invoked on a remote handle; wantarray: '$def'" }; $method = "start::${method}" if (caller(0)||'') eq 'start'; future { $self->connection->send(call => $self->id, $w, $method, @args) @@ -61,17 +65,20 @@ sub call { sub call_discard { my ($self, $method, @args) = @_; + log_trace { "invoking send_discard() with 'call' for method '$method' on connection for remote handle" }; $self->connection->send_discard(call => $self->id, $method, @args); } sub call_discard_free { my ($self, $method, @args) = @_; $self->disarm_free; + log_trace { "invoking send_discard() with 'call_free' for method '$method' on connection for remote handle" }; $self->connection->send_discard(call_free => $self->id, $method, @args); } sub DEMOLISH { my ($self, $gd) = @_; + log_trace { "Demolishing remote handle" }; return if $gd or $self->disarmed_free; $self->connection->send_free($self->id); } diff --git a/lib/Object/Remote/LogRouter.pm b/lib/Object/Remote/LogRouter.pm index 17108dc..12ce1a9 100644 --- a/lib/Object/Remote/LogRouter.pm +++ b/lib/Object/Remote/LogRouter.pm @@ -71,7 +71,7 @@ sub handle_log_message { #this info to the selector is a good feature local($_) = { level => $level, package => $caller }; if ($selector->(@values)) { - #TODO resolve caller_level issues with routing + #SOLVED resolve caller_level issues with routing #idea: the caller level will differ in distance from the #start of the call stack but it's a constant distance from #the end of the call stack - can that be exploited to calculate @@ -83,7 +83,14 @@ sub handle_log_message { #it can be invoked in that location and the caller level #problem doesn't exist anymore $logger = $logger->($caller, { caller_level => -1 }); - + + #TODO there is a known issue with the interaction of this + #routed logging scheme and objects proxied with Object::Remote. + #Specifically the loggers must be invoked with a calling + #depth of 0 which isn't possible using a logger that has + #been proxied which is what happens with routed logging + #if the logger is created in one Perl interpreter and the + #logging happens in another $logger->$level($log_meth->(@values)) if $logger->${\"is_$level"}; } diff --git a/lib/Object/Remote/Node.pm b/lib/Object/Remote/Node.pm index 58923c8..027097c 100644 --- a/lib/Object/Remote/Node.pm +++ b/lib/Object/Remote/Node.pm @@ -2,12 +2,12 @@ package Object::Remote::Node; use strictures 1; use Object::Remote::Connector::STDIO; -use Object::Remote::Logging qw(:log); +use Object::Remote::Logging qw(:log :dlog); use Object::Remote; use CPS::Future; sub run { - log_trace { "run() has been invoked on remote node; creating STDIO connector" }; + log_trace { "run() has been invoked on remote node" }; my $c = Object::Remote::Connector::STDIO->new->connect; $c->register_class_call_handler; @@ -19,6 +19,7 @@ sub run { $loop->want_stop }); + Dlog_trace { "Node is sending 'Shere' to $_" } $c->send_to_fh; print { $c->send_to_fh } "Shere\n"; log_debug { "Node is going to start the run loop" }; diff --git a/lib/Object/Remote/ReadChannel.pm b/lib/Object/Remote/ReadChannel.pm index 6402bdd..b328d11 100644 --- a/lib/Object/Remote/ReadChannel.pm +++ b/lib/Object/Remote/ReadChannel.pm @@ -2,7 +2,7 @@ package Object::Remote::ReadChannel; use CPS::Future; use Scalar::Util qw(weaken); -use Object::Remote::Logging qw(:log); +use Object::Remote::Logging qw(:log :dlog); use Moo; has fh => ( @@ -38,14 +38,13 @@ has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x }); #there is no actual data to read from the socket sub _receive_data_from { my ($self, $fh) = @_; - log_trace { "Preparing to read data" }; + Dlog_trace { "Preparing to read data from $_" } $fh; #use Carp qw(cluck); cluck(); my $rb = $self->_receive_data_buffer; #TODO is there a specific reason sysread() and syswrite() aren't #a part of ::MiniLoop? It's one spot to handle errors and other #logic involving filehandles - #TODO why are the buffers so small? BUFSIZ is usually 32768 - my $len = sysread($fh, $$rb, 1024, length($$rb)); + my $len = sysread($fh, $$rb, 32768, length($$rb)); my $err = defined($len) ? '' : ": $!"; if (defined($len) and $len > 0) { log_trace { "Read $len bytes of data" }; diff --git a/lib/Object/Remote/Role/Connector.pm b/lib/Object/Remote/Role/Connector.pm index 83bfcea..d5d46d7 100644 --- a/lib/Object/Remote/Role/Connector.pm +++ b/lib/Object/Remote/Role/Connector.pm @@ -7,6 +7,8 @@ use Moo::Role; requires '_open2_for'; +#TODO return to 10 seconds after debugging +#has timeout => (is => 'ro', default => sub { { after => 10 } }); has timeout => (is => 'ro', default => sub { { after => 10 } }); sub connect { @@ -50,11 +52,10 @@ sub connect { Dlog_trace { "Connection timeout timer has fired for child pid '$child_pid'; is_ready: $_" } $f->is_ready; unless($f->is_ready) { log_warn { "Connection with child pid '$child_pid' has timed out" }; - $f->fail("Connection timed out") unless $f->is_ready; + $f->fail("Connection timed out") unless $f->is_ready; } - #TODO hrm was this supposed to be conditional on the is_ready ? - #a connection is only good for timeout seconds? undef($channel); + } ); log_trace { "connection for child pid '$child_pid' has been initialized" }; diff --git a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm index 815ef3e..b8e3f75 100644 --- a/lib/Object/Remote/Role/Connector/PerlInterpreter.pm +++ b/lib/Object/Remote/Role/Connector/PerlInterpreter.pm @@ -5,7 +5,7 @@ use IO::Handle; use Object::Remote::ModuleSender; use Object::Remote::Handle; use Object::Remote::Future; -use Object::Remote::Logging qw( :log ); +use Object::Remote::Logging qw( :log :dlog ); use Scalar::Util qw(blessed); use Moo::Role; @@ -22,7 +22,13 @@ sub _build_module_sender { has perl_command => (is => 'lazy'); -sub _build_perl_command { [ 'perl', '-' ] } +#TODO convert nice value into optional feature enabled by +#setting value of attribute +#ulimit of ~500 megs of v-ram +#TODO only works with ssh with quotes but only works locally +#with out quotes +sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 500000; nice -n 15 perl -"' ] } +#sub _build_perl_command { [ 'perl', '-' ] } around connect => sub { my ($orig, $self) = (shift, shift); @@ -48,11 +54,13 @@ sub final_perl_command { shift->perl_command } sub _start_perl { my $self = shift; + Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command}; my $pid = open2( my $foreign_stdout, my $foreign_stdin, @{$self->final_perl_command}, ) or die "Failed to run perl at '$_[0]': $!"; + Dlog_trace { "Connection to remote side successful; remote stdin and stdout: $_" } [ $foreign_stdin, $foreign_stdout ]; return ($foreign_stdin, $foreign_stdout, $pid); } @@ -67,7 +75,7 @@ sub _open2_for { ->watch_io( handle => $foreign_stdin, on_write_ready => sub { - my $len = syswrite($foreign_stdin, $to_send, 4096); + my $len = syswrite($foreign_stdin, $to_send, 32768); if (defined $len) { substr($to_send, 0, $len) = ''; }