got first pass on the replication and balancer, passing all of the old test suite...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( '::DBI::Replicated' );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->create_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31         ## This is just going to use the standard DBIC connect method, so it
32         ## supports everything that method supports, such as connecting to an
33         ## existing database handle.
34         [$dbh],
35         \%global_opts
36     );
37     
38     ## a hash of replicants, keyed by their DSN
39     my %replicants = $schema->storage->replicants;
40     my $replicant = $schema->storage->get_replicant($dsn);
41     $replicant->status;
42     $replicant->is_active;
43     $replicant->active;
44     
45 =head1 DESCRIPTION
46
47 Warning: This class is marked ALPHA.  We are using this in development and have
48 some basic test coverage but the code hasn't yet been stressed by a variety
49 of databases.  Individual DB's may have quirks we are not aware of.  Please
50 use this in development and pass along your experiences/bug fixes.
51
52 This class implements replicated data store for DBI. Currently you can define
53 one master and numerous slave database connections. All write-type queries
54 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
55 database, all read-type queries (SELECTs) go to the slave database.
56
57 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
58 handle gets delegated to one of the two attributes: L</master_storage> or to
59 L</current_replicant_storage>.  Additionally, some methods need to be distributed
60 to all existing storages.  This way our storage class is a drop in replacement
61 for L<DBIx::Class::Storage::DBI>.
62
63 Read traffic is spread across the replicants (slaves) occuring to a user
64 selected algorithm.  The default algorithm is random weighted.
65
66 TODO more details about the algorithm.
67
68 =head1 ATTRIBUTES
69
70 This class defines the following attributes.
71
72 =head2 master
73
74 The master defines the canonical state for a pool of connected databases.  All
75 the replicants are expected to match this databases state.  Thus, in a classic
76 Master / Slaves distributed system, all the slaves are expected to replicate
77 the Master's state as quick as possible.  This is the only database in the
78 pool of databases that is allowed to handle write traffic.
79
80 =cut
81
82 has 'master' => (
83     is=> 'ro',
84     isa=>'DBIx::Class::Storage::DBI',
85     lazy_build=>1,
86     handles=>[qw/   
87         on_connect_do
88         on_disconnect_do       
89         connect_info
90         throw_exception
91         sql_maker
92         sqlt_type
93         create_ddl_dir
94         deployment_statements
95         datetime_parser
96         datetime_parser_type        
97         last_insert_id
98         insert
99         insert_bulk
100         update
101         delete
102         dbh
103         txn_do
104         txn_commit
105         txn_rollback
106         sth
107         deploy
108     /],
109 );
110
111
112 =head2 current_replicant
113
114 Replicant storages (slaves) handle all read only traffic.  The assumption is
115 that your database will become readbound well before it becomes write bound
116 and that being able to spread your read only traffic around to multiple 
117 databases is going to help you to scale traffic.
118
119 This attribute returns the next slave to handle a read request.  Your L</pool>
120 attribute has methods to help you shuffle through all the available replicants
121 via it's balancer object.
122
123 This attribute defines the following reader/writer methods
124
125 =over 4
126
127 =item get_current_replicant
128
129 Returns the contained L<DBIx::Class::Storage::DBI> replicant
130
131 =item set_current_replicant
132
133 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
134
135 =back
136
137 We split the reader/writer to make it easier to selectively override how the
138 replicant is altered.
139
140 =cut
141
142 has 'current_replicant' => (
143     is=> 'rw',
144     reader=>'get_current_replicant',
145     writer=>'set_current_replicant',
146     isa=>'DBIx::Class::Storage::DBI',
147     lazy_build=>1,
148     handles=>[qw/
149         select
150         select_single
151         columns_info_for
152     /],
153 );
154
155
156 =head2 pool_type
157
158 Contains the classname which will instantiate the L</pool> object.  Defaults 
159 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
160
161 =cut
162
163 has 'pool_type' => (
164     is=>'ro',
165     isa=>'ClassName',
166     required=>1,
167     lazy=>1,
168     default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
169     handles=>{
170         'create_pool' => 'new',
171     },
172 );
173
174
175 =head2 balancer_type
176
177 The replication pool requires a balance class to provider the methods for
178 choose how to spread the query load across each replicant in the pool.
179
180 =cut
181
182 has 'balancer_type' => (
183     is=>'ro',
184     isa=>'ClassName',
185     required=>1,
186     lazy=>1,
187     default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
188     handles=>{
189         'create_balancer' => 'new',
190     },
191 );
192
193
194 =head2 pool
195
196 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
197 container class for one or more replicated databases.
198
199 =cut
200
201 has 'pool' => (
202     is=>'ro',
203     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
204     lazy_build=>1,
205     handles=>[qw/
206         replicants
207         has_replicants
208         create_replicants
209         num_replicants
210         delete_replicant
211     /],
212 );
213
214
215 =head2 balancer
216
217 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
218 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223     is=>'ro',
224     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225     lazy_build=>1,
226     handles=>[qw/next_storage/],
227 );
228
229 =head1 METHODS
230
231 This class defines the following methods.
232
233 =head2 _build_master
234
235 Lazy builder for the L</master> attribute.
236
237 =cut
238
239 sub _build_master {
240         DBIx::Class::Storage::DBI->new;
241 }
242
243
244 =head2 _build_current_replicant
245
246 Lazy builder for the L</current_replicant_storage> attribute.
247
248 =cut
249
250 sub _build_current_replicant {
251         my $self = shift @_;
252         $self->next_storage($self->pool);
253 }
254
255
256 =head2 _build_pool
257
258 Lazy builder for the L</pool> attribute.
259
260 =cut
261
262 sub _build_pool {
263     my $self = shift @_;
264     $self->create_pool;
265 }
266
267
268 =head2 _build_balancer
269
270 Lazy builder for the L</balancer> attribute.
271
272 =cut
273
274 sub _build_balancer {
275     my $self = shift @_;
276     $self->create_balancer;
277 }
278
279
280 =head2 after: get_current_replicant_storage
281
282 Advice on the current_replicant_storage attribute.  Each time we use a replicant
283 we need to change it via the storage pool algorithm.  That way we are spreading
284 the load evenly (hopefully) across existing capacity.
285
286 =cut
287
288 after 'get_current_replicant' => sub {
289     my $self = shift @_;
290     my $next_replicant = $self->next_storage($self->pool);
291     
292     $self->set_current_replicant($next_replicant);
293 };
294
295
296 =head2 all_storages
297
298 Returns an array of of all the connected storage backends.  The first element
299 in the returned array is the master, and the remainings are each of the
300 replicants.
301
302 =cut
303
304 sub all_storages {
305         my $self = shift @_;
306         
307         return grep {defined $_ && blessed $_} (
308            $self->master,
309            $self->replicants,
310         );
311 }
312
313
314 =head2 connected
315
316 Check that the master and at least one of the replicants is connected.
317
318 =cut
319
320 sub connected {
321         my $self = shift @_;
322         
323         return
324            $self->master->connected &&
325            $self->pool->connected_replicants;
326 }
327
328
329 =head2 ensure_connected
330
331 Make sure all the storages are connected.
332
333 =cut
334
335 sub ensure_connected {
336     my $self = shift @_;
337     foreach my $source ($self->all_storages) {
338         $source->ensure_connected(@_);
339     }
340 }
341
342
343 =head2 limit_dialect
344
345 Set the limit_dialect for all existing storages
346
347 =cut
348
349 sub limit_dialect {
350     my $self = shift @_;
351     foreach my $source ($self->all_storages) {
352         $source->limit_dialect(@_);
353     }
354 }
355
356
357 =head2 quote_char
358
359 Set the quote_char for all existing storages
360
361 =cut
362
363 sub quote_char {
364     my $self = shift @_;
365     foreach my $source ($self->all_storages) {
366         $source->quote_char(@_);
367     }
368 }
369
370
371 =head2 name_sep
372
373 Set the name_sep for all existing storages
374
375 =cut
376
377 sub name_sep {
378     my $self = shift @_;
379     foreach my $source ($self->all_storages) {
380         $source->name_sep(@_);
381     }
382 }
383
384
385 =head2 set_schema
386
387 Set the schema object for all existing storages
388
389 =cut
390
391 sub set_schema {
392         my $self = shift @_;
393         foreach my $source ($self->all_storages) {
394                 $source->set_schema(@_);
395         }
396 }
397
398
399 =head2 debug
400
401 set a debug flag across all storages
402
403 =cut
404
405 sub debug {
406     my $self = shift @_;
407     foreach my $source ($self->all_storages) {
408         $source->debug(@_);
409     }
410 }
411
412
413 =head2 debugobj
414
415 set a debug object across all storages
416
417 =cut
418
419 sub debugobj {
420     my $self = shift @_;
421     foreach my $source ($self->all_storages) {
422         $source->debugobj(@_);
423     }
424 }
425
426
427 =head2 debugfh
428
429 set a debugfh object across all storages
430
431 =cut
432
433 sub debugfh {
434     my $self = shift @_;
435     foreach my $source ($self->all_storages) {
436         $source->debugfh(@_);
437     }
438 }
439
440
441 =head2 debugcb
442
443 set a debug callback across all storages
444
445 =cut
446
447 sub debugcb {
448     my $self = shift @_;
449     foreach my $source ($self->all_storages) {
450         $source->debugcb(@_);
451     }
452 }
453
454
455 =head2 disconnect
456
457 disconnect everything
458
459 =cut
460
461 sub disconnect {
462     my $self = shift @_;
463     foreach my $source ($self->all_storages) {
464         $source->disconnect(@_);
465     }
466 }
467
468
469 =head2 DESTROY
470
471 Make sure we pass destroy events down to the storage handlers
472
473 =cut
474
475 sub DESTROY {
476     my $self = shift;
477     ## TODO, maybe we can just leave this alone ???
478 }
479
480
481 =head1 AUTHOR
482
483 Norbert Csongrádi <bert@cpan.org>
484
485 Peter Siklósi <einon@einon.hu>
486
487 John Napiorkowski <john.napiorkowski@takkle.com>
488
489 =head1 LICENSE
490
491 You may distribute this code under the same terms as Perl itself.
492
493 =cut
494
495 1;
496
497 __END__
498
499 use strict;
500 use warnings;
501
502 use DBIx::Class::Storage::DBI;
503 use DBD::Multi;
504
505 use base qw/Class::Accessor::Fast/;
506
507 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
508
509 =head1 NAME
510
511 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
512
513 =head1 SYNOPSIS
514
515 The Following example shows how to change an existing $schema to a replicated
516 storage type and update it's connection information to contain a master DSN and
517 an array of slaves.
518
519     ## Change storage_type in your schema class
520     $schema->storage_type( '::DBI::Replicated' );
521     
522     ## Set your connection.
523     $schema->connect(
524         $dsn, $user, $password, {
525                 AutoCommit => 1,
526                 ## Other standard DBI connection or DBD custom attributes added as
527                 ## usual.  Additionally, we have two custom attributes for defining
528                 ## slave information and controlling how the underlying DBD::Multi
529                 slaves_connect_info => [
530                    ## Define each slave like a 'normal' DBI connection, but you add
531                    ## in a DBD::Multi custom attribute to define how the slave is
532                    ## prioritized.  Please see DBD::Multi for more.
533                    [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
534                [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
535                [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
536                ## add in a preexisting database handle
537                [$dbh, '','', {priority=>30}], 
538                ## DBD::Multi will call this coderef for connects 
539                [sub {  DBI->connect(< DSN info >) }, '', '', {priority=>40}],  
540                ## If the last item is hashref, we use that for DBD::Multi's 
541                ## configuration information.  Again, see DBD::Multi for more.
542                {timeout=>25, failed_max=>2},               
543                 ],
544         },
545     );
546     
547     ## Now, just use the schema as normal
548     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
549     $schema->resultset('Table')->create(\%info); ## Writes will use master
550
551 =head1 DESCRIPTION
552
553 Warning: This class is marked ALPHA.  We are using this in development and have
554 some basic test coverage but the code hasn't yet been stressed by a variety
555 of databases.  Individual DB's may have quirks we are not aware of.  Please
556 use this in development and pass along your experiences/bug fixes.
557
558 This class implements replicated data store for DBI. Currently you can define
559 one master and numerous slave database connections. All write-type queries
560 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
561 database, all read-type queries (SELECTs) go to the slave database.
562
563 For every slave database you can define a priority value, which controls data
564 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
565 sources used (if they have the same priority, the are used randomized), than
566 if all low priority data sources fail, higher ones tried in order.
567
568 =head1 CONFIGURATION
569
570 Please see L<DBD::Multi> for most configuration information.
571
572 =cut
573
574 sub new {
575     my $proto = shift;
576     my $class = ref( $proto ) || $proto;
577     my $self = {};
578
579     bless( $self, $class );
580
581     $self->write_source( DBIx::Class::Storage::DBI->new );
582     $self->read_source( DBIx::Class::Storage::DBI->new );
583
584     return $self;
585 }
586
587 sub all_sources {
588     my $self = shift;
589
590     my @sources = ($self->read_source, $self->write_source);
591
592     return wantarray ? @sources : \@sources;
593 }
594
595 sub _connect_info {
596         my $self = shift;
597     my $master = $self->write_source->_connect_info;
598     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
599     return $master;
600 }
601
602 sub connect_info {
603         my ($self, $source_info) = @_;
604
605     ## if there is no $source_info, treat this sub like an accessor
606     return $self->_connect_info
607      if !$source_info;
608     
609     ## Alright, let's conect the master 
610     $self->write_source->connect_info($source_info);
611   
612     ## Now, build and then connect the Slaves
613     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
614     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
615         ? pop @slaves_connect_info : {};
616
617     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
618     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
619         unless defined $dbd_multi_config->{limit_dialect};
620
621     @slaves_connect_info = map {
622         ## if the first element in the arrayhash is a ref, make that the value
623         my $db = ref $_->[0] ? $_->[0] : $_;
624         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
625         $priority => $db;
626     } @slaves_connect_info;
627     
628     $self->read_source->connect_info([ 
629         'dbi:Multi:', undef, undef, { 
630                 dsns => [@slaves_connect_info],
631                 %$dbd_multi_config,
632         },
633     ]);
634     
635     ## Return the formated connection information
636     return $self->_connect_info;
637 }
638
639 sub select {
640     shift->read_source->select( @_ );
641 }
642 sub select_single {
643     shift->read_source->select_single( @_ );
644 }
645 sub throw_exception {
646     shift->read_source->throw_exception( @_ );
647 }
648 sub sql_maker {
649     shift->read_source->sql_maker( @_ );
650 }
651 sub columns_info_for {
652     shift->read_source->columns_info_for( @_ );
653 }
654 sub sqlt_type {
655     shift->read_source->sqlt_type( @_ );
656 }
657 sub create_ddl_dir {
658     shift->read_source->create_ddl_dir( @_ );
659 }
660 sub deployment_statements {
661     shift->read_source->deployment_statements( @_ );
662 }
663 sub datetime_parser {
664     shift->read_source->datetime_parser( @_ );
665 }
666 sub datetime_parser_type {
667     shift->read_source->datetime_parser_type( @_ );
668 }
669 sub build_datetime_parser {
670     shift->read_source->build_datetime_parser( @_ );
671 }
672
673 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
674 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
675 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
676 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
677 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
678
679 sub DESTROY {
680     my $self = shift;
681
682     undef $self->{write_source};
683     undef $self->{read_sources};
684 }
685
686 sub last_insert_id {
687     shift->write_source->last_insert_id( @_ );
688 }
689 sub insert {
690     shift->write_source->insert( @_ );
691 }
692 sub update {
693     shift->write_source->update( @_ );
694 }
695 sub update_all {
696     shift->write_source->update_all( @_ );
697 }
698 sub delete {
699     shift->write_source->delete( @_ );
700 }
701 sub delete_all {
702     shift->write_source->delete_all( @_ );
703 }
704 sub create {
705     shift->write_source->create( @_ );
706 }
707 sub find_or_create {
708     shift->write_source->find_or_create( @_ );
709 }
710 sub update_or_create {
711     shift->write_source->update_or_create( @_ );
712 }
713 sub connected {
714     shift->write_source->connected( @_ );
715 }
716 sub ensure_connected {
717     shift->write_source->ensure_connected( @_ );
718 }
719 sub dbh {
720     shift->write_source->dbh( @_ );
721 }
722 sub txn_do {
723     shift->write_source->txn_do( @_ );
724 }
725 sub txn_commit {
726     shift->write_source->txn_commit( @_ );
727 }
728 sub txn_rollback {
729     shift->write_source->txn_rollback( @_ );
730 }
731 sub sth {
732     shift->write_source->sth( @_ );
733 }
734 sub deploy {
735     shift->write_source->deploy( @_ );
736 }
737 sub _prep_for_execute {
738         shift->write_source->_prep_for_execute(@_);
739 }
740
741 sub debugobj {
742         shift->write_source->debugobj(@_);
743 }
744 sub debug {
745     shift->write_source->debug(@_);
746 }
747
748 sub debugfh { shift->_not_supported( 'debugfh' ) };
749 sub debugcb { shift->_not_supported( 'debugcb' ) };
750
751 sub _not_supported {
752     my( $self, $method ) = @_;
753
754     die "This Storage does not support $method method.";
755 }
756
757 =head1 SEE ALSO
758
759 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
760
761 =head1 AUTHOR
762
763 Norbert Csongrádi <bert@cpan.org>
764
765 Peter Siklósi <einon@einon.hu>
766
767 John Napiorkowski <john.napiorkowski@takkle.com>
768
769 =head1 LICENSE
770
771 You may distribute this code under the same terms as Perl itself.
772
773 =cut
774
775 1;