More tests are passing
rkinyon [Sat, 23 Dec 2006 07:10:02 +0000 (07:10 +0000)]
lib/DBM/Deep/Engine3.pm
t/33_transactions.t

index 524bf09..baf3aee 100644 (file)
@@ -42,7 +42,6 @@ sub new {
     my $class = shift;
     my ($args) = @_;
 
-    print "\n********* NEW ********\n\n";
     my $self = bless {
         byte_size   => 4,
 
@@ -332,6 +331,10 @@ sub begin_work {
         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
     }
 
+    $self->set_trans_ctr(
+        $self->get_txn_staleness_counter( $self->trans_id ),
+    );
+
     return;
 }
 
@@ -350,7 +353,7 @@ sub rollback {
         my $read_loc = $entry
           + $self->hash_size
           + $self->byte_size
-          + $self->trans_id * $self->byte_size;
+          + $self->trans_id * ( 2 * $self->byte_size );
 
         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
@@ -366,6 +369,8 @@ sub rollback {
     my @slots = $self->read_txn_slots;
     $slots[$self->trans_id] = 0;
     $self->write_txn_slots( @slots );
+    $self->inc_txn_staleness_counter( $self->trans_id );
+    $self->set_trans_ctr( 0 );
     $self->set_trans_id( 0 );
 
     return 1;
@@ -379,9 +384,9 @@ sub commit {
         DBM::Deep->_throw_error( "Cannot commit without a transaction" );
     }
 
-    print "TID: " . $self->trans_id, $/;
+    #print "TID: " . $self->trans_id, $/;
     foreach my $entry (@{ $self->get_entries } ) {
-        print "$entry\n";
+        #print "$entry\n";
         # Overwrite the entry in head with the entry in trans_id
         my $base = $entry
           + $self->hash_size
@@ -390,13 +395,13 @@ sub commit {
         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
         my $trans_loc = $self->storage->read_at(
-            $base + $self->trans_id * $self->byte_size, $self->byte_size,
+            $base + $self->trans_id * ( 2 * $self->byte_size ), $self->byte_size,
         );
 
         $self->storage->print_at( $base, $trans_loc );
         $self->storage->print_at(
-            $base + $self->trans_id * $self->byte_size,
-            pack( $StP{$self->byte_size}, 0 ),
+            $base + $self->trans_id * ( 2 * $self->byte_size ),
+            pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
         );
 
         if ( $head_loc > 1 ) {
@@ -409,6 +414,8 @@ sub commit {
     my @slots = $self->read_txn_slots;
     $slots[$self->trans_id] = 0;
     $self->write_txn_slots( @slots );
+    $self->inc_txn_staleness_counter( $self->trans_id );
+    $self->set_trans_ctr( 0 );
     $self->set_trans_id( 0 );
 
     return 1;
@@ -436,6 +443,25 @@ sub get_running_txn_ids {
     my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
 }
 
+sub get_txn_staleness_counter {
+    my $self = shift;
+    my ($trans_id) = @_;
+    return unpack( 'N',
+        $self->storage->read_at(
+            $self->trans_loc + 4 * ($trans_id - 1), 4,
+        )
+    );
+}
+
+sub inc_txn_staleness_counter {
+    my $self = shift;
+    my ($trans_id) = @_;
+    $self->storage->print_at(
+        $self->trans_loc,
+        pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
+    );
+}
+
 sub get_entries {
     my $self = shift;
     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
@@ -445,15 +471,15 @@ sub add_entry {
     my $self = shift;
     my ($trans_id, $loc) = @_;
 
-    print "$trans_id => $loc\n";
+    #print "$trans_id => $loc\n";
     $self->{entries}{$trans_id} ||= {};
     $self->{entries}{$trans_id}{$loc} = undef;
-    use Data::Dumper;print "$self: " . Dumper $self->{entries};
+    #use Data::Dumper;print "$self: " . Dumper $self->{entries};
 }
 
 sub clear_entries {
     my $self = shift;
-    print "Clearing\n";
+    #print "Clearing\n";
     delete $self->{entries}{$self->trans_id};
 }
 
@@ -485,6 +511,7 @@ sub clear_entries {
 
         $self->set_trans_loc( $header_fixed + 2 );
         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
+        $self->set_trans_ctr( 0 );
 
         return;
     }
@@ -514,6 +541,7 @@ sub clear_entries {
 
         $self->set_trans_loc( $header_fixed + 2 );
         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
+        $self->set_trans_ctr( 0 );
 
         if ( @values < 2 || grep { !defined } @values ) {
             $self->storage->close;
@@ -646,6 +674,9 @@ sub blank_md5   { chr(0) x $_[0]->hash_size }
 sub trans_id     { $_[0]{trans_id} }
 sub set_trans_id { $_[0]{trans_id} = $_[1] }
 
+sub trans_ctr     { $_[0]{trans_ctr} }
+sub set_trans_ctr { $_[0]{trans_ctr} = $_[1] }
+
 sub trans_loc     { $_[0]{trans_loc} }
 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
 
@@ -1001,7 +1032,7 @@ sub write_data {
                 my $old_value = $blist->get_data_for;
                 foreach my $other_trans_id ( @trans_ids ) {
                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
-                    print "write_md5 to save a value\n";
+                    #print "write_md5 to save a value\n";
                     $blist->write_md5({
                         trans_id => $other_trans_id,
                         key      => $args->{key},
@@ -1156,8 +1187,6 @@ package DBM::Deep::Engine::Sector::BucketList;
 
 our @ISA = qw( DBM::Deep::Engine::Sector );
 
-sub idx_for_txn { return $_[1] + 1 }
-
 sub _init {
     my $self = shift;
 
@@ -1181,7 +1210,7 @@ sub _init {
     return $self;
 }
 
-sub base_size { 2 } # Sig + recycled counter
+sub base_size { 1 + 1 } # Sig + recycled counter
 
 sub size {
     my $self = shift;
@@ -1196,9 +1225,9 @@ sub bucket_size {
     my $self = shift;
     unless ( $self->{bucket_size} ) {
         my $e = $self->engine;
-        # Key + transactions
-        my $locs_size = (1 + $e->num_txns ) * $e->byte_size;
-        $self->{bucket_size} = $e->hash_size + $locs_size;
+        # Key + head (location) + transactions (location + staleness-counter)
+        my $location_size = $e->byte_size + $e->num_txns * ( 2 * $e->byte_size );
+        $self->{bucket_size} = $e->hash_size + $location_size;
     }
     return $self->{bucket_size};
 }
@@ -1256,7 +1285,7 @@ sub write_md5 {
     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
 
     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
-    print "Adding $args->{trans_id} -> $spot\n";
+    #print "Adding $args->{trans_id} -> $spot\n";
     $engine->add_entry( $args->{trans_id}, $spot );
 
     unless ($self->{found}) {
@@ -1274,10 +1303,11 @@ sub write_md5 {
     my $loc = $spot
       + $engine->hash_size
       + $engine->byte_size
-      + $args->{trans_id} * $engine->byte_size;
+      + $args->{trans_id} * ( 2 * $engine->byte_size );
 
     $engine->storage->print_at( $loc,
         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
+        pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
     );
 }
 
@@ -1296,10 +1326,11 @@ sub mark_deleted {
     my $loc = $spot
       + $engine->hash_size
       + $engine->byte_size
-      + $args->{trans_id} * $engine->byte_size;
+      + $args->{trans_id} * ( 2 * $engine->byte_size );
 
     $engine->storage->print_at( $loc,
         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
+        pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
     );
 }
 
@@ -1345,15 +1376,28 @@ sub get_data_location_for {
     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
     $args->{idx}        = $self->{idx} unless exists $args->{idx};
 
-    my $location = $self->engine->storage->read_at(
-        $self->offset + $self->base_size
+    my $e = $self->engine;
+
+    my $spot = $self->offset + $self->base_size
       + $args->{idx} * $self->bucket_size
-      + $self->engine->hash_size
-      + $self->engine->byte_size
-      + $args->{trans_id} * $self->engine->byte_size,
-        $self->engine->byte_size,
+      + $e->hash_size
+      + $e->byte_size
+      + $args->{trans_id} * ( 2 * $e->byte_size );
+
+    my $buffer = $e->storage->read_at(
+        $spot,
+        2 * $e->byte_size,
     );
-    my $loc = unpack( $StP{$self->engine->byte_size}, $location );
+    my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
+
+    # We have found an entry that is old, so get rid of it
+    if ( $staleness != $e->get_txn_staleness_counter( $args->{trans_id} ) ) {
+        $e->storage->print_at(
+            $spot,
+            pack( $StP{$e->byte_size} . ' N', (0) x 2 ), 
+        );
+        $loc = 0;
+    }
 
     # If we're in a transaction and we never wrote to this location, try the
     # HEAD instead.
index dbed193..efedbfa 100644 (file)
@@ -109,7 +109,7 @@ $db1->begin_work;
 
     is( $db1->{other_x}, 'bar', "After begin_work, DB1's other_x is still bar" );
     is( $db2->{other_x}, 'bar', "After begin_work, DB2's other_x is still bar" );
-__END__
+
     delete $db2->{other_x};
     ok( !exists $db2->{other_x}, "DB2 deleted other_x in DB1's transaction, so it can't see it anymore" );
     is( $db1->{other_x}, 'bar', "Since other_x was deleted after the transaction began, DB1 still sees it." );