Commit | Line | Data |
5c608989 |
1 | package Object::Remote::ConnectionServer; |
2 | |
3 | use Scalar::Util qw(blessed weaken); |
4 | use Module::Runtime qw(use_module); |
5 | use Object::Remote; |
9031635d |
6 | use Object::Remote::Logging qw( :log :dlog ); |
783105c4 |
7 | use Future; |
5c608989 |
8 | use IO::Socket::UNIX; |
5c608989 |
9 | use Moo; |
10 | |
# Listening handle on which incoming connections arrive.
#
# The value may be an already-blessed handle object, which is used as-is, or
# any other value, which is treated as the filesystem path of a new
# UNIX-domain listening socket. Whenever the attribute is set, the handle is
# registered with the current run loop so that _listen_ready is called each
# time the handle becomes readable.
has listen_on => (
  is => 'ro',
  coerce => sub {
    # A blessed value is taken to be a ready-made listening handle.
    return $_[0] if blessed($_[0]);
    # Remove whatever currently exists at the path (presumably a stale socket
    # file from an earlier run); the result of unlink is not checked.
    unlink($_[0]);
    IO::Socket::UNIX->new(
      Local => $_[0],
      Listen => 1
    ) or die "Couldn't listen to $_[0]: $!";
  },
  trigger => sub {
    my ($self, $fh) = @_;
    log_debug { "adding connection server to run loop because the trigger has executed" };
    # The callback below closes over $self; weaken it so the closure handed
    # to the run loop does not hold a strong reference to the server.
    weaken($self);
    Object::Remote->current_loop
                  ->watch_io(
                      handle => $fh,
                      on_read_ready => sub { $self->_listen_ready($fh) }
                    );
  },
);
32 | |
# Array reference of extra constructor arguments that _listen_ready appends
# when building each Object::Remote::Connection; defaults to an empty array.
has connection_args => (
  is      => 'ro',
  default => sub { return [] },
);
36 | |
3687a42d |
# Code reference that _listen_ready invokes as a method on every freshly
# constructed connection; whatever it returns is used as the connection from
# then on. The default simply hands back its first argument.
has connection_callback => (
  is      => 'ro',
  default => sub {
    return sub {
      my ($connection) = @_;
      return $connection;
    };
  },
);
40 | |
5c608989 |
# Moo construction hook: logs the event and calls want_run on the current
# Object::Remote run loop.
sub BUILD {
  log_debug { "A connection server has been built; calling want_run on run loop" };
  my $loop = Object::Remote->current_loop;
  return $loop->want_run;
}
45 | |
# Drives the current Object::Remote run loop by calling run_while_wanted on
# it, and returns whatever that call returns.
sub run {
  log_debug { "Connection server is calling run_while_wanted on the run loop" };
  my $loop = Object::Remote->current_loop;
  return $loop->run_while_wanted;
}
50 | |
# Read-ready handler for the listening handle: accepts one pending client,
# wraps it in an Object::Remote::Connection and sends the "Shere" greeting.
#
#   $fh - the listening handle that became readable.
#
# Returns the connection (as returned by connection_callback).
# Dies if accept fails or the greeting cannot be written.
sub _listen_ready {
  my ($self, $fh) = @_;

  log_debug { "Got a connection, calling accept on the file handle" };
  my $client = $fh->accept
    or die "Couldn't accept: $!";

  log_trace { "Setting file handle non-blocking" };
  $client->blocking(0);

  # Future handed to the connection as its on_close handler.
  my $closed = Future->new;

  log_trace { "Creating a new connection with the remote node" };
  my $connection_class = use_module('Object::Remote::Connection');
  my $callback         = $self->connection_callback;
  # The same socket is used for both directions; connection_args come last
  # in the argument list.
  my $connection = $connection_class->new(
    receive_from_fh => $client,
    send_to_fh      => $client,
    on_close        => $closed,
    @{ $self->connection_args },
  )->$callback;

  # This closure keeps $connection referenced until the on_close future is
  # ready, and drops the reference at that point.
  $closed->on_ready(sub { undef($connection) });

  log_trace { "marking the future as done" };
  $connection->ready_future->done;

  Dlog_trace { "Sending 'Shere' to socket $_" } $client;
  print {$client} "Shere\n"
    or die "Couldn't send to new socket: $!";

  log_debug { "Connection has been fully handled" };
  return $connection;
}
73 | |
# Moo destruction hook: detaches the listening handle from the run loop,
# removes the socket file when the handle exposes a hostpath, and calls
# want_stop on the run loop.
#
#   $gd - true when called during global destruction; in that case nothing
#         beyond the initial log message is done.
sub DEMOLISH {
  my ($self, $gd) = @_;
  log_debug { "A connection server is being destroyed; global destruction: '$gd'" };
  if ($gd) {
    return;
  }

  # NOTE(review): listen_on is not declared as required; if it is undef the
  # ->can call below would die. Confirm callers always supply it.
  my $listener = $self->listen_on;

  log_trace { "Removing the connection server IO watcher from run loop" };
  Object::Remote->current_loop->unwatch_io(
    handle        => $listener,
    on_read_ready => 1,
  );

  # Only handles that provide hostpath (e.g. IO::Socket::UNIX) have a
  # filesystem entry to remove.
  if ($listener->can('hostpath')) {
    log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" };
    unlink($listener->hostpath);
  }

  log_trace { "calling want_stop on the run loop" };
  Object::Remote->current_loop->want_stop;
}
91 | |
92 | 1; |