1 package DBIx::Class::Storage::DBI::Replicated;
5 die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
17 use List::Util qw/min max reduce/;
21 use namespace::clean -except => 'meta';
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
29 The following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
33 You should set the C<storage_type> attribute to a replicated type. You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
37 my $schema = Schema::Class->clone;
38 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
39 $schema->connection(...);
41 Next, you need to add in the Replicants. Basically this is an array of
42 arrayrefs, where each arrayref is database connect information. Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
51 Now, just use the $schema as you normally would. All reads will automatically
52 be delegated to the replicants, while writes go to the master.
54 $schema->resultset('Source')->search({name=>'etc'});
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'. For example:
59 my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
61 Now $rs will force everything (both reads and writes) to use whatever was set up
62 as the master storage. 'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name. Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master. In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
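
As a sketch of the transaction approach (using the C<Source> resultset from the
examples above), wrapping a write and its follow-up read in C<txn_do> keeps both
statements on the master:

  $schema->txn_do(sub {
    ## Inside a transaction, both the INSERT and the SELECT are routed to
    ## the master, so the freshly inserted row is guaranteed to be visible.
    my $row = $schema->resultset('Source')->create({ name => 'etc' });
    $schema->resultset('Source')->find({ name => 'etc' });
  });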
71 If you have a multi-statement read-only transaction, you can force it to select
72 a random server in the pool by doing:
74 my $rs = $schema->resultset('Source')->search( undef,
75 { force_pool => $db->storage->read_handler->next_storage }
80 Warning: This class is marked BETA. It has been running a production
81 website using MySQL native replication as its backend, and we have some decent
82 test coverage, but the code hasn't yet been stressed by a variety of databases.
83 Individual DBs may have quirks we are not aware of. Please use this in
84 development first and pass along your experiences/bug fixes.
86 This class implements a replicated data store for DBI. Currently you can define
87 one master and numerous slave database connections. All write-type queries
88 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
89 database, while all read-type queries (SELECTs) go to a slave database.
91 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
92 handle gets delegated to one of the two attributes: L</read_handler> or to
93 L</write_handler>. Additionally, some methods need to be distributed
94 to all existing storages. This way our storage class is a drop-in replacement
95 for L<DBIx::Class::Storage::DBI>.
97 Read traffic is spread across the replicants (slaves) according to a
98 user-selected algorithm. The default algorithm is random weighted.
102 The consistency between master and replicants is database specific. The Pool
103 gives you a method to validate its replicants, removing and replacing them
104 when they fail/pass predefined criteria. Please make careful use of the ways
105 to force a query to run against Master when needed.
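
One way to make use of that validation (a sketch; the 5-second interval is
illustrative) is to have the balancer re-check the pool periodically via
C<auto_validate_every> when setting the storage type:

  $schema->storage_type([ '::DBI::Replicated', {
    balancer_type => '::Random',
    balancer_args => {
      ## re-validate the replicants every 5 seconds
      auto_validate_every => 5,
    },
  }]);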
109 Replicated Storage has additional requirements not currently part of
110 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
114 This class defines the following attributes.
118 The underlying L<DBIx::Class::Schema> object this storage is attached to.
131 Contains the classname which will instantiate the L</pool> object. Defaults
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
139 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
141 'create_pool' => 'new',
147 Contains a hashref of initialization information to pass to the Pool object.
148 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
162 The replication pool requires a balancer class to provide the methods for
163 choosing how to spread the query load across each replicant in the pool.
167 has 'balancer_type' => (
169 isa=>BalancerClassNamePart,
172 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
174 'create_balancer' => 'new',
180 Contains a hashref of initialization information to pass to the Balancer object.
181 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
185 has 'balancer_args' => (
195 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
196 container class for one or more replicated databases.
202 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
213 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
214 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
220 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
222 handles=>[qw/auto_validate_every/],
227 The master defines the canonical state for a pool of connected databases. All
228 the replicants are expected to match this database's state. Thus, in a classic
229 Master / Slaves distributed system, all the slaves are expected to replicate
230 the Master's state as quickly as possible. This is the only database in the
231 pool of databases that is allowed to handle write traffic.
241 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
243 The following attributes are delegated all the methods required for the
244 L<DBIx::Class::Storage::DBI> interface.
248 my $method_dispatch = {
260 deployment_statements
263 build_datetime_parser
279 with_deferred_fk_checks
287 relname_to_table_alias
289 _default_dbi_connect_attributes
291 _dbic_connect_attributes
297 bind_attribute_by_data_type
301 _dbh_execute_for_fetch
303 _dbh_execute_inserts_with_no_binds
304 _select_args_to_query
307 _normalize_connect_info
321 /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
326 _dbh_columns_info_for
329 unimplemented => [qw/
330 _arm_global_destructor
333 source_bind_attributes
335 get_use_dbms_capability
336 set_use_dbms_capability
357 # the capability framework
358 # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
360 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
361 ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
365 if (DBIx::Class::_ENV_::DBICTEST) {
368 for my $type (keys %$method_dispatch) {
369 for (@{$method_dispatch->{$type}}) {
370 push @{$seen->{$_}}, $type;
374 if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
376 'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
377 (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
382 if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
384 '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
391 for my $method (@{$method_dispatch->{unimplemented}}) {
392 __PACKAGE__->meta->add_method($method, sub {
394 $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
400 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
404 has 'read_handler' => (
408 handles=>$method_dispatch->{reader},
413 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
414 as well as methods that don't write or read that can be called on only one
415 storage, methods that return a C<$dbh>, and any methods that don't make sense to
420 has 'write_handler' => (
424 handles=>$method_dispatch->{writer},
429 has _master_connect_info_opts =>
430 (is => 'rw', isa => HashRef, default => sub { {} });
432 =head2 around: connect_info
434 Preserves master's C<connect_info> options (for merging with replicants).
435 Also sets any Replicated-related options from connect_info, such as
436 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
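
For example (a sketch; the DSN and credentials are placeholders, and
C<maximum_lag> is an attribute of the Pool class), these options can be supplied
directly inside the connect attributes hashref:

  $schema->connection($dsn, $user, $pass, {
    ## drop replicants lagging more than 2 seconds behind the master
    pool_args     => { maximum_lag => 2 },
    balancer_type => '::Random',
    balancer_args => { auto_validate_every => 5 },
  });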
440 around connect_info => sub {
441 my ($next, $self, $info, @extra) = @_;
443 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
446 for my $arg (@$info) {
447 next unless (reftype($arg)||'') eq 'HASH';
448 %opts = %{ $merge->merge($arg, \%opts) };
452 if (@opts{qw/pool_type pool_args/}) {
453 $self->pool_type(delete $opts{pool_type})
457 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
460 ## Since we possibly changed the pool_args, we need to clear the current
461 ## pool object so that next time it is used it will be rebuilt.
465 if (@opts{qw/balancer_type balancer_args/}) {
466 $self->balancer_type(delete $opts{balancer_type})
467 if $opts{balancer_type};
469 $self->balancer_args(
470 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
473 $self->balancer($self->_build_balancer)
477 $self->_master_connect_info_opts(\%opts);
481 @res = $self->$next($info, @extra);
483 $res[0] = $self->$next($info, @extra);
486 # Make sure master is blessed into the correct class and apply role to it.
487 my $master = $self->master;
488 $master->_determine_driver;
489 Moose::Meta::Class->initialize(ref $master);
491 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
493 # link pool back to master
494 $self->pool->master($master);
496 wantarray ? @res : $res[0];
501 This class defines the following methods.
505 When L<DBIx::Class::Schema> instantiates its storage, it passes itself as the
506 first argument, so we need to massage the arguments a bit so that all the
507 bits get put into the correct places.
512 my ($class, $schema, $storage_type_args, @args) = @_;
523 Lazy builder for the L</master> attribute.
529 my $master = DBIx::Class::Storage::DBI->new($self->schema);
535 Lazy builder for the L</pool> attribute.
541 $self->create_pool(%{$self->pool_args});
544 =head2 _build_balancer
546 Lazy builder for the L</balancer> attribute. This takes a Pool object so that
547 the balancer knows which pool it's balancing.
551 sub _build_balancer {
553 $self->create_balancer(
555 master=>$self->master,
556 %{$self->balancer_args},
560 =head2 _build_write_handler
562 Lazy builder for the L</write_handler> attribute. The default is to set this to
567 sub _build_write_handler {
568 return shift->master;
571 =head2 _build_read_handler
573 Lazy builder for the L</read_handler> attribute. The default is to set this to
578 sub _build_read_handler {
579 return shift->balancer;
582 =head2 around: connect_replicants
584 All calls to connect_replicants need to have an existing $schema tacked onto
585 the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any C<connect_info>
586 options merged with the master's, with replicant opts having higher priority.
590 around connect_replicants => sub {
591 my ($next, $self, @args) = @_;
594 $r = [ $r ] unless reftype $r eq 'ARRAY';
596 $self->throw_exception('coderef replicant connect_info not supported')
597 if ref $r->[0] && reftype $r->[0] eq 'CODE';
599 # any connect_info options?
601 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
604 $r->[$i] = {} unless $r->[$i];
606 # merge if two hashes
607 my @hashes = @$r[$i .. $#{$r}];
609 $self->throw_exception('invalid connect_info options')
610 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
612 $self->throw_exception('too many hashrefs in connect_info')
615 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
616 my %opts = %{ $merge->merge(reverse @hashes) };
619 splice @$r, $i+1, ($#{$r} - $i), ();
621 # make sure master/replicants opts don't clash
622 my %master_opts = %{ $self->_master_connect_info_opts };
623 if (exists $opts{dbh_maker}) {
624 delete @master_opts{qw/dsn user password/};
626 delete $master_opts{dbh_maker};
629 %opts = %{ $merge->merge(\%opts, \%master_opts) };
635 $self->$next($self->schema, @args);
640 Returns an array of all the connected storage backends. The first element
641 in the returned array is the master, and the rest are each of the
648 return grep {defined $_ && blessed $_} (
650 values %{ $self->replicants },
654 =head2 execute_reliably ($coderef, ?@args)
656 Given a coderef, saves the current state of the L</read_handler>, forces it to
657 use reliable storage (e.g. sets it to the master), executes the coderef and then
658 restores the original state.
664 $schema->resultset('User')->create({name=>$name});
665 my $user_rs = $schema->resultset('User')->find({name=>$name});
669 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
671 Use this when you must be certain of your database state, such as when you just
672 inserted something and need to get a resultset including it, etc.
676 sub execute_reliably {
677 my ($self, $coderef, @args) = @_;
679 unless( ref $coderef eq 'CODE') {
680 $self->throw_exception('Second argument must be a coderef');
683 ##Get copy of master storage
684 my $master = $self->master;
686 ##Get whatever the current read handler is
687 my $current = $self->read_handler;
689 ##Set the read handler to master
690 $self->read_handler($master);
692 ## do whatever the caller needs
694 my $want_array = wantarray;
698 @result = $coderef->(@args);
699 } elsif(defined $want_array) {
700 ($result[0]) = ($coderef->(@args));
705 $self->throw_exception("coderef returned an error: $_");
707 ##Reset to the original state
708 $self->read_handler($current);
711 return wantarray ? @result : $result[0];
714 =head2 set_reliable_storage
716 Sets the current $schema to be 'reliable', that is, all queries, both read and
717 write, are sent to the master.
721 sub set_reliable_storage {
723 my $schema = $self->schema;
724 my $write_handler = $self->schema->storage->write_handler;
726 $schema->storage->read_handler($write_handler);
729 =head2 set_balanced_storage
731 Sets the current $schema to use the L</balancer> for all reads, while all
732 writes are sent to the master only.
736 sub set_balanced_storage {
738 my $schema = $self->schema;
739 my $balanced_handler = $self->schema->storage->balancer;
741 $schema->storage->read_handler($balanced_handler);
746 Check that the master and at least one of the replicants are connected.
753 $self->master->connected &&
754 $self->pool->connected_replicants;
757 =head2 ensure_connected
759 Make sure all the storages are connected.
763 sub ensure_connected {
765 foreach my $source ($self->all_storages) {
766 $source->ensure_connected(@_);
772 Set the limit_dialect for all existing storages
778 foreach my $source ($self->all_storages) {
779 $source->limit_dialect(@_);
781 return $self->master->limit_dialect;
786 Set the quote_char for all existing storages
792 foreach my $source ($self->all_storages) {
793 $source->quote_char(@_);
795 return $self->master->quote_char;
800 Set the name_sep for all existing storages
806 foreach my $source ($self->all_storages) {
807 $source->name_sep(@_);
809 return $self->master->name_sep;
814 Set the schema object for all existing storages
820 foreach my $source ($self->all_storages) {
821 $source->set_schema(@_);
827 Set a debug flag across all storages
834 foreach my $source ($self->all_storages) {
838 return $self->master->debug;
849 return $self->master->debugobj(@_);
860 return $self->master->debugfh(@_);
871 return $self->master->debugcb(@_);
876 Disconnect everything
882 foreach my $source ($self->all_storages) {
883 $source->disconnect(@_);
889 Set the cursor class on all storages, or return the master's
894 my ($self, $cursor_class) = @_;
897 $_->cursor_class($cursor_class) for $self->all_storages;
899 $self->master->cursor_class;
904 Set the cursor class on all storages, or return the master's; an alias for L</cursor_class>
910 my ($self, $cursor_class) = @_;
913 $_->cursor($cursor_class) for $self->all_storages;
915 $self->master->cursor;
920 Sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages, or returns
921 the master's current setting
929 $_->unsafe(@_) for $self->all_storages;
932 return $self->master->unsafe;
935 =head2 disable_sth_caching
937 Sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages,
938 or returns the master's current setting
942 sub disable_sth_caching {
946 $_->disable_sth_caching(@_) for $self->all_storages;
949 return $self->master->disable_sth_caching;
952 =head2 lag_behind_master
954 Returns the highest replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
959 sub lag_behind_master {
962 return max map $_->lag_behind_master, $self->replicants;
965 =head2 is_replicating
967 Returns true if all replicants return true for
968 L<DBIx::Class::Storage::DBI/is_replicating>
975 return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
978 =head2 connect_call_datetime_setup
980 Calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
984 sub connect_call_datetime_setup {
986 $_->connect_call_datetime_setup for $self->all_storages;
991 $_->_populate_dbh for $self->all_storages;
996 $_->_connect for $self->all_storages;
1001 $_->_rebless for $self->all_storages;
1004 sub _determine_driver {
1006 $_->_determine_driver for $self->all_storages;
1009 sub _driver_determined {
1013 $_->_driver_determined(@_) for $self->all_storages;
1016 return $self->master->_driver_determined;
1022 $_->_init for $self->all_storages;
1025 sub _run_connection_actions {
1028 $_->_run_connection_actions for $self->all_storages;
1031 sub _do_connection_actions {
1035 $_->_do_connection_actions(@_) for $self->all_storages;
1039 sub connect_call_do_sql {
1041 $_->connect_call_do_sql(@_) for $self->all_storages;
1044 sub disconnect_call_do_sql {
1046 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1049 sub _seems_connected {
1052 return min map $_->_seems_connected, $self->all_storages;
1058 return min map $_->_ping, $self->all_storages;
1061 # not using the normalized_version, because we want to preserve
1062 # version numbers much longer than the conventional xxx.yyyzzz
1063 my $numify_ver = sub {
1065 my @numparts = split /\D+/, $ver;
1066 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1068 return sprintf $format, @numparts;
1073 if (not $self->_dbh_details->{info}) {
1074 $self->_dbh_details->{info} = (
1075 reduce { $a->[0] < $b->[0] ? $a : $b }
1076 map [ $numify_ver->($_->{dbms_version}), $_ ],
1077 map $_->_server_info, $self->all_storages
1081 return $self->next::method;
1084 sub _get_server_version {
1087 return $self->_server_info->{dbms_version};
1092 Because replicants can lag behind the master, you must take care to
1093 use one of the methods that force read queries to the master should
1094 you need realtime data integrity. For example, if you insert a row and then
1095 immediately re-read it from the database (say, by doing $row->discard_changes),
1096 or you insert a row and then immediately build a query that expects that row
1097 to be in the result, you should force the master to handle reads. Otherwise, due
1098 to the lag, there is no certainty your data will be in the expected state.
1100 For data integrity, all transactions automatically use the master storage for
1101 all read and write queries. Using a transaction is the preferred and recommended
1102 method to force the master to handle all read queries.
1104 Otherwise, you can force a single query to use the master with the 'force_pool'
1107 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1109 This attribute will safely be ignored by non-replicated storages, so you can use
1110 the same code for both types of systems.
1112 Lastly, you can use the L</execute_reliably> method, which works very much like
1115 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1116 and L</set_balanced_storage>; however, this operates at a global level and is not
1117 suitable if you have a shared Schema object being used by multiple processes,
1118 such as on a web application server. You can get around this limitation by
1119 using the Schema clone method.
1121 my $new_schema = $schema->clone;
1122 $new_schema->set_reliable_storage;
1124 ## $new_schema will use only the Master storage for all reads/writes while
1125 ## the $schema object will use replicated storage.
1129 John Napiorkowski <john.napiorkowski@takkle.com>
1131 Based on code originated by:
1133 Norbert Csongrádi <bert@cpan.org>
1134 Peter Siklósi <einon@einon.hu>
1138 You may distribute this code under the same terms as Perl itself.
1142 __PACKAGE__->meta->make_immutable;