package DBIx::Class::Storage::DBI::Replicated;
use DBIx::Class::Storage::DBI;
use DBIx::Class::Storage::DBI::Replicated::Pool;
use DBIx::Class::Storage::DBI::Replicated::Balancer;
use Scalar::Util qw(blessed);
extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support

The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
  ## Change storage_type in your schema class
  $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );

  ## Add some slaves. This is a list of arrayrefs, where each arrayref
  ## holds the connect information for one database
  $schema->storage->connect_replicants(
    [$dsn1, $user, $pass, \%opts],
    [$dsn2, $user, $pass, \%opts],
    [$dsn3, $user, $pass, \%opts],
Warning: This class is marked ALPHA. We are using this in development and have
some basic test coverage, but the code hasn't yet been stressed by a variety
of databases. Individual databases may have quirks we are not aware of. Please
use this in development and pass along your experiences and bug fixes.
This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database; all read-type queries (SELECTs) go to the slave databases.
Any method request that L<DBIx::Class::Storage::DBI> would normally handle is
delegated to one of two attributes: L</read_handler> or L</write_handler>.
Additionally, some methods need to be distributed to all existing storages.
This way our storage class is a drop-in replacement for
L<DBIx::Class::Storage::DBI>.
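The delegation described above can be pictured with a small stand-alone
sketch. It does not use DBIx::Class at all: the C<Toy::Backend> and
C<Toy::Replicated> packages and the C<read_row>/C<write_row> method names are
hypothetical, chosen only to show reads and writes being forwarded to two
different handler objects.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a real storage: reports which backend handled a call.
package Toy::Backend {
    sub new       { my ($class, $name) = @_; return bless { name => $name }, $class }
    sub read_row  { my $self = shift; return "$self->{name}:read_row" }
    sub write_row { my $self = shift; return "$self->{name}:write_row" }
}

# Toy front end that forwards reads and writes to different backends.
package Toy::Replicated {
    sub new {
        my ($class, %args) = @_;
        return bless {
            read_handler  => $args{read_handler},
            write_handler => $args{write_handler},
        }, $class;
    }
    sub read_row  { my $self = shift; return $self->{read_handler}->read_row(@_) }
    sub write_row { my $self = shift; return $self->{write_handler}->write_row(@_) }
}

package main;

my $master  = Toy::Backend->new('master');
my $replica = Toy::Backend->new('replica');
my $storage = Toy::Replicated->new(
    read_handler  => $replica,
    write_handler => $master,
);

my $write_result = $storage->write_row;   # forwarded to the master
my $read_result  = $storage->read_row;    # forwarded to the replica
```

In the real class the forwarding is declared on the attributes rather than
written out method by method; the sketch only illustrates the routing idea.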
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.
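As a rough illustration of spreading reads, the sketch below picks one
replicant uniformly at random for each request. This is an assumption-laden
toy, not the balancer shipped with this distribution: the helper name
C<pick_random_replicant> is hypothetical, replicants are plain strings, and no
weighting is applied.

```perl
use strict;
use warnings;

# Pick one replicant uniformly at random from a list (hypothetical helper).
sub pick_random_replicant {
    my @replicants = @_;
    return unless @replicants;                    # nothing to choose from
    return $replicants[ int rand @replicants ];   # index in 0 .. $#replicants
}

my @pool   = ('replicant_a', 'replicant_b', 'replicant_c');
my $chosen = pick_random_replicant(@pool);
```

A real balancer would also have to skip replicants that are currently marked
inactive, which this sketch ignores.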
The consistency between master and replicants is database specific. The Pool
gives you a method to validate its replicants, removing and replacing them
when they fail/pass predefined criteria. It is recommended that your application
define two schemas, one using the replicated storage and another that just
connects to the master.
This class defines the following attributes.

Contains the classname which will instantiate the L</pool> object. Defaults
to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.

    'create_pool' => 'new',
Contains a hashref of initialized information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.

has 'balancer_type' => (
    'create_balancer' => 'new',
Contains a hashref of initialized information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.

has 'balancer_args' => (
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.

  isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)

  isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
  handles=>[qw/auto_validate_every/],
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.

  isa=>'DBIx::Class::Storage::DBI',
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.

Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.

has 'read_handler' => (

Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>.

has 'write_handler' => (
    deployment_statements
This class defines the following methods.

L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
first argument. We need to invoke L</new> on the underlying parent class, make
sure we properly give it a L<Moose> meta class, and then correctly instantiate
our attributes. Basically we pass on whatever the schema has in its class
data for 'storage_type_args' to our replicated storage type.
  my $class = shift @_;
  my $schema = shift @_;
  my $storage_type_args = shift @_;
  my $obj = $class->SUPER::new($schema, $storage_type_args, @_);

  ## Hate to do it this way, but can't seem to get advice on the attribute working right
  ## maybe we can do a type and coercion for it.
  if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
    $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
    eval "require $storage_type_args->{balancer_type}";

  return $class->meta->new_object(
    __INSTANCE__ => $obj,
Lazy builder for the L</master> attribute.

  DBIx::Class::Storage::DBI->new;

=head2 _build_pool_type

Lazy builder for the L</pool_type> attribute.

sub _build_pool_type {
  return 'DBIx::Class::Storage::DBI::Replicated::Pool';

Lazy builder for the L</pool> attribute.

  $self->create_pool(%{$self->pool_args});

=head2 _build_balancer_type

Lazy builder for the L</balancer_type> attribute.

sub _build_balancer_type {
  return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';

=head2 _build_balancer

Lazy builder for the L</balancer> attribute. This takes a Pool object so that
the balancer knows which pool it's balancing.

sub _build_balancer {
  $self->create_balancer(
    master=>$self->master,
    %{$self->balancer_args},);

=head2 _build_write_handler

Lazy builder for the L</write_handler> attribute. The default is to set this to

sub _build_write_handler {
  return shift->master;

=head2 _build_read_handler

Lazy builder for the L</read_handler> attribute. The default is to set this to

sub _build_read_handler {
  return shift->balancer;
=head2 around: connect_replicants

All calls to connect_replicants need to have an existing $schema tacked onto
the front of the args, since L<DBIx::Class::Storage::DBI> needs it.

around 'connect_replicants' => sub {
  my ($method, $self, @args) = @_;
  $self->$method($self->schema, @args);
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are each of the

  return grep {defined $_ && blessed $_} (
=head2 execute_reliably ($coderef, ?@args)

Given a coderef, saves the current state of the L</read_handler>, forces it to
use reliable storage (i.e. sets it to the master), executes the coderef and then
restores the original state.

    $schema->resultset('User')->create({name=>$name});
    my $user_rs = $schema->resultset('User')->find({name=>$name});

  my $user_rs = $schema->storage->execute_reliably($reliably, 'John');

Use this when you must be certain of your database state, such as when you have
just inserted something and need to get a resultset that includes it.
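The save, swap and restore sequence can be sketched without a database. The
example below is a simplified model under stated assumptions: the storage is a
plain hash, handlers are strings, and C<run_reliably> is a hypothetical name.
It shows the two points that matter, namely that the caller's context is
preserved with C<wantarray> and that the original read handler is put back
before any error is rethrown.

```perl
use strict;
use warnings;

# Hypothetical toy storage with a swappable read handler.
my %storage = (master => 'master', read_handler => 'balancer');

# Run $coderef with reads pointed at the master, restore the previous
# handler even if the coderef dies, then rethrow any error.
sub run_reliably {
    my ($storage, $coderef, @args) = @_;
    die "Second argument must be a coderef\n" unless ref $coderef eq 'CODE';

    my $previous = $storage->{read_handler};
    $storage->{read_handler} = $storage->{master};

    my @result;
    my $want_array = wantarray;
    my $ok = eval {
        if    ($want_array)         { @result = $coderef->(@args) }      # list context
        elsif (defined $want_array) { $result[0] = $coderef->(@args) }   # scalar context
        else                        { $coderef->(@args) }                # void context
        1;
    };
    my $error = $@;

    $storage->{read_handler} = $previous;    # always restore before rethrowing
    die "coderef returned an error: $error" unless $ok;

    return $want_array ? @result : $result[0];
}

# Inside the coderef the read handler is the master.
my $seen = run_reliably(\%storage, sub { return $storage{read_handler} });

# A dying coderef still leaves the original handler in place afterwards.
my $died = !eval { run_reliably(\%storage, sub { die "boom\n" }); 1 };
```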
sub execute_reliably {
  my ($self, $coderef, @args) = @_;

  unless( ref $coderef eq 'CODE') {
    $self->throw_exception('Second argument must be a coderef');

  ##Get copy of master storage
  my $master = $self->master;

  ##Get whatever the current read handler is
  my $current = $self->read_handler;

  ##Set the read handler to master
  $self->read_handler($master);

  ## do whatever the caller needs
  my $want_array = wantarray;
      @result = $coderef->(@args);
    elsif(defined $want_array) {
      ($result[0]) = ($coderef->(@args));

  ##Reset to the original state
  $self->read_handler($current);

  ##Exception testing has to come last, otherwise you might leave the
  ##read_handler set to master.
    $self->throw_exception("coderef returned an error: $@");

  return $want_array ? @result : $result[0];
=head2 set_reliable_storage

Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.

sub set_reliable_storage {
  my $schema = $self->schema;
  my $write_handler = $self->schema->storage->write_handler;

  $schema->storage->read_handler($write_handler);
=head2 set_balanced_storage

Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.

sub set_balanced_storage {
  my $schema = $self->schema;
  my $balancer = $self->schema->storage->balancer;

  $schema->storage->read_handler($balancer);
=head2 around: txn_do ($coderef)

Overload of the txn_do method, which is delegated to whatever the
L</write_handler> is set to. We overload this in order to wrap it inside an
L</execute_reliably> call.

around 'txn_do' => sub {
  my($txn_do, $self, $coderef, @args) = @_;
  $self->execute_reliably(sub {$self->$txn_do($coderef, @args)});
=head2 reload_row ($row)

Overload to the reload_row method so that the reloading is always directed to

around 'reload_row' => sub {
  my ($reload_row, $self, $row) = @_;
  return $self->execute_reliably(sub {
    return $self->$reload_row(shift);
Check that the master and at least one of the replicants are connected.

    $self->master->connected &&
    $self->pool->connected_replicants;
=head2 ensure_connected

Make sure all the storages are connected.

sub ensure_connected {
  foreach my $source ($self->all_storages) {
    $source->ensure_connected(@_);

Set the limit_dialect for all existing storages

  foreach my $source ($self->all_storages) {
    $source->limit_dialect(@_);

Set the quote_char for all existing storages

  foreach my $source ($self->all_storages) {
    $source->quote_char(@_);

Set the name_sep for all existing storages

  foreach my $source ($self->all_storages) {
    $source->name_sep(@_);

Set the schema object for all existing storages

  foreach my $source ($self->all_storages) {
    $source->set_schema(@_);
Set a debug flag across all storages

  foreach my $source ($self->all_storages) {

Set a debug object across all storages

  foreach my $source ($self->all_storages) {
    $source->debugobj(@_);

Set a debugfh object across all storages

  foreach my $source ($self->all_storages) {
    $source->debugfh(@_);

Set a debug callback across all storages

  foreach my $source ($self->all_storages) {
    $source->debugcb(@_);

Disconnect everything

  foreach my $source ($self->all_storages) {
    $source->disconnect(@_);
John Napiorkowski <john.napiorkowski@takkle.com>

Based on code originated by:

Norbert Csongrádi <bert@cpan.org>
Peter Siklósi <einon@einon.hu>

You may distribute this code under the same terms as Perl itself.