1 package Catalyst::Engine::Stomp;
3 extends 'Catalyst::Engine::Embeddable';
7 use List::MoreUtils qw/ uniq /;
# Net::Stomp client object used for all broker traffic in this engine
# (connect, subscribe, receive_frame, send and ack are all called on it).
11 has connection => (is => 'rw', isa => 'Net::Stomp');
# Broker description string, set to "<hostname>:<port>" from the
# 'Engine::Stomp' configuration; prepare_request stores it as the
# request address.
12 has conn_desc => (is => 'rw', isa => 'Str');
16 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
23 $ENV{CATALYST_ENGINE} = 'Stomp';
24 require Catalyst::Engine::Stomp;
27 MyApp->config->{Engine::Stomp} =
29 hostname => '127.0.0.1',
34 # In a controller, or controller base class:
38 # configure YAML deserialization; requires Catalyst::Action::REST
40 'default' => 'text/x-yaml',
41 'stash_key' => 'rest',
42 'map' => { 'text/x-yaml' => 'YAML' },
45 sub begin :ActionClass('Deserialize') { }
47 # have a default action, which forwards to the correct action
48 # based on the message contents (the type).
49 sub default : Private {
52 my $action = $c->req->data->{type};
57 $c->engine->send_message($queue, Dump($msg));
61 Write a Catalyst app connected to a STOMP message broker, not HTTP. You
62 need a controller that understands messaging, as well as this engine.
64 This is single-threaded and single-process - you need to run multiple
65 instances of this engine to get concurrency, and configure your broker
66 to load-balance across multiple consumers of the same queue.
72 App entry point. Starts a loop listening for messages.
# Body of the engine entry point (the enclosing `sub` line is not part of
# this excerpt). Arguments: the engine object, the application, and a
# third value unpacked as $oneshot.
# NOTE(review): $oneshot is not referenced in the visible lines; the exit
# check further down reads $ENV{ENGINE_ONESHOT} instead. Confirm whether
# the argument is meant to be honoured as well.
77 my ($self, $app, $oneshot) = @_;
# Refuse to start unless the 'Engine::Stomp' config entry is a hash reference.
79 die 'No Engine::Stomp configuration found'
80 unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
82 # list the path namespaces that will be mapped as queues.
84 # this is known to use the deprecated
85 # Dispatcher->action_hash() method, but there doesn't appear
86 # to be another way to get the relevant strings out.
88 # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
94 values %{$app->dispatcher->action_hash};
# Copy the 'Engine::Stomp' config hash, pass a reference to the copy to
# Net::Stomp->new, store the resulting client and connect it.
97 my %template = %{$app->config->{'Engine::Stomp'}};
98 $self->connection(Net::Stomp->new(\%template));
99 $self->connection->connect();
# Remember "<hostname>:<port>" so prepare_request can report it as the
# request address.
100 $self->conn_desc($template{hostname}.':'.$template{port});
102 # subscribe, with client ack.
# One subscription per entry of @queues (built in lines not shown here);
# the destination is the entry prefixed with "/queue/".
103 foreach my $queue (@queues) {
104 my $queue_name = "/queue/$queue";
105 $self->connection->subscribe({
106 destination => $queue_name,
# Receive one frame from the broker and hand it to handle_stomp_frame.
# `last` leaves the enclosing loop (its header is not visible in this
# excerpt) when the ENGINE_ONESHOT environment variable is true.
113 my $frame = $self->connection->receive_frame();
114 $self->handle_stomp_frame($app, $frame);
115 last if $ENV{ENGINE_ONESHOT};
120 =head2 prepare_request
122 Overridden to add the source broker to the request, in place of the
# Calls the next prepare_request in the method resolution order, then sets
# the request's address to the broker description held in conn_desc.
# Arguments: engine object, context ($c), and two further values unpacked
# as $req and $res_ref, which are not used directly in the visible lines.
127 sub prepare_request {
128 my ($self, $c, $req, $res_ref) = @_;
# NOTE(review): in the visible lines the list assignment above does not
# remove $self from @_, so forwarding @_ here would pass the invocant a
# second time. Confirm an elided line shifts it off before this call.
130 $self->next::method(@_);
# Report the broker ("<hostname>:<port>") as the address of the request.
131 $c->req->address($self->conn_desc);
134 =head2 finalize_headers
136 Overridden to dump out any errors encountered.
# Joins every entry of $c->error with newlines, writes the result to the
# debug log, then returns whatever the next finalize_headers in the method
# resolution order returns.
# NOTE(review): the line that unpacks $self and $c, and any condition
# around the log call, are not visible in this excerpt.
140 sub finalize_headers {
142 my $error = join "\n", @{$c->error};
144 $c->log->debug($error);
# Only $c is forwarded to the next method.
146 return $self->next::method($c);
149 =head2 handle_stomp_frame
151 Dispatch according to STOMP frame type.
# Routes one received STOMP frame by its command string.
# Arguments: engine object, application, and the frame (an object that
# provides a command() method).
155 sub handle_stomp_frame {
156 my ($self, $app, $frame) = @_;
# Every frame's command is written to the debug log before dispatch.
158 my $command = $frame->command();
159 $app->log->debug("Got STOMP command: $command");
# MESSAGE frames go to handle_stomp_message.
161 if ($command eq 'MESSAGE') {
162 $self->handle_stomp_message($app, $frame);
# ERROR frames go to handle_stomp_error.
164 elsif ($command eq 'ERROR') {
165 $self->handle_stomp_error($app, $frame);
# Presumably the fallback branch for any other command; the branch
# keyword and the closing braces are not visible in this excerpt.
168 $app->log->debug("Got unknown STOMP command: $command");
172 =head2 handle_stomp_message
174 Dispatch a STOMP message into the Catalyst app.
# Turns one STOMP MESSAGE frame into a request against the application,
# sends the response body to a reply destination, then acknowledges the
# frame. Arguments: engine object, application, and the frame.
178 sub handle_stomp_message {
179 my ($self, $app, $frame) = @_;
181 # queue -> controller
# The part of the destination header after "/queue/" is used as the
# request path.
# NOTE(review): $controller is undef when the destination does not match
# this pattern; no check for that case is visible here.
182 my $queue = $frame->headers->{destination};
183 my ($controller) = $queue =~ m!^/queue/(.*)$!;
# Build a POST request for "stomp://<hostname>:<port>/<controller>" whose
# content is the frame body.
186 my $config = $app->config->{'Engine::Stomp'};
187 my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
188 my $req = HTTP::Request->new(POST => $url);
189 $req->content($frame->body);
190 $req->content_length(length $frame->body);
# The request is dispatched with a reference to $response, which the code
# below then reads ($response is declared on a line not shown here).
194 $app->handle_request($req, \$response);
# The reply destination is "/remote-temp-queue/" followed by the
# X-Reply-Address response header; the response content is the reply body.
# NOTE(review): no check for a missing X-Reply-Address header is visible;
# confirm one exists, otherwise a reply is sent to "/remote-temp-queue/".
197 my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
198 $app->log->debug("replying to $reply_queue\n");
199 $self->connection->send({ destination => $reply_queue, body => $response->content });
201 # ack the message off the queue now we've replied
202 $self->connection->ack( { frame => $frame } );
205 =head2 handle_stomp_error
207 Log any STOMP error frames we receive.
# Writes the 'message' header of a STOMP ERROR frame to the application's
# debug log. Arguments: engine object, application, and the frame.
211 sub handle_stomp_error {
212 my ($self, $app, $frame) = @_;
# NOTE(review): only the 'message' header is read here, not the frame
# body; if the broker omits that header, $error is undef when it is
# interpolated below.
214 my $error = $frame->headers->{message};
215 $app->log->debug("Got STOMP error: $error");