From: Chris Andrews Date: Sat, 30 May 2009 11:26:37 +0000 (+0100) Subject: Remove our dependency on Catalyst::Action::REST. We were only using it X-Git-Tag: 0.04~3 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=bf8937b70eb1aabe53e63e2e6c70f569056e609a;hp=b044cf9b67638aabfea36c19afee14dabc0b65ba;p=catagits%2FCatalyst-Engine-STOMP.git Remove our dependency on Catalyst::Action::REST. We were only using it for serialize / deserialize, and we might as well do it ourselves. We would use Data::Serializer, but its YAML is YAML.pm, which just doesn't seem to work... when Data::Serializer::YAML_XS shows up, we'll switch to that. Also broke the tests out a bit, and added a test for what happens when the action you invoked crashed. --- diff --git a/Makefile.PL b/Makefile.PL index db989f6..1796805 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -7,7 +7,6 @@ name 'Catalyst-Engine-Stomp'; all_from 'lib/Catalyst/Engine/Stomp.pm'; requires 'Catalyst::Engine::Embeddable' => '0.0.1'; -requires 'Catalyst::Action::REST' => undef; requires 'Catalyst::Runtime' => '5.80004'; requires 'Moose' => undef; requires 'MooseX::Workers' => '0.05'; diff --git a/README b/README index bf1e24c..00042fe 100644 --- a/README +++ b/README @@ -25,7 +25,6 @@ This module requires these other modules and libraries: YAML::XS Catalyst::Engine::Embeddable Catalyst::Runtime 5.80003 - Catalyst::Action::REST namespace::autoclean The supplied workers script requires MooseX::Workers. diff --git a/lib/Catalyst/Controller/MessageDriven.pm b/lib/Catalyst/Controller/MessageDriven.pm index 0c00a8d..10e645d 100644 --- a/lib/Catalyst/Controller/MessageDriven.pm +++ b/lib/Catalyst/Controller/MessageDriven.pm @@ -1,5 +1,6 @@ package Catalyst::Controller::MessageDriven; use Moose; +use YAML::XS qw/ LoadFile Dump /; BEGIN { extends 'Catalyst::Controller' } @@ -14,7 +15,10 @@ Catalyst::Controller::MessageDriven BEGIN { extends 'Catalyst::Controller::MessageDriven' } sub some_action : Local { - my ($self, $c) = @_; + my ($self, $c, $message) = @_; + + # Handle message + # Reply with a minimal response message my $response = { type => 'testaction_response' }; $c->stash->{response} = $response; @@ -28,19 +32,30 @@ YAML determines the action dispatched to. =cut -__PACKAGE__->config( - 'default' => 'text/x-yaml', - 'stash_key' => 'response', - 'map' => { 'text/x-yaml' => 'YAML' }, - ); - -sub begin :ActionClass('Deserialize') { } +sub begin : Private { + my ($self, $c) = @_; + + # Deserialize the request message + + my $message; + eval { + my $body = $c->request->body; + $message = LoadFile( "$body" ); + }; + if ($@) { + # can't reply - reply_to is embedded in the message + $c->error("exception in deserialize: $@"); + } + else { + $c->stash->{request} = $message; + } +} -sub end :ActionClass('Serialize') { +sub end : Private { my ($self, $c) = @_; # Engine will send our reply based on the value of this header. - $c->response->headers->header( 'X-Reply-Address' => $c->req->data->{reply_to} ); + $c->response->headers->header( 'X-Reply-Address' => $c->stash->{request}->{reply_to} ); # Custom error handler - steal errors from catalyst and dump them into # the stash, to get them serialized out as the reply. @@ -49,15 +64,29 @@ sub end :ActionClass('Serialize') { $c->stash->{response} = { status => 'ERROR', error => $error }; $c->error(0); # clear errors, so our response isn't clobbered } + + # Serialize the response + my $output; + eval { + $output = Dump( $c->stash->{response} ); + }; + if ($@) { + my $error = "exception in serialize: $@"; + $c->error($error); + $c->stash->{response} = { status => 'ERROR', error => $error }; + $output = Dump( $c->stash->{response} ); + } + + $c->response->output( $output ); } sub default : Private { my ($self, $c) = @_; - + # Forward the request to the appropriate action, based on the # message type. - my $action = $c->req->data->{type}; - $c->forward($action, [$c->req->data]); + my $action = $c->stash->{request}->{type}; + $c->forward($action, [$c->stash->{request}]); } __PACKAGE__->meta->make_immutable; diff --git a/lib/Catalyst/Engine/Stomp.pm b/lib/Catalyst/Engine/Stomp.pm index f2c3847..b469d83 100644 --- a/lib/Catalyst/Engine/Stomp.pm +++ b/lib/Catalyst/Engine/Stomp.pm @@ -43,14 +43,6 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst. $c->stash->{response} = $response; } - # The default serialization is YAML, but this configuration - # may be overridden in your controller: - __PACKAGE__->config( - 'default' => 'text/x-yaml', - 'stash_key' => 'rest', - 'map' => { 'text/x-yaml' => 'YAML' }, - ); - =head1 DESCRIPTION Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You @@ -77,44 +69,44 @@ sub run { my ($self, $app, $oneshot) = @_; die 'No Engine::Stomp configuration found' - unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; + unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; # list the path namespaces that will be mapped as queues. - # - # this is known to use the deprecated - # Dispatcher->action_hash() method, but there doesn't appear - # to be another way to get the relevant strings out. - # - # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 - # + # + # this is known to use the deprecated + # Dispatcher->action_hash() method, but there doesn't appear + # to be another way to get the relevant strings out. + # + # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 + # my @queues = - uniq - grep { length $_ } - map { $_->namespace } - values %{$app->dispatcher->action_hash}; + uniq + grep { length $_ } + map { $_->namespace } + values %{$app->dispatcher->action_hash}; - # connect up + # connect up my %template = %{$app->config->{'Engine::Stomp'}}; - $self->connection(Net::Stomp->new(\%template)); - $self->connection->connect(); - $self->conn_desc($template{hostname}.':'.$template{port}); + $self->connection(Net::Stomp->new(\%template)); + $self->connection->connect(); + $self->conn_desc($template{hostname}.':'.$template{port}); - # subscribe, with client ack. + # subscribe, with client ack. foreach my $queue (@queues) { - my $queue_name = "/queue/$queue"; - $self->connection->subscribe({ - destination => $queue_name, - ack => 'client' - }); + my $queue_name = "/queue/$queue"; + $self->connection->subscribe({ + destination => $queue_name, + ack => 'client' + }); } - # enter loop... - while (1) { - my $frame = $self->connection->receive_frame(); - $self->handle_stomp_frame($app, $frame); - last if $ENV{ENGINE_ONESHOT}; - } - exit 0; + # enter loop... + while (1) { + my $frame = $self->connection->receive_frame(); + $self->handle_stomp_frame($app, $frame); + last if $ENV{ENGINE_ONESHOT}; + } + exit 0; } =head2 prepare_request @@ -126,9 +118,9 @@ client IP address. sub prepare_request { my ($self, $c, $req, $res_ref) = @_; - shift @_; - $self->next::method(@_); - $c->req->address($self->conn_desc); + shift @_; + $self->next::method(@_); + $c->req->address($self->conn_desc); } =head2 finalize_headers @@ -139,12 +131,12 @@ Overridden to dump out any errors encountered, since you won't get a =cut sub finalize_headers { - my ($self, $c) = @_; - my $error = join "\n", @{$c->error}; - if ($error) { - $c->log->debug($error); - } - return $self->next::method($c); + my ($self, $c) = @_; + my $error = join "\n", @{$c->error}; + if ($error) { + $c->log->debug($error); + } + return $self->next::method($c); } =head2 handle_stomp_frame @@ -154,18 +146,18 @@ Dispatch according to Stomp frame type. =cut sub handle_stomp_frame { - my ($self, $app, $frame) = @_; - - my $command = $frame->command(); - if ($command eq 'MESSAGE') { - $self->handle_stomp_message($app, $frame); - } - elsif ($command eq 'ERROR') { - $self->handle_stomp_error($app, $frame); - } - else { - $app->log->debug("Got unknown Stomp command: $command"); - } + my ($self, $app, $frame) = @_; + + my $command = $frame->command(); + if ($command eq 'MESSAGE') { + $self->handle_stomp_message($app, $frame); + } + elsif ($command eq 'ERROR') { + $self->handle_stomp_error($app, $frame); + } + else { + $app->log->debug("Got unknown Stomp command: $command"); + } } =head2 handle_stomp_message @@ -175,29 +167,31 @@ Dispatch a Stomp message into the Catalyst app. =cut sub handle_stomp_message { - my ($self, $app, $frame) = @_; + my ($self, $app, $frame) = @_; - # queue -> controller - my $queue = $frame->headers->{destination}; - my ($controller) = $queue =~ m!^/queue/(.*)$!; + # queue -> controller + my $queue = $frame->headers->{destination}; + my ($controller) = $queue =~ m!^/queue/(.*)$!; - # set up request + # set up request my $config = $app->config->{'Engine::Stomp'}; my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; my $req = HTTP::Request->new(POST => $url); - $req->content($frame->body); - $req->content_length(length $frame->body); + $req->content($frame->body); + $req->content_length(length $frame->body); - # dispatch - my $response; + # dispatch + my $response; $app->handle_request($req, \$response); - # reply - my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); - $self->connection->send({ destination => $reply_queue, body => $response->content }); + # reply, if header set + if (my $reply_to = $response->headers->header('X-Reply-Address')) { + my $reply_queue = '/remote-temp-queue/' . $reply_to; + $self->connection->send({ destination => $reply_queue, body => $response->content }); + } - # ack the message off the queue now we've replied - $self->connection->ack( { frame => $frame } ); + # ack the message off the queue now we've replied / processed + $self->connection->ack( { frame => $frame } ); } =head2 handle_stomp_error @@ -207,10 +201,10 @@ Log any Stomp error frames we receive. =cut sub handle_stomp_error { - my ($self, $app, $frame) = @_; - - my $error = $frame->headers->{message}; - $app->log->debug("Got Stomp error: $error"); + my ($self, $app, $frame) = @_; + + my $error = $frame->headers->{message}; + $app->log->debug("Got Stomp error: $error"); } __PACKAGE__->meta->make_immutable; diff --git a/t/01-good-message.t b/t/01-good-message.t new file mode 100644 index 0000000..eabb7cb --- /dev/null +++ b/t/01-good-message.t @@ -0,0 +1,46 @@ +use Test::More; + +# Tests which expect a STOMP server like ActiveMQ to exist on +# localhost:61613, which is what you get if you just get the ActiveMQ +# distro and run its out-of-the-box config. + +use Net::Stomp; +use YAML::XS qw/ Dump Load /; +use Data::Dumper; + +use FindBin; +use lib "$FindBin::Bin"; +require 'server.pl'; + +plan tests => 11; + +my $frame = $stomp->connect(); +ok($frame, 'connect to MQ server ok'); + +my $reply_to = sprintf '%s:1', $frame->headers->{session}; +ok($frame->headers->{session}, 'got a session'); +ok(length $reply_to > 2, 'valid-looking reply_to queue'); + +ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue'); + +my $message = { + payload => { foo => 1, bar => 2 }, + reply_to => $reply_to, + type => 'testaction', + }; +my $text = Dump($message); +ok($text, 'compose message'); + +$stomp->send( { destination => '/queue/testcontroller', body => $text } ); + +my $reply_frame = $stomp->receive_frame(); +ok($reply_frame, 'got a reply'); +ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue'); +ok($reply_frame->body, 'has a body'); + +my $response = Load($reply_frame->body); +ok($response, 'YAML response ok'); +ok($response->{type} eq 'testaction_response', 'correct type'); + +ok($stomp->disconnect, 'disconnected'); + diff --git a/t/02-bad-action.t b/t/02-bad-action.t new file mode 100644 index 0000000..0b119eb --- /dev/null +++ b/t/02-bad-action.t @@ -0,0 +1,48 @@ +use Test::More; + +# Tests which expect a STOMP server like ActiveMQ to exist on +# localhost:61613, which is what you get if you just get the ActiveMQ +# distro and run its out-of-the-box config. + +use Net::Stomp; +use YAML::XS qw/ Dump Load /; +use Data::Dumper; + +use FindBin; +use lib "$FindBin::Bin"; +require 'server.pl'; + +plan tests => 12; + +my $frame = $stomp->connect(); +ok($frame, 'connect to MQ server ok'); + +my $reply_to = sprintf '%s:1', $frame->headers->{session}; +ok($frame->headers->{session}, 'got a session'); +ok(length $reply_to > 2, 'valid-looking reply_to queue'); + +ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue'); + +# Test what happens when the action crashes +my $message = { + payload => { foo => 1, bar => 2 }, + reply_to => $reply_to, + type => 'badaction', + }; +my $text = Dump($message); +ok($text, 'compose message for badaction'); + +$stomp->send( { destination => '/queue/testcontroller', body => $text } ); + +my $reply_frame = $stomp->receive_frame(); +ok($reply_frame, 'got a reply'); +ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue'); +ok($reply_frame->body, 'has a body'); + +my $response = Load($reply_frame->body); +ok($response, 'YAML response ok'); +ok($response->{status} eq 'ERROR', 'is an error'); +ok($response->{error} =~ /oh noes/); + +ok($stomp->disconnect, 'disconnected'); + diff --git a/t/Catalyst-Engine-Stomp.t b/t/Catalyst-Engine-Stomp.t deleted file mode 100644 index ea2e699..0000000 --- a/t/Catalyst-Engine-Stomp.t +++ /dev/null @@ -1,80 +0,0 @@ -use Test::More; - -# Tests which expect a STOMP server like ActiveMQ to exist on -# localhost:61613, which is what you get if you just get the ActiveMQ -# distro and run its out-of-the-box config. - -use Net::Stomp; -use YAML::XS qw/ Dump Load /; -use Data::Dumper; - -use Alien::ActiveMQ; -my $ACTIVEMQ_VERSION = '5.2.0'; - -my ($stomp, $mq); -eval { - $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); -}; -if ($@) { - - unless (Alien::ActiveMQ->is_version_installed($ACTIVEMQ_VERSION)) { - plan 'skip_all' => 'No ActiveMQ server installed by Alien::ActiveMQ, try running the "install-activemq" command'; - exit; - } - - $mq = Alien::ActiveMQ->run_server($ACTIVEMQ_VERSION); - - eval { - $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); - }; - if ($@) { - plan 'skip_all' => 'No ActiveMQ server listening on 61613: ' . $@; - exit; - } -} - -plan tests => 12; - -# First fire off the server -$SIG{CHLD} = 'IGNORE'; -unless (fork()) { - system("$^X -Ilib -Itestapp/lib testapp/script/stomptestapp_stomp.pl --oneshot"); - exit 0; -} -print STDERR "server started, waiting for spinup..."; -sleep 20; - -# Now be a client to that server -print STDERR "testing\n"; -ok($stomp, 'Net::Stomp object'); - -my $frame = $stomp->connect(); -ok($frame, 'connect to MQ server ok'); - -my $reply_to = sprintf '%s:1', $frame->headers->{session}; -ok($frame->headers->{session}, 'got a session'); -ok(length $reply_to > 2, 'valid-looking reply_to queue'); - -ok($stomp->subscribe( { destination => '/temp-queue/reply' } ), 'subscribe to temp queue'); - -my $message = { - payload => { foo => 1, bar => 2 }, - reply_to => $reply_to, - type => 'testaction', - }; -my $text = Dump($message); -ok($text, 'compose message'); - -$stomp->send( { destination => '/queue/testcontroller', body => $text } ); - -my $reply_frame = $stomp->receive_frame(); -ok($reply_frame, 'got a reply'); -ok($reply_frame->headers->{destination} eq "/remote-temp-queue/$reply_to", 'came to correct temp queue'); -ok($reply_frame->body, 'has a body'); - -my $response = Load($reply_frame->body); -ok($response, 'YAML response ok'); -ok($response->{type} eq 'testaction_response', 'correct type'); - -ok($stomp->disconnect, 'disconnected'); - diff --git a/t/server.pl b/t/server.pl new file mode 100644 index 0000000..981f199 --- /dev/null +++ b/t/server.pl @@ -0,0 +1,33 @@ +use Alien::ActiveMQ; +my $ACTIVEMQ_VERSION = '5.2.0'; + +eval { + $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); +}; +if ($@) { + + unless (Alien::ActiveMQ->is_version_installed($ACTIVEMQ_VERSION)) { + plan 'skip_all' => 'No ActiveMQ server installed by Alien::ActiveMQ, try running the "install-activemq" command'; + exit; + } + + $mq = Alien::ActiveMQ->run_server($ACTIVEMQ_VERSION); + + eval { + $stomp = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); + }; + if ($@) { + plan 'skip_all' => 'No ActiveMQ server listening on 61613: ' . $@; + exit; + } +} + +# First fire off the server +$SIG{CHLD} = 'IGNORE'; +unless (fork()) { + system("$^X -Ilib -Itestapp/lib testapp/script/stomptestapp_stomp.pl --oneshot"); + exit 0; +} +print STDERR "server started, waiting for spinup..."; +sleep 2; + diff --git a/testapp/lib/StompTestApp/Controller/TestController.pm b/testapp/lib/StompTestApp/Controller/TestController.pm index a3b83b2..d086637 100644 --- a/testapp/lib/StompTestApp/Controller/TestController.pm +++ b/testapp/lib/StompTestApp/Controller/TestController.pm @@ -6,11 +6,16 @@ use namespace::autoclean; BEGIN { extends 'Catalyst::Controller::MessageDriven' }; sub testaction : Local { - my ($self, $c) = @_; + my ($self, $c, $request) = @_; # Reply with a minimal response message my $response = { type => 'testaction_response' }; $c->stash->{response} = $response; } +sub badaction : Local { + my ($self, $c, $request) = @_; + die "oh noes"; +} + 1;