Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
0a663589 |
3 | use List::MoreUtils qw/ uniq /; |
4 | use HTTP::Request; |
5 | use Net::Stomp; |
a58d0da6 |
6 | use MooseX::Types::Moose qw/Str Int HashRef/; |
03c90167 |
7 | use namespace::autoclean; |
8 | |
9 | extends 'Catalyst::Engine::Embeddable'; |
10 | |
f20933b7 |
11 | our $VERSION = '0.06'; |
0a663589 |
12 | |
13 | has connection => (is => 'rw', isa => 'Net::Stomp'); |
a58d0da6 |
14 | has conn_desc => (is => 'rw', isa => Str); |
0a663589 |
15 | |
16 | =head1 NAME |
17 | |
18 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
19 | |
20 | =head1 SYNOPSIS |
21 | |
22 | # In a server script: |
23 | |
24 | BEGIN { |
25 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
26 | require Catalyst::Engine::Stomp; |
5ec5e0b1 |
27 | } |
0a663589 |
28 | |
a58d0da6 |
29 | MyApp->config( |
30 | 'Engine::Stomp' = { |
31 | hostname => '127.0.0.1', |
32 | port => 61613, |
33 | subscribe_header => { |
34 | transformation => 'jms-to-json', |
35 | } |
36 | }, |
37 | ); |
0a663589 |
38 | MyApp->run(); |
39 | |
40 | # In a controller, or controller base class: |
d78bb739 |
41 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
42 | |
d78bb739 |
43 | # then create actions, which map as message types |
44 | sub testaction : Local { |
45 | my ($self, $c) = @_; |
0a663589 |
46 | |
d78bb739 |
47 | # Reply with a minimal response message |
48 | my $response = { type => 'testaction_response' }; |
49 | $c->stash->{response} = $response; |
50 | } |
51 | |
0a663589 |
52 | =head1 DESCRIPTION |
53 | |
54 | Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You |
5ec5e0b1 |
55 | need a controller that understands messaging, as well as this engine. |
0a663589 |
56 | |
57 | This is single-threaded and single process - you need to run multiple |
58 | instances of this engine to get concurrency, and configure your broker |
59 | to load-balance across multiple consumers of the same queue. |
60 | |
d78bb739 |
61 | Controllers are mapped to Stomp queues, and a controller base class is |
62 | provided, Catalyst::Controller::MessageDriven, which implements |
5ec5e0b1 |
63 | YAML-serialized messages, mapping a top-level YAML "type" key to |
64 | the action. |
d78bb739 |
65 | |
0a663589 |
66 | =head1 METHODS |
67 | |
68 | =head2 run |
69 | |
70 | App entry point. Starts a loop listening for messages. |
71 | |
72 | =cut |
73 | |
74 | sub run { |
75 | my ($self, $app, $oneshot) = @_; |
76 | |
77 | die 'No Engine::Stomp configuration found' |
bf8937b7 |
78 | unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; |
0a663589 |
79 | |
b9aa4861 |
80 | my @queues = grep { length $_ } |
ccd9e8fa |
81 | map { $app->controller($_)->action_namespace } $app->controllers; |
0a663589 |
82 | |
bf8937b7 |
83 | # connect up |
0a663589 |
84 | my %template = %{$app->config->{'Engine::Stomp'}}; |
de85f238 |
85 | my $subscribe_headers = $template{subscribe_headers} || {}; |
6f80183a |
86 | die("subscribe_headers config for Engine::Stomp must be a hashref!\n") |
87 | if (ref($subscribe_headers) ne 'HASH'); |
88 | |
bf8937b7 |
89 | $self->connection(Net::Stomp->new(\%template)); |
90 | $self->connection->connect(); |
91 | $self->conn_desc($template{hostname}.':'.$template{port}); |
0a663589 |
92 | |
bf8937b7 |
93 | # subscribe, with client ack. |
0a663589 |
94 | foreach my $queue (@queues) { |
bf8937b7 |
95 | my $queue_name = "/queue/$queue"; |
6f80183a |
96 | $self->connection->subscribe({ |
97 | %$subscribe_headers, |
d2057f6f |
98 | destination => $queue_name, |
99 | ack => 'client', |
6f80183a |
100 | }); |
0a663589 |
101 | } |
102 | |
bf8937b7 |
103 | # enter loop... |
104 | while (1) { |
105 | my $frame = $self->connection->receive_frame(); |
106 | $self->handle_stomp_frame($app, $frame); |
107 | last if $ENV{ENGINE_ONESHOT}; |
108 | } |
109 | exit 0; |
0a663589 |
110 | } |
111 | |
112 | =head2 prepare_request |
113 | |
114 | Overridden to add the source broker to the request, in place of the |
115 | client IP address. |
116 | |
117 | =cut |
118 | |
119 | sub prepare_request { |
03c90167 |
120 | my ($self, $c, $req, $res_ref) = @_; |
121 | shift @_; |
122 | $self->next::method(@_); |
123 | $c->req->address($self->conn_desc); |
0a663589 |
124 | } |
125 | |
126 | =head2 finalize_headers |
127 | |
d78bb739 |
128 | Overridden to dump out any errors encountered, since you won't get a |
129 | "debugging" message as for HTTP. |
0a663589 |
130 | |
131 | =cut |
132 | |
133 | sub finalize_headers { |
03c90167 |
134 | my ($self, $c) = @_; |
135 | my $error = join "\n", @{$c->error}; |
136 | if ($error) { |
137 | $c->log->debug($error); |
138 | } |
139 | return $self->next::method($c); |
0a663589 |
140 | } |
141 | |
142 | =head2 handle_stomp_frame |
143 | |
d78bb739 |
144 | Dispatch according to Stomp frame type. |
0a663589 |
145 | |
146 | =cut |
147 | |
148 | sub handle_stomp_frame { |
03c90167 |
149 | my ($self, $app, $frame) = @_; |
150 | |
151 | my $command = $frame->command(); |
152 | if ($command eq 'MESSAGE') { |
153 | $self->handle_stomp_message($app, $frame); |
154 | } |
155 | elsif ($command eq 'ERROR') { |
156 | $self->handle_stomp_error($app, $frame); |
157 | } |
158 | else { |
159 | $app->log->debug("Got unknown Stomp command: $command"); |
160 | } |
0a663589 |
161 | } |
162 | |
163 | =head2 handle_stomp_message |
164 | |
d78bb739 |
165 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
166 | |
167 | =cut |
168 | |
169 | sub handle_stomp_message { |
03c90167 |
170 | my ($self, $app, $frame) = @_; |
171 | |
172 | # queue -> controller |
173 | my $queue = $frame->headers->{destination}; |
174 | my ($controller) = $queue =~ m|^/queue/(.*)$|; |
175 | |
176 | # set up request |
177 | my $config = $app->config->{'Engine::Stomp'}; |
178 | my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; |
179 | my $req = HTTP::Request->new(POST => $url); |
180 | $req->content($frame->body); |
181 | $req->content_length(length $frame->body); |
182 | |
183 | # dispatch |
184 | my $response; |
185 | $app->handle_request($req, \$response); |
186 | |
187 | # reply, if header set |
188 | if (my $reply_to = $response->headers->header('X-Reply-Address')) { |
189 | my $reply_queue = '/remote-temp-queue/' . $reply_to; |
190 | $self->connection->send({ destination => $reply_queue, body => $response->content }); |
191 | } |
192 | |
193 | # ack the message off the queue now we've replied / processed |
194 | $self->connection->ack( { frame => $frame } ); |
0a663589 |
195 | } |
0a663589 |
196 | =head2 handle_stomp_error |
197 | |
d78bb739 |
198 | Log any Stomp error frames we receive. |
0a663589 |
199 | |
200 | =cut |
201 | |
202 | sub handle_stomp_error { |
03c90167 |
203 | my ($self, $app, $frame) = @_; |
5ec5e0b1 |
204 | |
03c90167 |
205 | my $error = $frame->headers->{message}; |
206 | $app->log->debug("Got Stomp error: $error"); |
0a663589 |
207 | } |
208 | |
d78bb739 |
209 | __PACKAGE__->meta->make_immutable; |
210 | |
d7c364aa |
211 | =head1 CONFIGURATION |
212 | |
213 | =head2 subscribe_header |
214 | |
215 | Add additional header key/value pairs to the subscribe message sent to the |
216 | message broker. |
217 | |
218 | =cut |
491ffbb3 |
219 | |
220 | =head1 DEVELOPMENT |
221 | |
222 | The source to Catalyst::Engine::Stomp is in github: |
223 | |
224 | http://github.com/chrisa/catalyst-engine-stomp |
225 | |
226 | =head1 AUTHOR |
227 | |
228 | Chris Andrews C<< <chris@nodnol.org> >> |
229 | |
230 | =head1 CONTRIBUTORS |
231 | |
232 | Tomas Doran (t0m) C<< <bobtfish@bobtfish.net> >> |
233 | |
234 | Jason Tang |
235 | |
236 | =head1 LICENCE AND COPYRIGHT |
237 | |
238 | Copyright (C) 2009 Venda Ltd |
239 | |
240 | This library is free software; you can redistribute it and/or modify |
241 | it under the same terms as Perl itself, either Perl version 5.8.8 or, |
242 | at your option, any later version of Perl 5 you may have available. |
243 | |
244 | =cut |
245 | |