r14236@Rob-Kinyons-PowerBook: rob | 2006-06-14 23:07:31 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Fcntl qw( :DEFAULT :flock );
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * To add to bucket_size, make sure you modify the following:
14 #   - calculate_sizes()
15 #   - _get_key_subloc()
16 #   - add_bucket() - where the buckets are printed
17 #
18 # * Every method in here assumes that the _storage has been appropriately
19 #   safeguarded. This can be anything from flock() to some sort of manual
20 #   mutex. But, it's the caller's responsability to make sure that this has
21 #   been done.
22
23 ##
24 # Setup file and tag signatures.  These should never change.
25 ##
26 sub SIG_FILE     () { 'DPDB' }
27 sub SIG_HEADER   () { 'h'    }
28 sub SIG_INTERNAL () { 'i'    }
29 sub SIG_HASH     () { 'H'    }
30 sub SIG_ARRAY    () { 'A'    }
31 sub SIG_NULL     () { 'N'    }
32 sub SIG_DATA     () { 'D'    }
33 sub SIG_INDEX    () { 'I'    }
34 sub SIG_BLIST    () { 'B'    }
35 sub SIG_FREE     () { 'F'    }
36 sub SIG_KEYS     () { 'K'    }
37 sub SIG_SIZE     () {  1     }
38
39 # This is the transaction ID for the HEAD
40 sub HEAD () { 0 }
41
42 ################################################################################
43 #
44 # This is new code. It is a complete rewrite of the engine based on a new API
45 #
46 ################################################################################
47
48 sub read_value {
49     my $self = shift;
50     my ($trans_id, $offset, $key, $orig_key) = @_;
51
52     my $dig_key = $self->_apply_digest( $key );
53     my $tag = $self->find_blist( $offset, $dig_key ) or return;
54     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
55 }
56
57 sub key_exists {
58     my $self = shift;
59     my ($trans_id, $offset, $key) = @_;
60
61     my $dig_key = $self->_apply_digest( $key );
62     # exists() returns the empty string, not undef
63     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
64     return $self->bucket_exists( $tag, $dig_key, $key );
65 }
66
67 sub get_next_key {
68     my $self = shift;
69     my ($trans_id, $offset) = @_;
70
71     # If the previous key was not specifed, start at the top and
72     # return the first one found.
73     my $temp;
74     if ( @_ > 2 ) {
75         $temp = {
76             prev_md5    => $self->_apply_digest($_[2]),
77             return_next => 0,
78         };
79     }
80     else {
81         $temp = {
82             prev_md5    => chr(0) x $self->{hash_size},
83             return_next => 1,
84         };
85     }
86
87     return $self->traverse_index( $temp, $offset, 0 );
88 }
89
90 sub delete_key {
91     my $self = shift;
92     my ($trans_id, $offset, $key, $orig_key) = @_;
93
94     my $dig_key = $self->_apply_digest( $key );
95     my $tag = $self->find_blist( $offset, $dig_key ) or return;
96     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
97     $self->delete_bucket( $tag, $dig_key, $orig_key );
98     return $value;
99 }
100
101 sub write_value {
102     my $self = shift;
103     my ($trans_id, $offset, $key, $value, $orig_key) = @_;
104
105     my $dig_key = $self->_apply_digest( $key );
106     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
107     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
108 }
109
110 ################################################################################
111 #
112 # Below here is the old code. It will be folded into the code above as it can.
113 #
114 ################################################################################
115
116 sub new {
117     my $class = shift;
118     my ($args) = @_;
119
120     my $self = bless {
121         long_size => 4,
122         long_pack => 'N',
123         data_size => 4,
124         data_pack => 'N',
125
126         digest    => \&Digest::MD5::md5,
127         hash_size => 16, # In bytes
128
129         ##
130         # Number of buckets per blist before another level of indexing is
131         # done. Increase this value for slightly greater speed, but larger database
132         # files. DO NOT decrease this value below 16, due to risk of recursive
133         # reindex overrun.
134         ##
135         max_buckets => 16,
136
137         storage => undef,
138         obj     => undef,
139     }, $class;
140
141     if ( defined $args->{pack_size} ) {
142         if ( lc $args->{pack_size} eq 'small' ) {
143             $args->{long_size} = 2;
144             $args->{long_pack} = 'n';
145         }
146         elsif ( lc $args->{pack_size} eq 'medium' ) {
147             $args->{long_size} = 4;
148             $args->{long_pack} = 'N';
149         }
150         elsif ( lc $args->{pack_size} eq 'large' ) {
151             $args->{long_size} = 8;
152             $args->{long_pack} = 'Q';
153         }
154         else {
155             die "Unknown pack_size value: '$args->{pack_size}'\n";
156         }
157     }
158
159     # Grab the parameters we want to use
160     foreach my $param ( keys %$self ) {
161         next unless exists $args->{$param};
162         $self->{$param} = $args->{$param};
163     }
164     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
165
166     if ( $self->{max_buckets} < 16 ) {
167         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
168         $self->{max_buckets} = 16;
169     }
170
171     return $self;
172 }
173
174 sub _storage { return $_[0]{storage} }
175
176 sub _apply_digest {
177     my $self = shift;
178     return $self->{digest}->(@_);
179 }
180
181 sub calculate_sizes {
182     my $self = shift;
183
184     # The 2**8 here indicates the number of different characters in the
185     # current hashing algorithm
186     #XXX Does this need to be updated with different hashing algorithms?
187     $self->{hash_chars_used}  = (2**8);
188     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
189
190     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
191     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
192
193     $self->{key_size}         = $self->{long_size} * 2;
194     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
195
196     return;
197 }
198
199 sub write_file_header {
200     my $self = shift;
201
202     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
203
204     $self->_storage->print_at( $loc,
205         SIG_FILE,
206         SIG_HEADER,
207         pack('N', 1),  # header version
208         pack('N', 24), # header size
209         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
210         pack('n', $self->{long_size}),
211         pack('A', $self->{long_pack}),
212         pack('n', $self->{data_size}),
213         pack('A', $self->{data_pack}),
214         pack('n', $self->{max_buckets}),
215     );
216
217     $self->_storage->set_transaction_offset( 13 );
218
219     return;
220 }
221
222 sub read_file_header {
223     my $self = shift;
224
225     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
226     return unless length($buffer);
227
228     my ($file_signature, $sig_header, $header_version, $size) = unpack(
229         'A4 A N N', $buffer
230     );
231
232     unless ( $file_signature eq SIG_FILE ) {
233         $self->_storage->close;
234         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
235     }
236
237     unless ( $sig_header eq SIG_HEADER ) {
238         $self->_storage->close;
239         $self->_throw_error( "Old file version found." );
240     }
241
242     my $buffer2 = $self->_storage->read_at( undef, $size );
243     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
244
245     $self->_storage->set_transaction_offset( 13 );
246
247     if ( @values < 5 || grep { !defined } @values ) {
248         $self->_storage->close;
249         $self->_throw_error("Corrupted file - bad header");
250     }
251
252     #XXX Add warnings if values weren't set right
253     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
254
255     return length($buffer) + length($buffer2);
256 }
257
258 sub setup_fh {
259     my $self = shift;
260     my ($obj) = @_;
261
262     # Need to remove use of $fh here
263     my $fh = $self->_storage->{fh};
264     flock $fh, LOCK_EX;
265
266     #XXX The duplication of calculate_sizes needs to go away
267     unless ( $obj->{base_offset} ) {
268         my $bytes_read = $self->read_file_header;
269
270         $self->calculate_sizes;
271
272         ##
273         # File is empty -- write header and master index
274         ##
275         if (!$bytes_read) {
276             $self->_storage->audit( "# Database created on" );
277
278             $self->write_file_header;
279
280             $obj->{base_offset} = $self->_storage->request_space(
281                 $self->tag_size( $self->{index_size} ),
282             );
283
284             $self->write_tag(
285                 $obj->_base_offset, $obj->_type,
286                 chr(0)x$self->{index_size},
287             );
288
289             # Flush the filehandle
290             my $old_fh = select $fh;
291             my $old_af = $|; $| = 1; $| = $old_af;
292             select $old_fh;
293         }
294         else {
295             $obj->{base_offset} = $bytes_read;
296
297             ##
298             # Get our type from master index header
299             ##
300             my $tag = $self->load_tag($obj->_base_offset);
301             unless ( $tag ) {
302                 flock $fh, LOCK_UN;
303                 $self->_throw_error("Corrupted file, no master index record");
304             }
305
306             unless ($obj->_type eq $tag->{signature}) {
307                 flock $fh, LOCK_UN;
308                 $self->_throw_error("File type mismatch");
309             }
310         }
311     }
312     else {
313         $self->calculate_sizes;
314     }
315
316     #XXX We have to make sure we don't mess up when autoflush isn't turned on
317     $self->_storage->set_inode;
318
319     flock $fh, LOCK_UN;
320
321     return 1;
322 }
323
324 sub tag_size {
325     my $self = shift;
326     my ($size) = @_;
327     return SIG_SIZE + $self->{data_size} + $size;
328 }
329
330 sub write_tag {
331     ##
332     # Given offset, signature and content, create tag and write to disk
333     ##
334     my $self = shift;
335     my ($offset, $sig, $content) = @_;
336     my $size = length( $content );
337
338     $self->_storage->print_at(
339         $offset, 
340         $sig, pack($self->{data_pack}, $size), $content,
341     );
342
343     return unless defined $offset;
344
345     return {
346         signature => $sig,
347         #XXX Is this even used?
348         size      => $size,
349         offset    => $offset + SIG_SIZE + $self->{data_size},
350         content   => $content
351     };
352 }
353
354 sub load_tag {
355     ##
356     # Given offset, load single tag and return signature, size and data
357     ##
358     my $self = shift;
359     my ($offset) = @_;
360
361     my $storage = $self->_storage;
362
363     my ($sig, $size) = unpack(
364         "A $self->{data_pack}",
365         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
366     );
367
368     return {
369         signature => $sig,
370         size      => $size,   #XXX Is this even used?
371         start     => $offset,
372         offset    => $offset + SIG_SIZE + $self->{data_size},
373         content   => $storage->read_at( undef, $size ),
374     };
375 }
376
377 sub find_keyloc {
378     my $self = shift;
379     my ($tag, $transaction_id) = @_;
380     $transaction_id = $self->_storage->transaction_id
381         unless defined $transaction_id;
382
383     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
384         my ($loc, $trans_id, $is_deleted) = unpack(
385             "$self->{long_pack} C C",
386             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
387         );
388
389         next if $loc != HEAD && $transaction_id != $trans_id;
390         return( $loc, $is_deleted, $i * $self->{key_size} );
391     }
392
393     return;
394 }
395
396 sub add_bucket {
397     ##
398     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
399     # plain (undigested) key and value.
400     ##
401     my $self = shift;
402     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
403
404     # This verifies that only supported values will be stored.
405     {
406         my $r = Scalar::Util::reftype( $value );
407
408         last if !defined $r;
409         last if $r eq 'HASH';
410         last if $r eq 'ARRAY';
411
412         $self->_throw_error(
413             "Storage of references of type '$r' is not supported."
414         );
415     }
416
417     my $storage = $self->_storage;
418
419     #ACID - This is a mutation. Must only find the exact transaction
420     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
421
422     my @transactions;
423     if ( $storage->transaction_id == 0 ) {
424         @transactions = $storage->current_transactions;
425     }
426
427 #    $self->_release_space( $size, $subloc );
428 #XXX This needs updating to use _release_space
429
430     my $location;
431     my $size = $self->_length_needed( $value, $plain_key );
432
433     # Updating a known md5
434     if ( $keyloc ) {
435         my $keytag = $self->load_tag( $keyloc );
436         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
437
438         if ( $subloc && !$is_deleted && @transactions ) {
439             my $old_value = $self->read_from_loc( $subloc, $orig_key );
440             my $old_size = $self->_length_needed( $old_value, $plain_key );
441
442             for my $trans_id ( @transactions ) {
443                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
444                 unless ($loc) {
445                     my $location2 = $storage->request_space( $old_size );
446                     $storage->print_at( $keytag->{offset} + $offset2,
447                         pack($self->{long_pack}, $location2 ),
448                         pack( 'C C', $trans_id, 0 ),
449                     );
450                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
451                 }
452             }
453         }
454
455         $location = $self->_storage->request_space( $size );
456         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
457         $storage->print_at( $keytag->{offset} + $offset,
458             pack($self->{long_pack}, $location ),
459             pack( 'C C', $storage->transaction_id, 0 ),
460         );
461     }
462     # Adding a new md5
463     else {
464         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
465
466         # The bucket fit into list
467         if ( defined $offset ) {
468             $storage->print_at( $tag->{offset} + $offset,
469                 $md5, pack( $self->{long_pack}, $keyloc ),
470             );
471         }
472         # If bucket didn't fit into list, split into a new index level
473         else {
474             $self->split_index( $tag, $md5, $keyloc );
475         }
476
477         my $keytag = $self->write_tag(
478             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
479         );
480
481         $location = $self->_storage->request_space( $size );
482         $storage->print_at( $keytag->{offset},
483             pack( $self->{long_pack}, $location ),
484             pack( 'C C', $storage->transaction_id, 0 ),
485         );
486
487         my $offset = 1;
488         for my $trans_id ( @transactions ) {
489             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
490                 pack( $self->{long_pack}, 0 ),
491                 pack( 'C C', $trans_id, 1 ),
492             );
493         }
494     }
495
496     $self->_write_value( $location, $plain_key, $value, $orig_key );
497
498     return 1;
499 }
500
501 sub _write_value {
502     my $self = shift;
503     my ($key_loc, $location, $key, $value, $orig_key) = @_;
504
505     my $storage = $self->_storage;
506
507     my $dbm_deep_obj = _get_dbm_object( $value );
508     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
509         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
510     }
511
512     ##
513     # Write signature based on content type, set content length and write
514     # actual value.
515     ##
516     my $r = Scalar::Util::reftype( $value ) || '';
517     if ( $dbm_deep_obj ) {
518         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
519     }
520     elsif ($r eq 'HASH') {
521         if ( !$dbm_deep_obj && tied %{$value} ) {
522             $self->_throw_error( "Cannot store something that is tied" );
523         }
524         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
525     }
526     elsif ($r eq 'ARRAY') {
527         if ( !$dbm_deep_obj && tied @{$value} ) {
528             $self->_throw_error( "Cannot store something that is tied" );
529         }
530         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
531     }
532     elsif (!defined($value)) {
533         $self->write_tag( $location, SIG_NULL, '' );
534     }
535     else {
536         $self->write_tag( $location, SIG_DATA, $value );
537     }
538
539     ##
540     # Plain key is stored AFTER value, as keys are typically fetched less often.
541     ##
542     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
543
544     # Internal references don't care about autobless
545     return 1 if $dbm_deep_obj;
546
547     ##
548     # If value is blessed, preserve class name
549     ##
550     if ( $storage->{autobless} ) {
551         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
552             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
553         }
554         else {
555             $storage->print_at( undef, chr(0) );
556         }
557     }
558
559     ##
560     # Tie the passed in reference so that changes to it are reflected in the
561     # datafile. The use of $location as the base_offset will act as the
562     # the linkage between parent and child.
563     #
564     # The overall assignment is a hack around the fact that just tying doesn't
565     # store the values. This may not be the wrong thing to do.
566     ##
567     if ($r eq 'HASH') {
568         my %x = %$value;
569         tie %$value, 'DBM::Deep', {
570             base_offset => $key_loc,
571             storage     => $storage,
572             parent      => $self->{obj},
573             parent_key  => $orig_key,
574         };
575         %$value = %x;
576         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
577     }
578     elsif ($r eq 'ARRAY') {
579         my @x = @$value;
580         tie @$value, 'DBM::Deep', {
581             base_offset => $key_loc,
582             storage     => $storage,
583             parent      => $self->{obj},
584             parent_key  => $orig_key,
585         };
586         @$value = @x;
587         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
588     }
589
590     return 1;
591 }
592
593 sub split_index {
594     my $self = shift;
595     my ($tag, $md5, $keyloc) = @_;
596
597     my $storage = $self->_storage;
598
599     my $loc = $storage->request_space(
600         $self->tag_size( $self->{index_size} ),
601     );
602
603     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
604
605     my $index_tag = $self->write_tag(
606         $loc, SIG_INDEX,
607         chr(0)x$self->{index_size},
608     );
609
610     my $keys = $tag->{content}
611              . $md5 . pack($self->{long_pack}, $keyloc);
612
613     my @newloc = ();
614     BUCKET:
615     # The <= here is deliberate - we have max_buckets+1 keys to iterate
616     # through, unlike every other loop that uses max_buckets as a stop.
617     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
618         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
619
620         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
621         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
622
623         my $num = ord(substr($key, $tag->{ch} + 1, 1));
624
625         if ($newloc[$num]) {
626             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
627
628             # This is looking for the first empty spot
629             my ($subloc, $offset) = $self->_find_in_buckets(
630                 { content => $subkeys }, '',
631             );
632
633             $storage->print_at(
634                 $newloc[$num] + $offset,
635                 $key, pack($self->{long_pack}, $old_subloc),
636             );
637
638             next;
639         }
640
641         my $loc = $storage->request_space(
642             $self->tag_size( $self->{bucket_list_size} ),
643         );
644
645         $storage->print_at(
646             $index_tag->{offset} + ($num * $self->{long_size}),
647             pack($self->{long_pack}, $loc),
648         );
649
650         my $blist_tag = $self->write_tag(
651             $loc, SIG_BLIST,
652             chr(0)x$self->{bucket_list_size},
653         );
654
655         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
656
657         $newloc[$num] = $blist_tag->{offset};
658     }
659
660     $self->_release_space(
661         $self->tag_size( $self->{bucket_list_size} ),
662         $tag->{start},
663     );
664
665     return 1;
666 }
667
668 sub read_from_loc {
669     my $self = shift;
670     my ($key_loc, $subloc, $orig_key) = @_;
671
672     my $storage = $self->_storage;
673
674     my $signature = $storage->read_at( $subloc, SIG_SIZE );
675
676     ##
677     # If value is a hash or array, return new DBM::Deep object with correct offset
678     ##
679     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
680         #XXX This needs to be a singleton
681 #        my $new_obj;
682 #        my $is_autobless;
683 #        if ( $signature eq SIG_HASH ) {
684 #            $new_obj = {};
685 #            tie %$new_obj, 'DBM::Deep', {
686 #                base_offset => $subloc,
687 #                storage     => $self->_storage,
688 #                parent      => $self->{obj},
689 #                parent_key  => $orig_key,
690 #            };
691 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
692 #        }
693 #        else {
694 #            $new_obj = [];
695 #            tie @$new_obj, 'DBM::Deep', {
696 #                base_offset => $subloc,
697 #                storage     => $self->_storage,
698 #                parent      => $self->{obj},
699 #                parent_key  => $orig_key,
700 #            };
701 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
702 #        }
703 #
704 #        if ($is_autobless) {
705
706         my $new_obj = DBM::Deep->new({
707             type        => $signature,
708             base_offset => $key_loc,
709             storage     => $self->_storage,
710             parent      => $self->{obj},
711             parent_key  => $orig_key,
712         });
713
714         if ($new_obj->_storage->{autobless}) {
715             ##
716             # Skip over value and plain key to see if object needs
717             # to be re-blessed
718             ##
719             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
720
721             my $size = $storage->read_at( undef, $self->{data_size} );
722             $size = unpack($self->{data_pack}, $size);
723             if ($size) { $storage->increment_pointer( $size ); }
724
725             my $bless_bit = $storage->read_at( undef, 1 );
726             if ( ord($bless_bit) ) {
727                 my $size = unpack(
728                     $self->{data_pack},
729                     $storage->read_at( undef, $self->{data_size} ),
730                 );
731
732                 if ( $size ) {
733                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
734                 }
735             }
736         }
737
738         return $new_obj;
739     }
740     elsif ( $signature eq SIG_INTERNAL ) {
741         my $size = $storage->read_at( undef, $self->{data_size} );
742         $size = unpack($self->{data_pack}, $size);
743
744         if ( $size ) {
745             my $new_loc = $storage->read_at( undef, $size );
746             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
747             return $self->read_from_loc( $key_loc, $new_loc, $orig_key );
748         }
749         else {
750             return;
751         }
752     }
753     ##
754     # Otherwise return actual value
755     ##
756     elsif ( $signature eq SIG_DATA ) {
757         my $size = $storage->read_at( undef, $self->{data_size} );
758         $size = unpack($self->{data_pack}, $size);
759
760         my $value = $size ? $storage->read_at( undef, $size ) : '';
761         return $value;
762     }
763
764     ##
765     # Key exists, but content is null
766     ##
767     return;
768 }
769
770 sub get_bucket_value {
771     ##
772     # Fetch single value given tag and MD5 digested key.
773     ##
774     my $self = shift;
775     my ($tag, $md5, $orig_key) = @_;
776
777     #ACID - This is a read. Can find exact or HEAD
778     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
779
780     if ( !$keyloc ) {
781         #XXX Need to use real key
782 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
783 #        return;
784     }
785 #    elsif ( !$is_deleted ) {
786     else {
787         my $keytag = $self->load_tag( $keyloc );
788         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
789         if (!$subloc && !$is_deleted) {
790             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
791         }
792         if ( $subloc && !$is_deleted ) {
793             return $self->read_from_loc( $subloc, $orig_key );
794         }
795     }
796
797     return;
798 }
799
800 sub delete_bucket {
801     ##
802     # Delete single key/value pair given tag and MD5 digested key.
803     ##
804     my $self = shift;
805     my ($tag, $md5, $orig_key) = @_;
806
807     #ACID - Although this is a mutation, we must find any transaction.
808     # This is because we need to mark something as deleted that is in the HEAD.
809     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
810
811     return if !$keyloc;
812
813     my $storage = $self->_storage;
814
815     my @transactions;
816     if ( $storage->transaction_id == 0 ) {
817         @transactions = $storage->current_transactions;
818     }
819
820     if ( $storage->transaction_id == 0 ) {
821         my $keytag = $self->load_tag( $keyloc );
822
823         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
824         return if !$subloc || $is_deleted;
825
826         my $value = $self->read_from_loc( $subloc, $orig_key );
827
828         my $size = $self->_length_needed( $value, $orig_key );
829
830         for my $trans_id ( @transactions ) {
831             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
832             unless ($loc) {
833                 my $location2 = $storage->request_space( $size );
834                 $storage->print_at( $keytag->{offset} + $offset2,
835                     pack($self->{long_pack}, $location2 ),
836                     pack( 'C C', $trans_id, 0 ),
837                 );
838                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
839             }
840         }
841
842         $keytag = $self->load_tag( $keyloc );
843         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
844         $storage->print_at( $keytag->{offset} + $offset,
845             substr( $keytag->{content}, $offset + $self->{key_size} ),
846             chr(0) x $self->{key_size},
847         );
848     }
849     else {
850         my $keytag = $self->load_tag( $keyloc );
851
852         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
853
854         $storage->print_at( $keytag->{offset} + $offset,
855             pack($self->{long_pack}, 0 ),
856             pack( 'C C', $storage->transaction_id, 1 ),
857         );
858     }
859
860     return 1;
861 }
862
863 sub bucket_exists {
864     ##
865     # Check existence of single key given tag and MD5 digested key.
866     ##
867     my $self = shift;
868     my ($tag, $md5) = @_;
869
870     #ACID - This is a read. Can find exact or HEAD
871     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
872     my $keytag = $self->load_tag( $keyloc );
873     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
874     if ( !$subloc && !$is_deleted ) {
875         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
876     }
877     return ($subloc && !$is_deleted) && 1;
878 }
879
880 sub find_blist {
881     ##
882     # Locate offset for bucket list, given digested key
883     ##
884     my $self = shift;
885     my ($offset, $md5, $args) = @_;
886     $args = {} unless $args;
887
888     ##
889     # Locate offset for bucket list using digest index system
890     ##
891     my $tag = $self->load_tag( $offset )
892         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
893
894     #XXX What happens when $ch >= $self->{hash_size} ??
895     for (my $ch = 0; $tag->{signature} ne SIG_BLIST; $ch++) {
896         my $num = ord substr($md5, $ch, 1);
897
898         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
899         $tag = $self->index_lookup( $tag, $num );
900
901         if (!$tag) {
902             return if !$args->{create};
903
904             my $loc = $self->_storage->request_space(
905                 $self->tag_size( $self->{bucket_list_size} ),
906             );
907
908             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
909
910             $tag = $self->write_tag(
911                 $loc, SIG_BLIST,
912                 chr(0)x$self->{bucket_list_size},
913             );
914
915             $tag->{ref_loc} = $ref_loc;
916             $tag->{ch} = $ch;
917
918             last;
919         }
920
921         $tag->{ch} = $ch;
922         $tag->{ref_loc} = $ref_loc;
923     }
924
925     return $tag;
926 }
927
928 sub index_lookup {
929     ##
930     # Given index tag, lookup single entry in index and return .
931     ##
932     my $self = shift;
933     my ($tag, $index) = @_;
934
935     my $location = unpack(
936         $self->{long_pack},
937         substr(
938             $tag->{content},
939             $index * $self->{long_size},
940             $self->{long_size},
941         ),
942     );
943
944     if (!$location) { return; }
945
946     return $self->load_tag( $location );
947 }
948
949 sub traverse_index {
950     ##
951     # Scan index and recursively step into deeper levels, looking for next key.
952     ##
953     my $self = shift;
954     my ($xxxx, $offset, $ch, $force_return_next) = @_;
955
956     my $tag = $self->load_tag( $offset );
957
958     if ($tag->{signature} ne SIG_BLIST) {
959         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
960
961         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
962             my $subloc = unpack(
963                 $self->{long_pack},
964                 substr(
965                     $tag->{content},
966                     $idx * $self->{long_size},
967                     $self->{long_size},
968                 ),
969             );
970
971             if ($subloc) {
972                 my $result = $self->traverse_index(
973                     $xxxx, $subloc, $ch + 1, $force_return_next,
974                 );
975
976                 if (defined $result) { return $result; }
977             }
978         } # index loop
979
980         $xxxx->{return_next} = 1;
981     }
982     # This is the bucket list
983     else {
984         my $keys = $tag->{content};
985         if ($force_return_next) { $xxxx->{return_next} = 1; }
986
987         ##
988         # Iterate through buckets, looking for a key match
989         ##
990         my $transaction_id = $self->_storage->transaction_id;
991         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
992             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
993
994             # End of bucket list -- return to outer loop
995             if (!$keyloc) {
996                 $xxxx->{return_next} = 1;
997                 last;
998             }
999             # Located previous key -- return next one found
1000             elsif ($key eq $xxxx->{prev_md5}) {
1001                 $xxxx->{return_next} = 1;
1002                 next;
1003             }
1004             # Seek to bucket location and skip over signature
1005             elsif ($xxxx->{return_next}) {
1006                 my $storage = $self->_storage;
1007
1008                 my $keytag = $self->load_tag( $keyloc );
1009                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1010                 if ( $subloc == 0 && !$is_deleted ) {
1011                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1012                 }
1013                 next if $is_deleted;
1014
1015                 # Skip over value to get to plain key
1016                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1017
1018                 my $size = $storage->read_at( undef, $self->{data_size} );
1019                 $size = unpack($self->{data_pack}, $size);
1020                 if ($size) { $storage->increment_pointer( $size ); }
1021
1022                 # Read in plain key and return as scalar
1023                 $size = $storage->read_at( undef, $self->{data_size} );
1024                 $size = unpack($self->{data_pack}, $size);
1025
1026                 my $plain_key;
1027                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1028                 return $plain_key;
1029             }
1030         }
1031
1032         $xxxx->{return_next} = 1;
1033     }
1034
1035     return;
1036 }
1037
1038 # Utilities
1039
1040 sub _get_key_subloc {
1041     my $self = shift;
1042     my ($keys, $idx) = @_;
1043
1044     return unpack(
1045         # This is 'a', not 'A'. Please read the pack() documentation for the
1046         # difference between the two and why it's important.
1047         "a$self->{hash_size} $self->{long_pack}",
1048         substr(
1049             $keys,
1050             ($idx * $self->{bucket_size}),
1051             $self->{bucket_size},
1052         ),
1053     );
1054 }
1055
1056 sub _find_in_buckets {
1057     my $self = shift;
1058     my ($tag, $md5) = @_;
1059
1060     BUCKET:
1061     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1062         my ($key, $subloc) = $self->_get_key_subloc(
1063             $tag->{content}, $i,
1064         );
1065
1066         next BUCKET if $subloc && $key ne $md5;
1067         return( $subloc, $i * $self->{bucket_size} );
1068     }
1069
1070     return;
1071 }
1072
1073 sub _release_space {
1074     my $self = shift;
1075     my ($size, $loc) = @_;
1076
1077     my $next_loc = 0;
1078
1079     $self->_storage->print_at( $loc,
1080         SIG_FREE, 
1081         pack($self->{long_pack}, $size ),
1082         pack($self->{long_pack}, $next_loc ),
1083     );
1084
1085     return;
1086 }
1087
1088 sub _throw_error {
1089     die "DBM::Deep: $_[1]\n";
1090 }
1091
1092 sub _get_dbm_object {
1093     my $item = shift;
1094
1095     my $obj = eval {
1096         local $SIG{__DIE__};
1097         if ($item->isa( 'DBM::Deep' )) {
1098             return $item;
1099         }
1100         return;
1101     };
1102     return $obj if $obj;
1103
1104     my $r = Scalar::Util::reftype( $item ) || '';
1105     if ( $r eq 'HASH' ) {
1106         my $obj = eval {
1107             local $SIG{__DIE__};
1108             my $obj = tied(%$item);
1109             if ($obj->isa( 'DBM::Deep' )) {
1110                 return $obj;
1111             }
1112             return;
1113         };
1114         return $obj if $obj;
1115     }
1116     elsif ( $r eq 'ARRAY' ) {
1117         my $obj = eval {
1118             local $SIG{__DIE__};
1119             my $obj = tied(@$item);
1120             if ($obj->isa( 'DBM::Deep' )) {
1121                 return $obj;
1122             }
1123             return;
1124         };
1125         return $obj if $obj;
1126     }
1127
1128     return;
1129 }
1130
1131 sub _length_needed {
1132     my $self = shift;
1133     my ($value, $key) = @_;
1134
1135     my $is_dbm_deep = eval {
1136         local $SIG{'__DIE__'};
1137         $value->isa( 'DBM::Deep' );
1138     };
1139
1140     my $len = SIG_SIZE
1141             + $self->{data_size} # size for value
1142             + $self->{data_size} # size for key
1143             + length( $key );    # length of key
1144
1145     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1146         # long_size is for the internal reference
1147         return $len + $self->{long_size};
1148     }
1149
1150     if ( $self->_storage->{autobless} ) {
1151         # This is for the bit saying whether or not this thing is blessed.
1152         $len += 1;
1153     }
1154
1155     my $r = Scalar::Util::reftype( $value ) || '';
1156     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1157         if ( defined $value ) {
1158             $len += length( $value );
1159         }
1160         return $len;
1161     }
1162
1163     $len += $self->{index_size};
1164
1165     # if autobless is enabled, must also take into consideration
1166     # the class name as it is stored after the key.
1167     if ( $self->_storage->{autobless} ) {
1168         my $c = Scalar::Util::blessed($value);
1169         if ( defined $c && !$is_dbm_deep ) {
1170             $len += $self->{data_size} + length($c);
1171         }
1172     }
1173
1174     return $len;
1175 }
1176
1177 1;
1178 __END__