Cosmetics + changes
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5   use DBIx::Class;
6   croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
8 }
9
10 use Moose;
11 use DBIx::Class::Storage::DBI;
12 use DBIx::Class::Storage::DBI::Replicated::Pool;
13 use DBIx::Class::Storage::DBI::Replicated::Balancer;
14 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
15 use MooseX::Types::Moose qw/ClassName HashRef Object/;
16 use Scalar::Util 'reftype';
17 use Hash::Merge;
18 use List::Util qw/min max/;
19
20 use namespace::clean -except => 'meta';
21
22 =head1 NAME
23
24 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
25
26 =head1 SYNOPSIS
27
28 The Following example shows how to change an existing $schema to a replicated
29 storage type, add some replicated (read-only) databases, and perform reporting
30 tasks.
31
32 You should set the 'storage_type attribute to a replicated type.  You should
33 also define your arguments, such as which balancer you want and any arguments
34 that the Pool object should get.
35
36   my $schema = Schema::Class->clone;
37   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
38   $schema->connection(...);
39
40 Next, you need to add in the Replicants.  Basically this is an array of 
41 arrayrefs, where each arrayref is database connect information.  Think of these
42 arguments as what you'd pass to the 'normal' $schema->connect method.
43
44   $schema->storage->connect_replicants(
45     [$dsn1, $user, $pass, \%opts],
46     [$dsn2, $user, $pass, \%opts],
47     [$dsn3, $user, $pass, \%opts],
48   );
49
50 Now, just use the $schema as you normally would.  Automatically all reads will
51 be delegated to the replicants, while writes to the master.
52
53   $schema->resultset('Source')->search({name=>'etc'});
54
55 You can force a given query to use a particular storage using the search
56 attribute 'force_pool'.  For example:
57
58   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
59
60 Now $RS will force everything (both reads and writes) to use whatever was setup
61 as the master storage.  'master' is hardcoded to always point to the Master, 
62 but you can also use any Replicant name.  Please see:
63 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
64
65 Also see transactions and L</execute_reliably> for alternative ways to
66 force read traffic to the master.  In general, you should wrap your statements
67 in a transaction when you are reading and writing to the same tables at the
68 same time, since your replicants will often lag a bit behind the master.
69
70 See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
71 walkthroughs.
72
73 =head1 DESCRIPTION
74
75 Warning: This class is marked BETA.  This has been running a production
76 website using MySQL native replication as its backend and we have some decent
77 test coverage but the code hasn't yet been stressed by a variety of databases.
78 Individual DBs may have quirks we are not aware of.  Please use this in first
79 development and pass along your experiences/bug fixes.
80
81 This class implements replicated data store for DBI. Currently you can define
82 one master and numerous slave database connections. All write-type queries
83 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
84 database, all read-type queries (SELECTs) go to the slave database.
85
86 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
87 handle gets delegated to one of the two attributes: L</read_handler> or to
88 L</write_handler>.  Additionally, some methods need to be distributed
89 to all existing storages.  This way our storage class is a drop in replacement
90 for L<DBIx::Class::Storage::DBI>.
91
92 Read traffic is spread across the replicants (slaves) occurring to a user
93 selected algorithm.  The default algorithm is random weighted.
94
95 =head1 NOTES
96
97 The consistency between master and replicants is database specific.  The Pool
98 gives you a method to validate its replicants, removing and replacing them
99 when they fail/pass predefined criteria.  Please make careful use of the ways
100 to force a query to run against Master when needed.
101
102 =head1 REQUIREMENTS
103
104 Replicated Storage has additional requirements not currently part of
105 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
106
107 =head1 ATTRIBUTES
108
109 This class defines the following attributes.
110
111 =head2 schema
112
113 The underlying L<DBIx::Class::Schema> object this storage is attaching
114
115 =cut
116
117 has 'schema' => (
118     is=>'rw',
119     isa=>DBICSchema,
120     weak_ref=>1,
121     required=>1,
122 );
123
124 =head2 pool_type
125
126 Contains the classname which will instantiate the L</pool> object.  Defaults 
127 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
128
129 =cut
130
131 has 'pool_type' => (
132   is=>'rw',
133   isa=>ClassName,
134   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
135   handles=>{
136     'create_pool' => 'new',
137   },
138 );
139
140 =head2 pool_args
141
142 Contains a hashref of initialized information to pass to the Balancer object.
143 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
144
145 =cut
146
147 has 'pool_args' => (
148   is=>'rw',
149   isa=>HashRef,
150   lazy=>1,
151   default=>sub { {} },
152 );
153
154
155 =head2 balancer_type
156
157 The replication pool requires a balance class to provider the methods for
158 choose how to spread the query load across each replicant in the pool.
159
160 =cut
161
162 has 'balancer_type' => (
163   is=>'rw',
164   isa=>BalancerClassNamePart,
165   coerce=>1,
166   required=>1,
167   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
168   handles=>{
169     'create_balancer' => 'new',
170   },
171 );
172
173 =head2 balancer_args
174
175 Contains a hashref of initialized information to pass to the Balancer object.
176 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
177
178 =cut
179
180 has 'balancer_args' => (
181   is=>'rw',
182   isa=>HashRef,
183   lazy=>1,
184   required=>1,
185   default=>sub { {} },
186 );
187
188 =head2 pool
189
190 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
191 container class for one or more replicated databases.
192
193 =cut
194
195 has 'pool' => (
196   is=>'ro',
197   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
198   lazy_build=>1,
199   handles=>[qw/
200     connect_replicants
201     replicants
202     has_replicants
203   /],
204 );
205
206 =head2 balancer
207
208 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
209 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
210
211 =cut
212
213 has 'balancer' => (
214   is=>'rw',
215   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
216   lazy_build=>1,
217   handles=>[qw/auto_validate_every/],
218 );
219
220 =head2 master
221
222 The master defines the canonical state for a pool of connected databases.  All
223 the replicants are expected to match this databases state.  Thus, in a classic
224 Master / Slaves distributed system, all the slaves are expected to replicate
225 the Master's state as quick as possible.  This is the only database in the
226 pool of databases that is allowed to handle write traffic.
227
228 =cut
229
230 has 'master' => (
231   is=> 'ro',
232   isa=>DBICStorageDBI,
233   lazy_build=>1,
234 );
235
236 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
237
238 The following methods are delegated all the methods required for the 
239 L<DBIx::Class::Storage::DBI> interface.
240
241 =head2 read_handler
242
243 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
244
245 =cut
246
247 has 'read_handler' => (
248   is=>'rw',
249   isa=>Object,
250   lazy_build=>1,
251   handles=>[qw/
252     select
253     select_single
254     columns_info_for
255     _dbh_columns_info_for 
256     _select
257   /],
258 );
259
260 =head2 write_handler
261
262 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
263 as well as methods that don't write or read that can be called on only one
264 storage, methods that return a C<$dbh>, and any methods that don't make sense to
265 run on a replicant.
266
267 =cut
268
269 has 'write_handler' => (
270   is=>'ro',
271   isa=>Object,
272   lazy_build=>1,
273   handles=>[qw/
274     on_connect_do
275     on_disconnect_do
276     on_connect_call
277     on_disconnect_call
278     connect_info
279     _connect_info
280     throw_exception
281     sql_maker
282     sqlt_type
283     create_ddl_dir
284     deployment_statements
285     datetime_parser
286     datetime_parser_type
287     build_datetime_parser
288     last_insert_id
289     insert
290     insert_bulk
291     update
292     delete
293     dbh
294     txn_begin
295     txn_do
296     txn_commit
297     txn_rollback
298     txn_scope_guard
299     sth
300     deploy
301     with_deferred_fk_checks
302     dbh_do
303     reload_row
304     with_deferred_fk_checks
305     _prep_for_execute
306
307     backup
308     is_datatype_numeric
309     can_insert_returning
310     _count_select
311     _subq_count_select
312     _subq_update_delete
313     svp_rollback
314     svp_begin
315     svp_release
316     relname_to_table_alias
317     _straight_join_to_node
318     _dbh_last_insert_id
319     _fix_bind_params
320     _default_dbi_connect_attributes
321     _dbi_connect_info
322     auto_savepoint
323     _sqlt_version_ok
324     _query_end
325     bind_attribute_by_data_type
326     transaction_depth
327     _dbh
328     _select_args
329     _dbh_execute_array
330     _sql_maker_args
331     _sql_maker
332     _query_start
333     _sqlt_version_error
334     _per_row_update_delete
335     _dbh_begin_work
336     _dbh_execute_inserts_with_no_binds
337     _select_args_to_query
338     _svp_generate_name
339     _multipk_update_delete
340     source_bind_attributes
341     _normalize_connect_info
342     _parse_connect_do
343     _dbh_commit
344     _execute_array
345     _placeholders_supported
346     _verify_pid
347     savepoints
348     _sqlt_minimum_version
349     _sql_maker_opts
350     _conn_pid
351     _typeless_placeholders_supported
352     _conn_tid
353     _dbh_autocommit
354     _native_data_type
355     _get_dbh
356     sql_maker_class
357     _dbh_rollback
358     _adjust_select_args_for_complex_prefetch
359     _resolve_ident_sources
360     _resolve_column_info
361     _prune_unused_joins
362     _strip_cond_qualifiers
363     _parse_order_by
364     _resolve_aliastypes_from_select_args
365     _execute
366     _do_query
367     _dbh_sth
368     _dbh_execute
369     _prefetch_insert_auto_nextvals
370   /],
371 );
372
373 has _master_connect_info_opts =>
374   (is => 'rw', isa => HashRef, default => sub { {} });
375
376 =head2 around: connect_info
377
378 Preserves master's C<connect_info> options (for merging with replicants.)
379 Also sets any Replicated-related options from connect_info, such as
380 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
381
382 =cut
383
384 around connect_info => sub {
385   my ($next, $self, $info, @extra) = @_;
386
387   my $wantarray = wantarray;
388
389   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
390
391   my %opts;
392   for my $arg (@$info) {
393     next unless (reftype($arg)||'') eq 'HASH';
394     %opts = %{ $merge->merge($arg, \%opts) };
395   }
396   delete $opts{dsn};
397
398   if (@opts{qw/pool_type pool_args/}) {
399     $self->pool_type(delete $opts{pool_type})
400       if $opts{pool_type};
401
402     $self->pool_args(
403       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
404     );
405
406     $self->pool($self->_build_pool)
407       if $self->pool;
408   }
409
410   if (@opts{qw/balancer_type balancer_args/}) {
411     $self->balancer_type(delete $opts{balancer_type})
412       if $opts{balancer_type};
413
414     $self->balancer_args(
415       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
416     );
417
418     $self->balancer($self->_build_balancer)
419       if $self->balancer;
420   }
421
422   $self->_master_connect_info_opts(\%opts);
423
424   my (@res, $res);
425   if ($wantarray) {
426     @res = $self->$next($info, @extra);
427   } else {
428     $res = $self->$next($info, @extra);
429   }
430
431   # Make sure master is blessed into the correct class and apply role to it.
432   my $master = $self->master;
433   $master->_determine_driver;
434   Moose::Meta::Class->initialize(ref $master);
435
436   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
437
438   # link pool back to master
439   $self->pool->master($master);
440
441   $wantarray ? @res : $res;
442 };
443
444 =head1 METHODS
445
446 This class defines the following methods.
447
448 =head2 BUILDARGS
449
450 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
451 first argument.  So we need to massage the arguments a bit so that all the
452 bits get put into the correct places.
453
454 =cut
455
456 sub BUILDARGS {
457   my ($class, $schema, $storage_type_args, @args) = @_;  
458
459   return {
460     schema=>$schema,
461     %$storage_type_args,
462     @args
463   }
464 }
465
466 =head2 _build_master
467
468 Lazy builder for the L</master> attribute.
469
470 =cut
471
472 sub _build_master {
473   my $self = shift @_;
474   my $master = DBIx::Class::Storage::DBI->new($self->schema);
475   $master
476 }
477
478 =head2 _build_pool
479
480 Lazy builder for the L</pool> attribute.
481
482 =cut
483
484 sub _build_pool {
485   my $self = shift @_;
486   $self->create_pool(%{$self->pool_args});
487 }
488
489 =head2 _build_balancer
490
491 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
492 the balancer knows which pool it's balancing.
493
494 =cut
495
496 sub _build_balancer {
497   my $self = shift @_;
498   $self->create_balancer(
499     pool=>$self->pool,
500     master=>$self->master,
501     %{$self->balancer_args},
502   );
503 }
504
505 =head2 _build_write_handler
506
507 Lazy builder for the L</write_handler> attribute.  The default is to set this to
508 the L</master>.
509
510 =cut
511
512 sub _build_write_handler {
513   return shift->master;
514 }
515
516 =head2 _build_read_handler
517
518 Lazy builder for the L</read_handler> attribute.  The default is to set this to
519 the L</balancer>.
520
521 =cut
522
523 sub _build_read_handler {
524   return shift->balancer;
525 }
526
527 =head2 around: connect_replicants
528
529 All calls to connect_replicants needs to have an existing $schema tacked onto
530 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
531 options merged with the master, with replicant opts having higher priority.
532
533 =cut
534
535 around connect_replicants => sub {
536   my ($next, $self, @args) = @_;
537
538   for my $r (@args) {
539     $r = [ $r ] unless reftype $r eq 'ARRAY';
540
541     $self->throw_exception('coderef replicant connect_info not supported')
542       if ref $r->[0] && reftype $r->[0] eq 'CODE';
543
544 # any connect_info options?
545     my $i = 0;
546     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
547
548 # make one if none
549     $r->[$i] = {} unless $r->[$i];
550
551 # merge if two hashes
552     my @hashes = @$r[$i .. $#{$r}];
553
554     $self->throw_exception('invalid connect_info options')
555       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
556
557     $self->throw_exception('too many hashrefs in connect_info')
558       if @hashes > 2;
559
560     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
561     my %opts = %{ $merge->merge(reverse @hashes) };
562
563 # delete them
564     splice @$r, $i+1, ($#{$r} - $i), ();
565
566 # make sure master/replicants opts don't clash
567     my %master_opts = %{ $self->_master_connect_info_opts };
568     if (exists $opts{dbh_maker}) {
569         delete @master_opts{qw/dsn user password/};
570     }
571     delete $master_opts{dbh_maker};
572
573 # merge with master
574     %opts = %{ $merge->merge(\%opts, \%master_opts) };
575
576 # update
577     $r->[$i] = \%opts;
578   }
579
580   $self->$next($self->schema, @args);
581 };
582
583 =head2 all_storages
584
585 Returns an array of of all the connected storage backends.  The first element
586 in the returned array is the master, and the remainings are each of the
587 replicants.
588
589 =cut
590
591 sub all_storages {
592   my $self = shift @_;
593   return grep {defined $_ && blessed $_} (
594      $self->master,
595      values %{ $self->replicants },
596   );
597 }
598
599 =head2 execute_reliably ($coderef, ?@args)
600
601 Given a coderef, saves the current state of the L</read_handler>, forces it to
602 use reliable storage (e.g. sets it to the master), executes a coderef and then
603 restores the original state.
604
605 Example:
606
607   my $reliably = sub {
608     my $name = shift @_;
609     $schema->resultset('User')->create({name=>$name});
610     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
611     return $user_rs;
612   };
613
614   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
615
616 Use this when you must be certain of your database state, such as when you just
617 inserted something and need to get a resultset including it, etc.
618
619 =cut
620
621 sub execute_reliably {
622   my ($self, $coderef, @args) = @_;
623
624   unless( ref $coderef eq 'CODE') {
625     $self->throw_exception('Second argument must be a coderef');
626   }
627
628   ##Get copy of master storage
629   my $master = $self->master;
630
631   ##Get whatever the current read hander is
632   my $current = $self->read_handler;
633
634   ##Set the read handler to master
635   $self->read_handler($master);
636
637   ## do whatever the caller needs
638   my @result;
639   my $want_array = wantarray;
640
641   eval {
642     if($want_array) {
643       @result = $coderef->(@args);
644     } elsif(defined $want_array) {
645       ($result[0]) = ($coderef->(@args));
646     } else {
647       $coderef->(@args);
648     }
649   };
650
651   ##Reset to the original state
652   $self->read_handler($current);
653
654   ##Exception testing has to come last, otherwise you might leave the 
655   ##read_handler set to master.
656
657   if($@) {
658     $self->throw_exception("coderef returned an error: $@");
659   } else {
660     return $want_array ? @result : $result[0];
661   }
662 }
663
664 =head2 set_reliable_storage
665
666 Sets the current $schema to be 'reliable', that is all queries, both read and
667 write are sent to the master
668
669 =cut
670
671 sub set_reliable_storage {
672   my $self = shift @_;
673   my $schema = $self->schema;
674   my $write_handler = $self->schema->storage->write_handler;
675
676   $schema->storage->read_handler($write_handler);
677 }
678
679 =head2 set_balanced_storage
680
681 Sets the current $schema to be use the </balancer> for all reads, while all
682 writes are sent to the master only
683
684 =cut
685
686 sub set_balanced_storage {
687   my $self = shift @_;
688   my $schema = $self->schema;
689   my $balanced_handler = $self->schema->storage->balancer;
690
691   $schema->storage->read_handler($balanced_handler);
692 }
693
694 =head2 connected
695
696 Check that the master and at least one of the replicants is connected.
697
698 =cut
699
700 sub connected {
701   my $self = shift @_;
702   return
703     $self->master->connected &&
704     $self->pool->connected_replicants;
705 }
706
707 =head2 ensure_connected
708
709 Make sure all the storages are connected.
710
711 =cut
712
713 sub ensure_connected {
714   my $self = shift @_;
715   foreach my $source ($self->all_storages) {
716     $source->ensure_connected(@_);
717   }
718 }
719
720 =head2 limit_dialect
721
722 Set the limit_dialect for all existing storages
723
724 =cut
725
726 sub limit_dialect {
727   my $self = shift @_;
728   foreach my $source ($self->all_storages) {
729     $source->limit_dialect(@_);
730   }
731   return $self->master->quote_char;
732 }
733
734 =head2 quote_char
735
736 Set the quote_char for all existing storages
737
738 =cut
739
740 sub quote_char {
741   my $self = shift @_;
742   foreach my $source ($self->all_storages) {
743     $source->quote_char(@_);
744   }
745   return $self->master->quote_char;
746 }
747
748 =head2 name_sep
749
750 Set the name_sep for all existing storages
751
752 =cut
753
754 sub name_sep {
755   my $self = shift @_;
756   foreach my $source ($self->all_storages) {
757     $source->name_sep(@_);
758   }
759   return $self->master->name_sep;
760 }
761
762 =head2 set_schema
763
764 Set the schema object for all existing storages
765
766 =cut
767
768 sub set_schema {
769   my $self = shift @_;
770   foreach my $source ($self->all_storages) {
771     $source->set_schema(@_);
772   }
773 }
774
775 =head2 debug
776
777 set a debug flag across all storages
778
779 =cut
780
781 sub debug {
782   my $self = shift @_;
783   if(@_) {
784     foreach my $source ($self->all_storages) {
785       $source->debug(@_);
786     }
787   }
788   return $self->master->debug;
789 }
790
791 =head2 debugobj
792
793 set a debug object
794
795 =cut
796
797 sub debugobj {
798   my $self = shift @_;
799   return $self->master->debugobj(@_);
800 }
801
802 =head2 debugfh
803
804 set a debugfh object
805
806 =cut
807
808 sub debugfh {
809   my $self = shift @_;
810   return $self->master->debugfh(@_);
811 }
812
813 =head2 debugcb
814
815 set a debug callback
816
817 =cut
818
819 sub debugcb {
820   my $self = shift @_;
821   return $self->master->debugcb(@_);
822 }
823
824 =head2 disconnect
825
826 disconnect everything
827
828 =cut
829
830 sub disconnect {
831   my $self = shift @_;
832   foreach my $source ($self->all_storages) {
833     $source->disconnect(@_);
834   }
835 }
836
837 =head2 cursor_class
838
839 set cursor class on all storages, or return master's
840
841 =cut
842
843 sub cursor_class {
844   my ($self, $cursor_class) = @_;
845
846   if ($cursor_class) {
847     $_->cursor_class($cursor_class) for $self->all_storages;
848   }
849   $self->master->cursor_class;
850 }
851
852 =head2 cursor
853
854 set cursor class on all storages, or return master's, alias for L</cursor_class>
855 above.
856
857 =cut
858
859 sub cursor {
860   my ($self, $cursor_class) = @_;
861
862   if ($cursor_class) {
863     $_->cursor($cursor_class) for $self->all_storages;
864   }
865   $self->master->cursor;
866 }
867
868 =head2 unsafe
869
870 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
871 master's current setting
872
873 =cut
874
875 sub unsafe {
876   my $self = shift;
877
878   if (@_) {
879     $_->unsafe(@_) for $self->all_storages;
880   }
881
882   return $self->master->unsafe;
883 }
884
885 =head2 disable_sth_caching
886
887 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
888 or returns master's current setting
889
890 =cut
891
892 sub disable_sth_caching {
893   my $self = shift;
894
895   if (@_) {
896     $_->disable_sth_caching(@_) for $self->all_storages;
897   }
898
899   return $self->master->disable_sth_caching;
900 }
901
902 =head2 lag_behind_master
903
904 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
905 setting
906
907 =cut
908
909 sub lag_behind_master {
910   my $self = shift;
911
912   return max map $_->lag_behind_master, $self->replicants;
913
914
915 =head2 is_replicating
916
917 returns true if all replicants return true for
918 L<DBIx::Class::Storage::DBI/is_replicating>
919
920 =cut
921
922 sub is_replicating {
923   my $self = shift;
924
925   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
926 }
927
928 =head2 connect_call_datetime_setup
929
930 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
931
932 =cut
933
934 sub connect_call_datetime_setup {
935   my $self = shift;
936   $_->connect_call_datetime_setup for $self->all_storages;
937 }
938
939 sub _populate_dbh {
940   my $self = shift;
941   $_->_populate_dbh for $self->all_storages;
942 }
943
944 sub _connect {
945   my $self = shift;
946   $_->_connect for $self->all_storages;
947 }
948
949 sub _rebless {
950   my $self = shift;
951   $_->_rebless for $self->all_storages;
952 }
953
954 sub _determine_driver {
955   my $self = shift;
956   $_->_determine_driver for $self->all_storages;
957 }
958
959 sub _driver_determined {
960   my $self = shift;
961   
962   if (@_) {
963     $_->_driver_determined(@_) for $self->all_storages;
964   }
965
966   return $self->master->_driver_determined;
967 }
968
969 sub _init {
970   my $self = shift;
971   
972   $_->_init for $self->all_storages;
973 }
974
975 sub _run_connection_actions {
976   my $self = shift;
977   
978   $_->_run_connection_actions for $self->all_storages;
979 }
980
981 sub _do_connection_actions {
982   my $self = shift;
983   
984   if (@_) {
985     $_->_do_connection_actions(@_) for $self->all_storages;
986   }
987 }
988
989 sub connect_call_do_sql {
990   my $self = shift;
991   $_->connect_call_do_sql(@_) for $self->all_storages;
992 }
993
994 sub disconnect_call_do_sql {
995   my $self = shift;
996   $_->disconnect_call_do_sql(@_) for $self->all_storages;
997 }
998
999 sub _seems_connected {
1000   my $self = shift;
1001
1002   return min map $_->_seems_connected, $self->all_storages;
1003 }
1004
1005 sub _ping {
1006   my $self = shift;
1007
1008   return min map $_->_ping, $self->all_storages;
1009 }
1010
1011 =head1 GOTCHAS
1012
1013 Due to the fact that replicants can lag behind a master, you must take care to
1014 make sure you use one of the methods to force read queries to a master should
1015 you need realtime data integrity.  For example, if you insert a row, and then
1016 immediately re-read it from the database (say, by doing $row->discard_changes)
1017 or you insert a row and then immediately build a query that expects that row
1018 to be an item, you should force the master to handle reads.  Otherwise, due to
1019 the lag, there is no certainty your data will be in the expected state.
1020
1021 For data integrity, all transactions automatically use the master storage for
1022 all read and write queries.  Using a transaction is the preferred and recommended
1023 method to force the master to handle all read queries.
1024
1025 Otherwise, you can force a single query to use the master with the 'force_pool'
1026 attribute:
1027
1028   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1029
1030 This attribute will safely be ignore by non replicated storages, so you can use
1031 the same code for both types of systems.
1032
1033 Lastly, you can use the L</execute_reliably> method, which works very much like
1034 a transaction.
1035
1036 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1037 and L</set_balanced_storage>, however this operates at a global level and is not
1038 suitable if you have a shared Schema object being used by multiple processes,
1039 such as on a web application server.  You can get around this limitation by
1040 using the Schema clone method.
1041
1042   my $new_schema = $schema->clone;
1043   $new_schema->set_reliable_storage;
1044
1045   ## $new_schema will use only the Master storage for all reads/writes while
1046   ## the $schema object will use replicated storage.
1047
1048 =head1 AUTHOR
1049
1050   John Napiorkowski <john.napiorkowski@takkle.com>
1051
1052 Based on code originated by:
1053
1054   Norbert Csongrádi <bert@cpan.org>
1055   Peter Siklósi <einon@einon.hu>
1056
1057 =head1 LICENSE
1058
1059 You may distribute this code under the same terms as Perl itself.
1060
1061 =cut
1062
1063 __PACKAGE__->meta->make_immutable;
1064
1065 1;