Merge 'trunk' into 'replication_dedux'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
2ce6e9a6 4use Class::MOP;
5use Moose::Util::TypeConstraints;
26ab719a 6use DBIx::Class::Storage::DBI;
2bf79155 7use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 8use DBIx::Class::Storage::DBI::Replicated::Balancer;
2bf79155 9
10=head1 NAME
11
12DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14=head1 SYNOPSIS
15
The following example shows how to change an existing $schema to a replicated
17storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 18tasks.
2bf79155 19
64cdad22 20 ## Change storage_type in your schema class
21 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22
23 ## Add some slaves. Basically this is an array of arrayrefs, where each
24 ## arrayref is database connect information
25
26 $schema->storage->connect_replicants(
27 [$dsn1, $user, $pass, \%opts],
28 [$dsn2, $user, $pass, \%opts],
29 [$dsn3, $user, $pass, \%opts],
30 );
31
7e38d850 32 ## Now, just use the $schema as normal
33 $schema->resultset('Source')->search({name=>'etc'});
34
35 ## You can force a given query to use a particular storage using the search
36 ### attribute 'force_pool'. For example:
37
38 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
39
40 ## Now $RS will force everything (both reads and writes) to use whatever was
41 ## setup as the master storage. 'master' is hardcoded to always point to the
 ## Master, but you can also use any Replicant name. Please see:
 ## L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for
 ## more. Also see transactions and L</execute_reliably> for alternative ways
 ## to force read traffic to the master.
46
2bf79155 47=head1 DESCRIPTION
48
Warning: This class is marked BETA. This has been running a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this first in
development and pass along your experiences/bug fixes.
2bf79155 54
55This class implements replicated data store for DBI. Currently you can define
56one master and numerous slave database connections. All write-type queries
57(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
58database, all read-type queries (SELECTs) go to the slave database.
59
60Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 61handle gets delegated to one of the two attributes: L</read_handler> or to
62L</write_handler>. Additionally, some methods need to be distributed
2bf79155 63to all existing storages. This way our storage class is a drop in replacement
64for L<DBIx::Class::Storage::DBI>.
65
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default balancer is
L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>; see L</balancer_type>.
68
bca099a3 69=head1 NOTES
70
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. Please make careful use of the ways
to force a query to run against Master when needed.
2bf79155 75
76=head1 ATTRIBUTES
77
78This class defines the following attributes.
79
2ce6e9a6 80=head2 schema
81
The underlying L<DBIx::Class::Schema> object this storage is attached to.
83
84=cut
85
## Back-reference to the owning schema; weakened so schema <-> storage does
## not form a reference cycle.
has schema => (
    isa      => 'DBIx::Class::Schema',
    is       => 'rw',
    required => 1,
    weak_ref => 1,
);
92
26ab719a 93=head2 pool_type
2bf79155 94
26ab719a 95Contains the classname which will instantiate the L</pool> object. Defaults
96to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 97
98=cut
99
## Class used to build the replicant pool; create_pool() is forwarded to its
## constructor.
has pool_type => (
    isa      => 'ClassName',
    is       => 'ro',
    default  => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    required => 1,
    handles  => { create_pool => 'new' },
);
109
f068a139 110=head2 pool_args
111
Contains a hashref of initialization arguments to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
114
115=cut
116
## Extra constructor arguments for the pool (see _build_pool).
has pool_args => (
    isa      => 'HashRef',
    is       => 'ro',
    required => 1,
    lazy     => 1,
    default  => sub { return {} },
);
124
125
26ab719a 126=head2 balancer_type
2bf79155 127
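The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool. A
value beginning with '::' is expanded relative to
DBIx::Class::Storage::DBI::Replicated::Balancer (e.g. '::Random').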
130
131=cut
132
subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
    as 'ClassName';

coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
    from 'Str',
    via {
        ## Expand the '::Foo' shorthand into a full balancer class name, then
        ## load it so the ClassName constraint can be satisfied.
        my $class = $_;
        $class = 'DBIx::Class::Storage::DBI::Replicated::Balancer' . $class
            if $class =~ m/^::/;
        Class::MOP::load_class($class);
        return $class;
    };
146
## Balancer class (coerced from the '::Name' shorthand); create_balancer()
## is forwarded to its constructor.
has balancer_type => (
    isa      => 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
    is       => 'ro',
    coerce   => 1,
    required => 1,
    default  => 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
    handles  => { create_balancer => 'new' },
);
157
17b05c13 158=head2 balancer_args
159
160Contains a hashref of initialized information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 162
163=cut
164
## Extra constructor arguments for the balancer (see _build_balancer).
has balancer_args => (
    isa      => 'HashRef',
    is       => 'ro',
    required => 1,
    lazy     => 1,
    default  => sub { return {} },
);
172
26ab719a 173=head2 pool
2bf79155 174
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 177
178=cut
179
## Replicant container; replicant management calls are forwarded to it.
has pool => (
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    is         => 'ro',
    lazy_build => 1,
    handles    => [qw(connect_replicants replicants has_replicants)],
);
190
26ab719a 191=head2 balancer
2bf79155 192
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and the master, and by default serves as the L</read_handler>.
2bf79155 195
26ab719a 196=cut
2bf79155 197
## Chooses which replicant serves a read; built lazily by _build_balancer.
has balancer => (
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    is         => 'ro',
    lazy_build => 1,
    handles    => ['auto_validate_every'],
);
2bf79155 204
cb6ec758 205=head2 master
206
207The master defines the canonical state for a pool of connected databases. All
208the replicants are expected to match this databases state. Thus, in a classic
209Master / Slaves distributed system, all the slaves are expected to replicate
210the Master's state as quick as possible. This is the only database in the
211pool of databases that is allowed to handle write traffic.
212
213=cut
214
## The single writable database; built lazily by _build_master.
has master => (
    isa        => 'DBIx::Class::Storage::DBI',
    is         => 'ro',
    lazy_build => 1,
);
220
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes delegate all the methods required by the
L<DBIx::Class::Storage::DBI> interface.
225
226=head2 read_handler
227
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
229
230=cut
231
## Read side of the storage interface. It is read-write so execute_reliably
## and set_reliable_storage can temporarily point it at the master.
has read_handler => (
    isa        => 'Object',
    is         => 'rw',
    lazy_build => 1,
    handles    => [qw(select select_single columns_info_for)],
);
242
cb6ec758 243=head2 write_handler
244
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
246
247=cut
248
## Write side of the storage interface: data modification, transactions,
## DDL/deployment and connection settings are forwarded to this object
## (the master by default; see _build_write_handler).
has 'write_handler' => (
    is         => 'ro',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [qw/
        on_connect_do
        on_disconnect_do
        connect_info
        throw_exception
        sql_maker
        sqlt_type
        create_ddl_dir
        deployment_statements
        datetime_parser
        datetime_parser_type
        last_insert_id
        insert
        insert_bulk
        update
        delete
        dbh
        txn_begin
        txn_do
        txn_commit
        txn_rollback
        txn_scope_guard
        sth
        deploy
        reload_row
        _prep_for_execute
        configure_sqlt
    /],
);
285
26ab719a 286=head1 METHODS
2bf79155 287
26ab719a 288This class defines the following methods.
2bf79155 289
cb6ec758 290=head2 new
2bf79155 291
L<DBIx::Class::Schema> passes itself as the first argument when instantiating
its storage. So we need to massage the arguments a bit so that all the
bits get put into the correct places.
2bf79155 295
296=cut
297
## DBIx::Class::Schema calls Storage->new($schema, $args_hashref). Turn that
## positional form into Moose named arguments. A missing/undef args hashref
## (e.g. a direct ->new($schema)) is treated as "no extra arguments" rather
## than dying on the hash dereference under strict refs.
around 'new' => sub {
    my ($new, $self, $schema, $storage_type_args, @args) = @_;
    my %storage_args = %{ $storage_type_args || {} };
    return $self->$new(schema => $schema, %storage_args, @args);
};
2bf79155 302
cb6ec758 303=head2 _build_master
2bf79155 304
cb6ec758 305Lazy builder for the L</master> attribute.
2bf79155 306
307=cut
308
sub _build_master {
    my ($self) = @_;

    ## The master is a plain (non-replicated) DBI storage on the same schema.
    my $schema = $self->schema;
    return DBIx::Class::Storage::DBI->new($schema);
}
313
26ab719a 314=head2 _build_pool
2bf79155 315
26ab719a 316Lazy builder for the L</pool> attribute.
2bf79155 317
318=cut
319
sub _build_pool {
    my ($self) = @_;

    ## pool_args are passed straight through as named constructor arguments.
    my %args = %{ $self->pool_args };
    return $self->create_pool(%args);
}
324
26ab719a 325=head2 _build_balancer
2bf79155 326
cb6ec758 327Lazy builder for the L</balancer> attribute. This takes a Pool object so that
328the balancer knows which pool it's balancing.
2bf79155 329
330=cut
331
sub _build_balancer {
    my ($self) = @_;

    ## The balancer needs the pool to pick from and the master as fallback;
    ## balancer_args come last so they can override either.
    my %args = (
        pool   => $self->pool,
        master => $self->master,
        %{ $self->balancer_args },
    );
    return $self->create_balancer(%args);
}
340
cb6ec758 341=head2 _build_write_handler
2bf79155 342
cb6ec758 343Lazy builder for the L</write_handler> attribute. The default is to set this to
344the L</master>.
50336325 345
346=cut
347
sub _build_write_handler {
    my ($self) = @_;
    ## All writes go to the master.
    return $self->master;
}
50336325 351
cb6ec758 352=head2 _build_read_handler
2bf79155 353
cb6ec758 354Lazy builder for the L</read_handler> attribute. The default is to set this to
355the L</balancer>.
2bf79155 356
357=cut
358
sub _build_read_handler {
    my ($self) = @_;
    ## Reads are load-balanced across replicants by default.
    return $self->balancer;
}
50336325 362
cb6ec758 363=head2 around: connect_replicants
2bf79155 364
All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 367
cb6ec758 368=cut
955a6df6 369
## Prepend our schema to the replicant connect args before forwarding to the
## pool.
around connect_replicants => sub {
    my $orig = shift;
    my $self = shift;
    return $self->$orig($self->schema, @_);
};
2bf79155 374
2bf79155 375=head2 all_storages
376
Returns a list of all the connected storage backends. The first element
in the returned list is the master, and the remaining elements are the
replicants.
380
381=cut
382
sub all_storages {
    my ($self) = @_;

    ## Master first, then replicants; drop anything that is not an object.
    my @candidates = ($self->master, $self->replicants);
    return grep { defined($_) && blessed($_) } @candidates;
}
390
c4d3fae2 391=head2 execute_reliably ($coderef, ?@args)
392
393Given a coderef, saves the current state of the L</read_handler>, forces it to
394use reliable storage (ie sets it to the master), executes a coderef and then
395restores the original state.
396
397Example:
398
64cdad22 399 my $reliably = sub {
400 my $name = shift @_;
401 $schema->resultset('User')->create({name=>$name});
402 my $user_rs = $schema->resultset('User')->find({name=>$name});
403 return $user_rs;
404 };
c4d3fae2 405
64cdad22 406 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 407
408Use this when you must be certain of your database state, such as when you just
409inserted something and need to get a resultset including it, etc.
410
411=cut
412
## Run $coderef (with @args) while all reads are routed to the master, then
## restore the previous read handler. The coderef is run in the caller's
## context (list, scalar, or void). Any exception from the coderef is
## rethrown via throw_exception, but only after the read handler is restored.
sub execute_reliably {
    my ($self, $coderef, @args) = @_;

    unless (ref $coderef eq 'CODE') {
        $self->throw_exception('Second argument must be a coderef');
    }

    ## Remember the current read handler, then point reads at the master.
    my $master  = $self->master;
    my $current = $self->read_handler;
    $self->read_handler($master);

    ## Do whatever the caller needs, honouring the calling context.
    my @result;
    my $want_array = wantarray;

    my $ok = eval {
        if ($want_array) {
            @result = $coderef->(@args);
        }
        elsif (defined $want_array) {
            ($result[0]) = ($coderef->(@args));
        }
        else {
            $coderef->(@args);
        }
        1;
    };

    ## Capture the error now: restoring the read handler below runs accessor
    ## code that may itself reset $@ (e.g. through an inner eval), which
    ## would otherwise silently swallow the coderef's failure.
    my $error = $ok ? undef : ($@ || 'Unknown error');

    ## Always reset to the original state before propagating any failure,
    ## so the read_handler is never left pointing at the master.
    $self->read_handler($current);

    if (!$ok) {
        $self->throw_exception("coderef returned an error: $error");
    }
    return $want_array ? @result : $result[0];
}
455
cb6ec758 456=head2 set_reliable_storage
457
458Sets the current $schema to be 'reliable', that is all queries, both read and
459write are sent to the master
64cdad22 460
cb6ec758 461=cut
462
sub set_reliable_storage {
    my ($self) = @_;

    ## Point the schema storage's reads at its write handler (the master).
    my $storage = $self->schema->storage;
    return $storage->read_handler($storage->write_handler);
}
470
471=head2 set_balanced_storage
472
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
64cdad22 475
cb6ec758 476=cut
477
sub set_balanced_storage {
    my ($self) = @_;

    ## Point the schema storage's reads back at its balancer.
    my $storage = $self->schema->storage;
    return $storage->read_handler($storage->balancer);
}
2bf79155 485
6834cc1d 486=head2 around: txn_do ($coderef)
c4d3fae2 487
Overload of the txn_do method, which is delegated to whatever the
L</write_handler> is set to. We overload this in order to wrap it inside an
L</execute_reliably> call.
491
492=cut
493
## Run every transaction with reads forced to the master.
around txn_do => sub {
    my ($orig, $self, $coderef, @args) = @_;
    my $run_in_txn = sub { $self->$orig($coderef, @args) };
    return $self->execute_reliably($run_in_txn);
};
c4d3fae2 498
2bf79155 499=head2 connected
500
501Check that the master and at least one of the replicants is connected.
502
503=cut
504
sub connected {
    my ($self) = @_;

    ## Short-circuits on a disconnected master, returning its false value.
    my $master_ok = $self->master->connected;
    return $master_ok && $self->pool->connected_replicants;
}
511
2bf79155 512=head2 ensure_connected
513
514Make sure all the storages are connected.
515
516=cut
517
sub ensure_connected {
    my ($self, @args) = @_;

    ## Connect the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->ensure_connected(@args);
    }
}
524
2bf79155 525=head2 limit_dialect
526
527Set the limit_dialect for all existing storages
528
529=cut
530
sub limit_dialect {
    my ($self, @args) = @_;

    ## Broadcast the setting so the master and every replicant agree.
    for my $storage ($self->all_storages) {
        $storage->limit_dialect(@args);
    }
}
537
2bf79155 538=head2 quote_char
539
540Set the quote_char for all existing storages
541
542=cut
543
sub quote_char {
    my ($self, @args) = @_;

    ## Broadcast the quoting characters to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->quote_char(@args);
    }
}
550
2bf79155 551=head2 name_sep
552
553Set the name_sep for all existing storages
554
555=cut
556
sub name_sep {
    my ($self, @args) = @_;

    ## Broadcast the name separator to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->name_sep(@args);
    }
}
563
2bf79155 564=head2 set_schema
565
566Set the schema object for all existing storages
567
568=cut
569
sub set_schema {
    my ($self, @args) = @_;

    ## Hand the schema to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->set_schema(@args);
    }
}
576
2bf79155 577=head2 debug
578
579set a debug flag across all storages
580
581=cut
582
sub debug {
    my ($self, @args) = @_;

    ## Apply the debug flag to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->debug(@args);
    }
}
589
2bf79155 590=head2 debugobj
591
592set a debug object across all storages
593
594=cut
595
sub debugobj {
    my ($self, @args) = @_;

    ## Apply the debug object to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->debugobj(@args);
    }
}
602
2bf79155 603=head2 debugfh
604
605set a debugfh object across all storages
606
607=cut
608
sub debugfh {
    my ($self, @args) = @_;

    ## Apply the debug filehandle to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->debugfh(@args);
    }
}
615
2bf79155 616=head2 debugcb
617
618set a debug callback across all storages
619
620=cut
621
sub debugcb {
    my ($self, @args) = @_;

    ## Apply the debug callback to the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->debugcb(@args);
    }
}
628
2bf79155 629=head2 disconnect
630
631disconnect everything
632
633=cut
634
sub disconnect {
    my ($self, @args) = @_;

    ## Disconnect the master and every replicant.
    for my $storage ($self->all_storages) {
        $storage->disconnect(@args);
    }
}
641
7e38d850 642=head1 GOTCHAS
643
644Due to the fact that replicants can lag behind a master, you must take care to
645make sure you use one of the methods to force read queries to a master should
646you need realtime data integrity. For example, if you insert a row, and then
647immediately re-read it from the database (say, by doing $row->discard_changes)
648or you insert a row and then immediately build a query that expects that row
649to be an item, you should force the master to handle reads. Otherwise, due to
650the lag, there is no certainty your data will be in the expected state.
651
652For data integrity, all transactions automatically use the master storage for
653all read and write queries. Using a transaction is the preferred and recommended
654method to force the master to handle all read queries.
655
656Otherwise, you can force a single query to use the master with the 'force_pool'
657attribute:
658
659 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
660
This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.
663
664Lastly, you can use the L</execute_reliably> method, which works very much like
665a transaction.
666
667For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
668and L</set_balanced_storage>, however this operates at a global level and is not
669suitable if you have a shared Schema object being used by multiple processes,
670such as on a web application server. You can get around this limitation by
671using the Schema clone method.
672
 my $new_schema = $schema->clone;
 $new_schema->storage->set_reliable_storage;
675
676 ## $new_schema will use only the Master storage for all reads/writes while
677 ## the $schema object will use replicated storage.
678
f5d3a5de 679=head1 AUTHOR
680
64cdad22 681 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 682
c4d3fae2 683Based on code originated by:
f5d3a5de 684
64cdad22 685 Norbert Csongrádi <bert@cpan.org>
686 Peter Siklósi <einon@einon.hu>
2156bbdd 687
f5d3a5de 688=head1 LICENSE
689
690You may distribute this code under the same terms as Perl itself.
691
692=cut
693
6941;