r14186@rob-kinyons-powerbook58: rob | 2006-06-14 11:44:48 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
index d49e4e4..73917a4 100644 (file)
@@ -1,13 +1,31 @@
 package DBM::Deep::Engine;
 
+use 5.6.0;
+
 use strict;
+use warnings;
+
+our $VERSION = q(0.99_03);
+
+use Fcntl qw( :DEFAULT :flock );
+use Scalar::Util ();
 
-use Fcntl qw( :DEFAULT :flock :seek );
+# File-wide notes:
+# * To add to bucket_size, make sure you modify the following:
+#   - calculate_sizes()
+#   - _get_key_subloc()
+#   - add_bucket() - where the buckets are printed
+#
+# * Every method in here assumes that the _storage has been appropriately
+#   safeguarded. This can be anything from flock() to some sort of manual
+#   mutex. But, it's the caller's responsability to make sure that this has
+#   been done.
 
 ##
 # Setup file and tag signatures.  These should never change.
 ##
 sub SIG_FILE     () { 'DPDB' }
+sub SIG_HEADER   () { 'h'    }
 sub SIG_INTERNAL () { 'i'    }
 sub SIG_HASH     () { 'H'    }
 sub SIG_ARRAY    () { 'A'    }
@@ -15,229 +33,322 @@ sub SIG_NULL     () { 'N'    }
 sub SIG_DATA     () { 'D'    }
 sub SIG_INDEX    () { 'I'    }
 sub SIG_BLIST    () { 'B'    }
+sub SIG_FREE     () { 'F'    }
+sub SIG_KEYS     () { 'K'    }
 sub SIG_SIZE     () {  1     }
 
-sub precalc_sizes {
-    ##
-    # Precalculate index, bucket and bucket list sizes
-    ##
-    my $self = shift;
+# This is the transaction ID for the HEAD
+sub HEAD () { 0 }
 
-    $self->{index_size}       = (2**8) * $self->{long_size};
-    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size};
-    $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
+################################################################################
+#
+# This is new code. It is a complete rewrite of the engine based on a new API
+#
+################################################################################
 
-    return 1;
+sub read_value {
+    my $self = shift;
+    my ($offset, $key, $orig_key) = @_;
+
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key ) or return;
+    return $self->get_bucket_value( $tag, $dig_key, $orig_key );
 }
 
-sub set_pack {
-    ##
-    # Set pack/unpack modes (see file header for more)
-    ##
+sub key_exists {
     my $self = shift;
-    my ($long_s, $long_p, $data_s, $data_p) = @_;
+    my ($offset, $key) = @_;
 
-    ##
-    # Set to 4 and 'N' for 32-bit offset tags (default).  Theoretical limit of 4
-    # GB per file.
-    #    (Perl must be compiled with largefile support for files > 2 GB)
-    #
-    # Set to 8 and 'Q' for 64-bit offsets.  Theoretical limit of 16 XB per file.
-    #    (Perl must be compiled with largefile and 64-bit long support)
-    ##
-    $self->{long_size} = $long_s ? $long_s : 4;
-    $self->{long_pack} = $long_p ? $long_p : 'N';
+    my $dig_key = $self->apply_digest( $key );
+    # exists() returns the empty string, not undef
+    my $tag = $self->find_blist( $offset, $dig_key ) or return '';
+    return $self->bucket_exists( $tag, $dig_key, $key );
+}
 
-    ##
-    # Set to 4 and 'N' for 32-bit data length prefixes.  Limit of 4 GB for each
-    # key/value. Upgrading this is possible (see above) but probably not
-    # necessary. If you need more than 4 GB for a single key or value, this
-    # module is really not for you :-)
-    ##
-    $self->{data_size} = $data_s ? $data_s : 4;
-    $self->{data_pack} = $data_p ? $data_p : 'N';
+sub get_next_key {
+    my $self = shift;
+    my ($offset) = @_;
+
+    # If the previous key was not specifed, start at the top and
+    # return the first one found.
+    my $temp;
+    if ( @_ > 1 ) {
+        $temp = {
+            prev_md5    => $self->apply_digest($_[1]),
+            return_next => 0,
+        };
+    }
+    else {
+        $temp = {
+            prev_md5    => chr(0) x $self->{hash_size},
+            return_next => 1,
+        };
+    }
 
-    return $self->precalc_sizes();
+    return $self->traverse_index( $temp, $offset, 0 );
 }
 
-sub set_digest {
-    ##
-    # Set key digest function (default is MD5)
-    ##
+sub delete_key {
     my $self = shift;
-    my ($digest_func, $hash_size) = @_;
+    my ($offset, $key, $orig_key) = @_;
 
-    $self->{digest} = $digest_func ? $digest_func : \&Digest::MD5::md5;
-    $self->{hash_size} = $hash_size ? $hash_size : 16;
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key ) or return;
+    my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
+    $self->delete_bucket( $tag, $dig_key, $orig_key );
+    return $value;
+}
 
-    return $self->precalc_sizes();
+sub write_value {
+    my $self = shift;
+    my ($offset, $key, $value, $orig_key) = @_;
+
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
+    return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
 }
 
+################################################################################
+#
+# Below here is the old code. It will be folded into the code above as it can.
+#
+################################################################################
+
 sub new {
     my $class = shift;
     my ($args) = @_;
 
     my $self = bless {
-        long_size   => 4,
-        long_pack   => 'N',
-        data_size   => 4,
-        data_pack   => 'N',
+        long_size => 4,
+        long_pack => 'N',
+        data_size => 4,
+        data_pack => 'N',
 
-        digest      => \&Digest::MD5::md5,
-        hash_size   => 16,
+        digest    => \&Digest::MD5::md5,
+        hash_size => 16, # In bytes
 
         ##
-        # Maximum number of buckets per list before another level of indexing is
-        # done.
-        # Increase this value for slightly greater speed, but larger database
+        # Number of buckets per blist before another level of indexing is
+        # done. Increase this value for slightly greater speed, but larger database
         # files. DO NOT decrease this value below 16, due to risk of recursive
         # reindex overrun.
         ##
         max_buckets => 16,
-    }, $class;
 
-    $self->precalc_sizes;
+        storage => undef,
+        obj     => undef,
+    }, $class;
 
-    return $self;
-}
+    if ( defined $args->{pack_size} ) {
+        if ( lc $args->{pack_size} eq 'small' ) {
+            $args->{long_size} = 2;
+            $args->{long_pack} = 'n';
+        }
+        elsif ( lc $args->{pack_size} eq 'medium' ) {
+            $args->{long_size} = 4;
+            $args->{long_pack} = 'N';
+        }
+        elsif ( lc $args->{pack_size} eq 'large' ) {
+            $args->{long_size} = 8;
+            $args->{long_pack} = 'Q';
+        }
+        else {
+            die "Unknown pack_size value: '$args->{pack_size}'\n";
+        }
+    }
 
-sub setup_fh {
-    my $self = shift;
-    my ($obj) = @_;
+    # Grab the parameters we want to use
+    foreach my $param ( keys %$self ) {
+        next unless exists $args->{$param};
+        $self->{$param} = $args->{$param};
+    }
+    Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
 
-    $self->open( $obj ) if !defined $obj->_fh;
+    if ( $self->{max_buckets} < 16 ) {
+        warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
+        $self->{max_buckets} = 16;
+    }
 
-    $obj->{base_offset} = length( SIG_FILE )
-        unless defined $obj->{base_offset};
+    return $self;
+}
 
-    #XXX We have to make sure we don't mess up when autoflush isn't turned on
-    unless ( $obj->_root->{inode} ) {
-        my @stats = stat($obj->_fh);
-        $obj->_root->{inode} = $stats[1];
-        $obj->_root->{end} = $stats[7];
-    }
+sub _storage { return $_[0]{storage} }
 
-    return 1;
+sub apply_digest {
+    my $self = shift;
+    return $self->{digest}->(@_);
 }
 
-sub open {
-    ##
-    # Open a fh to the database, create if nonexistent.
-    # Make sure file signature matches DBM::Deep spec.
-    ##
+sub calculate_sizes {
     my $self = shift;
-    my ($obj) = @_;
 
-    if (defined($obj->_fh)) { $self->close_fh( $obj ); }
+    # The 2**8 here indicates the number of different characters in the
+    # current hashing algorithm
+    #XXX Does this need to be updated with different hashing algorithms?
+    $self->{hash_chars_used}  = (2**8);
+    $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
 
-    # Theoretically, adding O_BINARY should remove the need for the binmode
-    # Of course, testing it is going to be ... interesting.
-    my $flags = O_RDWR | O_CREAT | O_BINARY;
+    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
+    $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
 
-    my $fh;
-    my $filename = $obj->_root->{file};
-    sysopen( $fh, $filename, $flags )
-        or $obj->_throw_error("Cannot sysopen file '$filename': $!");
-    $obj->_root->{fh} = $fh;
+    $self->{key_size}         = $self->{long_size} * 2;
+    $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
 
-    #XXX Can we remove this by using the right sysopen() flags?
-    # Maybe ... q.v. above
-    binmode $fh; # for win32
+    return;
+}
 
-    if ($obj->_root->{autoflush}) {
-        my $old = select $fh;
-        $|=1;
-        select $old;
-    }
+sub write_file_header {
+    my $self = shift;
 
-    seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
-    my $signature;
-    my $bytes_read = read( $fh, $signature, length(SIG_FILE));
+    my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
+
+    $self->_storage->print_at( $loc,
+        SIG_FILE,
+        SIG_HEADER,
+        pack('N', 1),  # header version
+        pack('N', 24), # header size
+        pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
+        pack('n', $self->{long_size}),
+        pack('A', $self->{long_pack}),
+        pack('n', $self->{data_size}),
+        pack('A', $self->{data_pack}),
+        pack('n', $self->{max_buckets}),
+    );
 
-    ##
-    # File is empty -- write signature and master index
-    ##
-    if (!$bytes_read) {
-        seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
-        print( $fh SIG_FILE);
+    $self->_storage->set_transaction_offset( 13 );
 
-        $obj->_root->{end} = length( SIG_FILE );
+    return;
+}
 
-        $obj->{base_offset} = $self->_request_space($obj, $self->{index_size});
+sub read_file_header {
+    my $self = shift;
 
-        $self->create_tag(
-            $obj, $obj->_base_offset, $obj->_type, chr(0) x $self->{index_size},
-        );
+    my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
+    return unless length($buffer);
 
-        # Flush the filehandle
-        my $old_fh = select $fh;
-        my $old_af = $|; $| = 1; $| = $old_af;
-        select $old_fh;
+    my ($file_signature, $sig_header, $header_version, $size) = unpack(
+        'A4 A N N', $buffer
+    );
 
-        return 1;
+    unless ( $file_signature eq SIG_FILE ) {
+        $self->_storage->close;
+        $self->_throw_error( "Signature not found -- file is not a Deep DB" );
     }
 
-    $obj->{base_offset} = $bytes_read
-        unless defined $obj->{base_offset};
-
-    ##
-    # Check signature was valid
-    ##
-    unless ($signature eq SIG_FILE) {
-        $self->close_fh( $obj );
-        $obj->_throw_error("Signature not found -- file is not a Deep DB");
+    unless ( $sig_header eq SIG_HEADER ) {
+        $self->_storage->close;
+        $self->_throw_error( "Old file version found." );
     }
 
-    ##
-    # Get our type from master index signature
-    ##
-    my $tag = $self->load_tag($obj, $obj->_base_offset)
-        or $obj->_throw_error("Corrupted file, no master index record");
+    my $buffer2 = $self->_storage->read_at( undef, $size );
+    my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
+
+    $self->_storage->set_transaction_offset( 13 );
 
-    unless ($obj->{type} eq $tag->{signature}) {
-        $obj->_throw_error("File type mismatch");
+    if ( @values < 5 || grep { !defined } @values ) {
+        $self->_storage->close;
+        $self->_throw_error("Corrupted file - bad header");
     }
 
-#XXX We probably also want to store the hash algorithm name and not assume anything
-#XXX The cool thing would be to allow a different hashing algorithm at every level
+    #XXX Add warnings if values weren't set right
+    @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
 
-    return 1;
+    return length($buffer) + length($buffer2);
 }
 
-sub close_fh {
+sub setup_fh {
     my $self = shift;
     my ($obj) = @_;
 
-    if ( my $fh = $obj->_root->{fh} ) {
-        close $fh;
+    # Need to remove use of $fh here
+    my $fh = $self->_storage->{fh};
+    flock $fh, LOCK_EX;
+
+    #XXX The duplication of calculate_sizes needs to go away
+    unless ( $obj->{base_offset} ) {
+        my $bytes_read = $self->read_file_header;
+
+        $self->calculate_sizes;
+
+        ##
+        # File is empty -- write header and master index
+        ##
+        if (!$bytes_read) {
+            $self->_storage->audit( "# Database created on" );
+
+            $self->write_file_header;
+
+            $obj->{base_offset} = $self->_storage->request_space(
+                $self->tag_size( $self->{index_size} ),
+            );
+
+            $self->write_tag(
+                $obj->_base_offset, $obj->_type,
+                chr(0)x$self->{index_size},
+            );
+
+            # Flush the filehandle
+            my $old_fh = select $fh;
+            my $old_af = $|; $| = 1; $| = $old_af;
+            select $old_fh;
+        }
+        else {
+            $obj->{base_offset} = $bytes_read;
+
+            ##
+            # Get our type from master index header
+            ##
+            my $tag = $self->load_tag($obj->_base_offset);
+            unless ( $tag ) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("Corrupted file, no master index record");
+            }
+
+            unless ($obj->_type eq $tag->{signature}) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("File type mismatch");
+            }
+        }
+    }
+    else {
+        $self->calculate_sizes;
     }
-    $obj->_root->{fh} = undef;
+
+    #XXX We have to make sure we don't mess up when autoflush isn't turned on
+    $self->_storage->set_inode;
+
+    flock $fh, LOCK_UN;
 
     return 1;
 }
 
-sub create_tag {
+sub tag_size {
+    my $self = shift;
+    my ($size) = @_;
+    return SIG_SIZE + $self->{data_size} + $size;
+}
+
+sub write_tag {
     ##
     # Given offset, signature and content, create tag and write to disk
     ##
     my $self = shift;
-    my ($obj, $offset, $sig, $content) = @_;
-    my $size = length($content);
+    my ($offset, $sig, $content) = @_;
+    my $size = length( $content );
 
-    my $fh = $obj->_fh;
+    $self->_storage->print_at(
+        $offset, 
+        $sig, pack($self->{data_pack}, $size), $content,
+    );
 
-    seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
-    print( $fh $sig . pack($self->{data_pack}, $size) . $content );
-
-    if ($offset == $obj->_root->{end}) {
-        $obj->_root->{end} += SIG_SIZE + $self->{data_size} + $size;
-    }
+    return unless defined $offset;
 
     return {
         signature => $sig,
-        size => $size,
-        offset => $offset + SIG_SIZE + $self->{data_size},
-        content => $content
+        #XXX Is this even used?
+        size      => $size,
+        offset    => $offset + SIG_SIZE + $self->{data_size},
+        content   => $content
     };
 }
 
@@ -246,30 +357,41 @@ sub load_tag {
     # Given offset, load single tag and return signature, size and data
     ##
     my $self = shift;
-    my ($obj, $offset) = @_;
+    my ($offset) = @_;
 
-#    print join(':',map{$_||''}caller(1)), $/;
+    my $storage = $self->_storage;
 
-    my $fh = $obj->_fh;
+    my ($sig, $size) = unpack(
+        "A $self->{data_pack}",
+        $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
+    );
 
-    seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
+    return {
+        signature => $sig,
+        size      => $size,   #XXX Is this even used?
+        start     => $offset,
+        offset    => $offset + SIG_SIZE + $self->{data_size},
+        content   => $storage->read_at( undef, $size ),
+    };
+}
 
-    #XXX I'm not sure this check will work if autoflush isn't enabled ...
-    return if eof $fh;
+sub find_keyloc {
+    my $self = shift;
+    my ($tag, $transaction_id) = @_;
+    $transaction_id = $self->_storage->transaction_id
+        unless defined $transaction_id;
 
-    my $b;
-    read( $fh, $b, SIG_SIZE + $self->{data_size} );
-    my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
+    for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
+        my ($loc, $trans_id, $is_deleted) = unpack(
+            "$self->{long_pack} C C",
+            substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
+        );
 
-    my $buffer;
-    read( $fh, $buffer, $size);
+        next if $loc != HEAD && $transaction_id != $trans_id;
+        return( $loc, $is_deleted, $i * $self->{key_size} );
+    }
 
-    return {
-        signature => $sig,
-        size => $size,
-        offset => $offset + SIG_SIZE + $self->{data_size},
-        content => $buffer
-    };
+    return;
 }
 
 sub add_bucket {
@@ -278,216 +400,192 @@ sub add_bucket {
     # plain (undigested) key and value.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5, $plain_key, $value) = @_;
+    my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
 
     # This verifies that only supported values will be stored.
     {
         my $r = Scalar::Util::reftype( $value );
-        last if !defined $r;
 
+        last if !defined $r;
         last if $r eq 'HASH';
         last if $r eq 'ARRAY';
 
-        $obj->_throw_error(
-            "Storage of variables of type '$r' is not supported."
+        $self->_throw_error(
+            "Storage of references of type '$r' is not supported."
         );
     }
 
-    my $location = 0;
-    my $result = 2;
+    my $storage = $self->_storage;
 
-    my $root = $obj->_root;
+    #ACID - This is a mutation. Must only find the exact transaction
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
 
-    my $is_dbm_deep = eval {
-        local $SIG{'__DIE__'};
-        $value->isa( 'DBM::Deep' );
-    };
-
-    my $internal_ref = $is_dbm_deep && ($value->_root eq $root);
+    my @transactions;
+    if ( $storage->transaction_id == 0 ) {
+        @transactions = $storage->current_transactions;
+    }
 
-    my $fh = $obj->_fh;
+#    $self->_release_space( $size, $subloc );
+#XXX This needs updating to use _release_space
 
-    my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
+    my $location;
+    my $size = $self->_length_needed( $value, $plain_key );
 
     # Updating a known md5
-    if ( $subloc ) {
-        $result = 1;
-
-        ##
-        # If value is a hash, array, or raw value with equal or less size, we
-        # can reuse the same content area of the database.  Otherwise, we have
-        # to create a new content area at the EOF.
-        ##
-        my $actual_length;
-        if ( $internal_ref ) {
-            $actual_length = $self->{long_size};
-        }
-        else {
-            my $r = Scalar::Util::reftype( $value ) || '';
-            if ( $r eq 'HASH' || $r eq 'ARRAY' ) {
-                $actual_length = $self->{index_size};
-
-                # if autobless is enabled, must also take into consideration
-                # the class name, as it is stored along with key/value.
-                if ( $root->{autobless} ) {
-                    my $value_class = Scalar::Util::blessed($value);
-                    if ( defined $value_class && !$value->isa('DBM::Deep') ) {
-                        $actual_length += length($value_class);
-                    }
+    if ( $keyloc ) {
+        my $keytag = $self->load_tag( $keyloc );
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+
+        if ( $subloc && !$is_deleted && @transactions ) {
+            my $old_value = $self->read_from_loc( $subloc, $orig_key );
+            my $old_size = $self->_length_needed( $old_value, $plain_key );
+
+            for my $trans_id ( @transactions ) {
+                my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
+                unless ($loc) {
+                    my $location2 = $storage->request_space( $old_size );
+                    $storage->print_at( $keytag->{offset} + $offset2,
+                        pack($self->{long_pack}, $location2 ),
+                        pack( 'C C', $trans_id, 0 ),
+                    );
+                    $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
                 }
             }
-            else { $actual_length = length($value); }
         }
 
-        seek($fh, $subloc + SIG_SIZE + $root->{file_offset}, SEEK_SET);
-        my $size;
-        read( $fh, $size, $self->{data_size});
-        $size = unpack($self->{data_pack}, $size);
+        $location = $self->_storage->request_space( $size );
+        #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
+        $storage->print_at( $keytag->{offset} + $offset,
+            pack($self->{long_pack}, $location ),
+            pack( 'C C', $storage->transaction_id, 0 ),
+        );
+    }
+    # Adding a new md5
+    else {
+        my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
 
-        if ($actual_length <= $size) {
-            $location = $subloc;
+        # The bucket fit into list
+        if ( defined $offset ) {
+            $storage->print_at( $tag->{offset} + $offset,
+                $md5, pack( $self->{long_pack}, $keyloc ),
+            );
         }
+        # If bucket didn't fit into list, split into a new index level
         else {
-            $location = $root->{end};
-            seek(
-                $fh,
-                $tag->{offset} + $offset + $self->{hash_size} + $root->{file_offset},
-                SEEK_SET,
-            );
-            print( $fh pack($self->{long_pack}, $location) );
+            $self->split_index( $tag, $md5, $keyloc );
         }
-    }
-    # Adding a new md5
-    elsif ( defined $offset ) {
-        $location = $root->{end};
 
-        seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
-        print( $fh $md5 . pack($self->{long_pack}, $location) );
-    }
-    # If bucket didn't fit into list, split into a new index level
-    else {
-        $self->split_index( $obj, $md5, $tag );
+        my $keytag = $self->write_tag(
+            $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
+        );
 
-        $location = $root->{end};
+        $location = $self->_storage->request_space( $size );
+        $storage->print_at( $keytag->{offset},
+            pack( $self->{long_pack}, $location ),
+            pack( 'C C', $storage->transaction_id, 0 ),
+        );
+
+        my $offset = 1;
+        for my $trans_id ( @transactions ) {
+            $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
+                pack( $self->{long_pack}, 0 ),
+                pack( 'C C', $trans_id, 1 ),
+            );
+        }
     }
 
-    $self->write_value( $obj, $location, $plain_key, $value );
+    $self->_write_value( $location, $plain_key, $value, $orig_key );
 
-    return $result;
+    return 1;
 }
 
-sub write_value {
+sub _write_value {
     my $self = shift;
-    my ($obj, $location, $key, $value) = @_;
-
-    my $fh = $obj->_fh;
-    my $root = $obj->_root;
-
-    my $is_dbm_deep = eval {
-        local $SIG{'__DIE__'};
-        $value->isa( 'DBM::Deep' );
-    };
+    my ($location, $key, $value, $orig_key) = @_;
 
-    my $internal_ref = $is_dbm_deep && ($value->_root eq $root);
+    my $storage = $self->_storage;
 
-    seek($fh, $location + $root->{file_offset}, SEEK_SET);
+    my $dbm_deep_obj = _get_dbm_object( $value );
+    if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
+        $self->_throw_error( "Cannot cross-reference. Use export() instead" );
+    }
 
     ##
     # Write signature based on content type, set content length and write
     # actual value.
     ##
-    my $r = Scalar::Util::reftype($value) || '';
-    my $content_length;
-    if ( $internal_ref ) {
-        print( $fh SIG_INTERNAL );
-        print( $fh pack($self->{data_pack}, $self->{long_size}) );
-        print( $fh pack($self->{long_pack}, $value->_base_offset) );
-        $content_length = $self->{long_size};
+    my $r = Scalar::Util::reftype( $value ) || '';
+    if ( $dbm_deep_obj ) {
+        $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
     }
-    else {
-        if ($r eq 'HASH') {
-            print( $fh SIG_HASH );
-            print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
-            $content_length = $self->{index_size};
-        }
-        elsif ($r eq 'ARRAY') {
-            print( $fh SIG_ARRAY );
-            print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
-            $content_length = $self->{index_size};
+    elsif ($r eq 'HASH') {
+        if ( !$dbm_deep_obj && tied %{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
         }
-        elsif (!defined($value)) {
-            print( $fh SIG_NULL );
-            print( $fh pack($self->{data_pack}, 0) );
-            $content_length = 0;
-        }
-        else {
-            print( $fh SIG_DATA );
-            print( $fh pack($self->{data_pack}, length($value)) . $value );
-            $content_length = length($value);
+        $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
+    }
+    elsif ($r eq 'ARRAY') {
+        if ( !$dbm_deep_obj && tied @{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
         }
+        $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
+    }
+    elsif (!defined($value)) {
+        $self->write_tag( $location, SIG_NULL, '' );
+    }
+    else {
+        $self->write_tag( $location, SIG_DATA, $value );
     }
 
     ##
     # Plain key is stored AFTER value, as keys are typically fetched less often.
     ##
-    print( $fh pack($self->{data_pack}, length($key)) . $key );
+    $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
+
+    # Internal references don't care about autobless
+    return 1 if $dbm_deep_obj;
 
     ##
     # If value is blessed, preserve class name
     ##
-    if ( $root->{autobless} ) {
-        my $value_class = Scalar::Util::blessed($value);
-        if ( defined $value_class && !$value->isa( 'DBM::Deep' ) ) {
-            ##
-            # Blessed ref -- will restore later
-            ##
-            print( $fh chr(1) );
-            print( $fh pack($self->{data_pack}, length($value_class)) . $value_class );
-            $content_length += 1;
-            $content_length += $self->{data_size} + length($value_class);
+    if ( $storage->{autobless} ) {
+        if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
+            $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
         }
         else {
-            print( $fh chr(0) );
-            $content_length += 1;
+            $storage->print_at( undef, chr(0) );
         }
     }
 
     ##
-    # If this is a new content area, advance EOF counter
+    # Tie the passed in reference so that changes to it are reflected in the
+    # datafile. The use of $location as the base_offset will act as the
+    # the linkage between parent and child.
+    #
+    # The overall assignment is a hack around the fact that just tying doesn't
+    # store the values. This may not be the wrong thing to do.
     ##
-    if ($location == $root->{end}) {
-        $root->{end} += SIG_SIZE;
-        $root->{end} += $self->{data_size} + $content_length;
-        $root->{end} += $self->{data_size} + length($key);
+    if ($r eq 'HASH') {
+        my %x = %$value;
+        tie %$value, 'DBM::Deep', {
+            base_offset => $location,
+            storage     => $storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        %$value = %x;
+        bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
     }
-
-    ##
-    # If content is a hash or array, create new child DBM::Deep object and
-    # pass each key or element to it.
-    ##
-    if ( ! $internal_ref ) {
-        if ($r eq 'HASH') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_HASH,
-                base_offset => $location,
-                root => $root,
-            );
-            foreach my $key (keys %{$value}) {
-                $branch->STORE( $key, $value->{$key} );
-            }
-        }
-        elsif ($r eq 'ARRAY') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_ARRAY,
-                base_offset => $location,
-                root => $root,
-            );
-            my $index = 0;
-            foreach my $element (@{$value}) {
-                $branch->STORE( $index, $element );
-                $index++;
-            }
-        }
+    elsif ($r eq 'ARRAY') {
+        my @x = @$value;
+        tie @$value, 'DBM::Deep', {
+            base_offset => $location,
+            storage     => $storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        @$value = @x;
+        bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
     }
 
     return 1;
@@ -495,125 +593,159 @@ sub write_value {
 
 sub split_index {
     my $self = shift;
-    my ($obj, $md5, $tag) = @_;
+    my ($tag, $md5, $keyloc) = @_;
 
-    my $fh = $obj->_fh;
-    my $root = $obj->_root;
-    my $keys = $tag->{content};
+    my $storage = $self->_storage;
 
-    seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
-    print( $fh pack($self->{long_pack}, $root->{end}) );
-
-    my $index_tag = $self->create_tag(
-        $obj,
-        $root->{end},
-        SIG_INDEX,
-        chr(0) x $self->{index_size},
+    my $loc = $storage->request_space(
+        $self->tag_size( $self->{index_size} ),
     );
 
-    my @offsets = ();
+    $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
+
+    my $index_tag = $self->write_tag(
+        $loc, SIG_INDEX,
+        chr(0)x$self->{index_size},
+    );
 
-    $keys .= $md5 . pack($self->{long_pack}, 0);
+    my $keys = $tag->{content}
+             . $md5 . pack($self->{long_pack}, $keyloc);
 
+    my @newloc = ();
     BUCKET:
+    # The <= here is deliberate - we have max_buckets+1 keys to iterate
+    # through, unlike every other loop that uses max_buckets as a stop.
     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
 
-        next BUCKET unless $key;
+        die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
+        die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
 
         my $num = ord(substr($key, $tag->{ch} + 1, 1));
 
-        if ($offsets[$num]) {
-            my $offset = $offsets[$num] + SIG_SIZE + $self->{data_size};
-            seek($fh, $offset + $root->{file_offset}, SEEK_SET);
-            my $subkeys;
-            read( $fh, $subkeys, $self->{bucket_list_size});
+        if ($newloc[$num]) {
+            my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
 
-            for (my $k=0; $k<$self->{max_buckets}; $k++) {
-                my ($temp, $subloc) = $self->_get_key_subloc( $subkeys, $k );
+            # This is looking for the first empty spot
+            my ($subloc, $offset) = $self->_find_in_buckets(
+                { content => $subkeys }, '',
+            );
 
-                if (!$subloc) {
-                    seek($fh, $offset + ($k * $self->{bucket_size}) + $root->{file_offset}, SEEK_SET);
-                    print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
-                    last;
-                }
-            } # k loop
+            $storage->print_at(
+                $newloc[$num] + $offset,
+                $key, pack($self->{long_pack}, $old_subloc),
+            );
+
+            next;
         }
-        else {
-            $offsets[$num] = $root->{end};
-            seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
-            print( $fh pack($self->{long_pack}, $root->{end}) );
 
-            my $blist_tag = $self->create_tag($obj, $root->{end}, SIG_BLIST, chr(0) x $self->{bucket_list_size});
+        my $loc = $storage->request_space(
+            $self->tag_size( $self->{bucket_list_size} ),
+        );
+
+        $storage->print_at(
+            $index_tag->{offset} + ($num * $self->{long_size}),
+            pack($self->{long_pack}, $loc),
+        );
 
-            seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
-            print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
-        }
-    } # i loop
+        my $blist_tag = $self->write_tag(
+            $loc, SIG_BLIST,
+            chr(0)x$self->{bucket_list_size},
+        );
 
-    return;
+        $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
+
+        $newloc[$num] = $blist_tag->{offset};
+    }
+
+    $self->_release_space(
+        $self->tag_size( $self->{bucket_list_size} ),
+        $tag->{start},
+    );
+
+    return 1;
 }
 
 sub read_from_loc {
     my $self = shift;
-    my ($obj, $subloc) = @_;
+    my ($subloc, $orig_key) = @_;
 
-    my $fh = $obj->_fh;
+    my $storage = $self->_storage;
 
-    ##
-    # Found match -- seek to offset and read signature
-    ##
-    my $signature;
-    seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
-    read( $fh, $signature, SIG_SIZE);
+    my $signature = $storage->read_at( $subloc, SIG_SIZE );
 
     ##
     # If value is a hash or array, return new DBM::Deep object with correct offset
     ##
     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
-        my $obj = DBM::Deep->new(
-            type => $signature,
+        #XXX This needs to be a singleton
+#        my $new_obj;
+#        my $is_autobless;
+#        if ( $signature eq SIG_HASH ) {
+#            $new_obj = {};
+#            tie %$new_obj, 'DBM::Deep', {
+#                base_offset => $subloc,
+#                storage     => $self->_storage,
+#                parent      => $self->{obj},
+#                parent_key  => $orig_key,
+#            };
+#            $is_autobless = tied(%$new_obj)->_storage->{autobless};
+#        }
+#        else {
+#            $new_obj = [];
+#            tie @$new_obj, 'DBM::Deep', {
+#                base_offset => $subloc,
+#                storage     => $self->_storage,
+#                parent      => $self->{obj},
+#                parent_key  => $orig_key,
+#            };
+#            $is_autobless = tied(@$new_obj)->_storage->{autobless};
+#        }
+#
+#        if ($is_autobless) {
+
+        my $new_obj = DBM::Deep->new({
+            type        => $signature,
             base_offset => $subloc,
-            root => $obj->_root,
-        );
+            storage     => $self->_storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        });
 
-        if ($obj->_root->{autobless}) {
+        if ($new_obj->_storage->{autobless}) {
             ##
             # Skip over value and plain key to see if object needs
             # to be re-blessed
             ##
-            seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
-
-            my $size;
-            read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
-            if ($size) { seek($fh, $size, SEEK_CUR); }
-
-            my $bless_bit;
-            read( $fh, $bless_bit, 1);
-            if (ord($bless_bit)) {
-                ##
-                # Yes, object needs to be re-blessed
-                ##
-                my $class_name;
-                read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
-                if ($size) { read( $fh, $class_name, $size); }
-                if ($class_name) { $obj = bless( $obj, $class_name ); }
+            $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
+
+            my $size = $storage->read_at( undef, $self->{data_size} );
+            $size = unpack($self->{data_pack}, $size);
+            if ($size) { $storage->increment_pointer( $size ); }
+
+            my $bless_bit = $storage->read_at( undef, 1 );
+            if ( ord($bless_bit) ) {
+                my $size = unpack(
+                    $self->{data_pack},
+                    $storage->read_at( undef, $self->{data_size} ),
+                );
+
+                if ( $size ) {
+                    $new_obj = bless $new_obj, $storage->read_at( undef, $size );
+                }
             }
         }
 
-        return $obj;
+        return $new_obj;
     }
     elsif ( $signature eq SIG_INTERNAL ) {
-        my $size;
-        read( $fh, $size, $self->{data_size});
+        my $size = $storage->read_at( undef, $self->{data_size} );
         $size = unpack($self->{data_pack}, $size);
 
         if ( $size ) {
-            my $new_loc;
-            read( $fh, $new_loc, $size );
-            $new_loc = unpack( $self->{long_pack}, $new_loc );
-
-            return $self->read_from_loc( $obj, $new_loc );
+            my $new_loc = $storage->read_at( undef, $size );
+            $new_loc = unpack( $self->{long_pack}, $new_loc ); 
+            return $self->read_from_loc( $new_loc, $orig_key );
         }
         else {
             return;
@@ -622,13 +754,11 @@ sub read_from_loc {
     ##
     # Otherwise return actual value
     ##
-    elsif ($signature eq SIG_DATA) {
-        my $size;
-        read( $fh, $size, $self->{data_size});
+    elsif ( $signature eq SIG_DATA ) {
+        my $size = $storage->read_at( undef, $self->{data_size} );
         $size = unpack($self->{data_pack}, $size);
 
-        my $value = '';
-        if ($size) { read( $fh, $value, $size); }
+        my $value = $size ? $storage->read_at( undef, $size ) : '';
         return $value;
     }
 
@@ -643,12 +773,28 @@ sub get_bucket_value {
     # Fetch single value given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
+
+    #ACID - This is a read. Can find exact or HEAD
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
 
-    my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
-    if ( $subloc ) {
-        return $self->read_from_loc( $obj, $subloc );
+    if ( !$keyloc ) {
+        #XXX Need to use real key
+#        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
+#        return;
+    }
+#    elsif ( !$is_deleted ) {
+    else {
+        my $keytag = $self->load_tag( $keyloc );
+        my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
+        if (!$subloc && !$is_deleted) {
+            ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
+        }
+        if ( $subloc && !$is_deleted ) {
+            return $self->read_from_loc( $subloc, $orig_key );
+        }
     }
+
     return;
 }
 
@@ -657,18 +803,62 @@ sub delete_bucket {
     # Delete single key/value pair given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5, $orig_key) = @_;
+
+    #ACID - Although this is a mutation, we must find any transaction.
+    # This is because we need to mark something as deleted that is in the HEAD.
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
 
-    my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
-    if ( $subloc ) {
-        my $fh = $obj->_fh;
-        seek($fh, $tag->{offset} + $offset + $obj->_root->{file_offset}, SEEK_SET);
-        print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
-        print( $fh chr(0) x $self->{bucket_size} );
+    return if !$keyloc;
 
-        return 1;
+    my $storage = $self->_storage;
+
+    my @transactions;
+    if ( $storage->transaction_id == 0 ) {
+        @transactions = $storage->current_transactions;
     }
-    return;
+
+    if ( $storage->transaction_id == 0 ) {
+        my $keytag = $self->load_tag( $keyloc );
+
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+        return if !$subloc || $is_deleted;
+
+        my $value = $self->read_from_loc( $subloc, $orig_key );
+
+        my $size = $self->_length_needed( $value, $orig_key );
+
+        for my $trans_id ( @transactions ) {
+            my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
+            unless ($loc) {
+                my $location2 = $storage->request_space( $size );
+                $storage->print_at( $keytag->{offset} + $offset2,
+                    pack($self->{long_pack}, $location2 ),
+                    pack( 'C C', $trans_id, 0 ),
+                );
+                $self->_write_value( $location2, $orig_key, $value, $orig_key );
+            }
+        }
+
+        $keytag = $self->load_tag( $keyloc );
+        ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+        $storage->print_at( $keytag->{offset} + $offset,
+            substr( $keytag->{content}, $offset + $self->{key_size} ),
+            chr(0) x $self->{key_size},
+        );
+    }
+    else {
+        my $keytag = $self->load_tag( $keyloc );
+
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+
+        $storage->print_at( $keytag->{offset} + $offset,
+            pack($self->{long_pack}, 0 ),
+            pack( 'C C', $storage->transaction_id, 1 ),
+        );
+    }
+
+    return 1;
 }
 
 sub bucket_exists {
@@ -676,44 +866,51 @@ sub bucket_exists {
     # Check existence of single key given tag and MD5 digested key.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5) = @_;
+    my ($tag, $md5) = @_;
 
-    my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
-    return $subloc && 1;
+    #ACID - This is a read. Can find exact or HEAD
+    my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
+    my $keytag = $self->load_tag( $keyloc );
+    my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+    if ( !$subloc && !$is_deleted ) {
+        ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
+    }
+    return ($subloc && !$is_deleted) && 1;
 }
 
-sub find_bucket_list {
+sub find_blist {
     ##
     # Locate offset for bucket list, given digested key
     ##
     my $self = shift;
-    my ($obj, $md5, $args) = @_;
+    my ($offset, $md5, $args) = @_;
     $args = {} unless $args;
 
     ##
     # Locate offset for bucket list using digest index system
     ##
-    my $tag = $self->load_tag($obj, $obj->_base_offset)
-        or $obj->_throw_error( "INTERNAL ERROR - Cannot find tag" );
+    my $tag = $self->load_tag( $offset )
+        or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
 
-    my $ch = 0;
-    while ($tag->{signature} ne SIG_BLIST) {
+    #XXX What happens when $ch >= $self->{hash_size} ??
+    for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
         my $num = ord substr($md5, $ch, 1);
 
         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
-        $tag = $self->index_lookup( $obj, $tag, $num );
+        $tag = $self->index_lookup( $tag, $num );
 
         if (!$tag) {
-            return if ! $args->{create};
+            return if !$args->{create};
+
+            my $loc = $self->_storage->request_space(
+                $self->tag_size( $self->{bucket_list_size} ),
+            );
 
-            my $fh = $obj->_fh;
-            seek($fh, $ref_loc + $obj->_root->{file_offset}, SEEK_SET);
-            print( $fh pack($self->{long_pack}, $obj->_root->{end}) );
+            $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
 
-            $tag = $self->create_tag(
-                $obj, $obj->_root->{end},
-                SIG_BLIST,
-                chr(0) x $self->{bucket_list_size},
+            $tag = $self->write_tag(
+                $loc, SIG_BLIST,
+                chr(0)x$self->{bucket_list_size},
             );
 
             $tag->{ref_loc} = $ref_loc;
@@ -724,8 +921,6 @@ sub find_bucket_list {
 
         $tag->{ch} = $ch;
         $tag->{ref_loc} = $ref_loc;
-
-        $ch++;
     }
 
     return $tag;
@@ -736,7 +931,7 @@ sub index_lookup {
     # Given index tag, lookup single entry in index and return .
     ##
     my $self = shift;
-    my ($obj, $tag, $index) = @_;
+    my ($tag, $index) = @_;
 
     my $location = unpack(
         $self->{long_pack},
@@ -749,7 +944,7 @@ sub index_lookup {
 
     if (!$location) { return; }
 
-    return $self->load_tag( $obj, $location );
+    return $self->load_tag( $location );
 }
 
 sub traverse_index {
@@ -757,21 +952,18 @@ sub traverse_index {
     # Scan index and recursively step into deeper levels, looking for next key.
     ##
     my $self = shift;
-    my ($obj, $offset, $ch, $force_return_next) = @_;
-
-    my $tag = $self->load_tag($obj, $offset );
+    my ($xxxx, $offset, $ch, $force_return_next) = @_;
 
-    my $fh = $obj->_fh;
+    my $tag = $self->load_tag( $offset );
 
     if ($tag->{signature} ne SIG_BLIST) {
-        my $content = $tag->{content};
-        my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
+        my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
 
-        for (my $idx = $start; $idx < (2**8); $idx++) {
+        for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
             my $subloc = unpack(
                 $self->{long_pack},
                 substr(
-                    $content,
+                    $tag->{content},
                     $idx * $self->{long_size},
                     $self->{long_size},
                 ),
@@ -779,85 +971,69 @@ sub traverse_index {
 
             if ($subloc) {
                 my $result = $self->traverse_index(
-                    $obj, $subloc, $ch + 1, $force_return_next,
+                    $xxxx, $subloc, $ch + 1, $force_return_next,
                 );
 
-                if (defined($result)) { return $result; }
+                if (defined $result) { return $result; }
             }
         } # index loop
 
-        $obj->{return_next} = 1;
-    } # tag is an index
-
+        $xxxx->{return_next} = 1;
+    }
+    # This is the bucket list
     else {
         my $keys = $tag->{content};
-        if ($force_return_next) { $obj->{return_next} = 1; }
+        if ($force_return_next) { $xxxx->{return_next} = 1; }
 
         ##
         # Iterate through buckets, looking for a key match
         ##
+        my $transaction_id = $self->_storage->transaction_id;
         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
-            my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
+            my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
 
             # End of bucket list -- return to outer loop
-            if (!$subloc) {
-                $obj->{return_next} = 1;
+            if (!$keyloc) {
+                $xxxx->{return_next} = 1;
                 last;
             }
             # Located previous key -- return next one found
-            elsif ($key eq $obj->{prev_md5}) {
-                $obj->{return_next} = 1;
+            elsif ($key eq $xxxx->{prev_md5}) {
+                $xxxx->{return_next} = 1;
                 next;
             }
             # Seek to bucket location and skip over signature
-            elsif ($obj->{return_next}) {
-                seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
+            elsif ($xxxx->{return_next}) {
+                my $storage = $self->_storage;
+
+                my $keytag = $self->load_tag( $keyloc );
+                my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
+                if ( $subloc == 0 && !$is_deleted ) {
+                    ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
+                }
+                next if $is_deleted;
 
                 # Skip over value to get to plain key
-                my $sig;
-                read( $fh, $sig, SIG_SIZE );
+                my $sig = $storage->read_at( $subloc, SIG_SIZE );
 
-                my $size;
-                read( $fh, $size, $self->{data_size});
+                my $size = $storage->read_at( undef, $self->{data_size} );
                 $size = unpack($self->{data_pack}, $size);
-                if ($size) { seek($fh, $size, SEEK_CUR); }
+                if ($size) { $storage->increment_pointer( $size ); }
 
                 # Read in plain key and return as scalar
-                my $plain_key;
-                read( $fh, $size, $self->{data_size});
+                $size = $storage->read_at( undef, $self->{data_size} );
                 $size = unpack($self->{data_pack}, $size);
-                if ($size) { read( $fh, $plain_key, $size); }
 
+                my $plain_key;
+                if ($size) { $plain_key = $storage->read_at( undef, $size); }
                 return $plain_key;
             }
         }
 
-        $obj->{return_next} = 1;
-    } # tag is a bucket list
-
-    return;
-}
-
-sub get_next_key {
-    ##
-    # Locate next key, given digested previous one
-    ##
-    my $self = shift;
-    my ($obj) = @_;
-
-    $obj->{prev_md5} = $_[1] ? $_[1] : undef;
-    $obj->{return_next} = 0;
-
-    ##
-    # If the previous key was not specifed, start at the top and
-    # return the first one found.
-    ##
-    if (!$obj->{prev_md5}) {
-        $obj->{prev_md5} = chr(0) x $self->{hash_size};
-        $obj->{return_next} = 1;
+        $xxxx->{return_next} = 1;
     }
 
-    return $self->traverse_index( $obj, $obj->_base_offset, 0 );
+    return;
 }
 
 # Utilities
@@ -866,7 +1042,9 @@ sub _get_key_subloc {
     my $self = shift;
     my ($keys, $idx) = @_;
 
-    my ($key, $subloc) = unpack(
+    return unpack(
+        # This is 'a', not 'A'. Please read the pack() documentation for the
+        # difference between the two and why it's important.
         "a$self->{hash_size} $self->{long_pack}",
         substr(
             $keys,
@@ -874,8 +1052,6 @@ sub _get_key_subloc {
             $self->{bucket_size},
         ),
     );
-
-    return ($key, $subloc);
 }
 
 sub _find_in_buckets {
@@ -884,57 +1060,120 @@ sub _find_in_buckets {
 
     BUCKET:
     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
-        my ($key, $subloc) = $self->_get_key_subloc( $tag->{content}, $i );
-
-        return ($subloc, $i * $self->{bucket_size}) unless $subloc;
-
-        next BUCKET if $key ne $md5;
+        my ($key, $subloc) = $self->_get_key_subloc(
+            $tag->{content}, $i,
+        );
 
-        return ($subloc, $i * $self->{bucket_size});
+        next BUCKET if $subloc && $key ne $md5;
+        return( $subloc, $i * $self->{bucket_size} );
     }
 
     return;
 }
 
-sub _request_space {
+sub _release_space {
     my $self = shift;
-    my ($obj, $size) = @_;
+    my ($size, $loc) = @_;
+
+    my $next_loc = 0;
 
-    my $loc = $obj->_root->{end};
+    $self->_storage->print_at( $loc,
+        SIG_FREE, 
+        pack($self->{long_pack}, $size ),
+        pack($self->{long_pack}, $next_loc ),
+    );
 
-    return $loc;
+    return;
 }
 
-sub _release_space {
-    my $self = shift;
-    my ($obj, $size, $loc) = @_;
+sub _throw_error {
+    die "DBM::Deep: $_[1]\n";
+}
+
+sub _get_dbm_object {
+    my $item = shift;
+
+    my $obj = eval {
+        local $SIG{__DIE__};
+        if ($item->isa( 'DBM::Deep' )) {
+            return $item;
+        }
+        return;
+    };
+    return $obj if $obj;
+
+    my $r = Scalar::Util::reftype( $item ) || '';
+    if ( $r eq 'HASH' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(%$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
+    elsif ( $r eq 'ARRAY' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(@$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
 
     return;
 }
 
-1;
-__END__
-
-# This will be added in later, after more refactoring is done. This is an early
-# attempt at refactoring on the physical level instead of the virtual level.
-sub _read_at {
+sub _length_needed {
     my $self = shift;
-    my ($obj, $spot, $amount, $unpack) = @_;
+    my ($value, $key) = @_;
 
-    my $fh = $obj->_fh;
-    seek( $fh, $spot + $obj->_root->{file_offset}, SEEK_SET );
+    my $is_dbm_deep = eval {
+        local $SIG{'__DIE__'};
+        $value->isa( 'DBM::Deep' );
+    };
 
-    my $buffer;
-    my $bytes_read = read( $fh, $buffer, $amount );
+    my $len = SIG_SIZE
+            + $self->{data_size} # size for value
+            + $self->{data_size} # size for key
+            + length( $key );    # length of key
 
-    if ( $unpack ) {
-        $buffer = unpack( $unpack, $buffer );
+    if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
+        # long_size is for the internal reference
+        return $len + $self->{long_size};
     }
 
-    if ( wantarray ) {
-        return ($buffer, $bytes_read);
+    if ( $self->_storage->{autobless} ) {
+        # This is for the bit saying whether or not this thing is blessed.
+        $len += 1;
     }
-    else {
-        return $buffer;
+
+    my $r = Scalar::Util::reftype( $value ) || '';
+    unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
+        if ( defined $value ) {
+            $len += length( $value );
+        }
+        return $len;
+    }
+
+    $len += $self->{index_size};
+
+    # if autobless is enabled, must also take into consideration
+    # the class name as it is stored after the key.
+    if ( $self->_storage->{autobless} ) {
+        my $c = Scalar::Util::blessed($value);
+        if ( defined $c && !$is_dbm_deep ) {
+            $len += $self->{data_size} + length($c);
+        }
     }
+
+    return $len;
 }
+
+1;
+__END__