Fix order by clauses for MSSQL
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
db56cf3d 5use Carp::Clan qw/^DBIx::Class/;
6f4ddea1 6
# Constructor: builds the object through the parent class and then
# normalises the limit_dialect attribute.
sub new {
  my $class = shift;
  my $self  = $class->SUPER::new(@_);

  # This prevents the caching of $dbh in S::A::L, I believe.
  # A reference-valued limit_dialect (like a $dbh) is swapped for
  # whatever _find_syntax resolves it to.
  if (ref $self->{limit_dialect}) {
    $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect});
  }

  return $self;
}
18
19
# Some databases (sqlite) do not handle multiple parentheses
# around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
# is interpreted as x IN 1 or something similar.
#
# Since we currently do not have access to the SQLA AST, resort
# to barbaric mutilation of any SQL supplied in literal form
26
# Removes redundant outer parentheses from a literal-SQL argument.
#
# $arg is dispatched on its reference kind through _SWITCH_refkind:
#   * \[ $sql, @bind ] - the SQL string (element 0) is stripped in place
#                        and the very same reference is handed back
#   * \$sql            - a reference to a stripped copy is returned
#   * anything else    - returned untouched
sub _strip_outer_paren {
  my ($self, $arg) = @_;

  my %handler_for = (
    ARRAYREFREF => sub {
      my $literal = $$arg;
      $literal->[0] = __strip_outer_paren($literal->[0]);
      return $arg;
    },
    SCALARREF => sub {
      my $stripped = __strip_outer_paren($$arg);
      return \$stripped;
    },
    FALLBACK => sub {
      return $arg;
    },
  );

  return $self->_SWITCH_refkind($arg, \%handler_for);
}
43
# Plain function (not a method): repeatedly peels one layer of enclosing
# parentheses off a literal SQL string and returns the result.
#
# A layer is removed only when the leading '(' and the trailing ')' really
# belong together, i.e. the text between them is itself balanced. Without
# that check a string such as "(a) UNION (b)" would be mangled into
# "a) UNION (b". The /s modifier lets multi-line SQL be handled as well.
#
# References and false values (undef, '', 0) are returned unchanged.
sub __strip_outer_paren {
  my $sql = shift;

  return $sql if ref $sql or not $sql;

  while ($sql =~ /^ \s* \( (.*) \) \s* $/xs) {
    my $inner = $1;
    last unless __parens_are_balanced($inner);
    $sql = $inner;
  }

  return $sql;
}

# Returns true when every ')' in $sql closes an earlier '(' and no '(' is
# left open. Parentheses inside single- or double-quoted SQL strings
# (with doubled-quote escapes) are ignored.
sub __parens_are_balanced {
  my $sql = shift;

  my $depth = 0;
  while ($sql =~ / ( ' (?: [^'] | '' )* ' | " (?: [^"] | "" )* " | [()] ) /gxs) {
    my $token = $1;
    if ($token eq '(') {
      $depth++;
    }
    elsif ($token eq ')') {
      # a closer with nothing open: the outer pair was not a real pair
      return 0 if --$depth < 0;
    }
    # quoted strings are skipped over deliberately
  }

  return $depth == 0;
}
55
# IN handler: drops redundant outer parentheses from a literal right-hand
# side, then lets the parent class generate the SQL.
sub _where_field_IN {
  my $self = shift;
  my ($lhs, $op, $rhs) = @_;

  my $clean_rhs = $self->_strip_outer_paren($rhs);

  return $self->SUPER::_where_field_IN($lhs, $op, $clean_rhs);
}
61
# BETWEEN handler: drops redundant outer parentheses from a literal
# right-hand side, then lets the parent class generate the SQL.
sub _where_field_BETWEEN {
  my $self = shift;
  my ($lhs, $op, $rhs) = @_;

  my $clean_rhs = $self->_strip_outer_paren($rhs);

  return $self->SUPER::_where_field_BETWEEN($lhs, $op, $clean_rhs);
}
67
68
69
# DB2 is the only remaining DB using this. Even though we are not sure if
# RowNumberOver is still needed here (should be part of SQLA) leave the
# code in place
#
# Wraps $sql so that only $rows rows are returned, after skipping the
# first $offset rows (a missing offset counts as 0).
#
# ROW_NUMBER() counts from 1 and BETWEEN includes both bounds, so the
# window is [ $offset + 1, $offset + $rows ]; it holds exactly $rows rows.
sub _RowNumberOver {
  my ($self, $sql, $order, $rows, $offset ) = @_;

  $offset ||= 0;
  my $first = $offset + 1;
  my $last  = $offset + $rows;
  my ( $order_by ) = $self->_order_by( $order );

  $sql = <<"SQL";
SELECT * FROM
(
   SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
      $sql
      $order_by
   ) Q1
) Q2
WHERE ROW_NUM BETWEEN $first AND $last

SQL

  return $sql;
}
94
95
96# While we're at it, this should make LIMIT queries more efficient,
97# without digging into things too deeply
98use Scalar::Util 'blessed';
# Resolves the limit dialect for $syntax, which is either a dialect/driver
# name or a blessed handle exposing {Driver}{Name}.
sub _find_syntax {
  my ($self, $syntax) = @_;

  # DB2 is the only remaining DB using this. Even though we are not sure if
  # RowNumberOver is still needed here (should be part of SQLA) leave the
  # code in place
  my $dbhname = $syntax;
  $dbhname = $syntax->{Driver}{Name} if blessed($syntax);

  return 'RowNumberOver'
    if ref($self) && $dbhname && $dbhname eq 'DB2';

  # Everything else is looked up by the parent class once and remembered.
  return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
}
112
# Builds a SELECT statement.
#
# Arguments: $table, $fields, $where, $order, then the limit arguments
# (rows, offset) in @rest. In list context returns the SQL followed by
# the FROM binds, the WHERE binds and the HAVING binds; in scalar context
# returns the SQL only.
sub select {
  my ($self, $table, $fields, $where, $order, @rest) = @_;

  # Per-call bind accumulators, filled in while the SQL is generated.
  local $self->{from_bind}   = [];
  local $self->{having_bind} = [];

  my $table_kind = ref $table;
  if ($table_kind eq 'SCALAR') {
    $table = $$table;
  }
  elsif (!$table_kind) {
    $table = $self->_quote($table);
  }

  # The column-alias counter only exists for the RowNum dialect and only
  # when a limit was requested. The "local" must live at sub scope.
  my $wants_rownum_hack =
    defined $rest[0] && $self->{limit_dialect} eq 'RowNum';
  local $self->{rownum_hack_count} = 1
    if $wants_rownum_hack;

  if (not defined $rest[0]) {
    @rest = (-1);
  }
  croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;

  # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
  my ($sql, @where_bind) = $self->SUPER::select(
    $table, $self->_recurse_fields($fields), $where, $order, @rest
  );

  # Optional row-locking suffix driven by the "for" attribute.
  my $lock_suffix = '';
  if ($self->{for}) {
    if ($self->{for} eq 'update') {
      $lock_suffix = ' FOR UPDATE';
    }
    elsif ($self->{for} eq 'shared') {
      $lock_suffix = ' FOR SHARE';
    }
  }
  $sql .= $lock_suffix;

  return $sql unless wantarray;
  return ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}});
}
143
# INSERT: quotes a plain (non-reference) table name and defers to the
# parent class with the remaining arguments.
sub insert {
  my ($self, $table) = splice @_, 0, 2;

  if (not ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::insert($table, @_);
}
150
# UPDATE: quotes a plain (non-reference) table name and defers to the
# parent class with the remaining arguments.
sub update {
  my ($self, $table) = splice @_, 0, 2;

  if (not ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::update($table, @_);
}
157
# DELETE: quotes a plain (non-reference) table name and defers to the
# parent class with the remaining arguments.
sub delete {
  my ($self, $table) = splice @_, 0, 2;

  if (not ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::delete($table, @_);
}
164
# Limit emulation hook. After the invocant the arguments are
# ($syntax, $sql, $order, $rows, ...). A row count of -1 is the marker
# select() uses for "no limit": the SQL then only gets its ORDER BY
# clause appended. Anything else is delegated to the parent class.
sub _emulate_limit {
  my $self = shift;
  my (undef, $sql, $order, $rows) = @_;

  return $self->SUPER::_emulate_limit(@_)
    unless $rows == -1;

  return $sql . $self->_order_by($order);
}
173
# Renders a select-list specification as SQL.
#
#   plain scalar     - quoted identifier
#   \'literal'       - used verbatim
#   [ ... ]          - every element rendered recursively, comma-joined;
#                      while rownum_hack_count exists (and the caller did
#                      not pass no_rownum_hack) each element also gets a
#                      numbered " AS colN" alias
#   { func => arg }  - FUNC( arg ); only the first key seen is rendered
#   \[ $sql, @bind ] - SQL kept, binds recorded through _fold_sqlbind
#
# Croaks on any other reference type.
sub _recurse_fields {
  my ($self, $fields, $params) = @_;
  my $kind = ref $fields;

  return $self->_quote($fields) if not $kind;
  return $$fields if $kind eq 'SCALAR';

  if ($kind eq 'ARRAY') {
    my @rendered;
    for my $field (@$fields) {
      my $sql = $self->_recurse_fields($field);

      my $skip_alias = $params && $params->{no_rownum_hack};
      if (exists $self->{rownum_hack_count} && !$skip_alias) {
        $sql .= ' AS col' . $self->{rownum_hack_count}++;
      }

      push @rendered, $sql;
    }
    return join(', ', @rendered);
  }
  elsif ($kind eq 'HASH') {
    foreach my $func (keys %$fields) {
      if ($func eq 'distinct') {
        my $distinct_arg = $fields->{$func};
        my $is_list      = ref $distinct_arg eq 'ARRAY';

        # More than one column cannot be expressed with this syntax.
        croak "Unsupported syntax, please use " .
              "{ group_by => [ qw/" . (join ' ', @$distinct_arg) . "/ ] }" .
              " or " .
              "{ select => [ qw/" . (join ' ', @$distinct_arg) . "/ ], distinct => 1 }"
          if $is_list && @{$distinct_arg} > 1;

        # A single column still works, with a deprecation warning.
        $distinct_arg = $distinct_arg->[0] if $is_list;
        carp "This syntax will be deprecated in 09, please use " .
             "{ group_by => '${distinct_arg}' }" .
             " or " .
             "{ select => '${distinct_arg}', distinct => 1 }";
      }

      return $self->_sqlcase($func)
        . '( ' . $self->_recurse_fields($fields->{$func}) . ' )';
    }
  }
  # Is the second check absolutely necessary?
  elsif ( $kind eq 'REF' and ref($$fields) eq 'ARRAY' ) {
    return $self->_fold_sqlbind( $fields );
  }
  else {
    croak($kind . qq{ unexpected in _recurse_fields()})
  }
}
218
# Renders the trailing GROUP BY / HAVING / ORDER BY part of a statement.
#
# The first argument selects the behaviour:
#   hashref         - group_by, having and order_by keys are rendered in
#                     that order (HAVING binds go to $self->{having_bind});
#                     a hash with -asc / -desc keys is handed to the
#                     parent class instead
#   scalar ref      - literal ORDER BY expression
#   non-empty array - every element rendered recursively and comma-joined
#                     under a single ORDER BY
#   anything else   - handled by the parent class
sub _order_by {
  my $self = shift;
  my $arg  = $_[0];
  my $kind = ref $arg;

  my $ret = '';

  if ($kind eq 'HASH') {
    if (defined $arg->{group_by}) {
      $ret = $self->_sqlcase(' group by ')
        . $self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 });
    }

    if (defined $arg->{having}) {
      my ($having_sql, @having_bind) = $self->_recurse_where($arg->{having});
      push @{ $self->{having_bind} }, @having_bind;
      $ret .= $self->_sqlcase(' having ') . $having_sql;
    }

    if (defined $arg->{order_by}) {
      $ret .= $self->_order_by($arg->{order_by});
    }

    # Direction hashes are the parent's business; its result replaces
    # whatever was assembled above.
    my $has_direction = grep { $_ =~ /^-(desc|asc)/i } keys %$arg;
    return $self->SUPER::_order_by($arg) if $has_direction;
  }
  elsif ($kind eq 'SCALAR') {
    $ret = $self->_sqlcase(' order by ') . $$arg;
  }
  elsif ($kind eq 'ARRAY' && @$arg) {
    # Drop the list itself; any remaining arguments are forwarded as-is.
    shift;
    my @specs   = @$arg;
    my $keyword = $self->_sqlcase(' order by ');

    my @pieces;
    for my $spec (@specs) {
      my $piece = $self->_order_by($spec, @_);
      # every element comes back with its own prefix; keep only ours
      $piece =~ s/^ ?ORDER BY //i;
      push @pieces, $piece;
    }

    $ret = $keyword . join(', ', @pieces);
  }
  else {
    $ret = $self->SUPER::_order_by(@_);
  }

  return $ret;
}
255
# Normalises an order specification before the parent class looks at it:
# an attribute hash is reduced to its order_by entry, and direction
# hashes ({ -asc => ... } / { -desc => ... }), alone or inside a list,
# are expanded through _order_directions_hash.
sub _order_directions {
  my ($self, $order) = @_;

  $order = $order->{order_by} if ref $order eq 'HASH';

  my $kind = ref $order;
  if ($kind eq 'HASH') {
    $order = [ $self->_order_directions_hash($order) ];
  }
  elsif ($kind eq 'ARRAY') {
    my @expanded;
    for my $item (@$order) {
      push @expanded,
        ref $item eq 'HASH' ? $self->_order_directions_hash($item) : $item;
    }
    $order = \@expanded;
  }

  return $self->SUPER::_order_directions($order);
}
272
# Expands a direction hash into a flat list of "column direction" strings.
#
#   { -desc => 'year' }      gives ('year desc')
#   { -asc  => [qw/a b/] }   gives ('a asc', 'b asc')
#
# The direction is the key without its leading dash, in the case the
# caller wrote it. A hash with no -asc / -desc key yields an empty list;
# the explicit return keeps the value of the failed test from leaking
# into the caller's order list.
sub _order_directions_hash {
  my ($self, $order) = @_;

  return unless grep { /^-(?:desc|asc)/i } keys %{$order};

  my @terms;
  for my $key (keys %{$order}) {
    # work on a copy so the key itself stays usable for the lookup
    (my $dir = $key) =~ s/^-(desc|asc)/$1/i;

    my $columns = $order->{$key};
    push @terms, map { "$_ $dir" }
      (ref $columns eq 'ARRAY' ? @{$columns} : $columns);   # else: should be scalar
  }

  return @terms;
}
290
# Renders the FROM part: an arrayref is a join specification, a hashref
# an { alias => source } pair, anything else is passed through untouched.
sub _table {
  my ($self, $from) = @_;
  my $kind = ref $from;

  return $self->_recurse_from(@$from) if $kind eq 'ARRAY';
  return $self->_make_as($from)       if $kind eq 'HASH';

  # would love to quote here but _table ends up getting called
  # twice during an ->select without a limit clause due to
  # the way S::A::Limit->select works. should maybe consider
  # bypassing this and doing S::A::select($self, ...) in
  # our select method above. meantime, quoting shims have
  # been added to select/insert/update/delete here
  return $from;
}
306
# Renders a join specification: $from is the first source, every entry
# of @join is a [ $to, $on ] pair. $to is either a source hash or a
# nested join specification (arrayref), which is rendered recursively
# inside parentheses. A -join_type entry in the (first) source hash
# selects the JOIN flavour.
sub _recurse_from {
  my ($self, $from, @join) = @_;

  my @chunks = ( $self->_make_as($from) );

  for my $spec (@join) {
    my ($to, $on) = @$spec;
    my $is_nested = ref($to) eq 'ARRAY';

    # check whether a join type exists
    my $head = $is_nested ? $to->[0] : $to;
    my $join_clause = ' JOIN ';
    if (ref($head) eq 'HASH' && exists $head->{-join_type}) {
      $join_clause = ' ' . uc($head->{-join_type}) . ' JOIN ';
    }
    push @chunks, $join_clause;

    push @chunks, $is_nested
      ? ( '(', $self->_recurse_from(@$to), ')' )
      : $self->_make_as($to);

    push @chunks, ' ON ', $self->_join_condition($on);
  }

  return join('', @chunks);
}
333
# Takes a literal-SQL-with-binds value, \[ $sql, @bind ], records the bind
# values in $self->{from_bind} and returns the SQL string.
#
# The caller's array is copied first: shifting it directly would destroy
# the literal and make it unusable for any later query.
sub _fold_sqlbind {
  my ($self, $sqlbind) = @_;

  my ($sql, @bind) = @$$sqlbind;
  push @{$self->{from_bind}}, @bind;

  return $sql;
}
340
# Renders an { alias => source } hash as "source alias". Option keys
# (leading dash) are dropped first and only one pair is rendered. Each
# half is used verbatim when it is a scalar ref, folded through
# _fold_sqlbind when it is a ref-to-ref, and quoted otherwise.
sub _make_as {
  my ($self, $from) = @_;

  my $sources = $self->_skip_options($from);
  my @pair    = each %$sources;    # (alias, source), or empty

  my @rendered;
  for my $item (reverse @pair) {
    my $kind = ref $item;
    push @rendered,
        $kind eq 'SCALAR' ? $$item
      : $kind eq 'REF'    ? $self->_fold_sqlbind($item)
      :                     $self->_quote($item);
  }

  return join(' ', @rendered);
}
348
# Returns a new hashref holding every pair of $hash whose key does not
# start with a dash; the input hash is left untouched.
sub _skip_options {
  my ($self, $hash) = @_;

  my %clean;
  for my $key (keys %$hash) {
    next if $key =~ /^-/;
    $clean{$key} = $hash->{$key};
  }

  return \%clean;
}
356
# Renders the ON part of a join.
#
#   hashref  - { left_column => right_column, ... }; a plain value is
#              quoted as an identifier and compared with "=", a scalar
#              ref is passed on as a literal condition
#   arrayref - alternatives, each rendered recursively and OR-ed together
#
# Croaks on any non-scalar reference used as a value, and on any other
# kind of condition.
sub _join_condition {
  my ($self, $cond) = @_;

  if (ref $cond eq 'HASH') {
    my %j;
    for my $lhs (keys %$cond) {
      my $v = $cond->{$lhs};
      if (ref $v) {
        # single-quoted so the advice shows up as a literal \"..."
        croak (ref($v) . q{ reference arguments are not supported in JOINS - try using \"..." instead})
          if ref($v) ne 'SCALAR';
        $j{$lhs} = $v;
      }
      else {
        my $x = '= ' . $self->_quote($v);
        $j{$lhs} = \$x;
      }
    }
    return scalar($self->_recurse_where(\%j));
  }
  elsif (ref $cond eq 'ARRAY') {
    return join(' OR ', map { $self->_join_condition($_) } @$cond);
  }
  else {
    # reported from the caller's perspective, like the other errors here
    croak "Can't handle this yet!";
  }
}
379
# Quotes an identifier.
#
#   undef                gives ''
#   '*'                  gives '*'
#   no quote_char set    gives the label unchanged
#   quote_char arrayref  [ $open, $close ]: without name_sep the whole
#                        label is wrapped; with name_sep every part is
#                        wrapped separately, except a '*' part, which
#                        stays bare so "me.*" remains a wildcard
#   anything else        handled by the parent class
sub _quote {
  my ($self, $label) = @_;

  return '' unless defined $label;
  return "*" if $label eq '*';
  return $label unless $self->{quote_char};

  if (ref $self->{quote_char} eq "ARRAY") {
    my ($open, $close) = @{ $self->{quote_char} };
    my $sep = $self->{name_sep};

    return $open . $label . $close
      if !defined $sep;

    return join($sep,
      map { $_ eq '*' ? $_ : $open . $_ . $close }
      split(/\Q$sep\E/, $label));
  }

  return $self->SUPER::_quote($label);
}
395
# Accessor for the limit dialect: stores the first argument when one is
# given, and always returns the current value.
sub limit_dialect {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{limit_dialect} = $new_value[0];
  }

  return $self->{limit_dialect};
}
401
# Accessor for the quote character(s): stores the first argument when one
# is given, and always returns the current value.
sub quote_char {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{quote_char} = $new_value[0];
  }

  return $self->{quote_char};
}
407
# Accessor for the name separator: stores the first argument when one is
# given, and always returns the current value.
sub name_sep {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{name_sep} = $new_value[0];
  }

  return $self->{name_sep};
}
413
4141;
415
416__END__
417
418=pod
419
420=head1 NAME
421
855c6fd0 422DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
423and includes a number of DBIC-specific workarounds, not yet suitable for
424inclusion into SQLA proper.
6f4ddea1 425
426=head1 METHODS
427
428=head2 new
429
430Tries to determine limit dialect.
431
432=head2 select
433
434Quotes table names, handles "limit" dialects (e.g. where rownum between x and
435y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
436
437=head2 insert update delete
438
439Just quotes table names.
440
441=head2 limit_dialect
442
Specifies the dialect used for implementing an SQL "limit" clause for
restricting the number of query results returned. Valid values are: RowNum.
445
446See L<DBIx::Class::Storage::DBI/connect_info> for details.
447
448=head2 name_sep
449
450Character separating quoted table names.
451
452See L<DBIx::Class::Storage::DBI/connect_info> for details.
453
454=head2 quote_char
455
456Set to an array-ref to specify separate left and right quotes for table names.
457
458See L<DBIx::Class::Storage::DBI/connect_info> for details.
459
460=cut
461