move PIPE signal handler to miniloop and update timeout param in t/timeout.t to match...
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
1 package Object::Remote::Connection;
2
3 use Object::Remote::Logging qw (:log :dlog router);
4 use Object::Remote::Future;
5 use Object::Remote::Null;
6 use Object::Remote::Handle;
7 use Object::Remote::CodeContainer;
8 use Object::Remote::GlobProxy;
9 use Object::Remote::GlobContainer;
10 use Object::Remote::Tied;
11 use Object::Remote;
12 use Symbol;
13 use IO::Handle;
14 use POSIX ":sys_wait_h";
15 use Module::Runtime qw(use_module);
16 use Scalar::Util qw(weaken blessed refaddr openhandle);
17 use JSON::PP qw(encode_json);
18 use Moo;
19 use Carp qw(croak);
20
21 BEGIN { router()->exclude_forwarding }
22
23 END {
24   log_debug { "Killing all child processes in the process group" };
25     
26   #send SIGINT to the process group for our children
27   kill(1, -2);
28 }
29
30 has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
31
32 has send_to_fh => (
33   is => 'ro', required => 1,
34   trigger => sub {
35       my $self = $_[0];
36       $_[1]->autoflush(1);
37       Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
38   },
39 );
40
41 has read_channel => (
42   is => 'ro', required => 1,
43   trigger => sub {
44     my ($self, $ch) = @_;
45     my $id = $self->_id; 
46     Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
47     weaken($self);
48     $ch->on_line_call(sub { $self->_receive(@_) });
49     $ch->on_close_call(sub { 
50       log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
51       $self->on_close->done(@_);
52     });
53   },
54 );
55
56 has on_close => (
57   is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
58   trigger => sub {
59       log_trace { "Installing handlers into future via trigger" };
60       $_[0]->_install_future_handlers($_[1])
61   },
62 );
63
64 has child_pid => (is => 'ro');
65
66 has local_objects_by_id => (
67   is => 'ro', default => sub { {} },
68   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
69 );
70
71 has remote_objects_by_id => (
72   is => 'ro', default => sub { {} },
73   coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
74 );
75
76 has outstanding_futures => (is => 'ro', default => sub { {} });
77
78 has _json => (
79   is => 'lazy',
80   handles => {
81     _deserialize => 'decode',
82     _encode => 'encode',
83   },
84 );
85
86 after BUILD => sub {
87   my ($self) = @_;
88   my $pid = $self->child_pid;
89   
90   unless (defined $pid) {
91       log_trace { "After BUILD invoked for connection but there was no pid" };
92       return;
93   }
94     
95   log_trace { "Setting process group of child process '$pid'" };
96   
97   setpgrp($self->child_pid, 1);
98 };
99
100 sub BUILD { }
101
102 sub is_valid {
103   my ($self) = @_;
104   my $closed = $self->on_close->is_ready;
105   
106   log_trace { "Connection closed: $closed" };
107   return ! $closed;
108 }
109
110 sub _fail_outstanding {
111   my ($self, $error) = @_;
112   my $outstanding = $self->outstanding_futures;
113   
114   Dlog_debug { 
115     sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
116   };
117
118   foreach(keys(%$outstanding)) {
119     log_trace { "Failing future for $_" };
120     my $future = $outstanding->{$_};
121     $future->fail("$error\n");
122   }
123
124   %$outstanding = ();
125   return;
126 }
127
128 sub _install_future_handlers {
129     my ($self, $f) = @_;
130     Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
131     weaken($self);
132     $f->on_done(sub {
133       my $pid = $self->child_pid;
134       Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
135       $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
136       return unless defined $pid;
137       log_debug { "Waiting for child '$pid' to exit" };
138       my $ret = waitpid($pid, 0);
139       if ($ret != $pid) {
140         log_debug { "Waited for pid $pid but waitpid() returned $ret" };
141         return;
142       } elsif ($? & 127) {
143           log_warn { "Remote interpreter did not exit cleanly" };
144       } else {
145         log_verbose {
146           my $exit_value = $? >> 8;
147           "Remote Perl interpreter exited with value '$exit_value'"
148         };
149       }
150     });
151     return $f; 
152 };
153
154 sub _id_to_remote_object {
155   my ($self, $id) = @_;
156   Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
157   return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
158   (
159     $self->remote_objects_by_id->{$id}
160     or Object::Remote::Handle->new(connection => $self, id => $id)
161   )->proxy;
162 }
163
164 sub _build__json {
165   weaken(my $self = shift);
166   JSON::PP->new->filter_json_single_key_object(
167     __remote_object__ => sub {
168       $self->_id_to_remote_object(@_);
169     }
170   )->filter_json_single_key_object(
171     __remote_code__ => sub {
172       my $code_container = $self->_id_to_remote_object(@_);
173       sub { $code_container->call(@_) };
174     }
175   )->filter_json_single_key_object(
176     __scalar_ref__ => sub {
177       my $value = shift;
178       return \$value;
179     }
180   )->filter_json_single_key_object(
181     __glob_ref__ => sub {
182       my $glob_container = $self->_id_to_remote_object(@_);
183       my $handle = Symbol::gensym;
184       tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
185       return $handle;
186     }
187   )->filter_json_single_key_object(
188     __local_object__ => sub {
189       $self->local_objects_by_id->{$_[0]}
190     }
191   )->filter_json_single_key_object(
192     __remote_tied_hash__ => sub {
193       my %tied_hash;
194       tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
195       return \%tied_hash;
196     }
197   )->filter_json_single_key_object(
198     __remote_tied_array__ => sub {
199       my @tied_array;
200       tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
201       return \@tied_array;
202     }
203   ); 
204 }
205
206 sub _load_if_possible {
207   my ($class) = @_; 
208
209   use_module($class); 
210
211   if ($@) {
212     log_debug { "Attempt at loading '$class' failed with '$@'" };
213   }
214
215 }
216
217 BEGIN {
218   unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
219   map _load_if_possible($_), qw(
220     Object::Remote::Connector::Local
221     Object::Remote::Connector::LocalSudo
222     Object::Remote::Connector::SSH
223     Object::Remote::Connector::UNIX
224   ); 
225 }
226
227 sub conn_from_spec {
228   my ($class, $spec, @args) = @_;
229   foreach my $poss (do { our @Guess }) {
230     if (my $conn = $poss->($spec, @args)) {
231       return $conn;
232     }
233   }
234   
235   return undef;
236 }
237
238 sub new_from_spec {
239   my ($class, $spec) = @_;
240   return $spec if blessed $spec;
241   my $conn = $class->conn_from_spec($spec); 
242   
243   die "Couldn't figure out what to do with ${spec}"
244     unless defined $conn;
245     
246   return $conn->maybe::start::connect;  
247 }
248
249 sub remote_object {
250   my ($self, @args) = @_;
251   Object::Remote::Handle->new(
252     connection => $self, @args
253   )->proxy;
254 }
255
256 sub connect {
257   my ($self, $to) = @_;
258   Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
259   return await_future(
260     $self->send_class_call(0, 'Object::Remote', connect => $to)
261   );
262 }
263
264 sub remote_sub {
265   my ($self, $sub) = @_;
266   my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
267   Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;
268   return await_future($self->send_class_call(0, $pkg, can => $name));
269 }
270
271 sub send_class_call {
272   my ($self, $ctx, @call) = @_;
273   Dlog_trace { "Sending a class call for connection $_" } $self->_id;
274   $self->send(call => class_call_handler => $ctx => call => @call);
275 }
276
277 sub register_class_call_handler {
278   my ($self) = @_;
279   $self->local_objects_by_id->{'class_call_handler'} ||= do {
280     my $o = $self->new_class_call_handler;
281     $self->_local_object_to_id($o);
282     $o;
283   };
284 }
285
286 sub new_class_call_handler {
287   Object::Remote::CodeContainer->new(
288     code => sub {
289       my ($class, $method) = (shift, shift);
290       use_module($class)->$method(@_);
291     }
292   );
293 }
294
295 sub register_remote {
296   my ($self, $remote) = @_;
297   Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
298   weaken($self->remote_objects_by_id->{$remote->id} = $remote);
299   return $remote;
300 }
301
302 sub send_free {
303   my ($self, $id) = @_;
304   Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
305   #TODO this shows up some times when a remote side dies in the middle of a remote
306   #method invocation - possibly only when the object is being constructed?
307   #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
308   delete $self->remote_objects_by_id->{$id};
309   $self->_send([ free => $id ]);
310 }
311
312 sub send {
313   my ($self, $type, @call) = @_;
314
315   my $future = CPS::Future->new;
316   my $remote = $self->remote_objects_by_id->{$call[0]};
317
318   unshift @call, $type => $self->_local_object_to_id($future);
319
320   my $outstanding = $self->outstanding_futures;
321   $outstanding->{$future} = $future;
322   $future->on_ready(sub {
323     undef($remote);
324     delete $outstanding->{$future}
325   });
326
327   $self->_send(\@call);
328
329   return $future;
330 }
331
332 sub send_discard {
333   my ($self, $type, @call) = @_;
334
335   unshift @call, $type => 'NULL';
336
337   $self->_send(\@call);
338 }
339
340 sub _send {
341   my ($self, $to_send) = @_;
342   my $fh = $self->send_to_fh;
343   
344   unless ($self->is_valid) {
345     croak "Attempt to invoke _send on a connection that is not valid";
346   }
347   
348   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
349   my $serialized = $self->_serialize($to_send)."\n";
350   Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
351   my $ret; 
352   eval { 
353     #TODO this should be converted over to a non-blocking ::WriteChannel class
354     die "filehandle is not open" unless openhandle($fh);
355     log_trace { "file handle has passed openhandle() test; printing to it" };
356     $ret = print $fh $serialized;
357     die "print was not successful: $!" unless defined $ret
358   };
359     
360   if ($@) {
361     Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
362     my $error = $@;
363     chomp($error);
364     $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
365     return; 
366   }
367       
368   return $ret; 
369 }
370
371 sub _serialize {
372   my ($self, $data) = @_;
373   local our @New_Ids = (-1);
374   return eval {
375     my $flat = $self->_encode($self->_deobjectify($data));
376     $flat;
377   } || do {
378     my $err = $@; # won't get here if the eval doesn't die
379     # don't keep refs to new things
380     delete @{$self->local_objects_by_id}{@New_Ids};
381     die "Error serializing: $err";
382   };
383 }
384
385 sub _local_object_to_id {
386   my ($self, $object) = @_;
387   my $id = refaddr($object);
388   $self->local_objects_by_id->{$id} ||= do {
389     push our(@New_Ids), $id if @New_Ids;
390     $object;
391   };
392   return $id;
393 }
394
395 sub _deobjectify {
396   my ($self, $data) = @_;
397   if (blessed($data)) {
398     if (
399       $data->isa('Object::Remote::Proxy')
400       and $data->{remote}->connection == $self
401     ) {
402       return +{ __local_object__ => $data->{remote}->id };
403     } else {
404       return +{ __remote_object__ => $self->_local_object_to_id($data) };
405     }
406   } elsif (my $ref = ref($data)) {
407     if ($ref eq 'HASH') {
408       my $tied_to = tied(%$data);
409       if(defined($tied_to)) {
410         return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; 
411       } else {
412         return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
413       }
414     } elsif ($ref eq 'ARRAY') {
415       my $tied_to = tied(@$data);
416       if (defined($tied_to)) {
417         return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; 
418       } else {
419         return [ map $self->_deobjectify($_), @$data ];
420       }
421     } elsif ($ref eq 'CODE') {
422       my $id = $self->_local_object_to_id(
423                  Object::Remote::CodeContainer->new(code => $data)
424                );
425       return +{ __remote_code__ => $id };
426     } elsif ($ref eq 'SCALAR') {
427       return +{ __scalar_ref__ => $$data };
428     } elsif ($ref eq 'GLOB') {
429       return +{ __glob_ref__ => $self->_local_object_to_id(
430         Object::Remote::GlobContainer->new(handle => $data)
431       ) };
432     } else {
433       die "Can't collapse reftype $ref";
434     }
435   }
436   return $data; # plain scalar
437 }
438
439 sub _receive {
440   my ($self, $flat) = @_;
441   Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
442   my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
443     or do { warn "Deserialize failed for ${flat}: $@"; return };
444   Dlog_trace { "deserialization complete for connection $_" } $self->_id;
445   eval { $self->${\"receive_${type}"}(@rest); 1 }
446     or do { warn "Receive failed for ${flat}: $@"; return };
447   return;
448 }
449
450 sub receive_free {
451   my ($self, $id) = @_;
452   Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
453   delete $self->local_objects_by_id->{$id}
454     or warn "Free: no such object $id";
455   return;
456 }
457
458 sub receive_call {
459   my ($self, $future_id, $id, @rest) = @_;
460   Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
461   my $future = $self->_id_to_remote_object($future_id);
462   $future->{method} = 'call_discard_free';
463   my $local = $self->local_objects_by_id->{$id}
464     or do { $future->fail("No such object $id"); return };
465   $self->_invoke($future, $local, @rest);
466 }
467
468 sub receive_call_free {
469   my ($self, $future, $id, @rest) = @_;
470   Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
471   $self->receive_call($future, $id, undef, @rest);
472   $self->receive_free($id);
473 }
474
475 sub _invoke {
476   my ($self, $future, $local, $ctx, $method, @args) = @_;
477   Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
478   if ($method =~ /^start::/) {
479     my $f = $local->$method(@args);
480     $f->on_done(sub { undef($f); $future->done(@_) });
481     return unless $f;
482     $f->on_fail(sub { undef($f); $future->fail(@_) });
483     return;
484   }
485   my $do = sub { $local->$method(@args) };
486   eval {
487     $future->done(
488       defined($ctx)
489         ? ($ctx ? $do->() : scalar($do->()))
490         : do { $do->(); () }
491     );
492     1;
493   } or do { $future->fail($@); return; };
494   return;
495 }
496
497 1;
498
499 =head1 NAME
500
501 Object::Remote::Connection - An underlying connection for L<Object::Remote>
502
503 =head1 LAME
504
505 Shipping prioritised over writing this part up. Blame mst.
506
507 =cut