changed Storage->reload_row to do less, reverted some behavior to PK->discard_changes...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->connect_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31     );
32     
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</read_handler> or to
47 L</write_handler>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 =head1 NOTES
55
56 The consistancy betweeen master and replicants is database specific.  The Pool
57 gives you a method to validate it's replicants, removing and replacing them
58 when they fail/pass predefined criteria.  It is recommened that your application
59 define two schemas, one using the replicated storage and another that just 
60 connects to the master.
61
62 =head1 ATTRIBUTES
63
64 This class defines the following attributes.
65
66 =head2 pool_type
67
68 Contains the classname which will instantiate the L</pool> object.  Defaults 
69 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
70
71 =cut
72
73 has 'pool_type' => (
74     is=>'ro',
75     isa=>'ClassName',
76     lazy_build=>1,
77     handles=>{
78         'create_pool' => 'new',
79     },
80 );
81
82 =head2 pool_args
83
84 Contains a hashref of initialized information to pass to the Balancer object.
85 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
86
87 =cut
88
89 has 'pool_args' => (
90     is=>'ro',
91     isa=>'HashRef',
92     lazy=>1,
93     required=>1,
94     default=>sub { {} },
95 );
96
97
98 =head2 balancer_type
99
100 The replication pool requires a balance class to provider the methods for
101 choose how to spread the query load across each replicant in the pool.
102
103 =cut
104
105 has 'balancer_type' => (
106     is=>'ro',
107     isa=>'ClassName',
108     lazy_build=>1,
109     handles=>{
110         'create_balancer' => 'new',
111     },
112 );
113
114 =head2 balancer_args
115
116 Contains a hashref of initialized information to pass to the Balancer object.
117 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
118
119 =cut
120
121 has 'balancer_args' => (
122     is=>'ro',
123     isa=>'HashRef',
124     lazy=>1,
125     required=>1,
126     default=>sub { {} },
127 );
128
129 =head2 pool
130
131 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
132 container class for one or more replicated databases.
133
134 =cut
135
136 has 'pool' => (
137     is=>'ro',
138     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139     lazy_build=>1,
140     handles=>[qw/
141         connect_replicants    
142         replicants
143         has_replicants
144     /],
145 );
146
147 =head2 balancer
148
149 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
150 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
151
152 =cut
153
154 has 'balancer' => (
155     is=>'ro',
156     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
157     lazy_build=>1,
158     handles=>[qw/auto_validate_every/],
159 );
160
161 =head2 master
162
163 The master defines the canonical state for a pool of connected databases.  All
164 the replicants are expected to match this databases state.  Thus, in a classic
165 Master / Slaves distributed system, all the slaves are expected to replicate
166 the Master's state as quick as possible.  This is the only database in the
167 pool of databases that is allowed to handle write traffic.
168
169 =cut
170
171 has 'master' => (
172     is=> 'ro',
173     isa=>'DBIx::Class::Storage::DBI',
174     lazy_build=>1,
175 );
176
177 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179 The following methods are delegated all the methods required for the 
180 L<DBIx::Class::Storage::DBI> interface.
181
182 =head2 read_handler
183
184 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
185
186 =cut
187
188 has 'read_handler' => (
189     is=>'rw',
190     isa=>'Object',
191     lazy_build=>1,
192     handles=>[qw/
193         select
194         select_single
195         columns_info_for
196     /],    
197 );
198
199 =head2 write_handler
200
201 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
202
203 =cut
204
205 has 'write_handler' => (
206     is=>'ro',
207     isa=>'Object',
208     lazy_build=>1,
209     lazy_build=>1,
210     handles=>[qw/   
211         on_connect_do
212         on_disconnect_do       
213         connect_info
214         throw_exception
215         sql_maker
216         sqlt_type
217         create_ddl_dir
218         deployment_statements
219         datetime_parser
220         datetime_parser_type        
221         last_insert_id
222         insert
223         insert_bulk
224         update
225         delete
226         dbh
227         txn_commit
228         txn_rollback
229         sth
230         deploy
231         schema
232         reload_row
233     /],
234 );
235
236 =head1 METHODS
237
238 This class defines the following methods.
239
240 =head2 new
241
242 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
243 first argument.  We need to invoke L</new> on the underlying parent class, make
244 sure we properly give it a L<Moose> meta class, and then correctly instantiate
245 our attributes.  Basically we pass on whatever the schema has in it's class
246 data for 'storage_type_args' to our replicated storage type.
247
248 =cut
249
250 sub new {
251     my $class = shift @_;
252     my $schema = shift @_;
253     my $storage_type_args = shift @_;
254     my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
255     
256     ## Hate to do it this way, but can't seem to get advice on the attribute working right
257     ## maybe we can do a type and coercion for it. 
258     if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
259         $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
260         eval "require $storage_type_args->{balancer_type}";
261     }
262     
263     return $class->meta->new_object(
264         __INSTANCE__ => $obj,
265         %$storage_type_args,
266         @_,
267     );
268 }
269
270 =head2 _build_master
271
272 Lazy builder for the L</master> attribute.
273
274 =cut
275
276 sub _build_master {
277         DBIx::Class::Storage::DBI->new;
278 }
279
280 =head2 _build_pool_type
281
282 Lazy builder for the L</pool_type> attribute.
283
284 =cut
285
286 sub _build_pool_type {
287     return 'DBIx::Class::Storage::DBI::Replicated::Pool';
288 }
289
290 =head2 _build_pool
291
292 Lazy builder for the L</pool> attribute.
293
294 =cut
295
296 sub _build_pool {
297         my $self = shift @_;
298     $self->create_pool(%{$self->pool_args});
299 }
300
301 =head2 _build_balancer_type
302
303 Lazy builder for the L</balancer_type> attribute.
304
305 =cut
306
307 sub _build_balancer_type {
308     return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
309 }
310
311 =head2 _build_balancer
312
313 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
314 the balancer knows which pool it's balancing.
315
316 =cut
317
318 sub _build_balancer {
319     my $self = shift @_;
320     $self->create_balancer(
321         pool=>$self->pool, 
322         master=>$self->master,
323         %{$self->balancer_args},);
324 }
325
326 =head2 _build_write_handler
327
328 Lazy builder for the L</write_handler> attribute.  The default is to set this to
329 the L</master>.
330
331 =cut
332
333 sub _build_write_handler {
334     return shift->master;
335 }
336
337 =head2 _build_read_handler
338
339 Lazy builder for the L</read_handler> attribute.  The default is to set this to
340 the L</balancer>.
341
342 =cut
343
344 sub _build_read_handler {
345     return shift->balancer;
346 }
347
348 =head2 around: connect_replicants
349
350 All calls to connect_replicants needs to have an existing $schema tacked onto
351 top of the args, since L<DBIx::Storage::DBI> needs it.
352
353 =cut
354
355 around 'connect_replicants' => sub {
356         my ($method, $self, @args) = @_;
357         $self->$method($self->schema, @args);
358 };
359
360 =head2 all_storages
361
362 Returns an array of of all the connected storage backends.  The first element
363 in the returned array is the master, and the remainings are each of the
364 replicants.
365
366 =cut
367
368 sub all_storages {
369         my $self = shift @_;
370         
371         return grep {defined $_ && blessed $_} (
372            $self->master,
373            $self->replicants,
374         );
375 }
376
377 =head2 execute_reliably ($coderef, ?@args)
378
379 Given a coderef, saves the current state of the L</read_handler>, forces it to
380 use reliable storage (ie sets it to the master), executes a coderef and then
381 restores the original state.
382
383 Example:
384
385     my $reliably = sub {
386         my $name = shift @_;
387         $schema->resultset('User')->create({name=>$name});
388         my $user_rs = $schema->resultset('User')->find({name=>$name}); 
389         return $user_rs;
390     };
391
392     my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
393
394 Use this when you must be certain of your database state, such as when you just
395 inserted something and need to get a resultset including it, etc.
396
397 =cut
398
399 sub execute_reliably {
400     my ($self, $coderef, @args) = @_;
401         
402     unless( ref $coderef eq 'CODE') {
403         $self->throw_exception('Second argument must be a coderef');
404     }
405
406     ##Get copy of master storage
407     my $master = $self->master;
408     
409     ##Get whatever the current read hander is
410     my $current = $self->read_handler;
411     
412     ##Set the read handler to master
413     $self->read_handler($master);
414     
415     ## do whatever the caller needs
416     my @result;
417     my $want_array = wantarray;
418     
419     eval {
420             if($want_array) {
421                 @result = $coderef->(@args);
422             }
423             elsif(defined $want_array) {
424                 ($result[0]) = ($coderef->(@args));
425             } else {
426                 $coderef->(@args);
427             }           
428     };
429     
430     ##Reset to the original state
431     $self->schema->storage->read_handler($current); 
432     
433     ##Exception testing has to come last, otherwise you might leave the 
434     ##read_handler set to master.
435     
436     if($@) {
437         $self->throw_exception("coderef returned an error: $@");
438     } else {
439         return $want_array ? @result : $result[0];
440     }
441 }
442
443 =head2 set_reliable_storage
444
445 Sets the current $schema to be 'reliable', that is all queries, both read and
446 write are sent to the master
447     
448 =cut
449
450 sub set_reliable_storage {
451         my $self = shift @_;
452         my $schema = $self->schema;
453         my $write_handler = $self->schema->storage->write_handler;
454         
455         $schema->storage->read_handler($write_handler);
456 }
457
458 =head2 set_balanced_storage
459
460 Sets the current $schema to be use the </balancer> for all reads, while all
461 writea are sent to the master only
462     
463 =cut
464
465 sub set_balanced_storage {
466     my $self = shift @_;
467     my $schema = $self->schema;
468     my $write_handler = $self->schema->storage->balancer;
469     
470     $schema->storage->read_handler($write_handler);
471 }
472
473 =head2 txn_do ($coderef)
474
475 Overload to the txn_do method, which is delegated to whatever the
476 L<write_handler> is set to.  We overload this in order to wrap in inside a
477 L</execute_reliably> method.
478
479 =cut
480
481 sub txn_do {
482         my($self, $coderef, @args) = @_;
483         $self->execute_reliably($coderef, @args);
484 }
485
486 =head2 reload_row ($row)
487
488 Overload to the reload_row method so that the reloading is always directed to
489 the master storage.
490
491 =cut
492
493 around 'reload_row' => sub {
494         my ($reload_row, $self, $row) = @_;
495         return $self->execute_reliably(sub {
496                 return $self->$reload_row(shift);
497         }, $row);
498 };
499
500 =head2 connected
501
502 Check that the master and at least one of the replicants is connected.
503
504 =cut
505
506 sub connected {
507         my $self = shift @_;
508         
509         return
510            $self->master->connected &&
511            $self->pool->connected_replicants;
512 }
513
514 =head2 ensure_connected
515
516 Make sure all the storages are connected.
517
518 =cut
519
520 sub ensure_connected {
521     my $self = shift @_;
522     foreach my $source ($self->all_storages) {
523         $source->ensure_connected(@_);
524     }
525 }
526
527 =head2 limit_dialect
528
529 Set the limit_dialect for all existing storages
530
531 =cut
532
533 sub limit_dialect {
534     my $self = shift @_;
535     foreach my $source ($self->all_storages) {
536         $source->limit_dialect(@_);
537     }
538 }
539
540 =head2 quote_char
541
542 Set the quote_char for all existing storages
543
544 =cut
545
546 sub quote_char {
547     my $self = shift @_;
548     foreach my $source ($self->all_storages) {
549         $source->quote_char(@_);
550     }
551 }
552
553 =head2 name_sep
554
555 Set the name_sep for all existing storages
556
557 =cut
558
559 sub name_sep {
560     my $self = shift @_;
561     foreach my $source ($self->all_storages) {
562         $source->name_sep(@_);
563     }
564 }
565
566 =head2 set_schema
567
568 Set the schema object for all existing storages
569
570 =cut
571
572 sub set_schema {
573         my $self = shift @_;
574         foreach my $source ($self->all_storages) {
575                 $source->set_schema(@_);
576         }
577 }
578
579 =head2 debug
580
581 set a debug flag across all storages
582
583 =cut
584
585 sub debug {
586     my $self = shift @_;
587     foreach my $source ($self->all_storages) {
588         $source->debug(@_);
589     }
590 }
591
592 =head2 debugobj
593
594 set a debug object across all storages
595
596 =cut
597
598 sub debugobj {
599     my $self = shift @_;
600     foreach my $source ($self->all_storages) {
601         $source->debugobj(@_);
602     }
603 }
604
605 =head2 debugfh
606
607 set a debugfh object across all storages
608
609 =cut
610
611 sub debugfh {
612     my $self = shift @_;
613     foreach my $source ($self->all_storages) {
614         $source->debugfh(@_);
615     }
616 }
617
618 =head2 debugcb
619
620 set a debug callback across all storages
621
622 =cut
623
624 sub debugcb {
625     my $self = shift @_;
626     foreach my $source ($self->all_storages) {
627         $source->debugcb(@_);
628     }
629 }
630
631 =head2 disconnect
632
633 disconnect everything
634
635 =cut
636
637 sub disconnect {
638     my $self = shift @_;
639     foreach my $source ($self->all_storages) {
640         $source->disconnect(@_);
641     }
642 }
643
644 =head1 AUTHOR
645
646     John Napiorkowski <john.napiorkowski@takkle.com>
647
648 Based on code originated by:
649
650     Norbert Csongrádi <bert@cpan.org>
651     Peter Siklósi <einon@einon.hu>
652
653 =head1 LICENSE
654
655 You may distribute this code under the same terms as Perl itself.
656
657 =cut
658
659 1;