1 package Tak::Daemon::ListenerService;
3 use Scalar::Util qw(weaken);
6 with 'Tak::Role::Service';
8 has listen_on => (is => 'ro', required => 1);
9 has router => (is => 'ro', required => 1);
11 has state => (is => 'rw', default => sub { 'down' }, init_arg => undef);
13 has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress');
15 has listener => (is => 'rw', clearer => 'clear_listener');
17 has connections => (is => 'ro', default => sub { {} });
19 sub start_start_request {
20 my ($self, $req) = @_;
21 $req->result('already_started') if $self->state eq 'running';
22 push(@{$self->_start_in_progress->{requests}}, $req);
23 $self->_start_in_progress->{start}();
26 sub _build__start_in_progress {
29 my %start = (requests => (my $requests = []));
30 my $listen_on = $self->listen_on;
35 ? (family => "inet", %$_)
36 : (family => "unix", path => $_)
40 $self->state('starting');
45 $self->listener($_[0]);
46 $_->success('started') for @$requests;
47 $self->_clear_start_in_progress;
48 $self->state('running');
50 on_resolve_error => sub { # no-op until we add non-unix
51 $_->failure(resolve => @_) for @$requests;
52 $self->_clear_start_in_progress;
53 $self->state('stopped');
55 on_listen_error => sub {
56 $_->failure(listen => @_) for @$requests;
57 $self->_clear_start_in_progress;
58 $self->state('stopped');
61 $self->setup_connection($_[0]);
63 on_accept_error => sub {
67 $start{start} = sub {}; # delete yourself
74 return 'already_stopped' if $self->state eq 'down';
75 # there's probably something more intelligent to do here, but meh
76 die failure => 'starting' if $self->state eq 'starting';
77 Tak->loop->remove($self->clear_listener);
78 !ref and unlink($_) for $self->listen_on;
84 my ($self, $in_global_destruction) = @_;
86 return unless $self->state eq 'running';
88 !ref and unlink($_) for $self->listen_on;
90 return if $in_global_destruction;
92 Tak->loop->remove($self->listener);
95 sub setup_connection {
96 my ($self, $socket) = @_;
97 my $conn_set = $self->connections;
99 my $connection = Tak::ConnectionService->new(
100 read_fh => $socket, write_fh => $socket,
101 listening_service => $self->router->clone_or_self,
102 on_close => sub { delete $conn_set->{$conn_str} }
104 $conn_str = "$connection";
105 $connection->receiver->service->service->register_weak(remote => $connection);
106 $conn_set->{$conn_str} = $connection;