take more care in mangling SELECT when applying subquery limits
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
index 6ec33d5..f3b815b 100644 (file)
@@ -6,6 +6,17 @@ use strict;
 use List::Util 'first';
 use namespace::clean;
 
+# constants are used not only here, but also in comparison tests
+sub __rows_bindtype () {
+  +{ sqlt_datatype => 'integer' }
+}
+sub __offset_bindtype () {
+  +{ sqlt_datatype => 'integer' }
+}
+sub __total_bindtype () {
+  +{ sqlt_datatype => 'integer' }
+}
+
 =head1 NAME
 
 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
@@ -30,8 +41,6 @@ names.
 
 Currently the provided dialects are:
 
-=cut
-
 =head2 LimitOffset
 
  SELECT ... LIMIT $limit OFFSET $offset
@@ -40,9 +49,13 @@ Supported by B<PostgreSQL> and B<SQLite>
 
 =cut
 sub _LimitOffset {
-    my ( $self, $sql, $order, $rows, $offset ) = @_;
-    $sql .= $self->_order_by( $order ) . " LIMIT $rows";
-    $sql .= " OFFSET $offset" if +$offset;
+    my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
+    $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
+    push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+    if ($offset) {
+      $sql .= " OFFSET ?";
+      push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
+    }
     return $sql;
 }
 
@@ -54,10 +67,15 @@ Supported by B<MySQL> and any L<SQL::Statement> based DBD
 
 =cut
 sub _LimitXY {
-    my ( $self, $sql, $order, $rows, $offset ) = @_;
-    $sql .= $self->_order_by( $order ) . " LIMIT ";
-    $sql .= "$offset, " if +$offset;
-    $sql .= $rows;
+    my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
+    $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
+    if ($offset) {
+      $sql .= '?, ';
+      push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
+    }
+    $sql .= '?';
+    push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+
     return $sql;
 }
 
@@ -77,13 +95,9 @@ B<< MSSQL >= 2005 >>.
 sub _RowNumberOver {
   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
   # get selectors, and scan the order_by (if any)
-  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ( $rs_attrs );
+  my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
 
   # make up an order if none exists
   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
@@ -119,17 +133,18 @@ sub _RowNumberOver {
   my $qalias = $self->_quote ($rs_attrs->{alias});
   my $idx_name = $self->_quote ('rno__row__index');
 
-  $sql = sprintf (<<EOS, $offset + 1, $offset + $rows, );
+  push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
+
+  return <<EOS;
 
 SELECT $out_sel FROM (
   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
-    SELECT $in_sel ${sql}${group_having}
+    SELECT $in_sel ${stripped_sql}${group_having}
   ) $qalias
-) $qalias WHERE $idx_name BETWEEN %u AND %u
+) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
 
 EOS
 
-  return $sql;
 }
 
 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
@@ -153,10 +168,16 @@ sub _SkipFirst {
 
   return sprintf ('SELECT %s%s%s%s',
     $offset
-      ? sprintf ('SKIP %u ', $offset)
+      ? do {
+         push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
+         'SKIP ? '
+      }
       : ''
     ,
-    sprintf ('FIRST %u ', $rows),
+    do {
+       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+       'FIRST ? '
+    },
     $sql,
     $self->_parse_rs_attrs ($rs_attrs),
   );
@@ -177,9 +198,15 @@ sub _FirstSkip {
     or $self->throw_exception("Unrecognizable SELECT: $sql");
 
   return sprintf ('SELECT %s%s%s%s',
-    sprintf ('FIRST %u ', $rows),
+    do {
+       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+       'FIRST ? '
+    },
     $offset
-      ? sprintf ('SKIP %u ', $offset)
+      ? do {
+         push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
+         'SKIP ? '
+      }
       : ''
     ,
     $sql,
@@ -201,55 +228,46 @@ Supported by B<Oracle>.
 sub _RowNum {
   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
-  my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs);
+  my ($stripped_sql, $insel, $outsel) = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $qalias = $self->_quote ($rs_attrs->{alias});
   my $idx_name = $self->_quote ('rownum__index');
   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
 
+
   if ($offset) {
 
-    $sql = sprintf (<<EOS, $offset + $rows, $offset + 1 );
+    push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
 
+    return <<EOS;
 SELECT $outsel FROM (
   SELECT $outsel, ROWNUM $idx_name FROM (
-    SELECT $insel ${sql}${order_group_having}
-  ) $qalias WHERE ROWNUM <= %u
-) $qalias WHERE $idx_name >= %u
-
+    SELECT $insel ${stripped_sql}${order_group_having}
+  ) $qalias WHERE ROWNUM <= ?
+) $qalias WHERE $idx_name >= ?
 EOS
+
   }
   else {
-    $sql = sprintf (<<EOS, $rows );
+    push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
 
+    return <<EOS;
   SELECT $outsel FROM (
-    SELECT $insel ${sql}${order_group_having}
-  ) $qalias WHERE ROWNUM <= %u
-
+    SELECT $insel ${stripped_sql}${order_group_having}
+  ) $qalias WHERE ROWNUM <= ?
 EOS
-  }
 
-  return $sql;
+  }
 }
 
 # used by _Top and _FetchFirst
 sub _prep_for_skimming_limit {
   my ( $self, $sql, $rs_attrs ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
-  my %r = ( inner_sql => $sql );
-
   # get selectors
-  my ($alias_map, $extra_order_sel);
-  ($r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ($rs_attrs);
+  my (%r, $alias_map, $extra_order_sel);
+  ($r{inner_sql}, $r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $requested_order = delete $rs_attrs->{order_by};
   $r{order_by_requested} = $self->_order_by ($requested_order);
@@ -476,10 +494,6 @@ sub _GenericSubQ {
   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
   my $root_tbl_name = $root_rsrc->name;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
   my ($order_by, @rest) = do {
     local $self->{quote_char};
     $self->_order_by_chunks ($rs_attrs->{order_by})
@@ -538,8 +552,8 @@ sub _GenericSubQ {
     "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
   ) unless $is_u;
 
-  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ($rs_attrs);
+  my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $cmp_op = $direction eq 'desc' ? '>' : '<';
   my $count_tbl_alias = 'rownum__emulation';
@@ -550,28 +564,37 @@ sub _GenericSubQ {
   # add the order supplement (if any) as this is what will be used for the outer WHERE
   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
 
-  $sql = sprintf (<<EOS,
+  my $rownum_cond;
+  if ($offset) {
+    $rownum_cond = 'BETWEEN ? AND ?';
+
+    push @{$self->{limit_bind}},
+      [ $self->__offset_bindtype => $offset ],
+      [ $self->__total_bindtype => $offset + $rows - 1]
+    ;
+  }
+  else {
+    $rownum_cond = '< ?';
+
+    push @{$self->{limit_bind}},
+      [ $self->__rows_bindtype => $rows ]
+    ;
+  }
+
+  return sprintf ("
 SELECT $out_sel
   FROM (
-    SELECT $in_sel ${sql}${group_having_sql}
+    SELECT $in_sel ${stripped_sql}${group_having_sql}
   ) %s
-WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) %s
+WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) $rownum_cond
 $order_sql
-EOS
-    ( map { $self->_quote ($_) } (
-      $rs_attrs->{alias},
-      $root_tbl_name,
-      $count_tbl_alias,
-      "$count_tbl_alias.$unq_sort_col",
-      $order_by,
-    )),
-    $offset
-      ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1)
-      : sprintf ('< %u', $rows )
-    ,
-  );
-
-  return $sql;
+  ", map { $self->_quote ($_) } (
+    $rs_attrs->{alias},
+    $root_tbl_name,
+    $count_tbl_alias,
+    "$count_tbl_alias.$unq_sort_col",
+    $order_by,
+  ));
 }
 
 
@@ -583,10 +606,10 @@ EOS
 # turned into a column alias (otherwise names in subqueries clash
 # and/or lose their source table)
 #
-# Returns inner/outer strings of SQL QUOTED selectors with aliases
-# (to be used in whatever select statement), and an alias index hashref
-# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst
-# higher up).
+# Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
+# with aliases (to be used in whatever select statement), and an alias
+# index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used 
+# for string-subst higher up).
 # If an order_by is supplied, the inner select needs to bring out columns
 # used in implicit (non-selected) orders, and the order condition itself
 # needs to be realiased to the proper names in the outer query. Thus we
@@ -594,14 +617,26 @@ EOS
 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
 # exist in the original select list
 sub _subqueried_limit_attrs {
-  my ($self, $rs_attrs) = @_;
+  my ($self, $proto_sql, $rs_attrs) = @_;
 
   $self->throw_exception(
     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
   ) unless ref ($rs_attrs) eq 'HASH';
 
+  # mangle the input sql as we will be replacing the selector entirely
+  unless (
+    $rs_attrs->{_selector_sql}
+      and
+    $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
+  ) {
+    $self->throw_exception("Unrecognizable SELECT: $proto_sql");
+  }
+
   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
 
+  # insulate from the multiple _recurse_fields calls below
+  local $self->{select_bind};
+
   # correlate select and as, build selection index
   my (@sel, $in_sel_index);
   for my $i (0 .. $#{$rs_attrs->{select}}) {
@@ -612,7 +647,10 @@ sub _subqueried_limit_attrs {
 
     push @sel, {
       sql => $sql_sel,
-      unquoted_sql => do { local $self->{quote_char}; $self->_recurse_fields ($s) },
+      unquoted_sql => do {
+        local $self->{quote_char};
+        $self->_recurse_fields ($s);
+      },
       as =>
         $sql_alias
           ||
@@ -656,7 +694,6 @@ sub _subqueried_limit_attrs {
       push @out_sel, $self->_quote ($node->{as});
     }
   }
-
   # see if the order gives us anything
   my %extra_order_sel;
   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
@@ -672,6 +709,7 @@ sub _subqueried_limit_attrs {
   }
 
   return (
+    $proto_sql,
     (map { join (', ', @$_ ) } (
       \@in_sel,
       \@out_sel)