From: Matt S Trout Date: Tue, 1 Nov 2011 20:00:04 +0000 (+0000) Subject: new Client/Router/Service arch X-Git-Tag: v0.001001~29 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FTak.git;a=commitdiff_plain;h=31a246e4b1ef71cc1991c1631573f6c80cdf41b5 new Client/Router/Service arch --- diff --git a/bin/tak-repl b/bin/tak-repl index 7bc5d3e..2148f2d 100644 --- a/bin/tak-repl +++ b/bin/tak-repl @@ -1,15 +1,19 @@ #!/usr/bin/env perl -use Tak::WorldHandle; +use Tak::Client; +use Tak::Router; +use Tak::MetaService; use Tak::REPL; use strictures 1; -my $world = do { - if (my $ssh_target = $ARGV[0]) { - Tak::WorldHandle->new_remote($ssh_target); - } else { - Tak::WorldHandle->new_local; - } -}; +my $router = Tak::Router->new; -Tak::REPL->new(world => $world)->run; +$router->register(meta => Tak::MetaService->new(router => $router)); + +my $client = Tak::Client->new(service => $router); + +$client->curry('meta')->do(register => eval => 'Tak::EvalService'); + +my $repl = Tak::REPL->new(client => $client->curry('eval')); + +$repl->run; diff --git a/lib/Tak.pm b/lib/Tak.pm new file mode 100644 index 0000000..33bc378 --- /dev/null +++ b/lib/Tak.pm @@ -0,0 +1,17 @@ +package Tak; + +use Tak::Loop; +use strictures 1; + +our $loop; + +sub loop { $loop ||= Tak::Loop->new } + +sub loop_until { + my ($class, $done) = @_; + return if $done; + my $loop = $class->loop; + $loop->loop_once until $_[1]; +} + +1; diff --git a/lib/Tak/Client.pm b/lib/Tak/Client.pm new file mode 100644 index 0000000..04d86ef --- /dev/null +++ b/lib/Tak/Client.pm @@ -0,0 +1,46 @@ +package Tak::Client; + +use Tak; +use Tak::Request; +use Moo; + +has service => (is => 'ro', required => 1); + +has curried => (is => 'ro', default => sub { [] }); + +sub curry { + my ($self, @curry) = @_; + (ref $self)->new(%$self, curried => [ @{$self->curried}, @curry ]); +} + +sub send { + my ($self, @message) = @_; + $self->service->receive(@{$self->curried}, @message); +} + +sub start { + my ($self, $register, @payload) = @_; + my $req = $self->_new_request($register); + $self->service->start_request($req, @{$self->curried}, @payload); + return $req; +} + +sub request_class { 'Tak::Request' } + +sub _new_request { + my ($self, $args) = @_; + $self->request_class->new($args); +} + +sub do { + my ($self, @payload) = @_; + my $done; + my $result; + my $req = $self->start({ + on_result => sub { $result = shift }, + }, @payload); + Tak->loop_until($result); + $result->get; +} + +1; diff --git a/lib/Tak/EvalService.pm b/lib/Tak/EvalService.pm index 9a45db9..629beae 100644 --- a/lib/Tak/EvalService.pm +++ b/lib/Tak/EvalService.pm @@ -1,9 +1,11 @@ package Tak::EvalService; use Eval::WithLexicals; -use Moo; use Data::Dumper::Concise; use Capture::Tiny qw(capture); +use Moo; + +with 'Tak::Role::Service'; has 'eval_withlexicals' => (is => 'lazy'); @@ -14,12 +16,12 @@ sub _build_eval_withlexicals { sub handle_eval { my ($self, $perl) = @_; unless ($perl) { - return MISTAKE => eval_input => "No code supplied"; + die [ mistake => eval_input => "No code supplied" ]; } if (my $ref = ref($perl)) { - return MISTAKE => eval_input => "Code was a ${ref} reference"; + die [ mistake => eval_input => "Code was a ${ref} reference" ]; } - my ($code, @ret); + my ($ok, @ret); my ($stdout, $stderr); if (eval { ($stdout, $stderr) = capture { @@ -27,17 +29,18 @@ sub handle_eval { }; 1 }) { - $code = 'RESULT'; + $ok = 1; } else { - ($code, @ret) = (FAILURE => $@); + ($ok, @ret) = (0, $@); } my $dumped_ret; unless (eval { $dumped_ret = Dumper(@ret); 1 }) { - $dumped_ret = "Error dumping ${code} result: $@"; - $code = 'FAILURE'; + $dumped_ret = "Error dumping ${\($ok ? 'result' : 'exception')}: $@"; + $ok = 0; } - return $code => { - stdout => $stdout, stderr => $stderr, return => $dumped_ret + return { + stdout => $stdout, stderr => $stderr, + ($ok ? 'return' : 'exception') => $dumped_ret }; } diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm index fd3e7b5..6bfb16a 100644 --- a/lib/Tak/JSONChannel.pm +++ b/lib/Tak/JSONChannel.pm @@ -53,6 +53,7 @@ sub send { sub _raw_send { my ($self, $raw) = @_; +#warn "Sending: ${raw}\n"; print { $self->write_fh } $raw."\n"; } diff --git a/lib/Tak/Loop.pm b/lib/Tak/Loop.pm new file mode 100644 index 0000000..fe9dff9 --- /dev/null +++ b/lib/Tak/Loop.pm @@ -0,0 +1,5 @@ +package Tak::Loop; + +use Moo; + +1; diff --git a/lib/Tak/MetaService.pm b/lib/Tak/MetaService.pm index b0b6af9..9e32704 100644 --- a/lib/Tak/MetaService.pm +++ b/lib/Tak/MetaService.pm @@ -2,6 +2,8 @@ package Tak::MetaService; use Moo; +with 'Tak::Role::Service'; + has router => (is => 'ro', required => 1, weak_ref => 1); sub handle_register { @@ -10,7 +12,7 @@ sub handle_register { require "${file}.pm"; my $new = $class->new(@args); $self->router->register($name => $new); - return SUCCESS => "Registered ${name}"; + return "Registered ${name}"; } 1; diff --git a/lib/Tak/REPL.pm b/lib/Tak/REPL.pm index 4c4f161..835f659 100644 --- a/lib/Tak/REPL.pm +++ b/lib/Tak/REPL.pm @@ -3,40 +3,27 @@ package Tak::REPL; use Term::ReadLine; use Moo; -has world => (is => 'ro', required => 1); - -has remote => (is => 'lazy'); - -sub _build_remote { - my ($self) = @_; - my $world = $self->world; - $world->remote_for('meta')->blocking_request( - register => eval => 'Tak::EvalService' - ); - $world->remote_for('eval') -} +has client => (is => 'ro', required => 1); sub run { - my $remote = $_[0]->remote; + my $client = $_[0]->client; my $read = Term::ReadLine->new('REPL'); while (1) { my $line = $read->readline('re.pl$ '); last unless defined $line; next unless length $line; - my @reply = $remote->blocking_request(eval => $line); - if ($reply[0] eq 'MISTAKE') { - die "Botch: ".join(': ', @reply[1,2]); - } - my $ret = $reply[1]; - print $ret->{return}; - if ($ret->{stdout}) { - chomp($ret->{stdout}); - print "STDOUT:\n${\$ret->{stdout}}\n"; + my $result = $client->do(eval => $line); + print exists($result->{return}) + ? $result->{return} + : "Error: ".$result->{exception}; + if ($result->{stdout}) { + chomp($result->{stdout}); + print "STDOUT:\n${\$result->{stdout}}\n"; } - if ($ret->{stderr}) { - chomp($ret->{stderr}); - print "STDERR:\n${\$ret->{stderr}}\n"; + if ($result->{stderr}) { + chomp($result->{stderr}); + print "STDERR:\n${\$result->{stderr}}\n"; } } } diff --git a/lib/Tak/Request.pm b/lib/Tak/Request.pm index 2ba8262..342ddba 100644 --- a/lib/Tak/Request.pm +++ b/lib/Tak/Request.pm @@ -1,16 +1,27 @@ package Tak::Request; +use Tak::Result; use Moo; -has tag => (is => 'ro', required => 1); +has on_progress => (is => 'ro'); -has respond_to => (is => 'ro', required => 1, weak_ref => 1); +has on_result => (is => 'ro', required => 1); -has respond_with => (is => 'ro', required => 1); +sub progress { + my ($self, @report) = @_; + if (my $cb = $self->on_progress) { + $cb->(@report); + } +} -sub respond { - my $self = shift; - $self->respond_to->${\$self->respond_with}($self->tag => @_); +sub result { + my ($self, $type, @data) = @_; + $self->on_result->(Tak::Result->new(type => $type, data => \@data)); } +sub success { shift->result(success => @_) } +sub mistake { shift->result(mistake => @_) } +sub failure { shift->result(failure => @_) } +sub fatal { shift->result(fatal => @_) } + 1; diff --git a/lib/Tak/Result.pm b/lib/Tak/Result.pm new file mode 100644 index 0000000..a7afb4f --- /dev/null +++ b/lib/Tak/Result.pm @@ -0,0 +1,24 @@ +package Tak::Result; + +use Moo; + +has type => (is => 'ro', required => 1); +has data => (is => 'ro', required => 1); + +sub get { + my ($self) = @_; + $self->throw unless $self->type eq 'success'; + return wantarray ? @{$self->data} : $self->data->[0]; +} + +sub throw { + my ($self) = @_; + die $self->exception; +} + +sub exception { + my ($self) = @_; + $self->type.': '.join ' ', @{$self->data}; +} + +1; diff --git a/lib/Tak/Role/ObjectMangling.pm b/lib/Tak/Role/ObjectMangling.pm new file mode 100644 index 0000000..9ef5380 --- /dev/null +++ b/lib/Tak/Role/ObjectMangling.pm @@ -0,0 +1,36 @@ +package Tak::Role::ObjectMangling; + +use Scalar::Util qw(weaken); +use JSON::PP qw(encode_json decode_json); + +use Moo::Role; + +requires 'inflate'; +requires 'deflate'; + +has encoder_json => (is => 'lazy'); +has decoder_json => (is => 'lazy'); + +sub _build_encoder_json { + JSON::PP->new->allow_nonref(1)->convert_blessed(1); +} + +sub _build_decoder_json { + my $self = shift; + weaken($self); + JSON::PP->new->allow_nonref(1)->filter_json_single_key_object( + __proxied_object__ => sub { $self->inflate($_[0]) } + ); +} + +sub encode_objects { + my ($self, $data) = @_; + local *UNIVERSAL::TO_JSON = sub { $self->deflate($_[0]) }; + decode_json($self->encoder_json->encode($data)); +} + +sub decode_objects { + my ($self, $data) = @_; + $self->decoder_json->decode(encode_json($data)); +} + diff --git a/lib/Tak/Role/Service.pm b/lib/Tak/Role/Service.pm new file mode 100644 index 0000000..a746d15 --- /dev/null +++ b/lib/Tak/Role/Service.pm @@ -0,0 +1,29 @@ +package Tak::Role::Service; + +use Moo::Role; + +sub start_request { + my ($self, $req, $type, @args) = @_; + unless ($type) { + $req->mistake(request_type => "No request type given"); + return; + } + if (my $meth = $self->can("handle_${type}")) { + my @result; + if (eval { @result = $self->$meth(@args); 1 }) { + $req->success(@result); + } else { + if (ref($@) eq 'ARRAY') { + $req->result(@{$@}); + } else { + $req->failure(exception => $@); + } + } + } else { + $req->mistake(request_type => "Unknown request type ${type}"); + } +} + +sub receive { } + +1; diff --git a/lib/Tak/Router.pm b/lib/Tak/Router.pm index 3e6853b..3041f02 100644 --- a/lib/Tak/Router.pm +++ b/lib/Tak/Router.pm @@ -1,113 +1,33 @@ package Tak::Router; -use Tak::Request; -use Tak::ServiceManager; -use Tak::MetaService; use Moo; -has channel => (is => 'ro', required => 1); +has services => (is => 'ro', default => sub { {} }); -has local_request_handlers => (is => 'ro', default => sub { {} }); - -has requests_received => (is => 'ro', default => sub { {} }); - -has last_serial => (is => 'ro',default => sub { 'A0000' }); - -sub next_serial { ++($_[0]->{last_serial}) } - -has requests_sent => (is => 'ro', default => sub { {} }); - -sub BUILD { - my ($self) = @_; - $self->register(meta => Tak::MetaService->new(router => $self)); -} - -sub run { shift->run_until } - -sub run_until { - my ($self, $done) = @_; - while (!$_[1] and my $message = $self->channel->receive) { - $self->receive(@$message); - } +sub start_request { + my ($self, $req, $target, @payload) = @_; + $req->mistake("Reached router with no target") + unless $target; + $req->failure("Reached router with invalid target ${target}") + unless my $next = $self->services->{$target}; + $next->start_request($req, @payload); } sub receive { - my ($self, $type, @payload) = @_; - unless ($type) { - $self->channel->send(MISTAKE => message_format => "No message type"); - return; - } - unless (@payload) { - $self->channel->send(MISTAKE => message_format => "Tag missing"); - return; - } - unless (@payload > 1) { - $self->channel->send(MISTAKE => message_format => "No payload"); - } - if ($type eq 'REQUEST') { - $self->receive_request(@payload); - return; - } - if ($type eq 'RESPONSE') { - $self->receive_response(@payload); - return; - } -} - -sub receive_request { - my ($self, $tag, $handler_name, @payload) = @_; - if ($self->requests_received->{$tag}) { - $self->channel->send( - MISTAKE => request_tag => "Request for ${tag} in process" - ); - return; - } - my $handler = $self->local_request_handlers->{$handler_name}; - unless ($handler) { - $self->send_response( - $tag => MISTAKE => handler_name => "No such handler ${handler_name}" - ); - return; - } - my $request - = $self->requests_received->{$tag} - = Tak::Request->new( - tag => $tag, respond_to => $self, respond_with => 'send_response', - ); - $handler->start_request($request => @payload); -} - -sub send_response { - my ($self, $tag, @result) = @_; - delete $self->requests_received->{$tag}; - $self->channel->send(RESPONSE => $tag => @result); -} - -sub send_request { - my ($self, $respond_to, $respond_with, @payload) = @_; - my $tag = $self->next_serial; - my $request - = $self->requests_sent->{$tag} - = Tak::Request->new( - tag => $tag, - respond_to => $respond_to, - respond_with => $respond_with, - ); - $self->channel->send(REQUEST => $tag => @payload); - return $request; -} - -sub receive_response { - my ($self, $tag, @result) = @_; - my $request = delete $self->requests_sent->{$tag}; - $request->respond(@result); + my ($self, $target, @payload) = @_; + return unless $target; + return unless my $next = $self->services->{$target}; + $next->receive(@payload); } sub register { my ($self, $name, $service) = @_; - $self->local_request_handlers->{$name} = Tak::ServiceManager->new( - service => $service - ); + $self->services->{$name} = $service; +} + +sub deregister { + my ($self, $name) = @_; + delete $self->services->{$name} } 1; diff --git a/lib/Tak/ServiceManager.pm b/lib/Tak/ServiceManager.pm deleted file mode 100644 index 89d6f9e..0000000 --- a/lib/Tak/ServiceManager.pm +++ /dev/null @@ -1,21 +0,0 @@ -package Tak::ServiceManager; - -use Moo; - -has service => (is => 'ro', required => 1); - -sub start_request { - my ($self, $req, $type, @args) = @_; - unless ($type) { - $req->respond(MISTAKE => request_type => "No request type given"); - return; - } - my $service = $self->service; - if (my $meth = $service->can("handle_${type}")) { - $req->respond($service->$meth(@args)); - return; - } - $req->respond(MISTAKE => request_type => "Unknown request type ${type}"); -} - -1; diff --git a/lib/Tak/World.pm b/lib/Tak/World.pm index 84d05a9..83985e5 100644 --- a/lib/Tak/World.pm +++ b/lib/Tak/World.pm @@ -35,7 +35,7 @@ sub _build_router { remote => $remote ); - unshift @INC, $loader->inc_callback; + push @INC, $loader->inc_callback; return $router; } diff --git a/maint/mk-fat b/maint/mk-fat index 45db08f..5d22b2d 100755 --- a/maint/mk-fat +++ b/maint/mk-fat @@ -1,7 +1,7 @@ #!/bin/sh -if [ -e fatlib ]; then rm -r fatlib; fi -fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm) +#if [ -e fatlib ]; then rm -r fatlib; fi +#fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm) fatpack file -rm -r fatlib -echo "use Tak::World; Tak::World->new_from_stdio->run;" +#rm -r fatlib +echo "use lib 'lib'; use Tak::World; Tak::World->new_from_stdio->run;"