Merge 'top_limit_tweaks' into 'top_limit_altfix'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8 use Scalar::Util();
9
10 sub new {
11   my $self = shift->SUPER::new(@_);
12
13   # This prevents the caching of $dbh in S::A::L, I believe
14   # If limit_dialect is a ref (like a $dbh), go ahead and replace
15   #   it with what it resolves to:
16   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
17     if ref $self->{limit_dialect};
18
19   $self;
20 }
21
22
23 # Some databases (sqlite) do not handle multiple parenthesis
24 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
25 # is interpreted as x IN 1 or something similar.
26 #
27 # Since we currently do not have access to the SQLA AST, resort
28 # to barbaric mutilation of any SQL supplied in literal form
29
30 sub _strip_outer_paren {
31   my ($self, $arg) = @_;
32
33   return $self->_SWITCH_refkind ($arg, {
34     ARRAYREFREF => sub {
35       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
36       return $arg;
37     },
38     SCALARREF => sub {
39       return \__strip_outer_paren( $$arg );
40     },
41     FALLBACK => sub {
42       return $arg
43     },
44   });
45 }
46
47 sub __strip_outer_paren {
48   my $sql = shift;
49
50   if ($sql and not ref $sql) {
51     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
52       $sql = $1;
53     }
54   }
55
56   return $sql;
57 }
58
59 sub _where_field_IN {
60   my ($self, $lhs, $op, $rhs) = @_;
61   $rhs = $self->_strip_outer_paren ($rhs);
62   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
63 }
64
65 sub _where_field_BETWEEN {
66   my ($self, $lhs, $op, $rhs) = @_;
67   $rhs = $self->_strip_outer_paren ($rhs);
68   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
69 }
70
71 # Slow but ANSI standard Limit/Offset support. DB2 uses this
72 sub _RowNumberOver {
73   my ($self, $sql, $order, $rows, $offset ) = @_;
74
75   $offset += 1;
76   my $last = $rows + $offset - 1;
77   my ( $order_by ) = $self->_order_by( $order );
78
79   $sql = <<"SQL";
80 SELECT * FROM
81 (
82    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
83       $sql
84       $order_by
85    ) Q1
86 ) Q2
87 WHERE ROW_NUM BETWEEN $offset AND $last
88
89 SQL
90
91   return $sql;
92 }
93
94 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
95 # but may have to switch to RowNumberOver one day
96 sub _Top {
97   my ( $self, $sql, $order, $rows, $offset ) = @_;
98
99   croak '$order supplied to SQLAHacks limit emulators must be a hash'
100     if (ref $order ne 'HASH');
101
102   $order = { %$order }; #copy
103
104   my $last = $rows + $offset;
105
106   my $req_order = $self->_order_by ($order->{order_by});
107   my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
108
109   delete $order->{$_} for qw/order_by _virtual_order_by/;
110   my $grpby_having = $self->_order_by ($order);
111
112   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
113
114   $sql =~ s/^\s*(SELECT|select)//;
115
116   $sql = <<"SQL";
117   SELECT * FROM
118   (
119     SELECT TOP $rows * FROM
120     (
121         SELECT TOP $last $sql $grpby_having $order_by_inner
122     ) AS foo
123     $order_by_outer
124   ) AS bar
125   $req_order
126
127 SQL
128     return $sql;
129 }
130
131
132
133 # While we're at it, this should make LIMIT queries more efficient,
134 #  without digging into things too deeply
135 sub _find_syntax {
136   my ($self, $syntax) = @_;
137   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
138 }
139
140 sub select {
141   my ($self, $table, $fields, $where, $order, @rest) = @_;
142   local $self->{having_bind} = [];
143   local $self->{from_bind} = [];
144
145   if (ref $table eq 'SCALAR') {
146     $table = $$table;
147   }
148   elsif (not ref $table) {
149     $table = $self->_quote($table);
150   }
151   local $self->{rownum_hack_count} = 1
152     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
153   @rest = (-1) unless defined $rest[0];
154   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
155     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
156   my ($sql, @where_bind) = $self->SUPER::select(
157     $table, $self->_recurse_fields($fields), $where, $order, @rest
158   );
159   $sql .= 
160     $self->{for} ?
161     (
162       $self->{for} eq 'update' ? ' FOR UPDATE' :
163       $self->{for} eq 'shared' ? ' FOR SHARE'  :
164       ''
165     ) :
166     ''
167   ;
168   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
169 }
170
171 sub insert {
172   my $self = shift;
173   my $table = shift;
174   $table = $self->_quote($table) unless ref($table);
175   $self->SUPER::insert($table, @_);
176 }
177
178 sub update {
179   my $self = shift;
180   my $table = shift;
181   $table = $self->_quote($table) unless ref($table);
182   $self->SUPER::update($table, @_);
183 }
184
185 sub delete {
186   my $self = shift;
187   my $table = shift;
188   $table = $self->_quote($table) unless ref($table);
189   $self->SUPER::delete($table, @_);
190 }
191
192 sub _emulate_limit {
193   my $self = shift;
194   if ($_[3] == -1) {
195     return $_[1].$self->_order_by($_[2]);
196   } else {
197     return $self->SUPER::_emulate_limit(@_);
198   }
199 }
200
201 sub _recurse_fields {
202   my ($self, $fields, $params) = @_;
203   my $ref = ref $fields;
204   return $self->_quote($fields) unless $ref;
205   return $$fields if $ref eq 'SCALAR';
206
207   if ($ref eq 'ARRAY') {
208     return join(', ', map {
209       $self->_recurse_fields($_)
210         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
211           ? ' AS col'.$self->{rownum_hack_count}++
212           : '')
213       } @$fields);
214   } elsif ($ref eq 'HASH') {
215     foreach my $func (keys %$fields) {
216       if ($func eq 'distinct') {
217         my $_fields = $fields->{$func};
218         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
219           croak (
220             'The select => { distinct => ... } syntax is not supported for multiple columns.'
221            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
222            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
223           );
224         }
225         else {
226           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
227           carp (
228             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
229            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
230           );
231         }
232       }
233       return $self->_sqlcase($func)
234         .'( '.$self->_recurse_fields($fields->{$func}).' )';
235     }
236   }
237   # Is the second check absolutely necessary?
238   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
239     return $self->_fold_sqlbind( $fields );
240   }
241   else {
242     croak($ref . qq{ unexpected in _recurse_fields()})
243   }
244 }
245
246 sub _order_by {
247   my $self = shift;
248   my $ret = '';
249   my @extra;
250   if (ref $_[0] eq 'HASH') {
251
252     if (defined $_[0]->{group_by}) {
253       $ret = $self->_sqlcase(' group by ')
254         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
255     }
256
257     if (defined $_[0]->{having}) {
258       my $frag;
259       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
260       push(@{$self->{having_bind}}, @extra);
261       $ret .= $self->_sqlcase(' having ').$frag;
262     }
263
264     if (defined $_[0]->{order_by}) {
265       $ret .= $self->_order_by($_[0]->{order_by});
266     }
267
268     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
269       return $self->SUPER::_order_by($_[0]);
270     }
271
272   } elsif (ref $_[0] eq 'SCALAR') {
273     $ret = $self->_sqlcase(' order by ').${ $_[0] };
274   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
275     my @order = map {
276       my $r = $self->_order_by($_, @_);
277       $r =~ s/^ ?ORDER BY //i;
278       $r || ();
279     } @{+shift};
280
281     $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
282
283   } else {
284     $ret = $self->SUPER::_order_by(@_);
285   }
286   return $ret;
287 }
288
289 sub _order_directions {
290   my ($self, $order) = @_;
291   return $self->SUPER::_order_directions( $self->_resolve_order($order) );
292 }
293
294 sub _resolve_order {
295   my ($self, $order) = @_;
296
297   if (ref $order eq 'HASH') {
298     $order = [$self->_resolve_order_hash($order)];
299   }
300   elsif (ref $order eq 'ARRAY') {
301     $order = [map {
302       if (ref ($_) eq 'SCALAR') {
303         $$_
304       }
305       elsif (ref ($_) eq 'HASH') {
306         $self->_resolve_order_hash($_)
307       }
308       else {
309         $_
310       }
311     }  @$order];
312   }
313
314   return $order;
315 }
316
317 sub _resolve_order_hash {
318   my ($self, $order) = @_;
319   my @new_order;
320   foreach my $key (keys %{ $order }) {
321     if ($key =~ /^-(desc|asc)/i ) {
322       my $direction = $1;
323       my $type = ref $order->{ $key };
324       if ($type eq 'ARRAY') {
325         push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
326       } elsif (!$type) {
327         push @new_order, "$order->{$key} $direction";
328       } else {
329         croak "hash order_by can only contain Scalar or Array, not $type";
330       }
331     } else {
332       croak "$key is not a valid direction, use -asc or -desc";
333     }
334   }
335
336   return @new_order;
337 }
338
339 sub _table {
340   my ($self, $from) = @_;
341   if (ref $from eq 'ARRAY') {
342     return $self->_recurse_from(@$from);
343   } elsif (ref $from eq 'HASH') {
344     return $self->_make_as($from);
345   } else {
346     return $from; # would love to quote here but _table ends up getting called
347                   # twice during an ->select without a limit clause due to
348                   # the way S::A::Limit->select works. should maybe consider
349                   # bypassing this and doing S::A::select($self, ...) in
350                   # our select method above. meantime, quoting shims have
351                   # been added to select/insert/update/delete here
352   }
353 }
354
355 sub _recurse_from {
356   my ($self, $from, @join) = @_;
357   my @sqlf;
358   push(@sqlf, $self->_make_as($from));
359   foreach my $j (@join) {
360     my ($to, $on) = @$j;
361
362     # check whether a join type exists
363     my $join_clause = '';
364     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
365     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
366       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
367     } else {
368       $join_clause = ' JOIN ';
369     }
370     push(@sqlf, $join_clause);
371
372     if (ref $to eq 'ARRAY') {
373       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
374     } else {
375       push(@sqlf, $self->_make_as($to));
376     }
377     push(@sqlf, ' ON ', $self->_join_condition($on));
378   }
379   return join('', @sqlf);
380 }
381
382 sub _fold_sqlbind {
383   my ($self, $sqlbind) = @_;
384
385   my @sqlbind = @$$sqlbind; # copy
386   my $sql = shift @sqlbind;
387   push @{$self->{from_bind}}, @sqlbind;
388
389   return $sql;
390 }
391
392 sub _make_as {
393   my ($self, $from) = @_;
394   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
395                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
396                         : $self->_quote($_))
397                        } reverse each %{$self->_skip_options($from)});
398 }
399
400 sub _skip_options {
401   my ($self, $hash) = @_;
402   my $clean_hash = {};
403   $clean_hash->{$_} = $hash->{$_}
404     for grep {!/^-/} keys %$hash;
405   return $clean_hash;
406 }
407
408 sub _join_condition {
409   my ($self, $cond) = @_;
410   if (ref $cond eq 'HASH') {
411     my %j;
412     for (keys %$cond) {
413       my $v = $cond->{$_};
414       if (ref $v) {
415         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
416             if ref($v) ne 'SCALAR';
417         $j{$_} = $v;
418       }
419       else {
420         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
421       }
422     };
423     return scalar($self->_recurse_where(\%j));
424   } elsif (ref $cond eq 'ARRAY') {
425     return join(' OR ', map { $self->_join_condition($_) } @$cond);
426   } else {
427     die "Can't handle this yet!";
428   }
429 }
430
431 sub _quote {
432   my ($self, $label) = @_;
433   return '' unless defined $label;
434   return "*" if $label eq '*';
435   return $label unless $self->{quote_char};
436   if(ref $self->{quote_char} eq "ARRAY"){
437     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
438       if !defined $self->{name_sep};
439     my $sep = $self->{name_sep};
440     return join($self->{name_sep},
441         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
442        split(/\Q$sep\E/,$label));
443   }
444   return $self->SUPER::_quote($label);
445 }
446
447 sub limit_dialect {
448     my $self = shift;
449     $self->{limit_dialect} = shift if @_;
450     return $self->{limit_dialect};
451 }
452
453 sub quote_char {
454     my $self = shift;
455     $self->{quote_char} = shift if @_;
456     return $self->{quote_char};
457 }
458
459 sub name_sep {
460     my $self = shift;
461     $self->{name_sep} = shift if @_;
462     return $self->{name_sep};
463 }
464
465 1;
466
467 __END__
468
469 =pod
470
471 =head1 NAME
472
473 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
474 and includes a number of DBIC-specific workarounds, not yet suitable for
475 inclusion into SQLA proper.
476
477 =head1 METHODS
478
479 =head2 new
480
481 Tries to determine limit dialect.
482
483 =head2 select
484
485 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
486 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
487
488 =head2 insert update delete
489
490 Just quotes table names.
491
492 =head2 limit_dialect
493
494 Specifies the dialect of used for implementing an SQL "limit" clause for
495 restricting the number of query results returned.  Valid values are: RowNum.
496
497 See L<DBIx::Class::Storage::DBI/connect_info> for details.
498
499 =head2 name_sep
500
501 Character separating quoted table names.
502
503 See L<DBIx::Class::Storage::DBI/connect_info> for details.
504
505 =head2 quote_char
506
507 Set to an array-ref to specify separate left and right quotes for table names.
508
509 See L<DBIx::Class::Storage::DBI/connect_info> for details.
510
511 =cut
512