49a075ce8fa23c8edb6e508b797c2df07d4b24d5
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 use DBM::Deep::Iterator ();
33 use DBM::Deep::Engine::Sector::Data ();
34 use DBM::Deep::Engine::Sector::BucketList ();
35 use DBM::Deep::Engine::Sector::Index ();
36 use DBM::Deep::Engine::Sector::Null ();
37 use DBM::Deep::Engine::Sector::Reference ();
38 use DBM::Deep::Engine::Sector::Scalar ();
39 use DBM::Deep::Null ();
40
41 my $STALE_SIZE = 2;
42
43 # Please refer to the pack() documentation for further information
44 my %StP = (
45     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
46     2 => 'n', # Unsigned short in "network" (big-endian) order
47     4 => 'N', # Unsigned long in "network" (big-endian) order
48     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
49 );
50
51 =head1 NAME
52
53 DBM::Deep::Engine
54
55 =head1 PURPOSE
56
57 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
58 mapping between the L<DBM::Deep/> objects and the storage medium.
59
60 The purpose of this documentation is to provide low-level documentation for
61 developers. It is B<not> intended to be used by the general public. This
62 documentation and what it documents can and will change without notice.
63
64 =head1 OVERVIEW
65
66 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
67 and DBM::Deep::Hash) for their use to access the actual stored values. This API
68 is the following:
69
70 =over 4
71
72 =item * new
73
74 =item * read_value
75
76 =item * get_classname
77
78 =item * make_reference
79
80 =item * key_exists
81
82 =item * delete_key
83
84 =item * write_value
85
86 =item * get_next_key
87
88 =item * setup_fh
89
90 =item * begin_work
91
92 =item * commit
93
94 =item * rollback
95
96 =item * lock_exclusive
97
98 =item * lock_shared
99
100 =item * unlock
101
102 =back
103
104 They are explained in their own sections below. These methods, in turn, may
105 provide some bounds-checking, but primarily act to instantiate objects in the
106 Engine::Sector::* hierarchy and dispatch to them.
107
108 =head1 TRANSACTIONS
109
110 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
111 to keep the amount of actual work done against the file low while stil providing
112 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
113 with only one file.
114
115 =head2 STALENESS
116
117 If another process uses a transaction slot and writes stuff to it, then terminates,
118 the data that process wrote it still within the file. In order to address this,
119 there is also a transaction staleness counter associated within every write.
120 Each time a transaction is started, that process increments that transaction's
121 staleness counter. If, when it reads a value, the staleness counters aren't
122 identical, DBM::Deep will consider the value on disk to be stale and discard it.
123
124 =head2 DURABILITY
125
126 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
127 the data will be there the next time you read from it. This should be regardless
128 of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep
129 does provide that guarantee; once the commit returns, all of the data has been
130 transferred from the transaction shadow to the HEAD. The issue arises with partial
131 commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's
132 "tradition" of very light error-checking and non-existent error-handling, there is
133 no way to recover from a partial commit. (This is probably a failure in Consistency
134 as well as Durability.)
135
136 Other DBMSes use transaction logs (a separate file, generally) to achieve Durability.
137 As DBM::Deep is a single-file, we would have to do something similar to what SQLite
138 and BDB do in terms of committing using synchonized writes. To do this, we would have
139 to use a much higher RAM footprint and some serious programming that make my head
140 hurts just to think about it.
141
142 =head1 EXTERNAL METHODS
143
144 =head2 new()
145
146 This takes a set of args. These args are described in the documentation for
147 L<DBM::Deep/new>.
148
149 =cut
150
151 sub new {
152     my $class = shift;
153     my ($args) = @_;
154
155     $args->{storage} = DBM::Deep::File->new( $args )
156         unless exists $args->{storage};
157
158     my $self = bless {
159         byte_size   => 4,
160
161         digest      => undef,
162         hash_size   => 16,  # In bytes
163         hash_chars  => 256, # Number of chars the algorithm uses per byte
164         max_buckets => 16,
165         num_txns    => 1,   # The HEAD
166         trans_id    => 0,   # Default to the HEAD
167
168         data_sector_size => 64, # Size in bytes of each data sector
169
170         entries => {}, # This is the list of entries for transactions
171         storage => undef,
172     }, $class;
173
174     # Never allow byte_size to be set directly.
175     delete $args->{byte_size};
176     if ( defined $args->{pack_size} ) {
177         if ( lc $args->{pack_size} eq 'small' ) {
178             $args->{byte_size} = 2;
179         }
180         elsif ( lc $args->{pack_size} eq 'medium' ) {
181             $args->{byte_size} = 4;
182         }
183         elsif ( lc $args->{pack_size} eq 'large' ) {
184             $args->{byte_size} = 8;
185         }
186         else {
187             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
188         }
189     }
190
191     # Grab the parameters we want to use
192     foreach my $param ( keys %$self ) {
193         next unless exists $args->{$param};
194         $self->{$param} = $args->{$param};
195     }
196
197     my %validations = (
198         max_buckets      => { floor => 16, ceil => 256 },
199         num_txns         => { floor => 1,  ceil => 255 },
200         data_sector_size => { floor => 32, ceil => 256 },
201     );
202
203     while ( my ($attr, $c) = each %validations ) {
204         if (   !defined $self->{$attr}
205             || !length $self->{$attr}
206             || $self->{$attr} =~ /\D/
207             || $self->{$attr} < $c->{floor}
208         ) {
209             $self->{$attr} = '(undef)' if !defined $self->{$attr};
210             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
211             $self->{$attr} = $c->{floor};
212         }
213         elsif ( $self->{$attr} > $c->{ceil} ) {
214             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
215             $self->{$attr} = $c->{ceil};
216         }
217     }
218
219     if ( !$self->{digest} ) {
220         require Digest::MD5;
221         $self->{digest} = \&Digest::MD5::md5;
222     }
223
224     return $self;
225 }
226
227 =head2 read_value( $obj, $key )
228
229 This takes an object that provides _base_offset() and a string. It returns the
230 value stored in the corresponding Sector::Value's data section.
231
232 =cut
233
234 sub read_value {
235     my $self = shift;
236     my ($obj, $key) = @_;
237
238     # This will be a Reference sector
239     my $sector = $self->_load_sector( $obj->_base_offset )
240         or return;
241
242     if ( $sector->staleness != $obj->_staleness ) {
243         return;
244     }
245
246     my $key_md5 = $self->_apply_digest( $key );
247
248     my $value_sector = $sector->get_data_for({
249         key_md5    => $key_md5,
250         allow_head => 1,
251     });
252
253     unless ( $value_sector ) {
254         $value_sector = DBM::Deep::Engine::Sector::Null->new({
255             engine => $self,
256             data   => undef,
257         });
258
259         $sector->write_data({
260             key_md5 => $key_md5,
261             key     => $key,
262             value   => $value_sector,
263         });
264     }
265
266     return $value_sector->data;
267 }
268
269 =head2 get_classname( $obj )
270
271 This takes an object that provides _base_offset() and returns the classname (if any)
272 associated with it.
273
274 It delegates to Sector::Reference::get_classname() for the heavy lifting.
275
276 It performs a staleness check.
277
278 =cut
279
280 sub get_classname {
281     my $self = shift;
282     my ($obj) = @_;
283
284     # This will be a Reference sector
285     my $sector = $self->_load_sector( $obj->_base_offset )
286         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
287
288     if ( $sector->staleness != $obj->_staleness ) {
289         return;
290     }
291
292     return $sector->get_classname;
293 }
294
295 =head2 make_reference( $obj, $old_key, $new_key )
296
297 This takes an object that provides _base_offset() and two strings. The
298 strings correspond to the old key and new key, respectively. This operation
299 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>.
300
301 This returns nothing.
302
303 =cut
304
305 sub make_reference {
306     my $self = shift;
307     my ($obj, $old_key, $new_key) = @_;
308
309     # This will be a Reference sector
310     my $sector = $self->_load_sector( $obj->_base_offset )
311         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
312
313     if ( $sector->staleness != $obj->_staleness ) {
314         return;
315     }
316
317     my $old_md5 = $self->_apply_digest( $old_key );
318
319     my $value_sector = $sector->get_data_for({
320         key_md5    => $old_md5,
321         allow_head => 1,
322     });
323
324     unless ( $value_sector ) {
325         $value_sector = DBM::Deep::Engine::Sector::Null->new({
326             engine => $self,
327             data   => undef,
328         });
329
330         $sector->write_data({
331             key_md5 => $old_md5,
332             key     => $old_key,
333             value   => $value_sector,
334         });
335     }
336
337     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
338         $sector->write_data({
339             key     => $new_key,
340             key_md5 => $self->_apply_digest( $new_key ),
341             value   => $value_sector,
342         });
343         $value_sector->increment_refcount;
344     }
345     else {
346         $sector->write_data({
347             key     => $new_key,
348             key_md5 => $self->_apply_digest( $new_key ),
349             value   => $value_sector->clone,
350         });
351     }
352
353     return;
354 }
355
356 =head2 key_exists( $obj, $key )
357
358 This takes an object that provides _base_offset() and a string for
359 the key to be checked. This returns 1 for true and "" for false.
360
361 =cut
362
363 sub key_exists {
364     my $self = shift;
365     my ($obj, $key) = @_;
366
367     # This will be a Reference sector
368     my $sector = $self->_load_sector( $obj->_base_offset )
369         or return '';
370
371     if ( $sector->staleness != $obj->_staleness ) {
372         return '';
373     }
374
375     my $data = $sector->get_data_for({
376         key_md5    => $self->_apply_digest( $key ),
377         allow_head => 1,
378     });
379
380     # exists() returns 1 or '' for true/false.
381     return $data ? 1 : '';
382 }
383
384 =head2 delete_key( $obj, $key )
385
386 This takes an object that provides _base_offset() and a string for
387 the key to be deleted. This returns the result of the Sector::Reference
388 delete_key() method.
389
390 =cut
391
392 sub delete_key {
393     my $self = shift;
394     my ($obj, $key) = @_;
395
396     my $sector = $self->_load_sector( $obj->_base_offset )
397         or return;
398
399     if ( $sector->staleness != $obj->_staleness ) {
400         return;
401     }
402
403     return $sector->delete_key({
404         key_md5    => $self->_apply_digest( $key ),
405         allow_head => 0,
406     });
407 }
408
409 =head2 write_value( $obj, $key, $value )
410
411 This takes an object that provides _base_offset(), a string for the
412 key, and a value. This value can be anything storable within L<DBM::Deep/>.
413
414 This returns 1 upon success.
415
416 =cut
417
418 sub write_value {
419     my $self = shift;
420     my ($obj, $key, $value) = @_;
421
422     my $r = Scalar::Util::reftype( $value ) || '';
423     {
424         last if $r eq '';
425         last if $r eq 'HASH';
426         last if $r eq 'ARRAY';
427
428         DBM::Deep->_throw_error(
429             "Storage of references of type '$r' is not supported."
430         );
431     }
432
433     # This will be a Reference sector
434     my $sector = $self->_load_sector( $obj->_base_offset )
435         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
436
437     if ( $sector->staleness != $obj->_staleness ) {
438         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
439     }
440
441     my ($class, $type);
442     if ( !defined $value ) {
443         $class = 'DBM::Deep::Engine::Sector::Null';
444     }
445     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
446         my $tmpvar;
447         if ( $r eq 'ARRAY' ) {
448             $tmpvar = tied @$value;
449         } elsif ( $r eq 'HASH' ) {
450             $tmpvar = tied %$value;
451         }
452
453         if ( $tmpvar ) {
454             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
455
456             unless ( $is_dbm_deep ) {
457                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
458             }
459
460             unless ( $tmpvar->_engine->storage == $self->storage ) {
461                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
462             }
463
464             # First, verify if we're storing the same thing to this spot. If we are, then
465             # this should be a no-op. -EJS, 2008-05-19
466             my $loc = $sector->get_data_location_for({
467                 key_md5 => $self->_apply_digest( $key ),
468                 allow_head => 1,
469             });
470
471             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
472                 return 1;
473             }
474
475             #XXX Can this use $loc?
476             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
477             $sector->write_data({
478                 key     => $key,
479                 key_md5 => $self->_apply_digest( $key ),
480                 value   => $value_sector,
481             });
482             $value_sector->increment_refcount;
483
484             return 1;
485         }
486
487         $class = 'DBM::Deep::Engine::Sector::Reference';
488         $type = substr( $r, 0, 1 );
489     }
490     else {
491         if ( tied($value) ) {
492             DBM::Deep->_throw_error( "Cannot store something that is tied." );
493         }
494         $class = 'DBM::Deep::Engine::Sector::Scalar';
495     }
496
497     # Create this after loading the reference sector in case something bad happens.
498     # This way, we won't allocate value sector(s) needlessly.
499     my $value_sector = $class->new({
500         engine => $self,
501         data   => $value,
502         type   => $type,
503     });
504
505     $sector->write_data({
506         key     => $key,
507         key_md5 => $self->_apply_digest( $key ),
508         value   => $value_sector,
509     });
510
511     # This code is to make sure we write all the values in the $value to the disk
512     # and to make sure all changes to $value after the assignment are reflected
513     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
514     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
515     # copy to a temp value.
516     if ( $r eq 'ARRAY' ) {
517         my @temp = @$value;
518         tie @$value, 'DBM::Deep', {
519             base_offset => $value_sector->offset,
520             staleness   => $value_sector->staleness,
521             storage     => $self->storage,
522             engine      => $self,
523         };
524         @$value = @temp;
525         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
526     }
527     elsif ( $r eq 'HASH' ) {
528         my %temp = %$value;
529         tie %$value, 'DBM::Deep', {
530             base_offset => $value_sector->offset,
531             staleness   => $value_sector->staleness,
532             storage     => $self->storage,
533             engine      => $self,
534         };
535
536         %$value = %temp;
537         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
538     }
539
540     return 1;
541 }
542
543 =head2 get_next_key( $obj, $prev_key )
544
545 This takes an object that provides _base_offset() and an optional string
546 representing the prior key returned via a prior invocation of this method.
547
548 This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
549
550 =cut
551
552 # XXX Add staleness here
553 sub get_next_key {
554     my $self = shift;
555     my ($obj, $prev_key) = @_;
556
557     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
558     unless ( $prev_key ) {
559         $obj->{iterator} = DBM::Deep::Iterator->new({
560             base_offset => $obj->_base_offset,
561             engine      => $self,
562         });
563     }
564
565     return $obj->{iterator}->get_next_key( $obj );
566 }
567
568 =head2 setup_fh( $obj )
569
570 This takes an object that provides _base_offset(). It will do everything needed
571 in order to properly initialize all values for necessary functioning. If this is
572 called upon an already initialized object, this will also reset the inode.
573
574 This returns 1.
575
576 =cut
577
578 sub setup_fh {
579     my $self = shift;
580     my ($obj) = @_;
581
582     # We're opening the file.
583     unless ( $obj->_base_offset ) {
584         my $bytes_read = $self->_read_file_header;
585
586         # Creating a new file
587         unless ( $bytes_read ) {
588             $self->_write_file_header;
589
590             # 1) Create Array/Hash entry
591             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
592                 engine => $self,
593                 type   => $obj->_type,
594             });
595             $obj->{base_offset} = $initial_reference->offset;
596             $obj->{staleness} = $initial_reference->staleness;
597
598             $self->storage->flush;
599         }
600         # Reading from an existing file
601         else {
602             $obj->{base_offset} = $bytes_read;
603             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
604                 engine => $self,
605                 offset => $obj->_base_offset,
606             });
607             unless ( $initial_reference ) {
608                 DBM::Deep->_throw_error("Corrupted file, no master index record");
609             }
610
611             unless ($obj->_type eq $initial_reference->type) {
612                 DBM::Deep->_throw_error("File type mismatch");
613             }
614
615             $obj->{staleness} = $initial_reference->staleness;
616         }
617     }
618
619     $self->storage->set_inode;
620
621     return 1;
622 }
623
624 =head2 begin_work( $obj )
625
626 This takes an object that provides _base_offset(). It will set up all necessary
627 bookkeeping in order to run all work within a transaction.
628
629 If $obj is already within a transaction, an error wiill be thrown. If there are
630 no more available transactions, an error will be thrown.
631
632 This returns undef.
633
634 =cut
635
636 sub begin_work {
637     my $self = shift;
638     my ($obj) = @_;
639
640     if ( $self->trans_id ) {
641         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
642     }
643
644     my @slots = $self->read_txn_slots;
645     my $found;
646     for my $i ( 0 .. $#slots ) {
647         next if $slots[$i];
648
649         $slots[$i] = 1;
650         $self->set_trans_id( $i + 1 );
651         $found = 1;
652         last;
653     }
654     unless ( $found ) {
655         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
656     }
657     $self->write_txn_slots( @slots );
658
659     if ( !$self->trans_id ) {
660         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
661     }
662
663     return;
664 }
665
666 =head2 rollback( $obj )
667
668 This takes an object that provides _base_offset(). It will revert all
669 actions taken within the running transaction.
670
671 If $obj is not within a transaction, an error will be thrown.
672
673 This returns 1.
674
675 =cut
676
677 sub rollback {
678     my $self = shift;
679     my ($obj) = @_;
680
681     if ( !$self->trans_id ) {
682         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
683     }
684
685     # Each entry is the file location for a bucket that has a modification for
686     # this transaction. The entries need to be expunged.
687     foreach my $entry (@{ $self->get_entries } ) {
688         # Remove the entry here
689         my $read_loc = $entry
690           + $self->hash_size
691           + $self->byte_size
692           + $self->byte_size
693           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
694
695         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
696         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
697         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
698
699         if ( $data_loc > 1 ) {
700             $self->_load_sector( $data_loc )->free;
701         }
702     }
703
704     $self->clear_entries;
705
706     my @slots = $self->read_txn_slots;
707     $slots[$self->trans_id-1] = 0;
708     $self->write_txn_slots( @slots );
709     $self->inc_txn_staleness_counter( $self->trans_id );
710     $self->set_trans_id( 0 );
711
712     return 1;
713 }
714
715 =head2 commit( $obj )
716
717 This takes an object that provides _base_offset(). It will apply all
718 actions taken within the transaction to the HEAD.
719
720 If $obj is not within a transaction, an error will be thrown.
721
722 This returns 1.
723
724 =cut
725
726 sub commit {
727     my $self = shift;
728     my ($obj) = @_;
729
730     if ( !$self->trans_id ) {
731         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
732     }
733
734     foreach my $entry (@{ $self->get_entries } ) {
735         # Overwrite the entry in head with the entry in trans_id
736         my $base = $entry
737           + $self->hash_size
738           + $self->byte_size;
739
740         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
741         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
742
743         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
744         my $trans_loc = $self->storage->read_at(
745             $spot, $self->byte_size,
746         );
747
748         $self->storage->print_at( $base, $trans_loc );
749         $self->storage->print_at(
750             $spot,
751             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
752         );
753
754         if ( $head_loc > 1 ) {
755             $self->_load_sector( $head_loc )->free;
756         }
757     }
758
759     $self->clear_entries;
760
761     my @slots = $self->read_txn_slots;
762     $slots[$self->trans_id-1] = 0;
763     $self->write_txn_slots( @slots );
764     $self->inc_txn_staleness_counter( $self->trans_id );
765     $self->set_trans_id( 0 );
766
767     return 1;
768 }
769
770 =head2 lock_exclusive()
771
772 This takes an object that provides _base_offset(). It will guarantee that
773 the storage has taken precautions to be safe for a write.
774
775 This returns nothing.
776
777 =cut
778
779 sub lock_exclusive {
780     my $self = shift;
781     my ($obj) = @_;
782     return $self->storage->lock_exclusive( $obj );
783 }
784
785 =head2 lock_shared()
786
787 This takes an object that provides _base_offset(). It will guarantee that
788 the storage has taken precautions to be safe for a read.
789
790 This returns nothing.
791
792 =cut
793
794 sub lock_shared {
795     my $self = shift;
796     my ($obj) = @_;
797     return $self->storage->lock_shared( $obj );
798 }
799
800 =head2 unlock()
801
802 This takes an object that provides _base_offset(). It will guarantee that
803 the storage has released all locks taken.
804
805 This returns nothing.
806
807 =cut
808
809 sub unlock {
810     my $self = shift;
811     my ($obj) = @_;
812
813     my $rv = $self->storage->unlock( $obj );
814
815     $self->flush if $rv;
816
817     return $rv;
818 }
819
820 =head1 INTERNAL METHODS
821
822 The following methods are internal-use-only to DBM::Deep::Engine.
823
824 =cut
825
826 =head2 read_txn_slots()
827
828 This takes no arguments.
829
830 This will return an array with a 1 or 0 in each slot. Each spot represents one
831 available transaction. If the slot is 1, that transaction is taken. If it is 0,
832 the transaction is available.
833
834 =cut
835
836 sub read_txn_slots {
837     my $self = shift;
838     my $bl = $self->txn_bitfield_len;
839     my $num_bits = $bl * 8;
840     return split '', unpack( 'b'.$num_bits,
841         $self->storage->read_at(
842             $self->trans_loc, $bl,
843         )
844     );
845 }
846
847 =head2 write_txn_slots( @slots )
848
849 This takes an array of 1's and 0's. This array represents the transaction slots
850 returned by L</read_txn_slots()>. In other words, the following is true:
851
852   @x = read_txn_slots( write_txn_slots( @x ) );
853
854 (With the obviously missing object referents added back in.)
855
856 =cut
857
858 sub write_txn_slots {
859     my $self = shift;
860     my $num_bits = $self->txn_bitfield_len * 8;
861     $self->storage->print_at( $self->trans_loc,
862         pack( 'b'.$num_bits, join('', @_) ),
863     );
864 }
865
866 =head2 get_running_txn_ids()
867
868 This takes no arguments.
869
870 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
871
872 =cut
873
874 sub get_running_txn_ids {
875     my $self = shift;
876     my @transactions = $self->read_txn_slots;
877     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
878 }
879
880 =head2 get_txn_staleness_counter( $trans_id )
881
882 This will return the staleness counter for the given transaction ID. Please see
883 L</TRANSACTION STALENESS> for more information.
884
885 =cut
886
887 sub get_txn_staleness_counter {
888     my $self = shift;
889     my ($trans_id) = @_;
890
891     # Hardcode staleness of 0 for the HEAD
892     return 0 unless $trans_id;
893
894     return unpack( $StP{$STALE_SIZE},
895         $self->storage->read_at(
896             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
897             $STALE_SIZE,
898         )
899     );
900 }
901
902 =head2 inc_txn_staleness_counter( $trans_id )
903
904 This will increment the staleness counter for the given transaction ID. Please see
905 L</TRANSACTION STALENESS> for more information.
906
907 =cut
908
909 sub inc_txn_staleness_counter {
910     my $self = shift;
911     my ($trans_id) = @_;
912
913     # Hardcode staleness of 0 for the HEAD
914     return 0 unless $trans_id;
915
916     $self->storage->print_at(
917         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
918         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
919     );
920 }
921
922 =head2 get_entries()
923
924 This takes no arguments.
925
926 This returns a list of all the sectors that have been modified by this transaction.
927
928 =cut
929
930 sub get_entries {
931     my $self = shift;
932     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
933 }
934
935 =head2 add_entry( $trans_id, $location )
936
937 This takes a transaction ID and a file location and marks the sector at that location
938 as having been modified by the transaction identified by $trans_id.
939
940 This returns nothing.
941
942 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
943 C<< $trans_id != $self->trans_id >> for this method.
944
945 =cut
946
947 sub add_entry {
948     my $self = shift;
949     my ($trans_id, $loc) = @_;
950
951     $self->{entries}{$trans_id} ||= {};
952     $self->{entries}{$trans_id}{$loc} = undef;
953 }
954
955 =head2 reindex_entry( $old_loc, $new_loc )
956
957 This takes two locations (old and new, respectively). If a location that has been
958 modified by this transaction is subsequently reindexed due to a bucketlist
959 overflowing, then the entries hash needs to be made aware of this change.
960
961 This returns nothing.
962
963 =cut
964
965 sub reindex_entry {
966     my $self = shift;
967     my ($old_loc, $new_loc) = @_;
968
969     TRANS:
970     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
971         if ( exists $locs->{$old_loc} ) {
972             delete $locs->{$old_loc};
973             $locs->{$new_loc} = undef;
974             next TRANS;
975         }
976     }
977 }
978
979 =head2 clear_entries()
980
981 This takes no arguments. It will clear the entries list for the running transaction.
982
983 This returns nothing.
984
985 =cut
986
987 sub clear_entries {
988     my $self = shift;
989     delete $self->{entries}{$self->trans_id};
990 }
991
992 =head2 _write_file_header()
993
994 This writes the file header for a new file. This will write the various settings
995 that set how the file is interpreted.
996
997 =head2 _read_file_header()
998
999 This reads the file header from an existing file. This will read the various
1000 settings that set how the file is interpreted.
1001
1002 =cut
1003
1004 {
1005     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
1006     my $this_file_version = 3;
1007
1008     sub _write_file_header {
1009         my $self = shift;
1010
1011         my $nt = $self->num_txns;
1012         my $bl = $self->txn_bitfield_len;
1013
1014         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
1015
1016         my $loc = $self->storage->request_space( $header_fixed + $header_var );
1017
1018         $self->storage->print_at( $loc,
1019             SIG_FILE,
1020             SIG_HEADER,
1021             pack('N', $this_file_version), # At this point, we're at 9 bytes
1022             pack('N', $header_var),        # header size
1023             # --- Above is $header_fixed. Below is $header_var
1024             pack('C', $self->byte_size),
1025
1026             # These shenanigans are to allow a 256 within a C
1027             pack('C', $self->max_buckets - 1),
1028             pack('C', $self->data_sector_size - 1),
1029
1030             pack('C', $nt),
1031             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
1032             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
1033             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
1034             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
1035             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
1036         );
1037
1038         #XXX Set these less fragilely
1039         $self->set_trans_loc( $header_fixed + 4 );
1040         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1041
1042         return;
1043     }
1044
1045     sub _read_file_header {
1046         my $self = shift;
1047
1048         my $buffer = $self->storage->read_at( 0, $header_fixed );
1049         return unless length($buffer);
1050
1051         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1052             'A4 A N N', $buffer
1053         );
1054
1055         unless ( $file_signature eq SIG_FILE ) {
1056             $self->storage->close;
1057             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1058         }
1059
1060         unless ( $sig_header eq SIG_HEADER ) {
1061             $self->storage->close;
1062             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1063         }
1064
1065         unless ( $file_version == $this_file_version ) {
1066             $self->storage->close;
1067             DBM::Deep->_throw_error(
1068                 "Wrong file version found - " .  $file_version .
1069                 " - expected " . $this_file_version
1070             );
1071         }
1072
1073         my $buffer2 = $self->storage->read_at( undef, $size );
1074         my @values = unpack( 'C C C C', $buffer2 );
1075
1076         if ( @values != 4 || grep { !defined } @values ) {
1077             $self->storage->close;
1078             DBM::Deep->_throw_error("Corrupted file - bad header");
1079         }
1080
1081         #XXX Add warnings if values weren't set right
1082         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1083
1084         # These shenangians are to allow a 256 within a C
1085         $self->{max_buckets} += 1;
1086         $self->{data_sector_size} += 1;
1087
1088         my $bl = $self->txn_bitfield_len;
1089
1090         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1091         unless ( $size == $header_var ) {
1092             $self->storage->close;
1093             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1094         }
1095
1096         $self->set_trans_loc( $header_fixed + scalar(@values) );
1097         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1098
1099         return length($buffer) + length($buffer2);
1100     }
1101 }
1102
1103 =head2 _load_sector( $offset )
1104
1105 This will instantiate and return the sector object that represents the data found
1106 at $offset.
1107
1108 =cut
1109
1110 sub _load_sector {
1111     my $self = shift;
1112     my ($offset) = @_;
1113
1114     # Add a catch for offset of 0 or 1
1115     return if !$offset || $offset <= 1;
1116
1117     my $type = $self->storage->read_at( $offset, 1 );
1118     return if $type eq chr(0);
1119
1120     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1121         return DBM::Deep::Engine::Sector::Reference->new({
1122             engine => $self,
1123             type   => $type,
1124             offset => $offset,
1125         });
1126     }
1127     # XXX Don't we need key_md5 here?
1128     elsif ( $type eq $self->SIG_BLIST ) {
1129         return DBM::Deep::Engine::Sector::BucketList->new({
1130             engine => $self,
1131             type   => $type,
1132             offset => $offset,
1133         });
1134     }
1135     elsif ( $type eq $self->SIG_INDEX ) {
1136         return DBM::Deep::Engine::Sector::Index->new({
1137             engine => $self,
1138             type   => $type,
1139             offset => $offset,
1140         });
1141     }
1142     elsif ( $type eq $self->SIG_NULL ) {
1143         return DBM::Deep::Engine::Sector::Null->new({
1144             engine => $self,
1145             type   => $type,
1146             offset => $offset,
1147         });
1148     }
1149     elsif ( $type eq $self->SIG_DATA ) {
1150         return DBM::Deep::Engine::Sector::Scalar->new({
1151             engine => $self,
1152             type   => $type,
1153             offset => $offset,
1154         });
1155     }
1156     # This was deleted from under us, so just return and let the caller figure it out.
1157     elsif ( $type eq $self->SIG_FREE ) {
1158         return;
1159     }
1160
1161     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1162 }
1163
1164 =head2 _apply_digest( @stuff )
1165
1166 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1167 passed in and return the result.
1168
1169 =cut
1170
1171 sub _apply_digest {
1172     my $self = shift;
1173     return $self->{digest}->(@_);
1174 }
1175
1176 =head2 _add_free_blist_sector( $offset, $size )
1177
1178 =head2 _add_free_data_sector( $offset, $size )
1179
1180 =head2 _add_free_index_sector( $offset, $size )
1181
1182 These methods are all wrappers around _add_free_sector(), providing the proper
1183 chain offset ($multiple) for the sector type.
1184
1185 =cut
1186
1187 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1188 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1189 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1190
1191 =head2 _add_free_sector( $multiple, $offset, $size )
1192
1193 _add_free_sector() takes the offset into the chains location, the offset of the
1194 sector, and the size of that sector. It will mark the sector as a free sector
1195 and put it into the list of sectors that are free of this type for use later.
1196
1197 This returns nothing.
1198
1199 B<NOTE>: $size is unused?
1200
1201 =cut
1202
1203 sub _add_free_sector {
1204     my $self = shift;
1205     my ($multiple, $offset, $size) = @_;
1206
1207     my $chains_offset = $multiple * $self->byte_size;
1208
1209     my $storage = $self->storage;
1210
1211     # Increment staleness.
1212     # XXX Can this increment+modulo be done by "&= 0x1" ?
1213     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
1214     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1215     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1216
1217     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1218
1219     $storage->print_at( $self->chains_loc + $chains_offset,
1220         pack( $StP{$self->byte_size}, $offset ),
1221     );
1222
1223     # Record the old head in the new sector after the signature and staleness counter
1224     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
1225 }
1226
1227 =head2 _request_blist_sector( $size )
1228
1229 =head2 _request_data_sector( $size )
1230
1231 =head2 _request_index_sector( $size )
1232
1233 These methods are all wrappers around _request_sector(), providing the proper
1234 chain offset ($multiple) for the sector type.
1235
1236 =cut
1237
1238 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1239 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1240 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1241
1242 =head2 _request_sector( $multiple $size )
1243
1244 This takes the offset into the chains location and the size of that sector.
1245
1246 This returns the object with the sector. If there is an available free sector of
1247 that type, then it will be reused. If there isn't one, then a new one will be
1248 allocated.
1249
1250 =cut
1251
1252 sub _request_sector {
1253     my $self = shift;
1254     my ($multiple, $size) = @_;
1255
1256     my $chains_offset = $multiple * $self->byte_size;
1257
1258     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1259     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1260
1261     # We don't have any free sectors of the right size, so allocate a new one.
1262     unless ( $loc ) {
1263         my $offset = $self->storage->request_space( $size );
1264
1265         # Zero out the new sector. This also guarantees correct increases
1266         # in the filesize.
1267         $self->storage->print_at( $offset, chr(0) x $size );
1268
1269         return $offset;
1270     }
1271
1272     # Read the new head after the signature and the staleness counter
1273     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
1274     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1275     $self->storage->print_at(
1276         $loc + SIG_SIZE + $STALE_SIZE,
1277         pack( $StP{$self->byte_size}, 0 ),
1278     );
1279
1280     return $loc;
1281 }
1282
1283 =head2 flush()
1284
1285 This takes no arguments. It will do everything necessary to flush all things to
1286 disk. This is usually called during unlock() and setup_fh().
1287
1288 This returns nothing.
1289
1290 =cut
1291
1292 sub flush {
1293     my $self = shift;
1294
1295     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1296     # -RobK, 2008-06-26
1297     $self->storage->flush;
1298 }
1299
1300 =head2 ACCESSORS
1301
1302 The following are readonly attributes.
1303
1304 =over 4
1305
1306 =item * storage
1307
1308 =item * byte_size
1309
1310 =item * hash_size
1311
1312 =item * hash_chars
1313
1314 =item * num_txns
1315
1316 =item * max_buckets
1317
1318 =item * blank_md5
1319
1320 =item * data_sector_size
1321
1322 =item * txn_bitfield_len
1323
1324 =back
1325
1326 =cut
1327
1328 sub storage     { $_[0]{storage} }
1329 sub byte_size   { $_[0]{byte_size} }
1330 sub hash_size   { $_[0]{hash_size} }
1331 sub hash_chars  { $_[0]{hash_chars} }
1332 sub num_txns    { $_[0]{num_txns} }
1333 sub max_buckets { $_[0]{max_buckets} }
1334 sub blank_md5   { chr(0) x $_[0]->hash_size }
1335 sub data_sector_size { $_[0]{data_sector_size} }
1336
1337 # This is a calculated value
1338 sub txn_bitfield_len {
1339     my $self = shift;
1340     unless ( exists $self->{txn_bitfield_len} ) {
1341         my $temp = ($self->num_txns) / 8;
1342         if ( $temp > int( $temp ) ) {
1343             $temp = int( $temp ) + 1;
1344         }
1345         $self->{txn_bitfield_len} = $temp;
1346     }
1347     return $self->{txn_bitfield_len};
1348 }
1349
1350 =pod
1351
1352 The following are read/write attributes. 
1353
1354 =over 4
1355
1356 =item * trans_id / set_trans_id( $new_id )
1357
1358 =item * trans_loc / set_trans_loc( $new_loc )
1359
1360 =item * chains_loc / set_chains_loc( $new_loc )
1361
1362 =back
1363
1364 =cut
1365
1366 sub trans_id     { $_[0]{trans_id} }
1367 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1368
1369 sub trans_loc     { $_[0]{trans_loc} }
1370 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1371
1372 sub chains_loc     { $_[0]{chains_loc} }
1373 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1374
1375 sub cache       { $_[0]{cache} ||= {} }
1376 sub clear_cache { %{$_[0]->cache} = () }
1377
1378 =head2 _dump_file()
1379
1380 This method takes no arguments. It's used to print out a textual representation of the DBM::Deep
1381 DB file. It assumes the file is not-corrupted.
1382
1383 =cut
1384
1385 sub _dump_file {
1386     my $self = shift;
1387
1388     # Read the header
1389     my $spot = $self->_read_file_header();
1390
1391     my %types = (
1392         0 => 'B',
1393         1 => 'D',
1394         2 => 'I',
1395     );
1396
1397     my %sizes = (
1398         'D' => $self->data_sector_size,
1399         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1400         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1401     );
1402
1403     my $return = "";
1404
1405     # Header values
1406     $return .= "NumTxns: " . $self->num_txns . $/;
1407
1408     # Read the free sector chains
1409     my %sectors;
1410     foreach my $multiple ( 0 .. 2 ) {
1411         $return .= "Chains($types{$multiple}):";
1412         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1413         while ( 1 ) {
1414             my $loc = unpack(
1415                 $StP{$self->byte_size},
1416                 $self->storage->read_at( $old_loc, $self->byte_size ),
1417             );
1418
1419             # We're now out of free sectors of this kind.
1420             unless ( $loc ) {
1421                 last;
1422             }
1423
1424             $sectors{ $types{$multiple} }{ $loc } = undef;
1425             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
1426             $return .= " $loc";
1427         }
1428         $return .= $/;
1429     }
1430
1431     SECTOR:
1432     while ( $spot < $self->storage->{end} ) {
1433         # Read each sector in order.
1434         my $sector = $self->_load_sector( $spot );
1435         if ( !$sector ) {
1436             # Find it in the free-sectors that were found already
1437             foreach my $type ( keys %sectors ) {
1438                 if ( exists $sectors{$type}{$spot} ) {
1439                     my $size = $sizes{$type};
1440                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1441                     $spot += $size;
1442                     next SECTOR;
1443                 }
1444             }
1445
1446             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1447         }
1448         else {
1449             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1450             if ( $sector->type eq 'D' ) {
1451                 $return .= ' ' . $sector->data;
1452             }
1453             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1454                 $return .= ' REF: ' . $sector->get_refcount;
1455             }
1456             elsif ( $sector->type eq 'B' ) {
1457                 foreach my $bucket ( $sector->chopped_up ) {
1458                     $return .= "\n    ";
1459                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1460                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1461                     );
1462                     my $l = unpack( $StP{$self->byte_size},
1463                         substr( $bucket->[-1],
1464                             $self->hash_size + $self->byte_size,
1465                             $self->byte_size,
1466                         ),
1467                     );
1468                     $return .= sprintf " %08d", $l;
1469                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1470                         my $l = unpack( $StP{$self->byte_size},
1471                             substr( $bucket->[-1],
1472                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1473                                 $self->byte_size,
1474                             ),
1475                         );
1476                         $return .= sprintf " %08d", $l;
1477                     }
1478                 }
1479             }
1480             $return .= $/;
1481
1482             $spot += $sector->size;
1483         }
1484     }
1485
1486     return $return;
1487 }
1488
1489 1;
1490 __END__