my $class = shift;
my ($args) = @_;
- print "\n********* NEW ********\n\n";
my $self = bless {
byte_size => 4,
DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
}
+ $self->set_trans_ctr(
+ $self->get_txn_staleness_counter( $self->trans_id ),
+ );
+
return;
}
my $read_loc = $entry
+ $self->hash_size
+ $self->byte_size
- + $self->trans_id * $self->byte_size;
+ + $self->trans_id * ( 2 * $self->byte_size );
my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
$data_loc = unpack( $StP{$self->byte_size}, $data_loc );
my @slots = $self->read_txn_slots;
$slots[$self->trans_id] = 0;
$self->write_txn_slots( @slots );
+ $self->inc_txn_staleness_counter( $self->trans_id );
+ $self->set_trans_ctr( 0 );
$self->set_trans_id( 0 );
return 1;
DBM::Deep->_throw_error( "Cannot commit without a transaction" );
}
- print "TID: " . $self->trans_id, $/;
+ #print "TID: " . $self->trans_id, $/;
foreach my $entry (@{ $self->get_entries } ) {
- print "$entry\n";
+ #print "$entry\n";
# Overwrite the entry in head with the entry in trans_id
my $base = $entry
+ $self->hash_size
my $head_loc = $self->storage->read_at( $base, $self->byte_size );
$head_loc = unpack( $StP{$self->byte_size}, $head_loc );
my $trans_loc = $self->storage->read_at(
- $base + $self->trans_id * $self->byte_size, $self->byte_size,
+ $base + $self->trans_id * ( 2 * $self->byte_size ), $self->byte_size,
);
$self->storage->print_at( $base, $trans_loc );
$self->storage->print_at(
- $base + $self->trans_id * $self->byte_size,
- pack( $StP{$self->byte_size}, 0 ),
+ $base + $self->trans_id * ( 2 * $self->byte_size ),
+ pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
);
if ( $head_loc > 1 ) {
my @slots = $self->read_txn_slots;
$slots[$self->trans_id] = 0;
$self->write_txn_slots( @slots );
+ $self->inc_txn_staleness_counter( $self->trans_id );
+ $self->set_trans_ctr( 0 );
$self->set_trans_id( 0 );
return 1;
my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
}
+sub get_txn_staleness_counter {
+ my $self = shift;
+ my ($trans_id) = @_;
+ return unpack( 'N',
+ $self->storage->read_at(
+ $self->trans_loc + 4 * ($trans_id - 1), 4,
+ )
+ );
+}
+
+sub inc_txn_staleness_counter {
+ my $self = shift;
+ my ($trans_id) = @_;
+ $self->storage->print_at(
+ $self->trans_loc,
+ pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
+ );
+}
+
sub get_entries {
my $self = shift;
return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
my $self = shift;
my ($trans_id, $loc) = @_;
- print "$trans_id => $loc\n";
+ #print "$trans_id => $loc\n";
$self->{entries}{$trans_id} ||= {};
$self->{entries}{$trans_id}{$loc} = undef;
- use Data::Dumper;print "$self: " . Dumper $self->{entries};
+ #use Data::Dumper;print "$self: " . Dumper $self->{entries};
}
sub clear_entries {
my $self = shift;
- print "Clearing\n";
+ #print "Clearing\n";
delete $self->{entries}{$self->trans_id};
}
$self->set_trans_loc( $header_fixed + 2 );
$self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
+ $self->set_trans_ctr( 0 );
return;
}
$self->set_trans_loc( $header_fixed + 2 );
$self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
+ $self->set_trans_ctr( 0 );
if ( @values < 2 || grep { !defined } @values ) {
$self->storage->close;
sub trans_id { $_[0]{trans_id} }
sub set_trans_id { $_[0]{trans_id} = $_[1] }
+sub trans_ctr { $_[0]{trans_ctr} }
+sub set_trans_ctr { $_[0]{trans_ctr} = $_[1] }
+
sub trans_loc { $_[0]{trans_loc} }
sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
my $old_value = $blist->get_data_for;
foreach my $other_trans_id ( @trans_ids ) {
next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
- print "write_md5 to save a value\n";
+ #print "write_md5 to save a value\n";
$blist->write_md5({
trans_id => $other_trans_id,
key => $args->{key},
our @ISA = qw( DBM::Deep::Engine::Sector );
-sub idx_for_txn { return $_[1] + 1 }
-
sub _init {
my $self = shift;
return $self;
}
-sub base_size { 2 } # Sig + recycled counter
+sub base_size { 1 + 1 } # Sig + recycled counter
sub size {
my $self = shift;
my $self = shift;
unless ( $self->{bucket_size} ) {
my $e = $self->engine;
- # Key + transactions
- my $locs_size = (1 + $e->num_txns ) * $e->byte_size;
- $self->{bucket_size} = $e->hash_size + $locs_size;
+ # Key + head (location) + transactions (location + staleness-counter)
+ my $location_size = $e->byte_size + $e->num_txns * ( 2 * $e->byte_size );
+ $self->{bucket_size} = $e->hash_size + $location_size;
}
return $self->{bucket_size};
}
$args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
- print "Adding $args->{trans_id} -> $spot\n";
+ #print "Adding $args->{trans_id} -> $spot\n";
$engine->add_entry( $args->{trans_id}, $spot );
unless ($self->{found}) {
my $loc = $spot
+ $engine->hash_size
+ $engine->byte_size
- + $args->{trans_id} * $engine->byte_size;
+ + $args->{trans_id} * ( 2 * $engine->byte_size );
$engine->storage->print_at( $loc,
pack( $StP{$engine->byte_size}, $args->{value}->offset ),
+ pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
);
}
my $loc = $spot
+ $engine->hash_size
+ $engine->byte_size
- + $args->{trans_id} * $engine->byte_size;
+ + $args->{trans_id} * ( 2 * $engine->byte_size );
$engine->storage->print_at( $loc,
pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
+ pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
);
}
$args->{trans_id} = $self->engine->trans_id unless exists $args->{trans_id};
$args->{idx} = $self->{idx} unless exists $args->{idx};
- my $location = $self->engine->storage->read_at(
- $self->offset + $self->base_size
+ my $e = $self->engine;
+
+ my $spot = $self->offset + $self->base_size
+ $args->{idx} * $self->bucket_size
- + $self->engine->hash_size
- + $self->engine->byte_size
- + $args->{trans_id} * $self->engine->byte_size,
- $self->engine->byte_size,
+ + $e->hash_size
+ + $e->byte_size
+ + $args->{trans_id} * ( 2 * $e->byte_size );
+
+ my $buffer = $e->storage->read_at(
+ $spot,
+ 2 * $e->byte_size,
);
- my $loc = unpack( $StP{$self->engine->byte_size}, $location );
+ my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
+
+ # We have found an entry that is old, so get rid of it
+ if ( $staleness != $e->get_txn_staleness_counter( $args->{trans_id} ) ) {
+ $e->storage->print_at(
+ $spot,
+ pack( $StP{$e->byte_size} . ' N', (0) x 2 ),
+ );
+ $loc = 0;
+ }
# If we're in a transaction and we never wrote to this location, try the
# HEAD instead.