discard changes now is forced to use master for replication. changed discard_changes...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The Following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 19tasks.
2bf79155 20
21 ## Change storage_type in your schema class
106d5f3b 22 $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );
2bf79155 23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
955a6df6 27 $schema->storage->connect_replicants(
2bf79155 28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
2bf79155 31 );
2bf79155 32
33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 46handle gets delegated to one of the two attributes: L</read_handler> or to
47L</write_handler>. Additionally, some methods need to be distributed
2bf79155 48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
51Read traffic is spread across the replicants (slaves) according to a
52user-selected algorithm. The default balancer is L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
53
bca099a3 54=head1 NOTES
55
56The consistency between master and replicants is database specific. The Pool
57gives you a method to validate its replicants, removing and replacing them
58when they fail/pass predefined criteria. It is recommended that your application
59define two schemas, one using the replicated storage and another that just
60connects to the master.
2bf79155 61
62=head1 ATTRIBUTES
63
64This class defines the following attributes.
65
26ab719a 66=head2 pool_type
2bf79155 67
26ab719a 68Contains the classname which will instantiate the L</pool> object. Defaults
69to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 70
71=cut
72
## Class name used to construct the pool; 'create_pool' is delegated to that
## class's 'new'.
has pool_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_pool => 'new' },
);
81
f068a139 82=head2 pool_args
83
84Contains a hashref of initialization arguments to pass to the Pool object.
85See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
86
87=cut
88
## Constructor arguments for the pool; defaults to a fresh empty hashref.
has pool_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
96
97
26ab719a 98=head2 balancer_type
2bf79155 99
100The replication pool requires a balancer class to provide the methods for
101choosing how to spread the query load across each replicant in the pool.
102
103=cut
104
## Class name used to construct the balancer; 'create_balancer' is delegated to
## that class's 'new'.
has balancer_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_balancer => 'new' },
);
113
17b05c13 114=head2 balancer_args
115
116Contains a hashref of initialized information to pass to the Balancer object.
f068a139 117See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 118
119=cut
120
## Constructor arguments for the balancer; defaults to a fresh empty hashref.
has balancer_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
128
26ab719a 129=head2 pool
2bf79155 130
26ab719a 131Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
132container class for one or more replicated databases.
2bf79155 133
134=cut
135
## The replicant pool object (built lazily); the replicant bookkeeping methods
## listed in 'handles' are delegated to it.
has pool => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    lazy_build => 1,
    handles    => [
        'connect_replicants',
        'replicants',
        'has_replicants',
    ],
);
146
26ab719a 147=head2 balancer
2bf79155 148
26ab719a 149Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
150is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 151
26ab719a 152=cut
2bf79155 153
## The balancer object (built lazily); 'auto_validate_every' is delegated to it.
has balancer => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    lazy_build => 1,
    handles    => ['auto_validate_every'],
);
2bf79155 160
cb6ec758 161=head2 master
162
163The master defines the canonical state for a pool of connected databases. All
164the replicants are expected to match this databases state. Thus, in a classic
165Master / Slaves distributed system, all the slaves are expected to replicate
166the Master's state as quick as possible. This is the only database in the
167pool of databases that is allowed to handle write traffic.
168
169=cut
170
## The master storage (built lazily).
has master => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI',
    lazy_build => 1,
);
176
cb6ec758 177=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179The following methods are delegated all the methods required for the
180L<DBIx::Class::Storage::DBI> interface.
181
182=head2 read_handler
183
184Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
185
186=cut
187
## Object that serves read traffic. It is read-write so it can be swapped at
## runtime; the read-side storage methods are delegated to it.
has read_handler => (
    is         => 'rw',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [
        'select',
        'select_single',
        'columns_info_for',
    ],
);
198
cb6ec758 199=head2 write_handler
200
201Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
202
203=cut
204
## Object that serves write traffic. Every method named in 'handles' is
## delegated to it. ('lazy_build' used to be listed twice; one copy suffices.)
has 'write_handler' => (
    is         => 'ro',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [qw/
        on_connect_do
        on_disconnect_do
        connect_info
        throw_exception
        sql_maker
        sqlt_type
        create_ddl_dir
        deployment_statements
        datetime_parser
        datetime_parser_type
        last_insert_id
        insert
        insert_bulk
        update
        delete
        dbh
        txn_commit
        txn_rollback
        sth
        deploy
        schema
        reload_row
    /],
);
235
26ab719a 236=head1 METHODS
2bf79155 237
26ab719a 238This class defines the following methods.
2bf79155 239
cb6ec758 240=head2 new
2bf79155 241
cb6ec758 242L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
243first argument. We need to invoke L</new> on the underlying parent class, make
244sure we properly give it a L<Moose> meta class, and then correctly instantiate
245our attributes. Basically we pass on whatever the schema has in its class
246data for 'storage_type_args' to our replicated storage type.
2bf79155 247
248=cut
249
## Constructor. Arguments: the class, the $schema, an optional hashref of
## storage type arguments, then any extra arguments, which are forwarded
## unchanged to both the parent constructor and the Moose instance.
##
## A 'balancer_type' beginning with '::' is expanded to a class under
## DBIx::Class::Storage::DBI::Replicated::Balancer and loaded. Dies if that
## class can neither be loaded from disk nor is already defined.
sub new {
    my $class = shift @_;
    my $schema = shift @_;
    my $storage_type_args = shift @_;
    my $obj = $class->SUPER::new($schema, $storage_type_args, @_);

    ## Work on a shallow copy so the caller's hashref is never modified.
    my %args = %{ $storage_type_args || {} };

    ## Hate to do it this way, but can't seem to get advice on the attribute working right
    ## maybe we can do a type and coercion for it.
    if( $args{balancer_type} && $args{balancer_type} =~ m/^::/ ) {
        my $balancer_type =
            'DBIx::Class::Storage::DBI::Replicated::Balancer' . $args{balancer_type};

        ## Load by file name instead of string-eval so the class name is
        ## never executed as code and load errors are not lost.
        (my $file = "$balancer_type.pm") =~ s{::}{/}g;
        unless( eval { require $file; 1 } ) {
            my $error = $@;
            ## A class defined without its own .pm file is still acceptable.
            die "Could not load balancer class '$balancer_type': $error"
                unless $balancer_type->can('new');
        }

        $args{balancer_type} = $balancer_type;
    }

    return $class->meta->new_object(
        __INSTANCE__ => $obj,
        %args,
        @_,
    );
}
269
cb6ec758 270=head2 _build_master
2bf79155 271
cb6ec758 272Lazy builder for the L</master> attribute.
2bf79155 273
274=cut
275
## Builder for 'master': a plain DBIx::Class::Storage::DBI instance.
sub _build_master {
    return DBIx::Class::Storage::DBI->new;
}
279
106d5f3b 280=head2 _build_pool_type
281
282Lazy builder for the L</pool_type> attribute.
283
284=cut
285
## Builder for 'pool_type': the stock pool class name.
sub _build_pool_type {
    my $default_pool_class = 'DBIx::Class::Storage::DBI::Replicated::Pool';
    return $default_pool_class;
}
289
26ab719a 290=head2 _build_pool
2bf79155 291
26ab719a 292Lazy builder for the L</pool> attribute.
2bf79155 293
294=cut
295
## Builder for 'pool': construct the pool from the 'pool_args' hashref.
sub _build_pool {
    my ($self) = @_;
    my $pool_args = $self->pool_args;
    return $self->create_pool(%$pool_args);
}
300
106d5f3b 301=head2 _build_balancer_type
302
303Lazy builder for the L</balancer_type> attribute.
304
305=cut
306
## Builder for 'balancer_type': the ::First balancer class name.
sub _build_balancer_type {
    my $default_balancer_class =
        'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
    return $default_balancer_class;
}
310
26ab719a 311=head2 _build_balancer
2bf79155 312
cb6ec758 313Lazy builder for the L</balancer> attribute. This takes a Pool object so that
314the balancer knows which pool it's balancing.
2bf79155 315
316=cut
317
## Builder for 'balancer': hand the balancer the pool and the master, followed
## by whatever is in 'balancer_args'.
sub _build_balancer {
    my ($self) = @_;
    my @construct_with = (
        pool   => $self->pool,
        master => $self->master,
        %{ $self->balancer_args },
    );
    return $self->create_balancer(@construct_with);
}
325
cb6ec758 326=head2 _build_write_handler
2bf79155 327
cb6ec758 328Lazy builder for the L</write_handler> attribute. The default is to set this to
329the L</master>.
50336325 330
331=cut
332
## Builder for 'write_handler': writes go to the master.
sub _build_write_handler {
    my ($self) = @_;
    return $self->master;
}
50336325 336
cb6ec758 337=head2 _build_read_handler
2bf79155 338
cb6ec758 339Lazy builder for the L</read_handler> attribute. The default is to set this to
340the L</balancer>.
2bf79155 341
342=cut
343
## Builder for 'read_handler': reads go to the balancer.
sub _build_read_handler {
    my ($self) = @_;
    return $self->balancer;
}
50336325 347
cb6ec758 348=head2 around: connect_replicants
2bf79155 349
cb6ec758 350All calls to connect_replicants needs to have an existing $schema tacked onto
351top of the args, since L<DBIx::Storage::DBI> needs it.
955a6df6 352
cb6ec758 353=cut
955a6df6 354
## Wrap the delegated connect_replicants so the schema is always passed as the
## first argument, ahead of the caller's connect-info list.
around 'connect_replicants' => sub {
    my $orig = shift;
    my $self = shift;
    my @connect_infos = @_;
    $self->$orig($self->schema, @connect_infos);
};
2bf79155 359
2bf79155 360=head2 all_storages
361
362Returns an array of all the connected storage backends. The first element
363in the returned array is the master, and the remaining elements are the
364replicants.
365
366=cut
367
## Return the master followed by the replicants, keeping only values that are
## defined, blessed objects.
sub all_storages {
    my ($self) = @_;

    my @candidates = ($self->master, $self->replicants);
    return grep { defined($_) && blessed($_) } @candidates;
}
376
c4d3fae2 377=head2 execute_reliably ($coderef, ?@args)
378
379Given a coderef, saves the current state of the L</read_handler>, forces it to
380use reliable storage (ie sets it to the master), executes a coderef and then
381restores the original state.
382
383Example:
384
385 my $reliably = sub {
386 my $name = shift @_;
387 $schema->resultset('User')->create({name=>$name});
388 my $user_rs = $schema->resultset('User')->find({name=>$name});
ed213e85 389 return $user_rs;
c4d3fae2 390 };
391
ed213e85 392 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 393
394Use this when you must be certain of your database state, such as when you just
395inserted something and need to get a resultset including it, etc.
396
397=cut
398
## Run $coderef with @args while the read handler is pointed at the master,
## then restore the previous read handler.
##
## Returns what the coderef returned: the full list in list context, the first
## element in scalar context, nothing meaningful in void context.
## Throws (via throw_exception) when $coderef is not a code reference, or when
## the coderef dies; in the latter case the handler is restored first.
sub execute_reliably {
    my ($self, $coderef, @args) = @_;

    unless( ref $coderef eq 'CODE') {
        $self->throw_exception('Second argument must be a coderef');
    }

    ## Remember whatever the current read handler is, then switch to master
    my $current = $self->read_handler;
    $self->read_handler($self->master);

    ## do whatever the caller needs
    my @result;
    my $want_array = wantarray;

    ## The trailing "1" makes success detectable independently of $@.
    my $ok = eval {
        if($want_array) {
            @result = $coderef->(@args);
        }
        elsif(defined $want_array) {
            ($result[0]) = ($coderef->(@args));
        }
        else {
            $coderef->(@args);
        }
        1;
    };

    ## Capture the error right away: restoring the handler below may run
    ## code that resets $@.
    my $error = $@;

    ## Reset to the original state on the same object we modified above.
    $self->read_handler($current);

    ## Exception testing has to come last, otherwise you might leave the
    ## read_handler set to master.
    unless($ok) {
        $self->throw_exception("coderef returned an error: $error");
    }

    return $want_array ? @result : $result[0];
}
442
cb6ec758 443=head2 set_reliable_storage
444
445Sets the current $schema to be 'reliable', that is all queries, both read and
446write are sent to the master
447
448=cut
449
## Point the schema storage's read handler at its write handler, so reads and
## writes use the same object.
sub set_reliable_storage {
    my ($self) = @_;
    my $storage = $self->schema->storage;
    $storage->read_handler($storage->write_handler);
}
457
458=head2 set_balanced_storage
459
460Sets the current $schema to use the L</balancer> for all reads, while all
461writes are sent to the master only
462
463=cut
464
## Point the schema storage's read handler back at its balancer.
sub set_balanced_storage {
    my ($self) = @_;
    my $storage = $self->schema->storage;
    $storage->read_handler($storage->balancer);
}
2bf79155 472
c4d3fae2 473=head2 txn_do ($coderef)
474
475Overload to the txn_do method, which is delegated to whatever the
476L</write_handler> is set to. We overload this in order to wrap it inside an
477L</execute_reliably> call.
478
479=cut
480
## Run $coderef (with @args) inside a transaction on the write handler, with
## reads forced to the master for the duration via execute_reliably.
##
## Previously the coderef was only passed to execute_reliably and the write
## handler's txn_do was never called, so no transaction was opened.
## Returns whatever execute_reliably returns for the wrapped call.
sub txn_do {
    my($self, $coderef, @args) = @_;
    my $write_handler = $self->write_handler;

    return $self->execute_reliably(
        sub { $write_handler->txn_do($coderef, @_) },
        @args,
    );
}
485
ed213e85 486=head2 reload_row ($row)
487
488Overload to the reload_row method so that the reloading is always directed to
489the master storage.
490
491=cut
492
## Wrap the delegated reload_row so it runs through execute_reliably.
around 'reload_row' => sub {
    my ($orig, $self, $row) = @_;

    my $reload = sub {
        my ($target) = @_;
        $self->$orig($target);
    };

    $self->execute_reliably($reload, $row);
};
499
2bf79155 500=head2 connected
501
502Check that the master and at least one of the replicants is connected.
503
504=cut
505
## True when the master reports connected and the pool reports connected
## replicants.
sub connected {
    my ($self) = @_;
    my $master = $self->master;

    return $master->connected && $self->pool->connected_replicants;
}
513
2bf79155 514=head2 ensure_connected
515
516Make sure all the storages are connected.
517
518=cut
519
## Call ensure_connected, with the caller's arguments, on every storage
## returned by all_storages.
sub ensure_connected {
    my ($self, @args) = @_;
    $_->ensure_connected(@args) for $self->all_storages;
}
526
2bf79155 527=head2 limit_dialect
528
529Set the limit_dialect for all existing storages
530
531=cut
532
## Forward limit_dialect (with any arguments) to every storage, then return
## the master's current value so the method is also usable as a getter.
sub limit_dialect {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->limit_dialect(@_);
    }
    return $self->master->limit_dialect;
}
539
2bf79155 540=head2 quote_char
541
542Set the quote_char for all existing storages
543
544=cut
545
## Forward quote_char (with any arguments) to every storage, then return the
## master's current value so the method is also usable as a getter.
sub quote_char {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->quote_char(@_);
    }
    return $self->master->quote_char;
}
552
2bf79155 553=head2 name_sep
554
555Set the name_sep for all existing storages
556
557=cut
558
## Forward name_sep (with any arguments) to every storage, then return the
## master's current value so the method is also usable as a getter.
sub name_sep {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->name_sep(@_);
    }
    return $self->master->name_sep;
}
565
2bf79155 566=head2 set_schema
567
568Set the schema object for all existing storages
569
570=cut
571
## Call set_schema, with the caller's arguments, on every storage returned by
## all_storages.
sub set_schema {
    my ($self, @args) = @_;
    $_->set_schema(@args) for $self->all_storages;
}
578
2bf79155 579=head2 debug
580
581set a debug flag across all storages
582
583=cut
584
## Forward debug (with any arguments) to every storage, then return the
## master's current flag so the method is also usable as a getter.
sub debug {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->debug(@_);
    }
    return $self->master->debug;
}
591
2bf79155 592=head2 debugobj
593
594set a debug object across all storages
595
596=cut
597
## Forward debugobj (with any arguments) to every storage, then return the
## master's current value so the method is also usable as a getter.
sub debugobj {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->debugobj(@_);
    }
    return $self->master->debugobj;
}
604
2bf79155 605=head2 debugfh
606
607set a debugfh object across all storages
608
609=cut
610
## Forward debugfh (with any arguments) to every storage, then return the
## master's current value so the method is also usable as a getter.
sub debugfh {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->debugfh(@_);
    }
    return $self->master->debugfh;
}
617
2bf79155 618=head2 debugcb
619
620set a debug callback across all storages
621
622=cut
623
## Forward debugcb (with any arguments) to every storage, then return the
## master's current value so the method is also usable as a getter.
sub debugcb {
    my $self = shift @_;
    foreach my $source ($self->all_storages) {
        $source->debugcb(@_);
    }
    return $self->master->debugcb;
}
630
2bf79155 631=head2 disconnect
632
633disconnect everything
634
635=cut
636
## Call disconnect, with the caller's arguments, on every storage returned by
## all_storages.
sub disconnect {
    my ($self, @args) = @_;
    $_->disconnect(@args) for $self->all_storages;
}
643
f5d3a5de 644=head1 AUTHOR
645
c4d3fae2 646 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 647
c4d3fae2 648Based on code originated by:
f5d3a5de 649
c4d3fae2 650 Norbert Csongrádi <bert@cpan.org>
651 Peter Siklósi <einon@einon.hu>
2156bbdd 652
f5d3a5de 653=head1 LICENSE
654
655You may distribute this code under the same terms as Perl itself.
656
657=cut
658
6591;