Added 16 bytes to the header prepping for transaction staleness counters
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine3.pm
1 package DBM::Deep::Engine3;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * Every method in here assumes that the storage has been appropriately
13 #   safeguarded. This can be anything from flock() to some sort of manual
14 #   mutex. But, it's the caller's responsability to make sure that this has
15 #   been done.
16
17 # Setup file and tag signatures.  These should never change.
18 sub SIG_FILE     () { 'DPDB' }
19 sub SIG_HEADER   () { 'h'    }
20 sub SIG_INTERNAL () { 'i'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_KEYS     () { 'K'    }
29 sub SIG_SIZE     () {  1     }
30
31 ################################################################################
32
33 # Please refer to the pack() documentation for further information
34 my %StP = (
35     1 => 'C', # Unsigned char value
36     2 => 'n', # Unsigned short in "network" (big-endian) order
37     4 => 'N', # Unsigned long in "network" (big-endian) order
38     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
39 );
40
41 sub new {
42     my $class = shift;
43     my ($args) = @_;
44
45     print "\n********* NEW ********\n\n";
46     my $self = bless {
47         byte_size   => 4,
48
49         digest      => undef,
50         hash_size   => 16, # In bytes
51         max_buckets => 16,
52         num_txns    => 16, # HEAD plus 15 running txns
53         trans_id    => 0,  # Default to the HEAD
54
55         entries => {}, # This is the list of entries for transactions
56         storage => undef,
57     }, $class;
58
59     if ( defined $args->{pack_size} ) {
60         if ( lc $args->{pack_size} eq 'small' ) {
61             $args->{byte_size} = 2;
62         }
63         elsif ( lc $args->{pack_size} eq 'medium' ) {
64             $args->{byte_size} = 4;
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{byte_size} = 8;
68         }
69         else {
70             die "Unknown pack_size value: '$args->{pack_size}'\n";
71         }
72     }
73
74     # Grab the parameters we want to use
75     foreach my $param ( keys %$self ) {
76         next unless exists $args->{$param};
77         $self->{$param} = $args->{$param};
78     }
79
80     $self->{byte_pack} = $StP{ $self->byte_size };
81
82     ##
83     # Number of buckets per blist before another level of indexing is
84     # done. Increase this value for slightly greater speed, but larger database
85     # files. DO NOT decrease this value below 16, due to risk of recursive
86     # reindex overrun.
87     ##
88     if ( $self->{max_buckets} < 16 ) {
89         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
90         $self->{max_buckets} = 16;
91     }
92
93     if ( !$self->{digest} ) {
94         require Digest::MD5;
95         $self->{digest} = \&Digest::MD5::md5;
96     }
97
98     return $self;
99 }
100
101 ################################################################################
102
103 sub read_value {
104     my $self = shift;
105     my ($obj, $key) = @_;
106
107     # This will be a Reference sector
108     my $sector = $self->_load_sector( $obj->_base_offset )
109         or die "How did read_value fail (no sector for '$obj')?!\n";
110
111     my $key_md5 = $self->_apply_digest( $key );
112
113     my $value_sector = $sector->get_data_for({
114         key_md5    => $key_md5,
115         allow_head => 1,
116     });
117
118     unless ( $value_sector ) {
119         $value_sector = DBM::Deep::Engine::Sector::Null->new({
120             engine => $self,
121             data   => undef,
122         });
123
124         $sector->write_data({
125             key_md5 => $key_md5,
126             key     => $key,
127             value   => $value_sector,
128         });
129     }
130
131     return $value_sector->data;
132 }
133
134 sub get_classname {
135     my $self = shift;
136     my ($obj) = @_;
137
138     # This will be a Reference sector
139     my $sector = $self->_load_sector( $obj->_base_offset )
140         or die "How did read_value fail (no sector for '$obj')?!\n";
141
142     return $sector->get_classname;
143 }
144
145 sub key_exists {
146     my $self = shift;
147     my ($obj, $key) = @_;
148
149     # This will be a Reference sector
150     my $sector = $self->_load_sector( $obj->_base_offset )
151         or die "How did key_exists fail (no sector for '$obj')?!\n";
152
153     my $data = $sector->get_data_for({
154         key_md5    => $self->_apply_digest( $key ),
155         allow_head => 1,
156     });
157
158     # exists() returns 1 or '' for true/false.
159     return $data ? 1 : '';
160 }
161
162 sub delete_key {
163     my $self = shift;
164     my ($obj, $key) = @_;
165
166     my $sector = $self->_load_sector( $obj->_base_offset )
167         or die "How did delete_key fail (no sector for '$obj')?!\n";
168
169     return $sector->delete_key({
170         key_md5    => $self->_apply_digest( $key ),
171         allow_head => 0,
172     });
173 }
174
175 sub write_value {
176     my $self = shift;
177     my ($obj, $key, $value) = @_;
178
179     my $r = Scalar::Util::reftype( $value ) || '';
180     {
181         last if $r eq '';
182         last if $r eq 'HASH';
183         last if $r eq 'ARRAY';
184
185         DBM::Deep->_throw_error(
186             "Storage of references of type '$r' is not supported."
187         );
188     }
189
190     my ($class, $type);
191     if ( !defined $value ) {
192         $class = 'DBM::Deep::Engine::Sector::Null';
193     }
194     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
195         if ( $r eq 'ARRAY' && tied(@$value) ) {
196             DBM::Deep->_throw_error( "Cannot store something that is tied." );
197         }
198         if ( $r eq 'HASH' && tied(%$value) ) {
199             DBM::Deep->_throw_error( "Cannot store something that is tied." );
200         }
201         $class = 'DBM::Deep::Engine::Sector::Reference';
202         $type = substr( $r, 0, 1 );
203     }
204     else {
205         $class = 'DBM::Deep::Engine::Sector::Scalar';
206     }
207
208     # This will be a Reference sector
209     my $sector = $self->_load_sector( $obj->_base_offset )
210         or die "How did write_value fail (no sector for '$obj')?!\n";
211
212     # Create this after loading the reference sector in case something bad happens.
213     # This way, we won't allocate value sector(s) needlessly.
214     my $value_sector = $class->new({
215         engine => $self,
216         data   => $value,
217         type   => $type,
218     });
219
220     $sector->write_data({
221         key     => $key,
222         key_md5 => $self->_apply_digest( $key ),
223         value   => $value_sector,
224     });
225
226     # This code is to make sure we write all the values in the $value to the disk
227     # and to make sure all changes to $value after the assignment are reflected
228     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
229     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
230     # copy to a temp value.
231     if ( $r eq 'ARRAY' ) {
232         my @temp = @$value;
233         tie @$value, 'DBM::Deep', {
234             base_offset => $value_sector->offset,
235             storage     => $self->storage,
236             engine      => $self,
237         };
238         @$value = @temp;
239         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
240     }
241     elsif ( $r eq 'HASH' ) {
242         my %temp = %$value;
243         tie %$value, 'DBM::Deep', {
244             base_offset => $value_sector->offset,
245             storage     => $self->storage,
246             engine      => $self,
247         };
248
249         %$value = %temp;
250         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
251     }
252
253     return 1;
254 }
255
256 sub get_next_key {
257     my $self = shift;
258     my ($obj, $prev_key) = @_;
259
260     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
261     unless ( $prev_key ) {
262         $obj->{iterator} = DBM::Deep::Engine::Iterator->new({
263             base_offset => $obj->_base_offset,
264             engine      => $self,
265         });
266     }
267
268     return $obj->{iterator}->get_next_key;
269 }
270
271 ################################################################################
272
273 sub setup_fh {
274     my $self = shift;
275     my ($obj) = @_;
276
277     # We're opening the file.
278     unless ( $obj->_base_offset ) {
279         my $bytes_read = $self->_read_file_header;
280
281         # Creating a new file
282         unless ( $bytes_read ) {
283             $self->_write_file_header;
284
285             # 1) Create Array/Hash entry
286             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
287                 engine => $self,
288                 type   => $obj->_type,
289             });
290             $obj->{base_offset} = $initial_reference->offset;
291
292             $self->storage->flush;
293         }
294         # Reading from an existing file
295         else {
296             $obj->{base_offset} = $bytes_read;
297             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
298                 engine => $self,
299                 offset => $obj->_base_offset,
300             });
301             unless ( $initial_reference ) {
302                 DBM::Deep->_throw_error("Corrupted file, no master index record");
303             }
304
305             unless ($obj->_type eq $initial_reference->type) {
306                 DBM::Deep->_throw_error("File type mismatch");
307             }
308         }
309     }
310
311     return 1;
312 }
313
314 sub begin_work {
315     my $self = shift;
316     my ($obj) = @_;
317
318     if ( $self->trans_id ) {
319         DBM::Deep->_throw_error( "Cannot begin_work within a transaction" );
320     }
321
322     my @slots = $self->read_txn_slots;
323     for my $i ( 1 .. @slots ) {
324         next if $slots[$i];
325         $slots[$i] = 1;
326         $self->set_trans_id( $i );
327         last;
328     }
329     $self->write_txn_slots( @slots );
330
331     if ( !$self->trans_id ) {
332         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
333     }
334
335     return;
336 }
337
338 sub rollback {
339     my $self = shift;
340     my ($obj) = @_;
341
342     if ( !$self->trans_id ) {
343         DBM::Deep->_throw_error( "Cannot rollback without a transaction" );
344     }
345
346     # Each entry is the file location for a bucket that has a modification for
347     # this transaction. The entries need to be expunged.
348     foreach my $entry (@{ $self->get_entries } ) {
349         # Remove the entry here
350         my $read_loc = $entry
351           + $self->hash_size
352           + $self->byte_size
353           + $self->trans_id * $self->byte_size;
354
355         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
356         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
357         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
358
359         if ( $data_loc > 1 ) {
360             $self->_load_sector( $data_loc )->free;
361         }
362     }
363
364     $self->clear_entries;
365
366     my @slots = $self->read_txn_slots;
367     $slots[$self->trans_id] = 0;
368     $self->write_txn_slots( @slots );
369     $self->set_trans_id( 0 );
370
371     return 1;
372 }
373
374 sub commit {
375     my $self = shift;
376     my ($obj) = @_;
377
378     if ( !$self->trans_id ) {
379         DBM::Deep->_throw_error( "Cannot commit without a transaction" );
380     }
381
382     print "TID: " . $self->trans_id, $/;
383     foreach my $entry (@{ $self->get_entries } ) {
384         print "$entry\n";
385         # Overwrite the entry in head with the entry in trans_id
386         my $base = $entry
387           + $self->hash_size
388           + $self->byte_size;
389
390         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
391         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
392         my $trans_loc = $self->storage->read_at(
393             $base + $self->trans_id * $self->byte_size, $self->byte_size,
394         );
395
396         $self->storage->print_at( $base, $trans_loc );
397         $self->storage->print_at(
398             $base + $self->trans_id * $self->byte_size,
399             pack( $StP{$self->byte_size}, 0 ),
400         );
401
402         if ( $head_loc > 1 ) {
403             $self->_load_sector( $head_loc )->free;
404         }
405     }
406
407     $self->clear_entries;
408
409     my @slots = $self->read_txn_slots;
410     $slots[$self->trans_id] = 0;
411     $self->write_txn_slots( @slots );
412     $self->set_trans_id( 0 );
413
414     return 1;
415 }
416
417 sub read_txn_slots {
418     my $self = shift;
419     return split '', unpack( 'b32',
420         $self->storage->read_at(
421             $self->trans_loc, 4,
422         )
423     );
424 }
425
426 sub write_txn_slots {
427     my $self = shift;
428     $self->storage->print_at( $self->trans_loc,
429         pack( 'b32', join('', @_) ),
430     );
431 }
432
433 sub get_running_txn_ids {
434     my $self = shift;
435     my @transactions = $self->read_txn_slots;
436     my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
437 }
438
439 sub get_entries {
440     my $self = shift;
441     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
442 }
443
444 sub add_entry {
445     my $self = shift;
446     my ($trans_id, $loc) = @_;
447
448     print "$trans_id => $loc\n";
449     $self->{entries}{$trans_id} ||= {};
450     $self->{entries}{$trans_id}{$loc} = undef;
451     use Data::Dumper;print "$self: " . Dumper $self->{entries};
452 }
453
454 sub clear_entries {
455     my $self = shift;
456     print "Clearing\n";
457     delete $self->{entries}{$self->trans_id};
458 }
459
460 ################################################################################
461
462 {
463     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
464
465     sub _write_file_header {
466         my $self = shift;
467
468         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 2 * $self->byte_size;
469
470         my $loc = $self->storage->request_space( $header_fixed + $header_var );
471
472         $self->storage->print_at( $loc,
473             SIG_FILE,
474             SIG_HEADER,
475             pack('N', 1),           # header version - at this point, we're at 9 bytes
476             pack('N', $header_var), # header size
477             # --- Above is $header_fixed. Below is $header_var
478             pack('C', $self->byte_size),
479             pack('C', $self->max_buckets),
480             pack('N', 0 ),                   # Transaction cctiveness bitfield
481             pack('N' . $self->num_txns, 0 ), # Transaction staleness counters
482             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
483             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
484         );
485
486         $self->set_trans_loc( $header_fixed + 2 );
487         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
488
489         return;
490     }
491
492     sub _read_file_header {
493         my $self = shift;
494
495         my $buffer = $self->storage->read_at( 0, $header_fixed );
496         return unless length($buffer);
497
498         my ($file_signature, $sig_header, $header_version, $size) = unpack(
499             'A4 A N N', $buffer
500         );
501
502         unless ( $file_signature eq SIG_FILE ) {
503             $self->storage->close;
504             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
505         }
506
507         unless ( $sig_header eq SIG_HEADER ) {
508             $self->storage->close;
509             DBM::Deep->_throw_error( "Old file version found." );
510         }
511
512         my $buffer2 = $self->storage->read_at( undef, $size );
513         my @values = unpack( 'C C', $buffer2 );
514
515         $self->set_trans_loc( $header_fixed + 2 );
516         $self->set_chains_loc( $header_fixed + 2 + 4 + 4 * $self->num_txns );
517
518         if ( @values < 2 || grep { !defined } @values ) {
519             $self->storage->close;
520             DBM::Deep->_throw_error("Corrupted file - bad header");
521         }
522
523         #XXX Add warnings if values weren't set right
524         @{$self}{qw(byte_size max_buckets)} = @values;
525
526         my $header_var = 1 + 1 + 4 + 4 * $self->num_txns + 2 * $self->byte_size;
527         unless ( $size eq $header_var ) {
528             $self->storage->close;
529             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
530         }
531
532         return length($buffer) + length($buffer2);
533     }
534 }
535
536 sub _load_sector {
537     my $self = shift;
538     my ($offset) = @_;
539
540     my $type = $self->storage->read_at( $offset, 1 );
541     return if $type eq chr(0);
542
543     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
544         return DBM::Deep::Engine::Sector::Reference->new({
545             engine => $self,
546             type   => $type,
547             offset => $offset,
548         });
549     }
550     # XXX Don't we need key_md5 here?
551     elsif ( $type eq $self->SIG_BLIST ) {
552         return DBM::Deep::Engine::Sector::BucketList->new({
553             engine => $self,
554             type   => $type,
555             offset => $offset,
556         });
557     }
558     elsif ( $type eq $self->SIG_NULL ) {
559         return DBM::Deep::Engine::Sector::Null->new({
560             engine => $self,
561             type   => $type,
562             offset => $offset,
563         });
564     }
565     elsif ( $type eq $self->SIG_DATA ) {
566         return DBM::Deep::Engine::Sector::Scalar->new({
567             engine => $self,
568             type   => $type,
569             offset => $offset,
570         });
571     }
572     # This was deleted from under us, so just return and let the caller figure it out.
573     elsif ( $type eq $self->SIG_FREE ) {
574         return;
575     }
576
577     die "'$offset': Don't know what to do with type '$type'\n";
578 }
579
580 sub _apply_digest {
581     my $self = shift;
582     return $self->{digest}->(@_);
583 }
584
585 sub _add_free_sector {
586     my $self = shift;
587     my ($offset, $size) = @_;
588
589     my $chains_offset;
590     # Data sector
591     if ( $size == 256 ) {
592         $chains_offset = $self->byte_size;
593     }
594     # Blist sector
595     else {
596         $chains_offset = 0;
597     }
598
599     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
600
601     $self->storage->print_at( $self->chains_loc + $chains_offset,
602         pack( $StP{$self->byte_size}, $offset ),
603     );
604
605     # Record the old head in the new sector after the signature
606     $self->storage->print_at( $offset + 1, $old_head );
607 }
608
609 sub _request_sector {
610     my $self = shift;
611     my ($size) = @_;
612
613     my $chains_offset;
614     # Data sector
615     if ( $size == 256 ) {
616         $chains_offset = $self->byte_size;
617     }
618     # Blist sector
619     else {
620         $chains_offset = 0;
621     }
622
623     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
624     my $loc = unpack( $StP{$self->byte_size}, $old_head );
625
626     # We don't have any free sectors of the right size, so allocate a new one.
627     unless ( $loc ) {
628         return $self->storage->request_space( $size );
629     }
630
631     my $new_head = $self->storage->read_at( $loc + 1, $self->byte_size );
632     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
633
634     return $loc;
635 }
636
637 ################################################################################
638
639 sub storage     { $_[0]{storage} }
640 sub byte_size   { $_[0]{byte_size} }
641 sub hash_size   { $_[0]{hash_size} }
642 sub num_txns    { $_[0]{num_txns} }
643 sub max_buckets { $_[0]{max_buckets} }
644 sub blank_md5   { chr(0) x $_[0]->hash_size }
645
646 sub trans_id     { $_[0]{trans_id} }
647 sub set_trans_id { $_[0]{trans_id} = $_[1] }
648
649 sub trans_loc     { $_[0]{trans_loc} }
650 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
651
652 sub chains_loc     { $_[0]{chains_loc} }
653 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
654
655 ################################################################################
656
657 package DBM::Deep::Engine::Iterator;
658
659 sub new {
660     my $class = shift;
661     my ($args) = @_;
662
663     my $self = bless {
664         breadcrumbs => [],
665         engine      => $args->{engine},
666         base_offset => $args->{base_offset},
667     }, $class;
668
669     Scalar::Util::weaken( $self->{engine} );
670
671     return $self;
672 }
673
674 sub reset {
675     my $self = shift;
676     $self->{breadcrumbs} = [];
677 }
678
679 sub get_next_key {
680     my $self = shift;
681
682     my $crumbs = $self->{breadcrumbs};
683
684     unless ( @$crumbs ) {
685         # This will be a Reference sector
686         my $sector = $self->{engine}->_load_sector( $self->{base_offset} )
687             # or die "Iterator: How did this fail (no ref sector for '$self->{base_offset}')?!\n";
688             # If no sector is found, thist must have been deleted from under us.
689             or return;
690         push @$crumbs, [ $sector->get_blist_loc, 0 ];
691     }
692
693     my $key;
694     while ( 1 ) {
695         my ($offset, $idx) = @{ $crumbs->[-1] };
696         unless ( $offset ) {
697             $self->reset;
698             last;
699         }
700
701         if ( $idx >= $self->{engine}->max_buckets ) {
702             $self->reset;
703             last;
704         }
705
706         my $sector = $self->{engine}->_load_sector( $offset )
707             or die "Iterator: How did this fail (no blist sector for '$offset')?!\n";
708
709         #XXX Think this through!
710         my $loc =  $sector->get_data_location_for({
711             idx => $idx,
712         });
713         unless ( $loc ) {
714             $crumbs->[-1][1]++;
715             next;
716         }
717
718         my $key_sector = $sector->get_key_for( $idx );
719         unless ( $key_sector ) {
720             $self->reset;
721             last;
722         }
723
724         $crumbs->[-1][1]++;
725         $key = $key_sector->data;
726         last;
727     }
728
729     return $key;
730 }
731
732 package DBM::Deep::Engine::Sector;
733
734 sub new {
735     my $self = bless $_[1], $_[0];
736     Scalar::Util::weaken( $self->{engine} );
737     $self->_init;
738     return $self;
739 }
740 sub _init {}
741 sub clone { die "Must be implemented in the child class" }
742
743 sub engine { $_[0]{engine} }
744 sub offset { $_[0]{offset} }
745 sub type   { $_[0]{type} }
746
747 sub free {
748     my $self = shift;
749
750     $self->engine->storage->print_at( $self->offset,
751         $self->engine->SIG_FREE,
752         chr(0) x ($self->size - 1),
753     );
754
755     $self->engine->_add_free_sector(
756         $self->offset, $self->size,
757     );
758
759     return;
760 }
761
762 package DBM::Deep::Engine::Sector::Data;
763
764 our @ISA = qw( DBM::Deep::Engine::Sector );
765
766 # This is in bytes
767 sub size { return 256 }
768
769 sub clone {
770     my $self = shift;
771     return ref($self)->new({
772         engine => $self->engine,
773         data   => $self->data,
774         type   => $self->type,
775     });
776 }
777
778 package DBM::Deep::Engine::Sector::Scalar;
779
780 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
781
782 sub free {
783     my $self = shift;
784
785     my $chain_loc = $self->chain_loc;
786
787     $self->SUPER::free();
788
789     if ( $chain_loc ) {
790         $self->engine->_load_sector( $chain_loc )->free;
791     }
792
793     return;
794 }
795
796 sub type { $_[0]{engine}->SIG_DATA }
797 sub _init {
798     my $self = shift;
799
800     my $engine = $self->engine;
801
802     unless ( $self->offset ) {
803         my $data_section = $self->size - 3 - 1 * $engine->byte_size;
804
805         $self->{offset} = $engine->_request_sector( $self->size );
806
807         my $data = delete $self->{data};
808         my $dlen = length $data;
809         my $continue = 1;
810         my $curr_offset = $self->offset;
811         while ( $continue ) {
812
813             my $next_offset = 0;
814
815             my ($leftover, $this_len, $chunk);
816             if ( $dlen > $data_section ) {
817                 $leftover = 0;
818                 $this_len = $data_section;
819                 $chunk = substr( $data, 0, $this_len );
820
821                 $dlen -= $data_section;
822                 $next_offset = $engine->_request_sector( $self->size );
823                 $data = substr( $data, $this_len );
824             }
825             else {
826                 $leftover = $data_section - $dlen;
827                 $this_len = $dlen;
828                 $chunk = $data;
829
830                 $continue = 0;
831             }
832
833             $engine->storage->print_at( $curr_offset,
834                 $self->type,                                     # Sector type
835                 pack( $StP{1}, 0 ),                              # Recycled counter
836                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
837                 pack( $StP{1}, $this_len ),                      # Data length
838                 $chunk,                                          # Data to be stored in this sector
839                 chr(0) x $leftover,                              # Zero-fill the rest
840             );
841
842             $curr_offset = $next_offset;
843         }
844
845         return;
846     }
847 }
848
849 sub data_length {
850     my $self = shift;
851
852     my $buffer = $self->engine->storage->read_at(
853         $self->offset + 2 + $self->engine->byte_size, 1
854     );
855
856     return unpack( $StP{1}, $buffer );
857 }
858
859 sub chain_loc {
860     my $self = shift;
861     my $chain_loc = $self->engine->storage->read_at(
862         $self->offset + 2, $self->engine->byte_size,
863     );
864     return unpack( $StP{$self->engine->byte_size}, $chain_loc );
865 }
866
867 sub data {
868     my $self = shift;
869
870     my $data;
871     while ( 1 ) {
872         my $chain_loc = $self->chain_loc;
873
874         $data .= $self->engine->storage->read_at(
875             $self->offset + 2 + $self->engine->byte_size + 1, $self->data_length,
876         );
877
878         last unless $chain_loc;
879
880         $self = $self->engine->_load_sector( $chain_loc );
881     }
882
883     return $data;
884 }
885
886 package DBM::Deep::Engine::Sector::Null;
887
888 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
889
890 sub type { $_[0]{engine}->SIG_NULL }
891 sub data_length { 0 }
892 sub data { return }
893
894 sub _init {
895     my $self = shift;
896
897     my $engine = $self->engine;
898
899     unless ( $self->offset ) {
900         my $leftover = $self->size - 3 - 1 * $engine->byte_size;
901
902         $self->{offset} = $engine->_request_sector( $self->size );
903         $engine->storage->print_at( $self->offset,
904             $self->type,                          # Sector type
905             pack( $StP{1}, 0 ),                   # Recycled counter
906             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
907             pack( $StP{1}, $self->data_length ),  # Data length
908             chr(0) x $leftover,                   # Zero-fill the rest
909         );
910
911         return;
912     }
913 }
914
915 package DBM::Deep::Engine::Sector::Reference;
916
917 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
918
919 sub _init {
920     my $self = shift;
921
922     my $engine = $self->engine;
923
924     unless ( $self->offset ) {
925         my $classname = Scalar::Util::blessed( delete $self->{data} );
926         my $leftover = $self->size - 4 - 2 * $engine->byte_size;
927
928         my $class_offset = 0;
929         if ( defined $classname ) {
930             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
931                 engine => $self->engine,
932                 data   => $classname,
933             });
934             $class_offset = $class_sector->offset;
935         }
936
937         $self->{offset} = $engine->_request_sector( $self->size );
938         $engine->storage->print_at( $self->offset,
939             $self->type,                                     # Sector type
940             pack( $StP{1}, 0 ),                              # Recycled counter
941             pack( $StP{$engine->byte_size}, 0 ),             # Index/BList loc
942             pack( $StP{$engine->byte_size}, $class_offset ), # Classname loc
943             chr(0) x $leftover,                              # Zero-fill the rest
944         );
945
946         return;
947     }
948
949     $self->{type} = $engine->storage->read_at( $self->offset, 1 );
950
951     return;
952 }
953
954 sub get_data_for {
955     my $self = shift;
956     my ($args) = @_;
957
958     # Assume that the head is not allowed unless otherwise specified.
959     $args->{allow_head} = 0 unless exists $args->{allow_head};
960
961     # Assume we don't create a new blist location unless otherwise specified.
962     $args->{create} = 0 unless exists $args->{create};
963
964     my $blist = $self->get_bucket_list({
965         key_md5 => $args->{key_md5},
966         create  => $args->{create},
967     });
968     return unless $blist && $blist->{found};
969
970     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
971     # is whether or not this transaction has this key. That's part of the next
972     # function call.
973     my $location = $blist->get_data_location_for({
974         allow_head => $args->{allow_head},
975     }) or return;
976
977     return $self->engine->_load_sector( $location );
978 }
979
980 sub write_data {
981     my $self = shift;
982     my ($args) = @_;
983
984     my $blist = $self->get_bucket_list({
985         key_md5 => $args->{key_md5},
986         create  => 1,
987     }) or die "How did write_data fail (no blist)?!\n";
988
989     # Handle any transactional bookkeeping.
990     if ( $self->engine->trans_id ) {
991         if ( ! $blist->{found} ) {
992             $blist->mark_deleted({
993                 trans_id => 0,
994             });
995         }
996     }
997     else {
998         my @trans_ids = $self->engine->get_running_txn_ids;
999         if ( $blist->{found} ) {
1000             if ( @trans_ids ) {
1001                 my $old_value = $blist->get_data_for;
1002                 foreach my $other_trans_id ( @trans_ids ) {
1003                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1004                     print "write_md5 to save a value\n";
1005                     $blist->write_md5({
1006                         trans_id => $other_trans_id,
1007                         key      => $args->{key},
1008                         key_md5  => $args->{key_md5},
1009                         value    => $old_value->clone,
1010                     });
1011                 }
1012             }
1013         }
1014         else {
1015             if ( @trans_ids ) {
1016                 foreach my $other_trans_id ( @trans_ids ) {
1017                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1018                     $blist->mark_deleted({
1019                         trans_id => $other_trans_id,
1020                     });
1021                 }
1022             }
1023         }
1024     }
1025
1026     #XXX Is this safe to do transactionally?
1027     # Free the place we're about to write to.
1028     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1029         $blist->get_data_for({ allow_head => 0 })->free;
1030     }
1031
1032     $blist->write_md5({
1033         key      => $args->{key},
1034         key_md5  => $args->{key_md5},
1035         value    => $args->{value},
1036     });
1037 }
1038
1039 sub delete_key {
1040     my $self = shift;
1041     my ($args) = @_;
1042
1043     # XXX What should happen if this fails?
1044     my $blist = $self->get_bucket_list({
1045         key_md5 => $args->{key_md5},
1046     }) or die "How did delete_key fail (no blist)?!\n";
1047
1048     # Save the location so that we can free the data
1049     my $location = $blist->get_data_location_for({
1050         allow_head => 0,
1051     });
1052     my $old_value = $location && $self->engine->_load_sector( $location );
1053
1054     if ( $self->engine->trans_id == 0 ) {
1055         my @trans_ids = $self->engine->get_running_txn_ids;
1056         if ( @trans_ids ) {
1057             foreach my $other_trans_id ( @trans_ids ) {
1058                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1059                 $blist->write_md5({
1060                     trans_id => $other_trans_id,
1061                     key      => $args->{key},
1062                     key_md5  => $args->{key_md5},
1063                     value    => $old_value->clone,
1064                 });
1065             }
1066         }
1067     }
1068
1069     $blist->mark_deleted( $args );
1070
1071     my $data;
1072     if ( $old_value ) {
1073         $data = $old_value->data;
1074         $old_value->free;
1075     }
1076
1077     return $data;
1078 }
1079
1080 sub get_blist_loc {
1081     my $self = shift;
1082
1083     my $e = $self->engine;
1084     my $blist_loc = $e->storage->read_at( $self->offset + 2, $e->byte_size );
1085     return unpack( $StP{$e->byte_size}, $blist_loc );
1086 }
1087
1088 sub get_bucket_list {
1089     my $self = shift;
1090     my ($args) = @_;
1091     $args ||= {};
1092
1093     # XXX Add in check here for recycling?
1094
1095     my $engine = $self->engine;
1096
1097     my $blist_loc = $self->get_blist_loc;
1098
1099     # There's no index or blist yet
1100     unless ( $blist_loc ) {
1101         return unless $args->{create};
1102
1103         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1104             engine  => $engine,
1105             key_md5 => $args->{key_md5},
1106         });
1107
1108         $engine->storage->print_at( $self->offset + 2,
1109             pack( $StP{$engine->byte_size}, $blist->offset ),
1110         );
1111
1112         return $blist;
1113     }
1114
1115     return DBM::Deep::Engine::Sector::BucketList->new({
1116         engine  => $engine,
1117         offset  => $blist_loc,
1118         key_md5 => $args->{key_md5},
1119     });
1120 }
1121
1122 sub get_classname {
1123     my $self = shift;
1124
1125     my $class_offset = $self->engine->storage->read_at(
1126         $self->offset + 2 + 1 * $self->engine->byte_size, $self->engine->byte_size,
1127     );
1128     $class_offset = unpack ( $StP{$self->engine->byte_size}, $class_offset );
1129
1130     return unless $class_offset;
1131
1132     return $self->engine->_load_sector( $class_offset )->data;
1133 }
1134
1135 sub data {
1136     my $self = shift;
1137
1138     my $new_obj = DBM::Deep->new({
1139         type        => $self->type,
1140         base_offset => $self->offset,
1141         storage     => $self->engine->storage,
1142         engine      => $self->engine,
1143     });
1144
1145     if ( $self->engine->storage->{autobless} ) {
1146         my $classname = $self->get_classname;
1147         if ( defined $classname ) {
1148             bless $new_obj, $classname;
1149         }
1150     }
1151
1152     return $new_obj;
1153 }
1154
1155 package DBM::Deep::Engine::Sector::BucketList;
1156
1157 our @ISA = qw( DBM::Deep::Engine::Sector );
1158
1159 sub idx_for_txn { return $_[1] + 1 }
1160
1161 sub _init {
1162     my $self = shift;
1163
1164     my $engine = $self->engine;
1165
1166     unless ( $self->offset ) {
1167         my $leftover = $self->size - $self->base_size;
1168
1169         $self->{offset} = $engine->_request_sector( $self->size );
1170         $engine->storage->print_at( $self->offset,
1171             $engine->SIG_BLIST, # Sector type
1172             pack( $StP{1}, 0 ), # Recycled counter
1173             chr(0) x $leftover, # Zero-fill the data
1174         );
1175     }
1176
1177     if ( $self->{key_md5} ) {
1178         $self->find_md5;
1179     }
1180
1181     return $self;
1182 }
1183
1184 sub base_size { 2 } # Sig + recycled counter
1185
1186 sub size {
1187     my $self = shift;
1188     unless ( $self->{size} ) {
1189         my $e = $self->engine;
1190         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size; # Base + numbuckets * bucketsize
1191     }
1192     return $self->{size};
1193 }
1194
1195 sub bucket_size {
1196     my $self = shift;
1197     unless ( $self->{bucket_size} ) {
1198         my $e = $self->engine;
1199         # Key + transactions
1200         my $locs_size = (1 + $e->num_txns ) * $e->byte_size;
1201         $self->{bucket_size} = $e->hash_size + $locs_size;
1202     }
1203     return $self->{bucket_size};
1204 }
1205
1206 sub has_md5 {
1207     my $self = shift;
1208     unless ( exists $self->{found} ) {
1209         $self->find_md5;
1210     }
1211     return $self->{found};
1212 }
1213
1214 sub find_md5 {
1215     my $self = shift;
1216
1217     $self->{found} = undef;
1218     $self->{idx}   = -1;
1219
1220     # If we don't have an MD5, then what are we supposed to do?
1221     unless ( exists $self->{key_md5} ) {
1222         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1223     }
1224
1225     my $e = $self->engine;
1226     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1227         my $potential = $e->storage->read_at(
1228             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1229         );
1230
1231         if ( $potential eq $e->blank_md5 ) {
1232             $self->{idx} = $idx;
1233             return;
1234         }
1235
1236         if ( $potential eq $self->{key_md5} ) {
1237             $self->{found} = 1;
1238             $self->{idx} = $idx;
1239             return;
1240         }
1241     }
1242
1243     return;
1244 }
1245
1246 sub write_md5 {
1247     my $self = shift;
1248     my ($args) = @_;
1249
1250     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1251     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1252     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1253
1254     my $engine = $self->engine;
1255
1256     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1257
1258     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1259     print "Adding $args->{trans_id} -> $spot\n";
1260     $engine->add_entry( $args->{trans_id}, $spot );
1261
1262     unless ($self->{found}) {
1263         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1264             engine => $engine,
1265             data   => $args->{key},
1266         });
1267
1268         $engine->storage->print_at( $spot,
1269             $args->{key_md5},
1270             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1271         );
1272     }
1273
1274     my $loc = $spot
1275       + $engine->hash_size
1276       + $engine->byte_size
1277       + $args->{trans_id} * $engine->byte_size;
1278
1279     $engine->storage->print_at( $loc,
1280         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1281     );
1282 }
1283
1284 sub mark_deleted {
1285     my $self = shift;
1286     my ($args) = @_;
1287     $args ||= {};
1288
1289     my $engine = $self->engine;
1290
1291     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1292
1293     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1294     $engine->add_entry( $args->{trans_id}, $spot );
1295
1296     my $loc = $spot
1297       + $engine->hash_size
1298       + $engine->byte_size
1299       + $args->{trans_id} * $engine->byte_size;
1300
1301     $engine->storage->print_at( $loc,
1302         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1303     );
1304 }
1305
1306 sub delete_md5 {
1307     my $self = shift;
1308     my ($args) = @_;
1309
1310     my $engine = $self->engine;
1311     return undef unless $self->{found};
1312
1313     # Save the location so that we can free the data
1314     my $location = $self->get_data_location_for({
1315         allow_head => 0,
1316     });
1317     my $key_sector = $self->get_key_for;
1318
1319     #XXX This isn't going to work right and you know it! This eradicates data
1320     # that we're not ready to eradicate just yet.
1321     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1322     $engine->storage->print_at( $spot,
1323         $engine->storage->read_at(
1324             $spot + $self->bucket_size,
1325             $self->bucket_size * ( $engine->num_txns - $self->{idx} - 1 ),
1326         ),
1327         chr(0) x $self->bucket_size,
1328     );
1329
1330     $key_sector->free;
1331
1332     my $data_sector = $self->engine->_load_sector( $location );
1333     my $data = $data_sector->data;
1334     $data_sector->free;
1335
1336     return $data;
1337 }
1338
1339 sub get_data_location_for {
1340     my $self = shift;
1341     my ($args) = @_;
1342     $args ||= {};
1343
1344     $args->{allow_head} = 0 unless exists $args->{allow_head};
1345     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1346     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1347
1348     my $location = $self->engine->storage->read_at(
1349         $self->offset + $self->base_size
1350       + $args->{idx} * $self->bucket_size
1351       + $self->engine->hash_size
1352       + $self->engine->byte_size
1353       + $args->{trans_id} * $self->engine->byte_size,
1354         $self->engine->byte_size,
1355     );
1356     my $loc = unpack( $StP{$self->engine->byte_size}, $location );
1357
1358     # If we're in a transaction and we never wrote to this location, try the
1359     # HEAD instead.
1360     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1361         return $self->get_data_location_for({
1362             trans_id   => 0,
1363             allow_head => 1,
1364         });
1365     }
1366     return $loc <= 1 ? 0 : $loc;
1367 }
1368
1369 sub get_data_for {
1370     my $self = shift;
1371     my ($args) = @_;
1372     $args ||= {};
1373
1374     return unless $self->{found};
1375     my $location = $self->get_data_location_for({
1376         allow_head => $args->{allow_head},
1377     });
1378     return $self->engine->_load_sector( $location );
1379 }
1380
1381 sub get_key_for {
1382     my $self = shift;
1383     my ($idx) = @_;
1384     $idx = $self->{idx} unless defined $idx;
1385
1386     my $location = $self->engine->storage->read_at(
1387         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1388         $self->engine->byte_size,
1389     );
1390     $location = unpack( $StP{$self->engine->byte_size}, $location );
1391     return unless $location;
1392     return $self->engine->_load_sector( $location );
1393 }
1394
1395 1;
1396 __END__