384dc7992777673c3452f2b1b9e5bf4d8c90a531
[dbsrgits/DBIx-Data-Store-old.git] / lib / DBIx / Data / Collection / Set.pm
1 package DBIx::Data::Collection::Set;
2
3 use Moose;
4 use Method::Signatures::Simple;
5 use Data::Perl::Stream::Array;
6 use Data::Perl::Collection::Set;
7 use Scalar::Util qw(weaken refaddr);
8
9 has _inflator => (
10    is => 'ro',
11    handles => {
12       _inflate      => 'inflate',
13       _deflate      => 'deflate',
14       _merge        => 'merge',
15       _refresh      => 'refresh',
16       _deflate_spec => 'deflate_spec',
17       _merge_spec   => 'merge_spec',
18    },
19    default => method {
20       require DBIx::Data::Store::SimpleInflator;
21       my $args = {};
22       $args->{class} = $self->_class if $self->_has_class;
23       DBIx::Data::Store::SimpleInflator->new($args)
24    },
25 );
26
27 has _store => (is => 'ro', required => 1, init_arg => 'store');
28
29 has _class => (is => 'ro', predicate => '_has_class', init_arg => 'class');
30
31 has _set_over => (is => 'ro', required => 1, init_arg => 'set_over');
32
33 ## member cache (all members)
34
35 has _member_cache => (
36   is => 'ro', lazy_build => 1,
37   predicate => '_member_cache_built',
38   writer => '_set_member_cache',
39 );
40
41 method _build__member_cache {
42   my $stream = $self->_new_raw_stream;
43   my @cache;
44   while (my ($raw) = $stream->next) {
45     my $obj = do {
46       if (my ($obj) = $self->_key_cache_get_raw($raw)) {
47         # can't just $self->_merge($obj, $raw) since $obj might have changed
48         $self->_refresh($obj, $raw)
49       } else {
50         $self->_add_to_key_cache($self->_inflate($raw))
51       }
52     };
53     push @cache, $obj;
54   }
55   $self->_notify_observers(all_members => \@cache);
56   \@cache
57 }
58
59 method _add_to_member_cache ($to_add) {
60   return $to_add unless $self->_member_cache_built;
61   push @{$self->_member_cache}, $to_add;
62   $to_add
63 }
64
65 method _remove_from_member_cache ($to_remove) {
66   return $to_remove unless $self->_member_cache_built;
67   @{$self->_member_cache} = grep $_ ne $to_remove, @{$self->_member_cache};
68   $to_remove
69 }
70
71 ## key cache - by primary/unique key
72
73 has _key_cache => (is => 'ro', default => sub { {} });
74
75 method _add_to_key_cache ($to_add) {
76   $self->_key_cache->{$self->_object_to_id($to_add)} = $to_add;
77   $to_add
78 }
79
80 method _remove_from_key_cache ($to_remove) {
81   # should return $to_remove
82   delete $self->_key_cache->{$self->_object_to_id($to_remove)}
83 }
84
85 method _key_cache_has_raw ($raw) {
86   exists $self->_key_cache->{$self->_raw_to_id($raw)}
87 }
88
89 method _key_cache_has_object ($obj) {
90   exists $self->_key_cache->{$self->_object_to_id($obj)}
91 }
92
93 method _key_cache_get_raw ($raw) {
94   $self->_key_cache_get_id($self->_raw_to_id($raw))
95 }
96
97 method _key_cache_get_object ($obj) {
98   $self->_key_cache_get_id($self->_object_to_id($obj))
99 }
100
101 method _key_cache_get_object_spec ($spec) {
102   # see _object_spec_to_id for doc of what the difference is
103   $self->_key_cache_get_id($self->_object_spec_to_id($spec))
104 }
105
106 method _key_cache_get_id ($id) {
107   exists $self->_key_cache->{$id}
108     ? ($self->_key_cache->{$id})
109     : ()
110 }
111
112 method _all_key_cache_members {
113   values %{$self->_key_cache}
114 }
115
116 method _set_key_cache_members ($members) {
117   %{$self->_key_cache} = (map +($self->_object_to_id($_) => $_), @$members);
118   return
119 }
120
121 ## observers
122
123 has _observer_callbacks => (
124   is => 'ro', default => sub { {} },
125 );
126
127 method _notify_observers ($event, $payload) {
128   my $oc = $self->_observer_callbacks;
129   foreach my $refaddr (keys %$oc) {
130     my ($obj, $cb) = @{$oc->{$refaddr}};
131     unless (defined $obj) { # weak ref was garbage collected
132       delete $oc->{$refaddr};
133       next;
134     }
135     $obj->$cb($self, $event, $payload);
136   }
137   $payload
138 }
139
140 method _register_observer ($obj, $cb) {
141   my $entry = [ $obj, $cb ];
142   weaken($entry->[0]);
143   $self->_observer_callbacks->{refaddr($obj)} = $entry;
144   return
145 }
146
147 method _setup_observation_of ($other) {
148   $other->_register_observer($self, method ($from, $event, $payload) {
149     if ($event eq 'add' or $event eq 'get') {
150       $self->_add_to_caches($payload);
151     } elsif ($event eq 'remove') {
152       $self->_remove_from_caches($payload);
153     } elsif ($event eq 'all_members') {
154       # separate arrayref since future add will trigger push()
155       $self->_set_caches([ @$payload ]);
156     }
157   });
158   return
159 }
160
161
162 ## methods to get ids
163
164 method _raw_to_id ($raw) {
165   # XXX must escape this. or do something else.
166   join ';', map $raw->{$_}, @{$self->_set_over}
167 }
168
169 method _object_to_id ($obj) {
170   $self->_raw_to_id($self->_deflate($obj))
171 }
172
173 method _object_spec_to_id ($spec) {
174   # intentionally C&P from _raw_to - this is not the same thing. If a column
175   # were mapped to an attribute of a different name, the raw would have the
176   # column name as a key but an object spec would have the attribute name
177   join ';', map $spec->{$_}, @{$self->_set_over}
178 }
179
180 ## array-ish operations - i.e. get all members
181
182 method _new_raw_stream {
183   $self->_store->new_select_command({})->execute
184 }
185
186 method flatten {
187   @{$self->_member_cache};
188 }
189
190 method as_stream {
191   Data::Perl::Stream::Array->new(array => $self->_member_cache);
192 }
193
194 # theoretically inefficient except that if we're being asked this then
195 # either the data should have been pre-loaded or we're going to get all
196 # elements anyway
197
198 method count {
199   scalar $self->flatten
200 }
201
202 method map ($sub) {
203   Data::Perl::Collection::Set->new(
204     members => [ map $sub->($_), $self->flatten ]
205   )
206 }
207
208 method _set_caches ($members) {
209   $self->_set_member_cache($members);
210   $self->_set_key_cache_members($members);
211   return
212 }
213
214 ## load single row
215
216 method get ($spec) {
217   if (my ($got) = $self->_key_cache_get_object_spec($spec)) {
218     return $got
219   }
220   if (my ($raw) = $self->_get_from_store($self->_deflate_spec($spec))) {
221     return $self->_notify_observers(
222       get => $self->_add_to_key_cache($self->_inflate($raw))
223     );
224   }
225   return undef # we aren't handling cache misses here yet
226 }
227
228 method _get_from_store ($raw) {
229   $self->_store->new_select_single_command($raw)->execute
230 }
231
232 ## add member
233
234 method add ($new) {
235   $self->_add_to_store($new);
236   $self->_add_to_caches($new);
237   $self->_notify_observers(add => $new);
238   $new
239 }
240
241 method _add_to_store ($new) {
242   my $new_raw = $self->_deflate($new);
243   $self->_merge($new, $self->_store->new_insert_command($new_raw)->execute);
244   $new
245 }
246
247 method _add_to_caches ($new) {
248   $self->_add_to_member_cache($new);
249   $self->_add_to_key_cache($new);
250   $new
251 }
252
253 ## remove member
254
255 method remove ($old) {
256   $self->_remove_from_store($old);
257   $self->_remove_from_caches($old);
258   $self->_notify_observers(remove => $old);
259   $old
260 }
261
262 method _remove_from_store ($old) {
263   $self->_store->new_delete_single_command($self->_deflate($old))->execute
264 }
265
266 method _remove_from_caches ($old) {
267   $self->_remove_from_member_cache($old);
268   $self->_remove_from_key_cache($old);
269   $old
270 }
271
272 ## update member
273
274 method _update_in_store ($obj) {
275   # this is currently a call command but we should think about it
276   # being a row command so that we can have RETURNING or other
277   # mechanisms handle things like set-on-update datetime values
278   $self->_store->new_update_single_command($self->_deflate($obj))->execute
279 }
280
281 # I do wonder if we needed _merge_spec or if we'd be better off with
282 # just using the raw merge routine ...
283
284 method _update_set_in_store ($spec) {
285   $self->_store->new_update_command($self->_deflate_spec($spec))->execute;
286   if ($self->_member_cache_built) {
287     my $cache = $self->_member_cache;
288     foreach my $obj (@{$cache}) {
289       $self->_merge_spec($obj, $spec);
290     }
291     $self->_notify_observers(all_members => $cache);
292   }
293   return
294 }
295
296 method _remove_set_from_store {
297   $self->_store->new_delete_command->execute;
298   $self->_set_caches([]);
299   $self->_notify_observers(all_members => []);
300   return
301 }
302
303 1;