Minor fixes, including removing the ==2/1 from add_bucket()
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_01);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18
19 ##
20 # Setup file and tag signatures.  These should never change.
21 ##
22 sub SIG_FILE     () { 'DPDB' }
23 sub SIG_HEADER   () { 'h'    }
24 sub SIG_INTERNAL () { 'i'    }
25 sub SIG_HASH     () { 'H'    }
26 sub SIG_ARRAY    () { 'A'    }
27 sub SIG_NULL     () { 'N'    }
28 sub SIG_DATA     () { 'D'    }
29 sub SIG_INDEX    () { 'I'    }
30 sub SIG_BLIST    () { 'B'    }
31 sub SIG_FREE     () { 'F'    }
32 sub SIG_KEYS     () { 'K'    }
33 sub SIG_SIZE     () {  1     }
34
35 sub new {
36     my $class = shift;
37     my ($args) = @_;
38
39     my $self = bless {
40         long_size => 4,
41         long_pack => 'N',
42         data_size => 4,
43         data_pack => 'N',
44
45         digest    => \&Digest::MD5::md5,
46         hash_size => 16,
47
48         ##
49         # Maximum number of buckets per blist before another level of indexing is
50         # done. Increase this value for slightly greater speed, but larger database
51         # files. DO NOT decrease this value below 16, due to risk of recursive
52         # reindex overrun.
53         ##
54         max_buckets => 16,
55
56         fileobj => undef,
57         obj     => undef,
58     }, $class;
59
60     if ( defined $args->{pack_size} ) {
61         if ( lc $args->{pack_size} eq 'small' ) {
62             $args->{long_size} = 2;
63             $args->{long_pack} = 'n';
64         }
65         elsif ( lc $args->{pack_size} eq 'medium' ) {
66             $args->{long_size} = 4;
67             $args->{long_pack} = 'N';
68         }
69         elsif ( lc $args->{pack_size} eq 'large' ) {
70             $args->{long_size} = 8;
71             $args->{long_pack} = 'Q';
72         }
73         else {
74             die "Unknown pack_size value: '$args->{pack_size}'\n";
75         }
76     }
77
78     # Grab the parameters we want to use
79     foreach my $param ( keys %$self ) {
80         next unless exists $args->{$param};
81         $self->{$param} = $args->{$param};
82     }
83     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
84
85     if ( $self->{max_buckets} < 16 ) {
86         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
87         $self->{max_buckets} = 16;
88     }
89
90     return $self;
91 }
92
93 sub _fileobj { return $_[0]{fileobj} }
94
95 sub calculate_sizes {
96     my $self = shift;
97
98     # The 2**8 here indicates the number of different characters in the
99     # current hashing algorithm
100     #XXX Does this need to be updated with different hashing algorithms?
101     $self->{index_size}       = (2**8) * $self->{long_size};
102     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
103     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
104
105     return;
106 }
107
108 sub write_file_header {
109     my $self = shift;
110
111     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
112
113     $self->_fileobj->print_at( $loc,
114         SIG_FILE,
115         SIG_HEADER,
116         pack('N', 1),  # header version
117         pack('N', 12), # header size
118         pack('N', 0),  # currently running transaction IDs
119         pack('n', $self->{long_size}),
120         pack('A', $self->{long_pack}),
121         pack('n', $self->{data_size}),
122         pack('A', $self->{data_pack}),
123         pack('n', $self->{max_buckets}),
124     );
125
126     $self->_fileobj->set_transaction_offset( 13 );
127
128     return;
129 }
130
131 sub read_file_header {
132     my $self = shift;
133
134     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
135     return unless length($buffer);
136
137     my ($file_signature, $sig_header, $header_version, $size) = unpack(
138         'A4 A N N', $buffer
139     );
140
141     unless ( $file_signature eq SIG_FILE ) {
142         $self->_fileobj->close;
143         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
144     }
145
146     unless ( $sig_header eq SIG_HEADER ) {
147         $self->_fileobj->close;
148         $self->_throw_error( "Old file version found." );
149     }
150
151     my $buffer2 = $self->_fileobj->read_at( undef, $size );
152     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
153
154     $self->_fileobj->set_transaction_offset( 13 );
155
156     if ( @values < 5 || grep { !defined } @values ) {
157         $self->_fileobj->close;
158         $self->_throw_error("Corrupted file - bad header");
159     }
160
161     #XXX Add warnings if values weren't set right
162     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
163
164     return length($buffer) + length($buffer2);
165 }
166
167 sub setup_fh {
168     my $self = shift;
169     my ($obj) = @_;
170
171     # Need to remove use of $fh here
172     my $fh = $self->_fileobj->{fh};
173     flock $fh, LOCK_EX;
174
175     #XXX The duplication of calculate_sizes needs to go away
176     unless ( $obj->{base_offset} ) {
177         my $bytes_read = $self->read_file_header;
178
179         $self->calculate_sizes;
180
181         ##
182         # File is empty -- write header and master index
183         ##
184         if (!$bytes_read) {
185             $self->_fileobj->audit( "# Database created on" );
186
187             $self->write_file_header;
188
189             $obj->{base_offset} = $self->_fileobj->request_space(
190                 $self->tag_size( $self->{index_size} ),
191             );
192
193             $self->write_tag(
194                 $obj->_base_offset, $obj->_type,
195                 chr(0)x$self->{index_size},
196             );
197
198             # Flush the filehandle
199             my $old_fh = select $fh;
200             my $old_af = $|; $| = 1; $| = $old_af;
201             select $old_fh;
202         }
203         else {
204             $obj->{base_offset} = $bytes_read;
205
206             ##
207             # Get our type from master index header
208             ##
209             my $tag = $self->load_tag($obj->_base_offset);
210             unless ( $tag ) {
211                 flock $fh, LOCK_UN;
212                 $self->_throw_error("Corrupted file, no master index record");
213             }
214
215             unless ($obj->_type eq $tag->{signature}) {
216                 flock $fh, LOCK_UN;
217                 $self->_throw_error("File type mismatch");
218             }
219         }
220     }
221     else {
222         $self->calculate_sizes;
223     }
224
225     #XXX We have to make sure we don't mess up when autoflush isn't turned on
226     $self->_fileobj->set_inode;
227
228     flock $fh, LOCK_UN;
229
230     return 1;
231 }
232
233 sub tag_size {
234     my $self = shift;
235     my ($size) = @_;
236     return SIG_SIZE + $self->{data_size} + $size;
237 }
238
239 sub write_tag {
240     ##
241     # Given offset, signature and content, create tag and write to disk
242     ##
243     my $self = shift;
244     my ($offset, $sig, $content) = @_;
245     my $size = length( $content );
246
247     $self->_fileobj->print_at(
248         $offset, 
249         $sig, pack($self->{data_pack}, $size), $content,
250     );
251
252     return unless defined $offset;
253
254     return {
255         signature => $sig,
256         size      => $size,
257         offset    => $offset + SIG_SIZE + $self->{data_size},
258         content   => $content
259     };
260 }
261
262 sub load_tag {
263     ##
264     # Given offset, load single tag and return signature, size and data
265     ##
266     my $self = shift;
267     my ($offset) = @_;
268
269     my $fileobj = $self->_fileobj;
270
271     my ($sig, $size) = unpack(
272         "A $self->{data_pack}",
273         $fileobj->read_at( $offset, SIG_SIZE + $self->{data_size} ),
274     );
275
276     return {
277         signature => $sig,
278         size      => $size,
279         offset    => $offset + SIG_SIZE + $self->{data_size},
280         content   => $fileobj->read_at( undef, $size ),
281     };
282 }
283
284 sub add_bucket {
285     ##
286     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
287     # plain (undigested) key and value.
288     ##
289     my $self = shift;
290     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
291     $deleted ||= 0;
292
293     # This verifies that only supported values will be stored.
294     {
295         my $r = Scalar::Util::reftype( $value );
296
297         last if !defined $r;
298         last if $r eq 'HASH';
299         last if $r eq 'ARRAY';
300
301         $self->_throw_error(
302             "Storage of references of type '$r' is not supported."
303         );
304     }
305
306     my $fileobj = $self->_fileobj;
307
308     my $actual_length = $self->_length_needed( $value, $plain_key );
309
310     #ACID - This is a mutation. Must only find the exact transaction
311     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
312
313     my @transactions;
314     if ( $fileobj->transaction_id == 0 ) {
315         @transactions = $fileobj->current_transactions;
316     }
317
318 #    $self->_release_space( $size, $subloc );
319     # Updating a known md5
320 #XXX This needs updating to use _release_space
321     my $location;
322     if ( $subloc ) {
323         if ($actual_length <= $size) {
324             $location = $subloc;
325         }
326         else {
327             $location = $fileobj->request_space( $actual_length );
328
329             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
330                 pack($self->{long_pack}, $location ),
331                 pack($self->{long_pack}, $actual_length ),
332                 pack('n n', $fileobj->transaction_id, $deleted ),
333             );
334         }
335
336         my $old_value = $self->read_from_loc( $subloc, $orig_key );
337         for ( @transactions ) {
338             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
339             $fileobj->{transaction_id} = $_;
340             $self->add_bucket( $tag2, $md5, $orig_key, $old_value, undef, $orig_key );
341             $fileobj->{transaction_id} = 0;
342         }
343         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
344     }
345     # Adding a new md5
346     elsif ( defined $offset ) {
347         $location = $fileobj->request_space( $actual_length );
348
349         $fileobj->print_at( $tag->{offset} + $offset,
350             $md5,
351             pack($self->{long_pack}, $location ),
352             pack($self->{long_pack}, $actual_length ),
353             pack('n n', $fileobj->transaction_id, $deleted ),
354         );
355
356         for ( @transactions ) {
357             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
358             $fileobj->{transaction_id} = $_;
359             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
360             $fileobj->{transaction_id} = 0;
361         }
362         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
363     }
364     # If bucket didn't fit into list, split into a new index level
365     # split_index() will do the $self->_fileobj->request_space() call
366     #XXX It also needs to be transactionally aware
367     else {
368         $location = $self->split_index( $md5, $tag );
369     }
370
371     $self->write_value( $location, $plain_key, $value, $orig_key );
372
373     return 1;
374 }
375
376 sub write_value {
377     my $self = shift;
378     my ($location, $key, $value, $orig_key) = @_;
379
380     my $fileobj = $self->_fileobj;
381
382     my $dbm_deep_obj = _get_dbm_object( $value );
383     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
384         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
385     }
386
387     ##
388     # Write signature based on content type, set content length and write
389     # actual value.
390     ##
391     my $r = Scalar::Util::reftype( $value ) || '';
392     if ( $dbm_deep_obj ) {
393         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
394     }
395     elsif ($r eq 'HASH') {
396         if ( !$dbm_deep_obj && tied %{$value} ) {
397             $self->_throw_error( "Cannot store something that is tied" );
398         }
399         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
400     }
401     elsif ($r eq 'ARRAY') {
402         if ( !$dbm_deep_obj && tied @{$value} ) {
403             $self->_throw_error( "Cannot store something that is tied" );
404         }
405         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
406     }
407     elsif (!defined($value)) {
408         $self->write_tag( $location, SIG_NULL, '' );
409     }
410     else {
411         $self->write_tag( $location, SIG_DATA, $value );
412     }
413
414     ##
415     # Plain key is stored AFTER value, as keys are typically fetched less often.
416     ##
417     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
418
419     # Internal references don't care about autobless
420     return 1 if $dbm_deep_obj;
421
422     ##
423     # If value is blessed, preserve class name
424     ##
425     if ( $fileobj->{autobless} ) {
426         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
427             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
428         }
429         else {
430             $fileobj->print_at( undef, chr(0) );
431         }
432     }
433
434     ##
435     # Tie the passed in reference so that changes to it are reflected in the
436     # datafile. The use of $location as the base_offset will act as the
437     # the linkage between parent and child.
438     #
439     # The overall assignment is a hack around the fact that just tying doesn't
440     # store the values. This may not be the wrong thing to do.
441     ##
442     if ($r eq 'HASH') {
443         my %x = %$value;
444         tie %$value, 'DBM::Deep', {
445             base_offset => $location,
446             fileobj     => $fileobj,
447             parent      => $self->{obj},
448             parent_key  => $orig_key,
449         };
450         %$value = %x;
451     }
452     elsif ($r eq 'ARRAY') {
453         my @x = @$value;
454         tie @$value, 'DBM::Deep', {
455             base_offset => $location,
456             fileobj     => $fileobj,
457             parent      => $self->{obj},
458             parent_key  => $orig_key,
459         };
460         @$value = @x;
461     }
462
463     return 1;
464 }
465
466 sub split_index {
467     my $self = shift;
468     my ($md5, $tag) = @_;
469
470     my $fileobj = $self->_fileobj;
471
472     my $loc = $fileobj->request_space(
473         $self->tag_size( $self->{index_size} ),
474     );
475
476     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
477
478     my $index_tag = $self->write_tag(
479         $loc, SIG_INDEX,
480         chr(0)x$self->{index_size},
481     );
482
483     my $newtag_loc = $fileobj->request_space(
484         $self->tag_size( $self->{bucket_list_size} ),
485     );
486
487     my $keys = $tag->{content}
488              . $md5 . pack($self->{long_pack}, $newtag_loc)
489                     . pack($self->{long_pack}, 0)  # size
490                     . pack($self->{long_pack}, 0); # transaction ID
491
492     my @newloc = ();
493     BUCKET:
494     # The <= here is deliberate - we have max_buckets+1 keys to iterate
495     # through, unlike every other loop that uses max_buckets as a stop.
496     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
497         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
498
499         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
500         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
501
502         my $num = ord(substr($key, $tag->{ch} + 1, 1));
503
504         if ($newloc[$num]) {
505             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
506
507             # This is looking for the first empty spot
508             my ($subloc, $offset, $size) = $self->_find_in_buckets(
509                 { content => $subkeys }, '',
510             );
511
512             $fileobj->print_at(
513                 $newloc[$num] + $offset,
514                 $key, pack($self->{long_pack}, $old_subloc),
515             );
516
517             next;
518         }
519
520         my $loc = $fileobj->request_space(
521             $self->tag_size( $self->{bucket_list_size} ),
522         );
523
524         $fileobj->print_at(
525             $index_tag->{offset} + ($num * $self->{long_size}),
526             pack($self->{long_pack}, $loc),
527         );
528
529         my $blist_tag = $self->write_tag(
530             $loc, SIG_BLIST,
531             chr(0)x$self->{bucket_list_size},
532         );
533
534         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
535
536         $newloc[$num] = $blist_tag->{offset};
537     }
538
539     $self->_release_space(
540         $self->tag_size( $self->{bucket_list_size} ),
541         $tag->{offset} - SIG_SIZE - $self->{data_size},
542     );
543
544     return $newtag_loc;
545 }
546
547 sub read_from_loc {
548     my $self = shift;
549     my ($subloc, $orig_key) = @_;
550
551     my $fileobj = $self->_fileobj;
552
553     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
554
555     ##
556     # If value is a hash or array, return new DBM::Deep object with correct offset
557     ##
558     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
559         my $new_obj = DBM::Deep->new({
560             type        => $signature,
561             base_offset => $subloc,
562             fileobj     => $self->_fileobj,
563             parent      => $self->{obj},
564             parent_key  => $orig_key,
565         });
566
567         if ($new_obj->_fileobj->{autobless}) {
568             ##
569             # Skip over value and plain key to see if object needs
570             # to be re-blessed
571             ##
572             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
573
574             my $size = $fileobj->read_at( undef, $self->{data_size} );
575             $size = unpack($self->{data_pack}, $size);
576             if ($size) { $fileobj->increment_pointer( $size ); }
577
578             my $bless_bit = $fileobj->read_at( undef, 1 );
579             if ( ord($bless_bit) ) {
580                 my $size = unpack(
581                     $self->{data_pack},
582                     $fileobj->read_at( undef, $self->{data_size} ),
583                 );
584
585                 if ( $size ) {
586                     $new_obj = bless $new_obj, $fileobj->read_at( undef, $size );
587                 }
588             }
589         }
590
591         return $new_obj;
592     }
593     elsif ( $signature eq SIG_INTERNAL ) {
594         my $size = $fileobj->read_at( undef, $self->{data_size} );
595         $size = unpack($self->{data_pack}, $size);
596
597         if ( $size ) {
598             my $new_loc = $fileobj->read_at( undef, $size );
599             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
600             return $self->read_from_loc( $new_loc, $orig_key );
601         }
602         else {
603             return;
604         }
605     }
606     ##
607     # Otherwise return actual value
608     ##
609     elsif ( $signature eq SIG_DATA ) {
610         my $size = $fileobj->read_at( undef, $self->{data_size} );
611         $size = unpack($self->{data_pack}, $size);
612
613         my $value = $size ? $fileobj->read_at( undef, $size ) : '';
614         return $value;
615     }
616
617     ##
618     # Key exists, but content is null
619     ##
620     return;
621 }
622
623 sub get_bucket_value {
624     ##
625     # Fetch single value given tag and MD5 digested key.
626     ##
627     my $self = shift;
628     my ($tag, $md5, $orig_key) = @_;
629
630     #ACID - This is a read. Can find exact or HEAD
631     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
632
633     if ( !$subloc ) {
634         #XXX Need to use real key
635 #        $self->add_bucket( $tag, $md5, $orig_key, undef, undef, $orig_key );
636 #        return;
637     }
638     elsif ( !$is_deleted ) {
639         return $self->read_from_loc( $subloc, $orig_key );
640     }
641
642     return;
643 }
644
645 sub delete_bucket {
646     ##
647     # Delete single key/value pair given tag and MD5 digested key.
648     ##
649     my $self = shift;
650     my ($tag, $md5, $orig_key) = @_;
651
652     #ACID - Although this is a mutation, we must find any transaction.
653     # This is because we need to mark something as deleted that is in the HEAD.
654     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
655
656     return if !$subloc;
657
658     my $fileobj = $self->_fileobj;
659
660     my @transactions;
661     if ( $fileobj->transaction_id == 0 ) {
662         @transactions = $fileobj->current_transactions;
663     }
664
665     if ( $fileobj->transaction_id == 0 ) {
666         my $value = $self->read_from_loc( $subloc, $orig_key );
667
668         for (@transactions) {
669             $fileobj->{transaction_id} = $_;
670             #XXX Need to use real key
671             $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
672             $fileobj->{transaction_id} = 0;
673         }
674         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
675
676         #XXX This needs _release_space() for the value and anything below
677         $fileobj->print_at(
678             $tag->{offset} + $offset,
679             substr( $tag->{content}, $offset + $self->{bucket_size} ),
680             chr(0) x $self->{bucket_size},
681         );
682     }
683     else {
684         $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
685     }
686
687     return 1;
688 }
689
690 sub bucket_exists {
691     ##
692     # Check existence of single key given tag and MD5 digested key.
693     ##
694     my $self = shift;
695     my ($tag, $md5) = @_;
696
697     #ACID - This is a read. Can find exact or HEAD
698     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
699     return ($subloc && !$is_deleted) && 1;
700 }
701
702 sub find_bucket_list {
703     ##
704     # Locate offset for bucket list, given digested key
705     ##
706     my $self = shift;
707     my ($offset, $md5, $args) = @_;
708     $args = {} unless $args;
709
710     ##
711     # Locate offset for bucket list using digest index system
712     ##
713     my $tag = $self->load_tag( $offset )
714         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
715
716     my $ch = 0;
717     while ($tag->{signature} ne SIG_BLIST) {
718         my $num = ord substr($md5, $ch, 1);
719
720         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
721         $tag = $self->index_lookup( $tag, $num );
722
723         if (!$tag) {
724             return if !$args->{create};
725
726             my $loc = $self->_fileobj->request_space(
727                 $self->tag_size( $self->{bucket_list_size} ),
728             );
729
730             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
731
732             $tag = $self->write_tag(
733                 $loc, SIG_BLIST,
734                 chr(0)x$self->{bucket_list_size},
735             );
736
737             $tag->{ref_loc} = $ref_loc;
738             $tag->{ch} = $ch;
739
740             last;
741         }
742
743         $tag->{ch} = $ch++;
744         $tag->{ref_loc} = $ref_loc;
745     }
746
747     return $tag;
748 }
749
750 sub index_lookup {
751     ##
752     # Given index tag, lookup single entry in index and return .
753     ##
754     my $self = shift;
755     my ($tag, $index) = @_;
756
757     my $location = unpack(
758         $self->{long_pack},
759         substr(
760             $tag->{content},
761             $index * $self->{long_size},
762             $self->{long_size},
763         ),
764     );
765
766     if (!$location) { return; }
767
768     return $self->load_tag( $location );
769 }
770
771 sub traverse_index {
772     ##
773     # Scan index and recursively step into deeper levels, looking for next key.
774     ##
775     my $self = shift;
776     my ($obj, $offset, $ch, $force_return_next) = @_;
777
778     my $tag = $self->load_tag( $offset );
779
780     if ($tag->{signature} ne SIG_BLIST) {
781         my $content = $tag->{content};
782         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
783
784         for (my $idx = $start; $idx < (2**8); $idx++) {
785             my $subloc = unpack(
786                 $self->{long_pack},
787                 substr(
788                     $content,
789                     $idx * $self->{long_size},
790                     $self->{long_size},
791                 ),
792             );
793
794             if ($subloc) {
795                 my $result = $self->traverse_index(
796                     $obj, $subloc, $ch + 1, $force_return_next,
797                 );
798
799                 if (defined($result)) { return $result; }
800             }
801         } # index loop
802
803         $obj->{return_next} = 1;
804     }
805     # This is the bucket list
806     else {
807         my $keys = $tag->{content};
808         if ($force_return_next) { $obj->{return_next} = 1; }
809
810         ##
811         # Iterate through buckets, looking for a key match
812         ##
813         my $transaction_id = $self->_fileobj->transaction_id;
814         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
815             my ($key, $subloc, $size, $trans_id, $is_deleted) = $self->_get_key_subloc( $keys, $i );
816
817             next if $is_deleted;
818 #XXX Need to find all the copies of this key to find out if $transaction_id has it
819 #XXX marked as deleted, in use, or what.
820             next if $trans_id && $trans_id != $transaction_id;
821
822             # End of bucket list -- return to outer loop
823             if (!$subloc) {
824                 $obj->{return_next} = 1;
825                 last;
826             }
827             # Located previous key -- return next one found
828             elsif ($key eq $obj->{prev_md5}) {
829                 $obj->{return_next} = 1;
830                 next;
831             }
832             # Seek to bucket location and skip over signature
833             elsif ($obj->{return_next}) {
834                 my $fileobj = $self->_fileobj;
835
836                 # Skip over value to get to plain key
837                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
838
839                 my $size = $fileobj->read_at( undef, $self->{data_size} );
840                 $size = unpack($self->{data_pack}, $size);
841                 if ($size) { $fileobj->increment_pointer( $size ); }
842
843                 # Read in plain key and return as scalar
844                 $size = $fileobj->read_at( undef, $self->{data_size} );
845                 $size = unpack($self->{data_pack}, $size);
846
847                 my $plain_key;
848                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
849                 return $plain_key;
850             }
851         }
852
853         $obj->{return_next} = 1;
854     }
855
856     return;
857 }
858
859 sub get_next_key {
860     ##
861     # Locate next key, given digested previous one
862     ##
863     my $self = shift;
864     my ($obj) = @_;
865
866     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
867     $obj->{return_next} = 0;
868
869     ##
870     # If the previous key was not specifed, start at the top and
871     # return the first one found.
872     ##
873     if (!$obj->{prev_md5}) {
874         $obj->{prev_md5} = chr(0) x $self->{hash_size};
875         $obj->{return_next} = 1;
876     }
877
878     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
879 }
880
881 # Utilities
882
883 sub _get_key_subloc {
884     my $self = shift;
885     my ($keys, $idx) = @_;
886
887     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
888         # This is 'a', not 'A'. Please read the pack() documentation for the
889         # difference between the two and why it's important.
890         "a$self->{hash_size} $self->{long_pack}2 n2",
891         substr(
892             $keys,
893             ($idx * $self->{bucket_size}),
894             $self->{bucket_size},
895         ),
896     );
897
898     return ($key, $subloc, $size, $transaction_id, $is_deleted);
899 }
900
901 sub _find_in_buckets {
902     my $self = shift;
903     my ($tag, $md5, $exact) = @_;
904     $exact ||= 0;
905
906     my $trans_id = $self->_fileobj->transaction_id;
907
908     my @zero;
909
910     BUCKET:
911     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
912         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
913             $tag->{content}, $i,
914         );
915
916         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
917
918         unless ( $subloc ) {
919             if ( !$exact && @zero && $trans_id ) {
920                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
921             }
922             return @rv;
923         }
924
925         next BUCKET if $key ne $md5;
926
927         # Save off the HEAD in case we need it.
928         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
929
930         next BUCKET if $transaction_id != $trans_id;
931
932         return @rv;
933     }
934
935     return;
936 }
937
938 sub _release_space {
939     my $self = shift;
940     my ($size, $loc) = @_;
941
942     my $next_loc = 0;
943
944     $self->_fileobj->print_at( $loc,
945         SIG_FREE, 
946         pack($self->{long_pack}, $size ),
947         pack($self->{long_pack}, $next_loc ),
948     );
949
950     return;
951 }
952
953 sub _throw_error {
954     die "DBM::Deep: $_[1]\n";
955 }
956
957 sub _get_dbm_object {
958     my $item = shift;
959
960     my $obj = eval {
961         local $SIG{__DIE__};
962         if ($item->isa( 'DBM::Deep' )) {
963             return $item;
964         }
965         return;
966     };
967     return $obj if $obj;
968
969     my $r = Scalar::Util::reftype( $item ) || '';
970     if ( $r eq 'HASH' ) {
971         my $obj = eval {
972             local $SIG{__DIE__};
973             my $obj = tied(%$item);
974             if ($obj->isa( 'DBM::Deep' )) {
975                 return $obj;
976             }
977             return;
978         };
979         return $obj if $obj;
980     }
981     elsif ( $r eq 'ARRAY' ) {
982         my $obj = eval {
983             local $SIG{__DIE__};
984             my $obj = tied(@$item);
985             if ($obj->isa( 'DBM::Deep' )) {
986                 return $obj;
987             }
988             return;
989         };
990         return $obj if $obj;
991     }
992
993     return;
994 }
995
996 sub _length_needed {
997     my $self = shift;
998     my ($value, $key) = @_;
999
1000     my $is_dbm_deep = eval {
1001         local $SIG{'__DIE__'};
1002         $value->isa( 'DBM::Deep' );
1003     };
1004
1005     my $len = SIG_SIZE
1006             + $self->{data_size} # size for value
1007             + $self->{data_size} # size for key
1008             + length( $key );    # length of key
1009
1010     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
1011         # long_size is for the internal reference
1012         return $len + $self->{long_size};
1013     }
1014
1015     if ( $self->_fileobj->{autobless} ) {
1016         # This is for the bit saying whether or not this thing is blessed.
1017         $len += 1;
1018     }
1019
1020     my $r = Scalar::Util::reftype( $value ) || '';
1021     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1022         if ( defined $value ) {
1023             $len += length( $value );
1024         }
1025         return $len;
1026     }
1027
1028     $len += $self->{index_size};
1029
1030     # if autobless is enabled, must also take into consideration
1031     # the class name as it is stored after the key.
1032     if ( $self->_fileobj->{autobless} ) {
1033         my $c = Scalar::Util::blessed($value);
1034         if ( defined $c && !$is_dbm_deep ) {
1035             $len += $self->{data_size} + length($c);
1036         }
1037     }
1038
1039     return $len;
1040 }
1041
1042 1;
1043 __END__