Half way working stuff, needs a LOT of tweaking still
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 use base qw/SQL::Abstract::Limit/;
5 use strict;
6 use warnings;
7 use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/;
8
9 BEGIN {
10   # reinstall the carp()/croak() functions imported into SQL::Abstract
11   # as Carp and Carp::Clan do not like each other much
12   no warnings qw/redefine/;
13   no strict qw/refs/;
14   for my $f (qw/carp croak/) {
15     my $orig = \&{"SQL::Abstract::$f"};
16     *{"SQL::Abstract::$f"} = sub {
17
18       local $Carp::CarpLevel = 1;   # even though Carp::Clan ignores this, $orig will not
19
20       if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+\(\) called/) {
21         __PACKAGE__->can($f)->(@_);
22       }
23       else {
24         $orig->(@_);
25       }
26     }
27   }
28 }
29
30 sub new {
31   my $self = shift->SUPER::new(@_);
32
33   # This prevents the caching of $dbh in S::A::L, I believe
34   # If limit_dialect is a ref (like a $dbh), go ahead and replace
35   #   it with what it resolves to:
36   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
37     if ref $self->{limit_dialect};
38
39   $self;
40 }
41
42
43 # Some databases (sqlite) do not handle multiple parenthesis
44 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
45 # is interpreted as x IN 1 or something similar.
46 #
47 # Since we currently do not have access to the SQLA AST, resort
48 # to barbaric mutilation of any SQL supplied in literal form
49
50 sub _strip_outer_paren {
51   my ($self, $arg) = @_;
52
53   return $self->_SWITCH_refkind ($arg, {
54     ARRAYREFREF => sub {
55       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
56       return $arg;
57     },
58     SCALARREF => sub {
59       return \__strip_outer_paren( $$arg );
60     },
61     FALLBACK => sub {
62       return $arg
63     },
64   });
65 }
66
67 sub __strip_outer_paren {
68   my $sql = shift;
69
70   if ($sql and not ref $sql) {
71     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
72       $sql = $1;
73     }
74   }
75
76   return $sql;
77 }
78
79 sub _where_field_IN {
80   my ($self, $lhs, $op, $rhs) = @_;
81   $rhs = $self->_strip_outer_paren ($rhs);
82   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
83 }
84
85 sub _where_field_BETWEEN {
86   my ($self, $lhs, $op, $rhs) = @_;
87   $rhs = $self->_strip_outer_paren ($rhs);
88   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
89 }
90
91 # Slow but ANSI standard Limit/Offset support. DB2 uses this
92 sub _RowNumberOver {
93   my ($self, $sql, $order, $rows, $offset ) = @_;
94
95   $offset += 1;
96   my $last = $rows + $offset - 1;
97   my ( $order_by ) = $self->_order_by( $order );
98
99   $sql = <<"SQL";
100 SELECT * FROM
101 (
102    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
103       $sql
104       $order_by
105    ) Q1
106 ) Q2
107 WHERE ROW_NUM BETWEEN $offset AND $last
108
109 SQL
110
111   return $sql;
112 }
113
114 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
115 # but may have to switch to RowNumberOver one day
116 sub _Top {
117   my ( $self, $sql, $order, $rows, $offset ) = @_;
118
119   # mangle the input sql so it can be properly aliased in the outer queries
120   $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
121     or croak "Unrecognizable SELECT: $sql";
122   my $select = $1;
123
124   my (@outer_select, %col_index);
125   for my $selected_col (@{$self->{_dbic_rs_attrs}{select}}) {
126
127     my $new_colname;
128
129     if (ref $selected_col) {
130       $new_colname = $self->_quote ('column_' . (@outer_select + 1) );
131     }
132     else {
133       my $quoted_col = $self->_quote ($selected_col);
134
135       my $name_sep = $self->name_sep || '.';
136       $name_sep = "\Q$name_sep\E";
137
138       my ($table, $orig_colname) = ( $selected_col =~ / (?: (.+) $name_sep )? ([^$name_sep]+) $ /x );
139       $new_colname = $self->_quote ("${table}__${orig_colname}");
140
141       $select =~ s/(\Q$quoted_col\E|\Q$selected_col\E)/"$1 AS $new_colname"/e;
142
143       # record qualified name if available (should be)
144       $col_index{$selected_col} = $new_colname if $table;
145
146       # record unqialified name, undef if a duplicate is found
147       if (exists $col_index{$orig_colname}) {
148         $col_index{$orig_colname} = undef;
149       }
150       else {
151         $col_index{$orig_colname} = $new_colname;
152       }
153     }
154
155     push @outer_select, $new_colname;
156   }
157
158   my $outer_select = join (', ', @outer_select );
159
160
161   # deal with order
162   croak '$order supplied to SQLAHacks limit emulators must be a hash'
163     if (ref $order ne 'HASH');
164
165   $order = { %$order }; #copy
166
167   my $req_order = [ $self->_order_by_chunks ($order->{order_by}) ];
168   my $limit_order = [ @$req_order ? @$req_order : $self->_order_by_chunks ($order->{_virtual_order_by}) ];
169
170
171   # normalize all column names in order by
172   # no copies, just aliasing ($_)
173   for ($req_order, $limit_order) {
174     for ( @{$_ || []} ) {
175       $_ = $col_index{$_} if $col_index{$_};
176     }
177   }
178
179
180   # generate the rest
181   delete $order->{$_} for qw/order_by _virtual_order_by/;
182   my $grpby_having = $self->_order_by ($order);
183
184   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
185
186   my $last = $rows + $offset;
187
188   $sql = <<"SQL";
189
190     SELECT TOP $rows $outer_select FROM
191     (
192       SELECT TOP $last $select $sql $grpby_having $order_by_inner
193     ) AS inner_sel
194     $order_by_outer
195 SQL
196
197   if (@$req_order) {
198     my $order_by_requested = $self->_order_by ($req_order);
199
200     $sql = <<"SQL";
201
202   SELECT $outer_select FROM
203   ( $sql ) AS outer_sel
204   $order_by_requested;
205 SQL
206
207   }
208
209   return $sql;
210 }
211
212
213
214 # While we're at it, this should make LIMIT queries more efficient,
215 #  without digging into things too deeply
216 sub _find_syntax {
217   my ($self, $syntax) = @_;
218   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
219 }
220
221 my $for_syntax = {
222   update => 'FOR UPDATE',
223   shared => 'FOR SHARE',
224 };
225 sub select {
226   my ($self, $table, $fields, $where, $order, @rest) = @_;
227
228   $self->{"${_}_bind"} = [] for (qw/having from order/);
229
230   if (ref $table eq 'SCALAR') {
231     $table = $$table;
232   }
233   elsif (not ref $table) {
234     $table = $self->_quote($table);
235   }
236   local $self->{rownum_hack_count} = 1
237     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
238   @rest = (-1) unless defined $rest[0];
239   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
240     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
241   my ($sql, @where_bind) = $self->SUPER::select(
242     $table, $self->_recurse_fields($fields), $where, $order, @rest
243   );
244   if (my $for = delete $self->{_dbic_rs_attrs}{for}) {
245     $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
246   }
247
248   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
249 }
250
251 sub insert {
252   my $self = shift;
253   my $table = shift;
254   $table = $self->_quote($table) unless ref($table);
255
256   # SQLA will emit INSERT INTO $table ( ) VALUES ( )
257   # which is sadly understood only by MySQL. Change default behavior here,
258   # until SQLA2 comes with proper dialect support
259   if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) {
260     return "INSERT INTO ${table} DEFAULT VALUES"
261   }
262
263   $self->SUPER::insert($table, @_);
264 }
265
266 sub update {
267   my $self = shift;
268   my $table = shift;
269   $table = $self->_quote($table) unless ref($table);
270   $self->SUPER::update($table, @_);
271 }
272
273 sub delete {
274   my $self = shift;
275   my $table = shift;
276   $table = $self->_quote($table) unless ref($table);
277   $self->SUPER::delete($table, @_);
278 }
279
280 sub _emulate_limit {
281   my $self = shift;
282   if ($_[3] == -1) {
283     return $_[1].$self->_order_by($_[2]);
284   } else {
285     return $self->SUPER::_emulate_limit(@_);
286   }
287 }
288
289 sub _recurse_fields {
290   my ($self, $fields, $params) = @_;
291   my $ref = ref $fields;
292   return $self->_quote($fields) unless $ref;
293   return $$fields if $ref eq 'SCALAR';
294
295   if ($ref eq 'ARRAY') {
296     return join(', ', map {
297       $self->_recurse_fields($_)
298         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
299           ? ' AS col'.$self->{rownum_hack_count}++
300           : '')
301       } @$fields);
302   } elsif ($ref eq 'HASH') {
303     foreach my $func (keys %$fields) {
304       if ($func eq 'distinct') {
305         my $_fields = $fields->{$func};
306         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
307           croak (
308             'The select => { distinct => ... } syntax is not supported for multiple columns.'
309            .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
310            .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
311           );
312         }
313         else {
314           $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
315           carp (
316             'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
317            ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
318           );
319         }
320       }
321       return $self->_sqlcase($func)
322         .'( '.$self->_recurse_fields($fields->{$func}).' )';
323     }
324   }
325   # Is the second check absolutely necessary?
326   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
327     return $self->_fold_sqlbind( $fields );
328   }
329   else {
330     croak($ref . qq{ unexpected in _recurse_fields()})
331   }
332 }
333
334 sub _order_by {
335   my ($self, $arg) = @_;
336
337   if (ref $arg eq 'HASH' and keys %$arg and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
338
339     my $ret = '';
340
341     if (defined $arg->{group_by}) {
342       $ret = $self->_sqlcase(' group by ')
343         .$self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 });
344     }
345
346     if (defined $arg->{having}) {
347       my ($frag, @bind) = $self->_recurse_where($arg->{having});
348       push(@{$self->{having_bind}}, @bind);
349       $ret .= $self->_sqlcase(' having ').$frag;
350     }
351
352     if (defined $arg->{order_by}) {
353       my ($frag, @bind) = $self->SUPER::_order_by($arg->{order_by});
354       push(@{$self->{order_bind}}, @bind);
355       $ret .= $frag;
356     }
357
358     return $ret;
359   }
360   else {
361     my ($sql, @bind) = $self->SUPER::_order_by ($arg);
362     push(@{$self->{order_bind}}, @bind);
363     return $sql;
364   }
365 }
366
367 sub _order_directions {
368   my ($self, $order) = @_;
369
370   # strip bind values - none of the current _order_directions users support them
371   return $self->SUPER::_order_directions( [ map
372     { ref $_ ? $_->[0] : $_ }
373     $self->_order_by_chunks ($order)
374   ]);
375 }
376
377 sub _table {
378   my ($self, $from) = @_;
379   if (ref $from eq 'ARRAY') {
380     return $self->_recurse_from(@$from);
381   } elsif (ref $from eq 'HASH') {
382     return $self->_make_as($from);
383   } else {
384     return $from; # would love to quote here but _table ends up getting called
385                   # twice during an ->select without a limit clause due to
386                   # the way S::A::Limit->select works. should maybe consider
387                   # bypassing this and doing S::A::select($self, ...) in
388                   # our select method above. meantime, quoting shims have
389                   # been added to select/insert/update/delete here
390   }
391 }
392
393 sub _recurse_from {
394   my ($self, $from, @join) = @_;
395   my @sqlf;
396   push(@sqlf, $self->_make_as($from));
397   foreach my $j (@join) {
398     my ($to, $on) = @$j;
399
400     # check whether a join type exists
401     my $join_clause = '';
402     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
403     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
404       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
405     } else {
406       $join_clause = ' JOIN ';
407     }
408     push(@sqlf, $join_clause);
409
410     if (ref $to eq 'ARRAY') {
411       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
412     } else {
413       push(@sqlf, $self->_make_as($to));
414     }
415     push(@sqlf, ' ON ', $self->_join_condition($on));
416   }
417   return join('', @sqlf);
418 }
419
420 sub _fold_sqlbind {
421   my ($self, $sqlbind) = @_;
422
423   my @sqlbind = @$$sqlbind; # copy
424   my $sql = shift @sqlbind;
425   push @{$self->{from_bind}}, @sqlbind;
426
427   return $sql;
428 }
429
430 sub _make_as {
431   my ($self, $from) = @_;
432   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
433                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
434                         : $self->_quote($_))
435                        } reverse each %{$self->_skip_options($from)});
436 }
437
438 sub _skip_options {
439   my ($self, $hash) = @_;
440   my $clean_hash = {};
441   $clean_hash->{$_} = $hash->{$_}
442     for grep {!/^-/} keys %$hash;
443   return $clean_hash;
444 }
445
446 sub _join_condition {
447   my ($self, $cond) = @_;
448   if (ref $cond eq 'HASH') {
449     my %j;
450     for (keys %$cond) {
451       my $v = $cond->{$_};
452       if (ref $v) {
453         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
454             if ref($v) ne 'SCALAR';
455         $j{$_} = $v;
456       }
457       else {
458         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
459       }
460     };
461     return scalar($self->_recurse_where(\%j));
462   } elsif (ref $cond eq 'ARRAY') {
463     return join(' OR ', map { $self->_join_condition($_) } @$cond);
464   } else {
465     die "Can't handle this yet!";
466   }
467 }
468
469 sub _quote {
470   my ($self, $label) = @_;
471   return '' unless defined $label;
472   return "*" if $label eq '*';
473   return $label unless $self->{quote_char};
474   if(ref $self->{quote_char} eq "ARRAY"){
475     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
476       if !defined $self->{name_sep};
477     my $sep = $self->{name_sep};
478     return join($self->{name_sep},
479         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
480        split(/\Q$sep\E/,$label));
481   }
482   return $self->SUPER::_quote($label);
483 }
484
485 sub limit_dialect {
486     my $self = shift;
487     $self->{limit_dialect} = shift if @_;
488     return $self->{limit_dialect};
489 }
490
491 sub quote_char {
492     my $self = shift;
493     $self->{quote_char} = shift if @_;
494     return $self->{quote_char};
495 }
496
497 sub name_sep {
498     my $self = shift;
499     $self->{name_sep} = shift if @_;
500     return $self->{name_sep};
501 }
502
503 1;
504
505 __END__
506
507 =pod
508
509 =head1 NAME
510
511 DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
512 and includes a number of DBIC-specific workarounds, not yet suitable for
513 inclusion into SQLA proper.
514
515 =head1 METHODS
516
517 =head2 new
518
519 Tries to determine limit dialect.
520
521 =head2 select
522
523 Quotes table names, handles "limit" dialects (e.g. where rownum between x and
524 y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
525
526 =head2 insert update delete
527
528 Just quotes table names.
529
530 =head2 limit_dialect
531
532 Specifies the dialect of used for implementing an SQL "limit" clause for
533 restricting the number of query results returned.  Valid values are: RowNum.
534
535 See L<DBIx::Class::Storage::DBI/connect_info> for details.
536
537 =head2 name_sep
538
539 Character separating quoted table names.
540
541 See L<DBIx::Class::Storage::DBI/connect_info> for details.
542
543 =head2 quote_char
544
545 Set to an array-ref to specify separate left and right quotes for table names.
546
547 See L<DBIx::Class::Storage::DBI/connect_info> for details.
548
549 =cut
550