Slightly golf ::ResultSource::DESTROY and several weaken() calls
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 # constants are used not only here, but also in comparison tests
7 sub __rows_bindtype () {
8   +{ sqlt_datatype => 'integer' }
9 }
10 sub __offset_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __total_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16
17 =head1 NAME
18
19 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
20
21 =head1 DESCRIPTION
22
23 This module replicates a lot of the functionality originally found in
24 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
25 complex dialects that require e.g. subqueries could not be reliably
26 implemented without taking full advantage of the metadata locked within
27 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
28 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
29 practical to simply make an independent DBIx::Class-specific limit-dialect
30 provider.
31
32 =head1 SQL LIMIT DIALECTS
33
34 Note that the actual implementations listed below never use C<*> literally.
35 Instead proper re-aliasing of selectors and order criteria is done, so that
36 the limit dialect are safe to use on joined resultsets with clashing column
37 names.
38
39 Currently the provided dialects are:
40
41 =head2 LimitOffset
42
43  SELECT ... LIMIT $limit OFFSET $offset
44
45 Supported by B<PostgreSQL> and B<SQLite>
46
47 =cut
48 sub _LimitOffset {
49     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
50     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
51     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
52     if ($offset) {
53       $sql .= " OFFSET ?";
54       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
55     }
56     return $sql;
57 }
58
59 =head2 LimitXY
60
61  SELECT ... LIMIT $offset, $limit
62
63 Supported by B<MySQL> and any L<SQL::Statement> based DBD
64
65 =cut
66 sub _LimitXY {
67     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
68     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
69     if ($offset) {
70       $sql .= '?, ';
71       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
72     }
73     $sql .= '?';
74     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
75
76     return $sql;
77 }
78
79 =head2 RowNumberOver
80
81  SELECT * FROM (
82   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
83    SELECT ...
84   )
85  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
86
87
88 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
89 B<< MSSQL >= 2005 >>.
90
91 =cut
92 sub _RowNumberOver {
93   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
94
95   # get selectors, and scan the order_by (if any)
96   my $sq_attrs = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
97
98   # make up an order if none exists
99   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
100
101   # the order binds (if any) will need to go at the end of the entire inner select
102   local $self->{order_bind};
103   my $rno_ord = $self->_order_by ($requested_order);
104   push @{$self->{select_bind}}, @{$self->{order_bind}};
105
106   # this is the order supplement magic
107   my $mid_sel = $sq_attrs->{selection_outer};
108   if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
109     for my $extra_col (sort
110       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111       keys %$extra_order_sel
112     ) {
113       $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
114         $extra_col,
115         $extra_order_sel->{$extra_col},
116       );
117     }
118   }
119
120   # and this is order re-alias magic
121   for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
122     for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}} ) {
123       my $re_col = quotemeta ($col);
124       $rno_ord =~ s/$re_col/$map->{$col}/;
125     }
126   }
127
128   # whatever is left of the order_by (only where is processed at this point)
129   my $group_having = $self->_parse_rs_attrs($rs_attrs);
130
131   my $qalias = $self->_quote ($rs_attrs->{alias});
132   my $idx_name = $self->_quote ('rno__row__index');
133
134   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
135
136   return <<EOS;
137
138 SELECT $sq_attrs->{selection_outer} FROM (
139   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
140     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${group_having}
141   ) $qalias
142 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
143
144 EOS
145
146 }
147
148 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
149 sub _rno_default_order {
150   return undef;
151 }
152
153 =head2 SkipFirst
154
155  SELECT SKIP $offset FIRST $limit * FROM ...
156
157 Supported by B<Informix>, almost like LimitOffset. According to
158 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
159
160 =cut
161 sub _SkipFirst {
162   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
163
164   $sql =~ s/^ \s* SELECT \s+ //ix
165     or $self->throw_exception("Unrecognizable SELECT: $sql");
166
167   return sprintf ('SELECT %s%s%s%s',
168     $offset
169       ? do {
170          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
171          'SKIP ? '
172       }
173       : ''
174     ,
175     do {
176        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
177        'FIRST ? '
178     },
179     $sql,
180     $self->_parse_rs_attrs ($rs_attrs),
181   );
182 }
183
184 =head2 FirstSkip
185
186  SELECT FIRST $limit SKIP $offset * FROM ...
187
188 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
189 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
190
191 =cut
192 sub _FirstSkip {
193   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
194
195   $sql =~ s/^ \s* SELECT \s+ //ix
196     or $self->throw_exception("Unrecognizable SELECT: $sql");
197
198   return sprintf ('SELECT %s%s%s%s',
199     do {
200        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
201        'FIRST ? '
202     },
203     $offset
204       ? do {
205          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
206          'SKIP ? '
207       }
208       : ''
209     ,
210     $sql,
211     $self->_parse_rs_attrs ($rs_attrs),
212   );
213 }
214
215
216 =head2 RowNum
217
218 Depending on the resultset attributes one of:
219
220  SELECT * FROM (
221   SELECT *, ROWNUM AS rownum__index FROM (
222    SELECT ...
223   ) WHERE ROWNUM <= ($limit+$offset)
224  ) WHERE rownum__index >= ($offset+1)
225
226 or
227
228  SELECT * FROM (
229   SELECT *, ROWNUM AS rownum__index FROM (
230     SELECT ...
231   )
232  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
233
234 or
235
236  SELECT * FROM (
237     SELECT ...
238   ) WHERE ROWNUM <= ($limit+1)
239
240 Supported by B<Oracle>.
241
242 =cut
243 sub _RowNum {
244   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
245
246   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
247
248   my $qalias = $self->_quote ($rs_attrs->{alias});
249   my $idx_name = $self->_quote ('rownum__index');
250   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
251
252
253   # if no offset (e.g. first page) - we can skip one of the subqueries
254   if (! $offset) {
255     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
256
257     return <<EOS;
258 SELECT $sq_attrs->{selection_outer} FROM (
259   SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
260 ) $qalias WHERE ROWNUM <= ?
261 EOS
262   }
263
264   #
265   # There are two ways to limit in Oracle, one vastly faster than the other
266   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
267   # However Oracle is retarded and does not preserve stable ROWNUM() values
268   # when called twice in the same scope. Therefore unless the resultset is
269   # ordered by a unique set of columns, it is not safe to use the faster
270   # method, and the slower BETWEEN query is used instead
271   #
272   # FIXME - this is quite expensive, and does not perform caching of any sort
273   # as soon as some of the SQLA-inlining work becomes viable consider adding
274   # some rudimentary caching support
275   if (
276     $rs_attrs->{order_by}
277       and
278     $rs_attrs->{result_source}->storage->_order_by_is_stable(
279       @{$rs_attrs}{qw/from order_by where/}
280     )
281   ) {
282     push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
283
284     return <<EOS;
285 SELECT $sq_attrs->{selection_outer} FROM (
286   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
287     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
288   ) $qalias WHERE ROWNUM <= ?
289 ) $qalias WHERE $idx_name >= ?
290 EOS
291   }
292   else {
293     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
294
295     return <<EOS;
296 SELECT $sq_attrs->{selection_outer} FROM (
297   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
298     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
299   ) $qalias
300 ) $qalias WHERE $idx_name BETWEEN ? AND ?
301 EOS
302   }
303 }
304
305 # used by _Top and _FetchFirst below
306 sub _prep_for_skimming_limit {
307   my ( $self, $sql, $rs_attrs ) = @_;
308
309   # get selectors
310   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
311
312   my $requested_order = delete $rs_attrs->{order_by};
313   $sq_attrs->{order_by_requested} = $self->_order_by ($requested_order);
314   $sq_attrs->{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
315
316   # without an offset things are easy
317   if (! $rs_attrs->{offset}) {
318     $sq_attrs->{order_by_inner} = $sq_attrs->{order_by_requested};
319   }
320   else {
321     $sq_attrs->{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
322
323     # localise as we already have all the bind values we need
324     local $self->{order_bind};
325
326     # make up an order unless supplied or sanity check what we are given
327     my $inner_order;
328     if ($sq_attrs->{order_by_requested}) {
329       $self->throw_exception (
330         'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
331       ) unless ($rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
332         $rs_attrs->{from},
333         $requested_order,
334         $rs_attrs->{where},
335       ));
336
337       $inner_order = $requested_order;
338     }
339     else {
340       $inner_order = [ map
341         { "$rs_attrs->{alias}.$_" }
342         ( @{
343           $rs_attrs->{result_source}->_identifying_column_set
344             ||
345           $self->throw_exception(sprintf(
346             'Unable to auto-construct stable order criteria for "skimming type" limit '
347           . "dialect based on source '%s'", $rs_attrs->{result_source}->name) );
348         } )
349       ];
350     }
351
352     $sq_attrs->{order_by_inner} = $self->_order_by ($inner_order);
353
354     my @out_chunks;
355     for my $ch ($self->_order_by_chunks ($inner_order)) {
356       $ch = $ch->[0] if ref $ch eq 'ARRAY';
357
358       ($ch, my $is_desc) = $self->_split_order_chunk($ch);
359
360       # !NOTE! outside chunks come in reverse order ( !$is_desc )
361       push @out_chunks, { ($is_desc ? '-asc' : '-desc') => \$ch };
362     }
363
364     $sq_attrs->{order_by_middle} = $self->_order_by (\@out_chunks);
365
366     # this is the order supplement magic
367     $sq_attrs->{selection_middle} = $sq_attrs->{selection_outer};
368     if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
369       for my $extra_col (sort
370         { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
371         keys %$extra_order_sel
372       ) {
373         $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
374           $extra_col,
375           $extra_order_sel->{$extra_col},
376         );
377
378         $sq_attrs->{selection_middle} .= ', ' . $extra_order_sel->{$extra_col};
379       }
380
381       # Whatever order bindvals there are, they will be realiased and
382       # reselected, and need to show up at end of the initial inner select
383       push @{$self->{select_bind}}, @{$self->{order_bind}};
384     }
385
386     # and this is order re-alias magic
387     for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
388       for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}}) {
389         my $re_col = quotemeta ($col);
390         $_ =~ s/$re_col/$map->{$col}/
391           for ($sq_attrs->{order_by_middle}, $sq_attrs->{order_by_requested});
392       }
393     }
394   }
395
396   $sq_attrs;
397 }
398
399 =head2 Top
400
401  SELECT * FROM
402
403  SELECT TOP $limit FROM (
404   SELECT TOP $limit FROM (
405    SELECT TOP ($limit+$offset) ...
406   ) ORDER BY $reversed_original_order
407  ) ORDER BY $original_order
408
409 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
410
411 =head3 CAVEAT
412
413 Due to its implementation, this limit dialect returns B<incorrect results>
414 when $limit+$offset > total amount of rows in the resultset.
415
416 =cut
417
418 sub _Top {
419   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
420
421   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
422
423   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
424     $rows + ($offset||0),
425     $offset ? $lim->{selection_inner} : $lim->{selection_original},
426     $lim->{query_leftover},
427     $lim->{grpby_having},
428     $lim->{order_by_inner},
429   );
430
431   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
432     $rows,
433     $lim->{selection_middle},
434     $sql,
435     $lim->{quoted_rs_alias},
436     $lim->{order_by_middle},
437   ) if $offset;
438
439   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
440     $lim->{selection_outer},
441     $sql,
442     $lim->{quoted_rs_alias},
443     $lim->{order_by_requested},
444   ) if $offset and (
445     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
446   );
447
448   return $sql;
449 }
450
451 =head2 FetchFirst
452
453  SELECT * FROM
454  (
455  SELECT * FROM (
456   SELECT * FROM (
457    SELECT * FROM ...
458   ) ORDER BY $reversed_original_order
459     FETCH FIRST $limit ROWS ONLY
460  ) ORDER BY $original_order
461    FETCH FIRST $limit ROWS ONLY
462  )
463
464 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
465
466 =head3 CAVEAT
467
468 Due to its implementation, this limit dialect returns B<incorrect results>
469 when $limit+$offset > total amount of rows in the resultset.
470
471 =cut
472
473 sub _FetchFirst {
474   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
475
476   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
477
478   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
479     $offset ? $lim->{selection_inner} : $lim->{selection_original},
480     $lim->{query_leftover},
481     $lim->{grpby_having},
482     $lim->{order_by_inner},
483     $rows + ($offset||0),
484   );
485
486   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
487     $lim->{selection_middle},
488     $sql,
489     $lim->{quoted_rs_alias},
490     $lim->{order_by_middle},
491     $rows,
492   ) if $offset;
493
494
495   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
496     $lim->{selection_outer},
497     $sql,
498     $lim->{quoted_rs_alias},
499     $lim->{order_by_requested},
500   ) if $offset and (
501     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
502   );
503
504   return $sql;
505 }
506
507 =head2 GenericSubQ
508
509  SELECT * FROM (
510   SELECT ...
511  )
512  WHERE (
513   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
514  ) BETWEEN $offset AND ($offset+$rows-1)
515
516 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
517 databases. It works by ordering the set by some unique column, and calculating
518 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
519 index). Of course this implies the set can only be ordered by a single unique
520 column.
521
522 Also note that this technique can be and often is B<excruciatingly slow>. You
523 may have much better luck using L<DBIx::Class::ResultSet/software_limit>
524 instead.
525
526 Currently used by B<Sybase ASE>, due to lack of any other option.
527
528 =cut
529 sub _GenericSubQ {
530   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
531
532   my $main_rsrc = $rs_attrs->{result_source};
533
534   # Explicitly require an order_by
535   # GenSubQ is slow enough as it is, just emulating things
536   # like in other cases is not wise - make the user work
537   # to shoot their DBA in the foot
538   $self->throw_exception (
539     'Generic Subquery Limit does not work on resultsets without an order. Provide a stable, '
540   . 'main-table-based order criteria.'
541   ) unless $rs_attrs->{order_by};
542
543   my $usable_order_colinfo = $main_rsrc->storage->_extract_colinfo_of_stable_main_source_order_by_portion(
544     $rs_attrs
545   );
546
547   $self->throw_exception(
548     'Generic Subquery Limit can not work with order criteria based on sources other than the main one'
549   ) if (
550     ! keys %{$usable_order_colinfo||{}}
551       or
552     grep
553       { $_->{-source_alias} ne $rs_attrs->{alias} }
554       (values %$usable_order_colinfo)
555   );
556
557 ###
558 ###
559 ### we need to know the directions after we figured out the above - reextract *again*
560 ### this is eyebleed - trying to get it to work at first
561   my $supplied_order = delete $rs_attrs->{order_by};
562
563   my @order_bits = do {
564     local $self->{quote_char};
565     local $self->{order_bind};
566     map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($supplied_order)
567   };
568
569   # truncate to what we'll use
570   $#order_bits = ( (keys %$usable_order_colinfo) - 1 );
571
572   # @order_bits likely will come back quoted (due to how the prefetch
573   # rewriter operates
574   # Hence supplement the column_info lookup table with quoted versions
575   if ($self->quote_char) {
576     $usable_order_colinfo->{$self->_quote($_)} = $usable_order_colinfo->{$_}
577       for keys %$usable_order_colinfo;
578   }
579
580 # calculate the condition
581   my $count_tbl_alias = 'rownum__emulation';
582   my $main_alias = $rs_attrs->{alias};
583   my $main_tbl_name = $main_rsrc->name;
584
585   my (@unqualified_names, @qualified_names, @is_desc, @new_order_by);
586
587   for my $bit (@order_bits) {
588
589     ($bit, my $is_desc) = $self->_split_order_chunk($bit);
590
591     push @is_desc, $is_desc;
592     push @unqualified_names, $usable_order_colinfo->{$bit}{-colname};
593     push @qualified_names, $usable_order_colinfo->{$bit}{-fq_colname};
594
595     push @new_order_by, { ($is_desc ? '-desc' : '-asc') => $usable_order_colinfo->{$bit}{-fq_colname} };
596   };
597
598   my (@where_cond, @skip_colpair_stack);
599   for my $i (0 .. $#order_bits) {
600     my $ci = $usable_order_colinfo->{$order_bits[$i]};
601
602     my ($subq_col, $main_col) = map { "$_.$ci->{-colname}" } ($count_tbl_alias, $main_alias);
603     my $cur_cond = { $subq_col => { ($is_desc[$i] ? '>' : '<') => { -ident => $main_col } } };
604
605     push @skip_colpair_stack, [
606       { $main_col => { -ident => $subq_col } },
607     ];
608
609     # we can trust the nullability flag because
610     # we already used it during _id_col_set resolution
611     #
612     if ($ci->{is_nullable}) {
613       push @{$skip_colpair_stack[-1]}, { $main_col => undef, $subq_col=> undef };
614
615       $cur_cond = [
616         {
617           ($is_desc[$i] ? $subq_col : $main_col) => { '!=', undef },
618           ($is_desc[$i] ? $main_col : $subq_col) => undef,
619         },
620         {
621           $subq_col => { '!=', undef },
622           $main_col => { '!=', undef },
623           -and => $cur_cond,
624         },
625       ];
626     }
627
628     push @where_cond, { '-and', => [ @skip_colpair_stack[0..$i-1], $cur_cond ] };
629   }
630
631 # reuse the sqlmaker WHERE, this will not be returning binds
632   my $counted_where = do {
633     local $self->{where_bind};
634     $self->where(\@where_cond);
635   };
636
637 # construct the rownum condition by hand
638   my $rownum_cond;
639   if ($offset) {
640     $rownum_cond = 'BETWEEN ? AND ?';
641     push @{$self->{limit_bind}},
642       [ $self->__offset_bindtype => $offset ],
643       [ $self->__total_bindtype => $offset + $rows - 1]
644     ;
645   }
646   else {
647     $rownum_cond = '< ?';
648     push @{$self->{limit_bind}},
649       [ $self->__rows_bindtype => $rows ]
650     ;
651   }
652
653 # and what we will order by inside
654   my $inner_order_sql = do {
655     local $self->{order_bind};
656
657     my $s = $self->_order_by (\@new_order_by);
658
659     $self->throw_exception('Inner gensubq order may not contain binds... something went wrong')
660       if @{$self->{order_bind}};
661
662     $s;
663   };
664
665 ### resume originally scheduled programming
666 ###
667 ###
668
669   # we need to supply the order for the supplements to be properly calculated
670   my $sq_attrs = $self->_subqueried_limit_attrs (
671     $sql, { %$rs_attrs, order_by => \@new_order_by }
672   );
673
674   my $in_sel = $sq_attrs->{selection_inner};
675
676   # add the order supplement (if any) as this is what will be used for the outer WHERE
677   $in_sel .= ", $_" for sort keys %{$sq_attrs->{order_supplement}};
678
679   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
680
681
682   return sprintf ("
683 SELECT $sq_attrs->{selection_outer}
684   FROM (
685     SELECT $in_sel $sq_attrs->{query_leftover}${group_having_sql}
686   ) %s
687 WHERE ( SELECT COUNT(*) FROM %s %s $counted_where ) $rownum_cond
688 $inner_order_sql
689   ", map { $self->_quote ($_) } (
690     $rs_attrs->{alias},
691     $main_tbl_name,
692     $count_tbl_alias,
693   ));
694 }
695
696
697 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
698 #
699 # Generates inner/outer select lists for various limit dialects
700 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
701 # Any non-main-table columns need to have their table qualifier
702 # turned into a column alias (otherwise names in subqueries clash
703 # and/or lose their source table)
704 #
705 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
706 # with aliases (to be used in whatever select statement), and an alias
707 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
708 # for string-subst higher up).
709 # If an order_by is supplied, the inner select needs to bring out columns
710 # used in implicit (non-selected) orders, and the order condition itself
711 # needs to be realiased to the proper names in the outer query. Thus we
712 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
713 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
714 # exist in the original select list
715 sub _subqueried_limit_attrs {
716   my ($self, $proto_sql, $rs_attrs) = @_;
717
718   $self->throw_exception(
719     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
720   ) unless ref ($rs_attrs) eq 'HASH';
721
722   # mangle the input sql as we will be replacing the selector entirely
723   unless (
724     $rs_attrs->{_selector_sql}
725       and
726     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
727   ) {
728     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
729   }
730
731   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
732
733   # correlate select and as, build selection index
734   my (@sel, $in_sel_index);
735   for my $i (0 .. $#{$rs_attrs->{select}}) {
736
737     my $s = $rs_attrs->{select}[$i];
738     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
739
740     # we throw away the @bind here deliberately
741     my ($sql_sel) = $self->_recurse_fields ($s);
742
743     push @sel, {
744       arg => $s,
745       sql => $sql_sel,
746       unquoted_sql => do {
747         local $self->{quote_char};
748         ($self->_recurse_fields ($s))[0]; # ignore binds again
749       },
750       as =>
751         $sql_alias
752           ||
753         $rs_attrs->{as}[$i]
754           ||
755         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
756       ,
757     };
758
759     # anything with a placeholder in it needs re-selection
760     $in_sel_index->{$sql_sel}++ unless $sql_sel =~ / (?: ^ | \W ) \? (?: \W | $ ) /x;
761
762     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
763
764     # record unqualified versions too, so we do not have
765     # to reselect the same column twice (in qualified and
766     # unqualified form)
767     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
768       $in_sel_index->{$1}++;
769     }
770   }
771
772
773   # re-alias and remove any name separators from aliases,
774   # unless we are dealing with the current source alias
775   # (which will transcend the subqueries as it is necessary
776   # for possible further chaining)
777   # same for anything we do not recognize
778   my ($sel, $renamed);
779   for my $node (@sel) {
780     push @{$sel->{original}}, $node->{sql};
781
782     if (
783       ! $in_sel_index->{$node->{sql}}
784         or
785       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
786         or
787       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
788     ) {
789       $node->{as} = $self->_unqualify_colname($node->{as});
790       my $quoted_as = $self->_quote($node->{as});
791       push @{$sel->{inner}}, sprintf '%s AS %s', $node->{sql}, $quoted_as;
792       push @{$sel->{outer}}, $quoted_as;
793       $renamed->{$node->{sql}} = $quoted_as;
794     }
795     else {
796       push @{$sel->{inner}}, $node->{sql};
797       push @{$sel->{outer}}, $self->_quote (ref $node->{arg} ? $node->{as} : $node->{arg});
798     }
799   }
800
801   # see if the order gives us anything
802   my $extra_order_sel;
803   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
804     # order with bind
805     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
806     ($chunk) = $self->_split_order_chunk($chunk);
807
808     next if $in_sel_index->{$chunk};
809
810     $extra_order_sel->{$chunk} ||= $self->_quote (
811       'ORDER__BY__' . sprintf '%03d', scalar keys %{$extra_order_sel||{}}
812     );
813   }
814
815   return {
816     query_leftover => $proto_sql,
817     (map {( "selection_$_" => join (', ', @{$sel->{$_}} ) )} keys %$sel ),
818     outer_renames => $renamed,
819     order_supplement => $extra_order_sel,
820   };
821 }
822
823 sub _unqualify_colname {
824   my ($self, $fqcn) = @_;
825   $fqcn =~ s/ \. /__/xg;
826   return $fqcn;
827 }
828
829 =head1 FURTHER QUESTIONS?
830
831 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
832
833 =head1 COPYRIGHT AND LICENSE
834
835 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
836 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
837 redistribute it and/or modify it under the same terms as the
838 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
839
840 =cut
841
842 1;