1 package DBIx::Class::Storage::DBI::Replicated;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
21 ## Change storage_type in your schema class
22 $schema->storage_type( '::DBI::Replicated' );
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
27 $schema->storage->create_replicants(
28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
31 ## This is just going to use the standard DBIC connect method, so it
32 ## supports everything that method supports, such as connecting to an
33 ## existing database handle.
38 ## a hash of replicants, keyed by their DSN
39 my %replicants = $schema->storage->replicants;
40 my $replicant = $schema->storage->get_replicant($dsn);
42 $replicant->is_active;
47 Warning: This class is marked ALPHA. We are using this in development and have
48 some basic test coverage but the code hasn't yet been stressed by a variety
49 of databases. Individual DB's may have quirks we are not aware of. Please
50 use this in development and pass along your experiences/bug fixes.
52 This class implements replicated data store for DBI. Currently you can define
53 one master and numerous slave database connections. All write-type queries
54 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
55 database, all read-type queries (SELECTs) go to the slave database.
57 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
58 handle gets delegated to one of the two attributes: L</master_storage> or to
59 L</current_replicant_storage>. Additionally, some methods need to be distributed
60 to all existing storages. This way our storage class is a drop in replacement
61 for L<DBIx::Class::Storage::DBI>.
63 Read traffic is spread across the replicants (slaves) occuring to a user
64 selected algorithm. The default algorithm is random weighted.
66 TODO more details about the algorithm.
70 This class defines the following attributes.
74 The master defines the canonical state for a pool of connected databases. All
75 the replicants are expected to match this databases state. Thus, in a classic
76 Master / Slaves distributed system, all the slaves are expected to replicate
77 the Master's state as quick as possible. This is the only database in the
78 pool of databases that is allowed to handle write traffic.
84 isa=>'DBIx::Class::Storage::DBI',
112 =head2 current_replicant
114 Replicant storages (slaves) handle all read only traffic. The assumption is
115 that your database will become readbound well before it becomes write bound
116 and that being able to spread your read only traffic around to multiple
117 databases is going to help you to scale traffic.
119 This attribute returns the next slave to handle a read request. Your L</pool>
120 attribute has methods to help you shuffle through all the available replicants
121 via it's balancer object.
123 This attribute defines the following reader/writer methods
127 =item get_current_replicant
129 Returns the contained L<DBIx::Class::Storage::DBI> replicant
131 =item set_current_replicant
133 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
137 We split the reader/writer to make it easier to selectively override how the
138 replicant is altered.
142 has 'current_replicant' => (
144 reader=>'get_current_replicant',
145 writer=>'set_current_replicant',
146 isa=>'DBIx::Class::Storage::DBI',
158 Contains the classname which will instantiate the L</pool> object. Defaults
159 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
168 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
170 'create_pool' => 'new',
177 The replication pool requires a balance class to provider the methods for
178 choose how to spread the query load across each replicant in the pool.
182 has 'balancer_type' => (
187 default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
189 'create_balancer' => 'new',
196 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
197 container class for one or more replicated databases.
203 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
217 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
218 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
224 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226 handles=>[qw/next_storage/],
231 This class defines the following methods.
235 Lazy builder for the L</master> attribute.
240 DBIx::Class::Storage::DBI->new;
244 =head2 _build_current_replicant
246 Lazy builder for the L</current_replicant_storage> attribute.
250 sub _build_current_replicant {
252 $self->next_storage($self->pool);
258 Lazy builder for the L</pool> attribute.
268 =head2 _build_balancer
270 Lazy builder for the L</balancer> attribute.
274 sub _build_balancer {
276 $self->create_balancer;
280 =head2 after: get_current_replicant_storage
282 Advice on the current_replicant_storage attribute. Each time we use a replicant
283 we need to change it via the storage pool algorithm. That way we are spreading
284 the load evenly (hopefully) across existing capacity.
288 after 'get_current_replicant' => sub {
290 my $next_replicant = $self->next_storage($self->pool);
292 $self->set_current_replicant($next_replicant);
298 Returns an array of of all the connected storage backends. The first element
299 in the returned array is the master, and the remainings are each of the
307 return grep {defined $_ && blessed $_} (
316 Check that the master and at least one of the replicants is connected.
324 $self->master->connected &&
325 $self->pool->connected_replicants;
329 =head2 ensure_connected
331 Make sure all the storages are connected.
335 sub ensure_connected {
337 foreach my $source ($self->all_storages) {
338 $source->ensure_connected(@_);
345 Set the limit_dialect for all existing storages
351 foreach my $source ($self->all_storages) {
352 $source->limit_dialect(@_);
359 Set the quote_char for all existing storages
365 foreach my $source ($self->all_storages) {
366 $source->quote_char(@_);
373 Set the name_sep for all existing storages
379 foreach my $source ($self->all_storages) {
380 $source->name_sep(@_);
387 Set the schema object for all existing storages
393 foreach my $source ($self->all_storages) {
394 $source->set_schema(@_);
401 set a debug flag across all storages
407 foreach my $source ($self->all_storages) {
415 set a debug object across all storages
421 foreach my $source ($self->all_storages) {
422 $source->debugobj(@_);
429 set a debugfh object across all storages
435 foreach my $source ($self->all_storages) {
436 $source->debugfh(@_);
443 set a debug callback across all storages
449 foreach my $source ($self->all_storages) {
450 $source->debugcb(@_);
457 disconnect everything
463 foreach my $source ($self->all_storages) {
464 $source->disconnect(@_);
471 Make sure we pass destroy events down to the storage handlers
477 ## TODO, maybe we can just leave this alone ???
483 Norbert Csongrádi <bert@cpan.org>
485 Peter Siklósi <einon@einon.hu>
487 John Napiorkowski <john.napiorkowski@takkle.com>
491 You may distribute this code under the same terms as Perl itself.
502 use DBIx::Class::Storage::DBI;
505 use base qw/Class::Accessor::Fast/;
507 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
511 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
515 The Following example shows how to change an existing $schema to a replicated
516 storage type and update it's connection information to contain a master DSN and
519 ## Change storage_type in your schema class
520 $schema->storage_type( '::DBI::Replicated' );
522 ## Set your connection.
524 $dsn, $user, $password, {
526 ## Other standard DBI connection or DBD custom attributes added as
527 ## usual. Additionally, we have two custom attributes for defining
528 ## slave information and controlling how the underlying DBD::Multi
529 slaves_connect_info => [
530 ## Define each slave like a 'normal' DBI connection, but you add
531 ## in a DBD::Multi custom attribute to define how the slave is
532 ## prioritized. Please see DBD::Multi for more.
533 [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
534 [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
535 [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
536 ## add in a preexisting database handle
537 [$dbh, '','', {priority=>30}],
538 ## DBD::Multi will call this coderef for connects
539 [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
540 ## If the last item is hashref, we use that for DBD::Multi's
541 ## configuration information. Again, see DBD::Multi for more.
542 {timeout=>25, failed_max=>2},
547 ## Now, just use the schema as normal
548 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
549 $schema->resultset('Table')->create(\%info); ## Writes will use master
553 Warning: This class is marked ALPHA. We are using this in development and have
554 some basic test coverage but the code hasn't yet been stressed by a variety
555 of databases. Individual DB's may have quirks we are not aware of. Please
556 use this in development and pass along your experiences/bug fixes.
558 This class implements replicated data store for DBI. Currently you can define
559 one master and numerous slave database connections. All write-type queries
560 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
561 database, all read-type queries (SELECTs) go to the slave database.
563 For every slave database you can define a priority value, which controls data
564 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
565 sources used (if they have the same priority, the are used randomized), than
566 if all low priority data sources fail, higher ones tried in order.
570 Please see L<DBD::Multi> for most configuration information.
576 my $class = ref( $proto ) || $proto;
579 bless( $self, $class );
581 $self->write_source( DBIx::Class::Storage::DBI->new );
582 $self->read_source( DBIx::Class::Storage::DBI->new );
590 my @sources = ($self->read_source, $self->write_source);
592 return wantarray ? @sources : \@sources;
597 my $master = $self->write_source->_connect_info;
598 $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
603 my ($self, $source_info) = @_;
605 ## if there is no $source_info, treat this sub like an accessor
606 return $self->_connect_info
609 ## Alright, let's conect the master
610 $self->write_source->connect_info($source_info);
612 ## Now, build and then connect the Slaves
613 my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};
614 my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
615 ? pop @slaves_connect_info : {};
617 ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
618 $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
619 unless defined $dbd_multi_config->{limit_dialect};
621 @slaves_connect_info = map {
622 ## if the first element in the arrayhash is a ref, make that the value
623 my $db = ref $_->[0] ? $_->[0] : $_;
624 my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
626 } @slaves_connect_info;
628 $self->read_source->connect_info([
629 'dbi:Multi:', undef, undef, {
630 dsns => [@slaves_connect_info],
635 ## Return the formated connection information
636 return $self->_connect_info;
640 shift->read_source->select( @_ );
643 shift->read_source->select_single( @_ );
645 sub throw_exception {
646 shift->read_source->throw_exception( @_ );
649 shift->read_source->sql_maker( @_ );
651 sub columns_info_for {
652 shift->read_source->columns_info_for( @_ );
655 shift->read_source->sqlt_type( @_ );
658 shift->read_source->create_ddl_dir( @_ );
660 sub deployment_statements {
661 shift->read_source->deployment_statements( @_ );
663 sub datetime_parser {
664 shift->read_source->datetime_parser( @_ );
666 sub datetime_parser_type {
667 shift->read_source->datetime_parser_type( @_ );
669 sub build_datetime_parser {
670 shift->read_source->build_datetime_parser( @_ );
673 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
674 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
675 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
676 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
677 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
682 undef $self->{write_source};
683 undef $self->{read_sources};
687 shift->write_source->last_insert_id( @_ );
690 shift->write_source->insert( @_ );
693 shift->write_source->update( @_ );
696 shift->write_source->update_all( @_ );
699 shift->write_source->delete( @_ );
702 shift->write_source->delete_all( @_ );
705 shift->write_source->create( @_ );
708 shift->write_source->find_or_create( @_ );
710 sub update_or_create {
711 shift->write_source->update_or_create( @_ );
714 shift->write_source->connected( @_ );
716 sub ensure_connected {
717 shift->write_source->ensure_connected( @_ );
720 shift->write_source->dbh( @_ );
723 shift->write_source->txn_do( @_ );
726 shift->write_source->txn_commit( @_ );
729 shift->write_source->txn_rollback( @_ );
732 shift->write_source->sth( @_ );
735 shift->write_source->deploy( @_ );
737 sub _prep_for_execute {
738 shift->write_source->_prep_for_execute(@_);
742 shift->write_source->debugobj(@_);
745 shift->write_source->debug(@_);
748 sub debugfh { shift->_not_supported( 'debugfh' ) };
749 sub debugcb { shift->_not_supported( 'debugcb' ) };
752 my( $self, $method ) = @_;
754 die "This Storage does not support $method method.";
759 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
763 Norbert Csongrádi <bert@cpan.org>
765 Peter Siklósi <einon@einon.hu>
767 John Napiorkowski <john.napiorkowski@takkle.com>
771 You may distribute this code under the same terms as Perl itself.