Merge 'trunk' into 'replication_dedux'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
# Inherit the storage behaviour from DBI storage and the object plumbing from Moose.
extends qw(DBIx::Class::Storage::DBI Moose::Object);
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 19tasks.
2bf79155 20
21 ## Change storage_type in your schema class
106d5f3b 22 $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );
2bf79155 23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
955a6df6 27 $schema->storage->connect_replicants(
2bf79155 28 [$dsn1, $user, $pass, \%opts],
1c6a29fe 29 [$dsn2, $user, $pass, \%opts],
30 [$dsn3, $user, $pass, \%opts],
2bf79155 31 );
2bf79155 32
33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 46handle gets delegated to one of the two attributes: L</read_handler> or to
47L</write_handler>. Additionally, some methods need to be distributed
2bf79155 48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
51Read traffic is spread across the replicants (slaves) according to a
52user-selected algorithm. The default balancer is
L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
53
bca099a3 54=head1 NOTES
55
56The consistency between master and replicants is database specific. The Pool
57gives you a method to validate its replicants, removing and replacing them
58when they fail/pass predefined criteria. It is recommended that your application
59define two schemas, one using the replicated storage and another that just
60connects to the master.
2bf79155 61
62=head1 ATTRIBUTES
63
64This class defines the following attributes.
65
26ab719a 66=head2 pool_type
2bf79155 67
26ab719a 68Contains the classname which will instantiate the L</pool> object. Defaults
69to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 70
71=cut
72
# Class name used to build the pool; `create_pool` delegates to that class's `new`.
has pool_type => (
  is         => 'ro',
  isa        => 'ClassName',
  lazy_build => 1,
  handles    => { create_pool => 'new' },
);
81
f068a139 82=head2 pool_args
83
84Contains a hashref of initialized information to pass to the Pool object.
85See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
86
87=cut
88
# Extra constructor arguments for the pool; defaults to an empty hashref.
has pool_args => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
96
97
26ab719a 98=head2 balancer_type
2bf79155 99
100The replication pool requires a balancer class to provide the methods for
101choosing how to spread the query load across each replicant in the pool.
102
103=cut
104
# Class name used to build the balancer; `create_balancer` delegates to its `new`.
has balancer_type => (
  is         => 'ro',
  isa        => 'ClassName',
  lazy_build => 1,
  handles    => { create_balancer => 'new' },
);
113
17b05c13 114=head2 balancer_args
115
116Contains a hashref of initialized information to pass to the Balancer object.
f068a139 117See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 118
119=cut
120
# Extra constructor arguments for the balancer; defaults to an empty hashref.
has balancer_args => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
128
26ab719a 129=head2 pool
2bf79155 130
26ab719a 131Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
132container class for one or more replicated databases.
2bf79155 133
134=cut
135
# The replicant pool object; replicant management methods are delegated to it.
has pool => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  lazy_build => 1,
  handles    => [qw(
    connect_replicants
    replicants
    has_replicants
  )],
);
146
26ab719a 147=head2 balancer
2bf79155 148
26ab719a 149Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
150is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and chooses how to spread the query load across its replicants.
2bf79155 151
26ab719a 152=cut
2bf79155 153
# The balancer object; `auto_validate_every` is delegated to it.
has balancer => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  lazy_build => 1,
  handles    => [ 'auto_validate_every' ],
);
2bf79155 160
cb6ec758 161=head2 master
162
163The master defines the canonical state for a pool of connected databases. All
164the replicants are expected to match this databases state. Thus, in a classic
165Master / Slaves distributed system, all the slaves are expected to replicate
166the Master's state as quick as possible. This is the only database in the
167pool of databases that is allowed to handle write traffic.
168
169=cut
170
# The master storage, built lazily by _build_master.
has master => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI',
  lazy_build => 1,
);
176
cb6ec758 177=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
178
179The following attributes are delegated all the methods required for the
180L<DBIx::Class::Storage::DBI> interface.
181
182=head2 read_handler
183
184Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
185
186=cut
187
# Object that serves read traffic. Writable so that callers can temporarily
# point reads at another storage.
has read_handler => (
  is         => 'rw',
  isa        => 'Object',
  lazy_build => 1,
  handles    => [qw(
    select
    select_single
    columns_info_for
  )],
);
198
cb6ec758 199=head2 write_handler
200
201Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
202
203=cut
204
# Object that serves write traffic and every other storage method that must
# go to a single storage. The option list previously named `lazy_build`
# twice; the duplicate hash key has been removed.
has write_handler => (
  is         => 'ro',
  isa        => 'Object',
  lazy_build => 1,
  handles    => [qw(
    on_connect_do
    on_disconnect_do
    connect_info
    throw_exception
    sql_maker
    sqlt_type
    create_ddl_dir
    deployment_statements
    datetime_parser
    datetime_parser_type
    last_insert_id
    insert
    insert_bulk
    update
    delete
    dbh
    txn_do
    txn_commit
    txn_rollback
    sth
    deploy
    schema
    reload_row
  )],
);
236
26ab719a 237=head1 METHODS
2bf79155 238
26ab719a 239This class defines the following methods.
2bf79155 240
cb6ec758 241=head2 new
2bf79155 242
cb6ec758 243L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
244first argument. We need to invoke L</new> on the underlying parent class, make
245sure we properly give it a L<Moose> meta class, and then correctly instantiate
246our attributes. Basically we pass on whatever the schema has in its class
247data for 'storage_type_args' to our replicated storage type.
2bf79155 248
249=cut
250
# Constructor. Builds the parent storage object first, then lets the Moose
# metaclass initialise our attributes on that same instance.
#
#   $class             - this class (or a subclass)
#   $schema            - the owning schema, handed to the parent constructor
#   $storage_type_args - optional hashref of attribute values
#   @_                 - further key/value attribute arguments
#
# A `balancer_type` beginning with '::' is shorthand for a class under
# DBIx::Class::Storage::DBI::Replicated::Balancer; it is expanded and loaded.
sub new {
  my $class             = shift @_;
  my $schema            = shift @_;
  my $storage_type_args = shift @_;
  my $obj = $class->SUPER::new($schema, $storage_type_args, @_);

  ## Work on a private copy: the hashref belongs to the caller and must not be
  ## rewritten behind its back.
  my %args = %{ $storage_type_args || {} };

  ## Hate to do it this way, but can't seem to get advice on the attribute
  ## working right; maybe we can do a type and coercion for it.
  if ( defined $args{balancer_type} && $args{balancer_type} =~ m/^::/ ) {
    my $balancer_class =
      'DBIx::Class::Storage::DBI::Replicated::Balancer' . $args{balancer_type};

    ## Load by file name rather than string-eval'ing a caller-supplied name.
    ( my $file = "$balancer_class.pm" ) =~ s{::}{/}g;
    unless ( eval { require $file; 1 } ) {
      my $load_error = $@;
      ## A class defined without its own file is still acceptable, so only
      ## give up when nothing usable is loaded.
      die "Could not load balancer class '$balancer_class': $load_error"
        unless $balancer_class->can('new');
    }

    $args{balancer_type} = $balancer_class;
  }

  return $class->meta->new_object(
    __INSTANCE__ => $obj,
    %args,
    @_,
  );
}
270
cb6ec758 271=head2 _build_master
2bf79155 272
cb6ec758 273Lazy builder for the L</master> attribute.
2bf79155 274
275=cut
276
# Builder for `master`: a fresh plain DBI storage object.
sub _build_master {
  return DBIx::Class::Storage::DBI->new();
}
280
106d5f3b 281=head2 _build_pool_type
282
283Lazy builder for the L</pool_type> attribute.
284
285=cut
286
# Builder for `pool_type`: the stock pool class name.
sub _build_pool_type {
  my $default_pool_class = 'DBIx::Class::Storage::DBI::Replicated::Pool';
  return $default_pool_class;
}
290
26ab719a 291=head2 _build_pool
2bf79155 292
26ab719a 293Lazy builder for the L</pool> attribute.
2bf79155 294
295=cut
296
# Builder for `pool`: constructs the pool from the flattened `pool_args`.
sub _build_pool {
  my ($self) = @_;
  my %ctor_args = %{ $self->pool_args };
  return $self->create_pool(%ctor_args);
}
301
106d5f3b 302=head2 _build_balancer_type
303
304Lazy builder for the L</balancer_type> attribute.
305
306=cut
307
# Builder for `balancer_type`: the stock balancer class name.
sub _build_balancer_type {
  my $default_balancer_class = 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
  return $default_balancer_class;
}
311
26ab719a 312=head2 _build_balancer
2bf79155 313
cb6ec758 314Lazy builder for the L</balancer> attribute. This takes a Pool object so that
315the balancer knows which pool it's balancing.
2bf79155 316
317=cut
318
# Builder for `balancer`: hands the balancer our pool and master, followed by
# any `balancer_args` (which therefore win on duplicate keys).
sub _build_balancer {
  my ($self) = @_;
  my $pool   = $self->pool;
  my $master = $self->master;
  my %extra  = %{ $self->balancer_args };

  return $self->create_balancer(
    pool   => $pool,
    master => $master,
    %extra,
  );
}
327
cb6ec758 328=head2 _build_write_handler
2bf79155 329
cb6ec758 330Lazy builder for the L</write_handler> attribute. The default is to set this to
331the L</master>.
50336325 332
333=cut
334
# Builder for `write_handler`: writes go to the master.
sub _build_write_handler {
  my ($self) = @_;
  return $self->master;
}
50336325 338
cb6ec758 339=head2 _build_read_handler
2bf79155 340
cb6ec758 341Lazy builder for the L</read_handler> attribute. The default is to set this to
342the L</balancer>.
2bf79155 343
344=cut
345
# Builder for `read_handler`: reads go to the balancer.
sub _build_read_handler {
  my ($self) = @_;
  return $self->balancer;
}
50336325 349
cb6ec758 350=head2 around: connect_replicants
2bf79155 351
cb6ec758 352All calls to connect_replicants need to have an existing $schema tacked onto
353the front of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 354
cb6ec758 355=cut
955a6df6 356
# Prepend our schema to whatever arguments the caller supplied before
# handing over to the delegated connect_replicants.
around 'connect_replicants' => sub {
  my $orig = shift;
  my $self = shift;
  $self->$orig($self->schema, @_);
};
2bf79155 361
2bf79155 362=head2 all_storages
363
364Returns an array of all the connected storage backends. The first element
365in the returned array is the master, and the remaining elements are the
366replicants.
367
368=cut
369
# Master first, then the replicants, keeping only defined, blessed entries.
sub all_storages {
  my ($self) = @_;
  my @candidates = ( $self->master, $self->replicants );

  return grep { defined($_) and blessed($_) } @candidates;
}
378
c4d3fae2 379=head2 execute_reliably ($coderef, ?@args)
380
381Given a coderef, saves the current state of the L</read_handler>, forces it to
382use reliable storage (ie sets it to the master), executes a coderef and then
383restores the original state.
384
385Example:
386
387 my $reliably = sub {
388 my $name = shift @_;
389 $schema->resultset('User')->create({name=>$name});
390 my $user_rs = $schema->resultset('User')->find({name=>$name});
ed213e85 391 return $user_rs;
c4d3fae2 392 };
393
ed213e85 394 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 395
396Use this when you must be certain of your database state, such as when you just
397inserted something and need to get a resultset including it, etc.
398
399=cut
400
# Run $coderef with @args while the read handler is temporarily pointed at
# the master, then restore the previous read handler.
#
# Returns what the coderef returns: the full list in list context, the first
# element in scalar context. Throws (via throw_exception) when $coderef is
# not a code reference or when it dies.
sub execute_reliably {
  my ($self, $coderef, @args) = @_;

  unless ( ref $coderef eq 'CODE' ) {
    $self->throw_exception('Second argument must be a coderef');
  }

  ## Remember the current read handler and switch reads to the master.
  my $master  = $self->master;
  my $current = $self->read_handler;
  $self->read_handler($master);

  ## Do whatever the caller needs, honouring the caller's context.
  my @result;
  my $want_array = wantarray;

  my $ok = eval {
    if ($want_array) {
      @result = $coderef->(@args);
    }
    elsif ( defined $want_array ) {
      ($result[0]) = ( $coderef->(@args) );
    }
    else {
      $coderef->(@args);
    }
    1;
  };

  ## Capture the error right away: the setter call below may run an eval of
  ## its own and reset $@, which would otherwise hide the failure.
  my $error = $@;

  ## Reset to the original state. This has to happen before any exception is
  ## raised, otherwise the read handler would be left set to the master.
  $self->read_handler($current);

  unless ($ok) {
    $self->throw_exception("coderef returned an error: $error");
  }

  return $want_array ? @result : $result[0];
}
444
cb6ec758 445=head2 set_reliable_storage
446
447Sets the current $schema to be 'reliable', that is all queries, both read and
448write are sent to the master
449
450=cut
451
# Point the schema storage's read handler at its write handler.
sub set_reliable_storage {
  my ($self) = @_;
  my $schema        = $self->schema;
  my $reliable_side = $self->schema->storage->write_handler;

  return $schema->storage->read_handler($reliable_side);
}
459
460=head2 set_balanced_storage
461
462Sets the current $schema to use the L</balancer> for all reads, while all
463writes are sent to the master only
464
465=cut
466
# Point the schema storage's read handler at its balancer.
sub set_balanced_storage {
  my ($self) = @_;
  my $schema   = $self->schema;
  my $balancer = $self->schema->storage->balancer;

  return $schema->storage->read_handler($balancer);
}
2bf79155 474
6834cc1d 475=head2 around: txn_do ($coderef)
c4d3fae2 476
477Overload to the txn_do method, which is delegated to whatever the
478L</write_handler> is set to. We overload this in order to wrap it inside an
479L</execute_reliably> call.
480
481=cut
482
# Run the delegated txn_do through execute_reliably.
around 'txn_do' => sub {
  my ($orig, $self, $coderef, @args) = @_;
  my $in_transaction = sub { $self->$orig($coderef, @args) };
  $self->execute_reliably($in_transaction);
};
c4d3fae2 487
ed213e85 488=head2 reload_row ($row)
489
490Overload to the reload_row method so that the reloading is always directed to
491the master storage.
492
493=cut
494
# Run the delegated reload_row through execute_reliably, passing the row on.
around 'reload_row' => sub {
  my ($orig, $self, $row) = @_;
  my $reload = sub {
    my ($target) = @_;
    return $self->$orig($target);
  };
  return $self->execute_reliably($reload, $row);
};
501
2bf79155 502=head2 connected
503
504Check that the master and at least one of the replicants is connected.
505
506=cut
507
# True when the master reports connected and the pool reports connected
# replicants; the master's false value is returned as-is otherwise.
sub connected {
  my ($self) = @_;
  my $master_up = $self->master->connected;

  return $master_up && $self->pool->connected_replicants;
}
515
2bf79155 516=head2 ensure_connected
517
518Make sure all the storages are connected.
519
520=cut
521
# Forward ensure_connected, with the caller's arguments, to every storage.
sub ensure_connected {
  my ($self, @args) = @_;
  $_->ensure_connected(@args) for $self->all_storages;
}
528
2bf79155 529=head2 limit_dialect
530
531Set the limit_dialect for all existing storages
532
533=cut
534
# Accessor for limit_dialect across all storages.
# With arguments the value is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub limit_dialect {
  my ($self, @args) = @_;
  if (@args) {
    $_->limit_dialect(@args) for $self->all_storages;
  }
  return $self->master->limit_dialect;
}
541
2bf79155 542=head2 quote_char
543
544Set the quote_char for all existing storages
545
546=cut
547
# Accessor for quote_char across all storages.
# With arguments the value is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub quote_char {
  my ($self, @args) = @_;
  if (@args) {
    $_->quote_char(@args) for $self->all_storages;
  }
  return $self->master->quote_char;
}
554
2bf79155 555=head2 name_sep
556
557Set the name_sep for all existing storages
558
559=cut
560
# Accessor for name_sep across all storages.
# With arguments the value is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub name_sep {
  my ($self, @args) = @_;
  if (@args) {
    $_->name_sep(@args) for $self->all_storages;
  }
  return $self->master->name_sep;
}
567
2bf79155 568=head2 set_schema
569
570Set the schema object for all existing storages
571
572=cut
573
# Forward set_schema, with the caller's arguments, to every storage.
sub set_schema {
  my ($self, @args) = @_;
  $_->set_schema(@args) for $self->all_storages;
}
580
2bf79155 581=head2 debug
582
583set a debug flag across all storages
584
585=cut
586
# Accessor for the debug flag across all storages.
# With arguments the flag is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub debug {
  my ($self, @args) = @_;
  if (@args) {
    $_->debug(@args) for $self->all_storages;
  }
  return $self->master->debug;
}
593
2bf79155 594=head2 debugobj
595
596set a debug object across all storages
597
598=cut
599
# Accessor for the debug object across all storages.
# With arguments the object is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub debugobj {
  my ($self, @args) = @_;
  if (@args) {
    $_->debugobj(@args) for $self->all_storages;
  }
  return $self->master->debugobj;
}
606
2bf79155 607=head2 debugfh
608
609set a debugfh object across all storages
610
611=cut
612
# Accessor for the debug filehandle across all storages.
# With arguments the handle is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub debugfh {
  my ($self, @args) = @_;
  if (@args) {
    $_->debugfh(@args) for $self->all_storages;
  }
  return $self->master->debugfh;
}
619
2bf79155 620=head2 debugcb
621
622set a debug callback across all storages
623
624=cut
625
# Accessor for the debug callback across all storages.
# With arguments the callback is set on every storage; the master's current
# value is always returned, so the method also works as a getter.
sub debugcb {
  my ($self, @args) = @_;
  if (@args) {
    $_->debugcb(@args) for $self->all_storages;
  }
  return $self->master->debugcb;
}
632
2bf79155 633=head2 disconnect
634
635disconnect everything
636
637=cut
638
# Forward disconnect, with the caller's arguments, to every storage.
sub disconnect {
  my ($self, @args) = @_;
  $_->disconnect(@args) for $self->all_storages;
}
645
f5d3a5de 646=head1 AUTHOR
647
c4d3fae2 648 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 649
c4d3fae2 650Based on code originated by:
f5d3a5de 651
c4d3fae2 652 Norbert Csongrádi <bert@cpan.org>
653 Peter Siklósi <einon@einon.hu>
2156bbdd 654
f5d3a5de 655=head1 LICENSE
656
657You may distribute this code under the same terms as Perl itself.
658
659=cut
660
6611;