Half way working stuff, needs a LOT of tweaking still
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
4use base qw/SQL::Abstract::Limit/;
e3764383 5use strict;
6use warnings;
b2b22cd6 7use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/;
8
9BEGIN {
10 # reinstall the carp()/croak() functions imported into SQL::Abstract
11 # as Carp and Carp::Clan do not like each other much
12 no warnings qw/redefine/;
13 no strict qw/refs/;
14 for my $f (qw/carp croak/) {
15 my $orig = \&{"SQL::Abstract::$f"};
16 *{"SQL::Abstract::$f"} = sub {
17
18 local $Carp::CarpLevel = 1; # even though Carp::Clan ignores this, $orig will not
19
20 if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+\(\) called/) {
21 __PACKAGE__->can($f)->(@_);
22 }
23 else {
24 $orig->(@_);
25 }
26 }
27 }
28}
6f4ddea1 29
30sub new {
31 my $self = shift->SUPER::new(@_);
32
33 # This prevents the caching of $dbh in S::A::L, I believe
34 # If limit_dialect is a ref (like a $dbh), go ahead and replace
35 # it with what it resolves to:
36 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
37 if ref $self->{limit_dialect};
38
39 $self;
40}
41
42
6f4ddea1 43# Some databases (sqlite) do not handle multiple parenthesis
44# around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
45# is interpreted as x IN 1 or something similar.
46#
47# Since we currently do not have access to the SQLA AST, resort
48# to barbaric mutilation of any SQL supplied in literal form
49
50sub _strip_outer_paren {
51 my ($self, $arg) = @_;
52
53 return $self->_SWITCH_refkind ($arg, {
54 ARRAYREFREF => sub {
55 $$arg->[0] = __strip_outer_paren ($$arg->[0]);
56 return $arg;
57 },
58 SCALARREF => sub {
59 return \__strip_outer_paren( $$arg );
60 },
61 FALLBACK => sub {
62 return $arg
63 },
64 });
65}
66
67sub __strip_outer_paren {
68 my $sql = shift;
69
70 if ($sql and not ref $sql) {
71 while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
72 $sql = $1;
73 }
74 }
75
76 return $sql;
77}
78
79sub _where_field_IN {
80 my ($self, $lhs, $op, $rhs) = @_;
81 $rhs = $self->_strip_outer_paren ($rhs);
82 return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
83}
84
85sub _where_field_BETWEEN {
86 my ($self, $lhs, $op, $rhs) = @_;
87 $rhs = $self->_strip_outer_paren ($rhs);
88 return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
89}
90
15827712 91# Slow but ANSI standard Limit/Offset support. DB2 uses this
6f4ddea1 92sub _RowNumberOver {
93 my ($self, $sql, $order, $rows, $offset ) = @_;
94
95 $offset += 1;
03e1d9af 96 my $last = $rows + $offset - 1;
6f4ddea1 97 my ( $order_by ) = $self->_order_by( $order );
98
99 $sql = <<"SQL";
100SELECT * FROM
101(
102 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
103 $sql
104 $order_by
105 ) Q1
106) Q2
107WHERE ROW_NUM BETWEEN $offset AND $last
108
109SQL
110
111 return $sql;
112}
113
15827712 114# Crappy Top based Limit/Offset support. MSSQL uses this currently,
115# but may have to switch to RowNumberOver one day
116sub _Top {
117 my ( $self, $sql, $order, $rows, $offset ) = @_;
118
b1e1d073 119 # mangle the input sql so it can be properly aliased in the outer queries
120 $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
121 or croak "Unrecognizable SELECT: $sql";
122 my $select = $1;
123
124 my (@outer_select, %col_index);
125 for my $selected_col (@{$self->{_dbic_rs_attrs}{select}}) {
126
127 my $new_colname;
128
129 if (ref $selected_col) {
130 $new_colname = $self->_quote ('column_' . (@outer_select + 1) );
131 }
132 else {
133 my $quoted_col = $self->_quote ($selected_col);
134
135 my $name_sep = $self->name_sep || '.';
136 $name_sep = "\Q$name_sep\E";
137
138 my ($table, $orig_colname) = ( $selected_col =~ / (?: (.+) $name_sep )? ([^$name_sep]+) $ /x );
139 $new_colname = $self->_quote ("${table}__${orig_colname}");
140
141 $select =~ s/(\Q$quoted_col\E|\Q$selected_col\E)/"$1 AS $new_colname"/e;
142
143 # record qualified name if available (should be)
144 $col_index{$selected_col} = $new_colname if $table;
145
146 # record unqialified name, undef if a duplicate is found
147 if (exists $col_index{$orig_colname}) {
148 $col_index{$orig_colname} = undef;
149 }
150 else {
151 $col_index{$orig_colname} = $new_colname;
152 }
153 }
154
155 push @outer_select, $new_colname;
156 }
157
158 my $outer_select = join (', ', @outer_select );
159
160
161 # deal with order
15827712 162 croak '$order supplied to SQLAHacks limit emulators must be a hash'
163 if (ref $order ne 'HASH');
164
8f6dbee9 165 $order = { %$order }; #copy
166
b1e1d073 167 my $req_order = [ $self->_order_by_chunks ($order->{order_by}) ];
168 my $limit_order = [ @$req_order ? @$req_order : $self->_order_by_chunks ($order->{_virtual_order_by}) ];
169
15827712 170
b1e1d073 171 # normalize all column names in order by
172 # no copies, just aliasing ($_)
173 for ($req_order, $limit_order) {
174 for ( @{$_ || []} ) {
175 $_ = $col_index{$_} if $col_index{$_};
176 }
177 }
1cbd3034 178
15827712 179
b1e1d073 180 # generate the rest
8f6dbee9 181 delete $order->{$_} for qw/order_by _virtual_order_by/;
182 my $grpby_having = $self->_order_by ($order);
183
15827712 184 my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
185
b1e1d073 186 my $last = $rows + $offset;
15827712 187
188 $sql = <<"SQL";
b1e1d073 189
190 SELECT TOP $rows $outer_select FROM
15827712 191 (
b1e1d073 192 SELECT TOP $last $select $sql $grpby_having $order_by_inner
193 ) AS inner_sel
15827712 194 $order_by_outer
b1e1d073 195SQL
196
197 if (@$req_order) {
198 my $order_by_requested = $self->_order_by ($req_order);
15827712 199
b1e1d073 200 $sql = <<"SQL";
201
202 SELECT $outer_select FROM
203 ( $sql ) AS outer_sel
204 $order_by_requested;
15827712 205SQL
b1e1d073 206
207 }
208
209 return $sql;
15827712 210}
211
212
6f4ddea1 213
214# While we're at it, this should make LIMIT queries more efficient,
215# without digging into things too deeply
6f4ddea1 216sub _find_syntax {
217 my ($self, $syntax) = @_;
ac788c46 218 return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
6f4ddea1 219}
220
ec7cfd36 221my $for_syntax = {
222 update => 'FOR UPDATE',
223 shared => 'FOR SHARE',
224};
6f4ddea1 225sub select {
226 my ($self, $table, $fields, $where, $order, @rest) = @_;
1cbd3034 227
228 $self->{"${_}_bind"} = [] for (qw/having from order/);
db56cf3d 229
6f4ddea1 230 if (ref $table eq 'SCALAR') {
231 $table = $$table;
232 }
233 elsif (not ref $table) {
234 $table = $self->_quote($table);
235 }
236 local $self->{rownum_hack_count} = 1
237 if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
238 @rest = (-1) unless defined $rest[0];
e8fcf76f 239 croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
6f4ddea1 240 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
db56cf3d 241 my ($sql, @where_bind) = $self->SUPER::select(
6f4ddea1 242 $table, $self->_recurse_fields($fields), $where, $order, @rest
243 );
ec7cfd36 244 if (my $for = delete $self->{_dbic_rs_attrs}{for}) {
245 $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
246 }
247
1cbd3034 248 return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
6f4ddea1 249}
250
251sub insert {
252 my $self = shift;
253 my $table = shift;
254 $table = $self->_quote($table) unless ref($table);
7a72e5a5 255
256 # SQLA will emit INSERT INTO $table ( ) VALUES ( )
257 # which is sadly understood only by MySQL. Change default behavior here,
258 # until SQLA2 comes with proper dialect support
259 if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) {
260 return "INSERT INTO ${table} DEFAULT VALUES"
261 }
262
6f4ddea1 263 $self->SUPER::insert($table, @_);
264}
265
266sub update {
267 my $self = shift;
268 my $table = shift;
269 $table = $self->_quote($table) unless ref($table);
270 $self->SUPER::update($table, @_);
271}
272
273sub delete {
274 my $self = shift;
275 my $table = shift;
276 $table = $self->_quote($table) unless ref($table);
277 $self->SUPER::delete($table, @_);
278}
279
280sub _emulate_limit {
281 my $self = shift;
282 if ($_[3] == -1) {
283 return $_[1].$self->_order_by($_[2]);
284 } else {
285 return $self->SUPER::_emulate_limit(@_);
286 }
287}
288
289sub _recurse_fields {
290 my ($self, $fields, $params) = @_;
291 my $ref = ref $fields;
292 return $self->_quote($fields) unless $ref;
293 return $$fields if $ref eq 'SCALAR';
294
295 if ($ref eq 'ARRAY') {
296 return join(', ', map {
297 $self->_recurse_fields($_)
298 .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
299 ? ' AS col'.$self->{rownum_hack_count}++
300 : '')
301 } @$fields);
302 } elsif ($ref eq 'HASH') {
303 foreach my $func (keys %$fields) {
db56cf3d 304 if ($func eq 'distinct') {
305 my $_fields = $fields->{$func};
306 if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
04ebc6f4 307 croak (
308 'The select => { distinct => ... } syntax is not supported for multiple columns.'
309 .' Instead please use { group_by => [ qw/' . (join ' ', @$_fields) . '/ ] }'
310 .' or { select => [ qw/' . (join ' ', @$_fields) . '/ ], distinct => 1 }'
311 );
db56cf3d 312 }
313 else {
314 $_fields = @{$_fields}[0] if ref $_fields eq 'ARRAY';
04ebc6f4 315 carp (
316 'The select => { distinct => ... } syntax will be deprecated in DBIC version 0.09,'
317 ." please use { group_by => '${_fields}' } or { select => '${_fields}', distinct => 1 }"
318 );
db56cf3d 319 }
320 }
6f4ddea1 321 return $self->_sqlcase($func)
322 .'( '.$self->_recurse_fields($fields->{$func}).' )';
323 }
324 }
325 # Is the second check absolutely necessary?
326 elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
db56cf3d 327 return $self->_fold_sqlbind( $fields );
6f4ddea1 328 }
329 else {
e8fcf76f 330 croak($ref . qq{ unexpected in _recurse_fields()})
6f4ddea1 331 }
332}
333
334sub _order_by {
1cbd3034 335 my ($self, $arg) = @_;
15827712 336
1cbd3034 337 if (ref $arg eq 'HASH' and keys %$arg and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
338
339 my $ret = '';
340
341 if (defined $arg->{group_by}) {
6f4ddea1 342 $ret = $self->_sqlcase(' group by ')
1cbd3034 343 .$self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 });
6f4ddea1 344 }
15827712 345
1cbd3034 346 if (defined $arg->{having}) {
347 my ($frag, @bind) = $self->_recurse_where($arg->{having});
348 push(@{$self->{having_bind}}, @bind);
6f4ddea1 349 $ret .= $self->_sqlcase(' having ').$frag;
350 }
15827712 351
1cbd3034 352 if (defined $arg->{order_by}) {
353 my ($frag, @bind) = $self->SUPER::_order_by($arg->{order_by});
354 push(@{$self->{order_bind}}, @bind);
355 $ret .= $frag;
6f4ddea1 356 }
15827712 357
1cbd3034 358 return $ret;
fde3719a 359 }
1cbd3034 360 else {
361 my ($sql, @bind) = $self->SUPER::_order_by ($arg);
362 push(@{$self->{order_bind}}, @bind);
363 return $sql;
fd4cb60a 364 }
6f4ddea1 365}
366
1cbd3034 367sub _order_directions {
fd4cb60a 368 my ($self, $order) = @_;
15827712 369
1cbd3034 370 # strip bind values - none of the current _order_directions users support them
371 return $self->SUPER::_order_directions( [ map
372 { ref $_ ? $_->[0] : $_ }
373 $self->_order_by_chunks ($order)
374 ]);
fd4cb60a 375}
376
6f4ddea1 377sub _table {
378 my ($self, $from) = @_;
379 if (ref $from eq 'ARRAY') {
380 return $self->_recurse_from(@$from);
381 } elsif (ref $from eq 'HASH') {
382 return $self->_make_as($from);
383 } else {
384 return $from; # would love to quote here but _table ends up getting called
385 # twice during an ->select without a limit clause due to
386 # the way S::A::Limit->select works. should maybe consider
387 # bypassing this and doing S::A::select($self, ...) in
388 # our select method above. meantime, quoting shims have
389 # been added to select/insert/update/delete here
390 }
391}
392
393sub _recurse_from {
394 my ($self, $from, @join) = @_;
395 my @sqlf;
396 push(@sqlf, $self->_make_as($from));
397 foreach my $j (@join) {
398 my ($to, $on) = @$j;
399
400 # check whether a join type exists
401 my $join_clause = '';
402 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
403 if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
404 $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
405 } else {
406 $join_clause = ' JOIN ';
407 }
408 push(@sqlf, $join_clause);
409
410 if (ref $to eq 'ARRAY') {
411 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
412 } else {
413 push(@sqlf, $self->_make_as($to));
414 }
415 push(@sqlf, ' ON ', $self->_join_condition($on));
416 }
417 return join('', @sqlf);
418}
419
db56cf3d 420sub _fold_sqlbind {
421 my ($self, $sqlbind) = @_;
69989ea9 422
423 my @sqlbind = @$$sqlbind; # copy
424 my $sql = shift @sqlbind;
425 push @{$self->{from_bind}}, @sqlbind;
426
db56cf3d 427 return $sql;
6f4ddea1 428}
429
430sub _make_as {
431 my ($self, $from) = @_;
db56cf3d 432 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
433 : ref $_ eq 'REF' ? $self->_fold_sqlbind($_)
434 : $self->_quote($_))
6f4ddea1 435 } reverse each %{$self->_skip_options($from)});
436}
437
438sub _skip_options {
439 my ($self, $hash) = @_;
440 my $clean_hash = {};
441 $clean_hash->{$_} = $hash->{$_}
442 for grep {!/^-/} keys %$hash;
443 return $clean_hash;
444}
445
446sub _join_condition {
447 my ($self, $cond) = @_;
448 if (ref $cond eq 'HASH') {
449 my %j;
450 for (keys %$cond) {
451 my $v = $cond->{$_};
452 if (ref $v) {
e8fcf76f 453 croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
6f4ddea1 454 if ref($v) ne 'SCALAR';
455 $j{$_} = $v;
456 }
457 else {
458 my $x = '= '.$self->_quote($v); $j{$_} = \$x;
459 }
460 };
461 return scalar($self->_recurse_where(\%j));
462 } elsif (ref $cond eq 'ARRAY') {
463 return join(' OR ', map { $self->_join_condition($_) } @$cond);
464 } else {
465 die "Can't handle this yet!";
466 }
467}
468
469sub _quote {
470 my ($self, $label) = @_;
471 return '' unless defined $label;
472 return "*" if $label eq '*';
473 return $label unless $self->{quote_char};
474 if(ref $self->{quote_char} eq "ARRAY"){
475 return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
476 if !defined $self->{name_sep};
477 my $sep = $self->{name_sep};
478 return join($self->{name_sep},
479 map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1] }
480 split(/\Q$sep\E/,$label));
481 }
482 return $self->SUPER::_quote($label);
483}
484
485sub limit_dialect {
486 my $self = shift;
487 $self->{limit_dialect} = shift if @_;
488 return $self->{limit_dialect};
489}
490
491sub quote_char {
492 my $self = shift;
493 $self->{quote_char} = shift if @_;
494 return $self->{quote_char};
495}
496
497sub name_sep {
498 my $self = shift;
499 $self->{name_sep} = shift if @_;
500 return $self->{name_sep};
501}
502
5031;
504
505__END__
506
507=pod
508
509=head1 NAME
510
855c6fd0 511DBIx::Class::SQLAHacks - This module is a subclass of SQL::Abstract::Limit
512and includes a number of DBIC-specific workarounds, not yet suitable for
513inclusion into SQLA proper.
6f4ddea1 514
515=head1 METHODS
516
517=head2 new
518
519Tries to determine limit dialect.
520
521=head2 select
522
523Quotes table names, handles "limit" dialects (e.g. where rownum between x and
524y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
525
526=head2 insert update delete
527
528Just quotes table names.
529
530=head2 limit_dialect
531
532Specifies the dialect of used for implementing an SQL "limit" clause for
533restricting the number of query results returned. Valid values are: RowNum.
534
535See L<DBIx::Class::Storage::DBI/connect_info> for details.
536
537=head2 name_sep
538
539Character separating quoted table names.
540
541See L<DBIx::Class::Storage::DBI/connect_info> for details.
542
543=head2 quote_char
544
545Set to an array-ref to specify separate left and right quotes for table names.
546
547See L<DBIx::Class::Storage::DBI/connect_info> for details.
548
549=cut
550