1) changed all 4 space indentation to 2 space style indents for replication code...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
2bf79155 3use Moose;
26ab719a 4use DBIx::Class::Storage::DBI;
2bf79155 5use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 6use DBIx::Class::Storage::DBI::Replicated::Balancer;
7use Scalar::Util qw(blessed);
2bf79155 8
26ab719a 9extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
17The following example shows how to change an existing $schema to a replicated
18storage type, add some replicated (readonly) databases, and perform reporting
955a6df6 19tasks.
2bf79155 20
64cdad22 21 ## Change storage_type in your schema class
22 $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );
23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
27 $schema->storage->connect_replicants(
28 [$dsn1, $user, $pass, \%opts],
29 [$dsn2, $user, $pass, \%opts],
30 [$dsn3, $user, $pass, \%opts],
31 );
32
2bf79155 33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 46handle gets delegated to one of the two attributes: L</read_handler> or to
47L</write_handler>. Additionally, some methods need to be distributed
2bf79155 48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
51Read traffic is spread across the replicants (slaves) according to a
52user-selected algorithm. The default algorithm is random weighted.
53
bca099a3 54=head1 NOTES
55
56The consistency between master and replicants is database specific. The Pool
57gives you a method to validate its replicants, removing and replacing them
58when they fail/pass predefined criteria. It is recommended that your application
59define two schemas, one using the replicated storage and another that just
60connects to the master.
2bf79155 61
62=head1 ATTRIBUTES
63
64This class defines the following attributes.
65
26ab719a 66=head2 pool_type
2bf79155 67
26ab719a 68Contains the classname which will instantiate the L</pool> object. Defaults
69to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 70
71=cut
72
## Class name used to build the pool; 'create_pool' delegates to that
## class's 'new'.
has pool_type => (
  is         => 'ro',
  isa        => 'ClassName',
  lazy_build => 1,
  handles    => { create_pool => 'new' },
);
81
f068a139 82=head2 pool_args
83
84Contains a hashref of initialization information to pass to the Pool object.
85See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
86
87=cut
88
## Hashref of constructor arguments for the pool; defaults to an empty hashref.
has pool_args => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
96
97
26ab719a 98=head2 balancer_type
2bf79155 99
100The replication pool requires a balancer class to provide the methods for
101choosing how to spread the query load across each replicant in the pool.
102
103=cut
104
## Class name used to build the balancer; 'create_balancer' delegates to that
## class's 'new'.
has balancer_type => (
  is         => 'ro',
  isa        => 'ClassName',
  lazy_build => 1,
  handles    => { create_balancer => 'new' },
);
113
17b05c13 114=head2 balancer_args
115
116Contains a hashref of initialization information to pass to the Balancer object.
f068a139 117See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 118
119=cut
120
## Hashref of constructor arguments for the balancer; defaults to an empty
## hashref.
has balancer_args => (
  is       => 'ro',
  isa      => 'HashRef',
  required => 1,
  lazy     => 1,
  default  => sub { return {} },
);
128
26ab719a 129=head2 pool
2bf79155 130
26ab719a 131Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
132container class for one or more replicated databases.
2bf79155 133
134=cut
135
## The replicant pool object, built lazily. The three listed methods are
## delegated to it.
has pool => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
  lazy_build => 1,
  handles    => [qw(connect_replicants replicants has_replicants)],
);
146
26ab719a 147=head2 balancer
2bf79155 148
26ab719a 149Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
150is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and chooses how the query load is spread across the replicants in it.
2bf79155 151
26ab719a 152=cut
2bf79155 153
## The balancer object, built lazily; 'auto_validate_every' is delegated to it.
has balancer => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
  lazy_build => 1,
  handles    => [qw(auto_validate_every)],
);
2bf79155 160
cb6ec758 161=head2 master
162
163The master defines the canonical state for a pool of connected databases. All
164the replicants are expected to match this database's state. Thus, in a classic
165Master / Slaves distributed system, all the slaves are expected to replicate
166the Master's state as quickly as possible. This is the only database in the
167pool of databases that is allowed to handle write traffic.
168
169=cut
170
## The master storage object (a DBIx::Class::Storage::DBI), built lazily.
has master => (
  is         => 'ro',
  isa        => 'DBIx::Class::Storage::DBI',
  lazy_build => 1,
);
176
cb6ec758 177=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
178
179The following attributes are delegated all the methods required for the
180L<DBIx::Class::Storage::DBI> interface.
181
182=head2 read_handler
183
184Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
185
186=cut
187
## Object that serves read requests. It is read-write so that it can be
## swapped at runtime; the listed read methods are delegated to it.
has read_handler => (
  is         => 'rw',
  isa        => 'Object',
  lazy_build => 1,
  handles    => [qw(select select_single columns_info_for)],
);
198
cb6ec758 199=head2 write_handler
200
201Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
202
203=cut
204
## Object that serves write requests. Every method listed under 'handles' is
## delegated to it. (The 'lazy_build' option used to be listed twice; it is
## now given once.)
has 'write_handler' => (
  is=>'ro',
  isa=>'Object',
  lazy_build=>1,
  handles=>[qw/
    on_connect_do
    on_disconnect_do
    connect_info
    throw_exception
    sql_maker
    sqlt_type
    create_ddl_dir
    deployment_statements
    datetime_parser
    datetime_parser_type
    last_insert_id
    insert
    insert_bulk
    update
    delete
    dbh
    txn_do
    txn_commit
    txn_rollback
    sth
    deploy
    schema
    reload_row
  /],
);
236
26ab719a 237=head1 METHODS
2bf79155 238
26ab719a 239This class defines the following methods.
2bf79155 240
cb6ec758 241=head2 new
2bf79155 242
cb6ec758 243L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
244first argument. We need to invoke L</new> on the underlying parent class, make
245sure we properly give it a L<Moose> meta class, and then correctly instantiate
246our attributes. Basically we pass on whatever the schema has in its class
247data for 'storage_type_args' to our replicated storage type.
2bf79155 248
249=cut
250
## Constructor. Takes the schema, a hashref of storage type arguments and any
## further attribute arguments. Builds the parent-class object first, then
## hands it to the Moose meta class as the instance to initialise.
sub new {
  my $class = shift @_;
  my $schema = shift @_;
  my $storage_type_args = shift @_;
  my $obj = $class->SUPER::new($schema, $storage_type_args, @_);

  ## Work on a shallow copy so the caller's hashref is never modified.
  my %attribute_args = %{ $storage_type_args || {} };

  ## Hate to do it this way, but can't seem to get advice on the attribute working right
  ## maybe we can do a type and coercion for it.
  my $balancer_type = $attribute_args{balancer_type};
  if( $balancer_type && $balancer_type=~m/^::/) {
    $balancer_type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$balancer_type;

    ## Load by file name inside a block eval rather than a string eval.
    (my $file = "$balancer_type.pm") =~ s{::}{/}g;
    my $loaded = eval { require $file; 1 };
    my $load_error = $@;

    ## A class defined inline (no file on disk) is still acceptable; only
    ## complain when the class really is unavailable.
    unless( $loaded || $balancer_type->can('new') ) {
      die "Could not load balancer_type '$balancer_type': $load_error";
    }

    $attribute_args{balancer_type} = $balancer_type;
  }

  return $class->meta->new_object(
    __INSTANCE__ => $obj,
    %attribute_args,
    @_,
  );
}
270
cb6ec758 271=head2 _build_master
2bf79155 272
cb6ec758 273Lazy builder for the L</master> attribute.
2bf79155 274
275=cut
276
## Lazy builder: the default master is a DBIx::Class::Storage::DBI created
## with no arguments.
sub _build_master {
  my $master_class = 'DBIx::Class::Storage::DBI';
  return $master_class->new;
}
280
106d5f3b 281=head2 _build_pool_type
282
283Lazy builder for the L</pool_type> attribute.
284
285=cut
286
## Lazy builder: name of the default pool class.
sub _build_pool_type {
  my $default_pool_class = 'DBIx::Class::Storage::DBI::Replicated::Pool';
  return $default_pool_class;
}
290
26ab719a 291=head2 _build_pool
2bf79155 292
26ab719a 293Lazy builder for the L</pool> attribute.
2bf79155 294
295=cut
296
## Lazy builder: create the pool, passing the contents of pool_args as
## constructor arguments.
sub _build_pool {
  my ($self) = @_;
  my $args = $self->pool_args;
  return $self->create_pool(%$args);
}
301
106d5f3b 302=head2 _build_balancer_type
303
304Lazy builder for the L</balancer_type> attribute.
305
306=cut
307
## Lazy builder: name of the default balancer class.
sub _build_balancer_type {
  my $default_balancer_class
    = 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
  return $default_balancer_class;
}
311
26ab719a 312=head2 _build_balancer
2bf79155 313
cb6ec758 314Lazy builder for the L</balancer> attribute. This takes a Pool object so that
315the balancer knows which pool it's balancing.
2bf79155 316
317=cut
318
## Lazy builder: create the balancer, giving it the pool, the master and
## whatever extra arguments balancer_args holds.
sub _build_balancer {
  my ($self) = @_;
  my @balancer_params = (
    pool   => $self->pool,
    master => $self->master,
    %{ $self->balancer_args },
  );
  return $self->create_balancer(@balancer_params);
}
327
cb6ec758 328=head2 _build_write_handler
2bf79155 329
cb6ec758 330Lazy builder for the L</write_handler> attribute. The default is to set this to
331the L</master>.
50336325 332
333=cut
334
## Lazy builder: the write handler defaults to the master.
sub _build_write_handler {
  my ($self) = @_;
  return $self->master;
}
50336325 338
cb6ec758 339=head2 _build_read_handler
2bf79155 340
cb6ec758 341Lazy builder for the L</read_handler> attribute. The default is to set this to
342the L</balancer>.
2bf79155 343
344=cut
345
## Lazy builder: the read handler defaults to the balancer.
sub _build_read_handler {
  my ($self) = @_;
  return $self->balancer;
}
50336325 349
cb6ec758 350=head2 around: connect_replicants
2bf79155 351
cb6ec758 352All calls to connect_replicants need to have an existing $schema tacked onto
353top of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 354
cb6ec758 355=cut
955a6df6 356
## Prepend the schema to the argument list before calling the wrapped
## connect_replicants.
around connect_replicants => sub {
  my $orig = shift;
  my $self = shift;
  return $self->$orig($self->schema, @_);
};
2bf79155 361
2bf79155 362=head2 all_storages
363
364Returns an array of all the connected storage backends. The first element
365in the returned array is the master, and the remaining elements are the
366replicants.
367
368=cut
369
## Return the master followed by the replicants, keeping only entries that
## are defined, blessed objects.
sub all_storages {
  my ($self) = @_;
  my @candidates = ($self->master, $self->replicants);
  return grep { defined($_) && blessed($_) } @candidates;
}
377
c4d3fae2 378=head2 execute_reliably ($coderef, ?@args)
379
380Given a coderef, saves the current state of the L</read_handler>, forces it to
381use reliable storage (ie sets it to the master), executes a coderef and then
382restores the original state.
383
384Example:
385
64cdad22 386 my $reliably = sub {
387 my $name = shift @_;
388 $schema->resultset('User')->create({name=>$name});
389 my $user_rs = $schema->resultset('User')->find({name=>$name});
390 return $user_rs;
391 };
c4d3fae2 392
64cdad22 393 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 394
395Use this when you must be certain of your database state, such as when you just
396inserted something and need to get a resultset including it, etc.
397
398=cut
399
## Run $coderef with @args while reads are routed to the master, then put the
## previous read handler back.
##
## The coderef is called in the caller's context (list, scalar or void) and
## its result is returned. If the coderef dies, the read handler is restored
## first and the error is rethrown through throw_exception. A non-coderef
## second argument is also reported through throw_exception.
sub execute_reliably {
  my ($self, $coderef, @args) = @_;

  unless( ref($coderef) eq 'CODE') {
    $self->throw_exception('Second argument must be a coderef');
  }

  ##Get copy of master storage
  my $master = $self->master;

  ##Get whatever the current read handler is
  my $current = $self->read_handler;

  ##Set the read handler to master
  $self->read_handler($master);

  ## do whatever the caller needs
  my @result;
  my $want_array = wantarray;

  my $succeeded = eval {
    if($want_array) {
      @result = $coderef->(@args);
    } elsif(defined $want_array) {
      ## Plain scalar assignment so the coderef really sees scalar context
      $result[0] = $coderef->(@args);
    } else {
      $coderef->(@args);
    }
    1;
  };

  ## Capture the error right away: restoring the read handler below may run
  ## code that resets $@.
  my $error = $@;

  ##Reset to the original state
  $self->read_handler($current);

  ##Exception testing has to come last, otherwise you might leave the
  ##read_handler set to master.
  unless($succeeded) {
    $self->throw_exception("coderef returned an error: $error");
  }

  return $want_array ? @result : $result[0];
}
442
cb6ec758 443=head2 set_reliable_storage
444
445Sets the current $schema to be 'reliable', that is all queries, both read and
446write are sent to the master
64cdad22 447
cb6ec758 448=cut
449
## Point the read handler of the schema's storage at that storage's write
## handler, so reads and writes use the same object.
sub set_reliable_storage {
  my ($self) = @_;
  my $reliable_handler = $self->schema->storage->write_handler;
  return $self->schema->storage->read_handler($reliable_handler);
}
457
458=head2 set_balanced_storage
459
460Sets the current $schema to use the L</balancer> for all reads, while all
461writes are sent to the master only
64cdad22 462
cb6ec758 463=cut
464
## Point the read handler of the schema's storage at that storage's balancer.
sub set_balanced_storage {
  my ($self) = @_;
  my $balancer = $self->schema->storage->balancer;
  return $self->schema->storage->read_handler($balancer);
}
2bf79155 472
6834cc1d 473=head2 around: txn_do ($coderef)
c4d3fae2 474
475Overload to the txn_do method, which is delegated to whatever the
476L</write_handler> is set to. We overload this in order to wrap it inside a
477L</execute_reliably> method.
478
479=cut
480
## Run the wrapped txn_do through execute_reliably.
around txn_do => sub {
  my $orig = shift;
  my $self = shift;
  my ($coderef, @args) = @_;
  my $run_transaction = sub { $self->$orig($coderef, @args) };
  $self->execute_reliably($run_transaction);
};
c4d3fae2 485
ed213e85 486=head2 reload_row ($row)
487
488Overload to the reload_row method so that the reloading is always directed to
489the master storage.
490
491=cut
492
## Run the wrapped reload_row through execute_reliably, passing the row along
## as the coderef's argument.
around reload_row => sub {
  my $orig = shift;
  my ($self, $row) = @_;
  my $reload = sub {
    my $target_row = shift;
    return $self->$orig($target_row);
  };
  return $self->execute_reliably($reload, $row);
};
499
2bf79155 500=head2 connected
501
502Check that the master and at least one of the replicants is connected.
503
504=cut
505
## True when the master reports it is connected and the pool reports
## connected replicants. Returns the master's (false) answer if the master
## is down, otherwise whatever the pool's connected_replicants returns.
sub connected {
  my ($self) = @_;
  my $master_state = $self->master->connected;
  return $master_state unless $master_state;
  return $self->pool->connected_replicants;
}
512
2bf79155 513=head2 ensure_connected
514
515Make sure all the storages are connected.
516
517=cut
518
## Call ensure_connected, with the given arguments, on every storage returned
## by all_storages.
sub ensure_connected {
  my $self = shift;
  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->ensure_connected(@_);
  }
}
525
2bf79155 526=head2 limit_dialect
527
528Set the limit_dialect for all existing storages
529
530=cut
531
## Pass the given arguments to limit_dialect on every storage, then return
## the master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub limit_dialect {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);
  }
  return $self->master->limit_dialect;
}
538
2bf79155 539=head2 quote_char
540
541Set the quote_char for all existing storages
542
543=cut
544
## Pass the given arguments to quote_char on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub quote_char {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->quote_char(@_);
  }
  return $self->master->quote_char;
}
551
2bf79155 552=head2 name_sep
553
554Set the name_sep for all existing storages
555
556=cut
557
## Pass the given arguments to name_sep on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub name_sep {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->name_sep(@_);
  }
  return $self->master->name_sep;
}
564
2bf79155 565=head2 set_schema
566
567Set the schema object for all existing storages
568
569=cut
570
## Call set_schema, with the given arguments, on every storage returned by
## all_storages.
sub set_schema {
  my $self = shift;
  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->set_schema(@_);
  }
}
577
2bf79155 578=head2 debug
579
580set a debug flag across all storages
581
582=cut
583
## Pass the given arguments to debug on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub debug {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->debug(@_);
  }
  return $self->master->debug;
}
590
2bf79155 591=head2 debugobj
592
593set a debug object across all storages
594
595=cut
596
## Pass the given arguments to debugobj on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub debugobj {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->debugobj(@_);
  }
  return $self->master->debugobj;
}
603
2bf79155 604=head2 debugfh
605
606set a debugfh object across all storages
607
608=cut
609
## Pass the given arguments to debugfh on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub debugfh {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->debugfh(@_);
  }
  return $self->master->debugfh;
}
616
2bf79155 617=head2 debugcb
618
619set a debug callback across all storages
620
621=cut
622
## Pass the given arguments to debugcb on every storage, then return the
## master's current value so the method also works as a getter.
## (Previously the sub ended in the foreach loop and returned nothing useful.)
sub debugcb {
  my $self = shift @_;
  foreach my $source ($self->all_storages) {
    $source->debugcb(@_);
  }
  return $self->master->debugcb;
}
629
2bf79155 630=head2 disconnect
631
632disconnect everything
633
634=cut
635
## Call disconnect, with the given arguments, on every storage returned by
## all_storages.
sub disconnect {
  my $self = shift;
  my @storages = $self->all_storages;
  for my $storage (@storages) {
    $storage->disconnect(@_);
  }
}
642
f5d3a5de 643=head1 AUTHOR
644
64cdad22 645 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 646
c4d3fae2 647Based on code originated by:
f5d3a5de 648
64cdad22 649 Norbert Csongrádi <bert@cpan.org>
650 Peter Siklósi <einon@einon.hu>
2156bbdd 651
f5d3a5de 652=head1 LICENSE
653
654You may distribute this code under the same terms as Perl itself.
655
656=cut
657
6581;