Majorly cleanup $rs->update/delete (no $rs-aware code should be in ::Storages)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
a34b0c89 4 use DBIx::Class;
70c28808 5 die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
a34b0c89 6 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 7}
8
b2e4d522 9use Moose;
26ab719a 10use DBIx::Class::Storage::DBI;
2bf79155 11use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 12use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 13use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 14use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 15use Scalar::Util 'reftype';
e666c5fd 16use Hash::Merge;
7da56142 17use List::Util qw/min max reduce/;
ed7ab0f4 18use Try::Tiny;
fd323bf1 19use namespace::clean;
9901aad7 20
21use namespace::clean -except => 'meta';
2bf79155 22
23=head1 NAME
24
ecb65397 25DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 26
27=head1 SYNOPSIS
28
29The Following example shows how to change an existing $schema to a replicated
48580715 30storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 31tasks.
2bf79155 32
3da4f736 33You should set the 'storage_type attribute to a replicated type. You should
d4daee7b 34also define your arguments, such as which balancer you want and any arguments
3da4f736 35that the Pool object should get.
36
ce854fd3 37 my $schema = Schema::Class->clone;
64cdad22 38 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 39 $schema->connection(...);
d4daee7b 40
fd323bf1 41Next, you need to add in the Replicants. Basically this is an array of
3da4f736 42arrayrefs, where each arrayref is database connect information. Think of these
43arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 44
64cdad22 45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
49 );
d4daee7b 50
3da4f736 51Now, just use the $schema as you normally would. Automatically all reads will
52be delegated to the replicants, while writes to the master.
53
7e38d850 54 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 55
3da4f736 56You can force a given query to use a particular storage using the search
57attribute 'force_pool'. For example:
d4daee7b 58
d7c37d66 59 my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 60
d7c37d66 61Now $rs will force everything (both reads and writes) to use whatever was setup
fd323bf1 62as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 63but you can also use any Replicant name. Please see:
212cc5c2 64L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 65
66Also see transactions and L</execute_reliably> for alternative ways to
67force read traffic to the master. In general, you should wrap your statements
68in a transaction when you are reading and writing to the same tables at the
69same time, since your replicants will often lag a bit behind the master.
212cc5c2 70
d7c37d66 71If you have a multi-statement read only transaction you can force it to select
72a random server in the pool by:
73
74 my $rs = $schema->resultset('Source')->search( undef,
75 { force_pool => $db->storage->read_handler->next_storage }
76 );
d4daee7b 77
2bf79155 78=head1 DESCRIPTION
79
7e38d850 80Warning: This class is marked BETA. This has been running a production
ccb3b897 81website using MySQL native replication as its backend and we have some decent
7e38d850 82test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 83Individual DBs may have quirks we are not aware of. Please use this in first
7e38d850 84development and pass along your experiences/bug fixes.
2bf79155 85
86This class implements replicated data store for DBI. Currently you can define
87one master and numerous slave database connections. All write-type queries
88(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89database, all read-type queries (SELECTs) go to the slave database.
90
91Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 92handle gets delegated to one of the two attributes: L</read_handler> or to
93L</write_handler>. Additionally, some methods need to be distributed
2bf79155 94to all existing storages. This way our storage class is a drop in replacement
95for L<DBIx::Class::Storage::DBI>.
96
48580715 97Read traffic is spread across the replicants (slaves) occurring to a user
2bf79155 98selected algorithm. The default algorithm is random weighted.
99
bca099a3 100=head1 NOTES
101
48580715 102The consistency between master and replicants is database specific. The Pool
faaba25f 103gives you a method to validate its replicants, removing and replacing them
7e38d850 104when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 105to force a query to run against Master when needed.
106
107=head1 REQUIREMENTS
108
a34b0c89 109Replicated Storage has additional requirements not currently part of
aa8b2277 110L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 111
112=head1 ATTRIBUTES
113
114This class defines the following attributes.
115
2ce6e9a6 116=head2 schema
117
118The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120=cut
121
122has 'schema' => (
123 is=>'rw',
6a151f58 124 isa=>DBICSchema,
2ce6e9a6 125 weak_ref=>1,
126 required=>1,
127);
128
26ab719a 129=head2 pool_type
2bf79155 130
fd323bf1 131Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 132to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 133
134=cut
135
26ab719a 136has 'pool_type' => (
dcdf7b2c 137 is=>'rw',
41916570 138 isa=>ClassName,
2ce6e9a6 139 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 140 handles=>{
141 'create_pool' => 'new',
142 },
2bf79155 143);
144
f068a139 145=head2 pool_args
146
147Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 148See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 149
150=cut
151
152has 'pool_args' => (
dcdf7b2c 153 is=>'rw',
41916570 154 isa=>HashRef,
64cdad22 155 lazy=>1,
64cdad22 156 default=>sub { {} },
f068a139 157);
158
159
26ab719a 160=head2 balancer_type
2bf79155 161
162The replication pool requires a balance class to provider the methods for
163choose how to spread the query load across each replicant in the pool.
164
165=cut
166
26ab719a 167has 'balancer_type' => (
dcdf7b2c 168 is=>'rw',
9901aad7 169 isa=>BalancerClassNamePart,
2ce6e9a6 170 coerce=>1,
171 required=>1,
172 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 173 handles=>{
174 'create_balancer' => 'new',
175 },
2bf79155 176);
177
17b05c13 178=head2 balancer_args
179
180Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 181See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 182
183=cut
184
185has 'balancer_args' => (
dcdf7b2c 186 is=>'rw',
41916570 187 isa=>HashRef,
64cdad22 188 lazy=>1,
189 required=>1,
190 default=>sub { {} },
17b05c13 191);
192
26ab719a 193=head2 pool
2bf79155 194
370f78d4 195Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 196container class for one or more replicated databases.
2bf79155 197
198=cut
199
26ab719a 200has 'pool' => (
64cdad22 201 is=>'ro',
202 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203 lazy_build=>1,
204 handles=>[qw/
6f7344b8 205 connect_replicants
64cdad22 206 replicants
207 has_replicants
208 /],
2bf79155 209);
210
26ab719a 211=head2 balancer
2bf79155 212
370f78d4 213Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
214is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 215
26ab719a 216=cut
2bf79155 217
26ab719a 218has 'balancer' => (
dcdf7b2c 219 is=>'rw',
64cdad22 220 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221 lazy_build=>1,
222 handles=>[qw/auto_validate_every/],
26ab719a 223);
2bf79155 224
cb6ec758 225=head2 master
226
227The master defines the canonical state for a pool of connected databases. All
228the replicants are expected to match this databases state. Thus, in a classic
229Master / Slaves distributed system, all the slaves are expected to replicate
230the Master's state as quick as possible. This is the only database in the
231pool of databases that is allowed to handle write traffic.
232
233=cut
234
235has 'master' => (
64cdad22 236 is=> 'ro',
6a151f58 237 isa=>DBICStorageDBI,
64cdad22 238 lazy_build=>1,
cb6ec758 239);
240
cb6ec758 241=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
fd323bf1 243The following methods are delegated all the methods required for the
cb6ec758 244L<DBIx::Class::Storage::DBI> interface.
245
cb6ec758 246=cut
247
4bea1fe7 248my $method_dispatch = {
249 writer => [qw/
64cdad22 250 on_connect_do
6f7344b8 251 on_disconnect_do
3244fdcc 252 on_connect_call
253 on_disconnect_call
64cdad22 254 connect_info
3244fdcc 255 _connect_info
64cdad22 256 throw_exception
257 sql_maker
258 sqlt_type
259 create_ddl_dir
260 deployment_statements
261 datetime_parser
6f7344b8 262 datetime_parser_type
263 build_datetime_parser
64cdad22 264 last_insert_id
265 insert
266 insert_bulk
267 update
268 delete
269 dbh
2ce6e9a6 270 txn_begin
64cdad22 271 txn_do
272 txn_commit
273 txn_rollback
2ce6e9a6 274 txn_scope_guard
90d7422f 275 _exec_txn_rollback
276 _exec_txn_begin
277 _exec_txn_commit
64cdad22 278 deploy
0180bef9 279 with_deferred_fk_checks
6f7344b8 280 dbh_do
2ce6e9a6 281 _prep_for_execute
6f7344b8 282 is_datatype_numeric
283 _count_select
6f7344b8 284 svp_rollback
285 svp_begin
286 svp_release
e398f77e 287 relname_to_table_alias
3244fdcc 288 _dbh_last_insert_id
3244fdcc 289 _default_dbi_connect_attributes
290 _dbi_connect_info
b9ca4ff1 291 _dbic_connect_attributes
3244fdcc 292 auto_savepoint
0e773352 293 _query_start
3244fdcc 294 _query_end
0e773352 295 _format_for_trace
296 _dbi_attrs_for_bind
3244fdcc 297 bind_attribute_by_data_type
298 transaction_depth
299 _dbh
300 _select_args
52cef7e3 301 _dbh_execute_for_fetch
3244fdcc 302 _sql_maker
3244fdcc 303 _dbh_execute_inserts_with_no_binds
304 _select_args_to_query
5e782048 305 _gen_sql_bind
3244fdcc 306 _svp_generate_name
3244fdcc 307 _normalize_connect_info
308 _parse_connect_do
3244fdcc 309 savepoints
3244fdcc 310 _sql_maker_opts
311 _conn_pid
3244fdcc 312 _dbh_autocommit
313 _native_data_type
314 _get_dbh
315 sql_maker_class
3244fdcc 316 _execute
317 _do_query
402ac1c9 318 _sth
3244fdcc 319 _dbh_sth
320 _dbh_execute
5b4f3fd0 321 /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
4bea1fe7 322 reader => [qw/
323 select
324 select_single
325 columns_info_for
326 _dbh_columns_info_for
327 _select
328 /],
329 unimplemented => [qw/
330 _arm_global_destructor
331 _verify_pid
332
0e773352 333 source_bind_attributes
334
4bea1fe7 335 get_use_dbms_capability
336 set_use_dbms_capability
337 get_dbms_capability
338 set_dbms_capability
339 _dbh_details
340 _dbh_get_info
341
342 sql_limit_dialect
343 sql_quote_char
344 sql_name_sep
345
4bea1fe7 346 _prefetch_autovalues
347
eec07bca 348 _resolve_bindattrs
349
4bea1fe7 350 _max_column_bytesize
351 _is_lob_type
352 _is_binary_lob_type
353 _is_text_lob_type
402ac1c9 354
355 sth
4bea1fe7 356 /,(
357 # the capability framework
358 # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
359 grep
360 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
361 ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
362 )],
363};
364
365if (DBIx::Class::_ENV_::DBICTEST) {
366
367 my $seen;
368 for my $type (keys %$method_dispatch) {
369 for (@{$method_dispatch->{$type}}) {
370 push @{$seen->{$_}}, $type;
371 }
372 }
cb6ec758 373
4bea1fe7 374 if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
375 die(join "\n", '',
376 'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
377 (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
378 '',
379 );
380 }
bbdda281 381
4bea1fe7 382 if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
383 die(join "\n", '',
384 '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
385 @cant,
386 '',
387 );
388 }
389}
bbdda281 390
4bea1fe7 391for my $method (@{$method_dispatch->{unimplemented}}) {
392 __PACKAGE__->meta->add_method($method, sub {
70c28808 393 my $self = shift;
394 $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
4bea1fe7 395 });
396}
31a8aaaf 397
4bea1fe7 398=head2 read_handler
584ea6e4 399
4bea1fe7 400Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
584ea6e4 401
4bea1fe7 402=cut
403
404has 'read_handler' => (
405 is=>'rw',
406 isa=>Object,
407 lazy_build=>1,
408 handles=>$method_dispatch->{reader},
e471ab87 409);
410
4bea1fe7 411=head2 write_handler
412
413Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
414as well as methods that don't write or read that can be called on only one
415storage, methods that return a C<$dbh>, and any methods that don't make sense to
416run on a replicant.
417
418=cut
419
420has 'write_handler' => (
421 is=>'ro',
422 isa=>Object,
423 lazy_build=>1,
424 handles=>$method_dispatch->{writer},
7f4433eb 425);
426
4bea1fe7 427
6d766626 428
b2e4d522 429has _master_connect_info_opts =>
430 (is => 'rw', isa => HashRef, default => sub { {} });
431
432=head2 around: connect_info
433
48580715 434Preserves master's C<connect_info> options (for merging with replicants.)
435Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 436C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
b2e4d522 437
438=cut
439
440around connect_info => sub {
441 my ($next, $self, $info, @extra) = @_;
442
282a9a4f 443 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 444
b2e4d522 445 my %opts;
446 for my $arg (@$info) {
447 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 448 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 449 }
b2e4d522 450 delete $opts{dsn};
451
dcdf7b2c 452 if (@opts{qw/pool_type pool_args/}) {
453 $self->pool_type(delete $opts{pool_type})
454 if $opts{pool_type};
455
b88b85e7 456 $self->pool_args(
e666c5fd 457 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 458 );
dcdf7b2c 459
64ae1667 460 ## Since we possibly changed the pool_args, we need to clear the current
461 ## pool object so that next time it is used it will be rebuilt.
462 $self->clear_pool;
dcdf7b2c 463 }
464
465 if (@opts{qw/balancer_type balancer_args/}) {
466 $self->balancer_type(delete $opts{balancer_type})
467 if $opts{balancer_type};
468
b88b85e7 469 $self->balancer_args(
e666c5fd 470 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 471 );
dcdf7b2c 472
67c43863 473 $self->balancer($self->_build_balancer)
6f7344b8 474 if $self->balancer;
dcdf7b2c 475 }
476
b2e4d522 477 $self->_master_connect_info_opts(\%opts);
478
cca282b6 479 my @res;
480 if (wantarray) {
0ce2d0d5 481 @res = $self->$next($info, @extra);
482 } else {
cca282b6 483 $res[0] = $self->$next($info, @extra);
0ce2d0d5 484 }
485
fd4eb9c2 486 # Make sure master is blessed into the correct class and apply role to it.
487 my $master = $self->master;
488 $master->_determine_driver;
489 Moose::Meta::Class->initialize(ref $master);
cea43436 490
ec0946db 491 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 492
493 # link pool back to master
494 $self->pool->master($master);
0ce2d0d5 495
cca282b6 496 wantarray ? @res : $res[0];
b2e4d522 497};
498
26ab719a 499=head1 METHODS
2bf79155 500
26ab719a 501This class defines the following methods.
2bf79155 502
c354902c 503=head2 BUILDARGS
2bf79155 504
faaba25f 505L<DBIx::Class::Schema> when instantiating its storage passed itself as the
2ce6e9a6 506first argument. So we need to massage the arguments a bit so that all the
507bits get put into the correct places.
2bf79155 508
509=cut
510
c354902c 511sub BUILDARGS {
fd323bf1 512 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 513
c354902c 514 return {
6f7344b8 515 schema=>$schema,
516 %$storage_type_args,
517 @args
c354902c 518 }
519}
2bf79155 520
cb6ec758 521=head2 _build_master
2bf79155 522
cb6ec758 523Lazy builder for the L</master> attribute.
2bf79155 524
525=cut
526
cb6ec758 527sub _build_master {
2ce6e9a6 528 my $self = shift @_;
ee356d00 529 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 530 $master
106d5f3b 531}
532
26ab719a 533=head2 _build_pool
2bf79155 534
26ab719a 535Lazy builder for the L</pool> attribute.
2bf79155 536
537=cut
538
26ab719a 539sub _build_pool {
64cdad22 540 my $self = shift @_;
541 $self->create_pool(%{$self->pool_args});
2bf79155 542}
543
26ab719a 544=head2 _build_balancer
2bf79155 545
cb6ec758 546Lazy builder for the L</balancer> attribute. This takes a Pool object so that
547the balancer knows which pool it's balancing.
2bf79155 548
549=cut
550
26ab719a 551sub _build_balancer {
64cdad22 552 my $self = shift @_;
553 $self->create_balancer(
6f7344b8 554 pool=>$self->pool,
64cdad22 555 master=>$self->master,
556 %{$self->balancer_args},
557 );
2bf79155 558}
559
cb6ec758 560=head2 _build_write_handler
2bf79155 561
cb6ec758 562Lazy builder for the L</write_handler> attribute. The default is to set this to
563the L</master>.
50336325 564
565=cut
566
cb6ec758 567sub _build_write_handler {
64cdad22 568 return shift->master;
cb6ec758 569}
50336325 570
cb6ec758 571=head2 _build_read_handler
2bf79155 572
cb6ec758 573Lazy builder for the L</read_handler> attribute. The default is to set this to
574the L</balancer>.
2bf79155 575
576=cut
577
cb6ec758 578sub _build_read_handler {
64cdad22 579 return shift->balancer;
cb6ec758 580}
50336325 581
cb6ec758 582=head2 around: connect_replicants
2bf79155 583
cb6ec758 584All calls to connect_replicants needs to have an existing $schema tacked onto
b2e4d522 585top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
586options merged with the master, with replicant opts having higher priority.
955a6df6 587
cb6ec758 588=cut
955a6df6 589
b2e4d522 590around connect_replicants => sub {
591 my ($next, $self, @args) = @_;
592
593 for my $r (@args) {
594 $r = [ $r ] unless reftype $r eq 'ARRAY';
595
1a58752c 596 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 597 if ref $r->[0] && reftype $r->[0] eq 'CODE';
598
599# any connect_info options?
600 my $i = 0;
601 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
602
6f7344b8 603# make one if none
b2e4d522 604 $r->[$i] = {} unless $r->[$i];
605
606# merge if two hashes
b88b85e7 607 my @hashes = @$r[$i .. $#{$r}];
608
1a58752c 609 $self->throw_exception('invalid connect_info options')
b88b85e7 610 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
611
1a58752c 612 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 613 if @hashes > 2;
614
282a9a4f 615 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 616 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 617
618# delete them
b2e4d522 619 splice @$r, $i+1, ($#{$r} - $i), ();
620
0bd8e058 621# make sure master/replicants opts don't clash
622 my %master_opts = %{ $self->_master_connect_info_opts };
623 if (exists $opts{dbh_maker}) {
624 delete @master_opts{qw/dsn user password/};
625 }
626 delete $master_opts{dbh_maker};
627
b2e4d522 628# merge with master
e666c5fd 629 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 630
631# update
632 $r->[$i] = \%opts;
633 }
634
635 $self->$next($self->schema, @args);
955a6df6 636};
2bf79155 637
2bf79155 638=head2 all_storages
639
640Returns an array of of all the connected storage backends. The first element
641in the returned array is the master, and the remainings are each of the
642replicants.
643
644=cut
645
646sub all_storages {
64cdad22 647 my $self = shift @_;
648 return grep {defined $_ && blessed $_} (
649 $self->master,
6412a592 650 values %{ $self->replicants },
64cdad22 651 );
2bf79155 652}
653
c4d3fae2 654=head2 execute_reliably ($coderef, ?@args)
655
656Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 657use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 658restores the original state.
659
660Example:
661
64cdad22 662 my $reliably = sub {
663 my $name = shift @_;
664 $schema->resultset('User')->create({name=>$name});
fd323bf1 665 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 666 return $user_rs;
667 };
c4d3fae2 668
64cdad22 669 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 670
671Use this when you must be certain of your database state, such as when you just
672inserted something and need to get a resultset including it, etc.
673
674=cut
675
676sub execute_reliably {
64cdad22 677 my ($self, $coderef, @args) = @_;
d4daee7b 678
64cdad22 679 unless( ref $coderef eq 'CODE') {
680 $self->throw_exception('Second argument must be a coderef');
681 }
d4daee7b 682
64cdad22 683 ##Get copy of master storage
684 my $master = $self->master;
d4daee7b 685
64cdad22 686 ##Get whatever the current read hander is
687 my $current = $self->read_handler;
d4daee7b 688
64cdad22 689 ##Set the read handler to master
690 $self->read_handler($master);
d4daee7b 691
64cdad22 692 ## do whatever the caller needs
693 my @result;
694 my $want_array = wantarray;
d4daee7b 695
ed7ab0f4 696 try {
64cdad22 697 if($want_array) {
698 @result = $coderef->(@args);
699 } elsif(defined $want_array) {
700 ($result[0]) = ($coderef->(@args));
ed213e85 701 } else {
64cdad22 702 $coderef->(@args);
6f7344b8 703 }
ed7ab0f4 704 } catch {
705 $self->throw_exception("coderef returned an error: $_");
706 } finally {
707 ##Reset to the original state
708 $self->read_handler($current);
64cdad22 709 };
d4daee7b 710
cca282b6 711 return wantarray ? @result : $result[0];
c4d3fae2 712}
713
cb6ec758 714=head2 set_reliable_storage
715
716Sets the current $schema to be 'reliable', that is all queries, both read and
717write are sent to the master
d4daee7b 718
cb6ec758 719=cut
720
721sub set_reliable_storage {
64cdad22 722 my $self = shift @_;
723 my $schema = $self->schema;
724 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 725
64cdad22 726 $schema->storage->read_handler($write_handler);
cb6ec758 727}
728
729=head2 set_balanced_storage
730
731Sets the current $schema to be use the </balancer> for all reads, while all
48580715 732writes are sent to the master only
d4daee7b 733
cb6ec758 734=cut
735
736sub set_balanced_storage {
64cdad22 737 my $self = shift @_;
738 my $schema = $self->schema;
bd5da369 739 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 740
bd5da369 741 $schema->storage->read_handler($balanced_handler);
cb6ec758 742}
2bf79155 743
744=head2 connected
745
746Check that the master and at least one of the replicants is connected.
747
748=cut
749
750sub connected {
64cdad22 751 my $self = shift @_;
752 return
753 $self->master->connected &&
754 $self->pool->connected_replicants;
2bf79155 755}
756
2bf79155 757=head2 ensure_connected
758
759Make sure all the storages are connected.
760
761=cut
762
763sub ensure_connected {
64cdad22 764 my $self = shift @_;
765 foreach my $source ($self->all_storages) {
766 $source->ensure_connected(@_);
767 }
2bf79155 768}
769
2bf79155 770=head2 limit_dialect
771
772Set the limit_dialect for all existing storages
773
774=cut
775
776sub limit_dialect {
64cdad22 777 my $self = shift @_;
778 foreach my $source ($self->all_storages) {
779 $source->limit_dialect(@_);
780 }
f3e9f010 781 return $self->master->limit_dialect;
2bf79155 782}
783
2bf79155 784=head2 quote_char
785
786Set the quote_char for all existing storages
787
788=cut
789
790sub quote_char {
64cdad22 791 my $self = shift @_;
792 foreach my $source ($self->all_storages) {
793 $source->quote_char(@_);
794 }
3fbe08e3 795 return $self->master->quote_char;
2bf79155 796}
797
2bf79155 798=head2 name_sep
799
800Set the name_sep for all existing storages
801
802=cut
803
804sub name_sep {
64cdad22 805 my $self = shift @_;
806 foreach my $source ($self->all_storages) {
807 $source->name_sep(@_);
808 }
3fbe08e3 809 return $self->master->name_sep;
2bf79155 810}
811
2bf79155 812=head2 set_schema
813
814Set the schema object for all existing storages
815
816=cut
817
818sub set_schema {
64cdad22 819 my $self = shift @_;
820 foreach my $source ($self->all_storages) {
821 $source->set_schema(@_);
822 }
2bf79155 823}
824
2bf79155 825=head2 debug
826
827set a debug flag across all storages
828
829=cut
830
831sub debug {
64cdad22 832 my $self = shift @_;
3fbe08e3 833 if(@_) {
834 foreach my $source ($self->all_storages) {
835 $source->debug(@_);
6f7344b8 836 }
64cdad22 837 }
3fbe08e3 838 return $self->master->debug;
2bf79155 839}
840
2bf79155 841=head2 debugobj
842
cea43436 843set a debug object
2bf79155 844
845=cut
846
847sub debugobj {
64cdad22 848 my $self = shift @_;
cea43436 849 return $self->master->debugobj(@_);
2bf79155 850}
851
2bf79155 852=head2 debugfh
853
cea43436 854set a debugfh object
2bf79155 855
856=cut
857
858sub debugfh {
64cdad22 859 my $self = shift @_;
cea43436 860 return $self->master->debugfh(@_);
2bf79155 861}
862
2bf79155 863=head2 debugcb
864
cea43436 865set a debug callback
2bf79155 866
867=cut
868
869sub debugcb {
64cdad22 870 my $self = shift @_;
cea43436 871 return $self->master->debugcb(@_);
2bf79155 872}
873
2bf79155 874=head2 disconnect
875
876disconnect everything
877
878=cut
879
880sub disconnect {
64cdad22 881 my $self = shift @_;
882 foreach my $source ($self->all_storages) {
883 $source->disconnect(@_);
884 }
2bf79155 885}
886
b2e4d522 887=head2 cursor_class
888
889set cursor class on all storages, or return master's
890
891=cut
892
893sub cursor_class {
894 my ($self, $cursor_class) = @_;
895
896 if ($cursor_class) {
897 $_->cursor_class($cursor_class) for $self->all_storages;
898 }
899 $self->master->cursor_class;
900}
d4daee7b 901
3244fdcc 902=head2 cursor
903
904set cursor class on all storages, or return master's, alias for L</cursor_class>
905above.
906
907=cut
908
909sub cursor {
910 my ($self, $cursor_class) = @_;
911
912 if ($cursor_class) {
913 $_->cursor($cursor_class) for $self->all_storages;
914 }
915 $self->master->cursor;
916}
917
918=head2 unsafe
919
920sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
921master's current setting
922
923=cut
924
925sub unsafe {
926 my $self = shift;
927
928 if (@_) {
929 $_->unsafe(@_) for $self->all_storages;
930 }
931
932 return $self->master->unsafe;
933}
934
935=head2 disable_sth_caching
936
937sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
938or returns master's current setting
939
940=cut
941
942sub disable_sth_caching {
943 my $self = shift;
944
945 if (@_) {
946 $_->disable_sth_caching(@_) for $self->all_storages;
947 }
948
949 return $self->master->disable_sth_caching;
950}
951
952=head2 lag_behind_master
953
954returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
955setting
956
957=cut
958
959sub lag_behind_master {
960 my $self = shift;
961
962 return max map $_->lag_behind_master, $self->replicants;
fd323bf1 963}
3244fdcc 964
965=head2 is_replicating
966
967returns true if all replicants return true for
968L<DBIx::Class::Storage::DBI/is_replicating>
969
970=cut
971
972sub is_replicating {
973 my $self = shift;
974
975 return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
976}
977
978=head2 connect_call_datetime_setup
979
980calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
981
982=cut
983
984sub connect_call_datetime_setup {
985 my $self = shift;
986 $_->connect_call_datetime_setup for $self->all_storages;
987}
988
989sub _populate_dbh {
990 my $self = shift;
991 $_->_populate_dbh for $self->all_storages;
992}
993
994sub _connect {
995 my $self = shift;
996 $_->_connect for $self->all_storages;
997}
998
999sub _rebless {
1000 my $self = shift;
1001 $_->_rebless for $self->all_storages;
1002}
1003
1004sub _determine_driver {
1005 my $self = shift;
1006 $_->_determine_driver for $self->all_storages;
1007}
1008
1009sub _driver_determined {
1010 my $self = shift;
fd323bf1 1011
3244fdcc 1012 if (@_) {
1013 $_->_driver_determined(@_) for $self->all_storages;
1014 }
1015
1016 return $self->master->_driver_determined;
1017}
1018
1019sub _init {
1020 my $self = shift;
fd323bf1 1021
3244fdcc 1022 $_->_init for $self->all_storages;
1023}
1024
1025sub _run_connection_actions {
1026 my $self = shift;
fd323bf1 1027
3244fdcc 1028 $_->_run_connection_actions for $self->all_storages;
1029}
1030
1031sub _do_connection_actions {
1032 my $self = shift;
fd323bf1 1033
3244fdcc 1034 if (@_) {
1035 $_->_do_connection_actions(@_) for $self->all_storages;
1036 }
1037}
1038
1039sub connect_call_do_sql {
1040 my $self = shift;
1041 $_->connect_call_do_sql(@_) for $self->all_storages;
1042}
1043
1044sub disconnect_call_do_sql {
1045 my $self = shift;
1046 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1047}
1048
1049sub _seems_connected {
1050 my $self = shift;
1051
1052 return min map $_->_seems_connected, $self->all_storages;
1053}
1054
1055sub _ping {
1056 my $self = shift;
1057
1058 return min map $_->_ping, $self->all_storages;
1059}
1060
bbdda281 1061# not using the normalized_version, because we want to preserve
1062# version numbers much longer than the conventional xxx.yyyzzz
7da56142 1063my $numify_ver = sub {
1064 my $ver = shift;
1065 my @numparts = split /\D+/, $ver;
bbdda281 1066 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1067
1068 return sprintf $format, @numparts;
1069};
fecb38cb 1070sub _server_info {
1071 my $self = shift;
1072
bbdda281 1073 if (not $self->_dbh_details->{info}) {
1074 $self->_dbh_details->{info} = (
fd323bf1 1075 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1076 map [ $numify_ver->($_->{dbms_version}), $_ ],
1077 map $_->_server_info, $self->all_storages
1078 )->[1];
fecb38cb 1079 }
1080
bbdda281 1081 return $self->next::method;
fecb38cb 1082}
1083
1084sub _get_server_version {
1085 my $self = shift;
1086
1087 return $self->_server_info->{dbms_version};
1088}
1089
7e38d850 1090=head1 GOTCHAS
1091
1092Due to the fact that replicants can lag behind a master, you must take care to
1093make sure you use one of the methods to force read queries to a master should
1094you need realtime data integrity. For example, if you insert a row, and then
1095immediately re-read it from the database (say, by doing $row->discard_changes)
1096or you insert a row and then immediately build a query that expects that row
1097to be an item, you should force the master to handle reads. Otherwise, due to
1098the lag, there is no certainty your data will be in the expected state.
1099
1100For data integrity, all transactions automatically use the master storage for
1101all read and write queries. Using a transaction is the preferred and recommended
1102method to force the master to handle all read queries.
1103
1104Otherwise, you can force a single query to use the master with the 'force_pool'
1105attribute:
1106
1107 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1108
1109This attribute will safely be ignore by non replicated storages, so you can use
1110the same code for both types of systems.
1111
1112Lastly, you can use the L</execute_reliably> method, which works very much like
1113a transaction.
1114
1115For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1116and L</set_balanced_storage>, however this operates at a global level and is not
1117suitable if you have a shared Schema object being used by multiple processes,
1118such as on a web application server. You can get around this limitation by
1119using the Schema clone method.
1120
1121 my $new_schema = $schema->clone;
1122 $new_schema->set_reliable_storage;
d4daee7b 1123
7e38d850 1124 ## $new_schema will use only the Master storage for all reads/writes while
1125 ## the $schema object will use replicated storage.
1126
f5d3a5de 1127=head1 AUTHOR
1128
64cdad22 1129 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1130
c4d3fae2 1131Based on code originated by:
f5d3a5de 1132
64cdad22 1133 Norbert Csongrádi <bert@cpan.org>
1134 Peter Siklósi <einon@einon.hu>
2156bbdd 1135
f5d3a5de 1136=head1 LICENSE
1137
1138You may distribute this code under the same terms as Perl itself.
1139
1140=cut
1141
c354902c 1142__PACKAGE__->meta->make_immutable;
1143
f5d3a5de 11441;