Factor out IDENTITY_INSERT for Sybase ASE and MSSQL into a component
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
index 29d8eb3..ac145c3 100644 (file)
@@ -95,13 +95,9 @@ B<< MSSQL >= 2005 >>.
 sub _RowNumberOver {
   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
   # get selectors, and scan the order_by (if any)
-  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ( $rs_attrs );
+  my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
 
   # make up an order if none exists
   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
@@ -118,8 +114,6 @@ sub _RowNumberOver {
         $extra_col,
         $extra_order_sel->{$extra_col},
       );
-
-      $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
     }
   }
 
@@ -137,18 +131,18 @@ sub _RowNumberOver {
   my $qalias = $self->_quote ($rs_attrs->{alias});
   my $idx_name = $self->_quote ('rno__row__index');
 
-  $sql = <<EOS;
+  push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
+
+  return <<EOS;
 
 SELECT $out_sel FROM (
   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
-    SELECT $in_sel ${sql}${group_having}
+    SELECT $in_sel ${stripped_sql}${group_having}
   ) $qalias
 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
 
 EOS
-   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
 
-  return $sql;
 }
 
 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
@@ -173,13 +167,13 @@ sub _SkipFirst {
   return sprintf ('SELECT %s%s%s%s',
     $offset
       ? do {
-         push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
+         push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
          'SKIP ? '
       }
       : ''
     ,
     do {
-       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+       push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
        'FIRST ? '
     },
     $sql,
@@ -203,12 +197,12 @@ sub _FirstSkip {
 
   return sprintf ('SELECT %s%s%s%s',
     do {
-       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
+       push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
        'FIRST ? '
     },
     $offset
       ? do {
-         push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
+         push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
          'SKIP ? '
       }
       : ''
@@ -218,79 +212,132 @@ sub _FirstSkip {
   );
 }
 
+
 =head2 RowNum
 
+Depending on the resultset attributes one of:
+
  SELECT * FROM (
   SELECT *, ROWNUM rownum__index FROM (
    SELECT ...
   ) WHERE ROWNUM <= ($limit+$offset)
  ) WHERE rownum__index >= ($offset+1)
 
+or
+
+ SELECT * FROM (
+  SELECT *, ROWNUM rownum__index FROM (
+    SELECT ...
+  )
+ ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
+
+or
+
+ SELECT * FROM (
+    SELECT ...
+  ) WHERE ROWNUM <= ($limit+1)
+
 Supported by B<Oracle>.
 
 =cut
 sub _RowNum {
   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
-  my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs);
+  my ($stripped_sql, $insel, $outsel) = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $qalias = $self->_quote ($rs_attrs->{alias});
   my $idx_name = $self->_quote ('rownum__index');
   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
 
-  if ($offset) {
+  #
+  # There are two ways to limit in Oracle, one vastly faster than the other
+  # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
+  # However Oracle is retarded and does not preserve stable ROWNUM() values
+  # when called twice in the same scope. Therefore unless the resultset is
+  # ordered by a unique set of columns, it is not safe to use the faster
+  # method, and the slower BETWEEN query is used instead
+  #
+  # FIXME - this is quite expensive, and does not perform caching of any sort
+  # as soon as some of the DQ work becomes viable consider switching this
+  # over
+  if (
+    $rs_attrs->{order_by}
+      and
+    $rs_attrs->{_rsroot_rsrc}->storage->_order_by_is_stable(
+      $rs_attrs->{from}, $rs_attrs->{order_by}
+    )
+  ) {
+    # if offset is 0 (first page) the we can skip a subquery
+    if (! $offset) {
+      push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
 
-    push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
-    $sql =<<"EOS";
+      return <<EOS;
+SELECT $outsel FROM (
+  SELECT $insel ${stripped_sql}${order_group_having}
+) $qalias WHERE ROWNUM <= ?
+EOS
+    }
+    else {
+      push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
+
+      return <<EOS;
 SELECT $outsel FROM (
   SELECT $outsel, ROWNUM $idx_name FROM (
-    SELECT $insel ${sql}${order_group_having}
+    SELECT $insel ${stripped_sql}${order_group_having}
   ) $qalias WHERE ROWNUM <= ?
 ) $qalias WHERE $idx_name >= ?
 EOS
+    }
   }
   else {
-    push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
-    $sql =<<"EOS";
-  SELECT $outsel FROM (
-    SELECT $insel ${sql}${order_group_having}
-  ) $qalias WHERE ROWNUM <= ?
+    push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
+
+    return <<EOS;
+SELECT $outsel FROM (
+  SELECT $outsel, ROWNUM $idx_name FROM (
+    SELECT $insel ${stripped_sql}${order_group_having}
+  ) $qalias
+) $qalias WHERE $idx_name BETWEEN ? AND ?
 EOS
   }
-
-  return $sql;
 }
 
-# used by _Top and _FetchFirst
+# used by _Top and _FetchFirst below
 sub _prep_for_skimming_limit {
   my ( $self, $sql, $rs_attrs ) = @_;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
-  my %r = ( inner_sql => $sql );
-
   # get selectors
-  my ($alias_map, $extra_order_sel);
-  ($r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ($rs_attrs);
+  my (%r, $alias_map, $extra_order_sel);
+  ($r{inner_sql}, $r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $requested_order = delete $rs_attrs->{order_by};
   $r{order_by_requested} = $self->_order_by ($requested_order);
 
-  # make up an order unless supplied
-  my $inner_order = ($r{order_by_requested}
-    ? $requested_order
-    : [ map
+  # make up an order unless supplied or sanity check what we are given
+  my $inner_order;
+  if ($r{order_by_requested}) {
+    $self->throw_exception (
+      'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
+    ) unless $rs_attrs->{_rsroot_rsrc}->schema->storage->_order_by_is_stable(
+      $rs_attrs->{from},
+      $requested_order
+    );
+
+    $inner_order = $requested_order;
+  }
+  else {
+    $inner_order = [ map
       { "$rs_attrs->{alias}.$_" }
-      ( $rs_attrs->{_rsroot_rsrc}->_pri_cols )
-    ]
-  );
+      ( @{
+        $rs_attrs->{_rsroot_rsrc}->_identifying_column_set
+          ||
+        $self->throw_exception(sprintf(
+          'Unable to auto-construct stable order criteria for "skimming type" limit '
+        . "dialect based on source '%s'", $rs_attrs->{_rsroot_rsrc}->name) );
+      } )
+    ];
+  }
 
   # localise as we already have all the bind values we need
   {
@@ -325,11 +372,21 @@ sub _prep_for_skimming_limit {
       $r{mid_sel} .= ', ' . $extra_order_sel->{$extra_col};
     }
 
-    # since whatever order bindvals there are, they will be realiased
-    # and need to show up in front of the entire initial inner subquery
-    # *unshift* the selector bind stack to make this happen (horrible,
-    # horrible, but we don't have another mechanism yet)
-    unshift @{$self->{select_bind}}, @{$self->{order_bind}};
+    # Whatever order bindvals there are, they will be realiased and
+    # need to show up in front of the entire initial inner subquery
+    push @{$self->{pre_select_bind}}, @{$self->{order_bind}};
+  }
+
+  # if this is a part of something bigger, we need to add back all
+  # the extra order_by's, as they may be relied upon by the outside
+  # of a prefetch or something
+  if ($rs_attrs->{_is_internal_subuery} and keys %$extra_order_sel) {
+    $r{out_sel} .= sprintf ", $extra_order_sel->{$_} AS $_"
+      for sort
+        { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
+          grep { $_ !~ /[^\w\-]/ }  # ignore functions
+          keys %$extra_order_sel
+    ;
   }
 
   # and this is order re-alias magic
@@ -505,70 +562,32 @@ sub _GenericSubQ {
   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
   my $root_tbl_name = $root_rsrc->name;
 
-  # mangle the input sql as we will be replacing the selector
-  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
-    or $self->throw_exception("Unrecognizable SELECT: $sql");
-
-  my ($order_by, @rest) = do {
+  my ($first_order_by) = do {
     local $self->{quote_char};
-    $self->_order_by_chunks ($rs_attrs->{order_by})
-  };
-
-  unless (
-    $order_by
-      &&
-    ! @rest
-      &&
-    ( ! ref $order_by
-        ||
-      ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
-    )
-  ) {
-    $self->throw_exception (
-      'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
-    . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
-    . 'unique-column order criteria.'
-    );
-  }
-
-  ($order_by) = @$order_by if ref $order_by;
+    map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($rs_attrs->{order_by})
+  } or $self->throw_exception (
+    'Generic Subquery Limit does not work on resultsets without an order. Provide a single, '
+  . 'unique-column order criteria.'
+  );
 
-  $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
+  $first_order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
   my $direction = lc ($1 || 'asc');
 
-  my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
-
-  my $inf = $root_rsrc->storage->_resolve_column_info (
-    $rs_attrs->{from}, [$order_by, $unq_sort_col]
-  );
+  my ($first_ord_alias, $first_ord_col) = $first_order_by =~ /^ (?: ([^\.]+) \. )? ([^\.]+) $/x;
 
-  my $ord_colinfo = $inf->{$order_by} || $self->throw_exception("Unable to determine source of order-criteria '$order_by'");
+  $self->throw_exception(sprintf
+    "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
+  . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
+  ) if ($first_ord_alias and $first_ord_alias ne $rs_attrs->{alias});
 
-  if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
-    $self->throw_exception(sprintf
-      "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
-    . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
-    );
-  }
+  $first_ord_alias ||= $rs_attrs->{alias};
 
-  # make sure order column is qualified
-  $order_by = "$rs_attrs->{alias}.$order_by"
-    unless $order_by =~ /^$rs_attrs->{alias}\./;
-
-  my $is_u;
-  my $ucs = { $root_rsrc->unique_constraints };
-  for (values %$ucs ) {
-    if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
-      $is_u++;
-      last;
-    }
-  }
   $self->throw_exception(
-    "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
-  ) unless $is_u;
+    "Generic Subquery Limit first order criteria '$first_ord_col' must be unique"
+  ) unless $root_rsrc->_identifying_column_set([$first_ord_col]);
 
-  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
-    = $self->_subqueried_limit_attrs ($rs_attrs);
+  my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
+    = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
 
   my $cmp_op = $direction eq 'desc' ? '>' : '<';
   my $count_tbl_alias = 'rownum__emulation';
@@ -579,35 +598,37 @@ sub _GenericSubQ {
   # add the order supplement (if any) as this is what will be used for the outer WHERE
   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
 
-  $sql = sprintf (<<EOS,
+  my $rownum_cond;
+  if ($offset) {
+    $rownum_cond = 'BETWEEN ? AND ?';
+
+    push @{$self->{limit_bind}},
+      [ $self->__offset_bindtype => $offset ],
+      [ $self->__total_bindtype => $offset + $rows - 1]
+    ;
+  }
+  else {
+    $rownum_cond = '< ?';
+
+    push @{$self->{limit_bind}},
+      [ $self->__rows_bindtype => $rows ]
+    ;
+  }
+
+  return sprintf ("
 SELECT $out_sel
   FROM (
-    SELECT $in_sel ${sql}${group_having_sql}
+    SELECT $in_sel ${stripped_sql}${group_having_sql}
   ) %s
-WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) %s
+WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) $rownum_cond
 $order_sql
-EOS
-    ( map { $self->_quote ($_) } (
-      $rs_attrs->{alias},
-      $root_tbl_name,
-      $count_tbl_alias,
-      "$count_tbl_alias.$unq_sort_col",
-      $order_by,
-    )),
-    $offset
-      ? do {
-         push @{$self->{limit_bind}},
-            [ $self->__offset_bindtype => $offset ], [ $self->__total_bindtype => $offset + $rows - 1];
-         'BETWEEN ? AND ?';
-        }
-      : do {
-         push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
-         '< ?';
-         }
-    ,
-  );
-
-  return $sql;
+  ", map { $self->_quote ($_) } (
+    $rs_attrs->{alias},
+    $root_tbl_name,
+    $count_tbl_alias,
+    "$count_tbl_alias.$first_ord_col",
+    "$first_ord_alias.$first_ord_col",
+  ));
 }
 
 
@@ -619,10 +640,10 @@ EOS
 # turned into a column alias (otherwise names in subqueries clash
 # and/or lose their source table)
 #
-# Returns inner/outer strings of SQL QUOTED selectors with aliases
-# (to be used in whatever select statement), and an alias index hashref
-# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst
-# higher up).
+# Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
+# with aliases (to be used in whatever select statement), and an alias
+# index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
+# for string-subst higher up).
 # If an order_by is supplied, the inner select needs to bring out columns
 # used in implicit (non-selected) orders, and the order condition itself
 # needs to be realiased to the proper names in the outer query. Thus we
@@ -630,14 +651,26 @@ EOS
 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
 # exist in the original select list
 sub _subqueried_limit_attrs {
-  my ($self, $rs_attrs) = @_;
+  my ($self, $proto_sql, $rs_attrs) = @_;
 
   $self->throw_exception(
     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
   ) unless ref ($rs_attrs) eq 'HASH';
 
+  # mangle the input sql as we will be replacing the selector entirely
+  unless (
+    $rs_attrs->{_selector_sql}
+      and
+    $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
+  ) {
+    $self->throw_exception("Unrecognizable SELECT: $proto_sql");
+  }
+
   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
 
+  # insulate from the multiple _recurse_fields calls below
+  local $self->{select_bind};
+
   # correlate select and as, build selection index
   my (@sel, $in_sel_index);
   for my $i (0 .. $#{$rs_attrs->{select}}) {
@@ -648,7 +681,10 @@ sub _subqueried_limit_attrs {
 
     push @sel, {
       sql => $sql_sel,
-      unquoted_sql => do { local $self->{quote_char}; $self->_recurse_fields ($s) },
+      unquoted_sql => do {
+        local $self->{quote_char};
+        $self->_recurse_fields ($s);
+      },
       as =>
         $sql_alias
           ||
@@ -692,7 +728,6 @@ sub _subqueried_limit_attrs {
       push @out_sel, $self->_quote ($node->{as});
     }
   }
-
   # see if the order gives us anything
   my %extra_order_sel;
   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
@@ -708,6 +743,7 @@ sub _subqueried_limit_attrs {
   }
 
   return (
+    $proto_sql,
     (map { join (', ', @$_ ) } (
       \@in_sel,
       \@out_sel)