switch from CPS::Future to Future
[scpubgit/Object-Remote.git] / lib / Object / Remote / ConnectionServer.pm
1 package Object::Remote::ConnectionServer;
2
3 use Scalar::Util qw(blessed weaken);
4 use Module::Runtime qw(use_module);
5 use Object::Remote;
6 use Object::Remote::Logging qw( :log :dlog );
7 use Future;
8 use IO::Socket::UNIX;
9 use Moo;
10
11 has listen_on => (
12   is => 'ro',
13   coerce => sub {
14     return $_[0] if blessed($_[0]);
15     unlink($_[0]);
16     IO::Socket::UNIX->new(
17       Local => $_[0],
18       Listen => 1
19     ) or die "Couldn't liten to $_[0]: $!";
20   },
21   trigger => sub {
22     my ($self, $fh) = @_;
23     log_debug { "adding connection server to run loop because the trigger has executed" };
24     weaken($self);
25     Object::Remote->current_loop
26                   ->watch_io(
27                       handle => $fh,
28                       on_read_ready => sub { $self->_listen_ready($fh) }
29                     );
30   },
31 );
32
33 has connection_args => (
34  is => 'ro', default => sub { [] }
35 );
36
37 has connection_callback => (
38   is => 'ro', default => sub { sub { shift } }
39 );
40
41 sub BUILD {
42   log_debug { "A connection server has been built; calling want_run on run loop" };
43   Object::Remote->current_loop->want_run;
44 }
45
46 sub run {
47   log_debug { "Connection server is calling run_while_wanted on the run loop" };
48   Object::Remote->current_loop->run_while_wanted;
49 }
50
51 sub _listen_ready {
52   my ($self, $fh) = @_;
53   log_debug { "Got a connection, calling accept on the file handle" };
54   my $new = $fh->accept or die "Couldn't accept: $!";
55   log_trace { "Setting file handle non-blocking" };
56   $new->blocking(0);
57   my $f = Future->new;
58   log_trace { "Creating a new connection with the remote node" };
59   my $c = use_module('Object::Remote::Connection')->new(
60     receive_from_fh => $new,
61     send_to_fh => $new,
62     on_close => $f, # and so will die $c
63     @{$self->connection_args}
64   )->${\$self->connection_callback};
65   $f->on_ready(sub { undef($c) });
66   log_trace { "marking the future as done" };
67   $c->ready_future->done;
68   Dlog_trace { "Sending 'Shere' to socket $_" } $new;
69   print $new "Shere\n" or die "Couldn't send to new socket: $!";
70   log_debug { "Connection has been fully handled" };
71   return $c;
72 }
73
74 sub DEMOLISH {
75   my ($self, $gd) = @_;
76   log_debug { "A connection server is being destroyed; global destruction: '$gd'" };
77   return if $gd;
78   log_trace { "Removing the connection server IO watcher from run loop" };
79   Object::Remote->current_loop
80                 ->unwatch_io(
81                     handle => $self->listen_on,
82                     on_read_ready => 1
83                   );
84   if ($self->listen_on->can('hostpath')) {
85     log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" };
86     unlink($self->listen_on->hostpath);
87   }
88   log_trace { "calling want_stop on the run loop" };
89   Object::Remote->current_loop->want_stop;
90 }
91
92 1;