Initial import of Catalyst::Engine::Stomp
Chris Andrews [Thu, 30 Apr 2009 15:38:30 +0000 (16:38 +0100)]
Changes [new file with mode: 0644]
MANIFEST [new file with mode: 0644]
Makefile.PL [new file with mode: 0644]
README [new file with mode: 0644]
lib/Catalyst/Controller/MessageDriven.pm [new file with mode: 0644]
lib/Catalyst/Engine/Stomp.pm [new file with mode: 0644]
t/Catalyst-Engine-Stomp.t [new file with mode: 0644]
testapp/lib/TestApp.pm [new file with mode: 0644]
testapp/lib/TestApp/Controller/TestController.pm [new file with mode: 0644]
testapp/lib/TestApp/testapp.yml [new file with mode: 0644]
testapp/script/testapp_stomp.pl [new file with mode: 0755]

diff --git a/Changes b/Changes
new file mode 100644 (file)
index 0000000..958f7e6
--- /dev/null
+++ b/Changes
@@ -0,0 +1,6 @@
+Revision history for Perl extension Catalyst::Engine::Stomp.
+
+0.01  Tue Dec 16 10:15:20 2008
+       - original version; created by h2xs 1.23 with options
+               -n Catalyst::Engine::Stomp
+
diff --git a/MANIFEST b/MANIFEST
new file mode 100644 (file)
index 0000000..245cfe2
--- /dev/null
+++ b/MANIFEST
@@ -0,0 +1,12 @@
+Changes
+Makefile.PL
+MANIFEST
+README
+t/Catalyst-Engine-Stomp.t
+lib/Catalyst/Engine/Stomp.pm
+lib/Catalyst/Controller/MessageDriven.pm
+testapp/lib/TestApp/Controller/TestController.pm
+testapp/lib/TestApp/testapp.yml
+testapp/lib/TestApp.pm
+testapp/script/testapp_stomp.pl
+
diff --git a/Makefile.PL b/Makefile.PL
new file mode 100644 (file)
index 0000000..fb84532
--- /dev/null
@@ -0,0 +1,20 @@
+use 5.008008;
+use ExtUtils::MakeMaker;
+# See lib/ExtUtils/MakeMaker.pm for details of how to influence
+# the contents of the Makefile that is written.
+WriteMakefile(
+    NAME              => 'Catalyst::Engine::Stomp',
+    VERSION_FROM      => 'lib/Catalyst/Engine/Stomp.pm', # finds $VERSION
+    PREREQ_PM         => {
+                         Catalyst::Engine::Embeddable => 0.0.1,
+                         Moose => 0,
+                         Net::Stomp => 0.34,
+                         YAML::XS => 0.32,
+                        }, # e.g., Module::Name => 1.1
+    ($] >= 5.005 ?     ## Add these new keywords supported since 5.005
+      (ABSTRACT_FROM  => 'lib/Catalyst/Engine/Stomp.pm', # retrieve abstract from module
+       AUTHOR         => 'Chris Andrews <chris@nodnol.org>') : ()),
+    LIBS              => [''], # e.g., '-lm'
+    DEFINE            => '', # e.g., '-DHAVE_SOMETHING'
+    INC               => '-I.', # e.g., '-I. -I/usr/include/other'
+);
diff --git a/README b/README
new file mode 100644 (file)
index 0000000..00e5819
--- /dev/null
+++ b/README
@@ -0,0 +1,40 @@
+Catalyst-Engine-Stomp version 0.01
+==================================
+
+The README is used to introduce the module and provide instructions on
+how to install the module, any machine dependencies it may have (for
+example C compilers and installed libraries) and any other information
+that should be provided before the module is installed.
+
+A README file is required for CPAN modules since CPAN extracts the
+README file from a module distribution so that people browsing the
+archive can use it get an idea of the modules uses. It is usually a
+good idea to provide version information here so that people can
+decide whether fixes for the module are worth downloading.
+
+INSTALLATION
+
+To install this module type the following:
+
+   perl Makefile.PL
+   make
+   make test
+   make install
+
+DEPENDENCIES
+
+This module requires these other modules and libraries:
+
+  blah blah blah
+
+COPYRIGHT AND LICENCE
+
+Put the correct copyright and licence information here.
+
+Copyright (C) 2008 by Chris Andrews
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself, either Perl version 5.8.8 or,
+at your option, any later version of Perl 5 you may have available.
+
+
diff --git a/lib/Catalyst/Controller/MessageDriven.pm b/lib/Catalyst/Controller/MessageDriven.pm
new file mode 100644 (file)
index 0000000..fba221d
--- /dev/null
@@ -0,0 +1,40 @@
+package Catalyst::Controller::MessageDriven;
+use Moose;
+
+BEGIN { extends 'Catalyst::Controller' }
+
+__PACKAGE__->config(
+                   'default'   => 'text/x-yaml',
+                   'stash_key' => 'response',
+                   'map'       => { 'text/x-yaml' => 'YAML' },
+                  );
+
+sub begin :ActionClass('Deserialize') { }
+
+sub end :ActionClass('Serialize') {
+       my ($self, $c) = @_;
+
+       # Engine will send our reply based on the value of this header.
+       $c->response->headers->header( 'X-Reply-Address' => $c->req->data->{reply_to} );
+
+       # Custom error handler - steal errors from catalyst and dump them into
+       # the stash, to get them serialized out as the reply.
+       if (scalar @{$c->error}) {
+               my $error = join "\n", @{$c->error};
+               $c->stash->{response} = { status => 'ERROR', error => $error };
+               $c->error(0); # clear errors, so our response isn't clobbered
+       }
+}
+
+sub default : Private {
+       my ($self, $c) = @_;
+       
+       # Forward the request to the appropriate action, based on the
+       # message type.
+       my $action = $c->req->data->{type};
+       $c->forward($action, [$c->req->data]);
+}
+
+__PACKAGE__->meta->make_immutable;
+
+1;
diff --git a/lib/Catalyst/Engine/Stomp.pm b/lib/Catalyst/Engine/Stomp.pm
new file mode 100644 (file)
index 0000000..12238a4
--- /dev/null
@@ -0,0 +1,219 @@
+package Catalyst::Engine::Stomp;
+use Moose;
+extends 'Catalyst::Engine::Embeddable';
+
+our $VERSION = '0.01';
+
+use List::MoreUtils qw/ uniq /;
+use HTTP::Request;
+use Net::Stomp;
+
+has connection => (is => 'rw', isa => 'Net::Stomp');
+has conn_desc => (is => 'rw', isa => 'Str');
+
+=head1 NAME
+
+Catalyst::Engine::Stomp - write message handling apps with Catalyst.
+
+=head1 SYNOPSIS
+
+  # In a server script:
+
+  BEGIN {
+    $ENV{CATALYST_ENGINE} = 'Stomp';
+    require Catalyst::Engine::Stomp;
+  }  
+
+  MyApp->config->{Engine::Stomp} =
+   {
+     hostname => '127.0.0.1',
+     port     => 61613,
+   };
+  MyApp->run();
+
+  # In a controller, or controller base class:
+
+  use YAML;
+
+  # configure YAML deserialization; requires Catalyst::Action::REST
+  __PACKAGE__->config(
+                   'default'   => 'text/x-yaml',
+                   'stash_key' => 'rest',
+                   'map'       => { 'text/x-yaml' => 'YAML' },
+                  );
+
+  sub begin :ActionClass('Deserialize') { }
+
+  # have a default action, which forwards to the correct action
+  # based on the message contents (the type).
+  sub default : Private {
+         my ($self, $c) = @_;
+
+         my $action = $c->req->data->{type};
+         $c->forward($action);
+  }  
+
+  # Send messages back:
+  $c->engine->send_message($queue, Dump($msg));
+
+=head1 DESCRIPTION
+
+Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
+need a controller that understands messaging, as well as this engine. 
+
+This is single-threaded and single process - you need to run multiple
+instances of this engine to get concurrency, and configure your broker
+to load-balance across multiple consumers of the same queue.
+
+=head1 METHODS
+
+=head2 run
+
+App entry point. Starts a loop listening for messages.
+
+=cut
+
+sub run {
+        my ($self, $app, $oneshot) = @_;
+
+        die 'No Engine::Stomp configuration found'
+            unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
+
+        # list the path namespaces that will be mapped as queues.
+       #
+       # this is known to use the deprecated
+       # Dispatcher->action_hash() method, but there doesn't appear
+       # to be another way to get the relevant strings out.
+       #
+       # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
+       #
+        my @queues =
+           uniq
+           grep { length $_ }
+           map  { $_->namespace }
+           values %{$app->dispatcher->action_hash};
+
+       # connect up
+        my %template = %{$app->config->{'Engine::Stomp'}};
+       $self->connection(Net::Stomp->new(\%template));
+       $self->connection->connect();
+       $self->conn_desc($template{hostname}.':'.$template{port});
+
+       # subscribe, with client ack.
+        foreach my $queue (@queues) {
+               my $queue_name = "/queue/$queue";
+               $self->connection->subscribe({
+                                             destination => $queue_name, 
+                                             ack         => 'client' 
+                                            });
+        }
+
+       # enter loop...
+       while (1) {
+               my $frame = $self->connection->receive_frame();
+               $self->handle_stomp_frame($app, $frame);
+               last if $ENV{ENGINE_ONESHOT};
+       }
+       exit 0;
+}
+
+=head2 prepare_request
+
+Overridden to add the source broker to the request, in place of the
+client IP address.
+
+=cut
+
+sub prepare_request {
+        my ($self, $c, $req, $res_ref) = @_;
+       shift @_;
+       $self->next::method(@_);
+       $c->req->address($self->conn_desc);
+}
+
+=head2 finalize_headers
+
+Overridden to dump out any errors encountered.
+
+=cut
+
+sub finalize_headers {
+       my ($self, $c) = @_;
+       my $error = join "\n", @{$c->error};
+       if ($error) {
+               $c->log->debug($error);
+       }
+       return $self->next::method($c);
+}
+
+=head2 handle_stomp_frame
+
+Dispatch according to STOMP frame type.
+
+=cut
+
+sub handle_stomp_frame {
+       my ($self, $app, $frame) = @_;
+
+       my $command = $frame->command();
+       $app->log->debug("Got STOMP command: $command");
+       
+       if ($command eq 'MESSAGE') {
+               $self->handle_stomp_message($app, $frame);
+       }
+       elsif ($command eq 'ERROR') {
+               $self->handle_stomp_error($app, $frame);
+       }
+       else {
+               $app->log->debug("Got unknown STOMP command: $command");
+       }
+}
+
+=head2 handle_stomp_message
+
+Dispatch a STOMP message into the Catalyst app.
+
+=cut
+
+sub handle_stomp_message {
+       my ($self, $app, $frame) = @_;
+
+       # queue -> controller
+       my $queue = $frame->headers->{destination};
+       my ($controller) = $queue =~ m!^/queue/(.*)$!;
+
+       # set up request
+        my $config = $app->config->{'Engine::Stomp'};
+        my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
+        my $req = HTTP::Request->new(POST => $url);
+        $req->content($frame->body);   
+       $req->content_length(length $frame->body);
+
+       # dispatch
+       my $response;
+        $app->handle_request($req, \$response);
+
+       # reply
+       my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
+       $app->log->debug("replying to $reply_queue\n");
+       $self->connection->send({ destination => $reply_queue, body => $response->content });
+
+       # ack the message off the queue now we've replied
+       $self->connection->ack( { frame => $frame } );
+}
+
+=head2 handle_stomp_error
+
+Log any STOMP error frames we receive.
+
+=cut
+
+sub handle_stomp_error {
+       my ($self, $app, $frame) = @_;
+       
+       my $error = $frame->headers->{message};
+       $app->log->debug("Got STOMP error: $error");
+}
+
+1;
+
diff --git a/t/Catalyst-Engine-Stomp.t b/t/Catalyst-Engine-Stomp.t
new file mode 100644 (file)
index 0000000..e96c028
--- /dev/null
@@ -0,0 +1,52 @@
+use Test::More tests => 14;
+use Test::Fork;
+
+# Tests which expect a STOMP server like ActiveMQ to exist on
+# localhost:61613, which is what you get if you just get the ActiveMQ
+# distro and run its out-of-the-box config.
+
+use Net::Stomp;
+use YAML::XS qw/ Dump Load /;
+use Data::Dumper;
+
+# First fire off the server
+my $child_pid = fork_ok(1, sub {
+  ok(system("$^X -Ilib -Itestapp/lib testapp/script/testapp_stomp.pl --oneshot"));
+  #sleep 3;
+});
+
+# Now be a client to that server
+
+my $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
+ok($stomp, 'Net::Stomp object');
+
+my $frame = $stomp->connect();
+ok($frame, 'connect to MQ server ok');
+
+my $reply_to = sprintf '%s:1', $frame->headers->{session};
+ok($frame->headers->{session}, 'got a session');
+ok(length $reply_to > 2, 'valid-looking reply_to queue');
+
+ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue');
+
+my $message = {
+              payload => { foo => 1, bar => 2 },
+              reply_to => $reply_to,
+              type => 'testaction',
+             };
+my $text = Dump($message);
+ok($text, 'compose message');
+
+$stomp->send( { destination => '/queue/testcontroller', body => $text } );
+
+my $reply_frame = $stomp->receive_frame();
+ok($reply_frame, 'got a reply');
+ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue');
+ok($reply_frame->body, 'has a body');
+
+my $response = Load($reply_frame->body);
+ok($response, 'YAML response ok');
+ok($response->{type} eq 'testaction_response', 'correct type');
+
+ok($stomp->disconnect, 'disconnected');
+
diff --git a/testapp/lib/TestApp.pm b/testapp/lib/TestApp.pm
new file mode 100644 (file)
index 0000000..c28f6ac
--- /dev/null
@@ -0,0 +1,17 @@
+package TestApp;
+use Moose;
+use Catalyst::Runtime '5.80002';
+
+use Catalyst qw/-Debug
+                ConfigLoader
+              /;
+
+extends 'Catalyst';
+
+our $VERSION = '0.01';
+
+__PACKAGE__->config( name => 'TestApp' );
+__PACKAGE__->setup();
+__PACKAGE__->meta->make_immutable;
+
+1;
diff --git a/testapp/lib/TestApp/Controller/TestController.pm b/testapp/lib/TestApp/Controller/TestController.pm
new file mode 100644 (file)
index 0000000..75904f6
--- /dev/null
@@ -0,0 +1,14 @@
+package TestApp::Controller::TestController;
+use Moose;
+
+BEGIN { extends 'Catalyst::Controller::MessageDriven' };
+
+sub testaction : Local {
+    my ($self, $c) = @_;
+
+    # Reply with a minimal response message
+    my $response = { type => 'testaction_response' };
+    $c->stash->{response} = $response;
+}
+
+1;
diff --git a/testapp/lib/TestApp/testapp.yml b/testapp/lib/TestApp/testapp.yml
new file mode 100644 (file)
index 0000000..775c112
--- /dev/null
@@ -0,0 +1,5 @@
+---
+name: TestApp
+'Engine::Stomp':
+  hostname: localhost
+  port: 61613
diff --git a/testapp/script/testapp_stomp.pl b/testapp/script/testapp_stomp.pl
new file mode 100755 (executable)
index 0000000..e061718
--- /dev/null
@@ -0,0 +1,63 @@
+BEGIN { 
+       $ENV{CATALYST_ENGINE} = 'Stomp';
+       require Catalyst::Engine::Stomp;
+}  
+
+use strict;
+use warnings;
+use Getopt::Long;
+use Pod::Usage;
+use FindBin;
+use lib "$FindBin::Bin/../lib";
+
+my $debug   = 0;
+my $help    = 0;
+my $oneshot = 0;
+
+my @argv = @ARGV;
+
+GetOptions(
+    'debug|d' => \$debug,
+    'help|?'  => \$help,
+    'oneshot' => \$oneshot,
+);
+
+pod2usage(1) if $help;
+
+if ( $debug ) {
+       $ENV{CATALYST_DEBUG} = 1;
+}
+
+if ( $oneshot ) { 
+       $ENV{ENGINE_ONESHOT} = 1;
+}      
+
+# This is require instead of use so that the above environment
+# variables can be set at runtime.
+require TestApp;
+TestApp->run();
+
+1;
+
+=head1 NAME
+
+testapp_stomp.pl - Catalyst STOMP client
+
+=head1 SYNOPSIS
+
+testapp_stomp.pl [options]
+
+ Options:
+   -d -debug          force debug mode
+   -? -help           display this help and exits
+
+ See also:
+   perldoc Catalyst::Engine::Stomp
+   perldoc Catalyst::Manual
+   perldoc Catalyst::Manual::Intro
+
+=head1 DESCRIPTION
+
+Run a Catalyst STOMP client for this application.
+
+=cut