Attempt to please PAUSE by renaming our test app, and marking its package no_index
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
CommitLineData
0a663589 1package Catalyst::Engine::Stomp;
2use Moose;
3extends 'Catalyst::Engine::Embeddable';
4
68e73c9f 5our $VERSION = '0.02';
0a663589 6
7use List::MoreUtils qw/ uniq /;
8use HTTP::Request;
9use Net::Stomp;
10
11has connection => (is => 'rw', isa => 'Net::Stomp');
12has conn_desc => (is => 'rw', isa => 'Str');
13
14=head1 NAME
15
16Catalyst::Engine::Stomp - write message handling apps with Catalyst.
17
18=head1 SYNOPSIS
19
20 # In a server script:
21
22 BEGIN {
23 $ENV{CATALYST_ENGINE} = 'Stomp';
24 require Catalyst::Engine::Stomp;
25 }
26
27 MyApp->config->{Engine::Stomp} =
28 {
29 hostname => '127.0.0.1',
30 port => 61613,
31 };
32 MyApp->run();
33
34 # In a controller, or controller base class:
d78bb739 35 use base qw/ Catalyst::Controller::MessageDriven /;
0a663589 36
d78bb739 37 # then create actions, which map as message types
38 sub testaction : Local {
39 my ($self, $c) = @_;
0a663589 40
d78bb739 41 # Reply with a minimal response message
42 my $response = { type => 'testaction_response' };
43 $c->stash->{response} = $response;
44 }
45
46 # The default serialization is YAML, but this configuration
47 # may be overridden in your controller:
0a663589 48 __PACKAGE__->config(
49 'default' => 'text/x-yaml',
50 'stash_key' => 'rest',
51 'map' => { 'text/x-yaml' => 'YAML' },
52 );
53
0a663589 54=head1 DESCRIPTION
55
56Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
57need a controller that understands messaging, as well as this engine.
58
59This is single-threaded and single process - you need to run multiple
60instances of this engine to get concurrency, and configure your broker
61to load-balance across multiple consumers of the same queue.
62
d78bb739 63Controllers are mapped to Stomp queues, and a controller base class is
64provided, Catalyst::Controller::MessageDriven, which implements
65YAML-serialized messages, mapping a top-level YAML "type" key to
66the action.
67
0a663589 68=head1 METHODS
69
70=head2 run
71
72App entry point. Starts a loop listening for messages.
73
74=cut
75
76sub run {
77 my ($self, $app, $oneshot) = @_;
78
79 die 'No Engine::Stomp configuration found'
80 unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
81
82 # list the path namespaces that will be mapped as queues.
83 #
84 # this is known to use the deprecated
85 # Dispatcher->action_hash() method, but there doesn't appear
86 # to be another way to get the relevant strings out.
87 #
88 # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
89 #
90 my @queues =
91 uniq
92 grep { length $_ }
93 map { $_->namespace }
94 values %{$app->dispatcher->action_hash};
95
96 # connect up
97 my %template = %{$app->config->{'Engine::Stomp'}};
98 $self->connection(Net::Stomp->new(\%template));
99 $self->connection->connect();
100 $self->conn_desc($template{hostname}.':'.$template{port});
101
102 # subscribe, with client ack.
103 foreach my $queue (@queues) {
104 my $queue_name = "/queue/$queue";
105 $self->connection->subscribe({
106 destination => $queue_name,
107 ack => 'client'
108 });
109 }
110
111 # enter loop...
112 while (1) {
113 my $frame = $self->connection->receive_frame();
114 $self->handle_stomp_frame($app, $frame);
115 last if $ENV{ENGINE_ONESHOT};
116 }
117 exit 0;
118}
119
120=head2 prepare_request
121
122Overridden to add the source broker to the request, in place of the
123client IP address.
124
125=cut
126
127sub prepare_request {
128 my ($self, $c, $req, $res_ref) = @_;
129 shift @_;
130 $self->next::method(@_);
131 $c->req->address($self->conn_desc);
132}
133
134=head2 finalize_headers
135
d78bb739 136Overridden to dump out any errors encountered, since you won't get a
137"debugging" message as for HTTP.
0a663589 138
139=cut
140
141sub finalize_headers {
142 my ($self, $c) = @_;
143 my $error = join "\n", @{$c->error};
144 if ($error) {
145 $c->log->debug($error);
146 }
147 return $self->next::method($c);
148}
149
150=head2 handle_stomp_frame
151
d78bb739 152Dispatch according to Stomp frame type.
0a663589 153
154=cut
155
156sub handle_stomp_frame {
157 my ($self, $app, $frame) = @_;
158
159 my $command = $frame->command();
0a663589 160 if ($command eq 'MESSAGE') {
161 $self->handle_stomp_message($app, $frame);
162 }
163 elsif ($command eq 'ERROR') {
164 $self->handle_stomp_error($app, $frame);
165 }
166 else {
d78bb739 167 $app->log->debug("Got unknown Stomp command: $command");
0a663589 168 }
169}
170
171=head2 handle_stomp_message
172
d78bb739 173Dispatch a Stomp message into the Catalyst app.
0a663589 174
175=cut
176
177sub handle_stomp_message {
178 my ($self, $app, $frame) = @_;
179
180 # queue -> controller
181 my $queue = $frame->headers->{destination};
182 my ($controller) = $queue =~ m!^/queue/(.*)$!;
183
184 # set up request
185 my $config = $app->config->{'Engine::Stomp'};
186 my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
187 my $req = HTTP::Request->new(POST => $url);
188 $req->content($frame->body);
189 $req->content_length(length $frame->body);
190
191 # dispatch
192 my $response;
193 $app->handle_request($req, \$response);
194
195 # reply
196 my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
0a663589 197 $self->connection->send({ destination => $reply_queue, body => $response->content });
198
199 # ack the message off the queue now we've replied
200 $self->connection->ack( { frame => $frame } );
201}
202
203=head2 handle_stomp_error
204
d78bb739 205Log any Stomp error frames we receive.
0a663589 206
207=cut
208
209sub handle_stomp_error {
210 my ($self, $app, $frame) = @_;
211
212 my $error = $frame->headers->{message};
d78bb739 213 $app->log->debug("Got Stomp error: $error");
0a663589 214}
215
d78bb739 216__PACKAGE__->meta->make_immutable;
217
0a663589 2181;
219