Merge 'trunk' into 'replication_dedux'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 19tasks.
2bf79155 20
21 ## Change storage_type in your schema class
22 $schema->storage_type( '::DBI::Replicated' );
23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
955a6df6 27 $schema->storage->connect_replicants(
2bf79155 28 [$dsn1, $user, $pass, \%opts],
29 [$dsn2, $user, $pass, \%opts],
30 [$dsn3, $user, $pass, \%opts],
2bf79155 31 );
2bf79155 32
33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46handle gets delegated to one of the two attributes: L</master> or to
47L</current_replicant>. Additionally, some methods need to be distributed
48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
51Read traffic is spread across the replicants (slaves) according to a user
52selected algorithm. The default algorithm is random weighted.
53
54TODO more details about the algorithm.
55
56=head1 ATTRIBUTES
57
58This class defines the following attributes.
59
60=head2 master
61
62The master defines the canonical state for a pool of connected databases. All
63the replicants are expected to match this database's state. Thus, in a classic
64Master / Slaves distributed system, all the slaves are expected to replicate
65the Master's state as quickly as possible. This is the only database in the
66pool of databases that is allowed to handle write traffic.
67
68=cut
69
## Write-side storage.  The accessor is read-only and the value is built on
## first use by _build_master.  Every method named in 'handles' is forwarded
## to this object, so writes, transactions, deployment and the connection /
## SQL-maker plumbing are all served by the master.
has master => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI',
  lazy_build => 1,
  handles    => [qw(
    on_connect_do
    on_disconnect_do
    connect_info
    throw_exception
    sql_maker
    sqlt_type
    create_ddl_dir
    deployment_statements
    datetime_parser
    datetime_parser_type
    last_insert_id
    insert
    insert_bulk
    update
    delete
    dbh
    txn_do
    txn_commit
    txn_rollback
    sth
    deploy
    schema
  )],
);
99
100
101=head2 current_replicant
102
103Replicant storages (slaves) handle all read only traffic. The assumption is
104that your database will become read bound well before it becomes write bound
105and that being able to spread your read only traffic around to multiple
106databases is going to help you to scale traffic.
107
108This attribute returns the next slave to handle a read request. Your L</pool>
109attribute has methods to help you shuffle through all the available replicants
110via its balancer object.
111
2bf79155 112We split the reader/writer to make it easier to selectively override how the
113replicant is altered.
114
115=cut
116
## Read-side storage currently in rotation.  Read/write so that the 'after'
## modifiers in this class can swap in a different replicant; built on first
## use by _build_current_replicant.  The three read methods in 'handles' are
## forwarded to whichever storage is held here.
has current_replicant => (
  is         => 'rw',
  isa        => 'DBIx::Class::Storage::DBI',
  lazy_build => 1,
  handles    => [qw(
    select
    select_single
    columns_info_for
  )],
);
127
128
26ab719a 129=head2 pool_type
2bf79155 130
26ab719a 131Contains the classname which will instantiate the L</pool> object. Defaults
132to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 133
134=cut
135
## Name of the class used to build the 'pool' attribute.  The delegation maps
## $self->create_pool(...) onto <pool_type>->new(...).
has pool_type => (
  is       => 'ro',
  isa      => 'ClassName',
  required => 1,
  lazy     => 1,
  default  => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  handles  => {
    create_pool => 'new',
  },
);
146
147
26ab719a 148=head2 balancer_type
2bf79155 149
150The replication pool requires a balancer class to provide the methods for
151choosing how to spread the query load across each replicant in the pool.
152
153=cut
154
## Name of the class used to build the 'balancer' attribute.  The delegation
## maps $self->create_balancer(...) onto <balancer_type>->new(...).
has balancer_type => (
  is       => 'ro',
  isa      => 'ClassName',
  required => 1,
  lazy     => 1,
  default  => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  handles  => {
    create_balancer => 'new',
  },
);
165
166
26ab719a 167=head2 pool
2bf79155 168
26ab719a 169Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
170container class for one or more replicated databases.
2bf79155 171
172=cut
173
## Container object holding the replicant storages.  Built on first use by
## _build_pool; the replicant-management methods listed in 'handles' are
## forwarded to it.
has pool => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  lazy_build => 1,
  handles    => [qw(
    replicants
    has_replicants
    connect_replicants
    num_replicants
    delete_replicant
  )],
);
186
187
26ab719a 188=head2 balancer
2bf79155 189
26ab719a 190Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
191is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>) and supplies the C<next_storage> method.
2bf79155 192
26ab719a 193=cut
2bf79155 194
## Object that chooses which storage serves the next read.  Built on first use
## by _build_balancer; only next_storage is forwarded to it.
has balancer => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  lazy_build => 1,
  handles    => [qw(next_storage)],
);
2bf79155 201
26ab719a 202=head1 METHODS
2bf79155 203
26ab719a 204This class defines the following methods.
2bf79155 205
26ab719a 206=head2 _build_master
2bf79155 207
26ab719a 208Lazy builder for the L</master> attribute.
2bf79155 209
210=cut
211
## Builder for 'master': a plain DBIx::Class::Storage::DBI created with no
## constructor arguments.
sub _build_master {
  return DBIx::Class::Storage::DBI->new;
}
215
216
26ab719a 217=head2 _build_current_replicant
2bf79155 218
219Lazy builder for the L</current_replicant> attribute.
220
221=cut
222
## Builder for 'current_replicant': asks the balancer (via the delegated
## next_storage method) for a storage, handing it the pool to choose from.
sub _build_current_replicant {
  my ($self) = @_;
  return $self->next_storage($self->pool);
}
227
228
26ab719a 229=head2 _build_pool
2bf79155 230
26ab719a 231Lazy builder for the L</pool> attribute.
2bf79155 232
233=cut
234
## Builder for 'pool': instantiates the configured pool_type through the
## create_pool delegation, with no constructor arguments.
sub _build_pool {
  my ($self) = @_;
  return $self->create_pool;
}
239
240
26ab719a 241=head2 _build_balancer
2bf79155 242
26ab719a 243Lazy builder for the L</balancer> attribute.
2bf79155 244
245=cut
246
## Builder for 'balancer': instantiates the configured balancer_type through
## the create_balancer delegation, with no constructor arguments.
sub _build_balancer {
  my ($self) = @_;
  return $self->create_balancer;
}
251
252
50336325 253=head2 around: connect_replicants
254
255All calls to connect_replicants need to have an existing $schema tacked onto
256top of the args.
257
258=cut
259
## Wrap the delegated connect_replicants so that the schema held by this
## storage is always passed as the first argument, ahead of whatever connect
## information the caller supplied.
around connect_replicants => sub {
  my $orig         = shift @_;
  my $self         = shift @_;
  my @connect_info = @_;

  return $self->$orig($self->schema, @connect_info);
};
264
265
955a6df6 266=head2 after: select, select_single, columns_info_for
2bf79155 267
268Advice on the current_replicant attribute. Each time we use a replicant
269we need to change it via the storage pool algorithm. That way we are spreading
270the load evenly (hopefully) across existing capacity.
271
272=cut
273
## After each delegated read (select, select_single, columns_info_for) ask the
## balancer for the next storage and make it the current replicant, so the
## following read goes to whatever the balancer picked.  The same advice is
## attached to all three methods.
after qw(select select_single columns_info_for) => sub {
  my ($self) = @_;

  my $upcoming = $self->next_storage($self->pool);
  $self->current_replicant($upcoming);
};
2bf79155 294
2bf79155 295=head2 all_storages
296
297Returns an array of all the connected storage backends. The first element
298in the returned array is the master, and the remaining elements are the
299replicants.
300
301=cut
302
## Collect the master followed by every replicant in the pool, then drop any
## entry that is undefined or is not a blessed object.
sub all_storages {
  my ($self) = @_;

  my @candidates = ($self->master, $self->replicants);

  return grep { defined($_) && blessed($_) } @candidates;
}
311
312
313=head2 connected
314
315Check that the master and at least one of the replicants is connected.
316
317=cut
318
## True only when the master reports itself connected AND the pool reports
## connected replicants.  The pool is not consulted when the master check is
## already false.
sub connected {
  my ($self) = @_;

  my $master_is_up = $self->master->connected;

  return $master_is_up && $self->pool->connected_replicants;
}
326
327
328=head2 ensure_connected
329
330Make sure all the storages are connected.
331
332=cut
333
## Forward ensure_connected, with the caller's arguments, to the master and
## every replicant in turn.
sub ensure_connected {
  my $self = shift;

  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->ensure_connected(@_);
  }
}
340
341
342=head2 limit_dialect
343
344Set the limit_dialect for all existing storages
345
346=cut
347
## Get or set the limit dialect.
##
## With arguments, the new value is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub limit_dialect {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->limit_dialect(@_);
    }
  }

  return $self->master->limit_dialect;
}
354
355
356=head2 quote_char
357
358Set the quote_char for all existing storages
359
360=cut
361
## Get or set the identifier quote character.
##
## With arguments, the new value is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub quote_char {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->quote_char(@_);
    }
  }

  return $self->master->quote_char;
}
368
369
370=head2 name_sep
371
372Set the name_sep for all existing storages
373
374=cut
375
## Get or set the name separator.
##
## With arguments, the new value is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub name_sep {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->name_sep(@_);
    }
  }

  return $self->master->name_sep;
}
382
383
384=head2 set_schema
385
386Set the schema object for all existing storages
387
388=cut
389
## Forward set_schema, with the caller's arguments, to the master and every
## replicant in turn.
sub set_schema {
  my $self = shift;

  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->set_schema(@_);
  }
}
396
397
398=head2 debug
399
400set a debug flag across all storages
401
402=cut
403
## Get or set the debug flag.
##
## With arguments, the new value is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub debug {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->debug(@_);
    }
  }

  return $self->master->debug;
}
410
411
412=head2 debugobj
413
414set a debug object across all storages
415
416=cut
417
## Get or set the debug object.
##
## With arguments, the new object is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub debugobj {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->debugobj(@_);
    }
  }

  return $self->master->debugobj;
}
424
425
426=head2 debugfh
427
428set a debugfh object across all storages
429
430=cut
431
## Get or set the debug filehandle.
##
## With arguments, the new handle is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub debugfh {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->debugfh(@_);
    }
  }

  return $self->master->debugfh;
}
438
439
440=head2 debugcb
441
442set a debug callback across all storages
443
444=cut
445
## Get or set the debug callback.
##
## With arguments, the new callback is pushed to every storage returned by
## all_storages (the master and each replicant).  In every case the value
## reported by the master is returned, so the method also works as a getter;
## previously the sub ended on the foreach loop and handed nothing useful back.
sub debugcb {
  my $self = shift @_;

  if (@_) {
    foreach my $source ($self->all_storages) {
      $source->debugcb(@_);
    }
  }

  return $self->master->debugcb;
}
452
453
454=head2 disconnect
455
456disconnect everything
457
458=cut
459
## Forward disconnect, with the caller's arguments, to the master and every
## replicant in turn.
sub disconnect {
  my $self = shift;

  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->disconnect(@_);
  }
}
466
467
468=head2 DESTROY
469
470Make sure we pass destroy events down to the storage handlers
471
472=cut
473
## Currently a no-op: nothing is forwarded to the master or the replicants.
## NOTE(review): because it is defined here, any DESTROY provided by the
## classes named in 'extends' is not reached through this class -- confirm
## that is intended.
sub DESTROY {
  ## TODO, maybe we can just leave this alone ???
  my $self = shift @_;
}
478
479
480=head1 AUTHOR
481
482Norbert Csongrádi <bert@cpan.org>
483
484Peter Siklósi <einon@einon.hu>
485
486John Napiorkowski <john.napiorkowski@takkle.com>
487
488=head1 LICENSE
489
490You may distribute this code under the same terms as Perl itself.
491
492=cut
493
4941;
495
496__END__
497
f5d3a5de 498use strict;
499use warnings;
500
501use DBIx::Class::Storage::DBI;
502use DBD::Multi;
2156bbdd 503
f5d3a5de 504use base qw/Class::Accessor::Fast/;
505
506__PACKAGE__->mk_accessors( qw/read_source write_source/ );
507
508=head1 NAME
509
2156bbdd 510DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
f5d3a5de 511
512=head1 SYNOPSIS
513
2156bbdd 514The Following example shows how to change an existing $schema to a replicated
515storage type and update its connection information to contain a master DSN and
516an array of slaves.
517
518 ## Change storage_type in your schema class
519 $schema->storage_type( '::DBI::Replicated' );
520
521 ## Set your connection.
522 $schema->connect(
523 $dsn, $user, $password, {
524 AutoCommit => 1,
525 ## Other standard DBI connection or DBD custom attributes added as
526 ## usual. Additionally, we have two custom attributes for defining
527 ## slave information and controlling how the underlying DBD::Multi
955a6df6 528 connect_replicants => [
2156bbdd 529 ## Define each slave like a 'normal' DBI connection, but you add
530 ## in a DBD::Multi custom attribute to define how the slave is
531 ## prioritized. Please see DBD::Multi for more.
955a6df6 532 [$slave1dsn, $user, $password, {%slave1opts}],
533 [$slave2dsn, $user, $password, {%slave2opts}],
534 [$slave3dsn, $user, $password, {%slave3opts}],
2156bbdd 535 ],
536 },
537 );
538
539 ## Now, just use the schema as normal
540 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
541 $schema->resultset('Table')->create(\%info); ## Writes will use master
f5d3a5de 542
543=head1 DESCRIPTION
544
2156bbdd 545Warning: This class is marked ALPHA. We are using this in development and have
546some basic test coverage but the code hasn't yet been stressed by a variety
547of databases. Individual DB's may have quirks we are not aware of. Please
548use this in development and pass along your experiences/bug fixes.
f5d3a5de 549
0aff97c2 550This class implements replicated data store for DBI. Currently you can define
551one master and numerous slave database connections. All write-type queries
552(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
553database, all read-type queries (SELECTs) go to the slave database.
f5d3a5de 554
0aff97c2 555For every slave database you can define a priority value, which controls data
556source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
557sources are used (if they have the same priority, they are used in random order), then
558if all low priority data sources fail, higher ones are tried in order.
f5d3a5de 559
560=head1 CONFIGURATION
561
2156bbdd 562Please see L<DBD::Multi> for most configuration information.
f5d3a5de 563
564=cut
565
## Constructor.  Accepts a class name or an existing instance as invocant and
## returns a new object holding two freshly created
## DBIx::Class::Storage::DBI objects: one for writes, one for reads.
sub new {
  my ($invocant) = @_;

  my $self = bless {}, ref($invocant) || $invocant;

  $self->write_source( DBIx::Class::Storage::DBI->new );
  $self->read_source( DBIx::Class::Storage::DBI->new );

  return $self;
}
578
## Both storages, read source first.  Returns a list in list context and an
## array reference otherwise.
sub all_sources {
  my ($self) = @_;

  my @sources = ($self->read_source, $self->write_source);

  return @sources if wantarray;
  return \@sources;
}
586
## Return the write source's connect info with the read source's connect info
## attached to its trailing options hash.
##
## The key used is 'slaves_connect_info', the same key connect_info() reads,
## so the value returned here can be fed back into connect_info().  When the
## write source has no connect info yet, or its last element is not a hash
## reference, the master's value is returned untouched instead of dying while
## trying to index into it.
##
## NOTE: the options hash is modified in place, as before.
sub _connect_info {
  my $self = shift;

  my $master = $self->write_source->_connect_info;

  return $master
    unless ref($master) eq 'ARRAY'
        && @$master
        && ref($master->[-1]) eq 'HASH';

  $master->[-1]->{slaves_connect_info} = $self->read_source->_connect_info;

  return $master;
}
f5d3a5de 593
## Accessor / configurator for the connection information.
##
## Without an argument, returns the combined connect info (see _connect_info).
## With an array reference, hands it to the write source and then configures
## the read source as a single 'dbi:Multi:' connection built from the
## 'slaves_connect_info' entry of the trailing options hash.
##
## A missing options hash or a missing 'slaves_connect_info' entry is treated
## as "no slaves" rather than dying on an undefined dereference.
sub connect_info {
  my ($self, $source_info) = @_;

  ## if there is no $source_info, treat this sub like an accessor
  return $self->_connect_info
    if !$source_info;

  ## Alright, let's connect the master
  $self->write_source->connect_info($source_info);

  ## Now, build and then connect the Slaves
  my $options = ref($source_info->[-1]) eq 'HASH' ? $source_info->[-1] : {};
  my @slaves_connect_info = @{ $options->{slaves_connect_info} || [] };

  ## A trailing hashref in the slave list is configuration for DBD::Multi
  ## itself rather than a slave definition.
  my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
    ? pop @slaves_connect_info : {};

  ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
  $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
    unless defined $dbd_multi_config->{limit_dialect};

  ## Flatten each slave definition into a (priority => connection) pair
  @slaves_connect_info = map {
    ## if the first element in the arrayref is a ref, make that the value
    my $db = ref $_->[0] ? $_->[0] : $_;
    my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
    $priority => $db;
  } @slaves_connect_info;

  $self->read_source->connect_info([
    'dbi:Multi:', undef, undef, {
      dsns => [@slaves_connect_info],
      %$dbd_multi_config,
    },
  ]);

  ## Return the formatted connection information
  return $self->_connect_info;
}
630
## Read-side delegators: each forwards its arguments unchanged to the same
## method on the read source and returns whatever that call returns.
sub select {
  my $self = shift;
  return $self->read_source->select(@_);
}

sub select_single {
  my $self = shift;
  return $self->read_source->select_single(@_);
}

sub throw_exception {
  my $self = shift;
  return $self->read_source->throw_exception(@_);
}

sub sql_maker {
  my $self = shift;
  return $self->read_source->sql_maker(@_);
}

sub columns_info_for {
  my $self = shift;
  return $self->read_source->columns_info_for(@_);
}

sub sqlt_type {
  my $self = shift;
  return $self->read_source->sqlt_type(@_);
}

sub create_ddl_dir {
  my $self = shift;
  return $self->read_source->create_ddl_dir(@_);
}

sub deployment_statements {
  my $self = shift;
  return $self->read_source->deployment_statements(@_);
}

sub datetime_parser {
  my $self = shift;
  return $self->read_source->datetime_parser(@_);
}

sub datetime_parser_type {
  my $self = shift;
  return $self->read_source->datetime_parser_type(@_);
}

sub build_datetime_parser {
  my $self = shift;
  return $self->read_source->build_datetime_parser(@_);
}
664
## Fan-out methods: each applies the call, with the caller's arguments, to
## every storage returned by all_sources (read source and write source).
## name_sep used to call quote_char on each source, so setting the name
## separator overwrote the quote character instead; it now calls name_sep.
sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
sub name_sep { $_->name_sep( @_ ) for( shift->all_sources ) }
sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
670
## Drop the references to both storages when this object is destroyed.
##
## The accessors are declared as read_source / write_source, so the slot to
## clear is 'read_source'; the previous 'read_sources' key did not match any
## declared accessor.
## NOTE(review): this assumes Class::Accessor::Fast keeps each value under a
## hash key named after the accessor -- confirm.
sub DESTROY {
  my $self = shift;

  undef $self->{write_source};
  undef $self->{read_source};
}
677
## Write-side delegators: each forwards its arguments unchanged to the same
## method on the write source and returns whatever that call returns.
sub last_insert_id {
  my $self = shift;
  return $self->write_source->last_insert_id(@_);
}

sub insert {
  my $self = shift;
  return $self->write_source->insert(@_);
}

sub update {
  my $self = shift;
  return $self->write_source->update(@_);
}

sub update_all {
  my $self = shift;
  return $self->write_source->update_all(@_);
}

sub delete {
  my $self = shift;
  return $self->write_source->delete(@_);
}

sub delete_all {
  my $self = shift;
  return $self->write_source->delete_all(@_);
}

sub create {
  my $self = shift;
  return $self->write_source->create(@_);
}

sub find_or_create {
  my $self = shift;
  return $self->write_source->find_or_create(@_);
}

sub update_or_create {
  my $self = shift;
  return $self->write_source->update_or_create(@_);
}

sub connected {
  my $self = shift;
  return $self->write_source->connected(@_);
}

sub ensure_connected {
  my $self = shift;
  return $self->write_source->ensure_connected(@_);
}

sub dbh {
  my $self = shift;
  return $self->write_source->dbh(@_);
}

sub txn_do {
  my $self = shift;
  return $self->write_source->txn_do(@_);
}

sub txn_commit {
  my $self = shift;
  return $self->write_source->txn_commit(@_);
}

sub txn_rollback {
  my $self = shift;
  return $self->write_source->txn_rollback(@_);
}

sub sth {
  my $self = shift;
  return $self->write_source->sth(@_);
}

sub deploy {
  my $self = shift;
  return $self->write_source->deploy(@_);
}

sub _prep_for_execute {
  my $self = shift;
  return $self->write_source->_prep_for_execute(@_);
}

sub debugobj {
  my $self = shift;
  return $self->write_source->debugobj(@_);
}

sub debug {
  my $self = shift;
  return $self->write_source->debug(@_);
}
f5d3a5de 739
## debugfh and debugcb are not available on this storage: both raise the
## "not supported" error via _not_supported, naming themselves.
sub debugfh {
  my $self = shift;
  return $self->_not_supported('debugfh');
}

sub debugcb {
  my $self = shift;
  return $self->_not_supported('debugcb');
}
742
## Die with a message naming the unsupported method.  The invocant is
## accepted but not used.
sub _not_supported {
  my $self   = shift;
  my $method = shift;

  die "This Storage does not support $method method.";
}
748
749=head1 SEE ALSO
750
751L<DBIx::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
752
753=head1 AUTHOR
754
755Norbert Csongrádi <bert@cpan.org>
756
757Peter Siklósi <einon@einon.hu>
758
2156bbdd 759John Napiorkowski <john.napiorkowski@takkle.com>
760
f5d3a5de 761=head1 LICENSE
762
763You may distribute this code under the same terms as Perl itself.
764
765=cut
766
7671;