X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBM%2FDeep%2FEngine.pm;h=933027e66dfc1d720a9a12499101a90f45d070da;hb=f0276afb03319e4eb8235ab9e89f54e6b58d7113;hp=a4918929555de88c4d67837753ed41175f897ba7;hpb=2120a181ed29ae691819ebdbd7cf5233d7fc672c;p=dbsrgits%2FDBM-Deep.git diff --git a/lib/DBM/Deep/Engine.pm b/lib/DBM/Deep/Engine.pm index a491892..933027e 100644 --- a/lib/DBM/Deep/Engine.pm +++ b/lib/DBM/Deep/Engine.pm @@ -3,11 +3,14 @@ package DBM::Deep::Engine; use 5.006_000; use strict; +use warnings FATAL => 'all'; -our $VERSION = q(0.99_04); - +# Never import symbols into our namespace. We are a class, not a library. +# -RobK, 2008-05-27 use Scalar::Util (); +#use Data::Dumper (); + # File-wide notes: # * Every method in here assumes that the storage has been appropriately # safeguarded. This can be anything from flock() to some sort of manual @@ -17,7 +20,6 @@ use Scalar::Util (); # Setup file and tag signatures. These should never change. sub SIG_FILE () { 'DPDB' } sub SIG_HEADER () { 'h' } -sub SIG_INTERNAL () { 'i' } sub SIG_HASH () { 'H' } sub SIG_ARRAY () { 'A' } sub SIG_NULL () { 'N' } @@ -25,24 +27,135 @@ sub SIG_DATA () { 'D' } sub SIG_INDEX () { 'I' } sub SIG_BLIST () { 'B' } sub SIG_FREE () { 'F' } -sub SIG_KEYS () { 'K' } sub SIG_SIZE () { 1 } -sub STALE_SIZE () { 1 } + +use DBM::Deep::Iterator::BucketList (); +use DBM::Deep::Iterator::Index (); +use DBM::Deep::Engine::Sector::Data (); +use DBM::Deep::Engine::Sector::BucketList (); +use DBM::Deep::Engine::Sector::Index (); +use DBM::Deep::Engine::Sector::Null (); +use DBM::Deep::Engine::Sector::Reference (); +use DBM::Deep::Engine::Sector::Scalar (); +use DBM::Deep::Null (); + +my $STALE_SIZE = 2; # Please refer to the pack() documentation for further information my %StP = ( - 1 => 'C', # Unsigned char value (no order specified, presumably ASCII) + 1 => 'C', # Unsigned char value (no order needed as it's just one byte) 2 => 'n', # Unsigned short in "network" (big-endian) order 4 => 'N', # Unsigned long in "network" (big-endian) order 8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent) ); -################################################################################ +=head1 NAME + +DBM::Deep::Engine + +=head1 PURPOSE + +This is an internal-use-only object for L. It mediates the low-level +mapping between the L objects and the storage medium. + +The purpose of this documentation is to provide low-level documentation for +developers. It is B intended to be used by the general public. This +documentation and what it documents can and will change without notice. + +=head1 OVERVIEW + +The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array, +and DBM::Deep::Hash) for their use to access the actual stored values. This API +is the following: + +=over 4 + +=item * new + +=item * read_value + +=item * get_classname + +=item * make_reference + +=item * key_exists + +=item * delete_key + +=item * write_value + +=item * get_next_key + +=item * setup_fh + +=item * begin_work + +=item * commit + +=item * rollback + +=item * lock_exclusive + +=item * lock_shared + +=item * unlock + +=back + +They are explained in their own sections below. These methods, in turn, may +provide some bounds-checking, but primarily act to instantiate objects in the +Engine::Sector::* hierarchy and dispatch to them. + +=head1 TRANSACTIONS + +Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts +to keep the amount of actual work done against the file low while stil providing +Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done +with only one file. + +=head2 STALENESS + +If another process uses a transaction slot and writes stuff to it, then terminates, +the data that process wrote it still within the file. In order to address this, +there is also a transaction staleness counter associated within every write. +Each time a transaction is started, that process increments that transaction's +staleness counter. If, when it reads a value, the staleness counters aren't +identical, DBM::Deep will consider the value on disk to be stale and discard it. + +=head2 DURABILITY + +The fourth leg of ACID is Durability, the guarantee that when a commit returns, +the data will be there the next time you read from it. This should be regardless +of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep +does provide that guarantee; once the commit returns, all of the data has been +transferred from the transaction shadow to the HEAD. The issue arises with partial +commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's +"tradition" of very light error-checking and non-existent error-handling, there is +no way to recover from a partial commit. (This is probably a failure in Consistency +as well as Durability.) + +Other DBMSes use transaction logs (a separate file, generally) to achieve Durability. +As DBM::Deep is a single-file, we would have to do something similar to what SQLite +and BDB do in terms of committing using synchonized writes. To do this, we would have +to use a much higher RAM footprint and some serious programming that make my head +hurts just to think about it. + +=head1 EXTERNAL METHODS + +=head2 new() + +This takes a set of args. These args are described in the documentation for +L. + +=cut sub new { my $class = shift; my ($args) = @_; + $args->{storage} = DBM::Deep::File->new( $args ) + unless exists $args->{storage}; + my $self = bless { byte_size => 4, @@ -50,13 +163,17 @@ sub new { hash_size => 16, # In bytes hash_chars => 256, # Number of chars the algorithm uses per byte max_buckets => 16, - num_txns => 2, # HEAD plus 1 additional transaction for importing + num_txns => 1, # The HEAD trans_id => 0, # Default to the HEAD + data_sector_size => 64, # Size in bytes of each data sector + entries => {}, # This is the list of entries for transactions storage => undef, }, $class; + # Never allow byte_size to be set directly. + delete $args->{byte_size}; if ( defined $args->{pack_size} ) { if ( lc $args->{pack_size} eq 'small' ) { $args->{byte_size} = 2; @@ -78,15 +195,26 @@ sub new { $self->{$param} = $args->{$param}; } - ## - # Number of buckets per blist before another level of indexing is - # done. Increase this value for slightly greater speed, but larger database - # files. DO NOT decrease this value below 16, due to risk of recursive - # reindex overrun. - ## - if ( $self->{max_buckets} < 16 ) { - warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n"; - $self->{max_buckets} = 16; + my %validations = ( + max_buckets => { floor => 16, ceil => 256 }, + num_txns => { floor => 1, ceil => 255 }, + data_sector_size => { floor => 32, ceil => 256 }, + ); + + while ( my ($attr, $c) = each %validations ) { + if ( !defined $self->{$attr} + || !length $self->{$attr} + || $self->{$attr} =~ /\D/ + || $self->{$attr} < $c->{floor} + ) { + $self->{$attr} = '(undef)' if !defined $self->{$attr}; + warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n"; + $self->{$attr} = $c->{floor}; + } + elsif ( $self->{$attr} > $c->{ceil} ) { + warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n"; + $self->{$attr} = $c->{ceil}; + } } if ( !$self->{digest} ) { @@ -97,7 +225,12 @@ sub new { return $self; } -################################################################################ +=head2 read_value( $obj, $key ) + +This takes an object that provides _base_offset() and a string. It returns the +value stored in the corresponding Sector::Value's data section. + +=cut sub read_value { my $self = shift; @@ -134,6 +267,17 @@ sub read_value { return $value_sector->data; } +=head2 get_classname( $obj ) + +This takes an object that provides _base_offset() and returns the classname (if any) +associated with it. + +It delegates to Sector::Reference::get_classname() for the heavy lifting. + +It performs a staleness check. + +=cut + sub get_classname { my $self = shift; my ($obj) = @_; @@ -149,6 +293,74 @@ sub get_classname { return $sector->get_classname; } +=head2 make_reference( $obj, $old_key, $new_key ) + +This takes an object that provides _base_offset() and two strings. The +strings correspond to the old key and new key, respectively. This operation +is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>. + +This returns nothing. + +=cut + +sub make_reference { + my $self = shift; + my ($obj, $old_key, $new_key) = @_; + + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" ); + + if ( $sector->staleness != $obj->_staleness ) { + return; + } + + my $old_md5 = $self->_apply_digest( $old_key ); + + my $value_sector = $sector->get_data_for({ + key_md5 => $old_md5, + allow_head => 1, + }); + + unless ( $value_sector ) { + $value_sector = DBM::Deep::Engine::Sector::Null->new({ + engine => $self, + data => undef, + }); + + $sector->write_data({ + key_md5 => $old_md5, + key => $old_key, + value => $value_sector, + }); + } + + if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) { + $sector->write_data({ + key => $new_key, + key_md5 => $self->_apply_digest( $new_key ), + value => $value_sector, + }); + $value_sector->increment_refcount; + } + else { + $sector->write_data({ + key => $new_key, + key_md5 => $self->_apply_digest( $new_key ), + value => $value_sector->clone, + }); + } + + return; +} + +=head2 key_exists( $obj, $key ) + +This takes an object that provides _base_offset() and a string for +the key to be checked. This returns 1 for true and "" for false. + +=cut + sub key_exists { my $self = shift; my ($obj, $key) = @_; @@ -170,6 +382,14 @@ sub key_exists { return $data ? 1 : ''; } +=head2 delete_key( $obj, $key ) + +This takes an object that provides _base_offset() and a string for +the key to be deleted. This returns the result of the Sector::Reference +delete_key() method. + +=cut + sub delete_key { my $self = shift; my ($obj, $key) = @_; @@ -187,6 +407,15 @@ sub delete_key { }); } +=head2 write_value( $obj, $key, $value ) + +This takes an object that provides _base_offset(), a string for the +key, and a value. This value can be anything storable within L. + +This returns 1 upon success. + +=cut + sub write_value { my $self = shift; my ($obj, $key, $value) = @_; @@ -202,32 +431,70 @@ sub write_value { ); } + # This will be a Reference sector + my $sector = $self->_load_sector( $obj->_base_offset ) + or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); + + if ( $sector->staleness != $obj->_staleness ) { + DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); + } + my ($class, $type); if ( !defined $value ) { $class = 'DBM::Deep::Engine::Sector::Null'; } elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) { - if ( $r eq 'ARRAY' && tied(@$value) ) { - DBM::Deep->_throw_error( "Cannot store something that is tied." ); + my $tmpvar; + if ( $r eq 'ARRAY' ) { + $tmpvar = tied @$value; + } elsif ( $r eq 'HASH' ) { + $tmpvar = tied %$value; } - if ( $r eq 'HASH' && tied(%$value) ) { - DBM::Deep->_throw_error( "Cannot store something that is tied." ); + + if ( $tmpvar ) { + my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); }; + + unless ( $is_dbm_deep ) { + DBM::Deep->_throw_error( "Cannot store something that is tied." ); + } + + unless ( $tmpvar->_engine->storage == $self->storage ) { + DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." ); + } + + # First, verify if we're storing the same thing to this spot. If we are, then + # this should be a no-op. -EJS, 2008-05-19 + my $loc = $sector->get_data_location_for({ + key_md5 => $self->_apply_digest( $key ), + allow_head => 1, + }); + + if ( defined($loc) && $loc == $tmpvar->_base_offset ) { + return 1; + } + + #XXX Can this use $loc? + my $value_sector = $self->_load_sector( $tmpvar->_base_offset ); + $sector->write_data({ + key => $key, + key_md5 => $self->_apply_digest( $key ), + value => $value_sector, + }); + $value_sector->increment_refcount; + + return 1; } + $class = 'DBM::Deep::Engine::Sector::Reference'; $type = substr( $r, 0, 1 ); } else { + if ( tied($value) ) { + DBM::Deep->_throw_error( "Cannot store something that is tied." ); + } $class = 'DBM::Deep::Engine::Sector::Scalar'; } - # This will be a Reference sector - my $sector = $self->_load_sector( $obj->_base_offset ) - or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); - - if ( $sector->staleness != $obj->_staleness ) { - DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep.n" ); - } - # Create this after loading the reference sector in case something bad happens. # This way, we won't allocate value sector(s) needlessly. my $value_sector = $class->new({ @@ -274,6 +541,15 @@ sub write_value { return 1; } +=head2 get_next_key( $obj, $prev_key ) + +This takes an object that provides _base_offset() and an optional string +representing the prior key returned via a prior invocation of this method. + +This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>. + +=cut + # XXX Add staleness here sub get_next_key { my $self = shift; @@ -290,7 +566,15 @@ sub get_next_key { return $obj->{iterator}->get_next_key( $obj ); } -################################################################################ +=head2 setup_fh( $obj ) + +This takes an object that provides _base_offset(). It will do everything needed +in order to properly initialize all values for necessary functioning. If this is +called upon an already initialized object, this will also reset the inode. + +This returns 1. + +=cut sub setup_fh { my $self = shift; @@ -333,9 +617,23 @@ sub setup_fh { } } + $self->storage->set_inode; + return 1; } +=head2 begin_work( $obj ) + +This takes an object that provides _base_offset(). It will set up all necessary +bookkeeping in order to run all work within a transaction. + +If $obj is already within a transaction, an error wiill be thrown. If there are +no more available transactions, an error will be thrown. + +This returns undef. + +=cut + sub begin_work { my $self = shift; my ($obj) = @_; @@ -345,12 +643,18 @@ sub begin_work { } my @slots = $self->read_txn_slots; - for my $i ( 1 .. @slots ) { + my $found; + for my $i ( 0 .. $#slots ) { next if $slots[$i]; + $slots[$i] = 1; - $self->set_trans_id( $i ); + $self->set_trans_id( $i + 1 ); + $found = 1; last; } + unless ( $found ) { + DBM::Deep->_throw_error( "Cannot allocate transaction ID" ); + } $self->write_txn_slots( @slots ); if ( !$self->trans_id ) { @@ -360,6 +664,17 @@ sub begin_work { return; } +=head2 rollback( $obj ) + +This takes an object that provides _base_offset(). It will revert all +actions taken within the running transaction. + +If $obj is not within a transaction, an error will be thrown. + +This returns 1. + +=cut + sub rollback { my $self = shift; my ($obj) = @_; @@ -375,7 +690,8 @@ sub rollback { my $read_loc = $entry + $self->hash_size + $self->byte_size - + $self->trans_id * ( $self->byte_size + 4 ); + + $self->byte_size + + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size ); $data_loc = unpack( $StP{$self->byte_size}, $data_loc ); @@ -389,7 +705,7 @@ sub rollback { $self->clear_entries; my @slots = $self->read_txn_slots; - $slots[$self->trans_id] = 0; + $slots[$self->trans_id-1] = 0; $self->write_txn_slots( @slots ); $self->inc_txn_staleness_counter( $self->trans_id ); $self->set_trans_id( 0 ); @@ -397,6 +713,17 @@ sub rollback { return 1; } +=head2 commit( $obj ) + +This takes an object that provides _base_offset(). It will apply all +actions taken within the transaction to the HEAD. + +If $obj is not within a transaction, an error will be thrown. + +This returns 1. + +=cut + sub commit { my $self = shift; my ($obj) = @_; @@ -413,14 +740,16 @@ sub commit { my $head_loc = $self->storage->read_at( $base, $self->byte_size ); $head_loc = unpack( $StP{$self->byte_size}, $head_loc ); + + my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); my $trans_loc = $self->storage->read_at( - $base + $self->trans_id * ( $self->byte_size + 4 ), $self->byte_size, + $spot, $self->byte_size, ); $self->storage->print_at( $base, $trans_loc ); $self->storage->print_at( - $base + $self->trans_id * ( $self->byte_size + 4 ), - pack( $StP{$self->byte_size} . ' N', (0) x 2 ), + $spot, + pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), ); if ( $head_loc > 1 ) { @@ -431,7 +760,7 @@ sub commit { $self->clear_entries; my @slots = $self->read_txn_slots; - $slots[$self->trans_id] = 0; + $slots[$self->trans_id-1] = 0; $self->write_txn_slots( @slots ); $self->inc_txn_staleness_counter( $self->trans_id ); $self->set_trans_id( 0 ); @@ -439,28 +768,123 @@ sub commit { return 1; } +=head2 lock_exclusive() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has taken precautions to be safe for a write. + +This returns nothing. + +=cut + +sub lock_exclusive { + my $self = shift; + my ($obj) = @_; + return $self->storage->lock_exclusive( $obj ); +} + +=head2 lock_shared() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has taken precautions to be safe for a read. + +This returns nothing. + +=cut + +sub lock_shared { + my $self = shift; + my ($obj) = @_; + return $self->storage->lock_shared( $obj ); +} + +=head2 unlock() + +This takes an object that provides _base_offset(). It will guarantee that +the storage has released all locks taken. + +This returns nothing. + +=cut + +sub unlock { + my $self = shift; + my ($obj) = @_; + + my $rv = $self->storage->unlock( $obj ); + + $self->flush if $rv; + + return $rv; +} + +=head1 INTERNAL METHODS + +The following methods are internal-use-only to DBM::Deep::Engine. + +=cut + +=head2 read_txn_slots() + +This takes no arguments. + +This will return an array with a 1 or 0 in each slot. Each spot represents one +available transaction. If the slot is 1, that transaction is taken. If it is 0, +the transaction is available. + +=cut + sub read_txn_slots { my $self = shift; - return split '', unpack( 'b32', + my $bl = $self->txn_bitfield_len; + my $num_bits = $bl * 8; + return split '', unpack( 'b'.$num_bits, $self->storage->read_at( - $self->trans_loc, 4, + $self->trans_loc, $bl, ) ); } +=head2 write_txn_slots( @slots ) + +This takes an array of 1's and 0's. This array represents the transaction slots +returned by L. In other words, the following is true: + + @x = read_txn_slots( write_txn_slots( @x ) ); + +(With the obviously missing object referents added back in.) + +=cut + sub write_txn_slots { my $self = shift; + my $num_bits = $self->txn_bitfield_len * 8; $self->storage->print_at( $self->trans_loc, - pack( 'b32', join('', @_) ), + pack( 'b'.$num_bits, join('', @_) ), ); } +=head2 get_running_txn_ids() + +This takes no arguments. + +This will return an array of taken transaction IDs. This wraps L. + +=cut + sub get_running_txn_ids { my $self = shift; my @transactions = $self->read_txn_slots; - my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions; + my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions; } +=head2 get_txn_staleness_counter( $trans_id ) + +This will return the staleness counter for the given transaction ID. Please see +L for more information. + +=cut + sub get_txn_staleness_counter { my $self = shift; my ($trans_id) = @_; @@ -468,33 +892,59 @@ sub get_txn_staleness_counter { # Hardcode staleness of 0 for the HEAD return 0 unless $trans_id; - my $x = unpack( 'N', + return unpack( $StP{$STALE_SIZE}, $self->storage->read_at( - $self->trans_loc + 4 * $trans_id, - 4, + $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), + $STALE_SIZE, ) ); - return $x; } +=head2 inc_txn_staleness_counter( $trans_id ) + +This will increment the staleness counter for the given transaction ID. Please see +L for more information. + +=cut + sub inc_txn_staleness_counter { my $self = shift; my ($trans_id) = @_; # Hardcode staleness of 0 for the HEAD - return unless $trans_id; + return 0 unless $trans_id; $self->storage->print_at( - $self->trans_loc + 4 * $trans_id, - pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ), + $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), + pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ), ); } +=head2 get_entries() + +This takes no arguments. + +This returns a list of all the sectors that have been modified by this transaction. + +=cut + sub get_entries { my $self = shift; return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ]; } +=head2 add_entry( $trans_id, $location ) + +This takes a transaction ID and a file location and marks the sector at that location +as having been modified by the transaction identified by $trans_id. + +This returns nothing. + +B: Unlike all the other _entries() methods, there are several cases where +C<< $trans_id != $self->trans_id >> for this method. + +=cut + sub add_entry { my $self = shift; my ($trans_id, $loc) = @_; @@ -503,59 +953,92 @@ sub add_entry { $self->{entries}{$trans_id}{$loc} = undef; } -# If the buckets are being relocated because of a reindexing, the entries -# mechanism needs to be made aware of it. +=head2 reindex_entry( $old_loc, $new_loc ) + +This takes two locations (old and new, respectively). If a location that has been +modified by this transaction is subsequently reindexed due to a bucketlist +overflowing, then the entries hash needs to be made aware of this change. + +This returns nothing. + +=cut + sub reindex_entry { my $self = shift; my ($old_loc, $new_loc) = @_; TRANS: while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) { - foreach my $orig_loc ( keys %{ $locs } ) { - if ( $orig_loc == $old_loc ) { - delete $locs->{orig_loc}; - $locs->{$new_loc} = undef; - next TRANS; - } + if ( exists $locs->{$old_loc} ) { + delete $locs->{$old_loc}; + $locs->{$new_loc} = undef; + next TRANS; } } } +=head2 clear_entries() + +This takes no arguments. It will clear the entries list for the running transaction. + +This returns nothing. + +=cut + sub clear_entries { my $self = shift; delete $self->{entries}{$self->trans_id}; } -################################################################################ +=head2 _write_file_header() + +This writes the file header for a new file. This will write the various settings +that set how the file is interpreted. + +=head2 _read_file_header() + +This reads the file header from an existing file. This will read the various +settings that set how the file is interpreted. + +=cut { my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4; + my $this_file_version = 3; sub _write_file_header { my $self = shift; - my $header_var = 1 + 1 + 1 + 4 + 4 * $self->num_txns + 3 * $self->byte_size; + my $nt = $self->num_txns; + my $bl = $self->txn_bitfield_len; + + my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size; my $loc = $self->storage->request_space( $header_fixed + $header_var ); $self->storage->print_at( $loc, SIG_FILE, SIG_HEADER, - pack('N', 1), # header version - at this point, we're at 9 bytes - pack('N', $header_var), # header size + pack('N', $this_file_version), # At this point, we're at 9 bytes + pack('N', $header_var), # header size # --- Above is $header_fixed. Below is $header_var pack('C', $self->byte_size), - pack('C', $self->max_buckets), - pack('C', $self->num_txns), - pack('N', 0 ), # Transaction activeness bitfield - pack('N' . $self->num_txns, 0 x $self->num_txns ), # Transaction staleness counters + + # These shenanigans are to allow a 256 within a C + pack('C', $self->max_buckets - 1), + pack('C', $self->data_sector_size - 1), + + pack('C', $nt), + pack('C' . $bl, 0 ), # Transaction activeness bitfield + pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters pack($StP{$self->byte_size}, 0), # Start of free chain (blist size) pack($StP{$self->byte_size}, 0), # Start of free chain (data size) pack($StP{$self->byte_size}, 0), # Start of free chain (index size) ); - $self->set_trans_loc( $header_fixed + 3 ); - $self->set_chains_loc( $header_fixed + 3 + 4 + 4 * $self->num_txns ); + #XXX Set these less fragilely + $self->set_trans_loc( $header_fixed + 4 ); + $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) ); return; } @@ -566,7 +1049,7 @@ sub clear_entries { my $buffer = $self->storage->read_at( 0, $header_fixed ); return unless length($buffer); - my ($file_signature, $sig_header, $header_version, $size) = unpack( + my ($file_signature, $sig_header, $file_version, $size) = unpack( 'A4 A N N', $buffer ); @@ -577,39 +1060,60 @@ sub clear_entries { unless ( $sig_header eq SIG_HEADER ) { $self->storage->close; - DBM::Deep->_throw_error( "Old file version found." ); + DBM::Deep->_throw_error( "Pre-1.00 file version found" ); + } + + unless ( $file_version == $this_file_version ) { + $self->storage->close; + DBM::Deep->_throw_error( + "Wrong file version found - " . $file_version . + " - expected " . $this_file_version + ); } my $buffer2 = $self->storage->read_at( undef, $size ); - my @values = unpack( 'C C C', $buffer2 ); + my @values = unpack( 'C C C C', $buffer2 ); - if ( @values != 3 || grep { !defined } @values ) { + if ( @values != 4 || grep { !defined } @values ) { $self->storage->close; DBM::Deep->_throw_error("Corrupted file - bad header"); } - $self->set_trans_loc( $header_fixed + scalar(@values) ); - $self->set_chains_loc( $header_fixed + scalar(@values) + 4 + 4 * $self->num_txns ); - #XXX Add warnings if values weren't set right - @{$self}{qw(byte_size max_buckets num_txns)} = @values; + @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values; - my $header_var = scalar(@values) + 4 + 4 * $self->num_txns + 3 * $self->byte_size; + # These shenangians are to allow a 256 within a C + $self->{max_buckets} += 1; + $self->{data_sector_size} += 1; + + my $bl = $self->txn_bitfield_len; + + my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size; unless ( $size == $header_var ) { $self->storage->close; DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." ); } + $self->set_trans_loc( $header_fixed + scalar(@values) ); + $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) ); + return length($buffer) + length($buffer2); } } +=head2 _load_sector( $offset ) + +This will instantiate and return the sector object that represents the data found +at $offset. + +=cut + sub _load_sector { my $self = shift; my ($offset) = @_; # Add a catch for offset of 0 or 1 - return if $offset <= 1; + return if !$offset || $offset <= 1; my $type = $self->storage->read_at( $offset, 1 ); return if $type eq chr(0); @@ -658,15 +1162,45 @@ sub _load_sector { DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" ); } +=head2 _apply_digest( @stuff ) + +This will apply the digest methd (default to Digest::MD5::md5) to the arguments +passed in and return the result. + +=cut + sub _apply_digest { my $self = shift; return $self->{digest}->(@_); } -sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) } -sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) } +=head2 _add_free_blist_sector( $offset, $size ) + +=head2 _add_free_data_sector( $offset, $size ) + +=head2 _add_free_index_sector( $offset, $size ) + +These methods are all wrappers around _add_free_sector(), providing the proper +chain offset ($multiple) for the sector type. + +=cut + +sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) } +sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) } sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) } +=head2 _add_free_sector( $multiple, $offset, $size ) + +_add_free_sector() takes the offset into the chains location, the offset of the +sector, and the size of that sector. It will mark the sector as a free sector +and put it into the list of sectors that are free of this type for use later. + +This returns nothing. + +B: $size is unused? + +=cut + sub _add_free_sector { my $self = shift; my ($multiple, $offset, $size) = @_; @@ -677,9 +1211,9 @@ sub _add_free_sector { # Increment staleness. # XXX Can this increment+modulo be done by "&= 0x1" ? - my $staleness = unpack( $StP{STALE_SIZE()}, $storage->read_at( $offset + SIG_SIZE, STALE_SIZE ) ); - $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * STALE_SIZE ) ); - $storage->print_at( $offset + SIG_SIZE, pack( $StP{STALE_SIZE()}, $staleness ) ); + my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) ); + $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) ); + $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) ); my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size ); @@ -688,13 +1222,34 @@ sub _add_free_sector { ); # Record the old head in the new sector after the signature and staleness counter - $storage->print_at( $offset + SIG_SIZE + STALE_SIZE, $old_head ); + $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head ); } +=head2 _request_blist_sector( $size ) + +=head2 _request_data_sector( $size ) + +=head2 _request_index_sector( $size ) + +These methods are all wrappers around _request_sector(), providing the proper +chain offset ($multiple) for the sector type. + +=cut + sub _request_blist_sector { shift->_request_sector( 0, @_ ) } sub _request_data_sector { shift->_request_sector( 1, @_ ) } sub _request_index_sector { shift->_request_sector( 2, @_ ) } +=head2 _request_sector( $multiple $size ) + +This takes the offset into the chains location and the size of that sector. + +This returns the object with the sector. If there is an available free sector of +that type, then it will be reused. If there isn't one, then a new one will be +allocated. + +=cut + sub _request_sector { my $self = shift; my ($multiple, $size) = @_; @@ -716,1145 +1271,220 @@ sub _request_sector { } # Read the new head after the signature and the staleness counter - my $new_head = $self->storage->read_at( $loc + SIG_SIZE + STALE_SIZE, $self->byte_size ); + my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size ); $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head ); $self->storage->print_at( - $loc + SIG_SIZE + STALE_SIZE, + $loc + SIG_SIZE + $STALE_SIZE, pack( $StP{$self->byte_size}, 0 ), ); return $loc; } -################################################################################ - -sub storage { $_[0]{storage} } -sub byte_size { $_[0]{byte_size} } -sub hash_size { $_[0]{hash_size} } -sub hash_chars { $_[0]{hash_chars} } -sub num_txns { $_[0]{num_txns} } -sub max_buckets { $_[0]{max_buckets} } -sub blank_md5 { chr(0) x $_[0]->hash_size } - -sub trans_id { $_[0]{trans_id} } -sub set_trans_id { $_[0]{trans_id} = $_[1] } - -sub trans_loc { $_[0]{trans_loc} } -sub set_trans_loc { $_[0]{trans_loc} = $_[1] } - -sub chains_loc { $_[0]{chains_loc} } -sub set_chains_loc { $_[0]{chains_loc} = $_[1] } - -################################################################################ - -package DBM::Deep::Iterator; - -sub new { - my $class = shift; - my ($args) = @_; - - my $self = bless { - breadcrumbs => [], - engine => $args->{engine}, - base_offset => $args->{base_offset}, - }, $class; - - Scalar::Util::weaken( $self->{engine} ); - - return $self; -} - -sub reset { $_[0]{breadcrumbs} = [] } - -sub get_sector_iterator { - my $self = shift; - my ($loc) = @_; - - my $sector = $self->{engine}->_load_sector( $loc ) - or return; - - if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) { - return DBM::Deep::Iterator::Index->new({ - iterator => $self, - sector => $sector, - }); - } - elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) { - return DBM::Deep::Iterator::BucketList->new({ - iterator => $self, - sector => $sector, - }); - } - - DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" ); -} - -sub get_next_key { - my $self = shift; - my ($obj) = @_; - - my $crumbs = $self->{breadcrumbs}; - my $e = $self->{engine}; - - unless ( @$crumbs ) { - # This will be a Reference sector - my $sector = $e->_load_sector( $self->{base_offset} ) - # If no sector is found, thist must have been deleted from under us. - or return; - - if ( $sector->staleness != $obj->_staleness ) { - return; - } - - my $loc = $sector->get_blist_loc - or return; - - push @$crumbs, $self->get_sector_iterator( $loc ); - } - - FIND_NEXT_KEY: { - # We're at the end. - unless ( @$crumbs ) { - $self->reset; - return; - } - - my $iterator = $crumbs->[-1]; - - # This level is done. - if ( $iterator->at_end ) { - pop @$crumbs; - redo FIND_NEXT_KEY; - } - - if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) { - # If we don't have any more, it will be caught at the - # prior check. - if ( my $next = $iterator->get_next_iterator ) { - push @$crumbs, $next; - } - redo FIND_NEXT_KEY; - } - - unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) { - DBM::Deep->_throw_error( - "Should have a bucketlist iterator here - instead have $iterator" - ); - } - - # At this point, we have a BucketList iterator - my $key = $iterator->get_next_key; - if ( defined $key ) { - return $key; - } - #XXX else { $iterator->set_to_end() } ? - - # We hit the end of the bucketlist iterator, so redo - redo FIND_NEXT_KEY; - } - - DBM::Deep->_throw_error( "get_next_key(): How did we get here?" ); -} - -package DBM::Deep::Iterator::Index; - -sub new { - my $self = bless $_[1] => $_[0]; - $self->{curr_index} = 0; - return $self; -} - -sub at_end { - my $self = shift; - return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars; -} - -sub get_next_iterator { - my $self = shift; - - my $loc; - while ( !$loc ) { - return if $self->at_end; - $loc = $self->{sector}->get_entry( $self->{curr_index}++ ); - } - - return $self->{iterator}->get_sector_iterator( $loc ); -} +=head2 flush() -package DBM::Deep::Iterator::BucketList; +This takes no arguments. It will do everything necessary to flush all things to +disk. This is usually called during unlock() and setup_fh(). -sub new { - my $self = bless $_[1] => $_[0]; - $self->{curr_index} = 0; - return $self; -} +This returns nothing. -sub at_end { - my $self = shift; - return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets; -} +=cut -sub get_next_key { +sub flush { my $self = shift; - return if $self->at_end; - - my $idx = $self->{curr_index}++; - - my $data_loc = $self->{sector}->get_data_location_for({ - allow_head => 1, - idx => $idx, - }) or return; - - #XXX Do we want to add corruption checks here? - return $self->{sector}->get_key_for( $idx )->data; -} - -package DBM::Deep::Engine::Sector; - -sub new { - my $self = bless $_[1], $_[0]; - Scalar::Util::weaken( $self->{engine} ); - $self->_init; - return $self; -} - -#sub _init {} -#sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); } - -sub engine { $_[0]{engine} } -sub offset { $_[0]{offset} } -sub type { $_[0]{type} } - -sub base_size { - my $self = shift; - return $self->engine->SIG_SIZE + $self->engine->STALE_SIZE; + # Why do we need to have the storage flush? Shouldn't autoflush take care of things? + # -RobK, 2008-06-26 + $self->storage->flush; } -sub free { - my $self = shift; +=head2 ACCESSORS - my $e = $self->engine; +The following are readonly attributes. - $e->storage->print_at( $self->offset, $e->SIG_FREE ); - # Skip staleness counter - $e->storage->print_at( $self->offset + $self->base_size, - chr(0) x ($self->size - $self->base_size), - ); +=over 4 - my $free_meth = $self->free_meth; - $e->$free_meth( $self->offset, $self->size ); +=item * storage - return; -} +=item * byte_size -package DBM::Deep::Engine::Sector::Data; +=item * hash_size -our @ISA = qw( DBM::Deep::Engine::Sector ); +=item * hash_chars -# This is in bytes -sub size { return 256 } -sub free_meth { return '_add_free_data_sector' } +=item * num_txns -sub clone { - my $self = shift; - return ref($self)->new({ - engine => $self->engine, - data => $self->data, - type => $self->type, - }); -} +=item * max_buckets -package DBM::Deep::Engine::Sector::Scalar; +=item * blank_md5 -our @ISA = qw( DBM::Deep::Engine::Sector::Data ); +=item * data_sector_size -sub free { - my $self = shift; - - my $chain_loc = $self->chain_loc; +=item * txn_bitfield_len - $self->SUPER::free(); +=back - if ( $chain_loc ) { - $self->engine->_load_sector( $chain_loc )->free; - } +=cut - return; -} +sub storage { $_[0]{storage} } +sub byte_size { $_[0]{byte_size} } +sub hash_size { $_[0]{hash_size} } +sub hash_chars { $_[0]{hash_chars} } +sub num_txns { $_[0]{num_txns} } +sub max_buckets { $_[0]{max_buckets} } +sub blank_md5 { chr(0) x $_[0]->hash_size } +sub data_sector_size { $_[0]{data_sector_size} } -sub type { $_[0]{engine}->SIG_DATA } -sub _init { +# This is a calculated value +sub txn_bitfield_len { my $self = shift; - - my $engine = $self->engine; - - unless ( $self->offset ) { - my $data_section = $self->size - $self->base_size - 1 * $engine->byte_size - 1; - - $self->{offset} = $engine->_request_data_sector( $self->size ); - - my $data = delete $self->{data}; - my $dlen = length $data; - my $continue = 1; - my $curr_offset = $self->offset; - while ( $continue ) { - - my $next_offset = 0; - - my ($leftover, $this_len, $chunk); - if ( $dlen > $data_section ) { - $leftover = 0; - $this_len = $data_section; - $chunk = substr( $data, 0, $this_len ); - - $dlen -= $data_section; - $next_offset = $engine->_request_data_sector( $self->size ); - $data = substr( $data, $this_len ); - } - else { - $leftover = $data_section - $dlen; - $this_len = $dlen; - $chunk = $data; - - $continue = 0; - } - - $engine->storage->print_at( $curr_offset, $self->type ); # Sector type - # Skip staleness - $engine->storage->print_at( $curr_offset + $self->base_size, - pack( $StP{$engine->byte_size}, $next_offset ), # Chain loc - pack( $StP{1}, $this_len ), # Data length - $chunk, # Data to be stored in this sector - chr(0) x $leftover, # Zero-fill the rest - ); - - $curr_offset = $next_offset; + unless ( exists $self->{txn_bitfield_len} ) { + my $temp = ($self->num_txns) / 8; + if ( $temp > int( $temp ) ) { + $temp = int( $temp ) + 1; } - - return; + $self->{txn_bitfield_len} = $temp; } + return $self->{txn_bitfield_len}; } -sub data_length { - my $self = shift; - - my $buffer = $self->engine->storage->read_at( - $self->offset + $self->base_size + $self->engine->byte_size, 1 - ); - - return unpack( $StP{1}, $buffer ); -} - -sub chain_loc { - my $self = shift; - return unpack( - $StP{$self->engine->byte_size}, - $self->engine->storage->read_at( - $self->offset + $self->base_size, - $self->engine->byte_size, - ), - ); -} - -sub data { - my $self = shift; +=pod - my $data; - while ( 1 ) { - my $chain_loc = $self->chain_loc; +The following are read/write attributes. - $data .= $self->engine->storage->read_at( - $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length, - ); +=over 4 - last unless $chain_loc; +=item * trans_id / set_trans_id( $new_id ) - $self = $self->engine->_load_sector( $chain_loc ); - } +=item * trans_loc / set_trans_loc( $new_loc ) - return $data; -} +=item * chains_loc / set_chains_loc( $new_loc ) -package DBM::Deep::Engine::Sector::Null; +=back -our @ISA = qw( DBM::Deep::Engine::Sector::Data ); +=cut -sub type { $_[0]{engine}->SIG_NULL } -sub data_length { 0 } -sub data { return } - -sub _init { - my $self = shift; +sub trans_id { $_[0]{trans_id} } +sub set_trans_id { $_[0]{trans_id} = $_[1] } - my $engine = $self->engine; +sub trans_loc { $_[0]{trans_loc} } +sub set_trans_loc { $_[0]{trans_loc} = $_[1] } - unless ( $self->offset ) { - my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1; +sub chains_loc { $_[0]{chains_loc} } +sub set_chains_loc { $_[0]{chains_loc} = $_[1] } - $self->{offset} = $engine->_request_data_sector( $self->size ); - $engine->storage->print_at( $self->offset, $self->type ); # Sector type - # Skip staleness counter - $engine->storage->print_at( $self->offset + $self->base_size, - pack( $StP{$engine->byte_size}, 0 ), # Chain loc - pack( $StP{1}, $self->data_length ), # Data length - chr(0) x $leftover, # Zero-fill the rest - ); +sub cache { $_[0]{cache} ||= {} } +sub clear_cache { %{$_[0]->cache} = () } - return; - } -} +=head2 _dump_file() -package DBM::Deep::Engine::Sector::Reference; +This method takes no arguments. It's used to print out a textual representation of the DBM::Deep +DB file. It assumes the file is not-corrupted. -our @ISA = qw( DBM::Deep::Engine::Sector::Data ); +=cut -sub _init { +sub _dump_file { my $self = shift; - my $e = $self->engine; - - unless ( $self->offset ) { - my $classname = Scalar::Util::blessed( delete $self->{data} ); - my $leftover = $self->size - $self->base_size - 2 * $e->byte_size; + # Read the header + my $spot = $self->_read_file_header(); - my $class_offset = 0; - if ( defined $classname ) { - my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({ - engine => $e, - data => $classname, - }); - $class_offset = $class_sector->offset; - } - - $self->{offset} = $e->_request_data_sector( $self->size ); - $e->storage->print_at( $self->offset, $self->type ); # Sector type - # Skip staleness counter - $e->storage->print_at( $self->offset + $self->base_size, - pack( $StP{$e->byte_size}, 0 ), # Index/BList loc - pack( $StP{$e->byte_size}, $class_offset ), # Classname loc - chr(0) x $leftover, # Zero-fill the rest - ); - } - else { - $self->{type} = $e->storage->read_at( $self->offset, 1 ); - } - - $self->{staleness} = unpack( - $StP{$e->STALE_SIZE}, - $e->storage->read_at( $self->offset + $e->SIG_SIZE, $e->STALE_SIZE ), + my %types = ( + 0 => 'B', + 1 => 'D', + 2 => 'I', ); - return; -} - -sub free { - my $self = shift; - - my $blist_loc = $self->get_blist_loc; - $self->engine->_load_sector( $blist_loc )->free if $blist_loc; - - my $class_loc = $self->get_class_offset; - $self->engine->_load_sector( $class_loc )->free if $class_loc; - - $self->SUPER::free(); -} - -sub staleness { $_[0]{staleness} } - -sub get_data_for { - my $self = shift; - my ($args) = @_; - - # Assume that the head is not allowed unless otherwise specified. - $args->{allow_head} = 0 unless exists $args->{allow_head}; - - # Assume we don't create a new blist location unless otherwise specified. - $args->{create} = 0 unless exists $args->{create}; + my %sizes = ( + 'D' => $self->data_sector_size, + 'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size, + 'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size, + ); - my $blist = $self->get_bucket_list({ - key_md5 => $args->{key_md5}, - key => $args->{key}, - create => $args->{create}, - }); - return unless $blist && $blist->{found}; + my $return = ""; - # At this point, $blist knows where the md5 is. What it -doesn't- know yet - # is whether or not this transaction has this key. That's part of the next - # function call. - my $location = $blist->get_data_location_for({ - allow_head => $args->{allow_head}, - }) or return; + # Header values + $return .= "NumTxns: " . $self->num_txns . $/; - return $self->engine->_load_sector( $location ); -} + # Read the free sector chains + my %sectors; + foreach my $multiple ( 0 .. 2 ) { + $return .= "Chains($types{$multiple}):"; + my $old_loc = $self->chains_loc + $multiple * $self->byte_size; + while ( 1 ) { + my $loc = unpack( + $StP{$self->byte_size}, + $self->storage->read_at( $old_loc, $self->byte_size ), + ); -sub write_data { - my $self = shift; - my ($args) = @_; + # We're now out of free sectors of this kind. + unless ( $loc ) { + last; + } - my $blist = $self->get_bucket_list({ - key_md5 => $args->{key_md5}, - key => $args->{key}, - create => 1, - }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" ); - - # Handle any transactional bookkeeping. - if ( $self->engine->trans_id ) { - if ( ! $blist->has_md5 ) { - $blist->mark_deleted({ - trans_id => 0, - }); + $sectors{ $types{$multiple} }{ $loc } = undef; + $old_loc = $loc + SIG_SIZE + $STALE_SIZE; + $return .= " $loc"; } - } - else { - my @trans_ids = $self->engine->get_running_txn_ids; - if ( $blist->has_md5 ) { - if ( @trans_ids ) { - my $old_value = $blist->get_data_for; - foreach my $other_trans_id ( @trans_ids ) { - next if $blist->get_data_location_for({ - trans_id => $other_trans_id, - allow_head => 0, - }); - $blist->write_md5({ - trans_id => $other_trans_id, - key => $args->{key}, - key_md5 => $args->{key_md5}, - value => $old_value->clone, - }); + $return .= $/; + } + + SECTOR: + while ( $spot < $self->storage->{end} ) { + # Read each sector in order. + my $sector = $self->_load_sector( $spot ); + if ( !$sector ) { + # Find it in the free-sectors that were found already + foreach my $type ( keys %sectors ) { + if ( exists $sectors{$type}{$spot} ) { + my $size = $sizes{$type}; + $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size; + $spot += $size; + next SECTOR; } } + + die "********\n$return\nDidn't find free sector for $spot in chains\n********\n"; } else { - if ( @trans_ids ) { - foreach my $other_trans_id ( @trans_ids ) { - #XXX This doesn't seem to possible to ever happen . . . - next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 }); - $blist->mark_deleted({ - trans_id => $other_trans_id, - }); - } + $return .= sprintf "%08d: %s %04d", $spot, $sector->type, $sector->size; + if ( $sector->type eq 'D' ) { + $return .= ' ' . $sector->data; } - } - } - - #XXX Is this safe to do transactionally? - # Free the place we're about to write to. - if ( $blist->get_data_location_for({ allow_head => 0 }) ) { - $blist->get_data_for({ allow_head => 0 })->free; - } - - $blist->write_md5({ - key => $args->{key}, - key_md5 => $args->{key_md5}, - value => $args->{value}, - }); -} - -sub delete_key { - my $self = shift; - my ($args) = @_; - - # XXX What should happen if this fails? - my $blist = $self->get_bucket_list({ - key_md5 => $args->{key_md5}, - }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" ); - - # Save the location so that we can free the data - my $location = $blist->get_data_location_for({ - allow_head => 0, - }); - my $old_value = $location && $self->engine->_load_sector( $location ); - - my @trans_ids = $self->engine->get_running_txn_ids; - - if ( $self->engine->trans_id == 0 ) { - if ( @trans_ids ) { - foreach my $other_trans_id ( @trans_ids ) { - next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 }); - $blist->write_md5({ - trans_id => $other_trans_id, - key => $args->{key}, - key_md5 => $args->{key_md5}, - value => $old_value->clone, - }); + elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) { + $return .= ' REF: ' . $sector->get_refcount; } - } - } - - my $data; - if ( @trans_ids ) { - $blist->mark_deleted( $args ); - - if ( $old_value ) { - $data = $old_value->data; - $old_value->free; - } - } - else { - $data = $blist->delete_md5( $args ); - } - - return $data; -} - -sub get_blist_loc { - my $self = shift; - - my $e = $self->engine; - my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size ); - return unpack( $StP{$e->byte_size}, $blist_loc ); -} - -sub get_bucket_list { - my $self = shift; - my ($args) = @_; - $args ||= {}; - - # XXX Add in check here for recycling? - - my $engine = $self->engine; - - my $blist_loc = $self->get_blist_loc; - - # There's no index or blist yet - unless ( $blist_loc ) { - return unless $args->{create}; - - my $blist = DBM::Deep::Engine::Sector::BucketList->new({ - engine => $engine, - key_md5 => $args->{key_md5}, - }); - - $engine->storage->print_at( $self->offset + $self->base_size, - pack( $StP{$engine->byte_size}, $blist->offset ), - ); - - return $blist; - } - - my $sector = $engine->_load_sector( $blist_loc ) - or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" ); - my $i = 0; - my $last_sector = undef; - while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) { - $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) ); - $last_sector = $sector; - if ( $blist_loc ) { - $sector = $engine->_load_sector( $blist_loc ) - or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" ); - } - else { - $sector = undef; - last; - } - } - - # This means we went through the Index sector(s) and found an empty slot - unless ( $sector ) { - return unless $args->{create}; - - DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" ) - unless $last_sector; - - my $blist = DBM::Deep::Engine::Sector::BucketList->new({ - engine => $engine, - key_md5 => $args->{key_md5}, - }); - - $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset ); - - return $blist; - } - - $sector->find_md5( $args->{key_md5} ); - - # See whether or not we need to reindex the bucketlist - if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) { - my $new_index = DBM::Deep::Engine::Sector::Index->new({ - engine => $engine, - }); - - my %blist_cache; - #XXX q.v. the comments for this function. - foreach my $entry ( $sector->chopped_up ) { - my ($spot, $md5) = @{$entry}; - my $idx = ord( substr( $md5, $i, 1 ) ); - - # XXX This is inefficient - my $blist = $blist_cache{$idx} - ||= DBM::Deep::Engine::Sector::BucketList->new({ - engine => $engine, - }); - - $new_index->set_entry( $idx => $blist->offset ); - - my $new_spot = $blist->write_at_next_open( $md5 ); - $engine->reindex_entry( $spot => $new_spot ); - } - - # Handle the new item separately. - { - my $idx = ord( substr( $args->{key_md5}, $i, 1 ) ); - my $blist = $blist_cache{$idx} - ||= DBM::Deep::Engine::Sector::BucketList->new({ - engine => $engine, - }); - - $new_index->set_entry( $idx => $blist->offset ); - - #XXX THIS IS HACKY! - $blist->find_md5( $args->{key_md5} ); - $blist->write_md5({ - key => $args->{key}, - key_md5 => $args->{key_md5}, - value => DBM::Deep::Engine::Sector::Null->new({ - engine => $engine, - data => undef, - }), - }); - } - - if ( $last_sector ) { - $last_sector->set_entry( - ord( substr( $args->{key_md5}, $i - 1, 1 ) ), - $new_index->offset, - ); - } else { - $engine->storage->print_at( $self->offset + $self->base_size, - pack( $StP{$engine->byte_size}, $new_index->offset ), - ); - } - - $sector->free; - - $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) }; - $sector->find_md5( $args->{key_md5} ); - } - - return $sector; -} - -sub get_class_offset { - my $self = shift; - - my $e = $self->engine; - return unpack( - $StP{$e->byte_size}, - $e->storage->read_at( - $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size, - ), - ); -} - -sub get_classname { - my $self = shift; - - my $class_offset = $self->get_class_offset; - - return unless $class_offset; - - return $self->engine->_load_sector( $class_offset )->data; -} - -#XXX Add singleton handling here -sub data { - my $self = shift; - - my $new_obj = DBM::Deep->new({ - type => $self->type, - base_offset => $self->offset, - staleness => $self->staleness, - storage => $self->engine->storage, - engine => $self->engine, - }); - - if ( $self->engine->storage->{autobless} ) { - my $classname = $self->get_classname; - if ( defined $classname ) { - bless $new_obj, $classname; - } - } - - return $new_obj; -} - -package DBM::Deep::Engine::Sector::BucketList; - -our @ISA = qw( DBM::Deep::Engine::Sector ); - -sub _init { - my $self = shift; - - my $engine = $self->engine; - - unless ( $self->offset ) { - my $leftover = $self->size - $self->base_size; - - $self->{offset} = $engine->_request_blist_sector( $self->size ); - $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type - # Skip staleness counter - $engine->storage->print_at( $self->offset + $self->base_size, - chr(0) x $leftover, # Zero-fill the data - ); - } - - if ( $self->{key_md5} ) { - $self->find_md5; - } - - return $self; -} - -sub size { - my $self = shift; - unless ( $self->{size} ) { - my $e = $self->engine; - # Base + numbuckets * bucketsize - $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size; - } - return $self->{size}; -} - -sub free_meth { return '_add_free_blist_sector' } - -sub bucket_size { - my $self = shift; - unless ( $self->{bucket_size} ) { - my $e = $self->engine; - # Key + head (location) + transactions (location + staleness-counter) - my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 ); - $self->{bucket_size} = $e->hash_size + $location_size; - } - return $self->{bucket_size}; -} - -# XXX This is such a poor hack. I need to rethink this code. -sub chopped_up { - my $self = shift; - - my $e = $self->engine; - - my @buckets; - foreach my $idx ( 0 .. $e->max_buckets - 1 ) { - my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size; - my $md5 = $e->storage->read_at( $spot, $e->hash_size ); - - #XXX If we're chopping, why would we ever have the blank_md5? - last if $md5 eq $e->blank_md5; - - my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size ); - push @buckets, [ $spot, $md5 . $rest ]; - } - - return @buckets; -} - -sub write_at_next_open { - my $self = shift; - my ($entry) = @_; - - #XXX This is such a hack! - $self->{_next_open} = 0 unless exists $self->{_next_open}; - - my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size; - $self->engine->storage->print_at( $spot, $entry ); - - return $spot; -} - -sub has_md5 { - my $self = shift; - unless ( exists $self->{found} ) { - $self->find_md5; - } - return $self->{found}; -} - -sub find_md5 { - my $self = shift; - - $self->{found} = undef; - $self->{idx} = -1; - - if ( @_ ) { - $self->{key_md5} = shift; - } - - # If we don't have an MD5, then what are we supposed to do? - unless ( exists $self->{key_md5} ) { - DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" ); - } - - my $e = $self->engine; - foreach my $idx ( 0 .. $e->max_buckets - 1 ) { - my $potential = $e->storage->read_at( - $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size, - ); - - if ( $potential eq $e->blank_md5 ) { - $self->{idx} = $idx; - return; - } + elsif ( $sector->type eq 'B' ) { + foreach my $bucket ( $sector->chopped_up ) { + $return .= "\n "; + $return .= sprintf "%08d", unpack($StP{$self->byte_size}, + substr( $bucket->[-1], $self->hash_size, $self->byte_size), + ); + my $l = unpack( $StP{$self->byte_size}, + substr( $bucket->[-1], + $self->hash_size + $self->byte_size, + $self->byte_size, + ), + ); + $return .= sprintf " %08d", $l; + foreach my $txn ( 0 .. $self->num_txns - 2 ) { + my $l = unpack( $StP{$self->byte_size}, + substr( $bucket->[-1], + $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE), + $self->byte_size, + ), + ); + $return .= sprintf " %08d", $l; + } + } + } + $return .= $/; - if ( $potential eq $self->{key_md5} ) { - $self->{found} = 1; - $self->{idx} = $idx; - return; + $spot += $sector->size; } } - return; -} - -sub write_md5 { - my $self = shift; - my ($args) = @_; - - DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key}; - DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5}; - DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value}; - - my $engine = $self->engine; - - $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id}; - - my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size; - $engine->add_entry( $args->{trans_id}, $spot ); - - unless ($self->{found}) { - my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({ - engine => $engine, - data => $args->{key}, - }); - - $engine->storage->print_at( $spot, - $args->{key_md5}, - pack( $StP{$engine->byte_size}, $key_sector->offset ), - ); - } - - my $loc = $spot - + $engine->hash_size - + $engine->byte_size - + $args->{trans_id} * ( $engine->byte_size + 4 ); - - $engine->storage->print_at( $loc, - pack( $StP{$engine->byte_size}, $args->{value}->offset ), - pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ), - ); -} - -sub mark_deleted { - my $self = shift; - my ($args) = @_; - $args ||= {}; - - my $engine = $self->engine; - - $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id}; - - my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size; - $engine->add_entry( $args->{trans_id}, $spot ); - - my $loc = $spot - + $engine->hash_size - + $engine->byte_size - + $args->{trans_id} * ( $engine->byte_size + 4 ); - - $engine->storage->print_at( $loc, - pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted - pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ), - ); -} - -sub delete_md5 { - my $self = shift; - my ($args) = @_; - - my $engine = $self->engine; - return undef unless $self->{found}; - - # Save the location so that we can free the data - my $location = $self->get_data_location_for({ - allow_head => 0, - }); - my $key_sector = $self->get_key_for; - - my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size; - $engine->storage->print_at( $spot, - $engine->storage->read_at( - $spot + $self->bucket_size, - $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ), - ), - chr(0) x $self->bucket_size, - ); - - $key_sector->free; - - my $data_sector = $self->engine->_load_sector( $location ); - my $data = $data_sector->data; - $data_sector->free; - - return $data; -} - -sub get_data_location_for { - my $self = shift; - my ($args) = @_; - $args ||= {}; - - $args->{allow_head} = 0 unless exists $args->{allow_head}; - $args->{trans_id} = $self->engine->trans_id unless exists $args->{trans_id}; - $args->{idx} = $self->{idx} unless exists $args->{idx}; - - my $e = $self->engine; - - my $spot = $self->offset + $self->base_size - + $args->{idx} * $self->bucket_size - + $e->hash_size - + $e->byte_size - + $args->{trans_id} * ( $e->byte_size + 4 ); - - my $buffer = $e->storage->read_at( - $spot, - $e->byte_size + 4, - ); - my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer ); - - # We have found an entry that is old, so get rid of it - if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) { - $e->storage->print_at( - $spot, - pack( $StP{$e->byte_size} . ' N', (0) x 2 ), - ); - $loc = 0; - } - - # If we're in a transaction and we never wrote to this location, try the - # HEAD instead. - if ( $args->{trans_id} && !$loc && $args->{allow_head} ) { - return $self->get_data_location_for({ - trans_id => 0, - allow_head => 1, - idx => $args->{idx}, - }); - } - return $loc <= 1 ? 0 : $loc; -} - -sub get_data_for { - my $self = shift; - my ($args) = @_; - $args ||= {}; - - return unless $self->{found}; - my $location = $self->get_data_location_for({ - allow_head => $args->{allow_head}, - }); - return $self->engine->_load_sector( $location ); -} - -sub get_key_for { - my $self = shift; - my ($idx) = @_; - $idx = $self->{idx} unless defined $idx; - - if ( $idx >= $self->engine->max_buckets ) { - DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" ); - } - - my $location = $self->engine->storage->read_at( - $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size, - $self->engine->byte_size, - ); - $location = unpack( $StP{$self->engine->byte_size}, $location ); - DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location; - - return $self->engine->_load_sector( $location ); -} - -package DBM::Deep::Engine::Sector::Index; - -our @ISA = qw( DBM::Deep::Engine::Sector ); - -sub _init { - my $self = shift; - - my $engine = $self->engine; - - unless ( $self->offset ) { - my $leftover = $self->size - $self->base_size; - - $self->{offset} = $engine->_request_index_sector( $self->size ); - $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type - # Skip staleness counter - $engine->storage->print_at( $self->offset + $self->base_size, - chr(0) x $leftover, # Zero-fill the rest - ); - } - - return $self; -} - -sub size { - my $self = shift; - unless ( $self->{size} ) { - my $e = $self->engine; - $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars; - } - return $self->{size}; -} - -sub free_meth { return '_add_free_index_sector' } - -sub free { - my $self = shift; - my $e = $self->engine; - - for my $i ( 0 .. $e->hash_chars - 1 ) { - my $l = $self->get_entry( $i ) or next; - $e->_load_sector( $l )->free; - } - - $self->SUPER::free(); -} - -sub _loc_for { - my $self = shift; - my ($idx) = @_; - return $self->offset + $self->base_size + $idx * $self->engine->byte_size; -} - -sub get_entry { - my $self = shift; - my ($idx) = @_; - - my $e = $self->engine; - - DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" ) - if $idx < 0 || $idx >= $e->hash_chars; - - return unpack( - $StP{$e->byte_size}, - $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ), - ); -} - -sub set_entry { - my $self = shift; - my ($idx, $loc) = @_; - - my $e = $self->engine; - - DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" ) - if $idx < 0 || $idx >= $e->hash_chars; - - $self->engine->storage->print_at( - $self->_loc_for( $idx ), - pack( $StP{$e->byte_size}, $loc ), - ); + return $return; } 1;