Commit | Line | Data |
0a663589 |
1 | package Catalyst::Engine::Stomp; |
2 | use Moose; |
0a663589 |
3 | use List::MoreUtils qw/ uniq /; |
4 | use HTTP::Request; |
5 | use Net::Stomp; |
03c90167 |
6 | use namespace::autoclean; |
7 | |
8 | extends 'Catalyst::Engine::Embeddable'; |
9 | |
f20933b7 |
10 | our $VERSION = '0.06'; |
0a663589 |
11 | |
# The Net::Stomp client object; assigned by run() once the broker
# settings have been read from the application config.
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# "hostname:port" label for the broker; assigned by run() and used by
# prepare_request() as the request address.
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
14 | |
15 | =head1 NAME |
16 | |
17 | Catalyst::Engine::Stomp - write message handling apps with Catalyst. |
18 | |
19 | =head1 SYNOPSIS |
20 | |
21 | # In a server script: |
22 | |
23 | BEGIN { |
24 | $ENV{CATALYST_ENGINE} = 'Stomp'; |
25 | require Catalyst::Engine::Stomp; |
5ec5e0b1 |
26 | } |
0a663589 |
27 | |
28 | MyApp->config->{'Engine::Stomp'} = |
29 | { |
30 | hostname => '127.0.0.1', |
31 | port => 61613, |
32 | }; |
33 | MyApp->run(); |
34 | |
35 | # In a controller, or controller base class: |
d78bb739 |
36 | use base qw/ Catalyst::Controller::MessageDriven /; |
0a663589 |
37 | |
d78bb739 |
38 | # then create actions, which map as message types |
39 | sub testaction : Local { |
40 | my ($self, $c) = @_; |
0a663589 |
41 | |
d78bb739 |
42 | # Reply with a minimal response message |
43 | my $response = { type => 'testaction_response' }; |
44 | $c->stash->{response} = $response; |
45 | } |
46 | |
0a663589 |
47 | =head1 DESCRIPTION |
48 | |
49 | Write a Catalyst app connected to a Stomp message broker, not HTTP. You |
5ec5e0b1 |
50 | need a controller that understands messaging, as well as this engine. |
0a663589 |
51 | |
52 | This is single-threaded and single process - you need to run multiple |
53 | instances of this engine to get concurrency, and configure your broker |
54 | to load-balance across multiple consumers of the same queue. |
55 | |
d78bb739 |
56 | Controllers are mapped to Stomp queues, and a controller base class is |
57 | provided, Catalyst::Controller::MessageDriven, which implements |
5ec5e0b1 |
58 | YAML-serialized messages, mapping a top-level YAML "type" key to |
59 | the action. |
d78bb739 |
60 | |
0a663589 |
61 | =head1 METHODS |
62 | |
63 | =head2 run |
64 | |
65 | App entry point. Starts a loop listening for messages. |
66 | |
67 | =cut |
68 | |
# Application entry point: connect to the broker described by the
# application's 'Engine::Stomp' config section, subscribe to one queue
# per controller namespace, then process incoming frames in a loop.
#
# Arguments:
#   $self - the engine instance
#   $app  - the Catalyst application being served
# Any further positional arguments are accepted but not consulted.
#
# Dies if the 'Engine::Stomp' config entry is not a hash reference.
# Does not return normally: the loop only ends when $ENV{ENGINE_ONESHOT}
# is true, after which the process exits with status 0.
sub run {
    my ($self, $app) = @_;

    my $config = $app->config->{'Engine::Stomp'};
    die 'No Engine::Stomp configuration found'
        unless ref $config eq 'HASH';

    # One queue per distinct, non-empty controller namespace. uniq guards
    # against several controllers sharing a namespace, which would
    # otherwise subscribe to the same destination more than once.
    my @queues = uniq(
        grep { length $_ }
        map  { $app->controller($_)->action_namespace }
        $app->controllers
    );

    # connect up, handing Net::Stomp a copy of the settings rather than
    # the application's own config hash
    my %settings = %{$config};
    $self->connection(Net::Stomp->new(\%settings));
    $self->connection->connect();
    $self->conn_desc($settings{hostname} . ':' . $settings{port});

    # subscribe, with client ack: handle_stomp_message acks each message
    # explicitly once it has been dispatched.
    for my $queue (@queues) {
        $self->connection->subscribe({
            destination => "/queue/$queue",
            ack         => 'client',
        });
    }

    # enter loop; ENGINE_ONESHOT stops it after a single frame
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
101 | |
102 | =head2 prepare_request |
103 | |
104 | Overridden to add the source broker to the request, in place of the |
105 | client IP address. |
106 | |
107 | =cut |
108 | |
# Run the parent engine's prepare_request with the same arguments, then
# record the broker description (conn_desc) as the request address.
#
# Arguments: $self, the context $c, followed by whatever the parent
# implementation expects (passed through untouched).
sub prepare_request {
    my $self = shift;
    my ($c) = @_;

    # @_ no longer contains $self, so it can be forwarded as-is.
    $self->next::method(@_);

    return $c->req->address($self->conn_desc);
}
115 | |
116 | =head2 finalize_headers |
117 | |
d78bb739 |
118 | Overridden to dump out any errors encountered, since you won't get a |
119 | "debugging" message as for HTTP. |
0a663589 |
120 | |
121 | =cut |
122 | |
# Write any errors collected on the context to the debug log (joined
# with newlines), then defer to the parent implementation and return
# its result.
sub finalize_headers {
    my ($self, $c) = @_;

    my $report = join "\n", @{ $c->error };
    $c->log->debug($report) if $report;

    return $self->next::method($c);
}
131 | |
132 | =head2 handle_stomp_frame |
133 | |
d78bb739 |
134 | Dispatch according to Stomp frame type. |
0a663589 |
135 | |
136 | =cut |
137 | |
# Route a received frame by its Stomp command:
#   MESSAGE -> handle_stomp_message
#   ERROR   -> handle_stomp_error
# Any other command is only noted in the application's debug log.
#
# Arguments: $self, the application $app, and the received $frame.
sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    return $self->handle_stomp_message($app, $frame)
        if $command eq 'MESSAGE';

    return $self->handle_stomp_error($app, $frame)
        if $command eq 'ERROR';

    return $app->log->debug("Got unknown Stomp command: $command");
}
152 | |
153 | =head2 handle_stomp_message |
154 | |
d78bb739 |
155 | Dispatch a Stomp message into the Catalyst app. |
0a663589 |
156 | |
157 | =cut |
158 | |
# Turn a Stomp MESSAGE frame into a request against the Catalyst app.
#
# The part of the frame's destination after "/queue/" becomes the path
# of a POST request whose content is the frame body. If the app's
# response carries an X-Reply-Address header, the response content is
# sent to the matching /remote-temp-queue/ destination. The frame is
# acknowledged last, after dispatch and any reply.
#
# Arguments: $self, the application $app, and the received $frame.
sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # queue -> controller path
    my $destination = $frame->headers->{destination};
    my ($path) = $destination =~ m|^/queue/(.*)$|;

    # build the request
    my $stomp_config = $app->config->{'Engine::Stomp'};
    my $uri = join '',
        'stomp://', $stomp_config->{hostname},
        ':',        $stomp_config->{port},
        '/',        $path;

    my $payload = $frame->body;
    my $request = HTTP::Request->new(POST => $uri);
    $request->content($payload);
    $request->content_length(length $payload);

    # dispatch; the app fills in $response through the reference
    my $response;
    $app->handle_request($request, \$response);

    # reply only when the app asked for one
    my $reply_to = $response->headers->header('X-Reply-Address');
    if ($reply_to) {
        $self->connection->send({
            destination => '/remote-temp-queue/' . $reply_to,
            body        => $response->content,
        });
    }

    # ack the message off the queue now that it has been processed
    return $self->connection->ack({ frame => $frame });
}
186 | |
187 | =head2 handle_stomp_error |
188 | |
d78bb739 |
189 | Log any Stomp error frames we receive. |
0a663589 |
190 | |
191 | =cut |
192 | |
# Record a Stomp ERROR frame in the application's debug log, using the
# frame's "message" header as the error text.
#
# Arguments: $self, the application $app, and the received $frame.
sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    my $message = $frame->headers->{message};

    return $app->log->debug('Got Stomp error: ' . $message);
}
199 | |
d78bb739 |
# Make this package's Moose metaclass immutable now that the attributes
# and methods above have been declared.
__PACKAGE__->meta->make_immutable;
201 | |