INET connector
[scpubgit/Object-Remote.git] / lib / Object / Remote / ConnectionServer.pm
CommitLineData
5c608989 1package Object::Remote::ConnectionServer;
2
3use Scalar::Util qw(blessed weaken);
4use Module::Runtime qw(use_module);
5use Object::Remote;
9031635d 6use Object::Remote::Logging qw( :log :dlog );
783105c4 7use Future;
5c608989 8use IO::Socket::UNIX;
5c608989 9use Moo;
10
11has listen_on => (
12 is => 'ro',
13 coerce => sub {
14 return $_[0] if blessed($_[0]);
15 unlink($_[0]);
16 IO::Socket::UNIX->new(
17 Local => $_[0],
18 Listen => 1
19 ) or die "Couldn't liten to $_[0]: $!";
20 },
21 trigger => sub {
22 my ($self, $fh) = @_;
9031635d 23 log_debug { "adding connection server to run loop because the trigger has executed" };
5c608989 24 weaken($self);
25 Object::Remote->current_loop
26 ->watch_io(
27 handle => $fh,
28 on_read_ready => sub { $self->_listen_ready($fh) }
29 );
30 },
31);
32
33has connection_args => (
34 is => 'ro', default => sub { [] }
35);
36
3687a42d 37has connection_callback => (
38 is => 'ro', default => sub { sub { shift } }
39);
40
5c608989 41sub BUILD {
9031635d 42 log_debug { "A connection server has been built; calling want_run on run loop" };
5c608989 43 Object::Remote->current_loop->want_run;
44}
45
46sub run {
9031635d 47 log_debug { "Connection server is calling run_while_wanted on the run loop" };
5c608989 48 Object::Remote->current_loop->run_while_wanted;
49}
50
51sub _listen_ready {
52 my ($self, $fh) = @_;
9031635d 53 log_debug { "Got a connection, calling accept on the file handle" };
5c608989 54 my $new = $fh->accept or die "Couldn't accept: $!";
9031635d 55 log_trace { "Setting file handle non-blocking" };
5c608989 56 $new->blocking(0);
783105c4 57 my $f = Future->new;
9031635d 58 log_trace { "Creating a new connection with the remote node" };
5c608989 59 my $c = use_module('Object::Remote::Connection')->new(
60 receive_from_fh => $new,
61 send_to_fh => $new,
62 on_close => $f, # and so will die $c
63 @{$self->connection_args}
3687a42d 64 )->${\$self->connection_callback};
5c608989 65 $f->on_ready(sub { undef($c) });
9031635d 66 log_trace { "marking the future as done" };
5c608989 67 $c->ready_future->done;
55c0d020 68 Dlog_trace { "Sending 'Shere' to socket $_" } $new;
5c608989 69 print $new "Shere\n" or die "Couldn't send to new socket: $!";
9031635d 70 log_debug { "Connection has been fully handled" };
5c608989 71 return $c;
72}
73
74sub DEMOLISH {
75 my ($self, $gd) = @_;
9031635d 76 log_debug { "A connection server is being destroyed; global destruction: '$gd'" };
5c608989 77 return if $gd;
9031635d 78 log_trace { "Removing the connection server IO watcher from run loop" };
5c608989 79 Object::Remote->current_loop
80 ->unwatch_io(
81 handle => $self->listen_on,
82 on_read_ready => 1
83 );
84 if ($self->listen_on->can('hostpath')) {
9031635d 85 log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" };
5c608989 86 unlink($self->listen_on->hostpath);
87 }
9031635d 88 log_trace { "calling want_stop on the run loop" };
5c608989 89 Object::Remote->current_loop->want_stop;
90}
91
921;