Fix order by clauses for MSSQL
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use Carp::Clan qw/^DBIx::Class/;
6
7 sub new {
8   my $self = shift->SUPER::new(@_);
9
10   # This prevents the caching of $dbh in S::A::L, I believe
11   # If limit_dialect is a ref (like a $dbh), go ahead and replace
12   #   it with what it resolves to:
13   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
14     if ref $self->{limit_dialect};
15
16   $self;
17 }
18
19
20 # Some databases (sqlite) do not handle multiple parenthesis
21 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
22 # is interpreted as x IN 1 or something similar.
23 #
24 # Since we currently do not have access to the SQLA AST, resort
25 # to barbaric mutilation of any SQL supplied in literal form
26
27 sub _strip_outer_paren {
28   my ($self, $arg) = @_;
29
30   return $self->_SWITCH_refkind ($arg, {
31     ARRAYREFREF => sub {
32       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
33       return $arg;
34     },
35     SCALARREF => sub {
36       return \__strip_outer_paren( $$arg );
37     },
38     FALLBACK => sub {
39       return $arg
40     },
41   });
42 }
43
44 sub __strip_outer_paren {
45   my $sql = shift;
46
47   if ($sql and not ref $sql) {
48     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
49       $sql = $1;
50     }
51   }
52
53   return $sql;
54 }
55
56 sub _where_field_IN {
57   my ($self, $lhs, $op, $rhs) = @_;
58   $rhs = $self->_strip_outer_paren ($rhs);
59   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
60 }
61
62 sub _where_field_BETWEEN {
63   my ($self, $lhs, $op, $rhs) = @_;
64   $rhs = $self->_strip_outer_paren ($rhs);
65   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
66 }
67
68
69
70 # DB2 is the only remaining DB using this. Even though we are not sure if
71 # RowNumberOver is still needed here (should be part of SQLA) leave the 
72 # code in place
73 sub _RowNumberOver {
74   my ($self, $sql, $order, $rows, $offset ) = @_;
75
76   $offset += 1;
77   my $last = $rows + $offset;
78   my ( $order_by ) = $self->_order_by( $order );
79
80   $sql = <<"SQL";
81 SELECT * FROM
82 (
83    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
84       $sql
85       $order_by
86    ) Q1
87 ) Q2
88 WHERE ROW_NUM BETWEEN $offset AND $last
89
90 SQL
91
92   return $sql;
93 }
94
95
96 # While we're at it, this should make LIMIT queries more efficient,
97 #  without digging into things too deeply
98 use Scalar::Util 'blessed';
99 sub _find_syntax {
100   my ($self, $syntax) = @_;
101   
102   # DB2 is the only remaining DB using this. Even though we are not sure if
103   # RowNumberOver is still needed here (should be part of SQLA) leave the 
104   # code in place
105   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
106   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
107     return 'RowNumberOver';
108   }
109   
110   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
111 }
112
113 sub select {
114   my ($self, $table, $fields, $where, $order, @rest) = @_;
115   local $self->{having_bind} = [];
116   local $self->{from_bind} = [];
117
118   if (ref $table eq 'SCALAR') {
119     $table = $$table;
120   }
121   elsif (not ref $table) {
122     $table = $self->_quote($table);
123   }
124   local $self->{rownum_hack_count} = 1
125     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
126   @rest = (-1) unless defined $rest[0];
127   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
128     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
129   my ($sql, @where_bind) = $self->SUPER::select(
130     $table, $self->_recurse_fields($fields), $where, $order, @rest
131   );
132   $sql .= 
133     $self->{for} ?
134     (
135       $self->{for} eq 'update' ? ' FOR UPDATE' :
136       $self->{for} eq 'shared' ? ' FOR SHARE'  :
137       ''
138     ) :
139     ''
140   ;
141   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
142 }
143
144 sub insert {
145   my $self = shift;
146   my $table = shift;
147   $table = $self->_quote($table) unless ref($table);
148   $self->SUPER::insert($table, @_);
149 }
150
151 sub update {
152   my $self = shift;
153   my $table = shift;
154   $table = $self->_quote($table) unless ref($table);
155   $self->SUPER::update($table, @_);
156 }
157
158 sub delete {
159   my $self = shift;
160   my $table = shift;
161   $table = $self->_quote($table) unless ref($table);
162   $self->SUPER::delete($table, @_);
163 }
164
165 sub _emulate_limit {
166   my $self = shift;
167   if ($_[3] == -1) {
168     return $_[1].$self->_order_by($_[2]);
169   } else {
170     return $self->SUPER::_emulate_limit(@_);
171   }
172 }
173
174 sub _recurse_fields {
175   my ($self, $fields, $params) = @_;
176   my $ref = ref $fields;
177   return $self->_quote($fields) unless $ref;
178   return $$fields if $ref eq 'SCALAR';
179
180   if ($ref eq 'ARRAY') {
181     return join(', ', map {
182       $self->_recurse_fields($_)
183         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
184           ? ' AS col'.$self->{rownum_hack_count}++
185           : '')
186       } @$fields);
187   } elsif ($ref eq 'HASH') {
188     foreach my $func (keys %$fields) {
189       if ($func eq 'distinct') {
190         my $_fields = $fields->{$func};
191         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
192           croak "Unsupported syntax, please use " . 
193               "{ group_by => [ qw/" . (join ' ', @$_fields) . "/ ] }" .
194               " or " .
195               "{ select => [ qw/" . (join ' ', @$_fields) . "/ ], distinct => 1 }";
196         }
197         else {
198           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
199           carp "This syntax will be deprecated in 09, please use " . 
200                "{ group_by => '${_fields}' }" . 
201                " or " .
202                "{ select => '${_fields}', distinct => 1 }";
203         }
204       }
205       
206       return $self->_sqlcase($func)
207         .'( '.$self->_recurse_fields($fields->{$func}).' )';
208     }
209   }
210   # Is the second check absolutely necessary?
211   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
212     return $self->_fold_sqlbind( $fields );
213   }
214   else {
215     croak($ref . qq{ unexpected in _recurse_fields()})
216   }
217 }
218
219 sub _order_by {
220   my $self = shift;
221   my $ret = '';
222   my @extra;
223   if (ref $_[0] eq 'HASH') {
224     if (defined $_[0]->{group_by}) {
225       $ret = $self->_sqlcase(' group by ')
226         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
227     }
228     if (defined $_[0]->{having}) {
229       my $frag;
230       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
231       push(@{$self->{having_bind}}, @extra);
232       $ret .= $self->_sqlcase(' having ').$frag;
233     }
234     if (defined $_[0]->{order_by}) {
235       $ret .= $self->_order_by($_[0]->{order_by});
236     }
237     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
238       return $self->SUPER::_order_by($_[0]);
239     }
240   } elsif (ref $_[0] eq 'SCALAR') {
241     $ret = $self->_sqlcase(' order by ').${ $_[0] };
242   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
243     my @order = @{+shift};
244     $ret = $self->_sqlcase(' order by ')
245           .join(', ', map {
246                         my $r = $self->_order_by($_, @_);
247                         $r =~ s/^ ?ORDER BY //i;
248                         $r;
249                       } @order);
250   } else {
251     $ret = $self->SUPER::_order_by(@_);
252   }
253   return $ret;
254 }
255
256 sub _order_directions {
257   my ($self, $order) = @_;
258   $order = $order->{order_by} if ref $order eq 'HASH';
259   if (ref $order eq 'HASH') {
260     $order = [$self->_order_directions_hash($order)];
261   } elsif (ref $order eq 'ARRAY') {
262     $order = [map {
263       if (ref $_ eq 'HASH') {
264         $self->_order_directions_hash($_);
265       } else {
266         $_;
267       }
268     } @{ $order }];
269   }
270   return $self->SUPER::_order_directions($order);
271 }
272
273 sub _order_directions_hash {
274   my ($self, $order) = @_;
275     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$order}) {
276        return map {
277           my $key = $_;
278           my @tmp;
279           s/^-(desc|asc)/\1/i;
280           my $dir = $_;
281           if (ref $order->{ $key } eq 'ARRAY') {
282             @tmp = map "$_ $dir", @{ $order->{ $key } };
283           } else { # should be scalar
284             @tmp = ( "$order->{$key} $dir" );
285           }
286           @tmp;
287        } keys %{$order};
288    }
289 }
290
291 sub _table {
292   my ($self, $from) = @_;
293   if (ref $from eq 'ARRAY') {
294     return $self->_recurse_from(@$from);
295   } elsif (ref $from eq 'HASH') {
296     return $self->_make_as($from);
297   } else {
298     return $from; # would love to quote here but _table ends up getting called
299                   # twice during an ->select without a limit clause due to
300                   # the way S::A::Limit->select works. should maybe consider
301                   # bypassing this and doing S::A::select($self, ...) in
302                   # our select method above. meantime, quoting shims have
303                   # been added to select/insert/update/delete here
304   }
305 }
306
307 sub _recurse_from {
308   my ($self, $from, @join) = @_;
309   my @sqlf;
310   push(@sqlf, $self->_make_as($from));
311   foreach my $j (@join) {
312     my ($to, $on) = @$j;
313
314     # check whether a join type exists
315     my $join_clause = '';
316     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
317     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
318       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
319     } else {
320       $join_clause = ' JOIN ';
321     }
322     push(@sqlf, $join_clause);
323
324     if (ref $to eq 'ARRAY') {
325       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
326     } else {
327       push(@sqlf, $self->_make_as($to));
328     }
329     push(@sqlf, ' ON ', $self->_join_condition($on));
330   }
331   return join('', @sqlf);
332 }
333
334 sub _fold_sqlbind {
335   my ($self, $sqlbind) = @_;
336   my $sql = shift @$$sqlbind;
337   push @{$self->{from_bind}}, @$$sqlbind;
338   return $sql;
339 }
340
341 sub _make_as {
342   my ($self, $from) = @_;
343   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
344                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
345                         : $self->_quote($_))
346                        } reverse each %{$self->_skip_options($from)});
347 }
348
349 sub _skip_options {
350   my ($self, $hash) = @_;
351   my $clean_hash = {};
352   $clean_hash->{$_} = $hash->{$_}
353     for grep {!/^-/} keys %$hash;
354   return $clean_hash;
355 }
356
357 sub _join_condition {
358   my ($self, $cond) = @_;
359   if (ref $cond eq 'HASH') {
360     my %j;
361     for (keys %$cond) {
362       my $v = $cond->{$_};
363       if (ref $v) {
364         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
365             if ref($v) ne 'SCALAR';
366         $j{$_} = $v;
367       }
368       else {
369         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
370       }
371     };
372     return scalar($self->_recurse_where(\%j));
373   } elsif (ref $cond eq 'ARRAY') {
374     return join(' OR ', map { $self->_join_condition($_) } @$cond);
375   } else {
376     die "Can't handle this yet!";
377   }
378 }
379
380 sub _quote {
381   my ($self, $label) = @_;
382   return '' unless defined $label;
383   return "*" if $label eq '*';
384   return $label unless $self->{quote_char};
385   if(ref $self->{quote_char} eq "ARRAY"){
386     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
387       if !defined $self->{name_sep};
388     my $sep = $self->{name_sep};
389     return join($self->{name_sep},
390         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
391        split(/\Q$sep\E/,$label));
392   }
393   return $self->SUPER::_quote($label);
394 }
395
396 sub limit_dialect {
397     my $self = shift;
398     $self->{limit_dialect} = shift if @_;
399     return $self->{limit_dialect};
400 }
401
402 sub quote_char {
403     my $self = shift;
404     $self->{quote_char} = shift if @_;
405     return $self->{quote_char};
406 }
407
408 sub name_sep {
409     my $self = shift;
410     $self->{name_sep} = shift if @_;
411     return $self->{name_sep};
412 }
413
414 1;
415
416 __END__
417
418 =pod
419
420 =head1 NAME
421
422 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
423 and includes a number of DBIC-specific workarounds, not yet suitable for
424 inclusion into SQLA proper.
425
426 =head1 METHODS
427
428 =head2 new
429
430 Tries to determine limit dialect.
431
432 =head2 select
433
434 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
435 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
436
437 =head2 insert update delete
438
439 Just quotes table names.
440
441 =head2 limit_dialect
442
443 Specifies the dialect of used for implementing an SQL "limit" clause for
444 restricting the number of query results returned.  Valid values are: RowNum.
445
446 See L<DBIx::Class::Storage::DBI/connect_info> for details.
447
448 =head2 name_sep
449
450 Character separating quoted table names.
451
452 See L<DBIx::Class::Storage::DBI/connect_info> for details.
453
454 =head2 quote_char
455
456 Set to an array-ref to specify separate left and right quotes for table names.
457
458 See L<DBIx::Class::Storage::DBI/connect_info> for details.
459
460 =cut
461