X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FStorage%2FDBI%2FReplicated.pm;h=8ae3cf4e67051ca9c2962f87fcda8de81dbc34fc;hb=2ed909315eef3db2b248df574abd644a1ecfd0f3;hp=d736c41a6cff4457f88b7df013c16aa5eb22645c;hpb=2156bbddf88e67ebe429789d0018c708cddfcbe4;p=dbsrgits%2FDBIx-Class.git diff --git a/lib/DBIx/Class/Storage/DBI/Replicated.pm b/lib/DBIx/Class/Storage/DBI/Replicated.pm index d736c41..8ae3cf4 100644 --- a/lib/DBIx/Class/Storage/DBI/Replicated.pm +++ b/lib/DBIx/Class/Storage/DBI/Replicated.pm @@ -1,5 +1,509 @@ package DBIx::Class::Storage::DBI::Replicated; +use Moose; +use DBIx::Class::Storage::DBI; +use DBIx::Class::Storage::DBI::Replicated::Pool; +use DBIx::Class::Storage::DBI::Replicated::Balancer; +use Scalar::Util qw(blessed); + +extends 'DBIx::Class::Storage::DBI', 'Moose::Object'; + +=head1 NAME + +DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support + +=head1 SYNOPSIS + +The Following example shows how to change an existing $schema to a replicated +storage type, add some replicated (readonly) databases, and perform reporting +tasks + + ## Change storage_type in your schema class + $schema->storage_type( '::DBI::Replicated' ); + + ## Add some slaves. Basically this is an array of arrayrefs, where each + ## arrayref is database connect information + + $schema->storage->create_replicants( + [$dsn1, $user, $pass, \%opts], + [$dsn1, $user, $pass, \%opts], + [$dsn1, $user, $pass, \%opts], + ## This is just going to use the standard DBIC connect method, so it + ## supports everything that method supports, such as connecting to an + ## existing database handle. + [$dbh], + ); + + +=head1 DESCRIPTION + +Warning: This class is marked ALPHA. We are using this in development and have +some basic test coverage but the code hasn't yet been stressed by a variety +of databases. Individual DB's may have quirks we are not aware of. Please +use this in development and pass along your experiences/bug fixes. + +This class implements replicated data store for DBI. Currently you can define +one master and numerous slave database connections. All write-type queries +(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master +database, all read-type queries (SELECTs) go to the slave database. + +Basically, any method request that L would normally +handle gets delegated to one of the two attributes: L or to +L. Additionally, some methods need to be distributed +to all existing storages. This way our storage class is a drop in replacement +for L. + +Read traffic is spread across the replicants (slaves) occuring to a user +selected algorithm. The default algorithm is random weighted. + +TODO more details about the algorithm. + +=head1 ATTRIBUTES + +This class defines the following attributes. + +=head2 master + +The master defines the canonical state for a pool of connected databases. All +the replicants are expected to match this databases state. Thus, in a classic +Master / Slaves distributed system, all the slaves are expected to replicate +the Master's state as quick as possible. This is the only database in the +pool of databases that is allowed to handle write traffic. + +=cut + +has 'master' => ( + is=> 'ro', + isa=>'DBIx::Class::Storage::DBI', + lazy_build=>1, + handles=>[qw/ + on_connect_do + on_disconnect_do + connect_info + throw_exception + sql_maker + sqlt_type + create_ddl_dir + deployment_statements + datetime_parser + datetime_parser_type + last_insert_id + insert + insert_bulk + update + delete + dbh + txn_do + txn_commit + txn_rollback + sth + deploy + schema + /], +); + + +=head2 current_replicant + +Replicant storages (slaves) handle all read only traffic. The assumption is +that your database will become readbound well before it becomes write bound +and that being able to spread your read only traffic around to multiple +databases is going to help you to scale traffic. + +This attribute returns the next slave to handle a read request. Your L +attribute has methods to help you shuffle through all the available replicants +via it's balancer object. + +This attribute defines the following reader/writer methods + +=over 4 + +=item get_current_replicant + +Returns the contained L replicant + +=item set_current_replicant + +Set the attribute to a given L (or subclass) object. + +=back + +We split the reader/writer to make it easier to selectively override how the +replicant is altered. + +=cut + +has 'current_replicant' => ( + is=> 'rw', + reader=>'get_current_replicant', + writer=>'set_current_replicant', + isa=>'DBIx::Class::Storage::DBI', + lazy_build=>1, + handles=>[qw/ + select + select_single + columns_info_for + /], +); + + +=head2 pool_type + +Contains the classname which will instantiate the L object. Defaults +to: L. + +=cut + +has 'pool_type' => ( + is=>'ro', + isa=>'ClassName', + required=>1, + lazy=>1, + default=>'DBIx::Class::Storage::DBI::Replicated::Pool', + handles=>{ + 'create_pool' => 'new', + }, +); + + +=head2 balancer_type + +The replication pool requires a balance class to provider the methods for +choose how to spread the query load across each replicant in the pool. + +=cut + +has 'balancer_type' => ( + is=>'ro', + isa=>'ClassName', + required=>1, + lazy=>1, + default=>'DBIx::Class::Storage::DBI::Replicated::Balancer', + handles=>{ + 'create_balancer' => 'new', + }, +); + + +=head2 pool + +Is a or derived class. This is a +container class for one or more replicated databases. + +=cut + +has 'pool' => ( + is=>'ro', + isa=>'DBIx::Class::Storage::DBI::Replicated::Pool', + lazy_build=>1, + handles=>[qw/ + replicants + has_replicants + create_replicants + num_replicants + delete_replicant + /], +); + + +=head2 balancer + +Is a or derived class. This +is a class that takes a pool () + +=cut + +has 'balancer' => ( + is=>'ro', + isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer', + lazy_build=>1, + handles=>[qw/next_storage/], +); + +=head1 METHODS + +This class defines the following methods. + +=head2 _build_master + +Lazy builder for the L attribute. + +=cut + +sub _build_master { + DBIx::Class::Storage::DBI->new; +} + + +=head2 _build_current_replicant + +Lazy builder for the L attribute. + +=cut + +sub _build_current_replicant { + my $self = shift @_; + $self->next_storage($self->pool); +} + + +=head2 _build_pool + +Lazy builder for the L attribute. + +=cut + +sub _build_pool { + my $self = shift @_; + $self->create_pool; +} + + +=head2 _build_balancer + +Lazy builder for the L attribute. + +=cut + +sub _build_balancer { + my $self = shift @_; + $self->create_balancer; +} + + +=head2 around: create_replicants + +All calls to create_replicants needs to have an existing $schema tacked onto +top of the args + +=cut + +around 'create_replicants' => sub { + my ($method, $self, @args) = @_; + $self->$method($self->schema, @args); +}; + + +=head2 after: get_current_replicant_storage + +Advice on the current_replicant_storage attribute. Each time we use a replicant +we need to change it via the storage pool algorithm. That way we are spreading +the load evenly (hopefully) across existing capacity. + +=cut + +after 'current_replicant' => sub { + my $self = shift @_; + my $next_replicant = $self->next_storage($self->pool); + +warn '......................'; + $self->set_current_replicant($next_replicant); +}; + + +=head2 all_storages + +Returns an array of of all the connected storage backends. The first element +in the returned array is the master, and the remainings are each of the +replicants. + +=cut + +sub all_storages { + my $self = shift @_; + + return grep {defined $_ && blessed $_} ( + $self->master, + $self->replicants, + ); +} + + +=head2 connected + +Check that the master and at least one of the replicants is connected. + +=cut + +sub connected { + my $self = shift @_; + + return + $self->master->connected && + $self->pool->connected_replicants; +} + + +=head2 ensure_connected + +Make sure all the storages are connected. + +=cut + +sub ensure_connected { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->ensure_connected(@_); + } +} + + +=head2 limit_dialect + +Set the limit_dialect for all existing storages + +=cut + +sub limit_dialect { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->limit_dialect(@_); + } +} + + +=head2 quote_char + +Set the quote_char for all existing storages + +=cut + +sub quote_char { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->quote_char(@_); + } +} + + +=head2 name_sep + +Set the name_sep for all existing storages + +=cut + +sub name_sep { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->name_sep(@_); + } +} + + +=head2 set_schema + +Set the schema object for all existing storages + +=cut + +sub set_schema { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->set_schema(@_); + } +} + + +=head2 debug + +set a debug flag across all storages + +=cut + +sub debug { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->debug(@_); + } +} + + +=head2 debugobj + +set a debug object across all storages + +=cut + +sub debugobj { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->debugobj(@_); + } +} + + +=head2 debugfh + +set a debugfh object across all storages + +=cut + +sub debugfh { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->debugfh(@_); + } +} + + +=head2 debugcb + +set a debug callback across all storages + +=cut + +sub debugcb { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->debugcb(@_); + } +} + + +=head2 disconnect + +disconnect everything + +=cut + +sub disconnect { + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->disconnect(@_); + } +} + + +=head2 DESTROY + +Make sure we pass destroy events down to the storage handlers + +=cut + +sub DESTROY { + my $self = shift; + ## TODO, maybe we can just leave this alone ??? +} + + +=head1 AUTHOR + +Norbert Csongrádi + +Peter Siklósi + +John Napiorkowski + +=head1 LICENSE + +You may distribute this code under the same terms as Perl itself. + +=cut + +1; + +__END__ + use strict; use warnings;