client and server manage to talk
Matt S Trout [Fri, 28 Oct 2011 18:49:31 +0000 (18:49 +0000)]
lib/Tak/EvalService.pm [new file with mode: 0644]
lib/Tak/JSONChannel.pm [new file with mode: 0644]
lib/Tak/ModuleLoader.pm [new file with mode: 0644]
lib/Tak/Remote.pm [new file with mode: 0644]
lib/Tak/Request.pm [new file with mode: 0644]
lib/Tak/Router.pm [new file with mode: 0644]
lib/Tak/ServiceManager.pm [new file with mode: 0644]
takc [new file with mode: 0644]
takd [new file with mode: 0644]

diff --git a/lib/Tak/EvalService.pm b/lib/Tak/EvalService.pm
new file mode 100644 (file)
index 0000000..9a45db9
--- /dev/null
@@ -0,0 +1,44 @@
+package Tak::EvalService;
+
+use Eval::WithLexicals;
+use Moo;
+use Data::Dumper::Concise;
+use Capture::Tiny qw(capture);
+
+has 'eval_withlexicals' => (is => 'lazy');
+
+sub _build_eval_withlexicals {
+  Eval::WithLexicals->new
+}
+
+sub handle_eval {
+  my ($self, $perl) = @_;
+  unless ($perl) {
+    return MISTAKE => eval_input => "No code supplied";
+  }
+  if (my $ref = ref($perl)) {
+    return MISTAKE => eval_input => "Code was a ${ref} reference";
+  }
+  my ($code, @ret);
+  my ($stdout, $stderr);
+  if (eval {
+    ($stdout, $stderr) = capture {
+      @ret = $self->eval_withlexicals->eval($perl);
+    };
+    1
+  }) {
+    $code = 'RESULT';
+  } else {
+    ($code, @ret) = (FAILURE => $@);
+  }
+  my $dumped_ret;
+  unless (eval { $dumped_ret = Dumper(@ret); 1 }) {
+    $dumped_ret = "Error dumping ${code} result: $@";
+    $code = 'FAILURE';
+  }
+  return $code => {
+    stdout => $stdout, stderr => $stderr, return => $dumped_ret
+  };
+}
+
+1;
diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm
new file mode 100644 (file)
index 0000000..9716768
--- /dev/null
@@ -0,0 +1,58 @@
+package Tak::JSONChannel;
+
+use JSON::PP qw(encode_json decode_json);
+use IO::Handle;
+use Scalar::Util qw(weaken);
+use Moo;
+
+has read_fh => (is => 'ro', required => 1);
+has write_fh => (is => 'ro', required => 1);
+
+sub BUILD { shift->write_fh->autoflush(1); }
+
+sub receive {
+  my ($self) = @_;
+  while (my $line = readline($self->read_fh)) {
+    if (my $unpacked = $self->_unpack_line($line)) {
+      return $unpacked;
+    }
+  }
+}
+
+sub _unpack_line {
+  my ($self, $line) = @_;
+  my $data = eval { decode_json($line) };
+  unless ($data) {
+    $self->send(MISTAKE => invalid_json => $@||'No data and no exception');
+    return;
+  }
+  unless (ref($data) eq 'ARRAY') {
+    $self->send(MISTAKE => message_format => "Not an ARRAY");
+    return;
+  }
+  unless (@$data > 0) {
+    $self->send(MISTAKE => message_format => "Empty request array");
+    return;
+  }
+  $data;
+}
+
+sub send {
+  my ($self, @msg) = @_;
+  my $json = eval { encode_json(\@msg) };
+  unless ($json) {
+    $self->_raw_send(
+      encode_json(
+        [ FAILURE => invalid_message => $@||'No data and no exception' ]
+      )
+    );
+  }
+  $self->_raw_send($json);
+}
+
+sub _raw_send {
+  my ($self, $raw) = @_;
+  print { $self->write_fh } $raw."\n";
+}
+
+1;
diff --git a/lib/Tak/ModuleLoader.pm b/lib/Tak/ModuleLoader.pm
new file mode 100644 (file)
index 0000000..8d5b837
--- /dev/null
@@ -0,0 +1,21 @@
+package Tak::ModuleLoader;
+
+use Moo;
+
+has remote => (is => 'ro', required => 1);
+
+sub inc_callback {
+  sub { $self->maybe_load_module($_[1]) }
+}
+
+sub maybe_load_module {
+  my ($self, $module) = @_;
+  my $result = $self->remote->blocking_request(source_for => $module);
+  if (my $code = $result->{code}) {
+    open my $fh, '<', \$code;
+    return $fh;
+  }
+  return;
+}
+
+1;
diff --git a/lib/Tak/Remote.pm b/lib/Tak/Remote.pm
new file mode 100644 (file)
index 0000000..7d1d7d6
--- /dev/null
@@ -0,0 +1,29 @@
+package Tak::Remote;
+
+use Moo;
+
+has router => (is => 'ro', required => 1);
+
+has name => (is => 'ro', required => 1);
+
+sub blocking_request {
+  my ($self, @payload) = @_;
+  local our $Request = $self->router->send_request($self, $self->name, @payload);
+  local our $Done;
+  local our @Result;
+  $self->router->run_until($Done);
+  return @Result;
+}
+
+sub send_response {
+  my ($self, $tag, @result) = @_;
+  {
+    our $Request;
+    die "Out of order response ${\$Request->tag}, expecting ${tag}"
+      if $Request->tag ne $tag;
+  }
+  our @Result = @result;
+  our $Done = 1;
+}
+
+1;
diff --git a/lib/Tak/Request.pm b/lib/Tak/Request.pm
new file mode 100644 (file)
index 0000000..669e027
--- /dev/null
@@ -0,0 +1,14 @@
+package Tak::Request;
+
+use Moo;
+
+has tag => (is => 'ro', required => 1);
+
+has respond_to => (is => 'ro', required => 1, weak_ref => 1);
+
+sub respond {
+  my $self = shift;
+  $self->respond_to->send_response($self->tag => @_);
+}
+
+1;
diff --git a/lib/Tak/Router.pm b/lib/Tak/Router.pm
new file mode 100644 (file)
index 0000000..214455c
--- /dev/null
@@ -0,0 +1,98 @@
+package Tak::Router;
+
+use Tak::Request;
+use Moo;
+
+has channel => (is => 'ro', required => 1);
+
+has local_request_handlers => (is => 'ro', default => sub { {} });
+
+has requests_received => (is => 'ro', default => sub { {} });
+
+has last_serial => (is => 'ro',default => sub { 'A0000' });
+
+sub next_serial { ++($_[0]->{last_serial}) }
+
+has requests_sent => (is => 'ro', default => sub { {} });
+
+sub run { shift->run_until }
+
+sub run_until {
+  my ($self, $done) = @_;
+  while (!$_[1] and my $message = $self->channel->receive) {
+    $self->receive(@$message);
+  }
+}
+
+sub receive {
+  my ($self, $type, @payload) = @_;
+  unless ($type) {
+    $self->channel->send(MISTAKE => message_format => "No message type");
+    return;
+  }
+  unless (@payload) {
+    $self->channel->send(MISTAKE => message_format => "Tag missing");
+    return;
+  }
+  unless (@payload > 1) {
+    $self->channel->send(MISTAKE => message_format => "No payload");
+  }
+  if ($type eq 'REQUEST') {
+    $self->receive_request(@payload);
+    return;
+  }
+  if ($type eq 'RESPONSE') {
+    $self->receive_response(@payload);
+    return;
+  }
+}
+
+sub receive_request {
+  my ($self, $tag, $handler_name, @payload) = @_;
+  if ($self->requests_received->{$tag}) {
+    $self->channel->send(
+      MISTAKE => request_tag => "Request for ${tag} in process"
+    );
+    return;
+  }
+  my $handler = $self->local_request_handlers->{$handler_name};
+  unless ($handler) {
+    $self->send_response(
+      $tag => MISTAKE => handler_name => "No such handler ${handler_name}"
+    );
+    return;
+  }
+  my $request
+    = $self->requests_received->{$tag}
+    = Tak::Request->new(
+        tag => $tag, respond_to => $self
+      );
+  $handler->start_request($request => @payload);
+}
+
+sub send_response {
+  my ($self, $tag, @result) = @_;
+  delete $self->requests_received->{$tag};
+  $self->channel->send(RESPONSE => $tag => @result);
+}
+
+sub send_request {
+  my ($self, $respond_to, @payload) = @_;
+  my $tag = $self->next_serial;
+  my $request
+    = $self->requests_sent->{$tag}
+    = Tak::Request->new(
+        tag => $tag,
+        respond_to => $respond_to
+      );
+  $self->channel->send(REQUEST => $tag => @payload);
+  return $request;
+}
+
+sub receive_response {
+  my ($self, $tag, @result) = @_;
+  my $request = delete $self->requests_sent->{$tag};
+  $request->respond(@result);
+}
+
+1;
diff --git a/lib/Tak/ServiceManager.pm b/lib/Tak/ServiceManager.pm
new file mode 100644 (file)
index 0000000..89d6f9e
--- /dev/null
@@ -0,0 +1,21 @@
+package Tak::ServiceManager;
+
+use Moo;
+
+has service => (is => 'ro', required => 1);
+
+sub start_request {
+  my ($self, $req, $type, @args) = @_;
+  unless ($type) {
+    $req->respond(MISTAKE => request_type => "No request type given");
+    return;
+  }
+  my $service = $self->service;
+  if (my $meth = $service->can("handle_${type}")) {
+    $req->respond($service->$meth(@args));
+    return;
+  }
+  $req->respond(MISTAKE => request_type => "Unknown request type ${type}");
+}
+
+1;
diff --git a/takc b/takc
new file mode 100644 (file)
index 0000000..91745e6
--- /dev/null
+++ b/takc
@@ -0,0 +1,26 @@
+use strictures 1;
+use Devel::Dwarn;
+use Tak::JSONChannel;
+use Tak::Router;
+use IPC::Open2;
+use Tak::Remote;
+
+my $pid = open2(my $out, my $in, $^X, qw(-Ilib takd))
+  or die "Couldn't open2 child: $!";
+
+my $channel = Tak::JSONChannel->new(
+  read_fh => $out,
+  write_fh => $in
+);
+
+my $router = Tak::Router->new(
+  channel => $channel,
+);
+
+my $remote = Tak::Remote->new(
+  router => $router,
+  name => 'EVAL'
+);
+
+::Dwarn([ $remote->blocking_request(eval => 'my $x = 1;') ]);
+::Dwarn([ $remote->blocking_request(eval => '1+$x;') ]);
diff --git a/takd b/takd
new file mode 100644 (file)
index 0000000..4dea863
--- /dev/null
+++ b/takd
@@ -0,0 +1,23 @@
+use strictures 1;
+use Tak::JSONChannel;
+use Tak::ServiceManager;
+use Tak::EvalService;
+use Tak::Router;
+
+my $channel = Tak::JSONChannel->new(
+  read_fh => \*STDIN,
+  write_fh => \*STDOUT
+);
+
+my $eval = Tak::ServiceManager->new(
+  service => Tak::EvalService->new
+);
+
+my $router = Tak::Router->new(
+  channel => $channel,
+  local_request_handlers => {
+    EVAL => $eval
+  }
+);
+
+$router->run;