X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FSQLAHacks.pm;h=d2dc16edc58e57e759a0bad3ddf449923aad261d;hb=75f025cf247e75869dcdfe46b37e24d4e0b15e8b;hp=b5c9f8a4681343054fdc95c7e37ed39703081b60;hpb=81446c4ffa75bc3c215920ea8a9f7b74c50623b0;p=dbsrgits%2FDBIx-Class.git diff --git a/lib/DBIx/Class/SQLAHacks.pm b/lib/DBIx/Class/SQLAHacks.pm index b5c9f8a..d2dc16e 100644 --- a/lib/DBIx/Class/SQLAHacks.pm +++ b/lib/DBIx/Class/SQLAHacks.pm @@ -46,60 +46,113 @@ sub new { $self; } -# generate inner/outer select lists for various limit dialects +# !!! THIS IS ALSO HORRIFIC !!! /me ashamed +# +# Generates inner/outer select lists for various limit dialects # which result in one or more subqueries (e.g. RNO, Top, RowNum) # Any non-root-table columns need to have their table qualifier -# turned into a column name (otherwise names in subqueries clash +# turned into a column alias (otherwise names in subqueries clash # and/or lose their source table) -sub _subqueried_selection { +# +# Returns inner/outer strings of SQL QUOTED selectors with aliases +# (to be used in whatever select statement), and an alias index hashref +# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst +# higher up). +# If an order_by is supplied, the inner select needs to bring out columns +# used in implicit (non-selected) orders, and the order condition itself +# needs to be realiased to the proper names in the outer query. Thus we +# also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL => +# QUOTED ALIAS pairs, which is a list of extra selectors that do *not* +# exist in the original select list + +sub _subqueried_limit_attrs { my ($self, $rs_attrs) = @_; - croak 'Limit usable only in the context of DBIC (missing $rs_attrs)' unless $rs_attrs; + croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)' + unless ref ($rs_attrs) eq 'HASH'; + + my ($re_sep, $re_alias) = map { quotemeta $_ } ( + $self->name_sep || '.', + $rs_attrs->{alias}, + ); - # correlate select and as - my @sel; + # correlate select and as, build selection index + my (@sel, $in_sel_index); for my $i (0 .. $#{$rs_attrs->{select}}) { + my $s = $rs_attrs->{select}[$i]; + my $sql_sel = $self->_recurse_fields ($s); + my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef; + + push @sel, { - sql => $self->_recurse_fields ($s), + sql => $sql_sel, unquoted_sql => do { local $self->{quote_char}; $self->_recurse_fields ($s) }, as => - ( (ref $s) eq 'HASH' ? $s->{-as} : undef) + $sql_alias || $rs_attrs->{as}[$i] || croak "Select argument $i ($s) without corresponding 'as'" , }; + + $in_sel_index->{$sql_sel}++; + $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias; + + # record unqualified versions too, so we do not have + # to reselect the same column twice (in qualified and + # unqualified form) + if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) { + $in_sel_index->{$1}++; + } } - my ($qsep, $qalias) = map { quotemeta $_ } ( - $self->name_sep || '.', - $rs_attrs->{alias}, - ); # re-alias and remove any name separators from aliases, # unless we are dealing with the current source alias - # (which will transcend the subqueries and is necessary + # (which will transcend the subqueries as it is necessary # for possible further chaining) - my (@insel, @outsel); + my (@in_sel, @out_sel, %renamed); for my $node (@sel) { - if (List::Util::first { $_ =~ / (?{as}, $node->{unquoted_sql}) ) { - $node->{as} =~ s/ $qsep /__/xg; - push @insel, sprintf '%s AS %s', $node->{sql}, $self->_quote($node->{as}); - push @outsel, $self->_quote ($node->{as}); + if (List::Util::first { $_ =~ / (?{as}, $node->{unquoted_sql}) ) { + $node->{as} =~ s/ $re_sep /__/xg; + my $quoted_as = $self->_quote($node->{as}); + push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as; + push @out_sel, $quoted_as; + $renamed{$node->{sql}} = $quoted_as; } else { - push @insel, $node->{sql}; - push @outsel, $self->_quote ($node->{as}); + push @in_sel, $node->{sql}; + push @out_sel, $self->_quote ($node->{as}); } } - return map { join (', ', @$_ ) } (\@insel, \@outsel); -} + # see if the order gives us anything + my %extra_order_sel; + for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) { + # order with bind + $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY'; + $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix; + next if $in_sel_index->{$chunk}; -# ANSI standard Limit/Offset implementation. DB2 and MSSQL use this + $extra_order_sel{$chunk} ||= $self->_quote ( + 'ORDER__BY__' . scalar keys %extra_order_sel + ); + } + + return ( + (map { join (', ', @$_ ) } ( + \@in_sel, + \@out_sel) + ), + \%renamed, + keys %extra_order_sel ? \%extra_order_sel : (), + ); +} + +# ANSI standard Limit/Offset implementation. DB2 and MSSQL >= 2005 use this sub _RowNumberOver { my ($self, $sql, $rs_attrs, $rows, $offset ) = @_; @@ -107,28 +160,51 @@ sub _RowNumberOver { $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix or croak "Unrecognizable SELECT: $sql"; - # get selectors - my ($insel, $outsel) = $self->_subqueried_selection ($rs_attrs); + # get selectors, and scan the order_by (if any) + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ( $rs_attrs ); # make up an order if none exists - my $order_by = $self->_order_by( - (delete $rs_attrs->{order_by}) || $self->_rno_default_order - ); + my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order; + my $rno_ord = $self->_order_by ($requested_order); + + # this is the order supplement magic + my $mid_sel = $out_sel; + if ($extra_order_sel) { + for my $extra_col (sort + { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} } + keys %$extra_order_sel + ) { + $in_sel .= sprintf (', %s AS %s', + $extra_col, + $extra_order_sel->{$extra_col}, + ); + + $mid_sel .= ', ' . $extra_order_sel->{$extra_col}; + } + } + + # and this is order re-alias magic + for ($extra_order_sel, $alias_map) { + for my $col (keys %$_) { + my $re_col = quotemeta ($col); + $rno_ord =~ s/$re_col/$_->{$col}/; + } + } # whatever is left of the order_by (only where is processed at this point) my $group_having = $self->_parse_rs_attrs($rs_attrs); my $qalias = $self->_quote ($rs_attrs->{alias}); - my $idx_name = $self->_quote ('rno__row__index'); $sql = sprintf (<_parse_rs_attrs ($rs_attrs), ); @@ -167,9 +243,9 @@ sub _FirstSkip { or croak "Unrecognizable SELECT: $sql"; return sprintf ('SELECT %s%s%s%s', - sprintf ('FIRST %d ', $rows), + sprintf ('FIRST %u ', $rows), $offset - ? sprintf ('SKIP %d ', $offset) + ? sprintf ('SKIP %u ', $offset) : '' , $sql, @@ -185,7 +261,7 @@ sub _RowNum { $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix or croak "Unrecognizable SELECT: $sql"; - my ($insel, $outsel) = $self->_subqueried_selection ($rs_attrs); + my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs); my $qalias = $self->_quote ($rs_attrs->{alias}); my $idx_name = $self->_quote ('rownum__index'); @@ -197,7 +273,7 @@ SELECT $outsel FROM ( SELECT $outsel, ROWNUM $idx_name FROM ( SELECT $insel ${sql}${order_group_having} ) $qalias -) $qalias WHERE $idx_name BETWEEN %d AND %d +) $qalias WHERE $idx_name BETWEEN %u AND %u EOS @@ -205,8 +281,7 @@ EOS return $sql; } -=begin -# Crappy Top based Limit/Offset support. Legacy from MSSQL. +# Crappy Top based Limit/Offset support. Legacy for MSSQL < 2005 sub _Top { my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_; @@ -215,154 +290,205 @@ sub _Top { or croak "Unrecognizable SELECT: $sql"; # get selectors - my ($insel, $outsel) = $self->_subqueried_selection ($rs_attrs); + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ($rs_attrs); + + my $requested_order = delete $rs_attrs->{order_by}; - # deal with order - my $rs_alias = $rs_attrs->{alias}; - my $req_order = delete $rs_attrs->{order_by}; - my $name_sep = $self->name_sep || '.'; + my $order_by_requested = $self->_order_by ($requested_order); - # examine normalized version, collapses nesting - my $limit_order = scalar $self->_order_by_chunks ($req_order) - ? $req_order + # make up an order unless supplied + my $inner_order = ($order_by_requested + ? $requested_order : [ map - { join ('', $rs_alias, $name_sep, $_ ) } - ( $rs_attrs->{_rsroot_source_handle}->resolve->primary_columns ) + { join ('', $rs_attrs->{alias}, $self->{name_sep}||'.', $_ ) } + ( $rs_attrs->{_rsroot_source_handle}->resolve->_pri_cols ) ] - ; - - my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order); - my $order_by_requested = $self->_order_by ($req_order); - + ); + my ($order_by_inner, $order_by_reversed); + # localise as we already have all the bind values we need + { + local $self->{order_bind}; + $order_by_inner = $self->_order_by ($inner_order); - my $esc_name_sep = "\Q$name_sep\E"; - my $col_re = qr/ ^ (?: (.+) $esc_name_sep )? ([^$esc_name_sep]+) $ /x; + my @out_chunks; + for my $ch ($self->_order_by_chunks ($inner_order)) { + $ch = $ch->[0] if ref $ch eq 'ARRAY'; - my $quoted_rs_alias = $self->_quote ($rs_alias); + $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix; + my $dir = uc ($1||'ASC'); - # construct the new select lists, rename(alias) some columns if necessary - my (@outer_select, @inner_select, %seen_names, %col_aliases, %outer_col_aliases); + push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' ); + } - for (@{$rs_attrs->{select}}) { - next if ref $_; - my ($table, $orig_colname) = ( $_ =~ $col_re ); - next unless $table; - $seen_names{$orig_colname}++; + $order_by_reversed = $self->_order_by (\@out_chunks); } - for my $i (0 .. $#sql_select) { - - my $colsel_arg = $rs_attrs->{select}[$i]; - my $colsel_sql = $sql_select[$i]; - - # this may or may not work (in case of a scalarref or something) - my ($table, $orig_colname) = ( $colsel_arg =~ $col_re ); - - my $quoted_alias; - # do not attempt to understand non-scalar selects - alias numerically - if (ref $colsel_arg) { - $quoted_alias = $self->_quote ('column_' . (@inner_select + 1) ); - } - # column name seen more than once - alias it - elsif ($orig_colname && - ($seen_names{$orig_colname} && $seen_names{$orig_colname} > 1) ) { - $quoted_alias = $self->_quote ("${table}__${orig_colname}"); - } - - # we did rename - make a record and adjust - if ($quoted_alias) { - # alias inner - push @inner_select, "$colsel_sql AS $quoted_alias"; - - # push alias to outer - push @outer_select, $quoted_alias; - - # Any aliasing accumulated here will be considered - # both for inner and outer adjustments of ORDER BY - $self->__record_alias ( - \%col_aliases, - $quoted_alias, - $colsel_arg, - $table ? $orig_colname : undef, + # this is the order supplement magic + my $mid_sel = $out_sel; + if ($extra_order_sel) { + for my $extra_col (sort + { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} } + keys %$extra_order_sel + ) { + $in_sel .= sprintf (', %s AS %s', + $extra_col, + $extra_order_sel->{$extra_col}, ); + + $mid_sel .= ', ' . $extra_order_sel->{$extra_col}; } + } - # otherwise just leave things intact inside, and use the abbreviated one outside - # (as we do not have table names anymore) - else { - push @inner_select, $colsel_sql; - - my $outer_quoted = $self->_quote ($orig_colname); # it was not a duplicate so should just work - push @outer_select, $outer_quoted; - $self->__record_alias ( - \%outer_col_aliases, - $outer_quoted, - $colsel_arg, - $table ? $orig_colname : undef, - ); + # and this is order re-alias magic + for my $map ($extra_order_sel, $alias_map) { + for my $col (keys %$map) { + my $re_col = quotemeta ($col); + $_ =~ s/$re_col/$map->{$col}/ + for ($order_by_reversed, $order_by_requested); } } - my $outer_select = join (', ', @outer_select ); - my $inner_select = join (', ', @inner_select ); + # generate the rest of the sql + my $grpby_having = $self->_parse_rs_attrs ($rs_attrs); - %outer_col_aliases = (%outer_col_aliases, %col_aliases); + my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias}); + $sql = sprintf ('SELECT TOP %u %s %s %s %s', + $rows + ($offset||0), + $in_sel, + $sql, + $grpby_having, + $order_by_inner, + ); + $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s', + $rows, + $mid_sel, + $sql, + $quoted_rs_alias, + $order_by_reversed, + ) if $offset; + $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s', + $rows, + $out_sel, + $sql, + $quoted_rs_alias, + $order_by_requested, + ) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) ); - # generate the rest - my $grpby_having = $self->_parse_rs_attrs ($rs_attrs); + $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger + return $sql; +} - # short circuit for counts - the ordering complexity is needless - if ($rs_attrs->{-for_count_only}) { - return "SELECT TOP $rows $inner_select $sql $grpby_having $order_by_outer"; - } +# This is the most evil limit "dialect" (more of a hack) for *really* +# stupid databases. It works by ordering the set by some unique column, +# and calculating amount of rows that have a less-er value (thus +# emulating a RowNum-like index). Of course this implies the set can +# only be ordered by a single unique columns. +sub _GenericSubQ { + my ($self, $sql, $rs_attrs, $rows, $offset) = @_; - # we can't really adjust the order_by columns, as introspection is lacking - # resort to simple substitution - for my $col (keys %outer_col_aliases) { - for ($order_by_requested, $order_by_outer) { - $_ =~ s/\s+$col\s+/ $outer_col_aliases{$col} /g; - } - } - for my $col (keys %col_aliases) { - $order_by_inner =~ s/\s+$col\s+/ $col_aliases{$col} /g; + my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve; + my $root_tbl_name = $root_rsrc->name; + + # mangle the input sql as we will be replacing the selector + $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix + or croak "Unrecognizable SELECT: $sql"; + + my ($order_by, @rest) = do { + local $self->{quote_char}; + $self->_order_by_chunks ($rs_attrs->{order_by}) + }; + + unless ( + $order_by + && + ! @rest + && + ( ! ref $order_by + || + ( ref $order_by eq 'ARRAY' and @$order_by == 1 ) + ) + ) { + croak ( + 'Generic Subquery Limit does not work on resultsets without an order, or resultsets ' + . 'with complex order criteria (multicolumn and/or functions). Provide a single, ' + . 'unique-column order criteria.' + ); } + ($order_by) = @$order_by if ref $order_by; - my $inner_lim = $rows + $offset; + $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix; + my $direction = lc ($1 || 'asc'); - $sql = "SELECT TOP $inner_lim $inner_select $sql $grpby_having $order_by_inner"; + my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/; - if ($offset) { - $sql = <<"SQL"; + my $inf = $root_rsrc->storage->_resolve_column_info ( + $rs_attrs->{from}, [$order_by, $unq_sort_col] + ); - SELECT TOP $rows $outer_select FROM - ( - $sql - ) $quoted_rs_alias - $order_by_outer -SQL + my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'"; + if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) { + croak "Generic Subquery Limit order criteria can be only based on the root-source '" + . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')"; } - if ($order_by_requested) { - $sql = <<"SQL"; - - SELECT $outer_select FROM - ( $sql ) $quoted_rs_alias - $order_by_requested -SQL + # make sure order column is qualified + $order_by = "$rs_attrs->{alias}.$order_by" + unless $order_by =~ /^$rs_attrs->{alias}\./; + my $is_u; + my $ucs = { $root_rsrc->unique_constraints }; + for (values %$ucs ) { + if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) { + $is_u++; + last; + } } + croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)" + unless $is_u; + + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ($rs_attrs); + + my $cmp_op = $direction eq 'desc' ? '>' : '<'; + my $count_tbl_alias = 'rownum__emulation'; - $sql =~ s/\s*\n\s*/ /g; # parsing out multiline statements is harder than a single line + my $order_group_having = $self->_parse_rs_attrs($rs_attrs); + + # add the order supplement (if any) as this is what will be used for the outer WHERE + $in_sel .= ", $_" for keys %{$extra_order_sel||{}}; + + $sql = sprintf (<_quote ($_) } ( + $rs_attrs->{alias}, + $root_tbl_name, + $count_tbl_alias, + "$count_tbl_alias.$unq_sort_col", + $order_by, + )), + $offset + ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1) + : sprintf ('< %u', $rows ) + , + ); + + $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger return $sql; } -=cut + # While we're at it, this should make LIMIT queries more efficient, # without digging into things too deeply @@ -382,8 +508,6 @@ sub select { $table = $self->_quote($table); } - local $self->{rownum_hack_count} = 1 - if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum'); @rest = (-1) unless defined $rest[0]; croak "LIMIT 0 Does Not Compute" if $rest[0] == 0; # and anyway, SQL::Abstract::Limit will cause a barf if we don't first @@ -509,7 +633,7 @@ sub _parse_rs_attrs { my $sql = ''; - if (my $g = $self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 }) ) { + if (my $g = $self->_recurse_fields($arg->{group_by}) ) { $sql .= $self->_sqlcase(' group by ') . $g; }