Started work on DBM::Deep::Engine breakdown
[dbsrgits/DBM-Deep.git] / lib / DBM / Deep / Engine / File.pm
1 package DBM::Deep::Engine::File;
2
3 use 5.006_000;
4
5 use strict;
6 use warnings FATAL => 'all';
7
8 use base qw( DBM::Deep::Engine );
9
10 # Never import symbols into our namespace. We are a class, not a library.
11 use Scalar::Util ();
12
13 use DBM::Deep::Storage::File ();
14
15 use DBM::Deep::Engine::Sector::Data ();
16 use DBM::Deep::Engine::Sector::BucketList ();
17 use DBM::Deep::Engine::Sector::Index ();
18 use DBM::Deep::Engine::Sector::Null ();
19 use DBM::Deep::Engine::Sector::Reference ();
20 use DBM::Deep::Engine::Sector::Scalar ();
21 use DBM::Deep::Null ();
22
23 my $STALE_SIZE = 2;
24
25 # Please refer to the pack() documentation for further information
26 my %StP = (
27     1 => 'C', # Unsigned char value (no order needed as it's just one byte)
28     2 => 'n', # Unsigned short in "network" (big-endian) order
29     4 => 'N', # Unsigned long in "network" (big-endian) order
30     8 => 'Q', # Usigned quad (no order specified, presumably machine-dependent)
31 );
32
33 =head1 NAME
34
35 DBM::Deep::Engine
36
37 =head1 PURPOSE
38
39 This is an internal-use-only object for L<DBM::Deep/>. It mediates the low-level
40 mapping between the L<DBM::Deep/> objects and the storage medium.
41
42 The purpose of this documentation is to provide low-level documentation for
43 developers. It is B<not> intended to be used by the general public. This
44 documentation and what it documents can and will change without notice.
45
46 =head1 OVERVIEW
47
48 The engine exposes an API to the DBM::Deep objects (DBM::Deep, DBM::Deep::Array,
49 and DBM::Deep::Hash) for their use to access the actual stored values. This API
50 is the following:
51
52 =over 4
53
54 =item * new
55
56 =item * read_value
57
58 =item * get_classname
59
60 =item * make_reference
61
62 =item * key_exists
63
64 =item * delete_key
65
66 =item * write_value
67
68 =item * get_next_key
69
70 =item * setup_fh
71
72 =item * begin_work
73
74 =item * commit
75
76 =item * rollback
77
78 =item * lock_exclusive
79
80 =item * lock_shared
81
82 =item * unlock
83
84 =back
85
86 They are explained in their own sections below. These methods, in turn, may
87 provide some bounds-checking, but primarily act to instantiate objects in the
88 Engine::Sector::* hierarchy and dispatch to them.
89
90 =head1 TRANSACTIONS
91
92 Transactions in DBM::Deep are implemented using a variant of MVCC. This attempts
93 to keep the amount of actual work done against the file low while stil providing
94 Atomicity, Consistency, and Isolation. Durability, unfortunately, cannot be done
95 with only one file.
96
97 =head2 STALENESS
98
99 If another process uses a transaction slot and writes stuff to it, then
100 terminates, the data that process wrote it still within the file. In order to
101 address this, there is also a transaction staleness counter associated within
102 every write.  Each time a transaction is started, that process increments that
103 transaction's staleness counter. If, when it reads a value, the staleness
104 counters aren't identical, DBM::Deep will consider the value on disk to be stale
105 and discard it.
106
107 =head2 DURABILITY
108
109 The fourth leg of ACID is Durability, the guarantee that when a commit returns,
110 the data will be there the next time you read from it. This should be regardless
111 of any crashes or powerdowns in between the commit and subsequent read.
112 DBM::Deep does provide that guarantee; once the commit returns, all of the data
113 has been transferred from the transaction shadow to the HEAD. The issue arises
114 with partial commits - a commit that is interrupted in some fashion. In keeping
115 with DBM::Deep's "tradition" of very light error-checking and non-existent
116 error-handling, there is no way to recover from a partial commit. (This is
117 probably a failure in Consistency as well as Durability.)
118
119 Other DBMSes use transaction logs (a separate file, generally) to achieve
120 Durability.  As DBM::Deep is a single-file, we would have to do something
121 similar to what SQLite and BDB do in terms of committing using synchonized
122 writes. To do this, we would have to use a much higher RAM footprint and some
123 serious programming that make my head hurts just to think about it.
124
125 =head1 EXTERNAL METHODS
126
127 =head2 new()
128
129 This takes a set of args. These args are described in the documentation for
130 L<DBM::Deep/new>.
131
132 =cut
133
134 sub new {
135     my $class = shift;
136     my ($args) = @_;
137
138     $args->{storage} = DBM::Deep::Storage::File->new( $args )
139         unless exists $args->{storage};
140
141     my $self = bless {
142         byte_size   => 4,
143
144         digest      => undef,
145         hash_size   => 16,  # In bytes
146         hash_chars  => 256, # Number of chars the algorithm uses per byte
147         max_buckets => 16,
148         num_txns    => 1,   # The HEAD
149         trans_id    => 0,   # Default to the HEAD
150
151         data_sector_size => 64, # Size in bytes of each data sector
152
153         entries => {}, # This is the list of entries for transactions
154         storage => undef,
155     }, $class;
156
157     # Never allow byte_size to be set directly.
158     delete $args->{byte_size};
159     if ( defined $args->{pack_size} ) {
160         if ( lc $args->{pack_size} eq 'small' ) {
161             $args->{byte_size} = 2;
162         }
163         elsif ( lc $args->{pack_size} eq 'medium' ) {
164             $args->{byte_size} = 4;
165         }
166         elsif ( lc $args->{pack_size} eq 'large' ) {
167             $args->{byte_size} = 8;
168         }
169         else {
170             DBM::Deep->_throw_error( "Unknown pack_size value: '$args->{pack_size}'" );
171         }
172     }
173
174     # Grab the parameters we want to use
175     foreach my $param ( keys %$self ) {
176         next unless exists $args->{$param};
177         $self->{$param} = $args->{$param};
178     }
179
180     my %validations = (
181         max_buckets      => { floor => 16, ceil => 256 },
182         num_txns         => { floor => 1,  ceil => 255 },
183         data_sector_size => { floor => 32, ceil => 256 },
184     );
185
186     while ( my ($attr, $c) = each %validations ) {
187         if (   !defined $self->{$attr}
188             || !length $self->{$attr}
189             || $self->{$attr} =~ /\D/
190             || $self->{$attr} < $c->{floor}
191         ) {
192             $self->{$attr} = '(undef)' if !defined $self->{$attr};
193             warn "Floor of $attr is $c->{floor}. Setting it to $c->{floor} from '$self->{$attr}'\n";
194             $self->{$attr} = $c->{floor};
195         }
196         elsif ( $self->{$attr} > $c->{ceil} ) {
197             warn "Ceiling of $attr is $c->{ceil}. Setting it to $c->{ceil} from '$self->{$attr}'\n";
198             $self->{$attr} = $c->{ceil};
199         }
200     }
201
202     if ( !$self->{digest} ) {
203         require Digest::MD5;
204         $self->{digest} = \&Digest::MD5::md5;
205     }
206
207     return $self;
208 }
209
210 =head2 read_value( $obj, $key )
211
212 This takes an object that provides _base_offset() and a string. It returns the
213 value stored in the corresponding Sector::Value's data section.
214
215 =cut
216
217 sub read_value {
218     my $self = shift;
219     my ($obj, $key) = @_;
220
221     # This will be a Reference sector
222     my $sector = $self->_load_sector( $obj->_base_offset )
223         or return;
224
225     if ( $sector->staleness != $obj->_staleness ) {
226         return;
227     }
228
229     my $key_md5 = $self->_apply_digest( $key );
230
231     my $value_sector = $sector->get_data_for({
232         key_md5    => $key_md5,
233         allow_head => 1,
234     });
235
236     unless ( $value_sector ) {
237         $value_sector = DBM::Deep::Engine::Sector::Null->new({
238             engine => $self,
239             data   => undef,
240         });
241
242         $sector->write_data({
243             key_md5 => $key_md5,
244             key     => $key,
245             value   => $value_sector,
246         });
247     }
248
249     return $value_sector->data;
250 }
251
252 =head2 get_classname( $obj )
253
254 This takes an object that provides _base_offset() and returns the classname (if
255 any) associated with it.
256
257 It delegates to Sector::Reference::get_classname() for the heavy lifting.
258
259 It performs a staleness check.
260
261 =cut
262
263 sub get_classname {
264     my $self = shift;
265     my ($obj) = @_;
266
267     # This will be a Reference sector
268     my $sector = $self->_load_sector( $obj->_base_offset )
269         or DBM::Deep->_throw_error( "How did get_classname fail (no sector for '$obj')?!" );
270
271     if ( $sector->staleness != $obj->_staleness ) {
272         return;
273     }
274
275     return $sector->get_classname;
276 }
277
278 =head2 make_reference( $obj, $old_key, $new_key )
279
280 This takes an object that provides _base_offset() and two strings. The
281 strings correspond to the old key and new key, respectively. This operation
282 is equivalent to (given C<< $db->{foo} = []; >>) C<< $db->{bar} = $db->{foo} >>.
283
284 This returns nothing.
285
286 =cut
287
288 sub make_reference {
289     my $self = shift;
290     my ($obj, $old_key, $new_key) = @_;
291
292     # This will be a Reference sector
293     my $sector = $self->_load_sector( $obj->_base_offset )
294         or DBM::Deep->_throw_error( "How did make_reference fail (no sector for '$obj')?!" );
295
296     if ( $sector->staleness != $obj->_staleness ) {
297         return;
298     }
299
300     my $old_md5 = $self->_apply_digest( $old_key );
301
302     my $value_sector = $sector->get_data_for({
303         key_md5    => $old_md5,
304         allow_head => 1,
305     });
306
307     unless ( $value_sector ) {
308         $value_sector = DBM::Deep::Engine::Sector::Null->new({
309             engine => $self,
310             data   => undef,
311         });
312
313         $sector->write_data({
314             key_md5 => $old_md5,
315             key     => $old_key,
316             value   => $value_sector,
317         });
318     }
319
320     if ( $value_sector->isa( 'DBM::Deep::Engine::Sector::Reference' ) ) {
321         $sector->write_data({
322             key     => $new_key,
323             key_md5 => $self->_apply_digest( $new_key ),
324             value   => $value_sector,
325         });
326         $value_sector->increment_refcount;
327     }
328     else {
329         $sector->write_data({
330             key     => $new_key,
331             key_md5 => $self->_apply_digest( $new_key ),
332             value   => $value_sector->clone,
333         });
334     }
335
336     return;
337 }
338
339 =head2 key_exists( $obj, $key )
340
341 This takes an object that provides _base_offset() and a string for
342 the key to be checked. This returns 1 for true and "" for false.
343
344 =cut
345
346 sub key_exists {
347     my $self = shift;
348     my ($obj, $key) = @_;
349
350     # This will be a Reference sector
351     my $sector = $self->_load_sector( $obj->_base_offset )
352         or return '';
353
354     if ( $sector->staleness != $obj->_staleness ) {
355         return '';
356     }
357
358     my $data = $sector->get_data_for({
359         key_md5    => $self->_apply_digest( $key ),
360         allow_head => 1,
361     });
362
363     # exists() returns 1 or '' for true/false.
364     return $data ? 1 : '';
365 }
366
367 =head2 delete_key( $obj, $key )
368
369 This takes an object that provides _base_offset() and a string for
370 the key to be deleted. This returns the result of the Sector::Reference
371 delete_key() method.
372
373 =cut
374
375 sub delete_key {
376     my $self = shift;
377     my ($obj, $key) = @_;
378
379     my $sector = $self->_load_sector( $obj->_base_offset )
380         or return;
381
382     if ( $sector->staleness != $obj->_staleness ) {
383         return;
384     }
385
386     return $sector->delete_key({
387         key_md5    => $self->_apply_digest( $key ),
388         allow_head => 0,
389     });
390 }
391
392 =head2 write_value( $obj, $key, $value )
393
394 This takes an object that provides _base_offset(), a string for the
395 key, and a value. This value can be anything storable within L<DBM::Deep/>.
396
397 This returns 1 upon success.
398
399 =cut
400
401 sub write_value {
402     my $self = shift;
403     my ($obj, $key, $value) = @_;
404
405     my $r = Scalar::Util::reftype( $value ) || '';
406     {
407         last if $r eq '';
408         last if $r eq 'HASH';
409         last if $r eq 'ARRAY';
410
411         DBM::Deep->_throw_error(
412             "Storage of references of type '$r' is not supported."
413         );
414     }
415
416     # This will be a Reference sector
417     my $sector = $self->_load_sector( $obj->_base_offset )
418         or DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
419
420     if ( $sector->staleness != $obj->_staleness ) {
421         DBM::Deep->_throw_error( "Cannot write to a deleted spot in DBM::Deep." );
422     }
423
424     my ($class, $type);
425     if ( !defined $value ) {
426         $class = 'DBM::Deep::Engine::Sector::Null';
427     }
428     elsif ( $r eq 'ARRAY' || $r eq 'HASH' ) {
429         my $tmpvar;
430         if ( $r eq 'ARRAY' ) {
431             $tmpvar = tied @$value;
432         } elsif ( $r eq 'HASH' ) {
433             $tmpvar = tied %$value;
434         }
435
436         if ( $tmpvar ) {
437             my $is_dbm_deep = eval { local $SIG{'__DIE__'}; $tmpvar->isa( 'DBM::Deep' ); };
438
439             unless ( $is_dbm_deep ) {
440                 DBM::Deep->_throw_error( "Cannot store something that is tied." );
441             }
442
443             unless ( $tmpvar->_engine->storage == $self->storage ) {
444                 DBM::Deep->_throw_error( "Cannot store values across DBM::Deep files. Please use export() instead." );
445             }
446
447             # First, verify if we're storing the same thing to this spot. If we are, then
448             # this should be a no-op. -EJS, 2008-05-19
449             my $loc = $sector->get_data_location_for({
450                 key_md5 => $self->_apply_digest( $key ),
451                 allow_head => 1,
452             });
453
454             if ( defined($loc) && $loc == $tmpvar->_base_offset ) {
455                 return 1;
456             }
457
458             #XXX Can this use $loc?
459             my $value_sector = $self->_load_sector( $tmpvar->_base_offset );
460             $sector->write_data({
461                 key     => $key,
462                 key_md5 => $self->_apply_digest( $key ),
463                 value   => $value_sector,
464             });
465             $value_sector->increment_refcount;
466
467             return 1;
468         }
469
470         $class = 'DBM::Deep::Engine::Sector::Reference';
471         $type = substr( $r, 0, 1 );
472     }
473     else {
474         if ( tied($value) ) {
475             DBM::Deep->_throw_error( "Cannot store something that is tied." );
476         }
477         $class = 'DBM::Deep::Engine::Sector::Scalar';
478     }
479
480     # Create this after loading the reference sector in case something bad happens.
481     # This way, we won't allocate value sector(s) needlessly.
482     my $value_sector = $class->new({
483         engine => $self,
484         data   => $value,
485         type   => $type,
486     });
487
488     $sector->write_data({
489         key     => $key,
490         key_md5 => $self->_apply_digest( $key ),
491         value   => $value_sector,
492     });
493
494     # This code is to make sure we write all the values in the $value to the disk
495     # and to make sure all changes to $value after the assignment are reflected
496     # on disk. This may be counter-intuitive at first, but it is correct dwimmery.
497     #   NOTE - simply tying $value won't perform a STORE on each value. Hence, the
498     # copy to a temp value.
499     if ( $r eq 'ARRAY' ) {
500         my @temp = @$value;
501         tie @$value, 'DBM::Deep', {
502             base_offset => $value_sector->offset,
503             staleness   => $value_sector->staleness,
504             storage     => $self->storage,
505             engine      => $self,
506         };
507         @$value = @temp;
508         bless $value, 'DBM::Deep::Array' unless Scalar::Util::blessed( $value );
509     }
510     elsif ( $r eq 'HASH' ) {
511         my %temp = %$value;
512         tie %$value, 'DBM::Deep', {
513             base_offset => $value_sector->offset,
514             staleness   => $value_sector->staleness,
515             storage     => $self->storage,
516             engine      => $self,
517         };
518
519         %$value = %temp;
520         bless $value, 'DBM::Deep::Hash' unless Scalar::Util::blessed( $value );
521     }
522
523     return 1;
524 }
525
526 =head2 setup_fh( $obj )
527
528 This takes an object that provides _base_offset(). It will do everything needed
529 in order to properly initialize all values for necessary functioning. If this is
530 called upon an already initialized object, this will also reset the inode.
531
532 This returns 1.
533
534 =cut
535
536 sub setup_fh {
537     my $self = shift;
538     my ($obj) = @_;
539
540     # We're opening the file.
541     unless ( $obj->_base_offset ) {
542         my $bytes_read = $self->_read_file_header;
543
544         # Creating a new file
545         unless ( $bytes_read ) {
546             $self->_write_file_header;
547
548             # 1) Create Array/Hash entry
549             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
550                 engine => $self,
551                 type   => $obj->_type,
552             });
553             $obj->{base_offset} = $initial_reference->offset;
554             $obj->{staleness} = $initial_reference->staleness;
555
556             $self->storage->flush;
557         }
558         # Reading from an existing file
559         else {
560             $obj->{base_offset} = $bytes_read;
561             my $initial_reference = DBM::Deep::Engine::Sector::Reference->new({
562                 engine => $self,
563                 offset => $obj->_base_offset,
564             });
565             unless ( $initial_reference ) {
566                 DBM::Deep->_throw_error("Corrupted file, no master index record");
567             }
568
569             unless ($obj->_type eq $initial_reference->type) {
570                 DBM::Deep->_throw_error("File type mismatch");
571             }
572
573             $obj->{staleness} = $initial_reference->staleness;
574         }
575     }
576
577     $self->storage->set_inode;
578
579     return 1;
580 }
581
582 =head2 begin_work( $obj )
583
584 This takes an object that provides _base_offset(). It will set up all necessary
585 bookkeeping in order to run all work within a transaction.
586
587 If $obj is already within a transaction, an error wiill be thrown. If there are
588 no more available transactions, an error will be thrown.
589
590 This returns undef.
591
592 =cut
593
594 sub begin_work {
595     my $self = shift;
596     my ($obj) = @_;
597
598     if ( $self->trans_id ) {
599         DBM::Deep->_throw_error( "Cannot begin_work within an active transaction" );
600     }
601
602     my @slots = $self->read_txn_slots;
603     my $found;
604     for my $i ( 0 .. $#slots ) {
605         next if $slots[$i];
606
607         $slots[$i] = 1;
608         $self->set_trans_id( $i + 1 );
609         $found = 1;
610         last;
611     }
612     unless ( $found ) {
613         DBM::Deep->_throw_error( "Cannot allocate transaction ID" );
614     }
615     $self->write_txn_slots( @slots );
616
617     if ( !$self->trans_id ) {
618         DBM::Deep->_throw_error( "Cannot begin_work - no available transactions" );
619     }
620
621     return;
622 }
623
624 =head2 rollback( $obj )
625
626 This takes an object that provides _base_offset(). It will revert all
627 actions taken within the running transaction.
628
629 If $obj is not within a transaction, an error will be thrown.
630
631 This returns 1.
632
633 =cut
634
635 sub rollback {
636     my $self = shift;
637     my ($obj) = @_;
638
639     if ( !$self->trans_id ) {
640         DBM::Deep->_throw_error( "Cannot rollback without an active transaction" );
641     }
642
643     # Each entry is the file location for a bucket that has a modification for
644     # this transaction. The entries need to be expunged.
645     foreach my $entry (@{ $self->get_entries } ) {
646         # Remove the entry here
647         my $read_loc = $entry
648           + $self->hash_size
649           + $self->byte_size
650           + $self->byte_size
651           + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
652
653         my $data_loc = $self->storage->read_at( $read_loc, $self->byte_size );
654         $data_loc = unpack( $StP{$self->byte_size}, $data_loc );
655         $self->storage->print_at( $read_loc, pack( $StP{$self->byte_size}, 0 ) );
656
657         if ( $data_loc > 1 ) {
658             $self->_load_sector( $data_loc )->free;
659         }
660     }
661
662     $self->clear_entries;
663
664     my @slots = $self->read_txn_slots;
665     $slots[$self->trans_id-1] = 0;
666     $self->write_txn_slots( @slots );
667     $self->inc_txn_staleness_counter( $self->trans_id );
668     $self->set_trans_id( 0 );
669
670     return 1;
671 }
672
673 =head2 commit( $obj )
674
675 This takes an object that provides _base_offset(). It will apply all
676 actions taken within the transaction to the HEAD.
677
678 If $obj is not within a transaction, an error will be thrown.
679
680 This returns 1.
681
682 =cut
683
684 sub commit {
685     my $self = shift;
686     my ($obj) = @_;
687
688     if ( !$self->trans_id ) {
689         DBM::Deep->_throw_error( "Cannot commit without an active transaction" );
690     }
691
692     foreach my $entry (@{ $self->get_entries } ) {
693         # Overwrite the entry in head with the entry in trans_id
694         my $base = $entry
695           + $self->hash_size
696           + $self->byte_size;
697
698         my $head_loc = $self->storage->read_at( $base, $self->byte_size );
699         $head_loc = unpack( $StP{$self->byte_size}, $head_loc );
700
701         my $spot = $base + $self->byte_size + ($self->trans_id - 1) * ( $self->byte_size + $STALE_SIZE );
702         my $trans_loc = $self->storage->read_at(
703             $spot, $self->byte_size,
704         );
705
706         $self->storage->print_at( $base, $trans_loc );
707         $self->storage->print_at(
708             $spot,
709             pack( $StP{$self->byte_size} . ' ' . $StP{$STALE_SIZE}, (0) x 2 ),
710         );
711
712         if ( $head_loc > 1 ) {
713             $self->_load_sector( $head_loc )->free;
714         }
715     }
716
717     $self->clear_entries;
718
719     my @slots = $self->read_txn_slots;
720     $slots[$self->trans_id-1] = 0;
721     $self->write_txn_slots( @slots );
722     $self->inc_txn_staleness_counter( $self->trans_id );
723     $self->set_trans_id( 0 );
724
725     return 1;
726 }
727
728 =head2 lock_exclusive()
729
730 This takes an object that provides _base_offset(). It will guarantee that
731 the storage has taken precautions to be safe for a write.
732
733 This returns nothing.
734
735 =cut
736
737 sub lock_exclusive {
738     my $self = shift;
739     my ($obj) = @_;
740     return $self->storage->lock_exclusive( $obj );
741 }
742
743 =head2 lock_shared()
744
745 This takes an object that provides _base_offset(). It will guarantee that
746 the storage has taken precautions to be safe for a read.
747
748 This returns nothing.
749
750 =cut
751
752 sub lock_shared {
753     my $self = shift;
754     my ($obj) = @_;
755     return $self->storage->lock_shared( $obj );
756 }
757
758 =head2 unlock()
759
760 This takes an object that provides _base_offset(). It will guarantee that
761 the storage has released all locks taken.
762
763 This returns nothing.
764
765 =cut
766
767 sub unlock {
768     my $self = shift;
769     my ($obj) = @_;
770
771     my $rv = $self->storage->unlock( $obj );
772
773     $self->flush if $rv;
774
775     return $rv;
776 }
777
778 =head1 INTERNAL METHODS
779
780 The following methods are internal-use-only to DBM::Deep::Engine::File.
781
782 =cut
783
784 =head2 read_txn_slots()
785
786 This takes no arguments.
787
788 This will return an array with a 1 or 0 in each slot. Each spot represents one
789 available transaction. If the slot is 1, that transaction is taken. If it is 0,
790 the transaction is available.
791
792 =cut
793
794 sub read_txn_slots {
795     my $self = shift;
796     my $bl = $self->txn_bitfield_len;
797     my $num_bits = $bl * 8;
798     return split '', unpack( 'b'.$num_bits,
799         $self->storage->read_at(
800             $self->trans_loc, $bl,
801         )
802     );
803 }
804
805 =head2 write_txn_slots( @slots )
806
807 This takes an array of 1's and 0's. This array represents the transaction slots
808 returned by L</read_txn_slots()>. In other words, the following is true:
809
810   @x = read_txn_slots( write_txn_slots( @x ) );
811
812 (With the obviously missing object referents added back in.)
813
814 =cut
815
816 sub write_txn_slots {
817     my $self = shift;
818     my $num_bits = $self->txn_bitfield_len * 8;
819     $self->storage->print_at( $self->trans_loc,
820         pack( 'b'.$num_bits, join('', @_) ),
821     );
822 }
823
824 =head2 get_running_txn_ids()
825
826 This takes no arguments.
827
828 This will return an array of taken transaction IDs. This wraps L</read_txn_slots()>.
829
830 =cut
831
832 sub get_running_txn_ids {
833     my $self = shift;
834     my @transactions = $self->read_txn_slots;
835     my @trans_ids = map { $_+1 } grep { $transactions[$_] } 0 .. $#transactions;
836 }
837
838 =head2 get_txn_staleness_counter( $trans_id )
839
840 This will return the staleness counter for the given transaction ID. Please see
841 L</TRANSACTION STALENESS> for more information.
842
843 =cut
844
845 sub get_txn_staleness_counter {
846     my $self = shift;
847     my ($trans_id) = @_;
848
849     # Hardcode staleness of 0 for the HEAD
850     return 0 unless $trans_id;
851
852     return unpack( $StP{$STALE_SIZE},
853         $self->storage->read_at(
854             $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
855             $STALE_SIZE,
856         )
857     );
858 }
859
860 =head2 inc_txn_staleness_counter( $trans_id )
861
862 This will increment the staleness counter for the given transaction ID. Please see
863 L</TRANSACTION STALENESS> for more information.
864
865 =cut
866
867 sub inc_txn_staleness_counter {
868     my $self = shift;
869     my ($trans_id) = @_;
870
871     # Hardcode staleness of 0 for the HEAD
872     return 0 unless $trans_id;
873
874     $self->storage->print_at(
875         $self->trans_loc + $self->txn_bitfield_len + $STALE_SIZE * ($trans_id - 1),
876         pack( $StP{$STALE_SIZE}, $self->get_txn_staleness_counter( $trans_id ) + 1 ),
877     );
878 }
879
880 =head2 get_entries()
881
882 This takes no arguments.
883
884 This returns a list of all the sectors that have been modified by this transaction.
885
886 =cut
887
888 sub get_entries {
889     my $self = shift;
890     return [ keys %{ $self->{entries}{$self->trans_id} ||= {} } ];
891 }
892
893 =head2 add_entry( $trans_id, $location )
894
895 This takes a transaction ID and a file location and marks the sector at that
896 location as having been modified by the transaction identified by $trans_id.
897
898 This returns nothing.
899
900 B<NOTE>: Unlike all the other _entries() methods, there are several cases where
901 C<< $trans_id != $self->trans_id >> for this method.
902
903 =cut
904
905 sub add_entry {
906     my $self = shift;
907     my ($trans_id, $loc) = @_;
908
909     $self->{entries}{$trans_id} ||= {};
910     $self->{entries}{$trans_id}{$loc} = undef;
911 }
912
913 =head2 reindex_entry( $old_loc, $new_loc )
914
915 This takes two locations (old and new, respectively). If a location that has
916 been modified by this transaction is subsequently reindexed due to a bucketlist
917 overflowing, then the entries hash needs to be made aware of this change.
918
919 This returns nothing.
920
921 =cut
922
923 sub reindex_entry {
924     my $self = shift;
925     my ($old_loc, $new_loc) = @_;
926
927     TRANS:
928     while ( my ($trans_id, $locs) = each %{ $self->{entries} } ) {
929         if ( exists $locs->{$old_loc} ) {
930             delete $locs->{$old_loc};
931             $locs->{$new_loc} = undef;
932             next TRANS;
933         }
934     }
935 }
936
937 =head2 clear_entries()
938
939 This takes no arguments. It will clear the entries list for the running
940 transaction.
941
942 This returns nothing.
943
944 =cut
945
946 sub clear_entries {
947     my $self = shift;
948     delete $self->{entries}{$self->trans_id};
949 }
950
951 =head2 _write_file_header()
952
953 This writes the file header for a new file. This will write the various settings
954 that set how the file is interpreted.
955
956 =head2 _read_file_header()
957
958 This reads the file header from an existing file. This will read the various
959 settings that set how the file is interpreted.
960
961 =cut
962
963 {
964     my $header_fixed = length( __PACKAGE__->SIG_FILE ) + 1 + 4 + 4;
965     my $this_file_version = 3;
966
967     sub _write_file_header {
968         my $self = shift;
969
970         my $nt = $self->num_txns;
971         my $bl = $self->txn_bitfield_len;
972
973         my $header_var = 1 + 1 + 1 + 1 + $bl + $STALE_SIZE * ($nt - 1) + 3 * $self->byte_size;
974
975         my $loc = $self->storage->request_space( $header_fixed + $header_var );
976
977         $self->storage->print_at( $loc,
978             $self->SIG_FILE,
979             $self->SIG_HEADER,
980             pack('N', $this_file_version), # At this point, we're at 9 bytes
981             pack('N', $header_var),        # header size
982             # --- Above is $header_fixed. Below is $header_var
983             pack('C', $self->byte_size),
984
985             # These shenanigans are to allow a 256 within a C
986             pack('C', $self->max_buckets - 1),
987             pack('C', $self->data_sector_size - 1),
988
989             pack('C', $nt),
990             pack('C' . $bl, 0 ),                           # Transaction activeness bitfield
991             pack($StP{$STALE_SIZE}.($nt-1), 0 x ($nt-1) ), # Transaction staleness counters
992             pack($StP{$self->byte_size}, 0), # Start of free chain (blist size)
993             pack($StP{$self->byte_size}, 0), # Start of free chain (data size)
994             pack($StP{$self->byte_size}, 0), # Start of free chain (index size)
995         );
996
997         #XXX Set these less fragilely
998         $self->set_trans_loc( $header_fixed + 4 );
999         $self->set_chains_loc( $header_fixed + 4 + $bl + $STALE_SIZE * ($nt-1) );
1000
1001         return;
1002     }
1003
1004     sub _read_file_header {
1005         my $self = shift;
1006
1007         my $buffer = $self->storage->read_at( 0, $header_fixed );
1008         return unless length($buffer);
1009
1010         my ($file_signature, $sig_header, $file_version, $size) = unpack(
1011             'A4 A N N', $buffer
1012         );
1013
1014         unless ( $file_signature eq $self->SIG_FILE ) {
1015             $self->storage->close;
1016             DBM::Deep->_throw_error( "Signature not found -- file is not a Deep DB" );
1017         }
1018
1019         unless ( $sig_header eq $self->SIG_HEADER ) {
1020             $self->storage->close;
1021             DBM::Deep->_throw_error( "Pre-1.00 file version found" );
1022         }
1023
1024         unless ( $file_version == $this_file_version ) {
1025             $self->storage->close;
1026             DBM::Deep->_throw_error(
1027                 "Wrong file version found - " .  $file_version .
1028                 " - expected " . $this_file_version
1029             );
1030         }
1031
1032         my $buffer2 = $self->storage->read_at( undef, $size );
1033         my @values = unpack( 'C C C C', $buffer2 );
1034
1035         if ( @values != 4 || grep { !defined } @values ) {
1036             $self->storage->close;
1037             DBM::Deep->_throw_error("Corrupted file - bad header");
1038         }
1039
1040         #XXX Add warnings if values weren't set right
1041         @{$self}{qw(byte_size max_buckets data_sector_size num_txns)} = @values;
1042
1043         # These shenangians are to allow a 256 within a C
1044         $self->{max_buckets} += 1;
1045         $self->{data_sector_size} += 1;
1046
1047         my $bl = $self->txn_bitfield_len;
1048
1049         my $header_var = scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) + 3 * $self->byte_size;
1050         unless ( $size == $header_var ) {
1051             $self->storage->close;
1052             DBM::Deep->_throw_error( "Unexpected size found ($size <-> $header_var)." );
1053         }
1054
1055         $self->set_trans_loc( $header_fixed + scalar(@values) );
1056         $self->set_chains_loc( $header_fixed + scalar(@values) + $bl + $STALE_SIZE * ($self->num_txns - 1) );
1057
1058         return length($buffer) + length($buffer2);
1059     }
1060 }
1061
1062 =head2 _load_sector( $offset )
1063
1064 This will instantiate and return the sector object that represents the data found
1065 at $offset.
1066
1067 =cut
1068
1069 sub _load_sector {
1070     my $self = shift;
1071     my ($offset) = @_;
1072
1073     # Add a catch for offset of 0 or 1
1074     return if !$offset || $offset <= 1;
1075
1076     my $type = $self->storage->read_at( $offset, 1 );
1077     return if $type eq chr(0);
1078
1079     if ( $type eq $self->SIG_ARRAY || $type eq $self->SIG_HASH ) {
1080         return DBM::Deep::Engine::Sector::Reference->new({
1081             engine => $self,
1082             type   => $type,
1083             offset => $offset,
1084         });
1085     }
1086     # XXX Don't we need key_md5 here?
1087     elsif ( $type eq $self->SIG_BLIST ) {
1088         return DBM::Deep::Engine::Sector::BucketList->new({
1089             engine => $self,
1090             type   => $type,
1091             offset => $offset,
1092         });
1093     }
1094     elsif ( $type eq $self->SIG_INDEX ) {
1095         return DBM::Deep::Engine::Sector::Index->new({
1096             engine => $self,
1097             type   => $type,
1098             offset => $offset,
1099         });
1100     }
1101     elsif ( $type eq $self->SIG_NULL ) {
1102         return DBM::Deep::Engine::Sector::Null->new({
1103             engine => $self,
1104             type   => $type,
1105             offset => $offset,
1106         });
1107     }
1108     elsif ( $type eq $self->SIG_DATA ) {
1109         return DBM::Deep::Engine::Sector::Scalar->new({
1110             engine => $self,
1111             type   => $type,
1112             offset => $offset,
1113         });
1114     }
1115     # This was deleted from under us, so just return and let the caller figure it out.
1116     elsif ( $type eq $self->SIG_FREE ) {
1117         return;
1118     }
1119
1120     DBM::Deep->_throw_error( "'$offset': Don't know what to do with type '$type'" );
1121 }
1122
1123 =head2 _apply_digest( @stuff )
1124
1125 This will apply the digest methd (default to Digest::MD5::md5) to the arguments
1126 passed in and return the result.
1127
1128 =cut
1129
1130 sub _apply_digest {
1131     my $self = shift;
1132     return $self->{digest}->(@_);
1133 }
1134
1135 =head2 _add_free_blist_sector( $offset, $size )
1136
1137 =head2 _add_free_data_sector( $offset, $size )
1138
1139 =head2 _add_free_index_sector( $offset, $size )
1140
1141 These methods are all wrappers around _add_free_sector(), providing the proper
1142 chain offset ($multiple) for the sector type.
1143
1144 =cut
1145
1146 sub _add_free_blist_sector { shift->_add_free_sector( 0, @_ ) }
1147 sub _add_free_data_sector { shift->_add_free_sector( 1, @_ ) }
1148 sub _add_free_index_sector { shift->_add_free_sector( 2, @_ ) }
1149
1150 =head2 _add_free_sector( $multiple, $offset, $size )
1151
1152 _add_free_sector() takes the offset into the chains location, the offset of the
1153 sector, and the size of that sector. It will mark the sector as a free sector
1154 and put it into the list of sectors that are free of this type for use later.
1155
1156 This returns nothing.
1157
1158 B<NOTE>: $size is unused?
1159
1160 =cut
1161
1162 sub _add_free_sector {
1163     my $self = shift;
1164     my ($multiple, $offset, $size) = @_;
1165
1166     my $chains_offset = $multiple * $self->byte_size;
1167
1168     my $storage = $self->storage;
1169
1170     # Increment staleness.
1171     # XXX Can this increment+modulo be done by "&= 0x1" ?
1172     my $staleness = unpack( $StP{$STALE_SIZE}, $storage->read_at( $offset + $self->SIG_SIZE, $STALE_SIZE ) );
1173     $staleness = ($staleness + 1 ) % ( 2 ** ( 8 * $STALE_SIZE ) );
1174     $storage->print_at( $offset + $self->SIG_SIZE, pack( $StP{$STALE_SIZE}, $staleness ) );
1175
1176     my $old_head = $storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1177
1178     $storage->print_at( $self->chains_loc + $chains_offset,
1179         pack( $StP{$self->byte_size}, $offset ),
1180     );
1181
1182     # Record the old head in the new sector after the signature and staleness counter
1183     $storage->print_at( $offset + $self->SIG_SIZE + $STALE_SIZE, $old_head );
1184 }
1185
1186 =head2 _request_blist_sector( $size )
1187
1188 =head2 _request_data_sector( $size )
1189
1190 =head2 _request_index_sector( $size )
1191
1192 These methods are all wrappers around _request_sector(), providing the proper
1193 chain offset ($multiple) for the sector type.
1194
1195 =cut
1196
1197 sub _request_blist_sector { shift->_request_sector( 0, @_ ) }
1198 sub _request_data_sector { shift->_request_sector( 1, @_ ) }
1199 sub _request_index_sector { shift->_request_sector( 2, @_ ) }
1200
1201 =head2 _request_sector( $multiple $size )
1202
1203 This takes the offset into the chains location and the size of that sector.
1204
1205 This returns the object with the sector. If there is an available free sector of
1206 that type, then it will be reused. If there isn't one, then a new one will be
1207 allocated.
1208
1209 =cut
1210
1211 sub _request_sector {
1212     my $self = shift;
1213     my ($multiple, $size) = @_;
1214
1215     my $chains_offset = $multiple * $self->byte_size;
1216
1217     my $old_head = $self->storage->read_at( $self->chains_loc + $chains_offset, $self->byte_size );
1218     my $loc = unpack( $StP{$self->byte_size}, $old_head );
1219
1220     # We don't have any free sectors of the right size, so allocate a new one.
1221     unless ( $loc ) {
1222         my $offset = $self->storage->request_space( $size );
1223
1224         # Zero out the new sector. This also guarantees correct increases
1225         # in the filesize.
1226         $self->storage->print_at( $offset, chr(0) x $size );
1227
1228         return $offset;
1229     }
1230
1231     # Read the new head after the signature and the staleness counter
1232     my $new_head = $self->storage->read_at( $loc + $self->SIG_SIZE + $STALE_SIZE, $self->byte_size );
1233     $self->storage->print_at( $self->chains_loc + $chains_offset, $new_head );
1234     $self->storage->print_at(
1235         $loc + $self->SIG_SIZE + $STALE_SIZE,
1236         pack( $StP{$self->byte_size}, 0 ),
1237     );
1238
1239     return $loc;
1240 }
1241
1242 =head2 flush()
1243
1244 This takes no arguments. It will do everything necessary to flush all things to
1245 disk. This is usually called during unlock() and setup_fh().
1246
1247 This returns nothing.
1248
1249 =cut
1250
1251 sub flush {
1252     my $self = shift;
1253
1254     # Why do we need to have the storage flush? Shouldn't autoflush take care of things?
1255     # -RobK, 2008-06-26
1256     $self->storage->flush;
1257 }
1258
1259 =head2 ACCESSORS
1260
1261 The following are readonly attributes.
1262
1263 =over 4
1264
1265 =item * storage
1266
1267 =item * byte_size
1268
1269 =item * hash_size
1270
1271 =item * hash_chars
1272
1273 =item * num_txns
1274
1275 =item * max_buckets
1276
1277 =item * blank_md5
1278
1279 =item * data_sector_size
1280
1281 =item * txn_bitfield_len
1282
1283 =back
1284
1285 =cut
1286
1287 sub storage     { $_[0]{storage} }
1288 sub byte_size   { $_[0]{byte_size} }
1289 sub hash_size   { $_[0]{hash_size} }
1290 sub hash_chars  { $_[0]{hash_chars} }
1291 sub num_txns    { $_[0]{num_txns} }
1292 sub max_buckets { $_[0]{max_buckets} }
1293 sub blank_md5   { chr(0) x $_[0]->hash_size }
1294 sub data_sector_size { $_[0]{data_sector_size} }
1295
1296 # This is a calculated value
1297 sub txn_bitfield_len {
1298     my $self = shift;
1299     unless ( exists $self->{txn_bitfield_len} ) {
1300         my $temp = ($self->num_txns) / 8;
1301         if ( $temp > int( $temp ) ) {
1302             $temp = int( $temp ) + 1;
1303         }
1304         $self->{txn_bitfield_len} = $temp;
1305     }
1306     return $self->{txn_bitfield_len};
1307 }
1308
1309 =pod
1310
1311 The following are read/write attributes. 
1312
1313 =over 4
1314
1315 =item * trans_id / set_trans_id( $new_id )
1316
1317 =item * trans_loc / set_trans_loc( $new_loc )
1318
1319 =item * chains_loc / set_chains_loc( $new_loc )
1320
1321 =back
1322
1323 =cut
1324
1325 sub trans_id     { $_[0]{trans_id} }
1326 sub set_trans_id { $_[0]{trans_id} = $_[1] }
1327
1328 sub trans_loc     { $_[0]{trans_loc} }
1329 sub set_trans_loc { $_[0]{trans_loc} = $_[1] }
1330
1331 sub chains_loc     { $_[0]{chains_loc} }
1332 sub set_chains_loc { $_[0]{chains_loc} = $_[1] }
1333
1334 sub cache       { $_[0]{cache} ||= {} }
1335 sub clear_cache { %{$_[0]->cache} = () }
1336
1337 =head2 _dump_file()
1338
1339 This method takes no arguments. It's used to print out a textual representation
1340 of the DBM::Deep DB file. It assumes the file is not-corrupted.
1341
1342 =cut
1343
1344 sub _dump_file {
1345     my $self = shift;
1346
1347     # Read the header
1348     my $spot = $self->_read_file_header();
1349
1350     my %types = (
1351         0 => 'B',
1352         1 => 'D',
1353         2 => 'I',
1354     );
1355
1356     my %sizes = (
1357         'D' => $self->data_sector_size,
1358         'B' => DBM::Deep::Engine::Sector::BucketList->new({engine=>$self,offset=>1})->size,
1359         'I' => DBM::Deep::Engine::Sector::Index->new({engine=>$self,offset=>1})->size,
1360     );
1361
1362     my $return = "";
1363
1364     # Header values
1365     $return .= "NumTxns: " . $self->num_txns . $/;
1366
1367     # Read the free sector chains
1368     my %sectors;
1369     foreach my $multiple ( 0 .. 2 ) {
1370         $return .= "Chains($types{$multiple}):";
1371         my $old_loc = $self->chains_loc + $multiple * $self->byte_size;
1372         while ( 1 ) {
1373             my $loc = unpack(
1374                 $StP{$self->byte_size},
1375                 $self->storage->read_at( $old_loc, $self->byte_size ),
1376             );
1377
1378             # We're now out of free sectors of this kind.
1379             unless ( $loc ) {
1380                 last;
1381             }
1382
1383             $sectors{ $types{$multiple} }{ $loc } = undef;
1384             $old_loc = $loc + $self->SIG_SIZE + $STALE_SIZE;
1385             $return .= " $loc";
1386         }
1387         $return .= $/;
1388     }
1389
1390     SECTOR:
1391     while ( $spot < $self->storage->{end} ) {
1392         # Read each sector in order.
1393         my $sector = $self->_load_sector( $spot );
1394         if ( !$sector ) {
1395             # Find it in the free-sectors that were found already
1396             foreach my $type ( keys %sectors ) {
1397                 if ( exists $sectors{$type}{$spot} ) {
1398                     my $size = $sizes{$type};
1399                     $return .= sprintf "%08d: %s %04d\n", $spot, 'F' . $type, $size;
1400                     $spot += $size;
1401                     next SECTOR;
1402                 }
1403             }
1404
1405             die "********\n$return\nDidn't find free sector for $spot in chains\n********\n";
1406         }
1407         else {
1408             $return .= sprintf "%08d: %s  %04d", $spot, $sector->type, $sector->size;
1409             if ( $sector->type eq 'D' ) {
1410                 $return .= ' ' . $sector->data;
1411             }
1412             elsif ( $sector->type eq 'A' || $sector->type eq 'H' ) {
1413                 $return .= ' REF: ' . $sector->get_refcount;
1414             }
1415             elsif ( $sector->type eq 'B' ) {
1416                 foreach my $bucket ( $sector->chopped_up ) {
1417                     $return .= "\n    ";
1418                     $return .= sprintf "%08d", unpack($StP{$self->byte_size},
1419                         substr( $bucket->[-1], $self->hash_size, $self->byte_size),
1420                     );
1421                     my $l = unpack( $StP{$self->byte_size},
1422                         substr( $bucket->[-1],
1423                             $self->hash_size + $self->byte_size,
1424                             $self->byte_size,
1425                         ),
1426                     );
1427                     $return .= sprintf " %08d", $l;
1428                     foreach my $txn ( 0 .. $self->num_txns - 2 ) {
1429                         my $l = unpack( $StP{$self->byte_size},
1430                             substr( $bucket->[-1],
1431                                 $self->hash_size + 2 * $self->byte_size + $txn * ($self->byte_size + $STALE_SIZE),
1432                                 $self->byte_size,
1433                             ),
1434                         );
1435                         $return .= sprintf " %08d", $l;
1436                     }
1437                 }
1438             }
1439             $return .= $/;
1440
1441             $spot += $sector->size;
1442         }
1443     }
1444
1445     return $return;
1446 }
1447
1448 1;
1449 __END__