Merge 'top_limit_tweaks' into 'top_limit_altfix'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
e3764383 5use strict;
6use warnings;
db56cf3d 7use Carp::Clan qw/^DBIx::Class/;
ac788c46 8use Scalar::Util();
6f4ddea1 9
10sub new {
11 my $self = shift->SUPER::new(@_);
12
13 # This prevents the caching of $dbh in S::A::L, I believe
14 # If limit_dialect is a ref (like a $dbh), go ahead and replace
15 # it with what it resolves to:
16 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
17 if ref $self->{limit_dialect};
18
19 $self;
20}
21
22
6f4ddea1 23# Some databases (sqlite) do not handle multiple parenthesis
24# around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
25# is interpreted as x IN 1 or something similar.
26#
27# Since we currently do not have access to the SQLA AST, resort
28# to barbaric mutilation of any SQL supplied in literal form
29
30sub _strip_outer_paren {
31 my ($self, $arg) = @_;
32
33 return $self->_SWITCH_refkind ($arg, {
34 ARRAYREFREF => sub {
35 $$arg->[0] = __strip_outer_paren ($$arg->[0]);
36 return $arg;
37 },
38 SCALARREF => sub {
39 return \__strip_outer_paren( $$arg );
40 },
41 FALLBACK => sub {
42 return $arg
43 },
44 });
45}
46
47sub __strip_outer_paren {
48 my $sql = shift;
49
50 if ($sql and not ref $sql) {
51 while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
52 $sql = $1;
53 }
54 }
55
56 return $sql;
57}
58
59sub _where_field_IN {
60 my ($self, $lhs, $op, $rhs) = @_;
61 $rhs = $self->_strip_outer_paren ($rhs);
62 return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
63}
64
65sub _where_field_BETWEEN {
66 my ($self, $lhs, $op, $rhs) = @_;
67 $rhs = $self->_strip_outer_paren ($rhs);
68 return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
69}
70
15827712 71# Slow but ANSI standard Limit/Offset support. DB2 uses this
6f4ddea1 72sub _RowNumberOver {
73 my ($self, $sql, $order, $rows, $offset ) = @_;
74
75 $offset += 1;
03e1d9af 76 my $last = $rows + $offset - 1;
6f4ddea1 77 my ( $order_by ) = $self->_order_by( $order );
78
79 $sql = <<"SQL";
80SELECT * FROM
81(
82 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
83 $sql
84 $order_by
85 ) Q1
86) Q2
87WHERE ROW_NUM BETWEEN $offset AND $last
88
89SQL
90
91 return $sql;
92}
93
15827712 94# Crappy Top based Limit/Offset support. MSSQL uses this currently,
95# but may have to switch to RowNumberOver one day
96sub _Top {
97 my ( $self, $sql, $order, $rows, $offset ) = @_;
98
99 croak '$order supplied to SQLAHacks limit emulators must be a hash'
100 if (ref $order ne 'HASH');
101
8f6dbee9 102 $order = { %$order }; #copy
103
15827712 104 my $last = $rows + $offset;
105
106 my $req_order = $self->_order_by ($order->{order_by});
107 my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
108
8f6dbee9 109 delete $order->{$_} for qw/order_by _virtual_order_by/;
110 my $grpby_having = $self->_order_by ($order);
111
15827712 112 my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
113
114 $sql =~ s/^\s*(SELECT|select)//;
115
116 $sql = <<"SQL";
117 SELECT * FROM
118 (
119 SELECT TOP $rows * FROM
120 (
8f6dbee9 121 SELECT TOP $last $sql $grpby_having $order_by_inner
15827712 122 ) AS foo
123 $order_by_outer
124 ) AS bar
125 $req_order
126
127SQL
128 return $sql;
129}
130
131
6f4ddea1 132
133# While we're at it, this should make LIMIT queries more efficient,
134# without digging into things too deeply
6f4ddea1 135sub _find_syntax {
136 my ($self, $syntax) = @_;
ac788c46 137 return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
6f4ddea1 138}
139
140sub select {
141 my ($self, $table, $fields, $where, $order, @rest) = @_;
db56cf3d 142 local $self->{having_bind} = [];
143 local $self->{from_bind} = [];
144
6f4ddea1 145 if (ref $table eq 'SCALAR') {
146 $table = $$table;
147 }
148 elsif (not ref $table) {
149 $table = $self->_quote($table);
150 }
151 local $self->{rownum_hack_count} = 1
152 if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
153 @rest = (-1) unless defined $rest[0];
e8fcf76f 154 croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
6f4ddea1 155 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
db56cf3d 156 my ($sql, @where_bind) = $self->SUPER::select(
6f4ddea1 157 $table, $self->_recurse_fields($fields), $where, $order, @rest
158 );
159 $sql .=
160 $self->{for} ?
161 (
162 $self->{for} eq 'update' ? ' FOR UPDATE' :
163 $self->{for} eq 'shared' ? ' FOR SHARE' :
164 ''
165 ) :
166 ''
167 ;
db56cf3d 168 return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
6f4ddea1 169}
170
171sub insert {
172 my $self = shift;
173 my $table = shift;
174 $table = $self->_quote($table) unless ref($table);
175 $self->SUPER::insert($table, @_);
176}
177
178sub update {
179 my $self = shift;
180 my $table = shift;
181 $table = $self->_quote($table) unless ref($table);
182 $self->SUPER::update($table, @_);
183}
184
185sub delete {
186 my $self = shift;
187 my $table = shift;
188 $table = $self->_quote($table) unless ref($table);
189 $self->SUPER::delete($table, @_);
190}
191
192sub _emulate_limit {
193 my $self = shift;
194 if ($_[3] == -1) {
195 return $_[1].$self->_order_by($_[2]);
196 } else {
197 return $self->SUPER::_emulate_limit(@_);
198 }
199}
200
201sub _recurse_fields {
202 my ($self, $fields, $params) = @_;
203 my $ref = ref $fields;
204 return $self->_quote($fields) unless $ref;
205 return $$fields if $ref eq 'SCALAR';
206
207 if ($ref eq 'ARRAY') {
208 return join(', ', map {
209 $self->_recurse_fields($_)
210 .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
211 ? ' AS col'.$self->{rownum_hack_count}++
212 : '')
213 } @$fields);
214 } elsif ($ref eq 'HASH') {
215 foreach my $func (keys %$fields) {
db56cf3d 216 if ($func eq 'distinct') {
217 my $_fields = $fields->{$func};
218 if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
04ebc6f4 219 croak (
220 'The select => { distinct => ... } syntax is not supported for multiple columns.'
221 .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
222 .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
223 );
db56cf3d 224 }
225 else {
226 $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
04ebc6f4 227 carp (
228 'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
229 ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
230 );
db56cf3d 231 }
232 }
6f4ddea1 233 return $self->_sqlcase($func)
234 .'( '.$self->_recurse_fields($fields->{$func}).' )';
235 }
236 }
237 # Is the second check absolutely necessary?
238 elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
db56cf3d 239 return $self->_fold_sqlbind( $fields );
6f4ddea1 240 }
241 else {
e8fcf76f 242 croak($ref . qq{ unexpected in _recurse_fields()})
6f4ddea1 243 }
244}
245
246sub _order_by {
247 my $self = shift;
248 my $ret = '';
249 my @extra;
250 if (ref $_[0] eq 'HASH') {
15827712 251
6f4ddea1 252 if (defined $_[0]->{group_by}) {
253 $ret = $self->_sqlcase(' group by ')
254 .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
255 }
15827712 256
6f4ddea1 257 if (defined $_[0]->{having}) {
258 my $frag;
259 ($frag, @extra) = $self->_recurse_where($_[0]->{having});
260 push(@{$self->{having_bind}}, @extra);
261 $ret .= $self->_sqlcase(' having ').$frag;
262 }
15827712 263
6f4ddea1 264 if (defined $_[0]->{order_by}) {
265 $ret .= $self->_order_by($_[0]->{order_by});
266 }
15827712 267
6f4ddea1 268 if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
269 return $self->SUPER::_order_by($_[0]);
270 }
15827712 271
6f4ddea1 272 } elsif (ref $_[0] eq 'SCALAR') {
273 $ret = $self->_sqlcase(' order by ').${ $_[0] };
274 } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
15827712 275 my @order = map {
276 my $r = $self->_order_by($_, @_);
277 $r =~ s/^ ?ORDER BY //i;
278 $r || ();
279 } @{+shift};
280
281 $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
282
6f4ddea1 283 } else {
284 $ret = $self->SUPER::_order_by(@_);
285 }
286 return $ret;
287}
288
289sub _order_directions {
290 my ($self, $order) = @_;
fde3719a 291 return $self->SUPER::_order_directions( $self->_resolve_order($order) );
292}
293
294sub _resolve_order {
295 my ($self, $order) = @_;
fde3719a 296
fd4cb60a 297 if (ref $order eq 'HASH') {
fde3719a 298 $order = [$self->_resolve_order_hash($order)];
299 }
300 elsif (ref $order eq 'ARRAY') {
fd4cb60a 301 $order = [map {
fde3719a 302 if (ref ($_) eq 'SCALAR') {
303 $$_
304 }
305 elsif (ref ($_) eq 'HASH') {
306 $self->_resolve_order_hash($_)
fd4cb60a 307 }
fde3719a 308 else {
309 $_
310 }
311 } @$order];
fd4cb60a 312 }
fde3719a 313
314 return $order;
6f4ddea1 315}
316
fde3719a 317sub _resolve_order_hash {
fd4cb60a 318 my ($self, $order) = @_;
e3764383 319 my @new_order;
320 foreach my $key (keys %{ $order }) {
321 if ($key =~ /^-(desc|asc)/i ) {
322 my $direction = $1;
323 my $type = ref $order->{ $key };
324 if ($type eq 'ARRAY') {
325 push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
326 } elsif (!$type) {
327 push @new_order, "$order->{$key} $direction";
328 } else {
329 croak "hash order_by can only contain Scalar or Array, not $type";
330 }
331 } else {
332 croak "$key is not a valid direction, use -asc or -desc";
333 }
334 }
15827712 335
e3764383 336 return @new_order;
fd4cb60a 337}
338
6f4ddea1 339sub _table {
340 my ($self, $from) = @_;
341 if (ref $from eq 'ARRAY') {
342 return $self->_recurse_from(@$from);
343 } elsif (ref $from eq 'HASH') {
344 return $self->_make_as($from);
345 } else {
346 return $from; # would love to quote here but _table ends up getting called
347 # twice during an ->select without a limit clause due to
348 # the way S::A::Limit->select works. should maybe consider
349 # bypassing this and doing S::A::select($self, ...) in
350 # our select method above. meantime, quoting shims have
351 # been added to select/insert/update/delete here
352 }
353}
354
355sub _recurse_from {
356 my ($self, $from, @join) = @_;
357 my @sqlf;
358 push(@sqlf, $self->_make_as($from));
359 foreach my $j (@join) {
360 my ($to, $on) = @$j;
361
362 # check whether a join type exists
363 my $join_clause = '';
364 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
365 if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
366 $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
367 } else {
368 $join_clause = ' JOIN ';
369 }
370 push(@sqlf, $join_clause);
371
372 if (ref $to eq 'ARRAY') {
373 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
374 } else {
375 push(@sqlf, $self->_make_as($to));
376 }
377 push(@sqlf, ' ON ', $self->_join_condition($on));
378 }
379 return join('', @sqlf);
380}
381
db56cf3d 382sub _fold_sqlbind {
383 my ($self, $sqlbind) = @_;
69989ea9 384
385 my @sqlbind = @$$sqlbind; # copy
386 my $sql = shift @sqlbind;
387 push @{$self->{from_bind}}, @sqlbind;
388
db56cf3d 389 return $sql;
6f4ddea1 390}
391
392sub _make_as {
393 my ($self, $from) = @_;
db56cf3d 394 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
395 : ref $_ eq 'REF' ? $self->_fold_sqlbind($_)
396 : $self->_quote($_))
6f4ddea1 397 } reverse each %{$self->_skip_options($from)});
398}
399
400sub _skip_options {
401 my ($self, $hash) = @_;
402 my $clean_hash = {};
403 $clean_hash->{$_} = $hash->{$_}
404 for grep {!/^-/} keys %$hash;
405 return $clean_hash;
406}
407
408sub _join_condition {
409 my ($self, $cond) = @_;
410 if (ref $cond eq 'HASH') {
411 my %j;
412 for (keys %$cond) {
413 my $v = $cond->{$_};
414 if (ref $v) {
e8fcf76f 415 croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
6f4ddea1 416 if ref($v) ne 'SCALAR';
417 $j{$_} = $v;
418 }
419 else {
420 my $x = '= '.$self->_quote($v); $j{$_} = \$x;
421 }
422 };
423 return scalar($self->_recurse_where(\%j));
424 } elsif (ref $cond eq 'ARRAY') {
425 return join(' OR ', map { $self->_join_condition($_) } @$cond);
426 } else {
427 die "Can't handle this yet!";
428 }
429}
430
431sub _quote {
432 my ($self, $label) = @_;
433 return '' unless defined $label;
434 return "*" if $label eq '*';
435 return $label unless $self->{quote_char};
436 if(ref $self->{quote_char} eq "ARRAY"){
437 return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
438 if !defined $self->{name_sep};
439 my $sep = $self->{name_sep};
440 return join($self->{name_sep},
441 map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1] }
442 split(/\Q$sep\E/,$label));
443 }
444 return $self->SUPER::_quote($label);
445}
446
447sub limit_dialect {
448 my $self = shift;
449 $self->{limit_dialect} = shift if @_;
450 return $self->{limit_dialect};
451}
452
453sub quote_char {
454 my $self = shift;
455 $self->{quote_char} = shift if @_;
456 return $self->{quote_char};
457}
458
459sub name_sep {
460 my $self = shift;
461 $self->{name_sep} = shift if @_;
462 return $self->{name_sep};
463}
464
4651;
466
467__END__
468
469=pod
470
471=head1 NAME
472
855c6fd0 473DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
474and includes a number of DBIC-specific workarounds, not yet suitable for
475inclusion into SQLA proper.
6f4ddea1 476
477=head1 METHODS
478
479=head2 new
480
481Tries to determine limit dialect.
482
483=head2 select
484
485Quotes table names, handles "limit" dialects (e.g. where rownum between x and
486y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
487
488=head2 insert update delete
489
490Just quotes table names.
491
492=head2 limit_dialect
493
494Specifies the dialect of used for implementing an SQL "limit" clause for
495restricting the number of query results returned. Valid values are: RowNum.
496
497See L<DBIx::Class::Storage::DBI/connect_info> for details.
498
499=head2 name_sep
500
501Character separating quoted table names.
502
503See L<DBIx::Class::Storage::DBI/connect_info> for details.
504
505=head2 quote_char
506
507Set to an array-ref to specify separate left and right quotes for table names.
508
509See L<DBIx::Class::Storage::DBI/connect_info> for details.
510
511=cut
512