new config option to DBICTest to let you set an alternative storage type, start on...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI::Replicated::Pool;
5
6 #extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
7
8 =head1 NAME
9
10 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
11
12 =head1 SYNOPSIS
13
14 The Following example shows how to change an existing $schema to a replicated
15 storage type, add some replicated (readonly) databases, and perform reporting
16 tasks
17
18     ## Change storage_type in your schema class
19     $schema->storage_type( '::DBI::Replicated' );
20     
21     ## Add some slaves.  Basically this is an array of arrayrefs, where each
22     ## arrayref is database connect information
23     
24     $schema->storage->create_replicants(
25         [$dsn1, $user, $pass, \%opts],
26         [$dsn1, $user, $pass, \%opts],
27         [$dsn1, $user, $pass, \%opts],
28         ## This is just going to use the standard DBIC connect method, so it
29         ## supports everything that method supports, such as connecting to an
30         ## existing database handle.
31         [$dbh],
32         \%global_opts
33     );
34     
35     ## a hash of replicants, keyed by their DSN
36     my %replicants = $schema->storage->replicants;
37     my $replicant = $schema->storage->get_replicant($dsn);
38     $replicant->status;
39     $replicant->is_active;
40     $replicant->active;
41     
42 =head1 DESCRIPTION
43
44 Warning: This class is marked ALPHA.  We are using this in development and have
45 some basic test coverage but the code hasn't yet been stressed by a variety
46 of databases.  Individual DB's may have quirks we are not aware of.  Please
47 use this in development and pass along your experiences/bug fixes.
48
49 This class implements replicated data store for DBI. Currently you can define
50 one master and numerous slave database connections. All write-type queries
51 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
52 database, all read-type queries (SELECTs) go to the slave database.
53
54 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
55 handle gets delegated to one of the two attributes: L</master_storage> or to
56 L</current_replicant_storage>.  Additionally, some methods need to be distributed
57 to all existing storages.  This way our storage class is a drop in replacement
58 for L<DBIx::Class::Storage::DBI>.
59
60 Read traffic is spread across the replicants (slaves) occuring to a user
61 selected algorithm.  The default algorithm is random weighted.
62
63 TODO more details about the algorithm.
64
65 =head1 ATTRIBUTES
66
67 This class defines the following attributes.
68
69 =head2 master
70
71 The master defines the canonical state for a pool of connected databases.  All
72 the replicants are expected to match this databases state.  Thus, in a classic
73 Master / Slaves distributed system, all the slaves are expected to replicate
74 the Master's state as quick as possible.  This is the only database in the
75 pool of databases that is allowed to handle write traffic.
76
77 =cut
78
79 has 'master' => (
80     is=> 'ro',
81     isa=>'DBIx::Class::Storage::DBI',
82     lazy_build=>1,
83     handles=>[qw/   
84         on_connect_do
85         on_disconnect_do       
86         columns_info_for
87         connect_info
88         throw_exception
89         sql_maker
90         sqlt_type
91         create_ddl_dir
92         deployment_statements
93         datetime_parser
94         datetime_parser_type        
95         last_insert_id
96         insert
97         insert_bulk
98         update
99         delete
100         dbh
101         txn_do
102         txn_commit
103         txn_rollback
104         sth
105         deploy
106     /],
107 );
108
109
110 =head2 current_replicant
111
112 Replicant storages (slaves) handle all read only traffic.  The assumption is
113 that your database will become readbound well before it becomes write bound
114 and that being able to spread your read only traffic around to multiple 
115 databases is going to help you to scale traffic.
116
117 This attribute returns the next slave to handle a read request.  Your L</pool>
118 attribute has methods to help you shuffle through all the available replicants
119 via it's balancer object.
120
121 This attribute defines the following reader/writer methods
122
123 =over 4
124
125 =item get_current_replicant
126
127 Returns the contained L<DBIx::Class::Storage::DBI> replicant
128
129 =item set_current_replicant
130
131 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
132
133 =back
134
135 We split the reader/writer to make it easier to selectively override how the
136 replicant is altered.
137
138 =cut
139
140 has 'current_replicant' => (
141     is=> 'rw',
142     reader=>'get_current_replicant',
143     writer=>'set_current_replicant',
144     isa=>'DBIx::Class::Storage::DBI',
145     lazy_build=>1,
146     handles=>[qw/
147         select
148         select_single
149         columns_info_for
150     /],
151 );
152
153
154 =head2 replicant_storage_pool_type
155
156 Contains the classname which will instantiate the L</replicant_storage_pool>
157 object.  Defaults to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
158
159 =cut
160
161 has 'replicant_storage_pool_type' => (
162     is=>'ro',
163     isa=>'ClassName',
164     required=>1,
165     default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
166     handles=> {
167         'create_replicant_storage_pool' => 'new',
168     },
169 );
170
171
172 =head2 pool_balancer_type
173
174 The replication pool requires a balance class to provider the methods for
175 choose how to spread the query load across each replicant in the pool.
176
177 =cut
178
179 has 'pool_balancer_type' => (
180     is=>'ro',
181     isa=>'ClassName',
182     required=>1,
183     default=>'DBIx::Class::Storage::DBI::Replicated::Pool::Balancer',
184     handles=> {
185         'create_replicant_storage_pool' => 'new',
186     },
187 );
188
189
190 =head2 replicant_storage_pool
191
192 Holds the list of connected replicants, their status and other housekeeping or
193 reporting methods.
194
195 =cut
196
197 has 'replicant_storage_pool' => (
198     is=>'ro',
199     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
200     lazy_build=>1,
201     handles=>[qw/replicant_storages/],
202 );
203
204
205
206 =head1 METHODS
207
208 This class defines the following methods.
209
210 =head2 new
211
212 Make sure we properly inherit from L<Moose>.
213
214 =cut
215
216 sub new {
217     my $class = shift @_;
218     my $obj = $class->SUPER::new(@_);
219   
220     return $class->meta->new_object(
221         __INSTANCE__ => $obj, @_
222     );
223 }
224
225 =head2 _build_master_storage
226
227 Lazy builder for the L</master_storage> attribute.
228
229 =cut
230
231 sub _build_next_replicant_storage {
232         DBIx::Class::Storage::DBI->new;
233 }
234
235
236 =head2 _build_current_replicant_storage
237
238 Lazy builder for the L</current_replicant_storage> attribute.
239
240 =cut
241
242 sub _build_current_replicant_storage {
243     shift->replicant_storage_pool->first;
244 }
245
246
247 =head2 _build_replicant_storage_pool
248
249 Lazy builder for the L</replicant_storage_pool> attribute.
250
251 =cut
252
253 sub _build_replicant_storage_pool {
254     my $self = shift @_;
255     $self->create_replicant_storage_pool;
256 }
257
258
259 =head2 around: create_replicant_storage_pool
260
261 Make sure all calles to the method set a default balancer type to our current
262 balancer type.
263
264 =cut
265
266 around 'create_replicant_storage_pool' => sub {
267     my ($method, $self, @args) = @_;
268     return $self->$method(balancer_type=>$self->pool_balancer_type, @args);
269 }
270
271
272 =head2 after: get_current_replicant_storage
273
274 Advice on the current_replicant_storage attribute.  Each time we use a replicant
275 we need to change it via the storage pool algorithm.  That way we are spreading
276 the load evenly (hopefully) across existing capacity.
277
278 =cut
279
280 after 'get_current_replicant_storage' => sub {
281     my $self = shift @_;
282     my $next_replicant = $self->replicant_storage_pool->next;
283     $self->next_replicant_storage($next_replicant);
284 };
285
286
287 =head2 find_or_create
288
289 First do a find on the replicant.  If no rows are found, pass it on to the
290 L</master_storage>
291
292 =cut
293
294 sub find_or_create {
295         my $self = shift @_;
296 }
297
298 =head2 all_storages
299
300 Returns an array of of all the connected storage backends.  The first element
301 in the returned array is the master, and the remainings are each of the
302 replicants.
303
304 =cut
305
306 sub all_storages {
307         my $self = shift @_;
308         
309         return (
310            $self->master_storage,
311            $self->replicant_storages,
312         );
313 }
314
315
316 =head2 connected
317
318 Check that the master and at least one of the replicants is connected.
319
320 =cut
321
322 sub connected {
323         my $self = shift @_;
324         
325         return
326            $self->master_storage->connected &&
327            $self->replicant_storage_pool->has_connected_slaves;
328 }
329
330
331 =head2 ensure_connected
332
333 Make sure all the storages are connected.
334
335 =cut
336
337 sub ensure_connected {
338     my $self = shift @_;
339     foreach $source (shift->all_sources) {
340         $source->ensure_connected(@_);
341     }
342 }
343
344
345 =head2 limit_dialect
346
347 Set the limit_dialect for all existing storages
348
349 =cut
350
351 sub limit_dialect {
352     my $self = shift @_;
353     foreach $source (shift->all_sources) {
354         $source->name_sep(@_);
355     }
356 }
357
358
359 =head2 quote_char
360
361 Set the quote_char for all existing storages
362
363 =cut
364
365 sub quote_char {
366     my $self = shift @_;
367     foreach $source (shift->all_sources) {
368         $source->name_sep(@_);
369     }
370 }
371
372
373 =head2 name_sep
374
375 Set the name_sep for all existing storages
376
377 =cut
378
379 sub name_sep {
380     my $self = shift @_;
381     foreach $source (shift->all_sources) {
382         $source->name_sep(@_);
383     }
384 }
385
386
387 =head2 set_schema
388
389 Set the schema object for all existing storages
390
391 =cut
392
393 sub set_schema {
394         my $self = shift @_;
395         foreach $source (shift->all_sources) {
396                 $source->set_schema(@_);
397         }
398 }
399
400
401 =head2 debug
402
403 set a debug flag across all storages
404
405 =cut
406
407 sub debug {
408     my $self = shift @_;
409     foreach $source (shift->all_sources) {
410         $source->debug(@_);
411     }
412 }
413
414
415 =head2 debugobj
416
417 set a debug object across all storages
418
419 =cut
420
421 sub debugobj {
422     my $self = shift @_;
423     foreach $source (shift->all_sources) {
424         $source->debugobj(@_);
425     }
426 }
427
428
429 =head2 debugfh
430
431 set a debugfh object across all storages
432
433 =cut
434
435 sub debugfh {
436     my $self = shift @_;
437     foreach $source (shift->all_sources) {
438         $source->debugfh(@_);
439     }
440 }
441
442
443 =head2 debugcb
444
445 set a debug callback across all storages
446
447 =cut
448
449 sub debugcb {
450     my $self = shift @_;
451     foreach $source (shift->all_sources) {
452         $source->debugcb(@_);
453     }
454 }
455
456
457 =head2 disconnect
458
459 disconnect everything
460
461 =cut
462
463 sub disconnect {
464     my $self = shift @_;
465     foreach $source (shift->all_sources) {
466         $source->disconnect(@_);
467     }
468 }
469
470
471 =head2 DESTROY
472
473 Make sure we pass destroy events down to the storage handlers
474
475 =cut
476
477 sub DESTROY {
478     my $self = shift;
479     ## TODO, maybe we can just leave this alone ???
480 }
481
482
483 =head1 AUTHOR
484
485 Norbert Csongrádi <bert@cpan.org>
486
487 Peter Siklósi <einon@einon.hu>
488
489 John Napiorkowski <john.napiorkowski@takkle.com>
490
491 =head1 LICENSE
492
493 You may distribute this code under the same terms as Perl itself.
494
495 =cut
496
497 1;
498
499 __END__
500
501 use strict;
502 use warnings;
503
504 use DBIx::Class::Storage::DBI;
505 use DBD::Multi;
506
507 use base qw/Class::Accessor::Fast/;
508
509 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
510
511 =head1 NAME
512
513 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
514
515 =head1 SYNOPSIS
516
517 The Following example shows how to change an existing $schema to a replicated
518 storage type and update it's connection information to contain a master DSN and
519 an array of slaves.
520
521     ## Change storage_type in your schema class
522     $schema->storage_type( '::DBI::Replicated' );
523     
524     ## Set your connection.
525     $schema->connect(
526         $dsn, $user, $password, {
527                 AutoCommit => 1,
528                 ## Other standard DBI connection or DBD custom attributes added as
529                 ## usual.  Additionally, we have two custom attributes for defining
530                 ## slave information and controlling how the underlying DBD::Multi
531                 slaves_connect_info => [
532                    ## Define each slave like a 'normal' DBI connection, but you add
533                    ## in a DBD::Multi custom attribute to define how the slave is
534                    ## prioritized.  Please see DBD::Multi for more.
535                    [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
536                [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
537                [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
538                ## add in a preexisting database handle
539                [$dbh, '','', {priority=>30}], 
540                ## DBD::Multi will call this coderef for connects 
541                [sub {  DBI->connect(< DSN info >) }, '', '', {priority=>40}],  
542                ## If the last item is hashref, we use that for DBD::Multi's 
543                ## configuration information.  Again, see DBD::Multi for more.
544                {timeout=>25, failed_max=>2},               
545                 ],
546         },
547     );
548     
549     ## Now, just use the schema as normal
550     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
551     $schema->resultset('Table')->create(\%info); ## Writes will use master
552
553 =head1 DESCRIPTION
554
555 Warning: This class is marked ALPHA.  We are using this in development and have
556 some basic test coverage but the code hasn't yet been stressed by a variety
557 of databases.  Individual DB's may have quirks we are not aware of.  Please
558 use this in development and pass along your experiences/bug fixes.
559
560 This class implements replicated data store for DBI. Currently you can define
561 one master and numerous slave database connections. All write-type queries
562 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
563 database, all read-type queries (SELECTs) go to the slave database.
564
565 For every slave database you can define a priority value, which controls data
566 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
567 sources used (if they have the same priority, the are used randomized), than
568 if all low priority data sources fail, higher ones tried in order.
569
570 =head1 CONFIGURATION
571
572 Please see L<DBD::Multi> for most configuration information.
573
574 =cut
575
576 sub new {
577     my $proto = shift;
578     my $class = ref( $proto ) || $proto;
579     my $self = {};
580
581     bless( $self, $class );
582
583     $self->write_source( DBIx::Class::Storage::DBI->new );
584     $self->read_source( DBIx::Class::Storage::DBI->new );
585
586     return $self;
587 }
588
589 sub all_sources {
590     my $self = shift;
591
592     my @sources = ($self->read_source, $self->write_source);
593
594     return wantarray ? @sources : \@sources;
595 }
596
597 sub _connect_info {
598         my $self = shift;
599     my $master = $self->write_source->_connect_info;
600     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
601     return $master;
602 }
603
604 sub connect_info {
605         my ($self, $source_info) = @_;
606
607     ## if there is no $source_info, treat this sub like an accessor
608     return $self->_connect_info
609      if !$source_info;
610     
611     ## Alright, let's conect the master 
612     $self->write_source->connect_info($source_info);
613   
614     ## Now, build and then connect the Slaves
615     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
616     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
617         ? pop @slaves_connect_info : {};
618
619     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
620     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
621         unless defined $dbd_multi_config->{limit_dialect};
622
623     @slaves_connect_info = map {
624         ## if the first element in the arrayhash is a ref, make that the value
625         my $db = ref $_->[0] ? $_->[0] : $_;
626         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
627         $priority => $db;
628     } @slaves_connect_info;
629     
630     $self->read_source->connect_info([ 
631         'dbi:Multi:', undef, undef, { 
632                 dsns => [@slaves_connect_info],
633                 %$dbd_multi_config,
634         },
635     ]);
636     
637     ## Return the formated connection information
638     return $self->_connect_info;
639 }
640
641 sub select {
642     shift->read_source->select( @_ );
643 }
644 sub select_single {
645     shift->read_source->select_single( @_ );
646 }
647 sub throw_exception {
648     shift->read_source->throw_exception( @_ );
649 }
650 sub sql_maker {
651     shift->read_source->sql_maker( @_ );
652 }
653 sub columns_info_for {
654     shift->read_source->columns_info_for( @_ );
655 }
656 sub sqlt_type {
657     shift->read_source->sqlt_type( @_ );
658 }
659 sub create_ddl_dir {
660     shift->read_source->create_ddl_dir( @_ );
661 }
662 sub deployment_statements {
663     shift->read_source->deployment_statements( @_ );
664 }
665 sub datetime_parser {
666     shift->read_source->datetime_parser( @_ );
667 }
668 sub datetime_parser_type {
669     shift->read_source->datetime_parser_type( @_ );
670 }
671 sub build_datetime_parser {
672     shift->read_source->build_datetime_parser( @_ );
673 }
674
675 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
676 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
677 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
678 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
679 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
680
681 sub DESTROY {
682     my $self = shift;
683
684     undef $self->{write_source};
685     undef $self->{read_sources};
686 }
687
688 sub last_insert_id {
689     shift->write_source->last_insert_id( @_ );
690 }
691 sub insert {
692     shift->write_source->insert( @_ );
693 }
694 sub update {
695     shift->write_source->update( @_ );
696 }
697 sub update_all {
698     shift->write_source->update_all( @_ );
699 }
700 sub delete {
701     shift->write_source->delete( @_ );
702 }
703 sub delete_all {
704     shift->write_source->delete_all( @_ );
705 }
706 sub create {
707     shift->write_source->create( @_ );
708 }
709 sub find_or_create {
710     shift->write_source->find_or_create( @_ );
711 }
712 sub update_or_create {
713     shift->write_source->update_or_create( @_ );
714 }
715 sub connected {
716     shift->write_source->connected( @_ );
717 }
718 sub ensure_connected {
719     shift->write_source->ensure_connected( @_ );
720 }
721 sub dbh {
722     shift->write_source->dbh( @_ );
723 }
724 sub txn_do {
725     shift->write_source->txn_do( @_ );
726 }
727 sub txn_commit {
728     shift->write_source->txn_commit( @_ );
729 }
730 sub txn_rollback {
731     shift->write_source->txn_rollback( @_ );
732 }
733 sub sth {
734     shift->write_source->sth( @_ );
735 }
736 sub deploy {
737     shift->write_source->deploy( @_ );
738 }
739 sub _prep_for_execute {
740         shift->write_source->_prep_for_execute(@_);
741 }
742
743 sub debugobj {
744         shift->write_source->debugobj(@_);
745 }
746 sub debug {
747     shift->write_source->debug(@_);
748 }
749
750 sub debugfh { shift->_not_supported( 'debugfh' ) };
751 sub debugcb { shift->_not_supported( 'debugcb' ) };
752
753 sub _not_supported {
754     my( $self, $method ) = @_;
755
756     die "This Storage does not support $method method.";
757 }
758
759 =head1 SEE ALSO
760
761 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
762
763 =head1 AUTHOR
764
765 Norbert Csongrádi <bert@cpan.org>
766
767 Peter Siklósi <einon@einon.hu>
768
769 John Napiorkowski <john.napiorkowski@takkle.com>
770
771 =head1 LICENSE
772
773 You may distribute this code under the same terms as Perl itself.
774
775 =cut
776
777 1;