found location of hang; make annotations; added more log lines
[scpubgit/Object-Remote.git] / lib / Object / Remote / ConnectionServer.pm
1 package Object::Remote::ConnectionServer;
2
3 use Scalar::Util qw(blessed weaken);
4 use Module::Runtime qw(use_module);
5 use Object::Remote;
6 use IO::Socket::UNIX;
7 use POSIX ();
8 use Moo;
9
10 has listen_on => (
11   is => 'ro',
12   coerce => sub {
13     return $_[0] if blessed($_[0]);
14     unlink($_[0]);
15     IO::Socket::UNIX->new(
16       Local => $_[0],
17       Listen => 1
18     ) or die "Couldn't liten to $_[0]: $!";
19   },
20   trigger => sub {
21     my ($self, $fh) = @_;
22     weaken($self);
23     Object::Remote->current_loop
24                   ->watch_io(
25                       handle => $fh,
26                       on_read_ready => sub { $self->_listen_ready($fh) }
27                     );
28   },
29 );
30
31 has connection_args => (
32  is => 'ro', default => sub { [] }
33 );
34
35 has connection_callback => (
36   is => 'ro', default => sub { sub { shift } }
37 );
38
39 sub BUILD {
40   Object::Remote->current_loop->want_run;
41 }
42
43 sub run {
44   Object::Remote->current_loop->run_while_wanted;
45 }
46
47 sub _listen_ready {
48   my ($self, $fh) = @_;
49   my $new = $fh->accept or die "Couldn't accept: $!";
50   $new->blocking(0);
51   my $f = CPS::Future->new;
52   my $c = use_module('Object::Remote::Connection')->new(
53     receive_from_fh => $new,
54     send_to_fh => $new,
55     on_close => $f, # and so will die $c
56     @{$self->connection_args}
57   )->${\$self->connection_callback};
58   $f->on_ready(sub { undef($c) });
59   $c->ready_future->done;
60   #TODO see if this runs on the controller or the remote node 
61   #if this runs on the controller a poorly behaved remote node
62   #could cause the print() to block but it's a very low probability
63   print $new "Shere\n" or die "Couldn't send to new socket: $!";
64   return $c;
65 }
66
67 sub DEMOLISH {
68   my ($self, $gd) = @_;
69   return if $gd;
70   Object::Remote->current_loop
71                 ->unwatch_io(
72                     handle => $self->listen_on,
73                     on_read_ready => 1
74                   );
75   if ($self->listen_on->can('hostpath')) {
76     unlink($self->listen_on->hostpath);
77   }
78   Object::Remote->current_loop->want_stop;
79 }
80
81 1;