all your tabs belong to spaces
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->connect_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31     );
32     
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</read_handler> or to
47 L</write_handler>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 =head1 NOTES
55
56 The consistancy betweeen master and replicants is database specific.  The Pool
57 gives you a method to validate it's replicants, removing and replacing them
58 when they fail/pass predefined criteria.  It is recommened that your application
59 define two schemas, one using the replicated storage and another that just 
60 connects to the master.
61
62 =head1 ATTRIBUTES
63
64 This class defines the following attributes.
65
66 =head2 pool_type
67
68 Contains the classname which will instantiate the L</pool> object.  Defaults 
69 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
70
71 =cut
72
73 has 'pool_type' => (
74     is=>'ro',
75     isa=>'ClassName',
76     lazy_build=>1,
77     handles=>{
78         'create_pool' => 'new',
79     },
80 );
81
82 =head2 pool_args
83
84 Contains a hashref of initialized information to pass to the Balancer object.
85 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
86
87 =cut
88
89 has 'pool_args' => (
90     is=>'ro',
91     isa=>'HashRef',
92     lazy=>1,
93     required=>1,
94     default=>sub { {} },
95 );
96
97
98 =head2 balancer_type
99
100 The replication pool requires a balance class to provider the methods for
101 choose how to spread the query load across each replicant in the pool.
102
103 =cut
104
105 has 'balancer_type' => (
106     is=>'ro',
107     isa=>'ClassName',
108     lazy_build=>1,
109     handles=>{
110         'create_balancer' => 'new',
111     },
112 );
113
114 =head2 balancer_args
115
116 Contains a hashref of initialized information to pass to the Balancer object.
117 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
118
119 =cut
120
121 has 'balancer_args' => (
122     is=>'ro',
123     isa=>'HashRef',
124     lazy=>1,
125     required=>1,
126     default=>sub { {} },
127 );
128
129 =head2 pool
130
131 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
132 container class for one or more replicated databases.
133
134 =cut
135
136 has 'pool' => (
137     is=>'ro',
138     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139     lazy_build=>1,
140     handles=>[qw/
141         connect_replicants    
142         replicants
143         has_replicants
144     /],
145 );
146
147 =head2 balancer
148
149 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
150 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
151
152 =cut
153
154 has 'balancer' => (
155     is=>'ro',
156     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
157     lazy_build=>1,
158     handles=>[qw/auto_validate_every/],
159 );
160
161 =head2 master
162
163 The master defines the canonical state for a pool of connected databases.  All
164 the replicants are expected to match this databases state.  Thus, in a classic
165 Master / Slaves distributed system, all the slaves are expected to replicate
166 the Master's state as quick as possible.  This is the only database in the
167 pool of databases that is allowed to handle write traffic.
168
169 =cut
170
171 has 'master' => (
172     is=> 'ro',
173     isa=>'DBIx::Class::Storage::DBI',
174     lazy_build=>1,
175 );
176
177 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179 The following methods are delegated all the methods required for the 
180 L<DBIx::Class::Storage::DBI> interface.
181
182 =head2 read_handler
183
184 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
185
186 =cut
187
188 has 'read_handler' => (
189     is=>'rw',
190     isa=>'Object',
191     lazy_build=>1,
192     handles=>[qw/
193         select
194         select_single
195         columns_info_for
196     /],    
197 );
198
199 =head2 write_handler
200
201 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
202
203 =cut
204
205 has 'write_handler' => (
206     is=>'ro',
207     isa=>'Object',
208     lazy_build=>1,
209     lazy_build=>1,
210     handles=>[qw/   
211         on_connect_do
212         on_disconnect_do       
213         connect_info
214         throw_exception
215         sql_maker
216         sqlt_type
217         create_ddl_dir
218         deployment_statements
219         datetime_parser
220         datetime_parser_type        
221         last_insert_id
222         insert
223         insert_bulk
224         update
225         delete
226         dbh
227         txn_do
228         txn_commit
229         txn_rollback
230         sth
231         deploy
232         schema
233         reload_row
234     /],
235 );
236
237 =head1 METHODS
238
239 This class defines the following methods.
240
241 =head2 new
242
243 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
244 first argument.  We need to invoke L</new> on the underlying parent class, make
245 sure we properly give it a L<Moose> meta class, and then correctly instantiate
246 our attributes.  Basically we pass on whatever the schema has in it's class
247 data for 'storage_type_args' to our replicated storage type.
248
249 =cut
250
251 sub new {
252     my $class = shift @_;
253     my $schema = shift @_;
254     my $storage_type_args = shift @_;
255     my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
256     
257     ## Hate to do it this way, but can't seem to get advice on the attribute working right
258     ## maybe we can do a type and coercion for it. 
259     if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
260         $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
261         eval "require $storage_type_args->{balancer_type}";
262     }
263     
264     return $class->meta->new_object(
265         __INSTANCE__ => $obj,
266         %$storage_type_args,
267         @_,
268     );
269 }
270
271 =head2 _build_master
272
273 Lazy builder for the L</master> attribute.
274
275 =cut
276
277 sub _build_master {
278     DBIx::Class::Storage::DBI->new;
279 }
280
281 =head2 _build_pool_type
282
283 Lazy builder for the L</pool_type> attribute.
284
285 =cut
286
287 sub _build_pool_type {
288     return 'DBIx::Class::Storage::DBI::Replicated::Pool';
289 }
290
291 =head2 _build_pool
292
293 Lazy builder for the L</pool> attribute.
294
295 =cut
296
297 sub _build_pool {
298     my $self = shift @_;
299     $self->create_pool(%{$self->pool_args});
300 }
301
302 =head2 _build_balancer_type
303
304 Lazy builder for the L</balancer_type> attribute.
305
306 =cut
307
308 sub _build_balancer_type {
309     return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
310 }
311
312 =head2 _build_balancer
313
314 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
315 the balancer knows which pool it's balancing.
316
317 =cut
318
319 sub _build_balancer {
320     my $self = shift @_;
321     $self->create_balancer(
322         pool=>$self->pool, 
323         master=>$self->master,
324         %{$self->balancer_args},
325     );
326 }
327
328 =head2 _build_write_handler
329
330 Lazy builder for the L</write_handler> attribute.  The default is to set this to
331 the L</master>.
332
333 =cut
334
335 sub _build_write_handler {
336     return shift->master;
337 }
338
339 =head2 _build_read_handler
340
341 Lazy builder for the L</read_handler> attribute.  The default is to set this to
342 the L</balancer>.
343
344 =cut
345
346 sub _build_read_handler {
347     return shift->balancer;
348 }
349
350 =head2 around: connect_replicants
351
352 All calls to connect_replicants needs to have an existing $schema tacked onto
353 top of the args, since L<DBIx::Storage::DBI> needs it.
354
355 =cut
356
357 around 'connect_replicants' => sub {
358     my ($method, $self, @args) = @_;
359     $self->$method($self->schema, @args);
360 };
361
362 =head2 all_storages
363
364 Returns an array of of all the connected storage backends.  The first element
365 in the returned array is the master, and the remainings are each of the
366 replicants.
367
368 =cut
369
370 sub all_storages {
371     my $self = shift @_;
372     
373     return grep {defined $_ && blessed $_} (
374        $self->master,
375        $self->replicants,
376     );
377 }
378
379 =head2 execute_reliably ($coderef, ?@args)
380
381 Given a coderef, saves the current state of the L</read_handler>, forces it to
382 use reliable storage (ie sets it to the master), executes a coderef and then
383 restores the original state.
384
385 Example:
386
387     my $reliably = sub {
388         my $name = shift @_;
389         $schema->resultset('User')->create({name=>$name});
390         my $user_rs = $schema->resultset('User')->find({name=>$name}); 
391         return $user_rs;
392     };
393
394     my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
395
396 Use this when you must be certain of your database state, such as when you just
397 inserted something and need to get a resultset including it, etc.
398
399 =cut
400
401 sub execute_reliably {
402     my ($self, $coderef, @args) = @_;
403     
404     unless( ref $coderef eq 'CODE') {
405         $self->throw_exception('Second argument must be a coderef');
406     }
407
408     ##Get copy of master storage
409     my $master = $self->master;
410     
411     ##Get whatever the current read hander is
412     my $current = $self->read_handler;
413     
414     ##Set the read handler to master
415     $self->read_handler($master);
416     
417     ## do whatever the caller needs
418     my @result;
419     my $want_array = wantarray;
420     
421     eval {
422         if($want_array) {
423             @result = $coderef->(@args);
424         }
425         elsif(defined $want_array) {
426             ($result[0]) = ($coderef->(@args));
427         } else {
428             $coderef->(@args);
429         }       
430     };
431     
432     ##Reset to the original state
433     $self->read_handler($current); 
434     
435     ##Exception testing has to come last, otherwise you might leave the 
436     ##read_handler set to master.
437     
438     if($@) {
439         $self->throw_exception("coderef returned an error: $@");
440     } else {
441         return $want_array ? @result : $result[0];
442     }
443 }
444
445 =head2 set_reliable_storage
446
447 Sets the current $schema to be 'reliable', that is all queries, both read and
448 write are sent to the master
449     
450 =cut
451
452 sub set_reliable_storage {
453     my $self = shift @_;
454     my $schema = $self->schema;
455     my $write_handler = $self->schema->storage->write_handler;
456     
457     $schema->storage->read_handler($write_handler);
458 }
459
460 =head2 set_balanced_storage
461
462 Sets the current $schema to be use the </balancer> for all reads, while all
463 writea are sent to the master only
464     
465 =cut
466
467 sub set_balanced_storage {
468     my $self = shift @_;
469     my $schema = $self->schema;
470     my $write_handler = $self->schema->storage->balancer;
471     
472     $schema->storage->read_handler($write_handler);
473 }
474
475 =head2 around: txn_do ($coderef)
476
477 Overload to the txn_do method, which is delegated to whatever the
478 L<write_handler> is set to.  We overload this in order to wrap in inside a
479 L</execute_reliably> method.
480
481 =cut
482
483 around 'txn_do' => sub {
484     my($txn_do, $self, $coderef, @args) = @_;
485     $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
486 };
487
488 =head2 reload_row ($row)
489
490 Overload to the reload_row method so that the reloading is always directed to
491 the master storage.
492
493 =cut
494
495 around 'reload_row' => sub {
496     my ($reload_row, $self, $row) = @_;
497     return $self->execute_reliably(sub {
498         return $self->$reload_row(shift);
499     }, $row);
500 };
501
502 =head2 connected
503
504 Check that the master and at least one of the replicants is connected.
505
506 =cut
507
508 sub connected {
509     my $self = shift @_;
510     
511     return
512        $self->master->connected &&
513        $self->pool->connected_replicants;
514 }
515
516 =head2 ensure_connected
517
518 Make sure all the storages are connected.
519
520 =cut
521
522 sub ensure_connected {
523     my $self = shift @_;
524     foreach my $source ($self->all_storages) {
525         $source->ensure_connected(@_);
526     }
527 }
528
529 =head2 limit_dialect
530
531 Set the limit_dialect for all existing storages
532
533 =cut
534
535 sub limit_dialect {
536     my $self = shift @_;
537     foreach my $source ($self->all_storages) {
538         $source->limit_dialect(@_);
539     }
540 }
541
542 =head2 quote_char
543
544 Set the quote_char for all existing storages
545
546 =cut
547
548 sub quote_char {
549     my $self = shift @_;
550     foreach my $source ($self->all_storages) {
551         $source->quote_char(@_);
552     }
553 }
554
555 =head2 name_sep
556
557 Set the name_sep for all existing storages
558
559 =cut
560
561 sub name_sep {
562     my $self = shift @_;
563     foreach my $source ($self->all_storages) {
564         $source->name_sep(@_);
565     }
566 }
567
568 =head2 set_schema
569
570 Set the schema object for all existing storages
571
572 =cut
573
574 sub set_schema {
575     my $self = shift @_;
576     foreach my $source ($self->all_storages) {
577         $source->set_schema(@_);
578     }
579 }
580
581 =head2 debug
582
583 set a debug flag across all storages
584
585 =cut
586
587 sub debug {
588     my $self = shift @_;
589     foreach my $source ($self->all_storages) {
590         $source->debug(@_);
591     }
592 }
593
594 =head2 debugobj
595
596 set a debug object across all storages
597
598 =cut
599
600 sub debugobj {
601     my $self = shift @_;
602     foreach my $source ($self->all_storages) {
603         $source->debugobj(@_);
604     }
605 }
606
607 =head2 debugfh
608
609 set a debugfh object across all storages
610
611 =cut
612
613 sub debugfh {
614     my $self = shift @_;
615     foreach my $source ($self->all_storages) {
616         $source->debugfh(@_);
617     }
618 }
619
620 =head2 debugcb
621
622 set a debug callback across all storages
623
624 =cut
625
626 sub debugcb {
627     my $self = shift @_;
628     foreach my $source ($self->all_storages) {
629         $source->debugcb(@_);
630     }
631 }
632
633 =head2 disconnect
634
635 disconnect everything
636
637 =cut
638
639 sub disconnect {
640     my $self = shift @_;
641     foreach my $source ($self->all_storages) {
642         $source->disconnect(@_);
643     }
644 }
645
646 =head1 AUTHOR
647
648     John Napiorkowski <john.napiorkowski@takkle.com>
649
650 Based on code originated by:
651
652     Norbert Csongrádi <bert@cpan.org>
653     Peter Siklósi <einon@einon.hu>
654
655 =head1 LICENSE
656
657 You may distribute this code under the same terms as Perl itself.
658
659 =cut
660
661 1;