Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
3 | extends 'Catalyst::Engine::Embeddable'; |
4 | |
0ee4d565 |
5 | our $VERSION = '0.04'; |
0a663589 |
6 | |
7 | use List::MoreUtils qw/ uniq /; |
8 | use HTTP::Request; |
9 | use Net::Stomp; |
10 | |
# Live Net::Stomp connection to the broker; set by run().
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# "hostname:port" description of the broker; set by run() and used by
# prepare_request() as the request address.
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
13 | |
14 | =head1 NAME |
15 | |
16 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
17 | |
18 | =head1 SYNOPSIS |
19 | |
20 | # In a server script: |
21 | |
22 | BEGIN { |
23 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
24 | require Catalyst::Engine::Stomp; |
25 | } |
26 | |
27 | MyApp->config->{'Engine::Stomp'} = |
28 | { |
29 | hostname => '127.0.0.1', |
30 | port => 61613, |
31 | }; |
32 | MyApp->run(); |
33 | |
34 | # In a controller, or controller base class: |
d78bb739 |
35 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
36 | |
d78bb739 |
37 | # then create actions, which map as message types |
38 | sub testaction : Local { |
39 | my ($self, $c) = @_; |
0a663589 |
40 | |
d78bb739 |
41 | # Reply with a minimal response message |
42 | my $response = { type => 'testaction_response' }; |
43 | $c->stash->{response} = $response; |
44 | } |
45 | |
0a663589 |
46 | =head1 DESCRIPTION |
47 | |
48 | Write a Catalyst app that is driven by a Stomp message broker rather than |
49 | by HTTP. You need a controller that understands messaging, as well as this engine. |
50 | |
51 | This is single-threaded and single process - you need to run multiple |
52 | instances of this engine to get concurrency, and configure your broker |
53 | to load-balance across multiple consumers of the same queue. |
54 | |
d78bb739 |
55 | Controllers are mapped to Stomp queues, and a controller base class is |
56 | provided, Catalyst::Controller::MessageDriven, which implements |
57 | YAML-serialized messages, mapping a top-level YAML "type" key to |
58 | the action. |
59 | |
0a663589 |
60 | =head1 METHODS |
61 | |
62 | =head2 run |
63 | |
64 | App entry point. Starts a loop listening for messages. |
65 | |
66 | =cut |
67 | |
# Engine entry point: connect to the Stomp broker, subscribe to one queue per
# controller namespace, then process incoming frames.
#
# Parameters:
#   $app     - the Catalyst application; its config must contain an
#              'Engine::Stomp' hash, which is handed to Net::Stomp->new and
#              whose hostname/port are used to describe the connection.
#   $oneshot - optional; when true, handle a single frame and then stop.
#              Setting $ENV{ENGINE_ONESHOT} has the same effect.
#
# Dies if no 'Engine::Stomp' hash is configured.
# Does not return: the process exits with status 0 once the loop ends.
sub run {
    my ($self, $app, $oneshot) = @_;

    my $config = $app->config->{'Engine::Stomp'};
    die 'No Engine::Stomp configuration found'
        unless ref $config eq 'HASH';

    # List the path namespaces that will be mapped as queues.
    #
    # This is known to use the deprecated Dispatcher->action_hash() method,
    # but there doesn't appear to be another way to get the relevant strings
    # out.
    #
    # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
    my @queues = uniq(
        grep { length $_ }
        map  { $_->namespace }
        values %{ $app->dispatcher->action_hash }
    );

    # Connect, handing Net::Stomp a shallow copy of the settings rather than
    # the application's own config hash.
    my %template = %{$config};
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname} . ':' . $template{port});

    # Subscribe with client ack, so a message is only removed from its queue
    # once handle_stomp_message() has acknowledged it.
    foreach my $queue (@queues) {
        $self->connection->subscribe({
            destination => "/queue/$queue",
            ack         => 'client',
        });
    }

    # Main loop. One-shot mode can be requested either by the caller or
    # through the environment; both are checked after every frame.
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $oneshot || $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
111 | |
112 | =head2 prepare_request |
113 | |
114 | Overridden to add the source broker to the request, in place of the |
115 | client IP address. |
116 | |
117 | =cut |
118 | |
# Let the parent engine prepare the request, then store the broker
# description (conn_desc) as the request address.
#
# Every argument after the invocant is forwarded to the parent
# implementation; the first of them is the Catalyst context.
sub prepare_request {
    my ($self, @parent_args) = @_;

    $self->next::method(@parent_args);

    my $context = $parent_args[0];
    return $context->req->address($self->conn_desc);
}
125 | |
126 | =head2 finalize_headers |
127 | |
d78bb739 |
128 | Overridden to log any errors encountered at debug level, since you won't |
129 | get a "debugging" error page as you would over HTTP. |
0a663589 |
130 | |
131 | =cut |
132 | |
# Write any errors collected on the context to the debug log, one per line,
# then hand over to the parent engine's finalize_headers and return its
# result.
sub finalize_headers {
    my ($self, $c) = @_;

    my $collected = join "\n", @{ $c->error };
    $c->log->debug($collected) if $collected;

    return $self->next::method($c);
}
141 | |
142 | =head2 handle_stomp_frame |
143 | |
d78bb739 |
144 | Dispatch according to Stomp frame type. |
0a663589 |
145 | |
146 | =cut |
147 | |
# Route a received frame by its Stomp command:
#   MESSAGE -> handle_stomp_message
#   ERROR   -> handle_stomp_error
# Any other command is only reported through the application's debug log.
sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    return $self->handle_stomp_message($app, $frame)
        if $command eq 'MESSAGE';

    return $self->handle_stomp_error($app, $frame)
        if $command eq 'ERROR';

    return $app->log->debug("Got unknown Stomp command: $command");
}
162 | |
163 | =head2 handle_stomp_message |
164 | |
d78bb739 |
165 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
166 | |
167 | =cut |
168 | |
# Feed one MESSAGE frame through the Catalyst app as a POST request.
#
# The text after "/queue/" in the frame's destination header becomes the
# request path, and the frame body becomes the request content. If the
# response carries an X-Reply-Address header, the response content is sent
# to the matching /remote-temp-queue/ destination. The frame is acknowledged
# last, after dispatch and any reply.
sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # queue -> controller path
    my $destination  = $frame->headers->{destination};
    my ($controller) = $destination =~ m!^/queue/(.*)$!;

    # Build the request that stands in for an HTTP request.
    my $broker  = $app->config->{'Engine::Stomp'};
    my $uri     = "stomp://$broker->{hostname}:$broker->{port}/$controller";
    my $request = HTTP::Request->new(POST => $uri);
    my $payload = $frame->body;
    $request->content($payload);
    $request->content_length(length $payload);

    # Dispatch; the app fills $reply in through the reference.
    my $reply;
    $app->handle_request($request, \$reply);

    # Send a reply only when the response names a reply address.
    my $reply_to = $reply->headers->header('X-Reply-Address');
    if ($reply_to) {
        $self->connection->send({
            destination => '/remote-temp-queue/' . $reply_to,
            body        => $reply->content,
        });
    }

    # Ack the message off the queue now we've replied / processed.
    return $self->connection->ack({ frame => $frame });
}
196 | |
197 | =head2 handle_stomp_error |
198 | |
d78bb739 |
199 | Log any Stomp error frames we receive. |
0a663589 |
200 | |
201 | =cut |
202 | |
# Report a Stomp ERROR frame: the frame's "message" header is written to
# the application's debug log. Nothing else is done with the frame.
sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    my $headers = $frame->headers;
    return $app->log->debug('Got Stomp error: ' . $headers->{message});
}
209 | |
d78bb739 |
# Finalise the Moose metaclass for this package.
__PACKAGE__->meta->make_immutable();

# A module file must end with a true value.
1;
213 | |