X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=dbsrgits%2FDBIx-Class.git;a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FStorage%2FDBI%2FReplicated.pm;h=6408cb4fd2f3ad7b6be77da13941c8c69b428f91;hp=6ec74e99b77e483314a2062f32fde2d71ee7d02e;hb=106d5f3b7e03a68fb2e125772e3f06a34115f22d;hpb=434ffe5f4cd4d2bfb6cadb6c6100096ecdadbb60

diff --git a/lib/DBIx/Class/Storage/DBI/Replicated.pm b/lib/DBIx/Class/Storage/DBI/Replicated.pm
index 6ec74e9..6408cb4 100644
--- a/lib/DBIx/Class/Storage/DBI/Replicated.pm
+++ b/lib/DBIx/Class/Storage/DBI/Replicated.pm
@@ -1,225 +1,514 @@
-package DBIx::Class::Storage::DBI::Replication;
-
-use strict;
-use warnings;
+package DBIx::Class::Storage::DBI::Replicated;
+use Moose;
 use DBIx::Class::Storage::DBI;
-use DBD::Multi;
-use base qw/Class::Accessor::Fast/;
+use DBIx::Class::Storage::DBI::Replicated::Pool;
+use DBIx::Class::Storage::DBI::Replicated::Balancer;
+use Scalar::Util qw(blessed);

-__PACKAGE__->mk_accessors( qw/read_source write_source/ );
+extends 'DBIx::Class::Storage::DBI', 'Moose::Object';

 =head1 NAME

-DBIx::Class::Storage::DBI::Replication - EXPERIMENTAL Replicated database support
+DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support

 =head1 SYNOPSIS

-  # change storage_type in your schema class
-  $schema->storage_type( '::DBI::Replication' );
-  $schema->connect_info( [
-    [ "dbi:mysql:database=test;hostname=master", "username", "password", { AutoCommit => 1 } ], # master
-    [ "dbi:mysql:database=test;hostname=slave1", "username", "password", { priority => 10 } ], # slave1
-    [ "dbi:mysql:database=test;hostname=slave2", "username", "password", { priority => 10 } ], # slave2
-    [ $dbh, '','', {priority=>10}], # add in a preexisting database handle
-    [ sub { DBI->connect }, '', '', {priority=>10}], # DBD::Multi will call this coderef for connects
-    <...>,
-    { limit_dialect => 'LimitXY' } # If needed, see below
-  ] );
+The following example shows how to change an existing $schema to a replicated
+storage type, add some replicated (read-only) databases, and perform reporting
+tasks.
+
+  ## Change storage_type in your schema class
+  $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
+
+  ## Add some slaves.  Basically this is an array of arrayrefs, where each
+  ## arrayref is database connect information.
+
+  $schema->storage->connect_replicants(
+    [$dsn1, $user, $pass, \%opts],
+    [$dsn2, $user, $pass, \%opts],
+    [$dsn3, $user, $pass, \%opts],
+  );
+
 =head1 DESCRIPTION

-Warning: This class is marked EXPERIMENTAL. It works for the authors but does
-not currently have automated tests so your mileage may vary.
+Warning: This class is marked ALPHA.  We are using this in development and have
+some basic test coverage, but the code hasn't yet been stressed by a variety
+of databases.  Individual DBs may have quirks we are not aware of.  Please
+use this in development and pass along your experiences/bug fixes.

 This class implements a replicated data store for DBI.  Currently you can define
 one master and numerous slave database connections.  All write-type queries
 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
 database; all read-type queries (SELECTs) go to the slave database.

-For every slave database you can define a priority value, which controls data
-source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
-sources used (if they have the same priority, the are used randomized), than
-if all low priority data sources fail, higher ones tried in order.
+Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
+handle gets delegated to one of the two attributes: L</read_handler> or to
+L</write_handler>.  Additionally, some methods need to be distributed
+to all existing storages.  This way our storage class is a drop-in replacement
+for L<DBIx::Class::Storage::DBI>.
+
+Read traffic is spread across the replicants (slaves) according to a user
+selected algorithm.  The default algorithm is random weighted.
+
+TODO more details about the algorithm.

-=head1 CONFIGURATION
+=head1 ATTRIBUTES

-=head2 Limit dialect
+This class defines the following attributes.

-If you use LIMIT in your queries (effectively, if you use
-SQL::Abstract::Limit), do not forget to set up limit_dialect (perldoc
-SQL::Abstract::Limit) by passing it as an option in the (optional) hash
-reference to connect_info. DBIC can not set it up automatically, since it can
-not guess DBD::Multi connection types.
+=head2 pool_type
+
+Contains the classname which will instantiate the L</pool> object.  Defaults
+to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.

 =cut

-sub new {
-  my $proto = shift;
-  my $class = ref( $proto ) || $proto;
-  my $self = {};
+has 'pool_type' => (
+  is=>'ro',
+  isa=>'ClassName',
+  lazy_build=>1,
+  handles=>{
+    'create_pool' => 'new',
+  },
+);

-  bless( $self, $class );
+=head2 balancer_type

-  $self->write_source( DBIx::Class::Storage::DBI->new );
-  $self->read_source( DBIx::Class::Storage::DBI->new );
+The replication pool requires a balancer class to provide the methods for
+choosing how to spread the query load across each replicant in the pool.

-  return $self;
-}
+=cut

-sub all_sources {
-  my $self = shift;
+has 'balancer_type' => (
+  is=>'ro',
+  isa=>'ClassName',
+  lazy_build=>1,
+  handles=>{
+    'create_balancer' => 'new',
+  },
+);

-  my @sources = ($self->read_source, $self->write_source);
+=head2 pool

-  return wantarray ? @sources : \@sources;
-}
+Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
+container class for one or more replicated databases.

-sub connect_info {
-  my( $self, $source_info ) = @_;
+=cut

-  my( $info, $global_options, $options, @dsns );
+has 'pool' => (
+  is=>'ro',
+  isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+  lazy_build=>1,
+  handles=>[qw/
+    connect_replicants
+    replicants
+    has_replicants
+    num_replicants
+    delete_replicant
+  /],
+);

-  $info = [ @$source_info ];
+=head2 balancer

-  $global_options = ref $info->[-1] eq 'HASH' ? pop( @$info ) : {};
-  if( ref( $options = $info->[0]->[-1] ) eq 'HASH' ) {
-    # Local options present in dsn, merge them with global options
-    map { $global_options->{$_} = $options->{$_} } keys %$options;
-    pop @{$info->[0]};
-  }
+Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
+is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>).

-  # We need to copy-pass $global_options, since connect_info clears it while
-  # processing options
-  $self->write_source->connect_info( @{$info->[0]}, { %$global_options } );
+=cut

-  ## allow either a DSN string or an already connect $dbh. Just remember if
-  ## you use the $dbh option then DBD::Multi has no idea how to reconnect in
-  ## the event of a failure.
-
-  @dsns = map {
-    ## if the first element in the arrayhash is a ref, make that the value
-    my $db = ref $_->[0] ? $_->[0] : $_;
-    ($_->[3]->{priority} || 10) => $db;
-  } @{$info->[0]}[1..@{$info->[0]}-1];
-
-  $global_options->{dsns} = \@dsns;
+has 'balancer' => (
+  is=>'ro',
+  isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+  lazy_build=>1,
+);

-  $self->read_source->connect_info( [ 'dbi:Multi:', undef, undef, { %$global_options } ] );
-}
+=head2 master

-sub select {
-  shift->read_source->select( @_ );
-}
-sub select_single {
-  shift->read_source->select_single( @_ );
-}
-sub throw_exception {
-  shift->read_source->throw_exception( @_ );
-}
-sub sql_maker {
-  shift->read_source->sql_maker( @_ );
-}
-sub columns_info_for {
-  shift->read_source->columns_info_for( @_ );
-}
-sub sqlt_type {
-  shift->read_source->sqlt_type( @_ );
-}
-sub create_ddl_dir {
-  shift->read_source->create_ddl_dir( @_ );
-}
-sub deployment_statements {
-  shift->read_source->deployment_statements( @_ );
-}
-sub datetime_parser {
-  shift->read_source->datetime_parser( @_ );
-}
-sub datetime_parser_type {
-  shift->read_source->datetime_parser_type( @_ );
-}
-sub build_datetime_parser {
-  shift->read_source->build_datetime_parser( @_ );
+The master defines the canonical state for a pool of connected databases.  All
+the replicants are expected to match this database's state.  Thus, in a classic
+Master / Slaves distributed system, all the slaves are expected to replicate
+the Master's state as quickly as possible.  This is the only database in the
+pool of databases that is allowed to handle write traffic.
+
+=cut
+
+has 'master' => (
+  is=>'ro',
+  isa=>'DBIx::Class::Storage::DBI',
+  lazy_build=>1,
+);
+
+=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
+
+The following attributes delegate all the methods required for the
+L<DBIx::Class::Storage::DBI> interface.
+
+=head2 read_handler
+
+Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
+
+=cut
+
+has 'read_handler' => (
+  is=>'rw',
+  isa=>'Object',
+  lazy_build=>1,
+  handles=>[qw/
+    select
+    select_single
+    columns_info_for
+  /],
+);
+
+=head2 write_handler
+
+Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
+
+=cut
+
+has 'write_handler' => (
+  is=>'ro',
+  isa=>'Object',
+  lazy_build=>1,
+  handles=>[qw/
+    on_connect_do
+    on_disconnect_do
+    connect_info
+    throw_exception
+    sql_maker
+    sqlt_type
+    create_ddl_dir
+    deployment_statements
+    datetime_parser
+    datetime_parser_type
+    last_insert_id
+    insert
+    insert_bulk
+    update
+    delete
+    dbh
+    txn_do
+    txn_commit
+    txn_rollback
+    sth
+    deploy
+    schema
+  /],
+);
+
+=head1 METHODS
+
+This class defines the following methods.
+
+=head2 new
+
+L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
+first argument.  We need to invoke C<new> on the underlying parent class, make
+sure we properly give it a L<Moose> meta class, and then correctly instantiate
+our attributes.  Basically we pass on whatever the schema has in its class
+data for 'storage_type_args' to our replicated storage type.
+
+=cut
+
+sub new {
+  my $class = shift @_;
+  my $schema = shift @_;
+  my $storage_type_args = shift @_;
+  my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
+
+  ## Hate to do it this way, but can't seem to get advice on the attribute working right;
+  ## maybe we can do a type and coercion for it.
+  if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
+    $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
+    eval "require $storage_type_args->{balancer_type}";
+  }
+
+  return $class->meta->new_object(
+    __INSTANCE__ => $obj,
+    %$storage_type_args,
+    @_,
+  );
 }

-sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
-sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
-sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
+=head2 _build_master
+
+Lazy builder for the L</master> attribute.

-sub DESTROY {
-  my $self = shift;
+=cut

-  undef $self->{write_source};
-  undef $self->{read_sources};
+sub _build_master {
+  DBIx::Class::Storage::DBI->new;
 }

-sub last_insert_id {
-  shift->write_source->last_insert_id( @_ );
+=head2 _build_pool_type
+
+Lazy builder for the L</pool_type> attribute.
+
+=cut
+
+sub _build_pool_type {
+  return 'DBIx::Class::Storage::DBI::Replicated::Pool';
 }

-sub insert {
-  shift->write_source->insert( @_ );
+
+=head2 _build_pool
+
+Lazy builder for the L</pool> attribute.
+
+=cut
+
+sub _build_pool {
+  shift->create_pool;
 }

-sub update {
-  shift->write_source->update( @_ );
+
+=head2 _build_balancer_type
+
+Lazy builder for the L</balancer_type> attribute.
+
+=cut
+
+sub _build_balancer_type {
+  return 'DBIx::Class::Storage::DBI::Replicated::Balancer';
 }

-sub update_all {
-  shift->write_source->update_all( @_ );
+
+=head2 _build_balancer
+
+Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
+the balancer knows which pool it's balancing.
+
+=cut
+
+sub _build_balancer {
+  my $self = shift @_;
+  $self->create_balancer(
+    pool=>$self->pool,
+    master=>$self->master);
 }

-sub delete {
-  shift->write_source->delete( @_ );
+
+=head2 _build_write_handler
+
+Lazy builder for the L</write_handler> attribute.  The default is to set this to
+the L</master>.
+
+=cut
+
+sub _build_write_handler {
+  return shift->master;
 }

-sub delete_all {
-  shift->write_source->delete_all( @_ );
+
+=head2 _build_read_handler
+
+Lazy builder for the L</read_handler> attribute.  The default is to set this to
+the L</balancer>.
+
+=cut
+
+sub _build_read_handler {
+  return shift->balancer;
 }

-sub create {
-  shift->write_source->create( @_ );
+
+=head2 around: connect_replicants
+
+All calls to connect_replicants need to have an existing $schema tacked onto
+the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
+
+=cut
+
+around 'connect_replicants' => sub {
+  my ($method, $self, @args) = @_;
+  $self->$method($self->schema, @args);
+};
+
+=head2 all_storages
+
+Returns an array of all the connected storage backends.  The first element
+in the returned array is the master, and the remaining elements are each of the
+replicants.
+
+=cut
+
+sub all_storages {
+  my $self = shift @_;
+
+  return grep {defined $_ && blessed $_} (
+    $self->master,
+    $self->replicants,
+  );
 }

-sub find_or_create {
-  shift->write_source->find_or_create( @_ );
+
+=head2 set_reliable_storage
+
+Sets the current $schema to be 'reliable', that is, all queries, both read and
+write, are sent to the master.
+
+=cut
+
+sub set_reliable_storage {
+  my $self = shift @_;
+  my $schema = $self->schema;
+  my $write_handler = $self->schema->storage->write_handler;
+
+  $schema->storage->read_handler($write_handler);
 }

-sub update_or_create {
-  shift->write_source->update_or_create( @_ );
+
+=head2 set_balanced_storage
+
+Sets the current $schema to use the L</balancer> for all reads, while all
+writes are sent to the master only.
+
+=cut
+
+sub set_balanced_storage {
+  my $self = shift @_;
+  my $schema = $self->schema;
+  my $write_handler = $self->schema->storage->balancer;
+
+  $schema->storage->read_handler($write_handler);
 }
+
+=head2 connected
+
+Check that the master and at least one of the replicants are connected.
+
+=cut
+
 sub connected {
-  shift->write_source->connected( @_ );
+  my $self = shift @_;
+
+  return
+    $self->master->connected &&
+    $self->pool->connected_replicants;
 }
+
+=head2 ensure_connected
+
+Make sure all the storages are connected.
+
+=cut
+
 sub ensure_connected {
-  shift->write_source->ensure_connected( @_ );
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->ensure_connected(@_);
+  }
+}
+
+=head2 limit_dialect
+
+Set the limit_dialect for all existing storages.
+
+=cut
+
+sub limit_dialect {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->limit_dialect(@_);
+  }
 }

-sub dbh {
-  shift->write_source->dbh( @_ );
+
+=head2 quote_char
+
+Set the quote_char for all existing storages.
+
+=cut
+
+sub quote_char {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->quote_char(@_);
+  }
 }

-sub txn_begin {
-  shift->write_source->txn_begin( @_ );
+
+=head2 name_sep
+
+Set the name_sep for all existing storages.
+
+=cut
+
+sub name_sep {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->name_sep(@_);
+  }
 }

-sub txn_commit {
-  shift->write_source->txn_commit( @_ );
+
+=head2 set_schema
+
+Set the schema object for all existing storages.
+
+=cut
+
+sub set_schema {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->set_schema(@_);
+  }
 }

-sub txn_rollback {
-  shift->write_source->txn_rollback( @_ );
+
+=head2 debug
+
+Set a debug flag across all storages.
+
+=cut
+
+sub debug {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->debug(@_);
+  }
 }

-sub sth {
-  shift->write_source->sth( @_ );
+
+=head2 debugobj
+
+Set a debug object across all storages.
+
+=cut
+
+sub debugobj {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->debugobj(@_);
+  }
 }

-sub deploy {
-  shift->write_source->deploy( @_ );
+
+=head2 debugfh
+
+Set a debugfh object across all storages.
+
+=cut
+
+sub debugfh {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->debugfh(@_);
+  }
 }

+=head2 debugcb

-sub debugfh { shift->_not_supported( 'debugfh' ) };
-sub debugcb { shift->_not_supported( 'debugcb' ) };
+Set a debug callback across all storages.

-sub _not_supported {
-  my( $self, $method ) = @_;
+=cut

-  die "This Storage does not support $method method.";
+sub debugcb {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->debugcb(@_);
+  }
 }

-=head1 SEE ALSO
+=head2 disconnect
+
+Disconnect everything.

-L, L, L
+=cut
+
+sub disconnect {
+  my $self = shift @_;
+  foreach my $source ($self->all_storages) {
+    $source->disconnect(@_);
+  }
+}

 =head1 AUTHOR

@@ -227,6 +516,8 @@
 Norbert Csongrádi

 Peter Siklósi

+John Napiorkowski
+
 =head1 LICENSE

 You may distribute this code under the same terms as Perl itself.
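
A minimal usage sketch of the interface introduced by this diff, for orientation only: it assumes a hypothetical My::Schema class with an Artist result source, plus placeholder DSNs, credentials and connect options ($master_dsn, $slave1_dsn, $slave2_dsn, $user, $pass, %opts). Only calls documented in the SYNOPSIS and METHODS sections above are used.

  ## Hypothetical schema class; DSNs and credentials are placeholders
  use My::Schema;

  ## Request the replicated storage with a random-weighted balancer
  My::Schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );

  ## Connect the master first, then attach the read-only replicants
  my $schema = My::Schema->connect($master_dsn, $user, $pass);
  $schema->storage->connect_replicants(
    [$slave1_dsn, $user, $pass, \%opts],
    [$slave2_dsn, $user, $pass, \%opts],
  );

  ## Reads are balanced across the replicants; writes always go to the master
  my @artists = $schema->resultset('Artist')->all;
  $schema->resultset('Artist')->create({ name => 'New Artist' });

  ## When replication lag would be a problem, force reads to the master,
  ## then switch back to balanced reads afterwards
  $schema->storage->set_reliable_storage;
  my $count = $schema->resultset('Artist')->search({ name => 'New Artist' })->count;
  $schema->storage->set_balanced_storage;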