Merge 'top_limit_altfix' into 'trunk'
Peter Rabbitson [Tue, 26 May 2009 14:29:55 +0000 (14:29 +0000)]
lib/DBIx/Class/ResultSet.pm
lib/DBIx/Class/SQLAHacks.pm
lib/DBIx/Class/Storage/DBI.pm
t/42toplimit.t

index d91d944..5a8595c 100644 (file)
@@ -661,6 +661,8 @@ sub cursor {
   my ($self) = @_;
 
   my $attrs = $self->_resolved_attrs_copy;
+  $attrs->{_virtual_order_by} = $self->_gen_virtual_order;
+
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -712,6 +714,8 @@ sub single {
   }
 
   my $attrs = $self->_resolved_attrs_copy;
+  $attrs->{_virtual_order_by} = $self->_gen_virtual_order;
+
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -738,6 +742,16 @@ sub single {
   return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
+# _gen_virtual_order
+#
+# This is a horrble hack, but seems like the best we can do at this point
+# Some limit emulations (Top) require an ordered resultset in order to 
+# function at all. So supply a PK order to be used if necessary
+
+sub _gen_virtual_order {
+  return [ shift->result_source->primary_columns ];
+}
+
 # _is_unique_query
 #
 # Try to determine if the specified query is guaranteed to be unique, based on
index be4fc28..2aca425 100644 (file)
@@ -67,11 +67,7 @@ sub _where_field_BETWEEN {
   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
 }
 
-
-
-# DB2 is the only remaining DB using this. Even though we are not sure if
-# RowNumberOver is still needed here (should be part of SQLA) leave the 
-# code in place
+# Slow but ANSI standard Limit/Offset support. DB2 uses this
 sub _RowNumberOver {
   my ($self, $sql, $order, $rows, $offset ) = @_;
 
@@ -94,6 +90,44 @@ SQL
   return $sql;
 }
 
+# Crappy Top based Limit/Offset support. MSSQL uses this currently,
+# but may have to switch to RowNumberOver one day
+sub _Top {
+  my ( $self, $sql, $order, $rows, $offset ) = @_;
+
+  croak '$order supplied to SQLAHacks limit emulators must be a hash'
+    if (ref $order ne 'HASH');
+
+  $order = { %$order }; #copy
+
+  my $last = $rows + $offset;
+
+  my $req_order = $self->_order_by ($order->{order_by});
+  my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
+
+  delete $order->{$_} for qw/order_by _virtual_order_by/;
+  my $grpby_having = $self->_order_by ($order);
+
+  my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
+
+  $sql =~ s/^\s*(SELECT|select)//;
+
+  $sql = <<"SQL";
+  SELECT * FROM
+  (
+    SELECT TOP $rows * FROM
+    (
+        SELECT TOP $last $sql $grpby_having $order_by_inner
+    ) AS foo
+    $order_by_outer
+  ) AS bar
+  $req_order
+
+SQL
+    return $sql;
+}
+
+
 
 # While we're at it, this should make LIMIT queries more efficient,
 #  without digging into things too deeply
@@ -213,32 +247,38 @@ sub _order_by {
   my $ret = '';
   my @extra;
   if (ref $_[0] eq 'HASH') {
+
     if (defined $_[0]->{group_by}) {
       $ret = $self->_sqlcase(' group by ')
         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
     }
+
     if (defined $_[0]->{having}) {
       my $frag;
       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
       push(@{$self->{having_bind}}, @extra);
       $ret .= $self->_sqlcase(' having ').$frag;
     }
+
     if (defined $_[0]->{order_by}) {
       $ret .= $self->_order_by($_[0]->{order_by});
     }
+
     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
       return $self->SUPER::_order_by($_[0]);
     }
+
   } elsif (ref $_[0] eq 'SCALAR') {
     $ret = $self->_sqlcase(' order by ').${ $_[0] };
   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
-    my @order = @{+shift};
-    $ret = $self->_sqlcase(' order by ')
-          .join(', ', map {
-                        my $r = $self->_order_by($_, @_);
-                        $r =~ s/^ ?ORDER BY //i;
-                        $r;
-                      } @order);
+    my @order = map {
+      my $r = $self->_order_by($_, @_);
+      $r =~ s/^ ?ORDER BY //i;
+      $r || ();
+    } @{+shift};
+
+    $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
+
   } else {
     $ret = $self->SUPER::_order_by(@_);
   }
@@ -252,7 +292,6 @@ sub _order_directions {
 
 sub _resolve_order {
   my ($self, $order) = @_;
-  $order = $order->{order_by} if (ref $order eq 'HASH' and $order->{order_by});
 
   if (ref $order eq 'HASH') {
     $order = [$self->_resolve_order_hash($order)];
@@ -292,6 +331,7 @@ sub _resolve_order_hash {
       croak "$key is not a valid direction, use -asc or -desc";
     }
   }
+
   return @new_order;
 }
 
index 0d9cace..9f38504 100644 (file)
@@ -11,6 +11,7 @@ use DBIx::Class::SQLAHacks;
 use DBIx::Class::Storage::DBI::Cursor;
 use DBIx::Class::Storage::Statistics;
 use Scalar::Util qw/blessed weaken/;
+use List::Util();
 
 __PACKAGE__->mk_group_accessors('simple' =>
     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
@@ -1177,11 +1178,14 @@ sub _select_args {
   my $sql_maker = $self->sql_maker;
   $sql_maker->{for} = $for;
 
-  if (exists $attrs->{group_by} || $attrs->{having}) {
+  my @in_order_attrs = qw/group_by having _virtual_order_by/;
+  if (List::Util::first { exists $attrs->{$_} } (@in_order_attrs) ) {
     $order = {
-      group_by => $attrs->{group_by},
-      having => $attrs->{having},
-      ($order ? (order_by => $order) : ())
+      ($order
+        ? (order_by => $order)
+        : ()
+      ),
+      ( map { $_ => $attrs->{$_} } (@in_order_attrs) )
     };
   }
   my $bind_attrs = {}; ## Future support
index 9e2bec7..f63b74c 100644 (file)
@@ -9,6 +9,8 @@ use DBIC::SqlMakerTest;
 my $schema = DBICTest->init_schema;
 
 # Trick the sqlite DB to use Top limit emulation
+# We could test all of this via $sq->$op directly,
+# but some conditions needs a $rsrc
 delete $schema->storage->_sql_maker->{_cached_syntax};
 $schema->storage->_sql_maker->limit_dialect ('Top');
 
@@ -115,5 +117,22 @@ my @tests = (
   },
 );
 
-plan (tests => scalar @tests);
+plan (tests => scalar @tests + 1);
+
 test_order ($_) for @tests;
+
+is_same_sql_bind (
+  $rs->search ({}, { group_by => 'bar', order_by => 'bar' })->as_query,
+  '(
+    SELECT * FROM
+    (
+      SELECT TOP 1 * FROM
+      (
+        SELECT TOP 4  me.foo, me.bar, me.hello, me.goodbye, me.sensors, me.read_count FROM fourkeys me GROUP BY bar ORDER BY bar ASC
+      ) AS foo
+      ORDER BY bar DESC
+    ) AS bar
+    ORDER BY bar
+  )',
+  [],
+);