removed the baked in ulimit and nice code, use custom perl_command instead
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
CommitLineData
9e72f0cf 1package Object::Remote::Connection;
2
f4a85080 3use Object::Remote::Logging qw (:log :dlog router);
dc28afe8 4use Object::Remote::Future;
9d804009 5use Object::Remote::Null;
676438a1 6use Object::Remote::Handle;
fe6c9a7f 7use Object::Remote::CodeContainer;
ed5a8a8e 8use Object::Remote::GlobProxy;
9use Object::Remote::GlobContainer;
7790ca36 10use Object::Remote::Tied;
9e72f0cf 11use Object::Remote;
ed5a8a8e 12use Symbol;
9e72f0cf 13use IO::Handle;
c824fdf3 14use POSIX ":sys_wait_h";
9e72f0cf 15use Module::Runtime qw(use_module);
f8080c1c 16use Scalar::Util qw(weaken blessed refaddr openhandle);
9e72f0cf 17use JSON::PP qw(encode_json);
18use Moo;
5add5e29 19use Carp qw(croak);
9e72f0cf 20
e1a0b9ca 21BEGIN { router()->exclude_forwarding }
c824fdf3 22
23END {
24 log_debug { "Killing all child processes in the process group" };
25
eb351344 26 #FIXME update along with setpgrp() to not use a process
27 #group anymore
28
c824fdf3 29 #send SIGINT to the process group for our children
30 kill(1, -2);
31}
32
9031635d 33has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID++ } );
ad4f54b2 34
9e72f0cf 35has send_to_fh => (
36 is => 'ro', required => 1,
90115979 37 trigger => sub {
8f43bcd9 38 my $self = $_[0];
39 $_[1]->autoflush(1);
40 Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $_[1];
90115979 41 },
9e72f0cf 42);
43
12fb4a80 44has read_channel => (
9e72f0cf 45 is => 'ro', required => 1,
46 trigger => sub {
12fb4a80 47 my ($self, $ch) = @_;
f8080c1c 48 my $id = $self->_id;
998ff9a4 49 Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
9e72f0cf 50 weaken($self);
12fb4a80 51 $ch->on_line_call(sub { $self->_receive(@_) });
f8080c1c 52 $ch->on_close_call(sub {
998ff9a4 53 log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
37efeb68 54 $self->on_close->done(@_);
f8080c1c 55 });
9e72f0cf 56 },
57);
58
12fb4a80 59has on_close => (
353556c4 60 is => 'rw', default => sub { $_[0]->_install_future_handlers(CPS::Future->new) },
998ff9a4 61 trigger => sub {
8f43bcd9 62 log_trace { "Installing handlers into future via trigger" };
63 $_[0]->_install_future_handlers($_[1])
998ff9a4 64 },
12fb4a80 65);
ad4f54b2 66
47c83a13 67has child_pid => (is => 'ro');
68
11dbd4a0 69has local_objects_by_id => (
70 is => 'ro', default => sub { {} },
71 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
72);
9e72f0cf 73
11dbd4a0 74has remote_objects_by_id => (
75 is => 'ro', default => sub { {} },
76 coerce => sub { +{ %{$_[0]} } }, # shallow clone on the way in
77);
9e72f0cf 78
a980b0b8 79has outstanding_futures => (is => 'ro', default => sub { {} });
80
353556c4 81has _json => (
82 is => 'lazy',
83 handles => {
84 _deserialize => 'decode',
85 _encode => 'encode',
86 },
87);
88
89after BUILD => sub {
998ff9a4 90 my ($self) = @_;
91 my $pid = $self->child_pid;
353556c4 92
998ff9a4 93 unless (defined $pid) {
8f43bcd9 94 log_trace { "After BUILD invoked for connection but there was no pid" };
95 return;
998ff9a4 96 }
97
98 log_trace { "Setting process group of child process '$pid'" };
353556c4 99
eb351344 100 #FIXME moving things into a process group has side effects for
101 #users of the library - move to a list
353556c4 102 setpgrp($self->child_pid, 1);
103};
104
105sub BUILD { }
106
d2eadebb 107sub is_valid {
108 my ($self) = @_;
fb258df6 109 my $valid = ! $self->on_close->is_ready;
d2eadebb 110
fb258df6 111 log_trace {
112 my $id = $self->_id;
113 my $text;
114 if ($valid) {
115 $text = 'yes';
116 } else {
117 $text = 'no';
118 }
119 "Connection '$id' is valid: '$text'"
120 };
121
122 return $valid;
d2eadebb 123}
124
12fb4a80 125sub _fail_outstanding {
126 my ($self, $error) = @_;
127 my $outstanding = $self->outstanding_futures;
d2eadebb 128
129 Dlog_debug {
130 sprintf "Failing %i outstanding futures with '$error'", scalar(keys(%$outstanding))
131 };
132
133 foreach(keys(%$outstanding)) {
134 log_trace { "Failing future for $_" };
135 my $future = $outstanding->{$_};
867e4de5 136 $future->fail("$error\n");
d2eadebb 137 }
138
12fb4a80 139 %$outstanding = ();
140 return;
141}
142
353556c4 143sub _install_future_handlers {
144 my ($self, $f) = @_;
998ff9a4 145 Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
353556c4 146 weaken($self);
147 $f->on_done(sub {
998ff9a4 148 my $pid = $self->child_pid;
149 Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
353556c4 150 $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
998ff9a4 151 return unless defined $pid;
152 log_debug { "Waiting for child '$pid' to exit" };
153 my $ret = waitpid($pid, 0);
154 if ($ret != $pid) {
155 log_debug { "Waited for pid $pid but waitpid() returned $ret" };
156 return;
157 } elsif ($? & 127) {
158 log_warn { "Remote interpreter did not exit cleanly" };
159 } else {
160 log_verbose {
161 my $exit_value = $? >> 8;
162 "Remote Perl interpreter exited with value '$exit_value'"
163 };
164 }
353556c4 165 });
166 return $f;
167};
9e72f0cf 168
fe6c9a7f 169sub _id_to_remote_object {
170 my ($self, $id) = @_;
9031635d 171 Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;
fe6c9a7f 172 return bless({}, 'Object::Remote::Null') if $id eq 'NULL';
173 (
174 $self->remote_objects_by_id->{$id}
175 or Object::Remote::Handle->new(connection => $self, id => $id)
176 )->proxy;
177}
178
9e72f0cf 179sub _build__json {
180 weaken(my $self = shift);
9e72f0cf 181 JSON::PP->new->filter_json_single_key_object(
182 __remote_object__ => sub {
fe6c9a7f 183 $self->_id_to_remote_object(@_);
184 }
185 )->filter_json_single_key_object(
186 __remote_code__ => sub {
187 my $code_container = $self->_id_to_remote_object(@_);
188 sub { $code_container->call(@_) };
9e72f0cf 189 }
6ed5d580 190 )->filter_json_single_key_object(
191 __scalar_ref__ => sub {
192 my $value = shift;
193 return \$value;
194 }
ed5a8a8e 195 )->filter_json_single_key_object(
196 __glob_ref__ => sub {
197 my $glob_container = $self->_id_to_remote_object(@_);
198 my $handle = Symbol::gensym;
199 tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
200 return $handle;
201 }
6163a5aa 202 )->filter_json_single_key_object(
203 __local_object__ => sub {
204 $self->local_objects_by_id->{$_[0]}
205 }
7790ca36 206 )->filter_json_single_key_object(
207 __remote_tied_hash__ => sub {
37efeb68 208 my %tied_hash;
209 tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
210 return \%tied_hash;
7790ca36 211 }
212 )->filter_json_single_key_object(
213 __remote_tied_array__ => sub {
37efeb68 214 my @tied_array;
215 tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
216 return \@tied_array;
7790ca36 217 }
218 );
9e72f0cf 219}
220
c824fdf3 221sub _load_if_possible {
222 my ($class) = @_;
223
624072a8 224 use_module($class);
c824fdf3 225
226 if ($@) {
227 log_debug { "Attempt at loading '$class' failed with '$@'" };
228 }
229
230}
231
84b04178 232BEGIN {
233 unshift our @Guess, sub { blessed($_[0]) ? $_[0] : undef };
c824fdf3 234 map _load_if_possible($_), qw(
235 Object::Remote::Connector::Local
236 Object::Remote::Connector::LocalSudo
237 Object::Remote::Connector::SSH
238 Object::Remote::Connector::UNIX
239 );
84b04178 240}
241
c824fdf3 242sub conn_from_spec {
243 my ($class, $spec, @args) = @_;
84b04178 244 foreach my $poss (do { our @Guess }) {
c824fdf3 245 if (my $conn = $poss->($spec, @args)) {
246 return $conn;
fbd3b8ec 247 }
84b04178 248 }
c824fdf3 249
250 return undef;
251}
252
253sub new_from_spec {
71087610 254 my ($class, $spec, @args) = @_;
c824fdf3 255 return $spec if blessed $spec;
71087610 256 my $conn = $class->conn_from_spec($spec, @args);
c824fdf3 257
258 die "Couldn't figure out what to do with ${spec}"
259 unless defined $conn;
260
261 return $conn->maybe::start::connect;
84b04178 262}
263
11dbd4a0 264sub remote_object {
e144d525 265 my ($self, @args) = @_;
266 Object::Remote::Handle->new(
267 connection => $self, @args
268 )->proxy;
269}
270
4c17fea5 271sub connect {
272 my ($self, $to) = @_;
9031635d 273 Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
deb77aaf 274 return await_future(
275 $self->send_class_call(0, 'Object::Remote', connect => $to)
276 );
4c17fea5 277}
278
11dbd4a0 279sub remote_sub {
c6fe6fbd 280 my ($self, $sub) = @_;
281 my ($pkg, $name) = $sub =~ m/^(.*)::([^:]+)$/;
fb258df6 282 Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
deb77aaf 283 return await_future($self->send_class_call(0, $pkg, can => $name));
284}
285
286sub send_class_call {
287 my ($self, $ctx, @call) = @_;
9031635d 288 Dlog_trace { "Sending a class call for connection $_" } $self->_id;
deb77aaf 289 $self->send(call => class_call_handler => $ctx => call => @call);
290}
291
292sub register_class_call_handler {
293 my ($self) = @_;
3687a42d 294 $self->local_objects_by_id->{'class_call_handler'} ||= do {
c5736e1c 295 my $o = $self->new_class_call_handler;
3687a42d 296 $self->_local_object_to_id($o);
297 $o;
298 };
c6fe6fbd 299}
300
c5736e1c 301sub new_class_call_handler {
302 Object::Remote::CodeContainer->new(
303 code => sub {
304 my ($class, $method) = (shift, shift);
305 use_module($class)->$method(@_);
306 }
307 );
308}
309
9e72f0cf 310sub register_remote {
311 my ($self, $remote) = @_;
9031635d 312 Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;
9e72f0cf 313 weaken($self->remote_objects_by_id->{$remote->id} = $remote);
314 return $remote;
315}
316
317sub send_free {
318 my ($self, $id) = @_;
7790ca36 319 Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
302ecfbf 320 #TODO this shows up some times when a remote side dies in the middle of a remote
321 #method invocation - possibly only when the object is being constructed?
322 #(in cleanup) Use of uninitialized value $id in delete at ../Object-Remote/lib/Object/Remote/Connection.
9e72f0cf 323 delete $self->remote_objects_by_id->{$id};
324 $self->_send([ free => $id ]);
325}
326
327sub send {
328 my ($self, $type, @call) = @_;
329
deb77aaf 330 my $future = CPS::Future->new;
a2d43709 331 my $remote = $self->remote_objects_by_id->{$call[0]};
deb77aaf 332
333 unshift @call, $type => $self->_local_object_to_id($future);
9e72f0cf 334
a980b0b8 335 my $outstanding = $self->outstanding_futures;
336 $outstanding->{$future} = $future;
a2d43709 337 $future->on_ready(sub {
338 undef($remote);
339 delete $outstanding->{$future}
340 });
a980b0b8 341
9e72f0cf 342 $self->_send(\@call);
343
344 return $future;
345}
346
9d804009 347sub send_discard {
348 my ($self, $type, @call) = @_;
349
deb77aaf 350 unshift @call, $type => 'NULL';
9d804009 351
352 $self->_send(\@call);
353}
354
9e72f0cf 355sub _send {
356 my ($self, $to_send) = @_;
9d64d2d9 357 my $fh = $self->send_to_fh;
d2eadebb 358
359 unless ($self->is_valid) {
5add5e29 360 croak "Attempt to invoke _send on a connection that is not valid";
d2eadebb 361 }
362
9031635d 363 Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
9d64d2d9 364 my $serialized = $self->_serialize($to_send)."\n";
6b7b2732 365 Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
f8080c1c 366 my $ret;
367 eval {
353556c4 368 #TODO this should be converted over to a non-blocking ::WriteChannel class
369 die "filehandle is not open" unless openhandle($fh);
370 log_trace { "file handle has passed openhandle() test; printing to it" };
371 $ret = print $fh $serialized;
372 die "print was not successful: $!" unless defined $ret
f8080c1c 373 };
c824fdf3 374
f8080c1c 375 if ($@) {
353556c4 376 Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
998ff9a4 377 my $error = $@;
378 chomp($error);
353556c4 379 $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
380 return;
f8080c1c 381 }
382
9d64d2d9 383 return $ret;
9e72f0cf 384}
385
386sub _serialize {
387 my ($self, $data) = @_;
3687a42d 388 local our @New_Ids = (-1);
9d804009 389 return eval {
ad4f54b2 390 my $flat = $self->_encode($self->_deobjectify($data));
ad4f54b2 391 $flat;
70a578ac 392 } || do {
9d804009 393 my $err = $@; # won't get here if the eval doesn't die
394 # don't keep refs to new things
395 delete @{$self->local_objects_by_id}{@New_Ids};
396 die "Error serializing: $err";
397 };
9e72f0cf 398}
399
a76f2f60 400sub _local_object_to_id {
401 my ($self, $object) = @_;
402 my $id = refaddr($object);
403 $self->local_objects_by_id->{$id} ||= do {
3687a42d 404 push our(@New_Ids), $id if @New_Ids;
a76f2f60 405 $object;
406 };
407 return $id;
408}
409
9e72f0cf 410sub _deobjectify {
411 my ($self, $data) = @_;
412 if (blessed($data)) {
6163a5aa 413 if (
414 $data->isa('Object::Remote::Proxy')
415 and $data->{remote}->connection == $self
416 ) {
417 return +{ __local_object__ => $data->{remote}->id };
418 } else {
419 return +{ __remote_object__ => $self->_local_object_to_id($data) };
420 }
9e72f0cf 421 } elsif (my $ref = ref($data)) {
422 if ($ref eq 'HASH') {
7790ca36 423 my $tied_to = tied(%$data);
424 if(defined($tied_to)) {
425 return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
426 } else {
427 return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
428 }
9e72f0cf 429 } elsif ($ref eq 'ARRAY') {
7790ca36 430 my $tied_to = tied(@$data);
431 if (defined($tied_to)) {
432 return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
433 } else {
434 return [ map $self->_deobjectify($_), @$data ];
435 }
fe6c9a7f 436 } elsif ($ref eq 'CODE') {
437 my $id = $self->_local_object_to_id(
438 Object::Remote::CodeContainer->new(code => $data)
439 );
440 return +{ __remote_code__ => $id };
6ed5d580 441 } elsif ($ref eq 'SCALAR') {
442 return +{ __scalar_ref__ => $$data };
ed5a8a8e 443 } elsif ($ref eq 'GLOB') {
444 return +{ __glob_ref__ => $self->_local_object_to_id(
445 Object::Remote::GlobContainer->new(handle => $data)
446 ) };
9e72f0cf 447 } else {
448 die "Can't collapse reftype $ref";
449 }
450 }
451 return $data; # plain scalar
452}
453
9e72f0cf 454sub _receive {
ad4f54b2 455 my ($self, $flat) = @_;
9031635d 456 Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;
ad4f54b2 457 my ($type, @rest) = eval { @{$self->_deserialize($flat)} }
458 or do { warn "Deserialize failed for ${flat}: $@"; return };
9031635d 459 Dlog_trace { "deserialization complete for connection $_" } $self->_id;
9e72f0cf 460 eval { $self->${\"receive_${type}"}(@rest); 1 }
ad4f54b2 461 or do { warn "Receive failed for ${flat}: $@"; return };
9e72f0cf 462 return;
463}
464
465sub receive_free {
466 my ($self, $id) = @_;
9031635d 467 Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;
9d804009 468 delete $self->local_objects_by_id->{$id}
469 or warn "Free: no such object $id";
9e72f0cf 470 return;
471}
472
473sub receive_call {
deb77aaf 474 my ($self, $future_id, $id, @rest) = @_;
9031635d 475 Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;
deb77aaf 476 my $future = $self->_id_to_remote_object($future_id);
8131a88a 477 $future->{method} = 'call_discard_free';
9e72f0cf 478 my $local = $self->local_objects_by_id->{$id}
479 or do { $future->fail("No such object $id"); return };
480 $self->_invoke($future, $local, @rest);
481}
482
8131a88a 483sub receive_call_free {
484 my ($self, $future, $id, @rest) = @_;
9031635d 485 Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;
84b04178 486 $self->receive_call($future, $id, undef, @rest);
8131a88a 487 $self->receive_free($id);
488}
489
9e72f0cf 490sub _invoke {
84b04178 491 my ($self, $future, $local, $ctx, $method, @args) = @_;
9031635d 492 Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;
dc28afe8 493 if ($method =~ /^start::/) {
494 my $f = $local->$method(@args);
495 $f->on_done(sub { undef($f); $future->done(@_) });
3f1f1e66 496 return unless $f;
dc28afe8 497 $f->on_fail(sub { undef($f); $future->fail(@_) });
498 return;
499 }
84b04178 500 my $do = sub { $local->$method(@args) };
501 eval {
502 $future->done(
503 defined($ctx)
504 ? ($ctx ? $do->() : scalar($do->()))
505 : do { $do->(); () }
506 );
507 1;
508 } or do { $future->fail($@); return; };
9e72f0cf 509 return;
510}
511
9e72f0cf 5121;
b9a9982d 513
514=head1 NAME
515
516Object::Remote::Connection - An underlying connection for L<Object::Remote>
517
b0ec7e3b 518 use Object::Remote;
519
520 my %opts = (
521 nice => '10', ulimit => '-v 400000',
522 watchdog_timeout => 120, stderr => \*STDERR,
523 );
524
525 my $local = Object::Remote->connect('-');
526 my $remote = Object::Remote->connect('myserver', nice => 5);
527 my $remote_user = Object::Remote->connect('user@myserver', %opts);
528 my $local_sudo = Object::Remote->connect('user@');
529
530 #$remote can be any other connection object
531 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
532
533=head1 DESCRIPTION
534
535This is the class that supports connections to a Perl interpreter that is executed in a
536different process. The new Perl interpreter can be either on the local or a remote machine
537and is configurable via arguments passed to the constructor.
538
539=head1 ARGUMENTS
540
541=over 4
542
543=item nice
544
545If this value is defined then it will be used as the nice value of the Perl process when it
546is started. The default is the undefined value and will not nice the process.
547
548=item stderr
549
550If this value is defined then it will be used as the file handle that receives the output
551of STDERR from the Perl interpreter process and I/O will be performed by the run loop in a
552non-blocking way. If the value is undefined then STDERR of the remote process will be connected
553directly to STDERR of the local process with out the run loop managing I/O. The default value
554is undefined.
555
556There are a few ways to use this feature. By default the behavior is to form one unified STDERR
557across all of the Perl interpreters including the local one. For small scale and quick operation
558this offers a predictable and easy to use way to get at error messages generated anywhere. If
559the local Perl interpreter crashes then the remote Perl interpreters still have an active STDERR
560and it is possible to still receive output from them. This is generally a good thing but can
561cause issues.
562
563When using a file handle as the output for STDERR once the local Perl interpreter is no longer
564running there is no longer a valid STDERR for the remote interpreters to send data to. This means
565that it is no longer possible to receive error output from the remote interpreters and that the
566shell will start to kill off the child processes. Passing a reference to STDERR for the local
567interpreter (as the SYNOPSIS shows) causes the run loop to manage I/O, one unified STDERR for
568all Perl interpreters that ends as soon as the local interpreter process does, and the shell will
569start killing children when the local interpreter exits.
570
571It is also possible to pass in a file handle that has been opened for writing. This would be
572useful for logging the output of the remote interpreter directly into a dedicated file.
573
574=item ulimit
575
576If this string is defined then it will be passed unmodified as the arguments to ulimit when
577the Perl process is started. The default value is the undefined value and will not limit the
578process in any way.
579
580=item watchdog_timeout
581
582If this value is defined then it will be used as the number of seconds the watchdog will wait
583for an update before it terminates the Perl interpreter process. The default value is undefined
584and will not use the watchdog. See C<Object::Remote::Watchdog> for more information.
585
586=back
587
588=head1 SEE ALSO
589
590=over 4
591
592=item C<Object::Remote>
b9a9982d 593
b0ec7e3b 594=back
b9a9982d 595
596=cut