X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBM%2FDeep%2FEngine.pm;h=933027e66dfc1d720a9a12499101a90f45d070da;hb=f0276afb03319e4eb8235ab9e89f54e6b58d7113;hp=81177e2877304bf641edbb146e64037c33bf11da;hpb=d608b06eb358b91f64973067d7ca3be0a59dcba8;p=dbsrgits%2FDBM-Deep.git
diff --git a/lib/DBM/Deep/Engine.pm b/lib/DBM/Deep/Engine.pm
index 81177e2..933027e 100644
--- a/lib/DBM/Deep/Engine.pm
+++ b/lib/DBM/Deep/Engine.pm
@@ -1,921 +1,1490 @@
package DBM::Deep::Engine;
+use 5.006_000;
+
use strict;
+use warnings FATAL => 'all';
+
+# Never import symbols into our namespace. We are a class, not a library.
+# -RobK, 2008-05-27
+use Scalar::Util ();
-use Fcntl qw( :DEFAULT :flock :seek );
+#use Data::Dumper ();
+
+# File-wide notes:
+# * Every method in here assumes that the storage has been appropriately
+# safeguarded. This can be anything from flock() to some sort of manual
+#   mutex. But, it's the caller's responsibility to make sure that this has
+# been done.
-##
# Setup file and tag signatures. These should never change.
-##
sub SIG_FILE () { 'DPDB' }
-sub SIG_INTERNAL () { 'i' }
+sub SIG_HEADER () { 'h' }
sub SIG_HASH () { 'H' }
sub SIG_ARRAY () { 'A' }
-sub SIG_SCALAR () { 'S' }
sub SIG_NULL () { 'N' }
sub SIG_DATA () { 'D' }
sub SIG_INDEX () { 'I' }
sub SIG_BLIST () { 'B' }
+sub SIG_FREE () { 'F' }
sub SIG_SIZE () { 1 }
-sub precalc_sizes {
- ##
- # Precalculate index, bucket and bucket list sizes
- ##
- my $self = shift;
+use DBM::Deep::Iterator::BucketList ();
+use DBM::Deep::Iterator::Index ();
+use DBM::Deep::Engine::Sector::Data ();
+use DBM::Deep::Engine::Sector::BucketList ();
+use DBM::Deep::Engine::Sector::Index ();
+use DBM::Deep::Engine::Sector::Null ();
+use DBM::Deep::Engine::Sector::Reference ();
+use DBM::Deep::Engine::Sector::Scalar ();
+use DBM::Deep::Null ();
- $self->{index_size} = (2**8) * $self->{long_size};
- $self->{bucket_size} = $self->{hash_size} + $self->{long_size};
- $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
+my $STALE_SIZE = 2;
- return 1;
-}
+# Please refer to the pack() documentation for further information
+my %StP = (
+ 1 => 'C', # Unsigned char value (no order needed as it's just one byte)
+ 2 => 'n', # Unsigned short in "network" (big-endian) order
+ 4 => 'N', # Unsigned long in "network" (big-endian) order
+    8 => 'Q', # Unsigned quad (no order specified, presumably machine-dependent)
+);
-sub set_pack {
- ##
- # Set pack/unpack modes (see file header for more)
- ##
- my $self = shift;
- my ($long_s, $long_p, $data_s, $data_p) = @_;
-
- ##
- # Set to 4 and 'N' for 32-bit offset tags (default). Theoretical limit of 4
- # GB per file.
- # (Perl must be compiled with largefile support for files > 2 GB)
- #
- # Set to 8 and 'Q' for 64-bit offsets. Theoretical limit of 16 XB per file.
- # (Perl must be compiled with largefile and 64-bit long support)
- ##
- $self->{long_size} = $long_s ? $long_s : 4;
- $self->{long_pack} = $long_p ? $long_p : 'N';
-
- ##
- # Set to 4 and 'N' for 32-bit data length prefixes. Limit of 4 GB for each
- # key/value. Upgrading this is possible (see above) but probably not necessary.
- # If you need more than 4 GB for a single key or value, this module is really
- # not for you :-)
- ##
- $self->{data_size} = $data_s ? $data_s : 4;
- $self->{data_pack} = $data_p ? $data_p : 'N';
-
- return $self->precalc_sizes();
-}
-
-sub set_digest {
- ##
- # Set key digest function (default is MD5)
- ##
- my $self = shift;
- my ($digest_func, $hash_size) = @_;
+=head1 NAME
- $self->{digest} = $digest_func ? $digest_func : \&Digest::MD5::md5;
- $self->{hash_size} = $hash_size ? $hash_size : 16;
+DBM::Deep::Engine
- return $self->precalc_sizes();
-}
+=head1 PURPOSE
+
+This is an internal-use-only object for L<DBM::Deep>. It mediates the low-level
+mapping between the L<DBM::Deep> objects and the storage medium.
+
+The purpose of this documentation is to provide low-level documentation for
+developers. It is B<not> intended to be used by the general public. This
+documentation and what it documents can and will change without notice.
+
+=head1 OVERVIEW
+
+The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
+and DBM::Deep::Hash) for their use to access the actual stored values. This API
+is the following:
+
+=over 4
+
+=item * new
+
+=item * read_value
+
+=item * get_classname
+
+=item * make_reference
+
+=item * key_exists
+
+=item * delete_key
+
+=item * write_value
+
+=item * get_next_key
+
+=item * setup_fh
+
+=item * begin_work
+
+=item * commit
+
+=item * rollback
+
+=item * lock_exclusive
+
+=item * lock_shared
+
+=item * unlock
+
+=back
+
+They are explained in their own sections below. These methods, in turn, may
+provide some bounds-checking, but primarily act to instantiate objects in the
+Engine::Sector::* hierarchy and dispatch to them.
+
+=head1 TRANSACTIONS
+
+Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
+to keep the amount of actual work done against the file low while still providing
+Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
+with only one file.
+
+=head2 STALENESS
+
+If another process uses a transaction slot and writes stuff to it, then terminates,
+the data that process wrote is still within the file. In order to address this,
+there is also a transaction staleness counter associated within every write.
+Each time a transaction is started, that process increments that transaction's
+staleness counter. If, when it reads a value, the staleness counters aren't
+identical, DBM::Deep will consider the value on disk to be stale and discard it.
+
+=head2 DURABILITY
+
+The fourth leg of ACID is Durability, the guarantee that when a commit returns,
+the data will be there the next time you read from it. This should be regardless
+of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep
+does provide that guarantee; once the commit returns, all of the data has been
+transferred from the transaction shadow to the HEAD. The issue arises with partial
+commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's
+"tradition" of very light error-checking and non-existent error-handling, there is
+no way to recover from a partial commit. (This is probably a failure in Consistency
+as well as Durability.)
+
+Other DBMSes use transaction logs (a separate file, generally) to achieve Durability.
+As DBM::Deep is a single-file, we would have to do something similar to what SQLite
+and BDB do in terms of committing using synchronized writes. To do this, we would have
+to use a much higher RAM footprint and some serious programming that makes my head
+hurt just to think about it.
+
+=head1 EXTERNAL METHODS
+
+=head2 new()
+
+This takes a set of args. These args are described in the documentation for
+L<DBM::Deep>.
+
+=cut
sub new {
my $class = shift;
my ($args) = @_;
+ $args->{storage} = DBM::Deep::File->new( $args )
+ unless exists $args->{storage};
+
my $self = bless {
- long_size => 4,
- long_pack => 'N',
- data_size => 4,
- data_pack => 'N',
-
- digest => \&Digest::MD5::md5,
- hash_size => 16,
-
- ##
- # Maximum number of buckets per list before another level of indexing is done.
- # Increase this value for slightly greater speed, but larger database files.
- # DO NOT decrease this value below 16, due to risk of recursive reindex overrun.
- ##
+ byte_size => 4,
+
+ digest => undef,
+ hash_size => 16, # In bytes
+ hash_chars => 256, # Number of chars the algorithm uses per byte
max_buckets => 16,
+ num_txns => 1, # The HEAD
+ trans_id => 0, # Default to the HEAD
+
+ data_sector_size => 64, # Size in bytes of each data sector
+
+ entries => {}, # This is the list of entries for transactions
+ storage => undef,
}, $class;
- $self->precalc_sizes;
+ # Never allow byte_size to be set directly.
+ delete $args->{byte_size};
+ if ( defined $args->{pack_size} ) {
+ if ( lc $args->{pack_size} eq 'small' ) {
+ $args->{byte_size} = 2;
+ }
+ elsif ( lc $args->{pack_size} eq 'medium' ) {
+ $args->{byte_size} = 4;
+ }
+ elsif ( lc $args->{pack_size} eq 'large' ) {
+ $args->{byte_size} = 8;
+ }
+ else {
+ DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
+ }
+ }
+
+ # Grab the parameters we want to use
+ foreach my $param ( keys %$self ) {
+ next unless exists $args->{$param};
+ $self->{$param} = $args->{$param};
+ }
+
+ my %validations = (
+ max_buckets => { floor => 16, ceil => 256 },
+ num_txns => { floor => 1, ceil => 255 },
+ data_sector_size => { floor => 32, ceil => 256 },
+ );
+
+ while ( my ($attr, $c) = each %validations ) {
+ if ( !defined $self->{$attr}
+ || !length $self->{$attr}
+ || $self->{$attr} =~ /\D/
+ || $self->{$attr} < $c->{floor}
+ ) {
+ $self->{$attr} = '(undef)' if !defined $self->{$attr};
+ warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
+ $self->{$attr} = $c->{floor};
+ }
+ elsif ( $self->{$attr} > $c->{ceil} ) {
+ warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
+ $self->{$attr} = $c->{ceil};
+ }
+ }
+
+ if ( !$self->{digest} ) {
+ require Digest::MD5;
+ $self->{digest} = \&Digest::MD5::md5;
+ }
return $self;
}
-sub setup_fh {
+=head2 read_value( $obj, $key )
+
+This takes an object that provides _base_offset() and a string. It returns the
+value stored in the corresponding Sector::Value's data section.
+
+=cut
+
+sub read_value {
my $self = shift;
- my ($obj) = @_;
+ my ($obj, $key) = @_;
- $self->open( $obj ) if !defined $obj->_fh;
+ # This will be a Reference sector
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or return;
- #XXX We have to make sure we don't mess up when autoflush isn't turned on
- unless ( $obj->_root->{inode} ) {
- my @stats = stat($obj->_fh);
- $obj->_root->{inode} = $stats[1];
- $obj->_root->{end} = $stats[7];
+ if ( $sector->staleness != $obj->_staleness ) {
+ return;
}
- return 1;
+ my $key_md5 = $self->_apply_digest( $key );
+
+ my $value_sector = $sector->get_data_for({
+ key_md5 => $key_md5,
+ allow_head => 1,
+ });
+
+ unless ( $value_sector ) {
+ $value_sector = DBM::Deep::Engine::Sector::Null->new({
+ engine => $self,
+ data => undef,
+ });
+
+ $sector->write_data({
+ key_md5 => $key_md5,
+ key => $key,
+ value => $value_sector,
+ });
+ }
+
+ return $value_sector->data;
}
-sub open {
- ##
- # Open a fh to the database, create if nonexistent.
- # Make sure file signature matches DBM::Deep spec.
- ##
- my $self = shift;
- my ($obj) = @_;
+=head2 get_classname( $obj )
+
+This takes an object that provides _base_offset() and returns the classname (if any)
+associated with it.
- if (defined($obj->_fh)) { $self->close_fh( $obj ); }
+It delegates to Sector::Reference::get_classname() for the heavy lifting.
- # Theoretically, adding O_BINARY should remove the need for the binmode
- # Of course, testing it is going to be ... interesting.
- my $flags = O_RDWR | O_CREAT | O_BINARY;
+It performs a staleness check.
- my $fh;
- sysopen( $fh, $obj->_root->{file}, $flags )
- or $obj->_throw_error("Cannot sysopen file: " . $obj->_root->{file} . ": $!");
- $obj->_root->{fh} = $fh;
+=cut
- #XXX Can we remove this by using the right sysopen() flags?
- # Maybe ... q.v. above
- binmode $fh; # for win32
+sub get_classname {
+ my $self = shift;
+ my ($obj) = @_;
- if ($obj->_root->{autoflush}) {
- my $old = select $fh;
- $|=1;
- select $old;
+ # This will be a Reference sector
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
+
+ if ( $sector->staleness != $obj->_staleness ) {
+ return;
}
- seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
+ return $sector->get_classname;
+}
- my $signature;
- my $bytes_read = read( $fh, $signature, length(SIG_FILE));
+=head2 make_reference( $obj, $old_key, $new_key )
- ##
- # File is empty -- write signature and master index
- ##
- if (!$bytes_read) {
- seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
- print( $fh SIG_FILE);
+This takes an object that provides _base_offset() and two strings. The
+strings correspond to the old key and new key, respectively. This operation
+is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>.
- $self->create_tag($obj, $obj->_base_offset, $obj->_type, chr(0) x $self->{index_size});
+This returns nothing.
- # Flush the filehandle
- my $old_fh = select $fh;
- my $old_af = $|; $| = 1; $| = $old_af;
- select $old_fh;
+=cut
- return 1;
- }
+sub make_reference {
+ my $self = shift;
+ my ($obj, $old_key, $new_key) = @_;
+
+ # This will be a Reference sector
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
- ##
- # Check signature was valid
- ##
- unless ($signature eq SIG_FILE) {
- $self->close_fh( $obj );
- $obj->_throw_error("Signature not found -- file is not a Deep DB");
+ if ( $sector->staleness != $obj->_staleness ) {
+ return;
}
- ##
- # Get our type from master index signature
- ##
- my $tag = $self->load_tag($obj, $obj->_base_offset)
- or $obj->_throw_error("Corrupted file, no master index record");
+ my $old_md5 = $self->_apply_digest( $old_key );
- unless ($obj->{type} eq $tag->{signature}) {
- $obj->_throw_error("File type mismatch");
+ my $value_sector = $sector->get_data_for({
+ key_md5 => $old_md5,
+ allow_head => 1,
+ });
+
+ unless ( $value_sector ) {
+ $value_sector = DBM::Deep::Engine::Sector::Null->new({
+ engine => $self,
+ data => undef,
+ });
+
+ $sector->write_data({
+ key_md5 => $old_md5,
+ key => $old_key,
+ value => $value_sector,
+ });
}
-#XXX We probably also want to store the hash algorithm name and not assume anything
-#XXX The cool thing would be to allow a different hashing algorithm at every level
+ if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
+ $sector->write_data({
+ key => $new_key,
+ key_md5 => $self->_apply_digest( $new_key ),
+ value => $value_sector,
+ });
+ $value_sector->increment_refcount;
+ }
+ else {
+ $sector->write_data({
+ key => $new_key,
+ key_md5 => $self->_apply_digest( $new_key ),
+ value => $value_sector->clone,
+ });
+ }
- return 1;
+ return;
}
-sub close_fh {
- my $self = shift;
- my ($obj) = @_;
+=head2 key_exists( $obj, $key )
- if ( my $fh = $obj->_root->{fh} ) {
- close $fh;
- }
- $obj->_root->{fh} = undef;
+This takes an object that provides _base_offset() and a string for
+the key to be checked. This returns 1 for true and "" for false.
- return 1;
-}
+=cut
-sub create_tag {
- ##
- # Given offset, signature and content, create tag and write to disk
- ##
+sub key_exists {
my $self = shift;
- my ($obj, $offset, $sig, $content) = @_;
- my $size = length($content);
-
- my $fh = $obj->_fh;
+ my ($obj, $key) = @_;
- seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
- print( $fh $sig . pack($self->{data_pack}, $size) . $content );
+ # This will be a Reference sector
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or return '';
- if ($offset == $obj->_root->{end}) {
- $obj->_root->{end} += SIG_SIZE + $self->{data_size} + $size;
+ if ( $sector->staleness != $obj->_staleness ) {
+ return '';
}
- return {
- signature => $sig,
- size => $size,
- offset => $offset + SIG_SIZE + $self->{data_size},
- content => $content
- };
+ my $data = $sector->get_data_for({
+ key_md5 => $self->_apply_digest( $key ),
+ allow_head => 1,
+ });
+
+ # exists() returns 1 or '' for true/false.
+ return $data ? 1 : '';
}
-sub load_tag {
- ##
- # Given offset, load single tag and return signature, size and data
- ##
- my $self = shift;
- my ($obj, $offset) = @_;
+=head2 delete_key( $obj, $key )
- my $fh = $obj->_fh;
+This takes an object that provides _base_offset() and a string for
+the key to be deleted. This returns the result of the Sector::Reference
+delete_key() method.
- seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
+=cut
- #XXX I'm not sure this check will work if autoflush isn't enabled ...
- return if eof $fh;
+sub delete_key {
+ my $self = shift;
+ my ($obj, $key) = @_;
- my $b;
- read( $fh, $b, SIG_SIZE + $self->{data_size} );
- my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or return;
- my $buffer;
- read( $fh, $buffer, $size);
+ if ( $sector->staleness != $obj->_staleness ) {
+ return;
+ }
- return {
- signature => $sig,
- size => $size,
- offset => $offset + SIG_SIZE + $self->{data_size},
- content => $buffer
- };
+ return $sector->delete_key({
+ key_md5 => $self->_apply_digest( $key ),
+ allow_head => 0,
+ });
}
-sub add_bucket {
- ##
- # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
- # plain (undigested) key and value.
- ##
- my $self = shift;
- my ($obj, $tag, $md5, $plain_key, $value) = @_;
+=head2 write_value( $obj, $key, $value )
- my $location = 0;
- my $result = 2;
+This takes an object that provides _base_offset(), a string for the
+key, and a value. This value can be anything storable within L<DBM::Deep>.
- my $root = $obj->_root;
+This returns 1 upon success.
- my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $value->isa( 'DBM::Deep' ) };
- my $internal_ref = $is_dbm_deep && ($value->_root eq $root);
+=cut
- my $fh = $obj->_fh;
+sub write_value {
+ my $self = shift;
+ my ($obj, $key, $value) = @_;
+ my $r = Scalar::Util::reftype( $value ) || '';
{
- my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
-
- # Updating a known md5
- if ( $subloc ) {
- $result = 1;
-
- seek($fh, $subloc + SIG_SIZE + $root->{file_offset}, SEEK_SET);
- my $size;
- read( $fh, $size, $self->{data_size});
- $size = unpack($self->{data_pack}, $size);
-
- ##
- # If value is a hash, array, or raw value with equal or less size, we can
- # reuse the same content area of the database. Otherwise, we have to create
- # a new content area at the EOF.
- ##
- my $actual_length;
- if ( $internal_ref ) {
- $actual_length = $self->{long_size};
- }
- else {
- my $r = Scalar::Util::reftype( $value ) || '';
- if ( $r eq 'HASH' || $r eq 'ARRAY' ) {
- $actual_length = $self->{index_size};
-
- # if autobless is enabled, must also take into consideration
- # the class name, as it is stored along with key/value.
- if ( $root->{autobless} ) {
- my $value_class = Scalar::Util::blessed($value);
- if ( defined $value_class && !$value->isa('DBM::Deep') ) {
- $actual_length += length($value_class);
- }
- }
- }
- else { $actual_length = length($value); }
- }
+ last if $r eq '';
+ last if $r eq 'HASH';
+ last if $r eq 'ARRAY';
- if ($actual_length <= $size) {
- $location = $subloc;
- }
- else {
- $location = $root->{end};
- seek(
- $fh,
- $tag->{offset} + $offset + $self->{hash_size} + $root->{file_offset},
- SEEK_SET,
- );
- print( $fh pack($self->{long_pack}, $location) );
- }
- }
- # Adding a new md5
- elsif ( defined $offset ) {
- $result = 2;
- $location = $root->{end};
+ DBM::Deep->_throw_error(
+ "Storage of references of type '$r' is not supported."
+ );
+ }
- seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
- print( $fh $md5 . pack($self->{long_pack}, $location) );
- }
- # If bucket didn't fit into list, split into a new index level
- else {
- $self->split_index( $obj, $md5, $tag );
+ # This will be a Reference sector
+ my $sector = $self->_load_sector( $obj->_base_offset )
+ or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
- $location = $root->{end};
- }
+ if ( $sector->staleness != $obj->_staleness ) {
+ DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
}
- ##
- # Seek to content area and store signature, value and plaintext key
- ##
- if ($location) {
- seek($fh, $location + $root->{file_offset}, SEEK_SET);
-
- ##
- # Write signature based on content type, set content length and write
- # actual value.
- ##
- my $r = Scalar::Util::reftype($value) || '';
- my $content_length;
- if ( $internal_ref ) {
- print( $fh SIG_INTERNAL );
- print( $fh pack($self->{data_pack}, $self->{long_size}) );
- print( $fh pack($self->{long_pack}, $value->_base_offset) );
- $content_length = $self->{long_size};
+ my ($class, $type);
+ if ( !defined $value ) {
+ $class = 'DBM::Deep::Engine::Sector::Null';
+ }
+ elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
+ my $tmpvar;
+ if ( $r eq 'ARRAY' ) {
+ $tmpvar = tied @$value;
+ } elsif ( $r eq 'HASH' ) {
+ $tmpvar = tied %$value;
}
- else {
- if ($r eq 'HASH') {
- print( $fh SIG_HASH );
- print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
- $content_length = $self->{index_size};
- }
- elsif ($r eq 'ARRAY') {
- print( $fh SIG_ARRAY );
- print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
- $content_length = $self->{index_size};
- }
- elsif (!defined($value)) {
- print( $fh SIG_NULL );
- print( $fh pack($self->{data_pack}, 0) );
- $content_length = 0;
- }
- else {
- print( $fh SIG_DATA );
- print( $fh pack($self->{data_pack}, length($value)) . $value );
- $content_length = length($value);
+
+ if ( $tmpvar ) {
+ my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
+
+ unless ( $is_dbm_deep ) {
+ DBM::Deep->_throw_error( "Cannot store something that is tied." );
}
- }
- ##
- # Plain key is stored AFTER value, as keys are typically fetched less often.
- ##
- print( $fh pack($self->{data_pack}, length($plain_key)) . $plain_key );
-
- ##
- # If value is blessed, preserve class name
- ##
- if ( $root->{autobless} ) {
- my $value_class = Scalar::Util::blessed($value);
- if ( defined $value_class && !$value->isa( 'DBM::Deep' ) ) {
- ##
- # Blessed ref -- will restore later
- ##
- print( $fh chr(1) );
- print( $fh pack($self->{data_pack}, length($value_class)) . $value_class );
- $content_length += 1;
- $content_length += $self->{data_size} + length($value_class);
+ unless ( $tmpvar->_engine->storage == $self->storage ) {
+ DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
}
- else {
- print( $fh chr(0) );
- $content_length += 1;
+
+ # First, verify if we're storing the same thing to this spot. If we are, then
+ # this should be a no-op. -EJS, 2008-05-19
+ my $loc = $sector->get_data_location_for({
+ key_md5 => $self->_apply_digest( $key ),
+ allow_head => 1,
+ });
+
+ if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
+ return 1;
}
- }
- ##
- # If this is a new content area, advance EOF counter
- ##
- if ($location == $root->{end}) {
- $root->{end} += SIG_SIZE;
- $root->{end} += $self->{data_size} + $content_length;
- $root->{end} += $self->{data_size} + length($plain_key);
+ #XXX Can this use $loc?
+ my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
+ $sector->write_data({
+ key => $key,
+ key_md5 => $self->_apply_digest( $key ),
+ value => $value_sector,
+ });
+ $value_sector->increment_refcount;
+
+ return 1;
}
- ##
- # If content is a hash or array, create new child DBM::Deep object and
- # pass each key or element to it.
- ##
- if ( ! $internal_ref ) {
- if ($r eq 'HASH') {
- my $branch = DBM::Deep->new(
- type => DBM::Deep->TYPE_HASH,
- base_offset => $location,
- root => $root,
- );
- foreach my $key (keys %{$value}) {
- $branch->STORE( $key, $value->{$key} );
- }
- }
- elsif ($r eq 'ARRAY') {
- my $branch = DBM::Deep->new(
- type => DBM::Deep->TYPE_ARRAY,
- base_offset => $location,
- root => $root,
- );
- my $index = 0;
- foreach my $element (@{$value}) {
- $branch->STORE( $index, $element );
- $index++;
- }
- }
+ $class = 'DBM::Deep::Engine::Sector::Reference';
+ $type = substr( $r, 0, 1 );
+ }
+ else {
+ if ( tied($value) ) {
+ DBM::Deep->_throw_error( "Cannot store something that is tied." );
}
+ $class = 'DBM::Deep::Engine::Sector::Scalar';
+ }
- return $result;
+ # Create this after loading the reference sector in case something bad happens.
+ # This way, we won't allocate value sector(s) needlessly.
+ my $value_sector = $class->new({
+ engine => $self,
+ data => $value,
+ type => $type,
+ });
+
+ $sector->write_data({
+ key => $key,
+ key_md5 => $self->_apply_digest( $key ),
+ value => $value_sector,
+ });
+
+ # This code is to make sure we write all the values in the $value to the disk
+ # and to make sure all changes to $value after the assignment are reflected
+ # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
+ # NOTE - simply tying $value won't perform a STORE on each value. Hence, the
+ # copy to a temp value.
+ if ( $r eq 'ARRAY' ) {
+ my @temp = @$value;
+ tie @$value, 'DBM::Deep', {
+ base_offset => $value_sector->offset,
+ staleness => $value_sector->staleness,
+ storage => $self->storage,
+ engine => $self,
+ };
+ @$value = @temp;
+ bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
+ }
+ elsif ( $r eq 'HASH' ) {
+ my %temp = %$value;
+ tie %$value, 'DBM::Deep', {
+ base_offset => $value_sector->offset,
+ staleness => $value_sector->staleness,
+ storage => $self->storage,
+ engine => $self,
+ };
+
+ %$value = %temp;
+ bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
}
- $obj->_throw_error("Fatal error: indexing failed -- possibly due to corruption in file");
+ return 1;
}
-sub split_index {
- my $self = shift;
- my ($obj, $md5, $tag) = @_;
+=head2 get_next_key( $obj, $prev_key )
- my $fh = $obj->_fh;
- my $root = $obj->_root;
- my $keys = $tag->{content};
+This takes an object that provides _base_offset() and an optional string
+representing the prior key returned via a prior invocation of this method.
- seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
- print( $fh pack($self->{long_pack}, $root->{end}) );
+This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
- my $index_tag = $self->create_tag(
- $obj,
- $root->{end},
- SIG_INDEX,
- chr(0) x $self->{index_size},
- );
+=cut
- my @offsets = ();
+# XXX Add staleness here
+sub get_next_key {
+ my $self = shift;
+ my ($obj, $prev_key) = @_;
+
+ # XXX Need to add logic about resetting the iterator if any key in the reference has changed
+ unless ( $prev_key ) {
+ $obj->{iterator} = DBM::Deep::Iterator->new({
+ base_offset => $obj->_base_offset,
+ engine => $self,
+ });
+ }
- $keys .= $md5 . pack($self->{long_pack}, 0);
+ return $obj->{iterator}->get_next_key( $obj );
+}
- BUCKET:
- for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
- my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
+=head2 setup_fh( $obj )
- next BUCKET unless $key;
+This takes an object that provides _base_offset(). It will do everything needed
+in order to properly initialize all values for necessary functioning. If this is
+called upon an already initialized object, this will also reset the inode.
- my $num = ord(substr($key, $tag->{ch} + 1, 1));
+This returns 1.
- if ($offsets[$num]) {
- my $offset = $offsets[$num] + SIG_SIZE + $self->{data_size};
- seek($fh, $offset + $root->{file_offset}, SEEK_SET);
- my $subkeys;
- read( $fh, $subkeys, $self->{bucket_list_size});
+=cut
- for (my $k=0; $k<$self->{max_buckets}; $k++) {
- my ($temp, $subloc) = $self->_get_key_subloc( $subkeys, $k );
+sub setup_fh {
+ my $self = shift;
+ my ($obj) = @_;
- if (!$subloc) {
- seek($fh, $offset + ($k * $self->{bucket_size}) + $root->{file_offset}, SEEK_SET);
- print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
- last;
- }
- } # k loop
+ # We're opening the file.
+ unless ( $obj->_base_offset ) {
+ my $bytes_read = $self->_read_file_header;
+
+ # Creating a new file
+ unless ( $bytes_read ) {
+ $self->_write_file_header;
+
+ # 1) Create Array/Hash entry
+ my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
+ engine => $self,
+ type => $obj->_type,
+ });
+ $obj->{base_offset} = $initial_reference->offset;
+ $obj->{staleness} = $initial_reference->staleness;
+
+ $self->storage->flush;
}
+ # Reading from an existing file
else {
- $offsets[$num] = $root->{end};
- seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
- print( $fh pack($self->{long_pack}, $root->{end}) );
+ $obj->{base_offset} = $bytes_read;
+ my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
+ engine => $self,
+ offset => $obj->_base_offset,
+ });
+ unless ( $initial_reference ) {
+ DBM::Deep->_throw_error("Corrupted file, no master index record");
+ }
- my $blist_tag = $self->create_tag($obj, $root->{end}, SIG_BLIST, chr(0) x $self->{bucket_list_size});
+ unless ($obj->_type eq $initial_reference->type) {
+ DBM::Deep->_throw_error("File type mismatch");
+ }
- seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
- print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
+ $obj->{staleness} = $initial_reference->staleness;
}
- } # i loop
+ }
- return;
+ $self->storage->set_inode;
+
+ return 1;
}
-sub read_from_loc {
- my $self = shift;
- my ($obj, $subloc) = @_;
-
- my $fh = $obj->_fh;
-
- ##
- # Found match -- seek to offset and read signature
- ##
- my $signature;
- seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
- read( $fh, $signature, SIG_SIZE);
-
- ##
- # If value is a hash or array, return new DBM::Deep object with correct offset
- ##
- if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
- my $obj = DBM::Deep->new(
- type => $signature,
- base_offset => $subloc,
- root => $obj->_root,
- );
+=head2 begin_work( $obj )
- if ($obj->_root->{autobless}) {
- ##
- # Skip over value and plain key to see if object needs
- # to be re-blessed
- ##
- seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
-
- my $size;
- read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
- if ($size) { seek($fh, $size, SEEK_CUR); }
-
- my $bless_bit;
- read( $fh, $bless_bit, 1);
- if (ord($bless_bit)) {
- ##
- # Yes, object needs to be re-blessed
- ##
- my $class_name;
- read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
- if ($size) { read( $fh, $class_name, $size); }
- if ($class_name) { $obj = bless( $obj, $class_name ); }
- }
- }
+This takes an object that provides _base_offset(). It will set up all necessary
+bookkeeping in order to run all work within a transaction.
+
+If $obj is already within a transaction, an error will be thrown. If there are
+no more available transactions, an error will be thrown.
- return $obj;
+This returns undef.
+
+=cut
+
+sub begin_work {
+ my $self = shift;
+ my ($obj) = @_;
+
+ if ( $self->trans_id ) {
+ DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
}
- elsif ( $signature eq SIG_INTERNAL ) {
- my $size;
- read( $fh, $size, $self->{data_size});
- $size = unpack($self->{data_pack}, $size);
- if ( $size ) {
- my $new_loc;
- read( $fh, $new_loc, $size );
- $new_loc = unpack( $self->{long_pack}, $new_loc );
+ my @slots = $self->read_txn_slots;
+ my $found;
+ for my $i ( 0 .. $#slots ) {
+ next if $slots[$i];
- return $self->read_from_loc( $obj, $new_loc );
- }
- else {
- return;
- }
+ $slots[$i] = 1;
+ $self->set_trans_id( $i + 1 );
+ $found = 1;
+ last;
}
- ##
- # Otherwise return actual value
- ##
- elsif ($signature eq SIG_DATA) {
- my $size;
- read( $fh, $size, $self->{data_size});
- $size = unpack($self->{data_pack}, $size);
+ unless ( $found ) {
+ DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
+ }
+ $self->write_txn_slots( @slots );
- my $value = '';
- if ($size) { read( $fh, $value, $size); }
- return $value;
+ if ( !$self->trans_id ) {
+ DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
}
- ##
- # Key exists, but content is null
- ##
return;
}
-sub get_bucket_value {
- ##
- # Fetch single value given tag and MD5 digested key.
- ##
+=head2 rollback( $obj )
+
+This takes an object that provides _base_offset(). It will revert all
+actions taken within the running transaction.
+
+If $obj is not within a transaction, an error will be thrown.
+
+This returns 1.
+
+=cut
+
+sub rollback {
my $self = shift;
- my ($obj, $tag, $md5) = @_;
- my $keys = $tag->{content};
-
- ##
- # Iterate through buckets, looking for a key match
- ##
- BUCKET:
- for (my $i = 0; $i < $self->{max_buckets}; $i++) {
- my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
-
- if (!$subloc) {
- ##
- # Hit end of list, no match
- ##
- return;
- }
+ my ($obj) = @_;
- if ( $md5 ne $key ) {
- next BUCKET;
+ if ( !$self->trans_id ) {
+ DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
+ }
+
+ # Each entry is the file location for a bucket that has a modification for
+ # this transaction. The entries need to be expunged.
+ foreach my $entry (@{ $self->get_entries } ) {
+ # Remove the entry here
+ my $read_loc = $entry
+ + $self->hash_size
+ + $self->byte_size
+ + $self->byte_size
+ + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
+
+ my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
+ $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
+ $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
+
+ if ( $data_loc > 1 ) {
+ $self->_load_sector( $data_loc )->free;
}
+ }
- return $self->read_from_loc( $obj, $subloc );
- } # i loop
+ $self->clear_entries;
- return;
+ my @slots = $self->read_txn_slots;
+ $slots[$self->trans_id-1] = 0;
+ $self->write_txn_slots( @slots );
+ $self->inc_txn_staleness_counter( $self->trans_id );
+ $self->set_trans_id( 0 );
+
+ return 1;
}
-sub delete_bucket {
- ##
- # Delete single key/value pair given tag and MD5 digested key.
- ##
+=head2 commit( $obj )
+
+This takes an object that provides _base_offset(). It will apply all
+actions taken within the transaction to the HEAD.
+
+If $obj is not within a transaction, an error will be thrown.
+
+This returns 1.
+
+=cut
+
+sub commit {
my $self = shift;
- my ($obj, $tag, $md5) = @_;
- my $keys = $tag->{content};
-
- my $fh = $obj->_fh;
-
- ##
- # Iterate through buckets, looking for a key match
- ##
- BUCKET:
- for (my $i=0; $i<$self->{max_buckets}; $i++) {
- my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
-
- if (!$subloc) {
- ##
- # Hit end of list, no match
- ##
- return;
- }
+ my ($obj) = @_;
+
+ if ( !$self->trans_id ) {
+ DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
+ }
- if ( $md5 ne $key ) {
- next BUCKET;
+ foreach my $entry (@{ $self->get_entries } ) {
+ # Overwrite the entry in head with the entry in trans_id
+ my $base = $entry
+ + $self->hash_size
+ + $self->byte_size;
+
+ my $head_loc = $self->storage->read_at( $base, $self->byte_size );
+ $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
+
+ my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
+ my $trans_loc = $self->storage->read_at(
+ $spot, $self->byte_size,
+ );
+
+ $self->storage->print_at( $base, $trans_loc );
+ $self->storage->print_at(
+ $spot,
+ pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
+ );
+
+ if ( $head_loc > 1 ) {
+ $self->_load_sector( $head_loc )->free;
}
+ }
- ##
- # Matched key -- delete bucket and return
- ##
- seek($fh, $tag->{offset} + ($i * $self->{bucket_size}) + $obj->_root->{file_offset}, SEEK_SET);
- print( $fh substr($keys, ($i+1) * $self->{bucket_size} ) );
- print( $fh chr(0) x $self->{bucket_size} );
+ $self->clear_entries;
- return 1;
- } # i loop
+ my @slots = $self->read_txn_slots;
+ $slots[$self->trans_id-1] = 0;
+ $self->write_txn_slots( @slots );
+ $self->inc_txn_staleness_counter( $self->trans_id );
+ $self->set_trans_id( 0 );
- return;
+ return 1;
}
-sub bucket_exists {
- ##
- # Check existence of single key given tag and MD5 digested key.
- ##
+=head2 lock_exclusive()
+
+This takes an object that provides _base_offset(). It will guarantee that
+the storage has taken precautions to be safe for a write.
+
+This returns nothing.
+
+=cut
+
+sub lock_exclusive {
my $self = shift;
- my ($obj, $tag, $md5) = @_;
- my $keys = $tag->{content};
-
- ##
- # Iterate through buckets, looking for a key match
- ##
- BUCKET:
- for (my $i=0; $i<$self->{max_buckets}; $i++) {
- my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
-
- if (!$subloc) {
- ##
- # Hit end of list, no match
- ##
- return;
- }
+ my ($obj) = @_;
+ return $self->storage->lock_exclusive( $obj );
+}
- if ( $md5 ne $key ) {
- next BUCKET;
- }
+=head2 lock_shared()
- ##
- # Matched key -- return true
- ##
- return 1;
- } # i loop
+This takes an object that provides _base_offset(). It will guarantee that
+the storage has taken precautions to be safe for a read.
- return;
+This returns nothing.
+
+=cut
+
+sub lock_shared {
+ my $self = shift;
+ my ($obj) = @_;
+ return $self->storage->lock_shared( $obj );
}
-sub find_bucket_list {
- ##
- # Locate offset for bucket list, given digested key
- ##
+=head2 unlock()
+
+This takes an object that provides _base_offset(). It will guarantee that
+the storage has released all locks taken.
+
+This returns nothing.
+
+=cut
+
+sub unlock {
my $self = shift;
- my ($obj, $md5, $args) = @_;
- $args = {} unless $args;
+ my ($obj) = @_;
- ##
- # Locate offset for bucket list using digest index system
- ##
- my $tag = $self->load_tag($obj, $obj->_base_offset)
- or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
+ my $rv = $self->storage->unlock( $obj );
- my $ch = 0;
- while ($tag->{signature} ne SIG_BLIST) {
- my $num = ord substr($md5, $ch, 1);
+ $self->flush if $rv;
- my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
- $tag = $self->index_lookup( $obj, $tag, $num );
+ return $rv;
+}
- if (!$tag) {
- if ( $args->{create} ) {
- my $fh = $obj->_fh;
- seek($fh, $ref_loc + $obj->_root->{file_offset}, SEEK_SET);
- print( $fh pack($self->{long_pack}, $obj->_root->{end}) );
+=head1 INTERNAL METHODS
- $tag = $self->create_tag(
- $obj, $obj->_root->{end},
- SIG_BLIST,
- chr(0) x $self->{bucket_list_size},
- );
+The following methods are internal-use-only to DBM::Deep::Engine.
- $tag->{ref_loc} = $ref_loc;
- $tag->{ch} = $ch;
+=cut
- last;
- }
- else {
- return;
- }
- }
+=head2 read_txn_slots()
- $tag->{ch} = $ch;
- $tag->{ref_loc} = $ref_loc;
+This takes no arguments.
- $ch++;
- }
+This will return an array with a 1 or 0 in each slot. Each spot represents one
+available transaction. If the slot is 1, that transaction is taken. If it is 0,
+the transaction is available.
+
+=cut
+
+sub read_txn_slots {
+ my $self = shift;
+ my $bl = $self->txn_bitfield_len;
+ my $num_bits = $bl * 8;
+ return split '', unpack( 'b'.$num_bits,
+ $self->storage->read_at(
+ $self->trans_loc, $bl,
+ )
+ );
+}
+
+=head2 write_txn_slots( @slots )
+
+This takes an array of 1's and 0's. This array represents the transaction slots
+returned by L</read_txn_slots>. In other words, the following is true:
+
+ @x = read_txn_slots( write_txn_slots( @x ) );
+
+(With the obviously missing object referents added back in.)
+
+=cut
+
+sub write_txn_slots {
+ my $self = shift;
+ my $num_bits = $self->txn_bitfield_len * 8;
+ $self->storage->print_at( $self->trans_loc,
+ pack( 'b'.$num_bits, join('', @_) ),
+ );
+}
+
+=head2 get_running_txn_ids()
+
+This takes no arguments.
+
+This will return an array of taken transaction IDs. This wraps L</read_txn_slots>.
+
+=cut
- return $tag;
+sub get_running_txn_ids {
+ my $self = shift;
+ my @transactions = $self->read_txn_slots;
+ my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
}
-sub index_lookup {
- ##
- # Given index tag, lookup single entry in index and return .
- ##
+=head2 get_txn_staleness_counter( $trans_id )
+
+This will return the staleness counter for the given transaction ID. Please see
+L</STALENESS> for more information.
+
+=cut
+
+sub get_txn_staleness_counter {
my $self = shift;
- my ($obj, $tag, $index) = @_;
-
- my $location = unpack(
- $self->{long_pack},
- substr(
- $tag->{content},
- $index * $self->{long_size},
- $self->{long_size},
- ),
+ my ($trans_id) = @_;
+
+ # Hardcode staleness of 0 for the HEAD
+ return 0 unless $trans_id;
+
+ return unpack( $StP{$STALE_SIZE},
+ $self->storage->read_at(
+ $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
+ $STALE_SIZE,
+ )
);
+}
+
+=head2 inc_txn_staleness_counter( $trans_id )
+
+This will increment the staleness counter for the given transaction ID. Please see
+L</STALENESS> for more information.
+
+=cut
+
+sub inc_txn_staleness_counter {
+ my $self = shift;
+ my ($trans_id) = @_;
+
+ # Hardcode staleness of 0 for the HEAD
+ return 0 unless $trans_id;
+
+ $self->storage->print_at(
+ $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
+ pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
+ );
+}
+
+=head2 get_entries()
+
+This takes no arguments.
+
+This returns a list of all the sectors that have been modified by this transaction.
+
+=cut
+
+sub get_entries {
+ my $self = shift;
+ return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
+}
+
+=head2 add_entry( $trans_id, $location )
+
+This takes a transaction ID and a file location and marks the sector at that location
+as having been modified by the transaction identified by $trans_id.
+
+This returns nothing.
+
+B<NOTE>: Unlike all the other _entries() methods, there are several cases where
+C<< $trans_id != $self->trans_id >> for this method.
+
+=cut
+
+sub add_entry {
+ my $self = shift;
+ my ($trans_id, $loc) = @_;
+
+ $self->{entries}{$trans_id} ||= {};
+ $self->{entries}{$trans_id}{$loc} = undef;
+}
+
+=head2 reindex_entry( $old_loc, $new_loc )
+
+This takes two locations (old and new, respectively). If a location that has been
+modified by this transaction is subsequently reindexed due to a bucketlist
+overflowing, then the entries hash needs to be made aware of this change.
- if (!$location) { return; }
+This returns nothing.
- return $self->load_tag( $obj, $location );
+=cut
+
+sub reindex_entry {
+ my $self = shift;
+ my ($old_loc, $new_loc) = @_;
+
+ TRANS:
+ while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
+ if ( exists $locs->{$old_loc} ) {
+ delete $locs->{$old_loc};
+ $locs->{$new_loc} = undef;
+ next TRANS;
+ }
+ }
}
-sub traverse_index {
- ##
- # Scan index and recursively step into deeper levels, looking for next key.
- ##
+=head2 clear_entries()
+
+This takes no arguments. It will clear the entries list for the running transaction.
+
+This returns nothing.
+
+=cut
+
+sub clear_entries {
my $self = shift;
- my ($obj, $offset, $ch, $force_return_next) = @_;
+ delete $self->{entries}{$self->trans_id};
+}
+
+=head2 _write_file_header()
+
+This writes the file header for a new file. This will write the various settings
+that set how the file is interpreted.
+
+=head2 _read_file_header()
+
+This reads the file header from an existing file. This will read the various
+settings that set how the file is interpreted.
+
+=cut
- my $tag = $self->load_tag($obj, $offset );
+{
+ my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
+ my $this_file_version = 3;
- my $fh = $obj->_fh;
+ sub _write_file_header {
+ my $self = shift;
- if ($tag->{signature} ne SIG_BLIST) {
- my $content = $tag->{content};
- my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
+ my $nt = $self->num_txns;
+ my $bl = $self->txn_bitfield_len;
- for (my $index = $start; $index < 256; $index++) {
- my $subloc = unpack(
- $self->{long_pack},
- substr($content, $index * $self->{long_size}, $self->{long_size}),
+ my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
+
+ my $loc = $self->storage->request_space( $header_fixed + $header_var );
+
+ $self->storage->print_at( $loc,
+ SIG_FILE,
+ SIG_HEADER,
+ pack('N', $this_file_version), # At this point, we're at 9 bytes
+ pack('N', $header_var), # header size
+ # --- Above is $header_fixed. Below is $header_var
+ pack('C', $self->byte_size),
+
+ # These shenanigans are to allow a 256 within a C
+ pack('C', $self->max_buckets - 1),
+ pack('C', $self->data_sector_size - 1),
+
+ pack('C', $nt),
+ pack('C' . $bl, 0 ), # Transaction activeness bitfield
+ pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
+ pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
+ pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
+ pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
+ );
+
+ #XXX Set these less fragilely
+ $self->set_trans_loc( $header_fixed + 4 );
+ $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
+
+ return;
+ }
+
+ sub _read_file_header {
+ my $self = shift;
+
+ my $buffer = $self->storage->read_at( 0, $header_fixed );
+ return unless length($buffer);
+
+ my ($file_signature, $sig_header, $file_version, $size) = unpack(
+ 'A4 A N N', $buffer
+ );
+
+ unless ( $file_signature eq SIG_FILE ) {
+ $self->storage->close;
+ DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
+ }
+
+ unless ( $sig_header eq SIG_HEADER ) {
+ $self->storage->close;
+ DBM::Deep->_throw_error( "Pre-1.00 file version found" );
+ }
+
+ unless ( $file_version == $this_file_version ) {
+ $self->storage->close;
+ DBM::Deep->_throw_error(
+ "Wrong file version found - " . $file_version .
+ " - expected " . $this_file_version
);
+ }
- if ($subloc) {
- my $result = $self->traverse_index(
- $obj, $subloc, $ch + 1, $force_return_next,
- );
+ my $buffer2 = $self->storage->read_at( undef, $size );
+ my @values = unpack( 'C C C C', $buffer2 );
- if (defined($result)) { return $result; }
- }
- } # index loop
+ if ( @values != 4 || grep { !defined } @values ) {
+ $self->storage->close;
+ DBM::Deep->_throw_error("Corrupted file - bad header");
+ }
- $obj->{return_next} = 1;
- } # tag is an index
+ #XXX Add warnings if values weren't set right
+ @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
- else {
- my $keys = $tag->{content};
- if ($force_return_next) { $obj->{return_next} = 1; }
-
- ##
- # Iterate through buckets, looking for a key match
- ##
- for (my $i = 0; $i < $self->{max_buckets}; $i++) {
- my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
-
- # End of bucket list -- return to outer loop
- if (!$subloc) {
- $obj->{return_next} = 1;
- last;
- }
- # Located previous key -- return next one found
- elsif ($key eq $obj->{prev_md5}) {
- $obj->{return_next} = 1;
- next;
- }
- # Seek to bucket location and skip over signature
- elsif ($obj->{return_next}) {
- seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
-
- # Skip over value to get to plain key
- my $sig;
- read( $fh, $sig, SIG_SIZE );
-
- my $size;
- read( $fh, $size, $self->{data_size});
- $size = unpack($self->{data_pack}, $size);
- if ($size) { seek($fh, $size, SEEK_CUR); }
-
- # Read in plain key and return as scalar
- my $plain_key;
- read( $fh, $size, $self->{data_size});
- $size = unpack($self->{data_pack}, $size);
- if ($size) { read( $fh, $plain_key, $size); }
-
- return $plain_key;
- }
+ # These shenanigans are to allow a 256 within a C
+ $self->{max_buckets} += 1;
+ $self->{data_sector_size} += 1;
+
+ my $bl = $self->txn_bitfield_len;
+
+ my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
+ unless ( $size == $header_var ) {
+ $self->storage->close;
+ DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
}
- $obj->{return_next} = 1;
- } # tag is a bucket list
+ $self->set_trans_loc( $header_fixed + scalar(@values) );
+ $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
- return;
+ return length($buffer) + length($buffer2);
+ }
}
-sub get_next_key {
- ##
- # Locate next key, given digested previous one
- ##
+=head2 _load_sector( $offset )
+
+This will instantiate and return the sector object that represents the data found
+at $offset.
+
+=cut
+
+sub _load_sector {
my $self = shift;
- my ($obj) = @_;
+ my ($offset) = @_;
+
+ # Add a catch for offset of 0 or 1
+ return if !$offset || $offset <= 1;
- $obj->{prev_md5} = $_[1] ? $_[1] : undef;
- $obj->{return_next} = 0;
+ my $type = $self->storage->read_at( $offset, 1 );
+ return if $type eq chr(0);
- ##
- # If the previous key was not specifed, start at the top and
- # return the first one found.
- ##
- if (!$obj->{prev_md5}) {
- $obj->{prev_md5} = chr(0) x $self->{hash_size};
- $obj->{return_next} = 1;
+ if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
+ return DBM::Deep::Engine::Sector::Reference->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
}
+ # XXX Don't we need key_md5 here?
+ elsif ( $type eq $self->SIG_BLIST ) {
+ return DBM::Deep::Engine::Sector::BucketList->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
+ }
+ elsif ( $type eq $self->SIG_INDEX ) {
+ return DBM::Deep::Engine::Sector::Index->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
+ }
+ elsif ( $type eq $self->SIG_NULL ) {
+ return DBM::Deep::Engine::Sector::Null->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
+ }
+ elsif ( $type eq $self->SIG_DATA ) {
+ return DBM::Deep::Engine::Sector::Scalar->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
+ }
+ # This was deleted from under us, so just return and let the caller figure it out.
+ elsif ( $type eq $self->SIG_FREE ) {
+ return;
+ }
+
+ DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
+}
+
+=head2 _apply_digest( @stuff )
+
+This will apply the digest method (defaults to Digest::MD5::md5) to the arguments
+passed in and return the result.
+
+=cut
- return $self->traverse_index( $obj, $obj->_base_offset, 0 );
+sub _apply_digest {
+ my $self = shift;
+ return $self->{digest}->(@_);
}
-# Utilities
+=head2 _add_free_blist_sector( $offset, $size )
+
+=head2 _add_free_data_sector( $offset, $size )
+
+=head2 _add_free_index_sector( $offset, $size )
+
+These methods are all wrappers around _add_free_sector(), providing the proper
+chain offset ($multiple) for the sector type.
+
+=cut
+
+sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
+sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
+sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
+
+=head2 _add_free_sector( $multiple, $offset, $size )
+
+_add_free_sector() takes the offset into the chains location, the offset of the
+sector, and the size of that sector. It will mark the sector as a free sector
+and put it into the list of sectors that are free of this type for use later.
+
+This returns nothing.
+
+B<NOTE>: $size is unused?
-sub _get_key_subloc {
+=cut
+
+sub _add_free_sector {
my $self = shift;
- my ($keys, $idx) = @_;
-
- my ($key, $subloc) = unpack(
- "a$self->{hash_size} $self->{long_pack}",
- substr(
- $keys,
- ($idx * $self->{bucket_size}),
- $self->{bucket_size},
- ),
+ my ($multiple, $offset, $size) = @_;
+
+ my $chains_offset = $multiple * $self->byte_size;
+
+ my $storage = $self->storage;
+
+ # Increment staleness.
+ # XXX Can this increment+modulo be done by "&= 0x1" ?
+ my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
+ $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
+ $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
+
+ my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
+
+ $storage->print_at( $self->chains_loc + $chains_offset,
+ pack( $StP{$self->byte_size}, $offset ),
);
- return ($key, $subloc);
+ # Record the old head in the new sector after the signature and staleness counter
+ $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
}
-sub _find_in_buckets {
+=head2 _request_blist_sector( $size )
+
+=head2 _request_data_sector( $size )
+
+=head2 _request_index_sector( $size )
+
+These methods are all wrappers around _request_sector(), providing the proper
+chain offset ($multiple) for the sector type.
+
+=cut
+
+sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
+sub _request_data_sector { shift->_request_sector( 1, @_ ) }
+sub _request_index_sector { shift->_request_sector( 2, @_ ) }
+
+=head2 _request_sector( $multiple, $size )
+
+This takes the offset into the chains location and the size of that sector.
+
+This returns the object with the sector. If there is an available free sector of
+that type, then it will be reused. If there isn't one, then a new one will be
+allocated.
+
+=cut
+
+sub _request_sector {
my $self = shift;
- my ($tag, $md5) = @_;
+ my ($multiple, $size) = @_;
- BUCKET:
- for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
- my ($key, $subloc) = $self->_get_key_subloc( $tag->{content}, $i );
+ my $chains_offset = $multiple * $self->byte_size;
- return ($subloc, $i * $self->{bucket_size}) unless $subloc;
+ my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
+ my $loc = unpack( $StP{$self->byte_size}, $old_head );
- next BUCKET if $key ne $md5;
+ # We don't have any free sectors of the right size, so allocate a new one.
+ unless ( $loc ) {
+ my $offset = $self->storage->request_space( $size );
- return ($subloc, $i * $self->{bucket_size});
+ # Zero out the new sector. This also guarantees correct increases
+ # in the filesize.
+ $self->storage->print_at( $offset, chr(0) x $size );
+
+ return $offset;
}
- return;
+ # Read the new head after the signature and the staleness counter
+ my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
+ $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
+ $self->storage->print_at(
+ $loc + SIG_SIZE + $STALE_SIZE,
+ pack( $StP{$self->byte_size}, 0 ),
+ );
+
+ return $loc;
+}
+
+=head2 flush()
+
+This takes no arguments. It will do everything necessary to flush all things to
+disk. This is usually called during unlock() and setup_fh().
+
+This returns nothing.
+
+=cut
+
+sub flush {
+ my $self = shift;
+
+ # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
+ # -RobK, 2008-06-26
+ $self->storage->flush;
+}
+
+=head2 ACCESSORS
+
+The following are readonly attributes.
+
+=over 4
+
+=item * storage
+
+=item * byte_size
+
+=item * hash_size
+
+=item * hash_chars
+
+=item * num_txns
+
+=item * max_buckets
+
+=item * blank_md5
+
+=item * data_sector_size
+
+=item * txn_bitfield_len
+
+=back
+
+=cut
+
+sub storage { $_[0]{storage} }
+sub byte_size { $_[0]{byte_size} }
+sub hash_size { $_[0]{hash_size} }
+sub hash_chars { $_[0]{hash_chars} }
+sub num_txns { $_[0]{num_txns} }
+sub max_buckets { $_[0]{max_buckets} }
+sub blank_md5 { chr(0) x $_[0]->hash_size }
+sub data_sector_size { $_[0]{data_sector_size} }
+
+# This is a calculated value
+sub txn_bitfield_len {
+ my $self = shift;
+ unless ( exists $self->{txn_bitfield_len} ) {
+ my $temp = ($self->num_txns) / 8;
+ if ( $temp > int( $temp ) ) {
+ $temp = int( $temp ) + 1;
+ }
+ $self->{txn_bitfield_len} = $temp;
+ }
+ return $self->{txn_bitfield_len};
+}
+
+=pod
+
+The following are read/write attributes.
+
+=over 4
+
+=item * trans_id / set_trans_id( $new_id )
+
+=item * trans_loc / set_trans_loc( $new_loc )
+
+=item * chains_loc / set_chains_loc( $new_loc )
+
+=back
+
+=cut
+
+sub trans_id { $_[0]{trans_id} }
+sub set_trans_id { $_[0]{trans_id} = $_[1] }
+
+sub trans_loc { $_[0]{trans_loc} }
+sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
+
+sub chains_loc { $_[0]{chains_loc} }
+sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
+
+sub cache { $_[0]{cache} ||= {} }
+sub clear_cache { %{$_[0]->cache} = () }
+
+=head2 _dump_file()
+
+This method takes no arguments. It's used to print out a textual representation of the DBM::Deep
+DB file. It assumes the file is not-corrupted.
+
+=cut
+
+sub _dump_file {
+ my $self = shift;
+
+ # Read the header
+ my $spot = $self->_read_file_header();
+
+ my %types = (
+ 0 => 'B',
+ 1 => 'D',
+ 2 => 'I',
+ );
+
+ my %sizes = (
+ 'D' => $self->data_sector_size,
+ 'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
+ 'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
+ );
+
+ my $return = "";
+
+ # Header values
+ $return .= "NumTxns: " . $self->num_txns . $/;
+
+ # Read the free sector chains
+ my %sectors;
+ foreach my $multiple ( 0 .. 2 ) {
+ $return .= "Chains($types{$multiple}):";
+ my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
+ while ( 1 ) {
+ my $loc = unpack(
+ $StP{$self->byte_size},
+ $self->storage->read_at( $old_loc, $self->byte_size ),
+ );
+
+ # We're now out of free sectors of this kind.
+ unless ( $loc ) {
+ last;
+ }
+
+ $sectors{ $types{$multiple} }{ $loc } = undef;
+ $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
+ $return .= " $loc";
+ }
+ $return .= $/;
+ }
+
+ SECTOR:
+ while ( $spot < $self->storage->{end} ) {
+ # Read each sector in order.
+ my $sector = $self->_load_sector( $spot );
+ if ( !$sector ) {
+ # Find it in the free-sectors that were found already
+ foreach my $type ( keys %sectors ) {
+ if ( exists $sectors{$type}{$spot} ) {
+ my $size = $sizes{$type};
+ $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
+ $spot += $size;
+ next SECTOR;
+ }
+ }
+
+ die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
+ }
+ else {
+ $return .= sprintf "%08d: %s %04d", $spot, $sector->type, $sector->size;
+ if ( $sector->type eq 'D' ) {
+ $return .= ' ' . $sector->data;
+ }
+ elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
+ $return .= ' REF: ' . $sector->get_refcount;
+ }
+ elsif ( $sector->type eq 'B' ) {
+ foreach my $bucket ( $sector->chopped_up ) {
+ $return .= "\n ";
+ $return .= sprintf "%08d", unpack($StP{$self->byte_size},
+ substr( $bucket->[-1], $self->hash_size, $self->byte_size),
+ );
+ my $l = unpack( $StP{$self->byte_size},
+ substr( $bucket->[-1],
+ $self->hash_size + $self->byte_size,
+ $self->byte_size,
+ ),
+ );
+ $return .= sprintf " %08d", $l;
+ foreach my $txn ( 0 .. $self->num_txns - 2 ) {
+ my $l = unpack( $StP{$self->byte_size},
+ substr( $bucket->[-1],
+ $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
+ $self->byte_size,
+ ),
+ );
+ $return .= sprintf " %08d", $l;
+ }
+ }
+ }
+ $return .= $/;
+
+ $spot += $sector->size;
+ }
+ }
+
+ return $return;
}
1;