Merge branch 'current/for_cpan_index' into current/dq
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 89ec655..d6c5e9b 100644 (file)
@@ -6,9 +6,11 @@ use base qw/DBIx::Class/;
 use DBIx::Class::Carp;
 use DBIx::Class::ResultSetColumn;
 use Scalar::Util qw/blessed weaken reftype/;
+use DBIx::Class::_Util 'fail_on_internal_wantarray';
 use Try::Tiny;
-use Data::Compare (); # no imports!!! guard against insane architecture
-
+use Data::Dumper::Concise ();
+use Data::Query::Constants;
+use Data::Query::ExprHelpers;
 # not importing first() as it will clash with our own method
 use List::Util ();
 
@@ -141,8 +143,8 @@ another.
 
 =head3 Resolving conditions and attributes
 
-When a resultset is chained from another resultset (ie:
-C<my $new_rs = $old_rs->search(\%extra_cond, \%attrs)>), conditions
+When a resultset is chained from another resultset (e.g.:
+C<< my $new_rs = $old_rs->search(\%extra_cond, \%attrs) >>), conditions
 and attributes with the same keys need resolving.
 
 If any of L</columns>, L</select>, L</as> are present, they reset the
@@ -244,7 +246,9 @@ sub new {
   my ($source, $attrs) = @_;
   $source = $source->resolve
     if $source->isa('DBIx::Class::ResultSourceHandle');
+
   $attrs = { %{$attrs||{}} };
+  delete @{$attrs}{qw(_last_sqlmaker_alias_map _related_results_construction)};
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
@@ -302,8 +306,8 @@ call it as C<search(undef, \%attrs)>.
 
 For a list of attributes that can be passed to C<search>, see
 L</ATTRIBUTES>. For more examples of using this function, see
-L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
-documentation for the first argument, see L<SQL::Abstract>
+L<Searching|DBIx::Class::Manual::Cookbook/SEARCHING>. For a complete
+documentation for the first argument, see L<SQL::Abstract/"WHERE CLAUSES">
 and its extension L<DBIx::Class::SQLMaker>.
 
 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
@@ -325,6 +329,7 @@ sub search {
   my $rs = $self->search_rs( @_ );
 
   if (wantarray) {
+    DBIx::Class::_ENV_::ASSERT_NO_INTERNAL_WANTARRAY and my $sog = fail_on_internal_wantarray($rs);
     return $rs->all;
   }
   elsif (defined wantarray) {
@@ -393,6 +398,10 @@ sub search_rs {
     $call_cond = { @_ };
   }
 
+  if (blessed($call_cond) and $call_cond->isa('Data::Query::ExprBuilder')) {
+    $call_cond = \$call_cond->{expr};
+  }
+
   # see if we can keep the cache (no $rs changes)
   my $cache;
   my %safe = (alias => 1, cache => 1);
@@ -404,11 +413,22 @@ sub search_rs {
     ref $call_cond eq 'ARRAY' && ! @$call_cond
   )) {
     $cache = $self->get_cache;
+  } elsif (
+    $self->{attrs}{cache} and
+    ($self->{attrs}{grep_cache} or $call_attrs->{grep_cache})
+  ) {
+    if (
+      keys %$call_attrs
+      and not (exists $call_attrs->{grep_cache} and !$call_attrs->{grep_cache})
+    ) {
+      die "Can't do complex search on resultset with grep_cache set";
+    }
+    my $grep_one = $self->_construct_perl_predicate($call_cond);
+    $cache = [ grep $grep_one->($_), $self->all ];
   }
 
   my $old_attrs = { %{$self->{attrs}} };
-  my $old_having = delete $old_attrs->{having};
-  my $old_where = delete $old_attrs->{where};
+  my ($old_having, $old_where) = delete @{$old_attrs}{qw(having where)};
 
   my $new_attrs = { %$old_attrs };
 
@@ -582,60 +602,104 @@ sub _normalize_selection {
 sub _stack_cond {
   my ($self, $left, $right) = @_;
 
-  # collapse single element top-level conditions
-  # (single pass only, unlikely to need recursion)
-  for ($left, $right) {
-    if (ref $_ eq 'ARRAY') {
-      if (@$_ == 0) {
-        $_ = undef;
-      }
-      elsif (@$_ == 1) {
-        $_ = $_->[0];
-      }
-    }
-    elsif (ref $_ eq 'HASH') {
-      my ($first, $more) = keys %$_;
+  my $source = $self->result_source;
 
-      # empty hash
-      if (! defined $first) {
-        $_ = undef;
-      }
-      # one element hash
-      elsif (! defined $more) {
-        if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
-          $_ = $_->{'-and'};
-        }
-        elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
-          $_ = $_->{'-or'};
-        }
-      }
-    }
-  }
+  my $converter = $source->schema->storage->sql_maker->converter;
 
-  # merge hashes with weeding out of duplicates (simple cases only)
-  if (ref $left eq 'HASH' and ref $right eq 'HASH') {
+  my @top = map $source->_extract_top_level_conditions(
+    $converter->_expr_to_dq($_)
+  ), grep defined, $left, $right;
 
-    # shallow copy to destroy
-    $right = { %$right };
-    for (grep { exists $right->{$_} } keys %$left) {
-      # the use of eq_deeply here is justified - the rhs of an
-      # expression can contain a lot of twisted weird stuff
-      delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
-    }
+  return undef unless @top;
 
-    $right = undef unless keys %$right;
-  }
+  my %seen;
 
+  my @uniq = grep { !$seen{Data::Dumper::Concise::Dumper($_)}++ } @top;
 
-  if (defined $left xor defined $right) {
-    return defined $left ? $left : $right;
-  }
-  elsif (! defined $left) {
-    return undef;
-  }
-  else {
-    return { -and => [ $left, $right ] };
+  return \$uniq[0] if @uniq == 1;
+
+  return \Operator({ 'SQL.Naive' => 'AND' }, \@uniq);
+}
+
+my %perl_op_map = (
+  '=' => { numeric => '==', string => 'eq' },
+);
+
+sub _construct_perl_predicate {
+  my ($self, $cond) = @_;
+
+  # This shouldn't really live here but it'll do for the moment.
+
+  my %alias_map = (
+    $self->current_source_alias => {
+      join_path => [],
+      source => $self->result_source,
+      columns_info => $self->result_source->columns_info,
+    },
+  );
+
+  my $attrs = $self->_resolved_attrs;
+  foreach my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
+    next unless $j->[0]{-alias};
+    next unless $j->[0]{-join_path};
+    $alias_map{$j->[0]{-alias}} = {
+      join_path => [ map { keys %$_ } @{$j->[0]{-join_path}} ],
+      source => $j->[0]{-rsrc},
+      columns_info => $j->[0]{-rsrc}->columns_info,
+    };
   }
+
+  my %as_map = map +($attrs->{select}[$_] => $attrs->{as}[$_]),
+                 grep !ref($attrs->{select}[$_]), 0..$#{$attrs->{select}};
+
+  my $storage = $self->result_source->schema->storage;
+  my $sql_maker = $storage->sql_maker;
+  my $tree = map_dq_tree {
+    if (is_Operator) {
+      my $op = $_->{operator}{'SQL.Naive'} or die "No operator";
+      if (lc($op) =~ /^(?:and|or|not)$/i) {
+        return Operator({ 'Perl' => lc($op) }, $op->{args});
+      }
+      if (my $op_map = $perl_op_map{$op}) {
+        die "Binop doesn't have two args - wtf?"
+          unless @{$_->{args}} == 2;
+        my $data_type;
+        my @mapped_args = map {
+          if (is_Identifier) {
+            die "Identifier not alias.colname"
+              unless @{$_->{elements}} == 2;
+            my ($alias, $col) = @{$_->{elements}};
+            die "${alias}.${col} not selected"
+              unless $as_map{"${alias}.${col}"};
+            unless ($data_type) {
+              my $colinfo = $alias_map{$alias}{columns_info}{$col};
+              unless (defined $colinfo->{is_numeric}) {
+                $colinfo->{is_numeric} = (
+                  $storage->is_datatype_numeric($colinfo->{data_type})
+                    ? 1
+                    : 0
+                );
+              }
+              $data_type = $colinfo->{is_numeric} ? 'numeric' : 'string';
+            }
+            Identifier(@{$alias_map{$alias}{join_path}}, $col);
+          } elsif (is_Value) {
+            $_;
+          } else {
+            die "Argument to operator neither identifier nor value";
+          }
+        } @{$_->{args}};
+        die "Couldn't determine numeric versus string" unless $data_type;
+        return \Operator({ Perl => $op_map->{$data_type} }, \@mapped_args);
+      }
+    }
+    die "Unable to map node to perl";
+  } $sql_maker->converter->_where_to_dq($cond);
+  my ($code, @values) = @{$storage->perl_renderer->render($tree)};
+  my $sub = eval q!sub { !.$code.q! }!
+    or die "Failed to build sub: $@";
+  my @args = map $_->{value}, @values;
+  return sub { local $_ = $_[0]; $sub->(@args) };
 }
 
 =head2 search_literal
@@ -645,7 +709,7 @@ should only be used in that context. C<search_literal> is a convenience
 method. It is equivalent to calling C<< $schema->search(\[]) >>, but if you
 want to ensure columns are bound correctly, use L</search>.
 
-See L<DBIx::Class::Manual::Cookbook/Searching> and
+See L<DBIx::Class::Manual::Cookbook/SEARCHING> and
 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
 require C<search_literal>.
 
@@ -1003,7 +1067,7 @@ sub cursor {
   my $self = shift;
 
   return $self->{cursor} ||= do {
-    my $attrs = { %{$self->_resolved_attrs } };
+    my $attrs = $self->_resolved_attrs;
     $self->result_source->storage->select(
       $attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs
     );
@@ -1062,7 +1126,7 @@ sub single {
   my $attrs = { %{$self->_resolved_attrs} };
 
   $self->throw_exception(
-    'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
+    'single() can not be used on resultsets collapsing a has_many. Use find( \%cond ) or next() instead'
   ) if $attrs->{collapse};
 
   if ($where) {
@@ -1081,6 +1145,7 @@ sub single {
     $attrs->{from}, $attrs->{select},
     $attrs->{where}, $attrs
   )];
+
   return undef unless @$data;
   $self->{_stashed_rows} = [ $data ];
   $self->_construct_results->[0];
@@ -1209,8 +1274,6 @@ sub slice {
   $attrs->{offset} += $min;
   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
   return $self->search(undef, $attrs);
-  #my $slice = (ref $self)->new($self->result_source, $attrs);
-  #return (wantarray ? $slice->all : $slice);
 }
 
 =head2 next
@@ -1297,49 +1360,33 @@ sub _construct_results {
     $attrs->{_order_is_artificial} = 1;
   }
 
-  my $cursor = $self->cursor;
-
   # this will be used as both initial raw-row collector AND as a RV of
   # _construct_results. Not regrowing the array twice matters a lot...
   # a surprising amount actually
   my $rows = delete $self->{_stashed_rows};
 
+  my $cursor; # we may not need one at all
+
   my $did_fetch_all = $fetch_all;
 
   if ($fetch_all) {
     # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
-    $rows = [ ($rows ? @$rows : ()), $cursor->all ];
+    $rows = [ ($rows ? @$rows : ()), $self->cursor->all ];
   }
   elsif( $attrs->{collapse} ) {
 
-    $attrs->{_ordered_for_collapse} = (!$attrs->{order_by}) ? 0 : do {
-      my $st = $rsrc->schema->storage;
-      my @ord_cols = map
-        { $_->[0] }
-        ( $st->_extract_order_criteria($attrs->{order_by}) )
-      ;
-
-      my $colinfos = $st->_resolve_column_info($attrs->{from}, \@ord_cols);
+    # a cursor will need to be closed over in case of collapse
+    $cursor = $self->cursor;
 
-      for (0 .. $#ord_cols) {
-        if (
-          ! $colinfos->{$ord_cols[$_]}
-            or
-          $colinfos->{$ord_cols[$_]}{-result_source} != $rsrc
-        ) {
-          splice @ord_cols, $_;
-          last;
-        }
-      }
-
-      # since all we check here are the start of the order_by belonging to the
-      # top level $rsrc, a present identifying set will mean that the resultset
-      # is ordered by its leftmost table in a tsable manner
-      (@ord_cols and $rsrc->_identifying_column_set({ map
-        { $colinfos->{$_}{-colname} => $colinfos->{$_} }
-        @ord_cols
-      })) ? 1 : 0;
-    } unless defined $attrs->{_ordered_for_collapse};
+    $attrs->{_ordered_for_collapse} = (
+      (
+        $attrs->{order_by}
+          and
+        $rsrc->schema
+              ->storage
+               ->_main_source_order_by_portion_is_stable($rsrc, $attrs->{order_by}, $attrs->{where})
+      ) ? 1 : 0
+    ) unless defined $attrs->{_ordered_for_collapse};
 
     if (! $attrs->{_ordered_for_collapse}) {
       $did_fetch_all = 1;
@@ -1356,6 +1403,7 @@ sub _construct_results {
 
   if (! $did_fetch_all and ! @{$rows||[]} ) {
     # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
+    $cursor ||= $self->cursor;
     if (scalar (my @r = $cursor->next) ) {
       $rows = [ \@r ];
     }
@@ -1363,14 +1411,34 @@ sub _construct_results {
 
   return undef unless @{$rows||[]};
 
-  my @extra_collapser_args;
-  if ($attrs->{collapse} and ! $did_fetch_all ) {
+  # sanity check - people are too clever for their own good
+  if ($attrs->{collapse} and my $aliastypes = $attrs->{_last_sqlmaker_alias_map} ) {
 
-    @extra_collapser_args = (
-      # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
-      sub { my @r = $cursor->next or return; \@r }, # how the collapser gets more rows
-      ($self->{_stashed_rows} = []),                # where does it stuff excess
-    );
+    my $multiplied_selectors;
+    for my $sel_alias ( grep { $_ ne $attrs->{alias} } keys %{ $aliastypes->{selecting} } ) {
+      if (
+        $aliastypes->{multiplying}{$sel_alias}
+          or
+        $aliastypes->{premultiplied}{$sel_alias}
+      ) {
+        $multiplied_selectors->{$_} = 1 for values %{$aliastypes->{selecting}{$sel_alias}{-seen_columns}}
+      }
+    }
+
+    for my $i (0 .. $#{$attrs->{as}} ) {
+      my $sel = $attrs->{select}[$i];
+
+      if (ref $sel eq 'SCALAR') {
+        $sel = $$sel;
+      }
+      elsif( ref $sel eq 'REF' and ref $$sel eq 'ARRAY' ) {
+        $sel = $$sel->[0];
+      }
+
+      $self->throw_exception(
+        'Result collapse not possible - selection from a has_many source redirected to the main object'
+      ) if ($multiplied_selectors->{$sel} and $attrs->{as}[$i] !~ /\./);
+    }
   }
 
   # hotspot - skip the setter
@@ -1383,7 +1451,6 @@ sub _construct_results {
 
   my $infmap = $attrs->{as};
 
-
   $self->{_result_inflator}{is_core_row} = ( (
     $inflator_cref
       ==
@@ -1427,32 +1494,79 @@ sub _construct_results {
       );
     }
   }
-  # Special-case multi-object HRI (we always prune, and there is no $inflator_cref pass)
-  elsif ($self->{_result_inflator}{is_hri}) {
-    ( $self->{_row_parser}{hri} ||= $rsrc->_mk_row_parser({
-      eval => 1,
-      inflate_map => $infmap,
-      selection => $attrs->{select},
-      collapse => $attrs->{collapse},
-      premultiplied => $attrs->{_main_source_premultiplied},
-      hri_style => 1,
-      prune_null_branches => 1,
-    }) )->($rows, @extra_collapser_args);
-  }
-  # Regular multi-object
   else {
-    my $parser_type = $self->{_result_inflator}{is_core_row} ? 'classic_pruning' : 'classic_nonpruning';
+    my $parser_type =
+        $self->{_result_inflator}{is_hri}       ? 'hri'
+      : $self->{_result_inflator}{is_core_row}  ? 'classic_pruning'
+      :                                           'classic_nonpruning'
+    ;
 
-    ( $self->{_row_parser}{$parser_type} ||= $rsrc->_mk_row_parser({
+    # $args and $attrs to _mk_row_parser are separated to delineate what is
+    # core collapser stuff and what is dbic $rs specific
+    @{$self->{_row_parser}{$parser_type}}{qw(cref nullcheck)} = $rsrc->_mk_row_parser({
       eval => 1,
       inflate_map => $infmap,
-      selection => $attrs->{select},
       collapse => $attrs->{collapse},
       premultiplied => $attrs->{_main_source_premultiplied},
-      prune_null_branches => $self->{_result_inflator}{is_core_row},
-    }) )->($rows, @extra_collapser_args);
+      hri_style => $self->{_result_inflator}{is_hri},
+      prune_null_branches => $self->{_result_inflator}{is_hri} || $self->{_result_inflator}{is_core_row},
+    }, $attrs) unless $self->{_row_parser}{$parser_type}{cref};
+
+    # column_info metadata historically hasn't been too reliable.
+    # We need to start fixing this somehow (the collapse resolver
+    # can't work without it). Add an explicit check for the *main*
+    # result, hopefully this will gradually weed out such errors
+    #
+    # FIXME - this is a temporary kludge that reduces performance
+    # It is however necessary for the time being
+    my ($unrolled_non_null_cols_to_check, $err);
+
+    if (my $check_non_null_cols = $self->{_row_parser}{$parser_type}{nullcheck} ) {
+
+      $err =
+        'Collapse aborted due to invalid ResultSource metadata - the following '
+      . 'selections are declared non-nullable but NULLs were retrieved: '
+      ;
+
+      my @violating_idx;
+      COL: for my $i (@$check_non_null_cols) {
+        ! defined $_->[$i] and push @violating_idx, $i and next COL for @$rows;
+      }
+
+      $self->throw_exception( $err . join (', ', map { "'$infmap->[$_]'" } @violating_idx ) )
+        if @violating_idx;
+
+      $unrolled_non_null_cols_to_check = join (',', @$check_non_null_cols);
+    }
 
-    $_ = $inflator_cref->($res_class, $rsrc, @$_) for @$rows;
+    my $next_cref =
+      ($did_fetch_all or ! $attrs->{collapse})  ? undef
+    : defined $unrolled_non_null_cols_to_check  ? eval sprintf <<'EOS', $unrolled_non_null_cols_to_check
+sub {
+  # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
+  my @r = $cursor->next or return;
+  if (my @violating_idx = grep { ! defined $r[$_] } (%s) ) {
+    $self->throw_exception( $err . join (', ', map { "'$infmap->[$_]'" } @violating_idx ) )
+  }
+  \@r
+}
+EOS
+    : sub {
+        # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
+        my @r = $cursor->next or return;
+        \@r
+      }
+    ;
+
+    $self->{_row_parser}{$parser_type}{cref}->(
+      $rows,
+      $next_cref ? ( $next_cref, $self->{_stashed_rows} = [] ) : (),
+    );
+
+    # Special-case multi-object HRI - there is no $inflator_cref pass
+    unless ($self->{_result_inflator}{is_hri}) {
+      $_ = $inflator_cref->($res_class, $rsrc, @$_) for @$rows
+    }
   }
 
   # The @$rows check seems odd at first - why wouldn't we want to warn
@@ -1598,10 +1712,10 @@ sub count_rs {
   # software based limiting can not be ported if this $rs is to be used
   # in a subquery itself (i.e. ->as_query)
   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
-    return $self->_count_subq_rs;
+    return $self->_count_subq_rs($self->{_attrs});
   }
   else {
-    return $self->_count_rs;
+    return $self->_count_rs($self->{_attrs});
   }
 }
 
@@ -1612,19 +1726,17 @@ sub _count_rs {
   my ($self, $attrs) = @_;
 
   my $rsrc = $self->result_source;
-  $attrs ||= $self->_resolved_attrs;
 
   my $tmp_attrs = { %$attrs };
   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
-  delete @{$tmp_attrs}{qw/rows offset order_by _related_results_construction record_filter for/};
+  delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
 
   # overwrite the selector (supplied by the storage)
-  $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
-  $tmp_attrs->{as} = 'count';
-
-  my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
-
-  return $tmp_rs;
+  $rsrc->resultset_class->new($rsrc, {
+    %$tmp_attrs,
+    select => $rsrc->storage->_count_select ($rsrc, $attrs),
+    as => 'count',
+  })->get_column ('count');
 }
 
 #
@@ -1634,11 +1746,10 @@ sub _count_subq_rs {
   my ($self, $attrs) = @_;
 
   my $rsrc = $self->result_source;
-  $attrs ||= $self->_resolved_attrs;
 
   my $sub_attrs = { %$attrs };
   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
-  delete @{$sub_attrs}{qw/collapse columns as select _related_results_construction order_by for/};
+  delete @{$sub_attrs}{qw/collapse columns as select order_by for/};
 
   # if we multi-prefetch we group_by something unique, as this is what we would
   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
@@ -1678,15 +1789,20 @@ sub _count_subq_rs {
         $sql_maker->{name_sep} = '';
       }
 
+      # delete local is 5.12+
+      local @{$sql_maker}{qw(renderer converter)};
+      delete @{$sql_maker}{qw(renderer converter)};
+
       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
 
-      my $having_sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
+      my $having_sql = $sql_maker->_render_sqla(where => $attrs->{having});
+
       my %seen_having;
 
       # search for both a proper quoted qualified string, for a naive unquoted scalarref
       # and if all fails for an utterly naive quoted scalar-with-function
       while ($having_sql =~ /
-        $rquote $sep $lquote (.+?) $rquote
+        (?: $rquote $sep)? $lquote (.+?) $rquote
           |
         [\s,] \w+ \. (\w+) [\s,]
           |
@@ -1856,20 +1972,12 @@ sub _rs_update_delete {
 
   # simplify the joinmap, so we can further decide if a subq is necessary
   if (!$needs_subq and @{$attrs->{from}} > 1) {
-    $attrs->{from} = $storage->_prune_unused_joins ($attrs->{from}, $attrs->{select}, $self->{cond}, $attrs);
-
-    # check if there are any joins left after the prune
-    if ( @{$attrs->{from}} > 1 ) {
-      $join_classifications = $storage->_resolve_aliastypes_from_select_args (
-        [ @{$attrs->{from}}[1 .. $#{$attrs->{from}}] ],
-        $attrs->{select},
-        $self->{cond},
-        $attrs
-      );
 
-      # any non-pruneable joins imply subq
-      $needs_subq = scalar keys %{ $join_classifications->{restricting} || {} };
-    }
+    ($attrs->{from}, $join_classifications) =
+      $storage->_prune_unused_joins ($attrs);
+
+    # any non-pruneable non-local restricting joins imply subq
+    $needs_subq = defined List::Util::first { $_ ne $attrs->{alias} } keys %{ $join_classifications->{restricting} || {} };
   }
 
   # check if the head is composite (by now all joins are thrown out unless $needs_subq)
@@ -1884,12 +1992,18 @@ sub _rs_update_delete {
   if (! $needs_subq) {
     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
     # a condition containing 'me' or other table prefixes will not work
-    # at all. Tell SQLMaker to dequalify idents via a gross hack.
-    $cond = do {
-      my $sqla = $rsrc->storage->sql_maker;
-      local $sqla->{_dequalify_idents} = 1;
-      \[ $sqla->_recurse_where($self->{cond}) ];
-    };
+    # at all - so we convert the WHERE to a dq tree now, dequalify all
+    # identifiers found therein via a scan across the tree, and then use
+    # \{} style to pass the result onwards for use in the final query
+    if ($self->{cond}) {
+      $cond = do {
+        my $converter = $rsrc->storage->sql_maker->converter;
+        scan_dq_nodes({
+          DQ_IDENTIFIER ,=> sub { $_ = [ $_->[-1] ] for $_[0]->{elements} }
+        }, my $where_dq = $converter->_where_to_dq($self->{cond}));
+        \$where_dq;
+      };
+    }
   }
   else {
     # we got this far - means it is time to wrap a subquery
@@ -1902,20 +2016,28 @@ sub _rs_update_delete {
     );
 
     # make a new $rs selecting only the PKs (that's all we really need for the subq)
-    delete $attrs->{$_} for qw/select as collapse _related_results_construction/;
+    delete $attrs->{$_} for qw/select as collapse/;
     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } @$idcols ];
-    $attrs->{group_by} = \ '';  # FIXME - this is an evil hack, it causes the optimiser to kick in and throw away the LEFT joins
+
+    # this will be consumed by the pruner waaaaay down the stack
+    $attrs->{_force_prune_multiplying_joins} = 1;
+
     my $subrs = (ref $self)->new($rsrc, $attrs);
 
     if (@$idcols == 1) {
-      $cond = { $idcols->[0] => { -in => $subrs->as_query } };
+      $cond = { $idcols->[0] => { -in => \$subrs->_as_select_dq } };
     }
     elsif ($storage->_use_multicolumn_in) {
       # no syntax for calling this properly yet
       # !!! EXPERIMENTAL API !!! WILL CHANGE !!!
-      $cond = $storage->sql_maker->_where_op_multicolumn_in (
-        $idcols, # how do I convey a list of idents...? can binds reside on lhs?
-        $subrs->as_query
+      my $left = $storage->sql_maker->_render_sqla(select_select => $idcols);
+      $left =~ s/^SELECT //i;
+      my $right = $storage->sql_maker
+                          ->converter
+                          ->_literal_to_dq(${$subrs->as_query});
+      $cond = \Operator(
+        { 'SQL.Naive' => 'in' },
+        [ Literal(SQL => "( $left )"), $right ],
       ),
     }
     else {
@@ -1925,6 +2047,8 @@ sub _rs_update_delete {
       if (
         $existing_group_by
           or
+        # we do not need to check pre-multipliers, since if the premulti is there, its
+        # parent (who is multi) will be there too
         keys %{ $join_classifications->{multiplying} || {} }
       ) {
         # make sure if there is a supplied group_by it matches the columns compiled above
@@ -2144,7 +2268,7 @@ first element should be a list of column names and each subsequent
 element should be a data value in the earlier specified column order.
 For example:
 
-  $Arstist_rs->populate([
+  $schema->resultset("Artist")->populate([
     [ qw( artistid name ) ],
     [ 100, 'A Formally Unknown Singer' ],
     [ 101, 'A singer that jumped the shark two albums ago' ],
@@ -2268,6 +2392,11 @@ sub populate {
           $rel,
         );
 
+        if (ref($related) eq 'REF' and ref($$related) eq 'HASH') {
+          $related = $self->result_source
+                          ->_extract_fixed_values_for($$related, $rel);
+        }
+
         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
         my @populate = map { {%$_, %$related} } @rows_to_add;
 
@@ -2277,8 +2406,7 @@ sub populate {
   }
 }
 
-
-# populate() argumnets went over several incarnations
+# populate() arguments went over several incarnations
 # What we ultimately support is AoH
 sub _normalize_populate_args {
   my ($self, $arg) = @_;
@@ -2442,17 +2570,8 @@ sub _merge_with_rscond {
   if (! defined $self->{cond}) {
     # just massage $data below
   }
-  elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
-    %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
-    @cols_from_relations = keys %new_data;
-  }
-  elsif (ref $self->{cond} ne 'HASH') {
-    $self->throw_exception(
-      "Can't abstract implicit construct, resultset condition not a hash"
-    );
-  }
-  else {
-    # precendence must be given to passed values over values inherited from
+  elsif (ref $self->{cond} eq 'HASH') {
+    # precedence must be given to passed values over values inherited from
     # the cond, so the order here is important.
     my $collapsed_cond = $self->_collapse_cond($self->{cond});
     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
@@ -2473,6 +2592,23 @@ sub _merge_with_rscond {
       }
     }
   }
+  elsif (ref $self->{cond} eq 'REF' and ref ${$self->{cond}} eq 'HASH') {
+    if ((${$self->{cond}})->{'DBIx::Class::ResultSource.UNRESOLVABLE'}) {
+      %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
+      @cols_from_relations = keys %new_data;
+    } else {
+      %new_data = %{$self->_remove_alias(
+        $self->result_source
+             ->_extract_fixed_values_for(${$self->{cond}}),
+        $alias
+      )};
+    }
+  }
+  else {
+    $self->throw_exception(
+      "Can't abstract implicit construct, resultset condition not a hash"
+    );
+  }
 
   %new_data = (
     %new_data,
@@ -2487,7 +2623,7 @@ sub _merge_with_rscond {
 # determines if the resultset defines at least one
 # of the attributes supplied
 #
-# used to determine if a subquery is neccessary
+# used to determine if a subquery is necessary
 #
 # supports some virtual attributes:
 #   -join
@@ -2611,9 +2747,24 @@ sub as_query {
 
   my $attrs = { %{ $self->_resolved_attrs } };
 
-  $self->result_source->storage->_select_args_to_query (
+  my $aq = $self->result_source->storage->_select_args_to_query (
+    $attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs
+  );
+
+  $aq;
+}
+
+sub _as_select_dq {
+  my $self = shift;
+  my $attrs = { %{ $self->_resolved_attrs } };
+  my $storage = $self->result_source->storage;
+  my (undef, $ident, @args) = $storage->_select_args(
     $attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs
   );
+  $ident = $ident->from if blessed($ident);
+  $storage->sql_maker->converter->_select_to_dq(
+    $ident, @args
+  );
 }
 
 =head2 find_or_new
@@ -2630,7 +2781,7 @@ sub as_query {
     { artist => 'fred' }, { key => 'artists' });
 
   $cd->cd_to_producer->find_or_new({ producer => $producer },
-                                   { key => 'primary });
+                                   { key => 'primary' });
 
 Find an existing record from this resultset using L</find>. if none exists,
 instantiate a new result object and return it. The object will not be saved
@@ -3085,7 +3236,10 @@ Returns a related resultset for the supplied relationship name.
 sub related_resultset {
   my ($self, $rel) = @_;
 
-  return $self->{related_resultsets}{$rel} ||= do {
+  return $self->{related_resultsets}{$rel}
+    if defined $self->{related_resultsets}{$rel};
+
+  return $self->{related_resultsets}{$rel} = do {
     my $rsrc = $self->result_source;
     my $rel_info = $rsrc->relationship_info($rel);
 
@@ -3111,15 +3265,6 @@ sub related_resultset {
     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
     delete @{$attrs}{qw(result_class alias)};
 
-    my $related_cache;
-
-    if (my $cache = $self->get_cache) {
-      $related_cache = [ map
-        { @{$_->related_resultset($rel)->get_cache||[]} }
-        @$cache
-      ];
-    }
-
     my $rel_source = $rsrc->related_source($rel);
 
     my $new = do {
@@ -3140,7 +3285,16 @@ sub related_resultset {
                        where => $attrs->{where},
                    });
     };
-    $new->set_cache($related_cache) if $related_cache;
+
+    if (my $cache = $self->get_cache) {
+      my @related_cache = map
+        { @{$_->related_resultset($rel)->get_cache||[]} }
+        @$cache
+      ;
+
+      $new->set_cache(\@related_cache) if @related_cache;
+    }
+
     $new;
   };
 }
@@ -3373,6 +3527,9 @@ sub _resolved_attrs {
   my $source = $self->result_source;
   my $alias  = $attrs->{alias};
 
+  $self->throw_exception("Specifying distinct => 1 in conjunction with collapse => 1 is unsupported")
+    if $attrs->{collapse} and $attrs->{distinct};
+
   # default selection list
   $attrs->{columns} = [ $source->columns ]
     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
@@ -3462,7 +3619,7 @@ sub _resolved_attrs {
         $source->_resolve_join(
           $join,
           $alias,
-          { %{ $attrs->{seen_join} || {} } },
+          ($attrs->{seen_join} = { %{ $attrs->{seen_join} || {} } }),
           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
             ? $attrs->{from}[-1][0]{-join_path}
             : []
@@ -3483,26 +3640,9 @@ sub _resolved_attrs {
     $attrs->{group_by} = [ $attrs->{group_by} ];
   }
 
-  # generate the distinct induced group_by early, as prefetch will be carried via a
-  # subquery (since a group_by is present)
-  if (delete $attrs->{distinct}) {
-    if ($attrs->{group_by}) {
-      carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
-    }
-    else {
-      $attrs->{_grouped_by_distinct} = 1;
-      # distinct affects only the main selection part, not what prefetch may
-      # add below.
-      $attrs->{group_by} = $source->storage->_group_over_selection (
-        $attrs->{from},
-        $attrs->{select},
-        $attrs->{order_by},
-      );
-    }
-  }
 
   # generate selections based on the prefetch helper
-  my $prefetch;
+  my ($prefetch, @prefetch_select, @prefetch_as);
   $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} )
     if defined $attrs->{prefetch};
 
@@ -3511,6 +3651,9 @@ sub _resolved_attrs {
     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
       if $attrs->{_dark_selector};
 
+    $self->throw_exception("Specifying prefetch in conjunction with an explicit collapse => 0 is unsupported")
+      if defined $attrs->{collapse} and ! $attrs->{collapse};
+
     $attrs->{collapse} = 1;
 
     # this is a separate structure (we don't look in {from} directly)
@@ -3536,19 +3679,13 @@ sub _resolved_attrs {
 
     my @prefetch = $source->_resolve_prefetch( $prefetch, $alias, $join_map );
 
-    push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
-    push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
-  }
-
-  if ( List::Util::first { $_ =~ /\./ } @{$attrs->{as}} ) {
-    $attrs->{_related_results_construction} = 1;
-  }
-  else {
-    $attrs->{collapse} = 0;
+    # save these for after distinct resolution
+    @prefetch_select = map { $_->[0] } @prefetch;
+    @prefetch_as = map { $_->[1] } @prefetch;
   }
 
   # run through the resulting joinstructure (starting from our current slot)
-  # and unset collapse if proven unnesessary
+  # and unset collapse if proven unnecessary
   #
   # also while we are at it find out if the current root source has
   # been premultiplied by previous related_source chaining
@@ -3559,7 +3696,7 @@ sub _resolved_attrs {
 
     if (ref $attrs->{from} eq 'ARRAY') {
 
-      if (@{$attrs->{from}} <= 1) {
+      if (@{$attrs->{from}} == 1) {
         # no joins - no collapse
         $attrs->{collapse} = 0;
       }
@@ -3596,6 +3733,34 @@ sub _resolved_attrs {
     }
   }
 
+  # generate the distinct induced group_by before injecting the prefetched select/as parts
+  if (delete $attrs->{distinct}) {
+    if ($attrs->{group_by}) {
+      carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
+    }
+    else {
+      $attrs->{_grouped_by_distinct} = 1;
+      # distinct affects only the main selection part, not what prefetch may add below
+      ($attrs->{group_by}, my $new_order) = $source->storage->_group_over_selection($attrs);
+
+      # FIXME possibly ignore a rewritten order_by (may turn out to be an issue)
+      # The thinking is: if we are collapsing the subquerying prefetch engine will
+      # rip stuff apart for us anyway, and we do not want to have a potentially
+      # function-converted external order_by
+      # ( there is an explicit if ( collapse && _grouped_by_distinct ) check in DBIHacks )
+      $attrs->{order_by} = $new_order unless $attrs->{collapse};
+    }
+  }
+
+  # inject prefetch-bound selection (if any)
+  push @{$attrs->{select}}, @prefetch_select;
+  push @{$attrs->{as}}, @prefetch_as;
+
+  # whether we can get away with the dumbest (possibly DBI-internal) collapser
+  if ( List::Util::first { $_ =~ /\./ } @{$attrs->{as}} ) {
+    $attrs->{_related_results_construction} = 1;
+  }
+
   # if both page and offset are specified, produce a combined offset
   # even though it doesn't make much sense, this is what pre 081xx has
   # been doing
@@ -4174,7 +4339,7 @@ object with all of its related data.
 If an L</order_by> is already declared, and orders the resultset in a way that
 makes collapsing as described above impossible (e.g. C<< ORDER BY
 has_many_rel.column >> or C<ORDER BY RANDOM()>), DBIC will automatically
-switch to "eager" mode and slurp the entire resultset before consturcting the
+switch to "eager" mode and slurp the entire resultset before constructing the
 first object returned by L</next>.
 
 Setting this attribute on a resultset that does not join any has_many
@@ -4388,8 +4553,17 @@ or with an in-place function in which case literal SQL is required:
 
 =back
 
-Set to 1 to group by all columns. If the resultset already has a group_by
-attribute, this setting is ignored and an appropriate warning is issued.
+Set to 1 to automatically generate a L</group_by> clause based on the selection
+(including intelligent handling of L</order_by> contents). Note that the group
+criteria calculation takes place over the B<final> selection. This includes
+any L</+columns>, L</+select> or L</order_by> additions in subsequent
+L</search> calls, and standalone columns selected via
+L<DBIx::Class::ResultSetColumn> (L</get_column>). A notable exception are the
+extra selections specified via L</prefetch> - such selections are explicitly
+excluded from group criteria calculations.
+
+If the final ResultSet also explicitly defines a L</group_by> attribute, this
+setting is ignored and an appropriate warning is issued.
 
 =head2 where
 
@@ -4598,7 +4772,7 @@ or to a sensible value based on the "data_type".
 =item dbic_colname
 
 Used to fill in missing sqlt_datatype and sqlt_size attributes (if they are
-explicitly specified they are never overriden).  Also used by some weird DBDs,
+explicitly specified they are never overridden).  Also used by some weird DBDs,
 where the column name should be available at bind_param time (e.g. Oracle).
 
 =back
@@ -4609,6 +4783,7 @@ supported:
   [ $name => $val ] === [ { dbic_colname => $name }, $val ]
   [ \$dt  => $val ] === [ { sqlt_datatype => $dt }, $val ]
   [ undef,   $val ] === [ {}, $val ]
+  $val              === [ {}, $val ]
 
 =head1 AUTHOR AND CONTRIBUTORS