found location of hang; make annotations; added more log lines
[scpubgit/Object-Remote.git] / lib / Object / Remote / ConnectionServer.pm
CommitLineData
5c608989 1package Object::Remote::ConnectionServer;
2
3use Scalar::Util qw(blessed weaken);
4use Module::Runtime qw(use_module);
5use Object::Remote;
6use IO::Socket::UNIX;
7use POSIX ();
8use Moo;
9
# Listening handle on which incoming connections arrive.
#
# The value may be an already-blessed handle object, which is used unchanged,
# or a filesystem path, which is coerced into a listening IO::Socket::UNIX
# bound to that path. Whenever the attribute is set, the handle is handed to
# the current Object::Remote loop so that _listen_ready is called each time
# the handle becomes readable.
has listen_on => (
  is => 'ro',
  coerce => sub {
    # Anything blessed is assumed to be a ready-to-use listening handle.
    return $_[0] if blessed($_[0]);
    # Presumably clears a stale socket file from an earlier run so the bind
    # below does not fail; the result of unlink is deliberately ignored.
    unlink($_[0]);
    IO::Socket::UNIX->new(
      Local => $_[0],
      Listen => 1
    ) or die "Couldn't listen to $_[0]: $!";
  },
  trigger => sub {
    my ($self, $fh) = @_;
    # The callback below closes over $self; weakening it keeps that closure
    # from holding a strong reference to the server object.
    weaken($self);
    Object::Remote->current_loop
                  ->watch_io(
                      handle => $fh,
                      on_read_ready => sub { $self->_listen_ready($fh) }
                    );
  },
);
30
# Extra constructor arguments, held as an array reference, that are appended
# to the argument list of every Object::Remote::Connection this server
# builds. Defaults to an empty list.
has connection_args => (
  is      => 'ro',
  default => sub { return [] },
);
34
# Code reference invoked as a method on each newly built connection; whatever
# it returns is what the server keeps and returns as the connection. The
# default is the identity function, handing the connection back untouched.
has connection_callback => (
  is      => 'ro',
  default => sub {
    return sub {
      my ($connection) = @_;
      return $connection;
    };
  },
);
38
# Moo construction hook: calls want_run on the current Object::Remote loop.
# The matching want_stop call is made in DEMOLISH.
sub BUILD {
  my $loop = Object::Remote->current_loop;
  return $loop->want_run;
}
42
# Hands control to the current Object::Remote loop by calling its
# run_while_wanted method, and returns whatever that call returns.
sub run {
  my $loop = Object::Remote->current_loop;
  return $loop->run_while_wanted;
}
46
# Read-ready callback for the listening handle: accepts one pending client,
# wraps the accepted socket in an Object::Remote::Connection, and writes the
# "Shere" greeting line to the peer.
#
# Arguments:
#   $fh - the listening handle that has become readable.
# Returns:
#   the connection, as returned by connection_callback.
# Dies:
#   if accept() fails or if the greeting cannot be written to the new socket.
sub _listen_ready {
  my ($self, $fh) = @_;
  my $new = $fh->accept or die "Couldn't accept: $!";
  $new->blocking(0);
  # Load CPS::Future explicitly instead of relying on some other module
  # having required it before this point; use_module returns the class name.
  my $f = use_module('CPS::Future')->new;
  my $c = use_module('Object::Remote::Connection')->new(
    receive_from_fh => $new,
    send_to_fh => $new,
    on_close => $f, # and so will die $c
    @{$self->connection_args}
  )->${\$self->connection_callback};
  # Drop this closure's reference to the connection once the on_close
  # future becomes ready.
  $f->on_ready(sub { undef($c) });
  $c->ready_future->done;
  #TODO see if this runs on the controller or the remote node
  #if this runs on the controller a poorly behaved remote node
  #could cause the print() to block but it's a very low probability
  print {$new} "Shere\n" or die "Couldn't send to new socket: $!";
  return $c;
}
66
# Moo destruction hook: stops watching the listening handle, removes the
# socket file when the handle exposes a hostpath, and calls want_stop on the
# current loop to balance the want_run made in BUILD.
#
# Arguments:
#   $gd - true during global destruction, in which case nothing is done.
sub DEMOLISH {
  my ($self, $gd) = @_;
  return if $gd;
  # listen_on is not a required attribute. When it was never set there is no
  # watcher or socket file to clean up, and calling methods on undef would
  # abort the destructor before want_stop below is reached.
  my $listener = $self->listen_on;
  if (defined $listener) {
    Object::Remote->current_loop
                  ->unwatch_io(
                      handle => $listener,
                      on_read_ready => 1
                    );
    if ($listener->can('hostpath')) {
      unlink($listener->hostpath);
    }
  }
  Object::Remote->current_loop->want_stop;
}
80
811;