Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
3 | extends 'Catalyst::Engine::Embeddable'; |
4 | |
93202d20 |
5 | our $VERSION = '0.03'; |
0a663589 |
6 | |
7 | use List::MoreUtils qw/ uniq /; |
8 | use HTTP::Request; |
9 | use Net::Stomp; |
10 | |
11 | has connection => (is => 'rw', isa => 'Net::Stomp'); |
12 | has conn_desc => (is => 'rw', isa => 'Str'); |
13 | |
14 | =head1 NAME |
15 | |
16 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
17 | |
18 | =head1 SYNOPSIS |
19 | |
20 | # In a server script: |
21 | |
22 | BEGIN { |
23 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
24 | require Catalyst::Engine::Stomp; |
6e81857d |
25 | } |
0a663589 |
26 | |
27 | MyApp->config->{Engine::Stomp} = |
28 | { |
29 | hostname => '127.0.0.1', |
30 | port => 61613, |
31 | }; |
32 | MyApp->run(); |
33 | |
34 | # In a controller, or controller base class: |
d78bb739 |
35 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
36 | |
d78bb739 |
37 | # then create actions, which map as message types |
38 | sub testaction : Local { |
39 | my ($self, $c) = @_; |
0a663589 |
40 | |
d78bb739 |
41 | # Reply with a minimal response message |
42 | my $response = { type => 'testaction_response' }; |
43 | $c->stash->{response} = $response; |
44 | } |
45 | |
6e81857d |
46 | # The default serialization is YAML, but this configuration |
d78bb739 |
47 | # may be overridden in your controller: |
0a663589 |
48 | __PACKAGE__->config( |
6e81857d |
49 | 'default' => 'text/x-yaml', |
50 | 'stash_key' => 'rest', |
51 | 'map' => { 'text/x-yaml' => 'YAML' }, |
52 | ); |
0a663589 |
53 | |
0a663589 |
54 | =head1 DESCRIPTION |
55 | |
56 | Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You |
6e81857d |
57 | need a controller that understands messaging, as well as this engine. |
0a663589 |
58 | |
59 | This is single-threaded and single process - you need to run multiple |
60 | instances of this engine to get concurrency, and configure your broker |
61 | to load-balance across multiple consumers of the same queue. |
62 | |
d78bb739 |
63 | Controllers are mapped to Stomp queues, and a controller base class is |
64 | provided, Catalyst::Controller::MessageDriven, which implements |
6e81857d |
65 | YAML-serialized messages, mapping a top-level YAML "type" key to |
66 | the action. |
d78bb739 |
67 | |
0a663589 |
68 | =head1 METHODS |
69 | |
70 | =head2 run |
71 | |
72 | App entry point. Starts a loop listening for messages. |
73 | |
74 | =cut |
75 | |
76 | sub run { |
77 | my ($self, $app, $oneshot) = @_; |
78 | |
79 | die 'No Engine::Stomp configuration found' |
6e81857d |
80 | unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; |
0a663589 |
81 | |
82 | # list the path namespaces that will be mapped as queues. |
6e81857d |
83 | # |
84 | # this is known to use the deprecated |
85 | # Dispatcher->action_hash() method, but there doesn't appear |
86 | # to be another way to get the relevant strings out. |
87 | # |
88 | # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 |
89 | # |
0a663589 |
90 | my @queues = |
6e81857d |
91 | uniq |
92 | grep { length $_ } |
93 | map { $_->namespace } |
94 | values %{$app->dispatcher->action_hash}; |
0a663589 |
95 | |
6e81857d |
96 | # connect up |
0a663589 |
97 | my %template = %{$app->config->{'Engine::Stomp'}}; |
6e81857d |
98 | $self->connection(Net::Stomp->new(\%template)); |
99 | $self->connection->connect(); |
100 | $self->conn_desc($template{hostname}.':'.$template{port}); |
0a663589 |
101 | |
6e81857d |
102 | # subscribe, with client ack. |
0a663589 |
103 | foreach my $queue (@queues) { |
6e81857d |
104 | my $queue_name = "/queue/$queue"; |
105 | $self->connection->subscribe({ |
106 | destination => $queue_name, |
107 | ack => 'client', |
108 | }); |
0a663589 |
109 | } |
110 | |
6e81857d |
111 | # enter loop... |
112 | while (1) { |
113 | my $frame = $self->connection->receive_frame(); |
114 | $self->handle_stomp_frame($app, $frame); |
115 | last if $ENV{ENGINE_ONESHOT}; |
116 | } |
117 | exit 0; |
0a663589 |
118 | } |
119 | |
120 | =head2 prepare_request |
121 | |
122 | Overridden to add the source broker to the request, in place of the |
123 | client IP address. |
124 | |
125 | =cut |
126 | |
127 | sub prepare_request { |
128 | my ($self, $c, $req, $res_ref) = @_; |
6e81857d |
129 | shift @_; |
130 | $self->next::method(@_); |
131 | $c->req->address($self->conn_desc); |
0a663589 |
132 | } |
133 | |
134 | =head2 finalize_headers |
135 | |
d78bb739 |
136 | Overridden to dump out any errors encountered, since you won't get a |
137 | "debugging" message as for HTTP. |
0a663589 |
138 | |
139 | =cut |
140 | |
141 | sub finalize_headers { |
6e81857d |
142 | my ($self, $c) = @_; |
143 | my $error = join "\n", @{$c->error}; |
144 | if ($error) { |
145 | $c->log->debug($error); |
146 | } |
147 | return $self->next::method($c); |
0a663589 |
148 | } |
149 | |
150 | =head2 handle_stomp_frame |
151 | |
d78bb739 |
152 | Dispatch according to Stomp frame type. |
0a663589 |
153 | |
154 | =cut |
155 | |
156 | sub handle_stomp_frame { |
6e81857d |
157 | my ($self, $app, $frame) = @_; |
158 | |
159 | my $command = $frame->command(); |
160 | if ($command eq 'MESSAGE') { |
161 | $self->handle_stomp_message($app, $frame); |
162 | } |
163 | elsif ($command eq 'ERROR') { |
164 | $self->handle_stomp_error($app, $frame); |
165 | } |
166 | else { |
167 | $app->log->debug("Got unknown Stomp command: $command"); |
168 | } |
0a663589 |
169 | } |
170 | |
171 | =head2 handle_stomp_message |
172 | |
d78bb739 |
173 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
174 | |
175 | =cut |
176 | |
177 | sub handle_stomp_message { |
6e81857d |
178 | my ($self, $app, $frame) = @_; |
0a663589 |
179 | |
6e81857d |
180 | # queue -> controller |
181 | my $queue = $frame->headers->{destination}; |
182 | my ($controller) = $queue =~ m!^/queue/(.*)$!; |
0a663589 |
183 | |
6e81857d |
184 | # set up request |
0a663589 |
185 | my $config = $app->config->{'Engine::Stomp'}; |
186 | my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; |
187 | my $req = HTTP::Request->new(POST => $url); |
6e81857d |
188 | $req->content($frame->body); |
189 | $req->content_length(length $frame->body); |
0a663589 |
190 | |
6e81857d |
191 | # dispatch |
192 | my $response; |
0a663589 |
193 | $app->handle_request($req, \$response); |
194 | |
6e81857d |
195 | # reply |
196 | my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); |
197 | $self->connection->send({ destination => $reply_queue, body => $response->content }); |
0a663589 |
198 | |
6e81857d |
199 | # ack the message off the queue now we've replied |
200 | $self->connection->ack( { frame => $frame } ); |
0a663589 |
201 | } |
202 | |
203 | =head2 handle_stomp_error |
204 | |
d78bb739 |
205 | Log any Stomp error frames we receive. |
0a663589 |
206 | |
207 | =cut |
208 | |
209 | sub handle_stomp_error { |
6e81857d |
210 | my ($self, $app, $frame) = @_; |
211 | |
212 | my $error = $frame->headers->{message}; |
213 | $app->log->debug("Got Stomp error: $error"); |
0a663589 |
214 | } |
215 | |
d78bb739 |
216 | __PACKAGE__->meta->make_immutable; |
217 | |
0a663589 |
218 | 1; |
219 | |