::DBI::Replicated - add master_read_weight to ::Random balancer_type
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use Moose;
29 use DBIx::Class::Storage::DBI;
30 use DBIx::Class::Storage::DBI::Replicated::Pool;
31 use DBIx::Class::Storage::DBI::Replicated::Balancer;
32 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
33 use MooseX::Types::Moose qw/ClassName HashRef Object/;
34 use Scalar::Util 'reftype';
35 use Carp::Clan qw/^DBIx::Class/;
36
37 use namespace::clean -except => 'meta';
38
39 =head1 NAME
40
41 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
42
43 =head1 SYNOPSIS
44
45 The Following example shows how to change an existing $schema to a replicated
46 storage type, add some replicated (readonly) databases, and perform reporting
47 tasks.
48
49   ## Change storage_type in your schema class
50   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
51   
52   ## Add some slaves.  Basically this is an array of arrayrefs, where each
53   ## arrayref is database connect information
54   
55   $schema->storage->connect_replicants(
56     [$dsn1, $user, $pass, \%opts],
57     [$dsn2, $user, $pass, \%opts],
58     [$dsn3, $user, $pass, \%opts],
59   );
60   
61   ## Now, just use the $schema as normal
62   $schema->resultset('Source')->search({name=>'etc'});
63   
64   ## You can force a given query to use a particular storage using the search
65   ### attribute 'force_pool'.  For example:
66   
67   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
68   
69   ## Now $RS will force everything (both reads and writes) to use whatever was
70   ## setup as the master storage.  'master' is hardcoded to always point to the
71   ## Master, but you can also use any Replicant name.  Please see:
72   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
73   ## More. Also see transactions and L</execute_reliably> for alternative ways
74   ## to force read traffic to the master.
75   
76 =head1 DESCRIPTION
77
78 Warning: This class is marked BETA.  This has been running a production
79 website using MySQL native replication as its backend and we have some decent
80 test coverage but the code hasn't yet been stressed by a variety of databases.
81 Individual DB's may have quirks we are not aware of.  Please use this in first
82 development and pass along your experiences/bug fixes.
83
84 This class implements replicated data store for DBI. Currently you can define
85 one master and numerous slave database connections. All write-type queries
86 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
87 database, all read-type queries (SELECTs) go to the slave database.
88
89 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
90 handle gets delegated to one of the two attributes: L</read_handler> or to
91 L</write_handler>.  Additionally, some methods need to be distributed
92 to all existing storages.  This way our storage class is a drop in replacement
93 for L<DBIx::Class::Storage::DBI>.
94
95 Read traffic is spread across the replicants (slaves) occuring to a user
96 selected algorithm.  The default algorithm is random weighted.
97
98 =head1 NOTES
99
100 The consistancy betweeen master and replicants is database specific.  The Pool
101 gives you a method to validate it's replicants, removing and replacing them
102 when they fail/pass predefined criteria.  Please make careful use of the ways
103 to force a query to run against Master when needed.
104
105 =head1 REQUIREMENTS
106
107 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
108
109   Moose => 0.77
110   MooseX::AttributeHelpers => 0.12 
111   MooseX::Types => 0.10
112   namespace::clean => 0.11
113   
114 You will need to install these modules manually via CPAN or make them part of the
115 Makefile for your distribution.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>'DBIx::Class::Schema',
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults 
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'rw',
143   isa=>ClassName,
144   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
145   handles=>{
146     'create_pool' => 'new',
147   },
148 );
149
150 =head2 pool_args
151
152 Contains a hashref of initialized information to pass to the Balancer object.
153 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
154
155 =cut
156
157 has 'pool_args' => (
158   is=>'rw',
159   isa=>HashRef,
160   lazy=>1,
161   default=>sub { {} },
162 );
163
164
165 =head2 balancer_type
166
167 The replication pool requires a balance class to provider the methods for
168 choose how to spread the query load across each replicant in the pool.
169
170 =cut
171
172 has 'balancer_type' => (
173   is=>'rw',
174   isa=>BalancerClassNamePart,
175   coerce=>1,
176   required=>1,
177   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
178   handles=>{
179     'create_balancer' => 'new',
180   },
181 );
182
183 =head2 balancer_args
184
185 Contains a hashref of initialized information to pass to the Balancer object.
186 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
187
188 =cut
189
190 has 'balancer_args' => (
191   is=>'rw',
192   isa=>HashRef,
193   lazy=>1,
194   required=>1,
195   default=>sub { {} },
196 );
197
198 =head2 pool
199
200 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
201 container class for one or more replicated databases.
202
203 =cut
204
205 has 'pool' => (
206   is=>'ro',
207   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
208   lazy_build=>1,
209   handles=>[qw/
210     connect_replicants    
211     replicants
212     has_replicants
213   /],
214 );
215
216 =head2 balancer
217
218 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
219 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
220
221 =cut
222
223 has 'balancer' => (
224   is=>'rw',
225   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226   lazy_build=>1,
227   handles=>[qw/auto_validate_every/],
228 );
229
230 =head2 master
231
232 The master defines the canonical state for a pool of connected databases.  All
233 the replicants are expected to match this databases state.  Thus, in a classic
234 Master / Slaves distributed system, all the slaves are expected to replicate
235 the Master's state as quick as possible.  This is the only database in the
236 pool of databases that is allowed to handle write traffic.
237
238 =cut
239
240 has 'master' => (
241   is=> 'ro',
242   isa=>'DBIx::Class::Storage::DBI',
243   lazy_build=>1,
244 );
245
246 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
247
248 The following methods are delegated all the methods required for the 
249 L<DBIx::Class::Storage::DBI> interface.
250
251 =head2 read_handler
252
253 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
254
255 =cut
256
257 has 'read_handler' => (
258   is=>'rw',
259   isa=>Object,
260   lazy_build=>1,
261   handles=>[qw/
262     select
263     select_single
264     columns_info_for
265   /],    
266 );
267
268 =head2 write_handler
269
270 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
271
272 =cut
273
274 has 'write_handler' => (
275   is=>'ro',
276   isa=>Object,
277   lazy_build=>1,
278   handles=>[qw/   
279     on_connect_do
280     on_disconnect_do       
281     connect_info
282     throw_exception
283     sql_maker
284     sqlt_type
285     create_ddl_dir
286     deployment_statements
287     datetime_parser
288     datetime_parser_type        
289     last_insert_id
290     insert
291     insert_bulk
292     update
293     delete
294     dbh
295     txn_begin
296     txn_do
297     txn_commit
298     txn_rollback
299     txn_scope_guard
300     sth
301     deploy
302     with_deferred_fk_checks
303
304     reload_row
305     _prep_for_execute
306     
307   /],
308 );
309
310 has _master_connect_info_opts =>
311   (is => 'rw', isa => HashRef, default => sub { {} });
312
313 =head2 around: connect_info
314
315 Preserve master's C<connect_info> options (for merging with replicants.)
316 Also set any Replicated related options from connect_info, such as
317 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
318
319 =cut
320
321 around connect_info => sub {
322   my ($next, $self, $info, @extra) = @_;
323
324   my %opts;
325   for my $arg (@$info) {
326     next unless (reftype($arg)||'') eq 'HASH';
327     %opts = (%opts, %$arg);
328   }
329   delete $opts{dsn};
330
331   if (@opts{qw/pool_type pool_args/}) {
332     $self->pool_type(delete $opts{pool_type})
333       if $opts{pool_type};
334
335     $self->pool_args({
336       %{ $self->pool_args },
337       %{ delete $opts{pool_args} || {} }
338     });
339
340     $self->pool($self->_build_pool);
341   }
342
343   if (@opts{qw/balancer_type balancer_args/}) {
344     $self->balancer_type(delete $opts{balancer_type})
345       if $opts{balancer_type};
346
347     $self->balancer_args({
348       %{ $self->balancer_args },
349       %{ delete $opts{balancer_args} || {} }
350     });
351
352     $self->balancer($self->_build_balancer);
353   }
354
355   $self->_master_connect_info_opts(\%opts);
356
357   $self->$next($info, @extra);
358 };
359
360 =head1 METHODS
361
362 This class defines the following methods.
363
364 =head2 BUILDARGS
365
366 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
367 first argument.  So we need to massage the arguments a bit so that all the
368 bits get put into the correct places.
369
370 =cut
371
372 sub BUILDARGS {
373   my ($class, $schema, $storage_type_args, @args) = @_; 
374   
375   return {
376         schema=>$schema, 
377         %$storage_type_args,
378         @args
379   }
380 }
381
382 =head2 _build_master
383
384 Lazy builder for the L</master> attribute.
385
386 =cut
387
388 sub _build_master {
389   my $self = shift @_;
390   my $master = DBIx::Class::Storage::DBI->new($self->schema);
391   DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
392   $master
393 }
394
395 =head2 _build_pool
396
397 Lazy builder for the L</pool> attribute.
398
399 =cut
400
401 sub _build_pool {
402   my $self = shift @_;
403   $self->create_pool(%{$self->pool_args});
404 }
405
406 =head2 _build_balancer
407
408 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
409 the balancer knows which pool it's balancing.
410
411 =cut
412
413 sub _build_balancer {
414   my $self = shift @_;
415   $self->create_balancer(
416     pool=>$self->pool, 
417     master=>$self->master,
418     %{$self->balancer_args},
419   );
420 }
421
422 =head2 _build_write_handler
423
424 Lazy builder for the L</write_handler> attribute.  The default is to set this to
425 the L</master>.
426
427 =cut
428
429 sub _build_write_handler {
430   return shift->master;
431 }
432
433 =head2 _build_read_handler
434
435 Lazy builder for the L</read_handler> attribute.  The default is to set this to
436 the L</balancer>.
437
438 =cut
439
440 sub _build_read_handler {
441   return shift->balancer;
442 }
443
444 =head2 around: connect_replicants
445
446 All calls to connect_replicants needs to have an existing $schema tacked onto
447 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
448 options merged with the master, with replicant opts having higher priority.
449
450 =cut
451
452 around connect_replicants => sub {
453   my ($next, $self, @args) = @_;
454
455   for my $r (@args) {
456     $r = [ $r ] unless reftype $r eq 'ARRAY';
457
458     croak "coderef replicant connect_info not supported"
459       if ref $r->[0] && reftype $r->[0] eq 'CODE';
460
461 # any connect_info options?
462     my $i = 0;
463     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
464
465 # make one if none    
466     $r->[$i] = {} unless $r->[$i];
467
468 # merge if two hashes
469     my %opts = map %$_, @$r[$i .. $#{$r}];
470     splice @$r, $i+1, ($#{$r} - $i), ();
471
472 # merge with master
473     %opts = (%{ $self->_master_connect_info_opts }, %opts);
474
475 # update
476     $r->[$i] = \%opts;
477   }
478
479   $self->$next($self->schema, @args);
480 };
481
482 =head2 all_storages
483
484 Returns an array of of all the connected storage backends.  The first element
485 in the returned array is the master, and the remainings are each of the
486 replicants.
487
488 =cut
489
490 sub all_storages {
491   my $self = shift @_;
492   return grep {defined $_ && blessed $_} (
493      $self->master,
494      values %{ $self->replicants },
495   );
496 }
497
498 =head2 execute_reliably ($coderef, ?@args)
499
500 Given a coderef, saves the current state of the L</read_handler>, forces it to
501 use reliable storage (ie sets it to the master), executes a coderef and then
502 restores the original state.
503
504 Example:
505
506   my $reliably = sub {
507     my $name = shift @_;
508     $schema->resultset('User')->create({name=>$name});
509     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
510     return $user_rs;
511   };
512
513   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
514
515 Use this when you must be certain of your database state, such as when you just
516 inserted something and need to get a resultset including it, etc.
517
518 =cut
519
520 sub execute_reliably {
521   my ($self, $coderef, @args) = @_;
522   
523   unless( ref $coderef eq 'CODE') {
524     $self->throw_exception('Second argument must be a coderef');
525   }
526   
527   ##Get copy of master storage
528   my $master = $self->master;
529   
530   ##Get whatever the current read hander is
531   my $current = $self->read_handler;
532   
533   ##Set the read handler to master
534   $self->read_handler($master);
535   
536   ## do whatever the caller needs
537   my @result;
538   my $want_array = wantarray;
539   
540   eval {
541     if($want_array) {
542       @result = $coderef->(@args);
543     } elsif(defined $want_array) {
544       ($result[0]) = ($coderef->(@args));
545     } else {
546       $coderef->(@args);
547     }       
548   };
549   
550   ##Reset to the original state
551   $self->read_handler($current); 
552   
553   ##Exception testing has to come last, otherwise you might leave the 
554   ##read_handler set to master.
555   
556   if($@) {
557     $self->throw_exception("coderef returned an error: $@");
558   } else {
559     return $want_array ? @result : $result[0];
560   }
561 }
562
563 =head2 set_reliable_storage
564
565 Sets the current $schema to be 'reliable', that is all queries, both read and
566 write are sent to the master
567   
568 =cut
569
570 sub set_reliable_storage {
571   my $self = shift @_;
572   my $schema = $self->schema;
573   my $write_handler = $self->schema->storage->write_handler;
574   
575   $schema->storage->read_handler($write_handler);
576 }
577
578 =head2 set_balanced_storage
579
580 Sets the current $schema to be use the </balancer> for all reads, while all
581 writea are sent to the master only
582   
583 =cut
584
585 sub set_balanced_storage {
586   my $self = shift @_;
587   my $schema = $self->schema;
588   my $write_handler = $self->schema->storage->balancer;
589   
590   $schema->storage->read_handler($write_handler);
591 }
592
593 =head2 around: txn_do ($coderef)
594
595 Overload to the txn_do method, which is delegated to whatever the
596 L<write_handler> is set to.  We overload this in order to wrap in inside a
597 L</execute_reliably> method.
598
599 =cut
600
601 around 'txn_do' => sub {
602   my($txn_do, $self, $coderef, @args) = @_;
603   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
604 };
605
606 =head2 connected
607
608 Check that the master and at least one of the replicants is connected.
609
610 =cut
611
612 sub connected {
613   my $self = shift @_;
614   return
615     $self->master->connected &&
616     $self->pool->connected_replicants;
617 }
618
619 =head2 ensure_connected
620
621 Make sure all the storages are connected.
622
623 =cut
624
625 sub ensure_connected {
626   my $self = shift @_;
627   foreach my $source ($self->all_storages) {
628     $source->ensure_connected(@_);
629   }
630 }
631
632 =head2 limit_dialect
633
634 Set the limit_dialect for all existing storages
635
636 =cut
637
638 sub limit_dialect {
639   my $self = shift @_;
640   foreach my $source ($self->all_storages) {
641     $source->limit_dialect(@_);
642   }
643   return $self->master->quote_char;
644 }
645
646 =head2 quote_char
647
648 Set the quote_char for all existing storages
649
650 =cut
651
652 sub quote_char {
653   my $self = shift @_;
654   foreach my $source ($self->all_storages) {
655     $source->quote_char(@_);
656   }
657   return $self->master->quote_char;
658 }
659
660 =head2 name_sep
661
662 Set the name_sep for all existing storages
663
664 =cut
665
666 sub name_sep {
667   my $self = shift @_;
668   foreach my $source ($self->all_storages) {
669     $source->name_sep(@_);
670   }
671   return $self->master->name_sep;
672 }
673
674 =head2 set_schema
675
676 Set the schema object for all existing storages
677
678 =cut
679
680 sub set_schema {
681   my $self = shift @_;
682   foreach my $source ($self->all_storages) {
683     $source->set_schema(@_);
684   }
685 }
686
687 =head2 debug
688
689 set a debug flag across all storages
690
691 =cut
692
693 sub debug {
694   my $self = shift @_;
695   if(@_) {
696     foreach my $source ($self->all_storages) {
697       $source->debug(@_);
698     }   
699   }
700   return $self->master->debug;
701 }
702
703 =head2 debugobj
704
705 set a debug object across all storages
706
707 =cut
708
709 sub debugobj {
710   my $self = shift @_;
711   if(@_) {
712     foreach my $source ($self->all_storages) {
713       $source->debugobj(@_);
714     }   
715   }
716   return $self->master->debugobj;
717 }
718
719 =head2 debugfh
720
721 set a debugfh object across all storages
722
723 =cut
724
725 sub debugfh {
726   my $self = shift @_;
727   if(@_) {
728     foreach my $source ($self->all_storages) {
729       $source->debugfh(@_);
730     }   
731   }
732   return $self->master->debugfh;
733 }
734
735 =head2 debugcb
736
737 set a debug callback across all storages
738
739 =cut
740
741 sub debugcb {
742   my $self = shift @_;
743   if(@_) {
744     foreach my $source ($self->all_storages) {
745       $source->debugcb(@_);
746     }   
747   }
748   return $self->master->debugcb;
749 }
750
751 =head2 disconnect
752
753 disconnect everything
754
755 =cut
756
757 sub disconnect {
758   my $self = shift @_;
759   foreach my $source ($self->all_storages) {
760     $source->disconnect(@_);
761   }
762 }
763
764 =head2 cursor_class
765
766 set cursor class on all storages, or return master's
767
768 =cut
769
770 sub cursor_class {
771   my ($self, $cursor_class) = @_;
772
773   if ($cursor_class) {
774     $_->cursor_class($cursor_class) for $self->all_storages;
775   }
776   $self->master->cursor_class;
777 }
778   
779 =head1 GOTCHAS
780
781 Due to the fact that replicants can lag behind a master, you must take care to
782 make sure you use one of the methods to force read queries to a master should
783 you need realtime data integrity.  For example, if you insert a row, and then
784 immediately re-read it from the database (say, by doing $row->discard_changes)
785 or you insert a row and then immediately build a query that expects that row
786 to be an item, you should force the master to handle reads.  Otherwise, due to
787 the lag, there is no certainty your data will be in the expected state.
788
789 For data integrity, all transactions automatically use the master storage for
790 all read and write queries.  Using a transaction is the preferred and recommended
791 method to force the master to handle all read queries.
792
793 Otherwise, you can force a single query to use the master with the 'force_pool'
794 attribute:
795
796   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
797
798 This attribute will safely be ignore by non replicated storages, so you can use
799 the same code for both types of systems.
800
801 Lastly, you can use the L</execute_reliably> method, which works very much like
802 a transaction.
803
804 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
805 and L</set_balanced_storage>, however this operates at a global level and is not
806 suitable if you have a shared Schema object being used by multiple processes,
807 such as on a web application server.  You can get around this limitation by
808 using the Schema clone method.
809
810   my $new_schema = $schema->clone;
811   $new_schema->set_reliable_storage;
812   
813   ## $new_schema will use only the Master storage for all reads/writes while
814   ## the $schema object will use replicated storage.
815
816 =head1 AUTHOR
817
818   John Napiorkowski <john.napiorkowski@takkle.com>
819
820 Based on code originated by:
821
822   Norbert Csongrádi <bert@cpan.org>
823   Peter Siklósi <einon@einon.hu>
824
825 =head1 LICENSE
826
827 You may distribute this code under the same terms as Perl itself.
828
829 =cut
830
831 __PACKAGE__->meta->make_immutable;
832
833 1;