Rewrite txn_do and dbh_do to use a (hidden for now) blockrunner
Peter Rabbitson [Fri, 9 Dec 2011 16:35:53 +0000 (17:35 +0100)]
Makefile.PL
lib/DBIx/Class.pm
lib/DBIx/Class/Storage.pm
lib/DBIx/Class/Storage/BlockRunner.pm [new file with mode: 0644]
lib/DBIx/Class/Storage/DBI.pm
t/52leaks.t
t/53lean_startup.t
t/55namespaces_cleaned.t
t/storage/reconnect.t
xt/podcoverage.t

index 28d5dac..f2fff8b 100644 (file)
@@ -58,6 +58,7 @@ my $runtime_requires = {
   'Data::Dumper::Concise'    => '2.020',
   'Data::Page'               => '2.00',
   'Hash::Merge'              => '0.12',
+  'Moo'                      => '0.009013',
   'MRO::Compat'              => '0.09',
   'Module::Find'             => '0.06',
   'namespace::clean'         => '0.20',
@@ -89,6 +90,17 @@ my $test_requires = {
   'Package::Stash'           => '0.28',
 };
 
+# make strictures.pm happy (DO NOT LIKE, TOO MUCH XS!)
+# (i.e. what if the .git/.svn is *not* because of DBIC?)
+#
+# Note - this is added as test_requires *directly*, so it gets properly
+# excluded on META.yml cleansing
+if (-e '.git' or -e '.svn') {
+  test_requires 'indirect'              => '0.25';
+  test_requires 'multidimensional'      => '0.008';
+  test_requires 'bareword::filehandles' => '0.003';
+}
+
 # Bail out on parallel testing
 if (
   ($ENV{HARNESS_OPTIONS}||'') =~ / (?: ^ | \: ) j(\d+) /x
index caa8a49..7a8c313 100644 (file)
@@ -71,7 +71,7 @@ use base qw/DBIx::Class::Componentised DBIx::Class::AccessorGroup/;
 use DBIx::Class::StartupCheck;
 
 __PACKAGE__->mk_group_accessors(inherited => '_skip_namespace_frames');
-__PACKAGE__->_skip_namespace_frames('^DBIx::Class|^SQL::Abstract|^Try::Tiny|^Class::Accessor::Grouped$');
+__PACKAGE__->_skip_namespace_frames('^DBIx::Class|^SQL::Abstract|^Try::Tiny|^Class::Accessor::Grouped|^Context::Preserve');
 
 sub mk_classdata {
   shift->mk_classaccessor(@_);
index 0e162cf..a3ae532 100644 (file)
@@ -13,6 +13,7 @@ use mro 'c3';
 }
 
 use DBIx::Class::Carp;
+use DBIx::Class::Storage::BlockRunner;
 use Scalar::Util qw/blessed weaken/;
 use DBIx::Class::Storage::TxnScopeGuard;
 use Try::Tiny;
@@ -176,86 +177,13 @@ sub txn_do {
   my $self = shift;
   my $coderef = shift;
 
-  ref $coderef eq 'CODE' or $self->throw_exception
-    ('$coderef must be a CODE reference');
-
-  my $abort_txn = sub {
-    my ($self, $exception) = @_;
-
-    my $rollback_exception = try { $self->txn_rollback; undef } catch { shift };
-
-    if ( $rollback_exception and (
-      ! defined blessed $rollback_exception
-          or
-      ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION')
-    ) ) {
-      $self->throw_exception(
-        "Transaction aborted: ${exception}. "
-        . "Rollback failed: ${rollback_exception}"
-      );
-    }
-    $self->throw_exception($exception);
-  };
-
-  # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
-  my $args = \@_;
-
-  # do not turn on until a succesful txn_begin
-  my $attempt_commit = 0;
-
-  my $txn_init_depth = $self->transaction_depth;
-
-  try {
-    $self->txn_begin;
-    $attempt_commit = 1;
-    $coderef->(@$args)
-  }
-  catch {
-    $attempt_commit = 0;
-
-    # init depth of > 0 implies nesting or non-autocommit (either way no retry)
-    if($txn_init_depth or $self->connected ) {
-      $abort_txn->($self, $_);
-    }
-    else {
-      carp "Retrying txn_do($coderef) after catching disconnected exception: $_"
-        if $ENV{DBIC_STORAGE_RETRY_DEBUG};
-
-      $self->_populate_dbh;
-
-      # if txn_depth is > 1 this means something was done to the
-      # original $dbh, otherwise we would not get past the if() above
-      $self->throw_exception(sprintf
-        'Unexpected transaction depth of %d on freshly connected handle',
-        $self->transaction_depth,
-      ) if $self->transaction_depth;
-
-      $self->txn_begin;
-      $attempt_commit = 1;
-
-      try {
-        $coderef->(@$args)
-      }
-      catch {
-        $attempt_commit = 0;
-        $abort_txn->($self, $_)
-      };
-    };
-  }
-  finally {
-    if ($attempt_commit) {
-      my $delta_txn = (1 + $txn_init_depth) - $self->transaction_depth;
-
-      if ($delta_txn) {
-        # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests)
-        carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef, skipping txn_do's commit"
-          unless $delta_txn == 1 and $self->transaction_depth == 0;
-      }
-      else {
-        $self->txn_commit;
-      }
-    }
-  };
+  DBIx::Class::Storage::BlockRunner->new(
+    storage => $self,
+    run_code => $coderef,
+    run_args => \@_, # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
+    wrap_txn => 1,
+    retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
+  )->run;
 }
 
 =head2 txn_begin
diff --git a/lib/DBIx/Class/Storage/BlockRunner.pm b/lib/DBIx/Class/Storage/BlockRunner.pm
new file mode 100644 (file)
index 0000000..fe2d221
--- /dev/null
@@ -0,0 +1,232 @@
+package # hide from pause until we figure it all out
+  DBIx::Class::Storage::BlockRunner;
+
+use Sub::Quote 'quote_sub';
+use DBIx::Class::Exception;
+use DBIx::Class::Carp;
+use Context::Preserve 'preserve_context';
+use Scalar::Util qw/weaken blessed/;
+use Try::Tiny;
+use Moo;
+use namespace::clean;
+
+=head1 NAME
+
+DBIx::Class::Storage::BlockRunner - Try running a block of code until success with a configurable retry logic
+
+=head1 DESCRIPTION
+
+=head1 METHODS
+
+=cut
+
+has storage => (
+  is => 'ro',
+  required => 1,
+);
+
+has wrap_txn => (
+  is => 'ro',
+  required => 1,
+);
+
+# true - retry, false - rethrow, or you can throw your own (not catching)
+has retry_handler => (
+  is => 'ro',
+  required => 1,
+  isa => quote_sub( q|
+    (ref $_[0]) eq 'CODE'
+      or DBIx::Class::Exception->throw('retry_handler must be a CODE reference')
+  |),
+);
+
+has run_code => (
+  is => 'ro',
+  required => 1,
+  isa => quote_sub( q|
+    (ref $_[0]) eq 'CODE'
+      or DBIx::Class::Exception->throw('run_code must be a CODE reference')
+  |),
+);
+
+has run_args => (
+  is => 'ro',
+  isa => quote_sub( q|
+    (ref $_[0]) eq 'ARRAY'
+      or DBIx::Class::Exception->throw('run_args must be an ARRAY reference')
+  |),
+  default => quote_sub( '[]' ),
+);
+
+has retry_debug => (
+  is => 'rw',
+  default => quote_sub( '$ENV{DBIC_STORAGE_RETRY_DEBUG}' ),
+);
+
+has max_retried_count => (
+  is => 'ro',
+  default => quote_sub( '20' ),
+);
+
+has retried_count => (
+  is => 'ro',
+  init_arg => undef,
+  writer => '_set_retried_count',
+  clearer => '_reset_retried_count',
+  default => quote_sub(q{ 0 }),
+  lazy => 1,
+  trigger => quote_sub(q{
+    DBIx::Class::Exception->throw(sprintf (
+      'Exceeded max_retried_count amount of %d, latest exception: %s',
+      $_[0]->max_retried_count, $_[0]->last_exception
+    )) if $_[0]->max_retried_count < ($_[1]||0);
+  }),
+);
+
+has exception_stack => (
+  is => 'ro',
+  init_arg => undef,
+  clearer => '_reset_exception_stack',
+  default => quote_sub(q{ [] }),
+  lazy => 1,
+);
+
+sub last_exception { shift->exception_stack->[-1] }
+
+sub run {
+  my $self = shift;
+
+  DBIx::Class::Exception->throw('run() takes no arguments') if @_;
+
+  $self->_reset_exception_stack;
+  $self->_reset_retried_count;
+  my $storage = $self->storage;
+
+  return $self->run_code->( @{$self->run_args} )
+    if (! $self->wrap_txn and $storage->{_in_do_block});
+
+  local $storage->{_in_do_block} = 1 unless $storage->{_in_do_block};
+
+  return $self->_run;
+}
+
+# this is the actual recursing worker
+sub _run {
+  # warnings here mean I did not anticipate some ueber-complex case
+  # fatal warnings are not warranted
+  no warnings;
+  use warnings;
+
+  my $self = shift;
+
+  # from this point on (defined $txn_init_depth) is an indicator for wrap_txn
+  # save a bit on method calls
+  my $txn_init_depth = $self->wrap_txn ? $self->storage->transaction_depth : undef;
+  my $txn_begin_ok;
+
+  my $run_err = '';
+
+  weaken (my $weakself = $self);
+
+  return preserve_context {
+    try {
+      if (defined $txn_init_depth) {
+        $weakself->storage->txn_begin;
+        $txn_begin_ok = 1;
+      }
+      $weakself->run_code->( @{$weakself->run_args} );
+    } catch {
+      $run_err = $_;
+      (); # important, affects @_ below
+    };
+  } replace => sub {
+    my @res = @_;
+
+    my $storage = $weakself->storage;
+    my $cur_depth = $storage->transaction_depth;
+
+    if (defined $txn_init_depth and $run_err eq '') {
+      my $delta_txn = (1 + $txn_init_depth) - $cur_depth;
+
+      if ($delta_txn) {
+        # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests)
+        carp (sprintf
+          'Unexpected reduction of transaction depth by %d after execution of '
+        . '%s, skipping txn_commit()',
+          $delta_txn,
+          $weakself->run_code,
+        ) unless $delta_txn == 1 and $cur_depth == 0;
+      }
+      else {
+        $run_err = eval { $storage->txn_commit; 1 } ? '' : $@;
+      }
+    }
+
+    # something above threw an error (could be the begin, the code or the commit)
+    if ($run_err ne '') {
+
+      # attempt a rollback if we did begin in the first place
+      if ($txn_begin_ok) {
+        # some DBDs go crazy if there is nothing to roll back on, perform a soft-check
+        my $rollback_exception = $storage->_seems_connected
+          ? (! eval { $storage->txn_rollback; 1 }) ? $@ : ''
+          : 'lost connection to storage'
+        ;
+
+        if ( $rollback_exception and (
+          ! defined blessed $rollback_exception
+            or
+          ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION')
+        ) ) {
+          $run_err = "Transaction aborted: $run_err. Rollback failed: $rollback_exception";
+        }
+      }
+
+      push @{ $weakself->exception_stack }, $run_err;
+
+      # init depth of > 0 ( > 1 with AC) implies nesting - no retry attempt queries
+      $storage->throw_exception($run_err) if (
+        (
+          defined $txn_init_depth
+            and
+          # FIXME - we assume that $storage->{_dbh_autocommit} is there if
+          # txn_init_depth is there, but this is a DBI-ism
+          $txn_init_depth > ( $storage->{_dbh_autocommit} ? 0 : 1 )
+        ) or ! $weakself->retry_handler->($weakself)
+      );
+
+      $weakself->_set_retried_count($weakself->retried_count + 1);
+
+      # we got that far - let's retry
+      carp( sprintf 'Retrying %s (run %d) after caught exception: %s',
+        $weakself->run_code,
+        $weakself->retried_count + 1,
+        $run_err,
+      ) if $weakself->retry_debug;
+
+      $storage->ensure_connected;
+      # if txn_depth is > 1 this means something was done to the
+      # original $dbh, otherwise we would not get past the preceeding if()
+      $storage->throw_exception(sprintf
+        'Unexpected transaction depth of %d on freshly connected handle',
+        $storage->transaction_depth,
+      ) if (defined $txn_init_depth and $storage->transaction_depth);
+
+      return $weakself->_run;
+    }
+
+    return wantarray ? @res : $res[0];
+  };
+}
+
+=head1 AUTHORS
+
+see L<DBIx::Class>
+
+=head1 LICENSE
+
+You may distribute this code under the same terms as Perl itself.
+
+=cut
+
+1;
index 7041e9d..b7e969f 100644 (file)
@@ -779,37 +779,28 @@ Example:
 
 sub dbh_do {
   my $self = shift;
-  my $code = shift;
+  my $run_target = shift;
 
-  my $dbh = $self->_get_dbh;
-
-  return $self->$code($dbh, @_)
-    if ( $self->{_in_do_block} || $self->{transaction_depth} );
-
-  local $self->{_in_do_block} = 1;
+  # short circuit when we know there is no need for a runner
+  #
+  # FIXME - asumption may be wrong
+  # the rationale for the txn_depth check is that if this block is a part
+  # of a larger transaction, everything up to that point is screwed anyway
+  return $self->$run_target($self->_get_dbh, @_)
+    if $self->{_in_do_block} or $self->transaction_depth;
 
-  # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
   my $args = \@_;
 
-  try {
-    $self->$code ($dbh, @$args);
-  } catch {
-    $self->throw_exception($_) if $self->connected;
-
-    # We were not connected - reconnect and retry, but let any
-    #  exception fall right through this time
-    carp "Retrying dbh_do($code) after catching disconnected exception: $_"
-      if $ENV{DBIC_STORAGE_RETRY_DEBUG};
-
-    $self->_populate_dbh;
-    $self->$code($self->_dbh, @$args);
-  };
+  DBIx::Class::Storage::BlockRunner->new(
+    storage => $self,
+    run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
+    wrap_txn => 0,
+    retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
+  )->run;
 }
 
 sub txn_do {
-  # connects or reconnects on pid change, necessary to grab correct txn_depth
-  $_[0]->_get_dbh;
-  local $_[0]->{_in_do_block} = 1;
+  $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
   shift->next::method(@_);
 }
 
index aefd001..d76fa38 100644 (file)
@@ -140,7 +140,7 @@ my @compose_ns_classes;
   # txn_do to invoke more codepaths
   my ($mc_row_obj, $pager, $pager_explicit_count) = $schema->txn_do (sub {
 
-    my $artist = $rs->create ({
+    my $artist = $schema->resultset('Artist')->create ({
       name => 'foo artist',
       cds => [{
         title => 'foo cd',
@@ -160,7 +160,24 @@ my @compose_ns_classes;
     return ($artist, $pg, $pg_wcount);
   });
 
-  # same for dbh_do
+  # more codepaths - error handling in txn_do
+  {
+    eval { $schema->txn_do ( sub {
+      $storage->_dbh->begin_work;
+      fail ('how did we get so far?!');
+    } ) };
+
+    eval { $schema->txn_do ( sub {
+      $schema->txn_do ( sub {
+        die "It's called EXCEPTION";
+        fail ('how did we get so far?!');
+      } );
+      fail ('how did we get so far?!');
+    } ) };
+    like( $@, qr/It\'s called EXCEPTION/, 'Exception correctly propagated in nested txn_do' );
+  }
+
+  # dbh_do codepath
   my ($rs_bind_circref, $cond_rowobj) = $schema->storage->dbh_do ( sub {
     my $row = $_[0]->schema->resultset('Artist')->new({});
     my $rs = $_[0]->schema->resultset('Artist')->search({
@@ -350,6 +367,10 @@ for my $slot (keys %$weak_registry) {
     # T::B 2.0 has result objects and other fancyness
     delete $weak_registry->{$slot};
   }
+  elsif ($slot =~ /^Method::Generate::(?:Accessor|Constructor)/) {
+    # Moo keeps globals around, this is normal
+    delete $weak_registry->{$slot};
+  }
   elsif ($slot =~ /^SQL::Translator/) {
     # SQLT is a piece of shit, leaks all over
     delete $weak_registry->{$slot};
index 7e81af2..8af340a 100644 (file)
@@ -41,6 +41,7 @@ BEGIN {
 
     namespace::clean
     Try::Tiny
+    Context::Preserve
     Sub::Name
 
     Scalar::Util
@@ -55,6 +56,8 @@ BEGIN {
 
     Class::Accessor::Grouped
     Class::C3::Componentised
+    Moo
+    Sub::Quote
   /, $] < 5.010 ? ( 'Class::C3', 'MRO::Compat' ) : () }; # this is special-cased in DBIx/Class.pm
 
   $test_hook = sub {
index 6706966..c8a2f75 100644 (file)
@@ -76,6 +76,9 @@ my $skip_idx = { map { $_ => 1 } (
   # this subclass is expected to inherit whatever crap comes
   # from the parent
   'DBIx::Class::ResultSet::Pager',
+
+  # Moo does not name its generated methods, fix pending
+  'DBIx::Class::Storage::BlockRunner',
 ) };
 
 my $has_cmop = eval { require Class::MOP };
@@ -115,7 +118,11 @@ for my $mod (@modules) {
       my $origin = $gv->STASH->NAME;
 
       TODO: {
-        local $TODO = 'CAG does not clean its BEGIN constants' if $name =~ /^__CAG_/;
+        local $TODO;
+        if ($name =~ /^__CAG_/) {
+          $TODO = 'CAG does not clean its BEGIN constants';
+        }
+
         is ($gv->NAME, $name, "Properly named $name method at $origin" . ($origin eq $mod
           ? ''
           : " (inherited by $mod)"
index eca17cf..7416486 100644 (file)
@@ -7,8 +7,6 @@ use Test::More;
 use lib qw(t/lib);
 use DBICTest;
 
-plan tests => 6;
-
 my $db_orig = "$FindBin::Bin/../var/DBIxClass.db";
 my $db_tmp  = "$db_orig.tmp";
 
@@ -71,3 +69,44 @@ SKIP: {
     ok( !$@, 'The operation succeeded' );
     cmp_ok( @art_four, '==', 3, "Three artists returned" );
 }
+
+# check that reconnection contexts are preserved in txn_do / dbh_do
+
+my $args = [1, 2, 3];
+
+my $ctx_map = {
+  VOID => {
+    invoke => sub { shift->(); 1 },
+    wa => undef,
+  },
+  SCALAR => {
+    invoke => sub { my $foo = shift->() },
+    wa => '',
+  },
+  LIST => {
+    invoke => sub { my @foo = shift->() },
+    wa => 1,
+  },
+};
+
+for my $ctx (keys $ctx_map) {
+
+  # start disconnected and then connected
+  $schema->storage->disconnect;
+  for (1, 2) {
+    my $disarmed;
+
+    $ctx_map->{$ctx}{invoke}->(sub { $schema->txn_do(sub {
+      is_deeply (\@_, $args, 'Args propagated correctly' );
+
+      is (wantarray(), $ctx_map->{$ctx}{wa}, "Correct $ctx context");
+
+      # this will cause a retry
+      $schema->storage->_dbh->disconnect unless $disarmed++;
+
+      isa_ok ($schema->resultset('Artist')->next, 'DBICTest::Artist');
+    }, @$args) });
+  }
+};
+
+done_testing;
index 0e07ece..17bb7ed 100644 (file)
@@ -125,6 +125,7 @@ my $exceptions = {
     'DBIx::Class::ResultSource::*'                  => { skip => 1 },
     'DBIx::Class::Storage::Statistics'              => { skip => 1 },
     'DBIx::Class::Storage::DBI::Replicated::Types'  => { skip => 1 },
+    'DBIx::Class::Storage::BlockRunner'             => { skip => 1 }, # temporary
 
 # test some specific components whose parents are exempt below
     'DBIx::Class::Relationship::Base'               => {},