basic (working) ListenerService
Matt S Trout [Tue, 22 Nov 2011 19:07:52 +0000 (19:07 +0000)]
lib/Tak/Daemon/ListenerService.pm [new file with mode: 0644]

diff --git a/lib/Tak/Daemon/ListenerService.pm b/lib/Tak/Daemon/ListenerService.pm
new file mode 100644 (file)
index 0000000..3ef2a8b
--- /dev/null
@@ -0,0 +1,95 @@
+package Tak::Daemon::ListenerService;
+
+use Scalar::Util qw(weaken);
+use Moo;
+
+with 'Tak::Role::Service';
+
+has socket_location => (is => 'ro', required => 1);
+has router => (is => 'ro', required => 1);
+
+has state => (is => 'rw', default => sub { 'down' }, init_arg => undef);
+
+has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress');
+
+has listener => (is => 'rw', clearer => 'clear_listener');
+
+has connections => (is => 'ro', default => sub { {} });
+
+sub start_start_request {
+  my ($self, $req) = @_;
+  $req->result('already_started') if $self->state eq 'running';
+  push(@{$self->_start_in_progress->{requests}}, $req);
+  $self->_start_in_progress->{start}();
+}
+
+sub _build__start_in_progress {
+  my ($self) = @_;
+  weaken($self);
+  my %start = (requests => (my $requests = []));
+  $start{start} = sub {
+    $self->state('starting');
+    Tak->loop_upgrade;
+    Tak->loop->listen(
+      addr => {
+        family => "unix",
+        socktype => "stream",
+        path => $self->socket_location,
+      },
+      on_notifier => sub {
+        $self->listener($_[0]);
+        $_->success('started') for @$requests;
+        $self->_clear_start_in_progress;
+        $self->state('running');
+      },
+      on_resolve_error => sub { # no-op until we add non-unix
+        $_->failure(resolve => @_) for @$requests;
+        $self->_clear_start_in_progress;
+        $self->state('stopped');
+      },
+      on_listen_error => sub {
+        $_->failure(listen => @_) for @$requests;
+        $self->_clear_start_in_progress;
+        $self->state('stopped');
+      },
+      on_accept => sub {
+        $self->setup_connection($_[0]);
+      },
+    );
+    $start{start} = sub {}; # delete yourself
+  };
+  \%start;
+}
+
+sub handle_stop {
+  my ($self) = @_;
+  return 'already_stopped' if $self->state eq 'down';
+  # there's probably something more intelligent to do here, but meh
+  die failure => 'starting' if $self->state eq 'starting';
+  Tak->loop->remove($self->clear_listener);
+  $self->state('down');
+  return 'stopped';
+}
+
+sub DEMOLISH {
+  my ($self, $gd) = @_;
+  return if $gd;
+  if (my $l = $self->listener) {
+    $l->get_loop->remove($l);
+  }
+}
+
+sub setup_connection {
+  my ($self, $socket) = @_;
+  my $conn_set = $self->connections;
+  my $connection;
+  $connection = Tak::ConnectionService->new(
+    read_fh => $socket, write_fh => $socket,
+    listening_service => $self->router,
+    on_close => sub { delete $conn_set->{$connection} }
+  );
+  $conn_set->{$connection} = $connection;
+  return;
+}
+
+1;