Switch from using execute_array to execute_for_fetch directly
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
a34b0c89 4 use DBIx::Class;
70c28808 5 die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
a34b0c89 6 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 7}
8
b2e4d522 9use Moose;
26ab719a 10use DBIx::Class::Storage::DBI;
2bf79155 11use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 12use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 13use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 14use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 15use Scalar::Util 'reftype';
e666c5fd 16use Hash::Merge;
7da56142 17use List::Util qw/min max reduce/;
ed7ab0f4 18use Try::Tiny;
fd323bf1 19use namespace::clean;
9901aad7 20
21use namespace::clean -except => 'meta';
2bf79155 22
23=head1 NAME
24
ecb65397 25DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 26
27=head1 SYNOPSIS
28
29The Following example shows how to change an existing $schema to a replicated
48580715 30storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 31tasks.
2bf79155 32
3da4f736 33You should set the 'storage_type attribute to a replicated type. You should
d4daee7b 34also define your arguments, such as which balancer you want and any arguments
3da4f736 35that the Pool object should get.
36
ce854fd3 37 my $schema = Schema::Class->clone;
64cdad22 38 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 39 $schema->connection(...);
d4daee7b 40
fd323bf1 41Next, you need to add in the Replicants. Basically this is an array of
3da4f736 42arrayrefs, where each arrayref is database connect information. Think of these
43arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 44
64cdad22 45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
49 );
d4daee7b 50
3da4f736 51Now, just use the $schema as you normally would. Automatically all reads will
52be delegated to the replicants, while writes to the master.
53
7e38d850 54 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 55
3da4f736 56You can force a given query to use a particular storage using the search
57attribute 'force_pool'. For example:
d4daee7b 58
d7c37d66 59 my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 60
d7c37d66 61Now $rs will force everything (both reads and writes) to use whatever was setup
fd323bf1 62as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 63but you can also use any Replicant name. Please see:
212cc5c2 64L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 65
66Also see transactions and L</execute_reliably> for alternative ways to
67force read traffic to the master. In general, you should wrap your statements
68in a transaction when you are reading and writing to the same tables at the
69same time, since your replicants will often lag a bit behind the master.
212cc5c2 70
d7c37d66 71If you have a multi-statement read only transaction you can force it to select
72a random server in the pool by:
73
74 my $rs = $schema->resultset('Source')->search( undef,
75 { force_pool => $db->storage->read_handler->next_storage }
76 );
d4daee7b 77
2bf79155 78=head1 DESCRIPTION
79
7e38d850 80Warning: This class is marked BETA. This has been running a production
ccb3b897 81website using MySQL native replication as its backend and we have some decent
7e38d850 82test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 83Individual DBs may have quirks we are not aware of. Please use this in first
7e38d850 84development and pass along your experiences/bug fixes.
2bf79155 85
86This class implements replicated data store for DBI. Currently you can define
87one master and numerous slave database connections. All write-type queries
88(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89database, all read-type queries (SELECTs) go to the slave database.
90
91Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 92handle gets delegated to one of the two attributes: L</read_handler> or to
93L</write_handler>. Additionally, some methods need to be distributed
2bf79155 94to all existing storages. This way our storage class is a drop in replacement
95for L<DBIx::Class::Storage::DBI>.
96
48580715 97Read traffic is spread across the replicants (slaves) occurring to a user
2bf79155 98selected algorithm. The default algorithm is random weighted.
99
bca099a3 100=head1 NOTES
101
48580715 102The consistency between master and replicants is database specific. The Pool
faaba25f 103gives you a method to validate its replicants, removing and replacing them
7e38d850 104when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 105to force a query to run against Master when needed.
106
107=head1 REQUIREMENTS
108
a34b0c89 109Replicated Storage has additional requirements not currently part of
aa8b2277 110L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 111
112=head1 ATTRIBUTES
113
114This class defines the following attributes.
115
2ce6e9a6 116=head2 schema
117
118The underlying L<DBIx::Class::Schema> object this storage is attaching
119
120=cut
121
122has 'schema' => (
123 is=>'rw',
6a151f58 124 isa=>DBICSchema,
2ce6e9a6 125 weak_ref=>1,
126 required=>1,
127);
128
26ab719a 129=head2 pool_type
2bf79155 130
fd323bf1 131Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 132to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 133
134=cut
135
26ab719a 136has 'pool_type' => (
dcdf7b2c 137 is=>'rw',
41916570 138 isa=>ClassName,
2ce6e9a6 139 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 140 handles=>{
141 'create_pool' => 'new',
142 },
2bf79155 143);
144
f068a139 145=head2 pool_args
146
147Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 148See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 149
150=cut
151
152has 'pool_args' => (
dcdf7b2c 153 is=>'rw',
41916570 154 isa=>HashRef,
64cdad22 155 lazy=>1,
64cdad22 156 default=>sub { {} },
f068a139 157);
158
159
26ab719a 160=head2 balancer_type
2bf79155 161
162The replication pool requires a balance class to provider the methods for
163choose how to spread the query load across each replicant in the pool.
164
165=cut
166
26ab719a 167has 'balancer_type' => (
dcdf7b2c 168 is=>'rw',
9901aad7 169 isa=>BalancerClassNamePart,
2ce6e9a6 170 coerce=>1,
171 required=>1,
172 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 173 handles=>{
174 'create_balancer' => 'new',
175 },
2bf79155 176);
177
17b05c13 178=head2 balancer_args
179
180Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 181See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 182
183=cut
184
185has 'balancer_args' => (
dcdf7b2c 186 is=>'rw',
41916570 187 isa=>HashRef,
64cdad22 188 lazy=>1,
189 required=>1,
190 default=>sub { {} },
17b05c13 191);
192
26ab719a 193=head2 pool
2bf79155 194
370f78d4 195Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 196container class for one or more replicated databases.
2bf79155 197
198=cut
199
26ab719a 200has 'pool' => (
64cdad22 201 is=>'ro',
202 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203 lazy_build=>1,
204 handles=>[qw/
6f7344b8 205 connect_replicants
64cdad22 206 replicants
207 has_replicants
208 /],
2bf79155 209);
210
26ab719a 211=head2 balancer
2bf79155 212
370f78d4 213Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
214is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 215
26ab719a 216=cut
2bf79155 217
26ab719a 218has 'balancer' => (
dcdf7b2c 219 is=>'rw',
64cdad22 220 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221 lazy_build=>1,
222 handles=>[qw/auto_validate_every/],
26ab719a 223);
2bf79155 224
cb6ec758 225=head2 master
226
227The master defines the canonical state for a pool of connected databases. All
228the replicants are expected to match this databases state. Thus, in a classic
229Master / Slaves distributed system, all the slaves are expected to replicate
230the Master's state as quick as possible. This is the only database in the
231pool of databases that is allowed to handle write traffic.
232
233=cut
234
235has 'master' => (
64cdad22 236 is=> 'ro',
6a151f58 237 isa=>DBICStorageDBI,
64cdad22 238 lazy_build=>1,
cb6ec758 239);
240
cb6ec758 241=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242
fd323bf1 243The following methods are delegated all the methods required for the
cb6ec758 244L<DBIx::Class::Storage::DBI> interface.
245
cb6ec758 246=cut
247
4bea1fe7 248my $method_dispatch = {
249 writer => [qw/
64cdad22 250 on_connect_do
6f7344b8 251 on_disconnect_do
3244fdcc 252 on_connect_call
253 on_disconnect_call
64cdad22 254 connect_info
3244fdcc 255 _connect_info
64cdad22 256 throw_exception
257 sql_maker
258 sqlt_type
259 create_ddl_dir
260 deployment_statements
261 datetime_parser
6f7344b8 262 datetime_parser_type
263 build_datetime_parser
64cdad22 264 last_insert_id
265 insert
266 insert_bulk
267 update
268 delete
269 dbh
2ce6e9a6 270 txn_begin
64cdad22 271 txn_do
272 txn_commit
273 txn_rollback
2ce6e9a6 274 txn_scope_guard
90d7422f 275 _exec_txn_rollback
276 _exec_txn_begin
277 _exec_txn_commit
64cdad22 278 deploy
0180bef9 279 with_deferred_fk_checks
6f7344b8 280 dbh_do
2ce6e9a6 281 _prep_for_execute
6f7344b8 282 is_datatype_numeric
283 _count_select
6f7344b8 284 _subq_update_delete
285 svp_rollback
286 svp_begin
287 svp_release
e398f77e 288 relname_to_table_alias
3244fdcc 289 _dbh_last_insert_id
3244fdcc 290 _default_dbi_connect_attributes
291 _dbi_connect_info
b9ca4ff1 292 _dbic_connect_attributes
3244fdcc 293 auto_savepoint
0e773352 294 _query_start
3244fdcc 295 _query_end
0e773352 296 _format_for_trace
297 _dbi_attrs_for_bind
3244fdcc 298 bind_attribute_by_data_type
299 transaction_depth
300 _dbh
301 _select_args
52cef7e3 302 _dbh_execute_for_fetch
3244fdcc 303 _sql_maker
3244fdcc 304 _per_row_update_delete
3244fdcc 305 _dbh_execute_inserts_with_no_binds
306 _select_args_to_query
5e782048 307 _gen_sql_bind
3244fdcc 308 _svp_generate_name
309 _multipk_update_delete
3244fdcc 310 _normalize_connect_info
311 _parse_connect_do
3244fdcc 312 savepoints
3244fdcc 313 _sql_maker_opts
314 _conn_pid
3244fdcc 315 _dbh_autocommit
316 _native_data_type
317 _get_dbh
318 sql_maker_class
3244fdcc 319 _execute
320 _do_query
402ac1c9 321 _sth
3244fdcc 322 _dbh_sth
323 _dbh_execute
5b4f3fd0 324 /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
4bea1fe7 325 reader => [qw/
326 select
327 select_single
328 columns_info_for
329 _dbh_columns_info_for
330 _select
331 /],
332 unimplemented => [qw/
333 _arm_global_destructor
334 _verify_pid
335
0e773352 336 source_bind_attributes
337
4bea1fe7 338 get_use_dbms_capability
339 set_use_dbms_capability
340 get_dbms_capability
341 set_dbms_capability
342 _dbh_details
343 _dbh_get_info
344
345 sql_limit_dialect
346 sql_quote_char
347 sql_name_sep
348
4bea1fe7 349 _prefetch_autovalues
350
eec07bca 351 _resolve_bindattrs
352
4bea1fe7 353 _max_column_bytesize
354 _is_lob_type
355 _is_binary_lob_type
356 _is_text_lob_type
402ac1c9 357
358 sth
4bea1fe7 359 /,(
360 # the capability framework
361 # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
362 grep
363 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
364 ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
365 )],
366};
367
368if (DBIx::Class::_ENV_::DBICTEST) {
369
370 my $seen;
371 for my $type (keys %$method_dispatch) {
372 for (@{$method_dispatch->{$type}}) {
373 push @{$seen->{$_}}, $type;
374 }
375 }
cb6ec758 376
4bea1fe7 377 if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
378 die(join "\n", '',
379 'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
380 (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
381 '',
382 );
383 }
bbdda281 384
4bea1fe7 385 if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
386 die(join "\n", '',
387 '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
388 @cant,
389 '',
390 );
391 }
392}
bbdda281 393
4bea1fe7 394for my $method (@{$method_dispatch->{unimplemented}}) {
395 __PACKAGE__->meta->add_method($method, sub {
70c28808 396 my $self = shift;
397 $self->throw_exception("$method must not be called on ".(blessed $self).' objects');
4bea1fe7 398 });
399}
31a8aaaf 400
4bea1fe7 401=head2 read_handler
584ea6e4 402
4bea1fe7 403Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
584ea6e4 404
4bea1fe7 405=cut
406
407has 'read_handler' => (
408 is=>'rw',
409 isa=>Object,
410 lazy_build=>1,
411 handles=>$method_dispatch->{reader},
e471ab87 412);
413
4bea1fe7 414=head2 write_handler
415
416Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
417as well as methods that don't write or read that can be called on only one
418storage, methods that return a C<$dbh>, and any methods that don't make sense to
419run on a replicant.
420
421=cut
422
423has 'write_handler' => (
424 is=>'ro',
425 isa=>Object,
426 lazy_build=>1,
427 handles=>$method_dispatch->{writer},
7f4433eb 428);
429
4bea1fe7 430
6d766626 431
b2e4d522 432has _master_connect_info_opts =>
433 (is => 'rw', isa => HashRef, default => sub { {} });
434
435=head2 around: connect_info
436
48580715 437Preserves master's C<connect_info> options (for merging with replicants.)
438Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 439C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
b2e4d522 440
441=cut
442
443around connect_info => sub {
444 my ($next, $self, $info, @extra) = @_;
445
282a9a4f 446 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 447
b2e4d522 448 my %opts;
449 for my $arg (@$info) {
450 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 451 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 452 }
b2e4d522 453 delete $opts{dsn};
454
dcdf7b2c 455 if (@opts{qw/pool_type pool_args/}) {
456 $self->pool_type(delete $opts{pool_type})
457 if $opts{pool_type};
458
b88b85e7 459 $self->pool_args(
e666c5fd 460 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 461 );
dcdf7b2c 462
64ae1667 463 ## Since we possibly changed the pool_args, we need to clear the current
464 ## pool object so that next time it is used it will be rebuilt.
465 $self->clear_pool;
dcdf7b2c 466 }
467
468 if (@opts{qw/balancer_type balancer_args/}) {
469 $self->balancer_type(delete $opts{balancer_type})
470 if $opts{balancer_type};
471
b88b85e7 472 $self->balancer_args(
e666c5fd 473 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 474 );
dcdf7b2c 475
67c43863 476 $self->balancer($self->_build_balancer)
6f7344b8 477 if $self->balancer;
dcdf7b2c 478 }
479
b2e4d522 480 $self->_master_connect_info_opts(\%opts);
481
cca282b6 482 my @res;
483 if (wantarray) {
0ce2d0d5 484 @res = $self->$next($info, @extra);
485 } else {
cca282b6 486 $res[0] = $self->$next($info, @extra);
0ce2d0d5 487 }
488
fd4eb9c2 489 # Make sure master is blessed into the correct class and apply role to it.
490 my $master = $self->master;
491 $master->_determine_driver;
492 Moose::Meta::Class->initialize(ref $master);
cea43436 493
ec0946db 494 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 495
496 # link pool back to master
497 $self->pool->master($master);
0ce2d0d5 498
cca282b6 499 wantarray ? @res : $res[0];
b2e4d522 500};
501
26ab719a 502=head1 METHODS
2bf79155 503
26ab719a 504This class defines the following methods.
2bf79155 505
c354902c 506=head2 BUILDARGS
2bf79155 507
faaba25f 508L<DBIx::Class::Schema> when instantiating its storage passed itself as the
2ce6e9a6 509first argument. So we need to massage the arguments a bit so that all the
510bits get put into the correct places.
2bf79155 511
512=cut
513
c354902c 514sub BUILDARGS {
fd323bf1 515 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 516
c354902c 517 return {
6f7344b8 518 schema=>$schema,
519 %$storage_type_args,
520 @args
c354902c 521 }
522}
2bf79155 523
cb6ec758 524=head2 _build_master
2bf79155 525
cb6ec758 526Lazy builder for the L</master> attribute.
2bf79155 527
528=cut
529
cb6ec758 530sub _build_master {
2ce6e9a6 531 my $self = shift @_;
ee356d00 532 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 533 $master
106d5f3b 534}
535
26ab719a 536=head2 _build_pool
2bf79155 537
26ab719a 538Lazy builder for the L</pool> attribute.
2bf79155 539
540=cut
541
26ab719a 542sub _build_pool {
64cdad22 543 my $self = shift @_;
544 $self->create_pool(%{$self->pool_args});
2bf79155 545}
546
26ab719a 547=head2 _build_balancer
2bf79155 548
cb6ec758 549Lazy builder for the L</balancer> attribute. This takes a Pool object so that
550the balancer knows which pool it's balancing.
2bf79155 551
552=cut
553
26ab719a 554sub _build_balancer {
64cdad22 555 my $self = shift @_;
556 $self->create_balancer(
6f7344b8 557 pool=>$self->pool,
64cdad22 558 master=>$self->master,
559 %{$self->balancer_args},
560 );
2bf79155 561}
562
cb6ec758 563=head2 _build_write_handler
2bf79155 564
cb6ec758 565Lazy builder for the L</write_handler> attribute. The default is to set this to
566the L</master>.
50336325 567
568=cut
569
cb6ec758 570sub _build_write_handler {
64cdad22 571 return shift->master;
cb6ec758 572}
50336325 573
cb6ec758 574=head2 _build_read_handler
2bf79155 575
cb6ec758 576Lazy builder for the L</read_handler> attribute. The default is to set this to
577the L</balancer>.
2bf79155 578
579=cut
580
cb6ec758 581sub _build_read_handler {
64cdad22 582 return shift->balancer;
cb6ec758 583}
50336325 584
cb6ec758 585=head2 around: connect_replicants
2bf79155 586
cb6ec758 587All calls to connect_replicants needs to have an existing $schema tacked onto
b2e4d522 588top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
589options merged with the master, with replicant opts having higher priority.
955a6df6 590
cb6ec758 591=cut
955a6df6 592
b2e4d522 593around connect_replicants => sub {
594 my ($next, $self, @args) = @_;
595
596 for my $r (@args) {
597 $r = [ $r ] unless reftype $r eq 'ARRAY';
598
1a58752c 599 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 600 if ref $r->[0] && reftype $r->[0] eq 'CODE';
601
602# any connect_info options?
603 my $i = 0;
604 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
605
6f7344b8 606# make one if none
b2e4d522 607 $r->[$i] = {} unless $r->[$i];
608
609# merge if two hashes
b88b85e7 610 my @hashes = @$r[$i .. $#{$r}];
611
1a58752c 612 $self->throw_exception('invalid connect_info options')
b88b85e7 613 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
614
1a58752c 615 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 616 if @hashes > 2;
617
282a9a4f 618 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 619 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 620
621# delete them
b2e4d522 622 splice @$r, $i+1, ($#{$r} - $i), ();
623
0bd8e058 624# make sure master/replicants opts don't clash
625 my %master_opts = %{ $self->_master_connect_info_opts };
626 if (exists $opts{dbh_maker}) {
627 delete @master_opts{qw/dsn user password/};
628 }
629 delete $master_opts{dbh_maker};
630
b2e4d522 631# merge with master
e666c5fd 632 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 633
634# update
635 $r->[$i] = \%opts;
636 }
637
638 $self->$next($self->schema, @args);
955a6df6 639};
2bf79155 640
2bf79155 641=head2 all_storages
642
643Returns an array of of all the connected storage backends. The first element
644in the returned array is the master, and the remainings are each of the
645replicants.
646
647=cut
648
649sub all_storages {
64cdad22 650 my $self = shift @_;
651 return grep {defined $_ && blessed $_} (
652 $self->master,
6412a592 653 values %{ $self->replicants },
64cdad22 654 );
2bf79155 655}
656
c4d3fae2 657=head2 execute_reliably ($coderef, ?@args)
658
659Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 660use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 661restores the original state.
662
663Example:
664
64cdad22 665 my $reliably = sub {
666 my $name = shift @_;
667 $schema->resultset('User')->create({name=>$name});
fd323bf1 668 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 669 return $user_rs;
670 };
c4d3fae2 671
64cdad22 672 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 673
674Use this when you must be certain of your database state, such as when you just
675inserted something and need to get a resultset including it, etc.
676
677=cut
678
679sub execute_reliably {
64cdad22 680 my ($self, $coderef, @args) = @_;
d4daee7b 681
64cdad22 682 unless( ref $coderef eq 'CODE') {
683 $self->throw_exception('Second argument must be a coderef');
684 }
d4daee7b 685
64cdad22 686 ##Get copy of master storage
687 my $master = $self->master;
d4daee7b 688
64cdad22 689 ##Get whatever the current read hander is
690 my $current = $self->read_handler;
d4daee7b 691
64cdad22 692 ##Set the read handler to master
693 $self->read_handler($master);
d4daee7b 694
64cdad22 695 ## do whatever the caller needs
696 my @result;
697 my $want_array = wantarray;
d4daee7b 698
ed7ab0f4 699 try {
64cdad22 700 if($want_array) {
701 @result = $coderef->(@args);
702 } elsif(defined $want_array) {
703 ($result[0]) = ($coderef->(@args));
ed213e85 704 } else {
64cdad22 705 $coderef->(@args);
6f7344b8 706 }
ed7ab0f4 707 } catch {
708 $self->throw_exception("coderef returned an error: $_");
709 } finally {
710 ##Reset to the original state
711 $self->read_handler($current);
64cdad22 712 };
d4daee7b 713
cca282b6 714 return wantarray ? @result : $result[0];
c4d3fae2 715}
716
cb6ec758 717=head2 set_reliable_storage
718
719Sets the current $schema to be 'reliable', that is all queries, both read and
720write are sent to the master
d4daee7b 721
cb6ec758 722=cut
723
724sub set_reliable_storage {
64cdad22 725 my $self = shift @_;
726 my $schema = $self->schema;
727 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 728
64cdad22 729 $schema->storage->read_handler($write_handler);
cb6ec758 730}
731
732=head2 set_balanced_storage
733
734Sets the current $schema to be use the </balancer> for all reads, while all
48580715 735writes are sent to the master only
d4daee7b 736
cb6ec758 737=cut
738
739sub set_balanced_storage {
64cdad22 740 my $self = shift @_;
741 my $schema = $self->schema;
bd5da369 742 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 743
bd5da369 744 $schema->storage->read_handler($balanced_handler);
cb6ec758 745}
2bf79155 746
747=head2 connected
748
749Check that the master and at least one of the replicants is connected.
750
751=cut
752
753sub connected {
64cdad22 754 my $self = shift @_;
755 return
756 $self->master->connected &&
757 $self->pool->connected_replicants;
2bf79155 758}
759
2bf79155 760=head2 ensure_connected
761
762Make sure all the storages are connected.
763
764=cut
765
766sub ensure_connected {
64cdad22 767 my $self = shift @_;
768 foreach my $source ($self->all_storages) {
769 $source->ensure_connected(@_);
770 }
2bf79155 771}
772
2bf79155 773=head2 limit_dialect
774
775Set the limit_dialect for all existing storages
776
777=cut
778
779sub limit_dialect {
64cdad22 780 my $self = shift @_;
781 foreach my $source ($self->all_storages) {
782 $source->limit_dialect(@_);
783 }
f3e9f010 784 return $self->master->limit_dialect;
2bf79155 785}
786
2bf79155 787=head2 quote_char
788
789Set the quote_char for all existing storages
790
791=cut
792
793sub quote_char {
64cdad22 794 my $self = shift @_;
795 foreach my $source ($self->all_storages) {
796 $source->quote_char(@_);
797 }
3fbe08e3 798 return $self->master->quote_char;
2bf79155 799}
800
2bf79155 801=head2 name_sep
802
803Set the name_sep for all existing storages
804
805=cut
806
807sub name_sep {
64cdad22 808 my $self = shift @_;
809 foreach my $source ($self->all_storages) {
810 $source->name_sep(@_);
811 }
3fbe08e3 812 return $self->master->name_sep;
2bf79155 813}
814
2bf79155 815=head2 set_schema
816
817Set the schema object for all existing storages
818
819=cut
820
821sub set_schema {
64cdad22 822 my $self = shift @_;
823 foreach my $source ($self->all_storages) {
824 $source->set_schema(@_);
825 }
2bf79155 826}
827
2bf79155 828=head2 debug
829
830set a debug flag across all storages
831
832=cut
833
834sub debug {
64cdad22 835 my $self = shift @_;
3fbe08e3 836 if(@_) {
837 foreach my $source ($self->all_storages) {
838 $source->debug(@_);
6f7344b8 839 }
64cdad22 840 }
3fbe08e3 841 return $self->master->debug;
2bf79155 842}
843
2bf79155 844=head2 debugobj
845
cea43436 846set a debug object
2bf79155 847
848=cut
849
850sub debugobj {
64cdad22 851 my $self = shift @_;
cea43436 852 return $self->master->debugobj(@_);
2bf79155 853}
854
2bf79155 855=head2 debugfh
856
cea43436 857set a debugfh object
2bf79155 858
859=cut
860
861sub debugfh {
64cdad22 862 my $self = shift @_;
cea43436 863 return $self->master->debugfh(@_);
2bf79155 864}
865
2bf79155 866=head2 debugcb
867
cea43436 868set a debug callback
2bf79155 869
870=cut
871
872sub debugcb {
64cdad22 873 my $self = shift @_;
cea43436 874 return $self->master->debugcb(@_);
2bf79155 875}
876
2bf79155 877=head2 disconnect
878
879disconnect everything
880
881=cut
882
883sub disconnect {
64cdad22 884 my $self = shift @_;
885 foreach my $source ($self->all_storages) {
886 $source->disconnect(@_);
887 }
2bf79155 888}
889
b2e4d522 890=head2 cursor_class
891
892set cursor class on all storages, or return master's
893
894=cut
895
896sub cursor_class {
897 my ($self, $cursor_class) = @_;
898
899 if ($cursor_class) {
900 $_->cursor_class($cursor_class) for $self->all_storages;
901 }
902 $self->master->cursor_class;
903}
d4daee7b 904
3244fdcc 905=head2 cursor
906
907set cursor class on all storages, or return master's, alias for L</cursor_class>
908above.
909
910=cut
911
912sub cursor {
913 my ($self, $cursor_class) = @_;
914
915 if ($cursor_class) {
916 $_->cursor($cursor_class) for $self->all_storages;
917 }
918 $self->master->cursor;
919}
920
921=head2 unsafe
922
923sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
924master's current setting
925
926=cut
927
928sub unsafe {
929 my $self = shift;
930
931 if (@_) {
932 $_->unsafe(@_) for $self->all_storages;
933 }
934
935 return $self->master->unsafe;
936}
937
938=head2 disable_sth_caching
939
940sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
941or returns master's current setting
942
943=cut
944
945sub disable_sth_caching {
946 my $self = shift;
947
948 if (@_) {
949 $_->disable_sth_caching(@_) for $self->all_storages;
950 }
951
952 return $self->master->disable_sth_caching;
953}
954
955=head2 lag_behind_master
956
957returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
958setting
959
960=cut
961
962sub lag_behind_master {
963 my $self = shift;
964
965 return max map $_->lag_behind_master, $self->replicants;
fd323bf1 966}
3244fdcc 967
968=head2 is_replicating
969
970returns true if all replicants return true for
971L<DBIx::Class::Storage::DBI/is_replicating>
972
973=cut
974
975sub is_replicating {
976 my $self = shift;
977
978 return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
979}
980
981=head2 connect_call_datetime_setup
982
983calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
984
985=cut
986
987sub connect_call_datetime_setup {
988 my $self = shift;
989 $_->connect_call_datetime_setup for $self->all_storages;
990}
991
992sub _populate_dbh {
993 my $self = shift;
994 $_->_populate_dbh for $self->all_storages;
995}
996
997sub _connect {
998 my $self = shift;
999 $_->_connect for $self->all_storages;
1000}
1001
1002sub _rebless {
1003 my $self = shift;
1004 $_->_rebless for $self->all_storages;
1005}
1006
1007sub _determine_driver {
1008 my $self = shift;
1009 $_->_determine_driver for $self->all_storages;
1010}
1011
1012sub _driver_determined {
1013 my $self = shift;
fd323bf1 1014
3244fdcc 1015 if (@_) {
1016 $_->_driver_determined(@_) for $self->all_storages;
1017 }
1018
1019 return $self->master->_driver_determined;
1020}
1021
1022sub _init {
1023 my $self = shift;
fd323bf1 1024
3244fdcc 1025 $_->_init for $self->all_storages;
1026}
1027
1028sub _run_connection_actions {
1029 my $self = shift;
fd323bf1 1030
3244fdcc 1031 $_->_run_connection_actions for $self->all_storages;
1032}
1033
1034sub _do_connection_actions {
1035 my $self = shift;
fd323bf1 1036
3244fdcc 1037 if (@_) {
1038 $_->_do_connection_actions(@_) for $self->all_storages;
1039 }
1040}
1041
1042sub connect_call_do_sql {
1043 my $self = shift;
1044 $_->connect_call_do_sql(@_) for $self->all_storages;
1045}
1046
1047sub disconnect_call_do_sql {
1048 my $self = shift;
1049 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1050}
1051
1052sub _seems_connected {
1053 my $self = shift;
1054
1055 return min map $_->_seems_connected, $self->all_storages;
1056}
1057
1058sub _ping {
1059 my $self = shift;
1060
1061 return min map $_->_ping, $self->all_storages;
1062}
1063
bbdda281 1064# not using the normalized_version, because we want to preserve
1065# version numbers much longer than the conventional xxx.yyyzzz
7da56142 1066my $numify_ver = sub {
1067 my $ver = shift;
1068 my @numparts = split /\D+/, $ver;
bbdda281 1069 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1070
1071 return sprintf $format, @numparts;
1072};
fecb38cb 1073sub _server_info {
1074 my $self = shift;
1075
bbdda281 1076 if (not $self->_dbh_details->{info}) {
1077 $self->_dbh_details->{info} = (
fd323bf1 1078 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1079 map [ $numify_ver->($_->{dbms_version}), $_ ],
1080 map $_->_server_info, $self->all_storages
1081 )->[1];
fecb38cb 1082 }
1083
bbdda281 1084 return $self->next::method;
fecb38cb 1085}
1086
1087sub _get_server_version {
1088 my $self = shift;
1089
1090 return $self->_server_info->{dbms_version};
1091}
1092
7e38d850 1093=head1 GOTCHAS
1094
1095Due to the fact that replicants can lag behind a master, you must take care to
1096make sure you use one of the methods to force read queries to a master should
1097you need realtime data integrity. For example, if you insert a row, and then
1098immediately re-read it from the database (say, by doing $row->discard_changes)
1099or you insert a row and then immediately build a query that expects that row
1100to be an item, you should force the master to handle reads. Otherwise, due to
1101the lag, there is no certainty your data will be in the expected state.
1102
1103For data integrity, all transactions automatically use the master storage for
1104all read and write queries. Using a transaction is the preferred and recommended
1105method to force the master to handle all read queries.
1106
1107Otherwise, you can force a single query to use the master with the 'force_pool'
1108attribute:
1109
1110 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1111
1112This attribute will safely be ignore by non replicated storages, so you can use
1113the same code for both types of systems.
1114
1115Lastly, you can use the L</execute_reliably> method, which works very much like
1116a transaction.
1117
1118For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1119and L</set_balanced_storage>, however this operates at a global level and is not
1120suitable if you have a shared Schema object being used by multiple processes,
1121such as on a web application server. You can get around this limitation by
1122using the Schema clone method.
1123
1124 my $new_schema = $schema->clone;
1125 $new_schema->set_reliable_storage;
d4daee7b 1126
7e38d850 1127 ## $new_schema will use only the Master storage for all reads/writes while
1128 ## the $schema object will use replicated storage.
1129
f5d3a5de 1130=head1 AUTHOR
1131
64cdad22 1132 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1133
c4d3fae2 1134Based on code originated by:
f5d3a5de 1135
64cdad22 1136 Norbert Csongrádi <bert@cpan.org>
1137 Peter Siklósi <einon@einon.hu>
2156bbdd 1138
f5d3a5de 1139=head1 LICENSE
1140
1141You may distribute this code under the same terms as Perl itself.
1142
1143=cut
1144
c354902c 1145__PACKAGE__->meta->make_immutable;
1146
f5d3a5de 11471;