r6200@rob-kinyons-computer-2 (orig r9980): rkinyon | 2007-09-22 21:02:54 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(1.0003);
9
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * Every method in here assumes that the storage has been appropriately
14 #   safeguarded. This can be anything from flock() to some sort of manual
15 #   mutex. But, it's the caller's responsability to make sure that this has
16 #   been done.
17
18 # Setup file and tag signatures.  These should never change.
19 sub SIG_FILE     () { 'DPDB' }
20 sub SIG_HEADER   () { 'h'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_SIZE     () {  1     }
29
30 my $STALE_SIZE = 2;
31
32 # Please refer to the pack() documentation for further information
33 my %StP = (
34     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
35     2 => 'n', # Unsigned short in "network" (big-endian) order
36     4 => 'N', # Unsigned long in "network" (big-endian) order
37     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
38 );
39
40 ################################################################################
41
42 sub new {
43     my $class = shift;
44     my ($args) = @_;
45
46     my $self = bless {
47         byte_size   => 4,
48
49         digest      => undef,
50         hash_size   => 16,  # In bytes
51         hash_chars  => 256, # Number of chars the algorithm uses per byte
52         max_buckets => 16,
53         num_txns    => 1,   # The HEAD
54         trans_id    => 0,   # Default to the HEAD
55
56         data_sector_size => 64, # Size in bytes of each data sector
57
58         entries => {}, # This is the list of entries for transactions
59         storage => undef,
60     }, $class;
61
62     # Never allow byte_size to be set directly.
63     delete $args->{byte_size};
64     if ( defined $args->{pack_size} ) {
65         if ( lc $args->{pack_size} eq 'small' ) {
66             $args->{byte_size} = 2;
67         }
68         elsif ( lc $args->{pack_size} eq 'medium' ) {
69             $args->{byte_size} = 4;
70         }
71         elsif ( lc $args->{pack_size} eq 'large' ) {
72             $args->{byte_size} = 8;
73         }
74         else {
75             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
76         }
77     }
78
79     # Grab the parameters we want to use
80     foreach my $param ( keys %$self ) {
81         next unless exists $args->{$param};
82         $self->{$param} = $args->{$param};
83     }
84
85     my %validations = (
86         max_buckets      => { floor => 16, ceil => 256 },
87         num_txns         => { floor => 1,  ceil => 255 },
88         data_sector_size => { floor => 32, ceil => 256 },
89     );
90
91     while ( my ($attr, $c) = each %validations ) {
92         if (   !defined $self->{$attr}
93             || !length $self->{$attr}
94             || $self->{$attr} =~ /\D/
95             || $self->{$attr} < $c->{floor}
96         ) {
97             $self->{$attr} = '(undef)' if !defined $self->{$attr};
98             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
99             $self->{$attr} = $c->{floor};
100         }
101         elsif ( $self->{$attr} > $c->{ceil} ) {
102             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
103             $self->{$attr} = $c->{ceil};
104         }
105     }
106
107     if ( !$self->{digest} ) {
108         require Digest::MD5;
109         $self->{digest} = \&Digest::MD5::md5;
110     }
111
112     return $self;
113 }
114
115 ################################################################################
116
117 sub read_value {
118     my $self = shift;
119     my ($obj, $key) = @_;
120
121     # This will be a Reference sector
122     my $sector = $self->_load_sector( $obj->_base_offset )
123         or return;
124
125     if ( $sector->staleness != $obj->_staleness ) {
126         return;
127     }
128
129     my $key_md5 = $self->_apply_digest( $key );
130
131     my $value_sector = $sector->get_data_for({
132         key_md5    => $key_md5,
133         allow_head => 1,
134     });
135
136     unless ( $value_sector ) {
137         $value_sector = DBM::Deep::Engine::Sector::Null->new({
138             engine => $self,
139             data   => undef,
140         });
141
142         $sector->write_data({
143             key_md5 => $key_md5,
144             key     => $key,
145             value   => $value_sector,
146         });
147     }
148
149     return $value_sector->data;
150 }
151
152 sub get_classname {
153     my $self = shift;
154     my ($obj) = @_;
155
156     # This will be a Reference sector
157     my $sector = $self->_load_sector( $obj->_base_offset )
158         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
159
160     if ( $sector->staleness != $obj->_staleness ) {
161         return;
162     }
163
164     return $sector->get_classname;
165 }
166
167 sub make_reference {
168     my $self = shift;
169     my ($obj, $old_key, $new_key) = @_;
170
171     # This will be a Reference sector
172     my $sector = $self->_load_sector( $obj->_base_offset )
173         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
174
175     if ( $sector->staleness != $obj->_staleness ) {
176         return;
177     }
178
179     my $old_md5 = $self->_apply_digest( $old_key );
180
181     my $value_sector = $sector->get_data_for({
182         key_md5    => $old_md5,
183         allow_head => 1,
184     });
185
186     unless ( $value_sector ) {
187         $value_sector = DBM::Deep::Engine::Sector::Null->new({
188             engine => $self,
189             data   => undef,
190         });
191
192         $sector->write_data({
193             key_md5 => $old_md5,
194             key     => $old_key,
195             value   => $value_sector,
196         });
197     }
198
199     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
200         $sector->write_data({
201             key     => $new_key,
202             key_md5 => $self->_apply_digest( $new_key ),
203             value   => $value_sector,
204         });
205         $value_sector->increment_refcount;
206     }
207     else {
208         $sector->write_data({
209             key     => $new_key,
210             key_md5 => $self->_apply_digest( $new_key ),
211             value   => $value_sector->clone,
212         });
213     }
214 }
215
216 sub key_exists {
217     my $self = shift;
218     my ($obj, $key) = @_;
219
220     # This will be a Reference sector
221     my $sector = $self->_load_sector( $obj->_base_offset )
222         or return '';
223
224     if ( $sector->staleness != $obj->_staleness ) {
225         return '';
226     }
227
228     my $data = $sector->get_data_for({
229         key_md5    => $self->_apply_digest( $key ),
230         allow_head => 1,
231     });
232
233     # exists() returns 1 or '' for true/false.
234     return $data ? 1 : '';
235 }
236
237 sub delete_key {
238     my $self = shift;
239     my ($obj, $key) = @_;
240
241     my $sector = $self->_load_sector( $obj->_base_offset )
242         or return;
243
244     if ( $sector->staleness != $obj->_staleness ) {
245         return;
246     }
247
248     return $sector->delete_key({
249         key_md5    => $self->_apply_digest( $key ),
250         allow_head => 0,
251     });
252 }
253
254 sub write_value {
255     my $self = shift;
256     my ($obj, $key, $value) = @_;
257
258     my $r = Scalar::Util::reftype( $value ) || '';
259     {
260         last if $r eq '';
261         last if $r eq 'HASH';
262         last if $r eq 'ARRAY';
263
264         DBM::Deep->_throw_error(
265             "Storage of references of type '$r' is not supported."
266         );
267     }
268
269     # This will be a Reference sector
270     my $sector = $self->_load_sector( $obj->_base_offset )
271         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
272
273     if ( $sector->staleness != $obj->_staleness ) {
274         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep.n" );
275     }
276
277     my ($class, $type);
278     if ( !defined $value ) {
279         $class = 'DBM::Deep::Engine::Sector::Null';
280     }
281     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
282         my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $value->isa( 'DBM::Deep' ); };
283         if ( $is_dbm_deep ) {
284             if ( $value->_engine->storage == $self->storage ) {
285                 my $value_sector = $self->_load_sector( $value->_base_offset );
286                 $sector->write_data({
287                     key     => $key,
288                     key_md5 => $self->_apply_digest( $key ),
289                     value   => $value_sector,
290                 });
291                 $value_sector->increment_refcount;
292                 return 1;
293             }
294
295             DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
296         }
297         if ( $r eq 'ARRAY' && tied(@$value) ) {
298             DBM::Deep->_throw_error( "Cannot store something that is tied." );
299         }
300         if ( $r eq 'HASH' && tied(%$value) ) {
301             DBM::Deep->_throw_error( "Cannot store something that is tied." );
302         }
303         $class = 'DBM::Deep::Engine::Sector::Reference';
304         $type = substr( $r, 0, 1 );
305     }
306     else {
307         if ( tied($value) ) {
308             DBM::Deep->_throw_error( "Cannot store something that is tied." );
309         }
310         $class = 'DBM::Deep::Engine::Sector::Scalar';
311     }
312
313     # Create this after loading the reference sector in case something bad happens.
314     # This way, we won't allocate value sector(s) needlessly.
315     my $value_sector = $class->new({
316         engine => $self,
317         data   => $value,
318         type   => $type,
319     });
320
321     $sector->write_data({
322         key     => $key,
323         key_md5 => $self->_apply_digest( $key ),
324         value   => $value_sector,
325     });
326
327     # This code is to make sure we write all the values in the $value to the disk
328     # and to make sure all changes to $value after the assignment are reflected
329     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
330     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
331     # copy to a temp value.
332     if ( $r eq 'ARRAY' ) {
333         my @temp = @$value;
334         tie @$value, 'DBM::Deep', {
335             base_offset => $value_sector->offset,
336             staleness   => $value_sector->staleness,
337             storage     => $self->storage,
338             engine      => $self,
339         };
340         @$value = @temp;
341         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
342     }
343     elsif ( $r eq 'HASH' ) {
344         my %temp = %$value;
345         tie %$value, 'DBM::Deep', {
346             base_offset => $value_sector->offset,
347             staleness   => $value_sector->staleness,
348             storage     => $self->storage,
349             engine      => $self,
350         };
351
352         %$value = %temp;
353         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
354     }
355
356     return 1;
357 }
358
359 # XXX Add staleness here
360 sub get_next_key {
361     my $self = shift;
362     my ($obj, $prev_key) = @_;
363
364     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
365     unless ( $prev_key ) {
366         $obj->{iterator} = DBM::Deep::Iterator->new({
367             base_offset => $obj->_base_offset,
368             engine      => $self,
369         });
370     }
371
372     return $obj->{iterator}->get_next_key( $obj );
373 }
374
375 ################################################################################
376
377 sub setup_fh {
378     my $self = shift;
379     my ($obj) = @_;
380
381     # We're opening the file.
382     unless ( $obj->_base_offset ) {
383         my $bytes_read = $self->_read_file_header;
384
385         # Creating a new file
386         unless ( $bytes_read ) {
387             $self->_write_file_header;
388
389             # 1) Create Array/Hash entry
390             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
391                 engine => $self,
392                 type   => $obj->_type,
393             });
394             $obj->{base_offset} = $initial_reference->offset;
395             $obj->{staleness} = $initial_reference->staleness;
396
397             $self->storage->flush;
398         }
399         # Reading from an existing file
400         else {
401             $obj->{base_offset} = $bytes_read;
402             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
403                 engine => $self,
404                 offset => $obj->_base_offset,
405             });
406             unless ( $initial_reference ) {
407                 DBM::Deep->_throw_error("Corrupted file, no master index record");
408             }
409
410             unless ($obj->_type eq $initial_reference->type) {
411                 DBM::Deep->_throw_error("File type mismatch");
412             }
413
414             $obj->{staleness} = $initial_reference->staleness;
415         }
416     }
417
418     return 1;
419 }
420
421 sub begin_work {
422     my $self = shift;
423     my ($obj) = @_;
424
425     if ( $self->trans_id ) {
426         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
427     }
428
429     my @slots = $self->read_txn_slots;
430     my $found;
431     for my $i ( 0 .. $#slots ) {
432         next if $slots[$i];
433
434         $slots[$i] = 1;
435         $self->set_trans_id( $i + 1 );
436         $found = 1;
437         last;
438     }
439     unless ( $found ) {
440         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
441     }
442     $self->write_txn_slots( @slots );
443
444     if ( !$self->trans_id ) {
445         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
446     }
447
448     return;
449 }
450
451 sub rollback {
452     my $self = shift;
453     my ($obj) = @_;
454
455     if ( !$self->trans_id ) {
456         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
457     }
458
459     # Each entry is the file location for a bucket that has a modification for
460     # this transaction. The entries need to be expunged.
461     foreach my $entry (@{ $self->get_entries } ) {
462         # Remove the entry here
463         my $read_loc = $entry
464           + $self->hash_size
465           + $self->byte_size
466           + $self->byte_size
467           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
468
469         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
470         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
471         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
472
473         if ( $data_loc > 1 ) {
474             $self->_load_sector( $data_loc )->free;
475         }
476     }
477
478     $self->clear_entries;
479
480     my @slots = $self->read_txn_slots;
481     $slots[$self->trans_id-1] = 0;
482     $self->write_txn_slots( @slots );
483     $self->inc_txn_staleness_counter( $self->trans_id );
484     $self->set_trans_id( 0 );
485
486     return 1;
487 }
488
489 sub commit {
490     my $self = shift;
491     my ($obj) = @_;
492
493     if ( !$self->trans_id ) {
494         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
495     }
496
497     foreach my $entry (@{ $self->get_entries } ) {
498         # Overwrite the entry in head with the entry in trans_id
499         my $base = $entry
500           + $self->hash_size
501           + $self->byte_size;
502
503         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
504         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
505
506         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
507         my $trans_loc = $self->storage->read_at(
508             $spot, $self->byte_size,
509         );
510
511         $self->storage->print_at( $base, $trans_loc );
512         $self->storage->print_at(
513             $spot,
514             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
515         );
516
517         if ( $head_loc > 1 ) {
518             $self->_load_sector( $head_loc )->free;
519         }
520     }
521
522     $self->clear_entries;
523
524     my @slots = $self->read_txn_slots;
525     $slots[$self->trans_id-1] = 0;
526     $self->write_txn_slots( @slots );
527     $self->inc_txn_staleness_counter( $self->trans_id );
528     $self->set_trans_id( 0 );
529
530     return 1;
531 }
532
533 sub read_txn_slots {
534     my $self = shift;
535     my $bl = $self->txn_bitfield_len;
536     my $num_bits = $bl * 8;
537     return split '', unpack( 'b'.$num_bits,
538         $self->storage->read_at(
539             $self->trans_loc, $bl,
540         )
541     );
542 }
543
544 sub write_txn_slots {
545     my $self = shift;
546     my $num_bits = $self->txn_bitfield_len * 8;
547     $self->storage->print_at( $self->trans_loc,
548         pack( 'b'.$num_bits, join('', @_) ),
549     );
550 }
551
552 sub get_running_txn_ids {
553     my $self = shift;
554     my @transactions = $self->read_txn_slots;
555     my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
556 }
557
558 sub get_txn_staleness_counter {
559     my $self = shift;
560     my ($trans_id) = @_;
561
562     # Hardcode staleness of 0 for the HEAD
563     return 0 unless $trans_id;
564
565     return unpack( $StP{$STALE_SIZE},
566         $self->storage->read_at(
567             $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
568             4,
569         )
570     );
571 }
572
573 sub inc_txn_staleness_counter {
574     my $self = shift;
575     my ($trans_id) = @_;
576
577     # Hardcode staleness of 0 for the HEAD
578     return unless $trans_id;
579
580     $self->storage->print_at(
581         $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
582         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
583     );
584 }
585
586 sub get_entries {
587     my $self = shift;
588     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
589 }
590
591 sub add_entry {
592     my $self = shift;
593     my ($trans_id, $loc) = @_;
594
595     $self->{entries}{$trans_id} ||= {};
596     $self->{entries}{$trans_id}{$loc} = undef;
597 }
598
599 # If the buckets are being relocated because of a reindexing, the entries
600 # mechanism needs to be made aware of it.
601 sub reindex_entry {
602     my $self = shift;
603     my ($old_loc, $new_loc) = @_;
604
605     TRANS:
606     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
607         foreach my $orig_loc ( keys %{ $locs } ) {
608             if ( $orig_loc == $old_loc ) {
609                 delete $locs->{orig_loc};
610                 $locs->{$new_loc} = undef;
611                 next TRANS;
612             }
613         }
614     }
615 }
616
617 sub clear_entries {
618     my $self = shift;
619     delete $self->{entries}{$self->trans_id};
620 }
621
622 ################################################################################
623
624 {
625     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
626     my $this_file_version = 3;
627
628     sub _write_file_header {
629         my $self = shift;
630
631         my $nt = $self->num_txns;
632         my $bl = $self->txn_bitfield_len;
633
634         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
635
636         my $loc = $self->storage->request_space( $header_fixed + $header_var );
637
638         $self->storage->print_at( $loc,
639             SIG_FILE,
640             SIG_HEADER,
641             pack('N', $this_file_version), # At this point, we're at 9 bytes
642             pack('N', $header_var),        # header size
643             # --- Above is $header_fixed. Below is $header_var
644             pack('C', $self->byte_size),
645
646             # These shenanigans are to allow a 256 within a C
647             pack('C', $self->max_buckets - 1),
648             pack('C', $self->data_sector_size - 1),
649
650             pack('C', $nt),
651             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
652             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
653             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
654             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
655             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
656         );
657
658         #XXX Set these less fragilely
659         $self->set_trans_loc( $header_fixed + 4 );
660         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
661
662         return;
663     }
664
665     sub _read_file_header {
666         my $self = shift;
667
668         my $buffer = $self->storage->read_at( 0, $header_fixed );
669         return unless length($buffer);
670
671         my ($file_signature, $sig_header, $file_version, $size) = unpack(
672             'A4 A N N', $buffer
673         );
674
675         unless ( $file_signature eq SIG_FILE ) {
676             $self->storage->close;
677             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
678         }
679
680         unless ( $sig_header eq SIG_HEADER ) {
681             $self->storage->close;
682             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
683         }
684
685         unless ( $file_version == $this_file_version ) {
686             $self->storage->close;
687             DBM::Deep->_throw_error(
688                 "Wrong file version found - " .  $file_version .
689                 " - expected " . $this_file_version
690             );
691         }
692
693         my $buffer2 = $self->storage->read_at( undef, $size );
694         my @values = unpack( 'C C C C', $buffer2 );
695
696         if ( @values != 4 || grep { !defined } @values ) {
697             $self->storage->close;
698             DBM::Deep->_throw_error("Corrupted file - bad header");
699         }
700
701         #XXX Add warnings if values weren't set right
702         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
703
704         # These shenangians are to allow a 256 within a C
705         $self->{max_buckets} += 1;
706         $self->{data_sector_size} += 1;
707
708         my $bl = $self->txn_bitfield_len;
709
710         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
711         unless ( $size == $header_var ) {
712             $self->storage->close;
713             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
714         }
715
716         $self->set_trans_loc( $header_fixed + scalar(@values) );
717         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
718
719         return length($buffer) + length($buffer2);
720     }
721 }
722
723 sub _load_sector {
724     my $self = shift;
725     my ($offset) = @_;
726
727     # Add a catch for offset of 0 or 1
728     return if $offset <= 1;
729
730     my $type = $self->storage->read_at( $offset, 1 );
731     return if $type eq chr(0);
732
733     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
734         return DBM::Deep::Engine::Sector::Reference->new({
735             engine => $self,
736             type   => $type,
737             offset => $offset,
738         });
739     }
740     # XXX Don't we need key_md5 here?
741     elsif ( $type eq $self->SIG_BLIST ) {
742         return DBM::Deep::Engine::Sector::BucketList->new({
743             engine => $self,
744             type   => $type,
745             offset => $offset,
746         });
747     }
748     elsif ( $type eq $self->SIG_INDEX ) {
749         return DBM::Deep::Engine::Sector::Index->new({
750             engine => $self,
751             type   => $type,
752             offset => $offset,
753         });
754     }
755     elsif ( $type eq $self->SIG_NULL ) {
756         return DBM::Deep::Engine::Sector::Null->new({
757             engine => $self,
758             type   => $type,
759             offset => $offset,
760         });
761     }
762     elsif ( $type eq $self->SIG_DATA ) {
763         return DBM::Deep::Engine::Sector::Scalar->new({
764             engine => $self,
765             type   => $type,
766             offset => $offset,
767         });
768     }
769     # This was deleted from under us, so just return and let the caller figure it out.
770     elsif ( $type eq $self->SIG_FREE ) {
771         return;
772     }
773
774     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
775 }
776
777 sub _apply_digest {
778     my $self = shift;
779     return $self->{digest}->(@_);
780 }
781
782 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
783 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
784 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
785
786 sub _add_free_sector {
787     my $self = shift;
788     my ($multiple, $offset, $size) = @_;
789
790     my $chains_offset = $multiple * $self->byte_size;
791
792     my $storage = $self->storage;
793
794     # Increment staleness.
795     # XXX Can this increment+modulo be done by "&= 0x1" ?
796     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
797     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
798     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
799
800     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
801
802     $storage->print_at( $self->chains_loc + $chains_offset,
803         pack( $StP{$self->byte_size}, $offset ),
804     );
805
806     # Record the old head in the new sector after the signature and staleness counter
807     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
808 }
809
810 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
811 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
812 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
813
814 sub _request_sector {
815     my $self = shift;
816     my ($multiple, $size) = @_;
817
818     my $chains_offset = $multiple * $self->byte_size;
819
820     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
821     my $loc = unpack( $StP{$self->byte_size}, $old_head );
822
823     # We don't have any free sectors of the right size, so allocate a new one.
824     unless ( $loc ) {
825         my $offset = $self->storage->request_space( $size );
826
827         # Zero out the new sector. This also guarantees correct increases
828         # in the filesize.
829         $self->storage->print_at( $offset, chr(0) x $size );
830
831         return $offset;
832     }
833
834     # Read the new head after the signature and the staleness counter
835     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
836     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
837     $self->storage->print_at(
838         $loc + SIG_SIZE + $STALE_SIZE,
839         pack( $StP{$self->byte_size}, 0 ),
840     );
841
842     return $loc;
843 }
844
845 ################################################################################
846
847 sub storage     { $_[0]{storage} }
848 sub byte_size   { $_[0]{byte_size} }
849 sub hash_size   { $_[0]{hash_size} }
850 sub hash_chars  { $_[0]{hash_chars} }
851 sub num_txns    { $_[0]{num_txns} }
852 sub max_buckets { $_[0]{max_buckets} }
853 sub blank_md5   { chr(0) x $_[0]->hash_size }
854 sub data_sector_size { $_[0]{data_sector_size} }
855
856 # This is a calculated value
857 sub txn_bitfield_len {
858     my $self = shift;
859     unless ( exists $self->{txn_bitfield_len} ) {
860         my $temp = ($self->num_txns) / 8;
861         if ( $temp > int( $temp ) ) {
862             $temp = int( $temp ) + 1;
863         }
864         $self->{txn_bitfield_len} = $temp;
865     }
866     return $self->{txn_bitfield_len};
867 }
868
869 sub trans_id     { $_[0]{trans_id} }
870 sub set_trans_id { $_[0]{trans_id} = $_[1] }
871
872 sub trans_loc     { $_[0]{trans_loc} }
873 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
874
875 sub chains_loc     { $_[0]{chains_loc} }
876 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
877
878 ################################################################################
879
880 package DBM::Deep::Iterator;
881
882 sub new {
883     my $class = shift;
884     my ($args) = @_;
885
886     my $self = bless {
887         breadcrumbs => [],
888         engine      => $args->{engine},
889         base_offset => $args->{base_offset},
890     }, $class;
891
892     Scalar::Util::weaken( $self->{engine} );
893
894     return $self;
895 }
896
897 sub reset { $_[0]{breadcrumbs} = [] }
898
899 sub get_sector_iterator {
900     my $self = shift;
901     my ($loc) = @_;
902
903     my $sector = $self->{engine}->_load_sector( $loc )
904         or return;
905
906     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
907         return DBM::Deep::Iterator::Index->new({
908             iterator => $self,
909             sector   => $sector,
910         });
911     }
912     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
913         return DBM::Deep::Iterator::BucketList->new({
914             iterator => $self,
915             sector   => $sector,
916         });
917     }
918
919     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
920 }
921
922 sub get_next_key {
923     my $self = shift;
924     my ($obj) = @_;
925
926     my $crumbs = $self->{breadcrumbs};
927     my $e = $self->{engine};
928
929     unless ( @$crumbs ) {
930         # This will be a Reference sector
931         my $sector = $e->_load_sector( $self->{base_offset} )
932             # If no sector is found, thist must have been deleted from under us.
933             or return;
934
935         if ( $sector->staleness != $obj->_staleness ) {
936             return;
937         }
938
939         my $loc = $sector->get_blist_loc
940             or return;
941
942         push @$crumbs, $self->get_sector_iterator( $loc );
943     }
944
945     FIND_NEXT_KEY: {
946         # We're at the end.
947         unless ( @$crumbs ) {
948             $self->reset;
949             return;
950         }
951
952         my $iterator = $crumbs->[-1];
953
954         # This level is done.
955         if ( $iterator->at_end ) {
956             pop @$crumbs;
957             redo FIND_NEXT_KEY;
958         }
959
960         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
961             # If we don't have any more, it will be caught at the
962             # prior check.
963             if ( my $next = $iterator->get_next_iterator ) {
964                 push @$crumbs, $next;
965             }
966             redo FIND_NEXT_KEY;
967         }
968
969         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
970             DBM::Deep->_throw_error(
971                 "Should have a bucketlist iterator here - instead have $iterator"
972             );
973         }
974
975         # At this point, we have a BucketList iterator
976         my $key = $iterator->get_next_key;
977         if ( defined $key ) {
978             return $key;
979         }
980         #XXX else { $iterator->set_to_end() } ?
981
982         # We hit the end of the bucketlist iterator, so redo
983         redo FIND_NEXT_KEY;
984     }
985
986     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
987 }
988
989 package DBM::Deep::Iterator::Index;
990
991 sub new {
992     my $self = bless $_[1] => $_[0];
993     $self->{curr_index} = 0;
994     return $self;
995 }
996
997 sub at_end {
998     my $self = shift;
999     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
1000 }
1001
1002 sub get_next_iterator {
1003     my $self = shift;
1004
1005     my $loc;
1006     while ( !$loc ) {
1007         return if $self->at_end;
1008         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
1009     }
1010
1011     return $self->{iterator}->get_sector_iterator( $loc );
1012 }
1013
1014 package DBM::Deep::Iterator::BucketList;
1015
1016 sub new {
1017     my $self = bless $_[1] => $_[0];
1018     $self->{curr_index} = 0;
1019     return $self;
1020 }
1021
1022 sub at_end {
1023     my $self = shift;
1024     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
1025 }
1026
1027 sub get_next_key {
1028     my $self = shift;
1029
1030     return if $self->at_end;
1031
1032     my $idx = $self->{curr_index}++;
1033
1034     my $data_loc = $self->{sector}->get_data_location_for({
1035         allow_head => 1,
1036         idx        => $idx,
1037     }) or return;
1038
1039     #XXX Do we want to add corruption checks here?
1040     return $self->{sector}->get_key_for( $idx )->data;
1041 }
1042
1043 package DBM::Deep::Engine::Sector;
1044
1045 sub new {
1046     my $self = bless $_[1], $_[0];
1047     Scalar::Util::weaken( $self->{engine} );
1048     $self->_init;
1049     return $self;
1050 }
1051
1052 #sub _init {}
1053 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
1054
1055 sub engine { $_[0]{engine} }
1056 sub offset { $_[0]{offset} }
1057 sub type   { $_[0]{type} }
1058
1059 sub base_size {
1060    my $self = shift;
1061    return $self->engine->SIG_SIZE + $STALE_SIZE;
1062 }
1063
1064 sub free {
1065     my $self = shift;
1066
1067     my $e = $self->engine;
1068
1069     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1070     # Skip staleness counter
1071     $e->storage->print_at( $self->offset + $self->base_size,
1072         chr(0) x ($self->size - $self->base_size),
1073     );
1074
1075     my $free_meth = $self->free_meth;
1076     $e->$free_meth( $self->offset, $self->size );
1077
1078     return;
1079 }
1080
1081 package DBM::Deep::Engine::Sector::Data;
1082
1083 our @ISA = qw( DBM::Deep::Engine::Sector );
1084
1085 # This is in bytes
1086 sub size { $_[0]{engine}->data_sector_size }
1087 sub free_meth { return '_add_free_data_sector' }
1088
1089 sub clone {
1090     my $self = shift;
1091     return ref($self)->new({
1092         engine => $self->engine,
1093         type   => $self->type,
1094         data   => $self->data,
1095     });
1096 }
1097
1098 package DBM::Deep::Engine::Sector::Scalar;
1099
1100 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1101
1102 sub free {
1103     my $self = shift;
1104
1105     my $chain_loc = $self->chain_loc;
1106
1107     $self->SUPER::free();
1108
1109     if ( $chain_loc ) {
1110         $self->engine->_load_sector( $chain_loc )->free;
1111     }
1112
1113     return;
1114 }
1115
1116 sub type { $_[0]{engine}->SIG_DATA }
1117 sub _init {
1118     my $self = shift;
1119
1120     my $engine = $self->engine;
1121
1122     unless ( $self->offset ) {
1123         my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
1124
1125         $self->{offset} = $engine->_request_data_sector( $self->size );
1126
1127         my $data = delete $self->{data};
1128         my $dlen = length $data;
1129         my $continue = 1;
1130         my $curr_offset = $self->offset;
1131         while ( $continue ) {
1132
1133             my $next_offset = 0;
1134
1135             my ($leftover, $this_len, $chunk);
1136             if ( $dlen > $data_section ) {
1137                 $leftover = 0;
1138                 $this_len = $data_section;
1139                 $chunk = substr( $data, 0, $this_len );
1140
1141                 $dlen -= $data_section;
1142                 $next_offset = $engine->_request_data_sector( $self->size );
1143                 $data = substr( $data, $this_len );
1144             }
1145             else {
1146                 $leftover = $data_section - $dlen;
1147                 $this_len = $dlen;
1148                 $chunk = $data;
1149
1150                 $continue = 0;
1151             }
1152
1153             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1154             # Skip staleness
1155             $engine->storage->print_at( $curr_offset + $self->base_size,
1156                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1157                 pack( $StP{1}, $this_len ),                      # Data length
1158                 $chunk,                                          # Data to be stored in this sector
1159                 chr(0) x $leftover,                              # Zero-fill the rest
1160             );
1161
1162             $curr_offset = $next_offset;
1163         }
1164
1165         return;
1166     }
1167 }
1168
1169 sub data_length {
1170     my $self = shift;
1171
1172     my $buffer = $self->engine->storage->read_at(
1173         $self->offset + $self->base_size + $self->engine->byte_size, 1
1174     );
1175
1176     return unpack( $StP{1}, $buffer );
1177 }
1178
1179 sub chain_loc {
1180     my $self = shift;
1181     return unpack(
1182         $StP{$self->engine->byte_size},
1183         $self->engine->storage->read_at(
1184             $self->offset + $self->base_size,
1185             $self->engine->byte_size,
1186         ),
1187     );
1188 }
1189
1190 sub data {
1191     my $self = shift;
1192
1193     my $data;
1194     while ( 1 ) {
1195         my $chain_loc = $self->chain_loc;
1196
1197         $data .= $self->engine->storage->read_at(
1198             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1199         );
1200
1201         last unless $chain_loc;
1202
1203         $self = $self->engine->_load_sector( $chain_loc );
1204     }
1205
1206     return $data;
1207 }
1208
1209 package DBM::Deep::Engine::Sector::Null;
1210
1211 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1212
1213 sub type { $_[0]{engine}->SIG_NULL }
1214 sub data_length { 0 }
1215 sub data { return }
1216
1217 sub _init {
1218     my $self = shift;
1219
1220     my $engine = $self->engine;
1221
1222     unless ( $self->offset ) {
1223         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1224
1225         $self->{offset} = $engine->_request_data_sector( $self->size );
1226         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1227         # Skip staleness counter
1228         $engine->storage->print_at( $self->offset + $self->base_size,
1229             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1230             pack( $StP{1}, $self->data_length ),  # Data length
1231             chr(0) x $leftover,                   # Zero-fill the rest
1232         );
1233
1234         return;
1235     }
1236 }
1237
1238 package DBM::Deep::Engine::Sector::Reference;
1239
1240 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1241
1242 sub _init {
1243     my $self = shift;
1244
1245     my $e = $self->engine;
1246
1247     unless ( $self->offset ) {
1248         my $classname = Scalar::Util::blessed( delete $self->{data} );
1249         my $leftover = $self->size - $self->base_size - 3 * $e->byte_size;
1250
1251         my $class_offset = 0;
1252         if ( defined $classname ) {
1253             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1254                 engine => $e,
1255                 data   => $classname,
1256             });
1257             $class_offset = $class_sector->offset;
1258         }
1259
1260         $self->{offset} = $e->_request_data_sector( $self->size );
1261         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1262         # Skip staleness counter
1263         $e->storage->print_at( $self->offset + $self->base_size,
1264             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1265             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1266             pack( $StP{$e->byte_size}, 1 ),             # Initial refcount
1267             chr(0) x $leftover,                         # Zero-fill the rest
1268         );
1269     }
1270     else {
1271         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1272     }
1273
1274     $self->{staleness} = unpack(
1275         $StP{$STALE_SIZE},
1276         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
1277     );
1278
1279     return;
1280 }
1281
1282 sub free {
1283     my $self = shift;
1284
1285     # We're not ready to be removed yet.
1286     if ( $self->decrement_refcount > 0 ) {
1287         return;
1288     }
1289
1290     my $blist_loc = $self->get_blist_loc;
1291     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1292
1293     my $class_loc = $self->get_class_offset;
1294     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1295
1296     $self->SUPER::free();
1297 }
1298
1299 sub staleness { $_[0]{staleness} }
1300
1301 sub get_data_for {
1302     my $self = shift;
1303     my ($args) = @_;
1304
1305     # Assume that the head is not allowed unless otherwise specified.
1306     $args->{allow_head} = 0 unless exists $args->{allow_head};
1307
1308     # Assume we don't create a new blist location unless otherwise specified.
1309     $args->{create} = 0 unless exists $args->{create};
1310
1311     my $blist = $self->get_bucket_list({
1312         key_md5 => $args->{key_md5},
1313         key => $args->{key},
1314         create  => $args->{create},
1315     });
1316     return unless $blist && $blist->{found};
1317
1318     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1319     # is whether or not this transaction has this key. That's part of the next
1320     # function call.
1321     my $location = $blist->get_data_location_for({
1322         allow_head => $args->{allow_head},
1323     }) or return;
1324
1325     return $self->engine->_load_sector( $location );
1326 }
1327
1328 sub write_data {
1329     my $self = shift;
1330     my ($args) = @_;
1331
1332     my $blist = $self->get_bucket_list({
1333         key_md5 => $args->{key_md5},
1334         key => $args->{key},
1335         create  => 1,
1336     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1337
1338     # Handle any transactional bookkeeping.
1339     if ( $self->engine->trans_id ) {
1340         if ( ! $blist->has_md5 ) {
1341             $blist->mark_deleted({
1342                 trans_id => 0,
1343             });
1344         }
1345     }
1346     else {
1347         my @trans_ids = $self->engine->get_running_txn_ids;
1348         if ( $blist->has_md5 ) {
1349             if ( @trans_ids ) {
1350                 my $old_value = $blist->get_data_for;
1351                 foreach my $other_trans_id ( @trans_ids ) {
1352                     next if $blist->get_data_location_for({
1353                         trans_id   => $other_trans_id,
1354                         allow_head => 0,
1355                     });
1356                     $blist->write_md5({
1357                         trans_id => $other_trans_id,
1358                         key      => $args->{key},
1359                         key_md5  => $args->{key_md5},
1360                         value    => $old_value->clone,
1361                     });
1362                 }
1363             }
1364         }
1365         else {
1366             if ( @trans_ids ) {
1367                 foreach my $other_trans_id ( @trans_ids ) {
1368                     #XXX This doesn't seem to possible to ever happen . . .
1369                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1370                     $blist->mark_deleted({
1371                         trans_id => $other_trans_id,
1372                     });
1373                 }
1374             }
1375         }
1376     }
1377
1378     #XXX Is this safe to do transactionally?
1379     # Free the place we're about to write to.
1380     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1381         $blist->get_data_for({ allow_head => 0 })->free;
1382     }
1383
1384     $blist->write_md5({
1385         key      => $args->{key},
1386         key_md5  => $args->{key_md5},
1387         value    => $args->{value},
1388     });
1389 }
1390
1391 sub delete_key {
1392     my $self = shift;
1393     my ($args) = @_;
1394
1395     # XXX What should happen if this fails?
1396     my $blist = $self->get_bucket_list({
1397         key_md5 => $args->{key_md5},
1398     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1399
1400     # Save the location so that we can free the data
1401     my $location = $blist->get_data_location_for({
1402         allow_head => 0,
1403     });
1404     my $old_value = $location && $self->engine->_load_sector( $location );
1405
1406     my @trans_ids = $self->engine->get_running_txn_ids;
1407
1408     if ( $self->engine->trans_id == 0 ) {
1409         if ( @trans_ids ) {
1410             foreach my $other_trans_id ( @trans_ids ) {
1411                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1412                 $blist->write_md5({
1413                     trans_id => $other_trans_id,
1414                     key      => $args->{key},
1415                     key_md5  => $args->{key_md5},
1416                     value    => $old_value->clone,
1417                 });
1418             }
1419         }
1420     }
1421
1422     my $data;
1423     if ( @trans_ids ) {
1424         $blist->mark_deleted( $args );
1425
1426         if ( $old_value ) {
1427             $data = $old_value->data;
1428             $old_value->free;
1429         }
1430     }
1431     else {
1432         $data = $blist->delete_md5( $args );
1433     }
1434
1435     return $data;
1436 }
1437
1438 sub get_blist_loc {
1439     my $self = shift;
1440
1441     my $e = $self->engine;
1442     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1443     return unpack( $StP{$e->byte_size}, $blist_loc );
1444 }
1445
1446 sub get_bucket_list {
1447     my $self = shift;
1448     my ($args) = @_;
1449     $args ||= {};
1450
1451     # XXX Add in check here for recycling?
1452
1453     my $engine = $self->engine;
1454
1455     my $blist_loc = $self->get_blist_loc;
1456
1457     # There's no index or blist yet
1458     unless ( $blist_loc ) {
1459         return unless $args->{create};
1460
1461         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1462             engine  => $engine,
1463             key_md5 => $args->{key_md5},
1464         });
1465
1466         $engine->storage->print_at( $self->offset + $self->base_size,
1467             pack( $StP{$engine->byte_size}, $blist->offset ),
1468         );
1469
1470         return $blist;
1471     }
1472
1473     my $sector = $engine->_load_sector( $blist_loc )
1474         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1475     my $i = 0;
1476     my $last_sector = undef;
1477     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1478         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1479         $last_sector = $sector;
1480         if ( $blist_loc ) {
1481             $sector = $engine->_load_sector( $blist_loc )
1482                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1483         }
1484         else {
1485             $sector = undef;
1486             last;
1487         }
1488     }
1489
1490     # This means we went through the Index sector(s) and found an empty slot
1491     unless ( $sector ) {
1492         return unless $args->{create};
1493
1494         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
1495             unless $last_sector;
1496
1497         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1498             engine  => $engine,
1499             key_md5 => $args->{key_md5},
1500         });
1501
1502         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1503
1504         return $blist;
1505     }
1506
1507     $sector->find_md5( $args->{key_md5} );
1508
1509     # See whether or not we need to reindex the bucketlist
1510     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {
1511         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1512             engine => $engine,
1513         });
1514
1515         my %blist_cache;
1516         #XXX q.v. the comments for this function.
1517         foreach my $entry ( $sector->chopped_up ) {
1518             my ($spot, $md5) = @{$entry};
1519             my $idx = ord( substr( $md5, $i, 1 ) );
1520
1521             # XXX This is inefficient
1522             my $blist = $blist_cache{$idx}
1523                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1524                     engine => $engine,
1525                 });
1526
1527             $new_index->set_entry( $idx => $blist->offset );
1528
1529             my $new_spot = $blist->write_at_next_open( $md5 );
1530             $engine->reindex_entry( $spot => $new_spot );
1531         }
1532
1533         # Handle the new item separately.
1534         {
1535             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1536             my $blist = $blist_cache{$idx}
1537                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1538                     engine => $engine,
1539                 });
1540
1541             $new_index->set_entry( $idx => $blist->offset );
1542
1543             #XXX THIS IS HACKY!
1544             $blist->find_md5( $args->{key_md5} );
1545             $blist->write_md5({
1546                 key     => $args->{key},
1547                 key_md5 => $args->{key_md5},
1548                 value   => DBM::Deep::Engine::Sector::Null->new({
1549                     engine => $engine,
1550                     data   => undef,
1551                 }),
1552             });
1553         }
1554
1555         if ( $last_sector ) {
1556             $last_sector->set_entry(
1557                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1558                 $new_index->offset,
1559             );
1560         } else {
1561             $engine->storage->print_at( $self->offset + $self->base_size,
1562                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1563             );
1564         }
1565
1566         $sector->free;
1567
1568         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1569         $sector->find_md5( $args->{key_md5} );
1570     }
1571
1572     return $sector;
1573 }
1574
1575 sub get_class_offset {
1576     my $self = shift;
1577
1578     my $e = $self->engine;
1579     return unpack(
1580         $StP{$e->byte_size},
1581         $e->storage->read_at(
1582             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1583         ),
1584     );
1585 }
1586
1587 sub get_classname {
1588     my $self = shift;
1589
1590     my $class_offset = $self->get_class_offset;
1591
1592     return unless $class_offset;
1593
1594     return $self->engine->_load_sector( $class_offset )->data;
1595 }
1596
1597 #XXX Add singleton handling here
1598 sub data {
1599     my $self = shift;
1600
1601     my $new_obj = DBM::Deep->new({
1602         type        => $self->type,
1603         base_offset => $self->offset,
1604         staleness   => $self->staleness,
1605         storage     => $self->engine->storage,
1606         engine      => $self->engine,
1607     });
1608
1609     if ( $self->engine->storage->{autobless} ) {
1610         my $classname = $self->get_classname;
1611         if ( defined $classname ) {
1612             bless $new_obj, $classname;
1613         }
1614     }
1615
1616     return $new_obj;
1617 }
1618
1619 sub increment_refcount {
1620     my $self = shift;
1621
1622     my $e = $self->engine;
1623     my $refcount = unpack(
1624         $StP{$e->byte_size},
1625         $e->storage->read_at(
1626             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
1627         ),
1628     );
1629
1630     $refcount++;
1631
1632     $e->storage->print_at(
1633         $self->offset + $self->base_size + 2 * $e->byte_size,
1634         pack( $StP{$e->byte_size}, $refcount ),
1635     );
1636
1637     return $refcount;
1638 }
1639
1640 sub decrement_refcount {
1641     my $self = shift;
1642
1643     my $e = $self->engine;
1644     my $refcount = unpack(
1645         $StP{$e->byte_size},
1646         $e->storage->read_at(
1647             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
1648         ),
1649     );
1650
1651     $refcount--;
1652
1653     $e->storage->print_at(
1654         $self->offset + $self->base_size + 2 * $e->byte_size,
1655         pack( $StP{$e->byte_size}, $refcount ),
1656     );
1657
1658     return $refcount;
1659 }
1660
1661 sub get_refcount {
1662     my $self = shift;
1663
1664     my $e = $self->engine;
1665     return unpack(
1666         $StP{$e->byte_size},
1667         $e->storage->read_at(
1668             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
1669         ),
1670     );
1671 }
1672
1673 package DBM::Deep::Engine::Sector::BucketList;
1674
1675 our @ISA = qw( DBM::Deep::Engine::Sector );
1676
1677 sub _init {
1678     my $self = shift;
1679
1680     my $engine = $self->engine;
1681
1682     unless ( $self->offset ) {
1683         my $leftover = $self->size - $self->base_size;
1684
1685         $self->{offset} = $engine->_request_blist_sector( $self->size );
1686         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1687         # Skip staleness counter
1688         $engine->storage->print_at( $self->offset + $self->base_size,
1689             chr(0) x $leftover, # Zero-fill the data
1690         );
1691     }
1692
1693     if ( $self->{key_md5} ) {
1694         $self->find_md5;
1695     }
1696
1697     return $self;
1698 }
1699
1700 sub size {
1701     my $self = shift;
1702     unless ( $self->{size} ) {
1703         my $e = $self->engine;
1704         # Base + numbuckets * bucketsize
1705         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1706     }
1707     return $self->{size};
1708 }
1709
1710 sub free_meth { return '_add_free_blist_sector' }
1711
1712 sub bucket_size {
1713     my $self = shift;
1714     unless ( $self->{bucket_size} ) {
1715         my $e = $self->engine;
1716         # Key + head (location) + transactions (location + staleness-counter)
1717         my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
1718         $self->{bucket_size} = $e->hash_size + $location_size;
1719     }
1720     return $self->{bucket_size};
1721 }
1722
1723 # XXX This is such a poor hack. I need to rethink this code.
1724 sub chopped_up {
1725     my $self = shift;
1726
1727     my $e = $self->engine;
1728
1729     my @buckets;
1730     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1731         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
1732         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
1733
1734         #XXX If we're chopping, why would we ever have the blank_md5?
1735         last if $md5 eq $e->blank_md5;
1736
1737         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
1738         push @buckets, [ $spot, $md5 . $rest ];
1739     }
1740
1741     return @buckets;
1742 }
1743
1744 sub write_at_next_open {
1745     my $self = shift;
1746     my ($entry) = @_;
1747
1748     #XXX This is such a hack!
1749     $self->{_next_open} = 0 unless exists $self->{_next_open};
1750
1751     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
1752     $self->engine->storage->print_at( $spot, $entry );
1753
1754     return $spot;
1755 }
1756
1757 sub has_md5 {
1758     my $self = shift;
1759     unless ( exists $self->{found} ) {
1760         $self->find_md5;
1761     }
1762     return $self->{found};
1763 }
1764
1765 sub find_md5 {
1766     my $self = shift;
1767
1768     $self->{found} = undef;
1769     $self->{idx}   = -1;
1770
1771     if ( @_ ) {
1772         $self->{key_md5} = shift;
1773     }
1774
1775     # If we don't have an MD5, then what are we supposed to do?
1776     unless ( exists $self->{key_md5} ) {
1777         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1778     }
1779
1780     my $e = $self->engine;
1781     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1782         my $potential = $e->storage->read_at(
1783             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1784         );
1785
1786         if ( $potential eq $e->blank_md5 ) {
1787             $self->{idx} = $idx;
1788             return;
1789         }
1790
1791         if ( $potential eq $self->{key_md5} ) {
1792             $self->{found} = 1;
1793             $self->{idx} = $idx;
1794             return;
1795         }
1796     }
1797
1798     return;
1799 }
1800
1801 sub write_md5 {
1802     my $self = shift;
1803     my ($args) = @_;
1804
1805     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1806     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1807     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1808
1809     my $engine = $self->engine;
1810
1811     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1812
1813     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1814     $engine->add_entry( $args->{trans_id}, $spot );
1815
1816     unless ($self->{found}) {
1817         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1818             engine => $engine,
1819             data   => $args->{key},
1820         });
1821
1822         $engine->storage->print_at( $spot,
1823             $args->{key_md5},
1824             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1825         );
1826     }
1827
1828     my $loc = $spot
1829       + $engine->hash_size
1830       + $engine->byte_size;
1831
1832     if ( $args->{trans_id} ) {
1833         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
1834
1835         $engine->storage->print_at( $loc,
1836             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1837             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1838         );
1839     }
1840     else {
1841         $engine->storage->print_at( $loc,
1842             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1843         );
1844     }
1845 }
1846
1847 sub mark_deleted {
1848     my $self = shift;
1849     my ($args) = @_;
1850     $args ||= {};
1851
1852     my $engine = $self->engine;
1853
1854     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1855
1856     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1857     $engine->add_entry( $args->{trans_id}, $spot );
1858
1859     my $loc = $spot
1860       + $engine->hash_size
1861       + $engine->byte_size;
1862
1863     if ( $args->{trans_id} ) {
1864         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
1865
1866         $engine->storage->print_at( $loc,
1867             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1868             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1869         );
1870     }
1871     else {
1872         $engine->storage->print_at( $loc,
1873             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1874         );
1875     }
1876
1877 }
1878
1879 sub delete_md5 {
1880     my $self = shift;
1881     my ($args) = @_;
1882
1883     my $engine = $self->engine;
1884     return undef unless $self->{found};
1885
1886     # Save the location so that we can free the data
1887     my $location = $self->get_data_location_for({
1888         allow_head => 0,
1889     });
1890     my $key_sector = $self->get_key_for;
1891
1892     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1893     $engine->storage->print_at( $spot,
1894         $engine->storage->read_at(
1895             $spot + $self->bucket_size,
1896             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
1897         ),
1898         chr(0) x $self->bucket_size,
1899     );
1900
1901     $key_sector->free;
1902
1903     my $data_sector = $self->engine->_load_sector( $location );
1904     my $data = $data_sector->data;
1905     $data_sector->free;
1906
1907     return $data;
1908 }
1909
1910 sub get_data_location_for {
1911     my $self = shift;
1912     my ($args) = @_;
1913     $args ||= {};
1914
1915     $args->{allow_head} = 0 unless exists $args->{allow_head};
1916     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1917     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1918
1919     my $e = $self->engine;
1920
1921     my $spot = $self->offset + $self->base_size
1922       + $args->{idx} * $self->bucket_size
1923       + $e->hash_size
1924       + $e->byte_size;
1925
1926     if ( $args->{trans_id} ) {
1927         $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
1928     }
1929
1930     my $buffer = $e->storage->read_at(
1931         $spot,
1932         $e->byte_size + $STALE_SIZE,
1933     );
1934     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
1935
1936     if ( $args->{trans_id} ) {
1937         # We have found an entry that is old, so get rid of it
1938         if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
1939             $e->storage->print_at(
1940                 $spot,
1941                 pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
1942             );
1943             $loc = 0;
1944         }
1945     }
1946
1947     # If we're in a transaction and we never wrote to this location, try the
1948     # HEAD instead.
1949     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1950         return $self->get_data_location_for({
1951             trans_id   => 0,
1952             allow_head => 1,
1953             idx        => $args->{idx},
1954         });
1955     }
1956     return $loc <= 1 ? 0 : $loc;
1957 }
1958
1959 sub get_data_for {
1960     my $self = shift;
1961     my ($args) = @_;
1962     $args ||= {};
1963
1964     return unless $self->{found};
1965     my $location = $self->get_data_location_for({
1966         allow_head => $args->{allow_head},
1967     });
1968     return $self->engine->_load_sector( $location );
1969 }
1970
1971 sub get_key_for {
1972     my $self = shift;
1973     my ($idx) = @_;
1974     $idx = $self->{idx} unless defined $idx;
1975
1976     if ( $idx >= $self->engine->max_buckets ) {
1977         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
1978     }
1979
1980     my $location = $self->engine->storage->read_at(
1981         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1982         $self->engine->byte_size,
1983     );
1984     $location = unpack( $StP{$self->engine->byte_size}, $location );
1985     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
1986
1987     return $self->engine->_load_sector( $location );
1988 }
1989
1990 package DBM::Deep::Engine::Sector::Index;
1991
1992 our @ISA = qw( DBM::Deep::Engine::Sector );
1993
1994 sub _init {
1995     my $self = shift;
1996
1997     my $engine = $self->engine;
1998
1999     unless ( $self->offset ) {
2000         my $leftover = $self->size - $self->base_size;
2001
2002         $self->{offset} = $engine->_request_index_sector( $self->size );
2003         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
2004         # Skip staleness counter
2005         $engine->storage->print_at( $self->offset + $self->base_size,
2006             chr(0) x $leftover, # Zero-fill the rest
2007         );
2008     }
2009
2010     return $self;
2011 }
2012
2013 #XXX Change here
2014 sub size {
2015     my $self = shift;
2016     unless ( $self->{size} ) {
2017         my $e = $self->engine;
2018         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
2019     }
2020     return $self->{size};
2021 }
2022
2023 sub free_meth { return '_add_free_index_sector' }
2024
2025 sub free {
2026     my $self = shift;
2027     my $e = $self->engine;
2028
2029     for my $i ( 0 .. $e->hash_chars - 1 ) {
2030         my $l = $self->get_entry( $i ) or next;
2031         $e->_load_sector( $l )->free;
2032     }
2033
2034     $self->SUPER::free();
2035 }
2036
2037 sub _loc_for {
2038     my $self = shift;
2039     my ($idx) = @_;
2040     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
2041 }
2042
2043 sub get_entry {
2044     my $self = shift;
2045     my ($idx) = @_;
2046
2047     my $e = $self->engine;
2048
2049     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
2050         if $idx < 0 || $idx >= $e->hash_chars;
2051
2052     return unpack(
2053         $StP{$e->byte_size},
2054         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
2055     );
2056 }
2057
2058 sub set_entry {
2059     my $self = shift;
2060     my ($idx, $loc) = @_;
2061
2062     my $e = $self->engine;
2063
2064     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
2065         if $idx < 0 || $idx >= $e->hash_chars;
2066
2067     $self->engine->storage->print_at(
2068         $self->_loc_for( $idx ),
2069         pack( $StP{$e->byte_size}, $loc ),
2070     );
2071 }
2072
2073 1;
2074 __END__