Commit | Line | Data |
f0f14c64 |
package DBIx::Class::Schema::Journal;

# Strictures come first so that everything below, including the class-data
# declarations, is compiled under them.
use strict;
use warnings;

use base qw/DBIx::Class/;

use Scalar::Util 'blessed';
use DBIx::Class::Schema::Journal::DB;
use Class::C3::Componentised ();

# Configuration and state, stored as class data on the schema.

# Storage type applied to the journal schema; only consulted when
# journal_connection is set.
__PACKAGE__->mk_classdata('journal_storage_type');
# Array ref of connection arguments for a separate journal connection. When
# unset, the journal schema shares the main schema's storage.
__PACKAGE__->mk_classdata('journal_connection');
# When true, connection() deploys the journal schema right away.
__PACKAGE__->mk_classdata('journal_deploy_on_connect');
__PACKAGE__->mk_classdata('journal_sources'); ## [ source names ]
__PACKAGE__->mk_classdata('journal_user'); ## [ class, field for user id ]
# When true, journal classes are also registered on the main schema.
__PACKAGE__->mk_classdata('journal_copy_sources');
# Cached prototype journal schema; built by _journal_schema_prototype.
__PACKAGE__->mk_classdata('__journal_schema_prototype');
__PACKAGE__->mk_classdata('_journal_schema'); ## schema object for journal
# Component loaded into every journalled result class (defaults to "Journal").
__PACKAGE__->mk_classdata('journal_component');
# Extra components loaded into the generated journal schema class.
__PACKAGE__->mk_classdata('journal_components');
# When true, txn_do opens a new changeset even inside an existing one.
__PACKAGE__->mk_classdata('journal_nested_changesets');
# Prefix handed to journal_define_table for the ChangeSet/ChangeLog classes.
__PACKAGE__->mk_classdata('journal_prefix');
f3602465 |
24 | |
d19af369 |
25 | |
548cc9f7 |
# Build (once) and return the prototype journal schema for this schema class.
#
# The prototype is cached in __journal_schema_prototype, so later calls return
# the cached object. On the first call this:
#   * injects DBIx::Class::Schema::Journal::DB as base of "<schema>::_JOURNAL"
#     and loads journal_components into it, if any are configured;
#   * composes it into the "<schema>::Journal" namespace;
#   * creates and registers the ChangeSet and ChangeLog classes;
#   * creates the audit classes for every journalled source and loads the
#     journal component (journal_component, default "Journal") into the
#     corresponding result class.
#
# Returns the prototype journal schema.
sub _journal_schema_prototype {
    my $self = shift;

    if (my $cached = $self->__journal_schema_prototype) {
        return $cached;
    }

    my $c = blessed($self) || $self;
    my $journal_schema_class = "${c}::_JOURNAL";
    Class::C3::Componentised->inject_base(
        $journal_schema_class, 'DBIx::Class::Schema::Journal::DB'
    );
    $journal_schema_class->load_components($self->journal_components)
        if $self->journal_components;

    my $proto = $self->__journal_schema_prototype(
        $journal_schema_class->compose_namespace($c . '::Journal')
    );

    my $comp   = $self->journal_component || "Journal";
    my $prefix = $self->journal_prefix    || '';

    # Work out the default set of journalled sources BEFORE any journal class
    # is registered on this schema. With journal_copy_sources enabled the loop
    # below adds ChangeSet/ChangeLog to $self->sources; reading the default
    # afterwards would make the journal try to journal its own tables.
    my %j_sources = map { ($_ => 1) } (
        $self->journal_sources
            ? @{ $self->journal_sources }
            : $self->sources
    );

    foreach my $audit (qw(ChangeSet ChangeLog)) {
        my $class = blessed($proto) . "::$audit";

        Class::C3::Componentised->inject_base(
            $class, "DBIx::Class::Schema::Journal::DB::$audit"
        );

        $class->journal_define_table(blessed($proto), $prefix);

        $proto->register_class($audit, $class);

        $self->register_class($audit, $class)
            if $self->journal_copy_sources;
    }

    ## Create auditlog+history per table
    foreach my $s_name ($self->sources) {
        next unless $j_sources{$s_name};
        $self->create_journal_for($s_name => $proto);
        $self->class($s_name)->load_components($comp);
    }

    return $proto;
}
69 | |
548cc9f7 |
# Connect the main schema and attach a journal schema to it.
#
# All arguments are passed on to the next connection() in the method
# resolution order. A clone of the prototype journal schema is then either
# connected with journal_connection (using journal_storage_type, if set) or
# made to share the main schema's storage, and stored in _journal_schema.
#
# If journal_user is configured, a 'user' belongs_to relationship is added to
# the journal's ChangeSet class.
#
# Returns whatever the next connection() returned.
# Dies if journal_nested_changesets is enabled (not supported yet).
sub connection {
    my ($self, @connect_info) = @_;

    my $schema = $self->next::method(@connect_info);

    my $owner   = ref($self) || $self;
    my $journal = $owner->_journal_schema_prototype->clone;

    if (my $journal_conn = $self->journal_connection) {
        if (my $storage_type = $self->journal_storage_type) {
            $journal->storage_type($storage_type);
        }
        $journal->connection(@{$journal_conn});
    }
    else {
        # No dedicated connection configured: journal through the same storage.
        $journal->storage($schema->storage);
    }

    $self->_journal_schema($journal);

    if ($self->journal_nested_changesets) {
        $self->_journal_schema->nested_changesets(1);
        die 'FIXME nested changeset schema not yet supported... add parent_id to ChangeSet here';
    }

    if ($self->journal_deploy_on_connect) {
        $self->journal_schema_deploy();
    }

    ## Set up relationship between changeset->user_id and this schema's user.
    ## Without a journal user there is nothing to relate to; user_id is still
    ## useful without a relationship, so this is not an error.
    my @user_rel = @{ $self->journal_user || [] };
    if (@user_rel) {
        $self->_journal_schema->class('ChangeSet')->belongs_to('user', @user_rel);
        # NOTE(review): the journal storage is disconnected after the
        # relationship is added; the reason is not visible in this file.
        $self->_journal_schema->storage->disconnect();
    }

    return $schema;
}
106 | |
548cc9f7 |
# Deploy the main schema, then the journal schema, passing the same arguments
# to both. Returns the result of journal_schema_deploy.
sub deploy {
    my ($self, @deploy_args) = @_;

    $self->next::method(@deploy_args);

    return $self->journal_schema_deploy(@deploy_args);
}
114 | |
548cc9f7 |
# Deploy only the journal schema. Arguments are handed unchanged to the
# journal schema's deploy(), whose result is returned.
sub journal_schema_deploy {
    my ($self, @deploy_args) = @_;

    my $journal = $self->_journal_schema;
    return $journal->deploy(@deploy_args);
}
120 | |
548cc9f7 |
# Create the two audit classes for one source of this schema.
#
#   $s_name         - name of the source to journal
#   $journal_schema - journal schema object the audit classes are registered on
#
# For each of AuditLog and AuditHistory a class named
# "<journal schema class>::<source><kind>" is given the matching
# DBIx::Class::Schema::Journal::DB::<kind> base, asked to define its table
# from the original source, and registered on the journal schema. With
# journal_copy_sources enabled it is registered on this schema too.
sub create_journal_for {
    my ($self, $s_name, $journal_schema) = @_;

    my $source        = $self->source($s_name);
    my $journal_class = blessed($journal_schema);

    for my $kind ('AuditLog', 'AuditHistory') {
        my $moniker     = $s_name . $kind;
        my $audit_class = $journal_class . "::$moniker";

        Class::C3::Componentised->inject_base(
            $audit_class, "DBIx::Class::Schema::Journal::DB::$kind"
        );

        $audit_class->journal_define_table($source, $journal_class);

        $journal_schema->register_class($moniker, $audit_class);

        if ($self->journal_copy_sources) {
            $self->register_class($moniker, $audit_class);
        }
    }
}
140 | |
462d8e70 |
141 | # XXX FIXME deploy is not idempotent :-( |
# Set up a journal from scratch: deploy the journal schema, then seed it from
# the data already in the main schema. Returns the result of
# prepopulate_journal.
sub bootstrap_journal {
    my ($self) = @_;

    $self->journal_schema_deploy();

    return $self->prepopulate_journal();
}
1d09727d |
147 | |
462d8e70 |
# Copy data from the original schema sources into the journal as inserts in
# one changeset, so that later deletes will not fail to be journalled.
#
# Does nothing if the journal already holds at least one ChangeSet row.
# Otherwise, inside a single txn_do, every journalled source (journal_sources,
# or all sources when that is unset) is read in pages of 1,000 rows. For each
# row a ChangeLog entry, an AuditLog entry and an AuditHistory entry are
# created.
#
# Only the first primary key column is recorded as the AuditLog id.
sub prepopulate_journal {
    my $self   = shift;
    my $schema = $self;

    # woah, looks like prepopulate has already run?
    return if $schema->_journal_schema->resultset('ChangeSet')->count != 0;

    # using our own overridden txn_do will create a changeset
    return $schema->txn_do( sub {
        my %j_sources = map { ($_ => 1) } (
            $self->journal_sources
                ? @{ $self->journal_sources }
                : $self->sources
        );

        my $j_schema     = $self->_journal_schema;
        my $changelog_rs = $j_schema->resultset('ChangeLog');
        my $chs_id       = $j_schema->current_changeset;

        foreach my $s_name ($self->sources) {
            next unless $j_sources{$s_name};

            my $from_rs = $schema->resultset($s_name);
            my @pk_cols = $from_rs->result_source->primary_columns;
            my ($pk)    = @pk_cols;
            $from_rs->result_class('DBIx::Class::ResultClass::HashRefInflator');

            # Paging is only reliable over a stable ordering: without an
            # ORDER BY the database may hand back rows in a different order
            # for every page, skipping some rows and repeating others.
            my %page_attrs = (rows => 1_000);
            if (@pk_cols) {
                my $alias = $from_rs->current_source_alias;
                $page_attrs{order_by} = [ map { "$alias.$_" } @pk_cols ];
            }

            my $to_rs  = $j_schema->resultset("${s_name}AuditHistory");
            my $log_rs = $j_schema->resultset("${s_name}AuditLog");

            my $page = 1;
            while (
                my @x = $from_rs->search(undef, {
                    %page_attrs,
                    page => $page++,
                })
            ) {
                # get some number of change log IDs to be generated for this page
                my @log_ids = map { $_->id }
                    $changelog_rs->populate([
                        map { +{ changeset_id => $chs_id } } (0 .. $#x)
                    ]);

                # create the audit log entries for the rows in this page
                $log_rs->populate([
                    map { +{ create_id => $log_ids[$_], id => $x[$_]->{$pk} } } (0 .. $#x)
                ]);

                # now populate the audit history
                $to_rs->populate([
                    map { +{
                        %{ $x[$_] },
                        audit_change_id => $log_ids[$_],
                    } } (0 .. $#x)
                ]);
            }
        }
    });
}
206 | |
548cc9f7 |
# Run $user_code in a transaction, making sure a journal changeset is open
# while it runs.
#
#   $user_code - code ref to run
#   @args      - passed through to the next txn_do in the resolution order
#
# A new changeset is created around $user_code when none is current, or
# always when journal_nested_changesets is enabled. If the journal schema uses
# a different storage than this schema, the code is also wrapped in the
# journal schema's own txn_do.
#
# Returns whatever the next txn_do returns.
sub txn_do {
    my ($self, $user_code, @args) = @_;

    my $jschema        = $self->_journal_schema;
    my $open_changeset = $jschema->_current_changeset;

    my $wrapped = $user_code;

    if (!$open_changeset || $self->journal_nested_changesets) {
        my $container = $jschema->_current_changeset_container;

        if (!$container) {
            # this is a hash because scalar refs can't be localized
            $container = {};
            $jschema->_current_changeset_container($container);
        }

        # Create the changeset first, expose its id for the duration of the
        # user code only, then run the user code.
        $wrapped = sub {
            my $changeset = $jschema->journal_create_changeset(
                parent_id => $open_changeset,
            );
            local $container->{changeset} = $changeset->id;
            $user_code->(@_);
        };
    }

    if ($jschema->storage != $self->storage) {
        # Separate journal storage: give it a transaction of its own as well.
        my $inner = $wrapped;
        $wrapped = sub { $jschema->txn_do($inner, @_) };
    }

    return $self->next::method($wrapped, @args);
}
240 | |
548cc9f7 |
# Get or set the user recorded on the journal schema.
#
# With no argument, returns the journal schema's current_user. With an
# argument (even undef), passes it to current_user as the new value and
# returns the result of that call.
sub changeset_user {
    my $self = shift;

    my $journal = $self->_journal_schema;

    if (@_) {
        my $userid = $_[0];
        return $journal->current_user($userid);
    }

    return $journal->current_user();
}
249 | |
548cc9f7 |
# Get or set the session recorded on the journal schema.
#
# With no argument, returns the journal schema's current_session. With an
# argument (even undef), passes it to current_session as the new value and
# returns the result of that call.
sub changeset_session {
    my $self = shift;

    my $journal = $self->_journal_schema;

    if (@_) {
        my $sessionid = $_[0];
        return $journal->current_session($sessionid);
    }

    return $journal->current_session();
}
258 | |
f0f14c64 |
259 | 1; |