f05970baba13939432f2838fff36ac3e9791bfb2
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8
9 sub new {
10   my $self = shift->SUPER::new(@_);
11
12   # This prevents the caching of $dbh in S::A::L, I believe
13   # If limit_dialect is a ref (like a $dbh), go ahead and replace
14   #   it with what it resolves to:
15   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
16     if ref $self->{limit_dialect};
17
18   $self;
19 }
20
21
22 # Some databases (sqlite) do not handle multiple parenthesis
23 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
24 # is interpreted as x IN 1 or something similar.
25 #
26 # Since we currently do not have access to the SQLA AST, resort
27 # to barbaric mutilation of any SQL supplied in literal form
28
29 sub _strip_outer_paren {
30   my ($self, $arg) = @_;
31
32   return $self->_SWITCH_refkind ($arg, {
33     ARRAYREFREF => sub {
34       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
35       return $arg;
36     },
37     SCALARREF => sub {
38       return \__strip_outer_paren( $$arg );
39     },
40     FALLBACK => sub {
41       return $arg
42     },
43   });
44 }
45
46 sub __strip_outer_paren {
47   my $sql = shift;
48
49   if ($sql and not ref $sql) {
50     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
51       $sql = $1;
52     }
53   }
54
55   return $sql;
56 }
57
58 sub _where_field_IN {
59   my ($self, $lhs, $op, $rhs) = @_;
60   $rhs = $self->_strip_outer_paren ($rhs);
61   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
62 }
63
64 sub _where_field_BETWEEN {
65   my ($self, $lhs, $op, $rhs) = @_;
66   $rhs = $self->_strip_outer_paren ($rhs);
67   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
68 }
69
70 # Slow but ANSI standard Limit/Offset support. DB2 uses this
71 sub _RowNumberOver {
72   my ($self, $sql, $order, $rows, $offset ) = @_;
73
74   $offset += 1;
75   my $last = $rows + $offset - 1;
76   my ( $order_by ) = $self->_order_by( $order );
77
78   $sql = <<"SQL";
79 SELECT * FROM
80 (
81    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
82       $sql
83       $order_by
84    ) Q1
85 ) Q2
86 WHERE ROW_NUM BETWEEN $offset AND $last
87
88 SQL
89
90   return $sql;
91 }
92
93 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
94 # but may have to switch to RowNumberOver one day
95 sub _Top {
96   my ( $self, $sql, $order, $rows, $offset ) = @_;
97
98   croak '$order supplied to SQLAHacks limit emulators must be a hash'
99     if (ref $order ne 'HASH');
100
101   $order = { %$order }; #copy
102
103   my $last = $rows + $offset;
104
105   my $req_order = $self->_order_by ($order->{order_by});
106
107   my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
108
109   delete $order->{$_} for qw/order_by _virtual_order_by/;
110   my $grpby_having = $self->_order_by ($order);
111
112   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
113
114   $sql =~ s/^\s*(SELECT|select)//;
115
116   $sql = <<"SQL";
117   SELECT * FROM
118   (
119     SELECT TOP $rows * FROM
120     (
121         SELECT TOP $last $sql $grpby_having $order_by_inner
122     ) AS foo
123     $order_by_outer
124   ) AS bar
125   $req_order
126
127 SQL
128     return $sql;
129 }
130
131
132
133 # While we're at it, this should make LIMIT queries more efficient,
134 #  without digging into things too deeply
135 sub _find_syntax {
136   my ($self, $syntax) = @_;
137   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
138 }
139
140 sub select {
141   my ($self, $table, $fields, $where, $order, @rest) = @_;
142
143   $self->{"${_}_bind"} = [] for (qw/having from order/);
144
145   if (ref $table eq 'SCALAR') {
146     $table = $$table;
147   }
148   elsif (not ref $table) {
149     $table = $self->_quote($table);
150   }
151   local $self->{rownum_hack_count} = 1
152     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
153   @rest = (-1) unless defined $rest[0];
154   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
155     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
156   my ($sql, @where_bind) = $self->SUPER::select(
157     $table, $self->_recurse_fields($fields), $where, $order, @rest
158   );
159   $sql .= 
160     $self->{for} ?
161     (
162       $self->{for} eq 'update' ? ' FOR UPDATE' :
163       $self->{for} eq 'shared' ? ' FOR SHARE'  :
164       ''
165     ) :
166     ''
167   ;
168   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
169 }
170
171 sub insert {
172   my $self = shift;
173   my $table = shift;
174   $table = $self->_quote($table) unless ref($table);
175   $self->SUPER::insert($table, @_);
176 }
177
178 sub update {
179   my $self = shift;
180   my $table = shift;
181   $table = $self->_quote($table) unless ref($table);
182   $self->SUPER::update($table, @_);
183 }
184
185 sub delete {
186   my $self = shift;
187   my $table = shift;
188   $table = $self->_quote($table) unless ref($table);
189   $self->SUPER::delete($table, @_);
190 }
191
192 sub _emulate_limit {
193   my $self = shift;
194   if ($_[3] == -1) {
195     return $_[1].$self->_order_by($_[2]);
196   } else {
197     return $self->SUPER::_emulate_limit(@_);
198   }
199 }
200
201 sub _recurse_fields {
202   my ($self, $fields, $params) = @_;
203   my $ref = ref $fields;
204   return $self->_quote($fields) unless $ref;
205   return $$fields if $ref eq 'SCALAR';
206
207   if ($ref eq 'ARRAY') {
208     return join(', ', map {
209       $self->_recurse_fields($_)
210         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
211           ? ' AS col'.$self->{rownum_hack_count}++
212           : '')
213       } @$fields);
214   } elsif ($ref eq 'HASH') {
215     foreach my $func (keys %$fields) {
216       if ($func eq 'distinct') {
217         my $_fields = $fields->{$func};
218         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
219           croak (
220             'The select => { distinct => ... } syntax is not supported for multiple columns.'
221            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
222            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
223           );
224         }
225         else {
226           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
227           carp (
228             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
229            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
230           );
231         }
232       }
233       return $self->_sqlcase($func)
234         .'( '.$self->_recurse_fields($fields->{$func}).' )';
235     }
236   }
237   # Is the second check absolutely necessary?
238   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
239     return $self->_fold_sqlbind( $fields );
240   }
241   else {
242     croak($ref . qq{ unexpected in _recurse_fields()})
243   }
244 }
245
246 sub _order_by {
247   my ($self, $arg) = @_;
248
249   if (ref $arg eq 'HASH' and keys %$arg and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
250
251     my $ret = '';
252
253     if (defined $arg->{group_by}) {
254       $ret = $self->_sqlcase(' group by ')
255         .$self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 });
256     }
257
258     if (defined $arg->{having}) {
259       my ($frag, @bind) = $self->_recurse_where($arg->{having});
260       push(@{$self->{having_bind}}, @bind);
261       $ret .= $self->_sqlcase(' having ').$frag;
262     }
263
264     if (defined $arg->{order_by}) {
265       my ($frag, @bind) = $self->SUPER::_order_by($arg->{order_by});
266       push(@{$self->{order_bind}}, @bind);
267       $ret .= $frag;
268     }
269
270     return $ret;
271   }
272   else {
273     my ($sql, @bind) = $self->SUPER::_order_by ($arg);
274     push(@{$self->{order_bind}}, @bind);
275     return $sql;
276   }
277 }
278
279 sub _order_directions {
280   my ($self, $order) = @_;
281
282   # strip bind values - none of the current _order_directions users support them
283   return $self->SUPER::_order_directions( [ map
284     { ref $_ ? $_->[0] : $_ }
285     $self->_order_by_chunks ($order)
286   ]);
287 }
288
289 sub _table {
290   my ($self, $from) = @_;
291   if (ref $from eq 'ARRAY') {
292     return $self->_recurse_from(@$from);
293   } elsif (ref $from eq 'HASH') {
294     return $self->_make_as($from);
295   } else {
296     return $from; # would love to quote here but _table ends up getting called
297                   # twice during an ->select without a limit clause due to
298                   # the way S::A::Limit->select works. should maybe consider
299                   # bypassing this and doing S::A::select($self, ...) in
300                   # our select method above. meantime, quoting shims have
301                   # been added to select/insert/update/delete here
302   }
303 }
304
305 sub _recurse_from {
306   my ($self, $from, @join) = @_;
307   my @sqlf;
308   push(@sqlf, $self->_make_as($from));
309   foreach my $j (@join) {
310     my ($to, $on) = @$j;
311
312     # check whether a join type exists
313     my $join_clause = '';
314     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
315     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
316       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
317     } else {
318       $join_clause = ' JOIN ';
319     }
320     push(@sqlf, $join_clause);
321
322     if (ref $to eq 'ARRAY') {
323       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
324     } else {
325       push(@sqlf, $self->_make_as($to));
326     }
327     push(@sqlf, ' ON ', $self->_join_condition($on));
328   }
329   return join('', @sqlf);
330 }
331
332 sub _fold_sqlbind {
333   my ($self, $sqlbind) = @_;
334
335   my @sqlbind = @$$sqlbind; # copy
336   my $sql = shift @sqlbind;
337   push @{$self->{from_bind}}, @sqlbind;
338
339   return $sql;
340 }
341
342 sub _make_as {
343   my ($self, $from) = @_;
344   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
345                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
346                         : $self->_quote($_))
347                        } reverse each %{$self->_skip_options($from)});
348 }
349
350 sub _skip_options {
351   my ($self, $hash) = @_;
352   my $clean_hash = {};
353   $clean_hash->{$_} = $hash->{$_}
354     for grep {!/^-/} keys %$hash;
355   return $clean_hash;
356 }
357
358 sub _join_condition {
359   my ($self, $cond) = @_;
360   if (ref $cond eq 'HASH') {
361     my %j;
362     for (keys %$cond) {
363       my $v = $cond->{$_};
364       if (ref $v) {
365         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
366             if ref($v) ne 'SCALAR';
367         $j{$_} = $v;
368       }
369       else {
370         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
371       }
372     };
373     return scalar($self->_recurse_where(\%j));
374   } elsif (ref $cond eq 'ARRAY') {
375     return join(' OR ', map { $self->_join_condition($_) } @$cond);
376   } else {
377     die "Can't handle this yet!";
378   }
379 }
380
381 sub _quote {
382   my ($self, $label) = @_;
383   return '' unless defined $label;
384   return "*" if $label eq '*';
385   return $label unless $self->{quote_char};
386   if(ref $self->{quote_char} eq "ARRAY"){
387     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
388       if !defined $self->{name_sep};
389     my $sep = $self->{name_sep};
390     return join($self->{name_sep},
391         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
392        split(/\Q$sep\E/,$label));
393   }
394   return $self->SUPER::_quote($label);
395 }
396
397 sub limit_dialect {
398     my $self = shift;
399     $self->{limit_dialect} = shift if @_;
400     return $self->{limit_dialect};
401 }
402
403 sub quote_char {
404     my $self = shift;
405     $self->{quote_char} = shift if @_;
406     return $self->{quote_char};
407 }
408
409 sub name_sep {
410     my $self = shift;
411     $self->{name_sep} = shift if @_;
412     return $self->{name_sep};
413 }
414
415 1;
416
417 __END__
418
419 =pod
420
421 =head1 NAME
422
423 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
424 and includes a number of DBIC-specific workarounds, not yet suitable for
425 inclusion into SQLA proper.
426
427 =head1 METHODS
428
429 =head2 new
430
431 Tries to determine limit dialect.
432
433 =head2 select
434
435 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
436 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
437
438 =head2 insert update delete
439
440 Just quotes table names.
441
442 =head2 limit_dialect
443
444 Specifies the dialect of used for implementing an SQL "limit" clause for
445 restricting the number of query results returned.  Valid values are: RowNum.
446
447 See L<DBIx::Class::Storage::DBI/connect_info> for details.
448
449 =head2 name_sep
450
451 Character separating quoted table names.
452
453 See L<DBIx::Class::Storage::DBI/connect_info> for details.
454
455 =head2 quote_char
456
457 Set to an array-ref to specify separate left and right quotes for table names.
458
459 See L<DBIx::Class::Storage::DBI/connect_info> for details.
460
461 =cut
462