::Replicated - test hashref for connect_replicants and croak on coderef, switch to...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 BEGIN {
4   use Carp::Clan qw/^DBIx::Class/;
5         
6   ## Modules required for Replication support not required for general DBIC
7   ## use, so we explicitly test for these.
8         
9   my %replication_required = (
10     Moose => '0.77',
11     MooseX::AttributeHelpers => '0.12',
12     MooseX::Types => '0.10',
13     namespace::clean => '0.11',
14   );
15         
16   my @didnt_load;
17   
18   for my $module (keys %replication_required) {
19         eval "use $module $replication_required{$module}";
20         push @didnt_load, "$module $replication_required{$module}"
21          if $@;
22   }
23         
24   croak("@{[ join ', ', @didnt_load ]} are missing and are required for Replication")
25     if @didnt_load;     
26 }
27
28 use DBIx::Class::Storage::DBI;
29 use DBIx::Class::Storage::DBI::Replicated::Pool;
30 use DBIx::Class::Storage::DBI::Replicated::Balancer;
31 use DBIx::Class::Storage::DBI::Replicated::Types 'BalancerClassNamePart';
32
33 use namespace::clean -except => 'meta';
34
35 =head1 NAME
36
37 DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
38
39 =head1 SYNOPSIS
40
41 The Following example shows how to change an existing $schema to a replicated
42 storage type, add some replicated (readonly) databases, and perform reporting
43 tasks.
44
45   ## Change storage_type in your schema class
46   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
47   
48   ## Add some slaves.  Basically this is an array of arrayrefs, where each
49   ## arrayref is database connect information
50   
51   $schema->storage->connect_replicants(
52     [$dsn1, $user, $pass, \%opts],
53     [$dsn2, $user, $pass, \%opts],
54     [$dsn3, $user, $pass, \%opts],
55   );
56   
57   ## Now, just use the $schema as normal
58   $schema->resultset('Source')->search({name=>'etc'});
59   
60   ## You can force a given query to use a particular storage using the search
61   ### attribute 'force_pool'.  For example:
62   
63   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
64   
65   ## Now $RS will force everything (both reads and writes) to use whatever was
66   ## setup as the master storage.  'master' is hardcoded to always point to the
67   ## Master, but you can also use any Replicant name.  Please see:
68   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
69   ## More. Also see transactions and L</execute_reliably> for alternative ways
70   ## to force read traffic to the master.
71   
72 =head1 DESCRIPTION
73
74 Warning: This class is marked BETA.  This has been running a production
75 website using MySQL native replication as its backend and we have some decent
76 test coverage but the code hasn't yet been stressed by a variety of databases.
77 Individual DB's may have quirks we are not aware of.  Please use this in first
78 development and pass along your experiences/bug fixes.
79
80 This class implements replicated data store for DBI. Currently you can define
81 one master and numerous slave database connections. All write-type queries
82 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
83 database, all read-type queries (SELECTs) go to the slave database.
84
85 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
86 handle gets delegated to one of the two attributes: L</read_handler> or to
87 L</write_handler>.  Additionally, some methods need to be distributed
88 to all existing storages.  This way our storage class is a drop in replacement
89 for L<DBIx::Class::Storage::DBI>.
90
91 Read traffic is spread across the replicants (slaves) occuring to a user
92 selected algorithm.  The default algorithm is random weighted.
93
94 =head1 NOTES
95
96 The consistancy betweeen master and replicants is database specific.  The Pool
97 gives you a method to validate it's replicants, removing and replacing them
98 when they fail/pass predefined criteria.  Please make careful use of the ways
99 to force a query to run against Master when needed.
100
101 =head1 REQUIREMENTS
102
103 Replicated Storage has additional requirements not currently part of L<DBIx::Class>
104
105   Moose => 0.77
106   MooseX::AttributeHelpers => 0.12 
107   MooseX::Types => 0.10
108   namespace::clean => 0.11
109   
110 You will need to install these modules manually via CPAN or make them part of the
111 Makefile for your distribution.
112
113 =head1 ATTRIBUTES
114
115 This class defines the following attributes.
116
117 =head2 schema
118
119 The underlying L<DBIx::Class::Schema> object this storage is attaching
120
121 =cut
122
123 has 'schema' => (
124     is=>'rw',
125     isa=>'DBIx::Class::Schema',
126     weak_ref=>1,
127     required=>1,
128 );
129
130 =head2 pool_type
131
132 Contains the classname which will instantiate the L</pool> object.  Defaults 
133 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
134
135 =cut
136
137 has 'pool_type' => (
138   is=>'ro',
139   isa=>'ClassName',
140   required=>1,
141   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
142   handles=>{
143     'create_pool' => 'new',
144   },
145 );
146
147 =head2 pool_args
148
149 Contains a hashref of initialized information to pass to the Balancer object.
150 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
151
152 =cut
153
154 has 'pool_args' => (
155   is=>'ro',
156   isa=>'HashRef',
157   lazy=>1,
158   required=>1,
159   default=>sub { {} },
160 );
161
162
163 =head2 balancer_type
164
165 The replication pool requires a balance class to provider the methods for
166 choose how to spread the query load across each replicant in the pool.
167
168 =cut
169
170 has 'balancer_type' => (
171   is=>'ro',
172   isa=>BalancerClassNamePart,
173   coerce=>1,
174   required=>1,
175   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
176   handles=>{
177     'create_balancer' => 'new',
178   },
179 );
180
181 =head2 balancer_args
182
183 Contains a hashref of initialized information to pass to the Balancer object.
184 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
185
186 =cut
187
188 has 'balancer_args' => (
189   is=>'ro',
190   isa=>'HashRef',
191   lazy=>1,
192   required=>1,
193   default=>sub { {} },
194 );
195
196 =head2 pool
197
198 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
199 container class for one or more replicated databases.
200
201 =cut
202
203 has 'pool' => (
204   is=>'ro',
205   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
206   lazy_build=>1,
207   handles=>[qw/
208     connect_replicants    
209     replicants
210     has_replicants
211   /],
212 );
213
214 =head2 balancer
215
216 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
217 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
218
219 =cut
220
221 has 'balancer' => (
222   is=>'ro',
223   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
224   lazy_build=>1,
225   handles=>[qw/auto_validate_every/],
226 );
227
228 =head2 master
229
230 The master defines the canonical state for a pool of connected databases.  All
231 the replicants are expected to match this databases state.  Thus, in a classic
232 Master / Slaves distributed system, all the slaves are expected to replicate
233 the Master's state as quick as possible.  This is the only database in the
234 pool of databases that is allowed to handle write traffic.
235
236 =cut
237
238 has 'master' => (
239   is=> 'ro',
240   isa=>'DBIx::Class::Storage::DBI',
241   lazy_build=>1,
242 );
243
244 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
245
246 The following methods are delegated all the methods required for the 
247 L<DBIx::Class::Storage::DBI> interface.
248
249 =head2 read_handler
250
251 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
252
253 =cut
254
255 has 'read_handler' => (
256   is=>'rw',
257   isa=>'Object',
258   lazy_build=>1,
259   handles=>[qw/
260     select
261     select_single
262     columns_info_for
263   /],    
264 );
265
266 =head2 write_handler
267
268 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
269
270 =cut
271
272 has 'write_handler' => (
273   is=>'ro',
274   isa=>'Object',
275   lazy_build=>1,
276   lazy_build=>1,
277   handles=>[qw/   
278     on_connect_do
279     on_disconnect_do       
280     connect_info
281     throw_exception
282     sql_maker
283     sqlt_type
284     create_ddl_dir
285     deployment_statements
286     datetime_parser
287     datetime_parser_type        
288     last_insert_id
289     insert
290     insert_bulk
291     update
292     delete
293     dbh
294     txn_begin
295     txn_do
296     txn_commit
297     txn_rollback
298     txn_scope_guard
299     sth
300     deploy
301     with_deferred_fk_checks
302
303     reload_row
304     _prep_for_execute
305     
306   /],
307 );
308
309 =head1 METHODS
310
311 This class defines the following methods.
312
313 =head2 BUILDARGS
314
315 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
316 first argument.  So we need to massage the arguments a bit so that all the
317 bits get put into the correct places.
318
319 =cut
320
321 sub BUILDARGS {
322   my ($class, $schema, $storage_type_args, @args) = @_; 
323   
324   return {
325         schema=>$schema, 
326         %$storage_type_args,
327         @args
328   }
329 }
330
331 =head2 _build_master
332
333 Lazy builder for the L</master> attribute.
334
335 =cut
336
337 sub _build_master {
338   my $self = shift @_;
339   DBIx::Class::Storage::DBI->new($self->schema);
340 }
341
342 =head2 _build_pool
343
344 Lazy builder for the L</pool> attribute.
345
346 =cut
347
348 sub _build_pool {
349   my $self = shift @_;
350   $self->create_pool(%{$self->pool_args});
351 }
352
353 =head2 _build_balancer
354
355 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
356 the balancer knows which pool it's balancing.
357
358 =cut
359
360 sub _build_balancer {
361   my $self = shift @_;
362   $self->create_balancer(
363     pool=>$self->pool, 
364     master=>$self->master,
365     %{$self->balancer_args},
366   );
367 }
368
369 =head2 _build_write_handler
370
371 Lazy builder for the L</write_handler> attribute.  The default is to set this to
372 the L</master>.
373
374 =cut
375
376 sub _build_write_handler {
377   return shift->master;
378 }
379
380 =head2 _build_read_handler
381
382 Lazy builder for the L</read_handler> attribute.  The default is to set this to
383 the L</balancer>.
384
385 =cut
386
387 sub _build_read_handler {
388   return shift->balancer;
389 }
390
391 =head2 around: connect_replicants
392
393 All calls to connect_replicants needs to have an existing $schema tacked onto
394 top of the args, since L<DBIx::Storage::DBI> needs it.
395
396 =cut
397
398 around 'connect_replicants' => sub {
399   my ($method, $self, @args) = @_;
400   $self->$method($self->schema, @args);
401 };
402
403 =head2 all_storages
404
405 Returns an array of of all the connected storage backends.  The first element
406 in the returned array is the master, and the remainings are each of the
407 replicants.
408
409 =cut
410
411 sub all_storages {
412   my $self = shift @_;
413   return grep {defined $_ && blessed $_} (
414      $self->master,
415      $self->replicants,
416   );
417 }
418
419 =head2 execute_reliably ($coderef, ?@args)
420
421 Given a coderef, saves the current state of the L</read_handler>, forces it to
422 use reliable storage (ie sets it to the master), executes a coderef and then
423 restores the original state.
424
425 Example:
426
427   my $reliably = sub {
428     my $name = shift @_;
429     $schema->resultset('User')->create({name=>$name});
430     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
431     return $user_rs;
432   };
433
434   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
435
436 Use this when you must be certain of your database state, such as when you just
437 inserted something and need to get a resultset including it, etc.
438
439 =cut
440
441 sub execute_reliably {
442   my ($self, $coderef, @args) = @_;
443   
444   unless( ref $coderef eq 'CODE') {
445     $self->throw_exception('Second argument must be a coderef');
446   }
447   
448   ##Get copy of master storage
449   my $master = $self->master;
450   
451   ##Get whatever the current read hander is
452   my $current = $self->read_handler;
453   
454   ##Set the read handler to master
455   $self->read_handler($master);
456   
457   ## do whatever the caller needs
458   my @result;
459   my $want_array = wantarray;
460   
461   eval {
462     if($want_array) {
463       @result = $coderef->(@args);
464     } elsif(defined $want_array) {
465       ($result[0]) = ($coderef->(@args));
466     } else {
467       $coderef->(@args);
468     }       
469   };
470   
471   ##Reset to the original state
472   $self->read_handler($current); 
473   
474   ##Exception testing has to come last, otherwise you might leave the 
475   ##read_handler set to master.
476   
477   if($@) {
478     $self->throw_exception("coderef returned an error: $@");
479   } else {
480     return $want_array ? @result : $result[0];
481   }
482 }
483
484 =head2 set_reliable_storage
485
486 Sets the current $schema to be 'reliable', that is all queries, both read and
487 write are sent to the master
488   
489 =cut
490
491 sub set_reliable_storage {
492   my $self = shift @_;
493   my $schema = $self->schema;
494   my $write_handler = $self->schema->storage->write_handler;
495   
496   $schema->storage->read_handler($write_handler);
497 }
498
499 =head2 set_balanced_storage
500
501 Sets the current $schema to be use the </balancer> for all reads, while all
502 writea are sent to the master only
503   
504 =cut
505
506 sub set_balanced_storage {
507   my $self = shift @_;
508   my $schema = $self->schema;
509   my $write_handler = $self->schema->storage->balancer;
510   
511   $schema->storage->read_handler($write_handler);
512 }
513
514 =head2 around: txn_do ($coderef)
515
516 Overload to the txn_do method, which is delegated to whatever the
517 L<write_handler> is set to.  We overload this in order to wrap in inside a
518 L</execute_reliably> method.
519
520 =cut
521
522 around 'txn_do' => sub {
523   my($txn_do, $self, $coderef, @args) = @_;
524   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
525 };
526
527 =head2 connected
528
529 Check that the master and at least one of the replicants is connected.
530
531 =cut
532
533 sub connected {
534   my $self = shift @_;
535   return
536     $self->master->connected &&
537     $self->pool->connected_replicants;
538 }
539
540 =head2 ensure_connected
541
542 Make sure all the storages are connected.
543
544 =cut
545
546 sub ensure_connected {
547   my $self = shift @_;
548   foreach my $source ($self->all_storages) {
549     $source->ensure_connected(@_);
550   }
551 }
552
553 =head2 limit_dialect
554
555 Set the limit_dialect for all existing storages
556
557 =cut
558
559 sub limit_dialect {
560   my $self = shift @_;
561   foreach my $source ($self->all_storages) {
562     $source->limit_dialect(@_);
563   }
564   return $self->master->quote_char;
565 }
566
567 =head2 quote_char
568
569 Set the quote_char for all existing storages
570
571 =cut
572
573 sub quote_char {
574   my $self = shift @_;
575   foreach my $source ($self->all_storages) {
576     $source->quote_char(@_);
577   }
578   return $self->master->quote_char;
579 }
580
581 =head2 name_sep
582
583 Set the name_sep for all existing storages
584
585 =cut
586
587 sub name_sep {
588   my $self = shift @_;
589   foreach my $source ($self->all_storages) {
590     $source->name_sep(@_);
591   }
592   return $self->master->name_sep;
593 }
594
595 =head2 set_schema
596
597 Set the schema object for all existing storages
598
599 =cut
600
601 sub set_schema {
602   my $self = shift @_;
603   foreach my $source ($self->all_storages) {
604     $source->set_schema(@_);
605   }
606 }
607
608 =head2 debug
609
610 set a debug flag across all storages
611
612 =cut
613
614 sub debug {
615   my $self = shift @_;
616   if(@_) {
617     foreach my $source ($self->all_storages) {
618       $source->debug(@_);
619     }   
620   }
621   return $self->master->debug;
622 }
623
624 =head2 debugobj
625
626 set a debug object across all storages
627
628 =cut
629
630 sub debugobj {
631   my $self = shift @_;
632   if(@_) {
633     foreach my $source ($self->all_storages) {
634       $source->debugobj(@_);
635     }   
636   }
637   return $self->master->debugobj;
638 }
639
640 =head2 debugfh
641
642 set a debugfh object across all storages
643
644 =cut
645
646 sub debugfh {
647   my $self = shift @_;
648   if(@_) {
649     foreach my $source ($self->all_storages) {
650       $source->debugfh(@_);
651     }   
652   }
653   return $self->master->debugfh;
654 }
655
656 =head2 debugcb
657
658 set a debug callback across all storages
659
660 =cut
661
662 sub debugcb {
663   my $self = shift @_;
664   if(@_) {
665     foreach my $source ($self->all_storages) {
666       $source->debugcb(@_);
667     }   
668   }
669   return $self->master->debugcb;
670 }
671
672 =head2 disconnect
673
674 disconnect everything
675
676 =cut
677
678 sub disconnect {
679   my $self = shift @_;
680   foreach my $source ($self->all_storages) {
681     $source->disconnect(@_);
682   }
683 }
684
685 =head1 GOTCHAS
686
687 Due to the fact that replicants can lag behind a master, you must take care to
688 make sure you use one of the methods to force read queries to a master should
689 you need realtime data integrity.  For example, if you insert a row, and then
690 immediately re-read it from the database (say, by doing $row->discard_changes)
691 or you insert a row and then immediately build a query that expects that row
692 to be an item, you should force the master to handle reads.  Otherwise, due to
693 the lag, there is no certainty your data will be in the expected state.
694
695 For data integrity, all transactions automatically use the master storage for
696 all read and write queries.  Using a transaction is the preferred and recommended
697 method to force the master to handle all read queries.
698
699 Otherwise, you can force a single query to use the master with the 'force_pool'
700 attribute:
701
702   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
703
704 This attribute will safely be ignore by non replicated storages, so you can use
705 the same code for both types of systems.
706
707 Lastly, you can use the L</execute_reliably> method, which works very much like
708 a transaction.
709
710 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
711 and L</set_balanced_storage>, however this operates at a global level and is not
712 suitable if you have a shared Schema object being used by multiple processes,
713 such as on a web application server.  You can get around this limitation by
714 using the Schema clone method.
715
716   my $new_schema = $schema->clone;
717   $new_schema->set_reliable_storage;
718   
719   ## $new_schema will use only the Master storage for all reads/writes while
720   ## the $schema object will use replicated storage.
721
722 =head1 AUTHOR
723
724   John Napiorkowski <john.napiorkowski@takkle.com>
725
726 Based on code originated by:
727
728   Norbert Csongrádi <bert@cpan.org>
729   Peter Siklósi <einon@einon.hu>
730
731 =head1 LICENSE
732
733 You may distribute this code under the same terms as Perl itself.
734
735 =cut
736
737 __PACKAGE__->meta->make_immutable;
738
739 1;