Factor out the order_by sqlahacks resolver
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
e3764383 5use strict;
6use warnings;
db56cf3d 7use Carp::Clan qw/^DBIx::Class/;
6f4ddea1 8
9sub new {
10 my $self = shift->SUPER::new(@_);
11
12 # This prevents the caching of $dbh in S::A::L, I believe
13 # If limit_dialect is a ref (like a $dbh), go ahead and replace
14 # it with what it resolves to:
15 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
16 if ref $self->{limit_dialect};
17
18 $self;
19}
20
21
6f4ddea1 22# Some databases (sqlite) do not handle multiple parenthesis
23# around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
24# is interpreted as x IN 1 or something similar.
25#
26# Since we currently do not have access to the SQLA AST, resort
27# to barbaric mutilation of any SQL supplied in literal form
28
29sub _strip_outer_paren {
30 my ($self, $arg) = @_;
31
32 return $self->_SWITCH_refkind ($arg, {
33 ARRAYREFREF => sub {
34 $$arg->[0] = __strip_outer_paren ($$arg->[0]);
35 return $arg;
36 },
37 SCALARREF => sub {
38 return \__strip_outer_paren( $$arg );
39 },
40 FALLBACK => sub {
41 return $arg
42 },
43 });
44}
45
46sub __strip_outer_paren {
47 my $sql = shift;
48
49 if ($sql and not ref $sql) {
50 while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
51 $sql = $1;
52 }
53 }
54
55 return $sql;
56}
57
58sub _where_field_IN {
59 my ($self, $lhs, $op, $rhs) = @_;
60 $rhs = $self->_strip_outer_paren ($rhs);
61 return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
62}
63
64sub _where_field_BETWEEN {
65 my ($self, $lhs, $op, $rhs) = @_;
66 $rhs = $self->_strip_outer_paren ($rhs);
67 return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
68}
69
70
71
72# DB2 is the only remaining DB using this. Even though we are not sure if
73# RowNumberOver is still needed here (should be part of SQLA) leave the
74# code in place
75sub _RowNumberOver {
76 my ($self, $sql, $order, $rows, $offset ) = @_;
77
78 $offset += 1;
03e1d9af 79 my $last = $rows + $offset - 1;
6f4ddea1 80 my ( $order_by ) = $self->_order_by( $order );
81
82 $sql = <<"SQL";
83SELECT * FROM
84(
85 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
86 $sql
87 $order_by
88 ) Q1
89) Q2
90WHERE ROW_NUM BETWEEN $offset AND $last
91
92SQL
93
94 return $sql;
95}
96
97
98# While we're at it, this should make LIMIT queries more efficient,
99# without digging into things too deeply
100use Scalar::Util 'blessed';
101sub _find_syntax {
102 my ($self, $syntax) = @_;
103
104 # DB2 is the only remaining DB using this. Even though we are not sure if
105 # RowNumberOver is still needed here (should be part of SQLA) leave the
106 # code in place
107 my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
a964a928 108 if(ref($self) && $dbhname) {
109 if ($dbhname eq 'DB2') {
110 return 'RowNumberOver';
111 }
6f4ddea1 112 }
113
114 $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
115}
116
117sub select {
118 my ($self, $table, $fields, $where, $order, @rest) = @_;
db56cf3d 119 local $self->{having_bind} = [];
120 local $self->{from_bind} = [];
121
6f4ddea1 122 if (ref $table eq 'SCALAR') {
123 $table = $$table;
124 }
125 elsif (not ref $table) {
126 $table = $self->_quote($table);
127 }
128 local $self->{rownum_hack_count} = 1
129 if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
130 @rest = (-1) unless defined $rest[0];
e8fcf76f 131 croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
6f4ddea1 132 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
db56cf3d 133 my ($sql, @where_bind) = $self->SUPER::select(
6f4ddea1 134 $table, $self->_recurse_fields($fields), $where, $order, @rest
135 );
136 $sql .=
137 $self->{for} ?
138 (
139 $self->{for} eq 'update' ? ' FOR UPDATE' :
140 $self->{for} eq 'shared' ? ' FOR SHARE' :
141 ''
142 ) :
143 ''
144 ;
db56cf3d 145 return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
6f4ddea1 146}
147
148sub insert {
149 my $self = shift;
150 my $table = shift;
151 $table = $self->_quote($table) unless ref($table);
152 $self->SUPER::insert($table, @_);
153}
154
155sub update {
156 my $self = shift;
157 my $table = shift;
158 $table = $self->_quote($table) unless ref($table);
159 $self->SUPER::update($table, @_);
160}
161
162sub delete {
163 my $self = shift;
164 my $table = shift;
165 $table = $self->_quote($table) unless ref($table);
166 $self->SUPER::delete($table, @_);
167}
168
169sub _emulate_limit {
170 my $self = shift;
171 if ($_[3] == -1) {
172 return $_[1].$self->_order_by($_[2]);
173 } else {
174 return $self->SUPER::_emulate_limit(@_);
175 }
176}
177
178sub _recurse_fields {
179 my ($self, $fields, $params) = @_;
180 my $ref = ref $fields;
181 return $self->_quote($fields) unless $ref;
182 return $$fields if $ref eq 'SCALAR';
183
184 if ($ref eq 'ARRAY') {
185 return join(', ', map {
186 $self->_recurse_fields($_)
187 .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
188 ? ' AS col'.$self->{rownum_hack_count}++
189 : '')
190 } @$fields);
191 } elsif ($ref eq 'HASH') {
192 foreach my $func (keys %$fields) {
db56cf3d 193 if ($func eq 'distinct') {
194 my $_fields = $fields->{$func};
195 if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
04ebc6f4 196 croak (
197 'The select => { distinct => ... } syntax is not supported for multiple columns.'
198 .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
199 .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
200 );
db56cf3d 201 }
202 else {
203 $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
04ebc6f4 204 carp (
205 'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
206 ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
207 );
db56cf3d 208 }
209 }
6f4ddea1 210 return $self->_sqlcase($func)
211 .'( '.$self->_recurse_fields($fields->{$func}).' )';
212 }
213 }
214 # Is the second check absolutely necessary?
215 elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
db56cf3d 216 return $self->_fold_sqlbind( $fields );
6f4ddea1 217 }
218 else {
e8fcf76f 219 croak($ref . qq{ unexpected in _recurse_fields()})
6f4ddea1 220 }
221}
222
223sub _order_by {
224 my $self = shift;
225 my $ret = '';
226 my @extra;
227 if (ref $_[0] eq 'HASH') {
228 if (defined $_[0]->{group_by}) {
229 $ret = $self->_sqlcase(' group by ')
230 .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
231 }
232 if (defined $_[0]->{having}) {
233 my $frag;
234 ($frag, @extra) = $self->_recurse_where($_[0]->{having});
235 push(@{$self->{having_bind}}, @extra);
236 $ret .= $self->_sqlcase(' having ').$frag;
237 }
238 if (defined $_[0]->{order_by}) {
239 $ret .= $self->_order_by($_[0]->{order_by});
240 }
241 if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
242 return $self->SUPER::_order_by($_[0]);
243 }
244 } elsif (ref $_[0] eq 'SCALAR') {
245 $ret = $self->_sqlcase(' order by ').${ $_[0] };
246 } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
247 my @order = @{+shift};
248 $ret = $self->_sqlcase(' order by ')
249 .join(', ', map {
250 my $r = $self->_order_by($_, @_);
251 $r =~ s/^ ?ORDER BY //i;
252 $r;
253 } @order);
254 } else {
255 $ret = $self->SUPER::_order_by(@_);
256 }
257 return $ret;
258}
259
260sub _order_directions {
261 my ($self, $order) = @_;
fde3719a 262 return $self->SUPER::_order_directions( $self->_resolve_order($order) );
263}
264
265sub _resolve_order {
266 my ($self, $order) = @_;
267 $order = $order->{order_by} if (ref $order eq 'HASH' and $order->{order_by});
268
fd4cb60a 269 if (ref $order eq 'HASH') {
fde3719a 270 $order = [$self->_resolve_order_hash($order)];
271 }
272 elsif (ref $order eq 'ARRAY') {
fd4cb60a 273 $order = [map {
fde3719a 274 if (ref ($_) eq 'SCALAR') {
275 $$_
276 }
277 elsif (ref ($_) eq 'HASH') {
278 $self->_resolve_order_hash($_)
fd4cb60a 279 }
fde3719a 280 else {
281 $_
282 }
283 } @$order];
fd4cb60a 284 }
fde3719a 285
286 return $order;
6f4ddea1 287}
288
fde3719a 289sub _resolve_order_hash {
fd4cb60a 290 my ($self, $order) = @_;
e3764383 291 my @new_order;
292 foreach my $key (keys %{ $order }) {
293 if ($key =~ /^-(desc|asc)/i ) {
294 my $direction = $1;
295 my $type = ref $order->{ $key };
296 if ($type eq 'ARRAY') {
297 push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
298 } elsif (!$type) {
299 push @new_order, "$order->{$key} $direction";
300 } else {
301 croak "hash order_by can only contain Scalar or Array, not $type";
302 }
303 } else {
304 croak "$key is not a valid direction, use -asc or -desc";
305 }
306 }
307 return @new_order;
fd4cb60a 308}
309
6f4ddea1 310sub _table {
311 my ($self, $from) = @_;
312 if (ref $from eq 'ARRAY') {
313 return $self->_recurse_from(@$from);
314 } elsif (ref $from eq 'HASH') {
315 return $self->_make_as($from);
316 } else {
317 return $from; # would love to quote here but _table ends up getting called
318 # twice during an ->select without a limit clause due to
319 # the way S::A::Limit->select works. should maybe consider
320 # bypassing this and doing S::A::select($self, ...) in
321 # our select method above. meantime, quoting shims have
322 # been added to select/insert/update/delete here
323 }
324}
325
326sub _recurse_from {
327 my ($self, $from, @join) = @_;
328 my @sqlf;
329 push(@sqlf, $self->_make_as($from));
330 foreach my $j (@join) {
331 my ($to, $on) = @$j;
332
333 # check whether a join type exists
334 my $join_clause = '';
335 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
336 if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
337 $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
338 } else {
339 $join_clause = ' JOIN ';
340 }
341 push(@sqlf, $join_clause);
342
343 if (ref $to eq 'ARRAY') {
344 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
345 } else {
346 push(@sqlf, $self->_make_as($to));
347 }
348 push(@sqlf, ' ON ', $self->_join_condition($on));
349 }
350 return join('', @sqlf);
351}
352
db56cf3d 353sub _fold_sqlbind {
354 my ($self, $sqlbind) = @_;
69989ea9 355
356 my @sqlbind = @$$sqlbind; # copy
357 my $sql = shift @sqlbind;
358 push @{$self->{from_bind}}, @sqlbind;
359
db56cf3d 360 return $sql;
6f4ddea1 361}
362
363sub _make_as {
364 my ($self, $from) = @_;
db56cf3d 365 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
366 : ref $_ eq 'REF' ? $self->_fold_sqlbind($_)
367 : $self->_quote($_))
6f4ddea1 368 } reverse each %{$self->_skip_options($from)});
369}
370
371sub _skip_options {
372 my ($self, $hash) = @_;
373 my $clean_hash = {};
374 $clean_hash->{$_} = $hash->{$_}
375 for grep {!/^-/} keys %$hash;
376 return $clean_hash;
377}
378
379sub _join_condition {
380 my ($self, $cond) = @_;
381 if (ref $cond eq 'HASH') {
382 my %j;
383 for (keys %$cond) {
384 my $v = $cond->{$_};
385 if (ref $v) {
e8fcf76f 386 croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
6f4ddea1 387 if ref($v) ne 'SCALAR';
388 $j{$_} = $v;
389 }
390 else {
391 my $x = '= '.$self->_quote($v); $j{$_} = \$x;
392 }
393 };
394 return scalar($self->_recurse_where(\%j));
395 } elsif (ref $cond eq 'ARRAY') {
396 return join(' OR ', map { $self->_join_condition($_) } @$cond);
397 } else {
398 die "Can't handle this yet!";
399 }
400}
401
402sub _quote {
403 my ($self, $label) = @_;
404 return '' unless defined $label;
405 return "*" if $label eq '*';
406 return $label unless $self->{quote_char};
407 if(ref $self->{quote_char} eq "ARRAY"){
408 return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
409 if !defined $self->{name_sep};
410 my $sep = $self->{name_sep};
411 return join($self->{name_sep},
412 map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1] }
413 split(/\Q$sep\E/,$label));
414 }
415 return $self->SUPER::_quote($label);
416}
417
418sub limit_dialect {
419 my $self = shift;
420 $self->{limit_dialect} = shift if @_;
421 return $self->{limit_dialect};
422}
423
424sub quote_char {
425 my $self = shift;
426 $self->{quote_char} = shift if @_;
427 return $self->{quote_char};
428}
429
430sub name_sep {
431 my $self = shift;
432 $self->{name_sep} = shift if @_;
433 return $self->{name_sep};
434}
435
4361;
437
438__END__
439
440=pod
441
442=head1 NAME
443
855c6fd0 444DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
445and includes a number of DBIC-specific workarounds, not yet suitable for
446inclusion into SQLA proper.
6f4ddea1 447
448=head1 METHODS
449
450=head2 new
451
452Tries to determine limit dialect.
453
454=head2 select
455
456Quotes table names, handles "limit" dialects (e.g. where rownum between x and
457y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
458
459=head2 insert update delete
460
461Just quotes table names.
462
463=head2 limit_dialect
464
465Specifies the dialect of used for implementing an SQL "limit" clause for
466restricting the number of query results returned. Valid values are: RowNum.
467
468See L<DBIx::Class::Storage::DBI/connect_info> for details.
469
470=head2 name_sep
471
472Character separating quoted table names.
473
474See L<DBIx::Class::Storage::DBI/connect_info> for details.
475
476=head2 quote_char
477
478Set to an array-ref to specify separate left and right quotes for table names.
479
480See L<DBIx::Class::Storage::DBI/connect_info> for details.
481
482=cut
483