136b065aac3284ed345cbe598a5e595b1ac155e2
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Moo;
19 use Carp qw(croak);
20
21 BEGIN { router()->exclude_forwarding }
22
23 END {
24   log_debug { "Killing all child processes in the process group" };
25     
26   #send SIGINT to the process group for our children
27   kill(1, -2);
28 }
29
30 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
31
32 has send_to_fh => (
33   is => 'ro', required => 1,
34   trigger => sub {
35     my $self = $_[0];
36     $_[1]->autoflush(1);
37     Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
38   },
39 );
40
41 has read_channel => (
42   is => 'ro', required => 1,
43   trigger => sub {
44     my ($self, $ch) = @_;
45     my $id = $self->_id; 
46     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
47     weaken($self);
48     $ch->on_line_call(sub { $self->_receive(@_) });
49     $ch->on_close_call(sub { 
50       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
51       $self->on_close->done(@_);
52     });
53   },
54 );
55
56 has on_close => (
57   is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
58   trigger => sub {
59     log_trace { "Installing handlers into future via trigger" };
60     $_[0]->_install_future_handlers($_[1])
61   },
62 );
63
64 has child_pid => (is => 'ro');
65
66 has local_objects_by_id => (
67   is => 'ro', default => sub { {} },
68   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
69 );
70
71 has remote_objects_by_id => (
72   is => 'ro', default => sub { {} },
73   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
74 );
75
76 has outstanding_futures => (is => 'ro', default => sub { {} });
77
78 has _json => (
79   is => 'lazy',
80   handles => {
81     _deserialize => 'decode',
82     _encode => 'encode',
83   },
84 );
85
86 after BUILD => sub {
87   my ($self) = @_;
88   my $pid = $self->child_pid;
89   
90   unless (defined $pid) {
91     log_trace { "After BUILD invoked for connection but there was no pid" };
92     return;
93   }
94     
95   log_trace { "Setting process group of child process '$pid'" };
96   
97   setpgrp($self->child_pid, 1);
98 };
99
100 sub BUILD { }
101
102 sub is_valid {
103   my ($self) = @_;
104   my $closed = $self->on_close->is_ready;
105   
106   log_trace { "Connection closed: $closed" };
107   return ! $closed;
108 }
109
110 sub _fail_outstanding {
111   my ($self, $error) = @_;
112   my $outstanding = $self->outstanding_futures;
113   
114   Dlog_debug { 
115     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
116   };
117
118   foreach(keys(%$outstanding)) {
119     log_trace { "Failing future for $_" };
120     my $future = $outstanding->{$_};
121     $future->fail("$error\n");
122   }
123
124   %$outstanding = ();
125   return;
126 }
127
128 sub _install_future_handlers {
129     my ($self, $f) = @_;
130     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
131     weaken($self);
132     $f->on_done(sub {
133       my $pid = $self->child_pid;
134       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
135       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
136       return unless defined $pid;
137       log_debug { "Waiting for child '$pid' to exit" };
138       my $ret = waitpid($pid, 0);
139       if ($ret != $pid) {
140         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
141         return;
142       } elsif ($? & 127) {
143           log_warn { "Remote interpreter did not exit cleanly" };
144       } else {
145         log_verbose {
146           my $exit_value = $? >> 8;
147           "Remote Perl interpreter exited with value '$exit_value'"
148         };
149       }
150     });
151     return $f; 
152 };
153
154 sub _id_to_remote_object {
155   my ($self, $id) = @_;
156   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
157   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
158   (
159     $self->remote_objects_by_id->{$id}
160     or Object::Remote::Handle->new(connection => $self, id => $id)
161   )->proxy;
162 }
163
164 sub _build__json {
165   weaken(my $self = shift);
166   JSON::PP->new->filter_json_single_key_object(
167     __remote_object__ => sub {
168       $self->_id_to_remote_object(@_);
169     }
170   )->filter_json_single_key_object(
171     __remote_code__ => sub {
172       my $code_container = $self->_id_to_remote_object(@_);
173       sub { $code_container->call(@_) };
174     }
175   )->filter_json_single_key_object(
176     __scalar_ref__ => sub {
177       my $value = shift;
178       return \$value;
179     }
180   )->filter_json_single_key_object(
181     __glob_ref__ => sub {
182       my $glob_container = $self->_id_to_remote_object(@_);
183       my $handle = Symbol::gensym;
184       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
185       return $handle;
186     }
187   )->filter_json_single_key_object(
188     __local_object__ => sub {
189       $self->local_objects_by_id->{$_[0]}
190     }
191   )->filter_json_single_key_object(
192     __remote_tied_hash__ => sub {
193       my %tied_hash;
194       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
195       return \%tied_hash;
196     }
197   )->filter_json_single_key_object(
198     __remote_tied_array__ => sub {
199       my @tied_array;
200       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
201       return \@tied_array;
202     }
203   ); 
204 }
205
206 sub _load_if_possible {
207   my ($class) = @_; 
208
209   use_module($class); 
210
211   if ($@) {
212     log_debug { "Attempt at loading '$class' failed with '$@'" };
213   }
214
215 }
216
217 BEGIN {
218   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
219   map _load_if_possible($_), qw(
220     Object::Remote::Connector::Local
221     Object::Remote::Connector::LocalSudo
222     Object::Remote::Connector::SSH
223     Object::Remote::Connector::UNIX
224   ); 
225 }
226
227 sub conn_from_spec {
228   my ($class, $spec, @args) = @_;
229   foreach my $poss (do { our @Guess }) {
230     if (my $conn = $poss->($spec, @args)) {
231       return $conn;
232     }
233   }
234   
235   return undef;
236 }
237
238 sub new_from_spec {
239   my ($class, $spec, @args) = @_;
240   return $spec if blessed $spec;
241   my $conn = $class->conn_from_spec($spec, @args);
242   
243   die "Couldn't figure out what to do with ${spec}"
244     unless defined $conn;
245     
246   return $conn->maybe::start::connect;  
247 }
248
249 sub remote_object {
250   my ($self, @args) = @_;
251   Object::Remote::Handle->new(
252     connection => $self, @args
253   )->proxy;
254 }
255
256 sub connect {
257   my ($self, $to) = @_;
258   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
259   return await_future(
260     $self->send_class_call(0, 'Object::Remote', connect => $to)
261   );
262 }
263
264 sub remote_sub {
265   my ($self, $sub) = @_;
266   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
267   Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;
268   return await_future($self->send_class_call(0, $pkg, can => $name));
269 }
270
271 sub send_class_call {
272   my ($self, $ctx, @call) = @_;
273   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
274   $self->send(call => class_call_handler => $ctx => call => @call);
275 }
276
277 sub register_class_call_handler {
278   my ($self) = @_;
279   $self->local_objects_by_id->{'class_call_handler'} ||= do {
280     my $o = $self->new_class_call_handler;
281     $self->_local_object_to_id($o);
282     $o;
283   };
284 }
285
286 sub new_class_call_handler {
287   Object::Remote::CodeContainer->new(
288     code => sub {
289       my ($class, $method) = (shift, shift);
290       use_module($class)->$method(@_);
291     }
292   );
293 }
294
295 sub register_remote {
296   my ($self, $remote) = @_;
297   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
298   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
299   return $remote;
300 }
301
302 sub send_free {
303   my ($self, $id) = @_;
304   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
305   #TODO this shows up some times when a remote side dies in the middle of a remote
306   #method invocation - possibly only when the object is being constructed?
307   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
308   delete $self->remote_objects_by_id->{$id};
309   $self->_send([ free => $id ]);
310 }
311
312 sub send {
313   my ($self, $type, @call) = @_;
314
315   my $future = CPS::Future->new;
316   my $remote = $self->remote_objects_by_id->{$call[0]};
317
318   unshift @call, $type => $self->_local_object_to_id($future);
319
320   my $outstanding = $self->outstanding_futures;
321   $outstanding->{$future} = $future;
322   $future->on_ready(sub {
323     undef($remote);
324     delete $outstanding->{$future}
325   });
326
327   $self->_send(\@call);
328
329   return $future;
330 }
331
332 sub send_discard {
333   my ($self, $type, @call) = @_;
334
335   unshift @call, $type => 'NULL';
336
337   $self->_send(\@call);
338 }
339
340 sub _send {
341   my ($self, $to_send) = @_;
342   my $fh = $self->send_to_fh;
343   
344   unless ($self->is_valid) {
345     croak "Attempt to invoke _send on a connection that is not valid";
346   }
347   
348   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
349   my $serialized = $self->_serialize($to_send)."\n";
350   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
351   my $ret; 
352   eval { 
353     #TODO this should be converted over to a non-blocking ::WriteChannel class
354     die "filehandle is not open" unless openhandle($fh);
355     log_trace { "file handle has passed openhandle() test; printing to it" };
356     $ret = print $fh $serialized;
357     die "print was not successful: $!" unless defined $ret
358   };
359     
360   if ($@) {
361     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
362     my $error = $@;
363     chomp($error);
364     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
365     return; 
366   }
367       
368   return $ret; 
369 }
370
371 sub _serialize {
372   my ($self, $data) = @_;
373   local our @New_Ids = (-1);
374   return eval {
375     my $flat = $self->_encode($self->_deobjectify($data));
376     $flat;
377   } || do {
378     my $err = $@; # won't get here if the eval doesn't die
379     # don't keep refs to new things
380     delete @{$self->local_objects_by_id}{@New_Ids};
381     die "Error serializing: $err";
382   };
383 }
384
385 sub _local_object_to_id {
386   my ($self, $object) = @_;
387   my $id = refaddr($object);
388   $self->local_objects_by_id->{$id} ||= do {
389     push our(@New_Ids), $id if @New_Ids;
390     $object;
391   };
392   return $id;
393 }
394
395 sub _deobjectify {
396   my ($self, $data) = @_;
397   if (blessed($data)) {
398     if (
399       $data->isa('Object::Remote::Proxy')
400       and $data->{remote}->connection == $self
401     ) {
402       return +{ __local_object__ => $data->{remote}->id };
403     } else {
404       return +{ __remote_object__ => $self->_local_object_to_id($data) };
405     }
406   } elsif (my $ref = ref($data)) {
407     if ($ref eq 'HASH') {
408       my $tied_to = tied(%$data);
409       if(defined($tied_to)) {
410         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; 
411       } else {
412         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
413       }
414     } elsif ($ref eq 'ARRAY') {
415       my $tied_to = tied(@$data);
416       if (defined($tied_to)) {
417         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; 
418       } else {
419         return [ map $self->_deobjectify($_), @$data ];
420       }
421     } elsif ($ref eq 'CODE') {
422       my $id = $self->_local_object_to_id(
423                  Object::Remote::CodeContainer->new(code => $data)
424                );
425       return +{ __remote_code__ => $id };
426     } elsif ($ref eq 'SCALAR') {
427       return +{ __scalar_ref__ => $$data };
428     } elsif ($ref eq 'GLOB') {
429       return +{ __glob_ref__ => $self->_local_object_to_id(
430         Object::Remote::GlobContainer->new(handle => $data)
431       ) };
432     } else {
433       die "Can't collapse reftype $ref";
434     }
435   }
436   return $data; # plain scalar
437 }
438
439 sub _receive {
440   my ($self, $flat) = @_;
441   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
442   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
443     or do { warn "Deserialize failed for ${flat}: $@"; return };
444   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
445   eval { $self->${\"receive_${type}"}(@rest); 1 }
446     or do { warn "Receive failed for ${flat}: $@"; return };
447   return;
448 }
449
450 sub receive_free {
451   my ($self, $id) = @_;
452   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
453   delete $self->local_objects_by_id->{$id}
454     or warn "Free: no such object $id";
455   return;
456 }
457
458 sub receive_call {
459   my ($self, $future_id, $id, @rest) = @_;
460   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
461   my $future = $self->_id_to_remote_object($future_id);
462   $future->{method} = 'call_discard_free';
463   my $local = $self->local_objects_by_id->{$id}
464     or do { $future->fail("No such object $id"); return };
465   $self->_invoke($future, $local, @rest);
466 }
467
468 sub receive_call_free {
469   my ($self, $future, $id, @rest) = @_;
470   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
471   $self->receive_call($future, $id, undef, @rest);
472   $self->receive_free($id);
473 }
474
475 sub _invoke {
476   my ($self, $future, $local, $ctx, $method, @args) = @_;
477   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
478   if ($method =~ /^start::/) {
479     my $f = $local->$method(@args);
480     $f->on_done(sub { undef($f); $future->done(@_) });
481     return unless $f;
482     $f->on_fail(sub { undef($f); $future->fail(@_) });
483     return;
484   }
485   my $do = sub { $local->$method(@args) };
486   eval {
487     $future->done(
488       defined($ctx)
489         ? ($ctx ? $do->() : scalar($do->()))
490         : do { $do->(); () }
491     );
492     1;
493   } or do { $future->fail($@); return; };
494   return;
495 }
496
497 1;
498
499 =head1 NAME
500
501 Object::Remote::Connection - An underlying connection for L<Object::Remote>
502
503   use Object::Remote;
504   
505   my %opts = (
506     nice => '10', ulimit => '-v 400000',
507     watchdog_timeout => 120, stderr => \*STDERR,
508   );
509   
510   my $local = Object::Remote->connect('-');
511   my $remote = Object::Remote->connect('myserver', nice => 5);
512   my $remote_user = Object::Remote->connect('user@myserver', %opts);
513   my $local_sudo = Object::Remote->connect('user@');
514   
515   #$remote can be any other connection object
516   my $hostname = Sys::Hostname->can::on($remote, 'hostname');
517   
518 =head1 DESCRIPTION
519
520 This is the class that supports connections to a Perl interpreter that is executed in a
521 different process. The new Perl interpreter can be either on the local or a remote machine
522 and is configurable via arguments passed to the constructor.
523
524 =head1 ARGUMENTS
525
526 =over 4
527
528 =item nice
529
530 If this value is defined then it will be used as the nice value of the Perl process when it
531 is started. The default is the undefined value and will not nice the process.
532
533 =item stderr
534
535 If this value is defined then it will be used as the file handle that receives the output
536 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
537 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
538 directly to STDERR of the local process with out the run loop managing I/O. The default value
539 is undefined.
540
541 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
542 across all of the Perl interpreters including the local one. For small scale and quick operation
543 this offers a predictable and easy to use way to get at error messages generated anywhere. If
544 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
545 and it is possible to still receive output from them. This is generally a good thing but can
546 cause issues.
547
548 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
549 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
550 that it is no longer possible to receive error output from the remote interpreters and that the
551 shell will start to kill off the child processes. Passing a reference to STDERR for the local
552 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
553 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
554 start killing children when the local interpreter exits.
555
556 It is also possible to pass in a file handle that has been opened for writing. This would be
557 useful for logging the output of the remote interpreter directly into a dedicated file.
558
559 =item ulimit
560
561 If this string is defined then it will be passed unmodified as the arguments to ulimit when
562 the Perl process is started. The default value is the undefined value and will not limit the
563 process in any way.
564
565 =item watchdog_timeout
566
567 If this value is defined then it will be used as the number of seconds the watchdog will wait
568 for an update before it terminates the Perl interpreter process. The default value is undefined
569 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
570
571 =back
572
573 =head1 SEE ALSO
574
575 =over 4
576
577 =item C<Object::Remote>
578
579 =back
580
581 =cut