package DBIx::Class::Storage::DBI::Replicated;

require DBIx::Class::Optional::Dependencies;
if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
  die "The following modules are required for Replicated storage support: $missing\n";
}

use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
use DBIx::Class::Storage::DBI::Replicated::Balancer;
use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
use MooseX::Types::Moose qw/ClassName HashRef Object/;
use Scalar::Util qw/blessed reftype/;
use List::Util qw/min max reduce/;
use Context::Preserve 'preserve_context';
use DBIx::Class::_Util 'dbic_internal_try';
use namespace::clean -except => 'meta';
DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
tasks.

You should set the 'storage_type' attribute to a replicated type. You should
also define your arguments, such as which balancer you want and any arguments
that the Pool object should get.

  my $schema = Schema::Class->clone;
  $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
  $schema->connection(...);
Next, you need to add in the Replicants. Basically this is an array of
arrayrefs, where each arrayref is database connect information. Think of these
arguments as what you'd pass to the 'normal' $schema->connect method.

  $schema->storage->connect_replicants(
    [$dsn1, $user, $pass, \%opts],
    [$dsn2, $user, $pass, \%opts],
    [$dsn3, $user, $pass, \%opts],
  );
Now, just use the $schema as you normally would. Automatically all reads will
be delegated to the replicants, while writes go to the master.

  $schema->resultset('Source')->search({name=>'etc'});

You can force a given query to use a particular storage using the search
attribute 'force_pool'. For example:

  my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});

Now $rs will force everything (both reads and writes) to use whatever was set
up as the master storage. 'master' is hardcoded to always point to the Master,
but you can also use any Replicant name. Please see
L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
Also see transactions and L</execute_reliably> for alternative ways to
force read traffic to the master. In general, you should wrap your statements
in a transaction when you are reading and writing to the same tables at the
same time, since your replicants will often lag a bit behind the master.
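For example, assuming the C<$schema> set up above, a read-modify-write
sequence can be wrapped in a transaction so that both the read and the write
run against the master (the resultset and column names are just illustrative):

  ## Inside the transaction the find() cannot be bitten by replication
  ## lag, since reads are routed to the master along with the write.
  $schema->txn_do(sub {
    my $row = $schema->resultset('Source')->find({ name => 'etc' });
    $row->update({ name => 'etc (renamed)' });
  });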
If you have a multi-statement read-only transaction you can force it to select
a random server in the pool by:

  my $rs = $schema->resultset('Source')->search( undef,
    { force_pool => $db->storage->read_handler->next_storage }
  );
Warning: This class is marked BETA. This has been running a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this in
development first and pass along your experiences/bug fixes.
This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database, all read-type queries (SELECTs) go to the slave databases.

Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
handle gets delegated to one of the two attributes: L</read_handler> or
L</write_handler>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop-in replacement
for L<DBIx::Class::Storage::DBI>.

Read traffic is spread across the replicants (slaves) according to a user
selected algorithm. The default algorithm is random weighted.
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. Please make careful use of the ways
to force a query to run against Master when needed.
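Replicant validation can be switched on when declaring the storage, by giving
the Balancer an C<auto_validate_every> interval (in seconds). The arguments
shown here are a sketch; see the Pool and Balancer documentation for the
authoritative list:

  $schema->storage_type([ '::DBI::Replicated', {
    balancer_type => '::Random',
    balancer_args => {
      ## re-check replicant health at most every 5 seconds
      auto_validate_every => 5,
    },
  }]);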
Replicated Storage has additional requirements not currently part of
L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
This class defines the following attributes.
The underlying L<DBIx::Class::Schema> object this storage is attached to.
Contains the classname which will instantiate the L</pool> object. Defaults
to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.

  default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
    'create_pool' => 'new',
Contains a hashref of initialization arguments to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
has 'balancer_type' => (
  isa=>BalancerClassNamePart,
  default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
    'create_balancer' => 'new',
Contains a hashref of initialization arguments to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
has 'balancer_args' => (
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
  isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
  isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
  handles=>[qw/auto_validate_every/],
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
my $method_dispatch = {
    deployment_statements
    build_datetime_parser
    with_deferred_fk_checks
    relname_to_table_alias
    _default_dbi_connect_attributes
    _dbic_connect_attributes
    bind_attribute_by_data_type
    _dbh_execute_for_fetch
    _dbh_execute_inserts_with_no_binds
    _select_args_to_query
    _normalize_connect_info
  /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
    _dbh_columns_info_for
  unimplemented => [qw/
    _arm_global_destructor
    get_use_dbms_capability
    set_use_dbms_capability
    _determine_connector_driver
    _extract_driver_from_connect_info
    _warn_undetermined_driver
    _perform_autoinc_retrieval
    _autoinc_supplied_for_op

    # the capability framework
    # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
    { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
    ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
# this only happens during DBIC-internal testing
if ( $INC{"t/lib/ANFANG.pm"} ) {

  my $seen;
  for my $type (keys %$method_dispatch) {
    for (@{$method_dispatch->{$type}}) {
      push @{$seen->{$_}}, $type;
    }
  }

  if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
    'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
    (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
  }

  if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
    '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
  }
}
for my $method (@{$method_dispatch->{unimplemented}}) {
  __PACKAGE__->meta->add_method($method, sub {
    my $self = shift;
    $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
  });
}
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
has 'read_handler' => (
  handles=>$method_dispatch->{reader},
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
as well as methods that don't write or read that can be called on only one
storage, methods that return a C<$dbh>, and any methods that don't make sense to
run on a replicant.
has 'write_handler' => (
  handles=>$method_dispatch->{writer},

has _master_connect_info_opts =>
  (is => 'rw', isa => HashRef, default => sub { {} });
=head2 around: connect_info

Preserves master's C<connect_info> options (for merging with replicants).
Also sets any Replicated-related options from connect_info, such as
C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
around connect_info => sub {
  my ($next, $self, $info, @extra) = @_;

  $self->throw_exception(
    'connect_info can not be retrieved from a replicated storage - '
  . 'accessor must be called on a specific pool instance'
  ) unless defined $info;

  my $merge = Hash::Merge->new('LEFT_PRECEDENT');

  my %opts;
  for my $arg (@$info) {
    next unless (reftype($arg)||'') eq 'HASH';
    %opts = %{ $merge->merge($arg, \%opts) };
  }

  if (@opts{qw/pool_type pool_args/}) {
    $self->pool_type(delete $opts{pool_type})
      if $opts{pool_type};

    $self->pool_args(
      $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
    );

    ## Since we possibly changed the pool_args, we need to clear the current
    ## pool object so that next time it is used it will be rebuilt.
  }

  if (@opts{qw/balancer_type balancer_args/}) {
    $self->balancer_type(delete $opts{balancer_type})
      if $opts{balancer_type};

    $self->balancer_args(
      $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
    );

    $self->balancer($self->_build_balancer);
  }

  $self->_master_connect_info_opts(\%opts);

  return preserve_context {
    $self->$next($info, @extra);
  } after => sub {
    # Make sure master is blessed into the correct class and apply role to it.
    my $master = $self->master;
    $master->_determine_driver;
    Moose::Meta::Class->initialize(ref $master);

    DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);

    # link pool back to master
    $self->pool->master($master);
  };
};
This class defines the following methods.
When L<DBIx::Class::Schema> instantiates its storage, it passes itself as the
first argument. So we need to massage the arguments a bit so that all the
bits get put into the correct places.

  my ($class, $schema, $storage_type_args, @args) = @_;
Lazy builder for the L</master> attribute.

  my $master = DBIx::Class::Storage::DBI->new($self->schema);

Lazy builder for the L</pool> attribute.

  $self->create_pool(%{$self->pool_args});
=head2 _build_balancer

Lazy builder for the L</balancer> attribute. This takes a Pool object so that
the balancer knows which pool it's balancing.

sub _build_balancer {
  my $self = shift;
  $self->create_balancer(
    master=>$self->master,
    %{$self->balancer_args},
  );
}
=head2 _build_write_handler

Lazy builder for the L</write_handler> attribute. The default is to set this to
the L</master>.

sub _build_write_handler {
  return shift->master;
}

=head2 _build_read_handler

Lazy builder for the L</read_handler> attribute. The default is to set this to
the L</balancer>.

sub _build_read_handler {
  return shift->balancer;
}
=head2 around: connect_replicants

All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
L<connect_info|DBIx::Class::Storage::DBI/connect_info>
options merged with the master, with replicant opts having higher priority.
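For example, the trailing hashref of a replicant's connect_info is merged with
the options the master was connected with, and on a clash the replicant's own
value wins. The handle attributes below are illustrative:

  $schema->storage->connect_replicants(
    ## this replicant keeps PrintError => 0 even if the master was
    ## connected with PrintError => 1
    [ $dsn1, $user, $pass, { AutoCommit => 1, PrintError => 0 } ],
  );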
around connect_replicants => sub {
  my ($next, $self, @args) = @_;

  for my $r (@args) {
    $r = [ $r ] unless reftype $r eq 'ARRAY';

    $self->throw_exception('coderef replicant connect_info not supported')
      if ref $r->[0] && reftype $r->[0] eq 'CODE';

    # any connect_info options?
    my $i = 0;
    $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';

    $r->[$i] = {} unless $r->[$i];

    # merge if two hashes
    my @hashes = @$r[$i .. $#{$r}];

    $self->throw_exception('invalid connect_info options')
      if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;

    $self->throw_exception('too many hashrefs in connect_info')
      if @hashes > 2;

    my $merge = Hash::Merge->new('LEFT_PRECEDENT');
    my %opts = %{ $merge->merge(reverse @hashes) };

    splice @$r, $i+1, ($#{$r} - $i), ();

    # make sure master/replicants opts don't clash
    my %master_opts = %{ $self->_master_connect_info_opts };
    if (exists $opts{dbh_maker}) {
      delete @master_opts{qw/dsn user password/};
    }
    delete $master_opts{dbh_maker};

    %opts = %{ $merge->merge(\%opts, \%master_opts) };
  }

  $self->$next($self->schema, @args);
};
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the rest are each of the
replicants.

  return grep {defined $_ && blessed $_} (
    $self->master,
    values %{ $self->replicants },
  );
=head2 execute_reliably ($coderef, ?@args)

Given a coderef, saves the current state of the L</read_handler>, forces it to
use reliable storage (e.g. sets it to the master), executes the coderef and then
restores the original state.

Example:

  my $reliably = sub {
    my $name = shift;
    $schema->resultset('User')->create({name=>$name});
    my $user_rs = $schema->resultset('User')->find({name=>$name});
    return $user_rs;
  };

  my $user_rs = $schema->storage->execute_reliably($reliably, 'John');

Use this when you must be certain of your database state, such as when you just
inserted something and need to get a resultset including it, etc.
sub execute_reliably {
  my $self = shift;
  my $coderef = shift;
  unless( ref $coderef eq 'CODE') {
    $self->throw_exception('Second argument must be a coderef');
  }
  ## replace the current read handler for the remainder of the scope
  local $self->{read_handler} = $self->master;
  return dbic_internal_try {
    $coderef->(@_)
  } catch {
    $self->throw_exception("coderef returned an error: $_");
  };
}
=head2 set_reliable_storage

Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.

sub set_reliable_storage {
  my $self = shift;
  my $schema = $self->schema;
  my $write_handler = $self->schema->storage->write_handler;

  $schema->storage->read_handler($write_handler);
}
=head2 set_balanced_storage

Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.

sub set_balanced_storage {
  my $self = shift;
  my $schema = $self->schema;
  my $balanced_handler = $self->schema->storage->balancer;

  $schema->storage->read_handler($balanced_handler);
}
Check that the master and at least one of the replicants are connected.

  $self->master->connected &&
  $self->pool->connected_replicants;
=head2 ensure_connected

Make sure all the storages are connected.

sub ensure_connected {
  my $self = shift;
  foreach my $source ($self->all_storages) {
    $source->ensure_connected(@_);
  }
}
Set the limit_dialect for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);
  }
  return $self->master->limit_dialect;

Set the quote_char for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->quote_char(@_);
  }
  return $self->master->quote_char;

Set the name_sep for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->name_sep(@_);
  }
  return $self->master->name_sep;

Set the schema object for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->set_schema(@_);
  }

Set a debug flag across all storages.

  foreach my $source ($self->all_storages) {
    $source->debug(@_);
  }
  return $self->master->debug;
  return $self->master->debugobj(@_);

  return $self->master->debugfh(@_);

  return $self->master->debugcb(@_);
Disconnect everything.

  foreach my $source ($self->all_storages) {
    $source->disconnect(@_);
  }
Set the cursor class on all storages, or return the master's.

  my ($self, $cursor_class) = @_;
  if ($cursor_class) {
    $_->cursor_class($cursor_class) for $self->all_storages;
  }
  $self->master->cursor_class;

Set the cursor class on all storages, or return the master's; an alias for
L</cursor_class>.

  my ($self, $cursor_class) = @_;
  if ($cursor_class) {
    $_->cursor($cursor_class) for $self->all_storages;
  }
  $self->master->cursor;
Sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
the master's current setting.

  $_->unsafe(@_) for $self->all_storages;

  return $self->master->unsafe;

=head2 disable_sth_caching

Sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
or returns the master's current setting.

sub disable_sth_caching {
  my $self = shift;
  if (@_) {
    $_->disable_sth_caching(@_) for $self->all_storages;
  }
  return $self->master->disable_sth_caching;
}
=head2 lag_behind_master

Returns the highest replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
value.

sub lag_behind_master {
  my $self = shift;
  return max map $_->lag_behind_master, $self->replicants;
}
=head2 is_replicating

Returns true if all replicants return true for
L<DBIx::Class::Storage::DBI/is_replicating>.

  return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
=head2 connect_call_datetime_setup

Calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages.

sub connect_call_datetime_setup {
  $_->connect_call_datetime_setup for $self->all_storages;
}

  $_->_populate_dbh for $self->all_storages;

  $_->_connect for $self->all_storages;

  $_->_rebless for $self->all_storages;
sub _determine_driver {
  $_->_determine_driver for $self->all_storages;
}

sub _driver_determined {
  my $self = shift;
  $_->_driver_determined(@_) for $self->all_storages;
  return $self->master->_driver_determined;
}

  $_->_init for $self->all_storages;

sub _run_connection_actions {
  $_->_run_connection_actions for $self->all_storages;
}

sub _do_connection_actions {
  $_->_do_connection_actions(@_) for $self->all_storages;
}

sub connect_call_do_sql {
  $_->connect_call_do_sql(@_) for $self->all_storages;
}

sub disconnect_call_do_sql {
  $_->disconnect_call_do_sql(@_) for $self->all_storages;
}

sub _seems_connected {
  return min map $_->_seems_connected, $self->all_storages;
}

  return min map $_->_ping, $self->all_storages;
# not using the normalized_version, because we want to preserve
# version numbers much longer than the conventional xxx.yyyzzz
my $numify_ver = sub {
  my $ver = shift;
  my @numparts = split /\D+/, $ver;
  my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));

  return sprintf $format, @numparts;
};
  if (not $self->_dbh_details->{info}) {
    $self->_dbh_details->{info} = (
      reduce { $a->[0] < $b->[0] ? $a : $b }
      map [ $numify_ver->($_->{dbms_version}), $_ ],
      map $_->_server_info, $self->all_storages
    )->[1];
  }

  return $self->next::method;
sub _get_server_version {
  my $self = shift;
  return $self->_server_info->{dbms_version};
}
Due to the fact that replicants can lag behind a master, you must take care to
make sure you use one of the methods to force read queries to a master should
you need realtime data integrity. For example, if you insert a row and then
immediately re-read it from the database (say, by doing
L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>),
or you insert a row and then immediately build a query that expects that row
to be among the results, you should force the master to handle reads.
Otherwise, due to the lag, there is no certainty your data will be in the
expected state.
For data integrity, all transactions automatically use the master storage for
all read and write queries. Using a transaction is the preferred and recommended
method to force the master to handle all read queries.

Otherwise, you can force a single query to use the master with the 'force_pool'
attribute:

  my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.

Lastly, you can use the L</execute_reliably> method, which works very much like
a transaction.
For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
and L</set_balanced_storage>, however this operates at a global level and is not
suitable if you have a shared Schema object being used by multiple processes,
such as on a web application server. You can get around this limitation by
using the Schema clone method.

  my $new_schema = $schema->clone;
  $new_schema->set_reliable_storage;

  ## $new_schema will use only the Master storage for all reads/writes while
  ## the $schema object will use replicated storage.
=head1 FURTHER QUESTIONS?

Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.

=head1 COPYRIGHT AND LICENSE

This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
redistribute it and/or modify it under the same terms as the
L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
__PACKAGE__->meta->make_immutable;