added some advice to debugging replicants so that we can see a replicant dsn, got...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( '::DBI::Replicated' );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->create_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31         ## This is just going to use the standard DBIC connect method, so it
32         ## supports everything that method supports, such as connecting to an
33         ## existing database handle.
34         [$dbh],
35     );
36
37     
38 =head1 DESCRIPTION
39
40 Warning: This class is marked ALPHA.  We are using this in development and have
41 some basic test coverage but the code hasn't yet been stressed by a variety
42 of databases.  Individual DB's may have quirks we are not aware of.  Please
43 use this in development and pass along your experiences/bug fixes.
44
45 This class implements replicated data store for DBI. Currently you can define
46 one master and numerous slave database connections. All write-type queries
47 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
48 database, all read-type queries (SELECTs) go to the slave database.
49
50 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
51 handle gets delegated to one of the two attributes: L</master_storage> or to
52 L</current_replicant_storage>.  Additionally, some methods need to be distributed
53 to all existing storages.  This way our storage class is a drop in replacement
54 for L<DBIx::Class::Storage::DBI>.
55
56 Read traffic is spread across the replicants (slaves) occuring to a user
57 selected algorithm.  The default algorithm is random weighted.
58
59 TODO more details about the algorithm.
60
61 =head1 ATTRIBUTES
62
63 This class defines the following attributes.
64
65 =head2 master
66
67 The master defines the canonical state for a pool of connected databases.  All
68 the replicants are expected to match this databases state.  Thus, in a classic
69 Master / Slaves distributed system, all the slaves are expected to replicate
70 the Master's state as quick as possible.  This is the only database in the
71 pool of databases that is allowed to handle write traffic.
72
73 =cut
74
75 has 'master' => (
76     is=> 'ro',
77     isa=>'DBIx::Class::Storage::DBI',
78     lazy_build=>1,
79     handles=>[qw/   
80         on_connect_do
81         on_disconnect_do       
82         connect_info
83         throw_exception
84         sql_maker
85         sqlt_type
86         create_ddl_dir
87         deployment_statements
88         datetime_parser
89         datetime_parser_type        
90         last_insert_id
91         insert
92         insert_bulk
93         update
94         delete
95         dbh
96         txn_do
97         txn_commit
98         txn_rollback
99         sth
100         deploy
101         schema
102     /],
103 );
104
105
106 =head2 current_replicant
107
108 Replicant storages (slaves) handle all read only traffic.  The assumption is
109 that your database will become readbound well before it becomes write bound
110 and that being able to spread your read only traffic around to multiple 
111 databases is going to help you to scale traffic.
112
113 This attribute returns the next slave to handle a read request.  Your L</pool>
114 attribute has methods to help you shuffle through all the available replicants
115 via it's balancer object.
116
117 This attribute defines the following reader/writer methods
118
119 =over 4
120
121 =item get_current_replicant
122
123 Returns the contained L<DBIx::Class::Storage::DBI> replicant
124
125 =item set_current_replicant
126
127 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
128
129 =back
130
131 We split the reader/writer to make it easier to selectively override how the
132 replicant is altered.
133
134 =cut
135
136 has 'current_replicant' => (
137     is=> 'rw',
138     reader=>'get_current_replicant',
139     writer=>'set_current_replicant',
140     isa=>'DBIx::Class::Storage::DBI',
141     lazy_build=>1,
142     handles=>[qw/
143         select
144         select_single
145         columns_info_for
146     /],
147 );
148
149
150 =head2 pool_type
151
152 Contains the classname which will instantiate the L</pool> object.  Defaults 
153 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
154
155 =cut
156
157 has 'pool_type' => (
158     is=>'ro',
159     isa=>'ClassName',
160     required=>1,
161     lazy=>1,
162     default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
163     handles=>{
164         'create_pool' => 'new',
165     },
166 );
167
168
169 =head2 balancer_type
170
171 The replication pool requires a balance class to provider the methods for
172 choose how to spread the query load across each replicant in the pool.
173
174 =cut
175
176 has 'balancer_type' => (
177     is=>'ro',
178     isa=>'ClassName',
179     required=>1,
180     lazy=>1,
181     default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
182     handles=>{
183         'create_balancer' => 'new',
184     },
185 );
186
187
188 =head2 pool
189
190 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
191 container class for one or more replicated databases.
192
193 =cut
194
195 has 'pool' => (
196     is=>'ro',
197     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
198     lazy_build=>1,
199     handles=>[qw/
200         replicants
201         has_replicants
202         create_replicants
203         num_replicants
204         delete_replicant
205     /],
206 );
207
208
209 =head2 balancer
210
211 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
212 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
213
214 =cut
215
216 has 'balancer' => (
217     is=>'ro',
218     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
219     lazy_build=>1,
220     handles=>[qw/next_storage/],
221 );
222
223 =head1 METHODS
224
225 This class defines the following methods.
226
227 =head2 _build_master
228
229 Lazy builder for the L</master> attribute.
230
231 =cut
232
233 sub _build_master {
234         DBIx::Class::Storage::DBI->new;
235 }
236
237
238 =head2 _build_current_replicant
239
240 Lazy builder for the L</current_replicant_storage> attribute.
241
242 =cut
243
244 sub _build_current_replicant {
245         my $self = shift @_;
246         $self->next_storage($self->pool);
247 }
248
249
250 =head2 _build_pool
251
252 Lazy builder for the L</pool> attribute.
253
254 =cut
255
256 sub _build_pool {
257     my $self = shift @_;
258     $self->create_pool;
259 }
260
261
262 =head2 _build_balancer
263
264 Lazy builder for the L</balancer> attribute.
265
266 =cut
267
268 sub _build_balancer {
269     my $self = shift @_;
270     $self->create_balancer;
271 }
272
273
274 =head2 around: create_replicants
275
276 All calls to create_replicants needs to have an existing $schema tacked onto
277 top of the args
278
279 =cut
280
281 around 'create_replicants' => sub {
282         my ($method, $self, @args) = @_;
283         $self->$method($self->schema, @args);
284 };
285
286
287 =head2 after: get_current_replicant_storage
288
289 Advice on the current_replicant_storage attribute.  Each time we use a replicant
290 we need to change it via the storage pool algorithm.  That way we are spreading
291 the load evenly (hopefully) across existing capacity.
292
293 =cut
294
295 after 'current_replicant' => sub {
296     my $self = shift @_;
297     my $next_replicant = $self->next_storage($self->pool);
298
299 warn '......................';
300     $self->set_current_replicant($next_replicant);
301 };
302
303
304 =head2 all_storages
305
306 Returns an array of of all the connected storage backends.  The first element
307 in the returned array is the master, and the remainings are each of the
308 replicants.
309
310 =cut
311
312 sub all_storages {
313         my $self = shift @_;
314         
315         return grep {defined $_ && blessed $_} (
316            $self->master,
317            $self->replicants,
318         );
319 }
320
321
322 =head2 connected
323
324 Check that the master and at least one of the replicants is connected.
325
326 =cut
327
328 sub connected {
329         my $self = shift @_;
330         
331         return
332            $self->master->connected &&
333            $self->pool->connected_replicants;
334 }
335
336
337 =head2 ensure_connected
338
339 Make sure all the storages are connected.
340
341 =cut
342
343 sub ensure_connected {
344     my $self = shift @_;
345     foreach my $source ($self->all_storages) {
346         $source->ensure_connected(@_);
347     }
348 }
349
350
351 =head2 limit_dialect
352
353 Set the limit_dialect for all existing storages
354
355 =cut
356
357 sub limit_dialect {
358     my $self = shift @_;
359     foreach my $source ($self->all_storages) {
360         $source->limit_dialect(@_);
361     }
362 }
363
364
365 =head2 quote_char
366
367 Set the quote_char for all existing storages
368
369 =cut
370
371 sub quote_char {
372     my $self = shift @_;
373     foreach my $source ($self->all_storages) {
374         $source->quote_char(@_);
375     }
376 }
377
378
379 =head2 name_sep
380
381 Set the name_sep for all existing storages
382
383 =cut
384
385 sub name_sep {
386     my $self = shift @_;
387     foreach my $source ($self->all_storages) {
388         $source->name_sep(@_);
389     }
390 }
391
392
393 =head2 set_schema
394
395 Set the schema object for all existing storages
396
397 =cut
398
399 sub set_schema {
400         my $self = shift @_;
401         foreach my $source ($self->all_storages) {
402                 $source->set_schema(@_);
403         }
404 }
405
406
407 =head2 debug
408
409 set a debug flag across all storages
410
411 =cut
412
413 sub debug {
414     my $self = shift @_;
415     foreach my $source ($self->all_storages) {
416         $source->debug(@_);
417     }
418 }
419
420
421 =head2 debugobj
422
423 set a debug object across all storages
424
425 =cut
426
427 sub debugobj {
428     my $self = shift @_;
429     foreach my $source ($self->all_storages) {
430         $source->debugobj(@_);
431     }
432 }
433
434
435 =head2 debugfh
436
437 set a debugfh object across all storages
438
439 =cut
440
441 sub debugfh {
442     my $self = shift @_;
443     foreach my $source ($self->all_storages) {
444         $source->debugfh(@_);
445     }
446 }
447
448
449 =head2 debugcb
450
451 set a debug callback across all storages
452
453 =cut
454
455 sub debugcb {
456     my $self = shift @_;
457     foreach my $source ($self->all_storages) {
458         $source->debugcb(@_);
459     }
460 }
461
462
463 =head2 disconnect
464
465 disconnect everything
466
467 =cut
468
469 sub disconnect {
470     my $self = shift @_;
471     foreach my $source ($self->all_storages) {
472         $source->disconnect(@_);
473     }
474 }
475
476
477 =head2 DESTROY
478
479 Make sure we pass destroy events down to the storage handlers
480
481 =cut
482
483 sub DESTROY {
484     my $self = shift;
485     ## TODO, maybe we can just leave this alone ???
486 }
487
488
489 =head1 AUTHOR
490
491 Norbert Csongrádi <bert@cpan.org>
492
493 Peter Siklósi <einon@einon.hu>
494
495 John Napiorkowski <john.napiorkowski@takkle.com>
496
497 =head1 LICENSE
498
499 You may distribute this code under the same terms as Perl itself.
500
501 =cut
502
503 1;
504
505 __END__
506
507 use strict;
508 use warnings;
509
510 use DBIx::Class::Storage::DBI;
511 use DBD::Multi;
512
513 use base qw/Class::Accessor::Fast/;
514
515 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
516
517 =head1 NAME
518
519 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
520
521 =head1 SYNOPSIS
522
523 The Following example shows how to change an existing $schema to a replicated
524 storage type and update it's connection information to contain a master DSN and
525 an array of slaves.
526
527     ## Change storage_type in your schema class
528     $schema->storage_type( '::DBI::Replicated' );
529     
530     ## Set your connection.
531     $schema->connect(
532         $dsn, $user, $password, {
533                 AutoCommit => 1,
534                 ## Other standard DBI connection or DBD custom attributes added as
535                 ## usual.  Additionally, we have two custom attributes for defining
536                 ## slave information and controlling how the underlying DBD::Multi
537                 slaves_connect_info => [
538                    ## Define each slave like a 'normal' DBI connection, but you add
539                    ## in a DBD::Multi custom attribute to define how the slave is
540                    ## prioritized.  Please see DBD::Multi for more.
541                    [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
542                [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
543                [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
544                ## add in a preexisting database handle
545                [$dbh, '','', {priority=>30}], 
546                ## DBD::Multi will call this coderef for connects 
547                [sub {  DBI->connect(< DSN info >) }, '', '', {priority=>40}],  
548                ## If the last item is hashref, we use that for DBD::Multi's 
549                ## configuration information.  Again, see DBD::Multi for more.
550                {timeout=>25, failed_max=>2},               
551                 ],
552         },
553     );
554     
555     ## Now, just use the schema as normal
556     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
557     $schema->resultset('Table')->create(\%info); ## Writes will use master
558
559 =head1 DESCRIPTION
560
561 Warning: This class is marked ALPHA.  We are using this in development and have
562 some basic test coverage but the code hasn't yet been stressed by a variety
563 of databases.  Individual DB's may have quirks we are not aware of.  Please
564 use this in development and pass along your experiences/bug fixes.
565
566 This class implements replicated data store for DBI. Currently you can define
567 one master and numerous slave database connections. All write-type queries
568 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
569 database, all read-type queries (SELECTs) go to the slave database.
570
571 For every slave database you can define a priority value, which controls data
572 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
573 sources used (if they have the same priority, the are used randomized), than
574 if all low priority data sources fail, higher ones tried in order.
575
576 =head1 CONFIGURATION
577
578 Please see L<DBD::Multi> for most configuration information.
579
580 =cut
581
582 sub new {
583     my $proto = shift;
584     my $class = ref( $proto ) || $proto;
585     my $self = {};
586
587     bless( $self, $class );
588
589     $self->write_source( DBIx::Class::Storage::DBI->new );
590     $self->read_source( DBIx::Class::Storage::DBI->new );
591
592     return $self;
593 }
594
595 sub all_sources {
596     my $self = shift;
597
598     my @sources = ($self->read_source, $self->write_source);
599
600     return wantarray ? @sources : \@sources;
601 }
602
603 sub _connect_info {
604         my $self = shift;
605     my $master = $self->write_source->_connect_info;
606     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
607     return $master;
608 }
609
610 sub connect_info {
611         my ($self, $source_info) = @_;
612
613     ## if there is no $source_info, treat this sub like an accessor
614     return $self->_connect_info
615      if !$source_info;
616     
617     ## Alright, let's conect the master 
618     $self->write_source->connect_info($source_info);
619   
620     ## Now, build and then connect the Slaves
621     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
622     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
623         ? pop @slaves_connect_info : {};
624
625     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
626     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
627         unless defined $dbd_multi_config->{limit_dialect};
628
629     @slaves_connect_info = map {
630         ## if the first element in the arrayhash is a ref, make that the value
631         my $db = ref $_->[0] ? $_->[0] : $_;
632         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
633         $priority => $db;
634     } @slaves_connect_info;
635     
636     $self->read_source->connect_info([ 
637         'dbi:Multi:', undef, undef, { 
638                 dsns => [@slaves_connect_info],
639                 %$dbd_multi_config,
640         },
641     ]);
642     
643     ## Return the formated connection information
644     return $self->_connect_info;
645 }
646
647 sub select {
648     shift->read_source->select( @_ );
649 }
650 sub select_single {
651     shift->read_source->select_single( @_ );
652 }
653 sub throw_exception {
654     shift->read_source->throw_exception( @_ );
655 }
656 sub sql_maker {
657     shift->read_source->sql_maker( @_ );
658 }
659 sub columns_info_for {
660     shift->read_source->columns_info_for( @_ );
661 }
662 sub sqlt_type {
663     shift->read_source->sqlt_type( @_ );
664 }
665 sub create_ddl_dir {
666     shift->read_source->create_ddl_dir( @_ );
667 }
668 sub deployment_statements {
669     shift->read_source->deployment_statements( @_ );
670 }
671 sub datetime_parser {
672     shift->read_source->datetime_parser( @_ );
673 }
674 sub datetime_parser_type {
675     shift->read_source->datetime_parser_type( @_ );
676 }
677 sub build_datetime_parser {
678     shift->read_source->build_datetime_parser( @_ );
679 }
680
681 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
682 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
683 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
684 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
685 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
686
687 sub DESTROY {
688     my $self = shift;
689
690     undef $self->{write_source};
691     undef $self->{read_sources};
692 }
693
694 sub last_insert_id {
695     shift->write_source->last_insert_id( @_ );
696 }
697 sub insert {
698     shift->write_source->insert( @_ );
699 }
700 sub update {
701     shift->write_source->update( @_ );
702 }
703 sub update_all {
704     shift->write_source->update_all( @_ );
705 }
706 sub delete {
707     shift->write_source->delete( @_ );
708 }
709 sub delete_all {
710     shift->write_source->delete_all( @_ );
711 }
712 sub create {
713     shift->write_source->create( @_ );
714 }
715 sub find_or_create {
716     shift->write_source->find_or_create( @_ );
717 }
718 sub update_or_create {
719     shift->write_source->update_or_create( @_ );
720 }
721 sub connected {
722     shift->write_source->connected( @_ );
723 }
724 sub ensure_connected {
725     shift->write_source->ensure_connected( @_ );
726 }
727 sub dbh {
728     shift->write_source->dbh( @_ );
729 }
730 sub txn_do {
731     shift->write_source->txn_do( @_ );
732 }
733 sub txn_commit {
734     shift->write_source->txn_commit( @_ );
735 }
736 sub txn_rollback {
737     shift->write_source->txn_rollback( @_ );
738 }
739 sub sth {
740     shift->write_source->sth( @_ );
741 }
742 sub deploy {
743     shift->write_source->deploy( @_ );
744 }
745 sub _prep_for_execute {
746         shift->write_source->_prep_for_execute(@_);
747 }
748
749 sub debugobj {
750         shift->write_source->debugobj(@_);
751 }
752 sub debug {
753     shift->write_source->debug(@_);
754 }
755
756 sub debugfh { shift->_not_supported( 'debugfh' ) };
757 sub debugcb { shift->_not_supported( 'debugcb' ) };
758
759 sub _not_supported {
760     my( $self, $method ) = @_;
761
762     die "This Storage does not support $method method.";
763 }
764
765 =head1 SEE ALSO
766
767 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
768
769 =head1 AUTHOR
770
771 Norbert Csongrádi <bert@cpan.org>
772
773 Peter Siklósi <einon@einon.hu>
774
775 John Napiorkowski <john.napiorkowski@takkle.com>
776
777 =head1 LICENSE
778
779 You may distribute this code under the same terms as Perl itself.
780
781 =cut
782
783 1;