my $class = shift;
my ($args) = @_;
+ print "\n********* NEW ********\n\n";
my $self = bless {
byte_size => 4,
num_txns => 16, # HEAD plus 15 running txns
trans_id => 0, # Default to the HEAD
+ entries => {}, # This is the list of entries for transactions
storage => undef,
}, $class;
key_md5 => $self->_apply_digest( $key ),
allow_head => 0,
});
-
- my $key_md5 = $self->_apply_digest( $key );
-
- # XXX What should happen if this fails?
- my $blist = $sector->get_bucket_list({
- key_md5 => $key_md5,
- }) or die "How did delete_key fail (no blist)?!\n";
-
- return $blist->delete_md5( $key_md5 );
}
sub write_value {
my ($obj) = @_;
if ( $self->trans_id ) {
- DBM::Deep->throw_error( "Cannot begin_work within a transaction" );
+ DBM::Deep->_throw_error( "Cannot begin_work within a transaction" );
}
my @slots = $self->read_transaction_slots;
$self->write_transaction_slots( @slots );
if ( !$self->trans_id ) {
- DBM::Deep->throw_error( "Cannot begin_work - no available transactions" );
+ DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
}
return;
my ($obj) = @_;
if ( !$self->trans_id ) {
- DBM::Deep->throw_error( "Cannot rollback without a transaction" );
+ DBM::Deep->_throw_error( "Cannot rollback without a transaction" );
+ }
+
+ # Each entry is the file location for a bucket that has a modification for
+ # this transaction. The entries need to be expunged.
+ foreach my $entry (@{ $self->get_entries } ) {
+ # Remove the entry here
+ my $read_loc = $entry
+ + $self->hash_size
+ + $self->byte_size
+ + $self->trans_id * $self->byte_size;
+
+ my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
+ $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
+ $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
+
+ if ( $data_loc > 1 ) {
+ $self->_load_sector( $data_loc )->free;
+ }
}
+
+ $self->clear_entries;
+
+ my @slots = $self->read_transaction_slots;
+ $slots[$self->trans_id] = 0;
+ $self->write_transaction_slots( @slots );
+ $self->set_trans_id( 0 );
+
+ return 1;
}
sub commit {
my ($obj) = @_;
if ( !$self->trans_id ) {
- DBM::Deep->throw_error( "Cannot commit without a transaction" );
+ DBM::Deep->_throw_error( "Cannot commit without a transaction" );
+ }
+
+ print "TID: " . $self->trans_id, $/;
+ foreach my $entry (@{ $self->get_entries } ) {
+ print "$entry\n";
+ # Overwrite the entry in head with the entry in trans_id
+ my $base = $entry
+ + $self->hash_size
+ + $self->byte_size;
+
+ my $head_loc = $self->storage->read_at( $base, $self->byte_size );
+ $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
+ my $trans_loc = $self->storage->read_at(
+ $base + $self->trans_id * $self->byte_size, $self->byte_size,
+ );
+
+ $self->storage->print_at( $base, $trans_loc );
+ $self->storage->print_at(
+ $base + $self->trans_id * $self->byte_size,
+ pack( $StP{$self->byte_size}, 0 ),
+ );
+
+ if ( $head_loc > 1 ) {
+ $self->_load_sector( $head_loc )->free;
+ }
}
+
+ $self->clear_entries;
+
+ my @slots = $self->read_transaction_slots;
+ $slots[$self->trans_id] = 0;
+ $self->write_transaction_slots( @slots );
+ $self->set_trans_id( 0 );
+
+ return 1;
}
sub read_transaction_slots {
);
}
+sub get_entries {
+ my $self = shift;
+ return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
+}
+
+sub add_entry {
+ my $self = shift;
+ my ($trans_id, $loc) = @_;
+
+ print "$trans_id => $loc\n";
+ $self->{entries}{$trans_id} ||= {};
+ $self->{entries}{$trans_id}{$loc} = undef;
+ use Data::Dumper;print "$self: " . Dumper $self->{entries};
+}
+
+sub clear_entries {
+ my $self = shift;
+ print "Clearing\n";
+ delete $self->{entries}{$self->trans_id};
+}
+
################################################################################
{
my $blist = $self->get_bucket_list({
key_md5 => $args->{key_md5},
create => 1,
- }) or die "How did write_value fail (no blist)?!\n";
+ }) or die "How did write_data fail (no blist)?!\n";
# Handle any transactional bookkeeping.
if ( $self->engine->trans_id ) {
my $old_value = $blist->get_data_for;
foreach my $other_trans_id ( @trans_ids ) {
next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
+ print "write_md5 to save a value\n";
$blist->write_md5({
trans_id => $other_trans_id,
key => $args->{key},
}
}
+ #XXX Is this safe to do transactionally?
# Free the place we're about to write to.
if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
$blist->get_data_for({ allow_head => 0 })->free;
key_md5 => $args->{key_md5},
}) or die "How did delete_key fail (no blist)?!\n";
- return $blist->delete_md5( $args );
+ # Save the location so that we can free the data
+ my $location = $blist->get_data_location_for({
+ allow_head => 0,
+ });
+ my $old_value = $self->engine->_load_sector( $location );
+
+ if ( $self->engine->trans_id == 0 ) {
+ my @transactions = $self->engine->read_transaction_slots;
+ my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
+ if ( @trans_ids ) {
+ foreach my $other_trans_id ( @trans_ids ) {
+ next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
+ $blist->write_md5({
+ trans_id => $other_trans_id,
+ key => $args->{key},
+ key_md5 => $args->{key_md5},
+ value => $old_value->clone,
+ });
+ }
+ }
+ }
+
+ $blist->mark_deleted( $args );
+
+ my $data = $old_value->data;
+ $old_value->free;
+
+ return $data;
}
sub get_blist_loc {
# If we don't have an MD5, then what are we supposed to do?
unless ( exists $self->{key_md5} ) {
- DBM::Deep->throw( "Cannot find_md5 without a key_md5 set" );
+ DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
}
my $e = $self->engine;
sub write_md5 {
my $self = shift;
my ($args) = @_;
- $args ||= {};
- $args->{trans_id} = $self->engine->trans_id unless exists $args->{trans_id};
+ DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
+ DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
+ DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
my $engine = $self->engine;
+
+ $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
+
my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
+ print "Adding $args->{trans_id} -> $spot\n";
+ $engine->add_entry( $args->{trans_id}, $spot );
unless ($self->{found}) {
my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
- engine => $self->engine,
+ engine => $engine,
data => $args->{key},
});
$engine->storage->print_at( $spot,
$args->{key_md5},
- pack( $StP{$self->engine->byte_size}, $key_sector->offset ),
+ pack( $StP{$engine->byte_size}, $key_sector->offset ),
);
}
- $engine->storage->print_at(
- $spot
+ my $loc = $spot
+ $engine->hash_size
+ $engine->byte_size
- + $args->{trans_id} * $engine->byte_size,
+ + $args->{trans_id} * $engine->byte_size;
+
+ $engine->storage->print_at( $loc,
pack( $StP{$engine->byte_size}, $args->{value}->offset ),
);
}
sub mark_deleted {
my $self = shift;
my ($args) = @_;
+ $args ||= {};
+
+ my $engine = $self->engine;
+
+ $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
- $self->engine->storage->print_at(
- $spot
- + $self->engine->hash_size
- + $self->engine->byte_size
- + $args->{trans_id} * $self->engine->byte_size,
- pack( $StP{$self->engine->byte_size}, 1 ), # 1 is the marker for deleted
+ $engine->add_entry( $args->{trans_id}, $spot );
+
+ my $loc = $spot
+ + $engine->hash_size
+ + $engine->byte_size
+ + $args->{trans_id} * $engine->byte_size;
+
+ $engine->storage->print_at( $loc,
+ pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
);
}
});
my $key_sector = $self->get_key_for;
+ #XXX This isn't going to work right and you know it! This eradicates data
+ # that we're not ready to eradicate just yet.
my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
$engine->storage->print_at( $spot,
$engine->storage->read_at(
cmp_bag( [ keys %$db1 ], [qw( x z )], "DB1 keys correct" );
cmp_bag( [ keys %$db2 ], [qw( x other_x )], "DB2 keys correct" );
-SKIP:{ skip "unfinished yet", 51 }
-__END__
$db1->rollback;
is( $db2->{other_x}, 'bar', "DB2 set other_x within DB1's transaction, so DB2 can see it" );
is( $db1->{other_x}, 'foo', "Since other_x was modified after the transaction began, DB1 doesn't see the change." );
- cmp_bag( [ keys %$db1 ], [qw( x other_x )], "DB1 keys correct" );
+ $db1->{z} = 'a';
+ is( $db1->{z}, 'a', "Within DB1 transaction, DB1's Z is A" );
+ ok( !exists $db2->{z}, "Since z was added after the transaction began, DB2 doesn't see it." );
+
+ cmp_bag( [ keys %$db1 ], [qw( x other_x z )], "DB1 keys correct" );
cmp_bag( [ keys %$db2 ], [qw( x other_x )], "DB2 keys correct" );
$db1->commit;
is( $db1->{x}, 'z', "After commit, DB1's X is Z" );
is( $db2->{x}, 'z', "After commit, DB2's X is Z" );
+is( $db1->{z}, 'a', "After commit, DB1's Z is A" );
+is( $db2->{z}, 'a', "After commit, DB2's Z is A" );
+
+is( $db1->{other_x}, 'bar', "After commit, DB1's other_x is bar" );
+is( $db2->{other_x}, 'bar', "After commit, DB2's other_x is bar" );
+
$db1->begin_work;
+ is( $db1->{x}, 'z', "After commit, DB1's X is Z" );
+ is( $db2->{x}, 'z', "After commit, DB2's X is Z" );
+
+ is( $db1->{z}, 'a', "After commit, DB1's Z is A" );
+ is( $db2->{z}, 'a', "After commit, DB2's Z is A" );
+
+ is( $db1->{other_x}, 'bar', "After begin_work, DB1's other_x is still bar" );
+ is( $db2->{other_x}, 'bar', "After begin_work, DB2's other_x is still bar" );
+__END__
delete $db2->{other_x};
ok( !exists $db2->{other_x}, "DB2 deleted other_x in DB1's transaction, so it can't see it anymore" );
is( $db1->{other_x}, 'bar', "Since other_x was deleted after the transaction began, DB1 still sees it." );
-
+__END__
cmp_bag( [ keys %$db1 ], [qw( x other_x )], "DB1 keys correct" );
cmp_bag( [ keys %$db2 ], [qw( x )], "DB2 keys correct" );