Speed up clear()
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine.pm
1 package DBM::Deep::Engine;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7 no warnings 'recursion';
8
9 # Never import symbols into our namespace. We are a class, not a library.
10 # -RobK, 2008-05-27
11 use Scalar::Util ();
12
13 #use Data::Dumper ();
14
15 # File-wide notes:
16 # * Every method in here assumes that the storage has been appropriately
17 #   safeguarded. This can be anything from flock() to some sort of manual
18 #   mutex. But, it's the caller's responsability to make sure that this has
19 #   been done.
20
21 # Setup file and tag signatures.  These should never change.
22 sub SIG_FILE     () { 'DPDB' }
23 sub SIG_HEADER   () { 'h'    }
24 sub SIG_HASH     () { 'H'    }
25 sub SIG_ARRAY    () { 'A'    }
26 sub SIG_NULL     () { 'N'    }
27 sub SIG_DATA     () { 'D'    }
28 sub SIG_INDEX    () { 'I'    }
29 sub SIG_BLIST    () { 'B'    }
30 sub SIG_FREE     () { 'F'    }
31 sub SIG_SIZE     () {  1     }
32
33 use DBM::Deep::Iterator ();
34 use DBM::Deep::Engine::Sector::Data ();
35 use DBM::Deep::Engine::Sector::BucketList ();
36 use DBM::Deep::Engine::Sector::Index ();
37 use DBM::Deep::Engine::Sector::Null ();
38 use DBM::Deep::Engine::Sector::Reference ();
39 use DBM::Deep::Engine::Sector::Scalar ();
40 use DBM::Deep::Null ();
41
42 my $STALE_SIZE = 2;
43
44 # Please refer to the pack() documentation for further information
45 my %StP = (
46     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
47     2 => 'n', # Unsigned short in "network" (big-endian) order
48     4 => 'N', # Unsigned long in "network" (big-endian) order
49     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
50 );
51
52 =head1 NAME
53
54 DBM::Deep::Engine
55
56 =head1 PURPOSE
57
58 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
59 mapping between the L<DBM::Deep/> objects and the storage medium.
60
61 The purpose of this documentation is to provide low-level documentation for
62 developers. It is B<not> intended to be used by the general public. This
63 documentation and what it documents can and will change without notice.
64
65 =head1 OVERVIEW
66
67 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
68 and DBM::Deep::Hash) for their use to access the actual stored values. This API
69 is the following:
70
71 =over 4
72
73 =item * new
74
75 =item * read_value
76
77 =item * get_classname
78
79 =item * make_reference
80
81 =item * key_exists
82
83 =item * delete_key
84
85 =item * write_value
86
87 =item * get_next_key
88
89 =item * clear
90
91 =item * setup_fh
92
93 =item * begin_work
94
95 =item * commit
96
97 =item * rollback
98
99 =item * lock_exclusive
100
101 =item * lock_shared
102
103 =item * unlock
104
105 =back
106
107 They are explained in their own sections below. These methods, in turn, may
108 provide some bounds-checking, but primarily act to instantiate objects in the
109 Engine::Sector::* hierarchy and dispatch to them.
110
111 =head1 TRANSACTIONS
112
113 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
114 to keep the amount of actual work done against the file low while stil providing
115 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
116 with only one file.
117
118 =head2 STALENESS
119
120 If another process uses a transaction slot and writes stuff to it, then terminates,
121 the data that process wrote it still within the file. In order to address this,
122 there is also a transaction staleness counter associated within every write.
123 Each time a transaction is started, that process increments that transaction's
124 staleness counter. If, when it reads a value, the staleness counters aren't
125 identical, DBM::Deep will consider the value on disk to be stale and discard it.
126
127 =head2 DURABILITY
128
129 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
130 the data will be there the next time you read from it. This should be regardless
131 of any crashes or powerdowns in between the commit and subsequent read. DBM::Deep
132 does provide that guarantee; once the commit returns, all of the data has been
133 transferred from the transaction shadow to the HEAD. The issue arises with partial
134 commits - a commit that is interrupted in some fashion. In keeping with DBM::Deep's
135 "tradition" of very light error-checking and non-existent error-handling, there is
136 no way to recover from a partial commit. (This is probably a failure in Consistency
137 as well as Durability.)
138
139 Other DBMSes use transaction logs (a separate file, generally) to achieve Durability.
140 As DBM::Deep is a single-file, we would have to do something similar to what SQLite
141 and BDB do in terms of committing using synchonized writes. To do this, we would have
142 to use a much higher RAM footprint and some serious programming that make my head
143 hurts just to think about it.
144
145 =head1 EXTERNAL METHODS
146
147 =head2 new()
148
149 This takes a set of args. These args are described in the documentation for
150 L<DBM::Deep/new>.
151
152 =cut
153
154 sub new {
155     my $class = shift;
156     my ($args) = @_;
157
158     $args->{storage} = DBM::Deep::File->new( $args )
159         unless exists $args->{storage};
160
161     my $self = bless {
162         byte_size   => 4,
163
164         digest      => undef,
165         hash_size   => 16,  # In bytes
166         hash_chars  => 256, # Number of chars the algorithm uses per byte
167         max_buckets => 16,
168         num_txns    => 1,   # The HEAD
169         trans_id    => 0,   # Default to the HEAD
170
171         data_sector_size => 64, # Size in bytes of each data sector
172
173         entries => {}, # This is the list of entries for transactions
174         storage => undef,
175     }, $class;
176
177     # Never allow byte_size to be set directly.
178     delete $args->{byte_size};
179     if ( defined $args->{pack_size} ) {
180         if ( lc $args->{pack_size} eq 'small' ) {
181             $args->{byte_size} = 2;
182         }
183         elsif ( lc $args->{pack_size} eq 'medium' ) {
184             $args->{byte_size} = 4;
185         }
186         elsif ( lc $args->{pack_size} eq 'large' ) {
187             $args->{byte_size} = 8;
188         }
189         else {
190             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
191         }
192     }
193
194     # Grab the parameters we want to use
195     foreach my $param ( keys %$self ) {
196         next unless exists $args->{$param};
197         $self->{$param} = $args->{$param};
198     }
199
200     my %validations = (
201         max_buckets      => { floor => 16, ceil => 256 },
202         num_txns         => { floor => 1,  ceil => 255 },
203         data_sector_size => { floor => 32, ceil => 256 },
204     );
205
206     while ( my ($attr, $c) = each %validations ) {
207         if (   !defined $self->{$attr}
208             || !length $self->{$attr}
209             || $self->{$attr} =~ /\D/
210             || $self->{$attr} < $c->{floor}
211         ) {
212             $self->{$attr} = '(undef)' if !defined $self->{$attr};
213             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
214             $self->{$attr} = $c->{floor};
215         }
216         elsif ( $self->{$attr} > $c->{ceil} ) {
217             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
218             $self->{$attr} = $c->{ceil};
219         }
220     }
221
222     if ( !$self->{digest} ) {
223         require Digest::MD5;
224         $self->{digest} = \&Digest::MD5::md5;
225     }
226
227     return $self;
228 }
229
230 =head2 read_value( $obj, $key )
231
232 This takes an object that provides _base_offset() and a string. It returns the
233 value stored in the corresponding Sector::Value's data section.
234
235 =cut
236
237 sub read_value {
238     my $self = shift;
239     my ($obj, $key) = @_;
240
241     # This will be a Reference sector
242     my $sector = $self->_load_sector( $obj->_base_offset )
243         or return;
244
245     if ( $sector->staleness != $obj->_staleness ) {
246         return;
247     }
248
249     my $key_md5 = $self->_apply_digest( $key );
250
251     my $value_sector = $sector->get_data_for({
252         key_md5    => $key_md5,
253         allow_head => 1,
254     });
255
256     unless ( $value_sector ) {
257         $value_sector = DBM::Deep::Engine::Sector::Null->new({
258             engine => $self,
259             data   => undef,
260         });
261
262         $sector->write_data({
263             key_md5 => $key_md5,
264             key     => $key,
265             value   => $value_sector,
266         });
267     }
268
269     return $value_sector->data;
270 }
271
272 =head2 get_classname( $obj )
273
274 This takes an object that provides _base_offset() and returns the classname (if any)
275 associated with it.
276
277 It delegates to Sector::Reference::get_classname() for the heavy lifting.
278
279 It performs a staleness check.
280
281 =cut
282
283 sub get_classname {
284     my $self = shift;
285     my ($obj) = @_;
286
287     # This will be a Reference sector
288     my $sector = $self->_load_sector( $obj->_base_offset )
289         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
290
291     if ( $sector->staleness != $obj->_staleness ) {
292         return;
293     }
294
295     return $sector->get_classname;
296 }
297
298 =head2 make_reference( $obj, $old_key, $new_key )
299
300 This takes an object that provides _base_offset() and two strings. The
301 strings correspond to the old key and new key, respectively. This operation
302 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo}; >>.
303
304 This returns nothing.
305
306 =cut
307
308 sub make_reference {
309     my $self = shift;
310     my ($obj, $old_key, $new_key) = @_;
311
312     # This will be a Reference sector
313     my $sector = $self->_load_sector( $obj->_base_offset )
314         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
315
316     if ( $sector->staleness != $obj->_staleness ) {
317         return;
318     }
319
320     my $old_md5 = $self->_apply_digest( $old_key );
321
322     my $value_sector = $sector->get_data_for({
323         key_md5    => $old_md5,
324         allow_head => 1,
325     });
326
327     unless ( $value_sector ) {
328         $value_sector = DBM::Deep::Engine::Sector::Null->new({
329             engine => $self,
330             data   => undef,
331         });
332
333         $sector->write_data({
334             key_md5 => $old_md5,
335             key     => $old_key,
336             value   => $value_sector,
337         });
338     }
339
340     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
341         $sector->write_data({
342             key     => $new_key,
343             key_md5 => $self->_apply_digest( $new_key ),
344             value   => $value_sector,
345         });
346         $value_sector->increment_refcount;
347     }
348     else {
349         $sector->write_data({
350             key     => $new_key,
351             key_md5 => $self->_apply_digest( $new_key ),
352             value   => $value_sector->clone,
353         });
354     }
355
356     return;
357 }
358
359 =head2 key_exists( $obj, $key )
360
361 This takes an object that provides _base_offset() and a string for
362 the key to be checked. This returns 1 for true and "" for false.
363
364 =cut
365
366 sub key_exists {
367     my $self = shift;
368     my ($obj, $key) = @_;
369
370     # This will be a Reference sector
371     my $sector = $self->_load_sector( $obj->_base_offset )
372         or return '';
373
374     if ( $sector->staleness != $obj->_staleness ) {
375         return '';
376     }
377
378     my $data = $sector->get_data_for({
379         key_md5    => $self->_apply_digest( $key ),
380         allow_head => 1,
381     });
382
383     # exists() returns 1 or '' for true/false.
384     return $data ? 1 : '';
385 }
386
387 =head2 delete_key( $obj, $key )
388
389 This takes an object that provides _base_offset() and a string for
390 the key to be deleted. This returns the result of the Sector::Reference
391 delete_key() method.
392
393 =cut
394
395 sub delete_key {
396     my $self = shift;
397     my ($obj, $key) = @_;
398
399     my $sector = $self->_load_sector( $obj->_base_offset )
400         or return;
401
402     if ( $sector->staleness != $obj->_staleness ) {
403         return;
404     }
405
406     return $sector->delete_key({
407         key_md5    => $self->_apply_digest( $key ),
408         allow_head => 0,
409     });
410 }
411
412 =head2 write_value( $obj, $key, $value )
413
414 This takes an object that provides _base_offset(), a string for the
415 key, and a value. This value can be anything storable within L<DBM::Deep/>.
416
417 This returns 1 upon success.
418
419 =cut
420
421 sub write_value {
422     my $self = shift;
423     my ($obj, $key, $value) = @_;
424
425     my $r = Scalar::Util::reftype( $value ) || '';
426     {
427         last if $r eq '';
428         last if $r eq 'HASH';
429         last if $r eq 'ARRAY';
430
431         DBM::Deep->_throw_error(
432             "Storage of references of type '$r' is not supported."
433         );
434     }
435
436     # This will be a Reference sector
437     my $sector = $self->_load_sector( $obj->_base_offset )
438         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
439
440     if ( $sector->staleness != $obj->_staleness ) {
441         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
442     }
443
444     my ($class, $type);
445     if ( !defined $value ) {
446         $class = 'DBM::Deep::Engine::Sector::Null';
447     }
448     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
449         my $tmpvar;
450         if ( $r eq 'ARRAY' ) {
451             $tmpvar = tied @$value;
452         } elsif ( $r eq 'HASH' ) {
453             $tmpvar = tied %$value;
454         }
455
456         if ( $tmpvar ) {
457             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
458
459             unless ( $is_dbm_deep ) {
460                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
461             }
462
463             unless ( $tmpvar->_engine->storage == $self->storage ) {
464                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
465             }
466
467             # First, verify if we're storing the same thing to this spot. If we are, then
468             # this should be a no-op. -EJS, 2008-05-19
469             my $loc = $sector->get_data_location_for({
470                 key_md5 => $self->_apply_digest( $key ),
471                 allow_head => 1,
472             });
473
474             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
475                 return 1;
476             }
477
478             #XXX Can this use $loc?
479             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
480             $sector->write_data({
481                 key     => $key,
482                 key_md5 => $self->_apply_digest( $key ),
483                 value   => $value_sector,
484             });
485             $value_sector->increment_refcount;
486
487             return 1;
488         }
489
490         $class = 'DBM::Deep::Engine::Sector::Reference';
491         $type = substr( $r, 0, 1 );
492     }
493     else {
494         if ( tied($value) ) {
495             DBM::Deep->_throw_error( "Cannot store something that is tied." );
496         }
497         $class = 'DBM::Deep::Engine::Sector::Scalar';
498     }
499
500     # Create this after loading the reference sector in case something bad happens.
501     # This way, we won't allocate value sector(s) needlessly.
502     my $value_sector = $class->new({
503         engine => $self,
504         data   => $value,
505         type   => $type,
506     });
507
508     $sector->write_data({
509         key     => $key,
510         key_md5 => $self->_apply_digest( $key ),
511         value   => $value_sector,
512     });
513
514     # This code is to make sure we write all the values in the $value to the disk
515     # and to make sure all changes to $value after the assignment are reflected
516     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
517     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
518     # copy to a temp value.
519     if ( $r eq 'ARRAY' ) {
520         my @temp = @$value;
521         tie @$value, 'DBM::Deep', {
522             base_offset => $value_sector->offset,
523             staleness   => $value_sector->staleness,
524             storage     => $self->storage,
525             engine      => $self,
526         };
527         @$value = @temp;
528         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
529     }
530     elsif ( $r eq 'HASH' ) {
531         my %temp = %$value;
532         tie %$value, 'DBM::Deep', {
533             base_offset => $value_sector->offset,
534             staleness   => $value_sector->staleness,
535             storage     => $self->storage,
536             engine      => $self,
537         };
538
539         %$value = %temp;
540         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
541     }
542
543     return 1;
544 }
545
546 =head2 get_next_key( $obj, $prev_key )
547
548 This takes an object that provides _base_offset() and an optional string
549 representing the prior key returned via a prior invocation of this method.
550
551 This method delegates to C<< DBM::Deep::Iterator->get_next_key() >>.
552
553 =cut
554
555 # XXX Add staleness here
556 sub get_next_key {
557     my $self = shift;
558     my ($obj, $prev_key) = @_;
559
560     # XXX Need to add logic about resetting the iterator if any key in the reference has changed
561     unless ( defined $prev_key ) {
562         $obj->{iterator} = DBM::Deep::Iterator->new({
563             base_offset => $obj->_base_offset,
564             engine      => $self,
565         });
566     }
567
568     return $obj->{iterator}->get_next_key( $obj );
569 }
570
571 =head2 clear( $obj )
572
573 This takes an object that provides _base_offset() and deletes all its 
574 elements, returning nothing.
575
576 =cut
577
578 sub clear {
579     my $self = shift;
580     my $obj = shift;
581
582     my $sector = $self->_load_sector( $obj->_base_offset )
583         or return;
584
585     if ( $sector->staleness != $obj->_staleness ) {
586         return;
587     }
588
589     $sector->clear;
590     return;
591 }
592
593 =head2 setup_fh( $obj )
594
595 This takes an object that provides _base_offset(). It will do everything needed
596 in order to properly initialize all values for necessary functioning. If this is
597 called upon an already initialized object, this will also reset the inode.
598
599 This returns 1.
600
601 =cut
602
603 sub setup_fh {
604     my $self = shift;
605     my ($obj) = @_;
606
607     # We're opening the file.
608     unless ( $obj->_base_offset ) {
609         my $bytes_read = $self->_read_file_header;
610
611         # Creating a new file
612         unless ( $bytes_read ) {
613             $self->_write_file_header;
614
615             # 1) Create Array/Hash entry
616             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
617                 engine => $self,
618                 type   => $obj->_type,
619             });
620             $obj->{base_offset} = $initial_reference->offset;
621             $obj->{staleness} = $initial_reference->staleness;
622
623             $self->storage->flush;
624         }
625         # Reading from an existing file
626         else {
627             $obj->{base_offset} = $bytes_read;
628             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
629                 engine => $self,
630                 offset => $obj->_base_offset,
631             });
632             unless ( $initial_reference ) {
633                 DBM::Deep->_throw_error("Corrupted file, no master index record");
634             }
635
636             unless ($obj->_type eq $initial_reference->type) {
637                 DBM::Deep->_throw_error("File type mismatch");
638             }
639
640             $obj->{staleness} = $initial_reference->staleness;
641         }
642     }
643
644     $self->storage->set_inode;
645
646     return 1;
647 }
648
649 =head2 begin_work( $obj )
650
651 This takes an object that provides _base_offset(). It will set up all necessary
652 bookkeeping in order to run all work within a transaction.
653
654 If $obj is already within a transaction, an error wiill be thrown. If there are
655 no more available transactions, an error will be thrown.
656
657 This returns undef.
658
659 =cut
660
661 sub begin_work {
662     my $self = shift;
663     my ($obj) = @_;
664
665     if ( $self->trans_id ) {
666         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
667     }
668
669     my @slots = $self->read_txn_slots;
670     my $found;
671     for my $i ( 0 .. $#slots ) {
672         next if $slots[$i];
673
674         $slots[$i] = 1;
675         $self->set_trans_id( $i + 1 );
676         $found = 1;
677         last;
678     }
679     unless ( $found ) {
680         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
681     }
682     $self->write_txn_slots( @slots );
683
684     if ( !$self->trans_id ) {
685         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
686     }
687
688     return;
689 }
690
691 =head2 rollback( $obj )
692
693 This takes an object that provides _base_offset(). It will revert all
694 actions taken within the running transaction.
695
696 If $obj is not within a transaction, an error will be thrown.
697
698 This returns 1.
699
700 =cut
701
702 sub rollback {
703     my $self = shift;
704     my ($obj) = @_;
705
706     if ( !$self->trans_id ) {
707         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
708     }
709
710     # Each entry is the file location for a bucket that has a modification for
711     # this transaction. The entries need to be expunged.
712     foreach my $entry (@{ $self->get_entries } ) {
713         # Remove the entry here
714         my $read_loc = $entry
715           + $self->hash_size
716           + $self->byte_size
717           + $self->byte_size
718           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
719
720         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
721         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
722         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
723
724         if ( $data_loc > 1 ) {
725             $self->_load_sector( $data_loc )->free;
726         }
727     }
728
729     $self->clear_entries;
730
731     my @slots = $self->read_txn_slots;
732     $slots[$self->trans_id-1] = 0;
733     $self->write_txn_slots( @slots );
734     $self->inc_txn_staleness_counter( $self->trans_id );
735     $self->set_trans_id( 0 );
736
737     return 1;
738 }
739
740 =head2 commit( $obj )
741
742 This takes an object that provides _base_offset(). It will apply all
743 actions taken within the transaction to the HEAD.
744
745 If $obj is not within a transaction, an error will be thrown.
746
747 This returns 1.
748
749 =cut
750
751 sub commit {
752     my $self = shift;
753     my ($obj) = @_;
754
755     if ( !$self->trans_id ) {
756         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
757     }
758
759     foreach my $entry (@{ $self->get_entries } ) {
760         # Overwrite the entry in head with the entry in trans_id
761         my $base = $entry
762           + $self->hash_size
763           + $self->byte_size;
764
765         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
766         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
767
768         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
769         my $trans_loc = $self->storage->read_at(
770             $spot, $self->byte_size,
771         );
772
773         $self->storage->print_at( $base, $trans_loc );
774         $self->storage->print_at(
775             $spot,
776             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
777         );
778
779         if ( $head_loc > 1 ) {
780             $self->_load_sector( $head_loc )->free;
781         }
782     }
783
784     $self->clear_entries;
785
786     my @slots = $self->read_txn_slots;
787     $slots[$self->trans_id-1] = 0;
788     $self->write_txn_slots( @slots );
789     $self->inc_txn_staleness_counter( $self->trans_id );
790     $self->set_trans_id( 0 );
791
792     return 1;
793 }
794
795 =head2 lock_exclusive()
796
797 This takes an object that provides _base_offset(). It will guarantee that
798 the storage has taken precautions to be safe for a write.
799
800 This returns nothing.
801
802 =cut
803
804 sub lock_exclusive {
805     my $self = shift;
806     my ($obj) = @_;
807     return $self->storage->lock_exclusive( $obj );
808 }
809
810 =head2 lock_shared()
811
812 This takes an object that provides _base_offset(). It will guarantee that
813 the storage has taken precautions to be safe for a read.
814
815 This returns nothing.
816
817 =cut
818
819 sub lock_shared {
820     my $self = shift;
821     my ($obj) = @_;
822     return $self->storage->lock_shared( $obj );
823 }
824
825 =head2 unlock()
826
827 This takes an object that provides _base_offset(). It will guarantee that
828 the storage has released all locks taken.
829
830 This returns nothing.
831
832 =cut
833
834 sub unlock {
835     my $self = shift;
836     my ($obj) = @_;
837
838     my $rv = $self->storage->unlock( $obj );
839
840     $self->flush if $rv;
841
842     return $rv;
843 }
844
845 =head1 INTERNAL METHODS
846
847 The following methods are internal-use-only to DBM::Deep::Engine.
848
849 =cut
850
851 =head2 read_txn_slots()
852
853 This takes no arguments.
854
855 This will return an array with a 1 or 0 in each slot. Each spot represents one
856 available transaction. If the slot is 1, that transaction is taken. If it is 0,
857 the transaction is available.
858
859 =cut
860
861 sub read_txn_slots {
862     my $self = shift;
863     my $bl = $self->txn_bitfield_len;
864     my $num_bits = $bl * 8;
865     return split '', unpack( 'b'.$num_bits,
866         $self->storage->read_at(
867             $self->trans_loc, $bl,
868         )
869     );
870 }
871
872 =head2 write_txn_slots( @slots )
873
874 This takes an array of 1's and 0's. This array represents the transaction slots
875 returned by L</read_txn_slots()>. In other words, the following is true:
876
877   @x = read_txn_slots( write_txn_slots( @x ) );
878
879 (With the obviously missing object referents added back in.)
880
881 =cut
882
883 sub write_txn_slots {
884     my $self = shift;
885     my $num_bits = $self->txn_bitfield_len * 8;
886     $self->storage->print_at( $self->trans_loc,
887         pack( 'b'.$num_bits, join('', @_) ),
888     );
889 }
890
891 =head2 get_running_txn_ids()
892
893 This takes no arguments.
894
895 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
896
897 =cut
898
899 sub get_running_txn_ids {
900     my $self = shift;
901     my @transactions = $self->read_txn_slots;
902     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
903 }
904
905 =head2 get_txn_staleness_counter( $trans_id )
906
907 This will return the staleness counter for the given transaction ID. Please see
908 L</TRANSACTION STALENESS> for more information.
909
910 =cut
911
912 sub get_txn_staleness_counter {
913     my $self = shift;
914     my ($trans_id) = @_;
915
916     # Hardcode staleness of 0 for the HEAD
917     return 0 unless $trans_id;
918
919     return unpack( $StP{$STALE_SIZE},
920         $self->storage->read_at(
921             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
922             $STALE_SIZE,
923         )
924     );
925 }
926
927 =head2 inc_txn_staleness_counter( $trans_id )
928
929 This will increment the staleness counter for the given transaction ID. Please see
930 L</TRANSACTION STALENESS> for more information.
931
932 =cut
933
934 sub inc_txn_staleness_counter {
935     my $self = shift;
936     my ($trans_id) = @_;
937
938     # Hardcode staleness of 0 for the HEAD
939     return 0 unless $trans_id;
940
941     $self->storage->print_at(
942         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
943         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
944     );
945 }
946
947 =head2 get_entries()
948
949 This takes no arguments.
950
951 This returns a list of all the sectors that have been modified by this transaction.
952
953 =cut
954
955 sub get_entries {
956     my $self = shift;
957     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
958 }
959
960 =head2 add_entry( $trans_id, $location )
961
962 This takes a transaction ID and a file location and marks the sector at that location
963 as having been modified by the transaction identified by $trans_id.
964
965 This returns nothing.
966
967 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
968 C<< $trans_id != $self->trans_id >> for this method.
969
970 =cut
971
972 sub add_entry {
973     my $self = shift;
974     my ($trans_id, $loc) = @_;
975
976     $self->{entries}{$trans_id} ||= {};
977     $self->{entries}{$trans_id}{$loc} = undef;
978 }
979
980 =head2 reindex_entry( $old_loc, $new_loc )
981
982 This takes two locations (old and new, respectively). If a location that has been
983 modified by this transaction is subsequently reindexed due to a bucketlist
984 overflowing, then the entries hash needs to be made aware of this change.
985
986 This returns nothing.
987
988 =cut
989
990 sub reindex_entry {
991     my $self = shift;
992     my ($old_loc, $new_loc) = @_;
993
994     TRANS:
995     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
996         if ( exists $locs->{$old_loc} ) {
997             delete $locs->{$old_loc};
998             $locs->{$new_loc} = undef;
999             next TRANS;
1000         }
1001     }
1002 }
1003
1004 =head2 clear_entries()
1005
1006 This takes no arguments. It will clear the entries list for the running transaction.
1007
1008 This returns nothing.
1009
1010 =cut
1011
1012 sub clear_entries {
1013     my $self = shift;
1014     delete $self->{entries}{$self->trans_id};
1015 }
1016
1017 =head2 _write_file_header()
1018
1019 This writes the file header for a new file. This will write the various settings
1020 that set how the file is interpreted.
1021
1022 =head2 _read_file_header()
1023
1024 This reads the file header from an existing file. This will read the various
1025 settings that set how the file is interpreted.
1026
1027 =cut
1028
1029 {
1030     my $header_fixed = length( SIG_FILE ) + 1 + 4 + 4;
1031     my $this_file_version = 3;
1032
1033     sub _write_file_header {
1034         my $self = shift;
1035
1036         my $nt = $self->num_txns;
1037         my $bl = $self->txn_bitfield_len;
1038
1039         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
1040
1041         my $loc = $self->storage->request_space( $header_fixed + $header_var );
1042
1043         $self->storage->print_at( $loc,
1044             SIG_FILE,
1045             SIG_HEADER,
1046             pack('N', $this_file_version), # At this point, we're at 9 bytes
1047             pack('N', $header_var),        # header size
1048             # --- Above is $header_fixed. Below is $header_var
1049             pack('C', $self->byte_size),
1050
1051             # These shenanigans are to allow a 256 within a C
1052             pack('C', $self->max_buckets - 1),
1053             pack('C', $self->data_sector_size - 1),
1054
1055             pack('C', $nt),
1056             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
1057             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
1058             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
1059             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
1060             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
1061         );
1062
1063         #XXX Set these less fragilely
1064         $self->set_trans_loc( $header_fixed + 4 );
1065         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1066
1067         return;
1068     }
1069
1070     sub _read_file_header {
1071         my $self = shift;
1072
1073         my $buffer = $self->storage->read_at( 0, $header_fixed );
1074         return unless length($buffer);
1075
1076         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1077             'A4 A N N', $buffer
1078         );
1079
1080         unless ( $file_signature eq SIG_FILE ) {
1081             $self->storage->close;
1082             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1083         }
1084
1085         unless ( $sig_header eq SIG_HEADER ) {
1086             $self->storage->close;
1087             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1088         }
1089
1090         unless ( $file_version == $this_file_version ) {
1091             $self->storage->close;
1092             DBM::Deep->_throw_error(
1093                 "Wrong file version found - " .  $file_version .
1094                 " - expected " . $this_file_version
1095             );
1096         }
1097
1098         my $buffer2 = $self->storage->read_at( undef, $size );
1099         my @values = unpack( 'C C C C', $buffer2 );
1100
1101         if ( @values != 4 || grep { !defined } @values ) {
1102             $self->storage->close;
1103             DBM::Deep->_throw_error("Corrupted file - bad header");
1104         }
1105
1106         #XXX Add warnings if values weren't set right
1107         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1108
1109         # These shenangians are to allow a 256 within a C
1110         $self->{max_buckets} += 1;
1111         $self->{data_sector_size} += 1;
1112
1113         my $bl = $self->txn_bitfield_len;
1114
1115         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1116         unless ( $size == $header_var ) {
1117             $self->storage->close;
1118             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1119         }
1120
1121         $self->set_trans_loc( $header_fixed + scalar(@values) );
1122         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1123
1124         return length($buffer) + length($buffer2);
1125     }
1126 }
1127
1128 =head2 _load_sector( $offset )
1129
1130 This will instantiate and return the sector object that represents the data found
1131 at $offset.
1132
1133 =cut
1134
1135 sub _load_sector {
1136     my $self = shift;
1137     my ($offset) = @_;
1138
1139     # Add a catch for offset of 0 or 1
1140     return if !$offset || $offset <= 1;
1141
1142     my $type = $self->storage->read_at( $offset, 1 );
1143     return if $type eq chr(0);
1144
1145     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1146         return DBM::Deep::Engine::Sector::Reference->new({
1147             engine => $self,
1148             type   => $type,
1149             offset => $offset,
1150         });
1151     }
1152     # XXX Don't we need key_md5 here?
1153     elsif ( $type eq $self->SIG_BLIST ) {
1154         return DBM::Deep::Engine::Sector::BucketList->new({
1155             engine => $self,
1156             type   => $type,
1157             offset => $offset,
1158         });
1159     }
1160     elsif ( $type eq $self->SIG_INDEX ) {
1161         return DBM::Deep::Engine::Sector::Index->new({
1162             engine => $self,
1163             type   => $type,
1164             offset => $offset,
1165         });
1166     }
1167     elsif ( $type eq $self->SIG_NULL ) {
1168         return DBM::Deep::Engine::Sector::Null->new({
1169             engine => $self,
1170             type   => $type,
1171             offset => $offset,
1172         });
1173     }
1174     elsif ( $type eq $self->SIG_DATA ) {
1175         return DBM::Deep::Engine::Sector::Scalar->new({
1176             engine => $self,
1177             type   => $type,
1178             offset => $offset,
1179         });
1180     }
1181     # This was deleted from under us, so just return and let the caller figure it out.
1182     elsif ( $type eq $self->SIG_FREE ) {
1183         return;
1184     }
1185
1186     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1187 }
1188
1189 =head2 _apply_digest( @stuff )
1190
1191 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1192 passed in and return the result.
1193
1194 =cut
1195
1196 sub _apply_digest {
1197     my $self = shift;
1198     return $self->{digest}->(@_);
1199 }
1200
1201 =head2 _add_free_blist_sector( $offset, $size )
1202
1203 =head2 _add_free_data_sector( $offset, $size )
1204
1205 =head2 _add_free_index_sector( $offset, $size )
1206
1207 These methods are all wrappers around _add_free_sector(), providing the proper
1208 chain offset ($multiple) for the sector type.
1209
1210 =cut
1211
1212 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1213 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1214 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1215
1216 =head2 _add_free_sector( $multiple, $offset, $size )
1217
1218 _add_free_sector() takes the offset into the chains location, the offset of the
1219 sector, and the size of that sector. It will mark the sector as a free sector
1220 and put it into the list of sectors that are free of this type for use later.
1221
1222 This returns nothing.
1223
1224 B<NOTE>: $size is unused?
1225
1226 =cut
1227
1228 sub _add_free_sector {
1229     my $self = shift;
1230     my ($multiple, $offset, $size) = @_;
1231
1232     my $chains_offset = $multiple * $self->byte_size;
1233
1234     my $storage = $self->storage;
1235
1236     # Increment staleness.
1237     # XXX Can this increment+modulo be done by "&= 0x1" ?
1238     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + SIG_SIZE, $STALE_SIZE ) );
1239     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1240     $storage->print_at( $offset + SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1241
1242     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1243
1244     $storage->print_at( $self->chains_loc + $chains_offset,
1245         pack( $StP{$self->byte_size}, $offset ),
1246     );
1247
1248     # Record the old head in the new sector after the signature and staleness counter
1249     $storage->print_at( $offset + SIG_SIZE + $STALE_SIZE, $old_head );
1250 }
1251
1252 =head2 _request_blist_sector( $size )
1253
1254 =head2 _request_data_sector( $size )
1255
1256 =head2 _request_index_sector( $size )
1257
1258 These methods are all wrappers around _request_sector(), providing the proper
1259 chain offset ($multiple) for the sector type.
1260
1261 =cut
1262
1263 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1264 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1265 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1266
1267 =head2 _request_sector( $multiple $size )
1268
1269 This takes the offset into the chains location and the size of that sector.
1270
1271 This returns the object with the sector. If there is an available free sector of
1272 that type, then it will be reused. If there isn't one, then a new one will be
1273 allocated.
1274
1275 =cut
1276
1277 sub _request_sector {
1278     my $self = shift;
1279     my ($multiple, $size) = @_;
1280
1281     my $chains_offset = $multiple * $self->byte_size;
1282
1283     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1284     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1285
1286     # We don't have any free sectors of the right size, so allocate a new one.
1287     unless ( $loc ) {
1288         my $offset = $self->storage->request_space( $size );
1289
1290         # Zero out the new sector. This also guarantees correct increases
1291         # in the filesize.
1292         $self->storage->print_at( $offset, chr(0) x $size );
1293
1294         return $offset;
1295     }
1296
1297     # Read the new head after the signature and the staleness counter
1298     my $new_head = $self->storage->read_at( $loc + SIG_SIZE + $STALE_SIZE, $self->byte_size );
1299     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1300     $self->storage->print_at(
1301         $loc + SIG_SIZE + $STALE_SIZE,
1302         pack( $StP{$self->byte_size}, 0 ),
1303     );
1304
1305     return $loc;
1306 }
1307
1308 =head2 flush()
1309
1310 This takes no arguments. It will do everything necessary to flush all things to
1311 disk. This is usually called during unlock() and setup_fh().
1312
1313 This returns nothing.
1314
1315 =cut
1316
1317 sub flush {
1318     my $self = shift;
1319
1320     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1321     # -RobK, 2008-06-26
1322     $self->storage->flush;
1323 }
1324
1325 =head2 ACCESSORS
1326
1327 The following are readonly attributes.
1328
1329 =over 4
1330
1331 =item * storage
1332
1333 =item * byte_size
1334
1335 =item * hash_size
1336
1337 =item * hash_chars
1338
1339 =item * num_txns
1340
1341 =item * max_buckets
1342
1343 =item * blank_md5
1344
1345 =item * data_sector_size
1346
1347 =item * txn_bitfield_len
1348
1349 =back
1350
1351 =cut
1352
1353 sub storage     { $_[0]{storage} }
1354 sub byte_size   { $_[0]{byte_size} }
1355 sub hash_size   { $_[0]{hash_size} }
1356 sub hash_chars  { $_[0]{hash_chars} }
1357 sub num_txns    { $_[0]{num_txns} }
1358 sub max_buckets { $_[0]{max_buckets} }
1359 sub blank_md5   { chr(0) x $_[0]->hash_size }
1360 sub data_sector_size { $_[0]{data_sector_size} }
1361
1362 # This is a calculated value
1363 sub txn_bitfield_len {
1364     my $self = shift;
1365     unless ( exists $self->{txn_bitfield_len} ) {
1366         my $temp = ($self->num_txns) / 8;
1367         if ( $temp > int( $temp ) ) {
1368             $temp = int( $temp ) + 1;
1369         }
1370         $self->{txn_bitfield_len} = $temp;
1371     }
1372     return $self->{txn_bitfield_len};
1373 }
1374
1375 =pod
1376
1377 The following are read/write attributes. 
1378
1379 =over 4
1380
1381 =item * trans_id / set_trans_id( $new_id )
1382
1383 =item * trans_loc / set_trans_loc( $new_loc )
1384
1385 =item * chains_loc / set_chains_loc( $new_loc )
1386
1387 =back
1388
1389 =cut
1390
1391 sub trans_id     { $_[0]{trans_id} }
1392 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1393
1394 sub trans_loc     { $_[0]{trans_loc} }
1395 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1396
1397 sub chains_loc     { $_[0]{chains_loc} }
1398 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1399
1400 sub cache       { $_[0]{cache} ||= {} }
1401 sub clear_cache { %{$_[0]->cache} = () }
1402
1403 =head2 _dump_file()
1404
1405 This method takes no arguments. It's used to print out a textual representation of the DBM::Deep
1406 DB file. It assumes the file is not-corrupted.
1407
1408 =cut
1409
1410 sub _dump_file {
1411     my $self = shift;
1412
1413     # Read the header
1414     my $spot = $self->_read_file_header();
1415
1416     my %types = (
1417         0 => 'B',
1418         1 => 'D',
1419         2 => 'I',
1420     );
1421
1422     my %sizes = (
1423         'D' => $self->data_sector_size,
1424         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1425         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1426     );
1427
1428     my $return = "";
1429
1430     # Header values
1431     $return .= "NumTxns: " . $self->num_txns . $/;
1432
1433     # Read the free sector chains
1434     my %sectors;
1435     foreach my $multiple ( 0 .. 2 ) {
1436         $return .= "Chains($types{$multiple}):";
1437         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1438         while ( 1 ) {
1439             my $loc = unpack(
1440                 $StP{$self->byte_size},
1441                 $self->storage->read_at( $old_loc, $self->byte_size ),
1442             );
1443
1444             # We're now out of free sectors of this kind.
1445             unless ( $loc ) {
1446                 last;
1447             }
1448
1449             $sectors{ $types{$multiple} }{ $loc } = undef;
1450             $old_loc = $loc + SIG_SIZE + $STALE_SIZE;
1451             $return .= " $loc";
1452         }
1453         $return .= $/;
1454     }
1455
1456     SECTOR:
1457     while ( $spot < $self->storage->{end} ) {
1458         # Read each sector in order.
1459         my $sector = $self->_load_sector( $spot );
1460         if ( !$sector ) {
1461             # Find it in the free-sectors that were found already
1462             foreach my $type ( keys %sectors ) {
1463                 if ( exists $sectors{$type}{$spot} ) {
1464                     my $size = $sizes{$type};
1465                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1466                     $spot += $size;
1467                     next SECTOR;
1468                 }
1469             }
1470
1471             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1472         }
1473         else {
1474             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1475             if ( $sector->type eq 'D' ) {
1476                 $return .= ' ' . $sector->data;
1477             }
1478             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1479                 $return .= ' REF: ' . $sector->get_refcount;
1480             }
1481             elsif ( $sector->type eq 'B' ) {
1482                 foreach my $bucket ( $sector->chopped_up ) {
1483                     $return .= "\n    ";
1484                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1485                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1486                     );
1487                     my $l = unpack( $StP{$self->byte_size},
1488                         substr( $bucket->[-1],
1489                             $self->hash_size + $self->byte_size,
1490                             $self->byte_size,
1491                         ),
1492                     );
1493                     $return .= sprintf " %08d", $l;
1494                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1495                         my $l = unpack( $StP{$self->byte_size},
1496                             substr( $bucket->[-1],
1497                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1498                                 $self->byte_size,
1499                             ),
1500                         );
1501                         $return .= sprintf " %08d", $l;
1502                     }
1503                 }
1504             }
1505             $return .= $/;
1506
1507             $spot += $sector->size;
1508         }
1509     }
1510
1511     return $return;
1512 }
1513
1514 1;
1515 __END__