created storage method to execute a coderef using master storage only, changed tnx_do...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
package DBIx::Class::Storage::DBI::Replicated;

# Moose provides the has/extends/around sugar used throughout this class.
use Moose;
use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
use DBIx::Class::Storage::DBI::Replicated::Balancer;
use Scalar::Util qw(blessed);

# Inherit from the regular DBI storage as well as from Moose::Object.
extends qw(DBIx::Class::Storage::DBI Moose::Object);
2bf79155 10
11=head1 NAME
12
13DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15=head1 SYNOPSIS
16
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (readonly) databases, and perform reporting
tasks.
2bf79155 20
21 ## Change storage_type in your schema class
106d5f3b 22 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
2bf79155 23
24 ## Add some slaves. Basically this is an array of arrayrefs, where each
25 ## arrayref is database connect information
26
955a6df6 27 $schema->storage->connect_replicants(
2bf79155 28 [$dsn1, $user, $pass, \%opts],
29 [$dsn1, $user, $pass, \%opts],
30 [$dsn1, $user, $pass, \%opts],
2bf79155 31 );
2bf79155 32
33=head1 DESCRIPTION
34
35Warning: This class is marked ALPHA. We are using this in development and have
36some basic test coverage but the code hasn't yet been stressed by a variety
37of databases. Individual DB's may have quirks we are not aware of. Please
38use this in development and pass along your experiences/bug fixes.
39
40This class implements replicated data store for DBI. Currently you can define
41one master and numerous slave database connections. All write-type queries
42(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43database, all read-type queries (SELECTs) go to the slave database.
44
45Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 46handle gets delegated to one of the two attributes: L</read_handler> or to
47L</write_handler>. Additionally, some methods need to be distributed
2bf79155 48to all existing storages. This way our storage class is a drop in replacement
49for L<DBIx::Class::Storage::DBI>.
50
Read traffic is spread across the replicants (slaves) according to a user
selected algorithm. The default algorithm is random weighted.
53
bca099a3 54=head1 NOTES
55
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. It is recommended that your application
define two schemas, one using the replicated storage and another that just
connects to the master.
2bf79155 61
62=head1 ATTRIBUTES
63
64This class defines the following attributes.
65
26ab719a 66=head2 pool_type
2bf79155 67
26ab719a 68Contains the classname which will instantiate the L</pool> object. Defaults
69to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 70
71=cut
72
# Class name used to build the pool; 'create_pool' delegates to its 'new'.
has pool_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_pool => 'new' },
);
81
f068a139 82=head2 pool_args
83
Contains a hashref of initialized information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
86
87=cut
88
# Constructor arguments handed to create_pool; defaults to an empty hashref.
has pool_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
96
97
26ab719a 98=head2 balancer_type
2bf79155 99
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
102
103=cut
104
# Class name used to build the balancer; 'create_balancer' delegates to its 'new'.
has balancer_type => (
    is         => 'ro',
    isa        => 'ClassName',
    lazy_build => 1,
    handles    => { create_balancer => 'new' },
);
113
17b05c13 114=head2 balancer_args
115
Contains a hashref of initialized information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 118
119=cut
120
# Extra constructor arguments for create_balancer; defaults to an empty hashref.
has balancer_args => (
    is       => 'ro',
    isa      => 'HashRef',
    lazy     => 1,
    required => 1,
    default  => sub { return {} },
);
128
26ab719a 129=head2 pool
2bf79155 130
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 133
134=cut
135
# The replicant pool object; replicant management methods are delegated to it.
has pool => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Pool',
    lazy_build => 1,
    handles    => [
        'connect_replicants',
        'replicants',
        'has_replicants',
    ],
);
146
26ab719a 147=head2 balancer
2bf79155 148
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and decides how read queries are spread across its replicants.
2bf79155 151
26ab719a 152=cut
2bf79155 153
# The balancer object; 'auto_validate_every' is delegated to it.
has balancer => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI::Replicated::Balancer',
    lazy_build => 1,
    handles    => ['auto_validate_every'],
);
2bf79155 160
cb6ec758 161=head2 master
162
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
168
169=cut
170
# The master storage, built lazily by _build_master.
has master => (
    is         => 'ro',
    isa        => 'DBIx::Class::Storage::DBI',
    lazy_build => 1,
);
176
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
181
182=head2 read_handler
183
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
185
186=cut
187
# Object answering read-side calls. It is read/write so the handler can be
# swapped at runtime (see execute_reliably and set_reliable_storage).
has read_handler => (
    is         => 'rw',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [
        'select',
        'select_single',
        'columns_info_for',
    ],
);
198
cb6ec758 199=head2 write_handler
200
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.
202
203=cut
204
# Object answering write-side (and connection/DDL related) calls.
#
# Changes from the previous definition:
#  * the 'lazy_build' option was listed twice; the duplicate is removed.
#  * 'txn_begin' is now delegated together with 'txn_commit' and
#    'txn_rollback', so that all three transaction calls reach the same
#    storage object instead of being split between this object and the
#    write handler.
has 'write_handler' => (
    is         => 'ro',
    isa        => 'Object',
    lazy_build => 1,
    handles    => [qw/
        on_connect_do
        on_disconnect_do
        connect_info
        throw_exception
        sql_maker
        sqlt_type
        create_ddl_dir
        deployment_statements
        datetime_parser
        datetime_parser_type
        last_insert_id
        insert
        insert_bulk
        update
        delete
        dbh
        txn_begin
        txn_commit
        txn_rollback
        sth
        deploy
        schema
    /],
);
234
26ab719a 235=head1 METHODS
2bf79155 236
26ab719a 237This class defines the following methods.
2bf79155 238
cb6ec758 239=head2 new
2bf79155 240
L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
first argument. We need to invoke L</new> on the underlying parent class, make
sure we properly give it a L<Moose> meta class, and then correctly instantiate
our attributes. Basically we pass on whatever the schema has in its class
data for 'storage_type_args' to our replicated storage type.
2bf79155 246
247=cut
248
# Constructor. Called as $class->new($schema, \%storage_type_args, @rest).
#
# Builds the parent storage object first, then hands it to the Moose meta
# class as __INSTANCE__ so the attributes declared in this class are
# initialised from %storage_type_args (plus any trailing arguments).
#
# A 'balancer_type' beginning with '::' is treated as shorthand and expanded
# to DBIx::Class::Storage::DBI::Replicated::Balancer::<name>; that class is
# then loaded. Dies if the expanded name is not a valid package name, or if
# the class can neither be required nor already provides a 'new' method.
sub new {
    my $class             = shift @_;
    my $schema            = shift @_;
    my $storage_type_args = shift @_;

    my $obj = $class->SUPER::new($schema, $storage_type_args, @_);

    # Work on a shallow copy so the caller's hashref is left untouched.
    my %attributes = %{ $storage_type_args || {} };

    ## Hate to do it this way, but can't seem to get advice on the attribute
    ## working right; maybe we can do a type and coercion for it.
    my $balancer_type = $attributes{balancer_type};
    if ($balancer_type && $balancer_type =~ m/^::/) {
        $balancer_type
            = 'DBIx::Class::Storage::DBI::Replicated::Balancer' . $balancer_type;

        # The name is interpolated into a string eval below, so insist on a
        # plain package name first.
        unless ($balancer_type =~ m/\A \w+ (?: :: \w+ )* \z/x) {
            die "Invalid balancer_type '$balancer_type'\n";
        }

        # Report load failures instead of silently ignoring them. A class
        # that is already defined (for example inline, with no file of its
        # own) is still accepted.
        my $loaded = eval "require $balancer_type; 1";
        unless ($loaded || $balancer_type->can('new')) {
            die "Could not load balancer class '$balancer_type': $@";
        }

        $attributes{balancer_type} = $balancer_type;
    }

    return $class->meta->new_object(
        __INSTANCE__ => $obj,
        %attributes,
        @_,
    );
}
268
cb6ec758 269=head2 _build_master
2bf79155 270
cb6ec758 271Lazy builder for the L</master> attribute.
2bf79155 272
273=cut
274
# Lazy builder for 'master': a freshly constructed plain DBI storage.
sub _build_master {
    return DBIx::Class::Storage::DBI->new();
}
278
106d5f3b 279=head2 _build_pool_type
280
281Lazy builder for the L</pool_type> attribute.
282
283=cut
284
# Lazy builder for 'pool_type': the stock pool class name.
sub _build_pool_type {
    my $default_pool_class = 'DBIx::Class::Storage::DBI::Replicated::Pool';
    return $default_pool_class;
}
288
26ab719a 289=head2 _build_pool
2bf79155 290
26ab719a 291Lazy builder for the L</pool> attribute.
2bf79155 292
293=cut
294
# Lazy builder for 'pool': calls create_pool with the flattened pool_args.
sub _build_pool {
    my ($self) = @_;
    my @constructor_args = %{ $self->pool_args };
    return $self->create_pool(@constructor_args);
}
299
106d5f3b 300=head2 _build_balancer_type
301
302Lazy builder for the L</balancer_type> attribute.
303
304=cut
305
# Lazy builder for 'balancer_type': the 'First' balancer class name.
sub _build_balancer_type {
    my $default_balancer_class
        = 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
    return $default_balancer_class;
}
309
26ab719a 310=head2 _build_balancer
2bf79155 311
cb6ec758 312Lazy builder for the L</balancer> attribute. This takes a Pool object so that
313the balancer knows which pool it's balancing.
2bf79155 314
315=cut
316
# Lazy builder for 'balancer': constructs it with the pool, the master and
# whatever extra arguments balancer_args holds (listed last, so they win on
# duplicate keys).
sub _build_balancer {
    my ($self) = @_;
    my @constructor_args = (
        pool   => $self->pool,
        master => $self->master,
        %{ $self->balancer_args },
    );
    return $self->create_balancer(@constructor_args);
}
324
cb6ec758 325=head2 _build_write_handler
2bf79155 326
cb6ec758 327Lazy builder for the L</write_handler> attribute. The default is to set this to
328the L</master>.
50336325 329
330=cut
331
# Lazy builder for 'write_handler': defaults to the master storage.
sub _build_write_handler {
    my ($self) = @_;
    return $self->master;
}
50336325 335
cb6ec758 336=head2 _build_read_handler
2bf79155 337
cb6ec758 338Lazy builder for the L</read_handler> attribute. The default is to set this to
339the L</balancer>.
2bf79155 340
341=cut
342
# Lazy builder for 'read_handler': defaults to the balancer.
sub _build_read_handler {
    my ($self) = @_;
    return $self->balancer;
}
50336325 346
cb6ec758 347=head2 around: connect_replicants
2bf79155 348
All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it.
955a6df6 351
cb6ec758 352=cut
955a6df6 353
# Wrap the delegated connect_replicants so that the schema is always passed
# as the first argument, ahead of the caller's connect info.
around connect_replicants => sub {
    my ($next, $self, @connect_info) = @_;
    return $self->$next($self->schema, @connect_info);
};
2bf79155 358
2bf79155 359=head2 all_storages
360
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
364
365=cut
366
# Return the master followed by every replicant, keeping only entries that
# are defined, blessed objects.
sub all_storages {
    my ($self) = @_;
    my @candidates = ($self->master, $self->replicants);
    return grep { defined $_ && blessed $_ } @candidates;
}
375
c4d3fae2 376=head2 execute_reliably ($coderef, ?@args)
377
378Given a coderef, saves the current state of the L</read_handler>, forces it to
379use reliable storage (ie sets it to the master), executes a coderef and then
380restores the original state.
381
382Example:
383
384 my $reliably = sub {
385 my $name = shift @_;
386 $schema->resultset('User')->create({name=>$name});
387 my $user_rs = $schema->resultset('User')->find({name=>$name});
388 };
389
390 $schema->storage->execute_reliably($reliably, 'John');
391
392Use this when you must be certain of your database state, such as when you just
393inserted something and need to get a resultset including it, etc.
394
395=cut
396
# Run $coderef->(@args) with the read handler temporarily pointed at the
# master, then put the previous read handler back.
#
# Arguments: a code reference, followed by any arguments to pass to it.
# Returns:   whatever the coderef returns; the caller's context (list,
#            scalar or void) is passed through to the coderef.
# Throws:    via throw_exception if $coderef is not a code reference, or if
#            the coderef dies (message prefixed "coderef returned an error: ").
#
# The previous read handler is restored BEFORE any error is rethrown, so a
# failing coderef can no longer leave all reads pinned to the master.
sub execute_reliably {
    my ($self, $coderef, @args) = @_;

    unless (ref $coderef eq 'CODE') {
        $self->throw_exception('Second argument must be a coderef');
    }

    ## Get copy of master storage
    my $master = $self->master;

    ## Remember whatever the current read handler is, then switch to master
    my $previous_handler = $self->read_handler;
    $self->read_handler($master);

    ## Do whatever the caller needs, honouring the calling context
    my $want_list = wantarray;
    my @result;
    my $succeeded = eval {
        if ($want_list) {
            @result = $coderef->(@args);
        }
        elsif (defined $want_list) {
            $result[0] = $coderef->(@args);
        }
        else {
            $coderef->(@args);
        }
        1;
    };

    # Capture the error immediately: the accessor call below may run code
    # that resets $@.
    my $error = $@;

    ## Reset to the original state; this must happen before rethrowing
    $self->read_handler($previous_handler);

    unless ($succeeded) {
        $self->throw_exception("coderef returned an error: $error");
    }

    return $want_list ? @result : $result[0];
}
425
cb6ec758 426=head2 set_reliable_storage
427
428Sets the current $schema to be 'reliable', that is all queries, both read and
429write are sent to the master
430
431=cut
432
# Point the read handler of the schema's storage at that storage's write
# handler, so reads and writes are served by the same object.
sub set_reliable_storage {
    my ($self) = @_;
    my $schema         = $self->schema;
    my $master_handler = $self->schema->storage->write_handler;
    return $schema->storage->read_handler($master_handler);
}
440
441=head2 set_balanced_storage
442
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
445
446=cut
447
# Point the read handler of the schema's storage at that storage's balancer.
sub set_balanced_storage {
    my ($self) = @_;
    my $schema   = $self->schema;
    my $balancer = $self->schema->storage->balancer;
    return $schema->storage->read_handler($balancer);
}
2bf79155 455
c4d3fae2 456=head2 txn_do ($coderef)
457
Overload of the txn_do method, which is delegated to whatever the
L</write_handler> is set to. We overload this in order to wrap it inside a
L</execute_reliably> call.
461
462=cut
463
# Run $coderef inside a transaction on the write handler, with reads forced
# to reliable storage for the duration (see execute_reliably).
#
# Previously this only called execute_reliably on the coderef, so no
# transaction was ever started or committed. The coderef is now handed to
# the write handler's own txn_do, and that call is what execute_reliably
# wraps.
#
# Arguments: a code reference, followed by any arguments to pass to it.
# Returns:   whatever execute_reliably returns.
sub txn_do {
    my ($self, $coderef, @args) = @_;

    return $self->execute_reliably(
        sub { $self->write_handler->txn_do($coderef, @args) }
    );
}
468
2bf79155 469=head2 connected
470
Check that the master and at least one of the replicants are connected.
472
473=cut
474
# True when the master reports connected AND the pool reports connected
# replicants. Returns the master's false value when the master is down.
sub connected {
    my ($self) = @_;
    return ($self->master->connected and $self->pool->connected_replicants);
}
482
2bf79155 483=head2 ensure_connected
484
485Make sure all the storages are connected.
486
487=cut
488
# Call ensure_connected, with the caller's arguments, on every storage
# returned by all_storages.
sub ensure_connected {
    my $self = shift;
    my @storages = $self->all_storages;
    for my $storage (@storages) {
        $storage->ensure_connected(@_);
    }
}
495
2bf79155 496=head2 limit_dialect
497
498Set the limit_dialect for all existing storages
499
500=cut
501
# Set the limit_dialect on every storage, or read it back.
#
# With arguments: forwards them to limit_dialect on each storage returned by
# all_storages. In every case returns the master's current limit_dialect.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub limit_dialect {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->limit_dialect(@args);
        }
    }

    return $self->master->limit_dialect;
}
508
2bf79155 509=head2 quote_char
510
511Set the quote_char for all existing storages
512
513=cut
514
# Set the quote_char on every storage, or read it back.
#
# With arguments: forwards them to quote_char on each storage returned by
# all_storages. In every case returns the master's current quote_char.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub quote_char {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->quote_char(@args);
        }
    }

    return $self->master->quote_char;
}
521
2bf79155 522=head2 name_sep
523
524Set the name_sep for all existing storages
525
526=cut
527
# Set the name_sep on every storage, or read it back.
#
# With arguments: forwards them to name_sep on each storage returned by
# all_storages. In every case returns the master's current name_sep.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub name_sep {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->name_sep(@args);
        }
    }

    return $self->master->name_sep;
}
534
2bf79155 535=head2 set_schema
536
537Set the schema object for all existing storages
538
539=cut
540
# Call set_schema, with the caller's arguments, on every storage returned
# by all_storages.
sub set_schema {
    my $self = shift;
    my @storages = $self->all_storages;
    for my $storage (@storages) {
        $storage->set_schema(@_);
    }
}
547
2bf79155 548=head2 debug
549
550set a debug flag across all storages
551
552=cut
553
# Set the debug flag on every storage, or read it back.
#
# With arguments: forwards them to debug on each storage returned by
# all_storages. In every case returns the master's current debug value.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub debug {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->debug(@args);
        }
    }

    return $self->master->debug;
}
560
2bf79155 561=head2 debugobj
562
563set a debug object across all storages
564
565=cut
566
# Set the debug object on every storage, or read it back.
#
# With arguments: forwards them to debugobj on each storage returned by
# all_storages. In every case returns the master's current debugobj.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub debugobj {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->debugobj(@args);
        }
    }

    return $self->master->debugobj;
}
573
2bf79155 574=head2 debugfh
575
576set a debugfh object across all storages
577
578=cut
579
# Set the debug filehandle on every storage, or read it back.
#
# With arguments: forwards them to debugfh on each storage returned by
# all_storages. In every case returns the master's current debugfh.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub debugfh {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->debugfh(@args);
        }
    }

    return $self->master->debugfh;
}
586
2bf79155 587=head2 debugcb
588
589set a debug callback across all storages
590
591=cut
592
# Set the debug callback on every storage, or read it back.
#
# With arguments: forwards them to debugcb on each storage returned by
# all_storages. In every case returns the master's current debugcb.
# Previously the sub ended in a foreach loop, so its return value was
# unspecified and it could not be used as a getter.
sub debugcb {
    my ($self, @args) = @_;

    if (@args) {
        foreach my $source ($self->all_storages) {
            $source->debugcb(@args);
        }
    }

    return $self->master->debugcb;
}
599
2bf79155 600=head2 disconnect
601
602disconnect everything
603
604=cut
605
# Call disconnect, with the caller's arguments, on every storage returned
# by all_storages.
sub disconnect {
    my $self = shift;
    my @storages = $self->all_storages;
    for my $storage (@storages) {
        $storage->disconnect(@_);
    }
}
612
f5d3a5de 613=head1 AUTHOR
614
c4d3fae2 615 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 616
c4d3fae2 617Based on code originated by:
f5d3a5de 618
c4d3fae2 619 Norbert Csongrádi <bert@cpan.org>
620 Peter Siklósi <einon@einon.hu>
2156bbdd 621
f5d3a5de 622=head1 LICENSE
623
624You may distribute this code under the same terms as Perl itself.
625
626=cut
627
6281;