All auditing now goes through a method on ::File
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.6.0;
4
5 use strict;
6 use warnings;
7
8 use Fcntl qw( :DEFAULT :flock :seek );
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * All the local($/,$\); are to protect read() and print() from -l.
13 # * To add to bucket_size, make sure you modify the following:
14 #   - calculate_sizes()
15 #   - _get_key_subloc()
16 #   - add_bucket() - where the buckets are printed
17
18 ##
19 # Setup file and tag signatures.  These should never change.
20 ##
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_INTERNAL () { 'i'    }
24 sub SIG_HASH     () { 'H'    }
25 sub SIG_ARRAY    () { 'A'    }
26 sub SIG_NULL     () { 'N'    }
27 sub SIG_DATA     () { 'D'    }
28 sub SIG_INDEX    () { 'I'    }
29 sub SIG_BLIST    () { 'B'    }
30 sub SIG_FREE     () { 'F'    }
31 sub SIG_SIZE     () {  1     }
32
33 sub new {
34     my $class = shift;
35     my ($args) = @_;
36
37     my $self = bless {
38         long_size   => 4,
39         long_pack   => 'N',
40         data_size   => 4,
41         data_pack   => 'N',
42
43         digest      => \&Digest::MD5::md5,
44         hash_size   => 16,
45
46         ##
47         # Maximum number of buckets per list before another level of indexing is
48         # done. Increase this value for slightly greater speed, but larger database
49         # files. DO NOT decrease this value below 16, due to risk of recursive
50         # reindex overrun.
51         ##
52         max_buckets => 16,
53
54         fileobj => undef,
55         obj     => undef,
56     }, $class;
57
58     if ( defined $args->{pack_size} ) {
59         if ( lc $args->{pack_size} eq 'small' ) {
60             $args->{long_size} = 2;
61             $args->{long_pack} = 'n';
62         }
63         elsif ( lc $args->{pack_size} eq 'medium' ) {
64             $args->{long_size} = 4;
65             $args->{long_pack} = 'N';
66         }
67         elsif ( lc $args->{pack_size} eq 'large' ) {
68             $args->{long_size} = 8;
69             $args->{long_pack} = 'Q';
70         }
71         else {
72             die "Unknown pack_size value: '$args->{pack_size}'\n";
73         }
74     }
75
76     # Grab the parameters we want to use
77     foreach my $param ( keys %$self ) {
78         next unless exists $args->{$param};
79         $self->{$param} = $args->{$param};
80     }
81     Scalar::Util::weaken( $self->{obj} ) if $self->{obj};
82
83     if ( $self->{max_buckets} < 16 ) {
84         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
85         $self->{max_buckets} = 16;
86     }
87
88     return $self;
89 }
90
91 sub _fileobj { return $_[0]{fileobj} }
92 sub _fh      { return $_[0]->_fileobj->{fh} }
93
94 sub calculate_sizes {
95     my $self = shift;
96
97     #XXX Does this need to be updated with different hashing algorithms?
98     $self->{index_size}       = (2**8) * $self->{long_size};
99     $self->{bucket_size}      = $self->{hash_size} + $self->{long_size} * 3;
100     $self->{bucket_list_size} = $self->{max_buckets} * $self->{bucket_size};
101
102     return;
103 }
104
105 sub write_file_header {
106     my $self = shift;
107
108     local($/,$\);
109
110     my $fh = $self->_fh;
111
112     my $loc = $self->_request_space( length( SIG_FILE ) + 21 );
113     seek($fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET);
114     print( $fh
115         SIG_FILE,
116         SIG_HEADER,
117         pack('N', 1),  # header version
118         pack('N', 12), # header size
119         pack('N', 0),  # currently running transaction IDs
120         pack('n', $self->{long_size}),
121         pack('A', $self->{long_pack}),
122         pack('n', $self->{data_size}),
123         pack('A', $self->{data_pack}),
124         pack('n', $self->{max_buckets}),
125     );
126
127     $self->_fileobj->set_transaction_offset( 13 );
128
129     return;
130 }
131
132 sub read_file_header {
133     my $self = shift;
134
135     local($/,$\);
136
137     my $fh = $self->_fh;
138
139     seek($fh, 0 + $self->_fileobj->{file_offset}, SEEK_SET);
140     my $buffer;
141     my $bytes_read = read( $fh, $buffer, length(SIG_FILE) + 9 );
142
143     return unless $bytes_read;
144
145     my ($file_signature, $sig_header, $header_version, $size) = unpack(
146         'A4 A N N', $buffer
147     );
148
149     unless ( $file_signature eq SIG_FILE ) {
150         $self->_fileobj->close;
151         $self->_throw_error( "Signature not found -- file is not a Deep DB" );
152     }
153
154     unless ( $sig_header eq SIG_HEADER ) {
155         $self->_fileobj->close;
156         $self->_throw_error( "Old file version found." );
157     }
158
159     my $buffer2;
160     $bytes_read += read( $fh, $buffer2, $size );
161     my ($running_transactions, @values) = unpack( 'N n A n A n', $buffer2 );
162
163     $self->_fileobj->set_transaction_offset( 13 );
164
165     if ( @values < 5 || grep { !defined } @values ) {
166         $self->_fileobj->close;
167         $self->_throw_error("Corrupted file - bad header");
168     }
169
170     #XXX Add warnings if values weren't set right
171     @{$self}{qw(long_size long_pack data_size data_pack max_buckets)} = @values;
172
173     return $bytes_read;
174 }
175
176 sub setup_fh {
177     my $self = shift;
178     my ($obj) = @_;
179
180     local($/,$\);
181
182     my $fh = $self->_fh;
183     flock $fh, LOCK_EX;
184
185     #XXX The duplication of calculate_sizes needs to go away
186     unless ( $obj->{base_offset} ) {
187         my $bytes_read = $self->read_file_header;
188
189         $self->calculate_sizes;
190
191         ##
192         # File is empty -- write header and master index
193         ##
194         if (!$bytes_read) {
195             $self->_fileobj->audit( "# Database created on" );
196
197             $self->write_file_header;
198
199             $obj->{base_offset} = $self->_request_space( $self->tag_size( $self->{index_size} ) );
200
201             $self->write_tag(
202                 $obj->_base_offset, $obj->_type,
203                 chr(0)x$self->{index_size},
204             );
205
206             # Flush the filehandle
207             my $old_fh = select $fh;
208             my $old_af = $|; $| = 1; $| = $old_af;
209             select $old_fh;
210         }
211         else {
212             $obj->{base_offset} = $bytes_read;
213
214             ##
215             # Get our type from master index header
216             ##
217             my $tag = $self->load_tag($obj->_base_offset);
218             unless ( $tag ) {
219                 flock $fh, LOCK_UN;
220                 $self->_throw_error("Corrupted file, no master index record");
221             }
222
223             unless ($obj->_type eq $tag->{signature}) {
224                 flock $fh, LOCK_UN;
225                 $self->_throw_error("File type mismatch");
226             }
227         }
228     }
229     else {
230         $self->calculate_sizes;
231     }
232
233     #XXX We have to make sure we don't mess up when autoflush isn't turned on
234     unless ( $self->_fileobj->{inode} ) {
235         my @stats = stat($fh);
236         $self->_fileobj->{inode} = $stats[1];
237         $self->_fileobj->{end} = $stats[7];
238     }
239
240     flock $fh, LOCK_UN;
241
242     return 1;
243 }
244
245 sub tag_size {
246     my $self = shift;
247     my ($size) = @_;
248     return SIG_SIZE + $self->{data_size} + $size;
249 }
250
251 sub write_tag {
252     ##
253     # Given offset, signature and content, create tag and write to disk
254     ##
255     my $self = shift;
256     my ($offset, $sig, $content) = @_;
257     my $size = length( $content );
258
259     local($/,$\);
260
261     my $fh = $self->_fh;
262
263     if ( defined $offset ) {
264         seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
265     }
266
267     print( $fh $sig . pack($self->{data_pack}, $size) . $content );
268
269     return unless defined $offset;
270
271     return {
272         signature => $sig,
273         size => $size,
274         offset => $offset + SIG_SIZE + $self->{data_size},
275         content => $content
276     };
277 }
278
279 sub load_tag {
280     ##
281     # Given offset, load single tag and return signature, size and data
282     ##
283     my $self = shift;
284     my ($offset) = @_;
285
286     local($/,$\);
287
288 #    print join(':',map{$_||''}caller(1)), $/;
289
290     my $fh = $self->_fh;
291
292     seek($fh, $offset + $self->_fileobj->{file_offset}, SEEK_SET);
293
294     #XXX I'm not sure this check will work if autoflush isn't enabled ...
295     return if eof $fh;
296
297     my $b;
298     read( $fh, $b, SIG_SIZE + $self->{data_size} );
299     my ($sig, $size) = unpack( "A $self->{data_pack}", $b );
300
301     my $buffer;
302     read( $fh, $buffer, $size);
303
304     return {
305         signature => $sig,
306         size => $size,
307         offset => $offset + SIG_SIZE + $self->{data_size},
308         content => $buffer
309     };
310 }
311
312 sub _get_dbm_object {
313     my $item = shift;
314
315     my $obj = eval {
316         local $SIG{__DIE__};
317         if ($item->isa( 'DBM::Deep' )) {
318             return $item;
319         }
320         return;
321     };
322     return $obj if $obj;
323
324     my $r = Scalar::Util::reftype( $item ) || '';
325     if ( $r eq 'HASH' ) {
326         my $obj = eval {
327             local $SIG{__DIE__};
328             my $obj = tied(%$item);
329             if ($obj->isa( 'DBM::Deep' )) {
330                 return $obj;
331             }
332             return;
333         };
334         return $obj if $obj;
335     }
336     elsif ( $r eq 'ARRAY' ) {
337         my $obj = eval {
338             local $SIG{__DIE__};
339             my $obj = tied(@$item);
340             if ($obj->isa( 'DBM::Deep' )) {
341                 return $obj;
342             }
343             return;
344         };
345         return $obj if $obj;
346     }
347
348     return;
349 }
350
351 sub _length_needed {
352     my $self = shift;
353     my ($value, $key) = @_;
354
355     my $is_dbm_deep = eval {
356         local $SIG{'__DIE__'};
357         $value->isa( 'DBM::Deep' );
358     };
359
360     my $len = SIG_SIZE + $self->{data_size}
361             + $self->{data_size} + length( $key );
362
363     if ( $is_dbm_deep && $value->_fileobj eq $self->_fileobj ) {
364         return $len + $self->{long_size};
365     }
366
367     my $r = Scalar::Util::reftype( $value ) || '';
368     if ( $self->_fileobj->{autobless} ) {
369         # This is for the bit saying whether or not this thing is blessed.
370         $len += 1;
371     }
372
373     unless ( $r eq 'HASH' || $r eq 'ARRAY' ) {
374         if ( defined $value ) {
375             $len += length( $value );
376         }
377         return $len;
378     }
379
380     $len += $self->{index_size};
381
382     # if autobless is enabled, must also take into consideration
383     # the class name as it is stored after the key.
384     if ( $self->_fileobj->{autobless} ) {
385         my $c = Scalar::Util::blessed($value);
386         if ( defined $c && !$is_dbm_deep ) {
387             $len += $self->{data_size} + length($c);
388         }
389     }
390
391     return $len;
392 }
393
394 sub add_bucket {
395     ##
396     # Adds one key/value pair to bucket list, given offset, MD5 digest of key,
397     # plain (undigested) key and value.
398     ##
399     my $self = shift;
400     my ($tag, $md5, $plain_key, $value, $deleted, $orig_key) = @_;
401     $deleted ||= 0;
402
403     local($/,$\);
404
405     # This verifies that only supported values will be stored.
406     {
407         my $r = Scalar::Util::reftype( $value );
408         last if !defined $r;
409
410         last if $r eq 'HASH';
411         last if $r eq 'ARRAY';
412
413         $self->_throw_error(
414             "Storage of variables of type '$r' is not supported."
415         );
416     }
417
418     my $location = 0;
419     my $result = 2;
420
421     my $root = $self->_fileobj;
422     my $fh   = $self->_fh;
423
424     my $actual_length = $self->_length_needed( $value, $plain_key );
425
426     #ACID - This is a mutation. Must only find the exact transaction
427     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5, 1 );
428
429     my @transactions;
430     if ( $self->_fileobj->transaction_id == 0 ) {
431         @transactions = $self->_fileobj->current_transactions;
432     }
433
434 #    $self->_release_space( $size, $subloc );
435     # Updating a known md5
436 #XXX This needs updating to use _release_space
437     if ( $subloc ) {
438         $result = 1;
439
440         if ($actual_length <= $size) {
441             $location = $subloc;
442         }
443         else {
444             $location = $self->_request_space( $actual_length );
445             seek(
446                 $fh,
447                 $tag->{offset} + $offset
448               + $self->{hash_size} + $root->{file_offset},
449                 SEEK_SET,
450             );
451             print( $fh pack($self->{long_pack}, $location ) );
452             print( $fh pack($self->{long_pack}, $actual_length ) );
453             print( $fh pack('n n', $root->transaction_id, $deleted ) );
454         }
455     }
456     # Adding a new md5
457     elsif ( defined $offset ) {
458         $location = $self->_request_space( $actual_length );
459
460         seek( $fh, $tag->{offset} + $offset + $root->{file_offset}, SEEK_SET );
461         print( $fh $md5 . pack($self->{long_pack}, $location ) );
462         print( $fh pack($self->{long_pack}, $actual_length ) );
463         print( $fh pack('n n', $root->transaction_id, $deleted ) );
464
465         for ( @transactions ) {
466             my $tag2 = $self->load_tag( $tag->{offset} - SIG_SIZE - $self->{data_size} );
467             $self->_fileobj->{transaction_id} = $_;
468             $self->add_bucket( $tag2, $md5, '', '', 1, $orig_key );
469             $self->_fileobj->{transaction_id} = 0;
470         }
471     }
472     # If bucket didn't fit into list, split into a new index level
473     # split_index() will do the _request_space() call
474     else {
475         $location = $self->split_index( $md5, $tag );
476     }
477
478     $self->write_value( $location, $plain_key, $value, $orig_key );
479
480     return $result;
481 }
482
483 sub write_value {
484     my $self = shift;
485     my ($location, $key, $value, $orig_key) = @_;
486
487     local($/,$\);
488
489     my $fh = $self->_fh;
490     my $root = $self->_fileobj;
491
492     my $dbm_deep_obj = _get_dbm_object( $value );
493     if ( $dbm_deep_obj && $dbm_deep_obj->_fileobj ne $self->_fileobj ) {
494         $self->_throw_error( "Cannot cross-reference. Use export() instead" );
495     }
496
497     seek($fh, $location + $root->{file_offset}, SEEK_SET);
498
499     ##
500     # Write signature based on content type, set content length and write
501     # actual value.
502     ##
503     my $r = Scalar::Util::reftype( $value ) || '';
504     if ( $dbm_deep_obj ) {
505         $self->write_tag( undef, SIG_INTERNAL,pack($self->{long_pack}, $dbm_deep_obj->_base_offset) );
506     }
507     elsif ($r eq 'HASH') {
508         if ( !$dbm_deep_obj && tied %{$value} ) {
509             $self->_throw_error( "Cannot store something that is tied" );
510         }
511         $self->write_tag( undef, SIG_HASH, chr(0)x$self->{index_size} );
512     }
513     elsif ($r eq 'ARRAY') {
514         if ( !$dbm_deep_obj && tied @{$value} ) {
515             $self->_throw_error( "Cannot store something that is tied" );
516         }
517         $self->write_tag( undef, SIG_ARRAY, chr(0)x$self->{index_size} );
518     }
519     elsif (!defined($value)) {
520         $self->write_tag( undef, SIG_NULL, '' );
521     }
522     else {
523         $self->write_tag( undef, SIG_DATA, $value );
524     }
525
526     ##
527     # Plain key is stored AFTER value, as keys are typically fetched less often.
528     ##
529     print( $fh pack($self->{data_pack}, length($key)) . $key );
530
531     # Internal references don't care about autobless
532     return 1 if $dbm_deep_obj;
533
534     ##
535     # If value is blessed, preserve class name
536     ##
537     if ( $root->{autobless} ) {
538         my $c = Scalar::Util::blessed($value);
539         if ( defined $c && !$dbm_deep_obj ) {
540             print( $fh chr(1) );
541             print( $fh pack($self->{data_pack}, length($c)) . $c );
542         }
543         else {
544             print( $fh chr(0) );
545         }
546     }
547
548     ##
549     # Tie the passed in reference so that changes to it are reflected in the
550     # datafile. The use of $location as the base_offset will act as the
551     # the linkage between parent and child.
552     #
553     # The overall assignment is a hack around the fact that just tying doesn't
554     # store the values. This may not be the wrong thing to do.
555     ##
556     if ($r eq 'HASH') {
557         my %x = %$value;
558         tie %$value, 'DBM::Deep', {
559             base_offset => $location,
560             fileobj     => $root,
561             parent      => $self->{obj},
562             parent_key  => $orig_key,
563         };
564         %$value = %x;
565     }
566     elsif ($r eq 'ARRAY') {
567         my @x = @$value;
568         tie @$value, 'DBM::Deep', {
569             base_offset => $location,
570             fileobj     => $root,
571             parent      => $self->{obj},
572             parent_key  => $orig_key,
573         };
574         @$value = @x;
575     }
576
577     return 1;
578 }
579
580 sub split_index {
581     my $self = shift;
582     my ($md5, $tag) = @_;
583
584     local($/,$\);
585
586     my $fh = $self->_fh;
587     my $root = $self->_fileobj;
588
589     my $loc = $self->_request_space(
590         $self->tag_size( $self->{index_size} ),
591     );
592
593     seek($fh, $tag->{ref_loc} + $root->{file_offset}, SEEK_SET);
594     print( $fh pack($self->{long_pack}, $loc) );
595
596     my $index_tag = $self->write_tag(
597         $loc, SIG_INDEX,
598         chr(0)x$self->{index_size},
599     );
600
601     my $newtag_loc = $self->_request_space(
602         $self->tag_size( $self->{bucket_list_size} ),
603     );
604
605     my $keys = $tag->{content}
606              . $md5 . pack($self->{long_pack}, $newtag_loc)
607                     . pack($self->{long_pack}, 0)  # size
608                     . pack($self->{long_pack}, 0); # transaction ID
609
610     my @newloc = ();
611     BUCKET:
612     for (my $i = 0; $i <= $self->{max_buckets}; $i++) {
613         my ($key, $old_subloc, $size) = $self->_get_key_subloc( $keys, $i );
614
615         die "[INTERNAL ERROR]: No key in split_index()\n" unless $key;
616         die "[INTERNAL ERROR]: No subloc in split_index()\n" unless $old_subloc;
617
618         my $num = ord(substr($key, $tag->{ch} + 1, 1));
619
620         if ($newloc[$num]) {
621             seek($fh, $newloc[$num] + $root->{file_offset}, SEEK_SET);
622             my $subkeys;
623             read( $fh, $subkeys, $self->{bucket_list_size});
624
625             # This is looking for the first empty spot
626             my ($subloc, $offset, $size) = $self->_find_in_buckets(
627                 { content => $subkeys }, '',
628             );
629
630             seek($fh, $newloc[$num] + $offset + $root->{file_offset}, SEEK_SET);
631             print( $fh $key . pack($self->{long_pack}, $old_subloc) );
632
633             next;
634         }
635
636         seek($fh, $index_tag->{offset} + ($num * $self->{long_size}) + $root->{file_offset}, SEEK_SET);
637
638         my $loc = $self->_request_space(
639             $self->tag_size( $self->{bucket_list_size} ),
640         );
641
642         print( $fh pack($self->{long_pack}, $loc) );
643
644         my $blist_tag = $self->write_tag(
645             $loc, SIG_BLIST,
646             chr(0)x$self->{bucket_list_size},
647         );
648
649         seek($fh, $blist_tag->{offset} + $root->{file_offset}, SEEK_SET);
650         print( $fh $key . pack($self->{long_pack}, $old_subloc) );
651
652         $newloc[$num] = $blist_tag->{offset};
653     }
654
655     $self->_release_space(
656         $self->tag_size( $self->{bucket_list_size} ),
657         $tag->{offset} - SIG_SIZE - $self->{data_size},
658     );
659
660     return $newtag_loc;
661 }
662
663 sub read_from_loc {
664     my $self = shift;
665     my ($subloc, $orig_key) = @_;
666
667     local($/,$\);
668
669     my $fh = $self->_fh;
670
671     ##
672     # Found match -- seek to offset and read signature
673     ##
674     my $signature;
675     seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
676     read( $fh, $signature, SIG_SIZE);
677
678     ##
679     # If value is a hash or array, return new DBM::Deep object with correct offset
680     ##
681     if (($signature eq SIG_HASH) || ($signature eq SIG_ARRAY)) {
682         my $new_obj = DBM::Deep->new({
683             type        => $signature,
684             base_offset => $subloc,
685             fileobj     => $self->_fileobj,
686             parent      => $self->{obj},
687             parent_key  => $orig_key,
688         });
689
690         if ($new_obj->_fileobj->{autobless}) {
691             ##
692             # Skip over value and plain key to see if object needs
693             # to be re-blessed
694             ##
695             seek($fh, $self->{data_size} + $self->{index_size}, SEEK_CUR);
696
697             my $size;
698             read( $fh, $size, $self->{data_size});
699             $size = unpack($self->{data_pack}, $size);
700             if ($size) { seek($fh, $size, SEEK_CUR); }
701
702             my $bless_bit;
703             read( $fh, $bless_bit, 1);
704             if (ord($bless_bit)) {
705                 ##
706                 # Yes, object needs to be re-blessed
707                 ##
708                 my $class_name;
709                 read( $fh, $size, $self->{data_size});
710                 $size = unpack($self->{data_pack}, $size);
711                 if ($size) { read( $fh, $class_name, $size); }
712                 if ($class_name) { $new_obj = bless( $new_obj, $class_name ); }
713             }
714         }
715
716         return $new_obj;
717     }
718     elsif ( $signature eq SIG_INTERNAL ) {
719         my $size;
720         read( $fh, $size, $self->{data_size});
721         $size = unpack($self->{data_pack}, $size);
722
723         if ( $size ) {
724             my $new_loc;
725             read( $fh, $new_loc, $size );
726             $new_loc = unpack( $self->{long_pack}, $new_loc );
727
728             return $self->read_from_loc( $new_loc, $orig_key );
729         }
730         else {
731             return;
732         }
733     }
734     ##
735     # Otherwise return actual value
736     ##
737     elsif ( $signature eq SIG_DATA ) {
738         my $size;
739         read( $fh, $size, $self->{data_size});
740         $size = unpack($self->{data_pack}, $size);
741
742         my $value = '';
743         if ($size) { read( $fh, $value, $size); }
744         return $value;
745     }
746
747     ##
748     # Key exists, but content is null
749     ##
750     return;
751 }
752
753 sub get_bucket_value {
754     ##
755     # Fetch single value given tag and MD5 digested key.
756     ##
757     my $self = shift;
758     my ($tag, $md5, $orig_key) = @_;
759
760     #ACID - This is a read. Can find exact or HEAD
761     my ($subloc, $offset, $size,$is_deleted) = $self->_find_in_buckets( $tag, $md5 );
762     if ( $subloc && !$is_deleted ) {
763         return $self->read_from_loc( $subloc, $orig_key );
764     }
765     return;
766 }
767
768 sub delete_bucket {
769     ##
770     # Delete single key/value pair given tag and MD5 digested key.
771     ##
772     my $self = shift;
773     my ($tag, $md5, $orig_key) = @_;
774
775     local($/,$\);
776
777     #ACID - This is a mutation. Must only find the exact transaction
778     my ($subloc, $offset, $size) = $self->_find_in_buckets( $tag, $md5, 1 );
779 #XXX This needs _release_space()
780     if ( $subloc ) {
781         my $fh = $self->_fh;
782         seek($fh, $tag->{offset} + $offset + $self->_fileobj->{file_offset}, SEEK_SET);
783         print( $fh substr($tag->{content}, $offset + $self->{bucket_size} ) );
784         print( $fh chr(0) x $self->{bucket_size} );
785
786         return 1;
787     }
788     return;
789 }
790
791 sub bucket_exists {
792     ##
793     # Check existence of single key given tag and MD5 digested key.
794     ##
795     my $self = shift;
796     my ($tag, $md5) = @_;
797
798     #ACID - This is a read. Can find exact or HEAD
799     my ($subloc, $offset, $size, $is_deleted) = $self->_find_in_buckets( $tag, $md5 );
800     return ($subloc && !$is_deleted) && 1;
801 }
802
803 sub find_bucket_list {
804     ##
805     # Locate offset for bucket list, given digested key
806     ##
807     my $self = shift;
808     my ($offset, $md5, $args) = @_;
809     $args = {} unless $args;
810
811     local($/,$\);
812
813     ##
814     # Locate offset for bucket list using digest index system
815     ##
816     my $tag = $self->load_tag( $offset )
817         or $self->_throw_error( "INTERNAL ERROR - Cannot find tag" );
818
819     my $ch = 0;
820     while ($tag->{signature} ne SIG_BLIST) {
821         my $num = ord substr($md5, $ch, 1);
822
823         my $ref_loc = $tag->{offset} + ($num * $self->{long_size});
824         $tag = $self->index_lookup( $tag, $num );
825
826         if (!$tag) {
827             return if !$args->{create};
828
829             my $loc = $self->_request_space(
830                 $self->tag_size( $self->{bucket_list_size} ),
831             );
832
833             my $fh = $self->_fh;
834             seek($fh, $ref_loc + $self->_fileobj->{file_offset}, SEEK_SET);
835             print( $fh pack($self->{long_pack}, $loc) );
836
837             $tag = $self->write_tag(
838                 $loc, SIG_BLIST,
839                 chr(0)x$self->{bucket_list_size},
840             );
841
842             $tag->{ref_loc} = $ref_loc;
843             $tag->{ch} = $ch;
844
845             last;
846         }
847
848         $tag->{ch} = $ch++;
849         $tag->{ref_loc} = $ref_loc;
850     }
851
852     return $tag;
853 }
854
855 sub index_lookup {
856     ##
857     # Given index tag, lookup single entry in index and return .
858     ##
859     my $self = shift;
860     my ($tag, $index) = @_;
861
862     my $location = unpack(
863         $self->{long_pack},
864         substr(
865             $tag->{content},
866             $index * $self->{long_size},
867             $self->{long_size},
868         ),
869     );
870
871     if (!$location) { return; }
872
873     return $self->load_tag( $location );
874 }
875
876 sub traverse_index {
877     ##
878     # Scan index and recursively step into deeper levels, looking for next key.
879     ##
880     my $self = shift;
881     my ($obj, $offset, $ch, $force_return_next) = @_;
882
883     local($/,$\);
884
885     my $tag = $self->load_tag( $offset );
886
887     my $fh = $self->_fh;
888
889     if ($tag->{signature} ne SIG_BLIST) {
890         my $content = $tag->{content};
891         my $start = $obj->{return_next} ? 0 : ord(substr($obj->{prev_md5}, $ch, 1));
892
893         for (my $idx = $start; $idx < (2**8); $idx++) {
894             my $subloc = unpack(
895                 $self->{long_pack},
896                 substr(
897                     $content,
898                     $idx * $self->{long_size},
899                     $self->{long_size},
900                 ),
901             );
902
903             if ($subloc) {
904                 my $result = $self->traverse_index(
905                     $obj, $subloc, $ch + 1, $force_return_next,
906                 );
907
908                 if (defined($result)) { return $result; }
909             }
910         } # index loop
911
912         $obj->{return_next} = 1;
913     } # tag is an index
914
915     else {
916         my $keys = $tag->{content};
917         if ($force_return_next) { $obj->{return_next} = 1; }
918
919         ##
920         # Iterate through buckets, looking for a key match
921         ##
922         for (my $i = 0; $i < $self->{max_buckets}; $i++) {
923             my ($key, $subloc) = $self->_get_key_subloc( $keys, $i );
924
925             # End of bucket list -- return to outer loop
926             if (!$subloc) {
927                 $obj->{return_next} = 1;
928                 last;
929             }
930             # Located previous key -- return next one found
931             elsif ($key eq $obj->{prev_md5}) {
932                 $obj->{return_next} = 1;
933                 next;
934             }
935             # Seek to bucket location and skip over signature
936             elsif ($obj->{return_next}) {
937                 seek($fh, $subloc + $self->_fileobj->{file_offset}, SEEK_SET);
938
939                 # Skip over value to get to plain key
940                 my $sig;
941                 read( $fh, $sig, SIG_SIZE );
942
943                 my $size;
944                 read( $fh, $size, $self->{data_size});
945                 $size = unpack($self->{data_pack}, $size);
946                 if ($size) { seek($fh, $size, SEEK_CUR); }
947
948                 # Read in plain key and return as scalar
949                 my $plain_key;
950                 read( $fh, $size, $self->{data_size});
951                 $size = unpack($self->{data_pack}, $size);
952                 if ($size) { read( $fh, $plain_key, $size); }
953
954                 return $plain_key;
955             }
956         }
957
958         $obj->{return_next} = 1;
959     } # tag is a bucket list
960
961     return;
962 }
963
964 sub get_next_key {
965     ##
966     # Locate next key, given digested previous one
967     ##
968     my $self = shift;
969     my ($obj) = @_;
970
971     $obj->{prev_md5} = $_[1] ? $_[1] : undef;
972     $obj->{return_next} = 0;
973
974     ##
975     # If the previous key was not specifed, start at the top and
976     # return the first one found.
977     ##
978     if (!$obj->{prev_md5}) {
979         $obj->{prev_md5} = chr(0) x $self->{hash_size};
980         $obj->{return_next} = 1;
981     }
982
983     return $self->traverse_index( $obj, $obj->_base_offset, 0 );
984 }
985
986 # Utilities
987
988 sub _get_key_subloc {
989     my $self = shift;
990     my ($keys, $idx) = @_;
991
992     my ($key, $subloc, $size, $transaction_id, $is_deleted) = unpack(
993         # This is 'a', not 'A'. Please read the pack() documentation for the
994         # difference between the two and why it's important.
995         "a$self->{hash_size} $self->{long_pack}2 n2",
996         substr(
997             $keys,
998             ($idx * $self->{bucket_size}),
999             $self->{bucket_size},
1000         ),
1001     );
1002
1003     return ($key, $subloc, $size, $transaction_id, $is_deleted);
1004 }
1005
1006 sub _find_in_buckets {
1007     my $self = shift;
1008     my ($tag, $md5, $exact) = @_;
1009
1010     my $trans_id = $self->_fileobj->transaction_id;
1011
1012     my @zero;
1013
1014     BUCKET:
1015     for ( my $i = 0; $i < $self->{max_buckets}; $i++ ) {
1016         my ($key, $subloc, $size, $transaction_id, $is_deleted) = $self->_get_key_subloc(
1017             $tag->{content}, $i,
1018         );
1019
1020         my @rv = ($subloc, $i * $self->{bucket_size}, $size, $is_deleted);
1021
1022         unless ( $subloc ) {
1023             if ( !$exact && @zero and $trans_id ) {
1024                 @rv = ($zero[2], $zero[0] * $self->{bucket_size},$zero[3],$is_deleted);
1025             }
1026             return @rv;
1027         }
1028
1029         next BUCKET if $key ne $md5;
1030
1031         # Save off the HEAD in case we need it.
1032         @zero = ($i,$key,$subloc,$size,$transaction_id,$is_deleted) if $transaction_id == 0;
1033
1034         next BUCKET if $transaction_id != $trans_id;
1035
1036         return @rv;
1037     }
1038
1039     return;
1040 }
1041
1042 sub _request_space {
1043     my $self = shift;
1044     my ($size) = @_;
1045
1046     my $loc = $self->_fileobj->{end};
1047     $self->_fileobj->{end} += $size;
1048
1049     return $loc;
1050 }
1051
1052 sub _release_space {
1053     my $self = shift;
1054     my ($size, $loc) = @_;
1055
1056     local($/,$\);
1057
1058     my $next_loc = 0;
1059
1060     my $fh = $self->_fh;
1061     seek( $fh, $loc + $self->_fileobj->{file_offset}, SEEK_SET );
1062     print( $fh SIG_FREE
1063         . pack($self->{long_pack}, $size )
1064         . pack($self->{long_pack}, $next_loc )
1065     );
1066
1067     return;
1068 }
1069
1070 sub _throw_error {
1071     die "DBM::Deep: $_[1]\n";
1072 }
1073
1074 1;
1075 __END__
1076
1077 # This will be added in later, after more refactoring is done. This is an early
1078 # attempt at refactoring on the physical level instead of the virtual level.
1079 sub _read_at {
1080     my $self = shift;
1081     my ($spot, $amount, $unpack) = @_;
1082
1083     local($/,$\);
1084
1085     my $fh = $self->_fh;
1086     seek( $fh, $spot + $self->_fileobj->{file_offset}, SEEK_SET );
1087
1088     my $buffer;
1089     my $bytes_read = read( $fh, $buffer, $amount );
1090
1091     if ( $unpack ) {
1092         $buffer = unpack( $unpack, $buffer );
1093     }
1094
1095     if ( wantarray ) {
1096         return ($buffer, $bytes_read);
1097     }
1098     else {
1099         return $buffer;
1100     }
1101 }
1102
1103 sub _print_at {
1104     my $self = shift;
1105     my ($spot, $data) = @_;
1106
1107     local($/,$\);
1108
1109     my $fh = $self->_fh;
1110     seek( $fh, $spot, SEEK_SET );
1111     print( $fh $data );
1112
1113     return;
1114 }
1115
1116 sub get_file_version {
1117     my $self = shift;
1118
1119     local($/,$\);
1120
1121     my $fh = $self->_fh;
1122
1123     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1124     my $buffer;
1125     my $bytes_read = read( $fh, $buffer, 4 );
1126     unless ( $bytes_read == 4 ) {
1127         $self->_throw_error( "Cannot read file version" );
1128     }
1129
1130     return unpack( 'N', $buffer );
1131 }
1132
1133 sub write_file_version {
1134     my $self = shift;
1135     my ($new_version) = @_;
1136
1137     local($/,$\);
1138
1139     my $fh = $self->_fh;
1140
1141     seek( $fh, 13 + $self->_fileobj->{file_offset}, SEEK_SET );
1142     print( $fh pack( 'N', $new_version ) );
1143
1144     return;
1145 }
1146