-package DBIx::Class::Storage::DBI::Replication;
-
-use strict;
-use warnings;
+package DBIx::Class::Storage::DBI::Replicated;
+use Moose;
use DBIx::Class::Storage::DBI;
-use DBD::Multi;
-use base qw/Class::Accessor::Fast/;
+use DBIx::Class::Storage::DBI::Replicated::Pool;
+use DBIx::Class::Storage::DBI::Replicated::Balancer;
+use Scalar::Util qw(blessed);
-__PACKAGE__->mk_accessors( qw/read_source write_source/ );
+extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
=head1 NAME
-DBIx::Class::Storage::DBI::Replication - EXPERIMENTAL Replicated database support
+DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
=head1 SYNOPSIS
- # change storage_type in your schema class
- $schema->storage_type( '::DBI::Replication' );
- $schema->connect_info( [
- [ "dbi:mysql:database=test;hostname=master", "username", "password", { AutoCommit => 1 } ], # master
- [ "dbi:mysql:database=test;hostname=slave1", "username", "password", { priority => 10 } ], # slave1
- [ "dbi:mysql:database=test;hostname=slave2", "username", "password", { priority => 10 } ], # slave2
- [ $dbh, '','', {priority=>10}], # add in a preexisting database handle
- [ sub { DBI->connect }, '', '', {priority=>10}], # DBD::Multi will call this coderef for connects
- <...>,
- { limit_dialect => 'LimitXY' } # If needed, see below
- ] );
+The following example shows how to change an existing $schema to a replicated
+storage type, add some replicated (readonly) databases, and perform reporting
+tasks.
+ ## Change storage_type in your schema class
+ $schema->storage_type( '::DBI::Replicated' );
+
+ ## Add some slaves. Basically this is an array of arrayrefs, where each
+ ## arrayref is database connect information
+
+ $schema->storage->connect_replicants(
+ [$dsn1, $user, $pass, \%opts],
+ [$dsn1, $user, $pass, \%opts],
+ [$dsn1, $user, $pass, \%opts],
+ );
+
=head1 DESCRIPTION
-Warning: This class is marked EXPERIMENTAL. It works for the authors but does
-not currently have automated tests so your mileage may vary.
+Warning: This class is marked ALPHA. We are using this in development and have
+some basic test coverage but the code hasn't yet been stressed by a variety
+of databases. Individual DB's may have quirks we are not aware of. Please
+use this in development and pass along your experiences/bug fixes.
This class implements replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
database, all read-type queries (SELECTs) go to the slave database.
-For every slave database you can define a priority value, which controls data
-source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
-sources used (if they have the same priority, the are used randomized), than
-if all low priority data sources fail, higher ones tried in order.
+Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
+handle gets delegated to one of the two attributes: L</read_handler> or to
+L</write_handler>. Additionally, some methods need to be distributed
+to all existing storages. This way our storage class is a drop in replacement
+for L<DBIx::Class::Storage::DBI>.
+
+Read traffic is spread across the replicants (slaves) according to a
+user-selected algorithm. The default algorithm is random weighted.
-=head1 CONFIGURATION
+TODO more details about the algorithm.
-=head2 Limit dialect
+=head1 ATTRIBUTES
-If you use LIMIT in your queries (effectively, if you use
-SQL::Abstract::Limit), do not forget to set up limit_dialect (perldoc
-SQL::Abstract::Limit) by passing it as an option in the (optional) hash
-reference to connect_info. DBIC can not set it up automatically, since it can
-not guess DBD::Multi connection types.
+This class defines the following attributes.
+
+=head2 pool_type
+
+Contains the classname which will instantiate the L</pool> object. Defaults
+to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
=cut
-sub new {
- my $proto = shift;
- my $class = ref( $proto ) || $proto;
- my $self = {};
+has 'pool_type' => (
+ is=>'ro',
+ isa=>'ClassName',
+ required=>1,
+ lazy=>1,
+ default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+ handles=>{
+ 'create_pool' => 'new',
+ },
+);
- bless( $self, $class );
- $self->write_source( DBIx::Class::Storage::DBI->new );
- $self->read_source( DBIx::Class::Storage::DBI->new );
+=head2 balancer_type
- return $self;
-}
+The replication pool requires a balancer class to provide the methods for
+choosing how to spread the query load across each replicant in the pool.
-sub all_sources {
- my $self = shift;
+=cut
- my @sources = ($self->read_source, $self->write_source);
+has 'balancer_type' => (
+ is=>'ro',
+ isa=>'ClassName',
+ required=>1,
+ lazy=>1,
+ default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+ handles=>{
+ 'create_balancer' => 'new',
+ },
+);
- return wantarray ? @sources : \@sources;
-}
-sub connect_info {
- my( $self, $source_info ) = @_;
+=head2 pool
- my( $info, $global_options, $options, @dsns );
+Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
+container class for one or more replicated databases.
- $info = [ @$source_info ];
+=cut
- $global_options = ref $info->[-1] eq 'HASH' ? pop( @$info ) : {};
- if( ref( $options = $info->[0]->[-1] ) eq 'HASH' ) {
- # Local options present in dsn, merge them with global options
- map { $global_options->{$_} = $options->{$_} } keys %$options;
- pop @{$info->[0]};
- }
+has 'pool' => (
+ is=>'ro',
+ isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+ lazy_build=>1,
+ handles=>[qw/
+ connect_replicants
+ replicants
+ has_replicants
+ num_replicants
+ delete_replicant
+ /],
+);
- # We need to copy-pass $global_options, since connect_info clears it while
- # processing options
- $self->write_source->connect_info( @{$info->[0]}, { %$global_options } );
- ## allow either a DSN string or an already connect $dbh. Just remember if
- ## you use the $dbh option then DBD::Multi has no idea how to reconnect in
- ## the event of a failure.
-
- @dsns = map {
- ## if the first element in the arrayhash is a ref, make that the value
- my $db = ref $_->[0] ? $_->[0] : $_;
- ($_->[3]->{priority} || 10) => $db;
- } @{$info->[0]}[1..@{$info->[0]}-1];
-
- $global_options->{dsns} = \@dsns;
+=head2 balancer
- $self->read_source->connect_info( [ 'dbi:Multi:', undef, undef, { %$global_options } ] );
-}
+Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
+is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
+and determines how to spread read requests across the replicants in that pool.
-sub select {
- shift->read_source->select( @_ );
-}
-sub select_single {
- shift->read_source->select_single( @_ );
-}
-sub throw_exception {
- shift->read_source->throw_exception( @_ );
-}
-sub sql_maker {
- shift->read_source->sql_maker( @_ );
-}
-sub columns_info_for {
- shift->read_source->columns_info_for( @_ );
-}
-sub sqlt_type {
- shift->read_source->sqlt_type( @_ );
-}
-sub create_ddl_dir {
- shift->read_source->create_ddl_dir( @_ );
-}
-sub deployment_statements {
- shift->read_source->deployment_statements( @_ );
-}
-sub datetime_parser {
- shift->read_source->datetime_parser( @_ );
-}
-sub datetime_parser_type {
- shift->read_source->datetime_parser_type( @_ );
-}
-sub build_datetime_parser {
- shift->read_source->build_datetime_parser( @_ );
-}
+=cut
-sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
-sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
-sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
+has 'balancer' => (
+ is=>'ro',
+ isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+ lazy_build=>1,
+);
-sub DESTROY {
- my $self = shift;
- undef $self->{write_source};
- undef $self->{read_sources};
-}
+=head2 master
+
+The master defines the canonical state for a pool of connected databases. All
+the replicants are expected to match this database's state. Thus, in a classic
+Master / Slaves distributed system, all the slaves are expected to replicate
+the Master's state as quickly as possible. This is the only database in the
+pool of databases that is allowed to handle write traffic.
+
+=cut
+
+has 'master' => (
+ is=> 'ro',
+ isa=>'DBIx::Class::Storage::DBI',
+ lazy_build=>1,
+);
+
+
+=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
+
+The following attributes are delegated all the methods required for the
+L<DBIx::Class::Storage::DBI> interface.
+
+=head2 read_handler
+
+Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
+
+=cut
-sub last_insert_id {
- shift->write_source->last_insert_id( @_ );
+has 'read_handler' => (
+ is=>'rw',
+ isa=>'Object',
+ lazy_build=>1,
+ handles=>[qw/
+ select
+ select_single
+ columns_info_for
+ /],
+);
+
+
+=head2 write_handler
+
+Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
+
+=cut
+
+has 'write_handler' => (
+ is=>'ro',
+ isa=>'Object',
+  lazy_build=>1,
+ handles=>[qw/
+ on_connect_do
+ on_disconnect_do
+ connect_info
+ throw_exception
+ sql_maker
+ sqlt_type
+ create_ddl_dir
+ deployment_statements
+ datetime_parser
+ datetime_parser_type
+ last_insert_id
+ insert
+ insert_bulk
+ update
+ delete
+ dbh
+ txn_do
+ txn_commit
+ txn_rollback
+ sth
+ deploy
+ schema
+ /],
+);
+
+
+=head1 METHODS
+
+This class defines the following methods.
+
+=head2 new
+
+L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
+first argument. We need to invoke L</new> on the underlying parent class, make
+sure we properly give it a L<Moose> meta class, and then correctly instantiate
+our attributes. Basically we pass on whatever the schema has in its class
+data for 'storage_type_args' to our replicated storage type.
+
+=cut
+
+sub new {
+ my $class = shift @_;
+ my $schema = shift @_;
+ my $storage_type_args = shift @_;
+ my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
+
+ return $class->meta->new_object(
+ __INSTANCE__ => $obj,
+ %$storage_type_args,
+ @_,
+ );
}
-sub insert {
- shift->write_source->insert( @_ );
+
+=head2 _build_master
+
+Lazy builder for the L</master> attribute.
+
+=cut
+
+sub _build_master {
+ DBIx::Class::Storage::DBI->new;
}
-sub update {
- shift->write_source->update( @_ );
+
+=head2 _build_pool
+
+Lazy builder for the L</pool> attribute.
+
+=cut
+
+sub _build_pool {
+ shift->create_pool;
}
-sub update_all {
- shift->write_source->update_all( @_ );
+
+=head2 _build_balancer
+
+Lazy builder for the L</balancer> attribute. This takes a Pool object so that
+the balancer knows which pool it's balancing.
+
+=cut
+
+sub _build_balancer {
+ my $self = shift @_;
+ $self->create_balancer(pool=>$self->pool);
}
-sub delete {
- shift->write_source->delete( @_ );
+
+=head2 _build_write_handler
+
+Lazy builder for the L</write_handler> attribute. The default is to set this to
+the L</master>.
+
+=cut
+
+sub _build_write_handler {
+ return shift->master;
}
-sub delete_all {
- shift->write_source->delete_all( @_ );
+
+=head2 _build_read_handler
+
+Lazy builder for the L</read_handler> attribute. The default is to set this to
+the L</balancer>.
+
+=cut
+
+sub _build_read_handler {
+ return shift->balancer;
}
-sub create {
- shift->write_source->create( @_ );
+
+=head2 around: connect_replicants
+
+All calls to connect_replicants need to have an existing $schema tacked onto
+the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
+
+=cut
+
+around 'connect_replicants' => sub {
+ my ($method, $self, @args) = @_;
+ $self->$method($self->schema, @args);
+};
+
+=head2 all_storages
+
+Returns an array of all the connected storage backends. The first element
+in the returned array is the master, and the rest are each of the
+replicants.
+
+=cut
+
+sub all_storages {
+ my $self = shift @_;
+
+ return grep {defined $_ && blessed $_} (
+ $self->master,
+ $self->replicants,
+ );
}
-sub find_or_create {
- shift->write_source->find_or_create( @_ );
+
+=head2 set_reliable_storage
+
+Sets the current $schema to be 'reliable', that is all queries, both read and
+write are sent to the master
+
+=cut
+
+sub set_reliable_storage {
+ my $self = shift @_;
+ my $schema = $self->schema;
+ my $write_handler = $self->schema->storage->write_handler;
+
+ $schema->storage->read_handler($write_handler);
}
-sub update_or_create {
- shift->write_source->update_or_create( @_ );
+
+=head2 set_balanced_storage
+
+Sets the current $schema to use the L</balancer> for all reads, while all
+writes are sent to the master only
+
+=cut
+
+sub set_balanced_storage {
+ my $self = shift @_;
+ my $schema = $self->schema;
+ my $write_handler = $self->schema->storage->balancer;
+
+ $schema->storage->read_handler($write_handler);
}
+
+=head2 connected
+
+Check that the master and at least one of the replicants is connected.
+
+=cut
+
sub connected {
- shift->write_source->connected( @_ );
+ my $self = shift @_;
+
+ return
+ $self->master->connected &&
+ $self->pool->connected_replicants;
}
+
+=head2 ensure_connected
+
+Make sure all the storages are connected.
+
+=cut
+
sub ensure_connected {
- shift->write_source->ensure_connected( @_ );
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->ensure_connected(@_);
+ }
}
-sub dbh {
- shift->write_source->dbh( @_ );
+
+=head2 limit_dialect
+
+Set the limit_dialect for all existing storages
+
+=cut
+
+sub limit_dialect {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->limit_dialect(@_);
+ }
+}
+
+=head2 quote_char
+
+Set the quote_char for all existing storages
+
+=cut
+
+sub quote_char {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->quote_char(@_);
+ }
}
-sub txn_begin {
- shift->write_source->txn_begin( @_ );
+
+=head2 name_sep
+
+Set the name_sep for all existing storages
+
+=cut
+
+sub name_sep {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->name_sep(@_);
+ }
}
-sub txn_commit {
- shift->write_source->txn_commit( @_ );
+
+=head2 set_schema
+
+Set the schema object for all existing storages
+
+=cut
+
+sub set_schema {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->set_schema(@_);
+ }
}
-sub txn_rollback {
- shift->write_source->txn_rollback( @_ );
+
+=head2 debug
+
+set a debug flag across all storages
+
+=cut
+
+sub debug {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->debug(@_);
+ }
}
-sub sth {
- shift->write_source->sth( @_ );
+
+=head2 debugobj
+
+set a debug object across all storages
+
+=cut
+
+sub debugobj {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->debugobj(@_);
+ }
}
-sub deploy {
- shift->write_source->deploy( @_ );
+
+=head2 debugfh
+
+set a debugfh object across all storages
+
+=cut
+
+sub debugfh {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->debugfh(@_);
+ }
}
+=head2 debugcb
-sub debugfh { shift->_not_supported( 'debugfh' ) };
-sub debugcb { shift->_not_supported( 'debugcb' ) };
+set a debug callback across all storages
-sub _not_supported {
- my( $self, $method ) = @_;
+=cut
- die "This Storage does not support $method method.";
+sub debugcb {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->debugcb(@_);
+ }
}
-=head1 SEE ALSO
+=head2 disconnect
+
+disconnect everything
-L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
+=cut
+
+sub disconnect {
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->disconnect(@_);
+ }
+}
=head1 AUTHOR
Peter Siklósi <einon@einon.hu>
+John Napiorkowski <john.napiorkowski@takkle.com>
+
=head1 LICENSE
You may distribute this code under the same terms as Perl itself.