delete is now transactional
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * To add to bucket_size, make sure you modify the following:
13 #   - calculate_sizes()
14 #   - _get_key_subloc()
15 #   - add_bucket() - where the buckets are printed
16
17 ##
18 # Setup file and tag signatures.  These should never change.
19 ##
20 sub SIG_FILE     () { 'DPDB' }
21 sub SIG_HEADER   () { 'h'    }
22 sub SIG_INTERNAL () { 'i'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 sub new {
33     my $class = shift;
34     my ($args) = @_;
35
36     my $self = bless {
37         long_size   => 4,
38         long_pack   => 'N',
39         data_size   => 4,
40         data_pack   => 'N',
41
42         digest      => \&Digest::MD5::md5,
43         hash_size   => 16,
44
45         ##
46         # Maximum number of buckets per list before another level of indexing is
47         # done. Increase this value for slightly greater speed, but larger database
48         # files. DO NOT decrease this value below 16, due to risk of recursive
49         # reindex overrun.
50         ##
51         max_buckets => 16,
52
53         fileobj => undef,
54         obj     => undef,
55     }, $class;
56
57     if ( defined $args->{pack_size} ) {
58         if ( lc $args->{pack_size} eq 'small' ) {
59             $args->{long_size} = 2;
60             $args->{long_pack} = 'n';
61         }
62         elsif ( lc $args->{pack_size} eq 'medium' ) {
63             $args->{long_size} = 4;
64             $args->{long_pack} = 'N';
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{long_size} = 8;
68             $args->{long_pack} = 'Q';
69         }
70         else {
71             die "Unknown pack_size value: '$args->{pack_size}'\n";
72         }
73     }
74
75     # Grab the parameters we want to use
76     foreach my $param ( keys %$self ) {
77         next unless exists $args->{$param};
78         $self->{$param} = $args->{$param};
79     }
80     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
81
82     if ( $self->{max_buckets} < 16 ) {
83         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
84         $self->{max_buckets} = 16;
85     }
86
87     return $self;
88 }
89
90 sub _fileobj { return $_[0]{fileobj} }
91
92 sub calculate_sizes {
93     my $self = shift;
94
95     # The 2**8 here indicates the number of different characters in the
96     # current hashing algorithm
97     #XXX Does this need to be updated with different hashing algorithms?
98     $self->{index_size}       = (2**8) * $self->{long_size};
99     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
100     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
101
102     return;
103 }
104
105 sub write_file_header {
106     my $self = shift;
107
108     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
109
110     $self->_fileobj->print_at( $loc,
111         SIG_FILE,
112         SIG_HEADER,
113         pack('N', 1),  # header version
114         pack('N', 12), # header size
115         pack('N', 0),  # currently running transaction IDs
116         pack('n', $self->{long_size}),
117         pack('A', $self->{long_pack}),
118         pack('n', $self->{data_size}),
119         pack('A', $self->{data_pack}),
120         pack('n', $self->{max_buckets}),
121     );
122
123     $self->_fileobj->set_transaction_offset( 13 );
124
125     return;
126 }
127
128 sub read_file_header {
129     my $self = shift;
130
131     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
132     return unless length($buffer);
133
134     my ($file_signature, $sig_header, $header_version, $size) = unpack(
135         'A4 A N N', $buffer
136     );
137
138     unless ( $file_signature eq SIG_FILE ) {
139         $self->_fileobj->close;
140         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
141     }
142
143     unless ( $sig_header eq SIG_HEADER ) {
144         $self->_fileobj->close;
145         $self->_throw_error( "Old file version found." );
146     }
147
148     my $buffer2 = $self->_fileobj->read_at( undef, $size );
149     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
150
151     $self->_fileobj->set_transaction_offset( 13 );
152
153     if ( @values < 5 || grep { !defined } @values ) {
154         $self->_fileobj->close;
155         $self->_throw_error("Corrupted file - bad header");
156     }
157
158     #XXX Add warnings if values weren't set right
159     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
160
161     return length($buffer) + length($buffer2);
162 }
163
164 sub setup_fh {
165     my $self = shift;
166     my ($obj) = @_;
167
168     # Need to remove use of $fh here
169     my $fh = $self->_fileobj->{fh};
170     flock $fh, LOCK_EX;
171
172     #XXX The duplication of calculate_sizes needs to go away
173     unless ( $obj->{base_offset} ) {
174         my $bytes_read = $self->read_file_header;
175
176         $self->calculate_sizes;
177
178         ##
179         # File is empty -- write header and master index
180         ##
181         if (!$bytes_read) {
182             $self->_fileobj->audit( "# Database created on" );
183
184             $self->write_file_header;
185
186             $obj->{base_offset} = $self->_fileobj->request_space(
187                 $self->tag_size( $self->{index_size} ),
188             );
189
190             $self->write_tag(
191                 $obj->_base_offset, $obj->_type,
192                 chr(0)x$self->{index_size},
193             );
194
195             # Flush the filehandle
196             my $old_fh = select $fh;
197             my $old_af = $|; $| = 1; $| = $old_af;
198             select $old_fh;
199         }
200         else {
201             $obj->{base_offset} = $bytes_read;
202
203             ##
204             # Get our type from master index header
205             ##
206             my $tag = $self->load_tag($obj->_base_offset);
207             unless ( $tag ) {
208                 flock $fh, LOCK_UN;
209                 $self->_throw_error("Corrupted file, no master index record");
210             }
211
212             unless ($obj->_type eq $tag->{signature}) {
213                 flock $fh, LOCK_UN;
214                 $self->_throw_error("File type mismatch");
215             }
216         }
217     }
218     else {
219         $self->calculate_sizes;
220     }
221
222     #XXX We have to make sure we don't mess up when autoflush isn't turned on
223     $self->_fileobj->set_inode;
224
225     flock $fh, LOCK_UN;
226
227     return 1;
228 }
229
230 sub tag_size {
231     my $self = shift;
232     my ($size) = @_;
233     return SIG_SIZE + $self->{data_size} + $size;
234 }
235
236 sub write_tag {
237     ##
238     # Given offset, signature and content, create tag and write to disk
239     ##
240     my $self = shift;
241     my ($offset, $sig, $content) = @_;
242     my $size = length( $content );
243
244     $self->_fileobj->print_at(
245         $offset, 
246         $sig, pack($self->{data_pack}, $size), $content,
247     );
248
249     return unless defined $offset;
250
251     return {
252         signature => $sig,
253         size => $size,
254         offset => $offset + SIG_SIZE + $self->{data_size},
255         content => $content
256     };
257 }
258
259 sub load_tag {
260     ##
261     # Given offset, load single tag and return signature, size and data
262     ##
263     my $self = shift;
264     my ($offset) = @_;
265
266     my $fileobj = $self->_fileobj;
267
268     my $s = SIG_SIZE + $self->{data_size};
269     my $b = $fileobj->read_at( $offset, $s );
270     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
271
272     my $buffer = $fileobj->read_at( undef, $size );
273
274     return {
275         signature => $sig,
276         size => $size,
277         offset => $offset + SIG_SIZE + $self->{data_size},
278         content => $buffer
279     };
280 }
281
282 sub _get_dbm_object {
283     my $item = shift;
284
285     my $obj = eval {
286         local $SIG{__DIE__};
287         if ($item->isa( 'DBM::Deep' )) {
288             return $item;
289         }
290         return;
291     };
292     return $obj if $obj;
293
294     my $r = Scalar::Util::reftype( $item ) || '';
295     if ( $r eq 'HASH' ) {
296         my $obj = eval {
297             local $SIG{__DIE__};
298             my $obj = tied(%$item);
299             if ($obj->isa( 'DBM::Deep' )) {
300                 return $obj;
301             }
302             return;
303         };
304         return $obj if $obj;
305     }
306     elsif ( $r eq 'ARRAY' ) {
307         my $obj = eval {
308             local $SIG{__DIE__};
309             my $obj = tied(@$item);
310             if ($obj->isa( 'DBM::Deep' )) {
311                 return $obj;
312             }
313             return;
314         };
315         return $obj if $obj;
316     }
317
318     return;
319 }
320
321 sub _length_needed {
322     my $self = shift;
323     my ($value, $key) = @_;
324
325     my $is_dbm_deep = eval {
326         local $SIG{'__DIE__'};
327         $value->isa( 'DBM::Deep' );
328     };
329
330     my $len = SIG_SIZE
331             + $self->{data_size} # size for value
332             + $self->{data_size} # size for key
333             + length( $key );    # length of key
334
335     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
336         # long_size is for the internal reference
337         return $len + $self->{long_size};
338     }
339
340     if ( $self->_fileobj->{autobless} ) {
341         # This is for the bit saying whether or not this thing is blessed.
342         $len += 1;
343     }
344
345     my $r = Scalar::Util::reftype( $value ) || '';
346     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
347         if ( defined $value ) {
348             $len += length( $value );
349         }
350         return $len;
351     }
352
353     $len += $self->{index_size};
354
355     # if autobless is enabled, must also take into consideration
356     # the class name as it is stored after the key.
357     if ( $self->_fileobj->{autobless} ) {
358         my $c = Scalar::Util::blessed($value);
359         if ( defined $c && !$is_dbm_deep ) {
360             $len += $self->{data_size} + length($c);
361         }
362     }
363
364     return $len;
365 }
366
367 sub add_bucket {
368     ##
369     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
370     # plain (undigested) key and value.
371     ##
372     my $self = shift;
373     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
374     $deleted ||= 0;
375
376     local($/,$\);
377
378     # This verifies that only supported values will be stored.
379     {
380         my $r = Scalar::Util::reftype( $value );
381         last if !defined $r;
382
383         last if $r eq 'HASH';
384         last if $r eq 'ARRAY';
385
386         $self->_throw_error(
387             "Storage of variables of type '$r' is not supported."
388         );
389     }
390
391     my $location = 0;
392     my $result = 2;
393
394     my $fileobj = $self->_fileobj;
395
396     my $actual_length = $self->_length_needed( $value, $plain_key );
397
398     #ACID - This is a mutation. Must only find the exact transaction
399     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
400
401     my @transactions;
402     if ( $fileobj->transaction_id == 0 ) {
403         @transactions = $fileobj->current_transactions;
404     }
405
406 #    $self->_release_space( $size, $subloc );
407     # Updating a known md5
408 #XXX This needs updating to use _release_space
409     if ( $subloc ) {
410         $result = 1;
411
412         if ($actual_length <= $size) {
413             $location = $subloc;
414         }
415         else {
416             $location = $fileobj->request_space( $actual_length );
417
418             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
419                 pack($self->{long_pack}, $location ),
420                 pack($self->{long_pack}, $actual_length ),
421                 pack('n n', $fileobj->transaction_id, $deleted ),
422             );
423         }
424     }
425     # Adding a new md5
426     elsif ( defined $offset ) {
427         $location = $fileobj->request_space( $actual_length );
428
429         $fileobj->print_at( $tag->{offset} + $offset,
430             $md5,
431             pack($self->{long_pack}, $location ),
432             pack($self->{long_pack}, $actual_length ),
433             pack('n n', $fileobj->transaction_id, $deleted ),
434         );
435
436         for ( @transactions ) {
437             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
438             $fileobj->{transaction_id} = $_;
439             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
440             $fileobj->{transaction_id} = 0;
441         }
442         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
443     }
444     # If bucket didn't fit into list, split into a new index level
445     # split_index() will do the _fileobj->request_space() call
446     else {
447         $location = $self->split_index( $md5, $tag );
448     }
449
450     $self->write_value( $location, $plain_key, $value, $orig_key );
451
452     return $result;
453 }
454
455 sub write_value {
456     my $self = shift;
457     my ($location, $key, $value, $orig_key) = @_;
458
459     my $fileobj = $self->_fileobj;
460
461     my $dbm_deep_obj = _get_dbm_object( $value );
462     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
463         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
464     }
465
466     ##
467     # Write signature based on content type, set content length and write
468     # actual value.
469     ##
470     my $r = Scalar::Util::reftype( $value ) || '';
471     if ( $dbm_deep_obj ) {
472         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
473     }
474     elsif ($r eq 'HASH') {
475         if ( !$dbm_deep_obj && tied %{$value} ) {
476             $self->_throw_error( "Cannot store something that is tied" );
477         }
478         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
479     }
480     elsif ($r eq 'ARRAY') {
481         if ( !$dbm_deep_obj && tied @{$value} ) {
482             $self->_throw_error( "Cannot store something that is tied" );
483         }
484         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
485     }
486     elsif (!defined($value)) {
487         $self->write_tag( $location, SIG_NULL, '' );
488     }
489     else {
490         $self->write_tag( $location, SIG_DATA, $value );
491     }
492
493     ##
494     # Plain key is stored AFTER value, as keys are typically fetched less often.
495     ##
496     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
497
498     # Internal references don't care about autobless
499     return 1 if $dbm_deep_obj;
500
501     ##
502     # If value is blessed, preserve class name
503     ##
504     if ( $fileobj->{autobless} ) {
505         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
506             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
507         }
508         else {
509             $fileobj->print_at( undef, chr(0) );
510         }
511     }
512
513     ##
514     # Tie the passed in reference so that changes to it are reflected in the
515     # datafile. The use of $location as the base_offset will act as the
516     # the linkage between parent and child.
517     #
518     # The overall assignment is a hack around the fact that just tying doesn't
519     # store the values. This may not be the wrong thing to do.
520     ##
521     if ($r eq 'HASH') {
522         my %x = %$value;
523         tie %$value, 'DBM::Deep', {
524             base_offset => $location,
525             fileobj     => $fileobj,
526             parent      => $self->{obj},
527             parent_key  => $orig_key,
528         };
529         %$value = %x;
530     }
531     elsif ($r eq 'ARRAY') {
532         my @x = @$value;
533         tie @$value, 'DBM::Deep', {
534             base_offset => $location,
535             fileobj     => $fileobj,
536             parent      => $self->{obj},
537             parent_key  => $orig_key,
538         };
539         @$value = @x;
540     }
541
542     return 1;
543 }
544
545 sub split_index {
546     my $self = shift;
547     my ($md5, $tag) = @_;
548
549     my $fileobj = $self->_fileobj;
550
551     my $loc = $fileobj->request_space(
552         $self->tag_size( $self->{index_size} ),
553     );
554
555     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
556
557     my $index_tag = $self->write_tag(
558         $loc, SIG_INDEX,
559         chr(0)x$self->{index_size},
560     );
561
562     my $newtag_loc = $fileobj->request_space(
563         $self->tag_size( $self->{bucket_list_size} ),
564     );
565
566     my $keys = $tag->{content}
567              . $md5 . pack($self->{long_pack}, $newtag_loc)
568                     . pack($self->{long_pack}, 0)  # size
569                     . pack($self->{long_pack}, 0); # transaction ID
570
571     my @newloc = ();
572     BUCKET:
573     # The <= here is deliberate - we have max_buckets+1 keys to iterate
574     # through, unlike every other loop that uses max_buckets as a stop.
575     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
576         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
577
578         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
579         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
580
581         my $num = ord(substr($key, $tag->{ch} + 1, 1));
582
583         if ($newloc[$num]) {
584             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
585
586             # This is looking for the first empty spot
587             my ($subloc, $offset, $size) = $self->_find_in_buckets(
588                 { content => $subkeys }, '',
589             );
590
591             $fileobj->print_at(
592                 $newloc[$num] + $offset,
593                 $key, pack($self->{long_pack}, $old_subloc),
594             );
595
596             next;
597         }
598
599         my $loc = $fileobj->request_space(
600             $self->tag_size( $self->{bucket_list_size} ),
601         );
602
603         $fileobj->print_at(
604             $index_tag->{offset} + ($num * $self->{long_size}),
605             pack($self->{long_pack}, $loc),
606         );
607
608         my $blist_tag = $self->write_tag(
609             $loc, SIG_BLIST,
610             chr(0)x$self->{bucket_list_size},
611         );
612
613         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
614
615         $newloc[$num] = $blist_tag->{offset};
616     }
617
618     $self->_release_space(
619         $self->tag_size( $self->{bucket_list_size} ),
620         $tag->{offset} - SIG_SIZE - $self->{data_size},
621     );
622
623     return $newtag_loc;
624 }
625
626 sub read_from_loc {
627     my $self = shift;
628     my ($subloc, $orig_key) = @_;
629
630     my $fileobj = $self->_fileobj;
631
632     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
633
634     ##
635     # If value is a hash or array, return new DBM::Deep object with correct offset
636     ##
637     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
638         my $new_obj = DBM::Deep->new({
639             type        => $signature,
640             base_offset => $subloc,
641             fileobj     => $self->_fileobj,
642             parent      => $self->{obj},
643             parent_key  => $orig_key,
644         });
645
646         if ($new_obj->_fileobj->{autobless}) {
647             ##
648             # Skip over value and plain key to see if object needs
649             # to be re-blessed
650             ##
651             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
652
653             my $size = $fileobj->read_at( undef, $self->{data_size} );
654             $size = unpack($self->{data_pack}, $size);
655             if ($size) { $fileobj->increment_pointer( $size ); }
656
657             my $bless_bit = $fileobj->read_at( undef, 1 );
658             if (ord($bless_bit)) {
659                 ##
660                 # Yes, object needs to be re-blessed
661                 ##
662                 my $size = $fileobj->read_at( undef, $self->{data_size} );
663                 $size = unpack($self->{data_pack}, $size);
664
665                 my $class_name;
666                 if ($size) { $class_name = $fileobj->read_at( undef, $size ); }
667                 if (defined $class_name) { $new_obj = bless( $new_obj, $class_name ); }
668             }
669         }
670
671         return $new_obj;
672     }
673     elsif ( $signature eq SIG_INTERNAL ) {
674         my $size = $fileobj->read_at( undef, $self->{data_size} );
675         $size = unpack($self->{data_pack}, $size);
676
677         if ( $size ) {
678             my $new_loc = $fileobj->read_at( undef, $size );
679             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
680             return $self->read_from_loc( $new_loc, $orig_key );
681         }
682         else {
683             return;
684         }
685     }
686     ##
687     # Otherwise return actual value
688     ##
689     elsif ( $signature eq SIG_DATA ) {
690         my $size = $fileobj->read_at( undef, $self->{data_size} );
691         $size = unpack($self->{data_pack}, $size);
692
693         my $value = '';
694         if ($size) { $value = $fileobj->read_at( undef, $size ); }
695         return $value;
696     }
697
698     ##
699     # Key exists, but content is null
700     ##
701     return;
702 }
703
704 sub get_bucket_value {
705     ##
706     # Fetch single value given tag and MD5 digested key.
707     ##
708     my $self = shift;
709     my ($tag, $md5, $orig_key) = @_;
710
711     #ACID - This is a read. Can find exact or HEAD
712     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
713     if ( $subloc && !$is_deleted ) {
714         return $self->read_from_loc( $subloc, $orig_key );
715     }
716     return;
717 }
718
719 sub delete_bucket {
720     ##
721     # Delete single key/value pair given tag and MD5 digested key.
722     ##
723     my $self = shift;
724     my ($tag, $md5, $orig_key) = @_;
725
726     #ACID - Although this is a mutation, we must find any transaction.
727     # This is because we need to mark something as deleted that is in the HEAD.
728     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
729
730     return if !$subloc;
731
732     my $fileobj = $self->_fileobj;
733
734     my @transactions;
735     if ( $fileobj->transaction_id == 0 ) {
736         @transactions = $fileobj->current_transactions;
737     }
738
739     if ( $fileobj->transaction_id == 0 ) {
740         my $value = $self->read_from_loc( $subloc, $orig_key );
741
742         for (@transactions) {
743             $fileobj->{transaction_id} = $_;
744             #XXX Need to use real key
745             $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
746             $fileobj->{transaction_id} = 0;
747         }
748         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
749
750         #XXX This needs _release_space() for the value and anything below
751         $fileobj->print_at(
752             $tag->{offset} + $offset,
753             substr( $tag->{content}, $offset + $self->{bucket_size} ),
754             chr(0) x $self->{bucket_size},
755         );
756     }
757     else {
758         $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
759     }
760
761     return 1;
762 }
763
764 sub bucket_exists {
765     ##
766     # Check existence of single key given tag and MD5 digested key.
767     ##
768     my $self = shift;
769     my ($tag, $md5) = @_;
770
771     #ACID - This is a read. Can find exact or HEAD
772     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
773     return ($subloc && !$is_deleted) && 1;
774 }
775
776 sub find_bucket_list {
777     ##
778     # Locate offset for bucket list, given digested key
779     ##
780     my $self = shift;
781     my ($offset, $md5, $args) = @_;
782     $args = {} unless $args;
783
784     local($/,$\);
785
786     ##
787     # Locate offset for bucket list using digest index system
788     ##
789     my $tag = $self->load_tag( $offset )
790         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
791
792     my $ch = 0;
793     while ($tag->{signature} ne SIG_BLIST) {
794         my $num = ord substr($md5, $ch, 1);
795
796         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
797         $tag = $self->index_lookup( $tag, $num );
798
799         if (!$tag) {
800             return if !$args->{create};
801
802             my $loc = $self->_fileobj->request_space(
803                 $self->tag_size( $self->{bucket_list_size} ),
804             );
805
806             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
807
808             $tag = $self->write_tag(
809                 $loc, SIG_BLIST,
810                 chr(0)x$self->{bucket_list_size},
811             );
812
813             $tag->{ref_loc} = $ref_loc;
814             $tag->{ch} = $ch;
815
816             last;
817         }
818
819         $tag->{ch} = $ch++;
820         $tag->{ref_loc} = $ref_loc;
821     }
822
823     return $tag;
824 }
825
826 sub index_lookup {
827     ##
828     # Given index tag, lookup single entry in index and return .
829     ##
830     my $self = shift;
831     my ($tag, $index) = @_;
832
833     my $location = unpack(
834         $self->{long_pack},
835         substr(
836             $tag->{content},
837             $index * $self->{long_size},
838             $self->{long_size},
839         ),
840     );
841
842     if (!$location) { return; }
843
844     return $self->load_tag( $location );
845 }
846
847 sub traverse_index {
848     ##
849     # Scan index and recursively step into deeper levels, looking for next key.
850     ##
851     my $self = shift;
852     my ($obj, $offset, $ch, $force_return_next) = @_;
853
854     my $tag = $self->load_tag( $offset );
855
856     if ($tag->{signature} ne SIG_BLIST) {
857         my $content = $tag->{content};
858         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
859
860         for (my $idx = $start; $idx < (2**8); $idx++) {
861             my $subloc = unpack(
862                 $self->{long_pack},
863                 substr(
864                     $content,
865                     $idx * $self->{long_size},
866                     $self->{long_size},
867                 ),
868             );
869
870             if ($subloc) {
871                 my $result = $self->traverse_index(
872                     $obj, $subloc, $ch + 1, $force_return_next,
873                 );
874
875                 if (defined($result)) { return $result; }
876             }
877         } # index loop
878
879         $obj->{return_next} = 1;
880     } # tag is an index
881
882     else {
883         my $keys = $tag->{content};
884         if ($force_return_next) { $obj->{return_next} = 1; }
885
886         ##
887         # Iterate through buckets, looking for a key match
888         ##
889         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
890             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
891
892             # End of bucket list -- return to outer loop
893             if (!$subloc) {
894                 $obj->{return_next} = 1;
895                 last;
896             }
897             # Located previous key -- return next one found
898             elsif ($key eq $obj->{prev_md5}) {
899                 $obj->{return_next} = 1;
900                 next;
901             }
902             # Seek to bucket location and skip over signature
903             elsif ($obj->{return_next}) {
904                 my $fileobj = $self->_fileobj;
905
906                 # Skip over value to get to plain key
907                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
908
909                 my $size = $fileobj->read_at( undef, $self->{data_size} );
910                 $size = unpack($self->{data_pack}, $size);
911                 if ($size) { $fileobj->increment_pointer( $size ); }
912
913                 # Read in plain key and return as scalar
914                 $size = $fileobj->read_at( undef, $self->{data_size} );
915                 $size = unpack($self->{data_pack}, $size);
916                 my $plain_key;
917                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
918
919                 return $plain_key;
920             }
921         }
922
923         $obj->{return_next} = 1;
924     } # tag is a bucket list
925
926     return;
927 }
928
929 sub get_next_key {
930     ##
931     # Locate next key, given digested previous one
932     ##
933     my $self = shift;
934     my ($obj) = @_;
935
936     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
937     $obj->{return_next} = 0;
938
939     ##
940     # If the previous key was not specifed, start at the top and
941     # return the first one found.
942     ##
943     if (!$obj->{prev_md5}) {
944         $obj->{prev_md5} = chr(0) x $self->{hash_size};
945         $obj->{return_next} = 1;
946     }
947
948     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
949 }
950
951 # Utilities
952
953 sub _get_key_subloc {
954     my $self = shift;
955     my ($keys, $idx) = @_;
956
957     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
958         # This is 'a', not 'A'. Please read the pack() documentation for the
959         # difference between the two and why it's important.
960         "a$self->{hash_size} $self->{long_pack}2 n2",
961         substr(
962             $keys,
963             ($idx * $self->{bucket_size}),
964             $self->{bucket_size},
965         ),
966     );
967
968     return ($key, $subloc, $size, $transaction_id, $is_deleted);
969 }
970
971 sub _find_in_buckets {
972     my $self = shift;
973     my ($tag, $md5, $exact) = @_;
974     $exact ||= 0;
975
976     my $trans_id = $self->_fileobj->transaction_id;
977
978     my @zero;
979
980     BUCKET:
981     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
982         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
983             $tag->{content}, $i,
984         );
985
986         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
987
988         unless ( $subloc ) {
989             if ( !$exact && @zero && $trans_id ) {
990                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
991             }
992             return @rv;
993         }
994
995         next BUCKET if $key ne $md5;
996
997         # Save off the HEAD in case we need it.
998         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
999
1000         next BUCKET if $transaction_id != $trans_id;
1001
1002         return @rv;
1003     }
1004
1005     return;
1006 }
1007
1008 sub _release_space {
1009     my $self = shift;
1010     my ($size, $loc) = @_;
1011
1012     my $next_loc = 0;
1013
1014     $self->_fileobj->print_at( $loc,
1015         SIG_FREE, 
1016         pack($self->{long_pack}, $size ),
1017         pack($self->{long_pack}, $next_loc ),
1018     );
1019
1020     return;
1021 }
1022
1023 sub _throw_error {
1024     die "DBM::Deep: $_[1]\n";
1025 }
1026
1027 1;
1028 __END__