r14010@Rob-Kinyons-PowerBook: rob | 2006-06-07 14:35:06 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Fcntl qw( :DEFAULT :flock );
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * To add to bucket_size, make sure you modify the following:
14 #   - calculate_sizes()
15 #   - _get_key_subloc()
16 #   - add_bucket() - where the buckets are printed
17 #
18 # * Every method in here assumes that the _storage has been appropriately
19 #   safeguarded. This can be anything from flock() to some sort of manual
20 #   mutex. But, it's the caller's responsability to make sure that this has
21 #   been done.
22
23 ##
24 # Setup file and tag signatures.  These should never change.
25 ##
26 sub SIG_FILE     () { 'DPDB' }
27 sub SIG_HEADER   () { 'h'    }
28 sub SIG_INTERNAL () { 'i'    }
29 sub SIG_HASH     () { 'H'    }
30 sub SIG_ARRAY    () { 'A'    }
31 sub SIG_NULL     () { 'N'    }
32 sub SIG_DATA     () { 'D'    }
33 sub SIG_INDEX    () { 'I'    }
34 sub SIG_BLIST    () { 'B'    }
35 sub SIG_FREE     () { 'F'    }
36 sub SIG_KEYS     () { 'K'    }
37 sub SIG_SIZE     () {  1     }
38
39 # This is the transaction ID for the HEAD
40 sub HEAD () { 0 }
41
42 ################################################################################
43 #
44 # This is new code. It is a complete rewrite of the engine based on a new API
45 #
46 ################################################################################
47
48 sub read_value {
49     my $self = shift;
50     my ($trans_id, $offset, $key, $orig_key) = @_;
51
52     my $dig_key = $self->_apply_digest( $key );
53     my $tag = $self->find_blist( $offset, $dig_key ) or return;
54     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
55 }
56
57 sub key_exists {
58     my $self = shift;
59     my ($trans_id, $offset, $key) = @_;
60
61     my $dig_key = $self->_apply_digest( $key );
62     # exists() returns the empty string, not undef
63     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
64     return $self->bucket_exists( $tag, $dig_key, $key );
65 }
66
67 =pod
68 sub key_exists {
69     my $self = shift;
70     my ($trans_id, $base_offset, $key) = @_;
71     
72     my ($_val_offset, $_is_del) = $self->_find_value_offset({
73         offset     => $base_offset,
74         trans_id   => $trans_id,
75         allow_head => 1,
76     });
77     die "Attempt to use a deleted value" if $_is_del;
78     die "Internal error!" if !$_val_offset;
79
80     my ($key_offset) = $self->_find_key_offset({
81         offset  => $_val_offset,
82         key_md5 => $self->_apply_digest( $key ),
83         create  => 0,
84     });
85     return if !$key_offset;
86
87     my ($val_offset, $is_del) = $self->_find_value_offset({
88         offset     => $key_offset,
89         trans_id   => $trans_id,
90         allow_head => 1,
91     });
92
93     return 1 if $is_del;
94
95     die "Internal error!" if !$_val_offset;
96     return '';
97 }
98 =cut
99
100 sub get_next_key {
101     my $self = shift;
102     my ($trans_id, $offset) = @_;
103
104     # If the previous key was not specifed, start at the top and
105     # return the first one found.
106     my $temp;
107     if ( @_ > 2 ) {
108         $temp = {
109             prev_md5    => $self->_apply_digest($_[2]),
110             return_next => 0,
111         };
112     }
113     else {
114         $temp = {
115             prev_md5    => chr(0) x $self->{hash_size},
116             return_next => 1,
117         };
118     }
119
120     return $self->traverse_index( $temp, $offset, 0 );
121 }
122
123 sub delete_key {
124     my $self = shift;
125     my ($trans_id, $offset, $key, $orig_key) = @_;
126
127     my $dig_key = $self->_apply_digest( $key );
128     my $tag = $self->find_blist( $offset, $dig_key ) or return;
129     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
130     $self->delete_bucket( $tag, $dig_key, $orig_key );
131     return $value;
132 }
133
134 sub write_value {
135     my $self = shift;
136     my ($trans_id, $offset, $key, $value, $orig_key) = @_;
137
138     my $dig_key = $self->_apply_digest( $key );
139     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
140     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
141 }
142
143 ################################################################################
144 #
145 # Below here is the old code. It will be folded into the code above as it can.
146 #
147 ################################################################################
148
149 sub new {
150     my $class = shift;
151     my ($args) = @_;
152
153     my $self = bless {
154         long_size => 4,
155         long_pack => 'N',
156         data_size => 4,
157         data_pack => 'N',
158
159         digest    => \&Digest::MD5::md5,
160         hash_size => 16, # In bytes
161
162         ##
163         # Number of buckets per blist before another level of indexing is
164         # done. Increase this value for slightly greater speed, but larger database
165         # files. DO NOT decrease this value below 16, due to risk of recursive
166         # reindex overrun.
167         ##
168         max_buckets => 16,
169
170         storage => undef,
171         obj     => undef,
172     }, $class;
173
174     if ( defined $args->{pack_size} ) {
175         if ( lc $args->{pack_size} eq 'small' ) {
176             $args->{long_size} = 2;
177             $args->{long_pack} = 'n';
178         }
179         elsif ( lc $args->{pack_size} eq 'medium' ) {
180             $args->{long_size} = 4;
181             $args->{long_pack} = 'N';
182         }
183         elsif ( lc $args->{pack_size} eq 'large' ) {
184             $args->{long_size} = 8;
185             $args->{long_pack} = 'Q';
186         }
187         else {
188             die "Unknown pack_size value: '$args->{pack_size}'\n";
189         }
190     }
191
192     # Grab the parameters we want to use
193     foreach my $param ( keys %$self ) {
194         next unless exists $args->{$param};
195         $self->{$param} = $args->{$param};
196     }
197     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
198
199     if ( $self->{max_buckets} < 16 ) {
200         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
201         $self->{max_buckets} = 16;
202     }
203
204     return $self;
205 }
206
207 sub _storage { return $_[0]{storage} }
208
209 sub _apply_digest {
210     my $self = shift;
211     return $self->{digest}->(@_);
212 }
213
214 sub calculate_sizes {
215     my $self = shift;
216
217     # The 2**8 here indicates the number of different characters in the
218     # current hashing algorithm
219     #XXX Does this need to be updated with different hashing algorithms?
220     $self->{hash_chars_used}  = (2**8);
221     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
222
223     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
224     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
225
226     $self->{key_size}         = $self->{long_size} * 2;
227     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
228
229     return;
230 }
231
232 sub write_file_header {
233     my $self = shift;
234
235     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
236
237     $self->_storage->print_at( $loc,
238         SIG_FILE,
239         SIG_HEADER,
240         pack('N', 1),  # header version
241         pack('N', 24), # header size
242         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
243         pack('n', $self->{long_size}),
244         pack('A', $self->{long_pack}),
245         pack('n', $self->{data_size}),
246         pack('A', $self->{data_pack}),
247         pack('n', $self->{max_buckets}),
248     );
249
250     $self->_storage->set_transaction_offset( 13 );
251
252     return;
253 }
254
255 sub read_file_header {
256     my $self = shift;
257
258     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
259     return unless length($buffer);
260
261     my ($file_signature, $sig_header, $header_version, $size) = unpack(
262         'A4 A N N', $buffer
263     );
264
265     unless ( $file_signature eq SIG_FILE ) {
266         $self->_storage->close;
267         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
268     }
269
270     unless ( $sig_header eq SIG_HEADER ) {
271         $self->_storage->close;
272         $self->_throw_error( "Old file version found." );
273     }
274
275     my $buffer2 = $self->_storage->read_at( undef, $size );
276     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
277
278     $self->_storage->set_transaction_offset( 13 );
279
280     if ( @values < 5 || grep { !defined } @values ) {
281         $self->_storage->close;
282         $self->_throw_error("Corrupted file - bad header");
283     }
284
285     #XXX Add warnings if values weren't set right
286     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
287
288     return length($buffer) + length($buffer2);
289 }
290
291 sub setup_fh {
292     my $self = shift;
293     my ($obj) = @_;
294
295     # Need to remove use of $fh here
296     my $fh = $self->_storage->{fh};
297     flock $fh, LOCK_EX;
298
299     #XXX The duplication of calculate_sizes needs to go away
300     unless ( $obj->{base_offset} ) {
301         my $bytes_read = $self->read_file_header;
302
303         $self->calculate_sizes;
304
305         ##
306         # File is empty -- write header and master index
307         ##
308         if (!$bytes_read) {
309             $self->_storage->audit( "# Database created on" );
310
311             $self->write_file_header;
312
313             $obj->{base_offset} = $self->_storage->request_space(
314                 $self->tag_size( $self->{index_size} ),
315             );
316
317             $self->write_tag(
318                 $obj->_base_offset, $obj->_type,
319                 chr(0)x$self->{index_size},
320             );
321
322             # Flush the filehandle
323             my $old_fh = select $fh;
324             my $old_af = $|; $| = 1; $| = $old_af;
325             select $old_fh;
326         }
327         else {
328             $obj->{base_offset} = $bytes_read;
329
330             ##
331             # Get our type from master index header
332             ##
333             my $tag = $self->load_tag($obj->_base_offset);
334             unless ( $tag ) {
335                 flock $fh, LOCK_UN;
336                 $self->_throw_error("Corrupted file, no master index record");
337             }
338
339             unless ($obj->_type eq $tag->{signature}) {
340                 flock $fh, LOCK_UN;
341                 $self->_throw_error("File type mismatch");
342             }
343         }
344     }
345     else {
346         $self->calculate_sizes;
347     }
348
349     #XXX We have to make sure we don't mess up when autoflush isn't turned on
350     $self->_storage->set_inode;
351
352     flock $fh, LOCK_UN;
353
354     return 1;
355 }
356
357 sub tag_size {
358     my $self = shift;
359     my ($size) = @_;
360     return SIG_SIZE + $self->{data_size} + $size;
361 }
362
363 sub write_tag {
364     ##
365     # Given offset, signature and content, create tag and write to disk
366     ##
367     my $self = shift;
368     my ($offset, $sig, $content) = @_;
369     my $size = length( $content );
370
371     $self->_storage->print_at(
372         $offset, 
373         $sig, pack($self->{data_pack}, $size), $content,
374     );
375
376     return unless defined $offset;
377
378     return {
379         signature => $sig,
380         #XXX Is this even used?
381         size      => $size,
382         start     => $offset,
383         offset    => $offset + SIG_SIZE + $self->{data_size},
384         content   => $content,
385         is_new    => 1,
386     };
387 }
388
389 sub load_tag {
390     ##
391     # Given offset, load single tag and return signature, size and data
392     ##
393     my $self = shift;
394     my ($offset) = @_;
395
396     my $storage = $self->_storage;
397
398     my ($sig, $size) = unpack(
399         "A $self->{data_pack}",
400         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
401     );
402
403     return {
404         signature => $sig,
405         size      => $size,   #XXX Is this even used?
406         start     => $offset,
407         offset    => $offset + SIG_SIZE + $self->{data_size},
408         content   => $storage->read_at( undef, $size ),
409     };
410 }
411
412 sub find_keyloc {
413     my $self = shift;
414     my ($tag, $transaction_id) = @_;
415     $transaction_id = $self->_storage->transaction_id
416         unless defined $transaction_id;
417
418     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
419         my ($loc, $trans_id, $is_deleted) = unpack(
420             "$self->{long_pack} C C",
421             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
422         );
423
424         next if $loc != HEAD && $transaction_id != $trans_id;
425         return( $loc, $is_deleted, $i * $self->{key_size} );
426     }
427
428     return;
429 }
430
431 sub add_bucket {
432     ##
433     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
434     # plain (undigested) key and value.
435     ##
436     my $self = shift;
437     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
438
439     # This verifies that only supported values will be stored.
440     {
441         my $r = Scalar::Util::reftype( $value );
442
443         last if !defined $r;
444         last if $r eq 'HASH';
445         last if $r eq 'ARRAY';
446
447         $self->_throw_error(
448             "Storage of references of type '$r' is not supported."
449         );
450     }
451
452     my $storage = $self->_storage;
453
454     #ACID - This is a mutation. Must only find the exact transaction
455     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
456
457     my @transactions;
458     if ( $storage->transaction_id == 0 ) {
459         @transactions = $storage->current_transactions;
460     }
461
462 #    $self->_release_space( $size, $subloc );
463 #XXX This needs updating to use _release_space
464
465     my $location;
466     my $size = $self->_length_needed( $value, $plain_key );
467
468     # Updating a known md5
469     if ( $keyloc ) {
470         my $keytag = $self->load_tag( $keyloc );
471         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
472
473         if ( $subloc && !$is_deleted && @transactions ) {
474             my $old_value = $self->read_from_loc( $subloc, $orig_key );
475             my $old_size = $self->_length_needed( $old_value, $plain_key );
476
477             for my $trans_id ( @transactions ) {
478                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
479                 unless ($loc) {
480                     my $location2 = $storage->request_space( $old_size );
481                     $storage->print_at( $keytag->{offset} + $offset2,
482                         pack($self->{long_pack}, $location2 ),
483                         pack( 'C C', $trans_id, 0 ),
484                     );
485                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
486                 }
487             }
488         }
489
490         $location = $self->_storage->request_space( $size );
491         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
492         $storage->print_at( $keytag->{offset} + $offset,
493             pack($self->{long_pack}, $location ),
494             pack( 'C C', $storage->transaction_id, 0 ),
495         );
496     }
497     # Adding a new md5
498     else {
499         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
500
501         # The bucket fit into list
502         if ( defined $offset ) {
503             $storage->print_at( $tag->{offset} + $offset,
504                 $md5, pack( $self->{long_pack}, $keyloc ),
505             );
506         }
507         # If bucket didn't fit into list, split into a new index level
508         else {
509             $self->split_index( $tag, $md5, $keyloc );
510         }
511
512         my $keytag = $self->write_tag(
513             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
514         );
515
516         $location = $self->_storage->request_space( $size );
517         $storage->print_at( $keytag->{offset},
518             pack( $self->{long_pack}, $location ),
519             pack( 'C C', $storage->transaction_id, 0 ),
520         );
521
522         my $offset = 1;
523         for my $trans_id ( @transactions ) {
524             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
525                 pack( $self->{long_pack}, 0 ),
526                 pack( 'C C', $trans_id, 1 ),
527             );
528         }
529     }
530
531     $self->_write_value( $location, $plain_key, $value, $orig_key );
532
533     return 1;
534 }
535
536 sub _write_value {
537     my $self = shift;
538     my ($key_loc, $location, $key, $value, $orig_key) = @_;
539
540     my $storage = $self->_storage;
541
542     my $dbm_deep_obj = _get_dbm_object( $value );
543     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
544         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
545     }
546
547     ##
548     # Write signature based on content type, set content length and write
549     # actual value.
550     ##
551     my $r = Scalar::Util::reftype( $value ) || '';
552     if ( $dbm_deep_obj ) {
553         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
554     }
555     elsif ($r eq 'HASH') {
556         if ( !$dbm_deep_obj && tied %{$value} ) {
557             $self->_throw_error( "Cannot store something that is tied" );
558         }
559         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
560     }
561     elsif ($r eq 'ARRAY') {
562         if ( !$dbm_deep_obj && tied @{$value} ) {
563             $self->_throw_error( "Cannot store something that is tied" );
564         }
565         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
566     }
567     elsif (!defined($value)) {
568         $self->write_tag( $location, SIG_NULL, '' );
569     }
570     else {
571         $self->write_tag( $location, SIG_DATA, $value );
572     }
573
574     ##
575     # Plain key is stored AFTER value, as keys are typically fetched less often.
576     ##
577     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
578
579     # Internal references don't care about autobless
580     return 1 if $dbm_deep_obj;
581
582     ##
583     # If value is blessed, preserve class name
584     ##
585     if ( $storage->{autobless} ) {
586         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
587             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
588         }
589         else {
590             $storage->print_at( undef, chr(0) );
591         }
592     }
593
594     ##
595     # Tie the passed in reference so that changes to it are reflected in the
596     # datafile. The use of $location as the base_offset will act as the
597     # the linkage between parent and child.
598     #
599     # The overall assignment is a hack around the fact that just tying doesn't
600     # store the values. This may not be the wrong thing to do.
601     ##
602     if ($r eq 'HASH') {
603         my %x = %$value;
604         tie %$value, 'DBM::Deep', {
605             base_offset => $key_loc,
606             storage     => $storage,
607             parent      => $self->{obj},
608             parent_key  => $orig_key,
609         };
610         %$value = %x;
611         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
612     }
613     elsif ($r eq 'ARRAY') {
614         my @x = @$value;
615         tie @$value, 'DBM::Deep', {
616             base_offset => $key_loc,
617             storage     => $storage,
618             parent      => $self->{obj},
619             parent_key  => $orig_key,
620         };
621         @$value = @x;
622         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
623     }
624
625     return 1;
626 }
627
628 sub split_index {
629     my $self = shift;
630     my ($tag, $md5, $keyloc) = @_;
631
632     my $storage = $self->_storage;
633
634     my $loc = $storage->request_space(
635         $self->tag_size( $self->{index_size} ),
636     );
637
638     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
639
640     my $index_tag = $self->write_tag(
641         $loc, SIG_INDEX,
642         chr(0)x$self->{index_size},
643     );
644
645     my $keys = $tag->{content}
646              . $md5 . pack($self->{long_pack}, $keyloc);
647
648     my @newloc = ();
649     BUCKET:
650     # The <= here is deliberate - we have max_buckets+1 keys to iterate
651     # through, unlike every other loop that uses max_buckets as a stop.
652     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
653         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
654
655         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
656         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
657
658         my $num = ord(substr($key, $tag->{ch} + 1, 1));
659
660         if ($newloc[$num]) {
661             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
662
663             # This is looking for the first empty spot
664             my ($subloc, $offset) = $self->_find_in_buckets(
665                 { content => $subkeys }, '',
666             );
667
668             $storage->print_at(
669                 $newloc[$num] + $offset,
670                 $key, pack($self->{long_pack}, $old_subloc),
671             );
672
673             next;
674         }
675
676         my $loc = $storage->request_space(
677             $self->tag_size( $self->{bucket_list_size} ),
678         );
679
680         $storage->print_at(
681             $index_tag->{offset} + ($num * $self->{long_size}),
682             pack($self->{long_pack}, $loc),
683         );
684
685         my $blist_tag = $self->write_tag(
686             $loc, SIG_BLIST,
687             chr(0)x$self->{bucket_list_size},
688         );
689
690         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
691
692         $newloc[$num] = $blist_tag->{offset};
693     }
694
695     $self->_release_space(
696         $self->tag_size( $self->{bucket_list_size} ),
697         $tag->{start},
698     );
699
700     return 1;
701 }
702
703 sub read_from_loc {
704     my $self = shift;
705     my ($key_loc, $subloc, $orig_key) = @_;
706
707     my $storage = $self->_storage;
708
709     my $signature = $storage->read_at( $subloc, SIG_SIZE );
710
711     ##
712     # If value is a hash or array, return new DBM::Deep object with correct offset
713     ##
714     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
715         #XXX This needs to be a singleton
716 #        my $new_obj;
717 #        my $is_autobless;
718 #        if ( $signature eq SIG_HASH ) {
719 #            $new_obj = {};
720 #            tie %$new_obj, 'DBM::Deep', {
721 #                base_offset => $subloc,
722 #                storage     => $self->_storage,
723 #                parent      => $self->{obj},
724 #                parent_key  => $orig_key,
725 #            };
726 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
727 #        }
728 #        else {
729 #            $new_obj = [];
730 #            tie @$new_obj, 'DBM::Deep', {
731 #                base_offset => $subloc,
732 #                storage     => $self->_storage,
733 #                parent      => $self->{obj},
734 #                parent_key  => $orig_key,
735 #            };
736 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
737 #        }
738 #
739 #        if ($is_autobless) {
740
741         my $new_obj = DBM::Deep->new({
742             type        => $signature,
743             base_offset => $key_loc,
744             storage     => $self->_storage,
745             parent      => $self->{obj},
746             parent_key  => $orig_key,
747         });
748
749         if ($new_obj->_storage->{autobless}) {
750             ##
751             # Skip over value and plain key to see if object needs
752             # to be re-blessed
753             ##
754             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
755
756             my $size = $storage->read_at( undef, $self->{data_size} );
757             $size = unpack($self->{data_pack}, $size);
758             if ($size) { $storage->increment_pointer( $size ); }
759
760             my $bless_bit = $storage->read_at( undef, 1 );
761             if ( ord($bless_bit) ) {
762                 my $size = unpack(
763                     $self->{data_pack},
764                     $storage->read_at( undef, $self->{data_size} ),
765                 );
766
767                 if ( $size ) {
768                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
769                 }
770             }
771         }
772
773         return $new_obj;
774     }
775     elsif ( $signature eq SIG_INTERNAL ) {
776         my $size = $storage->read_at( undef, $self->{data_size} );
777         $size = unpack($self->{data_pack}, $size);
778
779         if ( $size ) {
780             my $new_loc = $storage->read_at( undef, $size );
781             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
782             return $self->read_from_loc( $key_loc, $new_loc, $orig_key );
783         }
784         else {
785             return;
786         }
787     }
788     ##
789     # Otherwise return actual value
790     ##
791     elsif ( $signature eq SIG_DATA ) {
792         my $size = $storage->read_at( undef, $self->{data_size} );
793         $size = unpack($self->{data_pack}, $size);
794
795         my $value = $size ? $storage->read_at( undef, $size ) : '';
796         return $value;
797     }
798
799     ##
800     # Key exists, but content is null
801     ##
802     return;
803 }
804
805 sub get_bucket_value {
806     ##
807     # Fetch single value given tag and MD5 digested key.
808     ##
809     my $self = shift;
810     my ($tag, $md5, $orig_key) = @_;
811
812     #ACID - This is a read. Can find exact or HEAD
813     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
814
815     if ( !$keyloc ) {
816         #XXX Need to use real key
817 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
818 #        return;
819     }
820 #    elsif ( !$is_deleted ) {
821     else {
822         my $keytag = $self->load_tag( $keyloc );
823         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
824         if (!$subloc && !$is_deleted) {
825             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
826         }
827         if ( $subloc && !$is_deleted ) {
828             return $self->read_from_loc( $subloc, $orig_key );
829         }
830     }
831
832     return;
833 }
834
835 sub delete_bucket {
836     ##
837     # Delete single key/value pair given tag and MD5 digested key.
838     ##
839     my $self = shift;
840     my ($tag, $md5, $orig_key) = @_;
841
842     #ACID - Although this is a mutation, we must find any transaction.
843     # This is because we need to mark something as deleted that is in the HEAD.
844     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
845
846     return if !$keyloc;
847
848     my $storage = $self->_storage;
849
850     my @transactions;
851     if ( $storage->transaction_id == 0 ) {
852         @transactions = $storage->current_transactions;
853     }
854
855     if ( $storage->transaction_id == 0 ) {
856         my $keytag = $self->load_tag( $keyloc );
857
858         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
859         return if !$subloc || $is_deleted;
860
861         my $value = $self->read_from_loc( $subloc, $orig_key );
862
863         my $size = $self->_length_needed( $value, $orig_key );
864
865         for my $trans_id ( @transactions ) {
866             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
867             unless ($loc) {
868                 my $location2 = $storage->request_space( $size );
869                 $storage->print_at( $keytag->{offset} + $offset2,
870                     pack($self->{long_pack}, $location2 ),
871                     pack( 'C C', $trans_id, 0 ),
872                 );
873                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
874             }
875         }
876
877         $keytag = $self->load_tag( $keyloc );
878         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
879         $storage->print_at( $keytag->{offset} + $offset,
880             substr( $keytag->{content}, $offset + $self->{key_size} ),
881             chr(0) x $self->{key_size},
882         );
883     }
884     else {
885         my $keytag = $self->load_tag( $keyloc );
886
887         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
888
889         $storage->print_at( $keytag->{offset} + $offset,
890             pack($self->{long_pack}, 0 ),
891             pack( 'C C', $storage->transaction_id, 1 ),
892         );
893     }
894
895     return 1;
896 }
897
898 sub bucket_exists {
899     ##
900     # Check existence of single key given tag and MD5 digested key.
901     ##
902     my $self = shift;
903     my ($tag, $md5) = @_;
904
905     #ACID - This is a read. Can find exact or HEAD
906     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
907     my $keytag = $self->load_tag( $keyloc );
908     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
909     if ( !$subloc && !$is_deleted ) {
910         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
911     }
912     return ($subloc && !$is_deleted) && 1;
913 }
914
915 sub find_blist {
916     ##
917     # Locate offset for bucket list, given digested key
918     ##
919     my $self = shift;
920     my ($offset, $md5, $args) = @_;
921     $args = {} unless $args;
922
923     ##
924     # Locate offset for bucket list using digest index system
925     ##
926     my $tag = $self->load_tag( $offset )
927         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
928
929     #XXX What happens when $ch >= $self->{hash_size} ??
930     for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
931         my $num = ord substr($md5, $ch, 1);
932
933         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
934         $tag = $self->index_lookup( $tag, $num );
935
936         if (!$tag) {
937             return if !$args->{create};
938
939             my $loc = $self->_storage->request_space(
940                 $self->tag_size( $self->{bucket_list_size} ),
941             );
942
943             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
944
945             $tag = $self->write_tag(
946                 $loc, SIG_BLIST,
947                 chr(0)x$self->{bucket_list_size},
948             );
949
950             $tag->{ref_loc} = $ref_loc;
951             $tag->{ch} = $ch;
952
953             last;
954         }
955
956         $tag->{ch} = $ch;
957         $tag->{ref_loc} = $ref_loc;
958     }
959
960     return $tag;
961 }
962
963 sub index_lookup {
964     ##
965     # Given index tag, lookup single entry in index and return .
966     ##
967     my $self = shift;
968     my ($tag, $index) = @_;
969
970     my $location = unpack(
971         $self->{long_pack},
972         substr(
973             $tag->{content},
974             $index * $self->{long_size},
975             $self->{long_size},
976         ),
977     );
978
979     if (!$location) { return; }
980
981     return $self->load_tag( $location );
982 }
983
984 sub traverse_index {
985     ##
986     # Scan index and recursively step into deeper levels, looking for next key.
987     ##
988     my $self = shift;
989     my ($xxxx, $offset, $ch, $force_return_next) = @_;
990
991     my $tag = $self->load_tag( $offset );
992
993     if ($tag->{signature} ne SIG_BLIST) {
994         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
995
996         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
997             my $subloc = unpack(
998                 $self->{long_pack},
999                 substr(
1000                     $tag->{content},
1001                     $idx * $self->{long_size},
1002                     $self->{long_size},
1003                 ),
1004             );
1005
1006             if ($subloc) {
1007                 my $result = $self->traverse_index(
1008                     $xxxx, $subloc, $ch + 1, $force_return_next,
1009                 );
1010
1011                 if (defined $result) { return $result; }
1012             }
1013         } # index loop
1014
1015         $xxxx->{return_next} = 1;
1016     }
1017     # This is the bucket list
1018     else {
1019         my $keys = $tag->{content};
1020         if ($force_return_next) { $xxxx->{return_next} = 1; }
1021
1022         ##
1023         # Iterate through buckets, looking for a key match
1024         ##
1025         my $transaction_id = $self->_storage->transaction_id;
1026         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
1027             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
1028
1029             # End of bucket list -- return to outer loop
1030             if (!$keyloc) {
1031                 $xxxx->{return_next} = 1;
1032                 last;
1033             }
1034             # Located previous key -- return next one found
1035             elsif ($key eq $xxxx->{prev_md5}) {
1036                 $xxxx->{return_next} = 1;
1037                 next;
1038             }
1039             # Seek to bucket location and skip over signature
1040             elsif ($xxxx->{return_next}) {
1041                 my $storage = $self->_storage;
1042
1043                 my $keytag = $self->load_tag( $keyloc );
1044                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1045                 if ( $subloc == 0 && !$is_deleted ) {
1046                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1047                 }
1048                 next if $is_deleted;
1049
1050                 # Skip over value to get to plain key
1051                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1052
1053                 my $size = $storage->read_at( undef, $self->{data_size} );
1054                 $size = unpack($self->{data_pack}, $size);
1055                 if ($size) { $storage->increment_pointer( $size ); }
1056
1057                 # Read in plain key and return as scalar
1058                 $size = $storage->read_at( undef, $self->{data_size} );
1059                 $size = unpack($self->{data_pack}, $size);
1060
1061                 my $plain_key;
1062                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1063                 return $plain_key;
1064             }
1065         }
1066
1067         $xxxx->{return_next} = 1;
1068     }
1069
1070     return;
1071 }
1072
1073 # Utilities
1074
1075 sub _get_key_subloc {
1076     my $self = shift;
1077     my ($keys, $idx) = @_;
1078
1079     return unpack(
1080         # This is 'a', not 'A'. Please read the pack() documentation for the
1081         # difference between the two and why it's important.
1082         "a$self->{hash_size} $self->{long_pack}",
1083         substr(
1084             $keys,
1085             ($idx * $self->{bucket_size}),
1086             $self->{bucket_size},
1087         ),
1088     );
1089 }
1090
1091 sub _find_in_buckets {
1092     my $self = shift;
1093     my ($tag, $md5) = @_;
1094
1095     BUCKET:
1096     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1097         my ($key, $subloc) = $self->_get_key_subloc(
1098             $tag->{content}, $i,
1099         );
1100
1101         next BUCKET if $subloc && $key ne $md5;
1102         return( $subloc, $i * $self->{bucket_size} );
1103     }
1104
1105     return;
1106 }
1107
1108 sub _release_space {
1109     my $self = shift;
1110     my ($size, $loc) = @_;
1111
1112     my $next_loc = 0;
1113
1114     $self->_storage->print_at( $loc,
1115         SIG_FREE, 
1116         pack($self->{long_pack}, $size ),
1117         pack($self->{long_pack}, $next_loc ),
1118     );
1119
1120     return;
1121 }
1122
1123 sub _throw_error {
1124     die "DBM::Deep: $_[1]\n";
1125 }
1126
1127 sub _get_dbm_object {
1128     my $item = shift;
1129
1130     my $obj = eval {
1131         local $SIG{__DIE__};
1132         if ($item->isa( 'DBM::Deep' )) {
1133             return $item;
1134         }
1135         return;
1136     };
1137     return $obj if $obj;
1138
1139     my $r = Scalar::Util::reftype( $item ) || '';
1140     if ( $r eq 'HASH' ) {
1141         my $obj = eval {
1142             local $SIG{__DIE__};
1143             my $obj = tied(%$item);
1144             if ($obj->isa( 'DBM::Deep' )) {
1145                 return $obj;
1146             }
1147             return;
1148         };
1149         return $obj if $obj;
1150     }
1151     elsif ( $r eq 'ARRAY' ) {
1152         my $obj = eval {
1153             local $SIG{__DIE__};
1154             my $obj = tied(@$item);
1155             if ($obj->isa( 'DBM::Deep' )) {
1156                 return $obj;
1157             }
1158             return;
1159         };
1160         return $obj if $obj;
1161     }
1162
1163     return;
1164 }
1165
1166 sub _length_needed {
1167     my $self = shift;
1168     my ($value, $key) = @_;
1169
1170     my $is_dbm_deep = eval {
1171         local $SIG{'__DIE__'};
1172         $value->isa( 'DBM::Deep' );
1173     };
1174
1175     my $len = SIG_SIZE
1176             + $self->{data_size} # size for value
1177             + $self->{data_size} # size for key
1178             + length( $key );    # length of key
1179
1180     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1181         # long_size is for the internal reference
1182         return $len + $self->{long_size};
1183     }
1184
1185     if ( $self->_storage->{autobless} ) {
1186         # This is for the bit saying whether or not this thing is blessed.
1187         $len += 1;
1188     }
1189
1190     my $r = Scalar::Util::reftype( $value ) || '';
1191     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1192         if ( defined $value ) {
1193             $len += length( $value );
1194         }
1195         return $len;
1196     }
1197
1198     $len += $self->{index_size};
1199
1200     # if autobless is enabled, must also take into consideration
1201     # the class name as it is stored after the key.
1202     if ( $self->_storage->{autobless} ) {
1203         my $c = Scalar::Util::blessed($value);
1204         if ( defined $c && !$is_dbm_deep ) {
1205             $len += $self->{data_size} + length($c);
1206         }
1207     }
1208
1209     return $len;
1210 }
1211
1212 1;
1213 __END__