[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
Commit Line Data
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
a34b0c89 4 use DBIx::Class;
70c28808 5 die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
a34b0c89 6 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 7}
8
b2e4d522 9use Moose;
26ab719a 10use DBIx::Class::Storage::DBI;
2bf79155 11use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 12use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 13use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 14use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 15use Scalar::Util 'reftype';
e666c5fd 16use Hash::Merge;
7da56142 17use List::Util qw/min max reduce/;
ed7ab0f4 18use Try::Tiny;
fd323bf1 19use namespace::clean;
9901aad7 20
21use namespace::clean -except => 'meta';
2bf79155 22
23=head1 NAME
24
ecb65397 25DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 26
27=head1 SYNOPSIS
28
29The following example shows how to change an existing $schema to a replicated
48580715 30storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 31tasks.
2bf79155 32
3da4f736 33You should set the 'storage_type' attribute to a replicated type. You should
d4daee7b 34also define your arguments, such as which balancer you want and any arguments
3da4f736 35that the Pool object should get.
36
ce854fd3 37 my $schema = Schema::Class->clone;
64cdad22 38 $schema->storage_type( ['::DBI::Replicated', {balancer_type=>'::Random'}] );
ce854fd3 39 $schema->connection(...);
d4daee7b 40
fd323bf1 41Next, you need to add in the Replicants. Basically this is an array of
3da4f736 42arrayrefs, where each arrayref is database connect information. Think of these
43arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 44
64cdad22 45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
49 );
d4daee7b 50
3da4f736 51Now, just use the $schema as you normally would. All reads are automatically
52delegated to the replicants, while writes go to the master.
53
7e38d850 54 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 55
3da4f736 56You can force a given query to use a particular storage using the search
57attribute 'force_pool'. For example:
d4daee7b 58
7e38d850 59 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 60
61Now $RS will force everything (both reads and writes) to use whatever was set up
fd323bf1 62as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 63but you can also use any Replicant name. Please see:
212cc5c2 64L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 65
66Also see transactions and L</execute_reliably> for alternative ways to
67force read traffic to the master. In general, you should wrap your statements
68in a transaction when you are reading and writing to the same tables at the
69same time, since your replicants will often lag a bit behind the master.
212cc5c2 70
71See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
72walkthroughs.
d4daee7b 73
2bf79155 74=head1 DESCRIPTION
75
7e38d850 76Warning: This class is marked BETA. This has been running a production
ccb3b897 77website using MySQL native replication as its backend and we have some decent
7e38d850 78test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 79Individual DBs may have quirks we are not aware of. Please use this first in
7e38d850 80development and pass along your experiences/bug fixes.
2bf79155 81
82This class implements a replicated data store for DBI. Currently you can define
83one master and numerous slave database connections. All write-type queries
84(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
85database, while all read-type queries (SELECTs) go to one of the slave databases.
86
87Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 88handle gets delegated to one of the two attributes: L</read_handler> or to
89L</write_handler>. Additionally, some methods need to be distributed
2bf79155 90to all existing storages. This way our storage class is a drop in replacement
91for L<DBIx::Class::Storage::DBI>.
92
48580715 93Read traffic is spread across the replicants (slaves) according to a
2bf79155 94user-selected balancer. The default L</balancer_type> is L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
95
bca099a3 96=head1 NOTES
97
48580715 98The consistency between master and replicants is database specific. The Pool
faaba25f 99gives you a method to validate its replicants, removing and replacing them
7e38d850 100when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 101to force a query to run against Master when needed.
102
103=head1 REQUIREMENTS
104
a34b0c89 105Replicated Storage has additional requirements not currently part of
aa8b2277 106L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 107
108=head1 ATTRIBUTES
109
110This class defines the following attributes.
111
2ce6e9a6 112=head2 schema
113
114The underlying L<DBIx::Class::Schema> object this storage is attached to.
115
116=cut
117
118has 'schema' => (
119 is=>'rw',
6a151f58 120 isa=>DBICSchema,
2ce6e9a6 121 weak_ref=>1,
122 required=>1,
123);
124
26ab719a 125=head2 pool_type
2bf79155 126
fd323bf1 127Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 128to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 129
130=cut
131
26ab719a 132has 'pool_type' => (
dcdf7b2c 133 is=>'rw',
41916570 134 isa=>ClassName,
2ce6e9a6 135 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 136 handles=>{
137 'create_pool' => 'new',
138 },
2bf79155 139);
140
f068a139 141=head2 pool_args
142
143Contains a hashref of initialized information to pass to the Pool object.
212cc5c2 144See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 145
146=cut
147
148has 'pool_args' => (
dcdf7b2c 149 is=>'rw',
41916570 150 isa=>HashRef,
64cdad22 151 lazy=>1,
64cdad22 152 default=>sub { {} },
f068a139 153);
154
155
26ab719a 156=head2 balancer_type
2bf79155 157
158The replication pool requires a balancer class to provide the methods for
159choosing how to spread the query load across each replicant in the pool.
160
161=cut
162
26ab719a 163has 'balancer_type' => (
dcdf7b2c 164 is=>'rw',
9901aad7 165 isa=>BalancerClassNamePart,
2ce6e9a6 166 coerce=>1,
167 required=>1,
168 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 169 handles=>{
170 'create_balancer' => 'new',
171 },
2bf79155 172);
173
17b05c13 174=head2 balancer_args
175
176Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 177See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
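
As an illustrative sketch only, both argument hashrefs can be supplied together
with the storage type. C<auto_validate_every> is the balancer attribute
delegated by L</balancer>; C<maximum_lag> is assumed here to be a Pool
attribute, so check L<DBIx::Class::Storage::DBI::Replicated::Pool> before
relying on it. C<Schema::Class> and the connection variables are placeholders.

  my $schema = Schema::Class->clone;

  $schema->storage_type([
    '::DBI::Replicated',
    {
      balancer_type => '::Random',
      # assumed example values, tune for your own setup
      balancer_args => { auto_validate_every => 5 },
      pool_args     => { maximum_lag => 2 },
    },
  ]);

  $schema->connection($dsn, $user, $pass, \%opts);

The keys of the hashref are passed straight to this class's constructor (see
L</BUILDARGS>), so they need to match the attribute names documented here.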
17b05c13 178
179=cut
180
181has 'balancer_args' => (
dcdf7b2c 182 is=>'rw',
41916570 183 isa=>HashRef,
64cdad22 184 lazy=>1,
185 required=>1,
186 default=>sub { {} },
17b05c13 187);
188
26ab719a 189=head2 pool
2bf79155 190
370f78d4 191Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 192container class for one or more replicated databases.
2bf79155 193
194=cut
195
26ab719a 196has 'pool' => (
64cdad22 197 is=>'ro',
198 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199 lazy_build=>1,
200 handles=>[qw/
6f7344b8 201 connect_replicants
64cdad22 202 replicants
203 has_replicants
204 /],
2bf79155 205);
206
26ab719a 207=head2 balancer
2bf79155 208
370f78d4 209Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
210is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>) and spreads read queries across its replicants.
2bf79155 211
26ab719a 212=cut
2bf79155 213
26ab719a 214has 'balancer' => (
dcdf7b2c 215 is=>'rw',
64cdad22 216 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
217 lazy_build=>1,
218 handles=>[qw/auto_validate_every/],
26ab719a 219);
2bf79155 220
cb6ec758 221=head2 master
222
223The master defines the canonical state for a pool of connected databases. All
224the replicants are expected to match this database's state. Thus, in a classic
225Master / Slaves distributed system, all the slaves are expected to replicate
226the Master's state as quickly as possible. This is the only database in the
227pool of databases that is allowed to handle write traffic.
228
229=cut
230
231has 'master' => (
64cdad22 232 is=> 'ro',
6a151f58 233 isa=>DBICStorageDBI,
64cdad22 234 lazy_build=>1,
cb6ec758 235);
236
cb6ec758 237=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
238
fd323bf1 239The following attributes are delegated all the methods required for the
cb6ec758 240L<DBIx::Class::Storage::DBI> interface.
241
cb6ec758 242=cut
243
4bea1fe7 244my $method_dispatch = {
245 writer => [qw/
64cdad22 246 on_connect_do
6f7344b8 247 on_disconnect_do
3244fdcc 248 on_connect_call
249 on_disconnect_call
64cdad22 250 connect_info
3244fdcc 251 _connect_info
64cdad22 252 throw_exception
253 sql_maker
254 sqlt_type
255 create_ddl_dir
256 deployment_statements
257 datetime_parser
6f7344b8 258 datetime_parser_type
259 build_datetime_parser
64cdad22 260 last_insert_id
261 insert
262 insert_bulk
263 update
264 delete
265 dbh
2ce6e9a6 266 txn_begin
64cdad22 267 txn_do
268 txn_commit
269 txn_rollback
2ce6e9a6 270 txn_scope_guard
90d7422f 271 _exec_txn_rollback
272 _exec_txn_begin
273 _exec_txn_commit
64cdad22 274 deploy
0180bef9 275 with_deferred_fk_checks
6f7344b8 276 dbh_do
2ce6e9a6 277 _prep_for_execute
6f7344b8 278 is_datatype_numeric
279 _count_select
6f7344b8 280 _subq_update_delete
281 svp_rollback
282 svp_begin
283 svp_release
e398f77e 284 relname_to_table_alias
3244fdcc 285 _dbh_last_insert_id
3244fdcc 286 _default_dbi_connect_attributes
287 _dbi_connect_info
b9ca4ff1 288 _dbic_connect_attributes
3244fdcc 289 auto_savepoint
0e773352 290 _query_start
3244fdcc 291 _query_end
0e773352 292 _format_for_trace
293 _dbi_attrs_for_bind
3244fdcc 294 bind_attribute_by_data_type
295 transaction_depth
296 _dbh
297 _select_args
298 _dbh_execute_array
3244fdcc 299 _sql_maker
3244fdcc 300 _per_row_update_delete
3244fdcc 301 _dbh_execute_inserts_with_no_binds
302 _select_args_to_query
5e782048 303 _gen_sql_bind
3244fdcc 304 _svp_generate_name
305 _multipk_update_delete
3244fdcc 306 _normalize_connect_info
307 _parse_connect_do
3244fdcc 308 _execute_array
3244fdcc 309 savepoints
3244fdcc 310 _sql_maker_opts
311 _conn_pid
3244fdcc 312 _dbh_autocommit
313 _native_data_type
314 _get_dbh
315 sql_maker_class
3244fdcc 316 _execute
317 _do_query
402ac1c9 318 _sth
3244fdcc 319 _dbh_sth
320 _dbh_execute
5b4f3fd0 321 /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
4bea1fe7 322 reader => [qw/
323 select
324 select_single
325 columns_info_for
326 _dbh_columns_info_for
327 _select
328 /],
329 unimplemented => [qw/
330 _arm_global_destructor
331 _verify_pid
332
0e773352 333 source_bind_attributes
334
4bea1fe7 335 get_use_dbms_capability
336 set_use_dbms_capability
337 get_dbms_capability
338 set_dbms_capability
339 _dbh_details
340 _dbh_get_info
341
342 sql_limit_dialect
343 sql_quote_char
344 sql_name_sep
345
4bea1fe7 346 _prefetch_autovalues
347
348 _max_column_bytesize
349 _is_lob_type
350 _is_binary_lob_type
351 _is_text_lob_type
402ac1c9 352
353 sth
4bea1fe7 354 /,(
355 # the capability framework
356 # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
357 grep
358 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
359 ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
360 )],
361};
362
363if (DBIx::Class::_ENV_::DBICTEST) {
364
365 my $seen;
366 for my $type (keys %$method_dispatch) {
367 for (@{$method_dispatch->{$type}}) {
368 push @{$seen->{$_}}, $type;
369 }
370 }
cb6ec758 371
4bea1fe7 372 if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
373 die(join "\n", '',
374 'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
375 (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
376 '',
377 );
378 }
bbdda281 379
4bea1fe7 380 if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
381 die(join "\n", '',
382 '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
383 @cant,
384 '',
385 );
386 }
387}
bbdda281 388
4bea1fe7 389for my $method (@{$method_dispatch->{unimplemented}}) {
390 __PACKAGE__->meta->add_method($method, sub {
70c28808 391 my $self = shift;
392 $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
4bea1fe7 393 });
394}
31a8aaaf 395
4bea1fe7 396=head2 read_handler
584ea6e4 397
4bea1fe7 398Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
584ea6e4 399
4bea1fe7 400=cut
401
402has 'read_handler' => (
403 is=>'rw',
404 isa=>Object,
405 lazy_build=>1,
406 handles=>$method_dispatch->{reader},
e471ab87 407);
408
4bea1fe7 409=head2 write_handler
410
411Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
412as well as methods that don't write or read that can be called on only one
413storage, methods that return a C<$dbh>, and any methods that don't make sense to
414run on a replicant.
415
416=cut
417
418has 'write_handler' => (
419 is=>'ro',
420 isa=>Object,
421 lazy_build=>1,
422 handles=>$method_dispatch->{writer},
7f4433eb 423);
424
4bea1fe7 425
6d766626 426
b2e4d522 427has _master_connect_info_opts =>
428 (is => 'rw', isa => HashRef, default => sub { {} });
429
430=head2 around: connect_info
431
48580715 432Preserves master's C<connect_info> options (for merging with replicants).
433Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 434C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
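
The merge order can be sketched in isolation. This is a minimal,
standalone illustration of the C<LEFT_PRECEDENT> behaviour of L<Hash::Merge>
as it is used in the loop below; the option values are made up for the example
and are not defaults of this class.

  use strict;
  use warnings;
  use Hash::Merge;

  my $merge = Hash::Merge->new('LEFT_PRECEDENT');

  # Hypothetical connect_info hashrefs, in the order they were passed.
  my @hashes = (
    { AutoCommit => 1,   quote_char => '"' },
    { quote_char => '`', name_sep   => '.' },
  );

  # Each new hashref goes on the left, the options gathered so far on the right.
  my %opts;
  %opts = %{ $merge->merge($_, \%opts) } for @hashes;

  # The later hashref wins on conflict, so quote_char is now '`';
  # AutoCommit and name_sep are carried over unchanged.

Because each new hashref is merged on the left, options given later in
C<connect_info> override earlier ones.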
b2e4d522 435
436=cut
437
438around connect_info => sub {
439 my ($next, $self, $info, @extra) = @_;
440
282a9a4f 441 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 442
b2e4d522 443 my %opts;
444 for my $arg (@$info) {
445 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 446 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 447 }
b2e4d522 448 delete $opts{dsn};
449
dcdf7b2c 450 if ($opts{pool_type} || $opts{pool_args}) {
451 $self->pool_type(delete $opts{pool_type})
452 if $opts{pool_type};
453
b88b85e7 454 $self->pool_args(
e666c5fd 455 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 456 );
dcdf7b2c 457
64ae1667 458 ## Since we possibly changed the pool_args, we need to clear the current
459 ## pool object so that next time it is used it will be rebuilt.
460 $self->clear_pool;
dcdf7b2c 461 }
462
463 if ($opts{balancer_type} || $opts{balancer_args}) {
464 $self->balancer_type(delete $opts{balancer_type})
465 if $opts{balancer_type};
466
b88b85e7 467 $self->balancer_args(
e666c5fd 468 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 469 );
dcdf7b2c 470
67c43863 471 $self->balancer($self->_build_balancer)
6f7344b8 472 if $self->balancer;
dcdf7b2c 473 }
474
b2e4d522 475 $self->_master_connect_info_opts(\%opts);
476
cca282b6 477 my @res;
478 if (wantarray) {
0ce2d0d5 479 @res = $self->$next($info, @extra);
480 } else {
cca282b6 481 $res[0] = $self->$next($info, @extra);
0ce2d0d5 482 }
483
fd4eb9c2 484 # Make sure master is blessed into the correct class and apply role to it.
485 my $master = $self->master;
486 $master->_determine_driver;
487 Moose::Meta::Class->initialize(ref $master);
cea43436 488
ec0946db 489 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 490
491 # link pool back to master
492 $self->pool->master($master);
0ce2d0d5 493
cca282b6 494 wantarray ? @res : $res[0];
b2e4d522 495};
496
26ab719a 497=head1 METHODS
2bf79155 498
26ab719a 499This class defines the following methods.
2bf79155 500
c354902c 501=head2 BUILDARGS
2bf79155 502
faaba25f 503L<DBIx::Class::Schema>, when instantiating its storage, passes itself as the
2ce6e9a6 504first argument. So we need to massage the arguments a bit so that all the
505bits get put into the correct places.
2bf79155 506
507=cut
508
c354902c 509sub BUILDARGS {
fd323bf1 510 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 511
c354902c 512 return {
6f7344b8 513 schema=>$schema,
514 %$storage_type_args,
515 @args
c354902c 516 }
517}
2bf79155 518
cb6ec758 519=head2 _build_master
2bf79155 520
cb6ec758 521Lazy builder for the L</master> attribute.
2bf79155 522
523=cut
524
cb6ec758 525sub _build_master {
2ce6e9a6 526 my $self = shift @_;
ee356d00 527 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 528 $master
106d5f3b 529}
530
26ab719a 531=head2 _build_pool
2bf79155 532
26ab719a 533Lazy builder for the L</pool> attribute.
2bf79155 534
535=cut
536
26ab719a 537sub _build_pool {
64cdad22 538 my $self = shift @_;
539 $self->create_pool(%{$self->pool_args});
2bf79155 540}
541
26ab719a 542=head2 _build_balancer
2bf79155 543
cb6ec758 544Lazy builder for the L</balancer> attribute. This takes a Pool object so that
545the balancer knows which pool it's balancing.
2bf79155 546
547=cut
548
26ab719a 549sub _build_balancer {
64cdad22 550 my $self = shift @_;
551 $self->create_balancer(
6f7344b8 552 pool=>$self->pool,
64cdad22 553 master=>$self->master,
554 %{$self->balancer_args},
555 );
2bf79155 556}
557
cb6ec758 558=head2 _build_write_handler
2bf79155 559
cb6ec758 560Lazy builder for the L</write_handler> attribute. The default is to set this to
561the L</master>.
50336325 562
563=cut
564
cb6ec758 565sub _build_write_handler {
64cdad22 566 return shift->master;
cb6ec758 567}
50336325 568
cb6ec758 569=head2 _build_read_handler
2bf79155 570
cb6ec758 571Lazy builder for the L</read_handler> attribute. The default is to set this to
572the L</balancer>.
2bf79155 573
574=cut
575
cb6ec758 576sub _build_read_handler {
64cdad22 577 return shift->balancer;
cb6ec758 578}
50336325 579
cb6ec758 580=head2 around: connect_replicants
2bf79155 581
cb6ec758 582All calls to connect_replicants need to have an existing $schema tacked onto
b2e4d522 583the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any C<connect_info>
584options merged with the master's, with replicant opts having higher priority.
955a6df6 585
cb6ec758 586=cut
955a6df6 587
b2e4d522 588around connect_replicants => sub {
589 my ($next, $self, @args) = @_;
590
591 for my $r (@args) {
592 $r = [ $r ] unless reftype $r eq 'ARRAY';
593
1a58752c 594 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 595 if ref $r->[0] && reftype $r->[0] eq 'CODE';
596
597# any connect_info options?
598 my $i = 0;
599 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
600
6f7344b8 601# make one if none
b2e4d522 602 $r->[$i] = {} unless $r->[$i];
603
604# merge if two hashes
b88b85e7 605 my @hashes = @$r[$i .. $#{$r}];
606
1a58752c 607 $self->throw_exception('invalid connect_info options')
b88b85e7 608 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
609
1a58752c 610 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 611 if @hashes > 2;
612
282a9a4f 613 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 614 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 615
616# delete them
b2e4d522 617 splice @$r, $i+1, ($#{$r} - $i), ();
618
0bd8e058 619# make sure master/replicants opts don't clash
620 my %master_opts = %{ $self->_master_connect_info_opts };
621 if (exists $opts{dbh_maker}) {
622 delete @master_opts{qw/dsn user password/};
623 }
624 delete $master_opts{dbh_maker};
625
b2e4d522 626# merge with master
e666c5fd 627 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 628
629# update
630 $r->[$i] = \%opts;
631 }
632
633 $self->$next($self->schema, @args);
955a6df6 634};
2bf79155 635
2bf79155 636=head2 all_storages
637
638Returns an array of all the connected storage backends. The first element
639in the returned array is the master, and the remaining elements are the
640replicants.
641
642=cut
643
644sub all_storages {
64cdad22 645 my $self = shift @_;
646 return grep {defined $_ && blessed $_} (
647 $self->master,
6412a592 648 values %{ $self->replicants },
64cdad22 649 );
2bf79155 650}
651
c4d3fae2 652=head2 execute_reliably ($coderef, ?@args)
653
654Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 655use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 656restores the original state.
657
658Example:
659
64cdad22 660 my $reliably = sub {
661 my $name = shift @_;
662 $schema->resultset('User')->create({name=>$name});
fd323bf1 663 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 664 return $user_rs;
665 };
c4d3fae2 666
64cdad22 667 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 668
669Use this when you must be certain of your database state, such as when you just
670inserted something and need to get a resultset including it, etc.
671
672=cut
673
674sub execute_reliably {
64cdad22 675 my ($self, $coderef, @args) = @_;
d4daee7b 676
64cdad22 677 unless( ref $coderef eq 'CODE') {
678 $self->throw_exception('Second argument must be a coderef');
679 }
d4daee7b 680
64cdad22 681 ##Get copy of master storage
682 my $master = $self->master;
d4daee7b 683
64cdad22 684 ##Get whatever the current read handler is
685 my $current = $self->read_handler;
d4daee7b 686
64cdad22 687 ##Set the read handler to master
688 $self->read_handler($master);
d4daee7b 689
64cdad22 690 ## do whatever the caller needs
691 my @result;
692 my $want_array = wantarray;
d4daee7b 693
ed7ab0f4 694 try {
64cdad22 695 if($want_array) {
696 @result = $coderef->(@args);
697 } elsif(defined $want_array) {
698 ($result[0]) = ($coderef->(@args));
ed213e85 699 } else {
64cdad22 700 $coderef->(@args);
6f7344b8 701 }
ed7ab0f4 702 } catch {
703 $self->throw_exception("coderef returned an error: $_");
704 } finally {
705 ##Reset to the original state
706 $self->read_handler($current);
64cdad22 707 };
d4daee7b 708
cca282b6 709 return wantarray ? @result : $result[0];
c4d3fae2 710}
711
cb6ec758 712=head2 set_reliable_storage
713
714Sets the current $schema to be 'reliable', that is, all queries, both read and
715write, are sent to the master.
d4daee7b 716
cb6ec758 717=cut
718
719sub set_reliable_storage {
64cdad22 720 my $self = shift @_;
721 my $schema = $self->schema;
722 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 723
64cdad22 724 $schema->storage->read_handler($write_handler);
cb6ec758 725}
726
727=head2 set_balanced_storage
728
729Sets the current $schema to use the L</balancer> for all reads, while all
48580715 730writes are sent to the master only.
d4daee7b 731
cb6ec758 732=cut
733
734sub set_balanced_storage {
64cdad22 735 my $self = shift @_;
736 my $schema = $self->schema;
bd5da369 737 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 738
bd5da369 739 $schema->storage->read_handler($balanced_handler);
cb6ec758 740}
2bf79155 741
742=head2 connected
743
744Check that the master and at least one of the replicants is connected.
745
746=cut
747
748sub connected {
64cdad22 749 my $self = shift @_;
750 return
751 $self->master->connected &&
752 $self->pool->connected_replicants;
2bf79155 753}
754
2bf79155 755=head2 ensure_connected
756
757Make sure all the storages are connected.
758
759=cut
760
761sub ensure_connected {
64cdad22 762 my $self = shift @_;
763 foreach my $source ($self->all_storages) {
764 $source->ensure_connected(@_);
765 }
2bf79155 766}
767
2bf79155 768=head2 limit_dialect
769
770Set the limit_dialect for all existing storages
771
772=cut
773
774sub limit_dialect {
64cdad22 775 my $self = shift @_;
776 foreach my $source ($self->all_storages) {
777 $source->limit_dialect(@_);
778 }
f3e9f010 779 return $self->master->limit_dialect;
2bf79155 780}
781
2bf79155 782=head2 quote_char
783
784Set the quote_char for all existing storages
785
786=cut
787
788sub quote_char {
64cdad22 789 my $self = shift @_;
790 foreach my $source ($self->all_storages) {
791 $source->quote_char(@_);
792 }
3fbe08e3 793 return $self->master->quote_char;
2bf79155 794}
795
2bf79155 796=head2 name_sep
797
798Set the name_sep for all existing storages
799
800=cut
801
802sub name_sep {
64cdad22 803 my $self = shift @_;
804 foreach my $source ($self->all_storages) {
805 $source->name_sep(@_);
806 }
3fbe08e3 807 return $self->master->name_sep;
2bf79155 808}
809
2bf79155 810=head2 set_schema
811
812Set the schema object for all existing storages
813
814=cut
815
816sub set_schema {
64cdad22 817 my $self = shift @_;
818 foreach my $source ($self->all_storages) {
819 $source->set_schema(@_);
820 }
2bf79155 821}
822
2bf79155 823=head2 debug
824
825set a debug flag across all storages
826
827=cut
828
829sub debug {
64cdad22 830 my $self = shift @_;
3fbe08e3 831 if(@_) {
832 foreach my $source ($self->all_storages) {
833 $source->debug(@_);
6f7344b8 834 }
64cdad22 835 }
3fbe08e3 836 return $self->master->debug;
2bf79155 837}
838
2bf79155 839=head2 debugobj
840
cea43436 841set a debug object
2bf79155 842
843=cut
844
845sub debugobj {
64cdad22 846 my $self = shift @_;
cea43436 847 return $self->master->debugobj(@_);
2bf79155 848}
849
2bf79155 850=head2 debugfh
851
cea43436 852set a debugfh object
2bf79155 853
854=cut
855
856sub debugfh {
64cdad22 857 my $self = shift @_;
cea43436 858 return $self->master->debugfh(@_);
2bf79155 859}
860
2bf79155 861=head2 debugcb
862
cea43436 863set a debug callback
2bf79155 864
865=cut
866
867sub debugcb {
64cdad22 868 my $self = shift @_;
cea43436 869 return $self->master->debugcb(@_);
2bf79155 870}
871
2bf79155 872=head2 disconnect
873
874disconnect everything
875
876=cut
877
878sub disconnect {
64cdad22 879 my $self = shift @_;
880 foreach my $source ($self->all_storages) {
881 $source->disconnect(@_);
882 }
2bf79155 883}
884
b2e4d522 885=head2 cursor_class
886
887set cursor class on all storages, or return master's
888
889=cut
890
891sub cursor_class {
892 my ($self, $cursor_class) = @_;
893
894 if ($cursor_class) {
895 $_->cursor_class($cursor_class) for $self->all_storages;
896 }
897 $self->master->cursor_class;
898}
d4daee7b 899
3244fdcc 900=head2 cursor
901
902set cursor class on all storages, or return master's, alias for L</cursor_class>
903above.
904
905=cut
906
907sub cursor {
908 my ($self, $cursor_class) = @_;
909
910 if ($cursor_class) {
911 $_->cursor($cursor_class) for $self->all_storages;
912 }
913 $self->master->cursor;
914}
915
916=head2 unsafe
917
918sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
919master's current setting
920
921=cut
922
923sub unsafe {
924 my $self = shift;
925
926 if (@_) {
927 $_->unsafe(@_) for $self->all_storages;
928 }
929
930 return $self->master->unsafe;
931}
932
933=head2 disable_sth_caching
934
935sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
936or returns master's current setting
937
938=cut
939
940sub disable_sth_caching {
941 my $self = shift;
942
943 if (@_) {
944 $_->disable_sth_caching(@_) for $self->all_storages;
945 }
946
947 return $self->master->disable_sth_caching;
948}
949
950=head2 lag_behind_master
951
952returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
953setting
954
955=cut
956
957sub lag_behind_master {
958 my $self = shift;
959
960 return max map $_->lag_behind_master, values %{ $self->replicants };
fd323bf1 961}
3244fdcc 962
963=head2 is_replicating
964
965returns true if all replicants return true for
966L<DBIx::Class::Storage::DBI/is_replicating>
967
968=cut
969
970sub is_replicating {
971 my $self = shift;
972
973 return !grep { !$_->is_replicating } values %{ $self->replicants };
974}
975
976=head2 connect_call_datetime_setup
977
978calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
979
980=cut
981
982sub connect_call_datetime_setup {
983 my $self = shift;
984 $_->connect_call_datetime_setup for $self->all_storages;
985}
986
987sub _populate_dbh {
988 my $self = shift;
989 $_->_populate_dbh for $self->all_storages;
990}
991
992sub _connect {
993 my $self = shift;
994 $_->_connect for $self->all_storages;
995}
996
997sub _rebless {
998 my $self = shift;
999 $_->_rebless for $self->all_storages;
1000}
1001
1002sub _determine_driver {
1003 my $self = shift;
1004 $_->_determine_driver for $self->all_storages;
1005}
1006
1007sub _driver_determined {
1008 my $self = shift;
fd323bf1 1009
3244fdcc 1010 if (@_) {
1011 $_->_driver_determined(@_) for $self->all_storages;
1012 }
1013
1014 return $self->master->_driver_determined;
1015}
1016
1017sub _init {
1018 my $self = shift;
fd323bf1 1019
3244fdcc 1020 $_->_init for $self->all_storages;
1021}
1022
1023sub _run_connection_actions {
1024 my $self = shift;
fd323bf1 1025
3244fdcc 1026 $_->_run_connection_actions for $self->all_storages;
1027}
1028
1029sub _do_connection_actions {
1030 my $self = shift;
fd323bf1 1031
3244fdcc 1032 if (@_) {
1033 $_->_do_connection_actions(@_) for $self->all_storages;
1034 }
1035}
1036
1037sub connect_call_do_sql {
1038 my $self = shift;
1039 $_->connect_call_do_sql(@_) for $self->all_storages;
1040}
1041
1042sub disconnect_call_do_sql {
1043 my $self = shift;
1044 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1045}
1046
1047sub _seems_connected {
1048 my $self = shift;
1049
1050 return min map $_->_seems_connected, $self->all_storages;
1051}
1052
1053sub _ping {
1054 my $self = shift;
1055
1056 return min map $_->_ping, $self->all_storages;
1057}
1058
bbdda281 1059# not using the normalized_version, because we want to preserve
1060# version numbers much longer than the conventional xxx.yyyzzz
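# Worked example (illustrative input): '5.1.49' is split on non-digits into
# (5, 1, 49), the format becomes '%d.%06d%06d', and the numified version is
# '5.000001000049'. Each part after the first is zero-padded to six digits,
# so the results compare correctly as plain numbers.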
7da56142 1061my $numify_ver = sub {
1062 my $ver = shift;
1063 my @numparts = split /\D+/, $ver;
bbdda281 1064 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1065
1066 return sprintf $format, @numparts;
1067};
fecb38cb 1068sub _server_info {
1069 my $self = shift;
1070
bbdda281 1071 if (not $self->_dbh_details->{info}) {
1072 $self->_dbh_details->{info} = (
fd323bf1 1073 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1074 map [ $numify_ver->($_->{dbms_version}), $_ ],
1075 map $_->_server_info, $self->all_storages
1076 )->[1];
fecb38cb 1077 }
1078
bbdda281 1079 return $self->next::method;
fecb38cb 1080}
1081
1082sub _get_server_version {
1083 my $self = shift;
1084
1085 return $self->_server_info->{dbms_version};
1086}
1087
7e38d850 1088=head1 GOTCHAS
1089
1090Due to the fact that replicants can lag behind a master, you must take care to
1091make sure you use one of the methods to force read queries to a master should
1092you need realtime data integrity. For example, if you insert a row, and then
1093immediately re-read it from the database (say, by doing $row->discard_changes)
1094or you insert a row and then immediately build a query that expects that row
1095to be an item, you should force the master to handle reads. Otherwise, due to
1096the lag, there is no certainty your data will be in the expected state.
1097
1098For data integrity, all transactions automatically use the master storage for
1099all read and write queries. Using a transaction is the preferred and recommended
1100method to force the master to handle all read queries.
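
A minimal sketch of the transaction approach, assuming a C<User> result source
with a C<name> column as in the L</execute_reliably> example:

  my $user = $schema->txn_do(sub {
    my $rs = $schema->resultset('User');

    # Both the write and the read that follows run against the master.
    $rs->create({ name => 'John' });
    return $rs->find({ name => 'John' });
  });

The row is read back inside the same transaction, so it does not depend on the
replicants having caught up.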
1101
1102Otherwise, you can force a single query to use the master with the 'force_pool'
1103attribute:
1104
1105 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1106
1107This attribute will safely be ignored by non-replicated storages, so you can use
1108the same code for both types of systems.
1109
1110Lastly, you can use the L</execute_reliably> method, which works very much like
1111a transaction.
1112
1113For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1114and L</set_balanced_storage>, however this operates at a global level and is not
1115suitable if you have a shared Schema object being used by multiple processes,
1116such as on a web application server. You can get around this limitation by
1117using the Schema clone method.
1118
1119 my $new_schema = $schema->clone;
1120 $new_schema->storage->set_reliable_storage;
d4daee7b 1121
7e38d850 1122 ## $new_schema will use only the Master storage for all reads/writes while
1123 ## the $schema object will use replicated storage.
1124
f5d3a5de 1125=head1 AUTHOR
1126
64cdad22 1127 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1128
c4d3fae2 1129Based on code originated by:
f5d3a5de 1130
64cdad22 1131 Norbert Csongrádi <bert@cpan.org>
1132 Peter Siklósi <einon@einon.hu>
2156bbdd 1133
f5d3a5de 1134=head1 LICENSE
1135
1136You may distribute this code under the same terms as Perl itself.
1137
1138=cut
1139
c354902c 1140__PACKAGE__->meta->make_immutable;
1141
f5d3a5de 11421;