for serialize / deserialize, and we might as well do it ourselves.
We would use Data::Serializer, but its YAML is YAML.pm, which just
doesn't seem to work... when Data::Serializer::YAML_XS shows up, we'll
switch to that.
Also broke the tests out a bit, and added a test for what happens when
the action you invoked crashed.
all_from 'lib/Catalyst/Engine/Stomp.pm';
requires 'Catalyst::Engine::Embeddable' => '0.0.1';
-requires 'Catalyst::Action::REST' => undef;
requires 'Catalyst::Runtime' => '5.80004';
requires 'Moose' => undef;
requires 'MooseX::Workers' => '0.05';
YAML::XS
Catalyst::Engine::Embeddable
Catalyst::Runtime 5.80003
- Catalyst::Action::REST
namespace::autoclean
The supplied workers script requires MooseX::Workers.
package Catalyst::Controller::MessageDriven;
use Moose;
+use YAML::XS qw/ LoadFile Dump /;
BEGIN { extends 'Catalyst::Controller' }
BEGIN { extends 'Catalyst::Controller::MessageDriven' }
sub some_action : Local {
- my ($self, $c) = @_;
+ my ($self, $c, $message) = @_;
+
+ # Handle message
+
# Reply with a minimal response message
my $response = { type => 'testaction_response' };
$c->stash->{response} = $response;
=cut
-__PACKAGE__->config(
- 'default' => 'text/x-yaml',
- 'stash_key' => 'response',
- 'map' => { 'text/x-yaml' => 'YAML' },
- );
-
-sub begin :ActionClass('Deserialize') { }
+sub begin : Private {
+ my ($self, $c) = @_;
+
+ # Deserialize the request message
+
+ my $message;
+ eval {
+ my $body = $c->request->body;
+ $message = LoadFile( "$body" );
+ };
+ if ($@) {
+ # can't reply - reply_to is embedded in the message
+ $c->error("exception in deserialize: $@");
+ }
+ else {
+ $c->stash->{request} = $message;
+ }
+}
-sub end :ActionClass('Serialize') {
+sub end : Private {
my ($self, $c) = @_;
# Engine will send our reply based on the value of this header.
- $c->response->headers->header( 'X-Reply-Address' => $c->req->data->{reply_to} );
+ $c->response->headers->header( 'X-Reply-Address' => $c->stash->{request}->{reply_to} );
# Custom error handler - steal errors from catalyst and dump them into
# the stash, to get them serialized out as the reply.
$c->stash->{response} = { status => 'ERROR', error => $error };
$c->error(0); # clear errors, so our response isn't clobbered
}
+
+ # Serialize the response
+ my $output;
+ eval {
+ $output = Dump( $c->stash->{response} );
+ };
+ if ($@) {
+ my $error = "exception in serialize: $@";
+ $c->error($error);
+ $c->stash->{response} = { status => 'ERROR', error => $error };
+ $output = Dump( $c->stash->{response} );
+ }
+
+ $c->response->output( $output );
}
sub default : Private {
my ($self, $c) = @_;
-
+
# Forward the request to the appropriate action, based on the
# message type.
- my $action = $c->req->data->{type};
- $c->forward($action, [$c->req->data]);
+ my $action = $c->stash->{request}->{type};
+ $c->forward($action, [$c->stash->{request}]);
}
__PACKAGE__->meta->make_immutable;
$c->stash->{response} = $response;
}
- # The default serialization is YAML, but this configuration
- # may be overridden in your controller:
- __PACKAGE__->config(
- 'default' => 'text/x-yaml',
- 'stash_key' => 'rest',
- 'map' => { 'text/x-yaml' => 'YAML' },
- );
-
=head1 DESCRIPTION
Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You
my ($self, $app, $oneshot) = @_;
die 'No Engine::Stomp configuration found'
- unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
+ unless ref $app->config->{'Engine::Stomp'} eq 'HASH';
# list the path namespaces that will be mapped as queues.
- #
- # this is known to use the deprecated
- # Dispatcher->action_hash() method, but there doesn't appear
- # to be another way to get the relevant strings out.
- #
- # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
- #
+ #
+ # this is known to use the deprecated
+ # Dispatcher->action_hash() method, but there doesn't appear
+ # to be another way to get the relevant strings out.
+ #
+ # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644
+ #
my @queues =
- uniq
- grep { length $_ }
- map { $_->namespace }
- values %{$app->dispatcher->action_hash};
+ uniq
+ grep { length $_ }
+ map { $_->namespace }
+ values %{$app->dispatcher->action_hash};
- # connect up
+ # connect up
my %template = %{$app->config->{'Engine::Stomp'}};
- $self->connection(Net::Stomp->new(\%template));
- $self->connection->connect();
- $self->conn_desc($template{hostname}.':'.$template{port});
+ $self->connection(Net::Stomp->new(\%template));
+ $self->connection->connect();
+ $self->conn_desc($template{hostname}.':'.$template{port});
- # subscribe, with client ack.
+ # subscribe, with client ack.
foreach my $queue (@queues) {
- my $queue_name = "/queue/$queue";
- $self->connection->subscribe({
- destination => $queue_name,
- ack => 'client'
- });
+ my $queue_name = "/queue/$queue";
+ $self->connection->subscribe({
+ destination => $queue_name,
+ ack => 'client'
+ });
}
- # enter loop...
- while (1) {
- my $frame = $self->connection->receive_frame();
- $self->handle_stomp_frame($app, $frame);
- last if $ENV{ENGINE_ONESHOT};
- }
- exit 0;
+ # enter loop...
+ while (1) {
+ my $frame = $self->connection->receive_frame();
+ $self->handle_stomp_frame($app, $frame);
+ last if $ENV{ENGINE_ONESHOT};
+ }
+ exit 0;
}
=head2 prepare_request
sub prepare_request {
my ($self, $c, $req, $res_ref) = @_;
- shift @_;
- $self->next::method(@_);
- $c->req->address($self->conn_desc);
+ shift @_;
+ $self->next::method(@_);
+ $c->req->address($self->conn_desc);
}
=head2 finalize_headers
=cut
sub finalize_headers {
- my ($self, $c) = @_;
- my $error = join "\n", @{$c->error};
- if ($error) {
- $c->log->debug($error);
- }
- return $self->next::method($c);
+ my ($self, $c) = @_;
+ my $error = join "\n", @{$c->error};
+ if ($error) {
+ $c->log->debug($error);
+ }
+ return $self->next::method($c);
}
=head2 handle_stomp_frame
=cut
sub handle_stomp_frame {
- my ($self, $app, $frame) = @_;
-
- my $command = $frame->command();
- if ($command eq 'MESSAGE') {
- $self->handle_stomp_message($app, $frame);
- }
- elsif ($command eq 'ERROR') {
- $self->handle_stomp_error($app, $frame);
- }
- else {
- $app->log->debug("Got unknown Stomp command: $command");
- }
+ my ($self, $app, $frame) = @_;
+
+ my $command = $frame->command();
+ if ($command eq 'MESSAGE') {
+ $self->handle_stomp_message($app, $frame);
+ }
+ elsif ($command eq 'ERROR') {
+ $self->handle_stomp_error($app, $frame);
+ }
+ else {
+ $app->log->debug("Got unknown Stomp command: $command");
+ }
}
=head2 handle_stomp_message
=cut
sub handle_stomp_message {
- my ($self, $app, $frame) = @_;
+ my ($self, $app, $frame) = @_;
- # queue -> controller
- my $queue = $frame->headers->{destination};
- my ($controller) = $queue =~ m!^/queue/(.*)$!;
+ # queue -> controller
+ my $queue = $frame->headers->{destination};
+ my ($controller) = $queue =~ m!^/queue/(.*)$!;
- # set up request
+ # set up request
my $config = $app->config->{'Engine::Stomp'};
my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller;
my $req = HTTP::Request->new(POST => $url);
- $req->content($frame->body);
- $req->content_length(length $frame->body);
+ $req->content($frame->body);
+ $req->content_length(length $frame->body);
- # dispatch
- my $response;
+ # dispatch
+ my $response;
$app->handle_request($req, \$response);
- # reply
- my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address'));
- $self->connection->send({ destination => $reply_queue, body => $response->content });
+ # reply, if header set
+ if (my $reply_to = $response->headers->header('X-Reply-Address')) {
+ my $reply_queue = '/remote-temp-queue/' . $reply_to;
+ $self->connection->send({ destination => $reply_queue, body => $response->content });
+ }
- # ack the message off the queue now we've replied
- $self->connection->ack( { frame => $frame } );
+ # ack the message off the queue now we've replied / processed
+ $self->connection->ack( { frame => $frame } );
}
=head2 handle_stomp_error
=cut
sub handle_stomp_error {
- my ($self, $app, $frame) = @_;
-
- my $error = $frame->headers->{message};
- $app->log->debug("Got Stomp error: $error");
+ my ($self, $app, $frame) = @_;
+
+ my $error = $frame->headers->{message};
+ $app->log->debug("Got Stomp error: $error");
}
__PACKAGE__->meta->make_immutable;
--- /dev/null
+use Test::More;
+
+# Tests which expect a STOMP server like ActiveMQ to exist on
+# localhost:61613, which is what you get if you just get the ActiveMQ
+# distro and run its out-of-the-box config.
+
+use Net::Stomp;
+use YAML::XS qw/ Dump Load /;
+use Data::Dumper;
+
+use FindBin;
+use lib "$FindBin::Bin";
+require 'server.pl';
+
+plan tests => 11;
+
+my $frame = $stomp->connect();
+ok($frame, 'connect to MQ server ok');
+
+my $reply_to = sprintf '%s:1', $frame->headers->{session};
+ok($frame->headers->{session}, 'got a session');
+ok(length $reply_to > 2, 'valid-looking reply_to queue');
+
+ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue');
+
+my $message = {
+ payload => { foo => 1, bar => 2 },
+ reply_to => $reply_to,
+ type => 'testaction',
+ };
+my $text = Dump($message);
+ok($text, 'compose message');
+
+$stomp->send( { destination => '/queue/testcontroller', body => $text } );
+
+my $reply_frame = $stomp->receive_frame();
+ok($reply_frame, 'got a reply');
+ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue');
+ok($reply_frame->body, 'has a body');
+
+my $response = Load($reply_frame->body);
+ok($response, 'YAML response ok');
+ok($response->{type} eq 'testaction_response', 'correct type');
+
+ok($stomp->disconnect, 'disconnected');
+
--- /dev/null
+use Test::More;
+
+# Tests which expect a STOMP server like ActiveMQ to exist on
+# localhost:61613, which is what you get if you just get the ActiveMQ
+# distro and run its out-of-the-box config.
+
+use Net::Stomp;
+use YAML::XS qw/ Dump Load /;
+use Data::Dumper;
+
+use FindBin;
+use lib "$FindBin::Bin";
+require 'server.pl';
+
+plan tests => 12;
+
+my $frame = $stomp->connect();
+ok($frame, 'connect to MQ server ok');
+
+my $reply_to = sprintf '%s:1', $frame->headers->{session};
+ok($frame->headers->{session}, 'got a session');
+ok(length $reply_to > 2, 'valid-looking reply_to queue');
+
+ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue');
+
+# Test what happens when the action crashes
+my $message = {
+ payload => { foo => 1, bar => 2 },
+ reply_to => $reply_to,
+ type => 'badaction',
+ };
+my $text = Dump($message);
+ok($text, 'compose message for badaction');
+
+$stomp->send( { destination => '/queue/testcontroller', body => $text } );
+
+my $reply_frame = $stomp->receive_frame();
+ok($reply_frame, 'got a reply');
+ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue');
+ok($reply_frame->body, 'has a body');
+
+my $response = Load($reply_frame->body);
+ok($response, 'YAML response ok');
+ok($response->{status} eq 'ERROR', 'is an error');
+ok($response->{error} =~ /oh noes/);
+
+ok($stomp->disconnect, 'disconnected');
+
+++ /dev/null
-use Test::More;
-
-# Tests which expect a STOMP server like ActiveMQ to exist on
-# localhost:61613, which is what you get if you just get the ActiveMQ
-# distro and run its out-of-the-box config.
-
-use Net::Stomp;
-use YAML::XS qw/ Dump Load /;
-use Data::Dumper;
-
-use Alien::ActiveMQ;
-my $ACTIVEMQ_VERSION = '5.2.0';
-
-my ($stomp, $mq);
-eval {
- $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
-};
-if ($@) {
-
- unless (Alien::ActiveMQ->is_version_installed($ACTIVEMQ_VERSION)) {
- plan 'skip_all' => 'No ActiveMQ server installed by Alien::ActiveMQ, try running the "install-activemq" command';
- exit;
- }
-
- $mq = Alien::ActiveMQ->run_server($ACTIVEMQ_VERSION);
-
- eval {
- $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
- };
- if ($@) {
- plan 'skip_all' => 'No ActiveMQ server listening on 61613: ' . $@;
- exit;
- }
-}
-
-plan tests => 12;
-
-# First fire off the server
-$SIG{CHLD} = 'IGNORE';
-unless (fork()) {
- system("$^X -Ilib -Itestapp/lib testapp/script/stomptestapp_stomp.pl --oneshot");
- exit 0;
-}
-print STDERR "server started, waiting for spinup...";
-sleep 20;
-
-# Now be a client to that server
-print STDERR "testing\n";
-ok($stomp, 'Net::Stomp object');
-
-my $frame = $stomp->connect();
-ok($frame, 'connect to MQ server ok');
-
-my $reply_to = sprintf '%s:1', $frame->headers->{session};
-ok($frame->headers->{session}, 'got a session');
-ok(length $reply_to > 2, 'valid-looking reply_to queue');
-
-ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue');
-
-my $message = {
- payload => { foo => 1, bar => 2 },
- reply_to => $reply_to,
- type => 'testaction',
- };
-my $text = Dump($message);
-ok($text, 'compose message');
-
-$stomp->send( { destination => '/queue/testcontroller', body => $text } );
-
-my $reply_frame = $stomp->receive_frame();
-ok($reply_frame, 'got a reply');
-ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue');
-ok($reply_frame->body, 'has a body');
-
-my $response = Load($reply_frame->body);
-ok($response, 'YAML response ok');
-ok($response->{type} eq 'testaction_response', 'correct type');
-
-ok($stomp->disconnect, 'disconnected');
-
--- /dev/null
+use Alien::ActiveMQ;
+my $ACTIVEMQ_VERSION = '5.2.0';
+
+eval {
+ $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
+};
+if ($@) {
+
+ unless (Alien::ActiveMQ->is_version_installed($ACTIVEMQ_VERSION)) {
+ plan 'skip_all' => 'No ActiveMQ server installed by Alien::ActiveMQ, try running the "install-activemq" command';
+ exit;
+ }
+
+ $mq = Alien::ActiveMQ->run_server($ACTIVEMQ_VERSION);
+
+ eval {
+ $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
+ };
+ if ($@) {
+ plan 'skip_all' => 'No ActiveMQ server listening on 61613: ' . $@;
+ exit;
+ }
+}
+
+# First fire off the server
+$SIG{CHLD} = 'IGNORE';
+unless (fork()) {
+ system("$^X -Ilib -Itestapp/lib testapp/script/stomptestapp_stomp.pl --oneshot");
+ exit 0;
+}
+print STDERR "server started, waiting for spinup...";
+sleep 2;
+
BEGIN { extends 'Catalyst::Controller::MessageDriven' };
sub testaction : Local {
- my ($self, $c) = @_;
+ my ($self, $c, $request) = @_;
# Reply with a minimal response message
my $response = { type => 'testaction_response' };
$c->stash->{response} = $response;
}
+sub badaction : Local {
+ my ($self, $c, $request) = @_;
+ die "oh noes";
+}
+
1;