From: Peter Rabbitson Date: Fri, 9 Dec 2011 16:35:53 +0000 (+0100) Subject: Rewrite txn_do and dbh_do to use a (hidden for now) blockrunner X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=9345b14c6c86aa8888bf5d47a569ee9bbde24f47;p=dbsrgits%2FDBIx-Class-Historic.git Rewrite txn_do and dbh_do to use a (hidden for now) blockrunner --- diff --git a/Makefile.PL b/Makefile.PL index 28d5dac..f2fff8b 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -58,6 +58,7 @@ my $runtime_requires = { 'Data::Dumper::Concise' => '2.020', 'Data::Page' => '2.00', 'Hash::Merge' => '0.12', + 'Moo' => '0.009013', 'MRO::Compat' => '0.09', 'Module::Find' => '0.06', 'namespace::clean' => '0.20', @@ -89,6 +90,17 @@ my $test_requires = { 'Package::Stash' => '0.28', }; +# make strictures.pm happy (DO NOT LIKE, TOO MUCH XS!) +# (i.e. what if the .git/.svn is *not* because of DBIC?) +# +# Note - this is added as test_requires *directly*, so it gets properly +# excluded on META.yml cleansing +if (-e '.git' or -e '.svn') { + test_requires 'indirect' => '0.25'; + test_requires 'multidimensional' => '0.008'; + test_requires 'bareword::filehandles' => '0.003'; +} + # Bail out on parallel testing if ( ($ENV{HARNESS_OPTIONS}||'') =~ / (?: ^ | \: ) j(\d+) /x diff --git a/lib/DBIx/Class.pm b/lib/DBIx/Class.pm index caa8a49..7a8c313 100644 --- a/lib/DBIx/Class.pm +++ b/lib/DBIx/Class.pm @@ -71,7 +71,7 @@ use base qw/DBIx::Class::Componentised DBIx::Class::AccessorGroup/; use DBIx::Class::StartupCheck; __PACKAGE__->mk_group_accessors(inherited => '_skip_namespace_frames'); -__PACKAGE__->_skip_namespace_frames('^DBIx::Class|^SQL::Abstract|^Try::Tiny|^Class::Accessor::Grouped$'); +__PACKAGE__->_skip_namespace_frames('^DBIx::Class|^SQL::Abstract|^Try::Tiny|^Class::Accessor::Grouped|^Context::Preserve'); sub mk_classdata { shift->mk_classaccessor(@_); diff --git a/lib/DBIx/Class/Storage.pm b/lib/DBIx/Class/Storage.pm index 0e162cf..a3ae532 100644 --- a/lib/DBIx/Class/Storage.pm +++ b/lib/DBIx/Class/Storage.pm @@ -13,6 +13,7 @@ use mro 'c3'; } use DBIx::Class::Carp; +use DBIx::Class::Storage::BlockRunner; use Scalar::Util qw/blessed weaken/; use DBIx::Class::Storage::TxnScopeGuard; use Try::Tiny; @@ -176,86 +177,13 @@ sub txn_do { my $self = shift; my $coderef = shift; - ref $coderef eq 'CODE' or $self->throw_exception - ('$coderef must be a CODE reference'); - - my $abort_txn = sub { - my ($self, $exception) = @_; - - my $rollback_exception = try { $self->txn_rollback; undef } catch { shift }; - - if ( $rollback_exception and ( - ! defined blessed $rollback_exception - or - ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION') - ) ) { - $self->throw_exception( - "Transaction aborted: ${exception}. " - . "Rollback failed: ${rollback_exception}" - ); - } - $self->throw_exception($exception); - }; - - # take a ref instead of a copy, to preserve coderef @_ aliasing semantics - my $args = \@_; - - # do not turn on until a succesful txn_begin - my $attempt_commit = 0; - - my $txn_init_depth = $self->transaction_depth; - - try { - $self->txn_begin; - $attempt_commit = 1; - $coderef->(@$args) - } - catch { - $attempt_commit = 0; - - # init depth of > 0 implies nesting or non-autocommit (either way no retry) - if($txn_init_depth or $self->connected ) { - $abort_txn->($self, $_); - } - else { - carp "Retrying txn_do($coderef) after catching disconnected exception: $_" - if $ENV{DBIC_STORAGE_RETRY_DEBUG}; - - $self->_populate_dbh; - - # if txn_depth is > 1 this means something was done to the - # original $dbh, otherwise we would not get past the if() above - $self->throw_exception(sprintf - 'Unexpected transaction depth of %d on freshly connected handle', - $self->transaction_depth, - ) if $self->transaction_depth; - - $self->txn_begin; - $attempt_commit = 1; - - try { - $coderef->(@$args) - } - catch { - $attempt_commit = 0; - $abort_txn->($self, $_) - }; - }; - } - finally { - if ($attempt_commit) { - my $delta_txn = (1 + $txn_init_depth) - $self->transaction_depth; - - if ($delta_txn) { - # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests) - carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef, skipping txn_do's commit" - unless $delta_txn == 1 and $self->transaction_depth == 0; - } - else { - $self->txn_commit; - } - } - }; + DBIx::Class::Storage::BlockRunner->new( + storage => $self, + run_code => $coderef, + run_args => \@_, # take a ref instead of a copy, to preserve coderef @_ aliasing semantics + wrap_txn => 1, + retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) }, + )->run; } =head2 txn_begin diff --git a/lib/DBIx/Class/Storage/BlockRunner.pm b/lib/DBIx/Class/Storage/BlockRunner.pm new file mode 100644 index 0000000..fe2d221 --- /dev/null +++ b/lib/DBIx/Class/Storage/BlockRunner.pm @@ -0,0 +1,232 @@ +package # hide from pause until we figure it all out + DBIx::Class::Storage::BlockRunner; + +use Sub::Quote 'quote_sub'; +use DBIx::Class::Exception; +use DBIx::Class::Carp; +use Context::Preserve 'preserve_context'; +use Scalar::Util qw/weaken blessed/; +use Try::Tiny; +use Moo; +use namespace::clean; + +=head1 NAME + +DBIx::Class::Storage::BlockRunner - Try running a block of code until success with a configurable retry logic + +=head1 DESCRIPTION + +=head1 METHODS + +=cut + +has storage => ( + is => 'ro', + required => 1, +); + +has wrap_txn => ( + is => 'ro', + required => 1, +); + +# true - retry, false - rethrow, or you can throw your own (not catching) +has retry_handler => ( + is => 'ro', + required => 1, + isa => quote_sub( q| + (ref $_[0]) eq 'CODE' + or DBIx::Class::Exception->throw('retry_handler must be a CODE reference') + |), +); + +has run_code => ( + is => 'ro', + required => 1, + isa => quote_sub( q| + (ref $_[0]) eq 'CODE' + or DBIx::Class::Exception->throw('run_code must be a CODE reference') + |), +); + +has run_args => ( + is => 'ro', + isa => quote_sub( q| + (ref $_[0]) eq 'ARRAY' + or DBIx::Class::Exception->throw('run_args must be an ARRAY reference') + |), + default => quote_sub( '[]' ), +); + +has retry_debug => ( + is => 'rw', + default => quote_sub( '$ENV{DBIC_STORAGE_RETRY_DEBUG}' ), +); + +has max_retried_count => ( + is => 'ro', + default => quote_sub( '20' ), +); + +has retried_count => ( + is => 'ro', + init_arg => undef, + writer => '_set_retried_count', + clearer => '_reset_retried_count', + default => quote_sub(q{ 0 }), + lazy => 1, + trigger => quote_sub(q{ + DBIx::Class::Exception->throw(sprintf ( + 'Exceeded max_retried_count amount of %d, latest exception: %s', + $_[0]->max_retried_count, $_[0]->last_exception + )) if $_[0]->max_retried_count < ($_[1]||0); + }), +); + +has exception_stack => ( + is => 'ro', + init_arg => undef, + clearer => '_reset_exception_stack', + default => quote_sub(q{ [] }), + lazy => 1, +); + +sub last_exception { shift->exception_stack->[-1] } + +sub run { + my $self = shift; + + DBIx::Class::Exception->throw('run() takes no arguments') if @_; + + $self->_reset_exception_stack; + $self->_reset_retried_count; + my $storage = $self->storage; + + return $self->run_code->( @{$self->run_args} ) + if (! $self->wrap_txn and $storage->{_in_do_block}); + + local $storage->{_in_do_block} = 1 unless $storage->{_in_do_block}; + + return $self->_run; +} + +# this is the actual recursing worker +sub _run { + # warnings here mean I did not anticipate some ueber-complex case + # fatal warnings are not warranted + no warnings; + use warnings; + + my $self = shift; + + # from this point on (defined $txn_init_depth) is an indicator for wrap_txn + # save a bit on method calls + my $txn_init_depth = $self->wrap_txn ? $self->storage->transaction_depth : undef; + my $txn_begin_ok; + + my $run_err = ''; + + weaken (my $weakself = $self); + + return preserve_context { + try { + if (defined $txn_init_depth) { + $weakself->storage->txn_begin; + $txn_begin_ok = 1; + } + $weakself->run_code->( @{$weakself->run_args} ); + } catch { + $run_err = $_; + (); # important, affects @_ below + }; + } replace => sub { + my @res = @_; + + my $storage = $weakself->storage; + my $cur_depth = $storage->transaction_depth; + + if (defined $txn_init_depth and $run_err eq '') { + my $delta_txn = (1 + $txn_init_depth) - $cur_depth; + + if ($delta_txn) { + # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests) + carp (sprintf + 'Unexpected reduction of transaction depth by %d after execution of ' + . '%s, skipping txn_commit()', + $delta_txn, + $weakself->run_code, + ) unless $delta_txn == 1 and $cur_depth == 0; + } + else { + $run_err = eval { $storage->txn_commit; 1 } ? '' : $@; + } + } + + # something above threw an error (could be the begin, the code or the commit) + if ($run_err ne '') { + + # attempt a rollback if we did begin in the first place + if ($txn_begin_ok) { + # some DBDs go crazy if there is nothing to roll back on, perform a soft-check + my $rollback_exception = $storage->_seems_connected + ? (! eval { $storage->txn_rollback; 1 }) ? $@ : '' + : 'lost connection to storage' + ; + + if ( $rollback_exception and ( + ! defined blessed $rollback_exception + or + ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION') + ) ) { + $run_err = "Transaction aborted: $run_err. Rollback failed: $rollback_exception"; + } + } + + push @{ $weakself->exception_stack }, $run_err; + + # init depth of > 0 ( > 1 with AC) implies nesting - no retry attempt queries + $storage->throw_exception($run_err) if ( + ( + defined $txn_init_depth + and + # FIXME - we assume that $storage->{_dbh_autocommit} is there if + # txn_init_depth is there, but this is a DBI-ism + $txn_init_depth > ( $storage->{_dbh_autocommit} ? 0 : 1 ) + ) or ! $weakself->retry_handler->($weakself) + ); + + $weakself->_set_retried_count($weakself->retried_count + 1); + + # we got that far - let's retry + carp( sprintf 'Retrying %s (run %d) after caught exception: %s', + $weakself->run_code, + $weakself->retried_count + 1, + $run_err, + ) if $weakself->retry_debug; + + $storage->ensure_connected; + # if txn_depth is > 1 this means something was done to the + # original $dbh, otherwise we would not get past the preceeding if() + $storage->throw_exception(sprintf + 'Unexpected transaction depth of %d on freshly connected handle', + $storage->transaction_depth, + ) if (defined $txn_init_depth and $storage->transaction_depth); + + return $weakself->_run; + } + + return wantarray ? @res : $res[0]; + }; +} + +=head1 AUTHORS + +see L + +=head1 LICENSE + +You may distribute this code under the same terms as Perl itself. + +=cut + +1; diff --git a/lib/DBIx/Class/Storage/DBI.pm b/lib/DBIx/Class/Storage/DBI.pm index 7041e9d..b7e969f 100644 --- a/lib/DBIx/Class/Storage/DBI.pm +++ b/lib/DBIx/Class/Storage/DBI.pm @@ -779,37 +779,28 @@ Example: sub dbh_do { my $self = shift; - my $code = shift; + my $run_target = shift; - my $dbh = $self->_get_dbh; - - return $self->$code($dbh, @_) - if ( $self->{_in_do_block} || $self->{transaction_depth} ); - - local $self->{_in_do_block} = 1; + # short circuit when we know there is no need for a runner + # + # FIXME - asumption may be wrong + # the rationale for the txn_depth check is that if this block is a part + # of a larger transaction, everything up to that point is screwed anyway + return $self->$run_target($self->_get_dbh, @_) + if $self->{_in_do_block} or $self->transaction_depth; - # take a ref instead of a copy, to preserve coderef @_ aliasing semantics my $args = \@_; - try { - $self->$code ($dbh, @$args); - } catch { - $self->throw_exception($_) if $self->connected; - - # We were not connected - reconnect and retry, but let any - # exception fall right through this time - carp "Retrying dbh_do($code) after catching disconnected exception: $_" - if $ENV{DBIC_STORAGE_RETRY_DEBUG}; - - $self->_populate_dbh; - $self->$code($self->_dbh, @$args); - }; + DBIx::Class::Storage::BlockRunner->new( + storage => $self, + run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) }, + wrap_txn => 0, + retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) }, + )->run; } sub txn_do { - # connects or reconnects on pid change, necessary to grab correct txn_depth - $_[0]->_get_dbh; - local $_[0]->{_in_do_block} = 1; + $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth shift->next::method(@_); } diff --git a/t/52leaks.t b/t/52leaks.t index aefd001..d76fa38 100644 --- a/t/52leaks.t +++ b/t/52leaks.t @@ -140,7 +140,7 @@ my @compose_ns_classes; # txn_do to invoke more codepaths my ($mc_row_obj, $pager, $pager_explicit_count) = $schema->txn_do (sub { - my $artist = $rs->create ({ + my $artist = $schema->resultset('Artist')->create ({ name => 'foo artist', cds => [{ title => 'foo cd', @@ -160,7 +160,24 @@ my @compose_ns_classes; return ($artist, $pg, $pg_wcount); }); - # same for dbh_do + # more codepaths - error handling in txn_do + { + eval { $schema->txn_do ( sub { + $storage->_dbh->begin_work; + fail ('how did we get so far?!'); + } ) }; + + eval { $schema->txn_do ( sub { + $schema->txn_do ( sub { + die "It's called EXCEPTION"; + fail ('how did we get so far?!'); + } ); + fail ('how did we get so far?!'); + } ) }; + like( $@, qr/It\'s called EXCEPTION/, 'Exception correctly propagated in nested txn_do' ); + } + + # dbh_do codepath my ($rs_bind_circref, $cond_rowobj) = $schema->storage->dbh_do ( sub { my $row = $_[0]->schema->resultset('Artist')->new({}); my $rs = $_[0]->schema->resultset('Artist')->search({ @@ -350,6 +367,10 @@ for my $slot (keys %$weak_registry) { # T::B 2.0 has result objects and other fancyness delete $weak_registry->{$slot}; } + elsif ($slot =~ /^Method::Generate::(?:Accessor|Constructor)/) { + # Moo keeps globals around, this is normal + delete $weak_registry->{$slot}; + } elsif ($slot =~ /^SQL::Translator/) { # SQLT is a piece of shit, leaks all over delete $weak_registry->{$slot}; diff --git a/t/53lean_startup.t b/t/53lean_startup.t index 7e81af2..8af340a 100644 --- a/t/53lean_startup.t +++ b/t/53lean_startup.t @@ -41,6 +41,7 @@ BEGIN { namespace::clean Try::Tiny + Context::Preserve Sub::Name Scalar::Util @@ -55,6 +56,8 @@ BEGIN { Class::Accessor::Grouped Class::C3::Componentised + Moo + Sub::Quote /, $] < 5.010 ? ( 'Class::C3', 'MRO::Compat' ) : () }; # this is special-cased in DBIx/Class.pm $test_hook = sub { diff --git a/t/55namespaces_cleaned.t b/t/55namespaces_cleaned.t index 6706966..c8a2f75 100644 --- a/t/55namespaces_cleaned.t +++ b/t/55namespaces_cleaned.t @@ -76,6 +76,9 @@ my $skip_idx = { map { $_ => 1 } ( # this subclass is expected to inherit whatever crap comes # from the parent 'DBIx::Class::ResultSet::Pager', + + # Moo does not name its generated methods, fix pending + 'DBIx::Class::Storage::BlockRunner', ) }; my $has_cmop = eval { require Class::MOP }; @@ -115,7 +118,11 @@ for my $mod (@modules) { my $origin = $gv->STASH->NAME; TODO: { - local $TODO = 'CAG does not clean its BEGIN constants' if $name =~ /^__CAG_/; + local $TODO; + if ($name =~ /^__CAG_/) { + $TODO = 'CAG does not clean its BEGIN constants'; + } + is ($gv->NAME, $name, "Properly named $name method at $origin" . ($origin eq $mod ? '' : " (inherited by $mod)" diff --git a/t/storage/reconnect.t b/t/storage/reconnect.t index eca17cf..7416486 100644 --- a/t/storage/reconnect.t +++ b/t/storage/reconnect.t @@ -7,8 +7,6 @@ use Test::More; use lib qw(t/lib); use DBICTest; -plan tests => 6; - my $db_orig = "$FindBin::Bin/../var/DBIxClass.db"; my $db_tmp = "$db_orig.tmp"; @@ -71,3 +69,44 @@ SKIP: { ok( !$@, 'The operation succeeded' ); cmp_ok( @art_four, '==', 3, "Three artists returned" ); } + +# check that reconnection contexts are preserved in txn_do / dbh_do + +my $args = [1, 2, 3]; + +my $ctx_map = { + VOID => { + invoke => sub { shift->(); 1 }, + wa => undef, + }, + SCALAR => { + invoke => sub { my $foo = shift->() }, + wa => '', + }, + LIST => { + invoke => sub { my @foo = shift->() }, + wa => 1, + }, +}; + +for my $ctx (keys $ctx_map) { + + # start disconnected and then connected + $schema->storage->disconnect; + for (1, 2) { + my $disarmed; + + $ctx_map->{$ctx}{invoke}->(sub { $schema->txn_do(sub { + is_deeply (\@_, $args, 'Args propagated correctly' ); + + is (wantarray(), $ctx_map->{$ctx}{wa}, "Correct $ctx context"); + + # this will cause a retry + $schema->storage->_dbh->disconnect unless $disarmed++; + + isa_ok ($schema->resultset('Artist')->next, 'DBICTest::Artist'); + }, @$args) }); + } +}; + +done_testing; diff --git a/xt/podcoverage.t b/xt/podcoverage.t index 0e07ece..17bb7ed 100644 --- a/xt/podcoverage.t +++ b/xt/podcoverage.t @@ -125,6 +125,7 @@ my $exceptions = { 'DBIx::Class::ResultSource::*' => { skip => 1 }, 'DBIx::Class::Storage::Statistics' => { skip => 1 }, 'DBIx::Class::Storage::DBI::Replicated::Types' => { skip => 1 }, + 'DBIx::Class::Storage::BlockRunner' => { skip => 1 }, # temporary # test some specific components whose parents are exempt below 'DBIx::Class::Relationship::Base' => {},