Cleanup Oracle's 00a28188 / add support for update/delete with blobs in WHERE
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use DBIx::Class;
5   die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7 }
8
9 use Moose;
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
16 use Hash::Merge;
17 use List::Util qw/min max reduce/;
18 use Try::Tiny;
19 use namespace::clean;
20
21 use namespace::clean -except => 'meta';
22
23 =head1 NAME
24
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26
27 =head1 SYNOPSIS
28
29 The Following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
31 tasks.
32
33 You should set the 'storage_type attribute to a replicated type.  You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
36
37   my $schema = Schema::Class->clone;
38   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
39   $schema->connection(...);
40
41 Next, you need to add in the Replicants.  Basically this is an array of
42 arrayrefs, where each arrayref is database connect information.  Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
44
45   $schema->storage->connect_replicants(
46     [$dsn1, $user, $pass, \%opts],
47     [$dsn2, $user, $pass, \%opts],
48     [$dsn3, $user, $pass, \%opts],
49   );
50
51 Now, just use the $schema as you normally would.  Automatically all reads will
52 be delegated to the replicants, while writes to the master.
53
54   $schema->resultset('Source')->search({name=>'etc'});
55
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'.  For example:
58
59   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60
61 Now $RS will force everything (both reads and writes) to use whatever was setup
62 as the master storage.  'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name.  Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
65
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master.  In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
70
71 See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
72 walkthroughs.
73
74 =head1 DESCRIPTION
75
76 Warning: This class is marked BETA.  This has been running a production
77 website using MySQL native replication as its backend and we have some decent
78 test coverage but the code hasn't yet been stressed by a variety of databases.
79 Individual DBs may have quirks we are not aware of.  Please use this in first
80 development and pass along your experiences/bug fixes.
81
82 This class implements replicated data store for DBI. Currently you can define
83 one master and numerous slave database connections. All write-type queries
84 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
85 database, all read-type queries (SELECTs) go to the slave database.
86
87 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
88 handle gets delegated to one of the two attributes: L</read_handler> or to
89 L</write_handler>.  Additionally, some methods need to be distributed
90 to all existing storages.  This way our storage class is a drop in replacement
91 for L<DBIx::Class::Storage::DBI>.
92
93 Read traffic is spread across the replicants (slaves) occurring to a user
94 selected algorithm.  The default algorithm is random weighted.
95
96 =head1 NOTES
97
98 The consistency between master and replicants is database specific.  The Pool
99 gives you a method to validate its replicants, removing and replacing them
100 when they fail/pass predefined criteria.  Please make careful use of the ways
101 to force a query to run against Master when needed.
102
103 =head1 REQUIREMENTS
104
105 Replicated Storage has additional requirements not currently part of
106 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
107
108 =head1 ATTRIBUTES
109
110 This class defines the following attributes.
111
112 =head2 schema
113
114 The underlying L<DBIx::Class::Schema> object this storage is attaching
115
116 =cut
117
118 has 'schema' => (
119     is=>'rw',
120     isa=>DBICSchema,
121     weak_ref=>1,
122     required=>1,
123 );
124
125 =head2 pool_type
126
127 Contains the classname which will instantiate the L</pool> object.  Defaults
128 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
129
130 =cut
131
132 has 'pool_type' => (
133   is=>'rw',
134   isa=>ClassName,
135   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
136   handles=>{
137     'create_pool' => 'new',
138   },
139 );
140
141 =head2 pool_args
142
143 Contains a hashref of initialized information to pass to the Balancer object.
144 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
145
146 =cut
147
148 has 'pool_args' => (
149   is=>'rw',
150   isa=>HashRef,
151   lazy=>1,
152   default=>sub { {} },
153 );
154
155
156 =head2 balancer_type
157
158 The replication pool requires a balance class to provider the methods for
159 choose how to spread the query load across each replicant in the pool.
160
161 =cut
162
163 has 'balancer_type' => (
164   is=>'rw',
165   isa=>BalancerClassNamePart,
166   coerce=>1,
167   required=>1,
168   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
169   handles=>{
170     'create_balancer' => 'new',
171   },
172 );
173
174 =head2 balancer_args
175
176 Contains a hashref of initialized information to pass to the Balancer object.
177 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
178
179 =cut
180
181 has 'balancer_args' => (
182   is=>'rw',
183   isa=>HashRef,
184   lazy=>1,
185   required=>1,
186   default=>sub { {} },
187 );
188
189 =head2 pool
190
191 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
192 container class for one or more replicated databases.
193
194 =cut
195
196 has 'pool' => (
197   is=>'ro',
198   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199   lazy_build=>1,
200   handles=>[qw/
201     connect_replicants
202     replicants
203     has_replicants
204   /],
205 );
206
207 =head2 balancer
208
209 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
210 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
211
212 =cut
213
214 has 'balancer' => (
215   is=>'rw',
216   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
217   lazy_build=>1,
218   handles=>[qw/auto_validate_every/],
219 );
220
221 =head2 master
222
223 The master defines the canonical state for a pool of connected databases.  All
224 the replicants are expected to match this databases state.  Thus, in a classic
225 Master / Slaves distributed system, all the slaves are expected to replicate
226 the Master's state as quick as possible.  This is the only database in the
227 pool of databases that is allowed to handle write traffic.
228
229 =cut
230
231 has 'master' => (
232   is=> 'ro',
233   isa=>DBICStorageDBI,
234   lazy_build=>1,
235 );
236
237 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
238
239 The following methods are delegated all the methods required for the
240 L<DBIx::Class::Storage::DBI> interface.
241
242 =cut
243
244 my $method_dispatch = {
245   writer => [qw/
246     on_connect_do
247     on_disconnect_do
248     on_connect_call
249     on_disconnect_call
250     connect_info
251     _connect_info
252     throw_exception
253     sql_maker
254     sqlt_type
255     create_ddl_dir
256     deployment_statements
257     datetime_parser
258     datetime_parser_type
259     build_datetime_parser
260     last_insert_id
261     insert
262     insert_bulk
263     update
264     delete
265     dbh
266     txn_begin
267     txn_do
268     txn_commit
269     txn_rollback
270     txn_scope_guard
271     _exec_txn_rollback
272     _exec_txn_begin
273     _exec_txn_commit
274     deploy
275     with_deferred_fk_checks
276     dbh_do
277     _prep_for_execute
278     is_datatype_numeric
279     _count_select
280     _subq_update_delete
281     svp_rollback
282     svp_begin
283     svp_release
284     relname_to_table_alias
285     _dbh_last_insert_id
286     _default_dbi_connect_attributes
287     _dbi_connect_info
288     _dbic_connect_attributes
289     auto_savepoint
290     _query_start
291     _query_end
292     _format_for_trace
293     _dbi_attrs_for_bind
294     bind_attribute_by_data_type
295     transaction_depth
296     _dbh
297     _select_args
298     _dbh_execute_array
299     _sql_maker
300     _per_row_update_delete
301     _dbh_execute_inserts_with_no_binds
302     _select_args_to_query
303     _gen_sql_bind
304     _svp_generate_name
305     _multipk_update_delete
306     _normalize_connect_info
307     _parse_connect_do
308     _execute_array
309     savepoints
310     _sql_maker_opts
311     _conn_pid
312     _dbh_autocommit
313     _native_data_type
314     _get_dbh
315     sql_maker_class
316     _adjust_select_args_for_complex_prefetch
317     _resolve_ident_sources
318     _resolve_column_info
319     _prune_unused_joins
320     _strip_cond_qualifiers
321     _strip_cond_qualifiers_from_array
322     _resolve_aliastypes_from_select_args
323     _execute
324     _do_query
325     _sth
326     _dbh_sth
327     _dbh_execute
328   /],
329   reader => [qw/
330     select
331     select_single
332     columns_info_for
333     _dbh_columns_info_for
334     _select
335   /],
336   unimplemented => [qw/
337     _arm_global_destructor
338     _verify_pid
339
340     source_bind_attributes
341
342     get_use_dbms_capability
343     set_use_dbms_capability
344     get_dbms_capability
345     set_dbms_capability
346     _dbh_details
347     _dbh_get_info
348
349     sql_limit_dialect
350     sql_quote_char
351     sql_name_sep
352
353     _inner_join_to_node
354     _group_over_selection
355     _extract_order_criteria
356
357     _prefetch_autovalues
358
359     _max_column_bytesize
360     _is_lob_type
361     _is_binary_lob_type
362     _is_text_lob_type
363
364     sth
365   /,(
366     # the capability framework
367     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
368     grep
369       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
370       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
371   )],
372 };
373
374 if (DBIx::Class::_ENV_::DBICTEST) {
375
376   my $seen;
377   for my $type (keys %$method_dispatch) {
378     for (@{$method_dispatch->{$type}}) {
379       push @{$seen->{$_}}, $type;
380     }
381   }
382
383   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
384     die(join "\n", '',
385       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
386       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
387       '',
388     );
389   }
390
391   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
392     die(join "\n", '',
393       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
394       @cant,
395       '',
396     );
397   }
398 }
399
400 for my $method (@{$method_dispatch->{unimplemented}}) {
401   __PACKAGE__->meta->add_method($method, sub {
402     my $self = shift;
403     $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
404   });
405 }
406
407 =head2 read_handler
408
409 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
410
411 =cut
412
413 has 'read_handler' => (
414   is=>'rw',
415   isa=>Object,
416   lazy_build=>1,
417   handles=>$method_dispatch->{reader},
418 );
419
420 =head2 write_handler
421
422 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
423 as well as methods that don't write or read that can be called on only one
424 storage, methods that return a C<$dbh>, and any methods that don't make sense to
425 run on a replicant.
426
427 =cut
428
429 has 'write_handler' => (
430   is=>'ro',
431   isa=>Object,
432   lazy_build=>1,
433   handles=>$method_dispatch->{writer},
434 );
435
436
437
438 has _master_connect_info_opts =>
439   (is => 'rw', isa => HashRef, default => sub { {} });
440
441 =head2 around: connect_info
442
443 Preserves master's C<connect_info> options (for merging with replicants.)
444 Also sets any Replicated-related options from connect_info, such as
445 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
446
447 =cut
448
449 around connect_info => sub {
450   my ($next, $self, $info, @extra) = @_;
451
452   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
453
454   my %opts;
455   for my $arg (@$info) {
456     next unless (reftype($arg)||'') eq 'HASH';
457     %opts = %{ $merge->merge($arg, \%opts) };
458   }
459   delete $opts{dsn};
460
461   if (@opts{qw/pool_type pool_args/}) {
462     $self->pool_type(delete $opts{pool_type})
463       if $opts{pool_type};
464
465     $self->pool_args(
466       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
467     );
468
469     ## Since we possibly changed the pool_args, we need to clear the current
470     ## pool object so that next time it is used it will be rebuilt.
471     $self->clear_pool;
472   }
473
474   if (@opts{qw/balancer_type balancer_args/}) {
475     $self->balancer_type(delete $opts{balancer_type})
476       if $opts{balancer_type};
477
478     $self->balancer_args(
479       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
480     );
481
482     $self->balancer($self->_build_balancer)
483       if $self->balancer;
484   }
485
486   $self->_master_connect_info_opts(\%opts);
487
488   my @res;
489   if (wantarray) {
490     @res = $self->$next($info, @extra);
491   } else {
492     $res[0] = $self->$next($info, @extra);
493   }
494
495   # Make sure master is blessed into the correct class and apply role to it.
496   my $master = $self->master;
497   $master->_determine_driver;
498   Moose::Meta::Class->initialize(ref $master);
499
500   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
501
502   # link pool back to master
503   $self->pool->master($master);
504
505   wantarray ? @res : $res[0];
506 };
507
508 =head1 METHODS
509
510 This class defines the following methods.
511
512 =head2 BUILDARGS
513
514 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
515 first argument.  So we need to massage the arguments a bit so that all the
516 bits get put into the correct places.
517
518 =cut
519
520 sub BUILDARGS {
521   my ($class, $schema, $storage_type_args, @args) = @_;
522
523   return {
524     schema=>$schema,
525     %$storage_type_args,
526     @args
527   }
528 }
529
530 =head2 _build_master
531
532 Lazy builder for the L</master> attribute.
533
534 =cut
535
536 sub _build_master {
537   my $self = shift @_;
538   my $master = DBIx::Class::Storage::DBI->new($self->schema);
539   $master
540 }
541
542 =head2 _build_pool
543
544 Lazy builder for the L</pool> attribute.
545
546 =cut
547
548 sub _build_pool {
549   my $self = shift @_;
550   $self->create_pool(%{$self->pool_args});
551 }
552
553 =head2 _build_balancer
554
555 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
556 the balancer knows which pool it's balancing.
557
558 =cut
559
560 sub _build_balancer {
561   my $self = shift @_;
562   $self->create_balancer(
563     pool=>$self->pool,
564     master=>$self->master,
565     %{$self->balancer_args},
566   );
567 }
568
569 =head2 _build_write_handler
570
571 Lazy builder for the L</write_handler> attribute.  The default is to set this to
572 the L</master>.
573
574 =cut
575
576 sub _build_write_handler {
577   return shift->master;
578 }
579
580 =head2 _build_read_handler
581
582 Lazy builder for the L</read_handler> attribute.  The default is to set this to
583 the L</balancer>.
584
585 =cut
586
587 sub _build_read_handler {
588   return shift->balancer;
589 }
590
591 =head2 around: connect_replicants
592
593 All calls to connect_replicants needs to have an existing $schema tacked onto
594 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
595 options merged with the master, with replicant opts having higher priority.
596
597 =cut
598
599 around connect_replicants => sub {
600   my ($next, $self, @args) = @_;
601
602   for my $r (@args) {
603     $r = [ $r ] unless reftype $r eq 'ARRAY';
604
605     $self->throw_exception('coderef replicant connect_info not supported')
606       if ref $r->[0] && reftype $r->[0] eq 'CODE';
607
608 # any connect_info options?
609     my $i = 0;
610     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
611
612 # make one if none
613     $r->[$i] = {} unless $r->[$i];
614
615 # merge if two hashes
616     my @hashes = @$r[$i .. $#{$r}];
617
618     $self->throw_exception('invalid connect_info options')
619       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
620
621     $self->throw_exception('too many hashrefs in connect_info')
622       if @hashes > 2;
623
624     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
625     my %opts = %{ $merge->merge(reverse @hashes) };
626
627 # delete them
628     splice @$r, $i+1, ($#{$r} - $i), ();
629
630 # make sure master/replicants opts don't clash
631     my %master_opts = %{ $self->_master_connect_info_opts };
632     if (exists $opts{dbh_maker}) {
633         delete @master_opts{qw/dsn user password/};
634     }
635     delete $master_opts{dbh_maker};
636
637 # merge with master
638     %opts = %{ $merge->merge(\%opts, \%master_opts) };
639
640 # update
641     $r->[$i] = \%opts;
642   }
643
644   $self->$next($self->schema, @args);
645 };
646
647 =head2 all_storages
648
649 Returns an array of of all the connected storage backends.  The first element
650 in the returned array is the master, and the remainings are each of the
651 replicants.
652
653 =cut
654
655 sub all_storages {
656   my $self = shift @_;
657   return grep {defined $_ && blessed $_} (
658      $self->master,
659      values %{ $self->replicants },
660   );
661 }
662
663 =head2 execute_reliably ($coderef, ?@args)
664
665 Given a coderef, saves the current state of the L</read_handler>, forces it to
666 use reliable storage (e.g. sets it to the master), executes a coderef and then
667 restores the original state.
668
669 Example:
670
671   my $reliably = sub {
672     my $name = shift @_;
673     $schema->resultset('User')->create({name=>$name});
674     my $user_rs = $schema->resultset('User')->find({name=>$name});
675     return $user_rs;
676   };
677
678   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
679
680 Use this when you must be certain of your database state, such as when you just
681 inserted something and need to get a resultset including it, etc.
682
683 =cut
684
685 sub execute_reliably {
686   my ($self, $coderef, @args) = @_;
687
688   unless( ref $coderef eq 'CODE') {
689     $self->throw_exception('Second argument must be a coderef');
690   }
691
692   ##Get copy of master storage
693   my $master = $self->master;
694
695   ##Get whatever the current read hander is
696   my $current = $self->read_handler;
697
698   ##Set the read handler to master
699   $self->read_handler($master);
700
701   ## do whatever the caller needs
702   my @result;
703   my $want_array = wantarray;
704
705   try {
706     if($want_array) {
707       @result = $coderef->(@args);
708     } elsif(defined $want_array) {
709       ($result[0]) = ($coderef->(@args));
710     } else {
711       $coderef->(@args);
712     }
713   } catch {
714     $self->throw_exception("coderef returned an error: $_");
715   } finally {
716     ##Reset to the original state
717     $self->read_handler($current);
718   };
719
720   return wantarray ? @result : $result[0];
721 }
722
723 =head2 set_reliable_storage
724
725 Sets the current $schema to be 'reliable', that is all queries, both read and
726 write are sent to the master
727
728 =cut
729
730 sub set_reliable_storage {
731   my $self = shift @_;
732   my $schema = $self->schema;
733   my $write_handler = $self->schema->storage->write_handler;
734
735   $schema->storage->read_handler($write_handler);
736 }
737
738 =head2 set_balanced_storage
739
740 Sets the current $schema to be use the </balancer> for all reads, while all
741 writes are sent to the master only
742
743 =cut
744
745 sub set_balanced_storage {
746   my $self = shift @_;
747   my $schema = $self->schema;
748   my $balanced_handler = $self->schema->storage->balancer;
749
750   $schema->storage->read_handler($balanced_handler);
751 }
752
753 =head2 connected
754
755 Check that the master and at least one of the replicants is connected.
756
757 =cut
758
759 sub connected {
760   my $self = shift @_;
761   return
762     $self->master->connected &&
763     $self->pool->connected_replicants;
764 }
765
766 =head2 ensure_connected
767
768 Make sure all the storages are connected.
769
770 =cut
771
772 sub ensure_connected {
773   my $self = shift @_;
774   foreach my $source ($self->all_storages) {
775     $source->ensure_connected(@_);
776   }
777 }
778
779 =head2 limit_dialect
780
781 Set the limit_dialect for all existing storages
782
783 =cut
784
785 sub limit_dialect {
786   my $self = shift @_;
787   foreach my $source ($self->all_storages) {
788     $source->limit_dialect(@_);
789   }
790   return $self->master->limit_dialect;
791 }
792
793 =head2 quote_char
794
795 Set the quote_char for all existing storages
796
797 =cut
798
799 sub quote_char {
800   my $self = shift @_;
801   foreach my $source ($self->all_storages) {
802     $source->quote_char(@_);
803   }
804   return $self->master->quote_char;
805 }
806
807 =head2 name_sep
808
809 Set the name_sep for all existing storages
810
811 =cut
812
813 sub name_sep {
814   my $self = shift @_;
815   foreach my $source ($self->all_storages) {
816     $source->name_sep(@_);
817   }
818   return $self->master->name_sep;
819 }
820
821 =head2 set_schema
822
823 Set the schema object for all existing storages
824
825 =cut
826
827 sub set_schema {
828   my $self = shift @_;
829   foreach my $source ($self->all_storages) {
830     $source->set_schema(@_);
831   }
832 }
833
834 =head2 debug
835
836 set a debug flag across all storages
837
838 =cut
839
840 sub debug {
841   my $self = shift @_;
842   if(@_) {
843     foreach my $source ($self->all_storages) {
844       $source->debug(@_);
845     }
846   }
847   return $self->master->debug;
848 }
849
850 =head2 debugobj
851
852 set a debug object
853
854 =cut
855
856 sub debugobj {
857   my $self = shift @_;
858   return $self->master->debugobj(@_);
859 }
860
861 =head2 debugfh
862
863 set a debugfh object
864
865 =cut
866
867 sub debugfh {
868   my $self = shift @_;
869   return $self->master->debugfh(@_);
870 }
871
872 =head2 debugcb
873
874 set a debug callback
875
876 =cut
877
878 sub debugcb {
879   my $self = shift @_;
880   return $self->master->debugcb(@_);
881 }
882
883 =head2 disconnect
884
885 disconnect everything
886
887 =cut
888
889 sub disconnect {
890   my $self = shift @_;
891   foreach my $source ($self->all_storages) {
892     $source->disconnect(@_);
893   }
894 }
895
896 =head2 cursor_class
897
898 set cursor class on all storages, or return master's
899
900 =cut
901
902 sub cursor_class {
903   my ($self, $cursor_class) = @_;
904
905   if ($cursor_class) {
906     $_->cursor_class($cursor_class) for $self->all_storages;
907   }
908   $self->master->cursor_class;
909 }
910
911 =head2 cursor
912
913 set cursor class on all storages, or return master's, alias for L</cursor_class>
914 above.
915
916 =cut
917
918 sub cursor {
919   my ($self, $cursor_class) = @_;
920
921   if ($cursor_class) {
922     $_->cursor($cursor_class) for $self->all_storages;
923   }
924   $self->master->cursor;
925 }
926
927 =head2 unsafe
928
929 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
930 master's current setting
931
932 =cut
933
934 sub unsafe {
935   my $self = shift;
936
937   if (@_) {
938     $_->unsafe(@_) for $self->all_storages;
939   }
940
941   return $self->master->unsafe;
942 }
943
944 =head2 disable_sth_caching
945
946 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
947 or returns master's current setting
948
949 =cut
950
951 sub disable_sth_caching {
952   my $self = shift;
953
954   if (@_) {
955     $_->disable_sth_caching(@_) for $self->all_storages;
956   }
957
958   return $self->master->disable_sth_caching;
959 }
960
961 =head2 lag_behind_master
962
963 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
964 setting
965
966 =cut
967
968 sub lag_behind_master {
969   my $self = shift;
970
971   return max map $_->lag_behind_master, $self->replicants;
972 }
973
974 =head2 is_replicating
975
976 returns true if all replicants return true for
977 L<DBIx::Class::Storage::DBI/is_replicating>
978
979 =cut
980
981 sub is_replicating {
982   my $self = shift;
983
984   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
985 }
986
987 =head2 connect_call_datetime_setup
988
989 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
990
991 =cut
992
993 sub connect_call_datetime_setup {
994   my $self = shift;
995   $_->connect_call_datetime_setup for $self->all_storages;
996 }
997
998 sub _populate_dbh {
999   my $self = shift;
1000   $_->_populate_dbh for $self->all_storages;
1001 }
1002
1003 sub _connect {
1004   my $self = shift;
1005   $_->_connect for $self->all_storages;
1006 }
1007
1008 sub _rebless {
1009   my $self = shift;
1010   $_->_rebless for $self->all_storages;
1011 }
1012
1013 sub _determine_driver {
1014   my $self = shift;
1015   $_->_determine_driver for $self->all_storages;
1016 }
1017
1018 sub _driver_determined {
1019   my $self = shift;
1020
1021   if (@_) {
1022     $_->_driver_determined(@_) for $self->all_storages;
1023   }
1024
1025   return $self->master->_driver_determined;
1026 }
1027
1028 sub _init {
1029   my $self = shift;
1030
1031   $_->_init for $self->all_storages;
1032 }
1033
1034 sub _run_connection_actions {
1035   my $self = shift;
1036
1037   $_->_run_connection_actions for $self->all_storages;
1038 }
1039
1040 sub _do_connection_actions {
1041   my $self = shift;
1042
1043   if (@_) {
1044     $_->_do_connection_actions(@_) for $self->all_storages;
1045   }
1046 }
1047
1048 sub connect_call_do_sql {
1049   my $self = shift;
1050   $_->connect_call_do_sql(@_) for $self->all_storages;
1051 }
1052
1053 sub disconnect_call_do_sql {
1054   my $self = shift;
1055   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1056 }
1057
1058 sub _seems_connected {
1059   my $self = shift;
1060
1061   return min map $_->_seems_connected, $self->all_storages;
1062 }
1063
1064 sub _ping {
1065   my $self = shift;
1066
1067   return min map $_->_ping, $self->all_storages;
1068 }
1069
1070 # not using the normalized_version, because we want to preserve
1071 # version numbers much longer than the conventional xxx.yyyzzz
1072 my $numify_ver = sub {
1073   my $ver = shift;
1074   my @numparts = split /\D+/, $ver;
1075   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1076
1077   return sprintf $format, @numparts;
1078 };
1079 sub _server_info {
1080   my $self = shift;
1081
1082   if (not $self->_dbh_details->{info}) {
1083     $self->_dbh_details->{info} = (
1084       reduce { $a->[0] < $b->[0] ? $a : $b }
1085       map [ $numify_ver->($_->{dbms_version}), $_ ],
1086       map $_->_server_info, $self->all_storages
1087     )->[1];
1088   }
1089
1090   return $self->next::method;
1091 }
1092
1093 sub _get_server_version {
1094   my $self = shift;
1095
1096   return $self->_server_info->{dbms_version};
1097 }
1098
1099 =head1 GOTCHAS
1100
1101 Due to the fact that replicants can lag behind a master, you must take care to
1102 make sure you use one of the methods to force read queries to a master should
1103 you need realtime data integrity.  For example, if you insert a row, and then
1104 immediately re-read it from the database (say, by doing $row->discard_changes)
1105 or you insert a row and then immediately build a query that expects that row
1106 to be an item, you should force the master to handle reads.  Otherwise, due to
1107 the lag, there is no certainty your data will be in the expected state.
1108
1109 For data integrity, all transactions automatically use the master storage for
1110 all read and write queries.  Using a transaction is the preferred and recommended
1111 method to force the master to handle all read queries.
1112
1113 Otherwise, you can force a single query to use the master with the 'force_pool'
1114 attribute:
1115
1116   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1117
1118 This attribute will safely be ignore by non replicated storages, so you can use
1119 the same code for both types of systems.
1120
1121 Lastly, you can use the L</execute_reliably> method, which works very much like
1122 a transaction.
1123
1124 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1125 and L</set_balanced_storage>, however this operates at a global level and is not
1126 suitable if you have a shared Schema object being used by multiple processes,
1127 such as on a web application server.  You can get around this limitation by
1128 using the Schema clone method.
1129
1130   my $new_schema = $schema->clone;
1131   $new_schema->set_reliable_storage;
1132
1133   ## $new_schema will use only the Master storage for all reads/writes while
1134   ## the $schema object will use replicated storage.
1135
1136 =head1 AUTHOR
1137
1138   John Napiorkowski <john.napiorkowski@takkle.com>
1139
1140 Based on code originated by:
1141
1142   Norbert Csongrádi <bert@cpan.org>
1143   Peter Siklósi <einon@einon.hu>
1144
1145 =head1 LICENSE
1146
1147 You may distribute this code under the same terms as Perl itself.
1148
1149 =cut
1150
1151 __PACKAGE__->meta->make_immutable;
1152
1153 1;