1 package Object::Remote::Connection;
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
21 BEGIN { router()->exclude_forwarding }
24 log_debug { "Killing all child processes in the process group" };
26 #FIXME update along with setpgrp() to not use a process
29 #send SIGINT to the process group for our children
33 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
36 is => 'ro', required => 1,
40 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
45 is => 'ro', required => 1,
49 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
51 $ch->on_line_call(sub { $self->_receive(@_) });
52 $ch->on_close_call(sub {
53 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
54 $self->on_close->done(@_);
60 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
62 log_trace { "Installing handlers into future via trigger" };
63 $_[0]->_install_future_handlers($_[1])
67 has child_pid => (is => 'ro');
69 has local_objects_by_id => (
70 is => 'ro', default => sub { {} },
71 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
74 has remote_objects_by_id => (
75 is => 'ro', default => sub { {} },
76 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
79 has outstanding_futures => (is => 'ro', default => sub { {} });
84 _deserialize => 'decode',
91 my $pid = $self->child_pid;
93 unless (defined $pid) {
94 log_trace { "After BUILD invoked for connection but there was no pid" };
98 log_trace { "Setting process group of child process '$pid'" };
100 #FIXME moving things into a process group has side effects for
101 #users of the library - move to a list
102 setpgrp($self->child_pid, 1);
109 my $closed = $self->on_close->is_ready;
111 log_trace { "Connection closed: $closed" };
115 sub _fail_outstanding {
116 my ($self, $error) = @_;
117 my $outstanding = $self->outstanding_futures;
120 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
123 foreach(keys(%$outstanding)) {
124 log_trace { "Failing future for $_" };
125 my $future = $outstanding->{$_};
126 $future->fail("$error\n");
133 sub _install_future_handlers {
135 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
138 my $pid = $self->child_pid;
139 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
140 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
141 return unless defined $pid;
142 log_debug { "Waiting for child '$pid' to exit" };
143 my $ret = waitpid($pid, 0);
145 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
148 log_warn { "Remote interpreter did not exit cleanly" };
151 my $exit_value = $? >> 8;
152 "Remote Perl interpreter exited with value '$exit_value'"
159 sub _id_to_remote_object {
160 my ($self, $id) = @_;
161 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
162 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
164 $self->remote_objects_by_id->{$id}
165 or Object::Remote::Handle->new(connection => $self, id => $id)
170 weaken(my $self = shift);
171 JSON::PP->new->filter_json_single_key_object(
172 __remote_object__ => sub {
173 $self->_id_to_remote_object(@_);
175 )->filter_json_single_key_object(
176 __remote_code__ => sub {
177 my $code_container = $self->_id_to_remote_object(@_);
178 sub { $code_container->call(@_) };
180 )->filter_json_single_key_object(
181 __scalar_ref__ => sub {
185 )->filter_json_single_key_object(
186 __glob_ref__ => sub {
187 my $glob_container = $self->_id_to_remote_object(@_);
188 my $handle = Symbol::gensym;
189 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
192 )->filter_json_single_key_object(
193 __local_object__ => sub {
194 $self->local_objects_by_id->{$_[0]}
196 )->filter_json_single_key_object(
197 __remote_tied_hash__ => sub {
199 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
202 )->filter_json_single_key_object(
203 __remote_tied_array__ => sub {
205 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
211 sub _load_if_possible {
217 log_debug { "Attempt at loading '$class' failed with '$@'" };
223 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
224 map _load_if_possible($_), qw(
225 Object::Remote::Connector::Local
226 Object::Remote::Connector::LocalSudo
227 Object::Remote::Connector::SSH
228 Object::Remote::Connector::UNIX
233 my ($class, $spec, @args) = @_;
234 foreach my $poss (do { our @Guess }) {
235 if (my $conn = $poss->($spec, @args)) {
244 my ($class, $spec, @args) = @_;
245 return $spec if blessed $spec;
246 my $conn = $class->conn_from_spec($spec, @args);
248 die "Couldn't figure out what to do with ${spec}"
249 unless defined $conn;
251 return $conn->maybe::start::connect;
255 my ($self, @args) = @_;
256 Object::Remote::Handle->new(
257 connection => $self, @args
262 my ($self, $to) = @_;
263 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
265 $self->send_class_call(0, 'Object::Remote', connect => $to)
270 my ($self, $sub) = @_;
271 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
272 Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;
273 return await_future($self->send_class_call(0, $pkg, can => $name));
276 sub send_class_call {
277 my ($self, $ctx, @call) = @_;
278 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
279 $self->send(call => class_call_handler => $ctx => call => @call);
282 sub register_class_call_handler {
284 $self->local_objects_by_id->{'class_call_handler'} ||= do {
285 my $o = $self->new_class_call_handler;
286 $self->_local_object_to_id($o);
291 sub new_class_call_handler {
292 Object::Remote::CodeContainer->new(
294 my ($class, $method) = (shift, shift);
295 use_module($class)->$method(@_);
300 sub register_remote {
301 my ($self, $remote) = @_;
302 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
303 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
308 my ($self, $id) = @_;
309 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
310 #TODO this shows up sometimes when a remote side dies in the middle of a remote
311 #method invocation - possibly only when the object is being constructed?
312 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
313 delete $self->remote_objects_by_id->{$id};
314 $self->_send([ free => $id ]);
318 my ($self, $type, @call) = @_;
320 my $future = CPS::Future->new;
321 my $remote = $self->remote_objects_by_id->{$call[0]};
323 unshift @call, $type => $self->_local_object_to_id($future);
325 my $outstanding = $self->outstanding_futures;
326 $outstanding->{$future} = $future;
327 $future->on_ready(sub {
329 delete $outstanding->{$future}
332 $self->_send(\@call);
338 my ($self, $type, @call) = @_;
340 unshift @call, $type => 'NULL';
342 $self->_send(\@call);
346 my ($self, $to_send) = @_;
347 my $fh = $self->send_to_fh;
349 unless ($self->is_valid) {
350 croak "Attempt to invoke _send on a connection that is not valid";
353 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
354 my $serialized = $self->_serialize($to_send)."\n";
355 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
358 #TODO this should be converted over to a non-blocking ::WriteChannel class
359 die "filehandle is not open" unless openhandle($fh);
360 log_trace { "file handle has passed openhandle() test; printing to it" };
361 $ret = print $fh $serialized;
362 die "print was not successful: $!" unless defined $ret
366 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
369 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
377 my ($self, $data) = @_;
378 local our @New_Ids = (-1);
380 my $flat = $self->_encode($self->_deobjectify($data));
383 my $err = $@; # won't get here if the eval doesn't die
384 # don't keep refs to new things
385 delete @{$self->local_objects_by_id}{@New_Ids};
386 die "Error serializing: $err";
390 sub _local_object_to_id {
391 my ($self, $object) = @_;
392 my $id = refaddr($object);
393 $self->local_objects_by_id->{$id} ||= do {
394 push our(@New_Ids), $id if @New_Ids;
401 my ($self, $data) = @_;
402 if (blessed($data)) {
404 $data->isa('Object::Remote::Proxy')
405 and $data->{remote}->connection == $self
407 return +{ __local_object__ => $data->{remote}->id };
409 return +{ __remote_object__ => $self->_local_object_to_id($data) };
411 } elsif (my $ref = ref($data)) {
412 if ($ref eq 'HASH') {
413 my $tied_to = tied(%$data);
414 if(defined($tied_to)) {
415 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
417 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
419 } elsif ($ref eq 'ARRAY') {
420 my $tied_to = tied(@$data);
421 if (defined($tied_to)) {
422 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
424 return [ map $self->_deobjectify($_), @$data ];
426 } elsif ($ref eq 'CODE') {
427 my $id = $self->_local_object_to_id(
428 Object::Remote::CodeContainer->new(code => $data)
430 return +{ __remote_code__ => $id };
431 } elsif ($ref eq 'SCALAR') {
432 return +{ __scalar_ref__ => $$data };
433 } elsif ($ref eq 'GLOB') {
434 return +{ __glob_ref__ => $self->_local_object_to_id(
435 Object::Remote::GlobContainer->new(handle => $data)
438 die "Can't collapse reftype $ref";
441 return $data; # plain scalar
445 my ($self, $flat) = @_;
446 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
447 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
448 or do { warn "Deserialize failed for ${flat}: $@"; return };
449 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
450 eval { $self->${\"receive_${type}"}(@rest); 1 }
451 or do { warn "Receive failed for ${flat}: $@"; return };
456 my ($self, $id) = @_;
457 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
458 delete $self->local_objects_by_id->{$id}
459 or warn "Free: no such object $id";
464 my ($self, $future_id, $id, @rest) = @_;
465 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
466 my $future = $self->_id_to_remote_object($future_id);
467 $future->{method} = 'call_discard_free';
468 my $local = $self->local_objects_by_id->{$id}
469 or do { $future->fail("No such object $id"); return };
470 $self->_invoke($future, $local, @rest);
473 sub receive_call_free {
474 my ($self, $future, $id, @rest) = @_;
475 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
476 $self->receive_call($future, $id, undef, @rest);
477 $self->receive_free($id);
481 my ($self, $future, $local, $ctx, $method, @args) = @_;
482 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
483 if ($method =~ /^start::/) {
484 my $f = $local->$method(@args);
485 $f->on_done(sub { undef($f); $future->done(@_) });
487 $f->on_fail(sub { undef($f); $future->fail(@_) });
490 my $do = sub { $local->$method(@args) };
494 ? ($ctx ? $do->() : scalar($do->()))
498 } or do { $future->fail($@); return; };
506 Object::Remote::Connection - An underlying connection for L<Object::Remote>
511 nice => '10', ulimit => '-v 400000',
512 watchdog_timeout => 120, stderr => \*STDERR,
515 my $local = Object::Remote->connect('-');
516 my $remote = Object::Remote->connect('myserver', nice => 5);
517 my $remote_user = Object::Remote->connect('user@myserver', %opts);
518 my $local_sudo = Object::Remote->connect('user@');
520 #$remote can be any other connection object
521 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
525 This is the class that supports connections to a Perl interpreter that is executed in a
526 different process. The new Perl interpreter can be either on the local or a remote machine
527 and is configurable via arguments passed to the constructor.
535 If this value is defined then it will be used as the nice value of the Perl process when it
536 is started. The default is the undefined value and will not nice the process.
540 If this value is defined then it will be used as the file handle that receives the output
541 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
542 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
543 directly to STDERR of the local process without the run loop managing I/O. The default value
546 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
547 across all of the Perl interpreters including the local one. For small-scale and quick operation
548 this offers a predictable and easy-to-use way to get at error messages generated anywhere. If
549 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
550 and it is possible to still receive output from them. This is generally a good thing but can
553 When using a file handle as the output for STDERR, once the local Perl interpreter is no longer
554 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
555 that it is no longer possible to receive error output from the remote interpreters and that the
556 shell will start to kill off the child processes. Passing a reference to STDERR for the local
557 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, yields one unified STDERR for
558 all Perl interpreters that ends as soon as the local interpreter process does, and means the shell will
559 start killing children when the local interpreter exits.
561 It is also possible to pass in a file handle that has been opened for writing. This would be
562 useful for logging the output of the remote interpreter directly into a dedicated file.
566 If this string is defined then it will be passed unmodified as the arguments to ulimit when
567 the Perl process is started. The default value is the undefined value and will not limit the
570 =item watchdog_timeout
572 If this value is defined then it will be used as the number of seconds the watchdog will wait
573 for an update before it terminates the Perl interpreter process. The default value is undefined
574 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
582 =item C<Object::Remote>