add a couple bits and pieces
[dbsrgits/DBIx-Data-Store-old.git] / lib / DBIx / Data / Collection / Set.pm
1 package DBIx::Data::Collection::Set;
2
3 use Moose;
4 use Method::Signatures::Simple;
5 use Data::Perl::Stream::Array;
6 use Data::Perl::Collection::Set;
7 use Scalar::Util qw(weaken refaddr);
8
9 has _store => (is => 'ro', required => 1, init_arg => 'store');
10
11 has _class => (is => 'ro', predicate => '_has_class', init_arg => 'class');
12
13 has _set_over => (is => 'ro', required => 1, init_arg => 'set_over');
14
15 ## member cache (all members)
16
17 has _member_cache => (
18   is => 'ro', lazy_build => 1,
19   predicate => '_member_cache_built',
20   writer => '_set_member_cache',
21 );
22
23 method _build__member_cache {
24   my $stream = $self->_new_raw_stream;
25   my @cache;
26   while (my ($raw) = $stream->next) {
27     my $obj = do {
28       if (my ($obj) = $self->_key_cache_get_raw($raw)) {
29         # can't just $self->_merge($obj, $raw) since $obj might have changed
30         $self->_refresh($obj, $raw)
31       } else {
32         $self->_add_to_key_cache($self->_inflate($raw))
33       }
34     };
35     push @cache, $obj;
36   }
37   $self->_notify_observers(all_members => \@cache);
38   \@cache
39 }
40
41 method _add_to_member_cache ($to_add) {
42   return $to_add unless $self->_member_cache_built;
43   push @{$self->_member_cache}, $to_add;
44   $to_add
45 }
46
47 method _remove_from_member_cache ($to_remove) {
48   return $to_remove unless $self->_member_cache_built;
49   @{$self->_member_cache} = grep $_ ne $to_remove, @{$self->_member_cache};
50   $to_remove
51 }
52
53 ## key cache - by primary/unique key
54
55 has _key_cache => (is => 'ro', default => sub { {} });
56
57 method _add_to_key_cache ($to_add) {
58   $self->_key_cache->{$self->_object_to_id($to_add)} = $to_add;
59   $to_add
60 }
61
62 method _remove_from_key_cache ($to_remove) {
63   # should return $to_remove
64   delete $self->_key_cache->{$self->_object_to_id($to_remove)}
65 }
66
67 method _key_cache_has_raw ($raw) {
68   exists $self->_key_cache->{$self->_raw_to_id($raw)}
69 }
70
71 method _key_cache_has_object ($obj) {
72   exists $self->_key_cache->{$self->_object_to_id($obj)}
73 }
74
75 method _key_cache_get_raw ($raw) {
76   $self->_key_cache_get_id($self->_raw_to_id($raw))
77 }
78
79 method _key_cache_get_object ($obj) {
80   $self->_key_cache_get_id($self->_object_to_id($obj))
81 }
82
83 method _key_cache_get_object_spec ($spec) {
84   # see _object_spec_to_id for doc of what the difference is
85   $self->_key_cache_get_id($self->_object_spec_to_id($spec))
86 }
87
88 method _key_cache_get_id ($id) {
89   exists $self->_key_cache->{$id}
90     ? ($self->_key_cache->{$id})
91     : ()
92 }
93
94 method _all_key_cache_members {
95   values %{$self->_key_cache}
96 }
97
98 method _set_key_cache_members ($members) {
99   %{$self->_key_cache} = (map +($self->_object_to_id($_) => $_), @$members);
100   return
101 }
102
103 ## observers
104
105 has _observer_callbacks => (
106   is => 'ro', default => sub { {} },
107 );
108
109 method _notify_observers ($event, $payload) {
110   my $oc = $self->_observer_callbacks;
111   foreach my $refaddr (keys %$oc) {
112     my ($obj, $cb) = @{$oc->{$refaddr}};
113     unless (defined $obj) { # weak ref was garbage collected
114       delete $oc->{$refaddr};
115       next;
116     }
117     $obj->$cb($self, $event, $payload);
118   }
119   $payload
120 }
121
122 method _register_observer ($obj, $cb) {
123   my $entry = [ $obj, $cb ];
124   weaken($entry->[0]);
125   $self->_observer_callbacks->{refaddr($obj)} = $entry;
126   return
127 }
128
129 method _setup_observation_of ($other) {
130   $other->_register_observer($self, method ($from, $event, $payload) {
131     if ($event eq 'add' or $event eq 'get') {
132       $self->_add_to_caches($payload);
133     } elsif ($event eq 'remove') {
134       $self->_remove_from_caches($payload);
135     } elsif ($event eq 'all_members') {
136       # separate arrayref since future add will trigger push()
137       $self->_set_caches([ @$payload ]);
138     }
139   });
140   return
141 }
142
143 ## thunking between the store representation and the set representation
144 #
145 # _inflate is raw data -> final repr
146 # _deflate is final repr -> raw data
147 # _merge takes final repr + raw data and updates the repr
148 #    (this is used for pk-generated values and later lazy loading)
149 #
150 # _deflate_spec is attributes of final repr -> raw data
151 # _merge_spec is final repr + extra attributes and update repr
152
153 method _inflate ($raw) {
154   bless($raw, $self->_class) if $self->_has_class;
155   $raw
156 }
157
158 method _deflate ($obj) {
159   +{ %$obj }
160 }
161
162 method _merge ($obj, $raw) {
163   @{$obj}{keys %$raw} = values %$raw;
164   $obj
165 }
166
167 method _refresh ($obj, $raw) {
168   # if $obj has been changed but not flushed we'd destroy data doing
169   # a blind merge - but if $obj has change tracking of some sort then
170   # we -could- do something safely, so this method exists to be mangled
171   # by subclasses
172   $obj
173 }
174
175 method _deflate_spec ($spec) {
176   $spec
177 }
178
179 method _merge_spec ($obj, $spec) {
180   @{$obj}{keys %$spec} = values %$spec;
181   $obj
182 }
183
184 ## methods to get ids
185
186 method _raw_to_id ($raw) {
187   # XXX must escape this. or do something else.
188   join ';', map $raw->{$_}, @{$self->_set_over}
189 }
190
191 method _object_to_id ($obj) {
192   $self->_raw_to_id($self->_deflate($obj))
193 }
194
195 method _object_spec_to_id ($spec) {
196   # intentionally C&P from _raw_to - this is not the same thing. If a column
197   # were mapped to an attribute of a different name, the raw would have the
198   # column name as a key but an object spec would have the attribute name
199   join ';', map $spec->{$_}, @{$self->_set_over}
200 }
201
202 ## array-ish operations - i.e. get all members
203
204 method _new_raw_stream {
205   $self->_store->new_select_command({})->execute
206 }
207
208 method flatten {
209   @{$self->_member_cache};
210 }
211
212 method as_stream {
213   Data::Perl::Stream::Array->new(array => $self->_member_cache);
214 }
215
216 # theoretically inefficient except that if we're being asked this then
217 # either the data should have been pre-loaded or we're going to get all
218 # elements anyway
219
220 method count {
221   scalar $self->flatten
222 }
223
224 method map ($sub) {
225   Data::Perl::Collection::Set->new(
226     members => [ map $sub->($_), $self->flatten ]
227   )
228 }
229
230 method _set_caches ($members) {
231   $self->_set_member_cache($members);
232   $self->_set_key_cache_members($members);
233   return
234 }
235
236 ## load single row
237
238 method get ($spec) {
239   if (my ($got) = $self->_key_cache_get_object_spec($spec)) {
240     return $got
241   }
242   if (my ($raw) = $self->_get_from_store($self->_deflate_spec($spec))) {
243     return $self->_notify_observers(
244       get => $self->_add_to_key_cache($self->_inflate($raw))
245     );
246   }
247   return undef # we aren't handling cache misses here yet
248 }
249
250 method _get_from_store ($raw) {
251   $self->_store->new_select_single_command($raw)->execute
252 }
253
254 ## add member
255
256 method add ($new) {
257   $self->_add_to_store($new);
258   $self->_add_to_caches($new);
259   $self->_notify_observers(add => $new);
260   $new
261 }
262
263 method _add_to_store ($new) {
264   my $new_raw = $self->_deflate($new);
265   $self->_merge($new, $self->_store->new_insert_command($new_raw)->execute);
266   $new
267 }
268
269 method _add_to_caches ($new) {
270   $self->_add_to_member_cache($new);
271   $self->_add_to_key_cache($new);
272   $new
273 }
274
275 ## remove member
276
277 method remove ($old) {
278   $self->_remove_from_store($old);
279   $self->_remove_from_caches($old);
280   $self->_notify_observers(remove => $old);
281   $old
282 }
283
284 method _remove_from_store ($old) {
285   $self->_store->new_delete_single_command($self->_deflate($old))->execute
286 }
287
288 method _remove_from_caches ($old) {
289   $self->_remove_from_member_cache($old);
290   $self->_remove_from_key_cache($old);
291   $old
292 }
293
294 ## update member
295
296 method _update_in_store ($obj) {
297   # this is currently a call command but we should think about it
298   # being a row command so that we can have RETURNING or other
299   # mechanisms handle things like set-on-update datetime values
300   $self->_store->new_update_single_command($self->_deflate($obj))->execute
301 }
302
303 # I do wonder if we needed _merge_spec or if we'd be better off with
304 # just using the raw merge routine ...
305
306 method _update_set_in_store ($spec) {
307   $self->_store->new_update_command($self->_deflate_spec($spec))->execute;
308   if ($self->_member_cache_built) {
309     my $cache = $self->_member_cache;
310     foreach my $obj (@{$cache}) {
311       $self->_merge_spec($obj, $spec);
312     }
313     $self->_notify_observers(all_members => $cache);
314   }
315   return
316 }
317
318 method _remove_set_from_store {
319   $self->_store->new_delete_command->execute;
320   $self->_set_caches([]);
321   $self->_notify_observers(all_members => []);
322   return
323 }
324
325 1;