Move the DB2 Limit syntax setting into the storage class
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
e3764383 5use strict;
6use warnings;
db56cf3d 7use Carp::Clan qw/^DBIx::Class/;
ac788c46 8use Scalar::Util();
6f4ddea1 9
# Constructor: delegates to the parent class, then normalises the
# limit_dialect slot.
#
# When limit_dialect was supplied as a reference (a $dbh, for instance)
# it is replaced by whatever _find_syntax() resolves it to, so the
# reference itself is not kept around in that slot.
#
# Returns the new object.
sub new {
  my $class = shift;
  my $self  = $class->SUPER::new(@_);

  if (ref $self->{limit_dialect}) {
    $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect});
  }

  return $self;
}
21
22
# Some databases (sqlite) do not handle multiple sets of parentheses
# around IN/BETWEEN arguments. For example, x IN ( ( 1, 2, 3 ) )
# is interpreted as x IN 1 or something similar.
#
# Since we currently do not have access to the SQLA AST, resort
# to barbaric mutilation of any SQL supplied in literal form
29
# Strips redundant outer parentheses from a literal-SQL argument.
#
# $arg is dispatched on its reference kind via _SWITCH_refkind():
#   ARRAYREFREF - the first element of the inner array is stripped
#                 in place and the same reference is handed back
#   SCALARREF   - a reference to a stripped copy is returned
#   anything else is returned untouched
sub _strip_outer_paren {
  my ($self, $arg) = @_;

  my %handler_for = (
    ARRAYREFREF => sub {
      my $sql_and_bind = $$arg;
      $sql_and_bind->[0] = __strip_outer_paren($sql_and_bind->[0]);
      return $arg;
    },
    SCALARREF => sub {
      my $stripped = __strip_outer_paren($$arg);
      return \$stripped;
    },
    FALLBACK => sub {
      return $arg;
    },
  );

  return $self->_SWITCH_refkind($arg, \%handler_for);
}
46
# Plain function (not a method): removes every redundant pair of
# parentheses that wraps the *entire* SQL string.
#
#   $sql - SQL text; false values and references are returned unchanged
#
# A pair is only removed when the text between it is itself balanced,
# i.e. the leading "(" really matches the trailing ")". This keeps
# input such as "(a), (b)" intact instead of mangling it into
# "a), (b". The /s modifier lets multi-line SQL be unwrapped as well.
#
# Returns the stripped SQL string.
sub __strip_outer_paren {
  my $sql = shift;

  if ($sql and not ref $sql) {
    while ($sql =~ /^ \s* \( (.*) \) \s* $/xs) {
      my $inner = $1;

      # the outer parens are not a matching pair - leave the SQL alone
      last unless __parens_balanced($inner);

      $sql = $inner;
    }
  }

  return $sql;
}

# Returns true when every parenthesis in $sql is properly paired.
# Parentheses inside single- or double-quoted sections are ignored.
sub __parens_balanced {
  my $sql = shift;

  my $depth = 0;
  while ($sql =~ m{
    \G (?:
        ' (?: [^'] | '' )* '    # single-quoted section, doubled quote escapes
      | " (?: [^"] | "" )* "    # double-quoted section, doubled quote escapes
      | ( [()] )                # a parenthesis outside of quotes
      | [^'"()]+                # any other run of characters
      | .                       # a lone unterminated quote character
    )
  }xgcs) {
    next unless defined $1;

    if ($1 eq '(') {
      $depth++;
    }
    elsif (--$depth < 0) {
      # closing paren without a preceding opening one
      return 0;
    }
  }

  return $depth == 0 ? 1 : 0;
}
58
# IN handler: strips redundant outer parentheses from the right-hand
# side via _strip_outer_paren(), then defers to the parent class.
sub _where_field_IN {
  my $self = shift;
  my ($lhs, $op, $rhs) = @_;

  my $clean_rhs = $self->_strip_outer_paren($rhs);

  return $self->SUPER::_where_field_IN($lhs, $op, $clean_rhs);
}
64
# BETWEEN handler: strips redundant outer parentheses from the
# right-hand side via _strip_outer_paren(), then defers to the parent
# class.
sub _where_field_BETWEEN {
  my $self = shift;
  my ($lhs, $op, $rhs) = @_;

  my $clean_rhs = $self->_strip_outer_paren($rhs);

  return $self->SUPER::_where_field_BETWEEN($lhs, $op, $clean_rhs);
}
70
71
72
73# DB2 is the only remaining DB using this. Even though we are not sure if
74# RowNumberOver is still needed here (should be part of SQLA) leave the
75# code in place
# Wraps $sql in a ROW_NUMBER() OVER() subquery so that only a window
# of rows is returned.
#
#   $sql    - the SELECT statement to wrap
#   $order  - order specification, rendered through _order_by()
#   $rows   - number of rows wanted
#   $offset - number of leading rows to skip
#
# The window is expressed as ROW_NUM BETWEEN ($offset + 1) AND
# ($offset + $rows).
#
# Returns the wrapped SQL string.
sub _RowNumberOver {
  my ($self, $sql, $order, $rows, $offset) = @_;

  # ROW_NUM is 1-based, hence the first wanted row is offset + 1
  $offset += 1;
  my $last_row = $offset + $rows - 1;

  my ($order_by) = $self->_order_by($order);

  return <<"SQL";
SELECT * FROM
(
 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
 $sql
 $order_by
 ) Q1
) Q2
WHERE ROW_NUM BETWEEN $offset AND $last_row

SQL
}
97
98
99# While we're at it, this should make LIMIT queries more efficient,
100# without digging into things too deeply
# Memoising wrapper around the parent's _find_syntax().
#
# The first true result is stored in $self->{_cached_syntax}; while
# that slot holds a true value it is returned directly and $syntax is
# not consulted again.
sub _find_syntax {
  my ($self, $syntax) = @_;

  unless ($self->{_cached_syntax}) {
    $self->{_cached_syntax} = $self->SUPER::_find_syntax($syntax);
  }

  return $self->{_cached_syntax};
}
105
# Builds a SELECT statement.
#
#   $table  - plain name (quoted via _quote), scalar ref (used
#             verbatim), or any other ref (handed on untouched)
#   $fields - column specification, rendered by _recurse_fields()
#   $where  - WHERE specification, handed to the parent class
#   $order  - order / group / having specification
#   @rest   - limit arguments; when the first one is undefined it is
#             replaced by -1. An explicit limit of 0 is fatal.
#
# If $self->{for} is 'update' or 'shared', ' FOR UPDATE' or
# ' FOR SHARE' is appended to the statement.
#
# In list context returns ($sql, @from_bind, @where_bind, @having_bind);
# in scalar context returns just the SQL string.
sub select {
  my ($self, $table, $fields, $where, $order, @rest) = @_;

  # bind values gathered while rendering the FROM and HAVING parts
  local $self->{having_bind} = [];
  local $self->{from_bind}   = [];

  my $table_kind = ref $table;
  if ($table_kind eq 'SCALAR') {
    $table = $$table;
  }
  elsif (!$table_kind) {
    $table = $self->_quote($table);
  }

  # must remain a statement modifier: the localisation has to last
  # until the end of this sub, not just of an enclosing block
  local $self->{rownum_hack_count} = 1
    if defined $rest[0] and $self->{limit_dialect} eq 'RowNum';

  @rest = (-1) unless defined $rest[0];

  # SQL::Abstract::Limit would barf on this anyway if we did not first
  croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;

  my ($sql, @where_bind) = $self->SUPER::select(
    $table, $self->_recurse_fields($fields), $where, $order, @rest
  );

  my $lock_clause = '';
  if (my $lock = $self->{for}) {
    if ($lock eq 'update') {
      $lock_clause = ' FOR UPDATE';
    }
    elsif ($lock eq 'shared') {
      $lock_clause = ' FOR SHARE';
    }
  }
  $sql .= $lock_clause;

  return $sql unless wantarray;
  return ($sql, @{ $self->{from_bind} }, @where_bind, @{ $self->{having_bind} });
}
136
# INSERT: quotes the table name unless it is a reference, then defers
# to the parent class with the remaining arguments untouched.
sub insert {
  my ($self, $table) = (shift, shift);

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::insert($table, @_);
}
143
# UPDATE: quotes the table name unless it is a reference, then defers
# to the parent class with the remaining arguments untouched.
sub update {
  my ($self, $table) = (shift, shift);

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::update($table, @_);
}
150
# DELETE: quotes the table name unless it is a reference, then defers
# to the parent class with the remaining arguments untouched.
sub delete {
  my ($self, $table) = (shift, shift);

  unless (ref $table) {
    $table = $self->_quote($table);
  }

  return $self->SUPER::delete($table, @_);
}
157
# Short-circuits limit emulation for the "no limit" marker.
#
# When the fourth argument after the invocant equals -1 (the value
# select() substitutes for a missing limit), the second argument is
# returned with the rendered _order_by() of the third appended.
# Otherwise all arguments go to the parent's _emulate_limit() unchanged.
sub _emulate_limit {
  my $self = shift;
  my (undef, $sql, $order, $rows) = @_;

  return $self->SUPER::_emulate_limit(@_)
    unless $rows == -1;

  return $sql . $self->_order_by($order);
}
166
# Renders a column specification into SQL text.
#
#   $fields - one of:
#               plain scalar     quoted via _quote()
#               scalar ref       used verbatim
#               array ref        each element rendered recursively and
#                                joined with ', '
#               hash ref         { function => argument }, rendered as
#                                FUNCTION( argument ); only the first
#                                key yielded by keys() is used
#               ref to array ref literal SQL with bind values, folded
#                                via _fold_sqlbind()
#   $params - optional hash ref; a true no_rownum_hack suppresses the
#             ' AS colN' aliases described below
#
# While $self->{rownum_hack_count} exists, every element of an array
# ref gets ' AS colN' appended, N being the running counter.
#
# Croaks on any other reference type and on { distinct => [...] }
# with more than one column; warns on every other use of the
# { distinct => ... } form.
sub _recurse_fields {
  my ($self, $fields, $params) = @_;

  my $kind = ref $fields;
  return $self->_quote($fields) if !$kind;
  return $$fields if $kind eq 'SCALAR';

  if ($kind eq 'ARRAY') {
    my @rendered;
    for my $field (@$fields) {
      my $field_sql = $self->_recurse_fields($field);

      my $wants_alias = exists $self->{rownum_hack_count}
        && !($params && $params->{no_rownum_hack});
      my $alias = $wants_alias
        ? ' AS col' . $self->{rownum_hack_count}++
        : '';

      push @rendered, $field_sql . $alias;
    }
    return join(', ', @rendered);
  }
  elsif ($kind eq 'HASH') {
    for my $func (keys %$fields) {
      if ($func eq 'distinct') {
        my $distinct_arg = $fields->{$func};
        my $is_list      = ref $distinct_arg eq 'ARRAY';

        if ($is_list && @$distinct_arg > 1) {
          my $cols = join ' ', @$distinct_arg;
          croak (
            'The select => { distinct => ... } syntax is not supported for multiple columns.'
           .' Instead please use { group_by => [ qw/' . $cols . '/ ] }'
           .' or { select => [ qw/' . $cols . '/ ], distinct => 1 }'
          );
        }
        else {
          $distinct_arg = $distinct_arg->[0] if $is_list;
          carp (
            'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
           ." please use { group_by => '${distinct_arg}' } or { select => '${distinct_arg}', distinct => 1 }"
          );
        }
      }

      # unconditional: the loop never gets past its first key
      return $self->_sqlcase($func)
        . '( ' . $self->_recurse_fields($fields->{$func}) . ' )';
    }
  }
  # Is the second check absolutely necessary?
  elsif ($kind eq 'REF' and ref($$fields) eq 'ARRAY') {
    return $self->_fold_sqlbind($fields);
  }
  else {
    croak($kind . qq{ unexpected in _recurse_fields()})
  }
}
211
# Renders the trailing GROUP BY / HAVING / ORDER BY part of a query.
#
# The first argument after the invocant decides what happens:
#
#   hash ref  - group_by, having and order_by keys are rendered in that
#               order (bind values from HAVING are pushed onto
#               $self->{having_bind}). If the hash carries any key
#               starting with -desc or -asc, the parent's _order_by()
#               result for the whole hash is returned instead.
#   scalar ref - ' order by ' followed by the literal text
#   non-empty array ref - each element is rendered recursively (with
#               any further arguments passed along), a leading
#               "ORDER BY " is stripped from each piece, and the pieces
#               are joined with ', '
#   anything else - all arguments go to the parent's _order_by()
sub _order_by {
  my $self = shift;

  my $spec = $_[0];
  my $kind = ref $spec;

  if ($kind eq 'HASH') {
    my $sql = '';

    if (defined $spec->{group_by}) {
      $sql = $self->_sqlcase(' group by ')
        . $self->_recurse_fields($spec->{group_by}, { no_rownum_hack => 1 });
    }

    if (defined $spec->{having}) {
      my ($having_sql, @having_bind) = $self->_recurse_where($spec->{having});
      push @{ $self->{having_bind} }, @having_bind;
      $sql .= $self->_sqlcase(' having ') . $having_sql;
    }

    if (defined $spec->{order_by}) {
      $sql .= $self->_order_by($spec->{order_by});
    }

    # a -desc / -asc key hands the whole hash to the parent class,
    # discarding everything assembled above
    if (grep { $_ =~ /^-(desc|asc)/i } keys %$spec) {
      return $self->SUPER::_order_by($spec);
    }

    return $sql;
  }

  if ($kind eq 'SCALAR') {
    return $self->_sqlcase(' order by ') . $$spec;
  }

  if ($kind eq 'ARRAY' && @$spec) {
    shift;                      # drop the array ref, keep any extras in @_
    my @entries = @$spec;

    my @pieces;
    for my $entry (@entries) {
      my $piece = $self->_order_by($entry, @_);
      $piece =~ s/^ ?ORDER BY //i;
      push @pieces, $piece;
    }

    return $self->_sqlcase(' order by ') . join(', ', @pieces);
  }

  return scalar $self->SUPER::_order_by(@_);
}
248
# Normalises $order through _resolve_order() before handing it to the
# parent's _order_directions().
sub _order_directions {
  my ($self, $order) = @_;

  my $resolved = $self->_resolve_order($order);

  return $self->SUPER::_order_directions($resolved);
}
253
# Flattens an order specification.
#
#   $order - may be a hash ref with a true order_by entry, in which
#            case that entry is what gets resolved
#
# A hash ref is expanded via _resolve_order_hash() into an array ref.
# In an array ref, scalar-ref elements are dereferenced, hash-ref
# elements are expanded via _resolve_order_hash(), and everything else
# is kept as is. Any other value is returned unchanged.
sub _resolve_order {
  my ($self, $order) = @_;

  if (ref $order eq 'HASH' and $order->{order_by}) {
    $order = $order->{order_by};
  }

  my $kind = ref $order;

  if ($kind eq 'HASH') {
    return [ $self->_resolve_order_hash($order) ];
  }

  if ($kind eq 'ARRAY') {
    my @flat;
    for my $item (@$order) {
      my $item_kind = ref $item;
      push @flat,
          $item_kind eq 'SCALAR' ? $$item
        : $item_kind eq 'HASH'   ? $self->_resolve_order_hash($item)
        :                          $item;
    }
    return \@flat;
  }

  return $order;
}
277
# Expands a { -asc|-desc => column(s) } hash into a list of
# "column direction" strings.
#
# Every key must start with -asc or -desc (case-insensitive); the
# direction is emitted exactly as it was spelled in the key. Values
# may be a plain scalar or an array ref of column names.
#
# Croaks on any other key and on any other kind of value.
sub _resolve_order_hash {
  my ($self, $order) = @_;

  my @resolved;
  for my $key (keys %$order) {
    my ($direction) = $key =~ /^-(desc|asc)/i
      or croak "$key is not a valid direction, use -asc or -desc";

    my $type = ref $order->{$key};

    if (!$type) {
      push @resolved, "$order->{$key} $direction";
    }
    elsif ($type eq 'ARRAY') {
      push @resolved, map { "$_ $direction" } @{ $order->{$key} };
    }
    else {
      croak "hash order_by can only contain Scalar or Array, not $type";
    }
  }

  return @resolved;
}
298
# Renders the FROM part of a statement.
#
#   array ref - a join specification, rendered by _recurse_from()
#   hash ref  - a { alias => source } pair, rendered by _make_as()
#   otherwise - returned untouched
sub _table {
  my ($self, $from) = @_;

  my $kind = ref $from;

  return $self->_recurse_from(@$from) if $kind eq 'ARRAY';
  return $self->_make_as($from)       if $kind eq 'HASH';

  # would love to quote here but _table ends up getting called
  # twice during an ->select without a limit clause due to
  # the way S::A::Limit->select works. should maybe consider
  # bypassing this and doing S::A::select($self, ...) in
  # our select method above. meantime, quoting shims have
  # been added to select/insert/update/delete here
  return $from;
}
314
# Renders a join specification into SQL text.
#
#   $from - the leading source, rendered by _make_as()
#   @join - array refs of [ $to, $on ]
#
# $to is either a source (rendered by _make_as()) or an array ref
# holding a nested join specification, which is rendered recursively
# inside parentheses. A -join_type entry in the source hash (or, for a
# nested specification, in its first element) is upper-cased and
# placed in front of JOIN. $on is rendered by _join_condition().
sub _recurse_from {
  my ($self, $from, @join) = @_;

  my @parts = ( $self->_make_as($from) );

  for my $join_spec (@join) {
    my ($to, $on) = @$join_spec;
    my $is_nested = ref($to) eq 'ARRAY';

    # check whether a join type exists
    my $type_holder = $is_nested ? $to->[0] : $to;
    my $has_type    = ref($type_holder) eq 'HASH'
      && exists $type_holder->{-join_type};

    push @parts, $has_type
      ? ' ' . uc($type_holder->{-join_type}) . ' JOIN '
      : ' JOIN ';

    push @parts, $is_nested
      ? ('(', $self->_recurse_from(@$to), ')')
      : $self->_make_as($to);

    push @parts, ' ON ', $self->_join_condition($on);
  }

  return join('', @parts);
}
341
# Splits a literal-SQL-with-binds structure, \[ $sql, @bind ].
#
# The bind values are appended to $self->{from_bind}; the structure
# passed in is not modified.
#
# Returns the SQL string.
sub _fold_sqlbind {
  my ($self, $sqlbind) = @_;

  # list assignment copies, so the caller's array stays intact
  my ($sql, @bind) = @{ $$sqlbind };

  push @{ $self->{from_bind} }, @bind;

  return $sql;
}
351
# Renders a { alias => source } hash as "source alias".
#
# Option keys (those starting with '-') are removed first via
# _skip_options(); of what remains, a single pair as yielded by each()
# is used. Each of the two items is rendered on its own: a scalar ref
# verbatim, a ref-to-ref through _fold_sqlbind(), anything else through
# _quote().
sub _make_as {
  my ($self, $from) = @_;

  # kept as a list: an empty hash must yield no items at all
  my @alias_and_source = each %{ $self->_skip_options($from) };

  my @rendered;
  for my $item (reverse @alias_and_source) {
    my $kind = ref $item;
    push @rendered,
        $kind eq 'SCALAR' ? $$item
      : $kind eq 'REF'    ? $self->_fold_sqlbind($item)
      :                     $self->_quote($item);
  }

  return join(' ', @rendered);
}
359
# Returns a new hash ref holding every entry of $hash whose key does
# not start with '-'. The hash passed in is left unchanged.
sub _skip_options {
  my ($self, $hash) = @_;

  my %clean = map  { $_ => $hash->{$_} }
              grep { !/^-/ }
              keys %$hash;

  return \%clean;
}
367
# Renders the ON condition of a join.
#
#   $cond - hash ref of { lhs => rhs }: a plain rhs is quoted as an
#           identifier and compared with '='; a scalar-ref rhs is
#           handed on as literal SQL. The resulting hash is rendered
#           by _recurse_where().
#         - array ref: every element is rendered recursively and the
#           results are joined with ' OR '.
#
# Croaks when a hash value is any reference other than a scalar ref,
# and when $cond is neither a hash ref nor an array ref.
sub _join_condition {
  my ($self, $cond) = @_;

  if (ref $cond eq 'HASH') {
    my %j;
    for my $lhs (keys %$cond) {
      my $v = $cond->{$lhs};
      if (ref $v) {
        # q{} rather than qq{}: the backslash has to survive, as the
        # hint shows the scalar-ref syntax \"..."
        croak (ref($v) . q{ reference arguments are not supported in JOINS - try using \"..." instead})
          if ref($v) ne 'SCALAR';
        $j{$lhs} = $v;
      }
      else {
        my $x = '= '.$self->_quote($v);
        $j{$lhs} = \$x;
      }
    }
    return scalar($self->_recurse_where(\%j));
  }
  elsif (ref $cond eq 'ARRAY') {
    return join(' OR ', map { $self->_join_condition($_) } @$cond);
  }
  else {
    # croak, like every other error raised in this file
    croak "Can't handle this yet!";
  }
}
390
# Quotes an identifier.
#
#   $label - the identifier; undef yields '', a bare '*' is returned
#            as is
#
# Without a quote_char the label is returned unchanged. When
# quote_char is an array ref, its first and second elements are used as
# the opening and closing quote: the label is wrapped as a whole if no
# name_sep is defined, otherwise it is split on name_sep and each
# component is wrapped separately. A '*' component is never wrapped,
# so "me.*" becomes e.g. "[me].*" and not the invalid "[me].[*]".
# Any other quote_char is handled by the parent class.
sub _quote {
  my ($self, $label) = @_;

  return ''     unless defined $label;
  return '*'    if $label eq '*';
  return $label unless $self->{quote_char};

  if (ref $self->{quote_char} eq 'ARRAY') {
    my ($open, $close) = @{ $self->{quote_char} }[0, 1];
    my $sep = $self->{name_sep};

    return $open . $label . $close
      unless defined $sep;

    return join(
      $sep,
      map { $_ eq '*' ? $_ : $open . $_ . $close }
        split(/\Q$sep\E/, $label)
    );
  }

  return $self->SUPER::_quote($label);
}
406
# Accessor for the limit dialect.
#
# With an argument, stores it as the new dialect and drops the syntax
# memoised by _find_syntax() in $self->{_cached_syntax}; without this
# a dialect change made after the first lookup would be ignored.
#
# Returns the current dialect.
sub limit_dialect {
  my $self = shift;

  if (@_) {
    $self->{limit_dialect} = shift;
    delete $self->{_cached_syntax};
  }

  return $self->{limit_dialect};
}
412
# Accessor for quote_char: stores the first argument when one is given
# (even an undefined one) and returns the current value.
sub quote_char {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{quote_char} = $new_value[0];
  }

  return $self->{quote_char};
}
418
# Accessor for name_sep: stores the first argument when one is given
# (even an undefined one) and returns the current value.
sub name_sep {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{name_sep} = $new_value[0];
  }

  return $self->{name_sep};
}
424
4251;
426
427__END__
428
429=pod
430
431=head1 NAME
432
855c6fd0 433DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
434and includes a number of DBIC-specific workarounds, not yet suitable for
435inclusion into SQLA proper.
6f4ddea1 436
437=head1 METHODS
438
439=head2 new
440
441Tries to determine limit dialect.
442
443=head2 select
444
445Quotes table names, handles "limit" dialects (e.g. where rownum between x and
446y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
447
448=head2 insert update delete
449
450Just quotes table names.
451
452=head2 limit_dialect
453
Specifies the dialect used for implementing an SQL "limit" clause for
restricting the number of query results returned. Valid values are: RowNum.
456
457See L<DBIx::Class::Storage::DBI/connect_info> for details.
458
459=head2 name_sep
460
461Character separating quoted table names.
462
463See L<DBIx::Class::Storage::DBI/connect_info> for details.
464
465=head2 quote_char
466
467Set to an array-ref to specify separate left and right quotes for table names.
468
469See L<DBIx::Class::Storage::DBI/connect_info> for details.
470
471=cut
472