This seems to be the prefetch+limit fix - ugly as hell but appears to work
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index fcbd2ef..2160cf0 100644 (file)
@@ -46,27 +46,13 @@ A new ResultSet is returned from calling L</search> on an existing
 ResultSet. The new one will contain all the conditions of the
 original, plus any new conditions added in the C<search> call.
 
-A ResultSet is also an iterator. L</next> is used to return all the
-L<DBIx::Class::Row>s the ResultSet represents.
+A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
+can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
+represents.
 
 The query that the ResultSet represents is B<only> executed against
 the database when these methods are called:
-
-=over
-
-=item L</find>
-
-=item L</next>
-
-=item L</all>
-
-=item L</count>
-
-=item L</single>
-
-=item L</first>
-
-=back
+L</find> L</next> L</all> L</first> L</single> L</count>
 
 =head1 EXAMPLES
 
@@ -674,7 +660,8 @@ L<DBIx::Class::Cursor> for more information.
 sub cursor {
   my ($self) = @_;
 
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $attrs = $self->_resolved_attrs_copy;
+
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -725,7 +712,8 @@ sub single {
       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
   }
 
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $attrs = $self->_resolved_attrs_copy;
+
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -752,6 +740,7 @@ sub single {
   return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
+
 # _is_unique_query
 #
 # Try to determine if the specified query is guaranteed to be unique, based on
@@ -870,10 +859,10 @@ instead. An example conversion is:
 
 sub search_like {
   my $class = shift;
-  carp join ("\n",
-    'search_like() is deprecated and will be removed in 0.09.',
-    'Instead use ->search({ x => { -like => "y%" } })',
-    '(note the outer pair of {}s - they are important!)'
+  carp (
+    'search_like() is deprecated and will be removed in DBIC version 0.09.'
+   .' Instead use ->search({ x => { -like => "y%" } })'
+   .' (note the outer pair of {}s - they are important!)'
   );
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
@@ -1145,83 +1134,25 @@ sub result_class {
 =back
 
 Performs an SQL C<COUNT> with the same query as the resultset was built
-with to find the number of elements. If passed arguments, does a search
-on the resultset and counts the results of that.
+with to find the number of elements. Passing arguments is equivalent to
+C<< $rs->search ($cond, \%attrs)->count >>
 
 =cut
 
-my @count_via_subq_attrs = qw/join seen_join prefetch group_by having/;
 sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
 
-  my @check_attrs = @count_via_subq_attrs;
-
-  # if we are not paged - we are simply asking for a limit
-  if (not $self->{attrs}{page} and not $self->{attrs}{software_limit}) {
-    push @check_attrs, qw/rows offset/;
-  }
-
-  return $self->_has_attr (@check_attrs)
-    ? $self->_count_subq
-    : $self->_count_simple
-}
-
-sub _count_subq {
-  my $self = shift;
-
-  my $attrs = { %{$self->_resolved_attrs} };
-
-  # copy for the subquery, we need to do some adjustments to it too
-  my $sub_attrs = { %$attrs };
-
-  # these can not go in the subquery either
-  delete $sub_attrs->{$_} for qw/prefetch select +select as +as columns +columns/;
-
-  # force a group_by and the same set of columns (most databases require this)
-  $sub_attrs->{columns} = $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
-
-  $attrs->{from} = [{
-    count_subq => (ref $self)->new ($self->result_source, $sub_attrs )->as_query
-  }];
+  my $meth = $self->_has_resolved_attr (qw/collapse group_by/)
+    ? 'count_grouped'
+    : 'count'
+  ;
 
-  # the subquery replaces this
-  delete $attrs->{where};
+  my $attrs = $self->_resolved_attrs_copy;
+  my $rsrc = $self->result_source;
 
-  return $self->__count ($attrs);
-}
-
-sub _count_simple {
-  my $self = shift;
-
-  my $count = $self->__count;
-  return 0 unless $count;
-
-  # need to take offset from resolved attrs
-
-  $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
-  $count = $self->{attrs}{rows} if
-    $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
-  $count = 0 if ($count < 0);
-  return $count;
-}
-
-sub __count {
-  my ($self, $attrs) = @_;
-
-  $attrs ||= { %{$self->{attrs}} };
-
-  # these are the only attributes that actually matter for count
-  $attrs = { map { exists $attrs->{$_} ? ( $_ => $attrs->{$_} ) : () } qw/where bind alias from from_bind/ };
-
-  $attrs->{select} = { count => '*' };
-  $attrs->{as} = [qw/count/];
-
-  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-  my ($count) = $tmp_rs->cursor->next;
-
-  return $count;
+  return $rsrc->storage->$meth ($rsrc, $attrs);
 }
 
 sub _bool {
@@ -1334,14 +1265,68 @@ sub first {
 }
 
 
-# _update_delete_via_subq
-#
-# Presence of some rs attributes requires a subquery to reliably
-# update/deletre
+# _rs_update_delete
 #
+# Determines whether and what type of subquery is required for the $rs operation.
+# If grouping is necessary either supplies its own, or verifies the current one
+# After all is done delegates to the proper storage method.
+
+sub _rs_update_delete {
+  my ($self, $op, $values) = @_;
+
+  my $rsrc = $self->result_source;
+
+  my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
+  my $needs_subq = $self->_has_resolved_attr (qw/row offset/);
+
+  if ($needs_group_by_subq or $needs_subq) {
+
+    # make a new $rs selecting only the PKs (that's all we really need)
+    my $attrs = $self->_resolved_attrs_copy;
+
+    delete $attrs->{$_} for qw/collapse select as/;
+    $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
+
+    if ($needs_group_by_subq) {
+      # make sure no group_by was supplied, or if there is one - make sure it matches
+      # the columns compiled above perfectly. Anything else can not be sanely executed
+      # on most databases so croak right then and there
+
+      if (my $g = $attrs->{group_by}) {
+        my @current_group_by = map
+          { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
+          (ref $g eq 'ARRAY' ? @$g : $g );
+
+        if (
+          join ("\x00", sort @current_group_by)
+            ne
+          join ("\x00", sort @{$attrs->{columns}} )
+        ) {
+          $self->throw_exception (
+            "You have just attempted a $op operation on a resultset which does group_by"
+            . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
+            . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
+            . ' kind of queries. Please retry the operation with a modified group_by or'
+            . ' without using one at all.'
+          );
+        }
+      }
+      else {
+        $attrs->{group_by} = $attrs->{columns};
+      }
+    }
+
+    my $subrs = (ref $self)->new($rsrc, $attrs);
 
-sub _update_delete_via_subq {
-  return $_[0]->_has_attr (qw/join seen_join group_by row offset page/);
+    return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
+  }
+  else {
+    return $rsrc->storage->$op(
+      $rsrc,
+      $op eq 'update' ? $values : (),
+      $self->_cond_for_update_delete,
+    );
+  }
 }
 
 
@@ -1424,16 +1409,7 @@ sub update {
   $self->throw_exception('Values for update must be a hash')
     unless ref $values eq 'HASH';
 
-  # rs operations with subqueries are Storage dependent - delegate
-  if ($self->_update_delete_via_subq) {
-    return $self->result_source->storage->subq_update_delete($self, 'update', $values);
-  }
-
-  my $cond = $self->_cond_for_update_delete;
-
-  return $self->result_source->storage->update(
-    $self->result_source, $values, $cond
-  );
+  return $self->_rs_update_delete ('update', $values);
 }
 
 =head2 update_all
@@ -1467,7 +1443,7 @@ sub update_all {
 
 =item Arguments: none
 
-=item Return Value: 1
+=item Return Value: $storage_rv
 
 =back
 
@@ -1475,11 +1451,8 @@ Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
 to run. See also L<DBIx::Class::Row/delete>.
 
-delete may not generate correct SQL for a query with joins or a resultset
-chained from a related resultset.  In this case it will generate a warning:-
-
-In these cases you may find that delete_all is more appropriate, or you
-need to respecify your query in a way that can be expressed without a join.
+Return value will be the amount of rows deleted; exact type of return value
+is storage-dependent.
 
 =cut
 
@@ -1488,15 +1461,7 @@ sub delete {
   $self->throw_exception('delete does not accept any arguments')
     if @_;
 
-  # rs operations with subqueries are Storage dependent - delegate
-  if ($self->_update_delete_via_subq) {
-    return $self->result_source->storage->subq_update_delete($self, 'delete');
-  }
-
-  my $cond = $self->_cond_for_update_delete;
-
-  $self->result_source->storage->delete($self->result_source, $cond);
-  return 1;
+  return $self->_rs_update_delete ('delete');
 }
 
 =head2 delete_all
@@ -1616,13 +1581,19 @@ sub populate {
 
     ## do the belongs_to relationships
     foreach my $index (0..$#$data) {
-      if( grep { !defined $data->[$index]->{$_} } @pks ) {
-        my @ret = $self->populate($data);
-        return;
+
+      # delegate to create() for any dataset without primary keys with specified relationships
+      if (grep { !defined $data->[$index]->{$_} } @pks ) {
+        for my $r (@rels) {
+          if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
+            my @ret = $self->populate($data);
+            return;
+          }
+        }
       }
 
       foreach my $rel (@rels) {
-        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        next unless ref $data->[$index]->{$rel} eq "HASH";
         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
         my $related = $result->result_source->_resolve_condition(
@@ -1837,14 +1808,14 @@ sub _is_deterministic_value {
   return 0;
 }
 
-# _has_attr
+# _has_resolved_attr
 #
 # determines if the resultset defines at least one
 # of the attributes supplied
 #
 # used to determine if a subquery is neccessary
 
-sub _has_attr {
+sub _has_resolved_attr {
   my ($self, @attr_names) = @_;
 
   my $attrs = $self->_resolved_attrs;
@@ -1852,11 +1823,24 @@ sub _has_attr {
   my $join_check_req;
 
   for my $n (@attr_names) {
-    return 1 if defined $attrs->{$n};
-    ++$join_check_req if $n =~ /join/;
+    ++$join_check_req if $n eq '-join';
+
+    my $attr =  $attrs->{$n};
+
+    next if not defined $attr;
+
+    if (ref $attr eq 'HASH') {
+      return 1 if keys %$attr;
+    }
+    elsif (ref $attr eq 'ARRAY') {
+      return 1 if @$attr;
+    }
+    else {
+      return 1 if $attr;
+    }
   }
 
-  # a join can be expressed as a multi-level from
+  # a resolved join is expressed as a multi-level from
   return 1 if (
     $join_check_req
       and
@@ -1941,7 +1925,22 @@ B<NOTE>: This feature is still experimental.
 
 =cut
 
-sub as_query { return shift->cursor->as_query(@_) }
+sub as_query {
+  my $self = shift;
+
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # For future use:
+  #
+  # in list ctx:
+  # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
+  # $sql also has no wrapping parenthesis in list ctx
+  #
+  my $sqlbind = $self->result_source->storage
+    ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
+
+  return $sqlbind;
+}
 
 =head2 find_or_new
 
@@ -1982,8 +1981,10 @@ sub find_or_new {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->new_result($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->new_result($hash);
 }
 
 =head2 create
@@ -2113,8 +2114,10 @@ sub find_or_create {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->create($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->create($hash);
 }
 
 =head2 update_or_create
@@ -2439,10 +2442,17 @@ sub _resolve_from {
   my $source = $self->result_source;
   my $attrs = $self->{attrs};
 
-  my $from = $attrs->{from}
-    || [ { $attrs->{alias} => $source->from } ];
+  my $from = [ @{
+      $attrs->{from}
+        ||
+      [{
+        -source_handle => $source->handle,
+        -alias => $attrs->{alias},
+        $attrs->{alias} => $source->from,
+      }]
+  }];
 
-  my $seen = { %{$attrs->{seen_join}||{}} };
+  my $seen = { %{$attrs->{seen_join} || {} } };
 
   # we need to take the prefetch the attrs into account before we
   # ->_resolve_join as otherwise they get lost - captainL
@@ -2459,6 +2469,12 @@ sub _resolve_from {
   return ($from,$seen);
 }
 
+# too many times we have to do $attrs = { %{$self->_resolved_attrs} }
+sub _resolved_attrs_copy {
+  my $self = shift;
+  return { %{$self->_resolved_attrs (@_)} };
+}
+
 sub _resolved_attrs {
   my $self = shift;
   return $self->{_attrs} if $self->{_attrs};
@@ -2539,7 +2555,11 @@ sub _resolved_attrs {
     push( @{ $attrs->{as} }, @$adds );
   }
 
-  $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
+  $attrs->{from} ||= [ {
+    -source_handle => $source->handle,
+    -alias => $self->{attrs}{alias},
+    $self->{attrs}{alias} => $source->from,
+  } ];
 
   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
     my $join = delete $attrs->{join} || {};
@@ -2559,8 +2579,6 @@ sub _resolved_attrs {
 
   }
 
-  $attrs->{group_by} ||= $attrs->{select}
-    if delete $attrs->{distinct};
   if ( $attrs->{order_by} ) {
     $attrs->{order_by} = (
       ref( $attrs->{order_by} ) eq 'ARRAY'
@@ -2572,25 +2590,43 @@ sub _resolved_attrs {
     $attrs->{order_by} = [];
   }
 
-  my $collapse = $attrs->{collapse} || {};
+  # If the order_by is otherwise empty - we will use this for TOP limit
+  # emulation and the like.
+  # Although this is needed only if the order_by is not defined, it is
+  # actually cheaper to just populate this rather than properly examining
+  # order_by (stuf like [ {} ] and the like)
+  $attrs->{_virtual_order_by} = [ $self->result_source->primary_columns ];
+
+
+  $attrs->{collapse} ||= {};
   if ( my $prefetch = delete $attrs->{prefetch} ) {
     $prefetch = $self->_merge_attr( {}, $prefetch );
-    my @pre_order;
-    foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
-
-      # bring joins back to level of current class
-      my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
-      my @prefetch =
-        $source->_resolve_prefetch( $p, $alias, $join_map, \@pre_order, $collapse );
-      push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
-      push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
-    }
-    push( @{ $attrs->{order_by} }, @pre_order );
+
+    my $prefetch_ordering = [];
+
+    my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
+
+    my @prefetch =
+      $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
+
+    push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
+    push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
+
+    push( @{ $attrs->{order_by} }, @$prefetch_ordering );
+    $attrs->{_collapse_order_by} = \@$prefetch_ordering;
   }
-  $attrs->{collapse} = $collapse;
 
-  if ( $attrs->{page} and not defined $attrs->{offset} ) {
-    $attrs->{offset} = ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
+
+  if (delete $attrs->{distinct}) {
+    $attrs->{group_by} ||= [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
+  }
+
+  # if both page and offset are specified, produce a combined offset
+  # even though it doesn't make much sense, this is what pre 081xx has
+  # been doing
+  if (my $page = delete $attrs->{page}) {
+    $attrs->{offset} = ($attrs->{rows} * ($page - 1)) +
+      ($attrs->{offset} || 0);
   }
 
   return $self->{_attrs} = $attrs;
@@ -2609,7 +2645,7 @@ sub _joinpath_aliases {
 
     my $p = $paths;
     $p = $p->{$_} ||= {} for @{$j->[0]{-join_path}};
-    push @{$p->{-join_aliases} }, $j->[0]{-join_alias};
+    push @{$p->{-join_aliases} }, $j->[0]{-alias};
   }
 
   return $paths;
@@ -3262,9 +3298,21 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
     # SELECT child.* FROM person child
     # INNER JOIN person father ON child.father_id = father.id
 
-If you need to express really complex joins or you need a subselect, you
+You can select from a subquery by passing a resultset to from as follows.
+
+    $schema->resultset('Artist')->search( 
+        undef, 
+        {   alias => 'artist2',
+            from  => [ { artist2 => $artist_rs->as_query } ],
+        } );
+
+    # and you'll get sql like this..
+    # SELECT artist2.artistid, artist2.name, artist2.rank, artist2.charfield FROM 
+    #   ( SELECT me.artistid, me.name, me.rank, me.charfield FROM artists me ) artist2
+
+If you need to express really complex joins, you
 can supply literal SQL to C<from> via a scalar reference. In this case
-the contents of the scalar will replace the table name asscoiated with the
+the contents of the scalar will replace the table name associated with the
 resultsource.
 
 WARNING: This technique might very well not work as expected on chained