All auditing now goes through a method on ::File
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
index 547a7af..309c274 100644 (file)
@@ -6,6 +6,14 @@ use strict;
 use warnings;
 
 use Fcntl qw( :DEFAULT :flock :seek );
+use Scalar::Util ();
+
+# File-wide notes:
+# * All the local($/,$\); are to protect read() and print() from -l.
+# * To add to bucket_size, make sure you modify the following:
+#   - calculate_sizes()
+#   - _get_key_subloc()
+#   - add_bucket() - where the buckets are printed
 
 ##
 # Setup file and tag signatures.  These should never change.
@@ -44,12 +52,13 @@ sub new {
         max_buckets => 16,
 
         fileobj => undef,
+        obj     => undef,
     }, $class;
 
     if ( defined $args->{pack_size} ) {
         if ( lc $args->{pack_size} eq 'small' ) {
             $args->{long_size} = 2;
-            $args->{long_pack} = 'S';
+            $args->{long_pack} = 'n';
         }
         elsif ( lc $args->{pack_size} eq 'medium' ) {
             $args->{long_size} = 4;
@@ -69,6 +78,7 @@ sub new {
         next unless exists $args->{$param};
         $self->{$param} = $args->{$param};
     }
+    Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
 
     if ( $self->{max_buckets} < 16 ) {
         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
@@ -84,8 +94,9 @@ sub _fh      { return $_[0]->_fileobj->{fh} }
 sub calculate_sizes {
     my $self = shift;
 
+    #XXX Does this need to be updated with different hashing algorithms?
     $self->{index_size}       = (2**8) * $self->{long_size};
-    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
+    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
 
     return;
@@ -93,37 +104,39 @@ sub calculate_sizes {
 
 sub write_file_header {
     my $self = shift;
-#    my ($obj) = @_;
+
+    local($/,$\);
 
     my $fh = $self->_fh;
 
-    my $loc = $self->_request_space(
-        undef, length( SIG_FILE ) + 21,
-    );
+    my $loc = $self->_request_space( length( SIG_FILE ) + 21 );
     seek($fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET);
     print( $fh
         SIG_FILE,
         SIG_HEADER,
         pack('N', 1),  # header version
         pack('N', 12), # header size
-        pack('N', 0),  # file version
-        pack('S', $self->{long_size}),
+        pack('N', 0),  # currently running transaction IDs
+        pack('n', $self->{long_size}),
         pack('A', $self->{long_pack}),
-        pack('S', $self->{data_size}),
+        pack('n', $self->{data_size}),
         pack('A', $self->{data_pack}),
-        pack('S', $self->{max_buckets}),
+        pack('n', $self->{max_buckets}),
     );
 
+    $self->_fileobj->set_transaction_offset( 13 );
+
     return;
 }
 
 sub read_file_header {
     my $self = shift;
-    my ($obj) = @_;
 
-    my $fh = $obj->_fh;
+    local($/,$\);
+
+    my $fh = $self->_fh;
 
-    seek($fh, 0 + $obj->_fileobj->{file_offset}, SEEK_SET);
+    seek($fh, 0 + $self->_fileobj->{file_offset}, SEEK_SET);
     my $buffer;
     my $bytes_read = read( $fh, $buffer, length(SIG_FILE) + 9 );
 
@@ -134,21 +147,24 @@ sub read_file_header {
     );
 
     unless ( $file_signature eq SIG_FILE ) {
-        $self->{fileobj}->close;
-        $obj->_throw_error( "Signature not found -- file is not a Deep DB" );
+        $self->_fileobj->close;
+        $self->_throw_error( "Signature not found -- file is not a Deep DB" );
     }
 
     unless ( $sig_header eq SIG_HEADER ) {
-        $self->{fileobj}->close;
-        $obj->_throw_error( "Old file version found." );
+        $self->_fileobj->close;
+        $self->_throw_error( "Old file version found." );
     }
 
     my $buffer2;
     $bytes_read += read( $fh, $buffer2, $size );
-    my ($file_version, @values) = unpack( 'N S A S A S', $buffer2 );
+    my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
+
+    $self->_fileobj->set_transaction_offset( 13 );
+
     if ( @values < 5 || grep { !defined } @values ) {
-        $self->{fileobj}->close;
-        $obj->_throw_error("Corrupted file - bad header");
+        $self->_fileobj->close;
+        $self->_throw_error("Corrupted file - bad header");
     }
 
     #XXX Add warnings if values weren't set right
@@ -157,44 +173,18 @@ sub read_file_header {
     return $bytes_read;
 }
 
-sub get_file_version {
-    my $self = shift;
-    my ($obj) = @_;
-
-    my $fh = $obj->_fh;
-
-    seek( $fh, 13 + $obj->_fileobj->{file_offset}, SEEK_SET );
-    my $buffer;
-    my $bytes_read = read( $fh, $buffer, 4 );
-    unless ( $bytes_read == 4 ) {
-        $obj->_throw_error( "Cannot read file version" );
-    }
-
-    return unpack( 'N', $buffer );
-}
-
-sub write_file_version {
-    my $self = shift;
-    my ($obj, $new_version) = @_;
-
-    my $fh = $obj->_fh;
-
-    seek( $fh, 13 + $obj->_fileobj->{file_offset}, SEEK_SET );
-    print( $fh pack( 'N', $new_version ) );
-
-    return;
-}
-
 sub setup_fh {
     my $self = shift;
     my ($obj) = @_;
 
-    my $fh = $obj->_fh;
+    local($/,$\);
+
+    my $fh = $self->_fh;
     flock $fh, LOCK_EX;
 
     #XXX The duplication of calculate_sizes needs to go away
     unless ( $obj->{base_offset} ) {
-        my $bytes_read = $self->read_file_header( $obj );
+        my $bytes_read = $self->read_file_header;
 
         $self->calculate_sizes;
 
@@ -202,14 +192,14 @@ sub setup_fh {
         # File is empty -- write header and master index
         ##
         if (!$bytes_read) {
-            $self->write_file_header( $obj );
+            $self->_fileobj->audit( "# Database created on" );
 
-            $obj->{base_offset} = $self->_request_space(
-                $obj, $self->tag_size( $self->{index_size} ),
-            );
+            $self->write_file_header;
+
+            $obj->{base_offset} = $self->_request_space( $self->tag_size( $self->{index_size} ) );
 
             $self->write_tag(
-                $obj, $obj->_base_offset, $obj->_type,
+                $obj->_base_offset, $obj->_type,
                 chr(0)x$self->{index_size},
             );
 
@@ -224,11 +214,15 @@ sub setup_fh {
             ##
             # Get our type from master index header
             ##
-            my $tag = $self->load_tag($obj, $obj->_base_offset)
-            or $obj->_throw_error("Corrupted file, no master index record");
+            my $tag = $self->load_tag($obj->_base_offset);
+            unless ( $tag ) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("Corrupted file, no master index record");
+            }
 
-            unless ($obj->{type} eq $tag->{signature}) {
-                $obj->_throw_error("File type mismatch");
+            unless ($obj->_type eq $tag->{signature}) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("File type mismatch");
             }
         }
     }
@@ -237,10 +231,10 @@ sub setup_fh {
     }
 
     #XXX We have to make sure we don't mess up when autoflush isn't turned on
-    unless ( $obj->_fileobj->{inode} ) {
-        my @stats = stat($obj->_fh);
-        $obj->_fileobj->{inode} = $stats[1];
-        $obj->_fileobj->{end} = $stats[7];
+    unless ( $self->_fileobj->{inode} ) {
+        my @stats = stat($fh);
+        $self->_fileobj->{inode} = $stats[1];
+        $self->_fileobj->{end} = $stats[7];
     }
 
     flock $fh, LOCK_UN;
@@ -259,13 +253,15 @@ sub write_tag {
     # Given offset, signature and content, create tag and write to disk
     ##
     my $self = shift;
-    my ($obj, $offset, $sig, $content) = @_;
+    my ($offset, $sig, $content) = @_;
     my $size = length( $content );
 
-    my $fh = $obj->_fh;
+    local($/,$\);
+
+    my $fh = $self->_fh;
 
     if ( defined $offset ) {
-        seek($fh, $offset + $obj->_fileobj->{file_offset}, SEEK_SET);
+        seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
     }
 
     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
@@ -285,13 +281,15 @@ sub load_tag {
     # Given offset, load single tag and return signature, size and data
     ##
     my $self = shift;
-    my ($obj, $offset) = @_;
+    my ($offset) = @_;
+
+    local($/,$\);
 
 #    print join(':',map{$_||''}caller(1)), $/;
 
-    my $fh = $obj->_fh;
+    my $fh = $self->_fh;
 
-    seek($fh, $offset + $obj->_fileobj->{file_offset}, SEEK_SET);
+    seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
 
     #XXX I'm not sure this check will work if autoflush isn't enabled ...
     return if eof $fh;
@@ -352,7 +350,7 @@ sub _get_dbm_object {
 
 sub _length_needed {
     my $self = shift;
-    my ($obj, $value, $key) = @_;
+    my ($value, $key) = @_;
 
     my $is_dbm_deep = eval {
         local $SIG{'__DIE__'};
@@ -362,12 +360,12 @@ sub _length_needed {
     my $len = SIG_SIZE + $self->{data_size}
             + $self->{data_size} + length( $key );
 
-    if ( $is_dbm_deep && $value->_fileobj eq $obj->_fileobj ) {
+    if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
         return $len + $self->{long_size};
     }
 
     my $r = Scalar::Util::reftype( $value ) || '';
-    if ( $obj->_fileobj->{autobless} ) {
+    if ( $self->_fileobj->{autobless} ) {
         # This is for the bit saying whether or not this thing is blessed.
         $len += 1;
     }
@@ -383,7 +381,7 @@ sub _length_needed {
 
     # if autobless is enabled, must also take into consideration
     # the class name as it is stored after the key.
-    if ( $obj->_fileobj->{autobless} ) {
+    if ( $self->_fileobj->{autobless} ) {
         my $c = Scalar::Util::blessed($value);
         if ( defined $c && !$is_dbm_deep ) {
             $len += $self->{data_size} + length($c);
@@ -399,7 +397,10 @@ sub add_bucket {
     # plain (undigested) key and value.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5, $plain_key, $value) = @_;
+    my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
+    $deleted ||= 0;
+
+    local($/,$\);
 
     # This verifies that only supported values will be stored.
     {
@@ -409,7 +410,7 @@ sub add_bucket {
         last if $r eq 'HASH';
         last if $r eq 'ARRAY';
 
-        $obj->_throw_error(
+        $self->_throw_error(
             "Storage of variables of type '$r' is not supported."
         );
     }
@@ -417,14 +418,20 @@ sub add_bucket {
     my $location = 0;
     my $result = 2;
 
-    my $root = $obj->_fileobj;
-    my $fh   = $obj->_fh;
+    my $root = $self->_fileobj;
+    my $fh   = $self->_fh;
+
+    my $actual_length = $self->_length_needed( $value, $plain_key );
 
-    my $actual_length = $self->_length_needed( $obj, $value, $plain_key );
+    #ACID - This is a mutation. Must only find the exact transaction
+    my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
+    my @transactions;
+    if ( $self->_fileobj->transaction_id == 0 ) {
+        @transactions = $self->_fileobj->current_transactions;
+    }
 
-#    $self->_release_space( $obj, $size, $subloc );
+#    $self->_release_space( $size, $subloc );
     # Updating a known md5
 #XXX This needs updating to use _release_space
     if ( $subloc ) {
@@ -434,7 +441,7 @@ sub add_bucket {
             $location = $subloc;
         }
         else {
-            $location = $self->_request_space( $obj, $actual_length );
+            $location = $self->_request_space( $actual_length );
             seek(
                 $fh,
                 $tag->{offset} + $offset
@@ -443,37 +450,48 @@ sub add_bucket {
             );
             print( $fh pack($self->{long_pack}, $location ) );
             print( $fh pack($self->{long_pack}, $actual_length ) );
+            print( $fh pack('n n', $root->transaction_id, $deleted ) );
         }
     }
     # Adding a new md5
     elsif ( defined $offset ) {
-        $location = $self->_request_space( $obj, $actual_length );
+        $location = $self->_request_space( $actual_length );
 
         seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
         print( $fh $md5 . pack($self->{long_pack}, $location ) );
         print( $fh pack($self->{long_pack}, $actual_length ) );
+        print( $fh pack('n n', $root->transaction_id, $deleted ) );
+
+        for ( @transactions ) {
+            my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
+            $self->_fileobj->{transaction_id} = $_;
+            $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
+            $self->_fileobj->{transaction_id} = 0;
+        }
     }
     # If bucket didn't fit into list, split into a new index level
     # split_index() will do the _request_space() call
     else {
-        $location = $self->split_index( $obj, $md5, $tag );
+        $location = $self->split_index( $md5, $tag );
     }
 
-    $self->write_value( $obj, $location, $plain_key, $value );
+    $self->write_value( $location, $plain_key, $value, $orig_key );
 
     return $result;
 }
 
 sub write_value {
     my $self = shift;
-    my ($obj, $location, $key, $value) = @_;
+    my ($location, $key, $value, $orig_key) = @_;
+
+    local($/,$\);
 
-    my $fh = $obj->_fh;
-    my $root = $obj->_fileobj;
+    my $fh = $self->_fh;
+    my $root = $self->_fileobj;
 
     my $dbm_deep_obj = _get_dbm_object( $value );
-    if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $obj->_fileobj ) {
-        $obj->_throw_error( "Cannot cross-reference. Use export() instead" );
+    if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $self->_fileobj ) {
+        $self->_throw_error( "Cannot cross-reference. Use export() instead" );
     }
 
     seek($fh, $location + $root->{file_offset}, SEEK_SET);
@@ -484,25 +502,25 @@ sub write_value {
     ##
     my $r = Scalar::Util::reftype( $value ) || '';
     if ( $dbm_deep_obj ) {
-        $self->write_tag( $obj, undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
+        $self->write_tag( undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
     }
     elsif ($r eq 'HASH') {
         if ( !$dbm_deep_obj && tied %{$value} ) {
-            $obj->_throw_error( "Cannot store something that is tied" );
+            $self->_throw_error( "Cannot store something that is tied" );
         }
-        $self->write_tag( $obj, undef, SIG_HASH, chr(0)x$self->{index_size} );
+        $self->write_tag( undef, SIG_HASH, chr(0)x$self->{index_size} );
     }
     elsif ($r eq 'ARRAY') {
         if ( !$dbm_deep_obj && tied @{$value} ) {
-            $obj->_throw_error( "Cannot store something that is tied" );
+            $self->_throw_error( "Cannot store something that is tied" );
         }
-        $self->write_tag( $obj, undef, SIG_ARRAY, chr(0)x$self->{index_size} );
+        $self->write_tag( undef, SIG_ARRAY, chr(0)x$self->{index_size} );
     }
     elsif (!defined($value)) {
-        $self->write_tag( $obj, undef, SIG_NULL, '' );
+        $self->write_tag( undef, SIG_NULL, '' );
     }
     else {
-        $self->write_tag( $obj, undef, SIG_DATA, $value );
+        $self->write_tag( undef, SIG_DATA, $value );
     }
 
     ##
@@ -540,6 +558,8 @@ sub write_value {
         tie %$value, 'DBM::Deep', {
             base_offset => $location,
             fileobj     => $root,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
         };
         %$value = %x;
     }
@@ -548,6 +568,8 @@ sub write_value {
         tie @$value, 'DBM::Deep', {
             base_offset => $location,
             fileobj     => $root,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
         };
         @$value = @x;
     }
@@ -557,30 +579,33 @@ sub write_value {
 
 sub split_index {
     my $self = shift;
-    my ($obj, $md5, $tag) = @_;
+    my ($md5, $tag) = @_;
 
-    my $fh = $obj->_fh;
-    my $root = $obj->_fileobj;
+    local($/,$\);
+
+    my $fh = $self->_fh;
+    my $root = $self->_fileobj;
 
     my $loc = $self->_request_space(
-        $obj, $self->tag_size( $self->{index_size} ),
+        $self->tag_size( $self->{index_size} ),
     );
 
     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
     print( $fh pack($self->{long_pack}, $loc) );
 
     my $index_tag = $self->write_tag(
-        $obj, $loc, SIG_INDEX,
+        $loc, SIG_INDEX,
         chr(0)x$self->{index_size},
     );
 
     my $newtag_loc = $self->_request_space(
-        $obj, $self->tag_size( $self->{bucket_list_size} ),
+        $self->tag_size( $self->{bucket_list_size} ),
     );
 
     my $keys = $tag->{content}
              . $md5 . pack($self->{long_pack}, $newtag_loc)
-                    . pack($self->{long_pack}, 0);
+                    . pack($self->{long_pack}, 0)  # size
+                    . pack($self->{long_pack}, 0); # transaction ID
 
     my @newloc = ();
     BUCKET:
@@ -611,13 +636,13 @@ sub split_index {
         seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
 
         my $loc = $self->_request_space(
-            $obj, $self->tag_size( $self->{bucket_list_size} ),
+            $self->tag_size( $self->{bucket_list_size} ),
         );
 
         print( $fh pack($self->{long_pack}, $loc) );
 
         my $blist_tag = $self->write_tag(
-            $obj, $loc, SIG_BLIST,
+            $loc, SIG_BLIST,
             chr(0)x$self->{bucket_list_size},
         );
 
@@ -628,7 +653,7 @@ sub split_index {
     }
 
     $self->_release_space(
-        $obj, $self->tag_size( $self->{bucket_list_size} ),
+        $self->tag_size( $self->{bucket_list_size} ),
         $tag->{offset} - SIG_SIZE - $self->{data_size},
     );
 
@@ -637,15 +662,17 @@ sub split_index {
 
 sub read_from_loc {
     my $self = shift;
-    my ($obj, $subloc) = @_;
+    my ($subloc, $orig_key) = @_;
+
+    local($/,$\);
 
-    my $fh = $obj->_fh;
+    my $fh = $self->_fh;
 
     ##
     # Found match -- seek to offset and read signature
     ##
     my $signature;
-    seek($fh, $subloc + $obj->_fileobj->{file_offset}, SEEK_SET);
+    seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
     read( $fh, $signature, SIG_SIZE);
 
     ##
@@ -653,9 +680,11 @@ sub read_from_loc {
     ##
     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
         my $new_obj = DBM::Deep->new({
-            type => $signature,
+            type        => $signature,
             base_offset => $subloc,
-            fileobj     => $obj->_fileobj,
+            fileobj     => $self->_fileobj,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
         });
 
         if ($new_obj->_fileobj->{autobless}) {
@@ -696,7 +725,7 @@ sub read_from_loc {
             read( $fh, $new_loc, $size );
             $new_loc = unpack( $self->{long_pack}, $new_loc );
 
-            return $self->read_from_loc( $obj, $new_loc );
+            return $self->read_from_loc( $new_loc, $orig_key );
         }
         else {
             return;
@@ -726,11 +755,12 @@ sub get_bucket_value {
     # Fetch single value given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
-    if ( $subloc ) {
-        return $self->read_from_loc( $obj, $subloc );
+    #ACID - This is a read. Can find exact or HEAD
+    my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
+    if ( $subloc && !$is_deleted ) {
+        return $self->read_from_loc( $subloc, $orig_key );
     }
     return;
 }
@@ -740,13 +770,16 @@ sub delete_bucket {
     # Delete single key/value pair given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
+
+    local($/,$\);
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
+    #ACID - This is a mutation. Must only find the exact transaction
+    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
 #XXX This needs _release_space()
     if ( $subloc ) {
-        my $fh = $obj->_fh;
-        seek($fh, $tag->{offset} + $offset + $obj->_fileobj->{file_offset}, SEEK_SET);
+        my $fh = $self->_fh;
+        seek($fh, $tag->{offset} + $offset + $self->_fileobj->{file_offset}, SEEK_SET);
         print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
         print( $fh chr(0) x $self->{bucket_size} );
 
@@ -760,10 +793,11 @@ sub bucket_exists {
     # Check existence of single key given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5) = @_;
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
-    return $subloc && 1;
+    #ACID - This is a read. Can find exact or HEAD
+    my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
+    return ($subloc && !$is_deleted) && 1;
 }
 
 sub find_bucket_list {
@@ -771,35 +805,37 @@ sub find_bucket_list {
     # Locate offset for bucket list, given digested key
     ##
     my $self = shift;
-    my ($obj, $md5, $args) = @_;
+    my ($offset, $md5, $args) = @_;
     $args = {} unless $args;
 
+    local($/,$\);
+
     ##
     # Locate offset for bucket list using digest index system
     ##
-    my $tag = $self->load_tag($obj, $obj->_base_offset)
-        or $obj->_throw_error( "INTERNAL ERROR - Cannot find tag" );
+    my $tag = $self->load_tag( $offset )
+        or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
 
     my $ch = 0;
     while ($tag->{signature} ne SIG_BLIST) {
         my $num = ord substr($md5, $ch, 1);
 
         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
-        $tag = $self->index_lookup( $obj, $tag, $num );
+        $tag = $self->index_lookup( $tag, $num );
 
         if (!$tag) {
             return if !$args->{create};
 
             my $loc = $self->_request_space(
-                $obj, $self->tag_size( $self->{bucket_list_size} ),
+                $self->tag_size( $self->{bucket_list_size} ),
             );
 
-            my $fh = $obj->_fh;
-            seek($fh, $ref_loc + $obj->_fileobj->{file_offset}, SEEK_SET);
+            my $fh = $self->_fh;
+            seek($fh, $ref_loc + $self->_fileobj->{file_offset}, SEEK_SET);
             print( $fh pack($self->{long_pack}, $loc) );
 
             $tag = $self->write_tag(
-                $obj, $loc, SIG_BLIST,
+                $loc, SIG_BLIST,
                 chr(0)x$self->{bucket_list_size},
             );
 
@@ -821,7 +857,7 @@ sub index_lookup {
     # Given index tag, lookup single entry in index and return .
     ##
     my $self = shift;
-    my ($obj, $tag, $index) = @_;
+    my ($tag, $index) = @_;
 
     my $location = unpack(
         $self->{long_pack},
@@ -834,7 +870,7 @@ sub index_lookup {
 
     if (!$location) { return; }
 
-    return $self->load_tag( $obj, $location );
+    return $self->load_tag( $location );
 }
 
 sub traverse_index {
@@ -844,9 +880,11 @@ sub traverse_index {
     my $self = shift;
     my ($obj, $offset, $ch, $force_return_next) = @_;
 
-    my $tag = $self->load_tag($obj, $offset );
+    local($/,$\);
 
-    my $fh = $obj->_fh;
+    my $tag = $self->load_tag( $offset );
+
+    my $fh = $self->_fh;
 
     if ($tag->{signature} ne SIG_BLIST) {
         my $content = $tag->{content};
@@ -896,7 +934,7 @@ sub traverse_index {
             }
             # Seek to bucket location and skip over signature
             elsif ($obj->{return_next}) {
-                seek($fh, $subloc + $obj->_fileobj->{file_offset}, SEEK_SET);
+                seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
 
                 # Skip over value to get to plain key
                 my $sig;
@@ -951,8 +989,10 @@ sub _get_key_subloc {
     my $self = shift;
     my ($keys, $idx) = @_;
 
-    my ($key, $subloc, $size) = unpack(
-        "a$self->{hash_size} $self->{long_pack} $self->{long_pack}",
+    my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
+        # This is 'a', not 'A'. Please read the pack() documentation for the
+        # difference between the two and why it's important.
+        "a$self->{hash_size} $self->{long_pack}2 n2",
         substr(
             $keys,
             ($idx * $self->{bucket_size}),
@@ -960,43 +1000,48 @@ sub _get_key_subloc {
         ),
     );
 
-    return ($key, $subloc, $size);
+    return ($key, $subloc, $size, $transaction_id, $is_deleted);
 }
 
 sub _find_in_buckets {
     my $self = shift;
-    my ($tag, $md5) = @_;
+    my ($tag, $md5, $exact) = @_;
+
+    my $trans_id = $self->_fileobj->transaction_id;
+
+    my @zero;
 
     BUCKET:
     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
-        my ($key, $subloc, $size) = $self->_get_key_subloc(
+        my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
             $tag->{content}, $i,
         );
 
-        return ($subloc, $i * $self->{bucket_size}, $size) unless $subloc;
+        my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
+
+        unless ( $subloc ) {
+            if ( !$exact && @zero and $trans_id ) {
+                @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
+            }
+            return @rv;
+        }
 
         next BUCKET if $key ne $md5;
 
-        return ($subloc, $i * $self->{bucket_size}, $size);
+        # Save off the HEAD in case we need it.
+        @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
+
+        next BUCKET if $transaction_id != $trans_id;
+
+        return @rv;
     }
 
     return;
 }
 
-#sub _print_at {
-#    my $self = shift;
-#    my ($obj, $spot, $data) = @_;
-#
-#    my $fh = $obj->_fh;
-#    seek( $fh, $spot, SEEK_SET );
-#    print( $fh $data );
-#
-#    return;
-#}
-
 sub _request_space {
     my $self = shift;
-    my ($obj, $size) = @_;
+    my ($size) = @_;
 
     my $loc = $self->_fileobj->{end};
     $self->_fileobj->{end} += $size;
@@ -1006,12 +1051,14 @@ sub _request_space {
 
 sub _release_space {
     my $self = shift;
-    my ($obj, $size, $loc) = @_;
+    my ($size, $loc) = @_;
+
+    local($/,$\);
 
     my $next_loc = 0;
 
-    my $fh = $obj->_fh;
-    seek( $fh, $loc + $obj->_fileobj->{file_offset}, SEEK_SET );
+    my $fh = $self->_fh;
+    seek( $fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET );
     print( $fh SIG_FREE
         . pack($self->{long_pack}, $size )
         . pack($self->{long_pack}, $next_loc )
@@ -1020,6 +1067,10 @@ sub _release_space {
     return;
 }
 
+sub _throw_error {
+    die "DBM::Deep: $_[1]\n";
+}
+
 1;
 __END__
 
@@ -1027,10 +1078,12 @@ __END__
 # attempt at refactoring on the physical level instead of the virtual level.
 sub _read_at {
     my $self = shift;
-    my ($obj, $spot, $amount, $unpack) = @_;
+    my ($spot, $amount, $unpack) = @_;
 
-    my $fh = $obj->_fh;
-    seek( $fh, $spot + $obj->_fileobj->{file_offset}, SEEK_SET );
+    local($/,$\);
+
+    my $fh = $self->_fh;
+    seek( $fh, $spot + $self->_fileobj->{file_offset}, SEEK_SET );
 
     my $buffer;
     my $bytes_read = read( $fh, $buffer, $amount );
@@ -1046,3 +1099,48 @@ sub _read_at {
         return $buffer;
     }
 }
+
+sub _print_at {
+    my $self = shift;
+    my ($spot, $data) = @_;
+
+    local($/,$\);
+
+    my $fh = $self->_fh;
+    seek( $fh, $spot, SEEK_SET );
+    print( $fh $data );
+
+    return;
+}
+
+sub get_file_version {
+    my $self = shift;
+
+    local($/,$\);
+
+    my $fh = $self->_fh;
+
+    seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
+    my $buffer;
+    my $bytes_read = read( $fh, $buffer, 4 );
+    unless ( $bytes_read == 4 ) {
+        $self->_throw_error( "Cannot read file version" );
+    }
+
+    return unpack( 'N', $buffer );
+}
+
+sub write_file_version {
+    my $self = shift;
+    my ($new_version) = @_;
+
+    local($/,$\);
+
+    my $fh = $self->_fh;
+
+    seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
+    print( $fh pack( 'N', $new_version ) );
+
+    return;
+}
+