Replicated - fixup types and namespace::clean
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use DBIx::Class::Storage::DBI;
29 use DBIx::Class::Storage::DBI::Replicated::Pool;
30 use DBIx::Class::Storage::DBI::Replicated::Balancer;
31 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
32 use MooseX::Types::Moose qw/ClassName HashRef Object/;
33
34 use namespace::clean -except => 'meta';
35
36 =head1 NAME
37
38 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
39
40 =head1 SYNOPSIS
41
42 The Following example shows how to change an existing $schema to a replicated
43 storage type, add some replicated (readonly) databases, and perform reporting
44 tasks.
45
46   ## Change storage_type in your schema class
47   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
48   
49   ## Add some slaves.  Basically this is an array of arrayrefs, where each
50   ## arrayref is database connect information
51   
52   $schema->storage->connect_replicants(
53     [$dsn1, $user, $pass, \%opts],
54     [$dsn2, $user, $pass, \%opts],
55     [$dsn3, $user, $pass, \%opts],
56   );
57   
58   ## Now, just use the $schema as normal
59   $schema->resultset('Source')->search({name=>'etc'});
60   
61   ## You can force a given query to use a particular storage using the search
62   ### attribute 'force_pool'.  For example:
63   
64   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
65   
66   ## Now $RS will force everything (both reads and writes) to use whatever was
67   ## setup as the master storage.  'master' is hardcoded to always point to the
68   ## Master, but you can also use any Replicant name.  Please see:
69   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
70   ## More. Also see transactions and L</execute_reliably> for alternative ways
71   ## to force read traffic to the master.
72   
73 =head1 DESCRIPTION
74
75 Warning: This class is marked BETA.  This has been running a production
76 website using MySQL native replication as its backend and we have some decent
77 test coverage but the code hasn't yet been stressed by a variety of databases.
78 Individual DB's may have quirks we are not aware of.  Please use this in first
79 development and pass along your experiences/bug fixes.
80
81 This class implements replicated data store for DBI. Currently you can define
82 one master and numerous slave database connections. All write-type queries
83 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
84 database, all read-type queries (SELECTs) go to the slave database.
85
86 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
87 handle gets delegated to one of the two attributes: L</read_handler> or to
88 L</write_handler>.  Additionally, some methods need to be distributed
89 to all existing storages.  This way our storage class is a drop in replacement
90 for L<DBIx::Class::Storage::DBI>.
91
92 Read traffic is spread across the replicants (slaves) occuring to a user
93 selected algorithm.  The default algorithm is random weighted.
94
95 =head1 NOTES
96
97 The consistancy betweeen master and replicants is database specific.  The Pool
98 gives you a method to validate it's replicants, removing and replacing them
99 when they fail/pass predefined criteria.  Please make careful use of the ways
100 to force a query to run against Master when needed.
101
102 =head1 REQUIREMENTS
103
104 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
105
106   Moose => 0.77
107   MooseX::AttributeHelpers => 0.12 
108   MooseX::Types => 0.10
109   namespace::clean => 0.11
110   
111 You will need to install these modules manually via CPAN or make them part of the
112 Makefile for your distribution.
113
114 =head1 ATTRIBUTES
115
116 This class defines the following attributes.
117
118 =head2 schema
119
120 The underlying L<DBIx::Class::Schema> object this storage is attaching
121
122 =cut
123
124 has 'schema' => (
125     is=>'rw',
126     isa=>'DBIx::Class::Schema',
127     weak_ref=>1,
128     required=>1,
129 );
130
131 =head2 pool_type
132
133 Contains the classname which will instantiate the L</pool> object.  Defaults 
134 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
135
136 =cut
137
138 has 'pool_type' => (
139   is=>'ro',
140   isa=>ClassName,
141   required=>1,
142   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
143   handles=>{
144     'create_pool' => 'new',
145   },
146 );
147
148 =head2 pool_args
149
150 Contains a hashref of initialized information to pass to the Balancer object.
151 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
152
153 =cut
154
155 has 'pool_args' => (
156   is=>'ro',
157   isa=>HashRef,
158   lazy=>1,
159   required=>1,
160   default=>sub { {} },
161 );
162
163
164 =head2 balancer_type
165
166 The replication pool requires a balance class to provider the methods for
167 choose how to spread the query load across each replicant in the pool.
168
169 =cut
170
171 has 'balancer_type' => (
172   is=>'ro',
173   isa=>BalancerClassNamePart,
174   coerce=>1,
175   required=>1,
176   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
177   handles=>{
178     'create_balancer' => 'new',
179   },
180 );
181
182 =head2 balancer_args
183
184 Contains a hashref of initialized information to pass to the Balancer object.
185 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
186
187 =cut
188
189 has 'balancer_args' => (
190   is=>'ro',
191   isa=>HashRef,
192   lazy=>1,
193   required=>1,
194   default=>sub { {} },
195 );
196
197 =head2 pool
198
199 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
200 container class for one or more replicated databases.
201
202 =cut
203
204 has 'pool' => (
205   is=>'ro',
206   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207   lazy_build=>1,
208   handles=>[qw/
209     connect_replicants    
210     replicants
211     has_replicants
212   /],
213 );
214
215 =head2 balancer
216
217 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
218 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
219
220 =cut
221
222 has 'balancer' => (
223   is=>'ro',
224   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225   lazy_build=>1,
226   handles=>[qw/auto_validate_every/],
227 );
228
229 =head2 master
230
231 The master defines the canonical state for a pool of connected databases.  All
232 the replicants are expected to match this databases state.  Thus, in a classic
233 Master / Slaves distributed system, all the slaves are expected to replicate
234 the Master's state as quick as possible.  This is the only database in the
235 pool of databases that is allowed to handle write traffic.
236
237 =cut
238
239 has 'master' => (
240   is=> 'ro',
241   isa=>'DBIx::Class::Storage::DBI',
242   lazy_build=>1,
243 );
244
245 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
246
247 The following methods are delegated all the methods required for the 
248 L<DBIx::Class::Storage::DBI> interface.
249
250 =head2 read_handler
251
252 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
253
254 =cut
255
256 has 'read_handler' => (
257   is=>'rw',
258   isa=>Object,
259   lazy_build=>1,
260   handles=>[qw/
261     select
262     select_single
263     columns_info_for
264   /],    
265 );
266
267 =head2 write_handler
268
269 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
270
271 =cut
272
273 has 'write_handler' => (
274   is=>'ro',
275   isa=>Object,
276   lazy_build=>1,
277   lazy_build=>1,
278   handles=>[qw/   
279     on_connect_do
280     on_disconnect_do       
281     connect_info
282     throw_exception
283     sql_maker
284     sqlt_type
285     create_ddl_dir
286     deployment_statements
287     datetime_parser
288     datetime_parser_type        
289     last_insert_id
290     insert
291     insert_bulk
292     update
293     delete
294     dbh
295     txn_begin
296     txn_do
297     txn_commit
298     txn_rollback
299     txn_scope_guard
300     sth
301     deploy
302     with_deferred_fk_checks
303
304     reload_row
305     _prep_for_execute
306     
307   /],
308 );
309
310 =head1 METHODS
311
312 This class defines the following methods.
313
314 =head2 BUILDARGS
315
316 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
317 first argument.  So we need to massage the arguments a bit so that all the
318 bits get put into the correct places.
319
320 =cut
321
322 sub BUILDARGS {
323   my ($class, $schema, $storage_type_args, @args) = @_; 
324   
325   return {
326         schema=>$schema, 
327         %$storage_type_args,
328         @args
329   }
330 }
331
332 =head2 _build_master
333
334 Lazy builder for the L</master> attribute.
335
336 =cut
337
338 sub _build_master {
339   my $self = shift @_;
340   DBIx::Class::Storage::DBI->new($self->schema);
341 }
342
343 =head2 _build_pool
344
345 Lazy builder for the L</pool> attribute.
346
347 =cut
348
349 sub _build_pool {
350   my $self = shift @_;
351   $self->create_pool(%{$self->pool_args});
352 }
353
354 =head2 _build_balancer
355
356 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
357 the balancer knows which pool it's balancing.
358
359 =cut
360
361 sub _build_balancer {
362   my $self = shift @_;
363   $self->create_balancer(
364     pool=>$self->pool, 
365     master=>$self->master,
366     %{$self->balancer_args},
367   );
368 }
369
370 =head2 _build_write_handler
371
372 Lazy builder for the L</write_handler> attribute.  The default is to set this to
373 the L</master>.
374
375 =cut
376
377 sub _build_write_handler {
378   return shift->master;
379 }
380
381 =head2 _build_read_handler
382
383 Lazy builder for the L</read_handler> attribute.  The default is to set this to
384 the L</balancer>.
385
386 =cut
387
388 sub _build_read_handler {
389   return shift->balancer;
390 }
391
392 =head2 around: connect_replicants
393
394 All calls to connect_replicants needs to have an existing $schema tacked onto
395 top of the args, since L<DBIx::Storage::DBI> needs it.
396
397 =cut
398
399 around 'connect_replicants' => sub {
400   my ($method, $self, @args) = @_;
401   $self->$method($self->schema, @args);
402 };
403
404 =head2 all_storages
405
406 Returns an array of of all the connected storage backends.  The first element
407 in the returned array is the master, and the remainings are each of the
408 replicants.
409
410 =cut
411
412 sub all_storages {
413   my $self = shift @_;
414   return grep {defined $_ && blessed $_} (
415      $self->master,
416      values %{ $self->replicants },
417   );
418 }
419
420 =head2 execute_reliably ($coderef, ?@args)
421
422 Given a coderef, saves the current state of the L</read_handler>, forces it to
423 use reliable storage (ie sets it to the master), executes a coderef and then
424 restores the original state.
425
426 Example:
427
428   my $reliably = sub {
429     my $name = shift @_;
430     $schema->resultset('User')->create({name=>$name});
431     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
432     return $user_rs;
433   };
434
435   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
436
437 Use this when you must be certain of your database state, such as when you just
438 inserted something and need to get a resultset including it, etc.
439
440 =cut
441
442 sub execute_reliably {
443   my ($self, $coderef, @args) = @_;
444   
445   unless( ref $coderef eq 'CODE') {
446     $self->throw_exception('Second argument must be a coderef');
447   }
448   
449   ##Get copy of master storage
450   my $master = $self->master;
451   
452   ##Get whatever the current read hander is
453   my $current = $self->read_handler;
454   
455   ##Set the read handler to master
456   $self->read_handler($master);
457   
458   ## do whatever the caller needs
459   my @result;
460   my $want_array = wantarray;
461   
462   eval {
463     if($want_array) {
464       @result = $coderef->(@args);
465     } elsif(defined $want_array) {
466       ($result[0]) = ($coderef->(@args));
467     } else {
468       $coderef->(@args);
469     }       
470   };
471   
472   ##Reset to the original state
473   $self->read_handler($current); 
474   
475   ##Exception testing has to come last, otherwise you might leave the 
476   ##read_handler set to master.
477   
478   if($@) {
479     $self->throw_exception("coderef returned an error: $@");
480   } else {
481     return $want_array ? @result : $result[0];
482   }
483 }
484
485 =head2 set_reliable_storage
486
487 Sets the current $schema to be 'reliable', that is all queries, both read and
488 write are sent to the master
489   
490 =cut
491
492 sub set_reliable_storage {
493   my $self = shift @_;
494   my $schema = $self->schema;
495   my $write_handler = $self->schema->storage->write_handler;
496   
497   $schema->storage->read_handler($write_handler);
498 }
499
500 =head2 set_balanced_storage
501
502 Sets the current $schema to be use the </balancer> for all reads, while all
503 writea are sent to the master only
504   
505 =cut
506
507 sub set_balanced_storage {
508   my $self = shift @_;
509   my $schema = $self->schema;
510   my $write_handler = $self->schema->storage->balancer;
511   
512   $schema->storage->read_handler($write_handler);
513 }
514
515 =head2 around: txn_do ($coderef)
516
517 Overload to the txn_do method, which is delegated to whatever the
518 L<write_handler> is set to.  We overload this in order to wrap in inside a
519 L</execute_reliably> method.
520
521 =cut
522
523 around 'txn_do' => sub {
524   my($txn_do, $self, $coderef, @args) = @_;
525   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
526 };
527
528 =head2 connected
529
530 Check that the master and at least one of the replicants is connected.
531
532 =cut
533
534 sub connected {
535   my $self = shift @_;
536   return
537     $self->master->connected &&
538     $self->pool->connected_replicants;
539 }
540
541 =head2 ensure_connected
542
543 Make sure all the storages are connected.
544
545 =cut
546
547 sub ensure_connected {
548   my $self = shift @_;
549   foreach my $source ($self->all_storages) {
550     $source->ensure_connected(@_);
551   }
552 }
553
554 =head2 limit_dialect
555
556 Set the limit_dialect for all existing storages
557
558 =cut
559
560 sub limit_dialect {
561   my $self = shift @_;
562   foreach my $source ($self->all_storages) {
563     $source->limit_dialect(@_);
564   }
565   return $self->master->quote_char;
566 }
567
568 =head2 quote_char
569
570 Set the quote_char for all existing storages
571
572 =cut
573
574 sub quote_char {
575   my $self = shift @_;
576   foreach my $source ($self->all_storages) {
577     $source->quote_char(@_);
578   }
579   return $self->master->quote_char;
580 }
581
582 =head2 name_sep
583
584 Set the name_sep for all existing storages
585
586 =cut
587
588 sub name_sep {
589   my $self = shift @_;
590   foreach my $source ($self->all_storages) {
591     $source->name_sep(@_);
592   }
593   return $self->master->name_sep;
594 }
595
596 =head2 set_schema
597
598 Set the schema object for all existing storages
599
600 =cut
601
602 sub set_schema {
603   my $self = shift @_;
604   foreach my $source ($self->all_storages) {
605     $source->set_schema(@_);
606   }
607 }
608
609 =head2 debug
610
611 set a debug flag across all storages
612
613 =cut
614
615 sub debug {
616   my $self = shift @_;
617   if(@_) {
618     foreach my $source ($self->all_storages) {
619       $source->debug(@_);
620     }   
621   }
622   return $self->master->debug;
623 }
624
625 =head2 debugobj
626
627 set a debug object across all storages
628
629 =cut
630
631 sub debugobj {
632   my $self = shift @_;
633   if(@_) {
634     foreach my $source ($self->all_storages) {
635       $source->debugobj(@_);
636     }   
637   }
638   return $self->master->debugobj;
639 }
640
641 =head2 debugfh
642
643 set a debugfh object across all storages
644
645 =cut
646
647 sub debugfh {
648   my $self = shift @_;
649   if(@_) {
650     foreach my $source ($self->all_storages) {
651       $source->debugfh(@_);
652     }   
653   }
654   return $self->master->debugfh;
655 }
656
657 =head2 debugcb
658
659 set a debug callback across all storages
660
661 =cut
662
663 sub debugcb {
664   my $self = shift @_;
665   if(@_) {
666     foreach my $source ($self->all_storages) {
667       $source->debugcb(@_);
668     }   
669   }
670   return $self->master->debugcb;
671 }
672
673 =head2 disconnect
674
675 disconnect everything
676
677 =cut
678
679 sub disconnect {
680   my $self = shift @_;
681   foreach my $source ($self->all_storages) {
682     $source->disconnect(@_);
683   }
684 }
685
686 =head1 GOTCHAS
687
688 Due to the fact that replicants can lag behind a master, you must take care to
689 make sure you use one of the methods to force read queries to a master should
690 you need realtime data integrity.  For example, if you insert a row, and then
691 immediately re-read it from the database (say, by doing $row->discard_changes)
692 or you insert a row and then immediately build a query that expects that row
693 to be an item, you should force the master to handle reads.  Otherwise, due to
694 the lag, there is no certainty your data will be in the expected state.
695
696 For data integrity, all transactions automatically use the master storage for
697 all read and write queries.  Using a transaction is the preferred and recommended
698 method to force the master to handle all read queries.
699
700 Otherwise, you can force a single query to use the master with the 'force_pool'
701 attribute:
702
703   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
704
705 This attribute will safely be ignore by non replicated storages, so you can use
706 the same code for both types of systems.
707
708 Lastly, you can use the L</execute_reliably> method, which works very much like
709 a transaction.
710
711 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
712 and L</set_balanced_storage>, however this operates at a global level and is not
713 suitable if you have a shared Schema object being used by multiple processes,
714 such as on a web application server.  You can get around this limitation by
715 using the Schema clone method.
716
717   my $new_schema = $schema->clone;
718   $new_schema->set_reliable_storage;
719   
720   ## $new_schema will use only the Master storage for all reads/writes while
721   ## the $schema object will use replicated storage.
722
723 =head1 AUTHOR
724
725   John Napiorkowski <john.napiorkowski@takkle.com>
726
727 Based on code originated by:
728
729   Norbert Csongrádi <bert@cpan.org>
730   Peter Siklósi <einon@einon.hu>
731
732 =head1 LICENSE
733
734 You may distribute this code under the same terms as Perl itself.
735
736 =cut
737
738 __PACKAGE__->meta->make_immutable;
739
740 1;