Commit | Line | Data |
77bf1d9b |
1 | package Tak::ConnectorService; |
2 | |
3 | use IPC::Open2; |
a1cbf366 |
4 | use IO::Socket::UNIX; |
77bf1d9b |
5 | use IO::All; |
6 | use Tak::Router; |
7 | use Tak::Client; |
8 | use Tak::ConnectionService; |
2791fd73 |
9 | use Net::OpenSSH; |
3f28f492 |
10 | use Tak::STDIONode; |
77bf1d9b |
11 | use Moo; |
12 | |
13 | with 'Tak::Role::Service'; |
14 | |
15 | has connections => (is => 'ro', default => sub { Tak::Router->new }); |
16 | |
2791fd73 |
17 | has ssh => (is => 'ro', default => sub { {} }); |
18 | |
77bf1d9b |
19 | sub handle_create { |
7b71b06e |
20 | my ($self, $on, %args) = @_; |
8facab5f |
21 | die [ mistake => "No target supplied to create" ] unless $on; |
7b71b06e |
22 | my $log_level = $args{log_level}||'info'; |
23 | my ($kid_in, $kid_out, $kid_pid) = $self->_open($on, $log_level); |
a1cbf366 |
24 | unless ($on =~ /^\.?\//) { |
25 | $kid_in->print($Tak::STDIONode::DATA, "__END__\n") unless $on eq '-'; |
26 | # Need to get a handshake to indicate STDIOSetup has finished |
27 | # messing around with file descriptors, otherwise we can severely |
28 | # confuse things by sending before the dup. |
29 | my $up = <$kid_out>; |
30 | die [ failure => "Garbled response from child: $up" ] |
31 | unless $up eq "Ssyshere\n"; |
32 | } |
77bf1d9b |
33 | my $connection = Tak::ConnectionService->new( |
34 | read_fh => $kid_out, write_fh => $kid_in, |
35 | listening_service => Tak::Router->new |
36 | ); |
37 | my $client = Tak::Client->new(service => $connection); |
38 | # actually, we should register with a monotonic id and |
39 | # stash the pid elsewhere. but meh for now. |
40 | my $pid = $client->do(meta => 'pid'); |
8facab5f |
41 | my $name = $on.':'.$pid; |
2791fd73 |
42 | my $conn_router = Tak::Router->new; |
43 | $conn_router->register(local => $connection->receiver->service); |
44 | $conn_router->register(remote => $connection); |
45 | $self->connections->register($name, $conn_router); |
46 | return ($name); |
47 | } |
48 | |
49 | sub _open { |
7b71b06e |
50 | my ($self, $on, @args) = @_; |
8facab5f |
51 | if ($on eq '-') { |
52 | my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, 'tak-stdio-node', '-', @args) |
2791fd73 |
53 | or die "Couldn't open2 child: $!"; |
54 | return ($kid_in, $kid_out, $kid_pid); |
a1cbf366 |
55 | } elsif ($on =~ /^\.?\//) { # ./foo or /foo |
56 | my $sock = IO::Socket::UNIX->new($on) |
57 | or die "Couldn't open socket ${on}: $!"; |
58 | return ($sock, $sock, 'UNIX'); |
2791fd73 |
59 | } |
60 | my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on); |
61 | $ssh->error and |
62 | die "Couldn't establish ssh connection: ".$ssh->error; |
7b71b06e |
63 | return $ssh->open2('perl','-', $on, @args); |
77bf1d9b |
64 | } |
65 | |
2791fd73 |
66 | sub start_connection_request { |
77bf1d9b |
67 | my ($self, $req, @payload) = @_;; |
68 | $self->connections->start_request($req, @payload); |
69 | } |
70 | |
2791fd73 |
71 | sub receive_connection { |
77bf1d9b |
72 | my ($self, @payload) = @_; |
73 | $self->connections->receive(@payload); |
74 | } |
75 | |
76 | 1; |