Merge branch 'master' into topic/constructor_rewrite
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use DBIx::Class;
5   die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7 }
8
9 use Moose;
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
16 use Hash::Merge;
17 use List::Util qw/min max reduce/;
18 use Context::Preserve 'preserve_context';
19 use Try::Tiny;
20
21 use namespace::clean -except => 'meta';
22
23 =head1 NAME
24
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26
27 =head1 SYNOPSIS
28
29 The Following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
31 tasks.
32
33 You should set the 'storage_type attribute to a replicated type.  You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
36
37   my $schema = Schema::Class->clone;
38   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
39   $schema->connection(...);
40
41 Next, you need to add in the Replicants.  Basically this is an array of
42 arrayrefs, where each arrayref is database connect information.  Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
44
45   $schema->storage->connect_replicants(
46     [$dsn1, $user, $pass, \%opts],
47     [$dsn2, $user, $pass, \%opts],
48     [$dsn3, $user, $pass, \%opts],
49   );
50
51 Now, just use the $schema as you normally would.  Automatically all reads will
52 be delegated to the replicants, while writes to the master.
53
54   $schema->resultset('Source')->search({name=>'etc'});
55
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'.  For example:
58
59   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60
61 Now $rs will force everything (both reads and writes) to use whatever was setup
62 as the master storage.  'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name.  Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
65
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master.  In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
70
71 If you have a multi-statement read only transaction you can force it to select
72 a random server in the pool by:
73
74   my $rs = $schema->resultset('Source')->search( undef,
75     { force_pool => $db->storage->read_handler->next_storage }
76   );
77
78 =head1 DESCRIPTION
79
80 Warning: This class is marked BETA.  This has been running a production
81 website using MySQL native replication as its backend and we have some decent
82 test coverage but the code hasn't yet been stressed by a variety of databases.
83 Individual DBs may have quirks we are not aware of.  Please use this in first
84 development and pass along your experiences/bug fixes.
85
86 This class implements replicated data store for DBI. Currently you can define
87 one master and numerous slave database connections. All write-type queries
88 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89 database, all read-type queries (SELECTs) go to the slave database.
90
91 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
92 handle gets delegated to one of the two attributes: L</read_handler> or to
93 L</write_handler>.  Additionally, some methods need to be distributed
94 to all existing storages.  This way our storage class is a drop in replacement
95 for L<DBIx::Class::Storage::DBI>.
96
97 Read traffic is spread across the replicants (slaves) occurring to a user
98 selected algorithm.  The default algorithm is random weighted.
99
100 =head1 NOTES
101
102 The consistency between master and replicants is database specific.  The Pool
103 gives you a method to validate its replicants, removing and replacing them
104 when they fail/pass predefined criteria.  Please make careful use of the ways
105 to force a query to run against Master when needed.
106
107 =head1 REQUIREMENTS
108
109 Replicated Storage has additional requirements not currently part of
110 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
111
112 =head1 ATTRIBUTES
113
114 This class defines the following attributes.
115
116 =head2 schema
117
118 The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120 =cut
121
122 has 'schema' => (
123     is=>'rw',
124     isa=>DBICSchema,
125     weak_ref=>1,
126     required=>1,
127 );
128
129 =head2 pool_type
130
131 Contains the classname which will instantiate the L</pool> object.  Defaults
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
133
134 =cut
135
136 has 'pool_type' => (
137   is=>'rw',
138   isa=>ClassName,
139   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
140   handles=>{
141     'create_pool' => 'new',
142   },
143 );
144
145 =head2 pool_args
146
147 Contains a hashref of initialized information to pass to the Balancer object.
148 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
149
150 =cut
151
152 has 'pool_args' => (
153   is=>'rw',
154   isa=>HashRef,
155   lazy=>1,
156   default=>sub { {} },
157 );
158
159
160 =head2 balancer_type
161
162 The replication pool requires a balance class to provider the methods for
163 choose how to spread the query load across each replicant in the pool.
164
165 =cut
166
167 has 'balancer_type' => (
168   is=>'rw',
169   isa=>BalancerClassNamePart,
170   coerce=>1,
171   required=>1,
172   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
173   handles=>{
174     'create_balancer' => 'new',
175   },
176 );
177
178 =head2 balancer_args
179
180 Contains a hashref of initialized information to pass to the Balancer object.
181 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
182
183 =cut
184
185 has 'balancer_args' => (
186   is=>'rw',
187   isa=>HashRef,
188   lazy=>1,
189   required=>1,
190   default=>sub { {} },
191 );
192
193 =head2 pool
194
195 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
196 container class for one or more replicated databases.
197
198 =cut
199
200 has 'pool' => (
201   is=>'ro',
202   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203   lazy_build=>1,
204   handles=>[qw/
205     connect_replicants
206     replicants
207     has_replicants
208   /],
209 );
210
211 =head2 balancer
212
213 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
214 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
215
216 =cut
217
218 has 'balancer' => (
219   is=>'rw',
220   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221   lazy_build=>1,
222   handles=>[qw/auto_validate_every/],
223 );
224
225 =head2 master
226
227 The master defines the canonical state for a pool of connected databases.  All
228 the replicants are expected to match this databases state.  Thus, in a classic
229 Master / Slaves distributed system, all the slaves are expected to replicate
230 the Master's state as quick as possible.  This is the only database in the
231 pool of databases that is allowed to handle write traffic.
232
233 =cut
234
235 has 'master' => (
236   is=> 'ro',
237   isa=>DBICStorageDBI,
238   lazy_build=>1,
239 );
240
241 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
243 The following methods are delegated all the methods required for the
244 L<DBIx::Class::Storage::DBI> interface.
245
246 =cut
247
248 my $method_dispatch = {
249   writer => [qw/
250     on_connect_do
251     on_disconnect_do
252     on_connect_call
253     on_disconnect_call
254     connect_info
255     _connect_info
256     throw_exception
257     sql_maker
258     sqlt_type
259     create_ddl_dir
260     deployment_statements
261     datetime_parser
262     datetime_parser_type
263     build_datetime_parser
264     last_insert_id
265     insert
266     insert_bulk
267     update
268     delete
269     dbh
270     txn_begin
271     txn_do
272     txn_commit
273     txn_rollback
274     txn_scope_guard
275     _exec_txn_rollback
276     _exec_txn_begin
277     _exec_txn_commit
278     deploy
279     with_deferred_fk_checks
280     dbh_do
281     _prep_for_execute
282     is_datatype_numeric
283     _count_select
284     svp_rollback
285     svp_begin
286     svp_release
287     relname_to_table_alias
288     _dbh_last_insert_id
289     _default_dbi_connect_attributes
290     _dbi_connect_info
291     _dbic_connect_attributes
292     auto_savepoint
293     _query_start
294     _query_end
295     _format_for_trace
296     _dbi_attrs_for_bind
297     bind_attribute_by_data_type
298     transaction_depth
299     _dbh
300     _select_args
301     _dbh_execute_for_fetch
302     _sql_maker
303     _dbh_execute_inserts_with_no_binds
304     _select_args_to_query
305     _gen_sql_bind
306     _svp_generate_name
307     _normalize_connect_info
308     _parse_connect_do
309     savepoints
310     _sql_maker_opts
311     _conn_pid
312     _dbh_autocommit
313     _native_data_type
314     _get_dbh
315     sql_maker_class
316     _execute
317     _do_query
318     _sth
319     _dbh_sth
320     _dbh_execute
321   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
322   reader => [qw/
323     select
324     select_single
325     columns_info_for
326     _dbh_columns_info_for
327     _select
328   /],
329   unimplemented => [qw/
330     _arm_global_destructor
331     _verify_pid
332
333     source_bind_attributes
334
335     get_use_dbms_capability
336     set_use_dbms_capability
337     get_dbms_capability
338     set_dbms_capability
339     _dbh_details
340     _dbh_get_info
341
342     sql_limit_dialect
343     sql_quote_char
344     sql_name_sep
345
346     _prefetch_autovalues
347     _perform_autoinc_retrieval
348     _autoinc_supplied_for_op
349
350     _resolve_bindattrs
351
352     _max_column_bytesize
353     _is_lob_type
354     _is_binary_lob_type
355     _is_text_lob_type
356
357     sth
358   /,(
359     # the capability framework
360     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
361     grep
362       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
363       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
364   )],
365 };
366
367 if (DBIx::Class::_ENV_::DBICTEST) {
368
369   my $seen;
370   for my $type (keys %$method_dispatch) {
371     for (@{$method_dispatch->{$type}}) {
372       push @{$seen->{$_}}, $type;
373     }
374   }
375
376   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
377     die(join "\n", '',
378       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
379       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
380       '',
381     );
382   }
383
384   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
385     die(join "\n", '',
386       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
387       @cant,
388       '',
389     );
390   }
391 }
392
393 for my $method (@{$method_dispatch->{unimplemented}}) {
394   __PACKAGE__->meta->add_method($method, sub {
395     my $self = shift;
396     $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
397   });
398 }
399
400 =head2 read_handler
401
402 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
403
404 =cut
405
406 has 'read_handler' => (
407   is=>'rw',
408   isa=>Object,
409   lazy_build=>1,
410   handles=>$method_dispatch->{reader},
411 );
412
413 =head2 write_handler
414
415 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
416 as well as methods that don't write or read that can be called on only one
417 storage, methods that return a C<$dbh>, and any methods that don't make sense to
418 run on a replicant.
419
420 =cut
421
422 has 'write_handler' => (
423   is=>'ro',
424   isa=>Object,
425   lazy_build=>1,
426   handles=>$method_dispatch->{writer},
427 );
428
429
430
431 has _master_connect_info_opts =>
432   (is => 'rw', isa => HashRef, default => sub { {} });
433
434 =head2 around: connect_info
435
436 Preserves master's C<connect_info> options (for merging with replicants.)
437 Also sets any Replicated-related options from connect_info, such as
438 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
439
440 =cut
441
442 around connect_info => sub {
443   my ($next, $self, $info, @extra) = @_;
444
445   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
446
447   my %opts;
448   for my $arg (@$info) {
449     next unless (reftype($arg)||'') eq 'HASH';
450     %opts = %{ $merge->merge($arg, \%opts) };
451   }
452   delete $opts{dsn};
453
454   if (@opts{qw/pool_type pool_args/}) {
455     $self->pool_type(delete $opts{pool_type})
456       if $opts{pool_type};
457
458     $self->pool_args(
459       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
460     );
461
462     ## Since we possibly changed the pool_args, we need to clear the current
463     ## pool object so that next time it is used it will be rebuilt.
464     $self->clear_pool;
465   }
466
467   if (@opts{qw/balancer_type balancer_args/}) {
468     $self->balancer_type(delete $opts{balancer_type})
469       if $opts{balancer_type};
470
471     $self->balancer_args(
472       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
473     );
474
475     $self->balancer($self->_build_balancer)
476       if $self->balancer;
477   }
478
479   $self->_master_connect_info_opts(\%opts);
480
481   return preserve_context {
482     $self->$next($info, @extra);
483   } after => sub {
484     # Make sure master is blessed into the correct class and apply role to it.
485     my $master = $self->master;
486     $master->_determine_driver;
487     Moose::Meta::Class->initialize(ref $master);
488
489     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
490
491     # link pool back to master
492     $self->pool->master($master);
493   };
494 };
495
496 =head1 METHODS
497
498 This class defines the following methods.
499
500 =head2 BUILDARGS
501
502 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
503 first argument.  So we need to massage the arguments a bit so that all the
504 bits get put into the correct places.
505
506 =cut
507
508 sub BUILDARGS {
509   my ($class, $schema, $storage_type_args, @args) = @_;
510
511   return {
512     schema=>$schema,
513     %$storage_type_args,
514     @args
515   }
516 }
517
518 =head2 _build_master
519
520 Lazy builder for the L</master> attribute.
521
522 =cut
523
524 sub _build_master {
525   my $self = shift @_;
526   my $master = DBIx::Class::Storage::DBI->new($self->schema);
527   $master
528 }
529
530 =head2 _build_pool
531
532 Lazy builder for the L</pool> attribute.
533
534 =cut
535
536 sub _build_pool {
537   my $self = shift @_;
538   $self->create_pool(%{$self->pool_args});
539 }
540
541 =head2 _build_balancer
542
543 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
544 the balancer knows which pool it's balancing.
545
546 =cut
547
548 sub _build_balancer {
549   my $self = shift @_;
550   $self->create_balancer(
551     pool=>$self->pool,
552     master=>$self->master,
553     %{$self->balancer_args},
554   );
555 }
556
557 =head2 _build_write_handler
558
559 Lazy builder for the L</write_handler> attribute.  The default is to set this to
560 the L</master>.
561
562 =cut
563
564 sub _build_write_handler {
565   return shift->master;
566 }
567
568 =head2 _build_read_handler
569
570 Lazy builder for the L</read_handler> attribute.  The default is to set this to
571 the L</balancer>.
572
573 =cut
574
575 sub _build_read_handler {
576   return shift->balancer;
577 }
578
579 =head2 around: connect_replicants
580
581 All calls to connect_replicants needs to have an existing $schema tacked onto
582 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
583 options merged with the master, with replicant opts having higher priority.
584
585 =cut
586
587 around connect_replicants => sub {
588   my ($next, $self, @args) = @_;
589
590   for my $r (@args) {
591     $r = [ $r ] unless reftype $r eq 'ARRAY';
592
593     $self->throw_exception('coderef replicant connect_info not supported')
594       if ref $r->[0] && reftype $r->[0] eq 'CODE';
595
596 # any connect_info options?
597     my $i = 0;
598     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
599
600 # make one if none
601     $r->[$i] = {} unless $r->[$i];
602
603 # merge if two hashes
604     my @hashes = @$r[$i .. $#{$r}];
605
606     $self->throw_exception('invalid connect_info options')
607       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
608
609     $self->throw_exception('too many hashrefs in connect_info')
610       if @hashes > 2;
611
612     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
613     my %opts = %{ $merge->merge(reverse @hashes) };
614
615 # delete them
616     splice @$r, $i+1, ($#{$r} - $i), ();
617
618 # make sure master/replicants opts don't clash
619     my %master_opts = %{ $self->_master_connect_info_opts };
620     if (exists $opts{dbh_maker}) {
621         delete @master_opts{qw/dsn user password/};
622     }
623     delete $master_opts{dbh_maker};
624
625 # merge with master
626     %opts = %{ $merge->merge(\%opts, \%master_opts) };
627
628 # update
629     $r->[$i] = \%opts;
630   }
631
632   $self->$next($self->schema, @args);
633 };
634
635 =head2 all_storages
636
637 Returns an array of of all the connected storage backends.  The first element
638 in the returned array is the master, and the remainings are each of the
639 replicants.
640
641 =cut
642
643 sub all_storages {
644   my $self = shift @_;
645   return grep {defined $_ && blessed $_} (
646      $self->master,
647      values %{ $self->replicants },
648   );
649 }
650
651 =head2 execute_reliably ($coderef, ?@args)
652
653 Given a coderef, saves the current state of the L</read_handler>, forces it to
654 use reliable storage (e.g. sets it to the master), executes a coderef and then
655 restores the original state.
656
657 Example:
658
659   my $reliably = sub {
660     my $name = shift @_;
661     $schema->resultset('User')->create({name=>$name});
662     my $user_rs = $schema->resultset('User')->find({name=>$name});
663     return $user_rs;
664   };
665
666   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
667
668 Use this when you must be certain of your database state, such as when you just
669 inserted something and need to get a resultset including it, etc.
670
671 =cut
672
673 sub execute_reliably {
674   my $self = shift;
675   my $coderef = shift;
676
677   unless( ref $coderef eq 'CODE') {
678     $self->throw_exception('Second argument must be a coderef');
679   }
680
681   ## replace the current read handler for the remainder of the scope
682   local $self->{read_handler} = $self->master;
683
684   my $args = \@_;
685   return try {
686     $coderef->(@$args);
687   } catch {
688     $self->throw_exception("coderef returned an error: $_");
689   };
690 }
691
692 =head2 set_reliable_storage
693
694 Sets the current $schema to be 'reliable', that is all queries, both read and
695 write are sent to the master
696
697 =cut
698
699 sub set_reliable_storage {
700   my $self = shift @_;
701   my $schema = $self->schema;
702   my $write_handler = $self->schema->storage->write_handler;
703
704   $schema->storage->read_handler($write_handler);
705 }
706
707 =head2 set_balanced_storage
708
709 Sets the current $schema to be use the </balancer> for all reads, while all
710 writes are sent to the master only
711
712 =cut
713
714 sub set_balanced_storage {
715   my $self = shift @_;
716   my $schema = $self->schema;
717   my $balanced_handler = $self->schema->storage->balancer;
718
719   $schema->storage->read_handler($balanced_handler);
720 }
721
722 =head2 connected
723
724 Check that the master and at least one of the replicants is connected.
725
726 =cut
727
728 sub connected {
729   my $self = shift @_;
730   return
731     $self->master->connected &&
732     $self->pool->connected_replicants;
733 }
734
735 =head2 ensure_connected
736
737 Make sure all the storages are connected.
738
739 =cut
740
741 sub ensure_connected {
742   my $self = shift @_;
743   foreach my $source ($self->all_storages) {
744     $source->ensure_connected(@_);
745   }
746 }
747
748 =head2 limit_dialect
749
750 Set the limit_dialect for all existing storages
751
752 =cut
753
754 sub limit_dialect {
755   my $self = shift @_;
756   foreach my $source ($self->all_storages) {
757     $source->limit_dialect(@_);
758   }
759   return $self->master->limit_dialect;
760 }
761
762 =head2 quote_char
763
764 Set the quote_char for all existing storages
765
766 =cut
767
768 sub quote_char {
769   my $self = shift @_;
770   foreach my $source ($self->all_storages) {
771     $source->quote_char(@_);
772   }
773   return $self->master->quote_char;
774 }
775
776 =head2 name_sep
777
778 Set the name_sep for all existing storages
779
780 =cut
781
782 sub name_sep {
783   my $self = shift @_;
784   foreach my $source ($self->all_storages) {
785     $source->name_sep(@_);
786   }
787   return $self->master->name_sep;
788 }
789
790 =head2 set_schema
791
792 Set the schema object for all existing storages
793
794 =cut
795
796 sub set_schema {
797   my $self = shift @_;
798   foreach my $source ($self->all_storages) {
799     $source->set_schema(@_);
800   }
801 }
802
803 =head2 debug
804
805 set a debug flag across all storages
806
807 =cut
808
809 sub debug {
810   my $self = shift @_;
811   if(@_) {
812     foreach my $source ($self->all_storages) {
813       $source->debug(@_);
814     }
815   }
816   return $self->master->debug;
817 }
818
819 =head2 debugobj
820
821 set a debug object
822
823 =cut
824
825 sub debugobj {
826   my $self = shift @_;
827   return $self->master->debugobj(@_);
828 }
829
830 =head2 debugfh
831
832 set a debugfh object
833
834 =cut
835
836 sub debugfh {
837   my $self = shift @_;
838   return $self->master->debugfh(@_);
839 }
840
841 =head2 debugcb
842
843 set a debug callback
844
845 =cut
846
847 sub debugcb {
848   my $self = shift @_;
849   return $self->master->debugcb(@_);
850 }
851
852 =head2 disconnect
853
854 disconnect everything
855
856 =cut
857
858 sub disconnect {
859   my $self = shift @_;
860   foreach my $source ($self->all_storages) {
861     $source->disconnect(@_);
862   }
863 }
864
865 =head2 cursor_class
866
867 set cursor class on all storages, or return master's
868
869 =cut
870
871 sub cursor_class {
872   my ($self, $cursor_class) = @_;
873
874   if ($cursor_class) {
875     $_->cursor_class($cursor_class) for $self->all_storages;
876   }
877   $self->master->cursor_class;
878 }
879
880 =head2 cursor
881
882 set cursor class on all storages, or return master's, alias for L</cursor_class>
883 above.
884
885 =cut
886
887 sub cursor {
888   my ($self, $cursor_class) = @_;
889
890   if ($cursor_class) {
891     $_->cursor($cursor_class) for $self->all_storages;
892   }
893   $self->master->cursor;
894 }
895
896 =head2 unsafe
897
898 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
899 master's current setting
900
901 =cut
902
903 sub unsafe {
904   my $self = shift;
905
906   if (@_) {
907     $_->unsafe(@_) for $self->all_storages;
908   }
909
910   return $self->master->unsafe;
911 }
912
913 =head2 disable_sth_caching
914
915 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
916 or returns master's current setting
917
918 =cut
919
920 sub disable_sth_caching {
921   my $self = shift;
922
923   if (@_) {
924     $_->disable_sth_caching(@_) for $self->all_storages;
925   }
926
927   return $self->master->disable_sth_caching;
928 }
929
930 =head2 lag_behind_master
931
932 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
933 setting
934
935 =cut
936
937 sub lag_behind_master {
938   my $self = shift;
939
940   return max map $_->lag_behind_master, $self->replicants;
941 }
942
943 =head2 is_replicating
944
945 returns true if all replicants return true for
946 L<DBIx::Class::Storage::DBI/is_replicating>
947
948 =cut
949
950 sub is_replicating {
951   my $self = shift;
952
953   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
954 }
955
956 =head2 connect_call_datetime_setup
957
958 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
959
960 =cut
961
962 sub connect_call_datetime_setup {
963   my $self = shift;
964   $_->connect_call_datetime_setup for $self->all_storages;
965 }
966
967 sub _populate_dbh {
968   my $self = shift;
969   $_->_populate_dbh for $self->all_storages;
970 }
971
972 sub _connect {
973   my $self = shift;
974   $_->_connect for $self->all_storages;
975 }
976
977 sub _rebless {
978   my $self = shift;
979   $_->_rebless for $self->all_storages;
980 }
981
982 sub _determine_driver {
983   my $self = shift;
984   $_->_determine_driver for $self->all_storages;
985 }
986
987 sub _driver_determined {
988   my $self = shift;
989
990   if (@_) {
991     $_->_driver_determined(@_) for $self->all_storages;
992   }
993
994   return $self->master->_driver_determined;
995 }
996
997 sub _init {
998   my $self = shift;
999
1000   $_->_init for $self->all_storages;
1001 }
1002
1003 sub _run_connection_actions {
1004   my $self = shift;
1005
1006   $_->_run_connection_actions for $self->all_storages;
1007 }
1008
1009 sub _do_connection_actions {
1010   my $self = shift;
1011
1012   if (@_) {
1013     $_->_do_connection_actions(@_) for $self->all_storages;
1014   }
1015 }
1016
1017 sub connect_call_do_sql {
1018   my $self = shift;
1019   $_->connect_call_do_sql(@_) for $self->all_storages;
1020 }
1021
1022 sub disconnect_call_do_sql {
1023   my $self = shift;
1024   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1025 }
1026
1027 sub _seems_connected {
1028   my $self = shift;
1029
1030   return min map $_->_seems_connected, $self->all_storages;
1031 }
1032
1033 sub _ping {
1034   my $self = shift;
1035
1036   return min map $_->_ping, $self->all_storages;
1037 }
1038
1039 # not using the normalized_version, because we want to preserve
1040 # version numbers much longer than the conventional xxx.yyyzzz
1041 my $numify_ver = sub {
1042   my $ver = shift;
1043   my @numparts = split /\D+/, $ver;
1044   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1045
1046   return sprintf $format, @numparts;
1047 };
1048 sub _server_info {
1049   my $self = shift;
1050
1051   if (not $self->_dbh_details->{info}) {
1052     $self->_dbh_details->{info} = (
1053       reduce { $a->[0] < $b->[0] ? $a : $b }
1054       map [ $numify_ver->($_->{dbms_version}), $_ ],
1055       map $_->_server_info, $self->all_storages
1056     )->[1];
1057   }
1058
1059   return $self->next::method;
1060 }
1061
1062 sub _get_server_version {
1063   my $self = shift;
1064
1065   return $self->_server_info->{dbms_version};
1066 }
1067
1068 =head1 GOTCHAS
1069
1070 Due to the fact that replicants can lag behind a master, you must take care to
1071 make sure you use one of the methods to force read queries to a master should
1072 you need realtime data integrity.  For example, if you insert a row, and then
1073 immediately re-read it from the database (say, by doing $row->discard_changes)
1074 or you insert a row and then immediately build a query that expects that row
1075 to be an item, you should force the master to handle reads.  Otherwise, due to
1076 the lag, there is no certainty your data will be in the expected state.
1077
1078 For data integrity, all transactions automatically use the master storage for
1079 all read and write queries.  Using a transaction is the preferred and recommended
1080 method to force the master to handle all read queries.
1081
1082 Otherwise, you can force a single query to use the master with the 'force_pool'
1083 attribute:
1084
1085   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1086
1087 This attribute will safely be ignore by non replicated storages, so you can use
1088 the same code for both types of systems.
1089
1090 Lastly, you can use the L</execute_reliably> method, which works very much like
1091 a transaction.
1092
1093 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1094 and L</set_balanced_storage>, however this operates at a global level and is not
1095 suitable if you have a shared Schema object being used by multiple processes,
1096 such as on a web application server.  You can get around this limitation by
1097 using the Schema clone method.
1098
1099   my $new_schema = $schema->clone;
1100   $new_schema->set_reliable_storage;
1101
1102   ## $new_schema will use only the Master storage for all reads/writes while
1103   ## the $schema object will use replicated storage.
1104
1105 =head1 AUTHOR
1106
1107   John Napiorkowski <john.napiorkowski@takkle.com>
1108
1109 Based on code originated by:
1110
1111   Norbert Csongrádi <bert@cpan.org>
1112   Peter Siklósi <einon@einon.hu>
1113
1114 =head1 LICENSE
1115
1116 You may distribute this code under the same terms as Perl itself.
1117
1118 =cut
1119
1120 __PACKAGE__->meta->make_immutable;
1121
1122 1;