8e85ac8473b5b4ce8f435bfa8c4898e6f4340e79
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 use List::Util 'first';
7 use namespace::clean;
8
9 # constants are used not only here, but also in comparison tests
10 sub __rows_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __offset_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16 sub __total_bindtype () {
17   +{ sqlt_datatype => 'integer' }
18 }
19
20 =head1 NAME
21
22 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
23
24 =head1 DESCRIPTION
25
26 This module replicates a lot of the functionality originally found in
27 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
28 complex dialects that require e.g. subqueries could not be reliably
29 implemented without taking full advantage of the metadata locked within
30 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
31 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
32 practical to simply make an independent DBIx::Class-specific limit-dialect
33 provider.
34
35 =head1 SQL LIMIT DIALECTS
36
37 Note that the actual implementations listed below never use C<*> literally.
38 Instead proper re-aliasing of selectors and order criteria is done, so that
39 the limit dialect are safe to use on joined resultsets with clashing column
40 names.
41
42 Currently the provided dialects are:
43
44 =head2 LimitOffset
45
46  SELECT ... LIMIT $limit OFFSET $offset
47
48 Supported by B<PostgreSQL> and B<SQLite>
49
50 =cut
51 sub _LimitOffset {
52     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
53     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
54     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
55     if ($offset) {
56       $sql .= " OFFSET ?";
57       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
58     }
59     return $sql;
60 }
61
62 =head2 LimitXY
63
64  SELECT ... LIMIT $offset $limit
65
66 Supported by B<MySQL> and any L<SQL::Statement> based DBD
67
68 =cut
69 sub _LimitXY {
70     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
71     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
72     if ($offset) {
73       $sql .= '?, ';
74       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
75     }
76     $sql .= '?';
77     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
78
79     return $sql;
80 }
81
82 =head2 RowNumberOver
83
84  SELECT * FROM (
85   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
86    SELECT ...
87   )
88  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
89
90
91 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
92 B<< MSSQL >= 2005 >>.
93
94 =cut
95 sub _RowNumberOver {
96   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
97
98   # get selectors, and scan the order_by (if any)
99   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
100     = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
101
102   # make up an order if none exists
103   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
104   my $rno_ord = $self->_order_by ($requested_order);
105
106   # this is the order supplement magic
107   my $mid_sel = $out_sel;
108   if ($extra_order_sel) {
109     for my $extra_col (sort
110       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111       keys %$extra_order_sel
112     ) {
113       $in_sel .= sprintf (', %s AS %s',
114         $extra_col,
115         $extra_order_sel->{$extra_col},
116       );
117
118       $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
119     }
120   }
121
122   # and this is order re-alias magic
123   for ($extra_order_sel, $alias_map) {
124     for my $col (keys %$_) {
125       my $re_col = quotemeta ($col);
126       $rno_ord =~ s/$re_col/$_->{$col}/;
127     }
128   }
129
130   # whatever is left of the order_by (only where is processed at this point)
131   my $group_having = $self->_parse_rs_attrs($rs_attrs);
132
133   my $qalias = $self->_quote ($rs_attrs->{alias});
134   my $idx_name = $self->_quote ('rno__row__index');
135
136   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
137
138   return <<EOS;
139
140 SELECT $out_sel FROM (
141   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
142     SELECT $in_sel ${stripped_sql}${group_having}
143   ) $qalias
144 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
145
146 EOS
147
148 }
149
150 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
151 sub _rno_default_order {
152   return undef;
153 }
154
155 =head2 SkipFirst
156
157  SELECT SKIP $offset FIRST $limit * FROM ...
158
159 Suported by B<Informix>, almost like LimitOffset. According to
160 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
161
162 =cut
163 sub _SkipFirst {
164   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
165
166   $sql =~ s/^ \s* SELECT \s+ //ix
167     or $self->throw_exception("Unrecognizable SELECT: $sql");
168
169   return sprintf ('SELECT %s%s%s%s',
170     $offset
171       ? do {
172          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
173          'SKIP ? '
174       }
175       : ''
176     ,
177     do {
178        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
179        'FIRST ? '
180     },
181     $sql,
182     $self->_parse_rs_attrs ($rs_attrs),
183   );
184 }
185
186 =head2 FirstSkip
187
188  SELECT FIRST $limit SKIP $offset * FROM ...
189
190 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
191 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
192
193 =cut
194 sub _FirstSkip {
195   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
196
197   $sql =~ s/^ \s* SELECT \s+ //ix
198     or $self->throw_exception("Unrecognizable SELECT: $sql");
199
200   return sprintf ('SELECT %s%s%s%s',
201     do {
202        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
203        'FIRST ? '
204     },
205     $offset
206       ? do {
207          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
208          'SKIP ? '
209       }
210       : ''
211     ,
212     $sql,
213     $self->_parse_rs_attrs ($rs_attrs),
214   );
215 }
216
217
218 =head2 RowNum
219
220 Depending on the resultset attributes one of:
221
222  SELECT * FROM (
223   SELECT *, ROWNUM rownum__index FROM (
224    SELECT ...
225   ) WHERE ROWNUM <= ($limit+$offset)
226  ) WHERE rownum__index >= ($offset+1)
227
228 or
229
230  SELECT * FROM (
231   SELECT *, ROWNUM rownum__index FROM (
232     SELECT ...
233   )
234  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
235
236 or
237
238  SELECT * FROM (
239     SELECT ...
240   ) WHERE ROWNUM <= ($limit+1)
241
242 Supported by B<Oracle>.
243
244 =cut
245 sub _RowNum {
246   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
247
248   my ($stripped_sql, $insel, $outsel) = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
249
250   my $qalias = $self->_quote ($rs_attrs->{alias});
251   my $idx_name = $self->_quote ('rownum__index');
252   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
253
254   #
255   # There are two ways to limit in Oracle, one vastly faster than the other
256   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
257   # However Oracle is retarded and does not preserve stable ROWNUM() values
258   # when called twice in the same scope. Therefore unless the resultset is
259   # ordered by a unique set of columns, it is not safe to use the faster
260   # method, and the slower BETWEEN query is used instead
261   #
262   # FIXME - this is quite expensive, and doe snot perform caching of any sort
263   # as soon as some of the DQ work becomes viable consider switching this
264   # over
265   if ( __order_by_is_unique($rs_attrs) ) {
266
267     # if offset is 0 (first page) the we can skip a subquery
268     if (! $offset) {
269       push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
270
271       return <<EOS;
272 SELECT $outsel FROM (
273   SELECT $insel ${stripped_sql}${order_group_having}
274 ) $qalias WHERE ROWNUM <= ?
275 EOS
276     }
277     else {
278       push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
279
280       return <<EOS;
281 SELECT $outsel FROM (
282   SELECT $outsel, ROWNUM $idx_name FROM (
283     SELECT $insel ${stripped_sql}${order_group_having}
284   ) $qalias WHERE ROWNUM <= ?
285 ) $qalias WHERE $idx_name >= ?
286 EOS
287     }
288   }
289   else {
290     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
291
292     return <<EOS;
293 SELECT $outsel FROM (
294   SELECT $outsel, ROWNUM $idx_name FROM (
295     SELECT $insel ${stripped_sql}${order_group_having}
296   ) $qalias
297 ) $qalias WHERE $idx_name BETWEEN ? AND ?
298 EOS
299   }
300 }
301
302 # determine if the supplied order_by contains a unique column (set)
303 sub __order_by_is_unique {
304   my $rs_attrs = shift;
305   my $rsrc = $rs_attrs->{_rsroot_rsrc};
306   my $order_by = $rs_attrs->{order_by}
307     || return 0;
308
309   my $storage = $rsrc->schema->storage;
310
311   my @order_by_cols = map { $_->[0] } $storage->_extract_order_criteria($order_by)
312     or return 0;
313
314   my $colinfo =
315     $storage->_resolve_column_info($rs_attrs->{from}, \@order_by_cols);
316
317   my $sources = {
318     map {( "$_" => $_ )} map { $_->{-result_source} } values %$colinfo
319   };
320
321   my $supplied_order = {
322     map { $_ => 1 }
323     grep { exists $colinfo->{$_} and ! $colinfo->{$_}{is_nullable} }
324     @order_by_cols
325   };
326
327   return 0 unless keys %$supplied_order;
328
329   for my $uks (
330     map { values %$_ } map { +{ $_->unique_constraints } } values %$sources
331   ) {
332     return 1
333       unless first { ! exists $supplied_order->{$_} } @$uks;
334   }
335
336   return 0;
337 }
338
339 # used by _Top and _FetchFirst below
340 sub _prep_for_skimming_limit {
341   my ( $self, $sql, $rs_attrs ) = @_;
342
343   # get selectors
344   my (%r, $alias_map, $extra_order_sel);
345   ($r{inner_sql}, $r{in_sel}, $r{out_sel}, $alias_map, $extra_order_sel)
346     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
347
348   my $requested_order = delete $rs_attrs->{order_by};
349   $r{order_by_requested} = $self->_order_by ($requested_order);
350
351   # make up an order unless supplied
352   my $inner_order = ($r{order_by_requested}
353     ? $requested_order
354     : [ map
355       { "$rs_attrs->{alias}.$_" }
356       ( $rs_attrs->{_rsroot_rsrc}->_pri_cols )
357     ]
358   );
359
360   # localise as we already have all the bind values we need
361   {
362     local $self->{order_bind};
363     $r{order_by_inner} = $self->_order_by ($inner_order);
364
365     my @out_chunks;
366     for my $ch ($self->_order_by_chunks ($inner_order)) {
367       $ch = $ch->[0] if ref $ch eq 'ARRAY';
368
369       $ch =~ s/\s+ ( ASC|DESC ) \s* $//ix;
370       my $dir = uc ($1||'ASC');
371
372       push @out_chunks, \join (' ', $ch, $dir eq 'ASC' ? 'DESC' : 'ASC' );
373     }
374
375     $r{order_by_reversed} = $self->_order_by (\@out_chunks);
376   }
377
378   # this is the order supplement magic
379   $r{mid_sel} = $r{out_sel};
380   if ($extra_order_sel) {
381     for my $extra_col (sort
382       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
383       keys %$extra_order_sel
384     ) {
385       $r{in_sel} .= sprintf (', %s AS %s',
386         $extra_col,
387         $extra_order_sel->{$extra_col},
388       );
389
390       $r{mid_sel} .= ', ' . $extra_order_sel->{$extra_col};
391     }
392
393     # Whatever order bindvals there are, they will be realiased and
394     # need to show up in front of the entire initial inner subquery
395     push @{$self->{pre_select_bind}}, @{$self->{order_bind}};
396   }
397
398   # and this is order re-alias magic
399   for my $map ($extra_order_sel, $alias_map) {
400     for my $col (keys %$map) {
401       my $re_col = quotemeta ($col);
402       $_ =~ s/$re_col/$map->{$col}/
403         for ($r{order_by_reversed}, $r{order_by_requested});
404     }
405   }
406
407   # generate the rest of the sql
408   $r{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
409
410   $r{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
411
412   \%r;
413 }
414
415 =head2 Top
416
417  SELECT * FROM
418
419  SELECT TOP $limit FROM (
420   SELECT TOP $limit FROM (
421    SELECT TOP ($limit+$offset) ...
422   ) ORDER BY $reversed_original_order
423  ) ORDER BY $original_order
424
425 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
426
427 =head3 CAVEAT
428
429 Due to its implementation, this limit dialect returns B<incorrect results>
430 when $limit+$offset > total amount of rows in the resultset.
431
432 =cut
433
434 sub _Top {
435   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
436
437   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
438
439   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
440     $rows + ($offset||0),
441     $l{in_sel},
442     $l{inner_sql},
443     $l{grpby_having},
444     $l{order_by_inner},
445   );
446
447   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
448     $rows,
449     $l{mid_sel},
450     $sql,
451     $l{quoted_rs_alias},
452     $l{order_by_reversed},
453   ) if $offset;
454
455   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
456     $rows,
457     $l{out_sel},
458     $sql,
459     $l{quoted_rs_alias},
460     $l{order_by_requested},
461   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
462
463   return $sql;
464 }
465
466 =head2 FetchFirst
467
468  SELECT * FROM
469  (
470  SELECT * FROM (
471   SELECT * FROM (
472    SELECT * FROM ...
473   ) ORDER BY $reversed_original_order
474     FETCH FIRST $limit ROWS ONLY
475  ) ORDER BY $original_order
476    FETCH FIRST $limit ROWS ONLY
477  )
478
479 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
480
481 =head3 CAVEAT
482
483 Due to its implementation, this limit dialect returns B<incorrect results>
484 when $limit+$offset > total amount of rows in the resultset.
485
486 =cut
487
488 sub _FetchFirst {
489   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
490
491   my %l = %{ $self->_prep_for_skimming_limit($sql, $rs_attrs) };
492
493   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
494     $l{in_sel},
495     $l{inner_sql},
496     $l{grpby_having},
497     $l{order_by_inner},
498     $rows + ($offset||0),
499   );
500
501   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
502     $l{mid_sel},
503     $sql,
504     $l{quoted_rs_alias},
505     $l{order_by_reversed},
506     $rows,
507   ) if $offset;
508
509   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
510     $l{out_sel},
511     $sql,
512     $l{quoted_rs_alias},
513     $l{order_by_requested},
514     $rows,
515   ) if ( ($offset && $l{order_by_requested}) || ($l{mid_sel} ne $l{out_sel}) );
516
517   return $sql;
518 }
519
520 =head2 RowCountOrGenericSubQ
521
522 This is not exactly a limit dialect, but more of a proxy for B<Sybase ASE>.
523 If no $offset is supplied the limit is simply performed as:
524
525  SET ROWCOUNT $limit
526  SELECT ...
527  SET ROWCOUNT 0
528
529 Otherwise we fall back to L</GenericSubQ>
530
531 =cut
532
533 sub _RowCountOrGenericSubQ {
534   my $self = shift;
535   my ($sql, $rs_attrs, $rows, $offset) = @_;
536
537   return $self->_GenericSubQ(@_) if $offset;
538
539   return sprintf <<"EOF", $rows, $sql;
540 SET ROWCOUNT %d
541 %s
542 SET ROWCOUNT 0
543 EOF
544 }
545
546 =head2 GenericSubQ
547
548  SELECT * FROM (
549   SELECT ...
550  )
551  WHERE (
552   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
553  ) BETWEEN $offset AND ($offset+$rows-1)
554
555 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
556 databases. It works by ordering the set by some unique column, and calculating
557 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
558 index). Of course this implies the set can only be ordered by a single unique
559 column. Also note that this technique can be and often is B<excruciatingly
560 slow>.
561
562 Currently used by B<Sybase ASE>, due to lack of any other option.
563
564 =cut
565 sub _GenericSubQ {
566   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
567
568   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
569   my $root_tbl_name = $root_rsrc->name;
570
571   my ($order_by, @rest) = do {
572     local $self->{quote_char};
573     $self->_order_by_chunks ($rs_attrs->{order_by})
574   };
575
576   unless (
577     $order_by
578       &&
579     ! @rest
580       &&
581     ( ! ref $order_by
582         ||
583       ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
584     )
585   ) {
586     $self->throw_exception (
587       'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
588     . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
589     . 'unique-column order criteria.'
590     );
591   }
592
593   ($order_by) = @$order_by if ref $order_by;
594
595   $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
596   my $direction = lc ($1 || 'asc');
597
598   my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
599
600   my $inf = $root_rsrc->storage->_resolve_column_info (
601     $rs_attrs->{from}, [$order_by, $unq_sort_col]
602   );
603
604   my $ord_colinfo = $inf->{$order_by} || $self->throw_exception("Unable to determine source of order-criteria '$order_by'");
605
606   if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
607     $self->throw_exception(sprintf
608       "Generic Subquery Limit order criteria can be only based on the root-source '%s'"
609     . " (aliased as '%s')", $root_rsrc->source_name, $rs_attrs->{alias},
610     );
611   }
612
613   # make sure order column is qualified
614   $order_by = "$rs_attrs->{alias}.$order_by"
615     unless $order_by =~ /^$rs_attrs->{alias}\./;
616
617   my $is_u;
618   my $ucs = { $root_rsrc->unique_constraints };
619   for (values %$ucs ) {
620     if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
621       $is_u++;
622       last;
623     }
624   }
625   $self->throw_exception(
626     "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
627   ) unless $is_u;
628
629   my ($stripped_sql, $in_sel, $out_sel, $alias_map, $extra_order_sel)
630     = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
631
632   my $cmp_op = $direction eq 'desc' ? '>' : '<';
633   my $count_tbl_alias = 'rownum__emulation';
634
635   my $order_sql = $self->_order_by (delete $rs_attrs->{order_by});
636   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
637
638   # add the order supplement (if any) as this is what will be used for the outer WHERE
639   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
640
641   my $rownum_cond;
642   if ($offset) {
643     $rownum_cond = 'BETWEEN ? AND ?';
644
645     push @{$self->{limit_bind}},
646       [ $self->__offset_bindtype => $offset ],
647       [ $self->__total_bindtype => $offset + $rows - 1]
648     ;
649   }
650   else {
651     $rownum_cond = '< ?';
652
653     push @{$self->{limit_bind}},
654       [ $self->__rows_bindtype => $rows ]
655     ;
656   }
657
658   return sprintf ("
659 SELECT $out_sel
660   FROM (
661     SELECT $in_sel ${stripped_sql}${group_having_sql}
662   ) %s
663 WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) $rownum_cond
664 $order_sql
665   ", map { $self->_quote ($_) } (
666     $rs_attrs->{alias},
667     $root_tbl_name,
668     $count_tbl_alias,
669     "$count_tbl_alias.$unq_sort_col",
670     $order_by,
671   ));
672 }
673
674
675 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
676 #
677 # Generates inner/outer select lists for various limit dialects
678 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
679 # Any non-root-table columns need to have their table qualifier
680 # turned into a column alias (otherwise names in subqueries clash
681 # and/or lose their source table)
682 #
683 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
684 # with aliases (to be used in whatever select statement), and an alias
685 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used 
686 # for string-subst higher up).
687 # If an order_by is supplied, the inner select needs to bring out columns
688 # used in implicit (non-selected) orders, and the order condition itself
689 # needs to be realiased to the proper names in the outer query. Thus we
690 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
691 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
692 # exist in the original select list
693 sub _subqueried_limit_attrs {
694   my ($self, $proto_sql, $rs_attrs) = @_;
695
696   $self->throw_exception(
697     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
698   ) unless ref ($rs_attrs) eq 'HASH';
699
700   # mangle the input sql as we will be replacing the selector entirely
701   unless (
702     $rs_attrs->{_selector_sql}
703       and
704     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
705   ) {
706     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
707   }
708
709   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
710
711   # insulate from the multiple _recurse_fields calls below
712   local $self->{select_bind};
713
714   # correlate select and as, build selection index
715   my (@sel, $in_sel_index);
716   for my $i (0 .. $#{$rs_attrs->{select}}) {
717
718     my $s = $rs_attrs->{select}[$i];
719     my $sql_sel = $self->_recurse_fields ($s);
720     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
721
722     push @sel, {
723       sql => $sql_sel,
724       unquoted_sql => do {
725         local $self->{quote_char};
726         $self->_recurse_fields ($s);
727       },
728       as =>
729         $sql_alias
730           ||
731         $rs_attrs->{as}[$i]
732           ||
733         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
734       ,
735     };
736
737     $in_sel_index->{$sql_sel}++;
738     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
739
740     # record unqualified versions too, so we do not have
741     # to reselect the same column twice (in qualified and
742     # unqualified form)
743     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
744       $in_sel_index->{$1}++;
745     }
746   }
747
748
749   # re-alias and remove any name separators from aliases,
750   # unless we are dealing with the current source alias
751   # (which will transcend the subqueries as it is necessary
752   # for possible further chaining)
753   my (@in_sel, @out_sel, %renamed);
754   for my $node (@sel) {
755     if (
756       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
757         or
758       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
759     ) {
760       $node->{as} = $self->_unqualify_colname($node->{as});
761       my $quoted_as = $self->_quote($node->{as});
762       push @in_sel, sprintf '%s AS %s', $node->{sql}, $quoted_as;
763       push @out_sel, $quoted_as;
764       $renamed{$node->{sql}} = $quoted_as;
765     }
766     else {
767       push @in_sel, $node->{sql};
768       push @out_sel, $self->_quote ($node->{as});
769     }
770   }
771   # see if the order gives us anything
772   my %extra_order_sel;
773   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
774     # order with bind
775     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
776     $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
777
778     next if $in_sel_index->{$chunk};
779
780     $extra_order_sel{$chunk} ||= $self->_quote (
781       'ORDER__BY__' . scalar keys %extra_order_sel
782     );
783   }
784
785   return (
786     $proto_sql,
787     (map { join (', ', @$_ ) } (
788       \@in_sel,
789       \@out_sel)
790     ),
791     \%renamed,
792     keys %extra_order_sel ? \%extra_order_sel : (),
793   );
794 }
795
796 sub _unqualify_colname {
797   my ($self, $fqcn) = @_;
798   $fqcn =~ s/ \. /__/xg;
799   return $fqcn;
800 }
801
802 1;
803
804 =head1 AUTHORS
805
806 See L<DBIx::Class/CONTRIBUTORS>.
807
808 =head1 LICENSE
809
810 You may distribute this code under the same terms as Perl itself.
811
812 =cut