TCP support for ListenerService
[scpubgit/Tak-Daemon.git] / lib / Tak / Daemon / ListenerService.pm
1 package Tak::Daemon::ListenerService;
2
3 use Scalar::Util qw(weaken);
4 use Moo;
5
6 with 'Tak::Role::Service';
7
8 has listen_on => (is => 'ro', required => 1);
9 has router => (is => 'ro', required => 1);
10
11 has state => (is => 'rw', default => sub { 'down' }, init_arg => undef);
12
13 has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress');
14
15 has listener => (is => 'rw', clearer => 'clear_listener');
16
17 has connections => (is => 'ro', default => sub { {} });
18
19 sub start_start_request {
20   my ($self, $req) = @_;
21   $req->result('already_started') if $self->state eq 'running';
22   push(@{$self->_start_in_progress->{requests}}, $req);
23   $self->_start_in_progress->{start}();
24 }
25
26 sub _build__start_in_progress {
27   my ($self) = @_;
28   weaken($self);
29   my %start = (requests => (my $requests = []));
30   my $listen_on = $self->listen_on;
31   my %addr = (
32     socktype => "stream",
33     map +(
34       ref($_)
35         ? (family => "inet", %$_)
36         : (family => "unix", path => $_)
37     ), $listen_on
38   );
39   $start{start} = sub {
40     $self->state('starting');
41     Tak->loop_upgrade;
42     Tak->loop->listen(
43       addr => \%addr,
44       on_notifier => sub {
45         $self->listener($_[0]);
46         $_->success('started') for @$requests;
47         $self->_clear_start_in_progress;
48         $self->state('running');
49       },
50       on_resolve_error => sub { # no-op until we add non-unix
51         $_->failure(resolve => @_) for @$requests;
52         $self->_clear_start_in_progress;
53         $self->state('stopped');
54       },
55       on_listen_error => sub {
56         $_->failure(listen => @_) for @$requests;
57         $self->_clear_start_in_progress;
58         $self->state('stopped');
59       },
60       on_accept => sub {
61         $self->setup_connection($_[0]);
62       },
63       on_accept_error => sub {
64         $self->handle_stop;
65       },
66     );
67     $start{start} = sub {}; # delete yourself
68   };
69   \%start;
70 }
71
72 sub handle_stop {
73   my ($self) = @_;
74   return 'already_stopped' if $self->state eq 'down';
75   # there's probably something more intelligent to do here, but meh
76   die failure => 'starting' if $self->state eq 'starting';
77   Tak->loop->remove($self->clear_listener);
78   !ref and unlink($_) for $self->listen_on;
79   $self->state('down');
80   return 'stopped';
81 }
82
83 sub DEMOLISH {
84   my ($self, $in_global_destruction) = @_;
85
86   return unless $self->state eq 'running';
87
88   !ref and unlink($_) for $self->listen_on;
89
90   return if $in_global_destruction;
91
92   Tak->loop->remove($self->listener);
93 }
94
95 sub setup_connection {
96   my ($self, $socket) = @_;
97   my $conn_set = $self->connections;
98   my $conn_str;
99   my $connection = Tak::ConnectionService->new(
100     read_fh => $socket, write_fh => $socket,
101     listening_service => $self->router->clone_or_self,
102     on_close => sub { delete $conn_set->{$conn_str} }
103   );
104   $conn_str = "$connection";
105   $connection->receiver->service->service->register_weak(remote => $connection);
106   $conn_set->{$conn_str} = $connection;
107   return;
108 }
109
110 1;