Commit right before adding the keylist
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_01);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18
19 ##
20 # Setup file and tag signatures.  These should never change.
21 ##
22 sub SIG_FILE     () { 'DPDB' }
23 sub SIG_HEADER   () { 'h'    }
24 sub SIG_INTERNAL () { 'i'    }
25 sub SIG_HASH     () { 'H'    }
26 sub SIG_ARRAY    () { 'A'    }
27 sub SIG_NULL     () { 'N'    }
28 sub SIG_DATA     () { 'D'    }
29 sub SIG_INDEX    () { 'I'    }
30 sub SIG_BLIST    () { 'B'    }
31 sub SIG_FREE     () { 'F'    }
32 sub SIG_KEYS     () { 'K'    }
33 sub SIG_SIZE     () {  1     }
34
35 sub new {
36     my $class = shift;
37     my ($args) = @_;
38
39     my $self = bless {
40         long_size => 4,
41         long_pack => 'N',
42         data_size => 4,
43         data_pack => 'N',
44
45         digest    => \&Digest::MD5::md5,
46         hash_size => 16,
47
48         ##
49         # Maximum number of buckets per blist before another level of indexing is
50         # done. Increase this value for slightly greater speed, but larger database
51         # files. DO NOT decrease this value below 16, due to risk of recursive
52         # reindex overrun.
53         ##
54         max_buckets => 16,
55
56         fileobj => undef,
57         obj     => undef,
58     }, $class;
59
60     if ( defined $args->{pack_size} ) {
61         if ( lc $args->{pack_size} eq 'small' ) {
62             $args->{long_size} = 2;
63             $args->{long_pack} = 'n';
64         }
65         elsif ( lc $args->{pack_size} eq 'medium' ) {
66             $args->{long_size} = 4;
67             $args->{long_pack} = 'N';
68         }
69         elsif ( lc $args->{pack_size} eq 'large' ) {
70             $args->{long_size} = 8;
71             $args->{long_pack} = 'Q';
72         }
73         else {
74             die "Unknown pack_size value: '$args->{pack_size}'\n";
75         }
76     }
77
78     # Grab the parameters we want to use
79     foreach my $param ( keys %$self ) {
80         next unless exists $args->{$param};
81         $self->{$param} = $args->{$param};
82     }
83     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
84
85     if ( $self->{max_buckets} < 16 ) {
86         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
87         $self->{max_buckets} = 16;
88     }
89
90     return $self;
91 }
92
93 sub _fileobj { return $_[0]{fileobj} }
94
95 sub calculate_sizes {
96     my $self = shift;
97
98     # The 2**8 here indicates the number of different characters in the
99     # current hashing algorithm
100     #XXX Does this need to be updated with different hashing algorithms?
101     $self->{index_size}       = (2**8) * $self->{long_size};
102     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
103     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
104
105     return;
106 }
107
108 sub write_file_header {
109     my $self = shift;
110
111     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
112
113     $self->_fileobj->print_at( $loc,
114         SIG_FILE,
115         SIG_HEADER,
116         pack('N', 1),  # header version
117         pack('N', 12), # header size
118         pack('N', 0),  # currently running transaction IDs
119         pack('n', $self->{long_size}),
120         pack('A', $self->{long_pack}),
121         pack('n', $self->{data_size}),
122         pack('A', $self->{data_pack}),
123         pack('n', $self->{max_buckets}),
124     );
125
126     $self->_fileobj->set_transaction_offset( 13 );
127
128     return;
129 }
130
131 sub read_file_header {
132     my $self = shift;
133
134     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
135     return unless length($buffer);
136
137     my ($file_signature, $sig_header, $header_version, $size) = unpack(
138         'A4 A N N', $buffer
139     );
140
141     unless ( $file_signature eq SIG_FILE ) {
142         $self->_fileobj->close;
143         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
144     }
145
146     unless ( $sig_header eq SIG_HEADER ) {
147         $self->_fileobj->close;
148         $self->_throw_error( "Old file version found." );
149     }
150
151     my $buffer2 = $self->_fileobj->read_at( undef, $size );
152     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
153
154     $self->_fileobj->set_transaction_offset( 13 );
155
156     if ( @values < 5 || grep { !defined } @values ) {
157         $self->_fileobj->close;
158         $self->_throw_error("Corrupted file - bad header");
159     }
160
161     #XXX Add warnings if values weren't set right
162     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
163
164     return length($buffer) + length($buffer2);
165 }
166
167 sub setup_fh {
168     my $self = shift;
169     my ($obj) = @_;
170
171     # Need to remove use of $fh here
172     my $fh = $self->_fileobj->{fh};
173     flock $fh, LOCK_EX;
174
175     #XXX The duplication of calculate_sizes needs to go away
176     unless ( $obj->{base_offset} ) {
177         my $bytes_read = $self->read_file_header;
178
179         $self->calculate_sizes;
180
181         ##
182         # File is empty -- write header and master index
183         ##
184         if (!$bytes_read) {
185             $self->_fileobj->audit( "# Database created on" );
186
187             $self->write_file_header;
188
189             $obj->{base_offset} = $self->_fileobj->request_space(
190                 $self->tag_size( $self->{index_size} ),
191             );
192
193             $self->write_tag(
194                 $obj->_base_offset, $obj->_type,
195                 chr(0)x$self->{index_size},
196             );
197
198             # Flush the filehandle
199             my $old_fh = select $fh;
200             my $old_af = $|; $| = 1; $| = $old_af;
201             select $old_fh;
202         }
203         else {
204             $obj->{base_offset} = $bytes_read;
205
206             ##
207             # Get our type from master index header
208             ##
209             my $tag = $self->load_tag($obj->_base_offset);
210             unless ( $tag ) {
211                 flock $fh, LOCK_UN;
212                 $self->_throw_error("Corrupted file, no master index record");
213             }
214
215             unless ($obj->_type eq $tag->{signature}) {
216                 flock $fh, LOCK_UN;
217                 $self->_throw_error("File type mismatch");
218             }
219         }
220     }
221     else {
222         $self->calculate_sizes;
223     }
224
225     #XXX We have to make sure we don't mess up when autoflush isn't turned on
226     $self->_fileobj->set_inode;
227
228     flock $fh, LOCK_UN;
229
230     return 1;
231 }
232
233 sub tag_size {
234     my $self = shift;
235     my ($size) = @_;
236     return SIG_SIZE + $self->{data_size} + $size;
237 }
238
239 sub write_tag {
240     ##
241     # Given offset, signature and content, create tag and write to disk
242     ##
243     my $self = shift;
244     my ($offset, $sig, $content) = @_;
245     my $size = length( $content );
246
247     $self->_fileobj->print_at(
248         $offset, 
249         $sig, pack($self->{data_pack}, $size), $content,
250     );
251
252     return unless defined $offset;
253
254     return {
255         signature => $sig,
256         #XXX Is this even used?
257         size      => $size,
258         offset    => $offset + SIG_SIZE + $self->{data_size},
259         content   => $content
260     };
261 }
262
263 sub load_tag {
264     ##
265     # Given offset, load single tag and return signature, size and data
266     ##
267     my $self = shift;
268     my ($offset) = @_;
269
270     my $fileobj = $self->_fileobj;
271
272     my ($sig, $size) = unpack(
273         "A $self->{data_pack}",
274         $fileobj->read_at( $offset, SIG_SIZE + $self->{data_size} ),
275     );
276
277     return {
278         signature => $sig,
279         #XXX Is this even used?
280         size      => $size,
281         offset    => $offset + SIG_SIZE + $self->{data_size},
282         content   => $fileobj->read_at( undef, $size ),
283     };
284 }
285
286 sub add_bucket {
287     ##
288     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
289     # plain (undigested) key and value.
290     ##
291     my $self = shift;
292     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
293     $deleted ||= 0;
294
295     # This verifies that only supported values will be stored.
296     {
297         my $r = Scalar::Util::reftype( $value );
298
299         last if !defined $r;
300         last if $r eq 'HASH';
301         last if $r eq 'ARRAY';
302
303         $self->_throw_error(
304             "Storage of references of type '$r' is not supported."
305         );
306     }
307
308     my $fileobj = $self->_fileobj;
309
310     my $actual_length = $self->_length_needed( $value, $plain_key );
311
312     #ACID - This is a mutation. Must only find the exact transaction
313     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
314
315     my @transactions;
316     if ( $fileobj->transaction_id == 0 ) {
317         @transactions = $fileobj->current_transactions;
318     }
319
320 #    $self->_release_space( $size, $subloc );
321     # Updating a known md5
322 #XXX This needs updating to use _release_space
323     my $location;
324     if ( $subloc ) {
325         if ($actual_length <= $size) {
326             $location = $subloc;
327         }
328         else {
329             $location = $fileobj->request_space( $actual_length );
330
331             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
332                 pack($self->{long_pack}, $location ),
333                 pack($self->{long_pack}, $actual_length ),
334                 pack('n n', $fileobj->transaction_id, $deleted ),
335             );
336         }
337
338         my $old_value = $self->read_from_loc( $subloc, $orig_key );
339         for ( @transactions ) {
340             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
341             $fileobj->{transaction_id} = $_;
342             $self->add_bucket( $tag2, $md5, $orig_key, $old_value, undef, $orig_key );
343             $fileobj->{transaction_id} = 0;
344         }
345         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
346     }
347     # Adding a new md5
348     elsif ( defined $offset ) {
349         $location = $fileobj->request_space( $actual_length );
350
351         $fileobj->print_at( $tag->{offset} + $offset,
352             $md5,
353             pack($self->{long_pack}, $location ),
354             pack($self->{long_pack}, $actual_length ),
355             pack('n n', $fileobj->transaction_id, $deleted ),
356         );
357
358         for ( @transactions ) {
359             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
360             $fileobj->{transaction_id} = $_;
361             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
362             $fileobj->{transaction_id} = 0;
363         }
364         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
365     }
366     # If bucket didn't fit into list, split into a new index level
367     # split_index() will do the $self->_fileobj->request_space() call
368     #XXX It also needs to be transactionally aware
369     else {
370         $location = $self->split_index( $md5, $tag );
371     }
372
373     $self->write_value( $location, $plain_key, $value, $orig_key );
374
375     return 1;
376 }
377
378 sub write_value {
379     my $self = shift;
380     my ($location, $key, $value, $orig_key) = @_;
381
382     my $fileobj = $self->_fileobj;
383
384     my $dbm_deep_obj = _get_dbm_object( $value );
385     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
386         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
387     }
388
389     ##
390     # Write signature based on content type, set content length and write
391     # actual value.
392     ##
393     my $r = Scalar::Util::reftype( $value ) || '';
394     if ( $dbm_deep_obj ) {
395         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
396     }
397     elsif ($r eq 'HASH') {
398         if ( !$dbm_deep_obj && tied %{$value} ) {
399             $self->_throw_error( "Cannot store something that is tied" );
400         }
401         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
402     }
403     elsif ($r eq 'ARRAY') {
404         if ( !$dbm_deep_obj && tied @{$value} ) {
405             $self->_throw_error( "Cannot store something that is tied" );
406         }
407         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
408     }
409     elsif (!defined($value)) {
410         $self->write_tag( $location, SIG_NULL, '' );
411     }
412     else {
413         $self->write_tag( $location, SIG_DATA, $value );
414     }
415
416     ##
417     # Plain key is stored AFTER value, as keys are typically fetched less often.
418     ##
419     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
420
421     # Internal references don't care about autobless
422     return 1 if $dbm_deep_obj;
423
424     ##
425     # If value is blessed, preserve class name
426     ##
427     if ( $fileobj->{autobless} ) {
428         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
429             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
430         }
431         else {
432             $fileobj->print_at( undef, chr(0) );
433         }
434     }
435
436     ##
437     # Tie the passed in reference so that changes to it are reflected in the
438     # datafile. The use of $location as the base_offset will act as the
439     # the linkage between parent and child.
440     #
441     # The overall assignment is a hack around the fact that just tying doesn't
442     # store the values. This may not be the wrong thing to do.
443     ##
444     if ($r eq 'HASH') {
445         my %x = %$value;
446         tie %$value, 'DBM::Deep', {
447             base_offset => $location,
448             fileobj     => $fileobj,
449             parent      => $self->{obj},
450             parent_key  => $orig_key,
451         };
452         %$value = %x;
453     }
454     elsif ($r eq 'ARRAY') {
455         my @x = @$value;
456         tie @$value, 'DBM::Deep', {
457             base_offset => $location,
458             fileobj     => $fileobj,
459             parent      => $self->{obj},
460             parent_key  => $orig_key,
461         };
462         @$value = @x;
463     }
464
465     return 1;
466 }
467
468 sub split_index {
469     my $self = shift;
470     my ($md5, $tag) = @_;
471
472     my $fileobj = $self->_fileobj;
473
474     my $loc = $fileobj->request_space(
475         $self->tag_size( $self->{index_size} ),
476     );
477
478     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
479
480     my $index_tag = $self->write_tag(
481         $loc, SIG_INDEX,
482         chr(0)x$self->{index_size},
483     );
484
485     my $newtag_loc = $fileobj->request_space(
486         $self->tag_size( $self->{bucket_list_size} ),
487     );
488
489     my $keys = $tag->{content}
490              . $md5 . pack($self->{long_pack}, $newtag_loc)
491                     . pack($self->{long_pack}, 0)  # size
492                     . pack($self->{long_pack}, 0); # transaction ID
493
494     my @newloc = ();
495     BUCKET:
496     # The <= here is deliberate - we have max_buckets+1 keys to iterate
497     # through, unlike every other loop that uses max_buckets as a stop.
498     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
499         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
500
501         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
502         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
503
504         my $num = ord(substr($key, $tag->{ch} + 1, 1));
505
506         if ($newloc[$num]) {
507             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
508
509             # This is looking for the first empty spot
510             my ($subloc, $offset, $size) = $self->_find_in_buckets(
511                 { content => $subkeys }, '',
512             );
513
514             $fileobj->print_at(
515                 $newloc[$num] + $offset,
516                 $key, pack($self->{long_pack}, $old_subloc),
517             );
518
519             next;
520         }
521
522         my $loc = $fileobj->request_space(
523             $self->tag_size( $self->{bucket_list_size} ),
524         );
525
526         $fileobj->print_at(
527             $index_tag->{offset} + ($num * $self->{long_size}),
528             pack($self->{long_pack}, $loc),
529         );
530
531         my $blist_tag = $self->write_tag(
532             $loc, SIG_BLIST,
533             chr(0)x$self->{bucket_list_size},
534         );
535
536         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
537
538         $newloc[$num] = $blist_tag->{offset};
539     }
540
541     $self->_release_space(
542         $self->tag_size( $self->{bucket_list_size} ),
543         $tag->{offset} - SIG_SIZE - $self->{data_size},
544     );
545
546     return $newtag_loc;
547 }
548
549 sub read_from_loc {
550     my $self = shift;
551     my ($subloc, $orig_key) = @_;
552
553     my $fileobj = $self->_fileobj;
554
555     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
556
557     ##
558     # If value is a hash or array, return new DBM::Deep object with correct offset
559     ##
560     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
561         my $new_obj = DBM::Deep->new({
562             type        => $signature,
563             base_offset => $subloc,
564             fileobj     => $self->_fileobj,
565             parent      => $self->{obj},
566             parent_key  => $orig_key,
567         });
568
569         if ($new_obj->_fileobj->{autobless}) {
570             ##
571             # Skip over value and plain key to see if object needs
572             # to be re-blessed
573             ##
574             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
575
576             my $size = $fileobj->read_at( undef, $self->{data_size} );
577             $size = unpack($self->{data_pack}, $size);
578             if ($size) { $fileobj->increment_pointer( $size ); }
579
580             my $bless_bit = $fileobj->read_at( undef, 1 );
581             if ( ord($bless_bit) ) {
582                 my $size = unpack(
583                     $self->{data_pack},
584                     $fileobj->read_at( undef, $self->{data_size} ),
585                 );
586
587                 if ( $size ) {
588                     $new_obj = bless $new_obj, $fileobj->read_at( undef, $size );
589                 }
590             }
591         }
592
593         return $new_obj;
594     }
595     elsif ( $signature eq SIG_INTERNAL ) {
596         my $size = $fileobj->read_at( undef, $self->{data_size} );
597         $size = unpack($self->{data_pack}, $size);
598
599         if ( $size ) {
600             my $new_loc = $fileobj->read_at( undef, $size );
601             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
602             return $self->read_from_loc( $new_loc, $orig_key );
603         }
604         else {
605             return;
606         }
607     }
608     ##
609     # Otherwise return actual value
610     ##
611     elsif ( $signature eq SIG_DATA ) {
612         my $size = $fileobj->read_at( undef, $self->{data_size} );
613         $size = unpack($self->{data_pack}, $size);
614
615         my $value = $size ? $fileobj->read_at( undef, $size ) : '';
616         return $value;
617     }
618
619     ##
620     # Key exists, but content is null
621     ##
622     return;
623 }
624
625 sub get_bucket_value {
626     ##
627     # Fetch single value given tag and MD5 digested key.
628     ##
629     my $self = shift;
630     my ($tag, $md5, $orig_key) = @_;
631
632     #ACID - This is a read. Can find exact or HEAD
633     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
634
635     if ( !$subloc ) {
636         #XXX Need to use real key
637 #        $self->add_bucket( $tag, $md5, $orig_key, undef, undef, $orig_key );
638 #        return;
639     }
640     elsif ( !$is_deleted ) {
641         return $self->read_from_loc( $subloc, $orig_key );
642     }
643
644     return;
645 }
646
647 sub delete_bucket {
648     ##
649     # Delete single key/value pair given tag and MD5 digested key.
650     ##
651     my $self = shift;
652     my ($tag, $md5, $orig_key) = @_;
653
654     #ACID - Although this is a mutation, we must find any transaction.
655     # This is because we need to mark something as deleted that is in the HEAD.
656     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
657
658     return if !$subloc;
659
660     my $fileobj = $self->_fileobj;
661
662     my @transactions;
663     if ( $fileobj->transaction_id == 0 ) {
664         @transactions = $fileobj->current_transactions;
665     }
666
667     if ( $fileobj->transaction_id == 0 ) {
668         my $value = $self->read_from_loc( $subloc, $orig_key );
669
670         for (@transactions) {
671             $fileobj->{transaction_id} = $_;
672             #XXX Need to use real key
673             $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
674             $fileobj->{transaction_id} = 0;
675         }
676         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
677
678         #XXX This needs _release_space() for the value and anything below
679         $fileobj->print_at(
680             $tag->{offset} + $offset,
681             substr( $tag->{content}, $offset + $self->{bucket_size} ),
682             chr(0) x $self->{bucket_size},
683         );
684     }
685     else {
686         $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
687     }
688
689     return 1;
690 }
691
692 sub bucket_exists {
693     ##
694     # Check existence of single key given tag and MD5 digested key.
695     ##
696     my $self = shift;
697     my ($tag, $md5) = @_;
698
699     #ACID - This is a read. Can find exact or HEAD
700     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
701     return ($subloc && !$is_deleted) && 1;
702 }
703
704 sub find_blist {
705     ##
706     # Locate offset for bucket list, given digested key
707     ##
708     my $self = shift;
709     my ($offset, $md5, $args) = @_;
710     $args = {} unless $args;
711
712     ##
713     # Locate offset for bucket list using digest index system
714     ##
715     my $tag = $self->load_tag( $offset )
716         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
717
718     my $ch = 0;
719     while ($tag->{signature} ne SIG_BLIST) {
720         my $num = ord substr($md5, $ch, 1);
721
722         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
723         $tag = $self->index_lookup( $tag, $num );
724
725         if (!$tag) {
726             return if !$args->{create};
727
728             my $loc = $self->_fileobj->request_space(
729                 $self->tag_size( $self->{bucket_list_size} ),
730             );
731
732             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
733
734             $tag = $self->write_tag(
735                 $loc, SIG_BLIST,
736                 chr(0)x$self->{bucket_list_size},
737             );
738
739             $tag->{ref_loc} = $ref_loc;
740             $tag->{ch} = $ch;
741
742             last;
743         }
744
745         $tag->{ch} = $ch++;
746         $tag->{ref_loc} = $ref_loc;
747     }
748
749     return $tag;
750 }
751
752 sub index_lookup {
753     ##
754     # Given index tag, lookup single entry in index and return .
755     ##
756     my $self = shift;
757     my ($tag, $index) = @_;
758
759     my $location = unpack(
760         $self->{long_pack},
761         substr(
762             $tag->{content},
763             $index * $self->{long_size},
764             $self->{long_size},
765         ),
766     );
767
768     if (!$location) { return; }
769
770     return $self->load_tag( $location );
771 }
772
773 sub traverse_index {
774     ##
775     # Scan index and recursively step into deeper levels, looking for next key.
776     ##
777     my $self = shift;
778     my ($obj, $offset, $ch, $force_return_next) = @_;
779
780     my $tag = $self->load_tag( $offset );
781
782     if ($tag->{signature} ne SIG_BLIST) {
783         my $content = $tag->{content};
784         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
785
786         for (my $idx = $start; $idx < (2**8); $idx++) {
787             my $subloc = unpack(
788                 $self->{long_pack},
789                 substr(
790                     $content,
791                     $idx * $self->{long_size},
792                     $self->{long_size},
793                 ),
794             );
795
796             if ($subloc) {
797                 my $result = $self->traverse_index(
798                     $obj, $subloc, $ch + 1, $force_return_next,
799                 );
800
801                 if (defined($result)) { return $result; }
802             }
803         } # index loop
804
805         $obj->{return_next} = 1;
806     }
807     # This is the bucket list
808     else {
809         my $keys = $tag->{content};
810         if ($force_return_next) { $obj->{return_next} = 1; }
811
812         ##
813         # Iterate through buckets, looking for a key match
814         ##
815         my $transaction_id = $self->_fileobj->transaction_id;
816         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
817             my ($key, $subloc, $size, $trans_id, $is_deleted) = $self->_get_key_subloc( $keys, $i );
818
819             next if $is_deleted;
820 #XXX Need to find all the copies of this key to find out if $transaction_id has it
821 #XXX marked as deleted, in use, or what.
822             next if $trans_id && $trans_id != $transaction_id;
823
824             # End of bucket list -- return to outer loop
825             if (!$subloc) {
826                 $obj->{return_next} = 1;
827                 last;
828             }
829             # Located previous key -- return next one found
830             elsif ($key eq $obj->{prev_md5}) {
831                 $obj->{return_next} = 1;
832                 next;
833             }
834             # Seek to bucket location and skip over signature
835             elsif ($obj->{return_next}) {
836                 my $fileobj = $self->_fileobj;
837
838                 # Skip over value to get to plain key
839                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
840
841                 my $size = $fileobj->read_at( undef, $self->{data_size} );
842                 $size = unpack($self->{data_pack}, $size);
843                 if ($size) { $fileobj->increment_pointer( $size ); }
844
845                 # Read in plain key and return as scalar
846                 $size = $fileobj->read_at( undef, $self->{data_size} );
847                 $size = unpack($self->{data_pack}, $size);
848
849                 my $plain_key;
850                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
851                 return $plain_key;
852             }
853         }
854
855         $obj->{return_next} = 1;
856     }
857
858     return;
859 }
860
861 sub get_next_key {
862     ##
863     # Locate next key, given digested previous one
864     ##
865     my $self = shift;
866     my ($obj) = @_;
867
868     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
869     $obj->{return_next} = 0;
870
871     ##
872     # If the previous key was not specifed, start at the top and
873     # return the first one found.
874     ##
875     if (!$obj->{prev_md5}) {
876         $obj->{prev_md5} = chr(0) x $self->{hash_size};
877         $obj->{return_next} = 1;
878     }
879
880     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
881 }
882
883 # Utilities
884
885 sub _get_key_subloc {
886     my $self = shift;
887     my ($keys, $idx) = @_;
888
889     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
890         # This is 'a', not 'A'. Please read the pack() documentation for the
891         # difference between the two and why it's important.
892         "a$self->{hash_size} $self->{long_pack}2 n2",
893         substr(
894             $keys,
895             ($idx * $self->{bucket_size}),
896             $self->{bucket_size},
897         ),
898     );
899
900     return ($key, $subloc, $size, $transaction_id, $is_deleted);
901 }
902
903 sub _find_in_buckets {
904     my $self = shift;
905     my ($tag, $md5, $exact) = @_;
906     $exact ||= 0;
907
908     my $trans_id = $self->_fileobj->transaction_id;
909
910     my @zero;
911
912     BUCKET:
913     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
914         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
915             $tag->{content}, $i,
916         );
917
918         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
919
920         unless ( $subloc ) {
921             if ( !$exact && @zero && $trans_id ) {
922                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
923             }
924             return @rv;
925         }
926
927         next BUCKET if $key ne $md5;
928
929         # Save off the HEAD in case we need it.
930         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
931
932         next BUCKET if $transaction_id != $trans_id;
933
934         return @rv;
935     }
936
937     return;
938 }
939
940 sub _release_space {
941     my $self = shift;
942     my ($size, $loc) = @_;
943
944     my $next_loc = 0;
945
946     $self->_fileobj->print_at( $loc,
947         SIG_FREE, 
948         pack($self->{long_pack}, $size ),
949         pack($self->{long_pack}, $next_loc ),
950     );
951
952     return;
953 }
954
955 sub _throw_error {
956     die "DBM::Deep: $_[1]\n";
957 }
958
959 sub _get_dbm_object {
960     my $item = shift;
961
962     my $obj = eval {
963         local $SIG{__DIE__};
964         if ($item->isa( 'DBM::Deep' )) {
965             return $item;
966         }
967         return;
968     };
969     return $obj if $obj;
970
971     my $r = Scalar::Util::reftype( $item ) || '';
972     if ( $r eq 'HASH' ) {
973         my $obj = eval {
974             local $SIG{__DIE__};
975             my $obj = tied(%$item);
976             if ($obj->isa( 'DBM::Deep' )) {
977                 return $obj;
978             }
979             return;
980         };
981         return $obj if $obj;
982     }
983     elsif ( $r eq 'ARRAY' ) {
984         my $obj = eval {
985             local $SIG{__DIE__};
986             my $obj = tied(@$item);
987             if ($obj->isa( 'DBM::Deep' )) {
988                 return $obj;
989             }
990             return;
991         };
992         return $obj if $obj;
993     }
994
995     return;
996 }
997
998 sub _length_needed {
999     my $self = shift;
1000     my ($value, $key) = @_;
1001
1002     my $is_dbm_deep = eval {
1003         local $SIG{'__DIE__'};
1004         $value->isa( 'DBM::Deep' );
1005     };
1006
1007     my $len = SIG_SIZE
1008             + $self->{data_size} # size for value
1009             + $self->{data_size} # size for key
1010             + length( $key );    # length of key
1011
1012     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
1013         # long_size is for the internal reference
1014         return $len + $self->{long_size};
1015     }
1016
1017     if ( $self->_fileobj->{autobless} ) {
1018         # This is for the bit saying whether or not this thing is blessed.
1019         $len += 1;
1020     }
1021
1022     my $r = Scalar::Util::reftype( $value ) || '';
1023     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1024         if ( defined $value ) {
1025             $len += length( $value );
1026         }
1027         return $len;
1028     }
1029
1030     $len += $self->{index_size};
1031
1032     # if autobless is enabled, must also take into consideration
1033     # the class name as it is stored after the key.
1034     if ( $self->_fileobj->{autobless} ) {
1035         my $c = Scalar::Util::blessed($value);
1036         if ( defined $c && !$is_dbm_deep ) {
1037             $len += $self->{data_size} + length($c);
1038         }
1039     }
1040
1041     return $len;
1042 }
1043
1044 1;
1045 __END__