Majorly cleanup $rs->update/delete (no $rs-aware code should be in ::Storages)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use DBIx::Class;
5   die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6     unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7 }
8
9 use Moose;
10 use DBIx::Class::Storage::DBI;
11 use DBIx::Class::Storage::DBI::Replicated::Pool;
12 use DBIx::Class::Storage::DBI::Replicated::Balancer;
13 use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14 use MooseX::Types::Moose qw/ClassName HashRef Object/;
15 use Scalar::Util 'reftype';
16 use Hash::Merge;
17 use List::Util qw/min max reduce/;
18 use Try::Tiny;
19 use namespace::clean;
20
21 use namespace::clean -except => 'meta';
22
23 =head1 NAME
24
25 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26
27 =head1 SYNOPSIS
28
29 The Following example shows how to change an existing $schema to a replicated
30 storage type, add some replicated (read-only) databases, and perform reporting
31 tasks.
32
33 You should set the 'storage_type attribute to a replicated type.  You should
34 also define your arguments, such as which balancer you want and any arguments
35 that the Pool object should get.
36
37   my $schema = Schema::Class->clone;
38   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
39   $schema->connection(...);
40
41 Next, you need to add in the Replicants.  Basically this is an array of
42 arrayrefs, where each arrayref is database connect information.  Think of these
43 arguments as what you'd pass to the 'normal' $schema->connect method.
44
45   $schema->storage->connect_replicants(
46     [$dsn1, $user, $pass, \%opts],
47     [$dsn2, $user, $pass, \%opts],
48     [$dsn3, $user, $pass, \%opts],
49   );
50
51 Now, just use the $schema as you normally would.  Automatically all reads will
52 be delegated to the replicants, while writes to the master.
53
54   $schema->resultset('Source')->search({name=>'etc'});
55
56 You can force a given query to use a particular storage using the search
57 attribute 'force_pool'.  For example:
58
59   my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60
61 Now $rs will force everything (both reads and writes) to use whatever was setup
62 as the master storage.  'master' is hardcoded to always point to the Master,
63 but you can also use any Replicant name.  Please see:
64 L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
65
66 Also see transactions and L</execute_reliably> for alternative ways to
67 force read traffic to the master.  In general, you should wrap your statements
68 in a transaction when you are reading and writing to the same tables at the
69 same time, since your replicants will often lag a bit behind the master.
70
71 If you have a multi-statement read only transaction you can force it to select
72 a random server in the pool by:
73
74   my $rs = $schema->resultset('Source')->search( undef,
75     { force_pool => $db->storage->read_handler->next_storage }
76   );
77
78 =head1 DESCRIPTION
79
80 Warning: This class is marked BETA.  This has been running a production
81 website using MySQL native replication as its backend and we have some decent
82 test coverage but the code hasn't yet been stressed by a variety of databases.
83 Individual DBs may have quirks we are not aware of.  Please use this in first
84 development and pass along your experiences/bug fixes.
85
86 This class implements replicated data store for DBI. Currently you can define
87 one master and numerous slave database connections. All write-type queries
88 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89 database, all read-type queries (SELECTs) go to the slave database.
90
91 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
92 handle gets delegated to one of the two attributes: L</read_handler> or to
93 L</write_handler>.  Additionally, some methods need to be distributed
94 to all existing storages.  This way our storage class is a drop in replacement
95 for L<DBIx::Class::Storage::DBI>.
96
97 Read traffic is spread across the replicants (slaves) occurring to a user
98 selected algorithm.  The default algorithm is random weighted.
99
100 =head1 NOTES
101
102 The consistency between master and replicants is database specific.  The Pool
103 gives you a method to validate its replicants, removing and replacing them
104 when they fail/pass predefined criteria.  Please make careful use of the ways
105 to force a query to run against Master when needed.
106
107 =head1 REQUIREMENTS
108
109 Replicated Storage has additional requirements not currently part of
110 L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
111
112 =head1 ATTRIBUTES
113
114 This class defines the following attributes.
115
116 =head2 schema
117
118 The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120 =cut
121
122 has 'schema' => (
123     is=>'rw',
124     isa=>DBICSchema,
125     weak_ref=>1,
126     required=>1,
127 );
128
129 =head2 pool_type
130
131 Contains the classname which will instantiate the L</pool> object.  Defaults
132 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
133
134 =cut
135
136 has 'pool_type' => (
137   is=>'rw',
138   isa=>ClassName,
139   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
140   handles=>{
141     'create_pool' => 'new',
142   },
143 );
144
145 =head2 pool_args
146
147 Contains a hashref of initialized information to pass to the Balancer object.
148 See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
149
150 =cut
151
152 has 'pool_args' => (
153   is=>'rw',
154   isa=>HashRef,
155   lazy=>1,
156   default=>sub { {} },
157 );
158
159
160 =head2 balancer_type
161
162 The replication pool requires a balance class to provider the methods for
163 choose how to spread the query load across each replicant in the pool.
164
165 =cut
166
167 has 'balancer_type' => (
168   is=>'rw',
169   isa=>BalancerClassNamePart,
170   coerce=>1,
171   required=>1,
172   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
173   handles=>{
174     'create_balancer' => 'new',
175   },
176 );
177
178 =head2 balancer_args
179
180 Contains a hashref of initialized information to pass to the Balancer object.
181 See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
182
183 =cut
184
185 has 'balancer_args' => (
186   is=>'rw',
187   isa=>HashRef,
188   lazy=>1,
189   required=>1,
190   default=>sub { {} },
191 );
192
193 =head2 pool
194
195 Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
196 container class for one or more replicated databases.
197
198 =cut
199
200 has 'pool' => (
201   is=>'ro',
202   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203   lazy_build=>1,
204   handles=>[qw/
205     connect_replicants
206     replicants
207     has_replicants
208   /],
209 );
210
211 =head2 balancer
212
213 Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This
214 is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
215
216 =cut
217
218 has 'balancer' => (
219   is=>'rw',
220   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221   lazy_build=>1,
222   handles=>[qw/auto_validate_every/],
223 );
224
225 =head2 master
226
227 The master defines the canonical state for a pool of connected databases.  All
228 the replicants are expected to match this databases state.  Thus, in a classic
229 Master / Slaves distributed system, all the slaves are expected to replicate
230 the Master's state as quick as possible.  This is the only database in the
231 pool of databases that is allowed to handle write traffic.
232
233 =cut
234
235 has 'master' => (
236   is=> 'ro',
237   isa=>DBICStorageDBI,
238   lazy_build=>1,
239 );
240
241 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
243 The following methods are delegated all the methods required for the
244 L<DBIx::Class::Storage::DBI> interface.
245
246 =cut
247
248 my $method_dispatch = {
249   writer => [qw/
250     on_connect_do
251     on_disconnect_do
252     on_connect_call
253     on_disconnect_call
254     connect_info
255     _connect_info
256     throw_exception
257     sql_maker
258     sqlt_type
259     create_ddl_dir
260     deployment_statements
261     datetime_parser
262     datetime_parser_type
263     build_datetime_parser
264     last_insert_id
265     insert
266     insert_bulk
267     update
268     delete
269     dbh
270     txn_begin
271     txn_do
272     txn_commit
273     txn_rollback
274     txn_scope_guard
275     _exec_txn_rollback
276     _exec_txn_begin
277     _exec_txn_commit
278     deploy
279     with_deferred_fk_checks
280     dbh_do
281     _prep_for_execute
282     is_datatype_numeric
283     _count_select
284     svp_rollback
285     svp_begin
286     svp_release
287     relname_to_table_alias
288     _dbh_last_insert_id
289     _default_dbi_connect_attributes
290     _dbi_connect_info
291     _dbic_connect_attributes
292     auto_savepoint
293     _query_start
294     _query_end
295     _format_for_trace
296     _dbi_attrs_for_bind
297     bind_attribute_by_data_type
298     transaction_depth
299     _dbh
300     _select_args
301     _dbh_execute_for_fetch
302     _sql_maker
303     _dbh_execute_inserts_with_no_binds
304     _select_args_to_query
305     _gen_sql_bind
306     _svp_generate_name
307     _normalize_connect_info
308     _parse_connect_do
309     savepoints
310     _sql_maker_opts
311     _conn_pid
312     _dbh_autocommit
313     _native_data_type
314     _get_dbh
315     sql_maker_class
316     _execute
317     _do_query
318     _sth
319     _dbh_sth
320     _dbh_execute
321   /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
322   reader => [qw/
323     select
324     select_single
325     columns_info_for
326     _dbh_columns_info_for
327     _select
328   /],
329   unimplemented => [qw/
330     _arm_global_destructor
331     _verify_pid
332
333     source_bind_attributes
334
335     get_use_dbms_capability
336     set_use_dbms_capability
337     get_dbms_capability
338     set_dbms_capability
339     _dbh_details
340     _dbh_get_info
341
342     sql_limit_dialect
343     sql_quote_char
344     sql_name_sep
345
346     _prefetch_autovalues
347
348     _resolve_bindattrs
349
350     _max_column_bytesize
351     _is_lob_type
352     _is_binary_lob_type
353     _is_text_lob_type
354
355     sth
356   /,(
357     # the capability framework
358     # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
359     grep
360       { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
361       ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
362   )],
363 };
364
365 if (DBIx::Class::_ENV_::DBICTEST) {
366
367   my $seen;
368   for my $type (keys %$method_dispatch) {
369     for (@{$method_dispatch->{$type}}) {
370       push @{$seen->{$_}}, $type;
371     }
372   }
373
374   if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
375     die(join "\n", '',
376       'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
377       (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
378       '',
379     );
380   }
381
382   if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
383     die(join "\n", '',
384       '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
385       @cant,
386       '',
387     );
388   }
389 }
390
391 for my $method (@{$method_dispatch->{unimplemented}}) {
392   __PACKAGE__->meta->add_method($method, sub {
393     my $self = shift;
394     $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
395   });
396 }
397
398 =head2 read_handler
399
400 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
401
402 =cut
403
404 has 'read_handler' => (
405   is=>'rw',
406   isa=>Object,
407   lazy_build=>1,
408   handles=>$method_dispatch->{reader},
409 );
410
411 =head2 write_handler
412
413 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
414 as well as methods that don't write or read that can be called on only one
415 storage, methods that return a C<$dbh>, and any methods that don't make sense to
416 run on a replicant.
417
418 =cut
419
420 has 'write_handler' => (
421   is=>'ro',
422   isa=>Object,
423   lazy_build=>1,
424   handles=>$method_dispatch->{writer},
425 );
426
427
428
429 has _master_connect_info_opts =>
430   (is => 'rw', isa => HashRef, default => sub { {} });
431
432 =head2 around: connect_info
433
434 Preserves master's C<connect_info> options (for merging with replicants.)
435 Also sets any Replicated-related options from connect_info, such as
436 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
437
438 =cut
439
440 around connect_info => sub {
441   my ($next, $self, $info, @extra) = @_;
442
443   my $merge = Hash::Merge->new('LEFT_PRECEDENT');
444
445   my %opts;
446   for my $arg (@$info) {
447     next unless (reftype($arg)||'') eq 'HASH';
448     %opts = %{ $merge->merge($arg, \%opts) };
449   }
450   delete $opts{dsn};
451
452   if (@opts{qw/pool_type pool_args/}) {
453     $self->pool_type(delete $opts{pool_type})
454       if $opts{pool_type};
455
456     $self->pool_args(
457       $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
458     );
459
460     ## Since we possibly changed the pool_args, we need to clear the current
461     ## pool object so that next time it is used it will be rebuilt.
462     $self->clear_pool;
463   }
464
465   if (@opts{qw/balancer_type balancer_args/}) {
466     $self->balancer_type(delete $opts{balancer_type})
467       if $opts{balancer_type};
468
469     $self->balancer_args(
470       $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
471     );
472
473     $self->balancer($self->_build_balancer)
474       if $self->balancer;
475   }
476
477   $self->_master_connect_info_opts(\%opts);
478
479   my @res;
480   if (wantarray) {
481     @res = $self->$next($info, @extra);
482   } else {
483     $res[0] = $self->$next($info, @extra);
484   }
485
486   # Make sure master is blessed into the correct class and apply role to it.
487   my $master = $self->master;
488   $master->_determine_driver;
489   Moose::Meta::Class->initialize(ref $master);
490
491   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
492
493   # link pool back to master
494   $self->pool->master($master);
495
496   wantarray ? @res : $res[0];
497 };
498
499 =head1 METHODS
500
501 This class defines the following methods.
502
503 =head2 BUILDARGS
504
505 L<DBIx::Class::Schema> when instantiating its storage passed itself as the
506 first argument.  So we need to massage the arguments a bit so that all the
507 bits get put into the correct places.
508
509 =cut
510
511 sub BUILDARGS {
512   my ($class, $schema, $storage_type_args, @args) = @_;
513
514   return {
515     schema=>$schema,
516     %$storage_type_args,
517     @args
518   }
519 }
520
521 =head2 _build_master
522
523 Lazy builder for the L</master> attribute.
524
525 =cut
526
527 sub _build_master {
528   my $self = shift @_;
529   my $master = DBIx::Class::Storage::DBI->new($self->schema);
530   $master
531 }
532
533 =head2 _build_pool
534
535 Lazy builder for the L</pool> attribute.
536
537 =cut
538
539 sub _build_pool {
540   my $self = shift @_;
541   $self->create_pool(%{$self->pool_args});
542 }
543
544 =head2 _build_balancer
545
546 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
547 the balancer knows which pool it's balancing.
548
549 =cut
550
551 sub _build_balancer {
552   my $self = shift @_;
553   $self->create_balancer(
554     pool=>$self->pool,
555     master=>$self->master,
556     %{$self->balancer_args},
557   );
558 }
559
560 =head2 _build_write_handler
561
562 Lazy builder for the L</write_handler> attribute.  The default is to set this to
563 the L</master>.
564
565 =cut
566
567 sub _build_write_handler {
568   return shift->master;
569 }
570
571 =head2 _build_read_handler
572
573 Lazy builder for the L</read_handler> attribute.  The default is to set this to
574 the L</balancer>.
575
576 =cut
577
578 sub _build_read_handler {
579   return shift->balancer;
580 }
581
582 =head2 around: connect_replicants
583
584 All calls to connect_replicants needs to have an existing $schema tacked onto
585 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
586 options merged with the master, with replicant opts having higher priority.
587
588 =cut
589
590 around connect_replicants => sub {
591   my ($next, $self, @args) = @_;
592
593   for my $r (@args) {
594     $r = [ $r ] unless reftype $r eq 'ARRAY';
595
596     $self->throw_exception('coderef replicant connect_info not supported')
597       if ref $r->[0] && reftype $r->[0] eq 'CODE';
598
599 # any connect_info options?
600     my $i = 0;
601     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
602
603 # make one if none
604     $r->[$i] = {} unless $r->[$i];
605
606 # merge if two hashes
607     my @hashes = @$r[$i .. $#{$r}];
608
609     $self->throw_exception('invalid connect_info options')
610       if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
611
612     $self->throw_exception('too many hashrefs in connect_info')
613       if @hashes > 2;
614
615     my $merge = Hash::Merge->new('LEFT_PRECEDENT');
616     my %opts = %{ $merge->merge(reverse @hashes) };
617
618 # delete them
619     splice @$r, $i+1, ($#{$r} - $i), ();
620
621 # make sure master/replicants opts don't clash
622     my %master_opts = %{ $self->_master_connect_info_opts };
623     if (exists $opts{dbh_maker}) {
624         delete @master_opts{qw/dsn user password/};
625     }
626     delete $master_opts{dbh_maker};
627
628 # merge with master
629     %opts = %{ $merge->merge(\%opts, \%master_opts) };
630
631 # update
632     $r->[$i] = \%opts;
633   }
634
635   $self->$next($self->schema, @args);
636 };
637
638 =head2 all_storages
639
640 Returns an array of of all the connected storage backends.  The first element
641 in the returned array is the master, and the remainings are each of the
642 replicants.
643
644 =cut
645
646 sub all_storages {
647   my $self = shift @_;
648   return grep {defined $_ && blessed $_} (
649      $self->master,
650      values %{ $self->replicants },
651   );
652 }
653
654 =head2 execute_reliably ($coderef, ?@args)
655
656 Given a coderef, saves the current state of the L</read_handler>, forces it to
657 use reliable storage (e.g. sets it to the master), executes a coderef and then
658 restores the original state.
659
660 Example:
661
662   my $reliably = sub {
663     my $name = shift @_;
664     $schema->resultset('User')->create({name=>$name});
665     my $user_rs = $schema->resultset('User')->find({name=>$name});
666     return $user_rs;
667   };
668
669   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
670
671 Use this when you must be certain of your database state, such as when you just
672 inserted something and need to get a resultset including it, etc.
673
674 =cut
675
676 sub execute_reliably {
677   my ($self, $coderef, @args) = @_;
678
679   unless( ref $coderef eq 'CODE') {
680     $self->throw_exception('Second argument must be a coderef');
681   }
682
683   ##Get copy of master storage
684   my $master = $self->master;
685
686   ##Get whatever the current read hander is
687   my $current = $self->read_handler;
688
689   ##Set the read handler to master
690   $self->read_handler($master);
691
692   ## do whatever the caller needs
693   my @result;
694   my $want_array = wantarray;
695
696   try {
697     if($want_array) {
698       @result = $coderef->(@args);
699     } elsif(defined $want_array) {
700       ($result[0]) = ($coderef->(@args));
701     } else {
702       $coderef->(@args);
703     }
704   } catch {
705     $self->throw_exception("coderef returned an error: $_");
706   } finally {
707     ##Reset to the original state
708     $self->read_handler($current);
709   };
710
711   return wantarray ? @result : $result[0];
712 }
713
714 =head2 set_reliable_storage
715
716 Sets the current $schema to be 'reliable', that is all queries, both read and
717 write are sent to the master
718
719 =cut
720
721 sub set_reliable_storage {
722   my $self = shift @_;
723   my $schema = $self->schema;
724   my $write_handler = $self->schema->storage->write_handler;
725
726   $schema->storage->read_handler($write_handler);
727 }
728
729 =head2 set_balanced_storage
730
731 Sets the current $schema to be use the </balancer> for all reads, while all
732 writes are sent to the master only
733
734 =cut
735
736 sub set_balanced_storage {
737   my $self = shift @_;
738   my $schema = $self->schema;
739   my $balanced_handler = $self->schema->storage->balancer;
740
741   $schema->storage->read_handler($balanced_handler);
742 }
743
744 =head2 connected
745
746 Check that the master and at least one of the replicants is connected.
747
748 =cut
749
750 sub connected {
751   my $self = shift @_;
752   return
753     $self->master->connected &&
754     $self->pool->connected_replicants;
755 }
756
757 =head2 ensure_connected
758
759 Make sure all the storages are connected.
760
761 =cut
762
763 sub ensure_connected {
764   my $self = shift @_;
765   foreach my $source ($self->all_storages) {
766     $source->ensure_connected(@_);
767   }
768 }
769
770 =head2 limit_dialect
771
772 Set the limit_dialect for all existing storages
773
774 =cut
775
776 sub limit_dialect {
777   my $self = shift @_;
778   foreach my $source ($self->all_storages) {
779     $source->limit_dialect(@_);
780   }
781   return $self->master->limit_dialect;
782 }
783
784 =head2 quote_char
785
786 Set the quote_char for all existing storages
787
788 =cut
789
790 sub quote_char {
791   my $self = shift @_;
792   foreach my $source ($self->all_storages) {
793     $source->quote_char(@_);
794   }
795   return $self->master->quote_char;
796 }
797
798 =head2 name_sep
799
800 Set the name_sep for all existing storages
801
802 =cut
803
804 sub name_sep {
805   my $self = shift @_;
806   foreach my $source ($self->all_storages) {
807     $source->name_sep(@_);
808   }
809   return $self->master->name_sep;
810 }
811
812 =head2 set_schema
813
814 Set the schema object for all existing storages
815
816 =cut
817
818 sub set_schema {
819   my $self = shift @_;
820   foreach my $source ($self->all_storages) {
821     $source->set_schema(@_);
822   }
823 }
824
825 =head2 debug
826
827 set a debug flag across all storages
828
829 =cut
830
831 sub debug {
832   my $self = shift @_;
833   if(@_) {
834     foreach my $source ($self->all_storages) {
835       $source->debug(@_);
836     }
837   }
838   return $self->master->debug;
839 }
840
841 =head2 debugobj
842
843 set a debug object
844
845 =cut
846
847 sub debugobj {
848   my $self = shift @_;
849   return $self->master->debugobj(@_);
850 }
851
852 =head2 debugfh
853
854 set a debugfh object
855
856 =cut
857
858 sub debugfh {
859   my $self = shift @_;
860   return $self->master->debugfh(@_);
861 }
862
863 =head2 debugcb
864
865 set a debug callback
866
867 =cut
868
869 sub debugcb {
870   my $self = shift @_;
871   return $self->master->debugcb(@_);
872 }
873
874 =head2 disconnect
875
876 disconnect everything
877
878 =cut
879
880 sub disconnect {
881   my $self = shift @_;
882   foreach my $source ($self->all_storages) {
883     $source->disconnect(@_);
884   }
885 }
886
887 =head2 cursor_class
888
889 set cursor class on all storages, or return master's
890
891 =cut
892
893 sub cursor_class {
894   my ($self, $cursor_class) = @_;
895
896   if ($cursor_class) {
897     $_->cursor_class($cursor_class) for $self->all_storages;
898   }
899   $self->master->cursor_class;
900 }
901
902 =head2 cursor
903
904 set cursor class on all storages, or return master's, alias for L</cursor_class>
905 above.
906
907 =cut
908
909 sub cursor {
910   my ($self, $cursor_class) = @_;
911
912   if ($cursor_class) {
913     $_->cursor($cursor_class) for $self->all_storages;
914   }
915   $self->master->cursor;
916 }
917
918 =head2 unsafe
919
920 sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
921 master's current setting
922
923 =cut
924
925 sub unsafe {
926   my $self = shift;
927
928   if (@_) {
929     $_->unsafe(@_) for $self->all_storages;
930   }
931
932   return $self->master->unsafe;
933 }
934
935 =head2 disable_sth_caching
936
937 sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
938 or returns master's current setting
939
940 =cut
941
942 sub disable_sth_caching {
943   my $self = shift;
944
945   if (@_) {
946     $_->disable_sth_caching(@_) for $self->all_storages;
947   }
948
949   return $self->master->disable_sth_caching;
950 }
951
952 =head2 lag_behind_master
953
954 returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
955 setting
956
957 =cut
958
959 sub lag_behind_master {
960   my $self = shift;
961
962   return max map $_->lag_behind_master, $self->replicants;
963 }
964
965 =head2 is_replicating
966
967 returns true if all replicants return true for
968 L<DBIx::Class::Storage::DBI/is_replicating>
969
970 =cut
971
972 sub is_replicating {
973   my $self = shift;
974
975   return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
976 }
977
978 =head2 connect_call_datetime_setup
979
980 calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
981
982 =cut
983
984 sub connect_call_datetime_setup {
985   my $self = shift;
986   $_->connect_call_datetime_setup for $self->all_storages;
987 }
988
989 sub _populate_dbh {
990   my $self = shift;
991   $_->_populate_dbh for $self->all_storages;
992 }
993
994 sub _connect {
995   my $self = shift;
996   $_->_connect for $self->all_storages;
997 }
998
999 sub _rebless {
1000   my $self = shift;
1001   $_->_rebless for $self->all_storages;
1002 }
1003
1004 sub _determine_driver {
1005   my $self = shift;
1006   $_->_determine_driver for $self->all_storages;
1007 }
1008
1009 sub _driver_determined {
1010   my $self = shift;
1011
1012   if (@_) {
1013     $_->_driver_determined(@_) for $self->all_storages;
1014   }
1015
1016   return $self->master->_driver_determined;
1017 }
1018
1019 sub _init {
1020   my $self = shift;
1021
1022   $_->_init for $self->all_storages;
1023 }
1024
1025 sub _run_connection_actions {
1026   my $self = shift;
1027
1028   $_->_run_connection_actions for $self->all_storages;
1029 }
1030
1031 sub _do_connection_actions {
1032   my $self = shift;
1033
1034   if (@_) {
1035     $_->_do_connection_actions(@_) for $self->all_storages;
1036   }
1037 }
1038
1039 sub connect_call_do_sql {
1040   my $self = shift;
1041   $_->connect_call_do_sql(@_) for $self->all_storages;
1042 }
1043
1044 sub disconnect_call_do_sql {
1045   my $self = shift;
1046   $_->disconnect_call_do_sql(@_) for $self->all_storages;
1047 }
1048
1049 sub _seems_connected {
1050   my $self = shift;
1051
1052   return min map $_->_seems_connected, $self->all_storages;
1053 }
1054
1055 sub _ping {
1056   my $self = shift;
1057
1058   return min map $_->_ping, $self->all_storages;
1059 }
1060
1061 # not using the normalized_version, because we want to preserve
1062 # version numbers much longer than the conventional xxx.yyyzzz
1063 my $numify_ver = sub {
1064   my $ver = shift;
1065   my @numparts = split /\D+/, $ver;
1066   my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1067
1068   return sprintf $format, @numparts;
1069 };
1070 sub _server_info {
1071   my $self = shift;
1072
1073   if (not $self->_dbh_details->{info}) {
1074     $self->_dbh_details->{info} = (
1075       reduce { $a->[0] < $b->[0] ? $a : $b }
1076       map [ $numify_ver->($_->{dbms_version}), $_ ],
1077       map $_->_server_info, $self->all_storages
1078     )->[1];
1079   }
1080
1081   return $self->next::method;
1082 }
1083
1084 sub _get_server_version {
1085   my $self = shift;
1086
1087   return $self->_server_info->{dbms_version};
1088 }
1089
1090 =head1 GOTCHAS
1091
1092 Due to the fact that replicants can lag behind a master, you must take care to
1093 make sure you use one of the methods to force read queries to a master should
1094 you need realtime data integrity.  For example, if you insert a row, and then
1095 immediately re-read it from the database (say, by doing $row->discard_changes)
1096 or you insert a row and then immediately build a query that expects that row
1097 to be an item, you should force the master to handle reads.  Otherwise, due to
1098 the lag, there is no certainty your data will be in the expected state.
1099
1100 For data integrity, all transactions automatically use the master storage for
1101 all read and write queries.  Using a transaction is the preferred and recommended
1102 method to force the master to handle all read queries.
1103
1104 Otherwise, you can force a single query to use the master with the 'force_pool'
1105 attribute:
1106
1107   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1108
1109 This attribute will safely be ignore by non replicated storages, so you can use
1110 the same code for both types of systems.
1111
1112 Lastly, you can use the L</execute_reliably> method, which works very much like
1113 a transaction.
1114
1115 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1116 and L</set_balanced_storage>, however this operates at a global level and is not
1117 suitable if you have a shared Schema object being used by multiple processes,
1118 such as on a web application server.  You can get around this limitation by
1119 using the Schema clone method.
1120
1121   my $new_schema = $schema->clone;
1122   $new_schema->set_reliable_storage;
1123
1124   ## $new_schema will use only the Master storage for all reads/writes while
1125   ## the $schema object will use replicated storage.
1126
1127 =head1 AUTHOR
1128
1129   John Napiorkowski <john.napiorkowski@takkle.com>
1130
1131 Based on code originated by:
1132
1133   Norbert Csongrádi <bert@cpan.org>
1134   Peter Siklósi <einon@einon.hu>
1135
1136 =head1 LICENSE
1137
1138 You may distribute this code under the same terms as Perl itself.
1139
1140 =cut
1141
1142 __PACKAGE__->meta->make_immutable;
1143
1144 1;