Merge 'top_limit_altfix' into 'trunk'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8
9 sub new {
10   my $self = shift->SUPER::new(@_);
11
12   # This prevents the caching of $dbh in S::A::L, I believe
13   # If limit_dialect is a ref (like a $dbh), go ahead and replace
14   #   it with what it resolves to:
15   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
16     if ref $self->{limit_dialect};
17
18   $self;
19 }
20
21
22 # Some databases (sqlite) do not handle multiple parenthesis
23 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
24 # is interpreted as x IN 1 or something similar.
25 #
26 # Since we currently do not have access to the SQLA AST, resort
27 # to barbaric mutilation of any SQL supplied in literal form
28
29 sub _strip_outer_paren {
30   my ($self, $arg) = @_;
31
32   return $self->_SWITCH_refkind ($arg, {
33     ARRAYREFREF => sub {
34       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
35       return $arg;
36     },
37     SCALARREF => sub {
38       return \__strip_outer_paren( $$arg );
39     },
40     FALLBACK => sub {
41       return $arg
42     },
43   });
44 }
45
46 sub __strip_outer_paren {
47   my $sql = shift;
48
49   if ($sql and not ref $sql) {
50     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
51       $sql = $1;
52     }
53   }
54
55   return $sql;
56 }
57
58 sub _where_field_IN {
59   my ($self, $lhs, $op, $rhs) = @_;
60   $rhs = $self->_strip_outer_paren ($rhs);
61   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
62 }
63
64 sub _where_field_BETWEEN {
65   my ($self, $lhs, $op, $rhs) = @_;
66   $rhs = $self->_strip_outer_paren ($rhs);
67   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
68 }
69
70 # Slow but ANSI standard Limit/Offset support. DB2 uses this
71 sub _RowNumberOver {
72   my ($self, $sql, $order, $rows, $offset ) = @_;
73
74   $offset += 1;
75   my $last = $rows + $offset - 1;
76   my ( $order_by ) = $self->_order_by( $order );
77
78   $sql = <<"SQL";
79 SELECT * FROM
80 (
81    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
82       $sql
83       $order_by
84    ) Q1
85 ) Q2
86 WHERE ROW_NUM BETWEEN $offset AND $last
87
88 SQL
89
90   return $sql;
91 }
92
93 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
94 # but may have to switch to RowNumberOver one day
95 sub _Top {
96   my ( $self, $sql, $order, $rows, $offset ) = @_;
97
98   croak '$order supplied to SQLAHacks limit emulators must be a hash'
99     if (ref $order ne 'HASH');
100
101   $order = { %$order }; #copy
102
103   my $last = $rows + $offset;
104
105   my $req_order = $self->_order_by ($order->{order_by});
106   my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
107
108   delete $order->{$_} for qw/order_by _virtual_order_by/;
109   my $grpby_having = $self->_order_by ($order);
110
111   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
112
113   $sql =~ s/^\s*(SELECT|select)//;
114
115   $sql = <<"SQL";
116   SELECT * FROM
117   (
118     SELECT TOP $rows * FROM
119     (
120         SELECT TOP $last $sql $grpby_having $order_by_inner
121     ) AS foo
122     $order_by_outer
123   ) AS bar
124   $req_order
125
126 SQL
127     return $sql;
128 }
129
130
131
132 # While we're at it, this should make LIMIT queries more efficient,
133 #  without digging into things too deeply
134 sub _find_syntax {
135   my ($self, $syntax) = @_;
136   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
137 }
138
139 sub select {
140   my ($self, $table, $fields, $where, $order, @rest) = @_;
141   local $self->{having_bind} = [];
142   local $self->{from_bind} = [];
143
144   if (ref $table eq 'SCALAR') {
145     $table = $$table;
146   }
147   elsif (not ref $table) {
148     $table = $self->_quote($table);
149   }
150   local $self->{rownum_hack_count} = 1
151     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
152   @rest = (-1) unless defined $rest[0];
153   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
154     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
155   my ($sql, @where_bind) = $self->SUPER::select(
156     $table, $self->_recurse_fields($fields), $where, $order, @rest
157   );
158   $sql .= 
159     $self->{for} ?
160     (
161       $self->{for} eq 'update' ? ' FOR UPDATE' :
162       $self->{for} eq 'shared' ? ' FOR SHARE'  :
163       ''
164     ) :
165     ''
166   ;
167   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
168 }
169
170 sub insert {
171   my $self = shift;
172   my $table = shift;
173   $table = $self->_quote($table) unless ref($table);
174   $self->SUPER::insert($table, @_);
175 }
176
177 sub update {
178   my $self = shift;
179   my $table = shift;
180   $table = $self->_quote($table) unless ref($table);
181   $self->SUPER::update($table, @_);
182 }
183
184 sub delete {
185   my $self = shift;
186   my $table = shift;
187   $table = $self->_quote($table) unless ref($table);
188   $self->SUPER::delete($table, @_);
189 }
190
191 sub _emulate_limit {
192   my $self = shift;
193   if ($_[3] == -1) {
194     return $_[1].$self->_order_by($_[2]);
195   } else {
196     return $self->SUPER::_emulate_limit(@_);
197   }
198 }
199
200 sub _recurse_fields {
201   my ($self, $fields, $params) = @_;
202   my $ref = ref $fields;
203   return $self->_quote($fields) unless $ref;
204   return $$fields if $ref eq 'SCALAR';
205
206   if ($ref eq 'ARRAY') {
207     return join(', ', map {
208       $self->_recurse_fields($_)
209         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
210           ? ' AS col'.$self->{rownum_hack_count}++
211           : '')
212       } @$fields);
213   } elsif ($ref eq 'HASH') {
214     foreach my $func (keys %$fields) {
215       if ($func eq 'distinct') {
216         my $_fields = $fields->{$func};
217         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
218           croak (
219             'The select => { distinct => ... } syntax is not supported for multiple columns.'
220            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
221            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
222           );
223         }
224         else {
225           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
226           carp (
227             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
228            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
229           );
230         }
231       }
232       return $self->_sqlcase($func)
233         .'( '.$self->_recurse_fields($fields->{$func}).' )';
234     }
235   }
236   # Is the second check absolutely necessary?
237   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
238     return $self->_fold_sqlbind( $fields );
239   }
240   else {
241     croak($ref . qq{ unexpected in _recurse_fields()})
242   }
243 }
244
245 sub _order_by {
246   my $self = shift;
247   my $ret = '';
248   my @extra;
249   if (ref $_[0] eq 'HASH') {
250
251     if (defined $_[0]->{group_by}) {
252       $ret = $self->_sqlcase(' group by ')
253         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
254     }
255
256     if (defined $_[0]->{having}) {
257       my $frag;
258       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
259       push(@{$self->{having_bind}}, @extra);
260       $ret .= $self->_sqlcase(' having ').$frag;
261     }
262
263     if (defined $_[0]->{order_by}) {
264       $ret .= $self->_order_by($_[0]->{order_by});
265     }
266
267     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
268       return $self->SUPER::_order_by($_[0]);
269     }
270
271   } elsif (ref $_[0] eq 'SCALAR') {
272     $ret = $self->_sqlcase(' order by ').${ $_[0] };
273   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
274     my @order = map {
275       my $r = $self->_order_by($_, @_);
276       $r =~ s/^ ?ORDER BY //i;
277       $r || ();
278     } @{+shift};
279
280     $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
281
282   } else {
283     $ret = $self->SUPER::_order_by(@_);
284   }
285   return $ret;
286 }
287
288 sub _order_directions {
289   my ($self, $order) = @_;
290   return $self->SUPER::_order_directions( $self->_resolve_order($order) );
291 }
292
293 sub _resolve_order {
294   my ($self, $order) = @_;
295
296   if (ref $order eq 'HASH') {
297     $order = [$self->_resolve_order_hash($order)];
298   }
299   elsif (ref $order eq 'ARRAY') {
300     $order = [map {
301       if (ref ($_) eq 'SCALAR') {
302         $$_
303       }
304       elsif (ref ($_) eq 'HASH') {
305         $self->_resolve_order_hash($_)
306       }
307       else {
308         $_
309       }
310     }  @$order];
311   }
312
313   return $order;
314 }
315
316 sub _resolve_order_hash {
317   my ($self, $order) = @_;
318   my @new_order;
319   foreach my $key (keys %{ $order }) {
320     if ($key =~ /^-(desc|asc)/i ) {
321       my $direction = $1;
322       my $type = ref $order->{ $key };
323       if ($type eq 'ARRAY') {
324         push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
325       } elsif (!$type) {
326         push @new_order, "$order->{$key} $direction";
327       } else {
328         croak "hash order_by can only contain Scalar or Array, not $type";
329       }
330     } else {
331       croak "$key is not a valid direction, use -asc or -desc";
332     }
333   }
334
335   return @new_order;
336 }
337
338 sub _table {
339   my ($self, $from) = @_;
340   if (ref $from eq 'ARRAY') {
341     return $self->_recurse_from(@$from);
342   } elsif (ref $from eq 'HASH') {
343     return $self->_make_as($from);
344   } else {
345     return $from; # would love to quote here but _table ends up getting called
346                   # twice during an ->select without a limit clause due to
347                   # the way S::A::Limit->select works. should maybe consider
348                   # bypassing this and doing S::A::select($self, ...) in
349                   # our select method above. meantime, quoting shims have
350                   # been added to select/insert/update/delete here
351   }
352 }
353
354 sub _recurse_from {
355   my ($self, $from, @join) = @_;
356   my @sqlf;
357   push(@sqlf, $self->_make_as($from));
358   foreach my $j (@join) {
359     my ($to, $on) = @$j;
360
361     # check whether a join type exists
362     my $join_clause = '';
363     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
364     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
365       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
366     } else {
367       $join_clause = ' JOIN ';
368     }
369     push(@sqlf, $join_clause);
370
371     if (ref $to eq 'ARRAY') {
372       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
373     } else {
374       push(@sqlf, $self->_make_as($to));
375     }
376     push(@sqlf, ' ON ', $self->_join_condition($on));
377   }
378   return join('', @sqlf);
379 }
380
381 sub _fold_sqlbind {
382   my ($self, $sqlbind) = @_;
383
384   my @sqlbind = @$$sqlbind; # copy
385   my $sql = shift @sqlbind;
386   push @{$self->{from_bind}}, @sqlbind;
387
388   return $sql;
389 }
390
391 sub _make_as {
392   my ($self, $from) = @_;
393   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
394                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
395                         : $self->_quote($_))
396                        } reverse each %{$self->_skip_options($from)});
397 }
398
399 sub _skip_options {
400   my ($self, $hash) = @_;
401   my $clean_hash = {};
402   $clean_hash->{$_} = $hash->{$_}
403     for grep {!/^-/} keys %$hash;
404   return $clean_hash;
405 }
406
407 sub _join_condition {
408   my ($self, $cond) = @_;
409   if (ref $cond eq 'HASH') {
410     my %j;
411     for (keys %$cond) {
412       my $v = $cond->{$_};
413       if (ref $v) {
414         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
415             if ref($v) ne 'SCALAR';
416         $j{$_} = $v;
417       }
418       else {
419         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
420       }
421     };
422     return scalar($self->_recurse_where(\%j));
423   } elsif (ref $cond eq 'ARRAY') {
424     return join(' OR ', map { $self->_join_condition($_) } @$cond);
425   } else {
426     die "Can't handle this yet!";
427   }
428 }
429
430 sub _quote {
431   my ($self, $label) = @_;
432   return '' unless defined $label;
433   return "*" if $label eq '*';
434   return $label unless $self->{quote_char};
435   if(ref $self->{quote_char} eq "ARRAY"){
436     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
437       if !defined $self->{name_sep};
438     my $sep = $self->{name_sep};
439     return join($self->{name_sep},
440         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
441        split(/\Q$sep\E/,$label));
442   }
443   return $self->SUPER::_quote($label);
444 }
445
446 sub limit_dialect {
447     my $self = shift;
448     $self->{limit_dialect} = shift if @_;
449     return $self->{limit_dialect};
450 }
451
452 sub quote_char {
453     my $self = shift;
454     $self->{quote_char} = shift if @_;
455     return $self->{quote_char};
456 }
457
458 sub name_sep {
459     my $self = shift;
460     $self->{name_sep} = shift if @_;
461     return $self->{name_sep};
462 }
463
464 1;
465
466 __END__
467
468 =pod
469
470 =head1 NAME
471
472 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
473 and includes a number of DBIC-specific workarounds, not yet suitable for
474 inclusion into SQLA proper.
475
476 =head1 METHODS
477
478 =head2 new
479
480 Tries to determine limit dialect.
481
482 =head2 select
483
484 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
485 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
486
487 =head2 insert update delete
488
489 Just quotes table names.
490
491 =head2 limit_dialect
492
493 Specifies the dialect of used for implementing an SQL "limit" clause for
494 restricting the number of query results returned.  Valid values are: RowNum.
495
496 See L<DBIx::Class::Storage::DBI/connect_info> for details.
497
498 =head2 name_sep
499
500 Character separating quoted table names.
501
502 See L<DBIx::Class::Storage::DBI/connect_info> for details.
503
504 =head2 quote_char
505
506 Set to an array-ref to specify separate left and right quotes for table names.
507
508 See L<DBIx::Class::Storage::DBI/connect_info> for details.
509
510 =cut
511