use strict;
use warnings FATAL => 'all';
-use DBM::Deep::Engine::Sector::BucketList;
-use DBM::Deep::Engine::Sector::Index;
-use DBM::Deep::Engine::Sector::Null;
-use DBM::Deep::Engine::Sector::Reference;
-use DBM::Deep::Engine::Sector::Scalar;
-use DBM::Deep::Iterator;
-
# Never import symbols into our namespace. We are a class, not a library.
# -RobK, 2008-05-27
use Scalar::Util ();
);
sub StP { $StP{$_[1]} }
+# Import these after the SIG_* definitions because those definitions are used
+# in the headers of these classes. -RobK, 2008-06-20
+use DBM::Deep::Engine::Sector::BucketList;
+use DBM::Deep::Engine::Sector::FileHeader;
+use DBM::Deep::Engine::Sector::Index;
+use DBM::Deep::Engine::Sector::Null;
+use DBM::Deep::Engine::Sector::Reference;
+use DBM::Deep::Engine::Sector::Scalar;
+use DBM::Deep::Iterator;
+
################################################################################
sub new {
my $class = shift;
my ($args) = @_;
+ $args->{storage} = DBM::Deep::File->new( $args )
+ unless exists $args->{storage};
+
my $self = bless {
byte_size => 4,
# This will be a Reference sector
my $sector = $self->_load_sector( $obj->_base_offset )
- or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
+ or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
if ( $sector->staleness != $obj->_staleness ) {
return;
# This will be a Reference sector
my $sector = $self->_load_sector( $obj->_base_offset )
- or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
+ or DBM::Deep->_throw_error( "1: Cannot write to a deleted spot in DBM::Deep." );
if ( $sector->staleness != $obj->_staleness ) {
- DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
+ DBM::Deep->_throw_error( "2: Cannot write to a deleted spot in DBM::Deep." );
}
my ($class, $type);
my $self = shift;
my ($obj) = @_;
- # We're opening the file.
- unless ( $obj->_base_offset ) {
- my $bytes_read = $self->_read_file_header;
+ return 1 if $obj->_base_offset;
- # Creating a new file
- unless ( $bytes_read ) {
- $self->_write_file_header;
+ my $header = $self->_load_header;
- # 1) Create Array/Hash entry
- my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
- engine => $self,
- type => $obj->_type,
- });
- $obj->{base_offset} = $initial_reference->offset;
- $obj->{staleness} = $initial_reference->staleness;
+ # Creating a new file
+ if ( $header->is_new ) {
+ # 1) Create Array/Hash entry
+ my $sector = DBM::Deep::Engine::Sector::Reference->new({
+ engine => $self,
+ type => $obj->_type,
+ });
+ $obj->{base_offset} = $sector->offset;
+ $obj->{staleness} = $sector->staleness;
- $self->storage->flush;
+ $self->flush;
+ }
+ # Reading from an existing file
+ else {
+ $obj->{base_offset} = $header->size;
+ my $sector = DBM::Deep::Engine::Sector::Reference->new({
+ engine => $self,
+ offset => $obj->_base_offset,
+ });
+ unless ( $sector ) {
+ DBM::Deep->_throw_error("Corrupted file, no master index record");
}
- # Reading from an existing file
- else {
- $obj->{base_offset} = $bytes_read;
- my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
- engine => $self,
- offset => $obj->_base_offset,
- });
- unless ( $initial_reference ) {
- DBM::Deep->_throw_error("Corrupted file, no master index record");
- }
- unless ($obj->_type eq $initial_reference->type) {
- DBM::Deep->_throw_error("File type mismatch");
- }
-
- $obj->{staleness} = $initial_reference->staleness;
+ unless ($obj->_type eq $sector->type) {
+ DBM::Deep->_throw_error("File type mismatch");
}
+
+ $obj->{staleness} = $sector->staleness;
}
+ $self->storage->set_inode;
+
return 1;
}
DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
}
- # Each entry is the file location for a bucket that has a modification for
- # this transaction. The entries need to be expunged.
- foreach my $entry (@{ $self->get_entries } ) {
- # Remove the entry here
- my $read_loc = $entry
- + $self->hash_size
- + $self->byte_size
- + $self->byte_size
- + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
-
- my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
- $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
- $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
-
- if ( $data_loc > 1 ) {
- $self->_load_sector( $data_loc )->free;
- }
+ foreach my $entry ( @{ $self->get_entries } ) {
+ my ($sector, $idx) = split ':', $entry;
+ $self->_load_sector( $sector )->rollback( $idx );
}
$self->clear_entries;
DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
}
- foreach my $entry (@{ $self->get_entries } ) {
- # Overwrite the entry in head with the entry in trans_id
- my $base = $entry
- + $self->hash_size
- + $self->byte_size;
-
- my $head_loc = $self->storage->read_at( $base, $self->byte_size );
- $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
-
- my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
- my $trans_loc = $self->storage->read_at(
- $spot, $self->byte_size,
- );
-
- $self->storage->print_at( $base, $trans_loc );
- $self->storage->print_at(
- $spot,
- pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
- );
-
- if ( $head_loc > 1 ) {
- $self->_load_sector( $head_loc )->free;
- }
+ foreach my $entry ( @{ $self->get_entries } ) {
+ my ($sector, $idx) = split ':', $entry;
+ $self->_load_sector( $sector )->commit( $idx );
}
$self->clear_entries;
sub read_txn_slots {
my $self = shift;
- my $bl = $self->txn_bitfield_len;
- my $num_bits = $bl * 8;
- return split '', unpack( 'b'.$num_bits,
- $self->storage->read_at(
- $self->trans_loc, $bl,
- )
- );
+ return $self->_load_header->read_txn_slots(@_);
}
sub write_txn_slots {
my $self = shift;
- my $num_bits = $self->txn_bitfield_len * 8;
- $self->storage->print_at( $self->trans_loc,
- pack( 'b'.$num_bits, join('', @_) ),
- );
+ return $self->_load_header->write_txn_slots(@_);
}
sub get_running_txn_ids {
sub get_txn_staleness_counter {
my $self = shift;
- my ($trans_id) = @_;
-
- # Hardcode staleness of 0 for the HEAD
- return 0 unless $trans_id;
-
- return unpack( $StP{$STALE_SIZE},
- $self->storage->read_at(
- $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
- $STALE_SIZE,
- )
- );
+ return $self->_load_header->get_txn_staleness_counter(@_);
}
sub inc_txn_staleness_counter {
my $self = shift;
- my ($trans_id) = @_;
-
- # Hardcode staleness of 0 for the HEAD
- return 0 unless $trans_id;
-
- $self->storage->print_at(
- $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
- pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
- );
+ return $self->_load_header->inc_txn_staleness_counter(@_);
}
sub get_entries {
sub add_entry {
my $self = shift;
- my ($trans_id, $loc) = @_;
+ my ($trans_id, $loc, $idx) = @_;
+
+ return unless $trans_id;
$self->{entries}{$trans_id} ||= {};
- $self->{entries}{$trans_id}{$loc} = undef;
+ $self->{entries}{$trans_id}{"$loc:$idx"} = undef;
}
# If the buckets are being relocated because of a reindexing, the entries
# mechanism needs to be made aware of it.
sub reindex_entry {
my $self = shift;
- my ($old_loc, $new_loc) = @_;
+ my ($old_loc, $old_idx, $new_loc, $new_idx) = @_;
TRANS:
while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
- if ( exists $locs->{$old_loc} ) {
- delete $locs->{$old_loc};
- $locs->{$new_loc} = undef;
+ if ( exists $locs->{"$old_loc:$old_idx"} ) {
+ delete $locs->{"$old_loc:$old_idx"};
+ $locs->{"$new_loc:$new_idx"} = undef;
next TRANS;
}
}
################################################################################
-{
- my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
- my $this_file_version = 3;
-
- sub _write_file_header {
- my $self = shift;
-
- my $nt = $self->num_txns;
- my $bl = $self->txn_bitfield_len;
-
- my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
-
- my $loc = $self->storage->request_space( $header_fixed + $header_var );
+sub _apply_digest {
+ my $self = shift;
+ return $self->{digest}->(@_);
+}
- $self->storage->print_at( $loc,
- SIG_FILE,
- SIG_HEADER,
- pack('N', $this_file_version), # At this point, we're at 9 bytes
- pack('N', $header_var), # header size
- # --- Above is $header_fixed. Below is $header_var
- pack('C', $self->byte_size),
+sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
+sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
+sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
+sub _add_free_sector { shift->_load_header->add_free_sector( @_ ) }
- # These shenanigans are to allow a 256 within a C
- pack('C', $self->max_buckets - 1),
- pack('C', $self->data_sector_size - 1),
+sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
+sub _request_data_sector { shift->_request_sector( 1, @_ ) }
+sub _request_index_sector { shift->_request_sector( 2, @_ ) }
+sub _request_sector { shift->_load_header->request_sector( @_ ) }
- pack('C', $nt),
- pack('C' . $bl, 0 ), # Transaction activeness bitfield
- pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
- pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
- pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
- pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
- );
+################################################################################
- #XXX Set these less fragilely
- $self->set_trans_loc( $header_fixed + 4 );
- $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
+{
+ my %t = (
+ SIG_ARRAY => 'Reference',
+ SIG_HASH => 'Reference',
+ SIG_BLIST => 'BucketList',
+ SIG_INDEX => 'Index',
+ SIG_NULL => 'Null',
+ SIG_DATA => 'Scalar',
+ );
- return;
+ my %class_for;
+ while ( my ($k,$v) = each %t ) {
+ $class_for{ DBM::Deep::Engine->$k } = "DBM::Deep::Engine::Sector::$v";
}
- sub _read_file_header {
+ sub load_sector {
my $self = shift;
+ my ($offset) = @_;
- my $buffer = $self->storage->read_at( 0, $header_fixed );
- return unless length($buffer);
-
- my ($file_signature, $sig_header, $file_version, $size) = unpack(
- 'A4 A N N', $buffer
- );
-
- unless ( $file_signature eq SIG_FILE ) {
- $self->storage->close;
- DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
- }
-
- unless ( $sig_header eq SIG_HEADER ) {
- $self->storage->close;
- DBM::Deep->_throw_error( "Pre-1.00 file version found" );
- }
+ my $data = $self->get_data( $offset )
+ or return;#die "Cannot read from '$offset'\n";
+ my $type = substr( $$data, 0, 1 );
+ my $class = $class_for{ $type };
+ return $class->new({
+ engine => $self,
+ type => $type,
+ offset => $offset,
+ });
+ }
+ *_load_sector = \&load_sector;
- unless ( $file_version == $this_file_version ) {
- $self->storage->close;
- DBM::Deep->_throw_error(
- "Wrong file version found - " . $file_version .
- " - expected " . $this_file_version
- );
- }
+ sub load_header {
+ my $self = shift;
- my $buffer2 = $self->storage->read_at( undef, $size );
- my @values = unpack( 'C C C C', $buffer2 );
+ #XXX Does this mean we make too many objects? -RobK, 2008-06-23
+ return DBM::Deep::Engine::Sector::FileHeader->new({
+ engine => $self,
+ offset => 0,
+ });
+ }
+ *_load_header = \&load_header;
- if ( @values != 4 || grep { !defined } @values ) {
- $self->storage->close;
- DBM::Deep->_throw_error("Corrupted file - bad header");
- }
+ sub get_data {
+ my $self = shift;
+ my ($offset, $size) = @_;
+ return unless defined $offset;
- #XXX Add warnings if values weren't set right
- @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
+ unless ( exists $self->sector_cache->{$offset} ) {
+ # Don't worry about the header sector. It will manage itself.
+ return unless $offset;
- # These shenangians are to allow a 256 within a C
- $self->{max_buckets} += 1;
- $self->{data_sector_size} += 1;
+ if ( !defined $size ) {
+ my $type = $self->storage->read_at( $offset, 1 )
+ or die "($offset): Cannot read from '$offset' to find the type\n";
- my $bl = $self->txn_bitfield_len;
+ if ( $type eq $self->SIG_FREE ) {
+ return;
+ }
- my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
- unless ( $size == $header_var ) {
- $self->storage->close;
- DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
+ my $class = $class_for{$type}
+ or die "($offset): Cannot find class for '$type'\n";
+ $size = $class->size( $self )
+ or die "($offset): '$class' doesn't return a size\n";
+ $self->sector_cache->{$offset} = $type . $self->storage->read_at( undef, $size - 1 );
+ }
+ else {
+ $self->sector_cache->{$offset} = $self->storage->read_at( $offset, $size )
+ or return;
+ }
}
- $self->set_trans_loc( $header_fixed + scalar(@values) );
- $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
-
- return length($buffer) + length($buffer2);
+ return \$self->sector_cache->{$offset};
}
}
-sub _load_sector {
+sub sector_cache {
my $self = shift;
- my ($offset) = @_;
-
- # Add a catch for offset of 0 or 1
- return if !$offset || $offset <= 1;
-
- my $type = $self->storage->read_at( $offset, 1 );
- return if $type eq chr(0);
-
- if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
- return DBM::Deep::Engine::Sector::Reference->new({
- engine => $self,
- type => $type,
- offset => $offset,
- });
- }
- # XXX Don't we need key_md5 here?
- elsif ( $type eq $self->SIG_BLIST ) {
- return DBM::Deep::Engine::Sector::BucketList->new({
- engine => $self,
- type => $type,
- offset => $offset,
- });
- }
- elsif ( $type eq $self->SIG_INDEX ) {
- return DBM::Deep::Engine::Sector::Index->new({
- engine => $self,
- type => $type,
- offset => $offset,
- });
- }
- elsif ( $type eq $self->SIG_NULL ) {
- return DBM::Deep::Engine::Sector::Null->new({
- engine => $self,
- type => $type,
- offset => $offset,
- });
- }
- elsif ( $type eq $self->SIG_DATA ) {
- return DBM::Deep::Engine::Sector::Scalar->new({
- engine => $self,
- type => $type,
- offset => $offset,
- });
- }
- # This was deleted from under us, so just return and let the caller figure it out.
- elsif ( $type eq $self->SIG_FREE ) {
- return;
- }
+ return $self->{sector_cache} ||= {};
+}
- DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
+sub clear_sector_cache {
+ my $self = shift;
+ $self->{sector_cache} = {};
}
-sub _apply_digest {
+sub dirty_sectors {
my $self = shift;
- return $self->{digest}->(@_);
+ return $self->{dirty_sectors} ||= {};
}
-sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
-sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
-sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
+sub clear_dirty_sectors {
+ my $self = shift;
+ $self->{dirty_sectors} = {};
+}
-sub _add_free_sector {
+sub add_dirty_sector {
my $self = shift;
- my ($multiple, $offset, $size) = @_;
+ my ($offset) = @_;
- my $chains_offset = $multiple * $self->byte_size;
+ $self->dirty_sectors->{ $offset } = undef;
+}
- my $storage = $self->storage;
+sub flush {
+ my $self = shift;
- # Increment staleness.
- # XXX Can this increment+modulo be done by "&= 0x1" ?
- my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
- $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
- $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
+ my $sectors = $self->dirty_sectors;
+ for my $offset (sort { $a <=> $b } keys %{ $sectors }) {
+ $self->storage->print_at( $offset, $self->sector_cache->{$offset} );
+ }
- my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
+ # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
+ # -RobK, 2008-06-26
+ $self->storage->flush;
- $storage->print_at( $self->chains_loc + $chains_offset,
- pack( $StP{$self->byte_size}, $offset ),
- );
+ $self->clear_dirty_sectors;
- # Record the old head in the new sector after the signature and staleness counter
- $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
+ $self->clear_sector_cache;
}
-sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
-sub _request_data_sector { shift->_request_sector( 1, @_ ) }
-sub _request_index_sector { shift->_request_sector( 2, @_ ) }
+################################################################################
-sub _request_sector {
+sub lock_exclusive {
my $self = shift;
- my ($multiple, $size) = @_;
-
- my $chains_offset = $multiple * $self->byte_size;
-
- my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
- my $loc = unpack( $StP{$self->byte_size}, $old_head );
+ my ($obj) = @_;
+ return $self->storage->lock_exclusive( $obj );
+}
- # We don't have any free sectors of the right size, so allocate a new one.
- unless ( $loc ) {
- my $offset = $self->storage->request_space( $size );
+sub lock_shared {
+ my $self = shift;
+ my ($obj) = @_;
+ return $self->storage->lock_shared( $obj );
+}
- # Zero out the new sector. This also guarantees correct increases
- # in the filesize.
- $self->storage->print_at( $offset, chr(0) x $size );
+sub unlock {
+ my $self = shift;
+ my ($obj) = @_;
- return $offset;
- }
+ my $rv = $self->storage->unlock( $obj );
- # Read the new head after the signature and the staleness counter
- my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
- $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
- $self->storage->print_at(
- $loc + SIG_SIZE + $STALE_SIZE,
- pack( $StP{$self->byte_size}, 0 ),
- );
+ $self->flush if $rv;
- return $loc;
+ return $rv;
}
################################################################################
sub _dump_file {
my $self = shift;
+ $self->flush;
# Read the header
- my $spot = $self->_read_file_header();
+ my $header_sector = DBM::Deep::Engine::Sector::FileHeader->new({
+ engine => $self,
+ });
my %types = (
0 => 'B',
my $return = "";
+ # Filesize
+ $return .= "Size: " . (-s $self->storage->{fh}) . $/;
+
# Header values
$return .= "NumTxns: " . $self->num_txns . $/;
$return .= $/;
}
+ my $spot = $header_sector->size;
SECTOR:
while ( $spot < $self->storage->{end} ) {
# Read each sector in order.