2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
f5d3a5de 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
a34b0c89 5 use DBIx::Class;
6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 8}
9
b2e4d522 10use Moose;
26ab719a 11use DBIx::Class::Storage::DBI;
2bf79155 12use DBIx::Class::Storage::DBI::Replicated::Pool;
26ab719a 13use DBIx::Class::Storage::DBI::Replicated::Balancer;
6a151f58 14use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
41916570 15use MooseX::Types::Moose qw/ClassName HashRef Object/;
b2e4d522 16use Scalar::Util 'reftype';
e666c5fd 17use Hash::Merge;
7da56142 18use List::Util qw/min max reduce/;
ed7ab0f4 19use Try::Tiny;
9901aad7 20
21use namespace::clean -except => 'meta';
2bf79155 22
23=head1 NAME
24
ecb65397 25DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 26
27=head1 SYNOPSIS
28
The following example shows how to change an existing $schema to a replicated
storage type, add some replicated (read-only) databases, and perform reporting
tasks.

You should set the 'storage_type' attribute to a replicated type. You should
also define your arguments, such as which balancer you want and any arguments
that the Pool object should get.
36
ce854fd3 37 my $schema = Schema::Class->clone;
64cdad22 38 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 39 $schema->connection(...);
d4daee7b 40
3da4f736 41Next, you need to add in the Replicants. Basically this is an array of
42arrayrefs, where each arrayref is database connect information. Think of these
43arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 44
64cdad22 45 $schema->storage->connect_replicants(
46 [$dsn1, $user, $pass, \%opts],
47 [$dsn2, $user, $pass, \%opts],
48 [$dsn3, $user, $pass, \%opts],
49 );
d4daee7b 50
Now, just use the $schema as you normally would. All reads are automatically
delegated to the replicants, while writes go to the master.
53
7e38d850 54 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 55
3da4f736 56You can force a given query to use a particular storage using the search
57attribute 'force_pool'. For example:
d4daee7b 58
7e38d850 59 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 60
Now $RS will force everything (both reads and writes) to use whatever was set up
as the master storage. 'master' is hardcoded to always point to the Master,
but you can also use any Replicant name. Please see
L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 65
66Also see transactions and L</execute_reliably> for alternative ways to
67force read traffic to the master. In general, you should wrap your statements
68in a transaction when you are reading and writing to the same tables at the
69same time, since your replicants will often lag a bit behind the master.
212cc5c2 70
71See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
72walkthroughs.
d4daee7b 73
2bf79155 74=head1 DESCRIPTION
75
Warning: This class is marked BETA. It has been running a production
website using MySQL native replication as its backend and we have some decent
test coverage, but the code hasn't yet been stressed by a variety of databases.
Individual DBs may have quirks we are not aware of. Please use this first in
development and pass along your experiences/bug fixes.
2bf79155 81
This class implements a replicated data store for DBI. Currently you can define
one master and numerous slave database connections. All write-type queries
(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to the master
database; all read-type queries (SELECTs) go to the slave databases.
86
87Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 88handle gets delegated to one of the two attributes: L</read_handler> or to
89L</write_handler>. Additionally, some methods need to be distributed
2bf79155 90to all existing storages. This way our storage class is a drop in replacement
91for L<DBIx::Class::Storage::DBI>.
92
Read traffic is spread across the replicants (slaves) according to a
user-selected algorithm, set via L</balancer_type>. The default balancer is
L<DBIx::Class::Storage::DBI::Replicated::Balancer::First>.
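
The routing described above can be illustrated with a minimal, self-contained
sketch. It does not use this module: C<Toy::Backend>, C<Toy::Router> and
C<backend_for> are hypothetical names invented for the illustration, and the
uniform random balancer is only an assumed stand-in for a real balancer class.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a storage backend; it only remembers its name.
package Toy::Backend {
    sub new  { my ($class, $name) = @_; return bless { name => $name }, $class }
    sub name { $_[0]{name} }
}

# Hypothetical router: write-type methods go to the master, everything else
# goes to a replicant picked by a caller-supplied balancer callback.
package Toy::Router {
    my %is_write = map { $_ => 1 } qw(insert update delete last_insert_id);

    sub new {
        my ($class, %args) = @_;
        return bless {%args}, $class;
    }

    sub backend_for {
        my ($self, $method) = @_;
        return $self->{master} if $is_write{$method};
        return $self->{balancer}->(@{ $self->{replicants} });
    }
}

package main;

my $router = Toy::Router->new(
    master     => Toy::Backend->new('master'),
    replicants => [ map { Toy::Backend->new("replicant$_") } 1 .. 3 ],
    balancer   => sub { $_[ int rand @_ ] },    # assumed: uniform random pick
);

print "$_ -> ", $router->backend_for($_)->name, "\n" for qw(insert select);
```

Keeping the balancer as a plain callback mirrors the idea that the selection
algorithm is swappable without touching the routing itself.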
95
bca099a3 96=head1 NOTES
97
48580715 98The consistency between master and replicants is database specific. The Pool
faaba25f 99gives you a method to validate its replicants, removing and replacing them
7e38d850 100when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 101to force a query to run against Master when needed.
102
103=head1 REQUIREMENTS
104
a34b0c89 105Replicated Storage has additional requirements not currently part of
aa8b2277 106L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 107
108=head1 ATTRIBUTES
109
110This class defines the following attributes.
111
2ce6e9a6 112=head2 schema
113
The underlying L<DBIx::Class::Schema> object this storage is attached to.
115
116=cut
117
118has 'schema' => (
119 is=>'rw',
6a151f58 120 isa=>DBICSchema,
2ce6e9a6 121 weak_ref=>1,
122 required=>1,
123);
124
26ab719a 125=head2 pool_type
2bf79155 126
26ab719a 127Contains the classname which will instantiate the L</pool> object. Defaults
128to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 129
130=cut
131
26ab719a 132has 'pool_type' => (
dcdf7b2c 133 is=>'rw',
41916570 134 isa=>ClassName,
2ce6e9a6 135 default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
64cdad22 136 handles=>{
137 'create_pool' => 'new',
138 },
2bf79155 139);
140
f068a139 141=head2 pool_args
142
Contains a hashref of initialization information to pass to the Pool object.
See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 145
146=cut
147
148has 'pool_args' => (
dcdf7b2c 149 is=>'rw',
41916570 150 isa=>HashRef,
64cdad22 151 lazy=>1,
64cdad22 152 default=>sub { {} },
f068a139 153);
154
155
26ab719a 156=head2 balancer_type
2bf79155 157
The replication pool requires a balancer class to provide the methods for
choosing how to spread the query load across each replicant in the pool.
160
161=cut
162
26ab719a 163has 'balancer_type' => (
dcdf7b2c 164 is=>'rw',
9901aad7 165 isa=>BalancerClassNamePart,
2ce6e9a6 166 coerce=>1,
167 required=>1,
168 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
64cdad22 169 handles=>{
170 'create_balancer' => 'new',
171 },
2bf79155 172);
173
17b05c13 174=head2 balancer_args
175
Contains a hashref of initialization information to pass to the Balancer object.
See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 178
179=cut
180
181has 'balancer_args' => (
dcdf7b2c 182 is=>'rw',
41916570 183 isa=>HashRef,
64cdad22 184 lazy=>1,
185 required=>1,
186 default=>sub { {} },
17b05c13 187);
188
26ab719a 189=head2 pool
2bf79155 190
Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
container class for one or more replicated databases.
2bf79155 193
194=cut
195
26ab719a 196has 'pool' => (
64cdad22 197 is=>'ro',
198 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
199 lazy_build=>1,
200 handles=>[qw/
6f7344b8 201 connect_replicants
64cdad22 202 replicants
203 has_replicants
204 /],
2bf79155 205);
206
26ab719a 207=head2 balancer
2bf79155 208
Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
and chooses how to spread the read load across its replicants.
2bf79155 211
26ab719a 212=cut
2bf79155 213
26ab719a 214has 'balancer' => (
dcdf7b2c 215 is=>'rw',
64cdad22 216 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
217 lazy_build=>1,
218 handles=>[qw/auto_validate_every/],
26ab719a 219);
2bf79155 220
cb6ec758 221=head2 master
222
The master defines the canonical state for a pool of connected databases. All
the replicants are expected to match this database's state. Thus, in a classic
Master / Slaves distributed system, all the slaves are expected to replicate
the Master's state as quickly as possible. This is the only database in the
pool of databases that is allowed to handle write traffic.
228
229=cut
230
231has 'master' => (
64cdad22 232 is=> 'ro',
6a151f58 233 isa=>DBICStorageDBI,
64cdad22 234 lazy_build=>1,
cb6ec758 235);
236
=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Class::Storage::DBI INTERFACE

The following attributes are delegated all the methods required for the
L<DBIx::Class::Storage::DBI> interface.
241
242=head2 read_handler
243
Defines an object that implements the read side of L<DBIx::Class::Storage::DBI>.
245
246=cut
247
248has 'read_handler' => (
64cdad22 249 is=>'rw',
41916570 250 isa=>Object,
64cdad22 251 lazy_build=>1,
252 handles=>[qw/
253 select
254 select_single
255 columns_info_for
3244fdcc 256 _dbh_columns_info_for
257 _select
6f7344b8 258 /],
cb6ec758 259);
260
cb6ec758 261=head2 write_handler
262
Defines an object that implements the write side of L<DBIx::Class::Storage::DBI>,
as well as methods that don't write or read that can be called on only one
storage, methods that return a C<$dbh>, and any methods that don't make sense to
run on a replicant.
cb6ec758 267
268=cut
269
270has 'write_handler' => (
64cdad22 271 is=>'ro',
41916570 272 isa=>Object,
64cdad22 273 lazy_build=>1,
6f7344b8 274 handles=>[qw/
64cdad22 275 on_connect_do
6f7344b8 276 on_disconnect_do
3244fdcc 277 on_connect_call
278 on_disconnect_call
64cdad22 279 connect_info
3244fdcc 280 _connect_info
64cdad22 281 throw_exception
282 sql_maker
283 sqlt_type
284 create_ddl_dir
285 deployment_statements
286 datetime_parser
6f7344b8 287 datetime_parser_type
288 build_datetime_parser
64cdad22 289 last_insert_id
290 insert
291 insert_bulk
292 update
293 delete
294 dbh
2ce6e9a6 295 txn_begin
64cdad22 296 txn_do
297 txn_commit
298 txn_rollback
2ce6e9a6 299 txn_scope_guard
64cdad22 300 sth
301 deploy
0180bef9 302 with_deferred_fk_checks
6f7344b8 303 dbh_do
64cdad22 304 reload_row
6f7344b8 305 with_deferred_fk_checks
2ce6e9a6 306 _prep_for_execute
7fb60fb1 307
6f7344b8 308 backup
309 is_datatype_numeric
227d8366 310 _supports_insert_returning
6f7344b8 311 _count_select
6f7344b8 312 _subq_update_delete
313 svp_rollback
314 svp_begin
315 svp_release
e398f77e 316 relname_to_table_alias
317 _straight_join_to_node
3244fdcc 318 _dbh_last_insert_id
319 _fix_bind_params
320 _default_dbi_connect_attributes
321 _dbi_connect_info
322 auto_savepoint
323 _sqlt_version_ok
324 _query_end
325 bind_attribute_by_data_type
326 transaction_depth
327 _dbh
328 _select_args
329 _dbh_execute_array
330 _sql_maker_args
331 _sql_maker
332 _query_start
333 _sqlt_version_error
334 _per_row_update_delete
335 _dbh_begin_work
336 _dbh_execute_inserts_with_no_binds
337 _select_args_to_query
338 _svp_generate_name
339 _multipk_update_delete
340 source_bind_attributes
341 _normalize_connect_info
342 _parse_connect_do
343 _dbh_commit
344 _execute_array
345 _placeholders_supported
3244fdcc 346 savepoints
347 _sqlt_minimum_version
348 _sql_maker_opts
349 _conn_pid
350 _typeless_placeholders_supported
351 _conn_tid
352 _dbh_autocommit
353 _native_data_type
354 _get_dbh
355 sql_maker_class
356 _dbh_rollback
357 _adjust_select_args_for_complex_prefetch
358 _resolve_ident_sources
359 _resolve_column_info
360 _prune_unused_joins
361 _strip_cond_qualifiers
362 _parse_order_by
363 _resolve_aliastypes_from_select_args
364 _execute
365 _do_query
366 _dbh_sth
367 _dbh_execute
6ed1cd2e 368 _prefetch_insert_auto_nextvals
6d766626 369 _server_info_hash
64cdad22 370 /],
cb6ec758 371);
372
e471ab87 373my @unimplemented = qw(
374 _arm_global_destructor
375 _preserve_foreign_dbh
572338e0 376 _verify_pid
377 _verify_tid
e471ab87 378);
379
380for my $method (@unimplemented) {
381 __PACKAGE__->meta->add_method($method, sub {
382 croak "$method must not be called on ".(blessed shift).' objects';
383 });
384}
6d766626 385
b2e4d522 386has _master_connect_info_opts =>
387 (is => 'rw', isa => HashRef, default => sub { {} });
388
389=head2 around: connect_info
390
48580715 391Preserves master's C<connect_info> options (for merging with replicants.)
392Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 393C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
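
The option handling can be sketched without any non-core modules. In the
following illustration C<merge_left_precedent> is a hypothetical helper that
only approximates the C<LEFT_PRECEDENT> behaviour of L<Hash::Merge> (it is
shallow, whereas Hash::Merge also merges nested structures), and the
connect_info values are made up.

```perl
use strict;
use warnings;

# Shallow left-precedent merge: on a key clash the left hash wins.
# Assumption: this ignores nested structures, unlike Hash::Merge.
sub merge_left_precedent {
    my ($left, $right) = @_;
    return { %$right, %$left };
}

# Example connect_info with two trailing option hashrefs (values are made up).
my @info = (
    'dbi:SQLite:dbname=master.db', undef, undef,
    { AutoCommit => 1, balancer_type => '::Random' },
    { AutoCommit => 0, quote_char    => '"' },
);

# Fold every hashref into %opts. Each new hashref is the left operand,
# so later hashrefs override earlier ones.
my %opts;
for my $arg (@info) {
    next unless ref($arg) eq 'HASH';
    %opts = %{ merge_left_precedent($arg, \%opts) };
}

# Replicated-specific keys are pulled out; the rest is kept for replicants.
my $balancer_type = delete $opts{balancer_type};

print "balancer_type: $balancer_type\n";
print "$_ => $opts{$_}\n" for sort keys %opts;
```

After the loop, C<AutoCommit> holds the value from the second hashref, and
only the non-Replicated options remain in C<%opts>.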
b2e4d522 394
395=cut
396
397around connect_info => sub {
398 my ($next, $self, $info, @extra) = @_;
399
0ce2d0d5 400 my $wantarray = wantarray;
401
282a9a4f 402 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 403
b2e4d522 404 my %opts;
405 for my $arg (@$info) {
406 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 407 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 408 }
b2e4d522 409 delete $opts{dsn};
410
dcdf7b2c 411 if (@opts{qw/pool_type pool_args/}) {
412 $self->pool_type(delete $opts{pool_type})
413 if $opts{pool_type};
414
b88b85e7 415 $self->pool_args(
e666c5fd 416 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 417 );
dcdf7b2c 418
67c43863 419 $self->pool($self->_build_pool)
6f7344b8 420 if $self->pool;
dcdf7b2c 421 }
422
423 if (@opts{qw/balancer_type balancer_args/}) {
424 $self->balancer_type(delete $opts{balancer_type})
425 if $opts{balancer_type};
426
b88b85e7 427 $self->balancer_args(
e666c5fd 428 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 429 );
dcdf7b2c 430
67c43863 431 $self->balancer($self->_build_balancer)
6f7344b8 432 if $self->balancer;
dcdf7b2c 433 }
434
b2e4d522 435 $self->_master_connect_info_opts(\%opts);
436
0ce2d0d5 437 my (@res, $res);
438 if ($wantarray) {
439 @res = $self->$next($info, @extra);
440 } else {
441 $res = $self->$next($info, @extra);
442 }
443
fd4eb9c2 444 # Make sure master is blessed into the correct class and apply role to it.
445 my $master = $self->master;
446 $master->_determine_driver;
447 Moose::Meta::Class->initialize(ref $master);
cea43436 448
ec0946db 449 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 450
451 # link pool back to master
452 $self->pool->master($master);
0ce2d0d5 453
454 $wantarray ? @res : $res;
b2e4d522 455};
456
26ab719a 457=head1 METHODS
2bf79155 458
26ab719a 459This class defines the following methods.
2bf79155 460
c354902c 461=head2 BUILDARGS
2bf79155 462
When instantiating its storage, L<DBIx::Class::Schema> passes itself as the
first argument, so we need to massage the arguments a bit so that all the
bits get put into the correct places.
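
As a rough illustration of that massaging, the following standalone sketch
flattens the positional arguments into one hashref of named arguments.
C<build_args> and C<My::Storage> are hypothetical names, the string C<SCHEMA>
is a placeholder for a real schema object, and the C<|| {}> guard is an
addition of this sketch rather than something the method itself does.

```perl
use strict;
use warnings;

# Same shape as BUILDARGS: ($class, $schema, \%storage_type_args, @args)
# become one flat hashref of named constructor arguments.
sub build_args {
    my ($class, $schema, $storage_type_args, @args) = @_;
    return {
        schema => $schema,
        %{ $storage_type_args || {} },    # guard added for the sketch
        @args,
    };
}

my $args = build_args(
    'My::Storage',                        # hypothetical class name
    'SCHEMA',                             # placeholder for a schema object
    { balancer_type => '::Random' },
    balancer_args => { auto_validate_every => 5 },
);

print join(', ', sort keys %$args), "\n";
# prints "balancer_args, balancer_type, schema"
```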
2bf79155 466
467=cut
468
c354902c 469sub BUILDARGS {
d7a58a29 470 my ($class, $schema, $storage_type_args, @args) = @_;
d4daee7b 471
c354902c 472 return {
6f7344b8 473 schema=>$schema,
474 %$storage_type_args,
475 @args
c354902c 476 }
477}
2bf79155 478
cb6ec758 479=head2 _build_master
2bf79155 480
cb6ec758 481Lazy builder for the L</master> attribute.
2bf79155 482
483=cut
484
cb6ec758 485sub _build_master {
2ce6e9a6 486 my $self = shift @_;
ee356d00 487 my $master = DBIx::Class::Storage::DBI->new($self->schema);
ee356d00 488 $master
106d5f3b 489}
490
26ab719a 491=head2 _build_pool
2bf79155 492
26ab719a 493Lazy builder for the L</pool> attribute.
2bf79155 494
495=cut
496
26ab719a 497sub _build_pool {
64cdad22 498 my $self = shift @_;
499 $self->create_pool(%{$self->pool_args});
2bf79155 500}
501
26ab719a 502=head2 _build_balancer
2bf79155 503
cb6ec758 504Lazy builder for the L</balancer> attribute. This takes a Pool object so that
505the balancer knows which pool it's balancing.
2bf79155 506
507=cut
508
26ab719a 509sub _build_balancer {
64cdad22 510 my $self = shift @_;
511 $self->create_balancer(
6f7344b8 512 pool=>$self->pool,
64cdad22 513 master=>$self->master,
514 %{$self->balancer_args},
515 );
2bf79155 516}
517
cb6ec758 518=head2 _build_write_handler
2bf79155 519
cb6ec758 520Lazy builder for the L</write_handler> attribute. The default is to set this to
521the L</master>.
50336325 522
523=cut
524
cb6ec758 525sub _build_write_handler {
64cdad22 526 return shift->master;
cb6ec758 527}
50336325 528
cb6ec758 529=head2 _build_read_handler
2bf79155 530
cb6ec758 531Lazy builder for the L</read_handler> attribute. The default is to set this to
532the L</balancer>.
2bf79155 533
534=cut
535
cb6ec758 536sub _build_read_handler {
64cdad22 537 return shift->balancer;
cb6ec758 538}
50336325 539
cb6ec758 540=head2 around: connect_replicants
2bf79155 541
All calls to connect_replicants need to have an existing $schema tacked onto
the top of the args, since L<DBIx::Class::Storage::DBI> needs it. Any
C<connect_info> options are merged with the master's, with replicant opts
having higher priority.
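
The priority rules can be shown with a simplified, standalone sketch.
C<normalise_replicant> is a hypothetical helper: it uses a shallow merge
instead of L<Hash::Merge>, accepts hashrefs anywhere in the list rather than
only at the end, and skips the C<dbh_maker> clash handling.

```perl
use strict;
use warnings;

# Fold a replicant's option hashrefs into one hash, then let master options
# fill in anything the replicant did not set.
sub normalise_replicant {
    my ($r, $master_opts) = @_;
    my @info = ref($r) eq 'ARRAY' ? @$r : ($r);

    my @plain  = grep { ref($_) ne 'HASH' } @info;
    my @hashes = grep { ref($_) eq 'HASH' } @info;
    die "too many hashrefs in connect_info\n" if @hashes > 2;

    # Start from the master's options; each replicant hashref overrides what
    # came before it, so replicant options have the highest priority.
    my %opts = %$master_opts;
    %opts = (%opts, %$_) for @hashes;

    return [ @plain, \%opts ];
}

my $replicant = normalise_replicant(
    [ 'dbi:SQLite:dbname=replicant1.db', undef, undef, { quote_char => '`' } ],
    { quote_char => '"', name_sep => '.' },    # made-up master options
);

print "$_ => $replicant->[3]{$_}\n" for sort keys %{ $replicant->[3] };
```

Here the replicant keeps its own C<quote_char> and inherits C<name_sep> from
the master options.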
955a6df6 545
cb6ec758 546=cut
955a6df6 547
b2e4d522 548around connect_replicants => sub {
549 my ($next, $self, @args) = @_;
550
551 for my $r (@args) {
552 $r = [ $r ] unless reftype $r eq 'ARRAY';
553
1a58752c 554 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 555 if ref $r->[0] && reftype $r->[0] eq 'CODE';
556
557# any connect_info options?
558 my $i = 0;
559 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
560
6f7344b8 561# make one if none
b2e4d522 562 $r->[$i] = {} unless $r->[$i];
563
564# merge if two hashes
b88b85e7 565 my @hashes = @$r[$i .. $#{$r}];
566
1a58752c 567 $self->throw_exception('invalid connect_info options')
b88b85e7 568 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
569
1a58752c 570 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 571 if @hashes > 2;
572
282a9a4f 573 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 574 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 575
576# delete them
b2e4d522 577 splice @$r, $i+1, ($#{$r} - $i), ();
578
0bd8e058 579# make sure master/replicants opts don't clash
580 my %master_opts = %{ $self->_master_connect_info_opts };
581 if (exists $opts{dbh_maker}) {
582 delete @master_opts{qw/dsn user password/};
583 }
584 delete $master_opts{dbh_maker};
585
b2e4d522 586# merge with master
e666c5fd 587 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 588
589# update
590 $r->[$i] = \%opts;
591 }
592
593 $self->$next($self->schema, @args);
955a6df6 594};
2bf79155 595
2bf79155 596=head2 all_storages
597
Returns an array of all the connected storage backends. The first element
in the returned array is the master, and the remaining elements are the
replicants.
601
602=cut
603
604sub all_storages {
64cdad22 605 my $self = shift @_;
606 return grep {defined $_ && blessed $_} (
607 $self->master,
6412a592 608 values %{ $self->replicants },
64cdad22 609 );
2bf79155 610}
611
c4d3fae2 612=head2 execute_reliably ($coderef, ?@args)
613
614Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 615use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 616restores the original state.
617
618Example:
619
64cdad22 620 my $reliably = sub {
621 my $name = shift @_;
622 $schema->resultset('User')->create({name=>$name});
623 my $user_rs = $schema->resultset('User')->find({name=>$name});
624 return $user_rs;
625 };
c4d3fae2 626
64cdad22 627 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 628
629Use this when you must be certain of your database state, such as when you just
630inserted something and need to get a resultset including it, etc.
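
The save, swap and restore pattern behind this method can be sketched in core
Perl. C<with_reliable_reads> and the C<%state> hash are hypothetical, and the
sketch always calls the coderef in list context, which is a simplification.

```perl
use strict;
use warnings;

# Point the read handler at the master, run the callback, and restore the
# previous handler even if the callback dies.
sub with_reliable_reads {
    my ($state, $code, @args) = @_;

    my $previous = $state->{read_handler};
    $state->{read_handler} = $state->{master};

    my @result;
    my $ok    = eval { @result = $code->(@args); 1 };
    my $error = $@;

    $state->{read_handler} = $previous;    # always restore
    die "coderef returned an error: $error" unless $ok;

    return wantarray ? @result : $result[0];
}

my %state = (master => 'master', read_handler => 'balancer');
my $seen  = with_reliable_reads(\%state, sub { $state{read_handler} });

print "inside: $seen, after: $state{read_handler}\n";
# prints "inside: master, after: balancer"
```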
631
632=cut
633
634sub execute_reliably {
64cdad22 635 my ($self, $coderef, @args) = @_;
d4daee7b 636
64cdad22 637 unless( ref $coderef eq 'CODE') {
638 $self->throw_exception('Second argument must be a coderef');
639 }
d4daee7b 640
64cdad22 641 ##Get copy of master storage
642 my $master = $self->master;
d4daee7b 643
  ##Get whatever the current read handler is
645 my $current = $self->read_handler;
d4daee7b 646
64cdad22 647 ##Set the read handler to master
648 $self->read_handler($master);
d4daee7b 649
64cdad22 650 ## do whatever the caller needs
651 my @result;
652 my $want_array = wantarray;
d4daee7b 653
ed7ab0f4 654 my $exception;
655 try {
64cdad22 656 if($want_array) {
657 @result = $coderef->(@args);
658 } elsif(defined $want_array) {
659 ($result[0]) = ($coderef->(@args));
ed213e85 660 } else {
64cdad22 661 $coderef->(@args);
6f7344b8 662 }
ed7ab0f4 663 } catch {
664 $self->throw_exception("coderef returned an error: $_");
665 } finally {
666 ##Reset to the original state
667 $self->read_handler($current);
64cdad22 668 };
d4daee7b 669
ed7ab0f4 670 return $want_array ? @result : $result[0];
c4d3fae2 671}
672
cb6ec758 673=head2 set_reliable_storage
674
Sets the current $schema to be 'reliable', that is, all queries, both read and
write, are sent to the master.
d4daee7b 677
cb6ec758 678=cut
679
680sub set_reliable_storage {
64cdad22 681 my $self = shift @_;
682 my $schema = $self->schema;
683 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 684
64cdad22 685 $schema->storage->read_handler($write_handler);
cb6ec758 686}
687
688=head2 set_balanced_storage
689
Sets the current $schema to use the L</balancer> for all reads, while all
writes are sent to the master only.
d4daee7b 692
cb6ec758 693=cut
694
695sub set_balanced_storage {
64cdad22 696 my $self = shift @_;
697 my $schema = $self->schema;
bd5da369 698 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 699
bd5da369 700 $schema->storage->read_handler($balanced_handler);
cb6ec758 701}
2bf79155 702
703=head2 connected
704
705Check that the master and at least one of the replicants is connected.
706
707=cut
708
709sub connected {
64cdad22 710 my $self = shift @_;
711 return
712 $self->master->connected &&
713 $self->pool->connected_replicants;
2bf79155 714}
715
2bf79155 716=head2 ensure_connected
717
718Make sure all the storages are connected.
719
720=cut
721
722sub ensure_connected {
64cdad22 723 my $self = shift @_;
724 foreach my $source ($self->all_storages) {
725 $source->ensure_connected(@_);
726 }
2bf79155 727}
728
2bf79155 729=head2 limit_dialect
730
731Set the limit_dialect for all existing storages
732
733=cut
734
735sub limit_dialect {
64cdad22 736 my $self = shift @_;
737 foreach my $source ($self->all_storages) {
738 $source->limit_dialect(@_);
739 }
  return $self->master->limit_dialect;
2bf79155 741}
742
2bf79155 743=head2 quote_char
744
745Set the quote_char for all existing storages
746
747=cut
748
749sub quote_char {
64cdad22 750 my $self = shift @_;
751 foreach my $source ($self->all_storages) {
752 $source->quote_char(@_);
753 }
3fbe08e3 754 return $self->master->quote_char;
2bf79155 755}
756
2bf79155 757=head2 name_sep
758
759Set the name_sep for all existing storages
760
761=cut
762
763sub name_sep {
64cdad22 764 my $self = shift @_;
765 foreach my $source ($self->all_storages) {
766 $source->name_sep(@_);
767 }
3fbe08e3 768 return $self->master->name_sep;
2bf79155 769}
770
2bf79155 771=head2 set_schema
772
773Set the schema object for all existing storages
774
775=cut
776
777sub set_schema {
64cdad22 778 my $self = shift @_;
779 foreach my $source ($self->all_storages) {
780 $source->set_schema(@_);
781 }
2bf79155 782}
783
2bf79155 784=head2 debug
785
786set a debug flag across all storages
787
788=cut
789
790sub debug {
64cdad22 791 my $self = shift @_;
3fbe08e3 792 if(@_) {
793 foreach my $source ($self->all_storages) {
794 $source->debug(@_);
6f7344b8 795 }
64cdad22 796 }
3fbe08e3 797 return $self->master->debug;
2bf79155 798}
799
2bf79155 800=head2 debugobj
801
cea43436 802set a debug object
2bf79155 803
804=cut
805
806sub debugobj {
64cdad22 807 my $self = shift @_;
cea43436 808 return $self->master->debugobj(@_);
2bf79155 809}
810
2bf79155 811=head2 debugfh
812
cea43436 813set a debugfh object
2bf79155 814
815=cut
816
817sub debugfh {
64cdad22 818 my $self = shift @_;
cea43436 819 return $self->master->debugfh(@_);
2bf79155 820}
821
2bf79155 822=head2 debugcb
823
cea43436 824set a debug callback
2bf79155 825
826=cut
827
828sub debugcb {
64cdad22 829 my $self = shift @_;
cea43436 830 return $self->master->debugcb(@_);
2bf79155 831}
832
2bf79155 833=head2 disconnect
834
835disconnect everything
836
837=cut
838
839sub disconnect {
64cdad22 840 my $self = shift @_;
841 foreach my $source ($self->all_storages) {
842 $source->disconnect(@_);
843 }
2bf79155 844}
845
b2e4d522 846=head2 cursor_class
847
848set cursor class on all storages, or return master's
849
850=cut
851
852sub cursor_class {
853 my ($self, $cursor_class) = @_;
854
855 if ($cursor_class) {
856 $_->cursor_class($cursor_class) for $self->all_storages;
857 }
858 $self->master->cursor_class;
859}
d4daee7b 860
3244fdcc 861=head2 cursor
862
863set cursor class on all storages, or return master's, alias for L</cursor_class>
864above.
865
866=cut
867
868sub cursor {
869 my ($self, $cursor_class) = @_;
870
871 if ($cursor_class) {
872 $_->cursor($cursor_class) for $self->all_storages;
873 }
874 $self->master->cursor;
875}
876
877=head2 unsafe
878
879sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
880master's current setting
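
Several methods in this class follow the same "set on every storage, report
the master's value" pattern. A minimal sketch, in which C<Toy::Storage> and
C<broadcast_unsafe> are hypothetical names used only for illustration:

```perl
use strict;
use warnings;

# Hypothetical storage with a single combined getter/setter.
package Toy::Storage {
    sub new { return bless { unsafe => 0 }, shift }

    sub unsafe {
        my $self = shift;
        $self->{unsafe} = shift if @_;
        return $self->{unsafe};
    }
}

package main;

my $master     = Toy::Storage->new;
my @replicants = map { Toy::Storage->new } 1 .. 2;

# Broadcast accessor: when given a value, set it on every storage;
# in either case return the master's current value.
sub broadcast_unsafe {
    if (@_) {
        $_->unsafe(@_) for $master, @replicants;
    }
    return $master->unsafe;
}

print "before: ", broadcast_unsafe(),  "\n";    # prints "before: 0"
print "after: ",  broadcast_unsafe(1), "\n";    # prints "after: 1"
```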
881
882=cut
883
884sub unsafe {
885 my $self = shift;
886
887 if (@_) {
888 $_->unsafe(@_) for $self->all_storages;
889 }
890
891 return $self->master->unsafe;
892}
893
894=head2 disable_sth_caching
895
896sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
897or returns master's current setting
898
899=cut
900
901sub disable_sth_caching {
902 my $self = shift;
903
904 if (@_) {
905 $_->disable_sth_caching(@_) for $self->all_storages;
906 }
907
908 return $self->master->disable_sth_caching;
909}
910
911=head2 lag_behind_master
912
913returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
914setting
915
916=cut
917
918sub lag_behind_master {
919 my $self = shift;
920
  return max map { $_->lag_behind_master } values %{ $self->replicants };
922}
923
924=head2 is_replicating
925
926returns true if all replicants return true for
927L<DBIx::Class::Storage::DBI/is_replicating>
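
Both this method and L</lag_behind_master> aggregate a status across the
replicants. The following sketch assumes the pool keeps its replicants in a
hashref keyed by name, as L</all_storages> treats it; C<Toy::Replicant> and its
canned values are hypothetical.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical replicant stand-in with canned status values.
package Toy::Replicant {
    sub new               { my ($class, %args) = @_; return bless {%args}, $class }
    sub lag_behind_master { $_[0]{lag} }
    sub is_replicating    { $_[0]{replicating} }
}

package main;

# Assumed shape: a hashref of replicant objects keyed by name.
my $replicants = {
    r1 => Toy::Replicant->new(lag => 2, replicating => 1),
    r2 => Toy::Replicant->new(lag => 7, replicating => 1),
};

my @all = values %$replicants;

# Highest lag of any replicant.
my $worst_lag = max map { $_->lag_behind_master } @all;

# True only if the number of replicating replicants equals the total.
my $all_replicating = (grep { $_->is_replicating } @all) == @all;

print "worst lag: $worst_lag\n";    # prints "worst lag: 7"
print "all replicating: ", ($all_replicating ? 'yes' : 'no'), "\n";
```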
928
929=cut
930
931sub is_replicating {
932 my $self = shift;
933
  my @replicants = values %{ $self->replicants };
  return (grep { $_->is_replicating } @replicants) == @replicants;
935}
936
937=head2 connect_call_datetime_setup
938
939calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
940
941=cut
942
943sub connect_call_datetime_setup {
944 my $self = shift;
945 $_->connect_call_datetime_setup for $self->all_storages;
946}
947
948sub _populate_dbh {
949 my $self = shift;
950 $_->_populate_dbh for $self->all_storages;
951}
952
953sub _connect {
954 my $self = shift;
955 $_->_connect for $self->all_storages;
956}
957
958sub _rebless {
959 my $self = shift;
960 $_->_rebless for $self->all_storages;
961}
962
963sub _determine_driver {
964 my $self = shift;
965 $_->_determine_driver for $self->all_storages;
966}
967
968sub _driver_determined {
969 my $self = shift;
970
971 if (@_) {
972 $_->_driver_determined(@_) for $self->all_storages;
973 }
974
975 return $self->master->_driver_determined;
976}
977
978sub _init {
979 my $self = shift;
980
981 $_->_init for $self->all_storages;
982}
983
984sub _run_connection_actions {
985 my $self = shift;
986
987 $_->_run_connection_actions for $self->all_storages;
988}
989
990sub _do_connection_actions {
991 my $self = shift;
992
993 if (@_) {
994 $_->_do_connection_actions(@_) for $self->all_storages;
995 }
996}
997
998sub connect_call_do_sql {
999 my $self = shift;
1000 $_->connect_call_do_sql(@_) for $self->all_storages;
1001}
1002
1003sub disconnect_call_do_sql {
1004 my $self = shift;
1005 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1006}
1007
1008sub _seems_connected {
1009 my $self = shift;
1010
1011 return min map $_->_seems_connected, $self->all_storages;
1012}
1013
1014sub _ping {
1015 my $self = shift;
1016
1017 return min map $_->_ping, $self->all_storages;
1018}
1019
7da56142 1020my $numify_ver = sub {
1021 my $ver = shift;
1022 my @numparts = split /\D+/, $ver;
1023 my $format = '%d.' . (join '', ('%05d') x (@numparts - 1));
1024
1025 return sprintf $format, @numparts;
1026};
1027
fecb38cb 1028sub _server_info {
1029 my $self = shift;
1030
1031 if (not $self->_server_info_hash) {
7da56142 1032 my $min_version_info = (
1033 reduce { $a->[0] < $b->[0] ? $a : $b }
1034 map [ $numify_ver->($_->{dbms_version}), $_ ],
1035 map $_->_server_info, $self->all_storages
1036 )->[1];
fecb38cb 1037
1038 $self->_server_info_hash($min_version_info); # on master
1039 }
1040
1041 return $self->_server_info_hash;
1042}
1043
1044sub _get_server_version {
1045 my $self = shift;
1046
1047 return $self->_server_info->{dbms_version};
1048}
1049
7e38d850 1050=head1 GOTCHAS
1051
1052Due to the fact that replicants can lag behind a master, you must take care to
1053make sure you use one of the methods to force read queries to a master should
1054you need realtime data integrity. For example, if you insert a row, and then
1055immediately re-read it from the database (say, by doing $row->discard_changes)
1056or you insert a row and then immediately build a query that expects that row
1057to be an item, you should force the master to handle reads. Otherwise, due to
1058the lag, there is no certainty your data will be in the expected state.
1059
1060For data integrity, all transactions automatically use the master storage for
1061all read and write queries. Using a transaction is the preferred and recommended
1062method to force the master to handle all read queries.
1063
1064Otherwise, you can force a single query to use the master with the 'force_pool'
1065attribute:
1066
1067 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1068
This attribute will safely be ignored by non-replicated storages, so you can use
the same code for both types of systems.
1071
1072Lastly, you can use the L</execute_reliably> method, which works very much like
1073a transaction.
1074
For debugging, you can turn replication on/off with the methods
L</set_reliable_storage> and L</set_balanced_storage>. However, this operates
at a global level and is not suitable if you have a shared Schema object being
used by multiple processes, such as on a web application server. You can get
around this limitation by using the Schema clone method.
1080
1081 my $new_schema = $schema->clone;
  $new_schema->storage->set_reliable_storage;
d4daee7b 1083
7e38d850 1084 ## $new_schema will use only the Master storage for all reads/writes while
1085 ## the $schema object will use replicated storage.
1086
f5d3a5de 1087=head1 AUTHOR
1088
64cdad22 1089 John Napiorkowski <john.napiorkowski@takkle.com>
f5d3a5de 1090
c4d3fae2 1091Based on code originated by:
f5d3a5de 1092
64cdad22 1093 Norbert Csongrádi <bert@cpan.org>
1094 Peter Siklósi <einon@einon.hu>
2156bbdd 1095
f5d3a5de 1096=head1 LICENSE
1097
1098You may distribute this code under the same terms as Perl itself.
1099
1100=cut
1101
c354902c 1102__PACKAGE__->meta->make_immutable;
1103
f5d3a5de 11041;