Change some defaults and the tests to match
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(1.0000);
9
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * Every method in here assumes that the storage has been appropriately
14 #   safeguarded. This can be anything from flock() to some sort of manual
15 #   mutex. But, it's the caller's responsability to make sure that this has
16 #   been done.
17
18 # Setup file and tag signatures.  These should never change.
19 sub SIG_FILE     () { 'DPDB' }
20 sub SIG_HEADER   () { 'h'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_SIZE     () {  1     }
29 sub STALE_SIZE   () {  1     }
30
31 # Please refer to the pack() documentation for further information
32 my %StP = (
33     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
34     2 => 'n', # Unsigned short in "network" (big-endian) order
35     4 => 'N', # Unsigned long in "network" (big-endian) order
36     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
37 );
38
39 ################################################################################
40
41 sub new {
42     my $class = shift;
43     my ($args) = @_;
44
45     my $self = bless {
46         byte_size   => 4,
47
48         digest      => undef,
49         hash_size   => 16,  # In bytes
50         hash_chars  => 256, # Number of chars the algorithm uses per byte
51         max_buckets => 16,
52         num_txns    => 1,   # The HEAD
53         trans_id    => 0,   # Default to the HEAD
54
55         data_sector_size => 64, # Size in bytes of each data sector
56
57         entries => {}, # This is the list of entries for transactions
58         storage => undef,
59     }, $class;
60
61     # Never allow byte_size to be set directly.
62     delete $args->{byte_size};
63     if ( defined $args->{pack_size} ) {
64         if ( lc $args->{pack_size} eq 'small' ) {
65             $args->{byte_size} = 2;
66         }
67         elsif ( lc $args->{pack_size} eq 'medium' ) {
68             $args->{byte_size} = 4;
69         }
70         elsif ( lc $args->{pack_size} eq 'large' ) {
71             $args->{byte_size} = 8;
72         }
73         else {
74             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
75         }
76     }
77
78     # Grab the parameters we want to use
79     foreach my $param ( keys %$self ) {
80         next unless exists $args->{$param};
81         $self->{$param} = $args->{$param};
82     }
83
84     ##
85     # Number of buckets per blist before another level of indexing is
86     # done. Increase this value for slightly greater speed, but larger database
87     # files. DO NOT decrease this value below 16, due to risk of recursive
88     # reindex overrun.
89     ##
90     if (   !defined $self->{max_buckets}
91         || !length $self->{max_buckets}
92         || $self->{max_buckets} =~ /\D/
93         || $self->{max_buckets} < 16
94     ) {
95         $self->{max_buckets} = '(undef)' if !defined $self->{max_buckets};
96         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
97         $self->{max_buckets} = 16;
98     }
99     elsif ( $self->{max_buckets} > 256 ) {
100         warn "Ceiling of max_buckets is 256. Setting it to 256 from '$self->{max_buckets}'\n";
101         $self->{max_buckets} = 256;
102     }
103
104     if (   !defined $self->{num_txns}
105         || !length $self->{num_txns}
106         || $self->{num_txns} =~ /\D/
107         || $self->{num_txns} < 1
108     ) {
109         $self->{num_txns} = '(undef)' if !defined $self->{num_txns};
110         warn "Floor of num_txns is 1. Setting it to 1 from '$self->{num_txns}'\n";
111         $self->{num_txns} = 1;
112     }
113     elsif ( $self->{num_txns} > 255 ) {
114         warn "Ceiling of num_txns is 255. Setting it to 255 from '$self->{num_txns}'\n";
115         $self->{num_txns} = 255;
116     }
117
118     if (   !defined $self->{data_sector_size}
119         || !length $self->{data_sector_size}
120         || $self->{data_sector_size} =~ /\D/
121         || $self->{data_sector_size} < 32
122     ) {
123         $self->{data_sector_size} = '(undef)' if !defined $self->{data_sector_size};
124         warn "Floor of data_sector_size is 32. Setting it to 32 from '$self->{data_sector_size}'\n";
125         $self->{data_sector_size} = 32;
126     }
127     elsif ( $self->{data_sector_size} > 256 ) {
128         warn "Ceiling of data_sector_size is 256. Setting it to 256 from '$self->{data_sector_size}'\n";
129         $self->{data_sector_size} = 256;
130     }
131
132     if ( !$self->{digest} ) {
133         require Digest::MD5;
134         $self->{digest} = \&Digest::MD5::md5;
135     }
136
137     return $self;
138 }
139
140 ################################################################################
141
142 sub read_value {
143     my $self = shift;
144     my ($obj, $key) = @_;
145
146     # This will be a Reference sector
147     my $sector = $self->_load_sector( $obj->_base_offset )
148         or return;
149
150     if ( $sector->staleness != $obj->_staleness ) {
151         return;
152     }
153
154     my $key_md5 = $self->_apply_digest( $key );
155
156     my $value_sector = $sector->get_data_for({
157         key_md5    => $key_md5,
158         allow_head => 1,
159     });
160
161     unless ( $value_sector ) {
162         $value_sector = DBM::Deep::Engine::Sector::Null->new({
163             engine => $self,
164             data   => undef,
165         });
166
167         $sector->write_data({
168             key_md5 => $key_md5,
169             key     => $key,
170             value   => $value_sector,
171         });
172     }
173
174     return $value_sector->data;
175 }
176
177 sub get_classname {
178     my $self = shift;
179     my ($obj) = @_;
180
181     # This will be a Reference sector
182     my $sector = $self->_load_sector( $obj->_base_offset )
183         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
184
185     if ( $sector->staleness != $obj->_staleness ) {
186         return;
187     }
188
189     return $sector->get_classname;
190 }
191
192 sub key_exists {
193     my $self = shift;
194     my ($obj, $key) = @_;
195
196     # This will be a Reference sector
197     my $sector = $self->_load_sector( $obj->_base_offset )
198         or return '';
199
200     if ( $sector->staleness != $obj->_staleness ) {
201         return '';
202     }
203
204     my $data = $sector->get_data_for({
205         key_md5    => $self->_apply_digest( $key ),
206         allow_head => 1,
207     });
208
209     # exists() returns 1 or '' for true/false.
210     return $data ? 1 : '';
211 }
212
213 sub delete_key {
214     my $self = shift;
215     my ($obj, $key) = @_;
216
217     my $sector = $self->_load_sector( $obj->_base_offset )
218         or return;
219
220     if ( $sector->staleness != $obj->_staleness ) {
221         return;
222     }
223
224     return $sector->delete_key({
225         key_md5    => $self->_apply_digest( $key ),
226         allow_head => 0,
227     });
228 }
229
230 sub write_value {
231     my $self = shift;
232     my ($obj, $key, $value) = @_;
233
234     my $r = Scalar::Util::reftype( $value ) || '';
235     {
236         last if $r eq '';
237         last if $r eq 'HASH';
238         last if $r eq 'ARRAY';
239
240         DBM::Deep->_throw_error(
241             "Storage of references of type '$r' is not supported."
242         );
243     }
244
245     my ($class, $type);
246     if ( !defined $value ) {
247         $class = 'DBM::Deep::Engine::Sector::Null';
248     }
249     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
250         if ( $r eq 'ARRAY' && tied(@$value) ) {
251             DBM::Deep->_throw_error( "Cannot store something that is tied." );
252         }
253         if ( $r eq 'HASH' && tied(%$value) ) {
254             DBM::Deep->_throw_error( "Cannot store something that is tied." );
255         }
256         $class = 'DBM::Deep::Engine::Sector::Reference';
257         $type = substr( $r, 0, 1 );
258     }
259     else {
260         $class = 'DBM::Deep::Engine::Sector::Scalar';
261     }
262
263     # This will be a Reference sector
264     my $sector = $self->_load_sector( $obj->_base_offset )
265         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
266
267     if ( $sector->staleness != $obj->_staleness ) {
268         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep.n" );
269     }
270
271     # Create this after loading the reference sector in case something bad happens.
272     # This way, we won't allocate value sector(s) needlessly.
273     my $value_sector = $class->new({
274         engine => $self,
275         data   => $value,
276         type   => $type,
277     });
278
279     $sector->write_data({
280         key     => $key,
281         key_md5 => $self->_apply_digest( $key ),
282         value   => $value_sector,
283     });
284
285     # This code is to make sure we write all the values in the $value to the disk
286     # and to make sure all changes to $value after the assignment are reflected
287     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
288     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
289     # copy to a temp value.
290     if ( $r eq 'ARRAY' ) {
291         my @temp = @$value;
292         tie @$value, 'DBM::Deep', {
293             base_offset => $value_sector->offset,
294             staleness   => $value_sector->staleness,
295             storage     => $self->storage,
296             engine      => $self,
297         };
298         @$value = @temp;
299         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
300     }
301     elsif ( $r eq 'HASH' ) {
302         my %temp = %$value;
303         tie %$value, 'DBM::Deep', {
304             base_offset => $value_sector->offset,
305             staleness   => $value_sector->staleness,
306             storage     => $self->storage,
307             engine      => $self,
308         };
309
310         %$value = %temp;
311         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
312     }
313
314     return 1;
315 }
316
317 # XXX Add staleness here
318 sub get_next_key {
319     my $self = shift;
320     my ($obj, $prev_key) = @_;
321
322     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
323     unless ( $prev_key ) {
324         $obj->{iterator} = DBM::Deep::Iterator->new({
325             base_offset => $obj->_base_offset,
326             engine      => $self,
327         });
328     }
329
330     return $obj->{iterator}->get_next_key( $obj );
331 }
332
333 ################################################################################
334
335 sub setup_fh {
336     my $self = shift;
337     my ($obj) = @_;
338
339     # We're opening the file.
340     unless ( $obj->_base_offset ) {
341         my $bytes_read = $self->_read_file_header;
342
343         # Creating a new file
344         unless ( $bytes_read ) {
345             $self->_write_file_header;
346
347             # 1) Create Array/Hash entry
348             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
349                 engine => $self,
350                 type   => $obj->_type,
351             });
352             $obj->{base_offset} = $initial_reference->offset;
353             $obj->{staleness} = $initial_reference->staleness;
354
355             $self->storage->flush;
356         }
357         # Reading from an existing file
358         else {
359             $obj->{base_offset} = $bytes_read;
360             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
361                 engine => $self,
362                 offset => $obj->_base_offset,
363             });
364             unless ( $initial_reference ) {
365                 DBM::Deep->_throw_error("Corrupted file, no master index record");
366             }
367
368             unless ($obj->_type eq $initial_reference->type) {
369                 DBM::Deep->_throw_error("File type mismatch");
370             }
371
372             $obj->{staleness} = $initial_reference->staleness;
373         }
374     }
375
376     return 1;
377 }
378
379 sub begin_work {
380     my $self = shift;
381     my ($obj) = @_;
382
383     if ( $self->trans_id ) {
384         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
385     }
386
387     my @slots = $self->read_txn_slots;
388     for my $i ( 1 .. @slots ) {
389         next if $slots[$i];
390         $slots[$i] = 1;
391         $self->set_trans_id( $i );
392         last;
393     }
394     $self->write_txn_slots( @slots );
395
396     if ( !$self->trans_id ) {
397         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
398     }
399
400     return;
401 }
402
403 sub rollback {
404     my $self = shift;
405     my ($obj) = @_;
406
407     if ( !$self->trans_id ) {
408         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
409     }
410
411     # Each entry is the file location for a bucket that has a modification for
412     # this transaction. The entries need to be expunged.
413     foreach my $entry (@{ $self->get_entries } ) {
414         # Remove the entry here
415         my $read_loc = $entry
416           + $self->hash_size
417           + $self->byte_size
418           + $self->trans_id * ( $self->byte_size + 4 );
419
420         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
421         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
422         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
423
424         if ( $data_loc > 1 ) {
425             $self->_load_sector( $data_loc )->free;
426         }
427     }
428
429     $self->clear_entries;
430
431     my @slots = $self->read_txn_slots;
432     $slots[$self->trans_id] = 0;
433     $self->write_txn_slots( @slots );
434     $self->inc_txn_staleness_counter( $self->trans_id );
435     $self->set_trans_id( 0 );
436
437     return 1;
438 }
439
440 sub commit {
441     my $self = shift;
442     my ($obj) = @_;
443
444     if ( !$self->trans_id ) {
445         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
446     }
447
448     foreach my $entry (@{ $self->get_entries } ) {
449         # Overwrite the entry in head with the entry in trans_id
450         my $base = $entry
451           + $self->hash_size
452           + $self->byte_size;
453
454         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
455         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
456         my $trans_loc = $self->storage->read_at(
457             $base + $self->trans_id * ( $self->byte_size + 4 ), $self->byte_size,
458         );
459
460         $self->storage->print_at( $base, $trans_loc );
461         $self->storage->print_at(
462             $base + $self->trans_id * ( $self->byte_size + 4 ),
463             pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
464         );
465
466         if ( $head_loc > 1 ) {
467             $self->_load_sector( $head_loc )->free;
468         }
469     }
470
471     $self->clear_entries;
472
473     my @slots = $self->read_txn_slots;
474     $slots[$self->trans_id] = 0;
475     $self->write_txn_slots( @slots );
476     $self->inc_txn_staleness_counter( $self->trans_id );
477     $self->set_trans_id( 0 );
478
479     return 1;
480 }
481
482 sub read_txn_slots {
483     my $self = shift;
484     return split '', unpack( 'b32',
485         $self->storage->read_at(
486             $self->trans_loc, 4,
487         )
488     );
489 }
490
491 sub write_txn_slots {
492     my $self = shift;
493     $self->storage->print_at( $self->trans_loc,
494         pack( 'b32', join('', @_) ),
495     );
496 }
497
498 sub get_running_txn_ids {
499     my $self = shift;
500     my @transactions = $self->read_txn_slots;
501     my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
502 }
503
504 sub get_txn_staleness_counter {
505     my $self = shift;
506     my ($trans_id) = @_;
507
508     # Hardcode staleness of 0 for the HEAD
509     return 0 unless $trans_id;
510
511     my $x = unpack( 'N',
512         $self->storage->read_at(
513             $self->trans_loc + 4 * $trans_id,
514             4,
515         )
516     );
517     return $x;
518 }
519
520 sub inc_txn_staleness_counter {
521     my $self = shift;
522     my ($trans_id) = @_;
523
524     # Hardcode staleness of 0 for the HEAD
525     return unless $trans_id;
526
527     $self->storage->print_at(
528         $self->trans_loc + 4 * $trans_id,
529         pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
530     );
531 }
532
533 sub get_entries {
534     my $self = shift;
535     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
536 }
537
538 sub add_entry {
539     my $self = shift;
540     my ($trans_id, $loc) = @_;
541
542     $self->{entries}{$trans_id} ||= {};
543     $self->{entries}{$trans_id}{$loc} = undef;
544 }
545
546 # If the buckets are being relocated because of a reindexing, the entries
547 # mechanism needs to be made aware of it.
548 sub reindex_entry {
549     my $self = shift;
550     my ($old_loc, $new_loc) = @_;
551
552     TRANS:
553     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
554         foreach my $orig_loc ( keys %{ $locs } ) {
555             if ( $orig_loc == $old_loc ) {
556                 delete $locs->{orig_loc};
557                 $locs->{$new_loc} = undef;
558                 next TRANS;
559             }
560         }
561     }
562 }
563
564 sub clear_entries {
565     my $self = shift;
566     delete $self->{entries}{$self->trans_id};
567 }
568
569 ################################################################################
570
571 {
572     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
573     my $this_file_version = 2;
574
575     sub _write_file_header {
576         my $self = shift;
577
578         my $nt = $self->num_txns;
579
580         my $header_var = 1 + 1 + 1 + 1 + 4 + 4 * $nt + 3 * $self->byte_size;
581
582         my $loc = $self->storage->request_space( $header_fixed + $header_var );
583
584         $self->storage->print_at( $loc,
585             SIG_FILE,
586             SIG_HEADER,
587             pack('N', $this_file_version), # At this point, we're at 9 bytes
588             pack('N', $header_var),        # header size
589             # --- Above is $header_fixed. Below is $header_var
590             pack('C', $self->byte_size),
591
592             # These shenanigans are to allow a 256 within a C
593             pack('C', $self->max_buckets - 1),
594             pack('C', $self->data_sector_size - 1),
595
596             pack('C', $nt),
597             pack('N', 0 ),                   # Transaction activeness bitfield
598             pack('N' . $nt, 0 x $nt ),       # Transaction staleness counters
599             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
600             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
601             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
602         );
603
604         #XXX Set these less fragilely
605         $self->set_trans_loc( $header_fixed + 4 );
606         $self->set_chains_loc( $header_fixed + 4 + 4 + 4 * $nt );
607
608         return;
609     }
610
611     sub _read_file_header {
612         my $self = shift;
613
614         my $buffer = $self->storage->read_at( 0, $header_fixed );
615         return unless length($buffer);
616
617         my ($file_signature, $sig_header, $file_version, $size) = unpack(
618             'A4 A N N', $buffer
619         );
620
621         unless ( $file_signature eq SIG_FILE ) {
622             $self->storage->close;
623             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
624         }
625
626         unless ( $sig_header eq SIG_HEADER ) {
627             $self->storage->close;
628             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
629         }
630
631         unless ( $file_version == $this_file_version ) {
632             $self->storage->close;
633             DBM::Deep->_throw_error(
634                 "Wrong file version found - " .  $file_version .
635                 " - expected " . $this_file_version
636             );
637         }
638
639         my $buffer2 = $self->storage->read_at( undef, $size );
640         my @values = unpack( 'C C C C', $buffer2 );
641
642         if ( @values != 4 || grep { !defined } @values ) {
643             $self->storage->close;
644             DBM::Deep->_throw_error("Corrupted file - bad header");
645         }
646
647         $self->set_trans_loc( $header_fixed + scalar(@values) );
648         $self->set_chains_loc( $header_fixed + scalar(@values) + 4 + 4 * $self->num_txns );
649
650         #XXX Add warnings if values weren't set right
651         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
652
653         # These shenangians are to allow a 256 within a C
654         $self->{max_buckets} += 1;
655         $self->{data_sector_size} += 1;
656
657         my $header_var = scalar(@values) + 4 + 4 * $self->num_txns + 3 * $self->byte_size;
658         unless ( $size == $header_var ) {
659             $self->storage->close;
660             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
661         }
662
663         return length($buffer) + length($buffer2);
664     }
665 }
666
667 sub _load_sector {
668     my $self = shift;
669     my ($offset) = @_;
670
671     # Add a catch for offset of 0 or 1
672     return if $offset <= 1;
673
674     my $type = $self->storage->read_at( $offset, 1 );
675     return if $type eq chr(0);
676
677     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
678         return DBM::Deep::Engine::Sector::Reference->new({
679             engine => $self,
680             type   => $type,
681             offset => $offset,
682         });
683     }
684     # XXX Don't we need key_md5 here?
685     elsif ( $type eq $self->SIG_BLIST ) {
686         return DBM::Deep::Engine::Sector::BucketList->new({
687             engine => $self,
688             type   => $type,
689             offset => $offset,
690         });
691     }
692     elsif ( $type eq $self->SIG_INDEX ) {
693         return DBM::Deep::Engine::Sector::Index->new({
694             engine => $self,
695             type   => $type,
696             offset => $offset,
697         });
698     }
699     elsif ( $type eq $self->SIG_NULL ) {
700         return DBM::Deep::Engine::Sector::Null->new({
701             engine => $self,
702             type   => $type,
703             offset => $offset,
704         });
705     }
706     elsif ( $type eq $self->SIG_DATA ) {
707         return DBM::Deep::Engine::Sector::Scalar->new({
708             engine => $self,
709             type   => $type,
710             offset => $offset,
711         });
712     }
713     # This was deleted from under us, so just return and let the caller figure it out.
714     elsif ( $type eq $self->SIG_FREE ) {
715         return;
716     }
717
718     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
719 }
720
721 sub _apply_digest {
722     my $self = shift;
723     return $self->{digest}->(@_);
724 }
725
726 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
727 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
728 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
729
730 sub _add_free_sector {
731     my $self = shift;
732     my ($multiple, $offset, $size) = @_;
733
734     my $chains_offset = $multiple * $self->byte_size;
735
736     my $storage = $self->storage;
737
738     # Increment staleness.
739     # XXX Can this increment+modulo be done by "&= 0x1" ?
740     my $staleness = unpack( $StP{STALE_SIZE()}, $storage->read_at( $offset + SIG_SIZE, STALE_SIZE ) );
741     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * STALE_SIZE ) );
742     $storage->print_at( $offset + SIG_SIZE, pack( $StP{STALE_SIZE()}, $staleness ) );
743
744     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
745
746     $storage->print_at( $self->chains_loc + $chains_offset,
747         pack( $StP{$self->byte_size}, $offset ),
748     );
749
750     # Record the old head in the new sector after the signature and staleness counter
751     $storage->print_at( $offset + SIG_SIZE + STALE_SIZE, $old_head );
752 }
753
754 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
755 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
756 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
757
758 sub _request_sector {
759     my $self = shift;
760     my ($multiple, $size) = @_;
761
762     my $chains_offset = $multiple * $self->byte_size;
763
764     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
765     my $loc = unpack( $StP{$self->byte_size}, $old_head );
766
767     # We don't have any free sectors of the right size, so allocate a new one.
768     unless ( $loc ) {
769         my $offset = $self->storage->request_space( $size );
770
771         # Zero out the new sector. This also guarantees correct increases
772         # in the filesize.
773         $self->storage->print_at( $offset, chr(0) x $size );
774
775         return $offset;
776     }
777
778     # Read the new head after the signature and the staleness counter
779     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + STALE_SIZE, $self->byte_size );
780     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
781     $self->storage->print_at(
782         $loc + SIG_SIZE + STALE_SIZE,
783         pack( $StP{$self->byte_size}, 0 ),
784     );
785
786     return $loc;
787 }
788
789 ################################################################################
790
791 sub storage     { $_[0]{storage} }
792 sub byte_size   { $_[0]{byte_size} }
793 sub hash_size   { $_[0]{hash_size} }
794 sub hash_chars  { $_[0]{hash_chars} }
795 sub num_txns    { $_[0]{num_txns} }
796 sub max_buckets { $_[0]{max_buckets} }
797 sub blank_md5   { chr(0) x $_[0]->hash_size }
798 sub data_sector_size { $_[0]{data_sector_size} }
799
800 sub trans_id     { $_[0]{trans_id} }
801 sub set_trans_id { $_[0]{trans_id} = $_[1] }
802
803 sub trans_loc     { $_[0]{trans_loc} }
804 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
805
806 sub chains_loc     { $_[0]{chains_loc} }
807 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
808
809 ################################################################################
810
811 package DBM::Deep::Iterator;
812
813 sub new {
814     my $class = shift;
815     my ($args) = @_;
816
817     my $self = bless {
818         breadcrumbs => [],
819         engine      => $args->{engine},
820         base_offset => $args->{base_offset},
821     }, $class;
822
823     Scalar::Util::weaken( $self->{engine} );
824
825     return $self;
826 }
827
828 sub reset { $_[0]{breadcrumbs} = [] }
829
830 sub get_sector_iterator {
831     my $self = shift;
832     my ($loc) = @_;
833
834     my $sector = $self->{engine}->_load_sector( $loc )
835         or return;
836
837     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
838         return DBM::Deep::Iterator::Index->new({
839             iterator => $self,
840             sector   => $sector,
841         });
842     }
843     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
844         return DBM::Deep::Iterator::BucketList->new({
845             iterator => $self,
846             sector   => $sector,
847         });
848     }
849
850     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
851 }
852
853 sub get_next_key {
854     my $self = shift;
855     my ($obj) = @_;
856
857     my $crumbs = $self->{breadcrumbs};
858     my $e = $self->{engine};
859
860     unless ( @$crumbs ) {
861         # This will be a Reference sector
862         my $sector = $e->_load_sector( $self->{base_offset} )
863             # If no sector is found, thist must have been deleted from under us.
864             or return;
865
866         if ( $sector->staleness != $obj->_staleness ) {
867             return;
868         }
869
870         my $loc = $sector->get_blist_loc
871             or return;
872
873         push @$crumbs, $self->get_sector_iterator( $loc );
874     }
875
876     FIND_NEXT_KEY: {
877         # We're at the end.
878         unless ( @$crumbs ) {
879             $self->reset;
880             return;
881         }
882
883         my $iterator = $crumbs->[-1];
884
885         # This level is done.
886         if ( $iterator->at_end ) {
887             pop @$crumbs;
888             redo FIND_NEXT_KEY;
889         }
890
891         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
892             # If we don't have any more, it will be caught at the
893             # prior check.
894             if ( my $next = $iterator->get_next_iterator ) {
895                 push @$crumbs, $next;
896             }
897             redo FIND_NEXT_KEY;
898         }
899
900         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
901             DBM::Deep->_throw_error(
902                 "Should have a bucketlist iterator here - instead have $iterator"
903             );
904         }
905
906         # At this point, we have a BucketList iterator
907         my $key = $iterator->get_next_key;
908         if ( defined $key ) {
909             return $key;
910         }
911         #XXX else { $iterator->set_to_end() } ?
912
913         # We hit the end of the bucketlist iterator, so redo
914         redo FIND_NEXT_KEY;
915     }
916
917     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
918 }
919
920 package DBM::Deep::Iterator::Index;
921
922 sub new {
923     my $self = bless $_[1] => $_[0];
924     $self->{curr_index} = 0;
925     return $self;
926 }
927
928 sub at_end {
929     my $self = shift;
930     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
931 }
932
933 sub get_next_iterator {
934     my $self = shift;
935
936     my $loc;
937     while ( !$loc ) {
938         return if $self->at_end;
939         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
940     }
941
942     return $self->{iterator}->get_sector_iterator( $loc );
943 }
944
945 package DBM::Deep::Iterator::BucketList;
946
947 sub new {
948     my $self = bless $_[1] => $_[0];
949     $self->{curr_index} = 0;
950     return $self;
951 }
952
953 sub at_end {
954     my $self = shift;
955     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
956 }
957
958 sub get_next_key {
959     my $self = shift;
960
961     return if $self->at_end;
962
963     my $idx = $self->{curr_index}++;
964
965     my $data_loc = $self->{sector}->get_data_location_for({
966         allow_head => 1,
967         idx        => $idx,
968     }) or return;
969
970     #XXX Do we want to add corruption checks here?
971     return $self->{sector}->get_key_for( $idx )->data;
972 }
973
974 package DBM::Deep::Engine::Sector;
975
976 sub new {
977     my $self = bless $_[1], $_[0];
978     Scalar::Util::weaken( $self->{engine} );
979     $self->_init;
980     return $self;
981 }
982
983 #sub _init {}
984 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
985
986 sub engine { $_[0]{engine} }
987 sub offset { $_[0]{offset} }
988 sub type   { $_[0]{type} }
989
990 sub base_size {
991    my $self = shift;
992    return $self->engine->SIG_SIZE + $self->engine->STALE_SIZE;
993 }
994
995 sub free {
996     my $self = shift;
997
998     my $e = $self->engine;
999
1000     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1001     # Skip staleness counter
1002     $e->storage->print_at( $self->offset + $self->base_size,
1003         chr(0) x ($self->size - $self->base_size),
1004     );
1005
1006     my $free_meth = $self->free_meth;
1007     $e->$free_meth( $self->offset, $self->size );
1008
1009     return;
1010 }
1011
1012 package DBM::Deep::Engine::Sector::Data;
1013
1014 our @ISA = qw( DBM::Deep::Engine::Sector );
1015
1016 # This is in bytes
1017 sub size { $_[0]{engine}->data_sector_size }
1018 sub free_meth { return '_add_free_data_sector' }
1019
1020 sub clone {
1021     my $self = shift;
1022     return ref($self)->new({
1023         engine => $self->engine,
1024         data   => $self->data,
1025         type   => $self->type,
1026     });
1027 }
1028
1029 package DBM::Deep::Engine::Sector::Scalar;
1030
1031 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1032
1033 sub free {
1034     my $self = shift;
1035
1036     my $chain_loc = $self->chain_loc;
1037
1038     $self->SUPER::free();
1039
1040     if ( $chain_loc ) {
1041         $self->engine->_load_sector( $chain_loc )->free;
1042     }
1043
1044     return;
1045 }
1046
1047 sub type { $_[0]{engine}->SIG_DATA }
1048 sub _init {
1049     my $self = shift;
1050
1051     my $engine = $self->engine;
1052
1053     unless ( $self->offset ) {
1054         my $data_section = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1055
1056         $self->{offset} = $engine->_request_data_sector( $self->size );
1057
1058         my $data = delete $self->{data};
1059         my $dlen = length $data;
1060         my $continue = 1;
1061         my $curr_offset = $self->offset;
1062         while ( $continue ) {
1063
1064             my $next_offset = 0;
1065
1066             my ($leftover, $this_len, $chunk);
1067             if ( $dlen > $data_section ) {
1068                 $leftover = 0;
1069                 $this_len = $data_section;
1070                 $chunk = substr( $data, 0, $this_len );
1071
1072                 $dlen -= $data_section;
1073                 $next_offset = $engine->_request_data_sector( $self->size );
1074                 $data = substr( $data, $this_len );
1075             }
1076             else {
1077                 $leftover = $data_section - $dlen;
1078                 $this_len = $dlen;
1079                 $chunk = $data;
1080
1081                 $continue = 0;
1082             }
1083
1084             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1085             # Skip staleness
1086             $engine->storage->print_at( $curr_offset + $self->base_size,
1087                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1088                 pack( $StP{1}, $this_len ),                      # Data length
1089                 $chunk,                                          # Data to be stored in this sector
1090                 chr(0) x $leftover,                              # Zero-fill the rest
1091             );
1092
1093             $curr_offset = $next_offset;
1094         }
1095
1096         return;
1097     }
1098 }
1099
1100 sub data_length {
1101     my $self = shift;
1102
1103     my $buffer = $self->engine->storage->read_at(
1104         $self->offset + $self->base_size + $self->engine->byte_size, 1
1105     );
1106
1107     return unpack( $StP{1}, $buffer );
1108 }
1109
1110 sub chain_loc {
1111     my $self = shift;
1112     return unpack(
1113         $StP{$self->engine->byte_size},
1114         $self->engine->storage->read_at(
1115             $self->offset + $self->base_size,
1116             $self->engine->byte_size,
1117         ),
1118     );
1119 }
1120
1121 sub data {
1122     my $self = shift;
1123
1124     my $data;
1125     while ( 1 ) {
1126         my $chain_loc = $self->chain_loc;
1127
1128         $data .= $self->engine->storage->read_at(
1129             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1130         );
1131
1132         last unless $chain_loc;
1133
1134         $self = $self->engine->_load_sector( $chain_loc );
1135     }
1136
1137     return $data;
1138 }
1139
1140 package DBM::Deep::Engine::Sector::Null;
1141
1142 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1143
1144 sub type { $_[0]{engine}->SIG_NULL }
1145 sub data_length { 0 }
1146 sub data { return }
1147
1148 sub _init {
1149     my $self = shift;
1150
1151     my $engine = $self->engine;
1152
1153     unless ( $self->offset ) {
1154         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1155
1156         $self->{offset} = $engine->_request_data_sector( $self->size );
1157         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1158         # Skip staleness counter
1159         $engine->storage->print_at( $self->offset + $self->base_size,
1160             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1161             pack( $StP{1}, $self->data_length ),  # Data length
1162             chr(0) x $leftover,                   # Zero-fill the rest
1163         );
1164
1165         return;
1166     }
1167 }
1168
1169 package DBM::Deep::Engine::Sector::Reference;
1170
1171 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1172
1173 sub _init {
1174     my $self = shift;
1175
1176     my $e = $self->engine;
1177
1178     unless ( $self->offset ) {
1179         my $classname = Scalar::Util::blessed( delete $self->{data} );
1180         my $leftover = $self->size - $self->base_size - 2 * $e->byte_size;
1181
1182         my $class_offset = 0;
1183         if ( defined $classname ) {
1184             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1185                 engine => $e,
1186                 data   => $classname,
1187             });
1188             $class_offset = $class_sector->offset;
1189         }
1190
1191         $self->{offset} = $e->_request_data_sector( $self->size );
1192         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1193         # Skip staleness counter
1194         $e->storage->print_at( $self->offset + $self->base_size,
1195             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1196             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1197             chr(0) x $leftover,                         # Zero-fill the rest
1198         );
1199     }
1200     else {
1201         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1202     }
1203
1204     $self->{staleness} = unpack(
1205         $StP{$e->STALE_SIZE},
1206         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $e->STALE_SIZE ),
1207     );
1208
1209     return;
1210 }
1211
1212 sub free {
1213     my $self = shift;
1214
1215     my $blist_loc = $self->get_blist_loc;
1216     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1217
1218     my $class_loc = $self->get_class_offset;
1219     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1220
1221     $self->SUPER::free();
1222 }
1223
1224 sub staleness { $_[0]{staleness} }
1225
1226 sub get_data_for {
1227     my $self = shift;
1228     my ($args) = @_;
1229
1230     # Assume that the head is not allowed unless otherwise specified.
1231     $args->{allow_head} = 0 unless exists $args->{allow_head};
1232
1233     # Assume we don't create a new blist location unless otherwise specified.
1234     $args->{create} = 0 unless exists $args->{create};
1235
1236     my $blist = $self->get_bucket_list({
1237         key_md5 => $args->{key_md5},
1238         key => $args->{key},
1239         create  => $args->{create},
1240     });
1241     return unless $blist && $blist->{found};
1242
1243     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1244     # is whether or not this transaction has this key. That's part of the next
1245     # function call.
1246     my $location = $blist->get_data_location_for({
1247         allow_head => $args->{allow_head},
1248     }) or return;
1249
1250     return $self->engine->_load_sector( $location );
1251 }
1252
1253 sub write_data {
1254     my $self = shift;
1255     my ($args) = @_;
1256
1257     my $blist = $self->get_bucket_list({
1258         key_md5 => $args->{key_md5},
1259         key => $args->{key},
1260         create  => 1,
1261     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1262
1263     # Handle any transactional bookkeeping.
1264     if ( $self->engine->trans_id ) {
1265         if ( ! $blist->has_md5 ) {
1266             $blist->mark_deleted({
1267                 trans_id => 0,
1268             });
1269         }
1270     }
1271     else {
1272         my @trans_ids = $self->engine->get_running_txn_ids;
1273         if ( $blist->has_md5 ) {
1274             if ( @trans_ids ) {
1275                 my $old_value = $blist->get_data_for;
1276                 foreach my $other_trans_id ( @trans_ids ) {
1277                     next if $blist->get_data_location_for({
1278                         trans_id   => $other_trans_id,
1279                         allow_head => 0,
1280                     });
1281                     $blist->write_md5({
1282                         trans_id => $other_trans_id,
1283                         key      => $args->{key},
1284                         key_md5  => $args->{key_md5},
1285                         value    => $old_value->clone,
1286                     });
1287                 }
1288             }
1289         }
1290         else {
1291             if ( @trans_ids ) {
1292                 foreach my $other_trans_id ( @trans_ids ) {
1293                     #XXX This doesn't seem to possible to ever happen . . .
1294                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1295                     $blist->mark_deleted({
1296                         trans_id => $other_trans_id,
1297                     });
1298                 }
1299             }
1300         }
1301     }
1302
1303     #XXX Is this safe to do transactionally?
1304     # Free the place we're about to write to.
1305     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1306         $blist->get_data_for({ allow_head => 0 })->free;
1307     }
1308
1309     $blist->write_md5({
1310         key      => $args->{key},
1311         key_md5  => $args->{key_md5},
1312         value    => $args->{value},
1313     });
1314 }
1315
1316 sub delete_key {
1317     my $self = shift;
1318     my ($args) = @_;
1319
1320     # XXX What should happen if this fails?
1321     my $blist = $self->get_bucket_list({
1322         key_md5 => $args->{key_md5},
1323     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1324
1325     # Save the location so that we can free the data
1326     my $location = $blist->get_data_location_for({
1327         allow_head => 0,
1328     });
1329     my $old_value = $location && $self->engine->_load_sector( $location );
1330
1331     my @trans_ids = $self->engine->get_running_txn_ids;
1332
1333     if ( $self->engine->trans_id == 0 ) {
1334         if ( @trans_ids ) {
1335             foreach my $other_trans_id ( @trans_ids ) {
1336                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1337                 $blist->write_md5({
1338                     trans_id => $other_trans_id,
1339                     key      => $args->{key},
1340                     key_md5  => $args->{key_md5},
1341                     value    => $old_value->clone,
1342                 });
1343             }
1344         }
1345     }
1346
1347     my $data;
1348     if ( @trans_ids ) {
1349         $blist->mark_deleted( $args );
1350
1351         if ( $old_value ) {
1352             $data = $old_value->data;
1353             $old_value->free;
1354         }
1355     }
1356     else {
1357         $data = $blist->delete_md5( $args );
1358     }
1359
1360     return $data;
1361 }
1362
1363 sub get_blist_loc {
1364     my $self = shift;
1365
1366     my $e = $self->engine;
1367     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1368     return unpack( $StP{$e->byte_size}, $blist_loc );
1369 }
1370
1371 sub get_bucket_list {
1372     my $self = shift;
1373     my ($args) = @_;
1374     $args ||= {};
1375
1376     # XXX Add in check here for recycling?
1377
1378     my $engine = $self->engine;
1379
1380     my $blist_loc = $self->get_blist_loc;
1381
1382     # There's no index or blist yet
1383     unless ( $blist_loc ) {
1384         return unless $args->{create};
1385
1386         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1387             engine  => $engine,
1388             key_md5 => $args->{key_md5},
1389         });
1390
1391         $engine->storage->print_at( $self->offset + $self->base_size,
1392             pack( $StP{$engine->byte_size}, $blist->offset ),
1393         );
1394
1395         return $blist;
1396     }
1397
1398     my $sector = $engine->_load_sector( $blist_loc )
1399         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1400     my $i = 0;
1401     my $last_sector = undef;
1402     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1403         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1404         $last_sector = $sector;
1405         if ( $blist_loc ) {
1406             $sector = $engine->_load_sector( $blist_loc )
1407                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1408         }
1409         else {
1410             $sector = undef;
1411             last;
1412         }
1413     }
1414
1415     # This means we went through the Index sector(s) and found an empty slot
1416     unless ( $sector ) {
1417         return unless $args->{create};
1418
1419         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
1420             unless $last_sector;
1421
1422         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1423             engine  => $engine,
1424             key_md5 => $args->{key_md5},
1425         });
1426
1427         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1428
1429         return $blist;
1430     }
1431
1432     $sector->find_md5( $args->{key_md5} );
1433
1434     # See whether or not we need to reindex the bucketlist
1435     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {
1436         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1437             engine => $engine,
1438         });
1439
1440         my %blist_cache;
1441         #XXX q.v. the comments for this function.
1442         foreach my $entry ( $sector->chopped_up ) {
1443             my ($spot, $md5) = @{$entry};
1444             my $idx = ord( substr( $md5, $i, 1 ) );
1445
1446             # XXX This is inefficient
1447             my $blist = $blist_cache{$idx}
1448                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1449                     engine => $engine,
1450                 });
1451
1452             $new_index->set_entry( $idx => $blist->offset );
1453
1454             my $new_spot = $blist->write_at_next_open( $md5 );
1455             $engine->reindex_entry( $spot => $new_spot );
1456         }
1457
1458         # Handle the new item separately.
1459         {
1460             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1461             my $blist = $blist_cache{$idx}
1462                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1463                     engine => $engine,
1464                 });
1465
1466             $new_index->set_entry( $idx => $blist->offset );
1467
1468             #XXX THIS IS HACKY!
1469             $blist->find_md5( $args->{key_md5} );
1470             $blist->write_md5({
1471                 key     => $args->{key},
1472                 key_md5 => $args->{key_md5},
1473                 value   => DBM::Deep::Engine::Sector::Null->new({
1474                     engine => $engine,
1475                     data   => undef,
1476                 }),
1477             });
1478         }
1479
1480         if ( $last_sector ) {
1481             $last_sector->set_entry(
1482                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1483                 $new_index->offset,
1484             );
1485         } else {
1486             $engine->storage->print_at( $self->offset + $self->base_size,
1487                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1488             );
1489         }
1490
1491         $sector->free;
1492
1493         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1494         $sector->find_md5( $args->{key_md5} );
1495     }
1496
1497     return $sector;
1498 }
1499
1500 sub get_class_offset {
1501     my $self = shift;
1502
1503     my $e = $self->engine;
1504     return unpack(
1505         $StP{$e->byte_size},
1506         $e->storage->read_at(
1507             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1508         ),
1509     );
1510 }
1511
1512 sub get_classname {
1513     my $self = shift;
1514
1515     my $class_offset = $self->get_class_offset;
1516
1517     return unless $class_offset;
1518
1519     return $self->engine->_load_sector( $class_offset )->data;
1520 }
1521
1522 #XXX Add singleton handling here
1523 sub data {
1524     my $self = shift;
1525
1526     my $new_obj = DBM::Deep->new({
1527         type        => $self->type,
1528         base_offset => $self->offset,
1529         staleness   => $self->staleness,
1530         storage     => $self->engine->storage,
1531         engine      => $self->engine,
1532     });
1533
1534     if ( $self->engine->storage->{autobless} ) {
1535         my $classname = $self->get_classname;
1536         if ( defined $classname ) {
1537             bless $new_obj, $classname;
1538         }
1539     }
1540
1541     return $new_obj;
1542 }
1543
1544 package DBM::Deep::Engine::Sector::BucketList;
1545
1546 our @ISA = qw( DBM::Deep::Engine::Sector );
1547
1548 sub _init {
1549     my $self = shift;
1550
1551     my $engine = $self->engine;
1552
1553     unless ( $self->offset ) {
1554         my $leftover = $self->size - $self->base_size;
1555
1556         $self->{offset} = $engine->_request_blist_sector( $self->size );
1557         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1558         # Skip staleness counter
1559         $engine->storage->print_at( $self->offset + $self->base_size,
1560             chr(0) x $leftover, # Zero-fill the data
1561         );
1562     }
1563
1564     if ( $self->{key_md5} ) {
1565         $self->find_md5;
1566     }
1567
1568     return $self;
1569 }
1570
1571 sub size {
1572     my $self = shift;
1573     unless ( $self->{size} ) {
1574         my $e = $self->engine;
1575         # Base + numbuckets * bucketsize
1576         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1577     }
1578     return $self->{size};
1579 }
1580
1581 sub free_meth { return '_add_free_blist_sector' }
1582
1583 sub bucket_size {
1584     my $self = shift;
1585     unless ( $self->{bucket_size} ) {
1586         my $e = $self->engine;
1587         # Key + head (location) + transactions (location + staleness-counter)
1588         my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 );
1589         $self->{bucket_size} = $e->hash_size + $location_size;
1590     }
1591     return $self->{bucket_size};
1592 }
1593
1594 # XXX This is such a poor hack. I need to rethink this code.
1595 sub chopped_up {
1596     my $self = shift;
1597
1598     my $e = $self->engine;
1599
1600     my @buckets;
1601     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1602         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
1603         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
1604
1605         #XXX If we're chopping, why would we ever have the blank_md5?
1606         last if $md5 eq $e->blank_md5;
1607
1608         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
1609         push @buckets, [ $spot, $md5 . $rest ];
1610     }
1611
1612     return @buckets;
1613 }
1614
1615 sub write_at_next_open {
1616     my $self = shift;
1617     my ($entry) = @_;
1618
1619     #XXX This is such a hack!
1620     $self->{_next_open} = 0 unless exists $self->{_next_open};
1621
1622     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
1623     $self->engine->storage->print_at( $spot, $entry );
1624
1625     return $spot;
1626 }
1627
1628 sub has_md5 {
1629     my $self = shift;
1630     unless ( exists $self->{found} ) {
1631         $self->find_md5;
1632     }
1633     return $self->{found};
1634 }
1635
1636 sub find_md5 {
1637     my $self = shift;
1638
1639     $self->{found} = undef;
1640     $self->{idx}   = -1;
1641
1642     if ( @_ ) {
1643         $self->{key_md5} = shift;
1644     }
1645
1646     # If we don't have an MD5, then what are we supposed to do?
1647     unless ( exists $self->{key_md5} ) {
1648         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1649     }
1650
1651     my $e = $self->engine;
1652     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1653         my $potential = $e->storage->read_at(
1654             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1655         );
1656
1657         if ( $potential eq $e->blank_md5 ) {
1658             $self->{idx} = $idx;
1659             return;
1660         }
1661
1662         if ( $potential eq $self->{key_md5} ) {
1663             $self->{found} = 1;
1664             $self->{idx} = $idx;
1665             return;
1666         }
1667     }
1668
1669     return;
1670 }
1671
1672 sub write_md5 {
1673     my $self = shift;
1674     my ($args) = @_;
1675
1676     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1677     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1678     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1679
1680     my $engine = $self->engine;
1681
1682     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1683
1684     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1685     $engine->add_entry( $args->{trans_id}, $spot );
1686
1687     unless ($self->{found}) {
1688         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1689             engine => $engine,
1690             data   => $args->{key},
1691         });
1692
1693         $engine->storage->print_at( $spot,
1694             $args->{key_md5},
1695             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1696         );
1697     }
1698
1699     my $loc = $spot
1700       + $engine->hash_size
1701       + $engine->byte_size
1702       + $args->{trans_id} * ( $engine->byte_size + 4 );
1703
1704     $engine->storage->print_at( $loc,
1705         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1706         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1707     );
1708 }
1709
1710 sub mark_deleted {
1711     my $self = shift;
1712     my ($args) = @_;
1713     $args ||= {};
1714
1715     my $engine = $self->engine;
1716
1717     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1718
1719     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1720     $engine->add_entry( $args->{trans_id}, $spot );
1721
1722     my $loc = $spot
1723       + $engine->hash_size
1724       + $engine->byte_size
1725       + $args->{trans_id} * ( $engine->byte_size + 4 );
1726
1727     $engine->storage->print_at( $loc,
1728         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1729         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1730     );
1731 }
1732
1733 sub delete_md5 {
1734     my $self = shift;
1735     my ($args) = @_;
1736
1737     my $engine = $self->engine;
1738     return undef unless $self->{found};
1739
1740     # Save the location so that we can free the data
1741     my $location = $self->get_data_location_for({
1742         allow_head => 0,
1743     });
1744     my $key_sector = $self->get_key_for;
1745
1746     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1747     $engine->storage->print_at( $spot,
1748         $engine->storage->read_at(
1749             $spot + $self->bucket_size,
1750             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
1751         ),
1752         chr(0) x $self->bucket_size,
1753     );
1754
1755     $key_sector->free;
1756
1757     my $data_sector = $self->engine->_load_sector( $location );
1758     my $data = $data_sector->data;
1759     $data_sector->free;
1760
1761     return $data;
1762 }
1763
1764 sub get_data_location_for {
1765     my $self = shift;
1766     my ($args) = @_;
1767     $args ||= {};
1768
1769     $args->{allow_head} = 0 unless exists $args->{allow_head};
1770     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1771     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1772
1773     my $e = $self->engine;
1774
1775     my $spot = $self->offset + $self->base_size
1776       + $args->{idx} * $self->bucket_size
1777       + $e->hash_size
1778       + $e->byte_size
1779       + $args->{trans_id} * ( $e->byte_size + 4 );
1780
1781     my $buffer = $e->storage->read_at(
1782         $spot,
1783         $e->byte_size + 4,
1784     );
1785     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
1786
1787     # We have found an entry that is old, so get rid of it
1788     if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
1789         $e->storage->print_at(
1790             $spot,
1791             pack( $StP{$e->byte_size} . ' N', (0) x 2 ), 
1792         );
1793         $loc = 0;
1794     }
1795
1796     # If we're in a transaction and we never wrote to this location, try the
1797     # HEAD instead.
1798     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1799         return $self->get_data_location_for({
1800             trans_id   => 0,
1801             allow_head => 1,
1802             idx        => $args->{idx},
1803         });
1804     }
1805     return $loc <= 1 ? 0 : $loc;
1806 }
1807
1808 sub get_data_for {
1809     my $self = shift;
1810     my ($args) = @_;
1811     $args ||= {};
1812
1813     return unless $self->{found};
1814     my $location = $self->get_data_location_for({
1815         allow_head => $args->{allow_head},
1816     });
1817     return $self->engine->_load_sector( $location );
1818 }
1819
1820 sub get_key_for {
1821     my $self = shift;
1822     my ($idx) = @_;
1823     $idx = $self->{idx} unless defined $idx;
1824
1825     if ( $idx >= $self->engine->max_buckets ) {
1826         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
1827     }
1828
1829     my $location = $self->engine->storage->read_at(
1830         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1831         $self->engine->byte_size,
1832     );
1833     $location = unpack( $StP{$self->engine->byte_size}, $location );
1834     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
1835
1836     return $self->engine->_load_sector( $location );
1837 }
1838
1839 package DBM::Deep::Engine::Sector::Index;
1840
1841 our @ISA = qw( DBM::Deep::Engine::Sector );
1842
1843 sub _init {
1844     my $self = shift;
1845
1846     my $engine = $self->engine;
1847
1848     unless ( $self->offset ) {
1849         my $leftover = $self->size - $self->base_size;
1850
1851         $self->{offset} = $engine->_request_index_sector( $self->size );
1852         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
1853         # Skip staleness counter
1854         $engine->storage->print_at( $self->offset + $self->base_size,
1855             chr(0) x $leftover, # Zero-fill the rest
1856         );
1857     }
1858
1859     return $self;
1860 }
1861
1862 sub size {
1863     my $self = shift;
1864     unless ( $self->{size} ) {
1865         my $e = $self->engine;
1866         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
1867     }
1868     return $self->{size};
1869 }
1870
1871 sub free_meth { return '_add_free_index_sector' }
1872
1873 sub free {
1874     my $self = shift;
1875     my $e = $self->engine;
1876
1877     for my $i ( 0 .. $e->hash_chars - 1 ) {
1878         my $l = $self->get_entry( $i ) or next;
1879         $e->_load_sector( $l )->free;
1880     }
1881
1882     $self->SUPER::free();
1883 }
1884
1885 sub _loc_for {
1886     my $self = shift;
1887     my ($idx) = @_;
1888     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
1889 }
1890
1891 sub get_entry {
1892     my $self = shift;
1893     my ($idx) = @_;
1894
1895     my $e = $self->engine;
1896
1897     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
1898         if $idx < 0 || $idx >= $e->hash_chars;
1899
1900     return unpack(
1901         $StP{$e->byte_size},
1902         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
1903     );
1904 }
1905
1906 sub set_entry {
1907     my $self = shift;
1908     my ($idx, $loc) = @_;
1909
1910     my $e = $self->engine;
1911
1912     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
1913         if $idx < 0 || $idx >= $e->hash_chars;
1914
1915     $self->engine->storage->print_at(
1916         $self->_loc_for( $idx ),
1917         pack( $StP{$e->byte_size}, $loc ),
1918     );
1919 }
1920
1921 1;
1922 __END__