further detail OR->connect arguments, and document other bits
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Future;
19 use Carp qw(croak);
20 use Moo;
21
22 BEGIN { router()->exclude_forwarding }
23
24 END {
25   our %child_pids;
26
27   log_trace { "END handler is being invoked in " . __PACKAGE__ };
28
29   foreach(keys(%child_pids)) {
30     log_debug { "Killing child process '$_'" };
31     kill('TERM', $_);
32   }
33 }
34
35 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
36
37 has send_to_fh => (
38   is => 'ro', required => 1,
39   trigger => sub {
40     my $self = $_[0];
41     $_[1]->autoflush(1);
42     Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
43   },
44 );
45
46 has read_channel => (
47   is => 'ro', required => 1,
48   trigger => sub {
49     my ($self, $ch) = @_;
50     my $id = $self->_id;
51     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
52     weaken($self);
53     $ch->on_line_call(sub { $self->_receive(@_) });
54     $ch->on_close_call(sub {
55       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
56       $self->on_close->done(@_);
57     });
58   },
59 );
60
61 has on_close => (
62   is => 'rw', default => sub { $_[0]->_install_future_handlers(Future->new) },
63   trigger => sub {
64     log_trace { "Installing handlers into future via trigger" };
65     $_[0]->_install_future_handlers($_[1])
66   },
67 );
68
69 has child_pid => (is => 'ro');
70
71 has local_objects_by_id => (
72   is => 'ro', default => sub { {} },
73   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
74 );
75
76 has remote_objects_by_id => (
77   is => 'ro', default => sub { {} },
78   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
79 );
80
81 has outstanding_futures => (is => 'ro', default => sub { {} });
82
83 has _json => (
84   is => 'lazy',
85   handles => {
86     _deserialize => 'decode',
87     _encode => 'encode',
88   },
89 );
90
91 after BUILD => sub {
92   my ($self) = @_;
93   my $pid = $self->child_pid;
94   our %child_pids;
95   return unless defined $pid;
96   $child_pids{$pid} = 1;
97   return;
98 };
99
100 sub BUILD { }
101
102 sub is_valid {
103   my ($self) = @_;
104   my $valid = ! $self->on_close->is_ready;
105
106   log_trace {
107     my $id = $self->_id;
108     my $text;
109     if ($valid) {
110       $text = 'yes';
111     } else {
112       $text = 'no';
113     }
114     "Connection '$id' is valid: '$text'"
115   };
116
117   return $valid;
118 }
119
120 sub _fail_outstanding {
121   my ($self, $error) = @_;
122   my $outstanding = $self->outstanding_futures;
123
124   Dlog_debug {
125     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
126   };
127
128   foreach(keys(%$outstanding)) {
129     log_trace { "Failing future for $_" };
130     my $future = $outstanding->{$_};
131     $future->fail("$error\n");
132   }
133
134   %$outstanding = ();
135   return;
136 }
137
138 sub _install_future_handlers {
139     my ($self, $f) = @_;
140     our %child_pids;
141     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
142     weaken($self);
143     $f->on_done(sub {
144       my $pid = $self->child_pid;
145       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
146       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
147       return unless defined $pid;
148       log_debug { "Waiting for child '$pid' to exit" };
149       my $ret = waitpid($pid, 0);
150       if ($ret != $pid) {
151         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
152         return;
153       } elsif ($? & 127) {
154           log_warn { "Remote interpreter did not exit cleanly" };
155       } else {
156         log_verbose {
157           my $exit_value = $? >> 8;
158           "Remote Perl interpreter exited with value '$exit_value'"
159         };
160       }
161
162       delete $child_pids{$pid};
163     });
164     return $f;
165 };
166
167 sub _id_to_remote_object {
168   my ($self, $id) = @_;
169   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
170   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
171   (
172     $self->remote_objects_by_id->{$id}
173     or Object::Remote::Handle->new(connection => $self, id => $id)
174   )->proxy;
175 }
176
177 sub _build__json {
178   weaken(my $self = shift);
179   JSON::PP->new->filter_json_single_key_object(
180     __remote_object__ => sub {
181       $self->_id_to_remote_object(@_);
182     }
183   )->filter_json_single_key_object(
184     __remote_code__ => sub {
185       my $code_container = $self->_id_to_remote_object(@_);
186       sub { $code_container->call(@_) };
187     }
188   )->filter_json_single_key_object(
189     __scalar_ref__ => sub {
190       my $value = shift;
191       return \$value;
192     }
193   )->filter_json_single_key_object(
194     __glob_ref__ => sub {
195       my $glob_container = $self->_id_to_remote_object(@_);
196       my $handle = Symbol::gensym;
197       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
198       return $handle;
199     }
200   )->filter_json_single_key_object(
201     __local_object__ => sub {
202       $self->local_objects_by_id->{$_[0]}
203     }
204   )->filter_json_single_key_object(
205     __remote_tied_hash__ => sub {
206       my %tied_hash;
207       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
208       return \%tied_hash;
209     }
210   )->filter_json_single_key_object(
211     __remote_tied_array__ => sub {
212       my @tied_array;
213       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
214       return \@tied_array;
215     }
216   );
217 }
218
219 sub _load_if_possible {
220   my ($class) = @_;
221
222   use_module($class);
223
224   if ($@) {
225     log_debug { "Attempt at loading '$class' failed with '$@'" };
226   }
227
228 }
229
230 BEGIN {
231   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
232   map _load_if_possible($_), qw(
233     Object::Remote::Connector::Local
234     Object::Remote::Connector::LocalSudo
235     Object::Remote::Connector::SSH
236     Object::Remote::Connector::UNIX
237     Object::Remote::Connector::INET
238   );
239 }
240
241 sub conn_from_spec {
242   my ($class, $spec, @args) = @_;
243   foreach my $poss (do { our @Guess }) {
244     if (my $conn = $poss->($spec, @args)) {
245       return $conn;
246     }
247   }
248
249   return undef;
250 }
251
252 sub new_from_spec {
253   my ($class, $spec, @args) = @_;
254   return $spec if blessed $spec;
255   my $conn = $class->conn_from_spec($spec, @args);
256
257   die "Couldn't figure out what to do with ${spec}"
258     unless defined $conn;
259
260   return $conn->maybe::start::connect;
261 }
262
263 sub remote_object {
264   my ($self, @args) = @_;
265   Object::Remote::Handle->new(
266     connection => $self, @args
267   )->proxy;
268 }
269
270 sub connect {
271   my ($self, $to) = @_;
272   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
273   return await_future(
274     $self->send_class_call(0, 'Object::Remote', connect => $to)
275   );
276 }
277
278 sub remote_sub {
279   my ($self, $sub) = @_;
280   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
281   Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
282   return await_future($self->send_class_call(0, $pkg, can => $name));
283 }
284
285 sub send_class_call {
286   my ($self, $ctx, @call) = @_;
287   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
288   $self->send(call => class_call_handler => $ctx => call => @call);
289 }
290
291 sub register_class_call_handler {
292   my ($self) = @_;
293   $self->local_objects_by_id->{'class_call_handler'} ||= do {
294     my $o = $self->new_class_call_handler;
295     $self->_local_object_to_id($o);
296     $o;
297   };
298 }
299
300 sub new_class_call_handler {
301   Object::Remote::CodeContainer->new(
302     code => sub {
303       my ($class, $method) = (shift, shift);
304       use_module($class)->$method(@_);
305     }
306   );
307 }
308
309 sub register_remote {
310   my ($self, $remote) = @_;
311   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
312   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
313   return $remote;
314 }
315
316 sub send_free {
317   my ($self, $id) = @_;
318   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
319   #TODO this shows up some times when a remote side dies in the middle of a remote
320   #method invocation - possibly only when the object is being constructed?
321   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
322   delete $self->remote_objects_by_id->{$id};
323   $self->_send([ free => $id ]);
324 }
325
326 sub send {
327   my ($self, $type, @call) = @_;
328
329   my $future = Future->new;
330   my $remote = $self->remote_objects_by_id->{$call[0]};
331
332   unshift @call, $type => $self->_local_object_to_id($future);
333
334   my $outstanding = $self->outstanding_futures;
335   $outstanding->{$future} = $future;
336   $future->on_ready(sub {
337     undef($remote);
338     delete $outstanding->{$future}
339   });
340
341   $self->_send(\@call);
342
343   return $future;
344 }
345
346 sub send_discard {
347   my ($self, $type, @call) = @_;
348
349   unshift @call, $type => 'NULL';
350
351   $self->_send(\@call);
352 }
353
354 sub _send {
355   my ($self, $to_send) = @_;
356   my $fh = $self->send_to_fh;
357
358   unless ($self->is_valid) {
359     croak "Attempt to invoke _send on a connection that is not valid";
360   }
361
362   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
363   my $serialized = $self->_serialize($to_send)."\n";
364   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
365   my $ret;
366   eval {
367     #TODO this should be converted over to a non-blocking ::WriteChannel class
368     die "filehandle is not open" unless openhandle($fh);
369     log_trace { "file handle has passed openhandle() test; printing to it" };
370     $ret = print $fh $serialized;
371     die "print was not successful: $!" unless defined $ret
372   };
373
374   if ($@) {
375     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
376     my $error = $@;
377     chomp($error);
378     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
379     return;
380   }
381
382   return $ret;
383 }
384
385 sub _serialize {
386   my ($self, $data) = @_;
387   local our @New_Ids = (-1);
388   return eval {
389     my $flat = $self->_encode($self->_deobjectify($data));
390     $flat;
391   } || do {
392     my $err = $@; # won't get here if the eval doesn't die
393     # don't keep refs to new things
394     delete @{$self->local_objects_by_id}{@New_Ids};
395     die "Error serializing: $err";
396   };
397 }
398
399 sub _local_object_to_id {
400   my ($self, $object) = @_;
401   my $id = refaddr($object);
402   $self->local_objects_by_id->{$id} ||= do {
403     push our(@New_Ids), $id if @New_Ids;
404     $object;
405   };
406   return $id;
407 }
408
409 sub _deobjectify {
410   my ($self, $data) = @_;
411   if (blessed($data)) {
412     if (
413       $data->isa('Object::Remote::Proxy')
414       and $data->{remote}->connection == $self
415     ) {
416       return +{ __local_object__ => $data->{remote}->id };
417     } else {
418       return +{ __remote_object__ => $self->_local_object_to_id($data) };
419     }
420   } elsif (my $ref = ref($data)) {
421     if ($ref eq 'HASH') {
422       my $tied_to = tied(%$data);
423       if(defined($tied_to)) {
424         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
425       } else {
426         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
427       }
428     } elsif ($ref eq 'ARRAY') {
429       my $tied_to = tied(@$data);
430       if (defined($tied_to)) {
431         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
432       } else {
433         return [ map $self->_deobjectify($_), @$data ];
434       }
435     } elsif ($ref eq 'CODE') {
436       my $id = $self->_local_object_to_id(
437                  Object::Remote::CodeContainer->new(code => $data)
438                );
439       return +{ __remote_code__ => $id };
440     } elsif ($ref eq 'SCALAR') {
441       return +{ __scalar_ref__ => $$data };
442     } elsif ($ref eq 'GLOB') {
443       return +{ __glob_ref__ => $self->_local_object_to_id(
444         Object::Remote::GlobContainer->new(handle => $data)
445       ) };
446     } else {
447       die "Can't collapse reftype $ref";
448     }
449   }
450   return $data; # plain scalar
451 }
452
453 sub _receive {
454   my ($self, $flat) = @_;
455   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
456   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
457     or do { warn "Deserialize failed for ${flat}: $@"; return };
458   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
459   eval { $self->${\"receive_${type}"}(@rest); 1 }
460     or do { warn "Receive failed for ${flat}: $@"; return };
461   return;
462 }
463
464 sub receive_free {
465   my ($self, $id) = @_;
466   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
467   delete $self->local_objects_by_id->{$id}
468     or warn "Free: no such object $id";
469   return;
470 }
471
472 sub receive_call {
473   my ($self, $future_id, $id, @rest) = @_;
474   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
475   my $future = $self->_id_to_remote_object($future_id);
476   $future->{method} = 'call_discard_free';
477   my $local = $self->local_objects_by_id->{$id}
478     or do { $future->fail("No such object $id"); return };
479   $self->_invoke($future, $local, @rest);
480 }
481
482 sub receive_call_free {
483   my ($self, $future, $id, @rest) = @_;
484   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
485   $self->receive_call($future, $id, undef, @rest);
486   $self->receive_free($id);
487 }
488
489 sub _invoke {
490   my ($self, $future, $local, $ctx, $method, @args) = @_;
491   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
492   if ($method =~ /^start::/) {
493     my $f = $local->$method(@args);
494     $f->on_done(sub { undef($f); $future->done(@_) });
495     return unless $f;
496     $f->on_fail(sub { undef($f); $future->fail(@_) });
497     return;
498   }
499   my $do = sub { $local->$method(@args) };
500   eval {
501     $future->done(
502       defined($ctx)
503         ? ($ctx ? $do->() : scalar($do->()))
504         : do { $do->(); () }
505     );
506     1;
507   } or do { $future->fail($@); return; };
508   return;
509 }
510
511 1;
512
513 =head1 NAME
514
515 Object::Remote::Connection - An underlying connection for L<Object::Remote>
516
517   use Object::Remote;
518
519   my $local = Object::Remote->connect('-');
520   my $remote = Object::Remote->connect('myserver');
521   my $remote_user = Object::Remote->connect('user@myserver');
522   my $local_sudo = Object::Remote->connect('user@');
523
524   #$remote can be any other connection object
525   my $hostname = Sys::Hostname->can::on($remote, 'hostname');
526
527 =head1 DESCRIPTION
528
529 This is the base class for connections in OR objects. Connections are present
530 both in the local and remote parts of each OR pair, and handle the data
531 processing for sending OR commands and responses as JSON via the appropiate
532 connection mechanism.
533
534 =head1 METHODS
535
536 =head2 new_from_spec
537
538   my $conn = Object::Remote::Connection->new_from_spec($spec, %args);
539
540 Not intended for direct use, called by L<Object::Remote/connect> in a
541 L<Future>-compatible way.
542
543 Uses the spec to guess the appropiate Object::Remote::Connector::* class to use,
544 instantiates it with the spec and any further arguments given, then calls
545 C<connect> on it in a L<Future>-compatible way.
546
547 =head1 SEE ALSO
548
549 =over 4
550
551 =item C<Object::Remote::Role::Connector::PerlInterpreter>
552
553 =item C<Object::Remote>
554
555 =back
556
557 =cut