X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FObject-Remote.git;a=blobdiff_plain;f=lib%2FObject%2FRemote%2FConnection.pm;h=090b9aa8a398553f34613f6f24138d1bec727072;hp=136b065aac3284ed345cbe598a5e595b1ac155e2;hb=55c0d0209fa9d9265ff178f54ae9fe5fdddef3c1;hpb=b0ec7e3b19d47b9ba5a864f5077d0dc8030834cc diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index 136b065..090b9aa 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -21,10 +21,14 @@ use Carp qw(croak); BEGIN { router()->exclude_forwarding } END { - log_debug { "Killing all child processes in the process group" }; - - #send SIGINT to the process group for our children - kill(1, -2); + our %child_pids; + + log_trace { "END handler is being invoked in " . __PACKAGE__ }; + + foreach(keys(%child_pids)) { + log_debug { "Killing child process '$_'" }; + kill('TERM', $_); + } } has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } ); @@ -42,11 +46,11 @@ has read_channel => ( is => 'ro', required => 1, trigger => sub { my ($self, $ch) = @_; - my $id = $self->_id; + my $id = $self->_id; Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh; weaken($self); $ch->on_line_call(sub { $self->_receive(@_) }); - $ch->on_close_call(sub { + $ch->on_close_call(sub { log_trace { "invoking 'done' on on_close handler for connection id '$id'" }; $self->on_close->done(@_); }); @@ -86,32 +90,37 @@ has _json => ( after BUILD => sub { my ($self) = @_; my $pid = $self->child_pid; - - unless (defined $pid) { - log_trace { "After BUILD invoked for connection but there was no pid" }; - return; - } - - log_trace { "Setting process group of child process '$pid'" }; - - setpgrp($self->child_pid, 1); + our %child_pids; + return unless defined $pid; + $child_pids{$pid} = 1; + return; }; sub BUILD { } sub is_valid { my ($self) = @_; - my $closed = $self->on_close->is_ready; - - log_trace { "Connection closed: $closed" }; - return ! $closed; + my $valid = ! $self->on_close->is_ready; + + log_trace { + my $id = $self->_id; + my $text; + if ($valid) { + $text = 'yes'; + } else { + $text = 'no'; + } + "Connection '$id' is valid: '$text'" + }; + + return $valid; } sub _fail_outstanding { my ($self, $error) = @_; my $outstanding = $self->outstanding_futures; - - Dlog_debug { + + Dlog_debug { sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding)) }; @@ -127,6 +136,7 @@ sub _fail_outstanding { sub _install_future_handlers { my ($self, $f) = @_; + our %child_pids; Dlog_trace { "Installing handlers into future for connection $_" } $self->_id; weaken($self); $f->on_done(sub { @@ -147,8 +157,10 @@ sub _install_future_handlers { "Remote Perl interpreter exited with value '$exit_value'" }; } + + delete $child_pids{$pid}; }); - return $f; + return $f; }; sub _id_to_remote_object { @@ -204,9 +216,9 @@ sub _build__json { } sub _load_if_possible { - my ($class) = @_; + my ($class) = @_; - use_module($class); + use_module($class); if ($@) { log_debug { "Attempt at loading '$class' failed with '$@'" }; @@ -221,7 +233,7 @@ BEGIN { Object::Remote::Connector::LocalSudo Object::Remote::Connector::SSH Object::Remote::Connector::UNIX - ); + ); } sub conn_from_spec { @@ -231,7 +243,7 @@ sub conn_from_spec { return $conn; } } - + return undef; } @@ -239,11 +251,11 @@ sub new_from_spec { my ($class, $spec, @args) = @_; return $spec if blessed $spec; my $conn = $class->conn_from_spec($spec, @args); - + die "Couldn't figure out what to do with ${spec}" unless defined $conn; - - return $conn->maybe::start::connect; + + return $conn->maybe::start::connect; } sub remote_object { @@ -264,7 +276,7 @@ sub connect { sub remote_sub { my ($self, $sub) = @_; my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/; - Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id; + Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id; return await_future($self->send_class_call(0, $pkg, can => $name)); } @@ -340,32 +352,32 @@ sub send_discard { sub _send { my ($self, $to_send) = @_; my $fh = $self->send_to_fh; - + unless ($self->is_valid) { croak "Attempt to invoke _send on a connection that is not valid"; } - + Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id; my $serialized = $self->_serialize($to_send)."\n"; Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh; - my $ret; - eval { + my $ret; + eval { #TODO this should be converted over to a non-blocking ::WriteChannel class die "filehandle is not open" unless openhandle($fh); log_trace { "file handle has passed openhandle() test; printing to it" }; $ret = print $fh $serialized; die "print was not successful: $!" unless defined $ret }; - + if ($@) { Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh; my $error = $@; chomp($error); $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready; - return; + return; } - - return $ret; + + return $ret; } sub _serialize { @@ -407,14 +419,14 @@ sub _deobjectify { if ($ref eq 'HASH') { my $tied_to = tied(%$data); if(defined($tied_to)) { - return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; + return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; } else { return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data }; } } elsif ($ref eq 'ARRAY') { my $tied_to = tied(@$data); if (defined($tied_to)) { - return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; + return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; } else { return [ map $self->_deobjectify($_), @$data ]; } @@ -501,20 +513,20 @@ sub _invoke { Object::Remote::Connection - An underlying connection for L use Object::Remote; - + my %opts = ( nice => '10', ulimit => '-v 400000', watchdog_timeout => 120, stderr => \*STDERR, ); - + my $local = Object::Remote->connect('-'); my $remote = Object::Remote->connect('myserver', nice => 5); my $remote_user = Object::Remote->connect('user@myserver', %opts); my $local_sudo = Object::Remote->connect('user@'); - + #$remote can be any other connection object my $hostname = Sys::Hostname->can::on($remote, 'hostname'); - + =head1 DESCRIPTION This is the class that supports connections to a Perl interpreter that is executed in a