fixed boneheaded failure to properly propogate txn_do
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use DBIx::Class::Storage::DBI;
5 use DBIx::Class::Storage::DBI::Replicated::Pool;
6 use DBIx::Class::Storage::DBI::Replicated::Balancer;
7 use Scalar::Util qw(blessed);
8
9 extends 'DBIx::Class::Storage::DBI', 'Moose::Object';
10
11 =head1 NAME
12
13 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
14
15 =head1 SYNOPSIS
16
17 The Following example shows how to change an existing $schema to a replicated
18 storage type, add some replicated (readonly) databases, and perform reporting
19 tasks.
20
21     ## Change storage_type in your schema class
22     $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
23     
24     ## Add some slaves.  Basically this is an array of arrayrefs, where each
25     ## arrayref is database connect information
26     
27     $schema->storage->connect_replicants(
28         [$dsn1, $user, $pass, \%opts],
29         [$dsn1, $user, $pass, \%opts],
30         [$dsn1, $user, $pass, \%opts],
31     );
32     
33 =head1 DESCRIPTION
34
35 Warning: This class is marked ALPHA.  We are using this in development and have
36 some basic test coverage but the code hasn't yet been stressed by a variety
37 of databases.  Individual DB's may have quirks we are not aware of.  Please
38 use this in development and pass along your experiences/bug fixes.
39
40 This class implements replicated data store for DBI. Currently you can define
41 one master and numerous slave database connections. All write-type queries
42 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
43 database, all read-type queries (SELECTs) go to the slave database.
44
45 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
46 handle gets delegated to one of the two attributes: L</read_handler> or to
47 L</write_handler>.  Additionally, some methods need to be distributed
48 to all existing storages.  This way our storage class is a drop in replacement
49 for L<DBIx::Class::Storage::DBI>.
50
51 Read traffic is spread across the replicants (slaves) occuring to a user
52 selected algorithm.  The default algorithm is random weighted.
53
54 =head1 NOTES
55
56 The consistancy betweeen master and replicants is database specific.  The Pool
57 gives you a method to validate it's replicants, removing and replacing them
58 when they fail/pass predefined criteria.  It is recommened that your application
59 define two schemas, one using the replicated storage and another that just 
60 connects to the master.
61
62 =head1 ATTRIBUTES
63
64 This class defines the following attributes.
65
66 =head2 pool_type
67
68 Contains the classname which will instantiate the L</pool> object.  Defaults 
69 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
70
71 =cut
72
73 has 'pool_type' => (
74     is=>'ro',
75     isa=>'ClassName',
76     lazy_build=>1,
77     handles=>{
78         'create_pool' => 'new',
79     },
80 );
81
82 =head2 pool_args
83
84 Contains a hashref of initialized information to pass to the Balancer object.
85 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
86
87 =cut
88
89 has 'pool_args' => (
90     is=>'ro',
91     isa=>'HashRef',
92     lazy=>1,
93     required=>1,
94     default=>sub { {} },
95 );
96
97
98 =head2 balancer_type
99
100 The replication pool requires a balance class to provider the methods for
101 choose how to spread the query load across each replicant in the pool.
102
103 =cut
104
105 has 'balancer_type' => (
106     is=>'ro',
107     isa=>'ClassName',
108     lazy_build=>1,
109     handles=>{
110         'create_balancer' => 'new',
111     },
112 );
113
114 =head2 balancer_args
115
116 Contains a hashref of initialized information to pass to the Balancer object.
117 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
118
119 =cut
120
121 has 'balancer_args' => (
122     is=>'ro',
123     isa=>'HashRef',
124     lazy=>1,
125     required=>1,
126     default=>sub { {} },
127 );
128
129 =head2 pool
130
131 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
132 container class for one or more replicated databases.
133
134 =cut
135
136 has 'pool' => (
137     is=>'ro',
138     isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
139     lazy_build=>1,
140     handles=>[qw/
141         connect_replicants    
142         replicants
143         has_replicants
144     /],
145 );
146
147 =head2 balancer
148
149 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
150 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
151
152 =cut
153
154 has 'balancer' => (
155     is=>'ro',
156     isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
157     lazy_build=>1,
158     handles=>[qw/auto_validate_every/],
159 );
160
161 =head2 master
162
163 The master defines the canonical state for a pool of connected databases.  All
164 the replicants are expected to match this databases state.  Thus, in a classic
165 Master / Slaves distributed system, all the slaves are expected to replicate
166 the Master's state as quick as possible.  This is the only database in the
167 pool of databases that is allowed to handle write traffic.
168
169 =cut
170
171 has 'master' => (
172     is=> 'ro',
173     isa=>'DBIx::Class::Storage::DBI',
174     lazy_build=>1,
175 );
176
177 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
178
179 The following methods are delegated all the methods required for the 
180 L<DBIx::Class::Storage::DBI> interface.
181
182 =head2 read_handler
183
184 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
185
186 =cut
187
188 has 'read_handler' => (
189     is=>'rw',
190     isa=>'Object',
191     lazy_build=>1,
192     handles=>[qw/
193         select
194         select_single
195         columns_info_for
196     /],    
197 );
198
199 =head2 write_handler
200
201 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
202
203 =cut
204
205 has 'write_handler' => (
206     is=>'ro',
207     isa=>'Object',
208     lazy_build=>1,
209     lazy_build=>1,
210     handles=>[qw/   
211         on_connect_do
212         on_disconnect_do       
213         connect_info
214         throw_exception
215         sql_maker
216         sqlt_type
217         create_ddl_dir
218         deployment_statements
219         datetime_parser
220         datetime_parser_type        
221         last_insert_id
222         insert
223         insert_bulk
224         update
225         delete
226         dbh
227         txn_do
228         txn_commit
229         txn_rollback
230         sth
231         deploy
232         schema
233         reload_row
234     /],
235 );
236
237 =head1 METHODS
238
239 This class defines the following methods.
240
241 =head2 new
242
243 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
244 first argument.  We need to invoke L</new> on the underlying parent class, make
245 sure we properly give it a L<Moose> meta class, and then correctly instantiate
246 our attributes.  Basically we pass on whatever the schema has in it's class
247 data for 'storage_type_args' to our replicated storage type.
248
249 =cut
250
251 sub new {
252     my $class = shift @_;
253     my $schema = shift @_;
254     my $storage_type_args = shift @_;
255     my $obj = $class->SUPER::new($schema, $storage_type_args, @_);
256     
257     ## Hate to do it this way, but can't seem to get advice on the attribute working right
258     ## maybe we can do a type and coercion for it. 
259     if( $storage_type_args->{balancer_type} && $storage_type_args->{balancer_type}=~m/^::/) {
260         $storage_type_args->{balancer_type} = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$storage_type_args->{balancer_type};
261         eval "require $storage_type_args->{balancer_type}";
262     }
263     
264     return $class->meta->new_object(
265         __INSTANCE__ => $obj,
266         %$storage_type_args,
267         @_,
268     );
269 }
270
271 =head2 _build_master
272
273 Lazy builder for the L</master> attribute.
274
275 =cut
276
277 sub _build_master {
278         DBIx::Class::Storage::DBI->new;
279 }
280
281 =head2 _build_pool_type
282
283 Lazy builder for the L</pool_type> attribute.
284
285 =cut
286
287 sub _build_pool_type {
288     return 'DBIx::Class::Storage::DBI::Replicated::Pool';
289 }
290
291 =head2 _build_pool
292
293 Lazy builder for the L</pool> attribute.
294
295 =cut
296
297 sub _build_pool {
298         my $self = shift @_;
299     $self->create_pool(%{$self->pool_args});
300 }
301
302 =head2 _build_balancer_type
303
304 Lazy builder for the L</balancer_type> attribute.
305
306 =cut
307
308 sub _build_balancer_type {
309     return 'DBIx::Class::Storage::DBI::Replicated::Balancer::First';
310 }
311
312 =head2 _build_balancer
313
314 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
315 the balancer knows which pool it's balancing.
316
317 =cut
318
319 sub _build_balancer {
320     my $self = shift @_;
321     $self->create_balancer(
322         pool=>$self->pool, 
323         master=>$self->master,
324         %{$self->balancer_args},);
325 }
326
327 =head2 _build_write_handler
328
329 Lazy builder for the L</write_handler> attribute.  The default is to set this to
330 the L</master>.
331
332 =cut
333
334 sub _build_write_handler {
335     return shift->master;
336 }
337
338 =head2 _build_read_handler
339
340 Lazy builder for the L</read_handler> attribute.  The default is to set this to
341 the L</balancer>.
342
343 =cut
344
345 sub _build_read_handler {
346     return shift->balancer;
347 }
348
349 =head2 around: connect_replicants
350
351 All calls to connect_replicants needs to have an existing $schema tacked onto
352 top of the args, since L<DBIx::Storage::DBI> needs it.
353
354 =cut
355
356 around 'connect_replicants' => sub {
357         my ($method, $self, @args) = @_;
358         $self->$method($self->schema, @args);
359 };
360
361 =head2 all_storages
362
363 Returns an array of of all the connected storage backends.  The first element
364 in the returned array is the master, and the remainings are each of the
365 replicants.
366
367 =cut
368
369 sub all_storages {
370         my $self = shift @_;
371         
372         return grep {defined $_ && blessed $_} (
373            $self->master,
374            $self->replicants,
375         );
376 }
377
378 =head2 execute_reliably ($coderef, ?@args)
379
380 Given a coderef, saves the current state of the L</read_handler>, forces it to
381 use reliable storage (ie sets it to the master), executes a coderef and then
382 restores the original state.
383
384 Example:
385
386     my $reliably = sub {
387         my $name = shift @_;
388         $schema->resultset('User')->create({name=>$name});
389         my $user_rs = $schema->resultset('User')->find({name=>$name}); 
390         return $user_rs;
391     };
392
393     my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
394
395 Use this when you must be certain of your database state, such as when you just
396 inserted something and need to get a resultset including it, etc.
397
398 =cut
399
400 sub execute_reliably {
401     my ($self, $coderef, @args) = @_;
402         
403     unless( ref $coderef eq 'CODE') {
404         $self->throw_exception('Second argument must be a coderef');
405     }
406
407     ##Get copy of master storage
408     my $master = $self->master;
409     
410     ##Get whatever the current read hander is
411     my $current = $self->read_handler;
412     
413     ##Set the read handler to master
414     $self->read_handler($master);
415     
416     ## do whatever the caller needs
417     my @result;
418     my $want_array = wantarray;
419     
420     eval {
421             if($want_array) {
422                 @result = $coderef->(@args);
423             }
424             elsif(defined $want_array) {
425                 ($result[0]) = ($coderef->(@args));
426             } else {
427                 $coderef->(@args);
428             }           
429     };
430     
431     ##Reset to the original state
432     $self->read_handler($current); 
433     
434     ##Exception testing has to come last, otherwise you might leave the 
435     ##read_handler set to master.
436     
437     if($@) {
438         $self->throw_exception("coderef returned an error: $@");
439     } else {
440         return $want_array ? @result : $result[0];
441     }
442 }
443
444 =head2 set_reliable_storage
445
446 Sets the current $schema to be 'reliable', that is all queries, both read and
447 write are sent to the master
448     
449 =cut
450
451 sub set_reliable_storage {
452         my $self = shift @_;
453         my $schema = $self->schema;
454         my $write_handler = $self->schema->storage->write_handler;
455         
456         $schema->storage->read_handler($write_handler);
457 }
458
459 =head2 set_balanced_storage
460
461 Sets the current $schema to be use the </balancer> for all reads, while all
462 writea are sent to the master only
463     
464 =cut
465
466 sub set_balanced_storage {
467     my $self = shift @_;
468     my $schema = $self->schema;
469     my $write_handler = $self->schema->storage->balancer;
470     
471     $schema->storage->read_handler($write_handler);
472 }
473
474 =head2 around: txn_do ($coderef)
475
476 Overload to the txn_do method, which is delegated to whatever the
477 L<write_handler> is set to.  We overload this in order to wrap in inside a
478 L</execute_reliably> method.
479
480 =cut
481
482 around 'txn_do' => sub {
483     my($txn_do, $self, $coderef, @args) = @_;
484     $self->execute_reliably(sub {$self->$txn_do($coderef, @args)});     
485 };
486
487 =head2 reload_row ($row)
488
489 Overload to the reload_row method so that the reloading is always directed to
490 the master storage.
491
492 =cut
493
494 around 'reload_row' => sub {
495         my ($reload_row, $self, $row) = @_;
496         return $self->execute_reliably(sub {
497                 return $self->$reload_row(shift);
498         }, $row);
499 };
500
501 =head2 connected
502
503 Check that the master and at least one of the replicants is connected.
504
505 =cut
506
507 sub connected {
508         my $self = shift @_;
509         
510         return
511            $self->master->connected &&
512            $self->pool->connected_replicants;
513 }
514
515 =head2 ensure_connected
516
517 Make sure all the storages are connected.
518
519 =cut
520
521 sub ensure_connected {
522     my $self = shift @_;
523     foreach my $source ($self->all_storages) {
524         $source->ensure_connected(@_);
525     }
526 }
527
528 =head2 limit_dialect
529
530 Set the limit_dialect for all existing storages
531
532 =cut
533
534 sub limit_dialect {
535     my $self = shift @_;
536     foreach my $source ($self->all_storages) {
537         $source->limit_dialect(@_);
538     }
539 }
540
541 =head2 quote_char
542
543 Set the quote_char for all existing storages
544
545 =cut
546
547 sub quote_char {
548     my $self = shift @_;
549     foreach my $source ($self->all_storages) {
550         $source->quote_char(@_);
551     }
552 }
553
554 =head2 name_sep
555
556 Set the name_sep for all existing storages
557
558 =cut
559
560 sub name_sep {
561     my $self = shift @_;
562     foreach my $source ($self->all_storages) {
563         $source->name_sep(@_);
564     }
565 }
566
567 =head2 set_schema
568
569 Set the schema object for all existing storages
570
571 =cut
572
573 sub set_schema {
574         my $self = shift @_;
575         foreach my $source ($self->all_storages) {
576                 $source->set_schema(@_);
577         }
578 }
579
580 =head2 debug
581
582 set a debug flag across all storages
583
584 =cut
585
586 sub debug {
587     my $self = shift @_;
588     foreach my $source ($self->all_storages) {
589         $source->debug(@_);
590     }
591 }
592
593 =head2 debugobj
594
595 set a debug object across all storages
596
597 =cut
598
599 sub debugobj {
600     my $self = shift @_;
601     foreach my $source ($self->all_storages) {
602         $source->debugobj(@_);
603     }
604 }
605
606 =head2 debugfh
607
608 set a debugfh object across all storages
609
610 =cut
611
612 sub debugfh {
613     my $self = shift @_;
614     foreach my $source ($self->all_storages) {
615         $source->debugfh(@_);
616     }
617 }
618
619 =head2 debugcb
620
621 set a debug callback across all storages
622
623 =cut
624
625 sub debugcb {
626     my $self = shift @_;
627     foreach my $source ($self->all_storages) {
628         $source->debugcb(@_);
629     }
630 }
631
632 =head2 disconnect
633
634 disconnect everything
635
636 =cut
637
638 sub disconnect {
639     my $self = shift @_;
640     foreach my $source ($self->all_storages) {
641         $source->disconnect(@_);
642     }
643 }
644
645 =head1 AUTHOR
646
647     John Napiorkowski <john.napiorkowski@takkle.com>
648
649 Based on code originated by:
650
651     Norbert Csongrádi <bert@cpan.org>
652     Peter Siklósi <einon@einon.hu>
653
654 =head1 LICENSE
655
656 You may distribute this code under the same terms as Perl itself.
657
658 =cut
659
660 1;