Add an explicit deduplication of identical condition in cond normalizer
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / SQLMaker / Util.pm
1 package   #hide from PAUSE
2   DBIx::Class::SQLMaker::Util;
3
4 use strict;
5 use warnings;
6
7 use base 'Exporter';
8 our @EXPORT_OK = qw(
9   normalize_sqla_condition
10   extract_equality_conditions
11 );
12
13 use DBIx::Class::Carp;
14 use Carp 'croak';
15 use SQL::Abstract qw( is_literal_value is_plain_value );
16 use DBIx::Class::_Util qw( UNRESOLVABLE_CONDITION dump_value modver_gt_or_eq );
17
18 # Can not use DBIx::Class::_Util::serialize as it is based on
19 # Storable and leaks through differences between PVIV and an identical IV
20 # Since SQLA itself is lossy in this regard (it does not make proper copies
21 # for efficiency) one could end up in a situation where semantically
22 # identical values aren't treated as such
23 my $dd_obj;
24 sub lax_serialize ($) {
25   my $dump_str = (
26     $dd_obj
27       ||=
28     do {
29       require Data::Dumper;
30
31       # Warnings without this on early loads under -w
32       # Why? Because fuck me, that's why :/
33       local $Data::Dumper::Indent = 0
34         unless defined $Data::Dumper::Indent;
35
36       # Make sure each option is spelled out with a value, so that
37       # global environment changes can not override any of these
38       # between two serialization calls
39       #
40       my $d = Data::Dumper->new([])
41         ->Indent('0')
42         ->Purity(0)
43         ->Pad('')
44         ->Useqq(0)
45         ->Terse(1)
46         ->Freezer('')
47         ->Toaster('')
48         ->Deepcopy(0)
49         ->Quotekeys(0)
50         ->Bless('bless')
51         ->Pair(' => ')
52         ->Maxdepth(0)
53         ->Useperl(0)
54         ->Sortkeys(1)
55         ->Deparse(0)
56       ;
57
58       # FIXME - this is kinda ridiculous - there ought to be a
59       # Data::Dumper->new_with_defaults or somesuch...
60       #
61       if( modver_gt_or_eq ( 'Data::Dumper', '2.136' ) ) {
62         $d->Sparseseen(1);
63
64         if( modver_gt_or_eq ( 'Data::Dumper', '2.153' ) ) {
65           $d->Maxrecurse(1000);
66
67           if( modver_gt_or_eq ( 'Data::Dumper', '2.160' ) ) {
68             $d->Trailingcomma(0);
69           }
70         }
71       }
72
73       $d;
74     }
75   )->Values([$_[0]])->Dump;
76
77   $dd_obj->Reset->Values([]);
78
79   $dump_str;
80 }
81
82
83 # Attempts to flatten a passed in SQLA condition as much as possible towards
84 # a plain hashref, *without* altering its semantics.
85 #
86 # FIXME - while relatively robust, this is still imperfect, one of the first
87 # things to tackle when we get access to a formalized AST. Note that this code
88 # is covered by a *ridiculous* amount of tests, so starting with porting this
89 # code would be a rather good exercise
90 sub normalize_sqla_condition {
91   my ($where, $where_is_anded_array) = @_;
92
93   my $fin;
94
95   if (! $where) {
96     return;
97   }
98   elsif ($where_is_anded_array or ref $where eq 'HASH') {
99
100     my @pairs;
101
102     my @pieces = $where_is_anded_array ? @$where : $where;
103     while (@pieces) {
104       my $chunk = shift @pieces;
105
106       if (ref $chunk eq 'HASH') {
107         for (sort keys %$chunk) {
108
109           # Match SQLA 1.79 behavior
110           unless( length $_ ) {
111             is_literal_value($chunk->{$_})
112               ? carp 'Hash-pairs consisting of an empty string with a literal are deprecated, use -and => [ $literal ] instead'
113               : croak 'Supplying an empty left hand side argument is not supported in hash-pairs'
114             ;
115           }
116
117           push @pairs, $_ => $chunk->{$_};
118         }
119       }
120       elsif (ref $chunk eq 'ARRAY') {
121         push @pairs, -or => $chunk
122           if @$chunk;
123       }
124       elsif ( ! length ref $chunk) {
125
126         # Match SQLA 1.79 behavior
127         croak("Supplying an empty left hand side argument is not supported in array-pairs")
128           if $where_is_anded_array and (! defined $chunk or ! length $chunk);
129
130         push @pairs, $chunk, shift @pieces;
131       }
132       else {
133         push @pairs, '', $chunk;
134       }
135     }
136
137     return unless @pairs;
138
139     my @conds = _normalize_cond_unroll_pairs(\@pairs)
140       or return;
141
142     # Consolidate various @conds back into something more compact
143     for my $c (@conds) {
144       if (ref $c ne 'HASH') {
145         push @{$fin->{-and}}, $c;
146       }
147       else {
148         for my $col (keys %$c) {
149
150           # consolidate all -and nodes
151           if ($col =~ /^\-and$/i) {
152             push @{$fin->{-and}},
153               ref $c->{$col} eq 'ARRAY' ? @{$c->{$col}}
154             : ref $c->{$col} eq 'HASH' ? %{$c->{$col}}
155             : { $col => $c->{$col} }
156             ;
157           }
158           elsif ($col =~ /^\-/) {
159             push @{$fin->{-and}}, { $col => $c->{$col} };
160           }
161           elsif (exists $fin->{$col}) {
162             $fin->{$col} = [ -and => map {
163               (ref $_ eq 'ARRAY' and ($_->[0]||'') =~ /^\-and$/i )
164                 ? @{$_}[1..$#$_]
165                 : $_
166               ;
167             } ($fin->{$col}, $c->{$col}) ];
168           }
169           else {
170             $fin->{$col} = $c->{$col};
171           }
172         }
173       }
174     }
175
176     # a deduplication (and sort) pass on all individual -and/-or members
177     for my $op (qw( -and -or )) {
178       if( @{ $fin->{$op} || [] } > 1 ) {
179         my $seen_chunks = { map {
180           lax_serialize($_) => $_
181         } @{$fin->{$op}} };
182
183         $fin->{$op} = [ @{$seen_chunks}{ sort keys %$seen_chunks } ];
184       }
185     }
186   }
187   elsif (ref $where eq 'ARRAY') {
188     # we are always at top-level here, it is safe to dump empty *standalone* pieces
189     my $fin_idx;
190
191     for (my $i = 0; $i <= $#$where; $i++ ) {
192
193       # Match SQLA 1.79 behavior
194       croak(
195         "Supplying an empty left hand side argument is not supported in array-pairs"
196       ) if (! defined $where->[$i] or ! length $where->[$i]);
197
198       my $logic_mod = lc ( ($where->[$i] =~ /^(\-(?:and|or))$/i)[0] || '' );
199
200       if ($logic_mod) {
201         $i++;
202         croak("Unsupported top-level op/arg pair: [ $logic_mod => $where->[$i] ]")
203           unless ref $where->[$i] eq 'HASH' or ref $where->[$i] eq 'ARRAY';
204
205         my $sub_elt = normalize_sqla_condition({ $logic_mod => $where->[$i] })
206           or next;
207
208         my @keys = keys %$sub_elt;
209         if ( @keys == 1 and $keys[0] !~ /^\-/ ) {
210           $fin_idx->{ "COL_$keys[0]_" . lax_serialize $sub_elt } = $sub_elt;
211         }
212         else {
213           $fin_idx->{ "SER_" . lax_serialize $sub_elt } = $sub_elt;
214         }
215       }
216       elsif (! length ref $where->[$i] ) {
217         my $sub_elt = normalize_sqla_condition({ @{$where}[$i, $i+1] })
218           or next;
219
220         $fin_idx->{ "COL_$where->[$i]_" . lax_serialize $sub_elt } = $sub_elt;
221         $i++;
222       }
223       else {
224         $fin_idx->{ "SER_" . lax_serialize $where->[$i] } = normalize_sqla_condition( $where->[$i] ) || next;
225       }
226     }
227
228     if (! $fin_idx) {
229       return;
230     }
231     elsif ( keys %$fin_idx == 1 ) {
232       $fin = (values %$fin_idx)[0];
233     }
234     else {
235       my @or;
236
237       # at this point everything is at most one level deep - unroll if needed
238       for (sort keys %$fin_idx) {
239         if ( ref $fin_idx->{$_} eq 'HASH' and keys %{$fin_idx->{$_}} == 1 ) {
240           my ($l, $r) = %{$fin_idx->{$_}};
241
242           if (
243             ref $r eq 'ARRAY'
244               and
245             (
246               ( @$r == 1 and $l =~ /^\-and$/i )
247                 or
248               $l =~ /^\-or$/i
249             )
250           ) {
251             push @or, @$r
252           }
253
254           elsif (
255             ref $r eq 'HASH'
256               and
257             keys %$r == 1
258               and
259             $l =~ /^\-(?:and|or)$/i
260           ) {
261             push @or, %$r;
262           }
263
264           else {
265             push @or, $l, $r;
266           }
267         }
268         else {
269           push @or, $fin_idx->{$_};
270         }
271       }
272
273       $fin->{-or} = \@or;
274     }
275   }
276   else {
277     # not a hash not an array
278     $fin = { -and => [ $where ] };
279   }
280
281   # unroll single-element -and's
282   while (
283     $fin->{-and}
284       and
285     @{$fin->{-and}} < 2
286   ) {
287     my $and = delete $fin->{-and};
288     last if @$and == 0;
289
290     # at this point we have @$and == 1
291     if (
292       ref $and->[0] eq 'HASH'
293         and
294       ! grep { exists $fin->{$_} } keys %{$and->[0]}
295     ) {
296       $fin = {
297         %$fin, %{$and->[0]}
298       };
299     }
300     else {
301       $fin->{-and} = $and;
302       last;
303     }
304   }
305
306   # compress same-column conds found in $fin
307   for my $col ( grep { $_ !~ /^\-/ } keys %$fin ) {
308     next unless ref $fin->{$col} eq 'ARRAY' and ($fin->{$col}[0]||'') =~ /^\-and$/i;
309     my $val_bag = { map {
310       (! defined $_ )                          ? ( UNDEF => undef )
311     : ( ! length ref $_ or is_plain_value $_ ) ? ( "VAL_$_" => $_ )
312     : ( ( 'SER_' . lax_serialize $_ ) => $_ )
313     } @{$fin->{$col}}[1 .. $#{$fin->{$col}}] };
314
315     if (keys %$val_bag == 1 ) {
316       ($fin->{$col}) = values %$val_bag;
317     }
318     else {
319       $fin->{$col} = [ -and => map { $val_bag->{$_} } sort keys %$val_bag ];
320     }
321   }
322
323   return keys %$fin ? $fin : ();
324 }
325
326 sub _normalize_cond_unroll_pairs {
327   my $pairs = shift;
328
329   my @conds;
330
331   while (@$pairs) {
332     my ($lhs, $rhs) = splice @$pairs, 0, 2;
333
334     if (! length $lhs) {
335       push @conds, normalize_sqla_condition($rhs);
336     }
337     elsif ( $lhs =~ /^\-and$/i ) {
338       push @conds, normalize_sqla_condition($rhs, (ref $rhs eq 'ARRAY'));
339     }
340     elsif ( $lhs =~ /^\-or$/i ) {
341       push @conds, normalize_sqla_condition(
342         (ref $rhs eq 'HASH') ? [ map { $_ => $rhs->{$_} } sort keys %$rhs ] : $rhs
343       );
344     }
345     else {
346       if (ref $rhs eq 'HASH' and ! keys %$rhs) {
347         # FIXME - SQLA seems to be doing... nothing...?
348       }
349       # normalize top level -ident, for saner extract_fixed_condition_columns code
350       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{-ident}) {
351         push @conds, { $lhs => { '=', $rhs } };
352       }
353       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{-value} and is_plain_value $rhs->{-value}) {
354         push @conds, { $lhs => $rhs->{-value} };
355       }
356       elsif (ref $rhs eq 'HASH' and keys %$rhs == 1 and exists $rhs->{'='}) {
357         if ( length ref $rhs->{'='} and is_literal_value $rhs->{'='} ) {
358           push @conds, { $lhs => $rhs };
359         }
360         else {
361           for my $p (_normalize_cond_unroll_pairs([ $lhs => $rhs->{'='} ])) {
362
363             # extra sanity check
364             if (keys %$p > 1) {
365               local $Data::Dumper::Deepcopy = 1;
366               croak(
367                 "Internal error: unexpected collapse unroll:"
368               . dump_value { in => { $lhs => $rhs }, out => $p }
369               );
370             }
371
372             my ($l, $r) = %$p;
373
374             push @conds, (
375               ! length ref $r
376                 or
377               # the unroller recursion may return a '=' prepended value already
378               ref $r eq 'HASH' and keys %$rhs == 1 and exists $rhs->{'='}
379                 or
380               is_plain_value($r)
381             )
382               ? { $l => $r }
383               : { $l => { '=' => $r } }
384             ;
385           }
386         }
387       }
388       elsif (ref $rhs eq 'ARRAY') {
389         # some of these conditionals encounter multi-values - roll them out using
390         # an unshift, which will cause extra looping in the while{} above
391         if (! @$rhs ) {
392           push @conds, { $lhs => [] };
393         }
394         elsif ( ($rhs->[0]||'') =~ /^\-(?:and|or)$/i ) {
395           croak("Value modifier not followed by any values: $lhs => [ $rhs->[0] ] ")
396             if @$rhs == 1;
397
398           if( $rhs->[0] =~ /^\-and$/i ) {
399             unshift @$pairs, map { $lhs => $_ } @{$rhs}[1..$#$rhs];
400           }
401           # if not an AND then it's an OR
402           elsif(@$rhs == 2) {
403             unshift @$pairs, $lhs => $rhs->[1];
404           }
405           else {
406             push @conds, { $lhs => [ @{$rhs}[1..$#$rhs] ] };
407           }
408         }
409         elsif (@$rhs == 1) {
410           unshift @$pairs, $lhs => $rhs->[0];
411         }
412         else {
413           push @conds, { $lhs => $rhs };
414         }
415       }
416       # unroll func + { -value => ... }
417       elsif (
418         ref $rhs eq 'HASH'
419           and
420         ( my ($subop) = keys %$rhs ) == 1
421           and
422         length ref ((values %$rhs)[0])
423           and
424         my $vref = is_plain_value( (values %$rhs)[0] )
425       ) {
426         push @conds, (
427           (length ref $$vref)
428             ? { $lhs => $rhs }
429             : { $lhs => { $subop => $$vref } }
430         );
431       }
432       else {
433         push @conds, { $lhs => $rhs };
434       }
435     }
436   }
437
438   return @conds;
439 }
440
441 # Analyzes a given condition and attempts to extract all columns
442 # with a definitive fixed-condition criteria. Returns a hashref
443 # of k/v pairs suitable to be passed to set_columns(), with a
444 # MAJOR CAVEAT - multi-value (contradictory) equalities are still
445 # represented as a reference to the UNRESOVABLE_CONDITION constant
446 # The reason we do this is that some codepaths only care about the
447 # codition being stable, as opposed to actually making sense
448 #
449 # The normal mode is used to figure out if a resultset is constrained
450 # to a column which is part of a unique constraint, which in turn
451 # allows us to better predict how ordering will behave etc.
452 #
453 # With the optional "consider_nulls" boolean argument, the function
454 # is instead used to infer inambiguous values from conditions
455 # (e.g. the inheritance of resultset conditions on new_result)
456 #
457 sub extract_equality_conditions {
458   my ($where, $consider_nulls) = @_;
459   my $where_hash = normalize_sqla_condition($where);
460
461   my $res = {};
462   my ($c, $v);
463   for $c (keys %$where_hash) {
464     my $vals;
465
466     if (!defined ($v = $where_hash->{$c}) ) {
467       $vals->{UNDEF} = $v if $consider_nulls
468     }
469     elsif (
470       ref $v eq 'HASH'
471         and
472       keys %$v == 1
473     ) {
474       if (exists $v->{-value}) {
475         if (defined $v->{-value}) {
476           $vals->{"VAL_$v->{-value}"} = $v->{-value}
477         }
478         elsif( $consider_nulls ) {
479           $vals->{UNDEF} = $v->{-value};
480         }
481       }
482       # do not need to check for plain values - normalize_sqla_condition did it for us
483       elsif(
484         length ref $v->{'='}
485           and
486         (
487           ( ref $v->{'='} eq 'HASH' and keys %{$v->{'='}} == 1 and exists $v->{'='}{-ident} )
488             or
489           is_literal_value($v->{'='})
490         )
491        ) {
492         $vals->{ 'SER_' . lax_serialize $v->{'='} } = $v->{'='};
493       }
494     }
495     elsif (
496       ! length ref $v
497         or
498       is_plain_value ($v)
499     ) {
500       $vals->{"VAL_$v"} = $v;
501     }
502     elsif (ref $v eq 'ARRAY' and ($v->[0]||'') eq '-and') {
503       for ( @{$v}[1..$#$v] ) {
504         my $subval = extract_equality_conditions({ $c => $_ }, 'consider nulls');  # always fish nulls out on recursion
505         next unless exists $subval->{$c};  # didn't find anything
506         $vals->{
507           ! defined $subval->{$c}                                        ? 'UNDEF'
508         : ( ! length ref $subval->{$c} or is_plain_value $subval->{$c} ) ? "VAL_$subval->{$c}"
509         : ( 'SER_' . lax_serialize $subval->{$c} )
510         } = $subval->{$c};
511       }
512     }
513
514     if (keys %$vals == 1) {
515       ($res->{$c}) = (values %$vals)
516         unless !$consider_nulls and exists $vals->{UNDEF};
517     }
518     elsif (keys %$vals > 1) {
519       $res->{$c} = UNRESOLVABLE_CONDITION;
520     }
521   }
522
523   $res;
524 }
525
526 1;