basic (working) ListenerService
[scpubgit/Tak-Daemon.git] / lib / Tak / Daemon / ListenerService.pm
CommitLineData
dc2fd705 1package Tak::Daemon::ListenerService;
2
3use Scalar::Util qw(weaken);
4use Moo;
5
6with 'Tak::Role::Service';
7
# Path handed to the loop as the unix-domain socket address to listen on.
has socket_location => (
  is       => 'ro',
  required => 1,
);

# Passed to each accepted connection as its listening_service.
has router => (
  is       => 'ro',
  required => 1,
);

# Lifecycle marker; begins as 'down' and cannot be set via the constructor.
has state => (
  is       => 'rw',
  init_arg => undef,
  default  => sub { 'down' },
);

# Bookkeeping for a start that has not completed yet; built on demand by
# _build__start_in_progress and cleared once the start resolves.
has _start_in_progress => (
  is      => 'lazy',
  clearer => '_clear_start_in_progress',
);

# The notifier returned by the loop once listening has begun.
has listener => (
  is      => 'rw',
  clearer => 'clear_listener',
);

# Live connections, keyed by the stringified connection object.
has connections => (
  is      => 'ro',
  default => sub { {} },
);
18
# Handle an asynchronous "start" request.
#
# $req is the request object to answer. If the service is already running
# it is answered immediately with 'already_started' and nothing else
# happens. Otherwise the request is queued on the pending start and the
# start is kicked off (the start coderef turns itself into a no-op after
# its first call, so concurrent requests only queue).
sub start_start_request {
  my ($self, $req) = @_;

  if ($self->state eq 'running') {
    # NOTE(review): the other replies in this file go through
    # success/failure; confirm a bare 'already_started' result type is
    # what callers expect.
    $req->result('already_started');
    # Must stop here: falling through would queue an already-answered
    # request and try to listen on the socket a second time.
    return;
  }

  my $pending = $self->_start_in_progress;
  push @{ $pending->{requests} }, $req;
  $pending->{start}->();
  return;
}
25
# Builder for the lazy _start_in_progress attribute.
#
# Returns a hashref with two keys:
#   requests - arrayref of request objects waiting for the start to resolve
#   start    - coderef that begins listening on its first call and then
#              replaces itself with a no-op, so later callers only queue
#
# When the loop reports back, every queued request is answered:
# success('started') once a listener notifier is delivered, or
# failure(resolve => ...) / failure(listen => ...) on error. In every case
# the attribute is cleared so a later start attempt builds a fresh record.
sub _build__start_in_progress {
  my ($self) = @_;

  # The returned hash is stored on $self and its closures refer back to
  # $self; weaken so that loop does not keep the object alive.
  weaken($self);

  my $requests = [];
  my %start    = (requests => $requests);

  # Both error callbacks do the same thing apart from the failure tag.
  # The state goes back to 'down' -- the same idle state the attribute
  # starts in and handle_stop tests for -- so a failed start leaves the
  # service stoppable and restartable.
  my $fail_with = sub {
    my ($kind) = @_;
    return sub {
      $_->failure($kind => @_) for @$requests;
      $self->_clear_start_in_progress;
      $self->state('down');
    };
  };

  $start{start} = sub {
    $self->state('starting');
    Tak->loop_upgrade;
    Tak->loop->listen(
      addr => {
        family   => "unix",
        socktype => "stream",
        path     => $self->socket_location,
      },
      on_notifier => sub {
        $self->listener($_[0]);
        $_->success('started') for @$requests;
        $self->_clear_start_in_progress;
        $self->state('running');
      },
      on_resolve_error => $fail_with->('resolve'), # no-op until we add non-unix
      on_listen_error  => $fail_with->('listen'),
      on_accept => sub {
        $self->setup_connection($_[0]);
      },
    );
    $start{start} = sub {}; # delete yourself
  };

  return \%start;
}
63
# Handle a synchronous "stop" request.
#
# Returns 'already_stopped' when nothing is listening, or 'stopped' after
# removing the listener from the loop and marking the service 'down'.
# Throws [ failure => 'starting' ] while a start is still in flight.
sub handle_stop {
  my ($self) = @_;
  my $state = $self->state;

  # 'stopped' is what the start error callbacks leave behind; there is no
  # listener to remove in that state either.
  return 'already_stopped' if $state eq 'down' || $state eq 'stopped';

  # there's probably something more intelligent to do here, but meh
  #
  # Thrown as an array ref: die with a plain list would concatenate its
  # arguments into the string "failurestarting at ...".
  # NOTE(review): assumes the dispatcher turns array-ref exceptions into a
  # (type, data) result -- confirm against Tak::Role::Service.
  die [ failure => 'starting' ] if $state eq 'starting';

  # The clearer hands back the listener it just removed from the object.
  Tak->loop->remove($self->clear_listener);
  $self->state('down');
  return 'stopped';
}
73
# Destructor hook: detach the listener (if any) from the loop it belongs
# to. Does nothing when invoked with a true second argument (the
# global-destruction flag).
sub DEMOLISH {
  my ($self, $in_global_destruction) = @_;
  return if $in_global_destruction;

  my $listener = $self->listener
    or return;
  $listener->get_loop->remove($listener);
}
81
# Wrap a freshly accepted socket in a Tak::ConnectionService and track it.
#
# $socket is used as both the read and write handle. The connection is
# stored in the connections hash under its stringified reference and is
# removed from there by its on_close callback. Returns nothing.
sub setup_connection {
  my ($self, $socket) = @_;
  my $conn_set = $self->connections;

  # The on_close callback captures only the hash key, never the connection
  # object itself: a closure that referenced the connection while being
  # handed to that same connection would form a reference cycle and keep
  # the object alive after it was dropped from the set.
  my $key;
  my $connection = Tak::ConnectionService->new(
    read_fh => $socket, write_fh => $socket,
    listening_service => $self->router,
    on_close => sub { delete $conn_set->{$key} }
  );

  # Same key the hash would produce by stringifying the object.
  $key = "$connection";
  $conn_set->{$key} = $connection;
  return;
}
94
951;