1 package Object::Remote::Connection;
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
21 BEGIN { router()->exclude_forwarding }
26 log_trace { "END handler is being invoked in " . __PACKAGE__ };
28 foreach(keys(%child_pids)) {
29 log_debug { "Killing child process '$_'" };
34 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
37 is => 'ro', required => 1,
41 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
46 is => 'ro', required => 1,
50 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
52 $ch->on_line_call(sub { $self->_receive(@_) });
53 $ch->on_close_call(sub {
54 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
55 $self->on_close->done(@_);
61 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
63 log_trace { "Installing handlers into future via trigger" };
64 $_[0]->_install_future_handlers($_[1])
68 has child_pid => (is => 'ro');
70 has local_objects_by_id => (
71 is => 'ro', default => sub { {} },
72 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
75 has remote_objects_by_id => (
76 is => 'ro', default => sub { {} },
77 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
80 has outstanding_futures => (is => 'ro', default => sub { {} });
85 _deserialize => 'decode',
92 my $pid = $self->child_pid;
94 return unless defined $pid;
95 $child_pids{$pid} = 1;
103 my $valid = ! $self->on_close->is_ready;
113 "Connection '$id' is valid: '$text'"
119 sub _fail_outstanding {
120 my ($self, $error) = @_;
121 my $outstanding = $self->outstanding_futures;
124 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
127 foreach(keys(%$outstanding)) {
128 log_trace { "Failing future for $_" };
129 my $future = $outstanding->{$_};
130 $future->fail("$error\n");
137 sub _install_future_handlers {
140 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
143 my $pid = $self->child_pid;
144 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
145 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
146 return unless defined $pid;
147 log_debug { "Waiting for child '$pid' to exit" };
148 my $ret = waitpid($pid, 0);
150 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
153 log_warn { "Remote interpreter did not exit cleanly" };
156 my $exit_value = $? >> 8;
157 "Remote Perl interpreter exited with value '$exit_value'"
161 delete $child_pids{$pid};
166 sub _id_to_remote_object {
167 my ($self, $id) = @_;
168 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
169 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
171 $self->remote_objects_by_id->{$id}
172 or Object::Remote::Handle->new(connection => $self, id => $id)
177 weaken(my $self = shift);
178 JSON::PP->new->filter_json_single_key_object(
179 __remote_object__ => sub {
180 $self->_id_to_remote_object(@_);
182 )->filter_json_single_key_object(
183 __remote_code__ => sub {
184 my $code_container = $self->_id_to_remote_object(@_);
185 sub { $code_container->call(@_) };
187 )->filter_json_single_key_object(
188 __scalar_ref__ => sub {
192 )->filter_json_single_key_object(
193 __glob_ref__ => sub {
194 my $glob_container = $self->_id_to_remote_object(@_);
195 my $handle = Symbol::gensym;
196 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
199 )->filter_json_single_key_object(
200 __local_object__ => sub {
201 $self->local_objects_by_id->{$_[0]}
203 )->filter_json_single_key_object(
204 __remote_tied_hash__ => sub {
206 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
209 )->filter_json_single_key_object(
210 __remote_tied_array__ => sub {
212 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
218 sub _load_if_possible {
224 log_debug { "Attempt at loading '$class' failed with '$@'" };
230 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
231 map _load_if_possible($_), qw(
232 Object::Remote::Connector::Local
233 Object::Remote::Connector::LocalSudo
234 Object::Remote::Connector::SSH
235 Object::Remote::Connector::UNIX
240 my ($class, $spec, @args) = @_;
241 foreach my $poss (do { our @Guess }) {
242 if (my $conn = $poss->($spec, @args)) {
251 my ($class, $spec, @args) = @_;
252 return $spec if blessed $spec;
253 my $conn = $class->conn_from_spec($spec, @args);
255 die "Couldn't figure out what to do with ${spec}"
256 unless defined $conn;
258 return $conn->maybe::start::connect;
262 my ($self, @args) = @_;
263 Object::Remote::Handle->new(
264 connection => $self, @args
269 my ($self, $to) = @_;
270 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
272 $self->send_class_call(0, 'Object::Remote', connect => $to)
277 my ($self, $sub) = @_;
278 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
279 Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
280 return await_future($self->send_class_call(0, $pkg, can => $name));
283 sub send_class_call {
284 my ($self, $ctx, @call) = @_;
285 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
286 $self->send(call => class_call_handler => $ctx => call => @call);
289 sub register_class_call_handler {
291 $self->local_objects_by_id->{'class_call_handler'} ||= do {
292 my $o = $self->new_class_call_handler;
293 $self->_local_object_to_id($o);
298 sub new_class_call_handler {
299 Object::Remote::CodeContainer->new(
301 my ($class, $method) = (shift, shift);
302 use_module($class)->$method(@_);
307 sub register_remote {
308 my ($self, $remote) = @_;
309 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
310 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
315 my ($self, $id) = @_;
316 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
317 #TODO this shows up some times when a remote side dies in the middle of a remote
318 #method invocation - possibly only when the object is being constructed?
319 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
320 delete $self->remote_objects_by_id->{$id};
321 $self->_send([ free => $id ]);
325 my ($self, $type, @call) = @_;
327 my $future = CPS::Future->new;
328 my $remote = $self->remote_objects_by_id->{$call[0]};
330 unshift @call, $type => $self->_local_object_to_id($future);
332 my $outstanding = $self->outstanding_futures;
333 $outstanding->{$future} = $future;
334 $future->on_ready(sub {
336 delete $outstanding->{$future}
339 $self->_send(\@call);
345 my ($self, $type, @call) = @_;
347 unshift @call, $type => 'NULL';
349 $self->_send(\@call);
353 my ($self, $to_send) = @_;
354 my $fh = $self->send_to_fh;
356 unless ($self->is_valid) {
357 croak "Attempt to invoke _send on a connection that is not valid";
360 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
361 my $serialized = $self->_serialize($to_send)."\n";
362 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
365 #TODO this should be converted over to a non-blocking ::WriteChannel class
366 die "filehandle is not open" unless openhandle($fh);
367 log_trace { "file handle has passed openhandle() test; printing to it" };
368 $ret = print $fh $serialized;
369 die "print was not successful: $!" unless defined $ret
373 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
376 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
384 my ($self, $data) = @_;
385 local our @New_Ids = (-1);
387 my $flat = $self->_encode($self->_deobjectify($data));
390 my $err = $@; # won't get here if the eval doesn't die
391 # don't keep refs to new things
392 delete @{$self->local_objects_by_id}{@New_Ids};
393 die "Error serializing: $err";
397 sub _local_object_to_id {
398 my ($self, $object) = @_;
399 my $id = refaddr($object);
400 $self->local_objects_by_id->{$id} ||= do {
401 push our(@New_Ids), $id if @New_Ids;
408 my ($self, $data) = @_;
409 if (blessed($data)) {
411 $data->isa('Object::Remote::Proxy')
412 and $data->{remote}->connection == $self
414 return +{ __local_object__ => $data->{remote}->id };
416 return +{ __remote_object__ => $self->_local_object_to_id($data) };
418 } elsif (my $ref = ref($data)) {
419 if ($ref eq 'HASH') {
420 my $tied_to = tied(%$data);
421 if(defined($tied_to)) {
422 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
424 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
426 } elsif ($ref eq 'ARRAY') {
427 my $tied_to = tied(@$data);
428 if (defined($tied_to)) {
429 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
431 return [ map $self->_deobjectify($_), @$data ];
433 } elsif ($ref eq 'CODE') {
434 my $id = $self->_local_object_to_id(
435 Object::Remote::CodeContainer->new(code => $data)
437 return +{ __remote_code__ => $id };
438 } elsif ($ref eq 'SCALAR') {
439 return +{ __scalar_ref__ => $$data };
440 } elsif ($ref eq 'GLOB') {
441 return +{ __glob_ref__ => $self->_local_object_to_id(
442 Object::Remote::GlobContainer->new(handle => $data)
445 die "Can't collapse reftype $ref";
448 return $data; # plain scalar
452 my ($self, $flat) = @_;
453 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
454 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
455 or do { warn "Deserialize failed for ${flat}: $@"; return };
456 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
457 eval { $self->${\"receive_${type}"}(@rest); 1 }
458 or do { warn "Receive failed for ${flat}: $@"; return };
463 my ($self, $id) = @_;
464 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
465 delete $self->local_objects_by_id->{$id}
466 or warn "Free: no such object $id";
471 my ($self, $future_id, $id, @rest) = @_;
472 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
473 my $future = $self->_id_to_remote_object($future_id);
474 $future->{method} = 'call_discard_free';
475 my $local = $self->local_objects_by_id->{$id}
476 or do { $future->fail("No such object $id"); return };
477 $self->_invoke($future, $local, @rest);
480 sub receive_call_free {
481 my ($self, $future, $id, @rest) = @_;
482 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
483 $self->receive_call($future, $id, undef, @rest);
484 $self->receive_free($id);
488 my ($self, $future, $local, $ctx, $method, @args) = @_;
489 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
490 if ($method =~ /^start::/) {
491 my $f = $local->$method(@args);
492 $f->on_done(sub { undef($f); $future->done(@_) });
494 $f->on_fail(sub { undef($f); $future->fail(@_) });
497 my $do = sub { $local->$method(@args) };
501 ? ($ctx ? $do->() : scalar($do->()))
505 } or do { $future->fail($@); return; };
513 Object::Remote::Connection - An underlying connection for L<Object::Remote>
518 nice => '10', ulimit => '-v 400000',
519 watchdog_timeout => 120, stderr => \*STDERR,
522 my $local = Object::Remote->connect('-');
523 my $remote = Object::Remote->connect('myserver', nice => 5);
524 my $remote_user = Object::Remote->connect('user@myserver', %opts);
525 my $local_sudo = Object::Remote->connect('user@');
527 #$remote can be any other connection object
528 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
532 This is the class that supports connections to a Perl interpreter that is executed in a
533 different process. The new Perl interpreter can be either on the local or a remote machine
534 and is configurable via arguments passed to the constructor.
542 If this value is defined then it will be used as the nice value of the Perl process when it
543 is started. The default is the undefined value and will not nice the process.
547 If this value is defined then it will be used as the file handle that receives the output
548 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
549 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
550 directly to STDERR of the local process with out the run loop managing I/O. The default value
553 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
554 across all of the Perl interpreters including the local one. For small scale and quick operation
555 this offers a predictable and easy to use way to get at error messages generated anywhere. If
556 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
557 and it is possible to still receive output from them. This is generally a good thing but can
560 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
561 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
562 that it is no longer possible to receive error output from the remote interpreters and that the
563 shell will start to kill off the child processes. Passing a reference to STDERR for the local
564 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
565 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
566 start killing children when the local interpreter exits.
568 It is also possible to pass in a file handle that has been opened for writing. This would be
569 useful for logging the output of the remote interpreter directly into a dedicated file.
573 If this string is defined then it will be passed unmodified as the arguments to ulimit when
574 the Perl process is started. The default value is the undefined value and will not limit the
577 =item watchdog_timeout
579 If this value is defined then it will be used as the number of seconds the watchdog will wait
580 for an update before it terminates the Perl interpreter process. The default value is undefined
581 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
589 =item C<Object::Remote>