Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectorService; |
2 | |
3 | use IPC::Open2; |
a1cbf366 |
4 | use IO::Socket::UNIX; |
42c84e78 |
5 | use IO::Socket::INET; # Sucks to be v6, see comment where used |
77bf1d9b |
6 | use IO::All; |
7 | use Tak::Router; |
8 | use Tak::Client; |
9 | use Tak::ConnectionService; |
2791fd73 |
10 | use Net::OpenSSH; |
3f28f492 |
11 | use Tak::STDIONode; |
77bf1d9b |
12 | use Moo; |
13 | |
14 | with 'Tak::Role::Service'; |
15 | |
16 | has connections => (is => 'ro', default => sub { Tak::Router->new }); |
17 | |
2791fd73 |
18 | has ssh => (is => 'ro', default => sub { {} }); |
19 | |
77bf1d9b |
20 | sub handle_create { |
7b71b06e |
21 | my ($self, $on, %args) = @_; |
8facab5f |
22 | die [ mistake => "No target supplied to create" ] unless $on; |
7b71b06e |
23 | my $log_level = $args{log_level}||'info'; |
24 | my ($kid_in, $kid_out, $kid_pid) = $self->_open($on, $log_level); |
42c84e78 |
25 | if ($kid_pid) { |
a1cbf366 |
26 | $kid_in->print($Tak::STDIONode::DATA, "__END__\n") unless $on eq '-'; |
27 | # Need to get a handshake to indicate STDIOSetup has finished |
28 | # messing around with file descriptors, otherwise we can severely |
29 | # confuse things by sending before the dup. |
30 | my $up = <$kid_out>; |
31 | die [ failure => "Garbled response from child: $up" ] |
ac3e780c |
32 | unless $up eq "Shere\n"; |
a1cbf366 |
33 | } |
77bf1d9b |
34 | my $connection = Tak::ConnectionService->new( |
35 | read_fh => $kid_out, write_fh => $kid_in, |
36 | listening_service => Tak::Router->new |
37 | ); |
38 | my $client = Tak::Client->new(service => $connection); |
39 | # actually, we should register with a monotonic id and |
40 | # stash the pid elsewhere. but meh for now. |
41 | my $pid = $client->do(meta => 'pid'); |
8facab5f |
42 | my $name = $on.':'.$pid; |
2791fd73 |
43 | my $conn_router = Tak::Router->new; |
44 | $conn_router->register(local => $connection->receiver->service); |
45 | $conn_router->register(remote => $connection); |
46 | $self->connections->register($name, $conn_router); |
47 | return ($name); |
48 | } |
49 | |
50 | sub _open { |
7b71b06e |
51 | my ($self, $on, @args) = @_; |
8facab5f |
52 | if ($on eq '-') { |
53 | my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, 'tak-stdio-node', '-', @args) |
2791fd73 |
54 | or die "Couldn't open2 child: $!"; |
55 | return ($kid_in, $kid_out, $kid_pid); |
a1cbf366 |
56 | } elsif ($on =~ /^\.?\//) { # ./foo or /foo |
57 | my $sock = IO::Socket::UNIX->new($on) |
42c84e78 |
58 | or die "Couldn't open unix domain socket ${on}: $!"; |
59 | return ($sock, $sock, undef); |
60 | } elsif ($on =~ /:/) { # foo:80 we hope |
61 | # IO::Socket::IP is a better answer. But can pull in XS deps. |
62 | # Well, more strictly it pulls in Socket::GetAddrInfo, which can |
63 | # actually work without its XS implementation (just doesn't handle v6) |
64 | # and I've not properly pondered how to make things like fatpacking |
65 | # Just Fucking Work in such a circumstance. First person to need IPv6 |
66 | # and be reading this comment, please start a conversation about it. |
67 | my $sock = IO::Socket::INET->new(PeerAddr => $on) |
68 | or die "Couldn't open TCP socket ${on}: $!"; |
69 | return ($sock, $sock, undef); |
2791fd73 |
70 | } |
71 | my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on); |
72 | $ssh->error and |
73 | die "Couldn't establish ssh connection: ".$ssh->error; |
7b71b06e |
74 | return $ssh->open2('perl','-', $on, @args); |
77bf1d9b |
75 | } |
76 | |
2791fd73 |
77 | sub start_connection_request { |
77bf1d9b |
78 | my ($self, $req, @payload) = @_;; |
79 | $self->connections->start_request($req, @payload); |
80 | } |
81 | |
2791fd73 |
82 | sub receive_connection { |
77bf1d9b |
83 | my ($self, @payload) = @_; |
84 | $self->connections->receive(@payload); |
85 | } |
86 | |
87 | 1; |