3f61cc8e0ab226adf9fc4421c1bbb1b3fefbee25
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 my $STALE_SIZE = 2;
33
34 # Please refer to the pack() documentation for further information
35 my %StP = (
36     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
37     2 => 'n', # Unsigned short in "network" (big-endian) order
38     4 => 'N', # Unsigned long in "network" (big-endian) order
39     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
40 );
41
42 =head1 NAME
43
44 DBM::Deep::Engine
45
46 =head1 PURPOSE
47
48 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
49 mapping between the L<DBM::Deep/> objects and the storage medium.
50
51 The purpose of this documentation is to provide low-level documentation for
52 developers. It is B<not> intended to be used by the general public. This
53 documentation and what it documents can and will change without notice.
54
55 =head1 OVERVIEW
56
57 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
58 and DBM::Deep::Hash) for their use to access the actual stored values. This API
59 is the following:
60
61 =over 4
62
63 =item * new
64
65 =item * read_value
66
67 =item * get_classname
68
69 =item * make_reference
70
71 =item * key_exists
72
73 =item * delete_key
74
75 =item * write_value
76
77 =item * get_next_key
78
79 =item * setup_fh
80
81 =item * begin_work
82
83 =item * commit
84
85 =item * rollback
86
87 =item * lock_exclusive
88
89 =item * lock_shared
90
91 =item * unlock
92
93 =back
94
95 They are explained in their own sections below. These methods, in turn, may
96 provide some bounds-checking, but primarily act to instantiate objects in the
97 Engine::Sector::* hierarchy and dispatch to them.
98
99 =head1 TRANSACTIONS
100
101 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
102 to keep the amount of actual work done against the file low while stil providing
103 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
104 with only one file.
105
106 =head2 STALENESS
107
108 If another process uses a transaction slot and writes stuff to it, then terminates,
109 the data that process wrote it still within the file. In order to address this,
110 there is also a transaction staleness counter associated within every write.
111 Each time a transaction is started, that process increments that transaction's
112 staleness counter. If, when it reads a value, the staleness counters aren't
113 identical, DBM::Deep will consider the value on disk to be stale and discard it.
114
115 =head2 DURABILITY
116
117 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
118 the data will be there the next time you read from it. This should be regardless
119 of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep
120 does provide that guarantee; once the commit returns, all of the data has been
121 transferred from the transaction shadow to the HEAD. The issue arises with partial
122 commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's
123 "tradition" of very light error-checking and non-existent error-handling, there is
124 no way to recover from a partial commit. (This is probably a failure in Consistency
125 as well as Durability.)
126
127 Other DBMSes use transaction logs (a separate file, generally) to achieve Durability.
128 As DBM::Deep is a single-file, we would have to do something similar to what SQLite
129 and BDB do in terms of committing using synchonized writes. To do this, we would have
130 to use a much higher RAM footprint and some serious programming that make my head
131 hurts just to think about it.
132
133 =head1 EXTERNAL METHODS
134
135 =cut
136
137 ################################################################################
138
139 =head2 new()
140
141 This takes a set of args. These args are described in the documentation for
142 L<DBM::Deep/new>.
143
144 =cut
145
146 sub new {
147     my $class = shift;
148     my ($args) = @_;
149
150     $args->{storage} = DBM::Deep::File->new( $args )
151         unless exists $args->{storage};
152
153     my $self = bless {
154         byte_size   => 4,
155
156         digest      => undef,
157         hash_size   => 16,  # In bytes
158         hash_chars  => 256, # Number of chars the algorithm uses per byte
159         max_buckets => 16,
160         num_txns    => 1,   # The HEAD
161         trans_id    => 0,   # Default to the HEAD
162
163         data_sector_size => 64, # Size in bytes of each data sector
164
165         entries => {}, # This is the list of entries for transactions
166         storage => undef,
167     }, $class;
168
169     # Never allow byte_size to be set directly.
170     delete $args->{byte_size};
171     if ( defined $args->{pack_size} ) {
172         if ( lc $args->{pack_size} eq 'small' ) {
173             $args->{byte_size} = 2;
174         }
175         elsif ( lc $args->{pack_size} eq 'medium' ) {
176             $args->{byte_size} = 4;
177         }
178         elsif ( lc $args->{pack_size} eq 'large' ) {
179             $args->{byte_size} = 8;
180         }
181         else {
182             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
183         }
184     }
185
186     # Grab the parameters we want to use
187     foreach my $param ( keys %$self ) {
188         next unless exists $args->{$param};
189         $self->{$param} = $args->{$param};
190     }
191
192     my %validations = (
193         max_buckets      => { floor => 16, ceil => 256 },
194         num_txns         => { floor => 1,  ceil => 255 },
195         data_sector_size => { floor => 32, ceil => 256 },
196     );
197
198     while ( my ($attr, $c) = each %validations ) {
199         if (   !defined $self->{$attr}
200             || !length $self->{$attr}
201             || $self->{$attr} =~ /\D/
202             || $self->{$attr} < $c->{floor}
203         ) {
204             $self->{$attr} = '(undef)' if !defined $self->{$attr};
205             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
206             $self->{$attr} = $c->{floor};
207         }
208         elsif ( $self->{$attr} > $c->{ceil} ) {
209             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
210             $self->{$attr} = $c->{ceil};
211         }
212     }
213
214     if ( !$self->{digest} ) {
215         require Digest::MD5;
216         $self->{digest} = \&Digest::MD5::md5;
217     }
218
219     return $self;
220 }
221
222 ################################################################################
223
224 =head2 read_value( $obj, $key )
225
226 This takes an object that provides _base_offset() and a string. It returns the
227 value stored in the corresponding Sector::Value's data section.
228
229 =cut
230
231 sub read_value {
232     my $self = shift;
233     my ($obj, $key) = @_;
234
235     # This will be a Reference sector
236     my $sector = $self->_load_sector( $obj->_base_offset )
237         or return;
238
239     if ( $sector->staleness != $obj->_staleness ) {
240         return;
241     }
242
243     my $key_md5 = $self->_apply_digest( $key );
244
245     my $value_sector = $sector->get_data_for({
246         key_md5    => $key_md5,
247         allow_head => 1,
248     });
249
250     unless ( $value_sector ) {
251         $value_sector = DBM::Deep::Engine::Sector::Null->new({
252             engine => $self,
253             data   => undef,
254         });
255
256         $sector->write_data({
257             key_md5 => $key_md5,
258             key     => $key,
259             value   => $value_sector,
260         });
261     }
262
263     return $value_sector->data;
264 }
265
266 =head2 get_classname( $obj )
267
268 This takes an object that provides _base_offset() and returns the classname (if any)
269 associated with it.
270
271 It delegates to Sector::Reference::get_classname() for the heavy lifting.
272
273 It performs a staleness check.
274
275 =cut
276
277 sub get_classname {
278     my $self = shift;
279     my ($obj) = @_;
280
281     # This will be a Reference sector
282     my $sector = $self->_load_sector( $obj->_base_offset )
283         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
284
285     if ( $sector->staleness != $obj->_staleness ) {
286         return;
287     }
288
289     return $sector->get_classname;
290 }
291
292 =head2 make_reference( $obj, $old_key, $new_key )
293
294 This takes an object that provides _base_offset() and two strings. The
295 strings correspond to the old key and new key, respectively. This operation
296 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>.
297
298 This returns nothing.
299
300 =cut
301
302 sub make_reference {
303     my $self = shift;
304     my ($obj, $old_key, $new_key) = @_;
305
306     # This will be a Reference sector
307     my $sector = $self->_load_sector( $obj->_base_offset )
308         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
309
310     if ( $sector->staleness != $obj->_staleness ) {
311         return;
312     }
313
314     my $old_md5 = $self->_apply_digest( $old_key );
315
316     my $value_sector = $sector->get_data_for({
317         key_md5    => $old_md5,
318         allow_head => 1,
319     });
320
321     unless ( $value_sector ) {
322         $value_sector = DBM::Deep::Engine::Sector::Null->new({
323             engine => $self,
324             data   => undef,
325         });
326
327         $sector->write_data({
328             key_md5 => $old_md5,
329             key     => $old_key,
330             value   => $value_sector,
331         });
332     }
333
334     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
335         $sector->write_data({
336             key     => $new_key,
337             key_md5 => $self->_apply_digest( $new_key ),
338             value   => $value_sector,
339         });
340         $value_sector->increment_refcount;
341     }
342     else {
343         $sector->write_data({
344             key     => $new_key,
345             key_md5 => $self->_apply_digest( $new_key ),
346             value   => $value_sector->clone,
347         });
348     }
349
350     return;
351 }
352
353 =head2 key_exists( $obj, $key )
354
355 This takes an object that provides _base_offset() and a string for
356 the key to be checked. This returns 1 for true and "" for false.
357
358 =cut
359
360 sub key_exists {
361     my $self = shift;
362     my ($obj, $key) = @_;
363
364     # This will be a Reference sector
365     my $sector = $self->_load_sector( $obj->_base_offset )
366         or return '';
367
368     if ( $sector->staleness != $obj->_staleness ) {
369         return '';
370     }
371
372     my $data = $sector->get_data_for({
373         key_md5    => $self->_apply_digest( $key ),
374         allow_head => 1,
375     });
376
377     # exists() returns 1 or '' for true/false.
378     return $data ? 1 : '';
379 }
380
381 =head2 delete_key( $obj, $key )
382
383 This takes an object that provides _base_offset() and a string for
384 the key to be deleted. This returns the result of the Sector::Reference
385 delete_key() method.
386
387 =cut
388
389 sub delete_key {
390     my $self = shift;
391     my ($obj, $key) = @_;
392
393     my $sector = $self->_load_sector( $obj->_base_offset )
394         or return;
395
396     if ( $sector->staleness != $obj->_staleness ) {
397         return;
398     }
399
400     return $sector->delete_key({
401         key_md5    => $self->_apply_digest( $key ),
402         allow_head => 0,
403     });
404 }
405
406 =head2 write_value( $obj, $key, $value )
407
408 This takes an object that provides _base_offset(), a string for the
409 key, and a value. This value can be anything storable within L<DBM::Deep/>.
410
411 This returns 1 upon success.
412
413 =cut
414
415 sub write_value {
416     my $self = shift;
417     my ($obj, $key, $value) = @_;
418
419     my $r = Scalar::Util::reftype( $value ) || '';
420     {
421         last if $r eq '';
422         last if $r eq 'HASH';
423         last if $r eq 'ARRAY';
424
425         DBM::Deep->_throw_error(
426             "Storage of references of type '$r' is not supported."
427         );
428     }
429
430     # This will be a Reference sector
431     my $sector = $self->_load_sector( $obj->_base_offset )
432         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
433
434     if ( $sector->staleness != $obj->_staleness ) {
435         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
436     }
437
438     my ($class, $type);
439     if ( !defined $value ) {
440         $class = 'DBM::Deep::Engine::Sector::Null';
441     }
442     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
443         my $tmpvar;
444         if ( $r eq 'ARRAY' ) {
445             $tmpvar = tied @$value;
446         } elsif ( $r eq 'HASH' ) {
447             $tmpvar = tied %$value;
448         }
449
450         if ( $tmpvar ) {
451             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
452
453             unless ( $is_dbm_deep ) {
454                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
455             }
456
457             unless ( $tmpvar->_engine->storage == $self->storage ) {
458                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
459             }
460
461             # First, verify if we're storing the same thing to this spot. If we are, then
462             # this should be a no-op. -EJS, 2008-05-19
463             my $loc = $sector->get_data_location_for({
464                 key_md5 => $self->_apply_digest( $key ),
465                 allow_head => 1,
466             });
467
468             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
469                 return 1;
470             }
471
472             #XXX Can this use $loc?
473             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
474             $sector->write_data({
475                 key     => $key,
476                 key_md5 => $self->_apply_digest( $key ),
477                 value   => $value_sector,
478             });
479             $value_sector->increment_refcount;
480
481             return 1;
482         }
483
484         $class = 'DBM::Deep::Engine::Sector::Reference';
485         $type = substr( $r, 0, 1 );
486     }
487     else {
488         if ( tied($value) ) {
489             DBM::Deep->_throw_error( "Cannot store something that is tied." );
490         }
491         $class = 'DBM::Deep::Engine::Sector::Scalar';
492     }
493
494     # Create this after loading the reference sector in case something bad happens.
495     # This way, we won't allocate value sector(s) needlessly.
496     my $value_sector = $class->new({
497         engine => $self,
498         data   => $value,
499         type   => $type,
500     });
501
502     $sector->write_data({
503         key     => $key,
504         key_md5 => $self->_apply_digest( $key ),
505         value   => $value_sector,
506     });
507
508     # This code is to make sure we write all the values in the $value to the disk
509     # and to make sure all changes to $value after the assignment are reflected
510     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
511     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
512     # copy to a temp value.
513     if ( $r eq 'ARRAY' ) {
514         my @temp = @$value;
515         tie @$value, 'DBM::Deep', {
516             base_offset => $value_sector->offset,
517             staleness   => $value_sector->staleness,
518             storage     => $self->storage,
519             engine      => $self,
520         };
521         @$value = @temp;
522         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
523     }
524     elsif ( $r eq 'HASH' ) {
525         my %temp = %$value;
526         tie %$value, 'DBM::Deep', {
527             base_offset => $value_sector->offset,
528             staleness   => $value_sector->staleness,
529             storage     => $self->storage,
530             engine      => $self,
531         };
532
533         %$value = %temp;
534         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
535     }
536
537     return 1;
538 }
539
540 =head2 get_next_key( $obj, $prev_key )
541
542 This takes an object that provides _base_offset() and an optional string
543 representing the prior key returned via a prior invocation of this method.
544
545 This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
546
547 =cut
548
549 # XXX Add staleness here
550 sub get_next_key {
551     my $self = shift;
552     my ($obj, $prev_key) = @_;
553
554     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
555     unless ( $prev_key ) {
556         $obj->{iterator} = DBM::Deep::Iterator->new({
557             base_offset => $obj->_base_offset,
558             engine      => $self,
559         });
560     }
561
562     return $obj->{iterator}->get_next_key( $obj );
563 }
564
565 ################################################################################
566
567 =head2 setup_fh( $obj )
568
569 This takes an object that provides _base_offset(). It will do everything needed
570 in order to properly initialize all values for necessary functioning. If this is
571 called upon an already initialized object, this will also reset the inode.
572
573 This returns 1.
574
575 =cut
576
577 sub setup_fh {
578     my $self = shift;
579     my ($obj) = @_;
580
581     # We're opening the file.
582     unless ( $obj->_base_offset ) {
583         my $bytes_read = $self->_read_file_header;
584
585         # Creating a new file
586         unless ( $bytes_read ) {
587             $self->_write_file_header;
588
589             # 1) Create Array/Hash entry
590             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
591                 engine => $self,
592                 type   => $obj->_type,
593             });
594             $obj->{base_offset} = $initial_reference->offset;
595             $obj->{staleness} = $initial_reference->staleness;
596
597             $self->storage->flush;
598         }
599         # Reading from an existing file
600         else {
601             $obj->{base_offset} = $bytes_read;
602             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
603                 engine => $self,
604                 offset => $obj->_base_offset,
605             });
606             unless ( $initial_reference ) {
607                 DBM::Deep->_throw_error("Corrupted file, no master index record");
608             }
609
610             unless ($obj->_type eq $initial_reference->type) {
611                 DBM::Deep->_throw_error("File type mismatch");
612             }
613
614             $obj->{staleness} = $initial_reference->staleness;
615         }
616     }
617
618     $self->storage->set_inode;
619
620     return 1;
621 }
622
623 =head2 begin_work( $obj )
624
625 This takes an object that provides _base_offset(). It will set up all necessary
626 bookkeeping in order to run all work within a transaction.
627
628 If $obj is already within a transaction, an error wiill be thrown. If there are
629 no more available transactions, an error will be thrown.
630
631 This returns undef.
632
633 =cut
634
635 sub begin_work {
636     my $self = shift;
637     my ($obj) = @_;
638
639     if ( $self->trans_id ) {
640         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
641     }
642
643     my @slots = $self->read_txn_slots;
644     my $found;
645     for my $i ( 0 .. $#slots ) {
646         next if $slots[$i];
647
648         $slots[$i] = 1;
649         $self->set_trans_id( $i + 1 );
650         $found = 1;
651         last;
652     }
653     unless ( $found ) {
654         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
655     }
656     $self->write_txn_slots( @slots );
657
658     if ( !$self->trans_id ) {
659         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
660     }
661
662     return;
663 }
664
665 =head2 rollback( $obj )
666
667 This takes an object that provides _base_offset(). It will revert all
668 actions taken within the running transaction.
669
670 If $obj is not within a transaction, an error will be thrown.
671
672 This returns 1.
673
674 =cut
675
676 sub rollback {
677     my $self = shift;
678     my ($obj) = @_;
679
680     if ( !$self->trans_id ) {
681         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
682     }
683
684     # Each entry is the file location for a bucket that has a modification for
685     # this transaction. The entries need to be expunged.
686     foreach my $entry (@{ $self->get_entries } ) {
687         # Remove the entry here
688         my $read_loc = $entry
689           + $self->hash_size
690           + $self->byte_size
691           + $self->byte_size
692           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
693
694         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
695         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
696         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
697
698         if ( $data_loc > 1 ) {
699             $self->_load_sector( $data_loc )->free;
700         }
701     }
702
703     $self->clear_entries;
704
705     my @slots = $self->read_txn_slots;
706     $slots[$self->trans_id-1] = 0;
707     $self->write_txn_slots( @slots );
708     $self->inc_txn_staleness_counter( $self->trans_id );
709     $self->set_trans_id( 0 );
710
711     return 1;
712 }
713
714 =head2 commit( $obj )
715
716 This takes an object that provides _base_offset(). It will apply all
717 actions taken within the transaction to the HEAD.
718
719 If $obj is not within a transaction, an error will be thrown.
720
721 This returns 1.
722
723 =cut
724
725 sub commit {
726     my $self = shift;
727     my ($obj) = @_;
728
729     if ( !$self->trans_id ) {
730         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
731     }
732
733     foreach my $entry (@{ $self->get_entries } ) {
734         # Overwrite the entry in head with the entry in trans_id
735         my $base = $entry
736           + $self->hash_size
737           + $self->byte_size;
738
739         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
740         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
741
742         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
743         my $trans_loc = $self->storage->read_at(
744             $spot, $self->byte_size,
745         );
746
747         $self->storage->print_at( $base, $trans_loc );
748         $self->storage->print_at(
749             $spot,
750             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
751         );
752
753         if ( $head_loc > 1 ) {
754             $self->_load_sector( $head_loc )->free;
755         }
756     }
757
758     $self->clear_entries;
759
760     my @slots = $self->read_txn_slots;
761     $slots[$self->trans_id-1] = 0;
762     $self->write_txn_slots( @slots );
763     $self->inc_txn_staleness_counter( $self->trans_id );
764     $self->set_trans_id( 0 );
765
766     return 1;
767 }
768
769 =head2 lock_exclusive()
770
771 This takes an object that provides _base_offset(). It will guarantee that
772 the storage has taken precautions to be safe for a write.
773
774 This returns nothing.
775
776 =cut
777
778 sub lock_exclusive {
779     my $self = shift;
780     my ($obj) = @_;
781     return $self->storage->lock_exclusive( $obj );
782 }
783
784 =head2 lock_shared()
785
786 This takes an object that provides _base_offset(). It will guarantee that
787 the storage has taken precautions to be safe for a read.
788
789 This returns nothing.
790
791 =cut
792
793 sub lock_shared {
794     my $self = shift;
795     my ($obj) = @_;
796     return $self->storage->lock_shared( $obj );
797 }
798
799 =head2 unlock()
800
801 This takes an object that provides _base_offset(). It will guarantee that
802 the storage has released all locks taken.
803
804 This returns nothing.
805
806 =cut
807
808 sub unlock {
809     my $self = shift;
810     my ($obj) = @_;
811
812     my $rv = $self->storage->unlock( $obj );
813
814     $self->flush if $rv;
815
816     return $rv;
817 }
818
819 ################################################################################
820
821 =head1 INTERNAL METHODS
822
823 The following methods are internal-use-only to DBM::Deep::Engine.
824
825 =cut
826
827 =head2 read_txn_slots()
828
829 This takes no arguments.
830
831 This will return an array with a 1 or 0 in each slot. Each spot represents one
832 available transaction. If the slot is 1, that transaction is taken. If it is 0,
833 the transaction is available.
834
835 =cut
836
837 sub read_txn_slots {
838     my $self = shift;
839     my $bl = $self->txn_bitfield_len;
840     my $num_bits = $bl * 8;
841     return split '', unpack( 'b'.$num_bits,
842         $self->storage->read_at(
843             $self->trans_loc, $bl,
844         )
845     );
846 }
847
848 =head2 write_txn_slots( @slots )
849
850 This takes an array of 1's and 0's. This array represents the transaction slots
851 returned by L</read_txn_slots()>. In other words, the following is true:
852
853   @x = read_txn_slots( write_txn_slots( @x ) );
854
855 (With the obviously missing object referents added back in.)
856
857 =cut
858
859 sub write_txn_slots {
860     my $self = shift;
861     my $num_bits = $self->txn_bitfield_len * 8;
862     $self->storage->print_at( $self->trans_loc,
863         pack( 'b'.$num_bits, join('', @_) ),
864     );
865 }
866
867 =head2 get_running_txn_ids()
868
869 This takes no arguments.
870
871 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
872
873 =cut
874
875 sub get_running_txn_ids {
876     my $self = shift;
877     my @transactions = $self->read_txn_slots;
878     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
879 }
880
881 =head2 get_txn_staleness_counter( $trans_id )
882
883 This will return the staleness counter for the given transaction ID. Please see
884 L</TRANSACTION STALENESS> for more information.
885
886 =cut
887
888 sub get_txn_staleness_counter {
889     my $self = shift;
890     my ($trans_id) = @_;
891
892     # Hardcode staleness of 0 for the HEAD
893     return 0 unless $trans_id;
894
895     return unpack( $StP{$STALE_SIZE},
896         $self->storage->read_at(
897             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
898             $STALE_SIZE,
899         )
900     );
901 }
902
903 =head2 inc_txn_staleness_counter( $trans_id )
904
905 This will increment the staleness counter for the given transaction ID. Please see
906 L</TRANSACTION STALENESS> for more information.
907
908 =cut
909
910 sub inc_txn_staleness_counter {
911     my $self = shift;
912     my ($trans_id) = @_;
913
914     # Hardcode staleness of 0 for the HEAD
915     return 0 unless $trans_id;
916
917     $self->storage->print_at(
918         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
919         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
920     );
921 }
922
923 =head2 get_entries()
924
925 This takes no arguments.
926
927 This returns a list of all the sectors that have been modified by this transaction.
928
929 =cut
930
931 sub get_entries {
932     my $self = shift;
933     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
934 }
935
936 =head2 add_entry( $trans_id, $location )
937
938 This takes a transaction ID and a file location and marks the sector at that location
939 as having been modified by the transaction identified by $trans_id.
940
941 This returns nothing.
942
943 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
944 C<< $trans_id != $self->trans_id >> for this method.
945
946 =cut
947
948 sub add_entry {
949     my $self = shift;
950     my ($trans_id, $loc) = @_;
951
952     $self->{entries}{$trans_id} ||= {};
953     $self->{entries}{$trans_id}{$loc} = undef;
954 }
955
956 =head2 reindex_entry( $old_loc, $new_loc )
957
958 This takes two locations (old and new, respectively). If a location that has been
959 modified by this transaction is subsequently reindexed due to a bucketlist
960 overflowing, then the entries hash needs to be made aware of this change.
961
962 This returns nothing.
963
964 =cut
965
966 sub reindex_entry {
967     my $self = shift;
968     my ($old_loc, $new_loc) = @_;
969
970     TRANS:
971     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
972         if ( exists $locs->{$old_loc} ) {
973             delete $locs->{$old_loc};
974             $locs->{$new_loc} = undef;
975             next TRANS;
976         }
977     }
978 }
979
980 =head2 clear_entries()
981
982 This takes no arguments. It will clear the entries list for the running transaction.
983
984 This returns nothing.
985
986 =cut
987
988 sub clear_entries {
989     my $self = shift;
990     delete $self->{entries}{$self->trans_id};
991 }
992
993 ################################################################################
994
995 =head2 _write_file_header()
996
997 This writes the file header for a new file. This will write the various settings
998 that set how the file is interpreted.
999
1000 =head2 _read_file_header()
1001
1002 This reads the file header from an existing file. This will read the various
1003 settings that set how the file is interpreted.
1004
1005 =cut
1006
1007 {
1008     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
1009     my $this_file_version = 3;
1010
1011     sub _write_file_header {
1012         my $self = shift;
1013
1014         my $nt = $self->num_txns;
1015         my $bl = $self->txn_bitfield_len;
1016
1017         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
1018
1019         my $loc = $self->storage->request_space( $header_fixed + $header_var );
1020
1021         $self->storage->print_at( $loc,
1022             SIG_FILE,
1023             SIG_HEADER,
1024             pack('N', $this_file_version), # At this point, we're at 9 bytes
1025             pack('N', $header_var),        # header size
1026             # --- Above is $header_fixed. Below is $header_var
1027             pack('C', $self->byte_size),
1028
1029             # These shenanigans are to allow a 256 within a C
1030             pack('C', $self->max_buckets - 1),
1031             pack('C', $self->data_sector_size - 1),
1032
1033             pack('C', $nt),
1034             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
1035             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
1036             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
1037             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
1038             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
1039         );
1040
1041         #XXX Set these less fragilely
1042         $self->set_trans_loc( $header_fixed + 4 );
1043         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1044
1045         return;
1046     }
1047
1048     sub _read_file_header {
1049         my $self = shift;
1050
1051         my $buffer = $self->storage->read_at( 0, $header_fixed );
1052         return unless length($buffer);
1053
1054         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1055             'A4 A N N', $buffer
1056         );
1057
1058         unless ( $file_signature eq SIG_FILE ) {
1059             $self->storage->close;
1060             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1061         }
1062
1063         unless ( $sig_header eq SIG_HEADER ) {
1064             $self->storage->close;
1065             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1066         }
1067
1068         unless ( $file_version == $this_file_version ) {
1069             $self->storage->close;
1070             DBM::Deep->_throw_error(
1071                 "Wrong file version found - " .  $file_version .
1072                 " - expected " . $this_file_version
1073             );
1074         }
1075
1076         my $buffer2 = $self->storage->read_at( undef, $size );
1077         my @values = unpack( 'C C C C', $buffer2 );
1078
1079         if ( @values != 4 || grep { !defined } @values ) {
1080             $self->storage->close;
1081             DBM::Deep->_throw_error("Corrupted file - bad header");
1082         }
1083
1084         #XXX Add warnings if values weren't set right
1085         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1086
1087         # These shenangians are to allow a 256 within a C
1088         $self->{max_buckets} += 1;
1089         $self->{data_sector_size} += 1;
1090
1091         my $bl = $self->txn_bitfield_len;
1092
1093         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1094         unless ( $size == $header_var ) {
1095             $self->storage->close;
1096             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1097         }
1098
1099         $self->set_trans_loc( $header_fixed + scalar(@values) );
1100         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1101
1102         return length($buffer) + length($buffer2);
1103     }
1104 }
1105
1106 =head2 _load_sector( $offset )
1107
1108 This will instantiate and return the sector object that represents the data found
1109 at $offset.
1110
1111 =cut
1112
1113 sub _load_sector {
1114     my $self = shift;
1115     my ($offset) = @_;
1116
1117     # Add a catch for offset of 0 or 1
1118     return if !$offset || $offset <= 1;
1119
1120     my $type = $self->storage->read_at( $offset, 1 );
1121     return if $type eq chr(0);
1122
1123     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1124         return DBM::Deep::Engine::Sector::Reference->new({
1125             engine => $self,
1126             type   => $type,
1127             offset => $offset,
1128         });
1129     }
1130     # XXX Don't we need key_md5 here?
1131     elsif ( $type eq $self->SIG_BLIST ) {
1132         return DBM::Deep::Engine::Sector::BucketList->new({
1133             engine => $self,
1134             type   => $type,
1135             offset => $offset,
1136         });
1137     }
1138     elsif ( $type eq $self->SIG_INDEX ) {
1139         return DBM::Deep::Engine::Sector::Index->new({
1140             engine => $self,
1141             type   => $type,
1142             offset => $offset,
1143         });
1144     }
1145     elsif ( $type eq $self->SIG_NULL ) {
1146         return DBM::Deep::Engine::Sector::Null->new({
1147             engine => $self,
1148             type   => $type,
1149             offset => $offset,
1150         });
1151     }
1152     elsif ( $type eq $self->SIG_DATA ) {
1153         return DBM::Deep::Engine::Sector::Scalar->new({
1154             engine => $self,
1155             type   => $type,
1156             offset => $offset,
1157         });
1158     }
1159     # This was deleted from under us, so just return and let the caller figure it out.
1160     elsif ( $type eq $self->SIG_FREE ) {
1161         return;
1162     }
1163
1164     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1165 }
1166
1167 =head2 _apply_digest( @stuff )
1168
1169 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1170 passed in and return the result.
1171
1172 =cut
1173
1174 sub _apply_digest {
1175     my $self = shift;
1176     return $self->{digest}->(@_);
1177 }
1178
1179 =head2 _add_free_blist_sector( $offset, $size )
1180
1181 =head2 _add_free_data_sector( $offset, $size )
1182
1183 =head2 _add_free_index_sector( $offset, $size )
1184
1185 These methods are all wrappers around _add_free_sector(), providing the proper
1186 chain offset ($multiple) for the sector type.
1187
1188 =cut
1189
1190 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1191 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1192 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1193
1194 =head2 _add_free_sector( $multiple, $offset, $size )
1195
1196 _add_free_sector() takes the offset into the chains location, the offset of the
1197 sector, and the size of that sector. It will mark the sector as a free sector
1198 and put it into the list of sectors that are free of this type for use later.
1199
1200 This returns nothing.
1201
1202 B<NOTE>: $size is unused?
1203
1204 =cut
1205
1206 sub _add_free_sector {
1207     my $self = shift;
1208     my ($multiple, $offset, $size) = @_;
1209
1210     my $chains_offset = $multiple * $self->byte_size;
1211
1212     my $storage = $self->storage;
1213
1214     # Increment staleness.
1215     # XXX Can this increment+modulo be done by "&= 0x1" ?
1216     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
1217     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1218     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1219
1220     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1221
1222     $storage->print_at( $self->chains_loc + $chains_offset,
1223         pack( $StP{$self->byte_size}, $offset ),
1224     );
1225
1226     # Record the old head in the new sector after the signature and staleness counter
1227     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
1228 }
1229
1230 =head2 _request_blist_sector( $size )
1231
1232 =head2 _request_data_sector( $size )
1233
1234 =head2 _request_index_sector( $size )
1235
1236 These methods are all wrappers around _request_sector(), providing the proper
1237 chain offset ($multiple) for the sector type.
1238
1239 =cut
1240
1241 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1242 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1243 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1244
1245 =head2 _request_sector( $multiple $size )
1246
1247 This takes the offset into the chains location and the size of that sector.
1248
1249 This returns the object with the sector. If there is an available free sector of
1250 that type, then it will be reused. If there isn't one, then a new one will be
1251 allocated.
1252
1253 =cut
1254
1255 sub _request_sector {
1256     my $self = shift;
1257     my ($multiple, $size) = @_;
1258
1259     my $chains_offset = $multiple * $self->byte_size;
1260
1261     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1262     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1263
1264     # We don't have any free sectors of the right size, so allocate a new one.
1265     unless ( $loc ) {
1266         my $offset = $self->storage->request_space( $size );
1267
1268         # Zero out the new sector. This also guarantees correct increases
1269         # in the filesize.
1270         $self->storage->print_at( $offset, chr(0) x $size );
1271
1272         return $offset;
1273     }
1274
1275     # Read the new head after the signature and the staleness counter
1276     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
1277     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1278     $self->storage->print_at(
1279         $loc + SIG_SIZE + $STALE_SIZE,
1280         pack( $StP{$self->byte_size}, 0 ),
1281     );
1282
1283     return $loc;
1284 }
1285
1286 ################################################################################
1287
1288 =head2 flush()
1289
1290 This takes no arguments. It will do everything necessary to flush all things to
1291 disk. This is usually called during unlock() and setup_fh().
1292
1293 This returns nothing.
1294
1295 =cut
1296
1297 sub flush {
1298     my $self = shift;
1299
1300     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1301     # -RobK, 2008-06-26
1302     $self->storage->flush;
1303 }
1304
1305 ################################################################################
1306
1307 sub storage     { $_[0]{storage} }
1308 sub byte_size   { $_[0]{byte_size} }
1309 sub hash_size   { $_[0]{hash_size} }
1310 sub hash_chars  { $_[0]{hash_chars} }
1311 sub num_txns    { $_[0]{num_txns} }
1312 sub max_buckets { $_[0]{max_buckets} }
1313 sub blank_md5   { chr(0) x $_[0]->hash_size }
1314 sub data_sector_size { $_[0]{data_sector_size} }
1315
1316 # This is a calculated value
1317 sub txn_bitfield_len {
1318     my $self = shift;
1319     unless ( exists $self->{txn_bitfield_len} ) {
1320         my $temp = ($self->num_txns) / 8;
1321         if ( $temp > int( $temp ) ) {
1322             $temp = int( $temp ) + 1;
1323         }
1324         $self->{txn_bitfield_len} = $temp;
1325     }
1326     return $self->{txn_bitfield_len};
1327 }
1328
1329 sub trans_id     { $_[0]{trans_id} }
1330 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1331
1332 sub trans_loc     { $_[0]{trans_loc} }
1333 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1334
1335 sub chains_loc     { $_[0]{chains_loc} }
1336 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1337
1338 sub cache       { $_[0]{cache} ||= {} }
1339 sub clear_cache { %{$_[0]->cache} = () }
1340
1341 sub _dump_file {
1342     my $self = shift;
1343
1344     # Read the header
1345     my $spot = $self->_read_file_header();
1346
1347     my %types = (
1348         0 => 'B',
1349         1 => 'D',
1350         2 => 'I',
1351     );
1352
1353     my %sizes = (
1354         'D' => $self->data_sector_size,
1355         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1356         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1357     );
1358
1359     my $return = "";
1360
1361     # Header values
1362     $return .= "NumTxns: " . $self->num_txns . $/;
1363
1364     # Read the free sector chains
1365     my %sectors;
1366     foreach my $multiple ( 0 .. 2 ) {
1367         $return .= "Chains($types{$multiple}):";
1368         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1369         while ( 1 ) {
1370             my $loc = unpack(
1371                 $StP{$self->byte_size},
1372                 $self->storage->read_at( $old_loc, $self->byte_size ),
1373             );
1374
1375             # We're now out of free sectors of this kind.
1376             unless ( $loc ) {
1377                 last;
1378             }
1379
1380             $sectors{ $types{$multiple} }{ $loc } = undef;
1381             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
1382             $return .= " $loc";
1383         }
1384         $return .= $/;
1385     }
1386
1387     SECTOR:
1388     while ( $spot < $self->storage->{end} ) {
1389         # Read each sector in order.
1390         my $sector = $self->_load_sector( $spot );
1391         if ( !$sector ) {
1392             # Find it in the free-sectors that were found already
1393             foreach my $type ( keys %sectors ) {
1394                 if ( exists $sectors{$type}{$spot} ) {
1395                     my $size = $sizes{$type};
1396                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1397                     $spot += $size;
1398                     next SECTOR;
1399                 }
1400             }
1401
1402             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1403         }
1404         else {
1405             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1406             if ( $sector->type eq 'D' ) {
1407                 $return .= ' ' . $sector->data;
1408             }
1409             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1410                 $return .= ' REF: ' . $sector->get_refcount;
1411             }
1412             elsif ( $sector->type eq 'B' ) {
1413                 foreach my $bucket ( $sector->chopped_up ) {
1414                     $return .= "\n    ";
1415                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1416                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1417                     );
1418                     my $l = unpack( $StP{$self->byte_size},
1419                         substr( $bucket->[-1],
1420                             $self->hash_size + $self->byte_size,
1421                             $self->byte_size,
1422                         ),
1423                     );
1424                     $return .= sprintf " %08d", $l;
1425                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1426                         my $l = unpack( $StP{$self->byte_size},
1427                             substr( $bucket->[-1],
1428                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1429                                 $self->byte_size,
1430                             ),
1431                         );
1432                         $return .= sprintf " %08d", $l;
1433                     }
1434                 }
1435             }
1436             $return .= $/;
1437
1438             $spot += $sector->size;
1439         }
1440     }
1441
1442     return $return;
1443 }
1444
1445 ################################################################################
1446
1447 package DBM::Deep::Iterator;
1448
1449 sub new {
1450     my $class = shift;
1451     my ($args) = @_;
1452
1453     my $self = bless {
1454         breadcrumbs => [],
1455         engine      => $args->{engine},
1456         base_offset => $args->{base_offset},
1457     }, $class;
1458
1459     Scalar::Util::weaken( $self->{engine} );
1460
1461     return $self;
1462 }
1463
1464 sub reset { $_[0]{breadcrumbs} = [] }
1465
1466 sub get_sector_iterator {
1467     my $self = shift;
1468     my ($loc) = @_;
1469
1470     my $sector = $self->{engine}->_load_sector( $loc )
1471         or return;
1472
1473     if ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
1474         return DBM::Deep::Iterator::Index->new({
1475             iterator => $self,
1476             sector   => $sector,
1477         });
1478     }
1479     elsif ( $sector->isa( 'DBM::Deep::Engine::Sector::BucketList' ) ) {
1480         return DBM::Deep::Iterator::BucketList->new({
1481             iterator => $self,
1482             sector   => $sector,
1483         });
1484     }
1485
1486     DBM::Deep->_throw_error( "get_sector_iterator(): Why did $loc make a $sector?" );
1487 }
1488
1489 sub get_next_key {
1490     my $self = shift;
1491     my ($obj) = @_;
1492
1493     my $crumbs = $self->{breadcrumbs};
1494     my $e = $self->{engine};
1495
1496     unless ( @$crumbs ) {
1497         # This will be a Reference sector
1498         my $sector = $e->_load_sector( $self->{base_offset} )
1499             # If no sector is found, thist must have been deleted from under us.
1500             or return;
1501
1502         if ( $sector->staleness != $obj->_staleness ) {
1503             return;
1504         }
1505
1506         my $loc = $sector->get_blist_loc
1507             or return;
1508
1509         push @$crumbs, $self->get_sector_iterator( $loc );
1510     }
1511
1512     FIND_NEXT_KEY: {
1513         # We're at the end.
1514         unless ( @$crumbs ) {
1515             $self->reset;
1516             return;
1517         }
1518
1519         my $iterator = $crumbs->[-1];
1520
1521         # This level is done.
1522         if ( $iterator->at_end ) {
1523             pop @$crumbs;
1524             redo FIND_NEXT_KEY;
1525         }
1526
1527         if ( $iterator->isa( 'DBM::Deep::Iterator::Index' ) ) {
1528             # If we don't have any more, it will be caught at the
1529             # prior check.
1530             if ( my $next = $iterator->get_next_iterator ) {
1531                 push @$crumbs, $next;
1532             }
1533             redo FIND_NEXT_KEY;
1534         }
1535
1536         unless ( $iterator->isa( 'DBM::Deep::Iterator::BucketList' ) ) {
1537             DBM::Deep->_throw_error(
1538                 "Should have a bucketlist iterator here - instead have $iterator"
1539             );
1540         }
1541
1542         # At this point, we have a BucketList iterator
1543         my $key = $iterator->get_next_key;
1544         if ( defined $key ) {
1545             return $key;
1546         }
1547         #XXX else { $iterator->set_to_end() } ?
1548
1549         # We hit the end of the bucketlist iterator, so redo
1550         redo FIND_NEXT_KEY;
1551     }
1552
1553     DBM::Deep->_throw_error( "get_next_key(): How did we get here?" );
1554 }
1555
1556 package DBM::Deep::Iterator::Index;
1557
1558 sub new {
1559     my $self = bless $_[1] => $_[0];
1560     $self->{curr_index} = 0;
1561     return $self;
1562 }
1563
1564 sub at_end {
1565     my $self = shift;
1566     return $self->{curr_index} >= $self->{iterator}{engine}->hash_chars;
1567 }
1568
1569 sub get_next_iterator {
1570     my $self = shift;
1571
1572     my $loc;
1573     while ( !$loc ) {
1574         return if $self->at_end;
1575         $loc = $self->{sector}->get_entry( $self->{curr_index}++ );
1576     }
1577
1578     return $self->{iterator}->get_sector_iterator( $loc );
1579 }
1580
1581 package DBM::Deep::Iterator::BucketList;
1582
1583 sub new {
1584     my $self = bless $_[1] => $_[0];
1585     $self->{curr_index} = 0;
1586     return $self;
1587 }
1588
1589 sub at_end {
1590     my $self = shift;
1591     return $self->{curr_index} >= $self->{iterator}{engine}->max_buckets;
1592 }
1593
1594 sub get_next_key {
1595     my $self = shift;
1596
1597     return if $self->at_end;
1598
1599     my $idx = $self->{curr_index}++;
1600
1601     my $data_loc = $self->{sector}->get_data_location_for({
1602         allow_head => 1,
1603         idx        => $idx,
1604     }) or return;
1605
1606     #XXX Do we want to add corruption checks here?
1607     return $self->{sector}->get_key_for( $idx )->data;
1608 }
1609
1610 package DBM::Deep::Engine::Sector;
1611
1612 sub new {
1613     my $self = bless $_[1], $_[0];
1614     Scalar::Util::weaken( $self->{engine} );
1615     $self->_init;
1616     return $self;
1617 }
1618
1619 #sub _init {}
1620 #sub clone { DBM::Deep->_throw_error( "Must be implemented in the child class" ); }
1621
1622 sub engine { $_[0]{engine} }
1623 sub offset { $_[0]{offset} }
1624 sub type   { $_[0]{type} }
1625
1626 sub base_size {
1627    my $self = shift;
1628    return $self->engine->SIG_SIZE + $STALE_SIZE;
1629 }
1630
1631 sub free {
1632     my $self = shift;
1633
1634     my $e = $self->engine;
1635
1636     $e->storage->print_at( $self->offset, $e->SIG_FREE );
1637     # Skip staleness counter
1638     $e->storage->print_at( $self->offset + $self->base_size,
1639         chr(0) x ($self->size - $self->base_size),
1640     );
1641
1642     my $free_meth = $self->free_meth;
1643     $e->$free_meth( $self->offset, $self->size );
1644
1645     return;
1646 }
1647
1648 package DBM::Deep::Engine::Sector::Data;
1649
1650 our @ISA = qw( DBM::Deep::Engine::Sector );
1651
1652 # This is in bytes
1653 sub size { $_[0]{engine}->data_sector_size }
1654 sub free_meth { return '_add_free_data_sector' }
1655
1656 sub clone {
1657     my $self = shift;
1658     return ref($self)->new({
1659         engine => $self->engine,
1660         type   => $self->type,
1661         data   => $self->data,
1662     });
1663 }
1664
1665 package DBM::Deep::Engine::Sector::Scalar;
1666
1667 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1668
1669 sub free {
1670     my $self = shift;
1671
1672     my $chain_loc = $self->chain_loc;
1673
1674     $self->SUPER::free();
1675
1676     if ( $chain_loc ) {
1677         $self->engine->_load_sector( $chain_loc )->free;
1678     }
1679
1680     return;
1681 }
1682
1683 sub type { $_[0]{engine}->SIG_DATA }
1684 sub _init {
1685     my $self = shift;
1686
1687     my $engine = $self->engine;
1688
1689     unless ( $self->offset ) {
1690         my $data_section = $self->size - $self->base_size - $engine->byte_size - 1;
1691
1692         $self->{offset} = $engine->_request_data_sector( $self->size );
1693
1694         my $data = delete $self->{data};
1695         my $dlen = length $data;
1696         my $continue = 1;
1697         my $curr_offset = $self->offset;
1698         while ( $continue ) {
1699
1700             my $next_offset = 0;
1701
1702             my ($leftover, $this_len, $chunk);
1703             if ( $dlen > $data_section ) {
1704                 $leftover = 0;
1705                 $this_len = $data_section;
1706                 $chunk = substr( $data, 0, $this_len );
1707
1708                 $dlen -= $data_section;
1709                 $next_offset = $engine->_request_data_sector( $self->size );
1710                 $data = substr( $data, $this_len );
1711             }
1712             else {
1713                 $leftover = $data_section - $dlen;
1714                 $this_len = $dlen;
1715                 $chunk = $data;
1716
1717                 $continue = 0;
1718             }
1719
1720             $engine->storage->print_at( $curr_offset, $self->type ); # Sector type
1721             # Skip staleness
1722             $engine->storage->print_at( $curr_offset + $self->base_size,
1723                 pack( $StP{$engine->byte_size}, $next_offset ),  # Chain loc
1724                 pack( $StP{1}, $this_len ),                      # Data length
1725                 $chunk,                                          # Data to be stored in this sector
1726                 chr(0) x $leftover,                              # Zero-fill the rest
1727             );
1728
1729             $curr_offset = $next_offset;
1730         }
1731
1732         return;
1733     }
1734 }
1735
1736 sub data_length {
1737     my $self = shift;
1738
1739     my $buffer = $self->engine->storage->read_at(
1740         $self->offset + $self->base_size + $self->engine->byte_size, 1
1741     );
1742
1743     return unpack( $StP{1}, $buffer );
1744 }
1745
1746 sub chain_loc {
1747     my $self = shift;
1748     return unpack(
1749         $StP{$self->engine->byte_size},
1750         $self->engine->storage->read_at(
1751             $self->offset + $self->base_size,
1752             $self->engine->byte_size,
1753         ),
1754     );
1755 }
1756
1757 sub data {
1758     my $self = shift;
1759 #    my ($args) = @_;
1760 #    $args ||= {};
1761
1762     my $data;
1763     while ( 1 ) {
1764         my $chain_loc = $self->chain_loc;
1765
1766         $data .= $self->engine->storage->read_at(
1767             $self->offset + $self->base_size + $self->engine->byte_size + 1, $self->data_length,
1768         );
1769
1770         last unless $chain_loc;
1771
1772         $self = $self->engine->_load_sector( $chain_loc );
1773     }
1774
1775     return $data;
1776 }
1777
1778 package DBM::Deep::Engine::Sector::Null;
1779
1780 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1781
1782 sub type { $_[0]{engine}->SIG_NULL }
1783 sub data_length { 0 }
1784 sub data { return }
1785
1786 sub _init {
1787     my $self = shift;
1788
1789     my $engine = $self->engine;
1790
1791     unless ( $self->offset ) {
1792         my $leftover = $self->size - $self->base_size - 1 * $engine->byte_size - 1;
1793
1794         $self->{offset} = $engine->_request_data_sector( $self->size );
1795         $engine->storage->print_at( $self->offset, $self->type ); # Sector type
1796         # Skip staleness counter
1797         $engine->storage->print_at( $self->offset + $self->base_size,
1798             pack( $StP{$engine->byte_size}, 0 ),  # Chain loc
1799             pack( $StP{1}, $self->data_length ),  # Data length
1800             chr(0) x $leftover,                   # Zero-fill the rest
1801         );
1802
1803         return;
1804     }
1805 }
1806
1807 package DBM::Deep::Engine::Sector::Reference;
1808
1809 our @ISA = qw( DBM::Deep::Engine::Sector::Data );
1810
1811 sub _init {
1812     my $self = shift;
1813
1814     my $e = $self->engine;
1815
1816     unless ( $self->offset ) {
1817         my $classname = Scalar::Util::blessed( delete $self->{data} );
1818         my $leftover = $self->size - $self->base_size - 3 * $e->byte_size;
1819
1820         my $class_offset = 0;
1821         if ( defined $classname ) {
1822             my $class_sector = DBM::Deep::Engine::Sector::Scalar->new({
1823                 engine => $e,
1824                 data   => $classname,
1825             });
1826             $class_offset = $class_sector->offset;
1827         }
1828
1829         $self->{offset} = $e->_request_data_sector( $self->size );
1830         $e->storage->print_at( $self->offset, $self->type ); # Sector type
1831         # Skip staleness counter
1832         $e->storage->print_at( $self->offset + $self->base_size,
1833             pack( $StP{$e->byte_size}, 0 ),             # Index/BList loc
1834             pack( $StP{$e->byte_size}, $class_offset ), # Classname loc
1835             pack( $StP{$e->byte_size}, 1 ),             # Initial refcount
1836             chr(0) x $leftover,                         # Zero-fill the rest
1837         );
1838     }
1839     else {
1840         $self->{type} = $e->storage->read_at( $self->offset, 1 );
1841     }
1842
1843     $self->{staleness} = unpack(
1844         $StP{$STALE_SIZE},
1845         $e->storage->read_at( $self->offset + $e->SIG_SIZE, $STALE_SIZE ),
1846     );
1847
1848     return;
1849 }
1850
1851 sub staleness { $_[0]{staleness} }
1852
1853 sub get_data_location_for {
1854     my $self = shift;
1855     my ($args) = @_;
1856
1857     # Assume that the head is not allowed unless otherwise specified.
1858     $args->{allow_head} = 0 unless exists $args->{allow_head};
1859
1860     # Assume we don't create a new blist location unless otherwise specified.
1861     $args->{create} = 0 unless exists $args->{create};
1862
1863     my $blist = $self->get_bucket_list({
1864         key_md5 => $args->{key_md5},
1865         key => $args->{key},
1866         create  => $args->{create},
1867     });
1868     return unless $blist && $blist->{found};
1869
1870     # At this point, $blist knows where the md5 is. What it -doesn't- know yet
1871     # is whether or not this transaction has this key. That's part of the next
1872     # function call.
1873     my $location = $blist->get_data_location_for({
1874         allow_head => $args->{allow_head},
1875     }) or return;
1876
1877     return $location;
1878 }
1879
1880 sub get_data_for {
1881     my $self = shift;
1882     my ($args) = @_;
1883
1884     my $location = $self->get_data_location_for( $args )
1885         or return;
1886
1887     return $self->engine->_load_sector( $location );
1888 }
1889
1890 sub write_data {
1891     my $self = shift;
1892     my ($args) = @_;
1893
1894     my $blist = $self->get_bucket_list({
1895         key_md5 => $args->{key_md5},
1896         key => $args->{key},
1897         create  => 1,
1898     }) or DBM::Deep->_throw_error( "How did write_data fail (no blist)?!" );
1899
1900     # Handle any transactional bookkeeping.
1901     if ( $self->engine->trans_id ) {
1902         if ( ! $blist->has_md5 ) {
1903             $blist->mark_deleted({
1904                 trans_id => 0,
1905             });
1906         }
1907     }
1908     else {
1909         my @trans_ids = $self->engine->get_running_txn_ids;
1910         if ( $blist->has_md5 ) {
1911             if ( @trans_ids ) {
1912                 my $old_value = $blist->get_data_for;
1913                 foreach my $other_trans_id ( @trans_ids ) {
1914                     next if $blist->get_data_location_for({
1915                         trans_id   => $other_trans_id,
1916                         allow_head => 0,
1917                     });
1918                     $blist->write_md5({
1919                         trans_id => $other_trans_id,
1920                         key      => $args->{key},
1921                         key_md5  => $args->{key_md5},
1922                         value    => $old_value->clone,
1923                     });
1924                 }
1925             }
1926         }
1927         else {
1928             if ( @trans_ids ) {
1929                 foreach my $other_trans_id ( @trans_ids ) {
1930                     #XXX This doesn't seem to possible to ever happen . . .
1931                     next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1932                     $blist->mark_deleted({
1933                         trans_id => $other_trans_id,
1934                     });
1935                 }
1936             }
1937         }
1938     }
1939
1940     #XXX Is this safe to do transactionally?
1941     # Free the place we're about to write to.
1942     if ( $blist->get_data_location_for({ allow_head => 0 }) ) {
1943         $blist->get_data_for({ allow_head => 0 })->free;
1944     }
1945
1946     $blist->write_md5({
1947         key      => $args->{key},
1948         key_md5  => $args->{key_md5},
1949         value    => $args->{value},
1950     });
1951 }
1952
1953 sub delete_key {
1954     my $self = shift;
1955     my ($args) = @_;
1956
1957     # XXX What should happen if this fails?
1958     my $blist = $self->get_bucket_list({
1959         key_md5 => $args->{key_md5},
1960     }) or DBM::Deep->_throw_error( "How did delete_key fail (no blist)?!" );
1961
1962     # Save the location so that we can free the data
1963     my $location = $blist->get_data_location_for({
1964         allow_head => 0,
1965     });
1966     my $old_value = $location && $self->engine->_load_sector( $location );
1967
1968     my @trans_ids = $self->engine->get_running_txn_ids;
1969
1970     # If we're the HEAD and there are running txns, then we need to clone this value to the other
1971     # transactions to preserve Isolation.
1972     if ( $self->engine->trans_id == 0 ) {
1973         if ( @trans_ids ) {
1974             foreach my $other_trans_id ( @trans_ids ) {
1975                 next if $blist->get_data_location_for({ trans_id => $other_trans_id, allow_head => 0 });
1976                 $blist->write_md5({
1977                     trans_id => $other_trans_id,
1978                     key      => $args->{key},
1979                     key_md5  => $args->{key_md5},
1980                     value    => $old_value->clone,
1981                 });
1982             }
1983         }
1984     }
1985
1986     my $data;
1987     if ( @trans_ids ) {
1988         $blist->mark_deleted( $args );
1989
1990         if ( $old_value ) {
1991             $data = $old_value->data({ export => 1 });
1992             $old_value->free;
1993         }
1994     }
1995     else {
1996         $data = $blist->delete_md5( $args );
1997     }
1998
1999     return $data;
2000 }
2001
2002 sub get_blist_loc {
2003     my $self = shift;
2004
2005     my $e = $self->engine;
2006     my $blist_loc = $e->storage->read_at( $self->offset + $self->base_size, $e->byte_size );
2007     return unpack( $StP{$e->byte_size}, $blist_loc );
2008 }
2009
2010 sub get_bucket_list {
2011     my $self = shift;
2012     my ($args) = @_;
2013     $args ||= {};
2014
2015     # XXX Add in check here for recycling?
2016
2017     my $engine = $self->engine;
2018
2019     my $blist_loc = $self->get_blist_loc;
2020
2021     # There's no index or blist yet
2022     unless ( $blist_loc ) {
2023         return unless $args->{create};
2024
2025         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
2026             engine  => $engine,
2027             key_md5 => $args->{key_md5},
2028         });
2029
2030         $engine->storage->print_at( $self->offset + $self->base_size,
2031             pack( $StP{$engine->byte_size}, $blist->offset ),
2032         );
2033
2034         return $blist;
2035     }
2036
2037     my $sector = $engine->_load_sector( $blist_loc )
2038         or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
2039     my $i = 0;
2040     my $last_sector = undef;
2041     while ( $sector->isa( 'DBM::Deep::Engine::Sector::Index' ) ) {
2042         $blist_loc = $sector->get_entry( ord( substr( $args->{key_md5}, $i++, 1 ) ) );
2043         $last_sector = $sector;
2044         if ( $blist_loc ) {
2045             $sector = $engine->_load_sector( $blist_loc )
2046                 or DBM::Deep->_throw_error( "Cannot read sector at $blist_loc in get_bucket_list()" );
2047         }
2048         else {
2049             $sector = undef;
2050             last;
2051         }
2052     }
2053
2054     # This means we went through the Index sector(s) and found an empty slot
2055     unless ( $sector ) {
2056         return unless $args->{create};
2057
2058         DBM::Deep->_throw_error( "No last_sector when attempting to build a new entry" )
2059             unless $last_sector;
2060
2061         my $blist = DBM::Deep::Engine::Sector::BucketList->new({
2062             engine  => $engine,
2063             key_md5 => $args->{key_md5},
2064         });
2065
2066         $last_sector->set_entry( ord( substr( $args->{key_md5}, $i - 1, 1 ) ) => $blist->offset );
2067
2068         return $blist;
2069     }
2070
2071     $sector->find_md5( $args->{key_md5} );
2072
2073     # See whether or not we need to reindex the bucketlist
2074     # Yes, the double-braces are there for a reason. if() doesn't create a redo-able block,
2075     # so we have to create a bare block within the if() for redo-purposes. Patch and idea
2076     # submitted by sprout@cpan.org. -RobK, 2008-01-09
2077     if ( !$sector->has_md5 && $args->{create} && $sector->{idx} == -1 ) {{
2078         my $redo;
2079
2080         my $new_index = DBM::Deep::Engine::Sector::Index->new({
2081             engine => $engine,
2082         });
2083
2084         my %blist_cache;
2085         #XXX q.v. the comments for this function.
2086         foreach my $entry ( $sector->chopped_up ) {
2087             my ($spot, $md5) = @{$entry};
2088             my $idx = ord( substr( $md5, $i, 1 ) );
2089
2090             # XXX This is inefficient
2091             my $blist = $blist_cache{$idx}
2092                 ||= DBM::Deep::Engine::Sector::BucketList->new({
2093                     engine => $engine,
2094                 });
2095
2096             $new_index->set_entry( $idx => $blist->offset );
2097
2098             my $new_spot = $blist->write_at_next_open( $md5 );
2099             $engine->reindex_entry( $spot => $new_spot );
2100         }
2101
2102         # Handle the new item separately.
2103         {
2104             my $idx = ord( substr( $args->{key_md5}, $i, 1 ) );
2105
2106             # If all the previous blist's items have been thrown into one
2107             # blist and the new item belongs in there too, we need
2108             # another index.
2109             if ( keys %blist_cache == 1 and each %blist_cache == $idx ) {
2110                 ++$i, ++$redo;
2111             } else {
2112                 my $blist = $blist_cache{$idx}
2113                     ||= DBM::Deep::Engine::Sector::BucketList->new({
2114                         engine => $engine,
2115                     });
2116     
2117                 $new_index->set_entry( $idx => $blist->offset );
2118     
2119                 #XXX THIS IS HACKY!
2120                 $blist->find_md5( $args->{key_md5} );
2121                 $blist->write_md5({
2122                     key     => $args->{key},
2123                     key_md5 => $args->{key_md5},
2124                     value   => DBM::Deep::Engine::Sector::Null->new({
2125                         engine => $engine,
2126                         data   => undef,
2127                     }),
2128                 });
2129             }
2130 #            my $blist = $blist_cache{$idx}
2131 #                ||= DBM::Deep::Engine::Sector::BucketList->new({
2132 #                    engine => $engine,
2133 #                });
2134 #
2135 #            $new_index->set_entry( $idx => $blist->offset );
2136 #
2137 #            #XXX THIS IS HACKY!
2138 #            $blist->find_md5( $args->{key_md5} );
2139 #            $blist->write_md5({
2140 #                key     => $args->{key},
2141 #                key_md5 => $args->{key_md5},
2142 #                value   => DBM::Deep::Engine::Sector::Null->new({
2143 #                    engine => $engine,
2144 #                    data   => undef,
2145 #                }),
2146 #            });
2147         }
2148
2149         if ( $last_sector ) {
2150             $last_sector->set_entry(
2151                 ord( substr( $args->{key_md5}, $i - 1, 1 ) ),
2152                 $new_index->offset,
2153             );
2154         } else {
2155             $engine->storage->print_at( $self->offset + $self->base_size,
2156                 pack( $StP{$engine->byte_size}, $new_index->offset ),
2157             );
2158         }
2159
2160         $sector->clear;
2161         $sector->free;
2162
2163         if ( $redo ) {
2164             (undef, $sector) = %blist_cache;
2165             $last_sector = $new_index;
2166             redo;
2167         }
2168
2169         $sector = $blist_cache{ ord( substr( $args->{key_md5}, $i, 1 ) ) };
2170         $sector->find_md5( $args->{key_md5} );
2171     }}
2172
2173     return $sector;
2174 }
2175
2176 sub get_class_offset {
2177     my $self = shift;
2178
2179     my $e = $self->engine;
2180     return unpack(
2181         $StP{$e->byte_size},
2182         $e->storage->read_at(
2183             $self->offset + $self->base_size + 1 * $e->byte_size, $e->byte_size,
2184         ),
2185     );
2186 }
2187
2188 sub get_classname {
2189     my $self = shift;
2190
2191     my $class_offset = $self->get_class_offset;
2192
2193     return unless $class_offset;
2194
2195     return $self->engine->_load_sector( $class_offset )->data;
2196 }
2197
2198 sub data {
2199     my $self = shift;
2200     my ($args) = @_;
2201     $args ||= {};
2202
2203     my $obj;
2204     unless ( $obj = $self->engine->cache->{ $self->offset } ) {
2205         $obj = DBM::Deep->new({
2206             type        => $self->type,
2207             base_offset => $self->offset,
2208             staleness   => $self->staleness,
2209             storage     => $self->engine->storage,
2210             engine      => $self->engine,
2211         });
2212
2213         if ( $self->engine->storage->{autobless} ) {
2214             my $classname = $self->get_classname;
2215             if ( defined $classname ) {
2216                 bless $obj, $classname;
2217             }
2218         }
2219
2220         $self->engine->cache->{$self->offset} = $obj;
2221     }
2222
2223     # We're not exporting, so just return.
2224     unless ( $args->{export} ) {
2225         return $obj;
2226     }
2227
2228     # We shouldn't export if this is still referred to.
2229     if ( $self->get_refcount > 1 ) {
2230         return $obj;
2231     }
2232
2233     return $obj->export;
2234 }
2235
2236 sub free {
2237     my $self = shift;
2238
2239     # We're not ready to be removed yet.
2240     if ( $self->decrement_refcount > 0 ) {
2241         return;
2242     }
2243
2244     # Rebless the object into DBM::Deep::Null.
2245     eval { %{ $self->engine->cache->{ $self->offset } } = (); };
2246     eval { @{ $self->engine->cache->{ $self->offset } } = (); };
2247     bless $self->engine->cache->{ $self->offset }, 'DBM::Deep::Null';
2248     delete $self->engine->cache->{ $self->offset };
2249
2250     my $blist_loc = $self->get_blist_loc;
2251     $self->engine->_load_sector( $blist_loc )->free if $blist_loc;
2252
2253     my $class_loc = $self->get_class_offset;
2254     $self->engine->_load_sector( $class_loc )->free if $class_loc;
2255
2256     $self->SUPER::free();
2257 }
2258
2259 sub increment_refcount {
2260     my $self = shift;
2261
2262     my $refcount = $self->get_refcount;
2263
2264     $refcount++;
2265
2266     $self->write_refcount( $refcount );
2267
2268     return $refcount;
2269 }
2270
2271 sub decrement_refcount {
2272     my $self = shift;
2273
2274     my $refcount = $self->get_refcount;
2275
2276     $refcount--;
2277
2278     $self->write_refcount( $refcount );
2279
2280     return $refcount;
2281 }
2282
2283 sub get_refcount {
2284     my $self = shift;
2285
2286     my $e = $self->engine;
2287     return unpack(
2288         $StP{$e->byte_size},
2289         $e->storage->read_at(
2290             $self->offset + $self->base_size + 2 * $e->byte_size, $e->byte_size,
2291         ),
2292     );
2293 }
2294
2295 sub write_refcount {
2296     my $self = shift;
2297     my ($num) = @_;
2298
2299     my $e = $self->engine;
2300     $e->storage->print_at(
2301         $self->offset + $self->base_size + 2 * $e->byte_size,
2302         pack( $StP{$e->byte_size}, $num ),
2303     );
2304 }
2305
2306 package DBM::Deep::Engine::Sector::BucketList;
2307
2308 our @ISA = qw( DBM::Deep::Engine::Sector );
2309
2310 sub _init {
2311     my $self = shift;
2312
2313     my $engine = $self->engine;
2314
2315     unless ( $self->offset ) {
2316         my $leftover = $self->size - $self->base_size;
2317
2318         $self->{offset} = $engine->_request_blist_sector( $self->size );
2319         $engine->storage->print_at( $self->offset, $engine->SIG_BLIST ); # Sector type
2320         # Skip staleness counter
2321         $engine->storage->print_at( $self->offset + $self->base_size,
2322             chr(0) x $leftover, # Zero-fill the data
2323         );
2324     }
2325
2326     if ( $self->{key_md5} ) {
2327         $self->find_md5;
2328     }
2329
2330     return $self;
2331 }
2332
2333 sub clear {
2334     my $self = shift;
2335     $self->engine->storage->print_at( $self->offset + $self->base_size,
2336         chr(0) x ($self->size - $self->base_size), # Zero-fill the data
2337     );
2338 }
2339
2340 sub size {
2341     my $self = shift;
2342     unless ( $self->{size} ) {
2343         my $e = $self->engine;
2344         # Base + numbuckets * bucketsize
2345         $self->{size} = $self->base_size + $e->max_buckets * $self->bucket_size;
2346     }
2347     return $self->{size};
2348 }
2349
2350 sub free_meth { return '_add_free_blist_sector' }
2351
2352 sub free {
2353     my $self = shift;
2354
2355     my $e = $self->engine;
2356     foreach my $bucket ( $self->chopped_up ) {
2357         my $rest = $bucket->[-1];
2358
2359         # Delete the keysector
2360         my $l = unpack( $StP{$e->byte_size}, substr( $rest, $e->hash_size, $e->byte_size ) );
2361         my $s = $e->_load_sector( $l ); $s->free if $s;
2362
2363         # Delete the HEAD sector
2364         $l = unpack( $StP{$e->byte_size},
2365             substr( $rest,
2366                 $e->hash_size + $e->byte_size,
2367                 $e->byte_size,
2368             ),
2369         );
2370         $s = $e->_load_sector( $l ); $s->free if $s;
2371
2372         foreach my $txn ( 0 .. $e->num_txns - 2 ) {
2373             my $l = unpack( $StP{$e->byte_size},
2374                 substr( $rest,
2375                     $e->hash_size + 2 * $e->byte_size + $txn * ($e->byte_size + $STALE_SIZE),
2376                     $e->byte_size,
2377                 ),
2378             );
2379             my $s = $e->_load_sector( $l ); $s->free if $s;
2380         }
2381     }
2382
2383     $self->SUPER::free();
2384 }
2385
2386 sub bucket_size {
2387     my $self = shift;
2388     unless ( $self->{bucket_size} ) {
2389         my $e = $self->engine;
2390         # Key + head (location) + transactions (location + staleness-counter)
2391         my $location_size = $e->byte_size + $e->byte_size + ($e->num_txns - 1) * ($e->byte_size + $STALE_SIZE);
2392         $self->{bucket_size} = $e->hash_size + $location_size;
2393     }
2394     return $self->{bucket_size};
2395 }
2396
2397 # XXX This is such a poor hack. I need to rethink this code.
2398 sub chopped_up {
2399     my $self = shift;
2400
2401     my $e = $self->engine;
2402
2403     my @buckets;
2404     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2405         my $spot = $self->offset + $self->base_size + $idx * $self->bucket_size;
2406         my $md5 = $e->storage->read_at( $spot, $e->hash_size );
2407
2408         #XXX If we're chopping, why would we ever have the blank_md5?
2409         last if $md5 eq $e->blank_md5;
2410
2411         my $rest = $e->storage->read_at( undef, $self->bucket_size - $e->hash_size );
2412         push @buckets, [ $spot, $md5 . $rest ];
2413     }
2414
2415     return @buckets;
2416 }
2417
2418 sub write_at_next_open {
2419     my $self = shift;
2420     my ($entry) = @_;
2421
2422     #XXX This is such a hack!
2423     $self->{_next_open} = 0 unless exists $self->{_next_open};
2424
2425     my $spot = $self->offset + $self->base_size + $self->{_next_open}++ * $self->bucket_size;
2426     $self->engine->storage->print_at( $spot, $entry );
2427
2428     return $spot;
2429 }
2430
2431 sub has_md5 {
2432     my $self = shift;
2433     unless ( exists $self->{found} ) {
2434         $self->find_md5;
2435     }
2436     return $self->{found};
2437 }
2438
2439 sub find_md5 {
2440     my $self = shift;
2441
2442     $self->{found} = undef;
2443     $self->{idx}   = -1;
2444
2445     if ( @_ ) {
2446         $self->{key_md5} = shift;
2447     }
2448
2449     # If we don't have an MD5, then what are we supposed to do?
2450     unless ( exists $self->{key_md5} ) {
2451         DBM::Deep->_throw_error( "Cannot find_md5 without a key_md5 set" );
2452     }
2453
2454     my $e = $self->engine;
2455     foreach my $idx ( 0 .. $e->max_buckets - 1 ) {
2456         my $potential = $e->storage->read_at(
2457             $self->offset + $self->base_size + $idx * $self->bucket_size, $e->hash_size,
2458         );
2459
2460         if ( $potential eq $e->blank_md5 ) {
2461             $self->{idx} = $idx;
2462             return;
2463         }
2464
2465         if ( $potential eq $self->{key_md5} ) {
2466             $self->{found} = 1;
2467             $self->{idx} = $idx;
2468             return;
2469         }
2470     }
2471
2472     return;
2473 }
2474
2475 sub write_md5 {
2476     my $self = shift;
2477     my ($args) = @_;
2478
2479     DBM::Deep->_throw_error( "write_md5: no key" ) unless exists $args->{key};
2480     DBM::Deep->_throw_error( "write_md5: no key_md5" ) unless exists $args->{key_md5};
2481     DBM::Deep->_throw_error( "write_md5: no value" ) unless exists $args->{value};
2482
2483     my $engine = $self->engine;
2484
2485     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2486
2487     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2488     $engine->add_entry( $args->{trans_id}, $spot );
2489
2490     unless ($self->{found}) {
2491         my $key_sector = DBM::Deep::Engine::Sector::Scalar->new({
2492             engine => $engine,
2493             data   => $args->{key},
2494         });
2495
2496         $engine->storage->print_at( $spot,
2497             $args->{key_md5},
2498             pack( $StP{$engine->byte_size}, $key_sector->offset ),
2499         );
2500     }
2501
2502     my $loc = $spot
2503       + $engine->hash_size
2504       + $engine->byte_size;
2505
2506     if ( $args->{trans_id} ) {
2507         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2508
2509         $engine->storage->print_at( $loc,
2510             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2511             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2512         );
2513     }
2514     else {
2515         $engine->storage->print_at( $loc,
2516             pack( $StP{$engine->byte_size}, $args->{value}->offset ),
2517         );
2518     }
2519 }
2520
2521 sub mark_deleted {
2522     my $self = shift;
2523     my ($args) = @_;
2524     $args ||= {};
2525
2526     my $engine = $self->engine;
2527
2528     $args->{trans_id} = $engine->trans_id unless exists $args->{trans_id};
2529
2530     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2531     $engine->add_entry( $args->{trans_id}, $spot );
2532
2533     my $loc = $spot
2534       + $engine->hash_size
2535       + $engine->byte_size;
2536
2537     if ( $args->{trans_id} ) {
2538         $loc += $engine->byte_size + ($args->{trans_id} - 1) * ( $engine->byte_size + $STALE_SIZE );
2539
2540         $engine->storage->print_at( $loc,
2541             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2542             pack( $StP{$STALE_SIZE}, $engine->get_txn_staleness_counter( $args->{trans_id} ) ),
2543         );
2544     }
2545     else {
2546         $engine->storage->print_at( $loc,
2547             pack( $StP{$engine->byte_size}, 1 ), # 1 is the marker for deleted
2548         );
2549     }
2550
2551 }
2552
2553 sub delete_md5 {
2554     my $self = shift;
2555     my ($args) = @_;
2556
2557     my $engine = $self->engine;
2558     return undef unless $self->{found};
2559
2560     # Save the location so that we can free the data
2561     my $location = $self->get_data_location_for({
2562         allow_head => 0,
2563     });
2564     my $key_sector = $self->get_key_for;
2565
2566     my $spot = $self->offset + $self->base_size + $self->{idx} * $self->bucket_size;
2567     $engine->storage->print_at( $spot,
2568         $engine->storage->read_at(
2569             $spot + $self->bucket_size,
2570             $self->bucket_size * ( $engine->max_buckets - $self->{idx} - 1 ),
2571         ),
2572         chr(0) x $self->bucket_size,
2573     );
2574
2575     $key_sector->free;
2576
2577     my $data_sector = $self->engine->_load_sector( $location );
2578     my $data = $data_sector->data({ export => 1 });
2579     $data_sector->free;
2580
2581     return $data;
2582 }
2583
2584 sub get_data_location_for {
2585     my $self = shift;
2586     my ($args) = @_;
2587     $args ||= {};
2588
2589     $args->{allow_head} = 0 unless exists $args->{allow_head};
2590     $args->{trans_id}   = $self->engine->trans_id unless exists $args->{trans_id};
2591     $args->{idx}        = $self->{idx} unless exists $args->{idx};
2592
2593     my $e = $self->engine;
2594
2595     my $spot = $self->offset + $self->base_size
2596       + $args->{idx} * $self->bucket_size
2597       + $e->hash_size
2598       + $e->byte_size;
2599
2600     if ( $args->{trans_id} ) {
2601         $spot += $e->byte_size + ($args->{trans_id} - 1) * ( $e->byte_size + $STALE_SIZE );
2602     }
2603
2604     my $buffer = $e->storage->read_at(
2605         $spot,
2606         $e->byte_size + $STALE_SIZE,
2607     );
2608     my ($loc, $staleness) = unpack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, $buffer );
2609
2610     # XXX Merge the two if-clauses below
2611     if ( $args->{trans_id} ) {
2612         # We have found an entry that is old, so get rid of it
2613         if ( $staleness != (my $s = $e->get_txn_staleness_counter( $args->{trans_id} ) ) ) {
2614             $e->storage->print_at(
2615                 $spot,
2616                 pack( $StP{$e->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ), 
2617             );
2618             $loc = 0;
2619         }
2620     }
2621
2622     # If we're in a transaction and we never wrote to this location, try the
2623     # HEAD instead.
2624     if ( $args->{trans_id} && !$loc && $args->{allow_head} ) {
2625         return $self->get_data_location_for({
2626             trans_id   => 0,
2627             allow_head => 1,
2628             idx        => $args->{idx},
2629         });
2630     }
2631
2632     return $loc <= 1 ? 0 : $loc;
2633 }
2634
2635 sub get_data_for {
2636     my $self = shift;
2637     my ($args) = @_;
2638     $args ||= {};
2639
2640     return unless $self->{found};
2641     my $location = $self->get_data_location_for({
2642         allow_head => $args->{allow_head},
2643     });
2644     return $self->engine->_load_sector( $location );
2645 }
2646
2647 sub get_key_for {
2648     my $self = shift;
2649     my ($idx) = @_;
2650     $idx = $self->{idx} unless defined $idx;
2651
2652     if ( $idx >= $self->engine->max_buckets ) {
2653         DBM::Deep->_throw_error( "get_key_for(): Attempting to retrieve $idx" );
2654     }
2655
2656     my $location = $self->engine->storage->read_at(
2657         $self->offset + $self->base_size + $idx * $self->bucket_size + $self->engine->hash_size,
2658         $self->engine->byte_size,
2659     );
2660     $location = unpack( $StP{$self->engine->byte_size}, $location );
2661     DBM::Deep->_throw_error( "get_key_for: No location?" ) unless $location;
2662
2663     return $self->engine->_load_sector( $location );
2664 }
2665
2666 package DBM::Deep::Engine::Sector::Index;
2667
2668 our @ISA = qw( DBM::Deep::Engine::Sector );
2669
2670 sub _init {
2671     my $self = shift;
2672
2673     my $engine = $self->engine;
2674
2675     unless ( $self->offset ) {
2676         my $leftover = $self->size - $self->base_size;
2677
2678         $self->{offset} = $engine->_request_index_sector( $self->size );
2679         $engine->storage->print_at( $self->offset, $engine->SIG_INDEX ); # Sector type
2680         # Skip staleness counter
2681         $engine->storage->print_at( $self->offset + $self->base_size,
2682             chr(0) x $leftover, # Zero-fill the rest
2683         );
2684     }
2685
2686     return $self;
2687 }
2688
2689 #XXX Change here
2690 sub size {
2691     my $self = shift;
2692     unless ( $self->{size} ) {
2693         my $e = $self->engine;
2694         $self->{size} = $self->base_size + $e->byte_size * $e->hash_chars;
2695     }
2696     return $self->{size};
2697 }
2698
2699 sub free_meth { return '_add_free_index_sector' }
2700
2701 sub free {
2702     my $self = shift;
2703     my $e = $self->engine;
2704
2705     for my $i ( 0 .. $e->hash_chars - 1 ) {
2706         my $l = $self->get_entry( $i ) or next;
2707         $e->_load_sector( $l )->free;
2708     }
2709
2710     $self->SUPER::free();
2711 }
2712
2713 sub _loc_for {
2714     my $self = shift;
2715     my ($idx) = @_;
2716     return $self->offset + $self->base_size + $idx * $self->engine->byte_size;
2717 }
2718
2719 sub get_entry {
2720     my $self = shift;
2721     my ($idx) = @_;
2722
2723     my $e = $self->engine;
2724
2725     DBM::Deep->_throw_error( "get_entry: Out of range ($idx)" )
2726         if $idx < 0 || $idx >= $e->hash_chars;
2727
2728     return unpack(
2729         $StP{$e->byte_size},
2730         $e->storage->read_at( $self->_loc_for( $idx ), $e->byte_size ),
2731     );
2732 }
2733
2734 sub set_entry {
2735     my $self = shift;
2736     my ($idx, $loc) = @_;
2737
2738     my $e = $self->engine;
2739
2740     DBM::Deep->_throw_error( "set_entry: Out of range ($idx)" )
2741         if $idx < 0 || $idx >= $e->hash_chars;
2742
2743     $self->engine->storage->print_at(
2744         $self->_loc_for( $idx ),
2745         pack( $StP{$e->byte_size}, $loc ),
2746     );
2747 }
2748
2749 # This was copied from MARCEL's Class::Null. However, I couldn't use it because
2750 # I need an undef value, not an implementation of the Null Class pattern.
2751 package DBM::Deep::Null;
2752
2753 use overload
2754     'bool'   => sub { undef },
2755     '""'     => sub { undef },
2756     '0+'     => sub { undef },
2757     fallback => 1,
2758     nomethod => 'AUTOLOAD';
2759
2760 sub AUTOLOAD { return; }
2761
2762 1;
2763 __END__