6a2f7ad8f34e9fb999718685faac9a7336919371
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw( min max );
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24 use DBIx::Class::_Util 'dbic_internal_try';
25
26 use namespace::clean -except => 'meta';
27
28 =head1 NAME
29
30 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
31
32 =head1 SYNOPSIS
33
34 The Following example shows how to change an existing $schema to a replicated
35 storage type, add some replicated (read-only) databases, and perform reporting
36 tasks.
37
38 You should set the 'storage_type attribute to a replicated type.  You should
39 also define your arguments, such as which balancer you want and any arguments
40 that the Pool object should get.
41
42   my $schema = Schema::Class->clone;
43   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
44   $schema->connection(...);
45
46 Next, you need to add in the Replicants.  Basically this is an array of
47 arrayrefs, where each arrayref is database connect information.  Think of these
48 arguments as what you'd pass to the 'normal' $schema->connect method.
49
50   $schema->storage->connect_replicants(
51     [$dsn1, $user, $pass, \%opts],
52     [$dsn2, $user, $pass, \%opts],
53     [$dsn3, $user, $pass, \%opts],
54   );
55
56 Now, just use the $schema as you normally would.  Automatically all reads will
57 be delegated to the replicants, while writes to the master.
58
59   $schema->resultset('Source')->search({name=>'etc'});
60
61 You can force a given query to use a particular storage using the search
62 attribute 'force_pool'.  For example:
63
64   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
65
66 Now $rs will force everything (both reads and writes) to use whatever was setup
67 as the master storage.  'master' is hardcoded to always point to the Master,
68 but you can also use any Replicant name.  Please see:
69 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
70
71 Also see transactions and L</execute_reliably> for alternative ways to
72 force read traffic to the master.  In general, you should wrap your statements
73 in a transaction when you are reading and writing to the same tables at the
74 same time, since your replicants will often lag a bit behind the master.
75
76 If you have a multi-statement read only transaction you can force it to select
77 a random server in the pool by:
78
79   my $rs = $schema->resultset('Source')->search( undef,
80     { force_pool => $db->storage->read_handler->next_storage }
81   );
82
83 =head1 DESCRIPTION
84
85 Warning: This class is marked BETA.  This has been running a production
86 website using MySQL native replication as its backend and we have some decent
87 test coverage but the code hasn't yet been stressed by a variety of databases.
88 Individual DBs may have quirks we are not aware of.  Please use this in first
89 development and pass along your experiences/bug fixes.
90
91 This class implements replicated data store for DBI. Currently you can define
92 one master and numerous slave database connections. All write-type queries
93 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
94 database, all read-type queries (SELECTs) go to the slave database.
95
96 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
97 handle gets delegated to one of the two attributes: L</read_handler> or to
98 L</write_handler>.  Additionally, some methods need to be distributed
99 to all existing storages.  This way our storage class is a drop in replacement
100 for L<DBIx::Class::Storage::DBI>.
101
102 Read traffic is spread across the replicants (slaves) occurring to a user
103 selected algorithm.  The default algorithm is random weighted.
104
105 =head1 NOTES
106
107 The consistency between master and replicants is database specific.  The Pool
108 gives you a method to validate its replicants, removing and replacing them
109 when they fail/pass predefined criteria.  Please make careful use of the ways
110 to force a query to run against Master when needed.
111
112 =head1 REQUIREMENTS
113
114 Replicated Storage has additional requirements not currently part of
115 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>DBICSchema,
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'rw',
143   isa=>ClassName,
144   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
145   handles=>{
146     'create_pool' => 'new',
147   },
148 );
149
150 =head2 pool_args
151
152 Contains a hashref of initialized information to pass to the Balancer object.
153 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
154
155 =cut
156
157 has 'pool_args' => (
158   is=>'rw',
159   isa=>HashRef,
160   lazy=>1,
161   default=>sub { {} },
162 );
163
164
165 =head2 balancer_type
166
167 The replication pool requires a balance class to provider the methods for
168 choose how to spread the query load across each replicant in the pool.
169
170 =cut
171
172 has 'balancer_type' => (
173   is=>'rw',
174   isa=>BalancerClassNamePart,
175   coerce=>1,
176   required=>1,
177   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
178   handles=>{
179     'create_balancer' => 'new',
180   },
181 );
182
183 =head2 balancer_args
184
185 Contains a hashref of initialized information to pass to the Balancer object.
186 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
187
188 =cut
189
190 has 'balancer_args' => (
191   is=>'rw',
192   isa=>HashRef,
193   lazy=>1,
194   required=>1,
195   default=>sub { {} },
196 );
197
198 =head2 pool
199
200 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
201 container class for one or more replicated databases.
202
203 =cut
204
205 has 'pool' => (
206   is=>'ro',
207   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
208   lazy_build=>1,
209   handles=>[qw/
210     connect_replicants
211     replicants
212     has_replicants
213   /],
214 );
215
216 =head2 balancer
217
218 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
219 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
220
221 =cut
222
223 has 'balancer' => (
224   is=>'rw',
225   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226   lazy_build=>1,
227   handles=>[qw/auto_validate_every/],
228 );
229
230 =head2 master
231
232 The master defines the canonical state for a pool of connected databases.  All
233 the replicants are expected to match this databases state.  Thus, in a classic
234 Master / Slaves distributed system, all the slaves are expected to replicate
235 the Master's state as quick as possible.  This is the only database in the
236 pool of databases that is allowed to handle write traffic.
237
238 =cut
239
240 has 'master' => (
241   is=> 'ro',
242   isa=>DBICStorageDBI,
243   lazy_build=>1,
244 );
245
246 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
247
248 The following methods are delegated all the methods required for the
249 L<DBIx::Class::Storage::DBI> interface.
250
251 =cut
252
253 my $method_dispatch = {
254   writer => [qw/
255     on_connect_do
256     on_disconnect_do
257     on_connect_call
258     on_disconnect_call
259     connect_info
260     _connect_info
261     throw_exception
262     sql_maker
263     sqlt_type
264     create_ddl_dir
265     deployment_statements
266     datetime_parser
267     datetime_parser_type
268     build_datetime_parser
269     last_insert_id
270     insert
271     update
272     delete
273     dbh
274     txn_begin
275     txn_do
276     txn_commit
277     txn_rollback
278     txn_scope_guard
279     _exec_txn_rollback
280     _exec_txn_begin
281     _exec_txn_commit
282     deploy
283     with_deferred_fk_checks
284     dbh_do
285     _prep_for_execute
286     is_datatype_numeric
287     _count_select
288     svp_rollback
289     svp_begin
290     svp_release
291     relname_to_table_alias
292     _dbh_last_insert_id
293     _default_dbi_connect_attributes
294     _dbi_connect_info
295     _dbic_connect_attributes
296     auto_savepoint
297     _query_start
298     _query_end
299     _format_for_trace
300     _dbi_attrs_for_bind
301     bind_attribute_by_data_type
302     transaction_depth
303     _dbh
304     _select_args
305     _dbh_execute_for_fetch
306     _sql_maker
307     _dbh_execute_inserts_with_no_binds
308     _select_args_to_query
309     _gen_sql_bind
310     _svp_generate_name
311     _normalize_connect_info
312     _parse_connect_do
313     savepoints
314     _sql_maker_opts
315     _use_multicolumn_in
316     _conn_pid
317     _dbh_autocommit
318     _native_data_type
319     _get_dbh
320     sql_maker_class
321     insert_bulk
322     _insert_bulk
323     _execute
324     _do_query
325     _dbh_execute
326   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
327   reader => [qw/
328     select
329     select_single
330     columns_info_for
331     _dbh_columns_info_for
332     _select
333   /],
334   unimplemented => [qw/
335     _arm_global_destructor
336     _verify_pid
337     __delicate_rollback
338
339     get_use_dbms_capability
340     set_use_dbms_capability
341     get_dbms_capability
342     set_dbms_capability
343     _dbh_details
344     _dbh_get_info
345     _get_rdbms_name
346     _get_server_version
347     _server_info
348
349     _determine_connector_driver
350     _extract_driver_from_connect_info
351     _describe_connection
352     _warn_undetermined_driver
353
354     sql_limit_dialect
355     sql_quote_char
356     sql_name_sep
357
358     _prefetch_autovalues
359     _perform_autoinc_retrieval
360     _autoinc_supplied_for_op
361
362     _resolve_bindattrs
363
364     _max_column_bytesize
365     _is_lob_type
366     _is_binary_lob_type
367     _is_binary_type
368     _is_text_lob_type
369
370     _prepare_sth
371     _bind_sth_params
372   /,(
373     # the capability framework
374     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
375     grep
376       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
377       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
378   )],
379 };
380
381 # this only happens during DBIC-internal testing
382 if ( $INC{"t/lib/ANFANG.pm"} ) {
383
384   my $seen;
385   for my $type (keys %$method_dispatch) {
386     for (@{$method_dispatch->{$type}}) {
387       push @{$seen->{$_}}, $type;
388     }
389   }
390
391   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
392     die(join "\n", '',
393       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
394       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
395       '',
396     );
397   }
398
399   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
400     die(join "\n", '',
401       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
402       @cant,
403       '',
404     );
405   }
406 }
407
408 for my $method (@{$method_dispatch->{unimplemented}}) {
409   __PACKAGE__->meta->add_method($method, sub {
410     my $self = shift;
411     $self->throw_exception(
412       "$method() may not be called on '@{[ blessed $self ]}' objects, "
413     . 'call it on a specific pool instance instead'
414     );
415   });
416 }
417
418 =head2 read_handler
419
420 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
421
422 =cut
423
424 has 'read_handler' => (
425   is=>'rw',
426   isa=>Object,
427   lazy_build=>1,
428   handles=>$method_dispatch->{reader},
429 );
430
431 =head2 write_handler
432
433 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
434 as well as methods that don't write or read that can be called on only one
435 storage, methods that return a C<$dbh>, and any methods that don't make sense to
436 run on a replicant.
437
438 =cut
439
440 has 'write_handler' => (
441   is=>'ro',
442   isa=>Object,
443   lazy_build=>1,
444   handles=>$method_dispatch->{writer},
445 );
446
447
448
449 has _master_connect_info_opts =>
450   (is => 'rw', isa => HashRef, default => sub { {} });
451
452 =head2 around: connect_info
453
454 Preserves master's C<connect_info> options (for merging with replicants.)
455 Also sets any Replicated-related options from connect_info, such as
456 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
457
458 =cut
459
460 around connect_info => sub {
461   my ($next, $self, $info, @extra) = @_;
462
463   $self->throw_exception(
464     'connect_info can not be retrieved from a replicated storage - '
465   . 'accessor must be called on a specific pool instance'
466   ) unless defined $info;
467
468   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
469
470   my %opts;
471   for my $arg (@$info) {
472     next unless (reftype($arg)||'') eq 'HASH';
473     %opts = %{ $merge->merge($arg, \%opts) };
474   }
475   delete $opts{dsn};
476
477   if (@opts{qw/pool_type pool_args/}) {
478     $self->pool_type(delete $opts{pool_type})
479       if $opts{pool_type};
480
481     $self->pool_args(
482       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
483     );
484
485     ## Since we possibly changed the pool_args, we need to clear the current
486     ## pool object so that next time it is used it will be rebuilt.
487     $self->clear_pool;
488   }
489
490   if (@opts{qw/balancer_type balancer_args/}) {
491     $self->balancer_type(delete $opts{balancer_type})
492       if $opts{balancer_type};
493
494     $self->balancer_args(
495       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
496     );
497
498     $self->balancer($self->_build_balancer)
499       if $self->balancer;
500   }
501
502   $self->_master_connect_info_opts(\%opts);
503
504   return preserve_context {
505     $self->$next($info, @extra);
506   } after => sub {
507     # Make sure master is blessed into the correct class and apply role to it.
508     my $master = $self->master;
509     $master->_determine_driver;
510     Moose::Meta::Class->initialize(ref $master);
511
512     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
513
514     # link pool back to master
515     $self->pool->master($master);
516   };
517 };
518
519 =head1 METHODS
520
521 This class defines the following methods.
522
523 =head2 BUILDARGS
524
525 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
526 first argument.  So we need to massage the arguments a bit so that all the
527 bits get put into the correct places.
528
529 =cut
530
531 sub BUILDARGS {
532   my ($class, $schema, $storage_type_args, @args) = @_;
533
534   return {
535     schema=>$schema,
536     %$storage_type_args,
537     @args
538   }
539 }
540
541 =head2 _build_master
542
543 Lazy builder for the L</master> attribute.
544
545 =cut
546
547 sub _build_master {
548   my $self = shift @_;
549   my $master = DBIx::Class::Storage::DBI->new($self->schema);
550   $master
551 }
552
553 =head2 _build_pool
554
555 Lazy builder for the L</pool> attribute.
556
557 =cut
558
559 sub _build_pool {
560   my $self = shift @_;
561   $self->create_pool(%{$self->pool_args});
562 }
563
564 =head2 _build_balancer
565
566 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
567 the balancer knows which pool it's balancing.
568
569 =cut
570
571 sub _build_balancer {
572   my $self = shift @_;
573   $self->create_balancer(
574     pool=>$self->pool,
575     master=>$self->master,
576     %{$self->balancer_args},
577   );
578 }
579
580 =head2 _build_write_handler
581
582 Lazy builder for the L</write_handler> attribute.  The default is to set this to
583 the L</master>.
584
585 =cut
586
587 sub _build_write_handler {
588   return shift->master;
589 }
590
591 =head2 _build_read_handler
592
593 Lazy builder for the L</read_handler> attribute.  The default is to set this to
594 the L</balancer>.
595
596 =cut
597
598 sub _build_read_handler {
599   return shift->balancer;
600 }
601
602 =head2 around: connect_replicants
603
604 All calls to connect_replicants needs to have an existing $schema tacked onto
605 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
606 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
607 options merged with the master, with replicant opts having higher priority.
608
609 =cut
610
611 around connect_replicants => sub {
612   my ($next, $self, @args) = @_;
613
614   for my $r (@args) {
615     $r = [ $r ] unless reftype $r eq 'ARRAY';
616
617     $self->throw_exception('coderef replicant connect_info not supported')
618       if ref $r->[0] && reftype $r->[0] eq 'CODE';
619
620 # any connect_info options?
621     my $i = 0;
622     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
623
624 # make one if none
625     $r->[$i] = {} unless $r->[$i];
626
627 # merge if two hashes
628     my @hashes = @$r[$i .. $#{$r}];
629
630     $self->throw_exception('invalid connect_info options')
631       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
632
633     $self->throw_exception('too many hashrefs in connect_info')
634       if @hashes > 2;
635
636     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
637     my %opts = %{ $merge->merge(reverse @hashes) };
638
639 # delete them
640     splice @$r, $i+1, ($#{$r} - $i), ();
641
642 # make sure master/replicants opts don't clash
643     my %master_opts = %{ $self->_master_connect_info_opts };
644     if (exists $opts{dbh_maker}) {
645         delete @master_opts{qw/dsn user password/};
646     }
647     delete $master_opts{dbh_maker};
648
649 # merge with master
650     %opts = %{ $merge->merge(\%opts, \%master_opts) };
651
652 # update
653     $r->[$i] = \%opts;
654   }
655
656   $self->$next($self->schema, @args);
657 };
658
659 =head2 all_storages
660
661 Returns an array of all the connected storage backends.  The first element
662 in the returned array is the master, and the rest are each of the
663 replicants.
664
665 =cut
666
667 sub all_storages {
668   my $self = shift @_;
669   return grep {defined $_ && blessed $_} (
670      $self->master,
671      values %{ $self->replicants },
672   );
673 }
674
675 =head2 execute_reliably ($coderef, ?@args)
676
677 Given a coderef, saves the current state of the L</read_handler>, forces it to
678 use reliable storage (e.g. sets it to the master), executes a coderef and then
679 restores the original state.
680
681 Example:
682
683   my $reliably = sub {
684     my $name = shift @_;
685     $schema->resultset('User')->create({name=>$name});
686     my $user_rs = $schema->resultset('User')->find({name=>$name});
687     return $user_rs;
688   };
689
690   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
691
692 Use this when you must be certain of your database state, such as when you just
693 inserted something and need to get a resultset including it, etc.
694
695 =cut
696
697 sub execute_reliably {
698   my $self = shift;
699   my $coderef = shift;
700
701   $self->throw_exception('Second argument must be a coderef')
702     unless( ref $coderef eq 'CODE');
703
704   ## replace the current read handler for the remainder of the scope
705   local $self->{read_handler} = $self->master;
706
707   &$coderef;
708 }
709
710 =head2 set_reliable_storage
711
712 Sets the current $schema to be 'reliable', that is all queries, both read and
713 write are sent to the master
714
715 =cut
716
717 sub set_reliable_storage {
718   my $self = shift @_;
719   my $schema = $self->schema;
720   my $write_handler = $self->schema->storage->write_handler;
721
722   $schema->storage->read_handler($write_handler);
723 }
724
725 =head2 set_balanced_storage
726
727 Sets the current $schema to be use the </balancer> for all reads, while all
728 writes are sent to the master only
729
730 =cut
731
732 sub set_balanced_storage {
733   my $self = shift @_;
734   my $schema = $self->schema;
735   my $balanced_handler = $self->schema->storage->balancer;
736
737   $schema->storage->read_handler($balanced_handler);
738 }
739
740 =head2 connected
741
742 Check that the master and at least one of the replicants is connected.
743
744 =cut
745
746 sub connected {
747   my $self = shift @_;
748   return
749     $self->master->connected &&
750     $self->pool->connected_replicants;
751 }
752
753 =head2 ensure_connected
754
755 Make sure all the storages are connected.
756
757 =cut
758
759 sub ensure_connected {
760   my $self = shift @_;
761   foreach my $source ($self->all_storages) {
762     $source->ensure_connected(@_);
763   }
764 }
765
766 =head2 limit_dialect
767
768 Set the limit_dialect for all existing storages
769
770 =cut
771
772 sub limit_dialect {
773   my $self = shift @_;
774   foreach my $source ($self->all_storages) {
775     $source->limit_dialect(@_);
776   }
777   return $self->master->limit_dialect;
778 }
779
780 =head2 quote_char
781
782 Set the quote_char for all existing storages
783
784 =cut
785
786 sub quote_char {
787   my $self = shift @_;
788   foreach my $source ($self->all_storages) {
789     $source->quote_char(@_);
790   }
791   return $self->master->quote_char;
792 }
793
794 =head2 name_sep
795
796 Set the name_sep for all existing storages
797
798 =cut
799
800 sub name_sep {
801   my $self = shift @_;
802   foreach my $source ($self->all_storages) {
803     $source->name_sep(@_);
804   }
805   return $self->master->name_sep;
806 }
807
808 =head2 set_schema
809
810 Set the schema object for all existing storages
811
812 =cut
813
814 sub set_schema {
815   my $self = shift @_;
816   foreach my $source ($self->all_storages) {
817     $source->set_schema(@_);
818   }
819 }
820
821 =head2 debug
822
823 set a debug flag across all storages
824
825 =cut
826
827 sub debug {
828   my $self = shift @_;
829   if(@_) {
830     foreach my $source ($self->all_storages) {
831       $source->debug(@_);
832     }
833   }
834   return $self->master->debug;
835 }
836
837 =head2 debugobj
838
839 set a debug object
840
841 =cut
842
843 sub debugobj {
844   my $self = shift @_;
845   return $self->master->debugobj(@_);
846 }
847
848 =head2 debugfh
849
850 set a debugfh object
851
852 =cut
853
854 sub debugfh {
855   my $self = shift @_;
856   return $self->master->debugfh(@_);
857 }
858
859 =head2 debugcb
860
861 set a debug callback
862
863 =cut
864
865 sub debugcb {
866   my $self = shift @_;
867   return $self->master->debugcb(@_);
868 }
869
870 =head2 disconnect
871
872 disconnect everything
873
874 =cut
875
876 sub disconnect {
877   my $self = shift @_;
878   foreach my $source ($self->all_storages) {
879     $source->disconnect(@_);
880   }
881 }
882
883 =head2 cursor_class
884
885 set cursor class on all storages, or return master's
886
887 =cut
888
889 sub cursor_class {
890   my ($self, $cursor_class) = @_;
891
892   if ($cursor_class) {
893     $_->cursor_class($cursor_class) for $self->all_storages;
894   }
895   $self->master->cursor_class;
896 }
897
898 =head2 cursor
899
900 set cursor class on all storages, or return master's, alias for L</cursor_class>
901 above.
902
903 =cut
904
905 sub cursor {
906   my ($self, $cursor_class) = @_;
907
908   if ($cursor_class) {
909     $_->cursor($cursor_class) for $self->all_storages;
910   }
911   $self->master->cursor;
912 }
913
914 =head2 unsafe
915
916 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
917 master's current setting
918
919 =cut
920
921 sub unsafe {
922   my $self = shift;
923
924   if (@_) {
925     $_->unsafe(@_) for $self->all_storages;
926   }
927
928   return $self->master->unsafe;
929 }
930
931 =head2 disable_sth_caching
932
933 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
934 or returns master's current setting
935
936 =cut
937
938 sub disable_sth_caching {
939   my $self = shift;
940
941   if (@_) {
942     $_->disable_sth_caching(@_) for $self->all_storages;
943   }
944
945   return $self->master->disable_sth_caching;
946 }
947
948 =head2 lag_behind_master
949
950 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
951 setting
952
953 =cut
954
955 sub lag_behind_master {
956   my $self = shift;
957
958   return max map $_->lag_behind_master, $self->replicants;
959 }
960
961 =head2 is_replicating
962
963 returns true if all replicants return true for
964 L<DBIx::Class::Storage::DBI/is_replicating>
965
966 =cut
967
968 sub is_replicating {
969   my $self = shift;
970
971   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
972 }
973
974 =head2 connect_call_datetime_setup
975
976 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
977
978 =cut
979
980 sub connect_call_datetime_setup {
981   my $self = shift;
982   $_->connect_call_datetime_setup for $self->all_storages;
983 }
984
985 sub _populate_dbh {
986   my $self = shift;
987   $_->_populate_dbh for $self->all_storages;
988 }
989
990 sub _connect {
991   my $self = shift;
992   $_->_connect for $self->all_storages;
993 }
994
995 sub _rebless {
996   my $self = shift;
997   $_->_rebless for $self->all_storages;
998 }
999
1000 sub _determine_driver {
1001   my $self = shift;
1002   $_->_determine_driver for $self->all_storages;
1003 }
1004
1005 sub _driver_determined {
1006   my $self = shift;
1007
1008   if (@_) {
1009     $_->_driver_determined(@_) for $self->all_storages;
1010   }
1011
1012   return $self->master->_driver_determined;
1013 }
1014
1015 sub _init {
1016   my $self = shift;
1017
1018   $_->_init for $self->all_storages;
1019 }
1020
1021 sub _run_connection_actions {
1022   my $self = shift;
1023
1024   $_->_run_connection_actions for $self->all_storages;
1025 }
1026
1027 sub _do_connection_actions {
1028   my $self = shift;
1029
1030   if (@_) {
1031     $_->_do_connection_actions(@_) for $self->all_storages;
1032   }
1033 }
1034
1035 sub connect_call_do_sql {
1036   my $self = shift;
1037   $_->connect_call_do_sql(@_) for $self->all_storages;
1038 }
1039
1040 sub disconnect_call_do_sql {
1041   my $self = shift;
1042   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1043 }
1044
1045 sub _seems_connected {
1046   my $self = shift;
1047
1048   return min map $_->_seems_connected, $self->all_storages;
1049 }
1050
1051 sub _ping {
1052   my $self = shift;
1053
1054   return min map $_->_ping, $self->all_storages;
1055 }
1056
1057 =head1 GOTCHAS
1058
1059 Due to the fact that replicants can lag behind a master, you must take care to
1060 make sure you use one of the methods to force read queries to a master should
1061 you need realtime data integrity.  For example, if you insert a row, and then
1062 immediately re-read it from the database (say, by doing
1063 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1064 or you insert a row and then immediately build a query that expects that row
1065 to be an item, you should force the master to handle reads.  Otherwise, due to
1066 the lag, there is no certainty your data will be in the expected state.
1067
1068 For data integrity, all transactions automatically use the master storage for
1069 all read and write queries.  Using a transaction is the preferred and recommended
1070 method to force the master to handle all read queries.
1071
1072 Otherwise, you can force a single query to use the master with the 'force_pool'
1073 attribute:
1074
1075   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1076
1077 This attribute will safely be ignored by non replicated storages, so you can use
1078 the same code for both types of systems.
1079
1080 Lastly, you can use the L</execute_reliably> method, which works very much like
1081 a transaction.
1082
1083 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1084 and L</set_balanced_storage>, however this operates at a global level and is not
1085 suitable if you have a shared Schema object being used by multiple processes,
1086 such as on a web application server.  You can get around this limitation by
1087 using the Schema clone method.
1088
1089   my $new_schema = $schema->clone;
1090   $new_schema->set_reliable_storage;
1091
1092   ## $new_schema will use only the Master storage for all reads/writes while
1093   ## the $schema object will use replicated storage.
1094
1095 =head1 FURTHER QUESTIONS?
1096
1097 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1098
1099 =head1 COPYRIGHT AND LICENSE
1100
1101 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1102 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1103 redistribute it and/or modify it under the same terms as the
1104 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1105
1106 =cut
1107
1108 __PACKAGE__->meta->make_immutable;
1109
1110 1;