::DBI::Replicated - don't build pool/balancer from connect_info unless necessary
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use Moose;
29 use DBIx::Class::Storage::DBI;
30 use DBIx::Class::Storage::DBI::Replicated::Pool;
31 use DBIx::Class::Storage::DBI::Replicated::Balancer;
32 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
33 use MooseX::Types::Moose qw/ClassName HashRef Object/;
34 use Scalar::Util 'reftype';
35 use Carp::Clan qw/^DBIx::Class/;
36
37 use namespace::clean -except => 'meta';
38
39 =head1 NAME
40
41 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
42
43 =head1 SYNOPSIS
44
45 The Following example shows how to change an existing $schema to a replicated
46 storage type, add some replicated (readonly) databases, and perform reporting
47 tasks.
48
49   ## Change storage_type in your schema class
50   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
51   
52   ## Add some slaves.  Basically this is an array of arrayrefs, where each
53   ## arrayref is database connect information
54   
55   $schema->storage->connect_replicants(
56     [$dsn1, $user, $pass, \%opts],
57     [$dsn2, $user, $pass, \%opts],
58     [$dsn3, $user, $pass, \%opts],
59   );
60   
61   ## Now, just use the $schema as normal
62   $schema->resultset('Source')->search({name=>'etc'});
63   
64   ## You can force a given query to use a particular storage using the search
65   ### attribute 'force_pool'.  For example:
66   
67   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
68   
69   ## Now $RS will force everything (both reads and writes) to use whatever was
70   ## setup as the master storage.  'master' is hardcoded to always point to the
71   ## Master, but you can also use any Replicant name.  Please see:
72   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
73   ## More. Also see transactions and L</execute_reliably> for alternative ways
74   ## to force read traffic to the master.
75   
76 =head1 DESCRIPTION
77
78 Warning: This class is marked BETA.  This has been running a production
79 website using MySQL native replication as its backend and we have some decent
80 test coverage but the code hasn't yet been stressed by a variety of databases.
81 Individual DB's may have quirks we are not aware of.  Please use this in first
82 development and pass along your experiences/bug fixes.
83
84 This class implements replicated data store for DBI. Currently you can define
85 one master and numerous slave database connections. All write-type queries
86 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
87 database, all read-type queries (SELECTs) go to the slave database.
88
89 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
90 handle gets delegated to one of the two attributes: L</read_handler> or to
91 L</write_handler>.  Additionally, some methods need to be distributed
92 to all existing storages.  This way our storage class is a drop in replacement
93 for L<DBIx::Class::Storage::DBI>.
94
95 Read traffic is spread across the replicants (slaves) occuring to a user
96 selected algorithm.  The default algorithm is random weighted.
97
98 =head1 NOTES
99
100 The consistancy betweeen master and replicants is database specific.  The Pool
101 gives you a method to validate it's replicants, removing and replacing them
102 when they fail/pass predefined criteria.  Please make careful use of the ways
103 to force a query to run against Master when needed.
104
105 =head1 REQUIREMENTS
106
107 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
108
109   Moose => 0.77
110   MooseX::AttributeHelpers => 0.12 
111   MooseX::Types => 0.10
112   namespace::clean => 0.11
113   
114 You will need to install these modules manually via CPAN or make them part of the
115 Makefile for your distribution.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>'DBIx::Class::Schema',
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults 
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'rw',
143   isa=>ClassName,
144   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
145   handles=>{
146     'create_pool' => 'new',
147   },
148 );
149
150 =head2 pool_args
151
152 Contains a hashref of initialized information to pass to the Balancer object.
153 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
154
155 =cut
156
157 has 'pool_args' => (
158   is=>'rw',
159   isa=>HashRef,
160   lazy=>1,
161   default=>sub { {} },
162 );
163
164
165 =head2 balancer_type
166
167 The replication pool requires a balance class to provider the methods for
168 choose how to spread the query load across each replicant in the pool.
169
170 =cut
171
172 has 'balancer_type' => (
173   is=>'rw',
174   isa=>BalancerClassNamePart,
175   coerce=>1,
176   required=>1,
177   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
178   handles=>{
179     'create_balancer' => 'new',
180   },
181 );
182
183 =head2 balancer_args
184
185 Contains a hashref of initialized information to pass to the Balancer object.
186 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
187
188 =cut
189
190 has 'balancer_args' => (
191   is=>'rw',
192   isa=>HashRef,
193   lazy=>1,
194   required=>1,
195   default=>sub { {} },
196 );
197
198 =head2 pool
199
200 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
201 container class for one or more replicated databases.
202
203 =cut
204
205 has 'pool' => (
206   is=>'ro',
207   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
208   lazy_build=>1,
209   handles=>[qw/
210     connect_replicants    
211     replicants
212     has_replicants
213   /],
214 );
215
216 =head2 balancer
217
218 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
219 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
220
221 =cut
222
223 has 'balancer' => (
224   is=>'rw',
225   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226   lazy_build=>1,
227   handles=>[qw/auto_validate_every/],
228 );
229
230 =head2 master
231
232 The master defines the canonical state for a pool of connected databases.  All
233 the replicants are expected to match this databases state.  Thus, in a classic
234 Master / Slaves distributed system, all the slaves are expected to replicate
235 the Master's state as quick as possible.  This is the only database in the
236 pool of databases that is allowed to handle write traffic.
237
238 =cut
239
240 has 'master' => (
241   is=> 'ro',
242   isa=>'DBIx::Class::Storage::DBI',
243   lazy_build=>1,
244 );
245
246 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
247
248 The following methods are delegated all the methods required for the 
249 L<DBIx::Class::Storage::DBI> interface.
250
251 =head2 read_handler
252
253 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
254
255 =cut
256
257 has 'read_handler' => (
258   is=>'rw',
259   isa=>Object,
260   lazy_build=>1,
261   handles=>[qw/
262     select
263     select_single
264     columns_info_for
265   /],    
266 );
267
268 =head2 write_handler
269
270 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
271
272 =cut
273
274 has 'write_handler' => (
275   is=>'ro',
276   isa=>Object,
277   lazy_build=>1,
278   handles=>[qw/   
279     on_connect_do
280     on_disconnect_do       
281     connect_info
282     throw_exception
283     sql_maker
284     sqlt_type
285     create_ddl_dir
286     deployment_statements
287     datetime_parser
288     datetime_parser_type        
289     last_insert_id
290     insert
291     insert_bulk
292     update
293     delete
294     dbh
295     txn_begin
296     txn_do
297     txn_commit
298     txn_rollback
299     txn_scope_guard
300     sth
301     deploy
302     with_deferred_fk_checks
303
304     reload_row
305     _prep_for_execute
306     
307   /],
308 );
309
310 has _master_connect_info_opts =>
311   (is => 'rw', isa => HashRef, default => sub { {} });
312
313 =head2 around: connect_info
314
315 Preserve master's C<connect_info> options (for merging with replicants.)
316 Also set any Replicated related options from connect_info, such as
317 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
318
319 =cut
320
321 around connect_info => sub {
322   my ($next, $self, $info, @extra) = @_;
323
324   my %opts;
325   for my $arg (@$info) {
326     next unless (reftype($arg)||'') eq 'HASH';
327     %opts = (%opts, %$arg);
328   }
329   delete $opts{dsn};
330
331   if (@opts{qw/pool_type pool_args/}) {
332     $self->pool_type(delete $opts{pool_type})
333       if $opts{pool_type};
334
335     $self->pool_args({
336       %{ $self->pool_args },
337       %{ delete $opts{pool_args} || {} }
338     });
339
340     $self->pool($self->_build_pool)
341         if $self->pool;
342   }
343
344   if (@opts{qw/balancer_type balancer_args/}) {
345     $self->balancer_type(delete $opts{balancer_type})
346       if $opts{balancer_type};
347
348     $self->balancer_args({
349       %{ $self->balancer_args },
350       %{ delete $opts{balancer_args} || {} }
351     });
352
353     $self->balancer($self->_build_balancer)
354         if $self->balancer;
355   }
356
357   $self->_master_connect_info_opts(\%opts);
358
359   $self->$next($info, @extra);
360 };
361
362 =head1 METHODS
363
364 This class defines the following methods.
365
366 =head2 BUILDARGS
367
368 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
369 first argument.  So we need to massage the arguments a bit so that all the
370 bits get put into the correct places.
371
372 =cut
373
374 sub BUILDARGS {
375   my ($class, $schema, $storage_type_args, @args) = @_; 
376   
377   return {
378         schema=>$schema, 
379         %$storage_type_args,
380         @args
381   }
382 }
383
384 =head2 _build_master
385
386 Lazy builder for the L</master> attribute.
387
388 =cut
389
390 sub _build_master {
391   my $self = shift @_;
392   my $master = DBIx::Class::Storage::DBI->new($self->schema);
393   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
394   $master
395 }
396
397 =head2 _build_pool
398
399 Lazy builder for the L</pool> attribute.
400
401 =cut
402
403 sub _build_pool {
404   my $self = shift @_;
405   $self->create_pool(%{$self->pool_args});
406 }
407
408 =head2 _build_balancer
409
410 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
411 the balancer knows which pool it's balancing.
412
413 =cut
414
415 sub _build_balancer {
416   my $self = shift @_;
417   $self->create_balancer(
418     pool=>$self->pool, 
419     master=>$self->master,
420     %{$self->balancer_args},
421   );
422 }
423
424 =head2 _build_write_handler
425
426 Lazy builder for the L</write_handler> attribute.  The default is to set this to
427 the L</master>.
428
429 =cut
430
431 sub _build_write_handler {
432   return shift->master;
433 }
434
435 =head2 _build_read_handler
436
437 Lazy builder for the L</read_handler> attribute.  The default is to set this to
438 the L</balancer>.
439
440 =cut
441
442 sub _build_read_handler {
443   return shift->balancer;
444 }
445
446 =head2 around: connect_replicants
447
448 All calls to connect_replicants needs to have an existing $schema tacked onto
449 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
450 options merged with the master, with replicant opts having higher priority.
451
452 =cut
453
454 around connect_replicants => sub {
455   my ($next, $self, @args) = @_;
456
457   for my $r (@args) {
458     $r = [ $r ] unless reftype $r eq 'ARRAY';
459
460     croak "coderef replicant connect_info not supported"
461       if ref $r->[0] && reftype $r->[0] eq 'CODE';
462
463 # any connect_info options?
464     my $i = 0;
465     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
466
467 # make one if none    
468     $r->[$i] = {} unless $r->[$i];
469
470 # merge if two hashes
471     my %opts = map %$_, @$r[$i .. $#{$r}];
472     splice @$r, $i+1, ($#{$r} - $i), ();
473
474 # merge with master
475     %opts = (%{ $self->_master_connect_info_opts }, %opts);
476
477 # update
478     $r->[$i] = \%opts;
479   }
480
481   $self->$next($self->schema, @args);
482 };
483
484 =head2 all_storages
485
486 Returns an array of of all the connected storage backends.  The first element
487 in the returned array is the master, and the remainings are each of the
488 replicants.
489
490 =cut
491
492 sub all_storages {
493   my $self = shift @_;
494   return grep {defined $_ && blessed $_} (
495      $self->master,
496      values %{ $self->replicants },
497   );
498 }
499
500 =head2 execute_reliably ($coderef, ?@args)
501
502 Given a coderef, saves the current state of the L</read_handler>, forces it to
503 use reliable storage (ie sets it to the master), executes a coderef and then
504 restores the original state.
505
506 Example:
507
508   my $reliably = sub {
509     my $name = shift @_;
510     $schema->resultset('User')->create({name=>$name});
511     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
512     return $user_rs;
513   };
514
515   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
516
517 Use this when you must be certain of your database state, such as when you just
518 inserted something and need to get a resultset including it, etc.
519
520 =cut
521
522 sub execute_reliably {
523   my ($self, $coderef, @args) = @_;
524   
525   unless( ref $coderef eq 'CODE') {
526     $self->throw_exception('Second argument must be a coderef');
527   }
528   
529   ##Get copy of master storage
530   my $master = $self->master;
531   
532   ##Get whatever the current read hander is
533   my $current = $self->read_handler;
534   
535   ##Set the read handler to master
536   $self->read_handler($master);
537   
538   ## do whatever the caller needs
539   my @result;
540   my $want_array = wantarray;
541   
542   eval {
543     if($want_array) {
544       @result = $coderef->(@args);
545     } elsif(defined $want_array) {
546       ($result[0]) = ($coderef->(@args));
547     } else {
548       $coderef->(@args);
549     }       
550   };
551   
552   ##Reset to the original state
553   $self->read_handler($current); 
554   
555   ##Exception testing has to come last, otherwise you might leave the 
556   ##read_handler set to master.
557   
558   if($@) {
559     $self->throw_exception("coderef returned an error: $@");
560   } else {
561     return $want_array ? @result : $result[0];
562   }
563 }
564
565 =head2 set_reliable_storage
566
567 Sets the current $schema to be 'reliable', that is all queries, both read and
568 write are sent to the master
569   
570 =cut
571
572 sub set_reliable_storage {
573   my $self = shift @_;
574   my $schema = $self->schema;
575   my $write_handler = $self->schema->storage->write_handler;
576   
577   $schema->storage->read_handler($write_handler);
578 }
579
580 =head2 set_balanced_storage
581
582 Sets the current $schema to be use the </balancer> for all reads, while all
583 writea are sent to the master only
584   
585 =cut
586
587 sub set_balanced_storage {
588   my $self = shift @_;
589   my $schema = $self->schema;
590   my $write_handler = $self->schema->storage->balancer;
591   
592   $schema->storage->read_handler($write_handler);
593 }
594
595 =head2 around: txn_do ($coderef)
596
597 Overload to the txn_do method, which is delegated to whatever the
598 L<write_handler> is set to.  We overload this in order to wrap in inside a
599 L</execute_reliably> method.
600
601 =cut
602
603 around 'txn_do' => sub {
604   my($txn_do, $self, $coderef, @args) = @_;
605   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
606 };
607
608 =head2 connected
609
610 Check that the master and at least one of the replicants is connected.
611
612 =cut
613
614 sub connected {
615   my $self = shift @_;
616   return
617     $self->master->connected &&
618     $self->pool->connected_replicants;
619 }
620
621 =head2 ensure_connected
622
623 Make sure all the storages are connected.
624
625 =cut
626
627 sub ensure_connected {
628   my $self = shift @_;
629   foreach my $source ($self->all_storages) {
630     $source->ensure_connected(@_);
631   }
632 }
633
634 =head2 limit_dialect
635
636 Set the limit_dialect for all existing storages
637
638 =cut
639
640 sub limit_dialect {
641   my $self = shift @_;
642   foreach my $source ($self->all_storages) {
643     $source->limit_dialect(@_);
644   }
645   return $self->master->quote_char;
646 }
647
648 =head2 quote_char
649
650 Set the quote_char for all existing storages
651
652 =cut
653
654 sub quote_char {
655   my $self = shift @_;
656   foreach my $source ($self->all_storages) {
657     $source->quote_char(@_);
658   }
659   return $self->master->quote_char;
660 }
661
662 =head2 name_sep
663
664 Set the name_sep for all existing storages
665
666 =cut
667
668 sub name_sep {
669   my $self = shift @_;
670   foreach my $source ($self->all_storages) {
671     $source->name_sep(@_);
672   }
673   return $self->master->name_sep;
674 }
675
676 =head2 set_schema
677
678 Set the schema object for all existing storages
679
680 =cut
681
682 sub set_schema {
683   my $self = shift @_;
684   foreach my $source ($self->all_storages) {
685     $source->set_schema(@_);
686   }
687 }
688
689 =head2 debug
690
691 set a debug flag across all storages
692
693 =cut
694
695 sub debug {
696   my $self = shift @_;
697   if(@_) {
698     foreach my $source ($self->all_storages) {
699       $source->debug(@_);
700     }   
701   }
702   return $self->master->debug;
703 }
704
705 =head2 debugobj
706
707 set a debug object across all storages
708
709 =cut
710
711 sub debugobj {
712   my $self = shift @_;
713   if(@_) {
714     foreach my $source ($self->all_storages) {
715       $source->debugobj(@_);
716     }   
717   }
718   return $self->master->debugobj;
719 }
720
721 =head2 debugfh
722
723 set a debugfh object across all storages
724
725 =cut
726
727 sub debugfh {
728   my $self = shift @_;
729   if(@_) {
730     foreach my $source ($self->all_storages) {
731       $source->debugfh(@_);
732     }   
733   }
734   return $self->master->debugfh;
735 }
736
737 =head2 debugcb
738
739 set a debug callback across all storages
740
741 =cut
742
743 sub debugcb {
744   my $self = shift @_;
745   if(@_) {
746     foreach my $source ($self->all_storages) {
747       $source->debugcb(@_);
748     }   
749   }
750   return $self->master->debugcb;
751 }
752
753 =head2 disconnect
754
755 disconnect everything
756
757 =cut
758
759 sub disconnect {
760   my $self = shift @_;
761   foreach my $source ($self->all_storages) {
762     $source->disconnect(@_);
763   }
764 }
765
766 =head2 cursor_class
767
768 set cursor class on all storages, or return master's
769
770 =cut
771
772 sub cursor_class {
773   my ($self, $cursor_class) = @_;
774
775   if ($cursor_class) {
776     $_->cursor_class($cursor_class) for $self->all_storages;
777   }
778   $self->master->cursor_class;
779 }
780   
781 =head1 GOTCHAS
782
783 Due to the fact that replicants can lag behind a master, you must take care to
784 make sure you use one of the methods to force read queries to a master should
785 you need realtime data integrity.  For example, if you insert a row, and then
786 immediately re-read it from the database (say, by doing $row->discard_changes)
787 or you insert a row and then immediately build a query that expects that row
788 to be an item, you should force the master to handle reads.  Otherwise, due to
789 the lag, there is no certainty your data will be in the expected state.
790
791 For data integrity, all transactions automatically use the master storage for
792 all read and write queries.  Using a transaction is the preferred and recommended
793 method to force the master to handle all read queries.
794
795 Otherwise, you can force a single query to use the master with the 'force_pool'
796 attribute:
797
798   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
799
800 This attribute will safely be ignore by non replicated storages, so you can use
801 the same code for both types of systems.
802
803 Lastly, you can use the L</execute_reliably> method, which works very much like
804 a transaction.
805
806 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
807 and L</set_balanced_storage>, however this operates at a global level and is not
808 suitable if you have a shared Schema object being used by multiple processes,
809 such as on a web application server.  You can get around this limitation by
810 using the Schema clone method.
811
812   my $new_schema = $schema->clone;
813   $new_schema->set_reliable_storage;
814   
815   ## $new_schema will use only the Master storage for all reads/writes while
816   ## the $schema object will use replicated storage.
817
818 =head1 AUTHOR
819
820   John Napiorkowski <john.napiorkowski@takkle.com>
821
822 Based on code originated by:
823
824   Norbert Csongrádi <bert@cpan.org>
825   Peter Siklósi <einon@einon.hu>
826
827 =head1 LICENSE
828
829 You may distribute this code under the same terms as Perl itself.
830
831 =cut
832
833 __PACKAGE__->meta->make_immutable;
834
835 1;