Merge 'trunk' into 'replication_dedux'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The Following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 19tasks.
2bf79155 20
21 ## Change storage_type in your schema class
 $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );
2bf79155 23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
 $schema->storage->connect_replicants(
   [$dsn1, $user, $pass, \%opts],
   [$dsn2, $user, $pass, \%opts],
   [$dsn3, $user, $pass, \%opts],
 );
2bf79155 32
33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 46handle gets delegated to one of the two attributes: L</read_handler> or to
47L</write_handler>. Additionally, some methods need to be distributed
2bf79155 48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default balancer is
L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
53
bca099a3 54=head1 NOTES
55
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. It is recommended that your application
define two schemas, one using the replicated storage and another that just
connects to the master.
2bf79155 61
62=head1 ATTRIBUTES
63
64This class defines the following attributes.
65
26ab719a 66=head2 pool_type
2bf79155 67
26ab719a 68Contains the classname which will instantiate the L</pool> object. Defaults
69to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 70
71=cut
72
## Name of the class used to construct the replicant pool.  The delegated
## 'create_pool' method simply calls 'new' on that class.
has pool_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_pool => 'new' },
);
81
f068a139 82=head2 pool_args
83
Contains a hashref of initialization arguments to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
86
87=cut
88
## Constructor arguments handed to the pool class; an empty hashref unless
## the caller supplies one.
has pool_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
96
97
26ab719a 98=head2 balancer_type
2bf79155 99
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
102
103=cut
104
## Name of the class used to construct the balancer.  The delegated
## 'create_balancer' method simply calls 'new' on that class.
has balancer_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_balancer => 'new' },
);
113
17b05c13 114=head2 balancer_args
115
Contains a hashref of initialization arguments to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 118
119=cut
120
## Constructor arguments handed to the balancer class; an empty hashref
## unless the caller supplies one.
has balancer_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
128
26ab719a 129=head2 pool
2bf79155 130
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 133
134=cut
135
## The replicant pool object.  Replicant management calls on the storage
## are forwarded straight to it.
has pool => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    lazy_build => 1,
    handles    => [
        'connect_replicants',
        'replicants',
        'has_replicants',
    ],
);
146
26ab719a 147=head2 balancer
2bf79155 148
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and chooses how to spread the query load across the replicants in it.
2bf79155 151
26ab719a 152=cut
2bf79155 153
## The balancer object; 'auto_validate_every' is forwarded to it.
has balancer => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    lazy_build => 1,
    handles    => ['auto_validate_every'],
);
2bf79155 160
cb6ec758 161=head2 master
162
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
168
169=cut
170
## The master storage: a plain DBI storage, built lazily by _build_master.
has master => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI',
    lazy_build => 1,
);
176
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes delegate all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
181
182=head2 read_handler
183
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
185
186=cut
187
## Object that services read-type calls.  It is read/write so the storage
## can temporarily point reads at a different object.
has read_handler => (
    is         => 'rw',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [
        'select',
        'select_single',
        'columns_info_for',
    ],
);
198
cb6ec758 199=head2 write_handler
200
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
202
203=cut
204
## Object that services write-type calls and everything else that must hit
## a single authoritative storage.  Built lazily by _build_write_handler.
## (The option list previously named 'lazy_build' twice; it is given once
## here.)
has 'write_handler' => (
    is         => 'ro',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [qw/
        on_connect_do
        on_disconnect_do
        connect_info
        throw_exception
        sql_maker
        sqlt_type
        create_ddl_dir
        deployment_statements
        datetime_parser
        datetime_parser_type
        last_insert_id
        insert
        insert_bulk
        update
        delete
        dbh
        txn_do
        txn_commit
        txn_rollback
        sth
        deploy
        schema
        reload_row
    /],
);
236
26ab719a 237=head1 METHODS
2bf79155 238
26ab719a 239This class defines the following methods.
2bf79155 240
cb6ec758 241=head2 new
2bf79155 242
L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
first argument. We need to invoke L</new> on the underlying parent class, make
sure we properly give it a L<Moose> meta class, and then correctly instantiate
our attributes. Basically we pass on whatever the schema has in its class
data for 'storage_type_args' to our replicated storage type.
2bf79155 248
249=cut
250
## Constructor.
##
## Arguments: the class name, the schema, a hashref of storage type
## arguments (may be undef), followed by any extra key/value pairs.
##
## Builds the parent storage object first, then asks the Moose metaclass to
## initialise this class's attributes on that same instance.
##
## A 'balancer_type' beginning with '::' is shorthand for a class under
## DBIx::Class::Storage::DBI::Replicated::Balancer; it is expanded and the
## class is loaded here.  Dies if that class can neither be loaded nor is
## already defined.
sub new {
    my ($class, $schema, $storage_type_args, @rest) = @_;

    my $obj = $class->SUPER::new($schema, $storage_type_args, @rest);

    ## Work on a copy so the caller's hashref is never modified.
    my %args = %{ $storage_type_args || {} };

    ## Hate to do it this way, but can't seem to get advice on the attribute
    ## working right; maybe we can do a type and coercion for it.
    if (defined $args{balancer_type} && $args{balancer_type} =~ m/^::/) {
        my $balancer_class
            = 'DBIx::Class::Storage::DBI::Replicated::Balancer' . $args{balancer_type};

        ## Load by file path instead of string eval so the configured value
        ## is never executed as Perl code.
        (my $file = "$balancer_class.pm") =~ s{::}{/}g;

        unless (eval { require $file; 1 }) {
            my $error = $@ || 'unknown error';

            ## A class defined inline (no file on disk) is still usable.
            unless ($balancer_class->can('new')) {
                require Carp;
                Carp::croak(
                    "Unable to load balancer class '$balancer_class': $error"
                );
            }
        }

        $args{balancer_type} = $balancer_class;
    }

    return $class->meta->new_object(
        __INSTANCE__ => $obj,
        %args,
        @rest,
    );
}
270
cb6ec758 271=head2 _build_master
2bf79155 272
cb6ec758 273Lazy builder for the L</master> attribute.
2bf79155 274
275=cut
276
## Lazy builder for 'master': a fresh, plain DBI storage object.
sub _build_master {
    return DBIx::Class::Storage::DBI->new();
}
280
106d5f3b 281=head2 _build_pool_type
282
283Lazy builder for the L</pool_type> attribute.
284
285=cut
286
## Lazy builder for 'pool_type': the stock pool class name.
sub _build_pool_type {
    my $default_pool_class = 'DBIx::Class::Storage::DBI::Replicated::Pool';
    return $default_pool_class;
}
290
26ab719a 291=head2 _build_pool
2bf79155 292
26ab719a 293Lazy builder for the L</pool> attribute.
2bf79155 294
295=cut
296
## Lazy builder for 'pool': construct the pool from 'pool_args'.
sub _build_pool {
    my ($self) = @_;
    return $self->create_pool( %{ $self->pool_args } );
}
301
106d5f3b 302=head2 _build_balancer_type
303
304Lazy builder for the L</balancer_type> attribute.
305
306=cut
307
## Lazy builder for 'balancer_type': the stock balancer class name.
sub _build_balancer_type {
    my $default_balancer_class
        = 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
    return $default_balancer_class;
}
311
26ab719a 312=head2 _build_balancer
2bf79155 313
cb6ec758 314Lazy builder for the L</balancer> attribute. This takes a Pool object so that
315the balancer knows which pool it's balancing.
2bf79155 316
317=cut
318
## Lazy builder for 'balancer'.  The balancer is given the pool and the
## master, followed by whatever 'balancer_args' holds (so those may
## override the two fixed keys).
sub _build_balancer {
    my ($self) = @_;

    my @balancer_params = (
        pool   => $self->pool,
        master => $self->master,
        %{ $self->balancer_args },
    );

    return $self->create_balancer(@balancer_params);
}
326
cb6ec758 327=head2 _build_write_handler
2bf79155 328
cb6ec758 329Lazy builder for the L</write_handler> attribute. The default is to set this to
330the L</master>.
50336325 331
332=cut
333
## Lazy builder for 'write_handler': writes go to the master.
sub _build_write_handler {
    my ($self) = @_;
    return $self->master;
}
50336325 337
cb6ec758 338=head2 _build_read_handler
2bf79155 339
cb6ec758 340Lazy builder for the L</read_handler> attribute. The default is to set this to
341the L</balancer>.
2bf79155 342
343=cut
344
## Lazy builder for 'read_handler': reads go to the balancer.
sub _build_read_handler {
    my ($self) = @_;
    return $self->balancer;
}
50336325 348
cb6ec758 349=head2 around: connect_replicants
2bf79155 350
All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 353
cb6ec758 354=cut
955a6df6 355
## Prepend the schema to the argument list before handing the call on to
## the delegated connect_replicants.
around 'connect_replicants' => sub {
    my $orig = shift;
    my $self = shift;
    return $self->$orig($self->schema, @_);
};
2bf79155 360
2bf79155 361=head2 all_storages
362
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
366
367=cut
368
## Return the master followed by every replicant, dropping any entry that
## is undefined or not a blessed object.
sub all_storages {
    my ($self) = @_;

    my @candidates = ( $self->master, $self->replicants );

    return grep { defined $_ && blessed $_ } @candidates;
}
377
c4d3fae2 378=head2 execute_reliably ($coderef, ?@args)
379
380Given a coderef, saves the current state of the L</read_handler>, forces it to
381use reliable storage (ie sets it to the master), executes a coderef and then
382restores the original state.
383
384Example:
385
386 my $reliably = sub {
387 my $name = shift @_;
388 $schema->resultset('User')->create({name=>$name});
389 my $user_rs = $schema->resultset('User')->find({name=>$name});
ed213e85 390 return $user_rs;
c4d3fae2 391 };
392
ed213e85 393 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 394
395Use this when you must be certain of your database state, such as when you just
396inserted something and need to get a resultset including it, etc.
397
398=cut
399
## Run $coderef with reads temporarily routed to the master, then restore
## the previous read handler.
##
## Arguments: a coderef, followed by any arguments to pass to it.
## Returns:   in list context, everything the coderef returned; in scalar
##            context, the first element of what it returned.
## Throws:    via throw_exception if the first argument is not a coderef
##            or if the coderef dies.
sub execute_reliably {
    my ($self, $coderef, @args) = @_;

    unless( ref $coderef eq 'CODE') {
        $self->throw_exception('Second argument must be a coderef');
    }

    ##Get copy of master storage
    my $master = $self->master;

    ##Get whatever the current read handler is
    my $current = $self->read_handler;

    ##Set the read handler to master
    $self->read_handler($master);

    ## do whatever the caller needs
    my @result;
    my $want_array = wantarray;

    ## The trailing 1 makes success detectable even when $@ ends up false.
    my $succeeded = eval {
        if($want_array) {
            @result = $coderef->(@args);
        }
        elsif(defined $want_array) {
            ## The coderef is still called in list context; only its first
            ## return value is kept.
            ($result[0]) = ($coderef->(@args));
        } else {
            $coderef->(@args);
        }
        1;
    };

    ## Capture the error right away: restoring the read handler below may
    ## run code that resets $@.
    my $error = $@;

    ##Reset to the original state.  This must happen before the exception
    ##is rethrown, otherwise the read_handler would stay set to master.
    $self->read_handler($current);

    unless($succeeded) {
        $error = 'unknown error' unless defined $error && length $error;
        $self->throw_exception("coderef returned an error: $error");
    }

    return $want_array ? @result : $result[0];
}
443
cb6ec758 444=head2 set_reliable_storage
445
446Sets the current $schema to be 'reliable', that is all queries, both read and
447write are sent to the master
448
449=cut
450
## Point the schema's storage read handler at its write handler, so reads
## and writes are served by the same object.
sub set_reliable_storage {
    my ($self) = @_;

    my $writer = $self->schema->storage->write_handler;

    return $self->schema->storage->read_handler($writer);
}
458
459=head2 set_balanced_storage
460
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only
463
464=cut
465
## Point the schema's storage read handler at its balancer.
sub set_balanced_storage {
    my ($self) = @_;

    my $balancer = $self->schema->storage->balancer;

    return $self->schema->storage->read_handler($balancer);
}
2bf79155 473
6834cc1d 474=head2 around: txn_do ($coderef)
c4d3fae2 475
Overload to the txn_do method, which is delegated to whatever the
L</write_handler> is set to. We overload this in order to wrap it inside a
L</execute_reliably> method.
479
480=cut
481
## Run the delegated txn_do inside execute_reliably.
around 'txn_do' => sub {
    my ($orig, $self, $coderef, @args) = @_;

    my $run_transaction = sub { $self->$orig($coderef, @args) };

    return $self->execute_reliably($run_transaction);
};
c4d3fae2 486
ed213e85 487=head2 reload_row ($row)
488
489Overload to the reload_row method so that the reloading is always directed to
490the master storage.
491
492=cut
493
## Run the delegated reload_row inside execute_reliably, passing the row
## through as the coderef's argument.
around 'reload_row' => sub {
    my ($orig, $self, $row) = @_;

    my $reload = sub {
        my ($target_row) = @_;
        return $self->$orig($target_row);
    };

    return $self->execute_reliably($reload, $row);
};
500
2bf79155 501=head2 connected
502
503Check that the master and at least one of the replicants is connected.
504
505=cut
506
## True only when the master reports itself connected and the pool reports
## connected replicants.  If the master check is false, its value is
## returned as-is; otherwise the pool's answer is returned.
sub connected {
    my ($self) = @_;

    my $master_up = $self->master->connected;
    return $master_up unless $master_up;

    return $self->pool->connected_replicants;
}
514
2bf79155 515=head2 ensure_connected
516
517Make sure all the storages are connected.
518
519=cut
520
## Forward ensure_connected, with any arguments, to every storage.
sub ensure_connected {
    my ($self, @args) = @_;

    for my $storage ($self->all_storages) {
        $storage->ensure_connected(@args);
    }
}
527
2bf79155 528=head2 limit_dialect
529
530Set the limit_dialect for all existing storages
531
532=cut
533
## Get or set limit_dialect.
##
## With arguments, the setting is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub limit_dialect {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->limit_dialect(@args);
        }
    }

    return $self->master->limit_dialect;
}
540
2bf79155 541=head2 quote_char
542
543Set the quote_char for all existing storages
544
545=cut
546
## Get or set quote_char.
##
## With arguments, the setting is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub quote_char {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->quote_char(@args);
        }
    }

    return $self->master->quote_char;
}
553
2bf79155 554=head2 name_sep
555
556Set the name_sep for all existing storages
557
558=cut
559
## Get or set name_sep.
##
## With arguments, the setting is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub name_sep {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->name_sep(@args);
        }
    }

    return $self->master->name_sep;
}
566
2bf79155 567=head2 set_schema
568
569Set the schema object for all existing storages
570
571=cut
572
## Forward set_schema, with its arguments, to every storage.
sub set_schema {
    my ($self, @args) = @_;

    for my $storage ($self->all_storages) {
        $storage->set_schema(@args);
    }
}
579
2bf79155 580=head2 debug
581
582set a debug flag across all storages
583
584=cut
585
## Get or set the debug flag.
##
## With arguments, the flag is applied to every storage.  In all cases the
## master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub debug {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->debug(@args);
        }
    }

    return $self->master->debug;
}
592
2bf79155 593=head2 debugobj
594
595set a debug object across all storages
596
597=cut
598
## Get or set the debug object.
##
## With arguments, the object is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub debugobj {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->debugobj(@args);
        }
    }

    return $self->master->debugobj;
}
605
2bf79155 606=head2 debugfh
607
608set a debugfh object across all storages
609
610=cut
611
## Get or set the debug filehandle.
##
## With arguments, the handle is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub debugfh {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->debugfh(@args);
        }
    }

    return $self->master->debugfh;
}
618
2bf79155 619=head2 debugcb
620
621set a debug callback across all storages
622
623=cut
624
## Get or set the debug callback.
##
## With arguments, the callback is applied to every storage.  In all cases
## the master's current value is returned, so the method also works as a
## getter (previously the sub ended on a loop and returned nothing useful).
sub debugcb {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $storage ($self->all_storages) {
            $storage->debugcb(@args);
        }
    }

    return $self->master->debugcb;
}
631
2bf79155 632=head2 disconnect
633
634disconnect everything
635
636=cut
637
## Forward disconnect, with any arguments, to every storage.
sub disconnect {
    my ($self, @args) = @_;

    for my $storage ($self->all_storages) {
        $storage->disconnect(@args);
    }
}
644
f5d3a5de 645=head1 AUTHOR
646
c4d3fae2 647 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 648
c4d3fae2 649Based on code originated by:
f5d3a5de 650
c4d3fae2 651 Norbert Csongrádi <bert@cpan.org>
652 Peter Siklósi <einon@einon.hu>
2156bbdd 653
f5d3a5de 654=head1 LICENSE
655
656You may distribute this code under the same terms as Perl itself.
657
658=cut
659
6601;