Commit | Line | Data |
dc2fd705 |
1 | package Tak::Daemon::ListenerService; |
2 | |
3 | use Scalar::Util qw(weaken); |
4 | use Moo; |
5 | |
6 | with 'Tak::Role::Service'; |
7 | |
# Path of the unix-domain socket this service listens on (constructor
# argument, mandatory).
has socket_location => (
  is       => 'ro',
  required => 1,
);

# Router object (constructor argument, mandatory); setup_connection calls
# clone_or_self on it for every accepted connection.
has router => (
  is       => 'ro',
  required => 1,
);

# Lifecycle state string. Begins as 'down' and cannot be supplied to the
# constructor; the methods in this class move it between 'down',
# 'starting', 'running' and 'stopped'.
has state => (
  is       => 'rw',
  default  => sub { 'down' },
  init_arg => undef,
);

# Book-keeping for a start that has not completed yet. Built lazily by
# _build__start_in_progress and dropped again via _clear_start_in_progress.
has _start_in_progress => (
  is      => 'lazy',
  clearer => '_clear_start_in_progress',
);

# The listener notifier handed back by the event loop once listening works.
has listener => (
  is      => 'rw',
  clearer => 'clear_listener',
);

# Live connections, keyed by the stringified connection object.
has connections => (
  is      => 'ro',
  default => sub { {} },
);
18 | |
# Handle an asynchronous 'start' request.
#
#   $req - request object; receives result('already_started') when the
#          listener is already running, otherwise it is queued and gets
#          success/failure from the callbacks set up in
#          _build__start_in_progress.
#
# When the service is already running we must stop right after reporting
# that: carrying on would queue the request a second time and issue another
# listen on the same socket.
sub start_start_request {
  my ($self, $req) = @_;
  if ($self->state eq 'running') {
    $req->result('already_started');
    return;
  }
  my $start = $self->_start_in_progress;
  push(@{$start->{requests}}, $req);
  # After the first invocation this slot holds a no-op, so requests arriving
  # while a start is pending only get queued.
  $start->{start}();
}
25 | |
# Builder for the lazy _start_in_progress attribute.
#
# Returns a hashref with two slots:
#   requests - arrayref of request objects waiting for the start outcome
#   start    - coderef that kicks off the listen; it replaces itself with a
#              no-op once it has run.
#
# All closures below share one weakened $self.
sub _build__start_in_progress {
  my ($self) = @_;
  weaken($self);
  my $pending = [];
  my %start = (requests => $pending);

  # Shared tail of both error callbacks: tell every waiting request, drop
  # the in-progress record, mark the service as stopped.
  my $fail_all = sub {
    my ($kind, @details) = @_;
    $_->failure($kind => @details) for @$pending;
    $self->_clear_start_in_progress;
    $self->state('stopped');
  };

  $start{start} = sub {
    $self->state('starting');
    Tak->loop_upgrade;
    Tak->loop->listen(
      addr => {
        family   => "unix",
        socktype => "stream",
        path     => $self->socket_location,
      },
      on_notifier => sub {
        my ($listener) = @_;
        $self->listener($listener);
        $_->success('started') for @$pending;
        $self->_clear_start_in_progress;
        $self->state('running');
      },
      # no-op until we add non-unix
      on_resolve_error => sub { $fail_all->(resolve => @_) },
      on_listen_error  => sub { $fail_all->(listen => @_) },
      on_accept => sub {
        my ($socket) = @_;
        $self->setup_connection($socket);
      },
      on_accept_error => sub {
        $self->handle_stop;
      },
    );
    $start{start} = sub {}; # delete yourself
  };

  return \%start;
}
66 | |
# Stop listening and remove the socket file.
#
# Returns 'already_stopped' when there is nothing to tear down, 'stopped'
# after a successful teardown. Dies while a start is still in progress.
sub handle_stop {
  my ($self) = @_;
  my $state = $self->state;
  # 'stopped' is the state the resolve/listen error callbacks in
  # _build__start_in_progress leave behind; no listener exists then, so it
  # has to be treated like 'down' instead of falling through to the loop
  # removal below with an undefined listener.
  return 'already_stopped' if $state eq 'down' || $state eq 'stopped';
  # there's probably something more intelligent to do here, but meh
  # NOTE(review): die with a plain list concatenates its arguments into the
  # string "failurestarting ..."; possibly an arrayref was intended - confirm
  # against whatever catches this.
  die failure => 'starting' if $state eq 'starting';
  # The clearer hands back the listener it just removed from the attribute.
  Tak->loop->remove($self->clear_listener);
  unlink($self->socket_location);
  $self->state('down');
  return 'stopped';
}
77 | |
# Object teardown hook.
#
#   $in_global_destruction - true when called during interpreter shutdown.
#
# Only a running listener needs cleanup: the socket file is always removed,
# while the listener is taken out of the event loop only outside global
# destruction.
sub DEMOLISH {
  my ($self, $in_global_destruction) = @_;

  if ($self->state ne 'running') {
    return;
  }

  my $socket_path = $self->socket_location;
  unlink($socket_path);

  if ($in_global_destruction) {
    return;
  }

  my $loop = Tak->loop;
  $loop->remove($self->listener);
}
89 | |
# Wrap a freshly accepted socket in a Tak::ConnectionService and track it.
#
#   $socket - the accepted socket; used for both reading and writing.
#
# The connection is stored in the connections hash under its stringified
# form and removes itself from there through its on_close callback.
# Returns nothing.
sub setup_connection {
  my ($self, $socket) = @_;
  my $registry = $self->connections;

  # Declared before the object exists so the on_close closure can see it;
  # the value is filled in right after construction.
  my $key;
  my @ctor_args = (
    read_fh           => $socket,
    write_fh          => $socket,
    listening_service => $self->router->clone_or_self,
    on_close          => sub { delete $registry->{$key} },
  );
  my $connection = Tak::ConnectionService->new(@ctor_args);
  $key = "$connection";

  my $inner_service = $connection->receiver->service->service;
  $inner_service->register_weak(remote => $connection);

  $registry->{$key} = $connection;
  return;
}
104 | |
105 | 1; |