just a tiny grammer fix to POD
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.54',
11     MooseX::AttributeHelpers => '0.12',
12     Moose::Util::TypeConstraints => '0.54',
13     Class::MOP => '0.63',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use DBIx::Class::Storage::DBI;
29 use DBIx::Class::Storage::DBI::Replicated::Pool;
30 use DBIx::Class::Storage::DBI::Replicated::Balancer;
31
32 =head1 NAME
33
34 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
35
36 =head1 SYNOPSIS
37
38 The Following example shows how to change an existing $schema to a replicated
39 storage type, add some replicated (readonly) databases, and perform reporting
40 tasks.
41
42   ## Change storage_type in your schema class
43   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
44   
45   ## Add some slaves.  Basically this is an array of arrayrefs, where each
46   ## arrayref is database connect information
47   
48   $schema->storage->connect_replicants(
49     [$dsn1, $user, $pass, \%opts],
50     [$dsn2, $user, $pass, \%opts],
51     [$dsn3, $user, $pass, \%opts],
52   );
53   
54   ## Now, just use the $schema as normal
55   $schema->resultset('Source')->search({name=>'etc'});
56   
57   ## You can force a given query to use a particular storage using the search
58   ### attribute 'force_pool'.  For example:
59   
60   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
61   
62   ## Now $RS will force everything (both reads and writes) to use whatever was
63   ## setup as the master storage.  'master' is hardcoded to always point to the
64   ## Master, but you can also use any Replicant name.  Please see:
65   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
66   ## More. Also see transactions and L</execute_reliably> for alternative ways
67   ## to force read traffic to the master.
68   
69 =head1 DESCRIPTION
70
71 Warning: This class is marked BETA.  This has been running a production
72 website using MySQL native replication as its backend and we have some decent
73 test coverage but the code hasn't yet been stressed by a variety of databases.
74 Individual DB's may have quirks we are not aware of.  Please use this in first
75 development and pass along your experiences/bug fixes.
76
77 This class implements replicated data store for DBI. Currently you can define
78 one master and numerous slave database connections. All write-type queries
79 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
80 database, all read-type queries (SELECTs) go to the slave database.
81
82 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
83 handle gets delegated to one of the two attributes: L</read_handler> or to
84 L</write_handler>.  Additionally, some methods need to be distributed
85 to all existing storages.  This way our storage class is a drop in replacement
86 for L<DBIx::Class::Storage::DBI>.
87
88 Read traffic is spread across the replicants (slaves) occuring to a user
89 selected algorithm.  The default algorithm is random weighted.
90
91 =head1 NOTES
92
93 The consistancy betweeen master and replicants is database specific.  The Pool
94 gives you a method to validate it's replicants, removing and replacing them
95 when they fail/pass predefined criteria.  Please make careful use of the ways
96 to force a query to run against Master when needed.
97
98 =head1 REQUIREMENTS
99
100 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
101
102   Moose => 1.54
103   MooseX::AttributeHelpers => 0.12 
104   Moose::Util::TypeConstraints => 0.54
105   Class::MOP => 0.63
106   
107 You will need to install these modules manually via CPAN or make them part of the
108 Makefile for your distribution.
109
110 =head1 ATTRIBUTES
111
112 This class defines the following attributes.
113
114 =head2 schema
115
116 The underlying L<DBIx::Class::Schema> object this storage is attaching
117
118 =cut
119
120 has 'schema' => (
121     is=>'rw',
122     isa=>'DBIx::Class::Schema',
123     weak_ref=>1,
124     required=>1,
125 );
126
127 =head2 pool_type
128
129 Contains the classname which will instantiate the L</pool> object.  Defaults 
130 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
131
132 =cut
133
134 has 'pool_type' => (
135   is=>'ro',
136   isa=>'ClassName',
137   required=>1,
138   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139   handles=>{
140     'create_pool' => 'new',
141   },
142 );
143
144 =head2 pool_args
145
146 Contains a hashref of initialized information to pass to the Balancer object.
147 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
148
149 =cut
150
151 has 'pool_args' => (
152   is=>'ro',
153   isa=>'HashRef',
154   lazy=>1,
155   required=>1,
156   default=>sub { {} },
157 );
158
159
160 =head2 balancer_type
161
162 The replication pool requires a balance class to provider the methods for
163 choose how to spread the query load across each replicant in the pool.
164
165 =cut
166
167 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
168   as 'ClassName';
169     
170 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
171   from 'Str',
172   via {
173         my $type = $_;
174     if($type=~m/^::/) {
175       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
176     }  
177     Class::MOP::load_class($type);  
178     $type;      
179   };
180
181 has 'balancer_type' => (
182   is=>'ro',
183   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
184   coerce=>1,
185   required=>1,
186   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
187   handles=>{
188     'create_balancer' => 'new',
189   },
190 );
191
192 =head2 balancer_args
193
194 Contains a hashref of initialized information to pass to the Balancer object.
195 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
196
197 =cut
198
199 has 'balancer_args' => (
200   is=>'ro',
201   isa=>'HashRef',
202   lazy=>1,
203   required=>1,
204   default=>sub { {} },
205 );
206
207 =head2 pool
208
209 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
210 container class for one or more replicated databases.
211
212 =cut
213
214 has 'pool' => (
215   is=>'ro',
216   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
217   lazy_build=>1,
218   handles=>[qw/
219     connect_replicants    
220     replicants
221     has_replicants
222   /],
223 );
224
225 =head2 balancer
226
227 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
228 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
229
230 =cut
231
232 has 'balancer' => (
233   is=>'ro',
234   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
235   lazy_build=>1,
236   handles=>[qw/auto_validate_every/],
237 );
238
239 =head2 master
240
241 The master defines the canonical state for a pool of connected databases.  All
242 the replicants are expected to match this databases state.  Thus, in a classic
243 Master / Slaves distributed system, all the slaves are expected to replicate
244 the Master's state as quick as possible.  This is the only database in the
245 pool of databases that is allowed to handle write traffic.
246
247 =cut
248
249 has 'master' => (
250   is=> 'ro',
251   isa=>'DBIx::Class::Storage::DBI',
252   lazy_build=>1,
253 );
254
255 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
256
257 The following methods are delegated all the methods required for the 
258 L<DBIx::Class::Storage::DBI> interface.
259
260 =head2 read_handler
261
262 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
263
264 =cut
265
266 has 'read_handler' => (
267   is=>'rw',
268   isa=>'Object',
269   lazy_build=>1,
270   handles=>[qw/
271     select
272     select_single
273     columns_info_for
274   /],    
275 );
276
277 =head2 write_handler
278
279 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
280
281 =cut
282
283 has 'write_handler' => (
284   is=>'ro',
285   isa=>'Object',
286   lazy_build=>1,
287   lazy_build=>1,
288   handles=>[qw/   
289     on_connect_do
290     on_disconnect_do       
291     connect_info
292     throw_exception
293     sql_maker
294     sqlt_type
295     create_ddl_dir
296     deployment_statements
297     datetime_parser
298     datetime_parser_type        
299     last_insert_id
300     insert
301     insert_bulk
302     update
303     delete
304     dbh
305     txn_begin
306     txn_do
307     txn_commit
308     txn_rollback
309     txn_scope_guard
310     sth
311     deploy
312
313     reload_row
314     _prep_for_execute
315     configure_sqlt
316     
317   /],
318 );
319
320 =head1 METHODS
321
322 This class defines the following methods.
323
324 =head2 BUILDARGS
325
326 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
327 first argument.  So we need to massage the arguments a bit so that all the
328 bits get put into the correct places.
329
330 =cut
331
332 sub BUILDARGS {
333   my ($class, $schema, $storage_type_args, @args) = @_; 
334   
335   return {
336         schema=>$schema, 
337         %$storage_type_args,
338         @args
339   }
340 }
341
342 =head2 _build_master
343
344 Lazy builder for the L</master> attribute.
345
346 =cut
347
348 sub _build_master {
349   my $self = shift @_;
350   DBIx::Class::Storage::DBI->new($self->schema);
351 }
352
353 =head2 _build_pool
354
355 Lazy builder for the L</pool> attribute.
356
357 =cut
358
359 sub _build_pool {
360   my $self = shift @_;
361   $self->create_pool(%{$self->pool_args});
362 }
363
364 =head2 _build_balancer
365
366 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
367 the balancer knows which pool it's balancing.
368
369 =cut
370
371 sub _build_balancer {
372   my $self = shift @_;
373   $self->create_balancer(
374     pool=>$self->pool, 
375     master=>$self->master,
376     %{$self->balancer_args},
377   );
378 }
379
380 =head2 _build_write_handler
381
382 Lazy builder for the L</write_handler> attribute.  The default is to set this to
383 the L</master>.
384
385 =cut
386
387 sub _build_write_handler {
388   return shift->master;
389 }
390
391 =head2 _build_read_handler
392
393 Lazy builder for the L</read_handler> attribute.  The default is to set this to
394 the L</balancer>.
395
396 =cut
397
398 sub _build_read_handler {
399   return shift->balancer;
400 }
401
402 =head2 around: connect_replicants
403
404 All calls to connect_replicants needs to have an existing $schema tacked onto
405 top of the args, since L<DBIx::Storage::DBI> needs it.
406
407 =cut
408
409 around 'connect_replicants' => sub {
410   my ($method, $self, @args) = @_;
411   $self->$method($self->schema, @args);
412 };
413
414 =head2 all_storages
415
416 Returns an array of of all the connected storage backends.  The first element
417 in the returned array is the master, and the remainings are each of the
418 replicants.
419
420 =cut
421
422 sub all_storages {
423   my $self = shift @_;
424   return grep {defined $_ && blessed $_} (
425      $self->master,
426      $self->replicants,
427   );
428 }
429
430 =head2 execute_reliably ($coderef, ?@args)
431
432 Given a coderef, saves the current state of the L</read_handler>, forces it to
433 use reliable storage (ie sets it to the master), executes a coderef and then
434 restores the original state.
435
436 Example:
437
438   my $reliably = sub {
439     my $name = shift @_;
440     $schema->resultset('User')->create({name=>$name});
441     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
442     return $user_rs;
443   };
444
445   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
446
447 Use this when you must be certain of your database state, such as when you just
448 inserted something and need to get a resultset including it, etc.
449
450 =cut
451
452 use Benchmark;
453
454 sub execute_reliably {
455   my ($self, $coderef, @args) = @_;
456   
457   unless( ref $coderef eq 'CODE') {
458     $self->throw_exception('Second argument must be a coderef');
459   }
460   
461   ##Get copy of master storage
462   my $master = $self->master;
463   
464   ##Get whatever the current read hander is
465   my $current = $self->read_handler;
466   
467   ##Set the read handler to master
468   $self->read_handler($master);
469   
470   ## do whatever the caller needs
471   my @result;
472   my $want_array = wantarray;
473   
474   eval {
475     if($want_array) {
476       @result = $coderef->(@args);
477     } elsif(defined $want_array) {
478       ($result[0]) = ($coderef->(@args));
479     } else {
480       $coderef->(@args);
481     }       
482   };
483   
484   ##Reset to the original state
485   $self->read_handler($current); 
486   
487   ##Exception testing has to come last, otherwise you might leave the 
488   ##read_handler set to master.
489   
490   if($@) {
491     $self->throw_exception("coderef returned an error: $@");
492   } else {
493     return $want_array ? @result : $result[0];
494   }
495 }
496
497 =head2 set_reliable_storage
498
499 Sets the current $schema to be 'reliable', that is all queries, both read and
500 write are sent to the master
501   
502 =cut
503
504 sub set_reliable_storage {
505   my $self = shift @_;
506   my $schema = $self->schema;
507   my $write_handler = $self->schema->storage->write_handler;
508   
509   $schema->storage->read_handler($write_handler);
510 }
511
512 =head2 set_balanced_storage
513
514 Sets the current $schema to be use the </balancer> for all reads, while all
515 writea are sent to the master only
516   
517 =cut
518
519 sub set_balanced_storage {
520   my $self = shift @_;
521   my $schema = $self->schema;
522   my $write_handler = $self->schema->storage->balancer;
523   
524   $schema->storage->read_handler($write_handler);
525 }
526
527 =head2 around: txn_do ($coderef)
528
529 Overload to the txn_do method, which is delegated to whatever the
530 L<write_handler> is set to.  We overload this in order to wrap in inside a
531 L</execute_reliably> method.
532
533 =cut
534
535 around 'txn_do' => sub {
536   my($txn_do, $self, $coderef, @args) = @_;
537   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
538 };
539
540 =head2 connected
541
542 Check that the master and at least one of the replicants is connected.
543
544 =cut
545
546 sub connected {
547   my $self = shift @_;
548   return
549     $self->master->connected &&
550     $self->pool->connected_replicants;
551 }
552
553 =head2 ensure_connected
554
555 Make sure all the storages are connected.
556
557 =cut
558
559 sub ensure_connected {
560   my $self = shift @_;
561   foreach my $source ($self->all_storages) {
562     $source->ensure_connected(@_);
563   }
564 }
565
566 =head2 limit_dialect
567
568 Set the limit_dialect for all existing storages
569
570 =cut
571
572 sub limit_dialect {
573   my $self = shift @_;
574   foreach my $source ($self->all_storages) {
575     $source->limit_dialect(@_);
576   }
577   return $self->master->quote_char;
578 }
579
580 =head2 quote_char
581
582 Set the quote_char for all existing storages
583
584 =cut
585
586 sub quote_char {
587   my $self = shift @_;
588   foreach my $source ($self->all_storages) {
589     $source->quote_char(@_);
590   }
591   return $self->master->quote_char;
592 }
593
594 =head2 name_sep
595
596 Set the name_sep for all existing storages
597
598 =cut
599
600 sub name_sep {
601   my $self = shift @_;
602   foreach my $source ($self->all_storages) {
603     $source->name_sep(@_);
604   }
605   return $self->master->name_sep;
606 }
607
608 =head2 set_schema
609
610 Set the schema object for all existing storages
611
612 =cut
613
614 sub set_schema {
615   my $self = shift @_;
616   foreach my $source ($self->all_storages) {
617     $source->set_schema(@_);
618   }
619 }
620
621 =head2 debug
622
623 set a debug flag across all storages
624
625 =cut
626
627 sub debug {
628   my $self = shift @_;
629   if(@_) {
630     foreach my $source ($self->all_storages) {
631       $source->debug(@_);
632     }   
633   }
634   return $self->master->debug;
635 }
636
637 =head2 debugobj
638
639 set a debug object across all storages
640
641 =cut
642
643 sub debugobj {
644   my $self = shift @_;
645   if(@_) {
646     foreach my $source ($self->all_storages) {
647       $source->debugobj(@_);
648     }   
649   }
650   return $self->master->debugobj;
651 }
652
653 =head2 debugfh
654
655 set a debugfh object across all storages
656
657 =cut
658
659 sub debugfh {
660   my $self = shift @_;
661   if(@_) {
662     foreach my $source ($self->all_storages) {
663       $source->debugfh(@_);
664     }   
665   }
666   return $self->master->debugfh;
667 }
668
669 =head2 debugcb
670
671 set a debug callback across all storages
672
673 =cut
674
675 sub debugcb {
676   my $self = shift @_;
677   if(@_) {
678     foreach my $source ($self->all_storages) {
679       $source->debugcb(@_);
680     }   
681   }
682   return $self->master->debugcb;
683 }
684
685 =head2 disconnect
686
687 disconnect everything
688
689 =cut
690
691 sub disconnect {
692   my $self = shift @_;
693   foreach my $source ($self->all_storages) {
694     $source->disconnect(@_);
695   }
696 }
697
698 =head1 GOTCHAS
699
700 Due to the fact that replicants can lag behind a master, you must take care to
701 make sure you use one of the methods to force read queries to a master should
702 you need realtime data integrity.  For example, if you insert a row, and then
703 immediately re-read it from the database (say, by doing $row->discard_changes)
704 or you insert a row and then immediately build a query that expects that row
705 to be an item, you should force the master to handle reads.  Otherwise, due to
706 the lag, there is no certainty your data will be in the expected state.
707
708 For data integrity, all transactions automatically use the master storage for
709 all read and write queries.  Using a transaction is the preferred and recommended
710 method to force the master to handle all read queries.
711
712 Otherwise, you can force a single query to use the master with the 'force_pool'
713 attribute:
714
715   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
716
717 This attribute will safely be ignore by non replicated storages, so you can use
718 the same code for both types of systems.
719
720 Lastly, you can use the L</execute_reliably> method, which works very much like
721 a transaction.
722
723 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
724 and L</set_balanced_storage>, however this operates at a global level and is not
725 suitable if you have a shared Schema object being used by multiple processes,
726 such as on a web application server.  You can get around this limitation by
727 using the Schema clone method.
728
729   my $new_schema = $schema->clone;
730   $new_schema->set_reliable_storage;
731   
732   ## $new_schema will use only the Master storage for all reads/writes while
733   ## the $schema object will use replicated storage.
734
735 =head1 AUTHOR
736
737   John Napiorkowski <john.napiorkowski@takkle.com>
738
739 Based on code originated by:
740
741   Norbert Csongrádi <bert@cpan.org>
742   Peter Siklósi <einon@einon.hu>
743
744 =head1 LICENSE
745
746 You may distribute this code under the same terms as Perl itself.
747
748 =cut
749
750 __PACKAGE__->meta->make_immutable;
751
752 1;