got first pass on the replication and balancer, passing all of the old test suite...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The Following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
19tasks
20
21 ## Change storage_type in your schema class
22 $schema->storage_type( '::DBI::Replicated' );
23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
27 $schema->storage->create_replicants(
28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
31 ## This is just going to use the standard DBIC connect method, so it
32 ## supports everything that method supports, such as connecting to an
33 ## existing database handle.
34 [$dbh],
35 \%global_opts
36 );
37
38 ## a hash of replicants, keyed by their DSN
39 my %replicants = $schema->storage->replicants;
40 my $replicant = $schema->storage->get_replicant($dsn);
41 $replicant->status;
42 $replicant->is_active;
43 $replicant->active;
44
45=head1 DESCRIPTION
46
47Warning: This class is marked ALPHA. We are using this in development and have
48some basic test coverage but the code hasn't yet been stressed by a variety
49of databases. Individual DB's may have quirks we are not aware of. Please
50use this in development and pass along your experiences/bug fixes.
51
52This class implements replicated data store for DBI. Currently you can define
53one master and numerous slave database connections. All write-type queries
54(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
55database, all read-type queries (SELECTs) go to the slave database.
56
Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
handle gets delegated to one of the two attributes: L</master> or
L</current_replicant>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop-in replacement
for L<DBIx::Class::Storage::DBI>.
62
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.
65
66TODO more details about the algorithm.
67
68=head1 ATTRIBUTES
69
70This class defines the following attributes.
71
72=head2 master
73
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
79
80=cut
81
## The master is the single writable backend. Every method named under
## 'handles' is forwarded to it unchanged, so writes, transactions, DDL
## helpers and raw handle access never go to a replicant.
has 'master' => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI',
    lazy_build => 1,
    handles    => [qw(
        on_connect_do
        on_disconnect_do
        connect_info
        throw_exception
        sql_maker
        sqlt_type
        create_ddl_dir
        deployment_statements
        datetime_parser
        datetime_parser_type
        last_insert_id
        insert
        insert_bulk
        update
        delete
        dbh
        txn_do
        txn_commit
        txn_rollback
        sth
        deploy
    )],
);
110
111
112=head2 current_replicant
113
114Replicant storages (slaves) handle all read only traffic. The assumption is
115that your database will become readbound well before it becomes write bound
116and that being able to spread your read only traffic around to multiple
117databases is going to help you to scale traffic.
118
This attribute returns the next slave to handle a read request. Your L</pool>
attribute has methods to help you shuffle through all the available replicants
via its balancer object.
122
123This attribute defines the following reader/writer methods
124
125=over 4
126
127=item get_current_replicant
128
129Returns the contained L<DBIx::Class::Storage::DBI> replicant
130
131=item set_current_replicant
132
133Set the attribute to a given L<DBIx::Class::Storage::DBI> (or subclass) object.
134
135=back
136
137We split the reader/writer to make it easier to selectively override how the
138replicant is altered.
139
140=cut
141
## The replicant that serves read-only calls. The reader and writer get
## distinct names so that each one can be wrapped or overridden separately.
has 'current_replicant' => (
    is         => 'rw',
    reader     => 'get_current_replicant',
    writer     => 'set_current_replicant',
    isa        => 'DBIx::Class::Storage::DBI',
    lazy_build => 1,
    handles    => [qw( select select_single columns_info_for )],
);
154
155
26ab719a 156=head2 pool_type
2bf79155 157
26ab719a 158Contains the classname which will instantiate the L</pool> object. Defaults
159to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 160
161=cut
162
## Name of the class used to build the pool; 'create_pool' is a delegate
## for that class's constructor.
has 'pool_type' => (
    is       => 'ro',
    isa      => 'ClassName',
    required => 1,
    lazy     => 1,
    default  => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    handles  => { 'create_pool' => 'new' },
);
173
174
26ab719a 175=head2 balancer_type
2bf79155 176
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
179
180=cut
181
## Name of the class used to build the balancer; 'create_balancer' is a
## delegate for that class's constructor.
has 'balancer_type' => (
    is       => 'ro',
    isa      => 'ClassName',
    required => 1,
    lazy     => 1,
    default  => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    handles  => { 'create_balancer' => 'new' },
);
192
193
26ab719a 194=head2 pool
2bf79155 195
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 198
199=cut
200
## Container for the replicant storages. The replicant bookkeeping methods
## listed under 'handles' are exposed directly on this storage object.
has 'pool' => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    lazy_build => 1,
    handles    => [qw(
        replicants
        has_replicants
        create_replicants
        num_replicants
        delete_replicant
    )],
);
213
214
26ab719a 215=head2 balancer
2bf79155 216
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and, through its C<next_storage> method, supplies the next replicant storage.
2bf79155 219
26ab719a 220=cut
2bf79155 221
## Object that picks replicants; its 'next_storage' method is exposed
## directly on this storage object.
has 'balancer' => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    lazy_build => 1,
    handles    => [qw( next_storage )],
);
2bf79155 228
26ab719a 229=head1 METHODS
2bf79155 230
26ab719a 231This class defines the following methods.
2bf79155 232
26ab719a 233=head2 _build_master
2bf79155 234
26ab719a 235Lazy builder for the L</master> attribute.
2bf79155 236
237=cut
238
sub _build_master {
    ## The master is a plain, unconfigured DBI storage.
    return DBIx::Class::Storage::DBI->new;
}
242
243
26ab719a 244=head2 _build_current_replicant
2bf79155 245
Lazy builder for the L</current_replicant> attribute.
247
248=cut
249
sub _build_current_replicant {
    my ($self) = @_;

    ## Ask the balancer (via the delegated next_storage) for a storage
    ## drawn from our pool.
    return $self->next_storage( $self->pool );
}
254
255
26ab719a 256=head2 _build_pool
2bf79155 257
26ab719a 258Lazy builder for the L</pool> attribute.
2bf79155 259
260=cut
261
sub _build_pool {
    my ($self) = @_;

    ## create_pool is the delegated constructor of the pool_type class.
    return $self->create_pool;
}
266
267
26ab719a 268=head2 _build_balancer
2bf79155 269
26ab719a 270Lazy builder for the L</balancer> attribute.
2bf79155 271
272=cut
273
sub _build_balancer {
    my ($self) = @_;

    ## create_balancer is the delegated constructor of the balancer_type class.
    return $self->create_balancer;
}
278
279
=head2 after: get_current_replicant

Advice on the current_replicant attribute. Each time we use a replicant
we need to change it via the storage pool algorithm. That way we are spreading
the load evenly (hopefully) across existing capacity.
285
286=cut
287
## Runs after every read of the current replicant: fetch another storage
## from the balancer and store it, so the following read gets that one.
after 'get_current_replicant' => sub {
    my ($self) = @_;

    my $upcoming = $self->next_storage( $self->pool );
    $self->set_current_replicant($upcoming);
};
294
295
2bf79155 296=head2 all_storages
297
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
301
302=cut
303
sub all_storages {
    my ($self) = @_;

    ## Master first, then whatever the pool reports. Anything that is not
    ## a defined, blessed object is dropped from the result.
    my @storages;
    for my $candidate ( $self->master, $self->replicants ) {
        next unless defined $candidate && blessed $candidate;
        push @storages, $candidate;
    }

    return @storages;
}
312
313
314=head2 connected
315
316Check that the master and at least one of the replicants is connected.
317
318=cut
319
sub connected {
    my ($self) = @_;

    ## A disconnected master decides the answer on its own.
    my $master_up = $self->master->connected;
    return $master_up unless $master_up;

    ## Otherwise the result is whatever the pool reports for its replicants.
    return $self->pool->connected_replicants;
}
327
328
329=head2 ensure_connected
330
331Make sure all the storages are connected.
332
333=cut
334
sub ensure_connected {
    my ( $self, @args ) = @_;

    ## Forward the request, with the caller's arguments, to every storage.
    $_->ensure_connected(@args) for $self->all_storages;
}
341
342
343=head2 limit_dialect
344
345Set the limit_dialect for all existing storages
346
347=cut
348
sub limit_dialect {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->limit_dialect(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## The master holds the canonical setting, so report its value; this
    ## lets the method be used as a getter as well as a setter.
    return $self->master->limit_dialect;
}
355
356
357=head2 quote_char
358
359Set the quote_char for all existing storages
360
361=cut
362
sub quote_char {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->quote_char(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## The master holds the canonical setting, so report its value; this
    ## lets the method be used as a getter as well as a setter.
    return $self->master->quote_char;
}
369
370
371=head2 name_sep
372
373Set the name_sep for all existing storages
374
375=cut
376
sub name_sep {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->name_sep(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## The master holds the canonical setting, so report its value; this
    ## lets the method be used as a getter as well as a setter.
    return $self->master->name_sep;
}
383
384
385=head2 set_schema
386
387Set the schema object for all existing storages
388
389=cut
390
sub set_schema {
    my ( $self, @args ) = @_;

    ## Hand the same arguments to set_schema on every storage.
    $_->set_schema(@args) for $self->all_storages;
}
397
398
399=head2 debug
400
401set a debug flag across all storages
402
403=cut
404
sub debug {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->debug(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful,
    ## which made "$storage->debug" read as false regardless of the flag.
    ## Report the master's value instead.
    return $self->master->debug;
}
411
412
413=head2 debugobj
414
415set a debug object across all storages
416
417=cut
418
sub debugobj {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->debugobj(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## Report the master's debug object so the method also works as a getter.
    return $self->master->debugobj;
}
425
426
427=head2 debugfh
428
429set a debugfh object across all storages
430
431=cut
432
sub debugfh {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->debugfh(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## Report the master's handle so the method also works as a getter.
    return $self->master->debugfh;
}
439
440
441=head2 debugcb
442
443set a debug callback across all storages
444
445=cut
446
sub debugcb {
    my ( $self, @args ) = @_;

    ## Broadcast to every storage so master and replicants stay in step.
    foreach my $source ( $self->all_storages ) {
        $source->debugcb(@args);
    }

    ## Previously the sub ended on the loop and so returned nothing useful.
    ## Report the master's callback so the method also works as a getter.
    return $self->master->debugcb;
}
453
454
455=head2 disconnect
456
457disconnect everything
458
459=cut
460
sub disconnect {
    my ( $self, @args ) = @_;

    ## Tell every storage, master included, to disconnect.
    $_->disconnect(@args) for $self->all_storages;
}
467
468
469=head2 DESTROY
470
471Make sure we pass destroy events down to the storage handlers
472
473=cut
474
sub DESTROY {
    ## Intentionally does nothing yet.
    ## TODO: decide whether destruction must be propagated to the storages
    ## or whether this hook can simply be removed.
    my $self = shift @_;
}
479
480
481=head1 AUTHOR
482
483Norbert Csongrádi <bert@cpan.org>
484
485Peter Siklósi <einon@einon.hu>
486
487John Napiorkowski <john.napiorkowski@takkle.com>
488
489=head1 LICENSE
490
491You may distribute this code under the same terms as Perl itself.
492
493=cut
494
4951;
496
497__END__
498
f5d3a5de 499use strict;
500use warnings;
501
502use DBIx::Class::Storage::DBI;
503use DBD::Multi;
2156bbdd 504
f5d3a5de 505use base qw/Class::Accessor::Fast/;
506
507__PACKAGE__->mk_accessors( qw/read_source write_source/ );
508
509=head1 NAME
510
2156bbdd 511DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
f5d3a5de 512
513=head1 SYNOPSIS
514
The following example shows how to change an existing $schema to a replicated
storage type and update its connection information to contain a master DSN and
an array of slaves.
518
519 ## Change storage_type in your schema class
520 $schema->storage_type( '::DBI::Replicated' );
521
522 ## Set your connection.
523 $schema->connect(
524 $dsn, $user, $password, {
525 AutoCommit => 1,
526 ## Other standard DBI connection or DBD custom attributes added as
527 ## usual. Additionally, we have two custom attributes for defining
528 ## slave information and controlling how the underlying DBD::Multi
529 slaves_connect_info => [
530 ## Define each slave like a 'normal' DBI connection, but you add
531 ## in a DBD::Multi custom attribute to define how the slave is
532 ## prioritized. Please see DBD::Multi for more.
533 [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
534 [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
535 [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
536 ## add in a preexisting database handle
537 [$dbh, '','', {priority=>30}],
538 ## DBD::Multi will call this coderef for connects
539 [sub { DBI->connect(< DSN info >) }, '', '', {priority=>40}],
540 ## If the last item is hashref, we use that for DBD::Multi's
541 ## configuration information. Again, see DBD::Multi for more.
542 {timeout=>25, failed_max=>2},
543 ],
544 },
545 );
546
547 ## Now, just use the schema as normal
548 $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
549 $schema->resultset('Table')->create(\%info); ## Writes will use master
f5d3a5de 550
551=head1 DESCRIPTION
552
2156bbdd 553Warning: This class is marked ALPHA. We are using this in development and have
554some basic test coverage but the code hasn't yet been stressed by a variety
555of databases. Individual DB's may have quirks we are not aware of. Please
556use this in development and pass along your experiences/bug fixes.
f5d3a5de 557
0aff97c2 558This class implements replicated data store for DBI. Currently you can define
559one master and numerous slave database connections. All write-type queries
560(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
561database, all read-type queries (SELECTs) go to the slave database.
f5d3a5de 562
For every slave database you can define a priority value, which controls the
data source usage pattern. It uses L<DBD::Multi>, so the lower priority data
sources are used first (if they have the same priority, they are used in random
order); then, if all low priority data sources fail, higher ones are tried in
order.
f5d3a5de 567
568=head1 CONFIGURATION
569
2156bbdd 570Please see L<DBD::Multi> for most configuration information.
f5d3a5de 571
572=cut
573
## Constructor: accepts a class name or an existing instance and returns a
## new object with a fresh DBI storage in both the write and read slots.
sub new {
    my ($invocant) = @_;

    my $self = bless {}, ( ref($invocant) || $invocant );

    ## Write source first, then read source.
    $self->write_source( DBIx::Class::Storage::DBI->new );
    $self->read_source( DBIx::Class::Storage::DBI->new );

    return $self;
}
586
## Returns the read source followed by the write source: as a list in list
## context, as an array reference otherwise.
sub all_sources {
    my ($self) = @_;

    my @pair = ( $self->read_source, $self->write_source );

    return @pair if wantarray;
    return \@pair;
}
594
## Returns the write source's connect info with the read source's connect
## info attached to its last element. The write source's own structure is
## modified in place.
sub _connect_info {
    my ($self) = @_;

    my $combined = $self->write_source->_connect_info;
    my $readers  = $self->read_source->_connect_info;

    ## NOTE(review): this key is spelled 'slave_connect_info', while
    ## connect_info() reads 'slaves_connect_info'. Confirm which spelling
    ## callers rely on before unifying them.
    $combined->[-1]->{slave_connect_info} = $readers;

    return $combined;
}
f5d3a5de 601
## Accessor/mutator for the connection information.
##
## Without an argument it returns the combined info from _connect_info.
## With an array reference it configures the write source from that
## reference, builds a DBD::Multi style configuration for the read source
## from the 'slaves_connect_info' entry of the last element, and then
## returns the combined info.
sub connect_info {
    my ( $self, $source_info ) = @_;

    ## No argument: behave as a plain reader.
    return $self->_connect_info unless $source_info;

    ## The master takes the connection information as given.
    my $writer = $self->write_source;
    $writer->connect_info($source_info);

    ## Work on a copy of the slave list. A trailing hash reference is the
    ## configuration for DBD::Multi itself, not a slave definition.
    my @slave_specs = @{ $source_info->[-1]->{slaves_connect_info} };
    my $multi_opts  = {};
    if ( ref $slave_specs[-1] eq 'HASH' ) {
        $multi_opts = pop @slave_specs;
    }

    ## SQL::Abstract::Limit can't guess what DBD::Multi is, so borrow the
    ## master's dialect unless the caller supplied one.
    if ( !defined $multi_opts->{limit_dialect} ) {
        $multi_opts->{limit_dialect} = $writer->sql_maker->limit_dialect;
    }

    ## Flatten the slaves into (priority, target) pairs.
    my @dsns;
    for my $spec (@slave_specs) {
        ## A reference in the first slot (a handle or coderef) is used
        ## directly; otherwise the whole connect arrayref is the target.
        my $target   = ref $spec->[0] ? $spec->[0] : $spec;
        my $priority = $spec->[-1]->{priority} || 10;    ## default priority is 10
        push @dsns, $priority, $target;
    }

    $self->read_source->connect_info(
        [
            'dbi:Multi:', undef, undef,
            {
                dsns => \@dsns,
                %{$multi_opts},
            },
        ]
    );

    ## Return the formatted connection information.
    return $self->_connect_info;
}
638
## Read-side delegators: each method forwards its arguments unchanged to
## the method of the same name on the read source.
sub select                { my $self = shift; return $self->read_source->select(@_); }
sub select_single         { my $self = shift; return $self->read_source->select_single(@_); }
sub throw_exception       { my $self = shift; return $self->read_source->throw_exception(@_); }
sub sql_maker             { my $self = shift; return $self->read_source->sql_maker(@_); }
sub columns_info_for      { my $self = shift; return $self->read_source->columns_info_for(@_); }
sub sqlt_type             { my $self = shift; return $self->read_source->sqlt_type(@_); }
sub create_ddl_dir        { my $self = shift; return $self->read_source->create_ddl_dir(@_); }
sub deployment_statements { my $self = shift; return $self->read_source->deployment_statements(@_); }
sub datetime_parser       { my $self = shift; return $self->read_source->datetime_parser(@_); }
sub datetime_parser_type  { my $self = shift; return $self->read_source->datetime_parser_type(@_); }
sub build_datetime_parser { my $self = shift; return $self->read_source->build_datetime_parser(@_); }
672
## Broadcast methods: each forwards its arguments to the method of the same
## name on every source returned by all_sources.
sub limit_dialect {
    my ( $self, @args ) = @_;
    $_->limit_dialect(@args) for $self->all_sources;
}

sub quote_char {
    my ( $self, @args ) = @_;
    $_->quote_char(@args) for $self->all_sources;
}

## Fixed: this used to call quote_char on every source (a copy/paste slip),
## so setting the name separator overwrote the quote character instead.
sub name_sep {
    my ( $self, @args ) = @_;
    $_->name_sep(@args) for $self->all_sources;
}

sub disconnect {
    my ( $self, @args ) = @_;
    $_->disconnect(@args) for $self->all_sources;
}

sub set_schema {
    my ( $self, @args ) = @_;
    $_->set_schema(@args) for $self->all_sources;
}
678
## Drop the references to both backends when the object is destroyed.
sub DESTROY {
    my $self = shift;

    undef $self->{write_source};

    ## Fixed: the slot is 'read_source', matching the accessor generated by
    ## mk_accessors. The old 'read_sources' spelling cleared a key that was
    ## never populated and left the real read source untouched.
    undef $self->{read_source};
}
685
## Write-side delegators: each method forwards its arguments unchanged to
## the method of the same name on the write source.
sub last_insert_id    { my $self = shift; return $self->write_source->last_insert_id(@_); }
sub insert            { my $self = shift; return $self->write_source->insert(@_); }
sub update            { my $self = shift; return $self->write_source->update(@_); }
sub update_all        { my $self = shift; return $self->write_source->update_all(@_); }
sub delete            { my $self = shift; return $self->write_source->delete(@_); }
sub delete_all        { my $self = shift; return $self->write_source->delete_all(@_); }
sub create            { my $self = shift; return $self->write_source->create(@_); }
sub find_or_create    { my $self = shift; return $self->write_source->find_or_create(@_); }
sub update_or_create  { my $self = shift; return $self->write_source->update_or_create(@_); }
sub connected         { my $self = shift; return $self->write_source->connected(@_); }
sub ensure_connected  { my $self = shift; return $self->write_source->ensure_connected(@_); }
sub dbh               { my $self = shift; return $self->write_source->dbh(@_); }
sub txn_do            { my $self = shift; return $self->write_source->txn_do(@_); }
sub txn_commit        { my $self = shift; return $self->write_source->txn_commit(@_); }
sub txn_rollback      { my $self = shift; return $self->write_source->txn_rollback(@_); }
sub sth               { my $self = shift; return $self->write_source->sth(@_); }
sub deploy            { my $self = shift; return $self->write_source->deploy(@_); }
sub _prep_for_execute { my $self = shift; return $self->write_source->_prep_for_execute(@_); }
f5d3a5de 740
## Debug settings are forwarded to the write source only.
sub debugobj { my $self = shift; return $self->write_source->debugobj(@_); }
sub debug    { my $self = shift; return $self->write_source->debug(@_); }
f5d3a5de 747
748sub debugfh { shift->_not_supported( 'debugfh' ) };
749sub debugcb { shift->_not_supported( 'debugcb' ) };
750
751sub _not_supported {
752 my( $self, $method ) = @_;
753
754 die "This Storage does not support $method method.";
755}
756
757=head1 SEE ALSO
758
L<DBIx::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
760
761=head1 AUTHOR
762
763Norbert Csongrádi <bert@cpan.org>
764
765Peter Siklósi <einon@einon.hu>
766
2156bbdd 767John Napiorkowski <john.napiorkowski@takkle.com>
768
f5d3a5de 769=head1 LICENSE
770
771You may distribute this code under the same terms as Perl itself.
772
773=cut
774
7751;