Added some pod for type_key, serializer and subscribe_header features
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 use List::MoreUtils qw/ uniq /;
4 use HTTP::Request;
5 use Net::Stomp;
6 use namespace::autoclean;
7
8 extends 'Catalyst::Engine::Embeddable';
9
10 our $VERSION = '0.06';
11
12 has connection => (is => 'rw', isa => 'Net::Stomp');
13 has conn_desc => (is => 'rw', isa => 'Str');
14
15 =head1 NAME
16
17 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
18
19 =head1 SYNOPSIS
20
21   # In a server script:
22
23   BEGIN {
24     $ENV{CATALYST_ENGINE} = 'Stomp';
25     require Catalyst::Engine::Stomp;
26   }
27
28   MyApp->config->{Engine::Stomp} =
29    {
30      hostname         => '127.0.0.1',
31      port             => 61613,
32      subscribe_header => {
33        transformation       => 'jms-to-json',
34      }
35    };
36   MyApp->run();
37
38   # In a controller, or controller base class:
39   use base qw/ Catalyst::Controller::MessageDriven /;
40
41   # then create actions, which map as message types
42   sub testaction : Local {
43       my ($self, $c) = @_;
44
45       # Reply with a minimal response message
46       my $response = { type => 'testaction_response' };
47       $c->stash->{response} = $response;
48   }
49
50 =head1 DESCRIPTION
51
52 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
53 need a controller that understands messaging, as well as this engine.
54
55 This is single-threaded and single process - you need to run multiple
56 instances of this engine to get concurrency, and configure your broker
57 to load-balance across multiple consumers of the same queue.
58
59 Controllers are mapped to Stomp queues, and a controller base class is
60 provided, Catalyst::Controller::MessageDriven, which implements
61 YAML-serialized messages, mapping a top-level YAML "type" key to
62 the action.
63
64 =head1 METHODS
65
66 =head2 run
67
68 App entry point. Starts a loop listening for messages.
69
70 =cut
71
72 sub run {
73         my ($self, $app, $oneshot) = @_;
74
75         die 'No Engine::Stomp configuration found'
76              unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
77
78         my @queues = grep { length $_ }
79                      map  { $app->controller($_)->action_namespace } $app->controllers;
80
81         # connect up
82         my %template = %{$app->config->{'Engine::Stomp'}};
83         my $add_header = delete $template{subscribe_header};
84         if (ref($add_header) ne 'HASH') {
85             $add_header = undef;
86         }
87         $self->connection(Net::Stomp->new(\%template));
88         $self->connection->connect();
89         $self->conn_desc($template{hostname}.':'.$template{port});
90
91         # subscribe, with client ack.
92         foreach my $queue (@queues) {
93                 my $queue_name = "/queue/$queue";
94                 my $header_hash = {
95                     destination => $queue_name,
96                     ack         => 'client',
97                 };
98
99                 # add the additional headers - yes I know it overwrites but
100                 # thats the dev's problem?
101                 if (keys %{$add_header}) {
102                     foreach my $key (keys %{$add_header}) {
103                         $header_hash->{$key} = $add_header->{$key};
104                     }
105                 }
106
107                 $self->connection->subscribe($header_hash);
108         }
109
110         # enter loop...
111         while (1) {
112                 my $frame = $self->connection->receive_frame();
113                 $self->handle_stomp_frame($app, $frame);
114                 last if $ENV{ENGINE_ONESHOT};
115         }
116         exit 0;
117 }
118
119 =head2 prepare_request
120
121 Overridden to add the source broker to the request, in place of the
122 client IP address.
123
124 =cut
125
126 sub prepare_request {
127     my ($self, $c, $req, $res_ref) = @_;
128     shift @_;
129     $self->next::method(@_);
130     $c->req->address($self->conn_desc);
131 }
132
133 =head2 finalize_headers
134
135 Overridden to dump out any errors encountered, since you won't get a
136 "debugging" message as for HTTP.
137
138 =cut
139
140 sub finalize_headers {
141     my ($self, $c) = @_;
142     my $error = join "\n", @{$c->error};
143     if ($error) {
144         $c->log->debug($error);
145     }
146     return $self->next::method($c);
147 }
148
149 =head2 handle_stomp_frame
150
151 Dispatch according to Stomp frame type.
152
153 =cut
154
155 sub handle_stomp_frame {
156     my ($self, $app, $frame) = @_;
157
158     my $command = $frame->command();
159     if ($command eq 'MESSAGE') {
160         $self->handle_stomp_message($app, $frame);
161     }
162     elsif ($command eq 'ERROR') {
163         $self->handle_stomp_error($app, $frame);
164     }
165     else {
166         $app->log->debug("Got unknown Stomp command: $command");
167     }
168 }
169
170 =head2 handle_stomp_message
171
172 Dispatch a Stomp message into the Catalyst app.
173
174 =cut
175
176 sub handle_stomp_message {
177     my ($self, $app, $frame) = @_;
178
179     # queue -> controller
180     my $queue = $frame->headers->{destination};
181     my ($controller) = $queue =~ m|^/queue/(.*)$|;
182
183     # set up request
184     my $config = $app->config->{'Engine::Stomp'};
185     my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
186     my $req = HTTP::Request->new(POST => $url);
187     $req->content($frame->body);
188     $req->content_length(length $frame->body);
189
190     # dispatch
191     my $response;
192     $app->handle_request($req, \$response);
193
194     # reply, if header set
195     if (my $reply_to = $response->headers->header('X-Reply-Address')) {
196         my $reply_queue = '/remote-temp-queue/' . $reply_to;
197         $self->connection->send({ destination => $reply_queue, body => $response->content });
198     }
199
200     # ack the message off the queue now we've replied / processed
201     $self->connection->ack( { frame => $frame } );
202 }
203 =head2 handle_stomp_error
204
205 Log any Stomp error frames we receive.
206
207 =cut
208
209 sub handle_stomp_error {
210     my ($self, $app, $frame) = @_;
211
212     my $error = $frame->headers->{message};
213     $app->log->debug("Got Stomp error: $error");
214 }
215
216 __PACKAGE__->meta->make_immutable;
217
218 =head1 CONFIGURATION
219
220 =head2 subscribe_header
221
222 Add additional header key/value pairs to the subscribe message sent to the
223 message broker.
224
225 =cut