08fc0ec4e199d013454804e69cf7b1dc74f91781
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 use List::MoreUtils qw/ uniq /;
4 use HTTP::Request;
5 use Net::Stomp;
6 use namespace::autoclean;
7
8 extends 'Catalyst::Engine::Embeddable';
9
10 our $VERSION = '0.05';
11
12 has connection => (is => 'rw', isa => 'Net::Stomp');
13 has conn_desc => (is => 'rw', isa => 'Str');
14
15 =head1 NAME
16
17 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
18
19 =head1 SYNOPSIS
20
21   # In a server script:
22
23   BEGIN {
24     $ENV{CATALYST_ENGINE} = 'Stomp';
25     require Catalyst::Engine::Stomp;
26   }
27
28   MyApp->config->{Engine::Stomp} =
29    {
30      hostname => '127.0.0.1',
31      port     => 61613,
32    };
33   MyApp->run();
34
35   # In a controller, or controller base class:
36   use base qw/ Catalyst::Controller::MessageDriven /;
37
38   # then create actions, which map as message types
39   sub testaction : Local {
40       my ($self, $c) = @_;
41
42       # Reply with a minimal response message
43       my $response = { type => 'testaction_response' };
44       $c->stash->{response} = $response;
45   }
46
47 =head1 DESCRIPTION
48
49 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
50 need a controller that understands messaging, as well as this engine.
51
52 This is single-threaded and single process - you need to run multiple
53 instances of this engine to get concurrency, and configure your broker
54 to load-balance across multiple consumers of the same queue.
55
56 Controllers are mapped to Stomp queues, and a controller base class is
57 provided, Catalyst::Controller::MessageDriven, which implements
58 YAML-serialized messages, mapping a top-level YAML "type" key to
59 the action.
60
61 =head1 METHODS
62
63 =head2 run
64
65 App entry point. Starts a loop listening for messages.
66
67 =cut
68
69 sub run {
70         my ($self, $app, $oneshot) = @_;
71
72         die 'No Engine::Stomp configuration found'
73              unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
74
75         # list the path namespaces that will be mapped as queues.
76         #
77         # this is known to use the deprecated
78         # Dispatcher->action_hash() method, but there doesn't appear
79         # to be another way to get the relevant strings out.
80         #
81         # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
82         #
83         my @queues =
84             uniq
85             grep { length $_ }
86             map  { $_->namespace }
87             values %{$app->dispatcher->action_hash};
88
89         # connect up
90         my %template = %{$app->config->{'Engine::Stomp'}};
91         $self->connection(Net::Stomp->new(\%template));
92         $self->connection->connect();
93         $self->conn_desc($template{hostname}.':'.$template{port});
94
95         # subscribe, with client ack.
96         foreach my $queue (@queues) {
97                 my $queue_name = "/queue/$queue";
98                 $self->connection->subscribe({
99                                               destination => $queue_name,
100                                               ack         => 'client'
101                                              });
102         }
103
104         # enter loop...
105         while (1) {
106                 my $frame = $self->connection->receive_frame();
107                 $self->handle_stomp_frame($app, $frame);
108                 last if $ENV{ENGINE_ONESHOT};
109         }
110         exit 0;
111 }
112
113 =head2 prepare_request
114
115 Overridden to add the source broker to the request, in place of the
116 client IP address.
117
118 =cut
119
120 sub prepare_request {
121     my ($self, $c, $req, $res_ref) = @_;
122     shift @_;
123     $self->next::method(@_);
124     $c->req->address($self->conn_desc);
125 }
126
127 =head2 finalize_headers
128
129 Overridden to dump out any errors encountered, since you won't get a
130 "debugging" message as for HTTP.
131
132 =cut
133
134 sub finalize_headers {
135     my ($self, $c) = @_;
136     my $error = join "\n", @{$c->error};
137     if ($error) {
138         $c->log->debug($error);
139     }
140     return $self->next::method($c);
141 }
142
143 =head2 handle_stomp_frame
144
145 Dispatch according to Stomp frame type.
146
147 =cut
148
149 sub handle_stomp_frame {
150     my ($self, $app, $frame) = @_;
151
152     my $command = $frame->command();
153     if ($command eq 'MESSAGE') {
154         $self->handle_stomp_message($app, $frame);
155     }
156     elsif ($command eq 'ERROR') {
157         $self->handle_stomp_error($app, $frame);
158     }
159     else {
160         $app->log->debug("Got unknown Stomp command: $command");
161     }
162 }
163
164 =head2 handle_stomp_message
165
166 Dispatch a Stomp message into the Catalyst app.
167
168 =cut
169
170 sub handle_stomp_message {
171     my ($self, $app, $frame) = @_;
172
173     # queue -> controller
174     my $queue = $frame->headers->{destination};
175     my ($controller) = $queue =~ m|^/queue/(.*)$|;
176
177     # set up request
178     my $config = $app->config->{'Engine::Stomp'};
179     my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
180     my $req = HTTP::Request->new(POST => $url);
181     $req->content($frame->body);
182     $req->content_length(length $frame->body);
183
184     # dispatch
185     my $response;
186     $app->handle_request($req, \$response);
187
188     # reply, if header set
189     if (my $reply_to = $response->headers->header('X-Reply-Address')) {
190         my $reply_queue = '/remote-temp-queue/' . $reply_to;
191         $self->connection->send({ destination => $reply_queue, body => $response->content });
192     }
193
194     # ack the message off the queue now we've replied / processed
195     $self->connection->ack( { frame => $frame } );
196 }
197
198 =head2 handle_stomp_error
199
200 Log any Stomp error frames we receive.
201
202 =cut
203
204 sub handle_stomp_error {
205     my ($self, $app, $frame) = @_;
206
207     my $error = $frame->headers->{message};
208     $app->log->debug("Got Stomp error: $error");
209 }
210
211 __PACKAGE__->meta->make_immutable;
212