package DBIx::Class::Storage::DBI::Replicated;

require DBIx::Class::Optional::Dependencies;
if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
  die "The following modules are required for Replicated storage support: $missing\n";
}

use Moose;
use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
use DBIx::Class::Storage::DBI::Replicated::Balancer;
use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
use MooseX::Types::Moose qw/ClassName HashRef Object/;
use Scalar::Util qw( reftype blessed );
use Hash::Merge;
use List::Util qw( min max );
use Context::Preserve 'preserve_context';
use DBIx::Class::_Util 'dbic_internal_try';

use namespace::clean -except => 'meta';
DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
tasks.

You should set the 'storage_type' attribute to a replicated type. You should
also define your arguments, such as which balancer you want and any arguments
that the Pool object should get.

  my $schema = Schema::Class->clone;
  $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
  $schema->connection(...);

Next, you need to add in the Replicants. Basically this is an array of
arrayrefs, where each arrayref is database connect information. Think of these
arguments as what you'd pass to the 'normal' $schema->connect method.

  $schema->storage->connect_replicants(
    [$dsn1, $user, $pass, \%opts],
    [$dsn2, $user, $pass, \%opts],
    [$dsn3, $user, $pass, \%opts],
  );

Now, just use the $schema as you normally would. All reads will automatically
be delegated to the replicants, while writes go to the master.

  $schema->resultset('Source')->search({name=>'etc'});

You can force a given query to use a particular storage using the search
attribute 'force_pool'. For example:

  my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});

Now $rs will force everything (both reads and writes) to use whatever was set
up as the master storage. 'master' is hardcoded to always point to the master,
but you can also use any replicant name. Please see
L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.

Also see transactions and L</execute_reliably> for alternative ways to
force read traffic to the master. In general, you should wrap your statements
in a transaction when you are reading and writing to the same tables at the
same time, since your replicants will often lag a bit behind the master.
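The transaction approach can be sketched as follows; the C<User> source name
is illustrative, not part of this module's API. Because transactions run
entirely against the master, the read inside the block cannot miss the freshly
inserted row:

```perl
my $found = $schema->txn_do(sub {
  # both statements below run on the master, so replication
  # lag cannot hide the newly created row from the find()
  my $user = $schema->resultset('User')->create({ name => 'fred' });
  return $schema->resultset('User')->find({ name => 'fred' });
});
```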
If you have a multi-statement read-only transaction you can force it to select
a random server in the pool by:

  my $rs = $schema->resultset('Source')->search( undef,
    { force_pool => $schema->storage->read_handler->next_storage } );
Warning: This class is marked BETA. It has been running a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this in
development first and pass along your experiences/bug fixes.

This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database, while all read-type queries (SELECTs) go to the slave databases.

Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
handle gets delegated to one of the two attributes: L</read_handler> or
L</write_handler>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop-in replacement
for L<DBIx::Class::Storage::DBI>.

Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.

The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. Please make careful use of the ways
to force a query to run against the master when needed.

Replicated Storage has additional requirements not currently part of
L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
This class defines the following attributes.

The underlying L<DBIx::Class::Schema> object this storage is attaching to.

Contains the classname which will instantiate the L</pool> object. Defaults
to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.

  default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
  'create_pool' => 'new',

Contains a hashref of initialization information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.

The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.

has 'balancer_type' => (
  isa=>BalancerClassNamePart,
  default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
  'create_balancer' => 'new',

Contains a hashref of initialization information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.

has 'balancer_args' => (

Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.

  isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',

Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)

  isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
  handles=>[qw/auto_validate_every/],

The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.

my $method_dispatch = {

    deployment_statements
    build_datetime_parser
    with_deferred_fk_checks
    relname_to_table_alias
    _default_dbi_connect_attributes
    _dbic_connect_attributes
    bind_attribute_by_data_type
    _dbh_execute_for_fetch
    _dbh_execute_inserts_with_no_binds
    _select_args_to_query
    _normalize_connect_info
  /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],

    _dbh_columns_info_for

  unimplemented => [qw/
    _arm_global_destructor
    get_use_dbms_capability
    set_use_dbms_capability
    _determine_connector_driver
    _extract_driver_from_connect_info
    _warn_undetermined_driver
    _perform_autoinc_retrieval
    _autoinc_supplied_for_op

    # the capability framework
    # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
    { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
    ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )

# this only happens during DBIC-internal testing
if ( $INC{"t/lib/ANFANG.pm"} ) {

  for my $type (keys %$method_dispatch) {
    for (@{$method_dispatch->{$type}}) {
      push @{$seen->{$_}}, $type;

  if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
      'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
      (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),

  if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
      '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',

for my $method (@{$method_dispatch->{unimplemented}}) {
  __PACKAGE__->meta->add_method($method, sub {
    $self->throw_exception(
      "$method() may not be called on '@{[ blessed $self ]}' objects, "
      . 'call it on a specific pool instance instead'

Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.

has 'read_handler' => (
  handles=>$method_dispatch->{reader},

Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
as well as methods that don't write or read that can be called on only one
storage, methods that return a C<$dbh>, and any methods that don't make sense to
run on a replicant.

has 'write_handler' => (
  handles=>$method_dispatch->{writer},

has _master_connect_info_opts =>
  (is => 'rw', isa => HashRef, default => sub { {} });
=head2 around: connect_info

Preserves the master's C<connect_info> options (for merging with replicants).
Also sets any Replicated-related options from connect_info, such as
C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
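For instance, the Replicated-specific options can be passed straight through
the normal connection call. This is a sketch: the DSN, credentials, and the
specific argument values are placeholders; see the Pool and Balancer
documentation for the supported arguments:

```perl
$schema->connection( $dsn, $user, $pass, {
  balancer_type => '::Random',
  balancer_args => { auto_validate_every => 5 },  # seconds between pool validations
  pool_args     => { maximum_lag => 1 },          # replicants lagging more are skipped
});
```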
around connect_info => sub {
  my ($next, $self, $info, @extra) = @_;

  $self->throw_exception(
    'connect_info can not be retrieved from a replicated storage - '
    . 'accessor must be called on a specific pool instance'
  ) unless defined $info;

  my $merge = Hash::Merge->new('LEFT_PRECEDENT');

  for my $arg (@$info) {
    next unless (reftype($arg)||'') eq 'HASH';
    %opts = %{ $merge->merge($arg, \%opts) };

  if (@opts{qw/pool_type pool_args/}) {
    $self->pool_type(delete $opts{pool_type})

      $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)

    ## Since we possibly changed the pool_args, we need to clear the current
    ## pool object so that next time it is used it will be rebuilt.

  if (@opts{qw/balancer_type balancer_args/}) {
    $self->balancer_type(delete $opts{balancer_type})
      if $opts{balancer_type};

    $self->balancer_args(
      $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)

    $self->balancer($self->_build_balancer)

  $self->_master_connect_info_opts(\%opts);

  return preserve_context {
    $self->$next($info, @extra);

    # Make sure master is blessed into the correct class and apply role to it.
    my $master = $self->master;
    $master->_determine_driver;
    Moose::Meta::Class->initialize(ref $master);

    DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);

    # link pool back to master
    $self->pool->master($master);
This class defines the following methods.

When instantiating its storage, L<DBIx::Class::Schema> passes itself as the
first argument. So we need to massage the arguments a bit so that all the
bits get put into the correct places.

  my ($class, $schema, $storage_type_args, @args) = @_;

Lazy builder for the L</master> attribute.

  my $master = DBIx::Class::Storage::DBI->new($self->schema);

Lazy builder for the L</pool> attribute.

  $self->create_pool(%{$self->pool_args});

=head2 _build_balancer

Lazy builder for the L</balancer> attribute. This takes a Pool object so that
the balancer knows which pool it's balancing.

sub _build_balancer {
  $self->create_balancer(
    master=>$self->master,
    %{$self->balancer_args},

=head2 _build_write_handler

Lazy builder for the L</write_handler> attribute. The default is to set this to
the L</master>.

sub _build_write_handler {
  return shift->master;
}

=head2 _build_read_handler

Lazy builder for the L</read_handler> attribute. The default is to set this to
the L</balancer>.

sub _build_read_handler {
  return shift->balancer;
}
=head2 around: connect_replicants

All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
L<connect_info|DBIx::Class::Storage::DBI/connect_info>
options merged with the master's, with replicant opts having higher priority.
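As a sketch (the DSNs and credentials are placeholders): if the master was
connected with a C<RaiseError> setting, each replicant inherits it unless its
own trailing hashref overrides it:

```perl
$schema->storage->connect_replicants(
  # inherits all of the master's connect_info options
  [ $dsn1, $user, $pass ],
  # same, but overrides RaiseError for this one replicant only
  [ $dsn2, $user, $pass, { RaiseError => 0 } ],
);
```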
around connect_replicants => sub {
  my ($next, $self, @args) = @_;

    $r = [ $r ] unless reftype $r eq 'ARRAY';

    $self->throw_exception('coderef replicant connect_info not supported')
      if ref $r->[0] && reftype $r->[0] eq 'CODE';

    # any connect_info options?
    $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';

    $r->[$i] = {} unless $r->[$i];

    # merge if two hashes
    my @hashes = @$r[$i .. $#{$r}];

    $self->throw_exception('invalid connect_info options')
      if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;

    $self->throw_exception('too many hashrefs in connect_info')

    my $merge = Hash::Merge->new('LEFT_PRECEDENT');
    my %opts = %{ $merge->merge(reverse @hashes) };

    splice @$r, $i+1, ($#{$r} - $i), ();

    # make sure master/replicants opts don't clash
    my %master_opts = %{ $self->_master_connect_info_opts };
    if (exists $opts{dbh_maker}) {
      delete @master_opts{qw/dsn user password/};
      delete $master_opts{dbh_maker};

    %opts = %{ $merge->merge(\%opts, \%master_opts) };

  $self->$next($self->schema, @args);
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the rest are each of the
replicants.

  return grep {defined $_ && blessed $_} (
    values %{ $self->replicants },
=head2 execute_reliably ($coderef, ?@args)

Given a coderef, saves the current state of the L</read_handler>, forces it to
use reliable storage (e.g. sets it to the master), executes the coderef and then
restores the original state.

    $schema->resultset('User')->create({name=>$name});
    my $user_rs = $schema->resultset('User')->find({name=>$name});

  my $user_rs = $schema->storage->execute_reliably($reliably, 'John');

Use this when you must be certain of your database state, such as when you just
inserted something and need to get a resultset that includes it.

sub execute_reliably {

  $self->throw_exception('Second argument must be a coderef')
    unless( ref $coderef eq 'CODE');

  ## replace the current read handler for the remainder of the scope
  local $self->{read_handler} = $self->master;
=head2 set_reliable_storage

Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.

sub set_reliable_storage {
  my $schema = $self->schema;
  my $write_handler = $self->schema->storage->write_handler;

  $schema->storage->read_handler($write_handler);
=head2 set_balanced_storage

Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.

sub set_balanced_storage {
  my $schema = $self->schema;
  my $balanced_handler = $self->schema->storage->balancer;

  $schema->storage->read_handler($balanced_handler);
Checks that the master and at least one of the replicants are connected.

    $self->master->connected &&
    $self->pool->connected_replicants;
=head2 ensure_connected

Make sure all the storages are connected.

sub ensure_connected {
  foreach my $source ($self->all_storages) {
    $source->ensure_connected(@_);
Set the limit_dialect for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);

  return $self->master->limit_dialect;

Set the quote_char for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->quote_char(@_);

  return $self->master->quote_char;

Set the name_sep for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->name_sep(@_);

  return $self->master->name_sep;

Set the schema object for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->set_schema(@_);

Set a debug flag across all storages.

  foreach my $source ($self->all_storages) {

  return $self->master->debug;

  return $self->master->debugobj(@_);

  return $self->master->debugfh(@_);

  return $self->master->debugcb(@_);

Disconnect everything.

  foreach my $source ($self->all_storages) {
    $source->disconnect(@_);

Set the cursor class on all storages, or return the master's.

  my ($self, $cursor_class) = @_;

  $_->cursor_class($cursor_class) for $self->all_storages;

  $self->master->cursor_class;

Set the cursor class on all storages, or return the master's; an alias for
L</cursor_class>.

  my ($self, $cursor_class) = @_;

  $_->cursor($cursor_class) for $self->all_storages;

  $self->master->cursor;

Sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages, or returns
the master's current setting.

  $_->unsafe(@_) for $self->all_storages;

  return $self->master->unsafe;

=head2 disable_sth_caching

Sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all
storages, or returns the master's current setting.

sub disable_sth_caching {

  $_->disable_sth_caching(@_) for $self->all_storages;

  return $self->master->disable_sth_caching;

=head2 lag_behind_master

Returns the highest replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
value.

sub lag_behind_master {

  return max map $_->lag_behind_master, $self->replicants;
=head2 is_replicating

Returns true if all replicants return true for
L<DBIx::Class::Storage::DBI/is_replicating>.

  return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
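Since L</is_replicating> and L</lag_behind_master> aggregate over the whole
pool, they can drive a simple health check before dispatching read-heavy work.
This is a sketch: the five-second threshold and the C<run_reports> helper are
illustrative, not part of this module:

```perl
if ( $schema->storage->is_replicating
  && $schema->storage->lag_behind_master < 5 ) {
  # every replicant is replicating and the worst lag is acceptable,
  # so balanced (replicant) reads are safe for this workload
  run_reports($schema);
}
else {
  # degraded pool: route the reads to the master for correctness
  $schema->storage->execute_reliably( sub { run_reports($schema) } );
}
```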
=head2 connect_call_datetime_setup

Calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages.

sub connect_call_datetime_setup {

  $_->connect_call_datetime_setup for $self->all_storages;
  $_->_populate_dbh for $self->all_storages;

  $_->_connect for $self->all_storages;

  $_->_rebless for $self->all_storages;

sub _determine_driver {
  $_->_determine_driver for $self->all_storages;

sub _driver_determined {
  $_->_driver_determined(@_) for $self->all_storages;

  return $self->master->_driver_determined;

  $_->_init for $self->all_storages;

sub _run_connection_actions {
  $_->_run_connection_actions for $self->all_storages;

sub _do_connection_actions {
  $_->_do_connection_actions(@_) for $self->all_storages;

sub connect_call_do_sql {
  $_->connect_call_do_sql(@_) for $self->all_storages;

sub disconnect_call_do_sql {
  $_->disconnect_call_do_sql(@_) for $self->all_storages;

sub _seems_connected {
  return min map $_->_seems_connected, $self->all_storages;

  return min map $_->_ping, $self->all_storages;
Due to the fact that replicants can lag behind a master, you must take care to
make sure you use one of the methods to force read queries to a master should
you need realtime data integrity. For example, if you insert a row and then
immediately re-read it from the database (say, by doing
L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>),
or you insert a row and then immediately build a query that expects that row
to be an item, you should force the master to handle reads. Otherwise, due to
the lag, there is no certainty your data will be in the expected state.

For data integrity, all transactions automatically use the master storage for
all read and write queries. Using a transaction is the preferred and recommended
method to force the master to handle all read queries.

Otherwise, you can force a single query to use the master with the 'force_pool'
attribute:

  my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);

This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.

Lastly, you can use the L</execute_reliably> method, which works very much like
a transaction.

For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
and L</set_balanced_storage>, however this operates at a global level and is not
suitable if you have a shared Schema object being used by multiple processes,
such as on a web application server. You can get around this limitation by
using the Schema clone method.

  my $new_schema = $schema->clone;
  $new_schema->set_reliable_storage;

  ## $new_schema will use only the Master storage for all reads/writes while
  ## the $schema object will use replicated storage.
=head1 FURTHER QUESTIONS?

Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.

=head1 COPYRIGHT AND LICENSE

This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
redistribute it and/or modify it under the same terms as the
L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.

__PACKAGE__->meta->make_immutable;