Merge 'trunk' into 'joined_count'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 00fc70c..079e0e4 100644 (file)
@@ -307,7 +307,7 @@ sub search_rs {
   my $new_attrs = { %{$our_attrs}, %{$attrs} };
 
   # merge new attrs into inherited
-  foreach my $key (qw/join prefetch +select +as/) {
+  foreach my $key (qw/join prefetch +select +as bind/) {
     next unless exists $attrs->{$key};
     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
   }
@@ -1148,19 +1148,47 @@ Performs an SQL C<COUNT> with the same query as the resultset was built
 with to find the number of elements. If passed arguments, does a search
 on the resultset and counts the results of that.
 
-Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
-using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
-not support C<DISTINCT> with multiple columns. If you are using such a
-database, you should only use columns from the main table in your C<group_by>
-clause.
-
 =cut
 
+my @count_via_subq_attrs = qw/join seen_join group_by/;
 sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
-  my $count = $self->_count;
+
+  my @check_attrs = @count_via_subq_attrs;
+
+  # if we are not paged - we are simply asking for a limit
+  if (not $self->{attrs}{page} and not $self->{attrs}{software_limit}) {
+    push @check_attrs, qw/rows offset/;
+  }
+
+  return $self->_has_attr (@check_attrs)
+    ? $self->_count_subq
+    : $self->_count_simple
+}
+
+sub _count_subq {
+  my $self = shift;
+
+  my $attrs = { %{$self->_resolved_attrs} };
+
+  my $select_cols = $attrs->{group_by} || [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
+  $attrs->{from} = [{
+    count_subq => $self->search ({}, { columns => $select_cols, group_by => $select_cols })
+                         ->as_query
+  }];
+
+  # the subquery above will integrate everything, including 'where' and any pagers
+  delete $attrs->{$_} for (@count_via_subq_attrs, qw/where rows offset pager page/ );
+
+  return $self->__count ($attrs);
+}
+
+sub _count_simple {
+  my $self = shift;
+
+  my $count = $self->__count;
   return 0 unless $count;
 
   # need to take offset from resolved attrs
@@ -1172,37 +1200,20 @@ sub count {
   return $count;
 }
 
-sub _count { # Separated out so pager can get the full count
-  my $self = shift;
-  my $select = { count => '*' };
-
-  my $attrs = { %{$self->_resolved_attrs} };
-  if (my $group_by = delete $attrs->{group_by}) {
-    delete $attrs->{having};
-    my @distinct = (ref $group_by ?  @$group_by : ($group_by));
-    # todo: try CONCAT for multi-column pk
-    my @pk = $self->result_source->primary_columns;
-    if (@pk == 1) {
-      my $alias = $attrs->{alias};
-      foreach my $column (@distinct) {
-        if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
-          @distinct = ($column);
-          last;
-        }
-      }
-    }
+sub __count {
+  my ($self, $attrs) = @_;
 
-    $select = { count => { distinct => \@distinct } };
-  }
+  $attrs ||= { %{$self->{attrs}} };
 
-  $attrs->{select} = $select;
+  $attrs->{select} = { count => '*' };
   $attrs->{as} = [qw/count/];
 
-  # offset, order by and page are not needed to count. record_filter is cdbi
-  delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
+  # take off any pagers, record_filter is cdbi, and no point of ordering a count
+  delete $attrs->{$_} for qw/rows offset page pager order_by record_filter/;
 
   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
   my ($count) = $tmp_rs->cursor->next;
+
   return $count;
 }
 
@@ -1315,6 +1326,18 @@ sub first {
   return $_[0]->reset->next;
 }
 
+
+# _update_delete_via_subq
+#
+# Presence of some rs attributes requires a subquery to reliably 
+# update/deletre
+#
+
+sub _update_delete_via_subq {
+  return $_[0]->_has_attr (qw/join seen_join group_by row offset page/);
+}
+
+
 # _cond_for_update_delete
 #
 # update/delete require the condition to be modified to handle
@@ -1329,8 +1352,44 @@ sub _cond_for_update_delete {
   # No-op. No condition, we're updating/deleting everything
   return $cond unless ref $full_cond;
 
-  foreach my $pk ($self->result_source->primary_columns) {
-      $cond->{$pk} = { -in => $self->get_column($pk)->as_query };
+  if (ref $full_cond eq 'ARRAY') {
+    $cond = [
+      map {
+        my %hash;
+        foreach my $key (keys %{$_}) {
+          $key =~ /([^.]+)$/;
+          $hash{$1} = $_->{$key};
+        }
+        \%hash;
+      } @{$full_cond}
+    ];
+  }
+  elsif (ref $full_cond eq 'HASH') {
+    if ((keys %{$full_cond})[0] eq '-and') {
+      $cond->{-and} = [];
+      my @cond = @{$full_cond->{-and}};
+       for (my $i = 0; $i < @cond; $i++) {
+        my $entry = $cond[$i];
+        my $hash;
+        if (ref $entry eq 'HASH') {
+          $hash = $self->_cond_for_update_delete($entry);
+        }
+        else {
+          $entry =~ /([^.]+)$/;
+          $hash->{$1} = $cond[++$i];
+        }
+        push @{$cond->{-and}}, $hash;
+      }
+    }
+    else {
+      foreach my $key (keys %{$full_cond}) {
+        $key =~ /([^.]+)$/;
+        $cond->{$1} = $full_cond->{$key};
+      }
+    }
+  }
+  else {
+    $self->throw_exception("Can't update/delete on resultset with condition unless hash or array");
   }
 
   return $cond;
@@ -1355,11 +1414,16 @@ if no records were updated; exact type of success value is storage-dependent.
 
 sub update {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update must be a hash')
     unless ref $values eq 'HASH';
 
+  # rs operations with subqueries are Storage dependent - delegate
+  if ($self->_update_delete_via_subq) {
+    return $self->result_source->storage->subq_update_delete($self, 'update', $values);
+  }
+
   my $cond = $self->_cond_for_update_delete;
-  
+
   return $self->result_source->storage->update(
     $self->result_source, $values, $cond
   );
@@ -1382,7 +1446,7 @@ will run DBIC cascade triggers, while L</update> will not.
 
 sub update_all {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update_all must be a hash')
     unless ref $values eq 'HASH';
   foreach my $obj ($self->all) {
     $obj->set_columns($values)->update;
@@ -1413,9 +1477,14 @@ need to respecify your query in a way that can be expressed without a join.
 =cut
 
 sub delete {
-  my ($self) = @_;
-  $self->throw_exception("Delete should not be passed any arguments")
-    if $_[1];
+  my $self = shift;
+  $self->throw_exception('delete does not accept any arguments')
+    if @_;
+
+  # rs operations with subqueries are Storage dependent - delegate
+  if ($self->_update_delete_via_subq) {
+    return $self->result_source->storage->subq_update_delete($self, 'delete');
+  }
 
   my $cond = $self->_cond_for_update_delete;
 
@@ -1439,7 +1508,10 @@ will run DBIC cascade triggers, while L</delete> will not.
 =cut
 
 sub delete_all {
-  my ($self) = @_;
+  my $self = shift;
+  $self->throw_exception('delete_all does not accept any arguments')
+    if @_;
+
   $_->delete for $self->all;
   return 1;
 }
@@ -1641,7 +1713,7 @@ sub pager {
     unless $self->{attrs}{page};
   $attrs->{rows} ||= 10;
   return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{attrs}{page});
+    $self->__count, $attrs->{rows}, $self->{attrs}{page});
 }
 
 =head2 page
@@ -1745,6 +1817,37 @@ sub _is_deterministic_value {
   return 0;
 }
 
+# _has_attr
+#
+# determines if the resultset defines at least one
+# of the attributes supplied
+#
+# used to determine if a subquery is neccessary
+
+sub _has_attr {
+  my ($self, @attr_names) = @_;
+
+  my $attrs = $self->_resolved_attrs;
+
+  my $join_check_req;
+
+  for my $n (@attr_names) {
+    return 1 if defined $attrs->{$n};
+    ++$join_check_req if $n =~ /join/;
+  }
+
+  # a join can be expressed as a multi-level from
+  return 1 if (
+    $join_check_req
+      and
+    ref $attrs->{from} eq 'ARRAY'
+      and
+    @{$attrs->{from}} > 1 
+  );
+
+  return 0;
+}
+
 # _collapse_cond
 #
 # Recursively collapse the condition.