Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
3 | extends 'Catalyst::Engine::Embeddable'; |
4 | |
5 | our $VERSION = '0.01'; |
6 | |
7 | use List::MoreUtils qw/ uniq /; |
8 | use HTTP::Request; |
9 | use Net::Stomp; |
10 | |
11 | has connection => (is => 'rw', isa => 'Net::Stomp'); |
12 | has conn_desc => (is => 'rw', isa => 'Str'); |
13 | |
14 | =head1 NAME |
15 | |
16 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
17 | |
18 | =head1 SYNOPSIS |
19 | |
20 | # In a server script: |
21 | |
22 | BEGIN { |
23 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
24 | require Catalyst::Engine::Stomp; |
25 | } |
26 | |
27 | MyApp->config->{Engine::Stomp} = |
28 | { |
29 | hostname => '127.0.0.1', |
30 | port => 61613, |
31 | }; |
32 | MyApp->run(); |
33 | |
34 | # In a controller, or controller base class: |
35 | |
36 | use YAML; |
37 | |
38 | # configure YAML deserialization; requires Catalyst::Action::REST |
39 | __PACKAGE__->config( |
40 | 'default' => 'text/x-yaml', |
41 | 'stash_key' => 'rest', |
42 | 'map' => { 'text/x-yaml' => 'YAML' }, |
43 | ); |
44 | |
45 | sub begin :ActionClass('Deserialize') { } |
46 | |
47 | # have a default action, which forwards to the correct action |
48 | # based on the message contents (the type). |
49 | sub default : Private { |
50 | my ($self, $c) = @_; |
51 | |
52 | my $action = $c->req->data->{type}; |
53 | $c->forward($action); |
54 | } |
55 | |
56 | # Send messages back: |
57 | $c->engine->send_message($queue, Dump($msg)); |
58 | |
59 | =head1 DESCRIPTION |
60 | |
61 | Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You |
62 | need a controller that understands messaging, as well as this engine. |
63 | |
64 | This is single-threaded and single process - you need to run multiple |
65 | instances of this engine to get concurrency, and configure your broker |
66 | to load-balance across multiple consumers of the same queue. |
67 | |
68 | =head1 METHODS |
69 | |
70 | =head2 run |
71 | |
72 | App entry point. Starts a loop listening for messages. |
73 | |
74 | =cut |
75 | |
76 | sub run { |
77 | my ($self, $app, $oneshot) = @_; |
78 | |
79 | die 'No Engine::Stomp configuration found' |
80 | unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; |
81 | |
82 | # list the path namespaces that will be mapped as queues. |
83 | # |
84 | # this is known to use the deprecated |
85 | # Dispatcher->action_hash() method, but there doesn't appear |
86 | # to be another way to get the relevant strings out. |
87 | # |
88 | # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 |
89 | # |
90 | my @queues = |
91 | uniq |
92 | grep { length $_ } |
93 | map { $_->namespace } |
94 | values %{$app->dispatcher->action_hash}; |
95 | |
96 | # connect up |
97 | my %template = %{$app->config->{'Engine::Stomp'}}; |
98 | $self->connection(Net::Stomp->new(\%template)); |
99 | $self->connection->connect(); |
100 | $self->conn_desc($template{hostname}.':'.$template{port}); |
101 | |
102 | # subscribe, with client ack. |
103 | foreach my $queue (@queues) { |
104 | my $queue_name = "/queue/$queue"; |
105 | $self->connection->subscribe({ |
106 | destination => $queue_name, |
107 | ack => 'client' |
108 | }); |
109 | } |
110 | |
111 | # enter loop... |
112 | while (1) { |
113 | my $frame = $self->connection->receive_frame(); |
114 | $self->handle_stomp_frame($app, $frame); |
115 | last if $ENV{ENGINE_ONESHOT}; |
116 | } |
117 | exit 0; |
118 | } |
119 | |
120 | =head2 prepare_request |
121 | |
122 | Overridden to add the source broker to the request, in place of the |
123 | client IP address. |
124 | |
125 | =cut |
126 | |
127 | sub prepare_request { |
128 | my ($self, $c, $req, $res_ref) = @_; |
129 | shift @_; |
130 | $self->next::method(@_); |
131 | $c->req->address($self->conn_desc); |
132 | } |
133 | |
134 | =head2 finalize_headers |
135 | |
136 | Overridden to dump out any errors encountered. |
137 | |
138 | =cut |
139 | |
140 | sub finalize_headers { |
141 | my ($self, $c) = @_; |
142 | my $error = join "\n", @{$c->error}; |
143 | if ($error) { |
144 | $c->log->debug($error); |
145 | } |
146 | return $self->next::method($c); |
147 | } |
148 | |
149 | =head2 handle_stomp_frame |
150 | |
151 | Dispatch according to STOMP frame type. |
152 | |
153 | =cut |
154 | |
155 | sub handle_stomp_frame { |
156 | my ($self, $app, $frame) = @_; |
157 | |
158 | my $command = $frame->command(); |
159 | $app->log->debug("Got STOMP command: $command"); |
160 | |
161 | if ($command eq 'MESSAGE') { |
162 | $self->handle_stomp_message($app, $frame); |
163 | } |
164 | elsif ($command eq 'ERROR') { |
165 | $self->handle_stomp_error($app, $frame); |
166 | } |
167 | else { |
168 | $app->log->debug("Got unknown STOMP command: $command"); |
169 | } |
170 | } |
171 | |
172 | =head2 handle_stomp_message |
173 | |
174 | Dispatch a STOMP message into the Catalyst app. |
175 | |
176 | =cut |
177 | |
178 | sub handle_stomp_message { |
179 | my ($self, $app, $frame) = @_; |
180 | |
181 | # queue -> controller |
182 | my $queue = $frame->headers->{destination}; |
183 | my ($controller) = $queue =~ m!^/queue/(.*)$!; |
184 | |
185 | # set up request |
186 | my $config = $app->config->{'Engine::Stomp'}; |
187 | my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; |
188 | my $req = HTTP::Request->new(POST => $url); |
189 | $req->content($frame->body); |
190 | $req->content_length(length $frame->body); |
191 | |
192 | # dispatch |
193 | my $response; |
194 | $app->handle_request($req, \$response); |
195 | |
196 | # reply |
197 | my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); |
198 | $app->log->debug("replying to $reply_queue\n"); |
199 | $self->connection->send({ destination => $reply_queue, body => $response->content }); |
200 | |
201 | # ack the message off the queue now we've replied |
202 | $self->connection->ack( { frame => $frame } ); |
203 | } |
204 | |
205 | =head2 handle_stomp_error |
206 | |
207 | Log any STOMP error frames we receive. |
208 | |
209 | =cut |
210 | |
211 | sub handle_stomp_error { |
212 | my ($self, $app, $frame) = @_; |
213 | |
214 | my $error = $frame->headers->{message}; |
215 | $app->log->debug("Got STOMP error: $error"); |
216 | } |
217 | |
218 | 1; |
219 | |