b9f59d1058e69efe482c46bc1a50f2158b581f73
[scpubgit/Tak.git] / lib / Tak / ConnectorService.pm
1 package Tak::ConnectorService;
2
3 use IPC::Open2;
4 use IO::All;
5 use Tak::Router;
6 use Tak::Client;
7 use Tak::ConnectionService;
8 use Net::OpenSSH;
9 use Tak::STDIONode;
10 use Moo;
11
12 with 'Tak::Role::Service';
13
14 has connections => (is => 'ro', default => sub { Tak::Router->new });
15
16 has ssh => (is => 'ro', default => sub { {} });
17
18 sub handle_create {
19   my ($self, $on, %args) = @_;
20   my $log_level = $args{log_level}||'info';
21   my ($kid_in, $kid_out, $kid_pid) = $self->_open($on, $log_level);
22   $kid_in->print($Tak::STDIONode::DATA, "__END__\n");
23   # Need to get a handshake to indicate STDIOSetup has finished
24   # messing around with file descriptors, otherwise we can severely
25   # confuse things by sending before the dup.
26   my $up = <$kid_out>;
27   die [ failure => "Garbled response from child: $up" ]
28     unless $up eq "Ssyshere\n";
29   my $connection = Tak::ConnectionService->new(
30     read_fh => $kid_out, write_fh => $kid_in,
31     listening_service => Tak::Router->new
32   );
33   my $client = Tak::Client->new(service => $connection);
34   # actually, we should register with a monotonic id and
35   # stash the pid elsewhere. but meh for now.
36   my $pid = $client->do(meta => 'pid');
37   my $name = ($on||'|').':'.$pid;
38   my $conn_router = Tak::Router->new;
39   $conn_router->register(local => $connection->receiver->service);
40   $conn_router->register(remote => $connection);
41   $self->connections->register($name, $conn_router);
42   return ($name);
43 }
44
45 sub _open {
46   my ($self, $on, @args) = @_;
47   unless ($on) {
48     my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-', '-', @args)
49       or die "Couldn't open2 child: $!";
50     return ($kid_in, $kid_out, $kid_pid);
51   }
52   my $ssh = $self->ssh->{$on} ||= Net::OpenSSH->new($on);
53   $ssh->error and
54     die "Couldn't establish ssh connection: ".$ssh->error;
55   return $ssh->open2('perl','-', $on, @args);
56 }
57
58 sub start_connection_request {
59   my ($self, $req, @payload) = @_;;
60   $self->connections->start_request($req, @payload);
61 }
62
63 sub receive_connection {
64   my ($self, @payload) = @_;
65   $self->connections->receive(@payload);
66 }
67
68 1;