::DBI:Replicated - merge connect_info from master to replicants
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use Moose;
29 use DBIx::Class::Storage::DBI;
30 use DBIx::Class::Storage::DBI::Replicated::Pool;
31 use DBIx::Class::Storage::DBI::Replicated::Balancer;
32 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
33 use MooseX::Types::Moose qw/ClassName HashRef Object/;
34 use Scalar::Util 'reftype';
35 use Carp::Clan qw/^DBIx::Class/;
36
37 use namespace::clean -except => 'meta';
38
39 =head1 NAME
40
41 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
42
43 =head1 SYNOPSIS
44
45 The Following example shows how to change an existing $schema to a replicated
46 storage type, add some replicated (readonly) databases, and perform reporting
47 tasks.
48
49   ## Change storage_type in your schema class
50   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
51   
52   ## Add some slaves.  Basically this is an array of arrayrefs, where each
53   ## arrayref is database connect information
54   
55   $schema->storage->connect_replicants(
56     [$dsn1, $user, $pass, \%opts],
57     [$dsn2, $user, $pass, \%opts],
58     [$dsn3, $user, $pass, \%opts],
59   );
60   
61   ## Now, just use the $schema as normal
62   $schema->resultset('Source')->search({name=>'etc'});
63   
64   ## You can force a given query to use a particular storage using the search
65   ### attribute 'force_pool'.  For example:
66   
67   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
68   
69   ## Now $RS will force everything (both reads and writes) to use whatever was
70   ## setup as the master storage.  'master' is hardcoded to always point to the
71   ## Master, but you can also use any Replicant name.  Please see:
72   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
73   ## More. Also see transactions and L</execute_reliably> for alternative ways
74   ## to force read traffic to the master.
75   
76 =head1 DESCRIPTION
77
78 Warning: This class is marked BETA.  This has been running a production
79 website using MySQL native replication as its backend and we have some decent
80 test coverage but the code hasn't yet been stressed by a variety of databases.
81 Individual DB's may have quirks we are not aware of.  Please use this in first
82 development and pass along your experiences/bug fixes.
83
84 This class implements replicated data store for DBI. Currently you can define
85 one master and numerous slave database connections. All write-type queries
86 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
87 database, all read-type queries (SELECTs) go to the slave database.
88
89 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
90 handle gets delegated to one of the two attributes: L</read_handler> or to
91 L</write_handler>.  Additionally, some methods need to be distributed
92 to all existing storages.  This way our storage class is a drop in replacement
93 for L<DBIx::Class::Storage::DBI>.
94
95 Read traffic is spread across the replicants (slaves) occuring to a user
96 selected algorithm.  The default algorithm is random weighted.
97
98 =head1 NOTES
99
100 The consistancy betweeen master and replicants is database specific.  The Pool
101 gives you a method to validate it's replicants, removing and replacing them
102 when they fail/pass predefined criteria.  Please make careful use of the ways
103 to force a query to run against Master when needed.
104
105 =head1 REQUIREMENTS
106
107 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
108
109   Moose => 0.77
110   MooseX::AttributeHelpers => 0.12 
111   MooseX::Types => 0.10
112   namespace::clean => 0.11
113   
114 You will need to install these modules manually via CPAN or make them part of the
115 Makefile for your distribution.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>'DBIx::Class::Schema',
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults 
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'ro',
143   isa=>ClassName,
144   required=>1,
145   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
146   handles=>{
147     'create_pool' => 'new',
148   },
149 );
150
151 =head2 pool_args
152
153 Contains a hashref of initialized information to pass to the Balancer object.
154 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
155
156 =cut
157
158 has 'pool_args' => (
159   is=>'ro',
160   isa=>HashRef,
161   lazy=>1,
162   required=>1,
163   default=>sub { {} },
164 );
165
166
167 =head2 balancer_type
168
169 The replication pool requires a balance class to provider the methods for
170 choose how to spread the query load across each replicant in the pool.
171
172 =cut
173
174 has 'balancer_type' => (
175   is=>'ro',
176   isa=>BalancerClassNamePart,
177   coerce=>1,
178   required=>1,
179   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
180   handles=>{
181     'create_balancer' => 'new',
182   },
183 );
184
185 =head2 balancer_args
186
187 Contains a hashref of initialized information to pass to the Balancer object.
188 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
189
190 =cut
191
192 has 'balancer_args' => (
193   is=>'ro',
194   isa=>HashRef,
195   lazy=>1,
196   required=>1,
197   default=>sub { {} },
198 );
199
200 =head2 pool
201
202 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
203 container class for one or more replicated databases.
204
205 =cut
206
207 has 'pool' => (
208   is=>'ro',
209   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
210   lazy_build=>1,
211   handles=>[qw/
212     connect_replicants    
213     replicants
214     has_replicants
215   /],
216 );
217
218 =head2 balancer
219
220 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
221 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
222
223 =cut
224
225 has 'balancer' => (
226   is=>'ro',
227   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
228   lazy_build=>1,
229   handles=>[qw/auto_validate_every/],
230 );
231
232 =head2 master
233
234 The master defines the canonical state for a pool of connected databases.  All
235 the replicants are expected to match this databases state.  Thus, in a classic
236 Master / Slaves distributed system, all the slaves are expected to replicate
237 the Master's state as quick as possible.  This is the only database in the
238 pool of databases that is allowed to handle write traffic.
239
240 =cut
241
242 has 'master' => (
243   is=> 'ro',
244   isa=>'DBIx::Class::Storage::DBI',
245   lazy_build=>1,
246 );
247
248 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
249
250 The following methods are delegated all the methods required for the 
251 L<DBIx::Class::Storage::DBI> interface.
252
253 =head2 read_handler
254
255 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
256
257 =cut
258
259 has 'read_handler' => (
260   is=>'rw',
261   isa=>Object,
262   lazy_build=>1,
263   handles=>[qw/
264     select
265     select_single
266     columns_info_for
267   /],    
268 );
269
270 =head2 write_handler
271
272 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
273
274 =cut
275
276 has 'write_handler' => (
277   is=>'ro',
278   isa=>Object,
279   lazy_build=>1,
280   lazy_build=>1,
281   handles=>[qw/   
282     on_connect_do
283     on_disconnect_do       
284     connect_info
285     throw_exception
286     sql_maker
287     sqlt_type
288     create_ddl_dir
289     deployment_statements
290     datetime_parser
291     datetime_parser_type        
292     last_insert_id
293     insert
294     insert_bulk
295     update
296     delete
297     dbh
298     txn_begin
299     txn_do
300     txn_commit
301     txn_rollback
302     txn_scope_guard
303     sth
304     deploy
305     with_deferred_fk_checks
306
307     reload_row
308     _prep_for_execute
309     
310   /],
311 );
312
313 has _master_connect_info_opts =>
314   (is => 'rw', isa => HashRef, default => sub { {} });
315
316 =head2 around: connect_info
317
318 Preserve master's C<connect_info> options (for merging with replicants.)
319
320 =cut
321
322 around connect_info => sub {
323   my ($next, $self, $info, @extra) = @_;
324
325   my %opts;
326   for my $arg (@$info) {
327     next unless (reftype($arg)||'') eq 'HASH';
328     %opts = (%opts, %$arg);
329   }
330
331   delete $opts{dsn};
332
333   $self->_master_connect_info_opts(\%opts);
334
335   $self->$next($info, @extra);
336 };
337
338 =head1 METHODS
339
340 This class defines the following methods.
341
342 =head2 BUILDARGS
343
344 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
345 first argument.  So we need to massage the arguments a bit so that all the
346 bits get put into the correct places.
347
348 =cut
349
350 sub BUILDARGS {
351   my ($class, $schema, $storage_type_args, @args) = @_; 
352   
353   return {
354         schema=>$schema, 
355         %$storage_type_args,
356         @args
357   }
358 }
359
360 =head2 _build_master
361
362 Lazy builder for the L</master> attribute.
363
364 =cut
365
366 sub _build_master {
367   my $self = shift @_;
368   DBIx::Class::Storage::DBI->new($self->schema);
369 }
370
371 =head2 _build_pool
372
373 Lazy builder for the L</pool> attribute.
374
375 =cut
376
377 sub _build_pool {
378   my $self = shift @_;
379   $self->create_pool(%{$self->pool_args});
380 }
381
382 =head2 _build_balancer
383
384 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
385 the balancer knows which pool it's balancing.
386
387 =cut
388
389 sub _build_balancer {
390   my $self = shift @_;
391   $self->create_balancer(
392     pool=>$self->pool, 
393     master=>$self->master,
394     %{$self->balancer_args},
395   );
396 }
397
398 =head2 _build_write_handler
399
400 Lazy builder for the L</write_handler> attribute.  The default is to set this to
401 the L</master>.
402
403 =cut
404
405 sub _build_write_handler {
406   return shift->master;
407 }
408
409 =head2 _build_read_handler
410
411 Lazy builder for the L</read_handler> attribute.  The default is to set this to
412 the L</balancer>.
413
414 =cut
415
416 sub _build_read_handler {
417   return shift->balancer;
418 }
419
420 =head2 around: connect_replicants
421
422 All calls to connect_replicants needs to have an existing $schema tacked onto
423 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
424 options merged with the master, with replicant opts having higher priority.
425
426 =cut
427
428 around connect_replicants => sub {
429   my ($next, $self, @args) = @_;
430
431   for my $r (@args) {
432     $r = [ $r ] unless reftype $r eq 'ARRAY';
433
434     croak "coderef replicant connect_info not supported"
435       if ref $r->[0] && reftype $r->[0] eq 'CODE';
436
437 # any connect_info options?
438     my $i = 0;
439     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
440
441 # make one if none    
442     $r->[$i] = {} unless $r->[$i];
443
444 # merge if two hashes
445     my %opts = map %$_, @$r[$i .. $#{$r}];
446     splice @$r, $i+1, ($#{$r} - $i), ();
447
448 # merge with master
449     %opts = (%{ $self->_master_connect_info_opts }, %opts);
450
451 # update
452     $r->[$i] = \%opts;
453   }
454
455   $self->$next($self->schema, @args);
456 };
457
458 =head2 all_storages
459
460 Returns an array of of all the connected storage backends.  The first element
461 in the returned array is the master, and the remainings are each of the
462 replicants.
463
464 =cut
465
466 sub all_storages {
467   my $self = shift @_;
468   return grep {defined $_ && blessed $_} (
469      $self->master,
470      values %{ $self->replicants },
471   );
472 }
473
474 =head2 execute_reliably ($coderef, ?@args)
475
476 Given a coderef, saves the current state of the L</read_handler>, forces it to
477 use reliable storage (ie sets it to the master), executes a coderef and then
478 restores the original state.
479
480 Example:
481
482   my $reliably = sub {
483     my $name = shift @_;
484     $schema->resultset('User')->create({name=>$name});
485     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
486     return $user_rs;
487   };
488
489   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
490
491 Use this when you must be certain of your database state, such as when you just
492 inserted something and need to get a resultset including it, etc.
493
494 =cut
495
496 sub execute_reliably {
497   my ($self, $coderef, @args) = @_;
498   
499   unless( ref $coderef eq 'CODE') {
500     $self->throw_exception('Second argument must be a coderef');
501   }
502   
503   ##Get copy of master storage
504   my $master = $self->master;
505   
506   ##Get whatever the current read hander is
507   my $current = $self->read_handler;
508   
509   ##Set the read handler to master
510   $self->read_handler($master);
511   
512   ## do whatever the caller needs
513   my @result;
514   my $want_array = wantarray;
515   
516   eval {
517     if($want_array) {
518       @result = $coderef->(@args);
519     } elsif(defined $want_array) {
520       ($result[0]) = ($coderef->(@args));
521     } else {
522       $coderef->(@args);
523     }       
524   };
525   
526   ##Reset to the original state
527   $self->read_handler($current); 
528   
529   ##Exception testing has to come last, otherwise you might leave the 
530   ##read_handler set to master.
531   
532   if($@) {
533     $self->throw_exception("coderef returned an error: $@");
534   } else {
535     return $want_array ? @result : $result[0];
536   }
537 }
538
539 =head2 set_reliable_storage
540
541 Sets the current $schema to be 'reliable', that is all queries, both read and
542 write are sent to the master
543   
544 =cut
545
546 sub set_reliable_storage {
547   my $self = shift @_;
548   my $schema = $self->schema;
549   my $write_handler = $self->schema->storage->write_handler;
550   
551   $schema->storage->read_handler($write_handler);
552 }
553
554 =head2 set_balanced_storage
555
556 Sets the current $schema to be use the </balancer> for all reads, while all
557 writea are sent to the master only
558   
559 =cut
560
561 sub set_balanced_storage {
562   my $self = shift @_;
563   my $schema = $self->schema;
564   my $write_handler = $self->schema->storage->balancer;
565   
566   $schema->storage->read_handler($write_handler);
567 }
568
569 =head2 around: txn_do ($coderef)
570
571 Overload to the txn_do method, which is delegated to whatever the
572 L<write_handler> is set to.  We overload this in order to wrap in inside a
573 L</execute_reliably> method.
574
575 =cut
576
577 around 'txn_do' => sub {
578   my($txn_do, $self, $coderef, @args) = @_;
579   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
580 };
581
582 =head2 connected
583
584 Check that the master and at least one of the replicants is connected.
585
586 =cut
587
588 sub connected {
589   my $self = shift @_;
590   return
591     $self->master->connected &&
592     $self->pool->connected_replicants;
593 }
594
595 =head2 ensure_connected
596
597 Make sure all the storages are connected.
598
599 =cut
600
601 sub ensure_connected {
602   my $self = shift @_;
603   foreach my $source ($self->all_storages) {
604     $source->ensure_connected(@_);
605   }
606 }
607
608 =head2 limit_dialect
609
610 Set the limit_dialect for all existing storages
611
612 =cut
613
614 sub limit_dialect {
615   my $self = shift @_;
616   foreach my $source ($self->all_storages) {
617     $source->limit_dialect(@_);
618   }
619   return $self->master->quote_char;
620 }
621
622 =head2 quote_char
623
624 Set the quote_char for all existing storages
625
626 =cut
627
628 sub quote_char {
629   my $self = shift @_;
630   foreach my $source ($self->all_storages) {
631     $source->quote_char(@_);
632   }
633   return $self->master->quote_char;
634 }
635
636 =head2 name_sep
637
638 Set the name_sep for all existing storages
639
640 =cut
641
642 sub name_sep {
643   my $self = shift @_;
644   foreach my $source ($self->all_storages) {
645     $source->name_sep(@_);
646   }
647   return $self->master->name_sep;
648 }
649
650 =head2 set_schema
651
652 Set the schema object for all existing storages
653
654 =cut
655
656 sub set_schema {
657   my $self = shift @_;
658   foreach my $source ($self->all_storages) {
659     $source->set_schema(@_);
660   }
661 }
662
663 =head2 debug
664
665 set a debug flag across all storages
666
667 =cut
668
669 sub debug {
670   my $self = shift @_;
671   if(@_) {
672     foreach my $source ($self->all_storages) {
673       $source->debug(@_);
674     }   
675   }
676   return $self->master->debug;
677 }
678
679 =head2 debugobj
680
681 set a debug object across all storages
682
683 =cut
684
685 sub debugobj {
686   my $self = shift @_;
687   if(@_) {
688     foreach my $source ($self->all_storages) {
689       $source->debugobj(@_);
690     }   
691   }
692   return $self->master->debugobj;
693 }
694
695 =head2 debugfh
696
697 set a debugfh object across all storages
698
699 =cut
700
701 sub debugfh {
702   my $self = shift @_;
703   if(@_) {
704     foreach my $source ($self->all_storages) {
705       $source->debugfh(@_);
706     }   
707   }
708   return $self->master->debugfh;
709 }
710
711 =head2 debugcb
712
713 set a debug callback across all storages
714
715 =cut
716
717 sub debugcb {
718   my $self = shift @_;
719   if(@_) {
720     foreach my $source ($self->all_storages) {
721       $source->debugcb(@_);
722     }   
723   }
724   return $self->master->debugcb;
725 }
726
727 =head2 disconnect
728
729 disconnect everything
730
731 =cut
732
733 sub disconnect {
734   my $self = shift @_;
735   foreach my $source ($self->all_storages) {
736     $source->disconnect(@_);
737   }
738 }
739
740 =head2 cursor_class
741
742 set cursor class on all storages, or return master's
743
744 =cut
745
746 sub cursor_class {
747   my ($self, $cursor_class) = @_;
748
749   if ($cursor_class) {
750     $_->cursor_class($cursor_class) for $self->all_storages;
751   }
752   $self->master->cursor_class;
753 }
754   
755 =head1 GOTCHAS
756
757 Due to the fact that replicants can lag behind a master, you must take care to
758 make sure you use one of the methods to force read queries to a master should
759 you need realtime data integrity.  For example, if you insert a row, and then
760 immediately re-read it from the database (say, by doing $row->discard_changes)
761 or you insert a row and then immediately build a query that expects that row
762 to be an item, you should force the master to handle reads.  Otherwise, due to
763 the lag, there is no certainty your data will be in the expected state.
764
765 For data integrity, all transactions automatically use the master storage for
766 all read and write queries.  Using a transaction is the preferred and recommended
767 method to force the master to handle all read queries.
768
769 Otherwise, you can force a single query to use the master with the 'force_pool'
770 attribute:
771
772   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
773
774 This attribute will safely be ignore by non replicated storages, so you can use
775 the same code for both types of systems.
776
777 Lastly, you can use the L</execute_reliably> method, which works very much like
778 a transaction.
779
780 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
781 and L</set_balanced_storage>, however this operates at a global level and is not
782 suitable if you have a shared Schema object being used by multiple processes,
783 such as on a web application server.  You can get around this limitation by
784 using the Schema clone method.
785
786   my $new_schema = $schema->clone;
787   $new_schema->set_reliable_storage;
788   
789   ## $new_schema will use only the Master storage for all reads/writes while
790   ## the $schema object will use replicated storage.
791
792 =head1 AUTHOR
793
794   John Napiorkowski <john.napiorkowski@takkle.com>
795
796 Based on code originated by:
797
798   Norbert Csongrádi <bert@cpan.org>
799   Peter Siklósi <einon@einon.hu>
800
801 =head1 LICENSE
802
803 You may distribute this code under the same terms as Perl itself.
804
805 =cut
806
807 __PACKAGE__->meta->make_immutable;
808
809 1;