use base qw/SQL::Abstract::Limit/;
use strict;
use warnings;
-use Carp::Clan qw/^DBIx::Class/;
+use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/;
+
+BEGIN {
+ # reinstall the carp()/croak() functions imported into SQL::Abstract
+ # as Carp and Carp::Clan do not like each other much
+ no warnings qw/redefine/;
+ no strict qw/refs/;
+ for my $f (qw/carp croak/) {
+ my $orig = \&{"SQL::Abstract::$f"};
+ *{"SQL::Abstract::$f"} = sub {
+
+ local $Carp::CarpLevel = 1; # even though Carp::Clan ignores this, $orig will not
+
+ if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+\(\) called/) {
+ __PACKAGE__->can($f)->(@_);
+ }
+ else {
+ $orig->(@_);
+ }
+ }
+ }
+}
sub new {
my $self = shift->SUPER::new(@_);
return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
}
-
-
-# DB2 is the only remaining DB using this. Even though we are not sure if
-# RowNumberOver is still needed here (should be part of SQLA) leave the
-# code in place
+# Slow but ANSI standard Limit/Offset support. DB2 uses this
sub _RowNumberOver {
my ($self, $sql, $order, $rows, $offset ) = @_;
return $sql;
}
+# Crappy Top based Limit/Offset support. MSSQL uses this currently,
+# but may have to switch to RowNumberOver one day
+sub _Top {
+ my ( $self, $sql, $order, $rows, $offset ) = @_;
+
+ # mangle the input sql so it can be properly aliased in the outer queries
+ $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
+ or croak "Unrecognizable SELECT: $sql";
+ my $select = $1;
+
+ my (@outer_select, %col_index);
+ for my $selected_col (@{$self->{_dbic_rs_attrs}{select}}) {
+
+ my $new_colname;
+
+ if (ref $selected_col) {
+ $new_colname = $self->_quote ('column_' . (@outer_select + 1) );
+ }
+ else {
+ my $quoted_col = $self->_quote ($selected_col);
+
+ my $name_sep = $self->name_sep || '.';
+ $name_sep = "\Q$name_sep\E";
+
+ my ($table, $orig_colname) = ( $selected_col =~ / (?: (.+) $name_sep )? ([^$name_sep]+) $ /x );
+ $new_colname = $self->_quote ("${table}__${orig_colname}");
+
+ $select =~ s/(\Q$quoted_col\E|\Q$selected_col\E)/"$1 AS $new_colname"/e;
+
+ # record qualified name if available (should be)
+ $col_index{$selected_col} = $new_colname if $table;
+
+ # record unqialified name, undef if a duplicate is found
+ if (exists $col_index{$orig_colname}) {
+ $col_index{$orig_colname} = undef;
+ }
+ else {
+ $col_index{$orig_colname} = $new_colname;
+ }
+ }
+
+ push @outer_select, $new_colname;
+ }
+
+ my $outer_select = join (', ', @outer_select );
+
+
+ # deal with order
+ croak '$order supplied to SQLAHacks limit emulators must be a hash'
+ if (ref $order ne 'HASH');
+
+ $order = { %$order }; #copy
+
+ my $req_order = [ $self->_order_by_chunks ($order->{order_by}) ];
+ my $limit_order = [ @$req_order ? @$req_order : $self->_order_by_chunks ($order->{_virtual_order_by}) ];
+
+
+ # normalize all column names in order by
+ # no copies, just aliasing ($_)
+ for ($req_order, $limit_order) {
+ for ( @{$_ || []} ) {
+ $_ = $col_index{$_} if $col_index{$_};
+ }
+ }
+
+
+ # generate the rest
+ delete $order->{$_} for qw/order_by _virtual_order_by/;
+ my $grpby_having = $self->_order_by ($order);
+
+ my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
+
+ my $last = $rows + $offset;
+
+ $sql = <<"SQL";
+
+ SELECT TOP $rows $outer_select FROM
+ (
+ SELECT TOP $last $select $sql $grpby_having $order_by_inner
+ ) AS inner_sel
+ $order_by_outer
+SQL
+
+ if (@$req_order) {
+ my $order_by_requested = $self->_order_by ($req_order);
+
+ $sql = <<"SQL";
+
+ SELECT $outer_select FROM
+ ( $sql ) AS outer_sel
+ $order_by_requested;
+SQL
+
+ }
+
+ return $sql;
+}
+
+
# While we're at it, this should make LIMIT queries more efficient,
# without digging into things too deeply
-use Scalar::Util 'blessed';
sub _find_syntax {
my ($self, $syntax) = @_;
-
- # DB2 is the only remaining DB using this. Even though we are not sure if
- # RowNumberOver is still needed here (should be part of SQLA) leave the
- # code in place
- my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
- if(ref($self) && $dbhname) {
- if ($dbhname eq 'DB2') {
- return 'RowNumberOver';
- }
- }
-
- $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
+ return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
}
+my $for_syntax = {
+ update => 'FOR UPDATE',
+ shared => 'FOR SHARE',
+};
sub select {
my ($self, $table, $fields, $where, $order, @rest) = @_;
- local $self->{having_bind} = [];
- local $self->{from_bind} = [];
+
+ $self->{"${_}_bind"} = [] for (qw/having from order/);
if (ref $table eq 'SCALAR') {
$table = $$table;
my ($sql, @where_bind) = $self->SUPER::select(
$table, $self->_recurse_fields($fields), $where, $order, @rest
);
- $sql .=
- $self->{for} ?
- (
- $self->{for} eq 'update' ? ' FOR UPDATE' :
- $self->{for} eq 'shared' ? ' FOR SHARE' :
- ''
- ) :
- ''
- ;
- return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
+ if (my $for = delete $self->{_dbic_rs_attrs}{for}) {
+ $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
+ }
+
+ return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
}
sub insert {
my $self = shift;
my $table = shift;
$table = $self->_quote($table) unless ref($table);
+
+ # SQLA will emit INSERT INTO $table ( ) VALUES ( )
+ # which is sadly understood only by MySQL. Change default behavior here,
+ # until SQLA2 comes with proper dialect support
+ if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) {
+ return "INSERT INTO ${table} DEFAULT VALUES"
+ }
+
$self->SUPER::insert($table, @_);
}
}
sub _order_by {
- my $self = shift;
- my $ret = '';
- my @extra;
- if (ref $_[0] eq 'HASH') {
- if (defined $_[0]->{group_by}) {
+ my ($self, $arg) = @_;
+
+ if (ref $arg eq 'HASH' and keys %$arg and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
+
+ my $ret = '';
+
+ if (defined $arg->{group_by}) {
$ret = $self->_sqlcase(' group by ')
- .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
+ .$self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 });
}
- if (defined $_[0]->{having}) {
- my $frag;
- ($frag, @extra) = $self->_recurse_where($_[0]->{having});
- push(@{$self->{having_bind}}, @extra);
+
+ if (defined $arg->{having}) {
+ my ($frag, @bind) = $self->_recurse_where($arg->{having});
+ push(@{$self->{having_bind}}, @bind);
$ret .= $self->_sqlcase(' having ').$frag;
}
- if (defined $_[0]->{order_by}) {
- $ret .= $self->_order_by($_[0]->{order_by});
- }
- if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
- return $self->SUPER::_order_by($_[0]);
+
+ if (defined $arg->{order_by}) {
+ my ($frag, @bind) = $self->SUPER::_order_by($arg->{order_by});
+ push(@{$self->{order_bind}}, @bind);
+ $ret .= $frag;
}
- } elsif (ref $_[0] eq 'SCALAR') {
- $ret = $self->_sqlcase(' order by ').${ $_[0] };
- } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
- my @order = @{+shift};
- $ret = $self->_sqlcase(' order by ')
- .join(', ', map {
- my $r = $self->_order_by($_, @_);
- $r =~ s/^ ?ORDER BY //i;
- $r;
- } @order);
- } else {
- $ret = $self->SUPER::_order_by(@_);
+
+ return $ret;
+ }
+ else {
+ my ($sql, @bind) = $self->SUPER::_order_by ($arg);
+ push(@{$self->{order_bind}}, @bind);
+ return $sql;
}
- return $ret;
}
sub _order_directions {
my ($self, $order) = @_;
- $order = $order->{order_by} if ref $order eq 'HASH';
- if (ref $order eq 'HASH') {
- $order = [$self->_order_directions_hash($order)];
- } elsif (ref $order eq 'ARRAY') {
- $order = [map {
- if (ref $_ eq 'HASH') {
- $self->_order_directions_hash($_);
- } else {
- $_;
- }
- } @{ $order }];
- }
- return $self->SUPER::_order_directions($order);
-}
-sub _order_directions_hash {
- my ($self, $order) = @_;
- my @new_order;
- foreach my $key (keys %{ $order }) {
- if ($key =~ /^-(desc|asc)/i ) {
- my $direction = $1;
- my $type = ref $order->{ $key };
- if ($type eq 'ARRAY') {
- push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
- } elsif (!$type) {
- push @new_order, "$order->{$key} $direction";
- } else {
- croak "hash order_by can only contain Scalar or Array, not $type";
- }
- } else {
- croak "$key is not a valid direction, use -asc or -desc";
- }
- }
- return @new_order;
+ # strip bind values - none of the current _order_directions users support them
+ return $self->SUPER::_order_directions( [ map
+ { ref $_ ? $_->[0] : $_ }
+ $self->_order_by_chunks ($order)
+ ]);
}
sub _table {
sub _fold_sqlbind {
my ($self, $sqlbind) = @_;
- my $sql = shift @$$sqlbind;
- push @{$self->{from_bind}}, @$$sqlbind;
+
+ my @sqlbind = @$$sqlbind; # copy
+ my $sql = shift @sqlbind;
+ push @{$self->{from_bind}}, @sqlbind;
+
return $sql;
}