cleanup trailing whitespace ugliness
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
CommitLineData
9e72f0cf 1package Object::Remote::Connection;
2
f4a85080 3use Object::Remote::Logging qw (:log :dlog router);
dc28afe8 4use Object::Remote::Future;
9d804009 5use Object::Remote::Null;
676438a1 6use Object::Remote::Handle;
fe6c9a7f 7use Object::Remote::CodeContainer;
ed5a8a8e 8use Object::Remote::GlobProxy;
9use Object::Remote::GlobContainer;
7790ca36 10use Object::Remote::Tied;
9e72f0cf 11use Object::Remote;
ed5a8a8e 12use Symbol;
9e72f0cf 13use IO::Handle;
c824fdf3 14use POSIX ":sys_wait_h";
9e72f0cf 15use Module::Runtime qw(use_module);
f8080c1c 16use Scalar::Util qw(weaken blessed refaddr openhandle);
9e72f0cf 17use JSON::PP qw(encode_json);
18use Moo;
5add5e29 19use Carp qw(croak);
9e72f0cf 20
e1a0b9ca 21BEGIN { router()->exclude_forwarding }
c824fdf3 22
23END {
5ccce2d5 24 our %child_pids;
55c0d020 25
5ccce2d5 26 log_trace { "END handler is being invoked in " . __PACKAGE__ };
55c0d020 27
5ccce2d5 28 foreach(keys(%child_pids)) {
29 log_debug { "Killing child process '$_'" };
30 kill('TERM', $_);
31 }
c824fdf3 32}
33
9031635d 34has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
ad4f54b2 35
9e72f0cf 36has send_to_fh => (
37 is => 'ro', required => 1,
90115979 38 trigger => sub {
8f43bcd9 39 my $self = $_[0];
40 $_[1]->autoflush(1);
41 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
90115979 42 },
9e72f0cf 43);
44
12fb4a80 45has read_channel => (
9e72f0cf 46 is => 'ro', required => 1,
47 trigger => sub {
12fb4a80 48 my ($self, $ch) = @_;
55c0d020 49 my $id = $self->_id;
998ff9a4 50 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
9e72f0cf 51 weaken($self);
12fb4a80 52 $ch->on_line_call(sub { $self->_receive(@_) });
55c0d020 53 $ch->on_close_call(sub {
998ff9a4 54 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
37efeb68 55 $self->on_close->done(@_);
f8080c1c 56 });
9e72f0cf 57 },
58);
59
12fb4a80 60has on_close => (
353556c4 61 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
998ff9a4 62 trigger => sub {
8f43bcd9 63 log_trace { "Installing handlers into future via trigger" };
64 $_[0]->_install_future_handlers($_[1])
998ff9a4 65 },
12fb4a80 66);
ad4f54b2 67
47c83a13 68has child_pid => (is => 'ro');
69
11dbd4a0 70has local_objects_by_id => (
71 is => 'ro', default => sub { {} },
72 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
73);
9e72f0cf 74
11dbd4a0 75has remote_objects_by_id => (
76 is => 'ro', default => sub { {} },
77 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
78);
9e72f0cf 79
a980b0b8 80has outstanding_futures => (is => 'ro', default => sub { {} });
81
353556c4 82has _json => (
83 is => 'lazy',
84 handles => {
85 _deserialize => 'decode',
86 _encode => 'encode',
87 },
88);
89
90after BUILD => sub {
998ff9a4 91 my ($self) = @_;
92 my $pid = $self->child_pid;
5ccce2d5 93 our %child_pids;
94 return unless defined $pid;
95 $child_pids{$pid} = 1;
96 return;
353556c4 97};
98
99sub BUILD { }
100
d2eadebb 101sub is_valid {
102 my ($self) = @_;
fb258df6 103 my $valid = ! $self->on_close->is_ready;
55c0d020 104
fb258df6 105 log_trace {
106 my $id = $self->_id;
107 my $text;
108 if ($valid) {
109 $text = 'yes';
110 } else {
111 $text = 'no';
112 }
113 "Connection '$id' is valid: '$text'"
114 };
55c0d020 115
fb258df6 116 return $valid;
d2eadebb 117}
118
12fb4a80 119sub _fail_outstanding {
120 my ($self, $error) = @_;
121 my $outstanding = $self->outstanding_futures;
55c0d020 122
123 Dlog_debug {
d2eadebb 124 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
125 };
126
127 foreach(keys(%$outstanding)) {
128 log_trace { "Failing future for $_" };
129 my $future = $outstanding->{$_};
867e4de5 130 $future->fail("$error\n");
d2eadebb 131 }
132
12fb4a80 133 %$outstanding = ();
134 return;
135}
136
353556c4 137sub _install_future_handlers {
138 my ($self, $f) = @_;
5ccce2d5 139 our %child_pids;
998ff9a4 140 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
353556c4 141 weaken($self);
142 $f->on_done(sub {
998ff9a4 143 my $pid = $self->child_pid;
144 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
353556c4 145 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
998ff9a4 146 return unless defined $pid;
147 log_debug { "Waiting for child '$pid' to exit" };
148 my $ret = waitpid($pid, 0);
149 if ($ret != $pid) {
150 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
151 return;
152 } elsif ($? & 127) {
153 log_warn { "Remote interpreter did not exit cleanly" };
154 } else {
155 log_verbose {
156 my $exit_value = $? >> 8;
157 "Remote Perl interpreter exited with value '$exit_value'"
158 };
159 }
55c0d020 160
5ccce2d5 161 delete $child_pids{$pid};
353556c4 162 });
55c0d020 163 return $f;
353556c4 164};
9e72f0cf 165
fe6c9a7f 166sub _id_to_remote_object {
167 my ($self, $id) = @_;
9031635d 168 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
fe6c9a7f 169 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
170 (
171 $self->remote_objects_by_id->{$id}
172 or Object::Remote::Handle->new(connection => $self, id => $id)
173 )->proxy;
174}
175
9e72f0cf 176sub _build__json {
177 weaken(my $self = shift);
9e72f0cf 178 JSON::PP->new->filter_json_single_key_object(
179 __remote_object__ => sub {
fe6c9a7f 180 $self->_id_to_remote_object(@_);
181 }
182 )->filter_json_single_key_object(
183 __remote_code__ => sub {
184 my $code_container = $self->_id_to_remote_object(@_);
185 sub { $code_container->call(@_) };
9e72f0cf 186 }
6ed5d580 187 )->filter_json_single_key_object(
188 __scalar_ref__ => sub {
189 my $value = shift;
190 return \$value;
191 }
ed5a8a8e 192 )->filter_json_single_key_object(
193 __glob_ref__ => sub {
194 my $glob_container = $self->_id_to_remote_object(@_);
195 my $handle = Symbol::gensym;
196 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
197 return $handle;
198 }
6163a5aa 199 )->filter_json_single_key_object(
200 __local_object__ => sub {
201 $self->local_objects_by_id->{$_[0]}
202 }
7790ca36 203 )->filter_json_single_key_object(
204 __remote_tied_hash__ => sub {
37efeb68 205 my %tied_hash;
206 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
207 return \%tied_hash;
7790ca36 208 }
209 )->filter_json_single_key_object(
210 __remote_tied_array__ => sub {
37efeb68 211 my @tied_array;
212 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
213 return \@tied_array;
7790ca36 214 }
215 );
9e72f0cf 216}
217
c824fdf3 218sub _load_if_possible {
55c0d020 219 my ($class) = @_;
c824fdf3 220
55c0d020 221 use_module($class);
c824fdf3 222
223 if ($@) {
224 log_debug { "Attempt at loading '$class' failed with '$@'" };
225 }
226
227}
228
84b04178 229BEGIN {
230 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
c824fdf3 231 map _load_if_possible($_), qw(
232 Object::Remote::Connector::Local
233 Object::Remote::Connector::LocalSudo
234 Object::Remote::Connector::SSH
235 Object::Remote::Connector::UNIX
55c0d020 236 );
84b04178 237}
238
c824fdf3 239sub conn_from_spec {
240 my ($class, $spec, @args) = @_;
84b04178 241 foreach my $poss (do { our @Guess }) {
c824fdf3 242 if (my $conn = $poss->($spec, @args)) {
243 return $conn;
fbd3b8ec 244 }
84b04178 245 }
55c0d020 246
c824fdf3 247 return undef;
248}
249
250sub new_from_spec {
71087610 251 my ($class, $spec, @args) = @_;
c824fdf3 252 return $spec if blessed $spec;
71087610 253 my $conn = $class->conn_from_spec($spec, @args);
55c0d020 254
c824fdf3 255 die "Couldn't figure out what to do with ${spec}"
256 unless defined $conn;
55c0d020 257
258 return $conn->maybe::start::connect;
84b04178 259}
260
11dbd4a0 261sub remote_object {
e144d525 262 my ($self, @args) = @_;
263 Object::Remote::Handle->new(
264 connection => $self, @args
265 )->proxy;
266}
267
4c17fea5 268sub connect {
269 my ($self, $to) = @_;
9031635d 270 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
deb77aaf 271 return await_future(
272 $self->send_class_call(0, 'Object::Remote', connect => $to)
273 );
4c17fea5 274}
275
11dbd4a0 276sub remote_sub {
c6fe6fbd 277 my ($self, $sub) = @_;
278 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
fb258df6 279 Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
deb77aaf 280 return await_future($self->send_class_call(0, $pkg, can => $name));
281}
282
283sub send_class_call {
284 my ($self, $ctx, @call) = @_;
9031635d 285 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
deb77aaf 286 $self->send(call => class_call_handler => $ctx => call => @call);
287}
288
289sub register_class_call_handler {
290 my ($self) = @_;
3687a42d 291 $self->local_objects_by_id->{'class_call_handler'} ||= do {
c5736e1c 292 my $o = $self->new_class_call_handler;
3687a42d 293 $self->_local_object_to_id($o);
294 $o;
295 };
c6fe6fbd 296}
297
c5736e1c 298sub new_class_call_handler {
299 Object::Remote::CodeContainer->new(
300 code => sub {
301 my ($class, $method) = (shift, shift);
302 use_module($class)->$method(@_);
303 }
304 );
305}
306
9e72f0cf 307sub register_remote {
308 my ($self, $remote) = @_;
9031635d 309 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
9e72f0cf 310 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
311 return $remote;
312}
313
314sub send_free {
315 my ($self, $id) = @_;
7790ca36 316 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
302ecfbf 317 #TODO this shows up some times when a remote side dies in the middle of a remote
318 #method invocation - possibly only when the object is being constructed?
319 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
9e72f0cf 320 delete $self->remote_objects_by_id->{$id};
321 $self->_send([ free => $id ]);
322}
323
324sub send {
325 my ($self, $type, @call) = @_;
326
deb77aaf 327 my $future = CPS::Future->new;
a2d43709 328 my $remote = $self->remote_objects_by_id->{$call[0]};
deb77aaf 329
330 unshift @call, $type => $self->_local_object_to_id($future);
9e72f0cf 331
a980b0b8 332 my $outstanding = $self->outstanding_futures;
333 $outstanding->{$future} = $future;
a2d43709 334 $future->on_ready(sub {
335 undef($remote);
336 delete $outstanding->{$future}
337 });
a980b0b8 338
9e72f0cf 339 $self->_send(\@call);
340
341 return $future;
342}
343
9d804009 344sub send_discard {
345 my ($self, $type, @call) = @_;
346
deb77aaf 347 unshift @call, $type => 'NULL';
9d804009 348
349 $self->_send(\@call);
350}
351
9e72f0cf 352sub _send {
353 my ($self, $to_send) = @_;
9d64d2d9 354 my $fh = $self->send_to_fh;
55c0d020 355
d2eadebb 356 unless ($self->is_valid) {
5add5e29 357 croak "Attempt to invoke _send on a connection that is not valid";
d2eadebb 358 }
55c0d020 359
9031635d 360 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
9d64d2d9 361 my $serialized = $self->_serialize($to_send)."\n";
6b7b2732 362 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
55c0d020 363 my $ret;
364 eval {
353556c4 365 #TODO this should be converted over to a non-blocking ::WriteChannel class
366 die "filehandle is not open" unless openhandle($fh);
367 log_trace { "file handle has passed openhandle() test; printing to it" };
368 $ret = print $fh $serialized;
369 die "print was not successful: $!" unless defined $ret
f8080c1c 370 };
55c0d020 371
f8080c1c 372 if ($@) {
353556c4 373 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
998ff9a4 374 my $error = $@;
375 chomp($error);
353556c4 376 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
55c0d020 377 return;
f8080c1c 378 }
55c0d020 379
380 return $ret;
9e72f0cf 381}
382
383sub _serialize {
384 my ($self, $data) = @_;
3687a42d 385 local our @New_Ids = (-1);
9d804009 386 return eval {
ad4f54b2 387 my $flat = $self->_encode($self->_deobjectify($data));
ad4f54b2 388 $flat;
70a578ac 389 } || do {
9d804009 390 my $err = $@; # won't get here if the eval doesn't die
391 # don't keep refs to new things
392 delete @{$self->local_objects_by_id}{@New_Ids};
393 die "Error serializing: $err";
394 };
9e72f0cf 395}
396
a76f2f60 397sub _local_object_to_id {
398 my ($self, $object) = @_;
399 my $id = refaddr($object);
400 $self->local_objects_by_id->{$id} ||= do {
3687a42d 401 push our(@New_Ids), $id if @New_Ids;
a76f2f60 402 $object;
403 };
404 return $id;
405}
406
9e72f0cf 407sub _deobjectify {
408 my ($self, $data) = @_;
409 if (blessed($data)) {
6163a5aa 410 if (
411 $data->isa('Object::Remote::Proxy')
412 and $data->{remote}->connection == $self
413 ) {
414 return +{ __local_object__ => $data->{remote}->id };
415 } else {
416 return +{ __remote_object__ => $self->_local_object_to_id($data) };
417 }
9e72f0cf 418 } elsif (my $ref = ref($data)) {
419 if ($ref eq 'HASH') {
7790ca36 420 my $tied_to = tied(%$data);
421 if(defined($tied_to)) {
55c0d020 422 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
7790ca36 423 } else {
424 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
425 }
9e72f0cf 426 } elsif ($ref eq 'ARRAY') {
7790ca36 427 my $tied_to = tied(@$data);
428 if (defined($tied_to)) {
55c0d020 429 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
7790ca36 430 } else {
431 return [ map $self->_deobjectify($_), @$data ];
432 }
fe6c9a7f 433 } elsif ($ref eq 'CODE') {
434 my $id = $self->_local_object_to_id(
435 Object::Remote::CodeContainer->new(code => $data)
436 );
437 return +{ __remote_code__ => $id };
6ed5d580 438 } elsif ($ref eq 'SCALAR') {
439 return +{ __scalar_ref__ => $$data };
ed5a8a8e 440 } elsif ($ref eq 'GLOB') {
441 return +{ __glob_ref__ => $self->_local_object_to_id(
442 Object::Remote::GlobContainer->new(handle => $data)
443 ) };
9e72f0cf 444 } else {
445 die "Can't collapse reftype $ref";
446 }
447 }
448 return $data; # plain scalar
449}
450
9e72f0cf 451sub _receive {
ad4f54b2 452 my ($self, $flat) = @_;
9031635d 453 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
ad4f54b2 454 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
455 or do { warn "Deserialize failed for ${flat}: $@"; return };
9031635d 456 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
9e72f0cf 457 eval { $self->${\"receive_${type}"}(@rest); 1 }
ad4f54b2 458 or do { warn "Receive failed for ${flat}: $@"; return };
9e72f0cf 459 return;
460}
461
462sub receive_free {
463 my ($self, $id) = @_;
9031635d 464 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
9d804009 465 delete $self->local_objects_by_id->{$id}
466 or warn "Free: no such object $id";
9e72f0cf 467 return;
468}
469
470sub receive_call {
deb77aaf 471 my ($self, $future_id, $id, @rest) = @_;
9031635d 472 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
deb77aaf 473 my $future = $self->_id_to_remote_object($future_id);
8131a88a 474 $future->{method} = 'call_discard_free';
9e72f0cf 475 my $local = $self->local_objects_by_id->{$id}
476 or do { $future->fail("No such object $id"); return };
477 $self->_invoke($future, $local, @rest);
478}
479
8131a88a 480sub receive_call_free {
481 my ($self, $future, $id, @rest) = @_;
9031635d 482 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
84b04178 483 $self->receive_call($future, $id, undef, @rest);
8131a88a 484 $self->receive_free($id);
485}
486
9e72f0cf 487sub _invoke {
84b04178 488 my ($self, $future, $local, $ctx, $method, @args) = @_;
9031635d 489 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
dc28afe8 490 if ($method =~ /^start::/) {
491 my $f = $local->$method(@args);
492 $f->on_done(sub { undef($f); $future->done(@_) });
3f1f1e66 493 return unless $f;
dc28afe8 494 $f->on_fail(sub { undef($f); $future->fail(@_) });
495 return;
496 }
84b04178 497 my $do = sub { $local->$method(@args) };
498 eval {
499 $future->done(
500 defined($ctx)
501 ? ($ctx ? $do->() : scalar($do->()))
502 : do { $do->(); () }
503 );
504 1;
505 } or do { $future->fail($@); return; };
9e72f0cf 506 return;
507}
508
9e72f0cf 5091;
b9a9982d 510
511=head1 NAME
512
513Object::Remote::Connection - An underlying connection for L<Object::Remote>
514
b0ec7e3b 515 use Object::Remote;
55c0d020 516
b0ec7e3b 517 my %opts = (
518 nice => '10', ulimit => '-v 400000',
519 watchdog_timeout => 120, stderr => \*STDERR,
520 );
55c0d020 521
b0ec7e3b 522 my $local = Object::Remote->connect('-');
523 my $remote = Object::Remote->connect('myserver', nice => 5);
524 my $remote_user = Object::Remote->connect('user@myserver', %opts);
525 my $local_sudo = Object::Remote->connect('user@');
55c0d020 526
b0ec7e3b 527 #$remote can be any other connection object
528 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
55c0d020 529
b0ec7e3b 530=head1 DESCRIPTION
531
532This is the class that supports connections to a Perl interpreter that is executed in a
533different process. The new Perl interpreter can be either on the local or a remote machine
534and is configurable via arguments passed to the constructor.
535
536=head1 ARGUMENTS
537
538=over 4
539
540=item nice
541
542If this value is defined then it will be used as the nice value of the Perl process when it
543is started. The default is the undefined value and will not nice the process.
544
545=item stderr
546
547If this value is defined then it will be used as the file handle that receives the output
548of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
549non-blocking way. If the value is undefined then STDERR of the remote process will be connected
550directly to STDERR of the local process with out the run loop managing I/O. The default value
551is undefined.
552
553There are a few ways to use this feature. By default the behavior is to form one unified STDERR
554across all of the Perl interpreters including the local one. For small scale and quick operation
555this offers a predictable and easy to use way to get at error messages generated anywhere. If
556the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
557and it is possible to still receive output from them. This is generally a good thing but can
558cause issues.
559
560When using a file handle as the output for STDERR once the local Perl interpreter is no longer
561running there is no longer a valid STDERR for the remote interpreters to send data to. This means
562that it is no longer possible to receive error output from the remote interpreters and that the
563shell will start to kill off the child processes. Passing a reference to STDERR for the local
564interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
565all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
566start killing children when the local interpreter exits.
567
568It is also possible to pass in a file handle that has been opened for writing. This would be
569useful for logging the output of the remote interpreter directly into a dedicated file.
570
571=item ulimit
572
573If this string is defined then it will be passed unmodified as the arguments to ulimit when
574the Perl process is started. The default value is the undefined value and will not limit the
575process in any way.
576
577=item watchdog_timeout
578
579If this value is defined then it will be used as the number of seconds the watchdog will wait
580for an update before it terminates the Perl interpreter process. The default value is undefined
581and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
582
583=back
584
585=head1 SEE ALSO
586
587=over 4
588
589=item C<Object::Remote>
b9a9982d 590
b0ec7e3b 591=back
b9a9982d 592
593=cut