More failing tests, particularly for keys() and transactions.
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
index 0cc4bcb..e9d0711 100644 (file)
@@ -1,13 +1,24 @@
 package DBM::Deep::Engine;
 
+use 5.6.0;
+
 use strict;
+use warnings;
+
+use Fcntl qw( :DEFAULT :flock );
+use Scalar::Util ();
 
-use Fcntl qw( :DEFAULT :flock :seek );
+# File-wide notes:
+# * To add to bucket_size, make sure you modify the following:
+#   - calculate_sizes()
+#   - _get_key_subloc()
+#   - add_bucket() - where the buckets are printed
 
 ##
 # Setup file and tag signatures.  These should never change.
 ##
 sub SIG_FILE     () { 'DPDB' }
+sub SIG_HEADER   () { 'h'    }
 sub SIG_INTERNAL () { 'i'    }
 sub SIG_HASH     () { 'H'    }
 sub SIG_ARRAY    () { 'A'    }
@@ -18,62 +29,6 @@ sub SIG_BLIST    () { 'B'    }
 sub SIG_FREE     () { 'F'    }
 sub SIG_SIZE     () {  1     }
 
-sub precalc_sizes {
-    ##
-    # Precalculate index, bucket and bucket list sizes
-    ##
-    my $self = shift;
-
-    $self->{index_size}       = (2**8) * $self->{long_size};
-    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
-    $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
-
-    return 1;
-}
-
-sub set_pack {
-    ##
-    # Set pack/unpack modes (see file header for more)
-    ##
-    my $self = shift;
-    my ($long_s, $long_p, $data_s, $data_p) = @_;
-
-    ##
-    # Set to 4 and 'N' for 32-bit offset tags (default).  Theoretical limit of 4
-    # GB per file.
-    #    (Perl must be compiled with largefile support for files > 2 GB)
-    #
-    # Set to 8 and 'Q' for 64-bit offsets.  Theoretical limit of 16 XB per file.
-    #    (Perl must be compiled with largefile and 64-bit long support)
-    ##
-    $self->{long_size} = $long_s ? $long_s : 4;
-    $self->{long_pack} = $long_p ? $long_p : 'N';
-
-    ##
-    # Set to 4 and 'N' for 32-bit data length prefixes.  Limit of 4 GB for each
-    # key/value. Upgrading this is possible (see above) but probably not
-    # necessary. If you need more than 4 GB for a single key or value, this
-    # module is really not for you :-)
-    ##
-    $self->{data_size} = $data_s ? $data_s : 4;
-    $self->{data_pack} = $data_p ? $data_p : 'N';
-
-    return $self->precalc_sizes();
-}
-
-sub set_digest {
-    ##
-    # Set key digest function (default is MD5)
-    ##
-    my $self = shift;
-    my ($digest_func, $hash_size) = @_;
-
-    $self->{digest} = $digest_func ? $digest_func : \&Digest::MD5::md5;
-    $self->{hash_size} = $hash_size ? $hash_size : 16;
-
-    return $self->precalc_sizes();
-}
-
 sub new {
     my $class = shift;
     my ($args) = @_;
@@ -89,47 +44,151 @@ sub new {
 
         ##
         # Maximum number of buckets per list before another level of indexing is
-        # done.
-        # Increase this value for slightly greater speed, but larger database
+        # done. Increase this value for slightly greater speed, but larger database
         # files. DO NOT decrease this value below 16, due to risk of recursive
         # reindex overrun.
         ##
         max_buckets => 16,
+
+        fileobj => undef,
+        obj     => undef,
     }, $class;
 
-    $self->precalc_sizes;
+    if ( defined $args->{pack_size} ) {
+        if ( lc $args->{pack_size} eq 'small' ) {
+            $args->{long_size} = 2;
+            $args->{long_pack} = 'n';
+        }
+        elsif ( lc $args->{pack_size} eq 'medium' ) {
+            $args->{long_size} = 4;
+            $args->{long_pack} = 'N';
+        }
+        elsif ( lc $args->{pack_size} eq 'large' ) {
+            $args->{long_size} = 8;
+            $args->{long_pack} = 'Q';
+        }
+        else {
+            die "Unknown pack_size value: '$args->{pack_size}'\n";
+        }
+    }
+
+    # Grab the parameters we want to use
+    foreach my $param ( keys %$self ) {
+        next unless exists $args->{$param};
+        $self->{$param} = $args->{$param};
+    }
+    Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
+
+    if ( $self->{max_buckets} < 16 ) {
+        warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
+        $self->{max_buckets} = 16;
+    }
 
     return $self;
 }
 
+sub _fileobj { return $_[0]{fileobj} }
+
+sub calculate_sizes {
+    my $self = shift;
+
+    # The 2**8 here indicates the number of different characters in the
+    # current hashing algorithm
+    #XXX Does this need to be updated with different hashing algorithms?
+    $self->{index_size}       = (2**8) * $self->{long_size};
+    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
+    $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
+
+    return;
+}
+
+sub write_file_header {
+    my $self = shift;
+
+    my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
+
+    $self->_fileobj->print_at( $loc,
+        SIG_FILE,
+        SIG_HEADER,
+        pack('N', 1),  # header version
+        pack('N', 12), # header size
+        pack('N', 0),  # currently running transaction IDs
+        pack('n', $self->{long_size}),
+        pack('A', $self->{long_pack}),
+        pack('n', $self->{data_size}),
+        pack('A', $self->{data_pack}),
+        pack('n', $self->{max_buckets}),
+    );
+
+    $self->_fileobj->set_transaction_offset( 13 );
+
+    return;
+}
+
+sub read_file_header {
+    my $self = shift;
+
+    my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
+    return unless length($buffer);
+
+    my ($file_signature, $sig_header, $header_version, $size) = unpack(
+        'A4 A N N', $buffer
+    );
+
+    unless ( $file_signature eq SIG_FILE ) {
+        $self->_fileobj->close;
+        $self->_throw_error( "Signature not found -- file is not a Deep DB" );
+    }
+
+    unless ( $sig_header eq SIG_HEADER ) {
+        $self->_fileobj->close;
+        $self->_throw_error( "Old file version found." );
+    }
+
+    my $buffer2 = $self->_fileobj->read_at( undef, $size );
+    my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
+
+    $self->_fileobj->set_transaction_offset( 13 );
+
+    if ( @values < 5 || grep { !defined } @values ) {
+        $self->_fileobj->close;
+        $self->_throw_error("Corrupted file - bad header");
+    }
+
+    #XXX Add warnings if values weren't set right
+    @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
+
+    return length($buffer) + length($buffer2);
+}
+
 sub setup_fh {
     my $self = shift;
     my ($obj) = @_;
 
-    $self->open( $obj ) if !defined $obj->_fh;
-
-    my $fh = $obj->_fh;
+    # Need to remove use of $fh here
+    my $fh = $self->_fileobj->{fh};
     flock $fh, LOCK_EX;
 
+    #XXX The duplication of calculate_sizes needs to go away
     unless ( $obj->{base_offset} ) {
-        seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
-        my $signature;
-        my $bytes_read = read( $fh, $signature, length(SIG_FILE));
+        my $bytes_read = $self->read_file_header;
+
+        $self->calculate_sizes;
 
         ##
-        # File is empty -- write signature and master index
+        # File is empty -- write header and master index
         ##
         if (!$bytes_read) {
-            my $loc = $self->_request_space( $obj, length( SIG_FILE ) );
-            seek($fh, $loc + $obj->_root->{file_offset}, SEEK_SET);
-            print( $fh SIG_FILE);
+            $self->_fileobj->audit( "# Database created on" );
 
-            $obj->{base_offset} = $self->_request_space(
-                $obj, $self->tag_size( $self->{index_size} ),
+            $self->write_file_header;
+
+            $obj->{base_offset} = $self->_fileobj->request_space(
+                $self->tag_size( $self->{index_size} ),
             );
 
             $self->write_tag(
-                $obj, $obj->_base_offset, $obj->_type,
+                $obj->_base_offset, $obj->_type,
                 chr(0)x$self->{index_size},
             );
 
@@ -142,80 +201,32 @@ sub setup_fh {
             $obj->{base_offset} = $bytes_read;
 
             ##
-            # Check signature was valid
+            # Get our type from master index header
             ##
-            unless ($signature eq SIG_FILE) {
-                $self->close_fh( $obj );
-                $obj->_throw_error("Signature not found -- file is not a Deep DB");
+            my $tag = $self->load_tag($obj->_base_offset);
+            unless ( $tag ) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("Corrupted file, no master index record");
             }
 
-            ##
-            # Get our type from master index signature
-            ##
-            my $tag = $self->load_tag($obj, $obj->_base_offset)
-            or $obj->_throw_error("Corrupted file, no master index record");
-
-            unless ($obj->{type} eq $tag->{signature}) {
-                $obj->_throw_error("File type mismatch");
+            unless ($obj->_type eq $tag->{signature}) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("File type mismatch");
             }
         }
     }
+    else {
+        $self->calculate_sizes;
+    }
 
     #XXX We have to make sure we don't mess up when autoflush isn't turned on
-    unless ( $obj->_root->{inode} ) {
-        my @stats = stat($obj->_fh);
-        $obj->_root->{inode} = $stats[1];
-        $obj->_root->{end} = $stats[7];
-    }
+    $self->_fileobj->set_inode;
 
     flock $fh, LOCK_UN;
 
     return 1;
 }
 
-sub open {
-    ##
-    # Open a fh to the database, create if nonexistent.
-    # Make sure file signature matches DBM::Deep spec.
-    ##
-    my $self = shift;
-    my ($obj) = @_;
-
-    # Theoretically, adding O_BINARY should remove the need for the binmode
-    # Of course, testing it is going to be ... interesting.
-    my $flags = O_RDWR | O_CREAT | O_BINARY;
-
-    my $fh;
-    my $filename = $obj->_root->{file};
-    sysopen( $fh, $filename, $flags )
-        or $obj->_throw_error("Cannot sysopen file '$filename': $!");
-    $obj->_root->{fh} = $fh;
-
-    #XXX Can we remove this by using the right sysopen() flags?
-    # Maybe ... q.v. above
-    binmode $fh; # for win32
-
-    if ($obj->_root->{autoflush}) {
-        my $old = select $fh;
-        $|=1;
-        select $old;
-    }
-
-    return 1;
-}
-
-sub close_fh {
-    my $self = shift;
-    my ($obj) = @_;
-
-    if ( my $fh = $obj->_root->{fh} ) {
-        close $fh;
-    }
-    $obj->_root->{fh} = undef;
-
-    return 1;
-}
-
 sub tag_size {
     my $self = shift;
     my ($size) = @_;
@@ -227,16 +238,13 @@ sub write_tag {
     # Given offset, signature and content, create tag and write to disk
     ##
     my $self = shift;
-    my ($obj, $offset, $sig, $content) = @_;
+    my ($offset, $sig, $content) = @_;
     my $size = length( $content );
 
-    my $fh = $obj->_fh;
-
-    if ( defined $offset ) {
-        seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
-    }
-
-    print( $fh $sig . pack($self->{data_pack}, $size) . $content );
+    $self->_fileobj->print_at(
+        $offset, 
+        $sig, pack($self->{data_pack}, $size), $content,
+    );
 
     return unless defined $offset;
 
@@ -253,23 +261,15 @@ sub load_tag {
     # Given offset, load single tag and return signature, size and data
     ##
     my $self = shift;
-    my ($obj, $offset) = @_;
-
-#    print join(':',map{$_||''}caller(1)), $/;
+    my ($offset) = @_;
 
-    my $fh = $obj->_fh;
+    my $fileobj = $self->_fileobj;
 
-    seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
-
-    #XXX I'm not sure this check will work if autoflush isn't enabled ...
-    return if eof $fh;
-
-    my $b;
-    read( $fh, $b, SIG_SIZE + $self->{data_size} );
+    my $s = SIG_SIZE + $self->{data_size};
+    my $b = $fileobj->read_at( $offset, $s );
     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
 
-    my $buffer;
-    read( $fh, $buffer, $size);
+    my $buffer = $fileobj->read_at( undef, $size );
 
     return {
         signature => $sig,
@@ -279,28 +279,70 @@ sub load_tag {
     };
 }
 
+sub _get_dbm_object {
+    my $item = shift;
+
+    my $obj = eval {
+        local $SIG{__DIE__};
+        if ($item->isa( 'DBM::Deep' )) {
+            return $item;
+        }
+        return;
+    };
+    return $obj if $obj;
+
+    my $r = Scalar::Util::reftype( $item ) || '';
+    if ( $r eq 'HASH' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(%$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
+    elsif ( $r eq 'ARRAY' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(@$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
+
+    return;
+}
+
 sub _length_needed {
     my $self = shift;
-    my ($obj, $value, $key) = @_;
+    my ($value, $key) = @_;
 
     my $is_dbm_deep = eval {
         local $SIG{'__DIE__'};
         $value->isa( 'DBM::Deep' );
     };
 
-    my $len = SIG_SIZE + $self->{data_size}
-            + $self->{data_size} + length( $key );
+    my $len = SIG_SIZE
+            + $self->{data_size} # size for value
+            + $self->{data_size} # size for key
+            + length( $key );    # length of key
 
-    if ( $is_dbm_deep && $value->_root eq $obj->_root ) {
+    if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
+        # long_size is for the internal reference
         return $len + $self->{long_size};
     }
 
-    my $r = Scalar::Util::reftype( $value ) || '';
-    if ( $obj->_root->{autobless} ) {
+    if ( $self->_fileobj->{autobless} ) {
         # This is for the bit saying whether or not this thing is blessed.
         $len += 1;
     }
 
+    my $r = Scalar::Util::reftype( $value ) || '';
     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
         if ( defined $value ) {
             $len += length( $value );
@@ -312,10 +354,10 @@ sub _length_needed {
 
     # if autobless is enabled, must also take into consideration
     # the class name as it is stored after the key.
-    if ( $obj->_root->{autobless} ) {
-        my $value_class = Scalar::Util::blessed($value);
-        if ( defined $value_class && !$is_dbm_deep ) {
-            $len += $self->{data_size} + length($value_class);
+    if ( $self->_fileobj->{autobless} ) {
+        my $c = Scalar::Util::blessed($value);
+        if ( defined $c && !$is_dbm_deep ) {
+            $len += $self->{data_size} + length($c);
         }
     }
 
@@ -328,7 +370,10 @@ sub add_bucket {
     # plain (undigested) key and value.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5, $plain_key, $value) = @_;
+    my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
+    $deleted ||= 0;
+
+    local($/,$\);
 
     # This verifies that only supported values will be stored.
     {
@@ -338,7 +383,7 @@ sub add_bucket {
         last if $r eq 'HASH';
         last if $r eq 'ARRAY';
 
-        $obj->_throw_error(
+        $self->_throw_error(
             "Storage of variables of type '$r' is not supported."
         );
     }
@@ -346,14 +391,19 @@ sub add_bucket {
     my $location = 0;
     my $result = 2;
 
-    my $root = $obj->_root;
-    my $fh   = $obj->_fh;
+    my $fileobj = $self->_fileobj;
+
+    my $actual_length = $self->_length_needed( $value, $plain_key );
 
-    my $actual_length = $self->_length_needed( $obj, $value, $plain_key );
+    #ACID - This is a mutation. Must only find the exact transaction
+    my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
+    my @transactions;
+    if ( $fileobj->transaction_id == 0 ) {
+        @transactions = $fileobj->current_transactions;
+    }
 
-#    $self->_release_space( $obj, $size, $subloc );
+#    $self->_release_space( $size, $subloc );
     # Updating a known md5
 #XXX This needs updating to use _release_space
     if ( $subloc ) {
@@ -363,128 +413,130 @@ sub add_bucket {
             $location = $subloc;
         }
         else {
-            $location = $self->_request_space( $obj, $actual_length );
-            seek(
-                $fh,
-                $tag->{offset} + $offset
-              + $self->{hash_size} + $root->{file_offset},
-                SEEK_SET,
+            $location = $fileobj->request_space( $actual_length );
+
+            $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
+                pack($self->{long_pack}, $location ),
+                pack($self->{long_pack}, $actual_length ),
+                pack('n n', $fileobj->transaction_id, $deleted ),
             );
-            print( $fh pack($self->{long_pack}, $location ) );
-            print( $fh pack($self->{long_pack}, $actual_length ) );
         }
     }
     # Adding a new md5
     elsif ( defined $offset ) {
-        $location = $self->_request_space( $obj, $actual_length );
+        $location = $fileobj->request_space( $actual_length );
+
+        $fileobj->print_at( $tag->{offset} + $offset,
+            $md5,
+            pack($self->{long_pack}, $location ),
+            pack($self->{long_pack}, $actual_length ),
+            pack('n n', $fileobj->transaction_id, $deleted ),
+        );
 
-        seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
-        print( $fh $md5 . pack($self->{long_pack}, $location ) );
-        print( $fh pack($self->{long_pack}, $actual_length ) );
+        for ( @transactions ) {
+            my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
+            $fileobj->{transaction_id} = $_;
+            $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
+            $fileobj->{transaction_id} = 0;
+        }
+        $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
     }
     # If bucket didn't fit into list, split into a new index level
-    # split_index() will do the _request_space() call
+    # split_index() will do the _fileobj->request_space() call
     else {
-        $location = $self->split_index( $obj, $md5, $tag );
+        $location = $self->split_index( $md5, $tag );
     }
 
-    $self->write_value( $obj, $location, $plain_key, $value );
+    $self->write_value( $location, $plain_key, $value, $orig_key );
 
     return $result;
 }
 
 sub write_value {
     my $self = shift;
-    my ($obj, $location, $key, $value) = @_;
-
-    my $fh = $obj->_fh;
-    my $root = $obj->_root;
-
-    my $is_dbm_deep = eval {
-        local $SIG{'__DIE__'};
-        $value->isa( 'DBM::Deep' );
-    };
+    my ($location, $key, $value, $orig_key) = @_;
 
-    my $is_internal_ref = $is_dbm_deep && ($value->_root eq $root);
+    my $fileobj = $self->_fileobj;
 
-    seek($fh, $location + $root->{file_offset}, SEEK_SET);
+    my $dbm_deep_obj = _get_dbm_object( $value );
+    if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
+        $self->_throw_error( "Cannot cross-reference. Use export() instead" );
+    }
 
     ##
     # Write signature based on content type, set content length and write
     # actual value.
     ##
-    my $r = Scalar::Util::reftype($value) || '';
-    if ( $is_internal_ref ) {
-        $self->write_tag( $obj, undef, SIG_INTERNAL,pack($self->{long_pack}, $value->_base_offset) );
+    my $r = Scalar::Util::reftype( $value ) || '';
+    if ( $dbm_deep_obj ) {
+        $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
     }
     elsif ($r eq 'HASH') {
-        if ( tied( %{$value} ) ) {
-            $obj->_throw_error( "Cannot store something that is tied" );
+        if ( !$dbm_deep_obj && tied %{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
         }
-        $self->write_tag( $obj, undef, SIG_HASH, chr(0)x$self->{index_size} );
+        $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
     }
     elsif ($r eq 'ARRAY') {
-        if ( tied( @{$value} ) ) {
-            $obj->_throw_error( "Cannot store something that is tied" );
+        if ( !$dbm_deep_obj && tied @{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
         }
-        $self->write_tag( $obj, undef, SIG_ARRAY, chr(0)x$self->{index_size} );
+        $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
     }
     elsif (!defined($value)) {
-        $self->write_tag( $obj, undef, SIG_NULL, '' );
+        $self->write_tag( $location, SIG_NULL, '' );
     }
     else {
-        $self->write_tag( $obj, undef, SIG_DATA, $value );
+        $self->write_tag( $location, SIG_DATA, $value );
     }
 
     ##
     # Plain key is stored AFTER value, as keys are typically fetched less often.
     ##
-    print( $fh pack($self->{data_pack}, length($key)) . $key );
+    $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
 
     # Internal references don't care about autobless
-    return 1 if $is_internal_ref;
+    return 1 if $dbm_deep_obj;
 
     ##
     # If value is blessed, preserve class name
     ##
-    if ( $root->{autobless} ) {
-        my $value_class = Scalar::Util::blessed($value);
-        if ( defined $value_class && !$is_dbm_deep ) {
-            print( $fh chr(1) );
-            print( $fh pack($self->{data_pack}, length($value_class)) . $value_class );
+    if ( $fileobj->{autobless} ) {
+        if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
+            $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
         }
         else {
-            print( $fh chr(0) );
+            $fileobj->print_at( undef, chr(0) );
         }
     }
 
     ##
-    # If content is a hash or array, create new child DBM::Deep object and
-    # pass each key or element to it.
+    # Tie the passed in reference so that changes to it are reflected in the
+    # datafile. The use of $location as the base_offset will act as the
+    # the linkage between parent and child.
+    #
+    # The overall assignment is a hack around the fact that just tying doesn't
+    # store the values. This may not be the wrong thing to do.
     ##
-    if ( !$is_internal_ref ) {
-        if ($r eq 'HASH') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_HASH,
-                base_offset => $location,
-                root => $root,
-            );
-            foreach my $key (keys %{$value}) {
-                $branch->STORE( $key, $value->{$key} );
-            }
-        }
-        elsif ($r eq 'ARRAY') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_ARRAY,
-                base_offset => $location,
-                root => $root,
-            );
-            my $index = 0;
-            foreach my $element (@{$value}) {
-                $branch->STORE( $index, $element );
-                $index++;
-            }
-        }
+    if ($r eq 'HASH') {
+        my %x = %$value;
+        tie %$value, 'DBM::Deep', {
+            base_offset => $location,
+            fileobj     => $fileobj,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        %$value = %x;
+    }
+    elsif ($r eq 'ARRAY') {
+        my @x = @$value;
+        tie @$value, 'DBM::Deep', {
+            base_offset => $location,
+            fileobj     => $fileobj,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        @$value = @x;
     }
 
     return 1;
@@ -492,33 +544,34 @@ sub write_value {
 
 sub split_index {
     my $self = shift;
-    my ($obj, $md5, $tag) = @_;
+    my ($md5, $tag) = @_;
 
-    my $fh = $obj->_fh;
-    my $root = $obj->_root;
+    my $fileobj = $self->_fileobj;
 
-    my $loc = $self->_request_space(
-        $obj, $self->tag_size( $self->{index_size} ),
+    my $loc = $fileobj->request_space(
+        $self->tag_size( $self->{index_size} ),
     );
 
-    seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
-    print( $fh pack($self->{long_pack}, $loc) );
+    $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
 
     my $index_tag = $self->write_tag(
-        $obj, $loc, SIG_INDEX,
+        $loc, SIG_INDEX,
         chr(0)x$self->{index_size},
     );
 
-    my $newtag_loc = $self->_request_space(
-        $obj, $self->tag_size( $self->{bucket_list_size} ),
+    my $newtag_loc = $fileobj->request_space(
+        $self->tag_size( $self->{bucket_list_size} ),
     );
 
     my $keys = $tag->{content}
              . $md5 . pack($self->{long_pack}, $newtag_loc)
-                    . pack($self->{long_pack}, 0);
+                    . pack($self->{long_pack}, 0)  # size
+                    . pack($self->{long_pack}, 0); # transaction ID
 
     my @newloc = ();
     BUCKET:
+    # The <= here is deliberate - we have max_buckets+1 keys to iterate
+    # through, unlike every other loop that uses max_buckets as a stop.
     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
 
@@ -528,42 +581,42 @@ sub split_index {
         my $num = ord(substr($key, $tag->{ch} + 1, 1));
 
         if ($newloc[$num]) {
-            seek($fh, $newloc[$num] + $root->{file_offset}, SEEK_SET);
-            my $subkeys;
-            read( $fh, $subkeys, $self->{bucket_list_size});
+            my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
 
             # This is looking for the first empty spot
             my ($subloc, $offset, $size) = $self->_find_in_buckets(
                 { content => $subkeys }, '',
             );
 
-            seek($fh, $newloc[$num] + $offset + $root->{file_offset}, SEEK_SET);
-            print( $fh $key . pack($self->{long_pack}, $old_subloc) );
+            $fileobj->print_at(
+                $newloc[$num] + $offset,
+                $key, pack($self->{long_pack}, $old_subloc),
+            );
 
             next;
         }
 
-        seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
-
-        my $loc = $self->_request_space(
-            $obj, $self->tag_size( $self->{bucket_list_size} ),
+        my $loc = $fileobj->request_space(
+            $self->tag_size( $self->{bucket_list_size} ),
         );
 
-        print( $fh pack($self->{long_pack}, $loc) );
+        $fileobj->print_at(
+            $index_tag->{offset} + ($num * $self->{long_size}),
+            pack($self->{long_pack}, $loc),
+        );
 
         my $blist_tag = $self->write_tag(
-            $obj, $loc, SIG_BLIST,
+            $loc, SIG_BLIST,
             chr(0)x$self->{bucket_list_size},
         );
 
-        seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
-        print( $fh $key . pack($self->{long_pack}, $old_subloc) );
+        $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
 
         $newloc[$num] = $blist_tag->{offset};
     }
 
     $self->_release_space(
-        $obj, $self->tag_size( $self->{bucket_list_size} ),
+        $self->tag_size( $self->{bucket_list_size} ),
         $tag->{offset} - SIG_SIZE - $self->{data_size},
     );
 
@@ -572,64 +625,59 @@ sub split_index {
 
 sub read_from_loc {
     my $self = shift;
-    my ($obj, $subloc) = @_;
+    my ($subloc, $orig_key) = @_;
 
-    my $fh = $obj->_fh;
+    my $fileobj = $self->_fileobj;
 
-    ##
-    # Found match -- seek to offset and read signature
-    ##
-    my $signature;
-    seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
-    read( $fh, $signature, SIG_SIZE);
+    my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
 
     ##
     # If value is a hash or array, return new DBM::Deep object with correct offset
     ##
     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
-        my $obj = DBM::Deep->new(
-            type => $signature,
+        my $new_obj = DBM::Deep->new({
+            type        => $signature,
             base_offset => $subloc,
-            root => $obj->_root,
-        );
+            fileobj     => $self->_fileobj,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        });
 
-        if ($obj->_root->{autobless}) {
+        if ($new_obj->_fileobj->{autobless}) {
             ##
             # Skip over value and plain key to see if object needs
             # to be re-blessed
             ##
-            seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
+            $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
 
-            my $size;
-            read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
-            if ($size) { seek($fh, $size, SEEK_CUR); }
+            my $size = $fileobj->read_at( undef, $self->{data_size} );
+            $size = unpack($self->{data_pack}, $size);
+            if ($size) { $fileobj->increment_pointer( $size ); }
 
-            my $bless_bit;
-            read( $fh, $bless_bit, 1);
+            my $bless_bit = $fileobj->read_at( undef, 1 );
             if (ord($bless_bit)) {
                 ##
                 # Yes, object needs to be re-blessed
                 ##
+                my $size = $fileobj->read_at( undef, $self->{data_size} );
+                $size = unpack($self->{data_pack}, $size);
+
                 my $class_name;
-                read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
-                if ($size) { read( $fh, $class_name, $size); }
-                if ($class_name) { $obj = bless( $obj, $class_name ); }
+                if ($size) { $class_name = $fileobj->read_at( undef, $size ); }
+                if (defined $class_name) { $new_obj = bless( $new_obj, $class_name ); }
             }
         }
 
-        return $obj;
+        return $new_obj;
     }
     elsif ( $signature eq SIG_INTERNAL ) {
-        my $size;
-        read( $fh, $size, $self->{data_size});
+        my $size = $fileobj->read_at( undef, $self->{data_size} );
         $size = unpack($self->{data_pack}, $size);
 
         if ( $size ) {
-            my $new_loc;
-            read( $fh, $new_loc, $size );
-            $new_loc = unpack( $self->{long_pack}, $new_loc );
-
-            return $self->read_from_loc( $obj, $new_loc );
+            my $new_loc = $fileobj->read_at( undef, $size );
+            $new_loc = unpack( $self->{long_pack}, $new_loc ); 
+            return $self->read_from_loc( $new_loc, $orig_key );
         }
         else {
             return;
@@ -638,13 +686,12 @@ sub read_from_loc {
     ##
     # Otherwise return actual value
     ##
-    elsif ($signature eq SIG_DATA) {
-        my $size;
-        read( $fh, $size, $self->{data_size});
+    elsif ( $signature eq SIG_DATA ) {
+        my $size = $fileobj->read_at( undef, $self->{data_size} );
         $size = unpack($self->{data_pack}, $size);
 
         my $value = '';
-        if ($size) { read( $fh, $value, $size); }
+        if ($size) { $value = $fileobj->read_at( undef, $size ); }
         return $value;
     }
 
@@ -659,12 +706,20 @@ sub get_bucket_value {
     # Fetch single value given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
-    if ( $subloc ) {
-        return $self->read_from_loc( $obj, $subloc );
+    #ACID - This is a read. Can find exact or HEAD
+    my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
+
+    if ( !$subloc ) {
+        #XXX Need to use real key
+#        $self->add_bucket( $tag, $md5, $orig_key, undef, undef, $orig_key );
+#        return;
     }
+    elsif ( !$is_deleted ) {
+        return $self->read_from_loc( $subloc, $orig_key );
+    }
+
     return;
 }
 
@@ -673,19 +728,44 @@ sub delete_bucket {
     # Delete single key/value pair given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
-#XXX This needs _release_space()
-    if ( $subloc ) {
-        my $fh = $obj->_fh;
-        seek($fh, $tag->{offset} + $offset + $obj->_root->{file_offset}, SEEK_SET);
-        print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
-        print( $fh chr(0) x $self->{bucket_size} );
+    #ACID - Although this is a mutation, we must find any transaction.
+    # This is because we need to mark something as deleted that is in the HEAD.
+    my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
+
+    return if !$subloc;
+
+    my $fileobj = $self->_fileobj;
 
-        return 1;
+    my @transactions;
+    if ( $fileobj->transaction_id == 0 ) {
+        @transactions = $fileobj->current_transactions;
     }
-    return;
+
+    if ( $fileobj->transaction_id == 0 ) {
+        my $value = $self->read_from_loc( $subloc, $orig_key );
+
+        for (@transactions) {
+            $fileobj->{transaction_id} = $_;
+            #XXX Need to use real key
+            $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
+            $fileobj->{transaction_id} = 0;
+        }
+        $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
+
+        #XXX This needs _release_space() for the value and anything below
+        $fileobj->print_at(
+            $tag->{offset} + $offset,
+            substr( $tag->{content}, $offset + $self->{bucket_size} ),
+            chr(0) x $self->{bucket_size},
+        );
+    }
+    else {
+        $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
+    }
+
+    return 1;
 }
 
 sub bucket_exists {
@@ -693,10 +773,11 @@ sub bucket_exists {
     # Check existence of single key given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5) = @_;
 
-    my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
-    return $subloc && 1;
+    #ACID - This is a read. Can find exact or HEAD
+    my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
+    return ($subloc && !$is_deleted) && 1;
 }
 
 sub find_bucket_list {
@@ -704,35 +785,35 @@ sub find_bucket_list {
     # Locate offset for bucket list, given digested key
     ##
     my $self = shift;
-    my ($obj, $md5, $args) = @_;
+    my ($offset, $md5, $args) = @_;
     $args = {} unless $args;
 
+    local($/,$\);
+
     ##
     # Locate offset for bucket list using digest index system
     ##
-    my $tag = $self->load_tag($obj, $obj->_base_offset)
-        or $obj->_throw_error( "INTERNAL ERROR - Cannot find tag" );
+    my $tag = $self->load_tag( $offset )
+        or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
 
     my $ch = 0;
     while ($tag->{signature} ne SIG_BLIST) {
         my $num = ord substr($md5, $ch, 1);
 
         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
-        $tag = $self->index_lookup( $obj, $tag, $num );
+        $tag = $self->index_lookup( $tag, $num );
 
         if (!$tag) {
             return if !$args->{create};
 
-            my $loc = $self->_request_space(
-                $obj, $self->tag_size( $self->{bucket_list_size} ),
+            my $loc = $self->_fileobj->request_space(
+                $self->tag_size( $self->{bucket_list_size} ),
             );
 
-            my $fh = $obj->_fh;
-            seek($fh, $ref_loc + $obj->_root->{file_offset}, SEEK_SET);
-            print( $fh pack($self->{long_pack}, $loc) );
+            $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
 
             $tag = $self->write_tag(
-                $obj, $loc, SIG_BLIST,
+                $loc, SIG_BLIST,
                 chr(0)x$self->{bucket_list_size},
             );
 
@@ -754,7 +835,7 @@ sub index_lookup {
     # Given index tag, lookup single entry in index and return .
     ##
     my $self = shift;
-    my ($obj, $tag, $index) = @_;
+    my ($tag, $index) = @_;
 
     my $location = unpack(
         $self->{long_pack},
@@ -767,7 +848,7 @@ sub index_lookup {
 
     if (!$location) { return; }
 
-    return $self->load_tag( $obj, $location );
+    return $self->load_tag( $location );
 }
 
 sub traverse_index {
@@ -777,9 +858,7 @@ sub traverse_index {
     my $self = shift;
     my ($obj, $offset, $ch, $force_return_next) = @_;
 
-    my $tag = $self->load_tag($obj, $offset );
-
-    my $fh = $obj->_fh;
+    my $tag = $self->load_tag( $offset );
 
     if ($tag->{signature} ne SIG_BLIST) {
         my $content = $tag->{content};
@@ -805,8 +884,8 @@ sub traverse_index {
         } # index loop
 
         $obj->{return_next} = 1;
-    } # tag is an index
-
+    }
+    # This is the bucket list
     else {
         my $keys = $tag->{content};
         if ($force_return_next) { $obj->{return_next} = 1; }
@@ -814,8 +893,14 @@ sub traverse_index {
         ##
         # Iterate through buckets, looking for a key match
         ##
+        my $transaction_id = $self->_fileobj->transaction_id;
         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
-            my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
+            my ($key, $subloc, $size, $trans_id, $is_deleted) = $self->_get_key_subloc( $keys, $i );
+
+            next if $is_deleted;
+#XXX Need to find all the copies of this key to find out if $transaction_id has it
+#XXX marked as deleted, in use, or what.
+            next if $trans_id && $trans_id != $transaction_id;
 
             # End of bucket list -- return to outer loop
             if (!$subloc) {
@@ -829,29 +914,27 @@ sub traverse_index {
             }
             # Seek to bucket location and skip over signature
             elsif ($obj->{return_next}) {
-                seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
+                my $fileobj = $self->_fileobj;
 
                 # Skip over value to get to plain key
-                my $sig;
-                read( $fh, $sig, SIG_SIZE );
+                my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
 
-                my $size;
-                read( $fh, $size, $self->{data_size});
+                my $size = $fileobj->read_at( undef, $self->{data_size} );
                 $size = unpack($self->{data_pack}, $size);
-                if ($size) { seek($fh, $size, SEEK_CUR); }
+                if ($size) { $fileobj->increment_pointer( $size ); }
 
                 # Read in plain key and return as scalar
-                my $plain_key;
-                read( $fh, $size, $self->{data_size});
+                $size = $fileobj->read_at( undef, $self->{data_size} );
                 $size = unpack($self->{data_pack}, $size);
-                if ($size) { read( $fh, $plain_key, $size); }
+                my $plain_key;
+                if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
 
                 return $plain_key;
             }
         }
 
         $obj->{return_next} = 1;
-    } # tag is a bucket list
+    }
 
     return;
 }
@@ -884,8 +967,10 @@ sub _get_key_subloc {
     my $self = shift;
     my ($keys, $idx) = @_;
 
-    my ($key, $subloc, $size) = unpack(
-        "a$self->{hash_size} $self->{long_pack} $self->{long_pack}",
+    my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
+        # This is 'a', not 'A'. Please read the pack() documentation for the
+        # difference between the two and why it's important.
+        "a$self->{hash_size} $self->{long_pack}2 n2",
         substr(
             $keys,
             ($idx * $self->{bucket_size}),
@@ -893,89 +978,64 @@ sub _get_key_subloc {
         ),
     );
 
-    return ($key, $subloc, $size);
+    return ($key, $subloc, $size, $transaction_id, $is_deleted);
 }
 
 sub _find_in_buckets {
     my $self = shift;
-    my ($tag, $md5) = @_;
+    my ($tag, $md5, $exact) = @_;
+    $exact ||= 0;
+
+    my $trans_id = $self->_fileobj->transaction_id;
+
+    my @zero;
 
     BUCKET:
     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
-        my ($key, $subloc, $size) = $self->_get_key_subloc(
+        my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
             $tag->{content}, $i,
         );
 
-        return ($subloc, $i * $self->{bucket_size}, $size) unless $subloc;
+        my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
 
-        next BUCKET if $key ne $md5;
+        unless ( $subloc ) {
+            if ( !$exact && @zero && $trans_id ) {
+                @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
+            }
+            return @rv;
+        }
 
-        return ($subloc, $i * $self->{bucket_size}, $size);
-    }
+        next BUCKET if $key ne $md5;
 
-    return;
-}
+        # Save off the HEAD in case we need it.
+        @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
 
-#sub _print_at {
-#    my $self = shift;
-#    my ($obj, $spot, $data) = @_;
-#
-#    my $fh = $obj->_fh;
-#    seek( $fh, $spot, SEEK_SET );
-#    print( $fh $data );
-#
-#    return;
-#}
-
-sub _request_space {
-    my $self = shift;
-    my ($obj, $size) = @_;
+        next BUCKET if $transaction_id != $trans_id;
 
-    my $loc = $obj->_root->{end};
-    $obj->_root->{end} += $size;
+        return @rv;
+    }
 
-    return $loc;
+    return;
 }
 
 sub _release_space {
     my $self = shift;
-    my ($obj, $size, $loc) = @_;
+    my ($size, $loc) = @_;
 
     my $next_loc = 0;
 
-    my $fh = $obj->_fh;
-    seek( $fh, $loc + $obj->_root->{file_offset}, SEEK_SET );
-    print( $fh SIG_FREE
-        . pack($self->{long_pack}, $size )
-        . pack($self->{long_pack}, $next_loc )
+    $self->_fileobj->print_at( $loc,
+        SIG_FREE, 
+        pack($self->{long_pack}, $size ),
+        pack($self->{long_pack}, $next_loc ),
     );
 
     return;
 }
 
+sub _throw_error {
+    die "DBM::Deep: $_[1]\n";
+}
+
 1;
 __END__
-
-# This will be added in later, after more refactoring is done. This is an early
-# attempt at refactoring on the physical level instead of the virtual level.
-sub _read_at {
-    my $self = shift;
-    my ($obj, $spot, $amount, $unpack) = @_;
-
-    my $fh = $obj->_fh;
-    seek( $fh, $spot + $obj->_root->{file_offset}, SEEK_SET );
-
-    my $buffer;
-    my $bytes_read = read( $fh, $buffer, $amount );
-
-    if ( $unpack ) {
-        $buffer = unpack( $unpack, $buffer );
-    }
-
-    if ( wantarray ) {
-        return ($buffer, $bytes_read);
-    }
-    else {
-        return $buffer;
-    }
-}