d2f96bf2156328c16db3e03f5eae4219cd371aee
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks; # Would merge upstream, but nate doesn't reply :(
3
4 use base qw/SQL::Abstract::Limit/;
5 use Carp::Clan qw/^DBIx::Class/;
6
7 sub new {
8   my $self = shift->SUPER::new(@_);
9
10   # This prevents the caching of $dbh in S::A::L, I believe
11   # If limit_dialect is a ref (like a $dbh), go ahead and replace
12   #   it with what it resolves to:
13   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
14     if ref $self->{limit_dialect};
15
16   $self;
17 }
18
19
20
21 # Some databases (sqlite) do not handle multiple parenthesis
22 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
23 # is interpreted as x IN 1 or something similar.
24 #
25 # Since we currently do not have access to the SQLA AST, resort
26 # to barbaric mutilation of any SQL supplied in literal form
27
28 sub _strip_outer_paren {
29   my ($self, $arg) = @_;
30
31   return $self->_SWITCH_refkind ($arg, {
32     ARRAYREFREF => sub {
33       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
34       return $arg;
35     },
36     SCALARREF => sub {
37       return \__strip_outer_paren( $$arg );
38     },
39     FALLBACK => sub {
40       return $arg
41     },
42   });
43 }
44
45 sub __strip_outer_paren {
46   my $sql = shift;
47
48   if ($sql and not ref $sql) {
49     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
50       $sql = $1;
51     }
52   }
53
54   return $sql;
55 }
56
57 sub _where_field_IN {
58   my ($self, $lhs, $op, $rhs) = @_;
59   $rhs = $self->_strip_outer_paren ($rhs);
60   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
61 }
62
63 sub _where_field_BETWEEN {
64   my ($self, $lhs, $op, $rhs) = @_;
65   $rhs = $self->_strip_outer_paren ($rhs);
66   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
67 }
68
69
70
71 # DB2 is the only remaining DB using this. Even though we are not sure if
72 # RowNumberOver is still needed here (should be part of SQLA) leave the 
73 # code in place
74 sub _RowNumberOver {
75   my ($self, $sql, $order, $rows, $offset ) = @_;
76
77   $offset += 1;
78   my $last = $rows + $offset;
79   my ( $order_by ) = $self->_order_by( $order );
80
81   $sql = <<"SQL";
82 SELECT * FROM
83 (
84    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
85       $sql
86       $order_by
87    ) Q1
88 ) Q2
89 WHERE ROW_NUM BETWEEN $offset AND $last
90
91 SQL
92
93   return $sql;
94 }
95
96
97 # While we're at it, this should make LIMIT queries more efficient,
98 #  without digging into things too deeply
99 use Scalar::Util 'blessed';
100 sub _find_syntax {
101   my ($self, $syntax) = @_;
102   
103   # DB2 is the only remaining DB using this. Even though we are not sure if
104   # RowNumberOver is still needed here (should be part of SQLA) leave the 
105   # code in place
106   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
107   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
108     return 'RowNumberOver';
109   }
110   
111   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
112 }
113
114 sub select {
115   my ($self, $table, $fields, $where, $order, @rest) = @_;
116   local $self->{having_bind} = [];
117   local $self->{from_bind} = [];
118
119   if (ref $table eq 'SCALAR') {
120     $table = $$table;
121   }
122   elsif (not ref $table) {
123     $table = $self->_quote($table);
124   }
125   local $self->{rownum_hack_count} = 1
126     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
127   @rest = (-1) unless defined $rest[0];
128   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
129     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
130   my ($sql, @where_bind) = $self->SUPER::select(
131     $table, $self->_recurse_fields($fields), $where, $order, @rest
132   );
133   $sql .= 
134     $self->{for} ?
135     (
136       $self->{for} eq 'update' ? ' FOR UPDATE' :
137       $self->{for} eq 'shared' ? ' FOR SHARE'  :
138       ''
139     ) :
140     ''
141   ;
142   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
143 }
144
145 sub insert {
146   my $self = shift;
147   my $table = shift;
148   $table = $self->_quote($table) unless ref($table);
149   $self->SUPER::insert($table, @_);
150 }
151
152 sub update {
153   my $self = shift;
154   my $table = shift;
155   $table = $self->_quote($table) unless ref($table);
156   $self->SUPER::update($table, @_);
157 }
158
159 sub delete {
160   my $self = shift;
161   my $table = shift;
162   $table = $self->_quote($table) unless ref($table);
163   $self->SUPER::delete($table, @_);
164 }
165
166 sub _emulate_limit {
167   my $self = shift;
168   if ($_[3] == -1) {
169     return $_[1].$self->_order_by($_[2]);
170   } else {
171     return $self->SUPER::_emulate_limit(@_);
172   }
173 }
174
175 sub _recurse_fields {
176   my ($self, $fields, $params) = @_;
177   my $ref = ref $fields;
178   return $self->_quote($fields) unless $ref;
179   return $$fields if $ref eq 'SCALAR';
180
181   if ($ref eq 'ARRAY') {
182     return join(', ', map {
183       $self->_recurse_fields($_)
184         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
185           ? ' AS col'.$self->{rownum_hack_count}++
186           : '')
187       } @$fields);
188   } elsif ($ref eq 'HASH') {
189     foreach my $func (keys %$fields) {
190       if ($func eq 'distinct') {
191         my $_fields = $fields->{$func};
192         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
193           die "Unsupported syntax, please use " . 
194               "{ group_by => [ qw/" . (join ' ', @$_fields) . "/ ] }" .
195               " or " .
196               "{ select => [ qw/" . (join ' ', @$_fields) . "/ ], distinct => 1 }";
197         }
198         else {
199           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
200           carp "This syntax will be deprecated in 09, please use " . 
201                "{ group_by => '${_fields}' }" . 
202                " or " .
203                "{ select => '${_fields}', distinct => 1 }";
204         }
205       }
206       
207       return $self->_sqlcase($func)
208         .'( '.$self->_recurse_fields($fields->{$func}).' )';
209     }
210   }
211   # Is the second check absolutely necessary?
212   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
213     return $self->_fold_sqlbind( $fields );
214   }
215   else {
216     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
217   }
218 }
219
220 sub _order_by {
221   my $self = shift;
222   my $ret = '';
223   my @extra;
224   if (ref $_[0] eq 'HASH') {
225     if (defined $_[0]->{group_by}) {
226       $ret = $self->_sqlcase(' group by ')
227         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
228     }
229     if (defined $_[0]->{having}) {
230       my $frag;
231       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
232       push(@{$self->{having_bind}}, @extra);
233       $ret .= $self->_sqlcase(' having ').$frag;
234     }
235     if (defined $_[0]->{order_by}) {
236       $ret .= $self->_order_by($_[0]->{order_by});
237     }
238     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
239       return $self->SUPER::_order_by($_[0]);
240     }
241   } elsif (ref $_[0] eq 'SCALAR') {
242     $ret = $self->_sqlcase(' order by ').${ $_[0] };
243   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
244     my @order = @{+shift};
245     $ret = $self->_sqlcase(' order by ')
246           .join(', ', map {
247                         my $r = $self->_order_by($_, @_);
248                         $r =~ s/^ ?ORDER BY //i;
249                         $r;
250                       } @order);
251   } else {
252     $ret = $self->SUPER::_order_by(@_);
253   }
254   return $ret;
255 }
256
257 sub _order_directions {
258   my ($self, $order) = @_;
259   $order = $order->{order_by} if ref $order eq 'HASH';
260   return $self->SUPER::_order_directions($order);
261 }
262
263 sub _table {
264   my ($self, $from) = @_;
265   if (ref $from eq 'ARRAY') {
266     return $self->_recurse_from(@$from);
267   } elsif (ref $from eq 'HASH') {
268     return $self->_make_as($from);
269   } else {
270     return $from; # would love to quote here but _table ends up getting called
271                   # twice during an ->select without a limit clause due to
272                   # the way S::A::Limit->select works. should maybe consider
273                   # bypassing this and doing S::A::select($self, ...) in
274                   # our select method above. meantime, quoting shims have
275                   # been added to select/insert/update/delete here
276   }
277 }
278
279 sub _recurse_from {
280   my ($self, $from, @join) = @_;
281   my @sqlf;
282   push(@sqlf, $self->_make_as($from));
283   foreach my $j (@join) {
284     my ($to, $on) = @$j;
285
286     # check whether a join type exists
287     my $join_clause = '';
288     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
289     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
290       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
291     } else {
292       $join_clause = ' JOIN ';
293     }
294     push(@sqlf, $join_clause);
295
296     if (ref $to eq 'ARRAY') {
297       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
298     } else {
299       push(@sqlf, $self->_make_as($to));
300     }
301     push(@sqlf, ' ON ', $self->_join_condition($on));
302   }
303   return join('', @sqlf);
304 }
305
306 sub _fold_sqlbind {
307   my ($self, $sqlbind) = @_;
308   my $sql = shift @$$sqlbind;
309   push @{$self->{from_bind}}, @$$sqlbind;
310   return $sql;
311 }
312
313 sub _make_as {
314   my ($self, $from) = @_;
315   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
316                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
317                         : $self->_quote($_))
318                        } reverse each %{$self->_skip_options($from)});
319 }
320
321 sub _skip_options {
322   my ($self, $hash) = @_;
323   my $clean_hash = {};
324   $clean_hash->{$_} = $hash->{$_}
325     for grep {!/^-/} keys %$hash;
326   return $clean_hash;
327 }
328
329 sub _join_condition {
330   my ($self, $cond) = @_;
331   if (ref $cond eq 'HASH') {
332     my %j;
333     for (keys %$cond) {
334       my $v = $cond->{$_};
335       if (ref $v) {
336         # XXX no throw_exception() in this package and croak() fails with strange results
337         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
338             if ref($v) ne 'SCALAR';
339         $j{$_} = $v;
340       }
341       else {
342         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
343       }
344     };
345     return scalar($self->_recurse_where(\%j));
346   } elsif (ref $cond eq 'ARRAY') {
347     return join(' OR ', map { $self->_join_condition($_) } @$cond);
348   } else {
349     die "Can't handle this yet!";
350   }
351 }
352
353 sub _quote {
354   my ($self, $label) = @_;
355   return '' unless defined $label;
356   return "*" if $label eq '*';
357   return $label unless $self->{quote_char};
358   if(ref $self->{quote_char} eq "ARRAY"){
359     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
360       if !defined $self->{name_sep};
361     my $sep = $self->{name_sep};
362     return join($self->{name_sep},
363         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
364        split(/\Q$sep\E/,$label));
365   }
366   return $self->SUPER::_quote($label);
367 }
368
369 sub limit_dialect {
370     my $self = shift;
371     $self->{limit_dialect} = shift if @_;
372     return $self->{limit_dialect};
373 }
374
375 sub quote_char {
376     my $self = shift;
377     $self->{quote_char} = shift if @_;
378     return $self->{quote_char};
379 }
380
381 sub name_sep {
382     my $self = shift;
383     $self->{name_sep} = shift if @_;
384     return $self->{name_sep};
385 }
386
387 1;
388
389 __END__
390
391 =pod
392
393 =head1 NAME
394
395 DBIx::Class::SQLAHacks - Things desired to be merged into SQL::Abstract
396
397 =head1 METHODS
398
399 =head2 new
400
401 Tries to determine limit dialect.
402
403 =head2 select
404
405 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
406 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
407
408 =head2 insert update delete
409
410 Just quotes table names.
411
412 =head2 limit_dialect
413
414 Specifies the dialect of used for implementing an SQL "limit" clause for
415 restricting the number of query results returned.  Valid values are: RowNum.
416
417 See L<DBIx::Class::Storage::DBI/connect_info> for details.
418
419 =head2 name_sep
420
421 Character separating quoted table names.
422
423 See L<DBIx::Class::Storage::DBI/connect_info> for details.
424
425 =head2 quote_char
426
427 Set to an array-ref to specify separate left and right quotes for table names.
428
429 See L<DBIx::Class::Storage::DBI/connect_info> for details.
430
431 =cut
432