cleanup of some docs, got the default shuffling balancer to work properly. Don't...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( '::DBI::Replicated' );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->connect_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31     );
32     
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</master_storage> or to
47 L</current_replicant_storage>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 TODO more details about the algorithm.
55
56 =head1 ATTRIBUTES
57
58 This class defines the following attributes.
59
60 =head2 master
61
62 The master defines the canonical state for a pool of connected databases.  All
63 the replicants are expected to match this databases state.  Thus, in a classic
64 Master / Slaves distributed system, all the slaves are expected to replicate
65 the Master's state as quick as possible.  This is the only database in the
66 pool of databases that is allowed to handle write traffic.
67
68 =cut
69
70 has 'master' => (
71     is=> 'ro',
72     isa=>'DBIx::Class::Storage::DBI',
73     lazy_build=>1,
74     handles=>[qw/   
75         on_connect_do
76         on_disconnect_do       
77         connect_info
78         throw_exception
79         sql_maker
80         sqlt_type
81         create_ddl_dir
82         deployment_statements
83         datetime_parser
84         datetime_parser_type        
85         last_insert_id
86         insert
87         insert_bulk
88         update
89         delete
90         dbh
91         txn_do
92         txn_commit
93         txn_rollback
94         sth
95         deploy
96         schema
97     /],
98 );
99
100
101 =head2 current_replicant
102
103 Replicant storages (slaves) handle all read only traffic.  The assumption is
104 that your database will become readbound well before it becomes write bound
105 and that being able to spread your read only traffic around to multiple 
106 databases is going to help you to scale traffic.
107
108 This attribute returns the next slave to handle a read request.  Your L</pool>
109 attribute has methods to help you shuffle through all the available replicants
110 via it's balancer object.
111
112 We split the reader/writer to make it easier to selectively override how the
113 replicant is altered.
114
115 =cut
116
117 has 'current_replicant' => (
118     is=> 'rw',
119     isa=>'DBIx::Class::Storage::DBI',
120     lazy_build=>1,
121     handles=>[qw/
122         select
123         select_single
124         columns_info_for
125     /],
126 );
127
128
129 =head2 pool_type
130
131 Contains the classname which will instantiate the L</pool> object.  Defaults 
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
133
134 =cut
135
136 has 'pool_type' => (
137     is=>'ro',
138     isa=>'ClassName',
139     required=>1,
140     lazy=>1,
141     default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
142     handles=>{
143         'create_pool' => 'new',
144     },
145 );
146
147
148 =head2 balancer_type
149
150 The replication pool requires a balance class to provider the methods for
151 choose how to spread the query load across each replicant in the pool.
152
153 =cut
154
155 has 'balancer_type' => (
156     is=>'ro',
157     isa=>'ClassName',
158     required=>1,
159     lazy=>1,
160     default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
161     handles=>{
162         'create_balancer' => 'new',
163     },
164 );
165
166
167 =head2 pool
168
169 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
170 container class for one or more replicated databases.
171
172 =cut
173
174 has 'pool' => (
175     is=>'ro',
176     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
177     lazy_build=>1,
178     handles=>[qw/
179         replicants
180         has_replicants
181         connect_replicants
182         num_replicants
183         delete_replicant
184     /],
185 );
186
187
188 =head2 balancer
189
190 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
191 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
192
193 =cut
194
195 has 'balancer' => (
196     is=>'ro',
197     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
198     lazy_build=>1,
199     handles=>[qw/next_storage/],
200 );
201
202 =head1 METHODS
203
204 This class defines the following methods.
205
206 =head2 _build_master
207
208 Lazy builder for the L</master> attribute.
209
210 =cut
211
212 sub _build_master {
213         DBIx::Class::Storage::DBI->new;
214 }
215
216
217 =head2 _build_current_replicant
218
219 Lazy builder for the L</current_replicant_storage> attribute.
220
221 =cut
222
223 sub _build_current_replicant {
224         my $self = shift @_;
225         $self->next_storage($self->pool);
226 }
227
228
229 =head2 _build_pool
230
231 Lazy builder for the L</pool> attribute.
232
233 =cut
234
235 sub _build_pool {
236     my $self = shift @_;
237     $self->create_pool;
238 }
239
240
241 =head2 _build_balancer
242
243 Lazy builder for the L</balancer> attribute.
244
245 =cut
246
247 sub _build_balancer {
248     my $self = shift @_;
249     $self->create_balancer;
250 }
251
252
253 =head2 around: create_replicants
254
255 All calls to create_replicants needs to have an existing $schema tacked onto
256 top of the args
257
258 =cut
259
260 around 'connect_replicants' => sub {
261         my ($method, $self, @args) = @_;
262         $self->$method($self->schema, @args);
263 };
264
265
266 =head2 after: select, select_single, columns_info_for
267
268 Advice on the current_replicant_storage attribute.  Each time we use a replicant
269 we need to change it via the storage pool algorithm.  That way we are spreading
270 the load evenly (hopefully) across existing capacity.
271
272 =cut
273
274 after 'select' => sub {
275     my $self = shift @_;
276     my $next_replicant = $self->next_storage($self->pool);
277
278     $self->current_replicant($next_replicant);
279 };
280
281 after 'select_single' => sub {
282     my $self = shift @_;
283     my $next_replicant = $self->next_storage($self->pool);
284
285     $self->current_replicant($next_replicant);
286 };
287
288 after 'columns_info_for' => sub {
289     my $self = shift @_;
290     my $next_replicant = $self->next_storage($self->pool);
291
292     $self->current_replicant($next_replicant);
293 };
294
295 =head2 all_storages
296
297 Returns an array of of all the connected storage backends.  The first element
298 in the returned array is the master, and the remainings are each of the
299 replicants.
300
301 =cut
302
303 sub all_storages {
304         my $self = shift @_;
305         
306         return grep {defined $_ && blessed $_} (
307            $self->master,
308            $self->replicants,
309         );
310 }
311
312
313 =head2 connected
314
315 Check that the master and at least one of the replicants is connected.
316
317 =cut
318
319 sub connected {
320         my $self = shift @_;
321         
322         return
323            $self->master->connected &&
324            $self->pool->connected_replicants;
325 }
326
327
328 =head2 ensure_connected
329
330 Make sure all the storages are connected.
331
332 =cut
333
334 sub ensure_connected {
335     my $self = shift @_;
336     foreach my $source ($self->all_storages) {
337         $source->ensure_connected(@_);
338     }
339 }
340
341
342 =head2 limit_dialect
343
344 Set the limit_dialect for all existing storages
345
346 =cut
347
348 sub limit_dialect {
349     my $self = shift @_;
350     foreach my $source ($self->all_storages) {
351         $source->limit_dialect(@_);
352     }
353 }
354
355
356 =head2 quote_char
357
358 Set the quote_char for all existing storages
359
360 =cut
361
362 sub quote_char {
363     my $self = shift @_;
364     foreach my $source ($self->all_storages) {
365         $source->quote_char(@_);
366     }
367 }
368
369
370 =head2 name_sep
371
372 Set the name_sep for all existing storages
373
374 =cut
375
376 sub name_sep {
377     my $self = shift @_;
378     foreach my $source ($self->all_storages) {
379         $source->name_sep(@_);
380     }
381 }
382
383
384 =head2 set_schema
385
386 Set the schema object for all existing storages
387
388 =cut
389
390 sub set_schema {
391         my $self = shift @_;
392         foreach my $source ($self->all_storages) {
393                 $source->set_schema(@_);
394         }
395 }
396
397
398 =head2 debug
399
400 set a debug flag across all storages
401
402 =cut
403
404 sub debug {
405     my $self = shift @_;
406     foreach my $source ($self->all_storages) {
407         $source->debug(@_);
408     }
409 }
410
411
412 =head2 debugobj
413
414 set a debug object across all storages
415
416 =cut
417
418 sub debugobj {
419     my $self = shift @_;
420     foreach my $source ($self->all_storages) {
421         $source->debugobj(@_);
422     }
423 }
424
425
426 =head2 debugfh
427
428 set a debugfh object across all storages
429
430 =cut
431
432 sub debugfh {
433     my $self = shift @_;
434     foreach my $source ($self->all_storages) {
435         $source->debugfh(@_);
436     }
437 }
438
439
440 =head2 debugcb
441
442 set a debug callback across all storages
443
444 =cut
445
446 sub debugcb {
447     my $self = shift @_;
448     foreach my $source ($self->all_storages) {
449         $source->debugcb(@_);
450     }
451 }
452
453
454 =head2 disconnect
455
456 disconnect everything
457
458 =cut
459
460 sub disconnect {
461     my $self = shift @_;
462     foreach my $source ($self->all_storages) {
463         $source->disconnect(@_);
464     }
465 }
466
467
468 =head2 DESTROY
469
470 Make sure we pass destroy events down to the storage handlers
471
472 =cut
473
474 sub DESTROY {
475     my $self = shift;
476     ## TODO, maybe we can just leave this alone ???
477 }
478
479
480 =head1 AUTHOR
481
482 Norbert Csongrádi <bert@cpan.org>
483
484 Peter Siklósi <einon@einon.hu>
485
486 John Napiorkowski <john.napiorkowski@takkle.com>
487
488 =head1 LICENSE
489
490 You may distribute this code under the same terms as Perl itself.
491
492 =cut
493
494 1;
495
496 __END__
497
498 use strict;
499 use warnings;
500
501 use DBIx::Class::Storage::DBI;
502 use DBD::Multi;
503
504 use base qw/Class::Accessor::Fast/;
505
506 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
507
508 =head1 NAME
509
510 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
511
512 =head1 SYNOPSIS
513
514 The Following example shows how to change an existing $schema to a replicated
515 storage type and update it's connection information to contain a master DSN and
516 an array of slaves.
517
518     ## Change storage_type in your schema class
519     $schema->storage_type( '::DBI::Replicated' );
520     
521     ## Set your connection.
522     $schema->connect(
523         $dsn, $user, $password, {
524                 AutoCommit => 1,
525                 ## Other standard DBI connection or DBD custom attributes added as
526                 ## usual.  Additionally, we have two custom attributes for defining
527                 ## slave information and controlling how the underlying DBD::Multi
528                 connect_replicants => [
529                    ## Define each slave like a 'normal' DBI connection, but you add
530                    ## in a DBD::Multi custom attribute to define how the slave is
531                    ## prioritized.  Please see DBD::Multi for more.
532                    [$slave1dsn, $user, $password, {%slave1opts}],
533                [$slave2dsn, $user, $password, {%slave2opts}],
534                [$slave3dsn, $user, $password, {%slave3opts}],
535                 ],
536         },
537     );
538     
539     ## Now, just use the schema as normal
540     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
541     $schema->resultset('Table')->create(\%info); ## Writes will use master
542
543 =head1 DESCRIPTION
544
545 Warning: This class is marked ALPHA.  We are using this in development and have
546 some basic test coverage but the code hasn't yet been stressed by a variety
547 of databases.  Individual DB's may have quirks we are not aware of.  Please
548 use this in development and pass along your experiences/bug fixes.
549
550 This class implements replicated data store for DBI. Currently you can define
551 one master and numerous slave database connections. All write-type queries
552 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
553 database, all read-type queries (SELECTs) go to the slave database.
554
555 For every slave database you can define a priority value, which controls data
556 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
557 sources used (if they have the same priority, the are used randomized), than
558 if all low priority data sources fail, higher ones tried in order.
559
560 =head1 CONFIGURATION
561
562 Please see L<DBD::Multi> for most configuration information.
563
564 =cut
565
566 sub new {
567     my $proto = shift;
568     my $class = ref( $proto ) || $proto;
569     my $self = {};
570
571     bless( $self, $class );
572
573     $self->write_source( DBIx::Class::Storage::DBI->new );
574     $self->read_source( DBIx::Class::Storage::DBI->new );
575
576     return $self;
577 }
578
579 sub all_sources {
580     my $self = shift;
581
582     my @sources = ($self->read_source, $self->write_source);
583
584     return wantarray ? @sources : \@sources;
585 }
586
587 sub _connect_info {
588         my $self = shift;
589     my $master = $self->write_source->_connect_info;
590     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
591     return $master;
592 }
593
594 sub connect_info {
595         my ($self, $source_info) = @_;
596
597     ## if there is no $source_info, treat this sub like an accessor
598     return $self->_connect_info
599      if !$source_info;
600     
601     ## Alright, let's conect the master 
602     $self->write_source->connect_info($source_info);
603   
604     ## Now, build and then connect the Slaves
605     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
606     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
607         ? pop @slaves_connect_info : {};
608
609     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
610     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
611         unless defined $dbd_multi_config->{limit_dialect};
612
613     @slaves_connect_info = map {
614         ## if the first element in the arrayhash is a ref, make that the value
615         my $db = ref $_->[0] ? $_->[0] : $_;
616         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
617         $priority => $db;
618     } @slaves_connect_info;
619     
620     $self->read_source->connect_info([ 
621         'dbi:Multi:', undef, undef, { 
622                 dsns => [@slaves_connect_info],
623                 %$dbd_multi_config,
624         },
625     ]);
626     
627     ## Return the formated connection information
628     return $self->_connect_info;
629 }
630
631 sub select {
632     shift->read_source->select( @_ );
633 }
634 sub select_single {
635     shift->read_source->select_single( @_ );
636 }
637 sub throw_exception {
638     shift->read_source->throw_exception( @_ );
639 }
640 sub sql_maker {
641     shift->read_source->sql_maker( @_ );
642 }
643 sub columns_info_for {
644     shift->read_source->columns_info_for( @_ );
645 }
646 sub sqlt_type {
647     shift->read_source->sqlt_type( @_ );
648 }
649 sub create_ddl_dir {
650     shift->read_source->create_ddl_dir( @_ );
651 }
652 sub deployment_statements {
653     shift->read_source->deployment_statements( @_ );
654 }
655 sub datetime_parser {
656     shift->read_source->datetime_parser( @_ );
657 }
658 sub datetime_parser_type {
659     shift->read_source->datetime_parser_type( @_ );
660 }
661 sub build_datetime_parser {
662     shift->read_source->build_datetime_parser( @_ );
663 }
664
665 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
666 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
667 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
668 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
669 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
670
671 sub DESTROY {
672     my $self = shift;
673
674     undef $self->{write_source};
675     undef $self->{read_sources};
676 }
677
678 sub last_insert_id {
679     shift->write_source->last_insert_id( @_ );
680 }
681 sub insert {
682     shift->write_source->insert( @_ );
683 }
684 sub update {
685     shift->write_source->update( @_ );
686 }
687 sub update_all {
688     shift->write_source->update_all( @_ );
689 }
690 sub delete {
691     shift->write_source->delete( @_ );
692 }
693 sub delete_all {
694     shift->write_source->delete_all( @_ );
695 }
696 sub create {
697     shift->write_source->create( @_ );
698 }
699 sub find_or_create {
700     shift->write_source->find_or_create( @_ );
701 }
702 sub update_or_create {
703     shift->write_source->update_or_create( @_ );
704 }
705 sub connected {
706     shift->write_source->connected( @_ );
707 }
708 sub ensure_connected {
709     shift->write_source->ensure_connected( @_ );
710 }
711 sub dbh {
712     shift->write_source->dbh( @_ );
713 }
714 sub txn_do {
715     shift->write_source->txn_do( @_ );
716 }
717 sub txn_commit {
718     shift->write_source->txn_commit( @_ );
719 }
720 sub txn_rollback {
721     shift->write_source->txn_rollback( @_ );
722 }
723 sub sth {
724     shift->write_source->sth( @_ );
725 }
726 sub deploy {
727     shift->write_source->deploy( @_ );
728 }
729 sub _prep_for_execute {
730         shift->write_source->_prep_for_execute(@_);
731 }
732
733 sub debugobj {
734         shift->write_source->debugobj(@_);
735 }
736 sub debug {
737     shift->write_source->debug(@_);
738 }
739
740 sub debugfh { shift->_not_supported( 'debugfh' ) };
741 sub debugcb { shift->_not_supported( 'debugcb' ) };
742
743 sub _not_supported {
744     my( $self, $method ) = @_;
745
746     die "This Storage does not support $method method.";
747 }
748
749 =head1 SEE ALSO
750
751 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
752
753 =head1 AUTHOR
754
755 Norbert Csongrádi <bert@cpan.org>
756
757 Peter Siklósi <einon@einon.hu>
758
759 John Napiorkowski <john.napiorkowski@takkle.com>
760
761 =head1 LICENSE
762
763 You may distribute this code under the same terms as Perl itself.
764
765 =cut
766
767 1;