(RT #40782) '0' as a hashkey wasn't iterated over correctly.
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7 no warnings 'recursion';
8
9 # Never import symbols into our namespace. We are a class, not a library.
10 # -RobK, 2008-05-27
11 use Scalar::Util ();
12
13 #use Data::Dumper ();
14
15 # File-wide notes:
16 # * Every method in here assumes that the storage has been appropriately
17 #   safeguarded. This can be anything from flock() to some sort of manual
18 #   mutex. But, it's the caller's responsability to make sure that this has
19 #   been done.
20
21 # Setup file and tag signatures.  These should never change.
22 sub SIG_FILE     () { 'DPDB' }
23 sub SIG_HEADER   () { 'h'    }
24 sub SIG_HASH     () { 'H'    }
25 sub SIG_ARRAY    () { 'A'    }
26 sub SIG_NULL     () { 'N'    }
27 sub SIG_DATA     () { 'D'    }
28 sub SIG_INDEX    () { 'I'    }
29 sub SIG_BLIST    () { 'B'    }
30 sub SIG_FREE     () { 'F'    }
31 sub SIG_SIZE     () {  1     }
32
33 use DBM::Deep::Iterator ();
34 use DBM::Deep::Engine::Sector::Data ();
35 use DBM::Deep::Engine::Sector::BucketList ();
36 use DBM::Deep::Engine::Sector::Index ();
37 use DBM::Deep::Engine::Sector::Null ();
38 use DBM::Deep::Engine::Sector::Reference ();
39 use DBM::Deep::Engine::Sector::Scalar ();
40 use DBM::Deep::Null ();
41
42 my $STALE_SIZE = 2;
43
44 # Please refer to the pack() documentation for further information
45 my %StP = (
46     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
47     2 => 'n', # Unsigned short in "network" (big-endian) order
48     4 => 'N', # Unsigned long in "network" (big-endian) order
49     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
50 );
51
52 =head1 NAME
53
54 DBM::Deep::Engine
55
56 =head1 PURPOSE
57
58 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
59 mapping between the L<DBM::Deep/> objects and the storage medium.
60
61 The purpose of this documentation is to provide low-level documentation for
62 developers. It is B<not> intended to be used by the general public. This
63 documentation and what it documents can and will change without notice.
64
65 =head1 OVERVIEW
66
67 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
68 and DBM::Deep::Hash) for their use to access the actual stored values. This API
69 is the following:
70
71 =over 4
72
73 =item * new
74
75 =item * read_value
76
77 =item * get_classname
78
79 =item * make_reference
80
81 =item * key_exists
82
83 =item * delete_key
84
85 =item * write_value
86
87 =item * get_next_key
88
89 =item * setup_fh
90
91 =item * begin_work
92
93 =item * commit
94
95 =item * rollback
96
97 =item * lock_exclusive
98
99 =item * lock_shared
100
101 =item * unlock
102
103 =back
104
105 They are explained in their own sections below. These methods, in turn, may
106 provide some bounds-checking, but primarily act to instantiate objects in the
107 Engine::Sector::* hierarchy and dispatch to them.
108
109 =head1 TRANSACTIONS
110
111 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
112 to keep the amount of actual work done against the file low while stil providing
113 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
114 with only one file.
115
116 =head2 STALENESS
117
118 If another process uses a transaction slot and writes stuff to it, then terminates,
119 the data that process wrote it still within the file. In order to address this,
120 there is also a transaction staleness counter associated within every write.
121 Each time a transaction is started, that process increments that transaction's
122 staleness counter. If, when it reads a value, the staleness counters aren't
123 identical, DBM::Deep will consider the value on disk to be stale and discard it.
124
125 =head2 DURABILITY
126
127 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
128 the data will be there the next time you read from it. This should be regardless
129 of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep
130 does provide that guarantee; once the commit returns, all of the data has been
131 transferred from the transaction shadow to the HEAD. The issue arises with partial
132 commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's
133 "tradition" of very light error-checking and non-existent error-handling, there is
134 no way to recover from a partial commit. (This is probably a failure in Consistency
135 as well as Durability.)
136
137 Other DBMSes use transaction logs (a separate file, generally) to achieve Durability.
138 As DBM::Deep is a single-file, we would have to do something similar to what SQLite
139 and BDB do in terms of committing using synchonized writes. To do this, we would have
140 to use a much higher RAM footprint and some serious programming that make my head
141 hurts just to think about it.
142
143 =head1 EXTERNAL METHODS
144
145 =head2 new()
146
147 This takes a set of args. These args are described in the documentation for
148 L<DBM::Deep/new>.
149
150 =cut
151
152 sub new {
153     my $class = shift;
154     my ($args) = @_;
155
156     $args->{storage} = DBM::Deep::File->new( $args )
157         unless exists $args->{storage};
158
159     my $self = bless {
160         byte_size   => 4,
161
162         digest      => undef,
163         hash_size   => 16,  # In bytes
164         hash_chars  => 256, # Number of chars the algorithm uses per byte
165         max_buckets => 16,
166         num_txns    => 1,   # The HEAD
167         trans_id    => 0,   # Default to the HEAD
168
169         data_sector_size => 64, # Size in bytes of each data sector
170
171         entries => {}, # This is the list of entries for transactions
172         storage => undef,
173     }, $class;
174
175     # Never allow byte_size to be set directly.
176     delete $args->{byte_size};
177     if ( defined $args->{pack_size} ) {
178         if ( lc $args->{pack_size} eq 'small' ) {
179             $args->{byte_size} = 2;
180         }
181         elsif ( lc $args->{pack_size} eq 'medium' ) {
182             $args->{byte_size} = 4;
183         }
184         elsif ( lc $args->{pack_size} eq 'large' ) {
185             $args->{byte_size} = 8;
186         }
187         else {
188             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
189         }
190     }
191
192     # Grab the parameters we want to use
193     foreach my $param ( keys %$self ) {
194         next unless exists $args->{$param};
195         $self->{$param} = $args->{$param};
196     }
197
198     my %validations = (
199         max_buckets      => { floor => 16, ceil => 256 },
200         num_txns         => { floor => 1,  ceil => 255 },
201         data_sector_size => { floor => 32, ceil => 256 },
202     );
203
204     while ( my ($attr, $c) = each %validations ) {
205         if (   !defined $self->{$attr}
206             || !length $self->{$attr}
207             || $self->{$attr} =~ /\D/
208             || $self->{$attr} < $c->{floor}
209         ) {
210             $self->{$attr} = '(undef)' if !defined $self->{$attr};
211             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
212             $self->{$attr} = $c->{floor};
213         }
214         elsif ( $self->{$attr} > $c->{ceil} ) {
215             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
216             $self->{$attr} = $c->{ceil};
217         }
218     }
219
220     if ( !$self->{digest} ) {
221         require Digest::MD5;
222         $self->{digest} = \&Digest::MD5::md5;
223     }
224
225     return $self;
226 }
227
228 =head2 read_value( $obj, $key )
229
230 This takes an object that provides _base_offset() and a string. It returns the
231 value stored in the corresponding Sector::Value's data section.
232
233 =cut
234
235 sub read_value {
236     my $self = shift;
237     my ($obj, $key) = @_;
238
239     # This will be a Reference sector
240     my $sector = $self->_load_sector( $obj->_base_offset )
241         or return;
242
243     if ( $sector->staleness != $obj->_staleness ) {
244         return;
245     }
246
247     my $key_md5 = $self->_apply_digest( $key );
248
249     my $value_sector = $sector->get_data_for({
250         key_md5    => $key_md5,
251         allow_head => 1,
252     });
253
254     unless ( $value_sector ) {
255         $value_sector = DBM::Deep::Engine::Sector::Null->new({
256             engine => $self,
257             data   => undef,
258         });
259
260         $sector->write_data({
261             key_md5 => $key_md5,
262             key     => $key,
263             value   => $value_sector,
264         });
265     }
266
267     return $value_sector->data;
268 }
269
270 =head2 get_classname( $obj )
271
272 This takes an object that provides _base_offset() and returns the classname (if any)
273 associated with it.
274
275 It delegates to Sector::Reference::get_classname() for the heavy lifting.
276
277 It performs a staleness check.
278
279 =cut
280
281 sub get_classname {
282     my $self = shift;
283     my ($obj) = @_;
284
285     # This will be a Reference sector
286     my $sector = $self->_load_sector( $obj->_base_offset )
287         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
288
289     if ( $sector->staleness != $obj->_staleness ) {
290         return;
291     }
292
293     return $sector->get_classname;
294 }
295
296 =head2 make_reference( $obj, $old_key, $new_key )
297
298 This takes an object that provides _base_offset() and two strings. The
299 strings correspond to the old key and new key, respectively. This operation
300 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>.
301
302 This returns nothing.
303
304 =cut
305
306 sub make_reference {
307     my $self = shift;
308     my ($obj, $old_key, $new_key) = @_;
309
310     # This will be a Reference sector
311     my $sector = $self->_load_sector( $obj->_base_offset )
312         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
313
314     if ( $sector->staleness != $obj->_staleness ) {
315         return;
316     }
317
318     my $old_md5 = $self->_apply_digest( $old_key );
319
320     my $value_sector = $sector->get_data_for({
321         key_md5    => $old_md5,
322         allow_head => 1,
323     });
324
325     unless ( $value_sector ) {
326         $value_sector = DBM::Deep::Engine::Sector::Null->new({
327             engine => $self,
328             data   => undef,
329         });
330
331         $sector->write_data({
332             key_md5 => $old_md5,
333             key     => $old_key,
334             value   => $value_sector,
335         });
336     }
337
338     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
339         $sector->write_data({
340             key     => $new_key,
341             key_md5 => $self->_apply_digest( $new_key ),
342             value   => $value_sector,
343         });
344         $value_sector->increment_refcount;
345     }
346     else {
347         $sector->write_data({
348             key     => $new_key,
349             key_md5 => $self->_apply_digest( $new_key ),
350             value   => $value_sector->clone,
351         });
352     }
353
354     return;
355 }
356
357 =head2 key_exists( $obj, $key )
358
359 This takes an object that provides _base_offset() and a string for
360 the key to be checked. This returns 1 for true and "" for false.
361
362 =cut
363
364 sub key_exists {
365     my $self = shift;
366     my ($obj, $key) = @_;
367
368     # This will be a Reference sector
369     my $sector = $self->_load_sector( $obj->_base_offset )
370         or return '';
371
372     if ( $sector->staleness != $obj->_staleness ) {
373         return '';
374     }
375
376     my $data = $sector->get_data_for({
377         key_md5    => $self->_apply_digest( $key ),
378         allow_head => 1,
379     });
380
381     # exists() returns 1 or '' for true/false.
382     return $data ? 1 : '';
383 }
384
385 =head2 delete_key( $obj, $key )
386
387 This takes an object that provides _base_offset() and a string for
388 the key to be deleted. This returns the result of the Sector::Reference
389 delete_key() method.
390
391 =cut
392
393 sub delete_key {
394     my $self = shift;
395     my ($obj, $key) = @_;
396
397     my $sector = $self->_load_sector( $obj->_base_offset )
398         or return;
399
400     if ( $sector->staleness != $obj->_staleness ) {
401         return;
402     }
403
404     return $sector->delete_key({
405         key_md5    => $self->_apply_digest( $key ),
406         allow_head => 0,
407     });
408 }
409
410 =head2 write_value( $obj, $key, $value )
411
412 This takes an object that provides _base_offset(), a string for the
413 key, and a value. This value can be anything storable within L<DBM::Deep/>.
414
415 This returns 1 upon success.
416
417 =cut
418
419 sub write_value {
420     my $self = shift;
421     my ($obj, $key, $value) = @_;
422
423     my $r = Scalar::Util::reftype( $value ) || '';
424     {
425         last if $r eq '';
426         last if $r eq 'HASH';
427         last if $r eq 'ARRAY';
428
429         DBM::Deep->_throw_error(
430             "Storage of references of type '$r' is not supported."
431         );
432     }
433
434     # This will be a Reference sector
435     my $sector = $self->_load_sector( $obj->_base_offset )
436         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
437
438     if ( $sector->staleness != $obj->_staleness ) {
439         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
440     }
441
442     my ($class, $type);
443     if ( !defined $value ) {
444         $class = 'DBM::Deep::Engine::Sector::Null';
445     }
446     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
447         my $tmpvar;
448         if ( $r eq 'ARRAY' ) {
449             $tmpvar = tied @$value;
450         } elsif ( $r eq 'HASH' ) {
451             $tmpvar = tied %$value;
452         }
453
454         if ( $tmpvar ) {
455             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
456
457             unless ( $is_dbm_deep ) {
458                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
459             }
460
461             unless ( $tmpvar->_engine->storage == $self->storage ) {
462                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
463             }
464
465             # First, verify if we're storing the same thing to this spot. If we are, then
466             # this should be a no-op. -EJS, 2008-05-19
467             my $loc = $sector->get_data_location_for({
468                 key_md5 => $self->_apply_digest( $key ),
469                 allow_head => 1,
470             });
471
472             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
473                 return 1;
474             }
475
476             #XXX Can this use $loc?
477             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
478             $sector->write_data({
479                 key     => $key,
480                 key_md5 => $self->_apply_digest( $key ),
481                 value   => $value_sector,
482             });
483             $value_sector->increment_refcount;
484
485             return 1;
486         }
487
488         $class = 'DBM::Deep::Engine::Sector::Reference';
489         $type = substr( $r, 0, 1 );
490     }
491     else {
492         if ( tied($value) ) {
493             DBM::Deep->_throw_error( "Cannot store something that is tied." );
494         }
495         $class = 'DBM::Deep::Engine::Sector::Scalar';
496     }
497
498     # Create this after loading the reference sector in case something bad happens.
499     # This way, we won't allocate value sector(s) needlessly.
500     my $value_sector = $class->new({
501         engine => $self,
502         data   => $value,
503         type   => $type,
504     });
505
506     $sector->write_data({
507         key     => $key,
508         key_md5 => $self->_apply_digest( $key ),
509         value   => $value_sector,
510     });
511
512     # This code is to make sure we write all the values in the $value to the disk
513     # and to make sure all changes to $value after the assignment are reflected
514     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
515     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
516     # copy to a temp value.
517     if ( $r eq 'ARRAY' ) {
518         my @temp = @$value;
519         tie @$value, 'DBM::Deep', {
520             base_offset => $value_sector->offset,
521             staleness   => $value_sector->staleness,
522             storage     => $self->storage,
523             engine      => $self,
524         };
525         @$value = @temp;
526         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
527     }
528     elsif ( $r eq 'HASH' ) {
529         my %temp = %$value;
530         tie %$value, 'DBM::Deep', {
531             base_offset => $value_sector->offset,
532             staleness   => $value_sector->staleness,
533             storage     => $self->storage,
534             engine      => $self,
535         };
536
537         %$value = %temp;
538         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
539     }
540
541     return 1;
542 }
543
544 =head2 get_next_key( $obj, $prev_key )
545
546 This takes an object that provides _base_offset() and an optional string
547 representing the prior key returned via a prior invocation of this method.
548
549 This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
550
551 =cut
552
553 # XXX Add staleness here
554 sub get_next_key {
555     my $self = shift;
556     my ($obj, $prev_key) = @_;
557
558     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
559     unless ( defined $prev_key ) {
560         $obj->{iterator} = DBM::Deep::Iterator->new({
561             base_offset => $obj->_base_offset,
562             engine      => $self,
563         });
564     }
565
566     return $obj->{iterator}->get_next_key( $obj );
567 }
568
569 =head2 setup_fh( $obj )
570
571 This takes an object that provides _base_offset(). It will do everything needed
572 in order to properly initialize all values for necessary functioning. If this is
573 called upon an already initialized object, this will also reset the inode.
574
575 This returns 1.
576
577 =cut
578
579 sub setup_fh {
580     my $self = shift;
581     my ($obj) = @_;
582
583     # We're opening the file.
584     unless ( $obj->_base_offset ) {
585         my $bytes_read = $self->_read_file_header;
586
587         # Creating a new file
588         unless ( $bytes_read ) {
589             $self->_write_file_header;
590
591             # 1) Create Array/Hash entry
592             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
593                 engine => $self,
594                 type   => $obj->_type,
595             });
596             $obj->{base_offset} = $initial_reference->offset;
597             $obj->{staleness} = $initial_reference->staleness;
598
599             $self->storage->flush;
600         }
601         # Reading from an existing file
602         else {
603             $obj->{base_offset} = $bytes_read;
604             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
605                 engine => $self,
606                 offset => $obj->_base_offset,
607             });
608             unless ( $initial_reference ) {
609                 DBM::Deep->_throw_error("Corrupted file, no master index record");
610             }
611
612             unless ($obj->_type eq $initial_reference->type) {
613                 DBM::Deep->_throw_error("File type mismatch");
614             }
615
616             $obj->{staleness} = $initial_reference->staleness;
617         }
618     }
619
620     $self->storage->set_inode;
621
622     return 1;
623 }
624
625 =head2 begin_work( $obj )
626
627 This takes an object that provides _base_offset(). It will set up all necessary
628 bookkeeping in order to run all work within a transaction.
629
630 If $obj is already within a transaction, an error wiill be thrown. If there are
631 no more available transactions, an error will be thrown.
632
633 This returns undef.
634
635 =cut
636
637 sub begin_work {
638     my $self = shift;
639     my ($obj) = @_;
640
641     if ( $self->trans_id ) {
642         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
643     }
644
645     my @slots = $self->read_txn_slots;
646     my $found;
647     for my $i ( 0 .. $#slots ) {
648         next if $slots[$i];
649
650         $slots[$i] = 1;
651         $self->set_trans_id( $i + 1 );
652         $found = 1;
653         last;
654     }
655     unless ( $found ) {
656         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
657     }
658     $self->write_txn_slots( @slots );
659
660     if ( !$self->trans_id ) {
661         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
662     }
663
664     return;
665 }
666
667 =head2 rollback( $obj )
668
669 This takes an object that provides _base_offset(). It will revert all
670 actions taken within the running transaction.
671
672 If $obj is not within a transaction, an error will be thrown.
673
674 This returns 1.
675
676 =cut
677
678 sub rollback {
679     my $self = shift;
680     my ($obj) = @_;
681
682     if ( !$self->trans_id ) {
683         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
684     }
685
686     # Each entry is the file location for a bucket that has a modification for
687     # this transaction. The entries need to be expunged.
688     foreach my $entry (@{ $self->get_entries } ) {
689         # Remove the entry here
690         my $read_loc = $entry
691           + $self->hash_size
692           + $self->byte_size
693           + $self->byte_size
694           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
695
696         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
697         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
698         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
699
700         if ( $data_loc > 1 ) {
701             $self->_load_sector( $data_loc )->free;
702         }
703     }
704
705     $self->clear_entries;
706
707     my @slots = $self->read_txn_slots;
708     $slots[$self->trans_id-1] = 0;
709     $self->write_txn_slots( @slots );
710     $self->inc_txn_staleness_counter( $self->trans_id );
711     $self->set_trans_id( 0 );
712
713     return 1;
714 }
715
716 =head2 commit( $obj )
717
718 This takes an object that provides _base_offset(). It will apply all
719 actions taken within the transaction to the HEAD.
720
721 If $obj is not within a transaction, an error will be thrown.
722
723 This returns 1.
724
725 =cut
726
727 sub commit {
728     my $self = shift;
729     my ($obj) = @_;
730
731     if ( !$self->trans_id ) {
732         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
733     }
734
735     foreach my $entry (@{ $self->get_entries } ) {
736         # Overwrite the entry in head with the entry in trans_id
737         my $base = $entry
738           + $self->hash_size
739           + $self->byte_size;
740
741         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
742         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
743
744         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
745         my $trans_loc = $self->storage->read_at(
746             $spot, $self->byte_size,
747         );
748
749         $self->storage->print_at( $base, $trans_loc );
750         $self->storage->print_at(
751             $spot,
752             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
753         );
754
755         if ( $head_loc > 1 ) {
756             $self->_load_sector( $head_loc )->free;
757         }
758     }
759
760     $self->clear_entries;
761
762     my @slots = $self->read_txn_slots;
763     $slots[$self->trans_id-1] = 0;
764     $self->write_txn_slots( @slots );
765     $self->inc_txn_staleness_counter( $self->trans_id );
766     $self->set_trans_id( 0 );
767
768     return 1;
769 }
770
771 =head2 lock_exclusive()
772
773 This takes an object that provides _base_offset(). It will guarantee that
774 the storage has taken precautions to be safe for a write.
775
776 This returns nothing.
777
778 =cut
779
780 sub lock_exclusive {
781     my $self = shift;
782     my ($obj) = @_;
783     return $self->storage->lock_exclusive( $obj );
784 }
785
786 =head2 lock_shared()
787
788 This takes an object that provides _base_offset(). It will guarantee that
789 the storage has taken precautions to be safe for a read.
790
791 This returns nothing.
792
793 =cut
794
795 sub lock_shared {
796     my $self = shift;
797     my ($obj) = @_;
798     return $self->storage->lock_shared( $obj );
799 }
800
801 =head2 unlock()
802
803 This takes an object that provides _base_offset(). It will guarantee that
804 the storage has released all locks taken.
805
806 This returns nothing.
807
808 =cut
809
810 sub unlock {
811     my $self = shift;
812     my ($obj) = @_;
813
814     my $rv = $self->storage->unlock( $obj );
815
816     $self->flush if $rv;
817
818     return $rv;
819 }
820
821 =head1 INTERNAL METHODS
822
823 The following methods are internal-use-only to DBM::Deep::Engine.
824
825 =cut
826
827 =head2 read_txn_slots()
828
829 This takes no arguments.
830
831 This will return an array with a 1 or 0 in each slot. Each spot represents one
832 available transaction. If the slot is 1, that transaction is taken. If it is 0,
833 the transaction is available.
834
835 =cut
836
837 sub read_txn_slots {
838     my $self = shift;
839     my $bl = $self->txn_bitfield_len;
840     my $num_bits = $bl * 8;
841     return split '', unpack( 'b'.$num_bits,
842         $self->storage->read_at(
843             $self->trans_loc, $bl,
844         )
845     );
846 }
847
848 =head2 write_txn_slots( @slots )
849
850 This takes an array of 1's and 0's. This array represents the transaction slots
851 returned by L</read_txn_slots()>. In other words, the following is true:
852
853   @x = read_txn_slots( write_txn_slots( @x ) );
854
855 (With the obviously missing object referents added back in.)
856
857 =cut
858
859 sub write_txn_slots {
860     my $self = shift;
861     my $num_bits = $self->txn_bitfield_len * 8;
862     $self->storage->print_at( $self->trans_loc,
863         pack( 'b'.$num_bits, join('', @_) ),
864     );
865 }
866
867 =head2 get_running_txn_ids()
868
869 This takes no arguments.
870
871 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
872
873 =cut
874
875 sub get_running_txn_ids {
876     my $self = shift;
877     my @transactions = $self->read_txn_slots;
878     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
879 }
880
881 =head2 get_txn_staleness_counter( $trans_id )
882
883 This will return the staleness counter for the given transaction ID. Please see
884 L</TRANSACTION STALENESS> for more information.
885
886 =cut
887
888 sub get_txn_staleness_counter {
889     my $self = shift;
890     my ($trans_id) = @_;
891
892     # Hardcode staleness of 0 for the HEAD
893     return 0 unless $trans_id;
894
895     return unpack( $StP{$STALE_SIZE},
896         $self->storage->read_at(
897             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
898             $STALE_SIZE,
899         )
900     );
901 }
902
903 =head2 inc_txn_staleness_counter( $trans_id )
904
905 This will increment the staleness counter for the given transaction ID. Please see
906 L</TRANSACTION STALENESS> for more information.
907
908 =cut
909
910 sub inc_txn_staleness_counter {
911     my $self = shift;
912     my ($trans_id) = @_;
913
914     # Hardcode staleness of 0 for the HEAD
915     return 0 unless $trans_id;
916
917     $self->storage->print_at(
918         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
919         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
920     );
921 }
922
923 =head2 get_entries()
924
925 This takes no arguments.
926
927 This returns a list of all the sectors that have been modified by this transaction.
928
929 =cut
930
931 sub get_entries {
932     my $self = shift;
933     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
934 }
935
936 =head2 add_entry( $trans_id, $location )
937
938 This takes a transaction ID and a file location and marks the sector at that location
939 as having been modified by the transaction identified by $trans_id.
940
941 This returns nothing.
942
943 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
944 C<< $trans_id != $self->trans_id >> for this method.
945
946 =cut
947
948 sub add_entry {
949     my $self = shift;
950     my ($trans_id, $loc) = @_;
951
952     $self->{entries}{$trans_id} ||= {};
953     $self->{entries}{$trans_id}{$loc} = undef;
954 }
955
956 =head2 reindex_entry( $old_loc, $new_loc )
957
958 This takes two locations (old and new, respectively). If a location that has been
959 modified by this transaction is subsequently reindexed due to a bucketlist
960 overflowing, then the entries hash needs to be made aware of this change.
961
962 This returns nothing.
963
964 =cut
965
966 sub reindex_entry {
967     my $self = shift;
968     my ($old_loc, $new_loc) = @_;
969
970     TRANS:
971     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
972         if ( exists $locs->{$old_loc} ) {
973             delete $locs->{$old_loc};
974             $locs->{$new_loc} = undef;
975             next TRANS;
976         }
977     }
978 }
979
980 =head2 clear_entries()
981
982 This takes no arguments. It will clear the entries list for the running transaction.
983
984 This returns nothing.
985
986 =cut
987
988 sub clear_entries {
989     my $self = shift;
990     delete $self->{entries}{$self->trans_id};
991 }
992
993 =head2 _write_file_header()
994
995 This writes the file header for a new file. This will write the various settings
996 that set how the file is interpreted.
997
998 =head2 _read_file_header()
999
1000 This reads the file header from an existing file. This will read the various
1001 settings that set how the file is interpreted.
1002
1003 =cut
1004
1005 {
1006     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
1007     my $this_file_version = 3;
1008
1009     sub _write_file_header {
1010         my $self = shift;
1011
1012         my $nt = $self->num_txns;
1013         my $bl = $self->txn_bitfield_len;
1014
1015         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
1016
1017         my $loc = $self->storage->request_space( $header_fixed + $header_var );
1018
1019         $self->storage->print_at( $loc,
1020             SIG_FILE,
1021             SIG_HEADER,
1022             pack('N', $this_file_version), # At this point, we're at 9 bytes
1023             pack('N', $header_var),        # header size
1024             # --- Above is $header_fixed. Below is $header_var
1025             pack('C', $self->byte_size),
1026
1027             # These shenanigans are to allow a 256 within a C
1028             pack('C', $self->max_buckets - 1),
1029             pack('C', $self->data_sector_size - 1),
1030
1031             pack('C', $nt),
1032             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
1033             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
1034             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
1035             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
1036             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
1037         );
1038
1039         #XXX Set these less fragilely
1040         $self->set_trans_loc( $header_fixed + 4 );
1041         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1042
1043         return;
1044     }
1045
1046     sub _read_file_header {
1047         my $self = shift;
1048
1049         my $buffer = $self->storage->read_at( 0, $header_fixed );
1050         return unless length($buffer);
1051
1052         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1053             'A4 A N N', $buffer
1054         );
1055
1056         unless ( $file_signature eq SIG_FILE ) {
1057             $self->storage->close;
1058             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1059         }
1060
1061         unless ( $sig_header eq SIG_HEADER ) {
1062             $self->storage->close;
1063             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1064         }
1065
1066         unless ( $file_version == $this_file_version ) {
1067             $self->storage->close;
1068             DBM::Deep->_throw_error(
1069                 "Wrong file version found - " .  $file_version .
1070                 " - expected " . $this_file_version
1071             );
1072         }
1073
1074         my $buffer2 = $self->storage->read_at( undef, $size );
1075         my @values = unpack( 'C C C C', $buffer2 );
1076
1077         if ( @values != 4 || grep { !defined } @values ) {
1078             $self->storage->close;
1079             DBM::Deep->_throw_error("Corrupted file - bad header");
1080         }
1081
1082         #XXX Add warnings if values weren't set right
1083         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1084
1085         # These shenangians are to allow a 256 within a C
1086         $self->{max_buckets} += 1;
1087         $self->{data_sector_size} += 1;
1088
1089         my $bl = $self->txn_bitfield_len;
1090
1091         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1092         unless ( $size == $header_var ) {
1093             $self->storage->close;
1094             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1095         }
1096
1097         $self->set_trans_loc( $header_fixed + scalar(@values) );
1098         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1099
1100         return length($buffer) + length($buffer2);
1101     }
1102 }
1103
1104 =head2 _load_sector( $offset )
1105
1106 This will instantiate and return the sector object that represents the data found
1107 at $offset.
1108
1109 =cut
1110
1111 sub _load_sector {
1112     my $self = shift;
1113     my ($offset) = @_;
1114
1115     # Add a catch for offset of 0 or 1
1116     return if !$offset || $offset <= 1;
1117
1118     my $type = $self->storage->read_at( $offset, 1 );
1119     return if $type eq chr(0);
1120
1121     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1122         return DBM::Deep::Engine::Sector::Reference->new({
1123             engine => $self,
1124             type   => $type,
1125             offset => $offset,
1126         });
1127     }
1128     # XXX Don't we need key_md5 here?
1129     elsif ( $type eq $self->SIG_BLIST ) {
1130         return DBM::Deep::Engine::Sector::BucketList->new({
1131             engine => $self,
1132             type   => $type,
1133             offset => $offset,
1134         });
1135     }
1136     elsif ( $type eq $self->SIG_INDEX ) {
1137         return DBM::Deep::Engine::Sector::Index->new({
1138             engine => $self,
1139             type   => $type,
1140             offset => $offset,
1141         });
1142     }
1143     elsif ( $type eq $self->SIG_NULL ) {
1144         return DBM::Deep::Engine::Sector::Null->new({
1145             engine => $self,
1146             type   => $type,
1147             offset => $offset,
1148         });
1149     }
1150     elsif ( $type eq $self->SIG_DATA ) {
1151         return DBM::Deep::Engine::Sector::Scalar->new({
1152             engine => $self,
1153             type   => $type,
1154             offset => $offset,
1155         });
1156     }
1157     # This was deleted from under us, so just return and let the caller figure it out.
1158     elsif ( $type eq $self->SIG_FREE ) {
1159         return;
1160     }
1161
1162     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1163 }
1164
1165 =head2 _apply_digest( @stuff )
1166
1167 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1168 passed in and return the result.
1169
1170 =cut
1171
1172 sub _apply_digest {
1173     my $self = shift;
1174     return $self->{digest}->(@_);
1175 }
1176
1177 =head2 _add_free_blist_sector( $offset, $size )
1178
1179 =head2 _add_free_data_sector( $offset, $size )
1180
1181 =head2 _add_free_index_sector( $offset, $size )
1182
1183 These methods are all wrappers around _add_free_sector(), providing the proper
1184 chain offset ($multiple) for the sector type.
1185
1186 =cut
1187
1188 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1189 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1190 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1191
1192 =head2 _add_free_sector( $multiple, $offset, $size )
1193
1194 _add_free_sector() takes the offset into the chains location, the offset of the
1195 sector, and the size of that sector. It will mark the sector as a free sector
1196 and put it into the list of sectors that are free of this type for use later.
1197
1198 This returns nothing.
1199
1200 B<NOTE>: $size is unused?
1201
1202 =cut
1203
1204 sub _add_free_sector {
1205     my $self = shift;
1206     my ($multiple, $offset, $size) = @_;
1207
1208     my $chains_offset = $multiple * $self->byte_size;
1209
1210     my $storage = $self->storage;
1211
1212     # Increment staleness.
1213     # XXX Can this increment+modulo be done by "&= 0x1" ?
1214     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
1215     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1216     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1217
1218     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1219
1220     $storage->print_at( $self->chains_loc + $chains_offset,
1221         pack( $StP{$self->byte_size}, $offset ),
1222     );
1223
1224     # Record the old head in the new sector after the signature and staleness counter
1225     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
1226 }
1227
1228 =head2 _request_blist_sector( $size )
1229
1230 =head2 _request_data_sector( $size )
1231
1232 =head2 _request_index_sector( $size )
1233
1234 These methods are all wrappers around _request_sector(), providing the proper
1235 chain offset ($multiple) for the sector type.
1236
1237 =cut
1238
1239 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1240 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1241 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1242
1243 =head2 _request_sector( $multiple $size )
1244
1245 This takes the offset into the chains location and the size of that sector.
1246
1247 This returns the object with the sector. If there is an available free sector of
1248 that type, then it will be reused. If there isn't one, then a new one will be
1249 allocated.
1250
1251 =cut
1252
1253 sub _request_sector {
1254     my $self = shift;
1255     my ($multiple, $size) = @_;
1256
1257     my $chains_offset = $multiple * $self->byte_size;
1258
1259     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1260     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1261
1262     # We don't have any free sectors of the right size, so allocate a new one.
1263     unless ( $loc ) {
1264         my $offset = $self->storage->request_space( $size );
1265
1266         # Zero out the new sector. This also guarantees correct increases
1267         # in the filesize.
1268         $self->storage->print_at( $offset, chr(0) x $size );
1269
1270         return $offset;
1271     }
1272
1273     # Read the new head after the signature and the staleness counter
1274     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
1275     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1276     $self->storage->print_at(
1277         $loc + SIG_SIZE + $STALE_SIZE,
1278         pack( $StP{$self->byte_size}, 0 ),
1279     );
1280
1281     return $loc;
1282 }
1283
1284 =head2 flush()
1285
1286 This takes no arguments. It will do everything necessary to flush all things to
1287 disk. This is usually called during unlock() and setup_fh().
1288
1289 This returns nothing.
1290
1291 =cut
1292
1293 sub flush {
1294     my $self = shift;
1295
1296     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1297     # -RobK, 2008-06-26
1298     $self->storage->flush;
1299 }
1300
1301 =head2 ACCESSORS
1302
1303 The following are readonly attributes.
1304
1305 =over 4
1306
1307 =item * storage
1308
1309 =item * byte_size
1310
1311 =item * hash_size
1312
1313 =item * hash_chars
1314
1315 =item * num_txns
1316
1317 =item * max_buckets
1318
1319 =item * blank_md5
1320
1321 =item * data_sector_size
1322
1323 =item * txn_bitfield_len
1324
1325 =back
1326
1327 =cut
1328
1329 sub storage     { $_[0]{storage} }
1330 sub byte_size   { $_[0]{byte_size} }
1331 sub hash_size   { $_[0]{hash_size} }
1332 sub hash_chars  { $_[0]{hash_chars} }
1333 sub num_txns    { $_[0]{num_txns} }
1334 sub max_buckets { $_[0]{max_buckets} }
1335 sub blank_md5   { chr(0) x $_[0]->hash_size }
1336 sub data_sector_size { $_[0]{data_sector_size} }
1337
1338 # This is a calculated value
1339 sub txn_bitfield_len {
1340     my $self = shift;
1341     unless ( exists $self->{txn_bitfield_len} ) {
1342         my $temp = ($self->num_txns) / 8;
1343         if ( $temp > int( $temp ) ) {
1344             $temp = int( $temp ) + 1;
1345         }
1346         $self->{txn_bitfield_len} = $temp;
1347     }
1348     return $self->{txn_bitfield_len};
1349 }
1350
1351 =pod
1352
1353 The following are read/write attributes. 
1354
1355 =over 4
1356
1357 =item * trans_id / set_trans_id( $new_id )
1358
1359 =item * trans_loc / set_trans_loc( $new_loc )
1360
1361 =item * chains_loc / set_chains_loc( $new_loc )
1362
1363 =back
1364
1365 =cut
1366
1367 sub trans_id     { $_[0]{trans_id} }
1368 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1369
1370 sub trans_loc     { $_[0]{trans_loc} }
1371 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1372
1373 sub chains_loc     { $_[0]{chains_loc} }
1374 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1375
1376 sub cache       { $_[0]{cache} ||= {} }
1377 sub clear_cache { %{$_[0]->cache} = () }
1378
1379 =head2 _dump_file()
1380
1381 This method takes no arguments. It's used to print out a textual representation of the DBM::Deep
1382 DB file. It assumes the file is not-corrupted.
1383
1384 =cut
1385
1386 sub _dump_file {
1387     my $self = shift;
1388
1389     # Read the header
1390     my $spot = $self->_read_file_header();
1391
1392     my %types = (
1393         0 => 'B',
1394         1 => 'D',
1395         2 => 'I',
1396     );
1397
1398     my %sizes = (
1399         'D' => $self->data_sector_size,
1400         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1401         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1402     );
1403
1404     my $return = "";
1405
1406     # Header values
1407     $return .= "NumTxns: " . $self->num_txns . $/;
1408
1409     # Read the free sector chains
1410     my %sectors;
1411     foreach my $multiple ( 0 .. 2 ) {
1412         $return .= "Chains($types{$multiple}):";
1413         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1414         while ( 1 ) {
1415             my $loc = unpack(
1416                 $StP{$self->byte_size},
1417                 $self->storage->read_at( $old_loc, $self->byte_size ),
1418             );
1419
1420             # We're now out of free sectors of this kind.
1421             unless ( $loc ) {
1422                 last;
1423             }
1424
1425             $sectors{ $types{$multiple} }{ $loc } = undef;
1426             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
1427             $return .= " $loc";
1428         }
1429         $return .= $/;
1430     }
1431
1432     SECTOR:
1433     while ( $spot < $self->storage->{end} ) {
1434         # Read each sector in order.
1435         my $sector = $self->_load_sector( $spot );
1436         if ( !$sector ) {
1437             # Find it in the free-sectors that were found already
1438             foreach my $type ( keys %sectors ) {
1439                 if ( exists $sectors{$type}{$spot} ) {
1440                     my $size = $sizes{$type};
1441                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1442                     $spot += $size;
1443                     next SECTOR;
1444                 }
1445             }
1446
1447             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1448         }
1449         else {
1450             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1451             if ( $sector->type eq 'D' ) {
1452                 $return .= ' ' . $sector->data;
1453             }
1454             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1455                 $return .= ' REF: ' . $sector->get_refcount;
1456             }
1457             elsif ( $sector->type eq 'B' ) {
1458                 foreach my $bucket ( $sector->chopped_up ) {
1459                     $return .= "\n    ";
1460                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1461                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1462                     );
1463                     my $l = unpack( $StP{$self->byte_size},
1464                         substr( $bucket->[-1],
1465                             $self->hash_size + $self->byte_size,
1466                             $self->byte_size,
1467                         ),
1468                     );
1469                     $return .= sprintf " %08d", $l;
1470                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1471                         my $l = unpack( $StP{$self->byte_size},
1472                             substr( $bucket->[-1],
1473                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1474                                 $self->byte_size,
1475                             ),
1476                         );
1477                         $return .= sprintf " %08d", $l;
1478                     }
1479                 }
1480             }
1481             $return .= $/;
1482
1483             $spot += $sector->size;
1484         }
1485     }
1486
1487     return $return;
1488 }
1489
1490 1;
1491 __END__