1 package Object::Remote::Connection;
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
22 BEGIN { router()->exclude_forwarding }
27 log_trace { "END handler is being invoked in " . __PACKAGE__ };
29 foreach(keys(%child_pids)) {
30 log_debug { "Killing child process '$_'" };
# Connection identifier (read-only). When the caller supplies no value, the
# default takes the current value of the package variable
# $NEXT_CONNECTION_ID and post-increments it, so successive defaulted
# connections in one process receive increasing ids.
35 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
38 is => 'ro', required => 1,
42 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
47 is => 'ro', required => 1,
51 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
53 $ch->on_line_call(sub { $self->_receive(@_) });
54 $ch->on_close_call(sub {
55 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
56 $self->on_close->done(@_);
62 is => 'rw', default => sub { $_[0]->_install_future_handlers(Future->new) },
64 log_trace { "Installing handlers into future via trigger" };
65 $_[0]->_install_future_handlers($_[1])
# PID of a child process associated with this connection, if any. Elsewhere
# in this file it is recorded in %child_pids and passed to waitpid().
69 has child_pid => (is => 'ro');
# Hash of local objects keyed by id (ids are handed out by
# _local_object_to_id; 'class_call_handler' is a fixed extra key).
# Defaults to an empty hash; a caller-supplied hash is copied one level deep.
71 has local_objects_by_id => (
72 is => 'ro', default => sub { {} },
73 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
# Hash of remote object handles keyed by remote id (filled by
# register_remote, which weakens each stored entry).
# Defaults to an empty hash; a caller-supplied hash is copied one level deep.
76 has remote_objects_by_id => (
77 is => 'ro', default => sub { {} },
78 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
# Futures that have been sent but not yet resolved; _fail_outstanding walks
# this hash to fail every pending future.
81 has outstanding_futures => (is => 'ro', default => sub { {} });
86 _deserialize => 'decode',
93 my $pid = $self->child_pid;
95 return unless defined $pid;
96 $child_pids{$pid} = 1;
104 my $valid = ! $self->on_close->is_ready;
114 "Connection '$id' is valid: '$text'"
# Fail every future currently held in outstanding_futures.
#
# Arguments:
#   $error - message describing why the futures are being failed; each
#            future is failed with "$error\n" (the trailing newline keeps
#            Perl from appending file/line information if the message is
#            later passed to die).
120 sub _fail_outstanding {
121 my ($self, $error) = @_;
122 my $outstanding = $self->outstanding_futures;
# NOTE(review): $error is interpolated into the sprintf *format* string, so a
# literal '%' inside the error text would be parsed as a conversion. Passing
# it as a '%s' argument instead would avoid that -- confirm and fix.
125 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
# Iterate over a snapshot of the keys; $_ is the hash key of each future.
128 foreach(keys(%$outstanding)) {
129 log_trace { "Failing future for $_" };
130 my $future = $outstanding->{$_};
131 $future->fail("$error\n");
# Attach connection-teardown handling to a future (used for on_close, whose
# default and trigger both call this method).
#
# The visible steps, in order: fail all outstanding futures with a
# "connection lost" message built from the first value of the future's
# result, then -- only if this connection has a child_pid -- wait for that
# child to exit, log how it exited, and remove it from %child_pids.
# NOTE(review): the log text suggests these steps run inside an on_done
# callback; the line that registers the callback and defines $f is not
# visible in this excerpt -- confirm against the full file.
138 sub _install_future_handlers {
141 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
144 my $pid = $self->child_pid;
145 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
146 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
# Connections without a child process have nothing to reap.
147 return unless defined $pid;
148 log_debug { "Waiting for child '$pid' to exit" };
# Flags of 0 make this a blocking wait for the child.
149 my $ret = waitpid($pid, 0);
151 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
154 log_warn { "Remote interpreter did not exit cleanly" };
# The exit value of the child is stored in the high byte of $?.
157 my $exit_value = $? >> 8;
158 "Remote Perl interpreter exited with value '$exit_value'"
# Stop tracking the pid so the END handler no longer tries to kill it.
162 delete $child_pids{$pid};
# Map a remote object id to a local stand-in for that object.
#
# Arguments:
#   $id - remote object id, or the literal string 'NULL'.
# Returns:
#   - for 'NULL': a fresh empty hash blessed into Object::Remote::Null;
#   - otherwise the entry already registered in remote_objects_by_id, or,
#     when there is none, a new Object::Remote::Handle bound to this
#     connection and id.
# NOTE(review): any wrapping applied to the handle before it is returned is
# not visible in this excerpt.
167 sub _id_to_remote_object {
168 my ($self, $id) = @_;
169 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
170 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
172 $self->remote_objects_by_id->{$id}
173 or Object::Remote::Handle->new(connection => $self, id => $id)
178 weaken(my $self = shift);
179 JSON::PP->new->filter_json_single_key_object(
180 __remote_object__ => sub {
181 $self->_id_to_remote_object(@_);
183 )->filter_json_single_key_object(
184 __remote_code__ => sub {
185 my $code_container = $self->_id_to_remote_object(@_);
186 sub { $code_container->call(@_) };
188 )->filter_json_single_key_object(
189 __scalar_ref__ => sub {
193 )->filter_json_single_key_object(
194 __glob_ref__ => sub {
195 my $glob_container = $self->_id_to_remote_object(@_);
196 my $handle = Symbol::gensym;
197 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
200 )->filter_json_single_key_object(
201 __local_object__ => sub {
202 $self->local_objects_by_id->{$_[0]}
204 )->filter_json_single_key_object(
205 __remote_tied_hash__ => sub {
207 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
210 )->filter_json_single_key_object(
211 __remote_tied_array__ => sub {
213 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
# Try to load a class by name. Only the failure path is visible in this
# excerpt: when loading fails, the error held in $@ is logged at debug level.
# Called below for each of the bundled Object::Remote::Connector::* classes.
# NOTE(review): the name suggests load failures are tolerated rather than
# fatal; the loading code and return value are not visible here -- confirm.
219 sub _load_if_possible {
225 log_debug { "Attempt at loading '$class' failed with '$@'" };
231 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
232 map _load_if_possible($_), qw(
233 Object::Remote::Connector::Local
234 Object::Remote::Connector::LocalSudo
235 Object::Remote::Connector::SSH
236 Object::Remote::Connector::UNIX
237 Object::Remote::Connector::INET
242 my ($class, $spec, @args) = @_;
243 foreach my $poss (do { our @Guess }) {
244 if (my $conn = $poss->($spec, @args)) {
253 my ($class, $spec, @args) = @_;
254 return $spec if blessed $spec;
255 my $conn = $class->conn_from_spec($spec, @args);
257 die "Couldn't figure out what to do with ${spec}"
258 unless defined $conn;
260 return $conn->maybe::start::connect;
264 my ($self, @args) = @_;
265 Object::Remote::Handle->new(
266 connection => $self, @args
271 my ($self, $to) = @_;
272 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
274 $self->send_class_call(0, 'Object::Remote', connect => $to)
279 my ($self, $sub) = @_;
280 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
281 Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
282 return await_future($self->send_class_call(0, $pkg, can => $name));
# Send a class-method call to the other side of the connection.
#
# Arguments:
#   $ctx  - calling-context value forwarded as part of the message.
#   @call - the call itself; callers in this file pass
#           (class name, method name, method arguments...).
# The request is addressed to the object registered under the id
# 'class_call_handler' and is issued through send() as a 'call' message.
285 sub send_class_call {
286 my ($self, $ctx, @call) = @_;
287 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
288 $self->send(call => class_call_handler => $ctx => call => @call);
# Ensure a class-call handler is registered in local_objects_by_id under the
# fixed key 'class_call_handler' (the id that send_class_call addresses).
# Does nothing if that key already holds a true value; otherwise builds a
# handler with new_class_call_handler and also registers it under its own
# generated id via _local_object_to_id.
291 sub register_class_call_handler {
293 $self->local_objects_by_id->{'class_call_handler'} ||= do {
294 my $o = $self->new_class_call_handler;
295 $self->_local_object_to_id($o);
# Build the Object::Remote::CodeContainer that services class calls.
# The visible handler code takes (class name, method name, arguments...),
# loads the class with Module::Runtime's use_module, and invokes the named
# method on it as a class method with the remaining arguments.
# NOTE(review): the constructor argument that wraps this code is not visible
# in this excerpt.
300 sub new_class_call_handler {
301 Object::Remote::CodeContainer->new(
303 my ($class, $method) = (shift, shift);
304 use_module($class)->$method(@_);
# Record a remote object handle in remote_objects_by_id, keyed by its id.
#
# Arguments:
#   $remote - object exposing an id() method.
# The stored reference is weakened, so this registry does not by itself keep
# the handle alive.
309 sub register_remote {
310 my ($self, $remote) = @_;
311 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
312 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
317 my ($self, $id) = @_;
318 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
319 #TODO this shows up some times when a remote side dies in the middle of a remote
320 #method invocation - possibly only when the object is being constructed?
321 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
322 delete $self->remote_objects_by_id->{$id};
323 $self->_send([ free => $id ]);
327 my ($self, $type, @call) = @_;
329 my $future = Future->new;
330 my $remote = $self->remote_objects_by_id->{$call[0]};
332 unshift @call, $type => $self->_local_object_to_id($future);
334 my $outstanding = $self->outstanding_futures;
335 $outstanding->{$future} = $future;
336 $future->on_ready(sub {
338 delete $outstanding->{$future}
341 $self->_send(\@call);
347 my ($self, $type, @call) = @_;
349 unshift @call, $type => 'NULL';
351 $self->_send(\@call);
355 my ($self, $to_send) = @_;
356 my $fh = $self->send_to_fh;
358 unless ($self->is_valid) {
359 croak "Attempt to invoke _send on a connection that is not valid";
362 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
363 my $serialized = $self->_serialize($to_send)."\n";
364 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
367 #TODO this should be converted over to a non-blocking ::WriteChannel class
368 die "filehandle is not open" unless openhandle($fh);
369 log_trace { "file handle has passed openhandle() test; printing to it" };
370 $ret = print $fh $serialized;
371 die "print was not successful: $!" unless defined $ret
375 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
378 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
386 my ($self, $data) = @_;
387 local our @New_Ids = (-1);
389 my $flat = $self->_encode($self->_deobjectify($data));
392 my $err = $@; # won't get here if the eval doesn't die
393 # don't keep refs to new things
394 delete @{$self->local_objects_by_id}{@New_Ids};
395 die "Error serializing: $err";
# Assign an id to a local object and make sure it is registered in
# local_objects_by_id.
#
# Arguments:
#   $object - a reference; its refaddr is used as the id.
# An entry is created only when none (or a false one) exists for that id.
# While @New_Ids is non-empty, newly registered ids are also appended to it;
# the serialization code in this file localizes @New_Ids and deletes those
# ids again if serialization dies.
# NOTE(review): the value stored and the return value are not visible in this
# excerpt.
399 sub _local_object_to_id {
400 my ($self, $object) = @_;
401 my $id = refaddr($object);
402 $self->local_objects_by_id->{$id} ||= do {
403 push our(@New_Ids), $id if @New_Ids;
410 my ($self, $data) = @_;
411 if (blessed($data)) {
413 $data->isa('Object::Remote::Proxy')
414 and $data->{remote}->connection == $self
416 return +{ __local_object__ => $data->{remote}->id };
418 return +{ __remote_object__ => $self->_local_object_to_id($data) };
420 } elsif (my $ref = ref($data)) {
421 if ($ref eq 'HASH') {
422 my $tied_to = tied(%$data);
423 if(defined($tied_to)) {
424 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
426 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
428 } elsif ($ref eq 'ARRAY') {
429 my $tied_to = tied(@$data);
430 if (defined($tied_to)) {
431 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
433 return [ map $self->_deobjectify($_), @$data ];
435 } elsif ($ref eq 'CODE') {
436 my $id = $self->_local_object_to_id(
437 Object::Remote::CodeContainer->new(code => $data)
439 return +{ __remote_code__ => $id };
440 } elsif ($ref eq 'SCALAR') {
441 return +{ __scalar_ref__ => $$data };
442 } elsif ($ref eq 'GLOB') {
443 return +{ __glob_ref__ => $self->_local_object_to_id(
444 Object::Remote::GlobContainer->new(handle => $data)
447 die "Can't collapse reftype $ref";
450 return $data; # plain scalar
454 my ($self, $flat) = @_;
455 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
456 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
457 or do { warn "Deserialize failed for ${flat}: $@"; return };
458 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
459 eval { $self->${\"receive_${type}"}(@rest); 1 }
460 or do { warn "Receive failed for ${flat}: $@"; return };
465 my ($self, $id) = @_;
466 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
467 delete $self->local_objects_by_id->{$id}
468 or warn "Free: no such object $id";
473 my ($self, $future_id, $id, @rest) = @_;
474 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
475 my $future = $self->_id_to_remote_object($future_id);
476 $future->{method} = 'call_discard_free';
477 my $local = $self->local_objects_by_id->{$id}
478 or do { $future->fail("No such object $id"); return };
479 $self->_invoke($future, $local, @rest);
# Handle an incoming 'call_free' message: perform the call, then free the
# target object.
#
# Arguments:
#   $future - id of the future that receives the call's result.
#   $id     - id of the local object to call and then free.
#   @rest   - remaining call data (method name and arguments).
# Delegates to receive_call with undef inserted after $id -- the slot that is
# passed on to _invoke as $ctx -- and then to receive_free for the same id.
482 sub receive_call_free {
483 my ($self, $future, $id, @rest) = @_;
484 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
485 $self->receive_call($future, $id, undef, @rest);
486 $self->receive_free($id);
490 my ($self, $future, $local, $ctx, $method, @args) = @_;
491 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
492 if ($method =~ /^start::/) {
493 my $f = $local->$method(@args);
494 $f->on_done(sub { undef($f); $future->done(@_) });
496 $f->on_fail(sub { undef($f); $future->fail(@_) });
499 my $do = sub { $local->$method(@args) };
503 ? ($ctx ? $do->() : scalar($do->()))
507 } or do { $future->fail($@); return; };
515 Object::Remote::Connection - An underlying connection for L<Object::Remote>
519 my $local = Object::Remote->connect('-');
520 my $remote = Object::Remote->connect('myserver');
521 my $remote_user = Object::Remote->connect('user@myserver');
522 my $local_sudo = Object::Remote->connect('user@');
524 #$remote can be any other connection object
525 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
529 This is the base class for connections in OR objects. Connections are present
530 both in the local and remote parts of each OR pair, and handle the data
531 processing for sending OR commands and responses as JSON via the appropriate
532 connection mechanism.
538 my $conn = Object::Remote::Connection->new_from_spec($spec, %args);
540 Not intended for direct use, called by L<Object::Remote/connect> in a
541 L<Future>-compatible way.
543 Uses the spec to guess the appropriate Object::Remote::Connector::* class to use,
544 instantiates it with the spec and any further arguments given, then calls
545 C<connect> on it in a L<Future>-compatible way.
551 =item C<Object::Remote::Role::Connector::PerlInterpreter>
553 =item C<Object::Remote>