Handle NULLS clauses when mangling ordering
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 # constants are used not only here, but also in comparison tests
7 sub __rows_bindtype () {
8   +{ sqlt_datatype => 'integer' }
9 }
10 sub __offset_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __total_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16
17 =head1 NAME
18
19 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
20
21 =head1 DESCRIPTION
22
23 This module replicates a lot of the functionality originally found in
24 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
25 complex dialects that require e.g. subqueries could not be reliably
26 implemented without taking full advantage of the metadata locked within
27 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
28 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
29 practical to simply make an independent DBIx::Class-specific limit-dialect
30 provider.
31
32 =head1 SQL LIMIT DIALECTS
33
34 Note that the actual implementations listed below never use C<*> literally.
35 Instead proper re-aliasing of selectors and order criteria is done, so that
36 the limit dialect are safe to use on joined resultsets with clashing column
37 names.
38
39 Currently the provided dialects are:
40
41 =head2 LimitOffset
42
43  SELECT ... LIMIT $limit OFFSET $offset
44
45 Supported by B<PostgreSQL> and B<SQLite>
46
47 =cut
48 sub _LimitOffset {
49     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
50     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
51     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
52     if ($offset) {
53       $sql .= " OFFSET ?";
54       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
55     }
56     return $sql;
57 }
58
59 =head2 LimitXY
60
61  SELECT ... LIMIT $offset, $limit
62
63 Supported by B<MySQL> and any L<SQL::Statement> based DBD
64
65 =cut
66 sub _LimitXY {
67     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
68     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
69     if ($offset) {
70       $sql .= '?, ';
71       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
72     }
73     $sql .= '?';
74     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
75
76     return $sql;
77 }
78
79 =head2 RowNumberOver
80
81  SELECT * FROM (
82   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
83    SELECT ...
84   )
85  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
86
87
88 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
89 B<< MSSQL >= 2005 >>.
90
91 =cut
92 sub _RowNumberOver {
93   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
94
95   # get selectors, and scan the order_by (if any)
96   my $sq_attrs = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
97
98   # make up an order if none exists
99   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
100
101   # the order binds (if any) will need to go at the end of the entire inner select
102   local $self->{order_bind};
103   my $rno_ord = $self->_order_by ($requested_order);
104   push @{$self->{select_bind}}, @{$self->{order_bind}};
105
106   # this is the order supplement magic
107   my $mid_sel = $sq_attrs->{selection_outer};
108   if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
109     for my $extra_col (sort
110       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111       keys %$extra_order_sel
112     ) {
113       $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
114         $extra_col,
115         $extra_order_sel->{$extra_col},
116       );
117     }
118   }
119
120   # and this is order re-alias magic
121   for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
122     for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}} ) {
123       my $re_col = quotemeta ($col);
124       $rno_ord =~ s/$re_col/$map->{$col}/;
125     }
126   }
127
128   # whatever is left of the order_by (only where is processed at this point)
129   my $group_having = $self->_parse_rs_attrs($rs_attrs);
130
131   my $qalias = $self->_quote ($rs_attrs->{alias});
132   my $idx_name = $self->_quote ('rno__row__index');
133
134   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
135
136   return <<EOS;
137
138 SELECT $sq_attrs->{selection_outer} FROM (
139   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
140     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${group_having}
141   ) $qalias
142 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
143
144 EOS
145
146 }
147
148 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
149 sub _rno_default_order {
150   return undef;
151 }
152
153 =head2 SkipFirst
154
155  SELECT SKIP $offset FIRST $limit * FROM ...
156
157 Supported by B<Informix>, almost like LimitOffset. According to
158 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
159
160 =cut
161 sub _SkipFirst {
162   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
163
164   $sql =~ s/^ \s* SELECT \s+ //ix
165     or $self->throw_exception("Unrecognizable SELECT: $sql");
166
167   return sprintf ('SELECT %s%s%s%s',
168     $offset
169       ? do {
170          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
171          'SKIP ? '
172       }
173       : ''
174     ,
175     do {
176        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
177        'FIRST ? '
178     },
179     $sql,
180     $self->_parse_rs_attrs ($rs_attrs),
181   );
182 }
183
184 =head2 FirstSkip
185
186  SELECT FIRST $limit SKIP $offset * FROM ...
187
188 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
189 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
190
191 =cut
192 sub _FirstSkip {
193   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
194
195   $sql =~ s/^ \s* SELECT \s+ //ix
196     or $self->throw_exception("Unrecognizable SELECT: $sql");
197
198   return sprintf ('SELECT %s%s%s%s',
199     do {
200        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
201        'FIRST ? '
202     },
203     $offset
204       ? do {
205          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
206          'SKIP ? '
207       }
208       : ''
209     ,
210     $sql,
211     $self->_parse_rs_attrs ($rs_attrs),
212   );
213 }
214
215
216 =head2 RowNum
217
218 Depending on the resultset attributes one of:
219
220  SELECT * FROM (
221   SELECT *, ROWNUM AS rownum__index FROM (
222    SELECT ...
223   ) WHERE ROWNUM <= ($limit+$offset)
224  ) WHERE rownum__index >= ($offset+1)
225
226 or
227
228  SELECT * FROM (
229   SELECT *, ROWNUM AS rownum__index FROM (
230     SELECT ...
231   )
232  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
233
234 or
235
236  SELECT * FROM (
237     SELECT ...
238   ) WHERE ROWNUM <= ($limit+1)
239
240 Supported by B<Oracle>.
241
242 =cut
243 sub _RowNum {
244   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
245
246   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
247
248   my $qalias = $self->_quote ($rs_attrs->{alias});
249   my $idx_name = $self->_quote ('rownum__index');
250   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
251
252
253   # if no offset (e.g. first page) - we can skip one of the subqueries
254   if (! $offset) {
255     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
256
257     return <<EOS;
258 SELECT $sq_attrs->{selection_outer} FROM (
259   SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
260 ) $qalias WHERE ROWNUM <= ?
261 EOS
262   }
263
264   #
265   # There are two ways to limit in Oracle, one vastly faster than the other
266   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
267   # However Oracle is retarded and does not preserve stable ROWNUM() values
268   # when called twice in the same scope. Therefore unless the resultset is
269   # ordered by a unique set of columns, it is not safe to use the faster
270   # method, and the slower BETWEEN query is used instead
271   #
272   # FIXME - this is quite expensive, and does not perform caching of any sort
273   # as soon as some of the SQLA-inlining work becomes viable consider adding
274   # some rudimentary caching support
275   if (
276     $rs_attrs->{order_by}
277       and
278     $rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
279       @{$rs_attrs}{qw/from order_by where/}
280     )
281   ) {
282     push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
283
284     return <<EOS;
285 SELECT $sq_attrs->{selection_outer} FROM (
286   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
287     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
288   ) $qalias WHERE ROWNUM <= ?
289 ) $qalias WHERE $idx_name >= ?
290 EOS
291   }
292   else {
293     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
294
295     return <<EOS;
296 SELECT $sq_attrs->{selection_outer} FROM (
297   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
298     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
299   ) $qalias
300 ) $qalias WHERE $idx_name BETWEEN ? AND ?
301 EOS
302   }
303 }
304
305 # used by _Top and _FetchFirst below
306 sub _prep_for_skimming_limit {
307   my ( $self, $sql, $rs_attrs ) = @_;
308
309   # get selectors
310   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
311
312   my $requested_order = delete $rs_attrs->{order_by};
313   $sq_attrs->{order_by_requested} = $self->_order_by ($requested_order);
314   $sq_attrs->{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
315
316   # without an offset things are easy
317   if (! $rs_attrs->{offset}) {
318     $sq_attrs->{order_by_inner} = $sq_attrs->{order_by_requested};
319   }
320   else {
321     $sq_attrs->{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
322
323     # localise as we already have all the bind values we need
324     local $self->{order_bind};
325
326     # make up an order unless supplied or sanity check what we are given
327     my $inner_order;
328     if ($sq_attrs->{order_by_requested}) {
329       $self->throw_exception (
330         'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
331       ) unless ($rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
332         $rs_attrs->{from},
333         $requested_order,
334         $rs_attrs->{where},
335       ));
336
337       $inner_order = $requested_order;
338     }
339     else {
340       $inner_order = [ map
341         { "$rs_attrs->{alias}.$_" }
342         ( @{
343           $rs_attrs->{result_source}->_identifying_column_set
344             ||
345           $self->throw_exception(sprintf(
346             'Unable to auto-construct stable order criteria for "skimming type" limit '
347           . "dialect based on source '%s'", $rs_attrs->{result_source}->name) );
348         } )
349       ];
350     }
351
352     $sq_attrs->{order_by_inner} = $self->_order_by ($inner_order);
353
354     my @out_chunks;
355     for my $ch ($self->_order_by_chunks ($inner_order)) {
356       $ch = $ch->[0] if ref $ch eq 'ARRAY';
357
358       ($ch, my ($is_desc, $nulls_pos) ) = $self->_split_order_chunk($ch);
359
360       # !NOTE! outside chunks come in reverse order ( !$is_desc, !$nulls_pos )
361       push @out_chunks, {
362         ($is_desc ? '-asc' : '-desc') => \$ch,
363         $nulls_pos ? (
364           -nulls => ($nulls_pos eq 'FIRST' ? 'LAST' : 'FIRST')
365         ) : (),
366       };
367     }
368
369     $sq_attrs->{order_by_middle} = $self->_order_by (\@out_chunks);
370
371     # this is the order supplement magic
372     $sq_attrs->{selection_middle} = $sq_attrs->{selection_outer};
373     if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
374       for my $extra_col (sort
375         { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
376         keys %$extra_order_sel
377       ) {
378         $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
379           $extra_col,
380           $extra_order_sel->{$extra_col},
381         );
382
383         $sq_attrs->{selection_middle} .= ', ' . $extra_order_sel->{$extra_col};
384       }
385
386       # Whatever order bindvals there are, they will be realiased and
387       # reselected, and need to show up at end of the initial inner select
388       push @{$self->{select_bind}}, @{$self->{order_bind}};
389     }
390
391     # and this is order re-alias magic
392     for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
393       for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}}) {
394         my $re_col = quotemeta ($col);
395         $_ =~ s/$re_col/$map->{$col}/
396           for ($sq_attrs->{order_by_middle}, $sq_attrs->{order_by_requested});
397       }
398     }
399   }
400
401   $sq_attrs;
402 }
403
404 =head2 Top
405
406  SELECT * FROM
407
408  SELECT TOP $limit FROM (
409   SELECT TOP $limit FROM (
410    SELECT TOP ($limit+$offset) ...
411   ) ORDER BY $reversed_original_order
412  ) ORDER BY $original_order
413
414 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
415
416 =head3 CAVEAT
417
418 Due to its implementation, this limit dialect returns B<incorrect results>
419 when $limit+$offset > total amount of rows in the resultset.
420
421 =cut
422
423 sub _Top {
424   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
425
426   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
427
428   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
429     $rows + ($offset||0),
430     $offset ? $lim->{selection_inner} : $lim->{selection_original},
431     $lim->{query_leftover},
432     $lim->{grpby_having},
433     $lim->{order_by_inner},
434   );
435
436   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
437     $rows,
438     $lim->{selection_middle},
439     $sql,
440     $lim->{quoted_rs_alias},
441     $lim->{order_by_middle},
442   ) if $offset;
443
444   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
445     $lim->{selection_outer},
446     $sql,
447     $lim->{quoted_rs_alias},
448     $lim->{order_by_requested},
449   ) if $offset and (
450     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
451   );
452
453   return $sql;
454 }
455
456 =head2 FetchFirst
457
458  SELECT * FROM
459  (
460  SELECT * FROM (
461   SELECT * FROM (
462    SELECT * FROM ...
463   ) ORDER BY $reversed_original_order
464     FETCH FIRST $limit ROWS ONLY
465  ) ORDER BY $original_order
466    FETCH FIRST $limit ROWS ONLY
467  )
468
469 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
470
471 =head3 CAVEAT
472
473 Due to its implementation, this limit dialect returns B<incorrect results>
474 when $limit+$offset > total amount of rows in the resultset.
475
476 =cut
477
478 sub _FetchFirst {
479   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
480
481   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
482
483   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
484     $offset ? $lim->{selection_inner} : $lim->{selection_original},
485     $lim->{query_leftover},
486     $lim->{grpby_having},
487     $lim->{order_by_inner},
488     $rows + ($offset||0),
489   );
490
491   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
492     $lim->{selection_middle},
493     $sql,
494     $lim->{quoted_rs_alias},
495     $lim->{order_by_middle},
496     $rows,
497   ) if $offset;
498
499
500   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
501     $lim->{selection_outer},
502     $sql,
503     $lim->{quoted_rs_alias},
504     $lim->{order_by_requested},
505   ) if $offset and (
506     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
507   );
508
509   return $sql;
510 }
511
512 =head2 GenericSubQ
513
514  SELECT * FROM (
515   SELECT ...
516  )
517  WHERE (
518   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
519  ) BETWEEN $offset AND ($offset+$rows-1)
520
521 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
522 databases. It works by ordering the set by some unique column, and calculating
523 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
524 index). Of course this implies the set can only be ordered by a single unique
525 column.
526
527 Also note that this technique can be and often is B<excruciatingly slow>. You
528 may have much better luck using L<DBIx::Class::ResultSet/software_limit>
529 instead.
530
531 Currently used by B<Sybase ASE>, due to lack of any other option.
532
533 =cut
534 sub _GenericSubQ {
535   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
536
537   my $main_rsrc = $rs_attrs->{result_source};
538
539   # Explicitly require an order_by
540   # GenSubQ is slow enough as it is, just emulating things
541   # like in other cases is not wise - make the user work
542   # to shoot their DBA in the foot
543   $self->throw_exception (
544     'Generic Subquery Limit does not work on resultsets without an order. Provide a stable, '
545   . 'main-table-based order criteria.'
546   ) unless $rs_attrs->{order_by};
547
548   my $usable_order_colinfo = $main_rsrc->schema->storage->_extract_colinfo_of_stable_main_source_order_by_portion(
549     $rs_attrs
550   );
551
552   $self->throw_exception(
553     'Generic Subquery Limit can not work with order criteria based on sources other than the main one'
554   ) if (
555     ! keys %{$usable_order_colinfo||{}}
556       or
557     grep
558       { $_->{-source_alias} ne $rs_attrs->{alias} }
559       (values %$usable_order_colinfo)
560   );
561
562 ###
563 ###
564 ### we need to know the directions after we figured out the above - reextract *again*
565 ### this is eyebleed - trying to get it to work at first
566   my $supplied_order = delete $rs_attrs->{order_by};
567
568   my @order_bits = do {
569     local $self->{quote_char};
570     local $self->{order_bind};
571     map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($supplied_order)
572   };
573
574   # truncate to what we'll use
575   $#order_bits = ( (keys %$usable_order_colinfo) - 1 );
576
577   # @order_bits likely will come back quoted (due to how the prefetch
578   # rewriter operates
579   # Hence supplement the column_info lookup table with quoted versions
580   if ($self->quote_char) {
581     $usable_order_colinfo->{$self->_quote($_)} = $usable_order_colinfo->{$_}
582       for keys %$usable_order_colinfo;
583   }
584
585 # calculate the condition
586   my $count_tbl_alias = 'rownum__emulation';
587   my $main_alias = $rs_attrs->{alias};
588   my $main_tbl_name = $main_rsrc->name;
589
590   my (@unqualified_names, @qualified_names, @is_desc, @new_order_by);
591
592   for my $bit (@order_bits) {
593
594     ($bit, my ($is_desc, $nulls_pos)) = $self->_split_order_chunk($bit);
595
596     push @is_desc, $is_desc;
597     push @unqualified_names, $usable_order_colinfo->{$bit}{-colname};
598     push @qualified_names, $usable_order_colinfo->{$bit}{-fq_colname};
599
600     push @new_order_by, {
601       ($is_desc ? '-desc' : '-asc') => $usable_order_colinfo->{$bit}{-fq_colname},
602       ($nulls_pos ? ( -nulls => lc $nulls_pos ) : ()),
603     };
604   };
605
606   my (@where_cond, @skip_colpair_stack);
607   for my $i (0 .. $#order_bits) {
608     my $ci = $usable_order_colinfo->{$order_bits[$i]};
609
610     my ($subq_col, $main_col) = map { "$_.$ci->{-colname}" } ($count_tbl_alias, $main_alias);
611     my $cur_cond = { $subq_col => { ($is_desc[$i] ? '>' : '<') => { -ident => $main_col } } };
612
613     push @skip_colpair_stack, [
614       { $main_col => { -ident => $subq_col } },
615     ];
616
617     # we can trust the nullability flag because
618     # we already used it during _id_col_set resolution
619     #
620     if ($ci->{is_nullable}) {
621       push @{$skip_colpair_stack[-1]}, { $main_col => undef, $subq_col=> undef };
622
623       $cur_cond = [
624         {
625           ($is_desc[$i] ? $subq_col : $main_col) => { '!=', undef },
626           ($is_desc[$i] ? $main_col : $subq_col) => undef,
627         },
628         {
629           $subq_col => { '!=', undef },
630           $main_col => { '!=', undef },
631           -and => $cur_cond,
632         },
633       ];
634     }
635
636     push @where_cond, { '-and', => [ @skip_colpair_stack[0..$i-1], $cur_cond ] };
637   }
638
639 # reuse the sqlmaker WHERE, this will not be returning binds
640   my $counted_where = do {
641     local $self->{where_bind};
642     $self->where(\@where_cond);
643   };
644
645 # construct the rownum condition by hand
646   my $rownum_cond;
647   if ($offset) {
648     $rownum_cond = 'BETWEEN ? AND ?';
649     push @{$self->{limit_bind}},
650       [ $self->__offset_bindtype => $offset ],
651       [ $self->__total_bindtype => $offset + $rows - 1]
652     ;
653   }
654   else {
655     $rownum_cond = '< ?';
656     push @{$self->{limit_bind}},
657       [ $self->__rows_bindtype => $rows ]
658     ;
659   }
660
661 # and what we will order by inside
662   my $inner_order_sql = do {
663     local $self->{order_bind};
664
665     my $s = $self->_order_by (\@new_order_by);
666
667     $self->throw_exception('Inner gensubq order may not contain binds... something went wrong')
668       if @{$self->{order_bind}};
669
670     $s;
671   };
672
673 ### resume originally scheduled programming
674 ###
675 ###
676
677   # we need to supply the order for the supplements to be properly calculated
678   my $sq_attrs = $self->_subqueried_limit_attrs (
679     $sql, { %$rs_attrs, order_by => \@new_order_by }
680   );
681
682   my $in_sel = $sq_attrs->{selection_inner};
683
684   # add the order supplement (if any) as this is what will be used for the outer WHERE
685   $in_sel .= ", $_" for sort keys %{$sq_attrs->{order_supplement}};
686
687   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
688
689
690   return sprintf ("
691 SELECT $sq_attrs->{selection_outer}
692   FROM (
693     SELECT $in_sel $sq_attrs->{query_leftover}${group_having_sql}
694   ) %s
695 WHERE ( SELECT COUNT(*) FROM %s %s $counted_where ) $rownum_cond
696 $inner_order_sql
697   ", map { $self->_quote ($_) } (
698     $rs_attrs->{alias},
699     $main_tbl_name,
700     $count_tbl_alias,
701   ));
702 }
703
704
705 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
706 #
707 # Generates inner/outer select lists for various limit dialects
708 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
709 # Any non-main-table columns need to have their table qualifier
710 # turned into a column alias (otherwise names in subqueries clash
711 # and/or lose their source table)
712 #
713 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
714 # with aliases (to be used in whatever select statement), and an alias
715 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
716 # for string-subst higher up).
717 # If an order_by is supplied, the inner select needs to bring out columns
718 # used in implicit (non-selected) orders, and the order condition itself
719 # needs to be realiased to the proper names in the outer query. Thus we
720 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
721 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
722 # exist in the original select list
723 sub _subqueried_limit_attrs {
724   my ($self, $proto_sql, $rs_attrs) = @_;
725
726   $self->throw_exception(
727     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
728   ) unless ref ($rs_attrs) eq 'HASH';
729
730   # mangle the input sql as we will be replacing the selector entirely
731   unless (
732     $rs_attrs->{_selector_sql}
733       and
734     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
735   ) {
736     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
737   }
738
739   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
740
741   # correlate select and as, build selection index
742   my (@sel, $in_sel_index);
743   for my $i (0 .. $#{$rs_attrs->{select}}) {
744
745     my $s = $rs_attrs->{select}[$i];
746     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
747
748     # we throw away the @bind here deliberately
749     my ($sql_sel) = $self->_recurse_fields ($s);
750
751     push @sel, {
752       arg => $s,
753       sql => $sql_sel,
754       unquoted_sql => do {
755         local $self->{quote_char};
756         ($self->_recurse_fields ($s))[0]; # ignore binds again
757       },
758       as =>
759         $sql_alias
760           ||
761         $rs_attrs->{as}[$i]
762           ||
763         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
764       ,
765     };
766
767     # anything with a placeholder in it needs re-selection
768     $in_sel_index->{$sql_sel}++ unless $sql_sel =~ / (?: ^ | \W ) \? (?: \W | $ ) /x;
769
770     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
771
772     # record unqualified versions too, so we do not have
773     # to reselect the same column twice (in qualified and
774     # unqualified form)
775     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
776       $in_sel_index->{$1}++;
777     }
778   }
779
780
781   # re-alias and remove any name separators from aliases,
782   # unless we are dealing with the current source alias
783   # (which will transcend the subqueries as it is necessary
784   # for possible further chaining)
785   # same for anything we do not recognize
786   my ($sel, $renamed);
787   for my $node (@sel) {
788     push @{$sel->{original}}, $node->{sql};
789
790     if (
791       ! $in_sel_index->{$node->{sql}}
792         or
793       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
794         or
795       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
796     ) {
797       $node->{as} = $self->_unqualify_colname($node->{as});
798       my $quoted_as = $self->_quote($node->{as});
799       push @{$sel->{inner}}, sprintf '%s AS %s', $node->{sql}, $quoted_as;
800       push @{$sel->{outer}}, $quoted_as;
801       $renamed->{$node->{sql}} = $quoted_as;
802     }
803     else {
804       push @{$sel->{inner}}, $node->{sql};
805       push @{$sel->{outer}}, $self->_quote (ref $node->{arg} ? $node->{as} : $node->{arg});
806     }
807   }
808
809   # see if the order gives us anything
810   my $extra_order_sel;
811   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
812     # order with bind
813     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
814     ($chunk) = $self->_split_order_chunk($chunk);
815
816     next if $in_sel_index->{$chunk};
817
818     $extra_order_sel->{$chunk} ||= $self->_quote (
819       'ORDER__BY__' . sprintf '%03d', scalar keys %{$extra_order_sel||{}}
820     );
821   }
822
823   return {
824     query_leftover => $proto_sql,
825     (map {( "selection_$_" => join (', ', @{$sel->{$_}} ) )} keys %$sel ),
826     outer_renames => $renamed,
827     order_supplement => $extra_order_sel,
828   };
829 }
830
831 sub _unqualify_colname {
832   my ($self, $fqcn) = @_;
833   $fqcn =~ s/ \. /__/xg;
834   return $fqcn;
835 }
836
837 =head1 FURTHER QUESTIONS?
838
839 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
840
841 =head1 COPYRIGHT AND LICENSE
842
843 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
844 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
845 redistribute it and/or modify it under the same terms as the
846 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
847
848 =cut
849
850 1;