--- /dev/null
+Revision history for Perl extension Catalyst::Engine::Stomp.
+
+0.01 Tue Dec 16 10:15:20 2008
+ - original version; created by h2xs 1.23 with options
+ -n Catalyst::Engine::Stomp
+
--- /dev/null
+Changes
+Makefile.PL
+MANIFEST
+README
+t/Catalyst-Engine-Stomp.t
+lib/Catalyst/Engine/Stomp.pm
+lib/Catalyst/Controller/MessageDriven.pm
+testapp/lib/TestApp/Controller/TestController.pm
+testapp/lib/TestApp/testapp.yml
+testapp/lib/TestApp.pm
+testapp/script/testapp_stomp.pl
+
--- /dev/null
+use 5.008008;
+use ExtUtils::MakeMaker;
+# See lib/ExtUtils/MakeMaker.pm for details of how to influence
+# the contents of the Makefile that is written.
+WriteMakefile(
+ NAME => 'Catalyst::Engine::Stomp',
+ VERSION_FROM => 'lib/Catalyst/Engine/Stomp.pm', # finds $VERSION
+ PREREQ_PM => {
+ Catalyst::Engine::Embeddable => 0.0.1,
+ Moose => 0,
+ Net::Stomp => 0.34,
+ YAML::XS => 0.32,
+ }, # e.g., Module::Name => 1.1
+ ($] >= 5.005 ? ## Add these new keywords supported since 5.005
+ (ABSTRACT_FROM => 'lib/Catalyst/Engine/Stomp.pm', # retrieve abstract from module
+ AUTHOR => 'Chris Andrews <chris@nodnol.org>') : ()),
+ LIBS => [''], # e.g., '-lm'
+ DEFINE => '', # e.g., '-DHAVE_SOMETHING'
+ INC => '-I.', # e.g., '-I. -I/usr/include/other'
+);
--- /dev/null
+Catalyst-Engine-Stomp version 0.01
+==================================
+
+The README is used to introduce the module and provide instructions on
+how to install the module, any machine dependencies it may have (for
+example C compilers and installed libraries) and any other information
+that should be provided before the module is installed.
+
+A README file is required for CPAN modules since CPAN extracts the
+README file from a module distribution so that people browsing the
+archive can use it get an idea of the modules uses. It is usually a
+good idea to provide version information here so that people can
+decide whether fixes for the module are worth downloading.
+
+INSTALLATION
+
+To install this module type the following:
+
+ perl Makefile.PL
+ make
+ make test
+ make install
+
+DEPENDENCIES
+
+This module requires these other modules and libraries:
+
+ blah blah blah
+
+COPYRIGHT AND LICENCE
+
+Put the correct copyright and licence information here.
+
+Copyright (C) 2008 by Chris Andrews
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself, either Perl version 5.8.8 or,
+at your option, any later version of Perl 5 you may have available.
+
+
--- /dev/null
+package Catalyst::Controller::MessageDriven;
+use Moose;
+
+BEGIN { extends 'Catalyst::Controller' }
+
+__PACKAGE__->config(
+ 'default' => 'text/x-yaml',
+ 'stash_key' => 'response',
+ 'map' => { 'text/x-yaml' => 'YAML' },
+ );
+
+sub begin :ActionClass('Deserialize') { }
+
+sub end :ActionClass('Serialize') {
+ my ($self, $c) = @_;
+
+ # Engine will send our reply based on the value of this header.
+ $c->response->headers->header( 'X-Reply-Address' => $c->req->data->{reply_to} );
+
+ # Custom error handler - steal errors from catalyst and dump them into
+ # the stash, to get them serialized out as the reply.
+ if (scalar @{$c->error}) {
+ my $error = join "\n", @{$c->error};
+ $c->stash->{response} = { status => 'ERROR', error => $error };
+ $c->error(0); # clear errors, so our response isn't clobbered
+ }
+}
+
+sub default : Private {
+ my ($self, $c) = @_;
+
+ # Forward the request to the appropriate action, based on the
+ # message type.
+ my $action = $c->req->data->{type};
+ $c->forward($action, [$c->req->data]);
+}
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package Catalyst::Engine::Stomp;
+use Moose;
+extends 'Catalyst::Engine::Embeddable';
+
+our $VERSION = '0.01';
+
+use List::MoreUtils qw/ uniq /;
+use HTTP::Request;
+use Net::Stomp;
+
+has connection => (is => 'rw', isa => 'Net::Stomp');
+has conn_desc => (is => 'rw', isa => 'Str');
+
+=head1 NAME
+
+Catalyst::Engine::Stomp - write message handling apps with Catalyst.
+
+=head1 SYNOPSIS
+
+ # In a server script:
+
+ BEGIN {
+ $ENV{CATALYST_ENGINE} = 'Stomp';
+ require Catalyst::Engine::Stomp;
+ }
+
+ MyApp->config->{Engine::Stomp} =
+ {
+ hostname => '127.0.0.1',
+ port => 61613,
+ };
+ MyApp->run();
+
+ # In a controller, or controller base class:
+
+ use YAML;
+
+ # configure YAML deserialization; requires Catalyst::Action::REST
+ __PACKAGE__->config(
+ 'default' => 'text/x-yaml',
+ 'stash_key' => 'rest',
+ 'map' => { 'text/x-yaml' => 'YAML' },
+ );
+
+ sub begin :ActionClass('Deserialize') { }
+
+ # have a default action, which forwards to the correct action
+ # based on the message contents (the type).
+ sub default : Private {
+ my ($self, $c) = @_;
+
+ my $action = $c->req->data->{type};
+ $c->forward($action);
+ }
+
+ # Send messages back:
+ $c->engine->send_message($queue, Dump($msg));
+
+=head1 DESCRIPTION
+
+Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
+need a controller that understands messaging, as well as this engine.
+
+This is single-threaded and single process - you need to run multiple
+instances of this engine to get concurrency, and configure your broker
+to load-balance across multiple consumers of the same queue.
+
+=head1 METHODS
+
+=head2 run
+
+App entry point. Starts a loop listening for messages.
+
+=cut
+
+sub run {
+ my ($self, $app, $oneshot) = @_;
+
+ die 'No Engine::Stomp configuration found'
+ unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
+
+ # list the path namespaces that will be mapped as queues.
+ #
+ # this is known to use the deprecated
+ # Dispatcher->action_hash() method, but there doesn't appear
+ # to be another way to get the relevant strings out.
+ #
+ # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
+ #
+ my @queues =
+ uniq
+ grep { length $_ }
+ map { $_->namespace }
+ values %{$app->dispatcher->action_hash};
+
+ # connect up
+ my %template = %{$app->config->{'Engine::Stomp'}};
+ $self->connection(Net::Stomp->new(\%template));
+ $self->connection->connect();
+ $self->conn_desc($template{hostname}.':'.$template{port});
+
+ # subscribe, with client ack.
+ foreach my $queue (@queues) {
+ my $queue_name = "/queue/$queue";
+ $self->connection->subscribe({
+ destination => $queue_name,
+ ack => 'client'
+ });
+ }
+
+ # enter loop...
+ while (1) {
+ my $frame = $self->connection->receive_frame();
+ $self->handle_stomp_frame($app, $frame);
+ last if $ENV{ENGINE_ONESHOT};
+ }
+ exit 0;
+}
+
+=head2 prepare_request
+
+Overridden to add the source broker to the request, in place of the
+client IP address.
+
+=cut
+
+sub prepare_request {
+ my ($self, $c, $req, $res_ref) = @_;
+ shift @_;
+ $self->next::method(@_);
+ $c->req->address($self->conn_desc);
+}
+
+=head2 finalize_headers
+
+Overridden to dump out any errors encountered.
+
+=cut
+
+sub finalize_headers {
+ my ($self, $c) = @_;
+ my $error = join "\n", @{$c->error};
+ if ($error) {
+ $c->log->debug($error);
+ }
+ return $self->next::method($c);
+}
+
+=head2 handle_stomp_frame
+
+Dispatch according to STOMP frame type.
+
+=cut
+
+sub handle_stomp_frame {
+ my ($self, $app, $frame) = @_;
+
+ my $command = $frame->command();
+ $app->log->debug("Got STOMP command: $command");
+
+ if ($command eq 'MESSAGE') {
+ $self->handle_stomp_message($app, $frame);
+ }
+ elsif ($command eq 'ERROR') {
+ $self->handle_stomp_error($app, $frame);
+ }
+ else {
+ $app->log->debug("Got unknown STOMP command: $command");
+ }
+}
+
+=head2 handle_stomp_message
+
+Dispatch a STOMP message into the Catalyst app.
+
+=cut
+
+sub handle_stomp_message {
+ my ($self, $app, $frame) = @_;
+
+ # queue -> controller
+ my $queue = $frame->headers->{destination};
+ my ($controller) = $queue =~ m!^/queue/(.*)$!;
+
+ # set up request
+ my $config = $app->config->{'Engine::Stomp'};
+ my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
+ my $req = HTTP::Request->new(POST => $url);
+ $req->content($frame->body);
+ $req->content_length(length $frame->body);
+
+ # dispatch
+ my $response;
+ $app->handle_request($req, \$response);
+
+ # reply
+ my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
+ $app->log->debug("replying to $reply_queue\n");
+ $self->connection->send({ destination => $reply_queue, body => $response->content });
+
+ # ack the message off the queue now we've replied
+ $self->connection->ack( { frame => $frame } );
+}
+
+=head2 handle_stomp_error
+
+Log any STOMP error frames we receive.
+
+=cut
+
+sub handle_stomp_error {
+ my ($self, $app, $frame) = @_;
+
+ my $error = $frame->headers->{message};
+ $app->log->debug("Got STOMP error: $error");
+}
+
+1;
+
--- /dev/null
+use Test::More tests => 14;
+use Test::Fork;
+
+# Tests which expect a STOMP server like ActiveMQ to exist on
+# localhost:61613, which is what you get if you just get the ActiveMQ
+# distro and run its out-of-the-box config.
+
+use Net::Stomp;
+use YAML::XS qw/ Dump Load /;
+use Data::Dumper;
+
+# First fire off the server
+my $child_pid = fork_ok(1, sub {
+ ok(system("$^X -Ilib -Itestapp/lib testapp/script/testapp_stomp.pl --oneshot"));
+ #sleep 3;
+});
+
+# Now be a client to that server
+
+my $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
+ok($stomp, 'Net::Stomp object');
+
+my $frame = $stomp->connect();
+ok($frame, 'connect to MQ server ok');
+
+my $reply_to = sprintf '%s:1', $frame->headers->{session};
+ok($frame->headers->{session}, 'got a session');
+ok(length $reply_to > 2, 'valid-looking reply_to queue');
+
+ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue');
+
+my $message = {
+ payload => { foo => 1, bar => 2 },
+ reply_to => $reply_to,
+ type => 'testaction',
+ };
+my $text = Dump($message);
+ok($text, 'compose message');
+
+$stomp->send( { destination => '/queue/testcontroller', body => $text } );
+
+my $reply_frame = $stomp->receive_frame();
+ok($reply_frame, 'got a reply');
+ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue');
+ok($reply_frame->body, 'has a body');
+
+my $response = Load($reply_frame->body);
+ok($response, 'YAML response ok');
+ok($response->{type} eq 'testaction_response', 'correct type');
+
+ok($stomp->disconnect, 'disconnected');
+
--- /dev/null
+package TestApp;
+use Moose;
+use Catalyst::Runtime '5.80002';
+
+use Catalyst qw/-Debug
+ ConfigLoader
+ /;
+
+extends 'Catalyst';
+
+our $VERSION = '0.01';
+
+__PACKAGE__->config( name => 'TestApp' );
+__PACKAGE__->setup();
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package TestApp::Controller::TestController;
+use Moose;
+
+BEGIN { extends 'Catalyst::Controller::MessageDriven' };
+
+sub testaction : Local {
+ my ($self, $c) = @_;
+
+ # Reply with a minimal response message
+ my $response = { type => 'testaction_response' };
+ $c->stash->{response} = $response;
+}
+
+1;
--- /dev/null
+---
+name: TestApp
+'Engine::Stomp':
+ hostname: localhost
+ port: 61613
--- /dev/null
+BEGIN {
+ $ENV{CATALYST_ENGINE} = 'Stomp';
+ require Catalyst::Engine::Stomp;
+}
+
+use strict;
+use warnings;
+use Getopt::Long;
+use Pod::Usage;
+use FindBin;
+use lib "$FindBin::Bin/../lib";
+
+my $debug = 0;
+my $help = 0;
+my $oneshot = 0;
+
+my @argv = @ARGV;
+
+GetOptions(
+ 'debug|d' => \$debug,
+ 'help|?' => \$help,
+ 'oneshot' => \$oneshot,
+);
+
+pod2usage(1) if $help;
+
+if ( $debug ) {
+ $ENV{CATALYST_DEBUG} = 1;
+}
+
+if ( $oneshot ) {
+ $ENV{ENGINE_ONESHOT} = 1;
+}
+
+# This is require instead of use so that the above environment
+# variables can be set at runtime.
+require TestApp;
+TestApp->run();
+
+1;
+
+=head1 NAME
+
+testapp_stomp.pl - Catalyst STOMP client
+
+=head1 SYNOPSIS
+
+testapp_stomp.pl [options]
+
+ Options:
+ -d -debug force debug mode
+ -? -help display this help and exits
+
+ See also:
+ perldoc Catalyst::Engine::Stomp
+ perldoc Catalyst::Manual
+ perldoc Catalyst::Manual::Intro
+
+=head1 DESCRIPTION
+
+Run a Catalyst STOMP client for this application.
+
+=cut