removed ->reload_row from storage, changed this to a method based on the actual row...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use Moose;
4 use Class::MOP;
5 use Moose::Util::TypeConstraints;
6 use DBIx::Class::Storage::DBI;
7 use DBIx::Class::Storage::DBI::Replicated::Pool;
8 use DBIx::Class::Storage::DBI::Replicated::Balancer;
9
10 =head1 NAME
11
12 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
13
14 =head1 SYNOPSIS
15
16 The Following example shows how to change an existing $schema to a replicated
17 storage type, add some replicated (readonly) databases, and perform reporting
18 tasks.
19
20   ## Change storage_type in your schema class
21   $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
22   
23   ## Add some slaves.  Basically this is an array of arrayrefs, where each
24   ## arrayref is database connect information
25   
26   $schema->storage->connect_replicants(
27     [$dsn1, $user, $pass, \%opts],
28     [$dsn2, $user, $pass, \%opts],
29     [$dsn3, $user, $pass, \%opts],
30   );
31   
32 =head1 DESCRIPTION
33
34 Warning: This class is marked ALPHA.  We are using this in development and have
35 some basic test coverage but the code hasn't yet been stressed by a variety
36 of databases.  Individual DB's may have quirks we are not aware of.  Please
37 use this in development and pass along your experiences/bug fixes.
38
39 This class implements replicated data store for DBI. Currently you can define
40 one master and numerous slave database connections. All write-type queries
41 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
42 database, all read-type queries (SELECTs) go to the slave database.
43
44 Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
45 handle gets delegated to one of the two attributes: L</read_handler> or to
46 L</write_handler>.  Additionally, some methods need to be distributed
47 to all existing storages.  This way our storage class is a drop in replacement
48 for L<DBIx::Class::Storage::DBI>.
49
50 Read traffic is spread across the replicants (slaves) occuring to a user
51 selected algorithm.  The default algorithm is random weighted.
52
53 =head1 NOTES
54
55 The consistancy betweeen master and replicants is database specific.  The Pool
56 gives you a method to validate it's replicants, removing and replacing them
57 when they fail/pass predefined criteria.  It is recommened that your application
58 define two schemas, one using the replicated storage and another that just 
59 connects to the master.
60
61 =head1 ATTRIBUTES
62
63 This class defines the following attributes.
64
65 =head2 schema
66
67 The underlying L<DBIx::Class::Schema> object this storage is attaching
68
69 =cut
70
71 has 'schema' => (
72     is=>'rw',
73     isa=>'DBIx::Class::Schema',
74     weak_ref=>1,
75     required=>1,
76 );
77
78 =head2 pool_type
79
80 Contains the classname which will instantiate the L</pool> object.  Defaults 
81 to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
82
83 =cut
84
85 has 'pool_type' => (
86   is=>'ro',
87   isa=>'ClassName',
88   required=>1,
89   default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
90   handles=>{
91     'create_pool' => 'new',
92   },
93 );
94
95 =head2 pool_args
96
97 Contains a hashref of initialized information to pass to the Balancer object.
98 See L<DBIx::Class::Storage::Replicated::Pool> for available arguments.
99
100 =cut
101
102 has 'pool_args' => (
103   is=>'ro',
104   isa=>'HashRef',
105   lazy=>1,
106   required=>1,
107   default=>sub { {} },
108 );
109
110
111 =head2 balancer_type
112
113 The replication pool requires a balance class to provider the methods for
114 choose how to spread the query load across each replicant in the pool.
115
116 =cut
117
118 subtype 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
119   as 'ClassName';
120     
121 coerce 'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
122   from 'Str',
123   via {
124         my $type = $_;
125     if($type=~m/^::/) {
126       $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type;
127     }  
128     Class::MOP::load_class($type);  
129     $type;      
130   };
131
132 has 'balancer_type' => (
133   is=>'ro',
134   isa=>'DBIx::Class::Storage::DBI::Replicated::BalancerClassNamePart',
135   coerce=>1,
136   required=>1,
137   default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
138   handles=>{
139     'create_balancer' => 'new',
140   },
141 );
142
143 =head2 balancer_args
144
145 Contains a hashref of initialized information to pass to the Balancer object.
146 See L<DBIx::Class::Storage::Replicated::Balancer> for available arguments.
147
148 =cut
149
150 has 'balancer_args' => (
151   is=>'ro',
152   isa=>'HashRef',
153   lazy=>1,
154   required=>1,
155   default=>sub { {} },
156 );
157
158 =head2 pool
159
160 Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class.  This is a
161 container class for one or more replicated databases.
162
163 =cut
164
165 has 'pool' => (
166   is=>'ro',
167   isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
168   lazy_build=>1,
169   handles=>[qw/
170     connect_replicants    
171     replicants
172     has_replicants
173   /],
174 );
175
176 =head2 balancer
177
178 Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class.  This 
179 is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
180
181 =cut
182
183 has 'balancer' => (
184   is=>'ro',
185   isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
186   lazy_build=>1,
187   handles=>[qw/auto_validate_every/],
188 );
189
190 =head2 master
191
192 The master defines the canonical state for a pool of connected databases.  All
193 the replicants are expected to match this databases state.  Thus, in a classic
194 Master / Slaves distributed system, all the slaves are expected to replicate
195 the Master's state as quick as possible.  This is the only database in the
196 pool of databases that is allowed to handle write traffic.
197
198 =cut
199
200 has 'master' => (
201   is=> 'ro',
202   isa=>'DBIx::Class::Storage::DBI',
203   lazy_build=>1,
204 );
205
206 =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
207
208 The following methods are delegated all the methods required for the 
209 L<DBIx::Class::Storage::DBI> interface.
210
211 =head2 read_handler
212
213 Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
214
215 =cut
216
217 has 'read_handler' => (
218   is=>'rw',
219   isa=>'Object',
220   lazy_build=>1,
221   handles=>[qw/
222     select
223     select_single
224     columns_info_for
225   /],    
226 );
227
228 =head2 write_handler
229
230 Defines an object that implements the write side of L<BIx::Class::Storage::DBI>.
231
232 =cut
233
234 has 'write_handler' => (
235   is=>'ro',
236   isa=>'Object',
237   lazy_build=>1,
238   lazy_build=>1,
239   handles=>[qw/   
240     on_connect_do
241     on_disconnect_do       
242     connect_info
243     throw_exception
244     sql_maker
245     sqlt_type
246     create_ddl_dir
247     deployment_statements
248     datetime_parser
249     datetime_parser_type        
250     last_insert_id
251     insert
252     insert_bulk
253     update
254     delete
255     dbh
256     txn_begin
257     txn_do
258     txn_commit
259     txn_rollback
260     txn_scope_guard
261     sth
262     deploy
263
264     reload_row
265     _prep_for_execute
266     configure_sqlt
267     
268   /],
269 );
270
271 =head1 METHODS
272
273 This class defines the following methods.
274
275 =head2 new
276
277 L<DBIx::Class::Schema> when instantiating it's storage passed itself as the
278 first argument.  So we need to massage the arguments a bit so that all the
279 bits get put into the correct places.
280
281 =cut
282
283 around 'new' => sub {
284   my ($new, $self, $schema, $storage_type_args, @args) = @_;
285   return $self->$new(schema=>$schema, %$storage_type_args, @args);
286 };
287
288 =head2 _build_master
289
290 Lazy builder for the L</master> attribute.
291
292 =cut
293
294 sub _build_master {
295   my $self = shift @_;
296   DBIx::Class::Storage::DBI->new($self->schema);
297 }
298
299 =head2 _build_pool
300
301 Lazy builder for the L</pool> attribute.
302
303 =cut
304
305 sub _build_pool {
306   my $self = shift @_;
307   $self->create_pool(%{$self->pool_args});
308 }
309
310 =head2 _build_balancer
311
312 Lazy builder for the L</balancer> attribute.  This takes a Pool object so that
313 the balancer knows which pool it's balancing.
314
315 =cut
316
317 sub _build_balancer {
318   my $self = shift @_;
319   $self->create_balancer(
320     pool=>$self->pool, 
321     master=>$self->master,
322     %{$self->balancer_args},
323   );
324 }
325
326 =head2 _build_write_handler
327
328 Lazy builder for the L</write_handler> attribute.  The default is to set this to
329 the L</master>.
330
331 =cut
332
333 sub _build_write_handler {
334   return shift->master;
335 }
336
337 =head2 _build_read_handler
338
339 Lazy builder for the L</read_handler> attribute.  The default is to set this to
340 the L</balancer>.
341
342 =cut
343
344 sub _build_read_handler {
345   return shift->balancer;
346 }
347
348 =head2 around: connect_replicants
349
350 All calls to connect_replicants needs to have an existing $schema tacked onto
351 top of the args, since L<DBIx::Storage::DBI> needs it.
352
353 =cut
354
355 around 'connect_replicants' => sub {
356   my ($method, $self, @args) = @_;
357   $self->$method($self->schema, @args);
358 };
359
360 =head2 all_storages
361
362 Returns an array of of all the connected storage backends.  The first element
363 in the returned array is the master, and the remainings are each of the
364 replicants.
365
366 =cut
367
368 sub all_storages {
369   my $self = shift @_;
370   return grep {defined $_ && blessed $_} (
371      $self->master,
372      $self->replicants,
373   );
374 }
375
376 =head2 execute_reliably ($coderef, ?@args)
377
378 Given a coderef, saves the current state of the L</read_handler>, forces it to
379 use reliable storage (ie sets it to the master), executes a coderef and then
380 restores the original state.
381
382 Example:
383
384   my $reliably = sub {
385     my $name = shift @_;
386     $schema->resultset('User')->create({name=>$name});
387     my $user_rs = $schema->resultset('User')->find({name=>$name}); 
388     return $user_rs;
389   };
390
391   my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
392
393 Use this when you must be certain of your database state, such as when you just
394 inserted something and need to get a resultset including it, etc.
395
396 =cut
397
398 sub execute_reliably {
399   my ($self, $coderef, @args) = @_;
400   
401   unless( ref $coderef eq 'CODE') {
402     $self->throw_exception('Second argument must be a coderef');
403   }
404   
405   ##Get copy of master storage
406   my $master = $self->master;
407   
408   ##Get whatever the current read hander is
409   my $current = $self->read_handler;
410   
411   ##Set the read handler to master
412   $self->read_handler($master);
413   
414   ## do whatever the caller needs
415   my @result;
416   my $want_array = wantarray;
417   
418   eval {
419     if($want_array) {
420       @result = $coderef->(@args);
421     } elsif(defined $want_array) {
422       ($result[0]) = ($coderef->(@args));
423     } else {
424       $coderef->(@args);
425     }       
426   };
427   
428   ##Reset to the original state
429   $self->read_handler($current); 
430   
431   ##Exception testing has to come last, otherwise you might leave the 
432   ##read_handler set to master.
433   
434   if($@) {
435     $self->throw_exception("coderef returned an error: $@");
436   } else {
437     return $want_array ? @result : $result[0];
438   }
439 }
440
441 =head2 set_reliable_storage
442
443 Sets the current $schema to be 'reliable', that is all queries, both read and
444 write are sent to the master
445   
446 =cut
447
448 sub set_reliable_storage {
449   my $self = shift @_;
450   my $schema = $self->schema;
451   my $write_handler = $self->schema->storage->write_handler;
452   
453   $schema->storage->read_handler($write_handler);
454 }
455
456 =head2 set_balanced_storage
457
458 Sets the current $schema to be use the </balancer> for all reads, while all
459 writea are sent to the master only
460   
461 =cut
462
463 sub set_balanced_storage {
464   my $self = shift @_;
465   my $schema = $self->schema;
466   my $write_handler = $self->schema->storage->balancer;
467   
468   $schema->storage->read_handler($write_handler);
469 }
470
471 =head2 around: txn_do ($coderef)
472
473 Overload to the txn_do method, which is delegated to whatever the
474 L<write_handler> is set to.  We overload this in order to wrap in inside a
475 L</execute_reliably> method.
476
477 =cut
478
479 around 'txn_do' => sub {
480   my($txn_do, $self, $coderef, @args) = @_;
481   $self->execute_reliably(sub {$self->$txn_do($coderef, @args)}); 
482 };
483
484 =head2 connected
485
486 Check that the master and at least one of the replicants is connected.
487
488 =cut
489
490 sub connected {
491   my $self = shift @_;
492   return
493     $self->master->connected &&
494     $self->pool->connected_replicants;
495 }
496
497 =head2 ensure_connected
498
499 Make sure all the storages are connected.
500
501 =cut
502
503 sub ensure_connected {
504   my $self = shift @_;
505   foreach my $source ($self->all_storages) {
506     $source->ensure_connected(@_);
507   }
508 }
509
510 =head2 limit_dialect
511
512 Set the limit_dialect for all existing storages
513
514 =cut
515
516 sub limit_dialect {
517   my $self = shift @_;
518   foreach my $source ($self->all_storages) {
519     $source->limit_dialect(@_);
520   }
521 }
522
523 =head2 quote_char
524
525 Set the quote_char for all existing storages
526
527 =cut
528
529 sub quote_char {
530   my $self = shift @_;
531   foreach my $source ($self->all_storages) {
532     $source->quote_char(@_);
533   }
534 }
535
536 =head2 name_sep
537
538 Set the name_sep for all existing storages
539
540 =cut
541
542 sub name_sep {
543   my $self = shift @_;
544   foreach my $source ($self->all_storages) {
545     $source->name_sep(@_);
546   }
547 }
548
549 =head2 set_schema
550
551 Set the schema object for all existing storages
552
553 =cut
554
555 sub set_schema {
556   my $self = shift @_;
557   foreach my $source ($self->all_storages) {
558     $source->set_schema(@_);
559   }
560 }
561
562 =head2 debug
563
564 set a debug flag across all storages
565
566 =cut
567
568 sub debug {
569   my $self = shift @_;
570   foreach my $source ($self->all_storages) {
571     $source->debug(@_);
572   }
573 }
574
575 =head2 debugobj
576
577 set a debug object across all storages
578
579 =cut
580
581 sub debugobj {
582   my $self = shift @_;
583   foreach my $source ($self->all_storages) {
584     $source->debugobj(@_);
585   }
586 }
587
588 =head2 debugfh
589
590 set a debugfh object across all storages
591
592 =cut
593
594 sub debugfh {
595   my $self = shift @_;
596   foreach my $source ($self->all_storages) {
597     $source->debugfh(@_);
598   }
599 }
600
601 =head2 debugcb
602
603 set a debug callback across all storages
604
605 =cut
606
607 sub debugcb {
608   my $self = shift @_;
609   foreach my $source ($self->all_storages) {
610     $source->debugcb(@_);
611   }
612 }
613
614 =head2 disconnect
615
616 disconnect everything
617
618 =cut
619
620 sub disconnect {
621   my $self = shift @_;
622   foreach my $source ($self->all_storages) {
623     $source->disconnect(@_);
624   }
625 }
626
627 =head1 AUTHOR
628
629   John Napiorkowski <john.napiorkowski@takkle.com>
630
631 Based on code originated by:
632
633   Norbert Csongrádi <bert@cpan.org>
634   Peter Siklósi <einon@einon.hu>
635
636 =head1 LICENSE
637
638 You may distribute this code under the same terms as Perl itself.
639
640 =cut
641
642 1;