Adding transactions further - settled on MVCC
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock :seek );
9
10 # File-wide notes:
11 # * All the local($/,$\); are to protect read() and print() from -l.
12
13 ##
14 # Setup file and tag signatures.  These should never change.
15 ##
16 sub SIG_FILE     () { 'DPDB' }
17 sub SIG_HEADER   () { 'h'    }
18 sub SIG_INTERNAL () { 'i'    }
19 sub SIG_HASH     () { 'H'    }
20 sub SIG_ARRAY    () { 'A'    }
21 sub SIG_NULL     () { 'N'    }
22 sub SIG_DATA     () { 'D'    }
23 sub SIG_INDEX    () { 'I'    }
24 sub SIG_BLIST    () { 'B'    }
25 sub SIG_FREE     () { 'F'    }
26 sub SIG_SIZE     () {  1     }
27
28 sub new {
29     my $class = shift;
30     my ($args) = @_;
31
32     my $self = bless {
33         long_size   => 4,
34         long_pack   => 'N',
35         data_size   => 4,
36         data_pack   => 'N',
37
38         digest      => \&Digest::MD5::md5,
39         hash_size   => 16,
40
41         ##
42         # Maximum number of buckets per list before another level of indexing is
43         # done. Increase this value for slightly greater speed, but larger database
44         # files. DO NOT decrease this value below 16, due to risk of recursive
45         # reindex overrun.
46         ##
47         max_buckets => 16,
48
49         fileobj => undef,
50     }, $class;
51
52     if ( defined $args->{pack_size} ) {
53         if ( lc $args->{pack_size} eq 'small' ) {
54             $args->{long_size} = 2;
55             $args->{long_pack} = 'S';
56         }
57         elsif ( lc $args->{pack_size} eq 'medium' ) {
58             $args->{long_size} = 4;
59             $args->{long_pack} = 'N';
60         }
61         elsif ( lc $args->{pack_size} eq 'large' ) {
62             $args->{long_size} = 8;
63             $args->{long_pack} = 'Q';
64         }
65         else {
66             die "Unknown pack_size value: '$args->{pack_size}'\n";
67         }
68     }
69
70     # Grab the parameters we want to use
71     foreach my $param ( keys %$self ) {
72         next unless exists $args->{$param};
73         $self->{$param} = $args->{$param};
74     }
75
76     if ( $self->{max_buckets} < 16 ) {
77         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
78         $self->{max_buckets} = 16;
79     }
80
81     return $self;
82 }
83
84 sub _fileobj { return $_[0]{fileobj} }
85 sub _fh      { return $_[0]->_fileobj->{fh} }
86
87 sub calculate_sizes {
88     my $self = shift;
89
90     #XXX Does this need to be updated with different hashing algorithms?
91     $self->{index_size}       = (2**8) * $self->{long_size};
92     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
93     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
94
95     return;
96 }
97
98 sub write_file_header {
99     my $self = shift;
100
101     local($/,$\);
102
103     my $fh = $self->_fh;
104
105     my $loc = $self->_request_space( length( SIG_FILE ) + 21 );
106     seek($fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET);
107     print( $fh
108         SIG_FILE,
109         SIG_HEADER,
110         pack('N', 1),  # header version
111         pack('N', 12), # header size
112         pack('N', 0),  # currently running transaction IDs
113         pack('S', $self->{long_size}),
114         pack('A', $self->{long_pack}),
115         pack('S', $self->{data_size}),
116         pack('A', $self->{data_pack}),
117         pack('S', $self->{max_buckets}),
118     );
119
120     return;
121 }
122
123 sub read_file_header {
124     my $self = shift;
125
126     local($/,$\);
127
128     my $fh = $self->_fh;
129
130     seek($fh, 0 + $self->_fileobj->{file_offset}, SEEK_SET);
131     my $buffer;
132     my $bytes_read = read( $fh, $buffer, length(SIG_FILE) + 9 );
133
134     return unless $bytes_read;
135
136     my ($file_signature, $sig_header, $header_version, $size) = unpack(
137         'A4 A N N', $buffer
138     );
139
140     unless ( $file_signature eq SIG_FILE ) {
141         $self->_fileobj->close;
142         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
143     }
144
145     unless ( $sig_header eq SIG_HEADER ) {
146         $self->_fileobj->close;
147         $self->_throw_error( "Old file version found." );
148     }
149
150     my $buffer2;
151     $bytes_read += read( $fh, $buffer2, $size );
152     my ($running_transactions, @values) = unpack( 'N S A S A S', $buffer2 );
153
154     $self->_fileobj->set_transaction_offset( 13 );
155
156     if ( @values < 5 || grep { !defined } @values ) {
157         $self->_fileobj->close;
158         $self->_throw_error("Corrupted file - bad header");
159     }
160
161     #XXX Add warnings if values weren't set right
162     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
163
164     return $bytes_read;
165 }
166
167 sub setup_fh {
168     my $self = shift;
169     my ($obj) = @_;
170
171     my $fh = $self->_fh;
172     flock $fh, LOCK_EX;
173
174     #XXX The duplication of calculate_sizes needs to go away
175     unless ( $obj->{base_offset} ) {
176         my $bytes_read = $self->read_file_header;
177
178         $self->calculate_sizes;
179
180         ##
181         # File is empty -- write header and master index
182         ##
183         if (!$bytes_read) {
184             $self->write_file_header;
185
186             $obj->{base_offset} = $self->_request_space( $self->tag_size( $self->{index_size} ) );
187
188             $self->write_tag(
189                 $obj->_base_offset, $obj->_type,
190                 chr(0)x$self->{index_size},
191             );
192
193             # Flush the filehandle
194             my $old_fh = select $fh;
195             my $old_af = $|; $| = 1; $| = $old_af;
196             select $old_fh;
197         }
198         else {
199             $obj->{base_offset} = $bytes_read;
200
201             ##
202             # Get our type from master index header
203             ##
204             my $tag = $self->load_tag($obj->_base_offset)
205                 or $self->_throw_error("Corrupted file, no master index record");
206
207             unless ($obj->_type eq $tag->{signature}) {
208                 $self->_throw_error("File type mismatch");
209             }
210         }
211     }
212     else {
213         $self->calculate_sizes;
214     }
215
216     #XXX We have to make sure we don't mess up when autoflush isn't turned on
217     unless ( $self->_fileobj->{inode} ) {
218         my @stats = stat($fh);
219         $self->_fileobj->{inode} = $stats[1];
220         $self->_fileobj->{end} = $stats[7];
221     }
222
223     flock $fh, LOCK_UN;
224
225     return 1;
226 }
227
228 sub tag_size {
229     my $self = shift;
230     my ($size) = @_;
231     return SIG_SIZE + $self->{data_size} + $size;
232 }
233
234 sub write_tag {
235     ##
236     # Given offset, signature and content, create tag and write to disk
237     ##
238     my $self = shift;
239     my ($offset, $sig, $content) = @_;
240     my $size = length( $content );
241
242     local($/,$\);
243
244     my $fh = $self->_fh;
245
246     if ( defined $offset ) {
247         seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
248     }
249
250     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
251
252     return unless defined $offset;
253
254     return {
255         signature => $sig,
256         size => $size,
257         offset => $offset + SIG_SIZE + $self->{data_size},
258         content => $content
259     };
260 }
261
262 sub load_tag {
263     ##
264     # Given offset, load single tag and return signature, size and data
265     ##
266     my $self = shift;
267     my ($offset) = @_;
268
269     local($/,$\);
270
271 #    print join(':',map{$_||''}caller(1)), $/;
272
273     my $fh = $self->_fh;
274
275     seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
276
277     #XXX I'm not sure this check will work if autoflush isn't enabled ...
278     return if eof $fh;
279
280     my $b;
281     read( $fh, $b, SIG_SIZE + $self->{data_size} );
282     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
283
284     my $buffer;
285     read( $fh, $buffer, $size);
286
287     return {
288         signature => $sig,
289         size => $size,
290         offset => $offset + SIG_SIZE + $self->{data_size},
291         content => $buffer
292     };
293 }
294
295 sub _get_dbm_object {
296     my $item = shift;
297
298     my $obj = eval {
299         local $SIG{__DIE__};
300         if ($item->isa( 'DBM::Deep' )) {
301             return $item;
302         }
303         return;
304     };
305     return $obj if $obj;
306
307     my $r = Scalar::Util::reftype( $item ) || '';
308     if ( $r eq 'HASH' ) {
309         my $obj = eval {
310             local $SIG{__DIE__};
311             my $obj = tied(%$item);
312             if ($obj->isa( 'DBM::Deep' )) {
313                 return $obj;
314             }
315             return;
316         };
317         return $obj if $obj;
318     }
319     elsif ( $r eq 'ARRAY' ) {
320         my $obj = eval {
321             local $SIG{__DIE__};
322             my $obj = tied(@$item);
323             if ($obj->isa( 'DBM::Deep' )) {
324                 return $obj;
325             }
326             return;
327         };
328         return $obj if $obj;
329     }
330
331     return;
332 }
333
334 sub _length_needed {
335     my $self = shift;
336     my ($value, $key) = @_;
337
338     my $is_dbm_deep = eval {
339         local $SIG{'__DIE__'};
340         $value->isa( 'DBM::Deep' );
341     };
342
343     my $len = SIG_SIZE + $self->{data_size}
344             + $self->{data_size} + length( $key );
345
346     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
347         return $len + $self->{long_size};
348     }
349
350     my $r = Scalar::Util::reftype( $value ) || '';
351     if ( $self->_fileobj->{autobless} ) {
352         # This is for the bit saying whether or not this thing is blessed.
353         $len += 1;
354     }
355
356     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
357         if ( defined $value ) {
358             $len += length( $value );
359         }
360         return $len;
361     }
362
363     $len += $self->{index_size};
364
365     # if autobless is enabled, must also take into consideration
366     # the class name as it is stored after the key.
367     if ( $self->_fileobj->{autobless} ) {
368         my $c = Scalar::Util::blessed($value);
369         if ( defined $c && !$is_dbm_deep ) {
370             $len += $self->{data_size} + length($c);
371         }
372     }
373
374     return $len;
375 }
376
377 sub add_bucket {
378     ##
379     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
380     # plain (undigested) key and value.
381     ##
382     my $self = shift;
383     my ($tag, $md5, $plain_key, $value) = @_;
384
385     local($/,$\);
386
387     # This verifies that only supported values will be stored.
388     {
389         my $r = Scalar::Util::reftype( $value );
390         last if !defined $r;
391
392         last if $r eq 'HASH';
393         last if $r eq 'ARRAY';
394
395         $self->_throw_error(
396             "Storage of variables of type '$r' is not supported."
397         );
398     }
399
400     my $location = 0;
401     my $result = 2;
402
403     my $root = $self->_fileobj;
404     my $fh   = $self->_fh;
405
406     my $actual_length = $self->_length_needed( $value, $plain_key );
407
408     #ACID - This is a mutation. Must only find the exact transaction
409     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
410
411 #    $self->_release_space( $size, $subloc );
412     # Updating a known md5
413 #XXX This needs updating to use _release_space
414     if ( $subloc ) {
415         $result = 1;
416
417         if ($actual_length <= $size) {
418             $location = $subloc;
419         }
420         else {
421             $location = $self->_request_space( $actual_length );
422             seek(
423                 $fh,
424                 $tag->{offset} + $offset
425               + $self->{hash_size} + $root->{file_offset},
426                 SEEK_SET,
427             );
428             print( $fh pack($self->{long_pack}, $location ) );
429             print( $fh pack($self->{long_pack}, $actual_length ) );
430             print( $fh pack($self->{long_pack}, $root->transaction_id ) );
431         }
432     }
433     # Adding a new md5
434     elsif ( defined $offset ) {
435         $location = $self->_request_space( $actual_length );
436
437         seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
438         print( $fh $md5 . pack($self->{long_pack}, $location ) );
439         print( $fh pack($self->{long_pack}, $actual_length ) );
440         print( $fh pack($self->{long_pack}, $root->transaction_id ) );
441     }
442     # If bucket didn't fit into list, split into a new index level
443     # split_index() will do the _request_space() call
444     else {
445         $location = $self->split_index( $md5, $tag );
446     }
447
448     $self->write_value( $location, $plain_key, $value );
449
450     return $result;
451 }
452
453 sub write_value {
454     my $self = shift;
455     my ($location, $key, $value) = @_;
456
457     local($/,$\);
458
459     my $fh = $self->_fh;
460     my $root = $self->_fileobj;
461
462     my $dbm_deep_obj = _get_dbm_object( $value );
463     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $self->_fileobj ) {
464         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
465     }
466
467     seek($fh, $location + $root->{file_offset}, SEEK_SET);
468
469     ##
470     # Write signature based on content type, set content length and write
471     # actual value.
472     ##
473     my $r = Scalar::Util::reftype( $value ) || '';
474     if ( $dbm_deep_obj ) {
475         $self->write_tag( undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
476     }
477     elsif ($r eq 'HASH') {
478         if ( !$dbm_deep_obj && tied %{$value} ) {
479             $self->_throw_error( "Cannot store something that is tied" );
480         }
481         $self->write_tag( undef, SIG_HASH, chr(0)x$self->{index_size} );
482     }
483     elsif ($r eq 'ARRAY') {
484         if ( !$dbm_deep_obj && tied @{$value} ) {
485             $self->_throw_error( "Cannot store something that is tied" );
486         }
487         $self->write_tag( undef, SIG_ARRAY, chr(0)x$self->{index_size} );
488     }
489     elsif (!defined($value)) {
490         $self->write_tag( undef, SIG_NULL, '' );
491     }
492     else {
493         $self->write_tag( undef, SIG_DATA, $value );
494     }
495
496     ##
497     # Plain key is stored AFTER value, as keys are typically fetched less often.
498     ##
499     print( $fh pack($self->{data_pack}, length($key)) . $key );
500
501     # Internal references don't care about autobless
502     return 1 if $dbm_deep_obj;
503
504     ##
505     # If value is blessed, preserve class name
506     ##
507     if ( $root->{autobless} ) {
508         my $c = Scalar::Util::blessed($value);
509         if ( defined $c && !$dbm_deep_obj ) {
510             print( $fh chr(1) );
511             print( $fh pack($self->{data_pack}, length($c)) . $c );
512         }
513         else {
514             print( $fh chr(0) );
515         }
516     }
517
518     ##
519     # Tie the passed in reference so that changes to it are reflected in the
520     # datafile. The use of $location as the base_offset will act as the
521     # the linkage between parent and child.
522     #
523     # The overall assignment is a hack around the fact that just tying doesn't
524     # store the values. This may not be the wrong thing to do.
525     ##
526     if ($r eq 'HASH') {
527         my %x = %$value;
528         tie %$value, 'DBM::Deep', {
529             base_offset => $location,
530             fileobj     => $root,
531         };
532         %$value = %x;
533     }
534     elsif ($r eq 'ARRAY') {
535         my @x = @$value;
536         tie @$value, 'DBM::Deep', {
537             base_offset => $location,
538             fileobj     => $root,
539         };
540         @$value = @x;
541     }
542
543     return 1;
544 }
545
546 sub split_index {
547     my $self = shift;
548     my ($md5, $tag) = @_;
549
550     local($/,$\);
551
552     my $fh = $self->_fh;
553     my $root = $self->_fileobj;
554
555     my $loc = $self->_request_space(
556         $self->tag_size( $self->{index_size} ),
557     );
558
559     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
560     print( $fh pack($self->{long_pack}, $loc) );
561
562     my $index_tag = $self->write_tag(
563         $loc, SIG_INDEX,
564         chr(0)x$self->{index_size},
565     );
566
567     my $newtag_loc = $self->_request_space(
568         $self->tag_size( $self->{bucket_list_size} ),
569     );
570
571     my $keys = $tag->{content}
572              . $md5 . pack($self->{long_pack}, $newtag_loc)
573                     . pack($self->{long_pack}, 0)  # size
574                     . pack($self->{long_pack}, 0); # transaction #
575
576     my @newloc = ();
577     BUCKET:
578     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
579         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
580
581         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
582         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
583
584         my $num = ord(substr($key, $tag->{ch} + 1, 1));
585
586         if ($newloc[$num]) {
587             seek($fh, $newloc[$num] + $root->{file_offset}, SEEK_SET);
588             my $subkeys;
589             read( $fh, $subkeys, $self->{bucket_list_size});
590
591             # This is looking for the first empty spot
592             my ($subloc, $offset, $size) = $self->_find_in_buckets(
593                 { content => $subkeys }, '',
594             );
595
596             seek($fh, $newloc[$num] + $offset + $root->{file_offset}, SEEK_SET);
597             print( $fh $key . pack($self->{long_pack}, $old_subloc) );
598
599             next;
600         }
601
602         seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
603
604         my $loc = $self->_request_space(
605             $self->tag_size( $self->{bucket_list_size} ),
606         );
607
608         print( $fh pack($self->{long_pack}, $loc) );
609
610         my $blist_tag = $self->write_tag(
611             $loc, SIG_BLIST,
612             chr(0)x$self->{bucket_list_size},
613         );
614
615         seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
616         print( $fh $key . pack($self->{long_pack}, $old_subloc) );
617
618         $newloc[$num] = $blist_tag->{offset};
619     }
620
621     $self->_release_space(
622         $self->tag_size( $self->{bucket_list_size} ),
623         $tag->{offset} - SIG_SIZE - $self->{data_size},
624     );
625
626     return $newtag_loc;
627 }
628
629 sub read_from_loc {
630     my $self = shift;
631     my ($subloc) = @_;
632
633     local($/,$\);
634
635     my $fh = $self->_fh;
636
637     ##
638     # Found match -- seek to offset and read signature
639     ##
640     my $signature;
641     seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
642     read( $fh, $signature, SIG_SIZE);
643
644     ##
645     # If value is a hash or array, return new DBM::Deep object with correct offset
646     ##
647     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
648         my $new_obj = DBM::Deep->new({
649             type => $signature,
650             base_offset => $subloc,
651             fileobj     => $self->_fileobj,
652         });
653
654         if ($new_obj->_fileobj->{autobless}) {
655             ##
656             # Skip over value and plain key to see if object needs
657             # to be re-blessed
658             ##
659             seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
660
661             my $size;
662             read( $fh, $size, $self->{data_size});
663             $size = unpack($self->{data_pack}, $size);
664             if ($size) { seek($fh, $size, SEEK_CUR); }
665
666             my $bless_bit;
667             read( $fh, $bless_bit, 1);
668             if (ord($bless_bit)) {
669                 ##
670                 # Yes, object needs to be re-blessed
671                 ##
672                 my $class_name;
673                 read( $fh, $size, $self->{data_size});
674                 $size = unpack($self->{data_pack}, $size);
675                 if ($size) { read( $fh, $class_name, $size); }
676                 if ($class_name) { $new_obj = bless( $new_obj, $class_name ); }
677             }
678         }
679
680         return $new_obj;
681     }
682     elsif ( $signature eq SIG_INTERNAL ) {
683         my $size;
684         read( $fh, $size, $self->{data_size});
685         $size = unpack($self->{data_pack}, $size);
686
687         if ( $size ) {
688             my $new_loc;
689             read( $fh, $new_loc, $size );
690             $new_loc = unpack( $self->{long_pack}, $new_loc );
691
692             return $self->read_from_loc( $new_loc );
693         }
694         else {
695             return;
696         }
697     }
698     ##
699     # Otherwise return actual value
700     ##
701     elsif ( $signature eq SIG_DATA ) {
702         my $size;
703         read( $fh, $size, $self->{data_size});
704         $size = unpack($self->{data_pack}, $size);
705
706         my $value = '';
707         if ($size) { read( $fh, $value, $size); }
708         return $value;
709     }
710
711     ##
712     # Key exists, but content is null
713     ##
714     return;
715 }
716
717 sub get_bucket_value {
718     ##
719     # Fetch single value given tag and MD5 digested key.
720     ##
721     my $self = shift;
722     my ($tag, $md5) = @_;
723
724     #ACID - This is a read. Can find exact or HEAD
725     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
726     if ( $subloc ) {
727         return $self->read_from_loc( $subloc );
728     }
729     return;
730 }
731
732 sub delete_bucket {
733     ##
734     # Delete single key/value pair given tag and MD5 digested key.
735     ##
736     my $self = shift;
737     my ($tag, $md5) = @_;
738
739     local($/,$\);
740
741     #ACID - This is a mutation. Must only find the exact transaction
742     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
743 #XXX This needs _release_space()
744     if ( $subloc ) {
745         my $fh = $self->_fh;
746         seek($fh, $tag->{offset} + $offset + $self->_fileobj->{file_offset}, SEEK_SET);
747         print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
748         print( $fh chr(0) x $self->{bucket_size} );
749
750         return 1;
751     }
752     return;
753 }
754
755 sub bucket_exists {
756     ##
757     # Check existence of single key given tag and MD5 digested key.
758     ##
759     my $self = shift;
760     my ($tag, $md5) = @_;
761
762     #ACID - This is a read. Can find exact or HEAD
763     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
764     return $subloc && 1;
765 }
766
767 sub find_bucket_list {
768     ##
769     # Locate offset for bucket list, given digested key
770     ##
771     my $self = shift;
772     my ($offset, $md5, $args) = @_;
773     $args = {} unless $args;
774
775     local($/,$\);
776
777     ##
778     # Locate offset for bucket list using digest index system
779     ##
780     my $tag = $self->load_tag( $offset )
781         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
782
783     my $ch = 0;
784     while ($tag->{signature} ne SIG_BLIST) {
785         my $num = ord substr($md5, $ch, 1);
786
787         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
788         $tag = $self->index_lookup( $tag, $num );
789
790         if (!$tag) {
791             return if !$args->{create};
792
793             my $loc = $self->_request_space(
794                 $self->tag_size( $self->{bucket_list_size} ),
795             );
796
797             my $fh = $self->_fh;
798             seek($fh, $ref_loc + $self->_fileobj->{file_offset}, SEEK_SET);
799             print( $fh pack($self->{long_pack}, $loc) );
800
801             $tag = $self->write_tag(
802                 $loc, SIG_BLIST,
803                 chr(0)x$self->{bucket_list_size},
804             );
805
806             $tag->{ref_loc} = $ref_loc;
807             $tag->{ch} = $ch;
808
809             last;
810         }
811
812         $tag->{ch} = $ch++;
813         $tag->{ref_loc} = $ref_loc;
814     }
815
816     return $tag;
817 }
818
819 sub index_lookup {
820     ##
821     # Given index tag, lookup single entry in index and return .
822     ##
823     my $self = shift;
824     my ($tag, $index) = @_;
825
826     my $location = unpack(
827         $self->{long_pack},
828         substr(
829             $tag->{content},
830             $index * $self->{long_size},
831             $self->{long_size},
832         ),
833     );
834
835     if (!$location) { return; }
836
837     return $self->load_tag( $location );
838 }
839
840 sub traverse_index {
841     ##
842     # Scan index and recursively step into deeper levels, looking for next key.
843     ##
844     my $self = shift;
845     my ($obj, $offset, $ch, $force_return_next) = @_;
846
847     local($/,$\);
848
849     my $tag = $self->load_tag( $offset );
850
851     my $fh = $self->_fh;
852
853     if ($tag->{signature} ne SIG_BLIST) {
854         my $content = $tag->{content};
855         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
856
857         for (my $idx = $start; $idx < (2**8); $idx++) {
858             my $subloc = unpack(
859                 $self->{long_pack},
860                 substr(
861                     $content,
862                     $idx * $self->{long_size},
863                     $self->{long_size},
864                 ),
865             );
866
867             if ($subloc) {
868                 my $result = $self->traverse_index(
869                     $obj, $subloc, $ch + 1, $force_return_next,
870                 );
871
872                 if (defined($result)) { return $result; }
873             }
874         } # index loop
875
876         $obj->{return_next} = 1;
877     } # tag is an index
878
879     else {
880         my $keys = $tag->{content};
881         if ($force_return_next) { $obj->{return_next} = 1; }
882
883         ##
884         # Iterate through buckets, looking for a key match
885         ##
886         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
887             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
888
889             # End of bucket list -- return to outer loop
890             if (!$subloc) {
891                 $obj->{return_next} = 1;
892                 last;
893             }
894             # Located previous key -- return next one found
895             elsif ($key eq $obj->{prev_md5}) {
896                 $obj->{return_next} = 1;
897                 next;
898             }
899             # Seek to bucket location and skip over signature
900             elsif ($obj->{return_next}) {
901                 seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
902
903                 # Skip over value to get to plain key
904                 my $sig;
905                 read( $fh, $sig, SIG_SIZE );
906
907                 my $size;
908                 read( $fh, $size, $self->{data_size});
909                 $size = unpack($self->{data_pack}, $size);
910                 if ($size) { seek($fh, $size, SEEK_CUR); }
911
912                 # Read in plain key and return as scalar
913                 my $plain_key;
914                 read( $fh, $size, $self->{data_size});
915                 $size = unpack($self->{data_pack}, $size);
916                 if ($size) { read( $fh, $plain_key, $size); }
917
918                 return $plain_key;
919             }
920         }
921
922         $obj->{return_next} = 1;
923     } # tag is a bucket list
924
925     return;
926 }
927
928 sub get_next_key {
929     ##
930     # Locate next key, given digested previous one
931     ##
932     my $self = shift;
933     my ($obj) = @_;
934
935     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
936     $obj->{return_next} = 0;
937
938     ##
939     # If the previous key was not specifed, start at the top and
940     # return the first one found.
941     ##
942     if (!$obj->{prev_md5}) {
943         $obj->{prev_md5} = chr(0) x $self->{hash_size};
944         $obj->{return_next} = 1;
945     }
946
947     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
948 }
949
950 # Utilities
951
952 sub _get_key_subloc {
953     my $self = shift;
954     my ($keys, $idx) = @_;
955
956     my ($key, $subloc, $size, $transaction) = unpack(
957         # This is 'a', not 'A'. Please read the pack() documentation for the
958         # difference between the two and why it's important.
959         "a$self->{hash_size} $self->{long_pack} $self->{long_pack} $self->{long_pack}",
960         substr(
961             $keys,
962             ($idx * $self->{bucket_size}),
963             $self->{bucket_size},
964         ),
965     );
966
967     return ($key, $subloc, $size, $transaction);
968 }
969
970 sub _find_in_buckets {
971     my $self = shift;
972     my ($tag, $md5, $exact) = @_;
973
974     my $trans_id = $self->_fileobj->transaction_id;
975
976     my @zero;
977
978     BUCKET:
979     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
980         my ($key, $subloc, $size, $transaction_id) = $self->_get_key_subloc(
981             $tag->{content}, $i,
982         );
983
984         my @rv = ($subloc, $i * $self->{bucket_size}, $size);
985
986         unless ( $subloc ) {
987             return @zero if !$exact && @zero and $trans_id;
988             return @rv;
989         }
990
991         next BUCKET if $key ne $md5;
992
993         # Save off the HEAD in case we need it.
994         @zero = @rv if $transaction_id == 0;
995
996         next BUCKET if $transaction_id != $trans_id;
997
998         return @rv;
999     }
1000
1001     return;
1002 }
1003
1004 sub _request_space {
1005     my $self = shift;
1006     my ($size) = @_;
1007
1008     my $loc = $self->_fileobj->{end};
1009     $self->_fileobj->{end} += $size;
1010
1011     return $loc;
1012 }
1013
1014 sub _release_space {
1015     my $self = shift;
1016     my ($size, $loc) = @_;
1017
1018     local($/,$\);
1019
1020     my $next_loc = 0;
1021
1022     my $fh = $self->_fh;
1023     seek( $fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET );
1024     print( $fh SIG_FREE
1025         . pack($self->{long_pack}, $size )
1026         . pack($self->{long_pack}, $next_loc )
1027     );
1028
1029     return;
1030 }
1031
1032 sub _throw_error {
1033     die "DBM::Deep: $_[1]\n";
1034 }
1035
1036 1;
1037 __END__
1038
1039 # This will be added in later, after more refactoring is done. This is an early
1040 # attempt at refactoring on the physical level instead of the virtual level.
1041 sub _read_at {
1042     my $self = shift;
1043     my ($spot, $amount, $unpack) = @_;
1044
1045     local($/,$\);
1046
1047     my $fh = $self->_fh;
1048     seek( $fh, $spot + $self->_fileobj->{file_offset}, SEEK_SET );
1049
1050     my $buffer;
1051     my $bytes_read = read( $fh, $buffer, $amount );
1052
1053     if ( $unpack ) {
1054         $buffer = unpack( $unpack, $buffer );
1055     }
1056
1057     if ( wantarray ) {
1058         return ($buffer, $bytes_read);
1059     }
1060     else {
1061         return $buffer;
1062     }
1063 }
1064
1065 sub _print_at {
1066     my $self = shift;
1067     my ($spot, $data) = @_;
1068
1069     local($/,$\);
1070
1071     my $fh = $self->_fh;
1072     seek( $fh, $spot, SEEK_SET );
1073     print( $fh $data );
1074
1075     return;
1076 }
1077
1078 sub get_file_version {
1079     my $self = shift;
1080
1081     local($/,$\);
1082
1083     my $fh = $self->_fh;
1084
1085     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1086     my $buffer;
1087     my $bytes_read = read( $fh, $buffer, 4 );
1088     unless ( $bytes_read == 4 ) {
1089         $self->_throw_error( "Cannot read file version" );
1090     }
1091
1092     return unpack( 'N', $buffer );
1093 }
1094
1095 sub write_file_version {
1096     my $self = shift;
1097     my ($new_version) = @_;
1098
1099     local($/,$\);
1100
1101     my $fh = $self->_fh;
1102
1103     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1104     print( $fh pack( 'N', $new_version ) );
1105
1106     return;
1107 }
1108