Changes for 0.04
[catagits/Catalyst-Engine-STOMP.git] / lib / Catalyst / Engine / Stomp.pm
CommitLineData
0a663589 1package Catalyst::Engine::Stomp;
2use Moose;
3extends 'Catalyst::Engine::Embeddable';
4
0ee4d565 5our $VERSION = '0.04';
0a663589 6
7use List::MoreUtils qw/ uniq /;
8use HTTP::Request;
9use Net::Stomp;
10
# The Net::Stomp client object; run() stores the connected client here.
has connection => (
    is  => 'rw',
    isa => 'Net::Stomp',
);

# Description of the broker; run() sets this to "hostname:port".
has conn_desc => (
    is  => 'rw',
    isa => 'Str',
);
13
14=head1 NAME
15
16Catalyst::Engine::Stomp - write message handling apps with Catalyst.
17
18=head1 SYNOPSIS
19
20 # In a server script:
21
22 BEGIN {
23 $ENV{CATALYST_ENGINE} = 'Stomp';
24 require Catalyst::Engine::Stomp;
25 }
26
27 MyApp->config->{Engine::Stomp} =
28 {
29 hostname => '127.0.0.1',
30 port => 61613,
31 };
32 MyApp->run();
33
34 # In a controller, or controller base class:
d78bb739 35 use base qw/ Catalyst::Controller::MessageDriven /;
0a663589 36
d78bb739 37 # then create actions, which map as message types
38 sub testaction : Local {
39 my ($self, $c) = @_;
0a663589 40
d78bb739 41 # Reply with a minimal response message
42 my $response = { type => 'testaction_response' };
43 $c->stash->{response} = $response;
44 }
45
0a663589 46=head1 DESCRIPTION
47
Write a Catalyst app connected to a Stomp message broker rather than to
HTTP. You need a controller that understands messaging, as well as this
engine.
50
51This is single-threaded and single process - you need to run multiple
52instances of this engine to get concurrency, and configure your broker
53to load-balance across multiple consumers of the same queue.
54
d78bb739 55Controllers are mapped to Stomp queues, and a controller base class is
56provided, Catalyst::Controller::MessageDriven, which implements
57YAML-serialized messages, mapping a top-level YAML "type" key to
58the action.
59
0a663589 60=head1 METHODS
61
62=head2 run
63
64App entry point. Starts a loop listening for messages.
65
66=cut
67
sub run {
    # Arguments:
    #   $self    - this engine instance
    #   $app     - the Catalyst application (class or object)
    #   $oneshot - optional; when true, handle a single frame and stop
    #              looping (same effect as setting $ENV{ENGINE_ONESHOT})
    #
    # Dies if the application has no 'Engine::Stomp' configuration hash.
    # Calls exit(0) once the receive loop ends, so it does not return.
    my ($self, $app, $oneshot) = @_;

    die 'No Engine::Stomp configuration found'
        unless ref $app->config->{'Engine::Stomp'} eq 'HASH';

    # List the path namespaces that will be mapped as queues.
    #
    # This is known to use the deprecated Dispatcher->action_hash()
    # method, but there doesn't appear to be another way to get the
    # relevant strings out.
    #
    # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
    #
    my @queues =
        uniq
        grep { length $_ }
        map  { $_->namespace }
        values %{ $app->dispatcher->action_hash };

    # Connect up, passing a copy of the configuration hash to Net::Stomp.
    my %template = %{ $app->config->{'Engine::Stomp'} };
    $self->connection(Net::Stomp->new(\%template));
    $self->connection->connect();
    $self->conn_desc($template{hostname}.':'.$template{port});

    # Subscribe to one queue per namespace, with client ack, so that a
    # message is only removed once handle_stomp_message acks it.
    foreach my $queue (@queues) {
        my $queue_name = "/queue/$queue";
        $self->connection->subscribe({
            destination => $queue_name,
            ack         => 'client',
        });
    }

    # Enter the receive loop. The $oneshot argument was previously
    # accepted but ignored; it is now honoured alongside the
    # ENGINE_ONESHOT environment variable, which is still re-checked
    # after every frame.
    while (1) {
        my $frame = $self->connection->receive_frame();
        $self->handle_stomp_frame($app, $frame);
        last if $oneshot || $ENV{ENGINE_ONESHOT};
    }
    exit 0;
}
111
112=head2 prepare_request
113
114Overridden to add the source broker to the request, in place of the
115client IP address.
116
117=cut
118
sub prepare_request {
    # @request_args is everything after the invocant; its first element
    # is the Catalyst context.
    my ($self, @request_args) = @_;
    my $c = $request_args[0];

    # Let the parent engine prepare the request from the same arguments.
    $self->next::method(@request_args);

    # Report the broker description where the client address would
    # normally go.
    return $c->req->address($self->conn_desc);
}
125
126=head2 finalize_headers
127
d78bb739 128Overridden to dump out any errors encountered, since you won't get a
129"debugging" message as for HTTP.
0a663589 130
131=cut
132
sub finalize_headers {
    my ($self, $c) = @_;

    # Collapse all recorded errors into one newline-separated string and
    # send it to the debug log when it is non-empty.
    my $error_text = join "\n", @{ $c->error };
    $c->log->debug($error_text) if $error_text;

    # Hand over to the parent engine's implementation.
    return $self->next::method($c);
}
141
142=head2 handle_stomp_frame
143
d78bb739 144Dispatch according to Stomp frame type.
0a663589 145
146=cut
147
sub handle_stomp_frame {
    my ($self, $app, $frame) = @_;

    my $command = $frame->command();

    # MESSAGE frames are dispatched into the application.
    return $self->handle_stomp_message($app, $frame)
        if $command eq 'MESSAGE';

    # ERROR frames are only logged.
    return $self->handle_stomp_error($app, $frame)
        if $command eq 'ERROR';

    # Anything else is noted in the debug log and otherwise ignored.
    return $app->log->debug("Got unknown Stomp command: $command");
}
162
163=head2 handle_stomp_message
164
d78bb739 165Dispatch a Stomp message into the Catalyst app.
0a663589 166
167=cut
168
sub handle_stomp_message {
    my ($self, $app, $frame) = @_;

    # The part of the destination after "/queue/" names the controller.
    my $destination  = $frame->headers->{destination};
    my ($controller) = $destination =~ m!^/queue/(.*)$!;

    # Wrap the frame body in a POST request aimed at that controller.
    my $broker  = $app->config->{'Engine::Stomp'};
    my $url     = "stomp://$broker->{hostname}:$broker->{port}/$controller";
    my $body    = $frame->body;
    my $request = HTTP::Request->new(POST => $url);
    $request->content($body);
    $request->content_length(length $body);

    # Dispatch; the response is handed back through the scalar reference.
    my $response;
    $app->handle_request($request, \$response);

    # Send a reply only when the response carries an X-Reply-Address
    # header.
    my $reply_to = $response->headers->header('X-Reply-Address');
    if ($reply_to) {
        $self->connection->send({
            destination => '/remote-temp-queue/' . $reply_to,
            body        => $response->content,
        });
    }

    # Ack the message off the queue now that it has been processed and
    # any reply has been sent.
    return $self->connection->ack({ frame => $frame });
}
196
197=head2 handle_stomp_error
198
d78bb739 199Log any Stomp error frames we receive.
0a663589 200
201=cut
202
sub handle_stomp_error {
    my ($self, $app, $frame) = @_;

    # The error text is taken from the frame's "message" header and
    # written to the application's debug log.
    my $message = $frame->headers->{message};
    return $app->log->debug("Got Stomp error: $message");
}
209
d78bb739 210__PACKAGE__->meta->make_immutable;
211
0a663589 2121;
213