X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBM%2FDeep%2FEngine.pm;h=933027e66dfc1d720a9a12499101a90f45d070da;hb=f0276afb03319e4eb8235ab9e89f54e6b58d7113;hp=03e08e97a48d7c8892306478fbbd7c7b51b7fb6c;hpb=f37c15abf1fbc8644137e02da0c3022fd2f5d43f;p=dbsrgits%2FDBM-Deep.git diff --git a/lib/DBM/Deep/Engine.pm b/lib/DBM/Deep/Engine.pm index 03e08e9..933027e 100644 --- a/lib/DBM/Deep/Engine.pm +++ b/lib/DBM/Deep/Engine.pm @@ -1,940 +1,1491 @@ package DBM::Deep::Engine; +use 5.006_000; + use strict; +use warnings FATAL => 'all'; + +# Never import symbols into our namespace. We are a class, not a library. +# -RobK, 2008-05-27 +use Scalar::Util (); -use Fcntl qw( :DEFAULT :flock :seek ); +#use Data::Dumper (); + +# File-wide notes: +# * Every method in here assumes that the storage has been appropriately +# safeguarded. This can be anything from flock() to some sort of manual +# mutex. But, it's the caller's responsability to make sure that this has +# been done. -## # Setup file and tag signatures. These should never change. -## sub SIG_FILE () { 'DPDB' } -sub SIG_INTERNAL () { 'i' } +sub SIG_HEADER () { 'h' } sub SIG_HASH () { 'H' } sub SIG_ARRAY () { 'A' } sub SIG_NULL () { 'N' } sub SIG_DATA () { 'D' } sub SIG_INDEX () { 'I' } sub SIG_BLIST () { 'B' } +sub SIG_FREE () { 'F' } sub SIG_SIZE () { 1 } -sub precalc_sizes { - ## - # Precalculate index, bucket and bucket list sizes - ## - my $self = shift; +use DBM::Deep::Iterator::BucketList (); +use DBM::Deep::Iterator::Index (); +use DBM::Deep::Engine::Sector::Data (); +use DBM::Deep::Engine::Sector::BucketList (); +use DBM::Deep::Engine::Sector::Index (); +use DBM::Deep::Engine::Sector::Null (); +use DBM::Deep::Engine::Sector::Reference (); +use DBM::Deep::Engine::Sector::Scalar (); +use DBM::Deep::Null (); - $self->{index_size} = (2**8) * $self->{long_size}; - $self->{bucket_size} = $self->{hash_size} + $self->{long_size}; - $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size}; +my $STALE_SIZE = 2; - return 1; -} +# Please refer to the pack() documentation for further information +my %StP = ( + 1 => 'C', # Unsigned char value (no order needed as it's just one byte) + 2 => 'n', # Unsigned short in "network" (big-endian) order + 4 => 'N', # Unsigned long in "network" (big-endian) order + 8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent) +); -sub set_pack { - ## - # Set pack/unpack modes (see file header for more) - ## - my $self = shift; - my ($long_s, $long_p, $data_s, $data_p) = @_; - - ## - # Set to 4 and 'N' for 32-bit offset tags (default). Theoretical limit of 4 - # GB per file. - # (Perl must be compiled with largefile support for files > 2 GB) - # - # Set to 8 and 'Q' for 64-bit offsets. Theoretical limit of 16 XB per file. - # (Perl must be compiled with largefile and 64-bit long support) - ## - $self->{long_size} = $long_s ? $long_s : 4; - $self->{long_pack} = $long_p ? $long_p : 'N'; - - ## - # Set to 4 and 'N' for 32-bit data length prefixes. Limit of 4 GB for each - # key/value. Upgrading this is possible (see above) but probably not - # necessary. If you need more than 4 GB for a single key or value, this - # module is really not for you :-) - ## - $self->{data_size} = $data_s ? $data_s : 4; - $self->{data_pack} = $data_p ? $data_p : 'N'; - - return $self->precalc_sizes(); -} - -sub set_digest { - ## - # Set key digest function (default is MD5) - ## - my $self = shift; - my ($digest_func, $hash_size) = @_; +=head1 NAME - $self->{digest} = $digest_func ? 
$digest_func : \&Digest::MD5::md5; - $self->{hash_size} = $hash_size ? $hash_size : 16; +DBM::Deep::Engine - return $self->precalc_sizes(); -} +=head1 PURPOSE -sub new { - my $class = shift; - my ($args) = @_; +This is an internal-use-only object for L. It mediates the low-level +mapping between the L objects and the storage medium. - my $self = bless { - long_size => 4, - long_pack => 'N', - data_size => 4, - data_pack => 'N', - - digest => \&Digest::MD5::md5, - hash_size => 16, - - ## - # Maximum number of buckets per list before another level of indexing is - # done. - # Increase this value for slightly greater speed, but larger database - # files. DO NOT decrease this value below 16, due to risk of recursive - # reindex overrun. - ## - max_buckets => 16, - }, $class; +The purpose of this documentation is to provide low-level documentation for +developers. It is B intended to be used by the general public. This +documentation and what it documents can and will change without notice. - $self->precalc_sizes; +=head1 OVERVIEW - return $self; -} +The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array, +and DBM::Deep::Hash) for their use to access the actual stored values. This API +is the following: -sub setup_fh { - my $self = shift; - my ($obj) = @_; +=over 4 - $self->open( $obj ) if !defined $obj->_fh; +=item * new - my $fh = $obj->_fh; - flock $fh, LOCK_EX; +=item * read_value - unless ( $obj->{base_offset} ) { - seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET); - my $signature; - my $bytes_read = read( $fh, $signature, length(SIG_FILE)); +=item * get_classname - ## - # File is empty -- write signature and master index - ## - if (!$bytes_read) { - my $loc = $self->_request_space( $obj, length( SIG_FILE ) ); - seek($fh, $loc + $obj->_root->{file_offset}, SEEK_SET); - print( $fh SIG_FILE); +=item * make_reference - $obj->{base_offset} = $self->_request_space( - $obj, $self->tag_size( $self->{index_size} ), - ); +=item * key_exists - $self->create_tag( - $obj, $obj->_base_offset, $obj->_type, - chr(0)x$self->{index_size}, - ); +=item * delete_key + +=item * write_value + +=item * get_next_key + +=item * setup_fh + +=item * begin_work + +=item * commit + +=item * rollback + +=item * lock_exclusive + +=item * lock_shared + +=item * unlock + +=back + +They are explained in their own sections below. These methods, in turn, may +provide some bounds-checking, but primarily act to instantiate objects in the +Engine::Sector::* hierarchy and dispatch to them. + +=head1 TRANSACTIONS + +Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts +to keep the amount of actual work done against the file low while stil providing +Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done +with only one file. + +=head2 STALENESS + +If another process uses a transaction slot and writes stuff to it, then terminates, +the data that process wrote it still within the file. In order to address this, +there is also a transaction staleness counter associated within every write. +Each time a transaction is started, that process increments that transaction's +staleness counter. If, when it reads a value, the staleness counters aren't +identical, DBM::Deep will consider the value on disk to be stale and discard it. + +=head2 DURABILITY + +The fourth leg of ACID is Durability, the guarantee that when a commit returns, +the data will be there the next time you read from it. 
This should be regardless +of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep +does provide that guarantee; once the commit returns, all of the data has been +transferred from the transaction shadow to the HEAD. The issue arises with partial +commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's +"tradition" of very light error-checking and non-existent error-handling, there is +no way to recover from a partial commit. (This is probably a failure in Consistency +as well as Durability.) + +Other DBMSes use transaction logs (a separate file, generally) to achieve Durability. +As DBM::Deep is a single-file, we would have to do something similar to what SQLite +and BDB do in terms of committing using synchonized writes. To do this, we would have +to use a much higher RAM footprint and some serious programming that make my head +hurts just to think about it. + +=head1 EXTERNAL METHODS + +=head2 new() + +This takes a set of args. These args are described in the documentation for +L. + +=cut + +sub new { + my $class = shift; + my ($args) = @_; + + $args->{storage} = DBM::Deep::File->new( $args ) + unless exists $args->{storage}; + + my $self = bless { + byte_size => 4, + + digest => undef, + hash_size => 16, # In bytes + hash_chars => 256, # Number of chars the algorithm uses per byte + max_buckets => 16, + num_txns => 1, # The HEAD + trans_id => 0, # Default to the HEAD + + data_sector_size => 64, # Size in bytes of each data sector + + entries => {}, # This is the list of entries for transactions + storage => undef, + }, $class; - # Flush the filehandle - my $old_fh = select $fh; - my $old_af = $|; $| = 1; $| = $old_af; - select $old_fh; + # Never allow byte_size to be set directly. + delete $args->{byte_size}; + if ( defined $args->{pack_size} ) { + if ( lc $args->{pack_size} eq 'small' ) { + $args->{byte_size} = 2; + } + elsif ( lc $args->{pack_size} eq 'medium' ) { + $args->{byte_size} = 4; + } + elsif ( lc $args->{pack_size} eq 'large' ) { + $args->{byte_size} = 8; } else { - $obj->{base_offset} = $bytes_read; + DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" ); + } + } - ## - # Check signature was valid - ## - unless ($signature eq SIG_FILE) { - $self->close_fh( $obj ); - $obj->_throw_error("Signature not found -- file is not a Deep DB"); - } + # Grab the parameters we want to use + foreach my $param ( keys %$self ) { + next unless exists $args->{$param}; + $self->{$param} = $args->{$param}; + } - ## - # Get our type from master index signature - ## - my $tag = $self->load_tag($obj, $obj->_base_offset) - or $obj->_throw_error("Corrupted file, no master index record"); + my %validations = ( + max_buckets => { floor => 16, ceil => 256 }, + num_txns => { floor => 1, ceil => 255 }, + data_sector_size => { floor => 32, ceil => 256 }, + ); - unless ($obj->{type} eq $tag->{signature}) { - $obj->_throw_error("File type mismatch"); - } + while ( my ($attr, $c) = each %validations ) { + if ( !defined $self->{$attr} + || !length $self->{$attr} + || $self->{$attr} =~ /\D/ + || $self->{$attr} < $c->{floor} + ) { + $self->{$attr} = '(undef)' if !defined $self->{$attr}; + warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n"; + $self->{$attr} = $c->{floor}; + } + elsif ( $self->{$attr} > $c->{ceil} ) { + warn "Ceiling of $attr is $c->{ceil}. 
Setting it to $c->{ceil} from '$self->{$attr}'\n"; + $self->{$attr} = $c->{ceil}; } } - #XXX We have to make sure we don't mess up when autoflush isn't turned on - unless ( $obj->_root->{inode} ) { - my @stats = stat($obj->_fh); - $obj->_root->{inode} = $stats[1]; - $obj->_root->{end} = $stats[7]; + if ( !$self->{digest} ) { + require Digest::MD5; + $self->{digest} = \&Digest::MD5::md5; } - flock $fh, LOCK_UN; - - return 1; + return $self; } -sub open { - ## - # Open a fh to the database, create if nonexistent. - # Make sure file signature matches DBM::Deep spec. - ## +=head2 read_value( $obj, $key ) + +This takes an object that provides _base_offset() and a string. It returns the +value stored in the corresponding Sector::Value's data section. + +=cut + +sub read_value { my $self = shift; - my ($obj) = @_; + my ($obj, $key) = @_; + + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or return; + + if ( $sector->staleness != $obj->_staleness ) { + return; + } - # Theoretically, adding O_BINARY should remove the need for the binmode - # Of course, testing it is going to be ... interesting. - my $flags = O_RDWR | O_CREAT | O_BINARY; + my $key_md5 = $self->_apply_digest( $key ); - my $fh; - my $filename = $obj->_root->{file}; - sysopen( $fh, $filename, $flags ) - or $obj->_throw_error("Cannot sysopen file '$filename': $!"); - $obj->_root->{fh} = $fh; + my $value_sector = $sector->get_data_for({ + key_md5 => $key_md5, + allow_head => 1, + }); - #XXX Can we remove this by using the right sysopen() flags? - # Maybe ... q.v. above - binmode $fh; # for win32 + unless ( $value_sector ) { + $value_sector = DBM::Deep::Engine::Sector::Null->new({ + engine => $self, + data => undef, + }); - if ($obj->_root->{autoflush}) { - my $old = select $fh; - $|=1; - select $old; + $sector->write_data({ + key_md5 => $key_md5, + key => $key, + value => $value_sector, + }); } - return 1; + return $value_sector->data; } -sub close_fh { +=head2 get_classname( $obj ) + +This takes an object that provides _base_offset() and returns the classname (if any) +associated with it. + +It delegates to Sector::Reference::get_classname() for the heavy lifting. + +It performs a staleness check. + +=cut + +sub get_classname { my $self = shift; my ($obj) = @_; - if ( my $fh = $obj->_root->{fh} ) { - close $fh; + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" ); + + if ( $sector->staleness != $obj->_staleness ) { + return; } - $obj->_root->{fh} = undef; - return 1; + return $sector->get_classname; } -sub tag_size { - my $self = shift; - my ($size) = @_; - return SIG_SIZE + $self->{data_size} + $size; -} +=head2 make_reference( $obj, $old_key, $new_key ) + +This takes an object that provides _base_offset() and two strings. The +strings correspond to the old key and new key, respectively. This operation +is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>. -sub create_tag { - ## - # Given offset, signature and content, create tag and write to disk - ## +This returns nothing. + +=cut + +sub make_reference { my $self = shift; - my ($obj, $offset, $sig, $content) = @_; - my $size = length( $content ); + my ($obj, $old_key, $new_key) = @_; - my $fh = $obj->_fh; + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" 
); - if ( defined $offset ) { - seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET); + if ( $sector->staleness != $obj->_staleness ) { + return; } - print( $fh $sig . pack($self->{data_pack}, $size) . $content ); + my $old_md5 = $self->_apply_digest( $old_key ); + + my $value_sector = $sector->get_data_for({ + key_md5 => $old_md5, + allow_head => 1, + }); - return unless defined $offset; + unless ( $value_sector ) { + $value_sector = DBM::Deep::Engine::Sector::Null->new({ + engine => $self, + data => undef, + }); - return { - signature => $sig, - size => $size, - offset => $offset + SIG_SIZE + $self->{data_size}, - content => $content - }; + $sector->write_data({ + key_md5 => $old_md5, + key => $old_key, + value => $value_sector, + }); + } + + if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) { + $sector->write_data({ + key => $new_key, + key_md5 => $self->_apply_digest( $new_key ), + value => $value_sector, + }); + $value_sector->increment_refcount; + } + else { + $sector->write_data({ + key => $new_key, + key_md5 => $self->_apply_digest( $new_key ), + value => $value_sector->clone, + }); + } + + return; } -sub load_tag { - ## - # Given offset, load single tag and return signature, size and data - ## - my $self = shift; - my ($obj, $offset) = @_; +=head2 key_exists( $obj, $key ) -# print join(':',map{$_||''}caller(1)), $/; +This takes an object that provides _base_offset() and a string for +the key to be checked. This returns 1 for true and "" for false. - my $fh = $obj->_fh; +=cut - seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET); +sub key_exists { + my $self = shift; + my ($obj, $key) = @_; - #XXX I'm not sure this check will work if autoflush isn't enabled ... - return if eof $fh; + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or return ''; - my $b; - read( $fh, $b, SIG_SIZE + $self->{data_size} ); - my ($sig, $size) = unpack( "A $self->{data_pack}", $b ); + if ( $sector->staleness != $obj->_staleness ) { + return ''; + } - my $buffer; - read( $fh, $buffer, $size); + my $data = $sector->get_data_for({ + key_md5 => $self->_apply_digest( $key ), + allow_head => 1, + }); - return { - signature => $sig, - size => $size, - offset => $offset + SIG_SIZE + $self->{data_size}, - content => $buffer - }; + # exists() returns 1 or '' for true/false. + return $data ? 1 : ''; } -sub _length_needed { - my $self = shift; - my ($obj, $value, $key) = @_; +=head2 delete_key( $obj, $key ) - my $is_dbm_deep = eval { - local $SIG{'__DIE__'}; - $value->isa( 'DBM::Deep' ); - }; +This takes an object that provides _base_offset() and a string for +the key to be deleted. This returns the result of the Sector::Reference +delete_key() method. - my $len = SIG_SIZE + $self->{data_size} - + $self->{data_size} + length( $key ); +=cut - if ( $is_dbm_deep && $value->_root eq $obj->_root ) { - return $len + $self->{long_size}; - } +sub delete_key { + my $self = shift; + my ($obj, $key) = @_; - my $r = Scalar::Util::reftype( $value ) || ''; - unless ( $r eq 'HASH' || $r eq 'ARRAY' ) { - if ( defined $value ) { - $len += length( $value ); - } - return $len; + my $sector = $self->_load_sector( $obj->_base_offset ) + or return; + + if ( $sector->staleness != $obj->_staleness ) { + return; } - $len += $self->{index_size}; + return $sector->delete_key({ + key_md5 => $self->_apply_digest( $key ), + allow_head => 0, + }); +} - # if autobless is enabled, must also take into consideration - # the class name as it is stored after the key. 
- if ( $obj->_root->{autobless} ) { - # This is for the bit saying whether or not this thing is blessed. - $len += 1; +=head2 write_value( $obj, $key, $value ) - my $value_class = Scalar::Util::blessed($value); - if ( defined $value_class && !$is_dbm_deep ) { - $len += $self->{data_size} + length($value_class); - } - } +This takes an object that provides _base_offset(), a string for the +key, and a value. This value can be anything storable within L. - return $len; -} +This returns 1 upon success. -sub add_bucket { - ## - # Adds one key/value pair to bucket list, given offset, MD5 digest of key, - # plain (undigested) key and value. - ## +=cut + +sub write_value { my $self = shift; - my ($obj, $tag, $md5, $plain_key, $value) = @_; + my ($obj, $key, $value) = @_; - # This verifies that only supported values will be stored. + my $r = Scalar::Util::reftype( $value ) || ''; { - my $r = Scalar::Util::reftype( $value ); - last if !defined $r; - + last if $r eq ''; last if $r eq 'HASH'; last if $r eq 'ARRAY'; - $obj->_throw_error( - "Storage of variables of type '$r' is not supported." + DBM::Deep->_throw_error( + "Storage of references of type '$r' is not supported." ); } - my $location = 0; - my $result = 2; + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); - my $root = $obj->_root; - my $fh = $obj->_fh; + if ( $sector->staleness != $obj->_staleness ) { + DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); + } + + my ($class, $type); + if ( !defined $value ) { + $class = 'DBM::Deep::Engine::Sector::Null'; + } + elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) { + my $tmpvar; + if ( $r eq 'ARRAY' ) { + $tmpvar = tied @$value; + } elsif ( $r eq 'HASH' ) { + $tmpvar = tied %$value; + } - my $actual_length = $self->_length_needed( $obj, $value, $plain_key ); + if ( $tmpvar ) { + my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); }; - my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 ); + unless ( $is_dbm_deep ) { + DBM::Deep->_throw_error( "Cannot store something that is tied." ); + } - # Updating a known md5 - if ( $subloc ) { - $result = 1; + unless ( $tmpvar->_engine->storage == $self->storage ) { + DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." ); + } - seek($fh, $subloc + SIG_SIZE + $root->{file_offset}, SEEK_SET); - my $size; - read( $fh, $size, $self->{data_size}); - $size = unpack($self->{data_pack}, $size); + # First, verify if we're storing the same thing to this spot. If we are, then + # this should be a no-op. -EJS, 2008-05-19 + my $loc = $sector->get_data_location_for({ + key_md5 => $self->_apply_digest( $key ), + allow_head => 1, + }); - if ($actual_length <= $size) { - $location = $subloc; - } - else { - $location = $self->_request_space( $obj, $actual_length ); - seek( - $fh, - $tag->{offset} + $offset + $self->{hash_size} + $root->{file_offset}, - SEEK_SET, - ); - print( $fh pack($self->{long_pack}, $location) ); + if ( defined($loc) && $loc == $tmpvar->_base_offset ) { + return 1; + } + + #XXX Can this use $loc? 
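+            # $tmpvar is a DBM::Deep structure that already lives in this
+            # file, so point the new key at its existing Reference sector and
+            # bump that sector's refcount instead of copying any data.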
+ my $value_sector = $self->_load_sector( $tmpvar->_base_offset ); + $sector->write_data({ + key => $key, + key_md5 => $self->_apply_digest( $key ), + value => $value_sector, + }); + $value_sector->increment_refcount; + + return 1; } - } - # Adding a new md5 - elsif ( defined $offset ) { - $location = $self->_request_space( $obj, $actual_length ); - seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET ); - print( $fh $md5 . pack($self->{long_pack}, $location) ); + $class = 'DBM::Deep::Engine::Sector::Reference'; + $type = substr( $r, 0, 1 ); } - # If bucket didn't fit into list, split into a new index level else { - $self->split_index( $obj, $md5, $tag ); - - $location = $self->_request_space( $obj, $actual_length ); + if ( tied($value) ) { + DBM::Deep->_throw_error( "Cannot store something that is tied." ); + } + $class = 'DBM::Deep::Engine::Sector::Scalar'; } - $self->write_value( $obj, $location, $plain_key, $value ); + # Create this after loading the reference sector in case something bad happens. + # This way, we won't allocate value sector(s) needlessly. + my $value_sector = $class->new({ + engine => $self, + data => $value, + type => $type, + }); + + $sector->write_data({ + key => $key, + key_md5 => $self->_apply_digest( $key ), + value => $value_sector, + }); + + # This code is to make sure we write all the values in the $value to the disk + # and to make sure all changes to $value after the assignment are reflected + # on disk. This may be counter-intuitive at first, but it is correct dwimmery. + # NOTE - simply tying $value won't perform a STORE on each value. Hence, the + # copy to a temp value. + if ( $r eq 'ARRAY' ) { + my @temp = @$value; + tie @$value, 'DBM::Deep', { + base_offset => $value_sector->offset, + staleness => $value_sector->staleness, + storage => $self->storage, + engine => $self, + }; + @$value = @temp; + bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value ); + } + elsif ( $r eq 'HASH' ) { + my %temp = %$value; + tie %$value, 'DBM::Deep', { + base_offset => $value_sector->offset, + staleness => $value_sector->staleness, + storage => $self->storage, + engine => $self, + }; + + %$value = %temp; + bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value ); + } - return $result; + return 1; } -sub write_value { +=head2 get_next_key( $obj, $prev_key ) + +This takes an object that provides _base_offset() and an optional string +representing the prior key returned via a prior invocation of this method. + +This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>. + +=cut + +# XXX Add staleness here +sub get_next_key { my $self = shift; - my ($obj, $location, $key, $value) = @_; + my ($obj, $prev_key) = @_; + + # XXX Need to add logic about resetting the iterator if any key in the reference has changed + unless ( $prev_key ) { + $obj->{iterator} = DBM::Deep::Iterator->new({ + base_offset => $obj->_base_offset, + engine => $self, + }); + } - my $fh = $obj->_fh; - my $root = $obj->_root; + return $obj->{iterator}->get_next_key( $obj ); +} - my $is_dbm_deep = eval { - local $SIG{'__DIE__'}; - $value->isa( 'DBM::Deep' ); - }; +=head2 setup_fh( $obj ) - my $internal_ref = $is_dbm_deep && ($value->_root eq $root); +This takes an object that provides _base_offset(). It will do everything needed +in order to properly initialize all values for necessary functioning. If this is +called upon an already initialized object, this will also reset the inode. 
- seek($fh, $location + $root->{file_offset}, SEEK_SET); +This returns 1. - ## - # Write signature based on content type, set content length and write - # actual value. - ## - my $r = Scalar::Util::reftype($value) || ''; - if ( $internal_ref ) { - $self->create_tag( $obj, undef, SIG_INTERNAL,pack($self->{long_pack}, $value->_base_offset) ); - } - elsif ($r eq 'HASH') { - $self->create_tag( $obj, undef, SIG_HASH, chr(0)x$self->{index_size} ); - } - elsif ($r eq 'ARRAY') { - $self->create_tag( $obj, undef, SIG_ARRAY, chr(0)x$self->{index_size} ); - } - elsif (!defined($value)) { - $self->create_tag( $obj, undef, SIG_INTERNAL, '' ); - } - else { - $self->create_tag( $obj, undef, SIG_DATA, $value ); - } - - ## - # Plain key is stored AFTER value, as keys are typically fetched less often. - ## - print( $fh pack($self->{data_pack}, length($key)) . $key ); - - ## - # If value is blessed, preserve class name - ## - if ( $root->{autobless} ) { - my $value_class = Scalar::Util::blessed($value); - if ( defined $value_class && !$is_dbm_deep ) { - print( $fh chr(1) ); - print( $fh pack($self->{data_pack}, length($value_class)) . $value_class ); +=cut + +sub setup_fh { + my $self = shift; + my ($obj) = @_; + + # We're opening the file. + unless ( $obj->_base_offset ) { + my $bytes_read = $self->_read_file_header; + + # Creating a new file + unless ( $bytes_read ) { + $self->_write_file_header; + + # 1) Create Array/Hash entry + my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({ + engine => $self, + type => $obj->_type, + }); + $obj->{base_offset} = $initial_reference->offset; + $obj->{staleness} = $initial_reference->staleness; + + $self->storage->flush; } + # Reading from an existing file else { - print( $fh chr(0) ); - } - } - - ## - # If content is a hash or array, create new child DBM::Deep object and - # pass each key or element to it. - ## - if ( !$internal_ref ) { - if ($r eq 'HASH') { - my $branch = DBM::Deep->new( - type => DBM::Deep->TYPE_HASH, - base_offset => $location, - root => $root, - ); - foreach my $key (keys %{$value}) { - $branch->STORE( $key, $value->{$key} ); + $obj->{base_offset} = $bytes_read; + my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({ + engine => $self, + offset => $obj->_base_offset, + }); + unless ( $initial_reference ) { + DBM::Deep->_throw_error("Corrupted file, no master index record"); } - } - elsif ($r eq 'ARRAY') { - my $branch = DBM::Deep->new( - type => DBM::Deep->TYPE_ARRAY, - base_offset => $location, - root => $root, - ); - my $index = 0; - foreach my $element (@{$value}) { - $branch->STORE( $index, $element ); - $index++; + + unless ($obj->_type eq $initial_reference->type) { + DBM::Deep->_throw_error("File type mismatch"); } + + $obj->{staleness} = $initial_reference->staleness; } } + $self->storage->set_inode; + return 1; } -sub split_index { - my $self = shift; - my ($obj, $md5, $tag) = @_; +=head2 begin_work( $obj ) - my $fh = $obj->_fh; - my $root = $obj->_root; - my $keys = $tag->{content}; +This takes an object that provides _base_offset(). It will set up all necessary +bookkeeping in order to run all work within a transaction. - seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET); +If $obj is already within a transaction, an error wiill be thrown. If there are +no more available transactions, an error will be thrown. - my $loc = $self->_request_space( - $obj, $self->tag_size( $self->{index_size} ), - ); +This returns undef. 
- print( $fh pack($self->{long_pack}, $loc) ); +=cut - my $index_tag = $self->create_tag( - $obj, $loc, SIG_INDEX, - chr(0)x$self->{index_size}, - ); +sub begin_work { + my $self = shift; + my ($obj) = @_; - my @offsets = (); + if ( $self->trans_id ) { + DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" ); + } - $keys .= $md5 . pack($self->{long_pack}, 0); + my @slots = $self->read_txn_slots; + my $found; + for my $i ( 0 .. $#slots ) { + next if $slots[$i]; - BUCKET: - for (my $i = 0; $i <= $self->{max_buckets}; $i++) { - my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i ); + $slots[$i] = 1; + $self->set_trans_id( $i + 1 ); + $found = 1; + last; + } + unless ( $found ) { + DBM::Deep->_throw_error( "Cannot allocate transaction ID" ); + } + $self->write_txn_slots( @slots ); - next BUCKET unless $key; + if ( !$self->trans_id ) { + DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" ); + } - my $num = ord(substr($key, $tag->{ch} + 1, 1)); + return; +} - if ($offsets[$num]) { - my $offset = $offsets[$num] + SIG_SIZE + $self->{data_size}; - seek($fh, $offset + $root->{file_offset}, SEEK_SET); - my $subkeys; - read( $fh, $subkeys, $self->{bucket_list_size}); +=head2 rollback( $obj ) - for (my $k=0; $k<$self->{max_buckets}; $k++) { - my ($temp, $subloc) = $self->_get_key_subloc( $subkeys, $k ); +This takes an object that provides _base_offset(). It will revert all +actions taken within the running transaction. - if (!$subloc) { - seek($fh, $offset + ($k * $self->{bucket_size}) + $root->{file_offset}, SEEK_SET); - print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) ); - last; - } - } # k loop - } - else { - $offsets[$num] = $root->{end}; - seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET); +If $obj is not within a transaction, an error will be thrown. - my $loc = $self->_request_space( - $obj, $self->tag_size( $self->{bucket_list_size} ), - ); +This returns 1. - print( $fh pack($self->{long_pack}, $loc) ); +=cut - my $blist_tag = $self->create_tag( - $obj, $loc, SIG_BLIST, - chr(0)x$self->{bucket_list_size}, - ); +sub rollback { + my $self = shift; + my ($obj) = @_; + + if ( !$self->trans_id ) { + DBM::Deep->_throw_error( "Cannot rollback without an active transaction" ); + } - seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET); - print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) ); + # Each entry is the file location for a bucket that has a modification for + # this transaction. The entries need to be expunged. 
+ foreach my $entry (@{ $self->get_entries } ) { + # Remove the entry here + my $read_loc = $entry + + $self->hash_size + + $self->byte_size + + $self->byte_size + + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); + + my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size ); + $data_loc = unpack( $StP{$self->byte_size}, $data_loc ); + $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) ); + + if ( $data_loc > 1 ) { + $self->_load_sector( $data_loc )->free; } - } # i loop + } - return; + $self->clear_entries; + + my @slots = $self->read_txn_slots; + $slots[$self->trans_id-1] = 0; + $self->write_txn_slots( @slots ); + $self->inc_txn_staleness_counter( $self->trans_id ); + $self->set_trans_id( 0 ); + + return 1; } -sub read_from_loc { - my $self = shift; - my ($obj, $subloc) = @_; - - my $fh = $obj->_fh; - - ## - # Found match -- seek to offset and read signature - ## - my $signature; - seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET); - read( $fh, $signature, SIG_SIZE); - - ## - # If value is a hash or array, return new DBM::Deep object with correct offset - ## - if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) { - my $obj = DBM::Deep->new( - type => $signature, - base_offset => $subloc, - root => $obj->_root, - ); +=head2 commit( $obj ) - if ($obj->_root->{autobless}) { - ## - # Skip over value and plain key to see if object needs - # to be re-blessed - ## - seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR); - - my $size; - read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size); - if ($size) { seek($fh, $size, SEEK_CUR); } - - my $bless_bit; - read( $fh, $bless_bit, 1); - if (ord($bless_bit)) { - ## - # Yes, object needs to be re-blessed - ## - my $class_name; - read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size); - if ($size) { read( $fh, $class_name, $size); } - if ($class_name) { $obj = bless( $obj, $class_name ); } - } - } +This takes an object that provides _base_offset(). It will apply all +actions taken within the transaction to the HEAD. - return $obj; +If $obj is not within a transaction, an error will be thrown. + +This returns 1. + +=cut + +sub commit { + my $self = shift; + my ($obj) = @_; + + if ( !$self->trans_id ) { + DBM::Deep->_throw_error( "Cannot commit without an active transaction" ); } - elsif ( $signature eq SIG_INTERNAL ) { - my $size; - read( $fh, $size, $self->{data_size}); - $size = unpack($self->{data_pack}, $size); - if ( $size ) { - my $new_loc; - read( $fh, $new_loc, $size ); - $new_loc = unpack( $self->{long_pack}, $new_loc ); + foreach my $entry (@{ $self->get_entries } ) { + # Overwrite the entry in head with the entry in trans_id + my $base = $entry + + $self->hash_size + + $self->byte_size; - return $self->read_from_loc( $obj, $new_loc ); - } - else { - return; + my $head_loc = $self->storage->read_at( $base, $self->byte_size ); + $head_loc = unpack( $StP{$self->byte_size}, $head_loc ); + + my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); + my $trans_loc = $self->storage->read_at( + $spot, $self->byte_size, + ); + + $self->storage->print_at( $base, $trans_loc ); + $self->storage->print_at( + $spot, + pack( $StP{$self->byte_size} . ' ' . 
$StP{$STALE_SIZE}, (0) x 2 ), + ); + + if ( $head_loc > 1 ) { + $self->_load_sector( $head_loc )->free; } } - ## - # Otherwise return actual value - ## - elsif ($signature eq SIG_DATA) { - my $size; - read( $fh, $size, $self->{data_size}); - $size = unpack($self->{data_pack}, $size); - my $value = ''; - if ($size) { read( $fh, $value, $size); } - return $value; - } + $self->clear_entries; - ## - # Key exists, but content is null - ## - return; + my @slots = $self->read_txn_slots; + $slots[$self->trans_id-1] = 0; + $self->write_txn_slots( @slots ); + $self->inc_txn_staleness_counter( $self->trans_id ); + $self->set_trans_id( 0 ); + + return 1; } -sub get_bucket_value { - ## - # Fetch single value given tag and MD5 digested key. - ## +=head2 lock_exclusive() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has taken precautions to be safe for a write. + +This returns nothing. + +=cut + +sub lock_exclusive { my $self = shift; - my ($obj, $tag, $md5) = @_; + my ($obj) = @_; + return $self->storage->lock_exclusive( $obj ); +} - my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 ); - if ( $subloc ) { - return $self->read_from_loc( $obj, $subloc ); - } - return; +=head2 lock_shared() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has taken precautions to be safe for a read. + +This returns nothing. + +=cut + +sub lock_shared { + my $self = shift; + my ($obj) = @_; + return $self->storage->lock_shared( $obj ); } -sub delete_bucket { - ## - # Delete single key/value pair given tag and MD5 digested key. - ## +=head2 unlock() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has released all locks taken. + +This returns nothing. + +=cut + +sub unlock { my $self = shift; - my ($obj, $tag, $md5) = @_; + my ($obj) = @_; - my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 ); - if ( $subloc ) { - my $fh = $obj->_fh; - seek($fh, $tag->{offset} + $offset + $obj->_root->{file_offset}, SEEK_SET); - print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) ); - print( $fh chr(0) x $self->{bucket_size} ); + my $rv = $self->storage->unlock( $obj ); - return 1; - } - return; + $self->flush if $rv; + + return $rv; } -sub bucket_exists { - ## - # Check existence of single key given tag and MD5 digested key. - ## +=head1 INTERNAL METHODS + +The following methods are internal-use-only to DBM::Deep::Engine. + +=cut + +=head2 read_txn_slots() + +This takes no arguments. + +This will return an array with a 1 or 0 in each slot. Each spot represents one +available transaction. If the slot is 1, that transaction is taken. If it is 0, +the transaction is available. + +=cut + +sub read_txn_slots { my $self = shift; - my ($obj, $tag, $md5) = @_; + my $bl = $self->txn_bitfield_len; + my $num_bits = $bl * 8; + return split '', unpack( 'b'.$num_bits, + $self->storage->read_at( + $self->trans_loc, $bl, + ) + ); +} - my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 ); - return $subloc && 1; +=head2 write_txn_slots( @slots ) + +This takes an array of 1's and 0's. This array represents the transaction slots +returned by L. In other words, the following is true: + + @x = read_txn_slots( write_txn_slots( @x ) ); + +(With the obviously missing object referents added back in.) 
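+
+For illustration (assuming C<num_txns> is 4, so the bitfield fits in a single
+byte), marking transactions 1 and 3 as taken boils down to:
+
+  $self->write_txn_slots( 1, 0, 1, 0 );  # pack( 'b8', '1010' ), low-order bit first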
+ +=cut + +sub write_txn_slots { + my $self = shift; + my $num_bits = $self->txn_bitfield_len * 8; + $self->storage->print_at( $self->trans_loc, + pack( 'b'.$num_bits, join('', @_) ), + ); } -sub find_bucket_list { - ## - # Locate offset for bucket list, given digested key - ## +=head2 get_running_txn_ids() + +This takes no arguments. + +This will return an array of taken transaction IDs. This wraps L. + +=cut + +sub get_running_txn_ids { my $self = shift; - my ($obj, $md5, $args) = @_; - $args = {} unless $args; + my @transactions = $self->read_txn_slots; + my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions; +} - ## - # Locate offset for bucket list using digest index system - ## - my $tag = $self->load_tag($obj, $obj->_base_offset) - or $obj->_throw_error( "INTERNAL ERROR - Cannot find tag" ); +=head2 get_txn_staleness_counter( $trans_id ) - my $ch = 0; - while ($tag->{signature} ne SIG_BLIST) { - my $num = ord substr($md5, $ch, 1); +This will return the staleness counter for the given transaction ID. Please see +L for more information. - my $ref_loc = $tag->{offset} + ($num * $self->{long_size}); - $tag = $self->index_lookup( $obj, $tag, $num ); +=cut - if (!$tag) { - return if !$args->{create}; +sub get_txn_staleness_counter { + my $self = shift; + my ($trans_id) = @_; - my $fh = $obj->_fh; - seek($fh, $ref_loc + $obj->_root->{file_offset}, SEEK_SET); + # Hardcode staleness of 0 for the HEAD + return 0 unless $trans_id; - my $loc = $self->_request_space( - $obj, $self->tag_size( $self->{bucket_list_size} ), - ); + return unpack( $StP{$STALE_SIZE}, + $self->storage->read_at( + $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), + $STALE_SIZE, + ) + ); +} - print( $fh pack($self->{long_pack}, $loc) ); +=head2 inc_txn_staleness_counter( $trans_id ) - $tag = $self->create_tag( - $obj, $loc, SIG_BLIST, - chr(0)x$self->{bucket_list_size}, - ); +This will increment the staleness counter for the given transaction ID. Please see +L for more information. - $tag->{ref_loc} = $ref_loc; - $tag->{ch} = $ch; +=cut - last; - } +sub inc_txn_staleness_counter { + my $self = shift; + my ($trans_id) = @_; - $tag->{ch} = $ch++; - $tag->{ref_loc} = $ref_loc; - } + # Hardcode staleness of 0 for the HEAD + return 0 unless $trans_id; - return $tag; + $self->storage->print_at( + $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), + pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ), + ); } -sub index_lookup { - ## - # Given index tag, lookup single entry in index and return . - ## +=head2 get_entries() + +This takes no arguments. + +This returns a list of all the sectors that have been modified by this transaction. + +=cut + +sub get_entries { my $self = shift; - my ($obj, $tag, $index) = @_; - - my $location = unpack( - $self->{long_pack}, - substr( - $tag->{content}, - $index * $self->{long_size}, - $self->{long_size}, - ), - ); + return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ]; +} + +=head2 add_entry( $trans_id, $location ) - if (!$location) { return; } +This takes a transaction ID and a file location and marks the sector at that location +as having been modified by the transaction identified by $trans_id. - return $self->load_tag( $obj, $location ); +This returns nothing. + +B: Unlike all the other _entries() methods, there are several cases where +C<< $trans_id != $self->trans_id >> for this method. 
+ +=cut + +sub add_entry { + my $self = shift; + my ($trans_id, $loc) = @_; + + $self->{entries}{$trans_id} ||= {}; + $self->{entries}{$trans_id}{$loc} = undef; } -sub traverse_index { - ## - # Scan index and recursively step into deeper levels, looking for next key. - ## +=head2 reindex_entry( $old_loc, $new_loc ) + +This takes two locations (old and new, respectively). If a location that has been +modified by this transaction is subsequently reindexed due to a bucketlist +overflowing, then the entries hash needs to be made aware of this change. + +This returns nothing. + +=cut + +sub reindex_entry { + my $self = shift; + my ($old_loc, $new_loc) = @_; + + TRANS: + while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) { + if ( exists $locs->{$old_loc} ) { + delete $locs->{$old_loc}; + $locs->{$new_loc} = undef; + next TRANS; + } + } +} + +=head2 clear_entries() + +This takes no arguments. It will clear the entries list for the running transaction. + +This returns nothing. + +=cut + +sub clear_entries { my $self = shift; - my ($obj, $offset, $ch, $force_return_next) = @_; + delete $self->{entries}{$self->trans_id}; +} + +=head2 _write_file_header() + +This writes the file header for a new file. This will write the various settings +that set how the file is interpreted. + +=head2 _read_file_header() + +This reads the file header from an existing file. This will read the various +settings that set how the file is interpreted. + +=cut + +{ + my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4; + my $this_file_version = 3; + + sub _write_file_header { + my $self = shift; + + my $nt = $self->num_txns; + my $bl = $self->txn_bitfield_len; + + my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size; + + my $loc = $self->storage->request_space( $header_fixed + $header_var ); + + $self->storage->print_at( $loc, + SIG_FILE, + SIG_HEADER, + pack('N', $this_file_version), # At this point, we're at 9 bytes + pack('N', $header_var), # header size + # --- Above is $header_fixed. Below is $header_var + pack('C', $self->byte_size), + + # These shenanigans are to allow a 256 within a C + pack('C', $self->max_buckets - 1), + pack('C', $self->data_sector_size - 1), + + pack('C', $nt), + pack('C' . $bl, 0 ), # Transaction activeness bitfield + pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters + pack($StP{$self->byte_size}, 0), # Start of free chain (blist size) + pack($StP{$self->byte_size}, 0), # Start of free chain (data size) + pack($StP{$self->byte_size}, 0), # Start of free chain (index size) + ); - my $tag = $self->load_tag($obj, $offset ); + #XXX Set these less fragilely + $self->set_trans_loc( $header_fixed + 4 ); + $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) ); - my $fh = $obj->_fh; + return; + } + + sub _read_file_header { + my $self = shift; + + my $buffer = $self->storage->read_at( 0, $header_fixed ); + return unless length($buffer); + + my ($file_signature, $sig_header, $file_version, $size) = unpack( + 'A4 A N N', $buffer + ); + + unless ( $file_signature eq SIG_FILE ) { + $self->storage->close; + DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" ); + } - if ($tag->{signature} ne SIG_BLIST) { - my $content = $tag->{content}; - my $start = $obj->{return_next} ? 
0 : ord(substr($obj->{prev_md5}, $ch, 1)); + unless ( $sig_header eq SIG_HEADER ) { + $self->storage->close; + DBM::Deep->_throw_error( "Pre-1.00 file version found" ); + } - for (my $idx = $start; $idx < (2**8); $idx++) { - my $subloc = unpack( - $self->{long_pack}, - substr( - $content, - $idx * $self->{long_size}, - $self->{long_size}, - ), + unless ( $file_version == $this_file_version ) { + $self->storage->close; + DBM::Deep->_throw_error( + "Wrong file version found - " . $file_version . + " - expected " . $this_file_version ); + } - if ($subloc) { - my $result = $self->traverse_index( - $obj, $subloc, $ch + 1, $force_return_next, - ); + my $buffer2 = $self->storage->read_at( undef, $size ); + my @values = unpack( 'C C C C', $buffer2 ); - if (defined($result)) { return $result; } - } - } # index loop + if ( @values != 4 || grep { !defined } @values ) { + $self->storage->close; + DBM::Deep->_throw_error("Corrupted file - bad header"); + } - $obj->{return_next} = 1; - } # tag is an index + #XXX Add warnings if values weren't set right + @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values; - else { - my $keys = $tag->{content}; - if ($force_return_next) { $obj->{return_next} = 1; } - - ## - # Iterate through buckets, looking for a key match - ## - for (my $i = 0; $i < $self->{max_buckets}; $i++) { - my ($key, $subloc) = $self->_get_key_subloc( $keys, $i ); - - # End of bucket list -- return to outer loop - if (!$subloc) { - $obj->{return_next} = 1; - last; - } - # Located previous key -- return next one found - elsif ($key eq $obj->{prev_md5}) { - $obj->{return_next} = 1; - next; - } - # Seek to bucket location and skip over signature - elsif ($obj->{return_next}) { - seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET); - - # Skip over value to get to plain key - my $sig; - read( $fh, $sig, SIG_SIZE ); - - my $size; - read( $fh, $size, $self->{data_size}); - $size = unpack($self->{data_pack}, $size); - if ($size) { seek($fh, $size, SEEK_CUR); } - - # Read in plain key and return as scalar - my $plain_key; - read( $fh, $size, $self->{data_size}); - $size = unpack($self->{data_pack}, $size); - if ($size) { read( $fh, $plain_key, $size); } - - return $plain_key; - } + # These shenangians are to allow a 256 within a C + $self->{max_buckets} += 1; + $self->{data_sector_size} += 1; + + my $bl = $self->txn_bitfield_len; + + my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size; + unless ( $size == $header_var ) { + $self->storage->close; + DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." ); } - $obj->{return_next} = 1; - } # tag is a bucket list + $self->set_trans_loc( $header_fixed + scalar(@values) ); + $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) ); - return; + return length($buffer) + length($buffer2); + } } -sub get_next_key { - ## - # Locate next key, given digested previous one - ## +=head2 _load_sector( $offset ) + +This will instantiate and return the sector object that represents the data found +at $offset. + +=cut + +sub _load_sector { my $self = shift; - my ($obj) = @_; + my ($offset) = @_; + + # Add a catch for offset of 0 or 1 + return if !$offset || $offset <= 1; - $obj->{prev_md5} = $_[1] ? $_[1] : undef; - $obj->{return_next} = 0; + my $type = $self->storage->read_at( $offset, 1 ); + return if $type eq chr(0); - ## - # If the previous key was not specifed, start at the top and - # return the first one found. 
- ## - if (!$obj->{prev_md5}) { - $obj->{prev_md5} = chr(0) x $self->{hash_size}; - $obj->{return_next} = 1; + if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) { + return DBM::Deep::Engine::Sector::Reference->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + # XXX Don't we need key_md5 here? + elsif ( $type eq $self->SIG_BLIST ) { + return DBM::Deep::Engine::Sector::BucketList->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + elsif ( $type eq $self->SIG_INDEX ) { + return DBM::Deep::Engine::Sector::Index->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + elsif ( $type eq $self->SIG_NULL ) { + return DBM::Deep::Engine::Sector::Null->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + elsif ( $type eq $self->SIG_DATA ) { + return DBM::Deep::Engine::Sector::Scalar->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + # This was deleted from under us, so just return and let the caller figure it out. + elsif ( $type eq $self->SIG_FREE ) { + return; } - return $self->traverse_index( $obj, $obj->_base_offset, 0 ); + DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" ); } -# Utilities +=head2 _apply_digest( @stuff ) -sub _get_key_subloc { - my $self = shift; - my ($keys, $idx) = @_; - - my ($key, $subloc) = unpack( - "a$self->{hash_size} $self->{long_pack}", - substr( - $keys, - ($idx * $self->{bucket_size}), - $self->{bucket_size}, - ), - ); +This will apply the digest methd (default to Digest::MD5::md5) to the arguments +passed in and return the result. + +=cut - return ($key, $subloc); +sub _apply_digest { + my $self = shift; + return $self->{digest}->(@_); } -sub _find_in_buckets { +=head2 _add_free_blist_sector( $offset, $size ) + +=head2 _add_free_data_sector( $offset, $size ) + +=head2 _add_free_index_sector( $offset, $size ) + +These methods are all wrappers around _add_free_sector(), providing the proper +chain offset ($multiple) for the sector type. + +=cut + +sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) } +sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) } +sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) } + +=head2 _add_free_sector( $multiple, $offset, $size ) + +_add_free_sector() takes the offset into the chains location, the offset of the +sector, and the size of that sector. It will mark the sector as a free sector +and put it into the list of sectors that are free of this type for use later. + +This returns nothing. + +B: $size is unused? + +=cut + +sub _add_free_sector { my $self = shift; - my ($tag, $md5) = @_; + my ($multiple, $offset, $size) = @_; - BUCKET: - for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) { - my ($key, $subloc) = $self->_get_key_subloc( $tag->{content}, $i ); + my $chains_offset = $multiple * $self->byte_size; - return ($subloc, $i * $self->{bucket_size}) unless $subloc; + my $storage = $self->storage; - next BUCKET if $key ne $md5; + # Increment staleness. + # XXX Can this increment+modulo be done by "&= 0x1" ? 
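+    # (The modulo simply wraps the counter around at 2 ** ( 8 * $STALE_SIZE ).)
+    #
+    # After bumping the staleness counter, this sector is pushed onto the head
+    # of its type's free chain: the chains area in the header is pointed at
+    # this sector, and the old head is stored inside the freed sector just
+    # past its signature and staleness counter, which is where
+    # _request_sector() will read it back from.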
+ my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) ); + $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) ); + $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) ); - return ($subloc, $i * $self->{bucket_size}); - } + my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size ); - return; + $storage->print_at( $self->chains_loc + $chains_offset, + pack( $StP{$self->byte_size}, $offset ), + ); + + # Record the old head in the new sector after the signature and staleness counter + $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head ); } -sub _request_space { +=head2 _request_blist_sector( $size ) + +=head2 _request_data_sector( $size ) + +=head2 _request_index_sector( $size ) + +These methods are all wrappers around _request_sector(), providing the proper +chain offset ($multiple) for the sector type. + +=cut + +sub _request_blist_sector { shift->_request_sector( 0, @_ ) } +sub _request_data_sector { shift->_request_sector( 1, @_ ) } +sub _request_index_sector { shift->_request_sector( 2, @_ ) } + +=head2 _request_sector( $multiple $size ) + +This takes the offset into the chains location and the size of that sector. + +This returns the object with the sector. If there is an available free sector of +that type, then it will be reused. If there isn't one, then a new one will be +allocated. + +=cut + +sub _request_sector { my $self = shift; - my ($obj, $size) = @_; + my ($multiple, $size) = @_; + + my $chains_offset = $multiple * $self->byte_size; + + my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size ); + my $loc = unpack( $StP{$self->byte_size}, $old_head ); - my $loc = $obj->_root->{end}; - $obj->_root->{end} += $size; + # We don't have any free sectors of the right size, so allocate a new one. + unless ( $loc ) { + my $offset = $self->storage->request_space( $size ); + + # Zero out the new sector. This also guarantees correct increases + # in the filesize. + $self->storage->print_at( $offset, chr(0) x $size ); + + return $offset; + } + + # Read the new head after the signature and the staleness counter + my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size ); + $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head ); + $self->storage->print_at( + $loc + SIG_SIZE + $STALE_SIZE, + pack( $StP{$self->byte_size}, 0 ), + ); return $loc; } -sub _release_space { +=head2 flush() + +This takes no arguments. It will do everything necessary to flush all things to +disk. This is usually called during unlock() and setup_fh(). + +This returns nothing. + +=cut + +sub flush { my $self = shift; - my ($obj, $size, $loc) = @_; - return; + # Why do we need to have the storage flush? Shouldn't autoflush take care of things? + # -RobK, 2008-06-26 + $self->storage->flush; } -1; -__END__ +=head2 ACCESSORS -# This will be added in later, after more refactoring is done. This is an early -# attempt at refactoring on the physical level instead of the virtual level. -sub _read_at { - my $self = shift; - my ($obj, $spot, $amount, $unpack) = @_; +The following are readonly attributes. 
+ +=over 4 + +=item * storage + +=item * byte_size + +=item * hash_size + +=item * hash_chars + +=item * num_txns + +=item * max_buckets - my $fh = $obj->_fh; - seek( $fh, $spot + $obj->_root->{file_offset}, SEEK_SET ); +=item * blank_md5 - my $buffer; - my $bytes_read = read( $fh, $buffer, $amount ); +=item * data_sector_size - if ( $unpack ) { - $buffer = unpack( $unpack, $buffer ); +=item * txn_bitfield_len + +=back + +=cut + +sub storage { $_[0]{storage} } +sub byte_size { $_[0]{byte_size} } +sub hash_size { $_[0]{hash_size} } +sub hash_chars { $_[0]{hash_chars} } +sub num_txns { $_[0]{num_txns} } +sub max_buckets { $_[0]{max_buckets} } +sub blank_md5 { chr(0) x $_[0]->hash_size } +sub data_sector_size { $_[0]{data_sector_size} } + +# This is a calculated value +sub txn_bitfield_len { + my $self = shift; + unless ( exists $self->{txn_bitfield_len} ) { + my $temp = ($self->num_txns) / 8; + if ( $temp > int( $temp ) ) { + $temp = int( $temp ) + 1; + } + $self->{txn_bitfield_len} = $temp; } + return $self->{txn_bitfield_len}; +} + +=pod + +The following are read/write attributes. + +=over 4 + +=item * trans_id / set_trans_id( $new_id ) + +=item * trans_loc / set_trans_loc( $new_loc ) + +=item * chains_loc / set_chains_loc( $new_loc ) + +=back + +=cut + +sub trans_id { $_[0]{trans_id} } +sub set_trans_id { $_[0]{trans_id} = $_[1] } + +sub trans_loc { $_[0]{trans_loc} } +sub set_trans_loc { $_[0]{trans_loc} = $_[1] } + +sub chains_loc { $_[0]{chains_loc} } +sub set_chains_loc { $_[0]{chains_loc} = $_[1] } + +sub cache { $_[0]{cache} ||= {} } +sub clear_cache { %{$_[0]->cache} = () } + +=head2 _dump_file() + +This method takes no arguments. It's used to print out a textual representation of the DBM::Deep +DB file. It assumes the file is not-corrupted. + +=cut + +sub _dump_file { + my $self = shift; + + # Read the header + my $spot = $self->_read_file_header(); + + my %types = ( + 0 => 'B', + 1 => 'D', + 2 => 'I', + ); - if ( wantarray ) { - return ($buffer, $bytes_read); + my %sizes = ( + 'D' => $self->data_sector_size, + 'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size, + 'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size, + ); + + my $return = ""; + + # Header values + $return .= "NumTxns: " . $self->num_txns . $/; + + # Read the free sector chains + my %sectors; + foreach my $multiple ( 0 .. 2 ) { + $return .= "Chains($types{$multiple}):"; + my $old_loc = $self->chains_loc + $multiple * $self->byte_size; + while ( 1 ) { + my $loc = unpack( + $StP{$self->byte_size}, + $self->storage->read_at( $old_loc, $self->byte_size ), + ); + + # We're now out of free sectors of this kind. + unless ( $loc ) { + last; + } + + $sectors{ $types{$multiple} }{ $loc } = undef; + $old_loc = $loc + SIG_SIZE + $STALE_SIZE; + $return .= " $loc"; + } + $return .= $/; } - else { - return $buffer; + + SECTOR: + while ( $spot < $self->storage->{end} ) { + # Read each sector in order. + my $sector = $self->_load_sector( $spot ); + if ( !$sector ) { + # Find it in the free-sectors that were found already + foreach my $type ( keys %sectors ) { + if ( exists $sectors{$type}{$spot} ) { + my $size = $sizes{$type}; + $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size; + $spot += $size; + next SECTOR; + } + } + + die "********\n$return\nDidn't find free sector for $spot in chains\n********\n"; + } + else { + $return .= sprintf "%08d: %s %04d", $spot, $sector->type, $sector->size; + if ( $sector->type eq 'D' ) { + $return .= ' ' . 
$sector->data; + } + elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) { + $return .= ' REF: ' . $sector->get_refcount; + } + elsif ( $sector->type eq 'B' ) { + foreach my $bucket ( $sector->chopped_up ) { + $return .= "\n "; + $return .= sprintf "%08d", unpack($StP{$self->byte_size}, + substr( $bucket->[-1], $self->hash_size, $self->byte_size), + ); + my $l = unpack( $StP{$self->byte_size}, + substr( $bucket->[-1], + $self->hash_size + $self->byte_size, + $self->byte_size, + ), + ); + $return .= sprintf " %08d", $l; + foreach my $txn ( 0 .. $self->num_txns - 2 ) { + my $l = unpack( $StP{$self->byte_size}, + substr( $bucket->[-1], + $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE), + $self->byte_size, + ), + ); + $return .= sprintf " %08d", $l; + } + } + } + $return .= $/; + + $spot += $sector->size; + } } + + return $return; } + +1; +__END__
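+
+=head1 USAGE SKETCH
+
+The engine is internal-use-only; L<DBM::Deep> constructs it and calls it on
+your behalf. Purely as an illustrative sketch of how the external methods
+documented above fit together (this is not a supported public interface):
+
+  my $engine = $db->_engine;    # $db is a DBM::Deep object
+
+  $engine->lock_exclusive( $db );
+  $engine->write_value( $db, 'foo', 42 );
+  my $val = $engine->read_value( $db, 'foo' );   # 42
+  $engine->unlock( $db );                        # unlock() also flushes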