Added faililng tests for autovivification and clear() within transactions
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * To add to bucket_size, make sure you modify the following:
13 #   - calculate_sizes()
14 #   - _get_key_subloc()
15 #   - add_bucket() - where the buckets are printed
16
17 ##
18 # Setup file and tag signatures.  These should never change.
19 ##
20 sub SIG_FILE     () { 'DPDB' }
21 sub SIG_HEADER   () { 'h'    }
22 sub SIG_INTERNAL () { 'i'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 sub new {
33     my $class = shift;
34     my ($args) = @_;
35
36     my $self = bless {
37         long_size   => 4,
38         long_pack   => 'N',
39         data_size   => 4,
40         data_pack   => 'N',
41
42         digest      => \&Digest::MD5::md5,
43         hash_size   => 16,
44
45         ##
46         # Maximum number of buckets per list before another level of indexing is
47         # done. Increase this value for slightly greater speed, but larger database
48         # files. DO NOT decrease this value below 16, due to risk of recursive
49         # reindex overrun.
50         ##
51         max_buckets => 16,
52
53         fileobj => undef,
54         obj     => undef,
55     }, $class;
56
57     if ( defined $args->{pack_size} ) {
58         if ( lc $args->{pack_size} eq 'small' ) {
59             $args->{long_size} = 2;
60             $args->{long_pack} = 'n';
61         }
62         elsif ( lc $args->{pack_size} eq 'medium' ) {
63             $args->{long_size} = 4;
64             $args->{long_pack} = 'N';
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{long_size} = 8;
68             $args->{long_pack} = 'Q';
69         }
70         else {
71             die "Unknown pack_size value: '$args->{pack_size}'\n";
72         }
73     }
74
75     # Grab the parameters we want to use
76     foreach my $param ( keys %$self ) {
77         next unless exists $args->{$param};
78         $self->{$param} = $args->{$param};
79     }
80     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
81
82     if ( $self->{max_buckets} < 16 ) {
83         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
84         $self->{max_buckets} = 16;
85     }
86
87     return $self;
88 }
89
90 sub _fileobj { return $_[0]{fileobj} }
91
92 sub calculate_sizes {
93     my $self = shift;
94
95     # The 2**8 here indicates the number of different characters in the
96     # current hashing algorithm
97     #XXX Does this need to be updated with different hashing algorithms?
98     $self->{index_size}       = (2**8) * $self->{long_size};
99     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
100     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
101
102     return;
103 }
104
105 sub write_file_header {
106     my $self = shift;
107
108     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
109
110     $self->_fileobj->print_at( $loc,
111         SIG_FILE,
112         SIG_HEADER,
113         pack('N', 1),  # header version
114         pack('N', 12), # header size
115         pack('N', 0),  # currently running transaction IDs
116         pack('n', $self->{long_size}),
117         pack('A', $self->{long_pack}),
118         pack('n', $self->{data_size}),
119         pack('A', $self->{data_pack}),
120         pack('n', $self->{max_buckets}),
121     );
122
123     $self->_fileobj->set_transaction_offset( 13 );
124
125     return;
126 }
127
128 sub read_file_header {
129     my $self = shift;
130
131     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
132     return unless length($buffer);
133
134     my ($file_signature, $sig_header, $header_version, $size) = unpack(
135         'A4 A N N', $buffer
136     );
137
138     unless ( $file_signature eq SIG_FILE ) {
139         $self->_fileobj->close;
140         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
141     }
142
143     unless ( $sig_header eq SIG_HEADER ) {
144         $self->_fileobj->close;
145         $self->_throw_error( "Old file version found." );
146     }
147
148     my $buffer2 = $self->_fileobj->read_at( undef, $size );
149     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
150
151     $self->_fileobj->set_transaction_offset( 13 );
152
153     if ( @values < 5 || grep { !defined } @values ) {
154         $self->_fileobj->close;
155         $self->_throw_error("Corrupted file - bad header");
156     }
157
158     #XXX Add warnings if values weren't set right
159     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
160
161     return length($buffer) + length($buffer2);
162 }
163
164 sub setup_fh {
165     my $self = shift;
166     my ($obj) = @_;
167
168     # Need to remove use of $fh here
169     my $fh = $self->_fileobj->{fh};
170     flock $fh, LOCK_EX;
171
172     #XXX The duplication of calculate_sizes needs to go away
173     unless ( $obj->{base_offset} ) {
174         my $bytes_read = $self->read_file_header;
175
176         $self->calculate_sizes;
177
178         ##
179         # File is empty -- write header and master index
180         ##
181         if (!$bytes_read) {
182             $self->_fileobj->audit( "# Database created on" );
183
184             $self->write_file_header;
185
186             $obj->{base_offset} = $self->_fileobj->request_space(
187                 $self->tag_size( $self->{index_size} ),
188             );
189
190             $self->write_tag(
191                 $obj->_base_offset, $obj->_type,
192                 chr(0)x$self->{index_size},
193             );
194
195             # Flush the filehandle
196             my $old_fh = select $fh;
197             my $old_af = $|; $| = 1; $| = $old_af;
198             select $old_fh;
199         }
200         else {
201             $obj->{base_offset} = $bytes_read;
202
203             ##
204             # Get our type from master index header
205             ##
206             my $tag = $self->load_tag($obj->_base_offset);
207             unless ( $tag ) {
208                 flock $fh, LOCK_UN;
209                 $self->_throw_error("Corrupted file, no master index record");
210             }
211
212             unless ($obj->_type eq $tag->{signature}) {
213                 flock $fh, LOCK_UN;
214                 $self->_throw_error("File type mismatch");
215             }
216         }
217     }
218     else {
219         $self->calculate_sizes;
220     }
221
222     #XXX We have to make sure we don't mess up when autoflush isn't turned on
223     $self->_fileobj->set_inode;
224
225     flock $fh, LOCK_UN;
226
227     return 1;
228 }
229
230 sub tag_size {
231     my $self = shift;
232     my ($size) = @_;
233     return SIG_SIZE + $self->{data_size} + $size;
234 }
235
236 sub write_tag {
237     ##
238     # Given offset, signature and content, create tag and write to disk
239     ##
240     my $self = shift;
241     my ($offset, $sig, $content) = @_;
242     my $size = length( $content );
243
244     $self->_fileobj->print_at(
245         $offset, 
246         $sig, pack($self->{data_pack}, $size), $content,
247     );
248
249     return unless defined $offset;
250
251     return {
252         signature => $sig,
253         size => $size,
254         offset => $offset + SIG_SIZE + $self->{data_size},
255         content => $content
256     };
257 }
258
259 sub load_tag {
260     ##
261     # Given offset, load single tag and return signature, size and data
262     ##
263     my $self = shift;
264     my ($offset) = @_;
265
266     my $fileobj = $self->_fileobj;
267
268     my $s = SIG_SIZE + $self->{data_size};
269     my $b = $fileobj->read_at( $offset, $s );
270     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
271
272     my $buffer = $fileobj->read_at( undef, $size );
273
274     return {
275         signature => $sig,
276         size => $size,
277         offset => $offset + SIG_SIZE + $self->{data_size},
278         content => $buffer
279     };
280 }
281
282 sub _get_dbm_object {
283     my $item = shift;
284
285     my $obj = eval {
286         local $SIG{__DIE__};
287         if ($item->isa( 'DBM::Deep' )) {
288             return $item;
289         }
290         return;
291     };
292     return $obj if $obj;
293
294     my $r = Scalar::Util::reftype( $item ) || '';
295     if ( $r eq 'HASH' ) {
296         my $obj = eval {
297             local $SIG{__DIE__};
298             my $obj = tied(%$item);
299             if ($obj->isa( 'DBM::Deep' )) {
300                 return $obj;
301             }
302             return;
303         };
304         return $obj if $obj;
305     }
306     elsif ( $r eq 'ARRAY' ) {
307         my $obj = eval {
308             local $SIG{__DIE__};
309             my $obj = tied(@$item);
310             if ($obj->isa( 'DBM::Deep' )) {
311                 return $obj;
312             }
313             return;
314         };
315         return $obj if $obj;
316     }
317
318     return;
319 }
320
321 sub _length_needed {
322     my $self = shift;
323     my ($value, $key) = @_;
324
325     my $is_dbm_deep = eval {
326         local $SIG{'__DIE__'};
327         $value->isa( 'DBM::Deep' );
328     };
329
330     my $len = SIG_SIZE
331             + $self->{data_size} # size for value
332             + $self->{data_size} # size for key
333             + length( $key );    # length of key
334
335     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
336         # long_size is for the internal reference
337         return $len + $self->{long_size};
338     }
339
340     if ( $self->_fileobj->{autobless} ) {
341         # This is for the bit saying whether or not this thing is blessed.
342         $len += 1;
343     }
344
345     my $r = Scalar::Util::reftype( $value ) || '';
346     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
347         if ( defined $value ) {
348             $len += length( $value );
349         }
350         return $len;
351     }
352
353     $len += $self->{index_size};
354
355     # if autobless is enabled, must also take into consideration
356     # the class name as it is stored after the key.
357     if ( $self->_fileobj->{autobless} ) {
358         my $c = Scalar::Util::blessed($value);
359         if ( defined $c && !$is_dbm_deep ) {
360             $len += $self->{data_size} + length($c);
361         }
362     }
363
364     return $len;
365 }
366
367 sub add_bucket {
368     ##
369     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
370     # plain (undigested) key and value.
371     ##
372     my $self = shift;
373     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
374     $deleted ||= 0;
375
376     local($/,$\);
377
378     # This verifies that only supported values will be stored.
379     {
380         my $r = Scalar::Util::reftype( $value );
381         last if !defined $r;
382
383         last if $r eq 'HASH';
384         last if $r eq 'ARRAY';
385
386         $self->_throw_error(
387             "Storage of variables of type '$r' is not supported."
388         );
389     }
390
391     my $location = 0;
392     my $result = 2;
393
394     my $fileobj = $self->_fileobj;
395
396     my $actual_length = $self->_length_needed( $value, $plain_key );
397
398     #ACID - This is a mutation. Must only find the exact transaction
399     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
400
401     my @transactions;
402     if ( $fileobj->transaction_id == 0 ) {
403         @transactions = $fileobj->current_transactions;
404     }
405
406 #    $self->_release_space( $size, $subloc );
407     # Updating a known md5
408 #XXX This needs updating to use _release_space
409     if ( $subloc ) {
410         $result = 1;
411
412         if ($actual_length <= $size) {
413             $location = $subloc;
414         }
415         else {
416             $location = $fileobj->request_space( $actual_length );
417
418             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
419                 pack($self->{long_pack}, $location ),
420                 pack($self->{long_pack}, $actual_length ),
421                 pack('n n', $fileobj->transaction_id, $deleted ),
422             );
423         }
424     }
425     # Adding a new md5
426     elsif ( defined $offset ) {
427         $location = $fileobj->request_space( $actual_length );
428
429         $fileobj->print_at( $tag->{offset} + $offset,
430             $md5,
431             pack($self->{long_pack}, $location ),
432             pack($self->{long_pack}, $actual_length ),
433             pack('n n', $fileobj->transaction_id, $deleted ),
434         );
435
436         for ( @transactions ) {
437             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
438             $fileobj->{transaction_id} = $_;
439             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
440             $fileobj->{transaction_id} = 0;
441         }
442         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
443     }
444     # If bucket didn't fit into list, split into a new index level
445     # split_index() will do the _fileobj->request_space() call
446     else {
447         $location = $self->split_index( $md5, $tag );
448     }
449
450     $self->write_value( $location, $plain_key, $value, $orig_key );
451
452     return $result;
453 }
454
455 sub write_value {
456     my $self = shift;
457     my ($location, $key, $value, $orig_key) = @_;
458
459     my $fileobj = $self->_fileobj;
460
461     my $dbm_deep_obj = _get_dbm_object( $value );
462     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
463         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
464     }
465
466     ##
467     # Write signature based on content type, set content length and write
468     # actual value.
469     ##
470     my $r = Scalar::Util::reftype( $value ) || '';
471     if ( $dbm_deep_obj ) {
472         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
473     }
474     elsif ($r eq 'HASH') {
475         if ( !$dbm_deep_obj && tied %{$value} ) {
476             $self->_throw_error( "Cannot store something that is tied" );
477         }
478         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
479     }
480     elsif ($r eq 'ARRAY') {
481         if ( !$dbm_deep_obj && tied @{$value} ) {
482             $self->_throw_error( "Cannot store something that is tied" );
483         }
484         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
485     }
486     elsif (!defined($value)) {
487         $self->write_tag( $location, SIG_NULL, '' );
488     }
489     else {
490         $self->write_tag( $location, SIG_DATA, $value );
491     }
492
493     ##
494     # Plain key is stored AFTER value, as keys are typically fetched less often.
495     ##
496     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
497
498     # Internal references don't care about autobless
499     return 1 if $dbm_deep_obj;
500
501     ##
502     # If value is blessed, preserve class name
503     ##
504     if ( $fileobj->{autobless} ) {
505         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
506             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
507         }
508         else {
509             $fileobj->print_at( undef, chr(0) );
510         }
511     }
512
513     ##
514     # Tie the passed in reference so that changes to it are reflected in the
515     # datafile. The use of $location as the base_offset will act as the
516     # the linkage between parent and child.
517     #
518     # The overall assignment is a hack around the fact that just tying doesn't
519     # store the values. This may not be the wrong thing to do.
520     ##
521     if ($r eq 'HASH') {
522         my %x = %$value;
523         tie %$value, 'DBM::Deep', {
524             base_offset => $location,
525             fileobj     => $fileobj,
526             parent      => $self->{obj},
527             parent_key  => $orig_key,
528         };
529         %$value = %x;
530     }
531     elsif ($r eq 'ARRAY') {
532         my @x = @$value;
533         tie @$value, 'DBM::Deep', {
534             base_offset => $location,
535             fileobj     => $fileobj,
536             parent      => $self->{obj},
537             parent_key  => $orig_key,
538         };
539         @$value = @x;
540     }
541
542     return 1;
543 }
544
545 sub split_index {
546     my $self = shift;
547     my ($md5, $tag) = @_;
548
549     my $fileobj = $self->_fileobj;
550
551     my $loc = $fileobj->request_space(
552         $self->tag_size( $self->{index_size} ),
553     );
554
555     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
556
557     my $index_tag = $self->write_tag(
558         $loc, SIG_INDEX,
559         chr(0)x$self->{index_size},
560     );
561
562     my $newtag_loc = $fileobj->request_space(
563         $self->tag_size( $self->{bucket_list_size} ),
564     );
565
566     my $keys = $tag->{content}
567              . $md5 . pack($self->{long_pack}, $newtag_loc)
568                     . pack($self->{long_pack}, 0)  # size
569                     . pack($self->{long_pack}, 0); # transaction ID
570
571     my @newloc = ();
572     BUCKET:
573     # The <= here is deliberate - we have max_buckets+1 keys to iterate
574     # through, unlike every other loop that uses max_buckets as a stop.
575     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
576         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
577
578         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
579         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
580
581         my $num = ord(substr($key, $tag->{ch} + 1, 1));
582
583         if ($newloc[$num]) {
584             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
585
586             # This is looking for the first empty spot
587             my ($subloc, $offset, $size) = $self->_find_in_buckets(
588                 { content => $subkeys }, '',
589             );
590
591             $fileobj->print_at(
592                 $newloc[$num] + $offset,
593                 $key, pack($self->{long_pack}, $old_subloc),
594             );
595
596             next;
597         }
598
599         my $loc = $fileobj->request_space(
600             $self->tag_size( $self->{bucket_list_size} ),
601         );
602
603         $fileobj->print_at(
604             $index_tag->{offset} + ($num * $self->{long_size}),
605             pack($self->{long_pack}, $loc),
606         );
607
608         my $blist_tag = $self->write_tag(
609             $loc, SIG_BLIST,
610             chr(0)x$self->{bucket_list_size},
611         );
612
613         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
614
615         $newloc[$num] = $blist_tag->{offset};
616     }
617
618     $self->_release_space(
619         $self->tag_size( $self->{bucket_list_size} ),
620         $tag->{offset} - SIG_SIZE - $self->{data_size},
621     );
622
623     return $newtag_loc;
624 }
625
626 sub read_from_loc {
627     my $self = shift;
628     my ($subloc, $orig_key) = @_;
629
630     my $fileobj = $self->_fileobj;
631
632     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
633
634     ##
635     # If value is a hash or array, return new DBM::Deep object with correct offset
636     ##
637     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
638         my $new_obj = DBM::Deep->new({
639             type        => $signature,
640             base_offset => $subloc,
641             fileobj     => $self->_fileobj,
642             parent      => $self->{obj},
643             parent_key  => $orig_key,
644         });
645
646         if ($new_obj->_fileobj->{autobless}) {
647             ##
648             # Skip over value and plain key to see if object needs
649             # to be re-blessed
650             ##
651             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
652
653             my $size = $fileobj->read_at( undef, $self->{data_size} );
654             $size = unpack($self->{data_pack}, $size);
655             if ($size) { $fileobj->increment_pointer( $size ); }
656
657             my $bless_bit = $fileobj->read_at( undef, 1 );
658             if (ord($bless_bit)) {
659                 ##
660                 # Yes, object needs to be re-blessed
661                 ##
662                 my $size = $fileobj->read_at( undef, $self->{data_size} );
663                 $size = unpack($self->{data_pack}, $size);
664
665                 my $class_name;
666                 if ($size) { $class_name = $fileobj->read_at( undef, $size ); }
667                 if (defined $class_name) { $new_obj = bless( $new_obj, $class_name ); }
668             }
669         }
670
671         return $new_obj;
672     }
673     elsif ( $signature eq SIG_INTERNAL ) {
674         my $size = $fileobj->read_at( undef, $self->{data_size} );
675         $size = unpack($self->{data_pack}, $size);
676
677         if ( $size ) {
678             my $new_loc = $fileobj->read_at( undef, $size );
679             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
680             return $self->read_from_loc( $new_loc, $orig_key );
681         }
682         else {
683             return;
684         }
685     }
686     ##
687     # Otherwise return actual value
688     ##
689     elsif ( $signature eq SIG_DATA ) {
690         my $size = $fileobj->read_at( undef, $self->{data_size} );
691         $size = unpack($self->{data_pack}, $size);
692
693         my $value = '';
694         if ($size) { $value = $fileobj->read_at( undef, $size ); }
695         return $value;
696     }
697
698     ##
699     # Key exists, but content is null
700     ##
701     return;
702 }
703
704 sub get_bucket_value {
705     ##
706     # Fetch single value given tag and MD5 digested key.
707     ##
708     my $self = shift;
709     my ($tag, $md5, $orig_key) = @_;
710
711     #ACID - This is a read. Can find exact or HEAD
712     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
713
714     if ( !$subloc ) {
715         #XXX Need to use real key
716 #        $self->add_bucket( $tag, $md5, $orig_key, undef, undef, $orig_key );
717 #        return;
718     }
719     elsif ( !$is_deleted ) {
720         return $self->read_from_loc( $subloc, $orig_key );
721     }
722
723     return;
724 }
725
726 sub delete_bucket {
727     ##
728     # Delete single key/value pair given tag and MD5 digested key.
729     ##
730     my $self = shift;
731     my ($tag, $md5, $orig_key) = @_;
732
733     #ACID - Although this is a mutation, we must find any transaction.
734     # This is because we need to mark something as deleted that is in the HEAD.
735     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
736
737     return if !$subloc;
738
739     my $fileobj = $self->_fileobj;
740
741     my @transactions;
742     if ( $fileobj->transaction_id == 0 ) {
743         @transactions = $fileobj->current_transactions;
744     }
745
746     if ( $fileobj->transaction_id == 0 ) {
747         my $value = $self->read_from_loc( $subloc, $orig_key );
748
749         for (@transactions) {
750             $fileobj->{transaction_id} = $_;
751             #XXX Need to use real key
752             $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
753             $fileobj->{transaction_id} = 0;
754         }
755         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
756
757         #XXX This needs _release_space() for the value and anything below
758         $fileobj->print_at(
759             $tag->{offset} + $offset,
760             substr( $tag->{content}, $offset + $self->{bucket_size} ),
761             chr(0) x $self->{bucket_size},
762         );
763     }
764     else {
765         $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
766     }
767
768     return 1;
769 }
770
771 sub bucket_exists {
772     ##
773     # Check existence of single key given tag and MD5 digested key.
774     ##
775     my $self = shift;
776     my ($tag, $md5) = @_;
777
778     #ACID - This is a read. Can find exact or HEAD
779     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
780     return ($subloc && !$is_deleted) && 1;
781 }
782
783 sub find_bucket_list {
784     ##
785     # Locate offset for bucket list, given digested key
786     ##
787     my $self = shift;
788     my ($offset, $md5, $args) = @_;
789     $args = {} unless $args;
790
791     local($/,$\);
792
793     ##
794     # Locate offset for bucket list using digest index system
795     ##
796     my $tag = $self->load_tag( $offset )
797         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
798
799     my $ch = 0;
800     while ($tag->{signature} ne SIG_BLIST) {
801         my $num = ord substr($md5, $ch, 1);
802
803         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
804         $tag = $self->index_lookup( $tag, $num );
805
806         if (!$tag) {
807             return if !$args->{create};
808
809             my $loc = $self->_fileobj->request_space(
810                 $self->tag_size( $self->{bucket_list_size} ),
811             );
812
813             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
814
815             $tag = $self->write_tag(
816                 $loc, SIG_BLIST,
817                 chr(0)x$self->{bucket_list_size},
818             );
819
820             $tag->{ref_loc} = $ref_loc;
821             $tag->{ch} = $ch;
822
823             last;
824         }
825
826         $tag->{ch} = $ch++;
827         $tag->{ref_loc} = $ref_loc;
828     }
829
830     return $tag;
831 }
832
833 sub index_lookup {
834     ##
835     # Given index tag, lookup single entry in index and return .
836     ##
837     my $self = shift;
838     my ($tag, $index) = @_;
839
840     my $location = unpack(
841         $self->{long_pack},
842         substr(
843             $tag->{content},
844             $index * $self->{long_size},
845             $self->{long_size},
846         ),
847     );
848
849     if (!$location) { return; }
850
851     return $self->load_tag( $location );
852 }
853
854 sub traverse_index {
855     ##
856     # Scan index and recursively step into deeper levels, looking for next key.
857     ##
858     my $self = shift;
859     my ($obj, $offset, $ch, $force_return_next) = @_;
860
861     my $tag = $self->load_tag( $offset );
862
863     if ($tag->{signature} ne SIG_BLIST) {
864         my $content = $tag->{content};
865         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
866
867         for (my $idx = $start; $idx < (2**8); $idx++) {
868             my $subloc = unpack(
869                 $self->{long_pack},
870                 substr(
871                     $content,
872                     $idx * $self->{long_size},
873                     $self->{long_size},
874                 ),
875             );
876
877             if ($subloc) {
878                 my $result = $self->traverse_index(
879                     $obj, $subloc, $ch + 1, $force_return_next,
880                 );
881
882                 if (defined($result)) { return $result; }
883             }
884         } # index loop
885
886         $obj->{return_next} = 1;
887     } # tag is an index
888
889     else {
890         my $keys = $tag->{content};
891         if ($force_return_next) { $obj->{return_next} = 1; }
892
893         ##
894         # Iterate through buckets, looking for a key match
895         ##
896         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
897             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
898
899             # End of bucket list -- return to outer loop
900             if (!$subloc) {
901                 $obj->{return_next} = 1;
902                 last;
903             }
904             # Located previous key -- return next one found
905             elsif ($key eq $obj->{prev_md5}) {
906                 $obj->{return_next} = 1;
907                 next;
908             }
909             # Seek to bucket location and skip over signature
910             elsif ($obj->{return_next}) {
911                 my $fileobj = $self->_fileobj;
912
913                 # Skip over value to get to plain key
914                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
915
916                 my $size = $fileobj->read_at( undef, $self->{data_size} );
917                 $size = unpack($self->{data_pack}, $size);
918                 if ($size) { $fileobj->increment_pointer( $size ); }
919
920                 # Read in plain key and return as scalar
921                 $size = $fileobj->read_at( undef, $self->{data_size} );
922                 $size = unpack($self->{data_pack}, $size);
923                 my $plain_key;
924                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
925
926                 return $plain_key;
927             }
928         }
929
930         $obj->{return_next} = 1;
931     } # tag is a bucket list
932
933     return;
934 }
935
936 sub get_next_key {
937     ##
938     # Locate next key, given digested previous one
939     ##
940     my $self = shift;
941     my ($obj) = @_;
942
943     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
944     $obj->{return_next} = 0;
945
946     ##
947     # If the previous key was not specifed, start at the top and
948     # return the first one found.
949     ##
950     if (!$obj->{prev_md5}) {
951         $obj->{prev_md5} = chr(0) x $self->{hash_size};
952         $obj->{return_next} = 1;
953     }
954
955     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
956 }
957
958 # Utilities
959
960 sub _get_key_subloc {
961     my $self = shift;
962     my ($keys, $idx) = @_;
963
964     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
965         # This is 'a', not 'A'. Please read the pack() documentation for the
966         # difference between the two and why it's important.
967         "a$self->{hash_size} $self->{long_pack}2 n2",
968         substr(
969             $keys,
970             ($idx * $self->{bucket_size}),
971             $self->{bucket_size},
972         ),
973     );
974
975     return ($key, $subloc, $size, $transaction_id, $is_deleted);
976 }
977
978 sub _find_in_buckets {
979     my $self = shift;
980     my ($tag, $md5, $exact) = @_;
981     $exact ||= 0;
982
983     my $trans_id = $self->_fileobj->transaction_id;
984
985     my @zero;
986
987     BUCKET:
988     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
989         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
990             $tag->{content}, $i,
991         );
992
993         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
994
995         unless ( $subloc ) {
996             if ( !$exact && @zero && $trans_id ) {
997                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
998             }
999             return @rv;
1000         }
1001
1002         next BUCKET if $key ne $md5;
1003
1004         # Save off the HEAD in case we need it.
1005         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
1006
1007         next BUCKET if $transaction_id != $trans_id;
1008
1009         return @rv;
1010     }
1011
1012     return;
1013 }
1014
1015 sub _release_space {
1016     my $self = shift;
1017     my ($size, $loc) = @_;
1018
1019     my $next_loc = 0;
1020
1021     $self->_fileobj->print_at( $loc,
1022         SIG_FREE, 
1023         pack($self->{long_pack}, $size ),
1024         pack($self->{long_pack}, $next_loc ),
1025     );
1026
1027     return;
1028 }
1029
1030 sub _throw_error {
1031     die "DBM::Deep: $_[1]\n";
1032 }
1033
1034 1;
1035 __END__