POD linebreak cleanup
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 # Never import symbols into our namespace. We are a class, not a library.
9 # -RobK, 2008-05-27
10 use Scalar::Util ();
11
12 #use Data::Dumper ();
13
14 # File-wide notes:
15 # * Every method in here assumes that the storage has been appropriately
16 #   safeguarded. This can be anything from flock() to some sort of manual
17 #   mutex. But, it's the caller's responsability to make sure that this has
18 #   been done.
19
20 # Setup file and tag signatures.  These should never change.
21 sub SIG_FILE     () { 'DPDB' }
22 sub SIG_HEADER   () { 'h'    }
23 sub SIG_HASH     () { 'H'    }
24 sub SIG_ARRAY    () { 'A'    }
25 sub SIG_NULL     () { 'N'    }
26 sub SIG_DATA     () { 'D'    }
27 sub SIG_INDEX    () { 'I'    }
28 sub SIG_BLIST    () { 'B'    }
29 sub SIG_FREE     () { 'F'    }
30 sub SIG_SIZE     () {  1     }
31
32 use DBM::Deep::Storage::File ();
33 use DBM::Deep::Iterator ();
34 use DBM::Deep::Engine::Sector::Data ();
35 use DBM::Deep::Engine::Sector::BucketList ();
36 use DBM::Deep::Engine::Sector::Index ();
37 use DBM::Deep::Engine::Sector::Null ();
38 use DBM::Deep::Engine::Sector::Reference ();
39 use DBM::Deep::Engine::Sector::Scalar ();
40 use DBM::Deep::Null ();
41
42 my $STALE_SIZE = 2;
43
44 # Please refer to the pack() documentation for further information
45 my %StP = (
46     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
47     2 => 'n', # Unsigned short in "network" (big-endian) order
48     4 => 'N', # Unsigned long in "network" (big-endian) order
49     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
50 );
51
52 =head1 NAME
53
54 DBM::Deep::Engine
55
56 =head1 PURPOSE
57
58 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
59 mapping between the L<DBM::Deep/> objects and the storage medium.
60
61 The purpose of this documentation is to provide low-level documentation for
62 developers. It is B<not> intended to be used by the general public. This
63 documentation and what it documents can and will change without notice.
64
65 =head1 OVERVIEW
66
67 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
68 and DBM::Deep::Hash) for their use to access the actual stored values. This API
69 is the following:
70
71 =over 4
72
73 =item * new
74
75 =item * read_value
76
77 =item * get_classname
78
79 =item * make_reference
80
81 =item * key_exists
82
83 =item * delete_key
84
85 =item * write_value
86
87 =item * get_next_key
88
89 =item * setup_fh
90
91 =item * begin_work
92
93 =item * commit
94
95 =item * rollback
96
97 =item * lock_exclusive
98
99 =item * lock_shared
100
101 =item * unlock
102
103 =back
104
105 They are explained in their own sections below. These methods, in turn, may
106 provide some bounds-checking, but primarily act to instantiate objects in the
107 Engine::Sector::* hierarchy and dispatch to them.
108
109 =head1 TRANSACTIONS
110
111 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
112 to keep the amount of actual work done against the file low while stil providing
113 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
114 with only one file.
115
116 =head2 STALENESS
117
118 If another process uses a transaction slot and writes stuff to it, then
119 terminates, the data that process wrote it still within the file. In order to
120 address this, there is also a transaction staleness counter associated within
121 every write.  Each time a transaction is started, that process increments that
122 transaction's staleness counter. If, when it reads a value, the staleness
123 counters aren't identical, DBM::Deep will consider the value on disk to be stale
124 and discard it.
125
126 =head2 DURABILITY
127
128 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
129 the data will be there the next time you read from it. This should be regardless
130 of any crashes or powerdowns in between the commit and subsequent read.
131 DBM::Deep does provide that guarantee; once the commit returns, all of the data
132 has been transferred from the transaction shadow to the HEAD. The issue arises
133 with partial commits - a commit that is interrupted in some fashion. In keeping
134 with DBM::Deep's "tradition" of very light error-checking and non-existent
135 error-handling, there is no way to recover from a partial commit. (This is
136 probably a failure in Consistency as well as Durability.)
137
138 Other DBMSes use transaction logs (a separate file, generally) to achieve
139 Durability.  As DBM::Deep is a single-file, we would have to do something
140 similar to what SQLite and BDB do in terms of committing using synchonized
141 writes. To do this, we would have to use a much higher RAM footprint and some
142 serious programming that make my head hurts just to think about it.
143
144 =head1 EXTERNAL METHODS
145
146 =head2 new()
147
148 This takes a set of args. These args are described in the documentation for
149 L<DBM::Deep/new>.
150
151 =cut
152
153 sub new {
154     my $class = shift;
155     my ($args) = @_;
156
157     $args->{storage} = DBM::Deep::Storage::File->new( $args )
158         unless exists $args->{storage};
159
160     my $self = bless {
161         byte_size   => 4,
162
163         digest      => undef,
164         hash_size   => 16,  # In bytes
165         hash_chars  => 256, # Number of chars the algorithm uses per byte
166         max_buckets => 16,
167         num_txns    => 1,   # The HEAD
168         trans_id    => 0,   # Default to the HEAD
169
170         data_sector_size => 64, # Size in bytes of each data sector
171
172         entries => {}, # This is the list of entries for transactions
173         storage => undef,
174     }, $class;
175
176     # Never allow byte_size to be set directly.
177     delete $args->{byte_size};
178     if ( defined $args->{pack_size} ) {
179         if ( lc $args->{pack_size} eq 'small' ) {
180             $args->{byte_size} = 2;
181         }
182         elsif ( lc $args->{pack_size} eq 'medium' ) {
183             $args->{byte_size} = 4;
184         }
185         elsif ( lc $args->{pack_size} eq 'large' ) {
186             $args->{byte_size} = 8;
187         }
188         else {
189             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
190         }
191     }
192
193     # Grab the parameters we want to use
194     foreach my $param ( keys %$self ) {
195         next unless exists $args->{$param};
196         $self->{$param} = $args->{$param};
197     }
198
199     my %validations = (
200         max_buckets      => { floor => 16, ceil => 256 },
201         num_txns         => { floor => 1,  ceil => 255 },
202         data_sector_size => { floor => 32, ceil => 256 },
203     );
204
205     while ( my ($attr, $c) = each %validations ) {
206         if (   !defined $self->{$attr}
207             || !length $self->{$attr}
208             || $self->{$attr} =~ /\D/
209             || $self->{$attr} < $c->{floor}
210         ) {
211             $self->{$attr} = '(undef)' if !defined $self->{$attr};
212             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
213             $self->{$attr} = $c->{floor};
214         }
215         elsif ( $self->{$attr} > $c->{ceil} ) {
216             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
217             $self->{$attr} = $c->{ceil};
218         }
219     }
220
221     if ( !$self->{digest} ) {
222         require Digest::MD5;
223         $self->{digest} = \&Digest::MD5::md5;
224     }
225
226     return $self;
227 }
228
229 =head2 read_value( $obj, $key )
230
231 This takes an object that provides _base_offset() and a string. It returns the
232 value stored in the corresponding Sector::Value's data section.
233
234 =cut
235
236 sub read_value {
237     my $self = shift;
238     my ($obj, $key) = @_;
239
240     # This will be a Reference sector
241     my $sector = $self->_load_sector( $obj->_base_offset )
242         or return;
243
244     if ( $sector->staleness != $obj->_staleness ) {
245         return;
246     }
247
248     my $key_md5 = $self->_apply_digest( $key );
249
250     my $value_sector = $sector->get_data_for({
251         key_md5    => $key_md5,
252         allow_head => 1,
253     });
254
255     unless ( $value_sector ) {
256         $value_sector = DBM::Deep::Engine::Sector::Null->new({
257             engine => $self,
258             data   => undef,
259         });
260
261         $sector->write_data({
262             key_md5 => $key_md5,
263             key     => $key,
264             value   => $value_sector,
265         });
266     }
267
268     return $value_sector->data;
269 }
270
271 =head2 get_classname( $obj )
272
273 This takes an object that provides _base_offset() and returns the classname (if
274 any) associated with it.
275
276 It delegates to Sector::Reference::get_classname() for the heavy lifting.
277
278 It performs a staleness check.
279
280 =cut
281
282 sub get_classname {
283     my $self = shift;
284     my ($obj) = @_;
285
286     # This will be a Reference sector
287     my $sector = $self->_load_sector( $obj->_base_offset )
288         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
289
290     if ( $sector->staleness != $obj->_staleness ) {
291         return;
292     }
293
294     return $sector->get_classname;
295 }
296
297 =head2 make_reference( $obj, $old_key, $new_key )
298
299 This takes an object that provides _base_offset() and two strings. The
300 strings correspond to the old key and new key, respectively. This operation
301 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo} >>.
302
303 This returns nothing.
304
305 =cut
306
307 sub make_reference {
308     my $self = shift;
309     my ($obj, $old_key, $new_key) = @_;
310
311     # This will be a Reference sector
312     my $sector = $self->_load_sector( $obj->_base_offset )
313         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
314
315     if ( $sector->staleness != $obj->_staleness ) {
316         return;
317     }
318
319     my $old_md5 = $self->_apply_digest( $old_key );
320
321     my $value_sector = $sector->get_data_for({
322         key_md5    => $old_md5,
323         allow_head => 1,
324     });
325
326     unless ( $value_sector ) {
327         $value_sector = DBM::Deep::Engine::Sector::Null->new({
328             engine => $self,
329             data   => undef,
330         });
331
332         $sector->write_data({
333             key_md5 => $old_md5,
334             key     => $old_key,
335             value   => $value_sector,
336         });
337     }
338
339     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
340         $sector->write_data({
341             key     => $new_key,
342             key_md5 => $self->_apply_digest( $new_key ),
343             value   => $value_sector,
344         });
345         $value_sector->increment_refcount;
346     }
347     else {
348         $sector->write_data({
349             key     => $new_key,
350             key_md5 => $self->_apply_digest( $new_key ),
351             value   => $value_sector->clone,
352         });
353     }
354
355     return;
356 }
357
358 =head2 key_exists( $obj, $key )
359
360 This takes an object that provides _base_offset() and a string for
361 the key to be checked. This returns 1 for true and "" for false.
362
363 =cut
364
365 sub key_exists {
366     my $self = shift;
367     my ($obj, $key) = @_;
368
369     # This will be a Reference sector
370     my $sector = $self->_load_sector( $obj->_base_offset )
371         or return '';
372
373     if ( $sector->staleness != $obj->_staleness ) {
374         return '';
375     }
376
377     my $data = $sector->get_data_for({
378         key_md5    => $self->_apply_digest( $key ),
379         allow_head => 1,
380     });
381
382     # exists() returns 1 or '' for true/false.
383     return $data ? 1 : '';
384 }
385
386 =head2 delete_key( $obj, $key )
387
388 This takes an object that provides _base_offset() and a string for
389 the key to be deleted. This returns the result of the Sector::Reference
390 delete_key() method.
391
392 =cut
393
394 sub delete_key {
395     my $self = shift;
396     my ($obj, $key) = @_;
397
398     my $sector = $self->_load_sector( $obj->_base_offset )
399         or return;
400
401     if ( $sector->staleness != $obj->_staleness ) {
402         return;
403     }
404
405     return $sector->delete_key({
406         key_md5    => $self->_apply_digest( $key ),
407         allow_head => 0,
408     });
409 }
410
411 =head2 write_value( $obj, $key, $value )
412
413 This takes an object that provides _base_offset(), a string for the
414 key, and a value. This value can be anything storable within L<DBM::Deep/>.
415
416 This returns 1 upon success.
417
418 =cut
419
420 sub write_value {
421     my $self = shift;
422     my ($obj, $key, $value) = @_;
423
424     my $r = Scalar::Util::reftype( $value ) || '';
425     {
426         last if $r eq '';
427         last if $r eq 'HASH';
428         last if $r eq 'ARRAY';
429
430         DBM::Deep->_throw_error(
431             "Storage of references of type '$r' is not supported."
432         );
433     }
434
435     # This will be a Reference sector
436     my $sector = $self->_load_sector( $obj->_base_offset )
437         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
438
439     if ( $sector->staleness != $obj->_staleness ) {
440         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
441     }
442
443     my ($class, $type);
444     if ( !defined $value ) {
445         $class = 'DBM::Deep::Engine::Sector::Null';
446     }
447     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
448         my $tmpvar;
449         if ( $r eq 'ARRAY' ) {
450             $tmpvar = tied @$value;
451         } elsif ( $r eq 'HASH' ) {
452             $tmpvar = tied %$value;
453         }
454
455         if ( $tmpvar ) {
456             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
457
458             unless ( $is_dbm_deep ) {
459                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
460             }
461
462             unless ( $tmpvar->_engine->storage == $self->storage ) {
463                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
464             }
465
466             # First, verify if we're storing the same thing to this spot. If we are, then
467             # this should be a no-op. -EJS, 2008-05-19
468             my $loc = $sector->get_data_location_for({
469                 key_md5 => $self->_apply_digest( $key ),
470                 allow_head => 1,
471             });
472
473             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
474                 return 1;
475             }
476
477             #XXX Can this use $loc?
478             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
479             $sector->write_data({
480                 key     => $key,
481                 key_md5 => $self->_apply_digest( $key ),
482                 value   => $value_sector,
483             });
484             $value_sector->increment_refcount;
485
486             return 1;
487         }
488
489         $class = 'DBM::Deep::Engine::Sector::Reference';
490         $type = substr( $r, 0, 1 );
491     }
492     else {
493         if ( tied($value) ) {
494             DBM::Deep->_throw_error( "Cannot store something that is tied." );
495         }
496         $class = 'DBM::Deep::Engine::Sector::Scalar';
497     }
498
499     # Create this after loading the reference sector in case something bad happens.
500     # This way, we won't allocate value sector(s) needlessly.
501     my $value_sector = $class->new({
502         engine => $self,
503         data   => $value,
504         type   => $type,
505     });
506
507     $sector->write_data({
508         key     => $key,
509         key_md5 => $self->_apply_digest( $key ),
510         value   => $value_sector,
511     });
512
513     # This code is to make sure we write all the values in the $value to the disk
514     # and to make sure all changes to $value after the assignment are reflected
515     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
516     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
517     # copy to a temp value.
518     if ( $r eq 'ARRAY' ) {
519         my @temp = @$value;
520         tie @$value, 'DBM::Deep', {
521             base_offset => $value_sector->offset,
522             staleness   => $value_sector->staleness,
523             storage     => $self->storage,
524             engine      => $self,
525         };
526         @$value = @temp;
527         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
528     }
529     elsif ( $r eq 'HASH' ) {
530         my %temp = %$value;
531         tie %$value, 'DBM::Deep', {
532             base_offset => $value_sector->offset,
533             staleness   => $value_sector->staleness,
534             storage     => $self->storage,
535             engine      => $self,
536         };
537
538         %$value = %temp;
539         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
540     }
541
542     return 1;
543 }
544
545 =head2 get_next_key( $obj, $prev_key )
546
547 This takes an object that provides _base_offset() and an optional string
548 representing the prior key returned via a prior invocation of this method.
549
550 This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
551
552 =cut
553
554 # XXX Add staleness here
555 sub get_next_key {
556     my $self = shift;
557     my ($obj, $prev_key) = @_;
558
559     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
560     unless ( $prev_key ) {
561         $obj->{iterator} = DBM::Deep::Iterator->new({
562             base_offset => $obj->_base_offset,
563             engine      => $self,
564         });
565     }
566
567     return $obj->{iterator}->get_next_key( $obj );
568 }
569
570 =head2 setup_fh( $obj )
571
572 This takes an object that provides _base_offset(). It will do everything needed
573 in order to properly initialize all values for necessary functioning. If this is
574 called upon an already initialized object, this will also reset the inode.
575
576 This returns 1.
577
578 =cut
579
580 sub setup_fh {
581     my $self = shift;
582     my ($obj) = @_;
583
584     # We're opening the file.
585     unless ( $obj->_base_offset ) {
586         my $bytes_read = $self->_read_file_header;
587
588         # Creating a new file
589         unless ( $bytes_read ) {
590             $self->_write_file_header;
591
592             # 1) Create Array/Hash entry
593             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
594                 engine => $self,
595                 type   => $obj->_type,
596             });
597             $obj->{base_offset} = $initial_reference->offset;
598             $obj->{staleness} = $initial_reference->staleness;
599
600             $self->storage->flush;
601         }
602         # Reading from an existing file
603         else {
604             $obj->{base_offset} = $bytes_read;
605             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
606                 engine => $self,
607                 offset => $obj->_base_offset,
608             });
609             unless ( $initial_reference ) {
610                 DBM::Deep->_throw_error("Corrupted file, no master index record");
611             }
612
613             unless ($obj->_type eq $initial_reference->type) {
614                 DBM::Deep->_throw_error("File type mismatch");
615             }
616
617             $obj->{staleness} = $initial_reference->staleness;
618         }
619     }
620
621     $self->storage->set_inode;
622
623     return 1;
624 }
625
626 =head2 begin_work( $obj )
627
628 This takes an object that provides _base_offset(). It will set up all necessary
629 bookkeeping in order to run all work within a transaction.
630
631 If $obj is already within a transaction, an error wiill be thrown. If there are
632 no more available transactions, an error will be thrown.
633
634 This returns undef.
635
636 =cut
637
638 sub begin_work {
639     my $self = shift;
640     my ($obj) = @_;
641
642     if ( $self->trans_id ) {
643         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
644     }
645
646     my @slots = $self->read_txn_slots;
647     my $found;
648     for my $i ( 0 .. $#slots ) {
649         next if $slots[$i];
650
651         $slots[$i] = 1;
652         $self->set_trans_id( $i + 1 );
653         $found = 1;
654         last;
655     }
656     unless ( $found ) {
657         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
658     }
659     $self->write_txn_slots( @slots );
660
661     if ( !$self->trans_id ) {
662         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
663     }
664
665     return;
666 }
667
668 =head2 rollback( $obj )
669
670 This takes an object that provides _base_offset(). It will revert all
671 actions taken within the running transaction.
672
673 If $obj is not within a transaction, an error will be thrown.
674
675 This returns 1.
676
677 =cut
678
679 sub rollback {
680     my $self = shift;
681     my ($obj) = @_;
682
683     if ( !$self->trans_id ) {
684         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
685     }
686
687     # Each entry is the file location for a bucket that has a modification for
688     # this transaction. The entries need to be expunged.
689     foreach my $entry (@{ $self->get_entries } ) {
690         # Remove the entry here
691         my $read_loc = $entry
692           + $self->hash_size
693           + $self->byte_size
694           + $self->byte_size
695           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
696
697         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
698         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
699         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
700
701         if ( $data_loc > 1 ) {
702             $self->_load_sector( $data_loc )->free;
703         }
704     }
705
706     $self->clear_entries;
707
708     my @slots = $self->read_txn_slots;
709     $slots[$self->trans_id-1] = 0;
710     $self->write_txn_slots( @slots );
711     $self->inc_txn_staleness_counter( $self->trans_id );
712     $self->set_trans_id( 0 );
713
714     return 1;
715 }
716
717 =head2 commit( $obj )
718
719 This takes an object that provides _base_offset(). It will apply all
720 actions taken within the transaction to the HEAD.
721
722 If $obj is not within a transaction, an error will be thrown.
723
724 This returns 1.
725
726 =cut
727
728 sub commit {
729     my $self = shift;
730     my ($obj) = @_;
731
732     if ( !$self->trans_id ) {
733         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
734     }
735
736     foreach my $entry (@{ $self->get_entries } ) {
737         # Overwrite the entry in head with the entry in trans_id
738         my $base = $entry
739           + $self->hash_size
740           + $self->byte_size;
741
742         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
743         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
744
745         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
746         my $trans_loc = $self->storage->read_at(
747             $spot, $self->byte_size,
748         );
749
750         $self->storage->print_at( $base, $trans_loc );
751         $self->storage->print_at(
752             $spot,
753             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
754         );
755
756         if ( $head_loc > 1 ) {
757             $self->_load_sector( $head_loc )->free;
758         }
759     }
760
761     $self->clear_entries;
762
763     my @slots = $self->read_txn_slots;
764     $slots[$self->trans_id-1] = 0;
765     $self->write_txn_slots( @slots );
766     $self->inc_txn_staleness_counter( $self->trans_id );
767     $self->set_trans_id( 0 );
768
769     return 1;
770 }
771
772 =head2 lock_exclusive()
773
774 This takes an object that provides _base_offset(). It will guarantee that
775 the storage has taken precautions to be safe for a write.
776
777 This returns nothing.
778
779 =cut
780
781 sub lock_exclusive {
782     my $self = shift;
783     my ($obj) = @_;
784     return $self->storage->lock_exclusive( $obj );
785 }
786
787 =head2 lock_shared()
788
789 This takes an object that provides _base_offset(). It will guarantee that
790 the storage has taken precautions to be safe for a read.
791
792 This returns nothing.
793
794 =cut
795
796 sub lock_shared {
797     my $self = shift;
798     my ($obj) = @_;
799     return $self->storage->lock_shared( $obj );
800 }
801
802 =head2 unlock()
803
804 This takes an object that provides _base_offset(). It will guarantee that
805 the storage has released all locks taken.
806
807 This returns nothing.
808
809 =cut
810
811 sub unlock {
812     my $self = shift;
813     my ($obj) = @_;
814
815     my $rv = $self->storage->unlock( $obj );
816
817     $self->flush if $rv;
818
819     return $rv;
820 }
821
822 =head1 INTERNAL METHODS
823
824 The following methods are internal-use-only to DBM::Deep::Engine.
825
826 =cut
827
828 =head2 read_txn_slots()
829
830 This takes no arguments.
831
832 This will return an array with a 1 or 0 in each slot. Each spot represents one
833 available transaction. If the slot is 1, that transaction is taken. If it is 0,
834 the transaction is available.
835
836 =cut
837
838 sub read_txn_slots {
839     my $self = shift;
840     my $bl = $self->txn_bitfield_len;
841     my $num_bits = $bl * 8;
842     return split '', unpack( 'b'.$num_bits,
843         $self->storage->read_at(
844             $self->trans_loc, $bl,
845         )
846     );
847 }
848
849 =head2 write_txn_slots( @slots )
850
851 This takes an array of 1's and 0's. This array represents the transaction slots
852 returned by L</read_txn_slots()>. In other words, the following is true:
853
854   @x = read_txn_slots( write_txn_slots( @x ) );
855
856 (With the obviously missing object referents added back in.)
857
858 =cut
859
860 sub write_txn_slots {
861     my $self = shift;
862     my $num_bits = $self->txn_bitfield_len * 8;
863     $self->storage->print_at( $self->trans_loc,
864         pack( 'b'.$num_bits, join('', @_) ),
865     );
866 }
867
868 =head2 get_running_txn_ids()
869
870 This takes no arguments.
871
872 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
873
874 =cut
875
876 sub get_running_txn_ids {
877     my $self = shift;
878     my @transactions = $self->read_txn_slots;
879     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
880 }
881
882 =head2 get_txn_staleness_counter( $trans_id )
883
884 This will return the staleness counter for the given transaction ID. Please see
885 L</TRANSACTION STALENESS> for more information.
886
887 =cut
888
889 sub get_txn_staleness_counter {
890     my $self = shift;
891     my ($trans_id) = @_;
892
893     # Hardcode staleness of 0 for the HEAD
894     return 0 unless $trans_id;
895
896     return unpack( $StP{$STALE_SIZE},
897         $self->storage->read_at(
898             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
899             $STALE_SIZE,
900         )
901     );
902 }
903
904 =head2 inc_txn_staleness_counter( $trans_id )
905
906 This will increment the staleness counter for the given transaction ID. Please see
907 L</TRANSACTION STALENESS> for more information.
908
909 =cut
910
911 sub inc_txn_staleness_counter {
912     my $self = shift;
913     my ($trans_id) = @_;
914
915     # Hardcode staleness of 0 for the HEAD
916     return 0 unless $trans_id;
917
918     $self->storage->print_at(
919         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
920         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
921     );
922 }
923
924 =head2 get_entries()
925
926 This takes no arguments.
927
928 This returns a list of all the sectors that have been modified by this transaction.
929
930 =cut
931
932 sub get_entries {
933     my $self = shift;
934     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
935 }
936
937 =head2 add_entry( $trans_id, $location )
938
939 This takes a transaction ID and a file location and marks the sector at that
940 location as having been modified by the transaction identified by $trans_id.
941
942 This returns nothing.
943
944 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
945 C<< $trans_id != $self->trans_id >> for this method.
946
947 =cut
948
949 sub add_entry {
950     my $self = shift;
951     my ($trans_id, $loc) = @_;
952
953     $self->{entries}{$trans_id} ||= {};
954     $self->{entries}{$trans_id}{$loc} = undef;
955 }
956
957 =head2 reindex_entry( $old_loc, $new_loc )
958
959 This takes two locations (old and new, respectively). If a location that has
960 been modified by this transaction is subsequently reindexed due to a bucketlist
961 overflowing, then the entries hash needs to be made aware of this change.
962
963 This returns nothing.
964
965 =cut
966
967 sub reindex_entry {
968     my $self = shift;
969     my ($old_loc, $new_loc) = @_;
970
971     TRANS:
972     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
973         if ( exists $locs->{$old_loc} ) {
974             delete $locs->{$old_loc};
975             $locs->{$new_loc} = undef;
976             next TRANS;
977         }
978     }
979 }
980
981 =head2 clear_entries()
982
983 This takes no arguments. It will clear the entries list for the running
984 transaction.
985
986 This returns nothing.
987
988 =cut
989
990 sub clear_entries {
991     my $self = shift;
992     delete $self->{entries}{$self->trans_id};
993 }
994
995 =head2 _write_file_header()
996
997 This writes the file header for a new file. This will write the various settings
998 that set how the file is interpreted.
999
1000 =head2 _read_file_header()
1001
1002 This reads the file header from an existing file. This will read the various
1003 settings that set how the file is interpreted.
1004
1005 =cut
1006
1007 {
1008     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
1009     my $this_file_version = 3;
1010
1011     sub _write_file_header {
1012         my $self = shift;
1013
1014         my $nt = $self->num_txns;
1015         my $bl = $self->txn_bitfield_len;
1016
1017         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
1018
1019         my $loc = $self->storage->request_space( $header_fixed + $header_var );
1020
1021         $self->storage->print_at( $loc,
1022             SIG_FILE,
1023             SIG_HEADER,
1024             pack('N', $this_file_version), # At this point, we're at 9 bytes
1025             pack('N', $header_var),        # header size
1026             # --- Above is $header_fixed. Below is $header_var
1027             pack('C', $self->byte_size),
1028
1029             # These shenanigans are to allow a 256 within a C
1030             pack('C', $self->max_buckets - 1),
1031             pack('C', $self->data_sector_size - 1),
1032
1033             pack('C', $nt),
1034             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
1035             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
1036             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
1037             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
1038             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
1039         );
1040
1041         #XXX Set these less fragilely
1042         $self->set_trans_loc( $header_fixed + 4 );
1043         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1044
1045         return;
1046     }
1047
1048     sub _read_file_header {
1049         my $self = shift;
1050
1051         my $buffer = $self->storage->read_at( 0, $header_fixed );
1052         return unless length($buffer);
1053
1054         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1055             'A4 A N N', $buffer
1056         );
1057
1058         unless ( $file_signature eq SIG_FILE ) {
1059             $self->storage->close;
1060             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1061         }
1062
1063         unless ( $sig_header eq SIG_HEADER ) {
1064             $self->storage->close;
1065             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1066         }
1067
1068         unless ( $file_version == $this_file_version ) {
1069             $self->storage->close;
1070             DBM::Deep->_throw_error(
1071                 "Wrong file version found - " .  $file_version .
1072                 " - expected " . $this_file_version
1073             );
1074         }
1075
1076         my $buffer2 = $self->storage->read_at( undef, $size );
1077         my @values = unpack( 'C C C C', $buffer2 );
1078
1079         if ( @values != 4 || grep { !defined } @values ) {
1080             $self->storage->close;
1081             DBM::Deep->_throw_error("Corrupted file - bad header");
1082         }
1083
1084         #XXX Add warnings if values weren't set right
1085         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1086
1087         # These shenangians are to allow a 256 within a C
1088         $self->{max_buckets} += 1;
1089         $self->{data_sector_size} += 1;
1090
1091         my $bl = $self->txn_bitfield_len;
1092
1093         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1094         unless ( $size == $header_var ) {
1095             $self->storage->close;
1096             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1097         }
1098
1099         $self->set_trans_loc( $header_fixed + scalar(@values) );
1100         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1101
1102         return length($buffer) + length($buffer2);
1103     }
1104 }
1105
1106 =head2 _load_sector( $offset )
1107
1108 This will instantiate and return the sector object that represents the data found
1109 at $offset.
1110
1111 =cut
1112
1113 sub _load_sector {
1114     my $self = shift;
1115     my ($offset) = @_;
1116
1117     # Add a catch for offset of 0 or 1
1118     return if !$offset || $offset <= 1;
1119
1120     my $type = $self->storage->read_at( $offset, 1 );
1121     return if $type eq chr(0);
1122
1123     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1124         return DBM::Deep::Engine::Sector::Reference->new({
1125             engine => $self,
1126             type   => $type,
1127             offset => $offset,
1128         });
1129     }
1130     # XXX Don't we need key_md5 here?
1131     elsif ( $type eq $self->SIG_BLIST ) {
1132         return DBM::Deep::Engine::Sector::BucketList->new({
1133             engine => $self,
1134             type   => $type,
1135             offset => $offset,
1136         });
1137     }
1138     elsif ( $type eq $self->SIG_INDEX ) {
1139         return DBM::Deep::Engine::Sector::Index->new({
1140             engine => $self,
1141             type   => $type,
1142             offset => $offset,
1143         });
1144     }
1145     elsif ( $type eq $self->SIG_NULL ) {
1146         return DBM::Deep::Engine::Sector::Null->new({
1147             engine => $self,
1148             type   => $type,
1149             offset => $offset,
1150         });
1151     }
1152     elsif ( $type eq $self->SIG_DATA ) {
1153         return DBM::Deep::Engine::Sector::Scalar->new({
1154             engine => $self,
1155             type   => $type,
1156             offset => $offset,
1157         });
1158     }
1159     # This was deleted from under us, so just return and let the caller figure it out.
1160     elsif ( $type eq $self->SIG_FREE ) {
1161         return;
1162     }
1163
1164     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1165 }
1166
1167 =head2 _apply_digest( @stuff )
1168
1169 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1170 passed in and return the result.
1171
1172 =cut
1173
1174 sub _apply_digest {
1175     my $self = shift;
1176     return $self->{digest}->(@_);
1177 }
1178
1179 =head2 _add_free_blist_sector( $offset, $size )
1180
1181 =head2 _add_free_data_sector( $offset, $size )
1182
1183 =head2 _add_free_index_sector( $offset, $size )
1184
1185 These methods are all wrappers around _add_free_sector(), providing the proper
1186 chain offset ($multiple) for the sector type.
1187
1188 =cut
1189
1190 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1191 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1192 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1193
1194 =head2 _add_free_sector( $multiple, $offset, $size )
1195
1196 _add_free_sector() takes the offset into the chains location, the offset of the
1197 sector, and the size of that sector. It will mark the sector as a free sector
1198 and put it into the list of sectors that are free of this type for use later.
1199
1200 This returns nothing.
1201
1202 B<NOTE>: $size is unused?
1203
1204 =cut
1205
1206 sub _add_free_sector {
1207     my $self = shift;
1208     my ($multiple, $offset, $size) = @_;
1209
1210     my $chains_offset = $multiple * $self->byte_size;
1211
1212     my $storage = $self->storage;
1213
1214     # Increment staleness.
1215     # XXX Can this increment+modulo be done by "&= 0x1" ?
1216     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
1217     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1218     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1219
1220     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1221
1222     $storage->print_at( $self->chains_loc + $chains_offset,
1223         pack( $StP{$self->byte_size}, $offset ),
1224     );
1225
1226     # Record the old head in the new sector after the signature and staleness counter
1227     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
1228 }
1229
1230 =head2 _request_blist_sector( $size )
1231
1232 =head2 _request_data_sector( $size )
1233
1234 =head2 _request_index_sector( $size )
1235
1236 These methods are all wrappers around _request_sector(), providing the proper
1237 chain offset ($multiple) for the sector type.
1238
1239 =cut
1240
1241 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1242 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1243 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1244
1245 =head2 _request_sector( $multiple $size )
1246
1247 This takes the offset into the chains location and the size of that sector.
1248
1249 This returns the object with the sector. If there is an available free sector of
1250 that type, then it will be reused. If there isn't one, then a new one will be
1251 allocated.
1252
1253 =cut
1254
1255 sub _request_sector {
1256     my $self = shift;
1257     my ($multiple, $size) = @_;
1258
1259     my $chains_offset = $multiple * $self->byte_size;
1260
1261     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1262     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1263
1264     # We don't have any free sectors of the right size, so allocate a new one.
1265     unless ( $loc ) {
1266         my $offset = $self->storage->request_space( $size );
1267
1268         # Zero out the new sector. This also guarantees correct increases
1269         # in the filesize.
1270         $self->storage->print_at( $offset, chr(0) x $size );
1271
1272         return $offset;
1273     }
1274
1275     # Read the new head after the signature and the staleness counter
1276     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
1277     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1278     $self->storage->print_at(
1279         $loc + SIG_SIZE + $STALE_SIZE,
1280         pack( $StP{$self->byte_size}, 0 ),
1281     );
1282
1283     return $loc;
1284 }
1285
1286 =head2 flush()
1287
1288 This takes no arguments. It will do everything necessary to flush all things to
1289 disk. This is usually called during unlock() and setup_fh().
1290
1291 This returns nothing.
1292
1293 =cut
1294
1295 sub flush {
1296     my $self = shift;
1297
1298     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1299     # -RobK, 2008-06-26
1300     $self->storage->flush;
1301 }
1302
1303 =head2 ACCESSORS
1304
1305 The following are readonly attributes.
1306
1307 =over 4
1308
1309 =item * storage
1310
1311 =item * byte_size
1312
1313 =item * hash_size
1314
1315 =item * hash_chars
1316
1317 =item * num_txns
1318
1319 =item * max_buckets
1320
1321 =item * blank_md5
1322
1323 =item * data_sector_size
1324
1325 =item * txn_bitfield_len
1326
1327 =back
1328
1329 =cut
1330
1331 sub storage     { $_[0]{storage} }
1332 sub byte_size   { $_[0]{byte_size} }
1333 sub hash_size   { $_[0]{hash_size} }
1334 sub hash_chars  { $_[0]{hash_chars} }
1335 sub num_txns    { $_[0]{num_txns} }
1336 sub max_buckets { $_[0]{max_buckets} }
1337 sub blank_md5   { chr(0) x $_[0]->hash_size }
1338 sub data_sector_size { $_[0]{data_sector_size} }
1339
1340 # This is a calculated value
1341 sub txn_bitfield_len {
1342     my $self = shift;
1343     unless ( exists $self->{txn_bitfield_len} ) {
1344         my $temp = ($self->num_txns) / 8;
1345         if ( $temp > int( $temp ) ) {
1346             $temp = int( $temp ) + 1;
1347         }
1348         $self->{txn_bitfield_len} = $temp;
1349     }
1350     return $self->{txn_bitfield_len};
1351 }
1352
1353 =pod
1354
1355 The following are read/write attributes. 
1356
1357 =over 4
1358
1359 =item * trans_id / set_trans_id( $new_id )
1360
1361 =item * trans_loc / set_trans_loc( $new_loc )
1362
1363 =item * chains_loc / set_chains_loc( $new_loc )
1364
1365 =back
1366
1367 =cut
1368
1369 sub trans_id     { $_[0]{trans_id} }
1370 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1371
1372 sub trans_loc     { $_[0]{trans_loc} }
1373 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1374
1375 sub chains_loc     { $_[0]{chains_loc} }
1376 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1377
1378 sub cache       { $_[0]{cache} ||= {} }
1379 sub clear_cache { %{$_[0]->cache} = () }
1380
1381 =head2 _dump_file()
1382
1383 This method takes no arguments. It's used to print out a textual representation
1384 of the DBM::Deep DB file. It assumes the file is not-corrupted.
1385
1386 =cut
1387
1388 sub _dump_file {
1389     my $self = shift;
1390
1391     # Read the header
1392     my $spot = $self->_read_file_header();
1393
1394     my %types = (
1395         0 => 'B',
1396         1 => 'D',
1397         2 => 'I',
1398     );
1399
1400     my %sizes = (
1401         'D' => $self->data_sector_size,
1402         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1403         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1404     );
1405
1406     my $return = "";
1407
1408     # Header values
1409     $return .= "NumTxns: " . $self->num_txns . $/;
1410
1411     # Read the free sector chains
1412     my %sectors;
1413     foreach my $multiple ( 0 .. 2 ) {
1414         $return .= "Chains($types{$multiple}):";
1415         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1416         while ( 1 ) {
1417             my $loc = unpack(
1418                 $StP{$self->byte_size},
1419                 $self->storage->read_at( $old_loc, $self->byte_size ),
1420             );
1421
1422             # We're now out of free sectors of this kind.
1423             unless ( $loc ) {
1424                 last;
1425             }
1426
1427             $sectors{ $types{$multiple} }{ $loc } = undef;
1428             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
1429             $return .= " $loc";
1430         }
1431         $return .= $/;
1432     }
1433
1434     SECTOR:
1435     while ( $spot < $self->storage->{end} ) {
1436         # Read each sector in order.
1437         my $sector = $self->_load_sector( $spot );
1438         if ( !$sector ) {
1439             # Find it in the free-sectors that were found already
1440             foreach my $type ( keys %sectors ) {
1441                 if ( exists $sectors{$type}{$spot} ) {
1442                     my $size = $sizes{$type};
1443                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1444                     $spot += $size;
1445                     next SECTOR;
1446                 }
1447             }
1448
1449             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1450         }
1451         else {
1452             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1453             if ( $sector->type eq 'D' ) {
1454                 $return .= ' ' . $sector->data;
1455             }
1456             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1457                 $return .= ' REF: ' . $sector->get_refcount;
1458             }
1459             elsif ( $sector->type eq 'B' ) {
1460                 foreach my $bucket ( $sector->chopped_up ) {
1461                     $return .= "\n    ";
1462                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1463                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1464                     );
1465                     my $l = unpack( $StP{$self->byte_size},
1466                         substr( $bucket->[-1],
1467                             $self->hash_size + $self->byte_size,
1468                             $self->byte_size,
1469                         ),
1470                     );
1471                     $return .= sprintf " %08d", $l;
1472                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1473                         my $l = unpack( $StP{$self->byte_size},
1474                             substr( $bucket->[-1],
1475                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1476                                 $self->byte_size,
1477                             ),
1478                         );
1479                         $return .= sprintf " %08d", $l;
1480                     }
1481                 }
1482             }
1483             $return .= $/;
1484
1485             $spot += $sector->size;
1486         }
1487     }
1488
1489     return $return;
1490 }
1491
1492 1;
1493 __END__