cleanup trailing whitespace ugliness
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Moo;
19 use Carp qw(croak);
20
21 BEGIN { router()->exclude_forwarding }
22
23 END {
24   our %child_pids;
25
26   log_trace { "END handler is being invoked in " . __PACKAGE__ };
27
28   foreach(keys(%child_pids)) {
29     log_debug { "Killing child process '$_'" };
30     kill('TERM', $_);
31   }
32 }
33
34 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
35
36 has send_to_fh => (
37   is => 'ro', required => 1,
38   trigger => sub {
39     my $self = $_[0];
40     $_[1]->autoflush(1);
41     Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
42   },
43 );
44
45 has read_channel => (
46   is => 'ro', required => 1,
47   trigger => sub {
48     my ($self, $ch) = @_;
49     my $id = $self->_id;
50     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
51     weaken($self);
52     $ch->on_line_call(sub { $self->_receive(@_) });
53     $ch->on_close_call(sub {
54       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
55       $self->on_close->done(@_);
56     });
57   },
58 );
59
60 has on_close => (
61   is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
62   trigger => sub {
63     log_trace { "Installing handlers into future via trigger" };
64     $_[0]->_install_future_handlers($_[1])
65   },
66 );
67
68 has child_pid => (is => 'ro');
69
70 has local_objects_by_id => (
71   is => 'ro', default => sub { {} },
72   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
73 );
74
75 has remote_objects_by_id => (
76   is => 'ro', default => sub { {} },
77   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
78 );
79
80 has outstanding_futures => (is => 'ro', default => sub { {} });
81
82 has _json => (
83   is => 'lazy',
84   handles => {
85     _deserialize => 'decode',
86     _encode => 'encode',
87   },
88 );
89
90 after BUILD => sub {
91   my ($self) = @_;
92   my $pid = $self->child_pid;
93   our %child_pids;
94   return unless defined $pid;
95   $child_pids{$pid} = 1;
96   return;
97 };
98
99 sub BUILD { }
100
101 sub is_valid {
102   my ($self) = @_;
103   my $valid = ! $self->on_close->is_ready;
104
105   log_trace {
106     my $id = $self->_id;
107     my $text;
108     if ($valid) {
109       $text = 'yes';
110     } else {
111       $text = 'no';
112     }
113     "Connection '$id' is valid: '$text'"
114   };
115
116   return $valid;
117 }
118
119 sub _fail_outstanding {
120   my ($self, $error) = @_;
121   my $outstanding = $self->outstanding_futures;
122
123   Dlog_debug {
124     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
125   };
126
127   foreach(keys(%$outstanding)) {
128     log_trace { "Failing future for $_" };
129     my $future = $outstanding->{$_};
130     $future->fail("$error\n");
131   }
132
133   %$outstanding = ();
134   return;
135 }
136
137 sub _install_future_handlers {
138     my ($self, $f) = @_;
139     our %child_pids;
140     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
141     weaken($self);
142     $f->on_done(sub {
143       my $pid = $self->child_pid;
144       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
145       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
146       return unless defined $pid;
147       log_debug { "Waiting for child '$pid' to exit" };
148       my $ret = waitpid($pid, 0);
149       if ($ret != $pid) {
150         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
151         return;
152       } elsif ($? & 127) {
153           log_warn { "Remote interpreter did not exit cleanly" };
154       } else {
155         log_verbose {
156           my $exit_value = $? >> 8;
157           "Remote Perl interpreter exited with value '$exit_value'"
158         };
159       }
160
161       delete $child_pids{$pid};
162     });
163     return $f;
164 };
165
166 sub _id_to_remote_object {
167   my ($self, $id) = @_;
168   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
169   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
170   (
171     $self->remote_objects_by_id->{$id}
172     or Object::Remote::Handle->new(connection => $self, id => $id)
173   )->proxy;
174 }
175
176 sub _build__json {
177   weaken(my $self = shift);
178   JSON::PP->new->filter_json_single_key_object(
179     __remote_object__ => sub {
180       $self->_id_to_remote_object(@_);
181     }
182   )->filter_json_single_key_object(
183     __remote_code__ => sub {
184       my $code_container = $self->_id_to_remote_object(@_);
185       sub { $code_container->call(@_) };
186     }
187   )->filter_json_single_key_object(
188     __scalar_ref__ => sub {
189       my $value = shift;
190       return \$value;
191     }
192   )->filter_json_single_key_object(
193     __glob_ref__ => sub {
194       my $glob_container = $self->_id_to_remote_object(@_);
195       my $handle = Symbol::gensym;
196       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
197       return $handle;
198     }
199   )->filter_json_single_key_object(
200     __local_object__ => sub {
201       $self->local_objects_by_id->{$_[0]}
202     }
203   )->filter_json_single_key_object(
204     __remote_tied_hash__ => sub {
205       my %tied_hash;
206       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
207       return \%tied_hash;
208     }
209   )->filter_json_single_key_object(
210     __remote_tied_array__ => sub {
211       my @tied_array;
212       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
213       return \@tied_array;
214     }
215   ); 
216 }
217
218 sub _load_if_possible {
219   my ($class) = @_;
220
221   use_module($class);
222
223   if ($@) {
224     log_debug { "Attempt at loading '$class' failed with '$@'" };
225   }
226
227 }
228
229 BEGIN {
230   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
231   map _load_if_possible($_), qw(
232     Object::Remote::Connector::Local
233     Object::Remote::Connector::LocalSudo
234     Object::Remote::Connector::SSH
235     Object::Remote::Connector::UNIX
236   );
237 }
238
239 sub conn_from_spec {
240   my ($class, $spec, @args) = @_;
241   foreach my $poss (do { our @Guess }) {
242     if (my $conn = $poss->($spec, @args)) {
243       return $conn;
244     }
245   }
246
247   return undef;
248 }
249
250 sub new_from_spec {
251   my ($class, $spec, @args) = @_;
252   return $spec if blessed $spec;
253   my $conn = $class->conn_from_spec($spec, @args);
254
255   die "Couldn't figure out what to do with ${spec}"
256     unless defined $conn;
257
258   return $conn->maybe::start::connect;
259 }
260
261 sub remote_object {
262   my ($self, @args) = @_;
263   Object::Remote::Handle->new(
264     connection => $self, @args
265   )->proxy;
266 }
267
268 sub connect {
269   my ($self, $to) = @_;
270   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
271   return await_future(
272     $self->send_class_call(0, 'Object::Remote', connect => $to)
273   );
274 }
275
276 sub remote_sub {
277   my ($self, $sub) = @_;
278   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
279   Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
280   return await_future($self->send_class_call(0, $pkg, can => $name));
281 }
282
283 sub send_class_call {
284   my ($self, $ctx, @call) = @_;
285   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
286   $self->send(call => class_call_handler => $ctx => call => @call);
287 }
288
289 sub register_class_call_handler {
290   my ($self) = @_;
291   $self->local_objects_by_id->{'class_call_handler'} ||= do {
292     my $o = $self->new_class_call_handler;
293     $self->_local_object_to_id($o);
294     $o;
295   };
296 }
297
298 sub new_class_call_handler {
299   Object::Remote::CodeContainer->new(
300     code => sub {
301       my ($class, $method) = (shift, shift);
302       use_module($class)->$method(@_);
303     }
304   );
305 }
306
307 sub register_remote {
308   my ($self, $remote) = @_;
309   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
310   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
311   return $remote;
312 }
313
314 sub send_free {
315   my ($self, $id) = @_;
316   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
317   #TODO this shows up some times when a remote side dies in the middle of a remote
318   #method invocation - possibly only when the object is being constructed?
319   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
320   delete $self->remote_objects_by_id->{$id};
321   $self->_send([ free => $id ]);
322 }
323
324 sub send {
325   my ($self, $type, @call) = @_;
326
327   my $future = CPS::Future->new;
328   my $remote = $self->remote_objects_by_id->{$call[0]};
329
330   unshift @call, $type => $self->_local_object_to_id($future);
331
332   my $outstanding = $self->outstanding_futures;
333   $outstanding->{$future} = $future;
334   $future->on_ready(sub {
335     undef($remote);
336     delete $outstanding->{$future}
337   });
338
339   $self->_send(\@call);
340
341   return $future;
342 }
343
344 sub send_discard {
345   my ($self, $type, @call) = @_;
346
347   unshift @call, $type => 'NULL';
348
349   $self->_send(\@call);
350 }
351
352 sub _send {
353   my ($self, $to_send) = @_;
354   my $fh = $self->send_to_fh;
355
356   unless ($self->is_valid) {
357     croak "Attempt to invoke _send on a connection that is not valid";
358   }
359
360   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
361   my $serialized = $self->_serialize($to_send)."\n";
362   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
363   my $ret;
364   eval {
365     #TODO this should be converted over to a non-blocking ::WriteChannel class
366     die "filehandle is not open" unless openhandle($fh);
367     log_trace { "file handle has passed openhandle() test; printing to it" };
368     $ret = print $fh $serialized;
369     die "print was not successful: $!" unless defined $ret
370   };
371
372   if ($@) {
373     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
374     my $error = $@;
375     chomp($error);
376     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
377     return;
378   }
379
380   return $ret;
381 }
382
383 sub _serialize {
384   my ($self, $data) = @_;
385   local our @New_Ids = (-1);
386   return eval {
387     my $flat = $self->_encode($self->_deobjectify($data));
388     $flat;
389   } || do {
390     my $err = $@; # won't get here if the eval doesn't die
391     # don't keep refs to new things
392     delete @{$self->local_objects_by_id}{@New_Ids};
393     die "Error serializing: $err";
394   };
395 }
396
397 sub _local_object_to_id {
398   my ($self, $object) = @_;
399   my $id = refaddr($object);
400   $self->local_objects_by_id->{$id} ||= do {
401     push our(@New_Ids), $id if @New_Ids;
402     $object;
403   };
404   return $id;
405 }
406
407 sub _deobjectify {
408   my ($self, $data) = @_;
409   if (blessed($data)) {
410     if (
411       $data->isa('Object::Remote::Proxy')
412       and $data->{remote}->connection == $self
413     ) {
414       return +{ __local_object__ => $data->{remote}->id };
415     } else {
416       return +{ __remote_object__ => $self->_local_object_to_id($data) };
417     }
418   } elsif (my $ref = ref($data)) {
419     if ($ref eq 'HASH') {
420       my $tied_to = tied(%$data);
421       if(defined($tied_to)) {
422         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
423       } else {
424         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
425       }
426     } elsif ($ref eq 'ARRAY') {
427       my $tied_to = tied(@$data);
428       if (defined($tied_to)) {
429         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
430       } else {
431         return [ map $self->_deobjectify($_), @$data ];
432       }
433     } elsif ($ref eq 'CODE') {
434       my $id = $self->_local_object_to_id(
435                  Object::Remote::CodeContainer->new(code => $data)
436                );
437       return +{ __remote_code__ => $id };
438     } elsif ($ref eq 'SCALAR') {
439       return +{ __scalar_ref__ => $$data };
440     } elsif ($ref eq 'GLOB') {
441       return +{ __glob_ref__ => $self->_local_object_to_id(
442         Object::Remote::GlobContainer->new(handle => $data)
443       ) };
444     } else {
445       die "Can't collapse reftype $ref";
446     }
447   }
448   return $data; # plain scalar
449 }
450
451 sub _receive {
452   my ($self, $flat) = @_;
453   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
454   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
455     or do { warn "Deserialize failed for ${flat}: $@"; return };
456   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
457   eval { $self->${\"receive_${type}"}(@rest); 1 }
458     or do { warn "Receive failed for ${flat}: $@"; return };
459   return;
460 }
461
462 sub receive_free {
463   my ($self, $id) = @_;
464   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
465   delete $self->local_objects_by_id->{$id}
466     or warn "Free: no such object $id";
467   return;
468 }
469
470 sub receive_call {
471   my ($self, $future_id, $id, @rest) = @_;
472   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
473   my $future = $self->_id_to_remote_object($future_id);
474   $future->{method} = 'call_discard_free';
475   my $local = $self->local_objects_by_id->{$id}
476     or do { $future->fail("No such object $id"); return };
477   $self->_invoke($future, $local, @rest);
478 }
479
480 sub receive_call_free {
481   my ($self, $future, $id, @rest) = @_;
482   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
483   $self->receive_call($future, $id, undef, @rest);
484   $self->receive_free($id);
485 }
486
487 sub _invoke {
488   my ($self, $future, $local, $ctx, $method, @args) = @_;
489   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
490   if ($method =~ /^start::/) {
491     my $f = $local->$method(@args);
492     $f->on_done(sub { undef($f); $future->done(@_) });
493     return unless $f;
494     $f->on_fail(sub { undef($f); $future->fail(@_) });
495     return;
496   }
497   my $do = sub { $local->$method(@args) };
498   eval {
499     $future->done(
500       defined($ctx)
501         ? ($ctx ? $do->() : scalar($do->()))
502         : do { $do->(); () }
503     );
504     1;
505   } or do { $future->fail($@); return; };
506   return;
507 }
508
509 1;
510
511 =head1 NAME
512
513 Object::Remote::Connection - An underlying connection for L<Object::Remote>
514
515   use Object::Remote;
516
517   my %opts = (
518     nice => '10', ulimit => '-v 400000',
519     watchdog_timeout => 120, stderr => \*STDERR,
520   );
521
522   my $local = Object::Remote->connect('-');
523   my $remote = Object::Remote->connect('myserver', nice => 5);
524   my $remote_user = Object::Remote->connect('user@myserver', %opts);
525   my $local_sudo = Object::Remote->connect('user@');
526
527   #$remote can be any other connection object
528   my $hostname = Sys::Hostname->can::on($remote, 'hostname');
529
530 =head1 DESCRIPTION
531
532 This is the class that supports connections to a Perl interpreter that is executed in a
533 different process. The new Perl interpreter can be either on the local or a remote machine
534 and is configurable via arguments passed to the constructor.
535
536 =head1 ARGUMENTS
537
538 =over 4
539
540 =item nice
541
542 If this value is defined then it will be used as the nice value of the Perl process when it
543 is started. The default is the undefined value and will not nice the process.
544
545 =item stderr
546
547 If this value is defined then it will be used as the file handle that receives the output
548 of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
549 non-blocking way. If the value is undefined then STDERR of the remote process will be connected
550 directly to STDERR of the local process with out the run loop managing I/O. The default value
551 is undefined.
552
553 There are a few ways to use this feature. By default the behavior is to form one unified STDERR
554 across all of the Perl interpreters including the local one. For small scale and quick operation
555 this offers a predictable and easy to use way to get at error messages generated anywhere. If
556 the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
557 and it is possible to still receive output from them. This is generally a good thing but can
558 cause issues.
559
560 When using a file handle as the output for STDERR once the local Perl interpreter is no longer
561 running there is no longer a valid STDERR for the remote interpreters to send data to. This means
562 that it is no longer possible to receive error output from the remote interpreters and that the
563 shell will start to kill off the child processes. Passing a reference to STDERR for the local
564 interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
565 all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
566 start killing children when the local interpreter exits.
567
568 It is also possible to pass in a file handle that has been opened for writing. This would be
569 useful for logging the output of the remote interpreter directly into a dedicated file.
570
571 =item ulimit
572
573 If this string is defined then it will be passed unmodified as the arguments to ulimit when
574 the Perl process is started. The default value is the undefined value and will not limit the
575 process in any way.
576
577 =item watchdog_timeout
578
579 If this value is defined then it will be used as the number of seconds the watchdog will wait
580 for an update before it terminates the Perl interpreter process. The default value is undefined
581 and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
582
583 =back
584
585 =head1 SEE ALSO
586
587 =over 4
588
589 =item C<Object::Remote>
590
591 =back
592
593 =cut