8328cf3d11e83bc502dbf094deb52c834175b22c
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw/min max reduce/;
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24
25 use namespace::clean -except => 'meta';
26
27 =head1 NAME
28
29 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
30
31 =head1 SYNOPSIS
32
33 The Following example shows how to change an existing $schema to a replicated
34 storage type, add some replicated (read-only) databases, and perform reporting
35 tasks.
36
37 You should set the 'storage_type attribute to a replicated type.  You should
38 also define your arguments, such as which balancer you want and any arguments
39 that the Pool object should get.
40
41   my $schema = Schema::Class->clone;
42   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
43   $schema->connection(...);
44
45 Next, you need to add in the Replicants.  Basically this is an array of
46 arrayrefs, where each arrayref is database connect information.  Think of these
47 arguments as what you'd pass to the 'normal' $schema->connect method.
48
49   $schema->storage->connect_replicants(
50     [$dsn1, $user, $pass, \%opts],
51     [$dsn2, $user, $pass, \%opts],
52     [$dsn3, $user, $pass, \%opts],
53   );
54
55 Now, just use the $schema as you normally would.  Automatically all reads will
56 be delegated to the replicants, while writes to the master.
57
58   $schema->resultset('Source')->search({name=>'etc'});
59
60 You can force a given query to use a particular storage using the search
61 attribute 'force_pool'.  For example:
62
63   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
64
65 Now $rs will force everything (both reads and writes) to use whatever was setup
66 as the master storage.  'master' is hardcoded to always point to the Master,
67 but you can also use any Replicant name.  Please see:
68 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
69
70 Also see transactions and L</execute_reliably> for alternative ways to
71 force read traffic to the master.  In general, you should wrap your statements
72 in a transaction when you are reading and writing to the same tables at the
73 same time, since your replicants will often lag a bit behind the master.
74
75 If you have a multi-statement read only transaction you can force it to select
76 a random server in the pool by:
77
78   my $rs = $schema->resultset('Source')->search( undef,
79     { force_pool => $db->storage->read_handler->next_storage }
80   );
81
82 =head1 DESCRIPTION
83
84 Warning: This class is marked BETA.  This has been running a production
85 website using MySQL native replication as its backend and we have some decent
86 test coverage but the code hasn't yet been stressed by a variety of databases.
87 Individual DBs may have quirks we are not aware of.  Please use this in first
88 development and pass along your experiences/bug fixes.
89
90 This class implements replicated data store for DBI. Currently you can define
91 one master and numerous slave database connections. All write-type queries
92 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
93 database, all read-type queries (SELECTs) go to the slave database.
94
95 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
96 handle gets delegated to one of the two attributes: L</read_handler> or to
97 L</write_handler>.  Additionally, some methods need to be distributed
98 to all existing storages.  This way our storage class is a drop in replacement
99 for L<DBIx::Class::Storage::DBI>.
100
101 Read traffic is spread across the replicants (slaves) occurring to a user
102 selected algorithm.  The default algorithm is random weighted.
103
104 =head1 NOTES
105
106 The consistency between master and replicants is database specific.  The Pool
107 gives you a method to validate its replicants, removing and replacing them
108 when they fail/pass predefined criteria.  Please make careful use of the ways
109 to force a query to run against Master when needed.
110
111 =head1 REQUIREMENTS
112
113 Replicated Storage has additional requirements not currently part of
114 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
115
116 =head1 ATTRIBUTES
117
118 This class defines the following attributes.
119
120 =head2 schema
121
122 The underlying L<DBIx::Class::Schema> object this storage is attaching
123
124 =cut
125
126 has 'schema' => (
127     is=>'rw',
128     isa=>DBICSchema,
129     weak_ref=>1,
130     required=>1,
131 );
132
133 =head2 pool_type
134
135 Contains the classname which will instantiate the L</pool> object.  Defaults
136 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
137
138 =cut
139
140 has 'pool_type' => (
141   is=>'rw',
142   isa=>ClassName,
143   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
144   handles=>{
145     'create_pool' => 'new',
146   },
147 );
148
149 =head2 pool_args
150
151 Contains a hashref of initialized information to pass to the Balancer object.
152 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
153
154 =cut
155
156 has 'pool_args' => (
157   is=>'rw',
158   isa=>HashRef,
159   lazy=>1,
160   default=>sub { {} },
161 );
162
163
164 =head2 balancer_type
165
166 The replication pool requires a balance class to provider the methods for
167 choose how to spread the query load across each replicant in the pool.
168
169 =cut
170
171 has 'balancer_type' => (
172   is=>'rw',
173   isa=>BalancerClassNamePart,
174   coerce=>1,
175   required=>1,
176   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
177   handles=>{
178     'create_balancer' => 'new',
179   },
180 );
181
182 =head2 balancer_args
183
184 Contains a hashref of initialized information to pass to the Balancer object.
185 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
186
187 =cut
188
189 has 'balancer_args' => (
190   is=>'rw',
191   isa=>HashRef,
192   lazy=>1,
193   required=>1,
194   default=>sub { {} },
195 );
196
197 =head2 pool
198
199 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
200 container class for one or more replicated databases.
201
202 =cut
203
204 has 'pool' => (
205   is=>'ro',
206   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207   lazy_build=>1,
208   handles=>[qw/
209     connect_replicants
210     replicants
211     has_replicants
212   /],
213 );
214
215 =head2 balancer
216
217 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
218 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223   is=>'rw',
224   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225   lazy_build=>1,
226   handles=>[qw/auto_validate_every/],
227 );
228
229 =head2 master
230
231 The master defines the canonical state for a pool of connected databases.  All
232 the replicants are expected to match this databases state.  Thus, in a classic
233 Master / Slaves distributed system, all the slaves are expected to replicate
234 the Master's state as quick as possible.  This is the only database in the
235 pool of databases that is allowed to handle write traffic.
236
237 =cut
238
239 has 'master' => (
240   is=> 'ro',
241   isa=>DBICStorageDBI,
242   lazy_build=>1,
243 );
244
245 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
246
247 The following methods are delegated all the methods required for the
248 L<DBIx::Class::Storage::DBI> interface.
249
250 =cut
251
252 my $method_dispatch = {
253   writer => [qw/
254     on_connect_do
255     on_disconnect_do
256     on_connect_call
257     on_disconnect_call
258     connect_info
259     _connect_info
260     throw_exception
261     sql_maker
262     sqlt_type
263     create_ddl_dir
264     deployment_statements
265     datetime_parser
266     datetime_parser_type
267     build_datetime_parser
268     last_insert_id
269     insert
270     update
271     delete
272     dbh
273     txn_begin
274     txn_do
275     txn_commit
276     txn_rollback
277     txn_scope_guard
278     _exec_txn_rollback
279     _exec_txn_begin
280     _exec_txn_commit
281     deploy
282     with_deferred_fk_checks
283     dbh_do
284     _prep_for_execute
285     is_datatype_numeric
286     _count_select
287     svp_rollback
288     svp_begin
289     svp_release
290     relname_to_table_alias
291     _dbh_last_insert_id
292     _default_dbi_connect_attributes
293     _dbi_connect_info
294     _dbic_connect_attributes
295     auto_savepoint
296     _query_start
297     _query_end
298     _format_for_trace
299     _dbi_attrs_for_bind
300     bind_attribute_by_data_type
301     transaction_depth
302     _dbh
303     _select_args
304     _dbh_execute_for_fetch
305     _sql_maker
306     _dbh_execute_inserts_with_no_binds
307     _select_args_to_query
308     _gen_sql_bind
309     _svp_generate_name
310     _normalize_connect_info
311     _parse_connect_do
312     savepoints
313     _sql_maker_opts
314     _use_multicolumn_in
315     _conn_pid
316     _dbh_autocommit
317     _native_data_type
318     _get_dbh
319     sql_maker_class
320     insert_bulk
321     _insert_bulk
322     _execute
323     _do_query
324     _dbh_execute
325   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
326   reader => [qw/
327     select
328     select_single
329     columns_info_for
330     _dbh_columns_info_for
331     _select
332   /],
333   unimplemented => [qw/
334     _arm_global_destructor
335     _verify_pid
336     __delicate_rollback
337
338     get_use_dbms_capability
339     set_use_dbms_capability
340     get_dbms_capability
341     set_dbms_capability
342     _dbh_details
343     _dbh_get_info
344     _get_rdbms_name
345
346     _determine_connector_driver
347     _extract_driver_from_connect_info
348     _describe_connection
349     _warn_undetermined_driver
350
351     sql_limit_dialect
352     sql_quote_char
353     sql_name_sep
354
355     _prefetch_autovalues
356     _perform_autoinc_retrieval
357     _autoinc_supplied_for_op
358
359     _resolve_bindattrs
360
361     _max_column_bytesize
362     _is_lob_type
363     _is_binary_lob_type
364     _is_binary_type
365     _is_text_lob_type
366
367     _prepare_sth
368     _bind_sth_params
369   /,(
370     # the capability framework
371     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
372     grep
373       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
374       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
375   )],
376 };
377
378 if (DBIx::Class::_ENV_::DBICTEST) {
379
380   my $seen;
381   for my $type (keys %$method_dispatch) {
382     for (@{$method_dispatch->{$type}}) {
383       push @{$seen->{$_}}, $type;
384     }
385   }
386
387   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
388     die(join "\n", '',
389       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
390       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
391       '',
392     );
393   }
394
395   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
396     die(join "\n", '',
397       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
398       @cant,
399       '',
400     );
401   }
402 }
403
404 for my $method (@{$method_dispatch->{unimplemented}}) {
405   __PACKAGE__->meta->add_method($method, sub {
406     my $self = shift;
407     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
408   });
409 }
410
411 =head2 read_handler
412
413 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
414
415 =cut
416
417 has 'read_handler' => (
418   is=>'rw',
419   isa=>Object,
420   lazy_build=>1,
421   handles=>$method_dispatch->{reader},
422 );
423
424 =head2 write_handler
425
426 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
427 as well as methods that don't write or read that can be called on only one
428 storage, methods that return a C<$dbh>, and any methods that don't make sense to
429 run on a replicant.
430
431 =cut
432
433 has 'write_handler' => (
434   is=>'ro',
435   isa=>Object,
436   lazy_build=>1,
437   handles=>$method_dispatch->{writer},
438 );
439
440
441
442 has _master_connect_info_opts =>
443   (is => 'rw', isa => HashRef, default => sub { {} });
444
445 =head2 around: connect_info
446
447 Preserves master's C<connect_info> options (for merging with replicants.)
448 Also sets any Replicated-related options from connect_info, such as
449 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
450
451 =cut
452
453 around connect_info => sub {
454   my ($next, $self, $info, @extra) = @_;
455
456   $self->throw_exception(
457     'connect_info can not be retrieved from a replicated storage - '
458   . 'accessor must be called on a specific pool instance'
459   ) unless defined $info;
460
461   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
462
463   my %opts;
464   for my $arg (@$info) {
465     next unless (reftype($arg)||'') eq 'HASH';
466     %opts = %{ $merge->merge($arg, \%opts) };
467   }
468   delete $opts{dsn};
469
470   if (@opts{qw/pool_type pool_args/}) {
471     $self->pool_type(delete $opts{pool_type})
472       if $opts{pool_type};
473
474     $self->pool_args(
475       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
476     );
477
478     ## Since we possibly changed the pool_args, we need to clear the current
479     ## pool object so that next time it is used it will be rebuilt.
480     $self->clear_pool;
481   }
482
483   if (@opts{qw/balancer_type balancer_args/}) {
484     $self->balancer_type(delete $opts{balancer_type})
485       if $opts{balancer_type};
486
487     $self->balancer_args(
488       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
489     );
490
491     $self->balancer($self->_build_balancer)
492       if $self->balancer;
493   }
494
495   $self->_master_connect_info_opts(\%opts);
496
497   return preserve_context {
498     $self->$next($info, @extra);
499   } after => sub {
500     # Make sure master is blessed into the correct class and apply role to it.
501     my $master = $self->master;
502     $master->_determine_driver;
503     Moose::Meta::Class->initialize(ref $master);
504
505     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
506
507     # link pool back to master
508     $self->pool->master($master);
509   };
510 };
511
512 =head1 METHODS
513
514 This class defines the following methods.
515
516 =head2 BUILDARGS
517
518 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
519 first argument.  So we need to massage the arguments a bit so that all the
520 bits get put into the correct places.
521
522 =cut
523
524 sub BUILDARGS {
525   my ($class, $schema, $storage_type_args, @args) = @_;
526
527   return {
528     schema=>$schema,
529     %$storage_type_args,
530     @args
531   }
532 }
533
534 =head2 _build_master
535
536 Lazy builder for the L</master> attribute.
537
538 =cut
539
540 sub _build_master {
541   my $self = shift @_;
542   my $master = DBIx::Class::Storage::DBI->new($self->schema);
543   $master
544 }
545
546 =head2 _build_pool
547
548 Lazy builder for the L</pool> attribute.
549
550 =cut
551
552 sub _build_pool {
553   my $self = shift @_;
554   $self->create_pool(%{$self->pool_args});
555 }
556
557 =head2 _build_balancer
558
559 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
560 the balancer knows which pool it's balancing.
561
562 =cut
563
564 sub _build_balancer {
565   my $self = shift @_;
566   $self->create_balancer(
567     pool=>$self->pool,
568     master=>$self->master,
569     %{$self->balancer_args},
570   );
571 }
572
573 =head2 _build_write_handler
574
575 Lazy builder for the L</write_handler> attribute.  The default is to set this to
576 the L</master>.
577
578 =cut
579
580 sub _build_write_handler {
581   return shift->master;
582 }
583
584 =head2 _build_read_handler
585
586 Lazy builder for the L</read_handler> attribute.  The default is to set this to
587 the L</balancer>.
588
589 =cut
590
591 sub _build_read_handler {
592   return shift->balancer;
593 }
594
595 =head2 around: connect_replicants
596
597 All calls to connect_replicants needs to have an existing $schema tacked onto
598 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
599 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
600 options merged with the master, with replicant opts having higher priority.
601
602 =cut
603
604 around connect_replicants => sub {
605   my ($next, $self, @args) = @_;
606
607   for my $r (@args) {
608     $r = [ $r ] unless reftype $r eq 'ARRAY';
609
610     $self->throw_exception('coderef replicant connect_info not supported')
611       if ref $r->[0] && reftype $r->[0] eq 'CODE';
612
613 # any connect_info options?
614     my $i = 0;
615     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
616
617 # make one if none
618     $r->[$i] = {} unless $r->[$i];
619
620 # merge if two hashes
621     my @hashes = @$r[$i .. $#{$r}];
622
623     $self->throw_exception('invalid connect_info options')
624       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
625
626     $self->throw_exception('too many hashrefs in connect_info')
627       if @hashes > 2;
628
629     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
630     my %opts = %{ $merge->merge(reverse @hashes) };
631
632 # delete them
633     splice @$r, $i+1, ($#{$r} - $i), ();
634
635 # make sure master/replicants opts don't clash
636     my %master_opts = %{ $self->_master_connect_info_opts };
637     if (exists $opts{dbh_maker}) {
638         delete @master_opts{qw/dsn user password/};
639     }
640     delete $master_opts{dbh_maker};
641
642 # merge with master
643     %opts = %{ $merge->merge(\%opts, \%master_opts) };
644
645 # update
646     $r->[$i] = \%opts;
647   }
648
649   $self->$next($self->schema, @args);
650 };
651
652 =head2 all_storages
653
654 Returns an array of all the connected storage backends.  The first element
655 in the returned array is the master, and the rest are each of the
656 replicants.
657
658 =cut
659
660 sub all_storages {
661   my $self = shift @_;
662   return grep {defined $_ && blessed $_} (
663      $self->master,
664      values %{ $self->replicants },
665   );
666 }
667
668 =head2 execute_reliably ($coderef, ?@args)
669
670 Given a coderef, saves the current state of the L</read_handler>, forces it to
671 use reliable storage (e.g. sets it to the master), executes a coderef and then
672 restores the original state.
673
674 Example:
675
676   my $reliably = sub {
677     my $name = shift @_;
678     $schema->resultset('User')->create({name=>$name});
679     my $user_rs = $schema->resultset('User')->find({name=>$name});
680     return $user_rs;
681   };
682
683   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
684
685 Use this when you must be certain of your database state, such as when you just
686 inserted something and need to get a resultset including it, etc.
687
688 =cut
689
690 sub execute_reliably {
691   my $self = shift;
692   my $coderef = shift;
693
694   unless( ref $coderef eq 'CODE') {
695     $self->throw_exception('Second argument must be a coderef');
696   }
697
698   ## replace the current read handler for the remainder of the scope
699   local $self->{read_handler} = $self->master;
700
701   my $args = \@_;
702   return try {
703     $coderef->(@$args);
704   } catch {
705     $self->throw_exception("coderef returned an error: $_");
706   };
707 }
708
709 =head2 set_reliable_storage
710
711 Sets the current $schema to be 'reliable', that is all queries, both read and
712 write are sent to the master
713
714 =cut
715
716 sub set_reliable_storage {
717   my $self = shift @_;
718   my $schema = $self->schema;
719   my $write_handler = $self->schema->storage->write_handler;
720
721   $schema->storage->read_handler($write_handler);
722 }
723
724 =head2 set_balanced_storage
725
726 Sets the current $schema to be use the </balancer> for all reads, while all
727 writes are sent to the master only
728
729 =cut
730
731 sub set_balanced_storage {
732   my $self = shift @_;
733   my $schema = $self->schema;
734   my $balanced_handler = $self->schema->storage->balancer;
735
736   $schema->storage->read_handler($balanced_handler);
737 }
738
739 =head2 connected
740
741 Check that the master and at least one of the replicants is connected.
742
743 =cut
744
745 sub connected {
746   my $self = shift @_;
747   return
748     $self->master->connected &&
749     $self->pool->connected_replicants;
750 }
751
752 =head2 ensure_connected
753
754 Make sure all the storages are connected.
755
756 =cut
757
758 sub ensure_connected {
759   my $self = shift @_;
760   foreach my $source ($self->all_storages) {
761     $source->ensure_connected(@_);
762   }
763 }
764
765 =head2 limit_dialect
766
767 Set the limit_dialect for all existing storages
768
769 =cut
770
771 sub limit_dialect {
772   my $self = shift @_;
773   foreach my $source ($self->all_storages) {
774     $source->limit_dialect(@_);
775   }
776   return $self->master->limit_dialect;
777 }
778
779 =head2 quote_char
780
781 Set the quote_char for all existing storages
782
783 =cut
784
785 sub quote_char {
786   my $self = shift @_;
787   foreach my $source ($self->all_storages) {
788     $source->quote_char(@_);
789   }
790   return $self->master->quote_char;
791 }
792
793 =head2 name_sep
794
795 Set the name_sep for all existing storages
796
797 =cut
798
799 sub name_sep {
800   my $self = shift @_;
801   foreach my $source ($self->all_storages) {
802     $source->name_sep(@_);
803   }
804   return $self->master->name_sep;
805 }
806
807 =head2 set_schema
808
809 Set the schema object for all existing storages
810
811 =cut
812
813 sub set_schema {
814   my $self = shift @_;
815   foreach my $source ($self->all_storages) {
816     $source->set_schema(@_);
817   }
818 }
819
820 =head2 debug
821
822 set a debug flag across all storages
823
824 =cut
825
826 sub debug {
827   my $self = shift @_;
828   if(@_) {
829     foreach my $source ($self->all_storages) {
830       $source->debug(@_);
831     }
832   }
833   return $self->master->debug;
834 }
835
836 =head2 debugobj
837
838 set a debug object
839
840 =cut
841
842 sub debugobj {
843   my $self = shift @_;
844   return $self->master->debugobj(@_);
845 }
846
847 =head2 debugfh
848
849 set a debugfh object
850
851 =cut
852
853 sub debugfh {
854   my $self = shift @_;
855   return $self->master->debugfh(@_);
856 }
857
858 =head2 debugcb
859
860 set a debug callback
861
862 =cut
863
864 sub debugcb {
865   my $self = shift @_;
866   return $self->master->debugcb(@_);
867 }
868
869 =head2 disconnect
870
871 disconnect everything
872
873 =cut
874
875 sub disconnect {
876   my $self = shift @_;
877   foreach my $source ($self->all_storages) {
878     $source->disconnect(@_);
879   }
880 }
881
882 =head2 cursor_class
883
884 set cursor class on all storages, or return master's
885
886 =cut
887
888 sub cursor_class {
889   my ($self, $cursor_class) = @_;
890
891   if ($cursor_class) {
892     $_->cursor_class($cursor_class) for $self->all_storages;
893   }
894   $self->master->cursor_class;
895 }
896
897 =head2 cursor
898
899 set cursor class on all storages, or return master's, alias for L</cursor_class>
900 above.
901
902 =cut
903
904 sub cursor {
905   my ($self, $cursor_class) = @_;
906
907   if ($cursor_class) {
908     $_->cursor($cursor_class) for $self->all_storages;
909   }
910   $self->master->cursor;
911 }
912
913 =head2 unsafe
914
915 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
916 master's current setting
917
918 =cut
919
920 sub unsafe {
921   my $self = shift;
922
923   if (@_) {
924     $_->unsafe(@_) for $self->all_storages;
925   }
926
927   return $self->master->unsafe;
928 }
929
930 =head2 disable_sth_caching
931
932 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
933 or returns master's current setting
934
935 =cut
936
937 sub disable_sth_caching {
938   my $self = shift;
939
940   if (@_) {
941     $_->disable_sth_caching(@_) for $self->all_storages;
942   }
943
944   return $self->master->disable_sth_caching;
945 }
946
947 =head2 lag_behind_master
948
949 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
950 setting
951
952 =cut
953
954 sub lag_behind_master {
955   my $self = shift;
956
957   return max map $_->lag_behind_master, $self->replicants;
958 }
959
960 =head2 is_replicating
961
962 returns true if all replicants return true for
963 L<DBIx::Class::Storage::DBI/is_replicating>
964
965 =cut
966
967 sub is_replicating {
968   my $self = shift;
969
970   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
971 }
972
973 =head2 connect_call_datetime_setup
974
975 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
976
977 =cut
978
979 sub connect_call_datetime_setup {
980   my $self = shift;
981   $_->connect_call_datetime_setup for $self->all_storages;
982 }
983
984 sub _populate_dbh {
985   my $self = shift;
986   $_->_populate_dbh for $self->all_storages;
987 }
988
989 sub _connect {
990   my $self = shift;
991   $_->_connect for $self->all_storages;
992 }
993
994 sub _rebless {
995   my $self = shift;
996   $_->_rebless for $self->all_storages;
997 }
998
999 sub _determine_driver {
1000   my $self = shift;
1001   $_->_determine_driver for $self->all_storages;
1002 }
1003
1004 sub _driver_determined {
1005   my $self = shift;
1006
1007   if (@_) {
1008     $_->_driver_determined(@_) for $self->all_storages;
1009   }
1010
1011   return $self->master->_driver_determined;
1012 }
1013
1014 sub _init {
1015   my $self = shift;
1016
1017   $_->_init for $self->all_storages;
1018 }
1019
1020 sub _run_connection_actions {
1021   my $self = shift;
1022
1023   $_->_run_connection_actions for $self->all_storages;
1024 }
1025
1026 sub _do_connection_actions {
1027   my $self = shift;
1028
1029   if (@_) {
1030     $_->_do_connection_actions(@_) for $self->all_storages;
1031   }
1032 }
1033
1034 sub connect_call_do_sql {
1035   my $self = shift;
1036   $_->connect_call_do_sql(@_) for $self->all_storages;
1037 }
1038
1039 sub disconnect_call_do_sql {
1040   my $self = shift;
1041   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1042 }
1043
1044 sub _seems_connected {
1045   my $self = shift;
1046
1047   return min map $_->_seems_connected, $self->all_storages;
1048 }
1049
1050 sub _ping {
1051   my $self = shift;
1052
1053   return min map $_->_ping, $self->all_storages;
1054 }
1055
1056 # not using the normalized_version, because we want to preserve
1057 # version numbers much longer than the conventional xxx.yyyzzz
1058 my $numify_ver = sub {
1059   my $ver = shift;
1060   my @numparts = split /\D+/, $ver;
1061   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1062
1063   return sprintf $format, @numparts;
1064 };
1065 sub _server_info {
1066   my $self = shift;
1067
1068   if (not $self->_dbh_details->{info}) {
1069     $self->_dbh_details->{info} = (
1070       reduce { $a->[0] < $b->[0] ? $a : $b }
1071       map [ $numify_ver->($_->{dbms_version}), $_ ],
1072       map $_->_server_info, $self->all_storages
1073     )->[1];
1074   }
1075
1076   return $self->next::method;
1077 }
1078
1079 sub _get_server_version {
1080   my $self = shift;
1081
1082   return $self->_server_info->{dbms_version};
1083 }
1084
1085 =head1 GOTCHAS
1086
1087 Due to the fact that replicants can lag behind a master, you must take care to
1088 make sure you use one of the methods to force read queries to a master should
1089 you need realtime data integrity.  For example, if you insert a row, and then
1090 immediately re-read it from the database (say, by doing
1091 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1092 or you insert a row and then immediately build a query that expects that row
1093 to be an item, you should force the master to handle reads.  Otherwise, due to
1094 the lag, there is no certainty your data will be in the expected state.
1095
1096 For data integrity, all transactions automatically use the master storage for
1097 all read and write queries.  Using a transaction is the preferred and recommended
1098 method to force the master to handle all read queries.
1099
1100 Otherwise, you can force a single query to use the master with the 'force_pool'
1101 attribute:
1102
1103   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1104
1105 This attribute will safely be ignored by non replicated storages, so you can use
1106 the same code for both types of systems.
1107
1108 Lastly, you can use the L</execute_reliably> method, which works very much like
1109 a transaction.
1110
1111 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1112 and L</set_balanced_storage>, however this operates at a global level and is not
1113 suitable if you have a shared Schema object being used by multiple processes,
1114 such as on a web application server.  You can get around this limitation by
1115 using the Schema clone method.
1116
1117   my $new_schema = $schema->clone;
1118   $new_schema->set_reliable_storage;
1119
1120   ## $new_schema will use only the Master storage for all reads/writes while
1121   ## the $schema object will use replicated storage.
1122
1123 =head1 FURTHER QUESTIONS?
1124
1125 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1126
1127 =head1 COPYRIGHT AND LICENSE
1128
1129 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1130 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1131 redistribute it and/or modify it under the same terms as the
1132 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1133
1134 =cut
1135
1136 __PACKAGE__->meta->make_immutable;
1137
1138 1;