Factor out IDENTITY_INSERT for Sybase ASE and MSSQL into a component
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
index 5d9afa1..ac145c3 100644 (file)
@@ -314,14 +314,30 @@ sub _prep_for_skimming_limit {
   my $requested_order = delete $rs_attrs->{order_by};
   $r{order_by_requested} = $self->_order_by ($requested_order);
 
-  # make up an order unless supplied
-  my $inner_order = ($r{order_by_requested}
-    ? $requested_order
-    : [ map
+  # make up an order unless supplied or sanity check what we are given
+  my $inner_order;
+  if ($r{order_by_requested}) {
+    $self->throw_exception (
+      'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
+    ) unless $rs_attrs->{_rsroot_rsrc}->schema->storage->_order_by_is_stable(
+      $rs_attrs->{from},
+      $requested_order
+    );
+
+    $inner_order = $requested_order;
+  }
+  else {
+    $inner_order = [ map
       { "$rs_attrs->{alias}.$_" }
-      ( $rs_attrs->{_rsroot_rsrc}->_pri_cols )
-    ]
-  );
+      ( @{
+        $rs_attrs->{_rsroot_rsrc}->_identifying_column_set
+          ||
+        $self->throw_exception(sprintf(
+          'Unable to auto-construct stable order criteria for "skimming type" limit '
+        . "dialect based on source '%s'", $rs_attrs->{_rsroot_rsrc}->name) );
+      } )
+    ];
+  }
 
   # localise as we already have all the bind values we need
   {
@@ -361,6 +377,18 @@ sub _prep_for_skimming_limit {
     push @{$self->{pre_select_bind}}, @{$self->{order_bind}};
   }
 
+  # if this is a part of something bigger, we need to add back all
+  # the extra order_by's, as they may be relied upon by the outside
+  # of a prefetch or something
+  if ($rs_attrs->{_is_internal_subuery} and keys %$extra_order_sel) {
+    $r{out_sel} .= sprintf ", $extra_order_sel->{$_} AS $_"
+      for sort
+        { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
+          grep { $_ !~ /[^\w\-]/ }  # ignore functions
+          keys %$extra_order_sel
+    ;
+  }
+
   # and this is order re-alias magic
   for my $map ($extra_order_sel, $alias_map) {
     for my $col (keys %$map) {
@@ -534,63 +562,29 @@ sub _GenericSubQ {
   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
   my $root_tbl_name = $root_rsrc->name;
 
-  my ($order_by, @rest) = do {
+  my ($first_order_by) = do {
     local $self->{quote_char};
-    $self->_order_by_chunks ($rs_attrs->{order_by})
-  };
-
-  unless (
-    $order_by
-      &&
-    ! @rest
-      &&
-    ( ! ref $order_by
-        ||
-      ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
-    )
-  ) {
-    $self->throw_exception (
-      'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
-    . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
-    . 'unique-column order criteria.'
-    );
-  }
-
-  ($order_by) = @$order_by if ref $order_by;
+    map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($rs_attrs->{order_by})
+  } or $self->throw_exception (
+    'Generic Subquery Limit does not work on resultsets without an order. Provide a single, '
+  . 'unique-column order criteria.'
+  );
 
-  $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
+  $first_order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
   my $direction = lc ($1 || 'asc');
 
-  my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
+  my ($first_ord_alias, $first_ord_col) = $first_order_by =~ /^ (?: ([^\.]+) \. )? ([^\.]+) $/x;
 
-  my $inf = $root_rsrc->storage->_resolve_column_info (
-    $rs_attrs->{from}, [$order_by, $unq_sort_col]
-  );
+  $self->throw_exception(sprintf
+    "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
+  . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
+  ) if ($first_ord_alias and $first_ord_alias ne $rs_attrs->{alias});
 
-  my $ord_colinfo = $inf->{$order_by} || $self->throw_exception("Unable to determine source of order-criteria '$order_by'");
+  $first_ord_alias ||= $rs_attrs->{alias};
 
-  if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
-    $self->throw_exception(sprintf
-      "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
-    . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
-    );
-  }
-
-  # make sure order column is qualified
-  $order_by = "$rs_attrs->{alias}.$order_by"
-    unless $order_by =~ /^$rs_attrs->{alias}\./;
-
-  my $is_u;
-  my $ucs = { $root_rsrc->unique_constraints };
-  for (values %$ucs ) {
-    if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
-      $is_u++;
-      last;
-    }
-  }
   $self->throw_exception(
-    "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
-  ) unless $is_u;
+    "Generic Subquery Limit first order criteria '$first_ord_col' must be unique"
+  ) unless $root_rsrc->_identifying_column_set([$first_ord_col]);
 
   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
@@ -632,8 +626,8 @@ $order_sql
     $rs_attrs->{alias},
     $root_tbl_name,
     $count_tbl_alias,
-    "$count_tbl_alias.$unq_sort_col",
-    $order_by,
+    "$count_tbl_alias.$first_ord_col",
+    "$first_ord_alias.$first_ord_col",
   ));
 }