73998d2e7a07aa607b8e2cb6da41c3d84f7c1ac1
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw/min max reduce/;
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24 use DBIx::Class::_Util 'dbic_internal_try';
25
26 use namespace::clean -except => 'meta';
27
28 =head1 NAME
29
30 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
31
32 =head1 SYNOPSIS
33
34 The Following example shows how to change an existing $schema to a replicated
35 storage type, add some replicated (read-only) databases, and perform reporting
36 tasks.
37
38 You should set the 'storage_type attribute to a replicated type.  You should
39 also define your arguments, such as which balancer you want and any arguments
40 that the Pool object should get.
41
42   my $schema = Schema::Class->clone;
43   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
44   $schema->connection(...);
45
46 Next, you need to add in the Replicants.  Basically this is an array of
47 arrayrefs, where each arrayref is database connect information.  Think of these
48 arguments as what you'd pass to the 'normal' $schema->connect method.
49
50   $schema->storage->connect_replicants(
51     [$dsn1, $user, $pass, \%opts],
52     [$dsn2, $user, $pass, \%opts],
53     [$dsn3, $user, $pass, \%opts],
54   );
55
56 Now, just use the $schema as you normally would.  Automatically all reads will
57 be delegated to the replicants, while writes to the master.
58
59   $schema->resultset('Source')->search({name=>'etc'});
60
61 You can force a given query to use a particular storage using the search
62 attribute 'force_pool'.  For example:
63
64   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
65
66 Now $rs will force everything (both reads and writes) to use whatever was setup
67 as the master storage.  'master' is hardcoded to always point to the Master,
68 but you can also use any Replicant name.  Please see:
69 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
70
71 Also see transactions and L</execute_reliably> for alternative ways to
72 force read traffic to the master.  In general, you should wrap your statements
73 in a transaction when you are reading and writing to the same tables at the
74 same time, since your replicants will often lag a bit behind the master.
75
76 If you have a multi-statement read only transaction you can force it to select
77 a random server in the pool by:
78
79   my $rs = $schema->resultset('Source')->search( undef,
80     { force_pool => $db->storage->read_handler->next_storage }
81   );
82
83 =head1 DESCRIPTION
84
85 Warning: This class is marked BETA.  This has been running a production
86 website using MySQL native replication as its backend and we have some decent
87 test coverage but the code hasn't yet been stressed by a variety of databases.
88 Individual DBs may have quirks we are not aware of.  Please use this in first
89 development and pass along your experiences/bug fixes.
90
91 This class implements replicated data store for DBI. Currently you can define
92 one master and numerous slave database connections. All write-type queries
93 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
94 database, all read-type queries (SELECTs) go to the slave database.
95
96 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
97 handle gets delegated to one of the two attributes: L</read_handler> or to
98 L</write_handler>.  Additionally, some methods need to be distributed
99 to all existing storages.  This way our storage class is a drop in replacement
100 for L<DBIx::Class::Storage::DBI>.
101
102 Read traffic is spread across the replicants (slaves) occurring to a user
103 selected algorithm.  The default algorithm is random weighted.
104
105 =head1 NOTES
106
107 The consistency between master and replicants is database specific.  The Pool
108 gives you a method to validate its replicants, removing and replacing them
109 when they fail/pass predefined criteria.  Please make careful use of the ways
110 to force a query to run against Master when needed.
111
112 =head1 REQUIREMENTS
113
114 Replicated Storage has additional requirements not currently part of
115 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>DBICSchema,
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'rw',
143   isa=>ClassName,
144   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
145   handles=>{
146     'create_pool' => 'new',
147   },
148 );
149
150 =head2 pool_args
151
152 Contains a hashref of initialized information to pass to the Balancer object.
153 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
154
155 =cut
156
157 has 'pool_args' => (
158   is=>'rw',
159   isa=>HashRef,
160   lazy=>1,
161   default=>sub { {} },
162 );
163
164
165 =head2 balancer_type
166
167 The replication pool requires a balance class to provider the methods for
168 choose how to spread the query load across each replicant in the pool.
169
170 =cut
171
172 has 'balancer_type' => (
173   is=>'rw',
174   isa=>BalancerClassNamePart,
175   coerce=>1,
176   required=>1,
177   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
178   handles=>{
179     'create_balancer' => 'new',
180   },
181 );
182
183 =head2 balancer_args
184
185 Contains a hashref of initialized information to pass to the Balancer object.
186 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
187
188 =cut
189
190 has 'balancer_args' => (
191   is=>'rw',
192   isa=>HashRef,
193   lazy=>1,
194   required=>1,
195   default=>sub { {} },
196 );
197
198 =head2 pool
199
200 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
201 container class for one or more replicated databases.
202
203 =cut
204
205 has 'pool' => (
206   is=>'ro',
207   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
208   lazy_build=>1,
209   handles=>[qw/
210     connect_replicants
211     replicants
212     has_replicants
213   /],
214 );
215
216 =head2 balancer
217
218 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
219 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
220
221 =cut
222
223 has 'balancer' => (
224   is=>'rw',
225   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226   lazy_build=>1,
227   handles=>[qw/auto_validate_every/],
228 );
229
230 =head2 master
231
232 The master defines the canonical state for a pool of connected databases.  All
233 the replicants are expected to match this databases state.  Thus, in a classic
234 Master / Slaves distributed system, all the slaves are expected to replicate
235 the Master's state as quick as possible.  This is the only database in the
236 pool of databases that is allowed to handle write traffic.
237
238 =cut
239
240 has 'master' => (
241   is=> 'ro',
242   isa=>DBICStorageDBI,
243   lazy_build=>1,
244 );
245
246 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
247
248 The following methods are delegated all the methods required for the
249 L<DBIx::Class::Storage::DBI> interface.
250
251 =cut
252
253 my $method_dispatch = {
254   writer => [qw/
255     on_connect_do
256     on_disconnect_do
257     on_connect_call
258     on_disconnect_call
259     connect_info
260     _connect_info
261     throw_exception
262     sql_maker
263     sqlt_type
264     create_ddl_dir
265     deployment_statements
266     datetime_parser
267     datetime_parser_type
268     build_datetime_parser
269     last_insert_id
270     insert
271     update
272     delete
273     dbh
274     txn_begin
275     txn_do
276     txn_commit
277     txn_rollback
278     txn_scope_guard
279     _exec_txn_rollback
280     _exec_txn_begin
281     _exec_txn_commit
282     deploy
283     with_deferred_fk_checks
284     dbh_do
285     _prep_for_execute
286     is_datatype_numeric
287     _count_select
288     svp_rollback
289     svp_begin
290     svp_release
291     relname_to_table_alias
292     _dbh_last_insert_id
293     _default_dbi_connect_attributes
294     _dbi_connect_info
295     _dbic_connect_attributes
296     auto_savepoint
297     _query_start
298     _query_end
299     _format_for_trace
300     _dbi_attrs_for_bind
301     bind_attribute_by_data_type
302     transaction_depth
303     _dbh
304     _select_args
305     _dbh_execute_for_fetch
306     _sql_maker
307     _dbh_execute_inserts_with_no_binds
308     _select_args_to_query
309     _gen_sql_bind
310     _svp_generate_name
311     _normalize_connect_info
312     _parse_connect_do
313     savepoints
314     _sql_maker_opts
315     _use_multicolumn_in
316     _conn_pid
317     _dbh_autocommit
318     _native_data_type
319     _get_dbh
320     sql_maker_class
321     insert_bulk
322     _insert_bulk
323     _execute
324     _do_query
325     _dbh_execute
326   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
327   reader => [qw/
328     select
329     select_single
330     columns_info_for
331     _dbh_columns_info_for
332     _select
333   /],
334   unimplemented => [qw/
335     _arm_global_destructor
336     _verify_pid
337     __delicate_rollback
338
339     get_use_dbms_capability
340     set_use_dbms_capability
341     get_dbms_capability
342     set_dbms_capability
343     _dbh_details
344     _dbh_get_info
345     _get_rdbms_name
346
347     _determine_connector_driver
348     _extract_driver_from_connect_info
349     _describe_connection
350     _warn_undetermined_driver
351
352     sql_limit_dialect
353     sql_quote_char
354     sql_name_sep
355
356     _prefetch_autovalues
357     _perform_autoinc_retrieval
358     _autoinc_supplied_for_op
359
360     _resolve_bindattrs
361
362     _max_column_bytesize
363     _is_lob_type
364     _is_binary_lob_type
365     _is_binary_type
366     _is_text_lob_type
367
368     _prepare_sth
369     _bind_sth_params
370   /,(
371     # the capability framework
372     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
373     grep
374       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
375       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
376   )],
377 };
378
379 # this only happens during DBIC-internal testing
380 if ( $INC{"t/lib/ANFANG.pm"} ) {
381
382   my $seen;
383   for my $type (keys %$method_dispatch) {
384     for (@{$method_dispatch->{$type}}) {
385       push @{$seen->{$_}}, $type;
386     }
387   }
388
389   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
390     die(join "\n", '',
391       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
392       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
393       '',
394     );
395   }
396
397   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
398     die(join "\n", '',
399       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
400       @cant,
401       '',
402     );
403   }
404 }
405
406 for my $method (@{$method_dispatch->{unimplemented}}) {
407   __PACKAGE__->meta->add_method($method, sub {
408     my $self = shift;
409     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
410   });
411 }
412
413 =head2 read_handler
414
415 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
416
417 =cut
418
419 has 'read_handler' => (
420   is=>'rw',
421   isa=>Object,
422   lazy_build=>1,
423   handles=>$method_dispatch->{reader},
424 );
425
426 =head2 write_handler
427
428 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
429 as well as methods that don't write or read that can be called on only one
430 storage, methods that return a C<$dbh>, and any methods that don't make sense to
431 run on a replicant.
432
433 =cut
434
435 has 'write_handler' => (
436   is=>'ro',
437   isa=>Object,
438   lazy_build=>1,
439   handles=>$method_dispatch->{writer},
440 );
441
442
443
444 has _master_connect_info_opts =>
445   (is => 'rw', isa => HashRef, default => sub { {} });
446
447 =head2 around: connect_info
448
449 Preserves master's C<connect_info> options (for merging with replicants.)
450 Also sets any Replicated-related options from connect_info, such as
451 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
452
453 =cut
454
455 around connect_info => sub {
456   my ($next, $self, $info, @extra) = @_;
457
458   $self->throw_exception(
459     'connect_info can not be retrieved from a replicated storage - '
460   . 'accessor must be called on a specific pool instance'
461   ) unless defined $info;
462
463   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
464
465   my %opts;
466   for my $arg (@$info) {
467     next unless (reftype($arg)||'') eq 'HASH';
468     %opts = %{ $merge->merge($arg, \%opts) };
469   }
470   delete $opts{dsn};
471
472   if (@opts{qw/pool_type pool_args/}) {
473     $self->pool_type(delete $opts{pool_type})
474       if $opts{pool_type};
475
476     $self->pool_args(
477       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
478     );
479
480     ## Since we possibly changed the pool_args, we need to clear the current
481     ## pool object so that next time it is used it will be rebuilt.
482     $self->clear_pool;
483   }
484
485   if (@opts{qw/balancer_type balancer_args/}) {
486     $self->balancer_type(delete $opts{balancer_type})
487       if $opts{balancer_type};
488
489     $self->balancer_args(
490       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
491     );
492
493     $self->balancer($self->_build_balancer)
494       if $self->balancer;
495   }
496
497   $self->_master_connect_info_opts(\%opts);
498
499   return preserve_context {
500     $self->$next($info, @extra);
501   } after => sub {
502     # Make sure master is blessed into the correct class and apply role to it.
503     my $master = $self->master;
504     $master->_determine_driver;
505     Moose::Meta::Class->initialize(ref $master);
506
507     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
508
509     # link pool back to master
510     $self->pool->master($master);
511   };
512 };
513
514 =head1 METHODS
515
516 This class defines the following methods.
517
518 =head2 BUILDARGS
519
520 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
521 first argument.  So we need to massage the arguments a bit so that all the
522 bits get put into the correct places.
523
524 =cut
525
526 sub BUILDARGS {
527   my ($class, $schema, $storage_type_args, @args) = @_;
528
529   return {
530     schema=>$schema,
531     %$storage_type_args,
532     @args
533   }
534 }
535
536 =head2 _build_master
537
538 Lazy builder for the L</master> attribute.
539
540 =cut
541
542 sub _build_master {
543   my $self = shift @_;
544   my $master = DBIx::Class::Storage::DBI->new($self->schema);
545   $master
546 }
547
548 =head2 _build_pool
549
550 Lazy builder for the L</pool> attribute.
551
552 =cut
553
554 sub _build_pool {
555   my $self = shift @_;
556   $self->create_pool(%{$self->pool_args});
557 }
558
559 =head2 _build_balancer
560
561 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
562 the balancer knows which pool it's balancing.
563
564 =cut
565
566 sub _build_balancer {
567   my $self = shift @_;
568   $self->create_balancer(
569     pool=>$self->pool,
570     master=>$self->master,
571     %{$self->balancer_args},
572   );
573 }
574
575 =head2 _build_write_handler
576
577 Lazy builder for the L</write_handler> attribute.  The default is to set this to
578 the L</master>.
579
580 =cut
581
582 sub _build_write_handler {
583   return shift->master;
584 }
585
586 =head2 _build_read_handler
587
588 Lazy builder for the L</read_handler> attribute.  The default is to set this to
589 the L</balancer>.
590
591 =cut
592
593 sub _build_read_handler {
594   return shift->balancer;
595 }
596
597 =head2 around: connect_replicants
598
599 All calls to connect_replicants needs to have an existing $schema tacked onto
600 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
601 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
602 options merged with the master, with replicant opts having higher priority.
603
604 =cut
605
606 around connect_replicants => sub {
607   my ($next, $self, @args) = @_;
608
609   for my $r (@args) {
610     $r = [ $r ] unless reftype $r eq 'ARRAY';
611
612     $self->throw_exception('coderef replicant connect_info not supported')
613       if ref $r->[0] && reftype $r->[0] eq 'CODE';
614
615 # any connect_info options?
616     my $i = 0;
617     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
618
619 # make one if none
620     $r->[$i] = {} unless $r->[$i];
621
622 # merge if two hashes
623     my @hashes = @$r[$i .. $#{$r}];
624
625     $self->throw_exception('invalid connect_info options')
626       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
627
628     $self->throw_exception('too many hashrefs in connect_info')
629       if @hashes > 2;
630
631     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
632     my %opts = %{ $merge->merge(reverse @hashes) };
633
634 # delete them
635     splice @$r, $i+1, ($#{$r} - $i), ();
636
637 # make sure master/replicants opts don't clash
638     my %master_opts = %{ $self->_master_connect_info_opts };
639     if (exists $opts{dbh_maker}) {
640         delete @master_opts{qw/dsn user password/};
641     }
642     delete $master_opts{dbh_maker};
643
644 # merge with master
645     %opts = %{ $merge->merge(\%opts, \%master_opts) };
646
647 # update
648     $r->[$i] = \%opts;
649   }
650
651   $self->$next($self->schema, @args);
652 };
653
654 =head2 all_storages
655
656 Returns an array of all the connected storage backends.  The first element
657 in the returned array is the master, and the rest are each of the
658 replicants.
659
660 =cut
661
662 sub all_storages {
663   my $self = shift @_;
664   return grep {defined $_ && blessed $_} (
665      $self->master,
666      values %{ $self->replicants },
667   );
668 }
669
670 =head2 execute_reliably ($coderef, ?@args)
671
672 Given a coderef, saves the current state of the L</read_handler>, forces it to
673 use reliable storage (e.g. sets it to the master), executes a coderef and then
674 restores the original state.
675
676 Example:
677
678   my $reliably = sub {
679     my $name = shift @_;
680     $schema->resultset('User')->create({name=>$name});
681     my $user_rs = $schema->resultset('User')->find({name=>$name});
682     return $user_rs;
683   };
684
685   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
686
687 Use this when you must be certain of your database state, such as when you just
688 inserted something and need to get a resultset including it, etc.
689
690 =cut
691
692 sub execute_reliably {
693   my $self = shift;
694   my $coderef = shift;
695
696   unless( ref $coderef eq 'CODE') {
697     $self->throw_exception('Second argument must be a coderef');
698   }
699
700   ## replace the current read handler for the remainder of the scope
701   local $self->{read_handler} = $self->master;
702
703   my $args = \@_;
704   return dbic_internal_try {
705     $coderef->(@$args);
706   } catch {
707     $self->throw_exception("coderef returned an error: $_");
708   };
709 }
710
711 =head2 set_reliable_storage
712
713 Sets the current $schema to be 'reliable', that is all queries, both read and
714 write are sent to the master
715
716 =cut
717
718 sub set_reliable_storage {
719   my $self = shift @_;
720   my $schema = $self->schema;
721   my $write_handler = $self->schema->storage->write_handler;
722
723   $schema->storage->read_handler($write_handler);
724 }
725
726 =head2 set_balanced_storage
727
728 Sets the current $schema to be use the </balancer> for all reads, while all
729 writes are sent to the master only
730
731 =cut
732
733 sub set_balanced_storage {
734   my $self = shift @_;
735   my $schema = $self->schema;
736   my $balanced_handler = $self->schema->storage->balancer;
737
738   $schema->storage->read_handler($balanced_handler);
739 }
740
741 =head2 connected
742
743 Check that the master and at least one of the replicants is connected.
744
745 =cut
746
747 sub connected {
748   my $self = shift @_;
749   return
750     $self->master->connected &&
751     $self->pool->connected_replicants;
752 }
753
754 =head2 ensure_connected
755
756 Make sure all the storages are connected.
757
758 =cut
759
760 sub ensure_connected {
761   my $self = shift @_;
762   foreach my $source ($self->all_storages) {
763     $source->ensure_connected(@_);
764   }
765 }
766
767 =head2 limit_dialect
768
769 Set the limit_dialect for all existing storages
770
771 =cut
772
773 sub limit_dialect {
774   my $self = shift @_;
775   foreach my $source ($self->all_storages) {
776     $source->limit_dialect(@_);
777   }
778   return $self->master->limit_dialect;
779 }
780
781 =head2 quote_char
782
783 Set the quote_char for all existing storages
784
785 =cut
786
787 sub quote_char {
788   my $self = shift @_;
789   foreach my $source ($self->all_storages) {
790     $source->quote_char(@_);
791   }
792   return $self->master->quote_char;
793 }
794
795 =head2 name_sep
796
797 Set the name_sep for all existing storages
798
799 =cut
800
801 sub name_sep {
802   my $self = shift @_;
803   foreach my $source ($self->all_storages) {
804     $source->name_sep(@_);
805   }
806   return $self->master->name_sep;
807 }
808
809 =head2 set_schema
810
811 Set the schema object for all existing storages
812
813 =cut
814
815 sub set_schema {
816   my $self = shift @_;
817   foreach my $source ($self->all_storages) {
818     $source->set_schema(@_);
819   }
820 }
821
822 =head2 debug
823
824 set a debug flag across all storages
825
826 =cut
827
828 sub debug {
829   my $self = shift @_;
830   if(@_) {
831     foreach my $source ($self->all_storages) {
832       $source->debug(@_);
833     }
834   }
835   return $self->master->debug;
836 }
837
838 =head2 debugobj
839
840 set a debug object
841
842 =cut
843
844 sub debugobj {
845   my $self = shift @_;
846   return $self->master->debugobj(@_);
847 }
848
849 =head2 debugfh
850
851 set a debugfh object
852
853 =cut
854
855 sub debugfh {
856   my $self = shift @_;
857   return $self->master->debugfh(@_);
858 }
859
860 =head2 debugcb
861
862 set a debug callback
863
864 =cut
865
866 sub debugcb {
867   my $self = shift @_;
868   return $self->master->debugcb(@_);
869 }
870
871 =head2 disconnect
872
873 disconnect everything
874
875 =cut
876
877 sub disconnect {
878   my $self = shift @_;
879   foreach my $source ($self->all_storages) {
880     $source->disconnect(@_);
881   }
882 }
883
884 =head2 cursor_class
885
886 set cursor class on all storages, or return master's
887
888 =cut
889
890 sub cursor_class {
891   my ($self, $cursor_class) = @_;
892
893   if ($cursor_class) {
894     $_->cursor_class($cursor_class) for $self->all_storages;
895   }
896   $self->master->cursor_class;
897 }
898
899 =head2 cursor
900
901 set cursor class on all storages, or return master's, alias for L</cursor_class>
902 above.
903
904 =cut
905
906 sub cursor {
907   my ($self, $cursor_class) = @_;
908
909   if ($cursor_class) {
910     $_->cursor($cursor_class) for $self->all_storages;
911   }
912   $self->master->cursor;
913 }
914
915 =head2 unsafe
916
917 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
918 master's current setting
919
920 =cut
921
922 sub unsafe {
923   my $self = shift;
924
925   if (@_) {
926     $_->unsafe(@_) for $self->all_storages;
927   }
928
929   return $self->master->unsafe;
930 }
931
932 =head2 disable_sth_caching
933
934 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
935 or returns master's current setting
936
937 =cut
938
939 sub disable_sth_caching {
940   my $self = shift;
941
942   if (@_) {
943     $_->disable_sth_caching(@_) for $self->all_storages;
944   }
945
946   return $self->master->disable_sth_caching;
947 }
948
949 =head2 lag_behind_master
950
951 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
952 setting
953
954 =cut
955
956 sub lag_behind_master {
957   my $self = shift;
958
959   return max map $_->lag_behind_master, $self->replicants;
960 }
961
962 =head2 is_replicating
963
964 returns true if all replicants return true for
965 L<DBIx::Class::Storage::DBI/is_replicating>
966
967 =cut
968
969 sub is_replicating {
970   my $self = shift;
971
972   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
973 }
974
975 =head2 connect_call_datetime_setup
976
977 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
978
979 =cut
980
981 sub connect_call_datetime_setup {
982   my $self = shift;
983   $_->connect_call_datetime_setup for $self->all_storages;
984 }
985
986 sub _populate_dbh {
987   my $self = shift;
988   $_->_populate_dbh for $self->all_storages;
989 }
990
991 sub _connect {
992   my $self = shift;
993   $_->_connect for $self->all_storages;
994 }
995
996 sub _rebless {
997   my $self = shift;
998   $_->_rebless for $self->all_storages;
999 }
1000
1001 sub _determine_driver {
1002   my $self = shift;
1003   $_->_determine_driver for $self->all_storages;
1004 }
1005
1006 sub _driver_determined {
1007   my $self = shift;
1008
1009   if (@_) {
1010     $_->_driver_determined(@_) for $self->all_storages;
1011   }
1012
1013   return $self->master->_driver_determined;
1014 }
1015
1016 sub _init {
1017   my $self = shift;
1018
1019   $_->_init for $self->all_storages;
1020 }
1021
1022 sub _run_connection_actions {
1023   my $self = shift;
1024
1025   $_->_run_connection_actions for $self->all_storages;
1026 }
1027
1028 sub _do_connection_actions {
1029   my $self = shift;
1030
1031   if (@_) {
1032     $_->_do_connection_actions(@_) for $self->all_storages;
1033   }
1034 }
1035
1036 sub connect_call_do_sql {
1037   my $self = shift;
1038   $_->connect_call_do_sql(@_) for $self->all_storages;
1039 }
1040
1041 sub disconnect_call_do_sql {
1042   my $self = shift;
1043   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1044 }
1045
1046 sub _seems_connected {
1047   my $self = shift;
1048
1049   return min map $_->_seems_connected, $self->all_storages;
1050 }
1051
1052 sub _ping {
1053   my $self = shift;
1054
1055   return min map $_->_ping, $self->all_storages;
1056 }
1057
1058 # not using the normalized_version, because we want to preserve
1059 # version numbers much longer than the conventional xxx.yyyzzz
1060 my $numify_ver = sub {
1061   my $ver = shift;
1062   my @numparts = split /\D+/, $ver;
1063   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1064
1065   return sprintf $format, @numparts;
1066 };
1067 sub _server_info {
1068   my $self = shift;
1069
1070   if (not $self->_dbh_details->{info}) {
1071     $self->_dbh_details->{info} = (
1072       reduce { $a->[0] < $b->[0] ? $a : $b }
1073       map [ $numify_ver->($_->{dbms_version}), $_ ],
1074       map $_->_server_info, $self->all_storages
1075     )->[1];
1076   }
1077
1078   return $self->next::method;
1079 }
1080
1081 sub _get_server_version {
1082   my $self = shift;
1083
1084   return $self->_server_info->{dbms_version};
1085 }
1086
1087 =head1 GOTCHAS
1088
1089 Due to the fact that replicants can lag behind a master, you must take care to
1090 make sure you use one of the methods to force read queries to a master should
1091 you need realtime data integrity.  For example, if you insert a row, and then
1092 immediately re-read it from the database (say, by doing
1093 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1094 or you insert a row and then immediately build a query that expects that row
1095 to be an item, you should force the master to handle reads.  Otherwise, due to
1096 the lag, there is no certainty your data will be in the expected state.
1097
1098 For data integrity, all transactions automatically use the master storage for
1099 all read and write queries.  Using a transaction is the preferred and recommended
1100 method to force the master to handle all read queries.
1101
1102 Otherwise, you can force a single query to use the master with the 'force_pool'
1103 attribute:
1104
1105   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1106
1107 This attribute will safely be ignored by non replicated storages, so you can use
1108 the same code for both types of systems.
1109
1110 Lastly, you can use the L</execute_reliably> method, which works very much like
1111 a transaction.
1112
1113 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1114 and L</set_balanced_storage>, however this operates at a global level and is not
1115 suitable if you have a shared Schema object being used by multiple processes,
1116 such as on a web application server.  You can get around this limitation by
1117 using the Schema clone method.
1118
1119   my $new_schema = $schema->clone;
1120   $new_schema->set_reliable_storage;
1121
1122   ## $new_schema will use only the Master storage for all reads/writes while
1123   ## the $schema object will use replicated storage.
1124
1125 =head1 FURTHER QUESTIONS?
1126
1127 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1128
1129 =head1 COPYRIGHT AND LICENSE
1130
1131 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1132 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1133 redistribute it and/or modify it under the same terms as the
1134 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1135
1136 =cut
1137
1138 __PACKAGE__->meta->make_immutable;
1139
1140 1;