update logging api to match log-contextual 0.005
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
CommitLineData
9e72f0cf 1package Object::Remote::Connection;
2
f4a85080 3use Object::Remote::Logging qw (:log :dlog router);
dc28afe8 4use Object::Remote::Future;
9d804009 5use Object::Remote::Null;
676438a1 6use Object::Remote::Handle;
fe6c9a7f 7use Object::Remote::CodeContainer;
ed5a8a8e 8use Object::Remote::GlobProxy;
9use Object::Remote::GlobContainer;
7790ca36 10use Object::Remote::Tied;
9e72f0cf 11use Object::Remote;
ed5a8a8e 12use Symbol;
9e72f0cf 13use IO::Handle;
c824fdf3 14use POSIX ":sys_wait_h";
9e72f0cf 15use Module::Runtime qw(use_module);
f8080c1c 16use Scalar::Util qw(weaken blessed refaddr openhandle);
9e72f0cf 17use JSON::PP qw(encode_json);
18use Moo;
5add5e29 19use Carp qw(croak);
9e72f0cf 20
# Compile-time hook: ask the logging router not to forward log messages.
BEGIN {
  router()->exclude_forwarding;
}

# Interpreter shutdown hook: signal the child processes.
END {
  log_debug { "Killing all child processes in the process group" };

  # NOTE(review): the original comment here said "send SIGINT to the process
  # group for our children". The call below passes signal number 1 and target
  # -2; per perlfunc a negative target addresses a process group, and signal 1
  # is SIGHUP on most platforms - confirm which signal/group was intended.
  my ($signal, $target) = (1, -2);
  kill($signal, $target);
}
29
# Identifier for this connection; defaults to the next value of a
# package-level counter.
has _id => (
  is       => 'ro',
  required => 1,
  default  => sub { our $NEXT_CONNECTION_ID++ },
);

# File handle that serialized messages are printed to. Autoflush is switched
# on as soon as the handle is supplied.
has send_to_fh => (
  is       => 'ro',
  required => 1,
  trigger  => sub {
    my ($self, $fh) = @_;
    $fh->autoflush(1);
    Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $fh;
  },
);

# Channel that incoming lines arrive on. Each line is handed to _receive and
# a close of the channel completes the on_close future.
has read_channel => (
  is       => 'ro',
  required => 1,
  trigger  => sub {
    my ($self, $channel) = @_;
    my $id = $self->_id;
    Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $channel->fh;
    # the callbacks below must not keep the connection alive
    weaken($self);
    $channel->on_line_call(sub { $self->_receive(@_) });
    $channel->on_close_call(sub {
      log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
      $self->on_close->done(@_);
    });
  },
);

# Future that is completed when the connection closes. Handlers are installed
# both on the default future and on any future assigned later.
has on_close => (
  is      => 'rw',
  default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
  trigger => sub {
    my ($self, $future) = @_;
    log_trace { "Installing handlers into future via trigger" };
    $self->_install_future_handlers($future)
  },
);

# Process id of the child interpreter, when there is one.
has child_pid => (is => 'ro');

# Objects living on this side, keyed by id.
has local_objects_by_id => (
  is      => 'ro',
  default => sub { {} },
  # shallow clone on the way in
  coerce  => sub { my ($given) = @_; +{ %{$given} } },
);

# Handles for objects living on the other side, keyed by id.
has remote_objects_by_id => (
  is      => 'ro',
  default => sub { {} },
  # shallow clone on the way in
  coerce  => sub { my ($given) = @_; +{ %{$given} } },
);

# Futures created by send() that have not become ready yet.
has outstanding_futures => (
  is      => 'ro',
  default => sub { {} },
);

# Lazily built JSON codec (see _build__json); decode/encode are delegated.
has _json => (
  is      => 'lazy',
  handles => {
    _deserialize => 'decode',
    _encode      => 'encode',
  },
);
85
# After construction, set the process group of the child process when a
# child pid was supplied; connections without a pid are left alone.
after BUILD => sub {
  my ($self) = @_;
  my $pid = $self->child_pid;

  if (!defined $pid) {
    log_trace { "After BUILD invoked for connection but there was no pid" };
    return;
  }

  log_trace { "Setting process group of child process '$pid'" };

  setpgrp($pid, 1);
};

# Empty BUILD so the 'after BUILD' modifier above has a method to wrap.
sub BUILD { }
101
# Returns true while the on_close future has not become ready, i.e. the
# connection has not been closed.
sub is_valid {
  my ($self) = @_;

  my $closed = $self->on_close->is_ready;
  log_trace { "Connection closed: $closed" };

  return !$closed;
}
109
# Fails every future recorded in outstanding_futures with "$error\n" and then
# empties the table.
#
#   $error - text describing why the futures are being failed
#
# Returns nothing.
sub _fail_outstanding {
  my ($self, $error) = @_;
  my $outstanding = $self->outstanding_futures;

  Dlog_debug {
    # $error is passed as an argument rather than interpolated into the
    # format so that a '%' inside the message cannot be read as a directive
    sprintf "Failing %i outstanding futures with '%s'", scalar(keys(%$outstanding)), $error
  };

  # A named loop variable is used so the log block does not depend on $_
  # still holding the key when the logger eventually runs the block.
  foreach my $key (keys(%$outstanding)) {
    log_trace { "Failing future for $key" };
    # failing one future may already have removed another from the table
    my $future = $outstanding->{$key} or next;
    $future->fail("$error\n");
  }

  %$outstanding = ();
  return;
}
127
# Attaches an on_done handler to the future $f and returns $f. When the
# future completes the handler fails all outstanding futures and, if the
# connection has a child pid, waits for that child and logs how it exited.
sub _install_future_handlers {
  my ($self, $f) = @_;
  Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;

  # the handler must not keep the connection alive
  weaken($self);

  my $when_done = sub {
    my $pid = $self->child_pid;
    Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
    $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);

    # nothing to reap for connections without a child process
    return unless defined $pid;

    log_debug { "Waiting for child '$pid' to exit" };
    my $ret = waitpid($pid, 0);

    unless ($ret == $pid) {
      log_debug { "Waited for pid $pid but waitpid() returned $ret" };
      return;
    }

    if ($? & 127) {
      # low seven bits of $? hold the terminating signal number
      log_warn { "Remote interpreter did not exit cleanly" };
    } else {
      log_verbose {
        my $exit_value = $? >> 8;
        "Remote Perl interpreter exited with value '$exit_value'"
      };
    }
  };

  $f->on_done($when_done);
  return $f;
}
9e72f0cf 153
# Returns the proxy for the remote object with the given id. The id 'NULL'
# yields an Object::Remote::Null instance; an id with no registered handle
# gets a freshly constructed Object::Remote::Handle.
sub _id_to_remote_object {
  my ($self, $id) = @_;
  Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;

  if ($id eq 'NULL') {
    return bless({}, 'Object::Remote::Null');
  }

  my $handle = $self->remote_objects_by_id->{$id}
    || Object::Remote::Handle->new(connection => $self, id => $id);

  return $handle->proxy;
}
163
# Builder for the _json attribute. Creates a JSON::PP instance and registers
# one single-key-object filter per marker key so that the markers produced
# by _deobjectify are expanded again while decoding.
sub _build__json {
  weaken(my $self = shift);

  # marker key => callback receiving the value stored under that key
  my @filters = (
    __remote_object__ => sub {
      $self->_id_to_remote_object(@_);
    },
    __remote_code__ => sub {
      my $code_container = $self->_id_to_remote_object(@_);
      sub { $code_container->call(@_) };
    },
    __scalar_ref__ => sub {
      my $value = shift;
      return \$value;
    },
    __glob_ref__ => sub {
      my $glob_container = $self->_id_to_remote_object(@_);
      my $handle = Symbol::gensym;
      tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
      return $handle;
    },
    __local_object__ => sub {
      $self->local_objects_by_id->{$_[0]}
    },
    __remote_tied_hash__ => sub {
      my %tied_hash;
      tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \%tied_hash;
    },
    __remote_tied_array__ => sub {
      my @tied_array;
      tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \@tied_array;
    },
  );

  # register the filters in the same order as listed, chaining on the
  # value each registration returns
  my $json = JSON::PP->new;
  while (@filters) {
    my ($key, $callback) = splice(@filters, 0, 2);
    $json = $json->filter_json_single_key_object($key => $callback);
  }

  return $json;
}
205
# Loads the module named $class via use_module. A load failure is written to
# the debug log and then rethrown, so callers observe the same exception as
# an unguarded use_module call would raise.
#
# The previous body tested $@ after a call that was not wrapped in eval; a
# failure never reached that test, and a stale $@ left over from unrelated
# code could produce a misleading log entry.
#
#   $class - name of the module to load
#
# Returns 1 when the module was loaded.
sub _load_if_possible {
  my ($class) = @_;

  return 1 if eval { use_module($class); 1 };

  # copy $@ right away; the logger may run evals of its own
  my $error = $@;
  log_debug { "Attempt at loading '$class' failed with '$error'" };
  die $error;
}
216
# Compile-time setup: put a guesser at the front of @Guess that accepts any
# blessed value as-is, then load the connector classes listed below.
BEGIN {
  unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };

  # plain loop instead of map in void context; only the side effect of
  # loading each class is wanted
  for my $connector (qw(
    Object::Remote::Connector::Local
    Object::Remote::Connector::LocalSudo
    Object::Remote::Connector::SSH
    Object::Remote::Connector::UNIX
  )) {
    _load_if_possible($connector);
  }
}
226
# Offers $spec and @args to each entry of @Guess in order and returns the
# first true result; returns undef when no entry produces one.
sub conn_from_spec {
  my ($class, $spec, @args) = @_;

  our @Guess;
  my @candidates = @Guess;

  for my $guesser (@candidates) {
    my $conn = $guesser->($spec, @args);
    return $conn if $conn;
  }

  return undef;
}
237
# Turns $spec into a connection. A blessed $spec is returned unchanged;
# otherwise conn_from_spec is consulted and connect is invoked on the
# result through maybe::start::. Dies when nothing can handle the spec.
sub new_from_spec {
  my ($class, $spec, @args) = @_;

  if (blessed $spec) {
    return $spec;
  }

  my $conn = $class->conn_from_spec($spec, @args);

  defined $conn
    or die "Couldn't figure out what to do with ${spec}";

  return $conn->maybe::start::connect;
}
248
# Builds an Object::Remote::Handle bound to this connection, forwarding
# @args to its constructor, and returns the handle's proxy.
sub remote_object {
  my ($self, @args) = @_;

  my $handle = Object::Remote::Handle->new(connection => $self, @args);

  return $handle->proxy;
}
255
# Sends a class call of Object::Remote->connect($to) over this connection
# and returns what await_future yields for the resulting future.
sub connect {
  my ($self, $to) = @_;

  Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;

  return await_future($self->send_class_call(0, 'Object::Remote', connect => $to));
}
263
# Splits the fully qualified sub name $sub into package and sub name, sends
# a class call of $pkg->can($name) and returns what await_future yields.
sub remote_sub {
  my ($self, $sub) = @_;

  # everything before the last '::' is the package, the rest the sub name
  my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;

  Dlog_debug { "Invoking remote sub '$sub' for connection $_" } $self->_id;

  return await_future(
    $self->send_class_call(0, $pkg, can => $name)
  );
}
270
# Sends a 'call' message addressed to the 'class_call_handler' object with
# the context value $ctx and the call description @call; returns the result
# of send().
sub send_class_call {
  my ($self, $ctx, @call) = @_;

  Dlog_trace { "Sending a class call for connection $_" } $self->_id;

  return $self->send(
    call => class_call_handler => $ctx => call => @call
  );
}
276
# Makes sure local_objects_by_id has an entry named 'class_call_handler',
# creating and registering a new handler when it is missing, and returns
# that entry.
sub register_class_call_handler {
  my ($self) = @_;

  my $registry = $self->local_objects_by_id;

  unless ($registry->{'class_call_handler'}) {
    my $handler = $self->new_class_call_handler;
    # also records the handler under its numeric id
    $self->_local_object_to_id($handler);
    $registry->{'class_call_handler'} = $handler;
  }

  return $registry->{'class_call_handler'};
}
285
# Returns a CodeContainer whose code takes a class name and a method name,
# loads the class and calls the method on it with the remaining arguments.
sub new_class_call_handler {
  my $dispatch = sub {
    my $class  = shift;
    my $method = shift;
    use_module($class)->$method(@_);
  };

  return Object::Remote::CodeContainer->new(code => $dispatch);
}
294
# Records $remote in remote_objects_by_id under its id, holding only a weak
# reference to it, and returns $remote.
sub register_remote {
  my ($self, $remote) = @_;

  Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;

  my $registry = $self->remote_objects_by_id;
  my $key      = $remote->id;

  $registry->{$key} = $remote;
  weaken($registry->{$key});

  return $remote;
}
301
# Forgets the remote object with the given id on this side and sends a
# 'free' message for it to the other side.
sub send_free {
  my ($self, $id) = @_;

  Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;

  #TODO this sometimes shows up when the remote side dies in the middle of a
  #remote method invocation - possibly only while the object is being
  #constructed?
  #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
  my $registry = $self->remote_objects_by_id;
  delete $registry->{$id};

  return $self->_send([ free => $id ]);
}
311
# Sends a message of the given type and returns a future for its result.
# The future is registered as a local object, its id is placed right after
# the type in the message, and it stays in outstanding_futures until it
# becomes ready.
sub send {
  my ($self, $type, @call) = @_;

  my $future = CPS::Future->new;

  # hold on to the handle of the addressed object (first call element)
  # until the future is ready
  my $remote = $self->remote_objects_by_id->{$call[0]};

  my $future_id = $self->_local_object_to_id($future);
  @call = ($type, $future_id, @call);

  my $outstanding = $self->outstanding_futures;
  $outstanding->{$future} = $future;

  my $forget = sub {
    undef($remote);
    delete $outstanding->{$future}
  };
  $future->on_ready($forget);

  $self->_send(\@call);

  return $future;
}
331
# Sends a message of the given type with 'NULL' in the future-id position,
# so no future is created on this side for the result.
sub send_discard {
  my ($self, $type, @call) = @_;

  my @message = ($type, 'NULL', @call);

  return $self->_send(\@message);
}
339
# Serializes $to_send and prints it, followed by a newline, to send_to_fh.
#
# Croaks when the connection is no longer valid. When writing fails the
# on_close future is completed with a description of the problem (unless it
# is ready already) and nothing is returned; otherwise the return value of
# print is returned.
sub _send {
  my ($self, $to_send) = @_;
  my $fh = $self->send_to_fh;

  unless ($self->is_valid) {
    croak "Attempt to invoke _send on a connection that is not valid";
  }

  Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
  my $serialized = $self->_serialize($to_send)."\n";
  Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;

  my $ret;
  # the trailing 1 lets success be detected from the eval result instead of
  # from the global $@
  my $written = eval {
    #TODO this should be converted over to a non-blocking ::WriteChannel class
    die "filehandle is not open" unless openhandle($fh);
    log_trace { "file handle has passed openhandle() test; printing to it" };
    $ret = print $fh $serialized;
    die "print was not successful: $!" unless defined $ret;
    1;
  };

  unless ($written) {
    # copy $@ before anything else runs: the logging call below may execute
    # evals of its own and overwrite it
    my $error = $@;
    Dlog_debug { "exception encountered when trying to write to file handle $_: $error" } $fh;
    chomp($error);
    $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
    return;
  }

  return $ret;
}
370
# Converts $data with _deobjectify and encodes the result, returning the
# encoded text. While this runs @New_Ids collects the ids of local objects
# registered along the way; when encoding fails those registrations are
# removed again and the error is rethrown with an "Error serializing: "
# prefix.
sub _serialize {
  my ($self, $data) = @_;

  # starts non-empty (-1) so _local_object_to_id knows it should record ids
  local our @New_Ids = (-1);

  my $flat = eval { $self->_encode($self->_deobjectify($data)) };
  return $flat if $flat;

  my $err = $@;
  # don't keep refs to new things
  delete @{$self->local_objects_by_id}{@New_Ids};
  die "Error serializing: $err";
}
384
# Registers $object in local_objects_by_id under its refaddr, unless an
# entry for that address exists already, and returns the address. A newly
# registered id is also appended to @New_Ids when that list is non-empty.
sub _local_object_to_id {
  my ($self, $object) = @_;

  my $id       = refaddr($object);
  my $registry = $self->local_objects_by_id;

  unless ($registry->{$id}) {
    our @New_Ids;
    push @New_Ids, $id if @New_Ids;
    $registry->{$id} = $object;
  }

  return $id;
}
394
# Recursively rewrites $data into a structure of plain scalars, arrays and
# hashes. Anything else is replaced by a single-key marker hash:
#
#   blessed proxy belonging to this connection -> __local_object__
#   any other blessed value                    -> __remote_object__
#   tied hash / tied array                     -> __remote_tied_hash__ /
#                                                 __remote_tied_array__
#   code reference                             -> __remote_code__
#   scalar reference                           -> __scalar_ref__
#   glob reference                             -> __glob_ref__
#
# Dies for any other reference type.
sub _deobjectify {
  my ($self, $data) = @_;

  if (blessed($data)) {
    my $is_own_proxy = $data->isa('Object::Remote::Proxy')
      && $data->{remote}->connection == $self;

    return $is_own_proxy
      ? +{ __local_object__ => $data->{remote}->id }
      : +{ __remote_object__ => $self->_local_object_to_id($data) };
  }

  my $ref = ref($data);
  return $data unless $ref; # plain scalar

  if ($ref eq 'HASH') {
    my $tied_to = tied(%$data);
    if (defined($tied_to)) {
      return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
    }
    my %flat;
    foreach my $key (keys %$data) {
      $flat{$key} = $self->_deobjectify($data->{$key});
    }
    return \%flat;
  }

  if ($ref eq 'ARRAY') {
    my $tied_to = tied(@$data);
    if (defined($tied_to)) {
      return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
    }
    my @flat;
    foreach my $element (@$data) {
      push @flat, scalar $self->_deobjectify($element);
    }
    return \@flat;
  }

  if ($ref eq 'CODE') {
    my $container = Object::Remote::CodeContainer->new(code => $data);
    return +{ __remote_code__ => $self->_local_object_to_id($container) };
  }

  if ($ref eq 'SCALAR') {
    return +{ __scalar_ref__ => $$data };
  }

  if ($ref eq 'GLOB') {
    my $container = Object::Remote::GlobContainer->new(handle => $data);
    return +{ __glob_ref__ => $self->_local_object_to_id($container) };
  }

  die "Can't collapse reftype $ref";
}
438
# Handles one line of serialized input: decodes it, takes the first element
# as the message type and calls the matching receive_<type> method with the
# remaining elements. Decode and handler failures are reported with warn.
# Returns nothing.
sub _receive {
  my ($self, $flat) = @_;

  Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;

  my @message = eval { @{$self->_deserialize($flat)} };
  unless (@message) {
    warn "Deserialize failed for ${flat}: $@";
    return;
  }
  my ($type, @rest) = @message;

  Dlog_trace { "deserialization complete for connection $_" } $self->_id;

  my $handler = "receive_${type}";
  my $handled = eval { $self->$handler(@rest); 1 };
  unless ($handled) {
    warn "Receive failed for ${flat}: $@";
    return;
  }

  return;
}
449
# Handler for 'free' messages: drops the local object with the given id and
# warns when no such object was registered. Returns nothing.
sub receive_free {
  my ($self, $id) = @_;

  Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;

  my $freed = delete $self->local_objects_by_id->{$id};
  warn "Free: no such object $id" unless $freed;

  return;
}
457
# Handler for 'call' messages: looks up the proxy for the caller's future
# and the addressed local object, then hands both to _invoke. The future is
# failed when the local object does not exist.
sub receive_call {
  my ($self, $future_id, $id, @rest) = @_;

  Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;

  my $future = $self->_id_to_remote_object($future_id);
  $future->{method} = 'call_discard_free';

  my $local = $self->local_objects_by_id->{$id};
  unless ($local) {
    $future->fail("No such object $id");
    return;
  }

  return $self->_invoke($future, $local, @rest);
}
467
# Handler for 'call_free' messages: performs the call with an undefined
# context value and afterwards frees the addressed local object.
sub receive_call_free {
  my ($self, $future, $id, @rest) = @_;

  Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;

  my @call = (undef, @rest);
  $self->receive_call($future, $id, @call);

  return $self->receive_free($id);
}
474
# Calls $method with @args on the local object $local and reports the
# outcome through $future.
#
# Methods whose name starts with 'start::' return a future of their own;
# its result or failure is forwarded to $future. Any other method is called
# directly: in list context when $ctx is true, in scalar context when $ctx
# is defined but false, and with the result discarded when $ctx is
# undefined. An exception fails $future. Returns nothing.
sub _invoke {
  my ($self, $future, $local, $ctx, $method, @args) = @_;

  Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;

  if ($method =~ /^start::/) {
    my $f = $local->$method(@args);
    $f->on_done(sub { undef($f); $future->done(@_) });
    # the on_done callback clears $f, so $f is checked again before it is
    # used - presumably on_done can run the callback right away; keep this
    # between the two registrations
    return unless $f;
    $f->on_fail(sub { undef($f); $future->fail(@_) });
    return;
  }

  my $do = sub { $local->$method(@args) };

  my $completed = eval {
    my @result;
    if (!defined($ctx)) {
      $do->();
    } elsif ($ctx) {
      @result = $do->();
    } else {
      @result = (scalar($do->()));
    }
    $future->done(@result);
    1;
  };

  unless ($completed) {
    $future->fail($@);
    return;
  }

  return;
}
496
9e72f0cf 4971;
b9a9982d 498
499=head1 NAME
500
501Object::Remote::Connection - An underlying connection for L<Object::Remote>

=head1 SYNOPSIS

b0ec7e3b 503 use Object::Remote;
504
505 my %opts = (
506 nice => '10', ulimit => '-v 400000',
507 watchdog_timeout => 120, stderr => \*STDERR,
508 );
509
510 my $local = Object::Remote->connect('-');
511 my $remote = Object::Remote->connect('myserver', nice => 5);
512 my $remote_user = Object::Remote->connect('user@myserver', %opts);
513 my $local_sudo = Object::Remote->connect('user@');
514
515 #$remote can be any other connection object
516 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
517
518=head1 DESCRIPTION
519
520This is the class that supports connections to a Perl interpreter that is executed in a
521different process. The new Perl interpreter can be either on the local or a remote machine
522and is configurable via arguments passed to the constructor.
523
524=head1 ARGUMENTS
525
526=over 4
527
528=item nice
529
530If this value is defined then it will be used as the nice value of the Perl process when it
531is started. The default is the undefined value and will not nice the process.
532
533=item stderr
534
535If this value is defined then it will be used as the file handle that receives the output
536of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
537non-blocking way. If the value is undefined then STDERR of the remote process will be connected
directly to STDERR of the local process without the run loop managing I/O. The default value
539is undefined.
540
541There are a few ways to use this feature. By default the behavior is to form one unified STDERR
542across all of the Perl interpreters including the local one. For small scale and quick operation
543this offers a predictable and easy to use way to get at error messages generated anywhere. If
544the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
545and it is possible to still receive output from them. This is generally a good thing but can
546cause issues.
547
548When using a file handle as the output for STDERR once the local Perl interpreter is no longer
549running there is no longer a valid STDERR for the remote interpreters to send data to. This means
550that it is no longer possible to receive error output from the remote interpreters and that the
551shell will start to kill off the child processes. Passing a reference to STDERR for the local
552interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
553all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
554start killing children when the local interpreter exits.
555
556It is also possible to pass in a file handle that has been opened for writing. This would be
557useful for logging the output of the remote interpreter directly into a dedicated file.
558
559=item ulimit
560
561If this string is defined then it will be passed unmodified as the arguments to ulimit when
562the Perl process is started. The default value is the undefined value and will not limit the
563process in any way.
564
565=item watchdog_timeout
566
567If this value is defined then it will be used as the number of seconds the watchdog will wait
568for an update before it terminates the Perl interpreter process. The default value is undefined
569and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
570
571=back
572
573=head1 SEE ALSO
574
575=over 4
576
577=item C<Object::Remote>
b9a9982d 578
b0ec7e3b 579=back
b9a9982d 580
581=cut