SQLA::Limit is no more \o/
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLAHacks.pm
CommitLineData
6f4ddea1 1package # Hide from PAUSE
855c6fd0 2 DBIx::Class::SQLAHacks;
6f4ddea1 3
6a247f33 4# This module is a subclass of SQL::Abstract and includes a number of
5# DBIC-specific workarounds, not yet suitable for inclusion into the
6# SQLA core.
7# It also provides all (and more than) the functionality of
8# SQL::Abstract::Limit, which proved to be very hard to keep updated
9
10use base qw/
11 SQL::Abstract
12 Class::Accessor::Grouped
13/;
14use mro 'c3';
e3764383 15use strict;
16use warnings;
6298a324 17use List::Util 'first';
18use Sub::Name 'subname';
ac322978 19use Carp::Clan qw/^DBIx::Class|^SQL::Abstract|^Try::Tiny/;
e8fc51c7 20use namespace::clean;
b2b22cd6 21
6a247f33 22__PACKAGE__->mk_group_accessors (simple => qw/quote_char name_sep limit_dialect/);
23
BEGIN {
  # Carp and Carp::Clan do not cooperate well, so the carp()/croak()
  # functions living in SQL::Abstract are replaced by wrappers which
  # dispatch to the Carp::Clan versions imported into this package
  # whenever the call stack passes through a DBIx::Class::SQLAHacks
  # method, and to the original SQL::Abstract versions otherwise.
  no warnings qw/redefine/;
  no strict qw/refs/;

  for my $reporter (qw/carp croak/) {
    my $sqla_name    = "SQL::Abstract::$reporter";
    my $sqla_version = \&{$sqla_name};
    my $clan_version = \&{$reporter};

    *{$sqla_name} = subname $sqla_name => sub {
      # not called through us - behave exactly as SQL::Abstract would
      goto $sqla_version
        unless Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+ .+? called \s at/x;

      $clan_version->(@_);
    };
  }
}
6f4ddea1 44
# The "offset/top requested without a limit" row count.
# Capped at 32 bits for sanity, and for consistency since the value
# ultimately ends up being handed to sprintf %u.
# This is a method (not a constant) because ::Storage::DBI refers to it
# as well (i.e. for the case of software_limit, or as the value to abuse
# with MSSQL ordered subqueries).
sub __max_int { return 0xFFFF_FFFF }
52
# Generates the inner/outer select lists for the limit dialects that wrap
# the query in one or more subqueries (e.g. RNO, Top, RowNum).
#
# Any column that is not qualified by the current source alias gets its
# qualifier folded into a column alias, as otherwise names inside the
# subqueries would clash and/or lose their source table.
#
# Takes the resultset attribute hashref ($rs_attrs), croaks if it is not
# a hashref.
#
# Returns, in order:
#   * the inner selection as a string of QUOTED selectors (with aliases)
#   * the outer selection as a string of QUOTED aliases
#   * a hashref of QUOTED SELECTOR => QUOTED ALIAS for every re-aliased
#     selector (usable for string substitution higher up)
#   * only when the order_by references something which is not part of
#     the selection: a hashref of QUOTED EXTRA-SELECTOR => QUOTED ALIAS.
#     The inner select needs to bring these out, and the order condition
#     needs to be re-aliased to them in the outer query.
sub _subqueried_limit_attrs {
  my ($self, $rs_attrs) = @_;

  ref ($rs_attrs) eq 'HASH'
    or croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)';

  my $re_sep   = quotemeta ($self->name_sep || '.');
  my $re_alias = quotemeta ($rs_attrs->{alias});

  # pair up every select entry with its 'as', remembering each selector
  # form we have seen
  my (@selection, %seen_selector);
  for my $idx (0 .. $#{$rs_attrs->{select}}) {

    my $selector    = $rs_attrs->{select}[$idx];
    my $quoted_sql  = $self->_recurse_fields ($selector);
    my $explicit_as = (ref $selector) eq 'HASH' ? $selector->{-as} : undef;

    # same selector rendered with quoting switched off
    my $unquoted_sql = do {
      local $self->{quote_char};
      $self->_recurse_fields ($selector);
    };

    my $as = $explicit_as
      || $rs_attrs->{as}[$idx]
      || croak "Select argument $idx ($selector) without corresponding 'as'";

    push @selection, {
      sql          => $quoted_sql,
      unquoted_sql => $unquoted_sql,
      as           => $as,
    };

    $seen_selector{$quoted_sql}++;
    $seen_selector{$self->_quote ($explicit_as)}++ if $explicit_as;

    # also remember the unqualified form of a plain column, so the same
    # column does not get selected twice (qualified and unqualified)
    if (! ref $selector && $quoted_sql =~ / $re_sep (.+) $/x) {
      $seen_selector{$1}++;
    }
  }

  # Re-alias anything which carries a name separator, unless it is
  # qualified by the current source alias (that one has to transcend the
  # subqueries as it is needed for possible further chaining)
  my (@inner, @outer, %renamed);
  for my $node (@selection) {
    my $foreign = first
      { $_ =~ / (?<! ^ $re_alias ) $re_sep /x }
      ($node->{as}, $node->{unquoted_sql})
    ;

    if ($foreign) {
      my $flat_as   = $self->_unqualify_colname ($node->{as});
      my $quoted_as = $self->_quote ($flat_as);

      push @inner, sprintf '%s AS %s', $node->{sql}, $quoted_as;
      push @outer, $quoted_as;
      $renamed{$node->{sql}} = $quoted_as;
    }
    else {
      push @inner, $node->{sql};
      push @outer, $self->_quote ($node->{as});
    }
  }

  # collect whatever the order refers to that is not selected already
  my %extra_order_sel;
  for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
    # order with bind - the SQL is the first element
    $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
    $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;

    next if $seen_selector{$chunk};

    $extra_order_sel{$chunk} ||= $self->_quote (
      'ORDER__BY__' . scalar keys %extra_order_sel
    );
  }

  return (
    join (', ', @inner),
    join (', ', @outer),
    \%renamed,
    ( %extra_order_sel ? \%extra_order_sel : () ),
  );
}
158
# Turns a (possibly) qualified column name into something usable as a
# plain alias, by replacing every occurrence of the name separator
# (name_sep, defaulting to '.') with a double underscore.
sub _unqualify_colname {
  my ($self, $fqcn) = @_;

  my $sep = $self->name_sep || '.';
  $fqcn =~ s/\Q$sep\E/__/g;

  return $fqcn;
}
165
6a247f33 166#
167# Follow limit dialect implementations
168#
169
# PostgreSQL and SQLite: ... LIMIT $rows [OFFSET $offset]
# The order clause is rendered first, the OFFSET part is only emitted for
# a true $offset.
sub _LimitOffset {
  my ($self, $sql, $order, $rows, $offset) = @_;

  my $limited = $sql . $self->_order_by ($order) . " LIMIT $rows";

  return $offset
    ? "$limited OFFSET $offset"
    : $limited
  ;
}
177
# MySQL and any SQL::Statement based DBD: ... LIMIT [$offset, ]$rows
# The order clause is rendered first, the offset is only emitted when true.
sub _LimitXY {
  my ($self, $sql, $order, $rows, $offset) = @_;

  my $ordered = $sql . $self->_order_by ($order);
  my $window  = $offset ? "$offset, $rows" : $rows;

  return "$ordered LIMIT $window";
}
186
# ANSI standard Limit/Offset implementation. DB2 and MSSQL >= 2005 use this
#
# Arguments: the SQL produced so far, the resultset attribute hashref
# (its order_by key is DELETED here), the row count and the offset.
# Returns the rewritten SQL: the original query with a re-aliased
# selection, wrapped in a subquery that adds a ROW_NUMBER() index, wrapped
# in an outer query filtering on that index.
# Croaks if the supplied SQL does not look like SELECT ... FROM.
sub _RowNumberOver {
  my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;

  # mangle the input sql as we will be replacing the selector
  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
    or croak "Unrecognizable SELECT: $sql";

  # get selectors, and scan the order_by (if any)
  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
    = $self->_subqueried_limit_attrs ( $rs_attrs );

  # make up an order if none exists
  my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
  my $rno_ord = $self->_order_by ($requested_order);

  # order supplement: anything the order refers to which is not selected
  # has to be brought out of the innermost query under its alias
  my $mid_sel = $out_sel;
  if ($extra_order_sel) {
    for my $extra_col (sort
      { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
      keys %$extra_order_sel
    ) {
      $in_sel .= sprintf (', %s AS %s',
        $extra_col,
        $extra_order_sel->{$extra_col},
      );

      $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
    }
  }

  # order re-alias: the order clause now lives outside of the innermost
  # query, hence it has to refer to the aliases
  for my $map ($extra_order_sel, $alias_map) {
    for my $col (keys %{ $map || {} }) {
      my $re_col = quotemeta ($col);
      $rno_ord =~ s/$re_col/$map->{$col}/;
    }
  }

  # whatever is left of the rs attributes (the order_by was removed above,
  # the where is already part of $sql)
  my $group_having = $self->_parse_rs_attrs($rs_attrs);

  my $qalias = $self->_quote ($rs_attrs->{alias});
  my $idx_name = $self->_quote ('rno__row__index');

  # Every SQL fragment is passed as a %s ARGUMENT: interpolating them into
  # the format itself would make sprintf interpret any literal '%' found
  # in the query (e.g. LIKE '%foo%') as a conversion
  $sql = sprintf (<<'EOS',

SELECT %s FROM (
  SELECT %s, ROW_NUMBER() OVER( %s ) AS %s FROM (
    SELECT %s %s%s
  ) %s
) %s WHERE %s BETWEEN %u AND %u

EOS
    $out_sel,
    $mid_sel, $rno_ord, $idx_name,
    $in_sel, $sql, $group_having,
    $qalias,
    $qalias, $idx_name, $offset + 1, $offset + $rows,
  );

  $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
  return $sql;
}
246
# The order used by _RowNumberOver when the resultset has none.
# Some databases are happy with OVER (), some need
# OVER (ORDER BY (SELECT (1)) ). This default yields undef, which
# _RowNumberOver hands to _order_by unchanged.
sub _rno_default_order { undef }
251
# Informix specific limit, almost like LIMIT/OFFSET:
#   SELECT [SKIP $offset] FIRST $rows ...
# According to SQLA::Limit informix also supports
# SKIP X LIMIT Y (in addition to SKIP X FIRST Y)
# Croaks if the supplied SQL does not start with SELECT.
sub _SkipFirst {
  my ($self, $sql, $rs_attrs, $rows, $offset) = @_;

  $sql =~ s/^ \s* SELECT \s+ //ix
    or croak "Unrecognizable SELECT: $sql";

  my $skip  = $offset ? sprintf ('SKIP %u ', $offset) : '';
  my $first = sprintf ('FIRST %u ', $rows);

  return sprintf ('SELECT %s%s%s%s',
    $skip,
    $first,
    $sql,
    $self->_parse_rs_attrs ($rs_attrs),
  );
}
271
# Firebird specific limit, reverse of _SkipFirst for Informix:
#   SELECT FIRST $rows [SKIP $offset] ...
# According to SQLA::Limit firebird/interbase also supports
# ROWS X TO Y (in addition to FIRST X SKIP Y)
# Croaks if the supplied SQL does not start with SELECT.
sub _FirstSkip {
  my ($self, $sql, $rs_attrs, $rows, $offset) = @_;

  $sql =~ s/^ \s* SELECT \s+ //ix
    or croak "Unrecognizable SELECT: $sql";

  my $first = sprintf ('FIRST %u ', $rows);
  my $skip  = $offset ? sprintf ('SKIP %u ', $offset) : '';

  return sprintf ('SELECT %s%s%s%s',
    $first,
    $skip,
    $sql,
    $self->_parse_rs_attrs ($rs_attrs),
  );
}
291
# Oracle limits, based on ROWNUM
#
# Arguments: the SQL produced so far, the resultset attribute hashref,
# the row count and the offset.
# Returns the rewritten SQL: the original query with a re-aliased
# selection, wrapped in a subquery exposing ROWNUM under an alias, wrapped
# in an outer query filtering on that alias.
# Croaks if the supplied SQL does not look like SELECT ... FROM.
sub _RowNum {
  my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;

  # mangle the input sql as we will be replacing the selector
  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
    or croak "Unrecognizable SELECT: $sql";

  my ($insel, $outsel) = $self->_subqueried_limit_attrs ($rs_attrs);

  my $qalias = $self->_quote ($rs_attrs->{alias});
  my $idx_name = $self->_quote ('rownum__index');
  my $order_group_having = $self->_parse_rs_attrs($rs_attrs);

  # Every SQL fragment is passed as a %s ARGUMENT: interpolating them into
  # the format itself would make sprintf interpret any literal '%' found
  # in the query (e.g. LIKE '%foo%') as a conversion
  $sql = sprintf (<<'EOS',

SELECT %s FROM (
  SELECT %s, ROWNUM %s FROM (
    SELECT %s %s%s
  ) %s
) %s WHERE %s BETWEEN %u AND %u

EOS
    $outsel,
    $outsel, $idx_name,
    $insel, $sql, $order_group_having,
    $qalias,
    $qalias, $idx_name, $offset + 1, $offset + $rows,
  );

  $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
  return $sql;
}
319
# Crappy Top based Limit/Offset support. Legacy for MSSQL < 2005
#
# Arguments: the SQL produced so far, the resultset attribute hashref
# (its order_by key is DELETED here), the row count and the offset.
#
# The innermost query selects TOP ($rows + $offset) in the requested
# order (or ordered by the primary columns of the root source when no
# order was requested). With an offset this gets wrapped in a TOP $rows
# query using the REVERSED order, and finally - if needed - in one more
# TOP $rows query which restores the requested order and/or gets rid of
# the extra order columns.
# Croaks if the supplied SQL does not look like SELECT ... FROM.
sub _Top {
  my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;

  # mangle the input sql as we will be replacing the selector
  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
    or croak "Unrecognizable SELECT: $sql";

  # get selectors
  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
    = $self->_subqueried_limit_attrs ($rs_attrs);

  my $requested_order = delete $rs_attrs->{order_by};

  my $order_by_requested = $self->_order_by ($requested_order);

  # make up an order unless supplied
  my $inner_order = ($order_by_requested
    ? $requested_order
    : [ map
      { join ('', $rs_attrs->{alias}, $self->{name_sep}||'.', $_ ) }
      ( $rs_attrs->{_rsroot_source_handle}->resolve->_pri_cols )
    ]
  );

  my ($order_by_inner, $order_by_reversed);

  # localise as we already have all the bind values we need
  {
    local $self->{order_bind};
    $order_by_inner = $self->_order_by ($inner_order);

    my @out_chunks;
    for my $ch ($self->_order_by_chunks ($inner_order)) {
      $ch = $ch->[0] if ref $ch eq 'ARRAY';

      # Only look at $1 when the substitution actually matched: a failed
      # match leaves the capture of an earlier successful one in place,
      # which would give a chunk without a direction the wrong reversal
      my $dir = ( $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix )
        ? uc $1
        : 'ASC'
      ;

      push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' );
    }

    $order_by_reversed = $self->_order_by (\@out_chunks);
  }

  # this is the order supplement magic
  my $mid_sel = $out_sel;
  if ($extra_order_sel) {
    for my $extra_col (sort
      { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
      keys %$extra_order_sel
    ) {
      $in_sel .= sprintf (', %s AS %s',
        $extra_col,
        $extra_order_sel->{$extra_col},
      );

      $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
    }

    # since whatever order bindvals there are, they will be realiased
    # and need to show up in front of the entire initial inner subquery
    # Unshift *from_bind* to make this happen (horrible, horrible, but
    # we don't have another mechanism yet)
    unshift @{$self->{from_bind}}, @{$self->{order_bind}};
  }

  # and this is order re-alias magic
  for my $map ($extra_order_sel, $alias_map) {
    for my $col (keys %{ $map || {} }) {
      my $re_col = quotemeta ($col);
      $_ =~ s/$re_col/$map->{$col}/
        for ($order_by_reversed, $order_by_requested);
    }
  }

  # generate the rest of the sql
  my $grpby_having = $self->_parse_rs_attrs ($rs_attrs);

  my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias});

  # innermost: everything up to and including the requested window
  $sql = sprintf ('SELECT TOP %u %s %s %s %s',
    $rows + ($offset||0),
    $in_sel,
    $sql,
    $grpby_having,
    $order_by_inner,
  );

  # middle: the last $rows of the above, courtesy of the reversed order
  $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
    $rows,
    $mid_sel,
    $sql,
    $quoted_rs_alias,
    $order_by_reversed,
  ) if $offset;

  # outer: restore the requested order / drop the extra order columns
  $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
    $rows,
    $out_sel,
    $sql,
    $quoted_rs_alias,
    $order_by_requested,
  ) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) );

  $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
  return $sql;
}
428
# This is for Sybase ASE: without an offset the statement is simply
# wrapped in SET ROWCOUNT $rows / SET ROWCOUNT 0, with an offset the work
# is delegated to _GenericSubQ (which receives the very same arguments).
sub _RowCountOrGenericSubQ {
  my ($self, @args) = @_;
  my ($sql, undef, $rows, $offset) = @args;

  return $self->_GenericSubQ (@args) if $offset;

  return sprintf "SET ROWCOUNT %d\n%s\nSET ROWCOUNT 0\n", $rows, $sql;
}
443
# This is the most evil limit "dialect" (more of a hack) for *really*
# stupid databases. It works by ordering the set by some unique column,
# and calculating the amount of rows that have a lesser value (thus
# emulating a RowNum-like index). Of course this implies the set can
# only be ordered by a single unique column.
#
# Arguments: the SQL produced so far, the resultset attribute hashref
# (its order_by key is DELETED here), the row count and the offset.
# Croaks when the SQL does not look like SELECT ... FROM, when the order
# is missing or complex, when the order column can not be resolved or
# does not belong to the root source, or when no single-column unique
# constraint covers it.
sub _GenericSubQ {
  my ($self, $sql, $rs_attrs, $rows, $offset) = @_;

  my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve;
  my $root_tbl_name = $root_rsrc->name;

  # mangle the input sql as we will be replacing the selector
  $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
    or croak "Unrecognizable SELECT: $sql";

  # the order criteria, rendered without quoting
  my ($order_by, @rest) = do {
    local $self->{quote_char};
    $self->_order_by_chunks ($rs_attrs->{order_by})
  };

  unless (
    $order_by
      &&
    ! @rest
      &&
    ( ! ref $order_by
        ||
      ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
    )
  ) {
    croak (
      'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
    . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
    . 'unique-column order criteria.'
    );
  }

  ($order_by) = @$order_by if ref $order_by;

  # Only look at $1 when the substitution actually matched, a failed
  # match does not reset the capture variables
  my $direction = ( $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix )
    ? lc $1
    : 'asc'
  ;

  my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;

  my $inf = $root_rsrc->storage->_resolve_column_info (
    $rs_attrs->{from}, [$order_by, $unq_sort_col]
  );

  my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'";

  if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
    croak "Generic Subquery Limit order criteria can be only based on the root-source '"
        . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')";
  }

  # make sure order column is qualified (the alias is matched literally,
  # it is not a regular expression)
  $order_by = "$rs_attrs->{alias}.$order_by"
    unless $order_by =~ /^\Q$rs_attrs->{alias}\E\./;

  # the order column must be covered by a single-column unique constraint
  my $is_u;
  my $ucs = { $root_rsrc->unique_constraints };
  for (values %$ucs ) {
    if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
      $is_u++;
      last;
    }
  }
  croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
    unless $is_u;

  my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
    = $self->_subqueried_limit_attrs ($rs_attrs);

  my $cmp_op = $direction eq 'desc' ? '>' : '<';
  my $count_tbl_alias = 'rownum__emulation';

  my $order_sql = $self->_order_by (delete $rs_attrs->{order_by});
  my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);

  # add the order supplement (if any) as this is what will be used for the outer WHERE
  $in_sel .= ", $_" for keys %{$extra_order_sel||{}};

  # the window expressed in terms of "amount of rows sorting before this one"
  my $window = $offset
    ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1)
    : sprintf ('< %u', $rows )
  ;

  # Every SQL fragment is passed as a %s ARGUMENT: interpolating them into
  # the format itself would make sprintf interpret any literal '%' found
  # in the query (e.g. LIKE '%foo%') as a conversion
  $sql = sprintf (<<'EOS',
SELECT %s
  FROM (
    SELECT %s %s%s
  ) %s
WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s %s %s ) %s
%s
EOS
    $out_sel,
    $in_sel, $sql, $group_having_sql,
    ( map { $self->_quote ($_) } (
      $rs_attrs->{alias},
      $root_tbl_name,
      $count_tbl_alias,
    )),
    $self->_quote ("$count_tbl_alias.$unq_sort_col"),
    $cmp_op,
    $self->_quote ($order_by),
    $window,
    $order_sql,
  );

  $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
  return $sql;
}
a54bd479 550
6a247f33 551#
552# Actual SQL::Abstract overrid^Whacks
553#
6f4ddea1 554
# Handle limit-dialect selection
#
# Arguments: $table, $fields, $where, the resultset attribute hashref,
# and the optional $limit and $offset.
# Without a limit (and offset) the parent select() is invoked with the
# attribute hashref in the order slot. Otherwise the parent generates the
# plain select, which is then handed to the limiter: either an
# emulate_limit method (backcompat hook from SQLA::Limit) or the "_$dialect"
# method named by limit_dialect.
# Returns ($sql, @bind) in list context, $sql alone otherwise.
# Croaks on a malformed limit/offset, or if no limiter can be found.
sub select {
  my ($self, $table, $fields, $where, $rs_attrs, $limit, $offset) = @_;

  $fields = $self->_recurse_fields($fields);

  croak ('A supplied offset must be a non-negative integer')
    if defined $offset and ( $offset =~ /\D/ or $offset < 0 );
  $offset ||= 0;

  if (defined $limit) {
    croak ('A supplied limit must be a positive integer')
      if ( $limit =~ /\D/ or $limit <= 0 );
  }
  elsif ($offset) {
    # an offset without a limit - ask for "everything" after the offset
    $limit = $self->__max_int;
  }

  my ($sql, @bind);
  if (! $limit) {
    ($sql, @bind) = $self->next::method ($table, $fields, $where, $rs_attrs);
  }
  else {
    # this is legacy code-flow from SQLA::Limit, it is not set in stone

    ($sql, @bind) = $self->next::method ($table, $fields, $where);

    # also backcompat hook from SQLA::Limit
    my $limiter = $self->can ('emulate_limit');

    unless ($limiter) {
      my $dialect = $self->limit_dialect
        or croak "Unable to generate SQL-limit - no limit dialect specified on $self, and no emulate_limit method found";

      $limiter = $self->can ("_$dialect")
        or croak "SQLAHacks does not implement the requested dialect '$dialect'";
    }

    $sql = $self->$limiter ($sql, $rs_attrs, $limit, $offset);
  }

  push @{$self->{where_bind}}, @bind;

  # this *must* be called, otherwise extra binds will remain in the sql-maker
  my @all_bind = $self->_assemble_binds;

  return wantarray ? ($sql, @all_bind) : $sql;
}
607
# Removes the per-clause bind stashes (from_bind, where_bind, having_bind
# and order_bind) from the object, and returns their contents as one flat
# list, in that order.
sub _assemble_binds {
  my ($self) = @_;

  my @binds;
  for my $clause (qw/from where having order/) {
    my $stash = delete $self->{"${clause}_bind"};
    push @binds, @$stash if $stash;
  }

  return @binds;
}
612
# Handle default inserts
#
# Called as ($self, $table, $data, $options). @_ is deliberately never
# unpacked into a full argument list, as this is a hot path.
#
# SQLA will emit INSERT INTO $table ( ) VALUES ( ) for an empty insert,
# which is sadly understood only by MySQL. Until SQLA2 comes with proper
# dialect support, an insert without data (false, or an empty hashref) is
# rendered as INSERT INTO $table DEFAULT VALUES, followed by whatever
# _insert_returning produces for $options->{returning}, if set.
# Everything else is delegated to the parent class.
sub insert {
  my $data = $_[2];

  return next::method(@_)
    if $data and not ( ref $data eq 'HASH' and ! %$data );

  my $sql = "INSERT INTO $_[1] DEFAULT VALUES";

  my $returning = ($_[3]||{})->{returning};
  $sql .= $_[0]->_insert_returning ($returning) if $returning;

  return $sql;
}
633
# Renders a select specification as SQL:
#   * plain scalar                 - quoted via _quote
#   * scalar reference             - used verbatim (literal SQL)
#   * array reference              - every element rendered recursively,
#                                    joined with ', '
#   * reference to an arrayref     - literal SQL with binds, folded via
#                                    _fold_sqlbind
#   * hash reference               - { func => args, -as => alias }
#                                    rendered as FUNC( args )[ AS alias]
# Croaks on any other reference, on a hash with more than one function,
# and on a multicolumn { distinct => [...] }.
sub _recurse_fields {
  my ($self, $fields) = @_;

  my $ref = ref $fields;

  return $self->_quote($fields) unless $ref;

  return $$fields if $ref eq 'SCALAR';

  return join(', ', map { $self->_recurse_fields($_) } @$fields)
    if $ref eq 'ARRAY';

  # Is the second check absolutely necessary?
  return $self->_fold_sqlbind( $fields )
    if $ref eq 'REF' and ref($$fields) eq 'ARRAY';

  croak($ref . qq{ unexpected in _recurse_fields()})
    unless $ref eq 'HASH';

  # a function call specification from here on
  my %spec = %$fields;  # shallow copy

  my $as = delete $spec{-as};   # if supplied

  # there should be only one pair
  my ($func, $args, @toomany) = %spec;

  croak "Malformed select argument - too many keys in hash: " . join (',', keys %$fields )
    if @toomany;

  if (lc ($func) eq 'distinct' && ref $args eq 'ARRAY' && @$args > 1) {
    croak (
      'The select => { distinct => ... } syntax is not supported for multiple columns.'
     .' Instead please use { group_by => [ qw/' . (join ' ', @$args) . '/ ] }'
     .' or { select => [ qw/' . (join ' ', @$args) . '/ ], distinct => 1 }'
    );
  }

  return sprintf ('%s( %s )%s',
    $self->_sqlcase($func),
    $self->_recurse_fields($args),
    $as
      ? sprintf (' %s %s', $self->_sqlcase('as'), $self->_quote ($as) )
      : ''
    ,
  );
}
681
# the locking clauses understood by the 'for' resultset attribute
my $for_syntax = {
  update => 'FOR UPDATE',
  shared => 'FOR SHARE',
};

# This used to be a part of _order_by but is broken out for clarity.
# What we have been doing forever is hijacking the $order arg of
# SQLA::select to pass in arbitrary pieces of data (first the group_by,
# then pretty much the entire resultset attr-hash), as more and more
# things in the SQLA space need to have more info about the $rs they
# create SQL for. The alternative would be to keep expanding the
# signature of _select with more and more positional parameters, which
# is just gross. All hail SQLA2!
#
# Takes the attribute hashref and returns the SQL for its group_by,
# having, order_by and for parts (in this order, each one only when
# present). The binds of the having clause are stashed in having_bind.
sub _parse_rs_attrs {
  my ($self, $arg) = @_;

  my $tail = '';

  my $group_by = $self->_recurse_fields($arg->{group_by});
  $tail .= $self->_sqlcase(' group by ') . $group_by
    if $group_by;

  if (defined $arg->{having}) {
    my ($having, @having_bind) = $self->_recurse_where($arg->{having});
    push(@{$self->{having_bind}}, @having_bind);
    $tail .= $self->_sqlcase(' having ') . $having;
  }

  $tail .= $self->_order_by ($arg->{order_by})
    if defined $arg->{order_by};

  # unknown 'for' values are silently ignored
  my $for = $arg->{for};
  $tail .= " $for_syntax->{$for}"
    if $for and $for_syntax->{$for};

  return $tail;
}
720
# Renders an order specification. A hashref without any -asc/-desc key is
# taken to be a resultset attribute hash (legacy mode, order_by passed as
# the 4th argument) and is handed to _parse_rs_attrs. Everything else goes
# to the parent implementation, whose bind values are stashed in
# order_bind, with only the SQL being returned.
sub _order_by {
  my ($self, $arg) = @_;

  # check that we are not called in legacy mode (order_by as 4th argument)
  my $is_attr_hash =
    ref $arg eq 'HASH'
      &&
    ! grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg
  ;

  return $self->_parse_rs_attrs ($arg) if $is_attr_hash;

  my ($sql, @order_bind) = $self->next::method($arg);
  push @{$self->{order_bind}}, @order_bind;

  return $sql;
}
734
# Renders the FROM part. Called as ($self, $from) - @_ is deliberately
# not unpacked, as this is a hot path.
# An arrayref is a join specification (see _recurse_from), a hashref is
# a single { alias => source } pair (see _make_as), anything else is
# left to the parent class.
sub _table {
  my $kind = ref $_[1];

  return $_[0]->_recurse_from(@{$_[1]}) if $kind eq 'ARRAY';

  return $_[0]->_make_as($_[1]) if $kind eq 'HASH';

  return $_[0]->next::method ($_[1]);
}
749
# Returns the JOIN keyword for the given (optional) join type: the type
# is uppercased and prefixed with a space, the result always ends with
# ' JOIN '.
sub _generate_join_clause {
  my ($self, $join_type) = @_;

  my $type_sql = $join_type
    ? ' ' . uc($join_type)
    : ''
  ;

  return "$type_sql JOIN ";
}
757
# Renders a join specification: the root source followed by any number of
# [ $to, $on ] pairs. $to is either an { alias => source } hashref, or an
# arrayref holding a nested join specification (rendered in parentheses).
# The join type is taken from the -join_type key of $to (of its first
# element for a nested specification), falling back to the object's
# _default_jointype.
sub _recurse_from {
  my ($self, $from, @join) = @_;

  my @sql_parts = ( $self->_make_as($from) );

  for my $join_spec (@join) {
    my ($to, $on) = @$join_spec;

    my $is_nested = ref($to) eq 'ARRAY';

    # check whether a join type exists
    my $type_holder = $is_nested ? $to->[0] : $to;
    my $join_type;
    if (ref($type_holder) eq 'HASH' and defined($type_holder->{-join_type})) {
      $join_type = $type_holder->{-join_type};
      $join_type =~ s/^\s+ | \s+$//xg;
    }

    $join_type = $self->{_default_jointype} unless defined $join_type;

    push @sql_parts, $self->_generate_join_clause( $join_type );

    push @sql_parts, $is_nested
      ? ( '(', $self->_recurse_from(@$to), ')' )
      : ( $self->_make_as($to) )
    ;

    push @sql_parts, ' ON ', $self->_join_condition($on);
  }

  return join('', @sql_parts);
}
787
# Takes a reference to an [ $sql, @bind ] arrayref, stashes the bind
# values in from_bind and returns the SQL. The referenced array itself is
# left untouched.
sub _fold_sqlbind {
  my ($self, $sqlbind) = @_;

  my ($sql, @bind) = @$$sqlbind;
  push @{$self->{from_bind}}, @bind;

  return $sql;
}
797
# Renders an { alias => source } pair as "source alias", ignoring any
# option (dash-prefixed) keys. Each of the two pieces is used verbatim
# when it is a scalar reference, folded via _fold_sqlbind when it is a
# reference to a reference, and quoted otherwise.
sub _make_as {
  my ($self, $from) = @_;

  # ( source, alias ) - or nothing at all for an empty specification
  my @source_then_alias = reverse each %{$self->_skip_options($from)};

  my @rendered;
  for my $piece (@source_then_alias) {
    my $kind = ref $piece;

    push @rendered,
        $kind eq 'SCALAR' ? $$piece
      : $kind eq 'REF'    ? $self->_fold_sqlbind($piece)
      :                     $self->_quote($piece)
    ;
  }

  return join(' ', @rendered);
}
805
# Returns a new hashref holding every pair of the supplied hash whose key
# does not start with a dash (i.e. everything but the options).
sub _skip_options {
  my ($self, $hash) = @_;

  my %without_options =
    map  { $_ => $hash->{$_} }
    grep { ! /^-/ }
    keys %$hash
  ;

  return \%without_options;
}
813
# Renders the ON part of a join.
#   * hashref  - every pair is a condition: a plain value is taken to be
#                a column name (rendered as "= <quoted value>"), a scalar
#                reference is passed on as-is. The result is rendered via
#                _recurse_where, only its SQL is returned.
#   * arrayref - every element is rendered recursively, joined with ' OR '
# Croaks on a condition value which is a non-scalar reference, and on any
# other kind of condition.
sub _join_condition {
  my ($self, $cond) = @_;

  if (ref $cond eq 'HASH') {
    my %where;
    for my $lhs (keys %$cond) {
      my $rhs = $cond->{$lhs};

      if (ref $rhs) {
        # single-quoted, so that the backslash of the suggested \"..."
        # (scalar reference) syntax actually makes it into the message
        croak (ref($rhs) . q{ reference arguments are not supported in JOINS - try using \"..." instead})
          if ref($rhs) ne 'SCALAR';

        $where{$lhs} = $rhs;
      }
      else {
        my $equals = '= ' . $self->_quote($rhs);
        $where{$lhs} = \$equals;
      }
    }

    return scalar($self->_recurse_where(\%where));
  }
  elsif (ref $cond eq 'ARRAY') {
    return join(' OR ', map { $self->_join_condition($_) } @$cond);
  }
  else {
    croak "Can't handle this yet!";
  }
}
836
6f4ddea1 837
8381;