Bletch, more tabs
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
1 package Catalyst::Engine::Stomp;
2 use Moose;
3 extends 'Catalyst::Engine::Embeddable';
4
5 our $VERSION = '0.03';
6
7 use List::MoreUtils qw/ uniq /;
8 use HTTP::Request;
9 use Net::Stomp;
10
11 has connection => (is => 'rw', isa => 'Net::Stomp');
12 has conn_desc => (is => 'rw', isa => 'Str');
13
14 =head1 NAME
15
16 Catalyst::Engine::Stomp - write message handling apps with Catalyst.
17
18 =head1 SYNOPSIS
19
20   # In a server script:
21
22   BEGIN {
23     $ENV{CATALYST_ENGINE} = 'Stomp';
24     require Catalyst::Engine::Stomp;
25   }
26
27   MyApp->config->{Engine::Stomp} =
28    {
29      hostname => '127.0.0.1',
30      port     => 61613,
31    };
32   MyApp->run();
33
34   # In a controller, or controller base class:
35   use base qw/ Catalyst::Controller::MessageDriven /;
36
37   # then create actions, which map as message types
38   sub testaction : Local {
39       my ($self, $c) = @_;
40
41       # Reply with a minimal response message
42       my $response = { type => 'testaction_response' };
43       $c->stash->{response} = $response;
44   }
45
46   # The default serialization is YAML, but this configuration
47   # may be overridden in your controller:
48   __PACKAGE__->config(
49               'default'   => 'text/x-yaml',
50             'stash_key' => 'rest',
51             'map'       => { 'text/x-yaml' => 'YAML' },
52            );
53
54 =head1 DESCRIPTION
55
56 Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
57 need a controller that understands messaging, as well as this engine.
58
59 This is single-threaded and single process - you need to run multiple
60 instances of this engine to get concurrency, and configure your broker
61 to load-balance across multiple consumers of the same queue.
62
63 Controllers are mapped to Stomp queues, and a controller base class is
64 provided, Catalyst::Controller::MessageDriven, which implements
65 YAML-serialized messages, mapping a top-level YAML "type" key to
66 the action.
67
68 =head1 METHODS
69
70 =head2 run
71
72 App entry point. Starts a loop listening for messages.
73
74 =cut
75
76 sub run {
77         my ($self, $app, $oneshot) = @_;
78
79         die 'No Engine::Stomp configuration found'
80          unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
81
82         # list the path namespaces that will be mapped as queues.
83     #
84     # this is known to use the deprecated
85     # Dispatcher->action_hash() method, but there doesn't appear
86     # to be another way to get the relevant strings out.
87     #
88     # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
89     #
90         my @queues =
91         uniq
92         grep { length $_ }
93         map  { $_->namespace }
94         values %{$app->dispatcher->action_hash};
95
96     # connect up
97         my %template = %{$app->config->{'Engine::Stomp'}};
98     $self->connection(Net::Stomp->new(\%template));
99     $self->connection->connect();
100     $self->conn_desc($template{hostname}.':'.$template{port});
101
102     # subscribe, with client ack.
103         foreach my $queue (@queues) {
104         my $queue_name = "/queue/$queue";
105         $self->connection->subscribe({
106                           destination => $queue_name,
107                           ack         => 'client',
108                          });
109         }
110
111     # enter loop...
112     while (1) {
113         my $frame = $self->connection->receive_frame();
114         $self->handle_stomp_frame($app, $frame);
115         last if $ENV{ENGINE_ONESHOT};
116     }
117     exit 0;
118 }
119
120 =head2 prepare_request
121
122 Overridden to add the source broker to the request, in place of the
123 client IP address.
124
125 =cut
126
127 sub prepare_request {
128         my ($self, $c, $req, $res_ref) = @_;
129     shift @_;
130     $self->next::method(@_);
131     $c->req->address($self->conn_desc);
132 }
133
134 =head2 finalize_headers
135
136 Overridden to dump out any errors encountered, since you won't get a
137 "debugging" message as for HTTP.
138
139 =cut
140
141 sub finalize_headers {
142     my ($self, $c) = @_;
143     my $error = join "\n", @{$c->error};
144     if ($error) {
145         $c->log->debug($error);
146     }
147     return $self->next::method($c);
148 }
149
150 =head2 handle_stomp_frame
151
152 Dispatch according to Stomp frame type.
153
154 =cut
155
156 sub handle_stomp_frame {
157     my ($self, $app, $frame) = @_;
158
159     my $command = $frame->command();
160     if ($command eq 'MESSAGE') {
161         $self->handle_stomp_message($app, $frame);
162     }
163     elsif ($command eq 'ERROR') {
164         $self->handle_stomp_error($app, $frame);
165     }
166     else {
167         $app->log->debug("Got unknown Stomp command: $command");
168     }
169 }
170
171 =head2 handle_stomp_message
172
173 Dispatch a Stomp message into the Catalyst app.
174
175 =cut
176
177 sub handle_stomp_message {
178     my ($self, $app, $frame) = @_;
179
180     # queue -> controller
181     my $queue = $frame->headers->{destination};
182     my ($controller) = $queue =~ m!^/queue/(.*)$!;
183
184     # set up request
185         my $config = $app->config->{'Engine::Stomp'};
186         my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
187         my $req = HTTP::Request->new(POST => $url);
188         $req->content($frame->body);
189     $req->content_length(length $frame->body);
190
191     # dispatch
192     my $response;
193         $app->handle_request($req, \$response);
194
195     # reply
196     my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
197     $self->connection->send({ destination => $reply_queue, body => $response->content });
198
199     # ack the message off the queue now we've replied
200     $self->connection->ack( { frame => $frame } );
201 }
202
203 =head2 handle_stomp_error
204
205 Log any Stomp error frames we receive.
206
207 =cut
208
209 sub handle_stomp_error {
210     my ($self, $app, $frame) = @_;
211
212     my $error = $frame->headers->{message};
213     $app->log->debug("Got Stomp error: $error");
214 }
215
216 __PACKAGE__->meta->make_immutable;
217
218 1;
219