Clarify licensing, ensure footers are consistent throughout the project
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / LimitDialects.pm
1 package DBIx::Class::SQLMaker::LimitDialects;
2
3 use warnings;
4 use strict;
5
6 use List::Util 'first';
7 use namespace::clean;
8
9 # constants are used not only here, but also in comparison tests
10 sub __rows_bindtype () {
11   +{ sqlt_datatype => 'integer' }
12 }
13 sub __offset_bindtype () {
14   +{ sqlt_datatype => 'integer' }
15 }
16 sub __total_bindtype () {
17   +{ sqlt_datatype => 'integer' }
18 }
19
20 =head1 NAME
21
22 DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
23
24 =head1 DESCRIPTION
25
26 This module replicates a lot of the functionality originally found in
27 L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
28 complex dialects that require e.g. subqueries could not be reliably
29 implemented without taking full advantage of the metadata locked within
30 L<DBIx::Class::ResultSource> classes. After reimplementation of close to
31 80% of the L<SQL::Abstract::Limit> functionality it was deemed more
32 practical to simply make an independent DBIx::Class-specific limit-dialect
33 provider.
34
35 =head1 SQL LIMIT DIALECTS
36
37 Note that the actual implementations listed below never use C<*> literally.
38 Instead proper re-aliasing of selectors and order criteria is done, so that
39 the limit dialect are safe to use on joined resultsets with clashing column
40 names.
41
42 Currently the provided dialects are:
43
44 =head2 LimitOffset
45
46  SELECT ... LIMIT $limit OFFSET $offset
47
48 Supported by B<PostgreSQL> and B<SQLite>
49
50 =cut
51 sub _LimitOffset {
52     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
53     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
54     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
55     if ($offset) {
56       $sql .= " OFFSET ?";
57       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
58     }
59     return $sql;
60 }
61
62 =head2 LimitXY
63
64  SELECT ... LIMIT $offset, $limit
65
66 Supported by B<MySQL> and any L<SQL::Statement> based DBD
67
68 =cut
69 sub _LimitXY {
70     my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
71     $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
72     if ($offset) {
73       $sql .= '?, ';
74       push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
75     }
76     $sql .= '?';
77     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
78
79     return $sql;
80 }
81
82 =head2 RowNumberOver
83
84  SELECT * FROM (
85   SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
86    SELECT ...
87   )
88  ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
89
90
91 ANSI standard Limit/Offset implementation. Supported by B<DB2> and
92 B<< MSSQL >= 2005 >>.
93
94 =cut
95 sub _RowNumberOver {
96   my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
97
98   # get selectors, and scan the order_by (if any)
99   my $sq_attrs = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
100
101   # make up an order if none exists
102   my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
103
104   # the order binds (if any) will need to go at the end of the entire inner select
105   local $self->{order_bind};
106   my $rno_ord = $self->_order_by ($requested_order);
107   push @{$self->{select_bind}}, @{$self->{order_bind}};
108
109   # this is the order supplement magic
110   my $mid_sel = $sq_attrs->{selection_outer};
111   if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
112     for my $extra_col (sort
113       { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
114       keys %$extra_order_sel
115     ) {
116       $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
117         $extra_col,
118         $extra_order_sel->{$extra_col},
119       );
120     }
121   }
122
123   # and this is order re-alias magic
124   for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
125     for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}} ) {
126       my $re_col = quotemeta ($col);
127       $rno_ord =~ s/$re_col/$map->{$col}/;
128     }
129   }
130
131   # whatever is left of the order_by (only where is processed at this point)
132   my $group_having = $self->_parse_rs_attrs($rs_attrs);
133
134   my $qalias = $self->_quote ($rs_attrs->{alias});
135   my $idx_name = $self->_quote ('rno__row__index');
136
137   push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
138
139   return <<EOS;
140
141 SELECT $sq_attrs->{selection_outer} FROM (
142   SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
143     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${group_having}
144   ) $qalias
145 ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
146
147 EOS
148
149 }
150
151 # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
152 sub _rno_default_order {
153   return undef;
154 }
155
156 =head2 SkipFirst
157
158  SELECT SKIP $offset FIRST $limit * FROM ...
159
160 Suported by B<Informix>, almost like LimitOffset. According to
161 L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
162
163 =cut
164 sub _SkipFirst {
165   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
166
167   $sql =~ s/^ \s* SELECT \s+ //ix
168     or $self->throw_exception("Unrecognizable SELECT: $sql");
169
170   return sprintf ('SELECT %s%s%s%s',
171     $offset
172       ? do {
173          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
174          'SKIP ? '
175       }
176       : ''
177     ,
178     do {
179        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
180        'FIRST ? '
181     },
182     $sql,
183     $self->_parse_rs_attrs ($rs_attrs),
184   );
185 }
186
187 =head2 FirstSkip
188
189  SELECT FIRST $limit SKIP $offset * FROM ...
190
191 Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
192 L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
193
194 =cut
195 sub _FirstSkip {
196   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
197
198   $sql =~ s/^ \s* SELECT \s+ //ix
199     or $self->throw_exception("Unrecognizable SELECT: $sql");
200
201   return sprintf ('SELECT %s%s%s%s',
202     do {
203        push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
204        'FIRST ? '
205     },
206     $offset
207       ? do {
208          push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
209          'SKIP ? '
210       }
211       : ''
212     ,
213     $sql,
214     $self->_parse_rs_attrs ($rs_attrs),
215   );
216 }
217
218
219 =head2 RowNum
220
221 Depending on the resultset attributes one of:
222
223  SELECT * FROM (
224   SELECT *, ROWNUM AS rownum__index FROM (
225    SELECT ...
226   ) WHERE ROWNUM <= ($limit+$offset)
227  ) WHERE rownum__index >= ($offset+1)
228
229 or
230
231  SELECT * FROM (
232   SELECT *, ROWNUM AS rownum__index FROM (
233     SELECT ...
234   )
235  ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
236
237 or
238
239  SELECT * FROM (
240     SELECT ...
241   ) WHERE ROWNUM <= ($limit+1)
242
243 Supported by B<Oracle>.
244
245 =cut
246 sub _RowNum {
247   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
248
249   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
250
251   my $qalias = $self->_quote ($rs_attrs->{alias});
252   my $idx_name = $self->_quote ('rownum__index');
253   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
254
255
256   # if no offset (e.g. first page) - we can skip one of the subqueries
257   if (! $offset) {
258     push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
259
260     return <<EOS;
261 SELECT $sq_attrs->{selection_outer} FROM (
262   SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
263 ) $qalias WHERE ROWNUM <= ?
264 EOS
265   }
266
267   #
268   # There are two ways to limit in Oracle, one vastly faster than the other
269   # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
270   # However Oracle is retarded and does not preserve stable ROWNUM() values
271   # when called twice in the same scope. Therefore unless the resultset is
272   # ordered by a unique set of columns, it is not safe to use the faster
273   # method, and the slower BETWEEN query is used instead
274   #
275   # FIXME - this is quite expensive, and does not perform caching of any sort
276   # as soon as some of the DQ work becomes viable consider switching this
277   # over
278   if (
279     $rs_attrs->{order_by}
280       and
281     $rs_attrs->{result_source}->storage->_order_by_is_stable(
282       @{$rs_attrs}{qw/from order_by where/}
283     )
284   ) {
285     push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
286
287     return <<EOS;
288 SELECT $sq_attrs->{selection_outer} FROM (
289   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
290     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
291   ) $qalias WHERE ROWNUM <= ?
292 ) $qalias WHERE $idx_name >= ?
293 EOS
294   }
295   else {
296     push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
297
298     return <<EOS;
299 SELECT $sq_attrs->{selection_outer} FROM (
300   SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
301     SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
302   ) $qalias
303 ) $qalias WHERE $idx_name BETWEEN ? AND ?
304 EOS
305   }
306 }
307
308 # used by _Top and _FetchFirst below
309 sub _prep_for_skimming_limit {
310   my ( $self, $sql, $rs_attrs ) = @_;
311
312   # get selectors
313   my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
314
315   my $requested_order = delete $rs_attrs->{order_by};
316   $sq_attrs->{order_by_requested} = $self->_order_by ($requested_order);
317   $sq_attrs->{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
318
319   # without an offset things are easy
320   if (! $rs_attrs->{offset}) {
321     $sq_attrs->{order_by_inner} = $sq_attrs->{order_by_requested};
322   }
323   else {
324     $sq_attrs->{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
325
326     # localise as we already have all the bind values we need
327     local $self->{order_bind};
328
329     # make up an order unless supplied or sanity check what we are given
330     my $inner_order;
331     if ($sq_attrs->{order_by_requested}) {
332       $self->throw_exception (
333         'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
334       ) unless ($rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
335         $rs_attrs->{from},
336         $requested_order,
337         $rs_attrs->{where},
338       ));
339
340       $inner_order = $requested_order;
341     }
342     else {
343       $inner_order = [ map
344         { "$rs_attrs->{alias}.$_" }
345         ( @{
346           $rs_attrs->{result_source}->_identifying_column_set
347             ||
348           $self->throw_exception(sprintf(
349             'Unable to auto-construct stable order criteria for "skimming type" limit '
350           . "dialect based on source '%s'", $rs_attrs->{result_source}->name) );
351         } )
352       ];
353     }
354
355     $sq_attrs->{order_by_inner} = $self->_order_by ($inner_order);
356
357     my @out_chunks;
358     for my $ch ($self->_order_by_chunks ($inner_order)) {
359       $ch = $ch->[0] if ref $ch eq 'ARRAY';
360
361       ($ch, my $is_desc) = $self->_split_order_chunk($ch);
362
363       # !NOTE! outside chunks come in reverse order ( !$is_desc )
364       push @out_chunks, { ($is_desc ? '-asc' : '-desc') => \$ch };
365     }
366
367     $sq_attrs->{order_by_middle} = $self->_order_by (\@out_chunks);
368
369     # this is the order supplement magic
370     $sq_attrs->{selection_middle} = $sq_attrs->{selection_outer};
371     if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
372       for my $extra_col (sort
373         { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
374         keys %$extra_order_sel
375       ) {
376         $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
377           $extra_col,
378           $extra_order_sel->{$extra_col},
379         );
380
381         $sq_attrs->{selection_middle} .= ', ' . $extra_order_sel->{$extra_col};
382       }
383
384       # Whatever order bindvals there are, they will be realiased and
385       # reselected, and need to show up at end of the initial inner select
386       push @{$self->{select_bind}}, @{$self->{order_bind}};
387     }
388
389     # and this is order re-alias magic
390     for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
391       for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}}) {
392         my $re_col = quotemeta ($col);
393         $_ =~ s/$re_col/$map->{$col}/
394           for ($sq_attrs->{order_by_middle}, $sq_attrs->{order_by_requested});
395       }
396     }
397   }
398
399   $sq_attrs;
400 }
401
402 =head2 Top
403
404  SELECT * FROM
405
406  SELECT TOP $limit FROM (
407   SELECT TOP $limit FROM (
408    SELECT TOP ($limit+$offset) ...
409   ) ORDER BY $reversed_original_order
410  ) ORDER BY $original_order
411
412 Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
413
414 =head3 CAVEAT
415
416 Due to its implementation, this limit dialect returns B<incorrect results>
417 when $limit+$offset > total amount of rows in the resultset.
418
419 =cut
420
421 sub _Top {
422   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
423
424   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
425
426   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
427     $rows + ($offset||0),
428     $offset ? $lim->{selection_inner} : $lim->{selection_original},
429     $lim->{query_leftover},
430     $lim->{grpby_having},
431     $lim->{order_by_inner},
432   );
433
434   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
435     $rows,
436     $lim->{selection_middle},
437     $sql,
438     $lim->{quoted_rs_alias},
439     $lim->{order_by_middle},
440   ) if $offset;
441
442   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
443     $lim->{selection_outer},
444     $sql,
445     $lim->{quoted_rs_alias},
446     $lim->{order_by_requested},
447   ) if $offset and (
448     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
449   );
450
451   return $sql;
452 }
453
454 =head2 FetchFirst
455
456  SELECT * FROM
457  (
458  SELECT * FROM (
459   SELECT * FROM (
460    SELECT * FROM ...
461   ) ORDER BY $reversed_original_order
462     FETCH FIRST $limit ROWS ONLY
463  ) ORDER BY $original_order
464    FETCH FIRST $limit ROWS ONLY
465  )
466
467 Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
468
469 =head3 CAVEAT
470
471 Due to its implementation, this limit dialect returns B<incorrect results>
472 when $limit+$offset > total amount of rows in the resultset.
473
474 =cut
475
476 sub _FetchFirst {
477   my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
478
479   my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
480
481   $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
482     $offset ? $lim->{selection_inner} : $lim->{selection_original},
483     $lim->{query_leftover},
484     $lim->{grpby_having},
485     $lim->{order_by_inner},
486     $rows + ($offset||0),
487   );
488
489   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
490     $lim->{selection_middle},
491     $sql,
492     $lim->{quoted_rs_alias},
493     $lim->{order_by_middle},
494     $rows,
495   ) if $offset;
496
497
498   $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
499     $lim->{selection_outer},
500     $sql,
501     $lim->{quoted_rs_alias},
502     $lim->{order_by_requested},
503   ) if $offset and (
504     $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
505   );
506
507   return $sql;
508 }
509
510 =head2 GenericSubQ
511
512  SELECT * FROM (
513   SELECT ...
514  )
515  WHERE (
516   SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
517  ) BETWEEN $offset AND ($offset+$rows-1)
518
519 This is the most evil limit "dialect" (more of a hack) for I<really> stupid
520 databases. It works by ordering the set by some unique column, and calculating
521 the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
522 index). Of course this implies the set can only be ordered by a single unique
523 column.
524
525 Also note that this technique can be and often is B<excruciatingly slow>. You
526 may have much better luck using L<DBIx::Class::ResultSet/software_limit>
527 instead.
528
529 Currently used by B<Sybase ASE>, due to lack of any other option.
530
531 =cut
532 sub _GenericSubQ {
533   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
534
535   my $main_rsrc = $rs_attrs->{result_source};
536
537   # Explicitly require an order_by
538   # GenSubQ is slow enough as it is, just emulating things
539   # like in other cases is not wise - make the user work
540   # to shoot their DBA in the foot
541   $self->throw_exception (
542     'Generic Subquery Limit does not work on resultsets without an order. Provide a stable, '
543   . 'main-table-based order criteria.'
544   ) unless $rs_attrs->{order_by};
545
546   my $usable_order_colinfo = $main_rsrc->storage->_extract_colinfo_of_stable_main_source_order_by_portion(
547     $rs_attrs
548   );
549
550   $self->throw_exception(
551     'Generic Subquery Limit can not work with order criteria based on sources other than the main one'
552   ) if (
553     ! keys %{$usable_order_colinfo||{}}
554       or
555     grep
556       { $_->{-source_alias} ne $rs_attrs->{alias} }
557       (values %$usable_order_colinfo)
558   );
559
560 ###
561 ###
562 ### we need to know the directions after we figured out the above - reextract *again*
563 ### this is eyebleed - trying to get it to work at first
564   my $supplied_order = delete $rs_attrs->{order_by};
565
566   my @order_bits = do {
567     local $self->{quote_char};
568     local $self->{order_bind};
569     map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($supplied_order)
570   };
571
572   # truncate to what we'll use
573   $#order_bits = ( (keys %$usable_order_colinfo) - 1 );
574
575   # @order_bits likely will come back quoted (due to how the prefetch
576   # rewriter operates
577   # Hence supplement the column_info lookup table with quoted versions
578   if ($self->quote_char) {
579     $usable_order_colinfo->{$self->_quote($_)} = $usable_order_colinfo->{$_}
580       for keys %$usable_order_colinfo;
581   }
582
583 # calculate the condition
584   my $count_tbl_alias = 'rownum__emulation';
585   my $main_alias = $rs_attrs->{alias};
586   my $main_tbl_name = $main_rsrc->name;
587
588   my (@unqualified_names, @qualified_names, @is_desc, @new_order_by);
589
590   for my $bit (@order_bits) {
591
592     ($bit, my $is_desc) = $self->_split_order_chunk($bit);
593
594     push @is_desc, $is_desc;
595     push @unqualified_names, $usable_order_colinfo->{$bit}{-colname};
596     push @qualified_names, $usable_order_colinfo->{$bit}{-fq_colname};
597
598     push @new_order_by, { ($is_desc ? '-desc' : '-asc') => $usable_order_colinfo->{$bit}{-fq_colname} };
599   };
600
601   my (@where_cond, @skip_colpair_stack);
602   for my $i (0 .. $#order_bits) {
603     my $ci = $usable_order_colinfo->{$order_bits[$i]};
604
605     my ($subq_col, $main_col) = map { "$_.$ci->{-colname}" } ($count_tbl_alias, $main_alias);
606     my $cur_cond = { $subq_col => { ($is_desc[$i] ? '>' : '<') => { -ident => $main_col } } };
607
608     push @skip_colpair_stack, [
609       { $main_col => { -ident => $subq_col } },
610     ];
611
612     # we can trust the nullability flag because
613     # we already used it during _id_col_set resolution
614     #
615     if ($ci->{is_nullable}) {
616       push @{$skip_colpair_stack[-1]}, { $main_col => undef, $subq_col=> undef };
617
618       $cur_cond = [
619         {
620           ($is_desc[$i] ? $subq_col : $main_col) => { '!=', undef },
621           ($is_desc[$i] ? $main_col : $subq_col) => undef,
622         },
623         {
624           $subq_col => { '!=', undef },
625           $main_col => { '!=', undef },
626           -and => $cur_cond,
627         },
628       ];
629     }
630
631     push @where_cond, { '-and', => [ @skip_colpair_stack[0..$i-1], $cur_cond ] };
632   }
633
634 # reuse the sqlmaker WHERE, this will not be returning binds
635   my $counted_where = do {
636     local $self->{where_bind};
637     $self->where(\@where_cond);
638   };
639
640 # construct the rownum condition by hand
641   my $rownum_cond;
642   if ($offset) {
643     $rownum_cond = 'BETWEEN ? AND ?';
644     push @{$self->{limit_bind}},
645       [ $self->__offset_bindtype => $offset ],
646       [ $self->__total_bindtype => $offset + $rows - 1]
647     ;
648   }
649   else {
650     $rownum_cond = '< ?';
651     push @{$self->{limit_bind}},
652       [ $self->__rows_bindtype => $rows ]
653     ;
654   }
655
656 # and what we will order by inside
657   my $inner_order_sql = do {
658     local $self->{order_bind};
659
660     my $s = $self->_order_by (\@new_order_by);
661
662     $self->throw_exception('Inner gensubq order may not contain binds... something went wrong')
663       if @{$self->{order_bind}};
664
665     $s;
666   };
667
668 ### resume originally scheduled programming
669 ###
670 ###
671
672   # we need to supply the order for the supplements to be properly calculated
673   my $sq_attrs = $self->_subqueried_limit_attrs (
674     $sql, { %$rs_attrs, order_by => \@new_order_by }
675   );
676
677   my $in_sel = $sq_attrs->{selection_inner};
678
679   # add the order supplement (if any) as this is what will be used for the outer WHERE
680   $in_sel .= ", $_" for sort keys %{$sq_attrs->{order_supplement}};
681
682   my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
683
684
685   return sprintf ("
686 SELECT $sq_attrs->{selection_outer}
687   FROM (
688     SELECT $in_sel $sq_attrs->{query_leftover}${group_having_sql}
689   ) %s
690 WHERE ( SELECT COUNT(*) FROM %s %s $counted_where ) $rownum_cond
691 $inner_order_sql
692   ", map { $self->_quote ($_) } (
693     $rs_attrs->{alias},
694     $main_tbl_name,
695     $count_tbl_alias,
696   ));
697 }
698
699
700 # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
701 #
702 # Generates inner/outer select lists for various limit dialects
703 # which result in one or more subqueries (e.g. RNO, Top, RowNum)
704 # Any non-main-table columns need to have their table qualifier
705 # turned into a column alias (otherwise names in subqueries clash
706 # and/or lose their source table)
707 #
708 # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
709 # with aliases (to be used in whatever select statement), and an alias
710 # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
711 # for string-subst higher up).
712 # If an order_by is supplied, the inner select needs to bring out columns
713 # used in implicit (non-selected) orders, and the order condition itself
714 # needs to be realiased to the proper names in the outer query. Thus we
715 # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
716 # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
717 # exist in the original select list
718 sub _subqueried_limit_attrs {
719   my ($self, $proto_sql, $rs_attrs) = @_;
720
721   $self->throw_exception(
722     'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
723   ) unless ref ($rs_attrs) eq 'HASH';
724
725   # mangle the input sql as we will be replacing the selector entirely
726   unless (
727     $rs_attrs->{_selector_sql}
728       and
729     $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
730   ) {
731     $self->throw_exception("Unrecognizable SELECT: $proto_sql");
732   }
733
734   my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
735
736   # correlate select and as, build selection index
737   my (@sel, $in_sel_index);
738   for my $i (0 .. $#{$rs_attrs->{select}}) {
739
740     my $s = $rs_attrs->{select}[$i];
741     my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
742
743     # we throw away the @bind here deliberately
744     my ($sql_sel) = $self->_recurse_fields ($s);
745
746     push @sel, {
747       arg => $s,
748       sql => $sql_sel,
749       unquoted_sql => do {
750         local $self->{quote_char};
751         ($self->_recurse_fields ($s))[0]; # ignore binds again
752       },
753       as =>
754         $sql_alias
755           ||
756         $rs_attrs->{as}[$i]
757           ||
758         $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
759       ,
760     };
761
762     # anything with a placeholder in it needs re-selection
763     $in_sel_index->{$sql_sel}++ unless $sql_sel =~ / (?: ^ | \W ) \? (?: \W | $ ) /x;
764
765     $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
766
767     # record unqualified versions too, so we do not have
768     # to reselect the same column twice (in qualified and
769     # unqualified form)
770     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
771       $in_sel_index->{$1}++;
772     }
773   }
774
775
776   # re-alias and remove any name separators from aliases,
777   # unless we are dealing with the current source alias
778   # (which will transcend the subqueries as it is necessary
779   # for possible further chaining)
780   # same for anything we do not recognize
781   my ($sel, $renamed);
782   for my $node (@sel) {
783     push @{$sel->{original}}, $node->{sql};
784
785     if (
786       ! $in_sel_index->{$node->{sql}}
787         or
788       $node->{as} =~ / (?<! ^ $re_alias ) \. /x
789         or
790       $node->{unquoted_sql} =~ / (?<! ^ $re_alias ) $re_sep /x
791     ) {
792       $node->{as} = $self->_unqualify_colname($node->{as});
793       my $quoted_as = $self->_quote($node->{as});
794       push @{$sel->{inner}}, sprintf '%s AS %s', $node->{sql}, $quoted_as;
795       push @{$sel->{outer}}, $quoted_as;
796       $renamed->{$node->{sql}} = $quoted_as;
797     }
798     else {
799       push @{$sel->{inner}}, $node->{sql};
800       push @{$sel->{outer}}, $self->_quote (ref $node->{arg} ? $node->{as} : $node->{arg});
801     }
802   }
803
804   # see if the order gives us anything
805   my $extra_order_sel;
806   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
807     # order with bind
808     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
809     ($chunk) = $self->_split_order_chunk($chunk);
810
811     next if $in_sel_index->{$chunk};
812
813     $extra_order_sel->{$chunk} ||= $self->_quote (
814       'ORDER__BY__' . sprintf '%03d', scalar keys %{$extra_order_sel||{}}
815     );
816   }
817
818   return {
819     query_leftover => $proto_sql,
820     (map {( "selection_$_" => join (', ', @{$sel->{$_}} ) )} keys %$sel ),
821     outer_renames => $renamed,
822     order_supplement => $extra_order_sel,
823   };
824 }
825
826 sub _unqualify_colname {
827   my ($self, $fqcn) = @_;
828   $fqcn =~ s/ \. /__/xg;
829   return $fqcn;
830 }
831
832 =head1 FURTHER QUESTIONS?
833
834 Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
835
836 =head1 COPYRIGHT AND LICENSE
837
838 This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
839 by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
840 redistribute it and/or modify it under the same terms as the
841 L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.
842
843 =cut
844
845 1;