use BUILDARGS intead of wrapping new, added make_immutable, removed unnneeded test...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use Class::MOP;
5 use Moose::Util::TypeConstraints;
6 use DBIx::Class::Storage::DBI;
7 use DBIx::Class::Storage::DBI::Replicated::Pool;
8 use DBIx::Class::Storage::DBI::Replicated::Balancer;
9
10 =head1 NAME
11
12 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14 =head1 SYNOPSIS
15
16 The Following example shows how to change an existing $schema to a replicated
17 storage type, add some replicated (readonly) databases, and perform reporting
18 tasks.
19
20   ## Change storage_type in your schema class
21   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22   
23   ## Add some slaves.  Basically this is an array of arrayrefs, where each
24   ## arrayref is database connect information
25   
26   $schema->storage->connect_replicants(
27     [$dsn1, $user, $pass, \%opts],
28     [$dsn2, $user, $pass, \%opts],
29     [$dsn3, $user, $pass, \%opts],
30   );
31   
32   ## Now, just use the $schema as normal
33   $schema->resultset('Source')->search({name=>'etc'});
34   
35   ## You can force a given query to use a particular storage using the search
36   ### attribute 'force_pool'.  For example:
37   
38   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
39   
40   ## Now $RS will force everything (both reads and writes) to use whatever was
41   ## setup as the master storage.  'master' is hardcoded to always point to the
42   ## Master, but you can also use any Replicant name.  Please see:
43   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
44   ## More. Also see transactions and L</execute_reliably> for alternative ways
45   ## to force read traffic to the master.
46   
47 =head1 DESCRIPTION
48
49 Warning: This class is marked BETA.  This has been running a production
50 website using MySQL native replication as it's backend and we have some decent
51 test coverage but the code hasn't yet been stressed by a variety of databases.
52 Individual DB's may have quirks we are not aware of.  Please use this in first
53 development and pass along your experiences/bug fixes.
54
55 This class implements replicated data store for DBI. Currently you can define
56 one master and numerous slave database connections. All write-type queries
57 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
58 database, all read-type queries (SELECTs) go to the slave database.
59
60 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
61 handle gets delegated to one of the two attributes: L</read_handler> or to
62 L</write_handler>.  Additionally, some methods need to be distributed
63 to all existing storages.  This way our storage class is a drop in replacement
64 for L<DBIx::Class::Storage::DBI>.
65
66 Read traffic is spread across the replicants (slaves) occuring to a user
67 selected algorithm.  The default algorithm is random weighted.
68
69 =head1 NOTES
70
71 The consistancy betweeen master and replicants is database specific.  The Pool
72 gives you a method to validate it's replicants, removing and replacing them
73 when they fail/pass predefined criteria.  Please make careful use of the ways
74 to force a query to run against Master when needed.  
75
76 =head1 ATTRIBUTES
77
78 This class defines the following attributes.
79
80 =head2 schema
81
82 The underlying L<DBIx::Class::Schema> object this storage is attaching
83
84 =cut
85
86 has 'schema' => (
87     is=>'rw',
88     isa=>'DBIx::Class::Schema',
89     weak_ref=>1,
90     required=>1,
91 );
92
93 =head2 pool_type
94
95 Contains the classname which will instantiate the L</pool> object.  Defaults 
96 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
97
98 =cut
99
100 has 'pool_type' => (
101   is=>'ro',
102   isa=>'ClassName',
103   required=>1,
104   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
105   handles=>{
106     'create_pool' => 'new',
107   },
108 );
109
110 =head2 pool_args
111
112 Contains a hashref of initialized information to pass to the Balancer object.
113 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
114
115 =cut
116
117 has 'pool_args' => (
118   is=>'ro',
119   isa=>'HashRef',
120   lazy=>1,
121   required=>1,
122   default=>sub { {} },
123 );
124
125
126 =head2 balancer_type
127
128 The replication pool requires a balance class to provider the methods for
129 choose how to spread the query load across each replicant in the pool.
130
131 =cut
132
133 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
134   as 'ClassName';
135     
136 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
137   from 'Str',
138   via {
139         my $type = $_;
140     if($type=~m/^::/) {
141       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
142     }  
143     Class::MOP::load_class($type);  
144     $type;      
145   };
146
147 has 'balancer_type' => (
148   is=>'ro',
149   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
150   coerce=>1,
151   required=>1,
152   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
153   handles=>{
154     'create_balancer' => 'new',
155   },
156 );
157
158 =head2 balancer_args
159
160 Contains a hashref of initialized information to pass to the Balancer object.
161 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
162
163 =cut
164
165 has 'balancer_args' => (
166   is=>'ro',
167   isa=>'HashRef',
168   lazy=>1,
169   required=>1,
170   default=>sub { {} },
171 );
172
173 =head2 pool
174
175 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
176 container class for one or more replicated databases.
177
178 =cut
179
180 has 'pool' => (
181   is=>'ro',
182   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
183   lazy_build=>1,
184   handles=>[qw/
185     connect_replicants    
186     replicants
187     has_replicants
188   /],
189 );
190
191 =head2 balancer
192
193 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
194 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
195
196 =cut
197
198 has 'balancer' => (
199   is=>'ro',
200   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
201   lazy_build=>1,
202   handles=>[qw/auto_validate_every/],
203 );
204
205 =head2 master
206
207 The master defines the canonical state for a pool of connected databases.  All
208 the replicants are expected to match this databases state.  Thus, in a classic
209 Master / Slaves distributed system, all the slaves are expected to replicate
210 the Master's state as quick as possible.  This is the only database in the
211 pool of databases that is allowed to handle write traffic.
212
213 =cut
214
215 has 'master' => (
216   is=> 'ro',
217   isa=>'DBIx::Class::Storage::DBI',
218   lazy_build=>1,
219 );
220
221 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
222
223 The following methods are delegated all the methods required for the 
224 L<DBIx::Class::Storage::DBI> interface.
225
226 =head2 read_handler
227
228 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
229
230 =cut
231
232 has 'read_handler' => (
233   is=>'rw',
234   isa=>'Object',
235   lazy_build=>1,
236   handles=>[qw/
237     select
238     select_single
239     columns_info_for
240   /],    
241 );
242
243 =head2 write_handler
244
245 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
246
247 =cut
248
249 has 'write_handler' => (
250   is=>'ro',
251   isa=>'Object',
252   lazy_build=>1,
253   lazy_build=>1,
254   handles=>[qw/   
255     on_connect_do
256     on_disconnect_do       
257     connect_info
258     throw_exception
259     sql_maker
260     sqlt_type
261     create_ddl_dir
262     deployment_statements
263     datetime_parser
264     datetime_parser_type        
265     last_insert_id
266     insert
267     insert_bulk
268     update
269     delete
270     dbh
271     txn_begin
272     txn_do
273     txn_commit
274     txn_rollback
275     txn_scope_guard
276     sth
277     deploy
278
279     reload_row
280     _prep_for_execute
281     configure_sqlt
282     
283   /],
284 );
285
286 =head1 METHODS
287
288 This class defines the following methods.
289
290 =head2 BUILDARGS
291
292 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
293 first argument.  So we need to massage the arguments a bit so that all the
294 bits get put into the correct places.
295
296 =cut
297
298 sub BUILDARGS {
299   my ($class, $schema, $storage_type_args, @args) = @_; 
300   
301   return {
302         schema=>$schema, 
303         %$storage_type_args,
304         @args
305   }
306 }
307
308 =head2 _build_master
309
310 Lazy builder for the L</master> attribute.
311
312 =cut
313
314 sub _build_master {
315   my $self = shift @_;
316   DBIx::Class::Storage::DBI->new($self->schema);
317 }
318
319 =head2 _build_pool
320
321 Lazy builder for the L</pool> attribute.
322
323 =cut
324
325 sub _build_pool {
326   my $self = shift @_;
327   $self->create_pool(%{$self->pool_args});
328 }
329
330 =head2 _build_balancer
331
332 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
333 the balancer knows which pool it's balancing.
334
335 =cut
336
337 sub _build_balancer {
338   my $self = shift @_;
339   $self->create_balancer(
340     pool=>$self->pool, 
341     master=>$self->master,
342     %{$self->balancer_args},
343   );
344 }
345
346 =head2 _build_write_handler
347
348 Lazy builder for the L</write_handler> attribute.  The default is to set this to
349 the L</master>.
350
351 =cut
352
353 sub _build_write_handler {
354   return shift->master;
355 }
356
357 =head2 _build_read_handler
358
359 Lazy builder for the L</read_handler> attribute.  The default is to set this to
360 the L</balancer>.
361
362 =cut
363
364 sub _build_read_handler {
365   return shift->balancer;
366 }
367
368 =head2 around: connect_replicants
369
370 All calls to connect_replicants needs to have an existing $schema tacked onto
371 top of the args, since L<DBIx::Storage::DBI> needs it.
372
373 =cut
374
375 around 'connect_replicants' => sub {
376   my ($method, $self, @args) = @_;
377   $self->$method($self->schema, @args);
378 };
379
380 =head2 all_storages
381
382 Returns an array of of all the connected storage backends.  The first element
383 in the returned array is the master, and the remainings are each of the
384 replicants.
385
386 =cut
387
388 sub all_storages {
389   my $self = shift @_;
390   return grep {defined $_ && blessed $_} (
391      $self->master,
392      $self->replicants,
393   );
394 }
395
396 =head2 execute_reliably ($coderef, ?@args)
397
398 Given a coderef, saves the current state of the L</read_handler>, forces it to
399 use reliable storage (ie sets it to the master), executes a coderef and then
400 restores the original state.
401
402 Example:
403
404   my $reliably = sub {
405     my $name = shift @_;
406     $schema->resultset('User')->create({name=>$name});
407     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
408     return $user_rs;
409   };
410
411   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
412
413 Use this when you must be certain of your database state, such as when you just
414 inserted something and need to get a resultset including it, etc.
415
416 =cut
417
418 use Benchmark;
419
420 sub execute_reliably {
421   my ($self, $coderef, @args) = @_;
422   
423   unless( ref $coderef eq 'CODE') {
424     $self->throw_exception('Second argument must be a coderef');
425   }
426   
427     my $t0 = new Benchmark;
428     my $clone = $self->clone;
429     my $t1 = new Benchmark;
430     my $td = timediff($t1, $t0);
431     warn "----------------- the code took:",timestr($td),"\n";
432       
433
434   
435   ##Get copy of master storage
436   my $master = $self->master;
437   
438   ##Get whatever the current read hander is
439   my $current = $self->read_handler;
440   
441   ##Set the read handler to master
442   $self->read_handler($master);
443   
444   ## do whatever the caller needs
445   my @result;
446   my $want_array = wantarray;
447   
448   eval {
449     if($want_array) {
450       @result = $coderef->(@args);
451     } elsif(defined $want_array) {
452       ($result[0]) = ($coderef->(@args));
453     } else {
454       $coderef->(@args);
455     }       
456   };
457   
458   ##Reset to the original state
459   $self->read_handler($current); 
460   
461   ##Exception testing has to come last, otherwise you might leave the 
462   ##read_handler set to master.
463   
464   if($@) {
465     $self->throw_exception("coderef returned an error: $@");
466   } else {
467     return $want_array ? @result : $result[0];
468   }
469 }
470
471 =head2 set_reliable_storage
472
473 Sets the current $schema to be 'reliable', that is all queries, both read and
474 write are sent to the master
475   
476 =cut
477
478 sub set_reliable_storage {
479   my $self = shift @_;
480   my $schema = $self->schema;
481   my $write_handler = $self->schema->storage->write_handler;
482   
483   $schema->storage->read_handler($write_handler);
484 }
485
486 =head2 set_balanced_storage
487
488 Sets the current $schema to be use the </balancer> for all reads, while all
489 writea are sent to the master only
490   
491 =cut
492
493 sub set_balanced_storage {
494   my $self = shift @_;
495   my $schema = $self->schema;
496   my $write_handler = $self->schema->storage->balancer;
497   
498   $schema->storage->read_handler($write_handler);
499 }
500
501 =head2 around: txn_do ($coderef)
502
503 Overload to the txn_do method, which is delegated to whatever the
504 L<write_handler> is set to.  We overload this in order to wrap in inside a
505 L</execute_reliably> method.
506
507 =cut
508
509 around 'txn_do' => sub {
510   my($txn_do, $self, $coderef, @args) = @_;
511   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
512 };
513
514 =head2 connected
515
516 Check that the master and at least one of the replicants is connected.
517
518 =cut
519
520 sub connected {
521   my $self = shift @_;
522   return
523     $self->master->connected &&
524     $self->pool->connected_replicants;
525 }
526
527 =head2 ensure_connected
528
529 Make sure all the storages are connected.
530
531 =cut
532
533 sub ensure_connected {
534   my $self = shift @_;
535   foreach my $source ($self->all_storages) {
536     $source->ensure_connected(@_);
537   }
538 }
539
540 =head2 limit_dialect
541
542 Set the limit_dialect for all existing storages
543
544 =cut
545
546 sub limit_dialect {
547   my $self = shift @_;
548   foreach my $source ($self->all_storages) {
549     $source->limit_dialect(@_);
550   }
551 }
552
553 =head2 quote_char
554
555 Set the quote_char for all existing storages
556
557 =cut
558
559 sub quote_char {
560   my $self = shift @_;
561   foreach my $source ($self->all_storages) {
562     $source->quote_char(@_);
563   }
564 }
565
566 =head2 name_sep
567
568 Set the name_sep for all existing storages
569
570 =cut
571
572 sub name_sep {
573   my $self = shift @_;
574   foreach my $source ($self->all_storages) {
575     $source->name_sep(@_);
576   }
577 }
578
579 =head2 set_schema
580
581 Set the schema object for all existing storages
582
583 =cut
584
585 sub set_schema {
586   my $self = shift @_;
587   foreach my $source ($self->all_storages) {
588     $source->set_schema(@_);
589   }
590 }
591
592 =head2 debug
593
594 set a debug flag across all storages
595
596 =cut
597
598 sub debug {
599   my $self = shift @_;
600   foreach my $source ($self->all_storages) {
601     $source->debug(@_);
602   }
603 }
604
605 =head2 debugobj
606
607 set a debug object across all storages
608
609 =cut
610
611 sub debugobj {
612   my $self = shift @_;
613   foreach my $source ($self->all_storages) {
614     $source->debugobj(@_);
615   }
616 }
617
618 =head2 debugfh
619
620 set a debugfh object across all storages
621
622 =cut
623
624 sub debugfh {
625   my $self = shift @_;
626   foreach my $source ($self->all_storages) {
627     $source->debugfh(@_);
628   }
629 }
630
631 =head2 debugcb
632
633 set a debug callback across all storages
634
635 =cut
636
637 sub debugcb {
638   my $self = shift @_;
639   foreach my $source ($self->all_storages) {
640     $source->debugcb(@_);
641   }
642 }
643
644 =head2 disconnect
645
646 disconnect everything
647
648 =cut
649
650 sub disconnect {
651   my $self = shift @_;
652   foreach my $source ($self->all_storages) {
653     $source->disconnect(@_);
654   }
655 }
656
657 =head1 GOTCHAS
658
659 Due to the fact that replicants can lag behind a master, you must take care to
660 make sure you use one of the methods to force read queries to a master should
661 you need realtime data integrity.  For example, if you insert a row, and then
662 immediately re-read it from the database (say, by doing $row->discard_changes)
663 or you insert a row and then immediately build a query that expects that row
664 to be an item, you should force the master to handle reads.  Otherwise, due to
665 the lag, there is no certainty your data will be in the expected state.
666
667 For data integrity, all transactions automatically use the master storage for
668 all read and write queries.  Using a transaction is the preferred and recommended
669 method to force the master to handle all read queries.
670
671 Otherwise, you can force a single query to use the master with the 'force_pool'
672 attribute:
673
674   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
675
676 This attribute will safely be ignore by non replicated storages, so you can use
677 the same code for both types of systems.
678
679 Lastly, you can use the L</execute_reliably> method, which works very much like
680 a transaction.
681
682 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
683 and L</set_balanced_storage>, however this operates at a global level and is not
684 suitable if you have a shared Schema object being used by multiple processes,
685 such as on a web application server.  You can get around this limitation by
686 using the Schema clone method.
687
688   my $new_schema = $schema->clone;
689   $new_schema->set_reliable_storage;
690   
691   ## $new_schema will use only the Master storage for all reads/writes while
692   ## the $schema object will use replicated storage.
693
694 =head1 AUTHOR
695
696   John Napiorkowski <john.napiorkowski@takkle.com>
697
698 Based on code originated by:
699
700   Norbert Csongrádi <bert@cpan.org>
701   Peter Siklósi <einon@einon.hu>
702
703 =head1 LICENSE
704
705 You may distribute this code under the same terms as Perl itself.
706
707 =cut
708
709 __PACKAGE__->meta->make_immutable;
710
711 1;