use BUILDARGS intead of wrapping new, added make_immutable, removed unnneeded test...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use Class::MOP;
5 use Moose::Util::TypeConstraints;
6 use DBIx::Class::Storage::DBI;
7 use DBIx::Class::Storage::DBI::Replicated::Pool;
8 use DBIx::Class::Storage::DBI::Replicated::Balancer;
9
10 =head1 NAME
11
12 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14 =head1 SYNOPSIS
15
16 The Following example shows how to change an existing $schema to a replicated
17 storage type, add some replicated (readonly) databases, and perform reporting
18 tasks.
19
20   ## Change storage_type in your schema class
21   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22   
23   ## Add some slaves.  Basically this is an array of arrayrefs, where each
24   ## arrayref is database connect information
25   
26   $schema->storage->connect_replicants(
27     [$dsn1, $user, $pass, \%opts],
28     [$dsn2, $user, $pass, \%opts],
29     [$dsn3, $user, $pass, \%opts],
30   );
31   
32   ## Now, just use the $schema as normal
33   $schema->resultset('Source')->search({name=>'etc'});
34   
35   ## You can force a given query to use a particular storage using the search
36   ### attribute 'force_pool'.  For example:
37   
38   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
39   
40   ## Now $RS will force everything (both reads and writes) to use whatever was
41   ## setup as the master storage.  'master' is hardcoded to always point to the
42   ## Master, but you can also use any Replicant name.  Please see:
43   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
44   ## More. Also see transactions and L</execute_reliably> for alternative ways
45   ## to force read traffic to the master.
46   
47 =head1 DESCRIPTION
48
49 Warning: This class is marked BETA.  This has been running a production
50 website using MySQL native replication as it's backend and we have some decent
51 test coverage but the code hasn't yet been stressed by a variety of databases.
52 Individual DB's may have quirks we are not aware of.  Please use this in first
53 development and pass along your experiences/bug fixes.
54
55 This class implements replicated data store for DBI. Currently you can define
56 one master and numerous slave database connections. All write-type queries
57 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
58 database, all read-type queries (SELECTs) go to the slave database.
59
60 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
61 handle gets delegated to one of the two attributes: L</read_handler> or to
62 L</write_handler>.  Additionally, some methods need to be distributed
63 to all existing storages.  This way our storage class is a drop in replacement
64 for L<DBIx::Class::Storage::DBI>.
65
66 Read traffic is spread across the replicants (slaves) occuring to a user
67 selected algorithm.  The default algorithm is random weighted.
68
69 =head1 NOTES
70
71 The consistancy betweeen master and replicants is database specific.  The Pool
72 gives you a method to validate it's replicants, removing and replacing them
73 when they fail/pass predefined criteria.  Please make careful use of the ways
74 to force a query to run against Master when needed.  
75
76 =head1 ATTRIBUTES
77
78 This class defines the following attributes.
79
80 =head2 schema
81
82 The underlying L<DBIx::Class::Schema> object this storage is attaching
83
84 =cut
85
86 has 'schema' => (
87     is=>'rw',
88     isa=>'DBIx::Class::Schema',
89     weak_ref=>1,
90     required=>1,
91 );
92
93 =head2 pool_type
94
95 Contains the classname which will instantiate the L</pool> object.  Defaults 
96 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
97
98 =cut
99
100 has 'pool_type' => (
101   is=>'ro',
102   isa=>'ClassName',
103   required=>1,
104   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
105   handles=>{
106     'create_pool' => 'new',
107   },
108 );
109
110 =head2 pool_args
111
112 Contains a hashref of initialized information to pass to the Balancer object.
113 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
114
115 =cut
116
117 has 'pool_args' => (
118   is=>'ro',
119   isa=>'HashRef',
120   lazy=>1,
121   required=>1,
122   default=>sub { {} },
123 );
124
125
126 =head2 balancer_type
127
128 The replication pool requires a balance class to provider the methods for
129 choose how to spread the query load across each replicant in the pool.
130
131 =cut
132
133 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
134   as 'ClassName';
135     
136 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
137   from 'Str',
138   via {
139         my $type = $_;
140     if($type=~m/^::/) {
141       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
142     }  
143     Class::MOP::load_class($type);  
144     $type;      
145   };
146
147 has 'balancer_type' => (
148   is=>'ro',
149   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
150   coerce=>1,
151   required=>1,
152   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
153   handles=>{
154     'create_balancer' => 'new',
155   },
156 );
157
158 =head2 balancer_args
159
160 Contains a hashref of initialized information to pass to the Balancer object.
161 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
162
163 =cut
164
165 has 'balancer_args' => (
166   is=>'ro',
167   isa=>'HashRef',
168   lazy=>1,
169   required=>1,
170   default=>sub { {} },
171 );
172
173 =head2 pool
174
175 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
176 container class for one or more replicated databases.
177
178 =cut
179
180 has 'pool' => (
181   is=>'ro',
182   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
183   lazy_build=>1,
184   handles=>[qw/
185     connect_replicants    
186     replicants
187     has_replicants
188   /],
189 );
190
191 =head2 balancer
192
193 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
194 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
195
196 =cut
197
198 has 'balancer' => (
199   is=>'ro',
200   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
201   lazy_build=>1,
202   handles=>[qw/auto_validate_every/],
203 );
204
205 =head2 master
206
207 The master defines the canonical state for a pool of connected databases.  All
208 the replicants are expected to match this databases state.  Thus, in a classic
209 Master / Slaves distributed system, all the slaves are expected to replicate
210 the Master's state as quick as possible.  This is the only database in the
211 pool of databases that is allowed to handle write traffic.
212
213 =cut
214
215 has 'master' => (
216   is=> 'ro',
217   isa=>'DBIx::Class::Storage::DBI',
218   lazy_build=>1,
219 );
220
221 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
222
223 The following methods are delegated all the methods required for the 
224 L<DBIx::Class::Storage::DBI> interface.
225
226 =head2 read_handler
227
228 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
229
230 =cut
231
232 has 'read_handler' => (
233   is=>'rw',
234   isa=>'Object',
235   lazy_build=>1,
236   handles=>[qw/
237     select
238     select_single
239     columns_info_for
240   /],    
241 );
242
243 =head2 write_handler
244
245 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
246
247 =cut
248
249 has 'write_handler' => (
250   is=>'ro',
251   isa=>'Object',
252   lazy_build=>1,
253   lazy_build=>1,
254   handles=>[qw/   
255     on_connect_do
256     on_disconnect_do       
257     connect_info
258     throw_exception
259     sql_maker
260     sqlt_type
261     create_ddl_dir
262     deployment_statements
263     datetime_parser
264     datetime_parser_type        
265     last_insert_id
266     insert
267     insert_bulk
268     update
269     delete
270     dbh
271     txn_begin
272     txn_do
273     txn_commit
274     txn_rollback
275     txn_scope_guard
276     sth
277     deploy
278
279     reload_row
280     _prep_for_execute
281     configure_sqlt
282     
283   /],
284 );
285
286 =head1 METHODS
287
288 This class defines the following methods.
289
290 =head2 BUILDARGS
291
292 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
293 first argument.  So we need to massage the arguments a bit so that all the
294 bits get put into the correct places.
295
296 =cut
297
298 sub BUILDARGS {
299   my ($class, $schema, $storage_type_args, @args) = @_; 
300   
301   return {
302         schema=>$schema, 
303         %$storage_type_args,
304         @args
305   }
306 }
307
308 =head2 _build_master
309
310 Lazy builder for the L</master> attribute.
311
312 =cut
313
314 sub _build_master {
315   my $self = shift @_;
316   DBIx::Class::Storage::DBI->new($self->schema);
317 }
318
319 =head2 _build_pool
320
321 Lazy builder for the L</pool> attribute.
322
323 =cut
324
325 sub _build_pool {
326   my $self = shift @_;
327   $self->create_pool(%{$self->pool_args});
328 }
329
330 =head2 _build_balancer
331
332 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
333 the balancer knows which pool it's balancing.
334
335 =cut
336
337 sub _build_balancer {
338   my $self = shift @_;
339   $self->create_balancer(
340     pool=>$self->pool, 
341     master=>$self->master,
342     %{$self->balancer_args},
343   );
344 }
345
346 =head2 _build_write_handler
347
348 Lazy builder for the L</write_handler> attribute.  The default is to set this to
349 the L</master>.
350
351 =cut
352
353 sub _build_write_handler {
354   return shift->master;
355 }
356
357 =head2 _build_read_handler
358
359 Lazy builder for the L</read_handler> attribute.  The default is to set this to
360 the L</balancer>.
361
362 =cut
363
364 sub _build_read_handler {
365   return shift->balancer;
366 }
367
368 =head2 around: connect_replicants
369
370 All calls to connect_replicants needs to have an existing $schema tacked onto
371 top of the args, since L<DBIx::Storage::DBI> needs it.
372
373 =cut
374
375 around 'connect_replicants' => sub {
376   my ($method, $self, @args) = @_;
377   $self->$method($self->schema, @args);
378 };
379
380 =head2 all_storages
381
382 Returns an array of of all the connected storage backends.  The first element
383 in the returned array is the master, and the remainings are each of the
384 replicants.
385
386 =cut
387
388 sub all_storages {
389   my $self = shift @_;
390   return grep {defined $_ && blessed $_} (
391      $self->master,
392      $self->replicants,
393   );
394 }
395
396 =head2 execute_reliably ($coderef, ?@args)
397
398 Given a coderef, saves the current state of the L</read_handler>, forces it to
399 use reliable storage (ie sets it to the master), executes a coderef and then
400 restores the original state.
401
402 Example:
403
404   my $reliably = sub {
405     my $name = shift @_;
406     $schema->resultset('User')->create({name=>$name});
407     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
408     return $user_rs;
409   };
410
411   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
412
413 Use this when you must be certain of your database state, such as when you just
414 inserted something and need to get a resultset including it, etc.
415
416 =cut
417
418 sub execute_reliably {
419   my ($self, $coderef, @args) = @_;
420   
421   unless( ref $coderef eq 'CODE') {
422     $self->throw_exception('Second argument must be a coderef');
423   }
424   
425   ##Get copy of master storage
426   my $master = $self->master;
427   
428   ##Get whatever the current read hander is
429   my $current = $self->read_handler;
430   
431   ##Set the read handler to master
432   $self->read_handler($master);
433   
434   ## do whatever the caller needs
435   my @result;
436   my $want_array = wantarray;
437   
438   eval {
439     if($want_array) {
440       @result = $coderef->(@args);
441     } elsif(defined $want_array) {
442       ($result[0]) = ($coderef->(@args));
443     } else {
444       $coderef->(@args);
445     }       
446   };
447   
448   ##Reset to the original state
449   $self->read_handler($current); 
450   
451   ##Exception testing has to come last, otherwise you might leave the 
452   ##read_handler set to master.
453   
454   if($@) {
455     $self->throw_exception("coderef returned an error: $@");
456   } else {
457     return $want_array ? @result : $result[0];
458   }
459 }
460
461 =head2 set_reliable_storage
462
463 Sets the current $schema to be 'reliable', that is all queries, both read and
464 write are sent to the master
465   
466 =cut
467
468 sub set_reliable_storage {
469   my $self = shift @_;
470   my $schema = $self->schema;
471   my $write_handler = $self->schema->storage->write_handler;
472   
473   $schema->storage->read_handler($write_handler);
474 }
475
476 =head2 set_balanced_storage
477
478 Sets the current $schema to be use the </balancer> for all reads, while all
479 writea are sent to the master only
480   
481 =cut
482
483 sub set_balanced_storage {
484   my $self = shift @_;
485   my $schema = $self->schema;
486   my $write_handler = $self->schema->storage->balancer;
487   
488   $schema->storage->read_handler($write_handler);
489 }
490
491 =head2 around: txn_do ($coderef)
492
493 Overload to the txn_do method, which is delegated to whatever the
494 L<write_handler> is set to.  We overload this in order to wrap in inside a
495 L</execute_reliably> method.
496
497 =cut
498
499 around 'txn_do' => sub {
500   my($txn_do, $self, $coderef, @args) = @_;
501   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
502 };
503
504 =head2 connected
505
506 Check that the master and at least one of the replicants is connected.
507
508 =cut
509
510 sub connected {
511   my $self = shift @_;
512   return
513     $self->master->connected &&
514     $self->pool->connected_replicants;
515 }
516
517 =head2 ensure_connected
518
519 Make sure all the storages are connected.
520
521 =cut
522
523 sub ensure_connected {
524   my $self = shift @_;
525   foreach my $source ($self->all_storages) {
526     $source->ensure_connected(@_);
527   }
528 }
529
530 =head2 limit_dialect
531
532 Set the limit_dialect for all existing storages
533
534 =cut
535
536 sub limit_dialect {
537   my $self = shift @_;
538   foreach my $source ($self->all_storages) {
539     $source->limit_dialect(@_);
540   }
541 }
542
543 =head2 quote_char
544
545 Set the quote_char for all existing storages
546
547 =cut
548
549 sub quote_char {
550   my $self = shift @_;
551   foreach my $source ($self->all_storages) {
552     $source->quote_char(@_);
553   }
554 }
555
556 =head2 name_sep
557
558 Set the name_sep for all existing storages
559
560 =cut
561
562 sub name_sep {
563   my $self = shift @_;
564   foreach my $source ($self->all_storages) {
565     $source->name_sep(@_);
566   }
567 }
568
569 =head2 set_schema
570
571 Set the schema object for all existing storages
572
573 =cut
574
575 sub set_schema {
576   my $self = shift @_;
577   foreach my $source ($self->all_storages) {
578     $source->set_schema(@_);
579   }
580 }
581
582 =head2 debug
583
584 set a debug flag across all storages
585
586 =cut
587
588 sub debug {
589   my $self = shift @_;
590   foreach my $source ($self->all_storages) {
591     $source->debug(@_);
592   }
593 }
594
595 =head2 debugobj
596
597 set a debug object across all storages
598
599 =cut
600
601 sub debugobj {
602   my $self = shift @_;
603   foreach my $source ($self->all_storages) {
604     $source->debugobj(@_);
605   }
606 }
607
608 =head2 debugfh
609
610 set a debugfh object across all storages
611
612 =cut
613
614 sub debugfh {
615   my $self = shift @_;
616   foreach my $source ($self->all_storages) {
617     $source->debugfh(@_);
618   }
619 }
620
621 =head2 debugcb
622
623 set a debug callback across all storages
624
625 =cut
626
627 sub debugcb {
628   my $self = shift @_;
629   foreach my $source ($self->all_storages) {
630     $source->debugcb(@_);
631   }
632 }
633
634 =head2 disconnect
635
636 disconnect everything
637
638 =cut
639
640 sub disconnect {
641   my $self = shift @_;
642   foreach my $source ($self->all_storages) {
643     $source->disconnect(@_);
644   }
645 }
646
647 =head1 GOTCHAS
648
649 Due to the fact that replicants can lag behind a master, you must take care to
650 make sure you use one of the methods to force read queries to a master should
651 you need realtime data integrity.  For example, if you insert a row, and then
652 immediately re-read it from the database (say, by doing $row->discard_changes)
653 or you insert a row and then immediately build a query that expects that row
654 to be an item, you should force the master to handle reads.  Otherwise, due to
655 the lag, there is no certainty your data will be in the expected state.
656
657 For data integrity, all transactions automatically use the master storage for
658 all read and write queries.  Using a transaction is the preferred and recommended
659 method to force the master to handle all read queries.
660
661 Otherwise, you can force a single query to use the master with the 'force_pool'
662 attribute:
663
664   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
665
666 This attribute will safely be ignore by non replicated storages, so you can use
667 the same code for both types of systems.
668
669 Lastly, you can use the L</execute_reliably> method, which works very much like
670 a transaction.
671
672 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
673 and L</set_balanced_storage>, however this operates at a global level and is not
674 suitable if you have a shared Schema object being used by multiple processes,
675 such as on a web application server.  You can get around this limitation by
676 using the Schema clone method.
677
678   my $new_schema = $schema->clone;
679   $new_schema->set_reliable_storage;
680   
681   ## $new_schema will use only the Master storage for all reads/writes while
682   ## the $schema object will use replicated storage.
683
684 =head1 AUTHOR
685
686   John Napiorkowski <john.napiorkowski@takkle.com>
687
688 Based on code originated by:
689
690   Norbert Csongrádi <bert@cpan.org>
691   Peter Siklósi <einon@einon.hu>
692
693 =head1 LICENSE
694
695 You may distribute this code under the same terms as Perl itself.
696
697 =cut
698
699 __PACKAGE__->meta->make_immutable;
700
701 1;