Added some pod for type_key, serializer and subscribe_header features
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
CommitLineData
0a663589 1package Catalyst::Engine::Stomp;
2use Moose;
0a663589 3use List::MoreUtils qw/ uniq /;
4use HTTP::Request;
5use Net::Stomp;
03c90167 6use namespace::autoclean;
7
extends 'Catalyst::Engine::Embeddable';

our $VERSION = '0.06';

# Net::Stomp handle for the broker connection; populated by run().
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# "hostname:port" description of the broker; populated by run() and
# reported as the request address by prepare_request().
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
14
=head1 NAME

Catalyst::Engine::Stomp - write message handling apps with Catalyst.
18
=head1 SYNOPSIS

  # In a server script:

  BEGIN {
    $ENV{CATALYST_ENGINE} = 'Stomp';
    require Catalyst::Engine::Stomp;
  }

  MyApp->config->{'Engine::Stomp'} =
   {
     hostname         => '127.0.0.1',
     port             => 61613,
     subscribe_header => {
       transformation => 'jms-to-json',
     }
   };
  MyApp->run();

  # In a controller, or controller base class:
  use base qw/ Catalyst::Controller::MessageDriven /;

  # then create actions, which map as message types
  sub testaction : Local {
      my ($self, $c) = @_;

      # Reply with a minimal response message
      my $response = { type => 'testaction_response' };
      $c->stash->{response} = $response;
  }
49
=head1 DESCRIPTION

Write a Catalyst app connected to a Stomp message broker, not HTTP. You
need a controller that understands messaging, as well as this engine.

This is single-threaded and single process - you need to run multiple
instances of this engine to get concurrency, and configure your broker
to load-balance across multiple consumers of the same queue.

Controllers are mapped to Stomp queues, and a controller base class is
provided, L<Catalyst::Controller::MessageDriven>, which implements
YAML-serialized messages, mapping a top-level YAML "type" key to
the action.
d78bb739 63
=head1 METHODS
65
=head2 run

App entry point. Connects to the broker described by the
C<Engine::Stomp> configuration, subscribes (with client acknowledgement)
to one C</queue/$namespace> destination per controller action namespace,
then loops receiving frames and passing each one to
L</handle_stomp_frame>.

The loop stops after the first frame when a true C<$oneshot> argument is
given or C<$ENV{ENGINE_ONESHOT}> is true; the process then exits with
status 0. Dies if the application has no C<Engine::Stomp> configuration
hash.

=cut

sub run {
    my ($self, $app, $oneshot) = @_;

    die 'No Engine::Stomp configuration found'
        unless ref($app->config->{'Engine::Stomp'}) eq 'HASH';

    # One queue per controller namespace. Controllers with an empty
    # namespace are skipped, and uniq() keeps two controllers that share a
    # namespace from subscribing to the same queue twice.
    my @queues = uniq(
        grep { length $_ }
        map  { $app->controller($_)->action_namespace } $app->controllers
    );

    # Work on a copy of the configuration: subscribe_header is consumed
    # here, everything else is handed to Net::Stomp as-is.
    my %template   = %{ $app->config->{'Engine::Stomp'} };
    my $add_header = delete $template{subscribe_header};

    # Anything other than a hash reference is ignored.
    my %extra_headers = ref($add_header) eq 'HASH' ? %{$add_header} : ();

    # connect up
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname} . ':' . $template{port});

    # subscribe, with client ack. Extra headers come last so that they
    # override the defaults when the same key is configured.
    foreach my $queue (@queues) {
        $self->connection->subscribe({
            destination => "/queue/$queue",
            ack         => 'client',
            %extra_headers,
        });
    }

    # enter loop...
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $oneshot || $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
118
=head2 prepare_request

Overridden so that, after the parent engine has prepared the request,
the request address is replaced by the description of the source broker
instead of a client IP address.

=cut

sub prepare_request {
    my ($self, @args) = @_;
    my ($c) = @args;

    # Let the parent engine do the real work with the same arguments.
    $self->next::method(@args);

    $c->req->address($self->conn_desc);
}
132
=head2 finalize_headers

Overridden to write any errors collected on the context to the debug
log, since there is no HTTP-style "debugging" page to show them on.
The parent implementation is then called and its result returned.

=cut

sub finalize_headers {
    my ($self, $c) = @_;

    # All collected errors, one per line; empty when there were none.
    my $errors = join "\n", @{ $c->error };
    $c->log->debug($errors) if $errors;

    return $self->next::method($c);
}
148
=head2 handle_stomp_frame

Dispatch according to Stomp frame type: C<MESSAGE> frames go to
L</handle_stomp_message>, C<ERROR> frames go to L</handle_stomp_error>,
and any other command is only noted in the debug log.

=cut

sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    return $self->handle_stomp_message($app, $frame)
        if $command eq 'MESSAGE';

    return $self->handle_stomp_error($app, $frame)
        if $command eq 'ERROR';

    return $app->log->debug("Got unknown Stomp command: $command");
}
169
=head2 handle_stomp_message

Dispatch a Stomp message into the Catalyst app. The frame body is
wrapped in a POST request addressed at the controller named by the
frame's destination queue. If the response carries an
C<X-Reply-Address> header, the response content is sent to the
matching C</remote-temp-queue/> destination. The frame is acknowledged
after that.

=cut

sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # The part of the destination after "/queue/" names the controller.
    my ($controller) = $frame->headers->{destination} =~ m{^/queue/(.*)$};

    # Build the request that carries the frame body into the app.
    my $config = $app->config->{'Engine::Stomp'};
    my $url    = "stomp://$config->{hostname}:$config->{port}/$controller";
    my $req    = HTTP::Request->new(POST => $url);
    $req->content($frame->body);
    $req->content_length(length $frame->body);

    # handle_request fills in $response through the reference.
    my $response;
    $app->handle_request($req, \$response);

    # Only reply when the app asked for it via the reply header.
    my $reply_to = $response->headers->header('X-Reply-Address');
    if ($reply_to) {
        $self->connection->send({
            destination => "/remote-temp-queue/$reply_to",
            body        => $response->content,
        });
    }

    # Acknowledge last, once the message has been processed / answered.
    $self->connection->ack({ frame => $frame });
}
=head2 handle_stomp_error

Log any Stomp error frames we receive: the frame's C<message> header is
written to the application's debug log.

=cut

sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    my $headers = $frame->headers;
    $app->log->debug("Got Stomp error: $headers->{message}");
}
215
d78bb739 216__PACKAGE__->meta->make_immutable;
217
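=head1 CONFIGURATION

=head2 subscribe_header

Add additional header key/value pairs to the subscribe message sent to the
message broker. The value must be a hash reference; any other value is
ignored. Keys given here override the C<destination> and C<ack> headers
that the engine sets itself.

=cut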