r6122@000-443-371 (orig r9951): rkinyon | 2007-09-19 22:33:23 -0400
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings;
7
8 our $VERSION = q(1.0002);
9
10 use Scalar::Util ();
11
12 # File-wide notes:
13 # * Every method in here assumes that the storage has been appropriately
14 #   safeguarded. This can be anything from flock() to some sort of manual
15 #   mutex. But, it's the caller's responsability to make sure that this has
16 #   been done.
17
18 # Setup file and tag signatures.  These should never change.
19 sub SIG_FILE     () { 'DPDB' }
20 sub SIG_HEADER   () { 'h'    }
21 sub SIG_HASH     () { 'H'    }
22 sub SIG_ARRAY    () { 'A'    }
23 sub SIG_NULL     () { 'N'    }
24 sub SIG_DATA     () { 'D'    }
25 sub SIG_INDEX    () { 'I'    }
26 sub SIG_BLIST    () { 'B'    }
27 sub SIG_FREE     () { 'F'    }
28 sub SIG_SIZE     () {  1     }
29
30 my $STALE_SIZE = 2;
31
32 # Please refer to the pack() documentation for further information
33 my %StP = (
34     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
35     2 => 'n', # Unsigned short in "network" (big-endian) order
36     4 => 'N', # Unsigned long in "network" (big-endian) order
37     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
38 );
39
40 ################################################################################
41
42 sub new {
43     my $class = shift;
44     my ($args) = @_;
45
46     my $self = bless {
47         byte_size   => 4,
48
49         digest      => undef,
50         hash_size   => 16,  # In bytes
51         hash_chars  => 256, # Number of chars the algorithm uses per byte
52         max_buckets => 16,
53         num_txns    => 1,   # The HEAD
54         trans_id    => 0,   # Default to the HEAD
55
56         data_sector_size => 64, # Size in bytes of each data sector
57
58         entries => {}, # This is the list of entries for transactions
59         storage => undef,
60     }, $class;
61
62     # Never allow byte_size to be set directly.
63     delete $args->{byte_size};
64     if ( defined $args->{pack_size} ) {
65         if ( lc $args->{pack_size} eq 'small' ) {
66             $args->{byte_size} = 2;
67         }
68         elsif ( lc $args->{pack_size} eq 'medium' ) {
69             $args->{byte_size} = 4;
70         }
71         elsif ( lc $args->{pack_size} eq 'large' ) {
72             $args->{byte_size} = 8;
73         }
74         else {
75             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
76         }
77     }
78
79     # Grab the parameters we want to use
80     foreach my $param ( keys %$self ) {
81         next unless exists $args->{$param};
82         $self->{$param} = $args->{$param};
83     }
84
85     my %validations = (
86         max_buckets      => { floor => 16, ceil => 256 },
87         num_txns         => { floor => 1,  ceil => 255 },
88         data_sector_size => { floor => 32, ceil => 256 },
89     );
90
91     while ( my ($attr, $c) = each %validations ) {
92         if (   !defined $self->{$attr}
93             || !length $self->{$attr}
94             || $self->{$attr} =~ /\D/
95             || $self->{$attr} < $c->{floor}
96         ) {
97             $self->{$attr} = '(undef)' if !defined $self->{$attr};
98             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
99             $self->{$attr} = $c->{floor};
100         }
101         elsif ( $self->{$attr} > $c->{ceil} ) {
102             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
103             $self->{$attr} = $c->{ceil};
104         }
105     }
106
107     if ( !$self->{digest} ) {
108         require Digest::MD5;
109         $self->{digest} = \&Digest::MD5::md5;
110     }
111
112     return $self;
113 }
114
115 ################################################################################
116
117 sub read_value {
118     my $self = shift;
119     my ($obj, $key) = @_;
120
121     # This will be a Reference sector
122     my $sector = $self->_load_sector( $obj->_base_offset )
123         or return;
124
125     if ( $sector->staleness != $obj->_staleness ) {
126         return;
127     }
128
129     my $key_md5 = $self->_apply_digest( $key );
130
131     my $value_sector = $sector->get_data_for({
132         key_md5    => $key_md5,
133         allow_head => 1,
134     });
135
136     unless ( $value_sector ) {
137         $value_sector = DBM::Deep::Engine::Sector::Null->new({
138             engine => $self,
139             data   => undef,
140         });
141
142         $sector->write_data({
143             key_md5 => $key_md5,
144             key     => $key,
145             value   => $value_sector,
146         });
147     }
148
149     return $value_sector->data;
150 }
151
152 sub get_classname {
153     my $self = shift;
154     my ($obj) = @_;
155
156     # This will be a Reference sector
157     my $sector = $self->_load_sector( $obj->_base_offset )
158         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
159
160     if ( $sector->staleness != $obj->_staleness ) {
161         return;
162     }
163
164     return $sector->get_classname;
165 }
166
167 sub key_exists {
168     my $self = shift;
169     my ($obj, $key) = @_;
170
171     # This will be a Reference sector
172     my $sector = $self->_load_sector( $obj->_base_offset )
173         or return '';
174
175     if ( $sector->staleness != $obj->_staleness ) {
176         return '';
177     }
178
179     my $data = $sector->get_data_for({
180         key_md5    => $self->_apply_digest( $key ),
181         allow_head => 1,
182     });
183
184     # exists() returns 1 or '' for true/false.
185     return $data ? 1 : '';
186 }
187
188 sub delete_key {
189     my $self = shift;
190     my ($obj, $key) = @_;
191
192     my $sector = $self->_load_sector( $obj->_base_offset )
193         or return;
194
195     if ( $sector->staleness != $obj->_staleness ) {
196         return;
197     }
198
199     return $sector->delete_key({
200         key_md5    => $self->_apply_digest( $key ),
201         allow_head => 0,
202     });
203 }
204
205 sub write_value {
206     my $self = shift;
207     my ($obj, $key, $value) = @_;
208
209     my $r = Scalar::Util::reftype( $value ) || '';
210     {
211         last if $r eq '';
212         last if $r eq 'HASH';
213         last if $r eq 'ARRAY';
214
215         DBM::Deep->_throw_error(
216             "Storage of references of type '$r' is not supported."
217         );
218     }
219
220     my ($class, $type);
221     if ( !defined $value ) {
222         $class = 'DBM::Deep::Engine::Sector::Null';
223     }
224     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
225         if ( $r eq 'ARRAY' && tied(@$value) ) {
226             DBM::Deep->_throw_error( "Cannot store something that is tied." );
227         }
228         if ( $r eq 'HASH' && tied(%$value) ) {
229             DBM::Deep->_throw_error( "Cannot store something that is tied." );
230         }
231         $class = 'DBM::Deep::Engine::Sector::Reference';
232         $type = substr( $r, 0, 1 );
233     }
234     else {
235         $class = 'DBM::Deep::Engine::Sector::Scalar';
236     }
237
238     # This will be a Reference sector
239     my $sector = $self->_load_sector( $obj->_base_offset )
240         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
241
242     if ( $sector->staleness != $obj->_staleness ) {
243         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep.n" );
244     }
245
246     # Create this after loading the reference sector in case something bad happens.
247     # This way, we won't allocate value sector(s) needlessly.
248     my $value_sector = $class->new({
249         engine => $self,
250         data   => $value,
251         type   => $type,
252     });
253
254     $sector->write_data({
255         key     => $key,
256         key_md5 => $self->_apply_digest( $key ),
257         value   => $value_sector,
258     });
259
260     # This code is to make sure we write all the values in the $value to the disk
261     # and to make sure all changes to $value after the assignment are reflected
262     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
263     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
264     # copy to a temp value.
265     if ( $r eq 'ARRAY' ) {
266         my @temp = @$value;
267         tie @$value, 'DBM::Deep', {
268             base_offset => $value_sector->offset,
269             staleness   => $value_sector->staleness,
270             storage     => $self->storage,
271             engine      => $self,
272         };
273         @$value = @temp;
274         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
275     }
276     elsif ( $r eq 'HASH' ) {
277         my %temp = %$value;
278         tie %$value, 'DBM::Deep', {
279             base_offset => $value_sector->offset,
280             staleness   => $value_sector->staleness,
281             storage     => $self->storage,
282             engine      => $self,
283         };
284
285         %$value = %temp;
286         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
287     }
288
289     return 1;
290 }
291
292 # XXX Add staleness here
293 sub get_next_key {
294     my $self = shift;
295     my ($obj, $prev_key) = @_;
296
297     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
298     unless ( $prev_key ) {
299         $obj->{iterator} = DBM::Deep::Iterator->new({
300             base_offset => $obj->_base_offset,
301             engine      => $self,
302         });
303     }
304
305     return $obj->{iterator}->get_next_key( $obj );
306 }
307
308 ################################################################################
309
310 sub setup_fh {
311     my $self = shift;
312     my ($obj) = @_;
313
314     # We're opening the file.
315     unless ( $obj->_base_offset ) {
316         my $bytes_read = $self->_read_file_header;
317
318         # Creating a new file
319         unless ( $bytes_read ) {
320             $self->_write_file_header;
321
322             # 1) Create Array/Hash entry
323             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
324                 engine => $self,
325                 type   => $obj->_type,
326             });
327             $obj->{base_offset} = $initial_reference->offset;
328             $obj->{staleness} = $initial_reference->staleness;
329
330             $self->storage->flush;
331         }
332         # Reading from an existing file
333         else {
334             $obj->{base_offset} = $bytes_read;
335             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
336                 engine => $self,
337                 offset => $obj->_base_offset,
338             });
339             unless ( $initial_reference ) {
340                 DBM::Deep->_throw_error("Corrupted file, no master index record");
341             }
342
343             unless ($obj->_type eq $initial_reference->type) {
344                 DBM::Deep->_throw_error("File type mismatch");
345             }
346
347             $obj->{staleness} = $initial_reference->staleness;
348         }
349     }
350
351     return 1;
352 }
353
354 sub begin_work {
355     my $self = shift;
356     my ($obj) = @_;
357
358     if ( $self->trans_id ) {
359         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
360     }
361
362     my @slots = $self->read_txn_slots;
363     my $found;
364     for my $i ( 0 .. $#slots ) {
365         next if $slots[$i];
366
367         $slots[$i] = 1;
368         $self->set_trans_id( $i + 1 );
369         $found = 1;
370         last;
371     }
372     unless ( $found ) {
373         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
374     }
375     $self->write_txn_slots( @slots );
376
377     if ( !$self->trans_id ) {
378         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
379     }
380
381     return;
382 }
383
384 sub rollback {
385     my $self = shift;
386     my ($obj) = @_;
387
388     if ( !$self->trans_id ) {
389         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
390     }
391
392     # Each entry is the file location for a bucket that has a modification for
393     # this transaction. The entries need to be expunged.
394     foreach my $entry (@{ $self->get_entries } ) {
395         # Remove the entry here
396         my $read_loc = $entry
397           + $self->hash_size
398           + $self->byte_size
399           + $self->byte_size
400           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
401
402         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
403         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
404         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
405
406         if ( $data_loc > 1 ) {
407             $self->_load_sector( $data_loc )->free;
408         }
409     }
410
411     $self->clear_entries;
412
413     my @slots = $self->read_txn_slots;
414     $slots[$self->trans_id-1] = 0;
415     $self->write_txn_slots( @slots );
416     $self->inc_txn_staleness_counter( $self->trans_id );
417     $self->set_trans_id( 0 );
418
419     return 1;
420 }
421
422 sub commit {
423     my $self = shift;
424     my ($obj) = @_;
425
426     if ( !$self->trans_id ) {
427         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
428     }
429
430     foreach my $entry (@{ $self->get_entries } ) {
431         # Overwrite the entry in head with the entry in trans_id
432         my $base = $entry
433           + $self->hash_size
434           + $self->byte_size;
435
436         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
437         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
438
439         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
440         my $trans_loc = $self->storage->read_at(
441             $spot, $self->byte_size,
442         );
443
444         $self->storage->print_at( $base, $trans_loc );
445         $self->storage->print_at(
446             $spot,
447             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
448         );
449
450         if ( $head_loc > 1 ) {
451             $self->_load_sector( $head_loc )->free;
452         }
453     }
454
455     $self->clear_entries;
456
457     my @slots = $self->read_txn_slots;
458     $slots[$self->trans_id-1] = 0;
459     $self->write_txn_slots( @slots );
460     $self->inc_txn_staleness_counter( $self->trans_id );
461     $self->set_trans_id( 0 );
462
463     return 1;
464 }
465
466 sub read_txn_slots {
467     my $self = shift;
468     my $bl = $self->txn_bitfield_len;
469     my $num_bits = $bl * 8;
470     return split '', unpack( 'b'.$num_bits,
471         $self->storage->read_at(
472             $self->trans_loc, $bl,
473         )
474     );
475 }
476
477 sub write_txn_slots {
478     my $self = shift;
479     my $num_bits = $self->txn_bitfield_len * 8;
480     $self->storage->print_at( $self->trans_loc,
481         pack( 'b'.$num_bits, join('', @_) ),
482     );
483 }
484
485 sub get_running_txn_ids {
486     my $self = shift;
487     my @transactions = $self->read_txn_slots;
488     my @trans_ids = map { $_+1} grep { $transactions[$_] } 0 .. $#transactions;
489 }
490
491 sub get_txn_staleness_counter {
492     my $self = shift;
493     my ($trans_id) = @_;
494
495     # Hardcode staleness of 0 for the HEAD
496     return 0 unless $trans_id;
497
498     return unpack( $StP{$STALE_SIZE},
499         $self->storage->read_at(
500             $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
501             4,
502         )
503     );
504 }
505
506 sub inc_txn_staleness_counter {
507     my $self = shift;
508     my ($trans_id) = @_;
509
510     # Hardcode staleness of 0 for the HEAD
511     return unless $trans_id;
512
513     $self->storage->print_at(
514         $self->trans_loc + 4 + $STALE_SIZE * ($trans_id - 1),
515         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
516     );
517 }
518
519 sub get_entries {
520     my $self = shift;
521     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
522 }
523
524 sub add_entry {
525     my $self = shift;
526     my ($trans_id, $loc) = @_;
527
528     $self->{entries}{$trans_id} ||= {};
529     $self->{entries}{$trans_id}{$loc} = undef;
530 }
531
532 # If the buckets are being relocated because of a reindexing, the entries
533 # mechanism needs to be made aware of it.
534 sub reindex_entry {
535     my $self = shift;
536     my ($old_loc, $new_loc) = @_;
537
538     TRANS:
539     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
540         foreach my $orig_loc ( keys %{ $locs } ) {
541             if ( $orig_loc == $old_loc ) {
542                 delete $locs->{orig_loc};
543                 $locs->{$new_loc} = undef;
544                 next TRANS;
545             }
546         }
547     }
548 }
549
550 sub clear_entries {
551     my $self = shift;
552     delete $self->{entries}{$self->trans_id};
553 }
554
555 ################################################################################
556
557 {
558     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
559     my $this_file_version = 2;
560
561     sub _write_file_header {
562         my $self = shift;
563
564         my $nt = $self->num_txns;
565         my $bl = $self->txn_bitfield_len;
566
567         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
568
569         my $loc = $self->storage->request_space( $header_fixed + $header_var );
570
571         $self->storage->print_at( $loc,
572             SIG_FILE,
573             SIG_HEADER,
574             pack('N', $this_file_version), # At this point, we're at 9 bytes
575             pack('N', $header_var),        # header size
576             # --- Above is $header_fixed. Below is $header_var
577             pack('C', $self->byte_size),
578
579             # These shenanigans are to allow a 256 within a C
580             pack('C', $self->max_buckets - 1),
581             pack('C', $self->data_sector_size - 1),
582
583             pack('C', $nt),
584             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
585             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
586             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
587             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
588             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
589         );
590
591         #XXX Set these less fragilely
592         $self->set_trans_loc( $header_fixed + 4 );
593         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
594
595         return;
596     }
597
598     sub _read_file_header {
599         my $self = shift;
600
601         my $buffer = $self->storage->read_at( 0, $header_fixed );
602         return unless length($buffer);
603
604         my ($file_signature, $sig_header, $file_version, $size) = unpack(
605             'A4 A N N', $buffer
606         );
607
608         unless ( $file_signature eq SIG_FILE ) {
609             $self->storage->close;
610             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
611         }
612
613         unless ( $sig_header eq SIG_HEADER ) {
614             $self->storage->close;
615             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
616         }
617
618         unless ( $file_version == $this_file_version ) {
619             $self->storage->close;
620             DBM::Deep->_throw_error(
621                 "Wrong file version found - " .  $file_version .
622                 " - expected " . $this_file_version
623             );
624         }
625
626         my $buffer2 = $self->storage->read_at( undef, $size );
627         my @values = unpack( 'C C C C', $buffer2 );
628
629         if ( @values != 4 || grep { !defined } @values ) {
630             $self->storage->close;
631             DBM::Deep->_throw_error("Corrupted file - bad header");
632         }
633
634         #XXX Add warnings if values weren't set right
635         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
636
637         # These shenangians are to allow a 256 within a C
638         $self->{max_buckets} += 1;
639         $self->{data_sector_size} += 1;
640
641         my $bl = $self->txn_bitfield_len;
642
643         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
644         unless ( $size == $header_var ) {
645             $self->storage->close;
646             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
647         }
648
649         $self->set_trans_loc( $header_fixed + scalar(@values) );
650         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
651
652         return length($buffer) + length($buffer2);
653     }
654 }
655
656 sub _load_sector {
657     my $self = shift;
658     my ($offset) = @_;
659
660     # Add a catch for offset of 0 or 1
661     return if $offset <= 1;
662
663     my $type = $self->storage->read_at( $offset, 1 );
664     return if $type eq chr(0);
665
666     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
667         return DBM::Deep::Engine::Sector::Reference->new({
668             engine => $self,
669             type   => $type,
670             offset => $offset,
671         });
672     }
673     # XXX Don't we need key_md5 here?
674     elsif ( $type eq $self->SIG_BLIST ) {
675         return DBM::Deep::Engine::Sector::BucketList->new({
676             engine => $self,
677             type   => $type,
678             offset => $offset,
679         });
680     }
681     elsif ( $type eq $self->SIG_INDEX ) {
682         return DBM::Deep::Engine::Sector::Index->new({
683             engine => $self,
684             type   => $type,
685             offset => $offset,
686         });
687     }
688     elsif ( $type eq $self->SIG_NULL ) {
689         return DBM::Deep::Engine::Sector::Null->new({
690             engine => $self,
691             type   => $type,
692             offset => $offset,
693         });
694     }
695     elsif ( $type eq $self->SIG_DATA ) {
696         return DBM::Deep::Engine::Sector::Scalar->new({
697             engine => $self,
698             type   => $type,
699             offset => $offset,
700         });
701     }
702     # This was deleted from under us, so just return and let the caller figure it out.
703     elsif ( $type eq $self->SIG_FREE ) {
704         return;
705     }
706
707     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
708 }
709
710 sub _apply_digest {
711     my $self = shift;
712     return $self->{digest}->(@_);
713 }
714
715 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
716 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
717 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
718
719 sub _add_free_sector {
720     my $self = shift;
721     my ($multiple, $offset, $size) = @_;
722
723     my $chains_offset = $multiple * $self->byte_size;
724
725     my $storage = $self->storage;
726
727     # Increment staleness.
728     # XXX Can this increment+modulo be done by "&= 0x1" ?
729     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
730     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
731     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
732
733     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
734
735     $storage->print_at( $self->chains_loc + $chains_offset,
736         pack( $StP{$self->byte_size}, $offset ),
737     );
738
739     # Record the old head in the new sector after the signature and staleness counter
740     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
741 }
742
743 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
744 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
745 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
746
747 sub _request_sector {
748     my $self = shift;
749     my ($multiple, $size) = @_;
750
751     my $chains_offset = $multiple * $self->byte_size;
752
753     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
754     my $loc = unpack( $StP{$self->byte_size}, $old_head );
755
756     # We don't have any free sectors of the right size, so allocate a new one.
757     unless ( $loc ) {
758         my $offset = $self->storage->request_space( $size );
759
760         # Zero out the new sector. This also guarantees correct increases
761         # in the filesize.
762         $self->storage->print_at( $offset, chr(0) x $size );
763
764         return $offset;
765     }
766
767     # Read the new head after the signature and the staleness counter
768     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
769     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
770     $self->storage->print_at(
771         $loc + SIG_SIZE + $STALE_SIZE,
772         pack( $StP{$self->byte_size}, 0 ),
773     );
774
775     return $loc;
776 }
777
778 ################################################################################
779
780 sub storage     { $_[0]{storage} }
781 sub byte_size   { $_[0]{byte_size} }
782 sub hash_size   { $_[0]{hash_size} }
783 sub hash_chars  { $_[0]{hash_chars} }
784 sub num_txns    { $_[0]{num_txns} }
785 sub max_buckets { $_[0]{max_buckets} }
786 sub blank_md5   { chr(0) x $_[0]->hash_size }
787 sub data_sector_size { $_[0]{data_sector_size} }
788
789 # This is a calculated value
790 sub txn_bitfield_len {
791     my $self = shift;
792     unless ( exists $self->{txn_bitfield_len} ) {
793         my $temp = ($self->num_txns) / 8;
794         if ( $temp > int( $temp ) ) {
795             $temp = int( $temp ) + 1;
796         }
797         $self->{txn_bitfield_len} = $temp;
798     }
799     return $self->{txn_bitfield_len};
800 }
801
802 sub trans_id     { $_[0]{trans_id} }
803 sub set_trans_id { $_[0]{trans_id} = $_[1] }
804
805 sub trans_loc     { $_[0]{trans_loc} }
806 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
807
808 sub chains_loc     { $_[0]{chains_loc} }
809 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
810
811 ################################################################################
812
813 package DBM::Deep::Iterator;
814
815 sub new {
816     my $class = shift;
817     my ($args) = @_;
818
819     my $self = bless {
820         breadcrumbs => [],
821         engine      => $args->{engine},
822         base_offset => $args->{base_offset},
823     }, $class;
824
825     Scalar::Util::weaken( $self->{engine} );
826
827     return $self;
828 }
829
830 sub reset { $_[0]{breadcrumbs} = [] }
831
832 sub get_sector_iterator {
833     my $self = shift;
834     my ($loc) = @_;
835
836     my $sector = $self->{engine}->_load_sector( $loc )
837         or return;
838
839     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
840         return DBM::Deep::Iterator::Index->new({
841             iterator => $self,
842             sector   => $sector,
843         });
844     }
845     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
846         return DBM::Deep::Iterator::BucketList->new({
847             iterator => $self,
848             sector   => $sector,
849         });
850     }
851
852     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
853 }
854
855 sub get_next_key {
856     my $self = shift;
857     my ($obj) = @_;
858
859     my $crumbs = $self->{breadcrumbs};
860     my $e = $self->{engine};
861
862     unless ( @$crumbs ) {
863         # This will be a Reference sector
864         my $sector = $e->_load_sector( $self->{base_offset} )
865             # If no sector is found, thist must have been deleted from under us.
866             or return;
867
868         if ( $sector->staleness != $obj->_staleness ) {
869             return;
870         }
871
872         my $loc = $sector->get_blist_loc
873             or return;
874
875         push @$crumbs, $self->get_sector_iterator( $loc );
876     }
877
878     FIND_NEXT_KEY: {
879         # We're at the end.
880         unless ( @$crumbs ) {
881             $self->reset;
882             return;
883         }
884
885         my $iterator = $crumbs->[-1];
886
887         # This level is done.
888         if ( $iterator->at_end ) {
889             pop @$crumbs;
890             redo FIND_NEXT_KEY;
891         }
892
893         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
894             # If we don't have any more, it will be caught at the
895             # prior check.
896             if ( my $next = $iterator->get_next_iterator ) {
897                 push @$crumbs, $next;
898             }
899             redo FIND_NEXT_KEY;
900         }
901
902         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
903             DBM::Deep->_throw_error(
904                 "Should have a bucketlist iterator here - instead have $iterator"
905             );
906         }
907
908         # At this point, we have a BucketList iterator
909         my $key = $iterator->get_next_key;
910         if ( defined $key ) {
911             return $key;
912         }
913         #XXX else { $iterator->set_to_end() } ?
914
915         # We hit the end of the bucketlist iterator, so redo
916         redo FIND_NEXT_KEY;
917     }
918
919     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
920 }
921
922 package DBM::Deep::Iterator::Index;
923
924 sub new {
925     my $self = bless $_[1] => $_[0];
926     $self->{curr_index} = 0;
927     return $self;
928 }
929
930 sub at_end {
931     my $self = shift;
932     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
933 }
934
935 sub get_next_iterator {
936     my $self = shift;
937
938     my $loc;
939     while ( !$loc ) {
940         return if $self->at_end;
941         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
942     }
943
944     return $self->{iterator}->get_sector_iterator( $loc );
945 }
946
947 package DBM::Deep::Iterator::BucketList;
948
949 sub new {
950     my $self = bless $_[1] => $_[0];
951     $self->{curr_index} = 0;
952     return $self;
953 }
954
955 sub at_end {
956     my $self = shift;
957     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
958 }
959
960 sub get_next_key {
961     my $self = shift;
962
963     return if $self->at_end;
964
965     my $idx = $self->{curr_index}++;
966
967     my $data_loc = $self->{sector}->get_data_location_for({
968         allow_head => 1,
969         idx        => $idx,
970     }) or return;
971
972     #XXX Do we want to add corruption checks here?
973     return $self->{sector}->get_key_for( $idx )->data;
974 }
975
976 package DBM::Deep::Engine::Sector;
977
978 sub new {
979     my $self = bless $_[1], $_[0];
980     Scalar::Util::weaken( $self->{engine} );
981     $self->_init;
982     return $self;
983 }
984
985 #sub _init {}
986 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
987
988 sub engine { $_[0]{engine} }
989 sub offset { $_[0]{offset} }
990 sub type   { $_[0]{type} }
991
992 sub base_size {
993    my $self = shift;
994    return $self->engine->SIG_SIZE + $STALE_SIZE;
995 }
996
997 sub free {
998     my $self = shift;
999
1000     my $e = $self->engine;
1001
1002     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1003     # Skip staleness counter
1004     $e->storage->print_at( $self->offset + $self->base_size,
1005         chr(0) x ($self->size - $self->base_size),
1006     );
1007
1008     my $free_meth = $self->free_meth;
1009     $e->$free_meth( $self->offset, $self->size );
1010
1011     return;
1012 }
1013
1014 package DBM::Deep::Engine::Sector::Data;
1015
1016 our @ISA = qw( DBM::Deep::Engine::Sector );
1017
1018 # This is in bytes
1019 sub size { $_[0]{engine}->data_sector_size }
1020 sub free_meth { return '_add_free_data_sector' }
1021
1022 sub clone {
1023     my $self = shift;
1024     return ref($self)->new({
1025         engine => $self->engine,
1026         type   => $self->type,
1027         data   => $self->data,
1028     });
1029 }
1030
1031 package DBM::Deep::Engine::Sector::Scalar;
1032
1033 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1034
1035 sub free {
1036     my $self = shift;
1037
1038     my $chain_loc = $self->chain_loc;
1039
1040     $self->SUPER::free();
1041
1042     if ( $chain_loc ) {
1043         $self->engine->_load_sector( $chain_loc )->free;
1044     }
1045
1046     return;
1047 }
1048
1049 sub type { $_[0]{engine}->SIG_DATA }
1050 sub _init {
1051     my $self = shift;
1052
1053     my $engine = $self->engine;
1054
1055     unless ( $self->offset ) {
1056         my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
1057
1058         $self->{offset} = $engine->_request_data_sector( $self->size );
1059
1060         my $data = delete $self->{data};
1061         my $dlen = length $data;
1062         my $continue = 1;
1063         my $curr_offset = $self->offset;
1064         while ( $continue ) {
1065
1066             my $next_offset = 0;
1067
1068             my ($leftover, $this_len, $chunk);
1069             if ( $dlen > $data_section ) {
1070                 $leftover = 0;
1071                 $this_len = $data_section;
1072                 $chunk = substr( $data, 0, $this_len );
1073
1074                 $dlen -= $data_section;
1075                 $next_offset = $engine->_request_data_sector( $self->size );
1076                 $data = substr( $data, $this_len );
1077             }
1078             else {
1079                 $leftover = $data_section - $dlen;
1080                 $this_len = $dlen;
1081                 $chunk = $data;
1082
1083                 $continue = 0;
1084             }
1085
1086             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1087             # Skip staleness
1088             $engine->storage->print_at( $curr_offset + $self->base_size,
1089                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1090                 pack( $StP{1}, $this_len ),                      # Data length
1091                 $chunk,                                          # Data to be stored in this sector
1092                 chr(0) x $leftover,                              # Zero-fill the rest
1093             );
1094
1095             $curr_offset = $next_offset;
1096         }
1097
1098         return;
1099     }
1100 }
1101
1102 sub data_length {
1103     my $self = shift;
1104
1105     my $buffer = $self->engine->storage->read_at(
1106         $self->offset + $self->base_size + $self->engine->byte_size, 1
1107     );
1108
1109     return unpack( $StP{1}, $buffer );
1110 }
1111
1112 sub chain_loc {
1113     my $self = shift;
1114     return unpack(
1115         $StP{$self->engine->byte_size},
1116         $self->engine->storage->read_at(
1117             $self->offset + $self->base_size,
1118             $self->engine->byte_size,
1119         ),
1120     );
1121 }
1122
1123 sub data {
1124     my $self = shift;
1125
1126     my $data;
1127     while ( 1 ) {
1128         my $chain_loc = $self->chain_loc;
1129
1130         $data .= $self->engine->storage->read_at(
1131             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1132         );
1133
1134         last unless $chain_loc;
1135
1136         $self = $self->engine->_load_sector( $chain_loc );
1137     }
1138
1139     return $data;
1140 }
1141
1142 package DBM::Deep::Engine::Sector::Null;
1143
1144 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1145
1146 sub type { $_[0]{engine}->SIG_NULL }
1147 sub data_length { 0 }
1148 sub data { return }
1149
1150 sub _init {
1151     my $self = shift;
1152
1153     my $engine = $self->engine;
1154
1155     unless ( $self->offset ) {
1156         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1157
1158         $self->{offset} = $engine->_request_data_sector( $self->size );
1159         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1160         # Skip staleness counter
1161         $engine->storage->print_at( $self->offset + $self->base_size,
1162             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1163             pack( $StP{1}, $self->data_length ),  # Data length
1164             chr(0) x $leftover,                   # Zero-fill the rest
1165         );
1166
1167         return;
1168     }
1169 }
1170
1171 package DBM::Deep::Engine::Sector::Reference;
1172
1173 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1174
1175 sub _init {
1176     my $self = shift;
1177
1178     my $e = $self->engine;
1179
1180     unless ( $self->offset ) {
1181         my $classname = Scalar::Util::blessed( delete $self->{data} );
1182         my $leftover = $self->size - $self->base_size - 2 * $e->byte_size;
1183
1184         my $class_offset = 0;
1185         if ( defined $classname ) {
1186             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1187                 engine => $e,
1188                 data   => $classname,
1189             });
1190             $class_offset = $class_sector->offset;
1191         }
1192
1193         $self->{offset} = $e->_request_data_sector( $self->size );
1194         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1195         # Skip staleness counter
1196         $e->storage->print_at( $self->offset + $self->base_size,
1197             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1198             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1199             chr(0) x $leftover,                         # Zero-fill the rest
1200         );
1201     }
1202     else {
1203         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1204     }
1205
1206     $self->{staleness} = unpack(
1207         $StP{$STALE_SIZE},
1208         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
1209     );
1210
1211     return;
1212 }
1213
1214 sub free {
1215     my $self = shift;
1216
1217     my $blist_loc = $self->get_blist_loc;
1218     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
1219
1220     my $class_loc = $self->get_class_offset;
1221     $self->engine->_load_sector( $class_loc )->free if $class_loc;
1222
1223     $self->SUPER::free();
1224 }
1225
1226 sub staleness { $_[0]{staleness} }
1227
1228 sub get_data_for {
1229     my $self = shift;
1230     my ($args) = @_;
1231
1232     # Assume that the head is not allowed unless otherwise specified.
1233     $args->{allow_head} = 0 unless exists $args->{allow_head};
1234
1235     # Assume we don't create a new blist location unless otherwise specified.
1236     $args->{create} = 0 unless exists $args->{create};
1237
1238     my $blist = $self->get_bucket_list({
1239         key_md5 => $args->{key_md5},
1240         key => $args->{key},
1241         create  => $args->{create},
1242     });
1243     return unless $blist && $blist->{found};
1244
1245     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1246     # is whether or not this transaction has this key. That's part of the next
1247     # function call.
1248     my $location = $blist->get_data_location_for({
1249         allow_head => $args->{allow_head},
1250     }) or return;
1251
1252     return $self->engine->_load_sector( $location );
1253 }
1254
1255 sub write_data {
1256     my $self = shift;
1257     my ($args) = @_;
1258
1259     my $blist = $self->get_bucket_list({
1260         key_md5 => $args->{key_md5},
1261         key => $args->{key},
1262         create  => 1,
1263     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1264
1265     # Handle any transactional bookkeeping.
1266     if ( $self->engine->trans_id ) {
1267         if ( ! $blist->has_md5 ) {
1268             $blist->mark_deleted({
1269                 trans_id => 0,
1270             });
1271         }
1272     }
1273     else {
1274         my @trans_ids = $self->engine->get_running_txn_ids;
1275         if ( $blist->has_md5 ) {
1276             if ( @trans_ids ) {
1277                 my $old_value = $blist->get_data_for;
1278                 foreach my $other_trans_id ( @trans_ids ) {
1279                     next if $blist->get_data_location_for({
1280                         trans_id   => $other_trans_id,
1281                         allow_head => 0,
1282                     });
1283                     $blist->write_md5({
1284                         trans_id => $other_trans_id,
1285                         key      => $args->{key},
1286                         key_md5  => $args->{key_md5},
1287                         value    => $old_value->clone,
1288                     });
1289                 }
1290             }
1291         }
1292         else {
1293             if ( @trans_ids ) {
1294                 foreach my $other_trans_id ( @trans_ids ) {
1295                     #XXX This doesn't seem to possible to ever happen . . .
1296                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1297                     $blist->mark_deleted({
1298                         trans_id => $other_trans_id,
1299                     });
1300                 }
1301             }
1302         }
1303     }
1304
1305     #XXX Is this safe to do transactionally?
1306     # Free the place we're about to write to.
1307     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1308         $blist->get_data_for({ allow_head => 0 })->free;
1309     }
1310
1311     $blist->write_md5({
1312         key      => $args->{key},
1313         key_md5  => $args->{key_md5},
1314         value    => $args->{value},
1315     });
1316 }
1317
1318 sub delete_key {
1319     my $self = shift;
1320     my ($args) = @_;
1321
1322     # XXX What should happen if this fails?
1323     my $blist = $self->get_bucket_list({
1324         key_md5 => $args->{key_md5},
1325     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1326
1327     # Save the location so that we can free the data
1328     my $location = $blist->get_data_location_for({
1329         allow_head => 0,
1330     });
1331     my $old_value = $location && $self->engine->_load_sector( $location );
1332
1333     my @trans_ids = $self->engine->get_running_txn_ids;
1334
1335     if ( $self->engine->trans_id == 0 ) {
1336         if ( @trans_ids ) {
1337             foreach my $other_trans_id ( @trans_ids ) {
1338                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1339                 $blist->write_md5({
1340                     trans_id => $other_trans_id,
1341                     key      => $args->{key},
1342                     key_md5  => $args->{key_md5},
1343                     value    => $old_value->clone,
1344                 });
1345             }
1346         }
1347     }
1348
1349     my $data;
1350     if ( @trans_ids ) {
1351         $blist->mark_deleted( $args );
1352
1353         if ( $old_value ) {
1354             $data = $old_value->data;
1355             $old_value->free;
1356         }
1357     }
1358     else {
1359         $data = $blist->delete_md5( $args );
1360     }
1361
1362     return $data;
1363 }
1364
1365 sub get_blist_loc {
1366     my $self = shift;
1367
1368     my $e = $self->engine;
1369     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
1370     return unpack( $StP{$e->byte_size}, $blist_loc );
1371 }
1372
1373 sub get_bucket_list {
1374     my $self = shift;
1375     my ($args) = @_;
1376     $args ||= {};
1377
1378     # XXX Add in check here for recycling?
1379
1380     my $engine = $self->engine;
1381
1382     my $blist_loc = $self->get_blist_loc;
1383
1384     # There's no index or blist yet
1385     unless ( $blist_loc ) {
1386         return unless $args->{create};
1387
1388         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1389             engine  => $engine,
1390             key_md5 => $args->{key_md5},
1391         });
1392
1393         $engine->storage->print_at( $self->offset + $self->base_size,
1394             pack( $StP{$engine->byte_size}, $blist->offset ),
1395         );
1396
1397         return $blist;
1398     }
1399
1400     my $sector = $engine->_load_sector( $blist_loc )
1401         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1402     my $i = 0;
1403     my $last_sector = undef;
1404     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1405         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
1406         $last_sector = $sector;
1407         if ( $blist_loc ) {
1408             $sector = $engine->_load_sector( $blist_loc )
1409                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
1410         }
1411         else {
1412             $sector = undef;
1413             last;
1414         }
1415     }
1416
1417     # This means we went through the Index sector(s) and found an empty slot
1418     unless ( $sector ) {
1419         return unless $args->{create};
1420
1421         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
1422             unless $last_sector;
1423
1424         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
1425             engine  => $engine,
1426             key_md5 => $args->{key_md5},
1427         });
1428
1429         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
1430
1431         return $blist;
1432     }
1433
1434     $sector->find_md5( $args->{key_md5} );
1435
1436     # See whether or not we need to reindex the bucketlist
1437     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {
1438         my $new_index = DBM::Deep::Engine::Sector::Index->new({
1439             engine => $engine,
1440         });
1441
1442         my %blist_cache;
1443         #XXX q.v. the comments for this function.
1444         foreach my $entry ( $sector->chopped_up ) {
1445             my ($spot, $md5) = @{$entry};
1446             my $idx = ord( substr( $md5, $i, 1 ) );
1447
1448             # XXX This is inefficient
1449             my $blist = $blist_cache{$idx}
1450                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1451                     engine => $engine,
1452                 });
1453
1454             $new_index->set_entry( $idx => $blist->offset );
1455
1456             my $new_spot = $blist->write_at_next_open( $md5 );
1457             $engine->reindex_entry( $spot => $new_spot );
1458         }
1459
1460         # Handle the new item separately.
1461         {
1462             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
1463             my $blist = $blist_cache{$idx}
1464                 ||= DBM::Deep::Engine::Sector::BucketList->new({
1465                     engine => $engine,
1466                 });
1467
1468             $new_index->set_entry( $idx => $blist->offset );
1469
1470             #XXX THIS IS HACKY!
1471             $blist->find_md5( $args->{key_md5} );
1472             $blist->write_md5({
1473                 key     => $args->{key},
1474                 key_md5 => $args->{key_md5},
1475                 value   => DBM::Deep::Engine::Sector::Null->new({
1476                     engine => $engine,
1477                     data   => undef,
1478                 }),
1479             });
1480         }
1481
1482         if ( $last_sector ) {
1483             $last_sector->set_entry(
1484                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
1485                 $new_index->offset,
1486             );
1487         } else {
1488             $engine->storage->print_at( $self->offset + $self->base_size,
1489                 pack( $StP{$engine->byte_size}, $new_index->offset ),
1490             );
1491         }
1492
1493         $sector->free;
1494
1495         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
1496         $sector->find_md5( $args->{key_md5} );
1497     }
1498
1499     return $sector;
1500 }
1501
1502 sub get_class_offset {
1503     my $self = shift;
1504
1505     my $e = $self->engine;
1506     return unpack(
1507         $StP{$e->byte_size},
1508         $e->storage->read_at(
1509             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
1510         ),
1511     );
1512 }
1513
1514 sub get_classname {
1515     my $self = shift;
1516
1517     my $class_offset = $self->get_class_offset;
1518
1519     return unless $class_offset;
1520
1521     return $self->engine->_load_sector( $class_offset )->data;
1522 }
1523
1524 #XXX Add singleton handling here
1525 sub data {
1526     my $self = shift;
1527
1528     my $new_obj = DBM::Deep->new({
1529         type        => $self->type,
1530         base_offset => $self->offset,
1531         staleness   => $self->staleness,
1532         storage     => $self->engine->storage,
1533         engine      => $self->engine,
1534     });
1535
1536     if ( $self->engine->storage->{autobless} ) {
1537         my $classname = $self->get_classname;
1538         if ( defined $classname ) {
1539             bless $new_obj, $classname;
1540         }
1541     }
1542
1543     return $new_obj;
1544 }
1545
1546 package DBM::Deep::Engine::Sector::BucketList;
1547
1548 our @ISA = qw( DBM::Deep::Engine::Sector );
1549
1550 sub _init {
1551     my $self = shift;
1552
1553     my $engine = $self->engine;
1554
1555     unless ( $self->offset ) {
1556         my $leftover = $self->size - $self->base_size;
1557
1558         $self->{offset} = $engine->_request_blist_sector( $self->size );
1559         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
1560         # Skip staleness counter
1561         $engine->storage->print_at( $self->offset + $self->base_size,
1562             chr(0) x $leftover, # Zero-fill the data
1563         );
1564     }
1565
1566     if ( $self->{key_md5} ) {
1567         $self->find_md5;
1568     }
1569
1570     return $self;
1571 }
1572
1573 sub size {
1574     my $self = shift;
1575     unless ( $self->{size} ) {
1576         my $e = $self->engine;
1577         # Base + numbuckets * bucketsize
1578         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
1579     }
1580     return $self->{size};
1581 }
1582
1583 sub free_meth { return '_add_free_blist_sector' }
1584
1585 sub bucket_size {
1586     my $self = shift;
1587     unless ( $self->{bucket_size} ) {
1588         my $e = $self->engine;
1589         # Key + head (location) + transactions (location + staleness-counter)
1590         my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
1591         $self->{bucket_size} = $e->hash_size + $location_size;
1592     }
1593     return $self->{bucket_size};
1594 }
1595
1596 # XXX This is such a poor hack. I need to rethink this code.
1597 sub chopped_up {
1598     my $self = shift;
1599
1600     my $e = $self->engine;
1601
1602     my @buckets;
1603     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1604         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
1605         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
1606
1607         #XXX If we're chopping, why would we ever have the blank_md5?
1608         last if $md5 eq $e->blank_md5;
1609
1610         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
1611         push @buckets, [ $spot, $md5 . $rest ];
1612     }
1613
1614     return @buckets;
1615 }
1616
1617 sub write_at_next_open {
1618     my $self = shift;
1619     my ($entry) = @_;
1620
1621     #XXX This is such a hack!
1622     $self->{_next_open} = 0 unless exists $self->{_next_open};
1623
1624     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
1625     $self->engine->storage->print_at( $spot, $entry );
1626
1627     return $spot;
1628 }
1629
1630 sub has_md5 {
1631     my $self = shift;
1632     unless ( exists $self->{found} ) {
1633         $self->find_md5;
1634     }
1635     return $self->{found};
1636 }
1637
1638 sub find_md5 {
1639     my $self = shift;
1640
1641     $self->{found} = undef;
1642     $self->{idx}   = -1;
1643
1644     if ( @_ ) {
1645         $self->{key_md5} = shift;
1646     }
1647
1648     # If we don't have an MD5, then what are we supposed to do?
1649     unless ( exists $self->{key_md5} ) {
1650         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
1651     }
1652
1653     my $e = $self->engine;
1654     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
1655         my $potential = $e->storage->read_at(
1656             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
1657         );
1658
1659         if ( $potential eq $e->blank_md5 ) {
1660             $self->{idx} = $idx;
1661             return;
1662         }
1663
1664         if ( $potential eq $self->{key_md5} ) {
1665             $self->{found} = 1;
1666             $self->{idx} = $idx;
1667             return;
1668         }
1669     }
1670
1671     return;
1672 }
1673
1674 sub write_md5 {
1675     my $self = shift;
1676     my ($args) = @_;
1677
1678     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
1679     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
1680     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
1681
1682     my $engine = $self->engine;
1683
1684     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1685
1686     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1687     $engine->add_entry( $args->{trans_id}, $spot );
1688
1689     unless ($self->{found}) {
1690         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
1691             engine => $engine,
1692             data   => $args->{key},
1693         });
1694
1695         $engine->storage->print_at( $spot,
1696             $args->{key_md5},
1697             pack( $StP{$engine->byte_size}, $key_sector->offset ),
1698         );
1699     }
1700
1701     my $loc = $spot
1702       + $engine->hash_size
1703       + $engine->byte_size;
1704
1705     if ( $args->{trans_id} ) {
1706         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
1707
1708         $engine->storage->print_at( $loc,
1709             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1710             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1711         );
1712     }
1713     else {
1714         $engine->storage->print_at( $loc,
1715             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
1716         );
1717     }
1718 }
1719
1720 sub mark_deleted {
1721     my $self = shift;
1722     my ($args) = @_;
1723     $args ||= {};
1724
1725     my $engine = $self->engine;
1726
1727     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
1728
1729     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1730     $engine->add_entry( $args->{trans_id}, $spot );
1731
1732     my $loc = $spot
1733       + $engine->hash_size
1734       + $engine->byte_size;
1735
1736     if ( $args->{trans_id} ) {
1737         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
1738
1739         $engine->storage->print_at( $loc,
1740             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1741             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
1742         );
1743     }
1744     else {
1745         $engine->storage->print_at( $loc,
1746             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
1747         );
1748     }
1749
1750 }
1751
1752 sub delete_md5 {
1753     my $self = shift;
1754     my ($args) = @_;
1755
1756     my $engine = $self->engine;
1757     return undef unless $self->{found};
1758
1759     # Save the location so that we can free the data
1760     my $location = $self->get_data_location_for({
1761         allow_head => 0,
1762     });
1763     my $key_sector = $self->get_key_for;
1764
1765     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
1766     $engine->storage->print_at( $spot,
1767         $engine->storage->read_at(
1768             $spot + $self->bucket_size,
1769             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
1770         ),
1771         chr(0) x $self->bucket_size,
1772     );
1773
1774     $key_sector->free;
1775
1776     my $data_sector = $self->engine->_load_sector( $location );
1777     my $data = $data_sector->data;
1778     $data_sector->free;
1779
1780     return $data;
1781 }
1782
1783 sub get_data_location_for {
1784     my $self = shift;
1785     my ($args) = @_;
1786     $args ||= {};
1787
1788     $args->{allow_head} = 0 unless exists $args->{allow_head};
1789     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
1790     $args->{idx}        = $self->{idx} unless exists $args->{idx};
1791
1792     my $e = $self->engine;
1793
1794     my $spot = $self->offset + $self->base_size
1795       + $args->{idx} * $self->bucket_size
1796       + $e->hash_size
1797       + $e->byte_size;
1798
1799     if ( $args->{trans_id} ) {
1800         $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
1801     }
1802
1803     my $buffer = $e->storage->read_at(
1804         $spot,
1805         $e->byte_size + $STALE_SIZE,
1806     );
1807     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
1808
1809     if ( $args->{trans_id} ) {
1810         # We have found an entry that is old, so get rid of it
1811         if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
1812             $e->storage->print_at(
1813                 $spot,
1814                 pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
1815             );
1816             $loc = 0;
1817         }
1818     }
1819
1820     # If we're in a transaction and we never wrote to this location, try the
1821     # HEAD instead.
1822     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
1823         return $self->get_data_location_for({
1824             trans_id   => 0,
1825             allow_head => 1,
1826             idx        => $args->{idx},
1827         });
1828     }
1829     return $loc <= 1 ? 0 : $loc;
1830 }
1831
1832 sub get_data_for {
1833     my $self = shift;
1834     my ($args) = @_;
1835     $args ||= {};
1836
1837     return unless $self->{found};
1838     my $location = $self->get_data_location_for({
1839         allow_head => $args->{allow_head},
1840     });
1841     return $self->engine->_load_sector( $location );
1842 }
1843
1844 sub get_key_for {
1845     my $self = shift;
1846     my ($idx) = @_;
1847     $idx = $self->{idx} unless defined $idx;
1848
1849     if ( $idx >= $self->engine->max_buckets ) {
1850         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
1851     }
1852
1853     my $location = $self->engine->storage->read_at(
1854         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
1855         $self->engine->byte_size,
1856     );
1857     $location = unpack( $StP{$self->engine->byte_size}, $location );
1858     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
1859
1860     return $self->engine->_load_sector( $location );
1861 }
1862
1863 package DBM::Deep::Engine::Sector::Index;
1864
1865 our @ISA = qw( DBM::Deep::Engine::Sector );
1866
1867 sub _init {
1868     my $self = shift;
1869
1870     my $engine = $self->engine;
1871
1872     unless ( $self->offset ) {
1873         my $leftover = $self->size - $self->base_size;
1874
1875         $self->{offset} = $engine->_request_index_sector( $self->size );
1876         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
1877         # Skip staleness counter
1878         $engine->storage->print_at( $self->offset + $self->base_size,
1879             chr(0) x $leftover, # Zero-fill the rest
1880         );
1881     }
1882
1883     return $self;
1884 }
1885
1886 #XXX Change here
1887 sub size {
1888     my $self = shift;
1889     unless ( $self->{size} ) {
1890         my $e = $self->engine;
1891         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
1892     }
1893     return $self->{size};
1894 }
1895
1896 sub free_meth { return '_add_free_index_sector' }
1897
1898 sub free {
1899     my $self = shift;
1900     my $e = $self->engine;
1901
1902     for my $i ( 0 .. $e->hash_chars - 1 ) {
1903         my $l = $self->get_entry( $i ) or next;
1904         $e->_load_sector( $l )->free;
1905     }
1906
1907     $self->SUPER::free();
1908 }
1909
1910 sub _loc_for {
1911     my $self = shift;
1912     my ($idx) = @_;
1913     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
1914 }
1915
1916 sub get_entry {
1917     my $self = shift;
1918     my ($idx) = @_;
1919
1920     my $e = $self->engine;
1921
1922     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
1923         if $idx < 0 || $idx >= $e->hash_chars;
1924
1925     return unpack(
1926         $StP{$e->byte_size},
1927         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
1928     );
1929 }
1930
1931 sub set_entry {
1932     my $self = shift;
1933     my ($idx, $loc) = @_;
1934
1935     my $e = $self->engine;
1936
1937     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
1938         if $idx < 0 || $idx >= $e->hash_chars;
1939
1940     $self->engine->storage->print_at(
1941         $self->_loc_for( $idx ),
1942         pack( $StP{$e->byte_size}, $loc ),
1943     );
1944 }
1945
1946 1;
1947 __END__