make sure various Storage mutators correctly return a useful value
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use Class::MOP;
5 use Moose::Util::TypeConstraints;
6 use DBIx::Class::Storage::DBI;
7 use DBIx::Class::Storage::DBI::Replicated::Pool;
8 use DBIx::Class::Storage::DBI::Replicated::Balancer;
9
10 =head1 NAME
11
12 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14 =head1 SYNOPSIS
15
16 The Following example shows how to change an existing $schema to a replicated
17 storage type, add some replicated (readonly) databases, and perform reporting
18 tasks.
19
20   ## Change storage_type in your schema class
21   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22   
23   ## Add some slaves.  Basically this is an array of arrayrefs, where each
24   ## arrayref is database connect information
25   
26   $schema->storage->connect_replicants(
27     [$dsn1, $user, $pass, \%opts],
28     [$dsn2, $user, $pass, \%opts],
29     [$dsn3, $user, $pass, \%opts],
30   );
31   
32   ## Now, just use the $schema as normal
33   $schema->resultset('Source')->search({name=>'etc'});
34   
35   ## You can force a given query to use a particular storage using the search
36   ### attribute 'force_pool'.  For example:
37   
38   my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
39   
40   ## Now $RS will force everything (both reads and writes) to use whatever was
41   ## setup as the master storage.  'master' is hardcoded to always point to the
42   ## Master, but you can also use any Replicant name.  Please see:
43   ## L<DBIx::Class::Storage::Replicated::Pool> and the replicants attribute for
44   ## More. Also see transactions and L</execute_reliably> for alternative ways
45   ## to force read traffic to the master.
46   
47 =head1 DESCRIPTION
48
49 Warning: This class is marked BETA.  This has been running a production
50 website using MySQL native replication as it's backend and we have some decent
51 test coverage but the code hasn't yet been stressed by a variety of databases.
52 Individual DB's may have quirks we are not aware of.  Please use this in first
53 development and pass along your experiences/bug fixes.
54
55 This class implements replicated data store for DBI. Currently you can define
56 one master and numerous slave database connections. All write-type queries
57 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
58 database, all read-type queries (SELECTs) go to the slave database.
59
60 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
61 handle gets delegated to one of the two attributes: L</read_handler> or to
62 L</write_handler>.  Additionally, some methods need to be distributed
63 to all existing storages.  This way our storage class is a drop in replacement
64 for L<DBIx::Class::Storage::DBI>.
65
66 Read traffic is spread across the replicants (slaves) occuring to a user
67 selected algorithm.  The default algorithm is random weighted.
68
69 =head1 NOTES
70
71 The consistancy betweeen master and replicants is database specific.  The Pool
72 gives you a method to validate it's replicants, removing and replacing them
73 when they fail/pass predefined criteria.  Please make careful use of the ways
74 to force a query to run against Master when needed.  
75
76 =head1 ATTRIBUTES
77
78 This class defines the following attributes.
79
80 =head2 schema
81
82 The underlying L<DBIx::Class::Schema> object this storage is attaching
83
84 =cut
85
86 has 'schema' => (
87     is=>'rw',
88     isa=>'DBIx::Class::Schema',
89     weak_ref=>1,
90     required=>1,
91 );
92
93 =head2 pool_type
94
95 Contains the classname which will instantiate the L</pool> object.  Defaults 
96 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
97
98 =cut
99
100 has 'pool_type' => (
101   is=>'ro',
102   isa=>'ClassName',
103   required=>1,
104   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
105   handles=>{
106     'create_pool' => 'new',
107   },
108 );
109
110 =head2 pool_args
111
112 Contains a hashref of initialized information to pass to the Balancer object.
113 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
114
115 =cut
116
117 has 'pool_args' => (
118   is=>'ro',
119   isa=>'HashRef',
120   lazy=>1,
121   required=>1,
122   default=>sub { {} },
123 );
124
125
126 =head2 balancer_type
127
128 The replication pool requires a balance class to provider the methods for
129 choose how to spread the query load across each replicant in the pool.
130
131 =cut
132
133 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
134   as 'ClassName';
135     
136 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
137   from 'Str',
138   via {
139         my $type = $_;
140     if($type=~m/^::/) {
141       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
142     }  
143     Class::MOP::load_class($type);  
144     $type;      
145   };
146
147 has 'balancer_type' => (
148   is=>'ro',
149   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
150   coerce=>1,
151   required=>1,
152   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
153   handles=>{
154     'create_balancer' => 'new',
155   },
156 );
157
158 =head2 balancer_args
159
160 Contains a hashref of initialized information to pass to the Balancer object.
161 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
162
163 =cut
164
165 has 'balancer_args' => (
166   is=>'ro',
167   isa=>'HashRef',
168   lazy=>1,
169   required=>1,
170   default=>sub { {} },
171 );
172
173 =head2 pool
174
175 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
176 container class for one or more replicated databases.
177
178 =cut
179
180 has 'pool' => (
181   is=>'ro',
182   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
183   lazy_build=>1,
184   handles=>[qw/
185     connect_replicants    
186     replicants
187     has_replicants
188   /],
189 );
190
191 =head2 balancer
192
193 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
194 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
195
196 =cut
197
198 has 'balancer' => (
199   is=>'ro',
200   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
201   lazy_build=>1,
202   handles=>[qw/auto_validate_every/],
203 );
204
205 =head2 master
206
207 The master defines the canonical state for a pool of connected databases.  All
208 the replicants are expected to match this databases state.  Thus, in a classic
209 Master / Slaves distributed system, all the slaves are expected to replicate
210 the Master's state as quick as possible.  This is the only database in the
211 pool of databases that is allowed to handle write traffic.
212
213 =cut
214
215 has 'master' => (
216   is=> 'ro',
217   isa=>'DBIx::Class::Storage::DBI',
218   lazy_build=>1,
219 );
220
221 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
222
223 The following methods are delegated all the methods required for the 
224 L<DBIx::Class::Storage::DBI> interface.
225
226 =head2 read_handler
227
228 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
229
230 =cut
231
232 has 'read_handler' => (
233   is=>'rw',
234   isa=>'Object',
235   lazy_build=>1,
236   handles=>[qw/
237     select
238     select_single
239     columns_info_for
240   /],    
241 );
242
243 =head2 write_handler
244
245 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
246
247 =cut
248
249 has 'write_handler' => (
250   is=>'ro',
251   isa=>'Object',
252   lazy_build=>1,
253   lazy_build=>1,
254   handles=>[qw/   
255     on_connect_do
256     on_disconnect_do       
257     connect_info
258     throw_exception
259     sql_maker
260     sqlt_type
261     create_ddl_dir
262     deployment_statements
263     datetime_parser
264     datetime_parser_type        
265     last_insert_id
266     insert
267     insert_bulk
268     update
269     delete
270     dbh
271     txn_begin
272     txn_do
273     txn_commit
274     txn_rollback
275     txn_scope_guard
276     sth
277     deploy
278
279     reload_row
280     _prep_for_execute
281     configure_sqlt
282     
283   /],
284 );
285
286 =head1 METHODS
287
288 This class defines the following methods.
289
290 =head2 BUILDARGS
291
292 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
293 first argument.  So we need to massage the arguments a bit so that all the
294 bits get put into the correct places.
295
296 =cut
297
298 sub BUILDARGS {
299   my ($class, $schema, $storage_type_args, @args) = @_; 
300   
301   return {
302         schema=>$schema, 
303         %$storage_type_args,
304         @args
305   }
306 }
307
308 =head2 _build_master
309
310 Lazy builder for the L</master> attribute.
311
312 =cut
313
314 sub _build_master {
315   my $self = shift @_;
316   DBIx::Class::Storage::DBI->new($self->schema);
317 }
318
319 =head2 _build_pool
320
321 Lazy builder for the L</pool> attribute.
322
323 =cut
324
325 sub _build_pool {
326   my $self = shift @_;
327   $self->create_pool(%{$self->pool_args});
328 }
329
330 =head2 _build_balancer
331
332 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
333 the balancer knows which pool it's balancing.
334
335 =cut
336
337 sub _build_balancer {
338   my $self = shift @_;
339   $self->create_balancer(
340     pool=>$self->pool, 
341     master=>$self->master,
342     %{$self->balancer_args},
343   );
344 }
345
346 =head2 _build_write_handler
347
348 Lazy builder for the L</write_handler> attribute.  The default is to set this to
349 the L</master>.
350
351 =cut
352
353 sub _build_write_handler {
354   return shift->master;
355 }
356
357 =head2 _build_read_handler
358
359 Lazy builder for the L</read_handler> attribute.  The default is to set this to
360 the L</balancer>.
361
362 =cut
363
364 sub _build_read_handler {
365   return shift->balancer;
366 }
367
368 =head2 around: connect_replicants
369
370 All calls to connect_replicants needs to have an existing $schema tacked onto
371 top of the args, since L<DBIx::Storage::DBI> needs it.
372
373 =cut
374
375 around 'connect_replicants' => sub {
376   my ($method, $self, @args) = @_;
377   $self->$method($self->schema, @args);
378 };
379
380 =head2 all_storages
381
382 Returns an array of of all the connected storage backends.  The first element
383 in the returned array is the master, and the remainings are each of the
384 replicants.
385
386 =cut
387
388 sub all_storages {
389   my $self = shift @_;
390   return grep {defined $_ && blessed $_} (
391      $self->master,
392      $self->replicants,
393   );
394 }
395
396 =head2 execute_reliably ($coderef, ?@args)
397
398 Given a coderef, saves the current state of the L</read_handler>, forces it to
399 use reliable storage (ie sets it to the master), executes a coderef and then
400 restores the original state.
401
402 Example:
403
404   my $reliably = sub {
405     my $name = shift @_;
406     $schema->resultset('User')->create({name=>$name});
407     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
408     return $user_rs;
409   };
410
411   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
412
413 Use this when you must be certain of your database state, such as when you just
414 inserted something and need to get a resultset including it, etc.
415
416 =cut
417
418 use Benchmark;
419
420 sub execute_reliably {
421   my ($self, $coderef, @args) = @_;
422   
423   unless( ref $coderef eq 'CODE') {
424     $self->throw_exception('Second argument must be a coderef');
425   }
426   
427   ##Get copy of master storage
428   my $master = $self->master;
429   
430   ##Get whatever the current read hander is
431   my $current = $self->read_handler;
432   
433   ##Set the read handler to master
434   $self->read_handler($master);
435   
436   ## do whatever the caller needs
437   my @result;
438   my $want_array = wantarray;
439   
440   eval {
441     if($want_array) {
442       @result = $coderef->(@args);
443     } elsif(defined $want_array) {
444       ($result[0]) = ($coderef->(@args));
445     } else {
446       $coderef->(@args);
447     }       
448   };
449   
450   ##Reset to the original state
451   $self->read_handler($current); 
452   
453   ##Exception testing has to come last, otherwise you might leave the 
454   ##read_handler set to master.
455   
456   if($@) {
457     $self->throw_exception("coderef returned an error: $@");
458   } else {
459     return $want_array ? @result : $result[0];
460   }
461 }
462
463 =head2 set_reliable_storage
464
465 Sets the current $schema to be 'reliable', that is all queries, both read and
466 write are sent to the master
467   
468 =cut
469
470 sub set_reliable_storage {
471   my $self = shift @_;
472   my $schema = $self->schema;
473   my $write_handler = $self->schema->storage->write_handler;
474   
475   $schema->storage->read_handler($write_handler);
476 }
477
478 =head2 set_balanced_storage
479
480 Sets the current $schema to be use the </balancer> for all reads, while all
481 writea are sent to the master only
482   
483 =cut
484
485 sub set_balanced_storage {
486   my $self = shift @_;
487   my $schema = $self->schema;
488   my $write_handler = $self->schema->storage->balancer;
489   
490   $schema->storage->read_handler($write_handler);
491 }
492
493 =head2 around: txn_do ($coderef)
494
495 Overload to the txn_do method, which is delegated to whatever the
496 L<write_handler> is set to.  We overload this in order to wrap in inside a
497 L</execute_reliably> method.
498
499 =cut
500
501 around 'txn_do' => sub {
502   my($txn_do, $self, $coderef, @args) = @_;
503   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
504 };
505
506 =head2 connected
507
508 Check that the master and at least one of the replicants is connected.
509
510 =cut
511
512 sub connected {
513   my $self = shift @_;
514   return
515     $self->master->connected &&
516     $self->pool->connected_replicants;
517 }
518
519 =head2 ensure_connected
520
521 Make sure all the storages are connected.
522
523 =cut
524
525 sub ensure_connected {
526   my $self = shift @_;
527   foreach my $source ($self->all_storages) {
528     $source->ensure_connected(@_);
529   }
530 }
531
532 =head2 limit_dialect
533
534 Set the limit_dialect for all existing storages
535
536 =cut
537
538 sub limit_dialect {
539   my $self = shift @_;
540   foreach my $source ($self->all_storages) {
541     $source->limit_dialect(@_);
542   }
543   return $self->master->quote_char;
544 }
545
546 =head2 quote_char
547
548 Set the quote_char for all existing storages
549
550 =cut
551
552 sub quote_char {
553   my $self = shift @_;
554   foreach my $source ($self->all_storages) {
555     $source->quote_char(@_);
556   }
557   return $self->master->quote_char;
558 }
559
560 =head2 name_sep
561
562 Set the name_sep for all existing storages
563
564 =cut
565
566 sub name_sep {
567   my $self = shift @_;
568   foreach my $source ($self->all_storages) {
569     $source->name_sep(@_);
570   }
571   return $self->master->name_sep;
572 }
573
574 =head2 set_schema
575
576 Set the schema object for all existing storages
577
578 =cut
579
580 sub set_schema {
581   my $self = shift @_;
582   foreach my $source ($self->all_storages) {
583     $source->set_schema(@_);
584   }
585 }
586
587 =head2 debug
588
589 set a debug flag across all storages
590
591 =cut
592
593 sub debug {
594   my $self = shift @_;
595   if(@_) {
596     foreach my $source ($self->all_storages) {
597       $source->debug(@_);
598     }   
599   }
600   return $self->master->debug;
601 }
602
603 =head2 debugobj
604
605 set a debug object across all storages
606
607 =cut
608
609 sub debugobj {
610   my $self = shift @_;
611   if(@_) {
612     foreach my $source ($self->all_storages) {
613       $source->debugobj(@_);
614     }   
615   }
616   return $self->master->debugobj;
617 }
618
619 =head2 debugfh
620
621 set a debugfh object across all storages
622
623 =cut
624
625 sub debugfh {
626   my $self = shift @_;
627   if(@_) {
628     foreach my $source ($self->all_storages) {
629       $source->debugfh(@_);
630     }   
631   }
632   return $self->master->debugfh;
633 }
634
635 =head2 debugcb
636
637 set a debug callback across all storages
638
639 =cut
640
641 sub debugcb {
642   my $self = shift @_;
643   if(@_) {
644     foreach my $source ($self->all_storages) {
645       $source->debugcb(@_);
646     }   
647   }
648   return $self->master->debugcb;
649 }
650
651 =head2 disconnect
652
653 disconnect everything
654
655 =cut
656
657 sub disconnect {
658   my $self = shift @_;
659   foreach my $source ($self->all_storages) {
660     $source->disconnect(@_);
661   }
662 }
663
664 =head1 GOTCHAS
665
666 Due to the fact that replicants can lag behind a master, you must take care to
667 make sure you use one of the methods to force read queries to a master should
668 you need realtime data integrity.  For example, if you insert a row, and then
669 immediately re-read it from the database (say, by doing $row->discard_changes)
670 or you insert a row and then immediately build a query that expects that row
671 to be an item, you should force the master to handle reads.  Otherwise, due to
672 the lag, there is no certainty your data will be in the expected state.
673
674 For data integrity, all transactions automatically use the master storage for
675 all read and write queries.  Using a transaction is the preferred and recommended
676 method to force the master to handle all read queries.
677
678 Otherwise, you can force a single query to use the master with the 'force_pool'
679 attribute:
680
681   my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
682
683 This attribute will safely be ignore by non replicated storages, so you can use
684 the same code for both types of systems.
685
686 Lastly, you can use the L</execute_reliably> method, which works very much like
687 a transaction.
688
689 For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
690 and L</set_balanced_storage>, however this operates at a global level and is not
691 suitable if you have a shared Schema object being used by multiple processes,
692 such as on a web application server.  You can get around this limitation by
693 using the Schema clone method.
694
695   my $new_schema = $schema->clone;
696   $new_schema->set_reliable_storage;
697   
698   ## $new_schema will use only the Master storage for all reads/writes while
699   ## the $schema object will use replicated storage.
700
701 =head1 AUTHOR
702
703   John Napiorkowski <john.napiorkowski@takkle.com>
704
705 Based on code originated by:
706
707   Norbert Csongrádi <bert@cpan.org>
708   Peter Siklósi <einon@einon.hu>
709
710 =head1 LICENSE
711
712 You may distribute this code under the same terms as Perl itself.
713
714 =cut
715
716 __PACKAGE__->meta->make_immutable;
717
718 1;