added some advice to debugging replicants so that we can see a replicant dsn, got...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( '::DBI::Replicated' );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->create_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31         ## This is just going to use the standard DBIC connect method, so it
32         ## supports everything that method supports, such as connecting to an
33         ## existing database handle.
34         [$dbh],
35     );
36
37     
38 =head1 DESCRIPTION
39
40 Warning: This class is marked ALPHA.  We are using this in development and have
41 some basic test coverage but the code hasn't yet been stressed by a variety
42 of databases.  Individual DB's may have quirks we are not aware of.  Please
43 use this in development and pass along your experiences/bug fixes.
44
45 This class implements replicated data store for DBI. Currently you can define
46 one master and numerous slave database connections. All write-type queries
47 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
48 database, all read-type queries (SELECTs) go to the slave database.
49
50 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
51 handle gets delegated to one of the two attributes: L</master_storage> or to
52 L</current_replicant_storage>.  Additionally, some methods need to be distributed
53 to all existing storages.  This way our storage class is a drop in replacement
54 for L<DBIx::Class::Storage::DBI>.
55
56 Read traffic is spread across the replicants (slaves) occuring to a user
57 selected algorithm.  The default algorithm is random weighted.
58
59 TODO more details about the algorithm.
60
61 =head1 ATTRIBUTES
62
63 This class defines the following attributes.
64
65 =head2 master
66
67 The master defines the canonical state for a pool of connected databases.  All
68 the replicants are expected to match this databases state.  Thus, in a classic
69 Master / Slaves distributed system, all the slaves are expected to replicate
70 the Master's state as quick as possible.  This is the only database in the
71 pool of databases that is allowed to handle write traffic.
72
73 =cut
74
75 has 'master' => (
76     is=> 'ro',
77     isa=>'DBIx::Class::Storage::DBI',
78     lazy_build=>1,
79     handles=>[qw/   
80         on_connect_do
81         on_disconnect_do       
82         connect_info
83         throw_exception
84         sql_maker
85         sqlt_type
86         create_ddl_dir
87         deployment_statements
88         datetime_parser
89         datetime_parser_type        
90         last_insert_id
91         insert
92         insert_bulk
93         update
94         delete
95         dbh
96         txn_do
97         txn_commit
98         txn_rollback
99         sth
100         deploy
101         schema
102     /],
103 );
104
105
106 =head2 current_replicant
107
108 Replicant storages (slaves) handle all read only traffic.  The assumption is
109 that your database will become readbound well before it becomes write bound
110 and that being able to spread your read only traffic around to multiple 
111 databases is going to help you to scale traffic.
112
113 This attribute returns the next slave to handle a read request.  Your L</pool>
114 attribute has methods to help you shuffle through all the available replicants
115 via it's balancer object.
116
117 This attribute defines the following reader/writer methods
118
119 =over 4
120
121 =item get_current_replicant
122
123 Returns the contained L<DBIx::Class::Storage::DBI> replicant
124
125 =item set_current_replicant
126
127 Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
128
129 =back
130
131 We split the reader/writer to make it easier to selectively override how the
132 replicant is altered.
133
134 =cut
135
136 has 'current_replicant' => (
137     is=> 'rw',
138     reader=>'get_current_replicant',
139     writer=>'set_current_replicant',
140     isa=>'DBIx::Class::Storage::DBI',
141     lazy_build=>1,
142     handles=>[qw/
143         select
144         select_single
145         columns_info_for
146     /],
147     trigger=>sub {'xxxxxxxxxxxxxxxxxxxxxxxxxxx'},
148 );
149
150
151 =head2 pool_type
152
153 Contains the classname which will instantiate the L</pool> object.  Defaults 
154 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
155
156 =cut
157
158 has 'pool_type' => (
159     is=>'ro',
160     isa=>'ClassName',
161     required=>1,
162     lazy=>1,
163     default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
164     handles=>{
165         'create_pool' => 'new',
166     },
167 );
168
169
170 =head2 balancer_type
171
172 The replication pool requires a balance class to provider the methods for
173 choose how to spread the query load across each replicant in the pool.
174
175 =cut
176
177 has 'balancer_type' => (
178     is=>'ro',
179     isa=>'ClassName',
180     required=>1,
181     lazy=>1,
182     default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
183     handles=>{
184         'create_balancer' => 'new',
185     },
186 );
187
188
189 =head2 pool
190
191 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
192 container class for one or more replicated databases.
193
194 =cut
195
196 has 'pool' => (
197     is=>'ro',
198     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199     lazy_build=>1,
200     handles=>[qw/
201         replicants
202         has_replicants
203         create_replicants
204         num_replicants
205         delete_replicant
206     /],
207 );
208
209
210 =head2 balancer
211
212 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
213 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
214
215 =cut
216
217 has 'balancer' => (
218     is=>'ro',
219     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
220     lazy_build=>1,
221     handles=>[qw/next_storage/],
222 );
223
224 =head1 METHODS
225
226 This class defines the following methods.
227
228 =head2 _build_master
229
230 Lazy builder for the L</master> attribute.
231
232 =cut
233
234 sub _build_master {
235         DBIx::Class::Storage::DBI->new;
236 }
237
238
239 =head2 _build_current_replicant
240
241 Lazy builder for the L</current_replicant_storage> attribute.
242
243 =cut
244
245 sub _build_current_replicant {
246         my $self = shift @_;
247         $self->next_storage($self->pool);
248 }
249
250
251 =head2 _build_pool
252
253 Lazy builder for the L</pool> attribute.
254
255 =cut
256
257 sub _build_pool {
258     my $self = shift @_;
259     $self->create_pool;
260 }
261
262
263 =head2 _build_balancer
264
265 Lazy builder for the L</balancer> attribute.
266
267 =cut
268
269 sub _build_balancer {
270     my $self = shift @_;
271     $self->create_balancer;
272 }
273
274
275 =head2 around: create_replicants
276
277 All calls to create_replicants needs to have an existing $schema tacked onto
278 top of the args
279
280 =cut
281
282 around 'create_replicants' => sub {
283         my ($method, $self, @args) = @_;
284         $self->$method($self->schema, @args);
285 };
286
287
288 =head2 after: get_current_replicant_storage
289
290 Advice on the current_replicant_storage attribute.  Each time we use a replicant
291 we need to change it via the storage pool algorithm.  That way we are spreading
292 the load evenly (hopefully) across existing capacity.
293
294 =cut
295
296 after 'current_replicant' => sub {
297     my $self = shift @_;
298     my $next_replicant = $self->next_storage($self->pool);
299
300 warn '......................';
301     $self->set_current_replicant($next_replicant);
302 };
303
304
305 =head2 all_storages
306
307 Returns an array of of all the connected storage backends.  The first element
308 in the returned array is the master, and the remainings are each of the
309 replicants.
310
311 =cut
312
313 sub all_storages {
314         my $self = shift @_;
315         
316         return grep {defined $_ && blessed $_} (
317            $self->master,
318            $self->replicants,
319         );
320 }
321
322
323 =head2 connected
324
325 Check that the master and at least one of the replicants is connected.
326
327 =cut
328
329 sub connected {
330         my $self = shift @_;
331         
332         return
333            $self->master->connected &&
334            $self->pool->connected_replicants;
335 }
336
337
338 =head2 ensure_connected
339
340 Make sure all the storages are connected.
341
342 =cut
343
344 sub ensure_connected {
345     my $self = shift @_;
346     foreach my $source ($self->all_storages) {
347         $source->ensure_connected(@_);
348     }
349 }
350
351
352 =head2 limit_dialect
353
354 Set the limit_dialect for all existing storages
355
356 =cut
357
358 sub limit_dialect {
359     my $self = shift @_;
360     foreach my $source ($self->all_storages) {
361         $source->limit_dialect(@_);
362     }
363 }
364
365
366 =head2 quote_char
367
368 Set the quote_char for all existing storages
369
370 =cut
371
372 sub quote_char {
373     my $self = shift @_;
374     foreach my $source ($self->all_storages) {
375         $source->quote_char(@_);
376     }
377 }
378
379
380 =head2 name_sep
381
382 Set the name_sep for all existing storages
383
384 =cut
385
386 sub name_sep {
387     my $self = shift @_;
388     foreach my $source ($self->all_storages) {
389         $source->name_sep(@_);
390     }
391 }
392
393
394 =head2 set_schema
395
396 Set the schema object for all existing storages
397
398 =cut
399
400 sub set_schema {
401         my $self = shift @_;
402         foreach my $source ($self->all_storages) {
403                 $source->set_schema(@_);
404         }
405 }
406
407
408 =head2 debug
409
410 set a debug flag across all storages
411
412 =cut
413
414 sub debug {
415     my $self = shift @_;
416     foreach my $source ($self->all_storages) {
417         $source->debug(@_);
418     }
419 }
420
421
422 =head2 debugobj
423
424 set a debug object across all storages
425
426 =cut
427
428 sub debugobj {
429     my $self = shift @_;
430     foreach my $source ($self->all_storages) {
431         $source->debugobj(@_);
432     }
433 }
434
435
436 =head2 debugfh
437
438 set a debugfh object across all storages
439
440 =cut
441
442 sub debugfh {
443     my $self = shift @_;
444     foreach my $source ($self->all_storages) {
445         $source->debugfh(@_);
446     }
447 }
448
449
450 =head2 debugcb
451
452 set a debug callback across all storages
453
454 =cut
455
456 sub debugcb {
457     my $self = shift @_;
458     foreach my $source ($self->all_storages) {
459         $source->debugcb(@_);
460     }
461 }
462
463
464 =head2 disconnect
465
466 disconnect everything
467
468 =cut
469
470 sub disconnect {
471     my $self = shift @_;
472     foreach my $source ($self->all_storages) {
473         $source->disconnect(@_);
474     }
475 }
476
477
478 =head2 DESTROY
479
480 Make sure we pass destroy events down to the storage handlers
481
482 =cut
483
484 sub DESTROY {
485     my $self = shift;
486     ## TODO, maybe we can just leave this alone ???
487 }
488
489
490 =head1 AUTHOR
491
492 Norbert Csongrádi <bert@cpan.org>
493
494 Peter Siklósi <einon@einon.hu>
495
496 John Napiorkowski <john.napiorkowski@takkle.com>
497
498 =head1 LICENSE
499
500 You may distribute this code under the same terms as Perl itself.
501
502 =cut
503
504 1;
505
506 __END__
507
508 use strict;
509 use warnings;
510
511 use DBIx::Class::Storage::DBI;
512 use DBD::Multi;
513
514 use base qw/Class::Accessor::Fast/;
515
516 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
517
518 =head1 NAME
519
520 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
521
522 =head1 SYNOPSIS
523
524 The Following example shows how to change an existing $schema to a replicated
525 storage type and update it's connection information to contain a master DSN and
526 an array of slaves.
527
528     ## Change storage_type in your schema class
529     $schema->storage_type( '::DBI::Replicated' );
530     
531     ## Set your connection.
532     $schema->connect(
533         $dsn, $user, $password, {
534                 AutoCommit => 1,
535                 ## Other standard DBI connection or DBD custom attributes added as
536                 ## usual.  Additionally, we have two custom attributes for defining
537                 ## slave information and controlling how the underlying DBD::Multi
538                 slaves_connect_info => [
539                    ## Define each slave like a 'normal' DBI connection, but you add
540                    ## in a DBD::Multi custom attribute to define how the slave is
541                    ## prioritized.  Please see DBD::Multi for more.
542                    [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
543                [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
544                [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
545                ## add in a preexisting database handle
546                [$dbh, '','', {priority=>30}], 
547                ## DBD::Multi will call this coderef for connects 
548                [sub {  DBI->connect(< DSN info >) }, '', '', {priority=>40}],  
549                ## If the last item is hashref, we use that for DBD::Multi's 
550                ## configuration information.  Again, see DBD::Multi for more.
551                {timeout=>25, failed_max=>2},               
552                 ],
553         },
554     );
555     
556     ## Now, just use the schema as normal
557     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
558     $schema->resultset('Table')->create(\%info); ## Writes will use master
559
560 =head1 DESCRIPTION
561
562 Warning: This class is marked ALPHA.  We are using this in development and have
563 some basic test coverage but the code hasn't yet been stressed by a variety
564 of databases.  Individual DB's may have quirks we are not aware of.  Please
565 use this in development and pass along your experiences/bug fixes.
566
567 This class implements replicated data store for DBI. Currently you can define
568 one master and numerous slave database connections. All write-type queries
569 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
570 database, all read-type queries (SELECTs) go to the slave database.
571
572 For every slave database you can define a priority value, which controls data
573 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
574 sources used (if they have the same priority, the are used randomized), than
575 if all low priority data sources fail, higher ones tried in order.
576
577 =head1 CONFIGURATION
578
579 Please see L<DBD::Multi> for most configuration information.
580
581 =cut
582
583 sub new {
584     my $proto = shift;
585     my $class = ref( $proto ) || $proto;
586     my $self = {};
587
588     bless( $self, $class );
589
590     $self->write_source( DBIx::Class::Storage::DBI->new );
591     $self->read_source( DBIx::Class::Storage::DBI->new );
592
593     return $self;
594 }
595
596 sub all_sources {
597     my $self = shift;
598
599     my @sources = ($self->read_source, $self->write_source);
600
601     return wantarray ? @sources : \@sources;
602 }
603
604 sub _connect_info {
605         my $self = shift;
606     my $master = $self->write_source->_connect_info;
607     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
608     return $master;
609 }
610
611 sub connect_info {
612         my ($self, $source_info) = @_;
613
614     ## if there is no $source_info, treat this sub like an accessor
615     return $self->_connect_info
616      if !$source_info;
617     
618     ## Alright, let's conect the master 
619     $self->write_source->connect_info($source_info);
620   
621     ## Now, build and then connect the Slaves
622     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
623     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
624         ? pop @slaves_connect_info : {};
625
626     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
627     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
628         unless defined $dbd_multi_config->{limit_dialect};
629
630     @slaves_connect_info = map {
631         ## if the first element in the arrayhash is a ref, make that the value
632         my $db = ref $_->[0] ? $_->[0] : $_;
633         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
634         $priority => $db;
635     } @slaves_connect_info;
636     
637     $self->read_source->connect_info([ 
638         'dbi:Multi:', undef, undef, { 
639                 dsns => [@slaves_connect_info],
640                 %$dbd_multi_config,
641         },
642     ]);
643     
644     ## Return the formated connection information
645     return $self->_connect_info;
646 }
647
648 sub select {
649     shift->read_source->select( @_ );
650 }
651 sub select_single {
652     shift->read_source->select_single( @_ );
653 }
654 sub throw_exception {
655     shift->read_source->throw_exception( @_ );
656 }
657 sub sql_maker {
658     shift->read_source->sql_maker( @_ );
659 }
660 sub columns_info_for {
661     shift->read_source->columns_info_for( @_ );
662 }
663 sub sqlt_type {
664     shift->read_source->sqlt_type( @_ );
665 }
666 sub create_ddl_dir {
667     shift->read_source->create_ddl_dir( @_ );
668 }
669 sub deployment_statements {
670     shift->read_source->deployment_statements( @_ );
671 }
672 sub datetime_parser {
673     shift->read_source->datetime_parser( @_ );
674 }
675 sub datetime_parser_type {
676     shift->read_source->datetime_parser_type( @_ );
677 }
678 sub build_datetime_parser {
679     shift->read_source->build_datetime_parser( @_ );
680 }
681
682 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
683 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
684 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
685 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
686 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
687
688 sub DESTROY {
689     my $self = shift;
690
691     undef $self->{write_source};
692     undef $self->{read_sources};
693 }
694
695 sub last_insert_id {
696     shift->write_source->last_insert_id( @_ );
697 }
698 sub insert {
699     shift->write_source->insert( @_ );
700 }
701 sub update {
702     shift->write_source->update( @_ );
703 }
704 sub update_all {
705     shift->write_source->update_all( @_ );
706 }
707 sub delete {
708     shift->write_source->delete( @_ );
709 }
710 sub delete_all {
711     shift->write_source->delete_all( @_ );
712 }
713 sub create {
714     shift->write_source->create( @_ );
715 }
716 sub find_or_create {
717     shift->write_source->find_or_create( @_ );
718 }
719 sub update_or_create {
720     shift->write_source->update_or_create( @_ );
721 }
722 sub connected {
723     shift->write_source->connected( @_ );
724 }
725 sub ensure_connected {
726     shift->write_source->ensure_connected( @_ );
727 }
728 sub dbh {
729     shift->write_source->dbh( @_ );
730 }
731 sub txn_do {
732     shift->write_source->txn_do( @_ );
733 }
734 sub txn_commit {
735     shift->write_source->txn_commit( @_ );
736 }
737 sub txn_rollback {
738     shift->write_source->txn_rollback( @_ );
739 }
740 sub sth {
741     shift->write_source->sth( @_ );
742 }
743 sub deploy {
744     shift->write_source->deploy( @_ );
745 }
746 sub _prep_for_execute {
747         shift->write_source->_prep_for_execute(@_);
748 }
749
750 sub debugobj {
751         shift->write_source->debugobj(@_);
752 }
753 sub debug {
754     shift->write_source->debug(@_);
755 }
756
757 sub debugfh { shift->_not_supported( 'debugfh' ) };
758 sub debugcb { shift->_not_supported( 'debugcb' ) };
759
760 sub _not_supported {
761     my( $self, $method ) = @_;
762
763     die "This Storage does not support $method method.";
764 }
765
766 =head1 SEE ALSO
767
768 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
769
770 =head1 AUTHOR
771
772 Norbert Csongrádi <bert@cpan.org>
773
774 Peter Siklósi <einon@einon.hu>
775
776 John Napiorkowski <john.napiorkowski@takkle.com>
777
778 =head1 LICENSE
779
780 You may distribute this code under the same terms as Perl itself.
781
782 =cut
783
784 1;