Overhaul thread handling
Peter Rabbitson [Thu, 18 Nov 2010 07:56:14 +0000 (08:56 +0100)]
Throw away _verify_tid as it happens too late (when a request is fired).
Instead destroy the current $dbh as soon as possible in CLONE and hope for
the best. Also deactivate _verify_pid on Win32 as it silently spawns a
thread in the background (and thus invokes CLONE).

Changes
lib/DBIx/Class.pm
lib/DBIx/Class/Storage/DBI.pm
lib/DBIx/Class/Storage/DBI/Replicated.pm
lib/DBIx/Class/Storage/TxnScopeGuard.pm
t/51threads.t
t/51threadtxn.t
t/storage/txn.t

diff --git a/Changes b/Changes
index d464eea..6b23ccc 100644 (file)
--- a/Changes
+++ b/Changes
@@ -22,6 +22,8 @@ Revision history for DBIx::Class
         - Missing dependency check in t/60core.t (RT#62635)
         - Fix regressions in IC::DT registration logic
         - Fix infinite loops on old perls with a recent Try::Tiny
+        - Improve "fork()" on Win32 by reimplementing a more robust DBIC
+          thread support (still problematic, pending a DBI fix)
 
     * Misc
         - Switch all serialization to use Storable::nfreeze for portable
index 3b71596..41eacb1 100644 (file)
@@ -195,7 +195,8 @@ resultset is used as an iterator it only fetches rows off the statement
 handle as requested in order to minimise memory usage. It has auto-increment
 support for SQLite, MySQL, PostgreSQL, Oracle, SQL Server and DB2 and is
 known to be used in production on at least the first four, and is fork-
-and thread-safe out of the box (although your DBD may not be).
+and thread-safe out of the box (although
+L<your DBD may not be|DBI/Threads_and_Thread_Safety>).
 
 This project is still under rapid development, so large new features may be
 marked EXPERIMENTAL - such APIs are still usable but may have edge bugs.
index 33fe93d..8595c16 100644 (file)
@@ -27,7 +27,7 @@ __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
 
 __PACKAGE__->mk_group_accessors('simple' => qw/
   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
-  _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
+  _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
   transaction_depth _dbh_autocommit  savepoints
 /);
 
@@ -166,6 +166,8 @@ sub new {
 # of a fork()ed child to kill the parent's shared DBI handle,
 # *before perl reaches the DESTROY in this package*
 # Yes, it is ugly and effective.
+# Additionally this registry is used by the CLONE method to
+# make sure no handles are shared between threads
 {
   my %seek_and_destroy;
 
@@ -180,11 +182,22 @@ sub new {
     local $?; # just in case the DBI destructor changes it somehow
 
     # destroy just the object if not native to this process/thread
-    $_->_preserve_foreign_dbh for (grep
+    $_->_verify_pid for (grep
       { defined $_ }
       values %seek_and_destroy
     );
   }
+
+  sub CLONE {
+    # As per DBI's recommendation, DBIC disconnects all handles as
+    # soon as possible (DBIC will reconnect only on demand from within
+    # the thread)
+    for (values %seek_and_destroy) {
+      next unless $_;
+      $_->{_dbh_gen}++;  # so that existing cursors will drop as well
+      $_->_dbh(undef);
+    }
+  }
 }
 
 sub DESTROY {
@@ -195,51 +208,20 @@ sub DESTROY {
   $self->_dbh(undef);
 }
 
-sub _preserve_foreign_dbh {
-  my $self = shift;
-
-  return unless $self->_dbh;
-
-  $self->_verify_tid;
-
-  return unless $self->_dbh;
-
-  $self->_verify_pid;
-
-}
-
 # handle pid changes correctly - do not destroy parent's connection
 sub _verify_pid {
   my $self = shift;
 
-  return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
-
-  $self->_dbh->{InactiveDestroy} = 1;
-  $self->_dbh(undef);
-  $self->{_dbh_gen}++;
-
-  return;
-}
-
-# very similar to above, but seems to FAIL if I set InactiveDestroy
-sub _verify_tid {
-  my $self = shift;
-
-  if ( ! defined $self->_conn_tid ) {
-    return; # no threads
-  }
-  elsif ( $self->_conn_tid == threads->tid ) {
-    return; # same thread
+  my $pid = $self->_conn_pid;
+  if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
+    $dbh->{InactiveDestroy} = 1;
+    $self->{_dbh_gen}++;
+    $self->_dbh(undef);
   }
 
-  #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
-  $self->_dbh(undef);
-  $self->{_dbh_gen}++;
-
   return;
 }
 
-
 =head2 connect_info
 
 This method is normally called by L<DBIx::Class::Schema/connection>, which
@@ -921,7 +903,7 @@ sub connected {
 sub _seems_connected {
   my $self = shift;
 
-  $self->_preserve_foreign_dbh;
+  $self->_verify_pid;
 
   my $dbh = $self->_dbh
     or return 0;
@@ -969,7 +951,7 @@ sub dbh {
 # this is the internal "get dbh or connect (don't check)" method
 sub _get_dbh {
   my $self = shift;
-  $self->_preserve_foreign_dbh;
+  $self->_verify_pid;
   $self->_populate_dbh unless $self->_dbh;
   return $self->_dbh;
 }
@@ -1023,8 +1005,7 @@ sub _populate_dbh {
 
   $self->_dbh($self->_connect(@info));
 
-  $self->_conn_pid($$);
-  $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
+  $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
 
   $self->_determine_driver;
 
index b61e831..6dad218 100644 (file)
@@ -345,7 +345,6 @@ has 'write_handler' => (
     _sqlt_minimum_version
     _sql_maker_opts
     _conn_pid
-    _conn_tid
     _dbh_autocommit
     _native_data_type
     _get_dbh
@@ -367,9 +366,7 @@ has 'write_handler' => (
 
 my @unimplemented = qw(
   _arm_global_destructor
-  _preserve_foreign_dbh
   _verify_pid
-  _verify_tid
 
   get_use_dbms_capability
   set_use_dbms_capability
index 4365b9d..9bda8bb 100644 (file)
@@ -107,7 +107,7 @@ sub DESTROY {
   return if $dismiss;
 
   # if our dbh is not ours anymore, the weakref will go undef
-  $storage->_preserve_foreign_dbh;
+  $storage->_verify_pid;
   return unless $_[0]->[2];
 
   my $exception = $@;
index 3a26de9..4564e3f 100644 (file)
@@ -28,9 +28,11 @@ if($num_children !~ /^[0-9]+$/ || $num_children < 10) {
    $num_children = 10;
 }
 
-diag 'It is normal to see a series of "Scalars leaked: ..." warnings during this test';
-
 use_ok('DBICTest::Schema');
+
+diag "\n\nIt is ok if you see series of 'Attempt to free unreferenced scalar: ...' warnings during this test\n "
+  if $] < '5.008005';
+
 my $schema = DBICTest::Schema->connection($dsn, $user, $pass, { AutoCommit => 1, RaiseError => 1, PrintError => 0 });
 
 my $parent_rs;
index 74f6ce8..84a4c3d 100644 (file)
@@ -27,10 +27,11 @@ if($num_children !~ /^[0-9]+$/ || $num_children < 10) {
    $num_children = 10;
 }
 
-diag 'It is normal to see a series of "Scalars leaked: ..." warnings during this test';
-
 use_ok('DBICTest::Schema');
 
+diag "\n\nIt is ok if you see series of 'Attempt to free unreferenced scalar: ...' warnings during this test\n "
+  if $] < '5.008005';
+
 my $schema = DBICTest::Schema->connection($dsn, $user, $pass, { AutoCommit => 1, RaiseError => 1, PrintError => 0 });
 
 my $parent_rs;
index d48d84b..895eaaa 100644 (file)
@@ -7,9 +7,6 @@ use Test::Exception;
 use lib qw(t/lib);
 use DBICTest;
 
-plan skip_all => 'Disabled on windows, pending resolution of DBD::SQLite SIGSEGVs'
-  if $^O eq 'MSWin32';
-
 my $code = sub {
   my ($artist, @cd_titles) = @_;
 
@@ -130,11 +127,12 @@ for my $want (0,1) {
       $guard->commit
     },
   ) {
-    push @pids, fork();
+    my $pid = fork();
     die "Unable to fork: $!\n"
-      if ! defined $pids[-1];
+      if ! defined $pid;
 
-    if ($pids[-1]) {
+    if ($pid) {
+      push @pids, $pid;
       next;
     }
 
@@ -167,11 +165,8 @@ for my $want (0,1) {
       });
     }
 
-    for my $pid ( $schema->txn_do ( sub { _forking_action ($schema) } ) ) {
-      waitpid ($pid, 0);
-      ok (! $?, "Child $pid exit ok (pass $pass)");
-      isa_ok ($schema->resultset ('Artist')->find ({ name => "forking action $pid" }), 'DBIx::Class::Row');
-    }
+    $schema->txn_do ( sub { _test_forking_action ($schema, $pass) } );
+
   }
 }
 
@@ -192,34 +187,32 @@ for my $want (0,1) {
       });
     }
 
-    my @pids;
     my $guard = $schema->txn_scope_guard;
-    @pids = _forking_action ($schema);
+    my @pids = _test_forking_action ($schema, $pass);
     $guard->commit;
-
-    for my $pid (@pids) {
-      waitpid ($pid, 0);
-      ok (! $?, "Child $pid exit ok (pass $pass)");
-      isa_ok ($schema->resultset ('Artist')->find ({ name => "forking action $pid" }), 'DBIx::Class::Row');
-    }
   }
 }
 
-sub _forking_action {
-  my $schema = shift;
+sub _test_forking_action {
+  my ($schema, $pass) = @_;
 
   my @pids;
-  while (@pids < 5) {
 
-    push @pids, fork();
+  SKIP: for my $count (1 .. 5) {
+
+    skip 'Weird DBI General Protection Faults, skip forking tests (RT#63104)', 5
+      if $^O eq 'MSWin32';
+
+    my $pid = fork();
     die "Unable to fork: $!\n"
-      if ! defined $pids[-1];
+      if ! defined $pid;
 
-    if ($pids[-1]) {
+    if ($pid) {
+      push @pids, $pid;
       next;
     }
 
-    if (@pids % 2) {
+    if ($count % 2) {
       $schema->txn_do (sub {
         my $depth = $schema->storage->transaction_depth;
         die "$$(txn_do)unexpected txn depth $depth!" if $depth != 1;
@@ -237,7 +230,11 @@ sub _forking_action {
     exit 0;
   }
 
-  return @pids;
+  for my $pid (@pids) {
+    waitpid ($pid, 0);
+    ok (! $?, "Child $pid exit ok (pass $pass)");
+    isa_ok ($schema->resultset ('Artist')->find ({ name => "forking action $pid" }), 'DBIx::Class::Row');
+  }
 }
 
 my $fail_code = sub {