added some advice to debugging replicants so that we can see a replicant dsn, got...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The Following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
19tasks
20
21 ## Change storage_type in your schema class
22 $schema->storage_type( '::DBI::Replicated' );
23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
27 $schema->storage->create_replicants(
28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
31 ## This is just going to use the standard DBIC connect method, so it
32 ## supports everything that method supports, such as connecting to an
33 ## existing database handle.
34 [$dbh],
2bf79155 35 );
50336325 36
2bf79155 37
38=head1 DESCRIPTION
39
40Warning: This class is marked ALPHA. We are using this in development and have
41some basic test coverage but the code hasn't yet been stressed by a variety
42of databases. Individual DB's may have quirks we are not aware of. Please
43use this in development and pass along your experiences/bug fixes.
44
45This class implements replicated data store for DBI. Currently you can define
46one master and numerous slave database connections. All write-type queries
47(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
48database, all read-type queries (SELECTs) go to the slave database.
49
50Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
51handle gets delegated to one of the two attributes: L</master_storage> or to
52L</current_replicant_storage>. Additionally, some methods need to be distributed
53to all existing storages. This way our storage class is a drop in replacement
54for L<DBIx::Class::Storage::DBI>.
55
56Read traffic is spread across the replicants (slaves) occuring to a user
57selected algorithm. The default algorithm is random weighted.
58
59TODO more details about the algorithm.
60
61=head1 ATTRIBUTES
62
63This class defines the following attributes.
64
65=head2 master
66
67The master defines the canonical state for a pool of connected databases. All
68the replicants are expected to match this databases state. Thus, in a classic
69Master / Slaves distributed system, all the slaves are expected to replicate
70the Master's state as quick as possible. This is the only database in the
71pool of databases that is allowed to handle write traffic.
72
73=cut
74
75has 'master' => (
76 is=> 'ro',
77 isa=>'DBIx::Class::Storage::DBI',
78 lazy_build=>1,
79 handles=>[qw/
80 on_connect_do
81 on_disconnect_do
2bf79155 82 connect_info
83 throw_exception
84 sql_maker
85 sqlt_type
86 create_ddl_dir
87 deployment_statements
88 datetime_parser
89 datetime_parser_type
90 last_insert_id
91 insert
92 insert_bulk
93 update
94 delete
95 dbh
96 txn_do
97 txn_commit
98 txn_rollback
99 sth
100 deploy
50336325 101 schema
2bf79155 102 /],
103);
104
105
106=head2 current_replicant
107
108Replicant storages (slaves) handle all read only traffic. The assumption is
109that your database will become readbound well before it becomes write bound
110and that being able to spread your read only traffic around to multiple
111databases is going to help you to scale traffic.
112
113This attribute returns the next slave to handle a read request. Your L</pool>
114attribute has methods to help you shuffle through all the available replicants
115via it's balancer object.
116
117This attribute defines the following reader/writer methods
118
119=over 4
120
121=item get_current_replicant
122
123Returns the contained L<DBIx::Class::Storage::DBI> replicant
124
125=item set_current_replicant
126
127Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
128
129=back
130
131We split the reader/writer to make it easier to selectively override how the
132replicant is altered.
133
134=cut
135
136has 'current_replicant' => (
137 is=> 'rw',
138 reader=>'get_current_replicant',
139 writer=>'set_current_replicant',
140 isa=>'DBIx::Class::Storage::DBI',
141 lazy_build=>1,
142 handles=>[qw/
143 select
144 select_single
145 columns_info_for
146 /],
147);
148
149
26ab719a 150=head2 pool_type
2bf79155 151
26ab719a 152Contains the classname which will instantiate the L</pool> object. Defaults
153to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 154
155=cut
156
26ab719a 157has 'pool_type' => (
2bf79155 158 is=>'ro',
159 isa=>'ClassName',
160 required=>1,
26ab719a 161 lazy=>1,
2bf79155 162 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
26ab719a 163 handles=>{
164 'create_pool' => 'new',
2bf79155 165 },
166);
167
168
26ab719a 169=head2 balancer_type
2bf79155 170
171The replication pool requires a balance class to provider the methods for
172choose how to spread the query load across each replicant in the pool.
173
174=cut
175
26ab719a 176has 'balancer_type' => (
2bf79155 177 is=>'ro',
178 isa=>'ClassName',
179 required=>1,
26ab719a 180 lazy=>1,
181 default=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
182 handles=>{
183 'create_balancer' => 'new',
2bf79155 184 },
185);
186
187
26ab719a 188=head2 pool
2bf79155 189
26ab719a 190Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
191container class for one or more replicated databases.
2bf79155 192
193=cut
194
26ab719a 195has 'pool' => (
2bf79155 196 is=>'ro',
197 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
198 lazy_build=>1,
26ab719a 199 handles=>[qw/
200 replicants
201 has_replicants
202 create_replicants
203 num_replicants
204 delete_replicant
205 /],
2bf79155 206);
207
208
26ab719a 209=head2 balancer
2bf79155 210
26ab719a 211Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
212is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 213
26ab719a 214=cut
2bf79155 215
26ab719a 216has 'balancer' => (
217 is=>'ro',
218 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
219 lazy_build=>1,
220 handles=>[qw/next_storage/],
221);
2bf79155 222
26ab719a 223=head1 METHODS
2bf79155 224
26ab719a 225This class defines the following methods.
2bf79155 226
26ab719a 227=head2 _build_master
2bf79155 228
26ab719a 229Lazy builder for the L</master> attribute.
2bf79155 230
231=cut
232
26ab719a 233sub _build_master {
2bf79155 234 DBIx::Class::Storage::DBI->new;
235}
236
237
26ab719a 238=head2 _build_current_replicant
2bf79155 239
240Lazy builder for the L</current_replicant_storage> attribute.
241
242=cut
243
26ab719a 244sub _build_current_replicant {
245 my $self = shift @_;
246 $self->next_storage($self->pool);
2bf79155 247}
248
249
26ab719a 250=head2 _build_pool
2bf79155 251
26ab719a 252Lazy builder for the L</pool> attribute.
2bf79155 253
254=cut
255
26ab719a 256sub _build_pool {
2bf79155 257 my $self = shift @_;
26ab719a 258 $self->create_pool;
2bf79155 259}
260
261
26ab719a 262=head2 _build_balancer
2bf79155 263
26ab719a 264Lazy builder for the L</balancer> attribute.
2bf79155 265
266=cut
267
26ab719a 268sub _build_balancer {
269 my $self = shift @_;
270 $self->create_balancer;
2bf79155 271}
272
273
50336325 274=head2 around: create_replicants
275
276All calls to create_replicants needs to have an existing $schema tacked onto
277top of the args
278
279=cut
280
281around 'create_replicants' => sub {
282 my ($method, $self, @args) = @_;
283 $self->$method($self->schema, @args);
284};
285
286
2bf79155 287=head2 after: get_current_replicant_storage
288
289Advice on the current_replicant_storage attribute. Each time we use a replicant
290we need to change it via the storage pool algorithm. That way we are spreading
291the load evenly (hopefully) across existing capacity.
292
293=cut
294
50336325 295after 'current_replicant' => sub {
2bf79155 296 my $self = shift @_;
26ab719a 297 my $next_replicant = $self->next_storage($self->pool);
50336325 298
299warn '......................';
26ab719a 300 $self->set_current_replicant($next_replicant);
2bf79155 301};
302
303
2bf79155 304=head2 all_storages
305
306Returns an array of of all the connected storage backends. The first element
307in the returned array is the master, and the remainings are each of the
308replicants.
309
310=cut
311
312sub all_storages {
313 my $self = shift @_;
314
26ab719a 315 return grep {defined $_ && blessed $_} (
316 $self->master,
317 $self->replicants,
2bf79155 318 );
319}
320
321
322=head2 connected
323
324Check that the master and at least one of the replicants is connected.
325
326=cut
327
328sub connected {
329 my $self = shift @_;
330
331 return
26ab719a 332 $self->master->connected &&
333 $self->pool->connected_replicants;
2bf79155 334}
335
336
337=head2 ensure_connected
338
339Make sure all the storages are connected.
340
341=cut
342
343sub ensure_connected {
344 my $self = shift @_;
26ab719a 345 foreach my $source ($self->all_storages) {
2bf79155 346 $source->ensure_connected(@_);
347 }
348}
349
350
351=head2 limit_dialect
352
353Set the limit_dialect for all existing storages
354
355=cut
356
357sub limit_dialect {
358 my $self = shift @_;
26ab719a 359 foreach my $source ($self->all_storages) {
360 $source->limit_dialect(@_);
2bf79155 361 }
362}
363
364
365=head2 quote_char
366
367Set the quote_char for all existing storages
368
369=cut
370
371sub quote_char {
372 my $self = shift @_;
26ab719a 373 foreach my $source ($self->all_storages) {
374 $source->quote_char(@_);
2bf79155 375 }
376}
377
378
379=head2 name_sep
380
381Set the name_sep for all existing storages
382
383=cut
384
385sub name_sep {
386 my $self = shift @_;
26ab719a 387 foreach my $source ($self->all_storages) {
2bf79155 388 $source->name_sep(@_);
389 }
390}
391
392
393=head2 set_schema
394
395Set the schema object for all existing storages
396
397=cut
398
399sub set_schema {
400 my $self = shift @_;
26ab719a 401 foreach my $source ($self->all_storages) {
2bf79155 402 $source->set_schema(@_);
403 }
404}
405
406
407=head2 debug
408
409set a debug flag across all storages
410
411=cut
412
413sub debug {
414 my $self = shift @_;
26ab719a 415 foreach my $source ($self->all_storages) {
2bf79155 416 $source->debug(@_);
417 }
418}
419
420
421=head2 debugobj
422
423set a debug object across all storages
424
425=cut
426
427sub debugobj {
428 my $self = shift @_;
26ab719a 429 foreach my $source ($self->all_storages) {
2bf79155 430 $source->debugobj(@_);
431 }
432}
433
434
435=head2 debugfh
436
437set a debugfh object across all storages
438
439=cut
440
441sub debugfh {
442 my $self = shift @_;
26ab719a 443 foreach my $source ($self->all_storages) {
2bf79155 444 $source->debugfh(@_);
445 }
446}
447
448
449=head2 debugcb
450
451set a debug callback across all storages
452
453=cut
454
455sub debugcb {
456 my $self = shift @_;
26ab719a 457 foreach my $source ($self->all_storages) {
2bf79155 458 $source->debugcb(@_);
459 }
460}
461
462
463=head2 disconnect
464
465disconnect everything
466
467=cut
468
469sub disconnect {
470 my $self = shift @_;
26ab719a 471 foreach my $source ($self->all_storages) {
2bf79155 472 $source->disconnect(@_);
473 }
474}
475
476
477=head2 DESTROY
478
479Make sure we pass destroy events down to the storage handlers
480
481=cut
482
483sub DESTROY {
484 my $self = shift;
485 ## TODO, maybe we can just leave this alone ???
486}
487
488
489=head1 AUTHOR
490
491Norbert Csongrádi <bert@cpan.org>
492
493Peter Siklósi <einon@einon.hu>
494
495John Napiorkowski <john.napiorkowski@takkle.com>
496
497=head1 LICENSE
498
499You may distribute this code under the same terms as Perl itself.
500
501=cut
502
5031;
504
505__END__
506
f5d3a5de 507use strict;
508use warnings;
509
510use DBIx::Class::Storage::DBI;
511use DBD::Multi;
2156bbdd 512
f5d3a5de 513use base qw/Class::Accessor::Fast/;
514
515__PACKAGE__->mk_accessors( qw/read_source write_source/ );
516
517=head1 NAME
518
2156bbdd 519DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
f5d3a5de 520
521=head1 SYNOPSIS
522
2156bbdd 523The Following example shows how to change an existing $schema to a replicated
524storage type and update it's connection information to contain a master DSN and
525an array of slaves.
526
527 ## Change storage_type in your schema class
528 $schema->storage_type( '::DBI::Replicated' );
529
530 ## Set your connection.
531 $schema->connect(
532 $dsn, $user, $password, {
533 AutoCommit => 1,
534 ## Other standard DBI connection or DBD custom attributes added as
535 ## usual. Additionally, we have two custom attributes for defining
536 ## slave information and controlling how the underlying DBD::Multi
537 slaves_connect_info => [
538 ## Define each slave like a 'normal' DBI connection, but you add
539 ## in a DBD::Multi custom attribute to define how the slave is
540 ## prioritized. Please see DBD::Multi for more.
541 [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
542 [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
543 [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
544 ## add in a preexisting database handle
545 [$dbh, '','', {priority=>30}],
546 ## DBD::Multi will call this coderef for connects
547 [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
548 ## If the last item is hashref, we use that for DBD::Multi's
549 ## configuration information. Again, see DBD::Multi for more.
550 {timeout=>25, failed_max=>2},
551 ],
552 },
553 );
554
555 ## Now, just use the schema as normal
556 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
557 $schema->resultset('Table')->create(\%info); ## Writes will use master
f5d3a5de 558
559=head1 DESCRIPTION
560
2156bbdd 561Warning: This class is marked ALPHA. We are using this in development and have
562some basic test coverage but the code hasn't yet been stressed by a variety
563of databases. Individual DB's may have quirks we are not aware of. Please
564use this in development and pass along your experiences/bug fixes.
f5d3a5de 565
0aff97c2 566This class implements replicated data store for DBI. Currently you can define
567one master and numerous slave database connections. All write-type queries
568(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
569database, all read-type queries (SELECTs) go to the slave database.
f5d3a5de 570
0aff97c2 571For every slave database you can define a priority value, which controls data
572source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
573sources used (if they have the same priority, the are used randomized), than
574if all low priority data sources fail, higher ones tried in order.
f5d3a5de 575
576=head1 CONFIGURATION
577
2156bbdd 578Please see L<DBD::Multi> for most configuration information.
f5d3a5de 579
580=cut
581
582sub new {
583 my $proto = shift;
584 my $class = ref( $proto ) || $proto;
585 my $self = {};
586
587 bless( $self, $class );
588
589 $self->write_source( DBIx::Class::Storage::DBI->new );
590 $self->read_source( DBIx::Class::Storage::DBI->new );
591
592 return $self;
593}
594
595sub all_sources {
596 my $self = shift;
597
598 my @sources = ($self->read_source, $self->write_source);
599
600 return wantarray ? @sources : \@sources;
601}
602
2156bbdd 603sub _connect_info {
604 my $self = shift;
605 my $master = $self->write_source->_connect_info;
606 $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
607 return $master;
608}
f5d3a5de 609
2156bbdd 610sub connect_info {
611 my ($self, $source_info) = @_;
f5d3a5de 612
2156bbdd 613 ## if there is no $source_info, treat this sub like an accessor
614 return $self->_connect_info
615 if !$source_info;
616
617 ## Alright, let's conect the master
618 $self->write_source->connect_info($source_info);
619
620 ## Now, build and then connect the Slaves
621 my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};
622 my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH'
623 ? pop @slaves_connect_info : {};
624
625 ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
626 $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
627 unless defined $dbd_multi_config->{limit_dialect};
628
629 @slaves_connect_info = map {
0f83441a 630 ## if the first element in the arrayhash is a ref, make that the value
631 my $db = ref $_->[0] ? $_->[0] : $_;
2156bbdd 632 my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
633 $priority => $db;
634 } @slaves_connect_info;
0f83441a 635
2156bbdd 636 $self->read_source->connect_info([
637 'dbi:Multi:', undef, undef, {
638 dsns => [@slaves_connect_info],
639 %$dbd_multi_config,
640 },
641 ]);
642
643 ## Return the formated connection information
644 return $self->_connect_info;
f5d3a5de 645}
646
647sub select {
648 shift->read_source->select( @_ );
649}
650sub select_single {
651 shift->read_source->select_single( @_ );
652}
653sub throw_exception {
654 shift->read_source->throw_exception( @_ );
655}
656sub sql_maker {
657 shift->read_source->sql_maker( @_ );
658}
659sub columns_info_for {
660 shift->read_source->columns_info_for( @_ );
661}
662sub sqlt_type {
663 shift->read_source->sqlt_type( @_ );
664}
665sub create_ddl_dir {
666 shift->read_source->create_ddl_dir( @_ );
667}
668sub deployment_statements {
669 shift->read_source->deployment_statements( @_ );
670}
671sub datetime_parser {
672 shift->read_source->datetime_parser( @_ );
673}
674sub datetime_parser_type {
675 shift->read_source->datetime_parser_type( @_ );
676}
677sub build_datetime_parser {
678 shift->read_source->build_datetime_parser( @_ );
679}
680
9b21c682 681sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
682sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
683sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
684sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
685sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
686
f5d3a5de 687sub DESTROY {
688 my $self = shift;
689
690 undef $self->{write_source};
691 undef $self->{read_sources};
692}
693
694sub last_insert_id {
695 shift->write_source->last_insert_id( @_ );
696}
697sub insert {
698 shift->write_source->insert( @_ );
699}
700sub update {
701 shift->write_source->update( @_ );
702}
703sub update_all {
704 shift->write_source->update_all( @_ );
705}
706sub delete {
707 shift->write_source->delete( @_ );
708}
709sub delete_all {
710 shift->write_source->delete_all( @_ );
711}
712sub create {
713 shift->write_source->create( @_ );
714}
715sub find_or_create {
716 shift->write_source->find_or_create( @_ );
717}
718sub update_or_create {
719 shift->write_source->update_or_create( @_ );
720}
721sub connected {
722 shift->write_source->connected( @_ );
723}
724sub ensure_connected {
725 shift->write_source->ensure_connected( @_ );
726}
727sub dbh {
728 shift->write_source->dbh( @_ );
729}
2156bbdd 730sub txn_do {
731 shift->write_source->txn_do( @_ );
f5d3a5de 732}
733sub txn_commit {
734 shift->write_source->txn_commit( @_ );
735}
736sub txn_rollback {
737 shift->write_source->txn_rollback( @_ );
738}
739sub sth {
740 shift->write_source->sth( @_ );
741}
742sub deploy {
743 shift->write_source->deploy( @_ );
744}
2156bbdd 745sub _prep_for_execute {
746 shift->write_source->_prep_for_execute(@_);
747}
f5d3a5de 748
2156bbdd 749sub debugobj {
750 shift->write_source->debugobj(@_);
751}
752sub debug {
753 shift->write_source->debug(@_);
754}
f5d3a5de 755
756sub debugfh { shift->_not_supported( 'debugfh' ) };
757sub debugcb { shift->_not_supported( 'debugcb' ) };
758
759sub _not_supported {
760 my( $self, $method ) = @_;
761
762 die "This Storage does not support $method method.";
763}
764
765=head1 SEE ALSO
766
767L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
768
769=head1 AUTHOR
770
771Norbert Csongrádi <bert@cpan.org>
772
773Peter Siklósi <einon@einon.hu>
774
2156bbdd 775John Napiorkowski <john.napiorkowski@takkle.com>
776
f5d3a5de 777=head1 LICENSE
778
779You may distribute this code under the same terms as Perl itself.
780
781=cut
782
7831;