new Client/Router/Service arch
Matt S Trout [Tue, 1 Nov 2011 20:00:04 +0000 (20:00 +0000)]
16 files changed:
bin/tak-repl
lib/Tak.pm [new file with mode: 0644]
lib/Tak/Client.pm [new file with mode: 0644]
lib/Tak/EvalService.pm
lib/Tak/JSONChannel.pm
lib/Tak/Loop.pm [new file with mode: 0644]
lib/Tak/MetaService.pm
lib/Tak/REPL.pm
lib/Tak/Request.pm
lib/Tak/Result.pm [new file with mode: 0644]
lib/Tak/Role/ObjectMangling.pm [new file with mode: 0644]
lib/Tak/Role/Service.pm [new file with mode: 0644]
lib/Tak/Router.pm
lib/Tak/ServiceManager.pm [deleted file]
lib/Tak/World.pm
maint/mk-fat

index 7bc5d3e..2148f2d 100644 (file)
@@ -1,15 +1,19 @@
 #!/usr/bin/env perl
 
-use Tak::WorldHandle;
+use Tak::Client;
+use Tak::Router;
+use Tak::MetaService;
 use Tak::REPL;
 use strictures 1;
 
-my $world = do {
-  if (my $ssh_target = $ARGV[0]) {
-    Tak::WorldHandle->new_remote($ssh_target);
-  } else {
-    Tak::WorldHandle->new_local;
-  }
-};
+my $router = Tak::Router->new;
 
-Tak::REPL->new(world => $world)->run;
+$router->register(meta => Tak::MetaService->new(router => $router));
+
+my $client = Tak::Client->new(service => $router);
+
+$client->curry('meta')->do(register => eval => 'Tak::EvalService');
+
+my $repl = Tak::REPL->new(client => $client->curry('eval'));
+
+$repl->run;
diff --git a/lib/Tak.pm b/lib/Tak.pm
new file mode 100644 (file)
index 0000000..33bc378
--- /dev/null
@@ -0,0 +1,17 @@
+package Tak;
+
+use Tak::Loop;
+use strictures 1;
+
+our $loop;
+
+sub loop { $loop ||= Tak::Loop->new }
+
+sub loop_until {
+  my ($class, $done) = @_;
+  return if $done;
+  my $loop = $class->loop;
+  $loop->loop_once until $_[1];
+}
+
+1;
diff --git a/lib/Tak/Client.pm b/lib/Tak/Client.pm
new file mode 100644 (file)
index 0000000..04d86ef
--- /dev/null
@@ -0,0 +1,46 @@
+package Tak::Client;
+
+use Tak;
+use Tak::Request;
+use Moo;
+
+has service => (is => 'ro', required => 1);
+
+has curried => (is => 'ro', default => sub { [] });
+
+sub curry {
+  my ($self, @curry) = @_;
+  (ref $self)->new(%$self, curried => [ @{$self->curried}, @curry ]);
+}
+
+sub send {
+  my ($self, @message) = @_;
+  $self->service->receive(@{$self->curried}, @message);
+}
+
+sub start {
+  my ($self, $register, @payload) = @_;
+  my $req = $self->_new_request($register);
+  $self->service->start_request($req, @{$self->curried}, @payload);
+  return $req;
+}
+
+sub request_class { 'Tak::Request' }
+
+sub _new_request {
+  my ($self, $args) = @_;
+  $self->request_class->new($args);
+}
+
+sub do {
+  my ($self, @payload) = @_;
+  my $done;
+  my $result;
+  my $req = $self->start({
+    on_result => sub { $result = shift },
+  }, @payload);
+  Tak->loop_until($result);
+  $result->get;
+}
+
+1;
index 9a45db9..629beae 100644 (file)
@@ -1,9 +1,11 @@
 package Tak::EvalService;
 
 use Eval::WithLexicals;
-use Moo;
 use Data::Dumper::Concise;
 use Capture::Tiny qw(capture);
+use Moo;
+
+with 'Tak::Role::Service';
 
 has 'eval_withlexicals' => (is => 'lazy');
 
@@ -14,12 +16,12 @@ sub _build_eval_withlexicals {
 sub handle_eval {
   my ($self, $perl) = @_;
   unless ($perl) {
-    return MISTAKE => eval_input => "No code supplied";
+    die [ mistake => eval_input => "No code supplied" ];
   }
   if (my $ref = ref($perl)) {
-    return MISTAKE => eval_input => "Code was a ${ref} reference";
+    die [ mistake => eval_input => "Code was a ${ref} reference" ];
   }
-  my ($code, @ret);
+  my ($ok, @ret);
   my ($stdout, $stderr);
   if (eval {
     ($stdout, $stderr) = capture {
@@ -27,17 +29,18 @@ sub handle_eval {
     };
     1
   }) {
-    $code = 'RESULT';
+    $ok = 1;
   } else {
-    ($code, @ret) = (FAILURE => $@);
+    ($ok, @ret) = (0, $@);
   }
   my $dumped_ret;
   unless (eval { $dumped_ret = Dumper(@ret); 1 }) {
-    $dumped_ret = "Error dumping ${code} result: $@";
-    $code = 'FAILURE';
+    $dumped_ret = "Error dumping ${\($ok ? 'result' : 'exception')}: $@";
+    $ok = 0;
   }
-  return $code => {
-    stdout => $stdout, stderr => $stderr, return => $dumped_ret
+  return {
+    stdout => $stdout, stderr => $stderr,
+    ($ok ? 'return' : 'exception') => $dumped_ret
   };
 }
 
index fd3e7b5..6bfb16a 100644 (file)
@@ -53,6 +53,7 @@ sub send {
 
 sub _raw_send {
   my ($self, $raw) = @_;
+#warn "Sending: ${raw}\n";
   print { $self->write_fh } $raw."\n";
 }
 
diff --git a/lib/Tak/Loop.pm b/lib/Tak/Loop.pm
new file mode 100644 (file)
index 0000000..fe9dff9
--- /dev/null
@@ -0,0 +1,5 @@
+package Tak::Loop;
+
+use Moo;
+
+1;
index b0b6af9..9e32704 100644 (file)
@@ -2,6 +2,8 @@ package Tak::MetaService;
 
 use Moo;
 
+with 'Tak::Role::Service';
+
 has router => (is => 'ro', required => 1, weak_ref => 1);
 
 sub handle_register {
@@ -10,7 +12,7 @@ sub handle_register {
   require "${file}.pm";
   my $new = $class->new(@args);
   $self->router->register($name => $new);
-  return SUCCESS => "Registered ${name}";
+  return "Registered ${name}";
 }
 
 1;
index 4c4f161..835f659 100644 (file)
@@ -3,40 +3,27 @@ package Tak::REPL;
 use Term::ReadLine;
 use Moo;
 
-has world => (is => 'ro', required => 1);
-
-has remote => (is => 'lazy');
-
-sub _build_remote {
-  my ($self) = @_;
-  my $world = $self->world;
-  $world->remote_for('meta')->blocking_request(
-    register => eval => 'Tak::EvalService'
-  );
-  $world->remote_for('eval')
-}
+has client => (is => 'ro', required => 1);
 
 sub run {
-  my $remote = $_[0]->remote;
+  my $client = $_[0]->client;
   my $read = Term::ReadLine->new('REPL');
 
   while (1) {
     my $line = $read->readline('re.pl$ ');
     last unless defined $line;
     next unless length $line;
-    my @reply = $remote->blocking_request(eval => $line);
-    if ($reply[0] eq 'MISTAKE') {
-      die "Botch: ".join(': ', @reply[1,2]);
-    }
-    my $ret = $reply[1];
-    print $ret->{return};
-    if ($ret->{stdout}) {
-      chomp($ret->{stdout});
-      print "STDOUT:\n${\$ret->{stdout}}\n";
+    my $result = $client->do(eval => $line);
+    print exists($result->{return})
+            ? $result->{return}
+            : "Error: ".$result->{exception};
+    if ($result->{stdout}) {
+      chomp($result->{stdout});
+      print "STDOUT:\n${\$result->{stdout}}\n";
     }
-    if ($ret->{stderr}) {
-      chomp($ret->{stderr});
-      print "STDERR:\n${\$ret->{stderr}}\n";
+    if ($result->{stderr}) {
+      chomp($result->{stderr});
+      print "STDERR:\n${\$result->{stderr}}\n";
     }
   }
 }
index 2ba8262..342ddba 100644 (file)
@@ -1,16 +1,27 @@
 package Tak::Request;
 
+use Tak::Result;
 use Moo;
 
-has tag => (is => 'ro', required => 1);
+has on_progress => (is => 'ro');
 
-has respond_to => (is => 'ro', required => 1, weak_ref => 1);
+has on_result => (is => 'ro', required => 1);
 
-has respond_with => (is => 'ro', required => 1);
+sub progress {
+  my ($self, @report) = @_;
+  if (my $cb = $self->on_progress) {
+    $cb->(@report);
+  }
+}
 
-sub respond {
-  my $self = shift;
-  $self->respond_to->${\$self->respond_with}($self->tag => @_);
+sub result {
+  my ($self, $type, @data) = @_;
+  $self->on_result->(Tak::Result->new(type => $type, data => \@data));
 }
 
+sub success { shift->result(success => @_) }
+sub mistake { shift->result(mistake => @_) }
+sub failure { shift->result(failure => @_) }
+sub fatal { shift->result(fatal => @_) }
+
 1;
diff --git a/lib/Tak/Result.pm b/lib/Tak/Result.pm
new file mode 100644 (file)
index 0000000..a7afb4f
--- /dev/null
@@ -0,0 +1,24 @@
+package Tak::Result;
+
+use Moo;
+
+has type => (is => 'ro', required => 1);
+has data => (is => 'ro', required => 1);
+
+sub get {
+  my ($self) = @_;
+  $self->throw unless $self->type eq 'success';
+  return wantarray ? @{$self->data} : $self->data->[0];
+}
+
+sub throw {
+  my ($self) = @_;
+  die $self->exception;
+}
+
+sub exception {
+  my ($self) = @_;
+  $self->type.': '.join ' ', @{$self->data};
+}
+
+1;
diff --git a/lib/Tak/Role/ObjectMangling.pm b/lib/Tak/Role/ObjectMangling.pm
new file mode 100644 (file)
index 0000000..9ef5380
--- /dev/null
@@ -0,0 +1,36 @@
+package Tak::Role::ObjectMangling;
+
+use Scalar::Util qw(weaken);
+use JSON::PP qw(encode_json decode_json);
+
+use Moo::Role;
+
+requires 'inflate';
+requires 'deflate';
+
+has encoder_json => (is => 'lazy');
+has decoder_json => (is => 'lazy');
+
+sub _build_encoder_json {
+  JSON::PP->new->allow_nonref(1)->convert_blessed(1);
+}
+
+sub _build_decoder_json {
+  my $self = shift;
+  weaken($self);
+  JSON::PP->new->allow_nonref(1)->filter_json_single_key_object(
+    __proxied_object__ => sub { $self->inflate($_[0]) }
+  );
+}
+
+sub encode_objects {
+  my ($self, $data) = @_;
+  local *UNIVERSAL::TO_JSON = sub { $self->deflate($_[0]) };
+  decode_json($self->encoder_json->encode($data));
+}
+
+sub decode_objects {
+  my ($self, $data) = @_;
+  $self->decoder_json->decode(encode_json($data));
+}
+
diff --git a/lib/Tak/Role/Service.pm b/lib/Tak/Role/Service.pm
new file mode 100644 (file)
index 0000000..a746d15
--- /dev/null
@@ -0,0 +1,29 @@
+package Tak::Role::Service;
+
+use Moo::Role;
+
+sub start_request {
+  my ($self, $req, $type, @args) = @_;
+  unless ($type) {
+    $req->mistake(request_type => "No request type given");
+    return;
+  }
+  if (my $meth = $self->can("handle_${type}")) {
+    my @result;
+    if (eval { @result = $self->$meth(@args); 1 }) {
+      $req->success(@result);
+    } else {
+      if (ref($@) eq 'ARRAY') {
+        $req->result(@{$@});
+      } else {
+        $req->failure(exception => $@);
+      }
+    }
+  } else {
+    $req->mistake(request_type => "Unknown request type ${type}");
+  }
+}
+
+sub receive { }
+
+1;
index 3e6853b..3041f02 100644 (file)
 package Tak::Router;
 
-use Tak::Request;
-use Tak::ServiceManager;
-use Tak::MetaService;
 use Moo;
 
-has channel => (is => 'ro', required => 1);
+has services => (is => 'ro', default => sub { {} });
 
-has local_request_handlers => (is => 'ro', default => sub { {} });
-
-has requests_received => (is => 'ro', default => sub { {} });
-
-has last_serial => (is => 'ro',default => sub { 'A0000' });
-
-sub next_serial { ++($_[0]->{last_serial}) }
-
-has requests_sent => (is => 'ro', default => sub { {} });
-
-sub BUILD {
-  my ($self) = @_;
-  $self->register(meta => Tak::MetaService->new(router => $self));
-}
-
-sub run { shift->run_until }
-
-sub run_until {
-  my ($self, $done) = @_;
-  while (!$_[1] and my $message = $self->channel->receive) {
-    $self->receive(@$message);
-  }
+sub start_request {
+  my ($self, $req, $target, @payload) = @_;
+  $req->mistake("Reached router with no target")
+    unless $target;
+  $req->failure("Reached router with invalid target ${target}")
+    unless my $next = $self->services->{$target};
+  $next->start_request($req, @payload);
 }
 
 sub receive {
-  my ($self, $type, @payload) = @_;
-  unless ($type) {
-    $self->channel->send(MISTAKE => message_format => "No message type");
-    return;
-  }
-  unless (@payload) {
-    $self->channel->send(MISTAKE => message_format => "Tag missing");
-    return;
-  }
-  unless (@payload > 1) {
-    $self->channel->send(MISTAKE => message_format => "No payload");
-  }
-  if ($type eq 'REQUEST') {
-    $self->receive_request(@payload);
-    return;
-  }
-  if ($type eq 'RESPONSE') {
-    $self->receive_response(@payload);
-    return;
-  }
-}
-
-sub receive_request {
-  my ($self, $tag, $handler_name, @payload) = @_;
-  if ($self->requests_received->{$tag}) {
-    $self->channel->send(
-      MISTAKE => request_tag => "Request for ${tag} in process"
-    );
-    return;
-  }
-  my $handler = $self->local_request_handlers->{$handler_name};
-  unless ($handler) {
-    $self->send_response(
-      $tag => MISTAKE => handler_name => "No such handler ${handler_name}"
-    );
-    return;
-  }
-  my $request
-    = $self->requests_received->{$tag}
-    = Tak::Request->new(
-        tag => $tag, respond_to => $self, respond_with => 'send_response',
-      );
-  $handler->start_request($request => @payload);
-}
-
-sub send_response {
-  my ($self, $tag, @result) = @_;
-  delete $self->requests_received->{$tag};
-  $self->channel->send(RESPONSE => $tag => @result);
-}
-
-sub send_request {
-  my ($self, $respond_to, $respond_with, @payload) = @_;
-  my $tag = $self->next_serial;
-  my $request
-    = $self->requests_sent->{$tag}
-    = Tak::Request->new(
-        tag => $tag,
-        respond_to => $respond_to,
-        respond_with => $respond_with,
-      );
-  $self->channel->send(REQUEST => $tag => @payload);
-  return $request;
-}
-
-sub receive_response {
-  my ($self, $tag, @result) = @_;
-  my $request = delete $self->requests_sent->{$tag};
-  $request->respond(@result);
+  my ($self, $target, @payload) = @_;
+  return unless $target;
+  return unless my $next = $self->services->{$target};
+  $next->receive(@payload);
 }
 
 sub register {
   my ($self, $name, $service) = @_;
-  $self->local_request_handlers->{$name} = Tak::ServiceManager->new(
-    service => $service
-  );
+  $self->services->{$name} = $service;
+}
+
+sub deregister {
+  my ($self, $name) = @_;
+  delete $self->services->{$name}
 }
 
 1;
diff --git a/lib/Tak/ServiceManager.pm b/lib/Tak/ServiceManager.pm
deleted file mode 100644 (file)
index 89d6f9e..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package Tak::ServiceManager;
-
-use Moo;
-
-has service => (is => 'ro', required => 1);
-
-sub start_request {
-  my ($self, $req, $type, @args) = @_;
-  unless ($type) {
-    $req->respond(MISTAKE => request_type => "No request type given");
-    return;
-  }
-  my $service = $self->service;
-  if (my $meth = $service->can("handle_${type}")) {
-    $req->respond($service->$meth(@args));
-    return;
-  }
-  $req->respond(MISTAKE => request_type => "Unknown request type ${type}");
-}
-
-1;
index 84d05a9..83985e5 100644 (file)
@@ -35,7 +35,7 @@ sub _build_router {
     remote => $remote
   );
   
-  unshift @INC, $loader->inc_callback;
+  push @INC, $loader->inc_callback;
   
   return $router;
 }
index 45db08f..5d22b2d 100755 (executable)
@@ -1,7 +1,7 @@
 #!/bin/sh
 
-if [ -e fatlib ]; then rm -r fatlib; fi
-fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm)
+#if [ -e fatlib ]; then rm -r fatlib; fi
+#fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm)
 fatpack file
-rm -r fatlib
-echo "use Tak::World; Tak::World->new_from_stdio->run;"
+#rm -r fatlib
+echo "use lib 'lib'; use Tak::World; Tak::World->new_from_stdio->run;"