new remote code
Matt S Trout [Fri, 4 Nov 2011 21:51:55 +0000 (21:51 +0000)]
13 files changed:
lib/Tak/Client.pm
lib/Tak/ConnectionReceiver.pm [new file with mode: 0644]
lib/Tak/ConnectionService.pm [new file with mode: 0644]
lib/Tak/ConnectorService.pm [new file with mode: 0644]
lib/Tak/JSONChannel.pm
lib/Tak/Loop.pm
lib/Tak/MetaService.pm
lib/Tak/Request.pm
lib/Tak/Result.pm
lib/Tak/Role/Service.pm
lib/Tak/Router.pm
lib/Tak/STDIOSetup.pm [new file with mode: 0644]
maint/mk-fat

index 04d86ef..949e9af 100644 (file)
@@ -33,6 +33,10 @@ sub _new_request {
 }
 
 sub do {
+  shift->result_of(@_)->get;
+}
+
+sub result_of {
   my ($self, @payload) = @_;
   my $done;
   my $result;
@@ -40,7 +44,7 @@ sub do {
     on_result => sub { $result = shift },
   }, @payload);
   Tak->loop_until($result);
-  $result->get;
+  return $result;
 }
 
 1;
diff --git a/lib/Tak/ConnectionReceiver.pm b/lib/Tak/ConnectionReceiver.pm
new file mode 100644 (file)
index 0000000..bb2ba15
--- /dev/null
@@ -0,0 +1,62 @@
+package Tak::ConnectionReceiver;
+
+use Tak::Request;
+use Scalar::Util qw(weaken);
+use Moo;
+
+with 'Tak::Role::Service';
+
+has requests => (is => 'ro', default => sub { {} });
+
+has channel => (is => 'ro', required => 1);
+
+has service => (is => 'ro', required => 1);
+
+sub BUILD {
+  weaken(my $self = shift);
+  my $channel = $self->channel;
+  Tak->loop->watch_io(
+    handle => $channel->read_fh,
+    on_read_ready => sub {
+      if (my $message = $channel->read_message) {
+        $self->receive(@$message);
+      }
+    }
+  );
+}
+
+sub DEMOLISH {
+  Tak->loop->unwatch_io(
+    handle => $_[0]->channel->read_fh,
+    on_read_ready => 1,
+  );
+}
+
+sub receive_request {
+  my ($self, $tag, $meta, @payload) = @_;
+  my $channel = $self->channel;
+  my $req = Tak::Request->new(
+    ($meta->{progress}
+        ? (on_progress => sub { $channel->write_message(progress => $tag => @_) })
+        : ()),
+    on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) }
+  );
+  $self->service->start_request($req => @payload);
+}
+
+sub receive_progress {
+  my ($self, $tag, @payload) = @_;
+  $self->requests->{$tag}->progress(@payload);
+}
+
+sub receive_result {
+  my ($self, $tag, @payload) = @_;
+  $self->requests->{$tag}->result(@payload);
+}
+
+sub receive_message {
+  my ($self, @payload) = @_;
+  $self->service->receive(@payload);
+}
+
+1;
diff --git a/lib/Tak/ConnectionService.pm b/lib/Tak/ConnectionService.pm
new file mode 100644 (file)
index 0000000..6390e80
--- /dev/null
@@ -0,0 +1,35 @@
+package Tak::ConnectionService;
+
+use Tak::ConnectionReceiver;
+use Tak::JSONChannel;
+use Moo;
+
+has receiver => (is => 'ro', writer => '_set_receiver');
+
+has channel => (is => 'ro', writer => '_set_channel');
+
+sub BUILD {
+  my ($self, $args) = @_;
+  my $channel = $self->_set_channel(
+    Tak::JSONChannel->new(map +($_ => $args->{$_}), qw(read_fh write_fh))
+  );
+  my $receiver = $self->_set_receiver(
+    Tak::ConnectionReceiver->new(
+      channel => $channel, service => $args->{listening_service}
+    )
+  );
+}
+
+sub start_request {
+  my ($self, $req, @payload) = @_;
+  $self->receiver->requests->{my $tag = "$req"} = $req;
+  my $meta = { progress => !!$req->on_progress };
+  $self->channel->write_message(request => $tag => $meta => @payload);
+}
+
+sub receive {
+  my ($self, @payload) = @_;
+  $self->channel->write_message(message => @payload);
+}
+
+1;
diff --git a/lib/Tak/ConnectorService.pm b/lib/Tak/ConnectorService.pm
new file mode 100644 (file)
index 0000000..8a244a2
--- /dev/null
@@ -0,0 +1,41 @@
+package Tak::ConnectorService;
+
+use IPC::Open2;
+use IO::All;
+use Tak::Router;
+use Tak::Client;
+use Tak::ConnectionService;
+use Moo;
+
+with 'Tak::Role::Service';
+
+has connections => (is => 'ro', default => sub { Tak::Router->new });
+
+sub handle_create {
+  my ($self) = @_;
+  my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-')
+    or die "Couldn't open2 child: $!";
+  io($kid_in)->print(io('maint/mk-fat |')->all, "__END__\n");
+  my $connection = Tak::ConnectionService->new(
+    read_fh => $kid_out, write_fh => $kid_in,
+    listening_service => Tak::Router->new
+  );
+  my $client = Tak::Client->new(service => $connection);
+  # actually, we should register with a monotonic id and
+  # stash the pid elsewhere. but meh for now.
+  my $pid = $client->do(meta => 'pid');
+  $self->connections->register('|'.$pid, $connection);
+  return ('proxy', '|'.$pid);
+}
+
+sub start_proxy_request {
+  my ($self, $req, @payload) = @_;;
+  $self->connections->start_request($req, @payload);
+}
+
+sub receive_proxy {
+  my ($self, @payload) = @_;
+  $self->connections->receive(@payload);
+}
+
+1;
index 6bfb16a..c06054e 100644 (file)
@@ -10,9 +10,9 @@ has write_fh => (is => 'ro', required => 1);
 
 sub BUILD { shift->write_fh->autoflush(1); }
 
-sub receive {
+sub read_message {
   my ($self) = @_;
-  while (my $line = readline($self->read_fh)) {
+  if (defined(my $line = readline($self->read_fh))) {
     if (my $unpacked = $self->_unpack_line($line)) {
       return $unpacked;
     }
@@ -23,35 +23,35 @@ sub _unpack_line {
   my ($self, $line) = @_;
   my $data = eval { decode_json($line) };
   unless ($data) {
-    $self->send(MISTAKE => invalid_json => $@||'No data and no exception');
+    $self->write_message(mistake => invalid_json => $@||'No data and no exception');
     return;
   }
   unless (ref($data) eq 'ARRAY') {
-    $self->send(MISTAKE => message_format => "Not an ARRAY");
+    $self->write_message(mistake => message_format => "Not an ARRAY");
     return;
   }
   unless (@$data > 0) {
-    $self->send(MISTAKE => message_format => "Empty request array");
+    $self->write_message(mistake => message_format => "Empty request array");
     return;
   }
   $data;
 }
 
-sub send {
+sub write_message {
   my ($self, @msg) = @_;
   my $json = eval { encode_json(\@msg) };
   unless ($json) {
-    $self->_raw_send(
+    $self->_raw_write_message(
       encode_json(
-        [ FAILURE => invalid_message => $@||'No data and no exception' ]
+        [ failure => invalid_message => $@||'No data and no exception' ]
       )
     );
     return;
   }
-  $self->_raw_send($json);
+  $self->_raw_write_message($json);
 }
 
-sub _raw_send {
+sub _raw_write_message {
   my ($self, $raw) = @_;
 #warn "Sending: ${raw}\n";
   print { $self->write_fh } $raw."\n";
index fe9dff9..888136c 100644 (file)
@@ -1,5 +1,46 @@
 package Tak::Loop;
 
+use IO::Select;
 use Moo;
 
+has is_running => (is => 'rw', clearer => 'loop_stop');
+
+has _read_watches => (is => 'ro', default => sub { {} });
+has _read_select => (is => 'ro', default => sub { IO::Select->new });
+
+sub watch_io {
+  my ($self, %watch) = @_;
+  my $fh = $watch{handle};
+  if (my $cb = $watch{on_read_ready}) {
+    $self->_read_select->add($fh);
+    $self->_read_watches->{$fh} = $cb;
+  }
+}
+
+sub unwatch_io {
+  my ($self, %watch) = @_;
+  my $fh = $watch{handle};
+  if ($watch{on_read_ready}) {
+    $self->_read_select->remove($fh);
+    delete $self->_read_watches->{$fh};
+  }
+}
+
+sub loop_once {
+  my ($self) = @_;
+  my $read = $self->_read_watches;
+  my ($readable) = IO::Select->select($self->_read_select);
+  foreach my $fh (@$readable) {
+    $read->{$fh}();
+  }
+}
+
+sub loop_forever {
+  my ($self) = @_;
+  $self->is_running(1);
+  while ($self->is_running) {
+    $self->loop_once;
+  }
+}
+
 1;
index 9e32704..4dfe9d4 100644 (file)
@@ -6,6 +6,10 @@ with 'Tak::Role::Service';
 
 has router => (is => 'ro', required => 1, weak_ref => 1);
 
+sub handle_pid {
+  return $$;
+}
+
 sub handle_register {
   my ($self, $name, $class, @args) = @_;
   (my $file = $class) =~ s/::/\//g;
index 342ddba..49731d9 100644 (file)
@@ -19,6 +19,11 @@ sub result {
   $self->on_result->(Tak::Result->new(type => $type, data => \@data));
 }
 
+sub flatten {
+  my ($self) = @_;
+  return ($self->type, @{$self->data});
+}
+
 sub success { shift->result(success => @_) }
 sub mistake { shift->result(mistake => @_) }
 sub failure { shift->result(failure => @_) }
index a7afb4f..2c06242 100644 (file)
@@ -5,6 +5,8 @@ use Moo;
 has type => (is => 'ro', required => 1);
 has data => (is => 'ro', required => 1);
 
+sub flatten { $_[0]->type, @{$_[0]->data} }
+
 sub get {
   my ($self) = @_;
   $self->throw unless $self->type eq 'success';
index a746d15..3ee3d7f 100644 (file)
@@ -3,14 +3,14 @@ package Tak::Role::Service;
 use Moo::Role;
 
 sub start_request {
-  my ($self, $req, $type, @args) = @_;
+  my ($self, $req, $type, @payload) = @_;
   unless ($type) {
     $req->mistake(request_type => "No request type given");
     return;
   }
   if (my $meth = $self->can("handle_${type}")) {
     my @result;
-    if (eval { @result = $self->$meth(@args); 1 }) {
+    if (eval { @result = $self->$meth(@payload); 1 }) {
       $req->success(@result);
     } else {
       if (ref($@) eq 'ARRAY') {
@@ -19,11 +19,18 @@ sub start_request {
         $req->failure(exception => $@);
       }
     }
+  } elsif ($meth = $self->can("start_${type}_request")) {
+    $self->$meth($req => @payload);
   } else {
     $req->mistake(request_type => "Unknown request type ${type}");
   }
 }
 
-sub receive { }
+sub receive {
+  my ($self, $type, @payload) = @_;
+  if (my $meth = $self->can("receive_${type}")) {
+    $self->$meth(@payload);
+  }
+}
 
 1;
index 3041f02..f7e56dd 100644 (file)
@@ -1,9 +1,15 @@
 package Tak::Router;
 
+use Tak::MetaService;
 use Moo;
 
 has services => (is => 'ro', default => sub { {} });
 
+sub BUILD {
+  my ($self) = @_;
+  $self->register(meta => Tak::MetaService->new(router => $self));
+}
+
 sub start_request {
   my ($self, $req, $target, @payload) = @_;
   $req->mistake("Reached router with no target")
diff --git a/lib/Tak/STDIOSetup.pm b/lib/Tak/STDIOSetup.pm
new file mode 100644 (file)
index 0000000..fed4656
--- /dev/null
@@ -0,0 +1,19 @@
+package Tak::STDIOSetup;
+
+use Tak::ConnectionService;
+use Tak::Router;
+use Tak;
+use strictures 1;
+
+sub run {
+  open my $stdin, '<&', \*STDIN;
+  open my $stdout, '>&', \*STDOUT;
+  close STDIN; close STDOUT;
+  my $connection = Tak::ConnectionService->new(
+    read_fh => $stdin, write_fh => $stdout,
+    listening_service => Tak::Router->new
+  );
+  Tak->loop->loop_forever;
+}
+
+1;
index 5d22b2d..a23f132 100755 (executable)
@@ -4,4 +4,4 @@
 #fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm)
 fatpack file
 #rm -r fatlib
-echo "use lib 'lib'; use Tak::World; Tak::World->new_from_stdio->run;"
+echo "use lib 'lib'; use Tak::STDIOSetup; Tak::STDIOSetup->run;"