further detail OR->connect arguments, and document other bits
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
CommitLineData
9e72f0cf 1package Object::Remote::Connection;
2
f4a85080 3use Object::Remote::Logging qw (:log :dlog router);
dc28afe8 4use Object::Remote::Future;
9d804009 5use Object::Remote::Null;
676438a1 6use Object::Remote::Handle;
fe6c9a7f 7use Object::Remote::CodeContainer;
ed5a8a8e 8use Object::Remote::GlobProxy;
9use Object::Remote::GlobContainer;
7790ca36 10use Object::Remote::Tied;
9e72f0cf 11use Object::Remote;
ed5a8a8e 12use Symbol;
9e72f0cf 13use IO::Handle;
c824fdf3 14use POSIX ":sys_wait_h";
9e72f0cf 15use Module::Runtime qw(use_module);
f8080c1c 16use Scalar::Util qw(weaken blessed refaddr openhandle);
9e72f0cf 17use JSON::PP qw(encode_json);
783105c4 18use Future;
5add5e29 19use Carp qw(croak);
783105c4 20use Moo;
9e72f0cf 21
e1a0b9ca 22BEGIN { router()->exclude_forwarding }
c824fdf3 23
# Interpreter-exit hook: send SIGTERM to every pid that is still recorded in
# the package-global %child_pids registry.
END {
  our %child_pids;

  log_trace { "END handler is being invoked in " . __PACKAGE__ };

  for (keys %child_pids) {
    log_debug { "Killing child process '$_'" };
    kill TERM => $_;
  }
}
34
# Numeric id of this connection, taken from a package-global counter when not
# supplied; it is used to tag the log messages emitted by this class.
has _id => (
  is       => 'ro',
  required => 1,
  default  => sub { our $NEXT_CONNECTION_ID++ },
);
ad4f54b2 36
# File handle that serialized messages are printed to (see _send).  Whenever
# it is set, autoflush is switched on for the handle.
has send_to_fh => (
  is       => 'ro',
  required => 1,
  trigger  => sub {
    my ($self, $fh) = @_;
    $fh->autoflush(1);
    Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_" } $fh;
  },
);
45
# Channel that incoming messages are read from.  Setting it installs two
# callbacks on the channel: each received line is handed to _receive, and a
# close of the channel completes the on_close future.
has read_channel => (
  is => 'ro', required => 1,
  trigger => sub {
    my ($self, $ch) = @_;
    my $id = $self->_id;
    Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
    # The callbacks below are stored on the channel, so they only get a weak
    # reference to the connection (presumably to avoid a reference cycle
    # between connection and channel).
    weaken($self);
    $ch->on_line_call(sub { $self->_receive(@_) });
    $ch->on_close_call(sub {
      my $on_close = $self->on_close;
      # _send() completes on_close itself when a write fails.  Calling
      # ->done on a Future that is already complete throws, so the future is
      # only completed here if nobody has done so yet.
      if ($on_close->is_ready) {
        log_trace { "on_close future is already complete for connection id '$id'; not invoking 'done' again" };
        return;
      }
      log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
      $on_close->done(@_);
    });
  },
);
60
# Future that is completed when the connection goes away.  Both the default
# value and any future assigned later are passed through
# _install_future_handlers.
has on_close => (
  is      => 'rw',
  default => sub {
    my ($self) = @_;
    return $self->_install_future_handlers(Future->new);
  },
  trigger => sub {
    my ($self, $future) = @_;
    log_trace { "Installing handlers into future via trigger" };
    $self->_install_future_handlers($future);
  },
);
ad4f54b2 68
# Pid of the child process behind this connection, if there is one.
has child_pid => (is => 'ro');

# Objects living on this side that the other side may refer to, keyed by id.
# A hash passed to the constructor is shallow-copied on the way in.
has local_objects_by_id => (
  is      => 'ro',
  default => sub { +{} },
  coerce  => sub { my ($given) = @_; return +{ %$given } },
);

# Handles for objects living on the other side, keyed by id.
# A hash passed to the constructor is shallow-copied on the way in.
has remote_objects_by_id => (
  is      => 'ro',
  default => sub { +{} },
  coerce  => sub { my ($given) = @_; return +{ %$given } },
);

# Futures created by send() that have not completed yet.
has outstanding_futures => (
  is      => 'ro',
  default => sub { +{} },
);

# Lazily built JSON codec (see _build__json); its decode/encode methods are
# exposed on the connection as _deserialize and _encode.
has _json => (
  is      => 'lazy',
  handles => {
    _deserialize => 'decode',
    _encode      => 'encode',
  },
);
90
# Empty BUILD so that the modifier below always has a method to wrap.
sub BUILD { }

# After construction, record the child pid (when there is one) in the
# package-global %child_pids registry that the END block consults.
after BUILD => sub {
  my ($self) = @_;
  our %child_pids;
  my $pid = $self->child_pid;
  $child_pids{$pid} = 1 if defined $pid;
  return;
};
101
# Returns true while the on_close future has not completed, i.e. while the
# connection is still considered usable.
sub is_valid {
  my ($self) = @_;
  my $valid = ! $self->on_close->is_ready;

  log_trace {
    my $id = $self->_id;
    my $text = $valid ? 'yes' : 'no';
    "Connection '$id' is valid: '$text'"
  };

  return $valid;
}
119
# Fails every future in outstanding_futures with "$error\n" and empties the
# registry.
#
#   $error - text describing why the pending requests cannot complete
#
# Returns nothing.
sub _fail_outstanding {
  my ($self, $error) = @_;
  my $outstanding = $self->outstanding_futures;

  Dlog_debug {
    # $error is passed as an argument, not interpolated into the format, so
    # a '%' inside the message cannot be mistaken for a conversion.
    sprintf "Failing %i outstanding futures with '%s'", scalar(keys(%$outstanding)), $error
  };

  foreach my $key (keys(%$outstanding)) {
    my $future = $outstanding->{$key};
    # Callbacks run by an earlier ->fail may already have removed or
    # completed this entry; failing a completed Future would throw and
    # leave the remaining futures pending.
    next if !$future or $future->is_ready;
    log_trace { "Failing future for $key" };
    $future->fail("$error\n");
  }

  %$outstanding = ();
  return;
}
137
# Attaches the connection-shutdown logic to future $f and returns $f.
#
# When $f is done, all outstanding futures are failed with a "connection
# lost" message built from the first result of $f.  If the connection has a
# child pid, the child is then reaped with a blocking waitpid, its exit is
# logged, and the pid is removed from the %child_pids registry.
sub _install_future_handlers {
  my ($self, $f) = @_;
  our %child_pids;
  Dlog_trace { "Installing handlers into future for connection $_" } $self->_id;
  weaken($self);

  my $on_connection_lost = sub {
    my $pid = $self->child_pid;
    Dlog_trace { "Executing on_done handler in future for connection $_" } $self->_id;
    $self->_fail_outstanding("Object::Remote connection lost: " . ($f->get)[0]);
    return unless defined $pid;

    log_debug { "Waiting for child '$pid' to exit" };
    my $ret = waitpid($pid, 0);
    unless ($ret == $pid) {
      # The pid stays in %child_pids in this case.
      log_debug { "Waited for pid $pid but waitpid() returned $ret" };
      return;
    }

    if ($? & 127) {
      # Low seven bits of the wait status set: the child died from a signal.
      log_warn { "Remote interpreter did not exit cleanly" };
    } else {
      log_verbose {
        my $exit_value = $? >> 8;
        "Remote Perl interpreter exited with value '$exit_value'"
      };
    }

    delete $child_pids{$pid};
  };

  $f->on_done($on_connection_lost);
  return $f;
}
9e72f0cf 166
# Returns the proxy for the remote object with the given id.  The id 'NULL'
# yields an Object::Remote::Null placeholder; otherwise the handle already
# registered for the id is used, or a new Object::Remote::Handle is created.
sub _id_to_remote_object {
  my ($self, $id) = @_;
  Dlog_trace { "fetching proxy for remote object with id '$id' for connection $_" } $self->_id;

  if ($id eq 'NULL') {
    return bless({}, 'Object::Remote::Null');
  }

  my $handle = $self->remote_objects_by_id->{$id}
    || Object::Remote::Handle->new(connection => $self, id => $id);
  return $handle->proxy;
}
176
# Builder for the _json attribute.  Creates a JSON::PP codec with one
# single-key-object filter per wire marker, so that the { marker => value }
# hashes written by _deobjectify are expanded again while decoding.
sub _build__json {
  my $self = shift;
  weaken($self);

  my @filters = (
    # object owned by the other side -> its proxy
    __remote_object__ => sub {
      $self->_id_to_remote_object(@_);
    },
    # code ref owned by the other side -> local sub forwarding to it
    __remote_code__ => sub {
      my $code_container = $self->_id_to_remote_object(@_);
      sub { $code_container->call(@_) };
    },
    # scalar ref -> reference to a fresh copy of the value
    __scalar_ref__ => sub {
      my $value = shift;
      return \$value;
    },
    # glob owned by the other side -> new symbol tied to a GlobProxy
    __glob_ref__ => sub {
      my $glob_container = $self->_id_to_remote_object(@_);
      my $handle = Symbol::gensym;
      tie *$handle, 'Object::Remote::GlobProxy', $glob_container;
      return $handle;
    },
    # object owned by this side -> the registered object itself
    __local_object__ => sub {
      $self->local_objects_by_id->{$_[0]}
    },
    # tied hash on the other side -> hash tied to Object::Remote::Tied
    __remote_tied_hash__ => sub {
      my %tied_hash;
      tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \%tied_hash;
    },
    # tied array on the other side -> array tied to Object::Remote::Tied
    __remote_tied_array__ => sub {
      my @tied_array;
      tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
      return \@tied_array;
    },
  );

  my $json = JSON::PP->new;
  while (my ($marker, $filter) = splice(@filters, 0, 2)) {
    $json->filter_json_single_key_object($marker => $filter);
  }
  return $json;
}
218
# Tries to load module $class; a failure is logged at debug level instead of
# being propagated, so an unavailable module does not stop the caller.
#
#   $class - name of the module to load
#
# Returns nothing.
sub _load_if_possible {
  my ($class) = @_;

  # use_module() reports failure by throwing, so the exception has to be
  # trapped here for the "if possible" part to work.
  my $loaded = eval { use_module($class); 1 };

  unless ($loaded) {
    my $error = $@;
    log_debug { "Attempt at loading '$class' failed with '$error'" };
  }

  return;
}
229
# Compile-time setup: put a guesser at the front of @Guess that accepts
# anything already blessed as the connection, then try to load the known
# connector classes.
BEGIN {
  our @Guess;
  unshift @Guess, sub { blessed($_[0]) ? $_[0] : undef };

  _load_if_possible($_) for qw(
    Object::Remote::Connector::Local
    Object::Remote::Connector::LocalSudo
    Object::Remote::Connector::SSH
    Object::Remote::Connector::UNIX
    Object::Remote::Connector::INET
  );
}
240
# Offers $spec (plus any extra arguments) to each guesser in @Guess in turn
# and returns the first true result, or undef when no guesser accepts it.
sub conn_from_spec {
  my ($class, $spec, @args) = @_;

  # Iterate over a snapshot of the guesser list.
  my @guessers = do { our @Guess };
  for my $guesser (@guessers) {
    my $conn = $guesser->($spec, @args);
    return $conn if $conn;
  }

  return undef;
}
251
# Turns a connection spec into a connection.  A blessed $spec is returned
# unchanged; otherwise conn_from_spec picks a connector, and the result of
# calling connect on it via maybe::start:: is returned.  Dies when no
# guesser recognises the spec.
sub new_from_spec {
  my ($class, $spec, @args) = @_;

  if (blessed $spec) {
    return $spec;
  }

  my $conn = $class->conn_from_spec($spec, @args);
  defined $conn
    or die "Couldn't figure out what to do with ${spec}";

  return $conn->maybe::start::connect;
}
262
# Creates an Object::Remote::Handle bound to this connection, forwarding any
# extra arguments to its constructor, and returns the handle's proxy.
sub remote_object {
  my ($self, @args) = @_;
  my $handle = Object::Remote::Handle->new(connection => $self, @args);
  return $handle->proxy;
}
269
# Sends a class call over this connection that invokes
# Object::Remote->connect($to) on the other side, and awaits the result.
sub connect {
  my ($self, $to) = @_;
  Dlog_debug { "Creating connection to remote node '$to' for connection $_" } $self->_id;
  my $future = $self->send_class_call(0, 'Object::Remote', connect => $to);
  return await_future($future);
}
277
# Looks up a sub on the other side of the connection.
#
#   $sub - package-qualified sub name such as 'Sys::Hostname::hostname'
#
# The name is split into package and sub name, and a class call for
# $pkg->can($name) is sent; the awaited result is returned.  Croaks when
# $sub is not package-qualified, because no package could be named in the
# request.
sub remote_sub {
  my ($self, $sub) = @_;
  my ($pkg, $name) = defined($sub) ? $sub =~ m/^(.*)::([^:]+)$/ : ();
  unless (defined $pkg and defined $name) {
    my $shown = defined($sub) ? "'$sub'" : 'undef';
    croak "remote_sub requires a package-qualified sub name, got $shown";
  }
  Dlog_debug { "Invoking remote sub '$sub' for connection '$_'" } $self->_id;
  return await_future($self->send_class_call(0, $pkg, can => $name));
}
284
# Sends a 'call' message addressed to the 'class_call_handler' object on the
# other side, with context flag $ctx and the class, method and arguments in
# @call.  Returns whatever send() returns.
sub send_class_call {
  my ($self, $ctx, @call) = @_;
  Dlog_trace { "Sending a class call for connection $_" } $self->_id;
  return $self->send('call', 'class_call_handler', $ctx, 'call', @call);
}
290
# Makes sure a class call handler is stored in local_objects_by_id under the
# key 'class_call_handler', creating and registering one on first use.
# Returns the handler.
sub register_class_call_handler {
  my ($self) = @_;
  my $objects = $self->local_objects_by_id;

  if (my $existing = $objects->{'class_call_handler'}) {
    return $existing;
  }

  my $handler = $self->new_class_call_handler;
  $self->_local_object_to_id($handler);
  return $objects->{'class_call_handler'} = $handler;
}
299
# Builds the code container behind class calls: it receives a class name, a
# method name and arguments, loads the class and calls the method on it.
sub new_class_call_handler {
  my $dispatch = sub {
    my $class  = shift;
    my $method = shift;
    return use_module($class)->$method(@_);
  };
  return Object::Remote::CodeContainer->new(code => $dispatch);
}
308
# Records $remote in remote_objects_by_id under its id.  The stored
# reference is weakened, so the registry alone does not keep the handle
# alive.  Returns $remote.
sub register_remote {
  my ($self, $remote) = @_;
  Dlog_trace { my $i = $remote->id; "Registered a remote object with id of '$i' for connection $_" } $self->_id;

  my $registry = $self->remote_objects_by_id;
  my $id = $remote->id;
  $registry->{$id} = $remote;
  weaken($registry->{$id});

  return $remote;
}
315
# Drops the handle for remote object $id from remote_objects_by_id and sends
# a 'free' message for that id to the other side.
sub send_free {
  my ($self, $id) = @_;
  Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;

  # TODO: the following warning shows up sometimes when the remote side dies
  # in the middle of a remote method invocation - possibly only while the
  # object is being constructed?
  #   (in cleanup) Use of uninitialized value $id in delete at
  #   ../Object-Remote/lib/Object/Remote/Connection.
  my $registry = $self->remote_objects_by_id;
  delete $registry->{$id};

  return $self->_send([ 'free', $id ]);
}
325
# Sends a message of type $type and returns a Future for the reply.
#
# The future is registered as a local object and its id is placed right
# after the type in the outgoing message, followed by @call.  Until the
# future is ready it is tracked in outstanding_futures, and the handle
# registered for $call[0] (if any) is kept referenced.
sub send {
  my ($self, $type, @call) = @_;

  my $future = Future->new;
  # Strong reference, released in the on_ready callback below.
  my $remote = $self->remote_objects_by_id->{$call[0]};

  my @message = ($type, $self->_local_object_to_id($future), @call);

  my $outstanding = $self->outstanding_futures;
  $outstanding->{$future} = $future;
  $future->on_ready(sub {
    undef($remote);
    delete $outstanding->{$future}
  });

  $self->_send(\@message);

  return $future;
}
345
# Sends a message of type $type whose reply is not wanted: the literal id
# 'NULL' is sent in the position where send() puts a future id.
sub send_discard {
  my ($self, $type, @call) = @_;
  return $self->_send([ $type, 'NULL', @call ]);
}
353
# Serializes $to_send and prints it, newline-terminated, to send_to_fh.
#
#   $to_send - array ref holding the message to transmit
#
# Returns the result of print on success.  When the write fails, the
# on_close future is completed with a description of the error (unless it
# already is complete) and nothing is returned.  Croaks when the connection
# is no longer valid; errors from _serialize propagate to the caller.
sub _send {
  my ($self, $to_send) = @_;
  my $fh = $self->send_to_fh;

  unless ($self->is_valid) {
    croak "Attempt to invoke _send on a connection that is not valid";
  }

  Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
  my $serialized = $self->_serialize($to_send)."\n";
  Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;

  my $ret;
  # The trailing 1 makes the eval's own return value the success flag, so
  # failure detection does not depend on the global $@ surviving untouched.
  my $written = eval {
    #TODO this should be converted over to a non-blocking ::WriteChannel class
    die "filehandle is not open" unless openhandle($fh);
    log_trace { "file handle has passed openhandle() test; printing to it" };
    $ret = print {$fh} $serialized;
    die "print was not successful: $!" unless defined $ret;
    1;
  };

  unless ($written) {
    # Copy $@ before anything else runs; the log block below is evaluated
    # lazily and other code may reset $@ in the meantime.
    my $error = $@;
    Dlog_debug { "exception encountered when trying to write to file handle $_: $error" } $fh;
    chomp($error);
    $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
    return;
  }

  return $ret;
}
384
# Converts $data into its wire form (via _deobjectify) and encodes it as
# JSON text.  If that fails, the local objects registered during this
# attempt are removed again and an "Error serializing" exception is thrown.
sub _serialize {
  my ($self, $data) = @_;

  # A non-empty @New_Ids makes _local_object_to_id record every id it
  # registers; the -1 only serves to make the list non-empty.
  local our @New_Ids = (-1);

  my $flat = eval { $self->_encode($self->_deobjectify($data)) };
  return $flat if $flat;

  my $err = $@;
  # don't keep refs to new things
  delete @{$self->local_objects_by_id}{@New_Ids};
  die "Error serializing: $err";
}
398
# Registers $object in local_objects_by_id under its reference address,
# unless an entry for that id already exists, and returns the id.  A newly
# registered id is also appended to @New_Ids when that list is non-empty,
# which is the case while _serialize is running.
sub _local_object_to_id {
  my ($self, $object) = @_;
  our @New_Ids;

  my $id = refaddr($object);
  my $registry = $self->local_objects_by_id;

  unless ($registry->{$id}) {
    push @New_Ids, $id if @New_Ids;
    $registry->{$id} = $object;
  }

  return $id;
}
408
# Recursively rewrites $data into a structure made only of plain scalars,
# arrays and hashes, replacing everything else by a single-key marker hash:
#
#   proxy belonging to this connection -> { __local_object__      => id }
#   any other blessed object           -> { __remote_object__     => id }
#   tied hash / tied array             -> { __remote_tied_hash__  => id }
#                                         { __remote_tied_array__ => id }
#   code ref                           -> { __remote_code__       => id }
#   scalar ref                         -> { __scalar_ref__        => value }
#   glob ref                           -> { __glob_ref__          => id }
#
# Dies for any other kind of reference.
sub _deobjectify {
  my ($self, $data) = @_;

  if (blessed($data)) {
    my $is_our_proxy = $data->isa('Object::Remote::Proxy')
      && $data->{remote}->connection == $self;
    return $is_our_proxy
      ? +{ __local_object__ => $data->{remote}->id }
      : +{ __remote_object__ => $self->_local_object_to_id($data) };
  }

  my $ref = ref($data);
  return $data unless $ref; # plain scalar

  if ($ref eq 'HASH') {
    my $tied_to = tied(%$data);
    if (defined($tied_to)) {
      return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)};
    }
    my %flat;
    for my $key (keys %$data) {
      $flat{$key} = $self->_deobjectify($data->{$key});
    }
    return \%flat;
  }

  if ($ref eq 'ARRAY') {
    my $tied_to = tied(@$data);
    if (defined($tied_to)) {
      return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)};
    }
    return [ map { $self->_deobjectify($_) } @$data ];
  }

  if ($ref eq 'CODE') {
    my $container = Object::Remote::CodeContainer->new(code => $data);
    return +{ __remote_code__ => $self->_local_object_to_id($container) };
  }

  if ($ref eq 'SCALAR') {
    return +{ __scalar_ref__ => $$data };
  }

  if ($ref eq 'GLOB') {
    my $container = Object::Remote::GlobContainer->new(handle => $data);
    return +{ __glob_ref__ => $self->_local_object_to_id($container) };
  }

  die "Can't collapse reftype $ref";
}
452
# Handles one received line: decodes it into (type, arguments...) and calls
# the method named "receive_<type>" with the arguments.  A failure in either
# step is reported with warn and otherwise ignored.  Returns nothing.
sub _receive {
  my ($self, $flat) = @_;
  Dlog_trace { my $l = length($flat); "Starting to deserialize $l characters of data for connection $_" } $self->_id;

  # A list assignment in scalar context gives the number of elements
  # produced, so an exception and an empty message both count as failure.
  my $element_count = (my ($type, @rest) = eval { @{$self->_deserialize($flat)} });
  unless ($element_count) {
    warn "Deserialize failed for ${flat}: $@";
    return;
  }
  Dlog_trace { "deserialization complete for connection $_" } $self->_id;

  my $handler = "receive_${type}";
  unless (eval { $self->$handler(@rest); 1 }) {
    warn "Receive failed for ${flat}: $@";
    return;
  }

  return;
}
463
# Handler for 'free' messages: removes the object with the given id from
# local_objects_by_id and warns when there was no such entry.
sub receive_free {
  my ($self, $id) = @_;
  Dlog_trace { "got a receive_free for object '$id' for connection $_" } $self->_id;

  unless (delete $self->local_objects_by_id->{$id}) {
    warn "Free: no such object $id";
  }

  return;
}
471
# Handler for 'call' messages.  Resolves $future_id to the proxy of the
# caller's future, looks up the local object $id and passes both, together
# with the remaining arguments, to _invoke.  When no such local object
# exists, the caller's future is failed instead.
sub receive_call {
  my ($self, $future_id, $id, @rest) = @_;
  Dlog_trace { "got a receive_call for object '$id' for connection $_" } $self->_id;

  my $future = $self->_id_to_remote_object($future_id);
  # Stored on the proxy's hash.
  # NOTE(review): presumably selects how calls made through this proxy are
  # transmitted - confirm against Object::Remote::Proxy.
  $future->{method} = 'call_discard_free';

  my $local = $self->local_objects_by_id->{$id};
  unless ($local) {
    $future->fail("No such object $id");
    return;
  }

  return $self->_invoke($future, $local, @rest);
}
481
# Handler for 'call_free' messages: performs the call like receive_call and
# then frees the local object.  undef is passed in the context position, so
# _invoke runs the method in void context.
sub receive_call_free {
  my ($self, $future, $id, @rest) = @_;
  Dlog_trace { "got a receive_call_free for object '$id' for connection $_" } $self->_id;

  my $ctx = undef;
  $self->receive_call($future, $id, $ctx, @rest);

  return $self->receive_free($id);
}
488
# Calls $method with @args on $local and reports the outcome through
# $future.
#
# Methods whose name starts with 'start::' return a future themselves; its
# results or failure are forwarded to $future once available.  All other
# methods are called directly, in the context selected by $ctx (undef: void,
# false: scalar, true: list); $future is completed with the return values
# or failed with the exception.
sub _invoke {
  my ($self, $future, $local, $ctx, $method, @args) = @_;
  Dlog_trace { "got _invoke for a method named '$method' for connection $_" } $self->_id;

  if ($method =~ /^start::/) {
    my $f = $local->$method(@args);
    $f->on_done(sub { undef($f); $future->done(@_) });
    # This check has to stay behind on_done: if $f was already complete, the
    # callback above has run by now and cleared $f, so there is nothing left
    # to attach the failure handler to.
    return unless $f;
    $f->on_fail(sub { undef($f); $future->fail(@_) });
    return;
  }

  my $completed = eval {
    my @result;
    if (!defined($ctx)) {
      # void context: return values are discarded
      $local->$method(@args);
    } elsif ($ctx) {
      @result = $local->$method(@args);
    } else {
      @result = scalar($local->$method(@args));
    }
    $future->done(@result);
    1;
  };
  $future->fail($@) unless $completed;

  return;
}
510
9e72f0cf 5111;
b9a9982d 512
513=head1 NAME
514
515Object::Remote::Connection - An underlying connection for L<Object::Remote>
516
b0ec7e3b 517 use Object::Remote;
55c0d020 518
b0ec7e3b 519 my $local = Object::Remote->connect('-');
de9062cf 520 my $remote = Object::Remote->connect('myserver');
521 my $remote_user = Object::Remote->connect('user@myserver');
b0ec7e3b 522 my $local_sudo = Object::Remote->connect('user@');
55c0d020 523
b0ec7e3b 524 #$remote can be any other connection object
525 my $hostname = Sys::Hostname->can::on($remote, 'hostname');
55c0d020 526
b0ec7e3b 527=head1 DESCRIPTION
528
8dbf62a5 529This is the base class for connections in OR objects. Connections are present
530both in the local and remote parts of each OR pair, and handle the data
processing for sending OR commands and responses as JSON via the appropriate
532connection mechanism.
b0ec7e3b 533
4e25b1fd 534=head1 METHODS
535
536=head2 new_from_spec
537
538 my $conn = Object::Remote::Connection->new_from_spec($spec, %args);
539
540Not intended for direct use, called by L<Object::Remote/connect> in a
541L<Future>-compatible way.
542
Uses the spec to guess the appropriate Object::Remote::Connector::* class to use,
544instantiates it with the spec and any further arguments given, then calls
545C<connect> on it in a L<Future>-compatible way.
546
b0ec7e3b 547=head1 SEE ALSO
548
549=over 4
550
de9062cf 551=item C<Object::Remote::Role::Connector::PerlInterpreter>
552
b0ec7e3b 553=item C<Object::Remote>
b9a9982d 554
b0ec7e3b 555=back
b9a9982d 556
557=cut