package Catalyst::Engine::Stomp;
use Moose;
-extends 'Catalyst::Engine::Embeddable';
-
-our $VERSION = '0.01';
-
use List::MoreUtils qw/ uniq /;
use HTTP::Request;
use Net::Stomp;
+use MooseX::Types::Moose qw/Str Int HashRef/;
+use namespace::autoclean;
+
+extends 'Catalyst::Engine::Embeddable';
+
+our $VERSION = '0.06';
has connection => (is => 'rw', isa => 'Net::Stomp');
-has conn_desc => (is => 'rw', isa => 'Str');
+has conn_desc => (is => 'rw', isa => Str);
=head1 NAME
BEGIN {
$ENV{CATALYST_ENGINE} = 'Stomp';
require Catalyst::Engine::Stomp;
- }
-
- MyApp->config->{Engine::Stomp} =
- {
- hostname => '127.0.0.1',
- port => 61613,
- };
+ }
+
+ MyApp->config(
+ 'Engine::Stomp' = {
+ hostname => '127.0.0.1',
+ port => 61613,
+ subscribe_header => {
+ transformation => 'jms-to-json',
+ }
+ },
+ );
MyApp->run();
# In a controller, or controller base class:
+ use base qw/ Catalyst::Controller::MessageDriven /;
- use YAML;
+ # then create actions, which map as message types
+ sub testaction : Local {
+ my ($self, $c) = @_;
- # configure YAML deserialization; requires Catalyst::Action::REST
- __PACKAGE__->config(
- 'default' => 'text/x-yaml',
- 'stash_key' => 'rest',
- 'map' => { 'text/x-yaml' => 'YAML' },
- );
-
- sub begin :ActionClass('Deserialize') { }
-
- # have a default action, which forwards to the correct action
- # based on the message contents (the type).
- sub default : Private {
- my ($self, $c) = @_;
-
- my $action = $c->req->data->{type};
- $c->forward($action);
- }
-
- # Send messages back:
- $c->engine->send_message($queue, Dump($msg));
+ # Reply with a minimal response message
+ my $response = { type => 'testaction_response' };
+ $c->stash->{response} = $response;
+ }
=head1 DESCRIPTION
Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
-need a controller that understands messaging, as well as this engine.
+need a controller that understands messaging, as well as this engine.
This is single-threaded and single process - you need to run multiple
instances of this engine to get concurrency, and configure your broker
to load-balance across multiple consumers of the same queue.
+Controllers are mapped to Stomp queues, and a controller base class is
+provided, Catalyst::Controller::MessageDriven, which implements
+YAML-serialized messages, mapping a top-level YAML "type" key to
+the action.
+
=head1 METHODS
=head2 run
my ($self, $app, $oneshot) = @_;
die 'No Engine::Stomp configuration found'
- unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
-
- # list the path namespaces that will be mapped as queues.
- #
- # this is known to use the deprecated
- # Dispatcher->action_hash() method, but there doesn't appear
- # to be another way to get the relevant strings out.
- #
- # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
- #
- my @queues =
- uniq
- grep { length $_ }
- map { $_->namespace }
- values %{$app->dispatcher->action_hash};
-
- # connect up
+ unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
+
+ my @queues = grep { length $_ }
+ map { $app->controller($_)->action_namespace } $app->controllers;
+
+ # connect up
my %template = %{$app->config->{'Engine::Stomp'}};
- $self->connection(Net::Stomp->new(\%template));
- $self->connection->connect();
- $self->conn_desc($template{hostname}.':'.$template{port});
+ my $add_header = delete $template{subscribe_header};
+ if (ref($add_header) ne 'HASH') {
+ $add_header = undef;
+ }
+ $self->connection(Net::Stomp->new(\%template));
+ $self->connection->connect();
+ $self->conn_desc($template{hostname}.':'.$template{port});
- # subscribe, with client ack.
+ # subscribe, with client ack.
foreach my $queue (@queues) {
- my $queue_name = "/queue/$queue";
- $self->connection->subscribe({
- destination => $queue_name,
- ack => 'client'
- });
+ my $queue_name = "/queue/$queue";
+ my $header_hash = {
+ destination => $queue_name,
+ ack => 'client',
+ };
+
+ # add the additional headers - yes I know it overwrites but
+ # thats the dev's problem?
+ if (keys %{$add_header}) {
+ foreach my $key (keys %{$add_header}) {
+ $header_hash->{$key} = $add_header->{$key};
+ }
+ }
+
+ $self->connection->subscribe($header_hash);
}
- # enter loop...
- while (1) {
- my $frame = $self->connection->receive_frame();
- $self->handle_stomp_frame($app, $frame);
- last if $ENV{ENGINE_ONESHOT};
- }
- exit 0;
+ # enter loop...
+ while (1) {
+ my $frame = $self->connection->receive_frame();
+ $self->handle_stomp_frame($app, $frame);
+ last if $ENV{ENGINE_ONESHOT};
+ }
+ exit 0;
}
=head2 prepare_request
=cut
sub prepare_request {
- my ($self, $c, $req, $res_ref) = @_;
- shift @_;
- $self->next::method(@_);
- $c->req->address($self->conn_desc);
+ my ($self, $c, $req, $res_ref) = @_;
+ shift @_;
+ $self->next::method(@_);
+ $c->req->address($self->conn_desc);
}
=head2 finalize_headers
-Overridden to dump out any errors encountered.
+Overridden to dump out any errors encountered, since you won't get a
+"debugging" message as for HTTP.
=cut
sub finalize_headers {
- my ($self, $c) = @_;
- my $error = join "\n", @{$c->error};
- if ($error) {
- $c->log->debug($error);
- }
- return $self->next::method($c);
+ my ($self, $c) = @_;
+ my $error = join "\n", @{$c->error};
+ if ($error) {
+ $c->log->debug($error);
+ }
+ return $self->next::method($c);
}
=head2 handle_stomp_frame
-Dispatch according to STOMP frame type.
+Dispatch according to Stomp frame type.
=cut
sub handle_stomp_frame {
- my ($self, $app, $frame) = @_;
-
- my $command = $frame->command();
- if ($command eq 'MESSAGE') {
- $self->handle_stomp_message($app, $frame);
- }
- elsif ($command eq 'ERROR') {
- $self->handle_stomp_error($app, $frame);
- }
- else {
- $app->log->debug("Got unknown STOMP command: $command");
- }
+ my ($self, $app, $frame) = @_;
+
+ my $command = $frame->command();
+ if ($command eq 'MESSAGE') {
+ $self->handle_stomp_message($app, $frame);
+ }
+ elsif ($command eq 'ERROR') {
+ $self->handle_stomp_error($app, $frame);
+ }
+ else {
+ $app->log->debug("Got unknown Stomp command: $command");
+ }
}
=head2 handle_stomp_message
-Dispatch a STOMP message into the Catalyst app.
+Dispatch a Stomp message into the Catalyst app.
=cut
sub handle_stomp_message {
- my ($self, $app, $frame) = @_;
-
- # queue -> controller
- my $queue = $frame->headers->{destination};
- my ($controller) = $queue =~ m!^/queue/(.*)$!;
-
- # set up request
- my $config = $app->config->{'Engine::Stomp'};
- my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
- my $req = HTTP::Request->new(POST => $url);
- $req->content($frame->body);
- $req->content_length(length $frame->body);
-
- # dispatch
- my $response;
- $app->handle_request($req, \$response);
-
- # reply
- my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
- $self->connection->send({ destination => $reply_queue, body => $response->content });
-
- # ack the message off the queue now we've replied
- $self->connection->ack( { frame => $frame } );
+ my ($self, $app, $frame) = @_;
+
+ # queue -> controller
+ my $queue = $frame->headers->{destination};
+ my ($controller) = $queue =~ m|^/queue/(.*)$|;
+
+ # set up request
+ my $config = $app->config->{'Engine::Stomp'};
+ my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
+ my $req = HTTP::Request->new(POST => $url);
+ $req->content($frame->body);
+ $req->content_length(length $frame->body);
+
+ # dispatch
+ my $response;
+ $app->handle_request($req, \$response);
+
+ # reply, if header set
+ if (my $reply_to = $response->headers->header('X-Reply-Address')) {
+ my $reply_queue = '/remote-temp-queue/' . $reply_to;
+ $self->connection->send({ destination => $reply_queue, body => $response->content });
+ }
+
+ # ack the message off the queue now we've replied / processed
+ $self->connection->ack( { frame => $frame } );
}
-
=head2 handle_stomp_error
-Log any STOMP error frames we receive.
+Log any Stomp error frames we receive.
=cut
sub handle_stomp_error {
- my ($self, $app, $frame) = @_;
-
- my $error = $frame->headers->{message};
- $app->log->debug("Got STOMP error: $error");
+ my ($self, $app, $frame) = @_;
+
+ my $error = $frame->headers->{message};
+ $app->log->debug("Got Stomp error: $error");
}
-1;
+__PACKAGE__->meta->make_immutable;
+
+=head1 CONFIGURATION
+=head2 subscribe_header
+
+Add additional header key/value pairs to the subscribe message sent to the
+message broker.
+
+=cut