X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=lib%2FDBIx%2FClass%2FSQLAHacks.pm;h=bf6d148aa8fe5935614a34765e570ac3e47c832f;hb=6a247f3368100ac0557e33cc534bc8ad0ccb1175;hp=5e3ab35f57a61a03a37651c6b068e6aad928247f;hpb=a6b68a60b376e918a6058f37cb1115ba3163a59b;p=dbsrgits%2FDBIx-Class.git diff --git a/lib/DBIx/Class/SQLAHacks.pm b/lib/DBIx/Class/SQLAHacks.pm index 5e3ab35..bf6d148 100644 --- a/lib/DBIx/Class/SQLAHacks.pm +++ b/lib/DBIx/Class/SQLAHacks.pm @@ -1,15 +1,25 @@ package # Hide from PAUSE DBIx::Class::SQLAHacks; -# This module is a subclass of SQL::Abstract::Limit and includes a number -# of DBIC-specific workarounds, not yet suitable for inclusion into the -# SQLA core - -use base qw/SQL::Abstract::Limit/; +# This module is a subclass of SQL::Abstract and includes a number of +# DBIC-specific workarounds, not yet suitable for inclusion into the +# SQLA core. +# It also provides all (and more than) the functionality of +# SQL::Abstract::Limit, which proved to be very hard to keep updated + +use base qw/ + SQL::Abstract + Class::Accessor::Grouped +/; +use mro 'c3'; use strict; use warnings; -use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/; -use Sub::Name(); +use List::Util 'first'; +use Sub::Name 'subname'; +use Carp::Clan qw/^DBIx::Class|^SQL::Abstract|^Try::Tiny/; +use namespace::clean; + +__PACKAGE__->mk_group_accessors (simple => qw/quote_char name_sep limit_dialect/); BEGIN { # reinstall the carp()/croak() functions imported into SQL::Abstract @@ -19,10 +29,11 @@ BEGIN { for my $f (qw/carp croak/) { my $orig = \&{"SQL::Abstract::$f"}; - *{"SQL::Abstract::$f"} = Sub::Name::subname "SQL::Abstract::$f" => + my $clan_import = \&{$f}; + *{"SQL::Abstract::$f"} = subname "SQL::Abstract::$f" => sub { if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+ .+? called \s at/x) { - __PACKAGE__->can($f)->(@_); + $clan_import->(@_); } else { goto $orig; @@ -31,47 +42,201 @@ BEGIN { } } +# the "oh noes offset/top without limit" constant +# limited to 32 bits for sanity (and consistency, +# since it is ultimately handed to sprintf %u) +# Implemented as a method, since ::Storage::DBI also +# refers to it (i.e. for the case of software_limit or +# as the value to abuse with MSSQL ordered subqueries) +sub __max_int { 0xFFFFFFFF }; -# Tries to determine limit dialect. +# !!! THIS IS ALSO HORRIFIC !!! /me ashamed +# +# Generates inner/outer select lists for various limit dialects +# which result in one or more subqueries (e.g. RNO, Top, RowNum) +# Any non-root-table columns need to have their table qualifier +# turned into a column alias (otherwise names in subqueries clash +# and/or lose their source table) # -sub new { - my $self = shift->SUPER::new(@_); +# Returns inner/outer strings of SQL QUOTED selectors with aliases +# (to be used in whatever select statement), and an alias index hashref +# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst +# higher up). +# If an order_by is supplied, the inner select needs to bring out columns +# used in implicit (non-selected) orders, and the order condition itself +# needs to be realiased to the proper names in the outer query. Thus we +# also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL => +# QUOTED ALIAS pairs, which is a list of extra selectors that do *not* +# exist in the original select list + +sub _subqueried_limit_attrs { + my ($self, $rs_attrs) = @_; + + croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)' + unless ref ($rs_attrs) eq 'HASH'; + + my ($re_sep, $re_alias) = map { quotemeta $_ } ( + $self->name_sep || '.', + $rs_attrs->{alias}, + ); + + # correlate select and as, build selection index + my (@sel, $in_sel_index); + for my $i (0 .. $#{$rs_attrs->{select}}) { + + my $s = $rs_attrs->{select}[$i]; + my $sql_sel = $self->_recurse_fields ($s); + my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef; + + + push @sel, { + sql => $sql_sel, + unquoted_sql => do { local $self->{quote_char}; $self->_recurse_fields ($s) }, + as => + $sql_alias + || + $rs_attrs->{as}[$i] + || + croak "Select argument $i ($s) without corresponding 'as'" + , + }; + + $in_sel_index->{$sql_sel}++; + $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias; + + # record unqualified versions too, so we do not have + # to reselect the same column twice (in qualified and + # unqualified form) + if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) { + $in_sel_index->{$1}++; + } + } + + + # re-alias and remove any name separators from aliases, + # unless we are dealing with the current source alias + # (which will transcend the subqueries as it is necessary + # for possible further chaining) + my (@in_sel, @out_sel, %renamed); + for my $node (@sel) { + if (first { $_ =~ / (?{as}, $node->{unquoted_sql}) ) { + $node->{as} = $self->_unqualify_colname($node->{as}); + my $quoted_as = $self->_quote($node->{as}); + push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as; + push @out_sel, $quoted_as; + $renamed{$node->{sql}} = $quoted_as; + } + else { + push @in_sel, $node->{sql}; + push @out_sel, $self->_quote ($node->{as}); + } + } + + # see if the order gives us anything + my %extra_order_sel; + for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) { + # order with bind + $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY'; + $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix; + + next if $in_sel_index->{$chunk}; + + $extra_order_sel{$chunk} ||= $self->_quote ( + 'ORDER__BY__' . scalar keys %extra_order_sel + ); + } + + return ( + (map { join (', ', @$_ ) } ( + \@in_sel, + \@out_sel) + ), + \%renamed, + keys %extra_order_sel ? \%extra_order_sel : (), + ); +} + +sub _unqualify_colname { + my ($self, $fqcn) = @_; + my $re_sep = quotemeta($self->name_sep || '.'); + $fqcn =~ s/ $re_sep /__/xg; + return $fqcn; +} - # This prevents the caching of $dbh in S::A::L, I believe - # If limit_dialect is a ref (like a $dbh), go ahead and replace - # it with what it resolves to: - $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect}) - if ref $self->{limit_dialect}; +# +# Follow limit dialect implementations +# - $self; +# PostgreSQL and SQLite +sub _LimitOffset { + my ( $self, $sql, $order, $rows, $offset ) = @_; + $sql .= $self->_order_by( $order ) . " LIMIT $rows"; + $sql .= " OFFSET $offset" if +$offset; + return $sql; } +# MySQL and any SQL::Statement based DBD +sub _LimitXY { + my ( $self, $sql, $order, $rows, $offset ) = @_; + $sql .= $self->_order_by( $order ) . " LIMIT "; + $sql .= "$offset, " if +$offset; + $sql .= $rows; + return $sql; +} -# ANSI standard Limit/Offset implementation. DB2 and MSSQL use this +# ANSI standard Limit/Offset implementation. DB2 and MSSQL >= 2005 use this sub _RowNumberOver { my ($self, $sql, $rs_attrs, $rows, $offset ) = @_; - # get the select to make the final amount of columns equal the original one - my ($select) = $sql =~ /^ \s* SELECT \s+ (.+?) \s+ FROM/ix + # mangle the input sql as we will be replacing the selector + $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix or croak "Unrecognizable SELECT: $sql"; + # get selectors, and scan the order_by (if any) + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ( $rs_attrs ); + # make up an order if none exists - my $order_by = $self->_order_by( - (delete $rs_attrs->{order_by}) || $self->_rno_default_order - ); + my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order; + my $rno_ord = $self->_order_by ($requested_order); + + # this is the order supplement magic + my $mid_sel = $out_sel; + if ($extra_order_sel) { + for my $extra_col (sort + { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} } + keys %$extra_order_sel + ) { + $in_sel .= sprintf (', %s AS %s', + $extra_col, + $extra_order_sel->{$extra_col}, + ); + + $mid_sel .= ', ' . $extra_order_sel->{$extra_col}; + } + } + + # and this is order re-alias magic + for ($extra_order_sel, $alias_map) { + for my $col (keys %$_) { + my $re_col = quotemeta ($col); + $rno_ord =~ s/$re_col/$_->{$col}/; + } + } - # whatever is left of the order_by + # whatever is left of the order_by (only where is processed at this point) my $group_having = $self->_parse_rs_attrs($rs_attrs); my $qalias = $self->_quote ($rs_attrs->{alias}); + my $idx_name = $self->_quote ('rno__row__index'); $sql = sprintf (<_parse_rs_attrs ($rs_attrs), ); } # Firebird specific limit, reverse of _SkipFirst for Informix +# According to SQLA::Limit firebird/interbase also supports +# ROWS X TO Y (in addition to FIRST X SKIP Y) sub _FirstSkip { my ($self, $sql, $rs_attrs, $rows, $offset) = @_; @@ -110,9 +279,9 @@ sub _FirstSkip { or croak "Unrecognizable SELECT: $sql"; return sprintf ('SELECT %s%s%s%s', - sprintf ('FIRST %d ', $rows), + sprintf ('FIRST %u ', $rows), $offset - ? sprintf ('SKIP %d ', $offset) + ? sprintf ('SKIP %u ', $offset) : '' , $sql, @@ -120,294 +289,368 @@ sub _FirstSkip { ); } -# Crappy Top based Limit/Offset support. Legacy from MSSQL. +# WhOracle limits +sub _RowNum { + my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_; + + # mangle the input sql as we will be replacing the selector + $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix + or croak "Unrecognizable SELECT: $sql"; + + my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs); + + my $qalias = $self->_quote ($rs_attrs->{alias}); + my $idx_name = $self->_quote ('rownum__index'); + my $order_group_having = $self->_parse_rs_attrs($rs_attrs); + + $sql = sprintf (<{select}}) { - croak (sprintf ( - 'SQL SELECT did not parse cleanly - retrieved %d comma separated elements, while ' - . 'the resultset select attribure contains %d elements: %s', - scalar @sql_select, - scalar @{$rs_attrs->{select}}, - $sql_select, - )); - } - my $name_sep = $self->name_sep || '.'; - my $esc_name_sep = "\Q$name_sep\E"; - my $col_re = qr/ ^ (?: (.+) $esc_name_sep )? ([^$esc_name_sep]+) $ /x; + # get selectors + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ($rs_attrs); - my $rs_alias = $rs_attrs->{alias}; - my $quoted_rs_alias = $self->_quote ($rs_alias); + my $requested_order = delete $rs_attrs->{order_by}; - # construct the new select lists, rename(alias) some columns if necessary - my (@outer_select, @inner_select, %seen_names, %col_aliases, %outer_col_aliases); + my $order_by_requested = $self->_order_by ($requested_order); - for (@{$rs_attrs->{select}}) { - next if ref $_; - my ($table, $orig_colname) = ( $_ =~ $col_re ); - next unless $table; - $seen_names{$orig_colname}++; - } + # make up an order unless supplied + my $inner_order = ($order_by_requested + ? $requested_order + : [ map + { join ('', $rs_attrs->{alias}, $self->{name_sep}||'.', $_ ) } + ( $rs_attrs->{_rsroot_source_handle}->resolve->_pri_cols ) + ] + ); - for my $i (0 .. $#sql_select) { + my ($order_by_inner, $order_by_reversed); - my $colsel_arg = $rs_attrs->{select}[$i]; - my $colsel_sql = $sql_select[$i]; + # localise as we already have all the bind values we need + { + local $self->{order_bind}; + $order_by_inner = $self->_order_by ($inner_order); - # this may or may not work (in case of a scalarref or something) - my ($table, $orig_colname) = ( $colsel_arg =~ $col_re ); + my @out_chunks; + for my $ch ($self->_order_by_chunks ($inner_order)) { + $ch = $ch->[0] if ref $ch eq 'ARRAY'; - my $quoted_alias; - # do not attempt to understand non-scalar selects - alias numerically - if (ref $colsel_arg) { - $quoted_alias = $self->_quote ('column_' . (@inner_select + 1) ); - } - # column name seen more than once - alias it - elsif ($orig_colname && - ($seen_names{$orig_colname} && $seen_names{$orig_colname} > 1) ) { - $quoted_alias = $self->_quote ("${table}__${orig_colname}"); + $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix; + my $dir = uc ($1||'ASC'); + + push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' ); } - # we did rename - make a record and adjust - if ($quoted_alias) { - # alias inner - push @inner_select, "$colsel_sql AS $quoted_alias"; - - # push alias to outer - push @outer_select, $quoted_alias; - - # Any aliasing accumulated here will be considered - # both for inner and outer adjustments of ORDER BY - $self->__record_alias ( - \%col_aliases, - $quoted_alias, - $colsel_arg, - $table ? $orig_colname : undef, + $order_by_reversed = $self->_order_by (\@out_chunks); + } + + # this is the order supplement magic + my $mid_sel = $out_sel; + if ($extra_order_sel) { + for my $extra_col (sort + { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} } + keys %$extra_order_sel + ) { + $in_sel .= sprintf (', %s AS %s', + $extra_col, + $extra_order_sel->{$extra_col}, ); + + $mid_sel .= ', ' . $extra_order_sel->{$extra_col}; } - # otherwise just leave things intact inside, and use the abbreviated one outside - # (as we do not have table names anymore) - else { - push @inner_select, $colsel_sql; - - my $outer_quoted = $self->_quote ($orig_colname); # it was not a duplicate so should just work - push @outer_select, $outer_quoted; - $self->__record_alias ( - \%outer_col_aliases, - $outer_quoted, - $colsel_arg, - $table ? $orig_colname : undef, - ); + # since whatever order bindvals there are, they will be realiased + # and need to show up in front of the entire initial inner subquery + # Unshift *from_bind* to make this happen (horrible, horrible, but + # we don't have another mechanism yet) + unshift @{$self->{from_bind}}, @{$self->{order_bind}}; + } + + # and this is order re-alias magic + for my $map ($extra_order_sel, $alias_map) { + for my $col (keys %$map) { + my $re_col = quotemeta ($col); + $_ =~ s/$re_col/$map->{$col}/ + for ($order_by_reversed, $order_by_requested); } } - my $outer_select = join (', ', @outer_select ); - my $inner_select = join (', ', @inner_select ); + # generate the rest of the sql + my $grpby_having = $self->_parse_rs_attrs ($rs_attrs); - %outer_col_aliases = (%outer_col_aliases, %col_aliases); + my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias}); - # deal with order - croak '$order/attr container supplied to SQLAHacks limit emulators must be a hash' - if (ref $rs_attrs ne 'HASH'); + $sql = sprintf ('SELECT TOP %u %s %s %s %s', + $rows + ($offset||0), + $in_sel, + $sql, + $grpby_having, + $order_by_inner, + ); - my $req_order = $rs_attrs->{order_by}; + $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s', + $rows, + $mid_sel, + $sql, + $quoted_rs_alias, + $order_by_reversed, + ) if $offset; - # examine normalized version, collapses nesting - my $limit_order = scalar $self->_order_by_chunks ($req_order) - ? $req_order - : [ map - { join ('', $rs_alias, $name_sep, $_ ) } - ( $rs_attrs->{_rsroot_source_handle}->resolve->primary_columns ) - ] - ; + $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s', + $rows, + $out_sel, + $sql, + $quoted_rs_alias, + $order_by_requested, + ) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) ); - my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order); - my $order_by_requested = $self->_order_by ($req_order); + $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger + return $sql; +} - # generate the rest - delete $rs_attrs->{order_by}; - my $grpby_having = $self->_parse_rs_attrs ($rs_attrs); +# This for Sybase ASE, to use SET ROWCOUNT when there is no offset, and +# GenericSubQ otherwise. +sub _RowCountOrGenericSubQ { + my $self = shift; + my ($sql, $rs_attrs, $rows, $offset) = @_; - # short circuit for counts - the ordering complexity is needless - if ($rs_attrs->{-for_count_only}) { - return "SELECT TOP $rows $inner_select $sql $grpby_having $order_by_outer"; - } + return $self->_GenericSubQ(@_) if $offset; - # we can't really adjust the order_by columns, as introspection is lacking - # resort to simple substitution - for my $col (keys %outer_col_aliases) { - for ($order_by_requested, $order_by_outer) { - $_ =~ s/\s+$col\s+/ $outer_col_aliases{$col} /g; - } - } - for my $col (keys %col_aliases) { - $order_by_inner =~ s/\s+$col\s+/ $col_aliases{$col} /g; + return sprintf <<"EOF", $rows, $sql; +SET ROWCOUNT %d +%s +SET ROWCOUNT 0 +EOF +} + +# This is the most evil limit "dialect" (more of a hack) for *really* +# stupid databases. It works by ordering the set by some unique column, +# and calculating amount of rows that have a less-er value (thus +# emulating a RowNum-like index). Of course this implies the set can +# only be ordered by a single unique columns. +sub _GenericSubQ { + my ($self, $sql, $rs_attrs, $rows, $offset) = @_; + + my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve; + my $root_tbl_name = $root_rsrc->name; + + # mangle the input sql as we will be replacing the selector + $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix + or croak "Unrecognizable SELECT: $sql"; + + my ($order_by, @rest) = do { + local $self->{quote_char}; + $self->_order_by_chunks ($rs_attrs->{order_by}) + }; + + unless ( + $order_by + && + ! @rest + && + ( ! ref $order_by + || + ( ref $order_by eq 'ARRAY' and @$order_by == 1 ) + ) + ) { + croak ( + 'Generic Subquery Limit does not work on resultsets without an order, or resultsets ' + . 'with complex order criteria (multicolumn and/or functions). Provide a single, ' + . 'unique-column order criteria.' + ); } + ($order_by) = @$order_by if ref $order_by; - my $inner_lim = $rows + $offset; + $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix; + my $direction = lc ($1 || 'asc'); - $sql = "SELECT TOP $inner_lim $inner_select $sql $grpby_having $order_by_inner"; + my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/; - if ($offset) { - $sql = <<"SQL"; + my $inf = $root_rsrc->storage->_resolve_column_info ( + $rs_attrs->{from}, [$order_by, $unq_sort_col] + ); - SELECT TOP $rows $outer_select FROM - ( - $sql - ) $quoted_rs_alias - $order_by_outer -SQL + my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'"; + if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) { + croak "Generic Subquery Limit order criteria can be only based on the root-source '" + . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')"; } - if ($order_by_requested) { - $sql = <<"SQL"; - - SELECT $outer_select FROM - ( $sql ) $quoted_rs_alias - $order_by_requested -SQL + # make sure order column is qualified + $order_by = "$rs_attrs->{alias}.$order_by" + unless $order_by =~ /^$rs_attrs->{alias}\./; + my $is_u; + my $ucs = { $root_rsrc->unique_constraints }; + for (values %$ucs ) { + if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) { + $is_u++; + last; + } } + croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)" + unless $is_u; + + my ($in_sel, $out_sel, $alias_map, $extra_order_sel) + = $self->_subqueried_limit_attrs ($rs_attrs); + + my $cmp_op = $direction eq 'desc' ? '>' : '<'; + my $count_tbl_alias = 'rownum__emulation'; - $sql =~ s/\s*\n\s*/ /g; # parsing out multiline statements is harder than a single line + my $order_sql = $self->_order_by (delete $rs_attrs->{order_by}); + my $group_having_sql = $self->_parse_rs_attrs($rs_attrs); + + # add the order supplement (if any) as this is what will be used for the outer WHERE + $in_sel .= ", $_" for keys %{$extra_order_sel||{}}; + + $sql = sprintf (<_quote ($_) } ( + $rs_attrs->{alias}, + $root_tbl_name, + $count_tbl_alias, + "$count_tbl_alias.$unq_sort_col", + $order_by, + )), + $offset + ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1) + : sprintf ('< %u', $rows ) + , + ); + + $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger return $sql; } -# action at a distance to shorten Top code above -sub __record_alias { - my ($self, $register, $alias, $fqcol, $col) = @_; +# +# Actual SQL::Abstract overrid^Whacks +# + +# Handle limit-dialect selection +sub select { + my ($self, $table, $fields, $where, $rs_attrs, $limit, $offset) = @_; - # record qualified name - $register->{$fqcol} = $alias; - $register->{$self->_quote($fqcol)} = $alias; - return unless $col; + $fields = $self->_recurse_fields($fields); - # record unqualified name, undef (no adjustment) if a duplicate is found - if (exists $register->{$col}) { - $register->{$col} = undef; + if (defined $offset) { + croak ('A supplied offset must be a non-negative integer') + if ( $offset =~ /\D/ or $offset < 0 ); } - else { - $register->{$col} = $alias; + $offset ||= 0; + + if (defined $limit) { + croak ('A supplied limit must be a positive integer') + if ( $limit =~ /\D/ or $limit <= 0 ); + } + elsif ($offset) { + $limit = $self->__max_int; } - $register->{$self->_quote($col)} = $register->{$col}; -} + my ($sql, @bind); + if ($limit) { + # this is legacy code-flow from SQLA::Limit, it is not set in stone + ($sql, @bind) = $self->next::method ($table, $fields, $where); -# While we're at it, this should make LIMIT queries more efficient, -# without digging into things too deeply -sub _find_syntax { - my ($self, $syntax) = @_; - return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax); -} + my $limiter = + $self->can ('emulate_limit') # also backcompat hook from SQLA::Limit + || + do { + my $dialect = $self->limit_dialect + or croak "Unable to generate SQL-limit - no limit dialect specified on $self, and no emulate_limit method found"; + $self->can ("_$dialect") + or croak "SQLAHacks does not implement the requested dialect '$dialect'"; + } + ; -# Quotes table names, handles "limit" dialects (e.g. where rownum between x and -# y) -sub select { - my ($self, $table, $fields, $where, $rs_attrs, @rest) = @_; + $sql = $self->$limiter ($sql, $rs_attrs, $limit, $offset); + } + else { + ($sql, @bind) = $self->next::method ($table, $fields, $where, $rs_attrs); + } - $self->{"${_}_bind"} = [] for (qw/having from order/); + push @{$self->{where_bind}}, @bind; - if (not ref($table) or ref($table) eq 'SCALAR') { - $table = $self->_quote($table); - } +# this *must* be called, otherwise extra binds will remain in the sql-maker + my @all_bind = $self->_assemble_binds; - local $self->{rownum_hack_count} = 1 - if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum'); - @rest = (-1) unless defined $rest[0]; - croak "LIMIT 0 Does Not Compute" if $rest[0] == 0; - # and anyway, SQL::Abstract::Limit will cause a barf if we don't first + return wantarray ? ($sql, @all_bind) : $sql; +} - my ($sql, @where_bind) = $self->SUPER::select( - $table, $self->_recurse_fields($fields), $where, $rs_attrs, @rest - ); - return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql; +sub _assemble_binds { + my $self = shift; + return map { @{ (delete $self->{"${_}_bind"}) || [] } } (qw/from where having order/); } -# Quotes table names, and handles default inserts +# Handle default inserts sub insert { - my $self = shift; - my $table = shift; - $table = $self->_quote($table); +# optimized due to hotttnesss +# my ($self, $table, $data, $options) = @_; # SQLA will emit INSERT INTO $table ( ) VALUES ( ) # which is sadly understood only by MySQL. Change default behavior here, # until SQLA2 comes with proper dialect support - if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) { - my $sql = "INSERT INTO ${table} DEFAULT VALUES"; + if (! $_[2] or (ref $_[2] eq 'HASH' and !keys %{$_[2]} ) ) { + my $sql = "INSERT INTO $_[1] DEFAULT VALUES"; - if (my $ret = ($_[1]||{})->{returning} ) { - $sql .= $self->_insert_returning ($ret); + if (my $ret = ($_[3]||{})->{returning} ) { + $sql .= $_[0]->_insert_returning ($ret); } return $sql; } - $self->SUPER::insert($table, @_); -} - -# Just quotes table names. -sub update { - my $self = shift; - my $table = shift; - $table = $self->_quote($table); - $self->SUPER::update($table, @_); -} - -# Just quotes table names. -sub delete { - my $self = shift; - my $table = shift; - $table = $self->_quote($table); - $self->SUPER::delete($table, @_); -} - -sub _emulate_limit { - my $self = shift; - # my ( $syntax, $sql, $order, $rows, $offset ) = @_; - - if ($_[3] == -1) { - return $_[1] . $self->_parse_rs_attrs($_[2]); - } else { - return $self->SUPER::_emulate_limit(@_); - } + next::method(@_); } sub _recurse_fields { - my ($self, $fields, $params) = @_; + my ($self, $fields) = @_; my $ref = ref $fields; return $self->_quote($fields) unless $ref; return $$fields if $ref eq 'SCALAR'; if ($ref eq 'ARRAY') { - return join(', ', map { - $self->_recurse_fields($_) - .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack}) - ? ' AS col'.$self->{rownum_hack_count}++ - : '') - } @$fields); + return join(', ', map { $self->_recurse_fields($_) } @$fields); } elsif ($ref eq 'HASH') { - my %hash = %$fields; + my %hash = %$fields; # shallow copy my $as = delete $hash{-as}; # if supplied - my ($func, $args) = each %hash; - delete $hash{$func}; + my ($func, $args, @toomany) = %hash; + + # there should be only one pair + if (@toomany) { + croak "Malformed select argument - too many keys in hash: " . join (',', keys %$fields ); + } if (lc ($func) eq 'distinct' && ref $args eq 'ARRAY' && @$args > 1) { croak ( @@ -425,11 +668,6 @@ sub _recurse_fields { : '' ); - # there should be nothing left - if (keys %hash) { - croak "Malformed select argument - too many keys in hash: " . join (',', keys %$fields ); - } - return $select; } # Is the second check absolutely necessary? @@ -459,7 +697,7 @@ sub _parse_rs_attrs { my $sql = ''; - if (my $g = $self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 }) ) { + if (my $g = $self->_recurse_fields($arg->{group_by}) ) { $sql .= $self->_sqlcase(' group by ') . $g; } @@ -488,36 +726,25 @@ sub _order_by { return $self->_parse_rs_attrs ($arg); } else { - my ($sql, @bind) = $self->SUPER::_order_by ($arg); + my ($sql, @bind) = $self->next::method($arg); push @{$self->{order_bind}}, @bind; return $sql; } } -sub _order_directions { - my ($self, $order) = @_; - - # strip bind values - none of the current _order_directions users support them - return $self->SUPER::_order_directions( [ map - { ref $_ ? $_->[0] : $_ } - $self->_order_by_chunks ($order) - ]); -} - sub _table { - my ($self, $from) = @_; - if (ref $from eq 'ARRAY') { - return $self->_recurse_from(@$from); - } elsif (ref $from eq 'HASH') { - return $self->_make_as($from); - } else { - return $from; # would love to quote here but _table ends up getting called - # twice during an ->select without a limit clause due to - # the way S::A::Limit->select works. should maybe consider - # bypassing this and doing S::A::select($self, ...) in - # our select method above. meantime, quoting shims have - # been added to select/insert/update/delete here +# optimized due to hotttnesss +# my ($self, $from) = @_; + if (my $ref = ref $_[1] ) { + if ($ref eq 'ARRAY') { + return $_[0]->_recurse_from(@{$_[1]}); + } + elsif ($ref eq 'HASH') { + return $_[0]->_make_as($_[1]); + } } + + return $_[0]->next::method ($_[1]); } sub _generate_join_clause { @@ -603,29 +830,9 @@ sub _join_condition { } elsif (ref $cond eq 'ARRAY') { return join(' OR ', map { $self->_join_condition($_) } @$cond); } else { - die "Can't handle this yet!"; + croak "Can't handle this yet!"; } } -sub limit_dialect { - my $self = shift; - $self->{limit_dialect} = shift if @_; - return $self->{limit_dialect}; -} - -# Set to an array-ref to specify separate left and right quotes for table names. -# A single scalar is equivalen to [ $char, $char ] -sub quote_char { - my $self = shift; - $self->{quote_char} = shift if @_; - return $self->{quote_char}; -} - -# Character separating quoted table names. -sub name_sep { - my $self = shift; - $self->{name_sep} = shift if @_; - return $self->{name_sep}; -} 1;