Some cleanups and code dedup of Top and FetchFirst limit dialects
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 use List::Util 'first';
7 use namespace::clean;
8
9 # constants are used not only here, but also in comparison tests
10 sub __rows_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __offset_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16 sub __total_bindtype () {
17   +{ sqlt_datatype => 'integer' }
18 }
19
20 =head1 NAME
21
22 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
23
24 =head1 DESCRIPTION
25
26 This module replicates a lot of the functionality originally found in
27 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
28 complex dialects that require e.g. subqueries could not be reliably
29 implemented without taking full advantage of the metadata locked within
30 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
31 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
32 practical to simply make an independent DBIx::Class-specific limit-dialect
33 provider.
34
35 =head1 SQL LIMIT DIALECTS
36
37 Note that the actual implementations listed below never use C<*> literally.
38 Instead proper re-aliasing of selectors and order criteria is done, so that
39 the limit dialect are safe to use on joined resultsets with clashing column
40 names.
41
42 Currently the provided dialects are:
43
44 =head2 LimitOffset
45
46  SELECT ... LIMIT $limit OFFSET $offset
47
48 Supported by B<PostgreSQL> and B<SQLite>
49
50 =cut
51 sub _LimitOffset {
52     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
53     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
54     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
55     if ($offset) {
56       $sql .= " OFFSET ?";
57       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
58     }
59     return $sql;
60 }
61
62 =head2 LimitXY
63
64  SELECT ... LIMIT $offset $limit
65
66 Supported by B<MySQL> and any L<SQL::Statement> based DBD
67
68 =cut
69 sub _LimitXY {
70     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
71     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
72     if ($offset) {
73       $sql .= '?, ';
74       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
75     }
76     $sql .= '?';
77     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
78
79     return $sql;
80 }
81
82 =head2 RowNumberOver
83
84  SELECT * FROM (
85   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
86    SELECT ...
87   )
88  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
89
90
91 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
92 B<< MSSQL >= 2005 >>.
93
94 =cut
95 sub _RowNumberOver {
96   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
97
98   # get selectors, and scan the order_by (if any)
99   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
100     = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
101
102   # make up an order if none exists
103   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
104   my $rno_ord = $self->_order_by ($requested_order);
105
106   # this is the order supplement magic
107   my $mid_sel = $out_sel;
108   if ($extra_order_sel) {
109     for my $extra_col (sort
110       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111       keys %$extra_order_sel
112     ) {
113       $in_sel .= sprintf (', %s AS %s',
114         $extra_col,
115         $extra_order_sel->{$extra_col},
116       );
117     }
118   }
119
120   # and this is order re-alias magic
121   for ($extra_order_sel, $alias_map) {
122     for my $col (keys %$_) {
123       my $re_col = quotemeta ($col);
124       $rno_ord =~ s/$re_col/$_->{$col}/;
125     }
126   }
127
128   # whatever is left of the order_by (only where is processed at this point)
129   my $group_having = $self->_parse_rs_attrs($rs_attrs);
130
131   my $qalias = $self->_quote ($rs_attrs->{alias});
132   my $idx_name = $self->_quote ('rno__row__index');
133
134   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
135
136   return <<EOS;
137
138 SELECT $out_sel FROM (
139   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
140     SELECT $in_sel ${stripped_sql}${group_having}
141   ) $qalias
142 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
143
144 EOS
145
146 }
147
148 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
149 sub _rno_default_order {
150   return undef;
151 }
152
153 =head2 SkipFirst
154
155  SELECT SKIP $offset FIRST $limit * FROM ...
156
157 Suported by B<Informix>, almost like LimitOffset. According to
158 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
159
160 =cut
161 sub _SkipFirst {
162   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
163
164   $sql =~ s/^ \s* SELECT \s+ //ix
165     or $self->throw_exception("Unrecognizable SELECT: $sql");
166
167   return sprintf ('SELECT %s%s%s%s',
168     $offset
169       ? do {
170          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
171          'SKIP ? '
172       }
173       : ''
174     ,
175     do {
176        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
177        'FIRST ? '
178     },
179     $sql,
180     $self->_parse_rs_attrs ($rs_attrs),
181   );
182 }
183
184 =head2 FirstSkip
185
186  SELECT FIRST $limit SKIP $offset * FROM ...
187
188 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
189 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
190
191 =cut
192 sub _FirstSkip {
193   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
194
195   $sql =~ s/^ \s* SELECT \s+ //ix
196     or $self->throw_exception("Unrecognizable SELECT: $sql");
197
198   return sprintf ('SELECT %s%s%s%s',
199     do {
200        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
201        'FIRST ? '
202     },
203     $offset
204       ? do {
205          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
206          'SKIP ? '
207       }
208       : ''
209     ,
210     $sql,
211     $self->_parse_rs_attrs ($rs_attrs),
212   );
213 }
214
215
216 =head2 RowNum
217
218 Depending on the resultset attributes one of:
219
220  SELECT * FROM (
221   SELECT *, ROWNUM rownum__index FROM (
222    SELECT ...
223   ) WHERE ROWNUM <= ($limit+$offset)
224  ) WHERE rownum__index >= ($offset+1)
225
226 or
227
228  SELECT * FROM (
229   SELECT *, ROWNUM rownum__index FROM (
230     SELECT ...
231   )
232  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
233
234 or
235
236  SELECT * FROM (
237     SELECT ...
238   ) WHERE ROWNUM <= ($limit+1)
239
240 Supported by B<Oracle>.
241
242 =cut
243 sub _RowNum {
244   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
245
246   my ($stripped_sql, $insel, $outsel) = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
247
248   my $qalias = $self->_quote ($rs_attrs->{alias});
249   my $idx_name = $self->_quote ('rownum__index');
250   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
251
252   #
253   # There are two ways to limit in Oracle, one vastly faster than the other
254   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
255   # However Oracle is retarded and does not preserve stable ROWNUM() values
256   # when called twice in the same scope. Therefore unless the resultset is
257   # ordered by a unique set of columns, it is not safe to use the faster
258   # method, and the slower BETWEEN query is used instead
259   #
260   # FIXME - this is quite expensive, and does not perform caching of any sort
261   # as soon as some of the DQ work becomes viable consider switching this
262   # over
263   if (
264     $rs_attrs->{order_by}
265       and
266     $rs_attrs->{_rsroot_rsrc}->storage->_order_by_is_stable(
267       $rs_attrs->{from}, $rs_attrs->{order_by}
268     )
269   ) {
270     # if offset is 0 (first page) the we can skip a subquery
271     if (! $offset) {
272       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
273
274       return <<EOS;
275 SELECT $outsel FROM (
276   SELECT $insel ${stripped_sql}${order_group_having}
277 ) $qalias WHERE ROWNUM <= ?
278 EOS
279     }
280     else {
281       push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
282
283       return <<EOS;
284 SELECT $outsel FROM (
285   SELECT $outsel, ROWNUM $idx_name FROM (
286     SELECT $insel ${stripped_sql}${order_group_having}
287   ) $qalias WHERE ROWNUM <= ?
288 ) $qalias WHERE $idx_name >= ?
289 EOS
290     }
291   }
292   else {
293     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
294
295     return <<EOS;
296 SELECT $outsel FROM (
297   SELECT $outsel, ROWNUM $idx_name FROM (
298     SELECT $insel ${stripped_sql}${order_group_having}
299   ) $qalias
300 ) $qalias WHERE $idx_name BETWEEN ? AND ?
301 EOS
302   }
303 }
304
305 # used by _Top and _FetchFirst below
306 sub _prep_for_skimming_limit {
307   my ( $self, $sql, $rs_attrs ) = @_;
308
309   # get selectors
310   my (%r, $alias_map, $extra_order_sel);
311   ($r{inner_sql}, $r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
312     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
313
314   my $requested_order = delete $rs_attrs->{order_by};
315   $r{order_by_requested} = $self->_order_by ($requested_order);
316
317   # make up an order unless supplied or sanity check what we are given
318   my $inner_order;
319   if ($r{order_by_requested}) {
320     $self->throw_exception (
321       'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
322     ) unless $rs_attrs->{_rsroot_rsrc}->schema->storage->_order_by_is_stable(
323       $rs_attrs->{from},
324       $requested_order
325     );
326
327     $inner_order = $requested_order;
328   }
329   else {
330     $inner_order = [ map
331       { "$rs_attrs->{alias}.$_" }
332       ( @{
333         $rs_attrs->{_rsroot_rsrc}->_identifying_column_set
334           ||
335         $self->throw_exception(sprintf(
336           'Unable to auto-construct stable order criteria for "skimming type" limit '
337         . "dialect based on source '%s'", $rs_attrs->{_rsroot_rsrc}->name) );
338       } )
339     ];
340   }
341
342   # localise as we already have all the bind values we need
343   {
344     local $self->{order_bind};
345     $r{order_by_inner} = $self->_order_by ($inner_order);
346
347     my @out_chunks;
348     for my $ch ($self->_order_by_chunks ($inner_order)) {
349       $ch = $ch->[0] if ref $ch eq 'ARRAY';
350
351       $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix;
352       my $dir = uc ($1||'ASC');
353
354       push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' );
355     }
356
357     $r{order_by_reversed} = $self->_order_by (\@out_chunks);
358   }
359
360   # this is the order supplement magic
361   $r{mid_sel} = $r{out_sel};
362   if ($extra_order_sel) {
363     for my $extra_col (sort
364       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
365       keys %$extra_order_sel
366     ) {
367       $r{in_sel} .= sprintf (', %s AS %s',
368         $extra_col,
369         $extra_order_sel->{$extra_col},
370       );
371
372       $r{mid_sel} .= ', ' . $extra_order_sel->{$extra_col};
373     }
374
375     # Whatever order bindvals there are, they will be realiased and
376     # need to show up in front of the entire initial inner subquery
377     push @{$self->{pre_select_bind}}, @{$self->{order_bind}};
378   }
379
380   # if this is a part of something bigger, we need to add back all
381   # the extra order_by's, as they may be relied upon by the outside
382   # of a prefetch or something
383   if ($rs_attrs->{_is_internal_subuery} and keys %$extra_order_sel) {
384     $r{out_sel} .= sprintf ", $extra_order_sel->{$_} AS $_"
385       for sort
386         { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
387           grep { $_ !~ /[^\w\-]/ }  # ignore functions
388           keys %$extra_order_sel
389     ;
390   }
391
392   # and this is order re-alias magic
393   for my $map ($extra_order_sel, $alias_map) {
394     for my $col (keys %$map) {
395       my $re_col = quotemeta ($col);
396       $_ =~ s/$re_col/$map->{$col}/
397         for ($r{order_by_reversed}, $r{order_by_requested});
398     }
399   }
400
401   # generate the rest of the sql
402   $r{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
403
404   $r{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
405
406   \%r;
407 }
408
409 =head2 Top
410
411  SELECT * FROM
412
413  SELECT TOP $limit FROM (
414   SELECT TOP $limit FROM (
415    SELECT TOP ($limit+$offset) ...
416   ) ORDER BY $reversed_original_order
417  ) ORDER BY $original_order
418
419 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
420
421 =head3 CAVEAT
422
423 Due to its implementation, this limit dialect returns B<incorrect results>
424 when $limit+$offset > total amount of rows in the resultset.
425
426 =cut
427
428 sub _Top {
429   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
430
431   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
432
433   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
434     $rows + ($offset||0),
435     $l{in_sel},
436     $l{inner_sql},
437     $l{grpby_having},
438     $l{order_by_inner},
439   );
440
441   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
442     $rows,
443     $l{mid_sel},
444     $sql,
445     $l{quoted_rs_alias},
446     $l{order_by_reversed},
447   ) if $offset;
448
449   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
450     $rows,
451     $l{out_sel},
452     $sql,
453     $l{quoted_rs_alias},
454     $l{order_by_requested},
455   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
456
457   return $sql;
458 }
459
460 =head2 FetchFirst
461
462  SELECT * FROM
463  (
464  SELECT * FROM (
465   SELECT * FROM (
466    SELECT * FROM ...
467   ) ORDER BY $reversed_original_order
468     FETCH FIRST $limit ROWS ONLY
469  ) ORDER BY $original_order
470    FETCH FIRST $limit ROWS ONLY
471  )
472
473 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
474
475 =head3 CAVEAT
476
477 Due to its implementation, this limit dialect returns B<incorrect results>
478 when $limit+$offset > total amount of rows in the resultset.
479
480 =cut
481
482 sub _FetchFirst {
483   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
484
485   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
486
487   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
488     $l{in_sel},
489     $l{inner_sql},
490     $l{grpby_having},
491     $l{order_by_inner},
492     $rows + ($offset||0),
493   );
494
495   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
496     $l{mid_sel},
497     $sql,
498     $l{quoted_rs_alias},
499     $l{order_by_reversed},
500     $rows,
501   ) if $offset;
502
503   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
504     $l{out_sel},
505     $sql,
506     $l{quoted_rs_alias},
507     $l{order_by_requested},
508     $rows,
509   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
510
511   return $sql;
512 }
513
514 =head2 RowCountOrGenericSubQ
515
516 This is not exactly a limit dialect, but more of a proxy for B<Sybase ASE>.
517 If no $offset is supplied the limit is simply performed as:
518
519  SET ROWCOUNT $limit
520  SELECT ...
521  SET ROWCOUNT 0
522
523 Otherwise we fall back to L</GenericSubQ>
524
525 =cut
526
527 sub _RowCountOrGenericSubQ {
528   my $self = shift;
529   my ($sql, $rs_attrs, $rows, $offset) = @_;
530
531   return $self->_GenericSubQ(@_) if $offset;
532
533   return sprintf <<"EOF", $rows, $sql;
534 SET ROWCOUNT %d
535 %s
536 SET ROWCOUNT 0
537 EOF
538 }
539
540 =head2 GenericSubQ
541
542  SELECT * FROM (
543   SELECT ...
544  )
545  WHERE (
546   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
547  ) BETWEEN $offset AND ($offset+$rows-1)
548
549 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
550 databases. It works by ordering the set by some unique column, and calculating
551 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
552 index). Of course this implies the set can only be ordered by a single unique
553 column. Also note that this technique can be and often is B<excruciatingly
554 slow>.
555
556 Currently used by B<Sybase ASE>, due to lack of any other option.
557
558 =cut
559 sub _GenericSubQ {
560   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
561
562   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
563   my $root_tbl_name = $root_rsrc->name;
564
565   my ($order_by, @rest) = do {
566     local $self->{quote_char};
567     $self->_order_by_chunks ($rs_attrs->{order_by})
568   };
569
570   unless (
571     $order_by
572       &&
573     ! @rest
574       &&
575     ( ! ref $order_by
576         ||
577       ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
578     )
579   ) {
580     $self->throw_exception (
581       'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
582     . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
583     . 'unique-column order criteria.'
584     );
585   }
586
587   ($order_by) = @$order_by if ref $order_by;
588
589   $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
590   my $direction = lc ($1 || 'asc');
591
592   my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
593
594   my $inf = $root_rsrc->storage->_resolve_column_info (
595     $rs_attrs->{from}, [$order_by, $unq_sort_col]
596   );
597
598   my $ord_colinfo = $inf->{$order_by} || $self->throw_exception("Unable to determine source of order-criteria '$order_by'");
599
600   if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
601     $self->throw_exception(sprintf
602       "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
603     . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
604     );
605   }
606
607   # make sure order column is qualified
608   $order_by = "$rs_attrs->{alias}.$order_by"
609     unless $order_by =~ /^$rs_attrs->{alias}\./;
610
611   my $is_u;
612   my $ucs = { $root_rsrc->unique_constraints };
613   for (values %$ucs ) {
614     if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
615       $is_u++;
616       last;
617     }
618   }
619   $self->throw_exception(
620     "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
621   ) unless $is_u;
622
623   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
624     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
625
626   my $cmp_op = $direction eq 'desc' ? '>' : '<';
627   my $count_tbl_alias = 'rownum__emulation';
628
629   my $order_sql = $self->_order_by (delete $rs_attrs->{order_by});
630   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
631
632   # add the order supplement (if any) as this is what will be used for the outer WHERE
633   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
634
635   my $rownum_cond;
636   if ($offset) {
637     $rownum_cond = 'BETWEEN ? AND ?';
638
639     push @{$self->{limit_bind}},
640       [ $self->__offset_bindtype => $offset ],
641       [ $self->__total_bindtype => $offset + $rows - 1]
642     ;
643   }
644   else {
645     $rownum_cond = '< ?';
646
647     push @{$self->{limit_bind}},
648       [ $self->__rows_bindtype => $rows ]
649     ;
650   }
651
652   return sprintf ("
653 SELECT $out_sel
654   FROM (
655     SELECT $in_sel ${stripped_sql}${group_having_sql}
656   ) %s
657 WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) $rownum_cond
658 $order_sql
659   ", map { $self->_quote ($_) } (
660     $rs_attrs->{alias},
661     $root_tbl_name,
662     $count_tbl_alias,
663     "$count_tbl_alias.$unq_sort_col",
664     $order_by,
665   ));
666 }
667
668
669 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
670 #
671 # Generates inner/outer select lists for various limit dialects
672 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
673 # Any non-root-table columns need to have their table qualifier
674 # turned into a column alias (otherwise names in subqueries clash
675 # and/or lose their source table)
676 #
677 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
678 # with aliases (to be used in whatever select statement), and an alias
679 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
680 # for string-subst higher up).
681 # If an order_by is supplied, the inner select needs to bring out columns
682 # used in implicit (non-selected) orders, and the order condition itself
683 # needs to be realiased to the proper names in the outer query. Thus we
684 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
685 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
686 # exist in the original select list
687 sub _subqueried_limit_attrs {
688   my ($self, $proto_sql, $rs_attrs) = @_;
689
690   $self->throw_exception(
691     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
692   ) unless ref ($rs_attrs) eq 'HASH';
693
694   # mangle the input sql as we will be replacing the selector entirely
695   unless (
696     $rs_attrs->{_selector_sql}
697       and
698     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
699   ) {
700     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
701   }
702
703   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
704
705   # insulate from the multiple _recurse_fields calls below
706   local $self->{select_bind};
707
708   # correlate select and as, build selection index
709   my (@sel, $in_sel_index);
710   for my $i (0 .. $#{$rs_attrs->{select}}) {
711
712     my $s = $rs_attrs->{select}[$i];
713     my $sql_sel = $self->_recurse_fields ($s);
714     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
715
716     push @sel, {
717       sql => $sql_sel,
718       unquoted_sql => do {
719         local $self->{quote_char};
720         $self->_recurse_fields ($s);
721       },
722       as =>
723         $sql_alias
724           ||
725         $rs_attrs->{as}[$i]
726           ||
727         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
728       ,
729     };
730
731     $in_sel_index->{$sql_sel}++;
732     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
733
734     # record unqualified versions too, so we do not have
735     # to reselect the same column twice (in qualified and
736     # unqualified form)
737     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
738       $in_sel_index->{$1}++;
739     }
740   }
741
742
743   # re-alias and remove any name separators from aliases,
744   # unless we are dealing with the current source alias
745   # (which will transcend the subqueries as it is necessary
746   # for possible further chaining)
747   my (@in_sel, @out_sel, %renamed);
748   for my $node (@sel) {
749     if (
750       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
751         or
752       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
753     ) {
754       $node->{as} = $self->_unqualify_colname($node->{as});
755       my $quoted_as = $self->_quote($node->{as});
756       push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as;
757       push @out_sel, $quoted_as;
758       $renamed{$node->{sql}} = $quoted_as;
759     }
760     else {
761       push @in_sel, $node->{sql};
762       push @out_sel, $self->_quote ($node->{as});
763     }
764   }
765   # see if the order gives us anything
766   my %extra_order_sel;
767   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
768     # order with bind
769     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
770     $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
771
772     next if $in_sel_index->{$chunk};
773
774     $extra_order_sel{$chunk} ||= $self->_quote (
775       'ORDER__BY__' . scalar keys %extra_order_sel
776     );
777   }
778
779   return (
780     $proto_sql,
781     (map { join (', ', @$_ ) } (
782       \@in_sel,
783       \@out_sel)
784     ),
785     \%renamed,
786     keys %extra_order_sel ? \%extra_order_sel : (),
787   );
788 }
789
790 sub _unqualify_colname {
791   my ($self, $fqcn) = @_;
792   $fqcn =~ s/ \. /__/xg;
793   return $fqcn;
794 }
795
796 1;
797
798 =head1 AUTHORS
799
800 See L<DBIx::Class/CONTRIBUTORS>.
801
802 =head1 LICENSE
803
804 You may distribute this code under the same terms as Perl itself.
805
806 =cut