r13304@rob-kinyons-powerbook58: rob | 2006-05-18 15:10:48 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(0.99_03);
9
10 use Fcntl qw( :DEFAULT :flock );
11 use Scalar::Util ();
12
13 # File-wide notes:
14 # * To add to bucket_size, make sure you modify the following:
15 #   - calculate_sizes()
16 #   - _get_key_subloc()
17 #   - add_bucket() - where the buckets are printed
18 #
19 # * Every method in here assumes that the _fileobj has been appropriately
20 #   safeguarded. This can be anything from flock() to some sort of manual
21 #   mutex. But, it's the caller's responsability to make sure that this has
22 #   been done.
23
24 ##
25 # Setup file and tag signatures.  These should never change.
26 ##
27 sub SIG_FILE     () { 'DPDB' }
28 sub SIG_HEADER   () { 'h'    }
29 sub SIG_INTERNAL () { 'i'    }
30 sub SIG_HASH     () { 'H'    }
31 sub SIG_ARRAY    () { 'A'    }
32 sub SIG_NULL     () { 'N'    }
33 sub SIG_DATA     () { 'D'    }
34 sub SIG_INDEX    () { 'I'    }
35 sub SIG_BLIST    () { 'B'    }
36 sub SIG_FREE     () { 'F'    }
37 sub SIG_KEYS     () { 'K'    }
38 sub SIG_SIZE     () {  1     }
39
40 ################################################################################
41 #
42 # This is new code. It is a complete rewrite of the engine based on a new API
43 #
44 ################################################################################
45
46 sub write_value {
47     my $self = shift;
48     my ($offset, $key, $value, $orig_key) = @_;
49
50     my $dig_key = $self->apply_digest( $key );
51     my $tag = $self->find_blist( $offset, $dig_key, { create => 1 } );
52     return $self->add_bucket( $tag, $dig_key, $key, $value, undef, $orig_key );
53 }
54
55 sub read_value {
56     my $self = shift;
57     my ($offset, $key) = @_;
58
59     my $dig_key = $self->apply_digest( $key );
60     my $tag = $self->find_blist( $offset, $dig_key );
61     return $self->get_bucket_value( $tag, $dig_key, $key );
62 }
63
64 sub delete_key {
65     my $self = shift;
66     my ($offset, $key) = @_;
67
68     my $dig_key = $self->apply_digest( $key );
69     my $tag = $self->find_blist( $offset, $dig_key );
70     return $self->delete_bucket( $tag, $dig_key, $key );
71 }
72
73 sub key_exists {
74     my $self = shift;
75     my ($offset, $key) = @_;
76
77     my $dig_key = $self->apply_digest( $key );
78     my $tag = $self->find_blist( $offset, $dig_key );
79     return $self->bucket_exists( $tag, $dig_key, $key );
80 }
81
82 sub XXXget_next_key {
83     my $self = shift;
84     my ($offset, $prev_key) = @_;
85
86 #    my $dig_key = $self->apply_digest( $key );
87 }
88
89 ################################################################################
90 #
91 # Below here is the old code. It will be folded into the code above as it can.
92 #
93 ################################################################################
94
95 sub new {
96     my $class = shift;
97     my ($args) = @_;
98
99     my $self = bless {
100         long_size => 4,
101         long_pack => 'N',
102         data_size => 4,
103         data_pack => 'N',
104
105         digest    => \&Digest::MD5::md5,
106         hash_size => 16, # In bytes
107
108         ##
109         # Number of buckets per blist before another level of indexing is
110         # done. Increase this value for slightly greater speed, but larger database
111         # files. DO NOT decrease this value below 16, due to risk of recursive
112         # reindex overrun.
113         ##
114         max_buckets => 16,
115
116         fileobj => undef,
117         obj     => undef,
118     }, $class;
119
120     if ( defined $args->{pack_size} ) {
121         if ( lc $args->{pack_size} eq 'small' ) {
122             $args->{long_size} = 2;
123             $args->{long_pack} = 'n';
124         }
125         elsif ( lc $args->{pack_size} eq 'medium' ) {
126             $args->{long_size} = 4;
127             $args->{long_pack} = 'N';
128         }
129         elsif ( lc $args->{pack_size} eq 'large' ) {
130             $args->{long_size} = 8;
131             $args->{long_pack} = 'Q';
132         }
133         else {
134             die "Unknown pack_size value: '$args->{pack_size}'\n";
135         }
136     }
137
138     # Grab the parameters we want to use
139     foreach my $param ( keys %$self ) {
140         next unless exists $args->{$param};
141         $self->{$param} = $args->{$param};
142     }
143     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
144
145     if ( $self->{max_buckets} < 16 ) {
146         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
147         $self->{max_buckets} = 16;
148     }
149
150     return $self;
151 }
152
153 sub _fileobj { return $_[0]{fileobj} }
154
155 sub apply_digest {
156     my $self = shift;
157     return $self->{digest}->(@_);
158 }
159
160 sub calculate_sizes {
161     my $self = shift;
162
163     # The 2**8 here indicates the number of different characters in the
164     # current hashing algorithm
165     #XXX Does this need to be updated with different hashing algorithms?
166     $self->{hash_chars_used}  = (2**8);
167     $self->{index_size}       = $self->{hash_chars_used} * $self->{long_size};
168
169     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 2;
170     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
171
172     $self->{key_size}         = $self->{long_size} * 2;
173     $self->{keyloc_size}      = $self->{max_buckets} * $self->{key_size};
174
175     return;
176 }
177
178 sub write_file_header {
179     my $self = shift;
180
181     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 33 );
182
183     $self->_fileobj->print_at( $loc,
184         SIG_FILE,
185         SIG_HEADER,
186         pack('N', 1),  # header version
187         pack('N', 24), # header size
188         pack('N4', 0, 0, 0, 0),  # currently running transaction IDs
189         pack('n', $self->{long_size}),
190         pack('A', $self->{long_pack}),
191         pack('n', $self->{data_size}),
192         pack('A', $self->{data_pack}),
193         pack('n', $self->{max_buckets}),
194     );
195
196     $self->_fileobj->set_transaction_offset( 13 );
197
198     return;
199 }
200
201 sub read_file_header {
202     my $self = shift;
203
204     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
205     return unless length($buffer);
206
207     my ($file_signature, $sig_header, $header_version, $size) = unpack(
208         'A4 A N N', $buffer
209     );
210
211     unless ( $file_signature eq SIG_FILE ) {
212         $self->_fileobj->close;
213         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
214     }
215
216     unless ( $sig_header eq SIG_HEADER ) {
217         $self->_fileobj->close;
218         $self->_throw_error( "Old file version found." );
219     }
220
221     my $buffer2 = $self->_fileobj->read_at( undef, $size );
222     my ($a1, $a2, $a3, $a4, @values) = unpack( 'N4 n A n A n', $buffer2 );
223
224     $self->_fileobj->set_transaction_offset( 13 );
225
226     if ( @values < 5 || grep { !defined } @values ) {
227         $self->_fileobj->close;
228         $self->_throw_error("Corrupted file - bad header");
229     }
230
231     #XXX Add warnings if values weren't set right
232     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
233
234     return length($buffer) + length($buffer2);
235 }
236
237 sub setup_fh {
238     my $self = shift;
239     my ($obj) = @_;
240
241     # Need to remove use of $fh here
242     my $fh = $self->_fileobj->{fh};
243     flock $fh, LOCK_EX;
244
245     #XXX The duplication of calculate_sizes needs to go away
246     unless ( $obj->{base_offset} ) {
247         my $bytes_read = $self->read_file_header;
248
249         $self->calculate_sizes;
250
251         ##
252         # File is empty -- write header and master index
253         ##
254         if (!$bytes_read) {
255             $self->_fileobj->audit( "# Database created on" );
256
257             $self->write_file_header;
258
259             $obj->{base_offset} = $self->_fileobj->request_space(
260                 $self->tag_size( $self->{index_size} ),
261             );
262
263             $self->write_tag(
264                 $obj->_base_offset, $obj->_type,
265                 chr(0)x$self->{index_size},
266             );
267
268             # Flush the filehandle
269             my $old_fh = select $fh;
270             my $old_af = $|; $| = 1; $| = $old_af;
271             select $old_fh;
272         }
273         else {
274             $obj->{base_offset} = $bytes_read;
275
276             ##
277             # Get our type from master index header
278             ##
279             my $tag = $self->load_tag($obj->_base_offset);
280             unless ( $tag ) {
281                 flock $fh, LOCK_UN;
282                 $self->_throw_error("Corrupted file, no master index record");
283             }
284
285             unless ($obj->_type eq $tag->{signature}) {
286                 flock $fh, LOCK_UN;
287                 $self->_throw_error("File type mismatch");
288             }
289         }
290     }
291     else {
292         $self->calculate_sizes;
293     }
294
295     #XXX We have to make sure we don't mess up when autoflush isn't turned on
296     $self->_fileobj->set_inode;
297
298     flock $fh, LOCK_UN;
299
300     return 1;
301 }
302
303 sub tag_size {
304     my $self = shift;
305     my ($size) = @_;
306     return SIG_SIZE + $self->{data_size} + $size;
307 }
308
309 sub write_tag {
310     ##
311     # Given offset, signature and content, create tag and write to disk
312     ##
313     my $self = shift;
314     my ($offset, $sig, $content) = @_;
315     my $size = length( $content );
316
317     $self->_fileobj->print_at(
318         $offset, 
319         $sig, pack($self->{data_pack}, $size), $content,
320     );
321
322     return unless defined $offset;
323
324     return {
325         signature => $sig,
326         #XXX Is this even used?
327         size      => $size,
328         offset    => $offset + SIG_SIZE + $self->{data_size},
329         content   => $content
330     };
331 }
332
333 sub load_tag {
334     ##
335     # Given offset, load single tag and return signature, size and data
336     ##
337     my $self = shift;
338     my ($offset) = @_;
339
340     my $fileobj = $self->_fileobj;
341
342     my ($sig, $size) = unpack(
343         "A $self->{data_pack}",
344         $fileobj->read_at( $offset, SIG_SIZE + $self->{data_size} ),
345     );
346
347     return {
348         signature => $sig,
349         size      => $size,   #XXX Is this even used?
350         offset    => $offset + SIG_SIZE + $self->{data_size},
351         content   => $fileobj->read_at( undef, $size ),
352     };
353 }
354
355 sub find_keyloc {
356     my $self = shift;
357     my ($tag, $transaction_id) = @_;
358     $transaction_id = $self->_fileobj->transaction_id
359         unless defined $transaction_id;
360
361     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
362         my ($loc, $trans_id, $is_deleted) = unpack(
363             "$self->{long_pack} C C",
364             substr( $tag->{content}, $i * $self->{key_size}, $self->{key_size} ),
365         );
366
367         if ( $loc == 0 ) {
368             return ( $loc, $is_deleted, $i * $self->{key_size} );
369         }
370
371         next if $transaction_id != $trans_id;
372
373         return ( $loc, $is_deleted, $i * $self->{key_size} );
374     }
375
376     return;
377 }
378
379 sub add_bucket {
380     ##
381     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
382     # plain (undigested) key and value.
383     ##
384     my $self = shift;
385     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
386
387     # This verifies that only supported values will be stored.
388     {
389         my $r = Scalar::Util::reftype( $value );
390
391         last if !defined $r;
392         last if $r eq 'HASH';
393         last if $r eq 'ARRAY';
394
395         $self->_throw_error(
396             "Storage of references of type '$r' is not supported."
397         );
398     }
399
400     my $fileobj = $self->_fileobj;
401
402     #ACID - This is a mutation. Must only find the exact transaction
403     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5, 1 );
404
405     my @transactions;
406     if ( $fileobj->transaction_id == 0 ) {
407         @transactions = $fileobj->current_transactions;
408     }
409
410 #    $self->_release_space( $size, $subloc );
411 #XXX This needs updating to use _release_space
412
413     my $location;
414     my $size = $self->_length_needed( $value, $plain_key );
415
416     # Updating a known md5
417     if ( $keyloc ) {
418         my $keytag = $self->load_tag( $keyloc );
419         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
420
421         if ( $subloc && !$is_deleted && @transactions ) {
422             my $old_value = $self->read_from_loc( $subloc, $orig_key );
423             my $old_size = $self->_length_needed( $old_value, $plain_key );
424
425             for my $trans_id ( @transactions ) {
426                 my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
427                 unless ($loc) {
428                     my $location2 = $fileobj->request_space( $old_size );
429                     $fileobj->print_at( $keytag->{offset} + $offset2,
430                         pack($self->{long_pack}, $location2 ),
431                         pack( 'C C', $trans_id, 0 ),
432                     );
433                     $self->_write_value( $location2, $plain_key, $old_value, $orig_key );
434                 }
435             }
436         }
437
438         $location = $self->_fileobj->request_space( $size );
439         #XXX This needs to be transactionally-aware in terms of which keytag->{offset} to use
440         $fileobj->print_at( $keytag->{offset} + $offset,
441             pack($self->{long_pack}, $location ),
442             pack( 'C C', $fileobj->transaction_id, 0 ),
443         );
444     }
445     # Adding a new md5
446     else {
447         my $keyloc = $fileobj->request_space( $self->tag_size( $self->{keyloc_size} ) );
448
449         # The bucket fit into list
450         if ( defined $offset ) {
451             $fileobj->print_at( $tag->{offset} + $offset,
452                 $md5, pack( $self->{long_pack}, $keyloc ),
453             );
454         }
455         # If bucket didn't fit into list, split into a new index level
456         else {
457             $self->split_index( $tag, $md5, $keyloc );
458         }
459
460         my $keytag = $self->write_tag(
461             $keyloc, SIG_KEYS, chr(0)x$self->{keyloc_size},
462         );
463
464         $location = $self->_fileobj->request_space( $size );
465         $fileobj->print_at( $keytag->{offset},
466             pack( $self->{long_pack}, $location ),
467             pack( 'C C', $fileobj->transaction_id, 0 ),
468         );
469
470         my $offset = 1;
471         for my $trans_id ( @transactions ) {
472             $fileobj->print_at( $keytag->{offset} + $self->{key_size} * $offset++,
473                 pack( $self->{long_pack}, 0 ),
474                 pack( 'C C', $trans_id, 1 ),
475             );
476         }
477     }
478
479     $self->_write_value( $location, $plain_key, $value, $orig_key );
480
481     return 1;
482 }
483
484 sub _write_value {
485     my $self = shift;
486     my ($location, $key, $value, $orig_key) = @_;
487
488     my $fileobj = $self->_fileobj;
489
490     my $dbm_deep_obj = _get_dbm_object( $value );
491     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
492         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
493     }
494
495     ##
496     # Write signature based on content type, set content length and write
497     # actual value.
498     ##
499     my $r = Scalar::Util::reftype( $value ) || '';
500     if ( $dbm_deep_obj ) {
501         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
502     }
503     elsif ($r eq 'HASH') {
504         if ( !$dbm_deep_obj && tied %{$value} ) {
505             $self->_throw_error( "Cannot store something that is tied" );
506         }
507         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
508     }
509     elsif ($r eq 'ARRAY') {
510         if ( !$dbm_deep_obj && tied @{$value} ) {
511             $self->_throw_error( "Cannot store something that is tied" );
512         }
513         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
514     }
515     elsif (!defined($value)) {
516         $self->write_tag( $location, SIG_NULL, '' );
517     }
518     else {
519         $self->write_tag( $location, SIG_DATA, $value );
520     }
521
522     ##
523     # Plain key is stored AFTER value, as keys are typically fetched less often.
524     ##
525     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
526
527     # Internal references don't care about autobless
528     return 1 if $dbm_deep_obj;
529
530     ##
531     # If value is blessed, preserve class name
532     ##
533     if ( $fileobj->{autobless} ) {
534         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
535             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
536         }
537         else {
538             $fileobj->print_at( undef, chr(0) );
539         }
540     }
541
542     ##
543     # Tie the passed in reference so that changes to it are reflected in the
544     # datafile. The use of $location as the base_offset will act as the
545     # the linkage between parent and child.
546     #
547     # The overall assignment is a hack around the fact that just tying doesn't
548     # store the values. This may not be the wrong thing to do.
549     ##
550     if ($r eq 'HASH') {
551         my %x = %$value;
552         tie %$value, 'DBM::Deep', {
553             base_offset => $location,
554             fileobj     => $fileobj,
555             parent      => $self->{obj},
556             parent_key  => $orig_key,
557         };
558         %$value = %x;
559     }
560     elsif ($r eq 'ARRAY') {
561         my @x = @$value;
562         tie @$value, 'DBM::Deep', {
563             base_offset => $location,
564             fileobj     => $fileobj,
565             parent      => $self->{obj},
566             parent_key  => $orig_key,
567         };
568         @$value = @x;
569     }
570
571     return 1;
572 }
573
574 sub split_index {
575     my $self = shift;
576     my ($tag, $md5, $keyloc) = @_;
577
578     my $fileobj = $self->_fileobj;
579
580     my $loc = $fileobj->request_space(
581         $self->tag_size( $self->{index_size} ),
582     );
583
584     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
585
586     my $index_tag = $self->write_tag(
587         $loc, SIG_INDEX,
588         chr(0)x$self->{index_size},
589     );
590
591     my $keys = $tag->{content}
592              . $md5 . pack($self->{long_pack}, $keyloc);
593
594     my @newloc = ();
595     BUCKET:
596     # The <= here is deliberate - we have max_buckets+1 keys to iterate
597     # through, unlike every other loop that uses max_buckets as a stop.
598     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
599         my ($key, $old_subloc) = $self->_get_key_subloc( $keys, $i );
600
601         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
602         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
603
604         my $num = ord(substr($key, $tag->{ch} + 1, 1));
605
606         if ($newloc[$num]) {
607             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
608
609             # This is looking for the first empty spot
610             my ($subloc, $offset) = $self->_find_in_buckets(
611                 { content => $subkeys }, '',
612             );
613
614             $fileobj->print_at(
615                 $newloc[$num] + $offset,
616                 $key, pack($self->{long_pack}, $old_subloc),
617             );
618
619             next;
620         }
621
622         my $loc = $fileobj->request_space(
623             $self->tag_size( $self->{bucket_list_size} ),
624         );
625
626         $fileobj->print_at(
627             $index_tag->{offset} + ($num * $self->{long_size}),
628             pack($self->{long_pack}, $loc),
629         );
630
631         my $blist_tag = $self->write_tag(
632             $loc, SIG_BLIST,
633             chr(0)x$self->{bucket_list_size},
634         );
635
636         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
637
638         $newloc[$num] = $blist_tag->{offset};
639     }
640
641     $self->_release_space(
642         $self->tag_size( $self->{bucket_list_size} ),
643         $tag->{offset} - SIG_SIZE - $self->{data_size},
644     );
645
646     return 1;
647 }
648
649 sub read_from_loc {
650     my $self = shift;
651     my ($subloc, $orig_key) = @_;
652
653     my $fileobj = $self->_fileobj;
654
655     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
656
657     ##
658     # If value is a hash or array, return new DBM::Deep object with correct offset
659     ##
660     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
661         #XXX This needs to be a singleton
662         my $new_obj = DBM::Deep->new({
663             type        => $signature,
664             base_offset => $subloc,
665             fileobj     => $self->_fileobj,
666             parent      => $self->{obj},
667             parent_key  => $orig_key,
668         });
669
670         if ($new_obj->_fileobj->{autobless}) {
671             ##
672             # Skip over value and plain key to see if object needs
673             # to be re-blessed
674             ##
675             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
676
677             my $size = $fileobj->read_at( undef, $self->{data_size} );
678             $size = unpack($self->{data_pack}, $size);
679             if ($size) { $fileobj->increment_pointer( $size ); }
680
681             my $bless_bit = $fileobj->read_at( undef, 1 );
682             if ( ord($bless_bit) ) {
683                 my $size = unpack(
684                     $self->{data_pack},
685                     $fileobj->read_at( undef, $self->{data_size} ),
686                 );
687
688                 if ( $size ) {
689                     $new_obj = bless $new_obj, $fileobj->read_at( undef, $size );
690                 }
691             }
692         }
693
694         return $new_obj;
695     }
696     elsif ( $signature eq SIG_INTERNAL ) {
697         my $size = $fileobj->read_at( undef, $self->{data_size} );
698         $size = unpack($self->{data_pack}, $size);
699
700         if ( $size ) {
701             my $new_loc = $fileobj->read_at( undef, $size );
702             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
703             return $self->read_from_loc( $new_loc, $orig_key );
704         }
705         else {
706             return;
707         }
708     }
709     ##
710     # Otherwise return actual value
711     ##
712     elsif ( $signature eq SIG_DATA ) {
713         my $size = $fileobj->read_at( undef, $self->{data_size} );
714         $size = unpack($self->{data_pack}, $size);
715
716         my $value = $size ? $fileobj->read_at( undef, $size ) : '';
717         return $value;
718     }
719
720     ##
721     # Key exists, but content is null
722     ##
723     return;
724 }
725
726 sub get_bucket_value {
727     ##
728     # Fetch single value given tag and MD5 digested key.
729     ##
730     my $self = shift;
731     my ($tag, $md5, $orig_key) = @_;
732
733     #ACID - This is a read. Can find exact or HEAD
734     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
735
736     if ( !$keyloc ) {
737         #XXX Need to use real key
738 #        $self->add_bucket( $tag, $md5, $orig_key, undef, $orig_key );
739 #        return;
740     }
741 #    elsif ( !$is_deleted ) {
742     else {
743         my $keytag = $self->load_tag( $keyloc );
744         my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
745         if (!$subloc && !$is_deleted) {
746             ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
747         }
748         if ( $subloc && !$is_deleted ) {
749             return $self->read_from_loc( $subloc, $orig_key );
750         }
751     }
752
753     return;
754 }
755
756 sub delete_bucket {
757     ##
758     # Delete single key/value pair given tag and MD5 digested key.
759     ##
760     my $self = shift;
761     my ($tag, $md5, $orig_key) = @_;
762
763     #ACID - Although this is a mutation, we must find any transaction.
764     # This is because we need to mark something as deleted that is in the HEAD.
765     my ($keyloc, $offset) = $self->_find_in_buckets( $tag, $md5 );
766
767     return if !$keyloc;
768
769     my $fileobj = $self->_fileobj;
770
771     my @transactions;
772     if ( $fileobj->transaction_id == 0 ) {
773         @transactions = $fileobj->current_transactions;
774     }
775
776     if ( $fileobj->transaction_id == 0 ) {
777         my $keytag = $self->load_tag( $keyloc );
778
779         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
780         return if !$subloc || $is_deleted;
781
782         my $value = $self->read_from_loc( $subloc, $orig_key );
783
784         my $size = $self->_length_needed( $value, $orig_key );
785
786         for my $trans_id ( @transactions ) {
787             my ($loc, $is_deleted, $offset2) = $self->find_keyloc( $keytag, $trans_id );
788             unless ($loc) {
789                 my $location2 = $fileobj->request_space( $size );
790                 $fileobj->print_at( $keytag->{offset} + $offset2,
791                     pack($self->{long_pack}, $location2 ),
792                     pack( 'C C', $trans_id, 0 ),
793                 );
794                 $self->_write_value( $location2, $orig_key, $value, $orig_key );
795             }
796         }
797
798         $keytag = $self->load_tag( $keyloc );
799         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
800         $fileobj->print_at( $keytag->{offset} + $offset,
801             substr( $keytag->{content}, $offset + $self->{key_size} ),
802             chr(0) x $self->{key_size},
803         );
804     }
805     else {
806         my $keytag = $self->load_tag( $keyloc );
807
808         my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
809
810         $fileobj->print_at( $keytag->{offset} + $offset,
811             pack($self->{long_pack}, 0 ),
812             pack( 'C C', $fileobj->transaction_id, 1 ),
813         );
814     }
815
816     return 1;
817 }
818
819 sub bucket_exists {
820     ##
821     # Check existence of single key given tag and MD5 digested key.
822     ##
823     my $self = shift;
824     my ($tag, $md5) = @_;
825
826     #ACID - This is a read. Can find exact or HEAD
827     my ($keyloc) = $self->_find_in_buckets( $tag, $md5 );
828     my $keytag = $self->load_tag( $keyloc );
829     my ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag );
830     if ( !$subloc && !$is_deleted ) {
831         ($subloc, $is_deleted, $offset) = $self->find_keyloc( $keytag, 0 );
832     }
833     return ($subloc && !$is_deleted) && 1;
834 }
835
836 sub find_blist {
837     ##
838     # Locate offset for bucket list, given digested key
839     ##
840     my $self = shift;
841     my ($offset, $md5, $args) = @_;
842     $args = {} unless $args;
843
844     ##
845     # Locate offset for bucket list using digest index system
846     ##
847     my $tag = $self->load_tag( $offset )
848         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
849
850     my $ch = 0;
851     while ($tag->{signature} ne SIG_BLIST) {
852         my $num = ord substr($md5, $ch, 1);
853
854         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
855         $tag = $self->index_lookup( $tag, $num );
856
857         if (!$tag) {
858             return if !$args->{create};
859
860             my $loc = $self->_fileobj->request_space(
861                 $self->tag_size( $self->{bucket_list_size} ),
862             );
863
864             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
865
866             $tag = $self->write_tag(
867                 $loc, SIG_BLIST,
868                 chr(0)x$self->{bucket_list_size},
869             );
870
871             $tag->{ref_loc} = $ref_loc;
872             $tag->{ch} = $ch;
873
874             last;
875         }
876
877         $tag->{ch} = $ch++;
878         $tag->{ref_loc} = $ref_loc;
879     }
880
881     return $tag;
882 }
883
884 sub index_lookup {
885     ##
886     # Given index tag, lookup single entry in index and return .
887     ##
888     my $self = shift;
889     my ($tag, $index) = @_;
890
891     my $location = unpack(
892         $self->{long_pack},
893         substr(
894             $tag->{content},
895             $index * $self->{long_size},
896             $self->{long_size},
897         ),
898     );
899
900     if (!$location) { return; }
901
902     return $self->load_tag( $location );
903 }
904
905 sub traverse_index {
906     ##
907     # Scan index and recursively step into deeper levels, looking for next key.
908     ##
909     my $self = shift;
910     my ($xxxx, $offset, $ch, $force_return_next) = @_;
911
912     my $tag = $self->load_tag( $offset );
913
914     if ($tag->{signature} ne SIG_BLIST) {
915         my $start = $xxxx->{return_next} ? 0 : ord(substr($xxxx->{prev_md5}, $ch, 1));
916
917         for (my $idx = $start; $idx < $self->{hash_chars_used}; $idx++) {
918             my $subloc = unpack(
919                 $self->{long_pack},
920                 substr(
921                     $tag->{content},
922                     $idx * $self->{long_size},
923                     $self->{long_size},
924                 ),
925             );
926
927             if ($subloc) {
928                 my $result = $self->traverse_index(
929                     $xxxx, $subloc, $ch + 1, $force_return_next,
930                 );
931
932                 if (defined $result) { return $result; }
933             }
934         } # index loop
935
936         $xxxx->{return_next} = 1;
937     }
938     # This is the bucket list
939     else {
940         my $keys = $tag->{content};
941         if ($force_return_next) { $xxxx->{return_next} = 1; }
942
943         ##
944         # Iterate through buckets, looking for a key match
945         ##
946         my $transaction_id = $self->_fileobj->transaction_id;
947         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
948             my ($key, $keyloc) = $self->_get_key_subloc( $keys, $i );
949
950             # End of bucket list -- return to outer loop
951             if (!$keyloc) {
952                 $xxxx->{return_next} = 1;
953                 last;
954             }
955             # Located previous key -- return next one found
956             elsif ($key eq $xxxx->{prev_md5}) {
957                 $xxxx->{return_next} = 1;
958                 next;
959             }
960             # Seek to bucket location and skip over signature
961             elsif ($xxxx->{return_next}) {
962                 my $fileobj = $self->_fileobj;
963
964                 my $keytag = $self->load_tag( $keyloc );
965                 my ($subloc, $is_deleted) = $self->find_keyloc( $keytag );
966                 if ( $subloc == 0 && !$is_deleted ) {
967                     ($subloc, $is_deleted) = $self->find_keyloc( $keytag, 0 );
968                 }
969                 next if $is_deleted;
970
971                 # Skip over value to get to plain key
972                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
973
974                 my $size = $fileobj->read_at( undef, $self->{data_size} );
975                 $size = unpack($self->{data_pack}, $size);
976                 if ($size) { $fileobj->increment_pointer( $size ); }
977
978                 # Read in plain key and return as scalar
979                 $size = $fileobj->read_at( undef, $self->{data_size} );
980                 $size = unpack($self->{data_pack}, $size);
981
982                 my $plain_key;
983                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
984                 return $plain_key;
985             }
986         }
987
988         $xxxx->{return_next} = 1;
989     }
990
991     return;
992 }
993
994 sub get_next_key {
995     ##
996     # Locate next key, given digested previous one
997     ##
998     my $self = shift;
999     my ($obj) = @_;
1000
1001     ##
1002     # If the previous key was not specifed, start at the top and
1003     # return the first one found.
1004     ##
1005     my $temp;
1006     if ( @_ > 1 ) {
1007         $temp = {
1008             prev_md5    => $_[1],
1009             return_next => 0,
1010         };
1011     }
1012     else {
1013         $temp = {
1014             prev_md5    => chr(0) x $self->{hash_size},
1015             return_next => 1,
1016         };
1017     }
1018
1019     return $self->traverse_index( $temp, $obj->_base_offset, 0 );
1020 }
1021
1022 # Utilities
1023
1024 sub _get_key_subloc {
1025     my $self = shift;
1026     my ($keys, $idx) = @_;
1027
1028     return unpack(
1029         # This is 'a', not 'A'. Please read the pack() documentation for the
1030         # difference between the two and why it's important.
1031         "a$self->{hash_size} $self->{long_pack}",
1032         substr(
1033             $keys,
1034             ($idx * $self->{bucket_size}),
1035             $self->{bucket_size},
1036         ),
1037     );
1038 }
1039
1040 sub _find_in_buckets {
1041     my $self = shift;
1042     my ($tag, $md5) = @_;
1043
1044     BUCKET:
1045     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1046         my ($key, $subloc) = $self->_get_key_subloc(
1047             $tag->{content}, $i,
1048         );
1049
1050         my @rv = ($subloc, $i * $self->{bucket_size});
1051
1052         unless ( $subloc ) {
1053             return @rv;
1054         }
1055
1056         next BUCKET if $key ne $md5;
1057
1058         return @rv;
1059     }
1060
1061     return;
1062 }
1063
1064 sub _release_space {
1065     my $self = shift;
1066     my ($size, $loc) = @_;
1067
1068     my $next_loc = 0;
1069
1070     $self->_fileobj->print_at( $loc,
1071         SIG_FREE, 
1072         pack($self->{long_pack}, $size ),
1073         pack($self->{long_pack}, $next_loc ),
1074     );
1075
1076     return;
1077 }
1078
1079 sub _throw_error {
1080     die "DBM::Deep: $_[1]\n";
1081 }
1082
1083 sub _get_dbm_object {
1084     my $item = shift;
1085
1086     my $obj = eval {
1087         local $SIG{__DIE__};
1088         if ($item->isa( 'DBM::Deep' )) {
1089             return $item;
1090         }
1091         return;
1092     };
1093     return $obj if $obj;
1094
1095     my $r = Scalar::Util::reftype( $item ) || '';
1096     if ( $r eq 'HASH' ) {
1097         my $obj = eval {
1098             local $SIG{__DIE__};
1099             my $obj = tied(%$item);
1100             if ($obj->isa( 'DBM::Deep' )) {
1101                 return $obj;
1102             }
1103             return;
1104         };
1105         return $obj if $obj;
1106     }
1107     elsif ( $r eq 'ARRAY' ) {
1108         my $obj = eval {
1109             local $SIG{__DIE__};
1110             my $obj = tied(@$item);
1111             if ($obj->isa( 'DBM::Deep' )) {
1112                 return $obj;
1113             }
1114             return;
1115         };
1116         return $obj if $obj;
1117     }
1118
1119     return;
1120 }
1121
1122 sub _length_needed {
1123     my $self = shift;
1124     my ($value, $key) = @_;
1125
1126     my $is_dbm_deep = eval {
1127         local $SIG{'__DIE__'};
1128         $value->isa( 'DBM::Deep' );
1129     };
1130
1131     my $len = SIG_SIZE
1132             + $self->{data_size} # size for value
1133             + $self->{data_size} # size for key
1134             + length( $key );    # length of key
1135
1136     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
1137         # long_size is for the internal reference
1138         return $len + $self->{long_size};
1139     }
1140
1141     if ( $self->_fileobj->{autobless} ) {
1142         # This is for the bit saying whether or not this thing is blessed.
1143         $len += 1;
1144     }
1145
1146     my $r = Scalar::Util::reftype( $value ) || '';
1147     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
1148         if ( defined $value ) {
1149             $len += length( $value );
1150         }
1151         return $len;
1152     }
1153
1154     $len += $self->{index_size};
1155
1156     # if autobless is enabled, must also take into consideration
1157     # the class name as it is stored after the key.
1158     if ( $self->_fileobj->{autobless} ) {
1159         my $c = Scalar::Util::blessed($value);
1160         if ( defined $c && !$is_dbm_deep ) {
1161             $len += $self->{data_size} + length($c);
1162         }
1163     }
1164
1165     return $len;
1166 }
1167
1168 1;
1169 __END__