add meta service
[scpubgit/Tak.git] / lib / Tak / Router.pm
1 package Tak::Router;
2
3 use Tak::Request;
4 use Tak::ServiceManager;
5 use Tak::MetaService;
6 use Moo;
7
8 has channel => (is => 'ro', required => 1);
9
10 has local_request_handlers => (is => 'ro', default => sub { {} });
11
12 has requests_received => (is => 'ro', default => sub { {} });
13
14 has last_serial => (is => 'ro',default => sub { 'A0000' });
15
16 sub next_serial { ++($_[0]->{last_serial}) }
17
18 has requests_sent => (is => 'ro', default => sub { {} });
19
20 sub BUILD {
21   my ($self) = @_;
22   $self->register(meta => Tak::MetaService->new(router => $self));
23 }
24
25 sub run { shift->run_until }
26
27 sub run_until {
28   my ($self, $done) = @_;
29   while (!$_[1] and my $message = $self->channel->receive) {
30     $self->receive(@$message);
31   }
32 }
33
34 sub receive {
35   my ($self, $type, @payload) = @_;
36   unless ($type) {
37     $self->channel->send(MISTAKE => message_format => "No message type");
38     return;
39   }
40   unless (@payload) {
41     $self->channel->send(MISTAKE => message_format => "Tag missing");
42     return;
43   }
44   unless (@payload > 1) {
45     $self->channel->send(MISTAKE => message_format => "No payload");
46   }
47   if ($type eq 'REQUEST') {
48     $self->receive_request(@payload);
49     return;
50   }
51   if ($type eq 'RESPONSE') {
52     $self->receive_response(@payload);
53     return;
54   }
55 }
56
57 sub receive_request {
58   my ($self, $tag, $handler_name, @payload) = @_;
59   if ($self->requests_received->{$tag}) {
60     $self->channel->send(
61       MISTAKE => request_tag => "Request for ${tag} in process"
62     );
63     return;
64   }
65   my $handler = $self->local_request_handlers->{$handler_name};
66   unless ($handler) {
67     $self->send_response(
68       $tag => MISTAKE => handler_name => "No such handler ${handler_name}"
69     );
70     return;
71   }
72   my $request
73     = $self->requests_received->{$tag}
74     = Tak::Request->new(
75         tag => $tag, respond_to => $self, respond_with => 'send_response',
76       );
77   $handler->start_request($request => @payload);
78 }
79
80 sub send_response {
81   my ($self, $tag, @result) = @_;
82   delete $self->requests_received->{$tag};
83   $self->channel->send(RESPONSE => $tag => @result);
84 }
85
86 sub send_request {
87   my ($self, $respond_to, $respond_with, @payload) = @_;
88   my $tag = $self->next_serial;
89   my $request
90     = $self->requests_sent->{$tag}
91     = Tak::Request->new(
92         tag => $tag,
93         respond_to => $respond_to,
94         respond_with => $respond_with,
95       );
96   $self->channel->send(REQUEST => $tag => @payload);
97   return $request;
98 }
99
100 sub receive_response {
101   my ($self, $tag, @result) = @_;
102   my $request = delete $self->requests_sent->{$tag};
103   $request->respond(@result);
104 }
105
106 sub register {
107   my ($self, $name, $service) = @_;
108   $self->local_request_handlers->{$name} = Tak::ServiceManager->new(
109     service => $service
110   );
111 }
112
113 1;