Moved almost all direct accesses to into ::File
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock :seek );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * To add to bucket_size, make sure you modify the following:
13 #   - calculate_sizes()
14 #   - _get_key_subloc()
15 #   - add_bucket() - where the buckets are printed
16
17 ##
18 # Setup file and tag signatures.  These should never change.
19 ##
20 sub SIG_FILE     () { 'DPDB' }
21 sub SIG_HEADER   () { 'h'    }
22 sub SIG_INTERNAL () { 'i'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 sub new {
33     my $class = shift;
34     my ($args) = @_;
35
36     my $self = bless {
37         long_size   => 4,
38         long_pack   => 'N',
39         data_size   => 4,
40         data_pack   => 'N',
41
42         digest      => \&Digest::MD5::md5,
43         hash_size   => 16,
44
45         ##
46         # Maximum number of buckets per list before another level of indexing is
47         # done. Increase this value for slightly greater speed, but larger database
48         # files. DO NOT decrease this value below 16, due to risk of recursive
49         # reindex overrun.
50         ##
51         max_buckets => 16,
52
53         fileobj => undef,
54         obj     => undef,
55     }, $class;
56
57     if ( defined $args->{pack_size} ) {
58         if ( lc $args->{pack_size} eq 'small' ) {
59             $args->{long_size} = 2;
60             $args->{long_pack} = 'n';
61         }
62         elsif ( lc $args->{pack_size} eq 'medium' ) {
63             $args->{long_size} = 4;
64             $args->{long_pack} = 'N';
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{long_size} = 8;
68             $args->{long_pack} = 'Q';
69         }
70         else {
71             die "Unknown pack_size value: '$args->{pack_size}'\n";
72         }
73     }
74
75     # Grab the parameters we want to use
76     foreach my $param ( keys %$self ) {
77         next unless exists $args->{$param};
78         $self->{$param} = $args->{$param};
79     }
80     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
81
82     if ( $self->{max_buckets} < 16 ) {
83         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
84         $self->{max_buckets} = 16;
85     }
86
87     return $self;
88 }
89
90 sub _fileobj { return $_[0]{fileobj} }
91
92 sub calculate_sizes {
93     my $self = shift;
94
95     #XXX Does this need to be updated with different hashing algorithms?
96     $self->{index_size}       = (2**8) * $self->{long_size};
97     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
98     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
99
100     return;
101 }
102
103 sub write_file_header {
104     my $self = shift;
105
106     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
107
108     $self->_fileobj->print_at( $loc,
109         SIG_FILE,
110         SIG_HEADER,
111         pack('N', 1),  # header version
112         pack('N', 12), # header size
113         pack('N', 0),  # currently running transaction IDs
114         pack('n', $self->{long_size}),
115         pack('A', $self->{long_pack}),
116         pack('n', $self->{data_size}),
117         pack('A', $self->{data_pack}),
118         pack('n', $self->{max_buckets}),
119     );
120
121     $self->_fileobj->set_transaction_offset( 13 );
122
123     return;
124 }
125
126 sub read_file_header {
127     my $self = shift;
128
129     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
130     return unless length($buffer);
131
132     my ($file_signature, $sig_header, $header_version, $size) = unpack(
133         'A4 A N N', $buffer
134     );
135
136     unless ( $file_signature eq SIG_FILE ) {
137         $self->_fileobj->close;
138         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
139     }
140
141     unless ( $sig_header eq SIG_HEADER ) {
142         $self->_fileobj->close;
143         $self->_throw_error( "Old file version found." );
144     }
145
146     my $buffer2 = $self->_fileobj->read_at( undef, $size );
147     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
148
149     $self->_fileobj->set_transaction_offset( 13 );
150
151     if ( @values < 5 || grep { !defined } @values ) {
152         $self->_fileobj->close;
153         $self->_throw_error("Corrupted file - bad header");
154     }
155
156     #XXX Add warnings if values weren't set right
157     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
158
159     return length($buffer) + length($buffer2);
160 }
161
162 sub setup_fh {
163     my $self = shift;
164     my ($obj) = @_;
165
166     # Need to remove use of $fh here
167     my $fh = $self->_fileobj->{fh};
168     flock $fh, LOCK_EX;
169
170     #XXX The duplication of calculate_sizes needs to go away
171     unless ( $obj->{base_offset} ) {
172         my $bytes_read = $self->read_file_header;
173
174         $self->calculate_sizes;
175
176         ##
177         # File is empty -- write header and master index
178         ##
179         if (!$bytes_read) {
180             $self->_fileobj->audit( "# Database created on" );
181
182             $self->write_file_header;
183
184             $obj->{base_offset} = $self->_fileobj->request_space( $self->tag_size( $self->{index_size} ) );
185
186             $self->write_tag(
187                 $obj->_base_offset, $obj->_type,
188                 chr(0)x$self->{index_size},
189             );
190
191             # Flush the filehandle
192             my $old_fh = select $fh;
193             my $old_af = $|; $| = 1; $| = $old_af;
194             select $old_fh;
195         }
196         else {
197             $obj->{base_offset} = $bytes_read;
198
199             ##
200             # Get our type from master index header
201             ##
202             my $tag = $self->load_tag($obj->_base_offset);
203             unless ( $tag ) {
204                 flock $fh, LOCK_UN;
205                 $self->_throw_error("Corrupted file, no master index record");
206             }
207
208             unless ($obj->_type eq $tag->{signature}) {
209                 flock $fh, LOCK_UN;
210                 $self->_throw_error("File type mismatch");
211             }
212         }
213     }
214     else {
215         $self->calculate_sizes;
216     }
217
218     #XXX We have to make sure we don't mess up when autoflush isn't turned on
219     $self->_fileobj->set_inode;
220
221     flock $fh, LOCK_UN;
222
223     return 1;
224 }
225
226 sub tag_size {
227     my $self = shift;
228     my ($size) = @_;
229     return SIG_SIZE + $self->{data_size} + $size;
230 }
231
232 sub write_tag {
233     ##
234     # Given offset, signature and content, create tag and write to disk
235     ##
236     my $self = shift;
237     my ($offset, $sig, $content) = @_;
238     my $size = length( $content );
239
240     $self->_fileobj->print_at(
241         $offset, 
242         $sig, pack($self->{data_pack}, $size), $content,
243     );
244
245     return unless defined $offset;
246
247     return {
248         signature => $sig,
249         size => $size,
250         offset => $offset + SIG_SIZE + $self->{data_size},
251         content => $content
252     };
253 }
254
255 sub load_tag {
256     ##
257     # Given offset, load single tag and return signature, size and data
258     ##
259     my $self = shift;
260     my ($offset) = @_;
261
262     my $fileobj = $self->_fileobj;
263
264     my $s = SIG_SIZE + $self->{data_size};
265     my $b = $fileobj->read_at( $offset, $s );
266     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
267
268     my $buffer = $fileobj->read_at( undef, $size );
269
270     return {
271         signature => $sig,
272         size => $size,
273         offset => $offset + SIG_SIZE + $self->{data_size},
274         content => $buffer
275     };
276 }
277
278 sub _get_dbm_object {
279     my $item = shift;
280
281     my $obj = eval {
282         local $SIG{__DIE__};
283         if ($item->isa( 'DBM::Deep' )) {
284             return $item;
285         }
286         return;
287     };
288     return $obj if $obj;
289
290     my $r = Scalar::Util::reftype( $item ) || '';
291     if ( $r eq 'HASH' ) {
292         my $obj = eval {
293             local $SIG{__DIE__};
294             my $obj = tied(%$item);
295             if ($obj->isa( 'DBM::Deep' )) {
296                 return $obj;
297             }
298             return;
299         };
300         return $obj if $obj;
301     }
302     elsif ( $r eq 'ARRAY' ) {
303         my $obj = eval {
304             local $SIG{__DIE__};
305             my $obj = tied(@$item);
306             if ($obj->isa( 'DBM::Deep' )) {
307                 return $obj;
308             }
309             return;
310         };
311         return $obj if $obj;
312     }
313
314     return;
315 }
316
317 sub _length_needed {
318     my $self = shift;
319     my ($value, $key) = @_;
320
321     my $is_dbm_deep = eval {
322         local $SIG{'__DIE__'};
323         $value->isa( 'DBM::Deep' );
324     };
325
326     my $len = SIG_SIZE + $self->{data_size}
327             + $self->{data_size} + length( $key );
328
329     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
330         return $len + $self->{long_size};
331     }
332
333     my $r = Scalar::Util::reftype( $value ) || '';
334     if ( $self->_fileobj->{autobless} ) {
335         # This is for the bit saying whether or not this thing is blessed.
336         $len += 1;
337     }
338
339     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
340         if ( defined $value ) {
341             $len += length( $value );
342         }
343         return $len;
344     }
345
346     $len += $self->{index_size};
347
348     # if autobless is enabled, must also take into consideration
349     # the class name as it is stored after the key.
350     if ( $self->_fileobj->{autobless} ) {
351         my $c = Scalar::Util::blessed($value);
352         if ( defined $c && !$is_dbm_deep ) {
353             $len += $self->{data_size} + length($c);
354         }
355     }
356
357     return $len;
358 }
359
360 sub add_bucket {
361     ##
362     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
363     # plain (undigested) key and value.
364     ##
365     my $self = shift;
366     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
367     $deleted ||= 0;
368
369     local($/,$\);
370
371     # This verifies that only supported values will be stored.
372     {
373         my $r = Scalar::Util::reftype( $value );
374         last if !defined $r;
375
376         last if $r eq 'HASH';
377         last if $r eq 'ARRAY';
378
379         $self->_throw_error(
380             "Storage of variables of type '$r' is not supported."
381         );
382     }
383
384     my $location = 0;
385     my $result = 2;
386
387     my $fileobj = $self->_fileobj;
388
389     my $actual_length = $self->_length_needed( $value, $plain_key );
390
391     #ACID - This is a mutation. Must only find the exact transaction
392     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
393
394     my @transactions;
395     if ( $fileobj->transaction_id == 0 ) {
396         @transactions = $fileobj->current_transactions;
397     }
398
399 #    $self->_release_space( $size, $subloc );
400     # Updating a known md5
401 #XXX This needs updating to use _release_space
402     if ( $subloc ) {
403         $result = 1;
404
405         if ($actual_length <= $size) {
406             $location = $subloc;
407         }
408         else {
409             $location = $fileobj->request_space( $actual_length );
410
411             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
412                 pack($self->{long_pack}, $location ),
413                 pack($self->{long_pack}, $actual_length ),
414                 pack('n n', $fileobj->transaction_id, $deleted ),
415             );
416         }
417     }
418     # Adding a new md5
419     elsif ( defined $offset ) {
420         $location = $fileobj->request_space( $actual_length );
421
422         $fileobj->print_at( $tag->{offset} + $offset,
423             $md5,
424             pack($self->{long_pack}, $location ),
425             pack($self->{long_pack}, $actual_length ),
426             pack('n n', $fileobj->transaction_id, $deleted ),
427         );
428
429         for ( @transactions ) {
430             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
431             $fileobj->{transaction_id} = $_;
432             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
433             $fileobj->{transaction_id} = 0;
434         }
435     }
436     # If bucket didn't fit into list, split into a new index level
437     # split_index() will do the _fileobj->request_space() call
438     else {
439         $location = $self->split_index( $md5, $tag );
440     }
441
442     $self->write_value( $location, $plain_key, $value, $orig_key );
443
444     return $result;
445 }
446
447 sub write_value {
448     my $self = shift;
449     my ($location, $key, $value, $orig_key) = @_;
450
451     my $fileobj = $self->_fileobj;
452
453     my $dbm_deep_obj = _get_dbm_object( $value );
454     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
455         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
456     }
457
458     ##
459     # Write signature based on content type, set content length and write
460     # actual value.
461     ##
462     my $r = Scalar::Util::reftype( $value ) || '';
463     if ( $dbm_deep_obj ) {
464         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
465     }
466     elsif ($r eq 'HASH') {
467         if ( !$dbm_deep_obj && tied %{$value} ) {
468             $self->_throw_error( "Cannot store something that is tied" );
469         }
470         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
471     }
472     elsif ($r eq 'ARRAY') {
473         if ( !$dbm_deep_obj && tied @{$value} ) {
474             $self->_throw_error( "Cannot store something that is tied" );
475         }
476         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
477     }
478     elsif (!defined($value)) {
479         $self->write_tag( $location, SIG_NULL, '' );
480     }
481     else {
482         $self->write_tag( $location, SIG_DATA, $value );
483     }
484
485     ##
486     # Plain key is stored AFTER value, as keys are typically fetched less often.
487     ##
488     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
489
490     # Internal references don't care about autobless
491     return 1 if $dbm_deep_obj;
492
493     ##
494     # If value is blessed, preserve class name
495     ##
496     if ( $fileobj->{autobless} ) {
497         my $c = Scalar::Util::blessed($value);
498         if ( defined $c && !$dbm_deep_obj ) {
499             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
500         }
501         else {
502             $fileobj->print_at( undef, chr(0) );
503         }
504     }
505
506     ##
507     # Tie the passed in reference so that changes to it are reflected in the
508     # datafile. The use of $location as the base_offset will act as the
509     # the linkage between parent and child.
510     #
511     # The overall assignment is a hack around the fact that just tying doesn't
512     # store the values. This may not be the wrong thing to do.
513     ##
514     if ($r eq 'HASH') {
515         my %x = %$value;
516         tie %$value, 'DBM::Deep', {
517             base_offset => $location,
518             fileobj     => $fileobj,
519             parent      => $self->{obj},
520             parent_key  => $orig_key,
521         };
522         %$value = %x;
523     }
524     elsif ($r eq 'ARRAY') {
525         my @x = @$value;
526         tie @$value, 'DBM::Deep', {
527             base_offset => $location,
528             fileobj     => $fileobj,
529             parent      => $self->{obj},
530             parent_key  => $orig_key,
531         };
532         @$value = @x;
533     }
534
535     return 1;
536 }
537
538 sub split_index {
539     my $self = shift;
540     my ($md5, $tag) = @_;
541
542     my $fileobj = $self->_fileobj;
543
544     my $loc = $fileobj->request_space(
545         $self->tag_size( $self->{index_size} ),
546     );
547
548     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
549
550     my $index_tag = $self->write_tag(
551         $loc, SIG_INDEX,
552         chr(0)x$self->{index_size},
553     );
554
555     my $newtag_loc = $fileobj->request_space(
556         $self->tag_size( $self->{bucket_list_size} ),
557     );
558
559     my $keys = $tag->{content}
560              . $md5 . pack($self->{long_pack}, $newtag_loc)
561                     . pack($self->{long_pack}, 0)  # size
562                     . pack($self->{long_pack}, 0); # transaction ID
563
564     my @newloc = ();
565     BUCKET:
566     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
567         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
568
569         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
570         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
571
572         my $num = ord(substr($key, $tag->{ch} + 1, 1));
573
574         if ($newloc[$num]) {
575             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
576
577             # This is looking for the first empty spot
578             my ($subloc, $offset, $size) = $self->_find_in_buckets(
579                 { content => $subkeys }, '',
580             );
581
582             $fileobj->print_at( $newloc[$num] + $offset, $key . pack($self->{long_pack}, $old_subloc) );
583
584             next;
585         }
586
587         my $loc = $fileobj->request_space(
588             $self->tag_size( $self->{bucket_list_size} ),
589         );
590
591         $fileobj->print_at(
592             $index_tag->{offset} + ($num * $self->{long_size}),
593             pack($self->{long_pack}, $loc),
594         );
595
596         my $blist_tag = $self->write_tag(
597             $loc, SIG_BLIST,
598             chr(0)x$self->{bucket_list_size},
599         );
600
601         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
602
603         $newloc[$num] = $blist_tag->{offset};
604     }
605
606     $self->_release_space(
607         $self->tag_size( $self->{bucket_list_size} ),
608         $tag->{offset} - SIG_SIZE - $self->{data_size},
609     );
610
611     return $newtag_loc;
612 }
613
614 sub read_from_loc {
615     my $self = shift;
616     my ($subloc, $orig_key) = @_;
617
618     my $fileobj = $self->_fileobj;
619
620     ##
621     # Found match -- seek to offset and read signature
622     ##
623     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
624
625     ##
626     # If value is a hash or array, return new DBM::Deep object with correct offset
627     ##
628     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
629         my $new_obj = DBM::Deep->new({
630             type        => $signature,
631             base_offset => $subloc,
632             fileobj     => $self->_fileobj,
633             parent      => $self->{obj},
634             parent_key  => $orig_key,
635         });
636
637         if ($new_obj->_fileobj->{autobless}) {
638             ##
639             # Skip over value and plain key to see if object needs
640             # to be re-blessed
641             ##
642             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
643
644             my $size = $fileobj->read_at( undef, $self->{data_size} );
645             $size = unpack($self->{data_pack}, $size);
646             if ($size) { $fileobj->increment_pointer( $size ); }
647
648             my $bless_bit = $fileobj->read_at( undef, 1 );
649             if (ord($bless_bit)) {
650                 ##
651                 # Yes, object needs to be re-blessed
652                 ##
653                 my $size = $fileobj->read_at( undef, $self->{data_size} );
654                 $size = unpack($self->{data_pack}, $size);
655
656                 my $class_name;
657                 if ($size) { $class_name = $fileobj->read_at( undef, $size ); }
658                 if (defined $class_name) { $new_obj = bless( $new_obj, $class_name ); }
659             }
660         }
661
662         return $new_obj;
663     }
664     elsif ( $signature eq SIG_INTERNAL ) {
665         my $size = $fileobj->read_at( undef, $self->{data_size} );
666         $size = unpack($self->{data_pack}, $size);
667
668         if ( $size ) {
669             my $new_loc = $fileobj->read_at( undef, $size );
670             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
671             return $self->read_from_loc( $new_loc, $orig_key );
672         }
673         else {
674             return;
675         }
676     }
677     ##
678     # Otherwise return actual value
679     ##
680     elsif ( $signature eq SIG_DATA ) {
681         my $size = $fileobj->read_at( undef, $self->{data_size} );
682         $size = unpack($self->{data_pack}, $size);
683
684         my $value = '';
685         if ($size) { $value = $fileobj->read_at( undef, $size ); }
686         return $value;
687     }
688
689     ##
690     # Key exists, but content is null
691     ##
692     return;
693 }
694
695 sub get_bucket_value {
696     ##
697     # Fetch single value given tag and MD5 digested key.
698     ##
699     my $self = shift;
700     my ($tag, $md5, $orig_key) = @_;
701
702     #ACID - This is a read. Can find exact or HEAD
703     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
704     if ( $subloc && !$is_deleted ) {
705         return $self->read_from_loc( $subloc, $orig_key );
706     }
707     return;
708 }
709
710 sub delete_bucket {
711     ##
712     # Delete single key/value pair given tag and MD5 digested key.
713     ##
714     my $self = shift;
715     my ($tag, $md5, $orig_key) = @_;
716
717     #ACID - This is a mutation. Must only find the exact transaction
718     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
719 #XXX This needs _release_space() for the value and anything below
720     if ( $subloc ) {
721         $self->_fileobj->print_at(
722             $tag->{offset} + $offset,
723             substr($tag->{content}, $offset + $self->{bucket_size} ),
724             chr(0) x $self->{bucket_size},
725         );
726
727         return 1;
728     }
729     return;
730 }
731
732 sub bucket_exists {
733     ##
734     # Check existence of single key given tag and MD5 digested key.
735     ##
736     my $self = shift;
737     my ($tag, $md5) = @_;
738
739     #ACID - This is a read. Can find exact or HEAD
740     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
741     return ($subloc && !$is_deleted) && 1;
742 }
743
744 sub find_bucket_list {
745     ##
746     # Locate offset for bucket list, given digested key
747     ##
748     my $self = shift;
749     my ($offset, $md5, $args) = @_;
750     $args = {} unless $args;
751
752     local($/,$\);
753
754     ##
755     # Locate offset for bucket list using digest index system
756     ##
757     my $tag = $self->load_tag( $offset )
758         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
759
760     my $ch = 0;
761     while ($tag->{signature} ne SIG_BLIST) {
762         my $num = ord substr($md5, $ch, 1);
763
764         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
765         $tag = $self->index_lookup( $tag, $num );
766
767         if (!$tag) {
768             return if !$args->{create};
769
770             my $loc = $self->_fileobj->request_space(
771                 $self->tag_size( $self->{bucket_list_size} ),
772             );
773
774             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
775
776             $tag = $self->write_tag(
777                 $loc, SIG_BLIST,
778                 chr(0)x$self->{bucket_list_size},
779             );
780
781             $tag->{ref_loc} = $ref_loc;
782             $tag->{ch} = $ch;
783
784             last;
785         }
786
787         $tag->{ch} = $ch++;
788         $tag->{ref_loc} = $ref_loc;
789     }
790
791     return $tag;
792 }
793
794 sub index_lookup {
795     ##
796     # Given index tag, lookup single entry in index and return .
797     ##
798     my $self = shift;
799     my ($tag, $index) = @_;
800
801     my $location = unpack(
802         $self->{long_pack},
803         substr(
804             $tag->{content},
805             $index * $self->{long_size},
806             $self->{long_size},
807         ),
808     );
809
810     if (!$location) { return; }
811
812     return $self->load_tag( $location );
813 }
814
815 sub traverse_index {
816     ##
817     # Scan index and recursively step into deeper levels, looking for next key.
818     ##
819     my $self = shift;
820     my ($obj, $offset, $ch, $force_return_next) = @_;
821
822     my $tag = $self->load_tag( $offset );
823
824     if ($tag->{signature} ne SIG_BLIST) {
825         my $content = $tag->{content};
826         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
827
828         for (my $idx = $start; $idx < (2**8); $idx++) {
829             my $subloc = unpack(
830                 $self->{long_pack},
831                 substr(
832                     $content,
833                     $idx * $self->{long_size},
834                     $self->{long_size},
835                 ),
836             );
837
838             if ($subloc) {
839                 my $result = $self->traverse_index(
840                     $obj, $subloc, $ch + 1, $force_return_next,
841                 );
842
843                 if (defined($result)) { return $result; }
844             }
845         } # index loop
846
847         $obj->{return_next} = 1;
848     } # tag is an index
849
850     else {
851         my $keys = $tag->{content};
852         if ($force_return_next) { $obj->{return_next} = 1; }
853
854         ##
855         # Iterate through buckets, looking for a key match
856         ##
857         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
858             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
859
860             # End of bucket list -- return to outer loop
861             if (!$subloc) {
862                 $obj->{return_next} = 1;
863                 last;
864             }
865             # Located previous key -- return next one found
866             elsif ($key eq $obj->{prev_md5}) {
867                 $obj->{return_next} = 1;
868                 next;
869             }
870             # Seek to bucket location and skip over signature
871             elsif ($obj->{return_next}) {
872                 my $fileobj = $self->_fileobj;
873
874                 # Skip over value to get to plain key
875                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
876
877                 my $size = $fileobj->read_at( undef, $self->{data_size} );
878                 $size = unpack($self->{data_pack}, $size);
879                 if ($size) { $fileobj->increment_pointer( $size ); }
880
881                 # Read in plain key and return as scalar
882                 $size = $fileobj->read_at( undef, $self->{data_size} );
883                 $size = unpack($self->{data_pack}, $size);
884                 my $plain_key;
885                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
886
887                 return $plain_key;
888             }
889         }
890
891         $obj->{return_next} = 1;
892     } # tag is a bucket list
893
894     return;
895 }
896
897 sub get_next_key {
898     ##
899     # Locate next key, given digested previous one
900     ##
901     my $self = shift;
902     my ($obj) = @_;
903
904     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
905     $obj->{return_next} = 0;
906
907     ##
908     # If the previous key was not specifed, start at the top and
909     # return the first one found.
910     ##
911     if (!$obj->{prev_md5}) {
912         $obj->{prev_md5} = chr(0) x $self->{hash_size};
913         $obj->{return_next} = 1;
914     }
915
916     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
917 }
918
919 # Utilities
920
921 sub _get_key_subloc {
922     my $self = shift;
923     my ($keys, $idx) = @_;
924
925     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
926         # This is 'a', not 'A'. Please read the pack() documentation for the
927         # difference between the two and why it's important.
928         "a$self->{hash_size} $self->{long_pack}2 n2",
929         substr(
930             $keys,
931             ($idx * $self->{bucket_size}),
932             $self->{bucket_size},
933         ),
934     );
935
936     return ($key, $subloc, $size, $transaction_id, $is_deleted);
937 }
938
939 sub _find_in_buckets {
940     my $self = shift;
941     my ($tag, $md5, $exact) = @_;
942
943     my $trans_id = $self->_fileobj->transaction_id;
944
945     my @zero;
946
947     BUCKET:
948     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
949         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
950             $tag->{content}, $i,
951         );
952
953         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
954
955         unless ( $subloc ) {
956             if ( !$exact && @zero and $trans_id ) {
957                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
958             }
959             return @rv;
960         }
961
962         next BUCKET if $key ne $md5;
963
964         # Save off the HEAD in case we need it.
965         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
966
967         next BUCKET if $transaction_id != $trans_id;
968
969         return @rv;
970     }
971
972     return;
973 }
974
975 sub _release_space {
976     my $self = shift;
977     my ($size, $loc) = @_;
978
979     my $next_loc = 0;
980
981     $self->_fileobj->print_at( $loc,
982         SIG_FREE, 
983         pack($self->{long_pack}, $size ),
984         pack($self->{long_pack}, $next_loc ),
985     );
986
987     return;
988 }
989
990 sub _throw_error {
991     die "DBM::Deep: $_[1]\n";
992 }
993
994 1;
995 __END__