update logging api to match log-contextual 0.005
[scpubgit/Object-Remote.git] / lib / Object / Remote / ConnectionServer.pm
1 package Object::Remote::ConnectionServer;
2
3 use Scalar::Util qw(blessed weaken);
4 use Module::Runtime qw(use_module);
5 use Object::Remote;
6 use Object::Remote::Logging qw( :log :dlog );
7 use IO::Socket::UNIX;
8 use Moo;
9
10 has listen_on => (
11   is => 'ro',
12   coerce => sub {
13     return $_[0] if blessed($_[0]);
14     unlink($_[0]);
15     IO::Socket::UNIX->new(
16       Local => $_[0],
17       Listen => 1
18     ) or die "Couldn't liten to $_[0]: $!";
19   },
20   trigger => sub {
21     my ($self, $fh) = @_;
22     log_debug { "adding connection server to run loop because the trigger has executed" };
23     weaken($self);
24     Object::Remote->current_loop
25                   ->watch_io(
26                       handle => $fh,
27                       on_read_ready => sub { $self->_listen_ready($fh) }
28                     );
29   },
30 );
31
32 has connection_args => (
33  is => 'ro', default => sub { [] }
34 );
35
36 has connection_callback => (
37   is => 'ro', default => sub { sub { shift } }
38 );
39
40 sub BUILD {
41   log_debug { "A connection server has been built; calling want_run on run loop" };
42   Object::Remote->current_loop->want_run;
43 }
44
45 sub run {
46   log_debug { "Connection server is calling run_while_wanted on the run loop" };
47   Object::Remote->current_loop->run_while_wanted;
48 }
49
50 sub _listen_ready {
51   my ($self, $fh) = @_;
52   log_debug { "Got a connection, calling accept on the file handle" };
53   my $new = $fh->accept or die "Couldn't accept: $!";
54   log_trace { "Setting file handle non-blocking" };
55   $new->blocking(0);
56   my $f = CPS::Future->new;
57   log_trace { "Creating a new connection with the remote node" };
58   my $c = use_module('Object::Remote::Connection')->new(
59     receive_from_fh => $new,
60     send_to_fh => $new,
61     on_close => $f, # and so will die $c
62     @{$self->connection_args}
63   )->${\$self->connection_callback};
64   $f->on_ready(sub { undef($c) });
65   log_trace { "marking the future as done" };
66   $c->ready_future->done;
67   Dlog_trace { "Sending 'Shere' to socket $_" } $new; 
68   print $new "Shere\n" or die "Couldn't send to new socket: $!";
69   log_debug { "Connection has been fully handled" };
70   return $c;
71 }
72
73 sub DEMOLISH {
74   my ($self, $gd) = @_;
75   log_debug { "A connection server is being destroyed; global destruction: '$gd'" };
76   return if $gd;
77   log_trace { "Removing the connection server IO watcher from run loop" };
78   Object::Remote->current_loop
79                 ->unwatch_io(
80                     handle => $self->listen_on,
81                     on_read_ready => 1
82                   );
83   if ($self->listen_on->can('hostpath')) {
84     log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" };
85     unlink($self->listen_on->hostpath);
86   }
87   log_trace { "calling want_stop on the run loop" };
88   Object::Remote->current_loop->want_stop;
89 }
90
91 1;