r14186@rob-kinyons-powerbook58: rob | 2006-06-14 11:44:48 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
index b47f58d..73917a4 100644 (file)
 package DBM::Deep::Engine;
 
+use 5.6.0;
+
 use strict;
+use warnings;
+
+our $VERSION = q(0.99_03);
+
+use Fcntl qw( :DEFAULT :flock );
+use Scalar::Util ();
+
+# File-wide notes:
+# * To add to bucket_size, make sure you modify the following:
+#   - calculate_sizes()
+#   - _get_key_subloc()
+#   - add_bucket() - where the buckets are printed
+#
+# * Every method in here assumes that the _storage has been appropriately
+#   safeguarded. This can be anything from flock() to some sort of manual
+#   mutex. But, it's the caller's responsability to make sure that this has
+#   been done.
+
+##
+# Setup file and tag signatures.  These should never change.
+##
+sub SIG_FILE     () { 'DPDB' }
+sub SIG_HEADER   () { 'h'    }
+sub SIG_INTERNAL () { 'i'    }
+sub SIG_HASH     () { 'H'    }
+sub SIG_ARRAY    () { 'A'    }
+sub SIG_NULL     () { 'N'    }
+sub SIG_DATA     () { 'D'    }
+sub SIG_INDEX    () { 'I'    }
+sub SIG_BLIST    () { 'B'    }
+sub SIG_FREE     () { 'F'    }
+sub SIG_KEYS     () { 'K'    }
+sub SIG_SIZE     () {  1     }
+
+# This is the transaction ID for the HEAD
+sub HEAD () { 0 }
+
+################################################################################
+#
+# This is new code. It is a complete rewrite of the engine based on a new API
+#
+################################################################################
+
+sub read_value {
+    my $self = shift;
+    my ($offset, $key, $orig_key) = @_;
 
-use Fcntl qw( :DEFAULT :flock :seek );
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key ) or return;
+    return $self->get_bucket_value( $tag, $dig_key, $orig_key );
+}
 
-sub open {
-    ##
-    # Open a fh to the database, create if nonexistent.
-    # Make sure file signature matches DBM::Deep spec.
-    ##
+sub key_exists {
     my $self = shift;
-    my $obj = shift;
-
-    if (defined($obj->_fh)) { $self->close( $obj ); }
+    my ($offset, $key) = @_;
 
-    eval {
-        local $SIG{'__DIE__'};
-        # Theoretically, adding O_BINARY should remove the need for the binmode
-        # Of course, testing it is going to be ... interesting.
-        my $flags = O_RDWR | O_CREAT | O_BINARY;
+    my $dig_key = $self->apply_digest( $key );
+    # exists() returns the empty string, not undef
+    my $tag = $self->find_blist( $offset, $dig_key ) or return '';
+    return $self->bucket_exists( $tag, $dig_key, $key );
+}
 
-        my $fh;
-        sysopen( $fh, $obj->_root->{file}, $flags )
-            or $fh = undef;
-        $obj->_root->{fh} = $fh;
-    }; if ($@ ) { $obj->_throw_error( "Received error: $@\n" ); }
-    if (! defined($obj->_fh)) {
-        return $obj->_throw_error("Cannot sysopen file: " . $obj->_root->{file} . ": $!");
+sub get_next_key {
+    my $self = shift;
+    my ($offset) = @_;
+
+    # If the previous key was not specifed, start at the top and
+    # return the first one found.
+    my $temp;
+    if ( @_ > 1 ) {
+        $temp = {
+            prev_md5    => $self->apply_digest($_[1]),
+            return_next => 0,
+        };
+    }
+    else {
+        $temp = {
+            prev_md5    => chr(0) x $self->{hash_size},
+            return_next => 1,
+        };
     }
 
-    my $fh = $obj->_fh;
+    return $self->traverse_index( $temp, $offset, 0 );
+}
 
-    #XXX Can we remove this by using the right sysopen() flags?
-    # Maybe ... q.v. above
-    binmode $fh; # for win32
+sub delete_key {
+    my $self = shift;
+    my ($offset, $key, $orig_key) = @_;
 
-    if ($obj->_root->{autoflush}) {
-        my $old = select $fh;
-        $|=1;
-        select $old;
-    }
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key ) or return;
+    my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
+    $self->delete_bucket( $tag, $dig_key, $orig_key );
+    return $value;
+}
 
-    seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
+sub write_value {
+    my $self = shift;
+    my ($offset, $key, $value, $orig_key) = @_;
 
-    my $signature;
-    my $bytes_read = read( $fh, $signature, length(DBM::Deep->SIG_FILE));
+    my $dig_key = $self->apply_digest( $key );
+    my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
+    return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
+}
 
-    ##
-    # File is empty -- write signature and master index
-    ##
-    if (!$bytes_read) {
-        seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
-        print( $fh DBM::Deep->SIG_FILE);
-        $self->create_tag($obj, $obj->_base_offset, $obj->_type, chr(0) x $DBM::Deep::INDEX_SIZE);
+################################################################################
+#
+# Below here is the old code. It will be folded into the code above as it can.
+#
+################################################################################
+
+sub new {
+    my $class = shift;
+    my ($args) = @_;
+
+    my $self = bless {
+        long_size => 4,
+        long_pack => 'N',
+        data_size => 4,
+        data_pack => 'N',
 
-        my $plain_key = "[base]";
-        print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, length($plain_key)) . $plain_key );
+        digest    => \&Digest::MD5::md5,
+        hash_size => 16, # In bytes
 
-        # Flush the filehandle
-        my $old_fh = select $fh;
-        my $old_af = $|; $| = 1; $| = $old_af;
-        select $old_fh;
+        ##
+        # Number of buckets per blist before another level of indexing is
+        # done. Increase this value for slightly greater speed, but larger database
+        # files. DO NOT decrease this value below 16, due to risk of recursive
+        # reindex overrun.
+        ##
+        max_buckets => 16,
 
-        my @stats = stat($fh);
-        $obj->_root->{inode} = $stats[1];
-        $obj->_root->{end} = $stats[7];
+        storage => undef,
+        obj     => undef,
+    }, $class;
 
-        return 1;
+    if ( defined $args->{pack_size} ) {
+        if ( lc $args->{pack_size} eq 'small' ) {
+            $args->{long_size} = 2;
+            $args->{long_pack} = 'n';
+        }
+        elsif ( lc $args->{pack_size} eq 'medium' ) {
+            $args->{long_size} = 4;
+            $args->{long_pack} = 'N';
+        }
+        elsif ( lc $args->{pack_size} eq 'large' ) {
+            $args->{long_size} = 8;
+            $args->{long_pack} = 'Q';
+        }
+        else {
+            die "Unknown pack_size value: '$args->{pack_size}'\n";
+        }
     }
 
-    ##
-    # Check signature was valid
-    ##
-    unless ($signature eq DBM::Deep->SIG_FILE) {
-        $self->close( $obj );
-        return $obj->_throw_error("Signature not found -- file is not a Deep DB");
+    # Grab the parameters we want to use
+    foreach my $param ( keys %$self ) {
+        next unless exists $args->{$param};
+        $self->{$param} = $args->{$param};
     }
+    Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
 
-    my @stats = stat($fh);
-    $obj->_root->{inode} = $stats[1];
-    $obj->_root->{end} = $stats[7];
+    if ( $self->{max_buckets} < 16 ) {
+        warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
+        $self->{max_buckets} = 16;
+    }
 
-    ##
-    # Get our type from master index signature
-    ##
-    my $tag = $self->load_tag($obj, $obj->_base_offset);
+    return $self;
+}
+
+sub _storage { return $_[0]{storage} }
+
+sub apply_digest {
+    my $self = shift;
+    return $self->{digest}->(@_);
+}
+
+sub calculate_sizes {
+    my $self = shift;
+
+    # The 2**8 here indicates the number of different characters in the
+    # current hashing algorithm
+    #XXX Does this need to be updated with different hashing algorithms?
+    $self->{hash_chars_used}  = (2**8);
+    $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
+
+    $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
+    $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
+
+    $self->{key_size}         = $self->{long_size} * 2;
+    $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
+
+    return;
+}
 
-#XXX We probably also want to store the hash algorithm name and not assume anything
-#XXX The cool thing would be to allow a different hashing algorithm at every level
+sub write_file_header {
+    my $self = shift;
+
+    my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
+
+    $self->_storage->print_at( $loc,
+        SIG_FILE,
+        SIG_HEADER,
+        pack('N', 1),  # header version
+        pack('N', 24), # header size
+        pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
+        pack('n', $self->{long_size}),
+        pack('A', $self->{long_pack}),
+        pack('n', $self->{data_size}),
+        pack('A', $self->{data_pack}),
+        pack('n', $self->{max_buckets}),
+    );
+
+    $self->_storage->set_transaction_offset( 13 );
+
+    return;
+}
 
-    if (!$tag) {
-        return $obj->_throw_error("Corrupted file, no master index record");
+sub read_file_header {
+    my $self = shift;
+
+    my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
+    return unless length($buffer);
+
+    my ($file_signature, $sig_header, $header_version, $size) = unpack(
+        'A4 A N N', $buffer
+    );
+
+    unless ( $file_signature eq SIG_FILE ) {
+        $self->_storage->close;
+        $self->_throw_error( "Signature not found -- file is not a Deep DB" );
     }
-    if ($obj->{type} ne $tag->{signature}) {
-        return $obj->_throw_error("File type mismatch");
+
+    unless ( $sig_header eq SIG_HEADER ) {
+        $self->_storage->close;
+        $self->_throw_error( "Old file version found." );
     }
 
-    return 1;
+    my $buffer2 = $self->_storage->read_at( undef, $size );
+    my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
+
+    $self->_storage->set_transaction_offset( 13 );
+
+    if ( @values < 5 || grep { !defined } @values ) {
+        $self->_storage->close;
+        $self->_throw_error("Corrupted file - bad header");
+    }
+
+    #XXX Add warnings if values weren't set right
+    @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
+
+    return length($buffer) + length($buffer2);
 }
 
-sub close {
+sub setup_fh {
     my $self = shift;
-    my $obj = shift;
+    my ($obj) = @_;
 
-    if ( my $fh = $obj->_root->{fh} ) {
-        close $fh;
+    # Need to remove use of $fh here
+    my $fh = $self->_storage->{fh};
+    flock $fh, LOCK_EX;
+
+    #XXX The duplication of calculate_sizes needs to go away
+    unless ( $obj->{base_offset} ) {
+        my $bytes_read = $self->read_file_header;
+
+        $self->calculate_sizes;
+
+        ##
+        # File is empty -- write header and master index
+        ##
+        if (!$bytes_read) {
+            $self->_storage->audit( "# Database created on" );
+
+            $self->write_file_header;
+
+            $obj->{base_offset} = $self->_storage->request_space(
+                $self->tag_size( $self->{index_size} ),
+            );
+
+            $self->write_tag(
+                $obj->_base_offset, $obj->_type,
+                chr(0)x$self->{index_size},
+            );
+
+            # Flush the filehandle
+            my $old_fh = select $fh;
+            my $old_af = $|; $| = 1; $| = $old_af;
+            select $old_fh;
+        }
+        else {
+            $obj->{base_offset} = $bytes_read;
+
+            ##
+            # Get our type from master index header
+            ##
+            my $tag = $self->load_tag($obj->_base_offset);
+            unless ( $tag ) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("Corrupted file, no master index record");
+            }
+
+            unless ($obj->_type eq $tag->{signature}) {
+                flock $fh, LOCK_UN;
+                $self->_throw_error("File type mismatch");
+            }
+        }
+    }
+    else {
+        $self->calculate_sizes;
     }
-    $obj->_root->{fh} = undef;
+
+    #XXX We have to make sure we don't mess up when autoflush isn't turned on
+    $self->_storage->set_inode;
+
+    flock $fh, LOCK_UN;
 
     return 1;
 }
 
-sub create_tag {
+sub tag_size {
+    my $self = shift;
+    my ($size) = @_;
+    return SIG_SIZE + $self->{data_size} + $size;
+}
+
+sub write_tag {
     ##
     # Given offset, signature and content, create tag and write to disk
     ##
     my $self = shift;
-    my ($obj, $offset, $sig, $content) = @_;
-    my $size = length($content);
+    my ($offset, $sig, $content) = @_;
+    my $size = length( $content );
 
-    my $fh = $obj->_fh;
+    $self->_storage->print_at(
+        $offset, 
+        $sig, pack($self->{data_pack}, $size), $content,
+    );
 
-    seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
-    print( $fh $sig . pack($DBM::Deep::DATA_LENGTH_PACK, $size) . $content );
-
-    if ($offset == $obj->_root->{end}) {
-        $obj->_root->{end} += DBM::Deep->SIG_SIZE + $DBM::Deep::DATA_LENGTH_SIZE + $size;
-    }
+    return unless defined $offset;
 
     return {
         signature => $sig,
-        size => $size,
-        offset => $offset + DBM::Deep->SIG_SIZE + $DBM::Deep::DATA_LENGTH_SIZE,
-        content => $content
+        #XXX Is this even used?
+        size      => $size,
+        offset    => $offset + SIG_SIZE + $self->{data_size},
+        content   => $content
     };
 }
 
@@ -141,39 +357,41 @@ sub load_tag {
     # Given offset, load single tag and return signature, size and data
     ##
     my $self = shift;
-    my ($obj, $offset) = @_;
-
-    my $fh = $obj->_fh;
-
-    seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
-    if (eof $fh) { return undef; }
+    my ($offset) = @_;
 
-    my $b;
-    read( $fh, $b, DBM::Deep->SIG_SIZE + $DBM::Deep::DATA_LENGTH_SIZE );
-    my ($sig, $size) = unpack( "A $DBM::Deep::DATA_LENGTH_PACK", $b );
+    my $storage = $self->_storage;
 
-    my $buffer;
-    read( $fh, $buffer, $size);
+    my ($sig, $size) = unpack(
+        "A $self->{data_pack}",
+        $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
+    );
 
     return {
         signature => $sig,
-        size => $size,
-        offset => $offset + DBM::Deep->SIG_SIZE + $DBM::Deep::DATA_LENGTH_SIZE,
-        content => $buffer
+        size      => $size,   #XXX Is this even used?
+        start     => $offset,
+        offset    => $offset + SIG_SIZE + $self->{data_size},
+        content   => $storage->read_at( undef, $size ),
     };
 }
 
-sub index_lookup {
-    ##
-    # Given index tag, lookup single entry in index and return .
-    ##
+sub find_keyloc {
     my $self = shift;
-    my ($obj, $tag, $index) = @_;
-
-    my $location = unpack($DBM::Deep::LONG_PACK, substr($tag->{content}, $index * $DBM::Deep::LONG_SIZE, $DBM::Deep::LONG_SIZE) );
-    if (!$location) { return; }
+    my ($tag, $transaction_id) = @_;
+    $transaction_id = $self->_storage->transaction_id
+        unless defined $transaction_id;
+
+    for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
+        my ($loc, $trans_id, $is_deleted) = unpack(
+            "$self->{long_pack} C C",
+            substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
+        );
+
+        next if $loc != HEAD && $transaction_id != $trans_id;
+        return( $loc, $is_deleted, $i * $self->{key_size} );
+    }
 
-    return $self->load_tag( $obj, $location );
+    return;
 }
 
 sub add_bucket {
@@ -182,546 +400,779 @@ sub add_bucket {
     # plain (undigested) key and value.
     ##
     my $self = shift;
-    my ($obj, $tag, $md5, $plain_key, $value) = @_;
-    my $keys = $tag->{content};
-    my $location = 0;
-    my $result = 2;
+    my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
 
-    my $root = $obj->_root;
+    # This verifies that only supported values will be stored.
+    {
+        my $r = Scalar::Util::reftype( $value );
 
-    my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $value->isa( 'DBM::Deep' ) };
-    my $internal_ref = $is_dbm_deep && ($value->_root eq $root);
+        last if !defined $r;
+        last if $r eq 'HASH';
+        last if $r eq 'ARRAY';
 
-    my $fh = $obj->_fh;
+        $self->_throw_error(
+            "Storage of references of type '$r' is not supported."
+        );
+    }
 
-    ##
-    # Iterate through buckets, seeing if this is a new entry or a replace.
-    ##
-    for (my $i=0; $i<$DBM::Deep::MAX_BUCKETS; $i++) {
-        my $subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-        if (!$subloc) {
-            ##
-            # Found empty bucket (end of list).  Populate and exit loop.
-            ##
-            $result = 2;
+    my $storage = $self->_storage;
 
-            $location = $internal_ref
-                ? $value->_base_offset
-                : $root->{end};
+    #ACID - This is a mutation. Must only find the exact transaction
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
 
-            seek($fh, $tag->{offset} + ($i * $DBM::Deep::BUCKET_SIZE) + $root->{file_offset}, SEEK_SET);
-            print( $fh $md5 . pack($DBM::Deep::LONG_PACK, $location) );
-            last;
+    my @transactions;
+    if ( $storage->transaction_id == 0 ) {
+        @transactions = $storage->current_transactions;
+    }
+
+#    $self->_release_space( $size, $subloc );
+#XXX This needs updating to use _release_space
+
+    my $location;
+    my $size = $self->_length_needed( $value, $plain_key );
+
+    # Updating a known md5
+    if ( $keyloc ) {
+        my $keytag = $self->load_tag( $keyloc );
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+
+        if ( $subloc && !$is_deleted && @transactions ) {
+            my $old_value = $self->read_from_loc( $subloc, $orig_key );
+            my $old_size = $self->_length_needed( $old_value, $plain_key );
+
+            for my $trans_id ( @transactions ) {
+                my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
+                unless ($loc) {
+                    my $location2 = $storage->request_space( $old_size );
+                    $storage->print_at( $keytag->{offset} + $offset2,
+                        pack($self->{long_pack}, $location2 ),
+                        pack( 'C C', $trans_id, 0 ),
+                    );
+                    $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
+                }
+            }
         }
 
-        my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-        if ($md5 eq $key) {
-            ##
-            # Found existing bucket with same key.  Replace with new value.
-            ##
-            $result = 1;
+        $location = $self->_storage->request_space( $size );
+        #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
+        $storage->print_at( $keytag->{offset} + $offset,
+            pack($self->{long_pack}, $location ),
+            pack( 'C C', $storage->transaction_id, 0 ),
+        );
+    }
+    # Adding a new md5
+    else {
+        my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
+
+        # The bucket fit into list
+        if ( defined $offset ) {
+            $storage->print_at( $tag->{offset} + $offset,
+                $md5, pack( $self->{long_pack}, $keyloc ),
+            );
+        }
+        # If bucket didn't fit into list, split into a new index level
+        else {
+            $self->split_index( $tag, $md5, $keyloc );
+        }
 
-            if ($internal_ref) {
-                $location = $value->_base_offset;
-                seek($fh, $tag->{offset} + ($i * $DBM::Deep::BUCKET_SIZE) + $root->{file_offset}, SEEK_SET);
-                print( $fh $md5 . pack($DBM::Deep::LONG_PACK, $location) );
-                return $result;
-            }
+        my $keytag = $self->write_tag(
+            $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
+        );
+
+        $location = $self->_storage->request_space( $size );
+        $storage->print_at( $keytag->{offset},
+            pack( $self->{long_pack}, $location ),
+            pack( 'C C', $storage->transaction_id, 0 ),
+        );
+
+        my $offset = 1;
+        for my $trans_id ( @transactions ) {
+            $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
+                pack( $self->{long_pack}, 0 ),
+                pack( 'C C', $trans_id, 1 ),
+            );
+        }
+    }
 
-            seek($fh, $subloc + DBM::Deep->SIG_SIZE + $root->{file_offset}, SEEK_SET);
-            my $size;
-            read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
+    $self->_write_value( $location, $plain_key, $value, $orig_key );
 
-            ##
-            # If value is a hash, array, or raw value with equal or less size, we can
-            # reuse the same content area of the database.  Otherwise, we have to create
-            # a new content area at the EOF.
-            ##
-            my $actual_length;
-            my $r = Scalar::Util::reftype( $value ) || '';
-            if ( $r eq 'HASH' || $r eq 'ARRAY' ) {
-                $actual_length = $DBM::Deep::INDEX_SIZE;
-
-                # if autobless is enabled, must also take into consideration
-                # the class name, as it is stored along with key/value.
-                if ( $root->{autobless} ) {
-                    my $value_class = Scalar::Util::blessed($value);
-                    if ( defined $value_class && !$value->isa('DBM::Deep') ) {
-                        $actual_length += length($value_class);
-                    }
-                }
-            }
-            else { $actual_length = length($value); }
+    return 1;
+}
 
-            if ($actual_length <= $size) {
-                $location = $subloc;
-            }
-            else {
-                $location = $root->{end};
-                seek($fh, $tag->{offset} + ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE + $root->{file_offset}, SEEK_SET);
-                print( $fh pack($DBM::Deep::LONG_PACK, $location) );
-            }
+sub _write_value {
+    my $self = shift;
+    my ($location, $key, $value, $orig_key) = @_;
 
-            last;
+    my $storage = $self->_storage;
+
+    my $dbm_deep_obj = _get_dbm_object( $value );
+    if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
+        $self->_throw_error( "Cannot cross-reference. Use export() instead" );
+    }
+
+    ##
+    # Write signature based on content type, set content length and write
+    # actual value.
+    ##
+    my $r = Scalar::Util::reftype( $value ) || '';
+    if ( $dbm_deep_obj ) {
+        $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
+    }
+    elsif ($r eq 'HASH') {
+        if ( !$dbm_deep_obj && tied %{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
+        }
+        $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
+    }
+    elsif ($r eq 'ARRAY') {
+        if ( !$dbm_deep_obj && tied @{$value} ) {
+            $self->_throw_error( "Cannot store something that is tied" );
         }
+        $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
+    }
+    elsif (!defined($value)) {
+        $self->write_tag( $location, SIG_NULL, '' );
+    }
+    else {
+        $self->write_tag( $location, SIG_DATA, $value );
     }
 
     ##
-    # If this is an internal reference, return now.
-    # No need to write value or plain key
+    # Plain key is stored AFTER value, as keys are typically fetched less often.
     ##
-    if ($internal_ref) {
-        return $result;
+    $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
+
+    # Internal references don't care about autobless
+    return 1 if $dbm_deep_obj;
+
+    ##
+    # If value is blessed, preserve class name
+    ##
+    if ( $storage->{autobless} ) {
+        if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
+            $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
+        }
+        else {
+            $storage->print_at( undef, chr(0) );
+        }
     }
 
     ##
-    # If bucket didn't fit into list, split into a new index level
+    # Tie the passed in reference so that changes to it are reflected in the
+    # datafile. The use of $location as the base_offset will act as the
+    # the linkage between parent and child.
+    #
+    # The overall assignment is a hack around the fact that just tying doesn't
+    # store the values. This may not be the wrong thing to do.
     ##
-    if (!$location) {
-        seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
-        print( $fh pack($DBM::Deep::LONG_PACK, $root->{end}) );
+    if ($r eq 'HASH') {
+        my %x = %$value;
+        tie %$value, 'DBM::Deep', {
+            base_offset => $location,
+            storage     => $storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        %$value = %x;
+        bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
+    }
+    elsif ($r eq 'ARRAY') {
+        my @x = @$value;
+        tie @$value, 'DBM::Deep', {
+            base_offset => $location,
+            storage     => $storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        };
+        @$value = @x;
+        bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
+    }
 
-        my $index_tag = $self->create_tag($obj, $root->{end}, DBM::Deep->SIG_INDEX, chr(0) x $DBM::Deep::INDEX_SIZE);
-        my @offsets = ();
+    return 1;
+}
 
-        $keys .= $md5 . pack($DBM::Deep::LONG_PACK, 0);
+sub split_index {
+    my $self = shift;
+    my ($tag, $md5, $keyloc) = @_;
 
-        for (my $i=0; $i<=$DBM::Deep::MAX_BUCKETS; $i++) {
-            my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-            if ($key) {
-                my $old_subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) +
-                        $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-                my $num = ord(substr($key, $tag->{ch} + 1, 1));
+    my $storage = $self->_storage;
 
-                if ($offsets[$num]) {
-                    my $offset = $offsets[$num] + DBM::Deep->SIG_SIZE + $DBM::Deep::DATA_LENGTH_SIZE;
-                    seek($fh, $offset + $root->{file_offset}, SEEK_SET);
-                    my $subkeys;
-                    read( $fh, $subkeys, $DBM::Deep::BUCKET_LIST_SIZE);
+    my $loc = $storage->request_space(
+        $self->tag_size( $self->{index_size} ),
+    );
 
-                    for (my $k=0; $k<$DBM::Deep::MAX_BUCKETS; $k++) {
-                        my $subloc = unpack($DBM::Deep::LONG_PACK, substr($subkeys, ($k * $DBM::Deep::BUCKET_SIZE) +
-                                $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-                        if (!$subloc) {
-                            seek($fh, $offset + ($k * $DBM::Deep::BUCKET_SIZE) + $root->{file_offset}, SEEK_SET);
-                            print( $fh $key . pack($DBM::Deep::LONG_PACK, $old_subloc || $root->{end}) );
-                            last;
-                        }
-                    } # k loop
-                }
-                else {
-                    $offsets[$num] = $root->{end};
-                    seek($fh, $index_tag->{offset} + ($num * $DBM::Deep::LONG_SIZE) + $root->{file_offset}, SEEK_SET);
-                    print( $fh pack($DBM::Deep::LONG_PACK, $root->{end}) );
+    $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
 
-                    my $blist_tag = $self->create_tag($obj, $root->{end}, DBM::Deep->SIG_BLIST, chr(0) x $DBM::Deep::BUCKET_LIST_SIZE);
+    my $index_tag = $self->write_tag(
+        $loc, SIG_INDEX,
+        chr(0)x$self->{index_size},
+    );
 
-                    seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
-                    print( $fh $key . pack($DBM::Deep::LONG_PACK, $old_subloc || $root->{end}) );
-                }
-            } # key is real
-        } # i loop
+    my $keys = $tag->{content}
+             . $md5 . pack($self->{long_pack}, $keyloc);
+
+    my @newloc = ();
+    BUCKET:
+    # The <= here is deliberate - we have max_buckets+1 keys to iterate
+    # through, unlike every other loop that uses max_buckets as a stop.
+    for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
+        my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
+
+        die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
+        die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
+
+        my $num = ord(substr($key, $tag->{ch} + 1, 1));
+
+        if ($newloc[$num]) {
+            my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
+
+            # This is looking for the first empty spot
+            my ($subloc, $offset) = $self->_find_in_buckets(
+                { content => $subkeys }, '',
+            );
+
+            $storage->print_at(
+                $newloc[$num] + $offset,
+                $key, pack($self->{long_pack}, $old_subloc),
+            );
+
+            next;
+        }
+
+        my $loc = $storage->request_space(
+            $self->tag_size( $self->{bucket_list_size} ),
+        );
+
+        $storage->print_at(
+            $index_tag->{offset} + ($num * $self->{long_size}),
+            pack($self->{long_pack}, $loc),
+        );
+
+        my $blist_tag = $self->write_tag(
+            $loc, SIG_BLIST,
+            chr(0)x$self->{bucket_list_size},
+        );
+
+        $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
+
+        $newloc[$num] = $blist_tag->{offset};
+    }
+
+    $self->_release_space(
+        $self->tag_size( $self->{bucket_list_size} ),
+        $tag->{start},
+    );
+
+    return 1;
+}
+
+sub read_from_loc {
+    my $self = shift;
+    my ($subloc, $orig_key) = @_;
+
+    my $storage = $self->_storage;
 
-        $location ||= $root->{end};
-    } # re-index bucket list
+    my $signature = $storage->read_at( $subloc, SIG_SIZE );
 
     ##
-    # Seek to content area and store signature, value and plaintext key
+    # If value is a hash or array, return new DBM::Deep object with correct offset
     ##
-    if ($location) {
-        my $content_length;
-        seek($fh, $location + $root->{file_offset}, SEEK_SET);
+    if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
+        #XXX This needs to be a singleton
+#        my $new_obj;
+#        my $is_autobless;
+#        if ( $signature eq SIG_HASH ) {
+#            $new_obj = {};
+#            tie %$new_obj, 'DBM::Deep', {
+#                base_offset => $subloc,
+#                storage     => $self->_storage,
+#                parent      => $self->{obj},
+#                parent_key  => $orig_key,
+#            };
+#            $is_autobless = tied(%$new_obj)->_storage->{autobless};
+#        }
+#        else {
+#            $new_obj = [];
+#            tie @$new_obj, 'DBM::Deep', {
+#                base_offset => $subloc,
+#                storage     => $self->_storage,
+#                parent      => $self->{obj},
+#                parent_key  => $orig_key,
+#            };
+#            $is_autobless = tied(@$new_obj)->_storage->{autobless};
+#        }
+#
+#        if ($is_autobless) {
+
+        my $new_obj = DBM::Deep->new({
+            type        => $signature,
+            base_offset => $subloc,
+            storage     => $self->_storage,
+            parent      => $self->{obj},
+            parent_key  => $orig_key,
+        });
+
+        if ($new_obj->_storage->{autobless}) {
+            ##
+            # Skip over value and plain key to see if object needs
+            # to be re-blessed
+            ##
+            $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
 
-        ##
-        # Write signature based on content type, set content length and write actual value.
-        ##
-        my $r = Scalar::Util::reftype($value) || '';
-        if ($r eq 'HASH') {
-            print( $fh DBM::Deep->TYPE_HASH );
-            print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, $DBM::Deep::INDEX_SIZE) . chr(0) x $DBM::Deep::INDEX_SIZE );
-            $content_length = $DBM::Deep::INDEX_SIZE;
-        }
-        elsif ($r eq 'ARRAY') {
-            print( $fh DBM::Deep->TYPE_ARRAY );
-            print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, $DBM::Deep::INDEX_SIZE) . chr(0) x $DBM::Deep::INDEX_SIZE );
-            $content_length = $DBM::Deep::INDEX_SIZE;
+            my $size = $storage->read_at( undef, $self->{data_size} );
+            $size = unpack($self->{data_pack}, $size);
+            if ($size) { $storage->increment_pointer( $size ); }
+
+            my $bless_bit = $storage->read_at( undef, 1 );
+            if ( ord($bless_bit) ) {
+                my $size = unpack(
+                    $self->{data_pack},
+                    $storage->read_at( undef, $self->{data_size} ),
+                );
+
+                if ( $size ) {
+                    $new_obj = bless $new_obj, $storage->read_at( undef, $size );
+                }
+            }
         }
-        elsif (!defined($value)) {
-            print( $fh DBM::Deep->SIG_NULL );
-            print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, 0) );
-            $content_length = 0;
+
+        return $new_obj;
+    }
+    elsif ( $signature eq SIG_INTERNAL ) {
+        my $size = $storage->read_at( undef, $self->{data_size} );
+        $size = unpack($self->{data_pack}, $size);
+
+        if ( $size ) {
+            my $new_loc = $storage->read_at( undef, $size );
+            $new_loc = unpack( $self->{long_pack}, $new_loc ); 
+            return $self->read_from_loc( $new_loc, $orig_key );
         }
         else {
-            print( $fh DBM::Deep->SIG_DATA );
-            print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, length($value)) . $value );
-            $content_length = length($value);
+            return;
         }
+    }
+    ##
+    # Otherwise return actual value
+    ##
+    elsif ( $signature eq SIG_DATA ) {
+        my $size = $storage->read_at( undef, $self->{data_size} );
+        $size = unpack($self->{data_pack}, $size);
 
-        ##
-        # Plain key is stored AFTER value, as keys are typically fetched less often.
-        ##
-        print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, length($plain_key)) . $plain_key );
+        my $value = $size ? $storage->read_at( undef, $size ) : '';
+        return $value;
+    }
 
-        ##
-        # If value is blessed, preserve class name
-        ##
-        if ( $root->{autobless} ) {
-            my $value_class = Scalar::Util::blessed($value);
-            if ( defined $value_class && $value_class ne 'DBM::Deep' ) {
-                ##
-                # Blessed ref -- will restore later
-                ##
-                print( $fh chr(1) );
-                print( $fh pack($DBM::Deep::DATA_LENGTH_PACK, length($value_class)) . $value_class );
-                $content_length += 1;
-                $content_length += $DBM::Deep::DATA_LENGTH_SIZE + length($value_class);
-            }
-            else {
-                print( $fh chr(0) );
-                $content_length += 1;
-            }
-        }
+    ##
+    # Key exists, but content is null
+    ##
+    return;
+}
 
-        ##
-        # If this is a new content area, advance EOF counter
-        ##
-        if ($location == $root->{end}) {
-            $root->{end} += DBM::Deep->SIG_SIZE;
-            $root->{end} += $DBM::Deep::DATA_LENGTH_SIZE + $content_length;
-            $root->{end} += $DBM::Deep::DATA_LENGTH_SIZE + length($plain_key);
+sub get_bucket_value {
+    ##
+    # Fetch single value given tag and MD5 digested key.
+    ##
+    my $self = shift;
+    my ($tag, $md5, $orig_key) = @_;
+
+    #ACID - This is a read. Can find exact or HEAD
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
+
+    if ( !$keyloc ) {
+        #XXX Need to use real key
+#        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
+#        return;
+    }
+#    elsif ( !$is_deleted ) {
+    else {
+        my $keytag = $self->load_tag( $keyloc );
+        my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
+        if (!$subloc && !$is_deleted) {
+            ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
         }
+        if ( $subloc && !$is_deleted ) {
+            return $self->read_from_loc( $subloc, $orig_key );
+        }
+    }
 
-        ##
-        # If content is a hash or array, create new child DBM::Deep object and
-        # pass each key or element to it.
-        ##
-        if ($r eq 'HASH') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_HASH,
-                base_offset => $location,
-                root => $root,
-            );
-            foreach my $key (keys %{$value}) {
-                $branch->STORE( $key, $value->{$key} );
+    return;
+}
+
+sub delete_bucket {
+    ##
+    # Delete single key/value pair given tag and MD5 digested key.
+    ##
+    my $self = shift;
+    my ($tag, $md5, $orig_key) = @_;
+
+    #ACID - Although this is a mutation, we must find any transaction.
+    # This is because we need to mark something as deleted that is in the HEAD.
+    my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
+
+    return if !$keyloc;
+
+    my $storage = $self->_storage;
+
+    my @transactions;
+    if ( $storage->transaction_id == 0 ) {
+        @transactions = $storage->current_transactions;
+    }
+
+    if ( $storage->transaction_id == 0 ) {
+        my $keytag = $self->load_tag( $keyloc );
+
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+        return if !$subloc || $is_deleted;
+
+        my $value = $self->read_from_loc( $subloc, $orig_key );
+
+        my $size = $self->_length_needed( $value, $orig_key );
+
+        for my $trans_id ( @transactions ) {
+            my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
+            unless ($loc) {
+                my $location2 = $storage->request_space( $size );
+                $storage->print_at( $keytag->{offset} + $offset2,
+                    pack($self->{long_pack}, $location2 ),
+                    pack( 'C C', $trans_id, 0 ),
+                );
+                $self->_write_value( $location2, $orig_key, $value, $orig_key );
             }
         }
-        elsif ($r eq 'ARRAY') {
-            my $branch = DBM::Deep->new(
-                type => DBM::Deep->TYPE_ARRAY,
-                base_offset => $location,
-                root => $root,
+
+        $keytag = $self->load_tag( $keyloc );
+        ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+        $storage->print_at( $keytag->{offset} + $offset,
+            substr( $keytag->{content}, $offset + $self->{key_size} ),
+            chr(0) x $self->{key_size},
+        );
+    }
+    else {
+        my $keytag = $self->load_tag( $keyloc );
+
+        my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+
+        $storage->print_at( $keytag->{offset} + $offset,
+            pack($self->{long_pack}, 0 ),
+            pack( 'C C', $storage->transaction_id, 1 ),
+        );
+    }
+
+    return 1;
+}
+
+sub bucket_exists {
+    ##
+    # Check existence of single key given tag and MD5 digested key.
+    ##
+    my $self = shift;
+    my ($tag, $md5) = @_;
+
+    #ACID - This is a read. Can find exact or HEAD
+    my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
+    my $keytag = $self->load_tag( $keyloc );
+    my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
+    if ( !$subloc && !$is_deleted ) {
+        ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
+    }
+    return ($subloc && !$is_deleted) && 1;
+}
+
+sub find_blist {
+    ##
+    # Locate offset for bucket list, given digested key
+    ##
+    my $self = shift;
+    my ($offset, $md5, $args) = @_;
+    $args = {} unless $args;
+
+    ##
+    # Locate offset for bucket list using digest index system
+    ##
+    my $tag = $self->load_tag( $offset )
+        or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
+
+    #XXX What happens when $ch >= $self->{hash_size} ??
+    for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
+        my $num = ord substr($md5, $ch, 1);
+
+        my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
+        $tag = $self->index_lookup( $tag, $num );
+
+        if (!$tag) {
+            return if !$args->{create};
+
+            my $loc = $self->_storage->request_space(
+                $self->tag_size( $self->{bucket_list_size} ),
             );
-            my $index = 0;
-            foreach my $element (@{$value}) {
-                $branch->STORE( $index, $element );
-                $index++;
-            }
+
+            $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
+
+            $tag = $self->write_tag(
+                $loc, SIG_BLIST,
+                chr(0)x$self->{bucket_list_size},
+            );
+
+            $tag->{ref_loc} = $ref_loc;
+            $tag->{ch} = $ch;
+
+            last;
         }
 
-        return $result;
+        $tag->{ch} = $ch;
+        $tag->{ref_loc} = $ref_loc;
     }
 
-    return $obj->_throw_error("Fatal error: indexing failed -- possibly due to corruption in file");
+    return $tag;
 }
 
-sub get_bucket_value {
-       ##
-       # Fetch single value given tag and MD5 digested key.
-       ##
-       my $self = shift;
-       my ($obj, $tag, $md5) = @_;
-       my $keys = $tag->{content};
-
-    my $fh = $obj->_fh;
-
-       ##
-       # Iterate through buckets, looking for a key match
-       ##
-    BUCKET:
-       for (my $i=0; $i<$DBM::Deep::MAX_BUCKETS; $i++) {
-               my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-               my $subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-
-               if (!$subloc) {
-                       ##
-                       # Hit end of list, no match
-                       ##
-                       return;
-               }
-
-        if ( $md5 ne $key ) {
-            next BUCKET;
-        }
+sub index_lookup {
+    ##
+    # Given index tag, lookup single entry in index and return .
+    ##
+    my $self = shift;
+    my ($tag, $index) = @_;
 
-        ##
-        # Found match -- seek to offset and read signature
-        ##
-        my $signature;
-        seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
-        read( $fh, $signature, DBM::Deep->SIG_SIZE);
-        
-        ##
-        # If value is a hash or array, return new DBM::Deep object with correct offset
-        ##
-        if (($signature eq DBM::Deep->TYPE_HASH) || ($signature eq DBM::Deep->TYPE_ARRAY)) {
-            my $obj = DBM::Deep->new(
-                type => $signature,
-                base_offset => $subloc,
-                root => $obj->_root,
+    my $location = unpack(
+        $self->{long_pack},
+        substr(
+            $tag->{content},
+            $index * $self->{long_size},
+            $self->{long_size},
+        ),
+    );
+
+    if (!$location) { return; }
+
+    return $self->load_tag( $location );
+}
+
+sub traverse_index {
+    ##
+    # Scan index and recursively step into deeper levels, looking for next key.
+    ##
+    my $self = shift;
+    my ($xxxx, $offset, $ch, $force_return_next) = @_;
+
+    my $tag = $self->load_tag( $offset );
+
+    if ($tag->{signature} ne SIG_BLIST) {
+        my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
+
+        for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
+            my $subloc = unpack(
+                $self->{long_pack},
+                substr(
+                    $tag->{content},
+                    $idx * $self->{long_size},
+                    $self->{long_size},
+                ),
             );
-            
-            if ($obj->_root->{autobless}) {
-                ##
-                # Skip over value and plain key to see if object needs
-                # to be re-blessed
-                ##
-                seek($fh, $DBM::Deep::DATA_LENGTH_SIZE + $DBM::Deep::INDEX_SIZE, SEEK_CUR);
-                
-                my $size;
-                read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
-                if ($size) { seek($fh, $size, SEEK_CUR); }
-                
-                my $bless_bit;
-                read( $fh, $bless_bit, 1);
-                if (ord($bless_bit)) {
-                    ##
-                    # Yes, object needs to be re-blessed
-                    ##
-                    my $class_name;
-                    read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
-                    if ($size) { read( $fh, $class_name, $size); }
-                    if ($class_name) { $obj = bless( $obj, $class_name ); }
-                }
+
+            if ($subloc) {
+                my $result = $self->traverse_index(
+                    $xxxx, $subloc, $ch + 1, $force_return_next,
+                );
+
+                if (defined $result) { return $result; }
             }
-            
-            return $obj;
-        }
-        
+        } # index loop
+
+        $xxxx->{return_next} = 1;
+    }
+    # This is the bucket list
+    else {
+        my $keys = $tag->{content};
+        if ($force_return_next) { $xxxx->{return_next} = 1; }
+
         ##
-        # Otherwise return actual value
+        # Iterate through buckets, looking for a key match
         ##
-        elsif ($signature eq DBM::Deep->SIG_DATA) {
-            my $size;
-            my $value = '';
-            read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
-            if ($size) { read( $fh, $value, $size); }
-            return $value;
+        my $transaction_id = $self->_storage->transaction_id;
+        for (my $i = 0; $i < $self->{max_buckets}; $i++) {
+            my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
+
+            # End of bucket list -- return to outer loop
+            if (!$keyloc) {
+                $xxxx->{return_next} = 1;
+                last;
+            }
+            # Located previous key -- return next one found
+            elsif ($key eq $xxxx->{prev_md5}) {
+                $xxxx->{return_next} = 1;
+                next;
+            }
+            # Seek to bucket location and skip over signature
+            elsif ($xxxx->{return_next}) {
+                my $storage = $self->_storage;
+
+                my $keytag = $self->load_tag( $keyloc );
+                my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
+                if ( $subloc == 0 && !$is_deleted ) {
+                    ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
+                }
+                next if $is_deleted;
+
+                # Skip over value to get to plain key
+                my $sig = $storage->read_at( $subloc, SIG_SIZE );
+
+                my $size = $storage->read_at( undef, $self->{data_size} );
+                $size = unpack($self->{data_pack}, $size);
+                if ($size) { $storage->increment_pointer( $size ); }
+
+                # Read in plain key and return as scalar
+                $size = $storage->read_at( undef, $self->{data_size} );
+                $size = unpack($self->{data_pack}, $size);
+
+                my $plain_key;
+                if ($size) { $plain_key = $storage->read_at( undef, $size); }
+                return $plain_key;
+            }
         }
-        
-        ##
-        # Key exists, but content is null
-        ##
-        else { return; }
-       } # i loop
 
-       return;
+        $xxxx->{return_next} = 1;
+    }
+
+    return;
 }
 
-sub delete_bucket {
-       ##
-       # Delete single key/value pair given tag and MD5 digested key.
-       ##
-       my $self = shift;
-       my ($obj, $tag, $md5) = @_;
-       my $keys = $tag->{content};
-
-    my $fh = $obj->_fh;
-       
-       ##
-       # Iterate through buckets, looking for a key match
-       ##
-    BUCKET:
-       for (my $i=0; $i<$DBM::Deep::MAX_BUCKETS; $i++) {
-               my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-               my $subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-
-               if (!$subloc) {
-                       ##
-                       # Hit end of list, no match
-                       ##
-                       return;
-               }
-
-        if ( $md5 ne $key ) {
-            next BUCKET;
-        }
+# Utilities
 
-        ##
-        # Matched key -- delete bucket and return
-        ##
-        seek($fh, $tag->{offset} + ($i * $DBM::Deep::BUCKET_SIZE) + $obj->_root->{file_offset}, SEEK_SET);
-        print( $fh substr($keys, ($i+1) * $DBM::Deep::BUCKET_SIZE ) );
-        print( $fh chr(0) x $DBM::Deep::BUCKET_SIZE );
-        
-        return 1;
-       } # i loop
-
-       return;
+sub _get_key_subloc {
+    my $self = shift;
+    my ($keys, $idx) = @_;
+
+    return unpack(
+        # This is 'a', not 'A'. Please read the pack() documentation for the
+        # difference between the two and why it's important.
+        "a$self->{hash_size} $self->{long_pack}",
+        substr(
+            $keys,
+            ($idx * $self->{bucket_size}),
+            $self->{bucket_size},
+        ),
+    );
 }
 
-sub bucket_exists {
-       ##
-       # Check existence of single key given tag and MD5 digested key.
-       ##
-       my $self = shift;
-       my ($obj, $tag, $md5) = @_;
-       my $keys = $tag->{content};
-       
-       ##
-       # Iterate through buckets, looking for a key match
-       ##
+sub _find_in_buckets {
+    my $self = shift;
+    my ($tag, $md5) = @_;
+
     BUCKET:
-       for (my $i=0; $i<$DBM::Deep::MAX_BUCKETS; $i++) {
-               my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-               my $subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-
-               if (!$subloc) {
-                       ##
-                       # Hit end of list, no match
-                       ##
-                       return;
-               }
-
-        if ( $md5 ne $key ) {
-            next BUCKET;
-        }
+    for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
+        my ($key, $subloc) = $self->_get_key_subloc(
+            $tag->{content}, $i,
+        );
 
-        ##
-        # Matched key -- return true
-        ##
-        return 1;
-       } # i loop
+        next BUCKET if $subloc && $key ne $md5;
+        return( $subloc, $i * $self->{bucket_size} );
+    }
 
-       return;
+    return;
 }
 
-sub find_bucket_list {
-       ##
-       # Locate offset for bucket list, given digested key
-       ##
-       my $self = shift;
-       my ($obj, $md5) = @_;
-       
-       ##
-       # Locate offset for bucket list using digest index system
-       ##
-       my $ch = 0;
-       my $tag = $self->load_tag($obj, $obj->_base_offset);
-       if (!$tag) { return; }
-       
-       while ($tag->{signature} ne DBM::Deep->SIG_BLIST) {
-               $tag = $self->index_lookup($obj, $tag, ord(substr($md5, $ch, 1)));
-               if (!$tag) { return; }
-               $ch++;
-       }
-       
-       return $tag;
+sub _release_space {
+    my $self = shift;
+    my ($size, $loc) = @_;
+
+    my $next_loc = 0;
+
+    $self->_storage->print_at( $loc,
+        SIG_FREE, 
+        pack($self->{long_pack}, $size ),
+        pack($self->{long_pack}, $next_loc ),
+    );
+
+    return;
 }
 
-sub traverse_index {
-       ##
-       # Scan index and recursively step into deeper levels, looking for next key.
-       ##
-    my $self = shift;
-    my ($obj, $offset, $ch, $force_return_next) = @_;
-    $force_return_next = undef unless $force_return_next;
-       
-       my $tag = $self->load_tag($obj, $offset );
-
-    my $fh = $obj->_fh;
-       
-       if ($tag->{signature} ne DBM::Deep->SIG_BLIST) {
-               my $content = $tag->{content};
-               my $start;
-               if ($obj->{return_next}) { $start = 0; }
-               else { $start = ord(substr($obj->{prev_md5}, $ch, 1)); }
-               
-               for (my $index = $start; $index < 256; $index++) {
-                       my $subloc = unpack($DBM::Deep::LONG_PACK, substr($content, $index * $DBM::Deep::LONG_SIZE, $DBM::Deep::LONG_SIZE) );
-                       if ($subloc) {
-                               my $result = $self->traverse_index( $obj, $subloc, $ch + 1, $force_return_next );
-                               if (defined($result)) { return $result; }
-                       }
-               } # index loop
-               
-               $obj->{return_next} = 1;
-       } # tag is an index
-       
-       elsif ($tag->{signature} eq DBM::Deep->SIG_BLIST) {
-               my $keys = $tag->{content};
-               if ($force_return_next) { $obj->{return_next} = 1; }
-               
-               ##
-               # Iterate through buckets, looking for a key match
-               ##
-               for (my $i=0; $i<$DBM::Deep::MAX_BUCKETS; $i++) {
-                       my $key = substr($keys, $i * $DBM::Deep::BUCKET_SIZE, $DBM::Deep::HASH_SIZE);
-                       my $subloc = unpack($DBM::Deep::LONG_PACK, substr($keys, ($i * $DBM::Deep::BUCKET_SIZE) + $DBM::Deep::HASH_SIZE, $DBM::Deep::LONG_SIZE));
-       
-                       if (!$subloc) {
-                               ##
-                               # End of bucket list -- return to outer loop
-                               ##
-                               $obj->{return_next} = 1;
-                               last;
-                       }
-                       elsif ($key eq $obj->{prev_md5}) {
-                               ##
-                               # Located previous key -- return next one found
-                               ##
-                               $obj->{return_next} = 1;
-                               next;
-                       }
-                       elsif ($obj->{return_next}) {
-                               ##
-                               # Seek to bucket location and skip over signature
-                               ##
-                               seek($fh, $subloc + DBM::Deep->SIG_SIZE + $obj->_root->{file_offset}, SEEK_SET);
-                               
-                               ##
-                               # Skip over value to get to plain key
-                               ##
-                               my $size;
-                               read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
-                               if ($size) { seek($fh, $size, SEEK_CUR); }
-                               
-                               ##
-                               # Read in plain key and return as scalar
-                               ##
-                               my $plain_key;
-                               read( $fh, $size, $DBM::Deep::DATA_LENGTH_SIZE); $size = unpack($DBM::Deep::DATA_LENGTH_PACK, $size);
-                               if ($size) { read( $fh, $plain_key, $size); }
-                               
-                               return $plain_key;
-                       }
-               } # bucket loop
-               
-               $obj->{return_next} = 1;
-       } # tag is a bucket list
-       
-       return;
+sub _throw_error {
+    die "DBM::Deep: $_[1]\n";
 }
 
-sub get_next_key {
-       ##
-       # Locate next key, given digested previous one
-       ##
+sub _get_dbm_object {
+    my $item = shift;
+
+    my $obj = eval {
+        local $SIG{__DIE__};
+        if ($item->isa( 'DBM::Deep' )) {
+            return $item;
+        }
+        return;
+    };
+    return $obj if $obj;
+
+    my $r = Scalar::Util::reftype( $item ) || '';
+    if ( $r eq 'HASH' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(%$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
+    elsif ( $r eq 'ARRAY' ) {
+        my $obj = eval {
+            local $SIG{__DIE__};
+            my $obj = tied(@$item);
+            if ($obj->isa( 'DBM::Deep' )) {
+                return $obj;
+            }
+            return;
+        };
+        return $obj if $obj;
+    }
+
+    return;
+}
+
+sub _length_needed {
     my $self = shift;
-    my ($obj) = @_;
-       
-       $obj->{prev_md5} = $_[1] ? $_[1] : undef;
-       $obj->{return_next} = 0;
-       
-       ##
-       # If the previous key was not specifed, start at the top and
-       # return the first one found.
-       ##
-       if (!$obj->{prev_md5}) {
-               $obj->{prev_md5} = chr(0) x $DBM::Deep::HASH_SIZE;
-               $obj->{return_next} = 1;
-       }
-       
-       return $self->traverse_index( $obj, $obj->_base_offset, 0 );
+    my ($value, $key) = @_;
+
+    my $is_dbm_deep = eval {
+        local $SIG{'__DIE__'};
+        $value->isa( 'DBM::Deep' );
+    };
+
+    my $len = SIG_SIZE
+            + $self->{data_size} # size for value
+            + $self->{data_size} # size for key
+            + length( $key );    # length of key
+
+    if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
+        # long_size is for the internal reference
+        return $len + $self->{long_size};
+    }
+
+    if ( $self->_storage->{autobless} ) {
+        # This is for the bit saying whether or not this thing is blessed.
+        $len += 1;
+    }
+
+    my $r = Scalar::Util::reftype( $value ) || '';
+    unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
+        if ( defined $value ) {
+            $len += length( $value );
+        }
+        return $len;
+    }
+
+    $len += $self->{index_size};
+
+    # if autobless is enabled, must also take into consideration
+    # the class name as it is stored after the key.
+    if ( $self->_storage->{autobless} ) {
+        my $c = Scalar::Util::blessed($value);
+        if ( defined $c && !$is_dbm_deep ) {
+            $len += $self->{data_size} + length($c);
+        }
+    }
+
+    return $len;
 }
 
 1;