6c8e9594e4b911065b128ea58cbdd2a5657228ac
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock :seek );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * All the local($/,$\); are to protect read() and print() from -l.
13 # * To add to bucket_size, make sure you modify the following:
14 #   - calculate_sizes()
15 #   - _get_key_subloc()
16 #   - add_bucket() - where the buckets are printed
17
18 ##
19 # Setup file and tag signatures.  These should never change.
20 ##
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_INTERNAL () { 'i'    }
24 sub SIG_HASH     () { 'H'    }
25 sub SIG_ARRAY    () { 'A'    }
26 sub SIG_NULL     () { 'N'    }
27 sub SIG_DATA     () { 'D'    }
28 sub SIG_INDEX    () { 'I'    }
29 sub SIG_BLIST    () { 'B'    }
30 sub SIG_FREE     () { 'F'    }
31 sub SIG_SIZE     () {  1     }
32
33 sub new {
34     my $class = shift;
35     my ($args) = @_;
36
37     my $self = bless {
38         long_size   => 4,
39         long_pack   => 'N',
40         data_size   => 4,
41         data_pack   => 'N',
42
43         digest      => \&Digest::MD5::md5,
44         hash_size   => 16,
45
46         ##
47         # Maximum number of buckets per list before another level of indexing is
48         # done. Increase this value for slightly greater speed, but larger database
49         # files. DO NOT decrease this value below 16, due to risk of recursive
50         # reindex overrun.
51         ##
52         max_buckets => 16,
53
54         fileobj => undef,
55         obj     => undef,
56     }, $class;
57
58     if ( defined $args->{pack_size} ) {
59         if ( lc $args->{pack_size} eq 'small' ) {
60             $args->{long_size} = 2;
61             $args->{long_pack} = 'n';
62         }
63         elsif ( lc $args->{pack_size} eq 'medium' ) {
64             $args->{long_size} = 4;
65             $args->{long_pack} = 'N';
66         }
67         elsif ( lc $args->{pack_size} eq 'large' ) {
68             $args->{long_size} = 8;
69             $args->{long_pack} = 'Q';
70         }
71         else {
72             die "Unknown pack_size value: '$args->{pack_size}'\n";
73         }
74     }
75
76     # Grab the parameters we want to use
77     foreach my $param ( keys %$self ) {
78         next unless exists $args->{$param};
79         $self->{$param} = $args->{$param};
80     }
81     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
82
83     if ( $self->{max_buckets} < 16 ) {
84         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
85         $self->{max_buckets} = 16;
86     }
87
88     return $self;
89 }
90
91 sub _fileobj { return $_[0]{fileobj} }
92 sub _fh      { return $_[0]->_fileobj->{fh} }
93
94 sub calculate_sizes {
95     my $self = shift;
96
97     #XXX Does this need to be updated with different hashing algorithms?
98     $self->{index_size}       = (2**8) * $self->{long_size};
99     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
100     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
101
102     return;
103 }
104
105 sub write_file_header {
106     my $self = shift;
107
108     local($/,$\);
109
110     my $fh = $self->_fh;
111
112     my $loc = $self->_request_space( length( SIG_FILE ) + 21 );
113     seek($fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET);
114     print( $fh
115         SIG_FILE,
116         SIG_HEADER,
117         pack('N', 1),  # header version
118         pack('N', 12), # header size
119         pack('N', 0),  # currently running transaction IDs
120         pack('n', $self->{long_size}),
121         pack('A', $self->{long_pack}),
122         pack('n', $self->{data_size}),
123         pack('A', $self->{data_pack}),
124         pack('n', $self->{max_buckets}),
125     );
126
127     $self->_fileobj->set_transaction_offset( 13 );
128
129     return;
130 }
131
132 sub read_file_header {
133     my $self = shift;
134
135     local($/,$\);
136
137     my $fh = $self->_fh;
138
139     seek($fh, 0 + $self->_fileobj->{file_offset}, SEEK_SET);
140     my $buffer;
141     my $bytes_read = read( $fh, $buffer, length(SIG_FILE) + 9 );
142
143     return unless $bytes_read;
144
145     my ($file_signature, $sig_header, $header_version, $size) = unpack(
146         'A4 A N N', $buffer
147     );
148
149     unless ( $file_signature eq SIG_FILE ) {
150         $self->_fileobj->close;
151         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
152     }
153
154     unless ( $sig_header eq SIG_HEADER ) {
155         $self->_fileobj->close;
156         $self->_throw_error( "Old file version found." );
157     }
158
159     my $buffer2;
160     $bytes_read += read( $fh, $buffer2, $size );
161     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
162
163     $self->_fileobj->set_transaction_offset( 13 );
164
165     if ( @values < 5 || grep { !defined } @values ) {
166         $self->_fileobj->close;
167         $self->_throw_error("Corrupted file - bad header");
168     }
169
170     #XXX Add warnings if values weren't set right
171     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
172
173     return $bytes_read;
174 }
175
176 sub setup_fh {
177     my $self = shift;
178     my ($obj) = @_;
179
180     local($/,$\);
181
182     my $fh = $self->_fh;
183     flock $fh, LOCK_EX;
184
185     #XXX The duplication of calculate_sizes needs to go away
186     unless ( $obj->{base_offset} ) {
187         my $bytes_read = $self->read_file_header;
188
189         $self->calculate_sizes;
190
191         ##
192         # File is empty -- write header and master index
193         ##
194         if (!$bytes_read) {
195             if ( my $afh = $self->_fileobj->{audit_fh} ) {
196                 flock( $afh, LOCK_EX );
197                 print( $afh "# Database created on " . localtime(time) . "\n" );
198                 flock( $afh, LOCK_UN );
199             }
200
201             $self->write_file_header;
202
203             $obj->{base_offset} = $self->_request_space( $self->tag_size( $self->{index_size} ) );
204
205             $self->write_tag(
206                 $obj->_base_offset, $obj->_type,
207                 chr(0)x$self->{index_size},
208             );
209
210             # Flush the filehandle
211             my $old_fh = select $fh;
212             my $old_af = $|; $| = 1; $| = $old_af;
213             select $old_fh;
214         }
215         else {
216             $obj->{base_offset} = $bytes_read;
217
218             ##
219             # Get our type from master index header
220             ##
221             my $tag = $self->load_tag($obj->_base_offset);
222             unless ( $tag ) {
223                 flock $fh, LOCK_UN;
224                 $self->_throw_error("Corrupted file, no master index record");
225             }
226
227             unless ($obj->_type eq $tag->{signature}) {
228                 flock $fh, LOCK_UN;
229                 $self->_throw_error("File type mismatch");
230             }
231         }
232     }
233     else {
234         $self->calculate_sizes;
235     }
236
237     #XXX We have to make sure we don't mess up when autoflush isn't turned on
238     unless ( $self->_fileobj->{inode} ) {
239         my @stats = stat($fh);
240         $self->_fileobj->{inode} = $stats[1];
241         $self->_fileobj->{end} = $stats[7];
242     }
243
244     flock $fh, LOCK_UN;
245
246     return 1;
247 }
248
249 sub tag_size {
250     my $self = shift;
251     my ($size) = @_;
252     return SIG_SIZE + $self->{data_size} + $size;
253 }
254
255 sub write_tag {
256     ##
257     # Given offset, signature and content, create tag and write to disk
258     ##
259     my $self = shift;
260     my ($offset, $sig, $content) = @_;
261     my $size = length( $content );
262
263     local($/,$\);
264
265     my $fh = $self->_fh;
266
267     if ( defined $offset ) {
268         seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
269     }
270
271     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
272
273     return unless defined $offset;
274
275     return {
276         signature => $sig,
277         size => $size,
278         offset => $offset + SIG_SIZE + $self->{data_size},
279         content => $content
280     };
281 }
282
283 sub load_tag {
284     ##
285     # Given offset, load single tag and return signature, size and data
286     ##
287     my $self = shift;
288     my ($offset) = @_;
289
290     local($/,$\);
291
292 #    print join(':',map{$_||''}caller(1)), $/;
293
294     my $fh = $self->_fh;
295
296     seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
297
298     #XXX I'm not sure this check will work if autoflush isn't enabled ...
299     return if eof $fh;
300
301     my $b;
302     read( $fh, $b, SIG_SIZE + $self->{data_size} );
303     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
304
305     my $buffer;
306     read( $fh, $buffer, $size);
307
308     return {
309         signature => $sig,
310         size => $size,
311         offset => $offset + SIG_SIZE + $self->{data_size},
312         content => $buffer
313     };
314 }
315
316 sub _get_dbm_object {
317     my $item = shift;
318
319     my $obj = eval {
320         local $SIG{__DIE__};
321         if ($item->isa( 'DBM::Deep' )) {
322             return $item;
323         }
324         return;
325     };
326     return $obj if $obj;
327
328     my $r = Scalar::Util::reftype( $item ) || '';
329     if ( $r eq 'HASH' ) {
330         my $obj = eval {
331             local $SIG{__DIE__};
332             my $obj = tied(%$item);
333             if ($obj->isa( 'DBM::Deep' )) {
334                 return $obj;
335             }
336             return;
337         };
338         return $obj if $obj;
339     }
340     elsif ( $r eq 'ARRAY' ) {
341         my $obj = eval {
342             local $SIG{__DIE__};
343             my $obj = tied(@$item);
344             if ($obj->isa( 'DBM::Deep' )) {
345                 return $obj;
346             }
347             return;
348         };
349         return $obj if $obj;
350     }
351
352     return;
353 }
354
355 sub _length_needed {
356     my $self = shift;
357     my ($value, $key) = @_;
358
359     my $is_dbm_deep = eval {
360         local $SIG{'__DIE__'};
361         $value->isa( 'DBM::Deep' );
362     };
363
364     my $len = SIG_SIZE + $self->{data_size}
365             + $self->{data_size} + length( $key );
366
367     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
368         return $len + $self->{long_size};
369     }
370
371     my $r = Scalar::Util::reftype( $value ) || '';
372     if ( $self->_fileobj->{autobless} ) {
373         # This is for the bit saying whether or not this thing is blessed.
374         $len += 1;
375     }
376
377     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
378         if ( defined $value ) {
379             $len += length( $value );
380         }
381         return $len;
382     }
383
384     $len += $self->{index_size};
385
386     # if autobless is enabled, must also take into consideration
387     # the class name as it is stored after the key.
388     if ( $self->_fileobj->{autobless} ) {
389         my $c = Scalar::Util::blessed($value);
390         if ( defined $c && !$is_dbm_deep ) {
391             $len += $self->{data_size} + length($c);
392         }
393     }
394
395     return $len;
396 }
397
398 sub add_bucket {
399     ##
400     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
401     # plain (undigested) key and value.
402     ##
403     my $self = shift;
404     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
405     $deleted ||= 0;
406
407     local($/,$\);
408
409     # This verifies that only supported values will be stored.
410     {
411         my $r = Scalar::Util::reftype( $value );
412         last if !defined $r;
413
414         last if $r eq 'HASH';
415         last if $r eq 'ARRAY';
416
417         $self->_throw_error(
418             "Storage of variables of type '$r' is not supported."
419         );
420     }
421
422     my $location = 0;
423     my $result = 2;
424
425     my $root = $self->_fileobj;
426     my $fh   = $self->_fh;
427
428     my $actual_length = $self->_length_needed( $value, $plain_key );
429
430     #ACID - This is a mutation. Must only find the exact transaction
431     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
432
433     my @transactions;
434     if ( $self->_fileobj->transaction_id == 0 ) {
435         @transactions = $self->_fileobj->current_transactions;
436     }
437
438 #    $self->_release_space( $size, $subloc );
439     # Updating a known md5
440 #XXX This needs updating to use _release_space
441     if ( $subloc ) {
442         $result = 1;
443
444         if ($actual_length <= $size) {
445             $location = $subloc;
446         }
447         else {
448             $location = $self->_request_space( $actual_length );
449             seek(
450                 $fh,
451                 $tag->{offset} + $offset
452               + $self->{hash_size} + $root->{file_offset},
453                 SEEK_SET,
454             );
455             print( $fh pack($self->{long_pack}, $location ) );
456             print( $fh pack($self->{long_pack}, $actual_length ) );
457             print( $fh pack('n n', $root->transaction_id, $deleted ) );
458         }
459     }
460     # Adding a new md5
461     elsif ( defined $offset ) {
462         $location = $self->_request_space( $actual_length );
463
464         seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
465         print( $fh $md5 . pack($self->{long_pack}, $location ) );
466         print( $fh pack($self->{long_pack}, $actual_length ) );
467         print( $fh pack('n n', $root->transaction_id, $deleted ) );
468
469         for ( @transactions ) {
470             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
471             $self->_fileobj->{transaction_id} = $_;
472             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
473             $self->_fileobj->{transaction_id} = 0;
474         }
475     }
476     # If bucket didn't fit into list, split into a new index level
477     # split_index() will do the _request_space() call
478     else {
479         $location = $self->split_index( $md5, $tag );
480     }
481
482     $self->write_value( $location, $plain_key, $value, $orig_key );
483
484     return $result;
485 }
486
487 sub write_value {
488     my $self = shift;
489     my ($location, $key, $value, $orig_key) = @_;
490
491     local($/,$\);
492
493     my $fh = $self->_fh;
494     my $root = $self->_fileobj;
495
496     my $dbm_deep_obj = _get_dbm_object( $value );
497     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $self->_fileobj ) {
498         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
499     }
500
501     seek($fh, $location + $root->{file_offset}, SEEK_SET);
502
503     ##
504     # Write signature based on content type, set content length and write
505     # actual value.
506     ##
507     my $r = Scalar::Util::reftype( $value ) || '';
508     if ( $dbm_deep_obj ) {
509         $self->write_tag( undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
510     }
511     elsif ($r eq 'HASH') {
512         if ( !$dbm_deep_obj && tied %{$value} ) {
513             $self->_throw_error( "Cannot store something that is tied" );
514         }
515         $self->write_tag( undef, SIG_HASH, chr(0)x$self->{index_size} );
516     }
517     elsif ($r eq 'ARRAY') {
518         if ( !$dbm_deep_obj && tied @{$value} ) {
519             $self->_throw_error( "Cannot store something that is tied" );
520         }
521         $self->write_tag( undef, SIG_ARRAY, chr(0)x$self->{index_size} );
522     }
523     elsif (!defined($value)) {
524         $self->write_tag( undef, SIG_NULL, '' );
525     }
526     else {
527         $self->write_tag( undef, SIG_DATA, $value );
528     }
529
530     ##
531     # Plain key is stored AFTER value, as keys are typically fetched less often.
532     ##
533     print( $fh pack($self->{data_pack}, length($key)) . $key );
534
535     # Internal references don't care about autobless
536     return 1 if $dbm_deep_obj;
537
538     ##
539     # If value is blessed, preserve class name
540     ##
541     if ( $root->{autobless} ) {
542         my $c = Scalar::Util::blessed($value);
543         if ( defined $c && !$dbm_deep_obj ) {
544             print( $fh chr(1) );
545             print( $fh pack($self->{data_pack}, length($c)) . $c );
546         }
547         else {
548             print( $fh chr(0) );
549         }
550     }
551
552     ##
553     # Tie the passed in reference so that changes to it are reflected in the
554     # datafile. The use of $location as the base_offset will act as the
555     # the linkage between parent and child.
556     #
557     # The overall assignment is a hack around the fact that just tying doesn't
558     # store the values. This may not be the wrong thing to do.
559     ##
560     if ($r eq 'HASH') {
561         my %x = %$value;
562         tie %$value, 'DBM::Deep', {
563             base_offset => $location,
564             fileobj     => $root,
565             parent      => $self->{obj},
566             parent_key  => $orig_key,
567         };
568         %$value = %x;
569     }
570     elsif ($r eq 'ARRAY') {
571         my @x = @$value;
572         tie @$value, 'DBM::Deep', {
573             base_offset => $location,
574             fileobj     => $root,
575             parent      => $self->{obj},
576             parent_key  => $orig_key,
577         };
578         @$value = @x;
579     }
580
581     return 1;
582 }
583
584 sub split_index {
585     my $self = shift;
586     my ($md5, $tag) = @_;
587
588     local($/,$\);
589
590     my $fh = $self->_fh;
591     my $root = $self->_fileobj;
592
593     my $loc = $self->_request_space(
594         $self->tag_size( $self->{index_size} ),
595     );
596
597     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
598     print( $fh pack($self->{long_pack}, $loc) );
599
600     my $index_tag = $self->write_tag(
601         $loc, SIG_INDEX,
602         chr(0)x$self->{index_size},
603     );
604
605     my $newtag_loc = $self->_request_space(
606         $self->tag_size( $self->{bucket_list_size} ),
607     );
608
609     my $keys = $tag->{content}
610              . $md5 . pack($self->{long_pack}, $newtag_loc)
611                     . pack($self->{long_pack}, 0)  # size
612                     . pack($self->{long_pack}, 0); # transaction ID
613
614     my @newloc = ();
615     BUCKET:
616     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
617         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
618
619         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
620         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
621
622         my $num = ord(substr($key, $tag->{ch} + 1, 1));
623
624         if ($newloc[$num]) {
625             seek($fh, $newloc[$num] + $root->{file_offset}, SEEK_SET);
626             my $subkeys;
627             read( $fh, $subkeys, $self->{bucket_list_size});
628
629             # This is looking for the first empty spot
630             my ($subloc, $offset, $size) = $self->_find_in_buckets(
631                 { content => $subkeys }, '',
632             );
633
634             seek($fh, $newloc[$num] + $offset + $root->{file_offset}, SEEK_SET);
635             print( $fh $key . pack($self->{long_pack}, $old_subloc) );
636
637             next;
638         }
639
640         seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
641
642         my $loc = $self->_request_space(
643             $self->tag_size( $self->{bucket_list_size} ),
644         );
645
646         print( $fh pack($self->{long_pack}, $loc) );
647
648         my $blist_tag = $self->write_tag(
649             $loc, SIG_BLIST,
650             chr(0)x$self->{bucket_list_size},
651         );
652
653         seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
654         print( $fh $key . pack($self->{long_pack}, $old_subloc) );
655
656         $newloc[$num] = $blist_tag->{offset};
657     }
658
659     $self->_release_space(
660         $self->tag_size( $self->{bucket_list_size} ),
661         $tag->{offset} - SIG_SIZE - $self->{data_size},
662     );
663
664     return $newtag_loc;
665 }
666
667 sub read_from_loc {
668     my $self = shift;
669     my ($subloc, $orig_key) = @_;
670
671     local($/,$\);
672
673     my $fh = $self->_fh;
674
675     ##
676     # Found match -- seek to offset and read signature
677     ##
678     my $signature;
679     seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
680     read( $fh, $signature, SIG_SIZE);
681
682     ##
683     # If value is a hash or array, return new DBM::Deep object with correct offset
684     ##
685     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
686         my $new_obj = DBM::Deep->new({
687             type        => $signature,
688             base_offset => $subloc,
689             fileobj     => $self->_fileobj,
690             parent      => $self->{obj},
691             parent_key  => $orig_key,
692         });
693
694         if ($new_obj->_fileobj->{autobless}) {
695             ##
696             # Skip over value and plain key to see if object needs
697             # to be re-blessed
698             ##
699             seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
700
701             my $size;
702             read( $fh, $size, $self->{data_size});
703             $size = unpack($self->{data_pack}, $size);
704             if ($size) { seek($fh, $size, SEEK_CUR); }
705
706             my $bless_bit;
707             read( $fh, $bless_bit, 1);
708             if (ord($bless_bit)) {
709                 ##
710                 # Yes, object needs to be re-blessed
711                 ##
712                 my $class_name;
713                 read( $fh, $size, $self->{data_size});
714                 $size = unpack($self->{data_pack}, $size);
715                 if ($size) { read( $fh, $class_name, $size); }
716                 if ($class_name) { $new_obj = bless( $new_obj, $class_name ); }
717             }
718         }
719
720         return $new_obj;
721     }
722     elsif ( $signature eq SIG_INTERNAL ) {
723         my $size;
724         read( $fh, $size, $self->{data_size});
725         $size = unpack($self->{data_pack}, $size);
726
727         if ( $size ) {
728             my $new_loc;
729             read( $fh, $new_loc, $size );
730             $new_loc = unpack( $self->{long_pack}, $new_loc );
731
732             return $self->read_from_loc( $new_loc, $orig_key );
733         }
734         else {
735             return;
736         }
737     }
738     ##
739     # Otherwise return actual value
740     ##
741     elsif ( $signature eq SIG_DATA ) {
742         my $size;
743         read( $fh, $size, $self->{data_size});
744         $size = unpack($self->{data_pack}, $size);
745
746         my $value = '';
747         if ($size) { read( $fh, $value, $size); }
748         return $value;
749     }
750
751     ##
752     # Key exists, but content is null
753     ##
754     return;
755 }
756
757 sub get_bucket_value {
758     ##
759     # Fetch single value given tag and MD5 digested key.
760     ##
761     my $self = shift;
762     my ($tag, $md5, $orig_key) = @_;
763
764     #ACID - This is a read. Can find exact or HEAD
765     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
766     if ( $subloc && !$is_deleted ) {
767         return $self->read_from_loc( $subloc, $orig_key );
768     }
769     return;
770 }
771
772 sub delete_bucket {
773     ##
774     # Delete single key/value pair given tag and MD5 digested key.
775     ##
776     my $self = shift;
777     my ($tag, $md5, $orig_key) = @_;
778
779     local($/,$\);
780
781     #ACID - This is a mutation. Must only find the exact transaction
782     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
783 #XXX This needs _release_space()
784     if ( $subloc ) {
785         my $fh = $self->_fh;
786         seek($fh, $tag->{offset} + $offset + $self->_fileobj->{file_offset}, SEEK_SET);
787         print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
788         print( $fh chr(0) x $self->{bucket_size} );
789
790         return 1;
791     }
792     return;
793 }
794
795 sub bucket_exists {
796     ##
797     # Check existence of single key given tag and MD5 digested key.
798     ##
799     my $self = shift;
800     my ($tag, $md5) = @_;
801
802     #ACID - This is a read. Can find exact or HEAD
803     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
804     return ($subloc && !$is_deleted) && 1;
805 }
806
807 sub find_bucket_list {
808     ##
809     # Locate offset for bucket list, given digested key
810     ##
811     my $self = shift;
812     my ($offset, $md5, $args) = @_;
813     $args = {} unless $args;
814
815     local($/,$\);
816
817     ##
818     # Locate offset for bucket list using digest index system
819     ##
820     my $tag = $self->load_tag( $offset )
821         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
822
823     my $ch = 0;
824     while ($tag->{signature} ne SIG_BLIST) {
825         my $num = ord substr($md5, $ch, 1);
826
827         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
828         $tag = $self->index_lookup( $tag, $num );
829
830         if (!$tag) {
831             return if !$args->{create};
832
833             my $loc = $self->_request_space(
834                 $self->tag_size( $self->{bucket_list_size} ),
835             );
836
837             my $fh = $self->_fh;
838             seek($fh, $ref_loc + $self->_fileobj->{file_offset}, SEEK_SET);
839             print( $fh pack($self->{long_pack}, $loc) );
840
841             $tag = $self->write_tag(
842                 $loc, SIG_BLIST,
843                 chr(0)x$self->{bucket_list_size},
844             );
845
846             $tag->{ref_loc} = $ref_loc;
847             $tag->{ch} = $ch;
848
849             last;
850         }
851
852         $tag->{ch} = $ch++;
853         $tag->{ref_loc} = $ref_loc;
854     }
855
856     return $tag;
857 }
858
859 sub index_lookup {
860     ##
861     # Given index tag, lookup single entry in index and return .
862     ##
863     my $self = shift;
864     my ($tag, $index) = @_;
865
866     my $location = unpack(
867         $self->{long_pack},
868         substr(
869             $tag->{content},
870             $index * $self->{long_size},
871             $self->{long_size},
872         ),
873     );
874
875     if (!$location) { return; }
876
877     return $self->load_tag( $location );
878 }
879
880 sub traverse_index {
881     ##
882     # Scan index and recursively step into deeper levels, looking for next key.
883     ##
884     my $self = shift;
885     my ($obj, $offset, $ch, $force_return_next) = @_;
886
887     local($/,$\);
888
889     my $tag = $self->load_tag( $offset );
890
891     my $fh = $self->_fh;
892
893     if ($tag->{signature} ne SIG_BLIST) {
894         my $content = $tag->{content};
895         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
896
897         for (my $idx = $start; $idx < (2**8); $idx++) {
898             my $subloc = unpack(
899                 $self->{long_pack},
900                 substr(
901                     $content,
902                     $idx * $self->{long_size},
903                     $self->{long_size},
904                 ),
905             );
906
907             if ($subloc) {
908                 my $result = $self->traverse_index(
909                     $obj, $subloc, $ch + 1, $force_return_next,
910                 );
911
912                 if (defined($result)) { return $result; }
913             }
914         } # index loop
915
916         $obj->{return_next} = 1;
917     } # tag is an index
918
919     else {
920         my $keys = $tag->{content};
921         if ($force_return_next) { $obj->{return_next} = 1; }
922
923         ##
924         # Iterate through buckets, looking for a key match
925         ##
926         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
927             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
928
929             # End of bucket list -- return to outer loop
930             if (!$subloc) {
931                 $obj->{return_next} = 1;
932                 last;
933             }
934             # Located previous key -- return next one found
935             elsif ($key eq $obj->{prev_md5}) {
936                 $obj->{return_next} = 1;
937                 next;
938             }
939             # Seek to bucket location and skip over signature
940             elsif ($obj->{return_next}) {
941                 seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
942
943                 # Skip over value to get to plain key
944                 my $sig;
945                 read( $fh, $sig, SIG_SIZE );
946
947                 my $size;
948                 read( $fh, $size, $self->{data_size});
949                 $size = unpack($self->{data_pack}, $size);
950                 if ($size) { seek($fh, $size, SEEK_CUR); }
951
952                 # Read in plain key and return as scalar
953                 my $plain_key;
954                 read( $fh, $size, $self->{data_size});
955                 $size = unpack($self->{data_pack}, $size);
956                 if ($size) { read( $fh, $plain_key, $size); }
957
958                 return $plain_key;
959             }
960         }
961
962         $obj->{return_next} = 1;
963     } # tag is a bucket list
964
965     return;
966 }
967
968 sub get_next_key {
969     ##
970     # Locate next key, given digested previous one
971     ##
972     my $self = shift;
973     my ($obj) = @_;
974
975     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
976     $obj->{return_next} = 0;
977
978     ##
979     # If the previous key was not specifed, start at the top and
980     # return the first one found.
981     ##
982     if (!$obj->{prev_md5}) {
983         $obj->{prev_md5} = chr(0) x $self->{hash_size};
984         $obj->{return_next} = 1;
985     }
986
987     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
988 }
989
990 # Utilities
991
992 sub _get_key_subloc {
993     my $self = shift;
994     my ($keys, $idx) = @_;
995
996     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
997         # This is 'a', not 'A'. Please read the pack() documentation for the
998         # difference between the two and why it's important.
999         "a$self->{hash_size} $self->{long_pack}2 n2",
1000         substr(
1001             $keys,
1002             ($idx * $self->{bucket_size}),
1003             $self->{bucket_size},
1004         ),
1005     );
1006
1007     return ($key, $subloc, $size, $transaction_id, $is_deleted);
1008 }
1009
1010 sub _find_in_buckets {
1011     my $self = shift;
1012     my ($tag, $md5, $exact) = @_;
1013
1014     my $trans_id = $self->_fileobj->transaction_id;
1015
1016     my @zero;
1017
1018     BUCKET:
1019     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1020         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
1021             $tag->{content}, $i,
1022         );
1023
1024         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
1025
1026         unless ( $subloc ) {
1027             if ( !$exact && @zero and $trans_id ) {
1028                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
1029             }
1030             return @rv;
1031         }
1032
1033         next BUCKET if $key ne $md5;
1034
1035         # Save off the HEAD in case we need it.
1036         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
1037
1038         next BUCKET if $transaction_id != $trans_id;
1039
1040         return @rv;
1041     }
1042
1043     return;
1044 }
1045
1046 sub _request_space {
1047     my $self = shift;
1048     my ($size) = @_;
1049
1050     my $loc = $self->_fileobj->{end};
1051     $self->_fileobj->{end} += $size;
1052
1053     return $loc;
1054 }
1055
1056 sub _release_space {
1057     my $self = shift;
1058     my ($size, $loc) = @_;
1059
1060     local($/,$\);
1061
1062     my $next_loc = 0;
1063
1064     my $fh = $self->_fh;
1065     seek( $fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET );
1066     print( $fh SIG_FREE
1067         . pack($self->{long_pack}, $size )
1068         . pack($self->{long_pack}, $next_loc )
1069     );
1070
1071     return;
1072 }
1073
1074 sub _throw_error {
1075     die "DBM::Deep: $_[1]\n";
1076 }
1077
1078 1;
1079 __END__
1080
1081 # This will be added in later, after more refactoring is done. This is an early
1082 # attempt at refactoring on the physical level instead of the virtual level.
1083 sub _read_at {
1084     my $self = shift;
1085     my ($spot, $amount, $unpack) = @_;
1086
1087     local($/,$\);
1088
1089     my $fh = $self->_fh;
1090     seek( $fh, $spot + $self->_fileobj->{file_offset}, SEEK_SET );
1091
1092     my $buffer;
1093     my $bytes_read = read( $fh, $buffer, $amount );
1094
1095     if ( $unpack ) {
1096         $buffer = unpack( $unpack, $buffer );
1097     }
1098
1099     if ( wantarray ) {
1100         return ($buffer, $bytes_read);
1101     }
1102     else {
1103         return $buffer;
1104     }
1105 }
1106
1107 sub _print_at {
1108     my $self = shift;
1109     my ($spot, $data) = @_;
1110
1111     local($/,$\);
1112
1113     my $fh = $self->_fh;
1114     seek( $fh, $spot, SEEK_SET );
1115     print( $fh $data );
1116
1117     return;
1118 }
1119
1120 sub get_file_version {
1121     my $self = shift;
1122
1123     local($/,$\);
1124
1125     my $fh = $self->_fh;
1126
1127     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1128     my $buffer;
1129     my $bytes_read = read( $fh, $buffer, 4 );
1130     unless ( $bytes_read == 4 ) {
1131         $self->_throw_error( "Cannot read file version" );
1132     }
1133
1134     return unpack( 'N', $buffer );
1135 }
1136
1137 sub write_file_version {
1138     my $self = shift;
1139     my ($new_version) = @_;
1140
1141     local($/,$\);
1142
1143     my $fh = $self->_fh;
1144
1145     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1146     print( $fh pack( 'N', $new_version ) );
1147
1148     return;
1149 }
1150