12238a4ec85b6437dababfb23ed25e61b00cef06
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 extends 'Catalyst::Engine::Embeddable';
4
5 our $VERSION = '0.01';
6
7 use List::MoreUtils qw/ uniq /;
8 use HTTP::Request;
9 use Net::Stomp;
10
11 has connection => (is => 'rw', isa => 'Net::Stomp');
12 has conn_desc => (is => 'rw', isa => 'Str');
13
14 =head1 NAME
15
16 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
17
18 =head1 SYNOPSIS
19
20   # In a server script:
21
22   BEGIN {
23     $ENV{CATALYST_ENGINE} = 'Stomp';
24     require Catalyst::Engine::Stomp;
25   }  
26
27   MyApp->config->{Engine::Stomp} =
28    {
29      hostname => '127.0.0.1',
30      port     => 61613,
31    };
32   MyApp->run();
33
34   # In a controller, or controller base class:
35
36   use YAML;
37
38   # configure YAML deserialization; requires Catalyst::Action::REST
39   __PACKAGE__->config(
40                     'default'   => 'text/x-yaml',
41                     'stash_key' => 'rest',
42                     'map'       => { 'text/x-yaml' => 'YAML' },
43                    );
44
45   sub begin :ActionClass('Deserialize') { }
46
47   # have a default action, which forwards to the correct action
48   # based on the message contents (the type).
49   sub default : Private {
50           my ($self, $c) = @_;
51
52           my $action = $c->req->data->{type};
53           $c->forward($action);
54   }  
55
56   # Send messages back:
57   $c->engine->send_message($queue, Dump($msg));
58
59 =head1 DESCRIPTION
60
61 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
62 need a controller that understands messaging, as well as this engine. 
63
64 This is single-threaded and single process - you need to run multiple
65 instances of this engine to get concurrency, and configure your broker
66 to load-balance across multiple consumers of the same queue.
67
68 =head1 METHODS
69
70 =head2 run
71
72 App entry point. Starts a loop listening for messages.
73
74 =cut
75
76 sub run {
77         my ($self, $app, $oneshot) = @_;
78
79         die 'No Engine::Stomp configuration found'
80              unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
81
82         # list the path namespaces that will be mapped as queues.
83         #
84         # this is known to use the deprecated
85         # Dispatcher->action_hash() method, but there doesn't appear
86         # to be another way to get the relevant strings out.
87         #
88         # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
89         #
90         my @queues =
91             uniq
92             grep { length $_ }
93             map  { $_->namespace }
94             values %{$app->dispatcher->action_hash};
95
96         # connect up
97         my %template = %{$app->config->{'Engine::Stomp'}};
98         $self->connection(Net::Stomp->new(\%template));
99         $self->connection->connect();
100         $self->conn_desc($template{hostname}.':'.$template{port});
101
102         # subscribe, with client ack.
103         foreach my $queue (@queues) {
104                 my $queue_name = "/queue/$queue";
105                 $self->connection->subscribe({
106                                               destination => $queue_name, 
107                                               ack         => 'client' 
108                                              });
109         }
110
111         # enter loop...
112         while (1) {
113                 my $frame = $self->connection->receive_frame();
114                 $self->handle_stomp_frame($app, $frame);
115                 last if $ENV{ENGINE_ONESHOT};
116         }
117         exit 0;
118 }
119
120 =head2 prepare_request
121
122 Overridden to add the source broker to the request, in place of the
123 client IP address.
124
125 =cut
126
127 sub prepare_request {
128         my ($self, $c, $req, $res_ref) = @_;
129         shift @_;
130         $self->next::method(@_);
131         $c->req->address($self->conn_desc);
132 }
133
134 =head2 finalize_headers
135
136 Overridden to dump out any errors encountered.
137
138 =cut
139
140 sub finalize_headers {
141         my ($self, $c) = @_;
142         my $error = join "\n", @{$c->error};
143         if ($error) {
144                 $c->log->debug($error);
145         }
146         return $self->next::method($c);
147 }
148
149 =head2 handle_stomp_frame
150
151 Dispatch according to STOMP frame type.
152
153 =cut
154
155 sub handle_stomp_frame {
156         my ($self, $app, $frame) = @_;
157
158         my $command = $frame->command();
159         $app->log->debug("Got STOMP command: $command");
160         
161         if ($command eq 'MESSAGE') {
162                 $self->handle_stomp_message($app, $frame);
163         }
164         elsif ($command eq 'ERROR') {
165                 $self->handle_stomp_error($app, $frame);
166         }
167         else {
168                 $app->log->debug("Got unknown STOMP command: $command");
169         }
170 }
171
172 =head2 handle_stomp_message
173
174 Dispatch a STOMP message into the Catalyst app.
175
176 =cut
177
178 sub handle_stomp_message {
179         my ($self, $app, $frame) = @_;
180
181         # queue -> controller
182         my $queue = $frame->headers->{destination};
183         my ($controller) = $queue =~ m!^/queue/(.*)$!;
184
185         # set up request
186         my $config = $app->config->{'Engine::Stomp'};
187         my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
188         my $req = HTTP::Request->new(POST => $url);
189         $req->content($frame->body);    
190         $req->content_length(length $frame->body);
191
192         # dispatch
193         my $response;
194         $app->handle_request($req, \$response);
195
196         # reply
197         my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
198         $app->log->debug("replying to $reply_queue\n");
199         $self->connection->send({ destination => $reply_queue, body => $response->content });
200
201         # ack the message off the queue now we've replied
202         $self->connection->ack( { frame => $frame } );
203 }
204
205 =head2 handle_stomp_error
206
207 Log any STOMP error frames we receive.
208
209 =cut
210
211 sub handle_stomp_error {
212         my ($self, $app, $frame) = @_;
213         
214         my $error = $frame->headers->{message};
215         $app->log->debug("Got STOMP error: $error");
216 }
217
218 1;
219