Commit | Line | Data |
36cf3bcb |
1 | package Tak::Router; |
2 | |
3 | use Tak::Request; |
4 | use Moo; |
5 | |
6 | has channel => (is => 'ro', required => 1); |
7 | |
8 | has local_request_handlers => (is => 'ro', default => sub { {} }); |
9 | |
10 | has requests_received => (is => 'ro', default => sub { {} }); |
11 | |
12 | has last_serial => (is => 'ro',default => sub { 'A0000' }); |
13 | |
14 | sub next_serial { ++($_[0]->{last_serial}) } |
15 | |
16 | has requests_sent => (is => 'ro', default => sub { {} }); |
17 | |
18 | sub run { shift->run_until } |
19 | |
20 | sub run_until { |
21 | my ($self, $done) = @_; |
22 | while (!$_[1] and my $message = $self->channel->receive) { |
23 | $self->receive(@$message); |
24 | } |
25 | } |
26 | |
27 | sub receive { |
28 | my ($self, $type, @payload) = @_; |
29 | unless ($type) { |
30 | $self->channel->send(MISTAKE => message_format => "No message type"); |
31 | return; |
32 | } |
33 | unless (@payload) { |
34 | $self->channel->send(MISTAKE => message_format => "Tag missing"); |
35 | return; |
36 | } |
37 | unless (@payload > 1) { |
38 | $self->channel->send(MISTAKE => message_format => "No payload"); |
39 | } |
40 | if ($type eq 'REQUEST') { |
41 | $self->receive_request(@payload); |
42 | return; |
43 | } |
44 | if ($type eq 'RESPONSE') { |
45 | $self->receive_response(@payload); |
46 | return; |
47 | } |
48 | } |
49 | |
50 | sub receive_request { |
51 | my ($self, $tag, $handler_name, @payload) = @_; |
52 | if ($self->requests_received->{$tag}) { |
53 | $self->channel->send( |
54 | MISTAKE => request_tag => "Request for ${tag} in process" |
55 | ); |
56 | return; |
57 | } |
58 | my $handler = $self->local_request_handlers->{$handler_name}; |
59 | unless ($handler) { |
60 | $self->send_response( |
61 | $tag => MISTAKE => handler_name => "No such handler ${handler_name}" |
62 | ); |
63 | return; |
64 | } |
65 | my $request |
66 | = $self->requests_received->{$tag} |
67 | = Tak::Request->new( |
68 | tag => $tag, respond_to => $self |
69 | ); |
70 | $handler->start_request($request => @payload); |
71 | } |
72 | |
73 | sub send_response { |
74 | my ($self, $tag, @result) = @_; |
75 | delete $self->requests_received->{$tag}; |
76 | $self->channel->send(RESPONSE => $tag => @result); |
77 | } |
78 | |
79 | sub send_request { |
80 | my ($self, $respond_to, @payload) = @_; |
81 | my $tag = $self->next_serial; |
82 | my $request |
83 | = $self->requests_sent->{$tag} |
84 | = Tak::Request->new( |
85 | tag => $tag, |
86 | respond_to => $respond_to |
87 | ); |
88 | $self->channel->send(REQUEST => $tag => @payload); |
89 | return $request; |
90 | } |
91 | |
92 | sub receive_response { |
93 | my ($self, $tag, @result) = @_; |
94 | my $request = delete $self->requests_sent->{$tag}; |
95 | $request->respond(@result); |
96 | } |
97 | |
98 | 1; |