Fixed a couple of overlooks in reading an existing file's header.
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 our $STALE_SIZE = 2;
33
34 # Please refer to the pack() documentation for further information
35 my %StP = (
36     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
37     2 => 'n', # Unsigned short in "network" (big-endian) order
38     4 => 'N', # Unsigned long in "network" (big-endian) order
39     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
40 );
41 sub StP { $StP{$_[1]} }
42
43 # Import these after the SIG_* definitions because those definitions are used
44 # in the headers of these classes. -RobK, 2008-06-20
45 use DBM::Deep::Engine::Sector::BucketList;
46 use DBM::Deep::Engine::Sector::FileHeader;
47 use DBM::Deep::Engine::Sector::Index;
48 use DBM::Deep::Engine::Sector::Null;
49 use DBM::Deep::Engine::Sector::Reference;
50 use DBM::Deep::Engine::Sector::Scalar;
51 use DBM::Deep::Iterator;
52
53 ################################################################################
54
55 sub new {
56     my $class = shift;
57     my ($args) = @_;
58
59     $args->{storage} = DBM::Deep::File->new( $args )
60         unless exists $args->{storage};
61
62     my $self = bless {
63         byte_size   => 4,
64
65         digest      => undef,
66         hash_size   => 16,  # In bytes
67         hash_chars  => 256, # Number of chars the algorithm uses per byte
68         max_buckets => 16,
69         num_txns    => 1,   # The HEAD
70         trans_id    => 0,   # Default to the HEAD
71
72         data_sector_size => 64, # Size in bytes of each data sector
73
74         entries => {}, # This is the list of entries for transactions
75         storage => undef,
76     }, $class;
77
78     # Never allow byte_size to be set directly.
79     delete $args->{byte_size};
80     if ( defined $args->{pack_size} ) {
81         if ( lc $args->{pack_size} eq 'small' ) {
82             $args->{byte_size} = 2;
83         }
84         elsif ( lc $args->{pack_size} eq 'medium' ) {
85             $args->{byte_size} = 4;
86         }
87         elsif ( lc $args->{pack_size} eq 'large' ) {
88             $args->{byte_size} = 8;
89         }
90         else {
91             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
92         }
93     }
94
95     # Grab the parameters we want to use
96     foreach my $param ( keys %$self ) {
97         next unless exists $args->{$param};
98         $self->{$param} = $args->{$param};
99     }
100
101     my %validations = (
102         max_buckets      => { floor => 16, ceil => 256 },
103         num_txns         => { floor => 1,  ceil => 255 },
104         data_sector_size => { floor => 32, ceil => 256 },
105     );
106
107     while ( my ($attr, $c) = each %validations ) {
108         if (   !defined $self->{$attr}
109             || !length $self->{$attr}
110             || $self->{$attr} =~ /\D/
111             || $self->{$attr} < $c->{floor}
112         ) {
113             $self->{$attr} = '(undef)' if !defined $self->{$attr};
114             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
115             $self->{$attr} = $c->{floor};
116         }
117         elsif ( $self->{$attr} > $c->{ceil} ) {
118             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
119             $self->{$attr} = $c->{ceil};
120         }
121     }
122
123     if ( !$self->{digest} ) {
124         require Digest::MD5;
125         $self->{digest} = \&Digest::MD5::md5;
126     }
127
128     return $self;
129 }
130
131 ################################################################################
132
133 sub read_value {
134     my $self = shift;
135     my ($obj, $key) = @_;
136
137     # This will be a Reference sector
138     my $sector = $self->_load_sector( $obj->_base_offset )
139         or return;
140
141     if ( $sector->staleness != $obj->_staleness ) {
142         return;
143     }
144
145     my $key_md5 = $self->_apply_digest( $key );
146
147     my $value_sector = $sector->get_data_for({
148         key_md5    => $key_md5,
149         allow_head => 1,
150     });
151
152     unless ( $value_sector ) {
153         $value_sector = DBM::Deep::Engine::Sector::Null->new({
154             engine => $self,
155             data   => undef,
156         });
157
158         $sector->write_data({
159             key_md5 => $key_md5,
160             key     => $key,
161             value   => $value_sector,
162         });
163     }
164
165     return $value_sector->data;
166 }
167
168 sub get_classname {
169     my $self = shift;
170     my ($obj) = @_;
171
172     # This will be a Reference sector
173     my $sector = $self->_load_sector( $obj->_base_offset )
174         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
175
176     if ( $sector->staleness != $obj->_staleness ) {
177         return;
178     }
179
180     return $sector->get_classname;
181 }
182
183 sub make_reference {
184     my $self = shift;
185     my ($obj, $old_key, $new_key) = @_;
186
187     # This will be a Reference sector
188     my $sector = $self->_load_sector( $obj->_base_offset )
189         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
190
191     if ( $sector->staleness != $obj->_staleness ) {
192         return;
193     }
194
195     my $old_md5 = $self->_apply_digest( $old_key );
196
197     my $value_sector = $sector->get_data_for({
198         key_md5    => $old_md5,
199         allow_head => 1,
200     });
201
202     unless ( $value_sector ) {
203         $value_sector = DBM::Deep::Engine::Sector::Null->new({
204             engine => $self,
205             data   => undef,
206         });
207
208         $sector->write_data({
209             key_md5 => $old_md5,
210             key     => $old_key,
211             value   => $value_sector,
212         });
213     }
214
215     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
216         $sector->write_data({
217             key     => $new_key,
218             key_md5 => $self->_apply_digest( $new_key ),
219             value   => $value_sector,
220         });
221         $value_sector->increment_refcount;
222     }
223     else {
224         $sector->write_data({
225             key     => $new_key,
226             key_md5 => $self->_apply_digest( $new_key ),
227             value   => $value_sector->clone,
228         });
229     }
230 }
231
232 sub key_exists {
233     my $self = shift;
234     my ($obj, $key) = @_;
235
236     # This will be a Reference sector
237     my $sector = $self->_load_sector( $obj->_base_offset )
238         or return '';
239
240     if ( $sector->staleness != $obj->_staleness ) {
241         return '';
242     }
243
244     my $data = $sector->get_data_for({
245         key_md5    => $self->_apply_digest( $key ),
246         allow_head => 1,
247     });
248
249     # exists() returns 1 or '' for true/false.
250     return $data ? 1 : '';
251 }
252
253 sub delete_key {
254     my $self = shift;
255     my ($obj, $key) = @_;
256
257     my $sector = $self->_load_sector( $obj->_base_offset )
258         or return;
259
260     if ( $sector->staleness != $obj->_staleness ) {
261         return;
262     }
263
264     return $sector->delete_key({
265         key_md5    => $self->_apply_digest( $key ),
266         allow_head => 0,
267     });
268 }
269
270 sub write_value {
271     my $self = shift;
272     my ($obj, $key, $value) = @_;
273
274     my $r = Scalar::Util::reftype( $value ) || '';
275     {
276         last if $r eq '';
277         last if $r eq 'HASH';
278         last if $r eq 'ARRAY';
279
280         DBM::Deep->_throw_error(
281             "Storage of references of type '$r' is not supported."
282         );
283     }
284
285     # This will be a Reference sector
286     my $sector = $self->_load_sector( $obj->_base_offset )
287         or DBM::Deep->_throw_error( "1: Cannot write to a deleted spot in DBM::Deep." );
288
289     if ( $sector->staleness != $obj->_staleness ) {
290         DBM::Deep->_throw_error( "2: Cannot write to a deleted spot in DBM::Deep." );
291     }
292
293     my ($class, $type);
294     if ( !defined $value ) {
295         $class = 'DBM::Deep::Engine::Sector::Null';
296     }
297     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
298         my $tmpvar;
299         if ( $r eq 'ARRAY' ) {
300             $tmpvar = tied @$value;
301         } elsif ( $r eq 'HASH' ) {
302             $tmpvar = tied %$value;
303         }
304
305         if ( $tmpvar ) {
306             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
307
308             unless ( $is_dbm_deep ) {
309                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
310             }
311
312             unless ( $tmpvar->_engine->storage == $self->storage ) {
313                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
314             }
315
316             # First, verify if we're storing the same thing to this spot. If we are, then
317             # this should be a no-op. -EJS, 2008-05-19
318             my $loc = $sector->get_data_location_for({
319                 key_md5 => $self->_apply_digest( $key ),
320                 allow_head => 1,
321             });
322
323             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
324                 return 1;
325             }
326
327             #XXX Can this use $loc?
328             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
329             $sector->write_data({
330                 key     => $key,
331                 key_md5 => $self->_apply_digest( $key ),
332                 value   => $value_sector,
333             });
334             $value_sector->increment_refcount;
335
336             return 1;
337         }
338
339         $class = 'DBM::Deep::Engine::Sector::Reference';
340         $type = substr( $r, 0, 1 );
341     }
342     else {
343         if ( tied($value) ) {
344             DBM::Deep->_throw_error( "Cannot store something that is tied." );
345         }
346         $class = 'DBM::Deep::Engine::Sector::Scalar';
347     }
348
349     # Create this after loading the reference sector in case something bad happens.
350     # This way, we won't allocate value sector(s) needlessly.
351     my $value_sector = $class->new({
352         engine => $self,
353         data   => $value,
354         type   => $type,
355     });
356
357     $sector->write_data({
358         key     => $key,
359         key_md5 => $self->_apply_digest( $key ),
360         value   => $value_sector,
361     });
362
363     # This code is to make sure we write all the values in the $value to the disk
364     # and to make sure all changes to $value after the assignment are reflected
365     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
366     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
367     # copy to a temp value.
368     if ( $r eq 'ARRAY' ) {
369         my @temp = @$value;
370         tie @$value, 'DBM::Deep', {
371             base_offset => $value_sector->offset,
372             staleness   => $value_sector->staleness,
373             storage     => $self->storage,
374             engine      => $self,
375         };
376         @$value = @temp;
377         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
378     }
379     elsif ( $r eq 'HASH' ) {
380         my %temp = %$value;
381         tie %$value, 'DBM::Deep', {
382             base_offset => $value_sector->offset,
383             staleness   => $value_sector->staleness,
384             storage     => $self->storage,
385             engine      => $self,
386         };
387
388         %$value = %temp;
389         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
390     }
391
392     return 1;
393 }
394
395 # XXX Add staleness here
396 sub get_next_key {
397     my $self = shift;
398     my ($obj, $prev_key) = @_;
399
400     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
401     unless ( $prev_key ) {
402         $obj->{iterator} = DBM::Deep::Iterator->new({
403             base_offset => $obj->_base_offset,
404             engine      => $self,
405         });
406     }
407
408     return $obj->{iterator}->get_next_key( $obj );
409 }
410
411 ################################################################################
412
413 sub setup_fh {
414     my $self = shift;
415     my ($obj) = @_;
416
417     return 1 if $obj->_base_offset;
418
419     my $header = DBM::Deep::Engine::Sector::FileHeader->new({
420         engine => $self,
421     });
422
423     # Creating a new file
424     if ( $header->is_new ) {
425         # 1) Create Array/Hash entry
426         my $sector = DBM::Deep::Engine::Sector::Reference->new({
427             engine => $self,
428             type   => $obj->_type,
429         });
430         $obj->{base_offset} = $sector->offset;
431         $obj->{staleness} = $sector->staleness;
432
433         $self->flush;
434     }
435     # Reading from an existing file
436     else {
437         $obj->{base_offset} = $header->size;
438         my $sector = DBM::Deep::Engine::Sector::Reference->new({
439             engine => $self,
440             offset => $obj->_base_offset,
441         });
442         unless ( $sector ) {
443             DBM::Deep->_throw_error("Corrupted file, no master index record");
444         }
445
446         unless ($obj->_type eq $sector->type) {
447             DBM::Deep->_throw_error("File type mismatch");
448         }
449
450         $obj->{staleness} = $sector->staleness;
451     }
452
453     $self->storage->set_inode;
454
455     return 1;
456 }
457
458 sub begin_work {
459     my $self = shift;
460     my ($obj) = @_;
461
462     if ( $self->trans_id ) {
463         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
464     }
465
466     my @slots = $self->read_txn_slots;
467     my $found;
468     for my $i ( 0 .. $#slots ) {
469         next if $slots[$i];
470
471         $slots[$i] = 1;
472         $self->set_trans_id( $i + 1 );
473         $found = 1;
474         last;
475     }
476     unless ( $found ) {
477         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
478     }
479     $self->write_txn_slots( @slots );
480
481     if ( !$self->trans_id ) {
482         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
483     }
484
485     return;
486 }
487
488 sub rollback {
489     my $self = shift;
490     my ($obj) = @_;
491
492     if ( !$self->trans_id ) {
493         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
494     }
495
496     # Each entry is the file location for a bucket that has a modification for
497     # this transaction. The entries need to be expunged.
498     foreach my $entry (@{ $self->get_entries } ) {
499         # Remove the entry here
500         my $read_loc = $entry
501           + $self->hash_size
502           + $self->byte_size
503           + $self->byte_size
504           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
505
506         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
507         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
508         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
509
510         if ( $data_loc > 1 ) {
511             $self->_load_sector( $data_loc )->free;
512         }
513     }
514
515     $self->clear_entries;
516
517     my @slots = $self->read_txn_slots;
518     $slots[$self->trans_id-1] = 0;
519     $self->write_txn_slots( @slots );
520     $self->inc_txn_staleness_counter( $self->trans_id );
521     $self->set_trans_id( 0 );
522
523     return 1;
524 }
525
526 sub commit {
527     my $self = shift;
528     my ($obj) = @_;
529
530     if ( !$self->trans_id ) {
531         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
532     }
533
534     foreach my $entry (@{ $self->get_entries } ) {
535         # Overwrite the entry in head with the entry in trans_id
536         my $base = $entry
537           + $self->hash_size
538           + $self->byte_size;
539
540         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
541         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
542
543         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
544         my $trans_loc = $self->storage->read_at(
545             $spot, $self->byte_size,
546         );
547
548         $self->storage->print_at( $base, $trans_loc );
549         $self->storage->print_at(
550             $spot,
551             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
552         );
553
554         if ( $head_loc > 1 ) {
555             $self->_load_sector( $head_loc )->free;
556         }
557     }
558
559     $self->clear_entries;
560
561     my @slots = $self->read_txn_slots;
562     $slots[$self->trans_id-1] = 0;
563     $self->write_txn_slots( @slots );
564     $self->inc_txn_staleness_counter( $self->trans_id );
565     $self->set_trans_id( 0 );
566
567     return 1;
568 }
569
570 sub read_txn_slots {
571     my $self = shift;
572     my $bl = $self->txn_bitfield_len;
573     my $num_bits = $bl * 8;
574     return split '', unpack( 'b'.$num_bits,
575         $self->storage->read_at(
576             $self->trans_loc, $bl,
577         )
578     );
579 }
580
581 sub write_txn_slots {
582     my $self = shift;
583     my $num_bits = $self->txn_bitfield_len * 8;
584     $self->storage->print_at( $self->trans_loc,
585         pack( 'b'.$num_bits, join('', @_) ),
586     );
587 }
588
589 sub get_running_txn_ids {
590     my $self = shift;
591     my @transactions = $self->read_txn_slots;
592     my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
593 }
594
595 sub get_txn_staleness_counter {
596     my $self = shift;
597     my ($trans_id) = @_;
598
599     # Hardcode staleness of 0 for the HEAD
600     return 0 unless $trans_id;
601
602     return unpack( $StP{$STALE_SIZE},
603         $self->storage->read_at(
604             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
605             $STALE_SIZE,
606         )
607     );
608 }
609
610 sub inc_txn_staleness_counter {
611     my $self = shift;
612     my ($trans_id) = @_;
613
614     # Hardcode staleness of 0 for the HEAD
615     return 0 unless $trans_id;
616
617     $self->storage->print_at(
618         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
619         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
620     );
621 }
622
623 sub get_entries {
624     my $self = shift;
625     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
626 }
627
628 sub add_entry {
629     my $self = shift;
630     my ($trans_id, $loc) = @_;
631
632     $self->{entries}{$trans_id} ||= {};
633     $self->{entries}{$trans_id}{$loc} = undef;
634 }
635
636 # If the buckets are being relocated because of a reindexing, the entries
637 # mechanism needs to be made aware of it.
638 sub reindex_entry {
639     my $self = shift;
640     my ($old_loc, $new_loc) = @_;
641
642     TRANS:
643     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
644         if ( exists $locs->{$old_loc} ) {
645             delete $locs->{$old_loc};
646             $locs->{$new_loc} = undef;
647             next TRANS;
648         }
649     }
650 }
651
652 sub clear_entries {
653     my $self = shift;
654     delete $self->{entries}{$self->trans_id};
655 }
656
657 ################################################################################
658
659 sub _load_sector {
660     my $self = shift;
661     my ($offset) = @_;
662
663     # Add a catch for offset of 0 or 1
664     return if !$offset || $offset <= 1;
665
666     unless ( exists $self->sector_cache->{ $offset } ) {
667         my $type = $self->storage->read_at( $offset, $self->SIG_SIZE );
668
669         # XXX Don't we want to do something more proactive here? -RobK, 2008-06-19
670         return if $type eq chr(0);
671
672         if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
673             $self->sector_cache->{$offset} = DBM::Deep::Engine::Sector::Reference->new({
674                 engine => $self,
675                 type   => $type,
676                 offset => $offset,
677             });
678         }
679         # XXX Don't we need key_md5 here?
680         elsif ( $type eq $self->SIG_BLIST ) {
681             $self->sector_cache->{$offset} = DBM::Deep::Engine::Sector::BucketList->new({
682                 engine => $self,
683                 type   => $type,
684                 offset => $offset,
685             });
686         }
687         elsif ( $type eq $self->SIG_INDEX ) {
688             $self->sector_cache->{$offset} = DBM::Deep::Engine::Sector::Index->new({
689                 engine => $self,
690                 type   => $type,
691                 offset => $offset,
692             });
693         }
694         elsif ( $type eq $self->SIG_NULL ) {
695             $self->sector_cache->{$offset} = DBM::Deep::Engine::Sector::Null->new({
696                 engine => $self,
697                 type   => $type,
698                 offset => $offset,
699             });
700         }
701         elsif ( $type eq $self->SIG_DATA ) {
702             $self->sector_cache->{$offset} = DBM::Deep::Engine::Sector::Scalar->new({
703                 engine => $self,
704                 type   => $type,
705                 offset => $offset,
706             });
707         }
708         # This was deleted from under us, so just return and let the caller figure it out.
709         elsif ( $type eq $self->SIG_FREE ) {
710             return;
711         }
712         else {
713             DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
714         }
715     }
716
717     return $self->sector_cache->{$offset};
718 }
719
720 sub _apply_digest {
721     my $self = shift;
722     return $self->{digest}->(@_);
723 }
724
725 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
726 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
727 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
728
729 sub _add_free_sector {
730     my $self = shift;
731     my ($multiple, $offset, $size) = @_;
732
733     my $chains_offset = $multiple * $self->byte_size;
734
735     my $storage = $self->storage;
736
737     # Increment staleness.
738     # XXX Can this increment+modulo be done by "&= 0x1" ?
739     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
740     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
741     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
742
743     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
744
745     $storage->print_at( $self->chains_loc + $chains_offset,
746         pack( $StP{$self->byte_size}, $offset ),
747     );
748
749     # Record the old head in the new sector after the signature and staleness counter
750     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
751 }
752
753 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
754 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
755 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
756
757 sub _request_sector {
758     my $self = shift;
759     my ($multiple, $size) = @_;
760
761     my $chains_offset = $multiple * $self->byte_size;
762
763     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
764     my $loc = unpack( $StP{$self->byte_size}, $old_head );
765
766     # We don't have any free sectors of the right size, so allocate a new one.
767     unless ( $loc ) {
768         my $offset = $self->storage->request_space( $size );
769
770         # Zero out the new sector. This also guarantees correct increases
771         # in the filesize.
772         $self->storage->print_at( $offset, chr(0) x $size );
773
774         return $offset;
775     }
776
777     # Read the new head after the signature and the staleness counter
778     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
779     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
780     $self->storage->print_at(
781         $loc + SIG_SIZE + $STALE_SIZE,
782         pack( $StP{$self->byte_size}, 0 ),
783     );
784
785     return $loc;
786 }
787
788 ################################################################################
789
790 sub sector_cache {
791     my $self = shift;
792     return $self->{sector_cache} ||= {};
793 }
794
795 sub clear_sector_cache {
796     my $self = shift;
797     $self->{sector_cache} = {};
798 }
799
800 sub dirty_sectors {
801     my $self = shift;
802     return $self->{dirty_sectors} ||= {};
803 }
804
805 sub clear_dirty_sectors {
806     my $self = shift;
807     $self->{dirty_sectors} = {};
808 }
809
810 sub add_dirty_sector {
811     my $self = shift;
812     my ($sector) = @_;
813
814 #    if ( exists $self->dirty_sectors->{ $sector->offset } ) {
815 #        DBM::Deep->_throw_error( "We have a duplicate sector!! " . $sector->offset );
816 #    }
817
818     $self->dirty_sectors->{ $sector->offset } = $sector;
819 }
820
821 sub flush {
822     my $self = shift;
823
824     my $sectors = $self->dirty_sectors;
825     for my $offset (sort { $a <=> $b } keys %{ $sectors }) {
826         $sectors->{$offset}->flush;
827     }
828
829     $self->clear_dirty_sectors;
830
831     $self->clear_sector_cache;
832 }
833
834 ################################################################################
835
836 sub lock_exclusive {
837     my $self = shift;
838     my ($obj) = @_;
839     return $self->storage->lock_exclusive( $obj );
840 }
841
842 sub lock_shared {
843     my $self = shift;
844     my ($obj) = @_;
845     return $self->storage->lock_shared( $obj );
846 }
847
848 sub unlock {
849     my $self = shift;
850     my ($obj) = @_;
851
852     my $rv = $self->storage->unlock( $obj );
853
854     $self->flush if $rv;
855
856     return $rv;
857 }
858
859 ################################################################################
860
861 sub storage     { $_[0]{storage} }
862 sub byte_size   { $_[0]{byte_size} }
863 sub hash_size   { $_[0]{hash_size} }
864 sub hash_chars  { $_[0]{hash_chars} }
865 sub num_txns    { $_[0]{num_txns} }
866 sub max_buckets { $_[0]{max_buckets} }
867 sub blank_md5   { chr(0) x $_[0]->hash_size }
868 sub data_sector_size { $_[0]{data_sector_size} }
869
870 # This is a calculated value
871 sub txn_bitfield_len {
872     my $self = shift;
873     unless ( exists $self->{txn_bitfield_len} ) {
874         my $temp = ($self->num_txns) / 8;
875         if ( $temp > int( $temp ) ) {
876             $temp = int( $temp ) + 1;
877         }
878         $self->{txn_bitfield_len} = $temp;
879     }
880     return $self->{txn_bitfield_len};
881 }
882
883 sub trans_id     { $_[0]{trans_id} }
884 sub set_trans_id { $_[0]{trans_id} = $_[1] }
885
886 sub trans_loc     { $_[0]{trans_loc} }
887 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
888
889 sub chains_loc     { $_[0]{chains_loc} }
890 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
891
892 sub cache       { $_[0]{cache} ||= {} }
893 sub clear_cache { %{$_[0]->cache} = () }
894
895 sub _dump_file {
896     my $self = shift;
897     $self->flush;
898
899     # Read the header
900     my $header_sector = DBM::Deep::Engine::Sector::FileHeader->new({
901         engine => $self,
902     });
903
904     my %types = (
905         0 => 'B',
906         1 => 'D',
907         2 => 'I',
908     );
909
910     my %sizes = (
911         'D' => $self->data_sector_size,
912         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
913         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
914     );
915
916     my $return = "";
917
918     # Filesize
919     $return .= "Size: " . (-s $self->storage->{fh}) . $/;
920
921     # Header values
922     $return .= "NumTxns: " . $self->num_txns . $/;
923
924     # Read the free sector chains
925     my %sectors;
926     foreach my $multiple ( 0 .. 2 ) {
927         $return .= "Chains($types{$multiple}):";
928         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
929         while ( 1 ) {
930             my $loc = unpack(
931                 $StP{$self->byte_size},
932                 $self->storage->read_at( $old_loc, $self->byte_size ),
933             );
934
935             # We're now out of free sectors of this kind.
936             unless ( $loc ) {
937                 last;
938             }
939
940             $sectors{ $types{$multiple} }{ $loc } = undef;
941             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
942             $return .= " $loc";
943         }
944         $return .= $/;
945     }
946
947     my $spot = $header_sector->size;
948     SECTOR:
949     while ( $spot < $self->storage->{end} ) {
950         # Read each sector in order.
951         my $sector = $self->_load_sector( $spot );
952         if ( !$sector ) {
953             # Find it in the free-sectors that were found already
954             foreach my $type ( keys %sectors ) {
955                 if ( exists $sectors{$type}{$spot} ) {
956                     my $size = $sizes{$type};
957                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
958                     $spot += $size;
959                     next SECTOR;
960                 }
961             }
962
963             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
964         }
965         else {
966             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
967             if ( $sector->type eq 'D' ) {
968                 $return .= ' ' . $sector->data;
969             }
970             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
971                 $return .= ' REF: ' . $sector->get_refcount;
972             }
973             elsif ( $sector->type eq 'B' ) {
974                 foreach my $bucket ( $sector->chopped_up ) {
975                     $return .= "\n    ";
976                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
977                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
978                     );
979                     my $l = unpack( $StP{$self->byte_size},
980                         substr( $bucket->[-1],
981                             $self->hash_size + $self->byte_size,
982                             $self->byte_size,
983                         ),
984                     );
985                     $return .= sprintf " %08d", $l;
986                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
987                         my $l = unpack( $StP{$self->byte_size},
988                             substr( $bucket->[-1],
989                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
990                                 $self->byte_size,
991                             ),
992                         );
993                         $return .= sprintf " %08d", $l;
994                     }
995                 }
996             }
997             $return .= $/;
998
999             $spot += $sector->size;
1000         }
1001     }
1002
1003     return $return;
1004 }
1005
1006 1;
1007 __END__