f9a5675c839a13f0bc4d6e52cf26075c0e932bc4
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class/;
8 use Scalar::Util();
9
10 sub new {
11   my $self = shift->SUPER::new(@_);
12
13   # This prevents the caching of $dbh in S::A::L, I believe
14   # If limit_dialect is a ref (like a $dbh), go ahead and replace
15   #   it with what it resolves to:
16   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
17     if ref $self->{limit_dialect};
18
19   $self;
20 }
21
22
23 # Some databases (sqlite) do not handle multiple parenthesis
24 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
25 # is interpreted as x IN 1 or something similar.
26 #
27 # Since we currently do not have access to the SQLA AST, resort
28 # to barbaric mutilation of any SQL supplied in literal form
29
30 sub _strip_outer_paren {
31   my ($self, $arg) = @_;
32
33   return $self->_SWITCH_refkind ($arg, {
34     ARRAYREFREF => sub {
35       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
36       return $arg;
37     },
38     SCALARREF => sub {
39       return \__strip_outer_paren( $$arg );
40     },
41     FALLBACK => sub {
42       return $arg
43     },
44   });
45 }
46
47 sub __strip_outer_paren {
48   my $sql = shift;
49
50   if ($sql and not ref $sql) {
51     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
52       $sql = $1;
53     }
54   }
55
56   return $sql;
57 }
58
59 sub _where_field_IN {
60   my ($self, $lhs, $op, $rhs) = @_;
61   $rhs = $self->_strip_outer_paren ($rhs);
62   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
63 }
64
65 sub _where_field_BETWEEN {
66   my ($self, $lhs, $op, $rhs) = @_;
67   $rhs = $self->_strip_outer_paren ($rhs);
68   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
69 }
70
71 # Slow but ANSI standard Limit/Offset support. DB2 uses this
72 sub _RowNumberOver {
73   my ($self, $sql, $order, $rows, $offset ) = @_;
74
75   $offset += 1;
76   my $last = $rows + $offset - 1;
77   my ( $order_by ) = $self->_order_by( $order );
78
79   $sql = <<"SQL";
80 SELECT * FROM
81 (
82    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
83       $sql
84       $order_by
85    ) Q1
86 ) Q2
87 WHERE ROW_NUM BETWEEN $offset AND $last
88
89 SQL
90
91   return $sql;
92 }
93
94 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
95 # but may have to switch to RowNumberOver one day
96 sub _Top {
97   my ( $self, $sql, $order, $rows, $offset ) = @_;
98
99   croak '$order supplied to SQLAHacks limit emulators must be a hash'
100     if (ref $order ne 'HASH');
101
102   my $last = $rows + $offset;
103
104   my $req_order = $self->_order_by ($order->{order_by});
105   my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
106
107   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
108
109   $sql =~ s/^\s*(SELECT|select)//;
110
111   $sql = <<"SQL";
112   SELECT * FROM
113   (
114     SELECT TOP $rows * FROM
115     (
116         SELECT TOP $last $sql $order_by_inner
117     ) AS foo
118     $order_by_outer
119   ) AS bar
120   $req_order
121
122 SQL
123     return $sql;
124 }
125
126
127
128 # While we're at it, this should make LIMIT queries more efficient,
129 #  without digging into things too deeply
130 sub _find_syntax {
131   my ($self, $syntax) = @_;
132   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
133 }
134
135 sub select {
136   my ($self, $table, $fields, $where, $order, @rest) = @_;
137   local $self->{having_bind} = [];
138   local $self->{from_bind} = [];
139
140   if (ref $table eq 'SCALAR') {
141     $table = $$table;
142   }
143   elsif (not ref $table) {
144     $table = $self->_quote($table);
145   }
146   local $self->{rownum_hack_count} = 1
147     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
148   @rest = (-1) unless defined $rest[0];
149   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
150     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
151   my ($sql, @where_bind) = $self->SUPER::select(
152     $table, $self->_recurse_fields($fields), $where, $order, @rest
153   );
154   $sql .= 
155     $self->{for} ?
156     (
157       $self->{for} eq 'update' ? ' FOR UPDATE' :
158       $self->{for} eq 'shared' ? ' FOR SHARE'  :
159       ''
160     ) :
161     ''
162   ;
163   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
164 }
165
166 sub insert {
167   my $self = shift;
168   my $table = shift;
169   $table = $self->_quote($table) unless ref($table);
170   $self->SUPER::insert($table, @_);
171 }
172
173 sub update {
174   my $self = shift;
175   my $table = shift;
176   $table = $self->_quote($table) unless ref($table);
177   $self->SUPER::update($table, @_);
178 }
179
180 sub delete {
181   my $self = shift;
182   my $table = shift;
183   $table = $self->_quote($table) unless ref($table);
184   $self->SUPER::delete($table, @_);
185 }
186
187 sub _emulate_limit {
188   my $self = shift;
189   if ($_[3] == -1) {
190     return $_[1].$self->_order_by($_[2]);
191   } else {
192     return $self->SUPER::_emulate_limit(@_);
193   }
194 }
195
196 sub _recurse_fields {
197   my ($self, $fields, $params) = @_;
198   my $ref = ref $fields;
199   return $self->_quote($fields) unless $ref;
200   return $$fields if $ref eq 'SCALAR';
201
202   if ($ref eq 'ARRAY') {
203     return join(', ', map {
204       $self->_recurse_fields($_)
205         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
206           ? ' AS col'.$self->{rownum_hack_count}++
207           : '')
208       } @$fields);
209   } elsif ($ref eq 'HASH') {
210     foreach my $func (keys %$fields) {
211       if ($func eq 'distinct') {
212         my $_fields = $fields->{$func};
213         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
214           croak (
215             'The select => { distinct => ... } syntax is not supported for multiple columns.'
216            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
217            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
218           );
219         }
220         else {
221           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
222           carp (
223             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
224            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
225           );
226         }
227       }
228       return $self->_sqlcase($func)
229         .'( '.$self->_recurse_fields($fields->{$func}).' )';
230     }
231   }
232   # Is the second check absolutely necessary?
233   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
234     return $self->_fold_sqlbind( $fields );
235   }
236   else {
237     croak($ref . qq{ unexpected in _recurse_fields()})
238   }
239 }
240
241 sub _order_by {
242   my $self = shift;
243   my $ret = '';
244   my @extra;
245   if (ref $_[0] eq 'HASH') {
246
247     if (defined $_[0]->{group_by}) {
248       $ret = $self->_sqlcase(' group by ')
249         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
250     }
251
252     if (defined $_[0]->{having}) {
253       my $frag;
254       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
255       push(@{$self->{having_bind}}, @extra);
256       $ret .= $self->_sqlcase(' having ').$frag;
257     }
258
259     if (defined $_[0]->{order_by}) {
260       $ret .= $self->_order_by($_[0]->{order_by});
261     }
262
263     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
264       return $self->SUPER::_order_by($_[0]);
265     }
266
267   } elsif (ref $_[0] eq 'SCALAR') {
268     $ret = $self->_sqlcase(' order by ').${ $_[0] };
269   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
270     my @order = map {
271       my $r = $self->_order_by($_, @_);
272       $r =~ s/^ ?ORDER BY //i;
273       $r || ();
274     } @{+shift};
275
276     $ret = $self->_sqlcase(' order by ') . join(', ', @order) if @order;
277
278   } else {
279     $ret = $self->SUPER::_order_by(@_);
280   }
281   return $ret;
282 }
283
284 sub _order_directions {
285   my ($self, $order) = @_;
286   return $self->SUPER::_order_directions( $self->_resolve_order($order) );
287 }
288
289 sub _resolve_order {
290   my ($self, $order) = @_;
291
292   if (ref $order eq 'HASH') {
293     $order = [$self->_resolve_order_hash($order)];
294   }
295   elsif (ref $order eq 'ARRAY') {
296     $order = [map {
297       if (ref ($_) eq 'SCALAR') {
298         $$_
299       }
300       elsif (ref ($_) eq 'HASH') {
301         $self->_resolve_order_hash($_)
302       }
303       else {
304         $_
305       }
306     }  @$order];
307   }
308
309   return $order;
310 }
311
312 sub _resolve_order_hash {
313   my ($self, $order) = @_;
314   my @new_order;
315   foreach my $key (keys %{ $order }) {
316     if ($key =~ /^-(desc|asc)/i ) {
317       my $direction = $1;
318       my $type = ref $order->{ $key };
319       if ($type eq 'ARRAY') {
320         push @new_order, map( "$_ $direction", @{ $order->{ $key } } );
321       } elsif (!$type) {
322         push @new_order, "$order->{$key} $direction";
323       } else {
324         croak "hash order_by can only contain Scalar or Array, not $type";
325       }
326     } else {
327       croak "$key is not a valid direction, use -asc or -desc";
328     }
329   }
330
331   return @new_order;
332 }
333
334 sub _table {
335   my ($self, $from) = @_;
336   if (ref $from eq 'ARRAY') {
337     return $self->_recurse_from(@$from);
338   } elsif (ref $from eq 'HASH') {
339     return $self->_make_as($from);
340   } else {
341     return $from; # would love to quote here but _table ends up getting called
342                   # twice during an ->select without a limit clause due to
343                   # the way S::A::Limit->select works. should maybe consider
344                   # bypassing this and doing S::A::select($self, ...) in
345                   # our select method above. meantime, quoting shims have
346                   # been added to select/insert/update/delete here
347   }
348 }
349
350 sub _recurse_from {
351   my ($self, $from, @join) = @_;
352   my @sqlf;
353   push(@sqlf, $self->_make_as($from));
354   foreach my $j (@join) {
355     my ($to, $on) = @$j;
356
357     # check whether a join type exists
358     my $join_clause = '';
359     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
360     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
361       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
362     } else {
363       $join_clause = ' JOIN ';
364     }
365     push(@sqlf, $join_clause);
366
367     if (ref $to eq 'ARRAY') {
368       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
369     } else {
370       push(@sqlf, $self->_make_as($to));
371     }
372     push(@sqlf, ' ON ', $self->_join_condition($on));
373   }
374   return join('', @sqlf);
375 }
376
377 sub _fold_sqlbind {
378   my ($self, $sqlbind) = @_;
379
380   my @sqlbind = @$$sqlbind; # copy
381   my $sql = shift @sqlbind;
382   push @{$self->{from_bind}}, @sqlbind;
383
384   return $sql;
385 }
386
387 sub _make_as {
388   my ($self, $from) = @_;
389   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
390                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
391                         : $self->_quote($_))
392                        } reverse each %{$self->_skip_options($from)});
393 }
394
395 sub _skip_options {
396   my ($self, $hash) = @_;
397   my $clean_hash = {};
398   $clean_hash->{$_} = $hash->{$_}
399     for grep {!/^-/} keys %$hash;
400   return $clean_hash;
401 }
402
403 sub _join_condition {
404   my ($self, $cond) = @_;
405   if (ref $cond eq 'HASH') {
406     my %j;
407     for (keys %$cond) {
408       my $v = $cond->{$_};
409       if (ref $v) {
410         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
411             if ref($v) ne 'SCALAR';
412         $j{$_} = $v;
413       }
414       else {
415         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
416       }
417     };
418     return scalar($self->_recurse_where(\%j));
419   } elsif (ref $cond eq 'ARRAY') {
420     return join(' OR ', map { $self->_join_condition($_) } @$cond);
421   } else {
422     die "Can't handle this yet!";
423   }
424 }
425
426 sub _quote {
427   my ($self, $label) = @_;
428   return '' unless defined $label;
429   return "*" if $label eq '*';
430   return $label unless $self->{quote_char};
431   if(ref $self->{quote_char} eq "ARRAY"){
432     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
433       if !defined $self->{name_sep};
434     my $sep = $self->{name_sep};
435     return join($self->{name_sep},
436         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
437        split(/\Q$sep\E/,$label));
438   }
439   return $self->SUPER::_quote($label);
440 }
441
442 sub limit_dialect {
443     my $self = shift;
444     $self->{limit_dialect} = shift if @_;
445     return $self->{limit_dialect};
446 }
447
448 sub quote_char {
449     my $self = shift;
450     $self->{quote_char} = shift if @_;
451     return $self->{quote_char};
452 }
453
454 sub name_sep {
455     my $self = shift;
456     $self->{name_sep} = shift if @_;
457     return $self->{name_sep};
458 }
459
460 1;
461
462 __END__
463
464 =pod
465
466 =head1 NAME
467
468 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
469 and includes a number of DBIC-specific workarounds, not yet suitable for
470 inclusion into SQLA proper.
471
472 =head1 METHODS
473
474 =head2 new
475
476 Tries to determine limit dialect.
477
478 =head2 select
479
480 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
481 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
482
483 =head2 insert update delete
484
485 Just quotes table names.
486
487 =head2 limit_dialect
488
489 Specifies the dialect of used for implementing an SQL "limit" clause for
490 restricting the number of query results returned.  Valid values are: RowNum.
491
492 See L<DBIx::Class::Storage::DBI/connect_info> for details.
493
494 =head2 name_sep
495
496 Character separating quoted table names.
497
498 See L<DBIx::Class::Storage::DBI/connect_info> for details.
499
500 =head2 quote_char
501
502 Set to an array-ref to specify separate left and right quotes for table names.
503
504 See L<DBIx::Class::Storage::DBI/connect_info> for details.
505
506 =cut
507