Added ability for ::File to read and write transaction ID
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock :seek );
9
10 # File-wide notes:
11 # * All the local($/,$\); are to protect read() and print() from -l.
12 # * To add to bucket_size, make sure you modify the following:
13 #   - calculate_sizes()
14 #   - _get_key_subloc()
15 #   - add_bucket() - where the buckets are printed
16
17 ##
18 # Setup file and tag signatures.  These should never change.
19 ##
20 sub SIG_FILE     () { 'DPDB' }
21 sub SIG_HEADER   () { 'h'    }
22 sub SIG_INTERNAL () { 'i'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 sub new {
33     my $class = shift;
34     my ($args) = @_;
35
36     my $self = bless {
37         long_size   => 4,
38         long_pack   => 'N',
39         data_size   => 4,
40         data_pack   => 'N',
41
42         digest      => \&Digest::MD5::md5,
43         hash_size   => 16,
44
45         ##
46         # Maximum number of buckets per list before another level of indexing is
47         # done. Increase this value for slightly greater speed, but larger database
48         # files. DO NOT decrease this value below 16, due to risk of recursive
49         # reindex overrun.
50         ##
51         max_buckets => 16,
52
53         fileobj => undef,
54     }, $class;
55
56     if ( defined $args->{pack_size} ) {
57         if ( lc $args->{pack_size} eq 'small' ) {
58             $args->{long_size} = 2;
59             $args->{long_pack} = 'S';
60         }
61         elsif ( lc $args->{pack_size} eq 'medium' ) {
62             $args->{long_size} = 4;
63             $args->{long_pack} = 'N';
64         }
65         elsif ( lc $args->{pack_size} eq 'large' ) {
66             $args->{long_size} = 8;
67             $args->{long_pack} = 'Q';
68         }
69         else {
70             die "Unknown pack_size value: '$args->{pack_size}'\n";
71         }
72     }
73
74     # Grab the parameters we want to use
75     foreach my $param ( keys %$self ) {
76         next unless exists $args->{$param};
77         $self->{$param} = $args->{$param};
78     }
79
80     if ( $self->{max_buckets} < 16 ) {
81         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
82         $self->{max_buckets} = 16;
83     }
84
85     return $self;
86 }
87
88 sub _fileobj { return $_[0]{fileobj} }
89 sub _fh      { return $_[0]->_fileobj->{fh} }
90
91 sub calculate_sizes {
92     my $self = shift;
93
94     #XXX Does this need to be updated with different hashing algorithms?
95     $self->{index_size}       = (2**8) * $self->{long_size};
96     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
97     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
98
99     return;
100 }
101
102 sub write_file_header {
103     my $self = shift;
104
105     local($/,$\);
106
107     my $fh = $self->_fh;
108
109     my $loc = $self->_request_space( length( SIG_FILE ) + 21 );
110     seek($fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET);
111     print( $fh
112         SIG_FILE,
113         SIG_HEADER,
114         pack('N', 1),  # header version
115         pack('N', 12), # header size
116         pack('N', 0),  # currently running transaction IDs
117         pack('S', $self->{long_size}),
118         pack('A', $self->{long_pack}),
119         pack('S', $self->{data_size}),
120         pack('A', $self->{data_pack}),
121         pack('S', $self->{max_buckets}),
122     );
123
124     $self->_fileobj->set_transaction_offset( 13 );
125
126     return;
127 }
128
129 sub read_file_header {
130     my $self = shift;
131
132     local($/,$\);
133
134     my $fh = $self->_fh;
135
136     seek($fh, 0 + $self->_fileobj->{file_offset}, SEEK_SET);
137     my $buffer;
138     my $bytes_read = read( $fh, $buffer, length(SIG_FILE) + 9 );
139
140     return unless $bytes_read;
141
142     my ($file_signature, $sig_header, $header_version, $size) = unpack(
143         'A4 A N N', $buffer
144     );
145
146     unless ( $file_signature eq SIG_FILE ) {
147         $self->_fileobj->close;
148         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
149     }
150
151     unless ( $sig_header eq SIG_HEADER ) {
152         $self->_fileobj->close;
153         $self->_throw_error( "Old file version found." );
154     }
155
156     my $buffer2;
157     $bytes_read += read( $fh, $buffer2, $size );
158     my ($running_transactions, @values) = unpack( 'N S A S A S', $buffer2 );
159
160     $self->_fileobj->set_transaction_offset( 13 );
161
162     if ( @values < 5 || grep { !defined } @values ) {
163         $self->_fileobj->close;
164         $self->_throw_error("Corrupted file - bad header");
165     }
166
167     #XXX Add warnings if values weren't set right
168     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
169
170     return $bytes_read;
171 }
172
173 sub setup_fh {
174     my $self = shift;
175     my ($obj) = @_;
176
177     my $fh = $self->_fh;
178     flock $fh, LOCK_EX;
179
180     #XXX The duplication of calculate_sizes needs to go away
181     unless ( $obj->{base_offset} ) {
182         my $bytes_read = $self->read_file_header;
183
184         $self->calculate_sizes;
185
186         ##
187         # File is empty -- write header and master index
188         ##
189         if (!$bytes_read) {
190             $self->write_file_header;
191
192             $obj->{base_offset} = $self->_request_space( $self->tag_size( $self->{index_size} ) );
193
194             $self->write_tag(
195                 $obj->_base_offset, $obj->_type,
196                 chr(0)x$self->{index_size},
197             );
198
199             # Flush the filehandle
200             my $old_fh = select $fh;
201             my $old_af = $|; $| = 1; $| = $old_af;
202             select $old_fh;
203         }
204         else {
205             $obj->{base_offset} = $bytes_read;
206
207             ##
208             # Get our type from master index header
209             ##
210             my $tag = $self->load_tag($obj->_base_offset)
211                 or $self->_throw_error("Corrupted file, no master index record");
212
213             unless ($obj->_type eq $tag->{signature}) {
214                 $self->_throw_error("File type mismatch");
215             }
216         }
217     }
218     else {
219         $self->calculate_sizes;
220     }
221
222     #XXX We have to make sure we don't mess up when autoflush isn't turned on
223     unless ( $self->_fileobj->{inode} ) {
224         my @stats = stat($fh);
225         $self->_fileobj->{inode} = $stats[1];
226         $self->_fileobj->{end} = $stats[7];
227     }
228
229     flock $fh, LOCK_UN;
230
231     return 1;
232 }
233
234 sub tag_size {
235     my $self = shift;
236     my ($size) = @_;
237     return SIG_SIZE + $self->{data_size} + $size;
238 }
239
240 sub write_tag {
241     ##
242     # Given offset, signature and content, create tag and write to disk
243     ##
244     my $self = shift;
245     my ($offset, $sig, $content) = @_;
246     my $size = length( $content );
247
248     local($/,$\);
249
250     my $fh = $self->_fh;
251
252     if ( defined $offset ) {
253         seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
254     }
255
256     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
257
258     return unless defined $offset;
259
260     return {
261         signature => $sig,
262         size => $size,
263         offset => $offset + SIG_SIZE + $self->{data_size},
264         content => $content
265     };
266 }
267
268 sub load_tag {
269     ##
270     # Given offset, load single tag and return signature, size and data
271     ##
272     my $self = shift;
273     my ($offset) = @_;
274
275     local($/,$\);
276
277 #    print join(':',map{$_||''}caller(1)), $/;
278
279     my $fh = $self->_fh;
280
281     seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
282
283     #XXX I'm not sure this check will work if autoflush isn't enabled ...
284     return if eof $fh;
285
286     my $b;
287     read( $fh, $b, SIG_SIZE + $self->{data_size} );
288     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
289
290     my $buffer;
291     read( $fh, $buffer, $size);
292
293     return {
294         signature => $sig,
295         size => $size,
296         offset => $offset + SIG_SIZE + $self->{data_size},
297         content => $buffer
298     };
299 }
300
301 sub _get_dbm_object {
302     my $item = shift;
303
304     my $obj = eval {
305         local $SIG{__DIE__};
306         if ($item->isa( 'DBM::Deep' )) {
307             return $item;
308         }
309         return;
310     };
311     return $obj if $obj;
312
313     my $r = Scalar::Util::reftype( $item ) || '';
314     if ( $r eq 'HASH' ) {
315         my $obj = eval {
316             local $SIG{__DIE__};
317             my $obj = tied(%$item);
318             if ($obj->isa( 'DBM::Deep' )) {
319                 return $obj;
320             }
321             return;
322         };
323         return $obj if $obj;
324     }
325     elsif ( $r eq 'ARRAY' ) {
326         my $obj = eval {
327             local $SIG{__DIE__};
328             my $obj = tied(@$item);
329             if ($obj->isa( 'DBM::Deep' )) {
330                 return $obj;
331             }
332             return;
333         };
334         return $obj if $obj;
335     }
336
337     return;
338 }
339
340 sub _length_needed {
341     my $self = shift;
342     my ($value, $key) = @_;
343
344     my $is_dbm_deep = eval {
345         local $SIG{'__DIE__'};
346         $value->isa( 'DBM::Deep' );
347     };
348
349     my $len = SIG_SIZE + $self->{data_size}
350             + $self->{data_size} + length( $key );
351
352     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
353         return $len + $self->{long_size};
354     }
355
356     my $r = Scalar::Util::reftype( $value ) || '';
357     if ( $self->_fileobj->{autobless} ) {
358         # This is for the bit saying whether or not this thing is blessed.
359         $len += 1;
360     }
361
362     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
363         if ( defined $value ) {
364             $len += length( $value );
365         }
366         return $len;
367     }
368
369     $len += $self->{index_size};
370
371     # if autobless is enabled, must also take into consideration
372     # the class name as it is stored after the key.
373     if ( $self->_fileobj->{autobless} ) {
374         my $c = Scalar::Util::blessed($value);
375         if ( defined $c && !$is_dbm_deep ) {
376             $len += $self->{data_size} + length($c);
377         }
378     }
379
380     return $len;
381 }
382
383 sub add_bucket {
384     ##
385     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
386     # plain (undigested) key and value.
387     ##
388     my $self = shift;
389     my ($tag, $md5, $plain_key, $value) = @_;
390
391     local($/,$\);
392
393     # This verifies that only supported values will be stored.
394     {
395         my $r = Scalar::Util::reftype( $value );
396         last if !defined $r;
397
398         last if $r eq 'HASH';
399         last if $r eq 'ARRAY';
400
401         $self->_throw_error(
402             "Storage of variables of type '$r' is not supported."
403         );
404     }
405
406     my $location = 0;
407     my $result = 2;
408
409     my $root = $self->_fileobj;
410     my $fh   = $self->_fh;
411
412     my $actual_length = $self->_length_needed( $value, $plain_key );
413
414     #ACID - This is a mutation. Must only find the exact transaction
415     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
416
417 #    $self->_release_space( $size, $subloc );
418     # Updating a known md5
419 #XXX This needs updating to use _release_space
420     if ( $subloc ) {
421         $result = 1;
422
423         if ($actual_length <= $size) {
424             $location = $subloc;
425         }
426         else {
427             $location = $self->_request_space( $actual_length );
428             seek(
429                 $fh,
430                 $tag->{offset} + $offset
431               + $self->{hash_size} + $root->{file_offset},
432                 SEEK_SET,
433             );
434             print( $fh pack($self->{long_pack}, $location ) );
435             print( $fh pack($self->{long_pack}, $actual_length ) );
436             print( $fh pack($self->{long_pack}, $root->transaction_id ) );
437         }
438     }
439     # Adding a new md5
440     elsif ( defined $offset ) {
441         $location = $self->_request_space( $actual_length );
442
443         seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
444         print( $fh $md5 . pack($self->{long_pack}, $location ) );
445         print( $fh pack($self->{long_pack}, $actual_length ) );
446         print( $fh pack($self->{long_pack}, $root->transaction_id ) );
447     }
448     # If bucket didn't fit into list, split into a new index level
449     # split_index() will do the _request_space() call
450     else {
451         $location = $self->split_index( $md5, $tag );
452     }
453
454     $self->write_value( $location, $plain_key, $value );
455
456     return $result;
457 }
458
459 sub write_value {
460     my $self = shift;
461     my ($location, $key, $value) = @_;
462
463     local($/,$\);
464
465     my $fh = $self->_fh;
466     my $root = $self->_fileobj;
467
468     my $dbm_deep_obj = _get_dbm_object( $value );
469     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $self->_fileobj ) {
470         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
471     }
472
473     seek($fh, $location + $root->{file_offset}, SEEK_SET);
474
475     ##
476     # Write signature based on content type, set content length and write
477     # actual value.
478     ##
479     my $r = Scalar::Util::reftype( $value ) || '';
480     if ( $dbm_deep_obj ) {
481         $self->write_tag( undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
482     }
483     elsif ($r eq 'HASH') {
484         if ( !$dbm_deep_obj && tied %{$value} ) {
485             $self->_throw_error( "Cannot store something that is tied" );
486         }
487         $self->write_tag( undef, SIG_HASH, chr(0)x$self->{index_size} );
488     }
489     elsif ($r eq 'ARRAY') {
490         if ( !$dbm_deep_obj && tied @{$value} ) {
491             $self->_throw_error( "Cannot store something that is tied" );
492         }
493         $self->write_tag( undef, SIG_ARRAY, chr(0)x$self->{index_size} );
494     }
495     elsif (!defined($value)) {
496         $self->write_tag( undef, SIG_NULL, '' );
497     }
498     else {
499         $self->write_tag( undef, SIG_DATA, $value );
500     }
501
502     ##
503     # Plain key is stored AFTER value, as keys are typically fetched less often.
504     ##
505     print( $fh pack($self->{data_pack}, length($key)) . $key );
506
507     # Internal references don't care about autobless
508     return 1 if $dbm_deep_obj;
509
510     ##
511     # If value is blessed, preserve class name
512     ##
513     if ( $root->{autobless} ) {
514         my $c = Scalar::Util::blessed($value);
515         if ( defined $c && !$dbm_deep_obj ) {
516             print( $fh chr(1) );
517             print( $fh pack($self->{data_pack}, length($c)) . $c );
518         }
519         else {
520             print( $fh chr(0) );
521         }
522     }
523
524     ##
525     # Tie the passed in reference so that changes to it are reflected in the
526     # datafile. The use of $location as the base_offset will act as the
527     # the linkage between parent and child.
528     #
529     # The overall assignment is a hack around the fact that just tying doesn't
530     # store the values. This may not be the wrong thing to do.
531     ##
532     if ($r eq 'HASH') {
533         my %x = %$value;
534         tie %$value, 'DBM::Deep', {
535             base_offset => $location,
536             fileobj     => $root,
537         };
538         %$value = %x;
539     }
540     elsif ($r eq 'ARRAY') {
541         my @x = @$value;
542         tie @$value, 'DBM::Deep', {
543             base_offset => $location,
544             fileobj     => $root,
545         };
546         @$value = @x;
547     }
548
549     return 1;
550 }
551
552 sub split_index {
553     my $self = shift;
554     my ($md5, $tag) = @_;
555
556     local($/,$\);
557
558     my $fh = $self->_fh;
559     my $root = $self->_fileobj;
560
561     my $loc = $self->_request_space(
562         $self->tag_size( $self->{index_size} ),
563     );
564
565     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
566     print( $fh pack($self->{long_pack}, $loc) );
567
568     my $index_tag = $self->write_tag(
569         $loc, SIG_INDEX,
570         chr(0)x$self->{index_size},
571     );
572
573     my $newtag_loc = $self->_request_space(
574         $self->tag_size( $self->{bucket_list_size} ),
575     );
576
577     my $keys = $tag->{content}
578              . $md5 . pack($self->{long_pack}, $newtag_loc)
579                     . pack($self->{long_pack}, 0)  # size
580                     . pack($self->{long_pack}, 0); # transaction ID
581
582     my @newloc = ();
583     BUCKET:
584     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
585         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
586
587         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
588         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
589
590         my $num = ord(substr($key, $tag->{ch} + 1, 1));
591
592         if ($newloc[$num]) {
593             seek($fh, $newloc[$num] + $root->{file_offset}, SEEK_SET);
594             my $subkeys;
595             read( $fh, $subkeys, $self->{bucket_list_size});
596
597             # This is looking for the first empty spot
598             my ($subloc, $offset, $size) = $self->_find_in_buckets(
599                 { content => $subkeys }, '',
600             );
601
602             seek($fh, $newloc[$num] + $offset + $root->{file_offset}, SEEK_SET);
603             print( $fh $key . pack($self->{long_pack}, $old_subloc) );
604
605             next;
606         }
607
608         seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
609
610         my $loc = $self->_request_space(
611             $self->tag_size( $self->{bucket_list_size} ),
612         );
613
614         print( $fh pack($self->{long_pack}, $loc) );
615
616         my $blist_tag = $self->write_tag(
617             $loc, SIG_BLIST,
618             chr(0)x$self->{bucket_list_size},
619         );
620
621         seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
622         print( $fh $key . pack($self->{long_pack}, $old_subloc) );
623
624         $newloc[$num] = $blist_tag->{offset};
625     }
626
627     $self->_release_space(
628         $self->tag_size( $self->{bucket_list_size} ),
629         $tag->{offset} - SIG_SIZE - $self->{data_size},
630     );
631
632     return $newtag_loc;
633 }
634
635 sub read_from_loc {
636     my $self = shift;
637     my ($subloc) = @_;
638
639     local($/,$\);
640
641     my $fh = $self->_fh;
642
643     ##
644     # Found match -- seek to offset and read signature
645     ##
646     my $signature;
647     seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
648     read( $fh, $signature, SIG_SIZE);
649
650     ##
651     # If value is a hash or array, return new DBM::Deep object with correct offset
652     ##
653     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
654         my $new_obj = DBM::Deep->new({
655             type => $signature,
656             base_offset => $subloc,
657             fileobj     => $self->_fileobj,
658         });
659
660         if ($new_obj->_fileobj->{autobless}) {
661             ##
662             # Skip over value and plain key to see if object needs
663             # to be re-blessed
664             ##
665             seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
666
667             my $size;
668             read( $fh, $size, $self->{data_size});
669             $size = unpack($self->{data_pack}, $size);
670             if ($size) { seek($fh, $size, SEEK_CUR); }
671
672             my $bless_bit;
673             read( $fh, $bless_bit, 1);
674             if (ord($bless_bit)) {
675                 ##
676                 # Yes, object needs to be re-blessed
677                 ##
678                 my $class_name;
679                 read( $fh, $size, $self->{data_size});
680                 $size = unpack($self->{data_pack}, $size);
681                 if ($size) { read( $fh, $class_name, $size); }
682                 if ($class_name) { $new_obj = bless( $new_obj, $class_name ); }
683             }
684         }
685
686         return $new_obj;
687     }
688     elsif ( $signature eq SIG_INTERNAL ) {
689         my $size;
690         read( $fh, $size, $self->{data_size});
691         $size = unpack($self->{data_pack}, $size);
692
693         if ( $size ) {
694             my $new_loc;
695             read( $fh, $new_loc, $size );
696             $new_loc = unpack( $self->{long_pack}, $new_loc );
697
698             return $self->read_from_loc( $new_loc );
699         }
700         else {
701             return;
702         }
703     }
704     ##
705     # Otherwise return actual value
706     ##
707     elsif ( $signature eq SIG_DATA ) {
708         my $size;
709         read( $fh, $size, $self->{data_size});
710         $size = unpack($self->{data_pack}, $size);
711
712         my $value = '';
713         if ($size) { read( $fh, $value, $size); }
714         return $value;
715     }
716
717     ##
718     # Key exists, but content is null
719     ##
720     return;
721 }
722
723 sub get_bucket_value {
724     ##
725     # Fetch single value given tag and MD5 digested key.
726     ##
727     my $self = shift;
728     my ($tag, $md5) = @_;
729
730     #ACID - This is a read. Can find exact or HEAD
731     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
732     if ( $subloc ) {
733         return $self->read_from_loc( $subloc );
734     }
735     return;
736 }
737
738 sub delete_bucket {
739     ##
740     # Delete single key/value pair given tag and MD5 digested key.
741     ##
742     my $self = shift;
743     my ($tag, $md5) = @_;
744
745     local($/,$\);
746
747     #ACID - This is a mutation. Must only find the exact transaction
748     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
749 #XXX This needs _release_space()
750     if ( $subloc ) {
751         my $fh = $self->_fh;
752         seek($fh, $tag->{offset} + $offset + $self->_fileobj->{file_offset}, SEEK_SET);
753         print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
754         print( $fh chr(0) x $self->{bucket_size} );
755
756         return 1;
757     }
758     return;
759 }
760
761 sub bucket_exists {
762     ##
763     # Check existence of single key given tag and MD5 digested key.
764     ##
765     my $self = shift;
766     my ($tag, $md5) = @_;
767
768     #ACID - This is a read. Can find exact or HEAD
769     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5 );
770     return $subloc && 1;
771 }
772
773 sub find_bucket_list {
774     ##
775     # Locate offset for bucket list, given digested key
776     ##
777     my $self = shift;
778     my ($offset, $md5, $args) = @_;
779     $args = {} unless $args;
780
781     local($/,$\);
782
783     ##
784     # Locate offset for bucket list using digest index system
785     ##
786     my $tag = $self->load_tag( $offset )
787         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
788
789     my $ch = 0;
790     while ($tag->{signature} ne SIG_BLIST) {
791         my $num = ord substr($md5, $ch, 1);
792
793         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
794         $tag = $self->index_lookup( $tag, $num );
795
796         if (!$tag) {
797             return if !$args->{create};
798
799             my $loc = $self->_request_space(
800                 $self->tag_size( $self->{bucket_list_size} ),
801             );
802
803             my $fh = $self->_fh;
804             seek($fh, $ref_loc + $self->_fileobj->{file_offset}, SEEK_SET);
805             print( $fh pack($self->{long_pack}, $loc) );
806
807             $tag = $self->write_tag(
808                 $loc, SIG_BLIST,
809                 chr(0)x$self->{bucket_list_size},
810             );
811
812             $tag->{ref_loc} = $ref_loc;
813             $tag->{ch} = $ch;
814
815             last;
816         }
817
818         $tag->{ch} = $ch++;
819         $tag->{ref_loc} = $ref_loc;
820     }
821
822     return $tag;
823 }
824
825 sub index_lookup {
826     ##
827     # Given index tag, lookup single entry in index and return .
828     ##
829     my $self = shift;
830     my ($tag, $index) = @_;
831
832     my $location = unpack(
833         $self->{long_pack},
834         substr(
835             $tag->{content},
836             $index * $self->{long_size},
837             $self->{long_size},
838         ),
839     );
840
841     if (!$location) { return; }
842
843     return $self->load_tag( $location );
844 }
845
846 sub traverse_index {
847     ##
848     # Scan index and recursively step into deeper levels, looking for next key.
849     ##
850     my $self = shift;
851     my ($obj, $offset, $ch, $force_return_next) = @_;
852
853     local($/,$\);
854
855     my $tag = $self->load_tag( $offset );
856
857     my $fh = $self->_fh;
858
859     if ($tag->{signature} ne SIG_BLIST) {
860         my $content = $tag->{content};
861         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
862
863         for (my $idx = $start; $idx < (2**8); $idx++) {
864             my $subloc = unpack(
865                 $self->{long_pack},
866                 substr(
867                     $content,
868                     $idx * $self->{long_size},
869                     $self->{long_size},
870                 ),
871             );
872
873             if ($subloc) {
874                 my $result = $self->traverse_index(
875                     $obj, $subloc, $ch + 1, $force_return_next,
876                 );
877
878                 if (defined($result)) { return $result; }
879             }
880         } # index loop
881
882         $obj->{return_next} = 1;
883     } # tag is an index
884
885     else {
886         my $keys = $tag->{content};
887         if ($force_return_next) { $obj->{return_next} = 1; }
888
889         ##
890         # Iterate through buckets, looking for a key match
891         ##
892         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
893             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
894
895             # End of bucket list -- return to outer loop
896             if (!$subloc) {
897                 $obj->{return_next} = 1;
898                 last;
899             }
900             # Located previous key -- return next one found
901             elsif ($key eq $obj->{prev_md5}) {
902                 $obj->{return_next} = 1;
903                 next;
904             }
905             # Seek to bucket location and skip over signature
906             elsif ($obj->{return_next}) {
907                 seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
908
909                 # Skip over value to get to plain key
910                 my $sig;
911                 read( $fh, $sig, SIG_SIZE );
912
913                 my $size;
914                 read( $fh, $size, $self->{data_size});
915                 $size = unpack($self->{data_pack}, $size);
916                 if ($size) { seek($fh, $size, SEEK_CUR); }
917
918                 # Read in plain key and return as scalar
919                 my $plain_key;
920                 read( $fh, $size, $self->{data_size});
921                 $size = unpack($self->{data_pack}, $size);
922                 if ($size) { read( $fh, $plain_key, $size); }
923
924                 return $plain_key;
925             }
926         }
927
928         $obj->{return_next} = 1;
929     } # tag is a bucket list
930
931     return;
932 }
933
934 sub get_next_key {
935     ##
936     # Locate next key, given digested previous one
937     ##
938     my $self = shift;
939     my ($obj) = @_;
940
941     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
942     $obj->{return_next} = 0;
943
944     ##
945     # If the previous key was not specifed, start at the top and
946     # return the first one found.
947     ##
948     if (!$obj->{prev_md5}) {
949         $obj->{prev_md5} = chr(0) x $self->{hash_size};
950         $obj->{return_next} = 1;
951     }
952
953     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
954 }
955
956 # Utilities
957
958 sub _get_key_subloc {
959     my $self = shift;
960     my ($keys, $idx) = @_;
961
962     my ($key, $subloc, $size, $transaction) = unpack(
963         # This is 'a', not 'A'. Please read the pack() documentation for the
964         # difference between the two and why it's important.
965         "a$self->{hash_size} $self->{long_pack}3",
966         substr(
967             $keys,
968             ($idx * $self->{bucket_size}),
969             $self->{bucket_size},
970         ),
971     );
972
973     return ($key, $subloc, $size, $transaction);
974 }
975
976 sub _find_in_buckets {
977     my $self = shift;
978     my ($tag, $md5, $exact) = @_;
979
980     my $trans_id = $self->_fileobj->transaction_id;
981
982     my @zero;
983
984     BUCKET:
985     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
986         my ($key, $subloc, $size, $transaction_id) = $self->_get_key_subloc(
987             $tag->{content}, $i,
988         );
989
990         my @rv = ($subloc, $i * $self->{bucket_size}, $size);
991
992         unless ( $subloc ) {
993             if ( !$exact && @zero and $trans_id ) {
994                 @rv = ($zero[2], $zero[0] * $self->{bucket_size}, $zero[3]);
995             }
996             return @rv;
997         }
998
999         next BUCKET if $key ne $md5;
1000
1001         # Save off the HEAD in case we need it.
1002         @zero = ($i,$key,$subloc,$size,$transaction_id) if $transaction_id == 0;
1003
1004         next BUCKET if $transaction_id != $trans_id;
1005
1006         return @rv;
1007     }
1008
1009     return;
1010 }
1011
1012 sub _request_space {
1013     my $self = shift;
1014     my ($size) = @_;
1015
1016     my $loc = $self->_fileobj->{end};
1017     $self->_fileobj->{end} += $size;
1018
1019     return $loc;
1020 }
1021
1022 sub _release_space {
1023     my $self = shift;
1024     my ($size, $loc) = @_;
1025
1026     local($/,$\);
1027
1028     my $next_loc = 0;
1029
1030     my $fh = $self->_fh;
1031     seek( $fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET );
1032     print( $fh SIG_FREE
1033         . pack($self->{long_pack}, $size )
1034         . pack($self->{long_pack}, $next_loc )
1035     );
1036
1037     return;
1038 }
1039
1040 sub _throw_error {
1041     die "DBM::Deep: $_[1]\n";
1042 }
1043
1044 1;
1045 __END__
1046
1047 # This will be added in later, after more refactoring is done. This is an early
1048 # attempt at refactoring on the physical level instead of the virtual level.
1049 sub _read_at {
1050     my $self = shift;
1051     my ($spot, $amount, $unpack) = @_;
1052
1053     local($/,$\);
1054
1055     my $fh = $self->_fh;
1056     seek( $fh, $spot + $self->_fileobj->{file_offset}, SEEK_SET );
1057
1058     my $buffer;
1059     my $bytes_read = read( $fh, $buffer, $amount );
1060
1061     if ( $unpack ) {
1062         $buffer = unpack( $unpack, $buffer );
1063     }
1064
1065     if ( wantarray ) {
1066         return ($buffer, $bytes_read);
1067     }
1068     else {
1069         return $buffer;
1070     }
1071 }
1072
1073 sub _print_at {
1074     my $self = shift;
1075     my ($spot, $data) = @_;
1076
1077     local($/,$\);
1078
1079     my $fh = $self->_fh;
1080     seek( $fh, $spot, SEEK_SET );
1081     print( $fh $data );
1082
1083     return;
1084 }
1085
1086 sub get_file_version {
1087     my $self = shift;
1088
1089     local($/,$\);
1090
1091     my $fh = $self->_fh;
1092
1093     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1094     my $buffer;
1095     my $bytes_read = read( $fh, $buffer, 4 );
1096     unless ( $bytes_read == 4 ) {
1097         $self->_throw_error( "Cannot read file version" );
1098     }
1099
1100     return unpack( 'N', $buffer );
1101 }
1102
1103 sub write_file_version {
1104     my $self = shift;
1105     my ($new_version) = @_;
1106
1107     local($/,$\);
1108
1109     my $fh = $self->_fh;
1110
1111     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1112     print( $fh pack( 'N', $new_version ) );
1113
1114     return;
1115 }
1116