Factor out the order_by sqlahacks resolver
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8
9 sub new {
10   my $self = shift->SUPER::new(@_);
11
12   # This prevents the caching of $dbh in S::A::L, I believe
13   # If limit_dialect is a ref (like a $dbh), go ahead and replace
14   #   it with what it resolves to:
15   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
16     if ref $self->{limit_dialect};
17
18   $self;
19 }
20
21
22 # Some databases (sqlite) do not handle multiple parenthesis
23 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
24 # is interpreted as x IN 1 or something similar.
25 #
26 # Since we currently do not have access to the SQLA AST, resort
27 # to barbaric mutilation of any SQL supplied in literal form
28
29 sub _strip_outer_paren {
30   my ($self, $arg) = @_;
31
32   return $self->_SWITCH_refkind ($arg, {
33     ARRAYREFREF => sub {
34       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
35       return $arg;
36     },
37     SCALARREF => sub {
38       return \__strip_outer_paren( $$arg );
39     },
40     FALLBACK => sub {
41       return $arg
42     },
43   });
44 }
45
46 sub __strip_outer_paren {
47   my $sql = shift;
48
49   if ($sql and not ref $sql) {
50     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
51       $sql = $1;
52     }
53   }
54
55   return $sql;
56 }
57
58 sub _where_field_IN {
59   my ($self, $lhs, $op, $rhs) = @_;
60   $rhs = $self->_strip_outer_paren ($rhs);
61   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
62 }
63
64 sub _where_field_BETWEEN {
65   my ($self, $lhs, $op, $rhs) = @_;
66   $rhs = $self->_strip_outer_paren ($rhs);
67   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
68 }
69
70
71
72 # DB2 is the only remaining DB using this. Even though we are not sure if
73 # RowNumberOver is still needed here (should be part of SQLA) leave the 
74 # code in place
75 sub _RowNumberOver {
76   my ($self, $sql, $order, $rows, $offset ) = @_;
77
78   $offset += 1;
79   my $last = $rows + $offset - 1;
80   my ( $order_by ) = $self->_order_by( $order );
81
82   $sql = <<"SQL";
83 SELECT * FROM
84 (
85    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
86       $sql
87       $order_by
88    ) Q1
89 ) Q2
90 WHERE ROW_NUM BETWEEN $offset AND $last
91
92 SQL
93
94   return $sql;
95 }
96
97
98 # While we're at it, this should make LIMIT queries more efficient,
99 #  without digging into things too deeply
100 use Scalar::Util 'blessed';
101 sub _find_syntax {
102   my ($self, $syntax) = @_;
103   
104   # DB2 is the only remaining DB using this. Even though we are not sure if
105   # RowNumberOver is still needed here (should be part of SQLA) leave the 
106   # code in place
107   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
108   if(ref($self) && $dbhname) {
109     if ($dbhname eq 'DB2') {
110       return 'RowNumberOver';
111     }
112   }
113   
114   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
115 }
116
117 sub select {
118   my ($self, $table, $fields, $where, $order, @rest) = @_;
119   local $self->{having_bind} = [];
120   local $self->{from_bind} = [];
121
122   if (ref $table eq 'SCALAR') {
123     $table = $$table;
124   }
125   elsif (not ref $table) {
126     $table = $self->_quote($table);
127   }
128   local $self->{rownum_hack_count} = 1
129     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
130   @rest = (-1) unless defined $rest[0];
131   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
132     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
133   my ($sql, @where_bind) = $self->SUPER::select(
134     $table, $self->_recurse_fields($fields), $where, $order, @rest
135   );
136   $sql .= 
137     $self->{for} ?
138     (
139       $self->{for} eq 'update' ? ' FOR UPDATE' :
140       $self->{for} eq 'shared' ? ' FOR SHARE'  :
141       ''
142     ) :
143     ''
144   ;
145   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
146 }
147
148 sub insert {
149   my $self = shift;
150   my $table = shift;
151   $table = $self->_quote($table) unless ref($table);
152   $self->SUPER::insert($table, @_);
153 }
154
155 sub update {
156   my $self = shift;
157   my $table = shift;
158   $table = $self->_quote($table) unless ref($table);
159   $self->SUPER::update($table, @_);
160 }
161
162 sub delete {
163   my $self = shift;
164   my $table = shift;
165   $table = $self->_quote($table) unless ref($table);
166   $self->SUPER::delete($table, @_);
167 }
168
169 sub _emulate_limit {
170   my $self = shift;
171   if ($_[3] == -1) {
172     return $_[1].$self->_order_by($_[2]);
173   } else {
174     return $self->SUPER::_emulate_limit(@_);
175   }
176 }
177
178 sub _recurse_fields {
179   my ($self, $fields, $params) = @_;
180   my $ref = ref $fields;
181   return $self->_quote($fields) unless $ref;
182   return $$fields if $ref eq 'SCALAR';
183
184   if ($ref eq 'ARRAY') {
185     return join(', ', map {
186       $self->_recurse_fields($_)
187         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
188           ? ' AS col'.$self->{rownum_hack_count}++
189           : '')
190       } @$fields);
191   } elsif ($ref eq 'HASH') {
192     foreach my $func (keys %$fields) {
193       if ($func eq 'distinct') {
194         my $_fields = $fields->{$func};
195         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
196           croak (
197             'The select => { distinct => ... } syntax is not supported for multiple columns.'
198            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
199            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
200           );
201         }
202         else {
203           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
204           carp (
205             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
206            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
207           );
208         }
209       }
210       return $self->_sqlcase($func)
211         .'( '.$self->_recurse_fields($fields->{$func}).' )';
212     }
213   }
214   # Is the second check absolutely necessary?
215   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
216     return $self->_fold_sqlbind( $fields );
217   }
218   else {
219     croak($ref . qq{ unexpected in _recurse_fields()})
220   }
221 }
222
223 sub _order_by {
224   my $self = shift;
225   my $ret = '';
226   my @extra;
227   if (ref $_[0] eq 'HASH') {
228     if (defined $_[0]->{group_by}) {
229       $ret = $self->_sqlcase(' group by ')
230         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
231     }
232     if (defined $_[0]->{having}) {
233       my $frag;
234       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
235       push(@{$self->{having_bind}}, @extra);
236       $ret .= $self->_sqlcase(' having ').$frag;
237     }
238     if (defined $_[0]->{order_by}) {
239       $ret .= $self->_order_by($_[0]->{order_by});
240     }
241     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
242       return $self->SUPER::_order_by($_[0]);
243     }
244   } elsif (ref $_[0] eq 'SCALAR') {
245     $ret = $self->_sqlcase(' order by ').${ $_[0] };
246   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
247     my @order = @{+shift};
248     $ret = $self->_sqlcase(' order by ')
249           .join(', ', map {
250                         my $r = $self->_order_by($_, @_);
251                         $r =~ s/^ ?ORDER BY //i;
252                         $r;
253                       } @order);
254   } else {
255     $ret = $self->SUPER::_order_by(@_);
256   }
257   return $ret;
258 }
259
260 sub _order_directions {
261   my ($self, $order) = @_;
262   return $self->SUPER::_order_directions( $self->_resolve_order($order) );
263 }
264
265 sub _resolve_order {
266   my ($self, $order) = @_;
267   $order = $order->{order_by} if (ref $order eq 'HASH' and $order->{order_by});
268
269   if (ref $order eq 'HASH') {
270     $order = [$self->_resolve_order_hash($order)];
271   }
272   elsif (ref $order eq 'ARRAY') {
273     $order = [map {
274       if (ref ($_) eq 'SCALAR') {
275         $$_
276       }
277       elsif (ref ($_) eq 'HASH') {
278         $self->_resolve_order_hash($_)
279       }
280       else {
281         $_
282       }
283     }  @$order];
284   }
285
286   return $order;
287 }
288
289 sub _resolve_order_hash {
290   my ($self, $order) = @_;
291   my @new_order;
292   foreach my $key (keys %{ $order }) {
293     if ($key =~ /^-(desc|asc)/i ) {
294       my $direction = $1;
295       my $type = ref $order->{ $key };
296       if ($type eq 'ARRAY') {
297         push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
298       } elsif (!$type) {
299         push @new_order, "$order->{$key} $direction";
300       } else {
301         croak "hash order_by can only contain Scalar or Array, not $type";
302       }
303     } else {
304       croak "$key is not a valid direction, use -asc or -desc";
305     }
306   }
307   return @new_order;
308 }
309
310 sub _table {
311   my ($self, $from) = @_;
312   if (ref $from eq 'ARRAY') {
313     return $self->_recurse_from(@$from);
314   } elsif (ref $from eq 'HASH') {
315     return $self->_make_as($from);
316   } else {
317     return $from; # would love to quote here but _table ends up getting called
318                   # twice during an ->select without a limit clause due to
319                   # the way S::A::Limit->select works. should maybe consider
320                   # bypassing this and doing S::A::select($self, ...) in
321                   # our select method above. meantime, quoting shims have
322                   # been added to select/insert/update/delete here
323   }
324 }
325
326 sub _recurse_from {
327   my ($self, $from, @join) = @_;
328   my @sqlf;
329   push(@sqlf, $self->_make_as($from));
330   foreach my $j (@join) {
331     my ($to, $on) = @$j;
332
333     # check whether a join type exists
334     my $join_clause = '';
335     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
336     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
337       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
338     } else {
339       $join_clause = ' JOIN ';
340     }
341     push(@sqlf, $join_clause);
342
343     if (ref $to eq 'ARRAY') {
344       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
345     } else {
346       push(@sqlf, $self->_make_as($to));
347     }
348     push(@sqlf, ' ON ', $self->_join_condition($on));
349   }
350   return join('', @sqlf);
351 }
352
353 sub _fold_sqlbind {
354   my ($self, $sqlbind) = @_;
355
356   my @sqlbind = @$$sqlbind; # copy
357   my $sql = shift @sqlbind;
358   push @{$self->{from_bind}}, @sqlbind;
359
360   return $sql;
361 }
362
363 sub _make_as {
364   my ($self, $from) = @_;
365   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
366                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
367                         : $self->_quote($_))
368                        } reverse each %{$self->_skip_options($from)});
369 }
370
371 sub _skip_options {
372   my ($self, $hash) = @_;
373   my $clean_hash = {};
374   $clean_hash->{$_} = $hash->{$_}
375     for grep {!/^-/} keys %$hash;
376   return $clean_hash;
377 }
378
379 sub _join_condition {
380   my ($self, $cond) = @_;
381   if (ref $cond eq 'HASH') {
382     my %j;
383     for (keys %$cond) {
384       my $v = $cond->{$_};
385       if (ref $v) {
386         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
387             if ref($v) ne 'SCALAR';
388         $j{$_} = $v;
389       }
390       else {
391         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
392       }
393     };
394     return scalar($self->_recurse_where(\%j));
395   } elsif (ref $cond eq 'ARRAY') {
396     return join(' OR ', map { $self->_join_condition($_) } @$cond);
397   } else {
398     die "Can't handle this yet!";
399   }
400 }
401
402 sub _quote {
403   my ($self, $label) = @_;
404   return '' unless defined $label;
405   return "*" if $label eq '*';
406   return $label unless $self->{quote_char};
407   if(ref $self->{quote_char} eq "ARRAY"){
408     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
409       if !defined $self->{name_sep};
410     my $sep = $self->{name_sep};
411     return join($self->{name_sep},
412         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
413        split(/\Q$sep\E/,$label));
414   }
415   return $self->SUPER::_quote($label);
416 }
417
418 sub limit_dialect {
419     my $self = shift;
420     $self->{limit_dialect} = shift if @_;
421     return $self->{limit_dialect};
422 }
423
424 sub quote_char {
425     my $self = shift;
426     $self->{quote_char} = shift if @_;
427     return $self->{quote_char};
428 }
429
430 sub name_sep {
431     my $self = shift;
432     $self->{name_sep} = shift if @_;
433     return $self->{name_sep};
434 }
435
436 1;
437
438 __END__
439
440 =pod
441
442 =head1 NAME
443
444 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
445 and includes a number of DBIC-specific workarounds, not yet suitable for
446 inclusion into SQLA proper.
447
448 =head1 METHODS
449
450 =head2 new
451
452 Tries to determine limit dialect.
453
454 =head2 select
455
456 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
457 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
458
459 =head2 insert update delete
460
461 Just quotes table names.
462
463 =head2 limit_dialect
464
465 Specifies the dialect of used for implementing an SQL "limit" clause for
466 restricting the number of query results returned.  Valid values are: RowNum.
467
468 See L<DBIx::Class::Storage::DBI/connect_info> for details.
469
470 =head2 name_sep
471
472 Character separating quoted table names.
473
474 See L<DBIx::Class::Storage::DBI/connect_info> for details.
475
476 =head2 quote_char
477
478 Set to an array-ref to specify separate left and right quotes for table names.
479
480 See L<DBIx::Class::Storage::DBI/connect_info> for details.
481
482 =cut
483