First pass at cleanup
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 #use Sub::Caller qw( load_tag );
4
5 use 5.6.0;
6
7 use strict;
8
9 our $VERSION = q(0.99_03);
10
11 use Fcntl qw( :DEFAULT :flock );
12 use Scalar::Util ();
13
14 # File-wide notes:
15 # * To add to bucket_size, make sure you modify the following:
16 #   - calculate_sizes()
17 #   - _get_key_subloc()
18 #   - add_bucket() - where the buckets are printed
19 #
20 # * Every method in here assumes that the _storage has been appropriately
21 #   safeguarded. This can be anything from flock() to some sort of manual
22 #   mutex. But, it's the caller's responsability to make sure that this has
23 #   been done.
24
25 ##
26 # Setup file and tag signatures.  These should never change.
27 ##
28 sub SIG_FILE     () { 'DPDB' }
29 sub SIG_HEADER   () { 'h'    }
30 sub SIG_INTERNAL () { 'i'    }
31 sub SIG_HASH     () { 'H'    }
32 sub SIG_ARRAY    () { 'A'    }
33 sub SIG_NULL     () { 'N'    }
34 sub SIG_DATA     () { 'D'    }
35 sub SIG_INDEX    () { 'I'    }
36 sub SIG_BLIST    () { 'B'    }
37 sub SIG_FREE     () { 'F'    }
38 sub SIG_KEYS     () { 'K'    }
39 sub SIG_SIZE     () {  1     }
40
41 # This is the transaction ID for the HEAD
42 sub HEAD () { 0 }
43
44 ################################################################################
45 #
46 # This is new code. It is a complete rewrite of the engine based on a new API
47 #
48 ################################################################################
49
50 sub read_value {
51     my $self = shift;
52     my ($trans_id, $offset, $key, $orig_key) = @_;
53
54     my $dig_key = $self->_apply_digest( $key );
55     my $tag = $self->find_blist( $offset, $dig_key ) or return;
56     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
57 }
58
59 sub key_exists {
60     my $self = shift;
61     my ($trans_id, $offset, $key) = @_;
62
63     my $dig_key = $self->_apply_digest( $key );
64     # exists() returns the empty string, not undef
65     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
66     return $self->bucket_exists( $tag, $dig_key, $key );
67 }
68
69 sub get_next_key {
70     my $self = shift;
71     my ($trans_id, $offset) = @_;
72
73     # If the previous key was not specifed, start at the top and
74     # return the first one found.
75     my $temp;
76     if ( @_ > 2 ) {
77         $temp = {
78             prev_md5    => $self->_apply_digest($_[2]),
79             return_next => 0,
80         };
81     }
82     else {
83         $temp = {
84             prev_md5    => chr(0) x $self->{hash_size},
85             return_next => 1,
86         };
87     }
88
89     return $self->traverse_index( $temp, $offset, 0 );
90 }
91
92 sub delete_key {
93     my $self = shift;
94     my ($trans_id, $offset, $key, $orig_key) = @_;
95
96     my $dig_key = $self->_apply_digest( $key );
97     my $tag = $self->find_blist( $offset, $dig_key ) or return;
98     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
99     $self->delete_bucket( $tag, $dig_key, $orig_key );
100     return $value;
101 }
102
103 sub write_value {
104     my $self = shift;
105     my ($trans_id, $offset, $key, $value, $orig_key) = @_;
106
107     my $dig_key = $self->_apply_digest( $key );
108     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
109     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
110 }
111
112 ################################################################################
113 #
114 # Below here is the old code. It will be folded into the code above as it can.
115 #
116 ################################################################################
117
118 sub new {
119     my $class = shift;
120     my ($args) = @_;
121
122     my $self = bless {
123         long_size => 4,
124         long_pack => 'N',
125         data_size => 4,
126         data_pack => 'N',
127
128         digest    => \&Digest::MD5::md5,
129         hash_size => 16, # In bytes
130
131         ##
132         # Number of buckets per blist before another level of indexing is
133         # done. Increase this value for slightly greater speed, but larger database
134         # files. DO NOT decrease this value below 16, due to risk of recursive
135         # reindex overrun.
136         ##
137         max_buckets => 16,
138
139         storage => undef,
140         obj     => undef,
141     }, $class;
142
143     if ( defined $args->{pack_size} ) {
144         if ( lc $args->{pack_size} eq 'small' ) {
145             $args->{long_size} = 2;
146             $args->{long_pack} = 'n';
147         }
148         elsif ( lc $args->{pack_size} eq 'medium' ) {
149             $args->{long_size} = 4;
150             $args->{long_pack} = 'N';
151         }
152         elsif ( lc $args->{pack_size} eq 'large' ) {
153             $args->{long_size} = 8;
154             $args->{long_pack} = 'Q';
155         }
156         else {
157             die "Unknown pack_size value: '$args->{pack_size}'\n";
158         }
159     }
160
161     # Grab the parameters we want to use
162     foreach my $param ( keys %$self ) {
163         next unless exists $args->{$param};
164         $self->{$param} = $args->{$param};
165     }
166     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
167
168     if ( $self->{max_buckets} < 16 ) {
169         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
170         $self->{max_buckets} = 16;
171     }
172
173     return $self;
174 }
175
176 sub _storage { return $_[0]{storage} }
177
178 sub _apply_digest {
179     my $self = shift;
180     return $self->{digest}->(@_);
181 }
182
183 sub calculate_sizes {
184     my $self = shift;
185
186     # The 2**8 here indicates the number of different characters in the
187     # current hashing algorithm
188     #XXX Does this need to be updated with different hashing algorithms?
189     $self->{hash_chars_used}  = (2**8);
190     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
191
192     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
193     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
194
195     $self->{key_size}         = $self->{long_size} * 2;
196     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
197
198     return;
199 }
200
201 sub write_file_header {
202     my $self = shift;
203
204     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
205
206     $self->_storage->print_at( $loc,
207         SIG_FILE,
208         SIG_HEADER,
209         pack('N', 1),  # header version
210         pack('N', 24), # header size
211         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
212         pack('n', $self->{long_size}),
213         pack('A', $self->{long_pack}),
214         pack('n', $self->{data_size}),
215         pack('A', $self->{data_pack}),
216         pack('n', $self->{max_buckets}),
217     );
218
219     $self->_storage->set_transaction_offset( 13 );
220
221     return;
222 }
223
224 sub read_file_header {
225     my $self = shift;
226
227     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
228     return unless length($buffer);
229
230     my ($file_signature, $sig_header, $header_version, $size) = unpack(
231         'A4 A N N', $buffer
232     );
233
234     unless ( $file_signature eq SIG_FILE ) {
235         $self->_storage->close;
236         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
237     }
238
239     unless ( $sig_header eq SIG_HEADER ) {
240         $self->_storage->close;
241         $self->_throw_error( "Old file version found." );
242     }
243
244     my $buffer2 = $self->_storage->read_at( undef, $size );
245     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
246
247     $self->_storage->set_transaction_offset( 13 );
248
249     if ( @values < 5 || grep { !defined } @values ) {
250         $self->_storage->close;
251         $self->_throw_error("Corrupted file - bad header");
252     }
253
254     #XXX Add warnings if values weren't set right
255     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
256
257     return length($buffer) + length($buffer2);
258 }
259
260 sub setup_fh {
261     my $self = shift;
262     my ($obj) = @_;
263
264     # Need to remove use of $fh here
265     my $fh = $self->_storage->{fh};
266     flock $fh, LOCK_EX;
267
268     #XXX The duplication of calculate_sizes needs to go away
269     unless ( $obj->{base_offset} ) {
270         my $bytes_read = $self->read_file_header;
271
272         $self->calculate_sizes;
273
274         ##
275         # File is empty -- write header and master index
276         ##
277         if (!$bytes_read) {
278             $self->_storage->audit( "# Database created on" );
279
280             $self->write_file_header;
281
282             $obj->{base_offset} = $self->_storage->request_space(
283                 $self->tag_size( $self->{index_size} ),
284             );
285
286             $self->write_tag(
287                 $obj->_base_offset, $obj->_type,
288                 chr(0)x$self->{index_size},
289             );
290
291             # Flush the filehandle
292             my $old_fh = select $fh;
293             my $old_af = $|; $| = 1; $| = $old_af;
294             select $old_fh;
295         }
296         else {
297             $obj->{base_offset} = $bytes_read;
298
299             ##
300             # Get our type from master index header
301             ##
302             my $tag = $self->load_tag($obj->_base_offset);
303             unless ( $tag ) {
304                 flock $fh, LOCK_UN;
305                 $self->_throw_error("Corrupted file, no master index record");
306             }
307
308             unless ($obj->_type eq $tag->{signature}) {
309                 flock $fh, LOCK_UN;
310                 $self->_throw_error("File type mismatch");
311             }
312         }
313     }
314     else {
315         $self->calculate_sizes;
316     }
317
318     #XXX We have to make sure we don't mess up when autoflush isn't turned on
319     $self->_storage->set_inode;
320
321     flock $fh, LOCK_UN;
322
323     return 1;
324 }
325
326 sub tag_size {
327     my $self = shift;
328     my ($size) = @_;
329     return SIG_SIZE + $self->{data_size} + $size;
330 }
331
332 sub write_tag {
333     ##
334     # Given offset, signature and content, create tag and write to disk
335     ##
336     my $self = shift;
337     my ($offset, $sig, $content) = @_;
338     my $size = length( $content );
339
340     $self->_storage->print_at(
341         $offset, 
342         $sig, pack($self->{data_pack}, $size), $content,
343     );
344
345     return unless defined $offset;
346
347     return {
348         signature => $sig,
349         #XXX Is this even used?
350         size      => $size,
351         start     => $offset,
352         offset    => $offset + SIG_SIZE + $self->{data_size},
353         content   => $content,
354         is_new    => 1,
355     };
356 }
357
358 sub load_tag {
359     ##
360     # Given offset, load single tag and return signature, size and data
361     ##
362     my $self = shift;
363     my ($offset) = @_;
364     print join(":",map{$_||''}caller) . " - load_tag($offset)\n" if $::DEBUG;
365
366     my $storage = $self->_storage;
367
368     my ($sig, $size) = unpack(
369         "A $self->{data_pack}",
370         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
371     );
372
373     return {
374         signature => $sig,
375         size      => $size,   #XXX Is this even used?
376         start     => $offset,
377         offset    => $offset + SIG_SIZE + $self->{data_size},
378         content   => $storage->read_at( undef, $size ),
379     };
380 }
381
382 sub find_keyloc {
383     my $self = shift;
384     my ($tag, $transaction_id) = @_;
385     $transaction_id = $self->_storage->transaction_id
386         unless defined $transaction_id;
387
388     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
389         my ($loc, $trans_id, $is_deleted) = unpack(
390             "$self->{long_pack} C C",
391             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
392         );
393
394         next if $loc != HEAD && $transaction_id != $trans_id;
395         return( $loc, $is_deleted, $i * $self->{key_size} );
396     }
397
398     return;
399 }
400
401 sub add_bucket {
402     ##
403     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
404     # plain (undigested) key and value.
405     ##
406     my $self = shift;
407     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
408
409     # This verifies that only supported values will be stored.
410     {
411         my $r = Scalar::Util::reftype( $value );
412
413         last if !defined $r;
414         last if $r eq 'HASH';
415         last if $r eq 'ARRAY';
416
417         $self->_throw_error(
418             "Storage of references of type '$r' is not supported."
419         );
420     }
421
422     my $storage = $self->_storage;
423
424     #ACID - This is a mutation. Must only find the exact transaction
425     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
426
427     my @transactions;
428     if ( $storage->transaction_id == 0 ) {
429         @transactions = $storage->current_transactions;
430     }
431
432 #    $self->_release_space( $size, $subloc );
433 #XXX This needs updating to use _release_space
434
435     my $location;
436     my $size = $self->_length_needed( $value, $plain_key );
437
438     # Updating a known md5
439     if ( $keyloc ) {
440         my $keytag = $self->load_tag( $keyloc );
441         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
442
443         if ( $subloc && !$is_deleted && @transactions ) {
444             my $old_value = $self->read_from_loc( $subloc, $orig_key );
445             my $old_size = $self->_length_needed( $old_value, $plain_key );
446
447             for my $trans_id ( @transactions ) {
448                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
449                 unless ($loc) {
450                     my $location2 = $storage->request_space( $old_size );
451                     $storage->print_at( $keytag->{offset} + $offset2,
452                         pack($self->{long_pack}, $location2 ),
453                         pack( 'C C', $trans_id, 0 ),
454                     );
455                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
456                 }
457             }
458         }
459
460         $location = $self->_storage->request_space( $size );
461         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
462         $storage->print_at( $keytag->{offset} + $offset,
463             pack($self->{long_pack}, $location ),
464             pack( 'C C', $storage->transaction_id, 0 ),
465         );
466     }
467     # Adding a new md5
468     else {
469         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
470
471         # The bucket fit into list
472         if ( defined $offset ) {
473             $storage->print_at( $tag->{offset} + $offset,
474                 $md5, pack( $self->{long_pack}, $keyloc ),
475             );
476         }
477         # If bucket didn't fit into list, split into a new index level
478         else {
479             $self->split_index( $tag, $md5, $keyloc );
480         }
481
482         my $keytag = $self->write_tag(
483             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
484         );
485
486         $location = $self->_storage->request_space( $size );
487         $storage->print_at( $keytag->{offset},
488             pack( $self->{long_pack}, $location ),
489             pack( 'C C', $storage->transaction_id, 0 ),
490         );
491
492         my $offset = 1;
493         for my $trans_id ( @transactions ) {
494             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
495                 pack( $self->{long_pack}, 0 ),
496                 pack( 'C C', $trans_id, 1 ),
497             );
498         }
499     }
500
501     $self->_write_value( $location, $plain_key, $value, $orig_key );
502
503     return 1;
504 }
505
506 sub _write_value {
507     my $self = shift;
508     my ($key_loc, $location, $key, $value, $orig_key) = @_;
509
510     my $storage = $self->_storage;
511
512     my $dbm_deep_obj = _get_dbm_object( $value );
513     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
514         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
515     }
516
517     ##
518     # Write signature based on content type, set content length and write
519     # actual value.
520     ##
521     my $r = Scalar::Util::reftype( $value ) || '';
522     if ( $dbm_deep_obj ) {
523         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
524     }
525     elsif ($r eq 'HASH') {
526         if ( !$dbm_deep_obj && tied %{$value} ) {
527             $self->_throw_error( "Cannot store something that is tied" );
528         }
529         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
530     }
531     elsif ($r eq 'ARRAY') {
532         if ( !$dbm_deep_obj && tied @{$value} ) {
533             $self->_throw_error( "Cannot store something that is tied" );
534         }
535         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
536     }
537     elsif (!defined($value)) {
538         $self->write_tag( $location, SIG_NULL, '' );
539     }
540     else {
541         $self->write_tag( $location, SIG_DATA, $value );
542     }
543
544     ##
545     # Plain key is stored AFTER value, as keys are typically fetched less often.
546     ##
547     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
548
549     # Internal references don't care about autobless
550     return 1 if $dbm_deep_obj;
551
552     ##
553     # If value is blessed, preserve class name
554     ##
555     if ( $storage->{autobless} ) {
556         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
557             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
558         }
559         else {
560             $storage->print_at( undef, chr(0) );
561         }
562     }
563
564     ##
565     # Tie the passed in reference so that changes to it are reflected in the
566     # datafile. The use of $location as the base_offset will act as the
567     # the linkage between parent and child.
568     #
569     # The overall assignment is a hack around the fact that just tying doesn't
570     # store the values. This may not be the wrong thing to do.
571     ##
572     if ($r eq 'HASH') {
573         my %x = %$value;
574         tie %$value, 'DBM::Deep', {
575             base_offset => $key_loc,
576             storage     => $storage,
577             parent      => $self->{obj},
578             parent_key  => $orig_key,
579         };
580         %$value = %x;
581         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
582     }
583     elsif ($r eq 'ARRAY') {
584         my @x = @$value;
585         tie @$value, 'DBM::Deep', {
586             base_offset => $key_loc,
587             storage     => $storage,
588             parent      => $self->{obj},
589             parent_key  => $orig_key,
590         };
591         @$value = @x;
592         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
593     }
594
595     return 1;
596 }
597
598 sub split_index {
599     my $self = shift;
600     my ($tag, $md5, $keyloc) = @_;
601
602     my $storage = $self->_storage;
603
604     my $loc = $storage->request_space(
605         $self->tag_size( $self->{index_size} ),
606     );
607
608     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
609
610     my $index_tag = $self->write_tag(
611         $loc, SIG_INDEX,
612         chr(0)x$self->{index_size},
613     );
614
615     my $keys = $tag->{content}
616              . $md5 . pack($self->{long_pack}, $keyloc);
617
618     my @newloc = ();
619     BUCKET:
620     # The <= here is deliberate - we have max_buckets+1 keys to iterate
621     # through, unlike every other loop that uses max_buckets as a stop.
622     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
623         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
624
625         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
626         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
627
628         my $num = ord(substr($key, $tag->{ch} + 1, 1));
629
630         if ($newloc[$num]) {
631             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
632
633             # This is looking for the first empty spot
634             my ($subloc, $offset) = $self->_find_in_buckets(
635                 { content => $subkeys }, '',
636             );
637
638             $storage->print_at(
639                 $newloc[$num] + $offset,
640                 $key, pack($self->{long_pack}, $old_subloc),
641             );
642
643             next;
644         }
645
646         my $loc = $storage->request_space(
647             $self->tag_size( $self->{bucket_list_size} ),
648         );
649
650         $storage->print_at(
651             $index_tag->{offset} + ($num * $self->{long_size}),
652             pack($self->{long_pack}, $loc),
653         );
654
655         my $blist_tag = $self->write_tag(
656             $loc, SIG_BLIST,
657             chr(0)x$self->{bucket_list_size},
658         );
659
660         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
661
662         $newloc[$num] = $blist_tag->{offset};
663     }
664
665     $self->_release_space(
666         $self->tag_size( $self->{bucket_list_size} ),
667         $tag->{start},
668     );
669
670     return 1;
671 }
672
673 sub read_from_loc {
674     my $self = shift;
675     my ($key_loc, $subloc, $orig_key) = @_;
676
677     my $storage = $self->_storage;
678
679     my $signature = $storage->read_at( $subloc, SIG_SIZE );
680
681     ##
682     # If value is a hash or array, return new DBM::Deep object with correct offset
683     ##
684     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
685         #XXX This needs to be a singleton
686 #        my $new_obj;
687 #        my $is_autobless;
688 #        if ( $signature eq SIG_HASH ) {
689 #            $new_obj = {};
690 #            tie %$new_obj, 'DBM::Deep', {
691 #                base_offset => $subloc,
692 #                storage     => $self->_storage,
693 #                parent      => $self->{obj},
694 #                parent_key  => $orig_key,
695 #            };
696 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
697 #        }
698 #        else {
699 #            $new_obj = [];
700 #            tie @$new_obj, 'DBM::Deep', {
701 #                base_offset => $subloc,
702 #                storage     => $self->_storage,
703 #                parent      => $self->{obj},
704 #                parent_key  => $orig_key,
705 #            };
706 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
707 #        }
708 #
709 #        if ($is_autobless) {
710
711         my $new_obj = DBM::Deep->new({
712             type        => $signature,
713             base_offset => $key_loc,
714             storage     => $self->_storage,
715             parent      => $self->{obj},
716             parent_key  => $orig_key,
717         });
718
719         if ($new_obj->_storage->{autobless}) {
720             ##
721             # Skip over value and plain key to see if object needs
722             # to be re-blessed
723             ##
724             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
725
726             my $size = $storage->read_at( undef, $self->{data_size} );
727             $size = unpack($self->{data_pack}, $size);
728             if ($size) { $storage->increment_pointer( $size ); }
729
730             my $bless_bit = $storage->read_at( undef, 1 );
731             if ( ord($bless_bit) ) {
732                 my $size = unpack(
733                     $self->{data_pack},
734                     $storage->read_at( undef, $self->{data_size} ),
735                 );
736
737                 if ( $size ) {
738                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
739                 }
740             }
741         }
742
743         return $new_obj;
744     }
745     elsif ( $signature eq SIG_INTERNAL ) {
746         my $size = $storage->read_at( undef, $self->{data_size} );
747         $size = unpack($self->{data_pack}, $size);
748
749         if ( $size ) {
750             my $new_loc = $storage->read_at( undef, $size );
751             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
752             return $self->read_from_loc( $key_loc, $new_loc, $orig_key );
753         }
754         else {
755             return;
756         }
757     }
758     ##
759     # Otherwise return actual value
760     ##
761     elsif ( $signature eq SIG_DATA ) {
762         my $size = $storage->read_at( undef, $self->{data_size} );
763         $size = unpack($self->{data_pack}, $size);
764
765         my $value = $size ? $storage->read_at( undef, $size ) : '';
766         return $value;
767     }
768
769     ##
770     # Key exists, but content is null
771     ##
772     return;
773 }
774
775 sub get_bucket_value {
776     ##
777     # Fetch single value given tag and MD5 digested key.
778     ##
779     my $self = shift;
780     my ($tag, $md5, $orig_key) = @_;
781
782     #ACID - This is a read. Can find exact or HEAD
783     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
784
785     if ( !$keyloc ) {
786         #XXX Need to use real key
787 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
788 #        return;
789     }
790 #    elsif ( !$is_deleted ) {
791     else {
792         my $keytag = $self->load_tag( $keyloc );
793         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
794         if (!$subloc && !$is_deleted) {
795             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
796         }
797         if ( $subloc && !$is_deleted ) {
798             return $self->read_from_loc( $subloc, $orig_key );
799         }
800     }
801
802     return;
803 }
804
805 sub delete_bucket {
806     ##
807     # Delete single key/value pair given tag and MD5 digested key.
808     ##
809     my $self = shift;
810     my ($tag, $md5, $orig_key) = @_;
811
812     #ACID - Although this is a mutation, we must find any transaction.
813     # This is because we need to mark something as deleted that is in the HEAD.
814     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
815
816     return if !$keyloc;
817
818     my $storage = $self->_storage;
819
820     my @transactions;
821     if ( $storage->transaction_id == 0 ) {
822         @transactions = $storage->current_transactions;
823     }
824
825     if ( $storage->transaction_id == 0 ) {
826         my $keytag = $self->load_tag( $keyloc );
827
828         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
829         return if !$subloc || $is_deleted;
830
831         my $value = $self->read_from_loc( $subloc, $orig_key );
832
833         my $size = $self->_length_needed( $value, $orig_key );
834
835         for my $trans_id ( @transactions ) {
836             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
837             unless ($loc) {
838                 my $location2 = $storage->request_space( $size );
839                 $storage->print_at( $keytag->{offset} + $offset2,
840                     pack($self->{long_pack}, $location2 ),
841                     pack( 'C C', $trans_id, 0 ),
842                 );
843                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
844             }
845         }
846
847         $keytag = $self->load_tag( $keyloc );
848         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
849         $storage->print_at( $keytag->{offset} + $offset,
850             substr( $keytag->{content}, $offset + $self->{key_size} ),
851             chr(0) x $self->{key_size},
852         );
853     }
854     else {
855         my $keytag = $self->load_tag( $keyloc );
856
857         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
858
859         $storage->print_at( $keytag->{offset} + $offset,
860             pack($self->{long_pack}, 0 ),
861             pack( 'C C', $storage->transaction_id, 1 ),
862         );
863     }
864
865     return 1;
866 }
867
868 sub bucket_exists {
869     ##
870     # Check existence of single key given tag and MD5 digested key.
871     ##
872     my $self = shift;
873     my ($tag, $md5) = @_;
874
875     #ACID - This is a read. Can find exact or HEAD
876     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
877     my $keytag = $self->load_tag( $keyloc );
878     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
879     if ( !$subloc && !$is_deleted ) {
880         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
881     }
882     return ($subloc && !$is_deleted) && 1;
883 }
884
885 sub find_blist {
886     ##
887     # Locate offset for bucket list, given digested key
888     ##
889     my $self = shift;
890     my ($offset, $md5, $args) = @_;
891     $args = {} unless $args;
892
893     ##
894     # Locate offset for bucket list using digest index system
895     ##
896     my $tag = $self->load_tag( $offset )
897         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
898
899     #XXX What happens when $ch >= $self->{hash_size} ??
900     for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
901         my $num = ord substr($md5, $ch, 1);
902
903         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
904         $tag = $self->index_lookup( $tag, $num );
905
906         if (!$tag) {
907             return if !$args->{create};
908
909             my $loc = $self->_storage->request_space(
910                 $self->tag_size( $self->{bucket_list_size} ),
911             );
912
913             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
914
915             $tag = $self->write_tag(
916                 $loc, SIG_BLIST,
917                 chr(0)x$self->{bucket_list_size},
918             );
919
920             $tag->{ref_loc} = $ref_loc;
921             $tag->{ch} = $ch;
922
923             last;
924         }
925
926         $tag->{ch} = $ch;
927         $tag->{ref_loc} = $ref_loc;
928     }
929
930     return $tag;
931 }
932
933 sub index_lookup {
934     ##
935     # Given index tag, lookup single entry in index and return .
936     ##
937     my $self = shift;
938     my ($tag, $index) = @_;
939
940     my $location = unpack(
941         $self->{long_pack},
942         substr(
943             $tag->{content},
944             $index * $self->{long_size},
945             $self->{long_size},
946         ),
947     );
948
949     if (!$location) { return; }
950
951     return $self->load_tag( $location );
952 }
953
954 sub traverse_index {
955     ##
956     # Scan index and recursively step into deeper levels, looking for next key.
957     ##
958     my $self = shift;
959     my ($xxxx, $offset, $ch, $force_return_next) = @_;
960
961     my $tag = $self->load_tag( $offset );
962
963     if ($tag->{signature} ne SIG_BLIST) {
964         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
965
966         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
967             my $subloc = unpack(
968                 $self->{long_pack},
969                 substr(
970                     $tag->{content},
971                     $idx * $self->{long_size},
972                     $self->{long_size},
973                 ),
974             );
975
976             if ($subloc) {
977                 my $result = $self->traverse_index(
978                     $xxxx, $subloc, $ch + 1, $force_return_next,
979                 );
980
981                 if (defined $result) { return $result; }
982             }
983         } # index loop
984
985         $xxxx->{return_next} = 1;
986     }
987     # This is the bucket list
988     else {
989         my $keys = $tag->{content};
990         if ($force_return_next) { $xxxx->{return_next} = 1; }
991
992         ##
993         # Iterate through buckets, looking for a key match
994         ##
995         my $transaction_id = $self->_storage->transaction_id;
996         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
997             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
998
999             # End of bucket list -- return to outer loop
1000             if (!$keyloc) {
1001                 $xxxx->{return_next} = 1;
1002                 last;
1003             }
1004             # Located previous key -- return next one found
1005             elsif ($key eq $xxxx->{prev_md5}) {
1006                 $xxxx->{return_next} = 1;
1007                 next;
1008             }
1009             # Seek to bucket location and skip over signature
1010             elsif ($xxxx->{return_next}) {
1011                 my $storage = $self->_storage;
1012
1013                 my $keytag = $self->load_tag( $keyloc );
1014                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1015                 if ( $subloc == 0 && !$is_deleted ) {
1016                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1017                 }
1018                 next if $is_deleted;
1019
1020                 # Skip over value to get to plain key
1021                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1022
1023                 my $size = $storage->read_at( undef, $self->{data_size} );
1024                 $size = unpack($self->{data_pack}, $size);
1025                 if ($size) { $storage->increment_pointer( $size ); }
1026
1027                 # Read in plain key and return as scalar
1028                 $size = $storage->read_at( undef, $self->{data_size} );
1029                 $size = unpack($self->{data_pack}, $size);
1030
1031                 my $plain_key;
1032                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1033                 return $plain_key;
1034             }
1035         }
1036
1037         $xxxx->{return_next} = 1;
1038     }
1039
1040     return;
1041 }
1042
1043 # Utilities
1044
1045 sub _get_key_subloc {
1046     my $self = shift;
1047     my ($keys, $idx) = @_;
1048
1049     return unpack(
1050         # This is 'a', not 'A'. Please read the pack() documentation for the
1051         # difference between the two and why it's important.
1052         "a$self->{hash_size} $self->{long_pack}",
1053         substr(
1054             $keys,
1055             ($idx * $self->{bucket_size}),
1056             $self->{bucket_size},
1057         ),
1058     );
1059 }
1060
1061 sub _find_in_buckets {
1062     my $self = shift;
1063     my ($tag, $md5) = @_;
1064
1065     BUCKET:
1066     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1067         my ($key, $subloc) = $self->_get_key_subloc(
1068             $tag->{content}, $i,
1069         );
1070
1071         next BUCKET if $subloc && $key ne $md5;
1072         return( $subloc, $i * $self->{bucket_size} );
1073     }
1074
1075     return;
1076 }
1077
1078 sub _release_space {
1079     my $self = shift;
1080     my ($size, $loc) = @_;
1081
1082     my $next_loc = 0;
1083
1084     $self->_storage->print_at( $loc,
1085         SIG_FREE, 
1086         pack($self->{long_pack}, $size ),
1087         pack($self->{long_pack}, $next_loc ),
1088     );
1089
1090     return;
1091 }
1092
1093 sub _throw_error {
1094     die "DBM::Deep: $_[1]\n";
1095 }
1096
1097 sub _get_dbm_object {
1098     my $item = shift;
1099
1100     my $obj = eval {
1101         local $SIG{__DIE__};
1102         if ($item->isa( 'DBM::Deep' )) {
1103             return $item;
1104         }
1105         return;
1106     };
1107     return $obj if $obj;
1108
1109     my $r = Scalar::Util::reftype( $item ) || '';
1110     if ( $r eq 'HASH' ) {
1111         my $obj = eval {
1112             local $SIG{__DIE__};
1113             my $obj = tied(%$item);
1114             if ($obj->isa( 'DBM::Deep' )) {
1115                 return $obj;
1116             }
1117             return;
1118         };
1119         return $obj if $obj;
1120     }
1121     elsif ( $r eq 'ARRAY' ) {
1122         my $obj = eval {
1123             local $SIG{__DIE__};
1124             my $obj = tied(@$item);
1125             if ($obj->isa( 'DBM::Deep' )) {
1126                 return $obj;
1127             }
1128             return;
1129         };
1130         return $obj if $obj;
1131     }
1132
1133     return;
1134 }
1135
1136 sub _length_needed {
1137     my $self = shift;
1138     my ($value, $key) = @_;
1139
1140     my $is_dbm_deep = eval {
1141         local $SIG{'__DIE__'};
1142         $value->isa( 'DBM::Deep' );
1143     };
1144
1145     my $len = SIG_SIZE
1146             + $self->{data_size} # size for value
1147             + $self->{data_size} # size for key
1148             + length( $key );    # length of key
1149
1150     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1151         # long_size is for the internal reference
1152         return $len + $self->{long_size};
1153     }
1154
1155     if ( $self->_storage->{autobless} ) {
1156         # This is for the bit saying whether or not this thing is blessed.
1157         $len += 1;
1158     }
1159
1160     my $r = Scalar::Util::reftype( $value ) || '';
1161     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1162         if ( defined $value ) {
1163             $len += length( $value );
1164         }
1165         return $len;
1166     }
1167
1168     $len += $self->{index_size};
1169
1170     # if autobless is enabled, must also take into consideration
1171     # the class name as it is stored after the key.
1172     if ( $self->_storage->{autobless} ) {
1173         my $c = Scalar::Util::blessed($value);
1174         if ( defined $c && !$is_dbm_deep ) {
1175             $len += $self->{data_size} + length($c);
1176         }
1177     }
1178
1179     return $len;
1180 }
1181
1182 1;
1183 __END__