f9738cefe3703bb958a0c2780fe2a4851b3cac3e
[scpubgit/Tak-Daemon.git] / lib / Tak / Daemon / ListenerService.pm
1 package Tak::Daemon::ListenerService;
2
3 use Scalar::Util qw(weaken);
4 use Moo;
5
6 with 'Tak::Role::Service';
7
8 has socket_location => (is => 'ro', required => 1);
9 has router => (is => 'ro', required => 1);
10
11 has state => (is => 'rw', default => sub { 'down' }, init_arg => undef);
12
13 has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress');
14
15 has listener => (is => 'rw', clearer => 'clear_listener');
16
17 has connections => (is => 'ro', default => sub { {} });
18
19 sub start_start_request {
20   my ($self, $req) = @_;
21   $req->result('already_started') if $self->state eq 'running';
22   push(@{$self->_start_in_progress->{requests}}, $req);
23   $self->_start_in_progress->{start}();
24 }
25
26 sub _build__start_in_progress {
27   my ($self) = @_;
28   weaken($self);
29   my %start = (requests => (my $requests = []));
30   $start{start} = sub {
31     $self->state('starting');
32     Tak->loop_upgrade;
33     Tak->loop->listen(
34       addr => {
35         family => "unix",
36         socktype => "stream",
37         path => $self->socket_location,
38       },
39       on_notifier => sub {
40         $self->listener($_[0]);
41         $_->success('started') for @$requests;
42         $self->_clear_start_in_progress;
43         $self->state('running');
44       },
45       on_resolve_error => sub { # no-op until we add non-unix
46         $_->failure(resolve => @_) for @$requests;
47         $self->_clear_start_in_progress;
48         $self->state('stopped');
49       },
50       on_listen_error => sub {
51         $_->failure(listen => @_) for @$requests;
52         $self->_clear_start_in_progress;
53         $self->state('stopped');
54       },
55       on_accept => sub {
56         $self->setup_connection($_[0]);
57       },
58       on_accept_error => sub {
59         $self->handle_stop;
60       },
61     );
62     $start{start} = sub {}; # delete yourself
63   };
64   \%start;
65 }
66
67 sub handle_stop {
68   my ($self) = @_;
69   return 'already_stopped' if $self->state eq 'down';
70   # there's probably something more intelligent to do here, but meh
71   die failure => 'starting' if $self->state eq 'starting';
72   Tak->loop->remove($self->clear_listener);
73   unlink($self->socket_location);
74   $self->state('down');
75   return 'stopped';
76 }
77
78 sub DEMOLISH {
79   my ($self, $gd) = @_;
80   return if $gd;
81   if (my $l = $self->listener) {
82     $l->get_loop->remove($l);
83     unlink($self->socket_location);
84   }
85 }
86
87 sub setup_connection {
88   my ($self, $socket) = @_;
89   my $conn_set = $self->connections;
90   my $connection;
91   $connection = Tak::ConnectionService->new(
92     read_fh => $socket, write_fh => $socket,
93     listening_service => $self->router->clone,
94     on_close => sub { delete $conn_set->{$connection} }
95   );
96   $conn_set->{$connection} = $connection;
97   return;
98 }
99
100 1;