Committing failing test for freespace staleness
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine3.pm
1 package DBM::Deep::Engine3;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * Every method in here assumes that the storage has been appropriately
13 #   safeguarded. This can be anything from flock() to some sort of manual
14 #   mutex. But, it's the caller's responsability to make sure that this has
15 #   been done.
16
17 # Setup file and tag signatures.  These should never change.
18 sub SIG_FILE     () { 'DPDB' }
19 sub SIG_HEADER   () { 'h'    }
20 sub SIG_INTERNAL () { 'i'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_KEYS     () { 'K'    }
29 sub SIG_SIZE     () {  1     }
30
31 ################################################################################
32
33 # Please refer to the pack() documentation for further information
34 my %StP = (
35     1 => 'C', # Unsigned char value (no order specified, presumably ASCII)
36     2 => 'n', # Unsigned short in "network" (big-endian) order
37     4 => 'N', # Unsigned long in "network" (big-endian) order
38     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
39 );
40
41 sub new {
42     my $class = shift;
43     my ($args) = @_;
44
45     my $self = bless {
46         byte_size   => 4,
47
48         digest      => undef,
49         hash_size   => 16, # In bytes
50         max_buckets => 16,
51         num_txns    => 16, # HEAD plus 15 running txns
52         trans_id    => 0,  # Default to the HEAD
53
54         entries => {}, # This is the list of entries for transactions
55         storage => undef,
56     }, $class;
57
58     if ( defined $args->{pack_size} ) {
59         if ( lc $args->{pack_size} eq 'small' ) {
60             $args->{byte_size} = 2;
61         }
62         elsif ( lc $args->{pack_size} eq 'medium' ) {
63             $args->{byte_size} = 4;
64         }
65         elsif ( lc $args->{pack_size} eq 'large' ) {
66             $args->{byte_size} = 8;
67         }
68         else {
69             die "Unknown pack_size value: '$args->{pack_size}'\n";
70         }
71     }
72
73     # Grab the parameters we want to use
74     foreach my $param ( keys %$self ) {
75         next unless exists $args->{$param};
76         $self->{$param} = $args->{$param};
77     }
78
79     $self->{byte_pack} = $StP{ $self->byte_size };
80
81     ##
82     # Number of buckets per blist before another level of indexing is
83     # done. Increase this value for slightly greater speed, but larger database
84     # files. DO NOT decrease this value below 16, due to risk of recursive
85     # reindex overrun.
86     ##
87     if ( $self->{max_buckets} < 16 ) {
88         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
89         $self->{max_buckets} = 16;
90     }
91
92     if ( !$self->{digest} ) {
93         require Digest::MD5;
94         $self->{digest} = \&Digest::MD5::md5;
95     }
96
97     return $self;
98 }
99
100 ################################################################################
101
102 sub read_value {
103     my $self = shift;
104     my ($obj, $key) = @_;
105
106     # This will be a Reference sector
107     my $sector = $self->_load_sector( $obj->_base_offset )
108         or return;
109
110     my $key_md5 = $self->_apply_digest( $key );
111
112     my $value_sector = $sector->get_data_for({
113         key_md5    => $key_md5,
114         allow_head => 1,
115     });
116
117     unless ( $value_sector ) {
118         $value_sector = DBM::Deep::Engine::Sector::Null->new({
119             engine => $self,
120             data   => undef,
121         });
122
123         $sector->write_data({
124             key_md5 => $key_md5,
125             key     => $key,
126             value   => $value_sector,
127         });
128     }
129
130     return $value_sector->data;
131 }
132
133 sub get_classname {
134     my $self = shift;
135     my ($obj) = @_;
136
137     # This will be a Reference sector
138     my $sector = $self->_load_sector( $obj->_base_offset )
139         or die "How did get_classname fail (no sector for '$obj')?!\n";
140
141     return $sector->get_classname;
142 }
143
144 sub key_exists {
145     my $self = shift;
146     my ($obj, $key) = @_;
147
148     # This will be a Reference sector
149     my $sector = $self->_load_sector( $obj->_base_offset )
150         or return '';
151
152     my $data = $sector->get_data_for({
153         key_md5    => $self->_apply_digest( $key ),
154         allow_head => 1,
155     });
156
157     # exists() returns 1 or '' for true/false.
158     return $data ? 1 : '';
159 }
160
161 sub delete_key {
162     my $self = shift;
163     my ($obj, $key) = @_;
164
165     my $sector = $self->_load_sector( $obj->_base_offset )
166         or return;
167
168     return $sector->delete_key({
169         key_md5    => $self->_apply_digest( $key ),
170         allow_head => 0,
171     });
172 }
173
174 sub write_value {
175     my $self = shift;
176     my ($obj, $key, $value) = @_;
177
178     my $r = Scalar::Util::reftype( $value ) || '';
179     {
180         last if $r eq '';
181         last if $r eq 'HASH';
182         last if $r eq 'ARRAY';
183
184         DBM::Deep->_throw_error(
185             "Storage of references of type '$r' is not supported."
186         );
187     }
188
189     my ($class, $type);
190     if ( !defined $value ) {
191         $class = 'DBM::Deep::Engine::Sector::Null';
192     }
193     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
194         if ( $r eq 'ARRAY' && tied(@$value) ) {
195             DBM::Deep->_throw_error( "Cannot store something that is tied." );
196         }
197         if ( $r eq 'HASH' && tied(%$value) ) {
198             DBM::Deep->_throw_error( "Cannot store something that is tied." );
199         }
200         $class = 'DBM::Deep::Engine::Sector::Reference';
201         $type = substr( $r, 0, 1 );
202     }
203     else {
204         $class = 'DBM::Deep::Engine::Sector::Scalar';
205     }
206
207     # This will be a Reference sector
208     my $sector = $self->_load_sector( $obj->_base_offset )
209         or die "Cannot write to a deleted spot in DBM::Deep.\n";
210
211     # Create this after loading the reference sector in case something bad happens.
212     # This way, we won't allocate value sector(s) needlessly.
213     my $value_sector = $class->new({
214         engine => $self,
215         data   => $value,
216         type   => $type,
217     });
218
219     $sector->write_data({
220         key     => $key,
221         key_md5 => $self->_apply_digest( $key ),
222         value   => $value_sector,
223     });
224
225     # This code is to make sure we write all the values in the $value to the disk
226     # and to make sure all changes to $value after the assignment are reflected
227     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
228     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
229     # copy to a temp value.
230     if ( $r eq 'ARRAY' ) {
231         my @temp = @$value;
232         tie @$value, 'DBM::Deep', {
233             base_offset => $value_sector->offset,
234             storage     => $self->storage,
235             engine      => $self,
236         };
237         @$value = @temp;
238         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
239     }
240     elsif ( $r eq 'HASH' ) {
241         my %temp = %$value;
242         tie %$value, 'DBM::Deep', {
243             base_offset => $value_sector->offset,
244             storage     => $self->storage,
245             engine      => $self,
246         };
247
248         %$value = %temp;
249         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
250     }
251
252     return 1;
253 }
254
255 sub get_next_key {
256     my $self = shift;
257     my ($obj, $prev_key) = @_;
258
259     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
260     unless ( $prev_key ) {
261         $obj->{iterator} = DBM::Deep::Engine::Iterator->new({
262             base_offset => $obj->_base_offset,
263             engine      => $self,
264         });
265     }
266
267     return $obj->{iterator}->get_next_key;
268 }
269
270 ################################################################################
271
272 sub setup_fh {
273     my $self = shift;
274     my ($obj) = @_;
275
276     # We're opening the file.
277     unless ( $obj->_base_offset ) {
278         my $bytes_read = $self->_read_file_header;
279
280         # Creating a new file
281         unless ( $bytes_read ) {
282             $self->_write_file_header;
283
284             # 1) Create Array/Hash entry
285             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
286                 engine => $self,
287                 type   => $obj->_type,
288             });
289             $obj->{base_offset} = $initial_reference->offset;
290
291             $self->storage->flush;
292         }
293         # Reading from an existing file
294         else {
295             $obj->{base_offset} = $bytes_read;
296             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
297                 engine => $self,
298                 offset => $obj->_base_offset,
299             });
300             unless ( $initial_reference ) {
301                 DBM::Deep->_throw_error("Corrupted file, no master index record");
302             }
303
304             unless ($obj->_type eq $initial_reference->type) {
305                 DBM::Deep->_throw_error("File type mismatch");
306             }
307         }
308     }
309
310     return 1;
311 }
312
313 sub begin_work {
314     my $self = shift;
315     my ($obj) = @_;
316
317     if ( $self->trans_id ) {
318         DBM::Deep->_throw_error( "Cannot begin_work within a transaction" );
319     }
320
321     my @slots = $self->read_txn_slots;
322     for my $i ( 1 .. @slots ) {
323         next if $slots[$i];
324         $slots[$i] = 1;
325         $self->set_trans_id( $i );
326         last;
327     }
328     $self->write_txn_slots( @slots );
329
330     if ( !$self->trans_id ) {
331         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
332     }
333
334     return;
335 }
336
337 sub rollback {
338     my $self = shift;
339     my ($obj) = @_;
340
341     if ( !$self->trans_id ) {
342         DBM::Deep->_throw_error( "Cannot rollback without a transaction" );
343     }
344
345     # Each entry is the file location for a bucket that has a modification for
346     # this transaction. The entries need to be expunged.
347     foreach my $entry (@{ $self->get_entries } ) {
348         # Remove the entry here
349         my $read_loc = $entry
350           + $self->hash_size
351           + $self->byte_size
352           + $self->trans_id * ( $self->byte_size + 4 );
353
354         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
355         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
356         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
357
358         if ( $data_loc > 1 ) {
359             $self->_load_sector( $data_loc )->free;
360         }
361     }
362
363     $self->clear_entries;
364
365     my @slots = $self->read_txn_slots;
366     $slots[$self->trans_id] = 0;
367     $self->write_txn_slots( @slots );
368     $self->inc_txn_staleness_counter( $self->trans_id );
369     $self->set_trans_id( 0 );
370
371     return 1;
372 }
373
374 sub commit {
375     my $self = shift;
376     my ($obj) = @_;
377
378     if ( !$self->trans_id ) {
379         DBM::Deep->_throw_error( "Cannot commit without a transaction" );
380     }
381
382     foreach my $entry (@{ $self->get_entries } ) {
383         # Overwrite the entry in head with the entry in trans_id
384         my $base = $entry
385           + $self->hash_size
386           + $self->byte_size;
387
388         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
389         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
390         my $trans_loc = $self->storage->read_at(
391             $base + $self->trans_id * ( $self->byte_size + 4 ), $self->byte_size,
392         );
393
394         $self->storage->print_at( $base, $trans_loc );
395         $self->storage->print_at(
396             $base + $self->trans_id * ( $self->byte_size + 4 ),
397             pack( $StP{$self->byte_size} . ' N', (0) x 2 ),
398         );
399
400         if ( $head_loc > 1 ) {
401             $self->_load_sector( $head_loc )->free;
402         }
403     }
404
405     $self->clear_entries;
406
407     my @slots = $self->read_txn_slots;
408     $slots[$self->trans_id] = 0;
409     $self->write_txn_slots( @slots );
410     $self->inc_txn_staleness_counter( $self->trans_id );
411     $self->set_trans_id( 0 );
412
413     return 1;
414 }
415
416 sub read_txn_slots {
417     my $self = shift;
418     return split '', unpack( 'b32',
419         $self->storage->read_at(
420             $self->trans_loc, 4,
421         )
422     );
423 }
424
425 sub write_txn_slots {
426     my $self = shift;
427     $self->storage->print_at( $self->trans_loc,
428         pack( 'b32', join('', @_) ),
429     );
430 }
431
432 sub get_running_txn_ids {
433     my $self = shift;
434     my @transactions = $self->read_txn_slots;
435     my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
436 }
437
438 sub get_txn_staleness_counter {
439     my $self = shift;
440     my ($trans_id) = @_;
441
442     # Hardcode staleness of 0 for the HEAD
443     return 0 unless $trans_id;
444
445     my $x = unpack( 'N',
446         $self->storage->read_at(
447             $self->trans_loc + 4 * $trans_id,
448             4,
449         )
450     );
451     return $x;
452 }
453
454 sub inc_txn_staleness_counter {
455     my $self = shift;
456     my ($trans_id) = @_;
457
458     # Hardcode staleness of 0 for the HEAD
459     return unless $trans_id;
460
461     $self->storage->print_at(
462         $self->trans_loc + 4 * $trans_id,
463         pack( 'N', $self->get_txn_staleness_counter( $trans_id ) + 1 ),
464     );
465 }
466
467 sub get_entries {
468     my $self = shift;
469     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
470 }
471
472 sub add_entry {
473     my $self = shift;
474     my ($trans_id, $loc) = @_;
475
476     $self->{entries}{$trans_id} ||= {};
477     $self->{entries}{$trans_id}{$loc} = undef;
478 }
479
480 sub clear_entries {
481     my $self = shift;
482     delete $self->{entries}{$self->trans_id};
483 }
484
485 ################################################################################
486
487 {
488     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
489
490     sub _write_file_header {
491         my $self = shift;
492
493         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 2 * $self->byte_size;
494
495         my $loc = $self->storage->request_space( $header_fixed + $header_var );
496
497         $self->storage->print_at( $loc,
498             SIG_FILE,
499             SIG_HEADER,
500             pack('N', 1),           # header version - at this point, we're at 9 bytes
501             pack('N', $header_var), # header size
502             # --- Above is $header_fixed. Below is $header_var
503             pack('C', $self->byte_size),
504             pack('C', $self->max_buckets),
505             pack('N', 0 ),                   # Transaction activeness bitfield
506             pack('N' . $self->num_txns, 0 x $self->num_txns ), # Transaction staleness counters
507             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
508             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
509         );
510
511         $self->set_trans_loc( $header_fixed + 2 );
512         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
513
514         return;
515     }
516
517     sub _read_file_header {
518         my $self = shift;
519
520         my $buffer = $self->storage->read_at( 0, $header_fixed );
521         return unless length($buffer);
522
523         my ($file_signature, $sig_header, $header_version, $size) = unpack(
524             'A4 A N N', $buffer
525         );
526
527         unless ( $file_signature eq SIG_FILE ) {
528             $self->storage->close;
529             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
530         }
531
532         unless ( $sig_header eq SIG_HEADER ) {
533             $self->storage->close;
534             DBM::Deep->_throw_error( "Old file version found." );
535         }
536
537         my $buffer2 = $self->storage->read_at( undef, $size );
538         my @values = unpack( 'C C', $buffer2 );
539
540         $self->set_trans_loc( $header_fixed + 2 );
541         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
542
543         if ( @values < 2 || grep { !defined } @values ) {
544             $self->storage->close;
545             DBM::Deep->_throw_error("Corrupted file - bad header");
546         }
547
548         #XXX Add warnings if values weren't set right
549         @{$self}{qw(byte_size max_buckets)} = @values;
550
551         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 2 * $self->byte_size;
552         unless ( $size eq $header_var ) {
553             $self->storage->close;
554             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
555         }
556
557         return length($buffer) + length($buffer2);
558     }
559 }
560
561 sub _load_sector {
562     my $self = shift;
563     my ($offset) = @_;
564
565     my $type = $self->storage->read_at( $offset, 1 );
566     return if $type eq chr(0);
567
568     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
569         return DBM::Deep::Engine::Sector::Reference->new({
570             engine => $self,
571             type   => $type,
572             offset => $offset,
573         });
574     }
575     # XXX Don't we need key_md5 here?
576     elsif ( $type eq $self->SIG_BLIST ) {
577         return DBM::Deep::Engine::Sector::BucketList->new({
578             engine => $self,
579             type   => $type,
580             offset => $offset,
581         });
582     }
583     elsif ( $type eq $self->SIG_NULL ) {
584         return DBM::Deep::Engine::Sector::Null->new({
585             engine => $self,
586             type   => $type,
587             offset => $offset,
588         });
589     }
590     elsif ( $type eq $self->SIG_DATA ) {
591         return DBM::Deep::Engine::Sector::Scalar->new({
592             engine => $self,
593             type   => $type,
594             offset => $offset,
595         });
596     }
597     # This was deleted from under us, so just return and let the caller figure it out.
598     elsif ( $type eq $self->SIG_FREE ) {
599         return;
600     }
601
602     die "'$offset': Don't know what to do with type '$type'\n";
603 }
604
605 sub _apply_digest {
606     my $self = shift;
607     return $self->{digest}->(@_);
608 }
609
610 sub _add_free_sector {
611     my $self = shift;
612     my ($offset, $size) = @_;
613
614     my $chains_offset;
615     # Data sector
616     if ( $size == 256 ) {
617         $chains_offset = $self->byte_size;
618     }
619     # Blist sector
620     else {
621         $chains_offset = 0;
622     }
623
624     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
625
626     $self->storage->print_at( $self->chains_loc + $chains_offset,
627         pack( $StP{$self->byte_size}, $offset ),
628     );
629
630     # Record the old head in the new sector after the signature
631     $self->storage->print_at( $offset + 1, $old_head );
632 }
633
634 sub _request_sector {
635     my $self = shift;
636     my ($size) = @_;
637
638     my $chains_offset;
639     # Data sector
640     if ( $size == 256 ) {
641         $chains_offset = $self->byte_size;
642     }
643     # Blist sector
644     else {
645         $chains_offset = 0;
646     }
647
648     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
649     my $loc = unpack( $StP{$self->byte_size}, $old_head );
650
651     # We don't have any free sectors of the right size, so allocate a new one.
652     unless ( $loc ) {
653         return $self->storage->request_space( $size );
654     }
655
656     my $new_head = $self->storage->read_at( $loc + 1, $self->byte_size );
657     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
658
659     return $loc;
660 }
661
662 ################################################################################
663
664 sub storage     { $_[0]{storage} }
665 sub byte_size   { $_[0]{byte_size} }
666 sub hash_size   { $_[0]{hash_size} }
667 sub num_txns    { $_[0]{num_txns} }
668 sub max_buckets { $_[0]{max_buckets} }
669 sub blank_md5   { chr(0) x $_[0]->hash_size }
670
671 sub trans_id     { $_[0]{trans_id} }
672 sub set_trans_id { $_[0]{trans_id} = $_[1] }
673
674 sub trans_loc     { $_[0]{trans_loc} }
675 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
676
677 sub chains_loc     { $_[0]{chains_loc} }
678 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
679
680 ################################################################################
681
682 package DBM::Deep::Engine::Iterator;
683
684 sub new {
685     my $class = shift;
686     my ($args) = @_;
687
688     my $self = bless {
689         breadcrumbs => [],
690         engine      => $args->{engine},
691         base_offset => $args->{base_offset},
692     }, $class;
693
694     Scalar::Util::weaken( $self->{engine} );
695
696     return $self;
697 }
698
699 sub reset {
700     my $self = shift;
701     $self->{breadcrumbs} = [];
702 }
703
704 sub get_next_key {
705     my $self = shift;
706
707     my $crumbs = $self->{breadcrumbs};
708
709     unless ( @$crumbs ) {
710         # This will be a Reference sector
711         my $sector = $self->{engine}->_load_sector( $self->{base_offset} )
712             # or die "Iterator: How did this fail (no ref sector for '$self->{base_offset}')?!\n";
713             # If no sector is found, thist must have been deleted from under us.
714             or return;
715         push @$crumbs, [ $sector->get_blist_loc, 0 ];
716     }
717
718     my $key;
719     while ( 1 ) {
720         my ($offset, $idx) = @{ $crumbs->[-1] };
721         unless ( $offset ) {
722             $self->reset;
723             last;
724         }
725
726         if ( $idx >= $self->{engine}->max_buckets ) {
727             $self->reset;
728             last;
729         }
730
731         my $sector = $self->{engine}->_load_sector( $offset )
732             or die "Iterator: How did this fail (no blist sector for '$offset')?!\n";
733
734         #XXX Think this through!
735         my $loc =  $sector->get_data_location_for({
736             idx => $idx,
737             allow_head => 1,
738         });
739         unless ( $loc ) {
740             $crumbs->[-1][1]++;
741             next;
742         }
743
744         my $key_sector = $sector->get_key_for( $idx );
745         unless ( $key_sector ) {
746             $self->reset;
747             last;
748         }
749
750         $crumbs->[-1][1]++;
751         $key = $key_sector->data;
752         last;
753     }
754
755     return $key;
756 }
757
758 package DBM::Deep::Engine::Sector;
759
760 sub new {
761     my $self = bless $_[1], $_[0];
762     Scalar::Util::weaken( $self->{engine} );
763     $self->_init;
764     return $self;
765 }
766 sub _init {}
767 sub clone { die "Must be implemented in the child class" }
768
769 sub engine { $_[0]{engine} }
770 sub offset { $_[0]{offset} }
771 sub type   { $_[0]{type} }
772
773 sub free {
774     my $self = shift;
775
776     $self->engine->storage->print_at( $self->offset,
777         $self->engine->SIG_FREE,
778         chr(0) x ($self->size - 1),
779     );
780
781     $self->engine->_add_free_sector(
782         $self->offset, $self->size,
783     );
784
785     return;
786 }
787
788 package DBM::Deep::Engine::Sector::Data;
789
790 our @ISA = qw( DBM::Deep::Engine::Sector );
791
792 # This is in bytes
793 sub size { return 256 }
794
795 sub clone {
796     my $self = shift;
797     return ref($self)->new({
798         engine => $self->engine,
799         data   => $self->data,
800         type   => $self->type,
801     });
802 }
803
804 package DBM::Deep::Engine::Sector::Scalar;
805
806 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
807
808 sub free {
809     my $self = shift;
810
811     my $chain_loc = $self->chain_loc;
812
813     $self->SUPER::free();
814
815     if ( $chain_loc ) {
816         $self->engine->_load_sector( $chain_loc )->free;
817     }
818
819     return;
820 }
821
822 sub type { $_[0]{engine}->SIG_DATA }
823 sub _init {
824     my $self = shift;
825
826     my $engine = $self->engine;
827
828     unless ( $self->offset ) {
829         my $data_section = $self->size - 3 - 1 * $engine->byte_size;
830
831         $self->{offset} = $engine->_request_sector( $self->size );
832
833         my $data = delete $self->{data};
834         my $dlen = length $data;
835         my $continue = 1;
836         my $curr_offset = $self->offset;
837         while ( $continue ) {
838
839             my $next_offset = 0;
840
841             my ($leftover, $this_len, $chunk);
842             if ( $dlen > $data_section ) {
843                 $leftover = 0;
844                 $this_len = $data_section;
845                 $chunk = substr( $data, 0, $this_len );
846
847                 $dlen -= $data_section;
848                 $next_offset = $engine->_request_sector( $self->size );
849                 $data = substr( $data, $this_len );
850             }
851             else {
852                 $leftover = $data_section - $dlen;
853                 $this_len = $dlen;
854                 $chunk = $data;
855
856                 $continue = 0;
857             }
858
859             $engine->storage->print_at( $curr_offset,
860                 $self->type,                                     # Sector type
861                 pack( $StP{1}, 0 ),                              # Recycled counter
862                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
863                 pack( $StP{1}, $this_len ),                      # Data length
864                 $chunk,                                          # Data to be stored in this sector
865                 chr(0) x $leftover,                              # Zero-fill the rest
866             );
867
868             $curr_offset = $next_offset;
869         }
870
871         return;
872     }
873 }
874
875 sub data_length {
876     my $self = shift;
877
878     my $buffer = $self->engine->storage->read_at(
879         $self->offset + 2 + $self->engine->byte_size, 1
880     );
881
882     return unpack( $StP{1}, $buffer );
883 }
884
885 sub chain_loc {
886     my $self = shift;
887     my $chain_loc = $self->engine->storage->read_at(
888         $self->offset + 2, $self->engine->byte_size,
889     );
890     return unpack( $StP{$self->engine->byte_size}, $chain_loc );
891 }
892
893 sub data {
894     my $self = shift;
895
896     my $data;
897     while ( 1 ) {
898         my $chain_loc = $self->chain_loc;
899
900         $data .= $self->engine->storage->read_at(
901             $self->offset + 2 + $self->engine->byte_size + 1, $self->data_length,
902         );
903
904         last unless $chain_loc;
905
906         $self = $self->engine->_load_sector( $chain_loc );
907     }
908
909     return $data;
910 }
911
912 package DBM::Deep::Engine::Sector::Null;
913
914 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
915
916 sub type { $_[0]{engine}->SIG_NULL }
917 sub data_length { 0 }
918 sub data { return }
919
920 sub _init {
921     my $self = shift;
922
923     my $engine = $self->engine;
924
925     unless ( $self->offset ) {
926         my $leftover = $self->size - 3 - 1 * $engine->byte_size;
927
928         $self->{offset} = $engine->_request_sector( $self->size );
929         $engine->storage->print_at( $self->offset,
930             $self->type,                          # Sector type
931             pack( $StP{1}, 0 ),                   # Recycled counter
932             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
933             pack( $StP{1}, $self->data_length ),  # Data length
934             chr(0) x $leftover,                   # Zero-fill the rest
935         );
936
937         return;
938     }
939 }
940
941 package DBM::Deep::Engine::Sector::Reference;
942
943 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
944
945 sub _init {
946     my $self = shift;
947
948     my $engine = $self->engine;
949
950     unless ( $self->offset ) {
951         my $classname = Scalar::Util::blessed( delete $self->{data} );
952         my $leftover = $self->size - 2 - 2 * $engine->byte_size;
953
954         my $class_offset = 0;
955         if ( defined $classname ) {
956             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
957                 engine => $self->engine,
958                 data   => $classname,
959             });
960             $class_offset = $class_sector->offset;
961         }
962
963         $self->{offset} = $engine->_request_sector( $self->size );
964         $engine->storage->print_at( $self->offset,
965             $self->type,                                     # Sector type
966             pack( $StP{1}, 0 ),                              # Recycled counter
967             pack( $StP{$engine->byte_size}, 0 ),             # Index/BList loc
968             pack( $StP{$engine->byte_size}, $class_offset ), # Classname loc
969             chr(0) x $leftover,                              # Zero-fill the rest
970         );
971
972         return;
973     }
974
975     $self->{type} = $engine->storage->read_at( $self->offset, 1 );
976
977     return;
978 }
979
980 sub get_data_for {
981     my $self = shift;
982     my ($args) = @_;
983
984     # Assume that the head is not allowed unless otherwise specified.
985     $args->{allow_head} = 0 unless exists $args->{allow_head};
986
987     # Assume we don't create a new blist location unless otherwise specified.
988     $args->{create} = 0 unless exists $args->{create};
989
990     my $blist = $self->get_bucket_list({
991         key_md5 => $args->{key_md5},
992         create  => $args->{create},
993     });
994     return unless $blist && $blist->{found};
995
996     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
997     # is whether or not this transaction has this key. That's part of the next
998     # function call.
999     my $location = $blist->get_data_location_for({
1000         allow_head => $args->{allow_head},
1001     }) or return;
1002
1003     return $self->engine->_load_sector( $location );
1004 }
1005
1006 sub write_data {
1007     my $self = shift;
1008     my ($args) = @_;
1009
1010     my $blist = $self->get_bucket_list({
1011         key_md5 => $args->{key_md5},
1012         create  => 1,
1013     }) or die "How did write_data fail (no blist)?!\n";
1014
1015     # Handle any transactional bookkeeping.
1016     if ( $self->engine->trans_id ) {
1017         if ( ! $blist->{found} ) {
1018             $blist->mark_deleted({
1019                 trans_id => 0,
1020             });
1021         }
1022     }
1023     else {
1024         my @trans_ids = $self->engine->get_running_txn_ids;
1025         if ( $blist->{found} ) {
1026             if ( @trans_ids ) {
1027                 my $old_value = $blist->get_data_for;
1028                 foreach my $other_trans_id ( @trans_ids ) {
1029                     next if $blist->get_data_location_for({
1030                         trans_id   => $other_trans_id,
1031                         allow_head => 0,
1032                     });
1033                     $blist->write_md5({
1034                         trans_id => $other_trans_id,
1035                         key      => $args->{key},
1036                         key_md5  => $args->{key_md5},
1037                         value    => $old_value->clone,
1038                     });
1039                 }
1040             }
1041         }
1042         else {
1043             if ( @trans_ids ) {
1044                 foreach my $other_trans_id ( @trans_ids ) {
1045                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1046                     $blist->mark_deleted({
1047                         trans_id => $other_trans_id,
1048                     });
1049                 }
1050             }
1051         }
1052     }
1053
1054     #XXX Is this safe to do transactionally?
1055     # Free the place we're about to write to.
1056     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1057         $blist->get_data_for({ allow_head => 0 })->free;
1058     }
1059
1060     $blist->write_md5({
1061         key      => $args->{key},
1062         key_md5  => $args->{key_md5},
1063         value    => $args->{value},
1064     });
1065 }
1066
1067 sub delete_key {
1068     my $self = shift;
1069     my ($args) = @_;
1070
1071     # XXX What should happen if this fails?
1072     my $blist = $self->get_bucket_list({
1073         key_md5 => $args->{key_md5},
1074     }) or die "How did delete_key fail (no blist)?!\n";
1075
1076     # Save the location so that we can free the data
1077     my $location = $blist->get_data_location_for({
1078         allow_head => 0,
1079     });
1080     my $old_value = $location && $self->engine->_load_sector( $location );
1081
1082     if ( $self->engine->trans_id == 0 ) {
1083         my @trans_ids = $self->engine->get_running_txn_ids;
1084         if ( @trans_ids ) {
1085             foreach my $other_trans_id ( @trans_ids ) {
1086                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1087                 $blist->write_md5({
1088                     trans_id => $other_trans_id,
1089                     key      => $args->{key},
1090                     key_md5  => $args->{key_md5},
1091                     value    => $old_value->clone,
1092                 });
1093             }
1094         }
1095     }
1096
1097     $blist->mark_deleted( $args );
1098
1099     my $data;
1100     if ( $old_value ) {
1101         $data = $old_value->data;
1102         $old_value->free;
1103     }
1104
1105     return $data;
1106 }
1107
1108 sub get_blist_loc {
1109     my $self = shift;
1110
1111     my $e = $self->engine;
1112     my $blist_loc = $e->storage->read_at( $self->offset + 2, $e->byte_size );
1113     return unpack( $StP{$e->byte_size}, $blist_loc );
1114 }
1115
1116 sub get_bucket_list {
1117     my $self = shift;
1118     my ($args) = @_;
1119     $args ||= {};
1120
1121     # XXX Add in check here for recycling?
1122
1123     my $engine = $self->engine;
1124
1125     my $blist_loc = $self->get_blist_loc;
1126
1127     # There's no index or blist yet
1128     unless ( $blist_loc ) {
1129         return unless $args->{create};
1130
1131         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1132             engine  => $engine,
1133             key_md5 => $args->{key_md5},
1134         });
1135
1136         $engine->storage->print_at( $self->offset + 2,
1137             pack( $StP{$engine->byte_size}, $blist->offset ),
1138         );
1139
1140         return $blist;
1141     }
1142
1143     return DBM::Deep::Engine::Sector::BucketList->new({
1144         engine  => $engine,
1145         offset  => $blist_loc,
1146         key_md5 => $args->{key_md5},
1147     });
1148 }
1149
1150 sub get_classname {
1151     my $self = shift;
1152
1153     my $class_offset = $self->engine->storage->read_at(
1154         $self->offset + 2 + 1 * $self->engine->byte_size, $self->engine->byte_size,
1155     );
1156     $class_offset = unpack ( $StP{$self->engine->byte_size}, $class_offset );
1157
1158     return unless $class_offset;
1159
1160     return $self->engine->_load_sector( $class_offset )->data;
1161 }
1162
1163 sub data {
1164     my $self = shift;
1165
1166     my $new_obj = DBM::Deep->new({
1167         type        => $self->type,
1168         base_offset => $self->offset,
1169         storage     => $self->engine->storage,
1170         engine      => $self->engine,
1171     });
1172
1173     if ( $self->engine->storage->{autobless} ) {
1174         my $classname = $self->get_classname;
1175         if ( defined $classname ) {
1176             bless $new_obj, $classname;
1177         }
1178     }
1179
1180     return $new_obj;
1181 }
1182
1183 package DBM::Deep::Engine::Sector::BucketList;
1184
1185 our @ISA = qw( DBM::Deep::Engine::Sector );
1186
1187 sub _init {
1188     my $self = shift;
1189
1190     my $engine = $self->engine;
1191
1192     unless ( $self->offset ) {
1193         my $leftover = $self->size - $self->base_size;
1194
1195         $self->{offset} = $engine->_request_sector( $self->size );
1196         $engine->storage->print_at( $self->offset,
1197             $engine->SIG_BLIST, # Sector type
1198             pack( $StP{1}, 0 ), # Recycled counter
1199             chr(0) x $leftover, # Zero-fill the data
1200         );
1201     }
1202
1203     if ( $self->{key_md5} ) {
1204         $self->find_md5;
1205     }
1206
1207     return $self;
1208 }
1209
1210 sub base_size { 1 + 1 } # Sig + recycled counter
1211
1212 sub size {
1213     my $self = shift;
1214     unless ( $self->{size} ) {
1215         my $e = $self->engine;
1216         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size; # Base + numbuckets * bucketsize
1217     }
1218     return $self->{size};
1219 }
1220
1221 sub bucket_size {
1222     my $self = shift;
1223     unless ( $self->{bucket_size} ) {
1224         my $e = $self->engine;
1225         # Key + head (location) + transactions (location + staleness-counter)
1226         my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 );
1227         $self->{bucket_size} = $e->hash_size + $location_size;
1228     }
1229     return $self->{bucket_size};
1230 }
1231
1232 sub has_md5 {
1233     my $self = shift;
1234     unless ( exists $self->{found} ) {
1235         $self->find_md5;
1236     }
1237     return $self->{found};
1238 }
1239
1240 sub find_md5 {
1241     my $self = shift;
1242
1243     $self->{found} = undef;
1244     $self->{idx}   = -1;
1245
1246     # If we don't have an MD5, then what are we supposed to do?
1247     unless ( exists $self->{key_md5} ) {
1248         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1249     }
1250
1251     my $e = $self->engine;
1252     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1253         my $potential = $e->storage->read_at(
1254             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1255         );
1256
1257         if ( $potential eq $e->blank_md5 ) {
1258             $self->{idx} = $idx;
1259             return;
1260         }
1261
1262         if ( $potential eq $self->{key_md5} ) {
1263             $self->{found} = 1;
1264             $self->{idx} = $idx;
1265             return;
1266         }
1267     }
1268
1269     return;
1270 }
1271
1272 sub write_md5 {
1273     my $self = shift;
1274     my ($args) = @_;
1275
1276     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1277     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1278     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1279
1280     my $engine = $self->engine;
1281
1282     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1283
1284     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1285     $engine->add_entry( $args->{trans_id}, $spot );
1286
1287     unless ($self->{found}) {
1288         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1289             engine => $engine,
1290             data   => $args->{key},
1291         });
1292
1293         $engine->storage->print_at( $spot,
1294             $args->{key_md5},
1295             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1296         );
1297     }
1298
1299     my $loc = $spot
1300       + $engine->hash_size
1301       + $engine->byte_size
1302       + $args->{trans_id} * ( $engine->byte_size + 4 );
1303
1304     $engine->storage->print_at( $loc,
1305         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1306         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1307     );
1308 }
1309
1310 sub mark_deleted {
1311     my $self = shift;
1312     my ($args) = @_;
1313     $args ||= {};
1314
1315     my $engine = $self->engine;
1316
1317     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1318
1319     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1320     $engine->add_entry( $args->{trans_id}, $spot );
1321
1322     my $loc = $spot
1323       + $engine->hash_size
1324       + $engine->byte_size
1325       + $args->{trans_id} * ( $engine->byte_size + 4 );
1326
1327     $engine->storage->print_at( $loc,
1328         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1329         pack( 'N', $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1330     );
1331 }
1332
1333 sub delete_md5 {
1334     my $self = shift;
1335     my ($args) = @_;
1336
1337     my $engine = $self->engine;
1338     return undef unless $self->{found};
1339
1340     # Save the location so that we can free the data
1341     my $location = $self->get_data_location_for({
1342         allow_head => 0,
1343     });
1344     my $key_sector = $self->get_key_for;
1345
1346     #XXX This isn't going to work right and you know it! This eradicates data
1347     # that we're not ready to eradicate just yet.
1348     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1349     $engine->storage->print_at( $spot,
1350         $engine->storage->read_at(
1351             $spot + $self->bucket_size,
1352             $self->bucket_size * ( $engine->num_txns - $self->{idx} - 1 ),
1353         ),
1354         chr(0) x $self->bucket_size,
1355     );
1356
1357     $key_sector->free;
1358
1359     my $data_sector = $self->engine->_load_sector( $location );
1360     my $data = $data_sector->data;
1361     $data_sector->free;
1362
1363     return $data;
1364 }
1365
1366 sub get_data_location_for {
1367     my $self = shift;
1368     my ($args) = @_;
1369     $args ||= {};
1370
1371     $args->{allow_head} = 0 unless exists $args->{allow_head};
1372     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1373     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1374
1375     my $e = $self->engine;
1376
1377     my $spot = $self->offset + $self->base_size
1378       + $args->{idx} * $self->bucket_size
1379       + $e->hash_size
1380       + $e->byte_size
1381       + $args->{trans_id} * ( $e->byte_size + 4 );
1382
1383     my $buffer = $e->storage->read_at(
1384         $spot,
1385         $e->byte_size + 4,
1386     );
1387     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' N', $buffer );
1388
1389     # We have found an entry that is old, so get rid of it
1390     if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
1391         $e->storage->print_at(
1392             $spot,
1393             pack( $StP{$e->byte_size} . ' N', (0) x 2 ), 
1394         );
1395         $loc = 0;
1396     }
1397
1398     # If we're in a transaction and we never wrote to this location, try the
1399     # HEAD instead.
1400     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1401         return $self->get_data_location_for({
1402             trans_id   => 0,
1403             allow_head => 1,
1404             idx        => $args->{idx},
1405         });
1406     }
1407     return $loc <= 1 ? 0 : $loc;
1408 }
1409
1410 sub get_data_for {
1411     my $self = shift;
1412     my ($args) = @_;
1413     $args ||= {};
1414
1415     return unless $self->{found};
1416     my $location = $self->get_data_location_for({
1417         allow_head => $args->{allow_head},
1418     });
1419     return $self->engine->_load_sector( $location );
1420 }
1421
1422 sub get_key_for {
1423     my $self = shift;
1424     my ($idx) = @_;
1425     $idx = $self->{idx} unless defined $idx;
1426
1427     my $location = $self->engine->storage->read_at(
1428         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1429         $self->engine->byte_size,
1430     );
1431     $location = unpack( $StP{$self->engine->byte_size}, $location );
1432     return unless $location;
1433     return $self->engine->_load_sector( $location );
1434 }
1435
1436 1;
1437 __END__
1438
1439 package DBM::Deep::Engine::Sector::BucketList;
1440
1441 our @ISA = qw( DBM::Deep::Engine::Sector );
1442
1443 sub _init {
1444     my $self = shift;
1445
1446     my $engine = $self->engine;
1447
1448     unless ( $self->offset ) {
1449         my $leftover = $self->size - $self->base_size;
1450
1451         $self->{offset} = $engine->_request_sector( $self->size );
1452         $engine->storage->print_at( $self->offset,
1453             $engine->SIG_BLIST, # Sector type
1454             pack( $StP{1}, 0 ), # Recycled counter
1455             chr(0) x $leftover, # Zero-fill the data
1456         );
1457     }
1458
1459     if ( $self->{key_md5} ) {
1460         $self->find_md5;
1461     }
1462
1463     return $self;
1464 }
1465
1466 sub base_size { 1 + 1 } # Sig + recycled counter
1467
1468 sub size {
1469     my $self = shift;
1470     unless ( $self->{size} ) {
1471         my $e = $self->engine;
1472         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size; # Base + numbuckets * bucketsize
1473     }
1474     return $self->{size};
1475 }
1476
1477 sub bucket_size {
1478     my $self = shift;
1479     unless ( $self->{bucket_size} ) {
1480         my $e = $self->engine;
1481         # Key + head (location) + transactions (location + staleness-counter)
1482         my $location_size = $e->byte_size + $e->num_txns * ( $e->byte_size + 4 );
1483         $self->{bucket_size} = $e->hash_size + $location_size;
1484     }
1485     return $self->{bucket_size};
1486 }
1487
1488 1;
1489 __END__