r14236@Rob-Kinyons-PowerBook: rob | 2006-06-14 23:07:31 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Fcntl qw( :DEFAULT :flock );
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * To add to bucket_size, make sure you modify the following:
14 #   - calculate_sizes()
15 #   - _get_key_subloc()
16 #   - add_bucket() - where the buckets are printed
17 #
18 # * Every method in here assumes that the _storage has been appropriately
19 #   safeguarded. This can be anything from flock() to some sort of manual
20 #   mutex. But, it's the caller's responsability to make sure that this has
21 #   been done.
22
23 ##
24 # Setup file and tag signatures.  These should never change.
25 ##
26 sub SIG_FILE     () { 'DPDB' }
27 sub SIG_HEADER   () { 'h'    }
28 sub SIG_INTERNAL () { 'i'    }
29 sub SIG_HASH     () { 'H'    }
30 sub SIG_ARRAY    () { 'A'    }
31 sub SIG_NULL     () { 'N'    }
32 sub SIG_DATA     () { 'D'    }
33 sub SIG_INDEX    () { 'I'    }
34 sub SIG_BLIST    () { 'B'    }
35 sub SIG_FREE     () { 'F'    }
36 sub SIG_KEYS     () { 'K'    }
37 sub SIG_SIZE     () {  1     }
38
39 # This is the transaction ID for the HEAD
40 sub HEAD () { 0 }
41
42 ################################################################################
43 #
44 # This is new code. It is a complete rewrite of the engine based on a new API
45 #
46 ################################################################################
47
48 sub read_value {
49     my $self = shift;
50     my ($trans_id, $offset, $key, $orig_key) = @_;
51
52     my $dig_key = $self->_apply_digest( $key );
53     my $tag = $self->find_blist( $offset, $dig_key ) or return;
54     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
55 }
56
57 sub key_exists {
58     my $self = shift;
59     my ($trans_id, $offset, $key) = @_;
60
61     my $dig_key = $self->_apply_digest( $key );
62     # exists() returns the empty string, not undef
63     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
64     return $self->bucket_exists( $tag, $dig_key, $key );
65 }
66
67 sub get_next_key {
68     my $self = shift;
69     my ($trans_id, $offset) = @_;
70
71     # If the previous key was not specifed, start at the top and
72     # return the first one found.
73     my $temp;
74     if ( @_ > 2 ) {
75         $temp = {
76             prev_md5    => $self->_apply_digest($_[2]),
77             return_next => 0,
78         };
79     }
80     else {
81         $temp = {
82             prev_md5    => chr(0) x $self->{hash_size},
83             return_next => 1,
84         };
85     }
86
87     return $self->traverse_index( $temp, $offset, 0 );
88 }
89
90 sub delete_key {
91     my $self = shift;
92     my ($trans_id, $offset, $key, $orig_key) = @_;
93
94     my $dig_key = $self->_apply_digest( $key );
95     my $tag = $self->find_blist( $offset, $dig_key ) or return;
96     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
97     $self->delete_bucket( $tag, $dig_key, $orig_key );
98     return $value;
99 }
100
101 sub write_value {
102     my $self = shift;
103     my ($trans_id, $offset, $key, $value, $orig_key) = @_;
104
105     my $dig_key = $self->_apply_digest( $key );
106     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
107     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
108 }
109
110 ################################################################################
111 #
112 # Below here is the old code. It will be folded into the code above as it can.
113 #
114 ################################################################################
115
116 sub new {
117     my $class = shift;
118     my ($args) = @_;
119
120     my $self = bless {
121         long_size => 4,
122         long_pack => 'N',
123         data_size => 4,
124         data_pack => 'N',
125
126         digest    => \&Digest::MD5::md5,
127         hash_size => 16, # In bytes
128
129         ##
130         # Number of buckets per blist before another level of indexing is
131         # done. Increase this value for slightly greater speed, but larger database
132         # files. DO NOT decrease this value below 16, due to risk of recursive
133         # reindex overrun.
134         ##
135         max_buckets => 16,
136
137         storage => undef,
138         obj     => undef,
139     }, $class;
140
141     if ( defined $args->{pack_size} ) {
142         if ( lc $args->{pack_size} eq 'small' ) {
143             $args->{long_size} = 2;
144             $args->{long_pack} = 'n';
145         }
146         elsif ( lc $args->{pack_size} eq 'medium' ) {
147             $args->{long_size} = 4;
148             $args->{long_pack} = 'N';
149         }
150         elsif ( lc $args->{pack_size} eq 'large' ) {
151             $args->{long_size} = 8;
152             $args->{long_pack} = 'Q';
153         }
154         else {
155             die "Unknown pack_size value: '$args->{pack_size}'\n";
156         }
157     }
158
159     # Grab the parameters we want to use
160     foreach my $param ( keys %$self ) {
161         next unless exists $args->{$param};
162         $self->{$param} = $args->{$param};
163     }
164     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
165
166     if ( $self->{max_buckets} < 16 ) {
167         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
168         $self->{max_buckets} = 16;
169     }
170
171     return $self;
172 }
173
174 sub _storage { return $_[0]{storage} }
175
176 sub _apply_digest {
177     my $self = shift;
178     return $self->{digest}->(@_);
179 }
180
181 sub calculate_sizes {
182     my $self = shift;
183
184     # The 2**8 here indicates the number of different characters in the
185     # current hashing algorithm
186     #XXX Does this need to be updated with different hashing algorithms?
187     $self->{hash_chars_used}  = (2**8);
188     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
189
190     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
191     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
192
193     $self->{key_size}         = $self->{long_size} * 2;
194     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
195
196     return;
197 }
198
199 sub write_file_header {
200     my $self = shift;
201
202     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
203
204     $self->_storage->print_at( $loc,
205         SIG_FILE,
206         SIG_HEADER,
207         pack('N', 1),  # header version
208         pack('N', 24), # header size
209         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
210         pack('n', $self->{long_size}),
211         pack('A', $self->{long_pack}),
212         pack('n', $self->{data_size}),
213         pack('A', $self->{data_pack}),
214         pack('n', $self->{max_buckets}),
215     );
216
217     $self->_storage->set_transaction_offset( 13 );
218
219     return;
220 }
221
222 sub read_file_header {
223     my $self = shift;
224
225     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
226     return unless length($buffer);
227
228     my ($file_signature, $sig_header, $header_version, $size) = unpack(
229         'A4 A N N', $buffer
230     );
231
232     unless ( $file_signature eq SIG_FILE ) {
233         $self->_storage->close;
234         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
235     }
236
237     unless ( $sig_header eq SIG_HEADER ) {
238         $self->_storage->close;
239         $self->_throw_error( "Old file version found." );
240     }
241
242     my $buffer2 = $self->_storage->read_at( undef, $size );
243     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
244
245     $self->_storage->set_transaction_offset( 13 );
246
247     if ( @values < 5 || grep { !defined } @values ) {
248         $self->_storage->close;
249         $self->_throw_error("Corrupted file - bad header");
250     }
251
252     #XXX Add warnings if values weren't set right
253     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
254
255     return length($buffer) + length($buffer2);
256 }
257
258 sub setup_fh {
259     my $self = shift;
260     my ($obj) = @_;
261
262     # Need to remove use of $fh here
263     my $fh = $self->_storage->{fh};
264     flock $fh, LOCK_EX;
265
266     #XXX The duplication of calculate_sizes needs to go away
267     unless ( $obj->{base_offset} ) {
268         my $bytes_read = $self->read_file_header;
269
270         $self->calculate_sizes;
271
272         ##
273         # File is empty -- write header and master index
274         ##
275         if (!$bytes_read) {
276             $self->_storage->audit( "# Database created on" );
277
278             $self->write_file_header;
279
280             $obj->{base_offset} = $self->_storage->request_space(
281                 $self->tag_size( $self->{index_size} ),
282             );
283
284             $self->write_tag(
285                 $obj->_base_offset, $obj->_type,
286                 chr(0)x$self->{index_size},
287             );
288
289             # Flush the filehandle
290             my $old_fh = select $fh;
291             my $old_af = $|; $| = 1; $| = $old_af;
292             select $old_fh;
293         }
294         else {
295             $obj->{base_offset} = $bytes_read;
296
297             ##
298             # Get our type from master index header
299             ##
300             my $tag = $self->load_tag($obj->_base_offset);
301             unless ( $tag ) {
302                 flock $fh, LOCK_UN;
303                 $self->_throw_error("Corrupted file, no master index record");
304             }
305
306             unless ($obj->_type eq $tag->{signature}) {
307                 flock $fh, LOCK_UN;
308                 $self->_throw_error("File type mismatch");
309             }
310         }
311     }
312     else {
313         $self->calculate_sizes;
314     }
315
316     #XXX We have to make sure we don't mess up when autoflush isn't turned on
317     $self->_storage->set_inode;
318
319     flock $fh, LOCK_UN;
320
321     return 1;
322 }
323
324 sub tag_size {
325     my $self = shift;
326     my ($size) = @_;
327     return SIG_SIZE + $self->{data_size} + $size;
328 }
329
330 sub write_tag {
331     ##
332     # Given offset, signature and content, create tag and write to disk
333     ##
334     my $self = shift;
335     my ($offset, $sig, $content) = @_;
336     my $size = length( $content );
337
338     $self->_storage->print_at(
339         $offset, 
340         $sig, pack($self->{data_pack}, $size), $content,
341     );
342
343     return unless defined $offset;
344
345     return {
346         signature => $sig,
347         #XXX Is this even used?
348         size      => $size,
349         start     => $offset,
350         offset    => $offset + SIG_SIZE + $self->{data_size},
351         content   => $content,
352         is_new    => 1,
353     };
354 }
355
356 sub load_tag {
357     ##
358     # Given offset, load single tag and return signature, size and data
359     ##
360     my $self = shift;
361     my ($offset) = @_;
362
363     my $storage = $self->_storage;
364
365     my ($sig, $size) = unpack(
366         "A $self->{data_pack}",
367         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
368     );
369
370     return {
371         signature => $sig,
372         size      => $size,   #XXX Is this even used?
373         start     => $offset,
374         offset    => $offset + SIG_SIZE + $self->{data_size},
375         content   => $storage->read_at( undef, $size ),
376     };
377 }
378
379 sub find_keyloc {
380     my $self = shift;
381     my ($tag, $transaction_id) = @_;
382     $transaction_id = $self->_storage->transaction_id
383         unless defined $transaction_id;
384
385     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
386         my ($loc, $trans_id, $is_deleted) = unpack(
387             "$self->{long_pack} C C",
388             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
389         );
390
391         next if $loc != HEAD && $transaction_id != $trans_id;
392         return( $loc, $is_deleted, $i * $self->{key_size} );
393     }
394
395     return;
396 }
397
398 sub add_bucket {
399     ##
400     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
401     # plain (undigested) key and value.
402     ##
403     my $self = shift;
404     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
405
406     # This verifies that only supported values will be stored.
407     {
408         my $r = Scalar::Util::reftype( $value );
409
410         last if !defined $r;
411         last if $r eq 'HASH';
412         last if $r eq 'ARRAY';
413
414         $self->_throw_error(
415             "Storage of references of type '$r' is not supported."
416         );
417     }
418
419     my $storage = $self->_storage;
420
421     #ACID - This is a mutation. Must only find the exact transaction
422     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
423
424     my @transactions;
425     if ( $storage->transaction_id == 0 ) {
426         @transactions = $storage->current_transactions;
427     }
428
429 #    $self->_release_space( $size, $subloc );
430 #XXX This needs updating to use _release_space
431
432     my $location;
433     my $size = $self->_length_needed( $value, $plain_key );
434
435     # Updating a known md5
436     if ( $keyloc ) {
437         my $keytag = $self->load_tag( $keyloc );
438         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
439
440         if ( $subloc && !$is_deleted && @transactions ) {
441             my $old_value = $self->read_from_loc( $subloc, $orig_key );
442             my $old_size = $self->_length_needed( $old_value, $plain_key );
443
444             for my $trans_id ( @transactions ) {
445                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
446                 unless ($loc) {
447                     my $location2 = $storage->request_space( $old_size );
448                     $storage->print_at( $keytag->{offset} + $offset2,
449                         pack($self->{long_pack}, $location2 ),
450                         pack( 'C C', $trans_id, 0 ),
451                     );
452                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
453                 }
454             }
455         }
456
457         $location = $self->_storage->request_space( $size );
458         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
459         $storage->print_at( $keytag->{offset} + $offset,
460             pack($self->{long_pack}, $location ),
461             pack( 'C C', $storage->transaction_id, 0 ),
462         );
463     }
464     # Adding a new md5
465     else {
466         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
467
468         # The bucket fit into list
469         if ( defined $offset ) {
470             $storage->print_at( $tag->{offset} + $offset,
471                 $md5, pack( $self->{long_pack}, $keyloc ),
472             );
473         }
474         # If bucket didn't fit into list, split into a new index level
475         else {
476             $self->split_index( $tag, $md5, $keyloc );
477         }
478
479         my $keytag = $self->write_tag(
480             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
481         );
482
483         $location = $self->_storage->request_space( $size );
484         $storage->print_at( $keytag->{offset},
485             pack( $self->{long_pack}, $location ),
486             pack( 'C C', $storage->transaction_id, 0 ),
487         );
488
489         my $offset = 1;
490         for my $trans_id ( @transactions ) {
491             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
492                 pack( $self->{long_pack}, 0 ),
493                 pack( 'C C', $trans_id, 1 ),
494             );
495         }
496     }
497
498     $self->_write_value( $location, $plain_key, $value, $orig_key );
499
500     return 1;
501 }
502
503 sub _write_value {
504     my $self = shift;
505     my ($key_loc, $location, $key, $value, $orig_key) = @_;
506
507     my $storage = $self->_storage;
508
509     my $dbm_deep_obj = _get_dbm_object( $value );
510     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
511         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
512     }
513
514     ##
515     # Write signature based on content type, set content length and write
516     # actual value.
517     ##
518     my $r = Scalar::Util::reftype( $value ) || '';
519     if ( $dbm_deep_obj ) {
520         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
521     }
522     elsif ($r eq 'HASH') {
523         if ( !$dbm_deep_obj && tied %{$value} ) {
524             $self->_throw_error( "Cannot store something that is tied" );
525         }
526         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
527     }
528     elsif ($r eq 'ARRAY') {
529         if ( !$dbm_deep_obj && tied @{$value} ) {
530             $self->_throw_error( "Cannot store something that is tied" );
531         }
532         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
533     }
534     elsif (!defined($value)) {
535         $self->write_tag( $location, SIG_NULL, '' );
536     }
537     else {
538         $self->write_tag( $location, SIG_DATA, $value );
539     }
540
541     ##
542     # Plain key is stored AFTER value, as keys are typically fetched less often.
543     ##
544     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
545
546     # Internal references don't care about autobless
547     return 1 if $dbm_deep_obj;
548
549     ##
550     # If value is blessed, preserve class name
551     ##
552     if ( $storage->{autobless} ) {
553         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
554             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
555         }
556         else {
557             $storage->print_at( undef, chr(0) );
558         }
559     }
560
561     ##
562     # Tie the passed in reference so that changes to it are reflected in the
563     # datafile. The use of $location as the base_offset will act as the
564     # the linkage between parent and child.
565     #
566     # The overall assignment is a hack around the fact that just tying doesn't
567     # store the values. This may not be the wrong thing to do.
568     ##
569     if ($r eq 'HASH') {
570         my %x = %$value;
571         tie %$value, 'DBM::Deep', {
572             base_offset => $key_loc,
573             storage     => $storage,
574             parent      => $self->{obj},
575             parent_key  => $orig_key,
576         };
577         %$value = %x;
578         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
579     }
580     elsif ($r eq 'ARRAY') {
581         my @x = @$value;
582         tie @$value, 'DBM::Deep', {
583             base_offset => $key_loc,
584             storage     => $storage,
585             parent      => $self->{obj},
586             parent_key  => $orig_key,
587         };
588         @$value = @x;
589         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
590     }
591
592     return 1;
593 }
594
595 sub split_index {
596     my $self = shift;
597     my ($tag, $md5, $keyloc) = @_;
598
599     my $storage = $self->_storage;
600
601     my $loc = $storage->request_space(
602         $self->tag_size( $self->{index_size} ),
603     );
604
605     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
606
607     my $index_tag = $self->write_tag(
608         $loc, SIG_INDEX,
609         chr(0)x$self->{index_size},
610     );
611
612     my $keys = $tag->{content}
613              . $md5 . pack($self->{long_pack}, $keyloc);
614
615     my @newloc = ();
616     BUCKET:
617     # The <= here is deliberate - we have max_buckets+1 keys to iterate
618     # through, unlike every other loop that uses max_buckets as a stop.
619     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
620         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
621
622         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
623         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
624
625         my $num = ord(substr($key, $tag->{ch} + 1, 1));
626
627         if ($newloc[$num]) {
628             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
629
630             # This is looking for the first empty spot
631             my ($subloc, $offset) = $self->_find_in_buckets(
632                 { content => $subkeys }, '',
633             );
634
635             $storage->print_at(
636                 $newloc[$num] + $offset,
637                 $key, pack($self->{long_pack}, $old_subloc),
638             );
639
640             next;
641         }
642
643         my $loc = $storage->request_space(
644             $self->tag_size( $self->{bucket_list_size} ),
645         );
646
647         $storage->print_at(
648             $index_tag->{offset} + ($num * $self->{long_size}),
649             pack($self->{long_pack}, $loc),
650         );
651
652         my $blist_tag = $self->write_tag(
653             $loc, SIG_BLIST,
654             chr(0)x$self->{bucket_list_size},
655         );
656
657         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
658
659         $newloc[$num] = $blist_tag->{offset};
660     }
661
662     $self->_release_space(
663         $self->tag_size( $self->{bucket_list_size} ),
664         $tag->{start},
665     );
666
667     return 1;
668 }
669
670 sub read_from_loc {
671     my $self = shift;
672     my ($key_loc, $subloc, $orig_key) = @_;
673
674     my $storage = $self->_storage;
675
676     my $signature = $storage->read_at( $subloc, SIG_SIZE );
677
678     ##
679     # If value is a hash or array, return new DBM::Deep object with correct offset
680     ##
681     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
682         #XXX This needs to be a singleton
683 #        my $new_obj;
684 #        my $is_autobless;
685 #        if ( $signature eq SIG_HASH ) {
686 #            $new_obj = {};
687 #            tie %$new_obj, 'DBM::Deep', {
688 #                base_offset => $subloc,
689 #                storage     => $self->_storage,
690 #                parent      => $self->{obj},
691 #                parent_key  => $orig_key,
692 #            };
693 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
694 #        }
695 #        else {
696 #            $new_obj = [];
697 #            tie @$new_obj, 'DBM::Deep', {
698 #                base_offset => $subloc,
699 #                storage     => $self->_storage,
700 #                parent      => $self->{obj},
701 #                parent_key  => $orig_key,
702 #            };
703 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
704 #        }
705 #
706 #        if ($is_autobless) {
707
708         my $new_obj = DBM::Deep->new({
709             type        => $signature,
710             base_offset => $key_loc,
711             storage     => $self->_storage,
712             parent      => $self->{obj},
713             parent_key  => $orig_key,
714         });
715
716         if ($new_obj->_storage->{autobless}) {
717             ##
718             # Skip over value and plain key to see if object needs
719             # to be re-blessed
720             ##
721             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
722
723             my $size = $storage->read_at( undef, $self->{data_size} );
724             $size = unpack($self->{data_pack}, $size);
725             if ($size) { $storage->increment_pointer( $size ); }
726
727             my $bless_bit = $storage->read_at( undef, 1 );
728             if ( ord($bless_bit) ) {
729                 my $size = unpack(
730                     $self->{data_pack},
731                     $storage->read_at( undef, $self->{data_size} ),
732                 );
733
734                 if ( $size ) {
735                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
736                 }
737             }
738         }
739
740         return $new_obj;
741     }
742     elsif ( $signature eq SIG_INTERNAL ) {
743         my $size = $storage->read_at( undef, $self->{data_size} );
744         $size = unpack($self->{data_pack}, $size);
745
746         if ( $size ) {
747             my $new_loc = $storage->read_at( undef, $size );
748             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
749             return $self->read_from_loc( $key_loc, $new_loc, $orig_key );
750         }
751         else {
752             return;
753         }
754     }
755     ##
756     # Otherwise return actual value
757     ##
758     elsif ( $signature eq SIG_DATA ) {
759         my $size = $storage->read_at( undef, $self->{data_size} );
760         $size = unpack($self->{data_pack}, $size);
761
762         my $value = $size ? $storage->read_at( undef, $size ) : '';
763         return $value;
764     }
765
766     ##
767     # Key exists, but content is null
768     ##
769     return;
770 }
771
772 sub get_bucket_value {
773     ##
774     # Fetch single value given tag and MD5 digested key.
775     ##
776     my $self = shift;
777     my ($tag, $md5, $orig_key) = @_;
778
779     #ACID - This is a read. Can find exact or HEAD
780     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
781
782     if ( !$keyloc ) {
783         #XXX Need to use real key
784 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
785 #        return;
786     }
787 #    elsif ( !$is_deleted ) {
788     else {
789         my $keytag = $self->load_tag( $keyloc );
790         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
791         if (!$subloc && !$is_deleted) {
792             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
793         }
794         if ( $subloc && !$is_deleted ) {
795             return $self->read_from_loc( $subloc, $orig_key );
796         }
797     }
798
799     return;
800 }
801
802 sub delete_bucket {
803     ##
804     # Delete single key/value pair given tag and MD5 digested key.
805     ##
806     my $self = shift;
807     my ($tag, $md5, $orig_key) = @_;
808
809     #ACID - Although this is a mutation, we must find any transaction.
810     # This is because we need to mark something as deleted that is in the HEAD.
811     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
812
813     return if !$keyloc;
814
815     my $storage = $self->_storage;
816
817     my @transactions;
818     if ( $storage->transaction_id == 0 ) {
819         @transactions = $storage->current_transactions;
820     }
821
822     if ( $storage->transaction_id == 0 ) {
823         my $keytag = $self->load_tag( $keyloc );
824
825         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
826         return if !$subloc || $is_deleted;
827
828         my $value = $self->read_from_loc( $subloc, $orig_key );
829
830         my $size = $self->_length_needed( $value, $orig_key );
831
832         for my $trans_id ( @transactions ) {
833             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
834             unless ($loc) {
835                 my $location2 = $storage->request_space( $size );
836                 $storage->print_at( $keytag->{offset} + $offset2,
837                     pack($self->{long_pack}, $location2 ),
838                     pack( 'C C', $trans_id, 0 ),
839                 );
840                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
841             }
842         }
843
844         $keytag = $self->load_tag( $keyloc );
845         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
846         $storage->print_at( $keytag->{offset} + $offset,
847             substr( $keytag->{content}, $offset + $self->{key_size} ),
848             chr(0) x $self->{key_size},
849         );
850     }
851     else {
852         my $keytag = $self->load_tag( $keyloc );
853
854         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
855
856         $storage->print_at( $keytag->{offset} + $offset,
857             pack($self->{long_pack}, 0 ),
858             pack( 'C C', $storage->transaction_id, 1 ),
859         );
860     }
861
862     return 1;
863 }
864
865 sub bucket_exists {
866     ##
867     # Check existence of single key given tag and MD5 digested key.
868     ##
869     my $self = shift;
870     my ($tag, $md5) = @_;
871
872     #ACID - This is a read. Can find exact or HEAD
873     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
874     my $keytag = $self->load_tag( $keyloc );
875     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
876     if ( !$subloc && !$is_deleted ) {
877         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
878     }
879     return ($subloc && !$is_deleted) && 1;
880 }
881
882 sub find_blist {
883     ##
884     # Locate offset for bucket list, given digested key
885     ##
886     my $self = shift;
887     my ($offset, $md5, $args) = @_;
888     $args = {} unless $args;
889
890     ##
891     # Locate offset for bucket list using digest index system
892     ##
893     my $tag = $self->load_tag( $offset )
894         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
895
896     #XXX What happens when $ch >= $self->{hash_size} ??
897     for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
898         my $num = ord substr($md5, $ch, 1);
899
900         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
901         $tag = $self->index_lookup( $tag, $num );
902
903         if (!$tag) {
904             return if !$args->{create};
905
906             my $loc = $self->_storage->request_space(
907                 $self->tag_size( $self->{bucket_list_size} ),
908             );
909
910             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
911
912             $tag = $self->write_tag(
913                 $loc, SIG_BLIST,
914                 chr(0)x$self->{bucket_list_size},
915             );
916
917             $tag->{ref_loc} = $ref_loc;
918             $tag->{ch} = $ch;
919
920             last;
921         }
922
923         $tag->{ch} = $ch;
924         $tag->{ref_loc} = $ref_loc;
925     }
926
927     return $tag;
928 }
929
930 sub index_lookup {
931     ##
932     # Given index tag, lookup single entry in index and return .
933     ##
934     my $self = shift;
935     my ($tag, $index) = @_;
936
937     my $location = unpack(
938         $self->{long_pack},
939         substr(
940             $tag->{content},
941             $index * $self->{long_size},
942             $self->{long_size},
943         ),
944     );
945
946     if (!$location) { return; }
947
948     return $self->load_tag( $location );
949 }
950
951 sub traverse_index {
952     ##
953     # Scan index and recursively step into deeper levels, looking for next key.
954     ##
955     my $self = shift;
956     my ($xxxx, $offset, $ch, $force_return_next) = @_;
957
958     my $tag = $self->load_tag( $offset );
959
960     if ($tag->{signature} ne SIG_BLIST) {
961         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
962
963         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
964             my $subloc = unpack(
965                 $self->{long_pack},
966                 substr(
967                     $tag->{content},
968                     $idx * $self->{long_size},
969                     $self->{long_size},
970                 ),
971             );
972
973             if ($subloc) {
974                 my $result = $self->traverse_index(
975                     $xxxx, $subloc, $ch + 1, $force_return_next,
976                 );
977
978                 if (defined $result) { return $result; }
979             }
980         } # index loop
981
982         $xxxx->{return_next} = 1;
983     }
984     # This is the bucket list
985     else {
986         my $keys = $tag->{content};
987         if ($force_return_next) { $xxxx->{return_next} = 1; }
988
989         ##
990         # Iterate through buckets, looking for a key match
991         ##
992         my $transaction_id = $self->_storage->transaction_id;
993         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
994             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
995
996             # End of bucket list -- return to outer loop
997             if (!$keyloc) {
998                 $xxxx->{return_next} = 1;
999                 last;
1000             }
1001             # Located previous key -- return next one found
1002             elsif ($key eq $xxxx->{prev_md5}) {
1003                 $xxxx->{return_next} = 1;
1004                 next;
1005             }
1006             # Seek to bucket location and skip over signature
1007             elsif ($xxxx->{return_next}) {
1008                 my $storage = $self->_storage;
1009
1010                 my $keytag = $self->load_tag( $keyloc );
1011                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1012                 if ( $subloc == 0 && !$is_deleted ) {
1013                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1014                 }
1015                 next if $is_deleted;
1016
1017                 # Skip over value to get to plain key
1018                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1019
1020                 my $size = $storage->read_at( undef, $self->{data_size} );
1021                 $size = unpack($self->{data_pack}, $size);
1022                 if ($size) { $storage->increment_pointer( $size ); }
1023
1024                 # Read in plain key and return as scalar
1025                 $size = $storage->read_at( undef, $self->{data_size} );
1026                 $size = unpack($self->{data_pack}, $size);
1027
1028                 my $plain_key;
1029                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1030                 return $plain_key;
1031             }
1032         }
1033
1034         $xxxx->{return_next} = 1;
1035     }
1036
1037     return;
1038 }
1039
1040 # Utilities
1041
1042 sub _get_key_subloc {
1043     my $self = shift;
1044     my ($keys, $idx) = @_;
1045
1046     return unpack(
1047         # This is 'a', not 'A'. Please read the pack() documentation for the
1048         # difference between the two and why it's important.
1049         "a$self->{hash_size} $self->{long_pack}",
1050         substr(
1051             $keys,
1052             ($idx * $self->{bucket_size}),
1053             $self->{bucket_size},
1054         ),
1055     );
1056 }
1057
1058 sub _find_in_buckets {
1059     my $self = shift;
1060     my ($tag, $md5) = @_;
1061
1062     BUCKET:
1063     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1064         my ($key, $subloc) = $self->_get_key_subloc(
1065             $tag->{content}, $i,
1066         );
1067
1068         next BUCKET if $subloc && $key ne $md5;
1069         return( $subloc, $i * $self->{bucket_size} );
1070     }
1071
1072     return;
1073 }
1074
1075 sub _release_space {
1076     my $self = shift;
1077     my ($size, $loc) = @_;
1078
1079     my $next_loc = 0;
1080
1081     $self->_storage->print_at( $loc,
1082         SIG_FREE, 
1083         pack($self->{long_pack}, $size ),
1084         pack($self->{long_pack}, $next_loc ),
1085     );
1086
1087     return;
1088 }
1089
1090 sub _throw_error {
1091     die "DBM::Deep: $_[1]\n";
1092 }
1093
1094 sub _get_dbm_object {
1095     my $item = shift;
1096
1097     my $obj = eval {
1098         local $SIG{__DIE__};
1099         if ($item->isa( 'DBM::Deep' )) {
1100             return $item;
1101         }
1102         return;
1103     };
1104     return $obj if $obj;
1105
1106     my $r = Scalar::Util::reftype( $item ) || '';
1107     if ( $r eq 'HASH' ) {
1108         my $obj = eval {
1109             local $SIG{__DIE__};
1110             my $obj = tied(%$item);
1111             if ($obj->isa( 'DBM::Deep' )) {
1112                 return $obj;
1113             }
1114             return;
1115         };
1116         return $obj if $obj;
1117     }
1118     elsif ( $r eq 'ARRAY' ) {
1119         my $obj = eval {
1120             local $SIG{__DIE__};
1121             my $obj = tied(@$item);
1122             if ($obj->isa( 'DBM::Deep' )) {
1123                 return $obj;
1124             }
1125             return;
1126         };
1127         return $obj if $obj;
1128     }
1129
1130     return;
1131 }
1132
1133 sub _length_needed {
1134     my $self = shift;
1135     my ($value, $key) = @_;
1136
1137     my $is_dbm_deep = eval {
1138         local $SIG{'__DIE__'};
1139         $value->isa( 'DBM::Deep' );
1140     };
1141
1142     my $len = SIG_SIZE
1143             + $self->{data_size} # size for value
1144             + $self->{data_size} # size for key
1145             + length( $key );    # length of key
1146
1147     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1148         # long_size is for the internal reference
1149         return $len + $self->{long_size};
1150     }
1151
1152     if ( $self->_storage->{autobless} ) {
1153         # This is for the bit saying whether or not this thing is blessed.
1154         $len += 1;
1155     }
1156
1157     my $r = Scalar::Util::reftype( $value ) || '';
1158     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1159         if ( defined $value ) {
1160             $len += length( $value );
1161         }
1162         return $len;
1163     }
1164
1165     $len += $self->{index_size};
1166
1167     # if autobless is enabled, must also take into consideration
1168     # the class name as it is stored after the key.
1169     if ( $self->_storage->{autobless} ) {
1170         my $c = Scalar::Util::blessed($value);
1171         if ( defined $c && !$is_dbm_deep ) {
1172             $len += $self->{data_size} + length($c);
1173         }
1174     }
1175
1176     return $len;
1177 }
1178
1179 1;
1180 __END__