Commit | Line | Data |
5c608989 |
1 | package Object::Remote::ConnectionServer; |
2 | |
3 | use Scalar::Util qw(blessed weaken); |
4 | use Module::Runtime qw(use_module); |
5 | use Object::Remote; |
07105aca |
6 | use Object::Remote::Logging qw( :log :dlog ); |
5c608989 |
7 | use IO::Socket::UNIX; |
5c608989 |
8 | use Moo; |
9 | |
10 | has listen_on => ( |
11 | is => 'ro', |
12 | coerce => sub { |
13 | return $_[0] if blessed($_[0]); |
14 | unlink($_[0]); |
15 | IO::Socket::UNIX->new( |
16 | Local => $_[0], |
17 | Listen => 1 |
18 | ) or die "Couldn't liten to $_[0]: $!"; |
19 | }, |
20 | trigger => sub { |
21 | my ($self, $fh) = @_; |
07105aca |
22 | log_debug { "adding connection server to run loop because the trigger has executed" }; |
5c608989 |
23 | weaken($self); |
24 | Object::Remote->current_loop |
25 | ->watch_io( |
26 | handle => $fh, |
27 | on_read_ready => sub { $self->_listen_ready($fh) } |
28 | ); |
29 | }, |
30 | ); |
31 | |
32 | has connection_args => ( |
33 | is => 'ro', default => sub { [] } |
34 | ); |
35 | |
3687a42d |
36 | has connection_callback => ( |
37 | is => 'ro', default => sub { sub { shift } } |
38 | ); |
39 | |
5c608989 |
40 | sub BUILD { |
07105aca |
41 | log_debug { "A connection server has been built; calling want_run on run loop" }; |
5c608989 |
42 | Object::Remote->current_loop->want_run; |
43 | } |
44 | |
45 | sub run { |
07105aca |
46 | log_debug { "Connection server is calling run_while_wanted on the run loop" }; |
5c608989 |
47 | Object::Remote->current_loop->run_while_wanted; |
48 | } |
49 | |
50 | sub _listen_ready { |
51 | my ($self, $fh) = @_; |
07105aca |
52 | log_debug { "Got a connection, calling accept on the file handle" }; |
5c608989 |
53 | my $new = $fh->accept or die "Couldn't accept: $!"; |
07105aca |
54 | log_trace { "Setting file handle non-blocking" }; |
5c608989 |
55 | $new->blocking(0); |
56 | my $f = CPS::Future->new; |
07105aca |
57 | log_trace { "Creating a new connection with the remote node" }; |
5c608989 |
58 | my $c = use_module('Object::Remote::Connection')->new( |
59 | receive_from_fh => $new, |
60 | send_to_fh => $new, |
61 | on_close => $f, # and so will die $c |
62 | @{$self->connection_args} |
3687a42d |
63 | )->${\$self->connection_callback}; |
5c608989 |
64 | $f->on_ready(sub { undef($c) }); |
07105aca |
65 | log_trace { "marking the future as done" }; |
5c608989 |
66 | $c->ready_future->done; |
07105aca |
67 | Dlog_trace { "Sending 'Shere' to socket $_" } $new; |
5c608989 |
68 | print $new "Shere\n" or die "Couldn't send to new socket: $!"; |
07105aca |
69 | log_debug { "Connection has been fully handled" }; |
5c608989 |
70 | return $c; |
71 | } |
72 | |
73 | sub DEMOLISH { |
74 | my ($self, $gd) = @_; |
07105aca |
75 | log_debug { "A connection server is being destroyed; global destruction: '$gd'" }; |
5c608989 |
76 | return if $gd; |
07105aca |
77 | log_trace { "Removing the connection server IO watcher from run loop" }; |
5c608989 |
78 | Object::Remote->current_loop |
79 | ->unwatch_io( |
80 | handle => $self->listen_on, |
81 | on_read_ready => 1 |
82 | ); |
83 | if ($self->listen_on->can('hostpath')) { |
07105aca |
84 | log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" }; |
5c608989 |
85 | unlink($self->listen_on->hostpath); |
86 | } |
07105aca |
87 | log_trace { "calling want_stop on the run loop" }; |
5c608989 |
88 | Object::Remote->current_loop->want_stop; |
89 | } |
90 | |
91 | 1; |