object proxying system
[scpubgit/Tak.git] / lib / Tak / Router.pm
CommitLineData
36cf3bcb 1package Tak::Router;
2
3use Tak::Request;
8b6c1f59 4use Tak::ServiceManager;
857f4834 5use Tak::MetaService;
36cf3bcb 6use Moo;
7
# Transport object for this router; the methods below call ->receive and
# ->send on it.
has channel => (
  is       => 'ro',
  required => 1,
);

# Handler name => Tak::ServiceManager wrapper, filled in by register().
has local_request_handlers => (
  is      => 'ro',
  default => sub { +{} },
);

# Tag => Tak::Request for inbound requests that have been started locally;
# entries are removed again by send_response().
has requests_received => (
  is      => 'ro',
  default => sub { +{} },
);

# Most recently issued outbound tag. Kept as a string so that next_serial()
# can advance it with Perl's string auto-increment.
has last_serial => (
  is      => 'ro',
  default => sub { 'A0000' },
);
15
# Advance last_serial in place and return the new value.
#
# The slot is written through the underlying hash because the attribute is
# declared read-only. The increment is Perl's string auto-increment, so
# 'A0000' becomes 'A0001' and 'A9999' rolls over to 'B0000'.
sub next_serial {
  my ($self) = @_;
  $self->{last_serial}++;
  return $self->{last_serial};
}
17
# Tag => Tak::Request for outbound requests recorded by send_request();
# receive_response() removes the entry when the matching response arrives.
has requests_sent => (
  is      => 'ro',
  default => sub { +{} },
);
19
# Moo construction hook: every router registers a Tak::MetaService under
# the handler name 'meta', handing it a reference back to this router.
sub BUILD {
  my $self = shift;
  $self->register('meta', Tak::MetaService->new(router => $self));
}
24
# Process messages with no stop flag: delegates to run_until() without a
# second argument, so the loop only ends when the channel's receive returns
# a false value.
sub run {
  my ($self) = @_;
  return $self->run_until;
}
26
# Pump messages from the channel into receive() until told to stop.
#
# Arguments: ($self, $done). The stop flag is deliberately read as $_[1]
# instead of being copied into a lexical: @_ aliases the caller's own
# variable, so if the caller's variable becomes true while a message is
# being handled, the loop sees it on the next iteration.
# NOTE(review): presumably a response callback flips the flag - confirm
# against callers.
#
# The loop also ends as soon as the channel's receive returns a false value.
# Each message is expected to be an array reference and is flattened into
# the argument list of receive().
sub run_until {
  my $self = $_[0];
  until ($_[1]) {
    my $message = $self->channel->receive or last;
    $self->receive(@$message);
  }
}
33
# Validate one decoded message and dispatch it by type.
#
# Arguments: ($self, $type, @payload), where @payload is expected to be
# ($tag, ...data).
#
# A message with no type, no tag, or nothing after the tag is answered with
# a MISTAKE/message_format message on the channel and then dropped. REQUEST
# messages go to receive_request() and RESPONSE messages go to
# receive_response(). Any other type is ignored without a reply.
#
# Always returns nothing.
sub receive {
  my ($self, $type, @payload) = @_;
  unless ($type) {
    $self->channel->send(MISTAKE => message_format => "No message type");
    return;
  }
  unless (@payload) {
    $self->channel->send(MISTAKE => message_format => "Tag missing");
    return;
  }
  unless (@payload > 1) {
    $self->channel->send(MISTAKE => message_format => "No payload");
    # Stop here like the other format checks. Without this return the
    # malformed message was still dispatched after being reported.
    return;
  }
  if ($type eq 'REQUEST') {
    $self->receive_request(@payload);
    return;
  }
  if ($type eq 'RESPONSE') {
    $self->receive_response(@payload);
    return;
  }
  return;
}
56
# Start a local handler for an inbound REQUEST.
#
# Arguments: ($self, $tag, $handler_name, @payload).
#
# - If a request with this tag is already recorded in requests_received, a
#   MISTAKE/request_tag message is sent on the channel and nothing else
#   happens.
# - If no handler is registered under $handler_name, a MISTAKE/handler_name
#   response is sent for the tag via send_response().
# - Otherwise a Tak::Request is created that answers through this router's
#   send_response(), recorded under the tag, and passed with the payload to
#   the handler's start_request().
sub receive_request {
  my ($self, $tag, $handler_name, @payload) = @_;

  my $in_progress = $self->requests_received;
  if ($in_progress->{$tag}) {
    $self->channel->send(
      'MISTAKE', 'request_tag', "Request for ${tag} in process"
    );
    return;
  }

  my $handler = $self->local_request_handlers->{$handler_name};
  if (!$handler) {
    $self->send_response(
      $tag, 'MISTAKE', 'handler_name', "No such handler ${handler_name}"
    );
    return;
  }

  my $request = Tak::Request->new(
    tag          => $tag,
    respond_to   => $self,
    respond_with => 'send_response',
  );
  $in_progress->{$tag} = $request;
  $handler->start_request($request, @payload);
}
79
# Send the result of an inbound request back over the channel.
#
# Arguments: ($self, $tag, @result). The tag's entry in requests_received
# is removed, then a RESPONSE message carrying the tag and result is sent.
# Returns whatever the channel's send returns.
sub send_response {
  my $self   = shift;
  my $tag    = shift;
  my @result = @_;

  my $in_progress = $self->requests_received;
  delete $in_progress->{$tag};
  return $self->channel->send('RESPONSE', $tag, @result);
}
85
# Issue an outbound REQUEST and remember it until the response arrives.
#
# Arguments: ($self, $respond_to, $respond_with, @payload).
# A fresh tag is taken from next_serial(). A Tak::Request built from the tag
# and the two respond_* arguments is stored in requests_sent under that tag.
# A REQUEST message with the tag and payload is then sent on the channel.
#
# Returns the Tak::Request object.
sub send_request {
  my ($self, $respond_to, $respond_with, @payload) = @_;

  my $tag     = $self->next_serial;
  my $request = Tak::Request->new(
    tag          => $tag,
    respond_to   => $respond_to,
    respond_with => $respond_with,
  );
  $self->requests_sent->{$tag} = $request;

  $self->channel->send('REQUEST', $tag, @payload);
  return $request;
}
99
# Deliver an inbound RESPONSE to the request that is waiting for it.
#
# Arguments: ($self, $tag, @result).
# The Tak::Request stored under $tag in requests_sent is removed and its
# respond() method is called with @result. The return value of respond() is
# returned.
#
# If no request is recorded for the tag (never sent, or already answered),
# a MISTAKE/response_tag message is sent on the channel and the response is
# dropped. Calling respond() on the missing request would otherwise die and
# take the whole receive loop down.
sub receive_response {
  my ($self, $tag, @result) = @_;
  my $request = delete $self->requests_sent->{$tag};
  unless ($request) {
    $self->channel->send(
      MISTAKE => response_tag => "No request for ${tag} in process"
    );
    return;
  }
  $request->respond(@result);
}
105
# Make a service reachable by name for inbound requests.
#
# Arguments: ($self, $name, $service). The service is wrapped in a
# Tak::ServiceManager and the wrapper is stored in local_request_handlers
# under $name, replacing any existing entry of that name.
sub register {
  my ($self, $name, $service) = @_;
  my $manager = Tak::ServiceManager->new(service => $service);
  $self->local_request_handlers->{$name} = $manager;
}
112
36cf3bcb 1131;