1df77fdfc5ff8334d253247aeaebf631c2dc001a
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 my $STALE_SIZE = 2;
33
34 # Please refer to the pack() documentation for further information
35 my %StP = (
36     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
37     2 => 'n', # Unsigned short in "network" (big-endian) order
38     4 => 'N', # Unsigned long in "network" (big-endian) order
39     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
40 );
41
42 ################################################################################
43
44 sub new {
45     my $class = shift;
46     my ($args) = @_;
47
48     $args->{storage} = DBM::Deep::File->new( $args )
49         unless exists $args->{storage};
50
51     my $self = bless {
52         byte_size   => 4,
53
54         digest      => undef,
55         hash_size   => 16,  # In bytes
56         hash_chars  => 256, # Number of chars the algorithm uses per byte
57         max_buckets => 16,
58         num_txns    => 1,   # The HEAD
59         trans_id    => 0,   # Default to the HEAD
60
61         data_sector_size => 64, # Size in bytes of each data sector
62
63         entries => {}, # This is the list of entries for transactions
64         storage => undef,
65     }, $class;
66
67     # Never allow byte_size to be set directly.
68     delete $args->{byte_size};
69     if ( defined $args->{pack_size} ) {
70         if ( lc $args->{pack_size} eq 'small' ) {
71             $args->{byte_size} = 2;
72         }
73         elsif ( lc $args->{pack_size} eq 'medium' ) {
74             $args->{byte_size} = 4;
75         }
76         elsif ( lc $args->{pack_size} eq 'large' ) {
77             $args->{byte_size} = 8;
78         }
79         else {
80             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
81         }
82     }
83
84     # Grab the parameters we want to use
85     foreach my $param ( keys %$self ) {
86         next unless exists $args->{$param};
87         $self->{$param} = $args->{$param};
88     }
89
90     my %validations = (
91         max_buckets      => { floor => 16, ceil => 256 },
92         num_txns         => { floor => 1,  ceil => 255 },
93         data_sector_size => { floor => 32, ceil => 256 },
94     );
95
96     while ( my ($attr, $c) = each %validations ) {
97         if (   !defined $self->{$attr}
98             || !length $self->{$attr}
99             || $self->{$attr} =~ /\D/
100             || $self->{$attr} < $c->{floor}
101         ) {
102             $self->{$attr} = '(undef)' if !defined $self->{$attr};
103             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
104             $self->{$attr} = $c->{floor};
105         }
106         elsif ( $self->{$attr} > $c->{ceil} ) {
107             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
108             $self->{$attr} = $c->{ceil};
109         }
110     }
111
112     if ( !$self->{digest} ) {
113         require Digest::MD5;
114         $self->{digest} = \&Digest::MD5::md5;
115     }
116
117     return $self;
118 }
119
120 ################################################################################
121
122 sub read_value {
123     my $self = shift;
124     my ($obj, $key) = @_;
125
126     # This will be a Reference sector
127     my $sector = $self->_load_sector( $obj->_base_offset )
128         or return;
129
130     if ( $sector->staleness != $obj->_staleness ) {
131         return;
132     }
133
134     my $key_md5 = $self->_apply_digest( $key );
135
136     my $value_sector = $sector->get_data_for({
137         key_md5    => $key_md5,
138         allow_head => 1,
139     });
140
141     unless ( $value_sector ) {
142         $value_sector = DBM::Deep::Engine::Sector::Null->new({
143             engine => $self,
144             data   => undef,
145         });
146
147         $sector->write_data({
148             key_md5 => $key_md5,
149             key     => $key,
150             value   => $value_sector,
151         });
152     }
153
154     return $value_sector->data;
155 }
156
157 sub get_classname {
158     my $self = shift;
159     my ($obj) = @_;
160
161     # This will be a Reference sector
162     my $sector = $self->_load_sector( $obj->_base_offset )
163         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
164
165     if ( $sector->staleness != $obj->_staleness ) {
166         return;
167     }
168
169     return $sector->get_classname;
170 }
171
172 sub make_reference {
173     my $self = shift;
174     my ($obj, $old_key, $new_key) = @_;
175
176     # This will be a Reference sector
177     my $sector = $self->_load_sector( $obj->_base_offset )
178         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
179
180     if ( $sector->staleness != $obj->_staleness ) {
181         return;
182     }
183
184     my $old_md5 = $self->_apply_digest( $old_key );
185
186     my $value_sector = $sector->get_data_for({
187         key_md5    => $old_md5,
188         allow_head => 1,
189     });
190
191     unless ( $value_sector ) {
192         $value_sector = DBM::Deep::Engine::Sector::Null->new({
193             engine => $self,
194             data   => undef,
195         });
196
197         $sector->write_data({
198             key_md5 => $old_md5,
199             key     => $old_key,
200             value   => $value_sector,
201         });
202     }
203
204     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
205         $sector->write_data({
206             key     => $new_key,
207             key_md5 => $self->_apply_digest( $new_key ),
208             value   => $value_sector,
209         });
210         $value_sector->increment_refcount;
211     }
212     else {
213         $sector->write_data({
214             key     => $new_key,
215             key_md5 => $self->_apply_digest( $new_key ),
216             value   => $value_sector->clone,
217         });
218     }
219 }
220
221 sub key_exists {
222     my $self = shift;
223     my ($obj, $key) = @_;
224
225     # This will be a Reference sector
226     my $sector = $self->_load_sector( $obj->_base_offset )
227         or return '';
228
229     if ( $sector->staleness != $obj->_staleness ) {
230         return '';
231     }
232
233     my $data = $sector->get_data_for({
234         key_md5    => $self->_apply_digest( $key ),
235         allow_head => 1,
236     });
237
238     # exists() returns 1 or '' for true/false.
239     return $data ? 1 : '';
240 }
241
242 sub delete_key {
243     my $self = shift;
244     my ($obj, $key) = @_;
245
246     my $sector = $self->_load_sector( $obj->_base_offset )
247         or return;
248
249     if ( $sector->staleness != $obj->_staleness ) {
250         return;
251     }
252
253     return $sector->delete_key({
254         key_md5    => $self->_apply_digest( $key ),
255         allow_head => 0,
256     });
257 }
258
259 sub write_value {
260     my $self = shift;
261     my ($obj, $key, $value) = @_;
262
263     my $r = Scalar::Util::reftype( $value ) || '';
264     {
265         last if $r eq '';
266         last if $r eq 'HASH';
267         last if $r eq 'ARRAY';
268
269         DBM::Deep->_throw_error(
270             "Storage of references of type '$r' is not supported."
271         );
272     }
273
274     # This will be a Reference sector
275     my $sector = $self->_load_sector( $obj->_base_offset )
276         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
277
278     if ( $sector->staleness != $obj->_staleness ) {
279         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
280     }
281
282     my ($class, $type);
283     if ( !defined $value ) {
284         $class = 'DBM::Deep::Engine::Sector::Null';
285     }
286     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
287         my $tmpvar;
288         if ( $r eq 'ARRAY' ) {
289             $tmpvar = tied @$value;
290         } elsif ( $r eq 'HASH' ) {
291             $tmpvar = tied %$value;
292         }
293
294         if ( $tmpvar ) {
295             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
296
297             unless ( $is_dbm_deep ) {
298                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
299             }
300
301             unless ( $tmpvar->_engine->storage == $self->storage ) {
302                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
303             }
304
305             # First, verify if we're storing the same thing to this spot. If we are, then
306             # this should be a no-op. -EJS, 2008-05-19
307             my $loc = $sector->get_data_location_for({
308                 key_md5 => $self->_apply_digest( $key ),
309                 allow_head => 1,
310             });
311
312             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
313                 return 1;
314             }
315
316             #XXX Can this use $loc?
317             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
318             $sector->write_data({
319                 key     => $key,
320                 key_md5 => $self->_apply_digest( $key ),
321                 value   => $value_sector,
322             });
323             $value_sector->increment_refcount;
324
325             return 1;
326         }
327
328         $class = 'DBM::Deep::Engine::Sector::Reference';
329         $type = substr( $r, 0, 1 );
330     }
331     else {
332         if ( tied($value) ) {
333             DBM::Deep->_throw_error( "Cannot store something that is tied." );
334         }
335         $class = 'DBM::Deep::Engine::Sector::Scalar';
336     }
337
338     # Create this after loading the reference sector in case something bad happens.
339     # This way, we won't allocate value sector(s) needlessly.
340     my $value_sector = $class->new({
341         engine => $self,
342         data   => $value,
343         type   => $type,
344     });
345
346     $sector->write_data({
347         key     => $key,
348         key_md5 => $self->_apply_digest( $key ),
349         value   => $value_sector,
350     });
351
352     # This code is to make sure we write all the values in the $value to the disk
353     # and to make sure all changes to $value after the assignment are reflected
354     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
355     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
356     # copy to a temp value.
357     if ( $r eq 'ARRAY' ) {
358         my @temp = @$value;
359         tie @$value, 'DBM::Deep', {
360             base_offset => $value_sector->offset,
361             staleness   => $value_sector->staleness,
362             storage     => $self->storage,
363             engine      => $self,
364         };
365         @$value = @temp;
366         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
367     }
368     elsif ( $r eq 'HASH' ) {
369         my %temp = %$value;
370         tie %$value, 'DBM::Deep', {
371             base_offset => $value_sector->offset,
372             staleness   => $value_sector->staleness,
373             storage     => $self->storage,
374             engine      => $self,
375         };
376
377         %$value = %temp;
378         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
379     }
380
381     return 1;
382 }
383
384 # XXX Add staleness here
385 sub get_next_key {
386     my $self = shift;
387     my ($obj, $prev_key) = @_;
388
389     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
390     unless ( $prev_key ) {
391         $obj->{iterator} = DBM::Deep::Iterator->new({
392             base_offset => $obj->_base_offset,
393             engine      => $self,
394         });
395     }
396
397     return $obj->{iterator}->get_next_key( $obj );
398 }
399
400 ################################################################################
401
402 sub setup_fh {
403     my $self = shift;
404     my ($obj) = @_;
405
406     # We're opening the file.
407     unless ( $obj->_base_offset ) {
408         my $bytes_read = $self->_read_file_header;
409
410         # Creating a new file
411         unless ( $bytes_read ) {
412             $self->_write_file_header;
413
414             # 1) Create Array/Hash entry
415             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
416                 engine => $self,
417                 type   => $obj->_type,
418             });
419             $obj->{base_offset} = $initial_reference->offset;
420             $obj->{staleness} = $initial_reference->staleness;
421
422             $self->storage->flush;
423         }
424         # Reading from an existing file
425         else {
426             $obj->{base_offset} = $bytes_read;
427             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
428                 engine => $self,
429                 offset => $obj->_base_offset,
430             });
431             unless ( $initial_reference ) {
432                 DBM::Deep->_throw_error("Corrupted file, no master index record");
433             }
434
435             unless ($obj->_type eq $initial_reference->type) {
436                 DBM::Deep->_throw_error("File type mismatch");
437             }
438
439             $obj->{staleness} = $initial_reference->staleness;
440         }
441     }
442
443     $self->storage->set_inode;
444
445     return 1;
446 }
447
448 sub begin_work {
449     my $self = shift;
450     my ($obj) = @_;
451
452     if ( $self->trans_id ) {
453         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
454     }
455
456     my @slots = $self->read_txn_slots;
457     my $found;
458     for my $i ( 0 .. $#slots ) {
459         next if $slots[$i];
460
461         $slots[$i] = 1;
462         $self->set_trans_id( $i + 1 );
463         $found = 1;
464         last;
465     }
466     unless ( $found ) {
467         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
468     }
469     $self->write_txn_slots( @slots );
470
471     if ( !$self->trans_id ) {
472         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
473     }
474
475     return;
476 }
477
478 sub rollback {
479     my $self = shift;
480     my ($obj) = @_;
481
482     if ( !$self->trans_id ) {
483         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
484     }
485
486     # Each entry is the file location for a bucket that has a modification for
487     # this transaction. The entries need to be expunged.
488     foreach my $entry (@{ $self->get_entries } ) {
489         # Remove the entry here
490         my $read_loc = $entry
491           + $self->hash_size
492           + $self->byte_size
493           + $self->byte_size
494           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
495
496         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
497         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
498         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
499
500         if ( $data_loc > 1 ) {
501             $self->_load_sector( $data_loc )->free;
502         }
503     }
504
505     $self->clear_entries;
506
507     my @slots = $self->read_txn_slots;
508     $slots[$self->trans_id-1] = 0;
509     $self->write_txn_slots( @slots );
510     $self->inc_txn_staleness_counter( $self->trans_id );
511     $self->set_trans_id( 0 );
512
513     return 1;
514 }
515
516 sub commit {
517     my $self = shift;
518     my ($obj) = @_;
519
520     if ( !$self->trans_id ) {
521         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
522     }
523
524     foreach my $entry (@{ $self->get_entries } ) {
525         # Overwrite the entry in head with the entry in trans_id
526         my $base = $entry
527           + $self->hash_size
528           + $self->byte_size;
529
530         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
531         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
532
533         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
534         my $trans_loc = $self->storage->read_at(
535             $spot, $self->byte_size,
536         );
537
538         $self->storage->print_at( $base, $trans_loc );
539         $self->storage->print_at(
540             $spot,
541             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
542         );
543
544         if ( $head_loc > 1 ) {
545             $self->_load_sector( $head_loc )->free;
546         }
547     }
548
549     $self->clear_entries;
550
551     my @slots = $self->read_txn_slots;
552     $slots[$self->trans_id-1] = 0;
553     $self->write_txn_slots( @slots );
554     $self->inc_txn_staleness_counter( $self->trans_id );
555     $self->set_trans_id( 0 );
556
557     return 1;
558 }
559
560 sub read_txn_slots {
561     my $self = shift;
562     my $bl = $self->txn_bitfield_len;
563     my $num_bits = $bl * 8;
564     return split '', unpack( 'b'.$num_bits,
565         $self->storage->read_at(
566             $self->trans_loc, $bl,
567         )
568     );
569 }
570
571 sub write_txn_slots {
572     my $self = shift;
573     my $num_bits = $self->txn_bitfield_len * 8;
574     $self->storage->print_at( $self->trans_loc,
575         pack( 'b'.$num_bits, join('', @_) ),
576     );
577 }
578
579 sub get_running_txn_ids {
580     my $self = shift;
581     my @transactions = $self->read_txn_slots;
582     my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
583 }
584
585 sub get_txn_staleness_counter {
586     my $self = shift;
587     my ($trans_id) = @_;
588
589     # Hardcode staleness of 0 for the HEAD
590     return 0 unless $trans_id;
591
592     return unpack( $StP{$STALE_SIZE},
593         $self->storage->read_at(
594             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
595             $STALE_SIZE,
596         )
597     );
598 }
599
600 sub inc_txn_staleness_counter {
601     my $self = shift;
602     my ($trans_id) = @_;
603
604     # Hardcode staleness of 0 for the HEAD
605     return 0 unless $trans_id;
606
607     $self->storage->print_at(
608         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
609         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
610     );
611 }
612
613 sub get_entries {
614     my $self = shift;
615     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
616 }
617
618 sub add_entry {
619     my $self = shift;
620     my ($trans_id, $loc) = @_;
621
622     $self->{entries}{$trans_id} ||= {};
623     $self->{entries}{$trans_id}{$loc} = undef;
624 }
625
626 # If the buckets are being relocated because of a reindexing, the entries
627 # mechanism needs to be made aware of it.
628 sub reindex_entry {
629     my $self = shift;
630     my ($old_loc, $new_loc) = @_;
631
632     TRANS:
633     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
634         if ( exists $locs->{$old_loc} ) {
635             delete $locs->{$old_loc};
636             $locs->{$new_loc} = undef;
637             next TRANS;
638         }
639     }
640 }
641
642 sub clear_entries {
643     my $self = shift;
644     delete $self->{entries}{$self->trans_id};
645 }
646
647 ################################################################################
648
649 {
650     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
651     my $this_file_version = 3;
652
653     sub _write_file_header {
654         my $self = shift;
655
656         my $nt = $self->num_txns;
657         my $bl = $self->txn_bitfield_len;
658
659         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
660
661         my $loc = $self->storage->request_space( $header_fixed + $header_var );
662
663         $self->storage->print_at( $loc,
664             SIG_FILE,
665             SIG_HEADER,
666             pack('N', $this_file_version), # At this point, we're at 9 bytes
667             pack('N', $header_var),        # header size
668             # --- Above is $header_fixed. Below is $header_var
669             pack('C', $self->byte_size),
670
671             # These shenanigans are to allow a 256 within a C
672             pack('C', $self->max_buckets - 1),
673             pack('C', $self->data_sector_size - 1),
674
675             pack('C', $nt),
676             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
677             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
678             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
679             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
680             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
681         );
682
683         #XXX Set these less fragilely
684         $self->set_trans_loc( $header_fixed + 4 );
685         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
686
687         return;
688     }
689
690     sub _read_file_header {
691         my $self = shift;
692
693         my $buffer = $self->storage->read_at( 0, $header_fixed );
694         return unless length($buffer);
695
696         my ($file_signature, $sig_header, $file_version, $size) = unpack(
697             'A4 A N N', $buffer
698         );
699
700         unless ( $file_signature eq SIG_FILE ) {
701             $self->storage->close;
702             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
703         }
704
705         unless ( $sig_header eq SIG_HEADER ) {
706             $self->storage->close;
707             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
708         }
709
710         unless ( $file_version == $this_file_version ) {
711             $self->storage->close;
712             DBM::Deep->_throw_error(
713                 "Wrong file version found - " .  $file_version .
714                 " - expected " . $this_file_version
715             );
716         }
717
718         my $buffer2 = $self->storage->read_at( undef, $size );
719         my @values = unpack( 'C C C C', $buffer2 );
720
721         if ( @values != 4 || grep { !defined } @values ) {
722             $self->storage->close;
723             DBM::Deep->_throw_error("Corrupted file - bad header");
724         }
725
726         #XXX Add warnings if values weren't set right
727         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
728
729         # These shenangians are to allow a 256 within a C
730         $self->{max_buckets} += 1;
731         $self->{data_sector_size} += 1;
732
733         my $bl = $self->txn_bitfield_len;
734
735         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
736         unless ( $size == $header_var ) {
737             $self->storage->close;
738             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
739         }
740
741         $self->set_trans_loc( $header_fixed + scalar(@values) );
742         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
743
744         return length($buffer) + length($buffer2);
745     }
746 }
747
748 sub _load_sector {
749     my $self = shift;
750     my ($offset) = @_;
751
752     # Add a catch for offset of 0 or 1
753     return if !$offset || $offset <= 1;
754
755     my $type = $self->storage->read_at( $offset, 1 );
756     return if $type eq chr(0);
757
758     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
759         return DBM::Deep::Engine::Sector::Reference->new({
760             engine => $self,
761             type   => $type,
762             offset => $offset,
763         });
764     }
765     # XXX Don't we need key_md5 here?
766     elsif ( $type eq $self->SIG_BLIST ) {
767         return DBM::Deep::Engine::Sector::BucketList->new({
768             engine => $self,
769             type   => $type,
770             offset => $offset,
771         });
772     }
773     elsif ( $type eq $self->SIG_INDEX ) {
774         return DBM::Deep::Engine::Sector::Index->new({
775             engine => $self,
776             type   => $type,
777             offset => $offset,
778         });
779     }
780     elsif ( $type eq $self->SIG_NULL ) {
781         return DBM::Deep::Engine::Sector::Null->new({
782             engine => $self,
783             type   => $type,
784             offset => $offset,
785         });
786     }
787     elsif ( $type eq $self->SIG_DATA ) {
788         return DBM::Deep::Engine::Sector::Scalar->new({
789             engine => $self,
790             type   => $type,
791             offset => $offset,
792         });
793     }
794     # This was deleted from under us, so just return and let the caller figure it out.
795     elsif ( $type eq $self->SIG_FREE ) {
796         return;
797     }
798
799     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
800 }
801
802 sub _apply_digest {
803     my $self = shift;
804     return $self->{digest}->(@_);
805 }
806
807 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
808 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
809 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
810
811 sub _add_free_sector {
812     my $self = shift;
813     my ($multiple, $offset, $size) = @_;
814
815     my $chains_offset = $multiple * $self->byte_size;
816
817     my $storage = $self->storage;
818
819     # Increment staleness.
820     # XXX Can this increment+modulo be done by "&= 0x1" ?
821     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
822     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
823     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
824
825     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
826
827     $storage->print_at( $self->chains_loc + $chains_offset,
828         pack( $StP{$self->byte_size}, $offset ),
829     );
830
831     # Record the old head in the new sector after the signature and staleness counter
832     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
833 }
834
835 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
836 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
837 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
838
839 sub _request_sector {
840     my $self = shift;
841     my ($multiple, $size) = @_;
842
843     my $chains_offset = $multiple * $self->byte_size;
844
845     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
846     my $loc = unpack( $StP{$self->byte_size}, $old_head );
847
848     # We don't have any free sectors of the right size, so allocate a new one.
849     unless ( $loc ) {
850         my $offset = $self->storage->request_space( $size );
851
852         # Zero out the new sector. This also guarantees correct increases
853         # in the filesize.
854         $self->storage->print_at( $offset, chr(0) x $size );
855
856         return $offset;
857     }
858
859     # Read the new head after the signature and the staleness counter
860     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
861     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
862     $self->storage->print_at(
863         $loc + SIG_SIZE + $STALE_SIZE,
864         pack( $StP{$self->byte_size}, 0 ),
865     );
866
867     return $loc;
868 }
869
870 ################################################################################
871
872 sub flush {
873     my $self = shift;
874
875 #    my $sectors = $self->dirty_sectors;
876 #    for my $offset (sort { $a <=> $b } keys %{ $sectors }) {
877 #        $self->storage->print_at( $offset, $self->sector_cache->{$offset} );
878 #    }
879
880     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
881     # -RobK, 2008-06-26
882     $self->storage->flush;
883
884 #    $self->clear_dirty_sectors;
885
886 #    $self->clear_sector_cache;
887 }
888
889 sub lock_exclusive {
890     my $self = shift;
891     my ($obj) = @_;
892     return $self->storage->lock_exclusive( $obj );
893 }
894
895 sub lock_shared {
896     my $self = shift;
897     my ($obj) = @_;
898     return $self->storage->lock_shared( $obj );
899 }
900
901 sub unlock {
902     my $self = shift;
903     my ($obj) = @_;
904
905     my $rv = $self->storage->unlock( $obj );
906
907     $self->flush if $rv;
908
909     return $rv;
910 }
911
912 ################################################################################
913
914 sub storage     { $_[0]{storage} }
915 sub byte_size   { $_[0]{byte_size} }
916 sub hash_size   { $_[0]{hash_size} }
917 sub hash_chars  { $_[0]{hash_chars} }
918 sub num_txns    { $_[0]{num_txns} }
919 sub max_buckets { $_[0]{max_buckets} }
920 sub blank_md5   { chr(0) x $_[0]->hash_size }
921 sub data_sector_size { $_[0]{data_sector_size} }
922
923 # This is a calculated value
924 sub txn_bitfield_len {
925     my $self = shift;
926     unless ( exists $self->{txn_bitfield_len} ) {
927         my $temp = ($self->num_txns) / 8;
928         if ( $temp > int( $temp ) ) {
929             $temp = int( $temp ) + 1;
930         }
931         $self->{txn_bitfield_len} = $temp;
932     }
933     return $self->{txn_bitfield_len};
934 }
935
936 sub trans_id     { $_[0]{trans_id} }
937 sub set_trans_id { $_[0]{trans_id} = $_[1] }
938
939 sub trans_loc     { $_[0]{trans_loc} }
940 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
941
942 sub chains_loc     { $_[0]{chains_loc} }
943 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
944
945 sub cache       { $_[0]{cache} ||= {} }
946 sub clear_cache { %{$_[0]->cache} = () }
947
948 sub _dump_file {
949     my $self = shift;
950
951     # Read the header
952     my $spot = $self->_read_file_header();
953
954     my %types = (
955         0 => 'B',
956         1 => 'D',
957         2 => 'I',
958     );
959
960     my %sizes = (
961         'D' => $self->data_sector_size,
962         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
963         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
964     );
965
966     my $return = "";
967
968     # Header values
969     $return .= "NumTxns: " . $self->num_txns . $/;
970
971     # Read the free sector chains
972     my %sectors;
973     foreach my $multiple ( 0 .. 2 ) {
974         $return .= "Chains($types{$multiple}):";
975         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
976         while ( 1 ) {
977             my $loc = unpack(
978                 $StP{$self->byte_size},
979                 $self->storage->read_at( $old_loc, $self->byte_size ),
980             );
981
982             # We're now out of free sectors of this kind.
983             unless ( $loc ) {
984                 last;
985             }
986
987             $sectors{ $types{$multiple} }{ $loc } = undef;
988             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
989             $return .= " $loc";
990         }
991         $return .= $/;
992     }
993
994     SECTOR:
995     while ( $spot < $self->storage->{end} ) {
996         # Read each sector in order.
997         my $sector = $self->_load_sector( $spot );
998         if ( !$sector ) {
999             # Find it in the free-sectors that were found already
1000             foreach my $type ( keys %sectors ) {
1001                 if ( exists $sectors{$type}{$spot} ) {
1002                     my $size = $sizes{$type};
1003                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1004                     $spot += $size;
1005                     next SECTOR;
1006                 }
1007             }
1008
1009             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1010         }
1011         else {
1012             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1013             if ( $sector->type eq 'D' ) {
1014                 $return .= ' ' . $sector->data;
1015             }
1016             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1017                 $return .= ' REF: ' . $sector->get_refcount;
1018             }
1019             elsif ( $sector->type eq 'B' ) {
1020                 foreach my $bucket ( $sector->chopped_up ) {
1021                     $return .= "\n    ";
1022                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1023                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1024                     );
1025                     my $l = unpack( $StP{$self->byte_size},
1026                         substr( $bucket->[-1],
1027                             $self->hash_size + $self->byte_size,
1028                             $self->byte_size,
1029                         ),
1030                     );
1031                     $return .= sprintf " %08d", $l;
1032                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1033                         my $l = unpack( $StP{$self->byte_size},
1034                             substr( $bucket->[-1],
1035                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1036                                 $self->byte_size,
1037                             ),
1038                         );
1039                         $return .= sprintf " %08d", $l;
1040                     }
1041                 }
1042             }
1043             $return .= $/;
1044
1045             $spot += $sector->size;
1046         }
1047     }
1048
1049     return $return;
1050 }
1051
1052 ################################################################################
1053
1054 package DBM::Deep::Iterator;
1055
1056 sub new {
1057     my $class = shift;
1058     my ($args) = @_;
1059
1060     my $self = bless {
1061         breadcrumbs => [],
1062         engine      => $args->{engine},
1063         base_offset => $args->{base_offset},
1064     }, $class;
1065
1066     Scalar::Util::weaken( $self->{engine} );
1067
1068     return $self;
1069 }
1070
1071 sub reset { $_[0]{breadcrumbs} = [] }
1072
1073 sub get_sector_iterator {
1074     my $self = shift;
1075     my ($loc) = @_;
1076
1077     my $sector = $self->{engine}->_load_sector( $loc )
1078         or return;
1079
1080     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1081         return DBM::Deep::Iterator::Index->new({
1082             iterator => $self,
1083             sector   => $sector,
1084         });
1085     }
1086     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
1087         return DBM::Deep::Iterator::BucketList->new({
1088             iterator => $self,
1089             sector   => $sector,
1090         });
1091     }
1092
1093     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
1094 }
1095
1096 sub get_next_key {
1097     my $self = shift;
1098     my ($obj) = @_;
1099
1100     my $crumbs = $self->{breadcrumbs};
1101     my $e = $self->{engine};
1102
1103     unless ( @$crumbs ) {
1104         # This will be a Reference sector
1105         my $sector = $e->_load_sector( $self->{base_offset} )
1106             # If no sector is found, thist must have been deleted from under us.
1107             or return;
1108
1109         if ( $sector->staleness != $obj->_staleness ) {
1110             return;
1111         }
1112
1113         my $loc = $sector->get_blist_loc
1114             or return;
1115
1116         push @$crumbs, $self->get_sector_iterator( $loc );
1117     }
1118
1119     FIND_NEXT_KEY: {
1120         # We're at the end.
1121         unless ( @$crumbs ) {
1122             $self->reset;
1123             return;
1124         }
1125
1126         my $iterator = $crumbs->[-1];
1127
1128         # This level is done.
1129         if ( $iterator->at_end ) {
1130             pop @$crumbs;
1131             redo FIND_NEXT_KEY;
1132         }
1133
1134         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
1135             # If we don't have any more, it will be caught at the
1136             # prior check.
1137             if ( my $next = $iterator->get_next_iterator ) {
1138                 push @$crumbs, $next;
1139             }
1140             redo FIND_NEXT_KEY;
1141         }
1142
1143         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
1144             DBM::Deep->_throw_error(
1145                 "Should have a bucketlist iterator here - instead have $iterator"
1146             );
1147         }
1148
1149         # At this point, we have a BucketList iterator
1150         my $key = $iterator->get_next_key;
1151         if ( defined $key ) {
1152             return $key;
1153         }
1154         #XXX else { $iterator->set_to_end() } ?
1155
1156         # We hit the end of the bucketlist iterator, so redo
1157         redo FIND_NEXT_KEY;
1158     }
1159
1160     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
1161 }
1162
1163 package DBM::Deep::Iterator::Index;
1164
1165 sub new {
1166     my $self = bless $_[1] => $_[0];
1167     $self->{curr_index} = 0;
1168     return $self;
1169 }
1170
1171 sub at_end {
1172     my $self = shift;
1173     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
1174 }
1175
1176 sub get_next_iterator {
1177     my $self = shift;
1178
1179     my $loc;
1180     while ( !$loc ) {
1181         return if $self->at_end;
1182         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
1183     }
1184
1185     return $self->{iterator}->get_sector_iterator( $loc );
1186 }
1187
1188 package DBM::Deep::Iterator::BucketList;
1189
1190 sub new {
1191     my $self = bless $_[1] => $_[0];
1192     $self->{curr_index} = 0;
1193     return $self;
1194 }
1195
1196 sub at_end {
1197     my $self = shift;
1198     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
1199 }
1200
1201 sub get_next_key {
1202     my $self = shift;
1203
1204     return if $self->at_end;
1205
1206     my $idx = $self->{curr_index}++;
1207
1208     my $data_loc = $self->{sector}->get_data_location_for({
1209         allow_head => 1,
1210         idx        => $idx,
1211     }) or return;
1212
1213     #XXX Do we want to add corruption checks here?
1214     return $self->{sector}->get_key_for( $idx )->data;
1215 }
1216
1217 package DBM::Deep::Engine::Sector;
1218
1219 sub new {
1220     my $self = bless $_[1], $_[0];
1221     Scalar::Util::weaken( $self->{engine} );
1222     $self->_init;
1223     return $self;
1224 }
1225
1226 #sub _init {}
1227 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
1228
1229 sub engine { $_[0]{engine} }
1230 sub offset { $_[0]{offset} }
1231 sub type   { $_[0]{type} }
1232
1233 sub base_size {
1234    my $self = shift;
1235    return $self->engine->SIG_SIZE + $STALE_SIZE;
1236 }
1237
1238 sub free {
1239     my $self = shift;
1240
1241     my $e = $self->engine;
1242
1243     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1244     # Skip staleness counter
1245     $e->storage->print_at( $self->offset + $self->base_size,
1246         chr(0) x ($self->size - $self->base_size),
1247     );
1248
1249     my $free_meth = $self->free_meth;
1250     $e->$free_meth( $self->offset, $self->size );
1251
1252     return;
1253 }
1254
1255 package DBM::Deep::Engine::Sector::Data;
1256
1257 our @ISA = qw( DBM::Deep::Engine::Sector );
1258
1259 # This is in bytes
1260 sub size { $_[0]{engine}->data_sector_size }
1261 sub free_meth { return '_add_free_data_sector' }
1262
1263 sub clone {
1264     my $self = shift;
1265     return ref($self)->new({
1266         engine => $self->engine,
1267         type   => $self->type,
1268         data   => $self->data,
1269     });
1270 }
1271
1272 package DBM::Deep::Engine::Sector::Scalar;
1273
1274 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1275
1276 sub free {
1277     my $self = shift;
1278
1279     my $chain_loc = $self->chain_loc;
1280
1281     $self->SUPER::free();
1282
1283     if ( $chain_loc ) {
1284         $self->engine->_load_sector( $chain_loc )->free;
1285     }
1286
1287     return;
1288 }
1289
1290 sub type { $_[0]{engine}->SIG_DATA }
1291 sub _init {
1292     my $self = shift;
1293
1294     my $engine = $self->engine;
1295
1296     unless ( $self->offset ) {
1297         my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
1298
1299         $self->{offset} = $engine->_request_data_sector( $self->size );
1300
1301         my $data = delete $self->{data};
1302         my $dlen = length $data;
1303         my $continue = 1;
1304         my $curr_offset = $self->offset;
1305         while ( $continue ) {
1306
1307             my $next_offset = 0;
1308
1309             my ($leftover, $this_len, $chunk);
1310             if ( $dlen > $data_section ) {
1311                 $leftover = 0;
1312                 $this_len = $data_section;
1313                 $chunk = substr( $data, 0, $this_len );
1314
1315                 $dlen -= $data_section;
1316                 $next_offset = $engine->_request_data_sector( $self->size );
1317                 $data = substr( $data, $this_len );
1318             }
1319             else {
1320                 $leftover = $data_section - $dlen;
1321                 $this_len = $dlen;
1322                 $chunk = $data;
1323
1324                 $continue = 0;
1325             }
1326
1327             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1328             # Skip staleness
1329             $engine->storage->print_at( $curr_offset + $self->base_size,
1330                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1331                 pack( $StP{1}, $this_len ),                      # Data length
1332                 $chunk,                                          # Data to be stored in this sector
1333                 chr(0) x $leftover,                              # Zero-fill the rest
1334             );
1335
1336             $curr_offset = $next_offset;
1337         }
1338
1339         return;
1340     }
1341 }
1342
1343 sub data_length {
1344     my $self = shift;
1345
1346     my $buffer = $self->engine->storage->read_at(
1347         $self->offset + $self->base_size + $self->engine->byte_size, 1
1348     );
1349
1350     return unpack( $StP{1}, $buffer );
1351 }
1352
1353 sub chain_loc {
1354     my $self = shift;
1355     return unpack(
1356         $StP{$self->engine->byte_size},
1357         $self->engine->storage->read_at(
1358             $self->offset + $self->base_size,
1359             $self->engine->byte_size,
1360         ),
1361     );
1362 }
1363
1364 sub data {
1365     my $self = shift;
1366 #    my ($args) = @_;
1367 #    $args ||= {};
1368
1369     my $data;
1370     while ( 1 ) {
1371         my $chain_loc = $self->chain_loc;
1372
1373         $data .= $self->engine->storage->read_at(
1374             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1375         );
1376
1377         last unless $chain_loc;
1378
1379         $self = $self->engine->_load_sector( $chain_loc );
1380     }
1381
1382     return $data;
1383 }
1384
1385 package DBM::Deep::Engine::Sector::Null;
1386
1387 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1388
1389 sub type { $_[0]{engine}->SIG_NULL }
1390 sub data_length { 0 }
1391 sub data { return }
1392
1393 sub _init {
1394     my $self = shift;
1395
1396     my $engine = $self->engine;
1397
1398     unless ( $self->offset ) {
1399         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1400
1401         $self->{offset} = $engine->_request_data_sector( $self->size );
1402         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1403         # Skip staleness counter
1404         $engine->storage->print_at( $self->offset + $self->base_size,
1405             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1406             pack( $StP{1}, $self->data_length ),  # Data length
1407             chr(0) x $leftover,                   # Zero-fill the rest
1408         );
1409
1410         return;
1411     }
1412 }
1413
1414 package DBM::Deep::Engine::Sector::Reference;
1415
1416 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1417
1418 sub _init {
1419     my $self = shift;
1420
1421     my $e = $self->engine;
1422
1423     unless ( $self->offset ) {
1424         my $classname = Scalar::Util::blessed( delete $self->{data} );
1425         my $leftover = $self->size - $self->base_size - 3 * $e->byte_size;
1426
1427         my $class_offset = 0;
1428         if ( defined $classname ) {
1429             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1430                 engine => $e,
1431                 data   => $classname,
1432             });
1433             $class_offset = $class_sector->offset;
1434         }
1435
1436         $self->{offset} = $e->_request_data_sector( $self->size );
1437         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1438         # Skip staleness counter
1439         $e->storage->print_at( $self->offset + $self->base_size,
1440             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1441             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1442             pack( $StP{$e->byte_size}, 1 ),             # Initial refcount
1443             chr(0) x $leftover,                         # Zero-fill the rest
1444         );
1445     }
1446     else {
1447         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1448     }
1449
1450     $self->{staleness} = unpack(
1451         $StP{$STALE_SIZE},
1452         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
1453     );
1454
1455     return;
1456 }
1457
1458 sub staleness { $_[0]{staleness} }
1459
1460 sub get_data_location_for {
1461     my $self = shift;
1462     my ($args) = @_;
1463
1464     # Assume that the head is not allowed unless otherwise specified.
1465     $args->{allow_head} = 0 unless exists $args->{allow_head};
1466
1467     # Assume we don't create a new blist location unless otherwise specified.
1468     $args->{create} = 0 unless exists $args->{create};
1469
1470     my $blist = $self->get_bucket_list({
1471         key_md5 => $args->{key_md5},
1472         key => $args->{key},
1473         create  => $args->{create},
1474     });
1475     return unless $blist && $blist->{found};
1476
1477     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1478     # is whether or not this transaction has this key. That's part of the next
1479     # function call.
1480     my $location = $blist->get_data_location_for({
1481         allow_head => $args->{allow_head},
1482     }) or return;
1483
1484     return $location;
1485 }
1486
1487 sub get_data_for {
1488     my $self = shift;
1489     my ($args) = @_;
1490
1491     my $location = $self->get_data_location_for( $args )
1492         or return;
1493
1494     return $self->engine->_load_sector( $location );
1495 }
1496
1497 sub write_data {
1498     my $self = shift;
1499     my ($args) = @_;
1500
1501     my $blist = $self->get_bucket_list({
1502         key_md5 => $args->{key_md5},
1503         key => $args->{key},
1504         create  => 1,
1505     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1506
1507     # Handle any transactional bookkeeping.
1508     if ( $self->engine->trans_id ) {
1509         if ( ! $blist->has_md5 ) {
1510             $blist->mark_deleted({
1511                 trans_id => 0,
1512             });
1513         }
1514     }
1515     else {
1516         my @trans_ids = $self->engine->get_running_txn_ids;
1517         if ( $blist->has_md5 ) {
1518             if ( @trans_ids ) {
1519                 my $old_value = $blist->get_data_for;
1520                 foreach my $other_trans_id ( @trans_ids ) {
1521                     next if $blist->get_data_location_for({
1522                         trans_id   => $other_trans_id,
1523                         allow_head => 0,
1524                     });
1525                     $blist->write_md5({
1526                         trans_id => $other_trans_id,
1527                         key      => $args->{key},
1528                         key_md5  => $args->{key_md5},
1529                         value    => $old_value->clone,
1530                     });
1531                 }
1532             }
1533         }
1534         else {
1535             if ( @trans_ids ) {
1536                 foreach my $other_trans_id ( @trans_ids ) {
1537                     #XXX This doesn't seem to possible to ever happen . . .
1538                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1539                     $blist->mark_deleted({
1540                         trans_id => $other_trans_id,
1541                     });
1542                 }
1543             }
1544         }
1545     }
1546
1547     #XXX Is this safe to do transactionally?
1548     # Free the place we're about to write to.
1549     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1550         $blist->get_data_for({ allow_head => 0 })->free;
1551     }
1552
1553     $blist->write_md5({
1554         key      => $args->{key},
1555         key_md5  => $args->{key_md5},
1556         value    => $args->{value},
1557     });
1558 }
1559
1560 sub delete_key {
1561     my $self = shift;
1562     my ($args) = @_;
1563
1564     # XXX What should happen if this fails?
1565     my $blist = $self->get_bucket_list({
1566         key_md5 => $args->{key_md5},
1567     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1568
1569     # Save the location so that we can free the data
1570     my $location = $blist->get_data_location_for({
1571         allow_head => 0,
1572     });
1573     my $old_value = $location && $self->engine->_load_sector( $location );
1574
1575     my @trans_ids = $self->engine->get_running_txn_ids;
1576
1577     # If we're the HEAD and there are running txns, then we need to clone this value to the other
1578     # transactions to preserve Isolation.
1579     if ( $self->engine->trans_id == 0 ) {
1580         if ( @trans_ids ) {
1581             foreach my $other_trans_id ( @trans_ids ) {
1582                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1583                 $blist->write_md5({
1584                     trans_id => $other_trans_id,
1585                     key      => $args->{key},
1586                     key_md5  => $args->{key_md5},
1587                     value    => $old_value->clone,
1588                 });
1589             }
1590         }
1591     }
1592
1593     my $data;
1594     if ( @trans_ids ) {
1595         $blist->mark_deleted( $args );
1596
1597         if ( $old_value ) {
1598             $data = $old_value->data({ export => 1 });
1599             $old_value->free;
1600         }
1601     }
1602     else {
1603         $data = $blist->delete_md5( $args );
1604     }
1605
1606     return $data;
1607 }
1608
1609 sub get_blist_loc {
1610     my $self = shift;
1611
1612     my $e = $self->engine;
1613     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1614     return unpack( $StP{$e->byte_size}, $blist_loc );
1615 }
1616
1617 sub get_bucket_list {
1618     my $self = shift;
1619     my ($args) = @_;
1620     $args ||= {};
1621
1622     # XXX Add in check here for recycling?
1623
1624     my $engine = $self->engine;
1625
1626     my $blist_loc = $self->get_blist_loc;
1627
1628     # There's no index or blist yet
1629     unless ( $blist_loc ) {
1630         return unless $args->{create};
1631
1632         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1633             engine  => $engine,
1634             key_md5 => $args->{key_md5},
1635         });
1636
1637         $engine->storage->print_at( $self->offset + $self->base_size,
1638             pack( $StP{$engine->byte_size}, $blist->offset ),
1639         );
1640
1641         return $blist;
1642     }
1643
1644     my $sector = $engine->_load_sector( $blist_loc )
1645         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1646     my $i = 0;
1647     my $last_sector = undef;
1648     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1649         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1650         $last_sector = $sector;
1651         if ( $blist_loc ) {
1652             $sector = $engine->_load_sector( $blist_loc )
1653                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1654         }
1655         else {
1656             $sector = undef;
1657             last;
1658         }
1659     }
1660
1661     # This means we went through the Index sector(s) and found an empty slot
1662     unless ( $sector ) {
1663         return unless $args->{create};
1664
1665         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
1666             unless $last_sector;
1667
1668         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1669             engine  => $engine,
1670             key_md5 => $args->{key_md5},
1671         });
1672
1673         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1674
1675         return $blist;
1676     }
1677
1678     $sector->find_md5( $args->{key_md5} );
1679
1680     # See whether or not we need to reindex the bucketlist
1681     # Yes, the double-braces are there for a reason. if() doesn't create a redo-able block,
1682     # so we have to create a bare block within the if() for redo-purposes. Patch and idea
1683     # submitted by sprout@cpan.org. -RobK, 2008-01-09
1684     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {{
1685         my $redo;
1686
1687         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1688             engine => $engine,
1689         });
1690
1691         my %blist_cache;
1692         #XXX q.v. the comments for this function.
1693         foreach my $entry ( $sector->chopped_up ) {
1694             my ($spot, $md5) = @{$entry};
1695             my $idx = ord( substr( $md5, $i, 1 ) );
1696
1697             # XXX This is inefficient
1698             my $blist = $blist_cache{$idx}
1699                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1700                     engine => $engine,
1701                 });
1702
1703             $new_index->set_entry( $idx => $blist->offset );
1704
1705             my $new_spot = $blist->write_at_next_open( $md5 );
1706             $engine->reindex_entry( $spot => $new_spot );
1707         }
1708
1709         # Handle the new item separately.
1710         {
1711             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1712
1713             # If all the previous blist's items have been thrown into one
1714             # blist and the new item belongs in there too, we need
1715             # another index.
1716             if ( keys %blist_cache == 1 and each %blist_cache == $idx ) {
1717                 ++$i, ++$redo;
1718             } else {
1719                 my $blist = $blist_cache{$idx}
1720                     ||= DBM::Deep::Engine::Sector::BucketList->new({
1721                         engine => $engine,
1722                     });
1723     
1724                 $new_index->set_entry( $idx => $blist->offset );
1725     
1726                 #XXX THIS IS HACKY!
1727                 $blist->find_md5( $args->{key_md5} );
1728                 $blist->write_md5({
1729                     key     => $args->{key},
1730                     key_md5 => $args->{key_md5},
1731                     value   => DBM::Deep::Engine::Sector::Null->new({
1732                         engine => $engine,
1733                         data   => undef,
1734                     }),
1735                 });
1736             }
1737 #            my $blist = $blist_cache{$idx}
1738 #                ||= DBM::Deep::Engine::Sector::BucketList->new({
1739 #                    engine => $engine,
1740 #                });
1741 #
1742 #            $new_index->set_entry( $idx => $blist->offset );
1743 #
1744 #            #XXX THIS IS HACKY!
1745 #            $blist->find_md5( $args->{key_md5} );
1746 #            $blist->write_md5({
1747 #                key     => $args->{key},
1748 #                key_md5 => $args->{key_md5},
1749 #                value   => DBM::Deep::Engine::Sector::Null->new({
1750 #                    engine => $engine,
1751 #                    data   => undef,
1752 #                }),
1753 #            });
1754         }
1755
1756         if ( $last_sector ) {
1757             $last_sector->set_entry(
1758                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1759                 $new_index->offset,
1760             );
1761         } else {
1762             $engine->storage->print_at( $self->offset + $self->base_size,
1763                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1764             );
1765         }
1766
1767         $sector->clear;
1768         $sector->free;
1769
1770         if ( $redo ) {
1771             (undef, $sector) = %blist_cache;
1772             $last_sector = $new_index;
1773             redo;
1774         }
1775
1776         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1777         $sector->find_md5( $args->{key_md5} );
1778     }}
1779
1780     return $sector;
1781 }
1782
1783 sub get_class_offset {
1784     my $self = shift;
1785
1786     my $e = $self->engine;
1787     return unpack(
1788         $StP{$e->byte_size},
1789         $e->storage->read_at(
1790             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1791         ),
1792     );
1793 }
1794
1795 sub get_classname {
1796     my $self = shift;
1797
1798     my $class_offset = $self->get_class_offset;
1799
1800     return unless $class_offset;
1801
1802     return $self->engine->_load_sector( $class_offset )->data;
1803 }
1804
1805 sub data {
1806     my $self = shift;
1807     my ($args) = @_;
1808     $args ||= {};
1809
1810     my $obj;
1811     unless ( $obj = $self->engine->cache->{ $self->offset } ) {
1812         $obj = DBM::Deep->new({
1813             type        => $self->type,
1814             base_offset => $self->offset,
1815             staleness   => $self->staleness,
1816             storage     => $self->engine->storage,
1817             engine      => $self->engine,
1818         });
1819
1820         if ( $self->engine->storage->{autobless} ) {
1821             my $classname = $self->get_classname;
1822             if ( defined $classname ) {
1823                 bless $obj, $classname;
1824             }
1825         }
1826
1827         $self->engine->cache->{$self->offset} = $obj;
1828     }
1829
1830     # We're not exporting, so just return.
1831     unless ( $args->{export} ) {
1832         return $obj;
1833     }
1834
1835     # We shouldn't export if this is still referred to.
1836     if ( $self->get_refcount > 1 ) {
1837         return $obj;
1838     }
1839
1840     return $obj->export;
1841 }
1842
1843 sub free {
1844     my $self = shift;
1845
1846     # We're not ready to be removed yet.
1847     if ( $self->decrement_refcount > 0 ) {
1848         return;
1849     }
1850
1851     # Rebless the object into DBM::Deep::Null.
1852     eval { %{ $self->engine->cache->{ $self->offset } } = (); };
1853     eval { @{ $self->engine->cache->{ $self->offset } } = (); };
1854     bless $self->engine->cache->{ $self->offset }, 'DBM::Deep::Null';
1855     delete $self->engine->cache->{ $self->offset };
1856
1857     my $blist_loc = $self->get_blist_loc;
1858     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1859
1860     my $class_loc = $self->get_class_offset;
1861     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1862
1863     $self->SUPER::free();
1864 }
1865
1866 sub increment_refcount {
1867     my $self = shift;
1868
1869     my $refcount = $self->get_refcount;
1870
1871     $refcount++;
1872
1873     $self->write_refcount( $refcount );
1874
1875     return $refcount;
1876 }
1877
1878 sub decrement_refcount {
1879     my $self = shift;
1880
1881     my $refcount = $self->get_refcount;
1882
1883     $refcount--;
1884
1885     $self->write_refcount( $refcount );
1886
1887     return $refcount;
1888 }
1889
1890 sub get_refcount {
1891     my $self = shift;
1892
1893     my $e = $self->engine;
1894     return unpack(
1895         $StP{$e->byte_size},
1896         $e->storage->read_at(
1897             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
1898         ),
1899     );
1900 }
1901
1902 sub write_refcount {
1903     my $self = shift;
1904     my ($num) = @_;
1905
1906     my $e = $self->engine;
1907     $e->storage->print_at(
1908         $self->offset + $self->base_size + 2 * $e->byte_size,
1909         pack( $StP{$e->byte_size}, $num ),
1910     );
1911 }
1912
1913 package DBM::Deep::Engine::Sector::BucketList;
1914
1915 our @ISA = qw( DBM::Deep::Engine::Sector );
1916
1917 sub _init {
1918     my $self = shift;
1919
1920     my $engine = $self->engine;
1921
1922     unless ( $self->offset ) {
1923         my $leftover = $self->size - $self->base_size;
1924
1925         $self->{offset} = $engine->_request_blist_sector( $self->size );
1926         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1927         # Skip staleness counter
1928         $engine->storage->print_at( $self->offset + $self->base_size,
1929             chr(0) x $leftover, # Zero-fill the data
1930         );
1931     }
1932
1933     if ( $self->{key_md5} ) {
1934         $self->find_md5;
1935     }
1936
1937     return $self;
1938 }
1939
1940 sub clear {
1941     my $self = shift;
1942     $self->engine->storage->print_at( $self->offset + $self->base_size,
1943         chr(0) x ($self->size - $self->base_size), # Zero-fill the data
1944     );
1945 }
1946
1947 sub size {
1948     my $self = shift;
1949     unless ( $self->{size} ) {
1950         my $e = $self->engine;
1951         # Base + numbuckets * bucketsize
1952         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1953     }
1954     return $self->{size};
1955 }
1956
1957 sub free_meth { return '_add_free_blist_sector' }
1958
1959 sub free {
1960     my $self = shift;
1961
1962     my $e = $self->engine;
1963     foreach my $bucket ( $self->chopped_up ) {
1964         my $rest = $bucket->[-1];
1965
1966         # Delete the keysector
1967         my $l = unpack( $StP{$e->byte_size}, substr( $rest, $e->hash_size, $e->byte_size ) );
1968         my $s = $e->_load_sector( $l ); $s->free if $s;
1969
1970         # Delete the HEAD sector
1971         $l = unpack( $StP{$e->byte_size},
1972             substr( $rest,
1973                 $e->hash_size + $e->byte_size,
1974                 $e->byte_size,
1975             ),
1976         );
1977         $s = $e->_load_sector( $l ); $s->free if $s;
1978
1979         foreach my $txn ( 0 .. $e->num_txns - 2 ) {
1980             my $l = unpack( $StP{$e->byte_size},
1981                 substr( $rest,
1982                     $e->hash_size + 2 * $e->byte_size + $txn * ($e->byte_size + $STALE_SIZE),
1983                     $e->byte_size,
1984                 ),
1985             );
1986             my $s = $e->_load_sector( $l ); $s->free if $s;
1987         }
1988     }
1989
1990     $self->SUPER::free();
1991 }
1992
1993 sub bucket_size {
1994     my $self = shift;
1995     unless ( $self->{bucket_size} ) {
1996         my $e = $self->engine;
1997         # Key + head (location) + transactions (location + staleness-counter)
1998         my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
1999         $self->{bucket_size} = $e->hash_size + $location_size;
2000     }
2001     return $self->{bucket_size};
2002 }
2003
2004 # XXX This is such a poor hack. I need to rethink this code.
2005 sub chopped_up {
2006     my $self = shift;
2007
2008     my $e = $self->engine;
2009
2010     my @buckets;
2011     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2012         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
2013         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
2014
2015         #XXX If we're chopping, why would we ever have the blank_md5?
2016         last if $md5 eq $e->blank_md5;
2017
2018         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
2019         push @buckets, [ $spot, $md5 . $rest ];
2020     }
2021
2022     return @buckets;
2023 }
2024
2025 sub write_at_next_open {
2026     my $self = shift;
2027     my ($entry) = @_;
2028
2029     #XXX This is such a hack!
2030     $self->{_next_open} = 0 unless exists $self->{_next_open};
2031
2032     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
2033     $self->engine->storage->print_at( $spot, $entry );
2034
2035     return $spot;
2036 }
2037
2038 sub has_md5 {
2039     my $self = shift;
2040     unless ( exists $self->{found} ) {
2041         $self->find_md5;
2042     }
2043     return $self->{found};
2044 }
2045
2046 sub find_md5 {
2047     my $self = shift;
2048
2049     $self->{found} = undef;
2050     $self->{idx}   = -1;
2051
2052     if ( @_ ) {
2053         $self->{key_md5} = shift;
2054     }
2055
2056     # If we don't have an MD5, then what are we supposed to do?
2057     unless ( exists $self->{key_md5} ) {
2058         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
2059     }
2060
2061     my $e = $self->engine;
2062     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2063         my $potential = $e->storage->read_at(
2064             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
2065         );
2066
2067         if ( $potential eq $e->blank_md5 ) {
2068             $self->{idx} = $idx;
2069             return;
2070         }
2071
2072         if ( $potential eq $self->{key_md5} ) {
2073             $self->{found} = 1;
2074             $self->{idx} = $idx;
2075             return;
2076         }
2077     }
2078
2079     return;
2080 }
2081
2082 sub write_md5 {
2083     my $self = shift;
2084     my ($args) = @_;
2085
2086     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
2087     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
2088     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
2089
2090     my $engine = $self->engine;
2091
2092     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2093
2094     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2095     $engine->add_entry( $args->{trans_id}, $spot );
2096
2097     unless ($self->{found}) {
2098         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
2099             engine => $engine,
2100             data   => $args->{key},
2101         });
2102
2103         $engine->storage->print_at( $spot,
2104             $args->{key_md5},
2105             pack( $StP{$engine->byte_size}, $key_sector->offset ),
2106         );
2107     }
2108
2109     my $loc = $spot
2110       + $engine->hash_size
2111       + $engine->byte_size;
2112
2113     if ( $args->{trans_id} ) {
2114         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2115
2116         $engine->storage->print_at( $loc,
2117             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2118             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2119         );
2120     }
2121     else {
2122         $engine->storage->print_at( $loc,
2123             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2124         );
2125     }
2126 }
2127
2128 sub mark_deleted {
2129     my $self = shift;
2130     my ($args) = @_;
2131     $args ||= {};
2132
2133     my $engine = $self->engine;
2134
2135     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2136
2137     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2138     $engine->add_entry( $args->{trans_id}, $spot );
2139
2140     my $loc = $spot
2141       + $engine->hash_size
2142       + $engine->byte_size;
2143
2144     if ( $args->{trans_id} ) {
2145         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2146
2147         $engine->storage->print_at( $loc,
2148             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2149             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2150         );
2151     }
2152     else {
2153         $engine->storage->print_at( $loc,
2154             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2155         );
2156     }
2157
2158 }
2159
2160 sub delete_md5 {
2161     my $self = shift;
2162     my ($args) = @_;
2163
2164     my $engine = $self->engine;
2165     return undef unless $self->{found};
2166
2167     # Save the location so that we can free the data
2168     my $location = $self->get_data_location_for({
2169         allow_head => 0,
2170     });
2171     my $key_sector = $self->get_key_for;
2172
2173     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2174     $engine->storage->print_at( $spot,
2175         $engine->storage->read_at(
2176             $spot + $self->bucket_size,
2177             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
2178         ),
2179         chr(0) x $self->bucket_size,
2180     );
2181
2182     $key_sector->free;
2183
2184     my $data_sector = $self->engine->_load_sector( $location );
2185     my $data = $data_sector->data({ export => 1 });
2186     $data_sector->free;
2187
2188     return $data;
2189 }
2190
2191 sub get_data_location_for {
2192     my $self = shift;
2193     my ($args) = @_;
2194     $args ||= {};
2195
2196     $args->{allow_head} = 0 unless exists $args->{allow_head};
2197     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
2198     $args->{idx}        = $self->{idx} unless exists $args->{idx};
2199
2200     my $e = $self->engine;
2201
2202     my $spot = $self->offset + $self->base_size
2203       + $args->{idx} * $self->bucket_size
2204       + $e->hash_size
2205       + $e->byte_size;
2206
2207     if ( $args->{trans_id} ) {
2208         $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
2209     }
2210
2211     my $buffer = $e->storage->read_at(
2212         $spot,
2213         $e->byte_size + $STALE_SIZE,
2214     );
2215     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
2216
2217     # XXX Merge the two if-clauses below
2218     if ( $args->{trans_id} ) {
2219         # We have found an entry that is old, so get rid of it
2220         if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
2221             $e->storage->print_at(
2222                 $spot,
2223                 pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
2224             );
2225             $loc = 0;
2226         }
2227     }
2228
2229     # If we're in a transaction and we never wrote to this location, try the
2230     # HEAD instead.
2231     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
2232         return $self->get_data_location_for({
2233             trans_id   => 0,
2234             allow_head => 1,
2235             idx        => $args->{idx},
2236         });
2237     }
2238
2239     return $loc <= 1 ? 0 : $loc;
2240 }
2241
2242 sub get_data_for {
2243     my $self = shift;
2244     my ($args) = @_;
2245     $args ||= {};
2246
2247     return unless $self->{found};
2248     my $location = $self->get_data_location_for({
2249         allow_head => $args->{allow_head},
2250     });
2251     return $self->engine->_load_sector( $location );
2252 }
2253
2254 sub get_key_for {
2255     my $self = shift;
2256     my ($idx) = @_;
2257     $idx = $self->{idx} unless defined $idx;
2258
2259     if ( $idx >= $self->engine->max_buckets ) {
2260         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
2261     }
2262
2263     my $location = $self->engine->storage->read_at(
2264         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
2265         $self->engine->byte_size,
2266     );
2267     $location = unpack( $StP{$self->engine->byte_size}, $location );
2268     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
2269
2270     return $self->engine->_load_sector( $location );
2271 }
2272
2273 package DBM::Deep::Engine::Sector::Index;
2274
2275 our @ISA = qw( DBM::Deep::Engine::Sector );
2276
2277 sub _init {
2278     my $self = shift;
2279
2280     my $engine = $self->engine;
2281
2282     unless ( $self->offset ) {
2283         my $leftover = $self->size - $self->base_size;
2284
2285         $self->{offset} = $engine->_request_index_sector( $self->size );
2286         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
2287         # Skip staleness counter
2288         $engine->storage->print_at( $self->offset + $self->base_size,
2289             chr(0) x $leftover, # Zero-fill the rest
2290         );
2291     }
2292
2293     return $self;
2294 }
2295
2296 #XXX Change here
2297 sub size {
2298     my $self = shift;
2299     unless ( $self->{size} ) {
2300         my $e = $self->engine;
2301         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
2302     }
2303     return $self->{size};
2304 }
2305
2306 sub free_meth { return '_add_free_index_sector' }
2307
2308 sub free {
2309     my $self = shift;
2310     my $e = $self->engine;
2311
2312     for my $i ( 0 .. $e->hash_chars - 1 ) {
2313         my $l = $self->get_entry( $i ) or next;
2314         $e->_load_sector( $l )->free;
2315     }
2316
2317     $self->SUPER::free();
2318 }
2319
2320 sub _loc_for {
2321     my $self = shift;
2322     my ($idx) = @_;
2323     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
2324 }
2325
2326 sub get_entry {
2327     my $self = shift;
2328     my ($idx) = @_;
2329
2330     my $e = $self->engine;
2331
2332     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
2333         if $idx < 0 || $idx >= $e->hash_chars;
2334
2335     return unpack(
2336         $StP{$e->byte_size},
2337         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
2338     );
2339 }
2340
2341 sub set_entry {
2342     my $self = shift;
2343     my ($idx, $loc) = @_;
2344
2345     my $e = $self->engine;
2346
2347     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
2348         if $idx < 0 || $idx >= $e->hash_chars;
2349
2350     $self->engine->storage->print_at(
2351         $self->_loc_for( $idx ),
2352         pack( $StP{$e->byte_size}, $loc ),
2353     );
2354 }
2355
2356 # This was copied from MARCEL's Class::Null. However, I couldn't use it because
2357 # I need an undef value, not an implementation of the Null Class pattern.
2358 package DBM::Deep::Null;
2359
2360 use overload
2361     'bool'   => sub { undef },
2362     '""'     => sub { undef },
2363     '0+'     => sub { undef },
2364     fallback => 1,
2365     nomethod => 'AUTOLOAD';
2366
2367 sub AUTOLOAD { return; }
2368
2369 1;
2370 __END__