1 package Object::Remote::Connection;
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
21 BEGIN { router()->exclude_forwarding }
24 log_debug { "Killing all child processes in the process group" };
26 #FIXME update along with setpgrp() to not use a process
29 #send SIGINT to the process group for our children
# Identifier for this connection; it is what the log messages in this class
# print as the connection id. When the constructor is not given a value, the
# default hands out the current value of the package variable
# $NEXT_CONNECTION_ID and post-increments it.
has _id => (
  is       => 'ro',
  required => 1,
  default  => sub { our $NEXT_CONNECTION_ID++ },
);
36 is => 'ro', required => 1,
40 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
45 is => 'ro', required => 1,
49 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
51 $ch->on_line_call(sub { $self->_receive(@_) });
52 $ch->on_close_call(sub {
53 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
54 $self->on_close->done(@_);
60 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
62 log_trace { "Installing handlers into future via trigger" };
63 $_[0]->_install_future_handlers($_[1])
# PID of the child interpreter process. It may be left undefined; code in
# this class that uses it checks for definedness first.
has child_pid => (is => 'ro');

# Objects registered on this side of the connection, keyed by id
# (_local_object_to_id keys them by refaddr).
has local_objects_by_id => (
  is      => 'ro',
  default => sub { +{} },
  # shallow clone on the way in
  coerce  => sub { my ($given) = @_; +{ %$given } },
);

# Remote objects known to this connection, keyed by id; register_remote
# stores weakened references in this hash.
has remote_objects_by_id => (
  is      => 'ro',
  default => sub { +{} },
  # shallow clone on the way in
  coerce  => sub { my ($given) = @_; +{ %$given } },
);

# Futures created by send() that are not yet ready, keyed by the stringified
# future; _fail_outstanding walks this hash to fail each of them.
has outstanding_futures => (
  is      => 'ro',
  default => sub { +{} },
);
84 _deserialize => 'decode',
91 my $pid = $self->child_pid;
93 unless (defined $pid) {
94 log_trace { "After BUILD invoked for connection but there was no pid" };
98 log_trace { "Setting process group of child process '$pid'" };
100 #FIXME moving things into a process group has side effects for
101 #users of the library - move to a list
102 setpgrp($self->child_pid, 1);
109 my $valid = ! $self->on_close->is_ready;
119 "Connection '$id' is valid: '$text'"
125 sub _fail_outstanding {
126 my ($self, $error) = @_;
127 my $outstanding = $self->outstanding_futures;
# Pass $error as a %s argument rather than interpolating it into the format
# string, so a '%' inside the error text is not parsed as a conversion.
sprintf "Failing %i outstanding futures with '%s'", scalar(keys(%$outstanding)), $error
133 foreach(keys(%$outstanding)) {
134 log_trace { "Failing future for $_" };
135 my $future = $outstanding->{$_};
136 $future->fail("$error\n");
143 sub _install_future_handlers {
145 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
148 my $pid = $self->child_pid;
149 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
150 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
151 return unless defined $pid;
152 log_debug { "Waiting for child '$pid' to exit" };
153 my $ret = waitpid($pid, 0);
155 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
158 log_warn { "Remote interpreter did not exit cleanly" };
161 my $exit_value = $? >> 8;
162 "Remote Perl interpreter exited with value '$exit_value'"
169 sub _id_to_remote_object {
170 my ($self, $id) = @_;
171 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
172 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
174 $self->remote_objects_by_id->{$id}
175 or Object::Remote::Handle->new(connection => $self, id => $id)
180 weaken(my $self = shift);
181 JSON::PP->new->filter_json_single_key_object(
182 __remote_object__ => sub {
183 $self->_id_to_remote_object(@_);
185 )->filter_json_single_key_object(
186 __remote_code__ => sub {
187 my $code_container = $self->_id_to_remote_object(@_);
188 sub { $code_container->call(@_) };
190 )->filter_json_single_key_object(
191 __scalar_ref__ => sub {
195 )->filter_json_single_key_object(
196 __glob_ref__ => sub {
197 my $glob_container = $self->_id_to_remote_object(@_);
198 my $handle = Symbol::gensym;
199 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
202 )->filter_json_single_key_object(
203 __local_object__ => sub {
204 $self->local_objects_by_id->{$_[0]}
206 )->filter_json_single_key_object(
207 __remote_tied_hash__ => sub {
209 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
212 )->filter_json_single_key_object(
213 __remote_tied_array__ => sub {
215 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
221 sub _load_if_possible {
227 log_debug { "Attempt at loading '$class' failed with '$@'" };
233 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
234 map _load_if_possible($_), qw(
235 Object::Remote::Connector::Local
236 Object::Remote::Connector::LocalSudo
237 Object::Remote::Connector::SSH
238 Object::Remote::Connector::UNIX
243 my ($class, $spec, @args) = @_;
244 foreach my $poss (do { our @Guess }) {
245 if (my $conn = $poss->($spec, @args)) {
254 my ($class, $spec, @args) = @_;
255 return $spec if blessed $spec;
256 my $conn = $class->conn_from_spec($spec, @args);
258 die "Couldn't figure out what to do with ${spec}"
259 unless defined $conn;
261 return $conn->maybe::start::connect;
265 my ($self, @args) = @_;
266 Object::Remote::Handle->new(
267 connection => $self, @args
272 my ($self, $to) = @_;
273 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
275 $self->send_class_call(0, 'Object::Remote', connect => $to)
280 my ($self, $sub) = @_;
281 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
282 Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
283 return await_future($self->send_class_call(0, $pkg, can => $name));
# Send a call addressed to the 'class_call_handler' object on the other side.
#
#   $ctx  - context value forwarded as the first call argument
#   @call - remaining call arguments (the callers in this file pass a class
#           name, a method name and that method's arguments)
#
# Returns whatever $self->send returns.
sub send_class_call {
  my $self = shift;
  my $ctx  = shift;
  my @call = @_;

  Dlog_trace { "Sending a class call for connection $_" } $self->_id;

  return $self->send('call', 'class_call_handler', $ctx, 'call', @call);
}
292 sub register_class_call_handler {
294 $self->local_objects_by_id->{'class_call_handler'} ||= do {
295 my $o = $self->new_class_call_handler;
296 $self->_local_object_to_id($o);
301 sub new_class_call_handler {
302 Object::Remote::CodeContainer->new(
304 my ($class, $method) = (shift, shift);
305 use_module($class)->$method(@_);
310 sub register_remote {
311 my ($self, $remote) = @_;
312 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
313 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
318 my ($self, $id) = @_;
319 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
320 #TODO this shows up sometimes when a remote side dies in the middle of a remote
321 #method invocation - possibly only when the object is being constructed?
322 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
323 delete $self->remote_objects_by_id->{$id};
324 $self->_send([ free => $id ]);
328 my ($self, $type, @call) = @_;
330 my $future = CPS::Future->new;
331 my $remote = $self->remote_objects_by_id->{$call[0]};
333 unshift @call, $type => $self->_local_object_to_id($future);
335 my $outstanding = $self->outstanding_futures;
336 $outstanding->{$future} = $future;
337 $future->on_ready(sub {
339 delete $outstanding->{$future}
342 $self->_send(\@call);
348 my ($self, $type, @call) = @_;
350 unshift @call, $type => 'NULL';
352 $self->_send(\@call);
356 my ($self, $to_send) = @_;
357 my $fh = $self->send_to_fh;
359 unless ($self->is_valid) {
360 croak "Attempt to invoke _send on a connection that is not valid";
363 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
364 my $serialized = $self->_serialize($to_send)."\n";
365 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
368 #TODO this should be converted over to a non-blocking ::WriteChannel class
369 die "filehandle is not open" unless openhandle($fh);
370 log_trace { "file handle has passed openhandle() test; printing to it" };
371 $ret = print $fh $serialized;
372 die "print was not successful: $!" unless defined $ret
376 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
379 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
387 my ($self, $data) = @_;
388 local our @New_Ids = (-1);
390 my $flat = $self->_encode($self->_deobjectify($data));
393 my $err = $@; # won't get here if the eval doesn't die
394 # don't keep refs to new things
395 delete @{$self->local_objects_by_id}{@New_Ids};
396 die "Error serializing: $err";
400 sub _local_object_to_id {
401 my ($self, $object) = @_;
402 my $id = refaddr($object);
403 $self->local_objects_by_id->{$id} ||= do {
404 push our(@New_Ids), $id if @New_Ids;
411 my ($self, $data) = @_;
412 if (blessed($data)) {
414 $data->isa('Object::Remote::Proxy')
415 and $data->{remote}->connection == $self
417 return +{ __local_object__ => $data->{remote}->id };
419 return +{ __remote_object__ => $self->_local_object_to_id($data) };
421 } elsif (my $ref = ref($data)) {
422 if ($ref eq 'HASH') {
423 my $tied_to = tied(%$data);
424 if(defined($tied_to)) {
425 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
427 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
429 } elsif ($ref eq 'ARRAY') {
430 my $tied_to = tied(@$data);
431 if (defined($tied_to)) {
432 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
434 return [ map $self->_deobjectify($_), @$data ];
436 } elsif ($ref eq 'CODE') {
437 my $id = $self->_local_object_to_id(
438 Object::Remote::CodeContainer->new(code => $data)
440 return +{ __remote_code__ => $id };
441 } elsif ($ref eq 'SCALAR') {
442 return +{ __scalar_ref__ => $$data };
443 } elsif ($ref eq 'GLOB') {
444 return +{ __glob_ref__ => $self->_local_object_to_id(
445 Object::Remote::GlobContainer->new(handle => $data)
448 die "Can't collapse reftype $ref";
451 return $data; # plain scalar
455 my ($self, $flat) = @_;
456 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
457 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
458 or do { warn "Deserialize failed for ${flat}: $@"; return };
459 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
460 eval { $self->${\"receive_${type}"}(@rest); 1 }
461 or do { warn "Receive failed for ${flat}: $@"; return };
466 my ($self, $id) = @_;
467 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
468 delete $self->local_objects_by_id->{$id}
469 or warn "Free: no such object $id";
474 my ($self, $future_id, $id, @rest) = @_;
475 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
476 my $future = $self->_id_to_remote_object($future_id);
477 $future->{method} = 'call_discard_free';
478 my $local = $self->local_objects_by_id->{$id}
479 or do { $future->fail("No such object $id"); return };
480 $self->_invoke($future, $local, @rest);
# Handle an incoming 'call_free' message: run the call against the local
# object and then release that object from this connection.
#
#   $future - id of the future that receives the call's result
#   $id     - id of the local object to call and afterwards free
#   @rest   - method name followed by its arguments
#
# Returns whatever receive_free returns.
sub receive_call_free {
  my $self = shift;
  my ($future, $id, @rest) = @_;

  Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;

  # The undef takes the position that receive_call passes on to _invoke as
  # its $ctx argument.
  $self->receive_call($future, $id, undef, @rest);

  return $self->receive_free($id);
}
491 my ($self, $future, $local, $ctx, $method, @args) = @_;
492 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
493 if ($method =~ /^start::/) {
494 my $f = $local->$method(@args);
495 $f->on_done(sub { undef($f); $future->done(@_) });
497 $f->on_fail(sub { undef($f); $future->fail(@_) });
500 my $do = sub { $local->$method(@args) };
504 ? ($ctx ? $do->() : scalar($do->()))
508 } or do { $future->fail($@); return; };
516 Object::Remote::Connection - An underlying connection for L<Object::Remote>
521 nice => '10', ulimit => '-v 400000',
522 watchdog_timeout => 120, stderr => \*STDERR,
525 my $local = Object::Remote->connect('-');
526 my $remote = Object::Remote->connect('myserver', nice => 5);
527 my $remote_user = Object::Remote->connect('user@myserver', %opts);
528 my $local_sudo = Object::Remote->connect('user@');
530 #$remote can be any other connection object
531 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
535 This is the class that supports connections to a Perl interpreter that is executed in a
536 different process. The new Perl interpreter can be either on the local or a remote machine
537 and is configurable via arguments passed to the constructor.
545 If this value is defined then it will be used as the nice value of the Perl process when it
546 is started. The default is the undefined value and will not nice the process.
550 If this value is defined then it will be used as the file handle that receives the output
551 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
552 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
553 directly to STDERR of the local process without the run loop managing I/O. The default value
556 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
557 across all of the Perl interpreters including the local one. For small scale and quick operation
558 this offers a predictable and easy to use way to get at error messages generated anywhere. If
559 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
560 and it is possible to still receive output from them. This is generally a good thing but can
563 When using a file handle as the output for STDERR, once the local Perl interpreter is no longer
564 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
565 that it is no longer possible to receive error output from the remote interpreters and that the
566 shell will start to kill off the child processes. Passing a reference to STDERR for the local
567 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, gives one unified STDERR for
568 all Perl interpreters that ends as soon as the local interpreter process does, and makes the shell
569 start killing children when the local interpreter exits.
571 It is also possible to pass in a file handle that has been opened for writing. This would be
572 useful for logging the output of the remote interpreter directly into a dedicated file.
576 If this string is defined then it will be passed unmodified as the arguments to ulimit when
577 the Perl process is started. The default value is the undefined value and will not limit the
580 =item watchdog_timeout
582 If this value is defined then it will be used as the number of seconds the watchdog will wait
583 for an update before it terminates the Perl interpreter process. The default value is undefined
584 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
592 =item C<Object::Remote>