Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
0a663589 |
3 | use List::MoreUtils qw/ uniq /; |
4 | use HTTP::Request; |
5 | use Net::Stomp; |
03c90167 |
6 | use namespace::autoclean; |
7 | |
8 | extends 'Catalyst::Engine::Embeddable'; |
9 | |
f20933b7 |
10 | our $VERSION = '0.06'; |
0a663589 |
11 | |
12 | has connection => (is => 'rw', isa => 'Net::Stomp'); |
13 | has conn_desc => (is => 'rw', isa => 'Str'); |
14 | |
15 | =head1 NAME |
16 | |
17 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
18 | |
19 | =head1 SYNOPSIS |
20 | |
21 | # In a server script: |
22 | |
23 | BEGIN { |
24 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
25 | require Catalyst::Engine::Stomp; |
5ec5e0b1 |
26 | } |
0a663589 |
27 | |
28 | MyApp->config->{Engine::Stomp} = |
29 | { |
30 | hostname => '127.0.0.1', |
31 | port => 61613, |
32 | }; |
33 | MyApp->run(); |
34 | |
35 | # In a controller, or controller base class: |
d78bb739 |
36 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
37 | |
d78bb739 |
38 | # then create actions, which map as message types |
39 | sub testaction : Local { |
40 | my ($self, $c) = @_; |
0a663589 |
41 | |
d78bb739 |
42 | # Reply with a minimal response message |
43 | my $response = { type => 'testaction_response' }; |
44 | $c->stash->{response} = $response; |
45 | } |
46 | |
0a663589 |
47 | =head1 DESCRIPTION |
48 | |
49 | Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You |
5ec5e0b1 |
50 | need a controller that understands messaging, as well as this engine. |
0a663589 |
51 | |
52 | This is single-threaded and single process - you need to run multiple |
53 | instances of this engine to get concurrency, and configure your broker |
54 | to load-balance across multiple consumers of the same queue. |
55 | |
d78bb739 |
56 | Controllers are mapped to Stomp queues, and a controller base class is |
57 | provided, Catalyst::Controller::MessageDriven, which implements |
5ec5e0b1 |
58 | YAML-serialized messages, mapping a top-level YAML "type" key to |
59 | the action. |
d78bb739 |
60 | |
0a663589 |
61 | =head1 METHODS |
62 | |
63 | =head2 run |
64 | |
65 | App entry point. Starts a loop listening for messages. |
66 | |
67 | =cut |
68 | |
69 | sub run { |
70 | my ($self, $app, $oneshot) = @_; |
71 | |
72 | die 'No Engine::Stomp configuration found' |
bf8937b7 |
73 | unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; |
0a663589 |
74 | |
b9aa4861 |
75 | my @queues = grep { length $_ } |
ccd9e8fa |
76 | map { $app->controller($_)->action_namespace } $app->controllers; |
0a663589 |
77 | |
bf8937b7 |
78 | # connect up |
0a663589 |
79 | my %template = %{$app->config->{'Engine::Stomp'}}; |
d2057f6f |
80 | my $add_header = delete $template{subscribe_header}; |
81 | if (ref($add_header) ne 'HASH') { |
82 | $add_header = undef; |
83 | } |
bf8937b7 |
84 | $self->connection(Net::Stomp->new(\%template)); |
85 | $self->connection->connect(); |
86 | $self->conn_desc($template{hostname}.':'.$template{port}); |
0a663589 |
87 | |
bf8937b7 |
88 | # subscribe, with client ack. |
0a663589 |
89 | foreach my $queue (@queues) { |
bf8937b7 |
90 | my $queue_name = "/queue/$queue"; |
d2057f6f |
91 | my $header_hash = { |
92 | destination => $queue_name, |
93 | ack => 'client', |
94 | }; |
95 | |
96 | # add the additional headers - yes I know it overwrites but |
97 | # thats the dev's problem? |
98 | if (keys %{$add_header}) { |
99 | foreach my $key (keys %{$add_header}) { |
100 | $header_hash->{$key} = $add_header->{$key}; |
101 | } |
102 | } |
103 | |
104 | $self->connection->subscribe($header_hash); |
0a663589 |
105 | } |
106 | |
bf8937b7 |
107 | # enter loop... |
108 | while (1) { |
109 | my $frame = $self->connection->receive_frame(); |
110 | $self->handle_stomp_frame($app, $frame); |
111 | last if $ENV{ENGINE_ONESHOT}; |
112 | } |
113 | exit 0; |
0a663589 |
114 | } |
115 | |
116 | =head2 prepare_request |
117 | |
118 | Overridden to add the source broker to the request, in place of the |
119 | client IP address. |
120 | |
121 | =cut |
122 | |
123 | sub prepare_request { |
03c90167 |
124 | my ($self, $c, $req, $res_ref) = @_; |
125 | shift @_; |
126 | $self->next::method(@_); |
127 | $c->req->address($self->conn_desc); |
0a663589 |
128 | } |
129 | |
130 | =head2 finalize_headers |
131 | |
d78bb739 |
132 | Overridden to dump out any errors encountered, since you won't get a |
133 | "debugging" message as for HTTP. |
0a663589 |
134 | |
135 | =cut |
136 | |
137 | sub finalize_headers { |
03c90167 |
138 | my ($self, $c) = @_; |
139 | my $error = join "\n", @{$c->error}; |
140 | if ($error) { |
141 | $c->log->debug($error); |
142 | } |
143 | return $self->next::method($c); |
0a663589 |
144 | } |
145 | |
146 | =head2 handle_stomp_frame |
147 | |
d78bb739 |
148 | Dispatch according to Stomp frame type. |
0a663589 |
149 | |
150 | =cut |
151 | |
152 | sub handle_stomp_frame { |
03c90167 |
153 | my ($self, $app, $frame) = @_; |
154 | |
155 | my $command = $frame->command(); |
156 | if ($command eq 'MESSAGE') { |
157 | $self->handle_stomp_message($app, $frame); |
158 | } |
159 | elsif ($command eq 'ERROR') { |
160 | $self->handle_stomp_error($app, $frame); |
161 | } |
162 | else { |
163 | $app->log->debug("Got unknown Stomp command: $command"); |
164 | } |
0a663589 |
165 | } |
166 | |
167 | =head2 handle_stomp_message |
168 | |
d78bb739 |
169 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
170 | |
171 | =cut |
172 | |
173 | sub handle_stomp_message { |
03c90167 |
174 | my ($self, $app, $frame) = @_; |
175 | |
176 | # queue -> controller |
177 | my $queue = $frame->headers->{destination}; |
178 | my ($controller) = $queue =~ m|^/queue/(.*)$|; |
179 | |
180 | # set up request |
181 | my $config = $app->config->{'Engine::Stomp'}; |
182 | my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; |
183 | my $req = HTTP::Request->new(POST => $url); |
184 | $req->content($frame->body); |
185 | $req->content_length(length $frame->body); |
186 | |
187 | # dispatch |
188 | my $response; |
189 | $app->handle_request($req, \$response); |
190 | |
191 | # reply, if header set |
192 | if (my $reply_to = $response->headers->header('X-Reply-Address')) { |
193 | my $reply_queue = '/remote-temp-queue/' . $reply_to; |
194 | $self->connection->send({ destination => $reply_queue, body => $response->content }); |
195 | } |
196 | |
197 | # ack the message off the queue now we've replied / processed |
198 | $self->connection->ack( { frame => $frame } ); |
0a663589 |
199 | } |
200 | |
201 | =head2 handle_stomp_error |
202 | |
d78bb739 |
203 | Log any Stomp error frames we receive. |
0a663589 |
204 | |
205 | =cut |
206 | |
207 | sub handle_stomp_error { |
03c90167 |
208 | my ($self, $app, $frame) = @_; |
5ec5e0b1 |
209 | |
03c90167 |
210 | my $error = $frame->headers->{message}; |
211 | $app->log->debug("Got Stomp error: $error"); |
0a663589 |
212 | } |
213 | |
d78bb739 |
214 | __PACKAGE__->meta->make_immutable; |
215 | |