support ::DBI::Replicated opts in connect_info
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use Moose;
29 use DBIx::Class::Storage::DBI;
30 use DBIx::Class::Storage::DBI::Replicated::Pool;
31 use DBIx::Class::Storage::DBI::Replicated::Balancer;
32 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
33 use MooseX::Types::Moose qw/ClassName HashRef Object/;
34 use Scalar::Util 'reftype';
35 use Carp::Clan qw/^DBIx::Class/;
36
37 use namespace::clean -except => 'meta';
38
39 =head1 NAME
40
41 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
42
43 =head1 SYNOPSIS
44
45 The Following example shows how to change an existing $schema to a replicated
46 storage type, add some replicated (readonly) databases, and perform reporting
47 tasks.
48
49   ## Change storage_type in your schema class
50   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
51   
52   ## Add some slaves.  Basically this is an array of arrayrefs, where each
53   ## arrayref is database connect information
54   
55   $schema->storage->connect_replicants(
56     [$dsn1, $user, $pass, \%opts],
57     [$dsn2, $user, $pass, \%opts],
58     [$dsn3, $user, $pass, \%opts],
59   );
60   
61   ## Now, just use the $schema as normal
62   $schema->resultset('Source')->search({name=>'etc'});
63   
64   ## You can force a given query to use a particular storage using the search
65   ### attribute 'force_pool'.  For example:
66   
67   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
68   
69   ## Now $RS will force everything (both reads and writes) to use whatever was
70   ## setup as the master storage.  'master' is hardcoded to always point to the
71   ## Master, but you can also use any Replicant name.  Please see:
72   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
73   ## More. Also see transactions and L</execute_reliably> for alternative ways
74   ## to force read traffic to the master.
75   
76 =head1 DESCRIPTION
77
78 Warning: This class is marked BETA.  This has been running a production
79 website using MySQL native replication as its backend and we have some decent
80 test coverage but the code hasn't yet been stressed by a variety of databases.
81 Individual DB's may have quirks we are not aware of.  Please use this in first
82 development and pass along your experiences/bug fixes.
83
84 This class implements replicated data store for DBI. Currently you can define
85 one master and numerous slave database connections. All write-type queries
86 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
87 database, all read-type queries (SELECTs) go to the slave database.
88
89 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
90 handle gets delegated to one of the two attributes: L</read_handler> or to
91 L</write_handler>.  Additionally, some methods need to be distributed
92 to all existing storages.  This way our storage class is a drop in replacement
93 for L<DBIx::Class::Storage::DBI>.
94
95 Read traffic is spread across the replicants (slaves) occuring to a user
96 selected algorithm.  The default algorithm is random weighted.
97
98 =head1 NOTES
99
100 The consistancy betweeen master and replicants is database specific.  The Pool
101 gives you a method to validate it's replicants, removing and replacing them
102 when they fail/pass predefined criteria.  Please make careful use of the ways
103 to force a query to run against Master when needed.
104
105 =head1 REQUIREMENTS
106
107 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
108
109   Moose => 0.77
110   MooseX::AttributeHelpers => 0.12 
111   MooseX::Types => 0.10
112   namespace::clean => 0.11
113   
114 You will need to install these modules manually via CPAN or make them part of the
115 Makefile for your distribution.
116
117 =head1 ATTRIBUTES
118
119 This class defines the following attributes.
120
121 =head2 schema
122
123 The underlying L<DBIx::Class::Schema> object this storage is attaching
124
125 =cut
126
127 has 'schema' => (
128     is=>'rw',
129     isa=>'DBIx::Class::Schema',
130     weak_ref=>1,
131     required=>1,
132 );
133
134 =head2 pool_type
135
136 Contains the classname which will instantiate the L</pool> object.  Defaults 
137 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
138
139 =cut
140
141 has 'pool_type' => (
142   is=>'rw',
143   isa=>ClassName,
144   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
145   handles=>{
146     'create_pool' => 'new',
147   },
148 );
149
150 =head2 pool_args
151
152 Contains a hashref of initialized information to pass to the Balancer object.
153 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
154
155 =cut
156
157 has 'pool_args' => (
158   is=>'rw',
159   isa=>HashRef,
160   lazy=>1,
161   default=>sub { {} },
162 );
163
164
165 =head2 balancer_type
166
167 The replication pool requires a balance class to provider the methods for
168 choose how to spread the query load across each replicant in the pool.
169
170 =cut
171
172 has 'balancer_type' => (
173   is=>'rw',
174   isa=>BalancerClassNamePart,
175   coerce=>1,
176   required=>1,
177   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
178   handles=>{
179     'create_balancer' => 'new',
180   },
181 );
182
183 =head2 balancer_args
184
185 Contains a hashref of initialized information to pass to the Balancer object.
186 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
187
188 =cut
189
190 has 'balancer_args' => (
191   is=>'rw',
192   isa=>HashRef,
193   lazy=>1,
194   required=>1,
195   default=>sub { {} },
196 );
197
198 =head2 pool
199
200 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
201 container class for one or more replicated databases.
202
203 =cut
204
205 has 'pool' => (
206   is=>'ro',
207   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
208   lazy_build=>1,
209   handles=>[qw/
210     connect_replicants    
211     replicants
212     has_replicants
213   /],
214 );
215
216 =head2 balancer
217
218 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
219 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
220
221 =cut
222
223 has 'balancer' => (
224   is=>'rw',
225   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
226   lazy_build=>1,
227   handles=>[qw/auto_validate_every/],
228 );
229
230 =head2 master
231
232 The master defines the canonical state for a pool of connected databases.  All
233 the replicants are expected to match this databases state.  Thus, in a classic
234 Master / Slaves distributed system, all the slaves are expected to replicate
235 the Master's state as quick as possible.  This is the only database in the
236 pool of databases that is allowed to handle write traffic.
237
238 =cut
239
240 has 'master' => (
241   is=> 'ro',
242   isa=>'DBIx::Class::Storage::DBI',
243   lazy_build=>1,
244 );
245
246 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
247
248 The following methods are delegated all the methods required for the 
249 L<DBIx::Class::Storage::DBI> interface.
250
251 =head2 read_handler
252
253 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
254
255 =cut
256
257 has 'read_handler' => (
258   is=>'rw',
259   isa=>Object,
260   lazy_build=>1,
261   handles=>[qw/
262     select
263     select_single
264     columns_info_for
265   /],    
266 );
267
268 =head2 write_handler
269
270 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
271
272 =cut
273
274 has 'write_handler' => (
275   is=>'ro',
276   isa=>Object,
277   lazy_build=>1,
278   lazy_build=>1,
279   handles=>[qw/   
280     on_connect_do
281     on_disconnect_do       
282     connect_info
283     throw_exception
284     sql_maker
285     sqlt_type
286     create_ddl_dir
287     deployment_statements
288     datetime_parser
289     datetime_parser_type        
290     last_insert_id
291     insert
292     insert_bulk
293     update
294     delete
295     dbh
296     txn_begin
297     txn_do
298     txn_commit
299     txn_rollback
300     txn_scope_guard
301     sth
302     deploy
303     with_deferred_fk_checks
304
305     reload_row
306     _prep_for_execute
307     
308   /],
309 );
310
311 has _master_connect_info_opts =>
312   (is => 'rw', isa => HashRef, default => sub { {} });
313
314 =head2 around: connect_info
315
316 Preserve master's C<connect_info> options (for merging with replicants.)
317 Also set any Replicated related options from connect_info, such as
318 C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
319
320 =cut
321
322 around connect_info => sub {
323   my ($next, $self, $info, @extra) = @_;
324
325   my %opts;
326   for my $arg (@$info) {
327     next unless (reftype($arg)||'') eq 'HASH';
328     %opts = (%opts, %$arg);
329   }
330   delete $opts{dsn};
331
332   if (@opts{qw/pool_type pool_args/}) {
333     $self->pool_type(delete $opts{pool_type})
334       if $opts{pool_type};
335
336     $self->pool_args({
337       %{ $self->pool_args },
338       %{ delete $opts{pool_args} || {} }
339     });
340
341     $self->pool($self->_build_pool);
342   }
343
344   if (@opts{qw/balancer_type balancer_args/}) {
345     $self->balancer_type(delete $opts{balancer_type})
346       if $opts{balancer_type};
347
348     $self->balancer_args({
349       %{ $self->balancer_args },
350       %{ delete $opts{balancer_args} || {} }
351     });
352
353     $self->balancer($self->_build_balancer);
354   }
355
356   $self->_master_connect_info_opts(\%opts);
357
358   $self->$next($info, @extra);
359 };
360
361 =head1 METHODS
362
363 This class defines the following methods.
364
365 =head2 BUILDARGS
366
367 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
368 first argument.  So we need to massage the arguments a bit so that all the
369 bits get put into the correct places.
370
371 =cut
372
373 sub BUILDARGS {
374   my ($class, $schema, $storage_type_args, @args) = @_; 
375   
376   return {
377         schema=>$schema, 
378         %$storage_type_args,
379         @args
380   }
381 }
382
383 =head2 _build_master
384
385 Lazy builder for the L</master> attribute.
386
387 =cut
388
389 sub _build_master {
390   my $self = shift @_;
391   DBIx::Class::Storage::DBI->new($self->schema);
392 }
393
394 =head2 _build_pool
395
396 Lazy builder for the L</pool> attribute.
397
398 =cut
399
400 sub _build_pool {
401   my $self = shift @_;
402   $self->create_pool(%{$self->pool_args});
403 }
404
405 =head2 _build_balancer
406
407 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
408 the balancer knows which pool it's balancing.
409
410 =cut
411
412 sub _build_balancer {
413   my $self = shift @_;
414   $self->create_balancer(
415     pool=>$self->pool, 
416     master=>$self->master,
417     %{$self->balancer_args},
418   );
419 }
420
421 =head2 _build_write_handler
422
423 Lazy builder for the L</write_handler> attribute.  The default is to set this to
424 the L</master>.
425
426 =cut
427
428 sub _build_write_handler {
429   return shift->master;
430 }
431
432 =head2 _build_read_handler
433
434 Lazy builder for the L</read_handler> attribute.  The default is to set this to
435 the L</balancer>.
436
437 =cut
438
439 sub _build_read_handler {
440   return shift->balancer;
441 }
442
443 =head2 around: connect_replicants
444
445 All calls to connect_replicants needs to have an existing $schema tacked onto
446 top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
447 options merged with the master, with replicant opts having higher priority.
448
449 =cut
450
451 around connect_replicants => sub {
452   my ($next, $self, @args) = @_;
453
454   for my $r (@args) {
455     $r = [ $r ] unless reftype $r eq 'ARRAY';
456
457     croak "coderef replicant connect_info not supported"
458       if ref $r->[0] && reftype $r->[0] eq 'CODE';
459
460 # any connect_info options?
461     my $i = 0;
462     $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
463
464 # make one if none    
465     $r->[$i] = {} unless $r->[$i];
466
467 # merge if two hashes
468     my %opts = map %$_, @$r[$i .. $#{$r}];
469     splice @$r, $i+1, ($#{$r} - $i), ();
470
471 # merge with master
472     %opts = (%{ $self->_master_connect_info_opts }, %opts);
473
474 # update
475     $r->[$i] = \%opts;
476   }
477
478   $self->$next($self->schema, @args);
479 };
480
481 =head2 all_storages
482
483 Returns an array of of all the connected storage backends.  The first element
484 in the returned array is the master, and the remainings are each of the
485 replicants.
486
487 =cut
488
489 sub all_storages {
490   my $self = shift @_;
491   return grep {defined $_ && blessed $_} (
492      $self->master,
493      values %{ $self->replicants },
494   );
495 }
496
497 =head2 execute_reliably ($coderef, ?@args)
498
499 Given a coderef, saves the current state of the L</read_handler>, forces it to
500 use reliable storage (ie sets it to the master), executes a coderef and then
501 restores the original state.
502
503 Example:
504
505   my $reliably = sub {
506     my $name = shift @_;
507     $schema->resultset('User')->create({name=>$name});
508     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
509     return $user_rs;
510   };
511
512   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
513
514 Use this when you must be certain of your database state, such as when you just
515 inserted something and need to get a resultset including it, etc.
516
517 =cut
518
519 sub execute_reliably {
520   my ($self, $coderef, @args) = @_;
521   
522   unless( ref $coderef eq 'CODE') {
523     $self->throw_exception('Second argument must be a coderef');
524   }
525   
526   ##Get copy of master storage
527   my $master = $self->master;
528   
529   ##Get whatever the current read hander is
530   my $current = $self->read_handler;
531   
532   ##Set the read handler to master
533   $self->read_handler($master);
534   
535   ## do whatever the caller needs
536   my @result;
537   my $want_array = wantarray;
538   
539   eval {
540     if($want_array) {
541       @result = $coderef->(@args);
542     } elsif(defined $want_array) {
543       ($result[0]) = ($coderef->(@args));
544     } else {
545       $coderef->(@args);
546     }       
547   };
548   
549   ##Reset to the original state
550   $self->read_handler($current); 
551   
552   ##Exception testing has to come last, otherwise you might leave the 
553   ##read_handler set to master.
554   
555   if($@) {
556     $self->throw_exception("coderef returned an error: $@");
557   } else {
558     return $want_array ? @result : $result[0];
559   }
560 }
561
562 =head2 set_reliable_storage
563
564 Sets the current $schema to be 'reliable', that is all queries, both read and
565 write are sent to the master
566   
567 =cut
568
569 sub set_reliable_storage {
570   my $self = shift @_;
571   my $schema = $self->schema;
572   my $write_handler = $self->schema->storage->write_handler;
573   
574   $schema->storage->read_handler($write_handler);
575 }
576
577 =head2 set_balanced_storage
578
579 Sets the current $schema to be use the </balancer> for all reads, while all
580 writea are sent to the master only
581   
582 =cut
583
584 sub set_balanced_storage {
585   my $self = shift @_;
586   my $schema = $self->schema;
587   my $write_handler = $self->schema->storage->balancer;
588   
589   $schema->storage->read_handler($write_handler);
590 }
591
592 =head2 around: txn_do ($coderef)
593
594 Overload to the txn_do method, which is delegated to whatever the
595 L<write_handler> is set to.  We overload this in order to wrap in inside a
596 L</execute_reliably> method.
597
598 =cut
599
600 around 'txn_do' => sub {
601   my($txn_do, $self, $coderef, @args) = @_;
602   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
603 };
604
605 =head2 connected
606
607 Check that the master and at least one of the replicants is connected.
608
609 =cut
610
611 sub connected {
612   my $self = shift @_;
613   return
614     $self->master->connected &&
615     $self->pool->connected_replicants;
616 }
617
618 =head2 ensure_connected
619
620 Make sure all the storages are connected.
621
622 =cut
623
624 sub ensure_connected {
625   my $self = shift @_;
626   foreach my $source ($self->all_storages) {
627     $source->ensure_connected(@_);
628   }
629 }
630
631 =head2 limit_dialect
632
633 Set the limit_dialect for all existing storages
634
635 =cut
636
637 sub limit_dialect {
638   my $self = shift @_;
639   foreach my $source ($self->all_storages) {
640     $source->limit_dialect(@_);
641   }
642   return $self->master->quote_char;
643 }
644
645 =head2 quote_char
646
647 Set the quote_char for all existing storages
648
649 =cut
650
651 sub quote_char {
652   my $self = shift @_;
653   foreach my $source ($self->all_storages) {
654     $source->quote_char(@_);
655   }
656   return $self->master->quote_char;
657 }
658
659 =head2 name_sep
660
661 Set the name_sep for all existing storages
662
663 =cut
664
665 sub name_sep {
666   my $self = shift @_;
667   foreach my $source ($self->all_storages) {
668     $source->name_sep(@_);
669   }
670   return $self->master->name_sep;
671 }
672
673 =head2 set_schema
674
675 Set the schema object for all existing storages
676
677 =cut
678
679 sub set_schema {
680   my $self = shift @_;
681   foreach my $source ($self->all_storages) {
682     $source->set_schema(@_);
683   }
684 }
685
686 =head2 debug
687
688 set a debug flag across all storages
689
690 =cut
691
692 sub debug {
693   my $self = shift @_;
694   if(@_) {
695     foreach my $source ($self->all_storages) {
696       $source->debug(@_);
697     }   
698   }
699   return $self->master->debug;
700 }
701
702 =head2 debugobj
703
704 set a debug object across all storages
705
706 =cut
707
708 sub debugobj {
709   my $self = shift @_;
710   if(@_) {
711     foreach my $source ($self->all_storages) {
712       $source->debugobj(@_);
713     }   
714   }
715   return $self->master->debugobj;
716 }
717
718 =head2 debugfh
719
720 set a debugfh object across all storages
721
722 =cut
723
724 sub debugfh {
725   my $self = shift @_;
726   if(@_) {
727     foreach my $source ($self->all_storages) {
728       $source->debugfh(@_);
729     }   
730   }
731   return $self->master->debugfh;
732 }
733
734 =head2 debugcb
735
736 set a debug callback across all storages
737
738 =cut
739
740 sub debugcb {
741   my $self = shift @_;
742   if(@_) {
743     foreach my $source ($self->all_storages) {
744       $source->debugcb(@_);
745     }   
746   }
747   return $self->master->debugcb;
748 }
749
750 =head2 disconnect
751
752 disconnect everything
753
754 =cut
755
756 sub disconnect {
757   my $self = shift @_;
758   foreach my $source ($self->all_storages) {
759     $source->disconnect(@_);
760   }
761 }
762
763 =head2 cursor_class
764
765 set cursor class on all storages, or return master's
766
767 =cut
768
769 sub cursor_class {
770   my ($self, $cursor_class) = @_;
771
772   if ($cursor_class) {
773     $_->cursor_class($cursor_class) for $self->all_storages;
774   }
775   $self->master->cursor_class;
776 }
777   
778 =head1 GOTCHAS
779
780 Due to the fact that replicants can lag behind a master, you must take care to
781 make sure you use one of the methods to force read queries to a master should
782 you need realtime data integrity.  For example, if you insert a row, and then
783 immediately re-read it from the database (say, by doing $row->discard_changes)
784 or you insert a row and then immediately build a query that expects that row
785 to be an item, you should force the master to handle reads.  Otherwise, due to
786 the lag, there is no certainty your data will be in the expected state.
787
788 For data integrity, all transactions automatically use the master storage for
789 all read and write queries.  Using a transaction is the preferred and recommended
790 method to force the master to handle all read queries.
791
792 Otherwise, you can force a single query to use the master with the 'force_pool'
793 attribute:
794
795   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
796
797 This attribute will safely be ignore by non replicated storages, so you can use
798 the same code for both types of systems.
799
800 Lastly, you can use the L</execute_reliably> method, which works very much like
801 a transaction.
802
803 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
804 and L</set_balanced_storage>, however this operates at a global level and is not
805 suitable if you have a shared Schema object being used by multiple processes,
806 such as on a web application server.  You can get around this limitation by
807 using the Schema clone method.
808
809   my $new_schema = $schema->clone;
810   $new_schema->set_reliable_storage;
811   
812   ## $new_schema will use only the Master storage for all reads/writes while
813   ## the $schema object will use replicated storage.
814
815 =head1 AUTHOR
816
817   John Napiorkowski <john.napiorkowski@takkle.com>
818
819 Based on code originated by:
820
821   Norbert Csongrádi <bert@cpan.org>
822   Peter Siklósi <einon@einon.hu>
823
824 =head1 LICENSE
825
826 You may distribute this code under the same terms as Perl itself.
827
828 =cut
829
830 __PACKAGE__->meta->make_immutable;
831
832 1;