1) changed all 4 space indentation to 2 space style indents for replication code...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21   ## Change storage_type in your schema class
22   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
23   
24   ## Add some slaves.  Basically this is an array of arrayrefs, where each
25   ## arrayref is database connect information
26   
27   $schema->storage->connect_replicants(
28     [$dsn1, $user, $pass, \%opts],
29     [$dsn2, $user, $pass, \%opts],
30     [$dsn3, $user, $pass, \%opts],
31   );
32   
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</read_handler> or to
47 L</write_handler>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 =head1 NOTES
55
56 The consistancy betweeen master and replicants is database specific.  The Pool
57 gives you a method to validate it's replicants, removing and replacing them
58 when they fail/pass predefined criteria.  It is recommened that your application
59 define two schemas, one using the replicated storage and another that just 
60 connects to the master.
61
62 =head1 ATTRIBUTES
63
64 This class defines the following attributes.
65
66 =head2 pool_type
67
68 Contains the classname which will instantiate the L</pool> object.  Defaults 
69 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
70
71 =cut
72
73 has 'pool_type' => (
74   is=>'ro',
75   isa=>'ClassName',
76   lazy_build=>1,
77   handles=>{
78     'create_pool' => 'new',
79   },
80 );
81
82 =head2 pool_args
83
84 Contains a hashref of initialized information to pass to the Balancer object.
85 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
86
87 =cut
88
89 has 'pool_args' => (
90   is=>'ro',
91   isa=>'HashRef',
92   lazy=>1,
93   required=>1,
94   default=>sub { {} },
95 );
96
97
98 =head2 balancer_type
99
100 The replication pool requires a balance class to provider the methods for
101 choose how to spread the query load across each replicant in the pool.
102
103 =cut
104
105 has 'balancer_type' => (
106   is=>'ro',
107   isa=>'ClassName',
108   lazy_build=>1,
109   handles=>{
110     'create_balancer' => 'new',
111   },
112 );
113
114 =head2 balancer_args
115
116 Contains a hashref of initialized information to pass to the Balancer object.
117 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
118
119 =cut
120
121 has 'balancer_args' => (
122   is=>'ro',
123   isa=>'HashRef',
124   lazy=>1,
125   required=>1,
126   default=>sub { {} },
127 );
128
129 =head2 pool
130
131 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
132 container class for one or more replicated databases.
133
134 =cut
135
136 has 'pool' => (
137   is=>'ro',
138   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139   lazy_build=>1,
140   handles=>[qw/
141     connect_replicants    
142     replicants
143     has_replicants
144   /],
145 );
146
147 =head2 balancer
148
149 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
150 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
151
152 =cut
153
154 has 'balancer' => (
155   is=>'ro',
156   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
157   lazy_build=>1,
158   handles=>[qw/auto_validate_every/],
159 );
160
161 =head2 master
162
163 The master defines the canonical state for a pool of connected databases.  All
164 the replicants are expected to match this databases state.  Thus, in a classic
165 Master / Slaves distributed system, all the slaves are expected to replicate
166 the Master's state as quick as possible.  This is the only database in the
167 pool of databases that is allowed to handle write traffic.
168
169 =cut
170
171 has 'master' => (
172   is=> 'ro',
173   isa=>'DBIx::Class::Storage::DBI',
174   lazy_build=>1,
175 );
176
177 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179 The following methods are delegated all the methods required for the 
180 L<DBIx::Class::Storage::DBI> interface.
181
182 =head2 read_handler
183
184 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
185
186 =cut
187
188 has 'read_handler' => (
189   is=>'rw',
190   isa=>'Object',
191   lazy_build=>1,
192   handles=>[qw/
193     select
194     select_single
195     columns_info_for
196   /],    
197 );
198
199 =head2 write_handler
200
201 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
202
203 =cut
204
205 has 'write_handler' => (
206   is=>'ro',
207   isa=>'Object',
208   lazy_build=>1,
209   lazy_build=>1,
210   handles=>[qw/   
211     on_connect_do
212     on_disconnect_do       
213     connect_info
214     throw_exception
215     sql_maker
216     sqlt_type
217     create_ddl_dir
218     deployment_statements
219     datetime_parser
220     datetime_parser_type        
221     last_insert_id
222     insert
223     insert_bulk
224     update
225     delete
226     dbh
227     txn_do
228     txn_commit
229     txn_rollback
230     sth
231     deploy
232     schema
233     reload_row
234   /],
235 );
236
237 =head1 METHODS
238
239 This class defines the following methods.
240
241 =head2 new
242
243 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
244 first argument.  We need to invoke L</new> on the underlying parent class, make
245 sure we properly give it a L<Moose> meta class, and then correctly instantiate
246 our attributes.  Basically we pass on whatever the schema has in it's class
247 data for 'storage_type_args' to our replicated storage type.
248
249 =cut
250
251 sub new {
252   my $class = shift @_;
253   my $schema = shift @_;
254   my $storage_type_args = shift @_;
255   my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
256   
257   ## Hate to do it this way, but can't seem to get advice on the attribute working right
258   ## maybe we can do a type and coercion for it. 
259   if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
260     $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
261     eval "require $storage_type_args->{balancer_type}";
262   }
263   
264   return $class->meta->new_object(
265     __INSTANCE__ => $obj,
266     %$storage_type_args,
267     @_,
268   );
269 }
270
271 =head2 _build_master
272
273 Lazy builder for the L</master> attribute.
274
275 =cut
276
277 sub _build_master {
278   DBIx::Class::Storage::DBI->new;
279 }
280
281 =head2 _build_pool_type
282
283 Lazy builder for the L</pool_type> attribute.
284
285 =cut
286
287 sub _build_pool_type {
288   return 'DBIx::Class::Storage::DBI::Replicated::Pool';
289 }
290
291 =head2 _build_pool
292
293 Lazy builder for the L</pool> attribute.
294
295 =cut
296
297 sub _build_pool {
298   my $self = shift @_;
299   $self->create_pool(%{$self->pool_args});
300 }
301
302 =head2 _build_balancer_type
303
304 Lazy builder for the L</balancer_type> attribute.
305
306 =cut
307
308 sub _build_balancer_type {
309   return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
310 }
311
312 =head2 _build_balancer
313
314 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
315 the balancer knows which pool it's balancing.
316
317 =cut
318
319 sub _build_balancer {
320   my $self = shift @_;
321   $self->create_balancer(
322     pool=>$self->pool, 
323     master=>$self->master,
324     %{$self->balancer_args},
325   );
326 }
327
328 =head2 _build_write_handler
329
330 Lazy builder for the L</write_handler> attribute.  The default is to set this to
331 the L</master>.
332
333 =cut
334
335 sub _build_write_handler {
336   return shift->master;
337 }
338
339 =head2 _build_read_handler
340
341 Lazy builder for the L</read_handler> attribute.  The default is to set this to
342 the L</balancer>.
343
344 =cut
345
346 sub _build_read_handler {
347   return shift->balancer;
348 }
349
350 =head2 around: connect_replicants
351
352 All calls to connect_replicants needs to have an existing $schema tacked onto
353 top of the args, since L<DBIx::Storage::DBI> needs it.
354
355 =cut
356
357 around 'connect_replicants' => sub {
358   my ($method, $self, @args) = @_;
359   $self->$method($self->schema, @args);
360 };
361
362 =head2 all_storages
363
364 Returns an array of of all the connected storage backends.  The first element
365 in the returned array is the master, and the remainings are each of the
366 replicants.
367
368 =cut
369
370 sub all_storages {
371   my $self = shift @_;
372   return grep {defined $_ && blessed $_} (
373      $self->master,
374      $self->replicants,
375   );
376 }
377
378 =head2 execute_reliably ($coderef, ?@args)
379
380 Given a coderef, saves the current state of the L</read_handler>, forces it to
381 use reliable storage (ie sets it to the master), executes a coderef and then
382 restores the original state.
383
384 Example:
385
386   my $reliably = sub {
387     my $name = shift @_;
388     $schema->resultset('User')->create({name=>$name});
389     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
390     return $user_rs;
391   };
392
393   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
394
395 Use this when you must be certain of your database state, such as when you just
396 inserted something and need to get a resultset including it, etc.
397
398 =cut
399
400 sub execute_reliably {
401   my ($self, $coderef, @args) = @_;
402   
403   unless( ref $coderef eq 'CODE') {
404     $self->throw_exception('Second argument must be a coderef');
405   }
406   
407   ##Get copy of master storage
408   my $master = $self->master;
409   
410   ##Get whatever the current read hander is
411   my $current = $self->read_handler;
412   
413   ##Set the read handler to master
414   $self->read_handler($master);
415   
416   ## do whatever the caller needs
417   my @result;
418   my $want_array = wantarray;
419   
420   eval {
421     if($want_array) {
422       @result = $coderef->(@args);
423     } elsif(defined $want_array) {
424       ($result[0]) = ($coderef->(@args));
425     } else {
426       $coderef->(@args);
427     }       
428   };
429   
430   ##Reset to the original state
431   $self->read_handler($current); 
432   
433   ##Exception testing has to come last, otherwise you might leave the 
434   ##read_handler set to master.
435   
436   if($@) {
437     $self->throw_exception("coderef returned an error: $@");
438   } else {
439     return $want_array ? @result : $result[0];
440   }
441 }
442
443 =head2 set_reliable_storage
444
445 Sets the current $schema to be 'reliable', that is all queries, both read and
446 write are sent to the master
447   
448 =cut
449
450 sub set_reliable_storage {
451   my $self = shift @_;
452   my $schema = $self->schema;
453   my $write_handler = $self->schema->storage->write_handler;
454   
455   $schema->storage->read_handler($write_handler);
456 }
457
458 =head2 set_balanced_storage
459
460 Sets the current $schema to be use the </balancer> for all reads, while all
461 writea are sent to the master only
462   
463 =cut
464
465 sub set_balanced_storage {
466   my $self = shift @_;
467   my $schema = $self->schema;
468   my $write_handler = $self->schema->storage->balancer;
469   
470   $schema->storage->read_handler($write_handler);
471 }
472
473 =head2 around: txn_do ($coderef)
474
475 Overload to the txn_do method, which is delegated to whatever the
476 L<write_handler> is set to.  We overload this in order to wrap in inside a
477 L</execute_reliably> method.
478
479 =cut
480
481 around 'txn_do' => sub {
482   my($txn_do, $self, $coderef, @args) = @_;
483   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
484 };
485
486 =head2 reload_row ($row)
487
488 Overload to the reload_row method so that the reloading is always directed to
489 the master storage.
490
491 =cut
492
493 around 'reload_row' => sub {
494   my ($reload_row, $self, $row) = @_;
495   return $self->execute_reliably(sub {
496     return $self->$reload_row(shift);
497   }, $row);
498 };
499
500 =head2 connected
501
502 Check that the master and at least one of the replicants is connected.
503
504 =cut
505
506 sub connected {
507   my $self = shift @_;
508   return
509     $self->master->connected &&
510     $self->pool->connected_replicants;
511 }
512
513 =head2 ensure_connected
514
515 Make sure all the storages are connected.
516
517 =cut
518
519 sub ensure_connected {
520   my $self = shift @_;
521   foreach my $source ($self->all_storages) {
522     $source->ensure_connected(@_);
523   }
524 }
525
526 =head2 limit_dialect
527
528 Set the limit_dialect for all existing storages
529
530 =cut
531
532 sub limit_dialect {
533   my $self = shift @_;
534   foreach my $source ($self->all_storages) {
535     $source->limit_dialect(@_);
536   }
537 }
538
539 =head2 quote_char
540
541 Set the quote_char for all existing storages
542
543 =cut
544
545 sub quote_char {
546   my $self = shift @_;
547   foreach my $source ($self->all_storages) {
548     $source->quote_char(@_);
549   }
550 }
551
552 =head2 name_sep
553
554 Set the name_sep for all existing storages
555
556 =cut
557
558 sub name_sep {
559   my $self = shift @_;
560   foreach my $source ($self->all_storages) {
561     $source->name_sep(@_);
562   }
563 }
564
565 =head2 set_schema
566
567 Set the schema object for all existing storages
568
569 =cut
570
571 sub set_schema {
572   my $self = shift @_;
573   foreach my $source ($self->all_storages) {
574     $source->set_schema(@_);
575   }
576 }
577
578 =head2 debug
579
580 set a debug flag across all storages
581
582 =cut
583
584 sub debug {
585   my $self = shift @_;
586   foreach my $source ($self->all_storages) {
587     $source->debug(@_);
588   }
589 }
590
591 =head2 debugobj
592
593 set a debug object across all storages
594
595 =cut
596
597 sub debugobj {
598   my $self = shift @_;
599   foreach my $source ($self->all_storages) {
600     $source->debugobj(@_);
601   }
602 }
603
604 =head2 debugfh
605
606 set a debugfh object across all storages
607
608 =cut
609
610 sub debugfh {
611   my $self = shift @_;
612   foreach my $source ($self->all_storages) {
613     $source->debugfh(@_);
614   }
615 }
616
617 =head2 debugcb
618
619 set a debug callback across all storages
620
621 =cut
622
623 sub debugcb {
624   my $self = shift @_;
625   foreach my $source ($self->all_storages) {
626     $source->debugcb(@_);
627   }
628 }
629
630 =head2 disconnect
631
632 disconnect everything
633
634 =cut
635
636 sub disconnect {
637   my $self = shift @_;
638   foreach my $source ($self->all_storages) {
639     $source->disconnect(@_);
640   }
641 }
642
643 =head1 AUTHOR
644
645   John Napiorkowski <john.napiorkowski@takkle.com>
646
647 Based on code originated by:
648
649   Norbert Csongrádi <bert@cpan.org>
650   Peter Siklósi <einon@einon.hu>
651
652 =head1 LICENSE
653
654 You may distribute this code under the same terms as Perl itself.
655
656 =cut
657
658 1;