1 package DBIx::Class::Storage::DBI::Replicated;
4 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 #extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14 The following example shows how to change an existing $schema to a replicated
15 storage type, add some replicated (readonly) databases, and perform reporting
18 ## Change storage_type in your schema class
19 $schema->storage_type( '::DBI::Replicated' );
21 ## Add some slaves. Basically this is an array of arrayrefs, where each
22 ## arrayref is database connect information
24 $schema->storage->create_replicants(
25 [$dsn1, $user, $pass, \%opts],
26 [$dsn2, $user, $pass, \%opts],
27 [$dsn3, $user, $pass, \%opts],
28 ## This is just going to use the standard DBIC connect method, so it
29 ## supports everything that method supports, such as connecting to an
30 ## existing database handle.
35 ## a hash of replicants, keyed by their DSN
36 my %replicants = $schema->storage->replicants;
37 my $replicant = $schema->storage->get_replicant($dsn);
39 $replicant->is_active;
44 Warning: This class is marked ALPHA. We are using this in development and have
45 some basic test coverage but the code hasn't yet been stressed by a variety
46 of databases. Individual DBs may have quirks we are not aware of. Please
47 use this in development and pass along your experiences/bug fixes.
49 This class implements replicated data store for DBI. Currently you can define
50 one master and numerous slave database connections. All write-type queries
51 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
52 database, all read-type queries (SELECTs) go to the slave database.
54 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
55 handle gets delegated to one of the two attributes: L</master_storage> or to
56 L</current_replicant_storage>. Additionally, some methods need to be distributed
57 to all existing storages. This way our storage class is a drop in replacement
58 for L<DBIx::Class::Storage::DBI>.
60 Read traffic is spread across the replicants (slaves) according to a user
61 selected algorithm. The default algorithm is random weighted.
63 TODO more details about the algorithm.
67 This class defines the following attributes.
71 The master defines the canonical state for a pool of connected databases. All
72 the replicants are expected to match this database's state. Thus, in a classic
73 Master / Slaves distributed system, all the slaves are expected to replicate
74 the Master's state as quickly as possible. This is the only database in the
75 pool of databases that is allowed to handle write traffic.
81 isa=>'DBIx::Class::Storage::DBI',
110 =head2 current_replicant
112 Replicant storages (slaves) handle all read only traffic. The assumption is
113 that your database will become read bound well before it becomes write bound
114 and that being able to spread your read only traffic around to multiple
115 databases is going to help you to scale traffic.
117 This attribute returns the next slave to handle a read request. Your L</pool>
118 attribute has methods to help you shuffle through all the available replicants
119 via its balancer object.
121 This attribute defines the following reader/writer methods
125 =item get_current_replicant
127 Returns the contained L<DBIx::Class::Storage::DBI> replicant
129 =item set_current_replicant
131 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
135 We split the reader/writer to make it easier to selectively override how the
136 replicant is altered.
140 has 'current_replicant' => (
142 reader=>'get_current_replicant',
143 writer=>'set_current_replicant',
144 isa=>'DBIx::Class::Storage::DBI',
154 =head2 replicant_storage_pool_type
156 Contains the classname which will instantiate the L</replicant_storage_pool>
157 object. Defaults to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
161 has 'replicant_storage_pool_type' => (
165 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
167 'create_replicant_storage_pool' => 'new',
172 =head2 pool_balancer_type
174 The replication pool requires a balancer class to provide the methods for
175 choosing how to spread the query load across each replicant in the pool.
179 has 'pool_balancer_type' => (
183 default=>'DBIx::Class::Storage::DBI::Replicated::Pool::Balancer',
185 'create_replicant_storage_pool' => 'new',
190 =head2 replicant_storage_pool
192 Holds the list of connected replicants, their status and other housekeeping or
197 has 'replicant_storage_pool' => (
199 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
201 handles=>[qw/replicant_storages/],
208 This class defines the following methods.
212 Make sure we properly inherit from L<Moose>.
217 my $class = shift @_;
218 my $obj = $class->SUPER::new(@_);
220 return $class->meta->new_object(
221 __INSTANCE__ => $obj, @_
225 =head2 _build_master_storage
227 Lazy builder for the L</master_storage> attribute.
231 sub _build_next_replicant_storage {
232 DBIx::Class::Storage::DBI->new;
236 =head2 _build_current_replicant_storage
238 Lazy builder for the L</current_replicant_storage> attribute.
242 sub _build_current_replicant_storage {
243 shift->replicant_storage_pool->first;
247 =head2 _build_replicant_storage_pool
249 Lazy builder for the L</replicant_storage_pool> attribute.
253 sub _build_replicant_storage_pool {
255 $self->create_replicant_storage_pool;
259 =head2 around: create_replicant_storage_pool
261 Make sure all calls to the method set a default balancer type to our current
266 around 'create_replicant_storage_pool' => sub {
267 my ($method, $self, @args) = @_;
268 return $self->$method(balancer_type=>$self->pool_balancer_type, @args);
272 =head2 after: get_current_replicant_storage
274 Advice on the current_replicant_storage attribute. Each time we use a replicant
275 we need to change it via the storage pool algorithm. That way we are spreading
276 the load evenly (hopefully) across existing capacity.
280 after 'get_current_replicant_storage' => sub {
282 my $next_replicant = $self->replicant_storage_pool->next;
283 $self->next_replicant_storage($next_replicant);
287 =head2 find_or_create
289 First do a find on the replicant. If no rows are found, pass it on to the
300 Returns an array of all the connected storage backends. The first element
301 in the returned array is the master, and the remaining elements are each of the
310 $self->master_storage,
311 $self->replicant_storages,
318 Check that the master and at least one of the replicants is connected.
326 $self->master_storage->connected &&
327 $self->replicant_storage_pool->has_connected_slaves;
331 =head2 ensure_connected
333 Make sure all the storages are connected.
337 sub ensure_connected {
339 foreach $source (shift->all_sources) {
340 $source->ensure_connected(@_);
347 Set the limit_dialect for all existing storages
353 foreach $source (shift->all_sources) {
354 $source->name_sep(@_);
361 Set the quote_char for all existing storages
367 foreach $source (shift->all_sources) {
368 $source->name_sep(@_);
375 Set the name_sep for all existing storages
381 foreach $source (shift->all_sources) {
382 $source->name_sep(@_);
389 Set the schema object for all existing storages
395 foreach $source (shift->all_sources) {
396 $source->set_schema(@_);
403 set a debug flag across all storages
409 foreach $source (shift->all_sources) {
417 set a debug object across all storages
423 foreach $source (shift->all_sources) {
424 $source->debugobj(@_);
431 set a debugfh object across all storages
437 foreach $source (shift->all_sources) {
438 $source->debugfh(@_);
445 set a debug callback across all storages
451 foreach $source (shift->all_sources) {
452 $source->debugcb(@_);
459 disconnect everything
465 foreach $source (shift->all_sources) {
466 $source->disconnect(@_);
473 Make sure we pass destroy events down to the storage handlers
479 ## TODO, maybe we can just leave this alone ???
485 Norbert Csongrádi <bert@cpan.org>
487 Peter Siklósi <einon@einon.hu>
489 John Napiorkowski <john.napiorkowski@takkle.com>
493 You may distribute this code under the same terms as Perl itself.
504 use DBIx::Class::Storage::DBI;
507 use base qw/Class::Accessor::Fast/;
509 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
513 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
517 The following example shows how to change an existing $schema to a replicated
518 storage type and update its connection information to contain a master DSN and
521 ## Change storage_type in your schema class
522 $schema->storage_type( '::DBI::Replicated' );
524 ## Set your connection.
526 $dsn, $user, $password, {
528 ## Other standard DBI connection or DBD custom attributes added as
529 ## usual. Additionally, we have two custom attributes for defining
530 ## slave information and controlling how the underlying DBD::Multi
531 slaves_connect_info => [
532 ## Define each slave like a 'normal' DBI connection, but you add
533 ## in a DBD::Multi custom attribute to define how the slave is
534 ## prioritized. Please see DBD::Multi for more.
535 [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
536 [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
537 [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
538 ## add in a preexisting database handle
539 [$dbh, '','', {priority=>30}],
540 ## DBD::Multi will call this coderef for connects
541 [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
542 ## If the last item is hashref, we use that for DBD::Multi's
543 ## configuration information. Again, see DBD::Multi for more.
544 {timeout=>25, failed_max=>2},
549 ## Now, just use the schema as normal
550 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
551 $schema->resultset('Table')->create(\%info); ## Writes will use master
555 Warning: This class is marked ALPHA. We are using this in development and have
556 some basic test coverage but the code hasn't yet been stressed by a variety
557 of databases. Individual DBs may have quirks we are not aware of. Please
558 use this in development and pass along your experiences/bug fixes.
560 This class implements replicated data store for DBI. Currently you can define
561 one master and numerous slave database connections. All write-type queries
562 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
563 database, all read-type queries (SELECTs) go to the slave database.
565 For every slave database you can define a priority value, which controls data
566 source usage pattern. It uses L<DBD::Multi>, so the lower priority data
567 sources are used first (if they have the same priority, they are used in random order); then,
568 if all low priority data sources fail, the higher priority ones are tried in order.
572 Please see L<DBD::Multi> for most configuration information.
578 my $class = ref( $proto ) || $proto;
581 bless( $self, $class );
583 $self->write_source( DBIx::Class::Storage::DBI->new );
584 $self->read_source( DBIx::Class::Storage::DBI->new );
592 my @sources = ($self->read_source, $self->write_source);
594 return wantarray ? @sources : \@sources;
599 my $master = $self->write_source->_connect_info;
600 $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
605 my ($self, $source_info) = @_;
607 ## if there is no $source_info, treat this sub like an accessor
608 return $self->_connect_info
611 ## Alright, let's connect the master
612 $self->write_source->connect_info($source_info);
614 ## Now, build and then connect the Slaves
615 my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};
616 my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
617 ? pop @slaves_connect_info : {};
619 ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
620 $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
621 unless defined $dbd_multi_config->{limit_dialect};
623 @slaves_connect_info = map {
624 ## if the first element in the arrayref is a ref, make that the value
625 my $db = ref $_->[0] ? $_->[0] : $_;
626 my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
628 } @slaves_connect_info;
630 $self->read_source->connect_info([
631 'dbi:Multi:', undef, undef, {
632 dsns => [@slaves_connect_info],
637 ## Return the formatted connection information
638 return $self->_connect_info;
642 shift->read_source->select( @_ );
645 shift->read_source->select_single( @_ );
647 sub throw_exception {
648 shift->read_source->throw_exception( @_ );
651 shift->read_source->sql_maker( @_ );
653 sub columns_info_for {
654 shift->read_source->columns_info_for( @_ );
657 shift->read_source->sqlt_type( @_ );
660 shift->read_source->create_ddl_dir( @_ );
662 sub deployment_statements {
663 shift->read_source->deployment_statements( @_ );
665 sub datetime_parser {
666 shift->read_source->datetime_parser( @_ );
668 sub datetime_parser_type {
669 shift->read_source->datetime_parser_type( @_ );
671 sub build_datetime_parser {
672 shift->read_source->build_datetime_parser( @_ );
# Forward limit_dialect, with whatever arguments the caller supplied, to
# every storage returned by the invocant's all_sources method.
sub limit_dialect {
    my $self = shift;
    foreach my $storage ( $self->all_sources ) {
        $storage->limit_dialect(@_);
    }
}
# Forward quote_char, with whatever arguments the caller supplied, to
# every storage returned by the invocant's all_sources method.
sub quote_char {
    my $self = shift;
    foreach my $storage ( $self->all_sources ) {
        $storage->quote_char(@_);
    }
}
# Forward name_sep, with whatever arguments the caller supplied, to every
# storage returned by the invocant's all_sources method.
#
# The previous implementation called quote_char on each storage, so setting
# the name separator silently overwrote the quote character and never set
# the separator at all. Each storage's own name_sep is now invoked, matching
# the sibling limit_dialect / quote_char forwarders.
sub name_sep {
    my $self = shift;
    foreach my $storage ( $self->all_sources ) {
        $storage->name_sep(@_);
    }
}
# Forward disconnect, with whatever arguments the caller supplied, to
# every storage returned by the invocant's all_sources method.
sub disconnect {
    my $self = shift;
    foreach my $storage ( $self->all_sources ) {
        $storage->disconnect(@_);
    }
}
# Forward set_schema, with whatever arguments the caller supplied, to
# every storage returned by the invocant's all_sources method.
sub set_schema {
    my $self = shift;
    foreach my $storage ( $self->all_sources ) {
        $storage->set_schema(@_);
    }
}
684 undef $self->{write_source};
685 undef $self->{read_sources};
689 shift->write_source->last_insert_id( @_ );
692 shift->write_source->insert( @_ );
695 shift->write_source->update( @_ );
698 shift->write_source->update_all( @_ );
701 shift->write_source->delete( @_ );
704 shift->write_source->delete_all( @_ );
707 shift->write_source->create( @_ );
710 shift->write_source->find_or_create( @_ );
712 sub update_or_create {
713 shift->write_source->update_or_create( @_ );
716 shift->write_source->connected( @_ );
718 sub ensure_connected {
719 shift->write_source->ensure_connected( @_ );
722 shift->write_source->dbh( @_ );
725 shift->write_source->txn_do( @_ );
728 shift->write_source->txn_commit( @_ );
731 shift->write_source->txn_rollback( @_ );
734 shift->write_source->sth( @_ );
737 shift->write_source->deploy( @_ );
739 sub _prep_for_execute {
740 shift->write_source->_prep_for_execute(@_);
744 shift->write_source->debugobj(@_);
747 shift->write_source->debug(@_);
# debugfh is not available on this storage: hand off to the invocant's
# _not_supported method, passing only the method name 'debugfh'.
sub debugfh {
    my $self = shift;
    return $self->_not_supported('debugfh');
}
# debugcb is not available on this storage: hand off to the invocant's
# _not_supported method, passing only the method name 'debugcb'.
sub debugcb {
    my $self = shift;
    return $self->_not_supported('debugcb');
}
754 my( $self, $method ) = @_;
756 die "This Storage does not support $method method.";
761 L<DBIx::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
765 Norbert Csongrádi <bert@cpan.org>
767 Peter Siklósi <einon@einon.hu>
769 John Napiorkowski <john.napiorkowski@takkle.com>
773 You may distribute this code under the same terms as Perl itself.