r15625@rob-kinyons-computer (orig r9171): rkinyon | 2007-02-26 11:56:32 -0500
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
index a491892..663a0f0 100644 (file)
@@ -3,8 +3,9 @@ package DBM::Deep::Engine;
 use 5.006_000;
 
 use strict;
+use warnings;
 
-our $VERSION = q(0.99_04);
+our $VERSION = q(1.0000);
 
 use Scalar::Util ();
 
@@ -17,7 +18,6 @@ use Scalar::Util ();
 # Setup file and tag signatures.  These should never change.
 sub SIG_FILE     () { 'DPDB' }
 sub SIG_HEADER   () { 'h'    }
-sub SIG_INTERNAL () { 'i'    }
 sub SIG_HASH     () { 'H'    }
 sub SIG_ARRAY    () { 'A'    }
 sub SIG_NULL     () { 'N'    }
@@ -25,13 +25,13 @@ sub SIG_DATA     () { 'D'    }
 sub SIG_INDEX    () { 'I'    }
 sub SIG_BLIST    () { 'B'    }
 sub SIG_FREE     () { 'F'    }
-sub SIG_KEYS     () { 'K'    }
 sub SIG_SIZE     () {  1     }
-sub STALE_SIZE   () {  1     }
+
+my $STALE_SIZE = 2;
 
 # Please refer to the pack() documentation for further information
 my %StP = (
-    1 => 'C', # Unsigned char value (no order specified, presumably ASCII)
+    1 => 'C', # Unsigned char value (no order needed as it's just one byte)
     2 => 'n', # Unsigned short in "network" (big-endian) order
     4 => 'N', # Unsigned long in "network" (big-endian) order
     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
@@ -50,13 +50,17 @@ sub new {
         hash_size   => 16,  # In bytes
         hash_chars  => 256, # Number of chars the algorithm uses per byte
         max_buckets => 16,
-        num_txns    => 2,   # HEAD plus 1 additional transaction for importing
+        num_txns    => 1,   # The HEAD
         trans_id    => 0,   # Default to the HEAD
 
+        data_sector_size => 64, # Size in bytes of each data sector
+
         entries => {}, # This is the list of entries for transactions
         storage => undef,
     }, $class;
 
+    # Never allow byte_size to be set directly.
+    delete $args->{byte_size};
     if ( defined $args->{pack_size} ) {
         if ( lc $args->{pack_size} eq 'small' ) {
             $args->{byte_size} = 2;
@@ -78,15 +82,26 @@ sub new {
         $self->{$param} = $args->{$param};
     }
 
-    ##
-    # Number of buckets per blist before another level of indexing is
-    # done. Increase this value for slightly greater speed, but larger database
-    # files. DO NOT decrease this value below 16, due to risk of recursive
-    # reindex overrun.
-    ##
-    if ( $self->{max_buckets} < 16 ) {
-        warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
-        $self->{max_buckets} = 16;
+    my %validations = (
+        max_buckets      => { floor => 16, ceil => 256 },
+        num_txns         => { floor => 1,  ceil => 255 },
+        data_sector_size => { floor => 32, ceil => 256 },
+    );
+
+    while ( my ($attr, $c) = each %validations ) {
+        if (   !defined $self->{$attr}
+            || !length $self->{$attr}
+            || $self->{$attr} =~ /\D/
+            || $self->{$attr} < $c->{floor}
+        ) {
+            $self->{$attr} = '(undef)' if !defined $self->{$attr};
+            warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
+            $self->{$attr} = $c->{floor};
+        }
+        elsif ( $self->{$attr} > $c->{ceil} ) {
+            warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
+            $self->{$attr} = $c->{ceil};
+        }
     }
 
     if ( !$self->{digest} ) {
@@ -345,12 +360,18 @@ sub begin_work {
     }
 
     my @slots = $self->read_txn_slots;
-    for my $i ( 1 .. @slots ) {
+    my $found;
+    for my $i ( 0 .. $#slots ) {
         next if $slots[$i];
+
         $slots[$i] = 1;
-        $self->set_trans_id( $i );
+        $self->set_trans_id( $i + 1 );
+        $found = 1;
         last;
     }
+    unless ( $found ) {
+        DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
+    }
     $self->write_txn_slots( @slots );
 
     if ( !$self->trans_id ) {
@@ -375,7 +396,8 @@ sub rollback {
         my $read_loc = $entry
           + $self->hash_size
           + $self->byte_size
-          + $self->trans_id * ( $self->byte_size + 4 );
+          + $self->byte_size
+          + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
 
         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
@@ -389,7 +411,7 @@ sub rollback {
     $self->clear_entries;
 
     my @slots = $self->read_txn_slots;
-    $slots[$self->trans_id] = 0;
+    $slots[$self->trans_id-1] = 0;
     $self->write_txn_slots( @slots );
     $self->inc_txn_staleness_counter( $self->trans_id );
     $self->set_trans_id( 0 );
@@ -413,14 +435,16 @@ sub commit {
 
         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
+
+        my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
         my $trans_loc = $self->storage->read_at(
-            $base + $self->trans_id * ( $self->byte_size + 4 ), $self->byte_size,
+            $spot, $self->byte_size,
         );
 
         $self->storage->print_at( $base, $trans_loc );
         $self->storage->print_at(
-            $base + $self->trans_id * ( $self->byte_size + 4 ),
-            pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
+            $spot,
+            pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
         );
 
         if ( $head_loc > 1 ) {
@@ -431,7 +455,7 @@ sub commit {
     $self->clear_entries;
 
     my @slots = $self->read_txn_slots;
-    $slots[$self->trans_id] = 0;
+    $slots[$self->trans_id-1] = 0;
     $self->write_txn_slots( @slots );
     $self->inc_txn_staleness_counter( $self->trans_id );
     $self->set_trans_id( 0 );
@@ -441,24 +465,27 @@ sub commit {
 
 sub read_txn_slots {
     my $self = shift;
-    return split '', unpack( 'b32',
+    my $bl = $self->txn_bitfield_len;
+    my $num_bits = $bl * 8;
+    return split '', unpack( 'b'.$num_bits,
         $self->storage->read_at(
-            $self->trans_loc, 4,
+            $self->trans_loc, $bl,
         )
     );
 }
 
 sub write_txn_slots {
     my $self = shift;
+    my $num_bits = $self->txn_bitfield_len * 8;
     $self->storage->print_at( $self->trans_loc,
-        pack( 'b32', join('', @_) ),
+        pack( 'b'.$num_bits, join('', @_) ),
     );
 }
 
 sub get_running_txn_ids {
     my $self = shift;
     my @transactions = $self->read_txn_slots;
-    my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
+    my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
 }
 
 sub get_txn_staleness_counter {
@@ -468,13 +495,12 @@ sub get_txn_staleness_counter {
     # Hardcode staleness of 0 for the HEAD
     return 0 unless $trans_id;
 
-    my $x = unpack( 'N',
+    return unpack( $StP{$STALE_SIZE},
         $self->storage->read_at(
-            $self->trans_loc + 4 * $trans_id,
+            $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
             4,
         )
     );
-    return $x;
 }
 
 sub inc_txn_staleness_counter {
@@ -485,8 +511,8 @@ sub inc_txn_staleness_counter {
     return unless $trans_id;
 
     $self->storage->print_at(
-        $self->trans_loc + 4 * $trans_id,
-        pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
+        $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
+        pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
     );
 }
 
@@ -530,32 +556,41 @@ sub clear_entries {
 
 {
     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
+    my $this_file_version = 2;
 
     sub _write_file_header {
         my $self = shift;
 
-        my $header_var = 1 + 1 + 1 + 4 + 4 * $self->num_txns + 3 * $self->byte_size;
+        my $nt = $self->num_txns;
+        my $bl = $self->txn_bitfield_len;
+
+        my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
 
         my $loc = $self->storage->request_space( $header_fixed + $header_var );
 
         $self->storage->print_at( $loc,
             SIG_FILE,
             SIG_HEADER,
-            pack('N', 1),           # header version - at this point, we're at 9 bytes
-            pack('N', $header_var), # header size
+            pack('N', $this_file_version), # At this point, we're at 9 bytes
+            pack('N', $header_var),        # header size
             # --- Above is $header_fixed. Below is $header_var
             pack('C', $self->byte_size),
-            pack('C', $self->max_buckets),
-            pack('C', $self->num_txns),
-            pack('N', 0 ),                   # Transaction activeness bitfield
-            pack('N' . $self->num_txns, 0 x $self->num_txns ), # Transaction staleness counters
+
+            # These shenanigans are to allow a 256 within a C
+            pack('C', $self->max_buckets - 1),
+            pack('C', $self->data_sector_size - 1),
+
+            pack('C', $nt),
+            pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
+            pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
         );
 
-        $self->set_trans_loc( $header_fixed + 3 );
-        $self->set_chains_loc( $header_fixed + 3 + 4 + 4 * $self->num_txns );
+        #XXX Set these less fragilely
+        $self->set_trans_loc( $header_fixed + 4 );
+        $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
 
         return;
     }
@@ -566,7 +601,7 @@ sub clear_entries {
         my $buffer = $self->storage->read_at( 0, $header_fixed );
         return unless length($buffer);
 
-        my ($file_signature, $sig_header, $header_version, $size) = unpack(
+        my ($file_signature, $sig_header, $file_version, $size) = unpack(
             'A4 A N N', $buffer
         );
 
@@ -577,29 +612,43 @@ sub clear_entries {
 
         unless ( $sig_header eq SIG_HEADER ) {
             $self->storage->close;
-            DBM::Deep->_throw_error( "Old file version found." );
+            DBM::Deep->_throw_error( "Pre-1.00 file version found" );
+        }
+
+        unless ( $file_version == $this_file_version ) {
+            $self->storage->close;
+            DBM::Deep->_throw_error(
+                "Wrong file version found - " .  $file_version .
+                " - expected " . $this_file_version
+            );
         }
 
         my $buffer2 = $self->storage->read_at( undef, $size );
-        my @values = unpack( 'C C C', $buffer2 );
+        my @values = unpack( 'C C C C', $buffer2 );
 
-        if ( @values != 3 || grep { !defined } @values ) {
+        if ( @values != 4 || grep { !defined } @values ) {
             $self->storage->close;
             DBM::Deep->_throw_error("Corrupted file - bad header");
         }
 
-        $self->set_trans_loc( $header_fixed + scalar(@values) );
-        $self->set_chains_loc( $header_fixed + scalar(@values) + 4 + 4 * $self->num_txns );
-
         #XXX Add warnings if values weren't set right
-        @{$self}{qw(byte_size max_buckets num_txns)} = @values;
+        @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
 
-        my $header_var = scalar(@values) + 4 + 4 * $self->num_txns + 3 * $self->byte_size;
+        # These shenangians are to allow a 256 within a C
+        $self->{max_buckets} += 1;
+        $self->{data_sector_size} += 1;
+
+        my $bl = $self->txn_bitfield_len;
+
+        my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
         unless ( $size == $header_var ) {
             $self->storage->close;
             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
         }
 
+        $self->set_trans_loc( $header_fixed + scalar(@values) );
+        $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
+
         return length($buffer) + length($buffer2);
     }
 }
@@ -677,9 +726,9 @@ sub _add_free_sector {
 
     # Increment staleness.
     # XXX Can this increment+modulo be done by "&= 0x1" ?
-    my $staleness = unpack( $StP{STALE_SIZE()}, $storage->read_at( $offset + SIG_SIZE, STALE_SIZE ) );
-    $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * STALE_SIZE ) );
-    $storage->print_at( $offset + SIG_SIZE, pack( $StP{STALE_SIZE()}, $staleness ) );
+    my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
+    $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
+    $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
 
     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
 
@@ -688,7 +737,7 @@ sub _add_free_sector {
     );
 
     # Record the old head in the new sector after the signature and staleness counter
-    $storage->print_at( $offset + SIG_SIZE + STALE_SIZE, $old_head );
+    $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
 }
 
 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
@@ -716,10 +765,10 @@ sub _request_sector {
     }
 
     # Read the new head after the signature and the staleness counter
-    my $new_head = $self->storage->read_at( $loc + SIG_SIZE + STALE_SIZE, $self->byte_size );
+    my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
     $self->storage->print_at(
-        $loc + SIG_SIZE + STALE_SIZE,
+        $loc + SIG_SIZE + $STALE_SIZE,
         pack( $StP{$self->byte_size}, 0 ),
     );
 
@@ -735,6 +784,20 @@ sub hash_chars  { $_[0]{hash_chars} }
 sub num_txns    { $_[0]{num_txns} }
 sub max_buckets { $_[0]{max_buckets} }
 sub blank_md5   { chr(0) x $_[0]->hash_size }
+sub data_sector_size { $_[0]{data_sector_size} }
+
+# This is a calculated value
+sub txn_bitfield_len {
+    my $self = shift;
+    unless ( exists $self->{txn_bitfield_len} ) {
+        my $temp = ($self->num_txns) / 8;
+        if ( $temp > int( $temp ) ) {
+            $temp = int( $temp ) + 1;
+        }
+        $self->{txn_bitfield_len} = $temp;
+    }
+    return $self->{txn_bitfield_len};
+}
 
 sub trans_id     { $_[0]{trans_id} }
 sub set_trans_id { $_[0]{trans_id} = $_[1] }
@@ -928,7 +991,7 @@ sub type   { $_[0]{type} }
 
 sub base_size {
    my $self = shift;
-   return $self->engine->SIG_SIZE + $self->engine->STALE_SIZE;
+   return $self->engine->SIG_SIZE + $STALE_SIZE;
 }
 
 sub free {
@@ -953,15 +1016,15 @@ package DBM::Deep::Engine::Sector::Data;
 our @ISA = qw( DBM::Deep::Engine::Sector );
 
 # This is in bytes
-sub size { return 256 }
+sub size { $_[0]{engine}->data_sector_size }
 sub free_meth { return '_add_free_data_sector' }
 
 sub clone {
     my $self = shift;
     return ref($self)->new({
         engine => $self->engine,
-        data   => $self->data,
         type   => $self->type,
+        data   => $self->data,
     });
 }
 
@@ -990,7 +1053,7 @@ sub _init {
     my $engine = $self->engine;
 
     unless ( $self->offset ) {
-        my $data_section = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
+        my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
 
         $self->{offset} = $engine->_request_data_sector( $self->size );
 
@@ -1141,8 +1204,8 @@ sub _init {
     }
 
     $self->{staleness} = unpack(
-        $StP{$e->STALE_SIZE},
-        $e->storage->read_at( $self->offset + $e->SIG_SIZE, $e->STALE_SIZE ),
+        $StP{$STALE_SIZE},
+        $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
     );
 
     return;
@@ -1524,7 +1587,7 @@ sub bucket_size {
     unless ( $self->{bucket_size} ) {
         my $e = $self->engine;
         # Key + head (location) + transactions (location + staleness-counter)
-        my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 );
+        my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
         $self->{bucket_size} = $e->hash_size + $location_size;
     }
     return $self->{bucket_size};
@@ -1637,13 +1700,21 @@ sub write_md5 {
 
     my $loc = $spot
       + $engine->hash_size
-      + $engine->byte_size
-      + $args->{trans_id} * ( $engine->byte_size + 4 );
+      + $engine->byte_size;
 
-    $engine->storage->print_at( $loc,
-        pack( $StP{$engine->byte_size}, $args->{value}->offset ),
-        pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
-    );
+    if ( $args->{trans_id} ) {
+        $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
+
+        $engine->storage->print_at( $loc,
+            pack( $StP{$engine->byte_size}, $args->{value}->offset ),
+            pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
+        );
+    }
+    else {
+        $engine->storage->print_at( $loc,
+            pack( $StP{$engine->byte_size}, $args->{value}->offset ),
+        );
+    }
 }
 
 sub mark_deleted {
@@ -1660,13 +1731,22 @@ sub mark_deleted {
 
     my $loc = $spot
       + $engine->hash_size
-      + $engine->byte_size
-      + $args->{trans_id} * ( $engine->byte_size + 4 );
+      + $engine->byte_size;
+
+    if ( $args->{trans_id} ) {
+        $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
+
+        $engine->storage->print_at( $loc,
+            pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
+            pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
+        );
+    }
+    else {
+        $engine->storage->print_at( $loc,
+            pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
+        );
+    }
 
-    $engine->storage->print_at( $loc,
-        pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
-        pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
-    );
 }
 
 sub delete_md5 {
@@ -1714,22 +1794,27 @@ sub get_data_location_for {
     my $spot = $self->offset + $self->base_size
       + $args->{idx} * $self->bucket_size
       + $e->hash_size
-      + $e->byte_size
-      + $args->{trans_id} * ( $e->byte_size + 4 );
+      + $e->byte_size;
+
+    if ( $args->{trans_id} ) {
+        $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
+    }
 
     my $buffer = $e->storage->read_at(
         $spot,
-        $e->byte_size + 4,
+        $e->byte_size + $STALE_SIZE,
     );
-    my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
-
-    # We have found an entry that is old, so get rid of it
-    if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
-        $e->storage->print_at(
-            $spot,
-            pack( $StP{$e->byte_size} . ' N', (0) x 2 ), 
-        );
-        $loc = 0;
+    my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
+
+    if ( $args->{trans_id} ) {
+        # We have found an entry that is old, so get rid of it
+        if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
+            $e->storage->print_at(
+                $spot,
+                pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
+            );
+            $loc = 0;
+        }
     }
 
     # If we're in a transaction and we never wrote to this location, try the
@@ -1798,6 +1883,7 @@ sub _init {
     return $self;
 }
 
+#XXX Change here
 sub size {
     my $self = shift;
     unless ( $self->{size} ) {