CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
a34b0c89 5 use DBIx::Class;
6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 8}
9
b2e4d522 10use Moose;
26ab719a 11use DBIx::Class::Storage::DBI;
2bf79155 12use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 13use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 14use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 15use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 16use Scalar::Util 'reftype';
e666c5fd 17use Hash::Merge;
7da56142 18use List::Util qw/min max reduce/;
ed7ab0f4 19use Try::Tiny;
9901aad7 20
21use namespace::clean -except => 'meta';
2bf79155 22
23=head1 NAME
24
ecb65397 25DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 26
27=head1 SYNOPSIS
28
29The following example shows how to change an existing $schema to a replicated
48580715 30storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 31tasks.
2bf79155 32
3da4f736 33You should set the 'storage_type' attribute to a replicated type. You should
d4daee7b 34also define your arguments, such as which balancer you want and any arguments
3da4f736 35that the Pool object should get.
36
ce854fd3 37 my $schema = Schema::Class->clone;
64cdad22 38 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 39 $schema->connection(...);
d4daee7b 40
3da4f736 41Next, you need to add in the Replicants. Basically this is an array of
42arrayrefs, where each arrayref is database connect information. Think of these
43arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 44
64cdad22 45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
49 );
d4daee7b 50
3da4f736 51Now, just use the $schema as you normally would. All reads are automatically
52delegated to the replicants, while writes go to the master.
53
7e38d850 54 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 55
3da4f736 56You can force a given query to use a particular storage using the search
57attribute 'force_pool'. For example:
d4daee7b 58
7e38d850 59 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 60
61Now $RS will force everything (both reads and writes) to use whatever was set up
62as the master storage. 'master' is hardcoded to always point to the Master,
63but you can also use any Replicant name. Please see:
212cc5c2 64L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 65
66Also see transactions and L</execute_reliably> for alternative ways to
67force read traffic to the master. In general, you should wrap your statements
68in a transaction when you are reading and writing to the same tables at the
69same time, since your replicants will often lag a bit behind the master.
212cc5c2 70
71See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
72walkthroughs.
d4daee7b 73
2bf79155 74=head1 DESCRIPTION
75
7e38d850 76Warning: This class is marked BETA. This has been running a production
ccb3b897 77website using MySQL native replication as its backend and we have some decent
7e38d850 78test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 79Individual DBs may have quirks we are not aware of. Please use this first in
7e38d850 80development and pass along your experiences/bug fixes.
2bf79155 81
82This class implements a replicated data store for DBI. Currently you can define
83one master and numerous slave database connections. All write-type queries
84(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
85database; all read-type queries (SELECTs) go to the slave databases.
86
87Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 88handle gets delegated to one of the two attributes: L</read_handler> or
89L</write_handler>. Additionally, some methods need to be distributed
2bf79155 90to all existing storages. This way our storage class is a drop in replacement
91for L<DBIx::Class::Storage::DBI>.
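
The delegation described above can be pictured with a small dispatcher. This is
only a sketch in plain Perl, not how the class is implemented (the class itself
relies on Moose C<handles> delegation). The C<Sketch::Router> package, the
shortened list of read methods, and the coderef handlers are all assumptions
made for illustration.

```perl
use strict;
use warnings;

{
    package Sketch::Router;    # hypothetical, for illustration only

    # A shortened, assumed list of read-side methods.
    my %is_read = map { $_ => 1 } qw/select select_single columns_info_for/;

    sub new {
        my ($class, %args) = @_;
        return bless {%args}, $class;
    }

    # Reads go to the read handler, everything else to the write handler.
    sub dispatch {
        my ($self, $method, @args) = @_;
        my $handler = $is_read{$method}
            ? $self->{read_handler}
            : $self->{write_handler};
        return $handler->($method, @args);
    }
}

my $router = Sketch::Router->new(
    read_handler  => sub { "replicant handles $_[0]" },
    write_handler => sub { "master handles $_[0]" },
);

print $router->dispatch('select'), "\n";
print $router->dispatch('insert'), "\n";
```

Keeping the routing decision in one place is what lets calling code stay
unaware of which storage actually serves a request.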
92
48580715 93Read traffic is spread across the replicants (slaves) according to a
2bf79155 94user-selected balancer algorithm. The default is the C<First> balancer (see L</balancer_type>).
95
bca099a3 96=head1 NOTES
97
48580715 98The consistency between master and replicants is database specific. The Pool
faaba25f 99gives you a method to validate its replicants, removing and replacing them
7e38d850 100when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 101to force a query to run against Master when needed.
102
103=head1 REQUIREMENTS
104
a34b0c89 105Replicated Storage has additional requirements not currently part of
aa8b2277 106L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 107
108=head1 ATTRIBUTES
109
110This class defines the following attributes.
111
2ce6e9a6 112=head2 schema
113
114The underlying L<DBIx::Class::Schema> object this storage is attached to.
115
116=cut
117
118has 'schema' => (
119 is=>'rw',
6a151f58 120 isa=>DBICSchema,
2ce6e9a6 121 weak_ref=>1,
122 required=>1,
123);
124
26ab719a 125=head2 pool_type
2bf79155 126
26ab719a 127Contains the classname which will instantiate the L</pool> object. Defaults
128to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 129
130=cut
131
26ab719a 132has 'pool_type' => (
dcdf7b2c 133 is=>'rw',
41916570 134 isa=>ClassName,
2ce6e9a6 135 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 136 handles=>{
137 'create_pool' => 'new',
138 },
2bf79155 139);
140
f068a139 141=head2 pool_args
142
143Contains a hashref of initialized information to pass to the Pool object.
212cc5c2 144See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 145
146=cut
147
148has 'pool_args' => (
dcdf7b2c 149 is=>'rw',
41916570 150 isa=>HashRef,
64cdad22 151 lazy=>1,
64cdad22 152 default=>sub { {} },
f068a139 153);
154
155
26ab719a 156=head2 balancer_type
2bf79155 157
158The replication pool requires a balancer class to provide the methods for
159choosing how to spread the query load across each replicant in the pool.
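
As a rough idea of what such a balancer decides, the sketch below picks one
replicant uniformly at random. It is an illustration in plain Perl, not the code
of any shipped balancer class. The C<%replicants> hash, its string "handles",
and the C<next_storage_random> function are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a pool: replicant name => storage handle.
my %replicants = (
    'dbi:mysql:host=replica1' => 'handle-1',
    'dbi:mysql:host=replica2' => 'handle-2',
    'dbi:mysql:host=replica3' => 'handle-3',
);

# Pick one replicant uniformly at random; returns undef for an empty pool.
sub next_storage_random {
    my ($pool) = @_;
    my @names = sort keys %$pool;
    return undef unless @names;
    return $pool->{ $names[ int rand @names ] };
}

my $picked = next_storage_random(\%replicants);
print "this read goes to $picked\n";
```

A real balancer would also need a fallback (for example, the master) when no
replicant is usable; the sketch simply returns undef in that case.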
160
161=cut
162
26ab719a 163has 'balancer_type' => (
dcdf7b2c 164 is=>'rw',
9901aad7 165 isa=>BalancerClassNamePart,
2ce6e9a6 166 coerce=>1,
167 required=>1,
168 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 169 handles=>{
170 'create_balancer' => 'new',
171 },
2bf79155 172);
173
17b05c13 174=head2 balancer_args
175
176Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 177See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 178
179=cut
180
181has 'balancer_args' => (
dcdf7b2c 182 is=>'rw',
41916570 183 isa=>HashRef,
64cdad22 184 lazy=>1,
185 required=>1,
186 default=>sub { {} },
17b05c13 187);
188
26ab719a 189=head2 pool
2bf79155 190
26ab719a 191Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
192container class for one or more replicated databases.
2bf79155 193
194=cut
195
26ab719a 196has 'pool' => (
64cdad22 197 is=>'ro',
198 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199 lazy_build=>1,
200 handles=>[qw/
6f7344b8 201 connect_replicants
64cdad22 202 replicants
203 has_replicants
204 /],
2bf79155 205);
206
26ab719a 207=head2 balancer
2bf79155 208
26ab719a 209Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
210is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>) and chooses which replicant handles each read.
2bf79155 211
26ab719a 212=cut
2bf79155 213
26ab719a 214has 'balancer' => (
dcdf7b2c 215 is=>'rw',
64cdad22 216 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
217 lazy_build=>1,
218 handles=>[qw/auto_validate_every/],
26ab719a 219);
2bf79155 220
cb6ec758 221=head2 master
222
223The master defines the canonical state for a pool of connected databases. All
224the replicants are expected to match this database's state. Thus, in a classic
225Master / Slaves distributed system, all the slaves are expected to replicate
226the Master's state as quickly as possible. This is the only database in the
227pool of databases that is allowed to handle write traffic.
228
229=cut
230
231has 'master' => (
64cdad22 232 is=> 'ro',
6a151f58 233 isa=>DBICStorageDBI,
64cdad22 234 lazy_build=>1,
cb6ec758 235);
236
cb6ec758 237=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE
238
239The following attributes are delegated all the methods required for the
240L<DBIx::Class::Storage::DBI> interface.
241
242=head2 read_handler
243
244Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
245
246=cut
247
248has 'read_handler' => (
64cdad22 249 is=>'rw',
41916570 250 isa=>Object,
64cdad22 251 lazy_build=>1,
252 handles=>[qw/
253 select
254 select_single
255 columns_info_for
3244fdcc 256 _dbh_columns_info_for
257 _select
6f7344b8 258 /],
cb6ec758 259);
260
cb6ec758 261=head2 write_handler
262
3244fdcc 263Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
264as well as methods that don't write or read that can be called on only one
265storage, methods that return a C<$dbh>, and any methods that don't make sense to
266run on a replicant.
cb6ec758 267
268=cut
269
270has 'write_handler' => (
64cdad22 271 is=>'ro',
41916570 272 isa=>Object,
64cdad22 273 lazy_build=>1,
6f7344b8 274 handles=>[qw/
64cdad22 275 on_connect_do
6f7344b8 276 on_disconnect_do
3244fdcc 277 on_connect_call
278 on_disconnect_call
64cdad22 279 connect_info
3244fdcc 280 _connect_info
64cdad22 281 throw_exception
282 sql_maker
283 sqlt_type
284 create_ddl_dir
285 deployment_statements
286 datetime_parser
6f7344b8 287 datetime_parser_type
288 build_datetime_parser
64cdad22 289 last_insert_id
290 insert
291 insert_bulk
292 update
293 delete
294 dbh
2ce6e9a6 295 txn_begin
64cdad22 296 txn_do
297 txn_commit
298 txn_rollback
2ce6e9a6 299 txn_scope_guard
64cdad22 300 sth
301 deploy
0180bef9 302 with_deferred_fk_checks
6f7344b8 303 dbh_do
64cdad22 304 reload_row
6f7344b8 305 with_deferred_fk_checks
2ce6e9a6 306 _prep_for_execute
7fb60fb1 307
6f7344b8 308 backup
309 is_datatype_numeric
227d8366 310 _supports_insert_returning
6f7344b8 311 _count_select
6f7344b8 312 _subq_update_delete
313 svp_rollback
314 svp_begin
315 svp_release
e398f77e 316 relname_to_table_alias
317 _straight_join_to_node
3244fdcc 318 _dbh_last_insert_id
319 _fix_bind_params
320 _default_dbi_connect_attributes
321 _dbi_connect_info
322 auto_savepoint
323 _sqlt_version_ok
324 _query_end
325 bind_attribute_by_data_type
326 transaction_depth
327 _dbh
328 _select_args
329 _dbh_execute_array
330 _sql_maker_args
331 _sql_maker
332 _query_start
333 _sqlt_version_error
334 _per_row_update_delete
335 _dbh_begin_work
336 _dbh_execute_inserts_with_no_binds
337 _select_args_to_query
338 _svp_generate_name
339 _multipk_update_delete
340 source_bind_attributes
341 _normalize_connect_info
342 _parse_connect_do
343 _dbh_commit
344 _execute_array
345 _placeholders_supported
3244fdcc 346 savepoints
347 _sqlt_minimum_version
348 _sql_maker_opts
349 _conn_pid
350 _typeless_placeholders_supported
351 _conn_tid
352 _dbh_autocommit
353 _native_data_type
354 _get_dbh
355 sql_maker_class
356 _dbh_rollback
357 _adjust_select_args_for_complex_prefetch
358 _resolve_ident_sources
359 _resolve_column_info
360 _prune_unused_joins
361 _strip_cond_qualifiers
362 _parse_order_by
363 _resolve_aliastypes_from_select_args
364 _execute
365 _do_query
366 _dbh_sth
367 _dbh_execute
6ed1cd2e 368 _prefetch_insert_auto_nextvals
6d766626 369 _server_info_hash
64cdad22 370 /],
cb6ec758 371);
372
e471ab87 373my @unimplemented = qw(
374 _arm_global_destructor
375 _preserve_foreign_dbh
572338e0 376 _verify_pid
377 _verify_tid
e471ab87 378);
379
380for my $method (@unimplemented) {
381 __PACKAGE__->meta->add_method($method, sub {
382 croak "$method must not be called on ".(blessed shift).' objects';
383 });
384}
6d766626 385
b2e4d522 386has _master_connect_info_opts =>
387 (is => 'rw', isa => HashRef, default => sub { {} });
388
389=head2 around: connect_info
390
48580715 391Preserves master's C<connect_info> options (for merging with replicants).
392Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 393C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
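
The option handling described above can be sketched as follows. This is a
simplified illustration: the class uses L<Hash::Merge> with C<LEFT_PRECEDENT>,
which merges nested structures, whereas the hypothetical C<merge_left_precedent>
helper below only does a shallow merge. The connect arguments are made up.

```perl
use strict;
use warnings;

# Shallow merge in which keys from $left win over keys from $right.
# (Assumption: a stand-in for a deep, left-precedent merge.)
sub merge_left_precedent {
    my ($left, $right) = @_;
    return { %$right, %$left };
}

# Made-up connect_info: DSN, user, password, then two option hashes.
my @info = (
    'dbi:SQLite:master.db', 'user', 'pass',
    { AutoCommit => 1, balancer_type => '::Random' },
    { AutoCommit => 0, quote_char    => '"' },
);

# Fold every hashref into %opts; later hashes override earlier ones.
my %opts;
for my $arg (@info) {
    next unless ref $arg eq 'HASH';
    %opts = %{ merge_left_precedent($arg, \%opts) };
}

# Replicated-specific options are pulled out; the rest is kept for replicants.
my $balancer_type = delete $opts{balancer_type};

print "balancer: $balancer_type\n";
print "kept for replicants: ", join(', ', sort keys %opts), "\n";
```

Separating the Replicated-specific keys from the rest is what allows the
remaining options to be reused later as defaults for each replicant.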
b2e4d522 394
395=cut
396
397around connect_info => sub {
398 my ($next, $self, $info, @extra) = @_;
399
0ce2d0d5 400 my $wantarray = wantarray;
401
282a9a4f 402 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 403
b2e4d522 404 my %opts;
405 for my $arg (@$info) {
406 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 407 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 408 }
b2e4d522 409 delete $opts{dsn};
410
dcdf7b2c 411 if (grep { $opts{$_} } qw/pool_type pool_args/) {
412 $self->pool_type(delete $opts{pool_type})
413 if $opts{pool_type};
414
b88b85e7 415 $self->pool_args(
e666c5fd 416 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 417 );
dcdf7b2c 418
67c43863 419 $self->pool($self->_build_pool)
6f7344b8 420 if $self->pool;
dcdf7b2c 421 }
422
423 if (grep { $opts{$_} } qw/balancer_type balancer_args/) {
424 $self->balancer_type(delete $opts{balancer_type})
425 if $opts{balancer_type};
426
b88b85e7 427 $self->balancer_args(
e666c5fd 428 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 429 );
dcdf7b2c 430
67c43863 431 $self->balancer($self->_build_balancer)
6f7344b8 432 if $self->balancer;
dcdf7b2c 433 }
434
b2e4d522 435 $self->_master_connect_info_opts(\%opts);
436
0ce2d0d5 437 my (@res, $res);
438 if ($wantarray) {
439 @res = $self->$next($info, @extra);
440 } else {
441 $res = $self->$next($info, @extra);
442 }
443
fd4eb9c2 444 # Make sure master is blessed into the correct class and apply role to it.
445 my $master = $self->master;
446 $master->_determine_driver;
447 Moose::Meta::Class->initialize(ref $master);
cea43436 448
ec0946db 449 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 450
451 # link pool back to master
452 $self->pool->master($master);
0ce2d0d5 453
454 $wantarray ? @res : $res;
b2e4d522 455};
456
26ab719a 457=head1 METHODS
2bf79155 458
26ab719a 459This class defines the following methods.
2bf79155 460
c354902c 461=head2 BUILDARGS
2bf79155 462
faaba25f 463When instantiating its storage, L<DBIx::Class::Schema> passes itself as the
2ce6e9a6 464first argument. So we need to massage the arguments a bit so that all the
465bits get put into the correct places.
2bf79155 466
467=cut
468
c354902c 469sub BUILDARGS {
d7a58a29 470 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 471
c354902c 472 return {
6f7344b8 473 schema=>$schema,
474 %$storage_type_args,
475 @args
c354902c 476 }
477}
2bf79155 478
cb6ec758 479=head2 _build_master
2bf79155 480
cb6ec758 481Lazy builder for the L</master> attribute.
2bf79155 482
483=cut
484
cb6ec758 485sub _build_master {
2ce6e9a6 486 my $self = shift @_;
ee356d00 487 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 488 $master
106d5f3b 489}
490
26ab719a 491=head2 _build_pool
2bf79155 492
26ab719a 493Lazy builder for the L</pool> attribute.
2bf79155 494
495=cut
496
26ab719a 497sub _build_pool {
64cdad22 498 my $self = shift @_;
499 $self->create_pool(%{$self->pool_args});
2bf79155 500}
501
26ab719a 502=head2 _build_balancer
2bf79155 503
cb6ec758 504Lazy builder for the L</balancer> attribute. This takes a Pool object so that
505the balancer knows which pool it's balancing.
2bf79155 506
507=cut
508
26ab719a 509sub _build_balancer {
64cdad22 510 my $self = shift @_;
511 $self->create_balancer(
6f7344b8 512 pool=>$self->pool,
64cdad22 513 master=>$self->master,
514 %{$self->balancer_args},
515 );
2bf79155 516}
517
cb6ec758 518=head2 _build_write_handler
2bf79155 519
cb6ec758 520Lazy builder for the L</write_handler> attribute. The default is to set this to
521the L</master>.
50336325 522
523=cut
524
cb6ec758 525sub _build_write_handler {
64cdad22 526 return shift->master;
cb6ec758 527}
50336325 528
cb6ec758 529=head2 _build_read_handler
2bf79155 530
cb6ec758 531Lazy builder for the L</read_handler> attribute. The default is to set this to
532the L</balancer>.
2bf79155 533
534=cut
535
cb6ec758 536sub _build_read_handler {
64cdad22 537 return shift->balancer;
cb6ec758 538}
50336325 539
cb6ec758 540=head2 around: connect_replicants
2bf79155 541
cb6ec758 542All calls to connect_replicants need to have an existing $schema tacked onto
b2e4d522 543top of the args, since L<DBIx::Class::Storage::DBI> needs it, and any C<connect_info>
544options merged with the master, with replicant opts having higher priority.
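
A minimal sketch of that normalization, under simplifying assumptions: the
merge is shallow, the C<dbh_maker> special case is left out, and
C<normalize_replicant> is a hypothetical name rather than a method of this
class.

```perl
use strict;
use warnings;
use Scalar::Util 'reftype';

# Collapse the trailing option hashes of one replicant's connect_info into a
# single hashref, using the master's options as defaults.
sub normalize_replicant {
    my ($connect_info, $master_opts) = @_;
    my @r = @$connect_info;

    # Index of the first hashref, or one past the end if there is none.
    my $i = 0;
    $i++ while $i < @r && (reftype($r[$i]) || '') ne 'HASH';

    my @hashes = @r[ $i .. $#r ];
    die "invalid connect_info options\n"
        if grep { (reftype($_) || '') ne 'HASH' } @hashes;
    die "too many hashrefs in connect_info\n" if @hashes > 2;

    # Master options first, so that replicant options override them.
    my %opts = %$master_opts;
    %opts = (%opts, %$_) for @hashes;

    splice @r, $i;    # drop the original hashes
    return [ @r, \%opts ];
}

my $replicant = normalize_replicant(
    [ 'dbi:SQLite:replica.db', 'user', 'pass', { AutoCommit => 0 } ],
    { AutoCommit => 1, quote_char => '"' },
);

print join(', ', map { "$_=$replicant->[3]{$_}" } sort keys %{ $replicant->[3] }), "\n";
```

The replicant's own C<AutoCommit> survives, while C<quote_char> is inherited
from the master options.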
955a6df6 545
cb6ec758 546=cut
955a6df6 547
b2e4d522 548around connect_replicants => sub {
549 my ($next, $self, @args) = @_;
550
551 for my $r (@args) {
552 $r = [ $r ] unless reftype $r eq 'ARRAY';
553
1a58752c 554 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 555 if ref $r->[0] && reftype $r->[0] eq 'CODE';
556
557# any connect_info options?
558 my $i = 0;
559 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
560
6f7344b8 561# make one if none
b2e4d522 562 $r->[$i] = {} unless $r->[$i];
563
564# merge if two hashes
b88b85e7 565 my @hashes = @$r[$i .. $#{$r}];
566
1a58752c 567 $self->throw_exception('invalid connect_info options')
b88b85e7 568 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
569
1a58752c 570 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 571 if @hashes > 2;
572
282a9a4f 573 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 574 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 575
576# delete them
b2e4d522 577 splice @$r, $i+1, ($#{$r} - $i), ();
578
0bd8e058 579# make sure master/replicants opts don't clash
580 my %master_opts = %{ $self->_master_connect_info_opts };
581 if (exists $opts{dbh_maker}) {
582 delete @master_opts{qw/dsn user password/};
583 }
584 delete $master_opts{dbh_maker};
585
b2e4d522 586# merge with master
e666c5fd 587 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 588
589# update
590 $r->[$i] = \%opts;
591 }
592
593 $self->$next($self->schema, @args);
955a6df6 594};
2bf79155 595
2bf79155 596=head2 all_storages
597
598Returns an array of all the connected storage backends. The first element
599in the returned array is the master, and the remaining elements are the
600replicants.
601
602=cut
603
604sub all_storages {
64cdad22 605 my $self = shift @_;
606 return grep {defined $_ && blessed $_} (
607 $self->master,
6412a592 608 values %{ $self->replicants },
64cdad22 609 );
2bf79155 610}
611
c4d3fae2 612=head2 execute_reliably ($coderef, ?@args)
613
614Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 615use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 616restores the original state.
617
618Example:
619
64cdad22 620 my $reliably = sub {
621 my $name = shift @_;
622 $schema->resultset('User')->create({name=>$name});
623 my $user_rs = $schema->resultset('User')->find({name=>$name});
624 return $user_rs;
625 };
c4d3fae2 626
64cdad22 627 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 628
629Use this when you must be certain of your database state, such as when you just
630inserted something and need to get a resultset including it, etc.
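
The save, switch, and restore pattern behind this method can be sketched
without any database at all. The C<Sketch::Storage> package below is a
hypothetical stand-in, uses plain C<eval> instead of L<Try::Tiny>, and always
calls the coderef in list context, so it illustrates the idea rather than
copying the implementation.

```perl
use strict;
use warnings;

{
    package Sketch::Storage;    # hypothetical stand-in, not the real class

    sub new {
        my ($class, %args) = @_;
        return bless {%args}, $class;
    }

    sub read_handler {
        my $self = shift;
        $self->{read_handler} = shift if @_;
        return $self->{read_handler};
    }

    sub execute_reliably {
        my ($self, $coderef, @args) = @_;

        my $previous = $self->read_handler;
        $self->read_handler($self->{master});    # force reads to the master

        # Simplification: the coderef always runs in list context here.
        my @result;
        my $ok  = eval { @result = $coderef->(@args); 1 };
        my $err = $@;

        $self->read_handler($previous);          # restore, even on failure
        die "coderef returned an error: $err" unless $ok;

        return wantarray ? @result : $result[0];
    }
}

my $storage = Sketch::Storage->new(master => 'master', read_handler => 'balancer');
my $inside  = $storage->execute_reliably(sub { $storage->read_handler });
print "inside: $inside, afterwards: ", $storage->read_handler, "\n";
```

Inside the coderef the read handler is the master; once the call returns, or
dies, the previous handler is back in place.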
631
632=cut
633
634sub execute_reliably {
64cdad22 635 my ($self, $coderef, @args) = @_;
d4daee7b 636
64cdad22 637 unless( ref $coderef eq 'CODE') {
638 $self->throw_exception('Second argument must be a coderef');
639 }
d4daee7b 640
64cdad22 641 ##Get copy of master storage
642 my $master = $self->master;
d4daee7b 643
64cdad22 644 ##Get whatever the current read handler is
645 my $current = $self->read_handler;
d4daee7b 646
64cdad22 647 ##Set the read handler to master
648 $self->read_handler($master);
d4daee7b 649
64cdad22 650 ## do whatever the caller needs
651 my @result;
652 my $want_array = wantarray;
d4daee7b 653
ed7ab0f4 654 try {
64cdad22 655 if($want_array) {
656 @result = $coderef->(@args);
657 } elsif(defined $want_array) {
658 ($result[0]) = ($coderef->(@args));
ed213e85 659 } else {
64cdad22 660 $coderef->(@args);
6f7344b8 661 }
ed7ab0f4 662 } catch {
663 $self->throw_exception("coderef returned an error: $_");
664 } finally {
665 ##Reset to the original state
666 $self->read_handler($current);
64cdad22 667 };
d4daee7b 668
ed7ab0f4 669 return $want_array ? @result : $result[0];
c4d3fae2 670}
671
cb6ec758 672=head2 set_reliable_storage
673
674Sets the current $schema to be 'reliable', that is, all queries, both read and
675write, are sent to the master.
d4daee7b 676
cb6ec758 677=cut
678
679sub set_reliable_storage {
64cdad22 680 my $self = shift @_;
681 my $schema = $self->schema;
682 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 683
64cdad22 684 $schema->storage->read_handler($write_handler);
cb6ec758 685}
686
687=head2 set_balanced_storage
688
689Sets the current $schema to use the L</balancer> for all reads, while all
48580715 690writes are sent to the master only.
d4daee7b 691
cb6ec758 692=cut
693
694sub set_balanced_storage {
64cdad22 695 my $self = shift @_;
696 my $schema = $self->schema;
bd5da369 697 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 698
bd5da369 699 $schema->storage->read_handler($balanced_handler);
cb6ec758 700}
2bf79155 701
702=head2 connected
703
704Check that the master and at least one of the replicants is connected.
705
706=cut
707
708sub connected {
64cdad22 709 my $self = shift @_;
710 return
711 $self->master->connected &&
712 $self->pool->connected_replicants;
2bf79155 713}
714
2bf79155 715=head2 ensure_connected
716
717Make sure all the storages are connected.
718
719=cut
720
721sub ensure_connected {
64cdad22 722 my $self = shift @_;
723 foreach my $source ($self->all_storages) {
724 $source->ensure_connected(@_);
725 }
2bf79155 726}
727
2bf79155 728=head2 limit_dialect
729
730Set the limit_dialect for all existing storages
731
732=cut
733
734sub limit_dialect {
64cdad22 735 my $self = shift @_;
736 foreach my $source ($self->all_storages) {
737 $source->limit_dialect(@_);
738 }
3fbe08e3 739 return $self->master->limit_dialect;
2bf79155 740}
741
2bf79155 742=head2 quote_char
743
744Set the quote_char for all existing storages
745
746=cut
747
748sub quote_char {
64cdad22 749 my $self = shift @_;
750 foreach my $source ($self->all_storages) {
751 $source->quote_char(@_);
752 }
3fbe08e3 753 return $self->master->quote_char;
2bf79155 754}
755
2bf79155 756=head2 name_sep
757
758Set the name_sep for all existing storages
759
760=cut
761
762sub name_sep {
64cdad22 763 my $self = shift @_;
764 foreach my $source ($self->all_storages) {
765 $source->name_sep(@_);
766 }
3fbe08e3 767 return $self->master->name_sep;
2bf79155 768}
769
2bf79155 770=head2 set_schema
771
772Set the schema object for all existing storages
773
774=cut
775
776sub set_schema {
64cdad22 777 my $self = shift @_;
778 foreach my $source ($self->all_storages) {
779 $source->set_schema(@_);
780 }
2bf79155 781}
782
2bf79155 783=head2 debug
784
785set a debug flag across all storages
786
787=cut
788
789sub debug {
64cdad22 790 my $self = shift @_;
3fbe08e3 791 if(@_) {
792 foreach my $source ($self->all_storages) {
793 $source->debug(@_);
6f7344b8 794 }
64cdad22 795 }
3fbe08e3 796 return $self->master->debug;
2bf79155 797}
798
2bf79155 799=head2 debugobj
800
cea43436 801set a debug object
2bf79155 802
803=cut
804
805sub debugobj {
64cdad22 806 my $self = shift @_;
cea43436 807 return $self->master->debugobj(@_);
2bf79155 808}
809
2bf79155 810=head2 debugfh
811
cea43436 812set a debugfh object
2bf79155 813
814=cut
815
816sub debugfh {
64cdad22 817 my $self = shift @_;
cea43436 818 return $self->master->debugfh(@_);
2bf79155 819}
820
2bf79155 821=head2 debugcb
822
cea43436 823set a debug callback
2bf79155 824
825=cut
826
827sub debugcb {
64cdad22 828 my $self = shift @_;
cea43436 829 return $self->master->debugcb(@_);
2bf79155 830}
831
2bf79155 832=head2 disconnect
833
834disconnect everything
835
836=cut
837
838sub disconnect {
64cdad22 839 my $self = shift @_;
840 foreach my $source ($self->all_storages) {
841 $source->disconnect(@_);
842 }
2bf79155 843}
844
b2e4d522 845=head2 cursor_class
846
847set cursor class on all storages, or return master's
848
849=cut
850
851sub cursor_class {
852 my ($self, $cursor_class) = @_;
853
854 if ($cursor_class) {
855 $_->cursor_class($cursor_class) for $self->all_storages;
856 }
857 $self->master->cursor_class;
858}
d4daee7b 859
3244fdcc 860=head2 cursor
861
862set cursor class on all storages, or return master's, alias for L</cursor_class>
863above.
864
865=cut
866
867sub cursor {
868 my ($self, $cursor_class) = @_;
869
870 if ($cursor_class) {
871 $_->cursor($cursor_class) for $self->all_storages;
872 }
873 $self->master->cursor;
874}
875
876=head2 unsafe
877
878sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
879master's current setting
880
881=cut
882
883sub unsafe {
884 my $self = shift;
885
886 if (@_) {
887 $_->unsafe(@_) for $self->all_storages;
888 }
889
890 return $self->master->unsafe;
891}
892
893=head2 disable_sth_caching
894
895sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
896or returns master's current setting
897
898=cut
899
900sub disable_sth_caching {
901 my $self = shift;
902
903 if (@_) {
904 $_->disable_sth_caching(@_) for $self->all_storages;
905 }
906
907 return $self->master->disable_sth_caching;
908}
909
910=head2 lag_behind_master
911
912returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
913setting
914
915=cut
916
917sub lag_behind_master {
918 my $self = shift;
919
920 return max map $_->lag_behind_master, values %{ $self->replicants };
921}
922
923=head2 is_replicating
924
925returns true if all replicants return true for
926L<DBIx::Class::Storage::DBI/is_replicating>
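
Both this check and the lag check before it are aggregates over the pool. A
small sketch, assuming the replicants are held in a hash keyed by name (as
L</all_storages> suggests) and using plain hashrefs with made-up C<lag> and
C<replicating> fields in place of real storage objects:

```perl
use strict;
use warnings;
use List::Util 'max';

# Hypothetical replicant stand-ins keyed by name.
my %replicants = (
    replica1 => { lag => 2, replicating => 1 },
    replica2 => { lag => 7, replicating => 1 },
    replica3 => { lag => 0, replicating => 0 },
);

# Worst case across the pool: the largest reported lag.
my $worst_lag = max map { $_->{lag} } values %replicants;

# True only if no replicant reports that it has stopped replicating.
my $all_replicating = !grep { !$_->{replicating} } values %replicants;

print "worst lag: $worst_lag\n";
print "all replicating: ", ($all_replicating ? 'yes' : 'no'), "\n";
```

Taking the maximum lag and requiring every replicant to pass gives the most
pessimistic view of the pool, which is usually the safe one to act on.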
927
928=cut
929
930sub is_replicating {
931 my $self = shift;
932
933 return !grep { !$_->is_replicating } values %{ $self->replicants };
934}
935
936=head2 connect_call_datetime_setup
937
938calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
939
940=cut
941
942sub connect_call_datetime_setup {
943 my $self = shift;
944 $_->connect_call_datetime_setup for $self->all_storages;
945}
946
947sub _populate_dbh {
948 my $self = shift;
949 $_->_populate_dbh for $self->all_storages;
950}
951
952sub _connect {
953 my $self = shift;
954 $_->_connect for $self->all_storages;
955}
956
957sub _rebless {
958 my $self = shift;
959 $_->_rebless for $self->all_storages;
960}
961
962sub _determine_driver {
963 my $self = shift;
964 $_->_determine_driver for $self->all_storages;
965}
966
967sub _driver_determined {
968 my $self = shift;
969
970 if (@_) {
971 $_->_driver_determined(@_) for $self->all_storages;
972 }
973
974 return $self->master->_driver_determined;
975}
976
977sub _init {
978 my $self = shift;
979
980 $_->_init for $self->all_storages;
981}
982
983sub _run_connection_actions {
984 my $self = shift;
985
986 $_->_run_connection_actions for $self->all_storages;
987}
988
989sub _do_connection_actions {
990 my $self = shift;
991
992 if (@_) {
993 $_->_do_connection_actions(@_) for $self->all_storages;
994 }
995}
996
997sub connect_call_do_sql {
998 my $self = shift;
999 $_->connect_call_do_sql(@_) for $self->all_storages;
1000}
1001
1002sub disconnect_call_do_sql {
1003 my $self = shift;
1004 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1005}
1006
1007sub _seems_connected {
1008 my $self = shift;
1009
1010 return min map $_->_seems_connected, $self->all_storages;
1011}
1012
1013sub _ping {
1014 my $self = shift;
1015
1016 return min map $_->_ping, $self->all_storages;
1017}
1018
7da56142 1019my $numify_ver = sub {
1020 my $ver = shift;
1021 my @numparts = split /\D+/, $ver;
1022 my $format = '%d.' . (join '', ('%05d') x (@numparts - 1));
1023
1024 return sprintf $format, @numparts;
1025};
1026
fecb38cb 1027sub _server_info {
1028 my $self = shift;
1029
1030 if (not $self->_server_info_hash) {
7da56142 1031 my $min_version_info = (
1032 reduce { $a->[0] < $b->[0] ? $a : $b }
1033 map [ $numify_ver->($_->{dbms_version}), $_ ],
1034 map $_->_server_info, $self->all_storages
1035 )->[1];
fecb38cb 1036
1037 $self->_server_info_hash($min_version_info); # on master
1038 }
1039
1040 return $self->_server_info_hash;
1041}
1042
1043sub _get_server_version {
1044 my $self = shift;
1045
1046 return $self->_server_info->{dbms_version};
1047}
1048
7e38d850 1049=head1 GOTCHAS
1050
1051Due to the fact that replicants can lag behind a master, you must take care to
1052make sure you use one of the methods to force read queries to a master should
1053you need realtime data integrity. For example, if you insert a row, and then
1054immediately re-read it from the database (say, by doing $row->discard_changes)
1055or you insert a row and then immediately build a query that expects that row
1056to be an item, you should force the master to handle reads. Otherwise, due to
1057the lag, there is no certainty your data will be in the expected state.
1058
1059For data integrity, all transactions automatically use the master storage for
1060all read and write queries. Using a transaction is the preferred and recommended
1061method to force the master to handle all read queries.
1062
1063Otherwise, you can force a single query to use the master with the 'force_pool'
1064attribute:
1065
1066 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1067
1068This attribute will safely be ignored by non-replicated storages, so you can use
1069the same code for both types of systems.
1070
1071Lastly, you can use the L</execute_reliably> method, which works very much like
1072a transaction.
1073
1074For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1075and L</set_balanced_storage>, however this operates at a global level and is not
1076suitable if you have a shared Schema object being used by multiple processes,
1077such as on a web application server. You can get around this limitation by
1078using the Schema clone method.
1079
1080 my $new_schema = $schema->clone;
1081 $new_schema->storage->set_reliable_storage;
d4daee7b 1082
7e38d850 1083 ## $new_schema will use only the Master storage for all reads/writes while
1084 ## the $schema object will use replicated storage.
1085
f5d3a5de 1086=head1 AUTHOR
1087
64cdad22 1088 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1089
c4d3fae2 1090Based on code originated by:
f5d3a5de 1091
64cdad22 1092 Norbert Csongrádi <bert@cpan.org>
1093 Peter Siklósi <einon@einon.hu>
2156bbdd 1094
f5d3a5de 1095=head1 LICENSE
1096
1097You may distribute this code under the same terms as Perl itself.
1098
1099=cut
1100
c354902c 1101__PACKAGE__->meta->make_immutable;
1102
f5d3a5de 11031;