package DBIx::Class::Storage::DBI::Replicated;

require DBIx::Class::Optional::Dependencies;
if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
  die "The following modules are required for Replicated storage support: $missing\n";
}

use Moose;
use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
use DBIx::Class::Storage::DBI::Replicated::Balancer;
use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
use MooseX::Types::Moose qw/ClassName HashRef Object/;
use Scalar::Util qw/reftype blessed/;
use Hash::Merge;
use List::Util qw/min max reduce/;
use Context::Preserve 'preserve_context';

use DBIx::Class::_Util 'dbic_internal_try';

use namespace::clean -except => 'meta';
DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
tasks.

You should set the 'storage_type' attribute to a replicated type. You should
also define your arguments, such as which balancer you want and any arguments
that the Pool object should get.
  my $schema = Schema::Class->clone;
  $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
  $schema->connection(...);
Next, you need to add in the Replicants. Basically this is an array of
arrayrefs, where each arrayref is database connect information. Think of these
arguments as what you'd pass to the 'normal' $schema->connect method.
  $schema->storage->connect_replicants(
    [$dsn1, $user, $pass, \%opts],
    [$dsn2, $user, $pass, \%opts],
    [$dsn3, $user, $pass, \%opts],
  );
Now, just use the $schema as you normally would. All reads will automatically
be delegated to the replicants, while writes go to the master.

  $schema->resultset('Source')->search({name=>'etc'});
You can force a given query to use a particular storage using the search
attribute 'force_pool'. For example:

  my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
Now $rs will force everything (both reads and writes) to use whatever was set
up as the master storage. 'master' is hardcoded to always point to the Master,
but you can also use any Replicant name. Please see
L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
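You can also pin a query to one particular replicant by passing that
replicant's name as the C<force_pool> value. A sketch (the Pool keys
replicants by their DSN; the DSN below is purely illustrative):

  my $rs = $schema->resultset('Source')->search( undef,
    { force_pool => 'dbi:mysql:database=replicant1' } );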
Also see transactions and L</execute_reliably> for alternative ways to
force read traffic to the master. In general, you should wrap your statements
in a transaction when you are reading and writing to the same tables at the
same time, since your replicants will often lag a bit behind the master.
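For example, a read-after-write wrapped in a transaction (a sketch reusing the
$schema from the synopsis; inside C<txn_do> both the insert and the subsequent
find run against the master, so the freshly inserted row is visible even if
the replicants have not caught up yet):

  my $row = $schema->txn_do(sub {
    $schema->resultset('Source')->create({ name => 'etc' });

    ## still inside the transaction, so this read also goes to the master
    return $schema->resultset('Source')->find({ name => 'etc' });
  });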
If you have a multi-statement read-only transaction you can force it to select
a random server in the pool by:

  my $rs = $schema->resultset('Source')->search( undef,
    { force_pool => $schema->storage->read_handler->next_storage }
  );
Warning: This class is marked BETA. It has been running on a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please try this in
development first and pass along your experiences/bug fixes.
This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database, while all read-type queries (SELECTs) go to the slave databases.
Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
handle gets delegated to one of the two attributes: L</read_handler> or
L</write_handler>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop-in replacement
for L<DBIx::Class::Storage::DBI>.
Read traffic is spread across the replicants (slaves) according to a user
selected algorithm. The default algorithm is random weighted.
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. Please make careful use of the ways
to force a query to run against the master when needed.
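For example, a sketch of turning on periodic replicant validation when
declaring the storage type (C<auto_validate_every> is in seconds; the value
here is an arbitrary illustration):

  $schema->storage_type([ '::DBI::Replicated', {
    balancer_type => '::Random',
    balancer_args => {
      auto_validate_every => 500,  ## revalidate the pool every 500 seconds
    },
  }]);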
Replicated Storage has additional requirements not currently part of
L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.

This class defines the following attributes.
The underlying L<DBIx::Class::Schema> object this storage is attached to.
Contains the classname which will instantiate the L</pool> object. Defaults
to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.

  default => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    'create_pool' => 'new',
Contains a hashref of initialization arguments to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
has 'balancer_type' => (
  isa => BalancerClassNamePart,
  default => 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
    'create_balancer' => 'new',
Contains a hashref of initialization arguments to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.

has 'balancer_args' => (
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.

  isa => 'DBIx::Class::Storage::DBI::Replicated::Pool',
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)

  isa => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  handles => [qw/auto_validate_every/],
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
my $method_dispatch = {
    deployment_statements
    build_datetime_parser
    with_deferred_fk_checks
    relname_to_table_alias
    _default_dbi_connect_attributes
    _dbic_connect_attributes
    bind_attribute_by_data_type
    _dbh_execute_for_fetch
    _dbh_execute_inserts_with_no_binds
    _select_args_to_query
    _normalize_connect_info
  /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
    _dbh_columns_info_for
  unimplemented => [qw/
    _arm_global_destructor
    get_use_dbms_capability
    set_use_dbms_capability
    _determine_connector_driver
    _extract_driver_from_connect_info
    _warn_undetermined_driver
    _perform_autoinc_retrieval
    _autoinc_supplied_for_op
  # the capability framework
  # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
    { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
    ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
# this only happens during DBIC-internal testing
if ( $INC{"t/lib/ANFANG.pm"} ) {

  my $seen;
  for my $type (keys %$method_dispatch) {
    for (@{$method_dispatch->{$type}}) {
      push @{$seen->{$_}}, $type;
    }
  }

  if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
    die join "\n", (
      'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
      (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
    );
  }

  if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
    die join "\n", (
      '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
      @cant,
    );
  }
}
for my $method (@{$method_dispatch->{unimplemented}}) {
  __PACKAGE__->meta->add_method($method, sub {
    my $self = shift;
    $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
  });
}
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.

has 'read_handler' => (
  handles => $method_dispatch->{reader},
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
as well as methods that don't write or read that can be called on only one
storage, methods that return a C<$dbh>, and any methods that don't make sense to
run on a replicant.

has 'write_handler' => (
  handles => $method_dispatch->{writer},
has _master_connect_info_opts =>
  (is => 'rw', isa => HashRef, default => sub { {} });

=head2 around: connect_info
Preserves the master's C<connect_info> options (for merging with replicants).
Also sets any Replicated-related options from connect_info, such as
C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
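For example, these options can be passed inline with the rest of the
connection attributes (a sketch; the C<master_read_weight> argument is
specific to the ::Random balancer and is shown purely as an illustration):

  $schema->connection( $dsn, $user, $pass, {
    balancer_type => '::Random',
    balancer_args => { master_read_weight => 1 },
  });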
around connect_info => sub {
  my ($next, $self, $info, @extra) = @_;

  $self->throw_exception(
    'connect_info can not be retrieved from a replicated storage - '
  . 'accessor must be called on a specific pool instance'
  ) unless defined $info;

  my $merge = Hash::Merge->new('LEFT_PRECEDENT');

  my %opts;
  for my $arg (@$info) {
    next unless (reftype($arg)||'') eq 'HASH';
    %opts = %{ $merge->merge($arg, \%opts) };
  }
  if (@opts{qw/pool_type pool_args/}) {
    $self->pool_type(delete $opts{pool_type})
      if $opts{pool_type};

    $self->pool_args(
      $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
    );

    ## Since we possibly changed the pool_args, we need to clear the current
    ## pool object so that next time it is used it will be rebuilt.
    $self->clear_pool;
  }

  if (@opts{qw/balancer_type balancer_args/}) {
    $self->balancer_type(delete $opts{balancer_type})
      if $opts{balancer_type};

    $self->balancer_args(
      $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
    );

    $self->balancer($self->_build_balancer)
      if $self->balancer;
  }
  $self->_master_connect_info_opts(\%opts);

  return preserve_context {
    $self->$next($info, @extra);
  } after => sub {
    # Make sure master is blessed into the correct class and apply role to it.
    my $master = $self->master;
    $master->_determine_driver;
    Moose::Meta::Class->initialize(ref $master);

    DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);

    # link pool back to master
    $self->pool->master($master);
  };
};
This class defines the following methods.

When instantiating its storage, L<DBIx::Class::Schema> passes itself as the
first argument. So we need to massage the arguments a bit so that all the
bits get put into the correct places.

  my ($class, $schema, $storage_type_args, @args) = @_;
Lazy builder for the L</master> attribute.

  my $master = DBIx::Class::Storage::DBI->new($self->schema);

Lazy builder for the L</pool> attribute.

  $self->create_pool(%{$self->pool_args});
=head2 _build_balancer

Lazy builder for the L</balancer> attribute. This takes a Pool object so that
the balancer knows which pool it's balancing.

sub _build_balancer {
  my $self = shift;
  $self->create_balancer(
    pool => $self->pool,
    master => $self->master,
    %{$self->balancer_args},
  );
}
=head2 _build_write_handler

Lazy builder for the L</write_handler> attribute. The default is to set this to
the L</master>.

sub _build_write_handler {
  return shift->master;
}

=head2 _build_read_handler

Lazy builder for the L</read_handler> attribute. The default is to set this to
the L</balancer>.

sub _build_read_handler {
  return shift->balancer;
}
=head2 around: connect_replicants

All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
L<connect_info|DBIx::Class::Storage::DBI/connect_info>
options merged with the master's, with replicant opts having higher priority.
around connect_replicants => sub {
  my ($next, $self, @args) = @_;

  for my $r (@args) {
    $r = [ $r ] unless (reftype($r)||'') eq 'ARRAY';

    $self->throw_exception('coderef replicant connect_info not supported')
      if ref $r->[0] && reftype $r->[0] eq 'CODE';

    # any connect_info options?
    my $i = 0;
    $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';

    # make one if none
    $r->[$i] = {} unless $r->[$i];
    # merge if two hashes
    my @hashes = @$r[$i .. $#{$r}];

    $self->throw_exception('invalid connect_info options')
      if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;

    $self->throw_exception('too many hashrefs in connect_info')
      if @hashes > 2;

    my $merge = Hash::Merge->new('LEFT_PRECEDENT');
    my %opts = %{ $merge->merge(reverse @hashes) };

    # drop the merged hashes from @$r
    splice @$r, $i+1, ($#{$r} - $i), ();
    # make sure master/replicants opts don't clash
    my %master_opts = %{ $self->_master_connect_info_opts };
    if (exists $opts{dbh_maker}) {
      delete @master_opts{qw/dsn user password/};
    }
    delete $master_opts{dbh_maker};

    %opts = %{ $merge->merge(\%opts, \%master_opts) };
  }

  $self->$next($self->schema, @args);
};
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the rest are each of the
replicants.

  return grep { defined $_ && blessed $_ } (
    $self->master,
    values %{ $self->replicants },
  );
=head2 execute_reliably ($coderef, ?@args)

Given a coderef, saves the current state of the L</read_handler>, forces it to
use reliable storage (e.g. sets it to the master), executes the coderef and then
restores the original state.

  my $reliably = sub {
    my $name = shift;
    $schema->resultset('User')->create({name=>$name});
    my $user_rs = $schema->resultset('User')->find({name=>$name});
    return $user_rs;
  };

  my $user_rs = $schema->storage->execute_reliably($reliably, 'John');

Use this when you must be certain of your database state, such as when you just
inserted something and need to get a resultset including it, etc.
sub execute_reliably {
  my ($self, $coderef, @args) = @_;

  $self->throw_exception('Second argument must be a coderef')
    unless( ref $coderef eq 'CODE');

  ## replace the current read handler for the remainder of the scope
  local $self->{read_handler} = $self->master;
=head2 set_reliable_storage

Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.

sub set_reliable_storage {
  my $self = shift;
  my $schema = $self->schema;
  my $write_handler = $self->schema->storage->write_handler;

  $schema->storage->read_handler($write_handler);
}
=head2 set_balanced_storage

Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.

sub set_balanced_storage {
  my $self = shift;
  my $schema = $self->schema;
  my $balanced_handler = $self->schema->storage->balancer;

  $schema->storage->read_handler($balanced_handler);
}
Check that the master and at least one of the replicants is connected.

    $self->master->connected &&
    $self->pool->connected_replicants;
=head2 ensure_connected

Make sure all the storages are connected.

sub ensure_connected {
  my $self = shift;
  foreach my $source ($self->all_storages) {
    $source->ensure_connected(@_);
  }
}
Set the limit_dialect for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);
  }
  return $self->master->limit_dialect;

Set the quote_char for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->quote_char(@_);
  }
  return $self->master->quote_char;

Set the name_sep for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->name_sep(@_);
  }
  return $self->master->name_sep;
Set the schema object for all existing storages.

  foreach my $source ($self->all_storages) {
    $source->set_schema(@_);
  }

Set the debug flag across all storages.

  foreach my $source ($self->all_storages) {
    $source->debug(@_);
  }
  return $self->master->debug;
  return $self->master->debugobj(@_);

  return $self->master->debugfh(@_);

  return $self->master->debugcb(@_);
Disconnect everything.

  foreach my $source ($self->all_storages) {
    $source->disconnect(@_);
  }
Set the cursor class on all storages, or return the master's.

  my ($self, $cursor_class) = @_;

  if ($cursor_class) {
    $_->cursor_class($cursor_class) for $self->all_storages;
  }
  $self->master->cursor_class;

Set the cursor class on all storages, or return the master's; alias for L</cursor_class>.

  my ($self, $cursor_class) = @_;

  if ($cursor_class) {
    $_->cursor($cursor_class) for $self->all_storages;
  }
  $self->master->cursor;
Sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
the master's current setting.

  if (@_) {
    $_->unsafe(@_) for $self->all_storages;
  }

  return $self->master->unsafe;
=head2 disable_sth_caching

Sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
or returns the master's current setting.

sub disable_sth_caching {
  my $self = shift;

  if (@_) {
    $_->disable_sth_caching(@_) for $self->all_storages;
  }

  return $self->master->disable_sth_caching;
}
=head2 lag_behind_master

Returns the highest replicant L<DBIx::Class::Storage::DBI/lag_behind_master>.

sub lag_behind_master {
  my $self = shift;

  return max map $_->lag_behind_master, $self->replicants;
}
=head2 is_replicating

Returns true if all replicants return true for
L<DBIx::Class::Storage::DBI/is_replicating>.

  return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
=head2 connect_call_datetime_setup

Calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages.

sub connect_call_datetime_setup {
  my $self = shift;
  $_->connect_call_datetime_setup for $self->all_storages;
}

  $_->_populate_dbh for $self->all_storages;

  $_->_connect for $self->all_storages;

  $_->_rebless for $self->all_storages;
sub _determine_driver {
  my $self = shift;
  $_->_determine_driver for $self->all_storages;
}

sub _driver_determined {
  my $self = shift;

  if (@_) {
    $_->_driver_determined(@_) for $self->all_storages;
  }

  return $self->master->_driver_determined;
}

  $_->_init for $self->all_storages;

sub _run_connection_actions {
  my $self = shift;
  $_->_run_connection_actions for $self->all_storages;
}

sub _do_connection_actions {
  my $self = shift;
  $_->_do_connection_actions(@_) for $self->all_storages;
}
sub connect_call_do_sql {
  my $self = shift;
  $_->connect_call_do_sql(@_) for $self->all_storages;
}

sub disconnect_call_do_sql {
  my $self = shift;
  $_->disconnect_call_do_sql(@_) for $self->all_storages;
}

sub _seems_connected {
  my $self = shift;

  return min map $_->_seems_connected, $self->all_storages;
}

  return min map $_->_ping, $self->all_storages;
# not using the normalized_version, because we want to preserve
# version numbers much longer than the conventional xxx.yyyzzz
my $numify_ver = sub {
  my $ver = shift;
  my @numparts = split /\D+/, $ver;
  my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));

  return sprintf $format, @numparts;
};
  if (not $self->_dbh_details->{info}) {
    $self->_dbh_details->{info} = (
      reduce { $a->[0] < $b->[0] ? $a : $b }
      map [ $numify_ver->($_->{dbms_version}), $_ ],
      map $_->_server_info, $self->all_storages
    )->[1];
  }

  return $self->next::method;
}

sub _get_server_version {
  my $self = shift;

  return $self->_server_info->{dbms_version};
}
Due to the fact that replicants can lag behind a master, you must take care to
use one of the methods that force read queries to the master should you need
realtime data integrity. For example, if you insert a row and then immediately
re-read it from the database (say, by doing
L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>),
or you insert a row and then immediately build a query that expects that row
to be an item, you should force the master to handle reads. Otherwise, due to
the lag, there is no certainty your data will be in the expected state.
For data integrity, all transactions automatically use the master storage for
all read and write queries. Using a transaction is the preferred and recommended
method to force the master to handle all read queries.

Otherwise, you can force a single query to use the master with the 'force_pool'
attribute:

  my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.

Lastly, you can use the L</execute_reliably> method, which works very much like
For debugging, you can turn replication on/off with the methods
L</set_reliable_storage> and L</set_balanced_storage>. However, this operates at
a global level and is not suitable if you have a shared Schema object being used
by multiple processes, such as on a web application server. You can get around
this limitation by using the Schema clone method.

  my $new_schema = $schema->clone;
  $new_schema->storage->set_reliable_storage;

  ## $new_schema will use only the Master storage for all reads/writes while
  ## the $schema object will use replicated storage.
=head1 FURTHER QUESTIONS?

Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.

=head1 COPYRIGHT AND LICENSE

This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
redistribute it and/or modify it under the same terms as the
L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
__PACKAGE__->meta->make_immutable;

1;