48642ece68750d61a6493865866e5a14e3a64ba6
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw( min max );
22 use Context::Preserve 'preserve_context';
23
24 use namespace::clean -except => 'meta';
25
26 =head1 NAME
27
28 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
29
30 =head1 SYNOPSIS
31
32 The Following example shows how to change an existing $schema to a replicated
33 storage type, add some replicated (read-only) databases, and perform reporting
34 tasks.
35
36 You should set the 'storage_type attribute to a replicated type.  You should
37 also define your arguments, such as which balancer you want and any arguments
38 that the Pool object should get.
39
40   my $schema = Schema::Class->clone;
41   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
42   $schema->connection(...);
43
44 Next, you need to add in the Replicants.  Basically this is an array of
45 arrayrefs, where each arrayref is database connect information.  Think of these
46 arguments as what you'd pass to the 'normal' $schema->connect method.
47
48   $schema->storage->connect_replicants(
49     [$dsn1, $user, $pass, \%opts],
50     [$dsn2, $user, $pass, \%opts],
51     [$dsn3, $user, $pass, \%opts],
52   );
53
54 Now, just use the $schema as you normally would.  Automatically all reads will
55 be delegated to the replicants, while writes to the master.
56
57   $schema->resultset('Source')->search({name=>'etc'});
58
59 You can force a given query to use a particular storage using the search
60 attribute 'force_pool'.  For example:
61
62   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
63
64 Now $rs will force everything (both reads and writes) to use whatever was setup
65 as the master storage.  'master' is hardcoded to always point to the Master,
66 but you can also use any Replicant name.  Please see:
67 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
68
69 Also see transactions and L</execute_reliably> for alternative ways to
70 force read traffic to the master.  In general, you should wrap your statements
71 in a transaction when you are reading and writing to the same tables at the
72 same time, since your replicants will often lag a bit behind the master.
73
74 If you have a multi-statement read only transaction you can force it to select
75 a random server in the pool by:
76
77   my $rs = $schema->resultset('Source')->search( undef,
78     { force_pool => $db->storage->read_handler->next_storage }
79   );
80
81 =head1 DESCRIPTION
82
83 Warning: This class is marked BETA.  This has been running a production
84 website using MySQL native replication as its backend and we have some decent
85 test coverage but the code hasn't yet been stressed by a variety of databases.
86 Individual DBs may have quirks we are not aware of.  Please use this in first
87 development and pass along your experiences/bug fixes.
88
89 This class implements replicated data store for DBI. Currently you can define
90 one master and numerous slave database connections. All write-type queries
91 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
92 database, all read-type queries (SELECTs) go to the slave database.
93
94 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
95 handle gets delegated to one of the two attributes: L</read_handler> or to
96 L</write_handler>.  Additionally, some methods need to be distributed
97 to all existing storages.  This way our storage class is a drop in replacement
98 for L<DBIx::Class::Storage::DBI>.
99
100 Read traffic is spread across the replicants (slaves) occurring to a user
101 selected algorithm.  The default algorithm is random weighted.
102
103 =head1 NOTES
104
105 The consistency between master and replicants is database specific.  The Pool
106 gives you a method to validate its replicants, removing and replacing them
107 when they fail/pass predefined criteria.  Please make careful use of the ways
108 to force a query to run against Master when needed.
109
110 =head1 REQUIREMENTS
111
112 Replicated Storage has additional requirements not currently part of
113 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
114
115 =head1 ATTRIBUTES
116
117 This class defines the following attributes.
118
119 =head2 schema
120
121 The underlying L<DBIx::Class::Schema> object this storage is attaching
122
123 =cut
124
125 has 'schema' => (
126     is=>'rw',
127     isa=>DBICSchema,
128     weak_ref=>1,
129     required=>1,
130 );
131
132 =head2 pool_type
133
134 Contains the classname which will instantiate the L</pool> object.  Defaults
135 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
136
137 =cut
138
139 has 'pool_type' => (
140   is=>'rw',
141   isa=>ClassName,
142   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
143   handles=>{
144     'create_pool' => 'new',
145   },
146 );
147
148 =head2 pool_args
149
150 Contains a hashref of initialized information to pass to the Balancer object.
151 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
152
153 =cut
154
155 has 'pool_args' => (
156   is=>'rw',
157   isa=>HashRef,
158   lazy=>1,
159   default=>sub { {} },
160 );
161
162
163 =head2 balancer_type
164
165 The replication pool requires a balance class to provider the methods for
166 choose how to spread the query load across each replicant in the pool.
167
168 =cut
169
170 has 'balancer_type' => (
171   is=>'rw',
172   isa=>BalancerClassNamePart,
173   coerce=>1,
174   required=>1,
175   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
176   handles=>{
177     'create_balancer' => 'new',
178   },
179 );
180
181 =head2 balancer_args
182
183 Contains a hashref of initialized information to pass to the Balancer object.
184 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
185
186 =cut
187
188 has 'balancer_args' => (
189   is=>'rw',
190   isa=>HashRef,
191   lazy=>1,
192   required=>1,
193   default=>sub { {} },
194 );
195
196 =head2 pool
197
198 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
199 container class for one or more replicated databases.
200
201 =cut
202
203 has 'pool' => (
204   is=>'ro',
205   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
206   lazy_build=>1,
207   handles=>[qw/
208     connect_replicants
209     replicants
210     has_replicants
211   /],
212 );
213
214 =head2 balancer
215
216 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
217 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
218
219 =cut
220
221 has 'balancer' => (
222   is=>'rw',
223   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
224   lazy_build=>1,
225   handles=>[qw/auto_validate_every/],
226 );
227
228 =head2 master
229
230 The master defines the canonical state for a pool of connected databases.  All
231 the replicants are expected to match this databases state.  Thus, in a classic
232 Master / Slaves distributed system, all the slaves are expected to replicate
233 the Master's state as quick as possible.  This is the only database in the
234 pool of databases that is allowed to handle write traffic.
235
236 =cut
237
238 has 'master' => (
239   is=> 'ro',
240   isa=>DBICStorageDBI,
241   lazy_build=>1,
242 );
243
244 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
245
246 The following methods are delegated all the methods required for the
247 L<DBIx::Class::Storage::DBI> interface.
248
249 =cut
250
251 my $method_dispatch = {
252   writer => [qw/
253     on_connect_do
254     on_disconnect_do
255     on_connect_call
256     on_disconnect_call
257     connect_info
258     _connect_info
259     throw_exception
260     sql_maker
261     sqlt_type
262     create_ddl_dir
263     deployment_statements
264     datetime_parser
265     datetime_parser_type
266     build_datetime_parser
267     last_insert_id
268     insert
269     update
270     delete
271     dbh
272     txn_begin
273     txn_do
274     txn_commit
275     txn_rollback
276     txn_scope_guard
277     _exec_txn_rollback
278     _exec_txn_begin
279     _exec_txn_commit
280     deploy
281     with_deferred_fk_checks
282     dbh_do
283     _prep_for_execute
284     is_datatype_numeric
285     _count_select
286     svp_rollback
287     svp_begin
288     svp_release
289     relname_to_table_alias
290     _dbh_last_insert_id
291     _default_dbi_connect_attributes
292     _dbi_connect_info
293     _dbic_connect_attributes
294     auto_savepoint
295     _query_start
296     _query_end
297     _format_for_trace
298     _dbi_attrs_for_bind
299     bind_attribute_by_data_type
300     transaction_depth
301     _dbh
302     _select_args
303     _dbh_execute_for_fetch
304     _sql_maker
305     _dbh_execute_inserts_with_no_binds
306     _select_args_to_query
307     _gen_sql_bind
308     _svp_generate_name
309     _normalize_connect_info
310     _parse_connect_do
311     savepoints
312     _sql_maker_opts
313     _use_multicolumn_in
314     _conn_pid
315     _dbh_autocommit
316     _native_data_type
317     _get_dbh
318     sql_maker_class
319     insert_bulk
320     _insert_bulk
321     _execute
322     _do_query
323     _dbh_execute
324   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
325   reader => [qw/
326     select
327     select_single
328     columns_info_for
329     _dbh_columns_info_for
330     _select
331   /],
332   unimplemented => [qw/
333     _arm_global_destructor
334     _verify_pid
335     __delicate_rollback
336
337     get_use_dbms_capability
338     set_use_dbms_capability
339     get_dbms_capability
340     set_dbms_capability
341     _dbh_details
342     _dbh_get_info
343     _get_rdbms_name
344     _get_server_version
345     _server_info
346
347     _determine_connector_driver
348     _extract_driver_from_connect_info
349     _describe_connection
350     _warn_undetermined_driver
351
352     sql_limit_dialect
353     sql_quote_char
354     sql_name_sep
355
356     _prefetch_autovalues
357     _perform_autoinc_retrieval
358     _autoinc_supplied_for_op
359
360     _resolve_bindattrs
361
362     _max_column_bytesize
363     _is_lob_type
364     _is_binary_lob_type
365     _is_binary_type
366     _is_text_lob_type
367
368     _prepare_sth
369     _bind_sth_params
370   /,(
371     # the capability framework
372     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
373     grep
374       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
375       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
376   )],
377 };
378
379 # this only happens during DBIC-internal testing
380 if ( $INC{"t/lib/ANFANG.pm"} ) {
381
382   my $seen;
383   for my $type (keys %$method_dispatch) {
384     for (@{$method_dispatch->{$type}}) {
385       push @{$seen->{$_}}, $type;
386     }
387   }
388
389   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
390     die(join "\n", '',
391       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
392       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
393       '',
394     );
395   }
396
397   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
398     die(join "\n", '',
399       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
400       @cant,
401       '',
402     );
403   }
404 }
405
406 for my $method (@{$method_dispatch->{unimplemented}}) {
407   __PACKAGE__->meta->add_method($method, sub {
408     my $self = shift;
409     $self->throw_exception(
410       "$method() may not be called on '@{[ blessed $self ]}' objects, "
411     . 'call it on a specific pool instance instead'
412     );
413   });
414 }
415
416 =head2 read_handler
417
418 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
419
420 =cut
421
422 has 'read_handler' => (
423   is=>'rw',
424   isa=>Object,
425   lazy_build=>1,
426   handles=>$method_dispatch->{reader},
427 );
428
429 =head2 write_handler
430
431 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
432 as well as methods that don't write or read that can be called on only one
433 storage, methods that return a C<$dbh>, and any methods that don't make sense to
434 run on a replicant.
435
436 =cut
437
438 has 'write_handler' => (
439   is=>'ro',
440   isa=>Object,
441   lazy_build=>1,
442   handles=>$method_dispatch->{writer},
443 );
444
445
446
447 has _master_connect_info_opts =>
448   (is => 'rw', isa => HashRef, default => sub { {} });
449
450 =head2 around: connect_info
451
452 Preserves master's C<connect_info> options (for merging with replicants.)
453 Also sets any Replicated-related options from connect_info, such as
454 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
455
456 =cut
457
458 around connect_info => sub {
459   my ($next, $self, $info, @extra) = @_;
460
461   $self->throw_exception(
462     'connect_info can not be retrieved from a replicated storage - '
463   . 'accessor must be called on a specific pool instance'
464   ) unless defined $info;
465
466   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
467
468   my %opts;
469   for my $arg (@$info) {
470     next unless (reftype($arg)||'') eq 'HASH';
471     %opts = %{ $merge->merge($arg, \%opts) };
472   }
473   delete $opts{dsn};
474
475   if (@opts{qw/pool_type pool_args/}) {
476     $self->pool_type(delete $opts{pool_type})
477       if $opts{pool_type};
478
479     $self->pool_args(
480       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
481     );
482
483     ## Since we possibly changed the pool_args, we need to clear the current
484     ## pool object so that next time it is used it will be rebuilt.
485     $self->clear_pool;
486   }
487
488   if (@opts{qw/balancer_type balancer_args/}) {
489     $self->balancer_type(delete $opts{balancer_type})
490       if $opts{balancer_type};
491
492     $self->balancer_args(
493       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
494     );
495
496     $self->balancer($self->_build_balancer)
497       if $self->balancer;
498   }
499
500   $self->_master_connect_info_opts(\%opts);
501
502   return preserve_context {
503     $self->$next($info, @extra);
504   } after => sub {
505     # Make sure master is blessed into the correct class and apply role to it.
506     my $master = $self->master;
507     $master->_determine_driver;
508     Moose::Meta::Class->initialize(ref $master);
509
510     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
511
512     # link pool back to master
513     $self->pool->master($master);
514   };
515 };
516
517 =head1 METHODS
518
519 This class defines the following methods.
520
521 =head2 BUILDARGS
522
523 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
524 first argument.  So we need to massage the arguments a bit so that all the
525 bits get put into the correct places.
526
527 =cut
528
529 sub BUILDARGS {
530   my ($class, $schema, $storage_type_args, @args) = @_;
531
532   return {
533     schema=>$schema,
534     %$storage_type_args,
535     @args
536   }
537 }
538
539 =head2 _build_master
540
541 Lazy builder for the L</master> attribute.
542
543 =cut
544
545 sub _build_master {
546   my $self = shift @_;
547   my $master = DBIx::Class::Storage::DBI->new($self->schema);
548   $master
549 }
550
551 =head2 _build_pool
552
553 Lazy builder for the L</pool> attribute.
554
555 =cut
556
557 sub _build_pool {
558   my $self = shift @_;
559   $self->create_pool(%{$self->pool_args});
560 }
561
562 =head2 _build_balancer
563
564 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
565 the balancer knows which pool it's balancing.
566
567 =cut
568
569 sub _build_balancer {
570   my $self = shift @_;
571   $self->create_balancer(
572     pool=>$self->pool,
573     master=>$self->master,
574     %{$self->balancer_args},
575   );
576 }
577
578 =head2 _build_write_handler
579
580 Lazy builder for the L</write_handler> attribute.  The default is to set this to
581 the L</master>.
582
583 =cut
584
585 sub _build_write_handler {
586   return shift->master;
587 }
588
589 =head2 _build_read_handler
590
591 Lazy builder for the L</read_handler> attribute.  The default is to set this to
592 the L</balancer>.
593
594 =cut
595
596 sub _build_read_handler {
597   return shift->balancer;
598 }
599
600 =head2 around: connect_replicants
601
602 All calls to connect_replicants needs to have an existing $schema tacked onto
603 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
604 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
605 options merged with the master, with replicant opts having higher priority.
606
607 =cut
608
609 around connect_replicants => sub {
610   my ($next, $self, @args) = @_;
611
612   for my $r (@args) {
613     $r = [ $r ] unless reftype $r eq 'ARRAY';
614
615     $self->throw_exception('coderef replicant connect_info not supported')
616       if ref $r->[0] && reftype $r->[0] eq 'CODE';
617
618 # any connect_info options?
619     my $i = 0;
620     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
621
622 # make one if none
623     $r->[$i] = {} unless $r->[$i];
624
625 # merge if two hashes
626     my @hashes = @$r[$i .. $#{$r}];
627
628     $self->throw_exception('invalid connect_info options')
629       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
630
631     $self->throw_exception('too many hashrefs in connect_info')
632       if @hashes > 2;
633
634     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
635     my %opts = %{ $merge->merge(reverse @hashes) };
636
637 # delete them
638     splice @$r, $i+1, ($#{$r} - $i), ();
639
640 # make sure master/replicants opts don't clash
641     my %master_opts = %{ $self->_master_connect_info_opts };
642     if (exists $opts{dbh_maker}) {
643         delete @master_opts{qw/dsn user password/};
644     }
645     delete $master_opts{dbh_maker};
646
647 # merge with master
648     %opts = %{ $merge->merge(\%opts, \%master_opts) };
649
650 # update
651     $r->[$i] = \%opts;
652   }
653
654   $self->$next($self->schema, @args);
655 };
656
657 =head2 all_storages
658
659 Returns an array of all the connected storage backends.  The first element
660 in the returned array is the master, and the rest are each of the
661 replicants.
662
663 =cut
664
665 sub all_storages {
666   my $self = shift @_;
667   return grep {defined $_ && blessed $_} (
668      $self->master,
669      values %{ $self->replicants },
670   );
671 }
672
673 =head2 execute_reliably ($coderef, ?@args)
674
675 Given a coderef, saves the current state of the L</read_handler>, forces it to
676 use reliable storage (e.g. sets it to the master), executes a coderef and then
677 restores the original state.
678
679 Example:
680
681   my $reliably = sub {
682     my $name = shift @_;
683     $schema->resultset('User')->create({name=>$name});
684     my $user_rs = $schema->resultset('User')->find({name=>$name});
685     return $user_rs;
686   };
687
688   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
689
690 Use this when you must be certain of your database state, such as when you just
691 inserted something and need to get a resultset including it, etc.
692
693 =cut
694
695 sub execute_reliably {
696   my $self = shift;
697   my $coderef = shift;
698
699   $self->throw_exception('Second argument must be a coderef')
700     unless( ref $coderef eq 'CODE');
701
702   ## replace the current read handler for the remainder of the scope
703   local $self->{read_handler} = $self->master;
704
705   &$coderef;
706 }
707
708 =head2 set_reliable_storage
709
710 Sets the current $schema to be 'reliable', that is all queries, both read and
711 write are sent to the master
712
713 =cut
714
715 sub set_reliable_storage {
716   my $self = shift @_;
717   my $schema = $self->schema;
718   my $write_handler = $self->schema->storage->write_handler;
719
720   $schema->storage->read_handler($write_handler);
721 }
722
723 =head2 set_balanced_storage
724
725 Sets the current $schema to be use the </balancer> for all reads, while all
726 writes are sent to the master only
727
728 =cut
729
730 sub set_balanced_storage {
731   my $self = shift @_;
732   my $schema = $self->schema;
733   my $balanced_handler = $self->schema->storage->balancer;
734
735   $schema->storage->read_handler($balanced_handler);
736 }
737
738 =head2 connected
739
740 Check that the master and at least one of the replicants is connected.
741
742 =cut
743
744 sub connected {
745   my $self = shift @_;
746   return
747     $self->master->connected &&
748     $self->pool->connected_replicants;
749 }
750
751 =head2 ensure_connected
752
753 Make sure all the storages are connected.
754
755 =cut
756
757 sub ensure_connected {
758   my $self = shift @_;
759   foreach my $source ($self->all_storages) {
760     $source->ensure_connected(@_);
761   }
762 }
763
764 =head2 limit_dialect
765
766 Set the limit_dialect for all existing storages
767
768 =cut
769
770 sub limit_dialect {
771   my $self = shift @_;
772   foreach my $source ($self->all_storages) {
773     $source->limit_dialect(@_);
774   }
775   return $self->master->limit_dialect;
776 }
777
778 =head2 quote_char
779
780 Set the quote_char for all existing storages
781
782 =cut
783
784 sub quote_char {
785   my $self = shift @_;
786   foreach my $source ($self->all_storages) {
787     $source->quote_char(@_);
788   }
789   return $self->master->quote_char;
790 }
791
792 =head2 name_sep
793
794 Set the name_sep for all existing storages
795
796 =cut
797
798 sub name_sep {
799   my $self = shift @_;
800   foreach my $source ($self->all_storages) {
801     $source->name_sep(@_);
802   }
803   return $self->master->name_sep;
804 }
805
806 =head2 set_schema
807
808 Set the schema object for all existing storages
809
810 =cut
811
812 sub set_schema {
813   my $self = shift @_;
814   foreach my $source ($self->all_storages) {
815     $source->set_schema(@_);
816   }
817 }
818
819 =head2 debug
820
821 set a debug flag across all storages
822
823 =cut
824
825 sub debug {
826   my $self = shift @_;
827   if(@_) {
828     foreach my $source ($self->all_storages) {
829       $source->debug(@_);
830     }
831   }
832   return $self->master->debug;
833 }
834
835 =head2 debugobj
836
837 set a debug object
838
839 =cut
840
841 sub debugobj {
842   my $self = shift @_;
843   return $self->master->debugobj(@_);
844 }
845
846 =head2 debugfh
847
848 set a debugfh object
849
850 =cut
851
852 sub debugfh {
853   my $self = shift @_;
854   return $self->master->debugfh(@_);
855 }
856
857 =head2 debugcb
858
859 set a debug callback
860
861 =cut
862
863 sub debugcb {
864   my $self = shift @_;
865   return $self->master->debugcb(@_);
866 }
867
868 =head2 disconnect
869
870 disconnect everything
871
872 =cut
873
874 sub disconnect {
875   my $self = shift @_;
876   foreach my $source ($self->all_storages) {
877     $source->disconnect(@_);
878   }
879 }
880
881 =head2 cursor_class
882
883 set cursor class on all storages, or return master's
884
885 =cut
886
887 sub cursor_class {
888   my ($self, $cursor_class) = @_;
889
890   if ($cursor_class) {
891     $_->cursor_class($cursor_class) for $self->all_storages;
892   }
893   $self->master->cursor_class;
894 }
895
896 =head2 cursor
897
898 set cursor class on all storages, or return master's, alias for L</cursor_class>
899 above.
900
901 =cut
902
903 sub cursor {
904   my ($self, $cursor_class) = @_;
905
906   if ($cursor_class) {
907     $_->cursor($cursor_class) for $self->all_storages;
908   }
909   $self->master->cursor;
910 }
911
912 =head2 unsafe
913
914 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
915 master's current setting
916
917 =cut
918
919 sub unsafe {
920   my $self = shift;
921
922   if (@_) {
923     $_->unsafe(@_) for $self->all_storages;
924   }
925
926   return $self->master->unsafe;
927 }
928
929 =head2 disable_sth_caching
930
931 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
932 or returns master's current setting
933
934 =cut
935
936 sub disable_sth_caching {
937   my $self = shift;
938
939   if (@_) {
940     $_->disable_sth_caching(@_) for $self->all_storages;
941   }
942
943   return $self->master->disable_sth_caching;
944 }
945
946 =head2 lag_behind_master
947
948 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
949 setting
950
951 =cut
952
953 sub lag_behind_master {
954   my $self = shift;
955
956   return max map $_->lag_behind_master, $self->replicants;
957 }
958
959 =head2 is_replicating
960
961 returns true if all replicants return true for
962 L<DBIx::Class::Storage::DBI/is_replicating>
963
964 =cut
965
966 sub is_replicating {
967   my $self = shift;
968
969   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
970 }
971
972 =head2 connect_call_datetime_setup
973
974 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
975
976 =cut
977
978 sub connect_call_datetime_setup {
979   my $self = shift;
980   $_->connect_call_datetime_setup for $self->all_storages;
981 }
982
983 sub _populate_dbh {
984   my $self = shift;
985   $_->_populate_dbh for $self->all_storages;
986 }
987
988 sub _connect {
989   my $self = shift;
990   $_->_connect for $self->all_storages;
991 }
992
993 sub _rebless {
994   my $self = shift;
995   $_->_rebless for $self->all_storages;
996 }
997
998 sub _determine_driver {
999   my $self = shift;
1000   $_->_determine_driver for $self->all_storages;
1001 }
1002
1003 sub _driver_determined {
1004   my $self = shift;
1005
1006   if (@_) {
1007     $_->_driver_determined(@_) for $self->all_storages;
1008   }
1009
1010   return $self->master->_driver_determined;
1011 }
1012
1013 sub _init {
1014   my $self = shift;
1015
1016   $_->_init for $self->all_storages;
1017 }
1018
1019 sub _run_connection_actions {
1020   my $self = shift;
1021
1022   $_->_run_connection_actions for $self->all_storages;
1023 }
1024
1025 sub _do_connection_actions {
1026   my $self = shift;
1027
1028   if (@_) {
1029     $_->_do_connection_actions(@_) for $self->all_storages;
1030   }
1031 }
1032
1033 sub connect_call_do_sql {
1034   my $self = shift;
1035   $_->connect_call_do_sql(@_) for $self->all_storages;
1036 }
1037
1038 sub disconnect_call_do_sql {
1039   my $self = shift;
1040   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1041 }
1042
1043 sub _seems_connected {
1044   my $self = shift;
1045
1046   return min map $_->_seems_connected, $self->all_storages;
1047 }
1048
1049 sub _ping {
1050   my $self = shift;
1051
1052   return min map $_->_ping, $self->all_storages;
1053 }
1054
1055 =head1 GOTCHAS
1056
1057 Due to the fact that replicants can lag behind a master, you must take care to
1058 make sure you use one of the methods to force read queries to a master should
1059 you need realtime data integrity.  For example, if you insert a row, and then
1060 immediately re-read it from the database (say, by doing
1061 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1062 or you insert a row and then immediately build a query that expects that row
1063 to be an item, you should force the master to handle reads.  Otherwise, due to
1064 the lag, there is no certainty your data will be in the expected state.
1065
1066 For data integrity, all transactions automatically use the master storage for
1067 all read and write queries.  Using a transaction is the preferred and recommended
1068 method to force the master to handle all read queries.
1069
1070 Otherwise, you can force a single query to use the master with the 'force_pool'
1071 attribute:
1072
1073   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1074
1075 This attribute will safely be ignored by non replicated storages, so you can use
1076 the same code for both types of systems.
1077
1078 Lastly, you can use the L</execute_reliably> method, which works very much like
1079 a transaction.
1080
1081 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1082 and L</set_balanced_storage>, however this operates at a global level and is not
1083 suitable if you have a shared Schema object being used by multiple processes,
1084 such as on a web application server.  You can get around this limitation by
1085 using the Schema clone method.
1086
1087   my $new_schema = $schema->clone;
1088   $new_schema->set_reliable_storage;
1089
1090   ## $new_schema will use only the Master storage for all reads/writes while
1091   ## the $schema object will use replicated storage.
1092
1093 =head1 FURTHER QUESTIONS?
1094
1095 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1096
1097 =head1 COPYRIGHT AND LICENSE
1098
1099 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1100 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1101 redistribute it and/or modify it under the same terms as the
1102 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1103
1104 =cut
1105
1106 __PACKAGE__->meta->make_immutable;
1107
1108 1;