Mangle config docs to be nicer, use MooseX::Types
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 use List::MoreUtils qw/ uniq /;
4 use HTTP::Request;
5 use Net::Stomp;
6 use MooseX::Types::Moose qw/Str Int HashRef/;
7 use namespace::autoclean;
8
9 extends 'Catalyst::Engine::Embeddable';
10
11 our $VERSION = '0.06';
12
13 has connection => (is => 'rw', isa => 'Net::Stomp');
14 has conn_desc => (is => 'rw', isa => Str);
15
16 =head1 NAME
17
18 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
19
20 =head1 SYNOPSIS
21
22   # In a server script:
23
24   BEGIN {
25     $ENV{CATALYST_ENGINE} = 'Stomp';
26     require Catalyst::Engine::Stomp;
27   }
28
29   MyApp->config(
30      'Engine::Stomp' = {
31        hostname         => '127.0.0.1',
32        port             => 61613,
33        subscribe_header => {
34          transformation       => 'jms-to-json',
35        }
36     },
37   );
38   MyApp->run();
39
40   # In a controller, or controller base class:
41   use base qw/ Catalyst::Controller::MessageDriven /;
42
43   # then create actions, which map as message types
44   sub testaction : Local {
45       my ($self, $c) = @_;
46
47       # Reply with a minimal response message
48       my $response = { type => 'testaction_response' };
49       $c->stash->{response} = $response;
50   }
51
52 =head1 DESCRIPTION
53
54 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
55 need a controller that understands messaging, as well as this engine.
56
57 This is single-threaded and single process - you need to run multiple
58 instances of this engine to get concurrency, and configure your broker
59 to load-balance across multiple consumers of the same queue.
60
61 Controllers are mapped to Stomp queues, and a controller base class is
62 provided, Catalyst::Controller::MessageDriven, which implements
63 YAML-serialized messages, mapping a top-level YAML "type" key to
64 the action.
65
66 =head1 METHODS
67
68 =head2 run
69
70 App entry point. Starts a loop listening for messages.
71
72 =cut
73
74 sub run {
75         my ($self, $app, $oneshot) = @_;
76
77         die 'No Engine::Stomp configuration found'
78              unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
79
80         my @queues = grep { length $_ }
81                      map  { $app->controller($_)->action_namespace } $app->controllers;
82
83         # connect up
84         my %template = %{$app->config->{'Engine::Stomp'}};
85         my $add_header = delete $template{subscribe_header};
86         if (ref($add_header) ne 'HASH') {
87             $add_header = undef;
88         }
89         $self->connection(Net::Stomp->new(\%template));
90         $self->connection->connect();
91         $self->conn_desc($template{hostname}.':'.$template{port});
92
93         # subscribe, with client ack.
94         foreach my $queue (@queues) {
95                 my $queue_name = "/queue/$queue";
96                 my $header_hash = {
97                     destination => $queue_name,
98                     ack         => 'client',
99                 };
100
101                 # add the additional headers - yes I know it overwrites but
102                 # thats the dev's problem?
103                 if (keys %{$add_header}) {
104                     foreach my $key (keys %{$add_header}) {
105                         $header_hash->{$key} = $add_header->{$key};
106                     }
107                 }
108
109                 $self->connection->subscribe($header_hash);
110         }
111
112         # enter loop...
113         while (1) {
114                 my $frame = $self->connection->receive_frame();
115                 $self->handle_stomp_frame($app, $frame);
116                 last if $ENV{ENGINE_ONESHOT};
117         }
118         exit 0;
119 }
120
121 =head2 prepare_request
122
123 Overridden to add the source broker to the request, in place of the
124 client IP address.
125
126 =cut
127
128 sub prepare_request {
129     my ($self, $c, $req, $res_ref) = @_;
130     shift @_;
131     $self->next::method(@_);
132     $c->req->address($self->conn_desc);
133 }
134
135 =head2 finalize_headers
136
137 Overridden to dump out any errors encountered, since you won't get a
138 "debugging" message as for HTTP.
139
140 =cut
141
142 sub finalize_headers {
143     my ($self, $c) = @_;
144     my $error = join "\n", @{$c->error};
145     if ($error) {
146         $c->log->debug($error);
147     }
148     return $self->next::method($c);
149 }
150
151 =head2 handle_stomp_frame
152
153 Dispatch according to Stomp frame type.
154
155 =cut
156
157 sub handle_stomp_frame {
158     my ($self, $app, $frame) = @_;
159
160     my $command = $frame->command();
161     if ($command eq 'MESSAGE') {
162         $self->handle_stomp_message($app, $frame);
163     }
164     elsif ($command eq 'ERROR') {
165         $self->handle_stomp_error($app, $frame);
166     }
167     else {
168         $app->log->debug("Got unknown Stomp command: $command");
169     }
170 }
171
172 =head2 handle_stomp_message
173
174 Dispatch a Stomp message into the Catalyst app.
175
176 =cut
177
178 sub handle_stomp_message {
179     my ($self, $app, $frame) = @_;
180
181     # queue -> controller
182     my $queue = $frame->headers->{destination};
183     my ($controller) = $queue =~ m|^/queue/(.*)$|;
184
185     # set up request
186     my $config = $app->config->{'Engine::Stomp'};
187     my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
188     my $req = HTTP::Request->new(POST => $url);
189     $req->content($frame->body);
190     $req->content_length(length $frame->body);
191
192     # dispatch
193     my $response;
194     $app->handle_request($req, \$response);
195
196     # reply, if header set
197     if (my $reply_to = $response->headers->header('X-Reply-Address')) {
198         my $reply_queue = '/remote-temp-queue/' . $reply_to;
199         $self->connection->send({ destination => $reply_queue, body => $response->content });
200     }
201
202     # ack the message off the queue now we've replied / processed
203     $self->connection->ack( { frame => $frame } );
204 }
205 =head2 handle_stomp_error
206
207 Log any Stomp error frames we receive.
208
209 =cut
210
211 sub handle_stomp_error {
212     my ($self, $app, $frame) = @_;
213
214     my $error = $frame->headers->{message};
215     $app->log->debug("Got Stomp error: $error");
216 }
217
218 __PACKAGE__->meta->make_immutable;
219
220 =head1 CONFIGURATION
221
222 =head2 subscribe_header
223
224 Add additional header key/value pairs to the subscribe message sent to the
225 message broker.
226
227 =cut