e538843c1aa00c14b2f78f86e5622dd47fd099d7
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / Util.pm
1 package   #hide from PAUSE
2   DBIx::Class::SQLMaker::Util;
3
4 use strict;
5 use warnings;
6
7 use base 'Exporter';
8 our @EXPORT_OK = qw(
9   normalize_sqla_condition
10   extract_equality_conditions
11 );
12
13 use DBIx::Class::Carp;
14 use Carp 'croak';
15 use SQL::Abstract qw( is_literal_value is_plain_value );
16 use DBIx::Class::_Util qw( UNRESOLVABLE_CONDITION serialize dump_value );
17
18
19 # Attempts to flatten a passed in SQLA condition as much as possible towards
20 # a plain hashref, *without* altering its semantics.
21 #
22 # FIXME - while relatively robust, this is still imperfect, one of the first
23 # things to tackle when we get access to a formalized AST. Note that this code
24 # is covered by a *ridiculous* amount of tests, so starting with porting this
25 # code would be a rather good exercise
26 sub normalize_sqla_condition {
27   my ($where, $where_is_anded_array) = @_;
28
29   my $fin;
30
31   if (! $where) {
32     return;
33   }
34   elsif ($where_is_anded_array or ref $where eq 'HASH') {
35
36     my @pairs;
37
38     my @pieces = $where_is_anded_array ? @$where : $where;
39     while (@pieces) {
40       my $chunk = shift @pieces;
41
42       if (ref $chunk eq 'HASH') {
43         for (sort keys %$chunk) {
44
45           # Match SQLA 1.79 behavior
46           unless( length $_ ) {
47             is_literal_value($chunk->{$_})
48               ? carp 'Hash-pairs consisting of an empty string with a literal are deprecated, use -and => [ $literal ] instead'
49               : croak 'Supplying an empty left hand side argument is not supported in hash-pairs'
50             ;
51           }
52
53           push @pairs, $_ => $chunk->{$_};
54         }
55       }
56       elsif (ref $chunk eq 'ARRAY') {
57         push @pairs, -or => $chunk
58           if @$chunk;
59       }
60       elsif ( ! length ref $chunk) {
61
62         # Match SQLA 1.79 behavior
63         croak("Supplying an empty left hand side argument is not supported in array-pairs")
64           if $where_is_anded_array and (! defined $chunk or ! length $chunk);
65
66         push @pairs, $chunk, shift @pieces;
67       }
68       else {
69         push @pairs, '', $chunk;
70       }
71     }
72
73     return unless @pairs;
74
75     my @conds = _normalize_cond_unroll_pairs(\@pairs)
76       or return;
77
78     # Consolidate various @conds back into something more compact
79     for my $c (@conds) {
80       if (ref $c ne 'HASH') {
81         push @{$fin->{-and}}, $c;
82       }
83       else {
84         for my $col (sort keys %$c) {
85
86           # consolidate all -and nodes
87           if ($col =~ /^\-and$/i) {
88             push @{$fin->{-and}},
89               ref $c->{$col} eq 'ARRAY' ? @{$c->{$col}}
90             : ref $c->{$col} eq 'HASH' ? %{$c->{$col}}
91             : { $col => $c->{$col} }
92             ;
93           }
94           elsif ($col =~ /^\-/) {
95             push @{$fin->{-and}}, { $col => $c->{$col} };
96           }
97           elsif (exists $fin->{$col}) {
98             $fin->{$col} = [ -and => map {
99               (ref $_ eq 'ARRAY' and ($_->[0]||'') =~ /^\-and$/i )
100                 ? @{$_}[1..$#$_]
101                 : $_
102               ;
103             } ($fin->{$col}, $c->{$col}) ];
104           }
105           else {
106             $fin->{$col} = $c->{$col};
107           }
108         }
109       }
110     }
111   }
112   elsif (ref $where eq 'ARRAY') {
113     # we are always at top-level here, it is safe to dump empty *standalone* pieces
114     my $fin_idx;
115
116     for (my $i = 0; $i <= $#$where; $i++ ) {
117
118       # Match SQLA 1.79 behavior
119       croak(
120         "Supplying an empty left hand side argument is not supported in array-pairs"
121       ) if (! defined $where->[$i] or ! length $where->[$i]);
122
123       my $logic_mod = lc ( ($where->[$i] =~ /^(\-(?:and|or))$/i)[0] || '' );
124
125       if ($logic_mod) {
126         $i++;
127         croak("Unsupported top-level op/arg pair: [ $logic_mod => $where->[$i] ]")
128           unless ref $where->[$i] eq 'HASH' or ref $where->[$i] eq 'ARRAY';
129
130         my $sub_elt = normalize_sqla_condition({ $logic_mod => $where->[$i] })
131           or next;
132
133         my @keys = keys %$sub_elt;
134         if ( @keys == 1 and $keys[0] !~ /^\-/ ) {
135           $fin_idx->{ "COL_$keys[0]_" . serialize $sub_elt } = $sub_elt;
136         }
137         else {
138           $fin_idx->{ "SER_" . serialize $sub_elt } = $sub_elt;
139         }
140       }
141       elsif (! length ref $where->[$i] ) {
142         my $sub_elt = normalize_sqla_condition({ @{$where}[$i, $i+1] })
143           or next;
144
145         $fin_idx->{ "COL_$where->[$i]_" . serialize $sub_elt } = $sub_elt;
146         $i++;
147       }
148       else {
149         $fin_idx->{ "SER_" . serialize $where->[$i] } = normalize_sqla_condition( $where->[$i] ) || next;
150       }
151     }
152
153     if (! $fin_idx) {
154       return;
155     }
156     elsif ( keys %$fin_idx == 1 ) {
157       $fin = (values %$fin_idx)[0];
158     }
159     else {
160       my @or;
161
162       # at this point everything is at most one level deep - unroll if needed
163       for (sort keys %$fin_idx) {
164         if ( ref $fin_idx->{$_} eq 'HASH' and keys %{$fin_idx->{$_}} == 1 ) {
165           my ($l, $r) = %{$fin_idx->{$_}};
166
167           if (
168             ref $r eq 'ARRAY'
169               and
170             (
171               ( @$r == 1 and $l =~ /^\-and$/i )
172                 or
173               $l =~ /^\-or$/i
174             )
175           ) {
176             push @or, @$r
177           }
178
179           elsif (
180             ref $r eq 'HASH'
181               and
182             keys %$r == 1
183               and
184             $l =~ /^\-(?:and|or)$/i
185           ) {
186             push @or, %$r;
187           }
188
189           else {
190             push @or, $l, $r;
191           }
192         }
193         else {
194           push @or, $fin_idx->{$_};
195         }
196       }
197
198       $fin->{-or} = \@or;
199     }
200   }
201   else {
202     # not a hash not an array
203     $fin = { -and => [ $where ] };
204   }
205
206   # unroll single-element -and's
207   while (
208     $fin->{-and}
209       and
210     @{$fin->{-and}} < 2
211   ) {
212     my $and = delete $fin->{-and};
213     last if @$and == 0;
214
215     # at this point we have @$and == 1
216     if (
217       ref $and->[0] eq 'HASH'
218         and
219       ! grep { exists $fin->{$_} } keys %{$and->[0]}
220     ) {
221       $fin = {
222         %$fin, %{$and->[0]}
223       };
224     }
225     else {
226       $fin->{-and} = $and;
227       last;
228     }
229   }
230
231   # compress same-column conds found in $fin
232   for my $col ( grep { $_ !~ /^\-/ } keys %$fin ) {
233     next unless ref $fin->{$col} eq 'ARRAY' and ($fin->{$col}[0]||'') =~ /^\-and$/i;
234     my $val_bag = { map {
235       (! defined $_ )                          ? ( UNDEF => undef )
236     : ( ! length ref $_ or is_plain_value $_ ) ? ( "VAL_$_" => $_ )
237     : ( ( 'SER_' . serialize $_ ) => $_ )
238     } @{$fin->{$col}}[1 .. $#{$fin->{$col}}] };
239
240     if (keys %$val_bag == 1 ) {
241       ($fin->{$col}) = values %$val_bag;
242     }
243     else {
244       $fin->{$col} = [ -and => map { $val_bag->{$_} } sort keys %$val_bag ];
245     }
246   }
247
248   return keys %$fin ? $fin : ();
249 }
250
251 sub _normalize_cond_unroll_pairs {
252   my $pairs = shift;
253
254   my @conds;
255
256   while (@$pairs) {
257     my ($lhs, $rhs) = splice @$pairs, 0, 2;
258
259     if (! length $lhs) {
260       push @conds, normalize_sqla_condition($rhs);
261     }
262     elsif ( $lhs =~ /^\-and$/i ) {
263       push @conds, normalize_sqla_condition($rhs, (ref $rhs eq 'ARRAY'));
264     }
265     elsif ( $lhs =~ /^\-or$/i ) {
266       push @conds, normalize_sqla_condition(
267         (ref $rhs eq 'HASH') ? [ map { $_ => $rhs->{$_} } sort keys %$rhs ] : $rhs
268       );
269     }
270     else {
271       if (ref $rhs eq 'HASH' and ! keys %$rhs) {
272         # FIXME - SQLA seems to be doing... nothing...?
273       }
274       # normalize top level -ident, for saner extract_fixed_condition_columns code
275       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{-ident}) {
276         push @conds, { $lhs => { '=', $rhs } };
277       }
278       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{-value} and is_plain_value $rhs->{-value}) {
279         push @conds, { $lhs => $rhs->{-value} };
280       }
281       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{'='}) {
282         if ( length ref $rhs->{'='} and is_literal_value $rhs->{'='} ) {
283           push @conds, { $lhs => $rhs };
284         }
285         else {
286           for my $p (_normalize_cond_unroll_pairs([ $lhs => $rhs->{'='} ])) {
287
288             # extra sanity check
289             if (keys %$p > 1) {
290               local $Data::Dumper::Deepcopy = 1;
291               croak(
292                 "Internal error: unexpected collapse unroll:"
293               . dump_value { in => { $lhs => $rhs }, out => $p }
294               );
295             }
296
297             my ($l, $r) = %$p;
298
299             push @conds, (
300               ! length ref $r
301                 or
302               # the unroller recursion may return a '=' prepended value already
303               ref $r eq 'HASH' and keys %$rhs == 1 and exists $rhs->{'='}
304                 or
305               is_plain_value($r)
306             )
307               ? { $l => $r }
308               : { $l => { '=' => $r } }
309             ;
310           }
311         }
312       }
313       elsif (ref $rhs eq 'ARRAY') {
314         # some of these conditionals encounter multi-values - roll them out using
315         # an unshift, which will cause extra looping in the while{} above
316         if (! @$rhs ) {
317           push @conds, { $lhs => [] };
318         }
319         elsif ( ($rhs->[0]||'') =~ /^\-(?:and|or)$/i ) {
320           croak("Value modifier not followed by any values: $lhs => [ $rhs->[0] ] ")
321             if @$rhs == 1;
322
323           if( $rhs->[0] =~ /^\-and$/i ) {
324             unshift @$pairs, map { $lhs => $_ } @{$rhs}[1..$#$rhs];
325           }
326           # if not an AND then it's an OR
327           elsif(@$rhs == 2) {
328             unshift @$pairs, $lhs => $rhs->[1];
329           }
330           else {
331             push @conds, { $lhs => [ @{$rhs}[1..$#$rhs] ] };
332           }
333         }
334         elsif (@$rhs == 1) {
335           unshift @$pairs, $lhs => $rhs->[0];
336         }
337         else {
338           push @conds, { $lhs => $rhs };
339         }
340       }
341       # unroll func + { -value => ... }
342       elsif (
343         ref $rhs eq 'HASH'
344           and
345         ( my ($subop) = keys %$rhs ) == 1
346           and
347         length ref ((values %$rhs)[0])
348           and
349         my $vref = is_plain_value( (values %$rhs)[0] )
350       ) {
351         push @conds, (
352           (length ref $$vref)
353             ? { $lhs => $rhs }
354             : { $lhs => { $subop => $$vref } }
355         );
356       }
357       else {
358         push @conds, { $lhs => $rhs };
359       }
360     }
361   }
362
363   return @conds;
364 }
365
366 # Analyzes a given condition and attempts to extract all columns
367 # with a definitive fixed-condition criteria. Returns a hashref
368 # of k/v pairs suitable to be passed to set_columns(), with a
369 # MAJOR CAVEAT - multi-value (contradictory) equalities are still
370 # represented as a reference to the UNRESOVABLE_CONDITION constant
371 # The reason we do this is that some codepaths only care about the
372 # codition being stable, as opposed to actually making sense
373 #
374 # The normal mode is used to figure out if a resultset is constrained
375 # to a column which is part of a unique constraint, which in turn
376 # allows us to better predict how ordering will behave etc.
377 #
378 # With the optional "consider_nulls" boolean argument, the function
379 # is instead used to infer inambiguous values from conditions
380 # (e.g. the inheritance of resultset conditions on new_result)
381 #
382 sub extract_equality_conditions {
383   my ($where, $consider_nulls) = @_;
384   my $where_hash = normalize_sqla_condition($where);
385
386   my $res = {};
387   my ($c, $v);
388   for $c (keys %$where_hash) {
389     my $vals;
390
391     if (!defined ($v = $where_hash->{$c}) ) {
392       $vals->{UNDEF} = $v if $consider_nulls
393     }
394     elsif (
395       ref $v eq 'HASH'
396         and
397       keys %$v == 1
398     ) {
399       if (exists $v->{-value}) {
400         if (defined $v->{-value}) {
401           $vals->{"VAL_$v->{-value}"} = $v->{-value}
402         }
403         elsif( $consider_nulls ) {
404           $vals->{UNDEF} = $v->{-value};
405         }
406       }
407       # do not need to check for plain values - normalize_sqla_condition did it for us
408       elsif(
409         length ref $v->{'='}
410           and
411         (
412           ( ref $v->{'='} eq 'HASH' and keys %{$v->{'='}} == 1 and exists $v->{'='}{-ident} )
413             or
414           is_literal_value($v->{'='})
415         )
416        ) {
417         $vals->{ 'SER_' . serialize $v->{'='} } = $v->{'='};
418       }
419     }
420     elsif (
421       ! length ref $v
422         or
423       is_plain_value ($v)
424     ) {
425       $vals->{"VAL_$v"} = $v;
426     }
427     elsif (ref $v eq 'ARRAY' and ($v->[0]||'') eq '-and') {
428       for ( @{$v}[1..$#$v] ) {
429         my $subval = extract_equality_conditions({ $c => $_ }, 'consider nulls');  # always fish nulls out on recursion
430         next unless exists $subval->{$c};  # didn't find anything
431         $vals->{
432           ! defined $subval->{$c}                                        ? 'UNDEF'
433         : ( ! length ref $subval->{$c} or is_plain_value $subval->{$c} ) ? "VAL_$subval->{$c}"
434         : ( 'SER_' . serialize $subval->{$c} )
435         } = $subval->{$c};
436       }
437     }
438
439     if (keys %$vals == 1) {
440       ($res->{$c}) = (values %$vals)
441         unless !$consider_nulls and exists $vals->{UNDEF};
442     }
443     elsif (keys %$vals > 1) {
444       $res->{$c} = UNRESOLVABLE_CONDITION;
445     }
446   }
447
448   $res;
449 }
450
451 1;