Define how Top limit emulation should behave
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8 use Scalar::Util();
9
10 sub new {
11   my $self = shift->SUPER::new(@_);
12
13   # This prevents the caching of $dbh in S::A::L, I believe
14   # If limit_dialect is a ref (like a $dbh), go ahead and replace
15   #   it with what it resolves to:
16   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
17     if ref $self->{limit_dialect};
18
19   $self;
20 }
21
22
23 # Some databases (sqlite) do not handle multiple parenthesis
24 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
25 # is interpreted as x IN 1 or something similar.
26 #
27 # Since we currently do not have access to the SQLA AST, resort
28 # to barbaric mutilation of any SQL supplied in literal form
29
30 sub _strip_outer_paren {
31   my ($self, $arg) = @_;
32
33   return $self->_SWITCH_refkind ($arg, {
34     ARRAYREFREF => sub {
35       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
36       return $arg;
37     },
38     SCALARREF => sub {
39       return \__strip_outer_paren( $$arg );
40     },
41     FALLBACK => sub {
42       return $arg
43     },
44   });
45 }
46
47 sub __strip_outer_paren {
48   my $sql = shift;
49
50   if ($sql and not ref $sql) {
51     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
52       $sql = $1;
53     }
54   }
55
56   return $sql;
57 }
58
59 sub _where_field_IN {
60   my ($self, $lhs, $op, $rhs) = @_;
61   $rhs = $self->_strip_outer_paren ($rhs);
62   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
63 }
64
65 sub _where_field_BETWEEN {
66   my ($self, $lhs, $op, $rhs) = @_;
67   $rhs = $self->_strip_outer_paren ($rhs);
68   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
69 }
70
71
72
73 # DB2 is the only remaining DB using this. Even though we are not sure if
74 # RowNumberOver is still needed here (should be part of SQLA) leave the 
75 # code in place
76 sub _RowNumberOver {
77   my ($self, $sql, $order, $rows, $offset ) = @_;
78
79   $offset += 1;
80   my $last = $rows + $offset - 1;
81   my ( $order_by ) = $self->_order_by( $order );
82
83   $sql = <<"SQL";
84 SELECT * FROM
85 (
86    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
87       $sql
88       $order_by
89    ) Q1
90 ) Q2
91 WHERE ROW_NUM BETWEEN $offset AND $last
92
93 SQL
94
95   return $sql;
96 }
97
98
99 # While we're at it, this should make LIMIT queries more efficient,
100 #  without digging into things too deeply
101 sub _find_syntax {
102   my ($self, $syntax) = @_;
103   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
104 }
105
106 sub select {
107   my ($self, $table, $fields, $where, $order, @rest) = @_;
108   local $self->{having_bind} = [];
109   local $self->{from_bind} = [];
110
111   if (ref $table eq 'SCALAR') {
112     $table = $$table;
113   }
114   elsif (not ref $table) {
115     $table = $self->_quote($table);
116   }
117   local $self->{rownum_hack_count} = 1
118     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
119   @rest = (-1) unless defined $rest[0];
120   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
121     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
122   my ($sql, @where_bind) = $self->SUPER::select(
123     $table, $self->_recurse_fields($fields), $where, $order, @rest
124   );
125   $sql .= 
126     $self->{for} ?
127     (
128       $self->{for} eq 'update' ? ' FOR UPDATE' :
129       $self->{for} eq 'shared' ? ' FOR SHARE'  :
130       ''
131     ) :
132     ''
133   ;
134   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
135 }
136
137 sub insert {
138   my $self = shift;
139   my $table = shift;
140   $table = $self->_quote($table) unless ref($table);
141   $self->SUPER::insert($table, @_);
142 }
143
144 sub update {
145   my $self = shift;
146   my $table = shift;
147   $table = $self->_quote($table) unless ref($table);
148   $self->SUPER::update($table, @_);
149 }
150
151 sub delete {
152   my $self = shift;
153   my $table = shift;
154   $table = $self->_quote($table) unless ref($table);
155   $self->SUPER::delete($table, @_);
156 }
157
158 sub _emulate_limit {
159   my $self = shift;
160   if ($_[3] == -1) {
161     return $_[1].$self->_order_by($_[2]);
162   } else {
163     return $self->SUPER::_emulate_limit(@_);
164   }
165 }
166
167 sub _recurse_fields {
168   my ($self, $fields, $params) = @_;
169   my $ref = ref $fields;
170   return $self->_quote($fields) unless $ref;
171   return $$fields if $ref eq 'SCALAR';
172
173   if ($ref eq 'ARRAY') {
174     return join(', ', map {
175       $self->_recurse_fields($_)
176         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
177           ? ' AS col'.$self->{rownum_hack_count}++
178           : '')
179       } @$fields);
180   } elsif ($ref eq 'HASH') {
181     foreach my $func (keys %$fields) {
182       if ($func eq 'distinct') {
183         my $_fields = $fields->{$func};
184         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
185           croak (
186             'The select => { distinct => ... } syntax is not supported for multiple columns.'
187            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
188            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
189           );
190         }
191         else {
192           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
193           carp (
194             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
195            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
196           );
197         }
198       }
199       return $self->_sqlcase($func)
200         .'( '.$self->_recurse_fields($fields->{$func}).' )';
201     }
202   }
203   # Is the second check absolutely necessary?
204   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
205     return $self->_fold_sqlbind( $fields );
206   }
207   else {
208     croak($ref . qq{ unexpected in _recurse_fields()})
209   }
210 }
211
212 sub _order_by {
213   my $self = shift;
214   my $ret = '';
215   my @extra;
216   if (ref $_[0] eq 'HASH') {
217     if (defined $_[0]->{group_by}) {
218       $ret = $self->_sqlcase(' group by ')
219         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
220     }
221     if (defined $_[0]->{having}) {
222       my $frag;
223       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
224       push(@{$self->{having_bind}}, @extra);
225       $ret .= $self->_sqlcase(' having ').$frag;
226     }
227     if (defined $_[0]->{order_by}) {
228       $ret .= $self->_order_by($_[0]->{order_by});
229     }
230     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
231       return $self->SUPER::_order_by($_[0]);
232     }
233   } elsif (ref $_[0] eq 'SCALAR') {
234     $ret = $self->_sqlcase(' order by ').${ $_[0] };
235   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
236     my @order = @{+shift};
237     $ret = $self->_sqlcase(' order by ')
238           .join(', ', map {
239                         my $r = $self->_order_by($_, @_);
240                         $r =~ s/^ ?ORDER BY //i;
241                         $r;
242                       } @order);
243   } else {
244     $ret = $self->SUPER::_order_by(@_);
245   }
246   return $ret;
247 }
248
249 sub _order_directions {
250   my ($self, $order) = @_;
251   return $self->SUPER::_order_directions( $self->_resolve_order($order) );
252 }
253
254 sub _resolve_order {
255   my ($self, $order) = @_;
256   $order = $order->{order_by} if (ref $order eq 'HASH' and $order->{order_by});
257
258   if (ref $order eq 'HASH') {
259     $order = [$self->_resolve_order_hash($order)];
260   }
261   elsif (ref $order eq 'ARRAY') {
262     $order = [map {
263       if (ref ($_) eq 'SCALAR') {
264         $$_
265       }
266       elsif (ref ($_) eq 'HASH') {
267         $self->_resolve_order_hash($_)
268       }
269       else {
270         $_
271       }
272     }  @$order];
273   }
274
275   return $order;
276 }
277
278 sub _resolve_order_hash {
279   my ($self, $order) = @_;
280   my @new_order;
281   foreach my $key (keys %{ $order }) {
282     if ($key =~ /^-(desc|asc)/i ) {
283       my $direction = $1;
284       my $type = ref $order->{ $key };
285       if ($type eq 'ARRAY') {
286         push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
287       } elsif (!$type) {
288         push @new_order, "$order->{$key} $direction";
289       } else {
290         croak "hash order_by can only contain Scalar or Array, not $type";
291       }
292     } else {
293       croak "$key is not a valid direction, use -asc or -desc";
294     }
295   }
296   return @new_order;
297 }
298
299 sub _table {
300   my ($self, $from) = @_;
301   if (ref $from eq 'ARRAY') {
302     return $self->_recurse_from(@$from);
303   } elsif (ref $from eq 'HASH') {
304     return $self->_make_as($from);
305   } else {
306     return $from; # would love to quote here but _table ends up getting called
307                   # twice during an ->select without a limit clause due to
308                   # the way S::A::Limit->select works. should maybe consider
309                   # bypassing this and doing S::A::select($self, ...) in
310                   # our select method above. meantime, quoting shims have
311                   # been added to select/insert/update/delete here
312   }
313 }
314
315 sub _recurse_from {
316   my ($self, $from, @join) = @_;
317   my @sqlf;
318   push(@sqlf, $self->_make_as($from));
319   foreach my $j (@join) {
320     my ($to, $on) = @$j;
321
322     # check whether a join type exists
323     my $join_clause = '';
324     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
325     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
326       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
327     } else {
328       $join_clause = ' JOIN ';
329     }
330     push(@sqlf, $join_clause);
331
332     if (ref $to eq 'ARRAY') {
333       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
334     } else {
335       push(@sqlf, $self->_make_as($to));
336     }
337     push(@sqlf, ' ON ', $self->_join_condition($on));
338   }
339   return join('', @sqlf);
340 }
341
342 sub _fold_sqlbind {
343   my ($self, $sqlbind) = @_;
344
345   my @sqlbind = @$$sqlbind; # copy
346   my $sql = shift @sqlbind;
347   push @{$self->{from_bind}}, @sqlbind;
348
349   return $sql;
350 }
351
352 sub _make_as {
353   my ($self, $from) = @_;
354   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
355                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
356                         : $self->_quote($_))
357                        } reverse each %{$self->_skip_options($from)});
358 }
359
360 sub _skip_options {
361   my ($self, $hash) = @_;
362   my $clean_hash = {};
363   $clean_hash->{$_} = $hash->{$_}
364     for grep {!/^-/} keys %$hash;
365   return $clean_hash;
366 }
367
368 sub _join_condition {
369   my ($self, $cond) = @_;
370   if (ref $cond eq 'HASH') {
371     my %j;
372     for (keys %$cond) {
373       my $v = $cond->{$_};
374       if (ref $v) {
375         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
376             if ref($v) ne 'SCALAR';
377         $j{$_} = $v;
378       }
379       else {
380         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
381       }
382     };
383     return scalar($self->_recurse_where(\%j));
384   } elsif (ref $cond eq 'ARRAY') {
385     return join(' OR ', map { $self->_join_condition($_) } @$cond);
386   } else {
387     die "Can't handle this yet!";
388   }
389 }
390
391 sub _quote {
392   my ($self, $label) = @_;
393   return '' unless defined $label;
394   return "*" if $label eq '*';
395   return $label unless $self->{quote_char};
396   if(ref $self->{quote_char} eq "ARRAY"){
397     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
398       if !defined $self->{name_sep};
399     my $sep = $self->{name_sep};
400     return join($self->{name_sep},
401         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
402        split(/\Q$sep\E/,$label));
403   }
404   return $self->SUPER::_quote($label);
405 }
406
407 sub limit_dialect {
408     my $self = shift;
409     $self->{limit_dialect} = shift if @_;
410     return $self->{limit_dialect};
411 }
412
413 sub quote_char {
414     my $self = shift;
415     $self->{quote_char} = shift if @_;
416     return $self->{quote_char};
417 }
418
419 sub name_sep {
420     my $self = shift;
421     $self->{name_sep} = shift if @_;
422     return $self->{name_sep};
423 }
424
425 1;
426
427 __END__
428
429 =pod
430
431 =head1 NAME
432
433 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
434 and includes a number of DBIC-specific workarounds, not yet suitable for
435 inclusion into SQLA proper.
436
437 =head1 METHODS
438
439 =head2 new
440
441 Tries to determine limit dialect.
442
443 =head2 select
444
445 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
446 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
447
448 =head2 insert update delete
449
450 Just quotes table names.
451
452 =head2 limit_dialect
453
454 Specifies the dialect of used for implementing an SQL "limit" clause for
455 restricting the number of query results returned.  Valid values are: RowNum.
456
457 See L<DBIx::Class::Storage::DBI/connect_info> for details.
458
459 =head2 name_sep
460
461 Character separating quoted table names.
462
463 See L<DBIx::Class::Storage::DBI/connect_info> for details.
464
465 =head2 quote_char
466
467 Set to an array-ref to specify separate left and right quotes for table names.
468
469 See L<DBIx::Class::Storage::DBI/connect_info> for details.
470
471 =cut
472