begin_work, rollback, and commit now properly lock the database
rkinyon@cpan.org [Tue, 1 Jul 2008 01:19:26 +0000 (01:19 +0000)]
git-svn-id: http://svn.ali.as/cpan/trunk/DBM-Deep@3670 88f4d9cd-8a04-0410-9d60-8f63309c3137

lib/DBM/Deep.pm
lib/DBM/Deep/Engine.pm
lib/DBM/Deep/Engine/Sector/BucketList.pm
lib/DBM/Deep/Engine/Sector/FileHeader.pm
lib/DBM/Deep/Engine/Sector/Reference.pm
t/33_transactions.t
t/41_transaction_multilevel.t
t/43_transaction_maximum.t

index 5e67af1..24056e4 100644 (file)
@@ -419,17 +419,26 @@ sub clone {
 
 sub begin_work {
     my $self = shift->_get_self;
-    return $self->_engine->begin_work( $self, @_ );
+    $self->lock_exclusive;
+    my $rv = $self->_engine->begin_work( $self, @_ );
+    $self->unlock;
+    return $rv;
 }
 
 sub rollback {
     my $self = shift->_get_self;
-    return $self->_engine->rollback( $self, @_ );
+    $self->lock_exclusive;
+    my $rv = $self->_engine->rollback( $self, @_ );
+    $self->unlock;
+    return $rv;
 }
 
 sub commit {
     my $self = shift->_get_self;
-    return $self->_engine->commit( $self, @_ );
+    $self->lock_exclusive;
+    my $rv = $self->_engine->commit( $self, @_ );
+    $self->unlock;
+    return $rv;
 }
 
 ##
index 8f172b3..870a528 100644 (file)
@@ -491,6 +491,12 @@ sub rollback {
         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
     }
 
+    foreach my $entry ( @{ $self->get_entries } ) {
+        my ($sector, $idx) = split ':', $entry;
+        $self->_load_sector( $sector )->rollback( $idx );
+    }
+
+=pod
     # Each entry is the file location for a bucket that has a modification for
     # this transaction. The entries need to be expunged.
     foreach my $entry (@{ $self->get_entries } ) {
@@ -509,6 +515,7 @@ sub rollback {
             $self->_load_sector( $data_loc )->free;
         }
     }
+=cut
 
     $self->clear_entries;
 
@@ -529,6 +536,12 @@ sub commit {
         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
     }
 
+    foreach my $entry ( @{ $self->get_entries } ) {
+        my ($sector, $idx) = split ':', $entry;
+        $self->_load_sector( $sector )->commit( $idx );
+    }
+
+=pod
     foreach my $entry (@{ $self->get_entries } ) {
         # Overwrite the entry in head with the entry in trans_id
         my $base = $entry
@@ -553,6 +566,7 @@ sub commit {
             $self->_load_sector( $head_loc )->free;
         }
     }
+=cut
 
     $self->clear_entries;
 
@@ -598,23 +612,23 @@ sub get_entries {
 
 sub add_entry {
     my $self = shift;
-    my ($trans_id, $loc) = @_;
+    my ($trans_id, $loc, $idx) = @_;
 
     $self->{entries}{$trans_id} ||= {};
-    $self->{entries}{$trans_id}{$loc} = undef;
+    $self->{entries}{$trans_id}{"$loc:$idx"} = undef;
 }
 
 # If the buckets are being relocated because of a reindexing, the entries
 # mechanism needs to be made aware of it.
 sub reindex_entry {
     my $self = shift;
-    my ($old_loc, $new_loc) = @_;
+    my ($old_loc, $old_idx, $new_loc, $new_idx) = @_;
 
     TRANS:
     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
-        if ( exists $locs->{$old_loc} ) {
-            delete $locs->{$old_loc};
-            $locs->{$new_loc} = undef;
+        if ( exists $locs->{"$old_loc:$old_idx"} ) {
+            delete $locs->{"$old_loc:$old_idx"};
+            $locs->{"$new_loc:$new_idx"} = undef;
             next TRANS;
         }
     }
index 36537aa..95215b0 100644 (file)
@@ -123,6 +123,7 @@ sub chopped_up {
     return @buckets;
 }
 
+#XXX Call this append() instead? -RobK, 2008-06-30
 sub write_at_next_open {
     my $self = shift;
     my ($entry) = @_;
@@ -133,7 +134,7 @@ sub write_at_next_open {
     my $spot = $self->base_size + $self->{_next_open}++ * $self->bucket_size;
     $self->write( $spot, $entry );
 
-    return $spot;
+    return ($self->{_next_open} - 1);
 }
 
 sub has_md5 {
@@ -193,7 +194,7 @@ sub write_md5 {
     $args->{trans_id} = $e->trans_id unless exists $args->{trans_id};
 
     my $spot = $self->base_size + $self->{idx} * $self->bucket_size;
-    $e->add_entry( $args->{trans_id}, $self->offset + $spot );
+    $e->add_entry( $args->{trans_id}, $self->offset, $self->{idx} );
 
     unless ($self->{found}) {
         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
@@ -229,7 +230,7 @@ sub mark_deleted {
     $args->{trans_id} = $e->trans_id unless exists $args->{trans_id};
 
     my $spot = $self->base_size + $self->{idx} * $self->bucket_size;
-    $e->add_entry( $args->{trans_id}, $self->offset + $spot );
+    $e->add_entry( $args->{trans_id}, $self->offset, $self->{idx} );
 
     my $loc = $spot
       + $e->hash_size
@@ -364,5 +365,50 @@ sub get_key_for {
     return $self->engine->_load_sector( $location );
 }
 
+sub rollback {
+    my $self = shift;
+    my ($idx) = @_;
+    my $e = $self->engine;
+    my $trans_id = $e->trans_id;
+
+    my $base = $self->base_size + ($idx * $self->bucket_size) + $e->hash_size + $e->byte_size;
+    my $spot = $base + $e->byte_size + ($trans_id - 1) * ( $e->byte_size + $DBM::Deep::Engine::STALE_SIZE );
+
+    my $trans_loc = $self->read( $spot, $e->byte_size );
+    $trans_loc = unpack( $e->StP($e->byte_size), $trans_loc );
+
+    $self->write( $spot, pack( $e->StP($e->byte_size), 0 ) );
+
+    if ( $trans_loc > 1 ) {
+        $e->_load_sector( $trans_loc )->free;
+    }
+
+    return;
+}
+
+sub commit {
+    my $self = shift;
+    my ($idx) = @_;
+    my $e = $self->engine;
+    my $trans_id = $e->trans_id;
+
+    my $base = $self->base_size + ($idx * $self->bucket_size) + $e->hash_size + $e->byte_size;
+
+    my $head_loc = $self->read( $base, $e->byte_size );
+    $head_loc = unpack( $e->StP($e->byte_size), $head_loc );
+
+    my $spot = $base + $e->byte_size + ($trans_id - 1) * ( $e->byte_size + $DBM::Deep::Engine::STALE_SIZE );
+    my $trans_loc = $self->read( $spot, $e->byte_size );
+
+    $self->write( $base, $trans_loc );
+    $self->write( $spot, pack( $e->StP($e->byte_size) . ' ' . $e->StP($DBM::Deep::Engine::STALE_SIZE), (0) x 2 ) );
+
+    if ( $head_loc > 1 ) {
+        $e->_load_sector( $head_loc )->free;
+    }
+
+    return;
+}
+
 1;
 __END__
index ea14d9e..fe70e6a 100644 (file)
@@ -198,7 +198,8 @@ sub read_txn_slots {
     my $e = $self->engine;
     my $bl = $e->txn_bitfield_len;
     my $num_bits = $bl * 8;
-    return split '', unpack( 'b'.$num_bits, $self->read( $e->trans_loc, $bl ) );
+    my @x = split '', unpack( 'b'.$num_bits, $self->read( $e->trans_loc, $bl ) );
+    return @x;
 }
 
 sub write_txn_slots {
index 5586f9d..0c5e215 100644 (file)
@@ -284,6 +284,7 @@ sub get_bucket_list {
 
         my %blist_cache;
         #XXX q.v. the comments for this function.
+        my $old_idx = 0;
         foreach my $entry ( $sector->chopped_up ) {
             my ($spot, $md5) = @{$entry};
             my $idx = ord( substr( $md5, $i, 1 ) );
@@ -296,8 +297,12 @@ sub get_bucket_list {
 
             $new_index->set_entry( $idx => $blist->offset );
 
-            my $new_spot = $blist->write_at_next_open( $md5 );
-            $engine->reindex_entry( $spot => $new_spot );
+            #XXX q.v. the comments for this function.
+            my $new_idx = $blist->write_at_next_open( $md5 );
+
+            $engine->reindex_entry( ( $sector->offset, $old_idx ) => ( $blist->offset, $new_idx ) );
+
+            $old_idx++;
         }
 
         # Handle the new item separately.
index 1055952..49252ad 100644 (file)
@@ -47,7 +47,10 @@ lives_ok {
 } "Rolling back an empty transaction is ok.";
 
 cmp_bag( [ keys %$db1 ], [qw( x )], "DB1 keys correct" );
+__END__
+warn "4\n";
 cmp_bag( [ keys %$db2 ], [qw( x )], "DB2 keys correct" );
+warn "5\n";
 
 $db1->begin_work;
 
index 2c3c44a..d125582 100644 (file)
@@ -27,6 +27,7 @@ $db1->{x} = { xy => { foo => 'y' } };
 is( $db1->{x}{xy}{foo}, 'y', "Before transaction, DB1's X is Y" );
 is( $db2->{x}{xy}{foo}, 'y', "Before transaction, DB2's X is Y" );
 
+#warn $db1->_dump_file;
 $db1->begin_work;
 
     cmp_bag( [ keys %$db1 ], [qw( x )], "DB1 keys correct" );
@@ -48,8 +49,9 @@ $db1->begin_work;
     cmp_bag( [ keys %{$db1->{x}} ], [qw( yz )], "DB1->X keys correct" );
     cmp_bag( [ keys %{$db2->{x}} ], [qw( xy )], "DB2->X keys correct" );
 
+#warn $db1->_dump_file;
 $db1->rollback;
-
+__END__
 cmp_bag( [ keys %$db1 ], [qw( x )], "DB1 keys correct" );
 cmp_bag( [ keys %$db2 ], [qw( x )], "DB2 keys correct" );
 
index e8462b3..d861010 100644 (file)
@@ -6,17 +6,20 @@ use t::common qw( new_fh );
 
 use DBM::Deep;
 
-my $max_txns = 255;
+my $max_txns = 250;
 
 my ($fh, $filename) = new_fh();
 
 my @dbs = grep { $_ } map {
+    my $x = 
     eval {
         DBM::Deep->new(
-            file => $filename,
-            num_txns  => $max_txns,
+            file     => $filename,
+            num_txns => $max_txns,
         );
     };
+    die $@ if $@;
+    $x;
 } 1 .. $max_txns;
 
 my $num = $#dbs;