e863c9ecc1837209f240d89a565779b70f75bfb9
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_03);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18 #
19 # * Every method in here assumes that the _storage has been appropriately
20 #   safeguarded. This can be anything from flock() to some sort of manual
21 #   mutex. But, it's the caller's responsability to make sure that this has
22 #   been done.
23
24 ##
25 # Setup file and tag signatures.  These should never change.
26 ##
27 sub SIG_FILE     () { 'DPDB' }
28 sub SIG_HEADER   () { 'h'    }
29 sub SIG_INTERNAL () { 'i'    }
30 sub SIG_HASH     () { 'H'    }
31 sub SIG_ARRAY    () { 'A'    }
32 sub SIG_NULL     () { 'N'    }
33 sub SIG_DATA     () { 'D'    }
34 sub SIG_INDEX    () { 'I'    }
35 sub SIG_BLIST    () { 'B'    }
36 sub SIG_FREE     () { 'F'    }
37 sub SIG_KEYS     () { 'K'    }
38 sub SIG_SIZE     () {  1     }
39
40 ################################################################################
41 #
42 # This is new code. It is a complete rewrite of the engine based on a new API
43 #
44 ################################################################################
45
46 sub read_value {
47     my $self = shift;
48     my ($offset, $key, $orig_key) = @_;
49
50     my $dig_key = $self->apply_digest( $key );
51     my $tag = $self->find_blist( $offset, $dig_key ) or return;
52     return $self->get_bucket_value( $tag, $dig_key, $orig_key );
53 }
54
55 =pod
56 sub read_value {
57     my $self = shift;
58     my ($trans_id, $base_offset, $key) = @_;
59     
60     my ($_val_offset, $_is_del) = $self->_find_value_offset({
61         offset     => $base_offset,
62         trans_id   => $trans_id,
63         allow_head => 1,
64     });
65     die "Attempt to use a deleted value" if $_is_del;
66     die "Internal error!" if !$_val_offset;
67
68     my ($key_offset) = $self->_find_key_offset({
69         offset  => $_val_offset,
70         key_md5 => $self->_apply_digest( $key ),
71         create  => 0,
72     });
73     return if !$key_offset;
74
75     my ($val_offset, $is_del) = $self->_find_value_offset({
76         offset     => $key_offset,
77         trans_id   => $trans_id,
78         allow_head => 1,
79     });
80     return if $is_del;
81     die "Internal error!" if !$val_offset;
82
83     return $self->_read_value({
84         offset => $val_offset,
85     });
86 }
87 =cut
88
89 sub key_exists {
90     my $self = shift;
91     my ($offset, $key) = @_;
92
93     my $dig_key = $self->apply_digest( $key );
94     # exists() returns the empty string, not undef
95     my $tag = $self->find_blist( $offset, $dig_key ) or return '';
96     return $self->bucket_exists( $tag, $dig_key, $key );
97 }
98
99 =pod
100 sub key_exists {
101     my $self = shift;
102     my ($trans_id, $base_offset, $key) = @_;
103     
104     my ($_val_offset, $_is_del) = $self->_find_value_offset({
105         offset     => $base_offset,
106         trans_id   => $trans_id,
107         allow_head => 1,
108     });
109     die "Attempt to use a deleted value" if $_is_del;
110     die "Internal error!" if !$_val_offset;
111
112     my ($key_offset) = $self->_find_key_offset({
113         offset  => $_val_offset,
114         key_md5 => $self->_apply_digest( $key ),
115         create  => 0,
116     });
117     return if !$key_offset;
118
119     my ($val_offset, $is_del) = $self->_find_value_offset({
120         offset     => $key_offset,
121         trans_id   => $trans_id,
122         allow_head => 1,
123     });
124
125     return 1 if $is_del;
126
127     die "Internal error!" if !$_val_offset;
128     return '';
129 }
130 =cut
131
132 sub get_next_key {
133     my $self = shift;
134     my ($offset) = @_;
135
136     # If the previous key was not specifed, start at the top and
137     # return the first one found.
138     my $temp;
139     if ( @_ > 1 ) {
140         $temp = {
141             prev_md5    => $self->apply_digest($_[1]),
142             return_next => 0,
143         };
144     }
145     else {
146         $temp = {
147             prev_md5    => chr(0) x $self->{hash_size},
148             return_next => 1,
149         };
150     }
151
152     return $self->traverse_index( $temp, $offset, 0 );
153 }
154
155 sub delete_key {
156     my $self = shift;
157     my ($offset, $key, $orig_key) = @_;
158
159     my $dig_key = $self->apply_digest( $key );
160     my $tag = $self->find_blist( $offset, $dig_key ) or return;
161     my $value = $self->get_bucket_value( $tag, $dig_key, $orig_key );
162     $self->delete_bucket( $tag, $dig_key, $orig_key );
163     return $value;
164 }
165
166 =pod
167 sub delete_key {
168     my $self = shift;
169     my ($trans_id, $base_offset, $key) = @_;
170
171     my ($_val_offset, $_is_del) = $self->_find_value_offset({
172         offset     => $base_offset,
173         trans_id   => $trans_id,
174         allow_head => 1,
175     });
176     die "Attempt to use a deleted value" if $_is_del;
177     die "Internal error!" if !$_val_offset;
178
179     my ($key_offset) = $self->_find_key_offset({
180         offset  => $_val_offset,
181         key_md5 => $self->_apply_digest( $key ),
182         create  => 0,
183     });
184     return if !$key_offset;
185
186     if ( $trans_id ) {
187         $self->_mark_as_deleted({
188             offset   => $key_offset,
189             trans_id => $trans_id,
190         });
191     }
192     else {
193         my $value = $self->read_value( $trans_id, $base_offset, $key );
194         if ( @transactions ) {
195             foreach my $other_trans_id ( @transactions ) {
196                 #XXX Finish this!
197                 # next if the $trans_id has an entry in the keyloc
198                 # store $value for $other_trans_id
199             }
200         }
201         else {
202             $self->_remove_key_offset({
203                 offset  => $_val_offset,
204                 key_md5 => $self->_apply_digest( $key ),
205             });
206         }
207     }
208 }
209 =cut
210
211 sub write_value {
212     my $self = shift;
213     my ($offset, $key, $value, $orig_key) = @_;
214
215     my $dig_key = $self->apply_digest( $key );
216     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
217     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
218 }
219
220 =pod
221 sub write_value {
222     my $self = shift;
223     my ($trans_id, $base_offset, $key) = @_;
224
225     my ($_val_offset, $_is_del) = $self->_find_value_offset({
226         offset     => $base_offset,
227         trans_id   => $trans_id,
228         allow_head => 1,
229     });
230     die "Attempt to use a deleted value" if $_is_del;
231     die "Internal error!" if !$_val_offset;
232
233     my ($key_offset, $is_new) = $self->_find_key_offset({
234         offset  => $_val_offset,
235         key_md5 => $self->_apply_digest( $key ),
236         create  => 1,
237     });
238     die "Cannot find/create new key offset!" if !$key_offset;
239
240
241 }
242 =cut
243
244 ################################################################################
245 #
246 # Below here is the old code. It will be folded into the code above as it can.
247 #
248 ################################################################################
249
250 sub new {
251     my $class = shift;
252     my ($args) = @_;
253
254     my $self = bless {
255         long_size => 4,
256         long_pack => 'N',
257         data_size => 4,
258         data_pack => 'N',
259
260         digest    => \&Digest::MD5::md5,
261         hash_size => 16, # In bytes
262
263         ##
264         # Number of buckets per blist before another level of indexing is
265         # done. Increase this value for slightly greater speed, but larger database
266         # files. DO NOT decrease this value below 16, due to risk of recursive
267         # reindex overrun.
268         ##
269         max_buckets => 16,
270
271         storage => undef,
272         obj     => undef,
273     }, $class;
274
275     if ( defined $args->{pack_size} ) {
276         if ( lc $args->{pack_size} eq 'small' ) {
277             $args->{long_size} = 2;
278             $args->{long_pack} = 'n';
279         }
280         elsif ( lc $args->{pack_size} eq 'medium' ) {
281             $args->{long_size} = 4;
282             $args->{long_pack} = 'N';
283         }
284         elsif ( lc $args->{pack_size} eq 'large' ) {
285             $args->{long_size} = 8;
286             $args->{long_pack} = 'Q';
287         }
288         else {
289             die "Unknown pack_size value: '$args->{pack_size}'\n";
290         }
291     }
292
293     # Grab the parameters we want to use
294     foreach my $param ( keys %$self ) {
295         next unless exists $args->{$param};
296         $self->{$param} = $args->{$param};
297     }
298     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
299
300     if ( $self->{max_buckets} < 16 ) {
301         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
302         $self->{max_buckets} = 16;
303     }
304
305     return $self;
306 }
307
308 sub _storage { return $_[0]{storage} }
309
310 sub apply_digest {
311     my $self = shift;
312     return $self->{digest}->(@_);
313 }
314
315 sub calculate_sizes {
316     my $self = shift;
317
318     # The 2**8 here indicates the number of different characters in the
319     # current hashing algorithm
320     #XXX Does this need to be updated with different hashing algorithms?
321     $self->{hash_chars_used}  = (2**8);
322     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
323
324     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
325     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
326
327     $self->{key_size}         = $self->{long_size} * 2;
328     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
329
330     return;
331 }
332
333 sub write_file_header {
334     my $self = shift;
335
336     my $loc = $self->_storage->request_space( length( SIG_FILE ) + 33 );
337
338     $self->_storage->print_at( $loc,
339         SIG_FILE,
340         SIG_HEADER,
341         pack('N', 1),  # header version
342         pack('N', 24), # header size
343         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
344         pack('n', $self->{long_size}),
345         pack('A', $self->{long_pack}),
346         pack('n', $self->{data_size}),
347         pack('A', $self->{data_pack}),
348         pack('n', $self->{max_buckets}),
349     );
350
351     $self->_storage->set_transaction_offset( 13 );
352
353     return;
354 }
355
356 sub read_file_header {
357     my $self = shift;
358
359     my $buffer = $self->_storage->read_at( 0, length(SIG_FILE) + 9 );
360     return unless length($buffer);
361
362     my ($file_signature, $sig_header, $header_version, $size) = unpack(
363         'A4 A N N', $buffer
364     );
365
366     unless ( $file_signature eq SIG_FILE ) {
367         $self->_storage->close;
368         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
369     }
370
371     unless ( $sig_header eq SIG_HEADER ) {
372         $self->_storage->close;
373         $self->_throw_error( "Old file version found." );
374     }
375
376     my $buffer2 = $self->_storage->read_at( undef, $size );
377     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
378
379     $self->_storage->set_transaction_offset( 13 );
380
381     if ( @values < 5 || grep { !defined } @values ) {
382         $self->_storage->close;
383         $self->_throw_error("Corrupted file - bad header");
384     }
385
386     #XXX Add warnings if values weren't set right
387     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
388
389     return length($buffer) + length($buffer2);
390 }
391
392 sub setup_fh {
393     my $self = shift;
394     my ($obj) = @_;
395
396     # Need to remove use of $fh here
397     my $fh = $self->_storage->{fh};
398     flock $fh, LOCK_EX;
399
400     #XXX The duplication of calculate_sizes needs to go away
401     unless ( $obj->{base_offset} ) {
402         my $bytes_read = $self->read_file_header;
403
404         $self->calculate_sizes;
405
406         ##
407         # File is empty -- write header and master index
408         ##
409         if (!$bytes_read) {
410             $self->_storage->audit( "# Database created on" );
411
412             $self->write_file_header;
413
414             $obj->{base_offset} = $self->_storage->request_space(
415                 $self->tag_size( $self->{index_size} ),
416             );
417
418             $self->write_tag(
419                 $obj->_base_offset, $obj->_type,
420                 chr(0)x$self->{index_size},
421             );
422
423             # Flush the filehandle
424             my $old_fh = select $fh;
425             my $old_af = $|; $| = 1; $| = $old_af;
426             select $old_fh;
427         }
428         else {
429             $obj->{base_offset} = $bytes_read;
430
431             ##
432             # Get our type from master index header
433             ##
434             my $tag = $self->load_tag($obj->_base_offset);
435             unless ( $tag ) {
436                 flock $fh, LOCK_UN;
437                 $self->_throw_error("Corrupted file, no master index record");
438             }
439
440             unless ($obj->_type eq $tag->{signature}) {
441                 flock $fh, LOCK_UN;
442                 $self->_throw_error("File type mismatch");
443             }
444         }
445     }
446     else {
447         $self->calculate_sizes;
448     }
449
450     #XXX We have to make sure we don't mess up when autoflush isn't turned on
451     $self->_storage->set_inode;
452
453     flock $fh, LOCK_UN;
454
455     return 1;
456 }
457
458 sub tag_size {
459     my $self = shift;
460     my ($size) = @_;
461     return SIG_SIZE + $self->{data_size} + $size;
462 }
463
464 sub write_tag {
465     ##
466     # Given offset, signature and content, create tag and write to disk
467     ##
468     my $self = shift;
469     my ($offset, $sig, $content) = @_;
470     my $size = length( $content );
471
472     $self->_storage->print_at(
473         $offset, 
474         $sig, pack($self->{data_pack}, $size), $content,
475     );
476
477     return unless defined $offset;
478
479     return {
480         signature => $sig,
481         #XXX Is this even used?
482         size      => $size,
483         offset    => $offset + SIG_SIZE + $self->{data_size},
484         content   => $content
485     };
486 }
487
488 sub load_tag {
489     ##
490     # Given offset, load single tag and return signature, size and data
491     ##
492     my $self = shift;
493     my ($offset) = @_;
494
495     my $storage = $self->_storage;
496
497     my ($sig, $size) = unpack(
498         "A $self->{data_pack}",
499         $storage->read_at( $offset, SIG_SIZE + $self->{data_size} ),
500     );
501
502     return {
503         signature => $sig,
504         size      => $size,   #XXX Is this even used?
505         offset    => $offset + SIG_SIZE + $self->{data_size},
506         content   => $storage->read_at( undef, $size ),
507     };
508 }
509
510 sub find_keyloc {
511     my $self = shift;
512     my ($tag, $transaction_id) = @_;
513     $transaction_id = $self->_storage->transaction_id
514         unless defined $transaction_id;
515
516     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
517         my ($loc, $trans_id, $is_deleted) = unpack(
518             "$self->{long_pack} C C",
519             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
520         );
521
522         if ( $loc == 0 ) {
523             return ( $loc, $is_deleted, $i * $self->{key_size} );
524         }
525
526         next if $transaction_id != $trans_id;
527
528         return ( $loc, $is_deleted, $i * $self->{key_size} );
529     }
530
531     return;
532 }
533
534 sub add_bucket {
535     ##
536     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
537     # plain (undigested) key and value.
538     ##
539     my $self = shift;
540     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
541
542     # This verifies that only supported values will be stored.
543     {
544         my $r = Scalar::Util::reftype( $value );
545
546         last if !defined $r;
547         last if $r eq 'HASH';
548         last if $r eq 'ARRAY';
549
550         $self->_throw_error(
551             "Storage of references of type '$r' is not supported."
552         );
553     }
554
555     my $storage = $self->_storage;
556
557     #ACID - This is a mutation. Must only find the exact transaction
558     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
559
560     my @transactions;
561     if ( $storage->transaction_id == 0 ) {
562         @transactions = $storage->current_transactions;
563     }
564
565 #    $self->_release_space( $size, $subloc );
566 #XXX This needs updating to use _release_space
567
568     my $location;
569     my $size = $self->_length_needed( $value, $plain_key );
570
571     # Updating a known md5
572     if ( $keyloc ) {
573         my $keytag = $self->load_tag( $keyloc );
574         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
575
576         if ( $subloc && !$is_deleted && @transactions ) {
577             my $old_value = $self->read_from_loc( $subloc, $orig_key );
578             my $old_size = $self->_length_needed( $old_value, $plain_key );
579
580             for my $trans_id ( @transactions ) {
581                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
582                 unless ($loc) {
583                     my $location2 = $storage->request_space( $old_size );
584                     $storage->print_at( $keytag->{offset} + $offset2,
585                         pack($self->{long_pack}, $location2 ),
586                         pack( 'C C', $trans_id, 0 ),
587                     );
588                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
589                 }
590             }
591         }
592
593         $location = $self->_storage->request_space( $size );
594         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
595         $storage->print_at( $keytag->{offset} + $offset,
596             pack($self->{long_pack}, $location ),
597             pack( 'C C', $storage->transaction_id, 0 ),
598         );
599     }
600     # Adding a new md5
601     else {
602         my $keyloc = $storage->request_space( $self->tag_size( $self->{keyloc_size} ) );
603
604         # The bucket fit into list
605         if ( defined $offset ) {
606             $storage->print_at( $tag->{offset} + $offset,
607                 $md5, pack( $self->{long_pack}, $keyloc ),
608             );
609         }
610         # If bucket didn't fit into list, split into a new index level
611         else {
612             $self->split_index( $tag, $md5, $keyloc );
613         }
614
615         my $keytag = $self->write_tag(
616             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
617         );
618
619         $location = $self->_storage->request_space( $size );
620         $storage->print_at( $keytag->{offset},
621             pack( $self->{long_pack}, $location ),
622             pack( 'C C', $storage->transaction_id, 0 ),
623         );
624
625         my $offset = 1;
626         for my $trans_id ( @transactions ) {
627             $storage->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
628                 pack( $self->{long_pack}, 0 ),
629                 pack( 'C C', $trans_id, 1 ),
630             );
631         }
632     }
633
634     $self->_write_value( $location, $plain_key, $value, $orig_key );
635
636     return 1;
637 }
638
639 sub _write_value {
640     my $self = shift;
641     my ($location, $key, $value, $orig_key) = @_;
642
643     my $storage = $self->_storage;
644
645     my $dbm_deep_obj = _get_dbm_object( $value );
646     if ( $dbm_deep_obj && $dbm_deep_obj->_storage ne $storage ) {
647         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
648     }
649
650     ##
651     # Write signature based on content type, set content length and write
652     # actual value.
653     ##
654     my $r = Scalar::Util::reftype( $value ) || '';
655     if ( $dbm_deep_obj ) {
656         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
657     }
658     elsif ($r eq 'HASH') {
659         if ( !$dbm_deep_obj && tied %{$value} ) {
660             $self->_throw_error( "Cannot store something that is tied" );
661         }
662         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
663     }
664     elsif ($r eq 'ARRAY') {
665         if ( !$dbm_deep_obj && tied @{$value} ) {
666             $self->_throw_error( "Cannot store something that is tied" );
667         }
668         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
669     }
670     elsif (!defined($value)) {
671         $self->write_tag( $location, SIG_NULL, '' );
672     }
673     else {
674         $self->write_tag( $location, SIG_DATA, $value );
675     }
676
677     ##
678     # Plain key is stored AFTER value, as keys are typically fetched less often.
679     ##
680     $storage->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
681
682     # Internal references don't care about autobless
683     return 1 if $dbm_deep_obj;
684
685     ##
686     # If value is blessed, preserve class name
687     ##
688     if ( $storage->{autobless} ) {
689         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
690             $storage->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
691         }
692         else {
693             $storage->print_at( undef, chr(0) );
694         }
695     }
696
697     ##
698     # Tie the passed in reference so that changes to it are reflected in the
699     # datafile. The use of $location as the base_offset will act as the
700     # the linkage between parent and child.
701     #
702     # The overall assignment is a hack around the fact that just tying doesn't
703     # store the values. This may not be the wrong thing to do.
704     ##
705     if ($r eq 'HASH') {
706         my %x = %$value;
707         tie %$value, 'DBM::Deep', {
708             base_offset => $location,
709             storage     => $storage,
710             parent      => $self->{obj},
711             parent_key  => $orig_key,
712         };
713         %$value = %x;
714         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
715     }
716     elsif ($r eq 'ARRAY') {
717         my @x = @$value;
718         tie @$value, 'DBM::Deep', {
719             base_offset => $location,
720             storage     => $storage,
721             parent      => $self->{obj},
722             parent_key  => $orig_key,
723         };
724         @$value = @x;
725         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
726     }
727
728     return 1;
729 }
730
731 sub split_index {
732     my $self = shift;
733     my ($tag, $md5, $keyloc) = @_;
734
735     my $storage = $self->_storage;
736
737     my $loc = $storage->request_space(
738         $self->tag_size( $self->{index_size} ),
739     );
740
741     $storage->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
742
743     my $index_tag = $self->write_tag(
744         $loc, SIG_INDEX,
745         chr(0)x$self->{index_size},
746     );
747
748     my $keys = $tag->{content}
749              . $md5 . pack($self->{long_pack}, $keyloc);
750
751     my @newloc = ();
752     BUCKET:
753     # The <= here is deliberate - we have max_buckets+1 keys to iterate
754     # through, unlike every other loop that uses max_buckets as a stop.
755     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
756         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
757
758         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
759         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
760
761         my $num = ord(substr($key, $tag->{ch} + 1, 1));
762
763         if ($newloc[$num]) {
764             my $subkeys = $storage->read_at( $newloc[$num], $self->{bucket_list_size} );
765
766             # This is looking for the first empty spot
767             my ($subloc, $offset) = $self->_find_in_buckets(
768                 { content => $subkeys }, '',
769             );
770
771             $storage->print_at(
772                 $newloc[$num] + $offset,
773                 $key, pack($self->{long_pack}, $old_subloc),
774             );
775
776             next;
777         }
778
779         my $loc = $storage->request_space(
780             $self->tag_size( $self->{bucket_list_size} ),
781         );
782
783         $storage->print_at(
784             $index_tag->{offset} + ($num * $self->{long_size}),
785             pack($self->{long_pack}, $loc),
786         );
787
788         my $blist_tag = $self->write_tag(
789             $loc, SIG_BLIST,
790             chr(0)x$self->{bucket_list_size},
791         );
792
793         $storage->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
794
795         $newloc[$num] = $blist_tag->{offset};
796     }
797
798     $self->_release_space(
799         $self->tag_size( $self->{bucket_list_size} ),
800         $tag->{offset} - SIG_SIZE - $self->{data_size},
801     );
802
803     return 1;
804 }
805
806 sub read_from_loc {
807     my $self = shift;
808     my ($subloc, $orig_key) = @_;
809
810     my $storage = $self->_storage;
811
812     my $signature = $storage->read_at( $subloc, SIG_SIZE );
813
814     ##
815     # If value is a hash or array, return new DBM::Deep object with correct offset
816     ##
817     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
818         #XXX This needs to be a singleton
819 #        my $new_obj;
820 #        my $is_autobless;
821 #        if ( $signature eq SIG_HASH ) {
822 #            $new_obj = {};
823 #            tie %$new_obj, 'DBM::Deep', {
824 #                base_offset => $subloc,
825 #                storage     => $self->_storage,
826 #                parent      => $self->{obj},
827 #                parent_key  => $orig_key,
828 #            };
829 #            $is_autobless = tied(%$new_obj)->_storage->{autobless};
830 #        }
831 #        else {
832 #            $new_obj = [];
833 #            tie @$new_obj, 'DBM::Deep', {
834 #                base_offset => $subloc,
835 #                storage     => $self->_storage,
836 #                parent      => $self->{obj},
837 #                parent_key  => $orig_key,
838 #            };
839 #            $is_autobless = tied(@$new_obj)->_storage->{autobless};
840 #        }
841 #
842 #        if ($is_autobless) {
843
844         my $new_obj = DBM::Deep->new({
845             type        => $signature,
846             base_offset => $subloc,
847             storage     => $self->_storage,
848             parent      => $self->{obj},
849             parent_key  => $orig_key,
850         });
851
852         if ($new_obj->_storage->{autobless}) {
853             ##
854             # Skip over value and plain key to see if object needs
855             # to be re-blessed
856             ##
857             $storage->increment_pointer( $self->{data_size} + $self->{index_size} );
858
859             my $size = $storage->read_at( undef, $self->{data_size} );
860             $size = unpack($self->{data_pack}, $size);
861             if ($size) { $storage->increment_pointer( $size ); }
862
863             my $bless_bit = $storage->read_at( undef, 1 );
864             if ( ord($bless_bit) ) {
865                 my $size = unpack(
866                     $self->{data_pack},
867                     $storage->read_at( undef, $self->{data_size} ),
868                 );
869
870                 if ( $size ) {
871                     $new_obj = bless $new_obj, $storage->read_at( undef, $size );
872                 }
873             }
874         }
875
876         return $new_obj;
877     }
878     elsif ( $signature eq SIG_INTERNAL ) {
879         my $size = $storage->read_at( undef, $self->{data_size} );
880         $size = unpack($self->{data_pack}, $size);
881
882         if ( $size ) {
883             my $new_loc = $storage->read_at( undef, $size );
884             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
885             return $self->read_from_loc( $new_loc, $orig_key );
886         }
887         else {
888             return;
889         }
890     }
891     ##
892     # Otherwise return actual value
893     ##
894     elsif ( $signature eq SIG_DATA ) {
895         my $size = $storage->read_at( undef, $self->{data_size} );
896         $size = unpack($self->{data_pack}, $size);
897
898         my $value = $size ? $storage->read_at( undef, $size ) : '';
899         return $value;
900     }
901
902     ##
903     # Key exists, but content is null
904     ##
905     return;
906 }
907
908 sub get_bucket_value {
909     ##
910     # Fetch single value given tag and MD5 digested key.
911     ##
912     my $self = shift;
913     my ($tag, $md5, $orig_key) = @_;
914
915     #ACID - This is a read. Can find exact or HEAD
916     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
917
918     if ( !$keyloc ) {
919         #XXX Need to use real key
920 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
921 #        return;
922     }
923 #    elsif ( !$is_deleted ) {
924     else {
925         my $keytag = $self->load_tag( $keyloc );
926         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
927         if (!$subloc && !$is_deleted) {
928             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
929         }
930         if ( $subloc && !$is_deleted ) {
931             return $self->read_from_loc( $subloc, $orig_key );
932         }
933     }
934
935     return;
936 }
937
938 sub delete_bucket {
939     ##
940     # Delete single key/value pair given tag and MD5 digested key.
941     ##
942     my $self = shift;
943     my ($tag, $md5, $orig_key) = @_;
944
945     #ACID - Although this is a mutation, we must find any transaction.
946     # This is because we need to mark something as deleted that is in the HEAD.
947     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
948
949     return if !$keyloc;
950
951     my $storage = $self->_storage;
952
953     my @transactions;
954     if ( $storage->transaction_id == 0 ) {
955         @transactions = $storage->current_transactions;
956     }
957
958     if ( $storage->transaction_id == 0 ) {
959         my $keytag = $self->load_tag( $keyloc );
960
961         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
962         return if !$subloc || $is_deleted;
963
964         my $value = $self->read_from_loc( $subloc, $orig_key );
965
966         my $size = $self->_length_needed( $value, $orig_key );
967
968         for my $trans_id ( @transactions ) {
969             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
970             unless ($loc) {
971                 my $location2 = $storage->request_space( $size );
972                 $storage->print_at( $keytag->{offset} + $offset2,
973                     pack($self->{long_pack}, $location2 ),
974                     pack( 'C C', $trans_id, 0 ),
975                 );
976                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
977             }
978         }
979
980         $keytag = $self->load_tag( $keyloc );
981         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
982         $storage->print_at( $keytag->{offset} + $offset,
983             substr( $keytag->{content}, $offset + $self->{key_size} ),
984             chr(0) x $self->{key_size},
985         );
986     }
987     else {
988         my $keytag = $self->load_tag( $keyloc );
989
990         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
991
992         $storage->print_at( $keytag->{offset} + $offset,
993             pack($self->{long_pack}, 0 ),
994             pack( 'C C', $storage->transaction_id, 1 ),
995         );
996     }
997
998     return 1;
999 }
1000
1001 sub bucket_exists {
1002     ##
1003     # Check existence of single key given tag and MD5 digested key.
1004     ##
1005     my $self = shift;
1006     my ($tag, $md5) = @_;
1007
1008     #ACID - This is a read. Can find exact or HEAD
1009     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
1010     my $keytag = $self->load_tag( $keyloc );
1011     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
1012     if ( !$subloc && !$is_deleted ) {
1013         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
1014     }
1015     return ($subloc && !$is_deleted) && 1;
1016 }
1017
1018 sub find_blist {
1019     ##
1020     # Locate offset for bucket list, given digested key
1021     ##
1022     my $self = shift;
1023     my ($offset, $md5, $args) = @_;
1024     $args = {} unless $args;
1025
1026     ##
1027     # Locate offset for bucket list using digest index system
1028     ##
1029     my $tag = $self->load_tag( $offset )
1030         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
1031
1032     my $ch = 0;
1033     while ($tag->{signature} ne SIG_BLIST) {
1034         my $num = ord substr($md5, $ch, 1);
1035
1036         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
1037         $tag = $self->index_lookup( $tag, $num );
1038
1039         if (!$tag) {
1040             return if !$args->{create};
1041
1042             my $loc = $self->_storage->request_space(
1043                 $self->tag_size( $self->{bucket_list_size} ),
1044             );
1045
1046             $self->_storage->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
1047
1048             $tag = $self->write_tag(
1049                 $loc, SIG_BLIST,
1050                 chr(0)x$self->{bucket_list_size},
1051             );
1052
1053             $tag->{ref_loc} = $ref_loc;
1054             $tag->{ch} = $ch;
1055
1056             last;
1057         }
1058
1059         $tag->{ch} = $ch++;
1060         $tag->{ref_loc} = $ref_loc;
1061     }
1062
1063     return $tag;
1064 }
1065
1066 sub index_lookup {
1067     ##
1068     # Given index tag, lookup single entry in index and return .
1069     ##
1070     my $self = shift;
1071     my ($tag, $index) = @_;
1072
1073     my $location = unpack(
1074         $self->{long_pack},
1075         substr(
1076             $tag->{content},
1077             $index * $self->{long_size},
1078             $self->{long_size},
1079         ),
1080     );
1081
1082     if (!$location) { return; }
1083
1084     return $self->load_tag( $location );
1085 }
1086
1087 sub traverse_index {
1088     ##
1089     # Scan index and recursively step into deeper levels, looking for next key.
1090     ##
1091     my $self = shift;
1092     my ($xxxx, $offset, $ch, $force_return_next) = @_;
1093
1094     my $tag = $self->load_tag( $offset );
1095
1096     if ($tag->{signature} ne SIG_BLIST) {
1097         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
1098
1099         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
1100             my $subloc = unpack(
1101                 $self->{long_pack},
1102                 substr(
1103                     $tag->{content},
1104                     $idx * $self->{long_size},
1105                     $self->{long_size},
1106                 ),
1107             );
1108
1109             if ($subloc) {
1110                 my $result = $self->traverse_index(
1111                     $xxxx, $subloc, $ch + 1, $force_return_next,
1112                 );
1113
1114                 if (defined $result) { return $result; }
1115             }
1116         } # index loop
1117
1118         $xxxx->{return_next} = 1;
1119     }
1120     # This is the bucket list
1121     else {
1122         my $keys = $tag->{content};
1123         if ($force_return_next) { $xxxx->{return_next} = 1; }
1124
1125         ##
1126         # Iterate through buckets, looking for a key match
1127         ##
1128         my $transaction_id = $self->_storage->transaction_id;
1129         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
1130             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
1131
1132             # End of bucket list -- return to outer loop
1133             if (!$keyloc) {
1134                 $xxxx->{return_next} = 1;
1135                 last;
1136             }
1137             # Located previous key -- return next one found
1138             elsif ($key eq $xxxx->{prev_md5}) {
1139                 $xxxx->{return_next} = 1;
1140                 next;
1141             }
1142             # Seek to bucket location and skip over signature
1143             elsif ($xxxx->{return_next}) {
1144                 my $storage = $self->_storage;
1145
1146                 my $keytag = $self->load_tag( $keyloc );
1147                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
1148                 if ( $subloc == 0 && !$is_deleted ) {
1149                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
1150                 }
1151                 next if $is_deleted;
1152
1153                 # Skip over value to get to plain key
1154                 my $sig = $storage->read_at( $subloc, SIG_SIZE );
1155
1156                 my $size = $storage->read_at( undef, $self->{data_size} );
1157                 $size = unpack($self->{data_pack}, $size);
1158                 if ($size) { $storage->increment_pointer( $size ); }
1159
1160                 # Read in plain key and return as scalar
1161                 $size = $storage->read_at( undef, $self->{data_size} );
1162                 $size = unpack($self->{data_pack}, $size);
1163
1164                 my $plain_key;
1165                 if ($size) { $plain_key = $storage->read_at( undef, $size); }
1166                 return $plain_key;
1167             }
1168         }
1169
1170         $xxxx->{return_next} = 1;
1171     }
1172
1173     return;
1174 }
1175
1176 # Utilities
1177
1178 sub _get_key_subloc {
1179     my $self = shift;
1180     my ($keys, $idx) = @_;
1181
1182     return unpack(
1183         # This is 'a', not 'A'. Please read the pack() documentation for the
1184         # difference between the two and why it's important.
1185         "a$self->{hash_size} $self->{long_pack}",
1186         substr(
1187             $keys,
1188             ($idx * $self->{bucket_size}),
1189             $self->{bucket_size},
1190         ),
1191     );
1192 }
1193
1194 sub _find_in_buckets {
1195     my $self = shift;
1196     my ($tag, $md5) = @_;
1197
1198     BUCKET:
1199     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1200         my ($key, $subloc) = $self->_get_key_subloc(
1201             $tag->{content}, $i,
1202         );
1203
1204         my @rv = ($subloc, $i * $self->{bucket_size});
1205
1206         unless ( $subloc ) {
1207             return @rv;
1208         }
1209
1210         next BUCKET if $key ne $md5;
1211
1212         return @rv;
1213     }
1214
1215     return;
1216 }
1217
1218 sub _release_space {
1219     my $self = shift;
1220     my ($size, $loc) = @_;
1221
1222     my $next_loc = 0;
1223
1224     $self->_storage->print_at( $loc,
1225         SIG_FREE, 
1226         pack($self->{long_pack}, $size ),
1227         pack($self->{long_pack}, $next_loc ),
1228     );
1229
1230     return;
1231 }
1232
1233 sub _throw_error {
1234     die "DBM::Deep: $_[1]\n";
1235 }
1236
1237 sub _get_dbm_object {
1238     my $item = shift;
1239
1240     my $obj = eval {
1241         local $SIG{__DIE__};
1242         if ($item->isa( 'DBM::Deep' )) {
1243             return $item;
1244         }
1245         return;
1246     };
1247     return $obj if $obj;
1248
1249     my $r = Scalar::Util::reftype( $item ) || '';
1250     if ( $r eq 'HASH' ) {
1251         my $obj = eval {
1252             local $SIG{__DIE__};
1253             my $obj = tied(%$item);
1254             if ($obj->isa( 'DBM::Deep' )) {
1255                 return $obj;
1256             }
1257             return;
1258         };
1259         return $obj if $obj;
1260     }
1261     elsif ( $r eq 'ARRAY' ) {
1262         my $obj = eval {
1263             local $SIG{__DIE__};
1264             my $obj = tied(@$item);
1265             if ($obj->isa( 'DBM::Deep' )) {
1266                 return $obj;
1267             }
1268             return;
1269         };
1270         return $obj if $obj;
1271     }
1272
1273     return;
1274 }
1275
1276 sub _length_needed {
1277     my $self = shift;
1278     my ($value, $key) = @_;
1279
1280     my $is_dbm_deep = eval {
1281         local $SIG{'__DIE__'};
1282         $value->isa( 'DBM::Deep' );
1283     };
1284
1285     my $len = SIG_SIZE
1286             + $self->{data_size} # size for value
1287             + $self->{data_size} # size for key
1288             + length( $key );    # length of key
1289
1290     if ( $is_dbm_deep && $value->_storage eq $self->_storage ) {
1291         # long_size is for the internal reference
1292         return $len + $self->{long_size};
1293     }
1294
1295     if ( $self->_storage->{autobless} ) {
1296         # This is for the bit saying whether or not this thing is blessed.
1297         $len += 1;
1298     }
1299
1300     my $r = Scalar::Util::reftype( $value ) || '';
1301     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1302         if ( defined $value ) {
1303             $len += length( $value );
1304         }
1305         return $len;
1306     }
1307
1308     $len += $self->{index_size};
1309
1310     # if autobless is enabled, must also take into consideration
1311     # the class name as it is stored after the key.
1312     if ( $self->_storage->{autobless} ) {
1313         my $c = Scalar::Util::blessed($value);
1314         if ( defined $c && !$is_dbm_deep ) {
1315             $len += $self->{data_size} + length($c);
1316         }
1317     }
1318
1319     return $len;
1320 }
1321
1322 1;
1323 __END__