70fd834def256bdc1afb5d41054d1d9e49dbb4c9
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use DBIx::Class;
5   die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7 }
8
9 use Moose;
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
16 use Hash::Merge;
17 use List::Util qw/min max reduce/;
18 use Context::Preserve 'preserve_context';
19 use Try::Tiny;
20
21 use namespace::clean -except => 'meta';
22
23 =head1 NAME
24
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26
27 =head1 SYNOPSIS
28
29 The Following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
31 tasks.
32
33 You should set the 'storage_type attribute to a replicated type.  You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
36
37   my $schema = Schema::Class->clone;
38   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
39   $schema->connection(...);
40
41 Next, you need to add in the Replicants.  Basically this is an array of
42 arrayrefs, where each arrayref is database connect information.  Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
44
45   $schema->storage->connect_replicants(
46     [$dsn1, $user, $pass, \%opts],
47     [$dsn2, $user, $pass, \%opts],
48     [$dsn3, $user, $pass, \%opts],
49   );
50
51 Now, just use the $schema as you normally would.  Automatically all reads will
52 be delegated to the replicants, while writes to the master.
53
54   $schema->resultset('Source')->search({name=>'etc'});
55
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'.  For example:
58
59   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60
61 Now $rs will force everything (both reads and writes) to use whatever was setup
62 as the master storage.  'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name.  Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
65
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master.  In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
70
71 If you have a multi-statement read only transaction you can force it to select
72 a random server in the pool by:
73
74   my $rs = $schema->resultset('Source')->search( undef,
75     { force_pool => $db->storage->read_handler->next_storage }
76   );
77
78 =head1 DESCRIPTION
79
80 Warning: This class is marked BETA.  This has been running a production
81 website using MySQL native replication as its backend and we have some decent
82 test coverage but the code hasn't yet been stressed by a variety of databases.
83 Individual DBs may have quirks we are not aware of.  Please use this in first
84 development and pass along your experiences/bug fixes.
85
86 This class implements replicated data store for DBI. Currently you can define
87 one master and numerous slave database connections. All write-type queries
88 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89 database, all read-type queries (SELECTs) go to the slave database.
90
91 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
92 handle gets delegated to one of the two attributes: L</read_handler> or to
93 L</write_handler>.  Additionally, some methods need to be distributed
94 to all existing storages.  This way our storage class is a drop in replacement
95 for L<DBIx::Class::Storage::DBI>.
96
97 Read traffic is spread across the replicants (slaves) occurring to a user
98 selected algorithm.  The default algorithm is random weighted.
99
100 =head1 NOTES
101
102 The consistency between master and replicants is database specific.  The Pool
103 gives you a method to validate its replicants, removing and replacing them
104 when they fail/pass predefined criteria.  Please make careful use of the ways
105 to force a query to run against Master when needed.
106
107 =head1 REQUIREMENTS
108
109 Replicated Storage has additional requirements not currently part of
110 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
111
112 =head1 ATTRIBUTES
113
114 This class defines the following attributes.
115
116 =head2 schema
117
118 The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120 =cut
121
122 has 'schema' => (
123     is=>'rw',
124     isa=>DBICSchema,
125     weak_ref=>1,
126     required=>1,
127 );
128
129 =head2 pool_type
130
131 Contains the classname which will instantiate the L</pool> object.  Defaults
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
133
134 =cut
135
136 has 'pool_type' => (
137   is=>'rw',
138   isa=>ClassName,
139   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
140   handles=>{
141     'create_pool' => 'new',
142   },
143 );
144
145 =head2 pool_args
146
147 Contains a hashref of initialized information to pass to the Balancer object.
148 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
149
150 =cut
151
152 has 'pool_args' => (
153   is=>'rw',
154   isa=>HashRef,
155   lazy=>1,
156   default=>sub { {} },
157 );
158
159
160 =head2 balancer_type
161
162 The replication pool requires a balance class to provider the methods for
163 choose how to spread the query load across each replicant in the pool.
164
165 =cut
166
167 has 'balancer_type' => (
168   is=>'rw',
169   isa=>BalancerClassNamePart,
170   coerce=>1,
171   required=>1,
172   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
173   handles=>{
174     'create_balancer' => 'new',
175   },
176 );
177
178 =head2 balancer_args
179
180 Contains a hashref of initialized information to pass to the Balancer object.
181 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
182
183 =cut
184
185 has 'balancer_args' => (
186   is=>'rw',
187   isa=>HashRef,
188   lazy=>1,
189   required=>1,
190   default=>sub { {} },
191 );
192
193 =head2 pool
194
195 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
196 container class for one or more replicated databases.
197
198 =cut
199
200 has 'pool' => (
201   is=>'ro',
202   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203   lazy_build=>1,
204   handles=>[qw/
205     connect_replicants
206     replicants
207     has_replicants
208   /],
209 );
210
211 =head2 balancer
212
213 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
214 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
215
216 =cut
217
218 has 'balancer' => (
219   is=>'rw',
220   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221   lazy_build=>1,
222   handles=>[qw/auto_validate_every/],
223 );
224
225 =head2 master
226
227 The master defines the canonical state for a pool of connected databases.  All
228 the replicants are expected to match this databases state.  Thus, in a classic
229 Master / Slaves distributed system, all the slaves are expected to replicate
230 the Master's state as quick as possible.  This is the only database in the
231 pool of databases that is allowed to handle write traffic.
232
233 =cut
234
235 has 'master' => (
236   is=> 'ro',
237   isa=>DBICStorageDBI,
238   lazy_build=>1,
239 );
240
241 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
243 The following methods are delegated all the methods required for the
244 L<DBIx::Class::Storage::DBI> interface.
245
246 =cut
247
248 my $method_dispatch = {
249   writer => [qw/
250     on_connect_do
251     on_disconnect_do
252     on_connect_call
253     on_disconnect_call
254     connect_info
255     _connect_info
256     throw_exception
257     sql_maker
258     sqlt_type
259     create_ddl_dir
260     deployment_statements
261     datetime_parser
262     datetime_parser_type
263     build_datetime_parser
264     last_insert_id
265     insert
266     update
267     delete
268     dbh
269     txn_begin
270     txn_do
271     txn_commit
272     txn_rollback
273     txn_scope_guard
274     _exec_txn_rollback
275     _exec_txn_begin
276     _exec_txn_commit
277     deploy
278     with_deferred_fk_checks
279     dbh_do
280     _prep_for_execute
281     is_datatype_numeric
282     _count_select
283     svp_rollback
284     svp_begin
285     svp_release
286     relname_to_table_alias
287     _dbh_last_insert_id
288     _default_dbi_connect_attributes
289     _dbi_connect_info
290     _dbic_connect_attributes
291     auto_savepoint
292     _query_start
293     _query_end
294     _format_for_trace
295     _dbi_attrs_for_bind
296     bind_attribute_by_data_type
297     transaction_depth
298     _dbh
299     _select_args
300     _dbh_execute_for_fetch
301     _sql_maker
302     _dbh_execute_inserts_with_no_binds
303     _select_args_to_query
304     _gen_sql_bind
305     _svp_generate_name
306     _normalize_connect_info
307     _parse_connect_do
308     savepoints
309     _sql_maker_opts
310     _conn_pid
311     _dbh_autocommit
312     _native_data_type
313     _get_dbh
314     sql_maker_class
315     insert_bulk
316     _insert_bulk
317     _execute
318     _do_query
319     _dbh_execute
320   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
321   reader => [qw/
322     select
323     select_single
324     columns_info_for
325     _dbh_columns_info_for
326     _select
327   /],
328   unimplemented => [qw/
329     _arm_global_destructor
330     _verify_pid
331
332     get_use_dbms_capability
333     set_use_dbms_capability
334     get_dbms_capability
335     set_dbms_capability
336     _dbh_details
337     _dbh_get_info
338
339     _determine_connector_driver
340     _extract_driver_from_connect_info
341     _describe_connection
342     _warn_undetermined_driver
343
344     sql_limit_dialect
345     sql_quote_char
346     sql_name_sep
347
348     _prefetch_autovalues
349     _perform_autoinc_retrieval
350     _autoinc_supplied_for_op
351
352     _resolve_bindattrs
353
354     _max_column_bytesize
355     _is_lob_type
356     _is_binary_lob_type
357     _is_binary_type
358     _is_text_lob_type
359
360     _prepare_sth
361     _bind_sth_params
362   /,(
363     # the capability framework
364     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
365     grep
366       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
367       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
368   )],
369 };
370
371 if (DBIx::Class::_ENV_::DBICTEST) {
372
373   my $seen;
374   for my $type (keys %$method_dispatch) {
375     for (@{$method_dispatch->{$type}}) {
376       push @{$seen->{$_}}, $type;
377     }
378   }
379
380   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
381     die(join "\n", '',
382       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
383       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
384       '',
385     );
386   }
387
388   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
389     die(join "\n", '',
390       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
391       @cant,
392       '',
393     );
394   }
395 }
396
397 for my $method (@{$method_dispatch->{unimplemented}}) {
398   __PACKAGE__->meta->add_method($method, sub {
399     my $self = shift;
400     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
401   });
402 }
403
404 =head2 read_handler
405
406 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
407
408 =cut
409
410 has 'read_handler' => (
411   is=>'rw',
412   isa=>Object,
413   lazy_build=>1,
414   handles=>$method_dispatch->{reader},
415 );
416
417 =head2 write_handler
418
419 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
420 as well as methods that don't write or read that can be called on only one
421 storage, methods that return a C<$dbh>, and any methods that don't make sense to
422 run on a replicant.
423
424 =cut
425
426 has 'write_handler' => (
427   is=>'ro',
428   isa=>Object,
429   lazy_build=>1,
430   handles=>$method_dispatch->{writer},
431 );
432
433
434
435 has _master_connect_info_opts =>
436   (is => 'rw', isa => HashRef, default => sub { {} });
437
438 =head2 around: connect_info
439
440 Preserves master's C<connect_info> options (for merging with replicants.)
441 Also sets any Replicated-related options from connect_info, such as
442 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
443
444 =cut
445
446 around connect_info => sub {
447   my ($next, $self, $info, @extra) = @_;
448
449   $self->throw_exception(
450     'connect_info can not be retrieved from a replicated storage - '
451   . 'accessor must be called on a specific pool instance'
452   ) unless defined $info;
453
454   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
455
456   my %opts;
457   for my $arg (@$info) {
458     next unless (reftype($arg)||'') eq 'HASH';
459     %opts = %{ $merge->merge($arg, \%opts) };
460   }
461   delete $opts{dsn};
462
463   if (@opts{qw/pool_type pool_args/}) {
464     $self->pool_type(delete $opts{pool_type})
465       if $opts{pool_type};
466
467     $self->pool_args(
468       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
469     );
470
471     ## Since we possibly changed the pool_args, we need to clear the current
472     ## pool object so that next time it is used it will be rebuilt.
473     $self->clear_pool;
474   }
475
476   if (@opts{qw/balancer_type balancer_args/}) {
477     $self->balancer_type(delete $opts{balancer_type})
478       if $opts{balancer_type};
479
480     $self->balancer_args(
481       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
482     );
483
484     $self->balancer($self->_build_balancer)
485       if $self->balancer;
486   }
487
488   $self->_master_connect_info_opts(\%opts);
489
490   return preserve_context {
491     $self->$next($info, @extra);
492   } after => sub {
493     # Make sure master is blessed into the correct class and apply role to it.
494     my $master = $self->master;
495     $master->_determine_driver;
496     Moose::Meta::Class->initialize(ref $master);
497
498     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
499
500     # link pool back to master
501     $self->pool->master($master);
502   };
503 };
504
505 =head1 METHODS
506
507 This class defines the following methods.
508
509 =head2 BUILDARGS
510
511 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
512 first argument.  So we need to massage the arguments a bit so that all the
513 bits get put into the correct places.
514
515 =cut
516
517 sub BUILDARGS {
518   my ($class, $schema, $storage_type_args, @args) = @_;
519
520   return {
521     schema=>$schema,
522     %$storage_type_args,
523     @args
524   }
525 }
526
527 =head2 _build_master
528
529 Lazy builder for the L</master> attribute.
530
531 =cut
532
533 sub _build_master {
534   my $self = shift @_;
535   my $master = DBIx::Class::Storage::DBI->new($self->schema);
536   $master
537 }
538
539 =head2 _build_pool
540
541 Lazy builder for the L</pool> attribute.
542
543 =cut
544
545 sub _build_pool {
546   my $self = shift @_;
547   $self->create_pool(%{$self->pool_args});
548 }
549
550 =head2 _build_balancer
551
552 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
553 the balancer knows which pool it's balancing.
554
555 =cut
556
557 sub _build_balancer {
558   my $self = shift @_;
559   $self->create_balancer(
560     pool=>$self->pool,
561     master=>$self->master,
562     %{$self->balancer_args},
563   );
564 }
565
566 =head2 _build_write_handler
567
568 Lazy builder for the L</write_handler> attribute.  The default is to set this to
569 the L</master>.
570
571 =cut
572
573 sub _build_write_handler {
574   return shift->master;
575 }
576
577 =head2 _build_read_handler
578
579 Lazy builder for the L</read_handler> attribute.  The default is to set this to
580 the L</balancer>.
581
582 =cut
583
584 sub _build_read_handler {
585   return shift->balancer;
586 }
587
588 =head2 around: connect_replicants
589
590 All calls to connect_replicants needs to have an existing $schema tacked onto
591 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
592 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
593 options merged with the master, with replicant opts having higher priority.
594
595 =cut
596
597 around connect_replicants => sub {
598   my ($next, $self, @args) = @_;
599
600   for my $r (@args) {
601     $r = [ $r ] unless reftype $r eq 'ARRAY';
602
603     $self->throw_exception('coderef replicant connect_info not supported')
604       if ref $r->[0] && reftype $r->[0] eq 'CODE';
605
606 # any connect_info options?
607     my $i = 0;
608     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
609
610 # make one if none
611     $r->[$i] = {} unless $r->[$i];
612
613 # merge if two hashes
614     my @hashes = @$r[$i .. $#{$r}];
615
616     $self->throw_exception('invalid connect_info options')
617       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
618
619     $self->throw_exception('too many hashrefs in connect_info')
620       if @hashes > 2;
621
622     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
623     my %opts = %{ $merge->merge(reverse @hashes) };
624
625 # delete them
626     splice @$r, $i+1, ($#{$r} - $i), ();
627
628 # make sure master/replicants opts don't clash
629     my %master_opts = %{ $self->_master_connect_info_opts };
630     if (exists $opts{dbh_maker}) {
631         delete @master_opts{qw/dsn user password/};
632     }
633     delete $master_opts{dbh_maker};
634
635 # merge with master
636     %opts = %{ $merge->merge(\%opts, \%master_opts) };
637
638 # update
639     $r->[$i] = \%opts;
640   }
641
642   $self->$next($self->schema, @args);
643 };
644
645 =head2 all_storages
646
647 Returns an array of all the connected storage backends.  The first element
648 in the returned array is the master, and the rest are each of the
649 replicants.
650
651 =cut
652
653 sub all_storages {
654   my $self = shift @_;
655   return grep {defined $_ && blessed $_} (
656      $self->master,
657      values %{ $self->replicants },
658   );
659 }
660
661 =head2 execute_reliably ($coderef, ?@args)
662
663 Given a coderef, saves the current state of the L</read_handler>, forces it to
664 use reliable storage (e.g. sets it to the master), executes a coderef and then
665 restores the original state.
666
667 Example:
668
669   my $reliably = sub {
670     my $name = shift @_;
671     $schema->resultset('User')->create({name=>$name});
672     my $user_rs = $schema->resultset('User')->find({name=>$name});
673     return $user_rs;
674   };
675
676   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
677
678 Use this when you must be certain of your database state, such as when you just
679 inserted something and need to get a resultset including it, etc.
680
681 =cut
682
683 sub execute_reliably {
684   my $self = shift;
685   my $coderef = shift;
686
687   unless( ref $coderef eq 'CODE') {
688     $self->throw_exception('Second argument must be a coderef');
689   }
690
691   ## replace the current read handler for the remainder of the scope
692   local $self->{read_handler} = $self->master;
693
694   my $args = \@_;
695   return try {
696     $coderef->(@$args);
697   } catch {
698     $self->throw_exception("coderef returned an error: $_");
699   };
700 }
701
702 =head2 set_reliable_storage
703
704 Sets the current $schema to be 'reliable', that is all queries, both read and
705 write are sent to the master
706
707 =cut
708
709 sub set_reliable_storage {
710   my $self = shift @_;
711   my $schema = $self->schema;
712   my $write_handler = $self->schema->storage->write_handler;
713
714   $schema->storage->read_handler($write_handler);
715 }
716
717 =head2 set_balanced_storage
718
719 Sets the current $schema to be use the </balancer> for all reads, while all
720 writes are sent to the master only
721
722 =cut
723
724 sub set_balanced_storage {
725   my $self = shift @_;
726   my $schema = $self->schema;
727   my $balanced_handler = $self->schema->storage->balancer;
728
729   $schema->storage->read_handler($balanced_handler);
730 }
731
732 =head2 connected
733
734 Check that the master and at least one of the replicants is connected.
735
736 =cut
737
738 sub connected {
739   my $self = shift @_;
740   return
741     $self->master->connected &&
742     $self->pool->connected_replicants;
743 }
744
745 =head2 ensure_connected
746
747 Make sure all the storages are connected.
748
749 =cut
750
751 sub ensure_connected {
752   my $self = shift @_;
753   foreach my $source ($self->all_storages) {
754     $source->ensure_connected(@_);
755   }
756 }
757
758 =head2 limit_dialect
759
760 Set the limit_dialect for all existing storages
761
762 =cut
763
764 sub limit_dialect {
765   my $self = shift @_;
766   foreach my $source ($self->all_storages) {
767     $source->limit_dialect(@_);
768   }
769   return $self->master->limit_dialect;
770 }
771
772 =head2 quote_char
773
774 Set the quote_char for all existing storages
775
776 =cut
777
778 sub quote_char {
779   my $self = shift @_;
780   foreach my $source ($self->all_storages) {
781     $source->quote_char(@_);
782   }
783   return $self->master->quote_char;
784 }
785
786 =head2 name_sep
787
788 Set the name_sep for all existing storages
789
790 =cut
791
792 sub name_sep {
793   my $self = shift @_;
794   foreach my $source ($self->all_storages) {
795     $source->name_sep(@_);
796   }
797   return $self->master->name_sep;
798 }
799
800 =head2 set_schema
801
802 Set the schema object for all existing storages
803
804 =cut
805
806 sub set_schema {
807   my $self = shift @_;
808   foreach my $source ($self->all_storages) {
809     $source->set_schema(@_);
810   }
811 }
812
813 =head2 debug
814
815 set a debug flag across all storages
816
817 =cut
818
819 sub debug {
820   my $self = shift @_;
821   if(@_) {
822     foreach my $source ($self->all_storages) {
823       $source->debug(@_);
824     }
825   }
826   return $self->master->debug;
827 }
828
829 =head2 debugobj
830
831 set a debug object
832
833 =cut
834
835 sub debugobj {
836   my $self = shift @_;
837   return $self->master->debugobj(@_);
838 }
839
840 =head2 debugfh
841
842 set a debugfh object
843
844 =cut
845
846 sub debugfh {
847   my $self = shift @_;
848   return $self->master->debugfh(@_);
849 }
850
851 =head2 debugcb
852
853 set a debug callback
854
855 =cut
856
857 sub debugcb {
858   my $self = shift @_;
859   return $self->master->debugcb(@_);
860 }
861
862 =head2 disconnect
863
864 disconnect everything
865
866 =cut
867
868 sub disconnect {
869   my $self = shift @_;
870   foreach my $source ($self->all_storages) {
871     $source->disconnect(@_);
872   }
873 }
874
875 =head2 cursor_class
876
877 set cursor class on all storages, or return master's
878
879 =cut
880
881 sub cursor_class {
882   my ($self, $cursor_class) = @_;
883
884   if ($cursor_class) {
885     $_->cursor_class($cursor_class) for $self->all_storages;
886   }
887   $self->master->cursor_class;
888 }
889
890 =head2 cursor
891
892 set cursor class on all storages, or return master's, alias for L</cursor_class>
893 above.
894
895 =cut
896
897 sub cursor {
898   my ($self, $cursor_class) = @_;
899
900   if ($cursor_class) {
901     $_->cursor($cursor_class) for $self->all_storages;
902   }
903   $self->master->cursor;
904 }
905
906 =head2 unsafe
907
908 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
909 master's current setting
910
911 =cut
912
913 sub unsafe {
914   my $self = shift;
915
916   if (@_) {
917     $_->unsafe(@_) for $self->all_storages;
918   }
919
920   return $self->master->unsafe;
921 }
922
923 =head2 disable_sth_caching
924
925 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
926 or returns master's current setting
927
928 =cut
929
930 sub disable_sth_caching {
931   my $self = shift;
932
933   if (@_) {
934     $_->disable_sth_caching(@_) for $self->all_storages;
935   }
936
937   return $self->master->disable_sth_caching;
938 }
939
940 =head2 lag_behind_master
941
942 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
943 setting
944
945 =cut
946
947 sub lag_behind_master {
948   my $self = shift;
949
950   return max map $_->lag_behind_master, $self->replicants;
951 }
952
953 =head2 is_replicating
954
955 returns true if all replicants return true for
956 L<DBIx::Class::Storage::DBI/is_replicating>
957
958 =cut
959
960 sub is_replicating {
961   my $self = shift;
962
963   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
964 }
965
966 =head2 connect_call_datetime_setup
967
968 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
969
970 =cut
971
972 sub connect_call_datetime_setup {
973   my $self = shift;
974   $_->connect_call_datetime_setup for $self->all_storages;
975 }
976
977 sub _populate_dbh {
978   my $self = shift;
979   $_->_populate_dbh for $self->all_storages;
980 }
981
982 sub _connect {
983   my $self = shift;
984   $_->_connect for $self->all_storages;
985 }
986
987 sub _rebless {
988   my $self = shift;
989   $_->_rebless for $self->all_storages;
990 }
991
992 sub _determine_driver {
993   my $self = shift;
994   $_->_determine_driver for $self->all_storages;
995 }
996
997 sub _driver_determined {
998   my $self = shift;
999
1000   if (@_) {
1001     $_->_driver_determined(@_) for $self->all_storages;
1002   }
1003
1004   return $self->master->_driver_determined;
1005 }
1006
1007 sub _init {
1008   my $self = shift;
1009
1010   $_->_init for $self->all_storages;
1011 }
1012
1013 sub _run_connection_actions {
1014   my $self = shift;
1015
1016   $_->_run_connection_actions for $self->all_storages;
1017 }
1018
1019 sub _do_connection_actions {
1020   my $self = shift;
1021
1022   if (@_) {
1023     $_->_do_connection_actions(@_) for $self->all_storages;
1024   }
1025 }
1026
1027 sub connect_call_do_sql {
1028   my $self = shift;
1029   $_->connect_call_do_sql(@_) for $self->all_storages;
1030 }
1031
1032 sub disconnect_call_do_sql {
1033   my $self = shift;
1034   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1035 }
1036
1037 sub _seems_connected {
1038   my $self = shift;
1039
1040   return min map $_->_seems_connected, $self->all_storages;
1041 }
1042
1043 sub _ping {
1044   my $self = shift;
1045
1046   return min map $_->_ping, $self->all_storages;
1047 }
1048
1049 # not using the normalized_version, because we want to preserve
1050 # version numbers much longer than the conventional xxx.yyyzzz
1051 my $numify_ver = sub {
1052   my $ver = shift;
1053   my @numparts = split /\D+/, $ver;
1054   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1055
1056   return sprintf $format, @numparts;
1057 };
1058 sub _server_info {
1059   my $self = shift;
1060
1061   if (not $self->_dbh_details->{info}) {
1062     $self->_dbh_details->{info} = (
1063       reduce { $a->[0] < $b->[0] ? $a : $b }
1064       map [ $numify_ver->($_->{dbms_version}), $_ ],
1065       map $_->_server_info, $self->all_storages
1066     )->[1];
1067   }
1068
1069   return $self->next::method;
1070 }
1071
1072 sub _get_server_version {
1073   my $self = shift;
1074
1075   return $self->_server_info->{dbms_version};
1076 }
1077
1078 =head1 GOTCHAS
1079
1080 Due to the fact that replicants can lag behind a master, you must take care to
1081 make sure you use one of the methods to force read queries to a master should
1082 you need realtime data integrity.  For example, if you insert a row, and then
1083 immediately re-read it from the database (say, by doing
1084 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1085 or you insert a row and then immediately build a query that expects that row
1086 to be an item, you should force the master to handle reads.  Otherwise, due to
1087 the lag, there is no certainty your data will be in the expected state.
1088
1089 For data integrity, all transactions automatically use the master storage for
1090 all read and write queries.  Using a transaction is the preferred and recommended
1091 method to force the master to handle all read queries.
1092
1093 Otherwise, you can force a single query to use the master with the 'force_pool'
1094 attribute:
1095
1096   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1097
1098 This attribute will safely be ignored by non replicated storages, so you can use
1099 the same code for both types of systems.
1100
1101 Lastly, you can use the L</execute_reliably> method, which works very much like
1102 a transaction.
1103
1104 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1105 and L</set_balanced_storage>, however this operates at a global level and is not
1106 suitable if you have a shared Schema object being used by multiple processes,
1107 such as on a web application server.  You can get around this limitation by
1108 using the Schema clone method.
1109
1110   my $new_schema = $schema->clone;
1111   $new_schema->set_reliable_storage;
1112
1113   ## $new_schema will use only the Master storage for all reads/writes while
1114   ## the $schema object will use replicated storage.
1115
1116 =head1 FURTHER QUESTIONS?
1117
1118 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1119
1120 =head1 COPYRIGHT AND LICENSE
1121
1122 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1123 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1124 redistribute it and/or modify it under the same terms as the
1125 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1126
1127 =cut
1128
1129 __PACKAGE__->meta->make_immutable;
1130
1131 1;