Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
0a663589 |
3 | use List::MoreUtils qw/ uniq /; |
4 | use HTTP::Request; |
5 | use Net::Stomp; |
03c90167 |
6 | use namespace::autoclean; |
7 | |
8 | extends 'Catalyst::Engine::Embeddable'; |
9 | |
f20933b7 |
10 | our $VERSION = '0.06'; |
0a663589 |
11 | |
# Net::Stomp connection to the broker; assigned in run().
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# "hostname:port" string describing the broker; assigned in run() and
# reported as the request address in prepare_request().
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
14 | |
15 | =head1 NAME |
16 | |
17 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
18 | |
19 | =head1 SYNOPSIS |
20 | |
21 | # In a server script: |
22 | |
23 | BEGIN { |
24 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
25 | require Catalyst::Engine::Stomp; |
5ec5e0b1 |
26 | } |
0a663589 |
27 | |
28 | MyApp->config->{'Engine::Stomp'} = |
29 | { |
30 | hostname => '127.0.0.1', |
31 | port => 61613, |
32 | }; |
33 | MyApp->run(); |
34 | |
35 | # In a controller, or controller base class: |
d78bb739 |
36 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
37 | |
d78bb739 |
38 | # then create actions, which map as message types |
39 | sub testaction : Local { |
40 | my ($self, $c) = @_; |
0a663589 |
41 | |
d78bb739 |
42 | # Reply with a minimal response message |
43 | my $response = { type => 'testaction_response' }; |
44 | $c->stash->{response} = $response; |
45 | } |
46 | |
0a663589 |
47 | =head1 DESCRIPTION |
48 | |
49 | Write a Catalyst app connected to a Stomp message broker, not HTTP. You |
5ec5e0b1 |
50 | need a controller that understands messaging, as well as this engine. |
0a663589 |
51 | |
52 | This is single-threaded and single process - you need to run multiple |
53 | instances of this engine to get concurrency, and configure your broker |
54 | to load-balance across multiple consumers of the same queue. |
55 | |
d78bb739 |
56 | Controllers are mapped to Stomp queues, and a controller base class is |
57 | provided, Catalyst::Controller::MessageDriven, which implements |
5ec5e0b1 |
58 | YAML-serialized messages, mapping a top-level YAML "type" key to |
59 | the action. |
d78bb739 |
60 | |
0a663589 |
61 | =head1 METHODS |
62 | |
63 | =head2 run |
64 | |
65 | App entry point. Starts a loop listening for messages. |
66 | |
67 | =cut |
68 | |
sub run {
    # Any further arguments handed to MyApp->run() are ignored; set
    # $ENV{ENGINE_ONESHOT} to stop after a single frame.
    my ($self, $app) = @_;

    die 'No Engine::Stomp configuration found'
        unless ref $app->config->{'Engine::Stomp'} eq 'HASH';

    # One queue per controller namespace. Controllers with an empty
    # namespace are skipped, and a namespace shared by several controllers
    # is listed only once so the same queue is not subscribed to repeatedly.
    my @queues = uniq(
        grep { length $_ }
        map  { $app->controller($_)->action_namespace } $app->controllers
    );

    # Work on a copy of the configuration so the app's config is untouched.
    my %template = %{ $app->config->{'Engine::Stomp'} };
    my $subscribe_headers = $template{subscribe_headers} || {};
    die("subscribe_headers config for Engine::Stomp must be a hashref!\n")
        if (ref($subscribe_headers) ne 'HASH');

    # Connect to the broker and remember where we connected to, so that
    # prepare_request() can report it as the request address.
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname} . ':' . $template{port});

    # Subscribe with client acknowledgement. User-supplied headers come
    # first so that destination and ack always take precedence over them.
    foreach my $queue (@queues) {
        my $queue_name = "/queue/$queue";
        $self->connection->subscribe({
            %$subscribe_headers,
            destination => $queue_name,
            ack         => 'client',
        });
    }

    # Main loop: receive a frame, dispatch it, repeat. ENGINE_ONESHOT in
    # the environment ends the loop after the first frame.
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
106 | |
107 | =head2 prepare_request |
108 | |
109 | Overridden to add the source broker to the request, in place of the |
110 | client IP address. |
111 | |
112 | =cut |
113 | |
sub prepare_request {
    my ($self, @args) = @_;
    my ($c) = @args;    # the context is the first argument after $self

    # Let the parent engine build the request from the unchanged arguments.
    $self->next::method(@args);

    # There is no client IP address here; report the broker we are
    # connected to instead.
    $c->req->address($self->conn_desc);
}
120 | |
121 | =head2 finalize_headers |
122 | |
d78bb739 |
123 | Overridden to dump out any errors encountered, since you won't get a |
124 | "debugging" message as for HTTP. |
0a663589 |
125 | |
126 | =cut |
127 | |
sub finalize_headers {
    my ($self, $c) = @_;

    # Collapse all recorded errors into one newline-separated string and
    # write it to the debug log when there is anything to report.
    my $error_text = join "\n", @{ $c->error };
    $c->log->debug($error_text) if $error_text;

    return $self->next::method($c);
}
136 | |
137 | =head2 handle_stomp_frame |
138 | |
d78bb739 |
139 | Dispatch according to Stomp frame type. |
0a663589 |
140 | |
141 | =cut |
142 | |
sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    # MESSAGE and ERROR frames have dedicated handlers.
    return $self->handle_stomp_message($app, $frame) if $command eq 'MESSAGE';
    return $self->handle_stomp_error($app, $frame)   if $command eq 'ERROR';

    # Any other frame type is only logged.
    return $app->log->debug("Got unknown Stomp command: $command");
}
157 | |
158 | =head2 handle_stomp_message |
159 | |
d78bb739 |
160 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
161 | |
162 | =cut |
163 | |
sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # The part of the destination after "/queue/" becomes the request path.
    my $destination = $frame->headers->{destination};
    my ($controller_path) = $destination =~ m|^/queue/(.*)$|;

    # Wrap the frame body in a POST request addressed at the configured
    # broker host and port.
    my $stomp_config = $app->config->{'Engine::Stomp'};
    my $uri = "stomp://$stomp_config->{hostname}:$stomp_config->{port}/$controller_path";

    my $payload = $frame->body;
    my $request = HTTP::Request->new(POST => $uri);
    $request->content($payload);
    $request->content_length(length $payload);

    # Dispatch into the application; the response is handed back through
    # the scalar reference.
    my $response;
    $app->handle_request($request, \$response);

    # Send a reply only when the response carries an X-Reply-Address header.
    my $reply_to = $response->headers->header('X-Reply-Address');
    if ($reply_to) {
        $self->connection->send({
            destination => '/remote-temp-queue/' . $reply_to,
            body        => $response->content,
        });
    }

    # Acknowledge the message now that it has been processed (and any
    # reply has been sent).
    $self->connection->ack({ frame => $frame });
}
191 | |
192 | =head2 handle_stomp_error |
193 | |
d78bb739 |
194 | Log any Stomp error frames we receive. |
0a663589 |
195 | |
196 | =cut |
197 | |
sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    # The error text is carried in the frame's "message" header.
    my $stomp_error = $frame->headers->{message};

    return $app->log->debug("Got Stomp error: $stomp_error");
}
204 | |
d78bb739 |
# Make the Moose metaclass for this package immutable.
__PACKAGE__->meta->make_immutable();
206 | |