Fix exception on complex update/delete under a replicated setup
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw/min max reduce/;
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24
25 use namespace::clean -except => 'meta';
26
27 =head1 NAME
28
29 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
30
31 =head1 SYNOPSIS
32
33 The Following example shows how to change an existing $schema to a replicated
34 storage type, add some replicated (read-only) databases, and perform reporting
35 tasks.
36
37 You should set the 'storage_type attribute to a replicated type.  You should
38 also define your arguments, such as which balancer you want and any arguments
39 that the Pool object should get.
40
41   my $schema = Schema::Class->clone;
42   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
43   $schema->connection(...);
44
45 Next, you need to add in the Replicants.  Basically this is an array of
46 arrayrefs, where each arrayref is database connect information.  Think of these
47 arguments as what you'd pass to the 'normal' $schema->connect method.
48
49   $schema->storage->connect_replicants(
50     [$dsn1, $user, $pass, \%opts],
51     [$dsn2, $user, $pass, \%opts],
52     [$dsn3, $user, $pass, \%opts],
53   );
54
55 Now, just use the $schema as you normally would.  Automatically all reads will
56 be delegated to the replicants, while writes to the master.
57
58   $schema->resultset('Source')->search({name=>'etc'});
59
60 You can force a given query to use a particular storage using the search
61 attribute 'force_pool'.  For example:
62
63   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
64
65 Now $rs will force everything (both reads and writes) to use whatever was setup
66 as the master storage.  'master' is hardcoded to always point to the Master,
67 but you can also use any Replicant name.  Please see:
68 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
69
70 Also see transactions and L</execute_reliably> for alternative ways to
71 force read traffic to the master.  In general, you should wrap your statements
72 in a transaction when you are reading and writing to the same tables at the
73 same time, since your replicants will often lag a bit behind the master.
74
75 If you have a multi-statement read only transaction you can force it to select
76 a random server in the pool by:
77
78   my $rs = $schema->resultset('Source')->search( undef,
79     { force_pool => $db->storage->read_handler->next_storage }
80   );
81
82 =head1 DESCRIPTION
83
84 Warning: This class is marked BETA.  This has been running a production
85 website using MySQL native replication as its backend and we have some decent
86 test coverage but the code hasn't yet been stressed by a variety of databases.
87 Individual DBs may have quirks we are not aware of.  Please use this in first
88 development and pass along your experiences/bug fixes.
89
90 This class implements replicated data store for DBI. Currently you can define
91 one master and numerous slave database connections. All write-type queries
92 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
93 database, all read-type queries (SELECTs) go to the slave database.
94
95 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
96 handle gets delegated to one of the two attributes: L</read_handler> or to
97 L</write_handler>.  Additionally, some methods need to be distributed
98 to all existing storages.  This way our storage class is a drop in replacement
99 for L<DBIx::Class::Storage::DBI>.
100
101 Read traffic is spread across the replicants (slaves) occurring to a user
102 selected algorithm.  The default algorithm is random weighted.
103
104 =head1 NOTES
105
106 The consistency between master and replicants is database specific.  The Pool
107 gives you a method to validate its replicants, removing and replacing them
108 when they fail/pass predefined criteria.  Please make careful use of the ways
109 to force a query to run against Master when needed.
110
111 =head1 REQUIREMENTS
112
113 Replicated Storage has additional requirements not currently part of
114 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
115
116 =head1 ATTRIBUTES
117
118 This class defines the following attributes.
119
120 =head2 schema
121
122 The underlying L<DBIx::Class::Schema> object this storage is attaching
123
124 =cut
125
126 has 'schema' => (
127     is=>'rw',
128     isa=>DBICSchema,
129     weak_ref=>1,
130     required=>1,
131 );
132
133 =head2 pool_type
134
135 Contains the classname which will instantiate the L</pool> object.  Defaults
136 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
137
138 =cut
139
140 has 'pool_type' => (
141   is=>'rw',
142   isa=>ClassName,
143   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
144   handles=>{
145     'create_pool' => 'new',
146   },
147 );
148
149 =head2 pool_args
150
151 Contains a hashref of initialized information to pass to the Balancer object.
152 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
153
154 =cut
155
156 has 'pool_args' => (
157   is=>'rw',
158   isa=>HashRef,
159   lazy=>1,
160   default=>sub { {} },
161 );
162
163
164 =head2 balancer_type
165
166 The replication pool requires a balance class to provider the methods for
167 choose how to spread the query load across each replicant in the pool.
168
169 =cut
170
171 has 'balancer_type' => (
172   is=>'rw',
173   isa=>BalancerClassNamePart,
174   coerce=>1,
175   required=>1,
176   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
177   handles=>{
178     'create_balancer' => 'new',
179   },
180 );
181
182 =head2 balancer_args
183
184 Contains a hashref of initialized information to pass to the Balancer object.
185 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
186
187 =cut
188
189 has 'balancer_args' => (
190   is=>'rw',
191   isa=>HashRef,
192   lazy=>1,
193   required=>1,
194   default=>sub { {} },
195 );
196
197 =head2 pool
198
199 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
200 container class for one or more replicated databases.
201
202 =cut
203
204 has 'pool' => (
205   is=>'ro',
206   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207   lazy_build=>1,
208   handles=>[qw/
209     connect_replicants
210     replicants
211     has_replicants
212   /],
213 );
214
215 =head2 balancer
216
217 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
218 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223   is=>'rw',
224   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225   lazy_build=>1,
226   handles=>[qw/auto_validate_every/],
227 );
228
229 =head2 master
230
231 The master defines the canonical state for a pool of connected databases.  All
232 the replicants are expected to match this databases state.  Thus, in a classic
233 Master / Slaves distributed system, all the slaves are expected to replicate
234 the Master's state as quick as possible.  This is the only database in the
235 pool of databases that is allowed to handle write traffic.
236
237 =cut
238
239 has 'master' => (
240   is=> 'ro',
241   isa=>DBICStorageDBI,
242   lazy_build=>1,
243 );
244
245 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
246
247 The following methods are delegated all the methods required for the
248 L<DBIx::Class::Storage::DBI> interface.
249
250 =cut
251
252 my $method_dispatch = {
253   writer => [qw/
254     on_connect_do
255     on_disconnect_do
256     on_connect_call
257     on_disconnect_call
258     connect_info
259     _connect_info
260     throw_exception
261     sql_maker
262     sqlt_type
263     create_ddl_dir
264     deployment_statements
265     datetime_parser
266     datetime_parser_type
267     build_datetime_parser
268     last_insert_id
269     insert
270     update
271     delete
272     dbh
273     txn_begin
274     txn_do
275     txn_commit
276     txn_rollback
277     txn_scope_guard
278     _exec_txn_rollback
279     _exec_txn_begin
280     _exec_txn_commit
281     deploy
282     with_deferred_fk_checks
283     dbh_do
284     _prep_for_execute
285     is_datatype_numeric
286     _count_select
287     svp_rollback
288     svp_begin
289     svp_release
290     relname_to_table_alias
291     _dbh_last_insert_id
292     _default_dbi_connect_attributes
293     _dbi_connect_info
294     _dbic_connect_attributes
295     auto_savepoint
296     _query_start
297     _query_end
298     _format_for_trace
299     _dbi_attrs_for_bind
300     bind_attribute_by_data_type
301     transaction_depth
302     _dbh
303     _select_args
304     _dbh_execute_for_fetch
305     _sql_maker
306     _dbh_execute_inserts_with_no_binds
307     _select_args_to_query
308     _gen_sql_bind
309     _svp_generate_name
310     _normalize_connect_info
311     _parse_connect_do
312     savepoints
313     _sql_maker_opts
314     _use_multicolumn_in
315     _conn_pid
316     _dbh_autocommit
317     _native_data_type
318     _get_dbh
319     sql_maker_class
320     insert_bulk
321     _insert_bulk
322     _execute
323     _do_query
324     _dbh_execute
325   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
326   reader => [qw/
327     select
328     select_single
329     columns_info_for
330     _dbh_columns_info_for
331     _select
332   /],
333   unimplemented => [qw/
334     _arm_global_destructor
335     _verify_pid
336
337     get_use_dbms_capability
338     set_use_dbms_capability
339     get_dbms_capability
340     set_dbms_capability
341     _dbh_details
342     _dbh_get_info
343
344     _determine_connector_driver
345     _extract_driver_from_connect_info
346     _describe_connection
347     _warn_undetermined_driver
348
349     sql_limit_dialect
350     sql_quote_char
351     sql_name_sep
352
353     _prefetch_autovalues
354     _perform_autoinc_retrieval
355     _autoinc_supplied_for_op
356
357     _resolve_bindattrs
358
359     _max_column_bytesize
360     _is_lob_type
361     _is_binary_lob_type
362     _is_binary_type
363     _is_text_lob_type
364
365     _prepare_sth
366     _bind_sth_params
367   /,(
368     # the capability framework
369     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
370     grep
371       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
372       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
373   )],
374 };
375
376 if (DBIx::Class::_ENV_::DBICTEST) {
377
378   my $seen;
379   for my $type (keys %$method_dispatch) {
380     for (@{$method_dispatch->{$type}}) {
381       push @{$seen->{$_}}, $type;
382     }
383   }
384
385   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
386     die(join "\n", '',
387       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
388       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
389       '',
390     );
391   }
392
393   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
394     die(join "\n", '',
395       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
396       @cant,
397       '',
398     );
399   }
400 }
401
402 for my $method (@{$method_dispatch->{unimplemented}}) {
403   __PACKAGE__->meta->add_method($method, sub {
404     my $self = shift;
405     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
406   });
407 }
408
409 =head2 read_handler
410
411 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
412
413 =cut
414
415 has 'read_handler' => (
416   is=>'rw',
417   isa=>Object,
418   lazy_build=>1,
419   handles=>$method_dispatch->{reader},
420 );
421
422 =head2 write_handler
423
424 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
425 as well as methods that don't write or read that can be called on only one
426 storage, methods that return a C<$dbh>, and any methods that don't make sense to
427 run on a replicant.
428
429 =cut
430
431 has 'write_handler' => (
432   is=>'ro',
433   isa=>Object,
434   lazy_build=>1,
435   handles=>$method_dispatch->{writer},
436 );
437
438
439
440 has _master_connect_info_opts =>
441   (is => 'rw', isa => HashRef, default => sub { {} });
442
443 =head2 around: connect_info
444
445 Preserves master's C<connect_info> options (for merging with replicants.)
446 Also sets any Replicated-related options from connect_info, such as
447 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
448
449 =cut
450
451 around connect_info => sub {
452   my ($next, $self, $info, @extra) = @_;
453
454   $self->throw_exception(
455     'connect_info can not be retrieved from a replicated storage - '
456   . 'accessor must be called on a specific pool instance'
457   ) unless defined $info;
458
459   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
460
461   my %opts;
462   for my $arg (@$info) {
463     next unless (reftype($arg)||'') eq 'HASH';
464     %opts = %{ $merge->merge($arg, \%opts) };
465   }
466   delete $opts{dsn};
467
468   if (@opts{qw/pool_type pool_args/}) {
469     $self->pool_type(delete $opts{pool_type})
470       if $opts{pool_type};
471
472     $self->pool_args(
473       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
474     );
475
476     ## Since we possibly changed the pool_args, we need to clear the current
477     ## pool object so that next time it is used it will be rebuilt.
478     $self->clear_pool;
479   }
480
481   if (@opts{qw/balancer_type balancer_args/}) {
482     $self->balancer_type(delete $opts{balancer_type})
483       if $opts{balancer_type};
484
485     $self->balancer_args(
486       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
487     );
488
489     $self->balancer($self->_build_balancer)
490       if $self->balancer;
491   }
492
493   $self->_master_connect_info_opts(\%opts);
494
495   return preserve_context {
496     $self->$next($info, @extra);
497   } after => sub {
498     # Make sure master is blessed into the correct class and apply role to it.
499     my $master = $self->master;
500     $master->_determine_driver;
501     Moose::Meta::Class->initialize(ref $master);
502
503     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
504
505     # link pool back to master
506     $self->pool->master($master);
507   };
508 };
509
510 =head1 METHODS
511
512 This class defines the following methods.
513
514 =head2 BUILDARGS
515
516 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
517 first argument.  So we need to massage the arguments a bit so that all the
518 bits get put into the correct places.
519
520 =cut
521
522 sub BUILDARGS {
523   my ($class, $schema, $storage_type_args, @args) = @_;
524
525   return {
526     schema=>$schema,
527     %$storage_type_args,
528     @args
529   }
530 }
531
532 =head2 _build_master
533
534 Lazy builder for the L</master> attribute.
535
536 =cut
537
538 sub _build_master {
539   my $self = shift @_;
540   my $master = DBIx::Class::Storage::DBI->new($self->schema);
541   $master
542 }
543
544 =head2 _build_pool
545
546 Lazy builder for the L</pool> attribute.
547
548 =cut
549
550 sub _build_pool {
551   my $self = shift @_;
552   $self->create_pool(%{$self->pool_args});
553 }
554
555 =head2 _build_balancer
556
557 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
558 the balancer knows which pool it's balancing.
559
560 =cut
561
562 sub _build_balancer {
563   my $self = shift @_;
564   $self->create_balancer(
565     pool=>$self->pool,
566     master=>$self->master,
567     %{$self->balancer_args},
568   );
569 }
570
571 =head2 _build_write_handler
572
573 Lazy builder for the L</write_handler> attribute.  The default is to set this to
574 the L</master>.
575
576 =cut
577
578 sub _build_write_handler {
579   return shift->master;
580 }
581
582 =head2 _build_read_handler
583
584 Lazy builder for the L</read_handler> attribute.  The default is to set this to
585 the L</balancer>.
586
587 =cut
588
589 sub _build_read_handler {
590   return shift->balancer;
591 }
592
593 =head2 around: connect_replicants
594
595 All calls to connect_replicants needs to have an existing $schema tacked onto
596 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
597 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
598 options merged with the master, with replicant opts having higher priority.
599
600 =cut
601
602 around connect_replicants => sub {
603   my ($next, $self, @args) = @_;
604
605   for my $r (@args) {
606     $r = [ $r ] unless reftype $r eq 'ARRAY';
607
608     $self->throw_exception('coderef replicant connect_info not supported')
609       if ref $r->[0] && reftype $r->[0] eq 'CODE';
610
611 # any connect_info options?
612     my $i = 0;
613     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
614
615 # make one if none
616     $r->[$i] = {} unless $r->[$i];
617
618 # merge if two hashes
619     my @hashes = @$r[$i .. $#{$r}];
620
621     $self->throw_exception('invalid connect_info options')
622       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
623
624     $self->throw_exception('too many hashrefs in connect_info')
625       if @hashes > 2;
626
627     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
628     my %opts = %{ $merge->merge(reverse @hashes) };
629
630 # delete them
631     splice @$r, $i+1, ($#{$r} - $i), ();
632
633 # make sure master/replicants opts don't clash
634     my %master_opts = %{ $self->_master_connect_info_opts };
635     if (exists $opts{dbh_maker}) {
636         delete @master_opts{qw/dsn user password/};
637     }
638     delete $master_opts{dbh_maker};
639
640 # merge with master
641     %opts = %{ $merge->merge(\%opts, \%master_opts) };
642
643 # update
644     $r->[$i] = \%opts;
645   }
646
647   $self->$next($self->schema, @args);
648 };
649
650 =head2 all_storages
651
652 Returns an array of all the connected storage backends.  The first element
653 in the returned array is the master, and the rest are each of the
654 replicants.
655
656 =cut
657
658 sub all_storages {
659   my $self = shift @_;
660   return grep {defined $_ && blessed $_} (
661      $self->master,
662      values %{ $self->replicants },
663   );
664 }
665
666 =head2 execute_reliably ($coderef, ?@args)
667
668 Given a coderef, saves the current state of the L</read_handler>, forces it to
669 use reliable storage (e.g. sets it to the master), executes a coderef and then
670 restores the original state.
671
672 Example:
673
674   my $reliably = sub {
675     my $name = shift @_;
676     $schema->resultset('User')->create({name=>$name});
677     my $user_rs = $schema->resultset('User')->find({name=>$name});
678     return $user_rs;
679   };
680
681   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
682
683 Use this when you must be certain of your database state, such as when you just
684 inserted something and need to get a resultset including it, etc.
685
686 =cut
687
688 sub execute_reliably {
689   my $self = shift;
690   my $coderef = shift;
691
692   unless( ref $coderef eq 'CODE') {
693     $self->throw_exception('Second argument must be a coderef');
694   }
695
696   ## replace the current read handler for the remainder of the scope
697   local $self->{read_handler} = $self->master;
698
699   my $args = \@_;
700   return try {
701     $coderef->(@$args);
702   } catch {
703     $self->throw_exception("coderef returned an error: $_");
704   };
705 }
706
707 =head2 set_reliable_storage
708
709 Sets the current $schema to be 'reliable', that is all queries, both read and
710 write are sent to the master
711
712 =cut
713
714 sub set_reliable_storage {
715   my $self = shift @_;
716   my $schema = $self->schema;
717   my $write_handler = $self->schema->storage->write_handler;
718
719   $schema->storage->read_handler($write_handler);
720 }
721
722 =head2 set_balanced_storage
723
724 Sets the current $schema to be use the </balancer> for all reads, while all
725 writes are sent to the master only
726
727 =cut
728
729 sub set_balanced_storage {
730   my $self = shift @_;
731   my $schema = $self->schema;
732   my $balanced_handler = $self->schema->storage->balancer;
733
734   $schema->storage->read_handler($balanced_handler);
735 }
736
737 =head2 connected
738
739 Check that the master and at least one of the replicants is connected.
740
741 =cut
742
743 sub connected {
744   my $self = shift @_;
745   return
746     $self->master->connected &&
747     $self->pool->connected_replicants;
748 }
749
750 =head2 ensure_connected
751
752 Make sure all the storages are connected.
753
754 =cut
755
756 sub ensure_connected {
757   my $self = shift @_;
758   foreach my $source ($self->all_storages) {
759     $source->ensure_connected(@_);
760   }
761 }
762
763 =head2 limit_dialect
764
765 Set the limit_dialect for all existing storages
766
767 =cut
768
769 sub limit_dialect {
770   my $self = shift @_;
771   foreach my $source ($self->all_storages) {
772     $source->limit_dialect(@_);
773   }
774   return $self->master->limit_dialect;
775 }
776
777 =head2 quote_char
778
779 Set the quote_char for all existing storages
780
781 =cut
782
783 sub quote_char {
784   my $self = shift @_;
785   foreach my $source ($self->all_storages) {
786     $source->quote_char(@_);
787   }
788   return $self->master->quote_char;
789 }
790
791 =head2 name_sep
792
793 Set the name_sep for all existing storages
794
795 =cut
796
797 sub name_sep {
798   my $self = shift @_;
799   foreach my $source ($self->all_storages) {
800     $source->name_sep(@_);
801   }
802   return $self->master->name_sep;
803 }
804
805 =head2 set_schema
806
807 Set the schema object for all existing storages
808
809 =cut
810
811 sub set_schema {
812   my $self = shift @_;
813   foreach my $source ($self->all_storages) {
814     $source->set_schema(@_);
815   }
816 }
817
818 =head2 debug
819
820 set a debug flag across all storages
821
822 =cut
823
824 sub debug {
825   my $self = shift @_;
826   if(@_) {
827     foreach my $source ($self->all_storages) {
828       $source->debug(@_);
829     }
830   }
831   return $self->master->debug;
832 }
833
834 =head2 debugobj
835
836 set a debug object
837
838 =cut
839
840 sub debugobj {
841   my $self = shift @_;
842   return $self->master->debugobj(@_);
843 }
844
845 =head2 debugfh
846
847 set a debugfh object
848
849 =cut
850
851 sub debugfh {
852   my $self = shift @_;
853   return $self->master->debugfh(@_);
854 }
855
856 =head2 debugcb
857
858 set a debug callback
859
860 =cut
861
862 sub debugcb {
863   my $self = shift @_;
864   return $self->master->debugcb(@_);
865 }
866
867 =head2 disconnect
868
869 disconnect everything
870
871 =cut
872
873 sub disconnect {
874   my $self = shift @_;
875   foreach my $source ($self->all_storages) {
876     $source->disconnect(@_);
877   }
878 }
879
880 =head2 cursor_class
881
882 set cursor class on all storages, or return master's
883
884 =cut
885
886 sub cursor_class {
887   my ($self, $cursor_class) = @_;
888
889   if ($cursor_class) {
890     $_->cursor_class($cursor_class) for $self->all_storages;
891   }
892   $self->master->cursor_class;
893 }
894
895 =head2 cursor
896
897 set cursor class on all storages, or return master's, alias for L</cursor_class>
898 above.
899
900 =cut
901
902 sub cursor {
903   my ($self, $cursor_class) = @_;
904
905   if ($cursor_class) {
906     $_->cursor($cursor_class) for $self->all_storages;
907   }
908   $self->master->cursor;
909 }
910
911 =head2 unsafe
912
913 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
914 master's current setting
915
916 =cut
917
918 sub unsafe {
919   my $self = shift;
920
921   if (@_) {
922     $_->unsafe(@_) for $self->all_storages;
923   }
924
925   return $self->master->unsafe;
926 }
927
928 =head2 disable_sth_caching
929
930 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
931 or returns master's current setting
932
933 =cut
934
935 sub disable_sth_caching {
936   my $self = shift;
937
938   if (@_) {
939     $_->disable_sth_caching(@_) for $self->all_storages;
940   }
941
942   return $self->master->disable_sth_caching;
943 }
944
945 =head2 lag_behind_master
946
947 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
948 setting
949
950 =cut
951
952 sub lag_behind_master {
953   my $self = shift;
954
955   return max map $_->lag_behind_master, $self->replicants;
956 }
957
958 =head2 is_replicating
959
960 returns true if all replicants return true for
961 L<DBIx::Class::Storage::DBI/is_replicating>
962
963 =cut
964
965 sub is_replicating {
966   my $self = shift;
967
968   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
969 }
970
971 =head2 connect_call_datetime_setup
972
973 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
974
975 =cut
976
977 sub connect_call_datetime_setup {
978   my $self = shift;
979   $_->connect_call_datetime_setup for $self->all_storages;
980 }
981
982 sub _populate_dbh {
983   my $self = shift;
984   $_->_populate_dbh for $self->all_storages;
985 }
986
987 sub _connect {
988   my $self = shift;
989   $_->_connect for $self->all_storages;
990 }
991
992 sub _rebless {
993   my $self = shift;
994   $_->_rebless for $self->all_storages;
995 }
996
997 sub _determine_driver {
998   my $self = shift;
999   $_->_determine_driver for $self->all_storages;
1000 }
1001
1002 sub _driver_determined {
1003   my $self = shift;
1004
1005   if (@_) {
1006     $_->_driver_determined(@_) for $self->all_storages;
1007   }
1008
1009   return $self->master->_driver_determined;
1010 }
1011
1012 sub _init {
1013   my $self = shift;
1014
1015   $_->_init for $self->all_storages;
1016 }
1017
1018 sub _run_connection_actions {
1019   my $self = shift;
1020
1021   $_->_run_connection_actions for $self->all_storages;
1022 }
1023
1024 sub _do_connection_actions {
1025   my $self = shift;
1026
1027   if (@_) {
1028     $_->_do_connection_actions(@_) for $self->all_storages;
1029   }
1030 }
1031
1032 sub connect_call_do_sql {
1033   my $self = shift;
1034   $_->connect_call_do_sql(@_) for $self->all_storages;
1035 }
1036
1037 sub disconnect_call_do_sql {
1038   my $self = shift;
1039   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1040 }
1041
1042 sub _seems_connected {
1043   my $self = shift;
1044
1045   return min map $_->_seems_connected, $self->all_storages;
1046 }
1047
1048 sub _ping {
1049   my $self = shift;
1050
1051   return min map $_->_ping, $self->all_storages;
1052 }
1053
1054 # not using the normalized_version, because we want to preserve
1055 # version numbers much longer than the conventional xxx.yyyzzz
1056 my $numify_ver = sub {
1057   my $ver = shift;
1058   my @numparts = split /\D+/, $ver;
1059   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1060
1061   return sprintf $format, @numparts;
1062 };
1063 sub _server_info {
1064   my $self = shift;
1065
1066   if (not $self->_dbh_details->{info}) {
1067     $self->_dbh_details->{info} = (
1068       reduce { $a->[0] < $b->[0] ? $a : $b }
1069       map [ $numify_ver->($_->{dbms_version}), $_ ],
1070       map $_->_server_info, $self->all_storages
1071     )->[1];
1072   }
1073
1074   return $self->next::method;
1075 }
1076
1077 sub _get_server_version {
1078   my $self = shift;
1079
1080   return $self->_server_info->{dbms_version};
1081 }
1082
1083 =head1 GOTCHAS
1084
1085 Due to the fact that replicants can lag behind a master, you must take care to
1086 make sure you use one of the methods to force read queries to a master should
1087 you need realtime data integrity.  For example, if you insert a row, and then
1088 immediately re-read it from the database (say, by doing
1089 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1090 or you insert a row and then immediately build a query that expects that row
1091 to be an item, you should force the master to handle reads.  Otherwise, due to
1092 the lag, there is no certainty your data will be in the expected state.
1093
1094 For data integrity, all transactions automatically use the master storage for
1095 all read and write queries.  Using a transaction is the preferred and recommended
1096 method to force the master to handle all read queries.
1097
1098 Otherwise, you can force a single query to use the master with the 'force_pool'
1099 attribute:
1100
1101   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1102
1103 This attribute will safely be ignored by non replicated storages, so you can use
1104 the same code for both types of systems.
1105
1106 Lastly, you can use the L</execute_reliably> method, which works very much like
1107 a transaction.
1108
1109 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1110 and L</set_balanced_storage>, however this operates at a global level and is not
1111 suitable if you have a shared Schema object being used by multiple processes,
1112 such as on a web application server.  You can get around this limitation by
1113 using the Schema clone method.
1114
1115   my $new_schema = $schema->clone;
1116   $new_schema->set_reliable_storage;
1117
1118   ## $new_schema will use only the Master storage for all reads/writes while
1119   ## the $schema object will use replicated storage.
1120
1121 =head1 FURTHER QUESTIONS?
1122
1123 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1124
1125 =head1 COPYRIGHT AND LICENSE
1126
1127 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1128 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1129 redistribute it and/or modify it under the same terms as the
1130 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1131
1132 =cut
1133
1134 __PACKAGE__->meta->make_immutable;
1135
1136 1;