package DBIx::Class::Storage::DBI::Replicated;
+BEGIN {
+ use Carp::Clan qw/^DBIx::Class/;
+ use DBIx::Class;
+ croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
+ unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
+}
+
use Moose;
+use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
+use DBIx::Class::Storage::DBI::Replicated::Balancer;
+use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
+use MooseX::Types::Moose qw/ClassName HashRef Object/;
+use Scalar::Util qw/reftype blessed/;
+use Hash::Merge;
+use List::Util qw/min max reduce/;
+use Try::Tiny;
-#extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
+use namespace::clean -except => 'meta';
=head1 NAME
-DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
+DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
=head1 SYNOPSIS
The following example shows how to change an existing $schema to a replicated
-storage type, add some replicated (readonly) databases, and perform reporting
-tasks
-
- ## Change storage_type in your schema class
- $schema->storage_type( '::DBI::Replicated' );
-
- ## Add some slaves. Basically this is an array of arrayrefs, where each
- ## arrayref is database connect information
-
- $schema->storage->create_replicants(
- [$dsn1, $user, $pass, \%opts],
- [$dsn1, $user, $pass, \%opts],
- [$dsn1, $user, $pass, \%opts],
- ## This is just going to use the standard DBIC connect method, so it
- ## supports everything that method supports, such as connecting to an
- ## existing database handle.
- [$dbh],
- \%global_opts
- );
-
- ## a hash of replicants, keyed by their DSN
- my %replicants = $schema->storage->replicants;
- my $replicant = $schema->storage->get_replicant($dsn);
- $replicant->status;
- $replicant->is_active;
- $replicant->active;
-
+storage type, add some replicated (read-only) databases, and perform reporting
+tasks.
+
+You should set the 'storage_type' attribute to a replicated type. You should
+also define your arguments, such as which balancer you want and any arguments
+that the Pool object should get.
+
+ my $schema = Schema::Class->clone;
+ $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
+ $schema->connection(...);
+
+Next, you need to add in the Replicants. Basically this is an array of
+arrayrefs, where each arrayref is database connect information. Think of these
+arguments as what you'd pass to the 'normal' $schema->connect method.
+
+ $schema->storage->connect_replicants(
+ [$dsn1, $user, $pass, \%opts],
+ [$dsn2, $user, $pass, \%opts],
+ [$dsn3, $user, $pass, \%opts],
+ );
+
+Now, just use the $schema as you normally would. All reads will automatically
+be delegated to the replicants, while writes go to the master.
+
+ $schema->resultset('Source')->search({name=>'etc'});
+
+You can force a given query to use a particular storage using the search
+attribute 'force_pool'. For example:
+
+ my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
+
+Now $RS will force everything (both reads and writes) to use whatever was set
+up as the master storage. 'master' is hardcoded to always point to the Master,
+but you can also use any Replicant name. Please see:
+L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
+
+Also see transactions and L</execute_reliably> for alternative ways to
+force read traffic to the master. In general, you should wrap your statements
+in a transaction when you are reading and writing to the same tables at the
+same time, since your replicants will often lag a bit behind the master.
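For example (a sketch assuming a C<User> source exists in your schema), wrapping a write and its follow-up read in a transaction keeps both on the master:

```perl
## Inside a transaction all statements run against the master, so the
## find() below cannot be served by a replicant that has not yet
## replicated the create().
$schema->txn_do(sub {
    $schema->resultset('User')->create({ name => 'john' });
    my $user = $schema->resultset('User')->find({ name => 'john' });
});
```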
+
+See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
+walkthroughs.
+
=head1 DESCRIPTION
-Warning: This class is marked ALPHA. We are using this in development and have
-some basic test coverage but the code hasn't yet been stressed by a variety
-of databases. Individual DB's may have quirks we are not aware of. Please
-use this in development and pass along your experiences/bug fixes.
+Warning: This class is marked BETA. It has been running a production
+website using MySQL native replication as its backend and we have some decent
+test coverage, but the code hasn't yet been stressed by a variety of databases.
+Individual DBs may have quirks we are not aware of. Please use this first in
+development and pass along your experiences/bug fixes.
This class implements replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database, all read-type queries (SELECTs) go to the slave database.
Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
-handle gets delegated to one of the two attributes: L</master_storage> or to
-L</current_replicant_storage>. Additionally, some methods need to be distributed
+handle gets delegated to one of the two attributes: L</read_handler> or to
+L</write_handler>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop in replacement
for L<DBIx::Class::Storage::DBI>.
-Read traffic is spread across the replicants (slaves) occuring to a user
+Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.
-TODO more details about the algorithm.
+=head1 NOTES
+
+The consistency between master and replicants is database specific. The Pool
+gives you a method to validate its replicants, removing and replacing them
+when they fail/pass predefined criteria. Please make careful use of the ways
+to force a query to run against Master when needed.
+
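As a sketch of configuring automatic validation (the C<auto_validate_every> balancer option and the Pool's C<maximum_lag> are documented in the Balancer and Pool classes; the values below are illustrative only):

```perl
$schema->storage_type([
    '::DBI::Replicated', {
        balancer_type => '::Random',
        balancer_args => {
            ## revalidate the replicants after 5 seconds of use
            auto_validate_every => 5,
        },
        pool_args => {
            ## treat replicants lagging over 2 seconds as invalid
            maximum_lag => 2,
        },
    },
]);
```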
+=head1 REQUIREMENTS
+
+Replicated Storage has additional requirements not currently part of
+L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
=head1 ATTRIBUTES
This class defines the following attributes.
+=head2 schema
+
+The underlying L<DBIx::Class::Schema> object this storage is attaching to.
+
+=cut
+
+has 'schema' => (
+ is=>'rw',
+ isa=>DBICSchema,
+ weak_ref=>1,
+ required=>1,
+);
+
+=head2 pool_type
+
+Contains the classname which will instantiate the L</pool> object. Defaults
+to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
+
+=cut
+
+has 'pool_type' => (
+ is=>'rw',
+ isa=>ClassName,
+ default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+ handles=>{
+ 'create_pool' => 'new',
+ },
+);
+
+=head2 pool_args
+
+Contains a hashref of initialization arguments to pass to the Pool object.
+See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
+
+=cut
+
+has 'pool_args' => (
+ is=>'rw',
+ isa=>HashRef,
+ lazy=>1,
+ default=>sub { {} },
+);
+
+
+=head2 balancer_type
+
+The replication pool requires a balancer class to provide the methods for
+choosing how to spread the query load across each replicant in the pool.
+
+=cut
+
+has 'balancer_type' => (
+ is=>'rw',
+ isa=>BalancerClassNamePart,
+ coerce=>1,
+ required=>1,
+ default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
+ handles=>{
+ 'create_balancer' => 'new',
+ },
+);
+
+=head2 balancer_args
+
+Contains a hashref of initialization arguments to pass to the Balancer object.
+See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
+
+=cut
+
+has 'balancer_args' => (
+ is=>'rw',
+ isa=>HashRef,
+ lazy=>1,
+ required=>1,
+ default=>sub { {} },
+);
+
+=head2 pool
+
+Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
+container class for one or more replicated databases.
+
+=cut
+
+has 'pool' => (
+ is=>'ro',
+ isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
+ lazy_build=>1,
+ handles=>[qw/
+ connect_replicants
+ replicants
+ has_replicants
+ /],
+);
+
+=head2 balancer
+
+Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
+is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>).
+
+=cut
+
+has 'balancer' => (
+ is=>'rw',
+ isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
+ lazy_build=>1,
+ handles=>[qw/auto_validate_every/],
+);
+
=head2 master
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state.
=cut
has 'master' => (
- is=> 'ro',
- isa=>'DBIx::Class::Storage::DBI',
- lazy_build=>1,
- handles=>[qw/
- on_connect_do
- on_disconnect_do
- columns_info_for
- connect_info
- throw_exception
- sql_maker
- sqlt_type
- create_ddl_dir
- deployment_statements
- datetime_parser
- datetime_parser_type
- last_insert_id
- insert
- insert_bulk
- update
- delete
- dbh
- txn_do
- txn_commit
- txn_rollback
- sth
- deploy
- /],
+ is=> 'ro',
+ isa=>DBICStorageDBI,
+ lazy_build=>1,
);
+=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
-=head2 current_replicant
+The following attributes are delegated all the methods required for the
+L<DBIx::Class::Storage::DBI> interface.
-Replicant storages (slaves) handle all read only traffic. The assumption is
-that your database will become readbound well before it becomes write bound
-and that being able to spread your read only traffic around to multiple
-databases is going to help you to scale traffic.
+=head2 read_handler
-This attribute returns the next slave to handle a read request. Your L</pool>
-attribute has methods to help you shuffle through all the available replicants
-via it's balancer object.
+Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
-This attribute defines the following reader/writer methods
+=cut
-=over 4
+has 'read_handler' => (
+ is=>'rw',
+ isa=>Object,
+ lazy_build=>1,
+ handles=>[qw/
+ select
+ select_single
+ columns_info_for
+ _dbh_columns_info_for
+ _select
+ /],
+);
-=item get_current_replicant
+=head2 write_handler
-Returns the contained L<DBIx::Class::Storage::DBI> replicant
+Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
+as well as methods that don't write or read that can be called on only one
+storage, methods that return a C<$dbh>, and any methods that don't make sense to
+run on a replicant.
-=item set_current_replicant
+=cut
+
+has 'write_handler' => (
+ is=>'ro',
+ isa=>Object,
+ lazy_build=>1,
+ handles=>[qw/
+ on_connect_do
+ on_disconnect_do
+ on_connect_call
+ on_disconnect_call
+ connect_info
+ _connect_info
+ throw_exception
+ sql_maker
+ sqlt_type
+ create_ddl_dir
+ deployment_statements
+ datetime_parser
+ datetime_parser_type
+ build_datetime_parser
+ last_insert_id
+ insert
+ insert_bulk
+ update
+ delete
+ dbh
+ txn_begin
+ txn_do
+ txn_commit
+ txn_rollback
+ txn_scope_guard
+ sth
+ deploy
+ with_deferred_fk_checks
+ dbh_do
+ reload_row
+ _prep_for_execute
+
+ backup
+ is_datatype_numeric
+ _count_select
+ _subq_update_delete
+ svp_rollback
+ svp_begin
+ svp_release
+ relname_to_table_alias
+ _dbh_last_insert_id
+ _fix_bind_params
+ _default_dbi_connect_attributes
+ _dbi_connect_info
+ _dbic_connect_attributes
+ auto_savepoint
+ _sqlt_version_ok
+ _query_end
+ bind_attribute_by_data_type
+ transaction_depth
+ _dbh
+ _select_args
+ _dbh_execute_array
+ _sql_maker
+ _query_start
+ _sqlt_version_error
+ _per_row_update_delete
+ _dbh_begin_work
+ _dbh_execute_inserts_with_no_binds
+ _select_args_to_query
+ _svp_generate_name
+ _multipk_update_delete
+ source_bind_attributes
+ _normalize_connect_info
+ _parse_connect_do
+ _dbh_commit
+ _execute_array
+ savepoints
+ _sqlt_minimum_version
+ _sql_maker_opts
+ _conn_pid
+ _dbh_autocommit
+ _native_data_type
+ _get_dbh
+ sql_maker_class
+ _dbh_rollback
+ _adjust_select_args_for_complex_prefetch
+ _resolve_ident_sources
+ _resolve_column_info
+ _prune_unused_joins
+ _strip_cond_qualifiers
+ _resolve_aliastypes_from_select_args
+ _execute
+ _do_query
+ _dbh_sth
+ _dbh_execute
+ /],
+);
-Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
+my @unimplemented = qw(
+ _arm_global_destructor
+ _verify_pid
-=back
+ get_use_dbms_capability
+ set_use_dbms_capability
+ get_dbms_capability
+ set_dbms_capability
+ _dbh_details
+ _dbh_get_info
-We split the reader/writer to make it easier to selectively override how the
-replicant is altered.
+ sql_limit_dialect
+ sql_quote_char
+ sql_name_sep
-=cut
+ _inner_join_to_node
+ _group_over_selection
+ _extract_order_criteria
+
+ _prefetch_autovalues
+
+ _max_column_bytesize
+ _is_lob_type
+);
-has 'current_replicant' => (
- is=> 'rw',
- reader=>'get_current_replicant',
- writer=>'set_current_replicant',
- isa=>'DBIx::Class::Storage::DBI',
- lazy_build=>1,
- handles=>[qw/
- select
- select_single
- columns_info_for
- /],
+# the capability framework
+# not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
+push @unimplemented, ( grep
+ { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
+ ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
);
+for my $method (@unimplemented) {
+ __PACKAGE__->meta->add_method($method, sub {
+ croak "$method must not be called on ".(blessed shift).' objects';
+ });
+}
+
+has _master_connect_info_opts =>
+ (is => 'rw', isa => HashRef, default => sub { {} });
-=head2 replicant_storage_pool_type
+=head2 around: connect_info
-Contains the classname which will instantiate the L</replicant_storage_pool>
-object. Defaults to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
+Preserves master's C<connect_info> options (for merging with replicants).
+Also sets any Replicated-related options from connect_info, such as
+C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
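For example (a sketch; the option values are illustrative only), these keys can ride along in the normal connection call:

```perl
$schema->connection($dsn, $user, $pass, {
    quote_char    => '`',        ## preserved and later merged into replicants
    pool_type     => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    balancer_type => '::Random', ## coerced to the full Balancer class name
    balancer_args => { auto_validate_every => 5 },
});
```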
=cut
-has 'replicant_storage_pool_type' => (
- is=>'ro',
- isa=>'ClassName',
- required=>1,
- default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
- handles=> {
- 'create_replicant_storage_pool' => 'new',
- },
-);
+around connect_info => sub {
+ my ($next, $self, $info, @extra) = @_;
+ my $merge = Hash::Merge->new('LEFT_PRECEDENT');
-=head2 pool_balancer_type
+ my %opts;
+ for my $arg (@$info) {
+ next unless (reftype($arg)||'') eq 'HASH';
+ %opts = %{ $merge->merge($arg, \%opts) };
+ }
+ delete $opts{dsn};
-The replication pool requires a balance class to provider the methods for
-choose how to spread the query load across each replicant in the pool.
+ if (@opts{qw/pool_type pool_args/}) {
+ $self->pool_type(delete $opts{pool_type})
+ if $opts{pool_type};
-=cut
+ $self->pool_args(
+ $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
+ );
-has 'pool_balancer_type' => (
- is=>'ro',
- isa=>'ClassName',
- required=>1,
- default=>'DBIx::Class::Storage::DBI::Replicated::Pool::Balancer',
- handles=> {
- 'create_replicant_storage_pool' => 'new',
- },
-);
+ ## Since we possibly changed the pool_args, we need to clear the current
+ ## pool object so that next time it is used it will be rebuilt.
+ $self->clear_pool;
+ }
+ if (@opts{qw/balancer_type balancer_args/}) {
+ $self->balancer_type(delete $opts{balancer_type})
+ if $opts{balancer_type};
-=head2 replicant_storage_pool
+ $self->balancer_args(
+ $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
+ );
-Holds the list of connected replicants, their status and other housekeeping or
-reporting methods.
+ $self->balancer($self->_build_balancer)
+ if $self->balancer;
+ }
-=cut
+ $self->_master_connect_info_opts(\%opts);
-has 'replicant_storage_pool' => (
- is=>'ro',
- isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
- lazy_build=>1,
- handles=>[qw/replicant_storages/],
-);
+ my @res;
+ if (wantarray) {
+ @res = $self->$next($info, @extra);
+ } else {
+ $res[0] = $self->$next($info, @extra);
+ }
+
+ # Make sure master is blessed into the correct class and apply role to it.
+ my $master = $self->master;
+ $master->_determine_driver;
+ Moose::Meta::Class->initialize(ref $master);
+ DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
+ # link pool back to master
+ $self->pool->master($master);
+
+ wantarray ? @res : $res[0];
+};
=head1 METHODS
This class defines the following methods.
-=head2 new
+=head2 BUILDARGS
-Make sure we properly inherit from L<Moose>.
+When L<DBIx::Class::Schema> instantiates its storage, it passes itself as the
+first argument. So we need to massage the arguments a bit so that all the
+bits get put into the correct places.
=cut
-sub new {
- my $class = shift @_;
- my $obj = $class->SUPER::new(@_);
-
- return $class->meta->new_object(
- __INSTANCE__ => $obj, @_
- );
+sub BUILDARGS {
+ my ($class, $schema, $storage_type_args, @args) = @_;
+
+ return {
+ schema=>$schema,
+ %$storage_type_args,
+ @args
+ }
}
-=head2 _build_master_storage
+=head2 _build_master
-Lazy builder for the L</master_storage> attribute.
+Lazy builder for the L</master> attribute.
=cut
-sub _build_next_replicant_storage {
- DBIx::Class::Storage::DBI->new;
+sub _build_master {
+ my $self = shift @_;
+ my $master = DBIx::Class::Storage::DBI->new($self->schema);
+ $master
}
+=head2 _build_pool
-=head2 _build_current_replicant_storage
-
-Lazy builder for the L</current_replicant_storage> attribute.
+Lazy builder for the L</pool> attribute.
=cut
-sub _build_current_replicant_storage {
- shift->replicant_storage_pool->first;
+sub _build_pool {
+ my $self = shift @_;
+ $self->create_pool(%{$self->pool_args});
}
+=head2 _build_balancer
-=head2 _build_replicant_storage_pool
-
-Lazy builder for the L</replicant_storage_pool> attribute.
+Lazy builder for the L</balancer> attribute. This takes a Pool object so that
+the balancer knows which pool it's balancing.
=cut
-sub _build_replicant_storage_pool {
- my $self = shift @_;
- $self->create_replicant_storage_pool;
+sub _build_balancer {
+ my $self = shift @_;
+ $self->create_balancer(
+ pool=>$self->pool,
+ master=>$self->master,
+ %{$self->balancer_args},
+ );
}
+=head2 _build_write_handler
-=head2 around: create_replicant_storage_pool
-
-Make sure all calles to the method set a default balancer type to our current
-balancer type.
+Lazy builder for the L</write_handler> attribute. The default is to set this to
+the L</master>.
=cut
-around 'create_replicant_storage_pool' => sub {
- my ($method, $self, @args) = @_;
- return $self->$method(balancer_type=>$self->pool_balancer_type, @args);
+sub _build_write_handler {
+ return shift->master;
}
+=head2 _build_read_handler
-=head2 after: get_current_replicant_storage
+Lazy builder for the L</read_handler> attribute. The default is to set this to
+the L</balancer>.
-Advice on the current_replicant_storage attribute. Each time we use a replicant
-we need to change it via the storage pool algorithm. That way we are spreading
-the load evenly (hopefully) across existing capacity.
+=cut
+
+sub _build_read_handler {
+ return shift->balancer;
+}
+
+=head2 around: connect_replicants
+
+All calls to connect_replicants need to have an existing $schema tacked onto
+the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
+C<connect_info> options merged with the master's, with replicant opts having
+higher priority.
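As a sketch (DSNs and options are illustrative), this is how the merge plays out:

```perl
## Assume the master was connected with { quote_char => '`' }.
$schema->storage->connect_replicants(
    ## inherits quote_char from the master, adds its own option
    [$dsn1, $user, $pass, { PrintError => 0 }],
    ## no options given: gets the master's options unchanged
    [$dsn2, $user, $pass],
);
```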
=cut
-after 'get_current_replicant_storage' => sub {
- my $self = shift @_;
- my $next_replicant = $self->replicant_storage_pool->next;
- $self->next_replicant_storage($next_replicant);
-};
+around connect_replicants => sub {
+ my ($next, $self, @args) = @_;
+ for my $r (@args) {
+ $r = [ $r ] unless reftype $r eq 'ARRAY';
-=head2 find_or_create
+ $self->throw_exception('coderef replicant connect_info not supported')
+ if ref $r->[0] && reftype $r->[0] eq 'CODE';
-First do a find on the replicant. If no rows are found, pass it on to the
-L</master_storage>
+# any connect_info options?
+ my $i = 0;
+ $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
-=cut
+# make one if none
+ $r->[$i] = {} unless $r->[$i];
-sub find_or_create {
- my $self = shift @_;
-}
+# merge if two hashes
+ my @hashes = @$r[$i .. $#{$r}];
+
+ $self->throw_exception('invalid connect_info options')
+ if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
+
+ $self->throw_exception('too many hashrefs in connect_info')
+ if @hashes > 2;
+
+ my $merge = Hash::Merge->new('LEFT_PRECEDENT');
+ my %opts = %{ $merge->merge(reverse @hashes) };
+
+# delete them
+ splice @$r, $i+1, ($#{$r} - $i), ();
+
+# make sure master/replicants opts don't clash
+ my %master_opts = %{ $self->_master_connect_info_opts };
+ if (exists $opts{dbh_maker}) {
+ delete @master_opts{qw/dsn user password/};
+ }
+ delete $master_opts{dbh_maker};
+
+# merge with master
+ %opts = %{ $merge->merge(\%opts, \%master_opts) };
+
+# update
+ $r->[$i] = \%opts;
+ }
+
+ $self->$next($self->schema, @args);
+};
=head2 all_storages
=cut
sub all_storages {
- my $self = shift @_;
-
- return (
- $self->master_storage,
- $self->replicant_storages,
- );
+ my $self = shift @_;
+ return grep {defined $_ && blessed $_} (
+ $self->master,
+ values %{ $self->replicants },
+ );
+}
+
+=head2 execute_reliably ($coderef, ?@args)
+
+Given a coderef, saves the current state of the L</read_handler>, forces it to
+use reliable storage (e.g. sets it to the master), executes a coderef and then
+restores the original state.
+
+Example:
+
+ my $reliably = sub {
+ my $name = shift @_;
+ $schema->resultset('User')->create({name=>$name});
+ my $user_rs = $schema->resultset('User')->find({name=>$name});
+ return $user_rs;
+ };
+
+ my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
+
+Use this when you must be certain of your database state, such as when you just
+inserted something and need to get a resultset including it, etc.
+
+=cut
+
+sub execute_reliably {
+ my ($self, $coderef, @args) = @_;
+
+ unless( ref $coderef eq 'CODE') {
+ $self->throw_exception('First argument must be a coderef');
+ }
+
+ ##Get copy of master storage
+ my $master = $self->master;
+
+ ##Get whatever the current read handler is
+ my $current = $self->read_handler;
+
+ ##Set the read handler to master
+ $self->read_handler($master);
+
+ ## do whatever the caller needs
+ my @result;
+ my $want_array = wantarray;
+
+ try {
+ if($want_array) {
+ @result = $coderef->(@args);
+ } elsif(defined $want_array) {
+ ($result[0]) = ($coderef->(@args));
+ } else {
+ $coderef->(@args);
+ }
+ } catch {
+ $self->throw_exception("coderef returned an error: $_");
+ } finally {
+ ##Reset to the original state
+ $self->read_handler($current);
+ };
+
+ return wantarray ? @result : $result[0];
+}
+
+=head2 set_reliable_storage
+
+Sets the current $schema to be 'reliable', that is, all queries, both read and
+write, are sent to the master.
+
+=cut
+
+sub set_reliable_storage {
+ my $self = shift @_;
+ my $schema = $self->schema;
+ my $write_handler = $self->schema->storage->write_handler;
+
+ $schema->storage->read_handler($write_handler);
}
+=head2 set_balanced_storage
+
+Sets the current $schema to use the L</balancer> for all reads, while all
+writes are sent to the master only.
+
+=cut
+
+sub set_balanced_storage {
+ my $self = shift @_;
+ my $schema = $self->schema;
+ my $balanced_handler = $self->schema->storage->balancer;
+
+ $schema->storage->read_handler($balanced_handler);
+}
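A sketch of toggling between the two modes (assumes a C<User> source and an C<$id> in scope):

```perl
## route everything, reads included, to the master...
$schema->storage->set_reliable_storage;
my $fresh = $schema->resultset('User')->find($id);

## ...then hand reads back to the balancer
$schema->storage->set_balanced_storage;
```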
=head2 connected
=cut
sub connected {
- my $self = shift @_;
-
- return
- $self->master_storage->connected &&
- $self->replicant_storage_pool->has_connected_slaves;
+ my $self = shift @_;
+ return
+ $self->master->connected &&
+ $self->pool->connected_replicants;
}
-
=head2 ensure_connected
Make sure all the storages are connected.
=cut
sub ensure_connected {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->ensure_connected(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->ensure_connected(@_);
+ }
}
-
=head2 limit_dialect
Set the limit_dialect for all existing storages
=cut
sub limit_dialect {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->name_sep(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->limit_dialect(@_);
+ }
+ return $self->master->limit_dialect;
}
-
=head2 quote_char
Set the quote_char for all existing storages
=cut
sub quote_char {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->name_sep(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->quote_char(@_);
+ }
+ return $self->master->quote_char;
}
-
=head2 name_sep
Set the name_sep for all existing storages
=cut
sub name_sep {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->name_sep(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->name_sep(@_);
+ }
+ return $self->master->name_sep;
}
-
=head2 set_schema
Set the schema object for all existing storages
=cut
sub set_schema {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->set_schema(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->set_schema(@_);
+ }
}
-
=head2 debug
set a debug flag across all storages
=cut
sub debug {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->debug(@_);
+ my $self = shift @_;
+ if(@_) {
+ foreach my $source ($self->all_storages) {
+ $source->debug(@_);
}
+ }
+ return $self->master->debug;
}
-
=head2 debugobj
-set a debug object across all storages
+set a debug object
=cut
sub debugobj {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->debugobj(@_);
- }
+ my $self = shift @_;
+ return $self->master->debugobj(@_);
}
-
=head2 debugfh
-set a debugfh object across all storages
+set a debugfh object
=cut
sub debugfh {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->debugfh(@_);
- }
+ my $self = shift @_;
+ return $self->master->debugfh(@_);
}
-
=head2 debugcb
-set a debug callback across all storages
+set a debug callback
=cut
sub debugcb {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->debugcb(@_);
- }
+ my $self = shift @_;
+ return $self->master->debugcb(@_);
}
-
=head2 disconnect
disconnect everything
=cut
sub disconnect {
- my $self = shift @_;
- foreach $source (shift->all_sources) {
- $source->disconnect(@_);
- }
+ my $self = shift @_;
+ foreach my $source ($self->all_storages) {
+ $source->disconnect(@_);
+ }
}
+=head2 cursor_class
-=head2 DESTROY
-
-Make sure we pass destroy events down to the storage handlers
+set cursor class on all storages, or return master's
=cut
-sub DESTROY {
- my $self = shift;
- ## TODO, maybe we can just leave this alone ???
+sub cursor_class {
+ my ($self, $cursor_class) = @_;
+
+ if ($cursor_class) {
+ $_->cursor_class($cursor_class) for $self->all_storages;
+ }
+ $self->master->cursor_class;
}
+=head2 cursor
-=head1 AUTHOR
+set cursor class on all storages, or return master's, alias for L</cursor_class>
+above.
-Norbert Csongrádi <bert@cpan.org>
+=cut
-Peter Siklósi <einon@einon.hu>
+sub cursor {
+ my ($self, $cursor_class) = @_;
-John Napiorkowski <john.napiorkowski@takkle.com>
+ if ($cursor_class) {
+ $_->cursor($cursor_class) for $self->all_storages;
+ }
+ $self->master->cursor;
+}
-=head1 LICENSE
+=head2 unsafe
-You may distribute this code under the same terms as Perl itself.
+sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
+master's current setting
=cut
-1;
+sub unsafe {
+ my $self = shift;
-__END__
+ if (@_) {
+ $_->unsafe(@_) for $self->all_storages;
+ }
-use strict;
-use warnings;
+ return $self->master->unsafe;
+}
-use DBIx::Class::Storage::DBI;
-use DBD::Multi;
+=head2 disable_sth_caching
-use base qw/Class::Accessor::Fast/;
+sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
+or returns master's current setting
-__PACKAGE__->mk_accessors( qw/read_source write_source/ );
+=cut
-=head1 NAME
+sub disable_sth_caching {
+ my $self = shift;
-DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
+ if (@_) {
+ $_->disable_sth_caching(@_) for $self->all_storages;
+ }
-=head1 SYNOPSIS
+ return $self->master->disable_sth_caching;
+}
-The Following example shows how to change an existing $schema to a replicated
-storage type and update it's connection information to contain a master DSN and
-an array of slaves.
-
- ## Change storage_type in your schema class
- $schema->storage_type( '::DBI::Replicated' );
-
- ## Set your connection.
- $schema->connect(
- $dsn, $user, $password, {
- AutoCommit => 1,
- ## Other standard DBI connection or DBD custom attributes added as
- ## usual. Additionally, we have two custom attributes for defining
- ## slave information and controlling how the underlying DBD::Multi
- slaves_connect_info => [
- ## Define each slave like a 'normal' DBI connection, but you add
- ## in a DBD::Multi custom attribute to define how the slave is
- ## prioritized. Please see DBD::Multi for more.
- [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
- [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
- [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
- ## add in a preexisting database handle
- [$dbh, '','', {priority=>30}],
- ## DBD::Multi will call this coderef for connects
- [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
- ## If the last item is hashref, we use that for DBD::Multi's
- ## configuration information. Again, see DBD::Multi for more.
- {timeout=>25, failed_max=>2},
- ],
- },
- );
-
- ## Now, just use the schema as normal
- $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
- $schema->resultset('Table')->create(\%info); ## Writes will use master
+=head2 lag_behind_master
-=head1 DESCRIPTION
+returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
+setting
-Warning: This class is marked ALPHA. We are using this in development and have
-some basic test coverage but the code hasn't yet been stressed by a variety
-of databases. Individual DB's may have quirks we are not aware of. Please
-use this in development and pass along your experiences/bug fixes.
+=cut
-This class implements replicated data store for DBI. Currently you can define
-one master and numerous slave database connections. All write-type queries
-(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
-database, all read-type queries (SELECTs) go to the slave database.
+sub lag_behind_master {
+ my $self = shift;
-For every slave database you can define a priority value, which controls data
-source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
-sources used (if they have the same priority, the are used randomized), than
-if all low priority data sources fail, higher ones tried in order.
+ return max map $_->lag_behind_master, values %{ $self->replicants };
+}
-=head1 CONFIGURATION
+=head2 is_replicating
-Please see L<DBD::Multi> for most configuration information.
+returns true if all replicants return true for
+L<DBIx::Class::Storage::DBI/is_replicating>
=cut
-sub new {
- my $proto = shift;
- my $class = ref( $proto ) || $proto;
- my $self = {};
+sub is_replicating {
+ my $self = shift;
- bless( $self, $class );
+ my @replicants = values %{ $self->replicants };
+ return (grep $_->is_replicating, @replicants) == @replicants;
+}
- $self->write_source( DBIx::Class::Storage::DBI->new );
- $self->read_source( DBIx::Class::Storage::DBI->new );
+=head2 connect_call_datetime_setup
- return $self;
-}
+calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
-sub all_sources {
- my $self = shift;
+=cut
- my @sources = ($self->read_source, $self->write_source);
+sub connect_call_datetime_setup {
+ my $self = shift;
+ $_->connect_call_datetime_setup for $self->all_storages;
+}
- return wantarray ? @sources : \@sources;
+sub _populate_dbh {
+ my $self = shift;
+ $_->_populate_dbh for $self->all_storages;
}
-sub _connect_info {
- my $self = shift;
- my $master = $self->write_source->_connect_info;
- $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
- return $master;
+sub _connect {
+ my $self = shift;
+ $_->_connect for $self->all_storages;
}
-sub connect_info {
- my ($self, $source_info) = @_;
+sub _rebless {
+ my $self = shift;
+ $_->_rebless for $self->all_storages;
+}
- ## if there is no $source_info, treat this sub like an accessor
- return $self->_connect_info
- if !$source_info;
-
- ## Alright, let's conect the master
- $self->write_source->connect_info($source_info);
-
- ## Now, build and then connect the Slaves
- my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};
- my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
- ? pop @slaves_connect_info : {};
+sub _determine_driver {
+ my $self = shift;
+ $_->_determine_driver for $self->all_storages;
+}
- ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
- $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
- unless defined $dbd_multi_config->{limit_dialect};
+sub _driver_determined {
+ my $self = shift;
- @slaves_connect_info = map {
- ## if the first element in the arrayhash is a ref, make that the value
- my $db = ref $_->[0] ? $_->[0] : $_;
- my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
- $priority => $db;
- } @slaves_connect_info;
-
- $self->read_source->connect_info([
- 'dbi:Multi:', undef, undef, {
- dsns => [@slaves_connect_info],
- %$dbd_multi_config,
- },
- ]);
-
- ## Return the formated connection information
- return $self->_connect_info;
-}
+ if (@_) {
+ $_->_driver_determined(@_) for $self->all_storages;
+ }
-sub select {
- shift->read_source->select( @_ );
-}
-sub select_single {
- shift->read_source->select_single( @_ );
+ return $self->master->_driver_determined;
}
-sub throw_exception {
- shift->read_source->throw_exception( @_ );
+
+sub _init {
+ my $self = shift;
+
+ $_->_init for $self->all_storages;
}
-sub sql_maker {
- shift->read_source->sql_maker( @_ );
+
+sub _run_connection_actions {
+ my $self = shift;
+
+ $_->_run_connection_actions for $self->all_storages;
}
-sub columns_info_for {
- shift->read_source->columns_info_for( @_ );
+
+sub _do_connection_actions {
+ my $self = shift;
+
+ if (@_) {
+ $_->_do_connection_actions(@_) for $self->all_storages;
+ }
}
-sub sqlt_type {
- shift->read_source->sqlt_type( @_ );
+
+sub connect_call_do_sql {
+ my $self = shift;
+ $_->connect_call_do_sql(@_) for $self->all_storages;
}
-sub create_ddl_dir {
- shift->read_source->create_ddl_dir( @_ );
+
+sub disconnect_call_do_sql {
+ my $self = shift;
+ $_->disconnect_call_do_sql(@_) for $self->all_storages;
}
-sub deployment_statements {
- shift->read_source->deployment_statements( @_ );
+
+sub _seems_connected {
+ my $self = shift;
+
+ return min map $_->_seems_connected, $self->all_storages;
}
-sub datetime_parser {
- shift->read_source->datetime_parser( @_ );
+
+sub _ping {
+ my $self = shift;
+
+ return min map $_->_ping, $self->all_storages;
}
-sub datetime_parser_type {
- shift->read_source->datetime_parser_type( @_ );
+
+# not using the normalized_version, because we want to preserve
+# version numbers much longer than the conventional xxx.yyyzzz
+my $numify_ver = sub {
+ my $ver = shift;
+ my @numparts = split /\D+/, $ver;
+ my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
+
+ return sprintf $format, @numparts;
+};
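The C<$numify_ver> closure above keeps version ordering by leaving the first part as-is and zero-padding every later part to six digits. A standalone sketch of the same transformation (example version strings are hypothetical, purely for illustration):

```perl
use strict;
use warnings;

# Same idea as the $numify_ver closure: split the version string on
# non-digits, keep the first part as-is, and zero-pad each remaining
# part to six digits so plain numeric comparison preserves ordering.
my $numify_ver = sub {
    my $ver = shift;
    my @numparts = split /\D+/, $ver;
    my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));

    return sprintf $format, @numparts;
};

print $numify_ver->('5.1.37'), "\n";   # 5.000001000037
print $numify_ver->('5.10.2'), "\n";   # 5.000010000002
```

Note that under numeric comparison C<5.10.2> now correctly sorts after C<5.1.37>, which is why the code avoids the conventional xxx.yyyzzz normalization.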
+sub _server_info {
+ my $self = shift;
+
+ if (not $self->_dbh_details->{info}) {
+ $self->_dbh_details->{info} = (
+ reduce { $a->[0] < $b->[0] ? $a : $b }
+ map [ $numify_ver->($_->{dbms_version}), $_ ],
+ map $_->_server_info, $self->all_storages
+ )->[1];
+ }
+
+ return $self->next::method;
}
-sub build_datetime_parser {
- shift->read_source->build_datetime_parser( @_ );
+
+sub _get_server_version {
+ my $self = shift;
+
+ return $self->_server_info->{dbms_version};
}
-sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
-sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
-sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
-sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
+=head1 GOTCHAS
-sub DESTROY {
- my $self = shift;
+Because replicants can lag behind the master, you must take care to force read
+queries to the master whenever you need realtime data integrity. For example,
+if you insert a row and then immediately re-read it from the database (say, by
+doing $row->discard_changes), or if you insert a row and then immediately
+build a query that expects that row in its results, you should force the
+master to handle the reads. Otherwise, due to the lag, there is no certainty
+your data will be in the expected state.
- undef $self->{write_source};
- undef $self->{read_sources};
-}
+For data integrity, all transactions automatically use the master storage for
+all read and write queries. Using a transaction is therefore the preferred
+method of forcing the master to handle all read queries.
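A minimal sketch of the transaction pattern, assuming a hypothetical C<User> result source (the source and column names are for illustration only):

```perl
## Inside txn_do every query, reads included, is routed to the master,
## so the re-read below cannot race a lagging replicant.
$schema->txn_do(sub {
    my $row = $schema->resultset('User')->create({ name => 'fred' });

    ## discard_changes re-reads the row from storage; inside the
    ## transaction this read goes to the master, so the freshly
    ## inserted row is guaranteed to be visible.
    $row->discard_changes;
});
```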
-sub last_insert_id {
- shift->write_source->last_insert_id( @_ );
-}
-sub insert {
- shift->write_source->insert( @_ );
-}
-sub update {
- shift->write_source->update( @_ );
-}
-sub update_all {
- shift->write_source->update_all( @_ );
-}
-sub delete {
- shift->write_source->delete( @_ );
-}
-sub delete_all {
- shift->write_source->delete_all( @_ );
-}
-sub create {
- shift->write_source->create( @_ );
-}
-sub find_or_create {
- shift->write_source->find_or_create( @_ );
-}
-sub update_or_create {
- shift->write_source->update_or_create( @_ );
-}
-sub connected {
- shift->write_source->connected( @_ );
-}
-sub ensure_connected {
- shift->write_source->ensure_connected( @_ );
-}
-sub dbh {
- shift->write_source->dbh( @_ );
-}
-sub txn_do {
- shift->write_source->txn_do( @_ );
-}
-sub txn_commit {
- shift->write_source->txn_commit( @_ );
-}
-sub txn_rollback {
- shift->write_source->txn_rollback( @_ );
-}
-sub sth {
- shift->write_source->sth( @_ );
-}
-sub deploy {
- shift->write_source->deploy( @_ );
-}
-sub _prep_for_execute {
- shift->write_source->_prep_for_execute(@_);
-}
+Otherwise, you can force a single query to use the master with the
+C<force_pool> attribute:
-sub debugobj {
- shift->write_source->debugobj(@_);
-}
-sub debug {
- shift->write_source->debug(@_);
-}
+ my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
-sub debugfh { shift->_not_supported( 'debugfh' ) };
-sub debugcb { shift->_not_supported( 'debugcb' ) };
+This attribute will be safely ignored by non-replicated storages, so you can
+use the same code for both types of systems.
-sub _not_supported {
- my( $self, $method ) = @_;
+Lastly, you can use the L</execute_reliably> method, which works very much like
+a transaction.
- die "This Storage does not support $method method.";
-}
+For debugging, you can turn replication on and off with the methods
+L</set_reliable_storage> and L</set_balanced_storage>; however, this operates
+at a global level and is not suitable if you have a shared Schema object being
+used by multiple processes, such as on a web application server. You can get
+around this limitation by using the Schema clone method.
-=head1 SEE ALSO
+ my $new_schema = $schema->clone;
+ $new_schema->set_reliable_storage;
-L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
+ ## $new_schema will use only the Master storage for all reads/writes while
+ ## the $schema object will use replicated storage.
=head1 AUTHOR
-Norbert Csongrádi <bert@cpan.org>
+ John Napiorkowski <john.napiorkowski@takkle.com>
-Peter Siklósi <einon@einon.hu>
+Based on code originated by:
-John Napiorkowski <john.napiorkowski@takkle.com>
+ Norbert Csongrádi <bert@cpan.org>
+ Peter Siklósi <einon@einon.hu>
=head1 LICENSE
=cut
+__PACKAGE__->meta->make_immutable;
+
1;