Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
0a663589 |
3 | use List::MoreUtils qw/ uniq /; |
4 | use HTTP::Request; |
5 | use Net::Stomp; |
03c90167 |
6 | use namespace::autoclean; |
7 | |
8 | extends 'Catalyst::Engine::Embeddable'; |
9 | |
f20933b7 |
10 | our $VERSION = '0.06'; |
0a663589 |
11 | |
12 | has connection => (is => 'rw', isa => 'Net::Stomp'); |
13 | has conn_desc => (is => 'rw', isa => 'Str'); |
14 | |
15 | =head1 NAME |
16 | |
17 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
18 | |
19 | =head1 SYNOPSIS |
20 | |
21 | # In a server script: |
22 | |
23 | BEGIN { |
24 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
25 | require Catalyst::Engine::Stomp; |
5ec5e0b1 |
26 | } |
0a663589 |
27 | |
28 | MyApp->config->{Engine::Stomp} = |
29 | { |
d7c364aa |
30 | hostname => '127.0.0.1', |
31 | port => 61613, |
32 | subscribe_header => { |
33 | transformation => 'jms-to-json', |
34 | } |
0a663589 |
35 | }; |
36 | MyApp->run(); |
37 | |
38 | # In a controller, or controller base class: |
d78bb739 |
39 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
40 | |
d78bb739 |
41 | # then create actions, which map as message types |
42 | sub testaction : Local { |
43 | my ($self, $c) = @_; |
0a663589 |
44 | |
d78bb739 |
45 | # Reply with a minimal response message |
46 | my $response = { type => 'testaction_response' }; |
47 | $c->stash->{response} = $response; |
48 | } |
49 | |
0a663589 |
50 | =head1 DESCRIPTION |
51 | |
52 | Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You |
5ec5e0b1 |
53 | need a controller that understands messaging, as well as this engine. |
0a663589 |
54 | |
55 | This is single-threaded and single process - you need to run multiple |
56 | instances of this engine to get concurrency, and configure your broker |
57 | to load-balance across multiple consumers of the same queue. |
58 | |
d78bb739 |
59 | Controllers are mapped to Stomp queues, and a controller base class is |
60 | provided, Catalyst::Controller::MessageDriven, which implements |
5ec5e0b1 |
61 | YAML-serialized messages, mapping a top-level YAML "type" key to |
62 | the action. |
d78bb739 |
63 | |
0a663589 |
64 | =head1 METHODS |
65 | |
66 | =head2 run |
67 | |
68 | App entry point. Starts a loop listening for messages. |
69 | |
70 | =cut |
71 | |
72 | sub run { |
73 | my ($self, $app, $oneshot) = @_; |
74 | |
75 | die 'No Engine::Stomp configuration found' |
bf8937b7 |
76 | unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; |
0a663589 |
77 | |
b9aa4861 |
78 | my @queues = grep { length $_ } |
ccd9e8fa |
79 | map { $app->controller($_)->action_namespace } $app->controllers; |
0a663589 |
80 | |
bf8937b7 |
81 | # connect up |
0a663589 |
82 | my %template = %{$app->config->{'Engine::Stomp'}}; |
d2057f6f |
83 | my $add_header = delete $template{subscribe_header}; |
84 | if (ref($add_header) ne 'HASH') { |
85 | $add_header = undef; |
86 | } |
bf8937b7 |
87 | $self->connection(Net::Stomp->new(\%template)); |
88 | $self->connection->connect(); |
89 | $self->conn_desc($template{hostname}.':'.$template{port}); |
0a663589 |
90 | |
bf8937b7 |
91 | # subscribe, with client ack. |
0a663589 |
92 | foreach my $queue (@queues) { |
bf8937b7 |
93 | my $queue_name = "/queue/$queue"; |
d2057f6f |
94 | my $header_hash = { |
95 | destination => $queue_name, |
96 | ack => 'client', |
97 | }; |
98 | |
99 | # add the additional headers - yes I know it overwrites but |
100 | # thats the dev's problem? |
101 | if (keys %{$add_header}) { |
102 | foreach my $key (keys %{$add_header}) { |
103 | $header_hash->{$key} = $add_header->{$key}; |
104 | } |
105 | } |
106 | |
107 | $self->connection->subscribe($header_hash); |
0a663589 |
108 | } |
109 | |
bf8937b7 |
110 | # enter loop... |
111 | while (1) { |
112 | my $frame = $self->connection->receive_frame(); |
113 | $self->handle_stomp_frame($app, $frame); |
114 | last if $ENV{ENGINE_ONESHOT}; |
115 | } |
116 | exit 0; |
0a663589 |
117 | } |
118 | |
119 | =head2 prepare_request |
120 | |
121 | Overridden to add the source broker to the request, in place of the |
122 | client IP address. |
123 | |
124 | =cut |
125 | |
126 | sub prepare_request { |
03c90167 |
127 | my ($self, $c, $req, $res_ref) = @_; |
128 | shift @_; |
129 | $self->next::method(@_); |
130 | $c->req->address($self->conn_desc); |
0a663589 |
131 | } |
132 | |
133 | =head2 finalize_headers |
134 | |
d78bb739 |
135 | Overridden to dump out any errors encountered, since you won't get a |
136 | "debugging" message as for HTTP. |
0a663589 |
137 | |
138 | =cut |
139 | |
140 | sub finalize_headers { |
03c90167 |
141 | my ($self, $c) = @_; |
142 | my $error = join "\n", @{$c->error}; |
143 | if ($error) { |
144 | $c->log->debug($error); |
145 | } |
146 | return $self->next::method($c); |
0a663589 |
147 | } |
148 | |
149 | =head2 handle_stomp_frame |
150 | |
d78bb739 |
151 | Dispatch according to Stomp frame type. |
0a663589 |
152 | |
153 | =cut |
154 | |
155 | sub handle_stomp_frame { |
03c90167 |
156 | my ($self, $app, $frame) = @_; |
157 | |
158 | my $command = $frame->command(); |
159 | if ($command eq 'MESSAGE') { |
160 | $self->handle_stomp_message($app, $frame); |
161 | } |
162 | elsif ($command eq 'ERROR') { |
163 | $self->handle_stomp_error($app, $frame); |
164 | } |
165 | else { |
166 | $app->log->debug("Got unknown Stomp command: $command"); |
167 | } |
0a663589 |
168 | } |
169 | |
170 | =head2 handle_stomp_message |
171 | |
d78bb739 |
172 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
173 | |
174 | =cut |
175 | |
176 | sub handle_stomp_message { |
03c90167 |
177 | my ($self, $app, $frame) = @_; |
178 | |
179 | # queue -> controller |
180 | my $queue = $frame->headers->{destination}; |
181 | my ($controller) = $queue =~ m|^/queue/(.*)$|; |
182 | |
183 | # set up request |
184 | my $config = $app->config->{'Engine::Stomp'}; |
185 | my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; |
186 | my $req = HTTP::Request->new(POST => $url); |
187 | $req->content($frame->body); |
188 | $req->content_length(length $frame->body); |
189 | |
190 | # dispatch |
191 | my $response; |
192 | $app->handle_request($req, \$response); |
193 | |
194 | # reply, if header set |
195 | if (my $reply_to = $response->headers->header('X-Reply-Address')) { |
196 | my $reply_queue = '/remote-temp-queue/' . $reply_to; |
197 | $self->connection->send({ destination => $reply_queue, body => $response->content }); |
198 | } |
199 | |
200 | # ack the message off the queue now we've replied / processed |
201 | $self->connection->ack( { frame => $frame } ); |
0a663589 |
202 | } |
0a663589 |
203 | =head2 handle_stomp_error |
204 | |
d78bb739 |
205 | Log any Stomp error frames we receive. |
0a663589 |
206 | |
207 | =cut |
208 | |
209 | sub handle_stomp_error { |
03c90167 |
210 | my ($self, $app, $frame) = @_; |
5ec5e0b1 |
211 | |
03c90167 |
212 | my $error = $frame->headers->{message}; |
213 | $app->log->debug("Got Stomp error: $error"); |
0a663589 |
214 | } |
215 | |
d78bb739 |
216 | __PACKAGE__->meta->make_immutable; |
217 | |
d7c364aa |
218 | =head1 CONFIGURATION |
219 | |
220 | =head2 subscribe_header |
221 | |
222 | Add additional header key/value pairs to the subscribe message sent to the |
223 | message broker. |
224 | |
225 | =cut |