cleanup of some docs, got the default shuffling balancer to work properly. Don't...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
index 6ec74e9..df0734f 100644 (file)
-package DBIx::Class::Storage::DBI::Replication;
+package DBIx::Class::Storage::DBI::Replicated;
+
+use Moose;
+use DBIx::Class::Storage::DBI;
+use DBIx::Class::Storage::DBI::Replicated::Pool;
+use DBIx::Class::Storage::DBI::Replicated::Balancer;
+use Scalar::Util qw(blessed);
+
+extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
+
+=head1 NAME
+
+DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
+
+=head1 SYNOPSIS
+
+The Following example shows how to change an existing $schema to a replicated
+storage type, add some replicated (readonly) databases, and perform reporting
+tasks.
+
+    ## Change storage_type in your schema class
+    $schema->storage_type( '::DBI::Replicated' );
+    
+    ## Add some slaves.  Basically this is an array of arrayrefs, where each
+    ## arrayref is database connect information
+    
+    $schema->storage->connect_replicants(
+        [$dsn1, $user, $pass, \%opts],
+        [$dsn1, $user, $pass, \%opts],
+        [$dsn1, $user, $pass, \%opts],
+    );
+    
+=head1 DESCRIPTION
+
+Warning: This class is marked ALPHA.  We are using this in development and have
+some basic test coverage but the code hasn't yet been stressed by a variety
+of databases.  Individual DB's may have quirks we are not aware of.  Please
+use this in development and pass along your experiences/bug fixes.
+
+This class implements replicated data store for DBI. Currently you can define
+one master and numerous slave database connections. All write-type queries
+(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
+database, all read-type queries (SELECTs) go to the slave database.
+
+Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
+handle gets delegated to one of the two attributes: L</master_storage> or to
+L</current_replicant_storage>.  Additionally, some methods need to be distributed
+to all existing storages.  This way our storage class is a drop in replacement
+for L<DBIx::Class::Storage::DBI>.
+
+Read traffic is spread across the replicants (slaves) occuring to a user
+selected algorithm.  The default algorithm is random weighted.
+
+TODO more details about the algorithm.
+
+=head1 ATTRIBUTES
+
+This class defines the following attributes.
+
+=head2 master
+
+The master defines the canonical state for a pool of connected databases.  All
+the replicants are expected to match this databases state.  Thus, in a classic
+Master / Slaves distributed system, all the slaves are expected to replicate
+the Master's state as quick as possible.  This is the only database in the
+pool of databases that is allowed to handle write traffic.
+
+=cut
+
+has 'master' => (
+    is=> 'ro',
+    isa=>'DBIx::Class::Storage::DBI',
+    lazy_build=>1,
+    handles=>[qw/   
+        on_connect_do
+        on_disconnect_do       
+        connect_info
+        throw_exception
+        sql_maker
+        sqlt_type
+        create_ddl_dir
+        deployment_statements
+        datetime_parser
+        datetime_parser_type        
+        last_insert_id
+        insert
+        insert_bulk
+        update
+        delete
+        dbh
+        txn_do
+        txn_commit
+        txn_rollback
+        sth
+        deploy
+        schema
+    /],
+);
+
+
+=head2 current_replicant
+
+Replicant storages (slaves) handle all read only traffic.  The assumption is
+that your database will become readbound well before it becomes write bound
+and that being able to spread your read only traffic around to multiple 
+databases is going to help you to scale traffic.
+
+This attribute returns the next slave to handle a read request.  Your L</pool>
+attribute has methods to help you shuffle through all the available replicants
+via it's balancer object.
+
+We split the reader/writer to make it easier to selectively override how the
+replicant is altered.
+
+=cut
+
+has 'current_replicant' => (
+    is=> 'rw',
+    isa=>'DBIx::Class::Storage::DBI',
+    lazy_build=>1,
+    handles=>[qw/
+        select
+        select_single
+        columns_info_for
+    /],
+);
+
+
+=head2 pool_type
+
+Contains the classname which will instantiate the L</pool> object.  Defaults 
+to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
+
+=cut
+
+has 'pool_type' => (
+    is=>'ro',
+    isa=>'ClassName',
+    required=>1,
+    lazy=>1,
+    default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+    handles=>{
+       'create_pool' => 'new',
+    },
+);
+
+
+=head2 balancer_type
+
+The replication pool requires a balance class to provider the methods for
+choose how to spread the query load across each replicant in the pool.
+
+=cut
+
+has 'balancer_type' => (
+    is=>'ro',
+    isa=>'ClassName',
+    required=>1,
+    lazy=>1,
+    default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+    handles=>{
+       'create_balancer' => 'new',
+    },
+);
+
+
+=head2 pool
+
+Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
+container class for one or more replicated databases.
+
+=cut
+
+has 'pool' => (
+    is=>'ro',
+    isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+    lazy_build=>1,
+    handles=>[qw/
+        replicants
+        has_replicants
+        connect_replicants
+        num_replicants
+        delete_replicant
+    /],
+);
+
+
+=head2 balancer
+
+Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
+is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
+
+=cut
+
+has 'balancer' => (
+    is=>'ro',
+    isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+    lazy_build=>1,
+    handles=>[qw/next_storage/],
+);
+
+=head1 METHODS
+
+This class defines the following methods.
+
+=head2 _build_master
+
+Lazy builder for the L</master> attribute.
+
+=cut
+
+sub _build_master {
+       DBIx::Class::Storage::DBI->new;
+}
+
+
+=head2 _build_current_replicant
+
+Lazy builder for the L</current_replicant_storage> attribute.
+
+=cut
+
+sub _build_current_replicant {
+       my $self = shift @_;
+       $self->next_storage($self->pool);
+}
+
+
+=head2 _build_pool
+
+Lazy builder for the L</pool> attribute.
+
+=cut
+
+sub _build_pool {
+    my $self = shift @_;
+    $self->create_pool;
+}
+
+
+=head2 _build_balancer
+
+Lazy builder for the L</balancer> attribute.
+
+=cut
+
+sub _build_balancer {
+    my $self = shift @_;
+    $self->create_balancer;
+}
+
+
+=head2 around: create_replicants
+
+All calls to create_replicants needs to have an existing $schema tacked onto
+top of the args
+
+=cut
+
+around 'connect_replicants' => sub {
+       my ($method, $self, @args) = @_;
+       $self->$method($self->schema, @args);
+};
+
+
+=head2 after: select, select_single, columns_info_for
+
+Advice on the current_replicant_storage attribute.  Each time we use a replicant
+we need to change it via the storage pool algorithm.  That way we are spreading
+the load evenly (hopefully) across existing capacity.
+
+=cut
+
+after 'select' => sub {
+    my $self = shift @_;
+    my $next_replicant = $self->next_storage($self->pool);
+
+    $self->current_replicant($next_replicant);
+};
+
+after 'select_single' => sub {
+    my $self = shift @_;
+    my $next_replicant = $self->next_storage($self->pool);
+
+    $self->current_replicant($next_replicant);
+};
+
+after 'columns_info_for' => sub {
+    my $self = shift @_;
+    my $next_replicant = $self->next_storage($self->pool);
+
+    $self->current_replicant($next_replicant);
+};
+
+=head2 all_storages
+
+Returns an array of of all the connected storage backends.  The first element
+in the returned array is the master, and the remainings are each of the
+replicants.
+
+=cut
+
+sub all_storages {
+       my $self = shift @_;
+       
+       return grep {defined $_ && blessed $_} (
+          $self->master,
+          $self->replicants,
+       );
+}
+
+
+=head2 connected
+
+Check that the master and at least one of the replicants is connected.
+
+=cut
+
+sub connected {
+       my $self = shift @_;
+       
+       return
+          $self->master->connected &&
+          $self->pool->connected_replicants;
+}
+
+
+=head2 ensure_connected
+
+Make sure all the storages are connected.
+
+=cut
+
+sub ensure_connected {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->ensure_connected(@_);
+    }
+}
+
+
+=head2 limit_dialect
+
+Set the limit_dialect for all existing storages
+
+=cut
+
+sub limit_dialect {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->limit_dialect(@_);
+    }
+}
+
+
+=head2 quote_char
+
+Set the quote_char for all existing storages
+
+=cut
+
+sub quote_char {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->quote_char(@_);
+    }
+}
+
+
+=head2 name_sep
+
+Set the name_sep for all existing storages
+
+=cut
+
+sub name_sep {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->name_sep(@_);
+    }
+}
+
+
+=head2 set_schema
+
+Set the schema object for all existing storages
+
+=cut
+
+sub set_schema {
+       my $self = shift @_;
+       foreach my $source ($self->all_storages) {
+               $source->set_schema(@_);
+       }
+}
+
+
+=head2 debug
+
+set a debug flag across all storages
+
+=cut
+
+sub debug {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->debug(@_);
+    }
+}
+
+
+=head2 debugobj
+
+set a debug object across all storages
+
+=cut
+
+sub debugobj {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->debugobj(@_);
+    }
+}
+
+
+=head2 debugfh
+
+set a debugfh object across all storages
+
+=cut
+
+sub debugfh {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->debugfh(@_);
+    }
+}
+
+
+=head2 debugcb
+
+set a debug callback across all storages
+
+=cut
+
+sub debugcb {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->debugcb(@_);
+    }
+}
+
+
+=head2 disconnect
+
+disconnect everything
+
+=cut
+
+sub disconnect {
+    my $self = shift @_;
+    foreach my $source ($self->all_storages) {
+        $source->disconnect(@_);
+    }
+}
+
+
+=head2 DESTROY
+
+Make sure we pass destroy events down to the storage handlers
+
+=cut
+
+sub DESTROY {
+    my $self = shift;
+    ## TODO, maybe we can just leave this alone ???
+}
+
+
+=head1 AUTHOR
+
+Norbert Csongrádi <bert@cpan.org>
+
+Peter Siklósi <einon@einon.hu>
+
+John Napiorkowski <john.napiorkowski@takkle.com>
+
+=head1 LICENSE
+
+You may distribute this code under the same terms as Perl itself.
+
+=cut
+
+1;
+
+__END__
 
 use strict;
 use warnings;
 
 use DBIx::Class::Storage::DBI;
 use DBD::Multi;
+
 use base qw/Class::Accessor::Fast/;
 
 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
 
 =head1 NAME
 
-DBIx::Class::Storage::DBI::Replication - EXPERIMENTAL Replicated database support
+DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
 
 =head1 SYNOPSIS
 
-  # change storage_type in your schema class
-    $schema->storage_type( '::DBI::Replication' );
-    $schema->connect_info( [
-                    [ "dbi:mysql:database=test;hostname=master", "username", "password", { AutoCommit => 1 } ], # master
-                    [ "dbi:mysql:database=test;hostname=slave1", "username", "password", { priority => 10 } ],  # slave1
-                    [ "dbi:mysql:database=test;hostname=slave2", "username", "password", { priority => 10 } ],  # slave2
-                    [ $dbh, '','', {priority=>10}], # add in a preexisting database handle
-                    [ sub {  DBI->connect }, '', '', {priority=>10}], # DBD::Multi will call this coderef for connects
-                    <...>,
-                    { limit_dialect => 'LimitXY' } # If needed, see below
-                   ] );
+The Following example shows how to change an existing $schema to a replicated
+storage type and update it's connection information to contain a master DSN and
+an array of slaves.
+
+    ## Change storage_type in your schema class
+    $schema->storage_type( '::DBI::Replicated' );
+    
+    ## Set your connection.
+    $schema->connect(
+        $dsn, $user, $password, {
+               AutoCommit => 1,
+               ## Other standard DBI connection or DBD custom attributes added as
+               ## usual.  Additionally, we have two custom attributes for defining
+               ## slave information and controlling how the underlying DBD::Multi
+               connect_replicants => [
+                  ## Define each slave like a 'normal' DBI connection, but you add
+                  ## in a DBD::Multi custom attribute to define how the slave is
+                  ## prioritized.  Please see DBD::Multi for more.
+                  [$slave1dsn, $user, $password, {%slave1opts}],
+               [$slave2dsn, $user, $password, {%slave2opts}],
+               [$slave3dsn, $user, $password, {%slave3opts}],
+               ],
+        },
+    );
+    
+    ## Now, just use the schema as normal
+    $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
+    $schema->resultset('Table')->create(\%info); ## Writes will use master
 
 =head1 DESCRIPTION
 
-Warning: This class is marked EXPERIMENTAL. It works for the authors but does
-not currently have automated tests so your mileage may vary.
+Warning: This class is marked ALPHA.  We are using this in development and have
+some basic test coverage but the code hasn't yet been stressed by a variety
+of databases.  Individual DB's may have quirks we are not aware of.  Please
+use this in development and pass along your experiences/bug fixes.
 
 This class implements replicated data store for DBI. Currently you can define
 one master and numerous slave database connections. All write-type queries
@@ -44,13 +559,7 @@ if all low priority data sources fail, higher ones tried in order.
 
 =head1 CONFIGURATION
 
-=head2 Limit dialect
-
-If you use LIMIT in your queries (effectively, if you use
-SQL::Abstract::Limit), do not forget to set up limit_dialect (perldoc
-SQL::Abstract::Limit) by passing it as an option in the (optional) hash
-reference to connect_info.  DBIC can not set it up automatically, since it can
-not guess DBD::Multi connection types.
+Please see L<DBD::Multi> for most configuration information.
 
 =cut
 
@@ -75,37 +584,48 @@ sub all_sources {
     return wantarray ? @sources : \@sources;
 }
 
-sub connect_info {
-    my( $self, $source_info ) = @_;
-
-    my( $info, $global_options, $options, @dsns );
-
-    $info = [ @$source_info ];
-
-    $global_options = ref $info->[-1] eq 'HASH' ? pop( @$info ) : {};
-    if( ref( $options = $info->[0]->[-1] ) eq 'HASH' ) {
-       # Local options present in dsn, merge them with global options
-        map { $global_options->{$_} = $options->{$_} } keys %$options;
-        pop @{$info->[0]};
-    }
+sub _connect_info {
+       my $self = shift;
+    my $master = $self->write_source->_connect_info;
+    $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
+    return $master;
+}
 
-    # We need to copy-pass $global_options, since connect_info clears it while
-    # processing options
-    $self->write_source->connect_info( @{$info->[0]}, { %$global_options } );
+sub connect_info {
+       my ($self, $source_info) = @_;
 
-       ## allow either a DSN string or an already connect $dbh.  Just remember if
-       ## you use the $dbh option then DBD::Multi has no idea how to reconnect in
-       ## the event of a failure.
-       
-    @dsns = map {
+    ## if there is no $source_info, treat this sub like an accessor
+    return $self->_connect_info
+     if !$source_info;
+    
+    ## Alright, let's conect the master 
+    $self->write_source->connect_info($source_info);
+  
+    ## Now, build and then connect the Slaves
+    my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
+    my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
+        ? pop @slaves_connect_info : {};
+
+    ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
+    $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
+        unless defined $dbd_multi_config->{limit_dialect};
+
+    @slaves_connect_info = map {
         ## if the first element in the arrayhash is a ref, make that the value
         my $db = ref $_->[0] ? $_->[0] : $_;
-        ($_->[3]->{priority} || 10) => $db;
-    } @{$info->[0]}[1..@{$info->[0]}-1];
+       my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
+       $priority => $db;
+    } @slaves_connect_info;
     
-    $global_options->{dsns} = \@dsns;
-
-    $self->read_source->connect_info( [ 'dbi:Multi:', undef, undef, { %$global_options } ] );
+    $self->read_source->connect_info([ 
+        'dbi:Multi:', undef, undef, { 
+               dsns => [@slaves_connect_info],
+               %$dbd_multi_config,
+        },
+    ]);
+    
+    ## Return the formated connection information
+    return $self->_connect_info;
 }
 
 sub select {
@@ -191,8 +711,8 @@ sub ensure_connected {
 sub dbh {
     shift->write_source->dbh( @_ );
 }
-sub txn_begin {
-    shift->write_source->txn_begin( @_ );
+sub txn_do {
+    shift->write_source->txn_do( @_ );
 }
 sub txn_commit {
     shift->write_source->txn_commit( @_ );
@@ -206,7 +726,16 @@ sub sth {
 sub deploy {
     shift->write_source->deploy( @_ );
 }
+sub _prep_for_execute {
+       shift->write_source->_prep_for_execute(@_);
+}
 
+sub debugobj {
+       shift->write_source->debugobj(@_);
+}
+sub debug {
+    shift->write_source->debug(@_);
+}
 
 sub debugfh { shift->_not_supported( 'debugfh' ) };
 sub debugcb { shift->_not_supported( 'debugcb' ) };
@@ -227,6 +756,8 @@ Norbert Csongr
 
 Peter Siklósi <einon@einon.hu>
 
+John Napiorkowski <john.napiorkowski@takkle.com>
+
 =head1 LICENSE
 
 You may distribute this code under the same terms as Perl itself.