The Top limit emulation bundled with SQLA::Limit assumes that the limited resultset...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
e3764383 5use strict;
6use warnings;
db56cf3d 7use Carp::Clan qw/^DBIx::Class/;
ac788c46 8use Scalar::Util();
6f4ddea1 9
# Constructor: builds the object through the parent class, then normalises
# the limit_dialect setting.
sub new {
  my $class = shift;
  my $self  = $class->SUPER::new(@_);

  # When limit_dialect is a reference (like a $dbh), replace it right away
  # with whatever _find_syntax resolves it to. The original author believed
  # this prevents the caching of the $dbh in S::A::L.
  my $dialect = $self->{limit_dialect};
  if (ref $dialect) {
    $self->{limit_dialect} = $self->_find_syntax($dialect);
  }

  return $self;
}
21
22
6f4ddea1 23# Some databases (sqlite) do not handle multiple parenthesis
24# around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
25# is interpreted as x IN 1 or something similar.
26#
27# Since we currently do not have access to the SQLA AST, resort
28# to barbaric mutilation of any SQL supplied in literal form
29
# Strips redundant outer parentheses from a literal-SQL argument.
#   \[ $sql, @bind ]  - the SQL string (first element) is stripped in place
#                       and the very same reference is returned
#   \$sql             - a reference to a new, stripped string is returned
#   anything else     - returned untouched
sub _strip_outer_paren {
  my ($self, $arg) = @_;

  my %handler_for = (
    ARRAYREFREF => sub {
      my $sql_and_bind = $$arg;
      $sql_and_bind->[0] = __strip_outer_paren ($sql_and_bind->[0]);
      return $arg;
    },
    SCALARREF => sub {
      my $stripped = __strip_outer_paren ($$arg);
      return \$stripped;
    },
    FALLBACK => sub {
      return $arg;
    },
  );

  return $self->_SWITCH_refkind ($arg, \%handler_for);
}
46
# Plain function (not a method): repeatedly removes one pair of parentheses
# wrapping the whole of a literal SQL string, e.g. '((1, 2))' => '1, 2'.
#
# A pair is removed only when the first '(' really is closed by the last ')'.
# A string such as '(1) , (2)' starts and ends with parentheses that do not
# belong together, so it is returned unchanged instead of being mangled into
# '1) , (2'. Parentheses inside single-quoted SQL string literals are ignored
# for the purpose of this check.
#
# References and undefined values are returned as-is.
sub __strip_outer_paren {
  my $sql = shift;

  return $sql if ref $sql or not defined $sql;

  # /s lets the capture span newlines, so multi-line literal SQL is handled
  while ($sql =~ /^ \s* \( (.*) \) \s* $/xs) {
    my $inner = $1;

    # probe copy with quoted literals removed, used only for paren counting
    (my $probe = $inner) =~ s/'(?:[^']|'')*'//g;

    my $depth    = 0;
    my $balanced = 1;
    while ($probe =~ /([()])/g) {
      $depth += ($1 eq '(') ? 1 : -1;
      if ($depth < 0) {
        # a ')' closed the paren we were about to strip
        $balanced = 0;
        last;
      }
    }

    last unless $balanced and $depth == 0;

    $sql = $inner;
  }

  return $sql;
}
58
# IN handler: strips redundant outer parentheses from the right-hand side
# before handing over to the parent implementation.
sub _where_field_IN {
  my ($self, $lhs, $op, $rhs) = @_;

  my $stripped_rhs = $self->_strip_outer_paren ($rhs);

  return $self->SUPER::_where_field_IN ($lhs, $op, $stripped_rhs);
}
64
# BETWEEN handler: strips redundant outer parentheses from the right-hand
# side before handing over to the parent implementation.
sub _where_field_BETWEEN {
  my ($self, $lhs, $op, $rhs) = @_;

  my $stripped_rhs = $self->_strip_outer_paren ($rhs);

  return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $stripped_rhs);
}
70
# Slow but ANSI standard Limit/Offset support. DB2 uses this
#
# Wraps $sql (followed by the rendered $order clause) in a ROW_NUMBER()
# subquery and keeps the rows numbered $offset + 1 .. $offset + $rows.
sub _RowNumberOver {
  my ($self, $sql, $order, $rows, $offset ) = @_;

  my ( $order_by ) = $self->_order_by( $order );

  # ROW_NUM starts at 1, hence the shift of the zero-based offset
  $offset += 1;
  my $last = $rows + $offset - 1;

  return <<"SQL";
SELECT * FROM
(
   SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
      $sql
      $order_by
   ) Q1
) Q2
WHERE ROW_NUM BETWEEN $offset AND $last

SQL
}
93
# Crappy Top based Limit/Offset support. MSSQL uses this currently,
# but may have to switch to RowNumberOver one day
#
# $order must be a hashref; its order_by entry is the requested ordering and
# its _virtual_order_by entry is used for the limit when no requested
# ordering renders to anything.
sub _Top {
  my ( $self, $sql, $order, $rows, $offset ) = @_;

  unless (ref $order eq 'HASH') {
    croak '$order supplied to SQLAHacks limit emulators must be a hash';
  }

  my $last = $rows + $offset;

  # rendered ORDER BY requested by the caller (empty string when none)
  my $req_order = $self->_order_by ($order->{order_by});

  my $limit_order;
  if ($req_order) {
    $limit_order = $order->{order_by};
  }
  else {
    $limit_order = $order->{_virtual_order_by};
  }

  my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);

  # the leading SELECT is re-added below, together with the TOP clause
  $sql =~ s/^\s*(SELECT|select)//;

  return <<"SQL";
  SELECT * FROM
  (
    SELECT TOP $rows * FROM
    (
        SELECT TOP $last $sql $order_by_inner
    ) AS foo
    $order_by_outer
  ) AS bar
  $req_order

SQL
}
125
126
6f4ddea1 127
# While we're at it, this should make LIMIT queries more efficient,
# without digging into things too deeply
#
# The first true result of the parent lookup is stored in
# $self->{_cached_syntax} and returned on every later call, whatever
# $syntax is then passed.
sub _find_syntax {
  my ($self, $syntax) = @_;

  $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);

  return $self->{_cached_syntax};
}
134
# Builds a SELECT statement.
#
# Arguments: $table, $fields, $where, $order, followed by whatever else is
# passed on to the parent select(); the first of those extra arguments is
# treated here as the limit.
#
# Returns the SQL string in scalar context; in list context the SQL followed
# by the from-binds, the where-binds and the having-binds, in that order.
sub select {
  my ($self, $table, $fields, $where, $order, @rest) = @_;

  # bind collectors, filled by _fold_sqlbind and _order_by during this call
  local $self->{having_bind} = [];
  local $self->{from_bind}   = [];

  my $table_kind = ref $table;
  if ($table_kind eq 'SCALAR') {
    $table = $$table;
  }
  elsif (! $table_kind) {
    $table = $self->_quote($table);
  }

  # must remain a statement-modifier form: the localisation has to last
  # until the end of this sub
  local $self->{rownum_hack_count} = 1
    if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');

  @rest = (-1) unless defined $rest[0];
  croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
    # and anyway, SQL::Abstract::Limit will cause a barf if we don't first

  my ($sql, @where_bind) = $self->SUPER::select(
    $table, $self->_recurse_fields($fields), $where, $order, @rest
  );

  # optional row-locking suffix
  my $lock_suffix = '';
  if (my $lock = $self->{for}) {
    if ($lock eq 'update') {
      $lock_suffix = ' FOR UPDATE';
    }
    elsif ($lock eq 'shared') {
      $lock_suffix = ' FOR SHARE';
    }
  }
  $sql .= $lock_suffix;

  return $sql unless wantarray;
  return ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}});
}
165
# Quotes a plain (non-reference) table name, then delegates to the parent
# insert() with the remaining arguments.
sub insert {
  my ($self, $table, @args) = @_;

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::insert($table, @args);
}
172
# Quotes a plain (non-reference) table name, then delegates to the parent
# update() with the remaining arguments.
sub update {
  my ($self, $table, @args) = @_;

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::update($table, @args);
}
179
# Quotes a plain (non-reference) table name, then delegates to the parent
# delete() with the remaining arguments.
sub delete {
  my ($self, $table, @args) = @_;

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::delete($table, @args);
}
186
# A fourth argument of -1 (the value select() substitutes when no limit was
# given) short-circuits limit emulation: the second argument is returned
# with the ORDER BY rendered from the third appended. Anything else goes to
# the parent implementation.
sub _emulate_limit {
  my $self = shift;

  return $self->SUPER::_emulate_limit(@_)
    unless $_[3] == -1;

  return $_[1] . $self->_order_by($_[2]);
}
195
# Renders a select-list specification into SQL.
#   plain string        - quoted via _quote
#   \'literal'          - used verbatim
#   [ ... ]             - each element rendered recursively, joined by ', ';
#                         while $self->{rownum_hack_count} exists (and
#                         $params->{no_rownum_hack} is not set) every element
#                         gets an ' AS colN' alias
#   { func => arg }     - rendered as FUNC( arg ); only one key is rendered
#   \[ $sql, @bind ]    - folded via _fold_sqlbind
# Any other reference type croaks.
sub _recurse_fields {
  my ($self, $fields, $params) = @_;

  my $kind = ref $fields;
  return $self->_quote($fields) unless $kind;
  return $$fields if $kind eq 'SCALAR';

  if ($kind eq 'ARRAY') {
    my @rendered;
    for my $field (@$fields) {
      my $chunk = $self->_recurse_fields($field);

      my $skip_alias = $params && $params->{no_rownum_hack};
      if (exists $self->{rownum_hack_count} && ! $skip_alias) {
        $chunk .= ' AS col' . $self->{rownum_hack_count}++;
      }

      push @rendered, $chunk;
    }
    return join(', ', @rendered);
  }
  elsif ($kind eq 'HASH') {
    foreach my $func (keys %$fields) {
      if ($func eq 'distinct') {
        my $distinct_cols = $fields->{$func};
        my $is_list = ref $distinct_cols eq 'ARRAY';

        if ($is_list && @$distinct_cols > 1) {
          croak (
            'The select => { distinct => ... } syntax is not supported for multiple columns.'
           .' Instead please use { group_by => [ qw/' . (join ' ', @$distinct_cols) . '/ ] }'
           .' or { select => [ qw/' . (join ' ', @$distinct_cols) . '/ ], distinct => 1 }'
          );
        }
        else {
          $distinct_cols = $distinct_cols->[0] if $is_list;
          carp (
            'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
           ." please use { group_by => '${distinct_cols}' } or { select => '${distinct_cols}', distinct => 1 }"
          );
        }
      }

      # returns on the first key seen
      return $self->_sqlcase($func)
        .'( '.$self->_recurse_fields($fields->{$func}).' )';
    }
  }
  # Is the second check absolutely necessary?
  elsif ( $kind eq 'REF' and ref($$fields) eq 'ARRAY' ) {
    return $self->_fold_sqlbind( $fields );
  }
  else {
    croak($kind . qq{ unexpected in _recurse_fields()})
  }
}
240
# Renders the trailing clauses of a SELECT from the first argument:
#   hashref          - GROUP BY, HAVING and ORDER BY fragments are built from
#                      its group_by / having / order_by entries (having binds
#                      are pushed onto $self->{having_bind}); if any key looks
#                      like -asc / -desc the parent _order_by result for the
#                      hash is returned instead
#   scalar ref       - ' order by ' followed by the literal
#   non-empty array  - every element is rendered recursively and the pieces
#                      are joined by ', '
#   anything else    - handed to the parent _order_by
sub _order_by {
  my $self = shift;
  my $ret = '';
  my @extra;

  my $kind = ref $_[0];

  if ($kind eq 'HASH') {
    my $attrs = $_[0];

    if (defined $attrs->{group_by}) {
      $ret = $self->_sqlcase(' group by ')
        .$self->_recurse_fields($attrs->{group_by}, { no_rownum_hack => 1 });
    }

    if (defined $attrs->{having}) {
      my $frag;
      ($frag, @extra) = $self->_recurse_where($attrs->{having});
      push(@{$self->{having_bind}}, @extra);
      $ret .= $self->_sqlcase(' having ').$frag;
    }

    if (defined $attrs->{order_by}) {
      $ret .= $self->_order_by($attrs->{order_by});
    }

    if (grep { $_ =~ /^-(desc|asc)/i } keys %$attrs) {
      return $self->SUPER::_order_by($attrs);
    }
  }
  elsif ($kind eq 'SCALAR') {
    $ret = $self->_sqlcase(' order by ').${ $_[0] };
  }
  elsif ($kind eq 'ARRAY' && @{$_[0]}) {
    # take the list off @_ so that only the remaining arguments are
    # forwarded to the recursive calls
    my $specs = shift;

    my @order;
    for my $spec (@$specs) {
      my $r = $self->_order_by($spec, @_);
      $r =~ s/^ ?ORDER BY //i;
      push @order, $r if $r;
    }

    $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
  }
  else {
    $ret = $self->SUPER::_order_by(@_);
  }

  return $ret;
}
283
# Normalises $order through _resolve_order before asking the parent class
# for the order directions.
sub _order_directions {
  my ($self, $order) = @_;

  my $resolved = $self->_resolve_order($order);

  return $self->SUPER::_order_directions($resolved);
}
288
# Flattens an order specification.
#   hashref  - replaced by an arrayref of what _resolve_order_hash returns
#   arrayref - replaced by a new arrayref in which scalar refs are
#              dereferenced, hashrefs are expanded via _resolve_order_hash
#              and everything else is kept as it is
#   other    - returned unchanged
sub _resolve_order {
  my ($self, $order) = @_;

  my $kind = ref $order;

  if ($kind eq 'HASH') {
    return [ $self->_resolve_order_hash($order) ];
  }

  if ($kind eq 'ARRAY') {
    my @resolved;
    for my $item (@$order) {
      my $item_kind = ref $item;

      if ($item_kind eq 'SCALAR') {
        push @resolved, $$item;
      }
      elsif ($item_kind eq 'HASH') {
        push @resolved, $self->_resolve_order_hash($item);
      }
      else {
        push @resolved, $item;
      }
    }
    return \@resolved;
  }

  return $order;
}
311
# Expands a { -asc => ..., -desc => ... } hash into a list of
# "column direction" strings. Each value may be a plain scalar or an
# arrayref of columns; any other reference, or a key not starting with
# -asc / -desc, croaks.
sub _resolve_order_hash {
  my ($self, $order) = @_;

  my @new_order;

  for my $key (keys %$order) {
    croak "$key is not a valid direction, use -asc or -desc"
      unless $key =~ /^-(desc|asc)/i;

    my $direction = $1;
    my $columns   = $order->{$key};
    my $type      = ref $columns;

    if ($type eq 'ARRAY') {
      push @new_order, map { "$_ $direction" } @$columns;
    }
    elsif (! $type) {
      push @new_order, "$columns $direction";
    }
    else {
      croak "hash order_by can only contain Scalar or Array, not $type";
    }
  }

  return @new_order;
}
333
# Renders the FROM part: an arrayref goes through _recurse_from, a hashref
# through _make_as, and anything else is returned as it is.
sub _table {
  my ($self, $from) = @_;

  my $kind = ref $from;

  return $self->_recurse_from(@$from) if $kind eq 'ARRAY';
  return $self->_make_as($from)       if $kind eq 'HASH';

  # would love to quote here but _table ends up getting called
  # twice during an ->select without a limit clause due to
  # the way S::A::Limit->select works. should maybe consider
  # bypassing this and doing S::A::select($self, ...) in
  # our select method above. meantime, quoting shims have
  # been added to select/insert/update/delete here
  return $from;
}
349
# Renders a FROM clause with joins. $from is rendered by _make_as; each
# following element is a [ $to, $on ] pair. $to may itself be an arrayref,
# in which case it is rendered recursively inside parentheses. A -join_type
# entry in the (first) $to hash is upper-cased and put in front of JOIN.
sub _recurse_from {
  my ($self, $from, @join) = @_;

  my @sqlf = ( $self->_make_as($from) );

  for my $j (@join) {
    my ($to, $on) = @$j;

    my $to_is_nested = ref($to) eq 'ARRAY';

    # check whether a join type exists
    my $to_jt = $to_is_nested ? $to->[0] : $to;
    my $has_join_type = (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type}));

    my $join_clause = $has_join_type
      ? ' '.uc($to_jt->{-join_type}).' JOIN '
      : ' JOIN ';
    push @sqlf, $join_clause;

    if ($to_is_nested) {
      push @sqlf, '(', $self->_recurse_from(@$to), ')';
    }
    else {
      push @sqlf, $self->_make_as($to);
    }

    push @sqlf, ' ON ', $self->_join_condition($on);
  }

  return join('', @sqlf);
}
376
# Takes a \[ $sql, @bind ] structure, appends the bind values to
# $self->{from_bind} and returns the SQL string. The caller's structure is
# not modified.
sub _fold_sqlbind {
  my ($self, $sqlbind) = @_;

  my ($sql, @bind) = @$$sqlbind;   # list assignment copies
  push @{ $self->{from_bind} }, @bind;

  return $sql;
}
386
# Renders one FROM element. After option keys are dropped via _skip_options,
# a single key/value pair is fetched with each() and emitted as
# "value key", joined by a space. A scalar ref is used verbatim, a
# \[ $sql, @bind ] goes through _fold_sqlbind, anything else is quoted.
sub _make_as {
  my ($self, $from) = @_;

  my $clean = $self->_skip_options($from);

  my @parts;
  for my $piece (reverse each %$clean) {
    my $kind = ref $piece;
    push @parts, (
        $kind eq 'SCALAR' ? $$piece
      : $kind eq 'REF'    ? $self->_fold_sqlbind($piece)
      :                     $self->_quote($piece)
    );
  }

  return join(' ', @parts);
}
394
# Returns a new hashref holding every entry of $hash whose key does not
# start with '-'.
sub _skip_options {
  my ($self, $hash) = @_;

  my %clean_hash =
    map  { ( $_ => $hash->{$_} ) }
    grep { ! /^-/ }
    keys %$hash;

  return \%clean_hash;
}
402
# Renders a JOIN ... ON condition.
#   hashref  - every plain value is treated as a column name and rendered as
#              "= <quoted name>"; a scalar-ref value is passed through as
#              literal SQL; any other reference croaks. The resulting hash
#              is rendered by _recurse_where and only the SQL is returned.
#   arrayref - each element is rendered recursively, joined by ' OR '
# Anything else croaks.
sub _join_condition {
  my ($self, $cond) = @_;

  if (ref $cond eq 'HASH') {
    my %j;
    for my $lhs (keys %$cond) {
      my $v = $cond->{$lhs};
      if (ref $v) {
        # single-quoted on purpose: the hint must show the literal \"..."
        # syntax, backslash included
        croak (ref($v) . q{ reference arguments are not supported in JOINS - try using \"..." instead})
          if ref($v) ne 'SCALAR';
        $j{$lhs} = $v;
      }
      else {
        my $x = '= '.$self->_quote($v);
        $j{$lhs} = \$x;
      }
    }
    return scalar($self->_recurse_where(\%j));
  }
  elsif (ref $cond eq 'ARRAY') {
    return join(' OR ', map { $self->_join_condition($_) } @$cond);
  }
  else {
    # croak, like every other error raised in this module
    croak "Can't handle this yet!";
  }
}
425
# Quotes an identifier.
#   undef          - returns ''
#   '*'            - returned unquoted
#   no quote_char  - the label is returned as it is
#   quote_char is an arrayref [ $left, $right ]:
#       without name_sep the whole label is wrapped once; with name_sep the
#       label is split on the separator and every part is wrapped, except a
#       '*' part, which stays bare ("me.*" => "[me].*" - a quoted star is
#       not valid SQL)
#   any other quote_char - handled by the parent _quote
sub _quote {
  my ($self, $label) = @_;

  return '' unless defined $label;
  return "*" if $label eq '*';
  return $label unless $self->{quote_char};

  if (ref $self->{quote_char} eq "ARRAY") {
    my ($left, $right) = @{ $self->{quote_char} };
    my $sep = $self->{name_sep};

    return $left . $label . $right
      if !defined $sep;

    return join($sep,
      map { $_ eq '*' ? $_ : $left . $_ . $right }
      split(/\Q$sep\E/, $label));
  }

  return $self->SUPER::_quote($label);
}
441
# Combined getter/setter for $self->{limit_dialect}. When an argument is
# supplied it is stored first; the current value is always returned.
sub limit_dialect {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{limit_dialect} = $new_value[0];
  }

  return $self->{limit_dialect};
}
447
# Combined getter/setter for $self->{quote_char}. When an argument is
# supplied it is stored first; the current value is always returned.
sub quote_char {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{quote_char} = $new_value[0];
  }

  return $self->{quote_char};
}
453
# Combined getter/setter for $self->{name_sep}. When an argument is
# supplied it is stored first; the current value is always returned.
sub name_sep {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{name_sep} = $new_value[0];
  }

  return $self->{name_sep};
}
459
4601;
461
462__END__
463
464=pod
465
466=head1 NAME
467
855c6fd0 468DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
469and includes a number of DBIC-specific workarounds, not yet suitable for
470inclusion into SQLA proper.
6f4ddea1 471
472=head1 METHODS
473
474=head2 new
475
476Tries to determine limit dialect.
477
478=head2 select
479
480Quotes table names, handles "limit" dialects (e.g. where rownum between x and
481y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
482
483=head2 insert update delete
484
485Just quotes table names.
486
487=head2 limit_dialect
488
Specifies the dialect used for implementing an SQL "limit" clause for
restricting the number of query results returned. Valid values are: RowNum.
491
492See L<DBIx::Class::Storage::DBI/connect_info> for details.
493
494=head2 name_sep
495
496Character separating quoted table names.
497
498See L<DBIx::Class::Storage::DBI/connect_info> for details.
499
500=head2 quote_char
501
502Set to an array-ref to specify separate left and right quotes for table names.
503
504See L<DBIx::Class::Storage::DBI/connect_info> for details.
505
506=cut
507