Fixed bug where overwrites weren't transaction-aware
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * To add to bucket_size, make sure you modify the following:
13 #   - calculate_sizes()
14 #   - _get_key_subloc()
15 #   - add_bucket() - where the buckets are printed
16
17 ##
18 # Setup file and tag signatures.  These should never change.
19 ##
20 sub SIG_FILE     () { 'DPDB' }
21 sub SIG_HEADER   () { 'h'    }
22 sub SIG_INTERNAL () { 'i'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 sub new {
33     my $class = shift;
34     my ($args) = @_;
35
36     my $self = bless {
37         long_size   => 4,
38         long_pack   => 'N',
39         data_size   => 4,
40         data_pack   => 'N',
41
42         digest      => \&Digest::MD5::md5,
43         hash_size   => 16,
44
45         ##
46         # Maximum number of buckets per list before another level of indexing is
47         # done. Increase this value for slightly greater speed, but larger database
48         # files. DO NOT decrease this value below 16, due to risk of recursive
49         # reindex overrun.
50         ##
51         max_buckets => 16,
52
53         fileobj => undef,
54         obj     => undef,
55     }, $class;
56
57     if ( defined $args->{pack_size} ) {
58         if ( lc $args->{pack_size} eq 'small' ) {
59             $args->{long_size} = 2;
60             $args->{long_pack} = 'n';
61         }
62         elsif ( lc $args->{pack_size} eq 'medium' ) {
63             $args->{long_size} = 4;
64             $args->{long_pack} = 'N';
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{long_size} = 8;
68             $args->{long_pack} = 'Q';
69         }
70         else {
71             die "Unknown pack_size value: '$args->{pack_size}'\n";
72         }
73     }
74
75     # Grab the parameters we want to use
76     foreach my $param ( keys %$self ) {
77         next unless exists $args->{$param};
78         $self->{$param} = $args->{$param};
79     }
80     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
81
82     if ( $self->{max_buckets} < 16 ) {
83         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
84         $self->{max_buckets} = 16;
85     }
86
87     return $self;
88 }
89
90 sub _fileobj { return $_[0]{fileobj} }
91
92 sub calculate_sizes {
93     my $self = shift;
94
95     # The 2**8 here indicates the number of different characters in the
96     # current hashing algorithm
97     #XXX Does this need to be updated with different hashing algorithms?
98     $self->{index_size}       = (2**8) * $self->{long_size};
99     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
100     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
101
102     return;
103 }
104
105 sub write_file_header {
106     my $self = shift;
107
108     my $loc = $self->_fileobj->request_space( length( SIG_FILE ) + 21 );
109
110     $self->_fileobj->print_at( $loc,
111         SIG_FILE,
112         SIG_HEADER,
113         pack('N', 1),  # header version
114         pack('N', 12), # header size
115         pack('N', 0),  # currently running transaction IDs
116         pack('n', $self->{long_size}),
117         pack('A', $self->{long_pack}),
118         pack('n', $self->{data_size}),
119         pack('A', $self->{data_pack}),
120         pack('n', $self->{max_buckets}),
121     );
122
123     $self->_fileobj->set_transaction_offset( 13 );
124
125     return;
126 }
127
128 sub read_file_header {
129     my $self = shift;
130
131     my $buffer = $self->_fileobj->read_at( 0, length(SIG_FILE) + 9 );
132     return unless length($buffer);
133
134     my ($file_signature, $sig_header, $header_version, $size) = unpack(
135         'A4 A N N', $buffer
136     );
137
138     unless ( $file_signature eq SIG_FILE ) {
139         $self->_fileobj->close;
140         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
141     }
142
143     unless ( $sig_header eq SIG_HEADER ) {
144         $self->_fileobj->close;
145         $self->_throw_error( "Old file version found." );
146     }
147
148     my $buffer2 = $self->_fileobj->read_at( undef, $size );
149     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
150
151     $self->_fileobj->set_transaction_offset( 13 );
152
153     if ( @values < 5 || grep { !defined } @values ) {
154         $self->_fileobj->close;
155         $self->_throw_error("Corrupted file - bad header");
156     }
157
158     #XXX Add warnings if values weren't set right
159     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
160
161     return length($buffer) + length($buffer2);
162 }
163
164 sub setup_fh {
165     my $self = shift;
166     my ($obj) = @_;
167
168     # Need to remove use of $fh here
169     my $fh = $self->_fileobj->{fh};
170     flock $fh, LOCK_EX;
171
172     #XXX The duplication of calculate_sizes needs to go away
173     unless ( $obj->{base_offset} ) {
174         my $bytes_read = $self->read_file_header;
175
176         $self->calculate_sizes;
177
178         ##
179         # File is empty -- write header and master index
180         ##
181         if (!$bytes_read) {
182             $self->_fileobj->audit( "# Database created on" );
183
184             $self->write_file_header;
185
186             $obj->{base_offset} = $self->_fileobj->request_space(
187                 $self->tag_size( $self->{index_size} ),
188             );
189
190             $self->write_tag(
191                 $obj->_base_offset, $obj->_type,
192                 chr(0)x$self->{index_size},
193             );
194
195             # Flush the filehandle
196             my $old_fh = select $fh;
197             my $old_af = $|; $| = 1; $| = $old_af;
198             select $old_fh;
199         }
200         else {
201             $obj->{base_offset} = $bytes_read;
202
203             ##
204             # Get our type from master index header
205             ##
206             my $tag = $self->load_tag($obj->_base_offset);
207             unless ( $tag ) {
208                 flock $fh, LOCK_UN;
209                 $self->_throw_error("Corrupted file, no master index record");
210             }
211
212             unless ($obj->_type eq $tag->{signature}) {
213                 flock $fh, LOCK_UN;
214                 $self->_throw_error("File type mismatch");
215             }
216         }
217     }
218     else {
219         $self->calculate_sizes;
220     }
221
222     #XXX We have to make sure we don't mess up when autoflush isn't turned on
223     $self->_fileobj->set_inode;
224
225     flock $fh, LOCK_UN;
226
227     return 1;
228 }
229
230 sub tag_size {
231     my $self = shift;
232     my ($size) = @_;
233     return SIG_SIZE + $self->{data_size} + $size;
234 }
235
236 sub write_tag {
237     ##
238     # Given offset, signature and content, create tag and write to disk
239     ##
240     my $self = shift;
241     my ($offset, $sig, $content) = @_;
242     my $size = length( $content );
243
244     $self->_fileobj->print_at(
245         $offset, 
246         $sig, pack($self->{data_pack}, $size), $content,
247     );
248
249     return unless defined $offset;
250
251     return {
252         signature => $sig,
253         size => $size,
254         offset => $offset + SIG_SIZE + $self->{data_size},
255         content => $content
256     };
257 }
258
259 sub load_tag {
260     ##
261     # Given offset, load single tag and return signature, size and data
262     ##
263     my $self = shift;
264     my ($offset) = @_;
265
266     my $fileobj = $self->_fileobj;
267
268     my $s = SIG_SIZE + $self->{data_size};
269     my $b = $fileobj->read_at( $offset, $s );
270     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
271
272     my $buffer = $fileobj->read_at( undef, $size );
273
274     return {
275         signature => $sig,
276         size => $size,
277         offset => $offset + SIG_SIZE + $self->{data_size},
278         content => $buffer
279     };
280 }
281
282 sub _get_dbm_object {
283     my $item = shift;
284
285     my $obj = eval {
286         local $SIG{__DIE__};
287         if ($item->isa( 'DBM::Deep' )) {
288             return $item;
289         }
290         return;
291     };
292     return $obj if $obj;
293
294     my $r = Scalar::Util::reftype( $item ) || '';
295     if ( $r eq 'HASH' ) {
296         my $obj = eval {
297             local $SIG{__DIE__};
298             my $obj = tied(%$item);
299             if ($obj->isa( 'DBM::Deep' )) {
300                 return $obj;
301             }
302             return;
303         };
304         return $obj if $obj;
305     }
306     elsif ( $r eq 'ARRAY' ) {
307         my $obj = eval {
308             local $SIG{__DIE__};
309             my $obj = tied(@$item);
310             if ($obj->isa( 'DBM::Deep' )) {
311                 return $obj;
312             }
313             return;
314         };
315         return $obj if $obj;
316     }
317
318     return;
319 }
320
321 sub _length_needed {
322     my $self = shift;
323     my ($value, $key) = @_;
324
325     my $is_dbm_deep = eval {
326         local $SIG{'__DIE__'};
327         $value->isa( 'DBM::Deep' );
328     };
329
330     my $len = SIG_SIZE
331             + $self->{data_size} # size for value
332             + $self->{data_size} # size for key
333             + length( $key );    # length of key
334
335     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
336         # long_size is for the internal reference
337         return $len + $self->{long_size};
338     }
339
340     if ( $self->_fileobj->{autobless} ) {
341         # This is for the bit saying whether or not this thing is blessed.
342         $len += 1;
343     }
344
345     my $r = Scalar::Util::reftype( $value ) || '';
346     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
347         if ( defined $value ) {
348             $len += length( $value );
349         }
350         return $len;
351     }
352
353     $len += $self->{index_size};
354
355     # if autobless is enabled, must also take into consideration
356     # the class name as it is stored after the key.
357     if ( $self->_fileobj->{autobless} ) {
358         my $c = Scalar::Util::blessed($value);
359         if ( defined $c && !$is_dbm_deep ) {
360             $len += $self->{data_size} + length($c);
361         }
362     }
363
364     return $len;
365 }
366
367 sub add_bucket {
368     ##
369     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
370     # plain (undigested) key and value.
371     ##
372     my $self = shift;
373     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
374     $deleted ||= 0;
375
376     local($/,$\);
377
378     # This verifies that only supported values will be stored.
379     {
380         my $r = Scalar::Util::reftype( $value );
381         last if !defined $r;
382
383         last if $r eq 'HASH';
384         last if $r eq 'ARRAY';
385
386         $self->_throw_error(
387             "Storage of variables of type '$r' is not supported."
388         );
389     }
390
391     my $location = 0;
392     my $result = 2;
393
394     my $fileobj = $self->_fileobj;
395
396     my $actual_length = $self->_length_needed( $value, $plain_key );
397
398     #ACID - This is a mutation. Must only find the exact transaction
399     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
400
401     my @transactions;
402     if ( $fileobj->transaction_id == 0 ) {
403         @transactions = $fileobj->current_transactions;
404     }
405
406 #    $self->_release_space( $size, $subloc );
407     # Updating a known md5
408 #XXX This needs updating to use _release_space
409     if ( $subloc ) {
410         $result = 1;
411
412         if ($actual_length <= $size) {
413             $location = $subloc;
414         }
415         else {
416             $location = $fileobj->request_space( $actual_length );
417
418             $fileobj->print_at( $tag->{offset} + $offset + $self->{hash_size},
419                 pack($self->{long_pack}, $location ),
420                 pack($self->{long_pack}, $actual_length ),
421                 pack('n n', $fileobj->transaction_id, $deleted ),
422             );
423         }
424
425         my $old_value = $self->read_from_loc( $subloc, $orig_key );
426         for ( @transactions ) {
427             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
428             $fileobj->{transaction_id} = $_;
429             $self->add_bucket( $tag2, $md5, $orig_key, $old_value, undef, $orig_key );
430             $fileobj->{transaction_id} = 0;
431         }
432         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
433     }
434     # Adding a new md5
435     elsif ( defined $offset ) {
436         $location = $fileobj->request_space( $actual_length );
437
438         $fileobj->print_at( $tag->{offset} + $offset,
439             $md5,
440             pack($self->{long_pack}, $location ),
441             pack($self->{long_pack}, $actual_length ),
442             pack('n n', $fileobj->transaction_id, $deleted ),
443         );
444
445         for ( @transactions ) {
446             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
447             $fileobj->{transaction_id} = $_;
448             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
449             $fileobj->{transaction_id} = 0;
450         }
451         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
452     }
453     # If bucket didn't fit into list, split into a new index level
454     # split_index() will do the _fileobj->request_space() call
455     else {
456         $location = $self->split_index( $md5, $tag );
457     }
458
459     $self->write_value( $location, $plain_key, $value, $orig_key );
460
461     return $result;
462 }
463
464 sub write_value {
465     my $self = shift;
466     my ($location, $key, $value, $orig_key) = @_;
467
468     my $fileobj = $self->_fileobj;
469
470     my $dbm_deep_obj = _get_dbm_object( $value );
471     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $fileobj ) {
472         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
473     }
474
475     ##
476     # Write signature based on content type, set content length and write
477     # actual value.
478     ##
479     my $r = Scalar::Util::reftype( $value ) || '';
480     if ( $dbm_deep_obj ) {
481         $self->write_tag( $location, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
482     }
483     elsif ($r eq 'HASH') {
484         if ( !$dbm_deep_obj && tied %{$value} ) {
485             $self->_throw_error( "Cannot store something that is tied" );
486         }
487         $self->write_tag( $location, SIG_HASH, chr(0)x$self->{index_size} );
488     }
489     elsif ($r eq 'ARRAY') {
490         if ( !$dbm_deep_obj && tied @{$value} ) {
491             $self->_throw_error( "Cannot store something that is tied" );
492         }
493         $self->write_tag( $location, SIG_ARRAY, chr(0)x$self->{index_size} );
494     }
495     elsif (!defined($value)) {
496         $self->write_tag( $location, SIG_NULL, '' );
497     }
498     else {
499         $self->write_tag( $location, SIG_DATA, $value );
500     }
501
502     ##
503     # Plain key is stored AFTER value, as keys are typically fetched less often.
504     ##
505     $fileobj->print_at( undef, pack($self->{data_pack}, length($key)) . $key );
506
507     # Internal references don't care about autobless
508     return 1 if $dbm_deep_obj;
509
510     ##
511     # If value is blessed, preserve class name
512     ##
513     if ( $fileobj->{autobless} ) {
514         if ( defined( my $c = Scalar::Util::blessed($value) ) ) {
515             $fileobj->print_at( undef, chr(1), pack($self->{data_pack}, length($c)) . $c );
516         }
517         else {
518             $fileobj->print_at( undef, chr(0) );
519         }
520     }
521
522     ##
523     # Tie the passed in reference so that changes to it are reflected in the
524     # datafile. The use of $location as the base_offset will act as the
525     # the linkage between parent and child.
526     #
527     # The overall assignment is a hack around the fact that just tying doesn't
528     # store the values. This may not be the wrong thing to do.
529     ##
530     if ($r eq 'HASH') {
531         my %x = %$value;
532         tie %$value, 'DBM::Deep', {
533             base_offset => $location,
534             fileobj     => $fileobj,
535             parent      => $self->{obj},
536             parent_key  => $orig_key,
537         };
538         %$value = %x;
539     }
540     elsif ($r eq 'ARRAY') {
541         my @x = @$value;
542         tie @$value, 'DBM::Deep', {
543             base_offset => $location,
544             fileobj     => $fileobj,
545             parent      => $self->{obj},
546             parent_key  => $orig_key,
547         };
548         @$value = @x;
549     }
550
551     return 1;
552 }
553
554 sub split_index {
555     my $self = shift;
556     my ($md5, $tag) = @_;
557
558     my $fileobj = $self->_fileobj;
559
560     my $loc = $fileobj->request_space(
561         $self->tag_size( $self->{index_size} ),
562     );
563
564     $fileobj->print_at( $tag->{ref_loc}, pack($self->{long_pack}, $loc) );
565
566     my $index_tag = $self->write_tag(
567         $loc, SIG_INDEX,
568         chr(0)x$self->{index_size},
569     );
570
571     my $newtag_loc = $fileobj->request_space(
572         $self->tag_size( $self->{bucket_list_size} ),
573     );
574
575     my $keys = $tag->{content}
576              . $md5 . pack($self->{long_pack}, $newtag_loc)
577                     . pack($self->{long_pack}, 0)  # size
578                     . pack($self->{long_pack}, 0); # transaction ID
579
580     my @newloc = ();
581     BUCKET:
582     # The <= here is deliberate - we have max_buckets+1 keys to iterate
583     # through, unlike every other loop that uses max_buckets as a stop.
584     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
585         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
586
587         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
588         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
589
590         my $num = ord(substr($key, $tag->{ch} + 1, 1));
591
592         if ($newloc[$num]) {
593             my $subkeys = $fileobj->read_at( $newloc[$num], $self->{bucket_list_size} );
594
595             # This is looking for the first empty spot
596             my ($subloc, $offset, $size) = $self->_find_in_buckets(
597                 { content => $subkeys }, '',
598             );
599
600             $fileobj->print_at(
601                 $newloc[$num] + $offset,
602                 $key, pack($self->{long_pack}, $old_subloc),
603             );
604
605             next;
606         }
607
608         my $loc = $fileobj->request_space(
609             $self->tag_size( $self->{bucket_list_size} ),
610         );
611
612         $fileobj->print_at(
613             $index_tag->{offset} + ($num * $self->{long_size}),
614             pack($self->{long_pack}, $loc),
615         );
616
617         my $blist_tag = $self->write_tag(
618             $loc, SIG_BLIST,
619             chr(0)x$self->{bucket_list_size},
620         );
621
622         $fileobj->print_at( $blist_tag->{offset}, $key . pack($self->{long_pack}, $old_subloc) );
623
624         $newloc[$num] = $blist_tag->{offset};
625     }
626
627     $self->_release_space(
628         $self->tag_size( $self->{bucket_list_size} ),
629         $tag->{offset} - SIG_SIZE - $self->{data_size},
630     );
631
632     return $newtag_loc;
633 }
634
635 sub read_from_loc {
636     my $self = shift;
637     my ($subloc, $orig_key) = @_;
638
639     my $fileobj = $self->_fileobj;
640
641     my $signature = $fileobj->read_at( $subloc, SIG_SIZE );
642
643     ##
644     # If value is a hash or array, return new DBM::Deep object with correct offset
645     ##
646     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
647         my $new_obj = DBM::Deep->new({
648             type        => $signature,
649             base_offset => $subloc,
650             fileobj     => $self->_fileobj,
651             parent      => $self->{obj},
652             parent_key  => $orig_key,
653         });
654
655         if ($new_obj->_fileobj->{autobless}) {
656             ##
657             # Skip over value and plain key to see if object needs
658             # to be re-blessed
659             ##
660             $fileobj->increment_pointer( $self->{data_size} + $self->{index_size} );
661
662             my $size = $fileobj->read_at( undef, $self->{data_size} );
663             $size = unpack($self->{data_pack}, $size);
664             if ($size) { $fileobj->increment_pointer( $size ); }
665
666             my $bless_bit = $fileobj->read_at( undef, 1 );
667             if (ord($bless_bit)) {
668                 ##
669                 # Yes, object needs to be re-blessed
670                 ##
671                 my $size = $fileobj->read_at( undef, $self->{data_size} );
672                 $size = unpack($self->{data_pack}, $size);
673
674                 my $class_name;
675                 if ($size) { $class_name = $fileobj->read_at( undef, $size ); }
676                 if (defined $class_name) { $new_obj = bless( $new_obj, $class_name ); }
677             }
678         }
679
680         return $new_obj;
681     }
682     elsif ( $signature eq SIG_INTERNAL ) {
683         my $size = $fileobj->read_at( undef, $self->{data_size} );
684         $size = unpack($self->{data_pack}, $size);
685
686         if ( $size ) {
687             my $new_loc = $fileobj->read_at( undef, $size );
688             $new_loc = unpack( $self->{long_pack}, $new_loc ); 
689             return $self->read_from_loc( $new_loc, $orig_key );
690         }
691         else {
692             return;
693         }
694     }
695     ##
696     # Otherwise return actual value
697     ##
698     elsif ( $signature eq SIG_DATA ) {
699         my $size = $fileobj->read_at( undef, $self->{data_size} );
700         $size = unpack($self->{data_pack}, $size);
701
702         my $value = '';
703         if ($size) { $value = $fileobj->read_at( undef, $size ); }
704         return $value;
705     }
706
707     ##
708     # Key exists, but content is null
709     ##
710     return;
711 }
712
713 sub get_bucket_value {
714     ##
715     # Fetch single value given tag and MD5 digested key.
716     ##
717     my $self = shift;
718     my ($tag, $md5, $orig_key) = @_;
719
720     #ACID - This is a read. Can find exact or HEAD
721     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
722
723     if ( !$subloc ) {
724         #XXX Need to use real key
725 #        $self->add_bucket( $tag, $md5, $orig_key, undef, undef, $orig_key );
726 #        return;
727     }
728     elsif ( !$is_deleted ) {
729         return $self->read_from_loc( $subloc, $orig_key );
730     }
731
732     return;
733 }
734
735 sub delete_bucket {
736     ##
737     # Delete single key/value pair given tag and MD5 digested key.
738     ##
739     my $self = shift;
740     my ($tag, $md5, $orig_key) = @_;
741
742     #ACID - Although this is a mutation, we must find any transaction.
743     # This is because we need to mark something as deleted that is in the HEAD.
744     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
745
746     return if !$subloc;
747
748     my $fileobj = $self->_fileobj;
749
750     my @transactions;
751     if ( $fileobj->transaction_id == 0 ) {
752         @transactions = $fileobj->current_transactions;
753     }
754
755     if ( $fileobj->transaction_id == 0 ) {
756         my $value = $self->read_from_loc( $subloc, $orig_key );
757
758         for (@transactions) {
759             $fileobj->{transaction_id} = $_;
760             #XXX Need to use real key
761             $self->add_bucket( $tag, $md5, $orig_key, $value, undef, $orig_key );
762             $fileobj->{transaction_id} = 0;
763         }
764         $tag = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
765
766         #XXX This needs _release_space() for the value and anything below
767         $fileobj->print_at(
768             $tag->{offset} + $offset,
769             substr( $tag->{content}, $offset + $self->{bucket_size} ),
770             chr(0) x $self->{bucket_size},
771         );
772     }
773     else {
774         $self->add_bucket( $tag, $md5, '', '', 1, $orig_key );
775     }
776
777     return 1;
778 }
779
780 sub bucket_exists {
781     ##
782     # Check existence of single key given tag and MD5 digested key.
783     ##
784     my $self = shift;
785     my ($tag, $md5) = @_;
786
787     #ACID - This is a read. Can find exact or HEAD
788     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
789     return ($subloc && !$is_deleted) && 1;
790 }
791
792 sub find_bucket_list {
793     ##
794     # Locate offset for bucket list, given digested key
795     ##
796     my $self = shift;
797     my ($offset, $md5, $args) = @_;
798     $args = {} unless $args;
799
800     local($/,$\);
801
802     ##
803     # Locate offset for bucket list using digest index system
804     ##
805     my $tag = $self->load_tag( $offset )
806         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
807
808     my $ch = 0;
809     while ($tag->{signature} ne SIG_BLIST) {
810         my $num = ord substr($md5, $ch, 1);
811
812         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
813         $tag = $self->index_lookup( $tag, $num );
814
815         if (!$tag) {
816             return if !$args->{create};
817
818             my $loc = $self->_fileobj->request_space(
819                 $self->tag_size( $self->{bucket_list_size} ),
820             );
821
822             $self->_fileobj->print_at( $ref_loc, pack($self->{long_pack}, $loc) );
823
824             $tag = $self->write_tag(
825                 $loc, SIG_BLIST,
826                 chr(0)x$self->{bucket_list_size},
827             );
828
829             $tag->{ref_loc} = $ref_loc;
830             $tag->{ch} = $ch;
831
832             last;
833         }
834
835         $tag->{ch} = $ch++;
836         $tag->{ref_loc} = $ref_loc;
837     }
838
839     return $tag;
840 }
841
842 sub index_lookup {
843     ##
844     # Given index tag, lookup single entry in index and return .
845     ##
846     my $self = shift;
847     my ($tag, $index) = @_;
848
849     my $location = unpack(
850         $self->{long_pack},
851         substr(
852             $tag->{content},
853             $index * $self->{long_size},
854             $self->{long_size},
855         ),
856     );
857
858     if (!$location) { return; }
859
860     return $self->load_tag( $location );
861 }
862
863 sub traverse_index {
864     ##
865     # Scan index and recursively step into deeper levels, looking for next key.
866     ##
867     my $self = shift;
868     my ($obj, $offset, $ch, $force_return_next) = @_;
869
870     my $tag = $self->load_tag( $offset );
871
872     if ($tag->{signature} ne SIG_BLIST) {
873         my $content = $tag->{content};
874         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
875
876         for (my $idx = $start; $idx < (2**8); $idx++) {
877             my $subloc = unpack(
878                 $self->{long_pack},
879                 substr(
880                     $content,
881                     $idx * $self->{long_size},
882                     $self->{long_size},
883                 ),
884             );
885
886             if ($subloc) {
887                 my $result = $self->traverse_index(
888                     $obj, $subloc, $ch + 1, $force_return_next,
889                 );
890
891                 if (defined($result)) { return $result; }
892             }
893         } # index loop
894
895         $obj->{return_next} = 1;
896     }
897     # This is the bucket list
898     else {
899         my $keys = $tag->{content};
900         if ($force_return_next) { $obj->{return_next} = 1; }
901
902         ##
903         # Iterate through buckets, looking for a key match
904         ##
905         my $transaction_id = $self->_fileobj->transaction_id;
906         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
907             my ($key, $subloc, $size, $trans_id, $is_deleted) = $self->_get_key_subloc( $keys, $i );
908
909             next if $is_deleted;
910 #XXX Need to find all the copies of this key to find out if $transaction_id has it
911 #XXX marked as deleted, in use, or what.
912             next if $trans_id && $trans_id != $transaction_id;
913
914             # End of bucket list -- return to outer loop
915             if (!$subloc) {
916                 $obj->{return_next} = 1;
917                 last;
918             }
919             # Located previous key -- return next one found
920             elsif ($key eq $obj->{prev_md5}) {
921                 $obj->{return_next} = 1;
922                 next;
923             }
924             # Seek to bucket location and skip over signature
925             elsif ($obj->{return_next}) {
926                 my $fileobj = $self->_fileobj;
927
928                 # Skip over value to get to plain key
929                 my $sig = $fileobj->read_at( $subloc, SIG_SIZE );
930
931                 my $size = $fileobj->read_at( undef, $self->{data_size} );
932                 $size = unpack($self->{data_pack}, $size);
933                 if ($size) { $fileobj->increment_pointer( $size ); }
934
935                 # Read in plain key and return as scalar
936                 $size = $fileobj->read_at( undef, $self->{data_size} );
937                 $size = unpack($self->{data_pack}, $size);
938                 my $plain_key;
939                 if ($size) { $plain_key = $fileobj->read_at( undef, $size); }
940
941                 return $plain_key;
942             }
943         }
944
945         $obj->{return_next} = 1;
946     }
947
948     return;
949 }
950
951 sub get_next_key {
952     ##
953     # Locate next key, given digested previous one
954     ##
955     my $self = shift;
956     my ($obj) = @_;
957
958     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
959     $obj->{return_next} = 0;
960
961     ##
962     # If the previous key was not specifed, start at the top and
963     # return the first one found.
964     ##
965     if (!$obj->{prev_md5}) {
966         $obj->{prev_md5} = chr(0) x $self->{hash_size};
967         $obj->{return_next} = 1;
968     }
969
970     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
971 }
972
973 # Utilities
974
975 sub _get_key_subloc {
976     my $self = shift;
977     my ($keys, $idx) = @_;
978
979     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
980         # This is 'a', not 'A'. Please read the pack() documentation for the
981         # difference between the two and why it's important.
982         "a$self->{hash_size} $self->{long_pack}2 n2",
983         substr(
984             $keys,
985             ($idx * $self->{bucket_size}),
986             $self->{bucket_size},
987         ),
988     );
989
990     return ($key, $subloc, $size, $transaction_id, $is_deleted);
991 }
992
993 sub _find_in_buckets {
994     my $self = shift;
995     my ($tag, $md5, $exact) = @_;
996     $exact ||= 0;
997
998     my $trans_id = $self->_fileobj->transaction_id;
999
1000     my @zero;
1001
1002     BUCKET:
1003     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1004         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
1005             $tag->{content}, $i,
1006         );
1007
1008         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
1009
1010         unless ( $subloc ) {
1011             if ( !$exact && @zero && $trans_id ) {
1012                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
1013             }
1014             return @rv;
1015         }
1016
1017         next BUCKET if $key ne $md5;
1018
1019         # Save off the HEAD in case we need it.
1020         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
1021
1022         next BUCKET if $transaction_id != $trans_id;
1023
1024         return @rv;
1025     }
1026
1027     return;
1028 }
1029
1030 sub _release_space {
1031     my $self = shift;
1032     my ($size, $loc) = @_;
1033
1034     my $next_loc = 0;
1035
1036     $self->_fileobj->print_at( $loc,
1037         SIG_FREE, 
1038         pack($self->{long_pack}, $size ),
1039         pack($self->{long_pack}, $next_loc ),
1040     );
1041
1042     return;
1043 }
1044
1045 sub _throw_error {
1046     die "DBM::Deep: $_[1]\n";
1047 }
1048
1049 1;
1050 __END__