Release commit for 0.001004
[scpubgit/Tak.git] / lib / Tak / ConnectorService.pm
CommitLineData
77bf1d9b 1package Tak::ConnectorService;
2
3use IPC::Open2;
a1cbf366 4use IO::Socket::UNIX;
42c84e78 5use IO::Socket::INET; # Sucks to be v6, see comment where used
77bf1d9b 6use IO::All;
7use Tak::Router;
8use Tak::Client;
9use Tak::ConnectionService;
2791fd73 10use Net::OpenSSH;
3f28f492 11use Tak::STDIONode;
77bf1d9b 12use Moo;
13
14with 'Tak::Role::Service';
15
# Router holding one sub-router per live connection; handle_create
# registers each new connection here under its "$on:$pid" name.
has connections => (
  is      => 'ro',
  default => sub { Tak::Router->new },
);

# Cache of Net::OpenSSH objects keyed by ssh target string, so repeated
# connections to the same target can share one ssh object (see _open).
has ssh => (
  is      => 'ro',
  default => sub { {} },
);
19
# Create a connection to the node identified by $on and register it.
#
# Arguments:
#   $on    - target specifier, interpreted by _open: '-' for a local child
#            process, a path starting with '/' or './' for a unix domain
#            socket, something containing ':' for a TCP peer, anything
#            else for an ssh target.
#   %args  - optional settings; only log_level is read (default 'info').
#
# Returns the name the connection was registered under, "$on:$pid", where
# $pid is whatever the far side answers to the 'meta pid' request.
#
# Dies with [ mistake => ... ] if no target is supplied and with
# [ failure => ... ] if the child does not complete the handshake.
sub handle_create {
  my ($self, $on, %args) = @_;
  die [ mistake => "No target supplied to create" ] unless $on;
  my $log_level = $args{log_level}||'info';
  my ($kid_in, $kid_out, $kid_pid) = $self->_open($on, $log_level);
  # Only transports that spawned a process hand back a pid; plain sockets
  # return undef there and skip the bootstrap/handshake entirely.
  if ($kid_pid) {
    # The local '-' child is started via the tak-stdio-node command and
    # needs no code sent to it; for every other spawned child the node
    # source is fed down its stdin, terminated by __END__.
    $kid_in->print($Tak::STDIONode::DATA, "__END__\n") unless $on eq '-';
    # Need to get a handshake to indicate STDIOSetup has finished
    # messing around with file descriptors, otherwise we can severely
    # confuse things by sending before the dup.
    my $up = <$kid_out>;
    # readline yields undef at EOF/error, i.e. the child went away before
    # saying anything. Report that explicitly rather than comparing and
    # interpolating an undefined value.
    die [ failure => "No handshake from child: connection closed" ]
      unless defined $up;
    die [ failure => "Garbled response from child: $up" ]
      unless $up eq "Shere\n";
  }
  my $connection = Tak::ConnectionService->new(
    read_fh => $kid_out, write_fh => $kid_in,
    listening_service => Tak::Router->new
  );
  my $client = Tak::Client->new(service => $connection);
  # actually, we should register with a monotonic id and
  # stash the pid elsewhere. but meh for now.
  my $pid = $client->do(meta => 'pid');
  my $name = $on.':'.$pid;
  # Each connection gets its own router exposing both directions:
  # 'local' is the service receiving from the peer, 'remote' sends to it.
  my $conn_router = Tak::Router->new;
  $conn_router->register(local => $connection->receiver->service);
  $conn_router->register(remote => $connection);
  $self->connections->register($name, $conn_router);
  return ($name);
}
49
# Open the transport for target $on and return ($write_fh, $read_fh, $pid).
#
# The transport is chosen from the shape of $on:
#   '-'              local child process running tak-stdio-node
#   /path or ./path  unix domain socket
#   contains ':'     TCP socket (host:port)
#   anything else    ssh target, running "perl -" on the far side
#
# @args are appended to the command line of spawned children; they are
# not used for the socket transports. $pid is undef for the socket
# transports, where the same socket serves as both handles.
#
# Dies with a plain string message if the transport cannot be opened.
sub _open {
  my ($self, $on, @args) = @_;
  if ($on eq '-') {
    my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, 'tak-stdio-node', '-', @args)
      or die "Couldn't open2 child: $!";
    return ($kid_in, $kid_out, $kid_pid);
  } elsif ($on =~ /^\.?\//) { # ./foo or /foo
    my $sock = IO::Socket::UNIX->new($on)
      or die "Couldn't open unix domain socket ${on}: $!";
    return ($sock, $sock, undef);
  } elsif ($on =~ /:/) { # foo:80 we hope
    # IO::Socket::IP is a better answer. But can pull in XS deps.
    # Well, more strictly it pulls in Socket::GetAddrInfo, which can
    # actually work without its XS implementation (just doesn't handle v6)
    # and I've not properly pondered how to make things like fatpacking
    # work reliably in such a circumstance. First person to need IPv6
    # and be reading this comment, please start a conversation about it.
    my $sock = IO::Socket::INET->new(PeerAddr => $on)
      or die "Couldn't open TCP socket ${on}: $!";
    return ($sock, $sock, undef);
  }
  my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on);
  if (my $ssh_error = $ssh->error) {
    # Evict the broken object so the next attempt for this target builds
    # a fresh connection instead of failing forever on a cached error.
    delete $self->ssh->{$on};
    die "Couldn't establish ssh connection: ".$ssh_error;
  }
  my ($kid_in, $kid_out, $kid_pid) = $ssh->open2('perl','-', $on, @args);
  # Without a pid the remote command was not started; stop here rather
  # than handing undefined handles back to the caller.
  die "Couldn't start remote perl on ${on}: ".$ssh->error
    unless $kid_pid;
  return ($kid_in, $kid_out, $kid_pid);
}
76
# Forward a request (and its payload) to the connections router, which
# dispatches it to the connection addressed by the payload.
sub start_connection_request {
  my $self = shift;
  my ($request, @message) = @_;
  return $self->connections->start_request($request, @message);
}
81
# Hand a fire-and-forget message straight to the connections router.
sub receive_connection {
  my $self = shift;
  my @message = @_;
  return $self->connections->receive(@message);
}
86
871;