6c10d4584c0196c097ce17c59d8e8b5e29acf4df
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Moo;
19 use Carp qw(croak);
20
21 BEGIN { router()->exclude_forwarding }
22
23 END {
24   log_debug { "Killing all child processes in the process group" };
25     
26   #FIXME update along with setpgrp() to not use a process
27   #group anymore
28   
29   #send SIGINT to the process group for our children
30   kill(1, -2);
31 }
32
33 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
34
35 has send_to_fh => (
36   is => 'ro', required => 1,
37   trigger => sub {
38     my $self = $_[0];
39     $_[1]->autoflush(1);
40     Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
41   },
42 );
43
44 has read_channel => (
45   is => 'ro', required => 1,
46   trigger => sub {
47     my ($self, $ch) = @_;
48     my $id = $self->_id; 
49     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
50     weaken($self);
51     $ch->on_line_call(sub { $self->_receive(@_) });
52     $ch->on_close_call(sub { 
53       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
54       $self->on_close->done(@_);
55     });
56   },
57 );
58
59 has on_close => (
60   is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
61   trigger => sub {
62     log_trace { "Installing handlers into future via trigger" };
63     $_[0]->_install_future_handlers($_[1])
64   },
65 );
66
67 has child_pid => (is => 'ro');
68
69 has local_objects_by_id => (
70   is => 'ro', default => sub { {} },
71   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
72 );
73
74 has remote_objects_by_id => (
75   is => 'ro', default => sub { {} },
76   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
77 );
78
79 has outstanding_futures => (is => 'ro', default => sub { {} });
80
81 has _json => (
82   is => 'lazy',
83   handles => {
84     _deserialize => 'decode',
85     _encode => 'encode',
86   },
87 );
88
89 after BUILD => sub {
90   my ($self) = @_;
91   my $pid = $self->child_pid;
92   
93   unless (defined $pid) {
94     log_trace { "After BUILD invoked for connection but there was no pid" };
95     return;
96   }
97     
98   log_trace { "Setting process group of child process '$pid'" };
99   
100   #FIXME moving things into a process group has side effects for
101   #users of the library - move to a list
102   setpgrp($self->child_pid, 1);
103 };
104
105 sub BUILD { }
106
107 sub is_valid {
108   my ($self) = @_;
109   my $valid = ! $self->on_close->is_ready;
110   
111   log_trace {
112     my $id = $self->_id;
113     my $text;
114     if ($valid) {
115       $text = 'yes';
116     } else {
117       $text = 'no';
118     }
119     "Connection '$id' is valid: '$text'"
120   };
121   
122   return $valid;
123 }
124
125 sub _fail_outstanding {
126   my ($self, $error) = @_;
127   my $outstanding = $self->outstanding_futures;
128   
129   Dlog_debug { 
130     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
131   };
132
133   foreach(keys(%$outstanding)) {
134     log_trace { "Failing future for $_" };
135     my $future = $outstanding->{$_};
136     $future->fail("$error\n");
137   }
138
139   %$outstanding = ();
140   return;
141 }
142
143 sub _install_future_handlers {
144     my ($self, $f) = @_;
145     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
146     weaken($self);
147     $f->on_done(sub {
148       my $pid = $self->child_pid;
149       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
150       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
151       return unless defined $pid;
152       log_debug { "Waiting for child '$pid' to exit" };
153       my $ret = waitpid($pid, 0);
154       if ($ret != $pid) {
155         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
156         return;
157       } elsif ($? & 127) {
158           log_warn { "Remote interpreter did not exit cleanly" };
159       } else {
160         log_verbose {
161           my $exit_value = $? >> 8;
162           "Remote Perl interpreter exited with value '$exit_value'"
163         };
164       }
165     });
166     return $f; 
167 };
168
169 sub _id_to_remote_object {
170   my ($self, $id) = @_;
171   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
172   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
173   (
174     $self->remote_objects_by_id->{$id}
175     or Object::Remote::Handle->new(connection => $self, id => $id)
176   )->proxy;
177 }
178
179 sub _build__json {
180   weaken(my $self = shift);
181   JSON::PP->new->filter_json_single_key_object(
182     __remote_object__ => sub {
183       $self->_id_to_remote_object(@_);
184     }
185   )->filter_json_single_key_object(
186     __remote_code__ => sub {
187       my $code_container = $self->_id_to_remote_object(@_);
188       sub { $code_container->call(@_) };
189     }
190   )->filter_json_single_key_object(
191     __scalar_ref__ => sub {
192       my $value = shift;
193       return \$value;
194     }
195   )->filter_json_single_key_object(
196     __glob_ref__ => sub {
197       my $glob_container = $self->_id_to_remote_object(@_);
198       my $handle = Symbol::gensym;
199       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
200       return $handle;
201     }
202   )->filter_json_single_key_object(
203     __local_object__ => sub {
204       $self->local_objects_by_id->{$_[0]}
205     }
206   )->filter_json_single_key_object(
207     __remote_tied_hash__ => sub {
208       my %tied_hash;
209       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
210       return \%tied_hash;
211     }
212   )->filter_json_single_key_object(
213     __remote_tied_array__ => sub {
214       my @tied_array;
215       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
216       return \@tied_array;
217     }
218   ); 
219 }
220
221 sub _load_if_possible {
222   my ($class) = @_; 
223
224   use_module($class); 
225
226   if ($@) {
227     log_debug { "Attempt at loading '$class' failed with '$@'" };
228   }
229
230 }
231
232 BEGIN {
233   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
234   map _load_if_possible($_), qw(
235     Object::Remote::Connector::Local
236     Object::Remote::Connector::LocalSudo
237     Object::Remote::Connector::SSH
238     Object::Remote::Connector::UNIX
239   ); 
240 }
241
242 sub conn_from_spec {
243   my ($class, $spec, @args) = @_;
244   foreach my $poss (do { our @Guess }) {
245     if (my $conn = $poss->($spec, @args)) {
246       return $conn;
247     }
248   }
249   
250   return undef;
251 }
252
253 sub new_from_spec {
254   my ($class, $spec, @args) = @_;
255   return $spec if blessed $spec;
256   my $conn = $class->conn_from_spec($spec, @args);
257   
258   die "Couldn't figure out what to do with ${spec}"
259     unless defined $conn;
260     
261   return $conn->maybe::start::connect;  
262 }
263
264 sub remote_object {
265   my ($self, @args) = @_;
266   Object::Remote::Handle->new(
267     connection => $self, @args
268   )->proxy;
269 }
270
271 sub connect {
272   my ($self, $to) = @_;
273   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
274   return await_future(
275     $self->send_class_call(0, 'Object::Remote', connect => $to)
276   );
277 }
278
279 sub remote_sub {
280   my ($self, $sub) = @_;
281   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
282   Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
283   return await_future($self->send_class_call(0, $pkg, can => $name));
284 }
285
286 sub send_class_call {
287   my ($self, $ctx, @call) = @_;
288   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
289   $self->send(call => class_call_handler => $ctx => call => @call);
290 }
291
292 sub register_class_call_handler {
293   my ($self) = @_;
294   $self->local_objects_by_id->{'class_call_handler'} ||= do {
295     my $o = $self->new_class_call_handler;
296     $self->_local_object_to_id($o);
297     $o;
298   };
299 }
300
301 sub new_class_call_handler {
302   Object::Remote::CodeContainer->new(
303     code => sub {
304       my ($class, $method) = (shift, shift);
305       use_module($class)->$method(@_);
306     }
307   );
308 }
309
310 sub register_remote {
311   my ($self, $remote) = @_;
312   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
313   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
314   return $remote;
315 }
316
317 sub send_free {
318   my ($self, $id) = @_;
319   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
320   #TODO this shows up some times when a remote side dies in the middle of a remote
321   #method invocation - possibly only when the object is being constructed?
322   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
323   delete $self->remote_objects_by_id->{$id};
324   $self->_send([ free => $id ]);
325 }
326
327 sub send {
328   my ($self, $type, @call) = @_;
329
330   my $future = CPS::Future->new;
331   my $remote = $self->remote_objects_by_id->{$call[0]};
332
333   unshift @call, $type => $self->_local_object_to_id($future);
334
335   my $outstanding = $self->outstanding_futures;
336   $outstanding->{$future} = $future;
337   $future->on_ready(sub {
338     undef($remote);
339     delete $outstanding->{$future}
340   });
341
342   $self->_send(\@call);
343
344   return $future;
345 }
346
347 sub send_discard {
348   my ($self, $type, @call) = @_;
349
350   unshift @call, $type => 'NULL';
351
352   $self->_send(\@call);
353 }
354
355 sub _send {
356   my ($self, $to_send) = @_;
357   my $fh = $self->send_to_fh;
358   
359   unless ($self->is_valid) {
360     croak "Attempt to invoke _send on a connection that is not valid";
361   }
362   
363   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
364   my $serialized = $self->_serialize($to_send)."\n";
365   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
366   my $ret; 
367   eval { 
368     #TODO this should be converted over to a non-blocking ::WriteChannel class
369     die "filehandle is not open" unless openhandle($fh);
370     log_trace { "file handle has passed openhandle() test; printing to it" };
371     $ret = print $fh $serialized;
372     die "print was not successful: $!" unless defined $ret
373   };
374     
375   if ($@) {
376     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
377     my $error = $@;
378     chomp($error);
379     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
380     return; 
381   }
382       
383   return $ret; 
384 }
385
386 sub _serialize {
387   my ($self, $data) = @_;
388   local our @New_Ids = (-1);
389   return eval {
390     my $flat = $self->_encode($self->_deobjectify($data));
391     $flat;
392   } || do {
393     my $err = $@; # won't get here if the eval doesn't die
394     # don't keep refs to new things
395     delete @{$self->local_objects_by_id}{@New_Ids};
396     die "Error serializing: $err";
397   };
398 }
399
400 sub _local_object_to_id {
401   my ($self, $object) = @_;
402   my $id = refaddr($object);
403   $self->local_objects_by_id->{$id} ||= do {
404     push our(@New_Ids), $id if @New_Ids;
405     $object;
406   };
407   return $id;
408 }
409
410 sub _deobjectify {
411   my ($self, $data) = @_;
412   if (blessed($data)) {
413     if (
414       $data->isa('Object::Remote::Proxy')
415       and $data->{remote}->connection == $self
416     ) {
417       return +{ __local_object__ => $data->{remote}->id };
418     } else {
419       return +{ __remote_object__ => $self->_local_object_to_id($data) };
420     }
421   } elsif (my $ref = ref($data)) {
422     if ($ref eq 'HASH') {
423       my $tied_to = tied(%$data);
424       if(defined($tied_to)) {
425         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; 
426       } else {
427         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
428       }
429     } elsif ($ref eq 'ARRAY') {
430       my $tied_to = tied(@$data);
431       if (defined($tied_to)) {
432         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; 
433       } else {
434         return [ map $self->_deobjectify($_), @$data ];
435       }
436     } elsif ($ref eq 'CODE') {
437       my $id = $self->_local_object_to_id(
438                  Object::Remote::CodeContainer->new(code => $data)
439                );
440       return +{ __remote_code__ => $id };
441     } elsif ($ref eq 'SCALAR') {
442       return +{ __scalar_ref__ => $$data };
443     } elsif ($ref eq 'GLOB') {
444       return +{ __glob_ref__ => $self->_local_object_to_id(
445         Object::Remote::GlobContainer->new(handle => $data)
446       ) };
447     } else {
448       die "Can't collapse reftype $ref";
449     }
450   }
451   return $data; # plain scalar
452 }
453
454 sub _receive {
455   my ($self, $flat) = @_;
456   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
457   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
458     or do { warn "Deserialize failed for ${flat}: $@"; return };
459   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
460   eval { $self->${\"receive_${type}"}(@rest); 1 }
461     or do { warn "Receive failed for ${flat}: $@"; return };
462   return;
463 }
464
465 sub receive_free {
466   my ($self, $id) = @_;
467   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
468   delete $self->local_objects_by_id->{$id}
469     or warn "Free: no such object $id";
470   return;
471 }
472
473 sub receive_call {
474   my ($self, $future_id, $id, @rest) = @_;
475   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
476   my $future = $self->_id_to_remote_object($future_id);
477   $future->{method} = 'call_discard_free';
478   my $local = $self->local_objects_by_id->{$id}
479     or do { $future->fail("No such object $id"); return };
480   $self->_invoke($future, $local, @rest);
481 }
482
483 sub receive_call_free {
484   my ($self, $future, $id, @rest) = @_;
485   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
486   $self->receive_call($future, $id, undef, @rest);
487   $self->receive_free($id);
488 }
489
490 sub _invoke {
491   my ($self, $future, $local, $ctx, $method, @args) = @_;
492   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
493   if ($method =~ /^start::/) {
494     my $f = $local->$method(@args);
495     $f->on_done(sub { undef($f); $future->done(@_) });
496     return unless $f;
497     $f->on_fail(sub { undef($f); $future->fail(@_) });
498     return;
499   }
500   my $do = sub { $local->$method(@args) };
501   eval {
502     $future->done(
503       defined($ctx)
504         ? ($ctx ? $do->() : scalar($do->()))
505         : do { $do->(); () }
506     );
507     1;
508   } or do { $future->fail($@); return; };
509   return;
510 }
511
512 1;
513
514 =head1 NAME
515
516 Object::Remote::Connection - An underlying connection for L<Object::Remote>
517
518   use Object::Remote;
519   
520   my %opts = (
521     nice => '10', ulimit => '-v 400000',
522     watchdog_timeout => 120, stderr => \*STDERR,
523   );
524   
525   my $local = Object::Remote->connect('-');
526   my $remote = Object::Remote->connect('myserver', nice => 5);
527   my $remote_user = Object::Remote->connect('user@myserver', %opts);
528   my $local_sudo = Object::Remote->connect('user@');
529   
530   #$remote can be any other connection object
531   my $hostname = Sys::Hostname->can::on($remote, 'hostname');
532   
533 =head1 DESCRIPTION
534
535 This is the class that supports connections to a Perl interpreter that is executed in a
536 different process. The new Perl interpreter can be either on the local or a remote machine
537 and is configurable via arguments passed to the constructor.
538
539 =head1 ARGUMENTS
540
541 =over 4
542
543 =item nice
544
545 If this value is defined then it will be used as the nice value of the Perl process when it
546 is started. The default is the undefined value and will not nice the process.
547
548 =item stderr
549
550 If this value is defined then it will be used as the file handle that receives the output
551 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
552 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
553 directly to STDERR of the local process with out the run loop managing I/O. The default value
554 is undefined.
555
556 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
557 across all of the Perl interpreters including the local one. For small scale and quick operation
558 this offers a predictable and easy to use way to get at error messages generated anywhere. If
559 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
560 and it is possible to still receive output from them. This is generally a good thing but can
561 cause issues.
562
563 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
564 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
565 that it is no longer possible to receive error output from the remote interpreters and that the
566 shell will start to kill off the child processes. Passing a reference to STDERR for the local
567 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
568 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
569 start killing children when the local interpreter exits.
570
571 It is also possible to pass in a file handle that has been opened for writing. This would be
572 useful for logging the output of the remote interpreter directly into a dedicated file.
573
574 =item ulimit
575
576 If this string is defined then it will be passed unmodified as the arguments to ulimit when
577 the Perl process is started. The default value is the undefined value and will not limit the
578 process in any way.
579
580 =item watchdog_timeout
581
582 If this value is defined then it will be used as the number of seconds the watchdog will wait
583 for an update before it terminates the Perl interpreter process. The default value is undefined
584 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
585
586 =back
587
588 =head1 SEE ALSO
589
590 =over 4
591
592 =item C<Object::Remote>
593
594 =back
595
596 =cut