37954b40b30a65f7dfb4c783a784f2c403604003
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use DBIx::Class;
5   die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7 }
8
9 use Moose;
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
16 use Hash::Merge;
17 use List::Util qw/min max reduce/;
18 use Try::Tiny;
19 use namespace::clean;
20
21 use namespace::clean -except => 'meta';
22
23 =head1 NAME
24
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26
27 =head1 SYNOPSIS
28
29 The Following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
31 tasks.
32
33 You should set the 'storage_type attribute to a replicated type.  You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
36
37   my $schema = Schema::Class->clone;
38   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
39   $schema->connection(...);
40
41 Next, you need to add in the Replicants.  Basically this is an array of
42 arrayrefs, where each arrayref is database connect information.  Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
44
45   $schema->storage->connect_replicants(
46     [$dsn1, $user, $pass, \%opts],
47     [$dsn2, $user, $pass, \%opts],
48     [$dsn3, $user, $pass, \%opts],
49   );
50
51 Now, just use the $schema as you normally would.  Automatically all reads will
52 be delegated to the replicants, while writes to the master.
53
54   $schema->resultset('Source')->search({name=>'etc'});
55
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'.  For example:
58
59   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60
61 Now $rs will force everything (both reads and writes) to use whatever was setup
62 as the master storage.  'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name.  Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
65
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master.  In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
70
71 If you have a multi-statement read only transaction you can force it to select
72 a random server in the pool by:
73
74   my $rs = $schema->resultset('Source')->search( undef,
75     { force_pool => $db->storage->read_handler->next_storage }
76   );
77
78 =head1 DESCRIPTION
79
80 Warning: This class is marked BETA.  This has been running a production
81 website using MySQL native replication as its backend and we have some decent
82 test coverage but the code hasn't yet been stressed by a variety of databases.
83 Individual DBs may have quirks we are not aware of.  Please use this in first
84 development and pass along your experiences/bug fixes.
85
86 This class implements replicated data store for DBI. Currently you can define
87 one master and numerous slave database connections. All write-type queries
88 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89 database, all read-type queries (SELECTs) go to the slave database.
90
91 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
92 handle gets delegated to one of the two attributes: L</read_handler> or to
93 L</write_handler>.  Additionally, some methods need to be distributed
94 to all existing storages.  This way our storage class is a drop in replacement
95 for L<DBIx::Class::Storage::DBI>.
96
97 Read traffic is spread across the replicants (slaves) occurring to a user
98 selected algorithm.  The default algorithm is random weighted.
99
100 =head1 NOTES
101
102 The consistency between master and replicants is database specific.  The Pool
103 gives you a method to validate its replicants, removing and replacing them
104 when they fail/pass predefined criteria.  Please make careful use of the ways
105 to force a query to run against Master when needed.
106
107 =head1 REQUIREMENTS
108
109 Replicated Storage has additional requirements not currently part of
110 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
111
112 =head1 ATTRIBUTES
113
114 This class defines the following attributes.
115
116 =head2 schema
117
118 The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120 =cut
121
122 has 'schema' => (
123     is=>'rw',
124     isa=>DBICSchema,
125     weak_ref=>1,
126     required=>1,
127 );
128
129 =head2 pool_type
130
131 Contains the classname which will instantiate the L</pool> object.  Defaults
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
133
134 =cut
135
136 has 'pool_type' => (
137   is=>'rw',
138   isa=>ClassName,
139   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
140   handles=>{
141     'create_pool' => 'new',
142   },
143 );
144
145 =head2 pool_args
146
147 Contains a hashref of initialized information to pass to the Balancer object.
148 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
149
150 =cut
151
152 has 'pool_args' => (
153   is=>'rw',
154   isa=>HashRef,
155   lazy=>1,
156   default=>sub { {} },
157 );
158
159
160 =head2 balancer_type
161
162 The replication pool requires a balance class to provider the methods for
163 choose how to spread the query load across each replicant in the pool.
164
165 =cut
166
167 has 'balancer_type' => (
168   is=>'rw',
169   isa=>BalancerClassNamePart,
170   coerce=>1,
171   required=>1,
172   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
173   handles=>{
174     'create_balancer' => 'new',
175   },
176 );
177
178 =head2 balancer_args
179
180 Contains a hashref of initialized information to pass to the Balancer object.
181 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
182
183 =cut
184
185 has 'balancer_args' => (
186   is=>'rw',
187   isa=>HashRef,
188   lazy=>1,
189   required=>1,
190   default=>sub { {} },
191 );
192
193 =head2 pool
194
195 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
196 container class for one or more replicated databases.
197
198 =cut
199
200 has 'pool' => (
201   is=>'ro',
202   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203   lazy_build=>1,
204   handles=>[qw/
205     connect_replicants
206     replicants
207     has_replicants
208   /],
209 );
210
211 =head2 balancer
212
213 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
214 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
215
216 =cut
217
218 has 'balancer' => (
219   is=>'rw',
220   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221   lazy_build=>1,
222   handles=>[qw/auto_validate_every/],
223 );
224
225 =head2 master
226
227 The master defines the canonical state for a pool of connected databases.  All
228 the replicants are expected to match this databases state.  Thus, in a classic
229 Master / Slaves distributed system, all the slaves are expected to replicate
230 the Master's state as quick as possible.  This is the only database in the
231 pool of databases that is allowed to handle write traffic.
232
233 =cut
234
235 has 'master' => (
236   is=> 'ro',
237   isa=>DBICStorageDBI,
238   lazy_build=>1,
239 );
240
241 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
243 The following methods are delegated all the methods required for the
244 L<DBIx::Class::Storage::DBI> interface.
245
246 =cut
247
248 my $method_dispatch = {
249   writer => [qw/
250     on_connect_do
251     on_disconnect_do
252     on_connect_call
253     on_disconnect_call
254     connect_info
255     _connect_info
256     throw_exception
257     sql_maker
258     sqlt_type
259     create_ddl_dir
260     deployment_statements
261     datetime_parser
262     datetime_parser_type
263     build_datetime_parser
264     last_insert_id
265     insert
266     insert_bulk
267     update
268     delete
269     dbh
270     txn_begin
271     txn_do
272     txn_commit
273     txn_rollback
274     txn_scope_guard
275     _exec_txn_rollback
276     _exec_txn_begin
277     _exec_txn_commit
278     deploy
279     with_deferred_fk_checks
280     dbh_do
281     _prep_for_execute
282     is_datatype_numeric
283     _count_select
284     svp_rollback
285     svp_begin
286     svp_release
287     relname_to_table_alias
288     _dbh_last_insert_id
289     _default_dbi_connect_attributes
290     _dbi_connect_info
291     _dbic_connect_attributes
292     auto_savepoint
293     _query_start
294     _query_end
295     _format_for_trace
296     _dbi_attrs_for_bind
297     bind_attribute_by_data_type
298     transaction_depth
299     _dbh
300     _select_args
301     _dbh_execute_for_fetch
302     _sql_maker
303     _dbh_execute_inserts_with_no_binds
304     _select_args_to_query
305     _gen_sql_bind
306     _svp_generate_name
307     _normalize_connect_info
308     _parse_connect_do
309     savepoints
310     _sql_maker_opts
311     _conn_pid
312     _dbh_autocommit
313     _native_data_type
314     _get_dbh
315     sql_maker_class
316     _execute
317     _do_query
318     _sth
319     _dbh_sth
320     _dbh_execute
321   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
322   reader => [qw/
323     select
324     select_single
325     columns_info_for
326     _dbh_columns_info_for
327     _select
328   /],
329   unimplemented => [qw/
330     _arm_global_destructor
331     _verify_pid
332
333     source_bind_attributes
334
335     get_use_dbms_capability
336     set_use_dbms_capability
337     get_dbms_capability
338     set_dbms_capability
339     _dbh_details
340     _dbh_get_info
341
342     sql_limit_dialect
343     sql_quote_char
344     sql_name_sep
345
346     _prefetch_autovalues
347     _perform_autoinc_retrieval
348     _autoinc_supplied_for_op
349
350     _resolve_bindattrs
351
352     _max_column_bytesize
353     _is_lob_type
354     _is_binary_lob_type
355     _is_text_lob_type
356
357     sth
358   /,(
359     # the capability framework
360     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
361     grep
362       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
363       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
364   )],
365 };
366
367 if (DBIx::Class::_ENV_::DBICTEST) {
368
369   my $seen;
370   for my $type (keys %$method_dispatch) {
371     for (@{$method_dispatch->{$type}}) {
372       push @{$seen->{$_}}, $type;
373     }
374   }
375
376   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
377     die(join "\n", '',
378       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
379       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
380       '',
381     );
382   }
383
384   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
385     die(join "\n", '',
386       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
387       @cant,
388       '',
389     );
390   }
391 }
392
393 for my $method (@{$method_dispatch->{unimplemented}}) {
394   __PACKAGE__->meta->add_method($method, sub {
395     my $self = shift;
396     $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
397   });
398 }
399
400 =head2 read_handler
401
402 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
403
404 =cut
405
406 has 'read_handler' => (
407   is=>'rw',
408   isa=>Object,
409   lazy_build=>1,
410   handles=>$method_dispatch->{reader},
411 );
412
413 =head2 write_handler
414
415 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
416 as well as methods that don't write or read that can be called on only one
417 storage, methods that return a C<$dbh>, and any methods that don't make sense to
418 run on a replicant.
419
420 =cut
421
422 has 'write_handler' => (
423   is=>'ro',
424   isa=>Object,
425   lazy_build=>1,
426   handles=>$method_dispatch->{writer},
427 );
428
429
430
431 has _master_connect_info_opts =>
432   (is => 'rw', isa => HashRef, default => sub { {} });
433
434 =head2 around: connect_info
435
436 Preserves master's C<connect_info> options (for merging with replicants.)
437 Also sets any Replicated-related options from connect_info, such as
438 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
439
440 =cut
441
442 around connect_info => sub {
443   my ($next, $self, $info, @extra) = @_;
444
445   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
446
447   my %opts;
448   for my $arg (@$info) {
449     next unless (reftype($arg)||'') eq 'HASH';
450     %opts = %{ $merge->merge($arg, \%opts) };
451   }
452   delete $opts{dsn};
453
454   if (@opts{qw/pool_type pool_args/}) {
455     $self->pool_type(delete $opts{pool_type})
456       if $opts{pool_type};
457
458     $self->pool_args(
459       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
460     );
461
462     ## Since we possibly changed the pool_args, we need to clear the current
463     ## pool object so that next time it is used it will be rebuilt.
464     $self->clear_pool;
465   }
466
467   if (@opts{qw/balancer_type balancer_args/}) {
468     $self->balancer_type(delete $opts{balancer_type})
469       if $opts{balancer_type};
470
471     $self->balancer_args(
472       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
473     );
474
475     $self->balancer($self->_build_balancer)
476       if $self->balancer;
477   }
478
479   $self->_master_connect_info_opts(\%opts);
480
481   my @res;
482   if (wantarray) {
483     @res = $self->$next($info, @extra);
484   } else {
485     $res[0] = $self->$next($info, @extra);
486   }
487
488   # Make sure master is blessed into the correct class and apply role to it.
489   my $master = $self->master;
490   $master->_determine_driver;
491   Moose::Meta::Class->initialize(ref $master);
492
493   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
494
495   # link pool back to master
496   $self->pool->master($master);
497
498   wantarray ? @res : $res[0];
499 };
500
501 =head1 METHODS
502
503 This class defines the following methods.
504
505 =head2 BUILDARGS
506
507 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
508 first argument.  So we need to massage the arguments a bit so that all the
509 bits get put into the correct places.
510
511 =cut
512
513 sub BUILDARGS {
514   my ($class, $schema, $storage_type_args, @args) = @_;
515
516   return {
517     schema=>$schema,
518     %$storage_type_args,
519     @args
520   }
521 }
522
523 =head2 _build_master
524
525 Lazy builder for the L</master> attribute.
526
527 =cut
528
529 sub _build_master {
530   my $self = shift @_;
531   my $master = DBIx::Class::Storage::DBI->new($self->schema);
532   $master
533 }
534
535 =head2 _build_pool
536
537 Lazy builder for the L</pool> attribute.
538
539 =cut
540
541 sub _build_pool {
542   my $self = shift @_;
543   $self->create_pool(%{$self->pool_args});
544 }
545
546 =head2 _build_balancer
547
548 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
549 the balancer knows which pool it's balancing.
550
551 =cut
552
553 sub _build_balancer {
554   my $self = shift @_;
555   $self->create_balancer(
556     pool=>$self->pool,
557     master=>$self->master,
558     %{$self->balancer_args},
559   );
560 }
561
562 =head2 _build_write_handler
563
564 Lazy builder for the L</write_handler> attribute.  The default is to set this to
565 the L</master>.
566
567 =cut
568
569 sub _build_write_handler {
570   return shift->master;
571 }
572
573 =head2 _build_read_handler
574
575 Lazy builder for the L</read_handler> attribute.  The default is to set this to
576 the L</balancer>.
577
578 =cut
579
580 sub _build_read_handler {
581   return shift->balancer;
582 }
583
584 =head2 around: connect_replicants
585
586 All calls to connect_replicants needs to have an existing $schema tacked onto
587 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
588 options merged with the master, with replicant opts having higher priority.
589
590 =cut
591
592 around connect_replicants => sub {
593   my ($next, $self, @args) = @_;
594
595   for my $r (@args) {
596     $r = [ $r ] unless reftype $r eq 'ARRAY';
597
598     $self->throw_exception('coderef replicant connect_info not supported')
599       if ref $r->[0] && reftype $r->[0] eq 'CODE';
600
601 # any connect_info options?
602     my $i = 0;
603     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
604
605 # make one if none
606     $r->[$i] = {} unless $r->[$i];
607
608 # merge if two hashes
609     my @hashes = @$r[$i .. $#{$r}];
610
611     $self->throw_exception('invalid connect_info options')
612       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
613
614     $self->throw_exception('too many hashrefs in connect_info')
615       if @hashes > 2;
616
617     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
618     my %opts = %{ $merge->merge(reverse @hashes) };
619
620 # delete them
621     splice @$r, $i+1, ($#{$r} - $i), ();
622
623 # make sure master/replicants opts don't clash
624     my %master_opts = %{ $self->_master_connect_info_opts };
625     if (exists $opts{dbh_maker}) {
626         delete @master_opts{qw/dsn user password/};
627     }
628     delete $master_opts{dbh_maker};
629
630 # merge with master
631     %opts = %{ $merge->merge(\%opts, \%master_opts) };
632
633 # update
634     $r->[$i] = \%opts;
635   }
636
637   $self->$next($self->schema, @args);
638 };
639
640 =head2 all_storages
641
642 Returns an array of of all the connected storage backends.  The first element
643 in the returned array is the master, and the remainings are each of the
644 replicants.
645
646 =cut
647
648 sub all_storages {
649   my $self = shift @_;
650   return grep {defined $_ && blessed $_} (
651      $self->master,
652      values %{ $self->replicants },
653   );
654 }
655
656 =head2 execute_reliably ($coderef, ?@args)
657
658 Given a coderef, saves the current state of the L</read_handler>, forces it to
659 use reliable storage (e.g. sets it to the master), executes a coderef and then
660 restores the original state.
661
662 Example:
663
664   my $reliably = sub {
665     my $name = shift @_;
666     $schema->resultset('User')->create({name=>$name});
667     my $user_rs = $schema->resultset('User')->find({name=>$name});
668     return $user_rs;
669   };
670
671   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
672
673 Use this when you must be certain of your database state, such as when you just
674 inserted something and need to get a resultset including it, etc.
675
676 =cut
677
678 sub execute_reliably {
679   my ($self, $coderef, @args) = @_;
680
681   unless( ref $coderef eq 'CODE') {
682     $self->throw_exception('Second argument must be a coderef');
683   }
684
685   ##Get copy of master storage
686   my $master = $self->master;
687
688   ##Get whatever the current read hander is
689   my $current = $self->read_handler;
690
691   ##Set the read handler to master
692   $self->read_handler($master);
693
694   ## do whatever the caller needs
695   my @result;
696   my $want_array = wantarray;
697
698   try {
699     if($want_array) {
700       @result = $coderef->(@args);
701     } elsif(defined $want_array) {
702       ($result[0]) = ($coderef->(@args));
703     } else {
704       $coderef->(@args);
705     }
706   } catch {
707     $self->throw_exception("coderef returned an error: $_");
708   } finally {
709     ##Reset to the original state
710     $self->read_handler($current);
711   };
712
713   return wantarray ? @result : $result[0];
714 }
715
716 =head2 set_reliable_storage
717
718 Sets the current $schema to be 'reliable', that is all queries, both read and
719 write are sent to the master
720
721 =cut
722
723 sub set_reliable_storage {
724   my $self = shift @_;
725   my $schema = $self->schema;
726   my $write_handler = $self->schema->storage->write_handler;
727
728   $schema->storage->read_handler($write_handler);
729 }
730
731 =head2 set_balanced_storage
732
733 Sets the current $schema to be use the </balancer> for all reads, while all
734 writes are sent to the master only
735
736 =cut
737
738 sub set_balanced_storage {
739   my $self = shift @_;
740   my $schema = $self->schema;
741   my $balanced_handler = $self->schema->storage->balancer;
742
743   $schema->storage->read_handler($balanced_handler);
744 }
745
746 =head2 connected
747
748 Check that the master and at least one of the replicants is connected.
749
750 =cut
751
752 sub connected {
753   my $self = shift @_;
754   return
755     $self->master->connected &&
756     $self->pool->connected_replicants;
757 }
758
759 =head2 ensure_connected
760
761 Make sure all the storages are connected.
762
763 =cut
764
765 sub ensure_connected {
766   my $self = shift @_;
767   foreach my $source ($self->all_storages) {
768     $source->ensure_connected(@_);
769   }
770 }
771
772 =head2 limit_dialect
773
774 Set the limit_dialect for all existing storages
775
776 =cut
777
778 sub limit_dialect {
779   my $self = shift @_;
780   foreach my $source ($self->all_storages) {
781     $source->limit_dialect(@_);
782   }
783   return $self->master->limit_dialect;
784 }
785
786 =head2 quote_char
787
788 Set the quote_char for all existing storages
789
790 =cut
791
792 sub quote_char {
793   my $self = shift @_;
794   foreach my $source ($self->all_storages) {
795     $source->quote_char(@_);
796   }
797   return $self->master->quote_char;
798 }
799
800 =head2 name_sep
801
802 Set the name_sep for all existing storages
803
804 =cut
805
806 sub name_sep {
807   my $self = shift @_;
808   foreach my $source ($self->all_storages) {
809     $source->name_sep(@_);
810   }
811   return $self->master->name_sep;
812 }
813
814 =head2 set_schema
815
816 Set the schema object for all existing storages
817
818 =cut
819
820 sub set_schema {
821   my $self = shift @_;
822   foreach my $source ($self->all_storages) {
823     $source->set_schema(@_);
824   }
825 }
826
827 =head2 debug
828
829 set a debug flag across all storages
830
831 =cut
832
833 sub debug {
834   my $self = shift @_;
835   if(@_) {
836     foreach my $source ($self->all_storages) {
837       $source->debug(@_);
838     }
839   }
840   return $self->master->debug;
841 }
842
843 =head2 debugobj
844
845 set a debug object
846
847 =cut
848
849 sub debugobj {
850   my $self = shift @_;
851   return $self->master->debugobj(@_);
852 }
853
854 =head2 debugfh
855
856 set a debugfh object
857
858 =cut
859
860 sub debugfh {
861   my $self = shift @_;
862   return $self->master->debugfh(@_);
863 }
864
865 =head2 debugcb
866
867 set a debug callback
868
869 =cut
870
871 sub debugcb {
872   my $self = shift @_;
873   return $self->master->debugcb(@_);
874 }
875
876 =head2 disconnect
877
878 disconnect everything
879
880 =cut
881
882 sub disconnect {
883   my $self = shift @_;
884   foreach my $source ($self->all_storages) {
885     $source->disconnect(@_);
886   }
887 }
888
889 =head2 cursor_class
890
891 set cursor class on all storages, or return master's
892
893 =cut
894
895 sub cursor_class {
896   my ($self, $cursor_class) = @_;
897
898   if ($cursor_class) {
899     $_->cursor_class($cursor_class) for $self->all_storages;
900   }
901   $self->master->cursor_class;
902 }
903
904 =head2 cursor
905
906 set cursor class on all storages, or return master's, alias for L</cursor_class>
907 above.
908
909 =cut
910
911 sub cursor {
912   my ($self, $cursor_class) = @_;
913
914   if ($cursor_class) {
915     $_->cursor($cursor_class) for $self->all_storages;
916   }
917   $self->master->cursor;
918 }
919
920 =head2 unsafe
921
922 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
923 master's current setting
924
925 =cut
926
927 sub unsafe {
928   my $self = shift;
929
930   if (@_) {
931     $_->unsafe(@_) for $self->all_storages;
932   }
933
934   return $self->master->unsafe;
935 }
936
937 =head2 disable_sth_caching
938
939 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
940 or returns master's current setting
941
942 =cut
943
944 sub disable_sth_caching {
945   my $self = shift;
946
947   if (@_) {
948     $_->disable_sth_caching(@_) for $self->all_storages;
949   }
950
951   return $self->master->disable_sth_caching;
952 }
953
954 =head2 lag_behind_master
955
956 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
957 setting
958
959 =cut
960
961 sub lag_behind_master {
962   my $self = shift;
963
964   return max map $_->lag_behind_master, $self->replicants;
965 }
966
967 =head2 is_replicating
968
969 returns true if all replicants return true for
970 L<DBIx::Class::Storage::DBI/is_replicating>
971
972 =cut
973
974 sub is_replicating {
975   my $self = shift;
976
977   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
978 }
979
980 =head2 connect_call_datetime_setup
981
982 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
983
984 =cut
985
986 sub connect_call_datetime_setup {
987   my $self = shift;
988   $_->connect_call_datetime_setup for $self->all_storages;
989 }
990
991 sub _populate_dbh {
992   my $self = shift;
993   $_->_populate_dbh for $self->all_storages;
994 }
995
996 sub _connect {
997   my $self = shift;
998   $_->_connect for $self->all_storages;
999 }
1000
1001 sub _rebless {
1002   my $self = shift;
1003   $_->_rebless for $self->all_storages;
1004 }
1005
1006 sub _determine_driver {
1007   my $self = shift;
1008   $_->_determine_driver for $self->all_storages;
1009 }
1010
1011 sub _driver_determined {
1012   my $self = shift;
1013
1014   if (@_) {
1015     $_->_driver_determined(@_) for $self->all_storages;
1016   }
1017
1018   return $self->master->_driver_determined;
1019 }
1020
1021 sub _init {
1022   my $self = shift;
1023
1024   $_->_init for $self->all_storages;
1025 }
1026
1027 sub _run_connection_actions {
1028   my $self = shift;
1029
1030   $_->_run_connection_actions for $self->all_storages;
1031 }
1032
1033 sub _do_connection_actions {
1034   my $self = shift;
1035
1036   if (@_) {
1037     $_->_do_connection_actions(@_) for $self->all_storages;
1038   }
1039 }
1040
1041 sub connect_call_do_sql {
1042   my $self = shift;
1043   $_->connect_call_do_sql(@_) for $self->all_storages;
1044 }
1045
1046 sub disconnect_call_do_sql {
1047   my $self = shift;
1048   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1049 }
1050
1051 sub _seems_connected {
1052   my $self = shift;
1053
1054   return min map $_->_seems_connected, $self->all_storages;
1055 }
1056
1057 sub _ping {
1058   my $self = shift;
1059
1060   return min map $_->_ping, $self->all_storages;
1061 }
1062
1063 # not using the normalized_version, because we want to preserve
1064 # version numbers much longer than the conventional xxx.yyyzzz
1065 my $numify_ver = sub {
1066   my $ver = shift;
1067   my @numparts = split /\D+/, $ver;
1068   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1069
1070   return sprintf $format, @numparts;
1071 };
1072 sub _server_info {
1073   my $self = shift;
1074
1075   if (not $self->_dbh_details->{info}) {
1076     $self->_dbh_details->{info} = (
1077       reduce { $a->[0] < $b->[0] ? $a : $b }
1078       map [ $numify_ver->($_->{dbms_version}), $_ ],
1079       map $_->_server_info, $self->all_storages
1080     )->[1];
1081   }
1082
1083   return $self->next::method;
1084 }
1085
1086 sub _get_server_version {
1087   my $self = shift;
1088
1089   return $self->_server_info->{dbms_version};
1090 }
1091
1092 =head1 GOTCHAS
1093
1094 Due to the fact that replicants can lag behind a master, you must take care to
1095 make sure you use one of the methods to force read queries to a master should
1096 you need realtime data integrity.  For example, if you insert a row, and then
1097 immediately re-read it from the database (say, by doing $row->discard_changes)
1098 or you insert a row and then immediately build a query that expects that row
1099 to be an item, you should force the master to handle reads.  Otherwise, due to
1100 the lag, there is no certainty your data will be in the expected state.
1101
1102 For data integrity, all transactions automatically use the master storage for
1103 all read and write queries.  Using a transaction is the preferred and recommended
1104 method to force the master to handle all read queries.
1105
1106 Otherwise, you can force a single query to use the master with the 'force_pool'
1107 attribute:
1108
1109   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1110
1111 This attribute will safely be ignore by non replicated storages, so you can use
1112 the same code for both types of systems.
1113
1114 Lastly, you can use the L</execute_reliably> method, which works very much like
1115 a transaction.
1116
1117 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1118 and L</set_balanced_storage>, however this operates at a global level and is not
1119 suitable if you have a shared Schema object being used by multiple processes,
1120 such as on a web application server.  You can get around this limitation by
1121 using the Schema clone method.
1122
1123   my $new_schema = $schema->clone;
1124   $new_schema->set_reliable_storage;
1125
1126   ## $new_schema will use only the Master storage for all reads/writes while
1127   ## the $schema object will use replicated storage.
1128
1129 =head1 AUTHOR
1130
1131   John Napiorkowski <john.napiorkowski@takkle.com>
1132
1133 Based on code originated by:
1134
1135   Norbert Csongrádi <bert@cpan.org>
1136   Peter Siklósi <einon@einon.hu>
1137
1138 =head1 LICENSE
1139
1140 You may distribute this code under the same terms as Perl itself.
1141
1142 =cut
1143
1144 __PACKAGE__->meta->make_immutable;
1145
1146 1;