231b5a557264ed4e45d3a85c50fba638d733bf2e
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use warnings;
4 use strict;
5
6 BEGIN {
7   require DBIx::Class::Optional::Dependencies;
8   if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9     die "The following modules are required for Replicated storage support: $missing\n";
10   }
11 }
12
13 use Moose;
14 use DBIx::Class::Storage::DBI;
15 use DBIx::Class::Storage::DBI::Replicated::Pool;
16 use DBIx::Class::Storage::DBI::Replicated::Balancer;
17 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
18 use MooseX::Types::Moose qw/ClassName HashRef Object/;
19 use Scalar::Util 'reftype';
20 use Hash::Merge;
21 use List::Util qw/min max reduce/;
22 use Context::Preserve 'preserve_context';
23 use Try::Tiny;
24
25 use namespace::clean -except => 'meta';
26
27 =head1 NAME
28
29 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
30
31 =head1 SYNOPSIS
32
33 The Following example shows how to change an existing $schema to a replicated
34 storage type, add some replicated (read-only) databases, and perform reporting
35 tasks.
36
37 You should set the 'storage_type attribute to a replicated type.  You should
38 also define your arguments, such as which balancer you want and any arguments
39 that the Pool object should get.
40
41   my $schema = Schema::Class->clone;
42   $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
43   $schema->connection(...);
44
45 Next, you need to add in the Replicants.  Basically this is an array of
46 arrayrefs, where each arrayref is database connect information.  Think of these
47 arguments as what you'd pass to the 'normal' $schema->connect method.
48
49   $schema->storage->connect_replicants(
50     [$dsn1, $user, $pass, \%opts],
51     [$dsn2, $user, $pass, \%opts],
52     [$dsn3, $user, $pass, \%opts],
53   );
54
55 Now, just use the $schema as you normally would.  Automatically all reads will
56 be delegated to the replicants, while writes to the master.
57
58   $schema->resultset('Source')->search({name=>'etc'});
59
60 You can force a given query to use a particular storage using the search
61 attribute 'force_pool'.  For example:
62
63   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
64
65 Now $rs will force everything (both reads and writes) to use whatever was setup
66 as the master storage.  'master' is hardcoded to always point to the Master,
67 but you can also use any Replicant name.  Please see:
68 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
69
70 Also see transactions and L</execute_reliably> for alternative ways to
71 force read traffic to the master.  In general, you should wrap your statements
72 in a transaction when you are reading and writing to the same tables at the
73 same time, since your replicants will often lag a bit behind the master.
74
75 If you have a multi-statement read only transaction you can force it to select
76 a random server in the pool by:
77
78   my $rs = $schema->resultset('Source')->search( undef,
79     { force_pool => $db->storage->read_handler->next_storage }
80   );
81
82 =head1 DESCRIPTION
83
84 Warning: This class is marked BETA.  This has been running a production
85 website using MySQL native replication as its backend and we have some decent
86 test coverage but the code hasn't yet been stressed by a variety of databases.
87 Individual DBs may have quirks we are not aware of.  Please use this in first
88 development and pass along your experiences/bug fixes.
89
90 This class implements replicated data store for DBI. Currently you can define
91 one master and numerous slave database connections. All write-type queries
92 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
93 database, all read-type queries (SELECTs) go to the slave database.
94
95 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
96 handle gets delegated to one of the two attributes: L</read_handler> or to
97 L</write_handler>.  Additionally, some methods need to be distributed
98 to all existing storages.  This way our storage class is a drop in replacement
99 for L<DBIx::Class::Storage::DBI>.
100
101 Read traffic is spread across the replicants (slaves) occurring to a user
102 selected algorithm.  The default algorithm is random weighted.
103
104 =head1 NOTES
105
106 The consistency between master and replicants is database specific.  The Pool
107 gives you a method to validate its replicants, removing and replacing them
108 when they fail/pass predefined criteria.  Please make careful use of the ways
109 to force a query to run against Master when needed.
110
111 =head1 REQUIREMENTS
112
113 Replicated Storage has additional requirements not currently part of
114 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
115
116 =head1 ATTRIBUTES
117
118 This class defines the following attributes.
119
120 =head2 schema
121
122 The underlying L<DBIx::Class::Schema> object this storage is attaching
123
124 =cut
125
126 has 'schema' => (
127     is=>'rw',
128     isa=>DBICSchema,
129     weak_ref=>1,
130     required=>1,
131 );
132
133 =head2 pool_type
134
135 Contains the classname which will instantiate the L</pool> object.  Defaults
136 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
137
138 =cut
139
140 has 'pool_type' => (
141   is=>'rw',
142   isa=>ClassName,
143   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
144   handles=>{
145     'create_pool' => 'new',
146   },
147 );
148
149 =head2 pool_args
150
151 Contains a hashref of initialized information to pass to the Balancer object.
152 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
153
154 =cut
155
156 has 'pool_args' => (
157   is=>'rw',
158   isa=>HashRef,
159   lazy=>1,
160   default=>sub { {} },
161 );
162
163
164 =head2 balancer_type
165
166 The replication pool requires a balance class to provider the methods for
167 choose how to spread the query load across each replicant in the pool.
168
169 =cut
170
171 has 'balancer_type' => (
172   is=>'rw',
173   isa=>BalancerClassNamePart,
174   coerce=>1,
175   required=>1,
176   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
177   handles=>{
178     'create_balancer' => 'new',
179   },
180 );
181
182 =head2 balancer_args
183
184 Contains a hashref of initialized information to pass to the Balancer object.
185 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
186
187 =cut
188
189 has 'balancer_args' => (
190   is=>'rw',
191   isa=>HashRef,
192   lazy=>1,
193   required=>1,
194   default=>sub { {} },
195 );
196
197 =head2 pool
198
199 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
200 container class for one or more replicated databases.
201
202 =cut
203
204 has 'pool' => (
205   is=>'ro',
206   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207   lazy_build=>1,
208   handles=>[qw/
209     connect_replicants
210     replicants
211     has_replicants
212   /],
213 );
214
215 =head2 balancer
216
217 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
218 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223   is=>'rw',
224   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225   lazy_build=>1,
226   handles=>[qw/auto_validate_every/],
227 );
228
229 =head2 master
230
231 The master defines the canonical state for a pool of connected databases.  All
232 the replicants are expected to match this databases state.  Thus, in a classic
233 Master / Slaves distributed system, all the slaves are expected to replicate
234 the Master's state as quick as possible.  This is the only database in the
235 pool of databases that is allowed to handle write traffic.
236
237 =cut
238
239 has 'master' => (
240   is=> 'ro',
241   isa=>DBICStorageDBI,
242   lazy_build=>1,
243 );
244
245 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
246
247 The following methods are delegated all the methods required for the
248 L<DBIx::Class::Storage::DBI> interface.
249
250 =cut
251
252 my $method_dispatch = {
253   writer => [qw/
254     on_connect_do
255     on_disconnect_do
256     on_connect_call
257     on_disconnect_call
258     connect_info
259     _connect_info
260     throw_exception
261     sql_maker
262     sqlt_type
263     create_ddl_dir
264     deployment_statements
265     datetime_parser
266     datetime_parser_type
267     build_datetime_parser
268     last_insert_id
269     insert
270     update
271     delete
272     dbh
273     txn_begin
274     txn_do
275     txn_commit
276     txn_rollback
277     txn_scope_guard
278     _exec_txn_rollback
279     _exec_txn_begin
280     _exec_txn_commit
281     deploy
282     with_deferred_fk_checks
283     dbh_do
284     _prep_for_execute
285     is_datatype_numeric
286     _count_select
287     svp_rollback
288     svp_begin
289     svp_release
290     relname_to_table_alias
291     _dbh_last_insert_id
292     _default_dbi_connect_attributes
293     _dbi_connect_info
294     _dbic_connect_attributes
295     auto_savepoint
296     _query_start
297     _query_end
298     _format_for_trace
299     _dbi_attrs_for_bind
300     bind_attribute_by_data_type
301     transaction_depth
302     _dbh
303     _select_args
304     _dbh_execute_for_fetch
305     _sql_maker
306     _dbh_execute_inserts_with_no_binds
307     _select_args_to_query
308     _gen_sql_bind
309     _svp_generate_name
310     _normalize_connect_info
311     _parse_connect_do
312     savepoints
313     _sql_maker_opts
314     _conn_pid
315     _dbh_autocommit
316     _native_data_type
317     _get_dbh
318     sql_maker_class
319     insert_bulk
320     _insert_bulk
321     _execute
322     _do_query
323     _dbh_execute
324   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
325   reader => [qw/
326     select
327     select_single
328     columns_info_for
329     _dbh_columns_info_for
330     _select
331   /],
332   unimplemented => [qw/
333     _arm_global_destructor
334     _verify_pid
335
336     get_use_dbms_capability
337     set_use_dbms_capability
338     get_dbms_capability
339     set_dbms_capability
340     _dbh_details
341     _dbh_get_info
342
343     _determine_connector_driver
344     _extract_driver_from_connect_info
345     _describe_connection
346     _warn_undetermined_driver
347
348     sql_limit_dialect
349     sql_quote_char
350     sql_name_sep
351
352     _prefetch_autovalues
353     _perform_autoinc_retrieval
354     _autoinc_supplied_for_op
355
356     _resolve_bindattrs
357
358     _max_column_bytesize
359     _is_lob_type
360     _is_binary_lob_type
361     _is_binary_type
362     _is_text_lob_type
363
364     _prepare_sth
365     _bind_sth_params
366   /,(
367     # the capability framework
368     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
369     grep
370       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
371       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
372   )],
373 };
374
375 if (DBIx::Class::_ENV_::DBICTEST) {
376
377   my $seen;
378   for my $type (keys %$method_dispatch) {
379     for (@{$method_dispatch->{$type}}) {
380       push @{$seen->{$_}}, $type;
381     }
382   }
383
384   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
385     die(join "\n", '',
386       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
387       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
388       '',
389     );
390   }
391
392   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
393     die(join "\n", '',
394       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
395       @cant,
396       '',
397     );
398   }
399 }
400
401 for my $method (@{$method_dispatch->{unimplemented}}) {
402   __PACKAGE__->meta->add_method($method, sub {
403     my $self = shift;
404     $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
405   });
406 }
407
408 =head2 read_handler
409
410 Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
411
412 =cut
413
414 has 'read_handler' => (
415   is=>'rw',
416   isa=>Object,
417   lazy_build=>1,
418   handles=>$method_dispatch->{reader},
419 );
420
421 =head2 write_handler
422
423 Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
424 as well as methods that don't write or read that can be called on only one
425 storage, methods that return a C<$dbh>, and any methods that don't make sense to
426 run on a replicant.
427
428 =cut
429
430 has 'write_handler' => (
431   is=>'ro',
432   isa=>Object,
433   lazy_build=>1,
434   handles=>$method_dispatch->{writer},
435 );
436
437
438
439 has _master_connect_info_opts =>
440   (is => 'rw', isa => HashRef, default => sub { {} });
441
442 =head2 around: connect_info
443
444 Preserves master's C<connect_info> options (for merging with replicants.)
445 Also sets any Replicated-related options from connect_info, such as
446 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
447
448 =cut
449
450 around connect_info => sub {
451   my ($next, $self, $info, @extra) = @_;
452
453   $self->throw_exception(
454     'connect_info can not be retrieved from a replicated storage - '
455   . 'accessor must be called on a specific pool instance'
456   ) unless defined $info;
457
458   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
459
460   my %opts;
461   for my $arg (@$info) {
462     next unless (reftype($arg)||'') eq 'HASH';
463     %opts = %{ $merge->merge($arg, \%opts) };
464   }
465   delete $opts{dsn};
466
467   if (@opts{qw/pool_type pool_args/}) {
468     $self->pool_type(delete $opts{pool_type})
469       if $opts{pool_type};
470
471     $self->pool_args(
472       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
473     );
474
475     ## Since we possibly changed the pool_args, we need to clear the current
476     ## pool object so that next time it is used it will be rebuilt.
477     $self->clear_pool;
478   }
479
480   if (@opts{qw/balancer_type balancer_args/}) {
481     $self->balancer_type(delete $opts{balancer_type})
482       if $opts{balancer_type};
483
484     $self->balancer_args(
485       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
486     );
487
488     $self->balancer($self->_build_balancer)
489       if $self->balancer;
490   }
491
492   $self->_master_connect_info_opts(\%opts);
493
494   return preserve_context {
495     $self->$next($info, @extra);
496   } after => sub {
497     # Make sure master is blessed into the correct class and apply role to it.
498     my $master = $self->master;
499     $master->_determine_driver;
500     Moose::Meta::Class->initialize(ref $master);
501
502     DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
503
504     # link pool back to master
505     $self->pool->master($master);
506   };
507 };
508
509 =head1 METHODS
510
511 This class defines the following methods.
512
513 =head2 BUILDARGS
514
515 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
516 first argument.  So we need to massage the arguments a bit so that all the
517 bits get put into the correct places.
518
519 =cut
520
521 sub BUILDARGS {
522   my ($class, $schema, $storage_type_args, @args) = @_;
523
524   return {
525     schema=>$schema,
526     %$storage_type_args,
527     @args
528   }
529 }
530
531 =head2 _build_master
532
533 Lazy builder for the L</master> attribute.
534
535 =cut
536
537 sub _build_master {
538   my $self = shift @_;
539   my $master = DBIx::Class::Storage::DBI->new($self->schema);
540   $master
541 }
542
543 =head2 _build_pool
544
545 Lazy builder for the L</pool> attribute.
546
547 =cut
548
549 sub _build_pool {
550   my $self = shift @_;
551   $self->create_pool(%{$self->pool_args});
552 }
553
554 =head2 _build_balancer
555
556 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
557 the balancer knows which pool it's balancing.
558
559 =cut
560
561 sub _build_balancer {
562   my $self = shift @_;
563   $self->create_balancer(
564     pool=>$self->pool,
565     master=>$self->master,
566     %{$self->balancer_args},
567   );
568 }
569
570 =head2 _build_write_handler
571
572 Lazy builder for the L</write_handler> attribute.  The default is to set this to
573 the L</master>.
574
575 =cut
576
577 sub _build_write_handler {
578   return shift->master;
579 }
580
581 =head2 _build_read_handler
582
583 Lazy builder for the L</read_handler> attribute.  The default is to set this to
584 the L</balancer>.
585
586 =cut
587
588 sub _build_read_handler {
589   return shift->balancer;
590 }
591
592 =head2 around: connect_replicants
593
594 All calls to connect_replicants needs to have an existing $schema tacked onto
595 top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
596 L<connect_info|DBIx::Class::Storage::DBI/connect_info>
597 options merged with the master, with replicant opts having higher priority.
598
599 =cut
600
601 around connect_replicants => sub {
602   my ($next, $self, @args) = @_;
603
604   for my $r (@args) {
605     $r = [ $r ] unless reftype $r eq 'ARRAY';
606
607     $self->throw_exception('coderef replicant connect_info not supported')
608       if ref $r->[0] && reftype $r->[0] eq 'CODE';
609
610 # any connect_info options?
611     my $i = 0;
612     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
613
614 # make one if none
615     $r->[$i] = {} unless $r->[$i];
616
617 # merge if two hashes
618     my @hashes = @$r[$i .. $#{$r}];
619
620     $self->throw_exception('invalid connect_info options')
621       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
622
623     $self->throw_exception('too many hashrefs in connect_info')
624       if @hashes > 2;
625
626     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
627     my %opts = %{ $merge->merge(reverse @hashes) };
628
629 # delete them
630     splice @$r, $i+1, ($#{$r} - $i), ();
631
632 # make sure master/replicants opts don't clash
633     my %master_opts = %{ $self->_master_connect_info_opts };
634     if (exists $opts{dbh_maker}) {
635         delete @master_opts{qw/dsn user password/};
636     }
637     delete $master_opts{dbh_maker};
638
639 # merge with master
640     %opts = %{ $merge->merge(\%opts, \%master_opts) };
641
642 # update
643     $r->[$i] = \%opts;
644   }
645
646   $self->$next($self->schema, @args);
647 };
648
649 =head2 all_storages
650
651 Returns an array of all the connected storage backends.  The first element
652 in the returned array is the master, and the rest are each of the
653 replicants.
654
655 =cut
656
657 sub all_storages {
658   my $self = shift @_;
659   return grep {defined $_ && blessed $_} (
660      $self->master,
661      values %{ $self->replicants },
662   );
663 }
664
665 =head2 execute_reliably ($coderef, ?@args)
666
667 Given a coderef, saves the current state of the L</read_handler>, forces it to
668 use reliable storage (e.g. sets it to the master), executes a coderef and then
669 restores the original state.
670
671 Example:
672
673   my $reliably = sub {
674     my $name = shift @_;
675     $schema->resultset('User')->create({name=>$name});
676     my $user_rs = $schema->resultset('User')->find({name=>$name});
677     return $user_rs;
678   };
679
680   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
681
682 Use this when you must be certain of your database state, such as when you just
683 inserted something and need to get a resultset including it, etc.
684
685 =cut
686
687 sub execute_reliably {
688   my $self = shift;
689   my $coderef = shift;
690
691   unless( ref $coderef eq 'CODE') {
692     $self->throw_exception('Second argument must be a coderef');
693   }
694
695   ## replace the current read handler for the remainder of the scope
696   local $self->{read_handler} = $self->master;
697
698   my $args = \@_;
699   return try {
700     $coderef->(@$args);
701   } catch {
702     $self->throw_exception("coderef returned an error: $_");
703   };
704 }
705
706 =head2 set_reliable_storage
707
708 Sets the current $schema to be 'reliable', that is all queries, both read and
709 write are sent to the master
710
711 =cut
712
713 sub set_reliable_storage {
714   my $self = shift @_;
715   my $schema = $self->schema;
716   my $write_handler = $self->schema->storage->write_handler;
717
718   $schema->storage->read_handler($write_handler);
719 }
720
721 =head2 set_balanced_storage
722
723 Sets the current $schema to be use the </balancer> for all reads, while all
724 writes are sent to the master only
725
726 =cut
727
728 sub set_balanced_storage {
729   my $self = shift @_;
730   my $schema = $self->schema;
731   my $balanced_handler = $self->schema->storage->balancer;
732
733   $schema->storage->read_handler($balanced_handler);
734 }
735
736 =head2 connected
737
738 Check that the master and at least one of the replicants is connected.
739
740 =cut
741
742 sub connected {
743   my $self = shift @_;
744   return
745     $self->master->connected &&
746     $self->pool->connected_replicants;
747 }
748
749 =head2 ensure_connected
750
751 Make sure all the storages are connected.
752
753 =cut
754
755 sub ensure_connected {
756   my $self = shift @_;
757   foreach my $source ($self->all_storages) {
758     $source->ensure_connected(@_);
759   }
760 }
761
762 =head2 limit_dialect
763
764 Set the limit_dialect for all existing storages
765
766 =cut
767
768 sub limit_dialect {
769   my $self = shift @_;
770   foreach my $source ($self->all_storages) {
771     $source->limit_dialect(@_);
772   }
773   return $self->master->limit_dialect;
774 }
775
776 =head2 quote_char
777
778 Set the quote_char for all existing storages
779
780 =cut
781
782 sub quote_char {
783   my $self = shift @_;
784   foreach my $source ($self->all_storages) {
785     $source->quote_char(@_);
786   }
787   return $self->master->quote_char;
788 }
789
790 =head2 name_sep
791
792 Set the name_sep for all existing storages
793
794 =cut
795
796 sub name_sep {
797   my $self = shift @_;
798   foreach my $source ($self->all_storages) {
799     $source->name_sep(@_);
800   }
801   return $self->master->name_sep;
802 }
803
804 =head2 set_schema
805
806 Set the schema object for all existing storages
807
808 =cut
809
810 sub set_schema {
811   my $self = shift @_;
812   foreach my $source ($self->all_storages) {
813     $source->set_schema(@_);
814   }
815 }
816
817 =head2 debug
818
819 set a debug flag across all storages
820
821 =cut
822
823 sub debug {
824   my $self = shift @_;
825   if(@_) {
826     foreach my $source ($self->all_storages) {
827       $source->debug(@_);
828     }
829   }
830   return $self->master->debug;
831 }
832
833 =head2 debugobj
834
835 set a debug object
836
837 =cut
838
839 sub debugobj {
840   my $self = shift @_;
841   return $self->master->debugobj(@_);
842 }
843
844 =head2 debugfh
845
846 set a debugfh object
847
848 =cut
849
850 sub debugfh {
851   my $self = shift @_;
852   return $self->master->debugfh(@_);
853 }
854
855 =head2 debugcb
856
857 set a debug callback
858
859 =cut
860
861 sub debugcb {
862   my $self = shift @_;
863   return $self->master->debugcb(@_);
864 }
865
866 =head2 disconnect
867
868 disconnect everything
869
870 =cut
871
872 sub disconnect {
873   my $self = shift @_;
874   foreach my $source ($self->all_storages) {
875     $source->disconnect(@_);
876   }
877 }
878
879 =head2 cursor_class
880
881 set cursor class on all storages, or return master's
882
883 =cut
884
885 sub cursor_class {
886   my ($self, $cursor_class) = @_;
887
888   if ($cursor_class) {
889     $_->cursor_class($cursor_class) for $self->all_storages;
890   }
891   $self->master->cursor_class;
892 }
893
894 =head2 cursor
895
896 set cursor class on all storages, or return master's, alias for L</cursor_class>
897 above.
898
899 =cut
900
901 sub cursor {
902   my ($self, $cursor_class) = @_;
903
904   if ($cursor_class) {
905     $_->cursor($cursor_class) for $self->all_storages;
906   }
907   $self->master->cursor;
908 }
909
910 =head2 unsafe
911
912 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
913 master's current setting
914
915 =cut
916
917 sub unsafe {
918   my $self = shift;
919
920   if (@_) {
921     $_->unsafe(@_) for $self->all_storages;
922   }
923
924   return $self->master->unsafe;
925 }
926
927 =head2 disable_sth_caching
928
929 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
930 or returns master's current setting
931
932 =cut
933
934 sub disable_sth_caching {
935   my $self = shift;
936
937   if (@_) {
938     $_->disable_sth_caching(@_) for $self->all_storages;
939   }
940
941   return $self->master->disable_sth_caching;
942 }
943
944 =head2 lag_behind_master
945
946 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
947 setting
948
949 =cut
950
951 sub lag_behind_master {
952   my $self = shift;
953
954   return max map $_->lag_behind_master, $self->replicants;
955 }
956
957 =head2 is_replicating
958
959 returns true if all replicants return true for
960 L<DBIx::Class::Storage::DBI/is_replicating>
961
962 =cut
963
964 sub is_replicating {
965   my $self = shift;
966
967   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
968 }
969
970 =head2 connect_call_datetime_setup
971
972 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
973
974 =cut
975
976 sub connect_call_datetime_setup {
977   my $self = shift;
978   $_->connect_call_datetime_setup for $self->all_storages;
979 }
980
981 sub _populate_dbh {
982   my $self = shift;
983   $_->_populate_dbh for $self->all_storages;
984 }
985
986 sub _connect {
987   my $self = shift;
988   $_->_connect for $self->all_storages;
989 }
990
991 sub _rebless {
992   my $self = shift;
993   $_->_rebless for $self->all_storages;
994 }
995
996 sub _determine_driver {
997   my $self = shift;
998   $_->_determine_driver for $self->all_storages;
999 }
1000
1001 sub _driver_determined {
1002   my $self = shift;
1003
1004   if (@_) {
1005     $_->_driver_determined(@_) for $self->all_storages;
1006   }
1007
1008   return $self->master->_driver_determined;
1009 }
1010
1011 sub _init {
1012   my $self = shift;
1013
1014   $_->_init for $self->all_storages;
1015 }
1016
1017 sub _run_connection_actions {
1018   my $self = shift;
1019
1020   $_->_run_connection_actions for $self->all_storages;
1021 }
1022
1023 sub _do_connection_actions {
1024   my $self = shift;
1025
1026   if (@_) {
1027     $_->_do_connection_actions(@_) for $self->all_storages;
1028   }
1029 }
1030
1031 sub connect_call_do_sql {
1032   my $self = shift;
1033   $_->connect_call_do_sql(@_) for $self->all_storages;
1034 }
1035
1036 sub disconnect_call_do_sql {
1037   my $self = shift;
1038   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1039 }
1040
1041 sub _seems_connected {
1042   my $self = shift;
1043
1044   return min map $_->_seems_connected, $self->all_storages;
1045 }
1046
1047 sub _ping {
1048   my $self = shift;
1049
1050   return min map $_->_ping, $self->all_storages;
1051 }
1052
1053 # not using the normalized_version, because we want to preserve
1054 # version numbers much longer than the conventional xxx.yyyzzz
1055 my $numify_ver = sub {
1056   my $ver = shift;
1057   my @numparts = split /\D+/, $ver;
1058   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1059
1060   return sprintf $format, @numparts;
1061 };
1062 sub _server_info {
1063   my $self = shift;
1064
1065   if (not $self->_dbh_details->{info}) {
1066     $self->_dbh_details->{info} = (
1067       reduce { $a->[0] < $b->[0] ? $a : $b }
1068       map [ $numify_ver->($_->{dbms_version}), $_ ],
1069       map $_->_server_info, $self->all_storages
1070     )->[1];
1071   }
1072
1073   return $self->next::method;
1074 }
1075
1076 sub _get_server_version {
1077   my $self = shift;
1078
1079   return $self->_server_info->{dbms_version};
1080 }
1081
1082 =head1 GOTCHAS
1083
1084 Due to the fact that replicants can lag behind a master, you must take care to
1085 make sure you use one of the methods to force read queries to a master should
1086 you need realtime data integrity.  For example, if you insert a row, and then
1087 immediately re-read it from the database (say, by doing
1088 L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1089 or you insert a row and then immediately build a query that expects that row
1090 to be an item, you should force the master to handle reads.  Otherwise, due to
1091 the lag, there is no certainty your data will be in the expected state.
1092
1093 For data integrity, all transactions automatically use the master storage for
1094 all read and write queries.  Using a transaction is the preferred and recommended
1095 method to force the master to handle all read queries.
1096
1097 Otherwise, you can force a single query to use the master with the 'force_pool'
1098 attribute:
1099
1100   my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1101
1102 This attribute will safely be ignored by non replicated storages, so you can use
1103 the same code for both types of systems.
1104
1105 Lastly, you can use the L</execute_reliably> method, which works very much like
1106 a transaction.
1107
1108 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1109 and L</set_balanced_storage>, however this operates at a global level and is not
1110 suitable if you have a shared Schema object being used by multiple processes,
1111 such as on a web application server.  You can get around this limitation by
1112 using the Schema clone method.
1113
1114   my $new_schema = $schema->clone;
1115   $new_schema->set_reliable_storage;
1116
1117   ## $new_schema will use only the Master storage for all reads/writes while
1118   ## the $schema object will use replicated storage.
1119
1120 =head1 FURTHER QUESTIONS?
1121
1122 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
1123
1124 =head1 COPYRIGHT AND LICENSE
1125
1126 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1127 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1128 redistribute it and/or modify it under the same terms as the
1129 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
1130
1131 =cut
1132
1133 __PACKAGE__->meta->make_immutable;
1134
1135 1;