use Moose;
use Method::Signatures::Simple;
use Data::Perl::Stream::Array;
+use Scalar::Util qw(weaken refaddr);
has _store => (is => 'ro', required => 1, init_arg => 'store');
-has _class => (is => 'ro', predicate => '_has_class');
+has _class => (is => 'ro', predicate => '_has_class', init_arg => 'class');
-has _member_cache => (is => 'rw', lazy_build => 1);
+has _set_over => (is => 'ro', required => 1, init_arg => 'set_over');
+
+## member cache (all members)
+
+has _member_cache => (
+ is => 'ro', lazy_build => 1,
+ predicate => '_member_cache_built',
+ writer => '_set_member_cache',
+);
method _build__member_cache {
my $stream = $self->_new_raw_stream;
my @cache;
while (my ($raw) = $stream->next) {
- push @cache, $self->_inflate($raw);
+ my $obj = do {
+ if (my ($obj) = $self->_key_cache_get_raw($raw)) {
+ # can't just $self->_merge($obj, $raw) since $obj might have changed
+ $self->_refresh($obj, $raw)
+ } else {
+ $self->_add_to_key_cache($self->_inflate($raw))
+ }
+ };
+ push @cache, $obj;
}
- \@cache;
+ $self->_notify_observers(all_members => \@cache);
+ \@cache
}
-method _new_raw_stream {
- $self->_store->new_select_command([])->execute;
+method _add_to_member_cache ($to_add) {
+ return $to_add unless $self->_member_cache_built;
+ push @{$self->_member_cache}, $to_add;
+ $to_add
+}
+
+method _remove_from_member_cache ($to_remove) {
+ return $to_remove unless $self->_member_cache_built;
+ @{$self->_member_cache} = grep $_ ne $to_remove, @{$self->_member_cache};
+ $to_remove
+}
+
+## key cache - by primary/unique key
+
+has _key_cache => (is => 'ro', default => sub { {} });
+
+method _add_to_key_cache ($to_add) {
+ $self->_key_cache->{$self->_object_to_id($to_add)} = $to_add;
+ $to_add
+}
+
+method _remove_from_key_cache ($to_remove) {
+ # should return $to_remove
+ delete $self->_key_cache->{$self->_object_to_id($to_remove)}
+}
+
+method _key_cache_has_raw ($raw) {
+ exists $self->_key_cache->{$self->_raw_to_id($raw)}
+}
+
+method _key_cache_has_object ($obj) {
+ exists $self->_key_cache->{$self->_object_to_id($obj)}
}
+method _key_cache_get_raw ($raw) {
+ $self->_key_cache_get_id($self->_raw_to_id($raw))
+}
+
+method _key_cache_get_object ($obj) {
+ $self->_key_cache_get_id($self->_object_to_id($obj))
+}
+
+method _key_cache_get_object_spec ($spec) {
+ # see _object_spec_to_id for doc of what the difference is
+ $self->_key_cache_get_id($self->_object_spec_to_id($spec))
+}
+
+method _key_cache_get_id ($id) {
+ exists $self->_key_cache->{$id}
+ ? ($self->_key_cache->{$id})
+ : ()
+}
+
+method _all_key_cache_members {
+ values %{$self->_key_cache}
+}
+
+## observers
+
+has _observer_callbacks => (
+ is => 'ro', default => sub { {} },
+);
+
+method _notify_observers ($event, $payload) {
+ my $oc = $self->_observer_callbacks;
+ foreach my $refaddr (keys %$oc) {
+ my ($obj, $cb) = @{$oc->{$refaddr}};
+ unless (defined $obj) { # weak ref was garbage collected
+ delete $oc->{$refaddr};
+ next;
+ }
+ $obj->$cb($self, $event, $payload);
+ }
+ $payload
+}
+
+method _register_observer ($obj, $cb) {
+ my $entry = [ $obj, $cb ];
+ weaken($entry->[0]);
+ $self->_observer_callbacks->{refaddr($obj)} = $entry;
+ return
+}
+
+method _setup_observation_of ($other) {
+ $other->_register_observer($self, method ($from, $event, $payload) {
+ if ($event eq 'add' or $event eq 'get') {
+ $self->_add_to_caches($payload);
+ } elsif ($event eq 'remove') {
+ $self->_remove_from_caches($payload);
+ } elsif ($event eq 'all_members') {
+ # separate arrayref since future add will trigger push()
+ $self->_set_member_cache([ @$payload ]);
+ }
+ });
+ return
+}
+
+## thunking between the store representation and the set representation
+#
+# _inflate is raw data -> final repr
+# _deflate is final repr -> raw data
+# _merge takes final repr + raw data and updates the repr
+# (this is used for pk-generated values and later lazy loading)
+#
+# _deflate_spec is attributes of final repr -> raw data
+
method _inflate ($raw) {
bless($raw, $self->_class) if $self->_has_class;
- $raw;
+ $raw
+}
+
+method _deflate ($obj) {
+ +{ %$obj }
+}
+
+method _merge ($obj, $raw) {
+ @{$obj}{keys %$raw} = values %$raw;
+ $obj
+}
+
+method _refresh ($obj, $raw) {
+ # if $obj has been changed but not flushed we'd destroy data doing
+ # a blind merge - but if $obj has change tracking of some sort then
+ # we -could- do something safely, so this method exists to be mangled
+ # by subclasses
+ $obj
+}
+
+method _deflate_spec ($spec) {
+ $spec
+}
+
+## methods to get ids
+
+method _raw_to_id ($raw) {
+ # XXX must escape this. or do something else.
+ join ';', map $raw->{$_}, @{$self->_set_over}
+}
+
+method _object_to_id ($obj) {
+ $self->_raw_to_id($self->_deflate($obj))
+}
+
+method _object_spec_to_id ($spec) {
+ # intentionally C&P from _raw_to - this is not the same thing. If a column
+ # were mapped to an attribute of a different name, the raw would have the
+ # column name as a key but an object spec would have the attribute name
+ join ';', map $spec->{$_}, @{$self->_set_over}
+}
+
+## array-ish operations - i.e. get all members
+
+method _new_raw_stream {
+ $self->_store->new_select_command([])->execute
}
method flatten {
Data::Perl::Stream::Array->new(array => $self->_member_cache);
}
+## load single row
+
+method get ($spec) {
+ if (my ($got) = $self->_key_cache_get_object_spec($spec)) {
+ return $got
+ }
+ if (my ($raw) = $self->_get_from_store($self->_deflate_spec($spec))) {
+ return $self->_notify_observers(
+ get => $self->_add_to_key_cache($self->_inflate($raw))
+ );
+ }
+ return undef # we aren't handling cache misses here yet
+}
+
+method _get_from_store ($raw) {
+ $self->_store->new_select_single_command($raw)->execute
+}
+
+## add to set
+
+method add ($new) {
+ $self->_add_to_store($new);
+ $self->_add_to_caches($new);
+ $self->_notify_observers(add => $new);
+ $new
+}
+
+method _add_to_store ($new) {
+ my $new_raw = $self->_deflate($new);
+ $self->_merge($new, $self->_store->new_insert_command($new_raw)->execute);
+ $new
+}
+
+method _add_to_caches ($new) {
+ $self->_add_to_member_cache($new);
+ $self->_add_to_key_cache($new);
+ $new
+}
+
+## remove from set
+
+method remove ($old) {
+ $self->_remove_from_store($old);
+ $self->_remove_from_caches($old);
+ $self->_notify_observers(remove => $old);
+ $old
+}
+
+method _remove_from_store ($old) {
+ $self->_store->new_delete_command($self->_deflate($old))->execute
+}
+
+method _remove_from_caches ($old) {
+ $self->_remove_from_member_cache($old);
+ $self->_remove_from_key_cache($old);
+ $old
+}
+
+## update
+
+method _update_in_store ($obj) {
+ # this is currently a call command but we should think about it
+ # being a row command so that we can have RETURNING or other
+ # mechanisms handle things like set-on-update datetime values
+ $self->_store->new_update_command($self->_deflate($obj))->execute
+}
+
1;