Put back namespace::autoclean and fix more whitespace
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
CommitLineData
0a663589 1package Catalyst::Engine::Stomp;
2use Moose;
0a663589 3use List::MoreUtils qw/ uniq /;
4use HTTP::Request;
5use Net::Stomp;
03c90167 6use namespace::autoclean;
7
# This engine builds on the embeddable engine.
extends 'Catalyst::Engine::Embeddable';

our $VERSION = '0.04';

# The Net::Stomp connection object; assigned in run().
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# "hostname:port" of the broker; assigned in run() and used by
# prepare_request() as the request address.
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
14
15=head1 NAME
16
17Catalyst::Engine::Stomp - write message handling apps with Catalyst.
18
19=head1 SYNOPSIS
20
21 # In a server script:
22
23 BEGIN {
24 $ENV{CATALYST_ENGINE} = 'Stomp';
25 require Catalyst::Engine::Stomp;
5ec5e0b1 26 }
0a663589 27
  MyApp->config->{'Engine::Stomp'} =
   {
     hostname => '127.0.0.1',
     port     => 61613,
   };
  MyApp->run();
34
35 # In a controller, or controller base class:
d78bb739 36 use base qw/ Catalyst::Controller::MessageDriven /;
0a663589 37
d78bb739 38 # then create actions, which map as message types
39 sub testaction : Local {
40 my ($self, $c) = @_;
0a663589 41
d78bb739 42 # Reply with a minimal response message
43 my $response = { type => 'testaction_response' };
44 $c->stash->{response} = $response;
45 }
46
0a663589 47=head1 DESCRIPTION
48
Write a Catalyst app connected to a Stomp message broker, not HTTP. You
need a controller that understands messaging, as well as this engine.
0a663589 51
52This is single-threaded and single process - you need to run multiple
53instances of this engine to get concurrency, and configure your broker
54to load-balance across multiple consumers of the same queue.
55
d78bb739 56Controllers are mapped to Stomp queues, and a controller base class is
57provided, Catalyst::Controller::MessageDriven, which implements
5ec5e0b1 58YAML-serialized messages, mapping a top-level YAML "type" key to
59the action.
d78bb739 60
0a663589 61=head1 METHODS
62
=head2 run

App entry point. Connects to the broker described by the C<Engine::Stomp>
configuration hash, subscribes (with client acknowledgement) to one
C</queue/NAMESPACE> destination for every non-empty action namespace, and
then loops receiving frames and handing each one to C<handle_stomp_frame>.

The loop stops after a frame has been handled if either the C<$oneshot>
argument or the C<ENGINE_ONESHOT> environment variable is true. The process
then exits with status 0.

Dies if the application has no C<Engine::Stomp> configuration hash.

=cut

sub run {
    my ($self, $app, $oneshot) = @_;

    die 'No Engine::Stomp configuration found'
        unless ref $app->config->{'Engine::Stomp'} eq 'HASH';

    # list the path namespaces that will be mapped as queues.
    #
    # this is known to use the deprecated
    # Dispatcher->action_hash() method, but there doesn't appear
    # to be another way to get the relevant strings out.
    #
    # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
    #
    my @queues =
        uniq
        grep { length $_ }
        map  { $_->namespace }
        values %{ $app->dispatcher->action_hash };

    # connect up; the configuration hash is copied so that Net::Stomp is
    # handed its own hash rather than the application's config.
    my %template = %{ $app->config->{'Engine::Stomp'} };
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname} . ':' . $template{port});

    # subscribe, with client ack.
    foreach my $queue (@queues) {
        $self->connection->subscribe({
            destination => "/queue/$queue",
            ack         => 'client',
        });
    }

    # enter loop...
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);

        # One-shot mode can be requested through the argument or the
        # environment. The environment is re-read on every pass so it can
        # still be switched on while the loop is running.
        last if $oneshot || $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
112
=head2 prepare_request

Overridden so that, once the parent engine has prepared the request, the
request address is set to the broker description (C<conn_desc>) in place of
a client IP address.

=cut

sub prepare_request {
    my $self = shift;
    my ($c)  = @_;

    # Hand the remaining arguments to the parent implementation unchanged.
    $self->next::method(@_);

    return $c->req->address($self->conn_desc);
}
126
=head2 finalize_headers

Overridden to write any errors collected on the context to the debug log,
since there is no HTTP-style "debugging" page to show them. The parent
implementation is then called and its result returned.

=cut

sub finalize_headers {
    my ($self, $c) = @_;

    # All collected errors, one per line; nothing is logged when empty.
    my $errors = join "\n", @{ $c->error };
    $c->log->debug($errors) if $errors;

    return $self->next::method($c);
}
142
=head2 handle_stomp_frame

Dispatch according to Stomp frame type: C<MESSAGE> frames go to
C<handle_stomp_message>, C<ERROR> frames go to C<handle_stomp_error>, and
any other command is written to the application's debug log.

=cut

sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    return $self->handle_stomp_message($app, $frame) if $command eq 'MESSAGE';
    return $self->handle_stomp_error($app, $frame)   if $command eq 'ERROR';

    # Neither of the frame types we know how to handle.
    return $app->log->debug("Got unknown Stomp command: $command");
}
163
=head2 handle_stomp_message

Dispatch a Stomp message into the Catalyst app.

The frame's C<destination> header (C</queue/NAME>) selects the path: the
frame body is wrapped in a POST request for
C<stomp://HOSTNAME:PORT/NAME>, using the hostname and port from the
C<Engine::Stomp> configuration, and passed to the application's
C<handle_request>.

If the application produced a response carrying an C<X-Reply-Address>
header, the response content is sent to C</remote-temp-queue/ADDRESS>.
The incoming frame is acknowledged afterwards, whether or not a reply was
sent.

=cut

sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # queue -> controller
    my $queue = $frame->headers->{destination};
    my ($controller) = defined($queue) ? $queue =~ m|^/queue/(.*)$| : ();

    # A missing or unexpected destination maps to the empty path, which is
    # what the URL below would contain anyway, without the undef warnings.
    $controller = '' unless defined $controller;

    # set up request
    my $config = $app->config->{'Engine::Stomp'};
    my $url    = 'stomp://' . $config->{hostname} . ':' . $config->{port} . '/' . $controller;
    my $body   = $frame->body;
    my $req    = HTTP::Request->new(POST => $url);
    $req->content($body);
    $req->content_length(length $body);

    # dispatch
    my $response;
    $app->handle_request($req, \$response);

    # reply, if header set. The response is only filled in through the
    # reference handed to handle_request, so guard against it being unset
    # rather than dying here and taking the whole engine loop down.
    if (defined $response) {
        if (my $reply_to = $response->headers->header('X-Reply-Address')) {
            my $reply_queue = '/remote-temp-queue/' . $reply_to;
            $self->connection->send({ destination => $reply_queue, body => $response->content });
        }
    }
    else {
        $app->log->debug('No response from application; no reply sent');
    }

    # ack the message off the queue now we've replied / processed
    $self->connection->ack( { frame => $frame } );
}
197
=head2 handle_stomp_error

Log any Stomp error frames we receive: the frame's C<message> header is
written to the application's debug log.

=cut

sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    my $headers = $frame->headers;
    my $message = $headers->{message};

    return $app->log->debug("Got Stomp error: $message");
}
210
# Make the class immutable now that its attributes and methods are declared.
__PACKAGE__->meta->make_immutable;

# Explicit true value, so that loading the module does not depend on
# whatever make_immutable() happens to return.
1;
212