proxying new storage method
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
BEGIN {
  use Carp::Clan qw/^DBIx::Class/;

  ## Replication support needs a handful of modules that plain DBIC use does
  ## not require, so each one is probed explicitly and every failure is
  ## collected, letting a single error list all missing modules at once.

  my %replication_required = (
    'Moose'                        => '0.54',
    'MooseX::AttributeHelpers'     => '0.12',
    'Moose::Util::TypeConstraints' => '0.54',
    'Class::MOP'                   => '0.63',
  );

  my @didnt_load;

  ## Sorted so that both the load order and the error message are
  ## deterministic (hash key order is not).
  for my $module (sort keys %replication_required) {
    my $version = $replication_required{$module};

    ## String eval is needed because the module name is only known at
    ## runtime; the "use" also enforces the minimum version.  The trailing
    ## "1" lets us detect failure from the eval's return value rather than
    ## from the state of $@.
    eval "use $module $version; 1"
      or push @didnt_load, "$module $version";
  }

  croak(join(', ', @didnt_load) . " are missing and are required for Replication")
    if @didnt_load;
}
27
26ab719a 28use DBIx::Class::Storage::DBI;
2bf79155 29use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 30use DBIx::Class::Storage::DBI::Replicated::Balancer;
2bf79155 31
32=head1 NAME
33
ecb65397 34DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 35
36=head1 SYNOPSIS
37
The following example shows how to change an existing $schema to a replicated
39storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 40tasks.
2bf79155 41
64cdad22 42 ## Change storage_type in your schema class
43 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
44
45 ## Add some slaves. Basically this is an array of arrayrefs, where each
46 ## arrayref is database connect information
47
48 $schema->storage->connect_replicants(
49 [$dsn1, $user, $pass, \%opts],
50 [$dsn2, $user, $pass, \%opts],
51 [$dsn3, $user, $pass, \%opts],
52 );
53
7e38d850 54 ## Now, just use the $schema as normal
55 $schema->resultset('Source')->search({name=>'etc'});
56
57 ## You can force a given query to use a particular storage using the search
58 ### attribute 'force_pool'. For example:
59
60 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
61
62 ## Now $RS will force everything (both reads and writes) to use whatever was
63 ## setup as the master storage. 'master' is hardcoded to always point to the
64 ## Master, but you can also use any Replicant name. Please see:
  ## L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for
  ## more. Also see transactions and L</execute_reliably> for alternative ways
67 ## to force read traffic to the master.
68
2bf79155 69=head1 DESCRIPTION
70
7e38d850 71Warning: This class is marked BETA. This has been running a production
ccb3b897 72website using MySQL native replication as its backend and we have some decent
7e38d850 73test coverage but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this first in
development and pass along your experiences/bug fixes.
2bf79155 76
77This class implements replicated data store for DBI. Currently you can define
78one master and numerous slave database connections. All write-type queries
79(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
80database, all read-type queries (SELECTs) go to the slave database.
81
82Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 83handle gets delegated to one of the two attributes: L</read_handler> or to
84L</write_handler>. Additionally, some methods need to be distributed
2bf79155 85to all existing storages. This way our storage class is a drop in replacement
86for L<DBIx::Class::Storage::DBI>.
87
Read traffic is spread across the replicants (slaves) according to a user
selected algorithm. The default balancer class is
L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
90
bca099a3 91=head1 NOTES
92
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. Please make careful use of the ways
to force a query to run against Master when needed.
97
98=head1 REQUIREMENTS
99
100Replicated Storage has additional requirements not currently part of L<DBIx::Class>
101
  Moose => 0.54
103 MooseX::AttributeHelpers => 0.12
104 Moose::Util::TypeConstraints => 0.54
105 Class::MOP => 0.63
106
107You will need to install these modules manually via CPAN or make them part of the
108Makefile for your distribution.
2bf79155 109
110=head1 ATTRIBUTES
111
112This class defines the following attributes.
113
2ce6e9a6 114=head2 schema
115
The underlying L<DBIx::Class::Schema> object this storage is attached to.
117
118=cut
119
## The owning schema: mandatory at construction time and stored as a weak
## reference.
has 'schema' => (
  is       => 'rw',
  isa      => 'DBIx::Class::Schema',
  required => 1,
  weak_ref => 1,
);
126
26ab719a 127=head2 pool_type
2bf79155 128
26ab719a 129Contains the classname which will instantiate the L</pool> object. Defaults
130to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 131
132=cut
133
## Class used to instantiate the pool.  Calling create_pool on this storage
## is delegated to that class's "new".
has 'pool_type' => (
  is       => 'ro',
  isa      => 'ClassName',
  required => 1,
  default  => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  handles  => { 'create_pool' => 'new' },
);
143
f068a139 144=head2 pool_args
145
Contains a hashref of initialized information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
148
149=cut
150
## Constructor arguments for the pool; lazily defaults to a fresh empty
## hashref.
has 'pool_args' => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
158
159
26ab719a 160=head2 balancer_type
2bf79155 161
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
164
165=cut
166
## A ClassName subtype with a string coercion: a value starting with '::' is
## treated as relative to the Balancer namespace, and in every case the
## resulting class is loaded before its name is returned.
subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
  as 'ClassName';

coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
  from 'Str',
  via {
    my $balancer_class = $_;
    $balancer_class = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$balancer_class
      if $balancer_class =~ m/^::/;
    Class::MOP::load_class($balancer_class);
    $balancer_class;
  };
180
## Class used to instantiate the balancer.  String values are coerced (see
## the BalancerClassNamePart type); create_balancer is delegated to the
## class's "new".
has 'balancer_type' => (
  is       => 'ro',
  isa      => 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
  coerce   => 1,
  required => 1,
  default  => 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
  handles  => { 'create_balancer' => 'new' },
);
191
17b05c13 192=head2 balancer_args
193
Contains a hashref of initialized information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 196
197=cut
198
## Extra constructor arguments for the balancer; lazily defaults to a fresh
## empty hashref.
has 'balancer_args' => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
206
26ab719a 207=head2 pool
2bf79155 208
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 211
212=cut
213
## The replicant pool, built on first use by _build_pool.  The three listed
## methods are delegated straight to it.
has 'pool' => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  lazy_build => 1,
  handles    => [
    'connect_replicants',
    'replicants',
    'has_replicants',
  ],
);
224
26ab719a 225=head2 balancer
2bf79155 226
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and the master, and is the default L</read_handler>.
2bf79155 229
26ab719a 230=cut
2bf79155 231
## The balancer, built on first use by _build_balancer; auto_validate_every
## is delegated to it.
has 'balancer' => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  lazy_build => 1,
  handles    => ['auto_validate_every'],
);
2bf79155 238
cb6ec758 239=head2 master
240
241The master defines the canonical state for a pool of connected databases. All
242the replicants are expected to match this databases state. Thus, in a classic
243Master / Slaves distributed system, all the slaves are expected to replicate
244the Master's state as quick as possible. This is the only database in the
245pool of databases that is allowed to handle write traffic.
246
247=cut
248
## The master storage, built on first use by _build_master.
has 'master' => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI',
  lazy_build => 1,
);
254
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
259
260=head2 read_handler
261
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
263
264=cut
265
## Object that services reads.  Writable ('rw') so that the handler can be
## swapped at runtime; built on first use by _build_read_handler.
has 'read_handler' => (
  is         => 'rw',
  isa        => 'Object',
  lazy_build => 1,
  handles    => [
    'select',
    'select_single',
    'columns_info_for',
  ],
);
276
cb6ec758 277=head2 write_handler
278
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
280
281=cut
282
## Object that services writes and everything else that must reach a single
## authoritative storage; built on first use by _build_write_handler.  Every
## method named in "handles" is delegated to it unchanged.
## (The lazy_build option was previously listed twice; it is now given once.)
has 'write_handler' => (
  is=>'ro',
  isa=>'Object',
  lazy_build=>1,
  handles=>[qw/
    on_connect_do
    on_disconnect_do
    connect_info
    throw_exception
    sql_maker
    sqlt_type
    create_ddl_dir
    deployment_statements
    datetime_parser
    datetime_parser_type
    last_insert_id
    insert
    insert_bulk
    update
    delete
    dbh
    txn_begin
    txn_do
    txn_commit
    txn_rollback
    txn_scope_guard
    sth
    deploy
    with_deferred_fk_checks

    reload_row
    _prep_for_execute
    configure_sqlt

  /],
);
320
26ab719a 321=head1 METHODS
2bf79155 322
26ab719a 323This class defines the following methods.
2bf79155 324
c354902c 325=head2 BUILDARGS
2bf79155 326
L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
first argument. So we need to massage the arguments a bit so that all the
bits get put into the correct places.
2bf79155 330
331=cut
332
## Turns the positional constructor call ($schema, \%storage_type_args,
## key => value ...) into the single hashref of named arguments.  Later
## entries override earlier ones with the same key.
sub BUILDARGS {
  my $class             = shift;
  my $schema            = shift;
  my $storage_type_args = shift;

  my %init_args = (
    schema => $schema,
    %{$storage_type_args},
    @_,
  );

  return \%init_args;
}
2bf79155 342
cb6ec758 343=head2 _build_master
2bf79155 344
cb6ec758 345Lazy builder for the L</master> attribute.
2bf79155 346
347=cut
348
## Builds the master: a plain DBIx::Class::Storage::DBI constructed with
## this storage's schema.
sub _build_master {
  my ($self) = @_;
  my $schema = $self->schema;
  return DBIx::Class::Storage::DBI->new($schema);
}
353
26ab719a 354=head2 _build_pool
2bf79155 355
26ab719a 356Lazy builder for the L</pool> attribute.
2bf79155 357
358=cut
359
## Builds the pool by handing the contents of pool_args to create_pool.
sub _build_pool {
  my ($self) = @_;
  my $pool_args = $self->pool_args;
  return $self->create_pool(%{$pool_args});
}
364
26ab719a 365=head2 _build_balancer
2bf79155 366
cb6ec758 367Lazy builder for the L</balancer> attribute. This takes a Pool object so that
368the balancer knows which pool it's balancing.
2bf79155 369
370=cut
371
## Builds the balancer, giving it the pool and the master followed by any
## user supplied balancer_args.
sub _build_balancer {
  my ($self) = @_;

  my @ctor_args = (
    pool   => $self->pool,
    master => $self->master,
    %{$self->balancer_args},
  );

  return $self->create_balancer(@ctor_args);
}
380
cb6ec758 381=head2 _build_write_handler
2bf79155 382
cb6ec758 383Lazy builder for the L</write_handler> attribute. The default is to set this to
384the L</master>.
50336325 385
386=cut
387
## The default write handler is the master.
sub _build_write_handler {
  my ($self) = @_;
  return $self->master;
}
50336325 391
cb6ec758 392=head2 _build_read_handler
2bf79155 393
cb6ec758 394Lazy builder for the L</read_handler> attribute. The default is to set this to
395the L</balancer>.
2bf79155 396
397=cut
398
## The default read handler is the balancer.
sub _build_read_handler {
  my ($self) = @_;
  return $self->balancer;
}
50336325 402
cb6ec758 403=head2 around: connect_replicants
2bf79155 404
All calls to connect_replicants need to have the existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 407
cb6ec758 408=cut
955a6df6 409
## Wraps the delegated connect_replicants so that the schema is always
## passed as the first argument, ahead of the caller's arguments.
around 'connect_replicants' => sub {
  my $orig = shift;
  my $self = shift;
  return $self->$orig($self->schema, @_);
};
2bf79155 414
2bf79155 415=head2 all_storages
416
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
420
421=cut
422
## Returns the master followed by the replicants, keeping only entries that
## are defined, blessed objects.
sub all_storages {
  my ($self) = @_;

  my @candidates = (
    $self->master,
    $self->replicants,
  );

  return grep { defined($_) && blessed($_) } @candidates;
}
430
c4d3fae2 431=head2 execute_reliably ($coderef, ?@args)
432
433Given a coderef, saves the current state of the L</read_handler>, forces it to
434use reliable storage (ie sets it to the master), executes a coderef and then
435restores the original state.
436
437Example:
438
64cdad22 439 my $reliably = sub {
440 my $name = shift @_;
441 $schema->resultset('User')->create({name=>$name});
442 my $user_rs = $schema->resultset('User')->find({name=>$name});
443 return $user_rs;
444 };
c4d3fae2 445
64cdad22 446 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 447
448Use this when you must be certain of your database state, such as when you just
449inserted something and need to get a resultset including it, etc.
450
451=cut
452
## Runs $coderef->(@args) with the read handler temporarily pointed at the
## master, then restores the previous read handler.
##
## Arguments: a coderef, followed by any arguments to pass to it.
## Returns:   whatever the coderef returns, honouring the caller's context
##            (list, scalar or void).
## Throws:    via throw_exception if the first argument is not a coderef, or
##            if the coderef dies (after the read handler has been restored).
sub execute_reliably {
  my ($self, $coderef, @args) = @_;

  unless( ref $coderef eq 'CODE') {
    $self->throw_exception('Second argument must be a coderef');
  }

  ##Get copy of master storage
  my $master = $self->master;

  ##Get whatever the current read handler is
  my $current = $self->read_handler;

  ##Set the read handler to master
  $self->read_handler($master);

  ## do whatever the caller needs
  my @result;
  my $want_array = wantarray;

  ## The trailing "1" makes success detectable from the eval's own return
  ## value, independent of what the coderef returns.
  my $succeeded = eval {
    if($want_array) {
      @result = $coderef->(@args);
    } elsif(defined $want_array) {
      ($result[0]) = ($coderef->(@args));
    } else {
      $coderef->(@args);
    }
    1;
  };

  ## Capture the error right away: $@ is a global, and restoring the read
  ## handler below may run code that resets it, which would silently swallow
  ## the coderef's exception.
  my $error = $@;

  ##Reset to the original state.  This has to happen before the exception is
  ##rethrown, otherwise the read_handler might be left set to master.
  $self->read_handler($current);

  unless($succeeded) {
    $self->throw_exception("coderef returned an error: $error");
  }

  return $want_array ? @result : $result[0];
}
495
cb6ec758 496=head2 set_reliable_storage
497
Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.
64cdad22 500
cb6ec758 501=cut
502
## Points the read handler of the schema's storage at that storage's write
## handler, so reads go wherever writes go.
sub set_reliable_storage {
  my ($self) = @_;

  my $schema         = $self->schema;
  my $reliable_store = $self->schema->storage->write_handler;

  return $schema->storage->read_handler($reliable_store);
}
510
511=head2 set_balanced_storage
512
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
64cdad22 515
cb6ec758 516=cut
517
## Points the read handler of the schema's storage back at that storage's
## balancer.
sub set_balanced_storage {
  my ($self) = @_;

  my $schema   = $self->schema;
  my $balancer = $self->schema->storage->balancer;

  return $schema->storage->read_handler($balancer);
}
2bf79155 525
6834cc1d 526=head2 around: txn_do ($coderef)
c4d3fae2 527
Overload to the txn_do method, which is delegated to whatever the
L</write_handler> is set to. We overload this in order to wrap it inside a
L</execute_reliably> method.
531
532=cut
533
## Wraps the delegated txn_do so that the whole transaction is run through
## execute_reliably.
around 'txn_do' => sub {
  my ($orig, $self, $coderef, @args) = @_;

  my $run_transaction = sub { $self->$orig($coderef, @args) };

  return $self->execute_reliably($run_transaction);
};
c4d3fae2 538
2bf79155 539=head2 connected
540
541Check that the master and at least one of the replicants is connected.
542
543=cut
544
## True when the master is connected and the pool reports connected
## replicants.  The pool is only consulted if the master is connected.
sub connected {
  my ($self) = @_;
  my $master = $self->master;
  return $master->connected && $self->pool->connected_replicants;
}
551
2bf79155 552=head2 ensure_connected
553
554Make sure all the storages are connected.
555
556=cut
557
## Calls ensure_connected, with the caller's arguments, on every storage
## returned by all_storages.
sub ensure_connected {
  my ($self, @args) = @_;
  for my $storage ($self->all_storages) {
    $storage->ensure_connected(@args);
  }
}
564
2bf79155 565=head2 limit_dialect
566
567Set the limit_dialect for all existing storages
568
569=cut
570
## Applies limit_dialect, with the caller's arguments, to every storage
## returned by all_storages, then returns the master's limit_dialect.
## (This previously returned the master's quote_char by mistake.)
sub limit_dialect {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);
  }
  return $self->master->limit_dialect;
}
578
2bf79155 579=head2 quote_char
580
581Set the quote_char for all existing storages
582
583=cut
584
## Calls quote_char, with the caller's arguments, on every storage returned
## by all_storages, then returns the master's quote_char.
sub quote_char {
  my ($self, @args) = @_;

  for my $storage ($self->all_storages) {
    $storage->quote_char(@args);
  }

  my $master = $self->master;
  return $master->quote_char;
}
592
2bf79155 593=head2 name_sep
594
595Set the name_sep for all existing storages
596
597=cut
598
## Calls name_sep, with the caller's arguments, on every storage returned by
## all_storages, then returns the master's name_sep.
sub name_sep {
  my ($self, @args) = @_;

  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->name_sep(@args);
  }

  return $self->master->name_sep;
}
606
2bf79155 607=head2 set_schema
608
609Set the schema object for all existing storages
610
611=cut
612
## Calls set_schema, with the caller's arguments, on every storage returned
## by all_storages.
sub set_schema {
  my ($self, @args) = @_;
  for my $storage ($self->all_storages) {
    $storage->set_schema(@args);
  }
}
619
2bf79155 620=head2 debug
621
622set a debug flag across all storages
623
624=cut
625
## With arguments, calls debug with them on every storage returned by
## all_storages.  Always returns the master's debug value.
sub debug {
  my ($self, @args) = @_;

  if (@args) {
    for my $storage ($self->all_storages) {
      $storage->debug(@args);
    }
  }

  return $self->master->debug;
}
635
2bf79155 636=head2 debugobj
637
638set a debug object across all storages
639
640=cut
641
## With arguments, calls debugobj with them on every storage returned by
## all_storages.  Always returns the master's debugobj.
sub debugobj {
  my ($self, @args) = @_;

  if (@args) {
    for my $storage ($self->all_storages) {
      $storage->debugobj(@args);
    }
  }

  return $self->master->debugobj;
}
651
2bf79155 652=head2 debugfh
653
654set a debugfh object across all storages
655
656=cut
657
## With arguments, calls debugfh with them on every storage returned by
## all_storages.  Always returns the master's debugfh.
sub debugfh {
  my ($self, @args) = @_;

  if (@args) {
    for my $storage ($self->all_storages) {
      $storage->debugfh(@args);
    }
  }

  return $self->master->debugfh;
}
667
2bf79155 668=head2 debugcb
669
670set a debug callback across all storages
671
672=cut
673
## With arguments, calls debugcb with them on every storage returned by
## all_storages.  Always returns the master's debugcb.
sub debugcb {
  my ($self, @args) = @_;

  if (@args) {
    for my $storage ($self->all_storages) {
      $storage->debugcb(@args);
    }
  }

  return $self->master->debugcb;
}
683
2bf79155 684=head2 disconnect
685
686disconnect everything
687
688=cut
689
## Calls disconnect, with the caller's arguments, on every storage returned
## by all_storages.
sub disconnect {
  my ($self, @args) = @_;
  for my $storage ($self->all_storages) {
    $storage->disconnect(@args);
  }
}
696
7e38d850 697=head1 GOTCHAS
698
699Due to the fact that replicants can lag behind a master, you must take care to
700make sure you use one of the methods to force read queries to a master should
701you need realtime data integrity. For example, if you insert a row, and then
702immediately re-read it from the database (say, by doing $row->discard_changes)
703or you insert a row and then immediately build a query that expects that row
704to be an item, you should force the master to handle reads. Otherwise, due to
705the lag, there is no certainty your data will be in the expected state.
706
707For data integrity, all transactions automatically use the master storage for
708all read and write queries. Using a transaction is the preferred and recommended
709method to force the master to handle all read queries.
710
711Otherwise, you can force a single query to use the master with the 'force_pool'
712attribute:
713
714 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
715
This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.
718
719Lastly, you can use the L</execute_reliably> method, which works very much like
720a transaction.
721
722For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
723and L</set_balanced_storage>, however this operates at a global level and is not
724suitable if you have a shared Schema object being used by multiple processes,
725such as on a web application server. You can get around this limitation by
726using the Schema clone method.
727
728 my $new_schema = $schema->clone;
729 $new_schema->set_reliable_storage;
730
731 ## $new_schema will use only the Master storage for all reads/writes while
732 ## the $schema object will use replicated storage.
733
f5d3a5de 734=head1 AUTHOR
735
64cdad22 736 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 737
c4d3fae2 738Based on code originated by:
f5d3a5de 739
64cdad22 740 Norbert Csongrádi <bert@cpan.org>
741 Peter Siklósi <einon@einon.hu>
2156bbdd 742
f5d3a5de 743=head1 LICENSE
744
745You may distribute this code under the same terms as Perl itself.
746
747=cut
748
c354902c 749__PACKAGE__->meta->make_immutable;
750
f5d3a5de 7511;