From: Matt S Trout Date: Fri, 4 Nov 2011 21:51:55 +0000 (+0000) Subject: new remote code X-Git-Tag: v0.001001~28 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=scpubgit%2FTak.git;a=commitdiff_plain;h=77bf1d9b5b6832894676ab549ee5664cb7200d33 new remote code --- diff --git a/lib/Tak/Client.pm b/lib/Tak/Client.pm index 04d86ef..949e9af 100644 --- a/lib/Tak/Client.pm +++ b/lib/Tak/Client.pm @@ -33,6 +33,10 @@ sub _new_request { } sub do { + shift->result_of(@_)->get; +} + +sub result_of { my ($self, @payload) = @_; my $done; my $result; @@ -40,7 +44,7 @@ sub do { on_result => sub { $result = shift }, }, @payload); Tak->loop_until($result); - $result->get; + return $result; } 1; diff --git a/lib/Tak/ConnectionReceiver.pm b/lib/Tak/ConnectionReceiver.pm new file mode 100644 index 0000000..bb2ba15 --- /dev/null +++ b/lib/Tak/ConnectionReceiver.pm @@ -0,0 +1,62 @@ +package Tak::ConnectionReceiver; + +use Tak::Request; +use Scalar::Util qw(weaken); +use Moo; + +with 'Tak::Role::Service'; + +has requests => (is => 'ro', default => sub { {} }); + +has channel => (is => 'ro', required => 1); + +has service => (is => 'ro', required => 1); + +sub BUILD { + weaken(my $self = shift); + my $channel = $self->channel; + Tak->loop->watch_io( + handle => $channel->read_fh, + on_read_ready => sub { + if (my $message = $channel->read_message) { + $self->receive(@$message); + } + } + ); +} + +sub DEMOLISH { + Tak->loop->unwatch_io( + handle => $_[0]->channel->read_fh, + on_read_ready => 1, + ); +} + +sub receive_request { + my ($self, $tag, $meta, @payload) = @_; + my $channel = $self->channel; + my $req = Tak::Request->new( + ($meta->{progress} + ? (on_progress => sub { $channel->write_message(progress => $tag => @_) }) + : ()), + on_result => sub { $channel->write_message(result => $tag => $_[0]->flatten) } + ); + $self->service->start_request($req => @payload); +} + +sub receive_progress { + my ($self, $tag, @payload) = @_; + $self->requests->{$tag}->progress(@payload); +} + +sub receive_result { + my ($self, $tag, @payload) = @_; + $self->requests->{$tag}->result(@payload); +} + +sub receive_message { + my ($self, @payload) = @_; + $self->service->receive(@payload); +} + +1; diff --git a/lib/Tak/ConnectionService.pm b/lib/Tak/ConnectionService.pm new file mode 100644 index 0000000..6390e80 --- /dev/null +++ b/lib/Tak/ConnectionService.pm @@ -0,0 +1,35 @@ +package Tak::ConnectionService; + +use Tak::ConnectionReceiver; +use Tak::JSONChannel; +use Moo; + +has receiver => (is => 'ro', writer => '_set_receiver'); + +has channel => (is => 'ro', writer => '_set_channel'); + +sub BUILD { + my ($self, $args) = @_; + my $channel = $self->_set_channel( + Tak::JSONChannel->new(map +($_ => $args->{$_}), qw(read_fh write_fh)) + ); + my $receiver = $self->_set_receiver( + Tak::ConnectionReceiver->new( + channel => $channel, service => $args->{listening_service} + ) + ); +} + +sub start_request { + my ($self, $req, @payload) = @_; + $self->receiver->requests->{my $tag = "$req"} = $req; + my $meta = { progress => !!$req->on_progress }; + $self->channel->write_message(request => $tag => $meta => @payload); +} + +sub receive { + my ($self, @payload) = @_; + $self->channel->write_message(message => @payload); +} + +1; diff --git a/lib/Tak/ConnectorService.pm b/lib/Tak/ConnectorService.pm new file mode 100644 index 0000000..8a244a2 --- /dev/null +++ b/lib/Tak/ConnectorService.pm @@ -0,0 +1,41 @@ +package Tak::ConnectorService; + +use IPC::Open2; +use IO::All; +use Tak::Router; +use Tak::Client; +use Tak::ConnectionService; +use Moo; + +with 'Tak::Role::Service'; + +has connections => (is => 'ro', default => sub { Tak::Router->new }); + +sub handle_create { + my ($self) = @_; + my $kid_pid = IPC::Open2::open2(my $kid_out, my $kid_in, $^X, '-') + or die "Couldn't open2 child: $!"; + io($kid_in)->print(io('maint/mk-fat |')->all, "__END__\n"); + my $connection = Tak::ConnectionService->new( + read_fh => $kid_out, write_fh => $kid_in, + listening_service => Tak::Router->new + ); + my $client = Tak::Client->new(service => $connection); + # actually, we should register with a monotonic id and + # stash the pid elsewhere. but meh for now. + my $pid = $client->do(meta => 'pid'); + $self->connections->register('|'.$pid, $connection); + return ('proxy', '|'.$pid); +} + +sub start_proxy_request { + my ($self, $req, @payload) = @_;; + $self->connections->start_request($req, @payload); +} + +sub receive_proxy { + my ($self, @payload) = @_; + $self->connections->receive(@payload); +} + +1; diff --git a/lib/Tak/JSONChannel.pm b/lib/Tak/JSONChannel.pm index 6bfb16a..c06054e 100644 --- a/lib/Tak/JSONChannel.pm +++ b/lib/Tak/JSONChannel.pm @@ -10,9 +10,9 @@ has write_fh => (is => 'ro', required => 1); sub BUILD { shift->write_fh->autoflush(1); } -sub receive { +sub read_message { my ($self) = @_; - while (my $line = readline($self->read_fh)) { + if (defined(my $line = readline($self->read_fh))) { if (my $unpacked = $self->_unpack_line($line)) { return $unpacked; } @@ -23,35 +23,35 @@ sub _unpack_line { my ($self, $line) = @_; my $data = eval { decode_json($line) }; unless ($data) { - $self->send(MISTAKE => invalid_json => $@||'No data and no exception'); + $self->write_message(mistake => invalid_json => $@||'No data and no exception'); return; } unless (ref($data) eq 'ARRAY') { - $self->send(MISTAKE => message_format => "Not an ARRAY"); + $self->write_message(mistake => message_format => "Not an ARRAY"); return; } unless (@$data > 0) { - $self->send(MISTAKE => message_format => "Empty request array"); + $self->write_message(mistake => message_format => "Empty request array"); return; } $data; } -sub send { +sub write_message { my ($self, @msg) = @_; my $json = eval { encode_json(\@msg) }; unless ($json) { - $self->_raw_send( + $self->_raw_write_message( encode_json( - [ FAILURE => invalid_message => $@||'No data and no exception' ] + [ failure => invalid_message => $@||'No data and no exception' ] ) ); return; } - $self->_raw_send($json); + $self->_raw_write_message($json); } -sub _raw_send { +sub _raw_write_message { my ($self, $raw) = @_; #warn "Sending: ${raw}\n"; print { $self->write_fh } $raw."\n"; diff --git a/lib/Tak/Loop.pm b/lib/Tak/Loop.pm index fe9dff9..888136c 100644 --- a/lib/Tak/Loop.pm +++ b/lib/Tak/Loop.pm @@ -1,5 +1,46 @@ package Tak::Loop; +use IO::Select; use Moo; +has is_running => (is => 'rw', clearer => 'loop_stop'); + +has _read_watches => (is => 'ro', default => sub { {} }); +has _read_select => (is => 'ro', default => sub { IO::Select->new }); + +sub watch_io { + my ($self, %watch) = @_; + my $fh = $watch{handle}; + if (my $cb = $watch{on_read_ready}) { + $self->_read_select->add($fh); + $self->_read_watches->{$fh} = $cb; + } +} + +sub unwatch_io { + my ($self, %watch) = @_; + my $fh = $watch{handle}; + if ($watch{on_read_ready}) { + $self->_read_select->remove($fh); + delete $self->_read_watches->{$fh}; + } +} + +sub loop_once { + my ($self) = @_; + my $read = $self->_read_watches; + my ($readable) = IO::Select->select($self->_read_select); + foreach my $fh (@$readable) { + $read->{$fh}(); + } +} + +sub loop_forever { + my ($self) = @_; + $self->is_running(1); + while ($self->is_running) { + $self->loop_once; + } +} + 1; diff --git a/lib/Tak/MetaService.pm b/lib/Tak/MetaService.pm index 9e32704..4dfe9d4 100644 --- a/lib/Tak/MetaService.pm +++ b/lib/Tak/MetaService.pm @@ -6,6 +6,10 @@ with 'Tak::Role::Service'; has router => (is => 'ro', required => 1, weak_ref => 1); +sub handle_pid { + return $$; +} + sub handle_register { my ($self, $name, $class, @args) = @_; (my $file = $class) =~ s/::/\//g; diff --git a/lib/Tak/Request.pm b/lib/Tak/Request.pm index 342ddba..49731d9 100644 --- a/lib/Tak/Request.pm +++ b/lib/Tak/Request.pm @@ -19,6 +19,11 @@ sub result { $self->on_result->(Tak::Result->new(type => $type, data => \@data)); } +sub flatten { + my ($self) = @_; + return ($self->type, @{$self->data}); +} + sub success { shift->result(success => @_) } sub mistake { shift->result(mistake => @_) } sub failure { shift->result(failure => @_) } diff --git a/lib/Tak/Result.pm b/lib/Tak/Result.pm index a7afb4f..2c06242 100644 --- a/lib/Tak/Result.pm +++ b/lib/Tak/Result.pm @@ -5,6 +5,8 @@ use Moo; has type => (is => 'ro', required => 1); has data => (is => 'ro', required => 1); +sub flatten { $_[0]->type, @{$_[0]->data} } + sub get { my ($self) = @_; $self->throw unless $self->type eq 'success'; diff --git a/lib/Tak/Role/Service.pm b/lib/Tak/Role/Service.pm index a746d15..3ee3d7f 100644 --- a/lib/Tak/Role/Service.pm +++ b/lib/Tak/Role/Service.pm @@ -3,14 +3,14 @@ package Tak::Role::Service; use Moo::Role; sub start_request { - my ($self, $req, $type, @args) = @_; + my ($self, $req, $type, @payload) = @_; unless ($type) { $req->mistake(request_type => "No request type given"); return; } if (my $meth = $self->can("handle_${type}")) { my @result; - if (eval { @result = $self->$meth(@args); 1 }) { + if (eval { @result = $self->$meth(@payload); 1 }) { $req->success(@result); } else { if (ref($@) eq 'ARRAY') { @@ -19,11 +19,18 @@ sub start_request { $req->failure(exception => $@); } } + } elsif ($meth = $self->can("start_${type}_request")) { + $self->$meth($req => @payload); } else { $req->mistake(request_type => "Unknown request type ${type}"); } } -sub receive { } +sub receive { + my ($self, $type, @payload) = @_; + if (my $meth = $self->can("receive_${type}")) { + $self->$meth(@payload); + } +} 1; diff --git a/lib/Tak/Router.pm b/lib/Tak/Router.pm index 3041f02..f7e56dd 100644 --- a/lib/Tak/Router.pm +++ b/lib/Tak/Router.pm @@ -1,9 +1,15 @@ package Tak::Router; +use Tak::MetaService; use Moo; has services => (is => 'ro', default => sub { {} }); +sub BUILD { + my ($self) = @_; + $self->register(meta => Tak::MetaService->new(router => $self)); +} + sub start_request { my ($self, $req, $target, @payload) = @_; $req->mistake("Reached router with no target") diff --git a/lib/Tak/STDIOSetup.pm b/lib/Tak/STDIOSetup.pm new file mode 100644 index 0000000..fed4656 --- /dev/null +++ b/lib/Tak/STDIOSetup.pm @@ -0,0 +1,19 @@ +package Tak::STDIOSetup; + +use Tak::ConnectionService; +use Tak::Router; +use Tak; +use strictures 1; + +sub run { + open my $stdin, '<&', \*STDIN; + open my $stdout, '>&', \*STDOUT; + close STDIN; close STDOUT; + my $connection = Tak::ConnectionService->new( + read_fh => $stdin, write_fh => $stdout, + listening_service => Tak::Router->new + ); + Tak->loop->loop_forever; +} + +1; diff --git a/maint/mk-fat b/maint/mk-fat index 5d22b2d..a23f132 100755 --- a/maint/mk-fat +++ b/maint/mk-fat @@ -4,4 +4,4 @@ #fatpack tree $(fatpack packlists-for strictures.pm Moo.pm JSON/PP.pm) fatpack file #rm -r fatlib -echo "use lib 'lib'; use Tak::World; Tak::World->new_from_stdio->run;" +echo "use lib 'lib'; use Tak::STDIOSetup; Tak::STDIOSetup->run;"