make notes of things to fix before release
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
CommitLineData
9e72f0cf 1package Object::Remote::Connection;
2
f4a85080 3use Object::Remote::Logging qw (:log :dlog router);
dc28afe8 4use Object::Remote::Future;
9d804009 5use Object::Remote::Null;
676438a1 6use Object::Remote::Handle;
fe6c9a7f 7use Object::Remote::CodeContainer;
ed5a8a8e 8use Object::Remote::GlobProxy;
9use Object::Remote::GlobContainer;
7790ca36 10use Object::Remote::Tied;
9e72f0cf 11use Object::Remote;
ed5a8a8e 12use Symbol;
9e72f0cf 13use IO::Handle;
c824fdf3 14use POSIX ":sys_wait_h";
9e72f0cf 15use Module::Runtime qw(use_module);
f8080c1c 16use Scalar::Util qw(weaken blessed refaddr openhandle);
9e72f0cf 17use JSON::PP qw(encode_json);
18use Moo;
5add5e29 19use Carp qw(croak);
9e72f0cf 20
# At compile time, call exclude_forwarding on the logging router.
# NOTE(review): presumably this keeps this package's log messages from being
# forwarded over the connection itself - confirm in Object::Remote::Logging.
BEGIN {
  router()->exclude_forwarding;
}
c824fdf3 22
# Interpreter-exit hook that signals the child processes.
END {
  log_debug { "Killing all child processes in the process group" };

  # FIXME: update along with setpgrp() to not use a process group anymore.

  # NOTE(review): kill() takes (SIGNAL, LIST) and a negative process number
  # addresses a process group, so this call sends signal number 1 to process
  # group 2. The original comment described it as "send SIGINT to the process
  # group for our children" - confirm which signal and group were intended.
  kill 1, -2;
}
32
# Identifier of this connection. When not supplied, it is taken from the
# package counter $NEXT_CONNECTION_ID, which is post-incremented on each use.
has _id => (
  is       => 'ro',
  required => 1,
  default  => sub { our $NEXT_CONNECTION_ID++ },
);
ad4f54b2 34
# File handle that _send() prints serialized messages to. The trigger turns
# autoflush on for the handle whenever the attribute is set.
has send_to_fh => (
  is       => 'ro',
  required => 1,
  trigger  => sub {
    my ($self, $fh) = @_;
    $fh->autoflush(1);
    Dlog_trace {
      my $id = $self->_id;
      "connection had send_to_fh set to $_"
    } $fh;
  },
);
43
# Channel that incoming lines are read from. The trigger wires the channel's
# line and close events to this connection.
has read_channel => (
  is       => 'ro',
  required => 1,
  trigger  => sub {
    my ($self, $channel) = @_;
    my $id = $self->_id;
    Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $channel->fh;

    # The callbacks below close over $self; weaken it so they do not keep
    # the connection alive on their own.
    weaken($self);

    # Each received line is handed to _receive() for decoding and dispatch.
    my $on_line = sub { $self->_receive(@_) };

    # Closing the channel completes the on_close future with the same args.
    my $on_close = sub {
      log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
      $self->on_close->done(@_);
    };

    $channel->on_line_call($on_line);
    $channel->on_close_call($on_close);
  },
);
58
# Future that is completed when the connection closes. Both the default and
# any future assigned later are passed through _install_future_handlers().
has on_close => (
  is      => 'rw',
  default => sub {
    my ($self) = @_;
    return $self->_install_future_handlers(CPS::Future->new);
  },
  trigger => sub {
    my ($self, $future) = @_;
    log_trace { "Installing handlers into future via trigger" };
    $self->_install_future_handlers($future);
  },
);
ad4f54b2 66
# Optional process id of the child process behind this connection.
has child_pid => (is => 'ro');

# Objects on this side that have been handed an id, keyed by that id.
# A hash passed to the constructor is shallow-cloned on the way in.
has local_objects_by_id => (
  is      => 'ro',
  default => sub { return {} },
  coerce  => sub { return +{ %{ $_[0] } } },
);

# Handles registered through register_remote(), keyed by their id.
# A hash passed to the constructor is shallow-cloned on the way in.
has remote_objects_by_id => (
  is      => 'ro',
  default => sub { return {} },
  coerce  => sub { return +{ %{ $_[0] } } },
);

# Futures created by send() that are not ready yet, keyed by the stringified
# future.
has outstanding_futures => (
  is      => 'ro',
  default => sub { return {} },
);
80
# Lazily built JSON codec (see _build__json). Its decode and encode methods
# are exposed on the connection as _deserialize and _encode.
has _json => (
  is      => 'lazy',
  handles => {
    _deserialize => 'decode',
    _encode      => 'encode',
  },
);
88
# Runs after object construction: when the connection has a child process,
# change that process's process group.
after BUILD => sub {
  my ($self) = @_;
  my $pid = $self->child_pid;

  if (!defined $pid) {
    log_trace { "After BUILD invoked for connection but there was no pid" };
    return;
  }

  log_trace { "Setting process group of child process '$pid'" };

  # FIXME: moving things into a process group has side effects for users of
  # the library - move to a list.
  setpgrp($pid, 1);
};

# Empty on purpose; it gives the "after BUILD" modifier above a method to
# wrap.
sub BUILD {
}
106
# Returns true while the on_close future has not become ready, i.e. while
# the connection is still considered usable.
sub is_valid {
  my $self = shift;

  my $closed = $self->on_close->is_ready;
  log_trace { "Connection closed: $closed" };

  return !$closed;
}
114
# Fail every future that is still waiting for a reply, then forget them.
#
# Arguments: $error - text describing why the futures are being failed; a
#            newline is appended before it is passed to each future's fail().
# Returns:   nothing.
sub _fail_outstanding {
  my ($self, $error) = @_;
  my $outstanding = $self->outstanding_futures;

  # $error is passed as a sprintf argument rather than interpolated into the
  # format, so a '%' inside the error text is not treated as a conversion.
  Dlog_debug {
    sprintf "Failing %i outstanding futures with '%s'",
      scalar(keys(%$outstanding)), $error
  };

  # keys() yields a snapshot, so callbacks that remove entries while a
  # future is being failed do not disturb the iteration.
  foreach my $key (keys(%$outstanding)) {
    log_trace { "Failing future for $key" };
    my $future = $outstanding->{$key};
    $future->fail("$error\n");
  }

  %$outstanding = ();
  return;
}
132
# Attach the connection's shutdown handling to the future $f and return $f.
#
# When $f is done, every outstanding future is failed with a "connection
# lost" message built from the first value of $f, and - if there is a child
# process - the child is waited for and its exit status is logged.
sub _install_future_handlers {
  my ($self, $f) = @_;
  Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;

  # The handler closes over $self; keep that reference weak.
  weaken($self);

  my $on_done = sub {
    my $pid = $self->child_pid;
    Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
    $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);

    # Nothing to reap when the connection has no child process.
    return unless defined $pid;

    log_debug { "Waiting for child '$pid' to exit" };
    my $ret = waitpid($pid, 0);

    unless ($ret == $pid) {
      log_debug { "Waited for pid $pid but waitpid() returned $ret" };
      return;
    }

    # The low 7 bits of $? hold the number of the signal that ended the
    # child, if any; the exit value is in the high byte.
    if ($? & 127) {
      log_warn { "Remote interpreter did not exit cleanly" };
    } else {
      log_verbose {
        my $exit_value = $? >> 8;
        "Remote Perl interpreter exited with value '$exit_value'"
      };
    }
  };

  $f->on_done($on_done);
  return $f;
}
9e72f0cf 158
# Turn an object id received from the other side into a proxy.
#
# The id 'NULL' yields an Object::Remote::Null object. Any other id is looked
# up in remote_objects_by_id; when there is no (true) entry, a new
# Object::Remote::Handle is created for it. The handle's proxy is returned.
sub _id_to_remote_object {
  my ($self, $id) = @_;
  Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;

  if ($id eq 'NULL') {
    return bless({}, 'Object::Remote::Null');
  }

  my $handle = $self->remote_objects_by_id->{$id}
    || Object::Remote::Handle->new(connection => $self, id => $id);

  return $handle->proxy;
}
168
# Builder for the _json attribute: a JSON::PP codec with one
# filter_json_single_key_object() hook per marker key, so that the
# single-key hashes emitted by _deobjectify() are expanded while decoding.
sub _build__json {
  my $self = shift;

  # The filter callbacks close over $self; keep that reference weak.
  weaken($self);

  # Marker key => callback that receives the marker's value. Kept as an
  # ordered list so the hooks are registered in a fixed order.
  my @filters = (
    # Object living on the other side: replace with its proxy.
    __remote_object__ => sub {
      $self->_id_to_remote_object(@_);
    },
    # Code reference on the other side: wrap the container's call().
    __remote_code__ => sub {
      my $code_container = $self->_id_to_remote_object(@_);
      sub { $code_container->call(@_) };
    },
    # Scalar reference: rebuild a reference to a copy of the value.
    __scalar_ref__ => sub {
      my $value = shift;
      return \$value;
    },
    # Glob reference: a fresh glob tied to the remote glob container.
    __glob_ref__ => sub {
      my $glob_container = $self->_id_to_remote_object(@_);
      my $handle = Symbol::gensym;
      tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
      return $handle;
    },
    # Object that lives on this side: look it up by id.
    __local_object__ => sub {
      $self->local_objects_by_id->{$_[0]}
    },
    # Tied hash on the other side: a local hash tied to the remote object.
    __remote_tied_hash__ => sub {
      my %tied_hash;
      tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \%tied_hash;
    },
    # Tied array on the other side: a local array tied to the remote object.
    __remote_tied_array__ => sub {
      my @tied_array;
      tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \@tied_array;
    },
  );

  my $json = JSON::PP->new;
  while (my ($marker, $expand) = splice(@filters, 0, 2)) {
    $json->filter_json_single_key_object($marker => $expand);
  }

  return $json;
}
210
# Try to load the module $class; a failure is logged at debug level instead
# of being propagated.
#
# Arguments: $class - name of the module to load.
# Returns:   nothing.
sub _load_if_possible {
  my ($class) = @_;

  # use_module() throws when the module cannot be loaded, so it has to run
  # inside eval for the failure to be handled here rather than aborting the
  # caller.
  my $loaded = eval { use_module($class); 1 };

  unless ($loaded) {
    # Copy $@ right away; the log block may run after $@ has changed.
    my $error = $@;
    log_debug { "Attempt at loading '$class' failed with '$error'" };
  }

  return;
}
221
# Compile-time setup of the @Guess list consulted by conn_from_spec().
BEGIN {
  # First guesser: a spec that is already an object is returned unchanged.
  unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };

  # Try to load the bundled connector modules.
  # NOTE(review): presumably each connector adds its own guesser to @Guess
  # when loaded - confirm in the connector modules.
  my @connectors = qw(
    Object::Remote::Connector::Local
    Object::Remote::Connector::LocalSudo
    Object::Remote::Connector::SSH
    Object::Remote::Connector::UNIX
  );

  for my $connector (@connectors) {
    _load_if_possible($connector);
  }
}
231
# Ask each guesser in @Guess, in order, to turn $spec (plus @args) into a
# connection object.
#
# Returns: the first true value a guesser returns, or undef when none of
#          them recognises the spec.
sub conn_from_spec {
  my ($class, $spec, @args) = @_;

  # Work on a snapshot of the guesser list, taken before any guesser runs.
  my @guessers = do { our @Guess };

  for my $guess (@guessers) {
    my $conn = $guess->($spec, @args);
    return $conn if $conn;
  }

  return undef;
}
242
# Build a connection from $spec.
#
# A blessed $spec is returned unchanged. Otherwise conn_from_spec() is asked
# for a connector object and the result of its connect method (invoked
# through maybe::start::) is returned.
# Dies when no guesser recognises the spec.
sub new_from_spec {
  my ($class, $spec, @args) = @_;

  if (blessed $spec) {
    return $spec;
  }

  my $conn = $class->conn_from_spec($spec, @args);

  defined $conn
    or die "Couldn't figure out what to do with ${spec}";

  return $conn->maybe::start::connect;
}
253
# Create an Object::Remote::Handle bound to this connection, passing @args
# on to its constructor, and return the handle's proxy.
sub remote_object {
  my $self = shift;
  my @args = @_;

  my $handle = Object::Remote::Handle->new(connection => $self, @args);
  return $handle->proxy;
}
260
# Ask the other side to call Object::Remote->connect($to) and wait for the
# result.
sub connect {
  my $self = shift;
  my ($to) = @_;

  Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;

  return await_future(
    $self->send_class_call(0, 'Object::Remote', connect => $to)
  );
}
268
# Resolve a fully qualified sub name ("Some::Package::name") on the other
# side by calling Some::Package->can('name') there, and wait for the result.
#
# NOTE(review): a name without a '::' separator does not match the pattern
# and leaves both the package and the sub name undefined.
sub remote_sub {
  my $self = shift;
  my ($sub) = @_;

  # Split at the last '::' into package and sub name.
  my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;

  Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;

  return await_future(
    $self->send_class_call(0, $pkg, can => $name)
  );
}
275
# Send a call to the 'class_call_handler' object on the other side.
#
# Arguments: $ctx  - context flag forwarded with the call;
#            @call - forwarded as-is (the callers in this file pass a class
#                    name, a method name and the method's arguments).
# Returns:   whatever send() returns.
sub send_class_call {
  my $self = shift;
  my ($ctx, @call) = @_;

  Dlog_trace { "Sending a class call for connection $_" } $self->_id;

  $self->send(call => class_call_handler => $ctx => call => @call);
}
281
# Make sure a class call handler is stored in local_objects_by_id under the
# key 'class_call_handler', creating it on first use, and return it.
sub register_class_call_handler {
  my ($self) = @_;
  my $locals = $self->local_objects_by_id;

  unless ($locals->{'class_call_handler'}) {
    my $handler = $self->new_class_call_handler;
    # Also register the handler under its own address-based id.
    $self->_local_object_to_id($handler);
    $locals->{'class_call_handler'} = $handler;
  }

  return $locals->{'class_call_handler'};
}
290
# Build the code container that serves class calls: it loads the class named
# by its first argument and calls the method named by its second argument on
# it, passing the remaining arguments along.
sub new_class_call_handler {
  my $dispatch = sub {
    my $class  = shift;
    my $method = shift;
    use_module($class)->$method(@_);
  };

  return Object::Remote::CodeContainer->new(code => $dispatch);
}
299
# Record $remote in remote_objects_by_id under its id and return it. The
# stored reference is weakened, so the table does not keep the object alive.
sub register_remote {
  my ($self, $remote) = @_;
  Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;

  my $registry = $self->remote_objects_by_id;
  my $id = $remote->id;

  $registry->{$id} = $remote;
  weaken($registry->{$id});

  return $remote;
}
306
# Drop the entry for $id from remote_objects_by_id and send a "free" message
# for that id to the other side.
sub send_free {
  my ($self, $id) = @_;
  Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;

  # TODO: the warning below sometimes shows up when the remote side dies in
  # the middle of a remote method invocation - possibly only while the
  # object is being constructed?
  #   (in cleanup) Use of uninitialized value $id in delete at
  #   ../Object-Remote/lib/Object/Remote/Connection.
  my $registry = $self->remote_objects_by_id;
  delete $registry->{$id};

  $self->_send([ free => $id ]);
}
316
# Send a message that expects a reply.
#
# Arguments: $type - message type (the first element on the wire);
#            @call - remaining message elements; $call[0] is used as the id
#                    of the target object.
# Returns:   a new CPS::Future that is tracked in outstanding_futures until
#            it becomes ready. Its local object id is sent as the second
#            message element.
sub send {
  my ($self, $type, @call) = @_;

  my $future = CPS::Future->new;

  # Entries in remote_objects_by_id are weak; this lexical keeps a strong
  # reference to the target's entry until the future is ready.
  my $remote = $self->remote_objects_by_id->{$call[0]};

  my @message = ($type, $self->_local_object_to_id($future), @call);

  my $outstanding = $self->outstanding_futures;
  $outstanding->{$future} = $future;

  my $cleanup = sub {
    undef($remote);
    delete $outstanding->{$future}
  };
  $future->on_ready($cleanup);

  $self->_send(\@message);

  return $future;
}
336
# Send a message whose reply is not wanted: the slot that send() fills with
# a future id carries the literal 'NULL' instead.
# Returns whatever _send() returns.
sub send_discard {
  my $self = shift;
  my ($type, @call) = @_;

  my @message = ($type, 'NULL', @call);

  $self->_send(\@message);
}
344
# Serialize $to_send and print it, followed by a newline, to send_to_fh.
#
# Returns: the result of print on success. On a write failure the on_close
#          future is completed with an error message (unless it is already
#          ready) and an empty return is made.
# Croaks when the connection is no longer valid.
sub _send {
  my ($self, $to_send) = @_;
  my $fh = $self->send_to_fh;

  croak "Attempt to invoke _send on a connection that is not valid"
    unless $self->is_valid;

  Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
  my $serialized = $self->_serialize($to_send)."\n";
  Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;

  my $ret;
  eval {
    # TODO: this should be converted over to a non-blocking ::WriteChannel
    # class.
    openhandle($fh) or die "filehandle is not open";
    log_trace { "file handle has passed openhandle() test; printing to it" };
    $ret = print {$fh} $serialized;
    defined $ret or die "print was not successful: $!";
  };

  # No exception: hand back what print returned.
  return $ret unless $@;

  Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
  my $error = $@;
  chomp($error);

  # A failed write is treated as the connection having closed.
  if (!$self->on_close->is_ready) {
    $self->on_close->done("could not write to file handle: $error");
  }

  return;
}
375
# Convert $data to its wire form: _deobjectify() replaces objects and
# references with marker hashes, then the result is JSON-encoded.
#
# While this runs, @New_Ids (localized, seeded with -1 so that it is
# non-empty) collects the ids that _local_object_to_id() registers. If
# encoding fails, those entries are removed from local_objects_by_id again.
# Dies with "Error serializing: ..." on failure.
sub _serialize {
  my ($self, $data) = @_;
  local our @New_Ids = (-1);

  my $flat = eval { $self->_encode($self->_deobjectify($data)) };
  return $flat if $flat;

  my $err = $@;
  # don't keep refs to new things
  delete @{$self->local_objects_by_id}{@New_Ids};
  die "Error serializing: $err";
}
389
# Return the id of $object - its reference address - storing the object in
# local_objects_by_id under that id if no true entry exists yet.
#
# A newly stored id is also appended to @New_Ids when that list is
# non-empty, which is the case while _serialize() is running.
sub _local_object_to_id {
  my ($self, $object) = @_;
  our @New_Ids;

  my $id = refaddr($object);
  my $locals = $self->local_objects_by_id;

  unless ($locals->{$id}) {
    push @New_Ids, $id if @New_Ids;
    $locals->{$id} = $object;
  }

  return $id;
}
399
# Recursively convert $data into a structure that can be JSON-encoded.
#
# Plain scalars are returned unchanged; untied hashes and arrays are copied
# with their values converted. Everything else becomes a single-key marker
# hash:
#   proxy whose remote belongs to this connection -> __local_object__
#   any other blessed object                      -> __remote_object__
#   tied hash / tied array       -> __remote_tied_hash__ / __remote_tied_array__
#   code reference                                -> __remote_code__
#   scalar reference                              -> __scalar_ref__
#   glob reference                                -> __glob_ref__
# Dies with "Can't collapse reftype ..." for any other reference type.
sub _deobjectify {
  my ($self, $data) = @_;

  if (blessed($data)) {
    my $is_own_proxy = $data->isa('Object::Remote::Proxy')
      && $data->{remote}->connection == $self;

    if ($is_own_proxy) {
      return +{ __local_object__ => $data->{remote}->id };
    }
    return +{ __remote_object__ => $self->_local_object_to_id($data) };
  }

  my $ref = ref($data);
  return $data unless $ref; # plain scalar

  if ($ref eq 'HASH') {
    my $tied_to = tied(%$data);
    if (defined($tied_to)) {
      return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
    }
    my %converted;
    for my $key (keys %$data) {
      $converted{$key} = $self->_deobjectify($data->{$key});
    }
    return \%converted;
  }

  if ($ref eq 'ARRAY') {
    my $tied_to = tied(@$data);
    if (defined($tied_to)) {
      return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
    }
    my @converted;
    for my $element (@$data) {
      push @converted, $self->_deobjectify($element);
    }
    return \@converted;
  }

  if ($ref eq 'CODE') {
    my $container = Object::Remote::CodeContainer->new(code => $data);
    return +{ __remote_code__ => $self->_local_object_to_id($container) };
  }

  if ($ref eq 'SCALAR') {
    return +{ __scalar_ref__ => $$data };
  }

  if ($ref eq 'GLOB') {
    my $container = Object::Remote::GlobContainer->new(handle => $data);
    return +{ __glob_ref__ => $self->_local_object_to_id($container) };
  }

  die "Can't collapse reftype $ref";
}
443
# Handle one line received from the other side: decode it into a list whose
# first element is the message type, then dispatch to the receive_<type>
# method with the remaining elements.
#
# Decode and handler failures are reported with warn() rather than
# propagated. Always makes an empty return.
sub _receive {
  my ($self, $flat) = @_;
  Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;

  # An exception and an empty message both leave @message empty.
  my @message = eval { @{$self->_deserialize($flat)} };
  unless (@message) {
    warn "Deserialize failed for ${flat}: $@";
    return;
  }
  my ($type, @rest) = @message;

  Dlog_trace { "deserialization complete for connection $_" } $self->_id;

  my $handler = "receive_${type}";
  my $handled = eval { $self->$handler(@rest); 1 };
  unless ($handled) {
    warn "Receive failed for ${flat}: $@";
    return;
  }

  return;
}
454
# Handler for a "free" message: remove the object with the given id from
# local_objects_by_id, warning when there was no (true) entry for it.
sub receive_free {
  my ($self, $id) = @_;
  Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;

  my $freed = delete $self->local_objects_by_id->{$id};
  warn "Free: no such object $id" unless $freed;

  return;
}
462
# Handler for a "call" message.
#
# Arguments: $future_id - id of the future on the other side that receives
#                         the outcome;
#            $id        - id of the local object to invoke;
#            @rest      - passed on to _invoke() (context flag, method name
#                         and arguments).
# The remote future is failed when no local object has that id.
sub receive_call {
  my ($self, $future_id, $id, @rest) = @_;
  Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;

  my $future = $self->_id_to_remote_object($future_id);
  # Stored on the proxy itself.
  # NOTE(review): presumably this selects how calls on the proxy are sent
  # (call, discard the reply, free afterwards) - confirm in
  # Object::Remote::Proxy.
  $future->{method} = 'call_discard_free';

  my $local = $self->local_objects_by_id->{$id};
  unless ($local) {
    $future->fail("No such object $id");
    return;
  }

  $self->_invoke($future, $local, @rest);
}
472
# Handler for a "call_free" message: perform the call with an undefined
# context flag, then free the local object with the given id.
sub receive_call_free {
  my $self = shift;
  my ($future, $id, @rest) = @_;

  Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;

  # The extra undef lands in _invoke()'s context slot.
  $self->receive_call($future, $id, undef, @rest);
  $self->receive_free($id);
}
479
# Call $method on $local with @args and report the outcome through $future.
#
# Arguments: $future - object whose done()/fail() receives the outcome;
#            $local  - invocant of the call;
#            $ctx    - context for the call: true = list, defined but false
#                      = scalar, undef = result discarded;
#            $method - method name; names starting with "start::" return a
#                      future whose outcome is relayed instead;
#            @args   - arguments for the method.
# Always makes an empty return; exceptions from the call are turned into
# $future->fail().
sub _invoke {
  my ($self, $future, $local, $ctx, $method, @args) = @_;
  Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;

  if ($method =~ /^start::/) {
    my $f = $local->$method(@args);
    $f->on_done(sub { undef($f); $future->done(@_) });
    # The on_done callback clears $f; if it has already run there is nothing
    # left to attach a failure handler to.
    return unless $f;
    $f->on_fail(sub { undef($f); $future->fail(@_) });
    return;
  }

  # Wrapper that calls the method in whatever context it is itself called.
  my $do = sub { $local->$method(@args) };

  my $completed = eval {
    $future->done(
      defined($ctx)
        ? ($ctx ? $do->() : scalar($do->()))
        : do { $do->(); () }
    );
    1;
  };

  unless ($completed) {
    $future->fail($@);
    return;
  }

  return;
}
501
9e72f0cf 5021;
b9a9982d 503
504=head1 NAME
505
506Object::Remote::Connection - An underlying connection for L<Object::Remote>

=head1 SYNOPSIS

b0ec7e3b 508 use Object::Remote;
509
510 my %opts = (
511 nice => '10', ulimit => '-v 400000',
512 watchdog_timeout => 120, stderr => \*STDERR,
513 );
514
515 my $local = Object::Remote->connect('-');
516 my $remote = Object::Remote->connect('myserver', nice => 5);
517 my $remote_user = Object::Remote->connect('user@myserver', %opts);
518 my $local_sudo = Object::Remote->connect('user@');
519
520 #$remote can be any other connection object
521 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
522
523=head1 DESCRIPTION
524
525This is the class that supports connections to a Perl interpreter that is executed in a
526different process. The new Perl interpreter can be either on the local or a remote machine
527and is configurable via arguments passed to the constructor.
528
529=head1 ARGUMENTS
530
531=over 4
532
533=item nice
534
535If this value is defined then it will be used as the nice value of the Perl process when it
536is started. The default is the undefined value and will not nice the process.
537
538=item stderr
539
540If this value is defined then it will be used as the file handle that receives the output
541of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
542non-blocking way. If the value is undefined then STDERR of the remote process will be connected
directly to STDERR of the local process without the run loop managing I/O. The default value
544is undefined.
545
546There are a few ways to use this feature. By default the behavior is to form one unified STDERR
547across all of the Perl interpreters including the local one. For small scale and quick operation
548this offers a predictable and easy to use way to get at error messages generated anywhere. If
549the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
550and it is possible to still receive output from them. This is generally a good thing but can
551cause issues.
552
553When using a file handle as the output for STDERR once the local Perl interpreter is no longer
554running there is no longer a valid STDERR for the remote interpreters to send data to. This means
555that it is no longer possible to receive error output from the remote interpreters and that the
556shell will start to kill off the child processes. Passing a reference to STDERR for the local
557interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
558all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
559start killing children when the local interpreter exits.
560
561It is also possible to pass in a file handle that has been opened for writing. This would be
562useful for logging the output of the remote interpreter directly into a dedicated file.
563
564=item ulimit
565
566If this string is defined then it will be passed unmodified as the arguments to ulimit when
567the Perl process is started. The default value is the undefined value and will not limit the
568process in any way.
569
570=item watchdog_timeout
571
572If this value is defined then it will be used as the number of seconds the watchdog will wait
573for an update before it terminates the Perl interpreter process. The default value is undefined
574and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
575
576=back
577
578=head1 SEE ALSO
579
580=over 4
581
582=item C<Object::Remote>
b9a9982d 583
b0ec7e3b 584=back
b9a9982d 585
586=cut