Consolidate handling of "is this a literal" and "is this a value"
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBIHacks.pm
index 8a3cb02..7a3587d 100644 (file)
@@ -16,6 +16,7 @@ use mro 'c3';
 use List::Util 'first';
 use Scalar::Util 'blessed';
 use Sub::Name 'subname';
+use DBIx::Class::_Util qw(is_plain_value is_literal_value);
 use namespace::clean;
 
 #
@@ -389,7 +390,6 @@ sub _resolve_aliastypes_from_select_args {
   my $sql_maker = $self->sql_maker;
 
   # these are throw away results, do not pollute the bind stack
-  local $sql_maker->{select_bind};
   local $sql_maker->{where_bind};
   local $sql_maker->{group_bind};
   local $sql_maker->{having_bind};
@@ -429,15 +429,46 @@ sub _resolve_aliastypes_from_select_args {
       ),
     ],
     selecting => [
-      $sql_maker->_recurse_fields ($attrs->{select}),
+      map { ($sql_maker->_recurse_fields($_))[0] } @{$attrs->{select}},
     ],
     ordering => [
       map { $_->[0] } $self->_extract_order_criteria ($attrs->{order_by}, $sql_maker),
     ],
   };
 
-  # throw away empty chunks
-  $_ = [ map { $_ || () } @$_ ] for values %$to_scan;
+  # throw away empty chunks and all 2-value arrayrefs: the thinking is that these are
+  # bind value specs left in by the sloppy renderer above. It is ok to do this
+  # at this point, since we are going to end up rewriting this crap anyway
+  for my $v (values %$to_scan) {
+    my @nv;
+    for (@$v) {
+      next if (
+        ! defined $_
+          or
+        (
+          ref $_ eq 'ARRAY'
+            and
+          ( @$_ == 0 or @$_ == 2 )
+        )
+      );
+
+      if (ref $_) {
+        require Data::Dumper::Concise;
+        $self->throw_exception("Unexpected ref in scan-plan: " . Data::Dumper::Concise::Dumper($v) );
+      }
+
+      push @nv, $_;
+    }
+
+    $v = \@nv;
+  }
+
+  # kill all selectors which look like a proper subquery
+  # this is a sucky heuristic *BUT* - if we get it wrong the query will simply
+  # fail to run, so we are relatively safe
+  $to_scan->{selecting} = [ grep {
+    $_ !~ / \A \s* \( \s* SELECT \s+ .+? \s+ FROM \s+ .+? \) \s* \z /xsi
+  } @{ $to_scan->{selecting} || [] } ];
 
   # first see if we have any exact matches (qualified or unqualified)
   for my $type (keys %$to_scan) {
@@ -623,9 +654,10 @@ sub _group_over_selection {
   }
 
   $self->throw_exception ( sprintf
-    'A required group_by clause could not be constructed automatically due to a complex '
-  . 'order_by criteria (%s). Either order_by columns only (no functions) or construct a suitable '
-  . 'group_by by hand',
+    'Unable to programatically derive a required group_by from the supplied '
+  . 'order_by criteria. To proceed either add an explicit group_by, or '
+  . 'simplify your order_by to only include plain columns '
+  . '(supplied order_by: %s)',
     join ', ', map { "'$_'" } @$leftovers,
   ) if $leftovers;
 
@@ -679,6 +711,9 @@ sub _resolve_ident_sources {
 # for all sources
 sub _resolve_column_info {
   my ($self, $ident, $colnames) = @_;
+
+  return {} if $colnames and ! @$colnames;
+
   my $alias2src = $self->_resolve_ident_sources($ident);
 
   my (%seen_cols, @auto_colnames);
@@ -850,8 +885,8 @@ sub _order_by_is_stable {
   my ($self, $ident, $order_by, $where) = @_;
 
   my @cols = (
-    (map { $_->[0] } $self->_extract_order_criteria($order_by)),
-    $where ? @{$self->_extract_fixed_condition_columns($where)} :(),
+    ( map { $_->[0] } $self->_extract_order_criteria($order_by) ),
+    ( $where ? @{ $self->_extract_fixed_condition_columns($where) || [] } : () ),
   ) or return undef;
 
   my $colinfo = $self->_resolve_column_info($ident, \@cols);
@@ -922,7 +957,7 @@ sub _main_source_order_by_portion_is_stable {
   my $unqualified_idset = $main_rsrc->_identifying_column_set({
     ( $where ? %{
       $self->_resolve_column_info(
-        $main_rsrc, $self->_extract_fixed_condition_columns($where)
+        $main_rsrc, $self->_extract_fixed_condition_columns($where)||[]
       )
     } : () ),
     %$order_portion_ci
@@ -948,8 +983,212 @@ sub _main_source_order_by_portion_is_stable {
   die 'How did we get here...';
 }
 
+# Attempts to flatten a passed in SQLA condition as much as possible towards
+# a plain hashref, *without* altering its semantics. Required by
+# create/populate being able to extract definitive conditions from preexisting
+# resultset {where} stacks
+#
+# FIXME - while relatively robust, this is still imperfect, one of the first
+# things to tackle with DQ
+sub _collapse_cond {
+  my ($self, $where, $where_is_anded_array) = @_;
+
+  if (! $where) {
+    return;
+  }
+  elsif ($where_is_anded_array or ref $where eq 'HASH') {
+
+    my @pairs;
+
+    my @pieces = $where_is_anded_array ? @$where : $where;
+    while (@pieces) {
+      my $chunk = shift @pieces;
+
+      if (ref $chunk eq 'HASH') {
+        push @pairs, map { [ $_ => $chunk->{$_} ] } sort keys %$chunk;
+      }
+      elsif (ref $chunk eq 'ARRAY') {
+        push @pairs, [ -or => $chunk ]
+          if @$chunk;
+      }
+      elsif ( ! ref $chunk) {
+        push @pairs, [ $chunk, shift @pieces ];
+      }
+      else {
+        push @pairs, [ '', $chunk ];
+      }
+    }
+
+    return unless @pairs;
+
+    my @conds = $self->_collapse_cond_unroll_pairs(\@pairs)
+      or return;
+
+    # Consolidate various @conds back into something more compact
+    my $fin;
+
+    for my $c (@conds) {
+      if (ref $c ne 'HASH') {
+        push @{$fin->{-and}}, $c;
+      }
+      else {
+        for my $col (sort keys %$c) {
+          if (exists $fin->{$col}) {
+            my ($l, $r) = ($fin->{$col}, $c->{$col});
+
+            (ref $_ ne 'ARRAY' or !@$_) and $_ = [ -and => $_ ] for ($l, $r);
+
+            if (@$l and @$r and $l->[0] eq $r->[0] and $l->[0] eq '-and') {
+              $fin->{$col} = [ -and => map { @$_[1..$#$_] } ($l, $r) ];
+            }
+            else {
+              $fin->{$col} = [ -and => $fin->{$col}, $c->{$col} ];
+            }
+          }
+          else {
+            $fin->{$col} = $c->{$col};
+          }
+        }
+      }
+    }
+
+    if ( ref $fin->{-and} eq 'ARRAY' and @{$fin->{-and}} == 1 ) {
+      my $piece = (delete $fin->{-and})->[0];
+      if (ref $piece eq 'ARRAY') {
+        $fin->{-or} = $fin->{-or} ? [ $piece, $fin->{-or} ] : $piece;
+      }
+      elsif (! exists $fin->{''}) {
+        $fin->{''} = $piece;
+      }
+    }
+
+    return $fin;
+  }
+  elsif (ref $where eq 'ARRAY') {
+    my @w = @$where;
+
+    while ( @w and (
+      (ref $w[0] eq 'ARRAY' and ! @{$w[0]} )
+        or
+      (ref $w[0] eq 'HASH' and ! keys %{$w[0]})
+    )) { shift @w };
+
+    return unless @w;
+
+    if ( @w == 1 ) {
+      return ( ref $w[0] )
+        ? $self->_collapse_cond($w[0])
+        : { $w[0] => undef }
+      ;
+    }
+    elsif ( ref $w[0] ) {
+      return \@w;
+    }
+    elsif ( @w == 2 ) {
+      if ( ( $w[0]||'' ) =~ /^\-and$/i ) {
+        return (ref $w[1] eq 'HASH' or ref $w[1] eq 'ARRAY')
+          ? $self->_collapse_cond($w[1], (ref $w[1] eq 'ARRAY') )
+          : $self->throw_exception("Unsupported top-level op/arg pair: [ $w[0] => $w[1] ]")
+        ;
+      }
+      else {
+        return $self->_collapse_cond({ @w });
+      }
+    }
+  }
+  else {
+    # not a hash not an array
+    return { '' => $where };
+  }
+
+  # catchall, some of the things above fall through
+  return $where;
+}
+
+sub _collapse_cond_unroll_pairs {
+  my ($self, $pairs) = @_;
+
+  my @conds;
+
+  while (@$pairs) {
+    my ($lhs, $rhs) = @{ shift @$pairs };
+
+    if ($lhs eq '') {
+      push @conds, $self->_collapse_cond($rhs);
+    }
+    elsif ( $lhs =~ /^\-and$/i ) {
+      push @conds, $self->_collapse_cond($rhs, (ref $rhs eq 'ARRAY'));
+    }
+    elsif ( $lhs =~ /^\-or$/i ) {
+      push @conds, $self->_collapse_cond(
+        (ref $rhs eq 'HASH') ? [ map { $_ => $rhs->{$_} } sort keys %$rhs ] : $rhs
+      );
+    }
+    else {
+      if (ref $rhs eq 'HASH' and ! keys %$rhs) {
+        # FIXME - SQLA seems to be doing... nothing...?
+      }
+      elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{'='}) {
+        for my $p ($self->_collapse_cond_unroll_pairs([ [ $lhs => $rhs->{'='} ] ])) {
+
+          # extra sanity check
+          if (keys %$p > 1) {
+            require Data::Dumper::Concise;
+            local $Data::Dumper::Deepcopy = 1;
+            $self->throw_exception(
+              "Internal error: unexpected collapse unroll:"
+            . Data::Dumper::Concise::Dumper { in => { $lhs => $rhs }, out => $p }
+            );
+          }
+
+          my ($l, $r) = %$p;
+
+          push @conds, ( ! length ref $r or is_plain_value($r) )
+            ? { $l => $r }
+            : { $l => { '=' => $r } }
+          ;
+        }
+      }
+      elsif (ref $rhs eq 'ARRAY') {
+        # some of these conditionals encounter multi-values - roll them out using
+        # an unshift, which will cause extra looping in the while{} above
+        if (! @$rhs ) {
+          push @conds, { $lhs => [] };
+        }
+        elsif ( ($rhs->[0]||'') =~ /^\-(?:and|or)$/i ) {
+          $self->throw_exception("Value modifier not followed by any values: $lhs => [ $rhs->[0] ] ")
+            if  @$rhs == 1;
+
+          if( $rhs->[0] =~ /^\-and$/i ) {
+            unshift @$pairs, map { [ $lhs => $_ ] } @{$rhs}[1..$#$rhs];
+          }
+          # if not an AND then it's an OR
+          elsif(@$rhs == 2) {
+            unshift @$pairs, [ $lhs => $rhs->[1] ];
+          }
+          else {
+            push @conds, { $lhs => $rhs };
+          }
+        }
+        elsif (@$rhs == 1) {
+          unshift @$pairs, [ $lhs => $rhs->[0] ];
+        }
+        else {
+          push @conds, { $lhs => $rhs };
+        }
+      }
+      else {
+        push @conds, { $lhs => $rhs };
+      }
+    }
+  }
+
+  return @conds;
+}
+
+
 # returns an arrayref of column names which *definitely* have some
-# sort of non-nullable equality requested in the given condition
+# sort of non-nullable *single* equality requested in the given condition
 # specification. This is used to figure out if a resultset is
 # constrained to a column which is part of a unique constraint,
 # which in turn allows us to better predict how ordering will behave
@@ -958,31 +1197,29 @@ sub _main_source_order_by_portion_is_stable {
 # this is a rudimentary, incomplete, and error-prone extractor
 # however this is OK - it is conservative, and if we can not find
 # something that is in fact there - the stack will recover gracefully
-# Also - DQ and the mst it rode in on will save us all RSN!!!
 sub _extract_fixed_condition_columns {
-  my ($self, $where) = @_;
-
-  return unless ref $where eq 'HASH';
-
-  my @cols;
-  for my $lhs (keys %$where) {
-    if ($lhs =~ /^\-and$/i) {
-      push @cols, ref $where->{$lhs} eq 'ARRAY'
-        ? ( map { @{ $self->_extract_fixed_condition_columns($_) } } @{$where->{$lhs}} )
-        : @{ $self->_extract_fixed_condition_columns($where->{$lhs}) }
-      ;
-    }
-    elsif ($lhs !~ /^\-/) {
-      my $val = $where->{$lhs};
-
-      push @cols, $lhs if (defined $val and (
-        ! ref $val
+  my $self = shift;
+  my $where_hash = $self->_collapse_cond(shift);
+
+  my $res;
+  for my $c (keys %$where_hash) {
+    if (defined (my $v = $where_hash->{$c}) ) {
+      if (
+        ! length ref $v
           or
-        (ref $val eq 'HASH' and keys %$val == 1 and defined $val->{'='})
-      ));
+        (ref $v eq 'HASH' and keys %$v == 1 and defined $v->{'='} and (
+          is_literal_value($v->{'='}) or is_plain_value( $v->{'='})
+        ))
+      ) {
+        $res->{$c} = 1;
+      }
+      elsif (ref $v eq 'ARRAY' and ($v->[0]||'') eq '-and') {
+        $res->{$_} = 1 for map { @{ $self->_extract_fixed_condition_columns({ $c => $_ }) } } @{$v}[1..$#$v];
+      }
     }
   }
-  return \@cols;
+
+  return [ sort keys %$res ];
 }
 
 1;