support hashrefs for connect_replicants
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
5
6 ## Modules required for Replication support not required for general DBIC
7 ## use, so we explicitly test for these.
8
9 my %replication_required = (
10 Moose => '0.54',
11 MooseX::AttributeHelpers => '0.12',
12 Moose::Util::TypeConstraints => '0.54',
13 Class::MOP => '0.63',
14 );
15
16 my @didnt_load;
17
18 for my $module (keys %replication_required) {
19 eval "use $module $replication_required{$module}";
20 push @didnt_load, "$module $replication_required{$module}"
21 if $@;
22 }
23
24 croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25 if @didnt_load;
26}
27
26ab719a 28use DBIx::Class::Storage::DBI;
2bf79155 29use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 30use DBIx::Class::Storage::DBI::Replicated::Balancer;
2bf79155 31
32=head1 NAME
33
ecb65397 34DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 35
36=head1 SYNOPSIS
37
38The Following example shows how to change an existing $schema to a replicated
39storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 40tasks.
2bf79155 41
64cdad22 42 ## Change storage_type in your schema class
43 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
44
45 ## Add some slaves. Basically this is an array of arrayrefs, where each
46 ## arrayref is database connect information
47
48 $schema->storage->connect_replicants(
49 [$dsn1, $user, $pass, \%opts],
50 [$dsn2, $user, $pass, \%opts],
51 [$dsn3, $user, $pass, \%opts],
52 );
53
7e38d850 54 ## Now, just use the $schema as normal
55 $schema->resultset('Source')->search({name=>'etc'});
56
57 ## You can force a given query to use a particular storage using the search
58 ### attribute 'force_pool'. For example:
59
60 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
61
62 ## Now $RS will force everything (both reads and writes) to use whatever was
63 ## setup as the master storage. 'master' is hardcoded to always point to the
64 ## Master, but you can also use any Replicant name. Please see:
65 ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
66 ## More. Also see transactions and L</execute_reliably> for alternative ways
67 ## to force read traffic to the master.
68
2bf79155 69=head1 DESCRIPTION
70
7e38d850 71Warning: This class is marked BETA. This has been running a production
ccb3b897 72website using MySQL native replication as its backend and we have some decent
7e38d850 73test coverage but the code hasn't yet been stressed by a variety of databases.
74Individual DB's may have quirks we are not aware of. Please use this in first
75development and pass along your experiences/bug fixes.
2bf79155 76
77This class implements replicated data store for DBI. Currently you can define
78one master and numerous slave database connections. All write-type queries
79(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
80database, all read-type queries (SELECTs) go to the slave database.
81
82Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 83handle gets delegated to one of the two attributes: L</read_handler> or to
84L</write_handler>. Additionally, some methods need to be distributed
2bf79155 85to all existing storages. This way our storage class is a drop in replacement
86for L<DBIx::Class::Storage::DBI>.
87
88Read traffic is spread across the replicants (slaves) occuring to a user
89selected algorithm. The default algorithm is random weighted.
90
bca099a3 91=head1 NOTES
92
93The consistancy betweeen master and replicants is database specific. The Pool
94gives you a method to validate it's replicants, removing and replacing them
7e38d850 95when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 96to force a query to run against Master when needed.
97
98=head1 REQUIREMENTS
99
100Replicated Storage has additional requirements not currently part of L<DBIx::Class>
101
e3c2b86d 102 Moose => 0.54
ecb65397 103 MooseX::AttributeHelpers => 0.12
104 Moose::Util::TypeConstraints => 0.54
105 Class::MOP => 0.63
106
107You will need to install these modules manually via CPAN or make them part of the
108Makefile for your distribution.
2bf79155 109
110=head1 ATTRIBUTES
111
112This class defines the following attributes.
113
2ce6e9a6 114=head2 schema
115
116The underlying L<DBIx::Class::Schema> object this storage is attaching
117
118=cut
119
120has 'schema' => (
121 is=>'rw',
122 isa=>'DBIx::Class::Schema',
123 weak_ref=>1,
124 required=>1,
125);
126
26ab719a 127=head2 pool_type
2bf79155 128
26ab719a 129Contains the classname which will instantiate the L</pool> object. Defaults
130to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 131
132=cut
133
26ab719a 134has 'pool_type' => (
64cdad22 135 is=>'ro',
136 isa=>'ClassName',
2ce6e9a6 137 required=>1,
138 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 139 handles=>{
140 'create_pool' => 'new',
141 },
2bf79155 142);
143
f068a139 144=head2 pool_args
145
146Contains a hashref of initialized information to pass to the Balancer object.
147See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
148
149=cut
150
151has 'pool_args' => (
64cdad22 152 is=>'ro',
153 isa=>'HashRef',
154 lazy=>1,
155 required=>1,
156 default=>sub { {} },
f068a139 157);
158
159
26ab719a 160=head2 balancer_type
2bf79155 161
162The replication pool requires a balance class to provider the methods for
163choose how to spread the query load across each replicant in the pool.
164
165=cut
166
2ce6e9a6 167subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
168 as 'ClassName';
169
170coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
171 from 'Str',
172 via {
173 my $type = $_;
174 if($type=~m/^::/) {
175 $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
176 }
177 Class::MOP::load_class($type);
178 $type;
179 };
180
26ab719a 181has 'balancer_type' => (
64cdad22 182 is=>'ro',
2ce6e9a6 183 isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
184 coerce=>1,
185 required=>1,
186 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 187 handles=>{
188 'create_balancer' => 'new',
189 },
2bf79155 190);
191
17b05c13 192=head2 balancer_args
193
194Contains a hashref of initialized information to pass to the Balancer object.
f068a139 195See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
17b05c13 196
197=cut
198
199has 'balancer_args' => (
64cdad22 200 is=>'ro',
201 isa=>'HashRef',
202 lazy=>1,
203 required=>1,
204 default=>sub { {} },
17b05c13 205);
206
26ab719a 207=head2 pool
2bf79155 208
26ab719a 209Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
210container class for one or more replicated databases.
2bf79155 211
212=cut
213
26ab719a 214has 'pool' => (
64cdad22 215 is=>'ro',
216 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
217 lazy_build=>1,
218 handles=>[qw/
219 connect_replicants
220 replicants
221 has_replicants
222 /],
2bf79155 223);
224
26ab719a 225=head2 balancer
2bf79155 226
26ab719a 227Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
228is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 229
26ab719a 230=cut
2bf79155 231
26ab719a 232has 'balancer' => (
64cdad22 233 is=>'ro',
234 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
235 lazy_build=>1,
236 handles=>[qw/auto_validate_every/],
26ab719a 237);
2bf79155 238
cb6ec758 239=head2 master
240
241The master defines the canonical state for a pool of connected databases. All
242the replicants are expected to match this databases state. Thus, in a classic
243Master / Slaves distributed system, all the slaves are expected to replicate
244the Master's state as quick as possible. This is the only database in the
245pool of databases that is allowed to handle write traffic.
246
247=cut
248
249has 'master' => (
64cdad22 250 is=> 'ro',
251 isa=>'DBIx::Class::Storage::DBI',
252 lazy_build=>1,
cb6ec758 253);
254
cb6ec758 255=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
256
257The following methods are delegated all the methods required for the
258L<DBIx::Class::Storage::DBI> interface.
259
260=head2 read_handler
261
262Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
263
264=cut
265
266has 'read_handler' => (
64cdad22 267 is=>'rw',
268 isa=>'Object',
269 lazy_build=>1,
270 handles=>[qw/
271 select
272 select_single
273 columns_info_for
274 /],
cb6ec758 275);
276
cb6ec758 277=head2 write_handler
278
279Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
280
281=cut
282
283has 'write_handler' => (
64cdad22 284 is=>'ro',
285 isa=>'Object',
286 lazy_build=>1,
287 lazy_build=>1,
288 handles=>[qw/
289 on_connect_do
290 on_disconnect_do
291 connect_info
292 throw_exception
293 sql_maker
294 sqlt_type
295 create_ddl_dir
296 deployment_statements
297 datetime_parser
298 datetime_parser_type
299 last_insert_id
300 insert
301 insert_bulk
302 update
303 delete
304 dbh
2ce6e9a6 305 txn_begin
64cdad22 306 txn_do
307 txn_commit
308 txn_rollback
2ce6e9a6 309 txn_scope_guard
64cdad22 310 sth
311 deploy
0180bef9 312 with_deferred_fk_checks
2ce6e9a6 313
64cdad22 314 reload_row
2ce6e9a6 315 _prep_for_execute
2ce6e9a6 316
64cdad22 317 /],
cb6ec758 318);
319
26ab719a 320=head1 METHODS
2bf79155 321
26ab719a 322This class defines the following methods.
2bf79155 323
c354902c 324=head2 BUILDARGS
2bf79155 325
cb6ec758 326L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
2ce6e9a6 327first argument. So we need to massage the arguments a bit so that all the
328bits get put into the correct places.
2bf79155 329
330=cut
331
c354902c 332sub BUILDARGS {
333 my ($class, $schema, $storage_type_args, @args) = @_;
334
335 return {
336 schema=>$schema,
337 %$storage_type_args,
338 @args
339 }
340}
2bf79155 341
cb6ec758 342=head2 _build_master
2bf79155 343
cb6ec758 344Lazy builder for the L</master> attribute.
2bf79155 345
346=cut
347
cb6ec758 348sub _build_master {
2ce6e9a6 349 my $self = shift @_;
350 DBIx::Class::Storage::DBI->new($self->schema);
106d5f3b 351}
352
26ab719a 353=head2 _build_pool
2bf79155 354
26ab719a 355Lazy builder for the L</pool> attribute.
2bf79155 356
357=cut
358
26ab719a 359sub _build_pool {
64cdad22 360 my $self = shift @_;
361 $self->create_pool(%{$self->pool_args});
2bf79155 362}
363
26ab719a 364=head2 _build_balancer
2bf79155 365
cb6ec758 366Lazy builder for the L</balancer> attribute. This takes a Pool object so that
367the balancer knows which pool it's balancing.
2bf79155 368
369=cut
370
26ab719a 371sub _build_balancer {
64cdad22 372 my $self = shift @_;
373 $self->create_balancer(
374 pool=>$self->pool,
375 master=>$self->master,
376 %{$self->balancer_args},
377 );
2bf79155 378}
379
cb6ec758 380=head2 _build_write_handler
2bf79155 381
cb6ec758 382Lazy builder for the L</write_handler> attribute. The default is to set this to
383the L</master>.
50336325 384
385=cut
386
cb6ec758 387sub _build_write_handler {
64cdad22 388 return shift->master;
cb6ec758 389}
50336325 390
cb6ec758 391=head2 _build_read_handler
2bf79155 392
cb6ec758 393Lazy builder for the L</read_handler> attribute. The default is to set this to
394the L</balancer>.
2bf79155 395
396=cut
397
cb6ec758 398sub _build_read_handler {
64cdad22 399 return shift->balancer;
cb6ec758 400}
50336325 401
cb6ec758 402=head2 around: connect_replicants
2bf79155 403
cb6ec758 404All calls to connect_replicants needs to have an existing $schema tacked onto
405top of the args, since L<DBIx::Storage::DBI> needs it.
955a6df6 406
cb6ec758 407=cut
955a6df6 408
cb6ec758 409around 'connect_replicants' => sub {
64cdad22 410 my ($method, $self, @args) = @_;
411 $self->$method($self->schema, @args);
955a6df6 412};
2bf79155 413
2bf79155 414=head2 all_storages
415
416Returns an array of of all the connected storage backends. The first element
417in the returned array is the master, and the remainings are each of the
418replicants.
419
420=cut
421
422sub all_storages {
64cdad22 423 my $self = shift @_;
424 return grep {defined $_ && blessed $_} (
425 $self->master,
426 $self->replicants,
427 );
2bf79155 428}
429
c4d3fae2 430=head2 execute_reliably ($coderef, ?@args)
431
432Given a coderef, saves the current state of the L</read_handler>, forces it to
433use reliable storage (ie sets it to the master), executes a coderef and then
434restores the original state.
435
436Example:
437
64cdad22 438 my $reliably = sub {
439 my $name = shift @_;
440 $schema->resultset('User')->create({name=>$name});
441 my $user_rs = $schema->resultset('User')->find({name=>$name});
442 return $user_rs;
443 };
c4d3fae2 444
64cdad22 445 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 446
447Use this when you must be certain of your database state, such as when you just
448inserted something and need to get a resultset including it, etc.
449
450=cut
451
452sub execute_reliably {
64cdad22 453 my ($self, $coderef, @args) = @_;
454
455 unless( ref $coderef eq 'CODE') {
456 $self->throw_exception('Second argument must be a coderef');
457 }
458
459 ##Get copy of master storage
460 my $master = $self->master;
461
462 ##Get whatever the current read hander is
463 my $current = $self->read_handler;
464
465 ##Set the read handler to master
466 $self->read_handler($master);
467
468 ## do whatever the caller needs
469 my @result;
470 my $want_array = wantarray;
471
472 eval {
473 if($want_array) {
474 @result = $coderef->(@args);
475 } elsif(defined $want_array) {
476 ($result[0]) = ($coderef->(@args));
ed213e85 477 } else {
64cdad22 478 $coderef->(@args);
479 }
480 };
481
482 ##Reset to the original state
483 $self->read_handler($current);
484
485 ##Exception testing has to come last, otherwise you might leave the
486 ##read_handler set to master.
487
488 if($@) {
489 $self->throw_exception("coderef returned an error: $@");
490 } else {
491 return $want_array ? @result : $result[0];
492 }
c4d3fae2 493}
494
cb6ec758 495=head2 set_reliable_storage
496
497Sets the current $schema to be 'reliable', that is all queries, both read and
498write are sent to the master
64cdad22 499
cb6ec758 500=cut
501
502sub set_reliable_storage {
64cdad22 503 my $self = shift @_;
504 my $schema = $self->schema;
505 my $write_handler = $self->schema->storage->write_handler;
506
507 $schema->storage->read_handler($write_handler);
cb6ec758 508}
509
510=head2 set_balanced_storage
511
512Sets the current $schema to be use the </balancer> for all reads, while all
513writea are sent to the master only
64cdad22 514
cb6ec758 515=cut
516
517sub set_balanced_storage {
64cdad22 518 my $self = shift @_;
519 my $schema = $self->schema;
520 my $write_handler = $self->schema->storage->balancer;
521
522 $schema->storage->read_handler($write_handler);
cb6ec758 523}
2bf79155 524
6834cc1d 525=head2 around: txn_do ($coderef)
c4d3fae2 526
527Overload to the txn_do method, which is delegated to whatever the
528L<write_handler> is set to. We overload this in order to wrap in inside a
529L</execute_reliably> method.
530
531=cut
532
6834cc1d 533around 'txn_do' => sub {
64cdad22 534 my($txn_do, $self, $coderef, @args) = @_;
535 $self->execute_reliably(sub {$self->$txn_do($coderef, @args)});
6834cc1d 536};
c4d3fae2 537
2bf79155 538=head2 connected
539
540Check that the master and at least one of the replicants is connected.
541
542=cut
543
544sub connected {
64cdad22 545 my $self = shift @_;
546 return
547 $self->master->connected &&
548 $self->pool->connected_replicants;
2bf79155 549}
550
2bf79155 551=head2 ensure_connected
552
553Make sure all the storages are connected.
554
555=cut
556
557sub ensure_connected {
64cdad22 558 my $self = shift @_;
559 foreach my $source ($self->all_storages) {
560 $source->ensure_connected(@_);
561 }
2bf79155 562}
563
2bf79155 564=head2 limit_dialect
565
566Set the limit_dialect for all existing storages
567
568=cut
569
570sub limit_dialect {
64cdad22 571 my $self = shift @_;
572 foreach my $source ($self->all_storages) {
573 $source->limit_dialect(@_);
574 }
3fbe08e3 575 return $self->master->quote_char;
2bf79155 576}
577
2bf79155 578=head2 quote_char
579
580Set the quote_char for all existing storages
581
582=cut
583
584sub quote_char {
64cdad22 585 my $self = shift @_;
586 foreach my $source ($self->all_storages) {
587 $source->quote_char(@_);
588 }
3fbe08e3 589 return $self->master->quote_char;
2bf79155 590}
591
2bf79155 592=head2 name_sep
593
594Set the name_sep for all existing storages
595
596=cut
597
598sub name_sep {
64cdad22 599 my $self = shift @_;
600 foreach my $source ($self->all_storages) {
601 $source->name_sep(@_);
602 }
3fbe08e3 603 return $self->master->name_sep;
2bf79155 604}
605
2bf79155 606=head2 set_schema
607
608Set the schema object for all existing storages
609
610=cut
611
612sub set_schema {
64cdad22 613 my $self = shift @_;
614 foreach my $source ($self->all_storages) {
615 $source->set_schema(@_);
616 }
2bf79155 617}
618
2bf79155 619=head2 debug
620
621set a debug flag across all storages
622
623=cut
624
625sub debug {
64cdad22 626 my $self = shift @_;
3fbe08e3 627 if(@_) {
628 foreach my $source ($self->all_storages) {
629 $source->debug(@_);
630 }
64cdad22 631 }
3fbe08e3 632 return $self->master->debug;
2bf79155 633}
634
2bf79155 635=head2 debugobj
636
637set a debug object across all storages
638
639=cut
640
641sub debugobj {
64cdad22 642 my $self = shift @_;
3fbe08e3 643 if(@_) {
644 foreach my $source ($self->all_storages) {
645 $source->debugobj(@_);
646 }
64cdad22 647 }
3fbe08e3 648 return $self->master->debugobj;
2bf79155 649}
650
2bf79155 651=head2 debugfh
652
653set a debugfh object across all storages
654
655=cut
656
657sub debugfh {
64cdad22 658 my $self = shift @_;
3fbe08e3 659 if(@_) {
660 foreach my $source ($self->all_storages) {
661 $source->debugfh(@_);
662 }
64cdad22 663 }
3fbe08e3 664 return $self->master->debugfh;
2bf79155 665}
666
2bf79155 667=head2 debugcb
668
669set a debug callback across all storages
670
671=cut
672
673sub debugcb {
64cdad22 674 my $self = shift @_;
3fbe08e3 675 if(@_) {
676 foreach my $source ($self->all_storages) {
677 $source->debugcb(@_);
678 }
64cdad22 679 }
3fbe08e3 680 return $self->master->debugcb;
2bf79155 681}
682
2bf79155 683=head2 disconnect
684
685disconnect everything
686
687=cut
688
689sub disconnect {
64cdad22 690 my $self = shift @_;
691 foreach my $source ($self->all_storages) {
692 $source->disconnect(@_);
693 }
2bf79155 694}
695
7e38d850 696=head1 GOTCHAS
697
698Due to the fact that replicants can lag behind a master, you must take care to
699make sure you use one of the methods to force read queries to a master should
700you need realtime data integrity. For example, if you insert a row, and then
701immediately re-read it from the database (say, by doing $row->discard_changes)
702or you insert a row and then immediately build a query that expects that row
703to be an item, you should force the master to handle reads. Otherwise, due to
704the lag, there is no certainty your data will be in the expected state.
705
706For data integrity, all transactions automatically use the master storage for
707all read and write queries. Using a transaction is the preferred and recommended
708method to force the master to handle all read queries.
709
710Otherwise, you can force a single query to use the master with the 'force_pool'
711attribute:
712
713 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
714
715This attribute will safely be ignore by non replicated storages, so you can use
716the same code for both types of systems.
717
718Lastly, you can use the L</execute_reliably> method, which works very much like
719a transaction.
720
721For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
722and L</set_balanced_storage>, however this operates at a global level and is not
723suitable if you have a shared Schema object being used by multiple processes,
724such as on a web application server. You can get around this limitation by
725using the Schema clone method.
726
727 my $new_schema = $schema->clone;
728 $new_schema->set_reliable_storage;
729
730 ## $new_schema will use only the Master storage for all reads/writes while
731 ## the $schema object will use replicated storage.
732
f5d3a5de 733=head1 AUTHOR
734
64cdad22 735 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 736
c4d3fae2 737Based on code originated by:
f5d3a5de 738
64cdad22 739 Norbert Csongrádi <bert@cpan.org>
740 Peter Siklósi <einon@einon.hu>
2156bbdd 741
f5d3a5de 742=head1 LICENSE
743
744You may distribute this code under the same terms as Perl itself.
745
746=cut
747
c354902c 748__PACKAGE__->meta->make_immutable;
749
f5d3a5de 7501;