convert from the bottom up
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
CommitLineData
2156bbdd 1package DBIx::Class::Storage::DBI::Replicated;
0bbe6676 2
ecb65397 3BEGIN {
4 use Carp::Clan qw/^DBIx::Class/;
a34b0c89 5 use DBIx::Class;
6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') )
7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
ecb65397 8}
9
0bbe6676 10use Moo;
11use Role::Tiny ();
26ab719a 12use DBIx::Class::Storage::DBI;
0bbe6676 13use Scalar::Util qw(reftype blessed);
14use List::Util qw(min max reduce);
ed7ab0f4 15use Try::Tiny;
0bbe6676 16use Sub::Name 'subname';
17use Class::Inspector;
18use DBIx::Class::Storage::DBI::Replicated::Types
19 qw(DBICSchema DBICStorageDBI ClassName HashRef Object
20 DoesDBICStorageReplicatedBalancer DBICStorageDBIReplicatedPool Defined);
2bf79155 21
22=head1 NAME
23
ecb65397 24DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
2bf79155 25
26=head1 SYNOPSIS
27
28The Following example shows how to change an existing $schema to a replicated
48580715 29storage type, add some replicated (read-only) databases, and perform reporting
955a6df6 30tasks.
2bf79155 31
3da4f736 32You should set the 'storage_type attribute to a replicated type. You should
d4daee7b 33also define your arguments, such as which balancer you want and any arguments
3da4f736 34that the Pool object should get.
35
ce854fd3 36 my $schema = Schema::Class->clone;
64cdad22 37 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] );
ce854fd3 38 $schema->connection(...);
d4daee7b 39
fd323bf1 40Next, you need to add in the Replicants. Basically this is an array of
3da4f736 41arrayrefs, where each arrayref is database connect information. Think of these
42arguments as what you'd pass to the 'normal' $schema->connect method.
d4daee7b 43
64cdad22 44 $schema->storage->connect_replicants(
45 [$dsn1, $user, $pass, \%opts],
46 [$dsn2, $user, $pass, \%opts],
47 [$dsn3, $user, $pass, \%opts],
48 );
d4daee7b 49
3da4f736 50Now, just use the $schema as you normally would. Automatically all reads will
51be delegated to the replicants, while writes to the master.
52
7e38d850 53 $schema->resultset('Source')->search({name=>'etc'});
d4daee7b 54
3da4f736 55You can force a given query to use a particular storage using the search
56attribute 'force_pool'. For example:
d4daee7b 57
7e38d850 58 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
3da4f736 59
60Now $RS will force everything (both reads and writes) to use whatever was setup
fd323bf1 61as the master storage. 'master' is hardcoded to always point to the Master,
3da4f736 62but you can also use any Replicant name. Please see:
212cc5c2 63L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more.
3da4f736 64
65Also see transactions and L</execute_reliably> for alternative ways to
66force read traffic to the master. In general, you should wrap your statements
67in a transaction when you are reading and writing to the same tables at the
68same time, since your replicants will often lag a bit behind the master.
212cc5c2 69
70See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and
71walkthroughs.
d4daee7b 72
2bf79155 73=head1 DESCRIPTION
74
7e38d850 75Warning: This class is marked BETA. This has been running a production
ccb3b897 76website using MySQL native replication as its backend and we have some decent
7e38d850 77test coverage but the code hasn't yet been stressed by a variety of databases.
48580715 78Individual DBs may have quirks we are not aware of. Please use this in first
7e38d850 79development and pass along your experiences/bug fixes.
2bf79155 80
81This class implements replicated data store for DBI. Currently you can define
82one master and numerous slave database connections. All write-type queries
83(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
84database, all read-type queries (SELECTs) go to the slave database.
85
86Basically, any method request that L<DBIx::Class::Storage::DBI> would normally
bca099a3 87handle gets delegated to one of the two attributes: L</read_handler> or to
88L</write_handler>. Additionally, some methods need to be distributed
2bf79155 89to all existing storages. This way our storage class is a drop in replacement
90for L<DBIx::Class::Storage::DBI>.
91
48580715 92Read traffic is spread across the replicants (slaves) occurring to a user
2bf79155 93selected algorithm. The default algorithm is random weighted.
94
bca099a3 95=head1 NOTES
96
48580715 97The consistency between master and replicants is database specific. The Pool
faaba25f 98gives you a method to validate its replicants, removing and replacing them
7e38d850 99when they fail/pass predefined criteria. Please make careful use of the ways
ecb65397 100to force a query to run against Master when needed.
101
102=head1 REQUIREMENTS
103
a34b0c89 104Replicated Storage has additional requirements not currently part of
aa8b2277 105L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details.
2bf79155 106
107=head1 ATTRIBUTES
108
109This class defines the following attributes.
110
2ce6e9a6 111=head2 schema
112
113The underlying L<DBIx::Class::Schema> object this storage is attaching
114
115=cut
116
117has 'schema' => (
0bbe6676 118 is=>'rw',
119 isa=>DBICSchema,
120 weak_ref=>1,
121 required=>1,
2ce6e9a6 122);
123
26ab719a 124=head2 pool_type
2bf79155 125
fd323bf1 126Contains the classname which will instantiate the L</pool> object. Defaults
26ab719a 127to: L<DBIx::Class::Storage::DBI::Replicated::Pool>.
2bf79155 128
129=cut
130
26ab719a 131has 'pool_type' => (
dcdf7b2c 132 is=>'rw',
41916570 133 isa=>ClassName,
0bbe6676 134 default=> sub { 'DBIx::Class::Storage::DBI::Replicated::Pool'},
64cdad22 135 handles=>{
136 'create_pool' => 'new',
137 },
2bf79155 138);
139
f068a139 140=head2 pool_args
141
142Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 143See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments.
f068a139 144
145=cut
146
147has 'pool_args' => (
dcdf7b2c 148 is=>'rw',
0bbe6676 149 isa =>HashRef,
64cdad22 150 lazy=>1,
64cdad22 151 default=>sub { {} },
f068a139 152);
153
154
26ab719a 155=head2 balancer_type
2bf79155 156
157The replication pool requires a balance class to provider the methods for
158choose how to spread the query load across each replicant in the pool.
159
160=cut
161
26ab719a 162has 'balancer_type' => (
dcdf7b2c 163 is=>'rw',
0bbe6676 164 isa=>Defined,
165 default=>sub { 'DBIx::Class::Storage::DBI::Replicated::Balancer::First' },
2bf79155 166);
167
0bbe6676 168sub create_balancer {
169 my ($self, @args) = @_;
170 my $type = $self->balancer_type;
171 $type = 'DBIx::Class::Storage::DBI::Replicated::Balancer'.$type
172 if ($type=~m/^::/);
173 $self->schema->ensure_class_loaded($type);
174 return $type->new(@args);
175}
176
17b05c13 177=head2 balancer_args
178
179Contains a hashref of initialized information to pass to the Balancer object.
212cc5c2 180See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments.
17b05c13 181
182=cut
183
184has 'balancer_args' => (
dcdf7b2c 185 is=>'rw',
0bbe6676 186 isa =>HashRef,
64cdad22 187 lazy=>1,
0bbe6676 188 default=>sub { +{} },
17b05c13 189);
190
26ab719a 191=head2 pool
2bf79155 192
370f78d4 193Is a L<DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a
26ab719a 194container class for one or more replicated databases.
2bf79155 195
196=cut
197
26ab719a 198has 'pool' => (
64cdad22 199 is=>'ro',
0bbe6676 200 isa =>DBICStorageDBIReplicatedPool,
201 lazy=>1,
202 builder=>'_build_pool',
203 clearer=>'clear_pool',
64cdad22 204 handles=>[qw/
6f7344b8 205 connect_replicants
64cdad22 206 replicants
64cdad22 207 /],
2bf79155 208);
209
26ab719a 210=head2 balancer
2bf79155 211
370f78d4 212Is a L<DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This
213is a class that takes a pool (L<DBIx::Class::Storage::DBI::Replicated::Pool>)
2bf79155 214
26ab719a 215=cut
2bf79155 216
26ab719a 217has 'balancer' => (
dcdf7b2c 218 is=>'rw',
0bbe6676 219 isa => DoesDBICStorageReplicatedBalancer,
220 lazy=>1,
221 builder=>'_build_balancer',
64cdad22 222 handles=>[qw/auto_validate_every/],
26ab719a 223);
2bf79155 224
cb6ec758 225=head2 master
226
227The master defines the canonical state for a pool of connected databases. All
228the replicants are expected to match this databases state. Thus, in a classic
229Master / Slaves distributed system, all the slaves are expected to replicate
230the Master's state as quick as possible. This is the only database in the
231pool of databases that is allowed to handle write traffic.
232
233=cut
234
235has 'master' => (
64cdad22 236 is=> 'ro',
0bbe6676 237 isa => DBICStorageDBI,
238 lazy=>1,
239 builder=>'_build_master',
cb6ec758 240);
241
cb6ec758 242=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
243
fd323bf1 244The following methods are delegated all the methods required for the
cb6ec758 245L<DBIx::Class::Storage::DBI> interface.
246
247=head2 read_handler
248
249Defines an object that implements the read side of L<BIx::Class::Storage::DBI>.
250
251=cut
252
253has 'read_handler' => (
64cdad22 254 is=>'rw',
41916570 255 isa=>Object,
0bbe6676 256 lazy=>1,
257 builder=>'_build_read_handler',
64cdad22 258 handles=>[qw/
259 select
260 select_single
261 columns_info_for
fd323bf1 262 _dbh_columns_info_for
3244fdcc 263 _select
6f7344b8 264 /],
cb6ec758 265);
266
cb6ec758 267=head2 write_handler
268
3244fdcc 269Defines an object that implements the write side of L<BIx::Class::Storage::DBI>,
270as well as methods that don't write or read that can be called on only one
271storage, methods that return a C<$dbh>, and any methods that don't make sense to
272run on a replicant.
cb6ec758 273
274=cut
275
276has 'write_handler' => (
64cdad22 277 is=>'ro',
41916570 278 isa=>Object,
0bbe6676 279 lazy=>1,
280 builder=>'_build_write_handler',
6f7344b8 281 handles=>[qw/
64cdad22 282 on_connect_do
6f7344b8 283 on_disconnect_do
3244fdcc 284 on_connect_call
285 on_disconnect_call
64cdad22 286 connect_info
3244fdcc 287 _connect_info
64cdad22 288 throw_exception
289 sql_maker
290 sqlt_type
291 create_ddl_dir
292 deployment_statements
293 datetime_parser
6f7344b8 294 datetime_parser_type
295 build_datetime_parser
64cdad22 296 last_insert_id
297 insert
298 insert_bulk
299 update
300 delete
301 dbh
2ce6e9a6 302 txn_begin
64cdad22 303 txn_do
304 txn_commit
305 txn_rollback
2ce6e9a6 306 txn_scope_guard
64cdad22 307 sth
308 deploy
0180bef9 309 with_deferred_fk_checks
6f7344b8 310 dbh_do
64cdad22 311 reload_row
2ce6e9a6 312 _prep_for_execute
7fb60fb1 313
6f7344b8 314 backup
315 is_datatype_numeric
316 _count_select
6f7344b8 317 _subq_update_delete
318 svp_rollback
319 svp_begin
320 svp_release
e398f77e 321 relname_to_table_alias
3244fdcc 322 _dbh_last_insert_id
323 _fix_bind_params
324 _default_dbi_connect_attributes
325 _dbi_connect_info
b9ca4ff1 326 _dbic_connect_attributes
3244fdcc 327 auto_savepoint
328 _sqlt_version_ok
329 _query_end
330 bind_attribute_by_data_type
331 transaction_depth
332 _dbh
333 _select_args
334 _dbh_execute_array
3244fdcc 335 _sql_maker
336 _query_start
337 _sqlt_version_error
338 _per_row_update_delete
339 _dbh_begin_work
340 _dbh_execute_inserts_with_no_binds
341 _select_args_to_query
342 _svp_generate_name
343 _multipk_update_delete
344 source_bind_attributes
345 _normalize_connect_info
346 _parse_connect_do
347 _dbh_commit
348 _execute_array
3244fdcc 349 savepoints
350 _sqlt_minimum_version
351 _sql_maker_opts
352 _conn_pid
3244fdcc 353 _dbh_autocommit
354 _native_data_type
355 _get_dbh
356 sql_maker_class
357 _dbh_rollback
358 _adjust_select_args_for_complex_prefetch
359 _resolve_ident_sources
360 _resolve_column_info
361 _prune_unused_joins
362 _strip_cond_qualifiers
3244fdcc 363 _resolve_aliastypes_from_select_args
364 _execute
365 _do_query
366 _dbh_sth
367 _dbh_execute
64cdad22 368 /],
cb6ec758 369);
370
e471ab87 371my @unimplemented = qw(
372 _arm_global_destructor
0bbe6676 373 _preserve_foreign_dbh
572338e0 374 _verify_pid
0bbe6676 375 _verify_tid
bbdda281 376
377 get_use_dbms_capability
378 set_use_dbms_capability
379 get_dbms_capability
380 set_dbms_capability
bbdda281 381 _dbh_details
382
7f4433eb 383 sql_limit_dialect
31a8aaaf 384
385 _inner_join_to_node
0a3441ee 386 _group_over_selection
bac358c9 387 _extract_order_criteria
153a0398 388 _is_lob_type
0bbe6676 389 _max_column_bytesize
390 _prefetch_autovalues
e471ab87 391);
392
7f4433eb 393# the capability framework
7f4433eb 394push @unimplemented, ( grep
395 { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x }
0bbe6676 396 @{Class::Inspector->methods('DBIx::Class::Storage::DBI')||[]}
7f4433eb 397);
398
e471ab87 399for my $method (@unimplemented) {
0bbe6676 400 {
401 no strict qw/refs/;
402 *{__PACKAGE__ ."::$method"} = subname $method => sub {
403 croak "$method must not be called on ".(blessed shift).' objects';
404 };
405 }
e471ab87 406}
6d766626 407
0bbe6676 408has _master_connect_info_opts => (
409 is => 'rw',
410 isa =>HashRef ,
411 default => sub { +{} },
412);
b2e4d522 413
414=head2 around: connect_info
415
48580715 416Preserves master's C<connect_info> options (for merging with replicants.)
417Also sets any Replicated-related options from connect_info, such as
dcdf7b2c 418C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>.
b2e4d522 419
420=cut
421
422around connect_info => sub {
423 my ($next, $self, $info, @extra) = @_;
424
282a9a4f 425 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 426
b2e4d522 427 my %opts;
428 for my $arg (@$info) {
429 next unless (reftype($arg)||'') eq 'HASH';
e666c5fd 430 %opts = %{ $merge->merge($arg, \%opts) };
b2e4d522 431 }
b2e4d522 432 delete $opts{dsn};
433
dcdf7b2c 434 if (@opts{qw/pool_type pool_args/}) {
435 $self->pool_type(delete $opts{pool_type})
436 if $opts{pool_type};
437
b88b85e7 438 $self->pool_args(
e666c5fd 439 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
b88b85e7 440 );
dcdf7b2c 441
64ae1667 442 ## Since we possibly changed the pool_args, we need to clear the current
443 ## pool object so that next time it is used it will be rebuilt.
444 $self->clear_pool;
dcdf7b2c 445 }
446
447 if (@opts{qw/balancer_type balancer_args/}) {
448 $self->balancer_type(delete $opts{balancer_type})
449 if $opts{balancer_type};
450
b88b85e7 451 $self->balancer_args(
e666c5fd 452 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
b88b85e7 453 );
dcdf7b2c 454
67c43863 455 $self->balancer($self->_build_balancer)
6f7344b8 456 if $self->balancer;
dcdf7b2c 457 }
458
b2e4d522 459 $self->_master_connect_info_opts(\%opts);
460
cca282b6 461 my @res;
462 if (wantarray) {
0ce2d0d5 463 @res = $self->$next($info, @extra);
464 } else {
cca282b6 465 $res[0] = $self->$next($info, @extra);
0ce2d0d5 466 }
467
fd4eb9c2 468 # Make sure master is blessed into the correct class and apply role to it.
469 my $master = $self->master;
470 $master->_determine_driver;
cea43436 471
0bbe6676 472 ## Moose::Meta::Class->initialize(ref $master);
473 Role::Tiny->apply_roles_to_object($master, 'DBIx::Class::Storage::DBI::Replicated::WithDSN');
474 ## DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
cea43436 475
476 # link pool back to master
477 $self->pool->master($master);
0ce2d0d5 478
cca282b6 479 wantarray ? @res : $res[0];
b2e4d522 480};
481
26ab719a 482=head1 METHODS
2bf79155 483
26ab719a 484This class defines the following methods.
2bf79155 485
0bbe6676 486=head2 new
2bf79155 487
faaba25f 488L<DBIx::Class::Schema> when instantiating its storage passed itself as the
2ce6e9a6 489first argument. So we need to massage the arguments a bit so that all the
490bits get put into the correct places.
2bf79155 491
492=cut
493
0bbe6676 494around 'new', sub {
495 my ($orig, $class, $schema, $storage_type_args, @args) = @_;
496 return $orig->(
497 $class,
498 schema => $schema,
6f7344b8 499 %$storage_type_args,
0bbe6676 500 @args,
501 );
502};
2bf79155 503
cb6ec758 504=head2 _build_master
2bf79155 505
cb6ec758 506Lazy builder for the L</master> attribute.
2bf79155 507
508=cut
509
cb6ec758 510sub _build_master {
2ce6e9a6 511 my $self = shift @_;
ee356d00 512 my $master = DBIx::Class::Storage::DBI->new($self->schema);
0bbe6676 513 return $master;
106d5f3b 514}
515
26ab719a 516=head2 _build_pool
2bf79155 517
26ab719a 518Lazy builder for the L</pool> attribute.
2bf79155 519
520=cut
521
26ab719a 522sub _build_pool {
64cdad22 523 my $self = shift @_;
524 $self->create_pool(%{$self->pool_args});
2bf79155 525}
526
26ab719a 527=head2 _build_balancer
2bf79155 528
cb6ec758 529Lazy builder for the L</balancer> attribute. This takes a Pool object so that
530the balancer knows which pool it's balancing.
2bf79155 531
532=cut
533
26ab719a 534sub _build_balancer {
64cdad22 535 my $self = shift @_;
536 $self->create_balancer(
6f7344b8 537 pool=>$self->pool,
64cdad22 538 master=>$self->master,
539 %{$self->balancer_args},
540 );
2bf79155 541}
542
cb6ec758 543=head2 _build_write_handler
2bf79155 544
cb6ec758 545Lazy builder for the L</write_handler> attribute. The default is to set this to
546the L</master>.
50336325 547
548=cut
549
cb6ec758 550sub _build_write_handler {
64cdad22 551 return shift->master;
cb6ec758 552}
50336325 553
cb6ec758 554=head2 _build_read_handler
2bf79155 555
cb6ec758 556Lazy builder for the L</read_handler> attribute. The default is to set this to
557the L</balancer>.
2bf79155 558
559=cut
560
cb6ec758 561sub _build_read_handler {
64cdad22 562 return shift->balancer;
cb6ec758 563}
50336325 564
cb6ec758 565=head2 around: connect_replicants
2bf79155 566
cb6ec758 567All calls to connect_replicants needs to have an existing $schema tacked onto
b2e4d522 568top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info>
569options merged with the master, with replicant opts having higher priority.
955a6df6 570
cb6ec758 571=cut
955a6df6 572
b2e4d522 573around connect_replicants => sub {
574 my ($next, $self, @args) = @_;
575
576 for my $r (@args) {
577 $r = [ $r ] unless reftype $r eq 'ARRAY';
578
1a58752c 579 $self->throw_exception('coderef replicant connect_info not supported')
b2e4d522 580 if ref $r->[0] && reftype $r->[0] eq 'CODE';
581
582# any connect_info options?
583 my $i = 0;
584 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
585
6f7344b8 586# make one if none
b2e4d522 587 $r->[$i] = {} unless $r->[$i];
588
589# merge if two hashes
b88b85e7 590 my @hashes = @$r[$i .. $#{$r}];
591
1a58752c 592 $self->throw_exception('invalid connect_info options')
b88b85e7 593 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
594
1a58752c 595 $self->throw_exception('too many hashrefs in connect_info')
b88b85e7 596 if @hashes > 2;
597
282a9a4f 598 my $merge = Hash::Merge->new('LEFT_PRECEDENT');
e666c5fd 599 my %opts = %{ $merge->merge(reverse @hashes) };
b88b85e7 600
601# delete them
b2e4d522 602 splice @$r, $i+1, ($#{$r} - $i), ();
603
0bd8e058 604# make sure master/replicants opts don't clash
605 my %master_opts = %{ $self->_master_connect_info_opts };
606 if (exists $opts{dbh_maker}) {
607 delete @master_opts{qw/dsn user password/};
608 }
609 delete $master_opts{dbh_maker};
610
b2e4d522 611# merge with master
e666c5fd 612 %opts = %{ $merge->merge(\%opts, \%master_opts) };
b2e4d522 613
614# update
615 $r->[$i] = \%opts;
616 }
617
618 $self->$next($self->schema, @args);
955a6df6 619};
2bf79155 620
2bf79155 621=head2 all_storages
622
623Returns an array of of all the connected storage backends. The first element
624in the returned array is the master, and the remainings are each of the
625replicants.
626
627=cut
628
629sub all_storages {
64cdad22 630 my $self = shift @_;
631 return grep {defined $_ && blessed $_} (
632 $self->master,
6412a592 633 values %{ $self->replicants },
64cdad22 634 );
2bf79155 635}
636
c4d3fae2 637=head2 execute_reliably ($coderef, ?@args)
638
639Given a coderef, saves the current state of the L</read_handler>, forces it to
48580715 640use reliable storage (e.g. sets it to the master), executes a coderef and then
c4d3fae2 641restores the original state.
642
643Example:
644
64cdad22 645 my $reliably = sub {
646 my $name = shift @_;
647 $schema->resultset('User')->create({name=>$name});
fd323bf1 648 my $user_rs = $schema->resultset('User')->find({name=>$name});
64cdad22 649 return $user_rs;
650 };
c4d3fae2 651
64cdad22 652 my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
c4d3fae2 653
654Use this when you must be certain of your database state, such as when you just
655inserted something and need to get a resultset including it, etc.
656
657=cut
658
659sub execute_reliably {
64cdad22 660 my ($self, $coderef, @args) = @_;
d4daee7b 661
64cdad22 662 unless( ref $coderef eq 'CODE') {
663 $self->throw_exception('Second argument must be a coderef');
664 }
d4daee7b 665
64cdad22 666 ##Get copy of master storage
667 my $master = $self->master;
d4daee7b 668
64cdad22 669 ##Get whatever the current read hander is
670 my $current = $self->read_handler;
d4daee7b 671
64cdad22 672 ##Set the read handler to master
673 $self->read_handler($master);
d4daee7b 674
64cdad22 675 ## do whatever the caller needs
676 my @result;
677 my $want_array = wantarray;
d4daee7b 678
ed7ab0f4 679 try {
64cdad22 680 if($want_array) {
681 @result = $coderef->(@args);
682 } elsif(defined $want_array) {
683 ($result[0]) = ($coderef->(@args));
ed213e85 684 } else {
64cdad22 685 $coderef->(@args);
6f7344b8 686 }
ed7ab0f4 687 } catch {
688 $self->throw_exception("coderef returned an error: $_");
689 } finally {
690 ##Reset to the original state
691 $self->read_handler($current);
64cdad22 692 };
d4daee7b 693
cca282b6 694 return wantarray ? @result : $result[0];
c4d3fae2 695}
696
cb6ec758 697=head2 set_reliable_storage
698
699Sets the current $schema to be 'reliable', that is all queries, both read and
700write are sent to the master
d4daee7b 701
cb6ec758 702=cut
703
704sub set_reliable_storage {
64cdad22 705 my $self = shift @_;
706 my $schema = $self->schema;
707 my $write_handler = $self->schema->storage->write_handler;
d4daee7b 708
64cdad22 709 $schema->storage->read_handler($write_handler);
cb6ec758 710}
711
712=head2 set_balanced_storage
713
714Sets the current $schema to be use the </balancer> for all reads, while all
48580715 715writes are sent to the master only
d4daee7b 716
cb6ec758 717=cut
718
719sub set_balanced_storage {
64cdad22 720 my $self = shift @_;
721 my $schema = $self->schema;
bd5da369 722 my $balanced_handler = $self->schema->storage->balancer;
d4daee7b 723
bd5da369 724 $schema->storage->read_handler($balanced_handler);
cb6ec758 725}
2bf79155 726
727=head2 connected
728
729Check that the master and at least one of the replicants is connected.
730
731=cut
732
733sub connected {
64cdad22 734 my $self = shift @_;
735 return
736 $self->master->connected &&
737 $self->pool->connected_replicants;
2bf79155 738}
739
2bf79155 740=head2 ensure_connected
741
742Make sure all the storages are connected.
743
744=cut
745
746sub ensure_connected {
64cdad22 747 my $self = shift @_;
748 foreach my $source ($self->all_storages) {
749 $source->ensure_connected(@_);
750 }
2bf79155 751}
752
2bf79155 753=head2 limit_dialect
754
755Set the limit_dialect for all existing storages
756
757=cut
758
759sub limit_dialect {
64cdad22 760 my $self = shift @_;
761 foreach my $source ($self->all_storages) {
762 $source->limit_dialect(@_);
763 }
f3e9f010 764 return $self->master->limit_dialect;
2bf79155 765}
766
2bf79155 767=head2 quote_char
768
769Set the quote_char for all existing storages
770
771=cut
772
773sub quote_char {
64cdad22 774 my $self = shift @_;
775 foreach my $source ($self->all_storages) {
776 $source->quote_char(@_);
777 }
3fbe08e3 778 return $self->master->quote_char;
2bf79155 779}
780
2bf79155 781=head2 name_sep
782
783Set the name_sep for all existing storages
784
785=cut
786
787sub name_sep {
64cdad22 788 my $self = shift @_;
789 foreach my $source ($self->all_storages) {
790 $source->name_sep(@_);
791 }
3fbe08e3 792 return $self->master->name_sep;
2bf79155 793}
794
2bf79155 795=head2 set_schema
796
797Set the schema object for all existing storages
798
799=cut
800
801sub set_schema {
64cdad22 802 my $self = shift @_;
803 foreach my $source ($self->all_storages) {
804 $source->set_schema(@_);
805 }
2bf79155 806}
807
2bf79155 808=head2 debug
809
810set a debug flag across all storages
811
812=cut
813
814sub debug {
64cdad22 815 my $self = shift @_;
3fbe08e3 816 if(@_) {
817 foreach my $source ($self->all_storages) {
818 $source->debug(@_);
6f7344b8 819 }
64cdad22 820 }
3fbe08e3 821 return $self->master->debug;
2bf79155 822}
823
2bf79155 824=head2 debugobj
825
cea43436 826set a debug object
2bf79155 827
828=cut
829
830sub debugobj {
64cdad22 831 my $self = shift @_;
cea43436 832 return $self->master->debugobj(@_);
2bf79155 833}
834
2bf79155 835=head2 debugfh
836
cea43436 837set a debugfh object
2bf79155 838
839=cut
840
841sub debugfh {
64cdad22 842 my $self = shift @_;
cea43436 843 return $self->master->debugfh(@_);
2bf79155 844}
845
2bf79155 846=head2 debugcb
847
cea43436 848set a debug callback
2bf79155 849
850=cut
851
852sub debugcb {
64cdad22 853 my $self = shift @_;
cea43436 854 return $self->master->debugcb(@_);
2bf79155 855}
856
2bf79155 857=head2 disconnect
858
859disconnect everything
860
861=cut
862
863sub disconnect {
64cdad22 864 my $self = shift @_;
865 foreach my $source ($self->all_storages) {
866 $source->disconnect(@_);
867 }
2bf79155 868}
869
b2e4d522 870=head2 cursor_class
871
872set cursor class on all storages, or return master's
873
874=cut
875
876sub cursor_class {
877 my ($self, $cursor_class) = @_;
878
879 if ($cursor_class) {
880 $_->cursor_class($cursor_class) for $self->all_storages;
881 }
882 $self->master->cursor_class;
883}
d4daee7b 884
3244fdcc 885=head2 cursor
886
887set cursor class on all storages, or return master's, alias for L</cursor_class>
888above.
889
890=cut
891
892sub cursor {
893 my ($self, $cursor_class) = @_;
894
895 if ($cursor_class) {
896 $_->cursor($cursor_class) for $self->all_storages;
897 }
898 $self->master->cursor;
899}
900
901=head2 unsafe
902
903sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns
904master's current setting
905
906=cut
907
908sub unsafe {
909 my $self = shift;
910
911 if (@_) {
912 $_->unsafe(@_) for $self->all_storages;
913 }
914
915 return $self->master->unsafe;
916}
917
918=head2 disable_sth_caching
919
920sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages
921or returns master's current setting
922
923=cut
924
925sub disable_sth_caching {
926 my $self = shift;
927
928 if (@_) {
929 $_->disable_sth_caching(@_) for $self->all_storages;
930 }
931
932 return $self->master->disable_sth_caching;
933}
934
935=head2 lag_behind_master
936
937returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master>
938setting
939
940=cut
941
942sub lag_behind_master {
943 my $self = shift;
944
945 return max map $_->lag_behind_master, $self->replicants;
fd323bf1 946}
3244fdcc 947
948=head2 is_replicating
949
950returns true if all replicants return true for
951L<DBIx::Class::Storage::DBI/is_replicating>
952
953=cut
954
955sub is_replicating {
956 my $self = shift;
957
958 return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
959}
960
961=head2 connect_call_datetime_setup
962
963calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages
964
965=cut
966
967sub connect_call_datetime_setup {
968 my $self = shift;
969 $_->connect_call_datetime_setup for $self->all_storages;
970}
971
972sub _populate_dbh {
973 my $self = shift;
974 $_->_populate_dbh for $self->all_storages;
975}
976
977sub _connect {
978 my $self = shift;
979 $_->_connect for $self->all_storages;
980}
981
982sub _rebless {
983 my $self = shift;
984 $_->_rebless for $self->all_storages;
985}
986
987sub _determine_driver {
988 my $self = shift;
989 $_->_determine_driver for $self->all_storages;
990}
991
992sub _driver_determined {
993 my $self = shift;
fd323bf1 994
3244fdcc 995 if (@_) {
996 $_->_driver_determined(@_) for $self->all_storages;
997 }
998
999 return $self->master->_driver_determined;
1000}
1001
1002sub _init {
1003 my $self = shift;
fd323bf1 1004
3244fdcc 1005 $_->_init for $self->all_storages;
1006}
1007
1008sub _run_connection_actions {
1009 my $self = shift;
fd323bf1 1010
3244fdcc 1011 $_->_run_connection_actions for $self->all_storages;
1012}
1013
1014sub _do_connection_actions {
1015 my $self = shift;
fd323bf1 1016
3244fdcc 1017 if (@_) {
1018 $_->_do_connection_actions(@_) for $self->all_storages;
1019 }
1020}
1021
1022sub connect_call_do_sql {
1023 my $self = shift;
1024 $_->connect_call_do_sql(@_) for $self->all_storages;
1025}
1026
1027sub disconnect_call_do_sql {
1028 my $self = shift;
1029 $_->disconnect_call_do_sql(@_) for $self->all_storages;
1030}
1031
1032sub _seems_connected {
1033 my $self = shift;
1034
1035 return min map $_->_seems_connected, $self->all_storages;
1036}
1037
1038sub _ping {
1039 my $self = shift;
1040
1041 return min map $_->_ping, $self->all_storages;
1042}
1043
bbdda281 1044# not using the normalized_version, because we want to preserve
1045# version numbers much longer than the conventional xxx.yyyzzz
7da56142 1046my $numify_ver = sub {
1047 my $ver = shift;
1048 my @numparts = split /\D+/, $ver;
bbdda281 1049 my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
7da56142 1050
1051 return sprintf $format, @numparts;
1052};
fecb38cb 1053sub _server_info {
1054 my $self = shift;
1055
bbdda281 1056 if (not $self->_dbh_details->{info}) {
1057 $self->_dbh_details->{info} = (
fd323bf1 1058 reduce { $a->[0] < $b->[0] ? $a : $b }
7da56142 1059 map [ $numify_ver->($_->{dbms_version}), $_ ],
1060 map $_->_server_info, $self->all_storages
1061 )->[1];
fecb38cb 1062 }
1063
bbdda281 1064 return $self->next::method;
fecb38cb 1065}
1066
1067sub _get_server_version {
1068 my $self = shift;
1069
1070 return $self->_server_info->{dbms_version};
1071}
1072
7e38d850 1073=head1 GOTCHAS
1074
1075Due to the fact that replicants can lag behind a master, you must take care to
1076make sure you use one of the methods to force read queries to a master should
1077you need realtime data integrity. For example, if you insert a row, and then
1078immediately re-read it from the database (say, by doing $row->discard_changes)
1079or you insert a row and then immediately build a query that expects that row
1080to be an item, you should force the master to handle reads. Otherwise, due to
1081the lag, there is no certainty your data will be in the expected state.
1082
1083For data integrity, all transactions automatically use the master storage for
1084all read and write queries. Using a transaction is the preferred and recommended
1085method to force the master to handle all read queries.
1086
1087Otherwise, you can force a single query to use the master with the 'force_pool'
1088attribute:
1089
1090 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1091
1092This attribute will safely be ignore by non replicated storages, so you can use
1093the same code for both types of systems.
1094
1095Lastly, you can use the L</execute_reliably> method, which works very much like
1096a transaction.
1097
1098For debugging, you can turn replication on/off with the methods L</set_reliable_storage>
1099and L</set_balanced_storage>, however this operates at a global level and is not
1100suitable if you have a shared Schema object being used by multiple processes,
1101such as on a web application server. You can get around this limitation by
1102using the Schema clone method.
1103
1104 my $new_schema = $schema->clone;
1105 $new_schema->set_reliable_storage;
d4daee7b 1106
7e38d850 1107 ## $new_schema will use only the Master storage for all reads/writes while
1108 ## the $schema object will use replicated storage.
1109
f5d3a5de 1110=head1 AUTHOR
1111
0bbe6676 1112 John Napiorkowski <jjnapiork@cpan.org>
f5d3a5de 1113
c4d3fae2 1114Based on code originated by:
f5d3a5de 1115
64cdad22 1116 Norbert Csongrádi <bert@cpan.org>
1117 Peter Siklósi <einon@einon.hu>
2156bbdd 1118
f5d3a5de 1119=head1 LICENSE
1120
1121You may distribute this code under the same terms as Perl itself.
1122
1123=cut
1124
11251;