2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
a34b0c89 5 use DBIx::Class;
6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 8}
9
b2e4d522 10use Moose;
26ab719a 11use DBIx::Class::Storage::DBI;
2bf79155 12use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 13use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 14use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 15use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 16use Scalar::Util 'reftype';
e666c5fd 17use Hash::Merge;
7da56142 18use List::Util qw/min max reduce/;
ed7ab0f4 19use Try::Tiny;
fd323bf1 20use namespace::clean;
9901aad7 21
22use namespace::clean -except => 'meta';
2bf79155 23
24=head1 NAME
25
ecb65397 26DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 27
28=head1 SYNOPSIS
29
The following example shows how to change an existing $schema to a replicated
48580715 31storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 32tasks.
2bf79155 33
You should set the 'storage_type' attribute to a replicated type. You should
d4daee7b 35also define your arguments, such as which balancer you want and any arguments
3da4f736 36that the Pool object should get.
37
ce854fd3 38 my $schema = Schema::Class->clone;
64cdad22 39 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 40 $schema->connection(...);
d4daee7b 41
fd323bf1 42Next, you need to add in the Replicants. Basically this is an array of
3da4f736 43arrayrefs, where each arrayref is database connect information. Think of these
44arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 45
64cdad22 46 $schema->storage->connect_replicants(
47 [$dsn1, $user, $pass, \%opts],
48 [$dsn2, $user, $pass, \%opts],
49 [$dsn3, $user, $pass, \%opts],
50 );
d4daee7b 51
Now, just use the $schema as you normally would. All reads are automatically
delegated to the replicants, while writes go to the master.
54
7e38d850 55 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 56
3da4f736 57You can force a given query to use a particular storage using the search
58attribute 'force_pool'. For example:
d4daee7b 59
7e38d850 60 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 61
Now $RS will force everything (both reads and writes) to use whatever was set up
fd323bf1 63as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 64but you can also use any Replicant name. Please see:
212cc5c2 65L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 66
67Also see transactions and L</execute_reliably> for alternative ways to
68force read traffic to the master. In general, you should wrap your statements
69in a transaction when you are reading and writing to the same tables at the
70same time, since your replicants will often lag a bit behind the master.
212cc5c2 71
72See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
73walkthroughs.
d4daee7b 74
2bf79155 75=head1 DESCRIPTION
76
7e38d850 77Warning: This class is marked BETA. This has been running a production
ccb3b897 78website using MySQL native replication as its backend and we have some decent
7e38d850 79test coverage but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this first in
development and pass along your experiences/bug fixes.
2bf79155 82
This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database, while all read-type queries (SELECTs) go to the slave databases.
87
88Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 89handle gets delegated to one of the two attributes: L</read_handler> or to
90L</write_handler>. Additionally, some methods need to be distributed
2bf79155 91to all existing storages. This way our storage class is a drop in replacement
92for L<DBIx::Class::Storage::DBI>.
93
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm. The default algorithm is random weighted.
96
bca099a3 97=head1 NOTES
98
48580715 99The consistency between master and replicants is database specific. The Pool
faaba25f 100gives you a method to validate its replicants, removing and replacing them
7e38d850 101when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 102to force a query to run against Master when needed.
103
104=head1 REQUIREMENTS
105
a34b0c89 106Replicated Storage has additional requirements not currently part of
aa8b2277 107L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 108
109=head1 ATTRIBUTES
110
111This class defines the following attributes.
112
2ce6e9a6 113=head2 schema
114
The underlying L<DBIx::Class::Schema> object this storage is attached to.
116
117=cut
118
119has 'schema' => (
120 is=>'rw',
6a151f58 121 isa=>DBICSchema,
2ce6e9a6 122 weak_ref=>1,
123 required=>1,
124);
125
26ab719a 126=head2 pool_type
2bf79155 127
fd323bf1 128Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 129to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 130
131=cut
132
26ab719a 133has 'pool_type' => (
dcdf7b2c 134 is=>'rw',
41916570 135 isa=>ClassName,
2ce6e9a6 136 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 137 handles=>{
138 'create_pool' => 'new',
139 },
2bf79155 140);
141
f068a139 142=head2 pool_args
143
Contains a hashref of initialized information to pass to the Pool object.
212cc5c2 145See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 146
147=cut
148
149has 'pool_args' => (
dcdf7b2c 150 is=>'rw',
41916570 151 isa=>HashRef,
64cdad22 152 lazy=>1,
64cdad22 153 default=>sub { {} },
f068a139 154);
155
156
26ab719a 157=head2 balancer_type
2bf79155 158
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
161
162=cut
163
26ab719a 164has 'balancer_type' => (
dcdf7b2c 165 is=>'rw',
9901aad7 166 isa=>BalancerClassNamePart,
2ce6e9a6 167 coerce=>1,
168 required=>1,
169 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 170 handles=>{
171 'create_balancer' => 'new',
172 },
2bf79155 173);
174
17b05c13 175=head2 balancer_args
176
177Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 178See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 179
180=cut
181
182has 'balancer_args' => (
dcdf7b2c 183 is=>'rw',
41916570 184 isa=>HashRef,
64cdad22 185 lazy=>1,
186 required=>1,
187 default=>sub { {} },
17b05c13 188);
189
26ab719a 190=head2 pool
2bf79155 191
370f78d4 192Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 193container class for one or more replicated databases.
2bf79155 194
195=cut
196
26ab719a 197has 'pool' => (
64cdad22 198 is=>'ro',
199 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
200 lazy_build=>1,
201 handles=>[qw/
6f7344b8 202 connect_replicants
64cdad22 203 replicants
204 has_replicants
205 /],
2bf79155 206);
207
26ab719a 208=head2 balancer
2bf79155 209
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and spreads the read load across its replicants.
2bf79155 212
26ab719a 213=cut
2bf79155 214
26ab719a 215has 'balancer' => (
dcdf7b2c 216 is=>'rw',
64cdad22 217 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
218 lazy_build=>1,
219 handles=>[qw/auto_validate_every/],
26ab719a 220);
2bf79155 221
cb6ec758 222=head2 master
223
224The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
228pool of databases that is allowed to handle write traffic.
229
230=cut
231
232has 'master' => (
64cdad22 233 is=> 'ro',
6a151f58 234 isa=>DBICStorageDBI,
64cdad22 235 lazy_build=>1,
cb6ec758 236);
237
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes delegate all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
242
243=head2 read_handler
244
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
246
247=cut
248
249has 'read_handler' => (
64cdad22 250 is=>'rw',
41916570 251 isa=>Object,
64cdad22 252 lazy_build=>1,
253 handles=>[qw/
254 select
255 select_single
256 columns_info_for
fd323bf1 257 _dbh_columns_info_for
3244fdcc 258 _select
6f7344b8 259 /],
cb6ec758 260);
261
cb6ec758 262=head2 write_handler
263
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
265as well as methods that don't write or read that can be called on only one
266storage, methods that return a C<$dbh>, and any methods that don't make sense to
267run on a replicant.
cb6ec758 268
269=cut
270
271has 'write_handler' => (
64cdad22 272 is=>'ro',
41916570 273 isa=>Object,
64cdad22 274 lazy_build=>1,
6f7344b8 275 handles=>[qw/
64cdad22 276 on_connect_do
6f7344b8 277 on_disconnect_do
3244fdcc 278 on_connect_call
279 on_disconnect_call
64cdad22 280 connect_info
3244fdcc 281 _connect_info
64cdad22 282 throw_exception
283 sql_maker
284 sqlt_type
285 create_ddl_dir
286 deployment_statements
287 datetime_parser
6f7344b8 288 datetime_parser_type
289 build_datetime_parser
64cdad22 290 last_insert_id
291 insert
292 insert_bulk
293 update
294 delete
295 dbh
2ce6e9a6 296 txn_begin
64cdad22 297 txn_do
298 txn_commit
299 txn_rollback
2ce6e9a6 300 txn_scope_guard
64cdad22 301 sth
302 deploy
0180bef9 303 with_deferred_fk_checks
6f7344b8 304 dbh_do
64cdad22 305 reload_row
2ce6e9a6 307 _prep_for_execute
7fb60fb1 308
6f7344b8 309 backup
310 is_datatype_numeric
311 _count_select
6f7344b8 312 _subq_update_delete
313 svp_rollback
314 svp_begin
315 svp_release
e398f77e 316 relname_to_table_alias
317 _straight_join_to_node
3244fdcc 318 _dbh_last_insert_id
319 _fix_bind_params
320 _default_dbi_connect_attributes
321 _dbi_connect_info
b9ca4ff1 322 _dbic_connect_attributes
3244fdcc 323 auto_savepoint
324 _sqlt_version_ok
325 _query_end
326 bind_attribute_by_data_type
327 transaction_depth
328 _dbh
329 _select_args
330 _dbh_execute_array
331 _sql_maker_args
332 _sql_maker
333 _query_start
334 _sqlt_version_error
335 _per_row_update_delete
336 _dbh_begin_work
337 _dbh_execute_inserts_with_no_binds
338 _select_args_to_query
339 _svp_generate_name
340 _multipk_update_delete
341 source_bind_attributes
342 _normalize_connect_info
343 _parse_connect_do
344 _dbh_commit
345 _execute_array
3244fdcc 346 savepoints
347 _sqlt_minimum_version
348 _sql_maker_opts
349 _conn_pid
3244fdcc 350 _conn_tid
351 _dbh_autocommit
352 _native_data_type
353 _get_dbh
354 sql_maker_class
355 _dbh_rollback
356 _adjust_select_args_for_complex_prefetch
357 _resolve_ident_sources
358 _resolve_column_info
359 _prune_unused_joins
360 _strip_cond_qualifiers
361 _parse_order_by
362 _resolve_aliastypes_from_select_args
363 _execute
364 _do_query
365 _dbh_sth
366 _dbh_execute
6ed1cd2e 367 _prefetch_insert_auto_nextvals
64cdad22 368 /],
cb6ec758 369);
370
e471ab87 371my @unimplemented = qw(
372 _arm_global_destructor
373 _preserve_foreign_dbh
572338e0 374 _verify_pid
375 _verify_tid
bbdda281 376
377 get_use_dbms_capability
378 set_use_dbms_capability
379 get_dbms_capability
380 set_dbms_capability
381
382 _dbh_details
383
384 _use_insert_returning
385 _supports_insert_returning
386
387 _use_placeholders
388 _supports_placeholders
389 _determine_supports_placeholders
390
391 _use_typeless_placeholders
392 _supports_typeless_placeholders
393 _determine_supports_typeless_placeholders
e471ab87 394);
395
396for my $method (@unimplemented) {
397 __PACKAGE__->meta->add_method($method, sub {
398 croak "$method must not be called on ".(blessed shift).' objects';
399 });
400}
6d766626 401
b2e4d522 402has _master_connect_info_opts =>
403 (is => 'rw', isa => HashRef, default => sub { {} });
404
405=head2 around: connect_info
406
48580715 407Preserves master's C<connect_info> options (for merging with replicants.)
408Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 409C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
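
As a rough sketch of the option handling described above (not the module's
actual code): the real method merges with L<Hash::Merge>, which also merges
nested structures, whereas this stand-alone version assumes flat hashes and
uses plain list assignment. The contents of C<$info> are made up for
illustration.

```perl
use strict;
use warnings;

# Hypothetical connect_info: DSN, user, password, then option hashrefs.
my $info = [
  'dbi:SQLite:dbname=:memory:', undef, undef,
  { AutoCommit => 1, balancer_type => '::Random' },
  { AutoCommit => 0, quote_char => '"' },
];

# Collect every hashref; a later hash overrides an earlier one on conflict.
my %opts;
for my $arg (@$info) {
  next unless ref($arg) eq 'HASH';
  %opts = (%opts, %$arg);
}

# Replicated-related keys are split off from the options kept for replicants.
my %replicated;
for my $key (qw/pool_type pool_args balancer_type balancer_args/) {
  $replicated{$key} = delete $opts{$key} if exists $opts{$key};
}

print join(', ', map { "$_=$opts{$_}" } sort keys %opts), "\n";
```

Whatever remains in C<%opts> is what would be remembered for later merging
with each replicant's own options.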
b2e4d522 410
411=cut
412
413around connect_info => sub {
414 my ($next, $self, $info, @extra) = @_;
415
0ce2d0d5 416 my $wantarray = wantarray;
417
282a9a4f 418 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 419
b2e4d522 420 my %opts;
421 for my $arg (@$info) {
422 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 423 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 424 }
b2e4d522 425 delete $opts{dsn};
426
dcdf7b2c 427 if (@opts{qw/pool_type pool_args/}) {
428 $self->pool_type(delete $opts{pool_type})
429 if $opts{pool_type};
430
b88b85e7 431 $self->pool_args(
e666c5fd 432 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 433 );
dcdf7b2c 434
67c43863 435 $self->pool($self->_build_pool)
6f7344b8 436 if $self->pool;
dcdf7b2c 437 }
438
439 if (@opts{qw/balancer_type balancer_args/}) {
440 $self->balancer_type(delete $opts{balancer_type})
441 if $opts{balancer_type};
442
b88b85e7 443 $self->balancer_args(
e666c5fd 444 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 445 );
dcdf7b2c 446
67c43863 447 $self->balancer($self->_build_balancer)
6f7344b8 448 if $self->balancer;
dcdf7b2c 449 }
450
b2e4d522 451 $self->_master_connect_info_opts(\%opts);
452
0ce2d0d5 453 my (@res, $res);
454 if ($wantarray) {
455 @res = $self->$next($info, @extra);
456 } else {
457 $res = $self->$next($info, @extra);
458 }
459
fd4eb9c2 460 # Make sure master is blessed into the correct class and apply role to it.
461 my $master = $self->master;
462 $master->_determine_driver;
463 Moose::Meta::Class->initialize(ref $master);
cea43436 464
ec0946db 465 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 466
467 # link pool back to master
468 $self->pool->master($master);
0ce2d0d5 469
470 $wantarray ? @res : $res;
b2e4d522 471};
472
26ab719a 473=head1 METHODS
2bf79155 474
26ab719a 475This class defines the following methods.
2bf79155 476
c354902c 477=head2 BUILDARGS
2bf79155 478
L<DBIx::Class::Schema> when instantiating its storage passes itself as the
2ce6e9a6 480first argument. So we need to massage the arguments a bit so that all the
481bits get put into the correct places.
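
The reshuffling can be tried in isolation. The sketch below is a stand-alone
copy of the idea, with a hypothetical function name (C<build_args>) and
placeholder values standing in for the class and the schema object.

```perl
use strict;
use warnings;

# Stand-alone version of the argument massaging; build_args is a
# hypothetical name used only for this sketch.
sub build_args {
  my ($class, $schema, $storage_type_args, @args) = @_;
  return {
    schema => $schema,
    %$storage_type_args,
    @args,
  };
}

my $args = build_args(
  'My::Storage',                    # class name (placeholder)
  'the-schema-object',              # stands in for a real schema
  { balancer_type => '::Random' },  # hashref given with storage_type
);

print join(', ', map { "$_=$args->{$_}" } sort keys %$args), "\n";
```

The hashref given alongside the storage type in C<storage_type> is flattened
into the constructor arguments next to the schema.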
2bf79155 482
483=cut
484
c354902c 485sub BUILDARGS {
fd323bf1 486 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 487
c354902c 488 return {
6f7344b8 489 schema=>$schema,
490 %$storage_type_args,
491 @args
c354902c 492 }
493}
2bf79155 494
cb6ec758 495=head2 _build_master
2bf79155 496
cb6ec758 497Lazy builder for the L</master> attribute.
2bf79155 498
499=cut
500
cb6ec758 501sub _build_master {
2ce6e9a6 502 my $self = shift @_;
ee356d00 503 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 504 $master
106d5f3b 505}
506
26ab719a 507=head2 _build_pool
2bf79155 508
26ab719a 509Lazy builder for the L</pool> attribute.
2bf79155 510
511=cut
512
26ab719a 513sub _build_pool {
64cdad22 514 my $self = shift @_;
515 $self->create_pool(%{$self->pool_args});
2bf79155 516}
517
26ab719a 518=head2 _build_balancer
2bf79155 519
cb6ec758 520Lazy builder for the L</balancer> attribute. This takes a Pool object so that
521the balancer knows which pool it's balancing.
2bf79155 522
523=cut
524
26ab719a 525sub _build_balancer {
64cdad22 526 my $self = shift @_;
527 $self->create_balancer(
6f7344b8 528 pool=>$self->pool,
64cdad22 529 master=>$self->master,
530 %{$self->balancer_args},
531 );
2bf79155 532}
533
cb6ec758 534=head2 _build_write_handler
2bf79155 535
cb6ec758 536Lazy builder for the L</write_handler> attribute. The default is to set this to
537the L</master>.
50336325 538
539=cut
540
cb6ec758 541sub _build_write_handler {
64cdad22 542 return shift->master;
cb6ec758 543}
50336325 544
cb6ec758 545=head2 _build_read_handler
2bf79155 546
cb6ec758 547Lazy builder for the L</read_handler> attribute. The default is to set this to
548the L</balancer>.
2bf79155 549
550=cut
551
cb6ec758 552sub _build_read_handler {
64cdad22 553 return shift->balancer;
cb6ec758 554}
50336325 555
cb6ec758 556=head2 around: connect_replicants
2bf79155 557
All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any
C<connect_info> options merged with the master, with replicant opts having
higher priority.
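
A minimal sketch of that priority rule, assuming flat option hashes (the
module itself uses L<Hash::Merge>, which also handles nested values). All
option values here are invented for the example.

```perl
use strict;
use warnings;

# Hypothetical options remembered from the master's connect_info.
my %master_opts = (AutoCommit => 1, quote_char => '`', user => 'app');

# Hypothetical options passed for one replicant.
my %opts = (quote_char => '"', dbh_maker => sub { return });

# A replicant with its own dbh_maker must not inherit DSN credentials,
# and the master's dbh_maker is never passed on.
delete @master_opts{qw/dsn user password/} if exists $opts{dbh_maker};
delete $master_opts{dbh_maker};

# Replicant options take priority over the master's.
%opts = (%master_opts, %opts);

print join(', ', sort keys %opts), "\n";
```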
955a6df6 561
cb6ec758 562=cut
955a6df6 563
b2e4d522 564around connect_replicants => sub {
565 my ($next, $self, @args) = @_;
566
567 for my $r (@args) {
568 $r = [ $r ] unless reftype $r eq 'ARRAY';
569
1a58752c 570 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 571 if ref $r->[0] && reftype $r->[0] eq 'CODE';
572
573# any connect_info options?
574 my $i = 0;
575 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
576
6f7344b8 577# make one if none
b2e4d522 578 $r->[$i] = {} unless $r->[$i];
579
580# merge if two hashes
b88b85e7 581 my @hashes = @$r[$i .. $#{$r}];
582
1a58752c 583 $self->throw_exception('invalid connect_info options')
b88b85e7 584 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
585
1a58752c 586 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 587 if @hashes > 2;
588
282a9a4f 589 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 590 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 591
592# delete them
b2e4d522 593 splice @$r, $i+1, ($#{$r} - $i), ();
594
0bd8e058 595# make sure master/replicants opts don't clash
596 my %master_opts = %{ $self->_master_connect_info_opts };
597 if (exists $opts{dbh_maker}) {
598 delete @master_opts{qw/dsn user password/};
599 }
600 delete $master_opts{dbh_maker};
601
b2e4d522 602# merge with master
e666c5fd 603 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 604
605# update
606 $r->[$i] = \%opts;
607 }
608
609 $self->$next($self->schema, @args);
955a6df6 610};
2bf79155 611
2bf79155 612=head2 all_storages
613
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
617
618=cut
619
620sub all_storages {
64cdad22 621 my $self = shift @_;
622 return grep {defined $_ && blessed $_} (
623 $self->master,
6412a592 624 values %{ $self->replicants },
64cdad22 625 );
2bf79155 626}
627
c4d3fae2 628=head2 execute_reliably ($coderef, ?@args)
629
630Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 631use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 632restores the original state.
633
634Example:
635
64cdad22 636 my $reliably = sub {
637 my $name = shift @_;
638 $schema->resultset('User')->create({name=>$name});
fd323bf1 639 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 640 return $user_rs;
641 };
c4d3fae2 642
64cdad22 643 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 644
645Use this when you must be certain of your database state, such as when you just
646inserted something and need to get a resultset including it, etc.
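
The save, swap and restore sequence can be illustrated without a database.
This is only a sketch: C<%storage> and C<run_reliably> are hypothetical
stand-ins, and a plain C<eval> takes the place of the L<Try::Tiny> blocks the
method uses.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the storage: a hash with a read_handler slot.
my %storage = (
  master       => 'master',
  read_handler => 'balancer',
);

sub run_reliably {
  my ($code, @args) = @_;

  # Remember the current handler, then point reads at the master.
  my $saved = $storage{read_handler};
  $storage{read_handler} = $storage{master};

  my @result;
  my $ok  = eval { @result = $code->(@args); 1 };
  my $err = $@;

  # Restore the previous handler whether or not the coderef died.
  $storage{read_handler} = $saved;

  die "coderef returned an error: $err" unless $ok;
  return wantarray ? @result : $result[0];
}

my $during = run_reliably(sub { $storage{read_handler} });
print "during: $during, after: $storage{read_handler}\n";
```

Restoring the handler before rethrowing is what keeps a failing coderef from
leaving all later reads pinned to the master.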
647
648=cut
649
650sub execute_reliably {
64cdad22 651 my ($self, $coderef, @args) = @_;
d4daee7b 652
64cdad22 653 unless( ref $coderef eq 'CODE') {
654 $self->throw_exception('Second argument must be a coderef');
655 }
d4daee7b 656
64cdad22 657 ##Get copy of master storage
658 my $master = $self->master;
d4daee7b 659
  ##Get whatever the current read handler is
661 my $current = $self->read_handler;
d4daee7b 662
64cdad22 663 ##Set the read handler to master
664 $self->read_handler($master);
d4daee7b 665
64cdad22 666 ## do whatever the caller needs
667 my @result;
668 my $want_array = wantarray;
d4daee7b 669
ed7ab0f4 670 try {
64cdad22 671 if($want_array) {
672 @result = $coderef->(@args);
673 } elsif(defined $want_array) {
674 ($result[0]) = ($coderef->(@args));
ed213e85 675 } else {
64cdad22 676 $coderef->(@args);
6f7344b8 677 }
ed7ab0f4 678 } catch {
679 $self->throw_exception("coderef returned an error: $_");
680 } finally {
681 ##Reset to the original state
682 $self->read_handler($current);
64cdad22 683 };
d4daee7b 684
ed7ab0f4 685 return $want_array ? @result : $result[0];
c4d3fae2 686}
687
cb6ec758 688=head2 set_reliable_storage
689
Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.
d4daee7b 692
cb6ec758 693=cut
694
695sub set_reliable_storage {
64cdad22 696 my $self = shift @_;
697 my $schema = $self->schema;
698 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 699
64cdad22 700 $schema->storage->read_handler($write_handler);
cb6ec758 701}
702
703=head2 set_balanced_storage
704
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
d4daee7b 707
cb6ec758 708=cut
709
710sub set_balanced_storage {
64cdad22 711 my $self = shift @_;
712 my $schema = $self->schema;
bd5da369 713 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 714
bd5da369 715 $schema->storage->read_handler($balanced_handler);
cb6ec758 716}
2bf79155 717
718=head2 connected
719
720Check that the master and at least one of the replicants is connected.
721
722=cut
723
724sub connected {
64cdad22 725 my $self = shift @_;
726 return
727 $self->master->connected &&
728 $self->pool->connected_replicants;
2bf79155 729}
730
2bf79155 731=head2 ensure_connected
732
733Make sure all the storages are connected.
734
735=cut
736
737sub ensure_connected {
64cdad22 738 my $self = shift @_;
739 foreach my $source ($self->all_storages) {
740 $source->ensure_connected(@_);
741 }
2bf79155 742}
743
2bf79155 744=head2 limit_dialect
745
746Set the limit_dialect for all existing storages
747
748=cut
749
750sub limit_dialect {
64cdad22 751 my $self = shift @_;
752 foreach my $source ($self->all_storages) {
753 $source->limit_dialect(@_);
754 }
  return $self->master->limit_dialect;
2bf79155 756}
757
2bf79155 758=head2 quote_char
759
760Set the quote_char for all existing storages
761
762=cut
763
764sub quote_char {
64cdad22 765 my $self = shift @_;
766 foreach my $source ($self->all_storages) {
767 $source->quote_char(@_);
768 }
3fbe08e3 769 return $self->master->quote_char;
2bf79155 770}
771
2bf79155 772=head2 name_sep
773
774Set the name_sep for all existing storages
775
776=cut
777
778sub name_sep {
64cdad22 779 my $self = shift @_;
780 foreach my $source ($self->all_storages) {
781 $source->name_sep(@_);
782 }
3fbe08e3 783 return $self->master->name_sep;
2bf79155 784}
785
2bf79155 786=head2 set_schema
787
788Set the schema object for all existing storages
789
790=cut
791
792sub set_schema {
64cdad22 793 my $self = shift @_;
794 foreach my $source ($self->all_storages) {
795 $source->set_schema(@_);
796 }
2bf79155 797}
798
2bf79155 799=head2 debug
800
801set a debug flag across all storages
802
803=cut
804
805sub debug {
64cdad22 806 my $self = shift @_;
3fbe08e3 807 if(@_) {
808 foreach my $source ($self->all_storages) {
809 $source->debug(@_);
6f7344b8 810 }
64cdad22 811 }
3fbe08e3 812 return $self->master->debug;
2bf79155 813}
814
2bf79155 815=head2 debugobj
816
cea43436 817set a debug object
2bf79155 818
819=cut
820
821sub debugobj {
64cdad22 822 my $self = shift @_;
cea43436 823 return $self->master->debugobj(@_);
2bf79155 824}
825
2bf79155 826=head2 debugfh
827
cea43436 828set a debugfh object
2bf79155 829
830=cut
831
832sub debugfh {
64cdad22 833 my $self = shift @_;
cea43436 834 return $self->master->debugfh(@_);
2bf79155 835}
836
2bf79155 837=head2 debugcb
838
cea43436 839set a debug callback
2bf79155 840
841=cut
842
843sub debugcb {
64cdad22 844 my $self = shift @_;
cea43436 845 return $self->master->debugcb(@_);
2bf79155 846}
847
2bf79155 848=head2 disconnect
849
850disconnect everything
851
852=cut
853
854sub disconnect {
64cdad22 855 my $self = shift @_;
856 foreach my $source ($self->all_storages) {
857 $source->disconnect(@_);
858 }
2bf79155 859}
860
b2e4d522 861=head2 cursor_class
862
863set cursor class on all storages, or return master's
864
865=cut
866
867sub cursor_class {
868 my ($self, $cursor_class) = @_;
869
870 if ($cursor_class) {
871 $_->cursor_class($cursor_class) for $self->all_storages;
872 }
873 $self->master->cursor_class;
874}
d4daee7b 875
3244fdcc 876=head2 cursor
877
878set cursor class on all storages, or return master's, alias for L</cursor_class>
879above.
880
881=cut
882
883sub cursor {
884 my ($self, $cursor_class) = @_;
885
886 if ($cursor_class) {
887 $_->cursor($cursor_class) for $self->all_storages;
888 }
889 $self->master->cursor;
890}
891
892=head2 unsafe
893
894sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
895master's current setting
896
897=cut
898
899sub unsafe {
900 my $self = shift;
901
902 if (@_) {
903 $_->unsafe(@_) for $self->all_storages;
904 }
905
906 return $self->master->unsafe;
907}
908
909=head2 disable_sth_caching
910
911sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
912or returns master's current setting
913
914=cut
915
916sub disable_sth_caching {
917 my $self = shift;
918
919 if (@_) {
920 $_->disable_sth_caching(@_) for $self->all_storages;
921 }
922
923 return $self->master->disable_sth_caching;
924}
925
926=head2 lag_behind_master
927
928returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
929setting
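
A small sketch of aggregating a value across the pool, assuming the replicants
are held in a hash keyed by name (as L</all_storages> treats them). The plain
hashrefs and their C<lag> and C<replicating> fields are hypothetical stand-ins
for real replicant storage objects.

```perl
use strict;
use warnings;
use List::Util qw/max/;

# Hypothetical pool contents: replicant name => replicant state.
my %replicants = (
  'dbi:replica1' => { lag => 2, replicating => 1 },
  'dbi:replica2' => { lag => 7, replicating => 1 },
);

my @all = values %replicants;

# The worst (highest) lag across every replicant.
my $worst_lag = max map { $_->{lag} } @all;

# True only when every replicant reports that it is replicating.
my $all_replicating = (grep { $_->{replicating} } @all) == @all;

print "worst lag: $worst_lag\n";
```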
930
931=cut
932
933sub lag_behind_master {
934 my $self = shift;
935
  return max map $_->lag_behind_master, values %{ $self->replicants };
fd323bf1 937}
3244fdcc 938
939=head2 is_replicating
940
941returns true if all replicants return true for
942L<DBIx::Class::Storage::DBI/is_replicating>
943
944=cut
945
946sub is_replicating {
947 my $self = shift;
948
  my @replicants = values %{ $self->replicants };
  return (grep $_->is_replicating, @replicants) == @replicants;
950}
951
952=head2 connect_call_datetime_setup
953
954calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
955
956=cut
957
958sub connect_call_datetime_setup {
959 my $self = shift;
960 $_->connect_call_datetime_setup for $self->all_storages;
961}
962
963sub _populate_dbh {
964 my $self = shift;
965 $_->_populate_dbh for $self->all_storages;
966}
967
968sub _connect {
969 my $self = shift;
970 $_->_connect for $self->all_storages;
971}
972
973sub _rebless {
974 my $self = shift;
975 $_->_rebless for $self->all_storages;
976}
977
978sub _determine_driver {
979 my $self = shift;
980 $_->_determine_driver for $self->all_storages;
981}
982
983sub _driver_determined {
984 my $self = shift;
fd323bf1 985
3244fdcc 986 if (@_) {
987 $_->_driver_determined(@_) for $self->all_storages;
988 }
989
990 return $self->master->_driver_determined;
991}
992
993sub _init {
994 my $self = shift;
fd323bf1 995
3244fdcc 996 $_->_init for $self->all_storages;
997}
998
999sub _run_connection_actions {
1000 my $self = shift;
fd323bf1 1001
3244fdcc 1002 $_->_run_connection_actions for $self->all_storages;
1003}
1004
1005sub _do_connection_actions {
1006 my $self = shift;
fd323bf1 1007
3244fdcc 1008 if (@_) {
1009 $_->_do_connection_actions(@_) for $self->all_storages;
1010 }
1011}
1012
1013sub connect_call_do_sql {
1014 my $self = shift;
1015 $_->connect_call_do_sql(@_) for $self->all_storages;
1016}
1017
1018sub disconnect_call_do_sql {
1019 my $self = shift;
1020 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1021}
1022
1023sub _seems_connected {
1024 my $self = shift;
1025
1026 return min map $_->_seems_connected, $self->all_storages;
1027}
1028
1029sub _ping {
1030 my $self = shift;
1031
1032 return min map $_->_ping, $self->all_storages;
1033}
1034
bbdda281 1035# not using the normalized_version, because we want to preserve
1036# version numbers much longer than the conventional xxx.yyyzzz
7da56142 1037my $numify_ver = sub {
1038 my $ver = shift;
1039 my @numparts = split /\D+/, $ver;
bbdda281 1040 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1041
1042 return sprintf $format, @numparts;
1043};
fecb38cb 1044sub _server_info {
1045 my $self = shift;
1046
bbdda281 1047 if (not $self->_dbh_details->{info}) {
1048 $self->_dbh_details->{info} = (
fd323bf1 1049 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1050 map [ $numify_ver->($_->{dbms_version}), $_ ],
1051 map $_->_server_info, $self->all_storages
1052 )->[1];
fecb38cb 1053 }
1054
bbdda281 1055 return $self->next::method;
fecb38cb 1056}
1057
1058sub _get_server_version {
1059 my $self = shift;
1060
1061 return $self->_server_info->{dbms_version};
1062}
1063
7e38d850 1064=head1 GOTCHAS
1065
1066Due to the fact that replicants can lag behind a master, you must take care to
1067make sure you use one of the methods to force read queries to a master should
1068you need realtime data integrity. For example, if you insert a row, and then
1069immediately re-read it from the database (say, by doing $row->discard_changes)
1070or you insert a row and then immediately build a query that expects that row
1071to be an item, you should force the master to handle reads. Otherwise, due to
1072the lag, there is no certainty your data will be in the expected state.
1073
1074For data integrity, all transactions automatically use the master storage for
1075all read and write queries. Using a transaction is the preferred and recommended
1076method to force the master to handle all read queries.
1077
1078Otherwise, you can force a single query to use the master with the 'force_pool'
1079attribute:
1080
1081 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1082
This attribute will safely be ignored by non-replicated storages, so you can use
1084the same code for both types of systems.
1085
1086Lastly, you can use the L</execute_reliably> method, which works very much like
1087a transaction.
1088
1089For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1090and L</set_balanced_storage>, however this operates at a global level and is not
1091suitable if you have a shared Schema object being used by multiple processes,
1092such as on a web application server. You can get around this limitation by
1093using the Schema clone method.
1094
1095 my $new_schema = $schema->clone;
  $new_schema->storage->set_reliable_storage;
d4daee7b 1097
7e38d850 1098 ## $new_schema will use only the Master storage for all reads/writes while
1099 ## the $schema object will use replicated storage.
1100
f5d3a5de 1101=head1 AUTHOR
1102
64cdad22 1103 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1104
c4d3fae2 1105Based on code originated by:
f5d3a5de 1106
64cdad22 1107 Norbert Csongrádi <bert@cpan.org>
1108 Peter Siklósi <einon@einon.hu>
2156bbdd 1109
f5d3a5de 1110=head1 LICENSE
1111
1112You may distribute this code under the same terms as Perl itself.
1113
1114=cut
1115
c354902c 1116__PACKAGE__->meta->make_immutable;
1117
f5d3a5de 11181;