First checkin of the reversion back from the failed optimization effort. I will be...
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 my $STALE_SIZE = 2;
33
34 # Please refer to the pack() documentation for further information
35 my %StP = (
36     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
37     2 => 'n', # Unsigned short in "network" (big-endian) order
38     4 => 'N', # Unsigned long in "network" (big-endian) order
39     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
40 );
41
42 ################################################################################
43
44 sub new {
45     my $class = shift;
46     my ($args) = @_;
47
48     $args->{storage} = DBM::Deep::File->new( $args )
49         unless exists $args->{storage};
50
51     my $self = bless {
52         byte_size   => 4,
53
54         digest      => undef,
55         hash_size   => 16,  # In bytes
56         hash_chars  => 256, # Number of chars the algorithm uses per byte
57         max_buckets => 16,
58         num_txns    => 1,   # The HEAD
59         trans_id    => 0,   # Default to the HEAD
60
61         data_sector_size => 64, # Size in bytes of each data sector
62
63         entries => {}, # This is the list of entries for transactions
64         storage => undef,
65     }, $class;
66
67     # Never allow byte_size to be set directly.
68     delete $args->{byte_size};
69     if ( defined $args->{pack_size} ) {
70         if ( lc $args->{pack_size} eq 'small' ) {
71             $args->{byte_size} = 2;
72         }
73         elsif ( lc $args->{pack_size} eq 'medium' ) {
74             $args->{byte_size} = 4;
75         }
76         elsif ( lc $args->{pack_size} eq 'large' ) {
77             $args->{byte_size} = 8;
78         }
79         else {
80             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
81         }
82     }
83
84     # Grab the parameters we want to use
85     foreach my $param ( keys %$self ) {
86         next unless exists $args->{$param};
87         $self->{$param} = $args->{$param};
88     }
89
90     my %validations = (
91         max_buckets      => { floor => 16, ceil => 256 },
92         num_txns         => { floor => 1,  ceil => 255 },
93         data_sector_size => { floor => 32, ceil => 256 },
94     );
95
96     while ( my ($attr, $c) = each %validations ) {
97         if (   !defined $self->{$attr}
98             || !length $self->{$attr}
99             || $self->{$attr} =~ /\D/
100             || $self->{$attr} < $c->{floor}
101         ) {
102             $self->{$attr} = '(undef)' if !defined $self->{$attr};
103             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
104             $self->{$attr} = $c->{floor};
105         }
106         elsif ( $self->{$attr} > $c->{ceil} ) {
107             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
108             $self->{$attr} = $c->{ceil};
109         }
110     }
111
112     if ( !$self->{digest} ) {
113         require Digest::MD5;
114         $self->{digest} = \&Digest::MD5::md5;
115     }
116
117     return $self;
118 }
119
120 ################################################################################
121
122 sub read_value {
123     my $self = shift;
124     my ($obj, $key) = @_;
125
126     # This will be a Reference sector
127     my $sector = $self->_load_sector( $obj->_base_offset )
128         or return;
129
130     if ( $sector->staleness != $obj->_staleness ) {
131         return;
132     }
133
134     my $key_md5 = $self->_apply_digest( $key );
135
136     my $value_sector = $sector->get_data_for({
137         key_md5    => $key_md5,
138         allow_head => 1,
139     });
140
141     unless ( $value_sector ) {
142         $value_sector = DBM::Deep::Engine::Sector::Null->new({
143             engine => $self,
144             data   => undef,
145         });
146
147         $sector->write_data({
148             key_md5 => $key_md5,
149             key     => $key,
150             value   => $value_sector,
151         });
152     }
153
154     return $value_sector->data;
155 }
156
157 sub get_classname {
158     my $self = shift;
159     my ($obj) = @_;
160
161     # This will be a Reference sector
162     my $sector = $self->_load_sector( $obj->_base_offset )
163         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
164
165     if ( $sector->staleness != $obj->_staleness ) {
166         return;
167     }
168
169     return $sector->get_classname;
170 }
171
172 sub make_reference {
173     my $self = shift;
174     my ($obj, $old_key, $new_key) = @_;
175
176     # This will be a Reference sector
177     my $sector = $self->_load_sector( $obj->_base_offset )
178         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
179
180     if ( $sector->staleness != $obj->_staleness ) {
181         return;
182     }
183
184     my $old_md5 = $self->_apply_digest( $old_key );
185
186     my $value_sector = $sector->get_data_for({
187         key_md5    => $old_md5,
188         allow_head => 1,
189     });
190
191     unless ( $value_sector ) {
192         $value_sector = DBM::Deep::Engine::Sector::Null->new({
193             engine => $self,
194             data   => undef,
195         });
196
197         $sector->write_data({
198             key_md5 => $old_md5,
199             key     => $old_key,
200             value   => $value_sector,
201         });
202     }
203
204     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
205         $sector->write_data({
206             key     => $new_key,
207             key_md5 => $self->_apply_digest( $new_key ),
208             value   => $value_sector,
209         });
210         $value_sector->increment_refcount;
211     }
212     else {
213         $sector->write_data({
214             key     => $new_key,
215             key_md5 => $self->_apply_digest( $new_key ),
216             value   => $value_sector->clone,
217         });
218     }
219 }
220
221 sub key_exists {
222     my $self = shift;
223     my ($obj, $key) = @_;
224
225     # This will be a Reference sector
226     my $sector = $self->_load_sector( $obj->_base_offset )
227         or return '';
228
229     if ( $sector->staleness != $obj->_staleness ) {
230         return '';
231     }
232
233     my $data = $sector->get_data_for({
234         key_md5    => $self->_apply_digest( $key ),
235         allow_head => 1,
236     });
237
238     # exists() returns 1 or '' for true/false.
239     return $data ? 1 : '';
240 }
241
242 sub delete_key {
243     my $self = shift;
244     my ($obj, $key) = @_;
245
246     my $sector = $self->_load_sector( $obj->_base_offset )
247         or return;
248
249     if ( $sector->staleness != $obj->_staleness ) {
250         return;
251     }
252
253     return $sector->delete_key({
254         key_md5    => $self->_apply_digest( $key ),
255         allow_head => 0,
256     });
257 }
258
259 sub write_value {
260     my $self = shift;
261     my ($obj, $key, $value) = @_;
262
263     my $r = Scalar::Util::reftype( $value ) || '';
264     {
265         last if $r eq '';
266         last if $r eq 'HASH';
267         last if $r eq 'ARRAY';
268
269         DBM::Deep->_throw_error(
270             "Storage of references of type '$r' is not supported."
271         );
272     }
273
274     # This will be a Reference sector
275     my $sector = $self->_load_sector( $obj->_base_offset )
276         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
277
278     if ( $sector->staleness != $obj->_staleness ) {
279         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
280     }
281
282     my ($class, $type);
283     if ( !defined $value ) {
284         $class = 'DBM::Deep::Engine::Sector::Null';
285     }
286     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
287         my $tmpvar;
288         if ( $r eq 'ARRAY' ) {
289             $tmpvar = tied @$value;
290         } elsif ( $r eq 'HASH' ) {
291             $tmpvar = tied %$value;
292         }
293
294         if ( $tmpvar ) {
295             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
296
297             unless ( $is_dbm_deep ) {
298                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
299             }
300
301             unless ( $tmpvar->_engine->storage == $self->storage ) {
302                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
303             }
304
305             # First, verify if we're storing the same thing to this spot. If we are, then
306             # this should be a no-op. -EJS, 2008-05-19
307             my $loc = $sector->get_data_location_for({
308                 key_md5 => $self->_apply_digest( $key ),
309                 allow_head => 1,
310             });
311
312             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
313                 return 1;
314             }
315
316             #XXX Can this use $loc?
317             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
318             $sector->write_data({
319                 key     => $key,
320                 key_md5 => $self->_apply_digest( $key ),
321                 value   => $value_sector,
322             });
323             $value_sector->increment_refcount;
324
325             return 1;
326         }
327
328         $class = 'DBM::Deep::Engine::Sector::Reference';
329         $type = substr( $r, 0, 1 );
330     }
331     else {
332         if ( tied($value) ) {
333             DBM::Deep->_throw_error( "Cannot store something that is tied." );
334         }
335         $class = 'DBM::Deep::Engine::Sector::Scalar';
336     }
337
338     # Create this after loading the reference sector in case something bad happens.
339     # This way, we won't allocate value sector(s) needlessly.
340     my $value_sector = $class->new({
341         engine => $self,
342         data   => $value,
343         type   => $type,
344     });
345
346     $sector->write_data({
347         key     => $key,
348         key_md5 => $self->_apply_digest( $key ),
349         value   => $value_sector,
350     });
351
352     # This code is to make sure we write all the values in the $value to the disk
353     # and to make sure all changes to $value after the assignment are reflected
354     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
355     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
356     # copy to a temp value.
357     if ( $r eq 'ARRAY' ) {
358         my @temp = @$value;
359         tie @$value, 'DBM::Deep', {
360             base_offset => $value_sector->offset,
361             staleness   => $value_sector->staleness,
362             storage     => $self->storage,
363             engine      => $self,
364         };
365         @$value = @temp;
366         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
367     }
368     elsif ( $r eq 'HASH' ) {
369         my %temp = %$value;
370         tie %$value, 'DBM::Deep', {
371             base_offset => $value_sector->offset,
372             staleness   => $value_sector->staleness,
373             storage     => $self->storage,
374             engine      => $self,
375         };
376
377         %$value = %temp;
378         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
379     }
380
381     return 1;
382 }
383
384 # XXX Add staleness here
385 sub get_next_key {
386     my $self = shift;
387     my ($obj, $prev_key) = @_;
388
389     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
390     unless ( $prev_key ) {
391         $obj->{iterator} = DBM::Deep::Iterator->new({
392             base_offset => $obj->_base_offset,
393             engine      => $self,
394         });
395     }
396
397     return $obj->{iterator}->get_next_key( $obj );
398 }
399
400 ################################################################################
401
402 sub setup_fh {
403     my $self = shift;
404     my ($obj) = @_;
405
406     # We're opening the file.
407     unless ( $obj->_base_offset ) {
408         my $bytes_read = $self->_read_file_header;
409
410         # Creating a new file
411         unless ( $bytes_read ) {
412             $self->_write_file_header;
413
414             # 1) Create Array/Hash entry
415             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
416                 engine => $self,
417                 type   => $obj->_type,
418             });
419             $obj->{base_offset} = $initial_reference->offset;
420             $obj->{staleness} = $initial_reference->staleness;
421
422             $self->storage->flush;
423         }
424         # Reading from an existing file
425         else {
426             $obj->{base_offset} = $bytes_read;
427             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
428                 engine => $self,
429                 offset => $obj->_base_offset,
430             });
431             unless ( $initial_reference ) {
432                 DBM::Deep->_throw_error("Corrupted file, no master index record");
433             }
434
435             unless ($obj->_type eq $initial_reference->type) {
436                 DBM::Deep->_throw_error("File type mismatch");
437             }
438
439             $obj->{staleness} = $initial_reference->staleness;
440         }
441     }
442
443     return 1;
444 }
445
446 sub begin_work {
447     my $self = shift;
448     my ($obj) = @_;
449
450     if ( $self->trans_id ) {
451         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
452     }
453
454     my @slots = $self->read_txn_slots;
455     my $found;
456     for my $i ( 0 .. $#slots ) {
457         next if $slots[$i];
458
459         $slots[$i] = 1;
460         $self->set_trans_id( $i + 1 );
461         $found = 1;
462         last;
463     }
464     unless ( $found ) {
465         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
466     }
467     $self->write_txn_slots( @slots );
468
469     if ( !$self->trans_id ) {
470         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
471     }
472
473     return;
474 }
475
476 sub rollback {
477     my $self = shift;
478     my ($obj) = @_;
479
480     if ( !$self->trans_id ) {
481         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
482     }
483
484     # Each entry is the file location for a bucket that has a modification for
485     # this transaction. The entries need to be expunged.
486     foreach my $entry (@{ $self->get_entries } ) {
487         # Remove the entry here
488         my $read_loc = $entry
489           + $self->hash_size
490           + $self->byte_size
491           + $self->byte_size
492           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
493
494         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
495         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
496         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
497
498         if ( $data_loc > 1 ) {
499             $self->_load_sector( $data_loc )->free;
500         }
501     }
502
503     $self->clear_entries;
504
505     my @slots = $self->read_txn_slots;
506     $slots[$self->trans_id-1] = 0;
507     $self->write_txn_slots( @slots );
508     $self->inc_txn_staleness_counter( $self->trans_id );
509     $self->set_trans_id( 0 );
510
511     return 1;
512 }
513
514 sub commit {
515     my $self = shift;
516     my ($obj) = @_;
517
518     if ( !$self->trans_id ) {
519         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
520     }
521
522     foreach my $entry (@{ $self->get_entries } ) {
523         # Overwrite the entry in head with the entry in trans_id
524         my $base = $entry
525           + $self->hash_size
526           + $self->byte_size;
527
528         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
529         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
530
531         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
532         my $trans_loc = $self->storage->read_at(
533             $spot, $self->byte_size,
534         );
535
536         $self->storage->print_at( $base, $trans_loc );
537         $self->storage->print_at(
538             $spot,
539             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
540         );
541
542         if ( $head_loc > 1 ) {
543             $self->_load_sector( $head_loc )->free;
544         }
545     }
546
547     $self->clear_entries;
548
549     my @slots = $self->read_txn_slots;
550     $slots[$self->trans_id-1] = 0;
551     $self->write_txn_slots( @slots );
552     $self->inc_txn_staleness_counter( $self->trans_id );
553     $self->set_trans_id( 0 );
554
555     return 1;
556 }
557
558 sub read_txn_slots {
559     my $self = shift;
560     my $bl = $self->txn_bitfield_len;
561     my $num_bits = $bl * 8;
562     return split '', unpack( 'b'.$num_bits,
563         $self->storage->read_at(
564             $self->trans_loc, $bl,
565         )
566     );
567 }
568
569 sub write_txn_slots {
570     my $self = shift;
571     my $num_bits = $self->txn_bitfield_len * 8;
572     $self->storage->print_at( $self->trans_loc,
573         pack( 'b'.$num_bits, join('', @_) ),
574     );
575 }
576
577 sub get_running_txn_ids {
578     my $self = shift;
579     my @transactions = $self->read_txn_slots;
580     my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
581 }
582
583 sub get_txn_staleness_counter {
584     my $self = shift;
585     my ($trans_id) = @_;
586
587     # Hardcode staleness of 0 for the HEAD
588     return 0 unless $trans_id;
589
590     return unpack( $StP{$STALE_SIZE},
591         $self->storage->read_at(
592             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
593             $STALE_SIZE,
594         )
595     );
596 }
597
598 sub inc_txn_staleness_counter {
599     my $self = shift;
600     my ($trans_id) = @_;
601
602     # Hardcode staleness of 0 for the HEAD
603     return 0 unless $trans_id;
604
605     $self->storage->print_at(
606         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
607         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
608     );
609 }
610
611 sub get_entries {
612     my $self = shift;
613     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
614 }
615
616 sub add_entry {
617     my $self = shift;
618     my ($trans_id, $loc) = @_;
619
620     $self->{entries}{$trans_id} ||= {};
621     $self->{entries}{$trans_id}{$loc} = undef;
622 }
623
624 # If the buckets are being relocated because of a reindexing, the entries
625 # mechanism needs to be made aware of it.
626 sub reindex_entry {
627     my $self = shift;
628     my ($old_loc, $new_loc) = @_;
629
630     TRANS:
631     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
632         if ( exists $locs->{$old_loc} ) {
633             delete $locs->{$old_loc};
634             $locs->{$new_loc} = undef;
635             next TRANS;
636         }
637     }
638 }
639
640 sub clear_entries {
641     my $self = shift;
642     delete $self->{entries}{$self->trans_id};
643 }
644
645 ################################################################################
646
647 {
648     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
649     my $this_file_version = 3;
650
651     sub _write_file_header {
652         my $self = shift;
653
654         my $nt = $self->num_txns;
655         my $bl = $self->txn_bitfield_len;
656
657         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
658
659         my $loc = $self->storage->request_space( $header_fixed + $header_var );
660
661         $self->storage->print_at( $loc,
662             SIG_FILE,
663             SIG_HEADER,
664             pack('N', $this_file_version), # At this point, we're at 9 bytes
665             pack('N', $header_var),        # header size
666             # --- Above is $header_fixed. Below is $header_var
667             pack('C', $self->byte_size),
668
669             # These shenanigans are to allow a 256 within a C
670             pack('C', $self->max_buckets - 1),
671             pack('C', $self->data_sector_size - 1),
672
673             pack('C', $nt),
674             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
675             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
676             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
677             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
678             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
679         );
680
681         #XXX Set these less fragilely
682         $self->set_trans_loc( $header_fixed + 4 );
683         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
684
685         return;
686     }
687
688     sub _read_file_header {
689         my $self = shift;
690
691         my $buffer = $self->storage->read_at( 0, $header_fixed );
692         return unless length($buffer);
693
694         my ($file_signature, $sig_header, $file_version, $size) = unpack(
695             'A4 A N N', $buffer
696         );
697
698         unless ( $file_signature eq SIG_FILE ) {
699             $self->storage->close;
700             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
701         }
702
703         unless ( $sig_header eq SIG_HEADER ) {
704             $self->storage->close;
705             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
706         }
707
708         unless ( $file_version == $this_file_version ) {
709             $self->storage->close;
710             DBM::Deep->_throw_error(
711                 "Wrong file version found - " .  $file_version .
712                 " - expected " . $this_file_version
713             );
714         }
715
716         my $buffer2 = $self->storage->read_at( undef, $size );
717         my @values = unpack( 'C C C C', $buffer2 );
718
719         if ( @values != 4 || grep { !defined } @values ) {
720             $self->storage->close;
721             DBM::Deep->_throw_error("Corrupted file - bad header");
722         }
723
724         #XXX Add warnings if values weren't set right
725         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
726
727         # These shenangians are to allow a 256 within a C
728         $self->{max_buckets} += 1;
729         $self->{data_sector_size} += 1;
730
731         my $bl = $self->txn_bitfield_len;
732
733         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
734         unless ( $size == $header_var ) {
735             $self->storage->close;
736             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
737         }
738
739         $self->set_trans_loc( $header_fixed + scalar(@values) );
740         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
741
742         return length($buffer) + length($buffer2);
743     }
744 }
745
746 sub _load_sector {
747     my $self = shift;
748     my ($offset) = @_;
749
750     # Add a catch for offset of 0 or 1
751     return if !$offset || $offset <= 1;
752
753     my $type = $self->storage->read_at( $offset, 1 );
754     return if $type eq chr(0);
755
756     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
757         return DBM::Deep::Engine::Sector::Reference->new({
758             engine => $self,
759             type   => $type,
760             offset => $offset,
761         });
762     }
763     # XXX Don't we need key_md5 here?
764     elsif ( $type eq $self->SIG_BLIST ) {
765         return DBM::Deep::Engine::Sector::BucketList->new({
766             engine => $self,
767             type   => $type,
768             offset => $offset,
769         });
770     }
771     elsif ( $type eq $self->SIG_INDEX ) {
772         return DBM::Deep::Engine::Sector::Index->new({
773             engine => $self,
774             type   => $type,
775             offset => $offset,
776         });
777     }
778     elsif ( $type eq $self->SIG_NULL ) {
779         return DBM::Deep::Engine::Sector::Null->new({
780             engine => $self,
781             type   => $type,
782             offset => $offset,
783         });
784     }
785     elsif ( $type eq $self->SIG_DATA ) {
786         return DBM::Deep::Engine::Sector::Scalar->new({
787             engine => $self,
788             type   => $type,
789             offset => $offset,
790         });
791     }
792     # This was deleted from under us, so just return and let the caller figure it out.
793     elsif ( $type eq $self->SIG_FREE ) {
794         return;
795     }
796
797     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
798 }
799
800 sub _apply_digest {
801     my $self = shift;
802     return $self->{digest}->(@_);
803 }
804
805 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
806 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
807 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
808
809 sub _add_free_sector {
810     my $self = shift;
811     my ($multiple, $offset, $size) = @_;
812
813     my $chains_offset = $multiple * $self->byte_size;
814
815     my $storage = $self->storage;
816
817     # Increment staleness.
818     # XXX Can this increment+modulo be done by "&= 0x1" ?
819     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
820     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
821     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
822
823     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
824
825     $storage->print_at( $self->chains_loc + $chains_offset,
826         pack( $StP{$self->byte_size}, $offset ),
827     );
828
829     # Record the old head in the new sector after the signature and staleness counter
830     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
831 }
832
833 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
834 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
835 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
836
837 sub _request_sector {
838     my $self = shift;
839     my ($multiple, $size) = @_;
840
841     my $chains_offset = $multiple * $self->byte_size;
842
843     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
844     my $loc = unpack( $StP{$self->byte_size}, $old_head );
845
846     # We don't have any free sectors of the right size, so allocate a new one.
847     unless ( $loc ) {
848         my $offset = $self->storage->request_space( $size );
849
850         # Zero out the new sector. This also guarantees correct increases
851         # in the filesize.
852         $self->storage->print_at( $offset, chr(0) x $size );
853
854         return $offset;
855     }
856
857     # Read the new head after the signature and the staleness counter
858     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
859     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
860     $self->storage->print_at(
861         $loc + SIG_SIZE + $STALE_SIZE,
862         pack( $StP{$self->byte_size}, 0 ),
863     );
864
865     return $loc;
866 }
867
868 ################################################################################
869
870 sub flush {
871     my $self = shift;
872
873 #    my $sectors = $self->dirty_sectors;
874 #    for my $offset (sort { $a <=> $b } keys %{ $sectors }) {
875 #        $self->storage->print_at( $offset, $self->sector_cache->{$offset} );
876 #    }
877
878     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
879     # -RobK, 2008-06-26
880     $self->storage->flush;
881
882 #    $self->clear_dirty_sectors;
883
884 #    $self->clear_sector_cache;
885 }
886
887 sub lock_exclusive {
888     my $self = shift;
889     my ($obj) = @_;
890     return $self->storage->lock_exclusive( $obj );
891 }
892
893 sub lock_shared {
894     my $self = shift;
895     my ($obj) = @_;
896     return $self->storage->lock_shared( $obj );
897 }
898
899 sub unlock {
900     my $self = shift;
901     my ($obj) = @_;
902
903     my $rv = $self->storage->unlock( $obj );
904
905     $self->flush if $rv;
906
907     return $rv;
908 }
909
910 ################################################################################
911
912 sub storage     { $_[0]{storage} }
913 sub byte_size   { $_[0]{byte_size} }
914 sub hash_size   { $_[0]{hash_size} }
915 sub hash_chars  { $_[0]{hash_chars} }
916 sub num_txns    { $_[0]{num_txns} }
917 sub max_buckets { $_[0]{max_buckets} }
918 sub blank_md5   { chr(0) x $_[0]->hash_size }
919 sub data_sector_size { $_[0]{data_sector_size} }
920
921 # This is a calculated value
922 sub txn_bitfield_len {
923     my $self = shift;
924     unless ( exists $self->{txn_bitfield_len} ) {
925         my $temp = ($self->num_txns) / 8;
926         if ( $temp > int( $temp ) ) {
927             $temp = int( $temp ) + 1;
928         }
929         $self->{txn_bitfield_len} = $temp;
930     }
931     return $self->{txn_bitfield_len};
932 }
933
934 sub trans_id     { $_[0]{trans_id} }
935 sub set_trans_id { $_[0]{trans_id} = $_[1] }
936
937 sub trans_loc     { $_[0]{trans_loc} }
938 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
939
940 sub chains_loc     { $_[0]{chains_loc} }
941 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
942
943 sub cache       { $_[0]{cache} ||= {} }
944 sub clear_cache { %{$_[0]->cache} = () }
945
946 sub _dump_file {
947     my $self = shift;
948
949     # Read the header
950     my $spot = $self->_read_file_header();
951
952     my %types = (
953         0 => 'B',
954         1 => 'D',
955         2 => 'I',
956     );
957
958     my %sizes = (
959         'D' => $self->data_sector_size,
960         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
961         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
962     );
963
964     my $return = "";
965
966     # Header values
967     $return .= "NumTxns: " . $self->num_txns . $/;
968
969     # Read the free sector chains
970     my %sectors;
971     foreach my $multiple ( 0 .. 2 ) {
972         $return .= "Chains($types{$multiple}):";
973         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
974         while ( 1 ) {
975             my $loc = unpack(
976                 $StP{$self->byte_size},
977                 $self->storage->read_at( $old_loc, $self->byte_size ),
978             );
979
980             # We're now out of free sectors of this kind.
981             unless ( $loc ) {
982                 last;
983             }
984
985             $sectors{ $types{$multiple} }{ $loc } = undef;
986             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
987             $return .= " $loc";
988         }
989         $return .= $/;
990     }
991
992     SECTOR:
993     while ( $spot < $self->storage->{end} ) {
994         # Read each sector in order.
995         my $sector = $self->_load_sector( $spot );
996         if ( !$sector ) {
997             # Find it in the free-sectors that were found already
998             foreach my $type ( keys %sectors ) {
999                 if ( exists $sectors{$type}{$spot} ) {
1000                     my $size = $sizes{$type};
1001                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1002                     $spot += $size;
1003                     next SECTOR;
1004                 }
1005             }
1006
1007             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1008         }
1009         else {
1010             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1011             if ( $sector->type eq 'D' ) {
1012                 $return .= ' ' . $sector->data;
1013             }
1014             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1015                 $return .= ' REF: ' . $sector->get_refcount;
1016             }
1017             elsif ( $sector->type eq 'B' ) {
1018                 foreach my $bucket ( $sector->chopped_up ) {
1019                     $return .= "\n    ";
1020                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1021                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1022                     );
1023                     my $l = unpack( $StP{$self->byte_size},
1024                         substr( $bucket->[-1],
1025                             $self->hash_size + $self->byte_size,
1026                             $self->byte_size,
1027                         ),
1028                     );
1029                     $return .= sprintf " %08d", $l;
1030                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1031                         my $l = unpack( $StP{$self->byte_size},
1032                             substr( $bucket->[-1],
1033                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1034                                 $self->byte_size,
1035                             ),
1036                         );
1037                         $return .= sprintf " %08d", $l;
1038                     }
1039                 }
1040             }
1041             $return .= $/;
1042
1043             $spot += $sector->size;
1044         }
1045     }
1046
1047     return $return;
1048 }
1049
1050 ################################################################################
1051
1052 package DBM::Deep::Iterator;
1053
1054 sub new {
1055     my $class = shift;
1056     my ($args) = @_;
1057
1058     my $self = bless {
1059         breadcrumbs => [],
1060         engine      => $args->{engine},
1061         base_offset => $args->{base_offset},
1062     }, $class;
1063
1064     Scalar::Util::weaken( $self->{engine} );
1065
1066     return $self;
1067 }
1068
1069 sub reset { $_[0]{breadcrumbs} = [] }
1070
1071 sub get_sector_iterator {
1072     my $self = shift;
1073     my ($loc) = @_;
1074
1075     my $sector = $self->{engine}->_load_sector( $loc )
1076         or return;
1077
1078     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1079         return DBM::Deep::Iterator::Index->new({
1080             iterator => $self,
1081             sector   => $sector,
1082         });
1083     }
1084     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
1085         return DBM::Deep::Iterator::BucketList->new({
1086             iterator => $self,
1087             sector   => $sector,
1088         });
1089     }
1090
1091     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
1092 }
1093
1094 sub get_next_key {
1095     my $self = shift;
1096     my ($obj) = @_;
1097
1098     my $crumbs = $self->{breadcrumbs};
1099     my $e = $self->{engine};
1100
1101     unless ( @$crumbs ) {
1102         # This will be a Reference sector
1103         my $sector = $e->_load_sector( $self->{base_offset} )
1104             # If no sector is found, thist must have been deleted from under us.
1105             or return;
1106
1107         if ( $sector->staleness != $obj->_staleness ) {
1108             return;
1109         }
1110
1111         my $loc = $sector->get_blist_loc
1112             or return;
1113
1114         push @$crumbs, $self->get_sector_iterator( $loc );
1115     }
1116
1117     FIND_NEXT_KEY: {
1118         # We're at the end.
1119         unless ( @$crumbs ) {
1120             $self->reset;
1121             return;
1122         }
1123
1124         my $iterator = $crumbs->[-1];
1125
1126         # This level is done.
1127         if ( $iterator->at_end ) {
1128             pop @$crumbs;
1129             redo FIND_NEXT_KEY;
1130         }
1131
1132         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
1133             # If we don't have any more, it will be caught at the
1134             # prior check.
1135             if ( my $next = $iterator->get_next_iterator ) {
1136                 push @$crumbs, $next;
1137             }
1138             redo FIND_NEXT_KEY;
1139         }
1140
1141         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
1142             DBM::Deep->_throw_error(
1143                 "Should have a bucketlist iterator here - instead have $iterator"
1144             );
1145         }
1146
1147         # At this point, we have a BucketList iterator
1148         my $key = $iterator->get_next_key;
1149         if ( defined $key ) {
1150             return $key;
1151         }
1152         #XXX else { $iterator->set_to_end() } ?
1153
1154         # We hit the end of the bucketlist iterator, so redo
1155         redo FIND_NEXT_KEY;
1156     }
1157
1158     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
1159 }
1160
1161 package DBM::Deep::Iterator::Index;
1162
1163 sub new {
1164     my $self = bless $_[1] => $_[0];
1165     $self->{curr_index} = 0;
1166     return $self;
1167 }
1168
1169 sub at_end {
1170     my $self = shift;
1171     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
1172 }
1173
1174 sub get_next_iterator {
1175     my $self = shift;
1176
1177     my $loc;
1178     while ( !$loc ) {
1179         return if $self->at_end;
1180         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
1181     }
1182
1183     return $self->{iterator}->get_sector_iterator( $loc );
1184 }
1185
1186 package DBM::Deep::Iterator::BucketList;
1187
1188 sub new {
1189     my $self = bless $_[1] => $_[0];
1190     $self->{curr_index} = 0;
1191     return $self;
1192 }
1193
1194 sub at_end {
1195     my $self = shift;
1196     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
1197 }
1198
1199 sub get_next_key {
1200     my $self = shift;
1201
1202     return if $self->at_end;
1203
1204     my $idx = $self->{curr_index}++;
1205
1206     my $data_loc = $self->{sector}->get_data_location_for({
1207         allow_head => 1,
1208         idx        => $idx,
1209     }) or return;
1210
1211     #XXX Do we want to add corruption checks here?
1212     return $self->{sector}->get_key_for( $idx )->data;
1213 }
1214
1215 package DBM::Deep::Engine::Sector;
1216
1217 sub new {
1218     my $self = bless $_[1], $_[0];
1219     Scalar::Util::weaken( $self->{engine} );
1220     $self->_init;
1221     return $self;
1222 }
1223
1224 #sub _init {}
1225 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
1226
1227 sub engine { $_[0]{engine} }
1228 sub offset { $_[0]{offset} }
1229 sub type   { $_[0]{type} }
1230
1231 sub base_size {
1232    my $self = shift;
1233    return $self->engine->SIG_SIZE + $STALE_SIZE;
1234 }
1235
1236 sub free {
1237     my $self = shift;
1238
1239     my $e = $self->engine;
1240
1241     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1242     # Skip staleness counter
1243     $e->storage->print_at( $self->offset + $self->base_size,
1244         chr(0) x ($self->size - $self->base_size),
1245     );
1246
1247     my $free_meth = $self->free_meth;
1248     $e->$free_meth( $self->offset, $self->size );
1249
1250     return;
1251 }
1252
1253 package DBM::Deep::Engine::Sector::Data;
1254
1255 our @ISA = qw( DBM::Deep::Engine::Sector );
1256
1257 # This is in bytes
1258 sub size { $_[0]{engine}->data_sector_size }
1259 sub free_meth { return '_add_free_data_sector' }
1260
1261 sub clone {
1262     my $self = shift;
1263     return ref($self)->new({
1264         engine => $self->engine,
1265         type   => $self->type,
1266         data   => $self->data,
1267     });
1268 }
1269
1270 package DBM::Deep::Engine::Sector::Scalar;
1271
1272 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1273
1274 sub free {
1275     my $self = shift;
1276
1277     my $chain_loc = $self->chain_loc;
1278
1279     $self->SUPER::free();
1280
1281     if ( $chain_loc ) {
1282         $self->engine->_load_sector( $chain_loc )->free;
1283     }
1284
1285     return;
1286 }
1287
1288 sub type { $_[0]{engine}->SIG_DATA }
1289 sub _init {
1290     my $self = shift;
1291
1292     my $engine = $self->engine;
1293
1294     unless ( $self->offset ) {
1295         my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
1296
1297         $self->{offset} = $engine->_request_data_sector( $self->size );
1298
1299         my $data = delete $self->{data};
1300         my $dlen = length $data;
1301         my $continue = 1;
1302         my $curr_offset = $self->offset;
1303         while ( $continue ) {
1304
1305             my $next_offset = 0;
1306
1307             my ($leftover, $this_len, $chunk);
1308             if ( $dlen > $data_section ) {
1309                 $leftover = 0;
1310                 $this_len = $data_section;
1311                 $chunk = substr( $data, 0, $this_len );
1312
1313                 $dlen -= $data_section;
1314                 $next_offset = $engine->_request_data_sector( $self->size );
1315                 $data = substr( $data, $this_len );
1316             }
1317             else {
1318                 $leftover = $data_section - $dlen;
1319                 $this_len = $dlen;
1320                 $chunk = $data;
1321
1322                 $continue = 0;
1323             }
1324
1325             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1326             # Skip staleness
1327             $engine->storage->print_at( $curr_offset + $self->base_size,
1328                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1329                 pack( $StP{1}, $this_len ),                      # Data length
1330                 $chunk,                                          # Data to be stored in this sector
1331                 chr(0) x $leftover,                              # Zero-fill the rest
1332             );
1333
1334             $curr_offset = $next_offset;
1335         }
1336
1337         return;
1338     }
1339 }
1340
1341 sub data_length {
1342     my $self = shift;
1343
1344     my $buffer = $self->engine->storage->read_at(
1345         $self->offset + $self->base_size + $self->engine->byte_size, 1
1346     );
1347
1348     return unpack( $StP{1}, $buffer );
1349 }
1350
1351 sub chain_loc {
1352     my $self = shift;
1353     return unpack(
1354         $StP{$self->engine->byte_size},
1355         $self->engine->storage->read_at(
1356             $self->offset + $self->base_size,
1357             $self->engine->byte_size,
1358         ),
1359     );
1360 }
1361
1362 sub data {
1363     my $self = shift;
1364 #    my ($args) = @_;
1365 #    $args ||= {};
1366
1367     my $data;
1368     while ( 1 ) {
1369         my $chain_loc = $self->chain_loc;
1370
1371         $data .= $self->engine->storage->read_at(
1372             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1373         );
1374
1375         last unless $chain_loc;
1376
1377         $self = $self->engine->_load_sector( $chain_loc );
1378     }
1379
1380     return $data;
1381 }
1382
1383 package DBM::Deep::Engine::Sector::Null;
1384
1385 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1386
1387 sub type { $_[0]{engine}->SIG_NULL }
1388 sub data_length { 0 }
1389 sub data { return }
1390
1391 sub _init {
1392     my $self = shift;
1393
1394     my $engine = $self->engine;
1395
1396     unless ( $self->offset ) {
1397         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1398
1399         $self->{offset} = $engine->_request_data_sector( $self->size );
1400         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1401         # Skip staleness counter
1402         $engine->storage->print_at( $self->offset + $self->base_size,
1403             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1404             pack( $StP{1}, $self->data_length ),  # Data length
1405             chr(0) x $leftover,                   # Zero-fill the rest
1406         );
1407
1408         return;
1409     }
1410 }
1411
1412 package DBM::Deep::Engine::Sector::Reference;
1413
1414 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1415
1416 sub _init {
1417     my $self = shift;
1418
1419     my $e = $self->engine;
1420
1421     unless ( $self->offset ) {
1422         my $classname = Scalar::Util::blessed( delete $self->{data} );
1423         my $leftover = $self->size - $self->base_size - 3 * $e->byte_size;
1424
1425         my $class_offset = 0;
1426         if ( defined $classname ) {
1427             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1428                 engine => $e,
1429                 data   => $classname,
1430             });
1431             $class_offset = $class_sector->offset;
1432         }
1433
1434         $self->{offset} = $e->_request_data_sector( $self->size );
1435         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1436         # Skip staleness counter
1437         $e->storage->print_at( $self->offset + $self->base_size,
1438             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1439             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1440             pack( $StP{$e->byte_size}, 1 ),             # Initial refcount
1441             chr(0) x $leftover,                         # Zero-fill the rest
1442         );
1443     }
1444     else {
1445         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1446     }
1447
1448     $self->{staleness} = unpack(
1449         $StP{$STALE_SIZE},
1450         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
1451     );
1452
1453     return;
1454 }
1455
1456 sub staleness { $_[0]{staleness} }
1457
1458 sub get_data_location_for {
1459     my $self = shift;
1460     my ($args) = @_;
1461
1462     # Assume that the head is not allowed unless otherwise specified.
1463     $args->{allow_head} = 0 unless exists $args->{allow_head};
1464
1465     # Assume we don't create a new blist location unless otherwise specified.
1466     $args->{create} = 0 unless exists $args->{create};
1467
1468     my $blist = $self->get_bucket_list({
1469         key_md5 => $args->{key_md5},
1470         key => $args->{key},
1471         create  => $args->{create},
1472     });
1473     return unless $blist && $blist->{found};
1474
1475     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1476     # is whether or not this transaction has this key. That's part of the next
1477     # function call.
1478     my $location = $blist->get_data_location_for({
1479         allow_head => $args->{allow_head},
1480     }) or return;
1481
1482     return $location;
1483 }
1484
1485 sub get_data_for {
1486     my $self = shift;
1487     my ($args) = @_;
1488
1489     my $location = $self->get_data_location_for( $args )
1490         or return;
1491
1492     return $self->engine->_load_sector( $location );
1493 }
1494
1495 sub write_data {
1496     my $self = shift;
1497     my ($args) = @_;
1498
1499     my $blist = $self->get_bucket_list({
1500         key_md5 => $args->{key_md5},
1501         key => $args->{key},
1502         create  => 1,
1503     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1504
1505     # Handle any transactional bookkeeping.
1506     if ( $self->engine->trans_id ) {
1507         if ( ! $blist->has_md5 ) {
1508             $blist->mark_deleted({
1509                 trans_id => 0,
1510             });
1511         }
1512     }
1513     else {
1514         my @trans_ids = $self->engine->get_running_txn_ids;
1515         if ( $blist->has_md5 ) {
1516             if ( @trans_ids ) {
1517                 my $old_value = $blist->get_data_for;
1518                 foreach my $other_trans_id ( @trans_ids ) {
1519                     next if $blist->get_data_location_for({
1520                         trans_id   => $other_trans_id,
1521                         allow_head => 0,
1522                     });
1523                     $blist->write_md5({
1524                         trans_id => $other_trans_id,
1525                         key      => $args->{key},
1526                         key_md5  => $args->{key_md5},
1527                         value    => $old_value->clone,
1528                     });
1529                 }
1530             }
1531         }
1532         else {
1533             if ( @trans_ids ) {
1534                 foreach my $other_trans_id ( @trans_ids ) {
1535                     #XXX This doesn't seem to possible to ever happen . . .
1536                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1537                     $blist->mark_deleted({
1538                         trans_id => $other_trans_id,
1539                     });
1540                 }
1541             }
1542         }
1543     }
1544
1545     #XXX Is this safe to do transactionally?
1546     # Free the place we're about to write to.
1547     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1548         $blist->get_data_for({ allow_head => 0 })->free;
1549     }
1550
1551     $blist->write_md5({
1552         key      => $args->{key},
1553         key_md5  => $args->{key_md5},
1554         value    => $args->{value},
1555     });
1556 }
1557
1558 sub delete_key {
1559     my $self = shift;
1560     my ($args) = @_;
1561
1562     # XXX What should happen if this fails?
1563     my $blist = $self->get_bucket_list({
1564         key_md5 => $args->{key_md5},
1565     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1566
1567     # Save the location so that we can free the data
1568     my $location = $blist->get_data_location_for({
1569         allow_head => 0,
1570     });
1571     my $old_value = $location && $self->engine->_load_sector( $location );
1572
1573     my @trans_ids = $self->engine->get_running_txn_ids;
1574
1575     # If we're the HEAD and there are running txns, then we need to clone this value to the other
1576     # transactions to preserve Isolation.
1577     if ( $self->engine->trans_id == 0 ) {
1578         if ( @trans_ids ) {
1579             foreach my $other_trans_id ( @trans_ids ) {
1580                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1581                 $blist->write_md5({
1582                     trans_id => $other_trans_id,
1583                     key      => $args->{key},
1584                     key_md5  => $args->{key_md5},
1585                     value    => $old_value->clone,
1586                 });
1587             }
1588         }
1589     }
1590
1591     my $data;
1592     if ( @trans_ids ) {
1593         $blist->mark_deleted( $args );
1594
1595         if ( $old_value ) {
1596             $data = $old_value->data({ export => 1 });
1597             $old_value->free;
1598         }
1599     }
1600     else {
1601         $data = $blist->delete_md5( $args );
1602     }
1603
1604     return $data;
1605 }
1606
1607 sub get_blist_loc {
1608     my $self = shift;
1609
1610     my $e = $self->engine;
1611     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1612     return unpack( $StP{$e->byte_size}, $blist_loc );
1613 }
1614
1615 sub get_bucket_list {
1616     my $self = shift;
1617     my ($args) = @_;
1618     $args ||= {};
1619
1620     # XXX Add in check here for recycling?
1621
1622     my $engine = $self->engine;
1623
1624     my $blist_loc = $self->get_blist_loc;
1625
1626     # There's no index or blist yet
1627     unless ( $blist_loc ) {
1628         return unless $args->{create};
1629
1630         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1631             engine  => $engine,
1632             key_md5 => $args->{key_md5},
1633         });
1634
1635         $engine->storage->print_at( $self->offset + $self->base_size,
1636             pack( $StP{$engine->byte_size}, $blist->offset ),
1637         );
1638
1639         return $blist;
1640     }
1641
1642     my $sector = $engine->_load_sector( $blist_loc )
1643         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1644     my $i = 0;
1645     my $last_sector = undef;
1646     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1647         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1648         $last_sector = $sector;
1649         if ( $blist_loc ) {
1650             $sector = $engine->_load_sector( $blist_loc )
1651                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1652         }
1653         else {
1654             $sector = undef;
1655             last;
1656         }
1657     }
1658
1659     # This means we went through the Index sector(s) and found an empty slot
1660     unless ( $sector ) {
1661         return unless $args->{create};
1662
1663         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
1664             unless $last_sector;
1665
1666         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1667             engine  => $engine,
1668             key_md5 => $args->{key_md5},
1669         });
1670
1671         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1672
1673         return $blist;
1674     }
1675
1676     $sector->find_md5( $args->{key_md5} );
1677
1678     # See whether or not we need to reindex the bucketlist
1679     # Yes, the double-braces are there for a reason. if() doesn't create a redo-able block,
1680     # so we have to create a bare block within the if() for redo-purposes. Patch and idea
1681     # submitted by sprout@cpan.org. -RobK, 2008-01-09
1682     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {{
1683         my $redo;
1684
1685         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1686             engine => $engine,
1687         });
1688
1689         my %blist_cache;
1690         #XXX q.v. the comments for this function.
1691         foreach my $entry ( $sector->chopped_up ) {
1692             my ($spot, $md5) = @{$entry};
1693             my $idx = ord( substr( $md5, $i, 1 ) );
1694
1695             # XXX This is inefficient
1696             my $blist = $blist_cache{$idx}
1697                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1698                     engine => $engine,
1699                 });
1700
1701             $new_index->set_entry( $idx => $blist->offset );
1702
1703             my $new_spot = $blist->write_at_next_open( $md5 );
1704             $engine->reindex_entry( $spot => $new_spot );
1705         }
1706
1707         # Handle the new item separately.
1708         {
1709             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1710
1711             # If all the previous blist's items have been thrown into one
1712             # blist and the new item belongs in there too, we need
1713             # another index.
1714             if ( keys %blist_cache == 1 and each %blist_cache == $idx ) {
1715                 ++$i, ++$redo;
1716             } else {
1717                 my $blist = $blist_cache{$idx}
1718                     ||= DBM::Deep::Engine::Sector::BucketList->new({
1719                         engine => $engine,
1720                     });
1721     
1722                 $new_index->set_entry( $idx => $blist->offset );
1723     
1724                 #XXX THIS IS HACKY!
1725                 $blist->find_md5( $args->{key_md5} );
1726                 $blist->write_md5({
1727                     key     => $args->{key},
1728                     key_md5 => $args->{key_md5},
1729                     value   => DBM::Deep::Engine::Sector::Null->new({
1730                         engine => $engine,
1731                         data   => undef,
1732                     }),
1733                 });
1734             }
1735 #            my $blist = $blist_cache{$idx}
1736 #                ||= DBM::Deep::Engine::Sector::BucketList->new({
1737 #                    engine => $engine,
1738 #                });
1739 #
1740 #            $new_index->set_entry( $idx => $blist->offset );
1741 #
1742 #            #XXX THIS IS HACKY!
1743 #            $blist->find_md5( $args->{key_md5} );
1744 #            $blist->write_md5({
1745 #                key     => $args->{key},
1746 #                key_md5 => $args->{key_md5},
1747 #                value   => DBM::Deep::Engine::Sector::Null->new({
1748 #                    engine => $engine,
1749 #                    data   => undef,
1750 #                }),
1751 #            });
1752         }
1753
1754         if ( $last_sector ) {
1755             $last_sector->set_entry(
1756                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1757                 $new_index->offset,
1758             );
1759         } else {
1760             $engine->storage->print_at( $self->offset + $self->base_size,
1761                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1762             );
1763         }
1764
1765         $sector->clear;
1766         $sector->free;
1767
1768         if ( $redo ) {
1769             (undef, $sector) = %blist_cache;
1770             $last_sector = $new_index;
1771             redo;
1772         }
1773
1774         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1775         $sector->find_md5( $args->{key_md5} );
1776     }}
1777
1778     return $sector;
1779 }
1780
1781 sub get_class_offset {
1782     my $self = shift;
1783
1784     my $e = $self->engine;
1785     return unpack(
1786         $StP{$e->byte_size},
1787         $e->storage->read_at(
1788             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1789         ),
1790     );
1791 }
1792
1793 sub get_classname {
1794     my $self = shift;
1795
1796     my $class_offset = $self->get_class_offset;
1797
1798     return unless $class_offset;
1799
1800     return $self->engine->_load_sector( $class_offset )->data;
1801 }
1802
1803 sub data {
1804     my $self = shift;
1805     my ($args) = @_;
1806     $args ||= {};
1807
1808     my $obj;
1809     unless ( $obj = $self->engine->cache->{ $self->offset } ) {
1810         $obj = DBM::Deep->new({
1811             type        => $self->type,
1812             base_offset => $self->offset,
1813             staleness   => $self->staleness,
1814             storage     => $self->engine->storage,
1815             engine      => $self->engine,
1816         });
1817
1818         if ( $self->engine->storage->{autobless} ) {
1819             my $classname = $self->get_classname;
1820             if ( defined $classname ) {
1821                 bless $obj, $classname;
1822             }
1823         }
1824
1825         $self->engine->cache->{$self->offset} = $obj;
1826     }
1827
1828     # We're not exporting, so just return.
1829     unless ( $args->{export} ) {
1830         return $obj;
1831     }
1832
1833     # We shouldn't export if this is still referred to.
1834     if ( $self->get_refcount > 1 ) {
1835         return $obj;
1836     }
1837
1838     return $obj->export;
1839 }
1840
1841 sub free {
1842     my $self = shift;
1843
1844     # We're not ready to be removed yet.
1845     if ( $self->decrement_refcount > 0 ) {
1846         return;
1847     }
1848
1849     # Rebless the object into DBM::Deep::Null.
1850     eval { %{ $self->engine->cache->{ $self->offset } } = (); };
1851     eval { @{ $self->engine->cache->{ $self->offset } } = (); };
1852     bless $self->engine->cache->{ $self->offset }, 'DBM::Deep::Null';
1853     delete $self->engine->cache->{ $self->offset };
1854
1855     my $blist_loc = $self->get_blist_loc;
1856     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1857
1858     my $class_loc = $self->get_class_offset;
1859     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1860
1861     $self->SUPER::free();
1862 }
1863
1864 sub increment_refcount {
1865     my $self = shift;
1866
1867     my $refcount = $self->get_refcount;
1868
1869     $refcount++;
1870
1871     $self->write_refcount( $refcount );
1872
1873     return $refcount;
1874 }
1875
1876 sub decrement_refcount {
1877     my $self = shift;
1878
1879     my $refcount = $self->get_refcount;
1880
1881     $refcount--;
1882
1883     $self->write_refcount( $refcount );
1884
1885     return $refcount;
1886 }
1887
1888 sub get_refcount {
1889     my $self = shift;
1890
1891     my $e = $self->engine;
1892     return unpack(
1893         $StP{$e->byte_size},
1894         $e->storage->read_at(
1895             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
1896         ),
1897     );
1898 }
1899
1900 sub write_refcount {
1901     my $self = shift;
1902     my ($num) = @_;
1903
1904     my $e = $self->engine;
1905     $e->storage->print_at(
1906         $self->offset + $self->base_size + 2 * $e->byte_size,
1907         pack( $StP{$e->byte_size}, $num ),
1908     );
1909 }
1910
1911 package DBM::Deep::Engine::Sector::BucketList;
1912
1913 our @ISA = qw( DBM::Deep::Engine::Sector );
1914
1915 sub _init {
1916     my $self = shift;
1917
1918     my $engine = $self->engine;
1919
1920     unless ( $self->offset ) {
1921         my $leftover = $self->size - $self->base_size;
1922
1923         $self->{offset} = $engine->_request_blist_sector( $self->size );
1924         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1925         # Skip staleness counter
1926         $engine->storage->print_at( $self->offset + $self->base_size,
1927             chr(0) x $leftover, # Zero-fill the data
1928         );
1929     }
1930
1931     if ( $self->{key_md5} ) {
1932         $self->find_md5;
1933     }
1934
1935     return $self;
1936 }
1937
1938 sub clear {
1939     my $self = shift;
1940     $self->engine->storage->print_at( $self->offset + $self->base_size,
1941         chr(0) x ($self->size - $self->base_size), # Zero-fill the data
1942     );
1943 }
1944
1945 sub size {
1946     my $self = shift;
1947     unless ( $self->{size} ) {
1948         my $e = $self->engine;
1949         # Base + numbuckets * bucketsize
1950         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1951     }
1952     return $self->{size};
1953 }
1954
1955 sub free_meth { return '_add_free_blist_sector' }
1956
1957 sub free {
1958     my $self = shift;
1959
1960     my $e = $self->engine;
1961     foreach my $bucket ( $self->chopped_up ) {
1962         my $rest = $bucket->[-1];
1963
1964         # Delete the keysector
1965         my $l = unpack( $StP{$e->byte_size}, substr( $rest, $e->hash_size, $e->byte_size ) );
1966         my $s = $e->_load_sector( $l ); $s->free if $s;
1967
1968         # Delete the HEAD sector
1969         $l = unpack( $StP{$e->byte_size},
1970             substr( $rest,
1971                 $e->hash_size + $e->byte_size,
1972                 $e->byte_size,
1973             ),
1974         );
1975         $s = $e->_load_sector( $l ); $s->free if $s;
1976
1977         foreach my $txn ( 0 .. $e->num_txns - 2 ) {
1978             my $l = unpack( $StP{$e->byte_size},
1979                 substr( $rest,
1980                     $e->hash_size + 2 * $e->byte_size + $txn * ($e->byte_size + $STALE_SIZE),
1981                     $e->byte_size,
1982                 ),
1983             );
1984             my $s = $e->_load_sector( $l ); $s->free if $s;
1985         }
1986     }
1987
1988     $self->SUPER::free();
1989 }
1990
1991 sub bucket_size {
1992     my $self = shift;
1993     unless ( $self->{bucket_size} ) {
1994         my $e = $self->engine;
1995         # Key + head (location) + transactions (location + staleness-counter)
1996         my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
1997         $self->{bucket_size} = $e->hash_size + $location_size;
1998     }
1999     return $self->{bucket_size};
2000 }
2001
2002 # XXX This is such a poor hack. I need to rethink this code.
2003 sub chopped_up {
2004     my $self = shift;
2005
2006     my $e = $self->engine;
2007
2008     my @buckets;
2009     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2010         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
2011         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
2012
2013         #XXX If we're chopping, why would we ever have the blank_md5?
2014         last if $md5 eq $e->blank_md5;
2015
2016         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
2017         push @buckets, [ $spot, $md5 . $rest ];
2018     }
2019
2020     return @buckets;
2021 }
2022
2023 sub write_at_next_open {
2024     my $self = shift;
2025     my ($entry) = @_;
2026
2027     #XXX This is such a hack!
2028     $self->{_next_open} = 0 unless exists $self->{_next_open};
2029
2030     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
2031     $self->engine->storage->print_at( $spot, $entry );
2032
2033     return $spot;
2034 }
2035
2036 sub has_md5 {
2037     my $self = shift;
2038     unless ( exists $self->{found} ) {
2039         $self->find_md5;
2040     }
2041     return $self->{found};
2042 }
2043
2044 sub find_md5 {
2045     my $self = shift;
2046
2047     $self->{found} = undef;
2048     $self->{idx}   = -1;
2049
2050     if ( @_ ) {
2051         $self->{key_md5} = shift;
2052     }
2053
2054     # If we don't have an MD5, then what are we supposed to do?
2055     unless ( exists $self->{key_md5} ) {
2056         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
2057     }
2058
2059     my $e = $self->engine;
2060     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2061         my $potential = $e->storage->read_at(
2062             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
2063         );
2064
2065         if ( $potential eq $e->blank_md5 ) {
2066             $self->{idx} = $idx;
2067             return;
2068         }
2069
2070         if ( $potential eq $self->{key_md5} ) {
2071             $self->{found} = 1;
2072             $self->{idx} = $idx;
2073             return;
2074         }
2075     }
2076
2077     return;
2078 }
2079
2080 sub write_md5 {
2081     my $self = shift;
2082     my ($args) = @_;
2083
2084     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
2085     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
2086     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
2087
2088     my $engine = $self->engine;
2089
2090     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2091
2092     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2093     $engine->add_entry( $args->{trans_id}, $spot );
2094
2095     unless ($self->{found}) {
2096         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
2097             engine => $engine,
2098             data   => $args->{key},
2099         });
2100
2101         $engine->storage->print_at( $spot,
2102             $args->{key_md5},
2103             pack( $StP{$engine->byte_size}, $key_sector->offset ),
2104         );
2105     }
2106
2107     my $loc = $spot
2108       + $engine->hash_size
2109       + $engine->byte_size;
2110
2111     if ( $args->{trans_id} ) {
2112         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2113
2114         $engine->storage->print_at( $loc,
2115             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2116             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2117         );
2118     }
2119     else {
2120         $engine->storage->print_at( $loc,
2121             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2122         );
2123     }
2124 }
2125
2126 sub mark_deleted {
2127     my $self = shift;
2128     my ($args) = @_;
2129     $args ||= {};
2130
2131     my $engine = $self->engine;
2132
2133     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2134
2135     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2136     $engine->add_entry( $args->{trans_id}, $spot );
2137
2138     my $loc = $spot
2139       + $engine->hash_size
2140       + $engine->byte_size;
2141
2142     if ( $args->{trans_id} ) {
2143         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2144
2145         $engine->storage->print_at( $loc,
2146             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2147             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2148         );
2149     }
2150     else {
2151         $engine->storage->print_at( $loc,
2152             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2153         );
2154     }
2155
2156 }
2157
2158 sub delete_md5 {
2159     my $self = shift;
2160     my ($args) = @_;
2161
2162     my $engine = $self->engine;
2163     return undef unless $self->{found};
2164
2165     # Save the location so that we can free the data
2166     my $location = $self->get_data_location_for({
2167         allow_head => 0,
2168     });
2169     my $key_sector = $self->get_key_for;
2170
2171     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2172     $engine->storage->print_at( $spot,
2173         $engine->storage->read_at(
2174             $spot + $self->bucket_size,
2175             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
2176         ),
2177         chr(0) x $self->bucket_size,
2178     );
2179
2180     $key_sector->free;
2181
2182     my $data_sector = $self->engine->_load_sector( $location );
2183     my $data = $data_sector->data({ export => 1 });
2184     $data_sector->free;
2185
2186     return $data;
2187 }
2188
2189 sub get_data_location_for {
2190     my $self = shift;
2191     my ($args) = @_;
2192     $args ||= {};
2193
2194     $args->{allow_head} = 0 unless exists $args->{allow_head};
2195     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
2196     $args->{idx}        = $self->{idx} unless exists $args->{idx};
2197
2198     my $e = $self->engine;
2199
2200     my $spot = $self->offset + $self->base_size
2201       + $args->{idx} * $self->bucket_size
2202       + $e->hash_size
2203       + $e->byte_size;
2204
2205     if ( $args->{trans_id} ) {
2206         $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
2207     }
2208
2209     my $buffer = $e->storage->read_at(
2210         $spot,
2211         $e->byte_size + $STALE_SIZE,
2212     );
2213     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
2214
2215     # XXX Merge the two if-clauses below
2216     if ( $args->{trans_id} ) {
2217         # We have found an entry that is old, so get rid of it
2218         if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
2219             $e->storage->print_at(
2220                 $spot,
2221                 pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
2222             );
2223             $loc = 0;
2224         }
2225     }
2226
2227     # If we're in a transaction and we never wrote to this location, try the
2228     # HEAD instead.
2229     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
2230         return $self->get_data_location_for({
2231             trans_id   => 0,
2232             allow_head => 1,
2233             idx        => $args->{idx},
2234         });
2235     }
2236
2237     return $loc <= 1 ? 0 : $loc;
2238 }
2239
2240 sub get_data_for {
2241     my $self = shift;
2242     my ($args) = @_;
2243     $args ||= {};
2244
2245     return unless $self->{found};
2246     my $location = $self->get_data_location_for({
2247         allow_head => $args->{allow_head},
2248     });
2249     return $self->engine->_load_sector( $location );
2250 }
2251
2252 sub get_key_for {
2253     my $self = shift;
2254     my ($idx) = @_;
2255     $idx = $self->{idx} unless defined $idx;
2256
2257     if ( $idx >= $self->engine->max_buckets ) {
2258         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
2259     }
2260
2261     my $location = $self->engine->storage->read_at(
2262         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
2263         $self->engine->byte_size,
2264     );
2265     $location = unpack( $StP{$self->engine->byte_size}, $location );
2266     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
2267
2268     return $self->engine->_load_sector( $location );
2269 }
2270
2271 package DBM::Deep::Engine::Sector::Index;
2272
2273 our @ISA = qw( DBM::Deep::Engine::Sector );
2274
2275 sub _init {
2276     my $self = shift;
2277
2278     my $engine = $self->engine;
2279
2280     unless ( $self->offset ) {
2281         my $leftover = $self->size - $self->base_size;
2282
2283         $self->{offset} = $engine->_request_index_sector( $self->size );
2284         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
2285         # Skip staleness counter
2286         $engine->storage->print_at( $self->offset + $self->base_size,
2287             chr(0) x $leftover, # Zero-fill the rest
2288         );
2289     }
2290
2291     return $self;
2292 }
2293
2294 #XXX Change here
2295 sub size {
2296     my $self = shift;
2297     unless ( $self->{size} ) {
2298         my $e = $self->engine;
2299         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
2300     }
2301     return $self->{size};
2302 }
2303
2304 sub free_meth { return '_add_free_index_sector' }
2305
2306 sub free {
2307     my $self = shift;
2308     my $e = $self->engine;
2309
2310     for my $i ( 0 .. $e->hash_chars - 1 ) {
2311         my $l = $self->get_entry( $i ) or next;
2312         $e->_load_sector( $l )->free;
2313     }
2314
2315     $self->SUPER::free();
2316 }
2317
2318 sub _loc_for {
2319     my $self = shift;
2320     my ($idx) = @_;
2321     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
2322 }
2323
2324 sub get_entry {
2325     my $self = shift;
2326     my ($idx) = @_;
2327
2328     my $e = $self->engine;
2329
2330     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
2331         if $idx < 0 || $idx >= $e->hash_chars;
2332
2333     return unpack(
2334         $StP{$e->byte_size},
2335         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
2336     );
2337 }
2338
2339 sub set_entry {
2340     my $self = shift;
2341     my ($idx, $loc) = @_;
2342
2343     my $e = $self->engine;
2344
2345     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
2346         if $idx < 0 || $idx >= $e->hash_chars;
2347
2348     $self->engine->storage->print_at(
2349         $self->_loc_for( $idx ),
2350         pack( $StP{$e->byte_size}, $loc ),
2351     );
2352 }
2353
2354 # This was copied from MARCEL's Class::Null. However, I couldn't use it because
2355 # I need an undef value, not an implementation of the Null Class pattern.
2356 package DBM::Deep::Null;
2357
2358 use overload
2359     'bool'   => sub { undef },
2360     '""'     => sub { undef },
2361     '0+'     => sub { undef },
2362     fallback => 1,
2363     nomethod => 'AUTOLOAD';
2364
2365 sub AUTOLOAD { return; }
2366
2367 1;
2368 __END__