Broke add_bucket up into _find_in_buckets() ... will propagate that method tomorrow
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use strict;
4
5 use Fcntl qw( :DEFAULT :flock :seek );
6
7 ##
8 # Setup file and tag signatures.  These should never change.
9 ##
10 sub SIG_FILE     () { 'DPDB' }
11 sub SIG_INTERNAL () { 'i'    }
12 sub SIG_HASH     () { 'H'    }
13 sub SIG_ARRAY    () { 'A'    }
14 sub SIG_SCALAR   () { 'S'    }
15 sub SIG_NULL     () { 'N'    }
16 sub SIG_DATA     () { 'D'    }
17 sub SIG_INDEX    () { 'I'    }
18 sub SIG_BLIST    () { 'B'    }
19 sub SIG_SIZE     () {  1     }
20
21 sub precalc_sizes {
22     ##
23     # Precalculate index, bucket and bucket list sizes
24     ##
25     my $self = shift;
26
27     $self->{index_size}       = (2**8) * $self->{long_size};
28     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size};
29     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
30
31     return 1;
32 }
33
34 sub set_pack {
35     ##
36     # Set pack/unpack modes (see file header for more)
37     ##
38     my $self = shift;
39     my ($long_s, $long_p, $data_s, $data_p) = @_;
40
41     ##
42     # Set to 4 and 'N' for 32-bit offset tags (default).  Theoretical limit of 4
43     # GB per file.
44     #    (Perl must be compiled with largefile support for files > 2 GB)
45     #
46     # Set to 8 and 'Q' for 64-bit offsets.  Theoretical limit of 16 XB per file.
47     #    (Perl must be compiled with largefile and 64-bit long support)
48     ##
49     $self->{long_size} = $long_s ? $long_s : 4;
50     $self->{long_pack} = $long_p ? $long_p : 'N';
51
52     ##
53     # Set to 4 and 'N' for 32-bit data length prefixes.  Limit of 4 GB for each
54     # key/value. Upgrading this is possible (see above) but probably not necessary.
55     # If you need more than 4 GB for a single key or value, this module is really
56     # not for you :-)
57     ##
58     $self->{data_size} = $data_s ? $data_s : 4;
59     $self->{data_pack} = $data_p ? $data_p : 'N';
60
61     return $self->precalc_sizes();
62 }
63
64 sub set_digest {
65     ##
66     # Set key digest function (default is MD5)
67     ##
68     my $self = shift;
69     my ($digest_func, $hash_size) = @_;
70
71     $self->{digest} = $digest_func ? $digest_func : \&Digest::MD5::md5;
72     $self->{hash_size} = $hash_size ? $hash_size : 16;
73
74     return $self->precalc_sizes();
75 }
76
77 sub new {
78     my $class = shift;
79     my ($args) = @_;
80
81     my $self = bless {
82         long_size   => 4,
83         long_pack   => 'N',
84         data_size   => 4,
85         data_pack   => 'N',
86
87         digest      => \&Digest::MD5::md5,
88         hash_size   => 16,
89
90         ##
91         # Maximum number of buckets per list before another level of indexing is done.
92         # Increase this value for slightly greater speed, but larger database files.
93         # DO NOT decrease this value below 16, due to risk of recursive reindex overrun.
94         ##
95         max_buckets => 16,
96     }, $class;
97
98     $self->precalc_sizes;
99
100     return $self;
101 }
102
103 sub setup_fh {
104     my $self = shift;
105     my ($obj) = @_;
106
107     $self->open( $obj ) if !defined $obj->_fh;
108
109     #XXX We have to make sure we don't mess up when autoflush isn't turned on
110     unless ( $obj->_root->{inode} ) {
111         my @stats = stat($obj->_fh);
112         $obj->_root->{inode} = $stats[1];
113         $obj->_root->{end} = $stats[7];
114     }
115
116     return 1;
117 }
118
119 sub open {
120     ##
121     # Open a fh to the database, create if nonexistent.
122     # Make sure file signature matches DBM::Deep spec.
123     ##
124     my $self = shift;
125     my ($obj) = @_;
126
127     if (defined($obj->_fh)) { $self->close_fh( $obj ); }
128
129     # Theoretically, adding O_BINARY should remove the need for the binmode
130     # Of course, testing it is going to be ... interesting.
131     my $flags = O_RDWR | O_CREAT | O_BINARY;
132
133     my $fh;
134     sysopen( $fh, $obj->_root->{file}, $flags )
135         or $obj->_throw_error("Cannot sysopen file: " . $obj->_root->{file} . ": $!");
136     $obj->_root->{fh} = $fh;
137
138     #XXX Can we remove this by using the right sysopen() flags?
139     # Maybe ... q.v. above
140     binmode $fh; # for win32
141
142     if ($obj->_root->{autoflush}) {
143         my $old = select $fh;
144         $|=1;
145         select $old;
146     }
147
148     seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
149
150     my $signature;
151     my $bytes_read = read( $fh, $signature, length(SIG_FILE));
152
153     ##
154     # File is empty -- write signature and master index
155     ##
156     if (!$bytes_read) {
157         seek($fh, 0 + $obj->_root->{file_offset}, SEEK_SET);
158         print( $fh SIG_FILE);
159
160         $self->create_tag($obj, $obj->_base_offset, $obj->_type, chr(0) x $self->{index_size});
161
162         # Flush the filehandle
163         my $old_fh = select $fh;
164         my $old_af = $|; $| = 1; $| = $old_af;
165         select $old_fh;
166
167         return 1;
168     }
169
170     ##
171     # Check signature was valid
172     ##
173     unless ($signature eq SIG_FILE) {
174         $self->close_fh( $obj );
175         $obj->_throw_error("Signature not found -- file is not a Deep DB");
176     }
177
178     ##
179     # Get our type from master index signature
180     ##
181     my $tag = $self->load_tag($obj, $obj->_base_offset)
182         or $obj->_throw_error("Corrupted file, no master index record");
183
184     unless ($obj->{type} eq $tag->{signature}) {
185         $obj->_throw_error("File type mismatch");
186     }
187
188 #XXX We probably also want to store the hash algorithm name and not assume anything
189 #XXX The cool thing would be to allow a different hashing algorithm at every level
190
191     return 1;
192 }
193
194 sub close_fh {
195     my $self = shift;
196     my ($obj) = @_;
197
198     if ( my $fh = $obj->_root->{fh} ) {
199         close $fh;
200     }
201     $obj->_root->{fh} = undef;
202
203     return 1;
204 }
205
206 sub create_tag {
207     ##
208     # Given offset, signature and content, create tag and write to disk
209     ##
210     my $self = shift;
211     my ($obj, $offset, $sig, $content) = @_;
212     my $size = length($content);
213
214     my $fh = $obj->_fh;
215
216     seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
217     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
218
219     if ($offset == $obj->_root->{end}) {
220         $obj->_root->{end} += SIG_SIZE + $self->{data_size} + $size;
221     }
222
223     return {
224         signature => $sig,
225         size => $size,
226         offset => $offset + SIG_SIZE + $self->{data_size},
227         content => $content
228     };
229 }
230
231 sub load_tag {
232     ##
233     # Given offset, load single tag and return signature, size and data
234     ##
235     my $self = shift;
236     my ($obj, $offset) = @_;
237
238     my $fh = $obj->_fh;
239
240     seek($fh, $offset + $obj->_root->{file_offset}, SEEK_SET);
241
242     #XXX I'm not sure this check will work if autoflush isn't enabled ...
243     return if eof $fh;
244
245     my $b;
246     read( $fh, $b, SIG_SIZE + $self->{data_size} );
247     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
248
249     my $buffer;
250     read( $fh, $buffer, $size);
251
252     return {
253         signature => $sig,
254         size => $size,
255         offset => $offset + SIG_SIZE + $self->{data_size},
256         content => $buffer
257     };
258 }
259
260 sub add_bucket {
261     ##
262     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
263     # plain (undigested) key and value.
264     ##
265     my $self = shift;
266     my ($obj, $tag, $md5, $plain_key, $value) = @_;
267
268     my $location = 0;
269     my $result = 2;
270
271     my $root = $obj->_root;
272
273     my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $value->isa( 'DBM::Deep' ) };
274     my $internal_ref = $is_dbm_deep && ($value->_root eq $root);
275
276     my $fh = $obj->_fh;
277
278     {
279         my ($subloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
280
281         # Updating a known md5
282         if ( $subloc ) {
283             $result = 1;
284
285             seek($fh, $subloc + SIG_SIZE + $root->{file_offset}, SEEK_SET);
286             my $size;
287             read( $fh, $size, $self->{data_size});
288             $size = unpack($self->{data_pack}, $size);
289
290             ##
291             # If value is a hash, array, or raw value with equal or less size, we can
292             # reuse the same content area of the database.  Otherwise, we have to create
293             # a new content area at the EOF.
294             ##
295             my $actual_length;
296             if ( $internal_ref ) {
297                 $actual_length = $self->{long_size};
298             }
299             else {
300                 my $r = Scalar::Util::reftype( $value ) || '';
301                 if ( $r eq 'HASH' || $r eq 'ARRAY' ) {
302                     $actual_length = $self->{index_size};
303
304                     # if autobless is enabled, must also take into consideration
305                     # the class name, as it is stored along with key/value.
306                     if ( $root->{autobless} ) {
307                         my $value_class = Scalar::Util::blessed($value);
308                         if ( defined $value_class && !$value->isa('DBM::Deep') ) {
309                             $actual_length += length($value_class);
310                         }
311                     }
312                 }
313                 else { $actual_length = length($value); }
314             }
315
316             if ($actual_length <= $size) {
317                 $location = $subloc;
318             }
319             else {
320                 $location = $root->{end};
321                 seek(
322                     $fh,
323                     $tag->{offset} + $offset + $self->{hash_size} + $root->{file_offset},
324                     SEEK_SET,
325                 );
326                 print( $fh pack($self->{long_pack}, $location) );
327             }
328         }
329         # Adding a new md5
330         elsif ( defined $offset ) {
331             $result = 2;
332             $location = $root->{end};
333
334             seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
335             print( $fh $md5 . pack($self->{long_pack}, $location) );
336         }
337         # If bucket didn't fit into list, split into a new index level
338         else {
339             $self->split_index( $obj, $md5, $tag );
340
341             $location = $root->{end};
342         }
343     }
344
345     ##
346     # Seek to content area and store signature, value and plaintext key
347     ##
348     if ($location) {
349         seek($fh, $location + $root->{file_offset}, SEEK_SET);
350
351         ##
352         # Write signature based on content type, set content length and write
353         # actual value.
354         ##
355         my $r = Scalar::Util::reftype($value) || '';
356         my $content_length;
357         if ( $internal_ref ) {
358             print( $fh SIG_INTERNAL );
359             print( $fh pack($self->{data_pack}, $self->{long_size}) );
360             print( $fh pack($self->{long_pack}, $value->_base_offset) );
361             $content_length = $self->{long_size};
362         }
363         else {
364             if ($r eq 'HASH') {
365                 print( $fh SIG_HASH );
366                 print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
367                 $content_length = $self->{index_size};
368             }
369             elsif ($r eq 'ARRAY') {
370                 print( $fh SIG_ARRAY );
371                 print( $fh pack($self->{data_pack}, $self->{index_size}) . chr(0) x $self->{index_size} );
372                 $content_length = $self->{index_size};
373             }
374             elsif (!defined($value)) {
375                 print( $fh SIG_NULL );
376                 print( $fh pack($self->{data_pack}, 0) );
377                 $content_length = 0;
378             }
379             else {
380                 print( $fh SIG_DATA );
381                 print( $fh pack($self->{data_pack}, length($value)) . $value );
382                 $content_length = length($value);
383             }
384         }
385
386         ##
387         # Plain key is stored AFTER value, as keys are typically fetched less often.
388         ##
389         print( $fh pack($self->{data_pack}, length($plain_key)) . $plain_key );
390
391         ##
392         # If value is blessed, preserve class name
393         ##
394         if ( $root->{autobless} ) {
395             my $value_class = Scalar::Util::blessed($value);
396             if ( defined $value_class && !$value->isa( 'DBM::Deep' ) ) {
397                 ##
398                 # Blessed ref -- will restore later
399                 ##
400                 print( $fh chr(1) );
401                 print( $fh pack($self->{data_pack}, length($value_class)) . $value_class );
402                 $content_length += 1;
403                 $content_length += $self->{data_size} + length($value_class);
404             }
405             else {
406                 print( $fh chr(0) );
407                 $content_length += 1;
408             }
409         }
410
411         ##
412         # If this is a new content area, advance EOF counter
413         ##
414         if ($location == $root->{end}) {
415             $root->{end} += SIG_SIZE;
416             $root->{end} += $self->{data_size} + $content_length;
417             $root->{end} += $self->{data_size} + length($plain_key);
418         }
419
420         ##
421         # If content is a hash or array, create new child DBM::Deep object and
422         # pass each key or element to it.
423         ##
424         if ( ! $internal_ref ) {
425             if ($r eq 'HASH') {
426                 my $branch = DBM::Deep->new(
427                     type => DBM::Deep->TYPE_HASH,
428                     base_offset => $location,
429                     root => $root,
430                 );
431                 foreach my $key (keys %{$value}) {
432                     $branch->STORE( $key, $value->{$key} );
433                 }
434             }
435             elsif ($r eq 'ARRAY') {
436                 my $branch = DBM::Deep->new(
437                     type => DBM::Deep->TYPE_ARRAY,
438                     base_offset => $location,
439                     root => $root,
440                 );
441                 my $index = 0;
442                 foreach my $element (@{$value}) {
443                     $branch->STORE( $index, $element );
444                     $index++;
445                 }
446             }
447         }
448
449         return $result;
450     }
451
452     $obj->_throw_error("Fatal error: indexing failed -- possibly due to corruption in file");
453 }
454
455 sub split_index {
456     my $self = shift;
457     my ($obj, $md5, $tag) = @_;
458
459     my $fh = $obj->_fh;
460     my $root = $obj->_root;
461     my $keys = $tag->{content};
462
463     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
464     print( $fh pack($self->{long_pack}, $root->{end}) );
465
466     my $index_tag = $self->create_tag(
467         $obj,
468         $root->{end},
469         SIG_INDEX,
470         chr(0) x $self->{index_size},
471     );
472
473     my @offsets = ();
474
475     $keys .= $md5 . pack($self->{long_pack}, 0);
476
477     BUCKET:
478     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
479         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
480
481         next BUCKET unless $key;
482
483         my $num = ord(substr($key, $tag->{ch} + 1, 1));
484
485         if ($offsets[$num]) {
486             my $offset = $offsets[$num] + SIG_SIZE + $self->{data_size};
487             seek($fh, $offset + $root->{file_offset}, SEEK_SET);
488             my $subkeys;
489             read( $fh, $subkeys, $self->{bucket_list_size});
490
491             for (my $k=0; $k<$self->{max_buckets}; $k++) {
492                 my ($temp, $subloc) = $self->_get_key_subloc( $subkeys, $k );
493
494                 if (!$subloc) {
495                     seek($fh, $offset + ($k * $self->{bucket_size}) + $root->{file_offset}, SEEK_SET);
496                     print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
497                     last;
498                 }
499             } # k loop
500         }
501         else {
502             $offsets[$num] = $root->{end};
503             seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
504             print( $fh pack($self->{long_pack}, $root->{end}) );
505
506             my $blist_tag = $self->create_tag($obj, $root->{end}, SIG_BLIST, chr(0) x $self->{bucket_list_size});
507
508             seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
509             print( $fh $key . pack($self->{long_pack}, $old_subloc || $root->{end}) );
510         }
511     } # i loop
512
513     return;
514 }
515
516 sub read_from_loc {
517     my $self = shift;
518     my ($obj, $subloc) = @_;
519
520     my $fh = $obj->_fh;
521
522     ##
523     # Found match -- seek to offset and read signature
524     ##
525     my $signature;
526     seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
527     read( $fh, $signature, SIG_SIZE);
528
529     ##
530     # If value is a hash or array, return new DBM::Deep object with correct offset
531     ##
532     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
533         my $obj = DBM::Deep->new(
534             type => $signature,
535             base_offset => $subloc,
536             root => $obj->_root,
537         );
538
539         if ($obj->_root->{autobless}) {
540             ##
541             # Skip over value and plain key to see if object needs
542             # to be re-blessed
543             ##
544             seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
545
546             my $size;
547             read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
548             if ($size) { seek($fh, $size, SEEK_CUR); }
549
550             my $bless_bit;
551             read( $fh, $bless_bit, 1);
552             if (ord($bless_bit)) {
553                 ##
554                 # Yes, object needs to be re-blessed
555                 ##
556                 my $class_name;
557                 read( $fh, $size, $self->{data_size}); $size = unpack($self->{data_pack}, $size);
558                 if ($size) { read( $fh, $class_name, $size); }
559                 if ($class_name) { $obj = bless( $obj, $class_name ); }
560             }
561         }
562
563         return $obj;
564     }
565     elsif ( $signature eq SIG_INTERNAL ) {
566         my $size;
567         read( $fh, $size, $self->{data_size});
568         $size = unpack($self->{data_pack}, $size);
569
570         if ( $size ) {
571             my $new_loc;
572             read( $fh, $new_loc, $size );
573             $new_loc = unpack( $self->{long_pack}, $new_loc );
574
575             return $self->read_from_loc( $obj, $new_loc );
576         }
577         else {
578             return;
579         }
580     }
581     ##
582     # Otherwise return actual value
583     ##
584     elsif ($signature eq SIG_DATA) {
585         my $size;
586         read( $fh, $size, $self->{data_size});
587         $size = unpack($self->{data_pack}, $size);
588
589         my $value = '';
590         if ($size) { read( $fh, $value, $size); }
591         return $value;
592     }
593
594     ##
595     # Key exists, but content is null
596     ##
597     return;
598 }
599
600 sub get_bucket_value {
601     ##
602     # Fetch single value given tag and MD5 digested key.
603     ##
604     my $self = shift;
605     my ($obj, $tag, $md5) = @_;
606     my $keys = $tag->{content};
607
608     ##
609     # Iterate through buckets, looking for a key match
610     ##
611     BUCKET:
612     for (my $i = 0; $i < $self->{max_buckets}; $i++) {
613         my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
614
615         if (!$subloc) {
616             ##
617             # Hit end of list, no match
618             ##
619             return;
620         }
621
622         if ( $md5 ne $key ) {
623             next BUCKET;
624         }
625
626         return $self->read_from_loc( $obj, $subloc );
627     } # i loop
628
629     return;
630 }
631
632 sub delete_bucket {
633     ##
634     # Delete single key/value pair given tag and MD5 digested key.
635     ##
636     my $self = shift;
637     my ($obj, $tag, $md5) = @_;
638     my $keys = $tag->{content};
639
640     my $fh = $obj->_fh;
641
642     ##
643     # Iterate through buckets, looking for a key match
644     ##
645     BUCKET:
646     for (my $i=0; $i<$self->{max_buckets}; $i++) {
647         my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
648
649         if (!$subloc) {
650             ##
651             # Hit end of list, no match
652             ##
653             return;
654         }
655
656         if ( $md5 ne $key ) {
657             next BUCKET;
658         }
659
660         ##
661         # Matched key -- delete bucket and return
662         ##
663         seek($fh, $tag->{offset} + ($i * $self->{bucket_size}) + $obj->_root->{file_offset}, SEEK_SET);
664         print( $fh substr($keys, ($i+1) * $self->{bucket_size} ) );
665         print( $fh chr(0) x $self->{bucket_size} );
666
667         return 1;
668     } # i loop
669
670     return;
671 }
672
673 sub bucket_exists {
674     ##
675     # Check existence of single key given tag and MD5 digested key.
676     ##
677     my $self = shift;
678     my ($obj, $tag, $md5) = @_;
679     my $keys = $tag->{content};
680
681     ##
682     # Iterate through buckets, looking for a key match
683     ##
684     BUCKET:
685     for (my $i=0; $i<$self->{max_buckets}; $i++) {
686         my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
687
688         if (!$subloc) {
689             ##
690             # Hit end of list, no match
691             ##
692             return;
693         }
694
695         if ( $md5 ne $key ) {
696             next BUCKET;
697         }
698
699         ##
700         # Matched key -- return true
701         ##
702         return 1;
703     } # i loop
704
705     return;
706 }
707
708 sub find_bucket_list {
709     ##
710     # Locate offset for bucket list, given digested key
711     ##
712     my $self = shift;
713     my ($obj, $md5, $args) = @_;
714     $args = {} unless $args;
715
716     ##
717     # Locate offset for bucket list using digest index system
718     ##
719     my $tag = $self->load_tag($obj, $obj->_base_offset)
720         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
721
722     my $ch = 0;
723     while ($tag->{signature} ne SIG_BLIST) {
724         my $num = ord substr($md5, $ch, 1);
725
726         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
727         $tag = $self->index_lookup( $obj, $tag, $num );
728
729         if (!$tag) {
730             if ( $args->{create} ) {
731                 my $fh = $obj->_fh;
732                 seek($fh, $ref_loc + $obj->_root->{file_offset}, SEEK_SET);
733                 print( $fh pack($self->{long_pack}, $obj->_root->{end}) );
734
735                 $tag = $self->create_tag(
736                     $obj, $obj->_root->{end},
737                     SIG_BLIST,
738                     chr(0) x $self->{bucket_list_size},
739                 );
740
741                 $tag->{ref_loc} = $ref_loc;
742                 $tag->{ch} = $ch;
743
744                 last;
745             }
746             else {
747                 return;
748             }
749         }
750
751         $tag->{ch} = $ch;
752         $tag->{ref_loc} = $ref_loc;
753
754         $ch++;
755     }
756
757     return $tag;
758 }
759
760 sub index_lookup {
761     ##
762     # Given index tag, lookup single entry in index and return .
763     ##
764     my $self = shift;
765     my ($obj, $tag, $index) = @_;
766
767     my $location = unpack(
768         $self->{long_pack},
769         substr(
770             $tag->{content},
771             $index * $self->{long_size},
772             $self->{long_size},
773         ),
774     );
775
776     if (!$location) { return; }
777
778     return $self->load_tag( $obj, $location );
779 }
780
781 sub traverse_index {
782     ##
783     # Scan index and recursively step into deeper levels, looking for next key.
784     ##
785     my $self = shift;
786     my ($obj, $offset, $ch, $force_return_next) = @_;
787
788     my $tag = $self->load_tag($obj, $offset );
789
790     my $fh = $obj->_fh;
791
792     if ($tag->{signature} ne SIG_BLIST) {
793         my $content = $tag->{content};
794         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
795
796         for (my $index = $start; $index < 256; $index++) {
797             my $subloc = unpack(
798                 $self->{long_pack},
799                 substr($content, $index * $self->{long_size}, $self->{long_size}),
800             );
801
802             if ($subloc) {
803                 my $result = $self->traverse_index(
804                     $obj, $subloc, $ch + 1, $force_return_next,
805                 );
806
807                 if (defined($result)) { return $result; }
808             }
809         } # index loop
810
811         $obj->{return_next} = 1;
812     } # tag is an index
813
814     else {
815         my $keys = $tag->{content};
816         if ($force_return_next) { $obj->{return_next} = 1; }
817
818         ##
819         # Iterate through buckets, looking for a key match
820         ##
821         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
822             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
823
824             # End of bucket list -- return to outer loop
825             if (!$subloc) {
826                 $obj->{return_next} = 1;
827                 last;
828             }
829             # Located previous key -- return next one found
830             elsif ($key eq $obj->{prev_md5}) {
831                 $obj->{return_next} = 1;
832                 next;
833             }
834             # Seek to bucket location and skip over signature
835             elsif ($obj->{return_next}) {
836                 seek($fh, $subloc + $obj->_root->{file_offset}, SEEK_SET);
837
838                 # Skip over value to get to plain key
839                 my $sig;
840                 read( $fh, $sig, SIG_SIZE );
841
842                 my $size;
843                 read( $fh, $size, $self->{data_size});
844                 $size = unpack($self->{data_pack}, $size);
845                 if ($size) { seek($fh, $size, SEEK_CUR); }
846
847                 # Read in plain key and return as scalar
848                 my $plain_key;
849                 read( $fh, $size, $self->{data_size});
850                 $size = unpack($self->{data_pack}, $size);
851                 if ($size) { read( $fh, $plain_key, $size); }
852
853                 return $plain_key;
854             }
855         }
856
857         $obj->{return_next} = 1;
858     } # tag is a bucket list
859
860     return;
861 }
862
863 sub get_next_key {
864     ##
865     # Locate next key, given digested previous one
866     ##
867     my $self = shift;
868     my ($obj) = @_;
869
870     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
871     $obj->{return_next} = 0;
872
873     ##
874     # If the previous key was not specifed, start at the top and
875     # return the first one found.
876     ##
877     if (!$obj->{prev_md5}) {
878         $obj->{prev_md5} = chr(0) x $self->{hash_size};
879         $obj->{return_next} = 1;
880     }
881
882     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
883 }
884
885 # Utilities
886
887 sub _get_key_subloc {
888     my $self = shift;
889     my ($keys, $idx) = @_;
890
891     my ($key, $subloc) = unpack(
892         "a$self->{hash_size} $self->{long_pack}",
893         substr(
894             $keys,
895             ($idx * $self->{bucket_size}),
896             $self->{bucket_size},
897         ),
898     );
899
900     return ($key, $subloc);
901 }
902
903 sub _find_in_buckets {
904     my $self = shift;
905     my ($tag, $md5) = @_;
906
907     BUCKET:
908     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
909         my ($key, $subloc) = $self->_get_key_subloc( $tag->{content}, $i );
910
911         return ($subloc, $i * $self->{bucket_size}) unless $subloc;
912
913         next BUCKET if $key ne $md5;
914
915         return ($subloc, $i * $self->{bucket_size});
916     }
917
918     return;
919 }
920
921 1;
922 __END__