Changes for 0.04
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 extends 'Catalyst::Engine::Embeddable';
4
5 our $VERSION = '0.04';
6
7 use List::MoreUtils qw/ uniq /;
8 use HTTP::Request;
9 use Net::Stomp;
10
11 has connection => (is => 'rw', isa => 'Net::Stomp');
12 has conn_desc => (is => 'rw', isa => 'Str');
13
14 =head1 NAME
15
16 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
17
18 =head1 SYNOPSIS
19
20   # In a server script:
21
22   BEGIN {
23     $ENV{CATALYST_ENGINE} = 'Stomp';
24     require Catalyst::Engine::Stomp;
25   }  
26
27   MyApp->config->{Engine::Stomp} =
28    {
29      hostname => '127.0.0.1',
30      port     => 61613,
31    };
32   MyApp->run();
33
34   # In a controller, or controller base class:
35   use base qw/ Catalyst::Controller::MessageDriven /;
36
37   # then create actions, which map as message types
38   sub testaction : Local {
39       my ($self, $c) = @_;
40
41       # Reply with a minimal response message
42       my $response = { type => 'testaction_response' };
43       $c->stash->{response} = $response;
44   }
45
46 =head1 DESCRIPTION
47
48 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
49 need a controller that understands messaging, as well as this engine. 
50
51 This is single-threaded and single process - you need to run multiple
52 instances of this engine to get concurrency, and configure your broker
53 to load-balance across multiple consumers of the same queue.
54
55 Controllers are mapped to Stomp queues, and a controller base class is
56 provided, Catalyst::Controller::MessageDriven, which implements
57 YAML-serialized messages, mapping a top-level YAML "type" key to 
58 the action. 
59
60 =head1 METHODS
61
62 =head2 run
63
64 App entry point. Starts a loop listening for messages.
65
66 =cut
67
68 sub run {
69         my ($self, $app, $oneshot) = @_;
70
71         die 'No Engine::Stomp configuration found'
72              unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
73
74         # list the path namespaces that will be mapped as queues.
75         #
76         # this is known to use the deprecated
77         # Dispatcher->action_hash() method, but there doesn't appear
78         # to be another way to get the relevant strings out.
79         #
80         # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
81         #
82         my @queues =
83             uniq
84             grep { length $_ }
85             map  { $_->namespace }
86             values %{$app->dispatcher->action_hash};
87
88         # connect up
89         my %template = %{$app->config->{'Engine::Stomp'}};
90         $self->connection(Net::Stomp->new(\%template));
91         $self->connection->connect();
92         $self->conn_desc($template{hostname}.':'.$template{port});
93
94         # subscribe, with client ack.
95         foreach my $queue (@queues) {
96                 my $queue_name = "/queue/$queue";
97                 $self->connection->subscribe({
98                                               destination => $queue_name, 
99                                               ack         => 'client' 
100                                              });
101         }
102
103         # enter loop...
104         while (1) {
105                 my $frame = $self->connection->receive_frame();
106                 $self->handle_stomp_frame($app, $frame);
107                 last if $ENV{ENGINE_ONESHOT};
108         }
109         exit 0;
110 }
111
112 =head2 prepare_request
113
114 Overridden to add the source broker to the request, in place of the
115 client IP address.
116
117 =cut
118
119 sub prepare_request {
120         my ($self, $c, $req, $res_ref) = @_;
121         shift @_;
122         $self->next::method(@_);
123         $c->req->address($self->conn_desc);
124 }
125
126 =head2 finalize_headers
127
128 Overridden to dump out any errors encountered, since you won't get a
129 "debugging" message as for HTTP.
130
131 =cut
132
133 sub finalize_headers {
134         my ($self, $c) = @_;
135         my $error = join "\n", @{$c->error};
136         if ($error) {
137                 $c->log->debug($error);
138         }
139         return $self->next::method($c);
140 }
141
142 =head2 handle_stomp_frame
143
144 Dispatch according to Stomp frame type.
145
146 =cut
147
148 sub handle_stomp_frame {
149         my ($self, $app, $frame) = @_;
150
151         my $command = $frame->command();
152         if ($command eq 'MESSAGE') {
153                 $self->handle_stomp_message($app, $frame);
154         }
155         elsif ($command eq 'ERROR') {
156                 $self->handle_stomp_error($app, $frame);
157         }
158         else {
159                 $app->log->debug("Got unknown Stomp command: $command");
160         }
161 }
162
163 =head2 handle_stomp_message
164
165 Dispatch a Stomp message into the Catalyst app.
166
167 =cut
168
169 sub handle_stomp_message {
170         my ($self, $app, $frame) = @_;
171
172         # queue -> controller
173         my $queue = $frame->headers->{destination};
174         my ($controller) = $queue =~ m!^/queue/(.*)$!;
175
176         # set up request
177         my $config = $app->config->{'Engine::Stomp'};
178         my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
179         my $req = HTTP::Request->new(POST => $url);
180         $req->content($frame->body);
181         $req->content_length(length $frame->body);
182
183         # dispatch
184         my $response;
185         $app->handle_request($req, \$response);
186
187         # reply, if header set
188         if (my $reply_to = $response->headers->header('X-Reply-Address')) {
189                 my $reply_queue = '/remote-temp-queue/' . $reply_to;
190                 $self->connection->send({ destination => $reply_queue, body => $response->content });
191         }
192
193         # ack the message off the queue now we've replied / processed
194         $self->connection->ack( { frame => $frame } );
195 }
196
197 =head2 handle_stomp_error
198
199 Log any Stomp error frames we receive.
200
201 =cut
202
203 sub handle_stomp_error {
204         my ($self, $app, $frame) = @_;
205         
206         my $error = $frame->headers->{message};
207         $app->log->debug("Got Stomp error: $error");
208 }
209
210 __PACKAGE__->meta->make_immutable;
211
212 1;
213