fix condition extraction for new_result
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / Converter.pm
CommitLineData
10cef607 1package DBIx::Class::SQLMaker::Converter;
2
d94b8193 3use Data::Query::Constants;
4use Data::Query::ExprHelpers;
10cef607 5use Moo;
6use namespace::clean;
7
8extends 'SQL::Abstract::Converter';
9
# Name of the limit dialect in use. It is compared against
# 'GenericSubquery' when building slices and quoted in error messages.
has limit_dialect => (
  is       => 'ro',
  required => 1,
);

# Separator between an alias and a column name in a qualified identifier.
has name_sep => (
  is       => 'ro',
  required => 1,
);

# Hashref looked up with the keys 'limit' / 'offset'. A true value asks
# _select_attrs to check order stability; 'requires' makes it mandatory.
has slice_stability => (
  is       => 'ro',
  required => 1,
);

# Hashref looked up with the keys 'limit' / 'offset'. A true value makes
# _select_attrs give the selected fields explicit aliases.
has slice_subquery => (
  is       => 'ro',
  required => 1,
);
14
# Largest limit value emitted when only an offset was supplied
# (2**31 - 1, i.e. 0x7FFFFFFF).
sub __max_int () {
  2_147_483_647
}
16
# Handle limit-dialect selection.
#
# Validates the supplied limit/offset, merges them into a copy of the
# resultset attributes and prepares the select for slicing:
#
#  * If the slice_stability entry for this kind of slice ('offset' when an
#    offset is in effect, 'limit' otherwise) is true, the storage is asked
#    whether the current order is stable and the answer is recorded in
#    order_is_stable / preserve_order. When the order is not stable and the
#    entry is 'requires', an explicit order_by is fatal, otherwise the
#    source's identifying columns are used as the order.
#  * If the slice_subquery entry for this kind of slice is true, every field
#    that is not a plain string starting with "<alias><name_sep>" is turned
#    into a hashref carrying an -as alias in which name_sep is replaced
#    by '__'.
#
# Arguments: $table, $fields (arrayref), $where, $rs_attrs (hashref or
#            undef), $limit, $offset
# Returns:   ($fields, \%final_attrs) - $fields is a new arrayref when field
#            aliasing took place; the caller's field hashrefs are never
#            modified.
# Dies:      on a malformed limit or offset, or when a stable order is
#            required but cannot be established.
sub _select_attrs {
  my ($self, $table, $fields, $where, $rs_attrs, $limit, $offset) = @_;

  if (defined $offset) {
    die('A supplied offset must be a non-negative integer')
      if ( $offset =~ /\D/ or $offset < 0 );
  }
  $offset ||= 0;

  if (defined $limit) {
    die('A supplied limit must be a positive integer')
      if ( $limit =~ /\D/ or $limit <= 0 );
  }
  elsif ($offset) {
    # an offset without a limit still needs a limit to slice with
    $limit = $self->__max_int;
  }

  my %final_attrs = (%{$rs_attrs||{}}, limit => $limit, offset => $offset);

  if ($limit or $offset) {
    my $slice_kind = $offset ? 'offset' : 'limit';

    my %slice_stability = %{$self->slice_stability};

    if (my $stability = $slice_stability{$slice_kind}) {
      my $source = $rs_attrs->{_rsroot_rsrc};
      unless (
        $final_attrs{order_is_stable}
          = $final_attrs{preserve_order}
            = $source->schema->storage
                     ->_order_by_is_stable(
                         @final_attrs{qw(from order_by where)}
                       )
      ) {
        if ($stability eq 'requires') {
          # the user asked for a specific order that is not stable: give up
          if ($self->_order_by_to_dq($final_attrs{order_by})) {
            die(
              $self->limit_dialect.' limit/offset implementation requires a stable order for '.($offset ? 'offset' : 'limit')
            );
          }
          # no order requested: order by the identifying columns instead
          if (my $ident_cols = $source->_identifying_column_set) {
            $final_attrs{order_by} = [
              map "$final_attrs{alias}.$_", @$ident_cols
            ];
            $final_attrs{order_is_stable} = 1;
          } else {
            # The message is kept on one line: the literal used to contain
            # an embedded newline plus source indentation.
            die(sprintf(
              'Unable to auto-construct stable order criteria for "skimming type" limit '
              . "dialect based on source '%s'", $source->name) );
          }
        }
      }
    }

    my %slice_subquery = %{$self->slice_subquery};

    if ($slice_subquery{$slice_kind}) {
      my $sep   = $self->name_sep;
      my $alias = $final_attrs{alias};

      $fields = [ map {
        my $f = $fields->[$_];
        if (ref $f) {
          # Work on a shallow copy so that adding / rewriting -as does not
          # leak into the caller's select specification.
          $f = ref($f) eq 'HASH' ? { %$f } : { '' => $f };
          ($f->{-as} ||= $final_attrs{as}[$_]) =~ s/\Q$sep\E/__/g;
        } elsif ($f !~ /^\Q$alias$sep\E/) {
          $f = { '' => $f };
          ($f->{-as} ||= $final_attrs{as}[$_]) =~ s/\Q$sep\E/__/g;
        }
        $f;
      } 0 .. $#$fields ];
    }
  }

  return ($fields, \%final_attrs);
}
92
# Wraps the inherited select conversion: the select attributes are first
# run through _select_attrs, and when a limit is in effect the resulting
# query is wrapped in a DQ_SLICE node carrying limit, optional offset and
# the order_is_stable / preserve_order flags.
around _select_to_dq => sub {
  my $orig = shift;
  my $self = shift;
  my ($table, undef, $where) = @_;

  my ($fields, $attrs) = $self->_select_attrs(@_);
  my $select_dq = $self->$orig(
    $table, $fields, $where, $attrs->{order_by}, $attrs
  );

  # nothing to slice
  return $select_dq unless $attrs->{limit};

  if ($self->limit_dialect eq 'GenericSubquery') {
    my $col_info = $attrs->{_rsroot_rsrc}->columns_info;

    # Mark ORDER nodes on non-nullable columns of the root source
    # (unqualified, or qualified with the current alias) as having no nulls.
    scan_dq_nodes({
      DQ_ORDER() => sub {
        my ($order_node) = @_;
        my $by = $order_node->{by};
        if (is_Identifier($by)) {
          my $elements = $by->{elements};
          if (
            @$elements == 1
            or (@$elements == 2 and $elements->[0] eq $attrs->{alias})
          ) {
            my $this_col = $col_info->{ $elements->[-1] };
            if ($this_col and not $this_col->{is_nullable}) {
              $order_node->{nulls} = 'none';
            }
          }
        }
      },
    }, $select_dq);
  }

  # limit and offset are converted as values with integer column metadata
  my $integer_value_dq = sub {
    local $SQL::Abstract::Converter::Cur_Col_Meta
      = { sqlt_datatype => 'integer' };
    $self->_value_to_dq($_[0]);
  };

  return +{
    type  => DQ_SLICE,
    from  => $select_dq,
    limit => $integer_value_dq->($attrs->{limit}),
    ( $attrs->{offset}
        ? (offset => $integer_value_dq->($attrs->{offset}))
        : () ),
    ( map { $attrs->{$_} ? ($_ => 1) : () }
        qw(order_is_stable preserve_order) ),
  };
};
142
# Adds support for hashref select fields of the form
# { $function => $args, -as => $alias }. Anything that is not a plain
# hashref is passed to the inherited implementation unchanged.
around _select_field_to_dq => sub {
  my $orig = shift;
  my $self = shift;
  my ($field) = @_;

  return $self->$orig(@_) unless ref($field) eq 'HASH';

  # shallow copy, so that removing -as does not touch the caller's hash
  my %spec  = %$field;
  my $alias = delete $spec{-as};    # if supplied

  # after removing -as exactly one function/args pair should remain
  my ($func, $args, @extra) = %spec;

  die( "Malformed select argument - too many keys in hash: " . join (',', keys %$field ) )
    if @extra;

  if (lc ($func) eq 'distinct' && ref $args eq 'ARRAY' && @$args > 1) {
    my $columns = join ' ', @$args;
    die(
      'The select => { distinct => ... } syntax is not supported for multiple columns.'
      . ' Instead please use { group_by => [ qw/' . $columns . '/ ] }'
      . ' or { select => [ qw/' . $columns . '/ ], distinct => 1 }'
    );
  }

  # a true function name becomes an "apply" operation on the upper-cased
  # name; otherwise the value itself is converted as a select field
  my $field_dq = $func
    ? $self->_op_to_dq(
        apply => $self->_ident_to_dq(uc($func)),
        @{$self->_select_field_list_to_dq($args)},
      )
    : $self->_select_field_to_dq($args);

  return $field_dq unless $alias;

  return +{
    type => DQ_ALIAS,
    from => $field_dq,
    to   => $alias,
  };
};
189
# Layers GROUP BY / HAVING on top of the inherited source conversion.
around _source_to_dq => sub {
  my $orig = shift;
  my $self = shift;
  my $attrs = $_[4];    # arguments are: table, fields, where, order, attrs

  my $source_dq = $self->$orig(@_);

  my $group_by = $attrs->{group_by};
  my $having   = $attrs->{having};

  return $source_dq unless $group_by or $having;

  # A HAVING without a GROUP BY still gets a (then empty) DQ_GROUP node:
  # that is what lets DQ recognise the condition as a HAVING rather than
  # a plain WHERE.
  my $grouped_dq = $self->_group_by_to_dq($group_by || [], $source_dq);

  return $grouped_dq unless $having;

  return +{
    type  => DQ_WHERE,
    from  => $grouped_dq,
    where => $self->_where_to_dq($having),
  };
};
206
# Builds a DQ_GROUP node that groups $from by the field list $group.
sub _group_by_to_dq {
  my ($self, $group, $from) = @_;

  return +{
    type => DQ_GROUP,
    by   => $self->_select_field_list_to_dq($group),
    from => $from,
  };
}
215
# Extends the inherited table conversion with two from-spec shapes:
#  * an arrayref is treated as a join specification (see _join_to_dq);
#  * a hashref is treated as a single { $alias => $table } pair, optionally
#    accompanied by dash-prefixed metadata keys.
# Everything else goes to the inherited implementation.
around _table_to_dq => sub {
  my $orig = shift;
  my $self = shift;
  my ($spec) = @_;

  my $kind = ref $spec;

  return $self->_join_to_dq(@$spec) if $kind eq 'ARRAY';
  return $self->$orig(@_)           unless $kind eq 'HASH';

  # keys that do not start with a dash are alias => table pairs
  my @pair_keys = grep { $_ !~ /^\-/ } keys %$spec;
  my ($as, $table, $toomuch) = map { ($_ => $spec->{$_}) } @pair_keys;

  die "Only one table/as pair expected in from-spec but an exra '$toomuch' key present"
    if defined $toomuch;

  return +{
    type => DQ_ALIAS,
    from => $self->_table_to_dq($table),
    to   => $as,
    # source metadata is attached only when a result source is supplied
    ( $spec->{-rsrc}
        ? (
            'dbix-class.source_name' => $spec->{-rsrc}->source_name,
            'dbix-class.join_path'   => $spec->{-join_path},
            'dbix-class.is_single'   => $spec->{-is_single},
          )
        : () ),
  };
};
248
# Converts a join specification - a starting table followed by
# [ $to, $on ] pairs - into a left-nested chain of DQ_JOIN nodes.
sub _join_to_dq {
  my ($self, $from, @joins) = @_;

  my $joined_dq = $self->_table_to_dq($from);

  # nothing to join, or the only extra element is a lone hashref
  return $joined_dq unless @joins;
  return $joined_dq if @joins == 1 and ref($joins[0]) eq 'HASH';

  for my $join (@joins) {
    my ($to, $on) = @$join;

    # a join type, when present, lives in the -join_type key of the
    # target (or of its first element when the target is an arrayref)
    my $type_holder = ref($to) eq 'ARRAY' ? $to->[0] : $to;

    my $outer;
    if (ref($type_holder) eq 'HASH' and defined($type_holder->{-join_type})) {
      $outer = lc($type_holder->{-join_type});
      $outer =~ s/^\s+ | \s+$//xg;
      # only types starting with left / right are kept, reduced to that word
      $outer = undef unless $outer =~ s/^(left|right).*/$1/;
    }

    $joined_dq = +{
      type  => DQ_JOIN,
      ( $outer ? (outer => $outer) : () ),
      left  => $joined_dq,
      right => $self->_table_to_dq($to),
      ( $on
          ? (on => $self->_expr_to_dq($self->_expand_join_condition($on)))
          : () ),
    };
  }

  return $joined_dq;
}
283
# Normalises a join condition.
#
# Backcompat for the old days when a plain hashref
# { 't1.col1' => 't2.col2' } meant ON t1.col1 = t2.col2: such a
# single-pair hash (dotted key, non-reference value) is rewritten to
# { 't1.col1' => { -ident => 't2.col2' } }. Arrayrefs are processed
# element by element; anything else is returned as is.
# Once things settle we should start warning here so that folks unroll
# their hacks.
sub _expand_join_condition {
  my ($self, $cond) = @_;

  my $kind = ref $cond;

  if ($kind eq 'ARRAY') {
    return [ map { $self->_expand_join_condition($_) } @$cond ];
  }

  if ($kind eq 'HASH' and keys %$cond == 1) {
    my ($lhs, $rhs) = %$cond;
    return +{ $lhs => { -ident => $rhs } }
      if $lhs =~ /\./ and ! ref $rhs;
  }

  return $cond;
}
308
# When bind_meta is enabled, every bind that is not already an arrayref is
# wrapped as [ {} => $value ] before being handed to the inherited
# implementation; otherwise the binds are passed through as they are.
around _bind_to_dq => sub {
  my $orig = shift;
  my $self = shift;

  my @binds = @_;
  if ($self->bind_meta) {
    @binds = map { ref($_) eq 'ARRAY' ? $_ : [ {} => $_ ] } @binds;
  }

  return $self->$orig(@binds);
};
320
3211;
322
323=head1 OPERATORS
324
325=head2 -ident
326
Used to explicitly specify an SQL identifier. Takes a plain string as its
value, which is then always treated as a column name (and is properly quoted
if quoting has been requested). Most useful for comparing two columns:
331
332 my %where = (
333 priority => { '<', 2 },
334 requestor => { -ident => 'submitter' }
335 );
336
337which results in:
338
339 $stmt = 'WHERE "priority" < ? AND "requestor" = "submitter"';
340 @bind = ('2');
341
342=head2 -value
343
344The -value operator signals that the argument to the right is a raw bind value.
345It will be passed straight to DBI, without invoking any of the SQL::Abstract
346condition-parsing logic. This allows you to, for example, pass an array as a
347column value for databases that support array datatypes, e.g.:
348
349 my %where = (
350 array => { -value => [1, 2, 3] }
351 );
352
353which results in:
354
355 $stmt = 'WHERE array = ?';
356 @bind = ([1, 2, 3]);
357
358=head1 AUTHORS
359
360See L<DBIx::Class/CONTRIBUTORS>.
361
362=head1 LICENSE
363
364You may distribute this code under the same terms as Perl itself.
365
366=cut