r14235@Rob-Kinyons-PowerBook: rob | 2006-06-14 22:24:47 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_03);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18 #
19 # * Every method in here assumes that the _storage has been appropriately
20 #   safeguarded. This can be anything from flock() to some sort of manual
21 #   mutex. But, it's the caller's responsability to make sure that this has
22 #   been done.
23
24 ##
25 # Setup file and tag signatures.  These should never change.
26 ##
27 sub SIG_FILE     () { 'DPDB' }
28 sub SIG_HEADER   () { 'h'    }
29 sub SIG_INTERNAL () { 'i'    }
30 sub SIG_HASH     () { 'H'    }
31 sub SIG_ARRAY    () { 'A'    }
32 sub SIG_NULL     () { 'N'    }
33 sub SIG_DATA     () { 'D'    }
34 sub SIG_INDEX    () { 'I'    }
35 sub SIG_BLIST    () { 'B'    }
36 sub SIG_FREE     () { 'F'    }
37 sub SIG_KEYS     () { 'K'    }
38 sub SIG_SIZE     () {  1     }
39
40 # This is the transaction ID for the HEAD
41 sub HEAD () { 0 }
42
43 ################################################################################
44 #
45 # This is new code. It is a complete rewrite of the engine based on a new API
46 #
47 ################################################################################
48
49 sub read_value {
50     my $self = shift;
51     my ($trans_id, $offset, $key, $orig_key) = @_;
52
53     my $dig_key = $self->_apply_digest( $key );
54     my $tag = $self->find_blist( $offset, $dig_key ) or return;
55     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
56 }
57
58 sub key_exists {
59     my $self = shift;
60     my ($trans_id, $offset, $key) = @_;
61
62     my $dig_key = $self->_apply_digest( $key );
63     # exists() returns the empty string, not undef
64     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
65     return $self->bucket_exists( $tag, $dig_key, $key );
66 }
67
68 sub get_next_key {
69     my $self = shift;
70     my ($trans_id, $offset) = @_;
71
72     # If the previous key was not specifed, start at the top and
73     # return the first one found.
74     my $temp;
75     if ( @_ > 2 ) {
76         $temp = {
77             prev_md5    => $self->_apply_digest($_[2]),
78             return_next => 0,
79         };
80     }
81     else {
82         $temp = {
83             prev_md5    => chr(0) x $self->{hash_size},
84             return_next => 1,
85         };
86     }
87
88     return $self->traverse_index( $temp, $offset, 0 );
89 }
90
91 sub delete_key {
92     my $self = shift;
93     my ($trans_id, $offset, $key, $orig_key) = @_;
94
95     my $dig_key = $self->_apply_digest( $key );
96     my $tag = $self->find_blist( $offset, $dig_key ) or return;
97     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
98     $self->delete_bucket( $tag, $dig_key, $orig_key );
99     return $value;
100 }
101
102 sub write_value {
103     my $self = shift;
104     my ($trans_id, $offset, $key, $value, $orig_key) = @_;
105
106     my $dig_key = $self->_apply_digest( $key );
107     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
108     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
109 }
110
111 ################################################################################
112 #
113 # Below here is the old code. It will be folded into the code above as it can.
114 #
115 ################################################################################
116
117 sub new {
118     my $class = shift;
119     my ($args) = @_;
120
121     my $self = bless {
122         long_size => 4,
123         long_pack => 'N',
124         data_size => 4,
125         data_pack => 'N',
126
127         digest    => \&Digest::MD5::md5,
128         hash_size => 16, # In bytes
129
130         ##
131         # Number of buckets per blist before another level of indexing is
132         # done. Increase this value for slightly greater speed, but larger database
133         # files. DO NOT decrease this value below 16, due to risk of recursive
134         # reindex overrun.
135         ##
136         max_buckets => 16,
137
138         storage => undef,
139         obj     => undef,
140     }, $class;
141
142     if ( defined $args->{pack_size} ) {
143         if ( lc $args->{pack_size} eq 'small' ) {
144             $args->{long_size} = 2;
145             $args->{long_pack} = 'n';
146         }
147         elsif ( lc $args->{pack_size} eq 'medium' ) {
148             $args->{long_size} = 4;
149             $args->{long_pack} = 'N';
150         }
151         elsif ( lc $args->{pack_size} eq 'large' ) {
152             $args->{long_size} = 8;
153             $args->{long_pack} = 'Q';
154         }
155         else {
156             die "Unknown pack_size value: '$args->{pack_size}'\n";
157         }
158     }
159
160     # Grab the parameters we want to use
161     foreach my $param ( keys %$self ) {
162         next unless exists $args->{$param};
163         $self->{$param} = $args->{$param};
164     }
165     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
166
167     if ( $self->{max_buckets} < 16 ) {
168         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
169         $self->{max_buckets} = 16;
170     }
171
172     return $self;
173 }
174
175 sub _storage { return $_[0]{storage} }
176
177 sub _apply_digest {
178     my $self = shift;
179     return $self->{digest}->(@_);
180 }
181
182 sub calculate_sizes {
183     my $self = shift;
184
185     # The 2**8 here indicates the number of different characters in the
186     # current hashing algorithm
187     #XXX Does this need to be updated with different hashing algorithms?
188     $self->{hash_chars_used}  = (2**8);
189     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
190
191     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
192     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
193
194     $self->{key_size}         = $self->{long_size} * 2;
195     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
196
197     return;
198 }
199
200 sub write_file_header {
201     my $self = shift;
202
203     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
204
205     $self->_storage->print_at( $loc,
206         SIG_FILE,
207         SIG_HEADER,
208         pack('N', 1),  # header version
209         pack('N', 24), # header size
210         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
211         pack('n', $self->{long_size}),
212         pack('A', $self->{long_pack}),
213         pack('n', $self->{data_size}),
214         pack('A', $self->{data_pack}),
215         pack('n', $self->{max_buckets}),
216     );
217
218     $self->_storage->set_transaction_offset( 13 );
219
220     return;
221 }
222
223 sub read_file_header {
224     my $self = shift;
225
226     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
227     return unless length($buffer);
228
229     my ($file_signature, $sig_header, $header_version, $size) = unpack(
230         'A4 A N N', $buffer
231     );
232
233     unless ( $file_signature eq SIG_FILE ) {
234         $self->_storage->close;
235         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
236     }
237
238     unless ( $sig_header eq SIG_HEADER ) {
239         $self->_storage->close;
240         $self->_throw_error( "Old file version found." );
241     }
242
243     my $buffer2 = $self->_storage->read_at( undef, $size );
244     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
245
246     $self->_storage->set_transaction_offset( 13 );
247
248     if ( @values < 5 || grep { !defined } @values ) {
249         $self->_storage->close;
250         $self->_throw_error("Corrupted file - bad header");
251     }
252
253     #XXX Add warnings if values weren't set right
254     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
255
256     return length($buffer) + length($buffer2);
257 }
258
259 sub setup_fh {
260     my $self = shift;
261     my ($obj) = @_;
262
263     # Need to remove use of $fh here
264     my $fh = $self->_storage->{fh};
265     flock $fh, LOCK_EX;
266
267     #XXX The duplication of calculate_sizes needs to go away
268     unless ( $obj->{base_offset} ) {
269         my $bytes_read = $self->read_file_header;
270
271         $self->calculate_sizes;
272
273         ##
274         # File is empty -- write header and master index
275         ##
276         if (!$bytes_read) {
277             $self->_storage->audit( "# Database created on" );
278
279             $self->write_file_header;
280
281             $obj->{base_offset} = $self->_storage->request_space(
282                 $self->tag_size( $self->{index_size} ),
283             );
284
285             $self->write_tag(
286                 $obj->_base_offset, $obj->_type,
287                 chr(0)x$self->{index_size},
288             );
289
290             # Flush the filehandle
291             my $old_fh = select $fh;
292             my $old_af = $|; $| = 1; $| = $old_af;
293             select $old_fh;
294         }
295         else {
296             $obj->{base_offset} = $bytes_read;
297
298             ##
299             # Get our type from master index header
300             ##
301             my $tag = $self->load_tag($obj->_base_offset);
302             unless ( $tag ) {
303                 flock $fh, LOCK_UN;
304                 $self->_throw_error("Corrupted file, no master index record");
305             }
306
307             unless ($obj->_type eq $tag->{signature}) {
308                 flock $fh, LOCK_UN;
309                 $self->_throw_error("File type mismatch");
310             }
311         }
312     }
313     else {
314         $self->calculate_sizes;
315     }
316
317     #XXX We have to make sure we don't mess up when autoflush isn't turned on
318     $self->_storage->set_inode;
319
320     flock $fh, LOCK_UN;
321
322     return 1;
323 }
324
325 sub tag_size {
326     my $self = shift;
327     my ($size) = @_;
328     return SIG_SIZE + $self->{data_size} + $size;
329 }
330
331 sub write_tag {
332     ##
333     # Given offset, signature and content, create tag and write to disk
334     ##
335     my $self = shift;
336     my ($offset, $sig, $content) = @_;
337     my $size = length( $content );
338
339     $self->_storage->print_at(
340         $offset, 
341         $sig, pack($self->{data_pack}, $size), $content,
342     );
343
344     return unless defined $offset;
345
346     return {
347         signature => $sig,
348         #XXX Is this even used?
349         size      => $size,
350         offset    => $offset + SIG_SIZE + $self->{data_size},
351         content   => $content
352     };
353 }
354
355 sub load_tag {
356     ##
357     # Given offset, load single tag and return signature, size and data
358     ##
359     my $self = shift;
360     my ($offset) = @_;
361
362     my $storage = $self->_storage;
363
364     my ($sig, $size) = unpack(
365         "A $self->{data_pack}",
366         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
367     );
368
369     return {
370         signature => $sig,
371         size      => $size,   #XXX Is this even used?
372         start     => $offset,
373         offset    => $offset + SIG_SIZE + $self->{data_size},
374         content   => $storage->read_at( undef, $size ),
375     };
376 }
377
378 sub find_keyloc {
379     my $self = shift;
380     my ($tag, $transaction_id) = @_;
381     $transaction_id = $self->_storage->transaction_id
382         unless defined $transaction_id;
383
384     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
385         my ($loc, $trans_id, $is_deleted) = unpack(
386             "$self->{long_pack} C C",
387             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
388         );
389
390         next if $loc != HEAD && $transaction_id != $trans_id;
391         return( $loc, $is_deleted, $i * $self->{key_size} );
392     }
393
394     return;
395 }
396
397 sub add_bucket {
398     ##
399     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
400     # plain (undigested) key and value.
401     ##
402     my $self = shift;
403     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
404
405     # This verifies that only supported values will be stored.
406     {
407         my $r = Scalar::Util::reftype( $value );
408
409         last if !defined $r;
410         last if $r eq 'HASH';
411         last if $r eq 'ARRAY';
412
413         $self->_throw_error(
414             "Storage of references of type '$r' is not supported."
415         );
416     }
417
418     my $storage = $self->_storage;
419
420     #ACID - This is a mutation. Must only find the exact transaction
421     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
422
423     my @transactions;
424     if ( $storage->transaction_id == 0 ) {
425         @transactions = $storage->current_transactions;
426     }
427
428 #    $self->_release_space( $size, $subloc );
429 #XXX This needs updating to use _release_space
430
431     my $location;
432     my $size = $self->_length_needed( $value, $plain_key );
433
434     # Updating a known md5
435     if ( $keyloc ) {
436         my $keytag = $self->load_tag( $keyloc );
437         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
438
439         if ( $subloc && !$is_deleted && @transactions ) {
440             my $old_value = $self->read_from_loc( $subloc, $orig_key );
441             my $old_size = $self->_length_needed( $old_value, $plain_key );
442
443             for my $trans_id ( @transactions ) {
444                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
445                 unless ($loc) {
446                     my $location2 = $storage->request_space( $old_size );
447                     $storage->print_at( $keytag->{offset} + $offset2,
448                         pack($self->{long_pack}, $location2 ),
449                         pack( 'C C', $trans_id, 0 ),
450                     );
451                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
452                 }
453             }
454         }
455
456         $location = $self->_storage->request_space( $size );
457         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
458         $storage->print_at( $keytag->{offset} + $offset,
459             pack($self->{long_pack}, $location ),
460             pack( 'C C', $storage->transaction_id, 0 ),
461         );
462     }
463     # Adding a new md5
464     else {
465         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
466
467         # The bucket fit into list
468         if ( defined $offset ) {
469             $storage->print_at( $tag->{offset} + $offset,
470                 $md5, pack( $self->{long_pack}, $keyloc ),
471             );
472         }
473         # If bucket didn't fit into list, split into a new index level
474         else {
475             $self->split_index( $tag, $md5, $keyloc );
476         }
477
478         my $keytag = $self->write_tag(
479             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
480         );
481
482         $location = $self->_storage->request_space( $size );
483         $storage->print_at( $keytag->{offset},
484             pack( $self->{long_pack}, $location ),
485             pack( 'C C', $storage->transaction_id, 0 ),
486         );
487
488         my $offset = 1;
489         for my $trans_id ( @transactions ) {
490             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
491                 pack( $self->{long_pack}, 0 ),
492                 pack( 'C C', $trans_id, 1 ),
493             );
494         }
495     }
496
497     $self->_write_value( $location, $plain_key, $value, $orig_key );
498
499     return 1;
500 }
501
502 sub _write_value {
503     my $self = shift;
504     my ($location, $key, $value, $orig_key) = @_;
505
506     my $storage = $self->_storage;
507
508     my $dbm_deep_obj = _get_dbm_object( $value );
509     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
510         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
511     }
512
513     ##
514     # Write signature based on content type, set content length and write
515     # actual value.
516     ##
517     my $r = Scalar::Util::reftype( $value ) || '';
518     if ( $dbm_deep_obj ) {
519         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
520     }
521     elsif ($r eq 'HASH') {
522         if ( !$dbm_deep_obj && tied %{$value} ) {
523             $self->_throw_error( "Cannot store something that is tied" );
524         }
525         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
526     }
527     elsif ($r eq 'ARRAY') {
528         if ( !$dbm_deep_obj && tied @{$value} ) {
529             $self->_throw_error( "Cannot store something that is tied" );
530         }
531         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
532     }
533     elsif (!defined($value)) {
534         $self->write_tag( $location, SIG_NULL, '' );
535     }
536     else {
537         $self->write_tag( $location, SIG_DATA, $value );
538     }
539
540     ##
541     # Plain key is stored AFTER value, as keys are typically fetched less often.
542     ##
543     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
544
545     # Internal references don't care about autobless
546     return 1 if $dbm_deep_obj;
547
548     ##
549     # If value is blessed, preserve class name
550     ##
551     if ( $storage->{autobless} ) {
552         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
553             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
554         }
555         else {
556             $storage->print_at( undef, chr(0) );
557         }
558     }
559
560     ##
561     # Tie the passed in reference so that changes to it are reflected in the
562     # datafile. The use of $location as the base_offset will act as the
563     # the linkage between parent and child.
564     #
565     # The overall assignment is a hack around the fact that just tying doesn't
566     # store the values. This may not be the wrong thing to do.
567     ##
568     if ($r eq 'HASH') {
569         my %x = %$value;
570         tie %$value, 'DBM::Deep', {
571             base_offset => $location,
572             storage     => $storage,
573             parent      => $self->{obj},
574             parent_key  => $orig_key,
575         };
576         %$value = %x;
577         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
578     }
579     elsif ($r eq 'ARRAY') {
580         my @x = @$value;
581         tie @$value, 'DBM::Deep', {
582             base_offset => $location,
583             storage     => $storage,
584             parent      => $self->{obj},
585             parent_key  => $orig_key,
586         };
587         @$value = @x;
588         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
589     }
590
591     return 1;
592 }
593
594 sub split_index {
595     my $self = shift;
596     my ($tag, $md5, $keyloc) = @_;
597
598     my $storage = $self->_storage;
599
600     my $loc = $storage->request_space(
601         $self->tag_size( $self->{index_size} ),
602     );
603
604     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
605
606     my $index_tag = $self->write_tag(
607         $loc, SIG_INDEX,
608         chr(0)x$self->{index_size},
609     );
610
611     my $keys = $tag->{content}
612              . $md5 . pack($self->{long_pack}, $keyloc);
613
614     my @newloc = ();
615     BUCKET:
616     # The <= here is deliberate - we have max_buckets+1 keys to iterate
617     # through, unlike every other loop that uses max_buckets as a stop.
618     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
619         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
620
621         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
622         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
623
624         my $num = ord(substr($key, $tag->{ch} + 1, 1));
625
626         if ($newloc[$num]) {
627             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
628
629             # This is looking for the first empty spot
630             my ($subloc, $offset) = $self->_find_in_buckets(
631                 { content => $subkeys }, '',
632             );
633
634             $storage->print_at(
635                 $newloc[$num] + $offset,
636                 $key, pack($self->{long_pack}, $old_subloc),
637             );
638
639             next;
640         }
641
642         my $loc = $storage->request_space(
643             $self->tag_size( $self->{bucket_list_size} ),
644         );
645
646         $storage->print_at(
647             $index_tag->{offset} + ($num * $self->{long_size}),
648             pack($self->{long_pack}, $loc),
649         );
650
651         my $blist_tag = $self->write_tag(
652             $loc, SIG_BLIST,
653             chr(0)x$self->{bucket_list_size},
654         );
655
656         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
657
658         $newloc[$num] = $blist_tag->{offset};
659     }
660
661     $self->_release_space(
662         $self->tag_size( $self->{bucket_list_size} ),
663         $tag->{start},
664     );
665
666     return 1;
667 }
668
669 sub read_from_loc {
670     my $self = shift;
671     my ($subloc, $orig_key) = @_;
672
673     my $storage = $self->_storage;
674
675     my $signature = $storage->read_at( $subloc, SIG_SIZE );
676
677     ##
678     # If value is a hash or array, return new DBM::Deep object with correct offset
679     ##
680     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
681         #XXX This needs to be a singleton
682 #        my $new_obj;
683 #        my $is_autobless;
684 #        if ( $signature eq SIG_HASH ) {
685 #            $new_obj = {};
686 #            tie %$new_obj, 'DBM::Deep', {
687 #                base_offset => $subloc,
688 #                storage     => $self->_storage,
689 #                parent      => $self->{obj},
690 #                parent_key  => $orig_key,
691 #            };
692 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
693 #        }
694 #        else {
695 #            $new_obj = [];
696 #            tie @$new_obj, 'DBM::Deep', {
697 #                base_offset => $subloc,
698 #                storage     => $self->_storage,
699 #                parent      => $self->{obj},
700 #                parent_key  => $orig_key,
701 #            };
702 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
703 #        }
704 #
705 #        if ($is_autobless) {
706
707         my $new_obj = DBM::Deep->new({
708             type        => $signature,
709             base_offset => $subloc,
710             storage     => $self->_storage,
711             parent      => $self->{obj},
712             parent_key  => $orig_key,
713         });
714
715         if ($new_obj->_storage->{autobless}) {
716             ##
717             # Skip over value and plain key to see if object needs
718             # to be re-blessed
719             ##
720             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
721
722             my $size = $storage->read_at( undef, $self->{data_size} );
723             $size = unpack($self->{data_pack}, $size);
724             if ($size) { $storage->increment_pointer( $size ); }
725
726             my $bless_bit = $storage->read_at( undef, 1 );
727             if ( ord($bless_bit) ) {
728                 my $size = unpack(
729                     $self->{data_pack},
730                     $storage->read_at( undef, $self->{data_size} ),
731                 );
732
733                 if ( $size ) {
734                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
735                 }
736             }
737         }
738
739         return $new_obj;
740     }
741     elsif ( $signature eq SIG_INTERNAL ) {
742         my $size = $storage->read_at( undef, $self->{data_size} );
743         $size = unpack($self->{data_pack}, $size);
744
745         if ( $size ) {
746             my $new_loc = $storage->read_at( undef, $size );
747             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
748             return $self->read_from_loc( $new_loc, $orig_key );
749         }
750         else {
751             return;
752         }
753     }
754     ##
755     # Otherwise return actual value
756     ##
757     elsif ( $signature eq SIG_DATA ) {
758         my $size = $storage->read_at( undef, $self->{data_size} );
759         $size = unpack($self->{data_pack}, $size);
760
761         my $value = $size ? $storage->read_at( undef, $size ) : '';
762         return $value;
763     }
764
765     ##
766     # Key exists, but content is null
767     ##
768     return;
769 }
770
771 sub get_bucket_value {
772     ##
773     # Fetch single value given tag and MD5 digested key.
774     ##
775     my $self = shift;
776     my ($tag, $md5, $orig_key) = @_;
777
778     #ACID - This is a read. Can find exact or HEAD
779     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
780
781     if ( !$keyloc ) {
782         #XXX Need to use real key
783 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
784 #        return;
785     }
786 #    elsif ( !$is_deleted ) {
787     else {
788         my $keytag = $self->load_tag( $keyloc );
789         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
790         if (!$subloc && !$is_deleted) {
791             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
792         }
793         if ( $subloc && !$is_deleted ) {
794             return $self->read_from_loc( $subloc, $orig_key );
795         }
796     }
797
798     return;
799 }
800
801 sub delete_bucket {
802     ##
803     # Delete single key/value pair given tag and MD5 digested key.
804     ##
805     my $self = shift;
806     my ($tag, $md5, $orig_key) = @_;
807
808     #ACID - Although this is a mutation, we must find any transaction.
809     # This is because we need to mark something as deleted that is in the HEAD.
810     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
811
812     return if !$keyloc;
813
814     my $storage = $self->_storage;
815
816     my @transactions;
817     if ( $storage->transaction_id == 0 ) {
818         @transactions = $storage->current_transactions;
819     }
820
821     if ( $storage->transaction_id == 0 ) {
822         my $keytag = $self->load_tag( $keyloc );
823
824         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
825         return if !$subloc || $is_deleted;
826
827         my $value = $self->read_from_loc( $subloc, $orig_key );
828
829         my $size = $self->_length_needed( $value, $orig_key );
830
831         for my $trans_id ( @transactions ) {
832             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
833             unless ($loc) {
834                 my $location2 = $storage->request_space( $size );
835                 $storage->print_at( $keytag->{offset} + $offset2,
836                     pack($self->{long_pack}, $location2 ),
837                     pack( 'C C', $trans_id, 0 ),
838                 );
839                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
840             }
841         }
842
843         $keytag = $self->load_tag( $keyloc );
844         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
845         $storage->print_at( $keytag->{offset} + $offset,
846             substr( $keytag->{content}, $offset + $self->{key_size} ),
847             chr(0) x $self->{key_size},
848         );
849     }
850     else {
851         my $keytag = $self->load_tag( $keyloc );
852
853         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
854
855         $storage->print_at( $keytag->{offset} + $offset,
856             pack($self->{long_pack}, 0 ),
857             pack( 'C C', $storage->transaction_id, 1 ),
858         );
859     }
860
861     return 1;
862 }
863
864 sub bucket_exists {
865     ##
866     # Check existence of single key given tag and MD5 digested key.
867     ##
868     my $self = shift;
869     my ($tag, $md5) = @_;
870
871     #ACID - This is a read. Can find exact or HEAD
872     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
873     my $keytag = $self->load_tag( $keyloc );
874     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
875     if ( !$subloc && !$is_deleted ) {
876         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
877     }
878     return ($subloc && !$is_deleted) && 1;
879 }
880
881 sub find_blist {
882     ##
883     # Locate offset for bucket list, given digested key
884     ##
885     my $self = shift;
886     my ($offset, $md5, $args) = @_;
887     $args = {} unless $args;
888
889     ##
890     # Locate offset for bucket list using digest index system
891     ##
892     my $tag = $self->load_tag( $offset )
893         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
894
895     #XXX What happens when $ch >= $self->{hash_size} ??
896     for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
897         my $num = ord substr($md5, $ch, 1);
898
899         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
900         $tag = $self->index_lookup( $tag, $num );
901
902         if (!$tag) {
903             return if !$args->{create};
904
905             my $loc = $self->_storage->request_space(
906                 $self->tag_size( $self->{bucket_list_size} ),
907             );
908
909             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
910
911             $tag = $self->write_tag(
912                 $loc, SIG_BLIST,
913                 chr(0)x$self->{bucket_list_size},
914             );
915
916             $tag->{ref_loc} = $ref_loc;
917             $tag->{ch} = $ch;
918
919             last;
920         }
921
922         $tag->{ch} = $ch;
923         $tag->{ref_loc} = $ref_loc;
924     }
925
926     return $tag;
927 }
928
929 sub index_lookup {
930     ##
931     # Given index tag, lookup single entry in index and return .
932     ##
933     my $self = shift;
934     my ($tag, $index) = @_;
935
936     my $location = unpack(
937         $self->{long_pack},
938         substr(
939             $tag->{content},
940             $index * $self->{long_size},
941             $self->{long_size},
942         ),
943     );
944
945     if (!$location) { return; }
946
947     return $self->load_tag( $location );
948 }
949
950 sub traverse_index {
951     ##
952     # Scan index and recursively step into deeper levels, looking for next key.
953     ##
954     my $self = shift;
955     my ($xxxx, $offset, $ch, $force_return_next) = @_;
956
957     my $tag = $self->load_tag( $offset );
958
959     if ($tag->{signature} ne SIG_BLIST) {
960         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
961
962         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
963             my $subloc = unpack(
964                 $self->{long_pack},
965                 substr(
966                     $tag->{content},
967                     $idx * $self->{long_size},
968                     $self->{long_size},
969                 ),
970             );
971
972             if ($subloc) {
973                 my $result = $self->traverse_index(
974                     $xxxx, $subloc, $ch + 1, $force_return_next,
975                 );
976
977                 if (defined $result) { return $result; }
978             }
979         } # index loop
980
981         $xxxx->{return_next} = 1;
982     }
983     # This is the bucket list
984     else {
985         my $keys = $tag->{content};
986         if ($force_return_next) { $xxxx->{return_next} = 1; }
987
988         ##
989         # Iterate through buckets, looking for a key match
990         ##
991         my $transaction_id = $self->_storage->transaction_id;
992         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
993             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
994
995             # End of bucket list -- return to outer loop
996             if (!$keyloc) {
997                 $xxxx->{return_next} = 1;
998                 last;
999             }
1000             # Located previous key -- return next one found
1001             elsif ($key eq $xxxx->{prev_md5}) {
1002                 $xxxx->{return_next} = 1;
1003                 next;
1004             }
1005             # Seek to bucket location and skip over signature
1006             elsif ($xxxx->{return_next}) {
1007                 my $storage = $self->_storage;
1008
1009                 my $keytag = $self->load_tag( $keyloc );
1010                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1011                 if ( $subloc == 0 && !$is_deleted ) {
1012                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1013                 }
1014                 next if $is_deleted;
1015
1016                 # Skip over value to get to plain key
1017                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1018
1019                 my $size = $storage->read_at( undef, $self->{data_size} );
1020                 $size = unpack($self->{data_pack}, $size);
1021                 if ($size) { $storage->increment_pointer( $size ); }
1022
1023                 # Read in plain key and return as scalar
1024                 $size = $storage->read_at( undef, $self->{data_size} );
1025                 $size = unpack($self->{data_pack}, $size);
1026
1027                 my $plain_key;
1028                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1029                 return $plain_key;
1030             }
1031         }
1032
1033         $xxxx->{return_next} = 1;
1034     }
1035
1036     return;
1037 }
1038
1039 # Utilities
1040
1041 sub _get_key_subloc {
1042     my $self = shift;
1043     my ($keys, $idx) = @_;
1044
1045     return unpack(
1046         # This is 'a', not 'A'. Please read the pack() documentation for the
1047         # difference between the two and why it's important.
1048         "a$self->{hash_size} $self->{long_pack}",
1049         substr(
1050             $keys,
1051             ($idx * $self->{bucket_size}),
1052             $self->{bucket_size},
1053         ),
1054     );
1055 }
1056
1057 sub _find_in_buckets {
1058     my $self = shift;
1059     my ($tag, $md5) = @_;
1060
1061     BUCKET:
1062     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1063         my ($key, $subloc) = $self->_get_key_subloc(
1064             $tag->{content}, $i,
1065         );
1066
1067         next BUCKET if $subloc && $key ne $md5;
1068         return( $subloc, $i * $self->{bucket_size} );
1069     }
1070
1071     return;
1072 }
1073
1074 sub _release_space {
1075     my $self = shift;
1076     my ($size, $loc) = @_;
1077
1078     my $next_loc = 0;
1079
1080     $self->_storage->print_at( $loc,
1081         SIG_FREE, 
1082         pack($self->{long_pack}, $size ),
1083         pack($self->{long_pack}, $next_loc ),
1084     );
1085
1086     return;
1087 }
1088
1089 sub _throw_error {
1090     die "DBM::Deep: $_[1]\n";
1091 }
1092
1093 sub _get_dbm_object {
1094     my $item = shift;
1095
1096     my $obj = eval {
1097         local $SIG{__DIE__};
1098         if ($item->isa( 'DBM::Deep' )) {
1099             return $item;
1100         }
1101         return;
1102     };
1103     return $obj if $obj;
1104
1105     my $r = Scalar::Util::reftype( $item ) || '';
1106     if ( $r eq 'HASH' ) {
1107         my $obj = eval {
1108             local $SIG{__DIE__};
1109             my $obj = tied(%$item);
1110             if ($obj->isa( 'DBM::Deep' )) {
1111                 return $obj;
1112             }
1113             return;
1114         };
1115         return $obj if $obj;
1116     }
1117     elsif ( $r eq 'ARRAY' ) {
1118         my $obj = eval {
1119             local $SIG{__DIE__};
1120             my $obj = tied(@$item);
1121             if ($obj->isa( 'DBM::Deep' )) {
1122                 return $obj;
1123             }
1124             return;
1125         };
1126         return $obj if $obj;
1127     }
1128
1129     return;
1130 }
1131
1132 sub _length_needed {
1133     my $self = shift;
1134     my ($value, $key) = @_;
1135
1136     my $is_dbm_deep = eval {
1137         local $SIG{'__DIE__'};
1138         $value->isa( 'DBM::Deep' );
1139     };
1140
1141     my $len = SIG_SIZE
1142             + $self->{data_size} # size for value
1143             + $self->{data_size} # size for key
1144             + length( $key );    # length of key
1145
1146     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1147         # long_size is for the internal reference
1148         return $len + $self->{long_size};
1149     }
1150
1151     if ( $self->_storage->{autobless} ) {
1152         # This is for the bit saying whether or not this thing is blessed.
1153         $len += 1;
1154     }
1155
1156     my $r = Scalar::Util::reftype( $value ) || '';
1157     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1158         if ( defined $value ) {
1159             $len += length( $value );
1160         }
1161         return $len;
1162     }
1163
1164     $len += $self->{index_size};
1165
1166     # if autobless is enabled, must also take into consideration
1167     # the class name as it is stored after the key.
1168     if ( $self->_storage->{autobless} ) {
1169         my $c = Scalar::Util::blessed($value);
1170         if ( defined $c && !$is_dbm_deep ) {
1171             $len += $self->{data_size} + length($c);
1172         }
1173     }
1174
1175     return $len;
1176 }
1177
1178 1;
1179 __END__