From: Matt S Trout Date: Fri, 28 Oct 2011 18:49:31 +0000 (+0000) Subject: client and server manage to talk X-Git-Tag: v0.001001~37 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FTak.git;a=commitdiff_plain;h=36cf3bcb9b7a2e87d48ebc657f50680ef23c96fe client and server manage to talk --- diff --git a/lib/Tak/EvalService.pm b/lib/Tak/EvalService.pm new file mode 100644 index 0000000..9a45db9 --- /dev/null +++ b/lib/Tak/EvalService.pm @@ -0,0 +1,44 @@ +package Tak::EvalService; + +use Eval::WithLexicals; +use Moo; +use Data::Dumper::Concise; +use Capture::Tiny qw(capture); + +has 'eval_withlexicals' => (is => 'lazy'); + +sub _build_eval_withlexicals { + Eval::WithLexicals->new +} + +sub handle_eval { + my ($self, $perl) = @_; + unless ($perl) { + return MISTAKE => eval_input => "No code supplied"; + } + if (my $ref = ref($perl)) { + return MISTAKE => eval_input => "Code was a ${ref} reference"; + } + my ($code, @ret); + my ($stdout, $stderr); + if (eval { + ($stdout, $stderr) = capture { + @ret = $self->eval_withlexicals->eval($perl); + }; + 1 + }) { + $code = 'RESULT'; + } else { + ($code, @ret) = (FAILURE => $@); + } + my $dumped_ret; + unless (eval { $dumped_ret = Dumper(@ret); 1 }) { + $dumped_ret = "Error dumping ${code} result: $@"; + $code = 'FAILURE'; + } + return $code => { + stdout => $stdout, stderr => $stderr, return => $dumped_ret + }; +} + +1; diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm new file mode 100644 index 0000000..9716768 --- /dev/null +++ b/lib/Tak/JSONChannel.pm @@ -0,0 +1,58 @@ +package Tak::JSONChannel; + +use JSON::PP qw(encode_json decode_json); +use IO::Handle; +use Scalar::Util qw(weaken); +use Moo; + +has read_fh => (is => 'ro', required => 1); +has write_fh => (is => 'ro', required => 1); + +sub BUILD { shift->write_fh->autoflush(1); } + +sub receive { + my ($self) = @_; + while (my $line = readline($self->read_fh)) { + if (my $unpacked = $self->_unpack_line($line)) { + return $unpacked; + } + } +} + +sub _unpack_line { + my ($self, $line) = @_; + my $data = eval { decode_json($line) }; + unless ($data) { + $self->send(MISTAKE => invalid_json => $@||'No data and no exception'); + return; + } + unless (ref($data) eq 'ARRAY') { + $self->send(MISTAKE => message_format => "Not an ARRAY"); + return; + } + unless (@$data > 0) { + $self->send(MISTAKE => message_format => "Empty request array"); + return; + } + $data; +} + +sub send { + my ($self, @msg) = @_; + my $json = eval { encode_json(\@msg) }; + unless ($json) { + $self->_raw_send( + encode_json( + [ FAILURE => invalid_message => $@||'No data and no exception' ] + ) + ); + } + $self->_raw_send($json); +} + +sub _raw_send { + my ($self, $raw) = @_; + print { $self->write_fh } $raw."\n"; +} + +1; diff --git a/lib/Tak/ModuleLoader.pm b/lib/Tak/ModuleLoader.pm new file mode 100644 index 0000000..8d5b837 --- /dev/null +++ b/lib/Tak/ModuleLoader.pm @@ -0,0 +1,21 @@ +package Tak::ModuleLoader; + +use Moo; + +has remote => (is => 'ro', required => 1); + +sub inc_callback { + sub { $self->maybe_load_module($_[1]) } +} + +sub maybe_load_module { + my ($self, $module) = @_; + my $result = $self->remote->blocking_request(source_for => $module); + if (my $code = $result->{code}) { + open my $fh, '<', \$code; + return $fh; + } + return; +} + +1; diff --git a/lib/Tak/Remote.pm b/lib/Tak/Remote.pm new file mode 100644 index 0000000..7d1d7d6 --- /dev/null +++ b/lib/Tak/Remote.pm @@ -0,0 +1,29 @@ +package Tak::Remote; + +use Moo; + +has router => (is => 'ro', required => 1); + +has name => (is => 'ro', required => 1); + +sub blocking_request { + my ($self, @payload) = @_; + local our $Request = $self->router->send_request($self, $self->name, @payload); + local our $Done; + local our @Result; + $self->router->run_until($Done); + return @Result; +} + +sub send_response { + my ($self, $tag, @result) = @_; + { + our $Request; + die "Out of order response ${\$Request->tag}, expecting ${tag}" + if $Request->tag ne $tag; + } + our @Result = @result; + our $Done = 1; +} + +1; diff --git a/lib/Tak/Request.pm b/lib/Tak/Request.pm new file mode 100644 index 0000000..669e027 --- /dev/null +++ b/lib/Tak/Request.pm @@ -0,0 +1,14 @@ +package Tak::Request; + +use Moo; + +has tag => (is => 'ro', required => 1); + +has respond_to => (is => 'ro', required => 1, weak_ref => 1); + +sub respond { + my $self = shift; + $self->respond_to->send_response($self->tag => @_); +} + +1; diff --git a/lib/Tak/Router.pm b/lib/Tak/Router.pm new file mode 100644 index 0000000..214455c --- /dev/null +++ b/lib/Tak/Router.pm @@ -0,0 +1,98 @@ +package Tak::Router; + +use Tak::Request; +use Moo; + +has channel => (is => 'ro', required => 1); + +has local_request_handlers => (is => 'ro', default => sub { {} }); + +has requests_received => (is => 'ro', default => sub { {} }); + +has last_serial => (is => 'ro',default => sub { 'A0000' }); + +sub next_serial { ++($_[0]->{last_serial}) } + +has requests_sent => (is => 'ro', default => sub { {} }); + +sub run { shift->run_until } + +sub run_until { + my ($self, $done) = @_; + while (!$_[1] and my $message = $self->channel->receive) { + $self->receive(@$message); + } +} + +sub receive { + my ($self, $type, @payload) = @_; + unless ($type) { + $self->channel->send(MISTAKE => message_format => "No message type"); + return; + } + unless (@payload) { + $self->channel->send(MISTAKE => message_format => "Tag missing"); + return; + } + unless (@payload > 1) { + $self->channel->send(MISTAKE => message_format => "No payload"); + } + if ($type eq 'REQUEST') { + $self->receive_request(@payload); + return; + } + if ($type eq 'RESPONSE') { + $self->receive_response(@payload); + return; + } +} + +sub receive_request { + my ($self, $tag, $handler_name, @payload) = @_; + if ($self->requests_received->{$tag}) { + $self->channel->send( + MISTAKE => request_tag => "Request for ${tag} in process" + ); + return; + } + my $handler = $self->local_request_handlers->{$handler_name}; + unless ($handler) { + $self->send_response( + $tag => MISTAKE => handler_name => "No such handler ${handler_name}" + ); + return; + } + my $request + = $self->requests_received->{$tag} + = Tak::Request->new( + tag => $tag, respond_to => $self + ); + $handler->start_request($request => @payload); +} + +sub send_response { + my ($self, $tag, @result) = @_; + delete $self->requests_received->{$tag}; + $self->channel->send(RESPONSE => $tag => @result); +} + +sub send_request { + my ($self, $respond_to, @payload) = @_; + my $tag = $self->next_serial; + my $request + = $self->requests_sent->{$tag} + = Tak::Request->new( + tag => $tag, + respond_to => $respond_to + ); + $self->channel->send(REQUEST => $tag => @payload); + return $request; +} + +sub receive_response { + my ($self, $tag, @result) = @_; + my $request = delete $self->requests_sent->{$tag}; + $request->respond(@result); +} + +1; diff --git a/lib/Tak/ServiceManager.pm b/lib/Tak/ServiceManager.pm new file mode 100644 index 0000000..89d6f9e --- /dev/null +++ b/lib/Tak/ServiceManager.pm @@ -0,0 +1,21 @@ +package Tak::ServiceManager; + +use Moo; + +has service => (is => 'ro', required => 1); + +sub start_request { + my ($self, $req, $type, @args) = @_; + unless ($type) { + $req->respond(MISTAKE => request_type => "No request type given"); + return; + } + my $service = $self->service; + if (my $meth = $service->can("handle_${type}")) { + $req->respond($service->$meth(@args)); + return; + } + $req->respond(MISTAKE => request_type => "Unknown request type ${type}"); +} + +1; diff --git a/takc b/takc new file mode 100644 index 0000000..91745e6 --- /dev/null +++ b/takc @@ -0,0 +1,26 @@ +use strictures 1; +use Devel::Dwarn; +use Tak::JSONChannel; +use Tak::Router; +use IPC::Open2; +use Tak::Remote; + +my $pid = open2(my $out, my $in, $^X, qw(-Ilib takd)) + or die "Couldn't open2 child: $!"; + +my $channel = Tak::JSONChannel->new( + read_fh => $out, + write_fh => $in +); + +my $router = Tak::Router->new( + channel => $channel, +); + +my $remote = Tak::Remote->new( + router => $router, + name => 'EVAL' +); + +::Dwarn([ $remote->blocking_request(eval => 'my $x = 1;') ]); +::Dwarn([ $remote->blocking_request(eval => '1+$x;') ]); diff --git a/takd b/takd new file mode 100644 index 0000000..4dea863 --- /dev/null +++ b/takd @@ -0,0 +1,23 @@ +use strictures 1; +use Tak::JSONChannel; +use Tak::ServiceManager; +use Tak::EvalService; +use Tak::Router; + +my $channel = Tak::JSONChannel->new( + read_fh => \*STDIN, + write_fh => \*STDOUT +); + +my $eval = Tak::ServiceManager->new( + service => Tak::EvalService->new +); + +my $router = Tak::Router->new( + channel => $channel, + local_request_handlers => { + EVAL => $eval + } +); + +$router->run;