From: Matt S Trout Date: Tue, 22 Nov 2011 19:07:52 +0000 (+0000) Subject: basic (working) ListenerService X-Git-Tag: v0.001001~12 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FTak-Daemon.git;a=commitdiff_plain;h=dc2fd705138e74123bafb0d2c87c775493f22fe4 basic (working) ListenerService --- dc2fd705138e74123bafb0d2c87c775493f22fe4 diff --git a/lib/Tak/Daemon/ListenerService.pm b/lib/Tak/Daemon/ListenerService.pm new file mode 100644 index 0000000..3ef2a8b --- /dev/null +++ b/lib/Tak/Daemon/ListenerService.pm @@ -0,0 +1,95 @@ +package Tak::Daemon::ListenerService; + +use Scalar::Util qw(weaken); +use Moo; + +with 'Tak::Role::Service'; + +has socket_location => (is => 'ro', required => 1); +has router => (is => 'ro', required => 1); + +has state => (is => 'rw', default => sub { 'down' }, init_arg => undef); + +has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress'); + +has listener => (is => 'rw', clearer => 'clear_listener'); + +has connections => (is => 'ro', default => sub { {} }); + +sub start_start_request { + my ($self, $req) = @_; + $req->result('already_started') if $self->state eq 'running'; + push(@{$self->_start_in_progress->{requests}}, $req); + $self->_start_in_progress->{start}(); +} + +sub _build__start_in_progress { + my ($self) = @_; + weaken($self); + my %start = (requests => (my $requests = [])); + $start{start} = sub { + $self->state('starting'); + Tak->loop_upgrade; + Tak->loop->listen( + addr => { + family => "unix", + socktype => "stream", + path => $self->socket_location, + }, + on_notifier => sub { + $self->listener($_[0]); + $_->success('started') for @$requests; + $self->_clear_start_in_progress; + $self->state('running'); + }, + on_resolve_error => sub { # no-op until we add non-unix + $_->failure(resolve => @_) for @$requests; + $self->_clear_start_in_progress; + $self->state('stopped'); + }, + on_listen_error => sub { + $_->failure(listen => @_) for @$requests; + $self->_clear_start_in_progress; + $self->state('stopped'); + }, + on_accept => sub { + $self->setup_connection($_[0]); + }, + ); + $start{start} = sub {}; # delete yourself + }; + \%start; +} + +sub handle_stop { + my ($self) = @_; + return 'already_stopped' if $self->state eq 'down'; + # there's probably something more intelligent to do here, but meh + die failure => 'starting' if $self->state eq 'starting'; + Tak->loop->remove($self->clear_listener); + $self->state('down'); + return 'stopped'; +} + +sub DEMOLISH { + my ($self, $gd) = @_; + return if $gd; + if (my $l = $self->listener) { + $l->get_loop->remove($l); + } +} + +sub setup_connection { + my ($self, $socket) = @_; + my $conn_set = $self->connections; + my $connection; + $connection = Tak::ConnectionService->new( + read_fh => $socket, write_fh => $socket, + listening_service => $self->router, + on_close => sub { delete $conn_set->{$connection} } + ); + $conn_set->{$connection} = $connection; + return; +} + +1;