Stop relying on ->can in the SQL::Abstract carp-hack, allows
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
998373c2 4# This module is a subclass of SQL::Abstract::Limit and includes a number
5# of DBIC-specific workarounds, not yet suitable for inclusion into the
6# SQLA core
7
6f4ddea1 8use base qw/SQL::Abstract::Limit/;
e3764383 9use strict;
10use warnings;
6298a324 11use List::Util 'first';
12use Sub::Name 'subname';
ac322978 13use Carp::Clan qw/^DBIx::Class|^SQL::Abstract|^Try::Tiny/;
e8fc51c7 14use namespace::clean;
b2b22cd6 15
16BEGIN {
17 # reinstall the carp()/croak() functions imported into SQL::Abstract
18 # as Carp and Carp::Clan do not like each other much
19 no warnings qw/redefine/;
20 no strict qw/refs/;
21 for my $f (qw/carp croak/) {
329d7385 22
b2b22cd6 23 my $orig = \&{"SQL::Abstract::$f"};
e8fc51c7 24 my $clan_import = \&{$f};
6298a324 25 *{"SQL::Abstract::$f"} = subname "SQL::Abstract::$f" =>
8637bb24 26 sub {
27 if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+ .+? called \s at/x) {
e8fc51c7 28 $clan_import->(@_);
8637bb24 29 }
30 else {
31 goto $orig;
32 }
33 };
b2b22cd6 34 }
35}
6f4ddea1 36
e9657379 37# the "oh noes offset/top without limit" constant
02a8d945 38# limited to 32 bits for sanity (and since it is fed
39# to sprintf %u)
e9657379 40sub __max_int { 0xFFFFFFFF };
41
998373c2 42
43# Tries to determine limit dialect.
44#
6f4ddea1 45sub new {
46 my $self = shift->SUPER::new(@_);
47
48 # This prevents the caching of $dbh in S::A::L, I believe
49 # If limit_dialect is a ref (like a $dbh), go ahead and replace
50 # it with what it resolves to:
51 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
52 if ref $self->{limit_dialect};
53
54 $self;
55}
56
a54bd479 57# !!! THIS IS ALSO HORRIFIC !!! /me ashamed
58#
75f025cf 59# Generates inner/outer select lists for various limit dialects
81446c4f 60# which result in one or more subqueries (e.g. RNO, Top, RowNum)
61# Any non-root-table columns need to have their table qualifier
a54bd479 62# turned into a column alias (otherwise names in subqueries clash
81446c4f 63# and/or lose their source table)
a54bd479 64#
75f025cf 65# Returns inner/outer strings of SQL QUOTED selectors with aliases
a54bd479 66# (to be used in whatever select statement), and an alias index hashref
67# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst
75f025cf 68# higher up).
69# If an order_by is supplied, the inner select needs to bring out columns
70# used in implicit (non-selected) orders, and the order condition itself
71# needs to be realiased to the proper names in the outer query. Thus we
72# also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
73# QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
74# exist in the original select list
a54bd479 75
76sub _subqueried_limit_attrs {
75f025cf 77 my ($self, $rs_attrs) = @_;
81446c4f 78
a54bd479 79 croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
80 unless ref ($rs_attrs) eq 'HASH';
81
82 my ($re_sep, $re_alias) = map { quotemeta $_ } (
83 $self->name_sep || '.',
84 $rs_attrs->{alias},
85 );
81446c4f 86
a54bd479 87 # correlate select and as, build selection index
88 my (@sel, $in_sel_index);
81446c4f 89 for my $i (0 .. $#{$rs_attrs->{select}}) {
a54bd479 90
81446c4f 91 my $s = $rs_attrs->{select}[$i];
a54bd479 92 my $sql_sel = $self->_recurse_fields ($s);
93 my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
94
95
81446c4f 96 push @sel, {
a54bd479 97 sql => $sql_sel,
81446c4f 98 unquoted_sql => do { local $self->{quote_char}; $self->_recurse_fields ($s) },
99 as =>
a54bd479 100 $sql_alias
81446c4f 101 ||
102 $rs_attrs->{as}[$i]
103 ||
104 croak "Select argument $i ($s) without corresponding 'as'"
105 ,
106 };
a54bd479 107
108 $in_sel_index->{$sql_sel}++;
109 $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
110
75f025cf 111 # record unqualified versions too, so we do not have
112 # to reselect the same column twice (in qualified and
113 # unqualified form)
114 if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
115 $in_sel_index->{$1}++;
116 }
81446c4f 117 }
118
81446c4f 119
120 # re-alias and remove any name separators from aliases,
121 # unless we are dealing with the current source alias
a54bd479 122 # (which will transcend the subqueries as it is necessary
81446c4f 123 # for possible further chaining)
a54bd479 124 my (@in_sel, @out_sel, %renamed);
81446c4f 125 for my $node (@sel) {
f8583f8f 126 if (first { $_ =~ / (?<! ^ $re_alias ) $re_sep /x } ($node->{as}, $node->{unquoted_sql}) ) {
63ca94e1 127 $node->{as} = $self->_unqualify_colname($node->{as});
a54bd479 128 my $quoted_as = $self->_quote($node->{as});
129 push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as;
130 push @out_sel, $quoted_as;
131 $renamed{$node->{sql}} = $quoted_as;
81446c4f 132 }
133 else {
a54bd479 134 push @in_sel, $node->{sql};
135 push @out_sel, $self->_quote ($node->{as});
81446c4f 136 }
137 }
138
75f025cf 139 # see if the order gives us anything
a54bd479 140 my %extra_order_sel;
75f025cf 141 for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
142 # order with bind
143 $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
144 $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
a54bd479 145
75f025cf 146 next if $in_sel_index->{$chunk};
a54bd479 147
75f025cf 148 $extra_order_sel{$chunk} ||= $self->_quote (
149 'ORDER__BY__' . scalar keys %extra_order_sel
150 );
a54bd479 151 }
75f025cf 152
a54bd479 153 return (
154 (map { join (', ', @$_ ) } (
155 \@in_sel,
156 \@out_sel)
157 ),
158 \%renamed,
159 keys %extra_order_sel ? \%extra_order_sel : (),
160 );
81446c4f 161}
162
63ca94e1 163sub _unqualify_colname {
164 my ($self, $fqcn) = @_;
165 my $re_sep = quotemeta($self->name_sep || '.');
166 $fqcn =~ s/ $re_sep /__/xg;
167 return $fqcn;
168}
169
a54bd479 170# ANSI standard Limit/Offset implementation. DB2 and MSSQL >= 2005 use this
6f4ddea1 171sub _RowNumberOver {
a6b68a60 172 my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
6f4ddea1 173
81446c4f 174 # mangle the input sql as we will be replacing the selector
175 $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
cb478051 176 or croak "Unrecognizable SELECT: $sql";
177
a54bd479 178 # get selectors, and scan the order_by (if any)
75f025cf 179 my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
180 = $self->_subqueried_limit_attrs ( $rs_attrs );
81446c4f 181
a6b68a60 182 # make up an order if none exists
a54bd479 183 my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
184 my $rno_ord = $self->_order_by ($requested_order);
185
186 # this is the order supplement magic
187 my $mid_sel = $out_sel;
188 if ($extra_order_sel) {
20f44a33 189 for my $extra_col (sort
190 { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
191 keys %$extra_order_sel
192 ) {
a54bd479 193 $in_sel .= sprintf (', %s AS %s',
194 $extra_col,
195 $extra_order_sel->{$extra_col},
196 );
197
198 $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
199 }
200 }
201
202 # and this is order re-alias magic
203 for ($extra_order_sel, $alias_map) {
204 for my $col (keys %$_) {
205 my $re_col = quotemeta ($col);
206 $rno_ord =~ s/$re_col/$_->{$col}/;
207 }
208 }
6f4ddea1 209
81446c4f 210 # whatever is left of the order_by (only where is processed at this point)
a6b68a60 211 my $group_having = $self->_parse_rs_attrs($rs_attrs);
6f4ddea1 212
a6b68a60 213 my $qalias = $self->_quote ($rs_attrs->{alias});
81446c4f 214 my $idx_name = $self->_quote ('rno__row__index');
215
cb478051 216 $sql = sprintf (<<EOS, $offset + 1, $offset + $rows, );
6f4ddea1 217
a54bd479 218SELECT $out_sel FROM (
219 SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
220 SELECT $in_sel ${sql}${group_having}
cb478051 221 ) $qalias
75f025cf 222) $qalias WHERE $idx_name BETWEEN %u AND %u
6553ac38 223
224EOS
225
226 $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
6f4ddea1 227 return $sql;
228}
229
84ddb3da 230# some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
231sub _rno_default_order {
232 return undef;
233}
234
193590c2 235# Informix specific limit, almost like LIMIT/OFFSET
236sub _SkipFirst {
a6b68a60 237 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
193590c2 238
239 $sql =~ s/^ \s* SELECT \s+ //ix
240 or croak "Unrecognizable SELECT: $sql";
241
242 return sprintf ('SELECT %s%s%s%s',
243 $offset
75f025cf 244 ? sprintf ('SKIP %u ', $offset)
193590c2 245 : ''
246 ,
75f025cf 247 sprintf ('FIRST %u ', $rows),
193590c2 248 $sql,
a6b68a60 249 $self->_parse_rs_attrs ($rs_attrs),
193590c2 250 );
251}
252
145b2a3d 253# Firebird specific limit, reverse of _SkipFirst for Informix
254sub _FirstSkip {
a6b68a60 255 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
145b2a3d 256
257 $sql =~ s/^ \s* SELECT \s+ //ix
258 or croak "Unrecognizable SELECT: $sql";
259
260 return sprintf ('SELECT %s%s%s%s',
75f025cf 261 sprintf ('FIRST %u ', $rows),
145b2a3d 262 $offset
75f025cf 263 ? sprintf ('SKIP %u ', $offset)
145b2a3d 264 : ''
265 ,
266 $sql,
a6b68a60 267 $self->_parse_rs_attrs ($rs_attrs),
145b2a3d 268 );
269}
270
81446c4f 271# WhOracle limits
272sub _RowNum {
273 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
274
275 # mangle the input sql as we will be replacing the selector
276 $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
277 or croak "Unrecognizable SELECT: $sql";
278
a54bd479 279 my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs);
81446c4f 280
281 my $qalias = $self->_quote ($rs_attrs->{alias});
282 my $idx_name = $self->_quote ('rownum__index');
283 my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
284
285 $sql = sprintf (<<EOS, $offset + 1, $offset + $rows, );
286
287SELECT $outsel FROM (
288 SELECT $outsel, ROWNUM $idx_name FROM (
289 SELECT $insel ${sql}${order_group_having}
290 ) $qalias
75f025cf 291) $qalias WHERE $idx_name BETWEEN %u AND %u
81446c4f 292
293EOS
294
295 $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
296 return $sql;
297}
298
a54bd479 299# Crappy Top based Limit/Offset support. Legacy for MSSQL < 2005
15827712 300sub _Top {
a6b68a60 301 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
15827712 302
81446c4f 303 # mangle the input sql as we will be replacing the selector
304 $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
b1e1d073 305 or croak "Unrecognizable SELECT: $sql";
b1e1d073 306
81446c4f 307 # get selectors
a54bd479 308 my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
75f025cf 309 = $self->_subqueried_limit_attrs ($rs_attrs);
81446c4f 310
a54bd479 311 my $requested_order = delete $rs_attrs->{order_by};
81446c4f 312
a54bd479 313 my $order_by_requested = $self->_order_by ($requested_order);
314
315 # make up an order unless supplied
316 my $inner_order = ($order_by_requested
317 ? $requested_order
81446c4f 318 : [ map
a54bd479 319 { join ('', $rs_attrs->{alias}, $self->{name_sep}||'.', $_ ) }
320 ( $rs_attrs->{_rsroot_source_handle}->resolve->_pri_cols )
81446c4f 321 ]
a54bd479 322 );
81446c4f 323
a54bd479 324 my ($order_by_inner, $order_by_reversed);
81446c4f 325
a54bd479 326 # localise as we already have all the bind values we need
327 {
328 local $self->{order_bind};
329 $order_by_inner = $self->_order_by ($inner_order);
ac93965c 330
a54bd479 331 my @out_chunks;
332 for my $ch ($self->_order_by_chunks ($inner_order)) {
333 $ch = $ch->[0] if ref $ch eq 'ARRAY';
20f44a33 334
a54bd479 335 $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix;
336 my $dir = uc ($1||'ASC');
b1e1d073 337
a54bd479 338 push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' );
339 }
b1e1d073 340
20f44a33 341 $order_by_reversed = $self->_order_by (\@out_chunks);
42e5b103 342 }
b1e1d073 343
a54bd479 344 # this is the order supplement magic
345 my $mid_sel = $out_sel;
346 if ($extra_order_sel) {
20f44a33 347 for my $extra_col (sort
348 { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
349 keys %$extra_order_sel
350 ) {
a54bd479 351 $in_sel .= sprintf (', %s AS %s',
352 $extra_col,
353 $extra_order_sel->{$extra_col},
42e5b103 354 );
b1e1d073 355
a54bd479 356 $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
42e5b103 357 }
25abda27 358
359 # since whatever order bindvals there are, they will be realiased
360 # and need to show up in front of the entire initial inner subquery
361 # Unshift *from_bind* to make this happen (horrible, horrible, but
362 # we don't have another mechanism yet)
363 unshift @{$self->{from_bind}}, @{$self->{order_bind}};
b1e1d073 364 }
365
a54bd479 366 # and this is order re-alias magic
367 for my $map ($extra_order_sel, $alias_map) {
368 for my $col (keys %$map) {
369 my $re_col = quotemeta ($col);
370 $_ =~ s/$re_col/$map->{$col}/
371 for ($order_by_reversed, $order_by_requested);
b1e1d073 372 }
373 }
15827712 374
a54bd479 375 # generate the rest of the sql
376 my $grpby_having = $self->_parse_rs_attrs ($rs_attrs);
15827712 377
a54bd479 378 my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias});
b1e1d073 379
75f025cf 380 $sql = sprintf ('SELECT TOP %u %s %s %s %s',
a54bd479 381 $rows + ($offset||0),
382 $in_sel,
383 $sql,
384 $grpby_having,
385 $order_by_inner,
386 );
15827712 387
75f025cf 388 $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
a54bd479 389 $rows,
390 $mid_sel,
391 $sql,
392 $quoted_rs_alias,
393 $order_by_reversed,
394 ) if $offset;
15827712 395
75f025cf 396 $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
a54bd479 397 $rows,
a54bd479 398 $out_sel,
399 $sql,
400 $quoted_rs_alias,
20f44a33 401 $order_by_requested,
76fc0d71 402 ) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) );
b1e1d073 403
75f025cf 404 $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
405 return $sql;
406}
407
3f528319 408# This for Sybase ASE, to use SET ROWCOUNT when there is no offset, and
409# GenericSubQ otherwise.
410sub _RowCountOrGenericSubQ {
411 my $self = shift;
412 my ($sql, $rs_attrs, $rows, $offset) = @_;
413
414 return $self->_GenericSubQ(@_) if $offset;
415
416 return sprintf <<"EOF", $rows, $sql;
417SET ROWCOUNT %d
418%s
419SET ROWCOUNT 0
420EOF
421}
422
75f025cf 423# This is the most evil limit "dialect" (more of a hack) for *really*
424# stupid databases. It works by ordering the set by some unique column,
425# and calculating amount of rows that have a less-er value (thus
426# emulating a RowNum-like index). Of course this implies the set can
427# only be ordered by a single unique columns.
428sub _GenericSubQ {
429 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
430
431 my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve;
432 my $root_tbl_name = $root_rsrc->name;
433
434 # mangle the input sql as we will be replacing the selector
435 $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
436 or croak "Unrecognizable SELECT: $sql";
437
438 my ($order_by, @rest) = do {
439 local $self->{quote_char};
440 $self->_order_by_chunks ($rs_attrs->{order_by})
441 };
442
443 unless (
444 $order_by
445 &&
446 ! @rest
447 &&
448 ( ! ref $order_by
449 ||
450 ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
451 )
452 ) {
453 croak (
454 'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
455 . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
456 . 'unique-column order criteria.'
457 );
458 }
459
460 ($order_by) = @$order_by if ref $order_by;
461
462 $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
463 my $direction = lc ($1 || 'asc');
464
465 my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
466
467 my $inf = $root_rsrc->storage->_resolve_column_info (
468 $rs_attrs->{from}, [$order_by, $unq_sort_col]
469 );
470
471 my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'";
472
473 if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
474 croak "Generic Subquery Limit order criteria can be only based on the root-source '"
475 . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')";
476 }
477
478 # make sure order column is qualified
479 $order_by = "$rs_attrs->{alias}.$order_by"
480 unless $order_by =~ /^$rs_attrs->{alias}\./;
481
482 my $is_u;
483 my $ucs = { $root_rsrc->unique_constraints };
484 for (values %$ucs ) {
485 if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
486 $is_u++;
487 last;
488 }
489 }
490 croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
491 unless $is_u;
492
493 my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
494 = $self->_subqueried_limit_attrs ($rs_attrs);
495
496 my $cmp_op = $direction eq 'desc' ? '>' : '<';
497 my $count_tbl_alias = 'rownum__emulation';
498
b13d2248 499 my $order_sql = $self->_order_by (delete $rs_attrs->{order_by});
500 my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
75f025cf 501
502 # add the order supplement (if any) as this is what will be used for the outer WHERE
503 $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
504
505 $sql = sprintf (<<EOS,
506SELECT $out_sel
507 FROM (
b13d2248 508 SELECT $in_sel ${sql}${group_having_sql}
75f025cf 509 ) %s
510WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) %s
b13d2248 511$order_sql
75f025cf 512EOS
513 ( map { $self->_quote ($_) } (
514 $rs_attrs->{alias},
515 $root_tbl_name,
516 $count_tbl_alias,
517 "$count_tbl_alias.$unq_sort_col",
518 $order_by,
519 )),
520 $offset
521 ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1)
522 : sprintf ('< %u', $rows )
523 ,
524 );
525
526 $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
b1e1d073 527 return $sql;
15827712 528}
a54bd479 529
6f4ddea1 530
531# While we're at it, this should make LIMIT queries more efficient,
532# without digging into things too deeply
6f4ddea1 533sub _find_syntax {
534 my ($self, $syntax) = @_;
ac788c46 535 return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
6f4ddea1 536}
537
998373c2 538# Quotes table names, handles "limit" dialects (e.g. where rownum between x and
a6b68a60 539# y)
6f4ddea1 540sub select {
a6b68a60 541 my ($self, $table, $fields, $where, $rs_attrs, @rest) = @_;
1cbd3034 542
c2b7c5dc 543 if (not ref($table) or ref($table) eq 'SCALAR') {
6f4ddea1 544 $table = $self->_quote($table);
545 }
c2b7c5dc 546
6f4ddea1 547 @rest = (-1) unless defined $rest[0];
e8fcf76f 548 croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
6f4ddea1 549 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
a6b68a60 550
49afd714 551 my ($sql, @bind) = $self->SUPER::select(
a6b68a60 552 $table, $self->_recurse_fields($fields), $where, $rs_attrs, @rest
6f4ddea1 553 );
49afd714 554 push @{$self->{where_bind}}, @bind;
583a0c65 555
556# this *must* be called, otherwise extra binds will remain in the sql-maker
49afd714 557 my @all_bind = $self->_assemble_binds;
583a0c65 558
49afd714 559 return wantarray ? ($sql, @all_bind) : $sql;
583a0c65 560}
561
562sub _assemble_binds {
563 my $self = shift;
564 return map { @{ (delete $self->{"${_}_bind"}) || [] } } (qw/from where having order/);
6f4ddea1 565}
566
998373c2 567# Quotes table names, and handles default inserts
6f4ddea1 568sub insert {
569 my $self = shift;
570 my $table = shift;
c2b7c5dc 571 $table = $self->_quote($table);
7a72e5a5 572
573 # SQLA will emit INSERT INTO $table ( ) VALUES ( )
574 # which is sadly understood only by MySQL. Change default behavior here,
575 # until SQLA2 comes with proper dialect support
576 if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) {
28d28903 577 my $sql = "INSERT INTO ${table} DEFAULT VALUES";
578
ef8d02eb 579 if (my $ret = ($_[1]||{})->{returning} ) {
580 $sql .= $self->_insert_returning ($ret);
28d28903 581 }
582
583 return $sql;
7a72e5a5 584 }
585
6f4ddea1 586 $self->SUPER::insert($table, @_);
587}
588
998373c2 589# Just quotes table names.
6f4ddea1 590sub update {
591 my $self = shift;
592 my $table = shift;
c2b7c5dc 593 $table = $self->_quote($table);
6f4ddea1 594 $self->SUPER::update($table, @_);
595}
596
998373c2 597# Just quotes table names.
6f4ddea1 598sub delete {
599 my $self = shift;
600 my $table = shift;
c2b7c5dc 601 $table = $self->_quote($table);
6f4ddea1 602 $self->SUPER::delete($table, @_);
603}
604
605sub _emulate_limit {
606 my $self = shift;
a6b68a60 607 # my ( $syntax, $sql, $order, $rows, $offset ) = @_;
608
6f4ddea1 609 if ($_[3] == -1) {
a6b68a60 610 return $_[1] . $self->_parse_rs_attrs($_[2]);
6f4ddea1 611 } else {
612 return $self->SUPER::_emulate_limit(@_);
613 }
614}
615
616sub _recurse_fields {
81446c4f 617 my ($self, $fields) = @_;
6f4ddea1 618 my $ref = ref $fields;
619 return $self->_quote($fields) unless $ref;
620 return $$fields if $ref eq 'SCALAR';
621
622 if ($ref eq 'ARRAY') {
81446c4f 623 return join(', ', map { $self->_recurse_fields($_) } @$fields);
83e09b5b 624 }
625 elsif ($ref eq 'HASH') {
81446c4f 626 my %hash = %$fields; # shallow copy
83e09b5b 627
50136dd9 628 my $as = delete $hash{-as}; # if supplied
629
81446c4f 630 my ($func, $args, @toomany) = %hash;
631
632 # there should be only one pair
633 if (@toomany) {
634 croak "Malformed select argument - too many keys in hash: " . join (',', keys %$fields );
635 }
50136dd9 636
637 if (lc ($func) eq 'distinct' && ref $args eq 'ARRAY' && @$args > 1) {
638 croak (
639 'The select => { distinct => ... } syntax is not supported for multiple columns.'
640 .' Instead please use { group_by => [ qw/' . (join ' ', @$args) . '/ ] }'
641 .' or { select => [ qw/' . (join ' ', @$args) . '/ ], distinct => 1 }'
83e09b5b 642 );
6f4ddea1 643 }
83e09b5b 644
50136dd9 645 my $select = sprintf ('%s( %s )%s',
646 $self->_sqlcase($func),
647 $self->_recurse_fields($args),
648 $as
0491b597 649 ? sprintf (' %s %s', $self->_sqlcase('as'), $self->_quote ($as) )
50136dd9 650 : ''
651 );
652
83e09b5b 653 return $select;
6f4ddea1 654 }
655 # Is the second check absolutely necessary?
656 elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
db56cf3d 657 return $self->_fold_sqlbind( $fields );
6f4ddea1 658 }
659 else {
e8fcf76f 660 croak($ref . qq{ unexpected in _recurse_fields()})
6f4ddea1 661 }
662}
663
a6b68a60 664my $for_syntax = {
665 update => 'FOR UPDATE',
666 shared => 'FOR SHARE',
667};
668
669# this used to be a part of _order_by but is broken out for clarity.
670# What we have been doing forever is hijacking the $order arg of
671# SQLA::select to pass in arbitrary pieces of data (first the group_by,
672# then pretty much the entire resultset attr-hash, as more and more
673# things in the SQLA space need to have mopre info about the $rs they
674# create SQL for. The alternative would be to keep expanding the
675# signature of _select with more and more positional parameters, which
676# is just gross. All hail SQLA2!
677sub _parse_rs_attrs {
1cbd3034 678 my ($self, $arg) = @_;
15827712 679
a6b68a60 680 my $sql = '';
1cbd3034 681
b03f30a3 682 if (my $g = $self->_recurse_fields($arg->{group_by}) ) {
a6b68a60 683 $sql .= $self->_sqlcase(' group by ') . $g;
684 }
1cbd3034 685
a6b68a60 686 if (defined $arg->{having}) {
687 my ($frag, @bind) = $self->_recurse_where($arg->{having});
688 push(@{$self->{having_bind}}, @bind);
689 $sql .= $self->_sqlcase(' having ') . $frag;
690 }
15827712 691
a6b68a60 692 if (defined $arg->{order_by}) {
693 $sql .= $self->_order_by ($arg->{order_by});
694 }
15827712 695
a6b68a60 696 if (my $for = $arg->{for}) {
697 $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
698 }
699
700 return $sql;
701}
702
703sub _order_by {
704 my ($self, $arg) = @_;
15827712 705
a6b68a60 706 # check that we are not called in legacy mode (order_by as 4th argument)
707 if (ref $arg eq 'HASH' and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
708 return $self->_parse_rs_attrs ($arg);
fde3719a 709 }
1cbd3034 710 else {
711 my ($sql, @bind) = $self->SUPER::_order_by ($arg);
a6b68a60 712 push @{$self->{order_bind}}, @bind;
1cbd3034 713 return $sql;
fd4cb60a 714 }
6f4ddea1 715}
716
1cbd3034 717sub _order_directions {
fd4cb60a 718 my ($self, $order) = @_;
15827712 719
1cbd3034 720 # strip bind values - none of the current _order_directions users support them
721 return $self->SUPER::_order_directions( [ map
722 { ref $_ ? $_->[0] : $_ }
723 $self->_order_by_chunks ($order)
724 ]);
fd4cb60a 725}
726
6f4ddea1 727sub _table {
728 my ($self, $from) = @_;
729 if (ref $from eq 'ARRAY') {
730 return $self->_recurse_from(@$from);
731 } elsif (ref $from eq 'HASH') {
732 return $self->_make_as($from);
733 } else {
734 return $from; # would love to quote here but _table ends up getting called
735 # twice during an ->select without a limit clause due to
736 # the way S::A::Limit->select works. should maybe consider
737 # bypassing this and doing S::A::select($self, ...) in
738 # our select method above. meantime, quoting shims have
739 # been added to select/insert/update/delete here
740 }
741}
742
b8391c87 743sub _generate_join_clause {
744 my ($self, $join_type) = @_;
745
746 return sprintf ('%s JOIN ',
747 $join_type ? ' ' . uc($join_type) : ''
748 );
749}
750
6f4ddea1 751sub _recurse_from {
752 my ($self, $from, @join) = @_;
753 my @sqlf;
754 push(@sqlf, $self->_make_as($from));
755 foreach my $j (@join) {
756 my ($to, $on) = @$j;
757
aa82ce29 758
6f4ddea1 759 # check whether a join type exists
6f4ddea1 760 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
aa82ce29 761 my $join_type;
762 if (ref($to_jt) eq 'HASH' and defined($to_jt->{-join_type})) {
763 $join_type = $to_jt->{-join_type};
764 $join_type =~ s/^\s+ | \s+$//xg;
6f4ddea1 765 }
aa82ce29 766
de5f71ef 767 $join_type = $self->{_default_jointype} if not defined $join_type;
aa82ce29 768
b8391c87 769 push @sqlf, $self->_generate_join_clause( $join_type );
6f4ddea1 770
771 if (ref $to eq 'ARRAY') {
772 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
773 } else {
774 push(@sqlf, $self->_make_as($to));
775 }
776 push(@sqlf, ' ON ', $self->_join_condition($on));
777 }
778 return join('', @sqlf);
779}
780
db56cf3d 781sub _fold_sqlbind {
782 my ($self, $sqlbind) = @_;
69989ea9 783
784 my @sqlbind = @$$sqlbind; # copy
785 my $sql = shift @sqlbind;
786 push @{$self->{from_bind}}, @sqlbind;
787
db56cf3d 788 return $sql;
6f4ddea1 789}
790
791sub _make_as {
792 my ($self, $from) = @_;
db56cf3d 793 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
794 : ref $_ eq 'REF' ? $self->_fold_sqlbind($_)
795 : $self->_quote($_))
6f4ddea1 796 } reverse each %{$self->_skip_options($from)});
797}
798
799sub _skip_options {
800 my ($self, $hash) = @_;
801 my $clean_hash = {};
802 $clean_hash->{$_} = $hash->{$_}
803 for grep {!/^-/} keys %$hash;
804 return $clean_hash;
805}
806
807sub _join_condition {
808 my ($self, $cond) = @_;
809 if (ref $cond eq 'HASH') {
810 my %j;
811 for (keys %$cond) {
812 my $v = $cond->{$_};
813 if (ref $v) {
e8fcf76f 814 croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
6f4ddea1 815 if ref($v) ne 'SCALAR';
816 $j{$_} = $v;
817 }
818 else {
819 my $x = '= '.$self->_quote($v); $j{$_} = \$x;
820 }
821 };
822 return scalar($self->_recurse_where(\%j));
823 } elsif (ref $cond eq 'ARRAY') {
824 return join(' OR ', map { $self->_join_condition($_) } @$cond);
825 } else {
b48d3b4e 826 croak "Can't handle this yet!";
6f4ddea1 827 }
828}
829
6f4ddea1 830sub limit_dialect {
831 my $self = shift;
b2c7cf74 832 if (@_) {
833 $self->{limit_dialect} = shift;
834 undef $self->{_cached_syntax};
835 }
6f4ddea1 836 return $self->{limit_dialect};
837}
838
998373c2 839# Set to an array-ref to specify separate left and right quotes for table names.
840# A single scalar is equivalen to [ $char, $char ]
6f4ddea1 841sub quote_char {
842 my $self = shift;
843 $self->{quote_char} = shift if @_;
844 return $self->{quote_char};
845}
846
998373c2 847# Character separating quoted table names.
6f4ddea1 848sub name_sep {
849 my $self = shift;
850 $self->{name_sep} = shift if @_;
851 return $self->{name_sep};
852}
853
8541;