make notes of things to fix before release
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Moo;
19 use Carp qw(croak);
20
21 BEGIN { router()->exclude_forwarding }
22
23 END {
24   log_debug { "Killing all child processes in the process group" };
25     
26   #FIXME update along with setpgrp() to not use a process
27   #group anymore
28   
29   #send SIGINT to the process group for our children
30   kill(1, -2);
31 }
32
33 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
34
35 has send_to_fh => (
36   is => 'ro', required => 1,
37   trigger => sub {
38     my $self = $_[0];
39     $_[1]->autoflush(1);
40     Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
41   },
42 );
43
44 has read_channel => (
45   is => 'ro', required => 1,
46   trigger => sub {
47     my ($self, $ch) = @_;
48     my $id = $self->_id; 
49     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
50     weaken($self);
51     $ch->on_line_call(sub { $self->_receive(@_) });
52     $ch->on_close_call(sub { 
53       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
54       $self->on_close->done(@_);
55     });
56   },
57 );
58
59 has on_close => (
60   is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
61   trigger => sub {
62     log_trace { "Installing handlers into future via trigger" };
63     $_[0]->_install_future_handlers($_[1])
64   },
65 );
66
67 has child_pid => (is => 'ro');
68
69 has local_objects_by_id => (
70   is => 'ro', default => sub { {} },
71   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
72 );
73
74 has remote_objects_by_id => (
75   is => 'ro', default => sub { {} },
76   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
77 );
78
79 has outstanding_futures => (is => 'ro', default => sub { {} });
80
81 has _json => (
82   is => 'lazy',
83   handles => {
84     _deserialize => 'decode',
85     _encode => 'encode',
86   },
87 );
88
89 after BUILD => sub {
90   my ($self) = @_;
91   my $pid = $self->child_pid;
92   
93   unless (defined $pid) {
94     log_trace { "After BUILD invoked for connection but there was no pid" };
95     return;
96   }
97     
98   log_trace { "Setting process group of child process '$pid'" };
99   
100   #FIXME moving things into a process group has side effects for
101   #users of the library - move to a list
102   setpgrp($self->child_pid, 1);
103 };
104
105 sub BUILD { }
106
107 sub is_valid {
108   my ($self) = @_;
109   my $closed = $self->on_close->is_ready;
110   
111   log_trace { "Connection closed: $closed" };
112   return ! $closed;
113 }
114
115 sub _fail_outstanding {
116   my ($self, $error) = @_;
117   my $outstanding = $self->outstanding_futures;
118   
119   Dlog_debug { 
120     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
121   };
122
123   foreach(keys(%$outstanding)) {
124     log_trace { "Failing future for $_" };
125     my $future = $outstanding->{$_};
126     $future->fail("$error\n");
127   }
128
129   %$outstanding = ();
130   return;
131 }
132
133 sub _install_future_handlers {
134     my ($self, $f) = @_;
135     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
136     weaken($self);
137     $f->on_done(sub {
138       my $pid = $self->child_pid;
139       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
140       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
141       return unless defined $pid;
142       log_debug { "Waiting for child '$pid' to exit" };
143       my $ret = waitpid($pid, 0);
144       if ($ret != $pid) {
145         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
146         return;
147       } elsif ($? & 127) {
148           log_warn { "Remote interpreter did not exit cleanly" };
149       } else {
150         log_verbose {
151           my $exit_value = $? >> 8;
152           "Remote Perl interpreter exited with value '$exit_value'"
153         };
154       }
155     });
156     return $f; 
157 };
158
159 sub _id_to_remote_object {
160   my ($self, $id) = @_;
161   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
162   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
163   (
164     $self->remote_objects_by_id->{$id}
165     or Object::Remote::Handle->new(connection => $self, id => $id)
166   )->proxy;
167 }
168
169 sub _build__json {
170   weaken(my $self = shift);
171   JSON::PP->new->filter_json_single_key_object(
172     __remote_object__ => sub {
173       $self->_id_to_remote_object(@_);
174     }
175   )->filter_json_single_key_object(
176     __remote_code__ => sub {
177       my $code_container = $self->_id_to_remote_object(@_);
178       sub { $code_container->call(@_) };
179     }
180   )->filter_json_single_key_object(
181     __scalar_ref__ => sub {
182       my $value = shift;
183       return \$value;
184     }
185   )->filter_json_single_key_object(
186     __glob_ref__ => sub {
187       my $glob_container = $self->_id_to_remote_object(@_);
188       my $handle = Symbol::gensym;
189       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
190       return $handle;
191     }
192   )->filter_json_single_key_object(
193     __local_object__ => sub {
194       $self->local_objects_by_id->{$_[0]}
195     }
196   )->filter_json_single_key_object(
197     __remote_tied_hash__ => sub {
198       my %tied_hash;
199       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
200       return \%tied_hash;
201     }
202   )->filter_json_single_key_object(
203     __remote_tied_array__ => sub {
204       my @tied_array;
205       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
206       return \@tied_array;
207     }
208   ); 
209 }
210
211 sub _load_if_possible {
212   my ($class) = @_; 
213
214   use_module($class); 
215
216   if ($@) {
217     log_debug { "Attempt at loading '$class' failed with '$@'" };
218   }
219
220 }
221
222 BEGIN {
223   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
224   map _load_if_possible($_), qw(
225     Object::Remote::Connector::Local
226     Object::Remote::Connector::LocalSudo
227     Object::Remote::Connector::SSH
228     Object::Remote::Connector::UNIX
229   ); 
230 }
231
232 sub conn_from_spec {
233   my ($class, $spec, @args) = @_;
234   foreach my $poss (do { our @Guess }) {
235     if (my $conn = $poss->($spec, @args)) {
236       return $conn;
237     }
238   }
239   
240   return undef;
241 }
242
243 sub new_from_spec {
244   my ($class, $spec, @args) = @_;
245   return $spec if blessed $spec;
246   my $conn = $class->conn_from_spec($spec, @args);
247   
248   die "Couldn't figure out what to do with ${spec}"
249     unless defined $conn;
250     
251   return $conn->maybe::start::connect;  
252 }
253
254 sub remote_object {
255   my ($self, @args) = @_;
256   Object::Remote::Handle->new(
257     connection => $self, @args
258   )->proxy;
259 }
260
261 sub connect {
262   my ($self, $to) = @_;
263   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
264   return await_future(
265     $self->send_class_call(0, 'Object::Remote', connect => $to)
266   );
267 }
268
269 sub remote_sub {
270   my ($self, $sub) = @_;
271   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
272   Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;
273   return await_future($self->send_class_call(0, $pkg, can => $name));
274 }
275
276 sub send_class_call {
277   my ($self, $ctx, @call) = @_;
278   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
279   $self->send(call => class_call_handler => $ctx => call => @call);
280 }
281
282 sub register_class_call_handler {
283   my ($self) = @_;
284   $self->local_objects_by_id->{'class_call_handler'} ||= do {
285     my $o = $self->new_class_call_handler;
286     $self->_local_object_to_id($o);
287     $o;
288   };
289 }
290
291 sub new_class_call_handler {
292   Object::Remote::CodeContainer->new(
293     code => sub {
294       my ($class, $method) = (shift, shift);
295       use_module($class)->$method(@_);
296     }
297   );
298 }
299
300 sub register_remote {
301   my ($self, $remote) = @_;
302   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
303   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
304   return $remote;
305 }
306
307 sub send_free {
308   my ($self, $id) = @_;
309   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
310   #TODO this shows up some times when a remote side dies in the middle of a remote
311   #method invocation - possibly only when the object is being constructed?
312   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
313   delete $self->remote_objects_by_id->{$id};
314   $self->_send([ free => $id ]);
315 }
316
317 sub send {
318   my ($self, $type, @call) = @_;
319
320   my $future = CPS::Future->new;
321   my $remote = $self->remote_objects_by_id->{$call[0]};
322
323   unshift @call, $type => $self->_local_object_to_id($future);
324
325   my $outstanding = $self->outstanding_futures;
326   $outstanding->{$future} = $future;
327   $future->on_ready(sub {
328     undef($remote);
329     delete $outstanding->{$future}
330   });
331
332   $self->_send(\@call);
333
334   return $future;
335 }
336
337 sub send_discard {
338   my ($self, $type, @call) = @_;
339
340   unshift @call, $type => 'NULL';
341
342   $self->_send(\@call);
343 }
344
345 sub _send {
346   my ($self, $to_send) = @_;
347   my $fh = $self->send_to_fh;
348   
349   unless ($self->is_valid) {
350     croak "Attempt to invoke _send on a connection that is not valid";
351   }
352   
353   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
354   my $serialized = $self->_serialize($to_send)."\n";
355   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
356   my $ret; 
357   eval { 
358     #TODO this should be converted over to a non-blocking ::WriteChannel class
359     die "filehandle is not open" unless openhandle($fh);
360     log_trace { "file handle has passed openhandle() test; printing to it" };
361     $ret = print $fh $serialized;
362     die "print was not successful: $!" unless defined $ret
363   };
364     
365   if ($@) {
366     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
367     my $error = $@;
368     chomp($error);
369     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
370     return; 
371   }
372       
373   return $ret; 
374 }
375
376 sub _serialize {
377   my ($self, $data) = @_;
378   local our @New_Ids = (-1);
379   return eval {
380     my $flat = $self->_encode($self->_deobjectify($data));
381     $flat;
382   } || do {
383     my $err = $@; # won't get here if the eval doesn't die
384     # don't keep refs to new things
385     delete @{$self->local_objects_by_id}{@New_Ids};
386     die "Error serializing: $err";
387   };
388 }
389
390 sub _local_object_to_id {
391   my ($self, $object) = @_;
392   my $id = refaddr($object);
393   $self->local_objects_by_id->{$id} ||= do {
394     push our(@New_Ids), $id if @New_Ids;
395     $object;
396   };
397   return $id;
398 }
399
400 sub _deobjectify {
401   my ($self, $data) = @_;
402   if (blessed($data)) {
403     if (
404       $data->isa('Object::Remote::Proxy')
405       and $data->{remote}->connection == $self
406     ) {
407       return +{ __local_object__ => $data->{remote}->id };
408     } else {
409       return +{ __remote_object__ => $self->_local_object_to_id($data) };
410     }
411   } elsif (my $ref = ref($data)) {
412     if ($ref eq 'HASH') {
413       my $tied_to = tied(%$data);
414       if(defined($tied_to)) {
415         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; 
416       } else {
417         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
418       }
419     } elsif ($ref eq 'ARRAY') {
420       my $tied_to = tied(@$data);
421       if (defined($tied_to)) {
422         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; 
423       } else {
424         return [ map $self->_deobjectify($_), @$data ];
425       }
426     } elsif ($ref eq 'CODE') {
427       my $id = $self->_local_object_to_id(
428                  Object::Remote::CodeContainer->new(code => $data)
429                );
430       return +{ __remote_code__ => $id };
431     } elsif ($ref eq 'SCALAR') {
432       return +{ __scalar_ref__ => $$data };
433     } elsif ($ref eq 'GLOB') {
434       return +{ __glob_ref__ => $self->_local_object_to_id(
435         Object::Remote::GlobContainer->new(handle => $data)
436       ) };
437     } else {
438       die "Can't collapse reftype $ref";
439     }
440   }
441   return $data; # plain scalar
442 }
443
444 sub _receive {
445   my ($self, $flat) = @_;
446   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
447   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
448     or do { warn "Deserialize failed for ${flat}: $@"; return };
449   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
450   eval { $self->${\"receive_${type}"}(@rest); 1 }
451     or do { warn "Receive failed for ${flat}: $@"; return };
452   return;
453 }
454
455 sub receive_free {
456   my ($self, $id) = @_;
457   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
458   delete $self->local_objects_by_id->{$id}
459     or warn "Free: no such object $id";
460   return;
461 }
462
463 sub receive_call {
464   my ($self, $future_id, $id, @rest) = @_;
465   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
466   my $future = $self->_id_to_remote_object($future_id);
467   $future->{method} = 'call_discard_free';
468   my $local = $self->local_objects_by_id->{$id}
469     or do { $future->fail("No such object $id"); return };
470   $self->_invoke($future, $local, @rest);
471 }
472
473 sub receive_call_free {
474   my ($self, $future, $id, @rest) = @_;
475   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
476   $self->receive_call($future, $id, undef, @rest);
477   $self->receive_free($id);
478 }
479
480 sub _invoke {
481   my ($self, $future, $local, $ctx, $method, @args) = @_;
482   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
483   if ($method =~ /^start::/) {
484     my $f = $local->$method(@args);
485     $f->on_done(sub { undef($f); $future->done(@_) });
486     return unless $f;
487     $f->on_fail(sub { undef($f); $future->fail(@_) });
488     return;
489   }
490   my $do = sub { $local->$method(@args) };
491   eval {
492     $future->done(
493       defined($ctx)
494         ? ($ctx ? $do->() : scalar($do->()))
495         : do { $do->(); () }
496     );
497     1;
498   } or do { $future->fail($@); return; };
499   return;
500 }
501
502 1;
503
504 =head1 NAME
505
506 Object::Remote::Connection - An underlying connection for L<Object::Remote>
507
508   use Object::Remote;
509   
510   my %opts = (
511     nice => '10', ulimit => '-v 400000',
512     watchdog_timeout => 120, stderr => \*STDERR,
513   );
514   
515   my $local = Object::Remote->connect('-');
516   my $remote = Object::Remote->connect('myserver', nice => 5);
517   my $remote_user = Object::Remote->connect('user@myserver', %opts);
518   my $local_sudo = Object::Remote->connect('user@');
519   
520   #$remote can be any other connection object
521   my $hostname = Sys::Hostname->can::on($remote, 'hostname');
522   
523 =head1 DESCRIPTION
524
525 This is the class that supports connections to a Perl interpreter that is executed in a
526 different process. The new Perl interpreter can be either on the local or a remote machine
527 and is configurable via arguments passed to the constructor.
528
529 =head1 ARGUMENTS
530
531 =over 4
532
533 =item nice
534
535 If this value is defined then it will be used as the nice value of the Perl process when it
536 is started. The default is the undefined value and will not nice the process.
537
538 =item stderr
539
540 If this value is defined then it will be used as the file handle that receives the output
541 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
542 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
543 directly to STDERR of the local process with out the run loop managing I/O. The default value
544 is undefined.
545
546 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
547 across all of the Perl interpreters including the local one. For small scale and quick operation
548 this offers a predictable and easy to use way to get at error messages generated anywhere. If
549 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
550 and it is possible to still receive output from them. This is generally a good thing but can
551 cause issues.
552
553 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
554 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
555 that it is no longer possible to receive error output from the remote interpreters and that the
556 shell will start to kill off the child processes. Passing a reference to STDERR for the local
557 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
558 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
559 start killing children when the local interpreter exits.
560
561 It is also possible to pass in a file handle that has been opened for writing. This would be
562 useful for logging the output of the remote interpreter directly into a dedicated file.
563
564 =item ulimit
565
566 If this string is defined then it will be passed unmodified as the arguments to ulimit when
567 the Perl process is started. The default value is the undefined value and will not limit the
568 process in any way.
569
570 =item watchdog_timeout
571
572 If this value is defined then it will be used as the number of seconds the watchdog will wait
573 for an update before it terminates the Perl interpreter process. The default value is undefined
574 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
575
576 =back
577
578 =head1 SEE ALSO
579
580 =over 4
581
582 =item C<Object::Remote>
583
584 =back
585
586 =cut