typo fixes
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBIHacks.pm
index 1f48bc6..4c54e2c 100644 (file)
@@ -34,29 +34,39 @@ sub _prune_unused_joins {
 
   my $aliastypes = $self->_resolve_aliastypes_from_select_args(@_);
 
+  my $orig_joins = delete $aliastypes->{joining};
+  my $orig_multiplying = $aliastypes->{multiplying};
+
   # a grouped set will not be affected by amount of rows. Thus any
   # {multiplying} joins can go
-  delete $aliastypes->{multiplying} if $attrs->{group_by};
+  delete $aliastypes->{multiplying}
+    if $attrs->{_force_prune_multiplying_joins} or $attrs->{group_by};
 
   my @newfrom = $from->[0]; # FROM head is always present
 
   my %need_joins;
+
   for (values %$aliastypes) {
     # add all requested aliases
     $need_joins{$_} = 1 for keys %$_;
 
     # add all their parents (as per joinpath which is an AoH { table => alias })
-    $need_joins{$_} = 1 for map { values %$_ } map { @$_ } values %$_;
+    $need_joins{$_} = 1 for map { values %$_ } map { @{$_->{-parents}} } values %$_;
   }
+
   for my $j (@{$from}[1..$#$from]) {
     push @newfrom, $j if (
-      (! $j->[0]{-alias}) # legacy crap
+      (! defined $j->[0]{-alias}) # legacy crap
         ||
       $need_joins{$j->[0]{-alias}}
     );
   }
 
-  return \@newfrom;
+  return ( \@newfrom, {
+    multiplying => { map { $need_joins{$_} ? ($_  => $orig_multiplying->{$_}) : () } keys %$orig_multiplying },
+    %$aliastypes,
+    joining => { map { $_ => $orig_joins->{$_} } keys %need_joins },
+  } );
 }
 
 #
@@ -66,29 +76,32 @@ sub _prune_unused_joins {
 sub _adjust_select_args_for_complex_prefetch {
   my ($self, $from, $select, $where, $attrs) = @_;
 
-  $self->throw_exception ('Nothing to prefetch... how did we get here?!')
-    if not @{$attrs->{_prefetch_selector_range}};
-
   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
 
+  my $root_alias = $attrs->{alias};
+
   # generate inner/outer attribute lists, remove stuff that doesn't apply
   my $outer_attrs = { %$attrs };
-  delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
+  delete @{$outer_attrs}{qw(where bind rows offset group_by _grouped_by_distinct having)};
 
-  my $inner_attrs = { %$attrs, _is_internal_subuery => 1 };
-  delete $inner_attrs->{$_} for qw/for collapse _prefetch_selector_range select as/;
+  my $inner_attrs = { %$attrs };
+  delete @{$inner_attrs}{qw(from for collapse select as _related_results_construction)};
 
-  # if the user did not request it, there is no point using it inside
-  delete $inner_attrs->{order_by} if delete $inner_attrs->{_order_is_artificial};
+  # there is no point of ordering the insides if there is no limit
+  delete $inner_attrs->{order_by} if (
+    delete $inner_attrs->{_order_is_artificial}
+      or
+    ! $inner_attrs->{rows}
+  );
 
   # generate the inner/outer select lists
   # for inside we consider only stuff *not* brought in by the prefetch
   # on the outside we substitute any function for its alias
   my $outer_select = [ @$select ];
-  my $inner_select = [];
+  my $inner_select;
 
-  my ($root_source, $root_source_offset);
+  my ($root_node, $root_node_offset);
 
   for my $i (0 .. $#$from) {
     my $node = $from->[$i];
@@ -97,26 +110,30 @@ sub _adjust_select_args_for_complex_prefetch {
           : next
     ;
 
-    if ( ($h->{-alias}||'') eq $attrs->{alias} and $root_source = $h->{-rsrc} ) {
-      $root_source_offset = $i;
+    if ( ($h->{-alias}||'') eq $root_alias and $h->{-rsrc} ) {
+      $root_node = $h;
+      $root_node_offset = $i;
       last;
     }
   }
 
   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
-    unless $root_source;
+    unless $root_node;
 
   # use the heavy duty resolver to take care of aliased/nonaliased naming
   my $colinfo = $self->_resolve_column_info($from);
   my $selected_root_columns;
 
-  my ($p_start, $p_end) = @{$outer_attrs->{_prefetch_selector_range}};
-  for my $i (0 .. $p_start - 1, $p_end + 1 .. $#$outer_select) {
+  for my $i (0 .. $#$outer_select) {
     my $sel = $outer_select->[$i];
 
+    next if (
+      $colinfo->{$sel} and $colinfo->{$sel}{-source_alias} ne $root_alias
+    );
+
     if (ref $sel eq 'HASH' ) {
       $sel->{-as} ||= $attrs->{as}[$i];
-      $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
+      $outer_select->[$i] = join ('.', $root_alias, ($sel->{-as} || "inner_column_$i") );
     }
     elsif (! ref $sel and my $ci = $colinfo->{$sel}) {
       $selected_root_columns->{$ci->{-colname}} = 1;
@@ -127,77 +144,164 @@ sub _adjust_select_args_for_complex_prefetch {
     push @{$inner_attrs->{as}}, $attrs->{as}[$i];
   }
 
-  # We will need to fetch all native columns in the inner subquery, which may be a part
-  # of an *outer* join condition. We can not just fetch everything because a potential
-  # has_many restricting join collapse *will not work* on heavy data types.
-  # Time for more horrible SQL parsing, aughhhh
-
-  # MASSIVE FIXME - in fact when we are fully transitioned to DQ and the support is
-  # is sane - we will need to trim the select list to *only* fetch stuff that is
-  # necessary to build joins. In the current implementation if I am selecting a blob
-  # and the group_by kicks in - we are fucked, and all the user can do is not select
-  # that column. This is silly!
-
-  my $retardo_sqla_cache = {};
-  for my $cond ( map { $_->[1] } @{$from}[$root_source_offset + 1 .. $#$from] ) {
-    for my $col (@{$self->_extract_condition_columns($cond, $retardo_sqla_cache)}) {
-      my $ci = $colinfo->{$col};
-      if (
-        $ci
-          and
-        $ci->{-source_alias} eq $attrs->{alias}
-          and
-        ! $selected_root_columns->{$ci->{-colname}}++
-      ) {
-        # adding it to both to keep limits not supporting dark selectors happy
-        push @$inner_select, $ci->{-fq_colname};
-        push @{$inner_attrs->{as}}, $ci->{-fq_colname};
-      }
+  # We will need to fetch all native columns in the inner subquery, which may
+  # be a part of an *outer* join condition, or an order_by (which needs to be
+  # preserved outside)
+  # We can not just fetch everything because a potential has_many restricting
+  # join collapse *will not work* on heavy data types.
+  my $connecting_aliastypes = $self->_resolve_aliastypes_from_select_args(
+    $from,
+    [],
+    $where,
+    $inner_attrs
+  );
+
+  for (sort map { keys %{$_->{-seen_columns}||{}} } map { values %$_ } values %$connecting_aliastypes) {
+    my $ci = $colinfo->{$_} or next;
+    if (
+      $ci->{-source_alias} eq $root_alias
+        and
+      ! $selected_root_columns->{$ci->{-colname}}++
+    ) {
+      # adding it to both to keep limits not supporting dark selectors happy
+      push @$inner_select, $ci->{-fq_colname};
+      push @{$inner_attrs->{as}}, $ci->{-fq_colname};
     }
   }
 
   # construct the inner $from and lock it in a subquery
   # we need to prune first, because this will determine if we need a group_by below
-  # the fake group_by is so that the pruner throws away all non-selecting, non-restricting
-  # multijoins (since we def. do not care about those inside the subquery)
-
+  # throw away all non-selecting, non-restricting multijoins
+  # (since we def. do not care about multiplication those inside the subquery)
   my $inner_subq = do {
 
     # must use it here regardless of user requests
     local $self->{_use_join_optimizer} = 1;
 
-    my $inner_from = $self->_prune_unused_joins ($from, $inner_select, $where, {
-      group_by => ['dummy'], %$inner_attrs,
+    # throw away multijoins since we def. do not care about those inside the subquery
+    my ($inner_from, $inner_aliastypes) = $self->_prune_unused_joins ($from, $inner_select, $where, {
+      %$inner_attrs, _force_prune_multiplying_joins => 1
     });
 
-    my $inner_aliastypes =
-      $self->_resolve_aliastypes_from_select_args( $inner_from, $inner_select, $where, $inner_attrs );
-
-    # we need to simulate collapse in the subq if a multiplying join is pulled
-    # by being a non-selecting restrictor
+    # uh-oh a multiplier (which is not us) left in, this is a problem
     if (
-      ! $inner_attrs->{group_by}
+      $inner_aliastypes->{multiplying}
         and
-      first {
-        $inner_aliastypes->{restricting}{$_}
-          and
-        ! $inner_aliastypes->{selecting}{$_}
-      } ( keys %{$inner_aliastypes->{multiplying}||{}} )
+      # if there are user-supplied groups - assume user knows wtf they are up to
+      ( ! $inner_aliastypes->{grouping} or $inner_attrs->{_grouped_by_distinct} )
+        and
+      my @multipliers = grep { $_ ne $root_alias } keys %{$inner_aliastypes->{multiplying}}
     ) {
-      my $unprocessed_order_chunks;
-      ($inner_attrs->{group_by}, $unprocessed_order_chunks) = $self->_group_over_selection (
-        $inner_from, $inner_select, $inner_attrs->{order_by}
-      );
-
-      $self->throw_exception (
-        'A required group_by clause could not be constructed automatically due to a complex '
-      . 'order_by criteria. Either order_by columns only (no functions) or construct a suitable '
-      . 'group_by by hand'
-      )  if $unprocessed_order_chunks;
+
+      # if none of the multipliers came from an order_by (guaranteed to have been combined
+      # with a limit) - easy - just slap a group_by to simulate a collapse and be on our way
+      if (
+        ! $inner_aliastypes->{ordering}
+          or
+        ! first { $inner_aliastypes->{ordering}{$_} } @multipliers
+      ) {
+
+        my $unprocessed_order_chunks;
+        ($inner_attrs->{group_by}, $unprocessed_order_chunks) = $self->_group_over_selection ({
+          %$inner_attrs,
+          from => $inner_from,
+          select => $inner_select,
+        });
+
+        $self->throw_exception (
+          'A required group_by clause could not be constructed automatically due to a complex '
+        . 'order_by criteria. Either order_by columns only (no functions) or construct a suitable '
+        . 'group_by by hand'
+        )  if $unprocessed_order_chunks;
+      }
+      else {
+        # We need to order by external columns and group at the same time
+        # so we can calculate the proper limit
+        # This doesn't really make sense in SQL, however from DBICs point
+        # of view is rather valid (order the leftmost objects by whatever
+        # criteria and get the offset/rows many). There is a way around
+        # this however in SQL - we simply tae the direction of each piece
+        # of the foreign order and convert them to MIN(X) for ASC or MAX(X)
+        # for DESC, and group_by the root columns. The end result should be
+        # exactly what we expect
+
+        # supplement the main selection with pks if not already there,
+        # as they will have to be a part of the group_by to collapse
+        # things properly
+        my $cur_sel = { map { $_ => 1 } @$inner_select };
+
+        my @pks = map { "$root_alias.$_" } $root_node->{-rsrc}->primary_columns
+          or $self->throw_exception( sprintf
+            'Unable to perform complex limited prefetch off %s without declared primary key',
+            $root_node->{-rsrc}->source_name,
+          );
+        for my $col (@pks) {
+          push @$inner_select, $col
+            unless $cur_sel->{$col}++;
+        }
+
+        # wrap any part of the order_by that "responds" to an ordering alias
+        # into a MIN/MAX
+        # FIXME - this code is a joke, will need to be completely rewritten in
+        # the DQ branch. But I need to push a POC here, otherwise the
+        # pesky tests won't pass
+        my $sql_maker = $self->sql_maker;
+        my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
+        my $own_re = qr/ $lquote \Q$root_alias\E $rquote $sep | \b \Q$root_alias\E $sep /x;
+        my @order_chunks = map { ref $_ eq 'ARRAY' ? $_ : [ $_ ] } $sql_maker->_order_by_chunks($attrs->{order_by});
+        my @new_order = map { \$_ } @order_chunks;
+        my $inner_columns_info = $self->_resolve_column_info($inner_from);
+
+        # loop through and replace stuff that is not "ours" with a min/max func
+        # everything is a literal at this point, since we are likely properly
+        # quoted and stuff
+        for my $i (0 .. $#new_order) {
+          my $chunk = $order_chunks[$i][0];
+
+          # skip ourselves
+          next if $chunk =~ $own_re;
+
+          ($chunk, my $is_desc) = $sql_maker->_split_order_chunk($chunk);
+
+          # maybe our own unqualified column
+          my $ord_bit = (
+            $lquote and $sep and $chunk =~ /^ $lquote ([^$sep]+) $rquote $/x
+          ) ? $1 : $chunk;
+
+          next if (
+            $ord_bit
+              and
+            $inner_columns_info->{$ord_bit}
+              and
+            $inner_columns_info->{$ord_bit}{-source_alias} eq $root_alias
+          );
+
+          $new_order[$i] = \[
+            sprintf(
+              '%s(%s)%s',
+              ($is_desc ? 'MAX' : 'MIN'),
+              $chunk,
+              ($is_desc ? ' DESC' : ''),
+            ),
+            @ {$order_chunks[$i]} [ 1 .. $#{$order_chunks[$i]} ]
+          ];
+        }
+
+        $inner_attrs->{order_by} = \@new_order;
+
+        # do not care about leftovers here - it will be all the functions
+        # we just created
+        ($inner_attrs->{group_by}) = $self->_group_over_selection ({
+          %$inner_attrs,
+          from => $inner_from,
+          select => $inner_select,
+        });
+      }
     }
 
     # we already optimized $inner_from above
-    local $self->{_use_join_optimizer} = 0;
+    # and already local()ized
+    $self->{_use_join_optimizer} = 0;
 
     # generate the subquery
     $self->_select_args_to_query (
@@ -224,40 +328,38 @@ sub _adjust_select_args_for_complex_prefetch {
   my @outer_from;
 
   # we may not be the head
-  if ($root_source_offset) {
-    # first generate the outer_from, up to the substitution point
-    @outer_from = splice @$from, 0, $root_source_offset;
-
-    my $root_node = shift @$from;
+  if ($root_node_offset) {
+    # first generate the outer_from, up and including the substitution point
+    @outer_from = splice @$from, 0, $root_node_offset;
 
     push @outer_from, [
       {
-        -alias => $attrs->{alias},
-        -rsrc => $root_node->[0]{-rsrc},
-        $attrs->{alias} => $inner_subq,
+        -alias => $root_alias,
+        -rsrc => $root_node->{-rsrc},
+        $root_alias => $inner_subq,
       },
-      @{$root_node}[1 .. $#$root_node],
+      @{$from->[0]}[1 .. $#{$from->[0]}],
     ];
   }
   else {
-    my $root_node = shift @$from;
-
     @outer_from = {
-      -alias => $attrs->{alias},
+      -alias => $root_alias,
       -rsrc => $root_node->{-rsrc},
-      $attrs->{alias} => $inner_subq,
+      $root_alias => $inner_subq,
     };
   }
 
+  shift @$from; # what we just replaced above
+
   # scan the *remaining* from spec against different attributes, and see which joins are needed
   # in what role
-  my $outer_aliastypes =
+  my $outer_aliastypes = $outer_attrs->{_aliastypes} =
     $self->_resolve_aliastypes_from_select_args( $from, $outer_select, $where, $outer_attrs );
 
   # unroll parents
-  my ($outer_select_chain, $outer_restrict_chain) = map { +{
-    map { $_ => 1 } map { values %$_} map { @$_ } values %{ $outer_aliastypes->{$_} || {} }
-  } } qw/selecting restricting/;
+  my ($outer_select_chain, @outer_nonselecting_chains) = map { +{
+    map { $_ => 1 } map { values %$_} map { @{$_->{-parents}} } values %{ $outer_aliastypes->{$_} || {} }
+  } } qw/selecting restricting grouping ordering/;
 
   # see what's left - throw away if not selecting/restricting
   # also throw in a group_by if a non-selecting multiplier,
@@ -271,18 +373,19 @@ sub _adjust_select_args_for_complex_prefetch {
     ) {
       push @outer_from, $j
     }
-    elsif ($outer_restrict_chain->{$alias}) {
+    elsif (first { $_->{$alias} } @outer_nonselecting_chains ) {
       push @outer_from, $j;
       $need_outer_group_by ||= $outer_aliastypes->{multiplying}{$alias} ? 1 : 0;
     }
   }
 
-  if ($need_outer_group_by and ! $outer_attrs->{group_by}) {
-
+  if ( $need_outer_group_by and $attrs->{_grouped_by_distinct} ) {
     my $unprocessed_order_chunks;
-    ($outer_attrs->{group_by}, $unprocessed_order_chunks) = $self->_group_over_selection (
-      \@outer_from, $outer_select, $outer_attrs->{order_by}
-    );
+    ($outer_attrs->{group_by}, $unprocessed_order_chunks) = $self->_group_over_selection ({
+      %$outer_attrs,
+      from => \@outer_from,
+      select => $outer_select,
+    });
 
     $self->throw_exception (
       'A required group_by clause could not be constructed automatically due to a complex '
@@ -296,7 +399,7 @@ sub _adjust_select_args_for_complex_prefetch {
   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
   # then if where conditions apply to the *right* side of the prefetch, you may have
   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
-  # the outer select to exclude joins you didin't want in the first place
+  # the outer select to exclude joins you didn't want in the first place
   #
   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
   return (\@outer_from, $outer_select, $where, $outer_attrs);
@@ -307,7 +410,7 @@ sub _adjust_select_args_for_complex_prefetch {
 #
 # Due to a lack of SQLA2 we fall back to crude scans of all the
 # select/where/order/group attributes, in order to determine what
-# aliases are neded to fulfill the query. This information is used
+# aliases are needed to fulfill the query. This information is used
 # throughout the code to prune unnecessary JOINs from the queries
 # in an attempt to reduce the execution time.
 # Although the method is pretty horrific, the worst thing that can
@@ -331,7 +434,7 @@ sub _resolve_aliastypes_from_select_args {
       or next;
 
     $alias_list->{$al} = $j;
-    $aliases_by_type->{multiplying}{$al} ||= $j->{-join_path}||[] if (
+    $aliases_by_type->{multiplying}{$al} ||= { -parents => $j->{-join_path}||[] } if (
       # not array == {from} head == can't be multiplying
       ( ref($_) eq 'ARRAY' and ! $j->{-is_single} )
         or
@@ -340,7 +443,7 @@ sub _resolve_aliastypes_from_select_args {
     );
   }
 
-  # get a column to source/alias map (including unqualified ones)
+  # get a column to source/alias map (including unambiguous unqualified ones)
   my $colinfo = $self->_resolve_column_info ($from);
 
   # set up a botched SQLA
@@ -351,6 +454,7 @@ sub _resolve_aliastypes_from_select_args {
   local $sql_maker->{where_bind};
   local $sql_maker->{group_bind};
   local $sql_maker->{having_bind};
+  local $sql_maker->{from_bind};
 
   # we can't scan properly without any quoting (\b doesn't cut it
   # everywhere), so unless there is proper quoting set - use our
@@ -374,32 +478,54 @@ sub _resolve_aliastypes_from_select_args {
   my $to_scan = {
     restricting => [
       $sql_maker->_recurse_where ($where),
-      $sql_maker->_parse_rs_attrs ({
-        map { $_ => $attrs->{$_} } (qw/group_by having/)
-      }),
+      $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} }),
+    ],
+    grouping => [
+      $sql_maker->_parse_rs_attrs ({ group_by => $attrs->{group_by} }),
+    ],
+    joining => [
+      $sql_maker->_recurse_from (
+        ref $from->[0] eq 'ARRAY' ? $from->[0][0] : $from->[0],
+        @{$from}[1 .. $#$from],
+      ),
     ],
     selecting => [
       $sql_maker->_recurse_fields ($select),
-      ( map { $_->[0] } $self->_extract_order_criteria ($attrs->{order_by}, $sql_maker) ),
+    ],
+    ordering => [
+      map { $_->[0] } $self->_extract_order_criteria ($attrs->{order_by}, $sql_maker),
     ],
   };
 
   # throw away empty chunks
   $_ = [ map { $_ || () } @$_ ] for values %$to_scan;
 
-  # first loop through all fully qualified columns and get the corresponding
+  # first see if we have any exact matches (qualified or unqualified)
+  for my $type (keys %$to_scan) {
+    for my $piece (@{$to_scan->{$type}}) {
+      if ($colinfo->{$piece} and my $alias = $colinfo->{$piece}{-source_alias}) {
+        $aliases_by_type->{$type}{$alias} ||= { -parents => $alias_list->{$alias}{-join_path}||[] };
+        $aliases_by_type->{$type}{$alias}{-seen_columns}{$colinfo->{$piece}{-fq_colname}} = $piece;
+      }
+    }
+  }
+
+  # now loop through all fully qualified columns and get the corresponding
   # alias (should work even if they are in scalarrefs)
   for my $alias (keys %$alias_list) {
     my $al_re = qr/
-      $lquote $alias $rquote $sep
+      $lquote $alias $rquote $sep (?: $lquote ([^$rquote]+) $rquote )?
         |
-      \b $alias \.
+      \b $alias \. ([^\s\)\($rquote]+)?
     /x;
 
     for my $type (keys %$to_scan) {
       for my $piece (@{$to_scan->{$type}}) {
-        $aliases_by_type->{$type}{$alias} ||= $alias_list->{$alias}{-join_path}||[]
-          if ($piece =~ $al_re);
+        if (my @matches = $piece =~ /$al_re/g) {
+          $aliases_by_type->{$type}{$alias} ||= { -parents => $alias_list->{$alias}{-join_path}||[] };
+          $aliases_by_type->{$type}{$alias}{-seen_columns}{"$alias.$_"} = "$alias.$_"
+            for grep { defined $_ } @matches;
+        }
       }
     }
   }
@@ -409,13 +535,15 @@ sub _resolve_aliastypes_from_select_args {
   for my $col (keys %$colinfo) {
     next if $col =~ / \. /x;   # if column is qualified it was caught by the above
 
-    my $col_re = qr/ $lquote $col $rquote /x;
+    my $col_re = qr/ $lquote ($col) $rquote /x;
 
     for my $type (keys %$to_scan) {
       for my $piece (@{$to_scan->{$type}}) {
-        if ($piece =~ $col_re) {
+        if ( my @matches = $piece =~ /$col_re/g) {
           my $alias = $colinfo->{$col}{-source_alias};
-          $aliases_by_type->{$type}{$alias} ||= $alias_list->{$alias}{-join_path}||[];
+          $aliases_by_type->{$type}{$alias} ||= { -parents => $alias_list->{$alias}{-join_path}||[] };
+          $aliases_by_type->{$type}{$alias}{-seen_columns}{"$alias.$_"} = $_
+            for grep { defined $_ } @matches;
         }
       }
     }
@@ -424,55 +552,65 @@ sub _resolve_aliastypes_from_select_args {
   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
   for my $j (values %$alias_list) {
     my $alias = $j->{-alias} or next;
-    $aliases_by_type->{restricting}{$alias} ||= $j->{-join_path}||[] if (
+    $aliases_by_type->{restricting}{$alias} ||= { -parents => $j->{-join_path}||[] } if (
       (not $j->{-join_type})
         or
       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
     );
   }
 
+  for (keys %$aliases_by_type) {
+    delete $aliases_by_type->{$_} unless keys %{$aliases_by_type->{$_}};
+  }
+
   return $aliases_by_type;
 }
 
 # This is the engine behind { distinct => 1 }
 sub _group_over_selection {
-  my ($self, $from, $select, $order_by) = @_;
+  my ($self, $attrs) = @_;
 
-  my $rs_column_list = $self->_resolve_column_info ($from);
+  my $colinfos = $self->_resolve_column_info ($attrs->{from});
 
   my (@group_by, %group_index);
 
   # the logic is: if it is a { func => val } we assume an aggregate,
   # otherwise if \'...' or \[...] we assume the user knows what is
   # going on thus group over it
-  for (@$select) {
+  for (@{$attrs->{select}}) {
     if (! ref($_) or ref ($_) ne 'HASH' ) {
       push @group_by, $_;
       $group_index{$_}++;
-      if ($rs_column_list->{$_} and $_ !~ /\./ ) {
+      if ($colinfos->{$_} and $_ !~ /\./ ) {
         # add a fully qualified version as well
-        $group_index{"$rs_column_list->{$_}{-source_alias}.$_"}++;
+        $group_index{"$colinfos->{$_}{-source_alias}.$_"}++;
       }
     }
   }
 
-  # add any order_by parts that are not already present in the group_by
+  # add any order_by parts *from the main source* that are not already
+  # present in the group_by
   # we need to be careful not to add any named functions/aggregates
   # i.e. order_by => [ ... { count => 'foo' } ... ]
   my @leftovers;
-  for ($self->_extract_order_criteria($order_by)) {
+  for ($self->_extract_order_criteria($attrs->{order_by})) {
     # only consider real columns (for functions the user got to do an explicit group_by)
     if (@$_ != 1) {
       push @leftovers, $_;
       next;
     }
     my $chunk = $_->[0];
-    my $colinfo = $rs_column_list->{$chunk} or do {
+
+    if (
+      !$colinfos->{$chunk}
+        or
+      $colinfos->{$chunk}{-source_alias} ne $attrs->{alias}
+    ) {
       push @leftovers, $_;
       next;
-    };
+    }
 
-    $chunk = "$colinfo->{-source_alias}.$chunk" if $chunk !~ /\./;
+    $chunk = $colinfos->{$chunk}{-fq_colname};
     push @group_by, $chunk unless $group_index{$chunk}++;
   }
 
@@ -486,14 +624,12 @@ sub _resolve_ident_sources {
   my ($self, $ident) = @_;
 
   my $alias2source = {};
-  my $rs_alias;
 
   # the reason this is so contrived is that $ident may be a {from}
   # structure, specifying multiple tables to join
   if ( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
     # this is compat mode for insert/update/delete which do not deal with aliases
     $alias2source->{me} = $ident;
-    $rs_alias = 'me';
   }
   elsif (ref $ident eq 'ARRAY') {
 
@@ -501,7 +637,6 @@ sub _resolve_ident_sources {
       my $tabinfo;
       if (ref $_ eq 'HASH') {
         $tabinfo = $_;
-        $rs_alias = $tabinfo->{-alias};
       }
       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
         $tabinfo = $_->[0];
@@ -512,7 +647,7 @@ sub _resolve_ident_sources {
     }
   }
 
-  return ($alias2source, $rs_alias);
+  return $alias2source;
 }
 
 # Takes $ident, \@column_names
@@ -524,7 +659,7 @@ sub _resolve_ident_sources {
 # for all sources
 sub _resolve_column_info {
   my ($self, $ident, $colnames) = @_;
-  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
+  my $alias2src = $self->_resolve_ident_sources($ident);
 
   my (%seen_cols, @auto_colnames);
 
@@ -646,74 +781,30 @@ sub _inner_join_to_node {
   return \@new_from;
 }
 
-# yet another atrocity: attempt to extract all columns from a
-# where condition by hooking _quote
-sub _extract_condition_columns {
-  my ($self, $cond, $sql_maker_cache) = @_;
-
-  return [] unless $cond;
-
-  my $sm = $sql_maker_cache->{condparser} ||= $self->{_sql_ident_capturer} ||= do {
-    # FIXME - replace with a Moo trait
-    my $orig_sm_class = ref $self->sql_maker;
-    my $smic_class = "${orig_sm_class}::_IdentCapture_";
-
-    unless ($smic_class->isa('SQL::Abstract')) {
-
-      no strict 'refs';
-      *{"${smic_class}::_quote"} = subname "${smic_class}::_quote" => sub {
-        my ($self, $ident) = @_;
-        if (ref $ident eq 'SCALAR') {
-          $ident = $$ident;
-          my $storage_quotes = $self->sql_quote_char || '"';
-          my ($ql, $qr) = map
-            { quotemeta $_ }
-            (ref $storage_quotes eq 'ARRAY' ? @$storage_quotes : ($storage_quotes) x 2 )
-          ;
-
-          while ($ident =~ /
-            $ql (\w+) $qr
-              |
-            ([\w\.]+)
-          /xg) {
-            $self->{_captured_idents}{$1||$2}++;
-          }
-        }
-        else {
-          $self->{_captured_idents}{$ident}++;
-        }
-        return $ident;
-      };
-
-      *{"${smic_class}::_get_captured_idents"} = subname "${smic_class}::_get_captures" => sub {
-        (delete shift->{_captured_idents}) || {};
-      };
-
-      $self->inject_base ($smic_class, $orig_sm_class);
-
-    }
-
-    $smic_class->new();
-  };
-
-  $sm->_recurse_where($cond);
-
-  return [ sort keys %{$sm->_get_captured_idents} ];
-}
-
 sub _extract_order_criteria {
   my ($self, $order_by, $sql_maker) = @_;
 
   my $parser = sub {
-    my ($sql_maker, $order_by) = @_;
+    my ($sql_maker, $order_by, $orig_quote_chars) = @_;
 
     return scalar $sql_maker->_order_by_chunks ($order_by)
       unless wantarray;
 
+    my ($lq, $rq, $sep) = map { quotemeta($_) } (
+      ($orig_quote_chars ? @$orig_quote_chars : $sql_maker->_quote_chars),
+      $sql_maker->name_sep
+    );
+
     my @chunks;
     for ($sql_maker->_order_by_chunks ($order_by) ) {
-      my $chunk = ref $_ ? $_ : [ $_ ];
-      $chunk->[0] =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
+      my $chunk = ref $_ ? [ @$_ ] : [ $_ ];
+      ($chunk->[0]) = $sql_maker->_split_order_chunk($chunk->[0]);
+
+      # order criteria may have come back pre-quoted (literals and whatnot)
+      # this is fragile, but the best we can currently do
+      $chunk->[0] =~ s/^ $lq (.+?) $rq $sep $lq (.+?) $rq $/"$1.$2"/xe
+        or $chunk->[0] =~ s/^ $lq (.+) $rq $/$1/x;
+
       push @chunks, $chunk;
     }
 
@@ -725,8 +816,13 @@ sub _extract_order_criteria {
   }
   else {
     $sql_maker = $self->sql_maker;
+
+    # pass these in to deal with literals coming from
+    # the user or the deep guts of prefetch
+    my $orig_quote_chars = [$sql_maker->_quote_chars];
+
     local $sql_maker->{quote_char};
-    return $parser->($sql_maker, $order_by);
+    return $parser->($sql_maker, $order_by, $orig_quote_chars);
   }
 }
 
@@ -751,7 +847,78 @@ sub _order_by_is_stable {
   return undef;
 }
 
-# returns an arrayref of column names which *definitely* have som
+# this is almost identical to the above, except it accepts only
+# a single rsrc, and will succeed only if the first portion of the order
+# by is stable.
+# returns that portion as a colinfo hashref on success
+sub _main_source_order_by_portion_is_stable {
+  my ($self, $main_rsrc, $order_by, $where) = @_;
+
+  die "Huh... I expect a blessed result_source..."
+    if ref($main_rsrc) eq 'ARRAY';
+
+  my @ord_cols = map
+    { $_->[0] }
+    ( $self->_extract_order_criteria($order_by) )
+  ;
+  return unless @ord_cols;
+
+  my $colinfos = $self->_resolve_column_info($main_rsrc);
+
+  for (0 .. $#ord_cols) {
+    if (
+      ! $colinfos->{$ord_cols[$_]}
+        or
+      $colinfos->{$ord_cols[$_]}{-result_source} != $main_rsrc
+    ) {
+      $#ord_cols =  $_ - 1;
+      last;
+    }
+  }
+
+  # we just truncated it above
+  return unless @ord_cols;
+
+  my $order_portion_ci = { map {
+    $colinfos->{$_}{-colname} => $colinfos->{$_},
+    $colinfos->{$_}{-fq_colname} => $colinfos->{$_},
+  } @ord_cols };
+
+  # since all we check here are the start of the order_by belonging to the
+  # top level $rsrc, a present identifying set will mean that the resultset
+  # is ordered by its leftmost table in a stable manner
+  #
+  # RV of _identifying_column_set contains unqualified names only
+  my $unqualified_idset = $main_rsrc->_identifying_column_set({
+    ( $where ? %{
+      $self->_resolve_column_info(
+        $main_rsrc, $self->_extract_fixed_condition_columns($where)
+      )
+    } : () ),
+    %$order_portion_ci
+  }) or return;
+
+  my $ret_info;
+  my %unqualified_idcols_from_order = map {
+    $order_portion_ci->{$_} ? ( $_ => $order_portion_ci->{$_} ) : ()
+  } @$unqualified_idset;
+
+  # extra optimization - cut the order_by at the end of the identifying set
+  # (just in case the user was stupid and overlooked the obvious)
+  for my $i (0 .. $#ord_cols) {
+    my $col = $ord_cols[$i];
+    my $unqualified_colname = $order_portion_ci->{$col}{-colname};
+    $ret_info->{$col} = { %{$order_portion_ci->{$col}}, -idx_in_order_subset => $i };
+    delete $unqualified_idcols_from_order{$ret_info->{$col}{-colname}};
+
+    # we didn't reach the end of the identifying portion yet
+    return $ret_info unless keys %unqualified_idcols_from_order;
+  }
+
+  die 'How did we get here...';
+}
+
+# returns an arrayref of column names which *definitely* have some
 # sort of non-nullable equality requested in the given condition
 # specification. This is used to figure out if a resultset is
 # constrained to a column which is part of a unique constraint,