X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=catagits%2FCatalyst-Engine-STOMP.git;a=blobdiff_plain;f=lib%2FCatalyst%2FEngine%2FStomp.pm;h=20da9d237aa8c9f9e20cd24a149d0396a530f73b;hp=f2c3847f2417db550df397e53dc0c2e1f31f9fbe;hb=6e81857d01f942e507b6ab2be95d43e56de25b65;hpb=48bc6cd6f67f4da5f95fcb29a44d18f8a23e4f9a diff --git a/lib/Catalyst/Engine/Stomp.pm b/lib/Catalyst/Engine/Stomp.pm index f2c3847..20da9d2 100644 --- a/lib/Catalyst/Engine/Stomp.pm +++ b/lib/Catalyst/Engine/Stomp.pm @@ -22,7 +22,7 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst. BEGIN { $ENV{CATALYST_ENGINE} = 'Stomp'; require Catalyst::Engine::Stomp; - } + } MyApp->config->{Engine::Stomp} = { @@ -43,18 +43,18 @@ Catalyst::Engine::Stomp - write message handling apps with Catalyst. $c->stash->{response} = $response; } - # The default serialization is YAML, but this configuration + # The default serialization is YAML, but this configuration # may be overridden in your controller: __PACKAGE__->config( - 'default' => 'text/x-yaml', - 'stash_key' => 'rest', - 'map' => { 'text/x-yaml' => 'YAML' }, - ); + 'default' => 'text/x-yaml', + 'stash_key' => 'rest', + 'map' => { 'text/x-yaml' => 'YAML' }, + ); =head1 DESCRIPTION Write a Catalyst app connected to a Stomp messagebroker, not HTTP. You -need a controller that understands messaging, as well as this engine. +need a controller that understands messaging, as well as this engine. This is single-threaded and single process - you need to run multiple instances of this engine to get concurrency, and configure your broker @@ -62,8 +62,8 @@ to load-balance across multiple consumers of the same queue. Controllers are mapped to Stomp queues, and a controller base class is provided, Catalyst::Controller::MessageDriven, which implements -YAML-serialized messages, mapping a top-level YAML "type" key to -the action. +YAML-serialized messages, mapping a top-level YAML "type" key to +the action. =head1 METHODS @@ -77,44 +77,44 @@ sub run { my ($self, $app, $oneshot) = @_; die 'No Engine::Stomp configuration found' - unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; + unless ref $app->config->{'Engine::Stomp'} eq 'HASH'; # list the path namespaces that will be mapped as queues. - # - # this is known to use the deprecated - # Dispatcher->action_hash() method, but there doesn't appear - # to be another way to get the relevant strings out. - # - # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 - # + # + # this is known to use the deprecated + # Dispatcher->action_hash() method, but there doesn't appear + # to be another way to get the relevant strings out. + # + # http://github.com/rafl/catalyst-runtime/commit/5de163f4963d9dbb41d7311ca6f17314091b7af3#L2R644 + # my @queues = - uniq - grep { length $_ } - map { $_->namespace } - values %{$app->dispatcher->action_hash}; + uniq + grep { length $_ } + map { $_->namespace } + values %{$app->dispatcher->action_hash}; - # connect up + # connect up my %template = %{$app->config->{'Engine::Stomp'}}; - $self->connection(Net::Stomp->new(\%template)); - $self->connection->connect(); - $self->conn_desc($template{hostname}.':'.$template{port}); + $self->connection(Net::Stomp->new(\%template)); + $self->connection->connect(); + $self->conn_desc($template{hostname}.':'.$template{port}); - # subscribe, with client ack. + # subscribe, with client ack. foreach my $queue (@queues) { - my $queue_name = "/queue/$queue"; - $self->connection->subscribe({ - destination => $queue_name, - ack => 'client' - }); + my $queue_name = "/queue/$queue"; + $self->connection->subscribe({ + destination => $queue_name, + ack => 'client', + }); } - # enter loop... - while (1) { - my $frame = $self->connection->receive_frame(); - $self->handle_stomp_frame($app, $frame); - last if $ENV{ENGINE_ONESHOT}; - } - exit 0; + # enter loop... + while (1) { + my $frame = $self->connection->receive_frame(); + $self->handle_stomp_frame($app, $frame); + last if $ENV{ENGINE_ONESHOT}; + } + exit 0; } =head2 prepare_request @@ -126,9 +126,9 @@ client IP address. sub prepare_request { my ($self, $c, $req, $res_ref) = @_; - shift @_; - $self->next::method(@_); - $c->req->address($self->conn_desc); + shift @_; + $self->next::method(@_); + $c->req->address($self->conn_desc); } =head2 finalize_headers @@ -139,12 +139,12 @@ Overridden to dump out any errors encountered, since you won't get a =cut sub finalize_headers { - my ($self, $c) = @_; - my $error = join "\n", @{$c->error}; - if ($error) { - $c->log->debug($error); - } - return $self->next::method($c); + my ($self, $c) = @_; + my $error = join "\n", @{$c->error}; + if ($error) { + $c->log->debug($error); + } + return $self->next::method($c); } =head2 handle_stomp_frame @@ -154,18 +154,18 @@ Dispatch according to Stomp frame type. =cut sub handle_stomp_frame { - my ($self, $app, $frame) = @_; - - my $command = $frame->command(); - if ($command eq 'MESSAGE') { - $self->handle_stomp_message($app, $frame); - } - elsif ($command eq 'ERROR') { - $self->handle_stomp_error($app, $frame); - } - else { - $app->log->debug("Got unknown Stomp command: $command"); - } + my ($self, $app, $frame) = @_; + + my $command = $frame->command(); + if ($command eq 'MESSAGE') { + $self->handle_stomp_message($app, $frame); + } + elsif ($command eq 'ERROR') { + $self->handle_stomp_error($app, $frame); + } + else { + $app->log->debug("Got unknown Stomp command: $command"); + } } =head2 handle_stomp_message @@ -175,29 +175,29 @@ Dispatch a Stomp message into the Catalyst app. =cut sub handle_stomp_message { - my ($self, $app, $frame) = @_; + my ($self, $app, $frame) = @_; - # queue -> controller - my $queue = $frame->headers->{destination}; - my ($controller) = $queue =~ m!^/queue/(.*)$!; + # queue -> controller + my $queue = $frame->headers->{destination}; + my ($controller) = $queue =~ m!^/queue/(.*)$!; - # set up request + # set up request my $config = $app->config->{'Engine::Stomp'}; my $url = 'stomp://'.$config->{hostname}.':'.$config->{port}.'/'.$controller; my $req = HTTP::Request->new(POST => $url); - $req->content($frame->body); - $req->content_length(length $frame->body); + $req->content($frame->body); + $req->content_length(length $frame->body); - # dispatch - my $response; + # dispatch + my $response; $app->handle_request($req, \$response); - # reply - my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); - $self->connection->send({ destination => $reply_queue, body => $response->content }); + # reply + my $reply_queue = '/remote-temp-queue/' . ($response->headers->header('X-Reply-Address')); + $self->connection->send({ destination => $reply_queue, body => $response->content }); - # ack the message off the queue now we've replied - $self->connection->ack( { frame => $frame } ); + # ack the message off the queue now we've replied + $self->connection->ack( { frame => $frame } ); } =head2 handle_stomp_error @@ -207,10 +207,10 @@ Log any Stomp error frames we receive. =cut sub handle_stomp_error { - my ($self, $app, $frame) = @_; - - my $error = $frame->headers->{message}; - $app->log->debug("Got Stomp error: $error"); + my ($self, $app, $frame) = @_; + + my $error = $frame->headers->{message}; + $app->log->debug("Got Stomp error: $error"); } __PACKAGE__->meta->make_immutable;