Mangle config docs to be nicer, use MooseX::Types
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
CommitLineData
package Catalyst::Engine::Stomp;
use Moose;
use List::MoreUtils qw/ uniq /;
use HTTP::Request;
use Net::Stomp;
use MooseX::Types::Moose qw/Str Int HashRef/;
use namespace::autoclean;

# This engine builds on Catalyst::Engine::Embeddable and adds the Stomp
# transport (connect, subscribe, receive, reply, ack) around it.
extends 'Catalyst::Engine::Embeddable';

our $VERSION = '0.06';

# Net::Stomp connection to the broker; set by run().
has connection => (is => 'rw', isa => 'Net::Stomp');

# "hostname:port" of the broker; set by run() and used by prepare_request()
# as the request's address.
has conn_desc => (is => 'rw', isa => Str);

=head1 NAME

Catalyst::Engine::Stomp - write message handling apps with Catalyst.

=head1 SYNOPSIS

  # In a server script:

  BEGIN {
    $ENV{CATALYST_ENGINE} = 'Stomp';
    require Catalyst::Engine::Stomp;
  }

  MyApp->config(
    'Engine::Stomp' => {
      hostname         => '127.0.0.1',
      port             => 61613,
      subscribe_header => {
        transformation => 'jms-to-json',
      }
    },
  );
  MyApp->run();

  # In a controller, or controller base class:
  use base qw/ Catalyst::Controller::MessageDriven /;

  # then create actions, which map as message types
  sub testaction : Local {
      my ($self, $c) = @_;

      # Reply with a minimal response message
      my $response = { type => 'testaction_response' };
      $c->stash->{response} = $response;
  }

=head1 DESCRIPTION

Write a Catalyst app connected to a Stomp message broker, not HTTP. You
need a controller that understands messaging, as well as this engine.

This is single-threaded and single-process - you need to run multiple
instances of this engine to get concurrency, and configure your broker
to load-balance across multiple consumers of the same queue.

Controllers are mapped to Stomp queues, and a controller base class is
provided, Catalyst::Controller::MessageDriven, which implements
YAML-serialized messages, mapping a top-level YAML "type" key to
the action.

=head1 METHODS

=head2 run

App entry point. Connects to the broker described by the C<Engine::Stomp>
configuration hash, subscribes (with client acknowledgement) to one
C</queue/NAME> destination per distinct, non-empty controller action
namespace, and then starts a loop listening for messages.

Dies if the application has no C<Engine::Stomp> configuration hash. If
C<$ENV{ENGINE_ONESHOT}> is true the loop ends after the first frame has
been handled. This method does not return to its caller: it calls
C<exit 0> when the loop ends.

=cut

sub run {
    # Any arguments after $app are accepted but ignored.
    my ($self, $app) = @_;

    my $config = $app->config->{'Engine::Stomp'};
    die 'No Engine::Stomp configuration found'
        unless ref($config) eq 'HASH';

    # One queue per controller namespace. uniq() stops two controllers that
    # share a namespace from subscribing to the same queue twice.
    my @queues = uniq(
        grep { length $_ }
        map  { $app->controller($_)->action_namespace } $app->controllers
    );

    # connect up: every config key except subscribe_header is handed to
    # Net::Stomp->new
    my %template   = %{$config};
    my $add_header = delete $template{subscribe_header};
    $add_header = {} if ref($add_header) ne 'HASH';
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname}.':'.$template{port});

    # subscribe, with client ack.
    foreach my $queue (@queues) {
        # The additional headers come last, so on a key clash they replace
        # the defaults; choosing sensible keys is left to the configuration.
        my $header_hash = {
            destination => "/queue/$queue",
            ack         => 'client',
            %{$add_header},
        };

        $self->connection->subscribe($header_hash);
    }

    # enter loop...
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}

=head2 prepare_request

Overridden to add the source broker to the request, in place of the
client IP address.

Called as C<< $engine->prepare_request($c, $req, $res_ref) >>. All
arguments are passed unchanged to the parent implementation; afterwards
the request's address is set to the engine's C<conn_desc>. Returns
whatever the C<address> call returns.

=cut

sub prepare_request {
    my $self = shift;
    my ($c)  = @_;

    # Let the parent engine prepare the request from the same arguments.
    $self->next::method(@_);

    # Report the broker connection instead of a client IP address.
    return $c->req->address($self->conn_desc);
}

=head2 finalize_headers

Overridden to dump out any errors encountered, since you won't get a
"debugging" message as for HTTP.

The entries of C<< $c->error >> are joined with newlines and, if the
result is a true value, written to the debug log. Returns the result of
the parent implementation.

=cut

sub finalize_headers {
    my ($self, $c) = @_;

    # Collapse all recorded errors into one log message.
    my $report = join "\n", @{ $c->error };
    $c->log->debug($report) if $report;

    return $self->next::method($c);
}

=head2 handle_stomp_frame

Dispatch according to Stomp frame type.

C<MESSAGE> frames go to L</handle_stomp_message>, C<ERROR> frames go to
L</handle_stomp_error>, and any other command is written to the
application's debug log. Returns the result of whichever call was made.

=cut

sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    return $self->handle_stomp_message($app, $frame) if $command eq 'MESSAGE';
    return $self->handle_stomp_error($app, $frame)   if $command eq 'ERROR';

    # Anything else is only noted in the debug log.
    return $app->log->debug("Got unknown Stomp command: $command");
}

=head2 handle_stomp_message

Dispatch a Stomp message into the Catalyst app.

The controller name is taken from the frame's C<destination> header
(C</queue/NAME>) and used as the path of a C<stomp://hostname:port/NAME>
URL built from the C<Engine::Stomp> configuration. The frame body is sent
as the content of a POST request to that URL via C<handle_request>. If the
response carries an C<X-Reply-Address> header, the response content is sent
to C</remote-temp-queue/ADDRESS>. The frame is then acknowledged, and the
result of that C<ack> call is returned.

=cut

sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # queue -> controller
    my $queue = $frame->headers->{destination};
    my ($controller) = $queue =~ m|^/queue/(.*)$|;

    # set up request
    my $config = $app->config->{'Engine::Stomp'};
    my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
    my $body = $frame->body;
    my $req = HTTP::Request->new(POST => $url);
    $req->content($body);
    $req->content_length(length $body);

    # dispatch; handle_request fills $response through the reference
    my $response;
    $app->handle_request($req, \$response);

    # reply, if header set
    if (my $reply_to = $response->headers->header('X-Reply-Address')) {
        $self->connection->send({
            destination => '/remote-temp-queue/' . $reply_to,
            body        => $response->content,
        });
    }

    # ack the message off the queue now we've replied / processed
    return $self->connection->ack( { frame => $frame } );
}

=head2 handle_stomp_error

Log any Stomp error frames we receive.

The frame's C<message> header is written to the application's debug log.
Returns the result of that log call.

=cut

sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    # The error text is read from the frame's "message" header.
    my $error = $frame->headers->{message};
    return $app->log->debug("Got Stomp error: $error");
}

# Finalize the Moose class definition now that all attributes and methods
# have been declared.
__PACKAGE__->meta->make_immutable;

# Explicit true value, so loading the module does not depend on what
# make_immutable happens to return.
1;

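=head1 CONFIGURATION

=head2 subscribe_header

A hash reference of additional header key/value pairs to add to the
subscribe message sent to the message broker. These are applied on top of
the default C<destination> and C<ack> headers, so a key with the same name
replaces the default. A value that is not a hash reference is ignored.

=cut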