From: Chris Andrews Date: Thu, 30 Apr 2009 15:38:30 +0000 (+0100) Subject: Initial import of Catalyst::Engine::Stomp X-Git-Tag: 0.03~21 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=catagits%2FCatalyst-Engine-STOMP.git;a=commitdiff_plain;h=0a66358961c58c5fff96a363853ee93b00f55119 Initial import of Catalyst::Engine::Stomp --- 0a66358961c58c5fff96a363853ee93b00f55119 diff --git a/Changes b/Changes new file mode 100644 index 0000000..958f7e6 --- /dev/null +++ b/Changes @@ -0,0 +1,6 @@ +Revision history for Perl extension Catalyst::Engine::Stomp. + +0.01 Tue Dec 16 10:15:20 2008 + - original version; created by h2xs 1.23 with options + -n Catalyst::Engine::Stomp + diff --git a/MANIFEST b/MANIFEST new file mode 100644 index 0000000..245cfe2 --- /dev/null +++ b/MANIFEST @@ -0,0 +1,12 @@ +Changes +Makefile.PL +MANIFEST +README +t/Catalyst-Engine-Stomp.t +lib/Catalyst/Engine/Stomp.pm +lib/Catalyst/Controller/MessageDriven.pm +testapp/lib/TestApp/Controller/TestController.pm +testapp/lib/TestApp/testapp.yml +testapp/lib/TestApp.pm +testapp/script/testapp_stomp.pl + diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..fb84532 --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,20 @@ +use 5.008008; +use ExtUtils::MakeMaker; +# See lib/ExtUtils/MakeMaker.pm for details of how to influence +# the contents of the Makefile that is written. +WriteMakefile( + NAME => 'Catalyst::Engine::Stomp', + VERSION_FROM => 'lib/Catalyst/Engine/Stomp.pm', # finds $VERSION + PREREQ_PM => { + Catalyst::Engine::Embeddable => 0.0.1, + Moose => 0, + Net::Stomp => 0.34, + YAML::XS => 0.32, + }, # e.g., Module::Name => 1.1 + ($] >= 5.005 ? ## Add these new keywords supported since 5.005 + (ABSTRACT_FROM => 'lib/Catalyst/Engine/Stomp.pm', # retrieve abstract from module + AUTHOR => 'Chris Andrews ') : ()), + LIBS => [''], # e.g., '-lm' + DEFINE => '', # e.g., '-DHAVE_SOMETHING' + INC => '-I.', # e.g., '-I. -I/usr/include/other' +); diff --git a/README b/README new file mode 100644 index 0000000..00e5819 --- /dev/null +++ b/README @@ -0,0 +1,40 @@ +Catalyst-Engine-Stomp version 0.01 +================================== + +The README is used to introduce the module and provide instructions on +how to install the module, any machine dependencies it may have (for +example C compilers and installed libraries) and any other information +that should be provided before the module is installed. + +A README file is required for CPAN modules since CPAN extracts the +README file from a module distribution so that people browsing the +archive can use it get an idea of the modules uses. It is usually a +good idea to provide version information here so that people can +decide whether fixes for the module are worth downloading. + +INSTALLATION + +To install this module type the following: + + perl Makefile.PL + make + make test + make install + +DEPENDENCIES + +This module requires these other modules and libraries: + + blah blah blah + +COPYRIGHT AND LICENCE + +Put the correct copyright and licence information here. + +Copyright (C) 2008 by Chris Andrews + +This library is free software; you can redistribute it and/or modify +it under the same terms as Perl itself, either Perl version 5.8.8 or, +at your option, any later version of Perl 5 you may have available. + + diff --git a/lib/Catalyst/Controller/MessageDriven.pm b/lib/Catalyst/Controller/MessageDriven.pm new file mode 100644 index 0000000..fba221d --- /dev/null +++ b/lib/Catalyst/Controller/MessageDriven.pm @@ -0,0 +1,40 @@ +package Catalyst::Controller::MessageDriven; +use Moose; + +BEGIN { extends 'Catalyst::Controller' } + +__PACKAGE__->config( + 'default' => 'text/x-yaml', + 'stash_key' => 'response', + 'map' => { 'text/x-yaml' => 'YAML' }, + ); + +sub begin :ActionClass('Deserialize') { } + +sub end :ActionClass('Serialize') { + my ($self, $c) = @_; + + # Engine will send our reply based on the value of this header. + $c->response->headers->header( 'X-Reply-Address' => $c->req->data->{reply_to} ); + + # Custom error handler - steal errors from catalyst and dump them into + # the stash, to get them serialized out as the reply. + if (scalar @{$c->error}) { + my $error = join "\n", @{$c->error}; + $c->stash->{response} = { status => 'ERROR', error => $error }; + $c->error(0); # clear errors, so our response isn't clobbered + } +} + +sub default : Private { + my ($self, $c) = @_; + + # Forward the request to the appropriate action, based on the + # message type. + my $action = $c->req->data->{type}; + $c->forward($action, [$c->req->data]); +} + +__PACKAGE__->meta->make_immutable; + +1; diff --git a/lib/Catalyst/Engine/Stomp.pm b/lib/Catalyst/Engine/Stomp.pm new file mode 100644 index 0000000..12238a4 --- /dev/null +++ b/lib/Catalyst/Engine/Stomp.pm @@ -0,0 +1,219 @@ +package Catalyst::Engine::Stomp; +use Moose; +extends 'Catalyst::Engine::Embeddable'; + +our $VERSION = '0.01'; + +use List::MoreUtils qw/ uniq /; +use HTTP::Request; +use Net::Stomp; + +has connection => (is => 'rw', isa => 'Net::Stomp'); +has conn_desc => (is => 'rw', isa => 'Str'); + +=head1 NAME + +Catalyst::Engine::Stomp - write message handling apps with Catalyst. + +=head1 SYNOPSIS + + # In a server script: + + BEGIN { + $ENV{CATALYST_ENGINE} = 'Stomp'; + require Catalyst::Engine::Stomp; + } + + MyApp->config->{Engine::Stomp} = + { + hostname => '127.0.0.1', + port => 61613, + }; + MyApp->run(); + + # In a controller, or controller base class: + + use YAML; + + # configure YAML deserialization; requires Catalyst::Action::REST + __PACKAGE__->config( + 'default' => 'text/x-yaml', + 'stash_key' => 'rest', + 'map' => { 'text/x-yaml' => 'YAML' }, + ); + + sub begin :ActionClass('Deserialize') { } + + # have a default action, which forwards to the correct action + # based on the message contents (the type). + sub default : Private { + my ($self, $c) = @_; + + my $action = $c->req->data->{type}; + $c->forward($action); + } + + # Send messages back: + $c->engine->send_message($queue, Dump($msg)); + +=head1 DESCRIPTION + +Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You +need a controller that understands messaging, as well as this engine. + +This is single-threaded and single process - you need to run multiple +instances of this engine to get concurrency, and configure your broker +to load-balance across multiple consumers of the same queue. + +=head1 METHODS + +=head2 run + +App entry point. Starts a loop listening for messages. + +=cut + +sub run { + my ($self, $app, $oneshot) = @_; + + die 'No Engine::Stomp configuration found' + unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; + + # list the path namespaces that will be mapped as queues. + # + # this is known to use the deprecated + # Dispatcher->action_hash() method, but there doesn't appear + # to be another way to get the relevant strings out. + # + # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 + # + my @queues = + uniq + grep { length $_ } + map { $_->namespace } + values %{$app->dispatcher->action_hash}; + + # connect up + my %template = %{$app->config->{'Engine::Stomp'}}; + $self->connection(Net::Stomp->new(\%template)); + $self->connection->connect(); + $self->conn_desc($template{hostname}.':'.$template{port}); + + # subscribe, with client ack. + foreach my $queue (@queues) { + my $queue_name = "/queue/$queue"; + $self->connection->subscribe({ + destination => $queue_name, + ack => 'client' + }); + } + + # enter loop... + while (1) { + my $frame = $self->connection->receive_frame(); + $self->handle_stomp_frame($app, $frame); + last if $ENV{ENGINE_ONESHOT}; + } + exit 0; +} + +=head2 prepare_request + +Overridden to add the source broker to the request, in place of the +client IP address. + +=cut + +sub prepare_request { + my ($self, $c, $req, $res_ref) = @_; + shift @_; + $self->next::method(@_); + $c->req->address($self->conn_desc); +} + +=head2 finalize_headers + +Overridden to dump out any errors encountered. + +=cut + +sub finalize_headers { + my ($self, $c) = @_; + my $error = join "\n", @{$c->error}; + if ($error) { + $c->log->debug($error); + } + return $self->next::method($c); +} + +=head2 handle_stomp_frame + +Dispatch according to STOMP frame type. + +=cut + +sub handle_stomp_frame { + my ($self, $app, $frame) = @_; + + my $command = $frame->command(); + $app->log->debug("Got STOMP command: $command"); + + if ($command eq 'MESSAGE') { + $self->handle_stomp_message($app, $frame); + } + elsif ($command eq 'ERROR') { + $self->handle_stomp_error($app, $frame); + } + else { + $app->log->debug("Got unknown STOMP command: $command"); + } +} + +=head2 handle_stomp_message + +Dispatch a STOMP message into the Catalyst app. + +=cut + +sub handle_stomp_message { + my ($self, $app, $frame) = @_; + + # queue -> controller + my $queue = $frame->headers->{destination}; + my ($controller) = $queue =~ m!^/queue/(.*)$!; + + # set up request + my $config = $app->config->{'Engine::Stomp'}; + my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; + my $req = HTTP::Request->new(POST => $url); + $req->content($frame->body); + $req->content_length(length $frame->body); + + # dispatch + my $response; + $app->handle_request($req, \$response); + + # reply + my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); + $app->log->debug("replying to $reply_queue\n"); + $self->connection->send({ destination => $reply_queue, body => $response->content }); + + # ack the message off the queue now we've replied + $self->connection->ack( { frame => $frame } ); +} + +=head2 handle_stomp_error + +Log any STOMP error frames we receive. + +=cut + +sub handle_stomp_error { + my ($self, $app, $frame) = @_; + + my $error = $frame->headers->{message}; + $app->log->debug("Got STOMP error: $error"); +} + +1; + diff --git a/t/Catalyst-Engine-Stomp.t b/t/Catalyst-Engine-Stomp.t new file mode 100644 index 0000000..e96c028 --- /dev/null +++ b/t/Catalyst-Engine-Stomp.t @@ -0,0 +1,52 @@ +use Test::More tests => 14; +use Test::Fork; + +# Tests which expect a STOMP server like ActiveMQ to exist on +# localhost:61613, which is what you get if you just get the ActiveMQ +# distro and run its out-of-the-box config. + +use Net::Stomp; +use YAML::XS qw/ Dump Load /; +use Data::Dumper; + +# First fire off the server +my $child_pid = fork_ok(1, sub { + ok(system("$^X -Ilib -Itestapp/lib testapp/script/testapp_stomp.pl --oneshot")); + #sleep 3; +}); + +# Now be a client to that server + +my $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); +ok($stomp, 'Net::Stomp object'); + +my $frame = $stomp->connect(); +ok($frame, 'connect to MQ server ok'); + +my $reply_to = sprintf '%s:1', $frame->headers->{session}; +ok($frame->headers->{session}, 'got a session'); +ok(length $reply_to > 2, 'valid-looking reply_to queue'); + +ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue'); + +my $message = { + payload => { foo => 1, bar => 2 }, + reply_to => $reply_to, + type => 'testaction', + }; +my $text = Dump($message); +ok($text, 'compose message'); + +$stomp->send( { destination => '/queue/testcontroller', body => $text } ); + +my $reply_frame = $stomp->receive_frame(); +ok($reply_frame, 'got a reply'); +ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue'); +ok($reply_frame->body, 'has a body'); + +my $response = Load($reply_frame->body); +ok($response, 'YAML response ok'); +ok($response->{type} eq 'testaction_response', 'correct type'); + +ok($stomp->disconnect, 'disconnected'); + diff --git a/testapp/lib/TestApp.pm b/testapp/lib/TestApp.pm new file mode 100644 index 0000000..c28f6ac --- /dev/null +++ b/testapp/lib/TestApp.pm @@ -0,0 +1,17 @@ +package TestApp; +use Moose; +use Catalyst::Runtime '5.80002'; + +use Catalyst qw/-Debug + ConfigLoader + /; + +extends 'Catalyst'; + +our $VERSION = '0.01'; + +__PACKAGE__->config( name => 'TestApp' ); +__PACKAGE__->setup(); +__PACKAGE__->meta->make_immutable; + +1; diff --git a/testapp/lib/TestApp/Controller/TestController.pm b/testapp/lib/TestApp/Controller/TestController.pm new file mode 100644 index 0000000..75904f6 --- /dev/null +++ b/testapp/lib/TestApp/Controller/TestController.pm @@ -0,0 +1,14 @@ +package TestApp::Controller::TestController; +use Moose; + +BEGIN { extends 'Catalyst::Controller::MessageDriven' }; + +sub testaction : Local { + my ($self, $c) = @_; + + # Reply with a minimal response message + my $response = { type => 'testaction_response' }; + $c->stash->{response} = $response; +} + +1; diff --git a/testapp/lib/TestApp/testapp.yml b/testapp/lib/TestApp/testapp.yml new file mode 100644 index 0000000..775c112 --- /dev/null +++ b/testapp/lib/TestApp/testapp.yml @@ -0,0 +1,5 @@ +--- +name: TestApp +'Engine::Stomp': + hostname: localhost + port: 61613 diff --git a/testapp/script/testapp_stomp.pl b/testapp/script/testapp_stomp.pl new file mode 100755 index 0000000..e061718 --- /dev/null +++ b/testapp/script/testapp_stomp.pl @@ -0,0 +1,63 @@ +BEGIN { + $ENV{CATALYST_ENGINE} = 'Stomp'; + require Catalyst::Engine::Stomp; +} + +use strict; +use warnings; +use Getopt::Long; +use Pod::Usage; +use FindBin; +use lib "$FindBin::Bin/../lib"; + +my $debug = 0; +my $help = 0; +my $oneshot = 0; + +my @argv = @ARGV; + +GetOptions( + 'debug|d' => \$debug, + 'help|?' => \$help, + 'oneshot' => \$oneshot, +); + +pod2usage(1) if $help; + +if ( $debug ) { + $ENV{CATALYST_DEBUG} = 1; +} + +if ( $oneshot ) { + $ENV{ENGINE_ONESHOT} = 1; +} + +# This is require instead of use so that the above environment +# variables can be set at runtime. +require TestApp; +TestApp->run(); + +1; + +=head1 NAME + +testapp_stomp.pl - Catalyst STOMP client + +=head1 SYNOPSIS + +testapp_stomp.pl [options] + + Options: + -d -debug force debug mode + -? -help display this help and exits + + See also: + perldoc Catalyst::Engine::Stomp + perldoc Catalyst::Manual + perldoc Catalyst::Manual::Intro + +=head1 DESCRIPTION + +Run a Catalyst STOMP client for this application. + +=cut