add OffsetFetch support
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 use List::Util 'first';
7 use namespace::clean;
8
9 # constants are used not only here, but also in comparison tests
10 sub __rows_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __offset_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16 sub __total_bindtype () {
17   +{ sqlt_datatype => 'integer' }
18 }
19
20 =head1 NAME
21
22 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
23
24 =head1 DESCRIPTION
25
26 This module replicates a lot of the functionality originally found in
27 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
28 complex dialects that require e.g. subqueries could not be reliably
29 implemented without taking full advantage of the metadata locked within
30 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
31 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
32 practical to simply make an independent DBIx::Class-specific limit-dialect
33 provider.
34
35 =head1 SQL LIMIT DIALECTS
36
37 Note that the actual implementations listed below never use C<*> literally.
38 Instead proper re-aliasing of selectors and order criteria is done, so that
39 the limit dialect are safe to use on joined resultsets with clashing column
40 names.
41
42 Currently the provided dialects are:
43
44 =head2 LimitOffset
45
46  SELECT ... LIMIT $limit OFFSET $offset
47
48 Supported by B<PostgreSQL> and B<SQLite>
49
50 =cut
51 sub _LimitOffset {
52     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
53     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
54     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
55     if ($offset) {
56       $sql .= " OFFSET ?";
57       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
58     }
59     return $sql;
60 }
61
62 =head2 LimitXY
63
64  SELECT ... LIMIT $offset $limit
65
66 Supported by B<MySQL> and any L<SQL::Statement> based DBD
67
68 =cut
69 sub _LimitXY {
70     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
71     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
72     if ($offset) {
73       $sql .= '?, ';
74       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
75     }
76     $sql .= '?';
77     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
78
79     return $sql;
80 }
81
82 =head2 RowNumberOver
83
84  SELECT * FROM (
85   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
86    SELECT ...
87   )
88  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
89
90
91 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
92 B<< MSSQL >= 2005 >>.
93
94 =cut
95 sub _RowNumberOver {
96   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
97
98   # get selectors, and scan the order_by (if any)
99   my $sq_attrs = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
100
101   # make up an order if none exists
102   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
103
104   # the order binds (if any) will need to go at the end of the entire inner select
105   local $self->{order_bind};
106   my $rno_ord = $self->_order_by ($requested_order);
107   push @{$self->{select_bind}}, @{$self->{order_bind}};
108
109   # this is the order supplement magic
110   my $mid_sel = $sq_attrs->{selection_outer};
111   if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
112     for my $extra_col (sort
113       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
114       keys %$extra_order_sel
115     ) {
116       $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
117         $extra_col,
118         $extra_order_sel->{$extra_col},
119       );
120     }
121   }
122
123   # and this is order re-alias magic
124   for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
125     for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}} ) {
126       my $re_col = quotemeta ($col);
127       $rno_ord =~ s/$re_col/$map->{$col}/;
128     }
129   }
130
131   # whatever is left of the order_by (only where is processed at this point)
132   my $group_having = $self->_parse_rs_attrs($rs_attrs);
133
134   my $qalias = $self->_quote ($rs_attrs->{alias});
135   my $idx_name = $self->_quote ('rno__row__index');
136
137   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
138
139   return <<EOS;
140
141 SELECT $sq_attrs->{selection_outer} FROM (
142   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
143     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${group_having}
144   ) $qalias
145 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
146
147 EOS
148
149 }
150
151 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
152 sub _rno_default_order {
153   return undef;
154 }
155
156 =head2 OffsetFetchNext
157
158  SELECT * FROM ... OFFSET 10 ROWS FETCH NEXT 5 ROWS ONLY
159
160 Suported by B<SQL Server 2012>, purported to be more efficient than L</RowNumberOver>.
161
162 =cut
163 sub _OffsetFetchNext {
164   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
165
166   my $requested_order = (delete $rs_attrs->{order_by}) || \'1';
167
168   my $ord = $self->_order_by ($requested_order) || ' ORDER BY 1';
169
170   $sql .= $self->_parse_rs_attrs( $rs_attrs )
171        . $ord
172        . ' OFFSET ? ROWS FETCH NEXT ? ROWS ONLY';
173   push @{$self->{limit_bind}},
174     [ $self->__offset_bindtype => $offset || 0],
175     [ $self->__rows_bindtype => $rows];
176
177   return $sql;
178 }
179
180 =head2 SkipFirst
181
182  SELECT SKIP $offset FIRST $limit * FROM ...
183
184 Suported by B<Informix>, almost like LimitOffset. According to
185 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
186
187 =cut
188 sub _SkipFirst {
189   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
190
191   $sql =~ s/^ \s* SELECT \s+ //ix
192     or $self->throw_exception("Unrecognizable SELECT: $sql");
193
194   return sprintf ('SELECT %s%s%s%s',
195     $offset
196       ? do {
197          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
198          'SKIP ? '
199       }
200       : ''
201     ,
202     do {
203        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
204        'FIRST ? '
205     },
206     $sql,
207     $self->_parse_rs_attrs ($rs_attrs),
208   );
209 }
210
211 =head2 FirstSkip
212
213  SELECT FIRST $limit SKIP $offset * FROM ...
214
215 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
216 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
217
218 =cut
219 sub _FirstSkip {
220   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
221
222   $sql =~ s/^ \s* SELECT \s+ //ix
223     or $self->throw_exception("Unrecognizable SELECT: $sql");
224
225   return sprintf ('SELECT %s%s%s%s',
226     do {
227        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
228        'FIRST ? '
229     },
230     $offset
231       ? do {
232          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
233          'SKIP ? '
234       }
235       : ''
236     ,
237     $sql,
238     $self->_parse_rs_attrs ($rs_attrs),
239   );
240 }
241
242
243 =head2 RowNum
244
245 Depending on the resultset attributes one of:
246
247  SELECT * FROM (
248   SELECT *, ROWNUM AS rownum__index FROM (
249    SELECT ...
250   ) WHERE ROWNUM <= ($limit+$offset)
251  ) WHERE rownum__index >= ($offset+1)
252
253 or
254
255  SELECT * FROM (
256   SELECT *, ROWNUM AS rownum__index FROM (
257     SELECT ...
258   )
259  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
260
261 or
262
263  SELECT * FROM (
264     SELECT ...
265   ) WHERE ROWNUM <= ($limit+1)
266
267 Supported by B<Oracle>.
268
269 =cut
270 sub _RowNum {
271   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
272
273   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
274
275   my $qalias = $self->_quote ($rs_attrs->{alias});
276   my $idx_name = $self->_quote ('rownum__index');
277   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
278
279
280   # if no offset (e.g. first page) - we can skip one of the subqueries
281   if (! $offset) {
282     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
283
284     return <<EOS;
285 SELECT $sq_attrs->{selection_outer} FROM (
286   SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
287 ) $qalias WHERE ROWNUM <= ?
288 EOS
289   }
290
291   #
292   # There are two ways to limit in Oracle, one vastly faster than the other
293   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
294   # However Oracle is retarded and does not preserve stable ROWNUM() values
295   # when called twice in the same scope. Therefore unless the resultset is
296   # ordered by a unique set of columns, it is not safe to use the faster
297   # method, and the slower BETWEEN query is used instead
298   #
299   # FIXME - this is quite expensive, and does not perform caching of any sort
300   # as soon as some of the DQ work becomes viable consider switching this
301   # over
302   if (
303     $rs_attrs->{order_by}
304       and
305     $rs_attrs->{_rsroot_rsrc}->storage->_order_by_is_stable(
306       @{$rs_attrs}{qw/from order_by where/}
307     )
308   ) {
309     push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
310
311     return <<EOS;
312 SELECT $sq_attrs->{selection_outer} FROM (
313   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
314     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
315   ) $qalias WHERE ROWNUM <= ?
316 ) $qalias WHERE $idx_name >= ?
317 EOS
318   }
319   else {
320     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
321
322     return <<EOS;
323 SELECT $sq_attrs->{selection_outer} FROM (
324   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
325     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
326   ) $qalias
327 ) $qalias WHERE $idx_name BETWEEN ? AND ?
328 EOS
329   }
330 }
331
332 # used by _Top and _FetchFirst below
333 sub _prep_for_skimming_limit {
334   my ( $self, $sql, $rs_attrs ) = @_;
335
336   # get selectors
337   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
338
339   my $requested_order = delete $rs_attrs->{order_by};
340   $sq_attrs->{order_by_requested} = $self->_order_by ($requested_order);
341   $sq_attrs->{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
342
343   # without an offset things are easy
344   if (! $rs_attrs->{offset}) {
345     $sq_attrs->{order_by_inner} = $sq_attrs->{order_by_requested};
346   }
347   else {
348     $sq_attrs->{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
349
350     # localise as we already have all the bind values we need
351     local $self->{order_bind};
352
353     # make up an order unless supplied or sanity check what we are given
354     my $inner_order;
355     if ($sq_attrs->{order_by_requested}) {
356       $self->throw_exception (
357         'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
358       ) unless ($rs_attrs->{_rsroot_rsrc}->schema->storage->_order_by_is_stable(
359         $rs_attrs->{from},
360         $requested_order,
361         $rs_attrs->{where},
362       ));
363
364       $inner_order = $requested_order;
365     }
366     else {
367       $inner_order = [ map
368         { "$rs_attrs->{alias}.$_" }
369         ( @{
370           $rs_attrs->{_rsroot_rsrc}->_identifying_column_set
371             ||
372           $self->throw_exception(sprintf(
373             'Unable to auto-construct stable order criteria for "skimming type" limit '
374           . "dialect based on source '%s'", $rs_attrs->{_rsroot_rsrc}->name) );
375         } )
376       ];
377     }
378
379     $sq_attrs->{order_by_inner} = $self->_order_by ($inner_order);
380
381     my @out_chunks;
382     for my $ch ($self->_order_by_chunks ($inner_order)) {
383       $ch = $ch->[0] if ref $ch eq 'ARRAY';
384
385       ($ch, my $is_desc) = $self->_split_order_chunk($ch);
386
387       # !NOTE! outside chunks come in reverse order ( !$is_desc )
388       push @out_chunks, { ($is_desc ? '-asc' : '-desc') => \$ch };
389     }
390
391     $sq_attrs->{order_by_middle} = $self->_order_by (\@out_chunks);
392
393     # this is the order supplement magic
394     $sq_attrs->{selection_middle} = $sq_attrs->{selection_outer};
395     if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
396       for my $extra_col (sort
397         { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
398         keys %$extra_order_sel
399       ) {
400         $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
401           $extra_col,
402           $extra_order_sel->{$extra_col},
403         );
404
405         $sq_attrs->{selection_middle} .= ', ' . $extra_order_sel->{$extra_col};
406       }
407
408       # Whatever order bindvals there are, they will be realiased and
409       # reselected, and need to show up at end of the initial inner select
410       push @{$self->{select_bind}}, @{$self->{order_bind}};
411     }
412
413     # and this is order re-alias magic
414     for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
415       for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}}) {
416         my $re_col = quotemeta ($col);
417         $_ =~ s/$re_col/$map->{$col}/
418           for ($sq_attrs->{order_by_middle}, $sq_attrs->{order_by_requested});
419       }
420     }
421   }
422
423   $sq_attrs;
424 }
425
426 =head2 Top
427
428  SELECT * FROM
429
430  SELECT TOP $limit FROM (
431   SELECT TOP $limit FROM (
432    SELECT TOP ($limit+$offset) ...
433   ) ORDER BY $reversed_original_order
434  ) ORDER BY $original_order
435
436 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
437
438 =head3 CAVEAT
439
440 Due to its implementation, this limit dialect returns B<incorrect results>
441 when $limit+$offset > total amount of rows in the resultset.
442
443 =cut
444
445 sub _Top {
446   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
447
448   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
449
450   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
451     $rows + ($offset||0),
452     $offset ? $lim->{selection_inner} : $lim->{selection_original},
453     $lim->{query_leftover},
454     $lim->{grpby_having},
455     $lim->{order_by_inner},
456   );
457
458   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
459     $rows,
460     $lim->{selection_middle},
461     $sql,
462     $lim->{quoted_rs_alias},
463     $lim->{order_by_middle},
464   ) if $offset;
465
466   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
467     $lim->{selection_outer},
468     $sql,
469     $lim->{quoted_rs_alias},
470     $lim->{order_by_requested},
471   ) if $offset and (
472     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
473   );
474
475   return $sql;
476 }
477
478 =head2 FetchFirst
479
480  SELECT * FROM
481  (
482  SELECT * FROM (
483   SELECT * FROM (
484    SELECT * FROM ...
485   ) ORDER BY $reversed_original_order
486     FETCH FIRST $limit ROWS ONLY
487  ) ORDER BY $original_order
488    FETCH FIRST $limit ROWS ONLY
489  )
490
491 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
492
493 =head3 CAVEAT
494
495 Due to its implementation, this limit dialect returns B<incorrect results>
496 when $limit+$offset > total amount of rows in the resultset.
497
498 =cut
499
500 sub _FetchFirst {
501   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
502
503   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
504
505   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
506     $offset ? $lim->{selection_inner} : $lim->{selection_original},
507     $lim->{query_leftover},
508     $lim->{grpby_having},
509     $lim->{order_by_inner},
510     $rows + ($offset||0),
511   );
512
513   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
514     $lim->{selection_middle},
515     $sql,
516     $lim->{quoted_rs_alias},
517     $lim->{order_by_middle},
518     $rows,
519   ) if $offset;
520
521
522   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
523     $lim->{selection_outer},
524     $sql,
525     $lim->{quoted_rs_alias},
526     $lim->{order_by_requested},
527   ) if $offset and (
528     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
529   );
530
531   return $sql;
532 }
533
534 =head2 GenericSubQ
535
536  SELECT * FROM (
537   SELECT ...
538  )
539  WHERE (
540   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
541  ) BETWEEN $offset AND ($offset+$rows-1)
542
543 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
544 databases. It works by ordering the set by some unique column, and calculating
545 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
546 index). Of course this implies the set can only be ordered by a single unique
547 column.
548
549 Also note that this technique can be and often is B<excruciatingly slow>. You
550 may have much better luck using L<DBIx::Class::ResultSet/software_limit>
551 instead.
552
553 Currently used by B<Sybase ASE>, due to lack of any other option.
554
555 =cut
556 sub _GenericSubQ {
557   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
558
559   my $root_rsrc = $rs_attrs->{_rsroot_rsrc};
560
561   # Explicitly require an order_by
562   # GenSubQ is slow enough as it is, just emulating things
563   # like in other cases is not wise - make the user work
564   # to shoot their DBA in the foot
565   my $supplied_order = delete $rs_attrs->{order_by} or $self->throw_exception (
566     'Generic Subquery Limit does not work on resultsets without an order. Provide a stable, '
567   . 'root-table-based order criteria.'
568   );
569
570   my $usable_order_ci = $root_rsrc->storage->_main_source_order_by_portion_is_stable(
571     $root_rsrc,
572     $supplied_order,
573     $rs_attrs->{where},
574   ) or $self->throw_exception(
575     'Generic Subquery Limit can not work with order criteria based on sources other than the current one'
576   );
577
578 ###
579 ###
580 ### we need to know the directions after we figured out the above - reextract *again*
581 ### this is eyebleed - trying to get it to work at first
582   my @order_bits = do {
583     local $self->{quote_char};
584     local $self->{order_bind};
585     map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($supplied_order)
586   };
587
588   # truncate to what we'll use
589   $#order_bits = ( (keys %$usable_order_ci) - 1 );
590
591   # @order_bits likely will come back quoted (due to how the prefetch
592   # rewriter operates
593   # Hence supplement the column_info lookup table with quoted versions
594   if ($self->quote_char) {
595     $usable_order_ci->{$self->_quote($_)} = $usable_order_ci->{$_}
596       for keys %$usable_order_ci;
597   }
598
599 # calculate the condition
600   my $count_tbl_alias = 'rownum__emulation';
601   my $root_alias = $rs_attrs->{alias};
602   my $root_tbl_name = $root_rsrc->name;
603
604   my (@unqualified_names, @qualified_names, @is_desc, @new_order_by);
605
606   for my $bit (@order_bits) {
607
608     ($bit, my $is_desc) = $self->_split_order_chunk($bit);
609
610     push @is_desc, $is_desc;
611     push @unqualified_names, $usable_order_ci->{$bit}{-colname};
612     push @qualified_names, $usable_order_ci->{$bit}{-fq_colname};
613
614     push @new_order_by, { ($is_desc ? '-desc' : '-asc') => $usable_order_ci->{$bit}{-fq_colname} };
615   };
616
617   my (@where_cond, @skip_colpair_stack);
618   for my $i (0 .. $#order_bits) {
619     my $ci = $usable_order_ci->{$order_bits[$i]};
620
621     my ($subq_col, $main_col) = map { "$_.$ci->{-colname}" } ($count_tbl_alias, $root_alias);
622     my $cur_cond = { $subq_col => { ($is_desc[$i] ? '>' : '<') => { -ident => $main_col } } };
623
624     push @skip_colpair_stack, [
625       { $main_col => { -ident => $subq_col } },
626     ];
627
628     # we can trust the nullability flag because
629     # we already used it during _id_col_set resolution
630     #
631     if ($ci->{is_nullable}) {
632       push @{$skip_colpair_stack[-1]}, { $main_col => undef, $subq_col=> undef };
633
634       $cur_cond = [
635         {
636           ($is_desc[$i] ? $subq_col : $main_col) => { '!=', undef },
637           ($is_desc[$i] ? $main_col : $subq_col) => undef,
638         },
639         {
640           $subq_col => { '!=', undef },
641           $main_col => { '!=', undef },
642           -and => $cur_cond,
643         },
644       ];
645     }
646
647     push @where_cond, { '-and', => [ @skip_colpair_stack[0..$i-1], $cur_cond ] };
648   }
649
650 # reuse the sqlmaker WHERE, this will not be returning binds
651   my $counted_where = do {
652     local $self->{where_bind};
653     $self->where(\@where_cond);
654   };
655
656 # construct the rownum condition by hand
657   my $rownum_cond;
658   if ($offset) {
659     $rownum_cond = 'BETWEEN ? AND ?';
660     push @{$self->{limit_bind}},
661       [ $self->__offset_bindtype => $offset ],
662       [ $self->__total_bindtype => $offset + $rows - 1]
663     ;
664   }
665   else {
666     $rownum_cond = '< ?';
667     push @{$self->{limit_bind}},
668       [ $self->__rows_bindtype => $rows ]
669     ;
670   }
671
672 # and what we will order by inside
673   my $inner_order_sql = do {
674     local $self->{order_bind};
675
676     my $s = $self->_order_by (\@new_order_by);
677
678     $self->throw_exception('Inner gensubq order may not contain binds... something went wrong')
679       if @{$self->{order_bind}};
680
681     $s;
682   };
683
684 ### resume originally scheduled programming
685 ###
686 ###
687
688   # we need to supply the order for the supplements to be properly calculated
689   my $sq_attrs = $self->_subqueried_limit_attrs (
690     $sql, { %$rs_attrs, order_by => \@new_order_by }
691   );
692
693   my $in_sel = $sq_attrs->{selection_inner};
694
695   # add the order supplement (if any) as this is what will be used for the outer WHERE
696   $in_sel .= ", $_" for sort keys %{$sq_attrs->{order_supplement}};
697
698   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
699
700
701   return sprintf ("
702 SELECT $sq_attrs->{selection_outer}
703   FROM (
704     SELECT $in_sel $sq_attrs->{query_leftover}${group_having_sql}
705   ) %s
706 WHERE ( SELECT COUNT(*) FROM %s %s $counted_where ) $rownum_cond
707 $inner_order_sql
708   ", map { $self->_quote ($_) } (
709     $rs_attrs->{alias},
710     $root_tbl_name,
711     $count_tbl_alias,
712   ));
713 }
714
715
716 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
717 #
718 # Generates inner/outer select lists for various limit dialects
719 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
720 # Any non-root-table columns need to have their table qualifier
721 # turned into a column alias (otherwise names in subqueries clash
722 # and/or lose their source table)
723 #
724 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
725 # with aliases (to be used in whatever select statement), and an alias
726 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
727 # for string-subst higher up).
728 # If an order_by is supplied, the inner select needs to bring out columns
729 # used in implicit (non-selected) orders, and the order condition itself
730 # needs to be realiased to the proper names in the outer query. Thus we
731 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
732 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
733 # exist in the original select list
734 sub _subqueried_limit_attrs {
735   my ($self, $proto_sql, $rs_attrs) = @_;
736
737   $self->throw_exception(
738     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
739   ) unless ref ($rs_attrs) eq 'HASH';
740
741   # mangle the input sql as we will be replacing the selector entirely
742   unless (
743     $rs_attrs->{_selector_sql}
744       and
745     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
746   ) {
747     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
748   }
749
750   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
751
752   # correlate select and as, build selection index
753   my (@sel, $in_sel_index);
754   for my $i (0 .. $#{$rs_attrs->{select}}) {
755
756     my $s = $rs_attrs->{select}[$i];
757     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
758
759     # we throw away the @bind here deliberately
760     my ($sql_sel) = $self->_recurse_fields ($s);
761
762     push @sel, {
763       arg => $s,
764       sql => $sql_sel,
765       unquoted_sql => do {
766         local $self->{quote_char};
767         ($self->_recurse_fields ($s))[0]; # ignore binds again
768       },
769       as =>
770         $sql_alias
771           ||
772         $rs_attrs->{as}[$i]
773           ||
774         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
775       ,
776     };
777
778     # anything with a placeholder in it needs re-selection
779     $in_sel_index->{$sql_sel}++ unless $sql_sel =~ / (?: ^ | \W ) \? (?: \W | $ ) /x;
780
781     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
782
783     # record unqualified versions too, so we do not have
784     # to reselect the same column twice (in qualified and
785     # unqualified form)
786     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
787       $in_sel_index->{$1}++;
788     }
789   }
790
791
792   # re-alias and remove any name separators from aliases,
793   # unless we are dealing with the current source alias
794   # (which will transcend the subqueries as it is necessary
795   # for possible further chaining)
796   # same for anything we do not recognize
797   my ($sel, $renamed);
798   for my $node (@sel) {
799     push @{$sel->{original}}, $node->{sql};
800
801     if (
802       ! $in_sel_index->{$node->{sql}}
803         or
804       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
805         or
806       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
807     ) {
808       $node->{as} = $self->_unqualify_colname($node->{as});
809       my $quoted_as = $self->_quote($node->{as});
810       push @{$sel->{inner}}, sprintf '%s AS %s', $node->{sql}, $quoted_as;
811       push @{$sel->{outer}}, $quoted_as;
812       $renamed->{$node->{sql}} = $quoted_as;
813     }
814     else {
815       push @{$sel->{inner}}, $node->{sql};
816       push @{$sel->{outer}}, $self->_quote (ref $node->{arg} ? $node->{as} : $node->{arg});
817     }
818   }
819
820   # see if the order gives us anything
821   my $extra_order_sel;
822   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
823     # order with bind
824     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
825     ($chunk) = $self->_split_order_chunk($chunk);
826
827     next if $in_sel_index->{$chunk};
828
829     $extra_order_sel->{$chunk} ||= $self->_quote (
830       'ORDER__BY__' . sprintf '%03d', scalar keys %{$extra_order_sel||{}}
831     );
832   }
833
834   return {
835     query_leftover => $proto_sql,
836     (map {( "selection_$_" => join (', ', @{$sel->{$_}} ) )} keys %$sel ),
837     outer_renames => $renamed,
838     order_supplement => $extra_order_sel,
839   };
840 }
841
842 sub _unqualify_colname {
843   my ($self, $fqcn) = @_;
844   $fqcn =~ s/ \. /__/xg;
845   return $fqcn;
846 }
847
848 1;
849
850 =head1 AUTHORS
851
852 See L<DBIx::Class/CONTRIBUTORS>.
853
854 =head1 LICENSE
855
856 You may distribute this code under the same terms as Perl itself.
857
858 =cut