1 package Catalyst::Engine::Stomp;
3 use List::MoreUtils qw/ uniq /;
6 use MooseX::Types::Moose qw/Str Int HashRef/;
7 use namespace::autoclean;
9 extends 'Catalyst::Engine::Embeddable';
11 our $VERSION = '0.06';
# Broker connection object (must be a Net::Stomp instance). The engine's
# entry-point code assigns it after constructing the client from the
# 'Engine::Stomp' application config.
13 has connection => (is => 'rw', isa => 'Net::Stomp');
# "hostname:port" description of the broker, built from the same config;
# prepare_request() reports it as the address of each request.
14 has conn_desc => (is => 'rw', isa => Str);
18 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
25 $ENV{CATALYST_ENGINE} = 'Stomp';
26 require Catalyst::Engine::Stomp;
31 hostname => '127.0.0.1',
34 transformation => 'jms-to-json',
40 # In a controller, or controller base class:
41 use base qw/ Catalyst::Controller::MessageDriven /;
43 # then create actions, which map as message types
44 sub testaction : Local {
47 # Reply with a minimal response message
48 my $response = { type => 'testaction_response' };
49 $c->stash->{response} = $response;
54 Write a Catalyst app connected to a Stomp message broker, not HTTP. You
55 need a controller that understands messaging, as well as this engine.
57 This is single-threaded and single process - you need to run multiple
58 instances of this engine to get concurrency, and configure your broker
59 to load-balance across multiple consumers of the same queue.
61 Controllers are mapped to Stomp queues, and a controller base class is
62 provided, Catalyst::Controller::MessageDriven, which implements
63 YAML-serialized messages, mapping a top-level YAML "type" key to
70 App entry point. Starts a loop listening for messages.
75 my ($self, $app, $oneshot) = @_;
# NOTE(review): $oneshot is unpacked above but is not referenced by any line
# visible in this excerpt; the one-shot exit at the bottom tests
# $ENV{ENGINE_ONESHOT} instead - confirm whether the argument should also be
# honoured.
# Refuse to start unless the application config carries an 'Engine::Stomp'
# hash.
77 die 'No Engine::Stomp configuration found'
78 unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
# One queue name per controller: each controller's action_namespace, with
# empty namespaces filtered out.
80 my @queues = grep { length $_ }
81 map { $app->controller($_)->action_namespace } $app->controllers;
# Work on a shallow copy of the engine config; a reference to this copy is
# what gets handed to Net::Stomp->new below.
84 my %template = %{$app->config->{'Engine::Stomp'}};
# Optional 'subscribe_headers' config entry: falls back to an empty hashref
# when unset or false, and must be a plain hash reference otherwise.
# Presumably merged into the subscribe call below - the lines that consume it
# are not visible in this excerpt; verify against the full file.
85 my $subscribe_headers = $template{subscribe_headers} || {};
86 die("subscribe_headers config for Engine::Stomp must be a hashref!\n")
87 if (ref($subscribe_headers) ne 'HASH');
# Create the client, connect, and remember "hostname:port" so that requests
# can later be tagged with the broker they came from.
89 $self->connection(Net::Stomp->new(\%template));
90 $self->connection->connect();
91 $self->conn_desc($template{hostname}.':'.$template{port});
93 # subscribe, with client ack.
# Each queue is subscribed under the destination "/queue/<action_namespace>".
94 foreach my $queue (@queues) {
95 my $queue_name = "/queue/$queue";
96 $self->connection->subscribe({
98 destination => $queue_name,
# Receive one frame from the broker and dispatch it by frame type. The
# enclosing loop construct is not visible in this excerpt.
105 my $frame = $self->connection->receive_frame();
106 $self->handle_stomp_frame($app, $frame);
# Leave the loop after the first frame when ENGINE_ONESHOT is set to a true
# value in the environment.
107 last if $ENV{ENGINE_ONESHOT};
112 =head2 prepare_request
114 Overridden to add the source broker to the request, in place of the
119 sub prepare_request {
# Runs the next prepare_request in the method resolution order, then sets the
# request address to this engine's broker description (conn_desc).
# $req and $res_ref are unpacked but not otherwise referenced in the lines
# visible here.
120 my ($self, $c, $req, $res_ref) = @_;
# NOTE(review): as visible in this excerpt, @_ still begins with $self when
# it is forwarded, which would hand the invocant to the next method twice
# unless a line not shown here shifts it off first - confirm against the full
# file.
122 $self->next::method(@_);
123 $c->req->address($self->conn_desc);
126 =head2 finalize_headers
128 Overridden to dump out any errors encountered, since you won't get a
129 "debugging" message as for HTTP.
133 sub finalize_headers {
# Joins every entry of $c->error into one newline-separated string, passes it
# to the debug log, and returns whatever the next finalize_headers in the
# method resolution order returns.
# NOTE(review): the argument unpacking and any condition guarding the debug
# call are not visible in this excerpt - check the full file before relying
# on when the log line is emitted.
135 my $error = join "\n", @{$c->error};
137 $c->log->debug($error);
139 return $self->next::method($c);
142 =head2 handle_stomp_frame
144 Dispatch according to Stomp frame type.
148 sub handle_stomp_frame {
# Routes a received frame according to its command string.
#   $app   - the application; used for logging and passed on to the handlers
#   $frame - the received frame object; must provide command()
149 my ($self, $app, $frame) = @_;
151 my $command = $frame->command();
# MESSAGE frames are dispatched into the application.
152 if ($command eq 'MESSAGE') {
153 $self->handle_stomp_message($app, $frame);
# ERROR frames are only logged.
155 elsif ($command eq 'ERROR') {
156 $self->handle_stomp_error($app, $frame);
# Debug-log the command name of an unrecognised frame (the opening line of
# this final branch is not visible in this excerpt).
159 $app->log->debug("Got unknown Stomp command: $command");
163 =head2 handle_stomp_message
165 Dispatch a Stomp message into the Catalyst app.
169 sub handle_stomp_message {
# Turns one MESSAGE frame into a POST request handled by the application,
# optionally sends the response content to a reply queue, and finally acks
# the frame with the broker.
#   $app   - the application that handles the synthetic request
#   $frame - the received frame; its headers and body are read here
170 my ($self, $app, $frame) = @_;
172 # queue -> controller
# The controller path is whatever follows "/queue/" in the destination
# header.
# NOTE(review): a destination that does not match leaves $controller
# undefined, and it is then concatenated into $url below - confirm that only
# /queue/ destinations can arrive here.
173 my $queue = $frame->headers->{destination};
174 my ($controller) = $queue =~ m|^/queue/(.*)$|;
# Build a synthetic request for stomp://<hostname>:<port>/<controller>, using
# the hostname and port from the 'Engine::Stomp' config and the frame body as
# the request content.
177 my $config = $app->config->{'Engine::Stomp'};
178 my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
179 my $req = HTTP::Request->new(POST => $url);
180 $req->content($frame->body);
# NOTE(review): length() counts characters if the body is a decoded string;
# whether this should be a byte count depends on how the body is delivered -
# confirm.
181 $req->content_length(length $frame->body);
# The response is handed back through the \$response reference; the
# declaration of $response is not visible in this excerpt.
185 $app->handle_request($req, \$response);
187 # reply, if header set
# The reply destination is "/remote-temp-queue/" followed by the value of the
# response's X-Reply-Address header; nothing is sent when that header is
# absent or false.
188 if (my $reply_to = $response->headers->header('X-Reply-Address')) {
189 my $reply_queue = '/remote-temp-queue/' . $reply_to;
190 $self->connection->send({ destination => $reply_queue, body => $response->content });
193 # ack the message off the queue now we've replied / processed
194 $self->connection->ack( { frame => $frame } );
196 =head2 handle_stomp_error
198 Log any Stomp error frames we receive.
202 sub handle_stomp_error {
# Writes the "message" header of an ERROR frame to the application's debug
# log. Only that header is read; the frame body is not logged.
#   $app   - the application whose logger is used
#   $frame - the received ERROR frame
203 my ($self, $app, $frame) = @_;
# NOTE(review): if the frame carries no "message" header, $error is undefined
# when interpolated below - confirm that is acceptable.
205 my $error = $frame->headers->{message};
206 $app->log->debug("Got Stomp error: $error");
# Make the class's metaclass immutable now that the attributes and methods
# above have been declared.
209 __PACKAGE__->meta->make_immutable;
213 =head2 subscribe_headers
215 Add additional header key/value pairs to the subscribe message sent to the
222 The source for Catalyst::Engine::Stomp is on GitHub:
224 http://github.com/chrisa/catalyst-engine-stomp
228 Chris Andrews C<< <chris@nodnol.org> >>
232 Tomas Doran (t0m) C<< <bobtfish@bobtfish.net> >>
236 =head1 LICENCE AND COPYRIGHT
238 Copyright (C) 2009 Venda Ltd
240 This library is free software; you can redistribute it and/or modify
241 it under the same terms as Perl itself, either Perl version 5.8.8 or,
242 at your option, any later version of Perl 5 you may have available.