X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FCatalyst%2FEngine%2FStomp.pm;h=5ac0d1df3205dec61720468753cb56ebd85a714b;hb=491ffbb309f6ef16c9fc47112d42c6144c2403c5;hp=20da9d237aa8c9f9e20cd24a149d0396a530f73b;hpb=6e81857d01f942e507b6ab2be95d43e56de25b65;p=catagits%2FCatalyst-Engine-STOMP.git diff --git a/lib/Catalyst/Engine/Stomp.pm b/lib/Catalyst/Engine/Stomp.pm index 20da9d2..5ac0d1d 100644 --- a/lib/Catalyst/Engine/Stomp.pm +++ b/lib/Catalyst/Engine/Stomp.pm @@ -1,15 +1,17 @@ package Catalyst::Engine::Stomp; use Moose; -extends 'Catalyst::Engine::Embeddable'; - -our $VERSION = '0.03'; - use List::MoreUtils qw/ uniq /; use HTTP::Request; use Net::Stomp; +use MooseX::Types::Moose qw/Str Int HashRef/; +use namespace::autoclean; + +extends 'Catalyst::Engine::Embeddable'; + +our $VERSION = '0.06'; has connection => (is => 'rw', isa => 'Net::Stomp'); -has conn_desc => (is => 'rw', isa => 'Str'); +has conn_desc => (is => 'rw', isa => Str); =head1 NAME @@ -24,11 +26,15 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst. require Catalyst::Engine::Stomp; } - MyApp->config->{Engine::Stomp} = - { - hostname => '127.0.0.1', - port => 61613, - }; + MyApp->config( + 'Engine::Stomp' = { + hostname => '127.0.0.1', + port => 61613, + subscribe_header => { + transformation => 'jms-to-json', + } + }, + ); MyApp->run(); # In a controller, or controller base class: @@ -43,14 +49,6 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst. $c->stash->{response} = $response; } - # The default serialization is YAML, but this configuration - # may be overridden in your controller: - __PACKAGE__->config( - 'default' => 'text/x-yaml', - 'stash_key' => 'rest', - 'map' => { 'text/x-yaml' => 'YAML' }, - ); - =head1 DESCRIPTION Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You @@ -77,44 +75,38 @@ sub run { my ($self, $app, $oneshot) = @_; die 'No Engine::Stomp configuration found' - unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; - - # list the path namespaces that will be mapped as queues. - # - # this is known to use the deprecated - # Dispatcher->action_hash() method, but there doesn't appear - # to be another way to get the relevant strings out. - # - # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 - # - my @queues = - uniq - grep { length $_ } - map { $_->namespace } - values %{$app->dispatcher->action_hash}; - - # connect up + unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; + + my @queues = grep { length $_ } + map { $app->controller($_)->action_namespace } $app->controllers; + + # connect up my %template = %{$app->config->{'Engine::Stomp'}}; - $self->connection(Net::Stomp->new(\%template)); - $self->connection->connect(); - $self->conn_desc($template{hostname}.':'.$template{port}); + my $subscribe_headers = $template{subscribe_headers} || {}; + die("subscribe_headers config for Engine::Stomp must be a hashref!\n") + if (ref($subscribe_headers) ne 'HASH'); + + $self->connection(Net::Stomp->new(\%template)); + $self->connection->connect(); + $self->conn_desc($template{hostname}.':'.$template{port}); - # subscribe, with client ack. + # subscribe, with client ack. foreach my $queue (@queues) { - my $queue_name = "/queue/$queue"; - $self->connection->subscribe({ - destination => $queue_name, - ack => 'client', - }); + my $queue_name = "/queue/$queue"; + $self->connection->subscribe({ + %$subscribe_headers, + destination => $queue_name, + ack => 'client', + }); } - # enter loop... - while (1) { - my $frame = $self->connection->receive_frame(); - $self->handle_stomp_frame($app, $frame); - last if $ENV{ENGINE_ONESHOT}; - } - exit 0; + # enter loop... + while (1) { + my $frame = $self->connection->receive_frame(); + $self->handle_stomp_frame($app, $frame); + last if $ENV{ENGINE_ONESHOT}; + } + exit 0; } =head2 prepare_request @@ -125,7 +117,7 @@ client IP address. =cut sub prepare_request { - my ($self, $c, $req, $res_ref) = @_; + my ($self, $c, $req, $res_ref) = @_; shift @_; $self->next::method(@_); $c->req->address($self->conn_desc); @@ -142,7 +134,7 @@ sub finalize_headers { my ($self, $c) = @_; my $error = join "\n", @{$c->error}; if ($error) { - $c->log->debug($error); + $c->log->debug($error); } return $self->next::method($c); } @@ -158,13 +150,13 @@ sub handle_stomp_frame { my $command = $frame->command(); if ($command eq 'MESSAGE') { - $self->handle_stomp_message($app, $frame); + $self->handle_stomp_message($app, $frame); } elsif ($command eq 'ERROR') { - $self->handle_stomp_error($app, $frame); + $self->handle_stomp_error($app, $frame); } else { - $app->log->debug("Got unknown Stomp command: $command"); + $app->log->debug("Got unknown Stomp command: $command"); } } @@ -179,27 +171,28 @@ sub handle_stomp_message { # queue -> controller my $queue = $frame->headers->{destination}; - my ($controller) = $queue =~ m!^/queue/(.*)$!; + my ($controller) = $queue =~ m|^/queue/(.*)$|; # set up request - my $config = $app->config->{'Engine::Stomp'}; - my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; - my $req = HTTP::Request->new(POST => $url); - $req->content($frame->body); + my $config = $app->config->{'Engine::Stomp'}; + my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; + my $req = HTTP::Request->new(POST => $url); + $req->content($frame->body); $req->content_length(length $frame->body); # dispatch my $response; - $app->handle_request($req, \$response); + $app->handle_request($req, \$response); - # reply - my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); - $self->connection->send({ destination => $reply_queue, body => $response->content }); + # reply, if header set + if (my $reply_to = $response->headers->header('X-Reply-Address')) { + my $reply_queue = '/remote-temp-queue/' . $reply_to; + $self->connection->send({ destination => $reply_queue, body => $response->content }); + } - # ack the message off the queue now we've replied + # ack the message off the queue now we've replied / processed $self->connection->ack( { frame => $frame } ); } - =head2 handle_stomp_error Log any Stomp error frames we receive. @@ -215,5 +208,38 @@ sub handle_stomp_error { __PACKAGE__->meta->make_immutable; -1; +=head1 CONFIGURATION + +=head2 subscribe_header + +Add additional header key/value pairs to the subscribe message sent to the +message broker. + +=cut + +=head1 DEVELOPMENT + +The source to Catalyst::Engine::Stomp is in github: + + http://github.com/chrisa/catalyst-engine-stomp + +=head1 AUTHOR + +Chris Andrews C<< >> + +=head1 CONTRIBUTORS + +Tomas Doran (t0m) C<< >> + +Jason Tang + +=head1 LICENCE AND COPYRIGHT + +Copyright (C) 2009 Venda Ltd + +This library is free software; you can redistribute it and/or modify +it under the same terms as Perl itself, either Perl version 5.8.8 or, +at your option, any later version of Perl 5 you may have available. + +=cut