add a couple bits and pieces
[dbsrgits/DBIx-Data-Store-old.git] / lib / DBIx / Data / Collection / Set.pm
index ceb1e3f..3e965ba 100644 (file)
@@ -3,18 +3,21 @@ package DBIx::Data::Collection::Set;
 use Moose;
 use Method::Signatures::Simple;
 use Data::Perl::Stream::Array;
+use Data::Perl::Collection::Set;
+use Scalar::Util qw(weaken refaddr);
 
 has _store => (is => 'ro', required => 1, init_arg => 'store');
 
-has _class => (is => 'ro', predicate => '_has_class');
+has _class => (is => 'ro', predicate => '_has_class', init_arg => 'class');
 
 has _set_over => (is => 'ro', required => 1, init_arg => 'set_over');
 
 ## member cache (all members)
 
 has _member_cache => (
-  is => 'rw', lazy_build => 1,
+  is => 'ro', lazy_build => 1,
   predicate => '_member_cache_built',
+  writer => '_set_member_cache',
 );
 
 method _build__member_cache {
@@ -23,13 +26,15 @@ method _build__member_cache {
   while (my ($raw) = $stream->next) {
     my $obj = do {
       if (my ($obj) = $self->_key_cache_get_raw($raw)) {
-        $self->_merge($obj, $raw)
+        # can't just $self->_merge($obj, $raw) since $obj might have changed
+        $self->_refresh($obj, $raw)
       } else {
         $self->_add_to_key_cache($self->_inflate($raw))
       }
     };
     push @cache, $obj;
   }
+  $self->_notify_observers(all_members => \@cache);
   \@cache
 }
 
@@ -86,6 +91,55 @@ method _key_cache_get_id ($id) {
     : ()
 }
 
+method _all_key_cache_members {
+  values %{$self->_key_cache}
+}
+
+method _set_key_cache_members ($members) {
+  %{$self->_key_cache} = (map +($self->_object_to_id($_) => $_), @$members);
+  return
+}
+
+## observers
+
+has _observer_callbacks => (
+  is => 'ro', default => sub { {} },
+);
+
+method _notify_observers ($event, $payload) {
+  my $oc = $self->_observer_callbacks;
+  foreach my $refaddr (keys %$oc) {
+    my ($obj, $cb) = @{$oc->{$refaddr}};
+    unless (defined $obj) { # weak ref was garbage collected
+      delete $oc->{$refaddr};
+      next;
+    }
+    $obj->$cb($self, $event, $payload);
+  }
+  $payload
+}
+
+method _register_observer ($obj, $cb) {
+  my $entry = [ $obj, $cb ];
+  weaken($entry->[0]);
+  $self->_observer_callbacks->{refaddr($obj)} = $entry;
+  return
+}
+
+method _setup_observation_of ($other) {
+  $other->_register_observer($self, method ($from, $event, $payload) {
+    if ($event eq 'add' or $event eq 'get') {
+      $self->_add_to_caches($payload);
+    } elsif ($event eq 'remove') {
+      $self->_remove_from_caches($payload);
+    } elsif ($event eq 'all_members') {
+      # separate arrayref since future add will trigger push()
+      $self->_set_caches([ @$payload ]);
+    }
+  });
+  return
+}
+
 ## thunking between the store representation and the set representation
 #
 # _inflate is raw data -> final repr
@@ -94,6 +148,7 @@ method _key_cache_get_id ($id) {
 #    (this is used for pk-generated values and later lazy loading)
 #
 # _deflate_spec is attributes of final repr -> raw data
+# _merge_spec is final repr + extra attributes and update repr
 
 method _inflate ($raw) {
   bless($raw, $self->_class) if $self->_has_class;
@@ -109,10 +164,23 @@ method _merge ($obj, $raw) {
   $obj
 }
 
+method _refresh ($obj, $raw) {
+  # if $obj has been changed but not flushed we'd destroy data doing
+  # a blind merge - but if $obj has change tracking of some sort then
+  # we -could- do something safely, so this method exists to be mangled
+  # by subclasses
+  $obj
+}
+
 method _deflate_spec ($spec) {
   $spec
 }
 
+method _merge_spec ($obj, $spec) {
+  @{$obj}{keys %$spec} = values %$spec;
+  $obj
+}
+
 ## methods to get ids
 
 method _raw_to_id ($raw) {
@@ -134,7 +202,7 @@ method _object_spec_to_id ($spec) {
 ## array-ish operations - i.e. get all members
 
 method _new_raw_stream {
-  $self->_store->new_select_command([])->execute
+  $self->_store->new_select_command({})->execute
 }
 
 method flatten {
@@ -145,6 +213,26 @@ method as_stream {
   Data::Perl::Stream::Array->new(array => $self->_member_cache);
 }
 
+# theoretically inefficient except that if we're being asked this then
+# either the data should have been pre-loaded or we're going to get all
+# elements anyway
+
+method count {
+  scalar $self->flatten
+}
+
+method map ($sub) {
+  Data::Perl::Collection::Set->new(
+    members => [ map $sub->($_), $self->flatten ]
+  )
+}
+
+method _set_caches ($members) {
+  $self->_set_member_cache($members);
+  $self->_set_key_cache_members($members);
+  return
+}
+
 ## load single row
 
 method get ($spec) {
@@ -152,7 +240,9 @@ method get ($spec) {
     return $got
   }
   if (my ($raw) = $self->_get_from_store($self->_deflate_spec($spec))) {
-    return $self->_add_to_key_cache($self->_inflate($raw))
+    return $self->_notify_observers(
+      get => $self->_add_to_key_cache($self->_inflate($raw))
+    );
   }
   return undef # we aren't handling cache misses here yet
 }
@@ -161,11 +251,12 @@ method _get_from_store ($raw) {
   $self->_store->new_select_single_command($raw)->execute
 }
 
-## add to set
+## add member
 
 method add ($new) {
   $self->_add_to_store($new);
   $self->_add_to_caches($new);
+  $self->_notify_observers(add => $new);
   $new
 }
 
@@ -181,16 +272,17 @@ method _add_to_caches ($new) {
   $new
 }
 
-## remove from set
+## remove member
 
 method remove ($old) {
   $self->_remove_from_store($old);
   $self->_remove_from_caches($old);
+  $self->_notify_observers(remove => $old);
   $old
 }
 
 method _remove_from_store ($old) {
-  $self->_store->new_delete_command($self->_deflate($old))->execute
+  $self->_store->new_delete_single_command($self->_deflate($old))->execute
 }
 
 method _remove_from_caches ($old) {
@@ -199,13 +291,35 @@ method _remove_from_caches ($old) {
   $old
 }
 
-## update
+## update member
 
 method _update_in_store ($obj) {
   # this is currently a call command but we should think about it
   # being a row command so that we can have RETURNING or other
   # mechanisms handle things like set-on-update datetime values
-  $self->_store->new_update_command($self->_deflate($obj))->execute
+  $self->_store->new_update_single_command($self->_deflate($obj))->execute
+}
+
+# I do wonder if we needed _merge_spec or if we'd be better off with
+# just using the raw merge routine ...
+
+method _update_set_in_store ($spec) {
+  $self->_store->new_update_command($self->_deflate_spec($spec))->execute;
+  if ($self->_member_cache_built) {
+    my $cache = $self->_member_cache;
+    foreach my $obj (@{$cache}) {
+      $self->_merge_spec($obj, $spec);
+    }
+    $self->_notify_observers(all_members => $cache);
+  }
+  return
+}
+
+method _remove_set_from_store {
+  $self->_store->new_delete_command->execute;
+  $self->_set_caches([]);
+  $self->_notify_observers(all_members => []);
+  return
 }
 
 1;