1 package Catalyst::Engine::Stomp;
3 use List::MoreUtils qw/ uniq /;
6 use namespace::autoclean;
8 extends 'Catalyst::Engine::Embeddable';
10 our $VERSION = '0.05';
12 has connection => (is => 'rw', isa => 'Net::Stomp');
13 has conn_desc => (is => 'rw', isa => 'Str');
17 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
24 $ENV{CATALYST_ENGINE} = 'Stomp';
25 require Catalyst::Engine::Stomp;
28 MyApp->config->{Engine::Stomp} =
30 hostname => '127.0.0.1',
35 # In a controller, or controller base class:
36 use base qw/ Catalyst::Controller::MessageDriven /;
38 # then create actions, which map as message types
39 sub testaction : Local {
42 # Reply with a minimal response message
43 my $response = { type => 'testaction_response' };
44 $c->stash->{response} = $response;
49 Write a Catalyst app connected to a Stomp message broker, not HTTP. You
50 need a controller that understands messaging, as well as this engine.
52 This is single-threaded and single-process - you need to run multiple
53 instances of this engine to get concurrency, and configure your broker
54 to load-balance across multiple consumers of the same queue.
56 Controllers are mapped to Stomp queues, and a controller base class is
57 provided, Catalyst::Controller::MessageDriven, which implements
58 YAML-serialized messages, mapping a top-level YAML "type" key to
65 App entry point. Starts a loop listening for messages.
70 my ($self, $app, $oneshot) = @_;
72 die 'No Engine::Stomp configuration found'
73 unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
75 my @queues = grep { length $_ }
76 map { $app->controller($_)->action_namespace } $app->controllers;
79 my %template = %{$app->config->{'Engine::Stomp'}};
80 $self->connection(Net::Stomp->new(\%template));
81 $self->connection->connect();
82 $self->conn_desc($template{hostname}.':'.$template{port});
84 # subscribe, with client ack.
85 foreach my $queue (@queues) {
86 my $queue_name = "/queue/$queue";
87 $self->connection->subscribe({
88 destination => $queue_name,
95 my $frame = $self->connection->receive_frame();
96 $self->handle_stomp_frame($app, $frame);
97 last if $ENV{ENGINE_ONESHOT};
102 =head2 prepare_request
104 Overridden to add the source broker to the request, in place of the
109 sub prepare_request {
110 my ($self, $c, $req, $res_ref) = @_;
112 $self->next::method(@_);
113 $c->req->address($self->conn_desc);
116 =head2 finalize_headers
118 Overridden to dump out any errors encountered, since you won't get a
119 "debugging" message as for HTTP.
123 sub finalize_headers {
125 my $error = join "\n", @{$c->error};
127 $c->log->debug($error);
129 return $self->next::method($c);
132 =head2 handle_stomp_frame
134 Dispatch according to Stomp frame type.
138 sub handle_stomp_frame {
139 my ($self, $app, $frame) = @_;
141 my $command = $frame->command();
142 if ($command eq 'MESSAGE') {
143 $self->handle_stomp_message($app, $frame);
145 elsif ($command eq 'ERROR') {
146 $self->handle_stomp_error($app, $frame);
149 $app->log->debug("Got unknown Stomp command: $command");
153 =head2 handle_stomp_message
155 Dispatch a Stomp message into the Catalyst app.
159 sub handle_stomp_message {
160 my ($self, $app, $frame) = @_;
162 # queue -> controller
163 my $queue = $frame->headers->{destination};
164 my ($controller) = $queue =~ m|^/queue/(.*)$|;
167 my $config = $app->config->{'Engine::Stomp'};
168 my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
169 my $req = HTTP::Request->new(POST => $url);
170 $req->content($frame->body);
171 $req->content_length(length $frame->body);
175 $app->handle_request($req, \$response);
177 # reply, if header set
178 if (my $reply_to = $response->headers->header('X-Reply-Address')) {
179 my $reply_queue = '/remote-temp-queue/' . $reply_to;
180 $self->connection->send({ destination => $reply_queue, body => $response->content });
183 # ack the message off the queue now we've replied / processed
184 $self->connection->ack( { frame => $frame } );
187 =head2 handle_stomp_error
189 Log any Stomp error frames we receive.
193 sub handle_stomp_error {
194 my ($self, $app, $frame) = @_;
196 my $error = $frame->headers->{message};
197 $app->log->debug("Got Stomp error: $error");
200 __PACKAGE__->meta->make_immutable;