X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBM%2FDeep%2FEngine.pm;h=0faa0d3d1c162fe4dc80c5067048d6e9456f497d;hb=97d40a0ac43400f8416c3e930c5626c5cb472a55;hp=fd7e78eaf6b12aedc728a8cb6d880c8b0cd97374;hpb=065b45be4e413444714f0b35aa9653e10753118b;p=dbsrgits%2FDBM-Deep.git diff --git a/lib/DBM/Deep/Engine.pm b/lib/DBM/Deep/Engine.pm index fd7e78e..0faa0d3 100644 --- a/lib/DBM/Deep/Engine.pm +++ b/lib/DBM/Deep/Engine.pm @@ -5,13 +5,6 @@ use 5.006_000; use strict; use warnings FATAL => 'all'; -use DBM::Deep::Engine::Sector::BucketList; -use DBM::Deep::Engine::Sector::Index; -use DBM::Deep::Engine::Sector::Null; -use DBM::Deep::Engine::Sector::Reference; -use DBM::Deep::Engine::Sector::Scalar; -use DBM::Deep::Iterator; - # Never import symbols into our namespace. We are a class, not a library. # -RobK, 2008-05-27 use Scalar::Util (); @@ -47,12 +40,25 @@ my %StP = ( ); sub StP { $StP{$_[1]} } +# Import these after the SIG_* definitions because those definitions are used +# in the headers of these classes. -RobK, 2008-06-20 +use DBM::Deep::Engine::Sector::BucketList; +use DBM::Deep::Engine::Sector::FileHeader; +use DBM::Deep::Engine::Sector::Index; +use DBM::Deep::Engine::Sector::Null; +use DBM::Deep::Engine::Sector::Reference; +use DBM::Deep::Engine::Sector::Scalar; +use DBM::Deep::Iterator; + ################################################################################ sub new { my $class = shift; my ($args) = @_; + $args->{storage} = DBM::Deep::File->new( $args ) + unless exists $args->{storage}; + my $self = bless { byte_size => 4, @@ -180,7 +186,7 @@ sub make_reference { # This will be a Reference sector my $sector = $self->_load_sector( $obj->_base_offset ) - or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" ); + or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" ); if ( $sector->staleness != $obj->_staleness ) { return; @@ -278,10 +284,10 @@ sub write_value { # This will be a Reference sector my $sector = $self->_load_sector( $obj->_base_offset ) - or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); + or DBM::Deep->_throw_error( "1: Cannot write to a deleted spot in DBM::Deep." ); if ( $sector->staleness != $obj->_staleness ) { - DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." ); + DBM::Deep->_throw_error( "2: Cannot write to a deleted spot in DBM::Deep." ); } my ($class, $type); @@ -408,43 +414,42 @@ sub setup_fh { my $self = shift; my ($obj) = @_; - # We're opening the file. 
- unless ( $obj->_base_offset ) { - my $bytes_read = $self->_read_file_header; + return 1 if $obj->_base_offset; - # Creating a new file - unless ( $bytes_read ) { - $self->_write_file_header; + my $header = $self->_load_header; - # 1) Create Array/Hash entry - my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({ - engine => $self, - type => $obj->_type, - }); - $obj->{base_offset} = $initial_reference->offset; - $obj->{staleness} = $initial_reference->staleness; + # Creating a new file + if ( $header->is_new ) { + # 1) Create Array/Hash entry + my $sector = DBM::Deep::Engine::Sector::Reference->new({ + engine => $self, + type => $obj->_type, + }); + $obj->{base_offset} = $sector->offset; + $obj->{staleness} = $sector->staleness; - $self->storage->flush; + $self->flush; + } + # Reading from an existing file + else { + $obj->{base_offset} = $header->size; + my $sector = DBM::Deep::Engine::Sector::Reference->new({ + engine => $self, + offset => $obj->_base_offset, + }); + unless ( $sector ) { + DBM::Deep->_throw_error("Corrupted file, no master index record"); } - # Reading from an existing file - else { - $obj->{base_offset} = $bytes_read; - my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({ - engine => $self, - offset => $obj->_base_offset, - }); - unless ( $initial_reference ) { - DBM::Deep->_throw_error("Corrupted file, no master index record"); - } - unless ($obj->_type eq $initial_reference->type) { - DBM::Deep->_throw_error("File type mismatch"); - } - - $obj->{staleness} = $initial_reference->staleness; + unless ($obj->_type eq $sector->type) { + DBM::Deep->_throw_error("File type mismatch"); } + + $obj->{staleness} = $sector->staleness; } + $self->storage->set_inode; + return 1; } @@ -486,23 +491,9 @@ sub rollback { DBM::Deep->_throw_error( "Cannot rollback without an active transaction" ); } - # Each entry is the file location for a bucket that has a modification for - # this transaction. The entries need to be expunged. - foreach my $entry (@{ $self->get_entries } ) { - # Remove the entry here - my $read_loc = $entry - + $self->hash_size - + $self->byte_size - + $self->byte_size - + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); - - my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size ); - $data_loc = unpack( $StP{$self->byte_size}, $data_loc ); - $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) ); - - if ( $data_loc > 1 ) { - $self->_load_sector( $data_loc )->free; - } + foreach my $entry ( @{ $self->get_entries } ) { + my ($sector, $idx) = split ':', $entry; + $self->_load_sector( $sector )->rollback( $idx ); } $self->clear_entries; @@ -524,29 +515,9 @@ sub commit { DBM::Deep->_throw_error( "Cannot commit without an active transaction" ); } - foreach my $entry (@{ $self->get_entries } ) { - # Overwrite the entry in head with the entry in trans_id - my $base = $entry - + $self->hash_size - + $self->byte_size; - - my $head_loc = $self->storage->read_at( $base, $self->byte_size ); - $head_loc = unpack( $StP{$self->byte_size}, $head_loc ); - - my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE ); - my $trans_loc = $self->storage->read_at( - $spot, $self->byte_size, - ); - - $self->storage->print_at( $base, $trans_loc ); - $self->storage->print_at( - $spot, - pack( $StP{$self->byte_size} . ' ' . 
$StP{$STALE_SIZE}, (0) x 2 ), - ); - - if ( $head_loc > 1 ) { - $self->_load_sector( $head_loc )->free; - } + foreach my $entry ( @{ $self->get_entries } ) { + my ($sector, $idx) = split ':', $entry; + $self->_load_sector( $sector )->commit( $idx ); } $self->clear_entries; @@ -562,21 +533,12 @@ sub commit { sub read_txn_slots { my $self = shift; - my $bl = $self->txn_bitfield_len; - my $num_bits = $bl * 8; - return split '', unpack( 'b'.$num_bits, - $self->storage->read_at( - $self->trans_loc, $bl, - ) - ); + return $self->_load_header->read_txn_slots(@_); } sub write_txn_slots { my $self = shift; - my $num_bits = $self->txn_bitfield_len * 8; - $self->storage->print_at( $self->trans_loc, - pack( 'b'.$num_bits, join('', @_) ), - ); + return $self->_load_header->write_txn_slots(@_); } sub get_running_txn_ids { @@ -587,30 +549,12 @@ sub get_running_txn_ids { sub get_txn_staleness_counter { my $self = shift; - my ($trans_id) = @_; - - # Hardcode staleness of 0 for the HEAD - return 0 unless $trans_id; - - return unpack( $StP{$STALE_SIZE}, - $self->storage->read_at( - $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), - $STALE_SIZE, - ) - ); + return $self->_load_header->get_txn_staleness_counter(@_); } sub inc_txn_staleness_counter { my $self = shift; - my ($trans_id) = @_; - - # Hardcode staleness of 0 for the HEAD - return 0 unless $trans_id; - - $self->storage->print_at( - $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1), - pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ), - ); + return $self->_load_header->inc_txn_staleness_counter(@_); } sub get_entries { @@ -620,23 +564,25 @@ sub get_entries { sub add_entry { my $self = shift; - my ($trans_id, $loc) = @_; + my ($trans_id, $loc, $idx) = @_; + + return unless $trans_id; $self->{entries}{$trans_id} ||= {}; - $self->{entries}{$trans_id}{$loc} = undef; + $self->{entries}{$trans_id}{"$loc:$idx"} = undef; } # If the buckets are being relocated because of a reindexing, the entries # mechanism needs to be made aware of it. sub reindex_entry { my $self = shift; - my ($old_loc, $new_loc) = @_; + my ($old_loc, $old_idx, $new_loc, $new_idx) = @_; TRANS: while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) { - if ( exists $locs->{$old_loc} ) { - delete $locs->{$old_loc}; - $locs->{$new_loc} = undef; + if ( exists $locs->{"$old_loc:$old_idx"} ) { + delete $locs->{"$old_loc:$old_idx"}; + $locs->{"$new_loc:$new_idx"} = undef; next TRANS; } } @@ -649,225 +595,165 @@ sub clear_entries { ################################################################################ -{ - my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4; - my $this_file_version = 3; - - sub _write_file_header { - my $self = shift; - - my $nt = $self->num_txns; - my $bl = $self->txn_bitfield_len; - - my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size; - - my $loc = $self->storage->request_space( $header_fixed + $header_var ); +sub _apply_digest { + my $self = shift; + return $self->{digest}->(@_); +} - $self->storage->print_at( $loc, - SIG_FILE, - SIG_HEADER, - pack('N', $this_file_version), # At this point, we're at 9 bytes - pack('N', $header_var), # header size - # --- Above is $header_fixed. 
Below is $header_var - pack('C', $self->byte_size), +sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) } +sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) } +sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) } +sub _add_free_sector { shift->_load_header->add_free_sector( @_ ) } - # These shenanigans are to allow a 256 within a C - pack('C', $self->max_buckets - 1), - pack('C', $self->data_sector_size - 1), +sub _request_blist_sector { shift->_request_sector( 0, @_ ) } +sub _request_data_sector { shift->_request_sector( 1, @_ ) } +sub _request_index_sector { shift->_request_sector( 2, @_ ) } +sub _request_sector { shift->_load_header->request_sector( @_ ) } - pack('C', $nt), - pack('C' . $bl, 0 ), # Transaction activeness bitfield - pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters - pack($StP{$self->byte_size}, 0), # Start of free chain (blist size) - pack($StP{$self->byte_size}, 0), # Start of free chain (data size) - pack($StP{$self->byte_size}, 0), # Start of free chain (index size) - ); +################################################################################ - #XXX Set these less fragilely - $self->set_trans_loc( $header_fixed + 4 ); - $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) ); +{ + my %t = ( + SIG_ARRAY => 'Reference', + SIG_HASH => 'Reference', + SIG_BLIST => 'BucketList', + SIG_INDEX => 'Index', + SIG_NULL => 'Null', + SIG_DATA => 'Scalar', + ); - return; + my %class_for; + while ( my ($k,$v) = each %t ) { + $class_for{ DBM::Deep::Engine->$k } = "DBM::Deep::Engine::Sector::$v"; } - sub _read_file_header { + sub load_sector { my $self = shift; + my ($offset) = @_; - my $buffer = $self->storage->read_at( 0, $header_fixed ); - return unless length($buffer); - - my ($file_signature, $sig_header, $file_version, $size) = unpack( - 'A4 A N N', $buffer - ); - - unless ( $file_signature eq SIG_FILE ) { - $self->storage->close; - DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" ); - } - - unless ( $sig_header eq SIG_HEADER ) { - $self->storage->close; - DBM::Deep->_throw_error( "Pre-1.00 file version found" ); - } + my $data = $self->get_data( $offset ) + or return;#die "Cannot read from '$offset'\n"; + my $type = substr( $$data, 0, 1 ); + my $class = $class_for{ $type }; + return $class->new({ + engine => $self, + type => $type, + offset => $offset, + }); + } + *_load_sector = \&load_sector; - unless ( $file_version == $this_file_version ) { - $self->storage->close; - DBM::Deep->_throw_error( - "Wrong file version found - " . $file_version . - " - expected " . $this_file_version - ); - } + sub load_header { + my $self = shift; - my $buffer2 = $self->storage->read_at( undef, $size ); - my @values = unpack( 'C C C C', $buffer2 ); + #XXX Does this mean we make too many objects? -RobK, 2008-06-23 + return DBM::Deep::Engine::Sector::FileHeader->new({ + engine => $self, + offset => 0, + }); + } + *_load_header = \&load_header; - if ( @values != 4 || grep { !defined } @values ) { - $self->storage->close; - DBM::Deep->_throw_error("Corrupted file - bad header"); - } + sub get_data { + my $self = shift; + my ($offset, $size) = @_; + return unless defined $offset; - #XXX Add warnings if values weren't set right - @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values; + unless ( exists $self->sector_cache->{$offset} ) { + # Don't worry about the header sector. It will manage itself. 
+ return unless $offset; - # These shenangians are to allow a 256 within a C - $self->{max_buckets} += 1; - $self->{data_sector_size} += 1; + if ( !defined $size ) { + my $type = $self->storage->read_at( $offset, 1 ) + or die "($offset): Cannot read from '$offset' to find the type\n"; - my $bl = $self->txn_bitfield_len; + if ( $type eq $self->SIG_FREE ) { + return; + } - my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size; - unless ( $size == $header_var ) { - $self->storage->close; - DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." ); + my $class = $class_for{$type} + or die "($offset): Cannot find class for '$type'\n"; + $size = $class->size( $self ) + or die "($offset): '$class' doesn't return a size\n"; + $self->sector_cache->{$offset} = $type . $self->storage->read_at( undef, $size - 1 ); + } + else { + $self->sector_cache->{$offset} = $self->storage->read_at( $offset, $size ) + or return; + } } - $self->set_trans_loc( $header_fixed + scalar(@values) ); - $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) ); - - return length($buffer) + length($buffer2); + return \$self->sector_cache->{$offset}; } } -sub _load_sector { +sub sector_cache { my $self = shift; - my ($offset) = @_; - - # Add a catch for offset of 0 or 1 - return if !$offset || $offset <= 1; - - my $type = $self->storage->read_at( $offset, 1 ); - return if $type eq chr(0); - - if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) { - return DBM::Deep::Engine::Sector::Reference->new({ - engine => $self, - type => $type, - offset => $offset, - }); - } - # XXX Don't we need key_md5 here? - elsif ( $type eq $self->SIG_BLIST ) { - return DBM::Deep::Engine::Sector::BucketList->new({ - engine => $self, - type => $type, - offset => $offset, - }); - } - elsif ( $type eq $self->SIG_INDEX ) { - return DBM::Deep::Engine::Sector::Index->new({ - engine => $self, - type => $type, - offset => $offset, - }); - } - elsif ( $type eq $self->SIG_NULL ) { - return DBM::Deep::Engine::Sector::Null->new({ - engine => $self, - type => $type, - offset => $offset, - }); - } - elsif ( $type eq $self->SIG_DATA ) { - return DBM::Deep::Engine::Sector::Scalar->new({ - engine => $self, - type => $type, - offset => $offset, - }); - } - # This was deleted from under us, so just return and let the caller figure it out. - elsif ( $type eq $self->SIG_FREE ) { - return; - } + return $self->{sector_cache} ||= {}; +} - DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" ); +sub clear_sector_cache { + my $self = shift; + $self->{sector_cache} = {}; } -sub _apply_digest { +sub dirty_sectors { my $self = shift; - return $self->{digest}->(@_); + return $self->{dirty_sectors} ||= {}; } -sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) } -sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) } -sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) } +sub clear_dirty_sectors { + my $self = shift; + $self->{dirty_sectors} = {}; +} -sub _add_free_sector { +sub add_dirty_sector { my $self = shift; - my ($multiple, $offset, $size) = @_; + my ($offset) = @_; - my $chains_offset = $multiple * $self->byte_size; + $self->dirty_sectors->{ $offset } = undef; +} - my $storage = $self->storage; +sub flush { + my $self = shift; - # Increment staleness. - # XXX Can this increment+modulo be done by "&= 0x1" ? 
- my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) ); - $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) ); - $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) ); + my $sectors = $self->dirty_sectors; + for my $offset (sort { $a <=> $b } keys %{ $sectors }) { + $self->storage->print_at( $offset, $self->sector_cache->{$offset} ); + } - my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size ); + # Why do we need to have the storage flush? Shouldn't autoflush take care of things? + # -RobK, 2008-06-26 + $self->storage->flush; - $storage->print_at( $self->chains_loc + $chains_offset, - pack( $StP{$self->byte_size}, $offset ), - ); + $self->clear_dirty_sectors; - # Record the old head in the new sector after the signature and staleness counter - $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head ); + $self->clear_sector_cache; } -sub _request_blist_sector { shift->_request_sector( 0, @_ ) } -sub _request_data_sector { shift->_request_sector( 1, @_ ) } -sub _request_index_sector { shift->_request_sector( 2, @_ ) } +################################################################################ -sub _request_sector { +sub lock_exclusive { my $self = shift; - my ($multiple, $size) = @_; - - my $chains_offset = $multiple * $self->byte_size; - - my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size ); - my $loc = unpack( $StP{$self->byte_size}, $old_head ); + my ($obj) = @_; + return $self->storage->lock_exclusive( $obj ); +} - # We don't have any free sectors of the right size, so allocate a new one. - unless ( $loc ) { - my $offset = $self->storage->request_space( $size ); +sub lock_shared { + my $self = shift; + my ($obj) = @_; + return $self->storage->lock_shared( $obj ); +} - # Zero out the new sector. This also guarantees correct increases - # in the filesize. - $self->storage->print_at( $offset, chr(0) x $size ); +sub unlock { + my $self = shift; + my ($obj) = @_; - return $offset; - } + my $rv = $self->storage->unlock( $obj ); - # Read the new head after the signature and the staleness counter - my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size ); - $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head ); - $self->storage->print_at( - $loc + SIG_SIZE + $STALE_SIZE, - pack( $StP{$self->byte_size}, 0 ), - ); + $self->flush if $rv; - return $loc; + return $rv; } ################################################################################ @@ -908,9 +794,12 @@ sub clear_cache { %{$_[0]->cache} = () } sub _dump_file { my $self = shift; + $self->flush; # Read the header - my $spot = $self->_read_file_header(); + my $header_sector = DBM::Deep::Engine::Sector::FileHeader->new({ + engine => $self, + }); my %types = ( 0 => 'B', @@ -926,6 +815,9 @@ sub _dump_file { my $return = ""; + # Filesize + $return .= "Size: " . (-s $self->storage->{fh}) . $/; + # Header values $return .= "NumTxns: " . $self->num_txns . $/; @@ -952,6 +844,7 @@ sub _dump_file { $return .= $/; } + my $spot = $header_sector->size; SECTOR: while ( $spot < $self->storage->{end} ) { # Read each sector in order.
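
The hunks above replace the engine's direct storage->read_at/print_at calls with a write-back sector cache: get_data fills sector_cache on a miss, add_dirty_sector records offsets with pending changes, and flush writes the dirty sectors back in ascending offset order before clearing both the dirty set and the cache. Below is a minimal, standalone sketch of that pattern only; Toy::SectorCache, its fixed 32-byte sectors, and the write_data helper are invented names for illustration and are not part of DBM::Deep's actual API.

#!/usr/bin/perl
# A minimal sketch of a write-back sector cache, along the lines of the
# sector_cache/dirty_sectors/flush changes above. Toy::SectorCache and
# write_data() are hypothetical names, not DBM::Deep internals.
use strict;
use warnings;

package Toy::SectorCache;

sub new {
    my ($class, %args) = @_;
    return bless {
        fh            => $args{fh},  # an already-opened read/write filehandle
        sector_cache  => {},         # offset => raw sector bytes
        dirty_sectors => {},         # offset => undef (a set of pending writes)
    }, $class;
}

# Return a reference to the cached bytes for a sector, reading on a cache miss.
sub get_data {
    my ($self, $offset, $size) = @_;
    unless ( exists $self->{sector_cache}{$offset} ) {
        seek $self->{fh}, $offset, 0 or die "seek($offset): $!";
        read $self->{fh}, my $buf, $size;
        $self->{sector_cache}{$offset} = $buf;
    }
    return \$self->{sector_cache}{$offset};
}

# Writes touch only the cache; nothing reaches the file until flush().
sub write_data {
    my ($self, $offset, $bytes) = @_;
    $self->{sector_cache}{$offset}  = $bytes;
    $self->{dirty_sectors}{$offset} = undef;
}

# Write dirty sectors back in ascending offset order, then reset both maps.
sub flush {
    my $self = shift;
    for my $offset ( sort { $a <=> $b } keys %{ $self->{dirty_sectors} } ) {
        seek $self->{fh}, $offset, 0 or die "seek($offset): $!";
        print { $self->{fh} } $self->{sector_cache}{$offset};
    }
    $self->{dirty_sectors} = {};
    $self->{sector_cache}  = {};
}

package main;
use File::Temp ();

my $tmp = File::Temp->new;                  # scratch file for the demo
open my $fh, '+<', $tmp->filename or die "open: $!";
binmode $fh;
print {$fh} "\0" x 64;                      # pre-size: two 32-byte sectors

my $cache = Toy::SectorCache->new( fh => $fh );
$cache->write_data( 0,  'A' x 32 );
$cache->write_data( 32, 'B' x 32 );
$cache->flush;                              # both sectors hit the file here
print ${ $cache->get_data( 32, 32 ) }, "\n";   # prints 32 B's, re-read from disk

Deferring writes this way lets unlock() push everything to disk in a single pass through Engine::flush, and clearing the byte cache after each flush presumably keeps the engine from serving stale bytes once the lock is released and another process may have modified the file.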