X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=dbsrgits%2FDBIx-Class.git;a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FStorage%2FDBI%2FReplicated.pm;h=9e3f59ccc3bbec0afbe9a1637eade547770a3fde;hp=7dd1d0d98bcf94511593cdb32d8eb75ef41c0dcc;hb=cea43436e10983c218ded47e1561183096685f9b;hpb=150bda3538a5a7e52fff51b70d3cde283b35a7ef diff --git a/lib/DBIx/Class/Storage/DBI/Replicated.pm b/lib/DBIx/Class/Storage/DBI/Replicated.pm index 7dd1d0d..9e3f59c 100644 --- a/lib/DBIx/Class/Storage/DBI/Replicated.pm +++ b/lib/DBIx/Class/Storage/DBI/Replicated.pm @@ -1,15 +1,44 @@ package DBIx::Class::Storage::DBI::Replicated; +BEGIN { + use Carp::Clan qw/^DBIx::Class/; + + ## Modules required for Replication support not required for general DBIC + ## use, so we explicitly test for these. + + my %replication_required = ( + 'Moose' => '0.90', + 'MooseX::Types' => '0.21', + 'namespace::clean' => '0.11', + 'Hash::Merge' => '0.11' + ); + + my @didnt_load; + + for my $module (keys %replication_required) { + eval "use $module $replication_required{$module}"; + push @didnt_load, "$module $replication_required{$module}" + if $@; + } + + croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication") + if @didnt_load; +} + use Moose; -use Class::MOP; -use Moose::Util::TypeConstraints; use DBIx::Class::Storage::DBI; use DBIx::Class::Storage::DBI::Replicated::Pool; use DBIx::Class::Storage::DBI::Replicated::Balancer; +use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/; +use MooseX::Types::Moose qw/ClassName HashRef Object/; +use Scalar::Util 'reftype'; +use Hash::Merge 'merge'; + +use namespace::clean -except => 'meta'; =head1 NAME -DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support +DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support =head1 SYNOPSIS @@ -17,37 +46,51 @@ The Following example shows how to change an existing $schema to a replicated storage type, add some replicated (readonly) databases, and perform reporting tasks. - ## Change storage_type in your schema class +You should set the 'storage_type attribute to a replicated type. You should +also define your arguments, such as which balancer you want and any arguments +that the Pool object should get. + + my $schema = Schema::Class->clone; $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] ); - - ## Add some slaves. Basically this is an array of arrayrefs, where each - ## arrayref is database connect information - + $schema->connection(...); + +Next, you need to add in the Replicants. Basically this is an array of +arrayrefs, where each arrayref is database connect information. Think of these +arguments as what you'd pass to the 'normal' $schema->connect method. + $schema->storage->connect_replicants( [$dsn1, $user, $pass, \%opts], [$dsn2, $user, $pass, \%opts], [$dsn3, $user, $pass, \%opts], ); - - ## Now, just use the $schema as normal + +Now, just use the $schema as you normally would. Automatically all reads will +be delegated to the replicants, while writes to the master. + $schema->resultset('Source')->search({name=>'etc'}); - - ## You can force a given query to use a particular storage using the search - ### attribute 'force_pool'. For example: - + +You can force a given query to use a particular storage using the search +attribute 'force_pool'. For example: + my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'}); - - ## Now $RS will force everything (both reads and writes) to use whatever was - ## setup as the master storage. 'master' is hardcoded to always point to the - ## Master, but you can also use any Replicant name. Please see: - ## L and the replicants attribute for - ## More. Also see transactions and L for alternative ways - ## to force read traffic to the master. - + +Now $RS will force everything (both reads and writes) to use whatever was setup +as the master storage. 'master' is hardcoded to always point to the Master, +but you can also use any Replicant name. Please see: +L and the replicants attribute for more. + +Also see transactions and L for alternative ways to +force read traffic to the master. In general, you should wrap your statements +in a transaction when you are reading and writing to the same tables at the +same time, since your replicants will often lag a bit behind the master. + +See L for more help and +walkthroughs. + =head1 DESCRIPTION Warning: This class is marked BETA. This has been running a production -website using MySQL native replication as it's backend and we have some decent +website using MySQL native replication as its backend and we have some decent test coverage but the code hasn't yet been stressed by a variety of databases. Individual DB's may have quirks we are not aware of. Please use this in first development and pass along your experiences/bug fixes. @@ -69,9 +112,21 @@ selected algorithm. The default algorithm is random weighted. =head1 NOTES The consistancy betweeen master and replicants is database specific. The Pool -gives you a method to validate it's replicants, removing and replacing them +gives you a method to validate its replicants, removing and replacing them when they fail/pass predefined criteria. Please make careful use of the ways -to force a query to run against Master when needed. +to force a query to run against Master when needed. + +=head1 REQUIREMENTS + +Replicated Storage has additional requirements not currently part of L + + Moose => '0.90', + MooseX::Types => '0.21', + namespace::clean => '0.11', + Hash::Merge => '0.11' + +You will need to install these modules manually via CPAN or make them part of the +Makefile for your distribution. =head1 ATTRIBUTES @@ -85,7 +140,7 @@ The underlying L object this storage is attaching has 'schema' => ( is=>'rw', - isa=>'DBIx::Class::Schema', + isa=>DBICSchema, weak_ref=>1, required=>1, ); @@ -98,9 +153,8 @@ to: L. =cut has 'pool_type' => ( - is=>'ro', - isa=>'ClassName', - required=>1, + is=>'rw', + isa=>ClassName, default=>'DBIx::Class::Storage::DBI::Replicated::Pool', handles=>{ 'create_pool' => 'new', @@ -110,15 +164,14 @@ has 'pool_type' => ( =head2 pool_args Contains a hashref of initialized information to pass to the Balancer object. -See L for available arguments. +See L for available arguments. =cut has 'pool_args' => ( - is=>'ro', - isa=>'HashRef', + is=>'rw', + isa=>HashRef, lazy=>1, - required=>1, default=>sub { {} }, ); @@ -130,23 +183,9 @@ choose how to spread the query load across each replicant in the pool. =cut -subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart', - as 'ClassName'; - -coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart', - from 'Str', - via { - my $type = $_; - if($type=~m/^::/) { - $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type; - } - Class::MOP::load_class($type); - $type; - }; - has 'balancer_type' => ( - is=>'ro', - isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart', + is=>'rw', + isa=>BalancerClassNamePart, coerce=>1, required=>1, default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First', @@ -158,13 +197,13 @@ has 'balancer_type' => ( =head2 balancer_args Contains a hashref of initialized information to pass to the Balancer object. -See L for available arguments. +See L for available arguments. =cut has 'balancer_args' => ( - is=>'ro', - isa=>'HashRef', + is=>'rw', + isa=>HashRef, lazy=>1, required=>1, default=>sub { {} }, @@ -182,7 +221,7 @@ has 'pool' => ( isa=>'DBIx::Class::Storage::DBI::Replicated::Pool', lazy_build=>1, handles=>[qw/ - connect_replicants + connect_replicants replicants has_replicants /], @@ -196,7 +235,7 @@ is a class that takes a pool () =cut has 'balancer' => ( - is=>'ro', + is=>'rw', isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer', lazy_build=>1, handles=>[qw/auto_validate_every/], @@ -214,7 +253,7 @@ pool of databases that is allowed to handle write traffic. has 'master' => ( is=> 'ro', - isa=>'DBIx::Class::Storage::DBI', + isa=>DBICStorageDBI, lazy_build=>1, ); @@ -231,13 +270,13 @@ Defines an object that implements the read side of L. has 'read_handler' => ( is=>'rw', - isa=>'Object', + isa=>Object, lazy_build=>1, handles=>[qw/ select select_single columns_info_for - /], + /], ); =head2 write_handler @@ -248,12 +287,11 @@ Defines an object that implements the write side of L. has 'write_handler' => ( is=>'ro', - isa=>'Object', - lazy_build=>1, + isa=>Object, lazy_build=>1, - handles=>[qw/ + handles=>[qw/ on_connect_do - on_disconnect_do + on_disconnect_do connect_info throw_exception sql_maker @@ -261,7 +299,8 @@ has 'write_handler' => ( create_ddl_dir deployment_statements datetime_parser - datetime_parser_type + datetime_parser_type + build_datetime_parser last_insert_id insert insert_bulk @@ -275,33 +314,116 @@ has 'write_handler' => ( txn_scope_guard sth deploy - + with_deferred_fk_checks + dbh_do reload_row + with_deferred_fk_checks _prep_for_execute - configure_sqlt - + + backup + is_datatype_numeric + _count_select + _subq_count_select + _subq_update_delete + svp_rollback + svp_begin + svp_release /], ); +has _master_connect_info_opts => + (is => 'rw', isa => HashRef, default => sub { {} }); + +=head2 around: connect_info + +Preserve master's C options (for merging with replicants.) +Also set any Replicated related options from connect_info, such as +C, C, C and C. + +=cut + +around connect_info => sub { + my ($next, $self, $info, @extra) = @_; + + my $wantarray = wantarray; + + my %opts; + for my $arg (@$info) { + next unless (reftype($arg)||'') eq 'HASH'; + %opts = %{ merge($arg, \%opts) }; + } + delete $opts{dsn}; + + if (@opts{qw/pool_type pool_args/}) { + $self->pool_type(delete $opts{pool_type}) + if $opts{pool_type}; + + $self->pool_args( + merge((delete $opts{pool_args} || {}), $self->pool_args) + ); + + $self->pool($self->_build_pool) + if $self->pool; + } + + if (@opts{qw/balancer_type balancer_args/}) { + $self->balancer_type(delete $opts{balancer_type}) + if $opts{balancer_type}; + + $self->balancer_args( + merge((delete $opts{balancer_args} || {}), $self->balancer_args) + ); + + $self->balancer($self->_build_balancer) + if $self->balancer; + } + + $self->_master_connect_info_opts(\%opts); + + my (@res, $res); + if ($wantarray) { + @res = $self->$next($info, @extra); + } else { + $res = $self->$next($info, @extra); + } + + # Make sure master is blessed into the correct class and apply role to it. + my $master = $self->master; + $master->_determine_driver; + Moose::Meta::Class->initialize(ref $master); + + my $class = Moose::Meta::Class->create_anon_class( + superclasses => [ ref $master ], + roles => [ 'DBIx::Class::Storage::DBI::Replicated::WithDSN' ], + cache => 1, + ); + $class->rebless_instance($master); + + # link pool back to master + $self->pool->master($master); + + $wantarray ? @res : $res; +}; + =head1 METHODS This class defines the following methods. =head2 BUILDARGS -L when instantiating it's storage passed itself as the +L when instantiating its storage passed itself as the first argument. So we need to massage the arguments a bit so that all the bits get put into the correct places. =cut sub BUILDARGS { - my ($class, $schema, $storage_type_args, @args) = @_; - + my ($class, $schema, $storage_type_args, @args) = @_; + return { - schema=>$schema, - %$storage_type_args, - @args + schema=>$schema, + %$storage_type_args, + @args } } @@ -313,7 +435,8 @@ Lazy builder for the L attribute. sub _build_master { my $self = shift @_; - DBIx::Class::Storage::DBI->new($self->schema); + my $master = DBIx::Class::Storage::DBI->new($self->schema); + $master } =head2 _build_pool @@ -337,7 +460,7 @@ the balancer knows which pool it's balancing. sub _build_balancer { my $self = shift @_; $self->create_balancer( - pool=>$self->pool, + pool=>$self->pool, master=>$self->master, %{$self->balancer_args}, ); @@ -368,13 +491,56 @@ sub _build_read_handler { =head2 around: connect_replicants All calls to connect_replicants needs to have an existing $schema tacked onto -top of the args, since L needs it. +top of the args, since L needs it, and any C +options merged with the master, with replicant opts having higher priority. =cut -around 'connect_replicants' => sub { - my ($method, $self, @args) = @_; - $self->$method($self->schema, @args); +around connect_replicants => sub { + my ($next, $self, @args) = @_; + + for my $r (@args) { + $r = [ $r ] unless reftype $r eq 'ARRAY'; + + $self->throw_exception('coderef replicant connect_info not supported') + if ref $r->[0] && reftype $r->[0] eq 'CODE'; + +# any connect_info options? + my $i = 0; + $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH'; + +# make one if none + $r->[$i] = {} unless $r->[$i]; + +# merge if two hashes + my @hashes = @$r[$i .. $#{$r}]; + + $self->throw_exception('invalid connect_info options') + if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes; + + $self->throw_exception('too many hashrefs in connect_info') + if @hashes > 2; + + my %opts = %{ merge(reverse @hashes) }; + +# delete them + splice @$r, $i+1, ($#{$r} - $i), (); + +# make sure master/replicants opts don't clash + my %master_opts = %{ $self->_master_connect_info_opts }; + if (exists $opts{dbh_maker}) { + delete @master_opts{qw/dsn user password/}; + } + delete $master_opts{dbh_maker}; + +# merge with master + %opts = %{ merge(\%opts, \%master_opts) }; + +# update + $r->[$i] = \%opts; + } + + $self->$next($self->schema, @args); }; =head2 all_storages @@ -389,7 +555,7 @@ sub all_storages { my $self = shift @_; return grep {defined $_ && blessed $_} ( $self->master, - $self->replicants, + values %{ $self->replicants }, ); } @@ -415,36 +581,26 @@ inserted something and need to get a resultset including it, etc. =cut -use Benchmark; - sub execute_reliably { my ($self, $coderef, @args) = @_; - + unless( ref $coderef eq 'CODE') { $self->throw_exception('Second argument must be a coderef'); } - - my $t0 = new Benchmark; - my $clone = $self->clone; - my $t1 = new Benchmark; - my $td = timediff($t1, $t0); - warn "----------------- the code took:",timestr($td),"\n"; - - - + ##Get copy of master storage my $master = $self->master; - + ##Get whatever the current read hander is my $current = $self->read_handler; - + ##Set the read handler to master $self->read_handler($master); - + ## do whatever the caller needs my @result; my $want_array = wantarray; - + eval { if($want_array) { @result = $coderef->(@args); @@ -452,15 +608,15 @@ sub execute_reliably { ($result[0]) = ($coderef->(@args)); } else { $coderef->(@args); - } + } }; - + ##Reset to the original state - $self->read_handler($current); - + $self->read_handler($current); + ##Exception testing has to come last, otherwise you might leave the ##read_handler set to master. - + if($@) { $self->throw_exception("coderef returned an error: $@"); } else { @@ -472,14 +628,14 @@ sub execute_reliably { Sets the current $schema to be 'reliable', that is all queries, both read and write are sent to the master - + =cut sub set_reliable_storage { my $self = shift @_; my $schema = $self->schema; my $write_handler = $self->schema->storage->write_handler; - + $schema->storage->read_handler($write_handler); } @@ -487,29 +643,16 @@ sub set_reliable_storage { Sets the current $schema to be use the for all reads, while all writea are sent to the master only - + =cut sub set_balanced_storage { my $self = shift @_; my $schema = $self->schema; - my $write_handler = $self->schema->storage->balancer; - - $schema->storage->read_handler($write_handler); -} - -=head2 around: txn_do ($coderef) - -Overload to the txn_do method, which is delegated to whatever the -L is set to. We overload this in order to wrap in inside a -L method. - -=cut + my $balanced_handler = $self->schema->storage->balancer; -around 'txn_do' => sub { - my($txn_do, $self, $coderef, @args) = @_; - $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); -}; + $schema->storage->read_handler($balanced_handler); +} =head2 connected @@ -548,6 +691,7 @@ sub limit_dialect { foreach my $source ($self->all_storages) { $source->limit_dialect(@_); } + return $self->master->quote_char; } =head2 quote_char @@ -561,6 +705,7 @@ sub quote_char { foreach my $source ($self->all_storages) { $source->quote_char(@_); } + return $self->master->quote_char; } =head2 name_sep @@ -574,6 +719,7 @@ sub name_sep { foreach my $source ($self->all_storages) { $source->name_sep(@_); } + return $self->master->name_sep; } =head2 set_schema @@ -597,48 +743,45 @@ set a debug flag across all storages sub debug { my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debug(@_); + if(@_) { + foreach my $source ($self->all_storages) { + $source->debug(@_); + } } + return $self->master->debug; } =head2 debugobj -set a debug object across all storages +set a debug object =cut sub debugobj { my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugobj(@_); - } + return $self->master->debugobj(@_); } =head2 debugfh -set a debugfh object across all storages +set a debugfh object =cut sub debugfh { my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugfh(@_); - } + return $self->master->debugfh(@_); } =head2 debugcb -set a debug callback across all storages +set a debug callback =cut sub debugcb { my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugcb(@_); - } + return $self->master->debugcb(@_); } =head2 disconnect @@ -654,6 +797,21 @@ sub disconnect { } } +=head2 cursor_class + +set cursor class on all storages, or return master's + +=cut + +sub cursor_class { + my ($self, $cursor_class) = @_; + + if ($cursor_class) { + $_->cursor_class($cursor_class) for $self->all_storages; + } + $self->master->cursor_class; +} + =head1 GOTCHAS Due to the fact that replicants can lag behind a master, you must take care to @@ -687,7 +845,7 @@ using the Schema clone method. my $new_schema = $schema->clone; $new_schema->set_reliable_storage; - + ## $new_schema will use only the Master storage for all reads/writes while ## the $schema object will use replicated storage.