Refactor count handling - use a different code path depending on the complexity of...
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / ResultSet.pm
index 7611945..22c7237 100644 (file)
@@ -796,19 +796,16 @@ sub _collapse_query {
   if (ref $query eq 'ARRAY') {
     foreach my $subquery (@$query) {
       next unless ref $subquery;  # -or
-#      warn "ARRAY: " . Dumper $subquery;
       $collapsed = $self->_collapse_query($subquery, $collapsed);
     }
   }
   elsif (ref $query eq 'HASH') {
     if (keys %$query and (keys %$query)[0] eq '-and') {
       foreach my $subquery (@{$query->{-and}}) {
-#        warn "HASH: " . Dumper $subquery;
         $collapsed = $self->_collapse_query($subquery, $collapsed);
       }
     }
     else {
-#      warn "LEAF: " . Dumper $query;
       foreach my $col (keys %$query) {
         my $value = $query->{$col};
         $collapsed->{$col}{$value}++;
@@ -1153,11 +1150,45 @@ on the resultset and counts the results of that.
 
 =cut
 
+my @count_via_subq_attrs = qw/join seen_join group_by/;
 sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
-  my $count = $self->_count;
+
+  my @check_attrs = @count_via_subq_attrs;
+
+  # if we are not paged - we are simply asking for a limit
+  if (not $self->{attrs}{page} and not $self->{attrs}{software_limit}) {
+    push @check_attrs, qw/rows offset/;
+  }
+
+  return $self->_has_attr (@check_attrs)
+    ? $self->_count_subq
+    : $self->_count_simple
+}
+
+sub _count_subq {
+  my $self = shift;
+
+  my $attrs = { %{$self->_resolved_attrs} };
+
+  my $select_cols = $attrs->{group_by} || [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
+  $attrs->{from} = [{
+    count_subq => $self->search ({}, { columns => $select_cols, group_by => $select_cols })
+                         ->as_query
+  }];
+
+  # the subquery above will integrate everything, including 'where' and any pagers
+  delete $attrs->{$_} for (@count_via_subq_attrs, qw/where rows offset pager page/ );
+
+  return $self->__count ($attrs);
+}
+
+sub _count_simple {
+  my $self = shift;
+
+  my $count = $self->__count;
   return 0 unless $count;
 
   # need to take offset from resolved attrs
@@ -1169,26 +1200,20 @@ sub count {
   return $count;
 }
 
-sub _count { # Separated out so pager can get the full count
-  my $self = shift;
-  my $attrs = { %{$self->_resolved_attrs} };
+sub __count {
+  my ($self, $attrs) = @_;
 
-  if (my $group_by = $attrs->{group_by}) {
-    delete $attrs->{order_by};
-
-    $attrs->{select} = $group_by; 
-    $attrs->{from} = [ { 'mesub' => (ref $self)->new($self->result_source, $attrs)->cursor->as_query } ];
-    delete $attrs->{where};
-  }
+  $attrs ||= { %{$self->{attrs}} };
 
   $attrs->{select} = { count => '*' };
   $attrs->{as} = [qw/count/];
 
-  # offset, order by, group by, where and page are not needed to count. record_filter is cdbi
-  delete $attrs->{$_} for qw/rows offset order_by group_by page pager record_filter/;
+  # take off any pagers, record_filter is cdbi, and no point of ordering a count
+  delete $attrs->{$_} for qw/rows offset page pager order_by record_filter/;
 
   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
   my ($count) = $tmp_rs->cursor->next;
+
   return $count;
 }
 
@@ -1315,10 +1340,58 @@ sub _cond_for_update_delete {
   # No-op. No condition, we're updating/deleting everything
   return $cond unless ref $full_cond;
 
-  foreach my $pk ($self->result_source->primary_columns) {
+  # Some attributes when present require a subquery
+  # This might not work on some database (mysql), but...
+  # it won't work without the subquery either so who cares
+  if ($self->_has_attr (qw/join seen_join group_by row offset page/) ) {
+
+    foreach my $pk ($self->result_source->primary_columns) {
       $cond->{$pk} = { -in => $self->get_column($pk)->as_query };
+    }
+
+    return $cond;
   }
 
+  if (ref $full_cond eq 'ARRAY') {
+    $cond = [
+      map {
+        my %hash;
+        foreach my $key (keys %{$_}) {
+          $key =~ /([^.]+)$/;
+          $hash{$1} = $_->{$key};
+        }
+        \%hash;
+      } @{$full_cond}
+    ];
+  }
+  elsif (ref $full_cond eq 'HASH') {
+    if ((keys %{$full_cond})[0] eq '-and') {
+      $cond->{-and} = [];
+      my @cond = @{$full_cond->{-and}};
+       for (my $i = 0; $i < @cond; $i++) {
+        my $entry = $cond[$i];
+        my $hash;
+        if (ref $entry eq 'HASH') {
+          $hash = $self->_cond_for_update_delete($entry);
+        }
+        else {
+          $entry =~ /([^.]+)$/;
+          $hash->{$1} = $cond[++$i];
+        }
+        push @{$cond->{-and}}, $hash;
+      }
+    }
+    else {
+      foreach my $key (keys %{$full_cond}) {
+        $key =~ /([^.]+)$/;
+        $cond->{$1} = $full_cond->{$key};
+      }
+    }
+  }
+  else {
+    $self->throw_exception("Can't update/delete on resultset with condition unless hash or array");
+  }
   return $cond;
 }
 
@@ -1345,7 +1418,7 @@ sub update {
     unless ref $values eq 'HASH';
 
   my $cond = $self->_cond_for_update_delete;
-  
+
   return $self->result_source->storage->update(
     $self->result_source, $values, $cond
   );
@@ -1627,7 +1700,7 @@ sub pager {
     unless $self->{attrs}{page};
   $attrs->{rows} ||= 10;
   return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{attrs}{page});
+    $self->__count, $attrs->{rows}, $self->{attrs}{page});
 }
 
 =head2 page
@@ -1731,6 +1804,37 @@ sub _is_deterministic_value {
   return 0;
 }
 
+# _has_attr
+#
+# determines if the resultset defines at least one
+# of the attributes supplied
+#
+# used to determine if a subquery is neccessary
+
+sub _has_attr {
+  my ($self, @attr_names) = @_;
+
+  my $attrs = $self->_resolved_attrs;
+
+  my $join_check_req;
+
+  for my $n (@attr_names) {
+    return 1 if defined $attrs->{$n};
+    ++$join_check_req if $n =~ /join/;
+  }
+
+  # a join can be expressed as a multi-level from
+  return 1 if (
+    $join_check_req
+      and
+    ref $attrs->{from} eq 'ARRAY'
+      and
+    @{$attrs->{from}} > 1 
+  );
+
+  return 0;
+}
+
 # _collapse_cond
 #
 # Recursively collapse the condition.
@@ -1743,19 +1847,16 @@ sub _collapse_cond {
   if (ref $cond eq 'ARRAY') {
     foreach my $subcond (@$cond) {
       next unless ref $subcond;  # -or
-#      warn "ARRAY: " . Dumper $subcond;
       $collapsed = $self->_collapse_cond($subcond, $collapsed);
     }
   }
   elsif (ref $cond eq 'HASH') {
     if (keys %$cond and (keys %$cond)[0] eq '-and') {
       foreach my $subcond (@{$cond->{-and}}) {
-#        warn "HASH: " . Dumper $subcond;
         $collapsed = $self->_collapse_cond($subcond, $collapsed);
       }
     }
     else {
-#      warn "LEAF: " . Dumper $cond;
       foreach my $col (keys %$cond) {
         my $value = $cond->{$col};
         $collapsed->{$col} = $value;