added some advice to debugging replicants so that we can see a replicant dsn, got...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The Following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
19tasks
20
21 ## Change storage_type in your schema class
22 $schema->storage_type( '::DBI::Replicated' );
23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
27 $schema->storage->create_replicants(
28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
31 ## This is just going to use the standard DBIC connect method, so it
32 ## supports everything that method supports, such as connecting to an
33 ## existing database handle.
34 [$dbh],
2bf79155 35 );
50336325 36
2bf79155 37
38=head1 DESCRIPTION
39
40Warning: This class is marked ALPHA. We are using this in development and have
41some basic test coverage but the code hasn't yet been stressed by a variety
42of databases. Individual DB's may have quirks we are not aware of. Please
43use this in development and pass along your experiences/bug fixes.
44
45This class implements replicated data store for DBI. Currently you can define
46one master and numerous slave database connections. All write-type queries
47(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
48database, all read-type queries (SELECTs) go to the slave database.
49
50Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
51handle gets delegated to one of the two attributes: L</master_storage> or to
52L</current_replicant_storage>. Additionally, some methods need to be distributed
53to all existing storages. This way our storage class is a drop in replacement
54for L<DBIx::Class::Storage::DBI>.
55
56Read traffic is spread across the replicants (slaves) occuring to a user
57selected algorithm. The default algorithm is random weighted.
58
59TODO more details about the algorithm.
60
61=head1 ATTRIBUTES
62
63This class defines the following attributes.
64
65=head2 master
66
67The master defines the canonical state for a pool of connected databases. All
68the replicants are expected to match this databases state. Thus, in a classic
69Master / Slaves distributed system, all the slaves are expected to replicate
70the Master's state as quick as possible. This is the only database in the
71pool of databases that is allowed to handle write traffic.
72
73=cut
74
75has 'master' => (
76 is=> 'ro',
77 isa=>'DBIx::Class::Storage::DBI',
78 lazy_build=>1,
79 handles=>[qw/
80 on_connect_do
81 on_disconnect_do
2bf79155 82 connect_info
83 throw_exception
84 sql_maker
85 sqlt_type
86 create_ddl_dir
87 deployment_statements
88 datetime_parser
89 datetime_parser_type
90 last_insert_id
91 insert
92 insert_bulk
93 update
94 delete
95 dbh
96 txn_do
97 txn_commit
98 txn_rollback
99 sth
100 deploy
50336325 101 schema
2bf79155 102 /],
103);
104
105
106=head2 current_replicant
107
108Replicant storages (slaves) handle all read only traffic. The assumption is
109that your database will become readbound well before it becomes write bound
110and that being able to spread your read only traffic around to multiple
111databases is going to help you to scale traffic.
112
113This attribute returns the next slave to handle a read request. Your L</pool>
114attribute has methods to help you shuffle through all the available replicants
115via it's balancer object.
116
117This attribute defines the following reader/writer methods
118
119=over 4
120
121=item get_current_replicant
122
123Returns the contained L<DBIx::Class::Storage::DBI> replicant
124
125=item set_current_replicant
126
127Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
128
129=back
130
131We split the reader/writer to make it easier to selectively override how the
132replicant is altered.
133
134=cut
135
136has 'current_replicant' => (
137 is=> 'rw',
138 reader=>'get_current_replicant',
139 writer=>'set_current_replicant',
140 isa=>'DBIx::Class::Storage::DBI',
141 lazy_build=>1,
142 handles=>[qw/
143 select
144 select_single
145 columns_info_for
146 /],
50336325 147 trigger=>sub {'xxxxxxxxxxxxxxxxxxxxxxxxxxx'},
2bf79155 148);
149
150
26ab719a 151=head2 pool_type
2bf79155 152
26ab719a 153Contains the classname which will instantiate the L</pool> object. Defaults
154to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 155
156=cut
157
26ab719a 158has 'pool_type' => (
2bf79155 159 is=>'ro',
160 isa=>'ClassName',
161 required=>1,
26ab719a 162 lazy=>1,
2bf79155 163 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
26ab719a 164 handles=>{
165 'create_pool' => 'new',
2bf79155 166 },
167);
168
169
26ab719a 170=head2 balancer_type
2bf79155 171
172The replication pool requires a balance class to provider the methods for
173choose how to spread the query load across each replicant in the pool.
174
175=cut
176
26ab719a 177has 'balancer_type' => (
2bf79155 178 is=>'ro',
179 isa=>'ClassName',
180 required=>1,
26ab719a 181 lazy=>1,
182 default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
183 handles=>{
184 'create_balancer' => 'new',
2bf79155 185 },
186);
187
188
26ab719a 189=head2 pool
2bf79155 190
26ab719a 191Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
192container class for one or more replicated databases.
2bf79155 193
194=cut
195
26ab719a 196has 'pool' => (
2bf79155 197 is=>'ro',
198 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199 lazy_build=>1,
26ab719a 200 handles=>[qw/
201 replicants
202 has_replicants
203 create_replicants
204 num_replicants
205 delete_replicant
206 /],
2bf79155 207);
208
209
26ab719a 210=head2 balancer
2bf79155 211
26ab719a 212Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
213is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 214
26ab719a 215=cut
2bf79155 216
26ab719a 217has 'balancer' => (
218 is=>'ro',
219 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
220 lazy_build=>1,
221 handles=>[qw/next_storage/],
222);
2bf79155 223
26ab719a 224=head1 METHODS
2bf79155 225
26ab719a 226This class defines the following methods.
2bf79155 227
26ab719a 228=head2 _build_master
2bf79155 229
26ab719a 230Lazy builder for the L</master> attribute.
2bf79155 231
232=cut
233
26ab719a 234sub _build_master {
2bf79155 235 DBIx::Class::Storage::DBI->new;
236}
237
238
26ab719a 239=head2 _build_current_replicant
2bf79155 240
241Lazy builder for the L</current_replicant_storage> attribute.
242
243=cut
244
26ab719a 245sub _build_current_replicant {
246 my $self = shift @_;
247 $self->next_storage($self->pool);
2bf79155 248}
249
250
26ab719a 251=head2 _build_pool
2bf79155 252
26ab719a 253Lazy builder for the L</pool> attribute.
2bf79155 254
255=cut
256
26ab719a 257sub _build_pool {
2bf79155 258 my $self = shift @_;
26ab719a 259 $self->create_pool;
2bf79155 260}
261
262
26ab719a 263=head2 _build_balancer
2bf79155 264
26ab719a 265Lazy builder for the L</balancer> attribute.
2bf79155 266
267=cut
268
26ab719a 269sub _build_balancer {
270 my $self = shift @_;
271 $self->create_balancer;
2bf79155 272}
273
274
50336325 275=head2 around: create_replicants
276
277All calls to create_replicants needs to have an existing $schema tacked onto
278top of the args
279
280=cut
281
282around 'create_replicants' => sub {
283 my ($method, $self, @args) = @_;
284 $self->$method($self->schema, @args);
285};
286
287
2bf79155 288=head2 after: get_current_replicant_storage
289
290Advice on the current_replicant_storage attribute. Each time we use a replicant
291we need to change it via the storage pool algorithm. That way we are spreading
292the load evenly (hopefully) across existing capacity.
293
294=cut
295
50336325 296after 'current_replicant' => sub {
2bf79155 297 my $self = shift @_;
26ab719a 298 my $next_replicant = $self->next_storage($self->pool);
50336325 299
300warn '......................';
26ab719a 301 $self->set_current_replicant($next_replicant);
2bf79155 302};
303
304
2bf79155 305=head2 all_storages
306
307Returns an array of of all the connected storage backends. The first element
308in the returned array is the master, and the remainings are each of the
309replicants.
310
311=cut
312
313sub all_storages {
314 my $self = shift @_;
315
26ab719a 316 return grep {defined $_ && blessed $_} (
317 $self->master,
318 $self->replicants,
2bf79155 319 );
320}
321
322
323=head2 connected
324
325Check that the master and at least one of the replicants is connected.
326
327=cut
328
329sub connected {
330 my $self = shift @_;
331
332 return
26ab719a 333 $self->master->connected &&
334 $self->pool->connected_replicants;
2bf79155 335}
336
337
338=head2 ensure_connected
339
340Make sure all the storages are connected.
341
342=cut
343
344sub ensure_connected {
345 my $self = shift @_;
26ab719a 346 foreach my $source ($self->all_storages) {
2bf79155 347 $source->ensure_connected(@_);
348 }
349}
350
351
352=head2 limit_dialect
353
354Set the limit_dialect for all existing storages
355
356=cut
357
358sub limit_dialect {
359 my $self = shift @_;
26ab719a 360 foreach my $source ($self->all_storages) {
361 $source->limit_dialect(@_);
2bf79155 362 }
363}
364
365
366=head2 quote_char
367
368Set the quote_char for all existing storages
369
370=cut
371
372sub quote_char {
373 my $self = shift @_;
26ab719a 374 foreach my $source ($self->all_storages) {
375 $source->quote_char(@_);
2bf79155 376 }
377}
378
379
380=head2 name_sep
381
382Set the name_sep for all existing storages
383
384=cut
385
386sub name_sep {
387 my $self = shift @_;
26ab719a 388 foreach my $source ($self->all_storages) {
2bf79155 389 $source->name_sep(@_);
390 }
391}
392
393
394=head2 set_schema
395
396Set the schema object for all existing storages
397
398=cut
399
400sub set_schema {
401 my $self = shift @_;
26ab719a 402 foreach my $source ($self->all_storages) {
2bf79155 403 $source->set_schema(@_);
404 }
405}
406
407
408=head2 debug
409
410set a debug flag across all storages
411
412=cut
413
414sub debug {
415 my $self = shift @_;
26ab719a 416 foreach my $source ($self->all_storages) {
2bf79155 417 $source->debug(@_);
418 }
419}
420
421
422=head2 debugobj
423
424set a debug object across all storages
425
426=cut
427
428sub debugobj {
429 my $self = shift @_;
26ab719a 430 foreach my $source ($self->all_storages) {
2bf79155 431 $source->debugobj(@_);
432 }
433}
434
435
436=head2 debugfh
437
438set a debugfh object across all storages
439
440=cut
441
442sub debugfh {
443 my $self = shift @_;
26ab719a 444 foreach my $source ($self->all_storages) {
2bf79155 445 $source->debugfh(@_);
446 }
447}
448
449
450=head2 debugcb
451
452set a debug callback across all storages
453
454=cut
455
456sub debugcb {
457 my $self = shift @_;
26ab719a 458 foreach my $source ($self->all_storages) {
2bf79155 459 $source->debugcb(@_);
460 }
461}
462
463
464=head2 disconnect
465
466disconnect everything
467
468=cut
469
470sub disconnect {
471 my $self = shift @_;
26ab719a 472 foreach my $source ($self->all_storages) {
2bf79155 473 $source->disconnect(@_);
474 }
475}
476
477
478=head2 DESTROY
479
480Make sure we pass destroy events down to the storage handlers
481
482=cut
483
484sub DESTROY {
485 my $self = shift;
486 ## TODO, maybe we can just leave this alone ???
487}
488
489
490=head1 AUTHOR
491
492Norbert Csongrádi <bert@cpan.org>
493
494Peter Siklósi <einon@einon.hu>
495
496John Napiorkowski <john.napiorkowski@takkle.com>
497
498=head1 LICENSE
499
500You may distribute this code under the same terms as Perl itself.
501
502=cut
503
5041;
505
506__END__
507
f5d3a5de 508use strict;
509use warnings;
510
511use DBIx::Class::Storage::DBI;
512use DBD::Multi;
2156bbdd 513
f5d3a5de 514use base qw/Class::Accessor::Fast/;
515
516__PACKAGE__->mk_accessors( qw/read_source write_source/ );
517
518=head1 NAME
519
2156bbdd 520DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
f5d3a5de 521
522=head1 SYNOPSIS
523
2156bbdd 524The Following example shows how to change an existing $schema to a replicated
525storage type and update it's connection information to contain a master DSN and
526an array of slaves.
527
528 ## Change storage_type in your schema class
529 $schema->storage_type( '::DBI::Replicated' );
530
531 ## Set your connection.
532 $schema->connect(
533 $dsn, $user, $password, {
534 AutoCommit => 1,
535 ## Other standard DBI connection or DBD custom attributes added as
536 ## usual. Additionally, we have two custom attributes for defining
537 ## slave information and controlling how the underlying DBD::Multi
538 slaves_connect_info => [
539 ## Define each slave like a 'normal' DBI connection, but you add
540 ## in a DBD::Multi custom attribute to define how the slave is
541 ## prioritized. Please see DBD::Multi for more.
542 [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
543 [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
544 [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
545 ## add in a preexisting database handle
546 [$dbh, '','', {priority=>30}],
547 ## DBD::Multi will call this coderef for connects
548 [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
549 ## If the last item is hashref, we use that for DBD::Multi's
550 ## configuration information. Again, see DBD::Multi for more.
551 {timeout=>25, failed_max=>2},
552 ],
553 },
554 );
555
556 ## Now, just use the schema as normal
557 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
558 $schema->resultset('Table')->create(\%info); ## Writes will use master
f5d3a5de 559
560=head1 DESCRIPTION
561
2156bbdd 562Warning: This class is marked ALPHA. We are using this in development and have
563some basic test coverage but the code hasn't yet been stressed by a variety
564of databases. Individual DB's may have quirks we are not aware of. Please
565use this in development and pass along your experiences/bug fixes.
f5d3a5de 566
0aff97c2 567This class implements replicated data store for DBI. Currently you can define
568one master and numerous slave database connections. All write-type queries
569(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
570database, all read-type queries (SELECTs) go to the slave database.
f5d3a5de 571
0aff97c2 572For every slave database you can define a priority value, which controls data
573source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
574sources used (if they have the same priority, the are used randomized), than
575if all low priority data sources fail, higher ones tried in order.
f5d3a5de 576
577=head1 CONFIGURATION
578
2156bbdd 579Please see L<DBD::Multi> for most configuration information.
f5d3a5de 580
581=cut
582
583sub new {
584 my $proto = shift;
585 my $class = ref( $proto ) || $proto;
586 my $self = {};
587
588 bless( $self, $class );
589
590 $self->write_source( DBIx::Class::Storage::DBI->new );
591 $self->read_source( DBIx::Class::Storage::DBI->new );
592
593 return $self;
594}
595
596sub all_sources {
597 my $self = shift;
598
599 my @sources = ($self->read_source, $self->write_source);
600
601 return wantarray ? @sources : \@sources;
602}
603
2156bbdd 604sub _connect_info {
605 my $self = shift;
606 my $master = $self->write_source->_connect_info;
607 $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
608 return $master;
609}
f5d3a5de 610
2156bbdd 611sub connect_info {
612 my ($self, $source_info) = @_;
f5d3a5de 613
2156bbdd 614 ## if there is no $source_info, treat this sub like an accessor
615 return $self->_connect_info
616 if !$source_info;
617
618 ## Alright, let's conect the master
619 $self->write_source->connect_info($source_info);
620
621 ## Now, build and then connect the Slaves
622 my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};
623 my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
624 ? pop @slaves_connect_info : {};
625
626 ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
627 $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
628 unless defined $dbd_multi_config->{limit_dialect};
629
630 @slaves_connect_info = map {
0f83441a 631 ## if the first element in the arrayhash is a ref, make that the value
632 my $db = ref $_->[0] ? $_->[0] : $_;
2156bbdd 633 my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
634 $priority => $db;
635 } @slaves_connect_info;
0f83441a 636
2156bbdd 637 $self->read_source->connect_info([
638 'dbi:Multi:', undef, undef, {
639 dsns => [@slaves_connect_info],
640 %$dbd_multi_config,
641 },
642 ]);
643
644 ## Return the formated connection information
645 return $self->_connect_info;
f5d3a5de 646}
647
648sub select {
649 shift->read_source->select( @_ );
650}
651sub select_single {
652 shift->read_source->select_single( @_ );
653}
654sub throw_exception {
655 shift->read_source->throw_exception( @_ );
656}
657sub sql_maker {
658 shift->read_source->sql_maker( @_ );
659}
660sub columns_info_for {
661 shift->read_source->columns_info_for( @_ );
662}
663sub sqlt_type {
664 shift->read_source->sqlt_type( @_ );
665}
666sub create_ddl_dir {
667 shift->read_source->create_ddl_dir( @_ );
668}
669sub deployment_statements {
670 shift->read_source->deployment_statements( @_ );
671}
672sub datetime_parser {
673 shift->read_source->datetime_parser( @_ );
674}
675sub datetime_parser_type {
676 shift->read_source->datetime_parser_type( @_ );
677}
678sub build_datetime_parser {
679 shift->read_source->build_datetime_parser( @_ );
680}
681
9b21c682 682sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
683sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
684sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
685sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
686sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
687
f5d3a5de 688sub DESTROY {
689 my $self = shift;
690
691 undef $self->{write_source};
692 undef $self->{read_sources};
693}
694
695sub last_insert_id {
696 shift->write_source->last_insert_id( @_ );
697}
698sub insert {
699 shift->write_source->insert( @_ );
700}
701sub update {
702 shift->write_source->update( @_ );
703}
704sub update_all {
705 shift->write_source->update_all( @_ );
706}
707sub delete {
708 shift->write_source->delete( @_ );
709}
710sub delete_all {
711 shift->write_source->delete_all( @_ );
712}
713sub create {
714 shift->write_source->create( @_ );
715}
716sub find_or_create {
717 shift->write_source->find_or_create( @_ );
718}
719sub update_or_create {
720 shift->write_source->update_or_create( @_ );
721}
722sub connected {
723 shift->write_source->connected( @_ );
724}
725sub ensure_connected {
726 shift->write_source->ensure_connected( @_ );
727}
728sub dbh {
729 shift->write_source->dbh( @_ );
730}
2156bbdd 731sub txn_do {
732 shift->write_source->txn_do( @_ );
f5d3a5de 733}
734sub txn_commit {
735 shift->write_source->txn_commit( @_ );
736}
737sub txn_rollback {
738 shift->write_source->txn_rollback( @_ );
739}
740sub sth {
741 shift->write_source->sth( @_ );
742}
743sub deploy {
744 shift->write_source->deploy( @_ );
745}
2156bbdd 746sub _prep_for_execute {
747 shift->write_source->_prep_for_execute(@_);
748}
f5d3a5de 749
2156bbdd 750sub debugobj {
751 shift->write_source->debugobj(@_);
752}
753sub debug {
754 shift->write_source->debug(@_);
755}
f5d3a5de 756
757sub debugfh { shift->_not_supported( 'debugfh' ) };
758sub debugcb { shift->_not_supported( 'debugcb' ) };
759
760sub _not_supported {
761 my( $self, $method ) = @_;
762
763 die "This Storage does not support $method method.";
764}
765
766=head1 SEE ALSO
767
768L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
769
770=head1 AUTHOR
771
772Norbert Csongrádi <bert@cpan.org>
773
774Peter Siklósi <einon@einon.hu>
775
2156bbdd 776John Napiorkowski <john.napiorkowski@takkle.com>
777
f5d3a5de 778=head1 LICENSE
779
780You may distribute this code under the same terms as Perl itself.
781
782=cut
783
7841;