you cannot take off a version number
[dbsrgits/DBIx-Class-Journal.git] / lib / DBIx / Class / Schema / Journal.pm
package DBIx::Class::Schema::Journal;

# Strictures come first so that every statement in this file, including the
# class-data declarations below, is compiled under them.
use strict;
use warnings;

use base qw/DBIx::Class/;

use Scalar::Util 'blessed';
use DBIx::Class::Schema::Journal::DB;
use Class::C3::Componentised ();

# Class-level configuration and state used by the journalling schema.

# Storage type handed to the journal schema before it connects on its own.
__PACKAGE__->mk_classdata('journal_storage_type');
# Array ref of connection arguments for a separate journal connection; when
# unset the journal schema shares the main schema's storage.
__PACKAGE__->mk_classdata('journal_connection');
# When true, the journal schema is deployed from within connection().
__PACKAGE__->mk_classdata('journal_deploy_on_connect');
# [ source names ] to journal; when unset all sources are journalled.
__PACKAGE__->mk_classdata('journal_sources');
# [ class, field for user id ]; passed on to belongs_to('user', ...) on the
# journal's ChangeSet class.
__PACKAGE__->mk_classdata('journal_user');
# When true, journal classes are also registered on the main schema.
__PACKAGE__->mk_classdata('journal_copy_sources');
# Cache slot for the generated journal schema prototype.
__PACKAGE__->mk_classdata('__journal_schema_prototype');
# Schema object for the journal.
__PACKAGE__->mk_classdata('_journal_schema');
# Component loaded into each journalled result class (defaults to "Journal").
__PACKAGE__->mk_classdata('journal_component');
# Components loaded into the generated journal schema class.
__PACKAGE__->mk_classdata('journal_components');
# When true, txn_do opens a new changeset even if one is already current.
__PACKAGE__->mk_classdata('journal_nested_changesets');
# Prefix passed to journal_define_table for the ChangeSet/ChangeLog classes.
__PACKAGE__->mk_classdata('journal_prefix');

our $VERSION = '0.01';
26
# Build (once) and return the journal schema prototype for this schema.
#
# A "<SchemaClass>::_JOURNAL" class is generated on top of
# DBIx::Class::Schema::Journal::DB and composed into the
# "<SchemaClass>::Journal" namespace. ChangeSet and ChangeLog classes are
# registered on the result, followed by the audit classes for every
# journalled source. The prototype is stored in __journal_schema_prototype
# and returned directly on later calls.
sub _journal_schema_prototype {
    my $self = shift;

    my $cached = $self->__journal_schema_prototype;
    return $cached if $cached;

    my $schema_class  = blessed($self) || $self;
    my $journal_class = "${schema_class}::_JOURNAL";

    Class::C3::Componentised->inject_base(
        $journal_class,
        'DBIx::Class::Schema::Journal::DB',
    );

    if ($self->journal_components) {
        $journal_class->load_components($self->journal_components);
    }

    my $proto = $self->__journal_schema_prototype(
        $journal_class->compose_namespace("${schema_class}::Journal")
    );
    my $proto_class = blessed($proto);

    my $component = $self->journal_component || "Journal";
    my $prefix    = $self->journal_prefix    || '';

    # The two bookkeeping tables shared by all journalled sources.
    for my $audit (qw(ChangeSet ChangeLog)) {
        my $audit_class = "${proto_class}::${audit}";

        Class::C3::Componentised->inject_base(
            $audit_class,
            "DBIx::Class::Schema::Journal::DB::$audit",
        );
        $audit_class->journal_define_table($proto_class, $prefix);
        $proto->register_class($audit, $audit_class);

        if ($self->journal_copy_sources) {
            $self->register_class($audit, $audit_class);
        }
    }

    ## Create auditlog+history per table
    my @wanted = $self->journal_sources
        ? @{ $self->journal_sources }
        : $self->sources;
    my %is_journalled = map { $_ => 1 } @wanted;

    for my $source_name ($self->sources) {
        next if !$is_journalled{$source_name};

        $self->create_journal_for($source_name => $proto);
        $self->class($source_name)->load_components($component);
    }

    return $proto;
}
70
# Connect the main schema and set up its journal schema alongside it.
#
# The journal schema is a clone of the journal prototype. It either makes
# its own connection from journal_connection (optionally with
# journal_storage_type) or reuses the storage of the freshly connected main
# schema. Returns whatever the parent connection() returned.
sub connection {
    my $self   = shift;
    my $schema = $self->next::method(@_);

    my $invocant_class = ref $self || $self;
    my $journal        = $invocant_class->_journal_schema_prototype->clone;

    if (my $connect_info = $self->journal_connection) {
        if (my $storage_type = $self->journal_storage_type) {
            $journal->storage_type($storage_type);
        }
        $journal->connection(@{$connect_info});
    }
    else {
        # No dedicated connection configured: share the main storage.
        $journal->storage($schema->storage);
    }

    $self->_journal_schema($journal);

    if ($self->journal_nested_changesets) {
        $self->_journal_schema->nested_changesets(1);
        die 'FIXME nested changeset schema not yet supported... add parent_id to ChangeSet here';
    }

    if ($self->journal_deploy_on_connect) {
        $self->journal_schema_deploy();
    }

    ## Set up relationship between changeset->user_id and this schema's user.
    ## Without a configured journal_user there is nothing to relate; no
    ## warning is issued because user_id is useful even without a rel.
    my @user_rel = @{ $self->journal_user || [] };
    if (@user_rel) {
        $self->_journal_schema->class('ChangeSet')->belongs_to('user', @user_rel);
        $self->_journal_schema->storage->disconnect();
    }

    return $schema;
}
107
# Deploy the journal schema, forwarding any arguments to its deploy().
sub journal_schema_deploy {
    my ($self, @deploy_args) = @_;

    return $self->_journal_schema->deploy(@deploy_args);
}
113
# Generate and register the "<Source>AuditLog" and "<Source>AuditHistory"
# classes for one source.
#
#   $source_name    - name of the source in this schema to journal
#   $journal_schema - journal schema object the classes are registered on
#
# The classes are also registered on this schema when journal_copy_sources
# is set.
sub create_journal_for {
    my ($self, $source_name, $journal_schema) = @_;

    my $source        = $self->source($source_name);
    my $journal_class = blessed($journal_schema);

    for my $kind (qw(AuditLog AuditHistory)) {
        my $moniker     = "${source_name}${kind}";
        my $audit_class = "${journal_class}::${moniker}";

        Class::C3::Componentised->inject_base(
            $audit_class,
            "DBIx::Class::Schema::Journal::DB::$kind",
        );
        $audit_class->journal_define_table($source, $journal_class);

        $journal_schema->register_class($moniker, $audit_class);

        if ($self->journal_copy_sources) {
            $self->register_class($moniker, $audit_class);
        }
    }
}
133
# Deploy the journal schema, then seed it from the existing data.
# XXX FIXME deploy is not idempotent :-(
sub bootstrap_journal {
    my ($self) = @_;

    $self->journal_schema_deploy;

    return $self->prepopulate_journal;
}
140
# Copy data from the original schema sources into the journal as inserts in
# one changeset, so that later deletes will not fail to be journalled.
#
# Does nothing (bare return) when the journal already contains at least one
# ChangeSet row, which is taken to mean prepopulation has already run.
# Otherwise returns the result of the wrapping txn_do call.
#
# Rows are read in pages of 1,000. Each page is requested with an explicit
# ORDER BY on the primary key columns (when the source has any): without a
# defined order the database is free to return rows in a different order
# for each LIMIT/OFFSET query, which could duplicate or skip rows.
sub prepopulate_journal {
    my $self = shift;

    my $j_schema = $self->_journal_schema;

    # woah, looks like prepopulate has already run?
    return if $j_schema->resultset('ChangeSet')->count != 0;

    # using our own overridden txn_do will create a changeset
    return $self->txn_do( sub {
        my %is_journalled = map { $_ => 1 } $self->journal_sources
            ? @{ $self->journal_sources }
            : $self->sources;

        my $changelog_rs = $j_schema->resultset('ChangeLog');
        my $changeset_id = $j_schema->current_changeset;

        SOURCE:
        for my $source_name ($self->sources) {
            next SOURCE unless $is_journalled{$source_name};

            my $from_rs = $self->resultset($source_name);
            my @pks     = $from_rs->result_source->primary_columns;

            # Plain hashes are enough here and can be fed straight back
            # into populate().
            $from_rs->result_class('DBIx::Class::ResultClass::HashRefInflator');

            my %paging = ( rows => 1_000 );
            if (@pks) {
                # Deterministic page boundaries; see the header comment.
                my $alias = $from_rs->current_source_alias;
                $paging{order_by} = [ map { "$alias.$_" } @pks ];
            }

            my $history_rs = $j_schema->resultset("${source_name}AuditHistory");
            my $log_rs     = $j_schema->resultset("${source_name}AuditLog");

            my $page = 1;
            while (
                my @rows = $from_rs->search(undef, {
                    %paging,
                    page => $page++,
                })
            ) {
                # one change log entry (and thus one id) per row of this page
                my @log_ids = map { $_->id }
                    $changelog_rs->populate([
                        map { +{ changeset_id => $changeset_id } } @rows
                    ]);

                # audit log entries: creating change id plus the primary key
                my @log_entries;
                for my $idx (0 .. $#rows) {
                    my $row = $rows[$idx];
                    push @log_entries, {
                        create_id => $log_ids[$idx],
                        map { $_ => $row->{$_} } @pks,
                    };
                }
                $log_rs->populate(\@log_entries);

                # audit history: the full row tagged with its change id
                my @history_entries;
                for my $idx (0 .. $#rows) {
                    push @history_entries, {
                        %{ $rows[$idx] },
                        audit_change_id => $log_ids[$idx],
                    };
                }
                $history_rs->populate(\@history_entries);
            }
        }
    });
}
205
# Run $user_code in a transaction, opening a journal changeset around it
# when needed.
#
# When the journal schema has no current changeset, or when
# journal_nested_changesets is set, the code is wrapped so that a changeset
# is created through journal_create_changeset and its id is stored under
# the 'changeset' key of the journal's changeset container for the duration
# of the call. When the journal schema uses a different storage object than
# this schema, the code additionally runs inside the journal schema's own
# txn_do. The (possibly wrapped) code and @args are then handed to the
# parent txn_do, whose result is returned.
sub txn_do {
    my ($self, $user_code, @args) = @_;

    my $journal          = $self->_journal_schema;
    my $parent_changeset = $journal->_current_changeset;

    my $wrapped = $user_code;

    if ( !$parent_changeset || $self->journal_nested_changesets ) {
        my $container = $journal->_current_changeset_container;

        if ( !$container ) {
            # this is a hash because scalar refs can't be localized
            $container = {};
            $journal->_current_changeset_container($container);
        }

        # wrap the thunk with a new changeset creation
        $wrapped = sub {
            my $new_changeset = $journal->journal_create_changeset(
                parent_id => $parent_changeset,
            );
            local $container->{changeset} = $new_changeset->id;
            $user_code->(@_);
        };
    }

    # Separate journal storage: nest a journal-side transaction as well.
    if ( $journal->storage != $self->storage ) {
        my $journalled = $wrapped;
        $wrapped = sub { $journal->txn_do($journalled, @_) };
    }

    return $self->next::method($wrapped, @args);
}
239
# Combined getter/setter for the journal schema's current_user.
# Called without an argument it reads the value; called with one (even
# undef) it passes that value on to current_user.
sub changeset_user {
    my ($self, $userid) = @_;

    my $journal = $self->_journal_schema;

    return @_ == 1
        ? $journal->current_user()
        : $journal->current_user($userid);
}
248
# Combined getter/setter for the journal schema's current_session.
# Called without an argument it reads the value; called with one (even
# undef) it passes that value on to current_session.
sub changeset_session {
    my ($self, $sessionid) = @_;

    my $journal = $self->_journal_schema;

    return @_ == 1
        ? $journal->current_session()
        : $journal->current_session($sessionid);
}
257
258 1;