Fix exception on complex update/delete under a replicated setup
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
c96bf2bc 3use warnings;
4use strict;
5
ecb65397 6BEGIN {
c96bf2bc 7 require DBIx::Class::Optional::Dependencies;
8 if ( my $missing = DBIx::Class::Optional::Dependencies->req_missing_for('replicated') ) {
9 die "The following modules are required for Replicated storage support: $missing\n";
10 }
ecb65397 11}
12
b2e4d522 13use Moose;
26ab719a 14use DBIx::Class::Storage::DBI;
2bf79155 15use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 16use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 17use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 18use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 19use Scalar::Util 'reftype';
e666c5fd 20use Hash::Merge;
7da56142 21use List::Util qw/min max reduce/;
1abccf54 22use Context::Preserve 'preserve_context';
ed7ab0f4 23use Try::Tiny;
9901aad7 24
25use namespace::clean -except => 'meta';
2bf79155 26
27=head1 NAME
28
ecb65397 29DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 30
31=head1 SYNOPSIS
32
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
tasks.

You should set the C<storage_type> attribute to a replicated type. You should
also define your arguments, such as which balancer you want and any arguments
that the Pool object should get.
40
ce854fd3 41 my $schema = Schema::Class->clone;
592fa86f 42 $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
ce854fd3 43 $schema->connection(...);
d4daee7b 44
fd323bf1 45Next, you need to add in the Replicants. Basically this is an array of
3da4f736 46arrayrefs, where each arrayref is database connect information. Think of these
47arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 48
64cdad22 49 $schema->storage->connect_replicants(
50 [$dsn1, $user, $pass, \%opts],
51 [$dsn2, $user, $pass, \%opts],
52 [$dsn3, $user, $pass, \%opts],
53 );
d4daee7b 54
Now just use the $schema as you normally would. All reads are automatically
delegated to the replicants, while writes go to the master.
57
7e38d850 58 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 59
3da4f736 60You can force a given query to use a particular storage using the search
61attribute 'force_pool'. For example:
d4daee7b 62
d7c37d66 63 my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 64
Now $rs will force everything (both reads and writes) to use whatever was set up
as the master storage. 'master' is hardcoded to always point to the Master,
but you can also use any Replicant name. Please see
L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 69
70Also see transactions and L</execute_reliably> for alternative ways to
71force read traffic to the master. In general, you should wrap your statements
72in a transaction when you are reading and writing to the same tables at the
73same time, since your replicants will often lag a bit behind the master.
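
As a hedged illustration of the transaction approach: the sketch below assumes
a connected C<$schema> and a hypothetical C<Artist> source with a C<name>
column. Because the read happens inside the transaction, it is served by the
master and should see the row that was just created.

  $schema->txn_do(sub {
    # both statements run against the master for the duration of the transaction
    my $artist = $schema->resultset('Artist')->create({ name => 'New Artist' });
    my $fresh  = $schema->resultset('Artist')->find($artist->id);
  });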
212cc5c2 74
If you have a multi-statement read-only transaction, you can force it to select
a random server in the pool by:

  my $rs = $schema->resultset('Source')->search( undef,
    { force_pool => $schema->storage->read_handler->next_storage }
  );
d4daee7b 81
2bf79155 82=head1 DESCRIPTION
83
Warning: This class is marked BETA. It has been running a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this first in
development and pass along your experiences/bug fixes.

This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database; all read-type queries (SELECTs) go to the slave databases.
94
Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
handle gets delegated to one of the two attributes: L</read_handler> or
L</write_handler>. Additionally, some methods need to be distributed
to all existing storages. This way our storage class is a drop-in replacement
for L<DBIx::Class::Storage::DBI>.

Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.
103
bca099a3 104=head1 NOTES
105
48580715 106The consistency between master and replicants is database specific. The Pool
faaba25f 107gives you a method to validate its replicants, removing and replacing them
7e38d850 108when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 109to force a query to run against Master when needed.
110
111=head1 REQUIREMENTS
112
a34b0c89 113Replicated Storage has additional requirements not currently part of
aa8b2277 114L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 115
116=head1 ATTRIBUTES
117
118This class defines the following attributes.
119
2ce6e9a6 120=head2 schema
121
The underlying L<DBIx::Class::Schema> object this storage is attached to.
123
124=cut
125
126has 'schema' => (
127 is=>'rw',
6a151f58 128 isa=>DBICSchema,
2ce6e9a6 129 weak_ref=>1,
130 required=>1,
131);
132
26ab719a 133=head2 pool_type
2bf79155 134
fd323bf1 135Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 136to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 137
138=cut
139
26ab719a 140has 'pool_type' => (
dcdf7b2c 141 is=>'rw',
41916570 142 isa=>ClassName,
2ce6e9a6 143 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 144 handles=>{
145 'create_pool' => 'new',
146 },
2bf79155 147);
148
f068a139 149=head2 pool_args
150
Contains a hashref of initialization information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 153
154=cut
155
156has 'pool_args' => (
dcdf7b2c 157 is=>'rw',
41916570 158 isa=>HashRef,
64cdad22 159 lazy=>1,
64cdad22 160 default=>sub { {} },
f068a139 161);
162
163
26ab719a 164=head2 balancer_type
2bf79155 165
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
168
169=cut
170
26ab719a 171has 'balancer_type' => (
dcdf7b2c 172 is=>'rw',
9901aad7 173 isa=>BalancerClassNamePart,
2ce6e9a6 174 coerce=>1,
175 required=>1,
176 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 177 handles=>{
178 'create_balancer' => 'new',
179 },
2bf79155 180);
181
17b05c13 182=head2 balancer_args
183
Contains a hashref of initialization information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
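
A minimal sketch of supplying these arguments through C<storage_type>, as in the
L</SYNOPSIS>. The C<auto_validate_every> key is the balancer option this class
delegates to (see L</balancer>); the value of 5 is only an illustrative choice.

  $schema->storage_type([
    '::DBI::Replicated',
    {
      balancer_type => '::Random',
      balancer_args => { auto_validate_every => 5 },
    },
  ]);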
17b05c13 186
187=cut
188
189has 'balancer_args' => (
dcdf7b2c 190 is=>'rw',
41916570 191 isa=>HashRef,
64cdad22 192 lazy=>1,
193 required=>1,
194 default=>sub { {} },
17b05c13 195);
196
26ab719a 197=head2 pool
2bf79155 198
370f78d4 199Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 200container class for one or more replicated databases.
2bf79155 201
202=cut
203
26ab719a 204has 'pool' => (
64cdad22 205 is=>'ro',
206 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
207 lazy_build=>1,
208 handles=>[qw/
6f7344b8 209 connect_replicants
64cdad22 210 replicants
211 has_replicants
212 /],
2bf79155 213);
214
26ab719a 215=head2 balancer
2bf79155 216
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and chooses how to spread the query load across the replicants in it.
2bf79155 219
26ab719a 220=cut
2bf79155 221
26ab719a 222has 'balancer' => (
dcdf7b2c 223 is=>'rw',
64cdad22 224 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
225 lazy_build=>1,
226 handles=>[qw/auto_validate_every/],
26ab719a 227);
2bf79155 228
cb6ec758 229=head2 master
230
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
236
237=cut
238
239has 'master' => (
64cdad22 240 is=> 'ro',
6a151f58 241 isa=>DBICStorageDBI,
64cdad22 242 lazy_build=>1,
cb6ec758 243);
244
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
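
The delegation idea can be sketched without Moose. Everything in the toy below
(C<Toy::Backend>, C<Toy::Replicated> and the two-method dispatch table) is
hypothetical and much smaller than the real table that follows; it only shows
how a method name is routed to either the writer or the reader.

  use strict;
  use warnings;

  {
    package Toy::Backend;      # stand-in for a real storage object
    sub new    { my ($class, $name) = @_; return bless { name => $name }, $class }
    sub insert { return "$_[0]{name} handled insert" }
    sub select { return "$_[0]{name} handled select" }
  }

  {
    package Toy::Replicated;   # stand-in for this class
    my %dispatch = (writer => ['insert'], reader => ['select']);

    sub new { my ($class, %args) = @_; return bless { %args }, $class }

    # install one forwarding method per entry in the dispatch table
    for my $role (keys %dispatch) {
      for my $method (@{ $dispatch{$role} }) {
        no strict 'refs';
        *{"Toy::Replicated::$method"} = sub {
          my $self = shift;
          return $self->{$role}->$method(@_);
        };
      }
    }
  }

  my $storage = Toy::Replicated->new(
    writer => Toy::Backend->new('master'),
    reader => Toy::Backend->new('replicant'),
  );
  print $storage->insert, "\n";   # master handled insert
  print $storage->select, "\n";   # replicant handled select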
249
cb6ec758 250=cut
251
4bea1fe7 252my $method_dispatch = {
253 writer => [qw/
64cdad22 254 on_connect_do
6f7344b8 255 on_disconnect_do
3244fdcc 256 on_connect_call
257 on_disconnect_call
64cdad22 258 connect_info
3244fdcc 259 _connect_info
64cdad22 260 throw_exception
261 sql_maker
262 sqlt_type
263 create_ddl_dir
264 deployment_statements
265 datetime_parser
6f7344b8 266 datetime_parser_type
267 build_datetime_parser
64cdad22 268 last_insert_id
269 insert
64cdad22 270 update
271 delete
272 dbh
2ce6e9a6 273 txn_begin
64cdad22 274 txn_do
275 txn_commit
276 txn_rollback
2ce6e9a6 277 txn_scope_guard
90d7422f 278 _exec_txn_rollback
279 _exec_txn_begin
280 _exec_txn_commit
64cdad22 281 deploy
0180bef9 282 with_deferred_fk_checks
6f7344b8 283 dbh_do
2ce6e9a6 284 _prep_for_execute
6f7344b8 285 is_datatype_numeric
286 _count_select
6f7344b8 287 svp_rollback
288 svp_begin
289 svp_release
e398f77e 290 relname_to_table_alias
3244fdcc 291 _dbh_last_insert_id
3244fdcc 292 _default_dbi_connect_attributes
293 _dbi_connect_info
b9ca4ff1 294 _dbic_connect_attributes
3244fdcc 295 auto_savepoint
0e773352 296 _query_start
3244fdcc 297 _query_end
0e773352 298 _format_for_trace
299 _dbi_attrs_for_bind
3244fdcc 300 bind_attribute_by_data_type
301 transaction_depth
302 _dbh
303 _select_args
52cef7e3 304 _dbh_execute_for_fetch
3244fdcc 305 _sql_maker
3244fdcc 306 _dbh_execute_inserts_with_no_binds
307 _select_args_to_query
5e782048 308 _gen_sql_bind
3244fdcc 309 _svp_generate_name
3244fdcc 310 _normalize_connect_info
311 _parse_connect_do
3244fdcc 312 savepoints
3244fdcc 313 _sql_maker_opts
7d6c28b7 314 _use_multicolumn_in
3244fdcc 315 _conn_pid
3244fdcc 316 _dbh_autocommit
317 _native_data_type
318 _get_dbh
319 sql_maker_class
2a6dda4b 320 insert_bulk
321 _insert_bulk
3244fdcc 322 _execute
323 _do_query
3244fdcc 324 _dbh_execute
5b4f3fd0 325 /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
4bea1fe7 326 reader => [qw/
327 select
328 select_single
329 columns_info_for
330 _dbh_columns_info_for
331 _select
332 /],
333 unimplemented => [qw/
334 _arm_global_destructor
335 _verify_pid
336
337 get_use_dbms_capability
338 set_use_dbms_capability
339 get_dbms_capability
340 set_dbms_capability
341 _dbh_details
342 _dbh_get_info
343
75d3bdb2 344 _determine_connector_driver
37b5ab51 345 _extract_driver_from_connect_info
75d3bdb2 346 _describe_connection
347 _warn_undetermined_driver
348
4bea1fe7 349 sql_limit_dialect
350 sql_quote_char
351 sql_name_sep
352
4bea1fe7 353 _prefetch_autovalues
fabbd5cc 354 _perform_autoinc_retrieval
355 _autoinc_supplied_for_op
4bea1fe7 356
eec07bca 357 _resolve_bindattrs
358
4bea1fe7 359 _max_column_bytesize
360 _is_lob_type
361 _is_binary_lob_type
5efba7fc 362 _is_binary_type
4bea1fe7 363 _is_text_lob_type
402ac1c9 364
9930caaf 365 _prepare_sth
366 _bind_sth_params
4bea1fe7 367 /,(
368 # the capability framework
369 # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
370 grep
7d6c28b7 371 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
4bea1fe7 372 ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
373 )],
374};
375
376if (DBIx::Class::_ENV_::DBICTEST) {
377
378 my $seen;
379 for my $type (keys %$method_dispatch) {
380 for (@{$method_dispatch->{$type}}) {
381 push @{$seen->{$_}}, $type;
382 }
383 }
cb6ec758 384
4bea1fe7 385 if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
386 die(join "\n", '',
387 'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
388 (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
389 '',
390 );
391 }
bbdda281 392
4bea1fe7 393 if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
394 die(join "\n", '',
395 '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
396 @cant,
397 '',
398 );
399 }
400}
bbdda281 401
4bea1fe7 402for my $method (@{$method_dispatch->{unimplemented}}) {
403 __PACKAGE__->meta->add_method($method, sub {
70c28808 404 my $self = shift;
e705f529 405 $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
4bea1fe7 406 });
407}
31a8aaaf 408
4bea1fe7 409=head2 read_handler
584ea6e4 410
5529838f 411Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
584ea6e4 412
4bea1fe7 413=cut
414
415has 'read_handler' => (
416 is=>'rw',
417 isa=>Object,
418 lazy_build=>1,
419 handles=>$method_dispatch->{reader},
e471ab87 420);
421
4bea1fe7 422=head2 write_handler
423
5529838f 424Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
4bea1fe7 425as well as methods that don't write or read that can be called on only one
426storage, methods that return a C<$dbh>, and any methods that don't make sense to
427run on a replicant.
428
429=cut
430
431has 'write_handler' => (
432 is=>'ro',
433 isa=>Object,
434 lazy_build=>1,
435 handles=>$method_dispatch->{writer},
7f4433eb 436);
437
4bea1fe7 438
6d766626 439
b2e4d522 440has _master_connect_info_opts =>
441 (is => 'rw', isa => HashRef, default => sub { {} });
442
443=head2 around: connect_info
444
Preserves the master's C<connect_info> options (for merging with replicants).
Also sets any Replicated-related options from C<connect_info>, such as
C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
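
For example, the Replicated options can ride along in the options hashref of the
connect info. This is a sketch only: C<$dsn>, C<$user> and C<$pass> are
placeholders, the values are illustrative, and C<maximum_lag> is assumed to be
an attribute accepted by L<DBIx::Class::Storage::DBI::Replicated::Pool>.

  $schema->connection($dsn, $user, $pass, {
    balancer_type => '::Random',
    balancer_args => { auto_validate_every => 5 },
    pool_args     => { maximum_lag => 2 },
  });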
b2e4d522 448
449=cut
450
451around connect_info => sub {
452 my ($next, $self, $info, @extra) = @_;
453
1fccee7b 454 $self->throw_exception(
455 'connect_info can not be retrieved from a replicated storage - '
456 . 'accessor must be called on a specific pool instance'
457 ) unless defined $info;
458
282a9a4f 459 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 460
b2e4d522 461 my %opts;
462 for my $arg (@$info) {
463 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 464 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 465 }
b2e4d522 466 delete $opts{dsn};
467
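  # a hash slice in boolean context only tests its last element, so check
  # each option explicitly
  if (grep { defined $opts{$_} } qw/pool_type pool_args/) {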
469 $self->pool_type(delete $opts{pool_type})
470 if $opts{pool_type};
471
b88b85e7 472 $self->pool_args(
e666c5fd 473 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 474 );
dcdf7b2c 475
64ae1667 476 ## Since we possibly changed the pool_args, we need to clear the current
477 ## pool object so that next time it is used it will be rebuilt.
478 $self->clear_pool;
dcdf7b2c 479 }
480
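  # a hash slice in boolean context only tests its last element, so check
  # each option explicitly
  if (grep { defined $opts{$_} } qw/balancer_type balancer_args/) {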
482 $self->balancer_type(delete $opts{balancer_type})
483 if $opts{balancer_type};
484
b88b85e7 485 $self->balancer_args(
e666c5fd 486 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 487 );
dcdf7b2c 488
67c43863 489 $self->balancer($self->_build_balancer)
6f7344b8 490 if $self->balancer;
dcdf7b2c 491 }
492
b2e4d522 493 $self->_master_connect_info_opts(\%opts);
494
1abccf54 495 return preserve_context {
496 $self->$next($info, @extra);
497 } after => sub {
498 # Make sure master is blessed into the correct class and apply role to it.
499 my $master = $self->master;
500 $master->_determine_driver;
501 Moose::Meta::Class->initialize(ref $master);
cea43436 502
1abccf54 503 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
0ce2d0d5 504
1abccf54 505 # link pool back to master
506 $self->pool->master($master);
507 };
b2e4d522 508};
509
26ab719a 510=head1 METHODS
2bf79155 511
26ab719a 512This class defines the following methods.
2bf79155 513
c354902c 514=head2 BUILDARGS
2bf79155 515
When instantiating its storage, L<DBIx::Class::Schema> passes itself as the
first argument, so we need to massage the arguments a bit so that all the
bits get put into the correct places.
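
The argument massaging can be sketched in plain Perl. The stand-in function and
the string used in place of a schema object below are hypothetical; only the
shape of the returned hashref mirrors what this method produces.

  use strict;
  use warnings;

  # stand-in for BUILDARGS: (class, schema, storage_type args, extra pairs)
  sub buildargs_sketch {
    my ($class, $schema, $storage_type_args, @args) = @_;
    return { schema => $schema, %$storage_type_args, @args };
  }

  my $args = buildargs_sketch(
    'My::Storage', 'SCHEMA', { balancer_type => '::Random' },
  );
  print join(', ', map { "$_=$args->{$_}" } sort keys %$args), "\n";
  # prints "balancer_type=::Random, schema=SCHEMA"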
2bf79155 519
520=cut
521
c354902c 522sub BUILDARGS {
fd323bf1 523 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 524
c354902c 525 return {
6f7344b8 526 schema=>$schema,
527 %$storage_type_args,
528 @args
c354902c 529 }
530}
2bf79155 531
cb6ec758 532=head2 _build_master
2bf79155 533
cb6ec758 534Lazy builder for the L</master> attribute.
2bf79155 535
536=cut
537
cb6ec758 538sub _build_master {
2ce6e9a6 539 my $self = shift @_;
ee356d00 540 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 541 $master
106d5f3b 542}
543
26ab719a 544=head2 _build_pool
2bf79155 545
26ab719a 546Lazy builder for the L</pool> attribute.
2bf79155 547
548=cut
549
26ab719a 550sub _build_pool {
64cdad22 551 my $self = shift @_;
552 $self->create_pool(%{$self->pool_args});
2bf79155 553}
554
26ab719a 555=head2 _build_balancer
2bf79155 556
cb6ec758 557Lazy builder for the L</balancer> attribute. This takes a Pool object so that
558the balancer knows which pool it's balancing.
2bf79155 559
560=cut
561
26ab719a 562sub _build_balancer {
64cdad22 563 my $self = shift @_;
564 $self->create_balancer(
6f7344b8 565 pool=>$self->pool,
64cdad22 566 master=>$self->master,
567 %{$self->balancer_args},
568 );
2bf79155 569}
570
cb6ec758 571=head2 _build_write_handler
2bf79155 572
cb6ec758 573Lazy builder for the L</write_handler> attribute. The default is to set this to
574the L</master>.
50336325 575
576=cut
577
cb6ec758 578sub _build_write_handler {
64cdad22 579 return shift->master;
cb6ec758 580}
50336325 581
cb6ec758 582=head2 _build_read_handler
2bf79155 583
cb6ec758 584Lazy builder for the L</read_handler> attribute. The default is to set this to
585the L</balancer>.
2bf79155 586
587=cut
588
cb6ec758 589sub _build_read_handler {
64cdad22 590 return shift->balancer;
cb6ec758 591}
50336325 592
cb6ec758 593=head2 around: connect_replicants
2bf79155 594
Every call to connect_replicants needs to have the existing $schema prepended
to its arguments, since L<DBIx::Class::Storage::DBI> needs it. Any
L<connect_info|DBIx::Class::Storage::DBI/connect_info>
options are merged with the master's, with replicant options having higher priority.
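
The precedence rule can be sketched with a plain hash merge. Note that this is a
shallow merge used for illustration, whereas the code below uses L<Hash::Merge>;
the option values are made up.

  use strict;
  use warnings;

  my %master_opts    = (quote_char => '"', name_sep => '.');
  my %replicant_opts = (quote_char => '`');

  # later pairs win, so replicant options override the master's
  my %merged = (%master_opts, %replicant_opts);
  print "$_=$merged{$_}\n" for sort keys %merged;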
955a6df6 599
cb6ec758 600=cut
955a6df6 601
b2e4d522 602around connect_replicants => sub {
603 my ($next, $self, @args) = @_;
604
605 for my $r (@args) {
606 $r = [ $r ] unless reftype $r eq 'ARRAY';
607
1a58752c 608 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 609 if ref $r->[0] && reftype $r->[0] eq 'CODE';
610
611# any connect_info options?
612 my $i = 0;
613 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
614
6f7344b8 615# make one if none
b2e4d522 616 $r->[$i] = {} unless $r->[$i];
617
618# merge if two hashes
b88b85e7 619 my @hashes = @$r[$i .. $#{$r}];
620
1a58752c 621 $self->throw_exception('invalid connect_info options')
b88b85e7 622 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
623
1a58752c 624 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 625 if @hashes > 2;
626
282a9a4f 627 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 628 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 629
630# delete them
b2e4d522 631 splice @$r, $i+1, ($#{$r} - $i), ();
632
0bd8e058 633# make sure master/replicants opts don't clash
634 my %master_opts = %{ $self->_master_connect_info_opts };
635 if (exists $opts{dbh_maker}) {
636 delete @master_opts{qw/dsn user password/};
637 }
638 delete $master_opts{dbh_maker};
639
b2e4d522 640# merge with master
e666c5fd 641 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 642
643# update
644 $r->[$i] = \%opts;
645 }
646
647 $self->$next($self->schema, @args);
955a6df6 648};
2bf79155 649
2bf79155 650=head2 all_storages
651
4a0eed52 652Returns an array of all the connected storage backends. The first element
653in the returned array is the master, and the rest are each of the
2bf79155 654replicants.
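
A short usage sketch, assuming a connected replicated C<$schema>: since the
master always comes first, it can be split off to leave only the replicants.

  my ($master, @replicants) = $schema->storage->all_storages;
  printf "master plus %d replicant(s)\n", scalar @replicants;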
655
656=cut
657
658sub all_storages {
64cdad22 659 my $self = shift @_;
660 return grep {defined $_ && blessed $_} (
661 $self->master,
6412a592 662 values %{ $self->replicants },
64cdad22 663 );
2bf79155 664}
665
c4d3fae2 666=head2 execute_reliably ($coderef, ?@args)
667
668Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 669use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 670restores the original state.
671
672Example:
673
64cdad22 674 my $reliably = sub {
675 my $name = shift @_;
676 $schema->resultset('User')->create({name=>$name});
fd323bf1 677 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 678 return $user_rs;
679 };
c4d3fae2 680
64cdad22 681 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 682
683Use this when you must be certain of your database state, such as when you just
684inserted something and need to get a resultset including it, etc.
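
The save-and-restore behaviour relies on Perl's C<local>. The toy below is a
hypothetical stand-in using a plain hashref and strings instead of real storage
objects; it only demonstrates that the handler is swapped for the duration of
the call and restored afterwards.

  use strict;
  use warnings;

  my $storage = { read_handler => 'balancer', master => 'master' };

  sub execute_reliably_sketch {
    my ($self, $code, @args) = @_;
    # temporarily point reads at the master; undone when this sub returns
    local $self->{read_handler} = $self->{master};
    return $code->(@args);
  }

  my $inside = execute_reliably_sketch($storage, sub { $storage->{read_handler} });
  print "inside: $inside, after: $storage->{read_handler}\n";
  # prints "inside: master, after: balancer"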
685
686=cut
687
688sub execute_reliably {
1abccf54 689 my $self = shift;
690 my $coderef = shift;
d4daee7b 691
64cdad22 692 unless( ref $coderef eq 'CODE') {
693 $self->throw_exception('Second argument must be a coderef');
694 }
d4daee7b 695
1abccf54 696 ## replace the current read handler for the remainder of the scope
697 local $self->{read_handler} = $self->master;
d4daee7b 698
1abccf54 699 my $args = \@_;
700 return try {
701 $coderef->(@$args);
ed7ab0f4 702 } catch {
703 $self->throw_exception("coderef returned an error: $_");
64cdad22 704 };
c4d3fae2 705}
706
cb6ec758 707=head2 set_reliable_storage
708
Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.
d4daee7b 711
cb6ec758 712=cut
713
714sub set_reliable_storage {
64cdad22 715 my $self = shift @_;
716 my $schema = $self->schema;
717 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 718
64cdad22 719 $schema->storage->read_handler($write_handler);
cb6ec758 720}
721
722=head2 set_balanced_storage
723
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
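
A usage sketch, assuming a connected replicated C<$schema>; the C<Source>
name is the placeholder used in the L</SYNOPSIS>.

  my $storage = $schema->storage;

  $storage->set_reliable_storage;   # reads now go to the master
  my $count = $schema->resultset('Source')->count;

  $storage->set_balanced_storage;   # reads are balanced across replicants again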
d4daee7b 726
cb6ec758 727=cut
728
729sub set_balanced_storage {
64cdad22 730 my $self = shift @_;
731 my $schema = $self->schema;
bd5da369 732 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 733
bd5da369 734 $schema->storage->read_handler($balanced_handler);
cb6ec758 735}
2bf79155 736
737=head2 connected
738
Check that the master and at least one of the replicants are connected.
740
741=cut
742
743sub connected {
64cdad22 744 my $self = shift @_;
745 return
746 $self->master->connected &&
747 $self->pool->connected_replicants;
2bf79155 748}
749
2bf79155 750=head2 ensure_connected
751
752Make sure all the storages are connected.
753
754=cut
755
756sub ensure_connected {
64cdad22 757 my $self = shift @_;
758 foreach my $source ($self->all_storages) {
759 $source->ensure_connected(@_);
760 }
2bf79155 761}
762
2bf79155 763=head2 limit_dialect
764
765Set the limit_dialect for all existing storages
766
767=cut
768
769sub limit_dialect {
64cdad22 770 my $self = shift @_;
771 foreach my $source ($self->all_storages) {
772 $source->limit_dialect(@_);
773 }
f3e9f010 774 return $self->master->limit_dialect;
2bf79155 775}
776
2bf79155 777=head2 quote_char
778
779Set the quote_char for all existing storages
780
781=cut
782
783sub quote_char {
64cdad22 784 my $self = shift @_;
785 foreach my $source ($self->all_storages) {
786 $source->quote_char(@_);
787 }
3fbe08e3 788 return $self->master->quote_char;
2bf79155 789}
790
2bf79155 791=head2 name_sep
792
793Set the name_sep for all existing storages
794
795=cut
796
797sub name_sep {
64cdad22 798 my $self = shift @_;
799 foreach my $source ($self->all_storages) {
800 $source->name_sep(@_);
801 }
3fbe08e3 802 return $self->master->name_sep;
2bf79155 803}
804
2bf79155 805=head2 set_schema
806
807Set the schema object for all existing storages
808
809=cut
810
811sub set_schema {
64cdad22 812 my $self = shift @_;
813 foreach my $source ($self->all_storages) {
814 $source->set_schema(@_);
815 }
2bf79155 816}
817
2bf79155 818=head2 debug
819
820set a debug flag across all storages
821
822=cut
823
824sub debug {
64cdad22 825 my $self = shift @_;
3fbe08e3 826 if(@_) {
827 foreach my $source ($self->all_storages) {
828 $source->debug(@_);
6f7344b8 829 }
64cdad22 830 }
3fbe08e3 831 return $self->master->debug;
2bf79155 832}
833
2bf79155 834=head2 debugobj
835
cea43436 836set a debug object
2bf79155 837
838=cut
839
840sub debugobj {
64cdad22 841 my $self = shift @_;
cea43436 842 return $self->master->debugobj(@_);
2bf79155 843}
844
2bf79155 845=head2 debugfh
846
cea43436 847set a debugfh object
2bf79155 848
849=cut
850
851sub debugfh {
64cdad22 852 my $self = shift @_;
cea43436 853 return $self->master->debugfh(@_);
2bf79155 854}
855
2bf79155 856=head2 debugcb
857
cea43436 858set a debug callback
2bf79155 859
860=cut
861
862sub debugcb {
64cdad22 863 my $self = shift @_;
cea43436 864 return $self->master->debugcb(@_);
2bf79155 865}
866
2bf79155 867=head2 disconnect
868
869disconnect everything
870
871=cut
872
873sub disconnect {
64cdad22 874 my $self = shift @_;
875 foreach my $source ($self->all_storages) {
876 $source->disconnect(@_);
877 }
2bf79155 878}
879
b2e4d522 880=head2 cursor_class
881
882set cursor class on all storages, or return master's
883
884=cut
885
886sub cursor_class {
887 my ($self, $cursor_class) = @_;
888
889 if ($cursor_class) {
890 $_->cursor_class($cursor_class) for $self->all_storages;
891 }
892 $self->master->cursor_class;
893}
d4daee7b 894
3244fdcc 895=head2 cursor
896
897set cursor class on all storages, or return master's, alias for L</cursor_class>
898above.
899
900=cut
901
902sub cursor {
903 my ($self, $cursor_class) = @_;
904
905 if ($cursor_class) {
906 $_->cursor($cursor_class) for $self->all_storages;
907 }
908 $self->master->cursor;
909}
910
911=head2 unsafe
912
913sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
914master's current setting
915
916=cut
917
918sub unsafe {
919 my $self = shift;
920
921 if (@_) {
922 $_->unsafe(@_) for $self->all_storages;
923 }
924
925 return $self->master->unsafe;
926}
927
928=head2 disable_sth_caching
929
930sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
931or returns master's current setting
932
933=cut
934
935sub disable_sth_caching {
936 my $self = shift;
937
938 if (@_) {
939 $_->disable_sth_caching(@_) for $self->all_storages;
940 }
941
942 return $self->master->disable_sth_caching;
943}
944
945=head2 lag_behind_master
946
947returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
948setting
949
950=cut
951
952sub lag_behind_master {
953 my $self = shift;
954
  # replicants is a hashref (see all_storages), so iterate over its values
  return max map $_->lag_behind_master, values %{ $self->replicants };
fd323bf1 956}
3244fdcc 957
958=head2 is_replicating
959
960returns true if all replicants return true for
961L<DBIx::Class::Storage::DBI/is_replicating>
962
963=cut
964
965sub is_replicating {
966 my $self = shift;
967
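  # replicants is a hashref (see all_storages), so compare against its values
  my @replicants = values %{ $self->replicants };
  return (grep { $_->is_replicating } @replicants) == @replicants;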
969}
970
971=head2 connect_call_datetime_setup
972
973calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
974
975=cut
976
977sub connect_call_datetime_setup {
978 my $self = shift;
979 $_->connect_call_datetime_setup for $self->all_storages;
980}
981
982sub _populate_dbh {
983 my $self = shift;
984 $_->_populate_dbh for $self->all_storages;
985}
986
987sub _connect {
988 my $self = shift;
989 $_->_connect for $self->all_storages;
990}
991
992sub _rebless {
993 my $self = shift;
994 $_->_rebless for $self->all_storages;
995}
996
997sub _determine_driver {
998 my $self = shift;
999 $_->_determine_driver for $self->all_storages;
1000}
1001
1002sub _driver_determined {
1003 my $self = shift;
fd323bf1 1004
3244fdcc 1005 if (@_) {
1006 $_->_driver_determined(@_) for $self->all_storages;
1007 }
1008
1009 return $self->master->_driver_determined;
1010}
1011
1012sub _init {
1013 my $self = shift;
fd323bf1 1014
3244fdcc 1015 $_->_init for $self->all_storages;
1016}
1017
1018sub _run_connection_actions {
1019 my $self = shift;
fd323bf1 1020
3244fdcc 1021 $_->_run_connection_actions for $self->all_storages;
1022}
1023
1024sub _do_connection_actions {
1025 my $self = shift;
fd323bf1 1026
3244fdcc 1027 if (@_) {
1028 $_->_do_connection_actions(@_) for $self->all_storages;
1029 }
1030}
1031
1032sub connect_call_do_sql {
1033 my $self = shift;
1034 $_->connect_call_do_sql(@_) for $self->all_storages;
1035}
1036
1037sub disconnect_call_do_sql {
1038 my $self = shift;
1039 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1040}
1041
1042sub _seems_connected {
1043 my $self = shift;
1044
1045 return min map $_->_seems_connected, $self->all_storages;
1046}
1047
1048sub _ping {
1049 my $self = shift;
1050
1051 return min map $_->_ping, $self->all_storages;
1052}
1053
bbdda281 1054# not using the normalized_version, because we want to preserve
1055# version numbers much longer than the conventional xxx.yyyzzz
7da56142 1056my $numify_ver = sub {
1057 my $ver = shift;
1058 my @numparts = split /\D+/, $ver;
bbdda281 1059 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1060
1061 return sprintf $format, @numparts;
1062};
fecb38cb 1063sub _server_info {
1064 my $self = shift;
1065
bbdda281 1066 if (not $self->_dbh_details->{info}) {
1067 $self->_dbh_details->{info} = (
fd323bf1 1068 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1069 map [ $numify_ver->($_->{dbms_version}), $_ ],
1070 map $_->_server_info, $self->all_storages
1071 )->[1];
fecb38cb 1072 }
1073
bbdda281 1074 return $self->next::method;
fecb38cb 1075}
1076
1077sub _get_server_version {
1078 my $self = shift;
1079
1080 return $self->_server_info->{dbms_version};
1081}
1082
7e38d850 1083=head1 GOTCHAS
1084
1085Due to the fact that replicants can lag behind a master, you must take care to
1086make sure you use one of the methods to force read queries to a master should
1087you need realtime data integrity. For example, if you insert a row, and then
3dd506b8 1088immediately re-read it from the database (say, by doing
1089L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
7e38d850 1090or you insert a row and then immediately build a query that expects that row
1091to be an item, you should force the master to handle reads. Otherwise, due to
1092the lag, there is no certainty your data will be in the expected state.
1093
1094For data integrity, all transactions automatically use the master storage for
1095all read and write queries. Using a transaction is the preferred and recommended
1096method to force the master to handle all read queries.
1097
1098Otherwise, you can force a single query to use the master with the 'force_pool'
1099attribute:
1100
47d7b769 1101 my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
7e38d850 1102
b1de1b06 1103This attribute will safely be ignored by non replicated storages, so you can use
7e38d850 1104the same code for both types of systems.
1105
1106Lastly, you can use the L</execute_reliably> method, which works very much like
1107a transaction.
1108
1109For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1110and L</set_balanced_storage>, however this operates at a global level and is not
1111suitable if you have a shared Schema object being used by multiple processes,
1112such as on a web application server. You can get around this limitation by
1113using the Schema clone method.
1114
1115 my $new_schema = $schema->clone;
1116 $new_schema->set_reliable_storage;
d4daee7b 1117
7e38d850 1118 ## $new_schema will use only the Master storage for all reads/writes while
1119 ## the $schema object will use replicated storage.
1120
a2bd3796 1121=head1 FURTHER QUESTIONS?
f5d3a5de 1122
a2bd3796 1123Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
2156bbdd 1124
a2bd3796 1125=head1 COPYRIGHT AND LICENSE
f5d3a5de 1126
a2bd3796 1127This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
1128by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
1129redistribute it and/or modify it under the same terms as the
1130L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
f5d3a5de 1131
1132=cut
1133
c354902c 1134__PACKAGE__->meta->make_immutable;
1135
f5d3a5de 11361;