Added more tests and rollback/commit are kinda working
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine3.pm
1 package DBM::Deep::Engine3;
2
3 use 5.6.0;
4
5 use strict;
6
7 our $VERSION = q(0.99_03);
8
9 use Scalar::Util ();
10
11 # File-wide notes:
12 # * Every method in here assumes that the storage has been appropriately
13 #   safeguarded. This can be anything from flock() to some sort of manual
14 #   mutex. But, it's the caller's responsability to make sure that this has
15 #   been done.
16
17 # Setup file and tag signatures.  These should never change.
18 sub SIG_FILE     () { 'DPDB' }
19 sub SIG_HEADER   () { 'h'    }
20 sub SIG_INTERNAL () { 'i'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_KEYS     () { 'K'    }
29 sub SIG_SIZE     () {  1     }
30
31 ################################################################################
32
33 # Please refer to the pack() documentation for further information
34 my %StP = (
35     1 => 'C', # Unsigned char value
36     2 => 'n', # Unsigned short in "network" (big-endian) order
37     4 => 'N', # Unsigned long in "network" (big-endian) order
38     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
39 );
40
41 sub new {
42     my $class = shift;
43     my ($args) = @_;
44
45     print "\n********* NEW ********\n\n";
46     my $self = bless {
47         byte_size   => 4,
48
49         digest      => undef,
50         hash_size   => 16, # In bytes
51         max_buckets => 16,
52         num_txns    => 16, # HEAD plus 15 running txns
53         trans_id    => 0,  # Default to the HEAD
54
55         entries => {}, # This is the list of entries for transactions
56         storage => undef,
57     }, $class;
58
59     if ( defined $args->{pack_size} ) {
60         if ( lc $args->{pack_size} eq 'small' ) {
61             $args->{byte_size} = 2;
62         }
63         elsif ( lc $args->{pack_size} eq 'medium' ) {
64             $args->{byte_size} = 4;
65         }
66         elsif ( lc $args->{pack_size} eq 'large' ) {
67             $args->{byte_size} = 8;
68         }
69         else {
70             die "Unknown pack_size value: '$args->{pack_size}'\n";
71         }
72     }
73
74     # Grab the parameters we want to use
75     foreach my $param ( keys %$self ) {
76         next unless exists $args->{$param};
77         $self->{$param} = $args->{$param};
78     }
79
80     $self->{byte_pack} = $StP{ $self->byte_size };
81
82     ##
83     # Number of buckets per blist before another level of indexing is
84     # done. Increase this value for slightly greater speed, but larger database
85     # files. DO NOT decrease this value below 16, due to risk of recursive
86     # reindex overrun.
87     ##
88     if ( $self->{max_buckets} < 16 ) {
89         warn "Floor of max_buckets is 16. Setting it to 16 from '$self->{max_buckets}'\n";
90         $self->{max_buckets} = 16;
91     }
92
93     if ( !$self->{digest} ) {
94         require Digest::MD5;
95         $self->{digest} = \&Digest::MD5::md5;
96     }
97
98     return $self;
99 }
100
101 ################################################################################
102
103 sub read_value {
104     my $self = shift;
105     my ($obj, $key) = @_;
106
107     # This will be a Reference sector
108     my $sector = $self->_load_sector( $obj->_base_offset )
109         or die "How did read_value fail (no sector for '$obj')?!\n";
110
111     my $key_md5 = $self->_apply_digest( $key );
112
113     my $value_sector = $sector->get_data_for({
114         key_md5    => $key_md5,
115         allow_head => 1,
116     });
117
118     unless ( $value_sector ) {
119         $value_sector = DBM::Deep::Engine::Sector::Null->new({
120             engine => $self,
121             data   => undef,
122         });
123
124         $sector->write_data({
125             key_md5 => $key_md5,
126             key     => $key,
127             value   => $value_sector,
128         });
129     }
130
131     return $value_sector->data;
132 }
133
134 sub get_classname {
135     my $self = shift;
136     my ($obj) = @_;
137
138     # This will be a Reference sector
139     my $sector = $self->_load_sector( $obj->_base_offset )
140         or die "How did read_value fail (no sector for '$obj')?!\n";
141
142     return $sector->get_classname;
143 }
144
145 sub key_exists {
146     my $self = shift;
147     my ($obj, $key) = @_;
148
149     # This will be a Reference sector
150     my $sector = $self->_load_sector( $obj->_base_offset )
151         or die "How did key_exists fail (no sector for '$obj')?!\n";
152
153     my $data = $sector->get_data_for({
154         key_md5    => $self->_apply_digest( $key ),
155         allow_head => 1,
156     });
157
158     # exists() returns 1 or '' for true/false.
159     return $data ? 1 : '';
160 }
161
162 sub delete_key {
163     my $self = shift;
164     my ($obj, $key) = @_;
165
166     my $sector = $self->_load_sector( $obj->_base_offset )
167         or die "How did delete_key fail (no sector for '$obj')?!\n";
168
169     return $sector->delete_key({
170         key_md5    => $self->_apply_digest( $key ),
171         allow_head => 0,
172     });
173 }
174
175 sub write_value {
176     my $self = shift;
177     my ($obj, $key, $value) = @_;
178
179     my $r = Scalar::Util::reftype( $value ) || '';
180     {
181         last if $r eq '';
182         last if $r eq 'HASH';
183         last if $r eq 'ARRAY';
184
185         DBM::Deep->_throw_error(
186             "Storage of references of type '$r' is not supported."
187         );
188     }
189
190     my ($class, $type);
191     if ( !defined $value ) {
192         $class = 'DBM::Deep::Engine::Sector::Null';
193     }
194     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
195         if ( $r eq 'ARRAY' && tied(@$value) ) {
196             DBM::Deep->_throw_error( "Cannot store something that is tied." );
197         }
198         if ( $r eq 'HASH' && tied(%$value) ) {
199             DBM::Deep->_throw_error( "Cannot store something that is tied." );
200         }
201         $class = 'DBM::Deep::Engine::Sector::Reference';
202         $type = substr( $r, 0, 1 );
203     }
204     else {
205         $class = 'DBM::Deep::Engine::Sector::Scalar';
206     }
207
208     # This will be a Reference sector
209     my $sector = $self->_load_sector( $obj->_base_offset )
210         or die "How did write_value fail (no sector for '$obj')?!\n";
211
212     # Create this after loading the reference sector in case something bad happens.
213     # This way, we won't allocate value sector(s) needlessly.
214     my $value_sector = $class->new({
215         engine => $self,
216         data   => $value,
217         type   => $type,
218     });
219
220     $sector->write_data({
221         key     => $key,
222         key_md5 => $self->_apply_digest( $key ),
223         value   => $value_sector,
224     });
225
226     # This code is to make sure we write all the values in the $value to the disk
227     # and to make sure all changes to $value after the assignment are reflected
228     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
229     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
230     # copy to a temp value.
231     if ( $r eq 'ARRAY' ) {
232         my @temp = @$value;
233         tie @$value, 'DBM::Deep', {
234             base_offset => $value_sector->offset,
235             storage     => $self->storage,
236             engine      => $self,
237         };
238         @$value = @temp;
239         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
240     }
241     elsif ( $r eq 'HASH' ) {
242         my %temp = %$value;
243         tie %$value, 'DBM::Deep', {
244             base_offset => $value_sector->offset,
245             storage     => $self->storage,
246             engine      => $self,
247         };
248
249         %$value = %temp;
250         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
251     }
252
253     return 1;
254 }
255
256 sub get_next_key {
257     my $self = shift;
258     my ($obj, $prev_key) = @_;
259
260     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
261     unless ( $prev_key ) {
262         $obj->{iterator} = DBM::Deep::Engine::Iterator->new({
263             base_offset => $obj->_base_offset,
264             engine      => $self,
265         });
266     }
267
268     return $obj->{iterator}->get_next_key;
269 }
270
271 ################################################################################
272
273 sub setup_fh {
274     my $self = shift;
275     my ($obj) = @_;
276
277     # We're opening the file.
278     unless ( $obj->_base_offset ) {
279         my $bytes_read = $self->_read_file_header;
280
281         # Creating a new file
282         unless ( $bytes_read ) {
283             $self->_write_file_header;
284
285             # 1) Create Array/Hash entry
286             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
287                 engine => $self,
288                 type   => $obj->_type,
289             });
290             $obj->{base_offset} = $initial_reference->offset;
291
292             $self->storage->flush;
293         }
294         # Reading from an existing file
295         else {
296             $obj->{base_offset} = $bytes_read;
297             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
298                 engine => $self,
299                 offset => $obj->_base_offset,
300             });
301             unless ( $initial_reference ) {
302                 DBM::Deep->_throw_error("Corrupted file, no master index record");
303             }
304
305             unless ($obj->_type eq $initial_reference->type) {
306                 DBM::Deep->_throw_error("File type mismatch");
307             }
308         }
309     }
310
311     return 1;
312 }
313
314 sub begin_work {
315     my $self = shift;
316     my ($obj) = @_;
317
318     if ( $self->trans_id ) {
319         DBM::Deep->_throw_error( "Cannot begin_work within a transaction" );
320     }
321
322     my @slots = $self->read_transaction_slots;
323     for my $i ( 1 .. @slots ) {
324         next if $slots[$i];
325         $slots[$i] = 1;
326         $self->set_trans_id( $i );
327         last;
328     }
329     $self->write_transaction_slots( @slots );
330
331     if ( !$self->trans_id ) {
332         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
333     }
334
335     return;
336 }
337
338 sub rollback {
339     my $self = shift;
340     my ($obj) = @_;
341
342     if ( !$self->trans_id ) {
343         DBM::Deep->_throw_error( "Cannot rollback without a transaction" );
344     }
345
346     # Each entry is the file location for a bucket that has a modification for
347     # this transaction. The entries need to be expunged.
348     foreach my $entry (@{ $self->get_entries } ) {
349         # Remove the entry here
350         my $read_loc = $entry
351           + $self->hash_size
352           + $self->byte_size
353           + $self->trans_id * $self->byte_size;
354
355         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
356         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
357         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
358
359         if ( $data_loc > 1 ) {
360             $self->_load_sector( $data_loc )->free;
361         }
362     }
363
364     $self->clear_entries;
365
366     my @slots = $self->read_transaction_slots;
367     $slots[$self->trans_id] = 0;
368     $self->write_transaction_slots( @slots );
369     $self->set_trans_id( 0 );
370
371     return 1;
372 }
373
374 sub commit {
375     my $self = shift;
376     my ($obj) = @_;
377
378     if ( !$self->trans_id ) {
379         DBM::Deep->_throw_error( "Cannot commit without a transaction" );
380     }
381
382     print "TID: " . $self->trans_id, $/;
383     foreach my $entry (@{ $self->get_entries } ) {
384         print "$entry\n";
385         # Overwrite the entry in head with the entry in trans_id
386         my $base = $entry
387           + $self->hash_size
388           + $self->byte_size;
389
390         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
391         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
392         my $trans_loc = $self->storage->read_at(
393             $base + $self->trans_id * $self->byte_size, $self->byte_size,
394         );
395
396         $self->storage->print_at( $base, $trans_loc );
397         $self->storage->print_at(
398             $base + $self->trans_id * $self->byte_size,
399             pack( $StP{$self->byte_size}, 0 ),
400         );
401
402         if ( $head_loc > 1 ) {
403             $self->_load_sector( $head_loc )->free;
404         }
405     }
406
407     $self->clear_entries;
408
409     my @slots = $self->read_transaction_slots;
410     $slots[$self->trans_id] = 0;
411     $self->write_transaction_slots( @slots );
412     $self->set_trans_id( 0 );
413
414     return 1;
415 }
416
417 sub read_transaction_slots {
418     my $self = shift;
419     return split '', unpack( "b32", $self->storage->read_at( $self->trans_loc, 4 ) );
420 }
421
422 sub write_transaction_slots {
423     my $self = shift;
424     $self->storage->print_at( $self->trans_loc,
425         pack( "b32", join('', @_) ),
426     );
427 }
428
429 sub get_entries {
430     my $self = shift;
431     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
432 }
433
434 sub add_entry {
435     my $self = shift;
436     my ($trans_id, $loc) = @_;
437
438     print "$trans_id => $loc\n";
439     $self->{entries}{$trans_id} ||= {};
440     $self->{entries}{$trans_id}{$loc} = undef;
441     use Data::Dumper;print "$self: " . Dumper $self->{entries};
442 }
443
444 sub clear_entries {
445     my $self = shift;
446     print "Clearing\n";
447     delete $self->{entries}{$self->trans_id};
448 }
449
450 ################################################################################
451
452 {
453     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
454
455     sub _write_file_header {
456         my $self = shift;
457
458         my $header_var = 1 + 1 + 4 + 2 * $self->byte_size;
459
460         my $loc = $self->storage->request_space( $header_fixed + $header_var );
461
462         $self->storage->print_at( $loc,
463             SIG_FILE,
464             SIG_HEADER,
465             pack('N', 1),           # header version - at this point, we're at 9 bytes
466             pack('N', $header_var), # header size
467             # --- Above is $header_fixed. Below is $header_var
468             pack('C', $self->byte_size),
469             pack('C', $self->max_buckets),
470             pack('N', 0 ),                   # Running transactions
471             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
472             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
473         );
474
475         $self->set_trans_loc( $header_fixed + 2 );
476         $self->set_chains_loc( $header_fixed + 6 );
477
478         return;
479     }
480
481     sub _read_file_header {
482         my $self = shift;
483
484         my $buffer = $self->storage->read_at( 0, $header_fixed );
485         return unless length($buffer);
486
487         my ($file_signature, $sig_header, $header_version, $size) = unpack(
488             'A4 A N N', $buffer
489         );
490
491         unless ( $file_signature eq SIG_FILE ) {
492             $self->storage->close;
493             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
494         }
495
496         unless ( $sig_header eq SIG_HEADER ) {
497             $self->storage->close;
498             DBM::Deep->_throw_error( "Old file version found." );
499         }
500
501         my $buffer2 = $self->storage->read_at( undef, $size );
502         my @values = unpack( 'C C', $buffer2 );
503
504         $self->set_trans_loc( $header_fixed + 2 );
505         $self->set_chains_loc( $header_fixed + 6 );
506
507         if ( @values < 2 || grep { !defined } @values ) {
508             $self->storage->close;
509             DBM::Deep->_throw_error("Corrupted file - bad header");
510         }
511
512         #XXX Add warnings if values weren't set right
513         @{$self}{qw(byte_size max_buckets)} = @values;
514
515         my $header_var = 1 + 1 + 4 + 2 * $self->byte_size;
516         unless ( $size eq $header_var ) {
517             $self->storage->close;
518             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
519         }
520
521         return length($buffer) + length($buffer2);
522     }
523 }
524
525 sub _load_sector {
526     my $self = shift;
527     my ($offset) = @_;
528
529     my $type = $self->storage->read_at( $offset, 1 );
530     return if $type eq chr(0);
531
532     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
533         return DBM::Deep::Engine::Sector::Reference->new({
534             engine => $self,
535             type   => $type,
536             offset => $offset,
537         });
538     }
539     # XXX Don't we need key_md5 here?
540     elsif ( $type eq $self->SIG_BLIST ) {
541         return DBM::Deep::Engine::Sector::BucketList->new({
542             engine => $self,
543             type   => $type,
544             offset => $offset,
545         });
546     }
547     elsif ( $type eq $self->SIG_NULL ) {
548         return DBM::Deep::Engine::Sector::Null->new({
549             engine => $self,
550             type   => $type,
551             offset => $offset,
552         });
553     }
554     elsif ( $type eq $self->SIG_DATA ) {
555         return DBM::Deep::Engine::Sector::Scalar->new({
556             engine => $self,
557             type   => $type,
558             offset => $offset,
559         });
560     }
561     # This was deleted from under us, so just return and let the caller figure it out.
562     elsif ( $type eq $self->SIG_FREE ) {
563         return;
564     }
565
566     die "'$offset': Don't know what to do with type '$type'\n";
567 }
568
569 sub _apply_digest {
570     my $self = shift;
571     return $self->{digest}->(@_);
572 }
573
574 sub _add_free_sector {
575     my $self = shift;
576     my ($offset, $size) = @_;
577
578     my $chains_offset;
579     # Data sector
580     if ( $size == 256 ) {
581         $chains_offset = $self->byte_size;
582     }
583     # Blist sector
584     else {
585         $chains_offset = 0;
586     }
587
588     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
589
590     $self->storage->print_at( $self->chains_loc + $chains_offset,
591         pack( $StP{$self->byte_size}, $offset ),
592     );
593
594     # Record the old head in the new sector after the signature
595     $self->storage->print_at( $offset + 1, $old_head );
596 }
597
598 sub _request_sector {
599     my $self = shift;
600     my ($size) = @_;
601
602     my $chains_offset;
603     # Data sector
604     if ( $size == 256 ) {
605         $chains_offset = $self->byte_size;
606     }
607     # Blist sector
608     else {
609         $chains_offset = 0;
610     }
611
612     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
613     my $loc = unpack( $StP{$self->byte_size}, $old_head );
614
615     # We don't have any free sectors of the right size, so allocate a new one.
616     unless ( $loc ) {
617         return $self->storage->request_space( $size );
618     }
619
620     my $new_head = $self->storage->read_at( $loc + 1, $self->byte_size );
621     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
622
623     return $loc;
624 }
625
626 ################################################################################
627
628 sub storage     { $_[0]{storage} }
629 sub byte_size   { $_[0]{byte_size} }
630 sub hash_size   { $_[0]{hash_size} }
631 sub num_txns    { $_[0]{num_txns} }
632 sub max_buckets { $_[0]{max_buckets} }
633 sub blank_md5   { chr(0) x $_[0]->hash_size }
634
635 sub trans_id     { $_[0]{trans_id} }
636 sub set_trans_id { $_[0]{trans_id} = $_[1] }
637
638 sub trans_loc     { $_[0]{trans_loc} }
639 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
640
641 sub chains_loc     { $_[0]{chains_loc} }
642 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
643
644 ################################################################################
645
646 package DBM::Deep::Engine::Iterator;
647
648 sub new {
649     my $class = shift;
650     my ($args) = @_;
651
652     my $self = bless {
653         breadcrumbs => [],
654         engine      => $args->{engine},
655         base_offset => $args->{base_offset},
656     }, $class;
657
658     Scalar::Util::weaken( $self->{engine} );
659
660     return $self;
661 }
662
663 sub reset {
664     my $self = shift;
665     $self->{breadcrumbs} = [];
666 }
667
668 sub get_next_key {
669     my $self = shift;
670
671     my $crumbs = $self->{breadcrumbs};
672
673     unless ( @$crumbs ) {
674         # This will be a Reference sector
675         my $sector = $self->{engine}->_load_sector( $self->{base_offset} )
676             # or die "Iterator: How did this fail (no ref sector for '$self->{base_offset}')?!\n";
677             # If no sector is found, thist must have been deleted from under us.
678             or return;
679         push @$crumbs, [ $sector->get_blist_loc, 0 ];
680     }
681
682     my $key;
683     while ( 1 ) {
684         my ($offset, $idx) = @{ $crumbs->[-1] };
685         unless ( $offset ) {
686             $self->reset;
687             last;
688         }
689
690         if ( $idx >= $self->{engine}->max_buckets ) {
691             $self->reset;
692             last;
693         }
694
695         my $sector = $self->{engine}->_load_sector( $offset )
696             or die "Iterator: How did this fail (no blist sector for '$offset')?!\n";
697
698         #XXX Think this through!
699         my $loc =  $sector->get_data_location_for({
700             idx => $idx,
701         });
702         unless ( $loc ) {
703             $crumbs->[-1][1]++;
704             next;
705         }
706
707         my $key_sector = $sector->get_key_for( $idx );
708         unless ( $key_sector ) {
709             $self->reset;
710             last;
711         }
712
713         $crumbs->[-1][1]++;
714         $key = $key_sector->data;
715         last;
716     }
717
718     return $key;
719 }
720
721 package DBM::Deep::Engine::Sector;
722
723 sub new {
724     my $self = bless $_[1], $_[0];
725     Scalar::Util::weaken( $self->{engine} );
726     $self->_init;
727     return $self;
728 }
729 sub _init {}
730 sub clone { die "Must be implemented in the child class" }
731
732 sub engine { $_[0]{engine} }
733 sub offset { $_[0]{offset} }
734 sub type   { $_[0]{type} }
735
736 sub free {
737     my $self = shift;
738
739     $self->engine->storage->print_at( $self->offset,
740         $self->engine->SIG_FREE,
741         chr(0) x ($self->size - 1),
742     );
743
744     $self->engine->_add_free_sector(
745         $self->offset, $self->size,
746     );
747
748     return;
749 }
750
751 package DBM::Deep::Engine::Sector::Data;
752
753 our @ISA = qw( DBM::Deep::Engine::Sector );
754
755 # This is in bytes
756 sub size { return 256 }
757
758 sub clone {
759     my $self = shift;
760     return ref($self)->new({
761         engine => $self->engine,
762         data   => $self->data,
763         type   => $self->type,
764     });
765 }
766
767 package DBM::Deep::Engine::Sector::Scalar;
768
769 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
770
771 sub free {
772     my $self = shift;
773
774     my $chain_loc = $self->chain_loc;
775
776     $self->SUPER::free();
777
778     if ( $chain_loc ) {
779         $self->engine->_load_sector( $chain_loc )->free;
780     }
781
782     return;
783 }
784
785 sub type { $_[0]{engine}->SIG_DATA }
786 sub _init {
787     my $self = shift;
788
789     my $engine = $self->engine;
790
791     unless ( $self->offset ) {
792         my $data_section = $self->size - 3 - 1 * $engine->byte_size;
793
794         $self->{offset} = $engine->_request_sector( $self->size );
795
796         my $data = delete $self->{data};
797         my $dlen = length $data;
798         my $continue = 1;
799         my $curr_offset = $self->offset;
800         while ( $continue ) {
801
802             my $next_offset = 0;
803
804             my ($leftover, $this_len, $chunk);
805             if ( $dlen > $data_section ) {
806                 $leftover = 0;
807                 $this_len = $data_section;
808                 $chunk = substr( $data, 0, $this_len );
809
810                 $dlen -= $data_section;
811                 $next_offset = $engine->_request_sector( $self->size );
812                 $data = substr( $data, $this_len );
813             }
814             else {
815                 $leftover = $data_section - $dlen;
816                 $this_len = $dlen;
817                 $chunk = $data;
818
819                 $continue = 0;
820             }
821
822             $engine->storage->print_at( $curr_offset,
823                 $self->type,                                     # Sector type
824                 pack( $StP{1}, 0 ),                              # Recycled counter
825                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
826                 pack( $StP{1}, $this_len ),                      # Data length
827                 $chunk,                                          # Data to be stored in this sector
828                 chr(0) x $leftover,                              # Zero-fill the rest
829             );
830
831             $curr_offset = $next_offset;
832         }
833
834         return;
835     }
836 }
837
838 sub data_length {
839     my $self = shift;
840
841     my $buffer = $self->engine->storage->read_at(
842         $self->offset + 2 + $self->engine->byte_size, 1
843     );
844
845     return unpack( $StP{1}, $buffer );
846 }
847
848 sub chain_loc {
849     my $self = shift;
850     my $chain_loc = $self->engine->storage->read_at(
851         $self->offset + 2, $self->engine->byte_size,
852     );
853     return unpack( $StP{$self->engine->byte_size}, $chain_loc );
854 }
855
856 sub data {
857     my $self = shift;
858
859     my $data;
860     while ( 1 ) {
861         my $chain_loc = $self->chain_loc;
862
863         $data .= $self->engine->storage->read_at(
864             $self->offset + 2 + $self->engine->byte_size + 1, $self->data_length,
865         );
866
867         last unless $chain_loc;
868
869         $self = $self->engine->_load_sector( $chain_loc );
870     }
871
872     return $data;
873 }
874
875 package DBM::Deep::Engine::Sector::Null;
876
877 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
878
879 sub type { $_[0]{engine}->SIG_NULL }
880 sub data_length { 0 }
881 sub data { return }
882
883 sub _init {
884     my $self = shift;
885
886     my $engine = $self->engine;
887
888     unless ( $self->offset ) {
889         my $leftover = $self->size - 3 - 1 * $engine->byte_size;
890
891         $self->{offset} = $engine->_request_sector( $self->size );
892         $engine->storage->print_at( $self->offset,
893             $self->type,                          # Sector type
894             pack( $StP{1}, 0 ),                   # Recycled counter
895             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
896             pack( $StP{1}, $self->data_length ),  # Data length
897             chr(0) x $leftover,                   # Zero-fill the rest
898         );
899
900         return;
901     }
902 }
903
904 package DBM::Deep::Engine::Sector::Reference;
905
906 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
907
908 sub _init {
909     my $self = shift;
910
911     my $engine = $self->engine;
912
913     unless ( $self->offset ) {
914         my $classname = Scalar::Util::blessed( delete $self->{data} );
915         my $leftover = $self->size - 4 - 2 * $engine->byte_size;
916
917         my $class_offset = 0;
918         if ( defined $classname ) {
919             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
920                 engine => $self->engine,
921                 data   => $classname,
922             });
923             $class_offset = $class_sector->offset;
924         }
925
926         $self->{offset} = $engine->_request_sector( $self->size );
927         $engine->storage->print_at( $self->offset,
928             $self->type,                                     # Sector type
929             pack( $StP{1}, 0 ),                              # Recycled counter
930             pack( $StP{$engine->byte_size}, 0 ),             # Index/BList loc
931             pack( $StP{$engine->byte_size}, $class_offset ), # Classname loc
932             chr(0) x $leftover,                              # Zero-fill the rest
933         );
934
935         return;
936     }
937
938     $self->{type} = $engine->storage->read_at( $self->offset, 1 );
939
940     return;
941 }
942
943 sub get_data_for {
944     my $self = shift;
945     my ($args) = @_;
946
947     # Assume that the head is not allowed unless otherwise specified.
948     $args->{allow_head} = 0 unless exists $args->{allow_head};
949
950     # Assume we don't create a new blist location unless otherwise specified.
951     $args->{create} = 0 unless exists $args->{create};
952
953     my $blist = $self->get_bucket_list({
954         key_md5 => $args->{key_md5},
955         create  => $args->{create},
956     });
957     return unless $blist && $blist->{found};
958
959     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
960     # is whether or not this transaction has this key. That's part of the next
961     # function call.
962     my $location = $blist->get_data_location_for({
963         allow_head => $args->{allow_head},
964     }) or return;
965
966     return $self->engine->_load_sector( $location );
967 }
968
969 sub write_data {
970     my $self = shift;
971     my ($args) = @_;
972
973     my $blist = $self->get_bucket_list({
974         key_md5 => $args->{key_md5},
975         create  => 1,
976     }) or die "How did write_data fail (no blist)?!\n";
977
978     # Handle any transactional bookkeeping.
979     if ( $self->engine->trans_id ) {
980         if ( ! $blist->{found} ) {
981             $blist->mark_deleted({
982                 trans_id => 0,
983             });
984         }
985     }
986     else {
987         my @transactions = $self->engine->read_transaction_slots;
988         my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
989         if ( $blist->{found} ) {
990             if ( @trans_ids ) {
991                 my $old_value = $blist->get_data_for;
992                 foreach my $other_trans_id ( @trans_ids ) {
993                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
994                     print "write_md5 to save a value\n";
995                     $blist->write_md5({
996                         trans_id => $other_trans_id,
997                         key      => $args->{key},
998                         key_md5  => $args->{key_md5},
999                         value    => $old_value->clone,
1000                     });
1001                 }
1002             }
1003         }
1004         else {
1005             if ( @trans_ids ) {
1006                 foreach my $other_trans_id ( @trans_ids ) {
1007                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1008                     $blist->mark_deleted({
1009                         trans_id => $other_trans_id,
1010                     });
1011                 }
1012             }
1013         }
1014     }
1015
1016     #XXX Is this safe to do transactionally?
1017     # Free the place we're about to write to.
1018     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1019         $blist->get_data_for({ allow_head => 0 })->free;
1020     }
1021
1022     $blist->write_md5({
1023         key      => $args->{key},
1024         key_md5  => $args->{key_md5},
1025         value    => $args->{value},
1026     });
1027 }
1028
1029 sub delete_key {
1030     my $self = shift;
1031     my ($args) = @_;
1032
1033     # XXX What should happen if this fails?
1034     my $blist = $self->get_bucket_list({
1035         key_md5 => $args->{key_md5},
1036     }) or die "How did delete_key fail (no blist)?!\n";
1037
1038     # Save the location so that we can free the data
1039     my $location = $blist->get_data_location_for({
1040         allow_head => 0,
1041     });
1042     my $old_value = $self->engine->_load_sector( $location );
1043
1044     if ( $self->engine->trans_id == 0 ) {
1045         my @transactions = $self->engine->read_transaction_slots;
1046         my @trans_ids = grep { $transactions[$_] } 0 .. $#transactions;
1047         if ( @trans_ids ) {
1048             foreach my $other_trans_id ( @trans_ids ) {
1049                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1050                 $blist->write_md5({
1051                     trans_id => $other_trans_id,
1052                     key      => $args->{key},
1053                     key_md5  => $args->{key_md5},
1054                     value    => $old_value->clone,
1055                 });
1056             }
1057         }
1058     }
1059
1060     $blist->mark_deleted( $args );
1061
1062     my $data = $old_value->data;
1063     $old_value->free;
1064
1065     return $data;
1066 }
1067
1068 sub get_blist_loc {
1069     my $self = shift;
1070
1071     my $e = $self->engine;
1072     my $blist_loc = $e->storage->read_at( $self->offset + 2, $e->byte_size );
1073     return unpack( $StP{$e->byte_size}, $blist_loc );
1074 }
1075
1076 sub get_bucket_list {
1077     my $self = shift;
1078     my ($args) = @_;
1079     $args ||= {};
1080
1081     # XXX Add in check here for recycling?
1082
1083     my $engine = $self->engine;
1084
1085     my $blist_loc = $self->get_blist_loc;
1086
1087     # There's no index or blist yet
1088     unless ( $blist_loc ) {
1089         return unless $args->{create};
1090
1091         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1092             engine  => $engine,
1093             key_md5 => $args->{key_md5},
1094         });
1095
1096         $engine->storage->print_at( $self->offset + 2,
1097             pack( $StP{$engine->byte_size}, $blist->offset ),
1098         );
1099
1100         return $blist;
1101     }
1102
1103     return DBM::Deep::Engine::Sector::BucketList->new({
1104         engine  => $engine,
1105         offset  => $blist_loc,
1106         key_md5 => $args->{key_md5},
1107     });
1108 }
1109
1110 sub get_classname {
1111     my $self = shift;
1112
1113     my $class_offset = $self->engine->storage->read_at(
1114         $self->offset + 2 + 1 * $self->engine->byte_size, $self->engine->byte_size,
1115     );
1116     $class_offset = unpack ( $StP{$self->engine->byte_size}, $class_offset );
1117
1118     return unless $class_offset;
1119
1120     return $self->engine->_load_sector( $class_offset )->data;
1121 }
1122
1123 sub data {
1124     my $self = shift;
1125
1126     my $new_obj = DBM::Deep->new({
1127         type        => $self->type,
1128         base_offset => $self->offset,
1129         storage     => $self->engine->storage,
1130         engine      => $self->engine,
1131     });
1132
1133     if ( $self->engine->storage->{autobless} ) {
1134         my $classname = $self->get_classname;
1135         if ( defined $classname ) {
1136             bless $new_obj, $classname;
1137         }
1138     }
1139
1140     return $new_obj;
1141 }
1142
1143 package DBM::Deep::Engine::Sector::BucketList;
1144
1145 our @ISA = qw( DBM::Deep::Engine::Sector );
1146
1147 sub idx_for_txn { return $_[1] + 1 }
1148
1149 sub _init {
1150     my $self = shift;
1151
1152     my $engine = $self->engine;
1153
1154     unless ( $self->offset ) {
1155         my $leftover = $self->size - $self->base_size;
1156
1157         $self->{offset} = $engine->_request_sector( $self->size );
1158         $engine->storage->print_at( $self->offset,
1159             $engine->SIG_BLIST, # Sector type
1160             pack( $StP{1}, 0 ), # Recycled counter
1161             chr(0) x $leftover, # Zero-fill the data
1162         );
1163     }
1164
1165     if ( $self->{key_md5} ) {
1166         $self->find_md5;
1167     }
1168
1169     return $self;
1170 }
1171
1172 sub base_size { 2 } # Sig + recycled counter
1173
1174 sub size {
1175     my $self = shift;
1176     unless ( $self->{size} ) {
1177         my $e = $self->engine;
1178         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size; # Base + numbuckets * bucketsize
1179     }
1180     return $self->{size};
1181 }
1182
1183 sub bucket_size {
1184     my $self = shift;
1185     unless ( $self->{bucket_size} ) {
1186         my $e = $self->engine;
1187         # Key + transactions
1188         my $locs_size = (1 + $e->num_txns ) * $e->byte_size;
1189         $self->{bucket_size} = $e->hash_size + $locs_size;
1190     }
1191     return $self->{bucket_size};
1192 }
1193
1194 sub has_md5 {
1195     my $self = shift;
1196     unless ( exists $self->{found} ) {
1197         $self->find_md5;
1198     }
1199     return $self->{found};
1200 }
1201
1202 sub find_md5 {
1203     my $self = shift;
1204
1205     $self->{found} = undef;
1206     $self->{idx}   = -1;
1207
1208     # If we don't have an MD5, then what are we supposed to do?
1209     unless ( exists $self->{key_md5} ) {
1210         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1211     }
1212
1213     my $e = $self->engine;
1214     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1215         my $potential = $e->storage->read_at(
1216             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1217         );
1218
1219         if ( $potential eq $e->blank_md5 ) {
1220             $self->{idx} = $idx;
1221             return;
1222         }
1223
1224         if ( $potential eq $self->{key_md5} ) {
1225             $self->{found} = 1;
1226             $self->{idx} = $idx;
1227             return;
1228         }
1229     }
1230
1231     return;
1232 }
1233
1234 sub write_md5 {
1235     my $self = shift;
1236     my ($args) = @_;
1237
1238     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1239     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1240     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1241
1242     my $engine = $self->engine;
1243
1244     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1245
1246     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1247     print "Adding $args->{trans_id} -> $spot\n";
1248     $engine->add_entry( $args->{trans_id}, $spot );
1249
1250     unless ($self->{found}) {
1251         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1252             engine => $engine,
1253             data   => $args->{key},
1254         });
1255
1256         $engine->storage->print_at( $spot,
1257             $args->{key_md5},
1258             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1259         );
1260     }
1261
1262     my $loc = $spot
1263       + $engine->hash_size
1264       + $engine->byte_size
1265       + $args->{trans_id} * $engine->byte_size;
1266
1267     $engine->storage->print_at( $loc,
1268         pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1269     );
1270 }
1271
1272 sub mark_deleted {
1273     my $self = shift;
1274     my ($args) = @_;
1275     $args ||= {};
1276
1277     my $engine = $self->engine;
1278
1279     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1280
1281     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1282     $engine->add_entry( $args->{trans_id}, $spot );
1283
1284     my $loc = $spot
1285       + $engine->hash_size
1286       + $engine->byte_size
1287       + $args->{trans_id} * $engine->byte_size;
1288
1289     $engine->storage->print_at( $loc,
1290         pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1291     );
1292 }
1293
1294 sub delete_md5 {
1295     my $self = shift;
1296     my ($args) = @_;
1297
1298     my $engine = $self->engine;
1299     return undef unless $self->{found};
1300
1301     # Save the location so that we can free the data
1302     my $location = $self->get_data_location_for({
1303         allow_head => 0,
1304     });
1305     my $key_sector = $self->get_key_for;
1306
1307     #XXX This isn't going to work right and you know it! This eradicates data
1308     # that we're not ready to eradicate just yet.
1309     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1310     $engine->storage->print_at( $spot,
1311         $engine->storage->read_at(
1312             $spot + $self->bucket_size,
1313             $self->bucket_size * ( $engine->num_txns - $self->{idx} - 1 ),
1314         ),
1315         chr(0) x $self->bucket_size,
1316     );
1317
1318     $key_sector->free;
1319
1320     my $data_sector = $self->engine->_load_sector( $location );
1321     my $data = $data_sector->data;
1322     $data_sector->free;
1323
1324     return $data;
1325 }
1326
1327 sub get_data_location_for {
1328     my $self = shift;
1329     my ($args) = @_;
1330     $args ||= {};
1331
1332     $args->{allow_head} = 0 unless exists $args->{allow_head};
1333     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1334     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1335
1336     my $location = $self->engine->storage->read_at(
1337         $self->offset + $self->base_size
1338       + $args->{idx} * $self->bucket_size
1339       + $self->engine->hash_size
1340       + $self->engine->byte_size
1341       + $args->{trans_id} * $self->engine->byte_size,
1342         $self->engine->byte_size,
1343     );
1344     my $loc = unpack( $StP{$self->engine->byte_size}, $location );
1345
1346     # If we're in a transaction and we never wrote to this location, try the
1347     # HEAD instead.
1348     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1349         return $self->get_data_location_for({
1350             trans_id   => 0,
1351             allow_head => 1,
1352         });
1353     }
1354     return $loc <= 1 ? 0 : $loc;
1355 }
1356
1357 sub get_data_for {
1358     my $self = shift;
1359     my ($args) = @_;
1360     $args ||= {};
1361
1362     return unless $self->{found};
1363     my $location = $self->get_data_location_for({
1364         allow_head => $args->{allow_head},
1365     });
1366     return $self->engine->_load_sector( $location );
1367 }
1368
1369 sub get_key_for {
1370     my $self = shift;
1371     my ($idx) = @_;
1372     $idx = $self->{idx} unless defined $idx;
1373
1374     my $location = $self->engine->storage->read_at(
1375         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1376         $self->engine->byte_size,
1377     );
1378     $location = unpack( $StP{$self->engine->byte_size}, $location );
1379     return unless $location;
1380     return $self->engine->_load_sector( $location );
1381 }
1382
1383 1;
1384 __END__