fixed order of rows difference between first and subsequent pages for Oracle
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 use List::Util 'first';
7 use namespace::clean;
8
9 # constants are used not only here, but also in comparison tests
10 sub __rows_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __offset_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16 sub __total_bindtype () {
17   +{ sqlt_datatype => 'integer' }
18 }
19
20 =head1 NAME
21
22 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
23
24 =head1 DESCRIPTION
25
26 This module replicates a lot of the functionality originally found in
27 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
28 complex dialects that require e.g. subqueries could not be reliably
29 implemented without taking full advantage of the metadata locked within
30 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
31 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
32 practical to simply make an independent DBIx::Class-specific limit-dialect
33 provider.
34
35 =head1 SQL LIMIT DIALECTS
36
37 Note that the actual implementations listed below never use C<*> literally.
38 Instead proper re-aliasing of selectors and order criteria is done, so that
39 the limit dialect are safe to use on joined resultsets with clashing column
40 names.
41
42 Currently the provided dialects are:
43
44 =head2 LimitOffset
45
46  SELECT ... LIMIT $limit OFFSET $offset
47
48 Supported by B<PostgreSQL> and B<SQLite>
49
50 =cut
51 sub _LimitOffset {
52     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
53     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
54     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
55     if ($offset) {
56       $sql .= " OFFSET ?";
57       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
58     }
59     return $sql;
60 }
61
62 =head2 LimitXY
63
64  SELECT ... LIMIT $offset $limit
65
66 Supported by B<MySQL> and any L<SQL::Statement> based DBD
67
68 =cut
69 sub _LimitXY {
70     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
71     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
72     if ($offset) {
73       $sql .= '?, ';
74       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
75     }
76     $sql .= '?';
77     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
78
79     return $sql;
80 }
81
82 =head2 RowNumberOver
83
84  SELECT * FROM (
85   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
86    SELECT ...
87   )
88  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
89
90
91 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
92 B<< MSSQL >= 2005 >>.
93
94 =cut
95 sub _RowNumberOver {
96   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
97
98   # get selectors, and scan the order_by (if any)
99   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
100     = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
101
102   # make up an order if none exists
103   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
104   my $rno_ord = $self->_order_by ($requested_order);
105
106   # this is the order supplement magic
107   my $mid_sel = $out_sel;
108   if ($extra_order_sel) {
109     for my $extra_col (sort
110       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111       keys %$extra_order_sel
112     ) {
113       $in_sel .= sprintf (', %s AS %s',
114         $extra_col,
115         $extra_order_sel->{$extra_col},
116       );
117
118       $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
119     }
120   }
121
122   # and this is order re-alias magic
123   for ($extra_order_sel, $alias_map) {
124     for my $col (keys %$_) {
125       my $re_col = quotemeta ($col);
126       $rno_ord =~ s/$re_col/$_->{$col}/;
127     }
128   }
129
130   # whatever is left of the order_by (only where is processed at this point)
131   my $group_having = $self->_parse_rs_attrs($rs_attrs);
132
133   my $qalias = $self->_quote ($rs_attrs->{alias});
134   my $idx_name = $self->_quote ('rno__row__index');
135
136   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
137
138   return <<EOS;
139
140 SELECT $out_sel FROM (
141   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
142     SELECT $in_sel ${stripped_sql}${group_having}
143   ) $qalias
144 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
145
146 EOS
147
148 }
149
150 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
151 sub _rno_default_order {
152   return undef;
153 }
154
155 =head2 SkipFirst
156
157  SELECT SKIP $offset FIRST $limit * FROM ...
158
159 Suported by B<Informix>, almost like LimitOffset. According to
160 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
161
162 =cut
163 sub _SkipFirst {
164   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
165
166   $sql =~ s/^ \s* SELECT \s+ //ix
167     or $self->throw_exception("Unrecognizable SELECT: $sql");
168
169   return sprintf ('SELECT %s%s%s%s',
170     $offset
171       ? do {
172          push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
173          'SKIP ? '
174       }
175       : ''
176     ,
177     do {
178        push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
179        'FIRST ? '
180     },
181     $sql,
182     $self->_parse_rs_attrs ($rs_attrs),
183   );
184 }
185
186 =head2 FirstSkip
187
188  SELECT FIRST $limit SKIP $offset * FROM ...
189
190 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
191 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
192
193 =cut
194 sub _FirstSkip {
195   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
196
197   $sql =~ s/^ \s* SELECT \s+ //ix
198     or $self->throw_exception("Unrecognizable SELECT: $sql");
199
200   return sprintf ('SELECT %s%s%s%s',
201     do {
202        push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
203        'FIRST ? '
204     },
205     $offset
206       ? do {
207          push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset];
208          'SKIP ? '
209       }
210       : ''
211     ,
212     $sql,
213     $self->_parse_rs_attrs ($rs_attrs),
214   );
215 }
216
217
218 =head2 RowNum
219
220 Depending on the resultset attributes one of:
221
222  SELECT * FROM (
223   SELECT *, ROWNUM rownum__index FROM (
224    SELECT ...
225   ) WHERE ROWNUM <= ($limit+$offset)
226  ) WHERE rownum__index >= ($offset+1)
227
228 or
229
230  SELECT * FROM (
231   SELECT *, ROWNUM rownum__index FROM (
232     SELECT ...
233   )
234  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
235
236 or
237
238  SELECT * FROM (
239     SELECT ...
240   ) WHERE ROWNUM <= ($limit+1)
241
242 Supported by B<Oracle>.
243
244 =cut
245 sub _RowNum {
246   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
247
248   my ($stripped_sql, $insel, $outsel) = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
249
250   my $qalias = $self->_quote ($rs_attrs->{alias});
251   my $idx_name = $self->_quote ('rownum__index');
252   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
253
254   #
255   # There are two ways to limit in Oracle, one vastly faster than the other
256   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
257   # However Oracle is retarded and does not preserve stable ROWNUM() values
258   # when called twice in the same scope. Therefore unless the resultset is
259   # ordered by a unique set of columns, it is not safe to use the faster
260   # method, and the slower BETWEEN query is used instead
261   #
262   # FIXME - this is quite expensive, and doe snot perform caching of any sort
263   # as soon as some of the DQ work becomes viable consider switching this
264   # over
265   if ( __order_by_is_unique($rs_attrs) ) {
266
267     # if offset is 0 (first page) the we can skip a subquery
268     if (! $offset) {
269       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
270
271       return <<EOS;
272 SELECT $outsel FROM (
273   SELECT $insel ${stripped_sql}${order_group_having}
274 ) $qalias WHERE ROWNUM <= ?
275 EOS
276     }
277     else {
278       push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
279
280       return <<EOS;
281 SELECT $outsel FROM (
282   SELECT $outsel, ROWNUM $idx_name FROM (
283     SELECT $insel ${stripped_sql}${order_group_having}
284   ) $qalias WHERE ROWNUM <= ?
285 ) $qalias WHERE $idx_name >= ?
286 EOS
287     }
288   }
289   else {
290     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
291
292     return <<EOS;
293 SELECT $outsel FROM (
294   SELECT $outsel, ROWNUM $idx_name FROM (
295     SELECT $insel ${stripped_sql}${order_group_having}
296   ) $qalias
297 ) $qalias WHERE $idx_name BETWEEN ? AND ?
298 EOS
299   }
300 }
301
302 # determine if the supplied order_by contains a unique column (set)
303 sub __order_by_is_unique {
304   my $rs_attrs = shift;
305   my $rsrc = $rs_attrs->{_rsroot_rsrc};
306   my $order_by = $rs_attrs->{order_by}
307     || return 0;
308
309   my $storage = $rsrc->schema->storage;
310
311   my @order_by_cols = map { $_->[0] } $storage->_extract_order_criteria($order_by)
312     or return 0;
313
314   my $colinfo =
315     $storage->_resolve_column_info($rs_attrs->{from}, \@order_by_cols);
316
317   my $sources = {
318     map {( "$_" => $_ )} map { $_->{-result_source} } values %$colinfo
319   };
320
321   my $supplied_order = {
322     map { $_ => 1 }
323     grep { exists $colinfo->{$_} and ! $colinfo->{$_}{is_nullable} }
324     @order_by_cols
325   };
326
327   return 0 unless keys %$supplied_order;
328
329   for my $uks (
330     map { values %$_ } map { +{ $_->unique_constraints } } values %$sources
331   ) {
332     return 1
333       unless first { ! exists $supplied_order->{$_} } @$uks;
334   }
335
336   return 0;
337 }
338
339 # used by _Top and _FetchFirst below
340 sub _prep_for_skimming_limit {
341   my ( $self, $sql, $rs_attrs ) = @_;
342
343   # get selectors
344   my (%r, $alias_map, $extra_order_sel);
345   ($r{inner_sql}, $r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
346     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
347
348   my $requested_order = delete $rs_attrs->{order_by};
349   $r{order_by_requested} = $self->_order_by ($requested_order);
350
351   # make up an order unless supplied
352   my $inner_order = ($r{order_by_requested}
353     ? $requested_order
354     : [ map
355       { "$rs_attrs->{alias}.$_" }
356       ( $rs_attrs->{_rsroot_rsrc}->_pri_cols )
357     ]
358   );
359
360   # localise as we already have all the bind values we need
361   {
362     local $self->{order_bind};
363     $r{order_by_inner} = $self->_order_by ($inner_order);
364
365     my @out_chunks;
366     for my $ch ($self->_order_by_chunks ($inner_order)) {
367       $ch = $ch->[0] if ref $ch eq 'ARRAY';
368
369       $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix;
370       my $dir = uc ($1||'ASC');
371
372       push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' );
373     }
374
375     $r{order_by_reversed} = $self->_order_by (\@out_chunks);
376   }
377
378   # this is the order supplement magic
379   $r{mid_sel} = $r{out_sel};
380   if ($extra_order_sel) {
381     for my $extra_col (sort
382       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
383       keys %$extra_order_sel
384     ) {
385       $r{in_sel} .= sprintf (', %s AS %s',
386         $extra_col,
387         $extra_order_sel->{$extra_col},
388       );
389
390       $r{mid_sel} .= ', ' . $extra_order_sel->{$extra_col};
391     }
392
393     # since whatever order bindvals there are, they will be realiased
394     # and need to show up in front of the entire initial inner subquery
395     # *unshift* the selector bind stack to make this happen (horrible,
396     # horrible, but we don't have another mechanism yet)
397     unshift @{$self->{select_bind}}, @{$self->{order_bind}};
398   }
399
400   # and this is order re-alias magic
401   for my $map ($extra_order_sel, $alias_map) {
402     for my $col (keys %$map) {
403       my $re_col = quotemeta ($col);
404       $_ =~ s/$re_col/$map->{$col}/
405         for ($r{order_by_reversed}, $r{order_by_requested});
406     }
407   }
408
409   # generate the rest of the sql
410   $r{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
411
412   $r{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
413
414   \%r;
415 }
416
417 =head2 Top
418
419  SELECT * FROM
420
421  SELECT TOP $limit FROM (
422   SELECT TOP $limit FROM (
423    SELECT TOP ($limit+$offset) ...
424   ) ORDER BY $reversed_original_order
425  ) ORDER BY $original_order
426
427 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
428
429 =head3 CAVEAT
430
431 Due to its implementation, this limit dialect returns B<incorrect results>
432 when $limit+$offset > total amount of rows in the resultset.
433
434 =cut
435
436 sub _Top {
437   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
438
439   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
440
441   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
442     $rows + ($offset||0),
443     $l{in_sel},
444     $l{inner_sql},
445     $l{grpby_having},
446     $l{order_by_inner},
447   );
448
449   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
450     $rows,
451     $l{mid_sel},
452     $sql,
453     $l{quoted_rs_alias},
454     $l{order_by_reversed},
455   ) if $offset;
456
457   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
458     $rows,
459     $l{out_sel},
460     $sql,
461     $l{quoted_rs_alias},
462     $l{order_by_requested},
463   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
464
465   return $sql;
466 }
467
468 =head2 FetchFirst
469
470  SELECT * FROM
471  (
472  SELECT * FROM (
473   SELECT * FROM (
474    SELECT * FROM ...
475   ) ORDER BY $reversed_original_order
476     FETCH FIRST $limit ROWS ONLY
477  ) ORDER BY $original_order
478    FETCH FIRST $limit ROWS ONLY
479  )
480
481 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
482
483 =head3 CAVEAT
484
485 Due to its implementation, this limit dialect returns B<incorrect results>
486 when $limit+$offset > total amount of rows in the resultset.
487
488 =cut
489
490 sub _FetchFirst {
491   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
492
493   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
494
495   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
496     $l{in_sel},
497     $l{inner_sql},
498     $l{grpby_having},
499     $l{order_by_inner},
500     $rows + ($offset||0),
501   );
502
503   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
504     $l{mid_sel},
505     $sql,
506     $l{quoted_rs_alias},
507     $l{order_by_reversed},
508     $rows,
509   ) if $offset;
510
511   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
512     $l{out_sel},
513     $sql,
514     $l{quoted_rs_alias},
515     $l{order_by_requested},
516     $rows,
517   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
518
519   return $sql;
520 }
521
522 =head2 RowCountOrGenericSubQ
523
524 This is not exactly a limit dialect, but more of a proxy for B<Sybase ASE>.
525 If no $offset is supplied the limit is simply performed as:
526
527  SET ROWCOUNT $limit
528  SELECT ...
529  SET ROWCOUNT 0
530
531 Otherwise we fall back to L</GenericSubQ>
532
533 =cut
534
535 sub _RowCountOrGenericSubQ {
536   my $self = shift;
537   my ($sql, $rs_attrs, $rows, $offset) = @_;
538
539   return $self->_GenericSubQ(@_) if $offset;
540
541   return sprintf <<"EOF", $rows, $sql;
542 SET ROWCOUNT %d
543 %s
544 SET ROWCOUNT 0
545 EOF
546 }
547
548 =head2 GenericSubQ
549
550  SELECT * FROM (
551   SELECT ...
552  )
553  WHERE (
554   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
555  ) BETWEEN $offset AND ($offset+$rows-1)
556
557 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
558 databases. It works by ordering the set by some unique column, and calculating
559 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
560 index). Of course this implies the set can only be ordered by a single unique
561 column. Also note that this technique can be and often is B<excruciatingly
562 slow>.
563
564 Currently used by B<Sybase ASE>, due to lack of any other option.
565
566 =cut
567 sub _GenericSubQ {
568   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
569
570   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
571   my $root_tbl_name = $root_rsrc->name;
572
573   my ($order_by, @rest) = do {
574     local $self->{quote_char};
575     $self->_order_by_chunks ($rs_attrs->{order_by})
576   };
577
578   unless (
579     $order_by
580       &&
581     ! @rest
582       &&
583     ( ! ref $order_by
584         ||
585       ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
586     )
587   ) {
588     $self->throw_exception (
589       'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
590     . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
591     . 'unique-column order criteria.'
592     );
593   }
594
595   ($order_by) = @$order_by if ref $order_by;
596
597   $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
598   my $direction = lc ($1 || 'asc');
599
600   my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
601
602   my $inf = $root_rsrc->storage->_resolve_column_info (
603     $rs_attrs->{from}, [$order_by, $unq_sort_col]
604   );
605
606   my $ord_colinfo = $inf->{$order_by} || $self->throw_exception("Unable to determine source of order-criteria '$order_by'");
607
608   if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
609     $self->throw_exception(sprintf
610       "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
611     . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
612     );
613   }
614
615   # make sure order column is qualified
616   $order_by = "$rs_attrs->{alias}.$order_by"
617     unless $order_by =~ /^$rs_attrs->{alias}\./;
618
619   my $is_u;
620   my $ucs = { $root_rsrc->unique_constraints };
621   for (values %$ucs ) {
622     if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
623       $is_u++;
624       last;
625     }
626   }
627   $self->throw_exception(
628     "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
629   ) unless $is_u;
630
631   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
632     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
633
634   my $cmp_op = $direction eq 'desc' ? '>' : '<';
635   my $count_tbl_alias = 'rownum__emulation';
636
637   my $order_sql = $self->_order_by (delete $rs_attrs->{order_by});
638   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
639
640   # add the order supplement (if any) as this is what will be used for the outer WHERE
641   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
642
643   my $rownum_cond;
644   if ($offset) {
645     $rownum_cond = 'BETWEEN ? AND ?';
646
647     push @{$self->{limit_bind}},
648       [ $self->__offset_bindtype => $offset ],
649       [ $self->__total_bindtype => $offset + $rows - 1]
650     ;
651   }
652   else {
653     $rownum_cond = '< ?';
654
655     push @{$self->{limit_bind}},
656       [ $self->__rows_bindtype => $rows ]
657     ;
658   }
659
660   return sprintf ("
661 SELECT $out_sel
662   FROM (
663     SELECT $in_sel ${stripped_sql}${group_having_sql}
664   ) %s
665 WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) $rownum_cond
666 $order_sql
667   ", map { $self->_quote ($_) } (
668     $rs_attrs->{alias},
669     $root_tbl_name,
670     $count_tbl_alias,
671     "$count_tbl_alias.$unq_sort_col",
672     $order_by,
673   ));
674 }
675
676
677 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
678 #
679 # Generates inner/outer select lists for various limit dialects
680 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
681 # Any non-root-table columns need to have their table qualifier
682 # turned into a column alias (otherwise names in subqueries clash
683 # and/or lose their source table)
684 #
685 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
686 # with aliases (to be used in whatever select statement), and an alias
687 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used 
688 # for string-subst higher up).
689 # If an order_by is supplied, the inner select needs to bring out columns
690 # used in implicit (non-selected) orders, and the order condition itself
691 # needs to be realiased to the proper names in the outer query. Thus we
692 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
693 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
694 # exist in the original select list
695 sub _subqueried_limit_attrs {
696   my ($self, $proto_sql, $rs_attrs) = @_;
697
698   $self->throw_exception(
699     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
700   ) unless ref ($rs_attrs) eq 'HASH';
701
702   # mangle the input sql as we will be replacing the selector entirely
703   unless (
704     $rs_attrs->{_selector_sql}
705       and
706     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
707   ) {
708     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
709   }
710
711   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
712
713   # insulate from the multiple _recurse_fields calls below
714   local $self->{select_bind};
715
716   # correlate select and as, build selection index
717   my (@sel, $in_sel_index);
718   for my $i (0 .. $#{$rs_attrs->{select}}) {
719
720     my $s = $rs_attrs->{select}[$i];
721     my $sql_sel = $self->_recurse_fields ($s);
722     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
723
724     push @sel, {
725       sql => $sql_sel,
726       unquoted_sql => do {
727         local $self->{quote_char};
728         $self->_recurse_fields ($s);
729       },
730       as =>
731         $sql_alias
732           ||
733         $rs_attrs->{as}[$i]
734           ||
735         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
736       ,
737     };
738
739     $in_sel_index->{$sql_sel}++;
740     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
741
742     # record unqualified versions too, so we do not have
743     # to reselect the same column twice (in qualified and
744     # unqualified form)
745     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
746       $in_sel_index->{$1}++;
747     }
748   }
749
750
751   # re-alias and remove any name separators from aliases,
752   # unless we are dealing with the current source alias
753   # (which will transcend the subqueries as it is necessary
754   # for possible further chaining)
755   my (@in_sel, @out_sel, %renamed);
756   for my $node (@sel) {
757     if (
758       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
759         or
760       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
761     ) {
762       $node->{as} = $self->_unqualify_colname($node->{as});
763       my $quoted_as = $self->_quote($node->{as});
764       push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as;
765       push @out_sel, $quoted_as;
766       $renamed{$node->{sql}} = $quoted_as;
767     }
768     else {
769       push @in_sel, $node->{sql};
770       push @out_sel, $self->_quote ($node->{as});
771     }
772   }
773   # see if the order gives us anything
774   my %extra_order_sel;
775   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
776     # order with bind
777     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
778     $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
779
780     next if $in_sel_index->{$chunk};
781
782     $extra_order_sel{$chunk} ||= $self->_quote (
783       'ORDER__BY__' . scalar keys %extra_order_sel
784     );
785   }
786
787   return (
788     $proto_sql,
789     (map { join (', ', @$_ ) } (
790       \@in_sel,
791       \@out_sel)
792     ),
793     \%renamed,
794     keys %extra_order_sel ? \%extra_order_sel : (),
795   );
796 }
797
798 sub _unqualify_colname {
799   my ($self, $fqcn) = @_;
800   $fqcn =~ s/ \. /__/xg;
801   return $fqcn;
802 }
803
804 1;
805
806 =head1 AUTHORS
807
808 See L<DBIx::Class/CONTRIBUTORS>.
809
810 =head1 LICENSE
811
812 You may distribute this code under the same terms as Perl itself.
813
814 =cut