# Although this is needed only if the order_by is not defined, it is
# actually cheaper to just populate this rather than properly examining
# order_by (stuf like [ {} ] and the like)
- $attrs->{_virtual_order_by} = [ $self->result_source->primary_columns ];
-
+ my $prefix = $alias . ($source->schema->storage->_sql_maker_opts->{name_sep} || '.');
+ $attrs->{_virtual_order_by} = [
+ map { $prefix . $_ } ($source->primary_columns)
+ ];
my $collapse = $attrs->{collapse} || {};
if ( my $prefetch = delete $attrs->{prefetch} ) {
sub _Top {
my ( $self, $sql, $order, $rows, $offset ) = @_;
+ # mangle the input sql so it can be properly aliased in the outer queries
+ $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
+ or croak "Unrecognizable SELECT: $sql";
+ my $select = $1;
+
+ my (@outer_select, %col_index);
+ for my $selected_col (@{$self->{_dbic_rs_attrs}{select}}) {
+
+ my $new_colname;
+
+ if (ref $selected_col) {
+ $new_colname = $self->_quote ('column_' . (@outer_select + 1) );
+ }
+ else {
+ my $quoted_col = $self->_quote ($selected_col);
+
+ my $name_sep = $self->name_sep || '.';
+ $name_sep = "\Q$name_sep\E";
+
+ my ($table, $orig_colname) = ( $selected_col =~ / (?: (.+) $name_sep )? ([^$name_sep]+) $ /x );
+ $new_colname = $self->_quote ("${table}__${orig_colname}");
+
+ $select =~ s/(\Q$quoted_col\E|\Q$selected_col\E)/"$1 AS $new_colname"/e;
+
+ # record qualified name if available (should be)
+ $col_index{$selected_col} = $new_colname if $table;
+
+ # record unqialified name, undef if a duplicate is found
+ if (exists $col_index{$orig_colname}) {
+ $col_index{$orig_colname} = undef;
+ }
+ else {
+ $col_index{$orig_colname} = $new_colname;
+ }
+ }
+
+ push @outer_select, $new_colname;
+ }
+
+ my $outer_select = join (', ', @outer_select );
+
+
+ # deal with order
croak '$order supplied to SQLAHacks limit emulators must be a hash'
if (ref $order ne 'HASH');
$order = { %$order }; #copy
- my $last = $rows + $offset;
+ my $req_order = [ $self->_order_by_chunks ($order->{order_by}) ];
+ my $limit_order = [ @$req_order ? @$req_order : $self->_order_by_chunks ($order->{_virtual_order_by}) ];
+
- my $req_order = $self->_order_by ($order->{order_by});
+ # normalize all column names in order by
+ # no copies, just aliasing ($_)
+ for ($req_order, $limit_order) {
+ for ( @{$_ || []} ) {
+ $_ = $col_index{$_} if $col_index{$_};
+ }
+ }
- my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
+ # generate the rest
delete $order->{$_} for qw/order_by _virtual_order_by/;
my $grpby_having = $self->_order_by ($order);
my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
- $sql =~ s/^\s*(SELECT|select)//;
+ my $last = $rows + $offset;
$sql = <<"SQL";
- SELECT * FROM
- (
- SELECT TOP $rows * FROM
+
+ SELECT TOP $rows $outer_select FROM
(
- SELECT TOP $last $sql $grpby_having $order_by_inner
- ) AS foo
+ SELECT TOP $last $select $sql $grpby_having $order_by_inner
+ ) AS inner_sel
$order_by_outer
- ) AS bar
- $req_order
+SQL
+
+ if (@$req_order) {
+ my $order_by_requested = $self->_order_by ($req_order);
+ $sql = <<"SQL";
+
+ SELECT $outer_select FROM
+ ( $sql ) AS outer_sel
+ $order_by_requested;
SQL
- return $sql;
+
+ }
+
+ return $sql;
}
sub _resolve_column_info {
my ($self, $ident, $colnames) = @_;
my $alias2src = $self->_resolve_ident_sources($ident);
- my $name_sep = $self->_sql_maker_opts->{name_sep} || '.';
+
+ my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+ $sep = "\Q$sep\E";
+
my %return;
foreach my $col (@{$colnames}) {
- $col =~ m/^([^\Q${name_sep}\E]*)\Q${name_sep}\E/;
+ $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
+
my $alias = $1 || 'me';
+ my $colname = $2;
+
my $rsrc = $alias2src->{$alias};
- $return{$col} = $rsrc && { %{$rsrc->column_info($col)}, -result_source => $rsrc };
+ $return{$col} = $rsrc && { %{$rsrc->column_info($colname)}, -result_source => $rsrc };
}
return \%return;
}
plan skip_all => 'Set $ENV{DBICTEST_MSSQL_ODBC_DSN}, _USER and _PASS to run this test'
unless ($dsn && $user);
-plan tests => 21;
+plan tests => 19;
my $schema = DBICTest::Schema->connect($dsn, $user, $pass, {AutoCommit => 1});
rows => 5,
});
- my $owners2 = $schema->resultset ('Owners')->search ({ id => { -in => $owners->get_column ('me.id')->as_query }});
- for ($owners, $owners2) {
- is ($_->all, 8, 'Prefetched grouped search returns correct number of rows');
- is ($_->count, 8, 'Prefetched grouped search returns correct count');
- }
+ is ($owners->all, 3, 'Prefetched grouped search returns correct number of rows');
+ is ($owners->count, 3, 'Prefetched grouped search returns correct count');
# try a ->belongs_to direction (no select collapse)
my $books = $schema->resultset ('BooksInLibrary')->search ({
prefetch => 'owner',
distinct => 1,
order_by => 'name',
- #page => 2,
- #rows => 5,
+ rows => 5,
});
- my $books2 = $schema->resultset ('BooksInLibrary')->search ({ id => { -in => $books->get_column ('me.id')->as_query }});
- for ($books, $books2) {
- is ($_->all, 1, 'Prefetched grouped search returns correct number of rows');
- is ($_->count, 1, 'Prefetched grouped search returns correct count');
- }
+
+ is ($books->page(1)->all, 1, 'Prefetched grouped search returns correct number of rows');
+ is ($books->page(1)->count, 1, 'Prefetched grouped search returns correct count');
+
+ is ($books->page(2)->all, 0, 'Prefetched grouped search returns correct number of rows');
+ is ($books->page(2)->count, 0, 'Prefetched grouped search returns correct count');
}