Removed Engine.pm and Engine2.pm in preparation to moving Engine3.pm over to Engine.pm
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * Every method in here assumes that the storage has been appropriately
13 #   safeguarded. This can be anything from flock() to some sort of manual
14 #   mutex. But, it's the caller's responsability to make sure that this has
15 #   been done.
16
17 # Setup file and tag signatures.  These should never change.
18 sub SIG_FILE     () { 'DPDB' }
19 sub SIG_HEADER   () { 'h'    }
20 sub SIG_INTERNAL () { 'i'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_KEYS     () { 'K'    }
29 sub SIG_SIZE     () {  1     }
30 sub STALE_SIZE   () {  1     }
31
32 ################################################################################
33
34 # Please refer to the pack() documentation for further information
35 my %StP = (
36     1 => 'C', # Unsigned char value (no order specified, presumably ASCII)
37     2 => 'n', # Unsigned short in "network" (big-endian) order
38     4 => 'N', # Unsigned long in "network" (big-endian) order
39     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
40 );
41
42 sub new {
43     my $class = shift;
44     my ($args) = @_;
45
46     my $self = bless {
47         byte_size   => 4,
48
49         digest      => undef,
50         hash_size   => 16,  # In bytes
51         hash_chars  => 256, # Number of chars the algorithm uses per byte
52         max_buckets => 16,
53         num_txns    => 16, # HEAD plus 15 running txns
54         trans_id    => 0,  # Default to the HEAD
55
56         entries => {}, # This is the list of entries for transactions
57         storage => undef,
58     }, $class;
59
60     if ( defined $args->{pack_size} ) {
61         if ( lc $args->{pack_size} eq 'small' ) {
62             $args->{byte_size} = 2;
63         }
64         elsif ( lc $args->{pack_size} eq 'medium' ) {
65             $args->{byte_size} = 4;
66         }
67         elsif ( lc $args->{pack_size} eq 'large' ) {
68             $args->{byte_size} = 8;
69         }
70         else {
71             die "Unknown pack_size value: '$args->{pack_size}'\n";
72         }
73     }
74
75     # Grab the parameters we want to use
76     foreach my $param ( keys %$self ) {
77         next unless exists $args->{$param};
78         $self->{$param} = $args->{$param};
79     }
80
81     $self->{byte_pack} = $StP{ $self->byte_size };
82
83     ##
84     # Number of buckets per blist before another level of indexing is
85     # done. Increase this value for slightly greater speed, but larger database
86     # files. DO NOT decrease this value below 16, due to risk of recursive
87     # reindex overrun.
88     ##
89     if ( $self->{max_buckets} < 16 ) {
90         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
91         $self->{max_buckets} = 16;
92     }
93
94     if ( !$self->{digest} ) {
95         require Digest::MD5;
96         $self->{digest} = \&Digest::MD5::md5;
97     }
98
99     return $self;
100 }
101
102 ################################################################################
103
104 sub read_value {
105     my $self = shift;
106     my ($obj, $key) = @_;
107
108     # This will be a Reference sector
109     my $sector = $self->_load_sector( $obj->_base_offset )
110         or return;
111
112     if ( $sector->staleness != $obj->_staleness ) {
113         return;
114     }
115
116     my $key_md5 = $self->_apply_digest( $key );
117
118     my $value_sector = $sector->get_data_for({
119         key_md5    => $key_md5,
120         allow_head => 1,
121     });
122
123     unless ( $value_sector ) {
124         $value_sector = DBM::Deep::Engine::Sector::Null->new({
125             engine => $self,
126             data   => undef,
127         });
128
129         $sector->write_data({
130             key_md5 => $key_md5,
131             key     => $key,
132             value   => $value_sector,
133         });
134     }
135
136     return $value_sector->data;
137 }
138
139 sub get_classname {
140     my $self = shift;
141     my ($obj) = @_;
142
143     # This will be a Reference sector
144     my $sector = $self->_load_sector( $obj->_base_offset )
145         or die "How did get_classname fail (no sector for '$obj')?!\n";
146
147     if ( $sector->staleness != $obj->_staleness ) {
148         return;
149     }
150
151     return $sector->get_classname;
152 }
153
154 sub key_exists {
155     my $self = shift;
156     my ($obj, $key) = @_;
157
158     # This will be a Reference sector
159     my $sector = $self->_load_sector( $obj->_base_offset )
160         or return '';
161
162     if ( $sector->staleness != $obj->_staleness ) {
163         return '';
164     }
165
166     my $data = $sector->get_data_for({
167         key_md5    => $self->_apply_digest( $key ),
168         allow_head => 1,
169     });
170
171     # exists() returns 1 or '' for true/false.
172     return $data ? 1 : '';
173 }
174
175 sub delete_key {
176     my $self = shift;
177     my ($obj, $key) = @_;
178
179     my $sector = $self->_load_sector( $obj->_base_offset )
180         or return;
181
182     if ( $sector->staleness != $obj->_staleness ) {
183         return;
184     }
185
186     return $sector->delete_key({
187         key_md5    => $self->_apply_digest( $key ),
188         allow_head => 0,
189     });
190 }
191
192 sub write_value {
193     my $self = shift;
194     my ($obj, $key, $value) = @_;
195
196     my $r = Scalar::Util::reftype( $value ) || '';
197     {
198         last if $r eq '';
199         last if $r eq 'HASH';
200         last if $r eq 'ARRAY';
201
202         DBM::Deep->_throw_error(
203             "Storage of references of type '$r' is not supported."
204         );
205     }
206
207     my ($class, $type);
208     if ( !defined $value ) {
209         $class = 'DBM::Deep::Engine::Sector::Null';
210     }
211     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
212         if ( $r eq 'ARRAY' && tied(@$value) ) {
213             DBM::Deep->_throw_error( "Cannot store something that is tied." );
214         }
215         if ( $r eq 'HASH' && tied(%$value) ) {
216             DBM::Deep->_throw_error( "Cannot store something that is tied." );
217         }
218         $class = 'DBM::Deep::Engine::Sector::Reference';
219         $type = substr( $r, 0, 1 );
220     }
221     else {
222         $class = 'DBM::Deep::Engine::Sector::Scalar';
223     }
224
225     # This will be a Reference sector
226     my $sector = $self->_load_sector( $obj->_base_offset )
227         or die "Cannot write to a deleted spot in DBM::Deep.\n";
228
229     if ( $sector->staleness != $obj->_staleness ) {
230         die "Cannot write to a deleted spot in DBM::Deep.\n";
231     }
232
233     # Create this after loading the reference sector in case something bad happens.
234     # This way, we won't allocate value sector(s) needlessly.
235     my $value_sector = $class->new({
236         engine => $self,
237         data   => $value,
238         type   => $type,
239     });
240
241     $sector->write_data({
242         key     => $key,
243         key_md5 => $self->_apply_digest( $key ),
244         value   => $value_sector,
245     });
246
247     # This code is to make sure we write all the values in the $value to the disk
248     # and to make sure all changes to $value after the assignment are reflected
249     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
250     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
251     # copy to a temp value.
252     if ( $r eq 'ARRAY' ) {
253         my @temp = @$value;
254         tie @$value, 'DBM::Deep', {
255             base_offset => $value_sector->offset,
256             staleness   => $value_sector->staleness,
257             storage     => $self->storage,
258             engine      => $self,
259         };
260         @$value = @temp;
261         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
262     }
263     elsif ( $r eq 'HASH' ) {
264         my %temp = %$value;
265         tie %$value, 'DBM::Deep', {
266             base_offset => $value_sector->offset,
267             staleness   => $value_sector->staleness,
268             storage     => $self->storage,
269             engine      => $self,
270         };
271
272         %$value = %temp;
273         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
274     }
275
276     return 1;
277 }
278
279 # XXX Add staleness here
280 sub get_next_key {
281     my $self = shift;
282     my ($obj, $prev_key) = @_;
283
284     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
285     unless ( $prev_key ) {
286         $obj->{iterator} = DBM::Deep::Iterator->new({
287             base_offset => $obj->_base_offset,
288             engine      => $self,
289         });
290     }
291
292     return $obj->{iterator}->get_next_key( $obj );
293 }
294
295 ################################################################################
296
297 sub setup_fh {
298     my $self = shift;
299     my ($obj) = @_;
300
301     # We're opening the file.
302     unless ( $obj->_base_offset ) {
303         my $bytes_read = $self->_read_file_header;
304
305         # Creating a new file
306         unless ( $bytes_read ) {
307             $self->_write_file_header;
308
309             # 1) Create Array/Hash entry
310             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
311                 engine => $self,
312                 type   => $obj->_type,
313             });
314             $obj->{base_offset} = $initial_reference->offset;
315             $obj->{staleness} = $initial_reference->staleness;
316
317             $self->storage->flush;
318         }
319         # Reading from an existing file
320         else {
321             $obj->{base_offset} = $bytes_read;
322             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
323                 engine => $self,
324                 offset => $obj->_base_offset,
325             });
326             unless ( $initial_reference ) {
327                 DBM::Deep->_throw_error("Corrupted file, no master index record");
328             }
329
330             unless ($obj->_type eq $initial_reference->type) {
331                 DBM::Deep->_throw_error("File type mismatch");
332             }
333
334             $obj->{staleness} = $initial_reference->staleness;
335         }
336     }
337
338     return 1;
339 }
340
341 sub begin_work {
342     my $self = shift;
343     my ($obj) = @_;
344
345     if ( $self->trans_id ) {
346         DBM::Deep->_throw_error( "Cannot begin_work within a transaction" );
347     }
348
349     my @slots = $self->read_txn_slots;
350     for my $i ( 1 .. @slots ) {
351         next if $slots[$i];
352         $slots[$i] = 1;
353         $self->set_trans_id( $i );
354         last;
355     }
356     $self->write_txn_slots( @slots );
357
358     if ( !$self->trans_id ) {
359         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
360     }
361
362     return;
363 }
364
365 sub rollback {
366     my $self = shift;
367     my ($obj) = @_;
368
369     if ( !$self->trans_id ) {
370         DBM::Deep->_throw_error( "Cannot rollback without a transaction" );
371     }
372
373     # Each entry is the file location for a bucket that has a modification for
374     # this transaction. The entries need to be expunged.
375     foreach my $entry (@{ $self->get_entries } ) {
376         # Remove the entry here
377         my $read_loc = $entry
378           + $self->hash_size
379           + $self->byte_size
380           + $self->trans_id * ( $self->byte_size + 4 );
381
382         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
383         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
384         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
385
386         if ( $data_loc > 1 ) {
387             $self->_load_sector( $data_loc )->free;
388         }
389     }
390
391     $self->clear_entries;
392
393     my @slots = $self->read_txn_slots;
394     $slots[$self->trans_id] = 0;
395     $self->write_txn_slots( @slots );
396     $self->inc_txn_staleness_counter( $self->trans_id );
397     $self->set_trans_id( 0 );
398
399     return 1;
400 }
401
402 sub commit {
403     my $self = shift;
404     my ($obj) = @_;
405
406     if ( !$self->trans_id ) {
407         DBM::Deep->_throw_error( "Cannot commit without a transaction" );
408     }
409
410     foreach my $entry (@{ $self->get_entries } ) {
411         # Overwrite the entry in head with the entry in trans_id
412         my $base = $entry
413           + $self->hash_size
414           + $self->byte_size;
415
416         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
417         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
418         my $trans_loc = $self->storage->read_at(
419             $base + $self->trans_id * ( $self->byte_size + 4 ), $self->byte_size,
420         );
421
422         $self->storage->print_at( $base, $trans_loc );
423         $self->storage->print_at(
424             $base + $self->trans_id * ( $self->byte_size + 4 ),
425             pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
426         );
427
428         if ( $head_loc > 1 ) {
429             $self->_load_sector( $head_loc )->free;
430         }
431     }
432
433     $self->clear_entries;
434
435     my @slots = $self->read_txn_slots;
436     $slots[$self->trans_id] = 0;
437     $self->write_txn_slots( @slots );
438     $self->inc_txn_staleness_counter( $self->trans_id );
439     $self->set_trans_id( 0 );
440
441     return 1;
442 }
443
444 sub read_txn_slots {
445     my $self = shift;
446     return split '', unpack( 'b32',
447         $self->storage->read_at(
448             $self->trans_loc, 4,
449         )
450     );
451 }
452
453 sub write_txn_slots {
454     my $self = shift;
455     $self->storage->print_at( $self->trans_loc,
456         pack( 'b32', join('', @_) ),
457     );
458 }
459
460 sub get_running_txn_ids {
461     my $self = shift;
462     my @transactions = $self->read_txn_slots;
463     my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
464 }
465
466 sub get_txn_staleness_counter {
467     my $self = shift;
468     my ($trans_id) = @_;
469
470     # Hardcode staleness of 0 for the HEAD
471     return 0 unless $trans_id;
472
473     my $x = unpack( 'N',
474         $self->storage->read_at(
475             $self->trans_loc + 4 * $trans_id,
476             4,
477         )
478     );
479     return $x;
480 }
481
482 sub inc_txn_staleness_counter {
483     my $self = shift;
484     my ($trans_id) = @_;
485
486     # Hardcode staleness of 0 for the HEAD
487     return unless $trans_id;
488
489     $self->storage->print_at(
490         $self->trans_loc + 4 * $trans_id,
491         pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
492     );
493 }
494
495 sub get_entries {
496     my $self = shift;
497     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
498 }
499
500 sub add_entry {
501     my $self = shift;
502     my ($trans_id, $loc) = @_;
503
504     $self->{entries}{$trans_id} ||= {};
505     $self->{entries}{$trans_id}{$loc} = undef;
506 }
507
508 sub clear_entries {
509     my $self = shift;
510     delete $self->{entries}{$self->trans_id};
511 }
512
513 ################################################################################
514
515 {
516     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
517
518     sub _write_file_header {
519         my $self = shift;
520
521         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 3 * $self->byte_size;
522
523         my $loc = $self->storage->request_space( $header_fixed + $header_var );
524
525         $self->storage->print_at( $loc,
526             SIG_FILE,
527             SIG_HEADER,
528             pack('N', 1),           # header version - at this point, we're at 9 bytes
529             pack('N', $header_var), # header size
530             # --- Above is $header_fixed. Below is $header_var
531             pack('C', $self->byte_size),
532             pack('C', $self->max_buckets),
533             pack('N', 0 ),                   # Transaction activeness bitfield
534             pack('N' . $self->num_txns, 0 x $self->num_txns ), # Transaction staleness counters
535             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
536             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
537             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
538         );
539
540         $self->set_trans_loc( $header_fixed + 2 );
541         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
542
543         return;
544     }
545
546     sub _read_file_header {
547         my $self = shift;
548
549         my $buffer = $self->storage->read_at( 0, $header_fixed );
550         return unless length($buffer);
551
552         my ($file_signature, $sig_header, $header_version, $size) = unpack(
553             'A4 A N N', $buffer
554         );
555
556         unless ( $file_signature eq SIG_FILE ) {
557             $self->storage->close;
558             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
559         }
560
561         unless ( $sig_header eq SIG_HEADER ) {
562             $self->storage->close;
563             DBM::Deep->_throw_error( "Old file version found." );
564         }
565
566         my $buffer2 = $self->storage->read_at( undef, $size );
567         my @values = unpack( 'C C', $buffer2 );
568
569         $self->set_trans_loc( $header_fixed + 2 );
570         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
571
572         if ( @values < 2 || grep { !defined } @values ) {
573             $self->storage->close;
574             DBM::Deep->_throw_error("Corrupted file - bad header");
575         }
576
577         #XXX Add warnings if values weren't set right
578         @{$self}{qw(byte_size max_buckets)} = @values;
579
580         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 3 * $self->byte_size;
581         unless ( $size eq $header_var ) {
582             $self->storage->close;
583             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
584         }
585
586         return length($buffer) + length($buffer2);
587     }
588 }
589
590 sub _load_sector {
591     my $self = shift;
592     my ($offset) = @_;
593
594     # Add a catch for offset of 0 or 1
595     return if $offset <= 1;
596
597     my $type = $self->storage->read_at( $offset, 1 );
598     return if $type eq chr(0);
599
600     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
601         return DBM::Deep::Engine::Sector::Reference->new({
602             engine => $self,
603             type   => $type,
604             offset => $offset,
605         });
606     }
607     # XXX Don't we need key_md5 here?
608     elsif ( $type eq $self->SIG_BLIST ) {
609         return DBM::Deep::Engine::Sector::BucketList->new({
610             engine => $self,
611             type   => $type,
612             offset => $offset,
613         });
614     }
615     elsif ( $type eq $self->SIG_INDEX ) {
616         return DBM::Deep::Engine::Sector::Index->new({
617             engine => $self,
618             type   => $type,
619             offset => $offset,
620         });
621     }
622     elsif ( $type eq $self->SIG_NULL ) {
623         return DBM::Deep::Engine::Sector::Null->new({
624             engine => $self,
625             type   => $type,
626             offset => $offset,
627         });
628     }
629     elsif ( $type eq $self->SIG_DATA ) {
630         return DBM::Deep::Engine::Sector::Scalar->new({
631             engine => $self,
632             type   => $type,
633             offset => $offset,
634         });
635     }
636     # This was deleted from under us, so just return and let the caller figure it out.
637     elsif ( $type eq $self->SIG_FREE ) {
638         return;
639     }
640
641     die "'$offset': Don't know what to do with type '$type'\n";
642 }
643
644 sub _apply_digest {
645     my $self = shift;
646     return $self->{digest}->(@_);
647 }
648
649 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
650 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
651 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
652
653 sub _add_free_sector {
654     my $self = shift;
655     my ($multiple, $offset, $size) = @_;
656
657     my $chains_offset = $multiple * $self->byte_size;
658
659     my $storage = $self->storage;
660
661     # Increment staleness.
662     # XXX Can this increment+modulo be done by "&= 0x1" ?
663     my $staleness = unpack( $StP{STALE_SIZE()}, $storage->read_at( $offset + SIG_SIZE, STALE_SIZE ) );
664     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * STALE_SIZE ) );
665     $storage->print_at( $offset + SIG_SIZE, pack( $StP{STALE_SIZE()}, $staleness ) );
666
667     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
668
669     $storage->print_at( $self->chains_loc + $chains_offset,
670         pack( $StP{$self->byte_size}, $offset ),
671     );
672
673     # Record the old head in the new sector after the signature and staleness counter
674     $storage->print_at( $offset + SIG_SIZE + STALE_SIZE, $old_head );
675 }
676
677 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
678 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
679 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
680
681 sub _request_sector {
682     my $self = shift;
683     my ($multiple, $size) = @_;
684
685     my $chains_offset = $multiple * $self->byte_size;
686
687     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
688     my $loc = unpack( $StP{$self->byte_size}, $old_head );
689
690     # We don't have any free sectors of the right size, so allocate a new one.
691     unless ( $loc ) {
692         my $offset = $self->storage->request_space( $size );
693
694         # Zero out the new sector. This also guarantees correct increases
695         # in the filesize.
696         $self->storage->print_at( $offset, chr(0) x $size );
697
698         return $offset;
699     }
700
701     # Read the new head after the signature and the staleness counter
702     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + STALE_SIZE, $self->byte_size );
703     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
704
705     return $loc;
706 }
707
708 ################################################################################
709
710 sub storage     { $_[0]{storage} }
711 sub byte_size   { $_[0]{byte_size} }
712 sub hash_size   { $_[0]{hash_size} }
713 sub hash_chars  { $_[0]{hash_chars} }
714 sub num_txns    { $_[0]{num_txns} }
715 sub max_buckets { $_[0]{max_buckets} }
716 sub blank_md5   { chr(0) x $_[0]->hash_size }
717
718 sub trans_id     { $_[0]{trans_id} }
719 sub set_trans_id { $_[0]{trans_id} = $_[1] }
720
721 sub trans_loc     { $_[0]{trans_loc} }
722 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
723
724 sub chains_loc     { $_[0]{chains_loc} }
725 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
726
727 ################################################################################
728
729 package DBM::Deep::Iterator;
730
731 sub new {
732     my $class = shift;
733     my ($args) = @_;
734
735     my $self = bless {
736         breadcrumbs => [],
737         engine      => $args->{engine},
738         base_offset => $args->{base_offset},
739     }, $class;
740
741     Scalar::Util::weaken( $self->{engine} );
742
743     return $self;
744 }
745
746 sub reset { $_[0]{breadcrumbs} = [] }
747
748 sub get_sector_iterator {
749     my $self = shift;
750     my ($loc) = @_;
751
752     my $sector = $self->{engine}->_load_sector( $loc )
753         or return;
754
755     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
756         return DBM::Deep::Iterator::Index->new({
757             iterator => $self,
758             sector   => $sector,
759         });
760     }
761     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
762         return DBM::Deep::Iterator::BucketList->new({
763             iterator => $self,
764             sector   => $sector,
765         });
766     }
767     else {
768         die "Why did $loc make a $sector?";
769     }
770 }
771
772 sub get_next_key {
773     my $self = shift;
774     my ($obj) = @_;
775
776     my $crumbs = $self->{breadcrumbs};
777     my $e = $self->{engine};
778
779     unless ( @$crumbs ) {
780         # This will be a Reference sector
781         my $sector = $e->_load_sector( $self->{base_offset} )
782             # If no sector is found, thist must have been deleted from under us.
783             or return;
784
785         if ( $sector->staleness != $obj->_staleness ) {
786             return;
787         }
788
789         my $loc = $sector->get_blist_loc
790             or return;
791
792         push @$crumbs, $self->get_sector_iterator( $loc );
793     }
794
795     FIND_NEXT_KEY: {
796         # We're at the end.
797         unless ( @$crumbs ) {
798             $self->reset;
799             return;
800         }
801
802         my $iterator = $crumbs->[-1];
803
804         # This level is done.
805         if ( $iterator->at_end ) {
806             pop @$crumbs;
807             redo FIND_NEXT_KEY;
808         }
809
810         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
811             # If we don't have any more, it will be caught at the
812             # prior check.
813             if ( my $next = $iterator->get_next_iterator ) {
814                 push @$crumbs, $next;
815             }
816             redo FIND_NEXT_KEY;
817         }
818
819         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
820             DBM::Deep->_throw_error(
821                 "Should have a bucketlist iterator here - instead have $iterator"
822             );
823         }
824
825         # At this point, we have a BucketList iterator
826         my $key = $iterator->get_next_key;
827         if ( defined $key ) {
828             return $key;
829         }
830
831         # We hit the end of the bucketlist iterator, so redo
832         redo FIND_NEXT_KEY;
833     }
834
835     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
836 }
837
838 package DBM::Deep::Iterator::Index;
839
840 sub new {
841     my $self = bless $_[1] => $_[0];
842     $self->{curr_index} = 0;
843     return $self;
844 }
845
846 sub at_end {
847     my $self = shift;
848     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
849 }
850
851 sub get_next_iterator {
852     my $self = shift;
853
854     my $loc;
855     while ( !$loc ) {
856         return if $self->at_end;
857         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
858     }
859
860     return $self->{iterator}->get_sector_iterator( $loc );
861 }
862
863 package DBM::Deep::Iterator::BucketList;
864
865 sub new {
866     my $self = bless $_[1] => $_[0];
867     $self->{curr_index} = 0;
868     return $self;
869 }
870
871 sub at_end {
872     my $self = shift;
873     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
874 }
875
876 sub get_next_key {
877     my $self = shift;
878
879     return if $self->at_end;
880
881     my $data_loc = $self->{sector}->get_data_location_for({
882         allow_head => 1,
883         idx => $self->{curr_index}++,
884     }) or return;
885
886     my $key_sector = $self->{sector}->get_key_for( $self->{curr_index} - 1 );
887
888     #XXX Is this check necessary now?
889     return unless $key_sector;
890
891     return $key_sector->data;
892 }
893
894 package DBM::Deep::Engine::Sector;
895
896 sub new {
897     my $self = bless $_[1], $_[0];
898     Scalar::Util::weaken( $self->{engine} );
899     $self->_init;
900     return $self;
901 }
902 sub _init {}
903 sub clone { die "Must be implemented in the child class" }
904
905 sub engine { $_[0]{engine} }
906 sub offset { $_[0]{offset} }
907 sub type   { $_[0]{type} }
908
909 sub base_size {
910    my $self = shift;
911    return $self->engine->SIG_SIZE + $self->engine->STALE_SIZE;
912 }
913
914 sub free {
915     my $self = shift;
916
917     my $e = $self->engine;
918
919     $e->storage->print_at( $self->offset, $e->SIG_FREE );
920     # Skip staleness counter
921     $e->storage->print_at( $self->offset + $self->base_size,
922         chr(0) x ($self->size - $self->base_size),
923     );
924
925     my $free_meth = $self->free_meth;
926     $e->$free_meth( $self->offset, $self->size );
927
928     return;
929 }
930
931 package DBM::Deep::Engine::Sector::Data;
932
933 our @ISA = qw( DBM::Deep::Engine::Sector );
934
935 # This is in bytes
936 sub size { return 256 }
937 sub free_meth { return '_add_free_data_sector' }
938
939 sub clone {
940     my $self = shift;
941     return ref($self)->new({
942         engine => $self->engine,
943         data   => $self->data,
944         type   => $self->type,
945     });
946 }
947
948 package DBM::Deep::Engine::Sector::Scalar;
949
950 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
951
952 sub free {
953     my $self = shift;
954
955     my $chain_loc = $self->chain_loc;
956
957     $self->SUPER::free();
958
959     if ( $chain_loc ) {
960         $self->engine->_load_sector( $chain_loc )->free;
961     }
962
963     return;
964 }
965
966 sub type { $_[0]{engine}->SIG_DATA }
967 sub _init {
968     my $self = shift;
969
970     my $engine = $self->engine;
971
972     unless ( $self->offset ) {
973         my $data_section = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
974
975         $self->{offset} = $engine->_request_data_sector( $self->size );
976
977         my $data = delete $self->{data};
978         my $dlen = length $data;
979         my $continue = 1;
980         my $curr_offset = $self->offset;
981         while ( $continue ) {
982
983             my $next_offset = 0;
984
985             my ($leftover, $this_len, $chunk);
986             if ( $dlen > $data_section ) {
987                 $leftover = 0;
988                 $this_len = $data_section;
989                 $chunk = substr( $data, 0, $this_len );
990
991                 $dlen -= $data_section;
992                 $next_offset = $engine->_request_data_sector( $self->size );
993                 $data = substr( $data, $this_len );
994             }
995             else {
996                 $leftover = $data_section - $dlen;
997                 $this_len = $dlen;
998                 $chunk = $data;
999
1000                 $continue = 0;
1001             }
1002
1003             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1004             # Skip staleness
1005             $engine->storage->print_at( $curr_offset + $self->base_size,
1006                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1007                 pack( $StP{1}, $this_len ),                      # Data length
1008                 $chunk,                                          # Data to be stored in this sector
1009                 chr(0) x $leftover,                              # Zero-fill the rest
1010             );
1011
1012             $curr_offset = $next_offset;
1013         }
1014
1015         return;
1016     }
1017 }
1018
1019 sub data_length {
1020     my $self = shift;
1021
1022     my $buffer = $self->engine->storage->read_at(
1023         $self->offset + $self->base_size + $self->engine->byte_size, 1
1024     );
1025
1026     return unpack( $StP{1}, $buffer );
1027 }
1028
1029 sub chain_loc {
1030     my $self = shift;
1031     my $chain_loc = $self->engine->storage->read_at(
1032         $self->offset + $self->base_size, $self->engine->byte_size,
1033     );
1034     return unpack( $StP{$self->engine->byte_size}, $chain_loc );
1035 }
1036
1037 sub data {
1038     my $self = shift;
1039
1040     my $data;
1041     while ( 1 ) {
1042         my $chain_loc = $self->chain_loc;
1043
1044         $data .= $self->engine->storage->read_at(
1045             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1046         );
1047
1048         last unless $chain_loc;
1049
1050         $self = $self->engine->_load_sector( $chain_loc );
1051     }
1052
1053     return $data;
1054 }
1055
1056 package DBM::Deep::Engine::Sector::Null;
1057
1058 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1059
1060 sub type { $_[0]{engine}->SIG_NULL }
1061 sub data_length { 0 }
1062 sub data { return }
1063
1064 sub _init {
1065     my $self = shift;
1066
1067     my $engine = $self->engine;
1068
1069     unless ( $self->offset ) {
1070         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1071
1072         $self->{offset} = $engine->_request_data_sector( $self->size );
1073         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1074         # Skip staleness counter
1075         $engine->storage->print_at( $self->offset + $self->base_size,
1076             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1077             pack( $StP{1}, $self->data_length ),  # Data length
1078             chr(0) x $leftover,                   # Zero-fill the rest
1079         );
1080
1081         return;
1082     }
1083 }
1084
1085 package DBM::Deep::Engine::Sector::Reference;
1086
1087 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1088
1089 sub _init {
1090     my $self = shift;
1091
1092     my $e = $self->engine;
1093
1094     unless ( $self->offset ) {
1095         my $classname = Scalar::Util::blessed( delete $self->{data} );
1096         my $leftover = $self->size - $self->base_size - 2 * $e->byte_size;
1097
1098         my $class_offset = 0;
1099         if ( defined $classname ) {
1100             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1101                 engine => $e,
1102                 data   => $classname,
1103             });
1104             $class_offset = $class_sector->offset;
1105         }
1106
1107         $self->{offset} = $e->_request_data_sector( $self->size );
1108         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1109         # Skip staleness counter
1110         $e->storage->print_at( $self->offset + $self->base_size,
1111             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1112             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1113             chr(0) x $leftover,                         # Zero-fill the rest
1114         );
1115     }
1116     else {
1117         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1118     }
1119
1120     $self->{staleness} = unpack(
1121         $StP{$e->STALE_SIZE},
1122         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $e->STALE_SIZE ),
1123     );
1124
1125     return;
1126 }
1127
1128 sub free {
1129     my $self = shift;
1130
1131     my $blist_loc = $self->get_blist_loc;
1132     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1133
1134     my $class_loc = $self->get_class_offset;
1135     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1136
1137     $self->SUPER::free();
1138 }
1139
1140 sub staleness { $_[0]{staleness} }
1141
1142 sub get_data_for {
1143     my $self = shift;
1144     my ($args) = @_;
1145
1146     # Assume that the head is not allowed unless otherwise specified.
1147     $args->{allow_head} = 0 unless exists $args->{allow_head};
1148
1149     # Assume we don't create a new blist location unless otherwise specified.
1150     $args->{create} = 0 unless exists $args->{create};
1151
1152     my $blist = $self->get_bucket_list({
1153         key_md5 => $args->{key_md5},
1154         key => $args->{key},
1155         create  => $args->{create},
1156     });
1157     return unless $blist && $blist->{found};
1158
1159     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1160     # is whether or not this transaction has this key. That's part of the next
1161     # function call.
1162     my $location = $blist->get_data_location_for({
1163         allow_head => $args->{allow_head},
1164     }) or return;
1165
1166     return $self->engine->_load_sector( $location );
1167 }
1168
1169 sub write_data {
1170     my $self = shift;
1171     my ($args) = @_;
1172
1173     my $blist = $self->get_bucket_list({
1174         key_md5 => $args->{key_md5},
1175         key => $args->{key},
1176         create  => 1,
1177     }) or die "How did write_data fail (no blist)?!\n";
1178
1179     # Handle any transactional bookkeeping.
1180     if ( $self->engine->trans_id ) {
1181         if ( ! $blist->has_md5 ) {
1182             $blist->mark_deleted({
1183                 trans_id => 0,
1184             });
1185         }
1186     }
1187     else {
1188         my @trans_ids = $self->engine->get_running_txn_ids;
1189         if ( $blist->has_md5 ) {
1190             if ( @trans_ids ) {
1191                 my $old_value = $blist->get_data_for;
1192                 foreach my $other_trans_id ( @trans_ids ) {
1193                     next if $blist->get_data_location_for({
1194                         trans_id   => $other_trans_id,
1195                         allow_head => 0,
1196                     });
1197                     $blist->write_md5({
1198                         trans_id => $other_trans_id,
1199                         key      => $args->{key},
1200                         key_md5  => $args->{key_md5},
1201                         value    => $old_value->clone,
1202                     });
1203                 }
1204             }
1205         }
1206         else {
1207             if ( @trans_ids ) {
1208                 foreach my $other_trans_id ( @trans_ids ) {
1209                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1210                     $blist->mark_deleted({
1211                         trans_id => $other_trans_id,
1212                     });
1213                 }
1214             }
1215         }
1216     }
1217
1218     #XXX Is this safe to do transactionally?
1219     # Free the place we're about to write to.
1220     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1221         $blist->get_data_for({ allow_head => 0 })->free;
1222     }
1223
1224     $blist->write_md5({
1225         key      => $args->{key},
1226         key_md5  => $args->{key_md5},
1227         value    => $args->{value},
1228     });
1229 }
1230
1231 sub delete_key {
1232     my $self = shift;
1233     my ($args) = @_;
1234
1235     # XXX What should happen if this fails?
1236     my $blist = $self->get_bucket_list({
1237         key_md5 => $args->{key_md5},
1238     }) or die "How did delete_key fail (no blist)?!\n";
1239
1240     # Save the location so that we can free the data
1241     my $location = $blist->get_data_location_for({
1242         allow_head => 0,
1243     });
1244     my $old_value = $location && $self->engine->_load_sector( $location );
1245
1246     if ( $self->engine->trans_id == 0 ) {
1247         my @trans_ids = $self->engine->get_running_txn_ids;
1248         if ( @trans_ids ) {
1249             foreach my $other_trans_id ( @trans_ids ) {
1250                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1251                 $blist->write_md5({
1252                     trans_id => $other_trans_id,
1253                     key      => $args->{key},
1254                     key_md5  => $args->{key_md5},
1255                     value    => $old_value->clone,
1256                 });
1257             }
1258         }
1259     }
1260
1261     $blist->mark_deleted( $args );
1262
1263     my $data;
1264     if ( $old_value ) {
1265         $data = $old_value->data;
1266         $old_value->free;
1267     }
1268
1269     return $data;
1270 }
1271
1272 sub get_blist_loc {
1273     my $self = shift;
1274
1275     my $e = $self->engine;
1276     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1277     return unpack( $StP{$e->byte_size}, $blist_loc );
1278 }
1279
1280 sub get_bucket_list {
1281     my $self = shift;
1282     my ($args) = @_;
1283     $args ||= {};
1284
1285     # XXX Add in check here for recycling?
1286
1287     my $engine = $self->engine;
1288
1289     my $blist_loc = $self->get_blist_loc;
1290
1291     # There's no index or blist yet
1292     unless ( $blist_loc ) {
1293         return unless $args->{create};
1294
1295         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1296             engine  => $engine,
1297             key_md5 => $args->{key_md5},
1298         });
1299
1300         $engine->storage->print_at( $self->offset + $self->base_size,
1301             pack( $StP{$engine->byte_size}, $blist->offset ),
1302         );
1303
1304         return $blist;
1305     }
1306
1307     # Add searching here through the index layers, if any
1308     my $sector = $engine->_load_sector( $blist_loc )
1309         or die "Cannot read sector at $blist_loc in get_bucket_list()";
1310     my $i = 0;
1311     my $last_sector = undef;
1312     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1313         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1314         $last_sector = $sector;
1315         if ( $blist_loc ) {
1316             $sector = $engine->_load_sector( $blist_loc )
1317                 or die "Cannot read sector at $blist_loc in get_bucket_list()";
1318         }
1319         else {
1320             $sector = undef;
1321             last;
1322         }
1323     }
1324
1325     # This means we went through the Index sector(s) and found an empty slot
1326     unless ( $sector ) {
1327         return unless $args->{create};
1328
1329         die "No last_sector when attempting to build a new entry"
1330             unless $last_sector;
1331
1332         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1333             engine  => $engine,
1334             key_md5 => $args->{key_md5},
1335         });
1336
1337         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1338
1339         return $blist;
1340     }
1341
1342     $sector->find_md5( $args->{key_md5} );
1343
1344     # See whether or not we need to reindex the bucketlist
1345     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {
1346         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1347             engine => $engine,
1348         });
1349
1350         my %blist_cache;
1351         foreach my $md5 ( $sector->chopped_up ) {
1352             my $idx = ord( substr( $md5, $i, 1 ) );
1353
1354             # XXX This is inefficient
1355             my $blist = $blist_cache{$idx}
1356                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1357                     engine => $engine,
1358                 });
1359
1360             $new_index->set_entry( $idx => $blist->offset );
1361
1362             $blist->write_at_next_open( $md5 );
1363         }
1364
1365         # Handle the new item separately.
1366         {
1367             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1368             my $blist = $blist_cache{$idx}
1369                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1370                     engine => $engine,
1371                 });
1372
1373             $new_index->set_entry( $idx => $blist->offset );
1374
1375             #XXX THIS IS HACKY!
1376             $blist->find_md5( $args->{key_md5} );
1377             $blist->write_md5({
1378                 key     => $args->{key},
1379                 key_md5 => $args->{key_md5},
1380                 value   => DBM::Deep::Engine::Sector::Null->new({
1381                     engine => $engine,
1382                     data   => undef,
1383                 }),
1384             });
1385         }
1386
1387         if ( $last_sector ) {
1388             $last_sector->set_entry(
1389                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1390                 $new_index->offset,
1391             );
1392         } else {
1393             $engine->storage->print_at( $self->offset + $self->base_size,
1394                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1395             );
1396         }
1397
1398         $sector->free;
1399
1400         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1401         $sector->find_md5( $args->{key_md5} );
1402     }
1403
1404     return $sector;
1405 }
1406
1407 sub get_class_offset {
1408     my $self = shift;
1409
1410     my $e = $self->engine;
1411     return unpack(
1412         $StP{$e->byte_size},
1413         $e->storage->read_at(
1414             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1415         ),
1416     );
1417 }
1418
1419 sub get_classname {
1420     my $self = shift;
1421
1422     my $class_offset = $self->get_class_offset;
1423
1424     return unless $class_offset;
1425
1426     return $self->engine->_load_sector( $class_offset )->data;
1427 }
1428
1429 sub data {
1430     my $self = shift;
1431
1432     my $new_obj = DBM::Deep->new({
1433         type        => $self->type,
1434         base_offset => $self->offset,
1435         staleness   => $self->staleness,
1436         storage     => $self->engine->storage,
1437         engine      => $self->engine,
1438     });
1439
1440     if ( $self->engine->storage->{autobless} ) {
1441         my $classname = $self->get_classname;
1442         if ( defined $classname ) {
1443             bless $new_obj, $classname;
1444         }
1445     }
1446
1447     return $new_obj;
1448 }
1449
1450 package DBM::Deep::Engine::Sector::BucketList;
1451
1452 our @ISA = qw( DBM::Deep::Engine::Sector );
1453
1454 sub _init {
1455     my $self = shift;
1456
1457     my $engine = $self->engine;
1458
1459     unless ( $self->offset ) {
1460         my $leftover = $self->size - $self->base_size;
1461
1462         $self->{offset} = $engine->_request_blist_sector( $self->size );
1463         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1464         # Skip staleness counter
1465         $engine->storage->print_at( $self->offset + $self->base_size,
1466             chr(0) x $leftover, # Zero-fill the data
1467         );
1468     }
1469
1470     if ( $self->{key_md5} ) {
1471         $self->find_md5;
1472     }
1473
1474     return $self;
1475 }
1476
1477 sub size {
1478     my $self = shift;
1479     unless ( $self->{size} ) {
1480         my $e = $self->engine;
1481         # Base + numbuckets * bucketsize
1482         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1483     }
1484     return $self->{size};
1485 }
1486
1487 sub free_meth { return '_add_free_blist_sector' }
1488
1489 sub bucket_size {
1490     my $self = shift;
1491     unless ( $self->{bucket_size} ) {
1492         my $e = $self->engine;
1493         # Key + head (location) + transactions (location + staleness-counter)
1494         my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 );
1495         $self->{bucket_size} = $e->hash_size + $location_size;
1496     }
1497     return $self->{bucket_size};
1498 }
1499
1500 sub chopped_up {
1501     my $self = shift;
1502
1503     my $e = $self->engine;
1504
1505     my @buckets;
1506     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1507         my $md5 = $e->storage->read_at(
1508             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1509         );
1510
1511         last if $md5 eq $e->blank_md5;
1512
1513         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
1514         push @buckets, $md5 . $rest;
1515     }
1516
1517     return @buckets;
1518 }
1519
1520 sub write_at_next_open {
1521     my $self = shift;
1522     my ($md5) = @_;
1523
1524     #XXX This is such a hack!
1525     $self->{_next_open} = 0 unless exists $self->{_next_open};
1526
1527     $self->engine->storage->print_at(
1528         $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size,
1529         $md5,
1530     );
1531 }
1532
1533 sub has_md5 {
1534     my $self = shift;
1535     unless ( exists $self->{found} ) {
1536         $self->find_md5;
1537     }
1538     return $self->{found};
1539 }
1540
1541 sub find_md5 {
1542     my $self = shift;
1543
1544     $self->{found} = undef;
1545     $self->{idx}   = -1;
1546
1547     if ( @_ ) {
1548         $self->{key_md5} = shift;
1549     }
1550
1551     # If we don't have an MD5, then what are we supposed to do?
1552     unless ( exists $self->{key_md5} ) {
1553         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1554     }
1555
1556     my $e = $self->engine;
1557     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1558         my $potential = $e->storage->read_at(
1559             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1560         );
1561
1562         if ( $potential eq $e->blank_md5 ) {
1563             $self->{idx} = $idx;
1564             return;
1565         }
1566
1567         if ( $potential eq $self->{key_md5} ) {
1568             $self->{found} = 1;
1569             $self->{idx} = $idx;
1570             return;
1571         }
1572     }
1573
1574     return;
1575 }
1576
1577 sub write_md5 {
1578     my $self = shift;
1579     my ($args) = @_;
1580
1581     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1582     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1583     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1584
1585     my $engine = $self->engine;
1586
1587     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1588
1589     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1590     $engine->add_entry( $args->{trans_id}, $spot );
1591
1592     unless ($self->{found}) {
1593         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1594             engine => $engine,
1595             data   => $args->{key},
1596         });
1597
1598         $engine->storage->print_at( $spot,
1599             $args->{key_md5},
1600             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1601         );
1602     }
1603
1604     my $loc = $spot
1605       + $engine->hash_size
1606       + $engine->byte_size
1607       + $args->{trans_id} * ( $engine->byte_size + 4 );
1608
1609     $engine->storage->print_at( $loc,
1610         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1611         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1612     );
1613 }
1614
1615 sub mark_deleted {
1616     my $self = shift;
1617     my ($args) = @_;
1618     $args ||= {};
1619
1620     my $engine = $self->engine;
1621
1622     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1623
1624     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1625     $engine->add_entry( $args->{trans_id}, $spot );
1626
1627     my $loc = $spot
1628       + $engine->hash_size
1629       + $engine->byte_size
1630       + $args->{trans_id} * ( $engine->byte_size + 4 );
1631
1632     $engine->storage->print_at( $loc,
1633         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1634         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1635     );
1636 }
1637
1638 sub delete_md5 {
1639     my $self = shift;
1640     my ($args) = @_;
1641
1642     my $engine = $self->engine;
1643     return undef unless $self->{found};
1644
1645     # Save the location so that we can free the data
1646     my $location = $self->get_data_location_for({
1647         allow_head => 0,
1648     });
1649     my $key_sector = $self->get_key_for;
1650
1651     #XXX This isn't going to work right and you know it! This eradicates data
1652     # that we're not ready to eradicate just yet.
1653     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1654     $engine->storage->print_at( $spot,
1655         $engine->storage->read_at(
1656             $spot + $self->bucket_size,
1657             $self->bucket_size * ( $engine->num_txns - $self->{idx} - 1 ),
1658         ),
1659         chr(0) x $self->bucket_size,
1660     );
1661
1662     $key_sector->free;
1663
1664     my $data_sector = $self->engine->_load_sector( $location );
1665     my $data = $data_sector->data;
1666     $data_sector->free;
1667
1668     return $data;
1669 }
1670
1671 sub get_data_location_for {
1672     my $self = shift;
1673     my ($args) = @_;
1674     $args ||= {};
1675
1676     $args->{allow_head} = 0 unless exists $args->{allow_head};
1677     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1678     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1679
1680     my $e = $self->engine;
1681
1682     my $spot = $self->offset + $self->base_size
1683       + $args->{idx} * $self->bucket_size
1684       + $e->hash_size
1685       + $e->byte_size
1686       + $args->{trans_id} * ( $e->byte_size + 4 );
1687
1688     my $buffer = $e->storage->read_at(
1689         $spot,
1690         $e->byte_size + 4,
1691     );
1692     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
1693
1694     # We have found an entry that is old, so get rid of it
1695     if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
1696         $e->storage->print_at(
1697             $spot,
1698             pack( $StP{$e->byte_size} . ' N', (0) x 2 ), 
1699         );
1700         $loc = 0;
1701     }
1702
1703     # If we're in a transaction and we never wrote to this location, try the
1704     # HEAD instead.
1705     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1706         return $self->get_data_location_for({
1707             trans_id   => 0,
1708             allow_head => 1,
1709             idx        => $args->{idx},
1710         });
1711     }
1712     return $loc <= 1 ? 0 : $loc;
1713 }
1714
1715 sub get_data_for {
1716     my $self = shift;
1717     my ($args) = @_;
1718     $args ||= {};
1719
1720     return unless $self->{found};
1721     my $location = $self->get_data_location_for({
1722         allow_head => $args->{allow_head},
1723     });
1724     return $self->engine->_load_sector( $location );
1725 }
1726
1727 sub get_key_for {
1728     my $self = shift;
1729     my ($idx) = @_;
1730     $idx = $self->{idx} unless defined $idx;
1731
1732     if ( $idx >= $self->engine->max_buckets ) {
1733         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
1734     }
1735
1736     my $location = $self->engine->storage->read_at(
1737         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1738         $self->engine->byte_size,
1739     );
1740     $location = unpack( $StP{$self->engine->byte_size}, $location );
1741     return unless $location;
1742     return $self->engine->_load_sector( $location );
1743 }
1744
1745 package DBM::Deep::Engine::Sector::Index;
1746
1747 our @ISA = qw( DBM::Deep::Engine::Sector );
1748
1749 sub _init {
1750     my $self = shift;
1751
1752     my $engine = $self->engine;
1753
1754     unless ( $self->offset ) {
1755         my $leftover = $self->size - $self->base_size;
1756
1757         $self->{offset} = $engine->_request_index_sector( $self->size );
1758         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
1759         # Skip staleness counter
1760         $engine->storage->print_at( $self->offset + $self->base_size,
1761             chr(0) x $leftover, # Zero-fill the rest
1762         );
1763     }
1764
1765     return $self;
1766 }
1767
1768 sub size {
1769     my $self = shift;
1770     unless ( $self->{size} ) {
1771         my $e = $self->engine;
1772         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
1773     }
1774     return $self->{size};
1775 }
1776
1777 sub free_meth { return '_add_free_index_sector' }
1778
1779 sub free {
1780     my $self = shift;
1781     my $e = $self->engine;
1782
1783     for my $i ( 0 .. $e->hash_chars - 1 ) {
1784         my $l = $self->location_for( $i ) or next;
1785         $e->_load_sector( $l )->free;
1786     }
1787
1788     $self->SUPER::free();
1789 }
1790
1791 sub _loc_for {
1792     my $self = shift;
1793     my ($idx) = @_;
1794     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
1795 }
1796
1797 sub get_entry {
1798     my $self = shift;
1799     my ($idx) = @_;
1800
1801     my $e = $self->engine;
1802
1803     die "get_entry: Out of range ($idx)"
1804         if $idx < 0 || $idx >= $e->hash_chars;
1805
1806     return unpack(
1807         $StP{$e->byte_size},
1808         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
1809     );
1810 }
1811
1812 sub set_entry {
1813     my $self = shift;
1814     my ($idx, $loc) = @_;
1815
1816     my $e = $self->engine;
1817
1818     die "set_entry: Out of range ($idx)"
1819         if $idx < 0 || $idx >= $e->hash_chars;
1820
1821     $self->engine->storage->print_at(
1822         $self->_loc_for( $idx ),
1823         pack( $StP{$e->byte_size}, $loc ),
1824     );
1825 }
1826
1827 1;
1828 __END__