use namespace::clean w/ Try::Tiny
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
a34b0c89 5 use DBIx::Class;
6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 8}
9
b2e4d522 10use Moose;
26ab719a 11use DBIx::Class::Storage::DBI;
2bf79155 12use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 13use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 14use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 15use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 16use Scalar::Util 'reftype';
e666c5fd 17use Hash::Merge;
7da56142 18use List::Util qw/min max reduce/;
ed7ab0f4 19use Try::Tiny;
fd323bf1 20use namespace::clean;
9901aad7 21
22use namespace::clean -except => 'meta';
2bf79155 23
24=head1 NAME
25
ecb65397 26DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 27
28=head1 SYNOPSIS
29
30The Following example shows how to change an existing $schema to a replicated
48580715 31storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 32tasks.
2bf79155 33
3da4f736 34You should set the 'storage_type attribute to a replicated type. You should
d4daee7b 35also define your arguments, such as which balancer you want and any arguments
3da4f736 36that the Pool object should get.
37
ce854fd3 38 my $schema = Schema::Class->clone;
64cdad22 39 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 40 $schema->connection(...);
d4daee7b 41
fd323bf1 42Next, you need to add in the Replicants. Basically this is an array of
3da4f736 43arrayrefs, where each arrayref is database connect information. Think of these
44arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 45
64cdad22 46 $schema->storage->connect_replicants(
47 [$dsn1, $user, $pass, \%opts],
48 [$dsn2, $user, $pass, \%opts],
49 [$dsn3, $user, $pass, \%opts],
50 );
d4daee7b 51
3da4f736 52Now, just use the $schema as you normally would. Automatically all reads will
53be delegated to the replicants, while writes to the master.
54
7e38d850 55 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 56
3da4f736 57You can force a given query to use a particular storage using the search
58attribute 'force_pool'. For example:
d4daee7b 59
7e38d850 60 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 61
62Now $RS will force everything (both reads and writes) to use whatever was setup
fd323bf1 63as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 64but you can also use any Replicant name. Please see:
212cc5c2 65L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 66
67Also see transactions and L</execute_reliably> for alternative ways to
68force read traffic to the master. In general, you should wrap your statements
69in a transaction when you are reading and writing to the same tables at the
70same time, since your replicants will often lag a bit behind the master.
212cc5c2 71
72See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
73walkthroughs.
d4daee7b 74
2bf79155 75=head1 DESCRIPTION
76
7e38d850 77Warning: This class is marked BETA. This has been running a production
ccb3b897 78website using MySQL native replication as its backend and we have some decent
7e38d850 79test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 80Individual DBs may have quirks we are not aware of. Please use this in first
7e38d850 81development and pass along your experiences/bug fixes.
2bf79155 82
83This class implements replicated data store for DBI. Currently you can define
84one master and numerous slave database connections. All write-type queries
85(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
86database, all read-type queries (SELECTs) go to the slave database.
87
88Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 89handle gets delegated to one of the two attributes: L</read_handler> or to
90L</write_handler>. Additionally, some methods need to be distributed
2bf79155 91to all existing storages. This way our storage class is a drop in replacement
92for L<DBIx::Class::Storage::DBI>.
93
48580715 94Read traffic is spread across the replicants (slaves) occurring to a user
2bf79155 95selected algorithm. The default algorithm is random weighted.
96
bca099a3 97=head1 NOTES
98
48580715 99The consistency between master and replicants is database specific. The Pool
faaba25f 100gives you a method to validate its replicants, removing and replacing them
7e38d850 101when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 102to force a query to run against Master when needed.
103
104=head1 REQUIREMENTS
105
a34b0c89 106Replicated Storage has additional requirements not currently part of
aa8b2277 107L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 108
109=head1 ATTRIBUTES
110
111This class defines the following attributes.
112
2ce6e9a6 113=head2 schema
114
115The underlying L<DBIx::Class::Schema> object this storage is attaching
116
117=cut
118
119has 'schema' => (
120 is=>'rw',
6a151f58 121 isa=>DBICSchema,
2ce6e9a6 122 weak_ref=>1,
123 required=>1,
124);
125
26ab719a 126=head2 pool_type
2bf79155 127
fd323bf1 128Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 129to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 130
131=cut
132
26ab719a 133has 'pool_type' => (
dcdf7b2c 134 is=>'rw',
41916570 135 isa=>ClassName,
2ce6e9a6 136 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 137 handles=>{
138 'create_pool' => 'new',
139 },
2bf79155 140);
141
f068a139 142=head2 pool_args
143
144Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 145See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 146
147=cut
148
149has 'pool_args' => (
dcdf7b2c 150 is=>'rw',
41916570 151 isa=>HashRef,
64cdad22 152 lazy=>1,
64cdad22 153 default=>sub { {} },
f068a139 154);
155
156
26ab719a 157=head2 balancer_type
2bf79155 158
159The replication pool requires a balance class to provider the methods for
160choose how to spread the query load across each replicant in the pool.
161
162=cut
163
26ab719a 164has 'balancer_type' => (
dcdf7b2c 165 is=>'rw',
9901aad7 166 isa=>BalancerClassNamePart,
2ce6e9a6 167 coerce=>1,
168 required=>1,
169 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 170 handles=>{
171 'create_balancer' => 'new',
172 },
2bf79155 173);
174
17b05c13 175=head2 balancer_args
176
177Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 178See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 179
180=cut
181
182has 'balancer_args' => (
dcdf7b2c 183 is=>'rw',
41916570 184 isa=>HashRef,
64cdad22 185 lazy=>1,
186 required=>1,
187 default=>sub { {} },
17b05c13 188);
189
26ab719a 190=head2 pool
2bf79155 191
26ab719a 192Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
193container class for one or more replicated databases.
2bf79155 194
195=cut
196
26ab719a 197has 'pool' => (
64cdad22 198 is=>'ro',
199 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
200 lazy_build=>1,
201 handles=>[qw/
6f7344b8 202 connect_replicants
64cdad22 203 replicants
204 has_replicants
205 /],
2bf79155 206);
207
26ab719a 208=head2 balancer
2bf79155 209
fd323bf1 210Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
26ab719a 211is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 212
26ab719a 213=cut
2bf79155 214
26ab719a 215has 'balancer' => (
dcdf7b2c 216 is=>'rw',
64cdad22 217 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
218 lazy_build=>1,
219 handles=>[qw/auto_validate_every/],
26ab719a 220);
2bf79155 221
cb6ec758 222=head2 master
223
224The master defines the canonical state for a pool of connected databases. All
225the replicants are expected to match this databases state. Thus, in a classic
226Master / Slaves distributed system, all the slaves are expected to replicate
227the Master's state as quick as possible. This is the only database in the
228pool of databases that is allowed to handle write traffic.
229
230=cut
231
232has 'master' => (
64cdad22 233 is=> 'ro',
6a151f58 234 isa=>DBICStorageDBI,
64cdad22 235 lazy_build=>1,
cb6ec758 236);
237
cb6ec758 238=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
239
fd323bf1 240The following methods are delegated all the methods required for the
cb6ec758 241L<DBIx::Class::Storage::DBI> interface.
242
243=head2 read_handler
244
245Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
246
247=cut
248
249has 'read_handler' => (
64cdad22 250 is=>'rw',
41916570 251 isa=>Object,
64cdad22 252 lazy_build=>1,
253 handles=>[qw/
254 select
255 select_single
256 columns_info_for
fd323bf1 257 _dbh_columns_info_for
3244fdcc 258 _select
6f7344b8 259 /],
cb6ec758 260);
261
cb6ec758 262=head2 write_handler
263
3244fdcc 264Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
265as well as methods that don't write or read that can be called on only one
266storage, methods that return a C<$dbh>, and any methods that don't make sense to
267run on a replicant.
cb6ec758 268
269=cut
270
271has 'write_handler' => (
64cdad22 272 is=>'ro',
41916570 273 isa=>Object,
64cdad22 274 lazy_build=>1,
6f7344b8 275 handles=>[qw/
64cdad22 276 on_connect_do
6f7344b8 277 on_disconnect_do
3244fdcc 278 on_connect_call
279 on_disconnect_call
64cdad22 280 connect_info
3244fdcc 281 _connect_info
64cdad22 282 throw_exception
283 sql_maker
284 sqlt_type
285 create_ddl_dir
286 deployment_statements
287 datetime_parser
6f7344b8 288 datetime_parser_type
289 build_datetime_parser
64cdad22 290 last_insert_id
291 insert
292 insert_bulk
293 update
294 delete
295 dbh
2ce6e9a6 296 txn_begin
64cdad22 297 txn_do
298 txn_commit
299 txn_rollback
2ce6e9a6 300 txn_scope_guard
64cdad22 301 sth
302 deploy
0180bef9 303 with_deferred_fk_checks
6f7344b8 304 dbh_do
64cdad22 305 reload_row
6f7344b8 306 with_deferred_fk_checks
2ce6e9a6 307 _prep_for_execute
7fb60fb1 308
6f7344b8 309 backup
310 is_datatype_numeric
227d8366 311 _supports_insert_returning
6f7344b8 312 _count_select
6f7344b8 313 _subq_update_delete
314 svp_rollback
315 svp_begin
316 svp_release
e398f77e 317 relname_to_table_alias
318 _straight_join_to_node
3244fdcc 319 _dbh_last_insert_id
320 _fix_bind_params
321 _default_dbi_connect_attributes
322 _dbi_connect_info
323 auto_savepoint
324 _sqlt_version_ok
325 _query_end
326 bind_attribute_by_data_type
327 transaction_depth
328 _dbh
329 _select_args
330 _dbh_execute_array
331 _sql_maker_args
332 _sql_maker
333 _query_start
334 _sqlt_version_error
335 _per_row_update_delete
336 _dbh_begin_work
337 _dbh_execute_inserts_with_no_binds
338 _select_args_to_query
339 _svp_generate_name
340 _multipk_update_delete
341 source_bind_attributes
342 _normalize_connect_info
343 _parse_connect_do
344 _dbh_commit
345 _execute_array
346 _placeholders_supported
3244fdcc 347 savepoints
348 _sqlt_minimum_version
349 _sql_maker_opts
350 _conn_pid
351 _typeless_placeholders_supported
352 _conn_tid
353 _dbh_autocommit
354 _native_data_type
355 _get_dbh
356 sql_maker_class
357 _dbh_rollback
358 _adjust_select_args_for_complex_prefetch
359 _resolve_ident_sources
360 _resolve_column_info
361 _prune_unused_joins
362 _strip_cond_qualifiers
363 _parse_order_by
364 _resolve_aliastypes_from_select_args
365 _execute
366 _do_query
367 _dbh_sth
368 _dbh_execute
6ed1cd2e 369 _prefetch_insert_auto_nextvals
6d766626 370 _server_info_hash
64cdad22 371 /],
cb6ec758 372);
373
e471ab87 374my @unimplemented = qw(
375 _arm_global_destructor
376 _preserve_foreign_dbh
572338e0 377 _verify_pid
378 _verify_tid
e471ab87 379);
380
381for my $method (@unimplemented) {
382 __PACKAGE__->meta->add_method($method, sub {
383 croak "$method must not be called on ".(blessed shift).' objects';
384 });
385}
6d766626 386
b2e4d522 387has _master_connect_info_opts =>
388 (is => 'rw', isa => HashRef, default => sub { {} });
389
390=head2 around: connect_info
391
48580715 392Preserves master's C<connect_info> options (for merging with replicants.)
393Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 394C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
b2e4d522 395
396=cut
397
398around connect_info => sub {
399 my ($next, $self, $info, @extra) = @_;
400
0ce2d0d5 401 my $wantarray = wantarray;
402
282a9a4f 403 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 404
b2e4d522 405 my %opts;
406 for my $arg (@$info) {
407 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 408 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 409 }
b2e4d522 410 delete $opts{dsn};
411
dcdf7b2c 412 if (@opts{qw/pool_type pool_args/}) {
413 $self->pool_type(delete $opts{pool_type})
414 if $opts{pool_type};
415
b88b85e7 416 $self->pool_args(
e666c5fd 417 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 418 );
dcdf7b2c 419
67c43863 420 $self->pool($self->_build_pool)
6f7344b8 421 if $self->pool;
dcdf7b2c 422 }
423
424 if (@opts{qw/balancer_type balancer_args/}) {
425 $self->balancer_type(delete $opts{balancer_type})
426 if $opts{balancer_type};
427
b88b85e7 428 $self->balancer_args(
e666c5fd 429 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 430 );
dcdf7b2c 431
67c43863 432 $self->balancer($self->_build_balancer)
6f7344b8 433 if $self->balancer;
dcdf7b2c 434 }
435
b2e4d522 436 $self->_master_connect_info_opts(\%opts);
437
0ce2d0d5 438 my (@res, $res);
439 if ($wantarray) {
440 @res = $self->$next($info, @extra);
441 } else {
442 $res = $self->$next($info, @extra);
443 }
444
fd4eb9c2 445 # Make sure master is blessed into the correct class and apply role to it.
446 my $master = $self->master;
447 $master->_determine_driver;
448 Moose::Meta::Class->initialize(ref $master);
cea43436 449
ec0946db 450 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 451
452 # link pool back to master
453 $self->pool->master($master);
0ce2d0d5 454
455 $wantarray ? @res : $res;
b2e4d522 456};
457
26ab719a 458=head1 METHODS
2bf79155 459
26ab719a 460This class defines the following methods.
2bf79155 461
c354902c 462=head2 BUILDARGS
2bf79155 463
faaba25f 464L<DBIx::Class::Schema> when instantiating its storage passed itself as the
2ce6e9a6 465first argument. So we need to massage the arguments a bit so that all the
466bits get put into the correct places.
2bf79155 467
468=cut
469
c354902c 470sub BUILDARGS {
fd323bf1 471 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 472
c354902c 473 return {
6f7344b8 474 schema=>$schema,
475 %$storage_type_args,
476 @args
c354902c 477 }
478}
2bf79155 479
cb6ec758 480=head2 _build_master
2bf79155 481
cb6ec758 482Lazy builder for the L</master> attribute.
2bf79155 483
484=cut
485
cb6ec758 486sub _build_master {
2ce6e9a6 487 my $self = shift @_;
ee356d00 488 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 489 $master
106d5f3b 490}
491
26ab719a 492=head2 _build_pool
2bf79155 493
26ab719a 494Lazy builder for the L</pool> attribute.
2bf79155 495
496=cut
497
26ab719a 498sub _build_pool {
64cdad22 499 my $self = shift @_;
500 $self->create_pool(%{$self->pool_args});
2bf79155 501}
502
26ab719a 503=head2 _build_balancer
2bf79155 504
cb6ec758 505Lazy builder for the L</balancer> attribute. This takes a Pool object so that
506the balancer knows which pool it's balancing.
2bf79155 507
508=cut
509
26ab719a 510sub _build_balancer {
64cdad22 511 my $self = shift @_;
512 $self->create_balancer(
6f7344b8 513 pool=>$self->pool,
64cdad22 514 master=>$self->master,
515 %{$self->balancer_args},
516 );
2bf79155 517}
518
cb6ec758 519=head2 _build_write_handler
2bf79155 520
cb6ec758 521Lazy builder for the L</write_handler> attribute. The default is to set this to
522the L</master>.
50336325 523
524=cut
525
cb6ec758 526sub _build_write_handler {
64cdad22 527 return shift->master;
cb6ec758 528}
50336325 529
cb6ec758 530=head2 _build_read_handler
2bf79155 531
cb6ec758 532Lazy builder for the L</read_handler> attribute. The default is to set this to
533the L</balancer>.
2bf79155 534
535=cut
536
cb6ec758 537sub _build_read_handler {
64cdad22 538 return shift->balancer;
cb6ec758 539}
50336325 540
cb6ec758 541=head2 around: connect_replicants
2bf79155 542
cb6ec758 543All calls to connect_replicants needs to have an existing $schema tacked onto
b2e4d522 544top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
545options merged with the master, with replicant opts having higher priority.
955a6df6 546
cb6ec758 547=cut
955a6df6 548
b2e4d522 549around connect_replicants => sub {
550 my ($next, $self, @args) = @_;
551
552 for my $r (@args) {
553 $r = [ $r ] unless reftype $r eq 'ARRAY';
554
1a58752c 555 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 556 if ref $r->[0] && reftype $r->[0] eq 'CODE';
557
558# any connect_info options?
559 my $i = 0;
560 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
561
6f7344b8 562# make one if none
b2e4d522 563 $r->[$i] = {} unless $r->[$i];
564
565# merge if two hashes
b88b85e7 566 my @hashes = @$r[$i .. $#{$r}];
567
1a58752c 568 $self->throw_exception('invalid connect_info options')
b88b85e7 569 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
570
1a58752c 571 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 572 if @hashes > 2;
573
282a9a4f 574 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 575 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 576
577# delete them
b2e4d522 578 splice @$r, $i+1, ($#{$r} - $i), ();
579
0bd8e058 580# make sure master/replicants opts don't clash
581 my %master_opts = %{ $self->_master_connect_info_opts };
582 if (exists $opts{dbh_maker}) {
583 delete @master_opts{qw/dsn user password/};
584 }
585 delete $master_opts{dbh_maker};
586
b2e4d522 587# merge with master
e666c5fd 588 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 589
590# update
591 $r->[$i] = \%opts;
592 }
593
594 $self->$next($self->schema, @args);
955a6df6 595};
2bf79155 596
2bf79155 597=head2 all_storages
598
599Returns an array of of all the connected storage backends. The first element
600in the returned array is the master, and the remainings are each of the
601replicants.
602
603=cut
604
605sub all_storages {
64cdad22 606 my $self = shift @_;
607 return grep {defined $_ && blessed $_} (
608 $self->master,
6412a592 609 values %{ $self->replicants },
64cdad22 610 );
2bf79155 611}
612
c4d3fae2 613=head2 execute_reliably ($coderef, ?@args)
614
615Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 616use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 617restores the original state.
618
619Example:
620
64cdad22 621 my $reliably = sub {
622 my $name = shift @_;
623 $schema->resultset('User')->create({name=>$name});
fd323bf1 624 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 625 return $user_rs;
626 };
c4d3fae2 627
64cdad22 628 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 629
630Use this when you must be certain of your database state, such as when you just
631inserted something and need to get a resultset including it, etc.
632
633=cut
634
635sub execute_reliably {
64cdad22 636 my ($self, $coderef, @args) = @_;
d4daee7b 637
64cdad22 638 unless( ref $coderef eq 'CODE') {
639 $self->throw_exception('Second argument must be a coderef');
640 }
d4daee7b 641
64cdad22 642 ##Get copy of master storage
643 my $master = $self->master;
d4daee7b 644
64cdad22 645 ##Get whatever the current read hander is
646 my $current = $self->read_handler;
d4daee7b 647
64cdad22 648 ##Set the read handler to master
649 $self->read_handler($master);
d4daee7b 650
64cdad22 651 ## do whatever the caller needs
652 my @result;
653 my $want_array = wantarray;
d4daee7b 654
ed7ab0f4 655 try {
64cdad22 656 if($want_array) {
657 @result = $coderef->(@args);
658 } elsif(defined $want_array) {
659 ($result[0]) = ($coderef->(@args));
ed213e85 660 } else {
64cdad22 661 $coderef->(@args);
6f7344b8 662 }
ed7ab0f4 663 } catch {
664 $self->throw_exception("coderef returned an error: $_");
665 } finally {
666 ##Reset to the original state
667 $self->read_handler($current);
64cdad22 668 };
d4daee7b 669
ed7ab0f4 670 return $want_array ? @result : $result[0];
c4d3fae2 671}
672
cb6ec758 673=head2 set_reliable_storage
674
675Sets the current $schema to be 'reliable', that is all queries, both read and
676write are sent to the master
d4daee7b 677
cb6ec758 678=cut
679
680sub set_reliable_storage {
64cdad22 681 my $self = shift @_;
682 my $schema = $self->schema;
683 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 684
64cdad22 685 $schema->storage->read_handler($write_handler);
cb6ec758 686}
687
688=head2 set_balanced_storage
689
690Sets the current $schema to be use the </balancer> for all reads, while all
48580715 691writes are sent to the master only
d4daee7b 692
cb6ec758 693=cut
694
695sub set_balanced_storage {
64cdad22 696 my $self = shift @_;
697 my $schema = $self->schema;
bd5da369 698 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 699
bd5da369 700 $schema->storage->read_handler($balanced_handler);
cb6ec758 701}
2bf79155 702
703=head2 connected
704
705Check that the master and at least one of the replicants is connected.
706
707=cut
708
709sub connected {
64cdad22 710 my $self = shift @_;
711 return
712 $self->master->connected &&
713 $self->pool->connected_replicants;
2bf79155 714}
715
2bf79155 716=head2 ensure_connected
717
718Make sure all the storages are connected.
719
720=cut
721
722sub ensure_connected {
64cdad22 723 my $self = shift @_;
724 foreach my $source ($self->all_storages) {
725 $source->ensure_connected(@_);
726 }
2bf79155 727}
728
2bf79155 729=head2 limit_dialect
730
731Set the limit_dialect for all existing storages
732
733=cut
734
735sub limit_dialect {
64cdad22 736 my $self = shift @_;
737 foreach my $source ($self->all_storages) {
738 $source->limit_dialect(@_);
739 }
3fbe08e3 740 return $self->master->quote_char;
2bf79155 741}
742
2bf79155 743=head2 quote_char
744
745Set the quote_char for all existing storages
746
747=cut
748
749sub quote_char {
64cdad22 750 my $self = shift @_;
751 foreach my $source ($self->all_storages) {
752 $source->quote_char(@_);
753 }
3fbe08e3 754 return $self->master->quote_char;
2bf79155 755}
756
2bf79155 757=head2 name_sep
758
759Set the name_sep for all existing storages
760
761=cut
762
763sub name_sep {
64cdad22 764 my $self = shift @_;
765 foreach my $source ($self->all_storages) {
766 $source->name_sep(@_);
767 }
3fbe08e3 768 return $self->master->name_sep;
2bf79155 769}
770
2bf79155 771=head2 set_schema
772
773Set the schema object for all existing storages
774
775=cut
776
777sub set_schema {
64cdad22 778 my $self = shift @_;
779 foreach my $source ($self->all_storages) {
780 $source->set_schema(@_);
781 }
2bf79155 782}
783
2bf79155 784=head2 debug
785
786set a debug flag across all storages
787
788=cut
789
790sub debug {
64cdad22 791 my $self = shift @_;
3fbe08e3 792 if(@_) {
793 foreach my $source ($self->all_storages) {
794 $source->debug(@_);
6f7344b8 795 }
64cdad22 796 }
3fbe08e3 797 return $self->master->debug;
2bf79155 798}
799
2bf79155 800=head2 debugobj
801
cea43436 802set a debug object
2bf79155 803
804=cut
805
806sub debugobj {
64cdad22 807 my $self = shift @_;
cea43436 808 return $self->master->debugobj(@_);
2bf79155 809}
810
2bf79155 811=head2 debugfh
812
cea43436 813set a debugfh object
2bf79155 814
815=cut
816
817sub debugfh {
64cdad22 818 my $self = shift @_;
cea43436 819 return $self->master->debugfh(@_);
2bf79155 820}
821
2bf79155 822=head2 debugcb
823
cea43436 824set a debug callback
2bf79155 825
826=cut
827
828sub debugcb {
64cdad22 829 my $self = shift @_;
cea43436 830 return $self->master->debugcb(@_);
2bf79155 831}
832
2bf79155 833=head2 disconnect
834
835disconnect everything
836
837=cut
838
839sub disconnect {
64cdad22 840 my $self = shift @_;
841 foreach my $source ($self->all_storages) {
842 $source->disconnect(@_);
843 }
2bf79155 844}
845
b2e4d522 846=head2 cursor_class
847
848set cursor class on all storages, or return master's
849
850=cut
851
852sub cursor_class {
853 my ($self, $cursor_class) = @_;
854
855 if ($cursor_class) {
856 $_->cursor_class($cursor_class) for $self->all_storages;
857 }
858 $self->master->cursor_class;
859}
d4daee7b 860
3244fdcc 861=head2 cursor
862
863set cursor class on all storages, or return master's, alias for L</cursor_class>
864above.
865
866=cut
867
868sub cursor {
869 my ($self, $cursor_class) = @_;
870
871 if ($cursor_class) {
872 $_->cursor($cursor_class) for $self->all_storages;
873 }
874 $self->master->cursor;
875}
876
877=head2 unsafe
878
879sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
880master's current setting
881
882=cut
883
884sub unsafe {
885 my $self = shift;
886
887 if (@_) {
888 $_->unsafe(@_) for $self->all_storages;
889 }
890
891 return $self->master->unsafe;
892}
893
894=head2 disable_sth_caching
895
896sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
897or returns master's current setting
898
899=cut
900
901sub disable_sth_caching {
902 my $self = shift;
903
904 if (@_) {
905 $_->disable_sth_caching(@_) for $self->all_storages;
906 }
907
908 return $self->master->disable_sth_caching;
909}
910
911=head2 lag_behind_master
912
913returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
914setting
915
916=cut
917
918sub lag_behind_master {
919 my $self = shift;
920
921 return max map $_->lag_behind_master, $self->replicants;
fd323bf1 922}
3244fdcc 923
924=head2 is_replicating
925
926returns true if all replicants return true for
927L<DBIx::Class::Storage::DBI/is_replicating>
928
929=cut
930
931sub is_replicating {
932 my $self = shift;
933
934 return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
935}
936
937=head2 connect_call_datetime_setup
938
939calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
940
941=cut
942
943sub connect_call_datetime_setup {
944 my $self = shift;
945 $_->connect_call_datetime_setup for $self->all_storages;
946}
947
948sub _populate_dbh {
949 my $self = shift;
950 $_->_populate_dbh for $self->all_storages;
951}
952
953sub _connect {
954 my $self = shift;
955 $_->_connect for $self->all_storages;
956}
957
958sub _rebless {
959 my $self = shift;
960 $_->_rebless for $self->all_storages;
961}
962
963sub _determine_driver {
964 my $self = shift;
965 $_->_determine_driver for $self->all_storages;
966}
967
968sub _driver_determined {
969 my $self = shift;
fd323bf1 970
3244fdcc 971 if (@_) {
972 $_->_driver_determined(@_) for $self->all_storages;
973 }
974
975 return $self->master->_driver_determined;
976}
977
978sub _init {
979 my $self = shift;
fd323bf1 980
3244fdcc 981 $_->_init for $self->all_storages;
982}
983
984sub _run_connection_actions {
985 my $self = shift;
fd323bf1 986
3244fdcc 987 $_->_run_connection_actions for $self->all_storages;
988}
989
990sub _do_connection_actions {
991 my $self = shift;
fd323bf1 992
3244fdcc 993 if (@_) {
994 $_->_do_connection_actions(@_) for $self->all_storages;
995 }
996}
997
998sub connect_call_do_sql {
999 my $self = shift;
1000 $_->connect_call_do_sql(@_) for $self->all_storages;
1001}
1002
1003sub disconnect_call_do_sql {
1004 my $self = shift;
1005 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1006}
1007
1008sub _seems_connected {
1009 my $self = shift;
1010
1011 return min map $_->_seems_connected, $self->all_storages;
1012}
1013
1014sub _ping {
1015 my $self = shift;
1016
1017 return min map $_->_ping, $self->all_storages;
1018}
1019
7da56142 1020my $numify_ver = sub {
1021 my $ver = shift;
1022 my @numparts = split /\D+/, $ver;
1023 my $format = '%d.' . (join '', ('%05d') x (@numparts - 1));
1024
1025 return sprintf $format, @numparts;
1026};
1027
fecb38cb 1028sub _server_info {
1029 my $self = shift;
1030
1031 if (not $self->_server_info_hash) {
7da56142 1032 my $min_version_info = (
fd323bf1 1033 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1034 map [ $numify_ver->($_->{dbms_version}), $_ ],
1035 map $_->_server_info, $self->all_storages
1036 )->[1];
fecb38cb 1037
1038 $self->_server_info_hash($min_version_info); # on master
1039 }
1040
1041 return $self->_server_info_hash;
1042}
1043
1044sub _get_server_version {
1045 my $self = shift;
1046
1047 return $self->_server_info->{dbms_version};
1048}
1049
7e38d850 1050=head1 GOTCHAS
1051
1052Due to the fact that replicants can lag behind a master, you must take care to
1053make sure you use one of the methods to force read queries to a master should
1054you need realtime data integrity. For example, if you insert a row, and then
1055immediately re-read it from the database (say, by doing $row->discard_changes)
1056or you insert a row and then immediately build a query that expects that row
1057to be an item, you should force the master to handle reads. Otherwise, due to
1058the lag, there is no certainty your data will be in the expected state.
1059
1060For data integrity, all transactions automatically use the master storage for
1061all read and write queries. Using a transaction is the preferred and recommended
1062method to force the master to handle all read queries.
1063
1064Otherwise, you can force a single query to use the master with the 'force_pool'
1065attribute:
1066
1067 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1068
1069This attribute will safely be ignore by non replicated storages, so you can use
1070the same code for both types of systems.
1071
1072Lastly, you can use the L</execute_reliably> method, which works very much like
1073a transaction.
1074
1075For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1076and L</set_balanced_storage>, however this operates at a global level and is not
1077suitable if you have a shared Schema object being used by multiple processes,
1078such as on a web application server. You can get around this limitation by
1079using the Schema clone method.
1080
1081 my $new_schema = $schema->clone;
1082 $new_schema->set_reliable_storage;
d4daee7b 1083
7e38d850 1084 ## $new_schema will use only the Master storage for all reads/writes while
1085 ## the $schema object will use replicated storage.
1086
f5d3a5de 1087=head1 AUTHOR
1088
64cdad22 1089 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1090
c4d3fae2 1091Based on code originated by:
f5d3a5de 1092
64cdad22 1093 Norbert Csongrádi <bert@cpan.org>
1094 Peter Siklósi <einon@einon.hu>
2156bbdd 1095
f5d3a5de 1096=head1 LICENSE
1097
1098You may distribute this code under the same terms as Perl itself.
1099
1100=cut
1101
c354902c 1102__PACKAGE__->meta->make_immutable;
1103
f5d3a5de 11041;