X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FObject-Remote.git;a=blobdiff_plain;f=lib%2FObject%2FRemote%2FConnection.pm;h=bb1b2603c67a16e1c749ec9e25152d48d5d57981;hp=3832bfab0b3054b8579ccda7f7e3e019d267eee8;hb=bef36e73e4257b2ba8e59eb55661ffc51d8a620a;hpb=302ecfbf4056a977f44f185cbfa925e3886d0c1a diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm index 3832bfa..bb1b260 100644 --- a/lib/Object/Remote/Connection.pm +++ b/lib/Object/Remote/Connection.pm @@ -15,18 +15,21 @@ use POSIX ":sys_wait_h"; use Module::Runtime qw(use_module); use Scalar::Util qw(weaken blessed refaddr openhandle); use JSON::PP qw(encode_json); +use Future; +use Carp qw(croak); use Moo; -BEGIN { - router()->exclude_forwarding; - $SIG{PIPE} = sub { log_debug { "Got a PIPE signal" } }; -} +BEGIN { router()->exclude_forwarding } END { - log_debug { "Killing all child processes in the process group" }; - - #send SIGINT to the process group for our children - kill(1, -2); + our %child_pids; + + log_trace { "END handler is being invoked in " . __PACKAGE__ }; + + foreach(keys(%child_pids)) { + log_debug { "Killing child process '$_'" }; + kill('TERM', $_); + } } has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } ); @@ -34,9 +37,9 @@ has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID has send_to_fh => ( is => 'ro', required => 1, trigger => sub { - my $self = $_[0]; - $_[1]->autoflush(1); - Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1]; + my $self = $_[0]; + $_[1]->autoflush(1); + Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1]; }, ); @@ -44,11 +47,11 @@ has read_channel => ( is => 'ro', required => 1, trigger => sub { my ($self, $ch) = @_; - my $id = $self->_id; + my $id = $self->_id; Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh; weaken($self); $ch->on_line_call(sub { $self->_receive(@_) }); - $ch->on_close_call(sub { + $ch->on_close_call(sub { log_trace { "invoking 'done' on on_close handler for connection id '$id'" }; $self->on_close->done(@_); }); @@ -56,10 +59,10 @@ has read_channel => ( ); has on_close => ( - is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) }, + is => 'rw', default => sub { $_[0]->_install_future_handlers(Future->new) }, trigger => sub { - log_trace { "Installing handlers into future via trigger" }; - $_[0]->_install_future_handlers($_[1]) + log_trace { "Installing handlers into future via trigger" }; + $_[0]->_install_future_handlers($_[1]) }, ); @@ -88,30 +91,53 @@ has _json => ( after BUILD => sub { my ($self) = @_; my $pid = $self->child_pid; - - unless (defined $pid) { - log_trace { "After BUILD invoked for connection but there was no pid" }; - return; - } - - log_trace { "Setting process group of child process '$pid'" }; - - setpgrp($self->child_pid, 1); + our %child_pids; + return unless defined $pid; + $child_pids{$pid} = 1; + return; }; sub BUILD { } +sub is_valid { + my ($self) = @_; + my $valid = ! $self->on_close->is_ready; + + log_trace { + my $id = $self->_id; + my $text; + if ($valid) { + $text = 'yes'; + } else { + $text = 'no'; + } + "Connection '$id' is valid: '$text'" + }; + + return $valid; +} + sub _fail_outstanding { my ($self, $error) = @_; - Dlog_debug { "$$ Failing outstanding futures with '$error' for connection $_" } $self->_id; my $outstanding = $self->outstanding_futures; - $_->fail("$error\n") for values %$outstanding; + + Dlog_debug { + sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding)) + }; + + foreach(keys(%$outstanding)) { + log_trace { "Failing future for $_" }; + my $future = $outstanding->{$_}; + $future->fail("$error\n"); + } + %$outstanding = (); return; } sub _install_future_handlers { my ($self, $f) = @_; + our %child_pids; Dlog_trace { "Installing handlers into future for connection $_" } $self->_id; weaken($self); $f->on_done(sub { @@ -132,8 +158,10 @@ sub _install_future_handlers { "Remote Perl interpreter exited with value '$exit_value'" }; } + + delete $child_pids{$pid}; }); - return $f; + return $f; }; sub _id_to_remote_object { @@ -185,13 +213,13 @@ sub _build__json { tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_); return \@tied_array; } - ); + ); } sub _load_if_possible { - my ($class) = @_; + my ($class) = @_; - use_module($class); + use_module($class); if ($@) { log_debug { "Attempt at loading '$class' failed with '$@'" }; @@ -206,7 +234,8 @@ BEGIN { Object::Remote::Connector::LocalSudo Object::Remote::Connector::SSH Object::Remote::Connector::UNIX - ); + Object::Remote::Connector::INET + ); } sub conn_from_spec { @@ -216,19 +245,19 @@ sub conn_from_spec { return $conn; } } - + return undef; } sub new_from_spec { - my ($class, $spec) = @_; + my ($class, $spec, @args) = @_; return $spec if blessed $spec; - my $conn = $class->conn_from_spec($spec); - + my $conn = $class->conn_from_spec($spec, @args); + die "Couldn't figure out what to do with ${spec}" unless defined $conn; - - return $conn->maybe::start::connect; + + return $conn->maybe::start::connect; } sub remote_object { @@ -249,7 +278,7 @@ sub connect { sub remote_sub { my ($self, $sub) = @_; my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/; - Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id; + Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id; return await_future($self->send_class_call(0, $pkg, can => $name)); } @@ -297,7 +326,7 @@ sub send_free { sub send { my ($self, $type, @call) = @_; - my $future = CPS::Future->new; + my $future = Future->new; my $remote = $self->remote_objects_by_id->{$call[0]}; unshift @call, $type => $self->_local_object_to_id($future); @@ -325,27 +354,32 @@ sub send_discard { sub _send { my ($self, $to_send) = @_; my $fh = $self->send_to_fh; + + unless ($self->is_valid) { + croak "Attempt to invoke _send on a connection that is not valid"; + } + Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id; my $serialized = $self->_serialize($to_send)."\n"; Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh; - my $ret; - eval { + my $ret; + eval { #TODO this should be converted over to a non-blocking ::WriteChannel class die "filehandle is not open" unless openhandle($fh); log_trace { "file handle has passed openhandle() test; printing to it" }; $ret = print $fh $serialized; die "print was not successful: $!" unless defined $ret }; - + if ($@) { Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh; my $error = $@; chomp($error); $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready; - return; + return; } - - return $ret; + + return $ret; } sub _serialize { @@ -387,14 +421,14 @@ sub _deobjectify { if ($ref eq 'HASH') { my $tied_to = tied(%$data); if(defined($tied_to)) { - return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; + return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; } else { return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data }; } } elsif ($ref eq 'ARRAY') { my $tied_to = tied(@$data); if (defined($tied_to)) { - return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; + return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; } else { return [ map $self->_deobjectify($_), @$data ]; } @@ -480,8 +514,28 @@ sub _invoke { Object::Remote::Connection - An underlying connection for L -=head1 LAME + use Object::Remote; + + my $local = Object::Remote->connect('-'); + my $remote = Object::Remote->connect('myserver'); + my $remote_user = Object::Remote->connect('user@myserver'); + my $local_sudo = Object::Remote->connect('user@'); + + #$remote can be any other connection object + my $hostname = Sys::Hostname->can::on($remote, 'hostname'); + +=head1 DESCRIPTION + +This is the class that supports connections to remote objects. + +=head1 SEE ALSO + +=over 4 + +=item C + +=item C -Shipping prioritised over writing this part up. Blame mst. +=back =cut