improved concurrency connections in Catalyst::Engine::HTTP::Daemon
Christian Hansen [Sun, 29 May 2005 19:14:57 +0000 (19:14 +0000)]
Changes
lib/Catalyst/Engine/HTTP/Daemon.pm

diff --git a/Changes b/Changes
index 2ad2bad..fa4751f 100644 (file)
--- a/Changes
+++ b/Changes
@@ -1,7 +1,8 @@
 This file documents the revision history for Perl extension Catalyst.
 
 5.23  2005-00-00 00:00:00
-        - added support for non Catalyst::Base components to live in namespace.
+        - added support for non Catalyst::Base components to live in namespace
+        - improved concurrency connections in Catalyst::Engine::HTTP::Daemon
 
 5.22  2005-05-26 14:24:00
         - Improved base locating in MP engines
index 92faf42..6da6e65 100644 (file)
@@ -3,7 +3,7 @@ package Catalyst::Engine::HTTP::Daemon;
 use strict;
 use base 'Catalyst::Engine::HTTP::Base';
 
-use IO::Socket qw( SOCK_STREAM SOMAXCONN );
+use IO::Select;
 
 =head1 NAME
 
@@ -38,28 +38,19 @@ This class overloads some methods from C<Catalyst::Engine::HTTP::Base>.
 =cut
 
 sub handler {
-    my ( $class, $client ) = @_;
+    my ( $class, $request, $response, $client ) = @_;
 
-    $client->timeout(5);
+    $request->uri->scheme('http');    # Force URI::http
+    $request->uri->host( $request->header('Host') || $client->sockhost );
+    $request->uri->port( $client->sockport );
 
-    while ( my $request = $client->get_request ) {
-
-        $request->uri->scheme('http');    # Force URI::http
-        $request->uri->host( $request->header('Host') || $client->sockhost );
-        $request->uri->port( $client->sockport );
-
-        my $http = Catalyst::Engine::HTTP::Base::struct->new(
-            address  => $client->peerhost,
-            request  => $request,
-            response => HTTP::Response->new
-        );
-
-        $class->SUPER::handler($http);
-
-        $client->send_response( $http->response );
-    }
+    my $http = Catalyst::Engine::HTTP::Base::struct->new(
+        address  => $client->peerhost,
+        request  => $request,
+        response => $response
+    );
 
-    $client->close;
+    $class->SUPER::handler($http);
 }
 
 =item $c->run
@@ -69,29 +60,81 @@ sub handler {
 sub run {
     my $class = shift;
     my $port  = shift || 3000;
-    
+
     $SIG{'PIPE'} = 'IGNORE';
-    
-    $HTTP::Daemon::PROTO = 'HTTP/1.0'; # For now until we resolve the blocking 
-                                       # issues with HTTP 1.1
 
     my $daemon = Catalyst::Engine::HTTP::Daemon::Catalyst->new(
-        Listen    => SOMAXCONN,
+        Listen    => 1,
         LocalPort => $port,
         ReuseAddr => 1,
-        Type      => SOCK_STREAM,
+        Timeout   => 5
     );
-    
+
     unless ( defined $daemon ) {
-        die( qq/Failed to create daemon. Reason: '$!'/ );
+        die(qq/Failed to create daemon. Reason: '$!'/);
     }
 
     my $base = URI->new( $daemon->url )->canonical;
 
     printf( "You can connect to your server at %s\n", $base );
 
-    while ( my $client = $daemon->accept ) {
-        $class->handler($client);
+    my $select = IO::Select->new($daemon);
+
+    while (1) {
+
+        for my $client ( $select->can_read ) {
+
+            if ( $client == $daemon ) {
+                $client = $daemon->accept;
+                $client->blocking(0);
+                $select->add($client);
+            }
+
+            else {
+                next if $client->request;
+                next if $client->response;
+
+                my $read = $client->sysread( my $buf, 4096 );
+                
+                unless ( defined($read) && length($buf) ) {
+             
+                    $select->remove($client);
+                    $client->close;
+
+                    next;
+                }
+
+                $client->read_buffer($buf);
+                $client->request( $client->get_request );
+            }
+        }
+
+        for my $client ( $select->handles ) {
+
+            next if $client == $daemon;
+            next if $client->response;
+            next unless $client->request;
+
+            $client->response( HTTP::Response->new );
+            $class->handler( $client->request, $client->response, $client );    
+        }
+
+        for my $client ( $select->can_write(0) ) {
+
+            next unless $client->response;
+
+            $client->send_response( $client->response );
+
+            my $connection = $client->request->header('Connection');
+
+            unless ( $connection && $connection =~ /Keep-Alive/i ) {
+                $select->remove($client);
+                $client->close;
+            }
+
+            $client->request(undef);
+            $client->response(undef);
+        }
     }
 }
 
@@ -99,7 +142,7 @@ sub run {
 
 =head1 SEE ALSO
 
-L<Catalyst>, L<Catalyst::Engine>, L<Catalyst::Engine::HTTP::Base>, 
+L<Catalyst>, L<Catalyst::Engine>, L<Catalyst::Engine::HTTP::Base>,
 L<HTTP::Daemon>.
 
 =head1 AUTHOR
@@ -119,8 +162,47 @@ package Catalyst::Engine::HTTP::Daemon::Catalyst;
 use strict;
 use base 'HTTP::Daemon';
 
+sub accept {
+    return shift->SUPER::accept('Catalyst::Engine::HTTP::Daemon::Client');
+}
+
 sub product_tokens {
-    "Catalyst/$Catalyst::VERSION";
+    return "Catalyst/$Catalyst::VERSION";
+}
+
+package Catalyst::Engine::HTTP::Daemon::Client;
+
+use strict;
+use base 'HTTP::Daemon::ClientConn';
+
+sub read_buffer {
+    my $self = shift;
+
+    if (@_) {
+        ${*$self}{'httpd_rbuf'} .= shift;
+    }
+
+    return ${*$self}{'httpd_rbuf'};
+}
+
+sub request {
+    my $self = shift;
+
+    if (@_) {
+        ${*$self}{'request'} = shift;
+    }
+
+    return ${*$self}{'request'};
+}
+
+sub response {
+    my $self = shift;
+
+    if (@_) {
+        ${*$self}{'response'} = shift;
+    }
+
+    return ${*$self}{'response'};
 }
 
 1;