c95a0848b199204354157dec648dc030f235bd44
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->connect_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31     );
32     
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</read_handler> or to
47 L</write_handler>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 =head1 NOTES
55
56 The consistancy betweeen master and replicants is database specific.  The Pool
57 gives you a method to validate it's replicants, removing and replacing them
58 when they fail/pass predefined criteria.  It is recommened that your application
59 define two schemas, one using the replicated storage and another that just 
60 connects to the master.
61
62 =head1 ATTRIBUTES
63
64 This class defines the following attributes.
65
66 =head2 pool_type
67
68 Contains the classname which will instantiate the L</pool> object.  Defaults 
69 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
70
71 =cut
72
73 has 'pool_type' => (
74     is=>'ro',
75     isa=>'ClassName',
76     lazy_build=>1,
77     handles=>{
78         'create_pool' => 'new',
79     },
80 );
81
82 =head2 pool_args
83
84 Contains a hashref of initialized information to pass to the Balancer object.
85 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
86
87 =cut
88
89 has 'pool_args' => (
90     is=>'ro',
91     isa=>'HashRef',
92     lazy=>1,
93     required=>1,
94     default=>sub { {} },
95 );
96
97
98 =head2 balancer_type
99
100 The replication pool requires a balance class to provider the methods for
101 choose how to spread the query load across each replicant in the pool.
102
103 =cut
104
105 has 'balancer_type' => (
106     is=>'ro',
107     isa=>'ClassName',
108     lazy_build=>1,
109     handles=>{
110         'create_balancer' => 'new',
111     },
112 );
113
114 =head2 balancer_args
115
116 Contains a hashref of initialized information to pass to the Balancer object.
117 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
118
119 =cut
120
121 has 'balancer_args' => (
122     is=>'ro',
123     isa=>'HashRef',
124     lazy=>1,
125     required=>1,
126     default=>sub { {} },
127 );
128
129 =head2 pool
130
131 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
132 container class for one or more replicated databases.
133
134 =cut
135
136 has 'pool' => (
137     is=>'ro',
138     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139     lazy_build=>1,
140     handles=>[qw/
141         connect_replicants    
142         replicants
143         has_replicants
144     /],
145 );
146
147 =head2 balancer
148
149 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
150 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
151
152 =cut
153
154 has 'balancer' => (
155     is=>'ro',
156     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
157     lazy_build=>1,
158     handles=>[qw/auto_validate_every/],
159 );
160
161 =head2 master
162
163 The master defines the canonical state for a pool of connected databases.  All
164 the replicants are expected to match this databases state.  Thus, in a classic
165 Master / Slaves distributed system, all the slaves are expected to replicate
166 the Master's state as quick as possible.  This is the only database in the
167 pool of databases that is allowed to handle write traffic.
168
169 =cut
170
171 has 'master' => (
172     is=> 'ro',
173     isa=>'DBIx::Class::Storage::DBI',
174     lazy_build=>1,
175 );
176
177 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179 The following methods are delegated all the methods required for the 
180 L<DBIx::Class::Storage::DBI> interface.
181
182 =head2 read_handler
183
184 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
185
186 =cut
187
188 has 'read_handler' => (
189     is=>'rw',
190     isa=>'Object',
191     lazy_build=>1,
192     handles=>[qw/
193         select
194         select_single
195         columns_info_for
196     /],    
197 );
198
199 =head2 write_handler
200
201 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
202
203 =cut
204
205 has 'write_handler' => (
206     is=>'ro',
207     isa=>'Object',
208     lazy_build=>1,
209     lazy_build=>1,
210     handles=>[qw/   
211         on_connect_do
212         on_disconnect_do       
213         connect_info
214         throw_exception
215         sql_maker
216         sqlt_type
217         create_ddl_dir
218         deployment_statements
219         datetime_parser
220         datetime_parser_type        
221         last_insert_id
222         insert
223         insert_bulk
224         update
225         delete
226         dbh
227         txn_commit
228         txn_rollback
229         sth
230         deploy
231         schema
232     /],
233 );
234
235 =head1 METHODS
236
237 This class defines the following methods.
238
239 =head2 new
240
241 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
242 first argument.  We need to invoke L</new> on the underlying parent class, make
243 sure we properly give it a L<Moose> meta class, and then correctly instantiate
244 our attributes.  Basically we pass on whatever the schema has in it's class
245 data for 'storage_type_args' to our replicated storage type.
246
247 =cut
248
249 sub new {
250     my $class = shift @_;
251     my $schema = shift @_;
252     my $storage_type_args = shift @_;
253     my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
254     
255     ## Hate to do it this way, but can't seem to get advice on the attribute working right
256     ## maybe we can do a type and coercion for it. 
257     if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
258         $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
259         eval "require $storage_type_args->{balancer_type}";
260     }
261     
262     return $class->meta->new_object(
263         __INSTANCE__ => $obj,
264         %$storage_type_args,
265         @_,
266     );
267 }
268
269 =head2 _build_master
270
271 Lazy builder for the L</master> attribute.
272
273 =cut
274
275 sub _build_master {
276         DBIx::Class::Storage::DBI->new;
277 }
278
279 =head2 _build_pool_type
280
281 Lazy builder for the L</pool_type> attribute.
282
283 =cut
284
285 sub _build_pool_type {
286     return 'DBIx::Class::Storage::DBI::Replicated::Pool';
287 }
288
289 =head2 _build_pool
290
291 Lazy builder for the L</pool> attribute.
292
293 =cut
294
295 sub _build_pool {
296         my $self = shift @_;
297     $self->create_pool(%{$self->pool_args});
298 }
299
300 =head2 _build_balancer_type
301
302 Lazy builder for the L</balancer_type> attribute.
303
304 =cut
305
306 sub _build_balancer_type {
307     return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
308 }
309
310 =head2 _build_balancer
311
312 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
313 the balancer knows which pool it's balancing.
314
315 =cut
316
317 sub _build_balancer {
318     my $self = shift @_;
319     $self->create_balancer(
320         pool=>$self->pool, 
321         master=>$self->master,
322         %{$self->balancer_args},);
323 }
324
325 =head2 _build_write_handler
326
327 Lazy builder for the L</write_handler> attribute.  The default is to set this to
328 the L</master>.
329
330 =cut
331
332 sub _build_write_handler {
333     return shift->master;
334 }
335
336 =head2 _build_read_handler
337
338 Lazy builder for the L</read_handler> attribute.  The default is to set this to
339 the L</balancer>.
340
341 =cut
342
343 sub _build_read_handler {
344     return shift->balancer;
345 }
346
347 =head2 around: connect_replicants
348
349 All calls to connect_replicants needs to have an existing $schema tacked onto
350 top of the args, since L<DBIx::Storage::DBI> needs it.
351
352 =cut
353
354 around 'connect_replicants' => sub {
355         my ($method, $self, @args) = @_;
356         $self->$method($self->schema, @args);
357 };
358
359 =head2 all_storages
360
361 Returns an array of of all the connected storage backends.  The first element
362 in the returned array is the master, and the remainings are each of the
363 replicants.
364
365 =cut
366
367 sub all_storages {
368         my $self = shift @_;
369         
370         return grep {defined $_ && blessed $_} (
371            $self->master,
372            $self->replicants,
373         );
374 }
375
376 =head2 execute_reliably ($coderef, ?@args)
377
378 Given a coderef, saves the current state of the L</read_handler>, forces it to
379 use reliable storage (ie sets it to the master), executes a coderef and then
380 restores the original state.
381
382 Example:
383
384     my $reliably = sub {
385         my $name = shift @_;
386         $schema->resultset('User')->create({name=>$name});
387         my $user_rs = $schema->resultset('User')->find({name=>$name}); 
388     };
389
390     $schema->storage->execute_reliably($reliably, 'John');
391
392 Use this when you must be certain of your database state, such as when you just
393 inserted something and need to get a resultset including it, etc.
394
395 =cut
396
397 sub execute_reliably {
398     my ($self, $coderef, @args) = @_;
399         
400     unless( ref $coderef eq 'CODE') {
401         $self->throw_exception('Second argument must be a coderef');
402     }
403
404     ##Get copy of master storage
405     my $master = $self->master;
406     
407     ##Get whatever the current read hander is
408     my $current = $self->read_handler;
409     
410     ##Set the read handler to master
411     $self->read_handler($master);
412     
413     ## do whatever the caller needs
414     eval {
415         $coderef->(@args);
416     };
417     
418     if($@) {
419         $self->throw_exception("coderef returned an error: $@");
420     }
421   
422     ##Reset to the original state
423     $self->schema->storage->read_handler($current);     
424 }
425
426 =head2 set_reliable_storage
427
428 Sets the current $schema to be 'reliable', that is all queries, both read and
429 write are sent to the master
430     
431 =cut
432
433 sub set_reliable_storage {
434         my $self = shift @_;
435         my $schema = $self->schema;
436         my $write_handler = $self->schema->storage->write_handler;
437         
438         $schema->storage->read_handler($write_handler);
439 }
440
441 =head2 set_balanced_storage
442
443 Sets the current $schema to be use the </balancer> for all reads, while all
444 writea are sent to the master only
445     
446 =cut
447
448 sub set_balanced_storage {
449     my $self = shift @_;
450     my $schema = $self->schema;
451     my $write_handler = $self->schema->storage->balancer;
452     
453     $schema->storage->read_handler($write_handler);
454 }
455
456 =head2 txn_do ($coderef)
457
458 Overload to the txn_do method, which is delegated to whatever the
459 L<write_handler> is set to.  We overload this in order to wrap in inside a
460 L</execute_reliably> method.
461
462 =cut
463
464 sub txn_do {
465         my($self, $coderef, @args) = @_;
466         $self->execute_reliably($coderef, @args);
467 }
468
469 =head2 connected
470
471 Check that the master and at least one of the replicants is connected.
472
473 =cut
474
475 sub connected {
476         my $self = shift @_;
477         
478         return
479            $self->master->connected &&
480            $self->pool->connected_replicants;
481 }
482
483 =head2 ensure_connected
484
485 Make sure all the storages are connected.
486
487 =cut
488
489 sub ensure_connected {
490     my $self = shift @_;
491     foreach my $source ($self->all_storages) {
492         $source->ensure_connected(@_);
493     }
494 }
495
496 =head2 limit_dialect
497
498 Set the limit_dialect for all existing storages
499
500 =cut
501
502 sub limit_dialect {
503     my $self = shift @_;
504     foreach my $source ($self->all_storages) {
505         $source->limit_dialect(@_);
506     }
507 }
508
509 =head2 quote_char
510
511 Set the quote_char for all existing storages
512
513 =cut
514
515 sub quote_char {
516     my $self = shift @_;
517     foreach my $source ($self->all_storages) {
518         $source->quote_char(@_);
519     }
520 }
521
522 =head2 name_sep
523
524 Set the name_sep for all existing storages
525
526 =cut
527
528 sub name_sep {
529     my $self = shift @_;
530     foreach my $source ($self->all_storages) {
531         $source->name_sep(@_);
532     }
533 }
534
535 =head2 set_schema
536
537 Set the schema object for all existing storages
538
539 =cut
540
541 sub set_schema {
542         my $self = shift @_;
543         foreach my $source ($self->all_storages) {
544                 $source->set_schema(@_);
545         }
546 }
547
548 =head2 debug
549
550 set a debug flag across all storages
551
552 =cut
553
554 sub debug {
555     my $self = shift @_;
556     foreach my $source ($self->all_storages) {
557         $source->debug(@_);
558     }
559 }
560
561 =head2 debugobj
562
563 set a debug object across all storages
564
565 =cut
566
567 sub debugobj {
568     my $self = shift @_;
569     foreach my $source ($self->all_storages) {
570         $source->debugobj(@_);
571     }
572 }
573
574 =head2 debugfh
575
576 set a debugfh object across all storages
577
578 =cut
579
580 sub debugfh {
581     my $self = shift @_;
582     foreach my $source ($self->all_storages) {
583         $source->debugfh(@_);
584     }
585 }
586
587 =head2 debugcb
588
589 set a debug callback across all storages
590
591 =cut
592
593 sub debugcb {
594     my $self = shift @_;
595     foreach my $source ($self->all_storages) {
596         $source->debugcb(@_);
597     }
598 }
599
600 =head2 disconnect
601
602 disconnect everything
603
604 =cut
605
606 sub disconnect {
607     my $self = shift @_;
608     foreach my $source ($self->all_storages) {
609         $source->disconnect(@_);
610     }
611 }
612
613 =head1 AUTHOR
614
615     John Napiorkowski <john.napiorkowski@takkle.com>
616
617 Based on code originated by:
618
619     Norbert Csongrádi <bert@cpan.org>
620     Peter Siklósi <einon@einon.hu>
621
622 =head1 LICENSE
623
624 You may distribute this code under the same terms as Perl itself.
625
626 =cut
627
628 1;