1 package Object::Remote::Connection;
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
21 BEGIN { router()->exclude_forwarding }
24 log_debug { "Killing all child processes in the process group" };
26 #send SIGINT to the process group for our children
30 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
33 is => 'ro', required => 1,
37 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
42 is => 'ro', required => 1,
46 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
48 $ch->on_line_call(sub { $self->_receive(@_) });
49 $ch->on_close_call(sub {
50 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
51 $self->on_close->done(@_);
57 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
59 log_trace { "Installing handlers into future via trigger" };
60 $_[0]->_install_future_handlers($_[1])
64 has child_pid => (is => 'ro');
66 has local_objects_by_id => (
67 is => 'ro', default => sub { {} },
68 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
71 has remote_objects_by_id => (
72 is => 'ro', default => sub { {} },
73 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
76 has outstanding_futures => (is => 'ro', default => sub { {} });
81 _deserialize => 'decode',
88 my $pid = $self->child_pid;
90 unless (defined $pid) {
91 log_trace { "After BUILD invoked for connection but there was no pid" };
95 log_trace { "Setting process group of child process '$pid'" };
97 setpgrp($self->child_pid, 1);
104 my $closed = $self->on_close->is_ready;
106 log_trace { "Connection closed: $closed" };
110 sub _fail_outstanding {
111 my ($self, $error) = @_;
112 my $outstanding = $self->outstanding_futures;
115 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
118 foreach(keys(%$outstanding)) {
119 log_trace { "Failing future for $_" };
120 my $future = $outstanding->{$_};
121 $future->fail("$error\n");
128 sub _install_future_handlers {
130 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
133 my $pid = $self->child_pid;
134 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
135 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
136 return unless defined $pid;
137 log_debug { "Waiting for child '$pid' to exit" };
138 my $ret = waitpid($pid, 0);
140 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
143 log_warn { "Remote interpreter did not exit cleanly" };
146 my $exit_value = $? >> 8;
147 "Remote Perl interpreter exited with value '$exit_value'"
154 sub _id_to_remote_object {
155 my ($self, $id) = @_;
156 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
157 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
159 $self->remote_objects_by_id->{$id}
160 or Object::Remote::Handle->new(connection => $self, id => $id)
165 weaken(my $self = shift);
166 JSON::PP->new->filter_json_single_key_object(
167 __remote_object__ => sub {
168 $self->_id_to_remote_object(@_);
170 )->filter_json_single_key_object(
171 __remote_code__ => sub {
172 my $code_container = $self->_id_to_remote_object(@_);
173 sub { $code_container->call(@_) };
175 )->filter_json_single_key_object(
176 __scalar_ref__ => sub {
180 )->filter_json_single_key_object(
181 __glob_ref__ => sub {
182 my $glob_container = $self->_id_to_remote_object(@_);
183 my $handle = Symbol::gensym;
184 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
187 )->filter_json_single_key_object(
188 __local_object__ => sub {
189 $self->local_objects_by_id->{$_[0]}
191 )->filter_json_single_key_object(
192 __remote_tied_hash__ => sub {
194 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
197 )->filter_json_single_key_object(
198 __remote_tied_array__ => sub {
200 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
206 sub _load_if_possible {
212 log_debug { "Attempt at loading '$class' failed with '$@'" };
218 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
219 map _load_if_possible($_), qw(
220 Object::Remote::Connector::Local
221 Object::Remote::Connector::LocalSudo
222 Object::Remote::Connector::SSH
223 Object::Remote::Connector::UNIX
228 my ($class, $spec, @args) = @_;
229 foreach my $poss (do { our @Guess }) {
230 if (my $conn = $poss->($spec, @args)) {
239 my ($class, $spec, @args) = @_;
240 return $spec if blessed $spec;
241 my $conn = $class->conn_from_spec($spec, @args);
243 die "Couldn't figure out what to do with ${spec}"
244 unless defined $conn;
246 return $conn->maybe::start::connect;
250 my ($self, @args) = @_;
251 Object::Remote::Handle->new(
252 connection => $self, @args
257 my ($self, $to) = @_;
258 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
260 $self->send_class_call(0, 'Object::Remote', connect => $to)
265 my ($self, $sub) = @_;
266 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
267 Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;
268 return await_future($self->send_class_call(0, $pkg, can => $name));
271 sub send_class_call {
272 my ($self, $ctx, @call) = @_;
273 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
274 $self->send(call => class_call_handler => $ctx => call => @call);
277 sub register_class_call_handler {
279 $self->local_objects_by_id->{'class_call_handler'} ||= do {
280 my $o = $self->new_class_call_handler;
281 $self->_local_object_to_id($o);
286 sub new_class_call_handler {
287 Object::Remote::CodeContainer->new(
289 my ($class, $method) = (shift, shift);
290 use_module($class)->$method(@_);
295 sub register_remote {
296 my ($self, $remote) = @_;
297 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
298 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
303 my ($self, $id) = @_;
304 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
305 #TODO this shows up some times when a remote side dies in the middle of a remote
306 #method invocation - possibly only when the object is being constructed?
307 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
308 delete $self->remote_objects_by_id->{$id};
309 $self->_send([ free => $id ]);
313 my ($self, $type, @call) = @_;
315 my $future = CPS::Future->new;
316 my $remote = $self->remote_objects_by_id->{$call[0]};
318 unshift @call, $type => $self->_local_object_to_id($future);
320 my $outstanding = $self->outstanding_futures;
321 $outstanding->{$future} = $future;
322 $future->on_ready(sub {
324 delete $outstanding->{$future}
327 $self->_send(\@call);
333 my ($self, $type, @call) = @_;
335 unshift @call, $type => 'NULL';
337 $self->_send(\@call);
341 my ($self, $to_send) = @_;
342 my $fh = $self->send_to_fh;
344 unless ($self->is_valid) {
345 croak "Attempt to invoke _send on a connection that is not valid";
348 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
349 my $serialized = $self->_serialize($to_send)."\n";
350 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
353 #TODO this should be converted over to a non-blocking ::WriteChannel class
354 die "filehandle is not open" unless openhandle($fh);
355 log_trace { "file handle has passed openhandle() test; printing to it" };
356 $ret = print $fh $serialized;
357 die "print was not successful: $!" unless defined $ret
361 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
364 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
372 my ($self, $data) = @_;
373 local our @New_Ids = (-1);
375 my $flat = $self->_encode($self->_deobjectify($data));
378 my $err = $@; # won't get here if the eval doesn't die
379 # don't keep refs to new things
380 delete @{$self->local_objects_by_id}{@New_Ids};
381 die "Error serializing: $err";
385 sub _local_object_to_id {
386 my ($self, $object) = @_;
387 my $id = refaddr($object);
388 $self->local_objects_by_id->{$id} ||= do {
389 push our(@New_Ids), $id if @New_Ids;
396 my ($self, $data) = @_;
397 if (blessed($data)) {
399 $data->isa('Object::Remote::Proxy')
400 and $data->{remote}->connection == $self
402 return +{ __local_object__ => $data->{remote}->id };
404 return +{ __remote_object__ => $self->_local_object_to_id($data) };
406 } elsif (my $ref = ref($data)) {
407 if ($ref eq 'HASH') {
408 my $tied_to = tied(%$data);
409 if(defined($tied_to)) {
410 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
412 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
414 } elsif ($ref eq 'ARRAY') {
415 my $tied_to = tied(@$data);
416 if (defined($tied_to)) {
417 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
419 return [ map $self->_deobjectify($_), @$data ];
421 } elsif ($ref eq 'CODE') {
422 my $id = $self->_local_object_to_id(
423 Object::Remote::CodeContainer->new(code => $data)
425 return +{ __remote_code__ => $id };
426 } elsif ($ref eq 'SCALAR') {
427 return +{ __scalar_ref__ => $$data };
428 } elsif ($ref eq 'GLOB') {
429 return +{ __glob_ref__ => $self->_local_object_to_id(
430 Object::Remote::GlobContainer->new(handle => $data)
433 die "Can't collapse reftype $ref";
436 return $data; # plain scalar
440 my ($self, $flat) = @_;
441 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
442 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
443 or do { warn "Deserialize failed for ${flat}: $@"; return };
444 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
445 eval { $self->${\"receive_${type}"}(@rest); 1 }
446 or do { warn "Receive failed for ${flat}: $@"; return };
451 my ($self, $id) = @_;
452 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
453 delete $self->local_objects_by_id->{$id}
454 or warn "Free: no such object $id";
459 my ($self, $future_id, $id, @rest) = @_;
460 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
461 my $future = $self->_id_to_remote_object($future_id);
462 $future->{method} = 'call_discard_free';
463 my $local = $self->local_objects_by_id->{$id}
464 or do { $future->fail("No such object $id"); return };
465 $self->_invoke($future, $local, @rest);
468 sub receive_call_free {
469 my ($self, $future, $id, @rest) = @_;
470 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
471 $self->receive_call($future, $id, undef, @rest);
472 $self->receive_free($id);
476 my ($self, $future, $local, $ctx, $method, @args) = @_;
477 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
478 if ($method =~ /^start::/) {
479 my $f = $local->$method(@args);
480 $f->on_done(sub { undef($f); $future->done(@_) });
482 $f->on_fail(sub { undef($f); $future->fail(@_) });
485 my $do = sub { $local->$method(@args) };
489 ? ($ctx ? $do->() : scalar($do->()))
493 } or do { $future->fail($@); return; };
501 Object::Remote::Connection - An underlying connection for L<Object::Remote>
506 nice => '10', ulimit => '-v 400000',
507 watchdog_timeout => 120, stderr => \*STDERR,
510 my $local = Object::Remote->connect('-');
511 my $remote = Object::Remote->connect('myserver', nice => 5);
512 my $remote_user = Object::Remote->connect('user@myserver', %opts);
513 my $local_sudo = Object::Remote->connect('user@');
515 #$remote can be any other connection object
516 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
520 This is the class that supports connections to a Perl interpreter that is executed in a
521 different process. The new Perl interpreter can be either on the local or a remote machine
522 and is configurable via arguments passed to the constructor.
530 If this value is defined then it will be used as the nice value of the Perl process when it
531 is started. The default is the undefined value and will not nice the process.
535 If this value is defined then it will be used as the file handle that receives the output
536 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
537 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
538 directly to STDERR of the local process with out the run loop managing I/O. The default value
541 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
542 across all of the Perl interpreters including the local one. For small scale and quick operation
543 this offers a predictable and easy to use way to get at error messages generated anywhere. If
544 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
545 and it is possible to still receive output from them. This is generally a good thing but can
548 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
549 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
550 that it is no longer possible to receive error output from the remote interpreters and that the
551 shell will start to kill off the child processes. Passing a reference to STDERR for the local
552 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
553 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
554 start killing children when the local interpreter exits.
556 It is also possible to pass in a file handle that has been opened for writing. This would be
557 useful for logging the output of the remote interpreter directly into a dedicated file.
561 If this string is defined then it will be passed unmodified as the arguments to ulimit when
562 the Perl process is started. The default value is the undefined value and will not limit the
565 =item watchdog_timeout
567 If this value is defined then it will be used as the number of seconds the watchdog will wait
568 for an update before it terminates the Perl interpreter process. The default value is undefined
569 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
577 =item C<Object::Remote>