X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=dbsrgits%2FDBIx-Class.git;a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FStorage%2FDBI%2FReplicated.pm;h=447747514dd38272da3c17fa1c9e635944407822;hp=c95a0848b199204354157dec648dc030f235bd44;hb=64ae166780d0cb2b9577e506da9b9b240c146d20;hpb=c4d3fae2d68b67050e6f4a04031608506e9a18e8 diff --git a/lib/DBIx/Class/Storage/DBI/Replicated.pm b/lib/DBIx/Class/Storage/DBI/Replicated.pm index c95a084..4477475 100644 --- a/lib/DBIx/Class/Storage/DBI/Replicated.pm +++ b/lib/DBIx/Class/Storage/DBI/Replicated.pm @@ -1,41 +1,84 @@ package DBIx::Class::Storage::DBI::Replicated; +BEGIN { + use Carp::Clan qw/^DBIx::Class/; + use DBIx::Class; + croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') ) + unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated'); +} + use Moose; use DBIx::Class::Storage::DBI; use DBIx::Class::Storage::DBI::Replicated::Pool; use DBIx::Class::Storage::DBI::Replicated::Balancer; -use Scalar::Util qw(blessed); +use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/; +use MooseX::Types::Moose qw/ClassName HashRef Object/; +use Scalar::Util 'reftype'; +use Hash::Merge; +use List::Util qw/min max reduce/; +use Try::Tiny; +use namespace::clean; -extends 'DBIx::Class::Storage::DBI', 'Moose::Object'; +use namespace::clean -except => 'meta'; =head1 NAME -DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support +DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support =head1 SYNOPSIS The Following example shows how to change an existing $schema to a replicated -storage type, add some replicated (readonly) databases, and perform reporting +storage type, add some replicated (read-only) databases, and perform reporting tasks. - ## Change storage_type in your schema class - $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] ); - - ## Add some slaves. Basically this is an array of arrayrefs, where each - ## arrayref is database connect information - - $schema->storage->connect_replicants( - [$dsn1, $user, $pass, \%opts], - [$dsn1, $user, $pass, \%opts], - [$dsn1, $user, $pass, \%opts], - ); - +You should set the 'storage_type attribute to a replicated type. You should +also define your arguments, such as which balancer you want and any arguments +that the Pool object should get. + + my $schema = Schema::Class->clone; + $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] ); + $schema->connection(...); + +Next, you need to add in the Replicants. Basically this is an array of +arrayrefs, where each arrayref is database connect information. Think of these +arguments as what you'd pass to the 'normal' $schema->connect method. + + $schema->storage->connect_replicants( + [$dsn1, $user, $pass, \%opts], + [$dsn2, $user, $pass, \%opts], + [$dsn3, $user, $pass, \%opts], + ); + +Now, just use the $schema as you normally would. Automatically all reads will +be delegated to the replicants, while writes to the master. + + $schema->resultset('Source')->search({name=>'etc'}); + +You can force a given query to use a particular storage using the search +attribute 'force_pool'. For example: + + my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'}); + +Now $RS will force everything (both reads and writes) to use whatever was setup +as the master storage. 'master' is hardcoded to always point to the Master, +but you can also use any Replicant name. Please see: +L and the replicants attribute for more. + +Also see transactions and L for alternative ways to +force read traffic to the master. In general, you should wrap your statements +in a transaction when you are reading and writing to the same tables at the +same time, since your replicants will often lag a bit behind the master. + +See L for more help and +walkthroughs. + =head1 DESCRIPTION -Warning: This class is marked ALPHA. We are using this in development and have -some basic test coverage but the code hasn't yet been stressed by a variety -of databases. Individual DB's may have quirks we are not aware of. Please -use this in development and pass along your experiences/bug fixes. +Warning: This class is marked BETA. This has been running a production +website using MySQL native replication as its backend and we have some decent +test coverage but the code hasn't yet been stressed by a variety of databases. +Individual DBs may have quirks we are not aware of. Please use this in first +development and pass along your experiences/bug fixes. This class implements replicated data store for DBI. Currently you can define one master and numerous slave database connections. All write-type queries @@ -48,50 +91,66 @@ L. Additionally, some methods need to be distributed to all existing storages. This way our storage class is a drop in replacement for L. -Read traffic is spread across the replicants (slaves) occuring to a user +Read traffic is spread across the replicants (slaves) occurring to a user selected algorithm. The default algorithm is random weighted. =head1 NOTES -The consistancy betweeen master and replicants is database specific. The Pool -gives you a method to validate it's replicants, removing and replacing them -when they fail/pass predefined criteria. It is recommened that your application -define two schemas, one using the replicated storage and another that just -connects to the master. +The consistency between master and replicants is database specific. The Pool +gives you a method to validate its replicants, removing and replacing them +when they fail/pass predefined criteria. Please make careful use of the ways +to force a query to run against Master when needed. + +=head1 REQUIREMENTS + +Replicated Storage has additional requirements not currently part of +L. See L for more details. =head1 ATTRIBUTES This class defines the following attributes. +=head2 schema + +The underlying L object this storage is attaching + +=cut + +has 'schema' => ( + is=>'rw', + isa=>DBICSchema, + weak_ref=>1, + required=>1, +); + =head2 pool_type -Contains the classname which will instantiate the L object. Defaults +Contains the classname which will instantiate the L object. Defaults to: L. =cut has 'pool_type' => ( - is=>'ro', - isa=>'ClassName', - lazy_build=>1, - handles=>{ - 'create_pool' => 'new', - }, + is=>'rw', + isa=>ClassName, + default=>'DBIx::Class::Storage::DBI::Replicated::Pool', + handles=>{ + 'create_pool' => 'new', + }, ); =head2 pool_args Contains a hashref of initialized information to pass to the Balancer object. -See L for available arguments. +See L for available arguments. =cut has 'pool_args' => ( - is=>'ro', - isa=>'HashRef', - lazy=>1, - required=>1, - default=>sub { {} }, + is=>'rw', + isa=>HashRef, + lazy=>1, + default=>sub { {} }, ); @@ -103,59 +162,61 @@ choose how to spread the query load across each replicant in the pool. =cut has 'balancer_type' => ( - is=>'ro', - isa=>'ClassName', - lazy_build=>1, - handles=>{ - 'create_balancer' => 'new', - }, + is=>'rw', + isa=>BalancerClassNamePart, + coerce=>1, + required=>1, + default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First', + handles=>{ + 'create_balancer' => 'new', + }, ); =head2 balancer_args Contains a hashref of initialized information to pass to the Balancer object. -See L for available arguments. +See L for available arguments. =cut has 'balancer_args' => ( - is=>'ro', - isa=>'HashRef', - lazy=>1, - required=>1, - default=>sub { {} }, + is=>'rw', + isa=>HashRef, + lazy=>1, + required=>1, + default=>sub { {} }, ); =head2 pool -Is a or derived class. This is a +Is a L or derived class. This is a container class for one or more replicated databases. =cut has 'pool' => ( - is=>'ro', - isa=>'DBIx::Class::Storage::DBI::Replicated::Pool', - lazy_build=>1, - handles=>[qw/ - connect_replicants - replicants - has_replicants - /], + is=>'ro', + isa=>'DBIx::Class::Storage::DBI::Replicated::Pool', + lazy_build=>1, + handles=>[qw/ + connect_replicants + replicants + has_replicants + /], ); =head2 balancer -Is a or derived class. This -is a class that takes a pool () +Is a L or derived class. This +is a class that takes a pool (L) =cut has 'balancer' => ( - is=>'ro', - isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer', - lazy_build=>1, - handles=>[qw/auto_validate_every/], + is=>'rw', + isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer', + lazy_build=>1, + handles=>[qw/auto_validate_every/], ); =head2 master @@ -169,14 +230,14 @@ pool of databases that is allowed to handle write traffic. =cut has 'master' => ( - is=> 'ro', - isa=>'DBIx::Class::Storage::DBI', - lazy_build=>1, + is=> 'ro', + isa=>DBICStorageDBI, + lazy_build=>1, ); =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE -The following methods are delegated all the methods required for the +The following methods are delegated all the methods required for the L interface. =head2 read_handler @@ -186,125 +247,271 @@ Defines an object that implements the read side of L. =cut has 'read_handler' => ( - is=>'rw', - isa=>'Object', - lazy_build=>1, - handles=>[qw/ - select - select_single - columns_info_for - /], + is=>'rw', + isa=>Object, + lazy_build=>1, + handles=>[qw/ + select + select_single + columns_info_for + _dbh_columns_info_for + _select + /], ); =head2 write_handler -Defines an object that implements the write side of L. +Defines an object that implements the write side of L, +as well as methods that don't write or read that can be called on only one +storage, methods that return a C<$dbh>, and any methods that don't make sense to +run on a replicant. =cut has 'write_handler' => ( - is=>'ro', - isa=>'Object', - lazy_build=>1, - lazy_build=>1, - handles=>[qw/ - on_connect_do - on_disconnect_do - connect_info - throw_exception - sql_maker - sqlt_type - create_ddl_dir - deployment_statements - datetime_parser - datetime_parser_type - last_insert_id - insert - insert_bulk - update - delete - dbh - txn_commit - txn_rollback - sth - deploy - schema - /], + is=>'ro', + isa=>Object, + lazy_build=>1, + handles=>[qw/ + on_connect_do + on_disconnect_do + on_connect_call + on_disconnect_call + connect_info + _connect_info + throw_exception + sql_maker + sqlt_type + create_ddl_dir + deployment_statements + datetime_parser + datetime_parser_type + build_datetime_parser + last_insert_id + insert + insert_bulk + update + delete + dbh + txn_begin + txn_do + txn_commit + txn_rollback + txn_scope_guard + sth + deploy + with_deferred_fk_checks + dbh_do + reload_row + with_deferred_fk_checks + _prep_for_execute + + backup + is_datatype_numeric + _count_select + _subq_update_delete + svp_rollback + svp_begin + svp_release + relname_to_table_alias + _dbh_last_insert_id + _fix_bind_params + _default_dbi_connect_attributes + _dbi_connect_info + _dbic_connect_attributes + auto_savepoint + _sqlt_version_ok + _query_end + bind_attribute_by_data_type + transaction_depth + _dbh + _select_args + _dbh_execute_array + _sql_maker + _query_start + _sqlt_version_error + _per_row_update_delete + _dbh_begin_work + _dbh_execute_inserts_with_no_binds + _select_args_to_query + _svp_generate_name + _multipk_update_delete + source_bind_attributes + _normalize_connect_info + _parse_connect_do + _dbh_commit + _execute_array + savepoints + _sqlt_minimum_version + _sql_maker_opts + _conn_pid + _conn_tid + _dbh_autocommit + _native_data_type + _get_dbh + sql_maker_class + _dbh_rollback + _adjust_select_args_for_complex_prefetch + _resolve_ident_sources + _resolve_column_info + _prune_unused_joins + _strip_cond_qualifiers + _extract_order_columns + _resolve_aliastypes_from_select_args + _execute + _do_query + _dbh_sth + _dbh_execute + _prefetch_insert_auto_nextvals + /], ); -=head1 METHODS +my @unimplemented = qw( + _arm_global_destructor + _preserve_foreign_dbh + _verify_pid + _verify_tid -This class defines the following methods. + get_use_dbms_capability + set_use_dbms_capability + get_dbms_capability + set_dbms_capability + _dbh_details -=head2 new + sql_limit_dialect -L when instantiating it's storage passed itself as the -first argument. We need to invoke L on the underlying parent class, make -sure we properly give it a L meta class, and then correctly instantiate -our attributes. Basically we pass on whatever the schema has in it's class -data for 'storage_type_args' to our replicated storage type. + _inner_join_to_node + _group_over_selection +); -=cut +# the capability framework +# not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem +push @unimplemented, ( grep + { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x } + ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names ) +); -sub new { - my $class = shift @_; - my $schema = shift @_; - my $storage_type_args = shift @_; - my $obj = $class->SUPER::new($schema, $storage_type_args, @_); - - ## Hate to do it this way, but can't seem to get advice on the attribute working right - ## maybe we can do a type and coercion for it. - if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) { - $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type}; - eval "require $storage_type_args->{balancer_type}"; - } - - return $class->meta->new_object( - __INSTANCE__ => $obj, - %$storage_type_args, - @_, - ); +for my $method (@unimplemented) { + __PACKAGE__->meta->add_method($method, sub { + croak "$method must not be called on ".(blessed shift).' objects'; + }); } -=head2 _build_master +has _master_connect_info_opts => + (is => 'rw', isa => HashRef, default => sub { {} }); -Lazy builder for the L attribute. +=head2 around: connect_info + +Preserves master's C options (for merging with replicants.) +Also sets any Replicated-related options from connect_info, such as +C, C, C and C. =cut -sub _build_master { - DBIx::Class::Storage::DBI->new; -} +around connect_info => sub { + my ($next, $self, $info, @extra) = @_; + + my $wantarray = wantarray; + + my $merge = Hash::Merge->new('LEFT_PRECEDENT'); + + my %opts; + for my $arg (@$info) { + next unless (reftype($arg)||'') eq 'HASH'; + %opts = %{ $merge->merge($arg, \%opts) }; + } + delete $opts{dsn}; + + if (@opts{qw/pool_type pool_args/}) { + $self->pool_type(delete $opts{pool_type}) + if $opts{pool_type}; + + $self->pool_args( + $merge->merge((delete $opts{pool_args} || {}), $self->pool_args) + ); + + ## Since we possibly changed the pool_args, we need to clear the current + ## pool object so that next time it is used it will be rebuilt. + $self->clear_pool; + } + + if (@opts{qw/balancer_type balancer_args/}) { + $self->balancer_type(delete $opts{balancer_type}) + if $opts{balancer_type}; + + $self->balancer_args( + $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args) + ); + + $self->balancer($self->_build_balancer) + if $self->balancer; + } + + $self->_master_connect_info_opts(\%opts); + + my (@res, $res); + if ($wantarray) { + @res = $self->$next($info, @extra); + } else { + $res = $self->$next($info, @extra); + } + + # Make sure master is blessed into the correct class and apply role to it. + my $master = $self->master; + $master->_determine_driver; + Moose::Meta::Class->initialize(ref $master); + + DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master); -=head2 _build_pool_type + # link pool back to master + $self->pool->master($master); -Lazy builder for the L attribute. + $wantarray ? @res : $res; +}; + +=head1 METHODS + +This class defines the following methods. + +=head2 BUILDARGS + +L when instantiating its storage passed itself as the +first argument. So we need to massage the arguments a bit so that all the +bits get put into the correct places. =cut -sub _build_pool_type { - return 'DBIx::Class::Storage::DBI::Replicated::Pool'; +sub BUILDARGS { + my ($class, $schema, $storage_type_args, @args) = @_; + + return { + schema=>$schema, + %$storage_type_args, + @args + } } -=head2 _build_pool +=head2 _build_master -Lazy builder for the L attribute. +Lazy builder for the L attribute. =cut -sub _build_pool { - my $self = shift @_; - $self->create_pool(%{$self->pool_args}); +sub _build_master { + my $self = shift @_; + my $master = DBIx::Class::Storage::DBI->new($self->schema); + $master } -=head2 _build_balancer_type +=head2 _build_pool -Lazy builder for the L attribute. +Lazy builder for the L attribute. =cut -sub _build_balancer_type { - return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First'; +sub _build_pool { + my $self = shift @_; + $self->create_pool(%{$self->pool_args}); } =head2 _build_balancer @@ -315,11 +522,12 @@ the balancer knows which pool it's balancing. =cut sub _build_balancer { - my $self = shift @_; - $self->create_balancer( - pool=>$self->pool, - master=>$self->master, - %{$self->balancer_args},); + my $self = shift @_; + $self->create_balancer( + pool=>$self->pool, + master=>$self->master, + %{$self->balancer_args}, + ); } =head2 _build_write_handler @@ -330,7 +538,7 @@ the L. =cut sub _build_write_handler { - return shift->master; + return shift->master; } =head2 _build_read_handler @@ -341,19 +549,63 @@ the L. =cut sub _build_read_handler { - return shift->balancer; + return shift->balancer; } =head2 around: connect_replicants All calls to connect_replicants needs to have an existing $schema tacked onto -top of the args, since L needs it. +top of the args, since L needs it, and any C +options merged with the master, with replicant opts having higher priority. =cut -around 'connect_replicants' => sub { - my ($method, $self, @args) = @_; - $self->$method($self->schema, @args); +around connect_replicants => sub { + my ($next, $self, @args) = @_; + + for my $r (@args) { + $r = [ $r ] unless reftype $r eq 'ARRAY'; + + $self->throw_exception('coderef replicant connect_info not supported') + if ref $r->[0] && reftype $r->[0] eq 'CODE'; + +# any connect_info options? + my $i = 0; + $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH'; + +# make one if none + $r->[$i] = {} unless $r->[$i]; + +# merge if two hashes + my @hashes = @$r[$i .. $#{$r}]; + + $self->throw_exception('invalid connect_info options') + if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes; + + $self->throw_exception('too many hashrefs in connect_info') + if @hashes > 2; + + my $merge = Hash::Merge->new('LEFT_PRECEDENT'); + my %opts = %{ $merge->merge(reverse @hashes) }; + +# delete them + splice @$r, $i+1, ($#{$r} - $i), (); + +# make sure master/replicants opts don't clash + my %master_opts = %{ $self->_master_connect_info_opts }; + if (exists $opts{dbh_maker}) { + delete @master_opts{qw/dsn user password/}; + } + delete $master_opts{dbh_maker}; + +# merge with master + %opts = %{ $merge->merge(\%opts, \%master_opts) }; + +# update + $r->[$i] = \%opts; + } + + $self->$next($self->schema, @args); }; =head2 all_storages @@ -365,29 +617,29 @@ replicants. =cut sub all_storages { - my $self = shift @_; - - return grep {defined $_ && blessed $_} ( - $self->master, - $self->replicants, - ); + my $self = shift @_; + return grep {defined $_ && blessed $_} ( + $self->master, + values %{ $self->replicants }, + ); } =head2 execute_reliably ($coderef, ?@args) Given a coderef, saves the current state of the L, forces it to -use reliable storage (ie sets it to the master), executes a coderef and then +use reliable storage (e.g. sets it to the master), executes a coderef and then restores the original state. Example: - my $reliably = sub { - my $name = shift @_; - $schema->resultset('User')->create({name=>$name}); - my $user_rs = $schema->resultset('User')->find({name=>$name}); - }; + my $reliably = sub { + my $name = shift @_; + $schema->resultset('User')->create({name=>$name}); + my $user_rs = $schema->resultset('User')->find({name=>$name}); + return $user_rs; + }; - $schema->storage->execute_reliably($reliably, 'John'); + my $user_rs = $schema->storage->execute_reliably($reliably, 'John'); Use this when you must be certain of your database state, such as when you just inserted something and need to get a resultset including it, etc. @@ -395,75 +647,71 @@ inserted something and need to get a resultset including it, etc. =cut sub execute_reliably { - my ($self, $coderef, @args) = @_; - - unless( ref $coderef eq 'CODE') { - $self->throw_exception('Second argument must be a coderef'); - } + my ($self, $coderef, @args) = @_; + + unless( ref $coderef eq 'CODE') { + $self->throw_exception('Second argument must be a coderef'); + } + + ##Get copy of master storage + my $master = $self->master; - ##Get copy of master storage - my $master = $self->master; - - ##Get whatever the current read hander is - my $current = $self->read_handler; - - ##Set the read handler to master - $self->read_handler($master); - - ## do whatever the caller needs - eval { - $coderef->(@args); - }; - - if($@) { - $self->throw_exception("coderef returned an error: $@"); + ##Get whatever the current read hander is + my $current = $self->read_handler; + + ##Set the read handler to master + $self->read_handler($master); + + ## do whatever the caller needs + my @result; + my $want_array = wantarray; + + try { + if($want_array) { + @result = $coderef->(@args); + } elsif(defined $want_array) { + ($result[0]) = ($coderef->(@args)); + } else { + $coderef->(@args); } - + } catch { + $self->throw_exception("coderef returned an error: $_"); + } finally { ##Reset to the original state - $self->schema->storage->read_handler($current); + $self->read_handler($current); + }; + + return $want_array ? @result : $result[0]; } =head2 set_reliable_storage Sets the current $schema to be 'reliable', that is all queries, both read and write are sent to the master - + =cut sub set_reliable_storage { - my $self = shift @_; - my $schema = $self->schema; - my $write_handler = $self->schema->storage->write_handler; - - $schema->storage->read_handler($write_handler); + my $self = shift @_; + my $schema = $self->schema; + my $write_handler = $self->schema->storage->write_handler; + + $schema->storage->read_handler($write_handler); } =head2 set_balanced_storage Sets the current $schema to be use the for all reads, while all -writea are sent to the master only - +writes are sent to the master only + =cut sub set_balanced_storage { - my $self = shift @_; - my $schema = $self->schema; - my $write_handler = $self->schema->storage->balancer; - - $schema->storage->read_handler($write_handler); -} - -=head2 txn_do ($coderef) + my $self = shift @_; + my $schema = $self->schema; + my $balanced_handler = $self->schema->storage->balancer; -Overload to the txn_do method, which is delegated to whatever the -L is set to. We overload this in order to wrap in inside a -L method. - -=cut - -sub txn_do { - my($self, $coderef, @args) = @_; - $self->execute_reliably($coderef, @args); + $schema->storage->read_handler($balanced_handler); } =head2 connected @@ -473,11 +721,10 @@ Check that the master and at least one of the replicants is connected. =cut sub connected { - my $self = shift @_; - - return - $self->master->connected && - $self->pool->connected_replicants; + my $self = shift @_; + return + $self->master->connected && + $self->pool->connected_replicants; } =head2 ensure_connected @@ -487,10 +734,10 @@ Make sure all the storages are connected. =cut sub ensure_connected { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->ensure_connected(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->ensure_connected(@_); + } } =head2 limit_dialect @@ -500,10 +747,11 @@ Set the limit_dialect for all existing storages =cut sub limit_dialect { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->limit_dialect(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->limit_dialect(@_); + } + return $self->master->limit_dialect; } =head2 quote_char @@ -513,10 +761,11 @@ Set the quote_char for all existing storages =cut sub quote_char { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->quote_char(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->quote_char(@_); + } + return $self->master->quote_char; } =head2 name_sep @@ -526,10 +775,11 @@ Set the name_sep for all existing storages =cut sub name_sep { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->name_sep(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->name_sep(@_); + } + return $self->master->name_sep; } =head2 set_schema @@ -539,10 +789,10 @@ Set the schema object for all existing storages =cut sub set_schema { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->set_schema(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->set_schema(@_); + } } =head2 debug @@ -552,49 +802,46 @@ set a debug flag across all storages =cut sub debug { - my $self = shift @_; + my $self = shift @_; + if(@_) { foreach my $source ($self->all_storages) { - $source->debug(@_); + $source->debug(@_); } + } + return $self->master->debug; } =head2 debugobj -set a debug object across all storages +set a debug object =cut sub debugobj { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugobj(@_); - } + my $self = shift @_; + return $self->master->debugobj(@_); } =head2 debugfh -set a debugfh object across all storages +set a debugfh object =cut sub debugfh { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugfh(@_); - } + my $self = shift @_; + return $self->master->debugfh(@_); } =head2 debugcb -set a debug callback across all storages +set a debug callback =cut sub debugcb { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->debugcb(@_); - } + my $self = shift @_; + return $self->master->debugcb(@_); } =head2 disconnect @@ -604,20 +851,260 @@ disconnect everything =cut sub disconnect { - my $self = shift @_; - foreach my $source ($self->all_storages) { - $source->disconnect(@_); - } + my $self = shift @_; + foreach my $source ($self->all_storages) { + $source->disconnect(@_); + } +} + +=head2 cursor_class + +set cursor class on all storages, or return master's + +=cut + +sub cursor_class { + my ($self, $cursor_class) = @_; + + if ($cursor_class) { + $_->cursor_class($cursor_class) for $self->all_storages; + } + $self->master->cursor_class; +} + +=head2 cursor + +set cursor class on all storages, or return master's, alias for L +above. + +=cut + +sub cursor { + my ($self, $cursor_class) = @_; + + if ($cursor_class) { + $_->cursor($cursor_class) for $self->all_storages; + } + $self->master->cursor; +} + +=head2 unsafe + +sets the L option on all storages or returns +master's current setting + +=cut + +sub unsafe { + my $self = shift; + + if (@_) { + $_->unsafe(@_) for $self->all_storages; + } + + return $self->master->unsafe; +} + +=head2 disable_sth_caching + +sets the L option on all storages +or returns master's current setting + +=cut + +sub disable_sth_caching { + my $self = shift; + + if (@_) { + $_->disable_sth_caching(@_) for $self->all_storages; + } + + return $self->master->disable_sth_caching; } +=head2 lag_behind_master + +returns the highest Replicant L +setting + +=cut + +sub lag_behind_master { + my $self = shift; + + return max map $_->lag_behind_master, $self->replicants; +} + +=head2 is_replicating + +returns true if all replicants return true for +L + +=cut + +sub is_replicating { + my $self = shift; + + return (grep $_->is_replicating, $self->replicants) == ($self->replicants); +} + +=head2 connect_call_datetime_setup + +calls L for all storages + +=cut + +sub connect_call_datetime_setup { + my $self = shift; + $_->connect_call_datetime_setup for $self->all_storages; +} + +sub _populate_dbh { + my $self = shift; + $_->_populate_dbh for $self->all_storages; +} + +sub _connect { + my $self = shift; + $_->_connect for $self->all_storages; +} + +sub _rebless { + my $self = shift; + $_->_rebless for $self->all_storages; +} + +sub _determine_driver { + my $self = shift; + $_->_determine_driver for $self->all_storages; +} + +sub _driver_determined { + my $self = shift; + + if (@_) { + $_->_driver_determined(@_) for $self->all_storages; + } + + return $self->master->_driver_determined; +} + +sub _init { + my $self = shift; + + $_->_init for $self->all_storages; +} + +sub _run_connection_actions { + my $self = shift; + + $_->_run_connection_actions for $self->all_storages; +} + +sub _do_connection_actions { + my $self = shift; + + if (@_) { + $_->_do_connection_actions(@_) for $self->all_storages; + } +} + +sub connect_call_do_sql { + my $self = shift; + $_->connect_call_do_sql(@_) for $self->all_storages; +} + +sub disconnect_call_do_sql { + my $self = shift; + $_->disconnect_call_do_sql(@_) for $self->all_storages; +} + +sub _seems_connected { + my $self = shift; + + return min map $_->_seems_connected, $self->all_storages; +} + +sub _ping { + my $self = shift; + + return min map $_->_ping, $self->all_storages; +} + +# not using the normalized_version, because we want to preserve +# version numbers much longer than the conventional xxx.yyyzzz +my $numify_ver = sub { + my $ver = shift; + my @numparts = split /\D+/, $ver; + my $format = '%d.' . (join '', ('%06d') x (@numparts - 1)); + + return sprintf $format, @numparts; +}; +sub _server_info { + my $self = shift; + + if (not $self->_dbh_details->{info}) { + $self->_dbh_details->{info} = ( + reduce { $a->[0] < $b->[0] ? $a : $b } + map [ $numify_ver->($_->{dbms_version}), $_ ], + map $_->_server_info, $self->all_storages + )->[1]; + } + + return $self->next::method; +} + +sub _get_server_version { + my $self = shift; + + return $self->_server_info->{dbms_version}; +} + +=head1 GOTCHAS + +Due to the fact that replicants can lag behind a master, you must take care to +make sure you use one of the methods to force read queries to a master should +you need realtime data integrity. For example, if you insert a row, and then +immediately re-read it from the database (say, by doing $row->discard_changes) +or you insert a row and then immediately build a query that expects that row +to be an item, you should force the master to handle reads. Otherwise, due to +the lag, there is no certainty your data will be in the expected state. + +For data integrity, all transactions automatically use the master storage for +all read and write queries. Using a transaction is the preferred and recommended +method to force the master to handle all read queries. + +Otherwise, you can force a single query to use the master with the 'force_pool' +attribute: + + my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk); + +This attribute will safely be ignore by non replicated storages, so you can use +the same code for both types of systems. + +Lastly, you can use the L method, which works very much like +a transaction. + +For debugging, you can turn replication on/off with the methods L +and L, however this operates at a global level and is not +suitable if you have a shared Schema object being used by multiple processes, +such as on a web application server. You can get around this limitation by +using the Schema clone method. + + my $new_schema = $schema->clone; + $new_schema->set_reliable_storage; + + ## $new_schema will use only the Master storage for all reads/writes while + ## the $schema object will use replicated storage. + =head1 AUTHOR - John Napiorkowski + John Napiorkowski Based on code originated by: - Norbert Csongrádi - Peter Siklósi + Norbert Csongrádi + Peter Siklósi =head1 LICENSE @@ -625,4 +1112,6 @@ You may distribute this code under the same terms as Perl itself. =cut +__PACKAGE__->meta->make_immutable; + 1;