r11705@rob-kinyons-powerbook58: rob | 2006-05-01 13:19:45 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_01);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18
19 ##
20 # Setup file and tag signatures.  These should never change.
21 ##
22 sub SIG_FILE     () { 'DPDB' }
23 sub SIG_HEADER   () { 'h'    }
24 sub SIG_INTERNAL () { 'i'    }
25 sub SIG_HASH     () { 'H'    }
26 sub SIG_ARRAY    () { 'A'    }
27 sub SIG_NULL     () { 'N'    }
28 sub SIG_DATA     () { 'D'    }
29 sub SIG_INDEX    () { 'I'    }
30 sub SIG_BLIST    () { 'B'    }
31 sub SIG_FREE     () { 'F'    }
32 sub SIG_KEYS     () { 'K'    }
33 sub SIG_SIZE     () {  1     }
34
35 sub new {
36     my $class = shift;
37     my ($args) = @_;
38
39     my $self = bless {
40         long_size => 4,
41         long_pack => 'N',
42         data_size => 4,
43         data_pack => 'N',
44
45         digest    => \&Digest::MD5::md5,
46         hash_size => 16,
47
48         ##
49         # Maximum number of buckets per blist before another level of indexing is
50         # done. Increase this value for slightly greater speed, but larger database
51         # files. DO NOT decrease this value below 16, due to risk of recursive
52         # reindex overrun.
53         ##
54         max_buckets => 16,
55
56         fileobj => undef,
57         obj     => undef,
58     }, $class;
59
60     if ( defined $args->{pack_size} ) {
61         if ( lc $args->{pack_size} eq 'small' ) {
62             $args->{long_size} = 2;
63             $args->{long_pack} = 'n';
64         }
65         elsif ( lc $args->{pack_size} eq 'medium' ) {
66             $args->{long_size} = 4;
67             $args->{long_pack} = 'N';
68         }
69         elsif ( lc $args->{pack_size} eq 'large' ) {
70             $args->{long_size} = 8;
71             $args->{long_pack} = 'Q';
72         }
73         else {
74             die "Unknown pack_size value: '$args->{pack_size}'\n";
75         }
76     }
77
78     # Grab the parameters we want to use
79     foreach my $param ( keys %$self ) {
80         next unless exists $args->{$param};
81         $self->{$param} = $args->{$param};
82     }
83     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
84
85     if ( $self->{max_buckets} < 16 ) {
86         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
87         $self->{max_buckets} = 16;
88     }
89
90     return $self;
91 }
92
93 sub _fileobj { return $_[0]{fileobj} }
94
95 sub calculate_sizes {
96     my $self = shift;
97
98     # The 2**8 here indicates the number of different characters in the
99     # current hashing algorithm
100     #XXX Does this need to be updated with different hashing algorithms?
101     $self->{hash_chars_used}  = (2**8);
102     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
103
104     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
105     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
106
107     $self->{key_size}         = $self->{long_size} * 2;
108     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
109
110     return;
111 }
112
113 sub write_file_header {
114     my $self = shift;
115
116     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 33 );
117
118     $self->_fileobj->print_at( $loc,
119         SIG_FILE,
120         SIG_HEADER,
121         pack('N', 1),  # header version
122         pack('N', 24), # header size
123         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
124         pack('n', $self->{long_size}),
125         pack('A', $self->{long_pack}),
126         pack('n', $self->{data_size}),
127         pack('A', $self->{data_pack}),
128         pack('n', $self->{max_buckets}),
129     );
130
131     $self->_fileobj->set_transaction_offset( 13 );
132
133     return;
134 }
135
136 sub read_file_header {
137     my $self = shift;
138
139     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
140     return unless length($buffer);
141
142     my ($file_signature, $sig_header, $header_version, $size) = unpack(
143         'A4 A N N', $buffer
144     );
145
146     unless ( $file_signature eq SIG_FILE ) {
147         $self->_fileobj->close;
148         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
149     }
150
151     unless ( $sig_header eq SIG_HEADER ) {
152         $self->_fileobj->close;
153         $self->_throw_error( "Old file version found." );
154     }
155
156     my $buffer2 = $self->_fileobj->read_at( undef, $size );
157     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
158
159     $self->_fileobj->set_transaction_offset( 13 );
160
161     if ( @values < 5 || grep { !defined } @values ) {
162         $self->_fileobj->close;
163         $self->_throw_error("Corrupted file - bad header");
164     }
165
166     #XXX Add warnings if values weren't set right
167     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
168
169     return length($buffer) + length($buffer2);
170 }
171
172 sub setup_fh {
173     my $self = shift;
174     my ($obj) = @_;
175
176     # Need to remove use of $fh here
177     my $fh = $self->_fileobj->{fh};
178     flock $fh, LOCK_EX;
179
180     #XXX The duplication of calculate_sizes needs to go away
181     unless ( $obj->{base_offset} ) {
182         my $bytes_read = $self->read_file_header;
183
184         $self->calculate_sizes;
185
186         ##
187         # File is empty -- write header and master index
188         ##
189         if (!$bytes_read) {
190             $self->_fileobj->audit( "# Database created on" );
191
192             $self->write_file_header;
193
194             $obj->{base_offset} = $self->_fileobj->request_space(
195                 $self->tag_size( $self->{index_size} ),
196             );
197
198             $self->write_tag(
199                 $obj->_base_offset, $obj->_type,
200                 chr(0)x$self->{index_size},
201             );
202
203             # Flush the filehandle
204             my $old_fh = select $fh;
205             my $old_af = $|; $| = 1; $| = $old_af;
206             select $old_fh;
207         }
208         else {
209             $obj->{base_offset} = $bytes_read;
210
211             ##
212             # Get our type from master index header
213             ##
214             my $tag = $self->load_tag($obj->_base_offset);
215             unless ( $tag ) {
216                 flock $fh, LOCK_UN;
217                 $self->_throw_error("Corrupted file, no master index record");
218             }
219
220             unless ($obj->_type eq $tag->{signature}) {
221                 flock $fh, LOCK_UN;
222                 $self->_throw_error("File type mismatch");
223             }
224         }
225     }
226     else {
227         $self->calculate_sizes;
228     }
229
230     #XXX We have to make sure we don't mess up when autoflush isn't turned on
231     $self->_fileobj->set_inode;
232
233     flock $fh, LOCK_UN;
234
235     return 1;
236 }
237
238 sub tag_size {
239     my $self = shift;
240     my ($size) = @_;
241     return SIG_SIZE + $self->{data_size} + $size;
242 }
243
244 sub write_tag {
245     ##
246     # Given offset, signature and content, create tag and write to disk
247     ##
248     my $self = shift;
249     my ($offset, $sig, $content) = @_;
250     my $size = length( $content );
251
252     $self->_fileobj->print_at(
253         $offset, 
254         $sig, pack($self->{data_pack}, $size), $content,
255     );
256
257     return unless defined $offset;
258
259     return {
260         signature => $sig,
261         #XXX Is this even used?
262         size      => $size,
263         offset    => $offset + SIG_SIZE + $self->{data_size},
264         content   => $content
265     };
266 }
267
268 sub load_tag {
269     ##
270     # Given offset, load single tag and return signature, size and data
271     ##
272     my $self = shift;
273     my ($offset) = @_;
274
275     my $fileobj = $self->_fileobj;
276
277     my ($sig, $size) = unpack(
278         "A $self->{data_pack}",
279         $fileobj->read_at( $offset, SIG_SIZE + $self->{data_size} ),
280     );
281
282     return {
283         signature => $sig,
284         #XXX Is this even used?
285         size      => $size,
286         offset    => $offset + SIG_SIZE + $self->{data_size},
287         content   => $fileobj->read_at( undef, $size ),
288     };
289 }
290
291 sub find_keyloc {
292     my $self = shift;
293     my ($tag, $transaction_id) = @_;
294     $transaction_id = $self->_fileobj->transaction_id
295         unless defined $transaction_id;
296
297     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
298         my ($loc, $trans_id, $is_deleted) = unpack(
299             "$self->{long_pack} C C",
300             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
301         );
302
303         if ( $loc == 0 ) {
304             return ( $loc, $is_deleted, $i * $self->{key_size} );
305         }
306
307         next if $transaction_id != $trans_id;
308
309         return ( $loc, $is_deleted, $i * $self->{key_size} );
310     }
311
312     return;
313 }
314
315 sub add_bucket {
316     ##
317     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
318     # plain (undigested) key and value.
319     ##
320     my $self = shift;
321     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
322
323     # This verifies that only supported values will be stored.
324     {
325         my $r = Scalar::Util::reftype( $value );
326
327         last if !defined $r;
328         last if $r eq 'HASH';
329         last if $r eq 'ARRAY';
330
331         $self->_throw_error(
332             "Storage of references of type '$r' is not supported."
333         );
334     }
335
336     my $fileobj = $self->_fileobj;
337
338     #ACID - This is a mutation. Must only find the exact transaction
339     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
340
341     my @transactions;
342     if ( $fileobj->transaction_id == 0 ) {
343         @transactions = $fileobj->current_transactions;
344     }
345
346 #    $self->_release_space( $size, $subloc );
347 #XXX This needs updating to use _release_space
348
349     my $location;
350     my $size = $self->_length_needed( $value, $plain_key );
351
352     # Updating a known md5
353     if ( $keyloc ) {
354         my $keytag = $self->load_tag( $keyloc );
355         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
356
357         if ( $subloc && !$is_deleted && @transactions ) {
358             my $old_value = $self->read_from_loc( $subloc, $orig_key );
359             my $old_size = $self->_length_needed( $old_value, $plain_key );
360
361             for my $trans_id ( @transactions ) {
362                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
363                 unless ($loc) {
364                     my $location2 = $fileobj->request_space( $old_size );
365                     $fileobj->print_at( $keytag->{offset} + $offset2,
366                         pack($self->{long_pack}, $location2 ),
367                         pack( 'C C', $trans_id, 0 ),
368                     );
369                     $self->write_value( $location2, $plain_key, $old_value, $orig_key );
370                 }
371             }
372         }
373
374         $location = $self->_fileobj->request_space( $size );
375         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
376         $fileobj->print_at( $keytag->{offset} + $offset,
377             pack($self->{long_pack}, $location ),
378             pack( 'C C', $fileobj->transaction_id, 0 ),
379         );
380     }
381     # Adding a new md5
382     else {
383         my $keyloc = $fileobj->request_space( $self->tag_size( $self->{keyloc_size} ) );
384
385         # The bucket fit into list
386         if ( defined $offset ) {
387             $fileobj->print_at( $tag->{offset} + $offset,
388                 $md5, pack( $self->{long_pack}, $keyloc ),
389             );
390         }
391         # If bucket didn't fit into list, split into a new index level
392         else {
393             $self->split_index( $tag, $md5, $keyloc );
394         }
395
396         my $keytag = $self->write_tag(
397             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
398         );
399
400         $location = $self->_fileobj->request_space( $size );
401         $fileobj->print_at( $keytag->{offset},
402             pack( $self->{long_pack}, $location ),
403             pack( 'C C', $fileobj->transaction_id, 0 ),
404         );
405
406         my $offset = 1;
407         for my $trans_id ( @transactions ) {
408             $fileobj->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
409                 pack( $self->{long_pack}, 0 ),
410                 pack( 'C C', $trans_id, 1 ),
411             );
412         }
413     }
414
415     $self->write_value( $location, $plain_key, $value, $orig_key );
416
417     return 1;
418 }
419
420 sub write_value {
421     my $self = shift;
422     my ($location, $key, $value, $orig_key) = @_;
423
424     my $fileobj = $self->_fileobj;
425
426     my $dbm_deep_obj = _get_dbm_object( $value );
427     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
428         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
429     }
430
431     ##
432     # Write signature based on content type, set content length and write
433     # actual value.
434     ##
435     my $r = Scalar::Util::reftype( $value ) || '';
436     if ( $dbm_deep_obj ) {
437         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
438     }
439     elsif ($r eq 'HASH') {
440         if ( !$dbm_deep_obj && tied %{$value} ) {
441             $self->_throw_error( "Cannot store something that is tied" );
442         }
443         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
444     }
445     elsif ($r eq 'ARRAY') {
446         if ( !$dbm_deep_obj && tied @{$value} ) {
447             $self->_throw_error( "Cannot store something that is tied" );
448         }
449         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
450     }
451     elsif (!defined($value)) {
452         $self->write_tag( $location, SIG_NULL, '' );
453     }
454     else {
455         $self->write_tag( $location, SIG_DATA, $value );
456     }
457
458     ##
459     # Plain key is stored AFTER value, as keys are typically fetched less often.
460     ##
461     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
462
463     # Internal references don't care about autobless
464     return 1 if $dbm_deep_obj;
465
466     ##
467     # If value is blessed, preserve class name
468     ##
469     if ( $fileobj->{autobless} ) {
470         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
471             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
472         }
473         else {
474             $fileobj->print_at( undef, chr(0) );
475         }
476     }
477
478     ##
479     # Tie the passed in reference so that changes to it are reflected in the
480     # datafile. The use of $location as the base_offset will act as the
481     # the linkage between parent and child.
482     #
483     # The overall assignment is a hack around the fact that just tying doesn't
484     # store the values. This may not be the wrong thing to do.
485     ##
486     if ($r eq 'HASH') {
487         my %x = %$value;
488         tie %$value, 'DBM::Deep', {
489             base_offset => $location,
490             fileobj     => $fileobj,
491             parent      => $self->{obj},
492             parent_key  => $orig_key,
493         };
494         %$value = %x;
495     }
496     elsif ($r eq 'ARRAY') {
497         my @x = @$value;
498         tie @$value, 'DBM::Deep', {
499             base_offset => $location,
500             fileobj     => $fileobj,
501             parent      => $self->{obj},
502             parent_key  => $orig_key,
503         };
504         @$value = @x;
505     }
506
507     return 1;
508 }
509
510 sub split_index {
511     my $self = shift;
512     my ($tag, $md5, $keyloc) = @_;
513
514     my $fileobj = $self->_fileobj;
515
516     my $loc = $fileobj->request_space(
517         $self->tag_size( $self->{index_size} ),
518     );
519
520     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
521
522     my $index_tag = $self->write_tag(
523         $loc, SIG_INDEX,
524         chr(0)x$self->{index_size},
525     );
526
527     my $keys = $tag->{content}
528              . $md5 . pack($self->{long_pack}, $keyloc);
529
530     my @newloc = ();
531     BUCKET:
532     # The <= here is deliberate - we have max_buckets+1 keys to iterate
533     # through, unlike every other loop that uses max_buckets as a stop.
534     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
535         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
536
537         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
538         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
539
540         my $num = ord(substr($key, $tag->{ch} + 1, 1));
541
542         if ($newloc[$num]) {
543             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
544
545             # This is looking for the first empty spot
546             my ($subloc, $offset) = $self->_find_in_buckets(
547                 { content => $subkeys }, '',
548             );
549
550             $fileobj->print_at(
551                 $newloc[$num] + $offset,
552                 $key, pack($self->{long_pack}, $old_subloc),
553             );
554
555             next;
556         }
557
558         my $loc = $fileobj->request_space(
559             $self->tag_size( $self->{bucket_list_size} ),
560         );
561
562         $fileobj->print_at(
563             $index_tag->{offset} + ($num * $self->{long_size}),
564             pack($self->{long_pack}, $loc),
565         );
566
567         my $blist_tag = $self->write_tag(
568             $loc, SIG_BLIST,
569             chr(0)x$self->{bucket_list_size},
570         );
571
572         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
573
574         $newloc[$num] = $blist_tag->{offset};
575     }
576
577     $self->_release_space(
578         $self->tag_size( $self->{bucket_list_size} ),
579         $tag->{offset} - SIG_SIZE - $self->{data_size},
580     );
581
582     return 1;
583 }
584
585 sub read_from_loc {
586     my $self = shift;
587     my ($subloc, $orig_key) = @_;
588
589     my $fileobj = $self->_fileobj;
590
591     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
592
593     ##
594     # If value is a hash or array, return new DBM::Deep object with correct offset
595     ##
596     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
597         my $new_obj = DBM::Deep->new({
598             type        => $signature,
599             base_offset => $subloc,
600             fileobj     => $self->_fileobj,
601             parent      => $self->{obj},
602             parent_key  => $orig_key,
603         });
604
605         if ($new_obj->_fileobj->{autobless}) {
606             ##
607             # Skip over value and plain key to see if object needs
608             # to be re-blessed
609             ##
610             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
611
612             my $size = $fileobj->read_at( undef, $self->{data_size} );
613             $size = unpack($self->{data_pack}, $size);
614             if ($size) { $fileobj->increment_pointer( $size ); }
615
616             my $bless_bit = $fileobj->read_at( undef, 1 );
617             if ( ord($bless_bit) ) {
618                 my $size = unpack(
619                     $self->{data_pack},
620                     $fileobj->read_at( undef, $self->{data_size} ),
621                 );
622
623                 if ( $size ) {
624                     $new_obj = bless $new_obj, $fileobj->read_at( undef, $size );
625                 }
626             }
627         }
628
629         return $new_obj;
630     }
631     elsif ( $signature eq SIG_INTERNAL ) {
632         my $size = $fileobj->read_at( undef, $self->{data_size} );
633         $size = unpack($self->{data_pack}, $size);
634
635         if ( $size ) {
636             my $new_loc = $fileobj->read_at( undef, $size );
637             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
638             return $self->read_from_loc( $new_loc, $orig_key );
639         }
640         else {
641             return;
642         }
643     }
644     ##
645     # Otherwise return actual value
646     ##
647     elsif ( $signature eq SIG_DATA ) {
648         my $size = $fileobj->read_at( undef, $self->{data_size} );
649         $size = unpack($self->{data_pack}, $size);
650
651         my $value = $size ? $fileobj->read_at( undef, $size ) : '';
652         return $value;
653     }
654
655     ##
656     # Key exists, but content is null
657     ##
658     return;
659 }
660
661 sub get_bucket_value {
662     ##
663     # Fetch single value given tag and MD5 digested key.
664     ##
665     my $self = shift;
666     my ($tag, $md5, $orig_key) = @_;
667
668     #ACID - This is a read. Can find exact or HEAD
669     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
670
671     if ( !$keyloc ) {
672         #XXX Need to use real key
673 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
674 #        return;
675     }
676 #    elsif ( !$is_deleted ) {
677     else {
678         my $keytag = $self->load_tag( $keyloc );
679         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
680         if (!$subloc && !$is_deleted) {
681             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
682         }
683         if ( $subloc && !$is_deleted ) {
684             return $self->read_from_loc( $subloc, $orig_key );
685         }
686     }
687
688     return;
689 }
690
691 sub delete_bucket {
692     ##
693     # Delete single key/value pair given tag and MD5 digested key.
694     ##
695     my $self = shift;
696     my ($tag, $md5, $orig_key) = @_;
697
698     #ACID - Although this is a mutation, we must find any transaction.
699     # This is because we need to mark something as deleted that is in the HEAD.
700     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
701
702     return if !$keyloc;
703
704     my $fileobj = $self->_fileobj;
705
706     my @transactions;
707     if ( $fileobj->transaction_id == 0 ) {
708         @transactions = $fileobj->current_transactions;
709     }
710
711     if ( $fileobj->transaction_id == 0 ) {
712         my $keytag = $self->load_tag( $keyloc );
713
714         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
715         return if !$subloc || $is_deleted;
716
717         my $value = $self->read_from_loc( $subloc, $orig_key );
718
719         my $size = $self->_length_needed( $value, $orig_key );
720
721         for my $trans_id ( @transactions ) {
722             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
723             unless ($loc) {
724                 my $location2 = $fileobj->request_space( $size );
725                 $fileobj->print_at( $keytag->{offset} + $offset2,
726                     pack($self->{long_pack}, $location2 ),
727                     pack( 'C C', $trans_id, 0 ),
728                 );
729                 $self->write_value( $location2, $orig_key, $value, $orig_key );
730             }
731         }
732
733         $keytag = $self->load_tag( $keyloc );
734         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
735         $fileobj->print_at( $keytag->{offset} + $offset,
736             substr( $keytag->{content}, $offset + $self->{key_size} ),
737             chr(0) x $self->{key_size},
738         );
739     }
740     else {
741         my $keytag = $self->load_tag( $keyloc );
742
743         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
744
745         $fileobj->print_at( $keytag->{offset} + $offset,
746             pack($self->{long_pack}, 0 ),
747             pack( 'C C', $fileobj->transaction_id, 1 ),
748         );
749     }
750
751     return 1;
752 }
753
754 sub bucket_exists {
755     ##
756     # Check existence of single key given tag and MD5 digested key.
757     ##
758     my $self = shift;
759     my ($tag, $md5) = @_;
760
761     #ACID - This is a read. Can find exact or HEAD
762     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
763     my $keytag = $self->load_tag( $keyloc );
764     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
765     if ( !$subloc && !$is_deleted ) {
766         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
767     }
768     return ($subloc && !$is_deleted) && 1;
769 }
770
771 sub find_blist {
772     ##
773     # Locate offset for bucket list, given digested key
774     ##
775     my $self = shift;
776     my ($offset, $md5, $args) = @_;
777     $args = {} unless $args;
778
779     ##
780     # Locate offset for bucket list using digest index system
781     ##
782     my $tag = $self->load_tag( $offset )
783         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
784
785     my $ch = 0;
786     while ($tag->{signature} ne SIG_BLIST) {
787         my $num = ord substr($md5, $ch, 1);
788
789         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
790         $tag = $self->index_lookup( $tag, $num );
791
792         if (!$tag) {
793             return if !$args->{create};
794
795             my $loc = $self->_fileobj->request_space(
796                 $self->tag_size( $self->{bucket_list_size} ),
797             );
798
799             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
800
801             $tag = $self->write_tag(
802                 $loc, SIG_BLIST,
803                 chr(0)x$self->{bucket_list_size},
804             );
805
806             $tag->{ref_loc} = $ref_loc;
807             $tag->{ch} = $ch;
808
809             last;
810         }
811
812         $tag->{ch} = $ch++;
813         $tag->{ref_loc} = $ref_loc;
814     }
815
816     return $tag;
817 }
818
819 sub index_lookup {
820     ##
821     # Given index tag, lookup single entry in index and return .
822     ##
823     my $self = shift;
824     my ($tag, $index) = @_;
825
826     my $location = unpack(
827         $self->{long_pack},
828         substr(
829             $tag->{content},
830             $index * $self->{long_size},
831             $self->{long_size},
832         ),
833     );
834
835     if (!$location) { return; }
836
837     return $self->load_tag( $location );
838 }
839
840 sub traverse_index {
841     ##
842     # Scan index and recursively step into deeper levels, looking for next key.
843     ##
844     my $self = shift;
845     my ($xxxx, $offset, $ch, $force_return_next) = @_;
846
847     my $tag = $self->load_tag( $offset );
848
849     if ($tag->{signature} ne SIG_BLIST) {
850         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
851
852         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
853             my $subloc = unpack(
854                 $self->{long_pack},
855                 substr(
856                     $tag->{content},
857                     $idx * $self->{long_size},
858                     $self->{long_size},
859                 ),
860             );
861
862             if ($subloc) {
863                 my $result = $self->traverse_index(
864                     $xxxx, $subloc, $ch + 1, $force_return_next,
865                 );
866
867                 if (defined $result) { return $result; }
868             }
869         } # index loop
870
871         $xxxx->{return_next} = 1;
872     }
873     # This is the bucket list
874     else {
875         my $keys = $tag->{content};
876         if ($force_return_next) { $xxxx->{return_next} = 1; }
877
878         ##
879         # Iterate through buckets, looking for a key match
880         ##
881         my $transaction_id = $self->_fileobj->transaction_id;
882         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
883             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
884
885             # End of bucket list -- return to outer loop
886             if (!$keyloc) {
887                 $xxxx->{return_next} = 1;
888                 last;
889             }
890             # Located previous key -- return next one found
891             elsif ($key eq $xxxx->{prev_md5}) {
892                 $xxxx->{return_next} = 1;
893                 next;
894             }
895             # Seek to bucket location and skip over signature
896             elsif ($xxxx->{return_next}) {
897                 my $fileobj = $self->_fileobj;
898
899                 my $keytag = $self->load_tag( $keyloc );
900                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
901                 if ( $subloc == 0 && !$is_deleted ) {
902                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
903                 }
904                 next if $is_deleted;
905
906                 # Skip over value to get to plain key
907                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
908
909                 my $size = $fileobj->read_at( undef, $self->{data_size} );
910                 $size = unpack($self->{data_pack}, $size);
911                 if ($size) { $fileobj->increment_pointer( $size ); }
912
913                 # Read in plain key and return as scalar
914                 $size = $fileobj->read_at( undef, $self->{data_size} );
915                 $size = unpack($self->{data_pack}, $size);
916
917                 my $plain_key;
918                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
919                 return $plain_key;
920             }
921         }
922
923         $xxxx->{return_next} = 1;
924     }
925
926     return;
927 }
928
929 sub get_next_key {
930     ##
931     # Locate next key, given digested previous one
932     ##
933     my $self = shift;
934     my ($obj) = @_;
935
936     ##
937     # If the previous key was not specifed, start at the top and
938     # return the first one found.
939     ##
940     my $temp;
941     if ( @_ > 1 ) {
942         $temp = {
943             prev_md5    => $_[1],
944             return_next => 0,
945         };
946     }
947     else {
948         $temp = {
949             prev_md5    => chr(0) x $self->{hash_size},
950             return_next => 1,
951         };
952     }
953
954     return $self->traverse_index( $temp, $obj->_base_offset, 0 );
955 }
956
957 # Utilities
958
959 sub _get_key_subloc {
960     my $self = shift;
961     my ($keys, $idx) = @_;
962
963     return unpack(
964         # This is 'a', not 'A'. Please read the pack() documentation for the
965         # difference between the two and why it's important.
966         "a$self->{hash_size} $self->{long_pack}",
967         substr(
968             $keys,
969             ($idx * $self->{bucket_size}),
970             $self->{bucket_size},
971         ),
972     );
973 }
974
975 sub _find_in_buckets {
976     my $self = shift;
977     my ($tag, $md5) = @_;
978
979     BUCKET:
980     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
981         my ($key, $subloc) = $self->_get_key_subloc(
982             $tag->{content}, $i,
983         );
984
985         my @rv = ($subloc, $i * $self->{bucket_size});
986
987         unless ( $subloc ) {
988             return @rv;
989         }
990
991         next BUCKET if $key ne $md5;
992
993         return @rv;
994     }
995
996     return;
997 }
998
999 sub _release_space {
1000     my $self = shift;
1001     my ($size, $loc) = @_;
1002
1003     my $next_loc = 0;
1004
1005     $self->_fileobj->print_at( $loc,
1006         SIG_FREE, 
1007         pack($self->{long_pack}, $size ),
1008         pack($self->{long_pack}, $next_loc ),
1009     );
1010
1011     return;
1012 }
1013
1014 sub _throw_error {
1015     die "DBM::Deep: $_[1]\n";
1016 }
1017
1018 sub _get_dbm_object {
1019     my $item = shift;
1020
1021     my $obj = eval {
1022         local $SIG{__DIE__};
1023         if ($item->isa( 'DBM::Deep' )) {
1024             return $item;
1025         }
1026         return;
1027     };
1028     return $obj if $obj;
1029
1030     my $r = Scalar::Util::reftype( $item ) || '';
1031     if ( $r eq 'HASH' ) {
1032         my $obj = eval {
1033             local $SIG{__DIE__};
1034             my $obj = tied(%$item);
1035             if ($obj->isa( 'DBM::Deep' )) {
1036                 return $obj;
1037             }
1038             return;
1039         };
1040         return $obj if $obj;
1041     }
1042     elsif ( $r eq 'ARRAY' ) {
1043         my $obj = eval {
1044             local $SIG{__DIE__};
1045             my $obj = tied(@$item);
1046             if ($obj->isa( 'DBM::Deep' )) {
1047                 return $obj;
1048             }
1049             return;
1050         };
1051         return $obj if $obj;
1052     }
1053
1054     return;
1055 }
1056
1057 sub _length_needed {
1058     my $self = shift;
1059     my ($value, $key) = @_;
1060
1061     my $is_dbm_deep = eval {
1062         local $SIG{'__DIE__'};
1063         $value->isa( 'DBM::Deep' );
1064     };
1065
1066     my $len = SIG_SIZE
1067             + $self->{data_size} # size for value
1068             + $self->{data_size} # size for key
1069             + length( $key );    # length of key
1070
1071     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
1072         # long_size is for the internal reference
1073         return $len + $self->{long_size};
1074     }
1075
1076     if ( $self->_fileobj->{autobless} ) {
1077         # This is for the bit saying whether or not this thing is blessed.
1078         $len += 1;
1079     }
1080
1081     my $r = Scalar::Util::reftype( $value ) || '';
1082     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1083         if ( defined $value ) {
1084             $len += length( $value );
1085         }
1086         return $len;
1087     }
1088
1089     $len += $self->{index_size};
1090
1091     # if autobless is enabled, must also take into consideration
1092     # the class name as it is stored after the key.
1093     if ( $self->_fileobj->{autobless} ) {
1094         my $c = Scalar::Util::blessed($value);
1095         if ( defined $c && !$is_dbm_deep ) {
1096             $len += $self->{data_size} + length($c);
1097         }
1098     }
1099
1100     return $len;
1101 }
1102
1103 1;
1104 __END__