updated documentation, adding some hints and details, changed the way we can use...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use Class::MOP;
5 use Moose::Util::TypeConstraints;
6 use DBIx::Class::Storage::DBI;
7 use DBIx::Class::Storage::DBI::Replicated::Pool;
8 use DBIx::Class::Storage::DBI::Replicated::Balancer;
9
10 =head1 NAME
11
12 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14 =head1 SYNOPSIS
15
16 The Following example shows how to change an existing $schema to a replicated
17 storage type, add some replicated (readonly) databases, and perform reporting
18 tasks.
19
20   ## Change storage_type in your schema class
21   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22   
23   ## Add some slaves.  Basically this is an array of arrayrefs, where each
24   ## arrayref is database connect information
25   
26   $schema->storage->connect_replicants(
27     [$dsn1, $user, $pass, \%opts],
28     [$dsn2, $user, $pass, \%opts],
29     [$dsn3, $user, $pass, \%opts],
30   );
31   
32   ## Now, just use the $schema as normal
33   $schema->resultset('Source')->search({name=>'etc'});
34   
35   ## You can force a given query to use a particular storage using the search
36   ### attribute 'force_pool'.  For example:
37   
38   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
39   
40   ## Now $RS will force everything (both reads and writes) to use whatever was
41   ## setup as the master storage.  'master' is hardcoded to always point to the
42   ## Master, but you can also use any Replicant name.  Please see:
43   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
44   ## More. Also see transactions and L</execute_reliably> for alternative ways
45   ## to force read traffic to the master.
46   
47 =head1 DESCRIPTION
48
49 Warning: This class is marked BETA.  This has been running a production
50 website using MySQL native replication as it's backend and we have some decent
51 test coverage but the code hasn't yet been stressed by a variety of databases.
52 Individual DB's may have quirks we are not aware of.  Please use this in first
53 development and pass along your experiences/bug fixes.
54
55 This class implements replicated data store for DBI. Currently you can define
56 one master and numerous slave database connections. All write-type queries
57 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
58 database, all read-type queries (SELECTs) go to the slave database.
59
60 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
61 handle gets delegated to one of the two attributes: L</read_handler> or to
62 L</write_handler>.  Additionally, some methods need to be distributed
63 to all existing storages.  This way our storage class is a drop in replacement
64 for L<DBIx::Class::Storage::DBI>.
65
66 Read traffic is spread across the replicants (slaves) occuring to a user
67 selected algorithm.  The default algorithm is random weighted.
68
69 =head1 NOTES
70
71 The consistancy betweeen master and replicants is database specific.  The Pool
72 gives you a method to validate it's replicants, removing and replacing them
73 when they fail/pass predefined criteria.  Please make careful use of the ways
74 to force a query to run against Master when needed.  
75
76 =head1 ATTRIBUTES
77
78 This class defines the following attributes.
79
80 =head2 schema
81
82 The underlying L<DBIx::Class::Schema> object this storage is attaching
83
84 =cut
85
86 has 'schema' => (
87     is=>'rw',
88     isa=>'DBIx::Class::Schema',
89     weak_ref=>1,
90     required=>1,
91 );
92
93 =head2 pool_type
94
95 Contains the classname which will instantiate the L</pool> object.  Defaults 
96 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
97
98 =cut
99
100 has 'pool_type' => (
101   is=>'ro',
102   isa=>'ClassName',
103   required=>1,
104   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
105   handles=>{
106     'create_pool' => 'new',
107   },
108 );
109
110 =head2 pool_args
111
112 Contains a hashref of initialized information to pass to the Balancer object.
113 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
114
115 =cut
116
117 has 'pool_args' => (
118   is=>'ro',
119   isa=>'HashRef',
120   lazy=>1,
121   required=>1,
122   default=>sub { {} },
123 );
124
125
126 =head2 balancer_type
127
128 The replication pool requires a balance class to provider the methods for
129 choose how to spread the query load across each replicant in the pool.
130
131 =cut
132
133 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
134   as 'ClassName';
135     
136 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
137   from 'Str',
138   via {
139         my $type = $_;
140     if($type=~m/^::/) {
141       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
142     }  
143     Class::MOP::load_class($type);  
144     $type;      
145   };
146
147 has 'balancer_type' => (
148   is=>'ro',
149   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
150   coerce=>1,
151   required=>1,
152   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
153   handles=>{
154     'create_balancer' => 'new',
155   },
156 );
157
158 =head2 balancer_args
159
160 Contains a hashref of initialized information to pass to the Balancer object.
161 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
162
163 =cut
164
165 has 'balancer_args' => (
166   is=>'ro',
167   isa=>'HashRef',
168   lazy=>1,
169   required=>1,
170   default=>sub { {} },
171 );
172
173 =head2 pool
174
175 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
176 container class for one or more replicated databases.
177
178 =cut
179
180 has 'pool' => (
181   is=>'ro',
182   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
183   lazy_build=>1,
184   handles=>[qw/
185     connect_replicants    
186     replicants
187     has_replicants
188   /],
189 );
190
191 =head2 balancer
192
193 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
194 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
195
196 =cut
197
198 has 'balancer' => (
199   is=>'ro',
200   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
201   lazy_build=>1,
202   handles=>[qw/auto_validate_every/],
203 );
204
205 =head2 master
206
207 The master defines the canonical state for a pool of connected databases.  All
208 the replicants are expected to match this databases state.  Thus, in a classic
209 Master / Slaves distributed system, all the slaves are expected to replicate
210 the Master's state as quick as possible.  This is the only database in the
211 pool of databases that is allowed to handle write traffic.
212
213 =cut
214
215 has 'master' => (
216   is=> 'ro',
217   isa=>'DBIx::Class::Storage::DBI',
218   lazy_build=>1,
219 );
220
221 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
222
223 The following methods are delegated all the methods required for the 
224 L<DBIx::Class::Storage::DBI> interface.
225
226 =head2 read_handler
227
228 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
229
230 =cut
231
232 has 'read_handler' => (
233   is=>'rw',
234   isa=>'Object',
235   lazy_build=>1,
236   handles=>[qw/
237     select
238     select_single
239     columns_info_for
240   /],    
241 );
242
243 =head2 write_handler
244
245 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
246
247 =cut
248
249 has 'write_handler' => (
250   is=>'ro',
251   isa=>'Object',
252   lazy_build=>1,
253   lazy_build=>1,
254   handles=>[qw/   
255     on_connect_do
256     on_disconnect_do       
257     connect_info
258     throw_exception
259     sql_maker
260     sqlt_type
261     create_ddl_dir
262     deployment_statements
263     datetime_parser
264     datetime_parser_type        
265     last_insert_id
266     insert
267     insert_bulk
268     update
269     delete
270     dbh
271     txn_begin
272     txn_do
273     txn_commit
274     txn_rollback
275     txn_scope_guard
276     sth
277     deploy
278
279     reload_row
280     _prep_for_execute
281     configure_sqlt
282     
283   /],
284 );
285
286 =head1 METHODS
287
288 This class defines the following methods.
289
290 =head2 new
291
292 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
293 first argument.  So we need to massage the arguments a bit so that all the
294 bits get put into the correct places.
295
296 =cut
297
298 around 'new' => sub {
299   my ($new, $self, $schema, $storage_type_args, @args) = @_;
300   return $self->$new(schema=>$schema, %$storage_type_args, @args);
301 };
302
303 =head2 _build_master
304
305 Lazy builder for the L</master> attribute.
306
307 =cut
308
309 sub _build_master {
310   my $self = shift @_;
311   DBIx::Class::Storage::DBI->new($self->schema);
312 }
313
314 =head2 _build_pool
315
316 Lazy builder for the L</pool> attribute.
317
318 =cut
319
320 sub _build_pool {
321   my $self = shift @_;
322   $self->create_pool(%{$self->pool_args});
323 }
324
325 =head2 _build_balancer
326
327 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
328 the balancer knows which pool it's balancing.
329
330 =cut
331
332 sub _build_balancer {
333   my $self = shift @_;
334   $self->create_balancer(
335     pool=>$self->pool, 
336     master=>$self->master,
337     %{$self->balancer_args},
338   );
339 }
340
341 =head2 _build_write_handler
342
343 Lazy builder for the L</write_handler> attribute.  The default is to set this to
344 the L</master>.
345
346 =cut
347
348 sub _build_write_handler {
349   return shift->master;
350 }
351
352 =head2 _build_read_handler
353
354 Lazy builder for the L</read_handler> attribute.  The default is to set this to
355 the L</balancer>.
356
357 =cut
358
359 sub _build_read_handler {
360   return shift->balancer;
361 }
362
363 =head2 around: connect_replicants
364
365 All calls to connect_replicants needs to have an existing $schema tacked onto
366 top of the args, since L<DBIx::Storage::DBI> needs it.
367
368 =cut
369
370 around 'connect_replicants' => sub {
371   my ($method, $self, @args) = @_;
372   $self->$method($self->schema, @args);
373 };
374
375 =head2 all_storages
376
377 Returns an array of of all the connected storage backends.  The first element
378 in the returned array is the master, and the remainings are each of the
379 replicants.
380
381 =cut
382
383 sub all_storages {
384   my $self = shift @_;
385   return grep {defined $_ && blessed $_} (
386      $self->master,
387      $self->replicants,
388   );
389 }
390
391 =head2 execute_reliably ($coderef, ?@args)
392
393 Given a coderef, saves the current state of the L</read_handler>, forces it to
394 use reliable storage (ie sets it to the master), executes a coderef and then
395 restores the original state.
396
397 Example:
398
399   my $reliably = sub {
400     my $name = shift @_;
401     $schema->resultset('User')->create({name=>$name});
402     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
403     return $user_rs;
404   };
405
406   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
407
408 Use this when you must be certain of your database state, such as when you just
409 inserted something and need to get a resultset including it, etc.
410
411 =cut
412
413 sub execute_reliably {
414   my ($self, $coderef, @args) = @_;
415   
416   unless( ref $coderef eq 'CODE') {
417     $self->throw_exception('Second argument must be a coderef');
418   }
419   
420   ##Get copy of master storage
421   my $master = $self->master;
422   
423   ##Get whatever the current read hander is
424   my $current = $self->read_handler;
425   
426   ##Set the read handler to master
427   $self->read_handler($master);
428   
429   ## do whatever the caller needs
430   my @result;
431   my $want_array = wantarray;
432   
433   eval {
434     if($want_array) {
435       @result = $coderef->(@args);
436     } elsif(defined $want_array) {
437       ($result[0]) = ($coderef->(@args));
438     } else {
439       $coderef->(@args);
440     }       
441   };
442   
443   ##Reset to the original state
444   $self->read_handler($current); 
445   
446   ##Exception testing has to come last, otherwise you might leave the 
447   ##read_handler set to master.
448   
449   if($@) {
450     $self->throw_exception("coderef returned an error: $@");
451   } else {
452     return $want_array ? @result : $result[0];
453   }
454 }
455
456 =head2 set_reliable_storage
457
458 Sets the current $schema to be 'reliable', that is all queries, both read and
459 write are sent to the master
460   
461 =cut
462
463 sub set_reliable_storage {
464   my $self = shift @_;
465   my $schema = $self->schema;
466   my $write_handler = $self->schema->storage->write_handler;
467   
468   $schema->storage->read_handler($write_handler);
469 }
470
471 =head2 set_balanced_storage
472
473 Sets the current $schema to be use the </balancer> for all reads, while all
474 writea are sent to the master only
475   
476 =cut
477
478 sub set_balanced_storage {
479   my $self = shift @_;
480   my $schema = $self->schema;
481   my $write_handler = $self->schema->storage->balancer;
482   
483   $schema->storage->read_handler($write_handler);
484 }
485
486 =head2 around: txn_do ($coderef)
487
488 Overload to the txn_do method, which is delegated to whatever the
489 L<write_handler> is set to.  We overload this in order to wrap in inside a
490 L</execute_reliably> method.
491
492 =cut
493
494 around 'txn_do' => sub {
495   my($txn_do, $self, $coderef, @args) = @_;
496   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
497 };
498
499 =head2 connected
500
501 Check that the master and at least one of the replicants is connected.
502
503 =cut
504
505 sub connected {
506   my $self = shift @_;
507   return
508     $self->master->connected &&
509     $self->pool->connected_replicants;
510 }
511
512 =head2 ensure_connected
513
514 Make sure all the storages are connected.
515
516 =cut
517
518 sub ensure_connected {
519   my $self = shift @_;
520   foreach my $source ($self->all_storages) {
521     $source->ensure_connected(@_);
522   }
523 }
524
525 =head2 limit_dialect
526
527 Set the limit_dialect for all existing storages
528
529 =cut
530
531 sub limit_dialect {
532   my $self = shift @_;
533   foreach my $source ($self->all_storages) {
534     $source->limit_dialect(@_);
535   }
536 }
537
538 =head2 quote_char
539
540 Set the quote_char for all existing storages
541
542 =cut
543
544 sub quote_char {
545   my $self = shift @_;
546   foreach my $source ($self->all_storages) {
547     $source->quote_char(@_);
548   }
549 }
550
551 =head2 name_sep
552
553 Set the name_sep for all existing storages
554
555 =cut
556
557 sub name_sep {
558   my $self = shift @_;
559   foreach my $source ($self->all_storages) {
560     $source->name_sep(@_);
561   }
562 }
563
564 =head2 set_schema
565
566 Set the schema object for all existing storages
567
568 =cut
569
570 sub set_schema {
571   my $self = shift @_;
572   foreach my $source ($self->all_storages) {
573     $source->set_schema(@_);
574   }
575 }
576
577 =head2 debug
578
579 set a debug flag across all storages
580
581 =cut
582
583 sub debug {
584   my $self = shift @_;
585   foreach my $source ($self->all_storages) {
586     $source->debug(@_);
587   }
588 }
589
590 =head2 debugobj
591
592 set a debug object across all storages
593
594 =cut
595
596 sub debugobj {
597   my $self = shift @_;
598   foreach my $source ($self->all_storages) {
599     $source->debugobj(@_);
600   }
601 }
602
603 =head2 debugfh
604
605 set a debugfh object across all storages
606
607 =cut
608
609 sub debugfh {
610   my $self = shift @_;
611   foreach my $source ($self->all_storages) {
612     $source->debugfh(@_);
613   }
614 }
615
616 =head2 debugcb
617
618 set a debug callback across all storages
619
620 =cut
621
622 sub debugcb {
623   my $self = shift @_;
624   foreach my $source ($self->all_storages) {
625     $source->debugcb(@_);
626   }
627 }
628
629 =head2 disconnect
630
631 disconnect everything
632
633 =cut
634
635 sub disconnect {
636   my $self = shift @_;
637   foreach my $source ($self->all_storages) {
638     $source->disconnect(@_);
639   }
640 }
641
642 =head1 GOTCHAS
643
644 Due to the fact that replicants can lag behind a master, you must take care to
645 make sure you use one of the methods to force read queries to a master should
646 you need realtime data integrity.  For example, if you insert a row, and then
647 immediately re-read it from the database (say, by doing $row->discard_changes)
648 or you insert a row and then immediately build a query that expects that row
649 to be an item, you should force the master to handle reads.  Otherwise, due to
650 the lag, there is no certainty your data will be in the expected state.
651
652 For data integrity, all transactions automatically use the master storage for
653 all read and write queries.  Using a transaction is the preferred and recommended
654 method to force the master to handle all read queries.
655
656 Otherwise, you can force a single query to use the master with the 'force_pool'
657 attribute:
658
659   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
660
661 This attribute will safely be ignore by non replicated storages, so you can use
662 the same code for both types of systems.
663
664 Lastly, you can use the L</execute_reliably> method, which works very much like
665 a transaction.
666
667 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
668 and L</set_balanced_storage>, however this operates at a global level and is not
669 suitable if you have a shared Schema object being used by multiple processes,
670 such as on a web application server.  You can get around this limitation by
671 using the Schema clone method.
672
673   my $new_schema = $schema->clone;
674   $new_schema->set_reliable_storage;
675   
676   ## $new_schema will use only the Master storage for all reads/writes while
677   ## the $schema object will use replicated storage.
678
679 =head1 AUTHOR
680
681   John Napiorkowski <john.napiorkowski@takkle.com>
682
683 Based on code originated by:
684
685   Norbert Csongrádi <bert@cpan.org>
686   Peter Siklósi <einon@einon.hu>
687
688 =head1 LICENSE
689
690 You may distribute this code under the same terms as Perl itself.
691
692 =cut
693
694 1;