Switch the ::Sybase family to _determine_connector_driver (same as 75d3bdb2)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw/min max reduce/;
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24
25 use namespace::clean -except => 'meta';
26
27 =head1 NAME
28
29 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
30
31 =head1 SYNOPSIS
32
33 The Following example shows how to change an existing $schema to a replicated
34 storage type, add some replicated (read-only) databases, and perform reporting
35 tasks.
36
37 You should set the 'storage_type attribute to a replicated type.  You should
38 also define your arguments, such as which balancer you want and any arguments
39 that the Pool object should get.
40
41   my $schema = Schema::Class->clone;
42   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
43   $schema->connection(...);
44
45 Next, you need to add in the Replicants.  Basically this is an array of
46 arrayrefs, where each arrayref is database connect information.  Think of these
47 arguments as what you'd pass to the 'normal' $schema->connect method.
48
49   $schema->storage->connect_replicants(
50     [$dsn1, $user, $pass, \%opts],
51     [$dsn2, $user, $pass, \%opts],
52     [$dsn3, $user, $pass, \%opts],
53   );
54
55 Now, just use the $schema as you normally would.  Automatically all reads will
56 be delegated to the replicants, while writes to the master.
57
58   $schema->resultset('Source')->search({name=>'etc'});
59
60 You can force a given query to use a particular storage using the search
61 attribute 'force_pool'.  For example:
62
63   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
64
65 Now $rs will force everything (both reads and writes) to use whatever was setup
66 as the master storage.  'master' is hardcoded to always point to the Master,
67 but you can also use any Replicant name.  Please see:
68 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
69
70 Also see transactions and L</execute_reliably> for alternative ways to
71 force read traffic to the master.  In general, you should wrap your statements
72 in a transaction when you are reading and writing to the same tables at the
73 same time, since your replicants will often lag a bit behind the master.
74
75 If you have a multi-statement read only transaction you can force it to select
76 a random server in the pool by:
77
78   my $rs = $schema->resultset('Source')->search( undef,
79     { force_pool => $db->storage->read_handler->next_storage }
80   );
81
82 =head1 DESCRIPTION
83
84 Warning: This class is marked BETA.  This has been running a production
85 website using MySQL native replication as its backend and we have some decent
86 test coverage but the code hasn't yet been stressed by a variety of databases.
87 Individual DBs may have quirks we are not aware of.  Please use this in first
88 development and pass along your experiences/bug fixes.
89
90 This class implements replicated data store for DBI. Currently you can define
91 one master and numerous slave database connections. All write-type queries
92 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
93 database, all read-type queries (SELECTs) go to the slave database.
94
95 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
96 handle gets delegated to one of the two attributes: L</read_handler> or to
97 L</write_handler>.  Additionally, some methods need to be distributed
98 to all existing storages.  This way our storage class is a drop in replacement
99 for L<DBIx::Class::Storage::DBI>.
100
101 Read traffic is spread across the replicants (slaves) occurring to a user
102 selected algorithm.  The default algorithm is random weighted.
103
104 =head1 NOTES
105
106 The consistency between master and replicants is database specific.  The Pool
107 gives you a method to validate its replicants, removing and replacing them
108 when they fail/pass predefined criteria.  Please make careful use of the ways
109 to force a query to run against Master when needed.
110
111 =head1 REQUIREMENTS
112
113 Replicated Storage has additional requirements not currently part of
114 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
115
116 =head1 ATTRIBUTES
117
118 This class defines the following attributes.
119
120 =head2 schema
121
122 The underlying L<DBIx::Class::Schema> object this storage is attaching
123
124 =cut
125
126 has 'schema' => (
127     is=>'rw',
128     isa=>DBICSchema,
129     weak_ref=>1,
130     required=>1,
131 );
132
133 =head2 pool_type
134
135 Contains the classname which will instantiate the L</pool> object.  Defaults
136 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
137
138 =cut
139
140 has 'pool_type' => (
141   is=>'rw',
142   isa=>ClassName,
143   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
144   handles=>{
145     'create_pool' => 'new',
146   },
147 );
148
149 =head2 pool_args
150
151 Contains a hashref of initialized information to pass to the Balancer object.
152 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
153
154 =cut
155
156 has 'pool_args' => (
157   is=>'rw',
158   isa=>HashRef,
159   lazy=>1,
160   default=>sub { {} },
161 );
162
163
164 =head2 balancer_type
165
166 The replication pool requires a balance class to provider the methods for
167 choose how to spread the query load across each replicant in the pool.
168
169 =cut
170
171 has 'balancer_type' => (
172   is=>'rw',
173   isa=>BalancerClassNamePart,
174   coerce=>1,
175   required=>1,
176   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
177   handles=>{
178     'create_balancer' => 'new',
179   },
180 );
181
182 =head2 balancer_args
183
184 Contains a hashref of initialized information to pass to the Balancer object.
185 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
186
187 =cut
188
189 has 'balancer_args' => (
190   is=>'rw',
191   isa=>HashRef,
192   lazy=>1,
193   required=>1,
194   default=>sub { {} },
195 );
196
197 =head2 pool
198
199 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
200 container class for one or more replicated databases.
201
202 =cut
203
204 has 'pool' => (
205   is=>'ro',
206   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207   lazy_build=>1,
208   handles=>[qw/
209     connect_replicants
210     replicants
211     has_replicants
212   /],
213 );
214
215 =head2 balancer
216
217 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
218 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223   is=>'rw',
224   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225   lazy_build=>1,
226   handles=>[qw/auto_validate_every/],
227 );
228
229 =head2 master
230
231 The master defines the canonical state for a pool of connected databases.  All
232 the replicants are expected to match this databases state.  Thus, in a classic
233 Master / Slaves distributed system, all the slaves are expected to replicate
234 the Master's state as quick as possible.  This is the only database in the
235 pool of databases that is allowed to handle write traffic.
236
237 =cut
238
239 has 'master' => (
240   is=> 'ro',
241   isa=>DBICStorageDBI,
242   lazy_build=>1,
243 );
244
245 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
246
247 The following methods are delegated all the methods required for the
248 L<DBIx::Class::Storage::DBI> interface.
249
250 =cut
251
252 my $method_dispatch = {
253   writer => [qw/
254     on_connect_do
255     on_disconnect_do
256     on_connect_call
257     on_disconnect_call
258     connect_info
259     _connect_info
260     throw_exception
261     sql_maker
262     sqlt_type
263     create_ddl_dir
264     deployment_statements
265     datetime_parser
266     datetime_parser_type
267     build_datetime_parser
268     last_insert_id
269     insert
270     update
271     delete
272     dbh
273     txn_begin
274     txn_do
275     txn_commit
276     txn_rollback
277     txn_scope_guard
278     _exec_txn_rollback
279     _exec_txn_begin
280     _exec_txn_commit
281     deploy
282     with_deferred_fk_checks
283     dbh_do
284     _prep_for_execute
285     is_datatype_numeric
286     _count_select
287     svp_rollback
288     svp_begin
289     svp_release
290     relname_to_table_alias
291     _dbh_last_insert_id
292     _default_dbi_connect_attributes
293     _dbi_connect_info
294     _dbic_connect_attributes
295     auto_savepoint
296     _query_start
297     _query_end
298     _format_for_trace
299     _dbi_attrs_for_bind
300     bind_attribute_by_data_type
301     transaction_depth
302     _dbh
303     _select_args
304     _dbh_execute_for_fetch
305     _sql_maker
306     _dbh_execute_inserts_with_no_binds
307     _select_args_to_query
308     _gen_sql_bind
309     _svp_generate_name
310     _normalize_connect_info
311     _parse_connect_do
312     savepoints
313     _sql_maker_opts
314     _use_multicolumn_in
315     _conn_pid
316     _dbh_autocommit
317     _native_data_type
318     _get_dbh
319     sql_maker_class
320     insert_bulk
321     _insert_bulk
322     _execute
323     _do_query
324     _dbh_execute
325   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
326   reader => [qw/
327     select
328     select_single
329     columns_info_for
330     _dbh_columns_info_for
331     _select
332   /],
333   unimplemented => [qw/
334     _arm_global_destructor
335     _verify_pid
336
337     get_use_dbms_capability
338     set_use_dbms_capability
339     get_dbms_capability
340     set_dbms_capability
341     _dbh_details
342     _dbh_get_info
343     _get_rdbms_name
344
345     _determine_connector_driver
346     _extract_driver_from_connect_info
347     _describe_connection
348     _warn_undetermined_driver
349
350     sql_limit_dialect
351     sql_quote_char
352     sql_name_sep
353
354     _prefetch_autovalues
355     _perform_autoinc_retrieval
356     _autoinc_supplied_for_op
357
358     _resolve_bindattrs
359
360     _max_column_bytesize
361     _is_lob_type
362     _is_binary_lob_type
363     _is_binary_type
364     _is_text_lob_type
365
366     _prepare_sth
367     _bind_sth_params
368   /,(
369     # the capability framework
370     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
371     grep
372       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
373       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
374   )],
375 };
376
377 if (DBIx::Class::_ENV_::DBICTEST) {
378
379   my $seen;
380   for my $type (keys %$method_dispatch) {
381     for (@{$method_dispatch->{$type}}) {
382       push @{$seen->{$_}}, $type;
383     }
384   }
385
386   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
387     die(join "\n", '',
388       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
389       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
390       '',
391     );
392   }
393
394   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
395     die(join "\n", '',
396       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
397       @cant,
398       '',
399     );
400   }
401 }
402
403 for my $method (@{$method_dispatch->{unimplemented}}) {
404   __PACKAGE__->meta->add_method($method, sub {
405     my $self = shift;
406     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
407   });
408 }
409
410 =head2 read_handler
411
412 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
413
414 =cut
415
416 has 'read_handler' => (
417   is=>'rw',
418   isa=>Object,
419   lazy_build=>1,
420   handles=>$method_dispatch->{reader},
421 );
422
423 =head2 write_handler
424
425 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
426 as well as methods that don't write or read that can be called on only one
427 storage, methods that return a C<$dbh>, and any methods that don't make sense to
428 run on a replicant.
429
430 =cut
431
432 has 'write_handler' => (
433   is=>'ro',
434   isa=>Object,
435   lazy_build=>1,
436   handles=>$method_dispatch->{writer},
437 );
438
439
440
441 has _master_connect_info_opts =>
442   (is => 'rw', isa => HashRef, default => sub { {} });
443
444 =head2 around: connect_info
445
446 Preserves master's C<connect_info> options (for merging with replicants.)
447 Also sets any Replicated-related options from connect_info, such as
448 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
449
450 =cut
451
452 around connect_info => sub {
453   my ($next, $self, $info, @extra) = @_;
454
455   $self->throw_exception(
456     'connect_info can not be retrieved from a replicated storage - '
457   . 'accessor must be called on a specific pool instance'
458   ) unless defined $info;
459
460   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
461
462   my %opts;
463   for my $arg (@$info) {
464     next unless (reftype($arg)||'') eq 'HASH';
465     %opts = %{ $merge->merge($arg, \%opts) };
466   }
467   delete $opts{dsn};
468
469   if (@opts{qw/pool_type pool_args/}) {
470     $self->pool_type(delete $opts{pool_type})
471       if $opts{pool_type};
472
473     $self->pool_args(
474       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
475     );
476
477     ## Since we possibly changed the pool_args, we need to clear the current
478     ## pool object so that next time it is used it will be rebuilt.
479     $self->clear_pool;
480   }
481
482   if (@opts{qw/balancer_type balancer_args/}) {
483     $self->balancer_type(delete $opts{balancer_type})
484       if $opts{balancer_type};
485
486     $self->balancer_args(
487       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
488     );
489
490     $self->balancer($self->_build_balancer)
491       if $self->balancer;
492   }
493
494   $self->_master_connect_info_opts(\%opts);
495
496   return preserve_context {
497     $self->$next($info, @extra);
498   } after => sub {
499     # Make sure master is blessed into the correct class and apply role to it.
500     my $master = $self->master;
501     $master->_determine_driver;
502     Moose::Meta::Class->initialize(ref $master);
503
504     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
505
506     # link pool back to master
507     $self->pool->master($master);
508   };
509 };
510
511 =head1 METHODS
512
513 This class defines the following methods.
514
515 =head2 BUILDARGS
516
517 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
518 first argument.  So we need to massage the arguments a bit so that all the
519 bits get put into the correct places.
520
521 =cut
522
523 sub BUILDARGS {
524   my ($class, $schema, $storage_type_args, @args) = @_;
525
526   return {
527     schema=>$schema,
528     %$storage_type_args,
529     @args
530   }
531 }
532
533 =head2 _build_master
534
535 Lazy builder for the L</master> attribute.
536
537 =cut
538
539 sub _build_master {
540   my $self = shift @_;
541   my $master = DBIx::Class::Storage::DBI->new($self->schema);
542   $master
543 }
544
545 =head2 _build_pool
546
547 Lazy builder for the L</pool> attribute.
548
549 =cut
550
551 sub _build_pool {
552   my $self = shift @_;
553   $self->create_pool(%{$self->pool_args});
554 }
555
556 =head2 _build_balancer
557
558 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
559 the balancer knows which pool it's balancing.
560
561 =cut
562
563 sub _build_balancer {
564   my $self = shift @_;
565   $self->create_balancer(
566     pool=>$self->pool,
567     master=>$self->master,
568     %{$self->balancer_args},
569   );
570 }
571
572 =head2 _build_write_handler
573
574 Lazy builder for the L</write_handler> attribute.  The default is to set this to
575 the L</master>.
576
577 =cut
578
579 sub _build_write_handler {
580   return shift->master;
581 }
582
583 =head2 _build_read_handler
584
585 Lazy builder for the L</read_handler> attribute.  The default is to set this to
586 the L</balancer>.
587
588 =cut
589
590 sub _build_read_handler {
591   return shift->balancer;
592 }
593
594 =head2 around: connect_replicants
595
596 All calls to connect_replicants needs to have an existing $schema tacked onto
597 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
598 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
599 options merged with the master, with replicant opts having higher priority.
600
601 =cut
602
603 around connect_replicants => sub {
604   my ($next, $self, @args) = @_;
605
606   for my $r (@args) {
607     $r = [ $r ] unless reftype $r eq 'ARRAY';
608
609     $self->throw_exception('coderef replicant connect_info not supported')
610       if ref $r->[0] && reftype $r->[0] eq 'CODE';
611
612 # any connect_info options?
613     my $i = 0;
614     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
615
616 # make one if none
617     $r->[$i] = {} unless $r->[$i];
618
619 # merge if two hashes
620     my @hashes = @$r[$i .. $#{$r}];
621
622     $self->throw_exception('invalid connect_info options')
623       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
624
625     $self->throw_exception('too many hashrefs in connect_info')
626       if @hashes > 2;
627
628     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
629     my %opts = %{ $merge->merge(reverse @hashes) };
630
631 # delete them
632     splice @$r, $i+1, ($#{$r} - $i), ();
633
634 # make sure master/replicants opts don't clash
635     my %master_opts = %{ $self->_master_connect_info_opts };
636     if (exists $opts{dbh_maker}) {
637         delete @master_opts{qw/dsn user password/};
638     }
639     delete $master_opts{dbh_maker};
640
641 # merge with master
642     %opts = %{ $merge->merge(\%opts, \%master_opts) };
643
644 # update
645     $r->[$i] = \%opts;
646   }
647
648   $self->$next($self->schema, @args);
649 };
650
651 =head2 all_storages
652
653 Returns an array of all the connected storage backends.  The first element
654 in the returned array is the master, and the rest are each of the
655 replicants.
656
657 =cut
658
659 sub all_storages {
660   my $self = shift @_;
661   return grep {defined $_ && blessed $_} (
662      $self->master,
663      values %{ $self->replicants },
664   );
665 }
666
667 =head2 execute_reliably ($coderef, ?@args)
668
669 Given a coderef, saves the current state of the L</read_handler>, forces it to
670 use reliable storage (e.g. sets it to the master), executes a coderef and then
671 restores the original state.
672
673 Example:
674
675   my $reliably = sub {
676     my $name = shift @_;
677     $schema->resultset('User')->create({name=>$name});
678     my $user_rs = $schema->resultset('User')->find({name=>$name});
679     return $user_rs;
680   };
681
682   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
683
684 Use this when you must be certain of your database state, such as when you just
685 inserted something and need to get a resultset including it, etc.
686
687 =cut
688
689 sub execute_reliably {
690   my $self = shift;
691   my $coderef = shift;
692
693   unless( ref $coderef eq 'CODE') {
694     $self->throw_exception('Second argument must be a coderef');
695   }
696
697   ## replace the current read handler for the remainder of the scope
698   local $self->{read_handler} = $self->master;
699
700   my $args = \@_;
701   return try {
702     $coderef->(@$args);
703   } catch {
704     $self->throw_exception("coderef returned an error: $_");
705   };
706 }
707
708 =head2 set_reliable_storage
709
710 Sets the current $schema to be 'reliable', that is all queries, both read and
711 write are sent to the master
712
713 =cut
714
715 sub set_reliable_storage {
716   my $self = shift @_;
717   my $schema = $self->schema;
718   my $write_handler = $self->schema->storage->write_handler;
719
720   $schema->storage->read_handler($write_handler);
721 }
722
723 =head2 set_balanced_storage
724
725 Sets the current $schema to be use the </balancer> for all reads, while all
726 writes are sent to the master only
727
728 =cut
729
730 sub set_balanced_storage {
731   my $self = shift @_;
732   my $schema = $self->schema;
733   my $balanced_handler = $self->schema->storage->balancer;
734
735   $schema->storage->read_handler($balanced_handler);
736 }
737
738 =head2 connected
739
740 Check that the master and at least one of the replicants is connected.
741
742 =cut
743
744 sub connected {
745   my $self = shift @_;
746   return
747     $self->master->connected &&
748     $self->pool->connected_replicants;
749 }
750
751 =head2 ensure_connected
752
753 Make sure all the storages are connected.
754
755 =cut
756
757 sub ensure_connected {
758   my $self = shift @_;
759   foreach my $source ($self->all_storages) {
760     $source->ensure_connected(@_);
761   }
762 }
763
764 =head2 limit_dialect
765
766 Set the limit_dialect for all existing storages
767
768 =cut
769
770 sub limit_dialect {
771   my $self = shift @_;
772   foreach my $source ($self->all_storages) {
773     $source->limit_dialect(@_);
774   }
775   return $self->master->limit_dialect;
776 }
777
778 =head2 quote_char
779
780 Set the quote_char for all existing storages
781
782 =cut
783
784 sub quote_char {
785   my $self = shift @_;
786   foreach my $source ($self->all_storages) {
787     $source->quote_char(@_);
788   }
789   return $self->master->quote_char;
790 }
791
792 =head2 name_sep
793
794 Set the name_sep for all existing storages
795
796 =cut
797
798 sub name_sep {
799   my $self = shift @_;
800   foreach my $source ($self->all_storages) {
801     $source->name_sep(@_);
802   }
803   return $self->master->name_sep;
804 }
805
806 =head2 set_schema
807
808 Set the schema object for all existing storages
809
810 =cut
811
812 sub set_schema {
813   my $self = shift @_;
814   foreach my $source ($self->all_storages) {
815     $source->set_schema(@_);
816   }
817 }
818
819 =head2 debug
820
821 set a debug flag across all storages
822
823 =cut
824
825 sub debug {
826   my $self = shift @_;
827   if(@_) {
828     foreach my $source ($self->all_storages) {
829       $source->debug(@_);
830     }
831   }
832   return $self->master->debug;
833 }
834
835 =head2 debugobj
836
837 set a debug object
838
839 =cut
840
841 sub debugobj {
842   my $self = shift @_;
843   return $self->master->debugobj(@_);
844 }
845
846 =head2 debugfh
847
848 set a debugfh object
849
850 =cut
851
852 sub debugfh {
853   my $self = shift @_;
854   return $self->master->debugfh(@_);
855 }
856
857 =head2 debugcb
858
859 set a debug callback
860
861 =cut
862
863 sub debugcb {
864   my $self = shift @_;
865   return $self->master->debugcb(@_);
866 }
867
868 =head2 disconnect
869
870 disconnect everything
871
872 =cut
873
874 sub disconnect {
875   my $self = shift @_;
876   foreach my $source ($self->all_storages) {
877     $source->disconnect(@_);
878   }
879 }
880
881 =head2 cursor_class
882
883 set cursor class on all storages, or return master's
884
885 =cut
886
887 sub cursor_class {
888   my ($self, $cursor_class) = @_;
889
890   if ($cursor_class) {
891     $_->cursor_class($cursor_class) for $self->all_storages;
892   }
893   $self->master->cursor_class;
894 }
895
896 =head2 cursor
897
898 set cursor class on all storages, or return master's, alias for L</cursor_class>
899 above.
900
901 =cut
902
903 sub cursor {
904   my ($self, $cursor_class) = @_;
905
906   if ($cursor_class) {
907     $_->cursor($cursor_class) for $self->all_storages;
908   }
909   $self->master->cursor;
910 }
911
912 =head2 unsafe
913
914 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
915 master's current setting
916
917 =cut
918
919 sub unsafe {
920   my $self = shift;
921
922   if (@_) {
923     $_->unsafe(@_) for $self->all_storages;
924   }
925
926   return $self->master->unsafe;
927 }
928
929 =head2 disable_sth_caching
930
931 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
932 or returns master's current setting
933
934 =cut
935
936 sub disable_sth_caching {
937   my $self = shift;
938
939   if (@_) {
940     $_->disable_sth_caching(@_) for $self->all_storages;
941   }
942
943   return $self->master->disable_sth_caching;
944 }
945
946 =head2 lag_behind_master
947
948 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
949 setting
950
951 =cut
952
953 sub lag_behind_master {
954   my $self = shift;
955
956   return max map $_->lag_behind_master, $self->replicants;
957 }
958
959 =head2 is_replicating
960
961 returns true if all replicants return true for
962 L<DBIx::Class::Storage::DBI/is_replicating>
963
964 =cut
965
966 sub is_replicating {
967   my $self = shift;
968
969   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
970 }
971
972 =head2 connect_call_datetime_setup
973
974 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
975
976 =cut
977
978 sub connect_call_datetime_setup {
979   my $self = shift;
980   $_->connect_call_datetime_setup for $self->all_storages;
981 }
982
983 sub _populate_dbh {
984   my $self = shift;
985   $_->_populate_dbh for $self->all_storages;
986 }
987
988 sub _connect {
989   my $self = shift;
990   $_->_connect for $self->all_storages;
991 }
992
993 sub _rebless {
994   my $self = shift;
995   $_->_rebless for $self->all_storages;
996 }
997
998 sub _determine_driver {
999   my $self = shift;
1000   $_->_determine_driver for $self->all_storages;
1001 }
1002
1003 sub _driver_determined {
1004   my $self = shift;
1005
1006   if (@_) {
1007     $_->_driver_determined(@_) for $self->all_storages;
1008   }
1009
1010   return $self->master->_driver_determined;
1011 }
1012
1013 sub _init {
1014   my $self = shift;
1015
1016   $_->_init for $self->all_storages;
1017 }
1018
1019 sub _run_connection_actions {
1020   my $self = shift;
1021
1022   $_->_run_connection_actions for $self->all_storages;
1023 }
1024
1025 sub _do_connection_actions {
1026   my $self = shift;
1027
1028   if (@_) {
1029     $_->_do_connection_actions(@_) for $self->all_storages;
1030   }
1031 }
1032
1033 sub connect_call_do_sql {
1034   my $self = shift;
1035   $_->connect_call_do_sql(@_) for $self->all_storages;
1036 }
1037
1038 sub disconnect_call_do_sql {
1039   my $self = shift;
1040   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1041 }
1042
1043 sub _seems_connected {
1044   my $self = shift;
1045
1046   return min map $_->_seems_connected, $self->all_storages;
1047 }
1048
1049 sub _ping {
1050   my $self = shift;
1051
1052   return min map $_->_ping, $self->all_storages;
1053 }
1054
1055 # not using the normalized_version, because we want to preserve
1056 # version numbers much longer than the conventional xxx.yyyzzz
1057 my $numify_ver = sub {
1058   my $ver = shift;
1059   my @numparts = split /\D+/, $ver;
1060   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1061
1062   return sprintf $format, @numparts;
1063 };
1064 sub _server_info {
1065   my $self = shift;
1066
1067   if (not $self->_dbh_details->{info}) {
1068     $self->_dbh_details->{info} = (
1069       reduce { $a->[0] < $b->[0] ? $a : $b }
1070       map [ $numify_ver->($_->{dbms_version}), $_ ],
1071       map $_->_server_info, $self->all_storages
1072     )->[1];
1073   }
1074
1075   return $self->next::method;
1076 }
1077
1078 sub _get_server_version {
1079   my $self = shift;
1080
1081   return $self->_server_info->{dbms_version};
1082 }
1083
1084 =head1 GOTCHAS
1085
1086 Due to the fact that replicants can lag behind a master, you must take care to
1087 make sure you use one of the methods to force read queries to a master should
1088 you need realtime data integrity.  For example, if you insert a row, and then
1089 immediately re-read it from the database (say, by doing
1090 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1091 or you insert a row and then immediately build a query that expects that row
1092 to be an item, you should force the master to handle reads.  Otherwise, due to
1093 the lag, there is no certainty your data will be in the expected state.
1094
1095 For data integrity, all transactions automatically use the master storage for
1096 all read and write queries.  Using a transaction is the preferred and recommended
1097 method to force the master to handle all read queries.
1098
1099 Otherwise, you can force a single query to use the master with the 'force_pool'
1100 attribute:
1101
1102   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1103
1104 This attribute will safely be ignored by non replicated storages, so you can use
1105 the same code for both types of systems.
1106
1107 Lastly, you can use the L</execute_reliably> method, which works very much like
1108 a transaction.
1109
1110 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1111 and L</set_balanced_storage>, however this operates at a global level and is not
1112 suitable if you have a shared Schema object being used by multiple processes,
1113 such as on a web application server.  You can get around this limitation by
1114 using the Schema clone method.
1115
1116   my $new_schema = $schema->clone;
1117   $new_schema->set_reliable_storage;
1118
1119   ## $new_schema will use only the Master storage for all reads/writes while
1120   ## the $schema object will use replicated storage.
1121
1122 =head1 FURTHER QUESTIONS?
1123
1124 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1125
1126 =head1 COPYRIGHT AND LICENSE
1127
1128 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1129 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1130 redistribute it and/or modify it under the same terms as the
1131 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1132
1133 =cut
1134
1135 __PACKAGE__->meta->make_immutable;
1136
1137 1;